CTA / backend /consent_agent.py
TheQuantEd's picture
Initial deployment: ClinicalMatch AI v2.0 — FHIR R4 · MCP (9 tools) · A2A workflow · SHARP compliance · 100k synthetic patients · Neo4j graph · GraphRAG chatbot
59abb4f
"""
Consent & Scheduling Agent — A2A sub-agent that handles post-recruitment consent
workflow and appointment scheduling. Triggered as a handoff from the Recruitment Agent.
A2A task message format follows the Google A2A spec:
{"task_id": str, "type": "CONSENT_REQUEST" | "SCHEDULE_REQUEST", "payload": {...}}
"""
import logging
import uuid
from datetime import datetime, timedelta
from typing import Optional

from llm_client import chat
# In-memory consent + scheduling stores (production: Neo4j or Redis).
# Both are plain module-level dicts, so their contents exist only for the
# lifetime of this process.
# Maps consent_id -> consent record dict (written by initiate_consent).
_consent_records: dict[str, dict] = {}
# Maps appointment_id -> appointment dict (written by schedule_screening).
_schedule_records: dict[str, dict] = {}
# ── Consent status values ──────────────────────────────────────────────────────
# Values stored in a consent record's "status" field.
# NOTE(review): PENDING and EXPIRED are not assigned anywhere in the code shown
# here; initiate_consent creates records directly as SENT, and nothing in view
# compares a record's "expires_at" against the clock. Confirm whether expiry is
# enforced elsewhere.
CONSENT_PENDING = "PENDING"
CONSENT_SENT = "SENT"
CONSENT_SIGNED = "SIGNED"
CONSENT_DECLINED = "DECLINED"
CONSENT_EXPIRED = "EXPIRED"
# ── A2A task receiver ──────────────────────────────────────────────────────────
def receive_a2a_task(task: dict) -> dict:
    """Route an incoming A2A task to the consent or scheduling flow.

    Args:
        task: A2A message of the form
            ``{"task_id": str, "type": "CONSENT_REQUEST" | "SCHEDULE_REQUEST",
            "payload": {...}}``. Both flows require ``patient_id`` and
            ``nct_id`` in the payload; the remaining payload keys are optional.

    Returns:
        The result dict of ``initiate_consent`` or ``schedule_screening``, or
        an error dict:

        * ``{"error": "UNKNOWN_TASK_TYPE", "task_id": ..., "received_type": ...}``
          when ``type`` is not one of the two supported values.
        * ``{"error": "MISSING_REQUIRED_FIELD", "task_id": ...,
          "missing_fields": [...]}`` when the payload is absent, is not a
          dict, or lacks ``patient_id`` / ``nct_id``.
    """
    task_type = task.get("type", "")
    # Fall back to a fresh id when the key is absent *or* falsy, so error
    # responses always carry a usable correlation id.
    task_id = task.get("task_id") or str(uuid.uuid4())

    if task_type not in ("CONSENT_REQUEST", "SCHEDULE_REQUEST"):
        return {"error": "UNKNOWN_TASK_TYPE", "task_id": task_id, "received_type": task_type}

    # A null or non-dict payload is treated as empty so that malformed
    # messages produce a structured error instead of a TypeError/KeyError.
    payload = task.get("payload")
    if not isinstance(payload, dict):
        payload = {}

    missing = [name for name in ("patient_id", "nct_id") if name not in payload]
    if missing:
        return {
            "error": "MISSING_REQUIRED_FIELD",
            "task_id": task_id,
            "missing_fields": missing,
        }

    if task_type == "CONSENT_REQUEST":
        return initiate_consent(
            patient_id=payload["patient_id"],
            nct_id=payload["nct_id"],
            trial_title=payload.get("trial_title", ""),
            match_score=payload.get("match_score", 0.0),
            task_id=task_id,
        )

    # Only SCHEDULE_REQUEST can reach this point.
    return schedule_screening(
        patient_id=payload["patient_id"],
        nct_id=payload["nct_id"],
        site_city=payload.get("site_city", ""),
        site_state=payload.get("site_state", ""),
        task_id=task_id,
    )
