Spaces:
Running
Running
Initial deployment: ClinicalMatch AI v2.0 — FHIR R4 · MCP (9 tools) · A2A workflow · SHARP compliance · 100k synthetic patients · Neo4j graph · GraphRAG chatbot
59abb4f | """ | |
| Consent & Scheduling Agent — A2A sub-agent that handles post-recruitment consent | |
| workflow and appointment scheduling. Triggered as a handoff from the Recruitment Agent. | |
| A2A task message format follows the Google A2A spec: | |
| {"task_id": str, "type": "CONSENT_REQUEST" | "SCHEDULE_REQUEST", "payload": {...}} | |
| """ | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from llm_client import chat | |
# In-memory consent + scheduling store (production: Neo4j or Redis).
# Both stores are plain module-level dicts, so their contents live only as
# long as the process does.
# Maps consent_id -> consent record dict (written by initiate_consent).
_consent_records: dict[str, dict] = {}
# Maps appointment_id -> appointment record dict (written by schedule_screening).
_schedule_records: dict[str, dict] = {}

# ── Consent status values ──────────────────────────────────────────────────────
# String codes stored in a consent record's "status" field.
CONSENT_PENDING = "PENDING"
CONSENT_SENT = "SENT"
CONSENT_SIGNED = "SIGNED"
CONSENT_DECLINED = "DECLINED"
CONSENT_EXPIRED = "EXPIRED"
# ── A2A task receiver ──────────────────────────────────────────────────────────

def receive_a2a_task(task: dict) -> dict:
    """
    Entry point for A2A inter-agent handoffs.

    Accepts tasks from the Recruitment Agent and routes to consent or scheduling flows.

    Args:
        task: A2A task message of the form
            {"task_id": str, "type": "CONSENT_REQUEST" | "SCHEDULE_REQUEST", "payload": {...}}.
            A missing or empty ``task_id`` is replaced by a fresh UUID4 string.
            A missing, null, or non-dict ``payload`` is treated as empty.

    Returns:
        The result dict of ``initiate_consent`` or ``schedule_screening`` on
        success. Otherwise an error dict that always carries ``task_id``:
        ``{"error": "UNKNOWN_TASK_TYPE", "received_type": ...}`` for an
        unsupported type, or ``{"error": "MISSING_PAYLOAD_FIELDS",
        "missing_fields": [...]}`` when ``patient_id`` / ``nct_id`` are absent.
    """
    task_type = task.get("type", "")
    payload = task.get("payload")
    if not isinstance(payload, dict):
        # Covers both an absent payload and an explicit null from the sender.
        payload = {}
    # `or` (not a .get default) so an explicit null/empty id is also replaced.
    task_id = task.get("task_id") or str(uuid.uuid4())

    if task_type not in ("CONSENT_REQUEST", "SCHEDULE_REQUEST"):
        return {"error": "UNKNOWN_TASK_TYPE", "task_id": task_id, "received_type": task_type}

    # Both flows need these two identifiers; report their absence in the same
    # structured style as UNKNOWN_TASK_TYPE instead of leaking a KeyError.
    missing = [field for field in ("patient_id", "nct_id") if field not in payload]
    if missing:
        return {"error": "MISSING_PAYLOAD_FIELDS", "task_id": task_id, "missing_fields": missing}

    if task_type == "CONSENT_REQUEST":
        return initiate_consent(
            patient_id=payload["patient_id"],
            nct_id=payload["nct_id"],
            trial_title=payload.get("trial_title", ""),
            match_score=payload.get("match_score", 0.0),
            task_id=task_id,
        )
    return schedule_screening(
        patient_id=payload["patient_id"],
        nct_id=payload["nct_id"],
        site_city=payload.get("site_city", ""),
        site_state=payload.get("site_state", ""),
        task_id=task_id,
    )
# ── Consent flow ───────────────────────────────────────────────────────────────

