HealthKit + Celery on ECS, Part 2: The Sync Pipeline
Part 2 of the HealthKit + Celery series. Part 1 covered why we needed background processing and why SQS over Redis for the broker. This post covers the actual sync pipeline.
The Data Flow
HealthKit data lives exclusively on the device. Every sample — heart rate, steps, sleep, cycle data — has to travel from the user’s iPhone through your API into your database.
iPhone (HealthKit) → Flutter (health package) → GraphQL mutation →
Django API → SQS queue → Celery worker → PostgreSQL
The Flutter app uses the health package to bridge Dart to native HealthKit APIs. On Android, the same package bridges to Health Connect, so the backend is platform-agnostic.
The Batch-Chunk Model
Rather than processing individual samples, the pipeline operates on batches and chunks. A batch represents a single sync operation (e.g., “30-day backfill of heart rate data”). A batch is split into chunks of ~100 samples each.
This is tracked in PostgreSQL, not in Celery’s result backend:
AppleHealthImportBatch — represents one sync operation. Tracks overall status (PENDING, PROCESSING, COMPLETED, FAILED), total chunk count, and failed chunk count.
AppleHealthImportChunk — one slice of a batch. Each chunk has a chunk_index, its own status, and the raw sample data. Chunks are processed independently and can be retried individually.
AppleHealthDataRecord — individual health samples with fingerprinting for deduplication.
AppleHealthRecordFingerprint — hash-based dedup to prevent re-importing identical samples.
We chose CELERY_RESULT_BACKEND = None deliberately. Celery’s result store gives you task success/failure per task ID. We needed richer state: which chunks succeeded, which failed, what’s the overall batch progress, can we resume from where we left off. PostgreSQL models give us that.
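To make the batch-chunk bookkeeping concrete, here is a minimal in-memory sketch. It uses plain dataclasses rather than the real Django models, and the names `Status`, `Chunk`, `Batch`, and `split_into_chunks` are hypothetical stand-ins, not the project's code. It shows a sample list being split into chunks of 100 and the kind of per-chunk state that a per-task result store would not give you.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class Status(str, Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


@dataclass
class Chunk:
    """Hypothetical stand-in for AppleHealthImportChunk."""
    chunk_index: int
    samples: list[dict[str, Any]]
    status: Status = Status.PENDING


@dataclass
class Batch:
    """Hypothetical stand-in for AppleHealthImportBatch."""
    chunks: list[Chunk] = field(default_factory=list)

    @property
    def failed_chunk_count(self) -> int:
        return sum(c.status is Status.FAILED for c in self.chunks)

    @property
    def progress(self) -> float:
        """Fraction of chunks that have reached a terminal state."""
        if not self.chunks:
            return 1.0
        done = sum(
            c.status in (Status.COMPLETED, Status.FAILED) for c in self.chunks
        )
        return done / len(self.chunks)


def split_into_chunks(samples: list[dict[str, Any]], chunk_size: int = 100) -> Batch:
    """Slice one sync operation into fixed-size chunks, indexed from 0."""
    chunks = [
        Chunk(chunk_index=i, samples=samples[start:start + chunk_size])
        for i, start in enumerate(range(0, len(samples), chunk_size))
    ]
    return Batch(chunks=chunks)
```

With 250 samples this yields three chunks (100, 100, and 50 samples), and `progress` and `failed_chunk_count` can be read at any point during the import.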
The Dispatcher Pattern
The task architecture uses two Celery tasks that chain through SQS:
from celery import shared_task

@shared_task(bind=True, acks_late=True, reject_on_worker_lost=True,
             autoretry_for=(Exception,), retry_backoff=True, max_retries=5)
def process_apple_health_import_batch_task(self, import_batch_id):
    """Dispatcher: enqueue the next pending chunk for this batch."""
    AppleHealthImportService().enqueue_next_pending_chunk(import_batch_id)

@shared_task(bind=True, acks_late=True, reject_on_worker_lost=True)
def process_apple_health_import_chunk_task(self, import_batch_id, chunk_index):
    """Process a single chunk with exponential retry backoff."""
    service = AppleHealthImportService()
    try:
        ...  # process the chunk (elided)
    finally:
        # Then chain: enqueue the next pending chunk
        if not service.enqueue_next_pending_chunk(import_batch_id):
            service.finalize_import_batch(import_batch_id)
The pattern: the batch task dispatches the first chunk. When a chunk finishes processing, it calls enqueue_next_pending_chunk to dispatch the next one. When no pending chunks remain, it finalizes the batch.
This is a sequential chain through SQS. Each chunk becomes its own SQS message. If a chunk fails, it is retried on a backoff schedule of [30, 90, 270] seconds: three retries, each delay triple the previous one, before the chunk is marked as permanently failed.
The finally block in the chunk task ensures enqueue_next_pending_chunk is called even on failure, so a single chunk failure doesn’t stall the entire batch.
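The chain is easier to follow when simulated without Celery or SQS. The sketch below is an assumption-heavy stand-in: `InMemoryImportService` is hypothetical, a `deque` plays the role of the queue, and a `ValueError` plays the role of a chunk that has exhausted its retries. It only illustrates the control flow described above: one chunk in the queue at a time, the next one enqueued from a `finally` block, and finalization once nothing is pending.

```python
from collections import deque


class InMemoryImportService:
    """Hypothetical stand-in for AppleHealthImportService (single batch)."""

    def __init__(self, chunk_count, failing=()):
        self.statuses = {i: "PENDING" for i in range(chunk_count)}
        self.failing = set(failing)
        self.queue = deque()  # stand-in for the SQS queue
        self.finalized = False

    @property
    def failed_chunk_count(self):
        return sum(s == "FAILED" for s in self.statuses.values())

    def enqueue_next_pending_chunk(self):
        """Queue the lowest-index PENDING chunk; return False if none remain."""
        for index in sorted(self.statuses):
            if self.statuses[index] == "PENDING":
                self.statuses[index] = "PROCESSING"
                self.queue.append(index)
                return True
        return False

    def process_chunk(self, index):
        if index in self.failing:
            raise ValueError(f"chunk {index} failed")
        self.statuses[index] = "COMPLETED"

    def finalize_import_batch(self):
        self.finalized = True


def run_chunk_task(service, index):
    """Mirrors the chunk task: process, then always chain to the next chunk."""
    try:
        service.process_chunk(index)
    except ValueError:
        service.statuses[index] = "FAILED"
    finally:
        if not service.enqueue_next_pending_chunk():
            service.finalize_import_batch()


def run_batch(service):
    """Mirrors the dispatcher, then drains the queue like a worker would."""
    service.enqueue_next_pending_chunk()
    order = []
    while service.queue:
        index = service.queue.popleft()
        order.append(index)
        run_chunk_task(service, index)
    return order
```

Running `run_batch(InMemoryImportService(chunk_count=4, failing={1}))` processes the chunks in index order, records chunk 1 as failed, and still finalizes the batch after chunk 3.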
Why Sequential, Not Parallel
You might wonder why chunks are processed one at a time instead of fanning out all chunks to SQS simultaneously.
Database connection pressure. Each chunk runs update_or_create in a loop. A 30-day backfill can produce 50 chunks, and fanning out all 50 at once would put up to 50 database-heavy tasks in flight. On a single Celery worker with CELERY_WORKER_CONCURRENCY = 2, only two would run at a time anyway, so the fan-out gains nothing. On multiple workers, it could overwhelm the database.
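SQS visibility timeout management. With CELERY_WORKER_PREFETCH_MULTIPLIER = 1, each worker only grabs one message at a time. But a message's visibility timeout starts the moment a worker receives it, so with all 50 chunks in the queue, any chunk that is received and then waits or runs past the timeout gets redelivered while the first copy is still in flight. Keeping one chunk in the queue at a time makes that timing much easier to reason about.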
Easier failure recovery. Sequential processing means if chunk 15 fails, chunks 1–14 are committed and chunk 15 can be retried without re-processing anything.
The Import Service
The actual processing lives in AppleHealthImportService — a 2,300-line module that handles:
Type mapping. HealthKit has its own type system (HKQuantityTypeIdentifierHeartRate, HKCategoryTypeIdentifierSleepAnalysis, etc.). The service maps these to internal health measurement types via AppleHealthDataTypeMapping records in the database. This means new HealthKit types can be supported by adding a database row, not a code change.
Fingerprint deduplication. Each incoming sample gets hashed based on its content (type, timestamp, value, source). The hash is checked against AppleHealthRecordFingerprint. If it exists, the sample is skipped. This prevents duplicate imports when the same sync fires twice — which happens more often than you’d think with iOS background execution.
Daily aggregation. After samples are imported, the service triggers daily aggregate recalculation via health_daily_aggregate_service. This updates summary statistics (daily step totals, average heart rate, sleep duration) that the Flutter app’s dashboard reads.
Source tracking. Each sample records its data source (DataSource), enabling source-precedence queries later — Apple Watch data takes priority over iPhone sensor data, which takes priority over third-party apps.
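Two of the items above, fingerprint deduplication and source precedence, can be sketched in a few lines. This is an illustration under assumptions rather than the service's actual code: the hash function (SHA-256 over canonical JSON), the dictionary field names, the in-memory `seen_fingerprints` set standing in for the AppleHealthRecordFingerprint table, and the `SOURCE_PRECEDENCE` labels are all hypothetical.

```python
import hashlib
import json


def sample_fingerprint(sample):
    """Hash the fields the post names: type, timestamp, value, source."""
    canonical = json.dumps(
        {key: sample[key] for key in ("type", "timestamp", "value", "source")},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def import_new_samples(samples, seen_fingerprints):
    """Return only unseen samples and record their fingerprints."""
    imported = []
    for sample in samples:
        fingerprint = sample_fingerprint(sample)
        if fingerprint in seen_fingerprints:
            continue  # identical sample already imported; skip it
        seen_fingerprints.add(fingerprint)
        imported.append(sample)
    return imported


# Lower number wins. The labels are hypothetical, not real DataSource values.
SOURCE_PRECEDENCE = {"apple_watch": 0, "iphone": 1, "third_party": 2}


def preferred_sample(samples):
    """Pick the sample from the highest-precedence source."""
    return min(
        samples,
        key=lambda s: SOURCE_PRECEDENCE.get(s["source"], len(SOURCE_PRECEDENCE)),
    )
```

Because the fingerprint depends only on the sample's content, replaying the same sync leaves `seen_fingerprints` unchanged and imports nothing, which is what makes redelivered chunks safe to reprocess.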
The Privacy Decision
Early in the integration, the team raised concerns about bidirectional sync — specifically, writing app data back to Apple Health where it becomes visible to other apps.
The decision was read-only HealthKit access. The app consumes from the HealthKit hub but doesn’t write back to it. This simplified the architecture — no write mutations, no conflict resolution with other apps editing the same samples — and eliminated the data-sharing concern entirely.
SQS-Specific Task Configuration
The task decorators are tuned for SQS:
@shared_task(
    bind=True,
    acks_late=True,              # Ack (delete the SQS message) after the task runs, not before
    reject_on_worker_lost=True,  # Re-queue if worker dies (OOM, ECS kill)
)
acks_late=True: the SQS message isn't deleted until the task has finished running, rather than when the worker first receives it. If the worker crashes mid-processing, the message reappears in the queue after the visibility timeout (600 seconds). Note that by default Celery also acknowledges a task that raises, so ordinary failures are handled by the retry logic rather than by redelivery. Combined with fingerprint deduplication, re-processing is safe.
reject_on_worker_lost=True: if the worker process running the task is killed (ECS task replacement, OOM kill), the message is requeued rather than acknowledged and silently lost.
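For context, the worker-level settings mentioned in this series could sit together in the Django settings module roughly as follows. This is a sketch, assuming the Celery app loads Django settings with the `CELERY_` namespace. The broker URL form and the region are assumptions, and only the values named in the post (result backend, concurrency, prefetch multiplier, 600-second visibility timeout) come from the source.

```python
# settings.py (sketch; assumes app.config_from_object(..., namespace="CELERY"))

# Assumption: credentials come from the environment / ECS task role,
# so the URL carries no access keys.
CELERY_BROKER_URL = "sqs://"

# State lives in the PostgreSQL batch/chunk models, not in a result backend.
CELERY_RESULT_BACKEND = None

CELERY_WORKER_CONCURRENCY = 2
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

CELERY_BROKER_TRANSPORT_OPTIONS = {
    # Seconds a received message stays hidden before SQS redelivers it.
    "visibility_timeout": 600,
    # Hypothetical region; use the one the queue actually lives in.
    "region": "us-east-1",
}
```

The visibility timeout is the value to keep in mind when sizing chunks: a chunk that runs longer than it will be delivered a second time while the first copy is still processing.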
The retry schedule uses explicit countdowns rather than Celery’s built-in retry_backoff:
retry_schedule = [30, 90, 270]  # seconds
if self.request.retries < len(retry_schedule):
    raise self.retry(exc=exc, countdown=retry_schedule[self.request.retries])
After three retries, the chunk is marked as permanently failed in the database. The batch continues processing remaining chunks — a single chunk failure doesn’t kill the entire import.
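The schedule logic is small enough to pull out and test on its own. The helpers below are hypothetical (`next_countdown` and `retry_timeline` are not names from the project); they show how the retry count indexes into the schedule and how long a chunk can spend retrying in total, ignoring processing time.

```python
RETRY_SCHEDULE = [30, 90, 270]  # seconds, as given in the post
VISIBILITY_TIMEOUT = 600        # seconds, as given in the post


def next_countdown(retries_so_far, schedule=RETRY_SCHEDULE):
    """Return the delay before the next retry, or None once retries run out."""
    if retries_so_far < len(schedule):
        return schedule[retries_so_far]
    return None  # caller marks the chunk as permanently failed


def retry_timeline(schedule=RETRY_SCHEDULE):
    """Cumulative seconds after the first failure at which each retry fires."""
    elapsed = 0
    timeline = []
    for delay in schedule:
        elapsed += delay
        timeline.append(elapsed)
    return timeline
```

Here `retry_timeline()` gives `[30, 120, 390]`, so a chunk is given up on roughly six and a half minutes after its first failure. It is also worth checking that the longest countdown (270 seconds) stays below the 600-second visibility timeout, since Celery's documentation warns that countdowns longer than the visibility timeout can lead to redelivery.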
Cleanup: The Beat Schedule
Processed import records accumulate. A periodic cleanup task runs via Celery Beat (when enabled):
CELERY_BEAT_SCHEDULE = {}
if APPLE_HEALTH_CLEANUP_ENABLED:
    CELERY_BEAT_SCHEDULE['apple-health-ledger-cleanup'] = {
        'task': 'api.tasks.cleanup_apple_health_records_task',
        'schedule': APPLE_HEALTH_CLEANUP_SCHEDULE_SECONDS,  # default: 86400 (daily)
    }
The cleanup task deletes processed AppleHealthDataRecord rows older than APPLE_HEALTH_RECORD_RETENTION_DAYS (default: 90 days), in batches of 5,000 to avoid long-running transactions.
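The batched delete can be sketched without Django. The example below uses the standard library's `sqlite3` as a stand-in for PostgreSQL and a hypothetical `health_record` table with a Unix-timestamp `created_at` column. The real task works against AppleHealthDataRecord and presumably also filters on a processed flag, which this sketch leaves out. The point is only the loop shape: delete a bounded number of rows, commit, repeat until nothing is left.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def delete_expired_records(conn, retention_days=90, batch_size=5000, now=None):
    """Delete rows older than the retention window in short transactions."""
    now = now or datetime.now(timezone.utc)
    cutoff = (now - timedelta(days=retention_days)).timestamp()
    total_deleted = 0
    while True:
        with conn:  # one commit per batch keeps each transaction short
            cursor = conn.execute(
                "DELETE FROM health_record WHERE id IN ("
                "  SELECT id FROM health_record WHERE created_at < ? LIMIT ?"
                ")",
                (cutoff, batch_size),
            )
        if cursor.rowcount == 0:
            return total_deleted
        total_deleted += cursor.rowcount
```

Selecting the ids in a subquery with a `LIMIT` is what bounds each delete. The same shape carries over to the Django ORM by slicing a queryset of primary keys and deleting by `pk__in`.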
Next: ECS Deployment
In Part 3, I’ll cover deploying the worker to ECS Fargate — the sidecar container pattern, the env var crash on first deploy, SQS queue provisioning, and health checks for headless workers.