Automating Checklist Synchronization Between EDC and CTMS
Site-activation checklists live in both your EDC and your CTMS, and they drift apart the moment a coordinator updates one without the other. This guide builds a production-ready Python synchronization engine that reconciles checklist state across both systems with explicit source-of-truth precedence, conflict detection, idempotent upserts, resilient API clients, and a tamper-evident 21 CFR Part 11 audit trail.
The pattern below is the deep, code-first companion to Checklist Sync & Gap Analysis, which sits under the Automated Document Ingestion & Validation Workflows pillar. If your sync runs against portals that intermittently fail, pair it with configuring fallback routing when clinical portals timeout; for the bulk-ingestion side of the same problem, see handling async batch processing for multi-site document ingestion.
Why EDC and CTMS checklists diverge
An EDC (Electronic Data Capture) system records the operational truth of what a site has actually completed — regulatory documents uploaded, training acknowledged, lab certifications attached. A CTMS (Clinical Trial Management System) tracks the milestone view used by study managers and sponsors for site-activation reporting. The same logical fact — “site 1042 has a fully executed FDA Form 1572” — exists as a checklist item in both systems, with different field names, different enumerations, and independent edit histories.
Divergence is structural, not exceptional:
- Independent writes. A CRA marks an item complete in the CTMS during a monitoring visit while the underlying EDC record is still
in_review. - Taxonomy skew. The EDC exposes
status: SUBMITTED; the CTMS expectsstate: "Pending QC". A mapping layer must normalize both into one canonical vocabulary. - Clock skew and ordering. Updates carry timestamps from different servers. You cannot trust wall-clock order alone to decide which write is newer.
- Partial failures. A batch upsert commits some items, then hits a rate limit, leaving the two systems in a state that is neither fully synced nor cleanly rolled back.
Treating this as a naive two-way copy produces flapping records and silent data loss. The correct framing is reconciliation against a declared source of truth, with every divergence either resolved deterministically or quarantined as a conflict for a human to adjudicate.
Reconciliation model and source-of-truth precedence
Define one canonical checklist record, then compute the desired state from both systems according to an explicit precedence policy. A field-level precedence map is more honest than a blanket “EDC always wins,” because operational reality is mixed: completion evidence is authoritative in the EDC, but milestone sign-off and projected dates are owned by study managers in the CTMS.
For each item we classify the pair of records into one of four reconciliation outcomes:
| Outcome | Condition | Action |
|---|---|---|
IN_SYNC |
Canonical hashes match | No write |
EDC_AUTHORITATIVE |
Differ, EDC owns the changed field | Upsert into CTMS |
CTMS_AUTHORITATIVE |
Differ, CTMS owns the changed field | Upsert into EDC |
CONFLICT |
Both sides changed an owned field since last sync | Quarantine, no write |
Conflict detection needs a baseline. We store the canonical hash from the last successful sync per item; a side “changed” if its current hash differs from that baseline. If both sides changed fields they each own, no precedence rule can safely auto-resolve it — that is a genuine conflict and must be surfaced, never silently overwritten.
The reconciliation decision is deterministic given three inputs — the EDC record, the CTMS record, and the stored baseline:
where each is a canonical SHA-256 digest over the normalized, precedence-relevant fields. Hashing a normalized projection (sorted keys, compact separators, only owned fields) eliminates false positives from whitespace, key ordering, or fields neither system controls.
flowchart TD
A[Fetch EDC item] --> C[Normalize to canonical]
B[Fetch CTMS item] --> C
C --> D[Compute edc hash and ctms hash]
D --> E[Load baseline hash]
E --> F{Hashes equal}
F -->|edc equals ctms| G[In sync, record baseline]
F -->|differ| H{Which side changed vs baseline}
H -->|EDC only| I[EDC authoritative]
H -->|CTMS only| J[CTMS authoritative]
H -->|both changed| K[Conflict, quarantine]
I --> L[Idempotent upsert to CTMS]
J --> M[Idempotent upsert to EDC]
L --> N[Append audit entry]
M --> N
K --> N
G --> N
Canonical models with pydantic
We model the canonical item and the reconciliation result with pydantic, which gives us validation, coercion, and a single place to enforce required regulatory fields. The FieldOwner map encodes precedence per field.
from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field, field_validator
class System(str, Enum):
EDC = "edc"
CTMS = "ctms"
class ItemStatus(str, Enum):
"""Canonical status vocabulary that both systems map into."""
NOT_STARTED = "not_started"
IN_REVIEW = "in_review"
COMPLETE = "complete"
REJECTED = "rejected"
# Field-level source of truth. Completion evidence is owned by the EDC;
# milestone sign-off and planned dates are owned by study managers in the CTMS.
FIELD_OWNER: dict[str, System] = {
"status": System.EDC,
"evidence_doc_id": System.EDC,
"milestone_signed_off": System.CTMS,
"planned_activation_date": System.CTMS,
}
class CanonicalItem(BaseModel):
"""One checklist item expressed in the canonical vocabulary."""
site_id: str
item_code: str
checklist_version: str
status: ItemStatus
evidence_doc_id: Optional[str] = None
milestone_signed_off: bool = False
planned_activation_date: Optional[str] = None # ISO-8601 date
source_updated_utc: datetime
operator_id: str = Field(..., min_length=1)
@field_validator("source_updated_utc")
@classmethod
def _require_tz(cls, value: datetime) -> datetime:
"""Reject naive timestamps; ALCOA+ requires unambiguous time."""
if value.tzinfo is None:
raise ValueError("source_updated_utc must be timezone-aware")
return value.astimezone(timezone.utc)
def owned_projection(self, owner: System) -> dict[str, object]:
"""The subset of fields a given system is the source of truth for."""
return {
name: getattr(self, name)
for name, field_owner in FIELD_OWNER.items()
if field_owner is owner
}
def canonical_hash(self) -> str:
"""Stable digest over all precedence-relevant fields."""
relevant = {name: getattr(self, name) for name in FIELD_OWNER}
payload = json.dumps(
relevant, sort_keys=True, separators=(",", ":"), default=str
)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
class Decision(str, Enum):
IN_SYNC = "in_sync"
EDC_AUTHORITATIVE = "edc_authoritative"
CTMS_AUTHORITATIVE = "ctms_authoritative"
CONFLICT = "conflict"
class Reconciliation(BaseModel):
site_id: str
item_code: str
decision: Decision
target: Optional[System] = None
desired: Optional[CanonicalItem] = None
reason: str = ""
The reconciliation function
Reconciliation is pure: given two canonical items and the stored baseline hash, it returns a Reconciliation with no side effects. Keeping it pure makes it trivially unit-testable and removes any temptation to mutate state mid-decision.
def reconcile(
edc: CanonicalItem,
ctms: CanonicalItem,
baseline_hash: Optional[str],
) -> Reconciliation:
"""Decide how to reconcile one item using field-level precedence.
A side is considered "changed" when its current hash differs from the
baseline captured at the last successful sync. When both sides changed
fields they each own, no precedence rule can resolve it safely.
"""
edc_hash = edc.canonical_hash()
ctms_hash = ctms.canonical_hash()
if edc_hash == ctms_hash:
return Reconciliation(
site_id=edc.site_id,
item_code=edc.item_code,
decision=Decision.IN_SYNC,
)
edc_changed = baseline_hash is None or edc_hash != baseline_hash
ctms_changed = baseline_hash is None or ctms_hash != baseline_hash
# Did the divergence touch EDC-owned fields, CTMS-owned fields, or both?
edc_owns_diff = edc.owned_projection(System.EDC) != ctms.owned_projection(System.EDC)
ctms_owns_diff = edc.owned_projection(System.CTMS) != ctms.owned_projection(System.CTMS)
if edc_owns_diff and ctms_owns_diff and edc_changed and ctms_changed:
return Reconciliation(
site_id=edc.site_id,
item_code=edc.item_code,
decision=Decision.CONFLICT,
reason="Both systems changed owned fields since last sync",
)
# Build the desired canonical record: take each field from its owner.
desired = edc.model_copy(
update={
"milestone_signed_off": ctms.milestone_signed_off,
"planned_activation_date": ctms.planned_activation_date,
}
)
if edc_owns_diff:
return Reconciliation(
site_id=edc.site_id,
item_code=edc.item_code,
decision=Decision.EDC_AUTHORITATIVE,
target=System.CTMS,
desired=desired,
reason="EDC-owned fields changed",
)
return Reconciliation(
site_id=edc.site_id,
item_code=edc.item_code,
decision=Decision.CTMS_AUTHORITATIVE,
target=System.EDC,
desired=desired,
reason="CTMS-owned fields changed",
)
A resilient async API client
EDC and CTMS APIs are remote, paginated, and rate-limited. A correct client must: read credentials from the environment (never hardcode them), retry transient failures with exponential backoff and jitter, respect server Retry-After headers on 429, paginate fully, and cap concurrency with a semaphore so one batch never stampedes the server. We use aiohttp and tenacity.
import asyncio
import os
import random
from collections.abc import AsyncIterator, Callable
from typing import Any
import aiohttp
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
class RetryableHTTPError(Exception):
"""Raised for status codes that warrant a retry (429 / 5xx)."""
class ClinicalApiClient:
"""Async client for an EDC or CTMS REST API.
Secrets are read from the environment. Set, for the EDC:
EDC_BASE_URL, EDC_API_TOKEN
and for the CTMS:
CTMS_BASE_URL, CTMS_API_TOKEN
"""
def __init__(self, system: System, *, max_concurrency: int = 8) -> None:
prefix = "EDC" if system is System.EDC else "CTMS"
try:
self._base_url = os.environ[f"{prefix}_BASE_URL"].rstrip("/")
self._token = os.environ[f"{prefix}_API_TOKEN"]
except KeyError as exc:
raise RuntimeError(f"Missing required environment variable: {exc}") from exc
self.system = system
self._semaphore = asyncio.Semaphore(max_concurrency)
self._session: aiohttp.ClientSession | None = None
async def __aenter__(self) -> "ClinicalApiClient":
self._session = aiohttp.ClientSession(
base_url=self._base_url,
headers={
"Authorization": f"Bearer {self._token}",
"Accept": "application/json",
},
timeout=aiohttp.ClientTimeout(total=30),
connector=aiohttp.TCPConnector(limit=100, limit_per_host=10),
)
return self
async def __aexit__(self, *exc: object) -> None:
if self._session is not None:
await self._session.close()
@retry(
retry=retry_if_exception_type((RetryableHTTPError, aiohttp.ClientConnectionError)),
wait=wait_exponential_jitter(initial=0.5, max=20.0),
stop=stop_after_attempt(5),
reraise=True,
)
async def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
assert self._session is not None, "client must be used as an async context manager"
async with self._semaphore:
async with self._session.request(method, path, **kwargs) as resp:
if resp.status == 429:
delay = float(resp.headers.get("Retry-After", "1"))
await asyncio.sleep(delay + random.uniform(0, 0.5))
raise RetryableHTTPError("rate limited")
if resp.status >= 500:
raise RetryableHTTPError(f"server error {resp.status}")
resp.raise_for_status() # raises ClientResponseError for 4xx
return await resp.json()
async def iter_checklist(self, study_id: str) -> AsyncIterator[dict[str, Any]]:
"""Yield raw checklist items across all pages (cursor pagination)."""
cursor: str | None = None
while True:
params: dict[str, str] = {"study_id": study_id, "limit": "200"}
if cursor:
params["cursor"] = cursor
page = await self._request("GET", "/v1/checklist-items", params=params)
for item in page.get("items", []):
yield item
cursor = page.get("next_cursor")
if not cursor:
return
async def upsert_item(
self, payload: dict[str, Any], idempotency_key: str
) -> dict[str, Any]:
"""Idempotent create-or-update keyed by a deterministic hash."""
return await self._request(
"PUT",
f"/v1/checklist-items/{payload['item_code']}",
json=payload,
headers={"Idempotency-Key": idempotency_key},
)
PUT to a deterministic resource path (/checklist-items/{item_code}) is naturally idempotent, and the Idempotency-Key header lets the server collapse retried writes that carry the same canonical payload. The key is derived from the payload hash, so an accidental double-send never produces a second audit event or a duplicate milestone.
Mapping each system into the canonical model
Each system gets a small adapter that translates its native shape and enums into a CanonicalItem. Centralizing the vocabulary here is what defeats taxonomy skew — the reconciliation logic only ever sees canonical values.
from datetime import date
_EDC_STATUS = {
"DRAFT": ItemStatus.NOT_STARTED,
"SUBMITTED": ItemStatus.IN_REVIEW,
"APPROVED": ItemStatus.COMPLETE,
"RETURNED": ItemStatus.REJECTED,
}
_CTMS_STATUS = {
"Pending": ItemStatus.NOT_STARTED,
"Pending QC": ItemStatus.IN_REVIEW,
"Verified": ItemStatus.COMPLETE,
"Rework": ItemStatus.REJECTED,
}
def from_edc(raw: dict[str, Any]) -> CanonicalItem:
return CanonicalItem(
site_id=str(raw["siteId"]),
item_code=str(raw["code"]),
checklist_version=str(raw["version"]),
status=_EDC_STATUS[raw["status"]],
evidence_doc_id=raw.get("documentId"),
source_updated_utc=datetime.fromisoformat(raw["updatedAt"]),
operator_id=str(raw["lastEditedBy"]),
)
def from_ctms(raw: dict[str, Any]) -> CanonicalItem:
planned = raw.get("plannedActivation")
return CanonicalItem(
site_id=str(raw["site"]),
item_code=str(raw["taskCode"]),
checklist_version=str(raw["formVersion"]),
status=_CTMS_STATUS[raw["state"]],
milestone_signed_off=bool(raw.get("signedOff", False)),
planned_activation_date=date.fromisoformat(planned).isoformat() if planned else None,
source_updated_utc=datetime.fromisoformat(raw["modifiedUtc"]),
operator_id=str(raw["modifiedBy"]),
)
A missing or unexpected status raises a KeyError here rather than silently defaulting — an unknown enum is a mapping defect that belongs in categorizing validation errors in regulatory document pipelines, not a value to guess at.
Tamper-evident audit trail (21 CFR Part 11 / ALCOA+)
Every sync decision — including no-ops and conflicts — produces an audit entry. To make the trail tamper-evident we hash-chain entries: each entry includes the hash of the previous one, so any retroactive edit breaks the chain from that point forward. This satisfies the 21 CFR Part 11 expectation of a secure, computer-generated, time-stamped audit trail and the ALCOA+ attributes (Attributable, Legible, Contemporaneous, Original, Accurate, Complete, Consistent, Enduring, Available).
class AuditEntry(BaseModel):
sequence: int
correlation_id: str
site_id: str
item_code: str
decision: Decision
target: Optional[System]
payload_hash: str
operator_id: str
recorded_utc: datetime
previous_hash: str
entry_hash: str = ""
def compute_hash(self) -> str:
body = self.model_dump(exclude={"entry_hash"}, mode="json")
canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
class AuditLedger:
"""Append-only, hash-chained ledger. Concurrency-safe via a single lock."""
GENESIS = "0" * 64
def __init__(self) -> None:
self._tip = self.GENESIS
self._sequence = 0
self._lock = asyncio.Lock()
self._entries: list[AuditEntry] = []
async def append(
self, rec: Reconciliation, correlation_id: str, payload_hash: str, operator_id: str
) -> AuditEntry:
async with self._lock:
self._sequence += 1
entry = AuditEntry(
sequence=self._sequence,
correlation_id=correlation_id,
site_id=rec.site_id,
item_code=rec.item_code,
decision=rec.decision,
target=rec.target,
payload_hash=payload_hash,
operator_id=operator_id,
recorded_utc=datetime.now(timezone.utc),
previous_hash=self._tip,
)
sealed = entry.model_copy(update={"entry_hash": entry.compute_hash()})
self._tip = sealed.entry_hash
self._entries.append(sealed)
return sealed
def verify(self) -> bool:
"""Re-walk the chain; returns False if any entry was altered."""
prev = self.GENESIS
for entry in self._entries:
if entry.previous_hash != prev:
return False
if entry.compute_hash() != entry.entry_hash:
return False
prev = entry.entry_hash
return True
In production the ledger is not an in-memory list. Persist sealed entries to write-once storage — an append-only table with row-level security, or object storage with an immutability lock — and run verify() as a scheduled integrity check. The in-memory version above is the reference implementation you unit-test against.
Orchestrating a full sync run
The orchestrator pairs items by (site_id, item_code), reconciles each, performs idempotent upserts for authoritative decisions, quarantines conflicts, and records every outcome. Concurrency is bounded by the client semaphores; asyncio.gather collects results without leaking exceptions because each task wraps its own error handling.
import logging
logger = logging.getLogger("edc_ctms_sync")
def _idempotency_key(item: CanonicalItem) -> str:
return f"{item.site_id}:{item.item_code}:{item.canonical_hash()}"
async def sync_study(
study_id: str,
correlation_id: str,
ledger: AuditLedger,
baselines: dict[tuple[str, str], str],
) -> list[AuditEntry]:
"""Reconcile one study's checklist between EDC and CTMS.
``baselines`` maps (site_id, item_code) to the canonical hash recorded at
the previous successful sync; it is the basis for conflict detection.
"""
async with ClinicalApiClient(System.EDC) as edc, ClinicalApiClient(System.CTMS) as ctms:
edc_items = {
(i.site_id, i.item_code): i
async for i in _mapped(edc.iter_checklist(study_id), from_edc)
}
ctms_items = {
(i.site_id, i.item_code): i
async for i in _mapped(ctms.iter_checklist(study_id), from_ctms)
}
results: list[AuditEntry] = []
for key in edc_items.keys() & ctms_items.keys():
edc_item, ctms_item = edc_items[key], ctms_items[key]
rec = reconcile(edc_item, ctms_item, baselines.get(key))
try:
if rec.decision in (Decision.EDC_AUTHORITATIVE, Decision.CTMS_AUTHORITATIVE):
assert rec.desired is not None and rec.target is not None
target = ctms if rec.target is System.CTMS else edc
payload = rec.desired.model_dump(mode="json")
await target.upsert_item(payload, _idempotency_key(rec.desired))
baselines[key] = rec.desired.canonical_hash()
elif rec.decision == Decision.CONFLICT:
logger.warning(
"CONFLICT site=%s item=%s corr=%s", rec.site_id, rec.item_code, correlation_id
)
else: # IN_SYNC
baselines[key] = edc_item.canonical_hash()
except aiohttp.ClientResponseError as exc:
logger.error(
"UPSERT_FAILED site=%s item=%s status=%s", rec.site_id, rec.item_code, exc.status
)
rec = rec.model_copy(update={"decision": Decision.CONFLICT, "reason": "upsert failed"})
payload_hash = rec.desired.canonical_hash() if rec.desired else edc_item.canonical_hash()
entry = await ledger.append(rec, correlation_id, payload_hash, edc_item.operator_id)
results.append(entry)
return results
async def _mapped(
source: AsyncIterator[dict[str, Any]], mapper: "Callable[[dict[str, Any]], CanonicalItem]"
) -> AsyncIterator[CanonicalItem]:
async for raw in source:
yield mapper(raw)
Items present in only one system (a key in the symmetric difference rather than the intersection) are not silently dropped — extend the loop to emit a CONFLICT-style audit entry for them so gap analysis can flag the asymmetry. That cross-system completeness check is exactly the work covered in the parent Checklist Sync & Gap Analysis cluster.
Operational checklist
Before promoting this engine to a regulated environment:
- Credentials for both systems are injected from a secrets manager into
EDC_API_TOKEN/CTMS_API_TOKEN, never committed. - The
FIELD_OWNERprecedence map is signed off by clinical operations and reflects who actually owns each data point. - Conflicts route to a human-review queue with SLA, not an auto-resolve fallback.
- Audit entries persist to write-once storage and
verify()runs on a schedule. - Baselines are stored durably so conflict detection survives restarts.
- Rate limits and
Retry-Afterhandling are validated against each vendor’s documented quotas. - All timestamps are timezone-aware UTC; naive datetimes are rejected at the model boundary.
FAQ
Should the EDC or the CTMS be the source of truth?
Neither, globally. Use field-level precedence: completion evidence and document status are authoritative in the EDC, while milestone sign-off and planned dates are owned by study managers in the CTMS. A blanket “one system wins” policy inevitably overwrites legitimate edits in the other.
How does the engine avoid duplicate writes on retry?
Upserts use PUT against a deterministic resource path plus an Idempotency-Key derived from the canonical payload hash. A retried request carries the identical key, so the server collapses it to a single effect and the audit ledger records one event, not two.
What makes the audit trail tamper-evident rather than just append-only?
Each entry embeds the hash of the previous entry, forming a chain. Editing any historical field changes that entry’s hash and breaks every link after it, which verify() detects. Combined with write-once storage, this meets the 21 CFR Part 11 requirement for a secure, time-stamped, computer-generated audit trail.
How are genuine conflicts handled?
When both systems have changed fields they each own since the last successful sync, the reconciler returns CONFLICT and writes nothing. The item is quarantined for human adjudication and logged in the audit ledger, preserving ALCOA+ accuracy and completeness instead of guessing.
Does this scale to many sites and large checklists?
Yes. The API client paginates with cursors, bounds concurrency with a semaphore, and streams items through async iterators rather than buffering whole studies. For very large multi-site onboarding waves, drive the orchestrator from the batch patterns in handling async batch processing for multi-site document ingestion.