Spaces:
Sleeping
Sleeping
Initial deployment: ClinicalMatch AI v2.0 β FHIR R4 Β· MCP (9 tools) Β· A2A workflow Β· SHARP compliance Β· 100k synthetic patients Β· Neo4j graph Β· GraphRAG chatbot
59abb4f | """ | |
| FHIR R4 Server Client β connects to any FHIR R4 endpoint. | |
| Default: HAPI FHIR public sandbox (hapi.fhir.org/baseR4) | |
| Production: any EHR FHIR endpoint secured with SMART on FHIR OAuth2. | |
| SMART on FHIR token flow: | |
| 1. Client credentials grant β POST to FHIR_TOKEN_ENDPOINT | |
| 2. Bearer token attached to every FHIR API request | |
| 3. Token cached until expiry, then refreshed automatically | |
| """ | |
| import os | |
| import time | |
| import httpx | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
| from fhir_adapter import ( | |
| FHIRPatient, FHIRCondition, FHIRObservation, FHIRMedication, | |
| FHIRCoding, build_patient_profile, | |
| ) | |
| load_dotenv() | |
| FHIR_BASE_URL = os.getenv("FHIR_BASE_URL", "https://hapi.fhir.org/baseR4") | |
| FHIR_TOKEN_ENDPOINT = os.getenv("FHIR_TOKEN_ENDPOINT", "") | |
| FHIR_CLIENT_ID = os.getenv("FHIR_CLIENT_ID", "") | |
| FHIR_CLIENT_SECRET = os.getenv("FHIR_CLIENT_SECRET", "") | |
| FHIR_STATIC_TOKEN = os.getenv("FHIR_TOKEN", "") # pre-issued bearer token | |
| _token_cache: dict = {"token": "", "expires_at": 0.0} | |
| # ββ SMART on FHIR token acquisition ββββββββββββββββββββββββββββββββββββββββββ | |
| def _get_smart_token() -> str: | |
| """ | |
| Obtain a SMART on FHIR bearer token via client credentials grant. | |
| Returns cached token if still valid. | |
| """ | |
| if FHIR_STATIC_TOKEN: | |
| return FHIR_STATIC_TOKEN | |
| if not FHIR_TOKEN_ENDPOINT: | |
| return "" | |
| if time.time() < _token_cache["expires_at"] - 30: | |
| return _token_cache["token"] | |
| try: | |
| resp = httpx.post( | |
| FHIR_TOKEN_ENDPOINT, | |
| data={ | |
| "grant_type": "client_credentials", | |
| "client_id": FHIR_CLIENT_ID, | |
| "client_secret": FHIR_CLIENT_SECRET, | |
| "scope": "system/Patient.read system/Observation.read system/Condition.read system/MedicationStatement.read", | |
| }, | |
| timeout=10, | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| _token_cache["token"] = data["access_token"] | |
| _token_cache["expires_at"] = time.time() + int(data.get("expires_in", 3600)) | |
| return _token_cache["token"] | |
| except Exception as e: | |
| print(f"[fhir_server] SMART token error: {e}") | |
| return "" | |
| def _headers() -> dict: | |
| token = _get_smart_token() | |
| h = {"Accept": "application/fhir+json", "Content-Type": "application/fhir+json"} | |
| if token: | |
| h["Authorization"] = f"Bearer {token}" | |
| return h | |
| # ββ SHARP context envelope ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_sharp_context( | |
| patient_id: str, | |
| fhir_ref: str | None = None, | |
| session_id: str | None = None, | |
| tenant_id: str | None = None, | |
| ) -> dict: | |
| """ | |
| SHARP Extension Spec β patient context envelope. | |
| Carried on every inter-agent message and MCP tool call. | |
| """ | |
| import uuid | |
| return { | |
| "sharp_version": "1.0", | |
| "patient_context": { | |
| "id": patient_id, | |
| "fhir_ref": fhir_ref or f"Patient/{patient_id}", | |
| "fhir_base": FHIR_BASE_URL, | |
| "tenant_id": tenant_id or "clinicalmatch-demo", | |
| "session_id": session_id or str(uuid.uuid4()), | |
| }, | |
| "data_classification": "synthetic-demo", | |
| "baa_in_scope": False, | |
| "consent_status": "unknown", | |
| } | |
| # ββ FHIR resource fetchers ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def fetch_fhir_patient(patient_fhir_id: str) -> dict | None: | |
| """Fetch a Patient resource from the FHIR server by FHIR ID.""" | |
| try: | |
| resp = httpx.get( | |
| f"{FHIR_BASE_URL}/Patient/{patient_fhir_id}", | |
| headers=_headers(), timeout=10, | |
| ) | |
| resp.raise_for_status() | |
| return resp.json() | |
| except Exception as e: | |
| print(f"[fhir_server] Patient fetch error ({patient_fhir_id}): {e}") | |
| return None | |
| def search_fhir_patients(count: int = 10, condition_code: str | None = None) -> list[dict]: | |
| """Search for Patient resources on the FHIR server.""" | |
| params: dict = {"_count": count, "_format": "json"} | |
| if condition_code: | |
| params["_has:Condition:patient:code"] = condition_code | |
| try: | |
| resp = httpx.get(f"{FHIR_BASE_URL}/Patient", headers=_headers(), | |
| params=params, timeout=15) | |
| resp.raise_for_status() | |
| bundle = resp.json() | |
| return [e["resource"] for e in bundle.get("entry", []) if e.get("resource")] | |
| except Exception as e: | |
| print(f"[fhir_server] Patient search error: {e}") | |
| return [] | |
| def fetch_patient_conditions(patient_fhir_id: str) -> list[dict]: | |
| try: | |
| resp = httpx.get( | |
| f"{FHIR_BASE_URL}/Condition", | |
| headers=_headers(), | |
| params={"patient": patient_fhir_id, "_format": "json"}, | |
| timeout=10, | |
| ) | |
| resp.raise_for_status() | |
| bundle = resp.json() | |
| return [e["resource"] for e in bundle.get("entry", []) if e.get("resource")] | |
| except Exception as e: | |
| print(f"[fhir_server] Condition fetch error: {e}") | |
| return [] | |
| def fetch_patient_observations(patient_fhir_id: str) -> list[dict]: | |
| try: | |
| resp = httpx.get( | |
| f"{FHIR_BASE_URL}/Observation", | |
| headers=_headers(), | |
| params={"patient": patient_fhir_id, "_format": "json", "_count": 50}, | |
| timeout=10, | |
| ) | |
| resp.raise_for_status() | |
| bundle = resp.json() | |
| return [e["resource"] for e in bundle.get("entry", []) if e.get("resource")] | |
| except Exception as e: | |
| print(f"[fhir_server] Observation fetch error: {e}") | |
| return [] | |
| def fetch_patient_medications(patient_fhir_id: str) -> list[dict]: | |
| try: | |
| resp = httpx.get( | |
| f"{FHIR_BASE_URL}/MedicationStatement", | |
| headers=_headers(), | |
| params={"patient": patient_fhir_id, "_format": "json"}, | |
| timeout=10, | |
| ) | |
| resp.raise_for_status() | |
| bundle = resp.json() | |
| return [e["resource"] for e in bundle.get("entry", []) if e.get("resource")] | |
| except Exception as e: | |
| print(f"[fhir_server] Medication fetch error: {e}") | |
| return [] | |
| # ββ FHIR β internal model conversion βββββββββββββββββββββββββββββββββββββββββ | |
| def _safe_coding(codings: list[dict], fallback: str = "unknown") -> FHIRCoding: | |
| for c in codings: | |
| if c.get("code"): | |
| return FHIRCoding( | |
| system=c.get("system", ""), | |
| code=c.get("code", fallback), | |
| display=c.get("display", c.get("code", fallback)), | |
| ) | |
| return FHIRCoding(system="", code=fallback, display=fallback) | |
| def _parse_fhir_patient_resource(resource: dict) -> FHIRPatient | None: | |
| try: | |
| pid = resource.get("id", "") | |
| gender = resource.get("gender", "unknown") | |
| birth_date = resource.get("birthDate", "1970-01-01") | |
| return FHIRPatient(id=pid, gender=gender, birthDate=birth_date) | |
| except Exception as e: | |
| print(f"[fhir_server] Patient parse error: {e}") | |
| return None | |
| def _parse_conditions(resources: list[dict]) -> list[FHIRCondition]: | |
| conditions = [] | |
| for r in resources: | |
| try: | |
| coding_list = r.get("code", {}).get("coding", []) | |
| coding = _safe_coding(coding_list) | |
| conditions.append(FHIRCondition( | |
| id=r.get("id", ""), | |
| code=coding, | |
| clinicalStatus=r.get("clinicalStatus", {}).get("coding", [{}])[0].get("code", "active"), | |
| onsetDate=r.get("onsetDateTime", r.get("onsetDate", "")), | |
| )) | |
| except Exception: | |
| continue | |
| return conditions | |
| def _parse_observations(resources: list[dict]) -> list[FHIRObservation]: | |
| observations = [] | |
| for r in resources: | |
| try: | |
| coding_list = r.get("code", {}).get("coding", []) | |
| coding = _safe_coding(coding_list) | |
| vq = r.get("valueQuantity") | |
| vs = r.get("valueString") | |
| vb = r.get("valueBoolean") | |
| observations.append(FHIRObservation( | |
| id=r.get("id", ""), | |
| code=coding, | |
| valueQuantity={"value": vq["value"], "unit": vq.get("unit", "")} if vq and "value" in vq else None, | |
| valueString=str(vs) if vs is not None else None, | |
| valueBoolean=bool(vb) if vb is not None else None, | |
| status=r.get("status", "final"), | |
| )) | |
| except Exception: | |
| continue | |
| return observations | |
| def _parse_medications(resources: list[dict]) -> list[FHIRMedication]: | |
| medications = [] | |
| for r in resources: | |
| try: | |
| coding_list = ( | |
| r.get("medicationCodeableConcept", {}).get("coding", []) or | |
| r.get("medication", {}).get("concept", {}).get("coding", []) | |
| ) | |
| coding = _safe_coding(coding_list) | |
| medications.append(FHIRMedication( | |
| id=r.get("id", ""), | |
| medication=coding, | |
| status=r.get("status", "active"), | |
| )) | |
| except Exception: | |
| continue | |
| return medications | |
| # ββ Public API ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_live_patient_profile( | |
| patient_fhir_id: str, | |
| sharp_context: dict | None = None, | |
| ) -> dict | None: | |
| """ | |
| Fetch a full patient profile from the live FHIR server. | |
| Assembles Patient + Condition + Observation + MedicationStatement | |
| into the same internal profile dict used everywhere in the system. | |
| Attaches SHARP context envelope. | |
| """ | |
| resource = fetch_fhir_patient(patient_fhir_id) | |
| if not resource: | |
| return None | |
| patient = _parse_fhir_patient_resource(resource) | |
| if not patient: | |
| return None | |
| patient.conditions = _parse_conditions(fetch_patient_conditions(patient_fhir_id)) | |
| patient.observations = _parse_observations(fetch_patient_observations(patient_fhir_id)) | |
| patient.medications = _parse_medications(fetch_patient_medications(patient_fhir_id)) | |
| profile = build_patient_profile(patient) | |
| profile["fhir_source"] = "live" | |
| profile["fhir_base_url"] = FHIR_BASE_URL | |
| profile["fhir_ref"] = f"Patient/{patient_fhir_id}" | |
| profile["sharp_context"] = sharp_context or build_sharp_context( | |
| patient_id=patient_fhir_id, | |
| fhir_ref=f"Patient/{patient_fhir_id}", | |
| ) | |
| return profile | |
| def get_fhir_server_status() -> dict: | |
| """Probe the configured FHIR server and return capability statement summary.""" | |
| try: | |
| resp = httpx.get( | |
| f"{FHIR_BASE_URL}/metadata", | |
| headers=_headers(), timeout=8, | |
| ) | |
| resp.raise_for_status() | |
| cap = resp.json() | |
| return { | |
| "reachable": True, | |
| "fhir_version": cap.get("fhirVersion", "unknown"), | |
| "server_name": cap.get("software", {}).get("name", "unknown"), | |
| "base_url": FHIR_BASE_URL, | |
| "auth_method": "SMART/Bearer" if (FHIR_TOKEN_ENDPOINT or FHIR_STATIC_TOKEN) else "none (public sandbox)", | |
| "smart_token_configured": bool(FHIR_TOKEN_ENDPOINT or FHIR_STATIC_TOKEN), | |
| } | |
| except Exception as e: | |
| return { | |
| "reachable": False, | |
| "base_url": FHIR_BASE_URL, | |
| "error": str(e), | |
| "auth_method": "SMART/Bearer" if (FHIR_TOKEN_ENDPOINT or FHIR_STATIC_TOKEN) else "none", | |
| "smart_token_configured": bool(FHIR_TOKEN_ENDPOINT or FHIR_STATIC_TOKEN), | |
| } | |