# ── Consent flow ───────────────────────────────────────────────────────────────
def initiate_consent(
    patient_id: str,
    nct_id: str,
    trial_title: str,
    match_score: float = 0.0,
    task_id: str | None = None,
) -> dict:
    """Create a consent record, generate its document, and store it.

    The record is stored in ``_consent_records`` with status ``CONSENT_SENT``
    under ``task_id`` (or a new UUID when ``task_id`` is falsy). Reusing an
    existing id replaces the record already stored under it.

    Args:
        patient_id: Identifier of the patient being asked for consent.
        nct_id: Trial identifier.
        trial_title: Human-readable trial title used in the document.
        match_score: Recruitment match score, stored on the record as-is.
        task_id: Optional id to use as the consent id.

    Returns:
        ``{"consent_id": ..., "status": CONSENT_SENT, "expires_at": ...}``.
        The full record, including the document text, is available through
        ``get_consent_record``.
    """
    record_id = task_id or str(uuid.uuid4())

    # Read the clock once so that expires_at is exactly created_at + 30 days.
    # Document generation below may take a while; two separate reads would
    # leave the two timestamps skewed by that duration.
    created = datetime.utcnow()
    created_at = created.isoformat()
    expires_at = (created + timedelta(days=30)).isoformat()

    consent_doc = _generate_consent_document(patient_id, nct_id, trial_title)

    record = {
        "consent_id": record_id,
        "patient_id": patient_id,
        "nct_id": nct_id,
        "trial_title": trial_title,
        "match_score": match_score,
        "status": CONSENT_SENT,
        "consent_document": consent_doc,
        "created_at": created_at,
        "expires_at": expires_at,
        "signed_at": None,
        "a2a_source": "recruitment_agent",
    }
    _consent_records[record_id] = record
    return {"consent_id": record_id, "status": CONSENT_SENT, "expires_at": expires_at}
def update_consent_status(consent_id: str, status: str, notes: str = "") -> dict:
    """Set the status of a stored consent record.

    When the record moves *into* ``CONSENT_SIGNED`` its ``signed_at`` is
    stamped and the scheduling handoff is triggered. A repeated SIGNED update
    on an already-signed record (for example to attach notes) leaves
    ``signed_at`` and the existing appointment untouched; re-running the
    handoff would replace the ``sched_<consent_id>`` appointment and discard
    any confirmation recorded on it.

    Args:
        consent_id: Id of the consent record to update.
        status: New status value; stored as given, without validation.
        notes: Optional free text; stored under ``"notes"`` when non-empty.

    Returns:
        The updated record (the stored dict itself, not a copy), or
        ``{"error": "CONSENT_NOT_FOUND", "consent_id": ...}`` when no record
        exists for ``consent_id``.
    """
    record = _consent_records.get(consent_id)
    if not record:
        return {"error": "CONSENT_NOT_FOUND", "consent_id": consent_id}

    # Decide before overwriting the status whether this is a real transition.
    newly_signed = status == CONSENT_SIGNED and record["status"] != CONSENT_SIGNED

    record["status"] = status
    if notes:
        record["notes"] = notes

    if newly_signed:
        record["signed_at"] = datetime.utcnow().isoformat()
        # Consent signed: auto-trigger the scheduling handoff.
        _trigger_scheduling_handoff(record)
    return record
def get_consent_record(consent_id: str) -> dict | None:
    """Return the stored consent record for ``consent_id``, or ``None`` if absent."""
    try:
        return _consent_records[consent_id]
    except KeyError:
        return None
def list_consent_records(patient_id: str | None = None) -> list[dict]:
    """Return consent records, newest ``created_at`` first.

    Args:
        patient_id: When truthy, only records for this patient are returned;
            ``None`` or an empty string returns every record.
    """
    selected = [
        rec
        for rec in _consent_records.values()
        if not patient_id or rec["patient_id"] == patient_id
    ]
    selected.sort(key=lambda rec: rec["created_at"], reverse=True)
    return selected
