""" Consent & Scheduling Agent — A2A sub-agent that handles post-recruitment consent workflow and appointment scheduling. Triggered as a handoff from the Recruitment Agent. A2A task message format follows the Google A2A spec: {"task_id": str, "type": "CONSENT_REQUEST" | "SCHEDULE_REQUEST", "payload": {...}} """ import uuid from datetime import datetime, timedelta from typing import Optional from llm_client import chat # In-memory consent + scheduling store (production: Neo4j or Redis) _consent_records: dict[str, dict] = {} _schedule_records: dict[str, dict] = {} # ── Consent status values ────────────────────────────────────────────────────── CONSENT_PENDING = "PENDING" CONSENT_SENT = "SENT" CONSENT_SIGNED = "SIGNED" CONSENT_DECLINED = "DECLINED" CONSENT_EXPIRED = "EXPIRED" # ── A2A task receiver ────────────────────────────────────────────────────────── def receive_a2a_task(task: dict) -> dict: """ Entry point for A2A inter-agent handoffs. Accepts tasks from the Recruitment Agent and routes to consent or scheduling flows. """ task_type = task.get("type", "") payload = task.get("payload", {}) task_id = task.get("task_id", str(uuid.uuid4())) if task_type == "CONSENT_REQUEST": return initiate_consent( patient_id=payload["patient_id"], nct_id=payload["nct_id"], trial_title=payload.get("trial_title", ""), match_score=payload.get("match_score", 0.0), task_id=task_id, ) elif task_type == "SCHEDULE_REQUEST": return schedule_screening( patient_id=payload["patient_id"], nct_id=payload["nct_id"], site_city=payload.get("site_city", ""), site_state=payload.get("site_state", ""), task_id=task_id, ) else: return {"error": "UNKNOWN_TASK_TYPE", "task_id": task_id, "received_type": task_type} # ── Consent flow ─────────────────────────────────────────────────────────────── def initiate_consent( patient_id: str, nct_id: str, trial_title: str, match_score: float = 0.0, task_id: str | None = None, ) -> dict: """Create a consent record and generate the consent document.""" record_id = task_id or str(uuid.uuid4()) expires_at = (datetime.utcnow() + timedelta(days=30)).isoformat() consent_doc = _generate_consent_document(patient_id, nct_id, trial_title) record = { "consent_id": record_id, "patient_id": patient_id, "nct_id": nct_id, "trial_title": trial_title, "match_score": match_score, "status": CONSENT_SENT, "consent_document": consent_doc, "created_at": datetime.utcnow().isoformat(), "expires_at": expires_at, "signed_at": None, "a2a_source": "recruitment_agent", } _consent_records[record_id] = record return {"consent_id": record_id, "status": CONSENT_SENT, "expires_at": expires_at} def update_consent_status(consent_id: str, status: str, notes: str = "") -> dict: record = _consent_records.get(consent_id) if not record: return {"error": "CONSENT_NOT_FOUND", "consent_id": consent_id} record["status"] = status if status == CONSENT_SIGNED: record["signed_at"] = datetime.utcnow().isoformat() if notes: record["notes"] = notes # If consent signed, auto-trigger scheduling handoff if status == CONSENT_SIGNED: _trigger_scheduling_handoff(record) return record def get_consent_record(consent_id: str) -> dict | None: return _consent_records.get(consent_id) def list_consent_records(patient_id: str | None = None) -> list[dict]: records = list(_consent_records.values()) if patient_id: records = [r for r in records if r["patient_id"] == patient_id] return sorted(records, key=lambda r: r["created_at"], reverse=True) # ── Scheduling flow ──────────────────────────────────────────────────────────── def schedule_screening( patient_id: str, nct_id: str, site_city: str = "", site_state: str = "", task_id: str | None = None, ) -> dict: """Create a screening appointment slot.""" appt_id = task_id or str(uuid.uuid4()) # Default slot: next business weekday at 10am proposed_dt = _next_business_day() appt = { "appointment_id": appt_id, "patient_id": patient_id, "nct_id": nct_id, "site_city": site_city, "site_state": site_state, "proposed_datetime": proposed_dt, "status": "PROPOSED", "created_at": datetime.utcnow().isoformat(), "a2a_source": "consent_agent", } _schedule_records[appt_id] = appt return {"appointment_id": appt_id, "proposed_datetime": proposed_dt, "status": "PROPOSED"} def confirm_appointment(appt_id: str) -> dict: appt = _schedule_records.get(appt_id) if not appt: return {"error": "APPOINTMENT_NOT_FOUND"} appt["status"] = "CONFIRMED" appt["confirmed_at"] = datetime.utcnow().isoformat() return appt def list_appointments(patient_id: str | None = None) -> list[dict]: appts = list(_schedule_records.values()) if patient_id: appts = [a for a in appts if a["patient_id"] == patient_id] return sorted(appts, key=lambda a: a["created_at"], reverse=True) # ── Helpers ──────────────────────────────────────────────────────────────────── def _trigger_scheduling_handoff(consent_record: dict): """Auto-schedule after consent signed — A2A internal handoff.""" schedule_screening( patient_id=consent_record["patient_id"], nct_id=consent_record["nct_id"], task_id=f"sched_{consent_record['consent_id']}", ) def _next_business_day() -> str: dt = datetime.utcnow() + timedelta(days=3) while dt.weekday() >= 5: # skip Sat/Sun dt += timedelta(days=1) return dt.replace(hour=10, minute=0, second=0, microsecond=0).isoformat() + "Z" def _generate_consent_document(patient_id: str, nct_id: str, trial_title: str) -> str: prompt = f"""Generate a concise, plain-language informed consent document (ICF) for clinical trial participation. Trial: {trial_title} NCT ID: {nct_id} Patient ID: {patient_id} The document should cover in 4 short sections: 1. What this study is about (2-3 sentences) 2. What you will be asked to do (bullet points) 3. Possible risks and benefits (bullet points) 4. Your rights as a participant (2-3 sentences) Use plain language (8th grade reading level). End with a signature block.""" try: return chat([{"role": "user", "content": prompt}], temperature=0.3, max_tokens=600) except Exception: return f"Informed Consent Document\nTrial: {trial_title} ({nct_id})\n\nPlease review this document carefully before signing." def get_consent_stats() -> dict: all_records = list(_consent_records.values()) return { "total": len(all_records), "sent": sum(1 for r in all_records if r["status"] == CONSENT_SENT), "signed": sum(1 for r in all_records if r["status"] == CONSENT_SIGNED), "declined": sum(1 for r in all_records if r["status"] == CONSENT_DECLINED), "appointments_scheduled": len(_schedule_records), }