def initiate_consent(
    patient_id: str,
    nct_id: str,
    trial_title: str,
    match_score: float = 0.0,
    task_id: str | None = None,
    expiry_days: int = 30,
) -> dict:
    """Create a consent record and generate the consent document.

    Args:
        patient_id: Identifier of the patient being asked for consent.
        nct_id: Trial identifier stored on the record and passed to the
            document generator.
        trial_title: Trial title stored on the record and passed to the
            document generator.
        match_score: Score stored on the record as-is.
        task_id: Used as the consent id when truthy; otherwise a UUID4 string
            is generated. An existing record under the same id is replaced.
        expiry_days: Days between ``created_at`` and ``expires_at``.

    Returns:
        ``{"consent_id", "status", "expires_at"}`` for the stored record; the
        status is always ``CONSENT_SENT``.
    """
    record_id = task_id or str(uuid.uuid4())
    # Read the clock once so expires_at is exactly created_at + expiry_days,
    # no matter how long document generation takes.
    created = datetime.utcnow()
    created_at = created.isoformat()
    expires_at = (created + timedelta(days=expiry_days)).isoformat()
    consent_doc = _generate_consent_document(patient_id, nct_id, trial_title)
    record = {
        "consent_id": record_id,
        "patient_id": patient_id,
        "nct_id": nct_id,
        "trial_title": trial_title,
        "match_score": match_score,
        "status": CONSENT_SENT,
        "consent_document": consent_doc,
        "created_at": created_at,
        "expires_at": expires_at,
        "signed_at": None,
        "a2a_source": "recruitment_agent",
    }
    _consent_records[record_id] = record
    return {"consent_id": record_id, "status": CONSENT_SENT, "expires_at": expires_at}
def update_consent_status(consent_id: str, status: str, notes: str = "") -> dict:
    """Set the status of a stored consent record.

    Args:
        consent_id: Id of the record to update.
        status: New status value; it is stored as given, without validation.
        notes: When non-empty, stored on the record under ``"notes"``.

    Returns:
        The updated record (the stored dict itself, not a copy), or
        ``{"error": "CONSENT_NOT_FOUND", "consent_id": ...}`` when no record
        exists under ``consent_id``.
    """
    record = _consent_records.get(consent_id)
    if not record:
        return {"error": "CONSENT_NOT_FOUND", "consent_id": consent_id}

    # Only a transition *into* SIGNED counts. Repeating SIGNED on an already
    # signed record must not reset signed_at or re-run the handoff, which
    # would overwrite the existing "sched_<consent_id>" appointment.
    newly_signed = status == CONSENT_SIGNED and record.get("status") != CONSENT_SIGNED

    record["status"] = status
    if newly_signed:
        record["signed_at"] = datetime.utcnow().isoformat()
    if notes:
        record["notes"] = notes
    # If consent signed, auto-trigger scheduling handoff
    if newly_signed:
        _trigger_scheduling_handoff(record)
    return record
def get_consent_record(consent_id: str) -> dict | None:
    """Return the consent record stored under ``consent_id``, or None if there is none."""
    try:
        return _consent_records[consent_id]
    except KeyError:
        return None
def list_consent_records(patient_id: str | None = None) -> list[dict]:
    """Return consent records, newest ``created_at`` first.

    When ``patient_id`` is truthy only that patient's records are returned;
    otherwise every stored record is included.
    """
    if patient_id:
        selected = [
            rec for rec in _consent_records.values() if rec["patient_id"] == patient_id
        ]
    else:
        selected = list(_consent_records.values())
    selected.sort(key=lambda rec: rec["created_at"], reverse=True)
    return selected
# ── Scheduling flow ────────────────────────────────────────────────────────────

def schedule_screening(
    patient_id: str,
    nct_id: str,
    site_city: str = "",
    site_state: str = "",
    task_id: str | None = None,
) -> dict:
    """Create a screening appointment slot.

    The appointment id is ``task_id`` when truthy, otherwise a new UUID4
    string. The proposed time comes from ``_next_business_day()``. The record
    is stored in ``_schedule_records`` with status ``"PROPOSED"`` and
    ``a2a_source`` fixed to ``"consent_agent"``.

    Returns:
        ``{"appointment_id", "proposed_datetime", "status"}`` for the new slot.
    """
    appointment_id = task_id if task_id else str(uuid.uuid4())
    slot = _next_business_day()
    _schedule_records[appointment_id] = {
        "appointment_id": appointment_id,
        "patient_id": patient_id,
        "nct_id": nct_id,
        "site_city": site_city,
        "site_state": site_state,
        "proposed_datetime": slot,
        "status": "PROPOSED",
        "created_at": datetime.utcnow().isoformat(),
        "a2a_source": "consent_agent",
    }
    summary = {"appointment_id": appointment_id, "proposed_datetime": slot}
    summary["status"] = "PROPOSED"
    return summary
