Async Batch Processing for Site Packets
Site packets bundle the IRB approvals, delegation logs, financial disclosures, and investigator certifications that gate clinical trial activation. Processing them one at a time stalls first-patient-in milestones. This guide shows how to build an asyncio batch pipeline with bounded concurrency, backpressure, idempotent retries, and a 21 CFR Part 11 audit trail.
A site packet is the regulatory bundle a single investigative site submits during activation: the IRB/IEC approval letter, the FDA Form 1572, financial disclosure forms, the delegation-of-authority log, signed protocol acknowledgments, and CV/licensure evidence. A multi-site Phase III program can produce hundreds of these packets in the days surrounding a coordinated activation wave. Synchronous, file-by-file processing turns that wave into a serial queue: a single slow OCR pass or a throttled portal call blocks every packet behind it, RAM balloons when whole PDFs are loaded eagerly, and failures surface as opaque stalls rather than actionable error states.
Async batch processing decouples ingestion from validation so that hundreds of packets progress concurrently while a small, fixed pool of workers does the actual CPU- and I/O-bound work. This page maps the architecture; for the deep, step-by-step build — including partition-aware routing and regional queue isolation — see the long-tail guide Handling async batch processing for multi-site document ingestion. This cluster sits under the pillar Automated Document Ingestion & Validation Workflows, which surveys the full ingest-to-archive lifecycle.
Why asyncio, and where it does not help
asyncio excels at I/O-bound concurrency: thousands of in-flight network calls (object storage, EDC/CTMS APIs, IRB portals) multiplexed on a single thread without the overhead of one OS thread per request. It does not make CPU-bound work faster, and — critically — a blocking call inside a coroutine freezes the entire event loop, silently serializing everything you thought was concurrent.
Site-packet work is a mix of both:
| Operation | Nature | Correct asyncio strategy |
|---|---|---|
| Download/upload to object store | I/O-bound | Native async client (aioboto3, aiohttp) |
| Call IRB portal / EDC API | I/O-bound | Native async client with timeouts |
| SHA-256 hashing of a large file | CPU-bound | run_in_executor (thread or process pool) |
| OCR / PDF text extraction (pypdf, pytesseract) | CPU-bound, blocking C libs | run_in_executor |
| Pydantic schema validation | Fast CPU | Inline is fine for small payloads |
The governing rule: never call a blocking function directly inside a coroutine. Hashing, OCR, OpenCV, and synchronous SDKs must be dispatched with loop.run_in_executor(...) (or asyncio.to_thread(...)), or they will block the loop and erase your concurrency.
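As a minimal sketch of that rule, the snippet below offloads one CPU-bound function and one blocking, synchronous call so that neither runs on the event loop. The names blocking_digest and blocking_sdk_call are hypothetical stand-ins for hashing and a synchronous SDK; only loop.run_in_executor and asyncio.to_thread are real asyncio APIs.

```python
import asyncio
import hashlib
import time


def blocking_digest(data: bytes) -> str:
    """Synchronous, CPU-bound work: must not run directly on the event loop."""
    return hashlib.sha256(data).hexdigest()


def blocking_sdk_call(seconds: float) -> str:
    """Hypothetical stand-in for a synchronous SDK call that blocks its thread."""
    time.sleep(seconds)
    return "done"


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # Both calls run in worker threads of the default executor, so the event
    # loop stays free to schedule other coroutines while they execute.
    digest_future = loop.run_in_executor(None, blocking_digest, b"packet bytes")
    sdk_result = await asyncio.to_thread(blocking_sdk_call, 0.05)
    return await digest_future, sdk_result
```

Calling asyncio.run(main()) returns the digest and the SDK result. Passing None as the executor selects the loop's default thread pool; a dedicated pool can be passed instead when the work needs its own sizing.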
Pipeline architecture
flowchart TD
A[Site packet sources: SFTP, sponsor portal, eTMF, REST upload] --> B[Ingestion: hash, assign correlation id, stream to object store]
B --> C[Bounded queue with backpressure]
C --> D[Worker pool, concurrency capped by semaphore]
D --> E{Structural validation}
E -->|fail| F[Fatal schema mismatch to dead-letter queue]
E -->|pass| G{Semantic validation}
G -->|missing signature or expired IRB| H[Compliance block to regulatory affairs]
G -->|transient downstream error| I[Recoverable timeout, backoff retry]
G -->|minor gap| J[Warning deviation, logged]
G -->|all pass| K[Accepted, persist state]
F --> L[Append-only audit log, HMAC chained]
H --> L
J --> L
K --> L
Stage 1: Ingestion and queue orchestration
Packets arrive through heterogeneous channels — secure SFTP drops, sponsor portals, CRO-managed eTMF integrations, and REST uploads. The ingestion layer normalizes each input into a discrete, traceable work unit. Every file is streamed (never loaded whole) to encrypted object storage via multipart upload, hashed with SHA-256, and tagged with a correlation ID, site number, protocol version, and UTC timestamp.
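The streaming and tagging step might look like the sketch below. It hashes a stream chunk by chunk, so memory use stays at one chunk regardless of file size, and then attaches the traceability fields. The function names, the 1 MiB chunk size, and the received_at field name are illustrative assumptions, and the multipart upload itself is omitted.

```python
import hashlib
import io
import uuid
from datetime import datetime, timezone
from typing import BinaryIO

CHUNK_SIZE = 1024 * 1024  # 1 MiB per read; an assumed, tunable value


def hash_stream(stream: BinaryIO, chunk_size: int = CHUNK_SIZE) -> str:
    """Compute SHA-256 incrementally instead of loading the whole file."""
    digest = hashlib.sha256()
    while chunk := stream.read(chunk_size):
        digest.update(chunk)
    return digest.hexdigest()


def build_work_unit(stream: BinaryIO, site_id: str, protocol_version: str) -> dict[str, str]:
    """Tag one packet with a correlation ID, site, version, hash, and UTC time."""
    return {
        "correlation_id": str(uuid.uuid4()),
        "site_id": site_id,
        "protocol_version": protocol_version,
        "file_hash": hash_stream(stream),
        "received_at": datetime.now(timezone.utc).isoformat(),
    }


# Usage with an in-memory stream standing in for an SFTP or upload stream.
unit = build_work_unit(io.BytesIO(b"example packet bytes"), "S001", "v2")
```

Because hash_stream blocks while it reads and hashes, an async ingest coroutine would dispatch it through an executor rather than call it inline.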
The queue is the backpressure boundary. Use a bounded asyncio.Queue(maxsize=...): when consumers fall behind, await queue.put(...) suspends the producer instead of accumulating an unbounded backlog that exhausts memory. A durable broker (RabbitMQ, Redis Streams, or SQS) provides cross-process durability and at-least-once delivery; the in-process asyncio.Queue provides the fine-grained flow control between the ingest coroutine and the worker pool. Priority tiers (imminent site-initiation packets ahead of routine financial agreements) are expressed as broker routing keys or a priority queue.
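The backpressure behaviour can be observed in isolation with a toy producer and a deliberately slow consumer. This is a sketch with made-up sizes and delays, not part of the pipeline itself: the recorded queue depth never exceeds maxsize because put() suspends the producer whenever the queue is full.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int, depths: list[int]) -> None:
    """Enqueue items; put() suspends whenever the queue is at maxsize."""
    for item in range(count):
        await queue.put(item)
        depths.append(queue.qsize())
    await queue.put(None)  # sentinel: no more work


async def consumer(queue: asyncio.Queue, processed: list[int]) -> None:
    """Deliberately slower than the producer, to force backpressure."""
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.01)  # placeholder for validation work
        processed.append(item)


async def demo(maxsize: int = 2, count: int = 6) -> tuple[list[int], list[int]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    depths: list[int] = []
    processed: list[int] = []
    await asyncio.gather(producer(queue, count, depths), consumer(queue, processed))
    return depths, processed
```

Running asyncio.run(demo()) returns the observed queue depths and the processed items in order. With an unbounded queue, the producer would finish immediately and the backlog would sit in memory.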
Stage 2: Parsing and schema validation
Once dequeued, a packet enters parsing and validation. Text and field extraction lean on PDF/DOCX Parsing for Clinical Docs and, for scanned material, OCR & Metadata Extraction Pipelines. Because pypdf and pytesseract are synchronous and CPU-bound, they run in an executor, never inline.
Validation operates on two tiers. Structural validation confirms file integrity, required page counts, signature presence, date formatting, and that the SHA-256 hash matches the value captured at ingestion. Semantic validation cross-references extracted values against protocol-version matrices and site-specific delegation logs. Errors are mapped to a deterministic taxonomy that drives routing — a discipline covered in depth by Schema Validation & Error Categorization:
- FATAL_SCHEMA_MISMATCH: unparseable structure, corrupted headers, missing required sections. Routes immediately to the dead-letter queue; no retry.
- COMPLIANCE_BLOCK: missing mandatory signatures, expired IRB approval, unapproved protocol deviation. Halts processing and escalates to regulatory affairs; no retry.
- RECOVERABLE_TIMEOUT: transient downstream API failure or executor timeout. Eligible for bounded, jittered retry.
- WARNING_DEVIATION: minor formatting or non-critical metadata gaps. Logged for review; does not block activation.
The distinction matters operationally: retrying a COMPLIANCE_BLOCK wastes cycles and pollutes the audit trail, while failing to retry a RECOVERABLE_TIMEOUT drops a recoverable packet to the DLQ.
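One way to keep that routing deterministic is to encode the taxonomy as an enum and derive the route from a single pure function. The sketch below assumes the four categories above; the Route names and the route_for function are hypothetical and not taken from any library.

```python
from enum import Enum


class ErrorCategory(str, Enum):
    FATAL_SCHEMA_MISMATCH = "FATAL_SCHEMA_MISMATCH"
    COMPLIANCE_BLOCK = "COMPLIANCE_BLOCK"
    RECOVERABLE_TIMEOUT = "RECOVERABLE_TIMEOUT"
    WARNING_DEVIATION = "WARNING_DEVIATION"


class Route(str, Enum):
    DEAD_LETTER = "dead_letter"
    ESCALATE = "escalate_regulatory_affairs"
    RETRY = "retry"
    LOG_AND_CONTINUE = "log_and_continue"


def route_for(category: ErrorCategory, attempt: int, max_attempts: int = 3) -> Route:
    """Map a category and the 1-based attempt number to one routing decision."""
    if category is ErrorCategory.RECOVERABLE_TIMEOUT:
        # Only transient failures retry, and only while budget remains.
        return Route.RETRY if attempt < max_attempts else Route.DEAD_LETTER
    if category is ErrorCategory.COMPLIANCE_BLOCK:
        return Route.ESCALATE
    if category is ErrorCategory.WARNING_DEVIATION:
        return Route.LOG_AND_CONTINUE
    return Route.DEAD_LETTER  # FATAL_SCHEMA_MISMATCH
```

Keeping the decision in one function means a COMPLIANCE_BLOCK can never reach the retry path by accident, and the mapping is easy to unit test.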
Stage 3: Concurrency control, idempotency, and retries
Bounded concurrency. Wrap the work body in an asyncio.Semaphore sized to the most constrained downstream resource — typically the IRB portal or EDC API rate limit, not your CPU count. The queue limits how much work is buffered; the semaphore limits how much runs at once.
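A small sketch makes the cap visible: ten calls are launched at once, but the semaphore admits at most three at a time. The limit of three and the call_downstream name are arbitrary assumptions for the demonstration.

```python
import asyncio


async def call_downstream(semaphore: asyncio.Semaphore, in_flight: dict[str, int]) -> None:
    """Simulate one rate-limited API call and track peak concurrency."""
    async with semaphore:
        in_flight["now"] += 1
        in_flight["peak"] = max(in_flight["peak"], in_flight["now"])
        await asyncio.sleep(0.01)  # placeholder for an awaited API call
        in_flight["now"] -= 1


async def demo(limit: int = 3, calls: int = 10) -> int:
    semaphore = asyncio.Semaphore(limit)
    in_flight = {"now": 0, "peak": 0}
    await asyncio.gather(*(call_downstream(semaphore, in_flight) for _ in range(calls)))
    return in_flight["peak"]
```

asyncio.run(demo()) returns the peak number of simultaneous calls, which equals the semaphore limit rather than the number of tasks launched.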
Idempotency. Broker redelivery and network partitions cause duplicate dequeues. Derive an idempotency key from sha256(file) + site_id + protocol_version and record completed keys in a durable store. A worker checks the key before doing work and writes it within the same logical transaction as the result, so a redelivered packet is recognized and skipped rather than reprocessed — essential for ALCOA+ data integrity, where duplicate records corrupt the audit history.
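The sketch below illustrates the idea with SQLite from the standard library standing in for the durable store. The table layout and function names are assumptions; a production system would use its own database and write the real result row in the same transaction.

```python
import hashlib
import sqlite3


def idempotency_key(file_bytes: bytes, site_id: str, protocol_version: str) -> str:
    """Combine the file hash with site and protocol version."""
    file_hash = hashlib.sha256(file_bytes).hexdigest()
    return f"{file_hash}:{site_id}:{protocol_version}"


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results ("
        "idem_key TEXT PRIMARY KEY, outcome TEXT NOT NULL)"
    )
    return conn


def record_once(conn: sqlite3.Connection, key: str, outcome: str) -> bool:
    """Write key and outcome in one transaction; False means a duplicate."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute(
                "INSERT INTO results (idem_key, outcome) VALUES (?, ?)",
                (key, outcome),
            )
        return True
    except sqlite3.IntegrityError:
        # The primary-key constraint rejects a redelivered packet.
        return False
```

Relying on the primary-key constraint, rather than a separate read followed by a write, closes the window in which two workers could both see the key as absent.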
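Retries. Only RECOVERABLE_TIMEOUT errors retry, with jittered exponential backoff and a fixed budget (three attempts is a reasonable default). The delay before each retry is drawn uniformly at random from a window that grows exponentially with the attempt number, up to a cap, so packets that fail together do not retry in lockstep and create a thundering herd.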
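One common way to realize this is "full jitter": pick the delay uniformly between zero and an exponentially growing ceiling. The base of one second and the 30-second cap below are assumptions chosen for illustration, not values prescribed by this guide.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a full-jitter delay in seconds for a 1-based attempt number.

    The ceiling doubles with each attempt and is clamped to `cap`; the
    actual delay is uniform in [0, ceiling].
    """
    source = rng if rng is not None else random
    ceiling = min(cap, base * 2 ** attempt)
    return source.uniform(0.0, ceiling)
```

A worker would call await asyncio.sleep(backoff_delay(attempt)) before the next try. Passing a seeded random.Random makes the delays reproducible in tests.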
A circuit breaker around each downstream endpoint trips after a failure threshold so a dead portal sheds load instead of absorbing the entire retry budget. Exhausted retries route to the DLQ with full context preserved.
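A minimal breaker can be sketched as a small state holder that callers consult before each request. The threshold, cool-down, and method names here are hypothetical, and this version is not safe to share across threads without a lock.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after consecutive failures; allows a trial call after a cool-down."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        """True if a call may proceed (closed, or cool-down has elapsed)."""
        if self._opened_at is None:
            return True
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self) -> None:
        """Close the breaker and reset the failure count."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        """Count a failure; open (or re-open) once the threshold is reached."""
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

A worker checks allow() before calling the endpoint and routes the packet straight to retry or the DLQ when it returns False, so a dead portal stops consuming attempts. After the cool-down, one failed trial call re-opens the breaker immediately.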
Stage 4: Audit logging and regulatory boundaries
Every ingestion, parse attempt, validation decision, and state transition is serialized as a structured JSON event with a trace ID. To satisfy 21 CFR Part 11 audit-trail requirements, log records are hash-chained: each record embeds an HMAC-SHA256 over its own payload plus the previous record’s MAC, so any retroactive edit or deletion breaks the chain and is detectable. Records are archived to write-once (WORM) storage under a defined retention policy.
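The chaining scheme can be sketched with the standard library's hmac module. The record layout, the all-zero genesis value, and the function names below are assumptions for illustration; key management and WORM archival are out of scope here.

```python
import hashlib
import hmac
import json

GENESIS_MAC = "0" * 64  # assumed anchor value for the first record


def _mac(key: bytes, payload: dict, prev_mac: str) -> str:
    # Canonical JSON (sorted keys, fixed separators) keeps the MAC reproducible.
    body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(key, prev_mac.encode() + body, hashlib.sha256).hexdigest()


def append_event(chain: list[dict], key: bytes, payload: dict) -> dict:
    """Append a record whose MAC covers its payload and the previous MAC."""
    prev_mac = chain[-1]["mac"] if chain else GENESIS_MAC
    record = {"payload": payload, "prev_mac": prev_mac, "mac": _mac(key, payload, prev_mac)}
    chain.append(record)
    return record


def verify_chain(chain: list[dict], key: bytes) -> bool:
    """Recompute every MAC in order; any edit or mid-chain deletion fails."""
    prev_mac = GENESIS_MAC
    for record in chain:
        expected = _mac(key, record["payload"], prev_mac)
        if record["prev_mac"] != prev_mac or not hmac.compare_digest(record["mac"], expected):
            return False
        prev_mac = record["mac"]
    return True
```

Note that truncating the tail of the chain is not detectable from the chain alone; anchoring the latest MAC in a separate system is one way to cover that case.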
Boundaries enforced at the pipeline edge:
- Data residency and jurisdictional routing: PHI/PII is tokenized or redacted before any cross-border hop; regional queues pin packets to in-jurisdiction validation nodes.
- Electronic signature validation: digital signatures are checked against approved certificate authorities; unsigned documents raise COMPLIANCE_BLOCK.
- Role-based access control: raw packet contents are restricted to authorized roles (Clinical Ops, Regulatory Affairs, QA) under time-bound credentials; audit reads are themselves logged.
- Gap detection: missing documents and version mismatches route to Checklist Sync & Gap Analysis for reconciliation.
Reference implementation
The pattern below is correct, runnable asyncio: a bounded queue for backpressure, a semaphore for concurrency, CPU-bound hashing and parsing pushed to an executor, structured per-packet logging, and an explicit error taxonomy that decides retry versus DLQ. It uses a TaskGroup (Python 3.11+) so a fatal error cancels siblings cleanly, and sentinel-based shutdown so workers drain the queue and exit.
"""Async site-packet batch processor with bounded concurrency and audit logging."""
from __future__ import annotations

import asyncio
import hashlib
import os
import random
from dataclasses import dataclass, field

import structlog
from pydantic import BaseModel, ValidationError

logger = structlog.get_logger()

# Concurrency is capped by the most constrained downstream resource (e.g. the
# IRB portal rate limit), read from config, never hardcoded.
MAX_CONCURRENCY = int(os.environ.get("PACKET_MAX_CONCURRENCY", "8"))
QUEUE_MAXSIZE = int(os.environ.get("PACKET_QUEUE_MAXSIZE", "64"))
MAX_ATTEMPTS = int(os.environ.get("PACKET_MAX_ATTEMPTS", "3"))


class PacketError(Exception):
    """Carries an error-taxonomy category so routing is deterministic."""

    def __init__(self, category: str, message: str) -> None:
        super().__init__(message)
        self.category = category


class SitePacket(BaseModel):
    """Validated metadata for one site packet work unit."""

    correlation_id: str
    site_id: str
    protocol_version: str
    file_hash: str   # SHA-256 captured at ingestion
    object_key: str  # location in encrypted object storage


@dataclass
class BatchResult:
    accepted: list[str] = field(default_factory=list)
    dead_lettered: list[str] = field(default_factory=list)


def _sha256(data: bytes) -> str:
    """CPU-bound; always dispatched via run_in_executor, never called inline."""
    return hashlib.sha256(data).hexdigest()


def _backoff_delay(attempt: int) -> float:
    """Full-jitter backoff: uniform in [0, min(30, 2**attempt)] seconds."""
    return random.uniform(0.0, min(30.0, 2.0 ** attempt))


async def fetch_payload(packet: SitePacket) -> bytes:
    """Stream raw bytes from object storage using a native async client.

    Replace with an aioboto3/aiohttp call; kept minimal here. A real
    implementation wraps this in an asyncio.timeout() block.
    """
    await asyncio.sleep(0)  # placeholder for awaited network I/O
    return b"<packet bytes from object store>"


async def verify_and_validate(packet: SitePacket, raw: bytes) -> None:
    """Verify integrity, then run schema/semantic checks.

    Raises PacketError tagged with the taxonomy category that drives routing.
    """
    loop = asyncio.get_running_loop()
    # Hashing is CPU-bound: offload so the event loop stays responsive.
    computed = await loop.run_in_executor(None, _sha256, raw)
    if computed != packet.file_hash:
        raise PacketError("FATAL_SCHEMA_MISMATCH", "payload hash mismatch")
    # CPU-bound parsing (pypdf/pytesseract) would also go through the executor.
    # Pydantic field validation on the already-parsed metadata is fast enough
    # to run inline.
    try:
        SitePacket.model_validate(packet.model_dump())
    except ValidationError as exc:
        raise PacketError("FATAL_SCHEMA_MISMATCH", str(exc)) from exc
    # Semantic example: an expired IRB approval is a compliance block, not a
    # retryable error, so it must never re-enter the retry loop.
    # if irb_expired(packet): raise PacketError("COMPLIANCE_BLOCK", "IRB expired")


async def process_one(
    packet: SitePacket,
    semaphore: asyncio.Semaphore,
    seen_keys: set[str],
    result: BatchResult,
) -> None:
    """Process a single packet with idempotency, bounded concurrency, retries."""
    idem_key = f"{packet.file_hash}:{packet.site_id}:{packet.protocol_version}"
    log = logger.bind(correlation_id=packet.correlation_id, idem_key=idem_key)
    if idem_key in seen_keys:  # in prod: a durable store, set within the txn
        log.info("packet_skipped_idempotent")
        return
    async with semaphore:  # cap simultaneous in-flight work
        for attempt in range(1, MAX_ATTEMPTS + 1):
            try:
                async with asyncio.timeout(30):
                    raw = await fetch_payload(packet)
                    await verify_and_validate(packet, raw)
                seen_keys.add(idem_key)
                result.accepted.append(packet.correlation_id)
                log.info("packet_accepted", attempt=attempt)
                return
            except PacketError as exc:
                if exc.category == "RECOVERABLE_TIMEOUT" and attempt < MAX_ATTEMPTS:
                    backoff = _backoff_delay(attempt)
                    log.warning("packet_retry", category=exc.category,
                                attempt=attempt, backoff=backoff)
                    await asyncio.sleep(backoff)
                    continue
                # Non-retryable, or budget exhausted: dead-letter with context.
                result.dead_lettered.append(packet.correlation_id)
                log.error("packet_dead_lettered", category=exc.category)
                return
            except asyncio.TimeoutError:
                if attempt < MAX_ATTEMPTS:
                    backoff = _backoff_delay(attempt)
                    log.warning("packet_timeout_retry", attempt=attempt,
                                backoff=backoff)
                    await asyncio.sleep(backoff)
                    continue
                result.dead_lettered.append(packet.correlation_id)
                log.error("packet_dead_lettered", category="RECOVERABLE_TIMEOUT")
                return


async def worker(
    queue: asyncio.Queue[SitePacket | None],
    semaphore: asyncio.Semaphore,
    seen_keys: set[str],
    result: BatchResult,
) -> None:
    """Drain the queue until a None sentinel signals shutdown."""
    while True:
        packet = await queue.get()
        try:
            if packet is None:  # sentinel: no more work
                return
            await process_one(packet, semaphore, seen_keys, result)
        finally:
            queue.task_done()


async def run_batch(packets: list[SitePacket], workers: int = 4) -> BatchResult:
    """Feed packets through a bounded queue to a fixed worker pool."""
    queue: asyncio.Queue[SitePacket | None] = asyncio.Queue(maxsize=QUEUE_MAXSIZE)
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    seen_keys: set[str] = set()
    result = BatchResult()
    async with asyncio.TaskGroup() as tg:
        for _ in range(workers):
            tg.create_task(worker(queue, semaphore, seen_keys, result))
        # Bounded put() applies backpressure: if workers lag, this awaits.
        for packet in packets:
            await queue.put(packet)
        for _ in range(workers):  # one sentinel per worker
            await queue.put(None)
    logger.info("batch_complete", accepted=len(result.accepted),
                dead_lettered=len(result.dead_lettered))
    return result


if __name__ == "__main__":
    demo = [
        SitePacket(correlation_id="c1", site_id="S001",
                   protocol_version="v2",
                   file_hash=_sha256(b"<packet bytes from object store>"),
                   object_key="packets/s001.zip"),
    ]
    asyncio.run(run_batch(demo))
Why this is correct and not just plausible:
- No blocking calls in coroutines. hashlib.sha256 runs in run_in_executor; the same applies to pypdf/pytesseract/OpenCV. The event loop never stalls.
- Real backpressure. asyncio.Queue(maxsize=...) makes put() suspend when full, so an unbounded backlog cannot exhaust memory.
- Structured concurrency. asyncio.TaskGroup (Python 3.11+) propagates and aggregates worker failures and cancels siblings on error, which is cleaner than a bare asyncio.gather for long-lived workers. On 3.10, asyncio.gather(*tasks) with explicit cancellation is the fallback.
- Deterministic shutdown. One None sentinel per worker guarantees every worker exits after the queue drains; task_done() keeps queue.join() accurate if a caller waits on it.
- Taxonomy-driven routing. Only RECOVERABLE_TIMEOUT and real timeouts retry; COMPLIANCE_BLOCK/FATAL_SCHEMA_MISMATCH dead-letter immediately.
- No hardcoded config or secrets. Concurrency, queue size, and retry budget come from the environment.
Operational checklist
- Concurrency limit sized to the most constrained downstream API, sourced from config.
- Queue is bounded (maxsize) so producers experience backpressure.
- All CPU-bound and blocking-library calls dispatched via run_in_executor/to_thread.
- Every awaited network call wrapped in asyncio.timeout().
- Idempotency key persisted in the same transaction as the result.
- Retry limited to transient categories with jittered backoff and a circuit breaker.
- Exhausted/non-retryable packets dead-lettered with full context.
- Audit log is append-only, HMAC-chained, and archived to WORM storage.
- PHI/PII tokenized before any cross-jurisdiction transmission.
FAQ
When should I use Celery or a broker instead of a raw asyncio queue?
Use both. The broker (RabbitMQ, Redis Streams, SQS) gives durability and at-least-once delivery across processes and restarts; the in-process asyncio.Queue gives fine-grained backpressure and concurrency control within a worker. For purely CPU-bound fan-out across machines, a distributed task queue like Celery is the better top-level orchestrator, with asyncio handling I/O concurrency inside each worker.
How do I keep CPU-bound OCR from blocking the event loop?
Never call pytesseract, pypdf, or OpenCV directly in a coroutine. Dispatch them with loop.run_in_executor(pool, fn, *args) using a ProcessPoolExecutor for genuinely CPU-bound work (it sidesteps the GIL) or a ThreadPoolExecutor for blocking I/O in synchronous SDKs.
What makes the audit trail 21 CFR Part 11 compliant?
The trail must be attributable, contemporaneous, and tamper-evident. Hash-chaining each record with HMAC-SHA256 over the prior record’s MAC makes any retroactive change detectable; writing to WORM storage with enforced retention and logging audit reads themselves covers the access-control and durability requirements. The signature and access-control practices belong to Security Boundaries for Clinical Data.
How do I guarantee a redelivered packet is not processed twice?
Derive an idempotency key from the file hash plus site and protocol version, and commit that key to a durable store atomically with the result. A redelivered packet finds its key already present and is skipped, preserving ALCOA+ integrity.
Where to go next
- Up to the pillar: Automated Document Ingestion & Validation Workflows
- Down to the deep build: Handling async batch processing for multi-site document ingestion
- Sibling: Schema Validation & Error Categorization
- Sibling: Checklist Sync & Gap Analysis