Spaces:
Running
Running
File size: 7,861 Bytes
59abb4f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | """
Consent & Scheduling Agent β A2A sub-agent that handles post-recruitment consent
workflow and appointment scheduling. Triggered as a handoff from the Recruitment Agent.
A2A task message format follows the Google A2A spec:
{"task_id": str, "type": "CONSENT_REQUEST" | "SCHEDULE_REQUEST", "payload": {...}}
"""
import uuid
from datetime import datetime, timedelta
from typing import Optional
from llm_client import chat
# In-memory consent + scheduling store (production: Neo4j or Redis)
_consent_records: dict[str, dict] = {}
_schedule_records: dict[str, dict] = {}
# ββ Consent status values ββββββββββββββββββββββββββββββββββββββββββββββββββββββ
CONSENT_PENDING = "PENDING"
CONSENT_SENT = "SENT"
CONSENT_SIGNED = "SIGNED"
CONSENT_DECLINED = "DECLINED"
CONSENT_EXPIRED = "EXPIRED"
# ββ A2A task receiver ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def receive_a2a_task(task: dict) -> dict:
"""
Entry point for A2A inter-agent handoffs.
Accepts tasks from the Recruitment Agent and routes to consent or scheduling flows.
"""
task_type = task.get("type", "")
payload = task.get("payload", {})
task_id = task.get("task_id", str(uuid.uuid4()))
if task_type == "CONSENT_REQUEST":
return initiate_consent(
patient_id=payload["patient_id"],
nct_id=payload["nct_id"],
trial_title=payload.get("trial_title", ""),
match_score=payload.get("match_score", 0.0),
task_id=task_id,
)
elif task_type == "SCHEDULE_REQUEST":
return schedule_screening(
patient_id=payload["patient_id"],
nct_id=payload["nct_id"],
site_city=payload.get("site_city", ""),
site_state=payload.get("site_state", ""),
task_id=task_id,
)
else:
return {"error": "UNKNOWN_TASK_TYPE", "task_id": task_id, "received_type": task_type}
# ββ Consent flow βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def initiate_consent(
patient_id: str,
nct_id: str,
trial_title: str,
match_score: float = 0.0,
task_id: str | None = None,
) -> dict:
"""Create a consent record and generate the consent document."""
record_id = task_id or str(uuid.uuid4())
expires_at = (datetime.utcnow() + timedelta(days=30)).isoformat()
consent_doc = _generate_consent_document(patient_id, nct_id, trial_title)
record = {
"consent_id": record_id,
"patient_id": patient_id,
"nct_id": nct_id,
"trial_title": trial_title,
"match_score": match_score,
"status": CONSENT_SENT,
"consent_document": consent_doc,
"created_at": datetime.utcnow().isoformat(),
"expires_at": expires_at,
"signed_at": None,
"a2a_source": "recruitment_agent",
}
_consent_records[record_id] = record
return {"consent_id": record_id, "status": CONSENT_SENT, "expires_at": expires_at}
def update_consent_status(consent_id: str, status: str, notes: str = "") -> dict:
record = _consent_records.get(consent_id)
if not record:
return {"error": "CONSENT_NOT_FOUND", "consent_id": consent_id}
record["status"] = status
if status == CONSENT_SIGNED:
record["signed_at"] = datetime.utcnow().isoformat()
if notes:
record["notes"] = notes
# If consent signed, auto-trigger scheduling handoff
if status == CONSENT_SIGNED:
_trigger_scheduling_handoff(record)
return record
def get_consent_record(consent_id: str) -> dict | None:
return _consent_records.get(consent_id)
def list_consent_records(patient_id: str | None = None) -> list[dict]:
records = list(_consent_records.values())
if patient_id:
records = [r for r in records if r["patient_id"] == patient_id]
return sorted(records, key=lambda r: r["created_at"], reverse=True)
# ββ Scheduling flow ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def schedule_screening(
patient_id: str,
nct_id: str,
site_city: str = "",
site_state: str = "",
task_id: str | None = None,
) -> dict:
"""Create a screening appointment slot."""
appt_id = task_id or str(uuid.uuid4())
# Default slot: next business weekday at 10am
proposed_dt = _next_business_day()
appt = {
"appointment_id": appt_id,
"patient_id": patient_id,
"nct_id": nct_id,
"site_city": site_city,
"site_state": site_state,
"proposed_datetime": proposed_dt,
"status": "PROPOSED",
"created_at": datetime.utcnow().isoformat(),
"a2a_source": "consent_agent",
}
_schedule_records[appt_id] = appt
return {"appointment_id": appt_id, "proposed_datetime": proposed_dt, "status": "PROPOSED"}
def confirm_appointment(appt_id: str) -> dict:
appt = _schedule_records.get(appt_id)
if not appt:
return {"error": "APPOINTMENT_NOT_FOUND"}
appt["status"] = "CONFIRMED"
appt["confirmed_at"] = datetime.utcnow().isoformat()
return appt
def list_appointments(patient_id: str | None = None) -> list[dict]:
appts = list(_schedule_records.values())
if patient_id:
appts = [a for a in appts if a["patient_id"] == patient_id]
return sorted(appts, key=lambda a: a["created_at"], reverse=True)
# ββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _trigger_scheduling_handoff(consent_record: dict):
"""Auto-schedule after consent signed β A2A internal handoff."""
schedule_screening(
patient_id=consent_record["patient_id"],
nct_id=consent_record["nct_id"],
task_id=f"sched_{consent_record['consent_id']}",
)
def _next_business_day() -> str:
dt = datetime.utcnow() + timedelta(days=3)
while dt.weekday() >= 5: # skip Sat/Sun
dt += timedelta(days=1)
return dt.replace(hour=10, minute=0, second=0, microsecond=0).isoformat() + "Z"
def _generate_consent_document(patient_id: str, nct_id: str, trial_title: str) -> str:
prompt = f"""Generate a concise, plain-language informed consent document (ICF) for clinical trial participation.
Trial: {trial_title}
NCT ID: {nct_id}
Patient ID: {patient_id}
The document should cover in 4 short sections:
1. What this study is about (2-3 sentences)
2. What you will be asked to do (bullet points)
3. Possible risks and benefits (bullet points)
4. Your rights as a participant (2-3 sentences)
Use plain language (8th grade reading level). End with a signature block."""
try:
return chat([{"role": "user", "content": prompt}], temperature=0.3, max_tokens=600)
except Exception:
return f"Informed Consent Document\nTrial: {trial_title} ({nct_id})\n\nPlease review this document carefully before signing."
def get_consent_stats() -> dict:
all_records = list(_consent_records.values())
return {
"total": len(all_records),
"sent": sum(1 for r in all_records if r["status"] == CONSENT_SENT),
"signed": sum(1 for r in all_records if r["status"] == CONSENT_SIGNED),
"declined": sum(1 for r in all_records if r["status"] == CONSENT_DECLINED),
"appointments_scheduled": len(_schedule_records),
}
|