def confirm_appointment(appt_id: str) -> dict:
    """Mark a stored appointment as confirmed.

    Args:
        appt_id: Id of the appointment to confirm.

    Returns:
        The updated appointment record (the stored dict itself) with status
        ``"CONFIRMED"`` and a ``confirmed_at`` timestamp, or
        ``{"error": "APPOINTMENT_NOT_FOUND", "appointment_id": ...}`` when no
        appointment exists under ``appt_id``.
    """
    appt = _schedule_records.get(appt_id)
    if not appt:
        # Echo the id back, mirroring the CONSENT_NOT_FOUND error shape.
        return {"error": "APPOINTMENT_NOT_FOUND", "appointment_id": appt_id}
    appt["status"] = "CONFIRMED"
    appt["confirmed_at"] = datetime.utcnow().isoformat()
    return appt
def list_appointments(patient_id: str | None = None) -> list[dict]:
    """Return appointment records, newest ``created_at`` first.

    When ``patient_id`` is truthy only that patient's appointments are
    returned; otherwise every stored appointment is included.
    """
    if patient_id:
        selected = [
            entry for entry in _schedule_records.values() if entry["patient_id"] == patient_id
        ]
    else:
        selected = list(_schedule_records.values())
    selected.sort(key=lambda entry: entry["created_at"], reverse=True)
    return selected
# ── Helpers ────────────────────────────────────────────────────────────────────

def _trigger_scheduling_handoff(consent_record: dict):
    """Auto-schedule after consent signed — A2A internal handoff.

    Calls ``schedule_screening`` with the record's patient and trial ids and
    an appointment id of ``"sched_" + consent_id``. No site city/state is
    passed, so those fall back to the callee's defaults.
    """
    handoff_args = {
        "patient_id": consent_record["patient_id"],
        "nct_id": consent_record["nct_id"],
        "task_id": "sched_{}".format(consent_record["consent_id"]),
    }
    schedule_screening(**handoff_args)
| def _next_business_day() -> str: | |
| dt = datetime.utcnow() + timedelta(days=3) | |
| while dt.weekday() >= 5: # skip Sat/Sun | |
| dt += timedelta(days=1) | |
| return dt.replace(hour=10, minute=0, second=0, microsecond=0).isoformat() + "Z" | |
| def _generate_consent_document(patient_id: str, nct_id: str, trial_title: str) -> str: | |
| prompt = f"""Generate a concise, plain-language informed consent document (ICF) for clinical trial participation. | |
| Trial: {trial_title} | |
| NCT ID: {nct_id} | |
| Patient ID: {patient_id} | |
| The document should cover in 4 short sections: | |
| 1. What this study is about (2-3 sentences) | |
| 2. What you will be asked to do (bullet points) | |
| 3. Possible risks and benefits (bullet points) | |
| 4. Your rights as a participant (2-3 sentences) | |
| Use plain language (8th grade reading level). End with a signature block.""" | |
| try: | |
| return chat([{"role": "user", "content": prompt}], temperature=0.3, max_tokens=600) | |
| except Exception: | |
| return f"Informed Consent Document\nTrial: {trial_title} ({nct_id})\n\nPlease review this document carefully before signing." | |
def get_consent_stats() -> dict:
    """Summarise the in-memory stores.

    Returns:
        A dict with the total number of consent records, how many have status
        SENT, SIGNED and DECLINED, and the number of stored appointments.
    """
    statuses = [rec["status"] for rec in _consent_records.values()]
    return {
        "total": len(statuses),
        "sent": statuses.count(CONSENT_SENT),
        "signed": statuses.count(CONSENT_SIGNED),
        "declined": statuses.count(CONSENT_DECLINED),
        "appointments_scheduled": len(_schedule_records),
    }