Handling Async Batch Processing for Multi-Site Document Ingestion

A production-grade asyncio blueprint for ingesting regulatory document packets from many clinical sites at once: bounded concurrency, queue backpressure, per-site rate limiting, jittered retries, idempotency, structured concurrency, graceful shutdown, and ALCOA+ audit logging designed to stand up to an FDA inspection.

When you activate dozens or hundreds of clinical sites, each one submits a packet — signed protocols, IRB approvals, 1572s, lab certifications, CVs — and they all arrive at unpredictable times. Processing them one site at a time is too slow; firing every download and parse at once will exhaust file descriptors, trip portal rate limits, and blow memory when ten 200-page scanned PDFs decompress simultaneously. The answer is bounded asyncio concurrency with explicit backpressure and audit-grade record keeping. This guide builds that worker end to end.

This page is a deep how-to under the Async Batch Processing for Site Packets cluster, which sits within the Automated Document Ingestion & Validation Workflows pillar. It pairs naturally with Categorizing validation errors in regulatory document pipelines for the validation stage and Configuring fallback routing when clinical portals timeout for the transport layer.

The core async correctness rule

The most damaging mistake in an async ingestion pipeline is calling blocking code inside a coroutine. The asyncio event loop runs on a single thread. The moment a coroutine calls pypdf.PdfReader(...), pytesseract.image_to_string(...), cv2.imread(...), hashlib.sha256(big_bytes), or a synchronous requests.get(...), the entire loop freezes: every other site’s download stalls, timeouts fire spuriously, and your “concurrent” worker becomes serial with extra steps.

Two rules govern the whole design:

  1. CPU-bound or blocking-I/O work runs in an executor. PDF parsing, OCR, OpenCV, and large hashing are offloaded with loop.run_in_executor(...) so the event loop keeps moving. A ProcessPoolExecutor sidesteps the GIL for genuinely CPU-bound parsing/OCR; a ThreadPoolExecutor is fine for blocking I/O.
  2. Every network await is wrapped in asyncio.timeout(...) (Python 3.11+; on 3.10 use asyncio.wait_for instead). An unbounded await against a flaky site portal will hang a worker forever and silently drain your concurrency budget.

Keep those two rules and the rest is plumbing.
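
A small experiment makes the first rule concrete. The sketch below is illustrative only: blocking_digest, heartbeat, and count_ticks are hypothetical names, and time.sleep stands in for a slow parse or OCR call. It counts how often a heartbeat coroutine gets to run while the slow call is in progress, once with the call made directly and once offloaded through asyncio.to_thread (Python 3.9+, which hands the call to the default thread-pool executor).

```python
import asyncio
import hashlib
import time


def blocking_digest(data: bytes) -> str:
    """Hypothetical stand-in for a slow synchronous call (parse, OCR, hash)."""
    time.sleep(0.2)  # simulates work that never yields to the event loop
    return hashlib.sha256(data).hexdigest()


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record a tick every 10 ms for as long as the loop is free to run us."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(offload: bool) -> int:
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if offload:
        await asyncio.to_thread(blocking_digest, b"packet")  # loop stays live
    else:
        blocking_digest(b"packet")  # loop is frozen for the whole call
    stop.set()
    await beat
    return len(ticks)


if __name__ == "__main__":
    print("direct call ticks:   ", asyncio.run(count_ticks(offload=False)))
    print("offloaded call ticks:", asyncio.run(count_ticks(offload=True)))
```

The direct call should leave the heartbeat with little more than its first tick, while the offloaded call lets it keep ticking throughout. Exact counts depend on the machine and scheduler.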

Architecture overview

The pipeline is a classic producer/consumer with a bounded asyncio.Queue providing backpressure, a Semaphore capping global concurrency, and a per-site token bucket enforcing each portal’s rate limit. The producer enqueues work descriptors (never the document bytes — keep the queue cheap); consumers download, hash, validate, parse, and archive, emitting a tamper-evident audit record at every state transition.

flowchart TD
    A[Site packet manifest] --> B[Producer enqueues work items]
    B --> C{Bounded asyncio Queue}
    C -->|backpressure when full| B
    C --> D[Worker pool]
    D --> E[Acquire global Semaphore]
    E --> F[Acquire per site rate token]
    F --> G[Download with asyncio timeout]
    G --> H[Hash in executor for idempotency]
    H --> I{Already processed}
    I -->|yes| J[Skip and log dedup]
    I -->|no| K[Validate schema]
    K -->|invalid| L[Reject and audit]
    K -->|valid| M[Parse PDF or OCR in executor]
    M -->|success| N[Archive and audit ARCHIVED]
    G -->|retryable failure| F
    G -->|budget exhausted| L
    M -->|parse failure| L
    N --> O[Append to ALCOA plus audit chain]
    L --> O

Step 1: Bounded concurrency and per-site rate limiting

Global concurrency is capped with a single asyncio.Semaphore. Per-site rate limiting needs a token bucket — a Semaphore alone caps simultaneous requests but not request rate, and most clinical portals enforce requests-per-second. The bucket below refills lazily, so it allocates no background tasks and is safe to keep one per site.

import asyncio
import time


class TokenBucket:
    """Async token-bucket rate limiter for a single site portal.

    Allows bursts up to `capacity` and refills at `rate` tokens/second.
    Lazy refill means no background task to clean up on shutdown.
    """

    def __init__(self, rate: float, capacity: float) -> None:
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self._rate = rate
        self._capacity = capacity
        self._tokens = capacity
        self._updated = time.monotonic()
        self._lock = asyncio.Lock()

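    async def acquire(self, tokens: float = 1.0) -> None:
        """Block until `tokens` are available, then consume them."""
        if tokens > self._capacity:
            # The bucket can never hold this many; waiting would loop forever.
            raise ValueError("tokens must not exceed capacity")
        while True: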
            async with self._lock:
                now = time.monotonic()
                self._tokens = min(
                    self._capacity,
                    self._tokens + (now - self._updated) * self._rate,
                )
                self._updated = now
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                deficit = tokens - self._tokens
                wait = deficit / self._rate
            # Sleep outside the lock so other coroutines can refill/check.
            await asyncio.sleep(wait)
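
The lazy-refill arithmetic is easier to check with the clock taken out. The sketch below restates the two formulas from acquire as pure functions; refill and wait_for_tokens are hypothetical helper names, and the 5 requests/second and burst-of-10 settings are example values only.

```python
def refill(tokens: float, elapsed: float, rate: float, capacity: float) -> float:
    """Tokens available after `elapsed` seconds, never exceeding capacity."""
    return min(capacity, tokens + elapsed * rate)


def wait_for_tokens(tokens: float, wanted: float, rate: float) -> float:
    """Seconds to sleep until `wanted` tokens exist (0 if already available)."""
    return max(0.0, (wanted - tokens) / rate)


# Assumed example settings: 5 requests/second with bursts of up to 10.
RATE, CAPACITY = 5.0, 10.0

after_burst = 0.0                                       # bucket just drained
half_second = refill(after_burst, 0.5, RATE, CAPACITY)  # 0.5 s * 5/s = 2.5 tokens
long_idle = refill(after_burst, 60.0, RATE, CAPACITY)   # capped at capacity: 10.0
short_wait = wait_for_tokens(0.25, 1.0, RATE)           # (1 - 0.25) / 5 = 0.15 s
```

Because the refill is capped at capacity, a site that has been idle for an hour can still only burst 10 requests before it settles back to the steady rate.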

Step 2: Offloading blocking work correctly

Hashing, PDF parsing, and OCR are wrapped so they never touch the event loop. We pass a process pool for CPU-bound parsing/OCR and reuse the default thread pool for cheap blocking calls.

import functools
import hashlib
from concurrent.futures import ProcessPoolExecutor


def _sha256(data: bytes) -> str:
    """Pure CPU work — must run off the event loop for large payloads."""
    return hashlib.sha256(data).hexdigest()


def _extract_fields(payload: bytes) -> dict:
    """Blocking parse/OCR. Runs in a process pool to avoid the GIL.

    Real implementations use pypdf for born-digital PDFs and fall back to
    pytesseract (Tesseract 4+ LSTM, --oem 1) over OpenCV-preprocessed
    images for scanned packets. Kept minimal here for focus.
    """
    import io
    from pypdf import PdfReader

    reader = PdfReader(io.BytesIO(payload))
    text = "\n".join(page.extract_text() or "" for page in reader.pages)
    if len(text.strip()) < 32:
        # Scanned packet: route to the OCR branch (pytesseract --oem 1).
        raise ValueError("insufficient extractable text; OCR required")
    return {"pages": len(reader.pages), "chars": len(text)}


async def hash_payload(loop: asyncio.AbstractEventLoop, payload: bytes) -> str:
    # hashlib releases the GIL for large buffers; a thread pool is plenty.
    return await loop.run_in_executor(None, _sha256, payload)


async def parse_payload(
    loop: asyncio.AbstractEventLoop,
    pool: ProcessPoolExecutor,
    payload: bytes,
) -> dict:
    return await loop.run_in_executor(
        pool, functools.partial(_extract_fields, payload)
    )

Step 3: Idempotency and audit records

Idempotency is keyed on the content hash plus the document’s logical identity. If the same bytes for the same (site_id, doc_id) were already archived, the worker records a dedup event and returns without re-archiving, so a redelivered queue message or a retried download never double-files a record. The in-memory seen-set used below only deduplicates within a single process; a production deployment needs a durable store for the keys. The audit record is frozen, and each record’s serialized body is hashed together with the previous chain head (SHA-256), so any tampering breaks the chain.

import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class State(str, Enum):
    RECEIVED = "RECEIVED"
    DEDUPED = "DEDUPED"
    VALIDATING = "VALIDATING"
    REJECTED = "REJECTED"
    ARCHIVED = "ARCHIVED"


def _utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass(frozen=True)
class AuditRecord:
    """ALCOA+ event record.

    ALCOA: Attributable, Legible, Contemporaneous, Original, Accurate.
    The "+" adds Complete, Consistent, Enduring, Available.
    """
    correlation_id: str
    site_id: str
    doc_id: str
    document_hash: str
    state: State
    timestamp_utc: str
    schema_version: str
    worker_id: str
    error_code: Optional[str] = None


class AuditChain:
    """Append-only, tamper-evident audit log (sequential SHA-256 chaining).

    In production, `append` also writes to WORM storage; the in-memory
    head is only a verification convenience.
    """

    def __init__(self) -> None:
        self._head = "0" * 64  # genesis
        self._lock = asyncio.Lock()

    async def append(self, record: AuditRecord) -> str:
        body = json.dumps(asdict(record), sort_keys=True)
        async with self._lock:
            self._head = hashlib.sha256(
                f"{self._head}{body}".encode("utf-8")
            ).hexdigest()
            return self._head
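
Tamper evidence only helps if someone can recompute the chain. The sketch below is one way verification might look, assuming the serialized record bodies are persisted in order alongside the head (the AuditChain above keeps only the head in memory). chain_head and verify are hypothetical helpers, and the sample records are invented for illustration.

```python
import hashlib
import json

GENESIS = "0" * 64  # same genesis value as AuditChain


def chain_head(bodies: list) -> str:
    """Recompute the chain head from serialized records, oldest first."""
    head = GENESIS
    for body in bodies:
        head = hashlib.sha256(f"{head}{body}".encode("utf-8")).hexdigest()
    return head


def verify(bodies: list, expected_head: str) -> bool:
    """True only if the stored bodies reproduce the recorded head."""
    return chain_head(bodies) == expected_head


# Hypothetical records, serialized the same way as AuditChain.append.
records = [
    {"doc_id": "1572-001", "site_id": "site-A", "state": "VALIDATING"},
    {"doc_id": "1572-001", "site_id": "site-A", "state": "ARCHIVED"},
]
bodies = [json.dumps(r, sort_keys=True) for r in records]
head = chain_head(bodies)

# Editing any earlier record changes every later hash, including the head.
tampered = list(bodies)
tampered[0] = tampered[0].replace("VALIDATING", "REJECTED")
```

Here verify(bodies, head) holds, while verify(tampered, head) fails. Reordering the bodies also fails, because each hash depends on the one before it.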

Step 4: The worker — retries, timeouts, and structured logging

Each consumer pulls a work item, enforces concurrency and rate limits, then runs the guarded download → hash → validate → parse → archive flow. Retries use jittered exponential backoff and only fire for transient faults; a schema mismatch is permanent and fails fast. Every network call is inside asyncio.timeout.

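import logging
import random

import httpx  # assumed HTTP client library, matching main() below

logger = logging.getLogger("clinical_ingestion.audit")

# httpx transport errors (connect/read/write failures) do not subclass the
# built-in ConnectionError, so they are listed explicitly.
RETRYABLE = (asyncio.TimeoutError, ConnectionError, httpx.TransportError)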


@dataclass(frozen=True)
class WorkItem:
    site_id: str
    doc_id: str
    url: str
    schema_version: str


class Ingestor:
    def __init__(
        self,
        *,
        client,                      # an async HTTP client (e.g. httpx.AsyncClient)
        pool: ProcessPoolExecutor,
        audit: AuditChain,
        buckets: dict[str, TokenBucket],
        max_concurrency: int = 16,
        download_timeout: float = 30.0,
        max_retries: int = 4,
    ) -> None:
        self._client = client
        self._pool = pool
        self._audit = audit
        self._buckets = buckets
        self._sem = asyncio.Semaphore(max_concurrency)
        self._timeout = download_timeout
        self._max_retries = max_retries
        self._seen: set[tuple[str, str, str]] = set()
        self._seen_lock = asyncio.Lock()

    async def _download(self, item: WorkItem) -> bytes:
        """Network I/O — always guarded by a timeout."""
        async with asyncio.timeout(self._timeout):
            resp = await self._client.get(item.url)
            resp.raise_for_status()
            return resp.content

    async def _audit_event(
        self, item: WorkItem, cid: str, doc_hash: str,
        state: State, worker_id: str, error_code: Optional[str] = None,
    ) -> None:
        head = await self._audit.append(AuditRecord(
            correlation_id=cid, site_id=item.site_id, doc_id=item.doc_id,
            document_hash=doc_hash, state=state, timestamp_utc=_utc_now(),
            schema_version=item.schema_version, worker_id=worker_id,
            error_code=error_code,
        ))
        logger.info(
            "audit",
            extra={"correlation_id": cid, "state": state.value,
                   "error_code": error_code, "chain_head": head},
        )

    async def _process_one(self, item: WorkItem, worker_id: str) -> None:
        loop = asyncio.get_running_loop()
        cid = f"{item.site_id}:{item.doc_id}"
        bucket = self._buckets[item.site_id]

        # Bounded concurrency, then per-site rate limit.
        async with self._sem:
            payload = await self._download_with_retries(item, bucket, cid, worker_id)
            if payload is None:
                return  # already audited as REJECTED

            doc_hash = await hash_payload(loop, payload)

            key = (item.site_id, item.doc_id, doc_hash)
            async with self._seen_lock:
                if key in self._seen:
                    await self._audit_event(item, cid, doc_hash, State.DEDUPED, worker_id)
                    return
                self._seen.add(key)

            await self._audit_event(item, cid, doc_hash, State.VALIDATING, worker_id)
            if not _valid_schema(payload, item.schema_version):
                await self._audit_event(item, cid, doc_hash, State.REJECTED,
                                        worker_id, error_code="SCHEMA_MISMATCH")
                return

            try:
                result = await parse_payload(loop, self._pool, payload)
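            except Exception as exc:  # parse/OCR failure is terminal for this doc
                # Release the idempotency key: the document was never archived,
                # so a later redelivery must not be logged as DEDUPED.
                self._seen.discard(key)
                logger.error("parse failed", extra={"correlation_id": cid,
                                                     "error": str(exc)})
                await self._audit_event(item, cid, doc_hash, State.REJECTED,
                                        worker_id, error_code="PARSER_FAILURE")
                return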

            logger.info("parsed", extra={"correlation_id": cid, **result})
            await self._audit_event(item, cid, doc_hash, State.ARCHIVED, worker_id)

    async def _download_with_retries(
        self, item: WorkItem, bucket: TokenBucket, cid: str, worker_id: str,
    ) -> Optional[bytes]:
        for attempt in range(self._max_retries):
            await bucket.acquire()  # respect the site's rate limit per attempt
            try:
                return await self._download(item)
            except RETRYABLE as exc:
                if attempt == self._max_retries - 1:
                    logger.error("download budget exhausted",
                                 extra={"correlation_id": cid, "error": str(exc)})
                    await self._audit_event(item, cid, "", State.REJECTED,
                                            worker_id, error_code="DOWNLOAD_FAILED")
                    return None
                # Exponential backoff capped at 30 s, plus up to 1 s of random jitter.
                delay = min(2 ** attempt, 30) + random.uniform(0, 1.0)
                logger.warning("transient download failure; retrying",
                               extra={"correlation_id": cid, "attempt": attempt,
                                      "delay": round(delay, 2)})
                await asyncio.sleep(delay)
        return None


def _valid_schema(payload: bytes, schema_version: str) -> bool:
    """Cheap pre-parse gate. Replace with jsonschema/pydantic validation."""
    return schema_version.startswith("v2.") and len(payload) > 0
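
The retry delay in _download_with_retries is a capped exponential term plus up to one second of random jitter. For comparison, the sketch below puts that formula next to a common alternative, often called full jitter, which draws the whole delay uniformly from the capped window. Both function names are hypothetical, and the seed is there only to make the printout repeatable.

```python
import random


def additive_jitter_delay(attempt: int, rng: random.Random, cap: float = 30.0) -> float:
    """Delay used in the worker above: capped exponential plus up to 1 s."""
    return min(2 ** attempt, cap) + rng.uniform(0, 1.0)


def full_jitter_delay(attempt: int, rng: random.Random, cap: float = 30.0) -> float:
    """Alternative: a uniform draw over the whole capped exponential window."""
    return rng.uniform(0, min(2 ** attempt, cap))


if __name__ == "__main__":
    rng = random.Random(7)  # seeded only so the demo is repeatable
    for attempt in range(6):
        a = additive_jitter_delay(attempt, rng)
        f = full_jitter_delay(attempt, rng)
        print(f"attempt {attempt}: additive {a:5.2f}s   full jitter {f:5.2f}s")
```

Additive jitter keeps a guaranteed minimum wait per attempt. Full jitter spreads retries from many workers more widely, at the cost of occasionally retrying almost immediately. Which suits a given portal is a judgment call.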

Step 5: Structured concurrency and graceful shutdown

The producer feeds a bounded queue; consumers drain it. We prefer Python 3.11+ asyncio.TaskGroup because it propagates exceptions and cancels siblings deterministically. A fallback for 3.10 uses asyncio.gather, but note that asyncio.timeout is also 3.11+, so on 3.10 the download guard must use asyncio.wait_for instead. Graceful shutdown installs signal handlers, stops enqueuing new work, lets queued and in-flight items finish, and drains the executor.

import contextlib
import signal
import sys


async def _consumer(
    name: str, queue: "asyncio.Queue[Optional[WorkItem]]", ing: Ingestor,
) -> None:
    while True:
        item = await queue.get()
        try:
            if item is None:  # poison pill: clean stop
                return
            await ing._process_one(item, worker_id=name)
        except asyncio.CancelledError:
            raise
        except Exception:  # one bad doc must never kill the worker
            logger.exception("unhandled error processing item")
        finally:
            queue.task_done()


async def run_pipeline(items: list[WorkItem], ing: Ingestor, n_workers: int = 8) -> None:
    queue: asyncio.Queue[Optional[WorkItem]] = asyncio.Queue(maxsize=n_workers * 2)
    stopping = asyncio.Event()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        with contextlib.suppress(NotImplementedError):  # add_signal_handler is unavailable on Windows
            loop.add_signal_handler(sig, stopping.set)

    async def produce() -> None:
        for item in items:
            if stopping.is_set():
                break
            await queue.put(item)          # blocks when full -> backpressure
        for _ in range(n_workers):         # one poison pill per worker
            await queue.put(None)

    # --- Python 3.11+: structured concurrency ---
    if sys.version_info >= (3, 11):
        async with asyncio.TaskGroup() as tg:
            tg.create_task(produce())
            for i in range(n_workers):
                tg.create_task(_consumer(f"worker-{i:02d}", queue, ing))
        return

    # --- Python 3.10 fallback ---
    tasks = [asyncio.create_task(produce())]
    tasks += [asyncio.create_task(_consumer(f"worker-{i:02d}", queue, ing))
              for i in range(n_workers)]
    try:
        await asyncio.gather(*tasks)
    except Exception:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
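
To see the backpressure in isolation, the toy below strips the pipeline down to a bounded queue, one producer, and a few slow consumers, and records the deepest the queue ever gets. Everything here is a simplified stand-in: demo, produce, and consume are hypothetical, asyncio.sleep replaces download and parse, and asyncio.gather is used only for brevity.

```python
import asyncio


async def demo(n_items: int = 20, maxsize: int = 4, n_workers: int = 2):
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = 0   # deepest the queue ever got
    done = []

    async def produce() -> None:
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)          # suspends here while the queue is full
            peak = max(peak, queue.qsize())
        for _ in range(n_workers):
            await queue.put(None)       # one poison pill per worker

    async def consume() -> None:
        while True:
            item = await queue.get()
            if item is None:
                return
            await asyncio.sleep(0.001)  # stands in for download + parse
            done.append(item)

    await asyncio.gather(produce(), *(consume() for _ in range(n_workers)))
    return peak, sorted(done)


if __name__ == "__main__":
    peak, done = asyncio.run(demo())
    print(f"peak queue depth: {peak}, items processed: {len(done)}")
```

However many items the producer has, the recorded peak never exceeds maxsize, because queue.put suspends the producer until a consumer frees a slot.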

Wiring it together — note the executor and HTTP client are closed deterministically:

async def main(items: list[WorkItem]) -> None:
    import httpx  # async client; never use blocking `requests` in a coroutine

    buckets = {site: TokenBucket(rate=5.0, capacity=10.0)
               for site in {i.site_id for i in items}}
    audit = AuditChain()

    with ProcessPoolExecutor(max_workers=4) as pool:
        async with httpx.AsyncClient(timeout=None) as client:  # we enforce timeouts ourselves
            ing = Ingestor(client=client, pool=pool, audit=audit, buckets=buckets)
            await run_pipeline(items, ing)
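
One caveat for the Python 3.10 fallback path: asyncio.timeout was added in Python 3.11, so _download as written will not run on 3.10. A minimal sketch of an equivalent guard built on asyncio.wait_for follows. call_with_deadline, slow_portal, and fast_portal are hypothetical names, with the two portal coroutines standing in for client.get(...).

```python
import asyncio


async def call_with_deadline(make_request, seconds: float):
    """Await `make_request()` but give up after `seconds`.

    asyncio.wait_for cancels the inner awaitable on expiry and raises
    asyncio.TimeoutError, which the RETRYABLE tuple already catches.
    """
    return await asyncio.wait_for(make_request(), timeout=seconds)


async def slow_portal() -> bytes:  # hypothetical stand-in for client.get(...)
    await asyncio.sleep(1.0)
    return b"%PDF-"


async def fast_portal() -> bytes:
    await asyncio.sleep(0)
    return b"%PDF-"


async def demo() -> tuple:
    ok = await call_with_deadline(fast_portal, 0.5)
    try:
        await call_with_deadline(slow_portal, 0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The fast call returns its bytes and the slow call is cancelled at the deadline, so the retry loop sees the same exception type on either Python version.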

Why each control matters for compliance

| Control | Async primitive | Regulatory purpose |
|---|---|---|
| Global concurrency cap | asyncio.Semaphore | Prevents resource exhaustion that causes silent drops (data loss is an ALCOA+ Complete failure) |
| Backpressure | bounded asyncio.Queue | Producer slows to consumer speed; no unbounded memory growth |
| Per-site rate limit | TokenBucket | Avoids portal bans that would orphan a site’s submissions |
| Timeouts | asyncio.timeout | Bounds every network wait so workers cannot hang indefinitely |
| Jittered retries | backoff + random.uniform | Rides out transient faults without a thundering herd |
| Idempotency | content hash + seen-set | Redelivery never double-files a TMF record (21 CFR Part 11 accuracy) |
| Audit chain | sequential SHA-256 | Tamper-evident trail aligned with 21 CFR Part 11 and EU Annex 11 |
| Off-loop CPU work | run_in_executor + process pool | Keeps the loop responsive so timeouts and rate limits stay accurate |

Operational checklist

  • No blocking call (pypdf, pytesseract, OpenCV, large hashlib, requests) runs directly in a coroutine — all go through run_in_executor.
  • Every network await is inside asyncio.timeout.
  • Queue is bounded (maxsize set) so backpressure is real.
  • One poison pill per worker guarantees clean drain on completion.
  • Signal handlers set a stop event; in-flight items finish before exit.
  • Retries fire only on transient errors; schema/parse failures fail fast.
  • Idempotency key is (site_id, doc_id, content_hash).
  • Audit chain head is persisted to WORM storage and verified after each run.
  • Process pool and HTTP client are closed via with/async with.

Validation before production

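  1. Deterministic replay: Re-ingest a frozen 500-document corpus three times, each in a fresh process so the in-memory seen-set starts empty; confirm identical final states and per-doc audit records that match in every field except timestamp_utc and worker_id. The chain head will differ between runs because timestamps and the interleaving of concurrent workers both feed the hash.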
  2. Backpressure test: Shrink max_concurrency and queue maxsize; confirm the producer blocks on queue.put and memory stays flat under a flood of large packets.
  3. Rate-limit test: Point many work items at one site; confirm request spacing matches the token-bucket rate and no portal 429 responses occur.
  4. Shutdown drill: Send SIGTERM mid-run; confirm in-flight downloads finish, no task_done mismatch warnings appear, and the executor exits cleanly.
  5. Audit reconstruction: Hand regulatory affairs only the audit records from WORM storage and the final chain head; confirm they can recompute the head and reproduce the full ingestion timeline, keyed by correlation ID, without the application database.
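
For the deterministic replay check, a direct comparison of audit logs will fail on fields that legitimately vary between runs. The sketch below is one possible comparison helper. It assumes records are available as plain dicts (for example via dataclasses.asdict) and that timestamp_utc and worker_id are the only run-specific fields. stable_view, same_outcome, and the sample data are hypothetical.

```python
VOLATILE = {"timestamp_utc", "worker_id"}  # assumed to differ between runs


def stable_view(records: list) -> dict:
    """Group audit records by document, dropping run-specific fields.

    Order is preserved within a document but ignored across documents,
    because concurrent workers interleave differently on every run.
    """
    view: dict = {}
    for rec in records:
        key = (rec["site_id"], rec["doc_id"])
        kept = tuple(sorted((k, v) for k, v in rec.items() if k not in VOLATILE))
        view.setdefault(key, []).append(kept)
    return view


def same_outcome(run_a: list, run_b: list) -> bool:
    """True if both runs took every document through the same states."""
    return stable_view(run_a) == stable_view(run_b)


def rec(site: str, doc: str, state: str, ts: str, worker: str) -> dict:
    """Build a hypothetical audit record as a plain dict."""
    return {"site_id": site, "doc_id": doc, "state": state,
            "timestamp_utc": ts, "worker_id": worker}


# Two invented runs: same outcomes, different timing and interleaving.
run_1 = [rec("A", "d1", "VALIDATING", "t1", "worker-00"),
         rec("B", "d2", "VALIDATING", "t2", "worker-01"),
         rec("A", "d1", "ARCHIVED", "t3", "worker-00"),
         rec("B", "d2", "ARCHIVED", "t4", "worker-01")]
run_2 = [rec("B", "d2", "VALIDATING", "t5", "worker-00"),
         rec("B", "d2", "ARCHIVED", "t6", "worker-00"),
         rec("A", "d1", "VALIDATING", "t7", "worker-01"),
         rec("A", "d1", "ARCHIVED", "t8", "worker-01")]
```

With this view, same_outcome(run_1, run_2) holds despite the different ordering, and it fails as soon as any document ends in a different state.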

FAQ

Why not just use asyncio.gather over all sites at once?

gather with no concurrency cap launches every coroutine immediately. With hundreds of sites you exhaust file descriptors and memory, and you have no backpressure — large scanned packets all decompress at once. The Semaphore + bounded Queue pattern gives you steady, predictable throughput and a clean place to apply rate limits and timeouts.
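
If you do want to keep gather for a small, fixed batch, the minimum fix is to put a Semaphore in front of each coroutine. The sketch below shows that pattern with a counter that tracks how many calls are in flight. bounded_gather and fake_site are hypothetical names, and asyncio.sleep stands in for a download.

```python
import asyncio


async def bounded_gather(coro_fns, limit: int):
    """Run zero-argument coroutine functions with at most `limit` in flight."""
    sem = asyncio.Semaphore(limit)

    async def run(fn):
        async with sem:
            return await fn()

    return await asyncio.gather(*(run(fn) for fn in coro_fns))


async def demo(n: int = 25, limit: int = 5):
    in_flight = 0
    peak = 0

    async def fake_site(i: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.001)  # stands in for a download
        in_flight -= 1
        return i

    fns = [lambda i=i: fake_site(i) for i in range(n)]
    results = await bounded_gather(fns, limit)
    return peak, results


if __name__ == "__main__":
    peak, results = asyncio.run(demo())
    print(f"peak in flight: {peak}, results: {len(results)}")
```

This caps simultaneous work, but every task is still created up front and nothing slows the caller down, which is why the queue-based design remains the better fit for large or open-ended batches.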

Do I need a ProcessPoolExecutor, or will threads do?

For genuinely CPU-bound work — pypdf text extraction on large files, pytesseract OCR, OpenCV preprocessing — a process pool sidesteps the GIL and gives real parallelism. For blocking but light I/O (or hashing, where hashlib releases the GIL), the default thread pool is sufficient. Mixing both, as shown, is common.

How does this relate to the schema-validation and fallback work?

This worker stops at a cheap pre-parse gate; rigorous validation and error taxonomy belong in Categorizing validation errors in regulatory document pipelines. When a site portal is unreachable rather than slow, route through the strategy in Configuring fallback routing when clinical portals timeout. For the bigger picture of how ingestion fits the submission lifecycle, see the Async Batch Processing for Site Packets cluster and the Automated Document Ingestion & Validation Workflows pillar.