# ── Scheduling flow ────────────────────────────────────────────────────────────
def schedule_screening(
    patient_id: str,
    nct_id: str,
    site_city: str = "",
    site_state: str = "",
    task_id: str | None = None,
) -> dict:
    """Store a proposed screening appointment and return a short summary.

    The appointment is saved in ``_schedule_records`` under ``task_id`` (or a
    new UUID when ``task_id`` is falsy) with status ``"PROPOSED"``.

    Args:
        patient_id: Identifier of the patient to be screened.
        nct_id: Trial identifier.
        site_city: Optional site city, stored as given.
        site_state: Optional site state, stored as given.
        task_id: Optional id to use as the appointment id.

    Returns:
        ``{"appointment_id": ..., "proposed_datetime": ..., "status": "PROPOSED"}``.
    """
    appointment_id = task_id if task_id else str(uuid.uuid4())

    # The slot comes from _next_business_day(): 10:00 on a weekday at least
    # three days out.
    slot = _next_business_day()
    created = datetime.utcnow().isoformat()

    _schedule_records[appointment_id] = {
        "appointment_id": appointment_id,
        "patient_id": patient_id,
        "nct_id": nct_id,
        "site_city": site_city,
        "site_state": site_state,
        "proposed_datetime": slot,
        "status": "PROPOSED",
        "created_at": created,
        "a2a_source": "consent_agent",
    }

    summary = {
        "appointment_id": appointment_id,
        "proposed_datetime": slot,
        "status": "PROPOSED",
    }
    return summary
def confirm_appointment(appt_id: str) -> dict:
    """Mark a stored appointment as confirmed.

    Returns:
        The updated appointment (the stored dict itself) with status
        ``"CONFIRMED"`` and a ``confirmed_at`` timestamp, or
        ``{"error": "APPOINTMENT_NOT_FOUND"}`` when ``appt_id`` is unknown.
    """
    booking = _schedule_records.get(appt_id)
    if booking:
        booking["status"] = "CONFIRMED"
        booking["confirmed_at"] = datetime.utcnow().isoformat()
        return booking
    return {"error": "APPOINTMENT_NOT_FOUND"}
def list_appointments(patient_id: str | None = None) -> list[dict]:
    """Return stored appointments, newest ``created_at`` first.

    Args:
        patient_id: When truthy, only appointments for this patient are
            returned; ``None`` or an empty string returns every appointment.
    """
    selected = [
        appt
        for appt in _schedule_records.values()
        if not patient_id or appt["patient_id"] == patient_id
    ]
    selected.sort(key=lambda appt: appt["created_at"], reverse=True)
    return selected
# ── Helpers ────────────────────────────────────────────────────────────────────
def _trigger_scheduling_handoff(consent_record: dict):
    """Auto-schedule a screening after consent is signed — A2A internal handoff.

    The appointment id is derived from the consent id (``sched_<consent_id>``).
    No site city/state is passed, so the appointment is stored with the
    defaults for those fields. The scheduling result is discarded.
    """
    patient = consent_record["patient_id"]
    trial = consent_record["nct_id"]
    handoff_id = f"sched_{consent_record['consent_id']}"
    schedule_screening(patient_id=patient, nct_id=trial, task_id=handoff_id)
def _next_business_day() -> str:
dt = datetime.utcnow() + timedelta(days=3)
while dt.weekday() >= 5: # skip Sat/Sun
dt += timedelta(days=1)
return dt.replace(hour=10, minute=0, second=0, microsecond=0).isoformat() + "Z"
def _generate_consent_document(patient_id: str, nct_id: str, trial_title: str) -> str:
prompt = f"""Generate a concise, plain-language informed consent document (ICF) for clinical trial participation.
Trial: {trial_title}
NCT ID: {nct_id}
Patient ID: {patient_id}
The document should cover in 4 short sections:
1. What this study is about (2-3 sentences)
2. What you will be asked to do (bullet points)
3. Possible risks and benefits (bullet points)
4. Your rights as a participant (2-3 sentences)
Use plain language (8th grade reading level). End with a signature block."""
try:
return chat([{"role": "user", "content": prompt}], temperature=0.3, max_tokens=600)
except Exception:
return f"Informed Consent Document\nTrial: {trial_title} ({nct_id})\n\nPlease review this document carefully before signing."
def get_consent_stats() -> dict:
    """Return counts of consent records by status plus the appointment total.

    Returns:
        A dict with ``total`` (all consent records), ``sent``, ``signed`` and
        ``declined`` (records currently in that status), and
        ``appointments_scheduled`` (number of stored appointments, whatever
        their status).
    """
    snapshot = list(_consent_records.values())
    sent_count = signed_count = declined_count = 0
    for rec in snapshot:
        state = rec["status"]
        if state == CONSENT_SENT:
            sent_count += 1
        elif state == CONSENT_SIGNED:
            signed_count += 1
        elif state == CONSENT_DECLINED:
            declined_count += 1

    return {
        "total": len(snapshot),
        "sent": sent_count,
        "signed": signed_count,
        "declined": declined_count,
        "appointments_scheduled": len(_schedule_records),
    }