Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException, Depends | |
| from typing import List, Optional | |
| import httpx | |
| import json | |
| from datetime import datetime | |
| from ..auth.auth import get_current_user | |
| from models.schemas import User | |
# Router object for this module's endpoints.
# NOTE(review): no route decorators are visible on the handlers in this view —
# confirm where/how they are registered on this router.
router = APIRouter()
# HAPI FHIR Test Server URL
# Used as the base URL for every request built by FHIRIntegration.
HAPI_FHIR_BASE_URL = "https://hapi.fhir.org/baseR4"
class FHIRIntegration:
    """Async, read-only client for the FHIR REST endpoint at ``HAPI_FHIR_BASE_URL``.

    Every public query method returns the decoded JSON body of the upstream
    response and converts any failure into a ``fastapi.HTTPException`` so
    route handlers can let it propagate unchanged.
    """

    def __init__(self):
        self.base_url = HAPI_FHIR_BASE_URL
        # A single client instance is reused for all requests made through
        # this object.
        self.client = httpx.AsyncClient(timeout=30.0)

    async def aclose(self) -> None:
        """Close the underlying HTTP client (e.g. from an app shutdown hook)."""
        await self.client.aclose()

    async def _get_json(self, url: str, params: Optional[dict] = None) -> dict:
        """GET *url*, raise on a non-2xx status, and return the decoded JSON body."""
        response = await self.client.get(url, params=params)
        response.raise_for_status()
        return response.json()

    async def _search_for_patient(
        self, resource_type: str, patient_id: str, label: str
    ) -> dict:
        """Search *resource_type* filtered by patient and return the result.

        Args:
            resource_type: FHIR resource name used as the URL path segment.
            patient_id: Value sent as the ``patient`` search parameter.
            label: Plural noun used in the error message ("observations", ...).

        Raises:
            HTTPException: 500 on any request, status, or decoding failure.
        """
        try:
            return await self._get_json(
                f"{self.base_url}/{resource_type}",
                {"patient": patient_id, "_count": 100},
            )
        except Exception as e:
            raise HTTPException(
                status_code=500, detail=f"Error fetching {label}: {str(e)}"
            ) from e

    async def search_patients(self, limit: int = 10, offset: int = 0) -> dict:
        """Search for patients in HAPI FHIR Test Server.

        Args:
            limit: Sent upstream as ``_count``.
            offset: Sent upstream as ``_getpagesoffset``.

        Returns:
            The decoded JSON response.

        Raises:
            HTTPException: 500 on any request, status, or decoding failure.
        """
        try:
            return await self._get_json(
                f"{self.base_url}/Patient",
                {"_count": limit, "_getpagesoffset": offset},
            )
        except Exception as e:
            raise HTTPException(
                status_code=500, detail=f"FHIR server error: {str(e)}"
            ) from e

    async def get_patient_by_id(self, patient_id: str) -> dict:
        """Get a specific patient by ID from HAPI FHIR Test Server.

        Args:
            patient_id: Logical id of the Patient resource. It is
                percent-encoded before being placed in the URL path so that
                characters such as ``/``, ``?`` or ``#`` cannot alter the
                request that is sent upstream.

        Returns:
            The decoded JSON response.

        Raises:
            HTTPException: 404 when the server reports the patient as missing
                (HTTP 404) or deleted (HTTP 410); 500 for every other failure
                (timeouts, connection errors, other statuses, invalid JSON).
        """
        from urllib.parse import quote

        safe_id = quote(str(patient_id), safe="")
        try:
            return await self._get_json(f"{self.base_url}/Patient/{safe_id}")
        except httpx.HTTPStatusError as e:
            # Only a genuine "no such resource" answer is a 404 for our
            # callers; anything else is an upstream problem, not a missing
            # patient.
            if e.response.status_code in (404, 410):
                raise HTTPException(
                    status_code=404, detail=f"Patient not found: {str(e)}"
                ) from e
            raise HTTPException(
                status_code=500, detail=f"FHIR server error: {str(e)}"
            ) from e
        except Exception as e:
            raise HTTPException(
                status_code=500, detail=f"FHIR server error: {str(e)}"
            ) from e

    async def get_patient_observations(self, patient_id: str) -> dict:
        """Get observations (vital signs, lab results) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_for_patient("Observation", patient_id, "observations")

    async def get_patient_medications(self, patient_id: str) -> dict:
        """Get medications (MedicationRequest resources) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_for_patient(
            "MedicationRequest", patient_id, "medications"
        )

    async def get_patient_conditions(self, patient_id: str) -> dict:
        """Get conditions (diagnoses) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_for_patient("Condition", patient_id, "conditions")

    async def get_patient_encounters(self, patient_id: str) -> dict:
        """Get encounters (visits) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_for_patient("Encounter", patient_id, "encounters")
