Handling Async Batch Processing for Multi-Site Document Ingestion
A production-grade asyncio blueprint for ingesting regulatory document packets from many clinical sites at once: bounded concurrency, queue backpressure, per-site rate limiting, jittered retries, idempotency, structured concurrency, graceful shutdown, and ALCOA+ audit logging that survives an FDA inspection.
When you activate dozens or hundreds of clinical sites, each one submits a packet — signed protocols, IRB approvals, 1572s, lab certifications, CVs — and they all arrive at unpredictable times. Processing them one site at a time is too slow; firing every download and parse at once will exhaust file descriptors, trip portal rate limits, and blow memory when ten 200-page scanned PDFs decompress simultaneously. The answer is bounded asyncio concurrency with explicit backpressure and audit-grade record keeping. This guide builds that worker end to end.
This page is a deep how-to under the Async Batch Processing for Site Packets cluster, which sits within the Automated Document Ingestion & Validation Workflows pillar. It pairs naturally with Categorizing validation errors in regulatory document pipelines for the validation stage and Configuring fallback routing when clinical portals timeout for the transport layer.
The core async correctness rule
The single most damaging mistake in an async ingestion pipeline is calling blocking code inside a coroutine. The asyncio event loop runs on a single thread, so the moment a coroutine directly calls pypdf.PdfReader(...), pytesseract.image_to_string(...), cv2.imread(...), hashlib.sha256(big_bytes), or a synchronous requests.get(...), the entire loop freezes: every other site’s download stalls, timeouts fire spuriously, and your “concurrent” worker becomes serial with extra steps.
Two rules govern the whole design:
- CPU-bound or blocking-I/O work runs in an executor. PDF parsing, OCR, OpenCV, and large hashing are offloaded with loop.run_in_executor(...) so the event loop keeps moving. A ProcessPoolExecutor sidesteps the GIL for genuinely CPU-bound parsing/OCR; a ThreadPoolExecutor is fine for blocking I/O.
- Every network await is wrapped in asyncio.timeout(...). An unbounded await against a flaky site portal will hang a worker forever and silently drain your concurrency budget.
Keep those two rules and the rest is plumbing.
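To make the first rule concrete, here is a minimal sketch that measures how long a 10 ms heartbeat coroutine is starved while some work runs. The names blocking_work and max_heartbeat_gap are hypothetical, and time.sleep stands in for a real blocking parse or OCR call.

```python
import asyncio
import contextlib
import time


def blocking_work(seconds: float) -> None:
    """Stand-in for a blocking call such as a synchronous PDF parse."""
    time.sleep(seconds)


async def max_heartbeat_gap(offload: bool, work_seconds: float = 0.2) -> float:
    """Return the longest pause seen by a 10 ms heartbeat while work runs."""
    loop = asyncio.get_running_loop()
    gaps: list[float] = []

    async def heartbeat() -> None:
        last = time.monotonic()
        while True:
            await asyncio.sleep(0.01)
            now = time.monotonic()
            gaps.append(now - last)
            last = now

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.05)  # let the heartbeat tick a few times first
    if offload:
        # Runs in the default thread pool; the loop keeps scheduling.
        await loop.run_in_executor(None, blocking_work, work_seconds)
    else:
        # Called directly: the loop is frozen for the whole call.
        blocking_work(work_seconds)
    await asyncio.sleep(0.05)  # give the heartbeat a chance to record the gap
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return max(gaps)


if __name__ == "__main__":
    print("direct call:", round(asyncio.run(max_heartbeat_gap(False)), 3))
    print("offloaded:  ", round(asyncio.run(max_heartbeat_gap(True)), 3))
```

With the direct call, the longest heartbeat gap is at least as long as the blocking work itself; with the executor, the heartbeat should keep ticking close to its 10 ms cadence.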
Architecture overview
The pipeline is a classic producer/consumer with a bounded asyncio.Queue providing backpressure, a Semaphore capping global concurrency, and a per-site token bucket enforcing each portal’s rate limit. The producer enqueues work descriptors (never the document bytes — keep the queue cheap); consumers download, hash, validate, parse, and archive, emitting a tamper-evident audit record at every state transition.
flowchart TD
A[Site packet manifest] --> B[Producer enqueues work items]
B --> C{Bounded asyncio Queue}
C -->|backpressure when full| B
C --> D[Worker pool]
D --> E[Acquire global Semaphore]
E --> F[Acquire per site rate token]
F --> G[Download with asyncio timeout]
G --> H[Hash in executor for idempotency]
H --> I{Already processed}
I -->|yes| J[Skip and log dedup]
I -->|no| K[Validate schema]
K -->|invalid| L[Reject and audit]
K -->|valid| M[Parse PDF or OCR in executor]
M -->|success| N[Archive and audit ARCHIVED]
M -->|parse failure| L
G -->|retryable failure| F
G -->|budget exhausted| L
N --> O[Append to ALCOA plus audit chain]
L --> O
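The backpressure edge in the diagram can be illustrated in isolation. The following is a minimal sketch, not the pipeline itself: run_demo is a hypothetical name, the integers stand in for work descriptors, and the 1 ms sleep stands in for download and parse time.

```python
import asyncio


async def run_demo(n_items: int = 20, maxsize: int = 3) -> tuple[int, list[int]]:
    """Return (peak queue depth, items consumed) for a fast producer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = 0
    consumed: list[int] = []

    async def producer() -> None:
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)  # suspends while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # sentinel: no more work

    async def consumer() -> None:
        while True:
            item = await queue.get()
            if item is None:
                return
            await asyncio.sleep(0.001)  # simulate slow download/parse
            consumed.append(item)

    await asyncio.gather(producer(), consumer())
    return peak, consumed


if __name__ == "__main__":
    peak, consumed = asyncio.run(run_demo())
    print(f"peak depth {peak}, consumed {len(consumed)} items")
```

However fast the producer runs, the queue depth never exceeds maxsize, which is what keeps memory flat when the consumers are the bottleneck.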
Step 1: Bounded concurrency and per-site rate limiting
Global concurrency is capped with a single asyncio.Semaphore. Per-site rate limiting needs a token bucket — a Semaphore alone caps simultaneous requests but not request rate, and most clinical portals enforce requests-per-second. The bucket below refills lazily, so it allocates no background tasks and is safe to keep one per site.
import asyncio
import time


class TokenBucket:
    """Async token-bucket rate limiter for a single site portal.

    Allows bursts up to `capacity` and refills at `rate` tokens/second.
    Lazy refill means no background task to clean up on shutdown.
    """

    def __init__(self, rate: float, capacity: float) -> None:
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self._rate = rate
        self._capacity = capacity
        self._tokens = capacity
        self._updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> None:
        """Block until `tokens` are available, then consume them."""
        if tokens > self._capacity:
            # The bucket can never hold this many tokens; waiting would loop forever.
            raise ValueError("tokens must not exceed capacity")
        while True:
            async with self._lock:
                now = time.monotonic()
                # Lazy refill: credit the tokens earned since the last update.
                self._tokens = min(
                    self._capacity,
                    self._tokens + (now - self._updated) * self._rate,
                )
                self._updated = now
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                deficit = tokens - self._tokens
                wait = deficit / self._rate
            # Sleep outside the lock so other coroutines can refill/check.
            await asyncio.sleep(wait)
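The lazy-refill arithmetic is easier to check without a clock. The sketch below restates one acquire attempt as a pure function; refill_and_wait is a hypothetical helper for illustration, not part of the bucket above, and the rate of 5 tokens/second with capacity 10 matches the values used later in this guide.

```python
def refill_and_wait(
    tokens: float,
    capacity: float,
    rate: float,
    elapsed: float,
    needed: float = 1.0,
) -> tuple[float, float]:
    """Return (tokens_left, seconds_to_wait) for one acquire attempt.

    `elapsed` is the time in seconds since the bucket was last updated.
    """
    refilled = min(capacity, tokens + elapsed * rate)  # refill is capped
    if refilled >= needed:
        return refilled - needed, 0.0  # enough tokens: consume immediately
    return refilled, (needed - refilled) / rate  # otherwise wait for the deficit


if __name__ == "__main__":
    # Full bucket: the request passes with no wait.
    print(refill_and_wait(10.0, 10.0, 5.0, elapsed=0.0))
    # Empty bucket: one token takes 1 / rate seconds to accrue.
    print(refill_and_wait(0.0, 10.0, 5.0, elapsed=0.0))
    # Empty bucket, 0.1 s later: half a token has accrued already.
    print(refill_and_wait(0.0, 10.0, 5.0, elapsed=0.1))
```

At 5 tokens/second, an empty bucket makes the caller wait 0.2 s for one token, and a long idle period never credits more than capacity, which is what bounds the burst size.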
Step 2: Offloading blocking work correctly
Hashing, PDF parsing, and OCR are wrapped so they never touch the event loop. We pass a process pool for CPU-bound parsing/OCR and reuse the default thread pool for cheap blocking calls.
import functools
import hashlib
from concurrent.futures import ProcessPoolExecutor


def _sha256(data: bytes) -> str:
    """Pure CPU work; must run off the event loop for large payloads."""
    return hashlib.sha256(data).hexdigest()


def _extract_fields(payload: bytes) -> dict:
    """Blocking parse/OCR. Runs in a process pool to avoid the GIL.

    Real implementations use pypdf for born-digital PDFs and fall back to
    pytesseract (Tesseract 4+ LSTM, --oem 1) over OpenCV-preprocessed
    images for scanned packets. Kept minimal here for focus.
    """
    import io
    from pypdf import PdfReader

    reader = PdfReader(io.BytesIO(payload))
    text = "\n".join(page.extract_text() or "" for page in reader.pages)
    if len(text.strip()) < 32:
        # Scanned packet: route to the OCR branch (pytesseract --oem 1).
        raise ValueError("insufficient extractable text; OCR required")
    return {"pages": len(reader.pages), "chars": len(text)}


async def hash_payload(loop: asyncio.AbstractEventLoop, payload: bytes) -> str:
    # hashlib releases the GIL for large buffers; a thread pool is plenty.
    return await loop.run_in_executor(None, _sha256, payload)


async def parse_payload(
    loop: asyncio.AbstractEventLoop,
    pool: ProcessPoolExecutor,
    payload: bytes,
) -> dict:
    return await loop.run_in_executor(
        pool, functools.partial(_extract_fields, payload)
    )
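For the thread-pool case, the standard library also offers asyncio.to_thread (Python 3.9+), which submits a call to the default executor much like run_in_executor(None, ...). The sketch below hashes several payloads concurrently that way; sha256_hex and hash_many are hypothetical names, and the byte strings are placeholder data rather than real packets.

```python
import asyncio
import hashlib


def sha256_hex(data: bytes) -> str:
    """Blocking hash; safe to run in a worker thread."""
    return hashlib.sha256(data).hexdigest()


async def hash_many(payloads: list[bytes]) -> list[str]:
    """Hash every payload off the event loop, preserving input order."""
    # Each to_thread call runs in the loop's default thread pool.
    return await asyncio.gather(
        *(asyncio.to_thread(sha256_hex, p) for p in payloads)
    )


if __name__ == "__main__":
    digests = asyncio.run(hash_many([b"protocol", b"irb-approval", b"form-1572"]))
    for digest in digests:
        print(digest[:16])
```

gather returns results in the order the awaitables were passed, so the digests line up with the input payloads even if the threads finish out of order.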
Step 3: Idempotency and audit records
Idempotency is keyed on the content hash plus the document’s logical identity. If the same bytes for the same (site_id, doc_id) were already archived, the worker records a dedup event and returns without re-archiving — so a redelivered queue message or a retried download never double-files a record. The audit record is frozen and chained with sequential SHA-256 so any tampering breaks the chain.
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class State(str, Enum):
    RECEIVED = "RECEIVED"
    DEDUPED = "DEDUPED"
    VALIDATING = "VALIDATING"
    REJECTED = "REJECTED"
    ARCHIVED = "ARCHIVED"


def _utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass(frozen=True)
class AuditRecord:
    """ALCOA+ event: Attributable, Legible, Contemporaneous, Original, Accurate."""

    correlation_id: str
    site_id: str
    doc_id: str
    document_hash: str
    state: State
    timestamp_utc: str
    schema_version: str
    worker_id: str
    error_code: Optional[str] = None


class AuditChain:
    """Append-only, tamper-evident audit log (sequential SHA-256 chaining).

    In production, `append` also writes to WORM storage; the in-memory
    head is only a verification convenience.
    """

    def __init__(self) -> None:
        self._head = "0" * 64  # genesis
        self._lock = asyncio.Lock()

    async def append(self, record: AuditRecord) -> str:
        body = json.dumps(asdict(record), sort_keys=True)
        async with self._lock:
            # New head = SHA-256(previous head + serialized record).
            self._head = hashlib.sha256(
                f"{self._head}{body}".encode("utf-8")
            ).hexdigest()
            return self._head
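Verifying the chain is the mirror image of appending to it: replay the stored record bodies from the genesis value and compare the final head. The sketch below assumes the bodies were persisted as the same JSON strings that append hashed; chain_heads and verify_chain are hypothetical helpers, and the sample records are illustrative only.

```python
import hashlib
import json

GENESIS = "0" * 64  # same genesis value the chain starts from


def chain_heads(bodies: list[str]) -> list[str]:
    """Recompute the running head after each serialized record."""
    head = GENESIS
    heads = []
    for body in bodies:
        head = hashlib.sha256(f"{head}{body}".encode("utf-8")).hexdigest()
        heads.append(head)
    return heads


def verify_chain(bodies: list[str], expected_head: str) -> bool:
    """True when replaying `bodies` in order reproduces `expected_head`."""
    heads = chain_heads(bodies)
    final = heads[-1] if heads else GENESIS
    return final == expected_head


if __name__ == "__main__":
    bodies = [
        json.dumps({"doc_id": "d1", "state": "VALIDATING"}, sort_keys=True),
        json.dumps({"doc_id": "d1", "state": "ARCHIVED"}, sort_keys=True),
    ]
    head = chain_heads(bodies)[-1]
    print(verify_chain(bodies, head))  # untouched log
    tampered = [bodies[0].replace("VALIDATING", "ARCHIVED"), bodies[1]]
    print(verify_chain(tampered, head))  # edited first record
```

Because each head depends on every earlier record, editing, dropping, or reordering any entry changes the final head and the check fails.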
Step 4: The worker — retries, timeouts, and structured logging
Each consumer pulls a work item, enforces concurrency and rate limits, then runs the guarded download → hash → validate → parse → archive flow. Retries use jittered exponential backoff and only fire for transient faults; a schema mismatch is permanent and fails fast. Every network call is inside asyncio.timeout.
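import logging
import random

logger = logging.getLogger("clinical_ingestion.audit")

# Transient faults worth retrying. asyncio.timeout raises TimeoutError.
# Note: httpx signals connection problems with httpx.TransportError
# subclasses, which do not derive from the built-in ConnectionError;
# add them to this tuple when the client is httpx.AsyncClient.
RETRYABLE = (asyncio.TimeoutError, ConnectionError)


@dataclass(frozen=True)
class WorkItem:
    site_id: str
    doc_id: str
    url: str
    schema_version: str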
class Ingestor:
    def __init__(
        self,
        *,
        client,  # an async HTTP client (e.g. httpx.AsyncClient)
        pool: ProcessPoolExecutor,
        audit: AuditChain,
        buckets: dict[str, TokenBucket],
        max_concurrency: int = 16,
        download_timeout: float = 30.0,
        max_retries: int = 4,
    ) -> None:
        self._client = client
        self._pool = pool
        self._audit = audit
        self._buckets = buckets
        self._sem = asyncio.Semaphore(max_concurrency)
        self._timeout = download_timeout
        self._max_retries = max_retries
        self._seen: set[tuple[str, str, str]] = set()
        self._seen_lock = asyncio.Lock()

    async def _download(self, item: WorkItem) -> bytes:
        """Network I/O; always guarded by a timeout."""
        async with asyncio.timeout(self._timeout):
            resp = await self._client.get(item.url)
            resp.raise_for_status()
            return resp.content

    async def _audit_event(
        self, item: WorkItem, cid: str, doc_hash: str,
        state: State, worker_id: str, error_code: Optional[str] = None,
    ) -> None:
        head = await self._audit.append(AuditRecord(
            correlation_id=cid, site_id=item.site_id, doc_id=item.doc_id,
            document_hash=doc_hash, state=state, timestamp_utc=_utc_now(),
            schema_version=item.schema_version, worker_id=worker_id,
            error_code=error_code,
        ))
        logger.info(
            "audit",
            extra={"correlation_id": cid, "state": state.value,
                   "error_code": error_code, "chain_head": head},
        )

    async def _process_one(self, item: WorkItem, worker_id: str) -> None:
        loop = asyncio.get_running_loop()
        cid = f"{item.site_id}:{item.doc_id}"
        bucket = self._buckets[item.site_id]

        # Bounded concurrency, then per-site rate limit.
        async with self._sem:
            payload = await self._download_with_retries(item, bucket, cid, worker_id)
            if payload is None:
                return  # already audited as REJECTED

            doc_hash = await hash_payload(loop, payload)
            key = (item.site_id, item.doc_id, doc_hash)
            async with self._seen_lock:
                if key in self._seen:
                    await self._audit_event(item, cid, doc_hash, State.DEDUPED, worker_id)
                    return
                self._seen.add(key)

            await self._audit_event(item, cid, doc_hash, State.VALIDATING, worker_id)
            if not _valid_schema(payload, item.schema_version):
                await self._audit_event(item, cid, doc_hash, State.REJECTED,
                                        worker_id, error_code="SCHEMA_MISMATCH")
                return

            try:
                result = await parse_payload(loop, self._pool, payload)
            except Exception as exc:  # parse/OCR failure is terminal for this doc
                logger.error("parse failed", extra={"correlation_id": cid,
                                                    "error": str(exc)})
                await self._audit_event(item, cid, doc_hash, State.REJECTED,
                                        worker_id, error_code="PARSER_FAILURE")
                return

            logger.info("parsed", extra={"correlation_id": cid, **result})
            await self._audit_event(item, cid, doc_hash, State.ARCHIVED, worker_id)

    async def _download_with_retries(
        self, item: WorkItem, bucket: TokenBucket, cid: str, worker_id: str,
    ) -> Optional[bytes]:
        for attempt in range(self._max_retries):
            await bucket.acquire()  # respect the site's rate limit per attempt
            try:
                return await self._download(item)
            except RETRYABLE as exc:
                if attempt == self._max_retries - 1:
                    logger.error("download budget exhausted",
                                 extra={"correlation_id": cid, "error": str(exc)})
                    await self._audit_event(item, cid, "", State.REJECTED,
                                            worker_id, error_code="DOWNLOAD_FAILED")
                    return None
                # Exponential backoff capped at 30s, plus up to 1s of additive
                # jitter. (Full jitter would instead draw uniformly from
                # [0, capped backoff].)
                delay = min(2 ** attempt, 30) + random.uniform(0, 1.0)
                logger.warning("transient download failure; retrying",
                               extra={"correlation_id": cid, "attempt": attempt,
                                      "delay": round(delay, 2)})
                await asyncio.sleep(delay)
        return None


def _valid_schema(payload: bytes, schema_version: str) -> bool:
    """Cheap pre-parse gate. Replace with jsonschema/pydantic validation."""
    return schema_version.startswith("v2.") and len(payload) > 0
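The retry delay used by the worker is a capped exponential plus a small random offset. The sketch below tabulates that schedule and, for comparison, the "full jitter" variant that draws the whole delay uniformly from zero up to the capped backoff. Both function names are hypothetical, and the optional rng parameter exists only to make the output reproducible.

```python
import random
from typing import Optional


def additive_jitter_delay(
    attempt: int,
    cap: float = 30.0,
    jitter: float = 1.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Capped exponential backoff plus up to `jitter` seconds of noise."""
    source = rng or random
    return min(2 ** attempt, cap) + source.uniform(0, jitter)


def full_jitter_delay(
    attempt: int,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full jitter: uniform over [0, capped exponential backoff]."""
    source = rng or random
    return source.uniform(0, min(2 ** attempt, cap))


if __name__ == "__main__":
    rng = random.Random(7)  # seeded so repeated runs print the same table
    for attempt in range(7):
        additive = additive_jitter_delay(attempt, rng=rng)
        full = full_jitter_delay(attempt, rng=rng)
        print(f"attempt {attempt}: additive={additive:5.2f}s  full={full:5.2f}s")
```

Additive jitter keeps a guaranteed minimum wait per attempt, while full jitter spreads retries more widely at the cost of occasionally retrying almost immediately; which is preferable depends on how strict the portal is.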
Step 5: Structured concurrency and graceful shutdown
The producer feeds a bounded queue; consumers drain it. We prefer Python 3.11+ asyncio.TaskGroup because it propagates exceptions and cancels siblings deterministically. A fallback for 3.10 uses asyncio.gather; note that asyncio.timeout, used in _download, was also added in 3.11, so a 3.10 deployment must swap it for asyncio.wait_for. Graceful shutdown installs signal handlers, stops accepting new work, lets in-flight items finish, and drains the executor.
import contextlib
import signal
import sys


async def _consumer(
    name: str, queue: "asyncio.Queue[Optional[WorkItem]]", ing: Ingestor,
) -> None:
    while True:
        item = await queue.get()
        try:
            if item is None:  # poison pill: clean stop
                return
            await ing._process_one(item, worker_id=name)
        except asyncio.CancelledError:
            raise
        except Exception:  # one bad doc must never kill the worker
            logger.exception("unhandled error processing item")
        finally:
            queue.task_done()


async def run_pipeline(items: list[WorkItem], ing: Ingestor, n_workers: int = 8) -> None:
    queue: asyncio.Queue[Optional[WorkItem]] = asyncio.Queue(maxsize=n_workers * 2)
    stopping = asyncio.Event()
    loop = asyncio.get_running_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        with contextlib.suppress(NotImplementedError):  # Windows lacks these
            loop.add_signal_handler(sig, stopping.set)

    async def produce() -> None:
        for item in items:
            if stopping.is_set():
                break
            await queue.put(item)  # blocks when full -> backpressure
        for _ in range(n_workers):  # one poison pill per worker
            await queue.put(None)

    # --- Python 3.11+: structured concurrency ---
    if sys.version_info >= (3, 11):
        async with asyncio.TaskGroup() as tg:
            tg.create_task(produce())
            for i in range(n_workers):
                tg.create_task(_consumer(f"worker-{i:02d}", queue, ing))
        return

    # --- Python 3.10 fallback ---
    tasks = [asyncio.create_task(produce())]
    tasks += [asyncio.create_task(_consumer(f"worker-{i:02d}", queue, ing))
              for i in range(n_workers)]
    try:
        await asyncio.gather(*tasks)
    except Exception:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
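The fallback branch above targets Python 3.10, where asyncio.timeout is not available. One way to keep the "every network await is bounded" rule on both versions is a small shim that picks the right primitive at runtime. This is a sketch under that assumption; with_timeout is a hypothetical helper, not something the Ingestor above defines.

```python
import asyncio
import sys


async def with_timeout(awaitable, seconds: float):
    """Await `awaitable`, raising a timeout error after `seconds`."""
    if sys.version_info >= (3, 11):
        # asyncio.timeout was added in Python 3.11.
        async with asyncio.timeout(seconds):
            return await awaitable
    # Older interpreters: wait_for provides the same bound.
    return await asyncio.wait_for(awaitable, timeout=seconds)


async def _slow_portal() -> str:
    await asyncio.sleep(1.0)  # simulates a portal that never answers in time
    return "late"


async def demo() -> str:
    try:
        return await with_timeout(_slow_portal(), 0.05)
    except asyncio.TimeoutError:
        # Catches the timeout raised by either branch of the shim.
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Catching asyncio.TimeoutError works in both cases: on 3.11+ it is an alias of the built-in TimeoutError, and on 3.10 it is the exception wait_for raises.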
Wiring it together — note the executor and HTTP client are closed deterministically:
async def main(items: list[WorkItem]) -> None:
    import httpx  # async client; never use blocking `requests` in a coroutine

    buckets = {site: TokenBucket(rate=5.0, capacity=10.0)
               for site in {i.site_id for i in items}}
    audit = AuditChain()
    with ProcessPoolExecutor(max_workers=4) as pool:
        async with httpx.AsyncClient(timeout=None) as client:  # we enforce timeouts ourselves
            ing = Ingestor(client=client, pool=pool, audit=audit, buckets=buckets)
            await run_pipeline(items, ing)
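The shutdown behaviour can be exercised without real signals. In the sketch below, a consumer sets the stop event itself after a chosen item, standing in for SIGTERM arriving mid-run; run_until_stopped and stop_after are hypothetical names, and asyncio.gather is used so the sketch also runs on interpreters without TaskGroup.

```python
import asyncio


async def run_until_stopped(
    n_items: int = 100, n_workers: int = 2, stop_after: int = 5
) -> list[int]:
    """Process items until a stop is requested; return what was processed."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    stopping = asyncio.Event()
    processed: list[int] = []

    async def produce() -> None:
        for i in range(n_items):
            if stopping.is_set():
                break  # stop accepting new work
            await queue.put(i)
        for _ in range(n_workers):
            await queue.put(None)  # one poison pill per worker

    async def consume() -> None:
        while True:
            item = await queue.get()
            if item is None:
                return  # clean stop
            await asyncio.sleep(0)  # stand-in for real processing
            processed.append(item)
            if item == stop_after:
                stopping.set()  # simulates SIGTERM arriving mid-run

    await asyncio.gather(produce(), *(consume() for _ in range(n_workers)))
    return sorted(processed)


if __name__ == "__main__":
    done = asyncio.run(run_until_stopped())
    print(f"processed {len(done)} of 100 items before shutdown")
```

Every item that was already enqueued is still processed, because the poison pills are queued behind it; only items the producer had not yet handed over are skipped.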
Why each control matters for compliance
| Control | Async primitive | Regulatory purpose |
|---|---|---|
| Global concurrency cap | asyncio.Semaphore | Prevents resource exhaustion that causes silent drops (data loss is an ALCOA+ Complete failure) |
| Backpressure | bounded asyncio.Queue | Producer slows to consumer speed; no unbounded memory growth |
| Per-site rate limit | TokenBucket | Avoids portal bans that would orphan a site’s submissions |
| Timeouts | asyncio.timeout | Bounds every network wait so workers cannot hang indefinitely |
| Jittered retries | backoff + random.uniform | Rides out transient faults without a thundering herd |
| Idempotency | content hash + seen-set | Redelivery never double-files a TMF record (21 CFR Part 11 accuracy) |
| Audit chain | sequential SHA-256 | Tamper-evident trail aligned with 21 CFR Part 11 and EU Annex 11 |
| Off-loop CPU work | run_in_executor + process pool | Keeps the loop responsive so timeouts and rate limits stay accurate |
Operational checklist
- No blocking call (pypdf, pytesseract, OpenCV, large hashlib, requests) runs directly in a coroutine; all go through run_in_executor.
- Every network await is inside asyncio.timeout.
- Queue is bounded (maxsize set) so backpressure is real.
- One poison pill per worker guarantees clean drain on completion.
- Signal handlers set a stop event; in-flight items finish before exit.
- Retries fire only on transient errors; schema/parse failures fail fast.
- Idempotency key is (site_id, doc_id, content_hash).
- Audit chain head is persisted to WORM storage and verified after each run.
- Process pool and HTTP client are closed via with / async with.
Validation before production
- Deterministic replay: Re-ingest a frozen 500-document corpus three times; confirm identical final states and per-doc audit records that match apart from run-specific fields such as timestamp_utc and worker_id. Chain heads will differ between runs, because timestamps and the order in which concurrent workers append both feed into the hash.
- Backpressure test: Shrink max_concurrency and queue maxsize; confirm the producer blocks on queue.put and memory stays flat under a flood of large packets.
- Rate-limit test: Point many work items at one site; confirm request spacing matches the token-bucket rate and no portal 429 responses occur.
- Shutdown drill: Send SIGTERM mid-run; confirm in-flight downloads finish, no task_done mismatch warnings appear, and the executor exits cleanly.
- Audit reconstruction: Hand regulatory affairs only correlation IDs and chain heads; confirm they can reproduce the full ingestion timeline without the application database.
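For the deterministic-replay check, the comparison has to ignore fields that legitimately change from run to run. The sketch below is one possible approach: it assumes audit records have been exported as plain dicts, treats timestamp_utc and worker_id as the run-specific fields, and uses made-up sample records. normalize and runs_match are hypothetical helper names.

```python
RUN_SPECIFIC_FIELDS = {"timestamp_utc", "worker_id"}  # assumed to vary per run


def normalize(records: list[dict]) -> list[tuple]:
    """Drop run-specific fields and sort, so append order does not matter."""
    stripped = [
        tuple(sorted(
            (key, str(value))
            for key, value in record.items()
            if key not in RUN_SPECIFIC_FIELDS
        ))
        for record in records
    ]
    return sorted(stripped)


def runs_match(run_a: list[dict], run_b: list[dict]) -> bool:
    """True when both runs produced the same events for the same documents."""
    return normalize(run_a) == normalize(run_b)


if __name__ == "__main__":
    # Illustrative records only; real ones carry the full AuditRecord fields.
    run_1 = [
        {"doc_id": "d1", "state": "ARCHIVED", "timestamp_utc": "t1", "worker_id": "worker-00"},
        {"doc_id": "d2", "state": "REJECTED", "timestamp_utc": "t2", "worker_id": "worker-01"},
    ]
    run_2 = [
        {"doc_id": "d2", "state": "REJECTED", "timestamp_utc": "t9", "worker_id": "worker-00"},
        {"doc_id": "d1", "state": "ARCHIVED", "timestamp_utc": "t8", "worker_id": "worker-01"},
    ]
    print(runs_match(run_1, run_2))
```

Sorting the normalized records makes the comparison independent of the order in which concurrent workers happened to append their events.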
FAQ
Why not just use asyncio.gather over all sites at once?
gather with no concurrency cap launches every coroutine immediately. With hundreds of sites you exhaust file descriptors and memory, and you have no backpressure — large scanned packets all decompress at once. The Semaphore + bounded Queue pattern gives you steady, predictable throughput and a clean place to apply rate limits and timeouts.
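As a small illustration of the cap, the sketch below runs fifty jobs through gather but lets only a few hold the semaphore at a time, and records the peak number running concurrently. bounded_gather is a hypothetical name and the 1 ms sleep stands in for a download. Note that gather still creates every coroutine up front; it is the bounded queue, not the semaphore, that limits how much work is pending.

```python
import asyncio


async def bounded_gather(n_tasks: int = 50, limit: int = 4) -> int:
    """Run n_tasks jobs, at most `limit` at once; return peak concurrency."""
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def job(i: int) -> int:
        nonlocal active, peak
        async with sem:  # waits here while `limit` jobs are already running
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.001)  # stand-in for a download
            active -= 1
        return i

    results = await asyncio.gather(*(job(i) for i in range(n_tasks)))
    assert results == list(range(n_tasks))  # gather preserves input order
    return peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(bounded_gather()))
```

Without the semaphore, the peak would equal the number of tasks; with it, the peak cannot exceed the limit regardless of how many jobs are submitted.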
Do I need a ProcessPoolExecutor, or will threads do?
For genuinely CPU-bound work — pypdf text extraction on large files, pytesseract OCR, OpenCV preprocessing — a process pool sidesteps the GIL and gives real parallelism. For blocking but light I/O (or hashing, where hashlib releases the GIL), the default thread pool is sufficient. Mixing both, as shown, is common.
How does this relate to the schema-validation and fallback work?
This worker stops at a cheap pre-parse gate; rigorous validation and error taxonomy belong in Categorizing validation errors in regulatory document pipelines. When a site portal is unreachable rather than slow, route through the strategy in Configuring fallback routing when clinical portals timeout. For the bigger picture of how ingestion fits the submission lifecycle, see the Async Batch Processing for Site Packets cluster and the Automated Document Ingestion & Validation Workflows pillar.