# Initialize FHIR integration
# Module-level instance, constructed once when this module is imported and
# used by the handlers below for every upstream call.
fhir_integration = FHIRIntegration()
def _bundle_resources(bundle: dict) -> list:
    """Return the ``resource`` dict of every entry in a FHIR search result."""
    return [entry.get("resource", {}) for entry in bundle.get("entry") or []]


def _extract_patient_demographics(resource: dict) -> dict:
    """Flatten the demographic fields of a FHIR Patient resource.

    Args:
        resource: A Patient resource as decoded JSON.

    Returns:
        Dict with keys ``fhir_id``, ``full_name``, ``gender``,
        ``date_of_birth``, ``address``, ``phone`` and ``email``. Missing
        values become ``""`` (``"unknown"`` for gender).
    """
    # NOTE: every name/address entry is concatenated, matching the behaviour
    # callers already rely on; a patient with several names yields one long
    # string.
    name_parts = []
    for name in resource.get("name") or []:
        name_parts.extend(name.get("given") or [])
        if name.get("family"):
            name_parts.append(name["family"])

    address_parts = []
    for address in resource.get("address") or []:
        address_parts.extend(address.get("line") or [])
        for key in ("city", "state", "postalCode"):
            if address.get(key):
                address_parts.append(address[key])

    # When several entries share a system, the last one wins.
    contact = {"phone": "", "email": ""}
    for telecom in resource.get("telecom") or []:
        system = telecom.get("system")
        if system in contact:
            contact[system] = telecom.get("value", "")

    return {
        "fhir_id": resource.get("id"),
        "full_name": " ".join(name_parts),
        "gender": resource.get("gender", "unknown"),
        "date_of_birth": resource.get("birthDate") or "",
        "address": ", ".join(address_parts),
        "phone": contact["phone"],
        "email": contact["email"],
    }


def _summarize_observations(bundle: dict) -> list:
    """Flatten Observation resources that carry a ``code.text`` label."""
    return [
        {
            "type": resource["code"]["text"],
            "value": resource.get("valueQuantity", {}).get("value"),
            "unit": resource.get("valueQuantity", {}).get("unit"),
            "date": resource.get("effectiveDateTime", ""),
        }
        for resource in _bundle_resources(bundle)
        if resource.get("code", {}).get("text")
    ]


def _summarize_medications(bundle: dict) -> list:
    """Flatten MedicationRequest resources that carry a medication text."""
    return [
        {
            "name": resource["medicationCodeableConcept"]["text"],
            "status": resource.get("status", ""),
            "prescribed_date": resource.get("authoredOn", ""),
            # ``or [{}]`` also covers a present-but-empty dosageInstruction
            # list, which would otherwise raise IndexError.
            "dosage": (resource.get("dosageInstruction") or [{}])[0].get("text", ""),
        }
        for resource in _bundle_resources(bundle)
        if resource.get("medicationCodeableConcept", {}).get("text")
    ]


def _summarize_conditions(bundle: dict) -> list:
    """Flatten Condition resources that carry a ``code.text`` label."""
    return [
        {
            "name": resource["code"]["text"],
            "status": resource.get("clinicalStatus", {}).get("text", ""),
            "onset_date": resource.get("onsetDateTime", ""),
            "severity": resource.get("severity", {}).get("text", ""),
        }
        for resource in _bundle_resources(bundle)
        if resource.get("code", {}).get("text")
    ]


def _summarize_encounters(bundle: dict) -> list:
    """Flatten Encounter resources that have at least one ``type`` entry."""
    return [
        {
            "type": resource["type"][0].get("text", ""),
            "status": resource.get("status", ""),
            "start_date": resource.get("period", {}).get("start", ""),
            "end_date": resource.get("period", {}).get("end", ""),
            "service_provider": resource.get("serviceProvider", {}).get("display", ""),
        }
        for resource in _bundle_resources(bundle)
        if resource.get("type")
    ]


async def get_fhir_patients(
    limit: int = 10,
    offset: int = 0,
    current_user: User = Depends(get_current_user)
):
    """Get patients from HAPI FHIR Test Server.

    Args:
        limit: Maximum number of patients requested from the server.
        offset: Paging offset forwarded to the server.
        current_user: Resolved by the ``get_current_user`` dependency; not
            otherwise used here.

    Returns:
        Dict with ``patients`` (flattened patient dicts), ``total`` (the
        server-reported total, or the page size when absent) and ``count``.

    Raises:
        HTTPException: Propagated unchanged from the FHIR client; 500 for any
            other failure.
    """
    try:
        patients_data = await fhir_integration.search_patients(limit, offset)
        transformed_patients = [
            _extract_patient_demographics(resource)
            for resource in _bundle_resources(patients_data)
        ]
        return {
            "patients": transformed_patients,
            "total": patients_data.get("total", len(transformed_patients)),
            "count": len(transformed_patients),
        }
    except HTTPException:
        # Keep the status code chosen closer to the failure instead of
        # flattening it to 500 below.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error fetching FHIR patients: {str(e)}"
        ) from e


async def get_fhir_patient_details(
    patient_id: str,
    current_user: User = Depends(get_current_user)
):
    """Get detailed patient information from HAPI FHIR Test Server.

    Args:
        patient_id: FHIR id of the patient to load.
        current_user: Resolved by the ``get_current_user`` dependency; not
            otherwise used here.

    Returns:
        The flattened patient dict extended with ``observations``,
        ``medications``, ``conditions`` and ``encounters`` lists.

    Raises:
        HTTPException: Propagated unchanged from the FHIR client (e.g. 404
            for an unknown patient); 500 for any other failure.
    """
    import asyncio

    try:
        # Fetch the patient first so an unknown id fails before the four
        # follow-up requests are issued.
        patient_data = await fhir_integration.get_patient_by_id(patient_id)

        # The remaining lookups are independent of each other, so run them
        # concurrently rather than one after another.
        observations, medications, conditions, encounters = await asyncio.gather(
            fhir_integration.get_patient_observations(patient_id),
            fhir_integration.get_patient_medications(patient_id),
            fhir_integration.get_patient_conditions(patient_id),
            fhir_integration.get_patient_encounters(patient_id),
        )

        patient_info = _extract_patient_demographics(patient_data)
        patient_info["observations"] = _summarize_observations(observations)
        patient_info["medications"] = _summarize_medications(medications)
        patient_info["conditions"] = _summarize_conditions(conditions)
        patient_info["encounters"] = _summarize_encounters(encounters)
        return patient_info
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error fetching FHIR patient details: {str(e)}"
        ) from e


async def import_fhir_patient(
    patient_id: str,
    current_user: User = Depends(get_current_user)
):
    """Import a patient from HAPI FHIR Test Server to our database.

    Args:
        patient_id: FHIR id of the patient to import.
        current_user: The authenticated user; their ``id`` is stored as
            ``assigned_doctor_id`` on the new record.

    Returns:
        Dict with a success ``message``, the new database ``patient_id`` and
        the ``fhir_id``.

    Raises:
        HTTPException: 400 when a record with the same ``fhir_id`` already
            exists; statuses from the FHIR client are propagated unchanged;
            500 for any other failure.
    """
    try:
        # Kept as a function-local import, as in the original code.
        from db.mongo import patients_collection

        patient_data = await fhir_integration.get_patient_by_id(patient_id)
        fhir_id = patient_data.get("id")

        # NOTE(review): check-then-insert is not atomic; two concurrent
        # imports of the same patient can both pass this check unless the
        # collection enforces uniqueness on fhir_id — confirm.
        existing_patient = await patients_collection.find_one({"fhir_id": fhir_id})
        if existing_patient:
            raise HTTPException(status_code=400, detail="Patient already exists in database")

        # One timestamp so created_at and updated_at are identical on insert.
        now = datetime.now()
        transformed_patient = {
            **_extract_patient_demographics(patient_data),
            "source": "fhir_import",
            "status": "active",
            "assigned_doctor_id": str(current_user.id),
            "created_at": now,
            "updated_at": now,
        }

        result = await patients_collection.insert_one(transformed_patient)
        return {
            "message": "Patient imported successfully",
            "patient_id": str(result.inserted_id),
            "fhir_id": fhir_id,
        }
    except HTTPException:
        # Preserve the 400 "already exists" and any client-chosen status.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error importing FHIR patient: {str(e)}"
        ) from e