cps-api-tx / api /routes /fhir_integration.py
Ali2206's picture
Initial CPS-API deployment with TxAgent integration
682caaf
from fastapi import APIRouter, HTTPException, Depends
from typing import List, Optional
import httpx
import json
from datetime import datetime
from ..auth.auth import get_current_user
from models.schemas import User
# Router on which the /fhir/... endpoints defined in this module are registered.
router = APIRouter()
# Base URL of the HAPI FHIR test server (R4 endpoint); every request built by
# FHIRIntegration is made relative to this URL.
HAPI_FHIR_BASE_URL = "https://hapi.fhir.org/baseR4"
class FHIRIntegration:
    """Async, read-only client for the HAPI FHIR test server.

    Each public method performs a single GET relative to ``self.base_url``
    through one shared ``httpx.AsyncClient`` and returns the decoded JSON
    body. Any failure is reported as a ``fastapi.HTTPException`` so that
    route handlers can propagate it to the caller.
    """

    def __init__(self):
        """Point the client at ``HAPI_FHIR_BASE_URL`` with a 30 second timeout."""
        self.base_url = HAPI_FHIR_BASE_URL
        # One client is reused for every request made through this instance.
        self.client = httpx.AsyncClient(timeout=30.0)

    async def _get_json(
        self,
        path: str,
        params: Optional[dict] = None,
        *,
        error_prefix: str,
        not_found_prefix: Optional[str] = None,
    ) -> dict:
        """GET ``{base_url}/{path}`` and return the decoded JSON body.

        Args:
            path: Path relative to ``self.base_url`` (no leading slash).
            params: Optional query parameters.
            error_prefix: Prefix of the ``detail`` message for status 500 errors.
            not_found_prefix: When given, an upstream 404 is reported as a 404
                whose ``detail`` starts with this prefix instead of as a 500.

        Raises:
            HTTPException: 404 for an upstream 404 (only if ``not_found_prefix``
                is set), otherwise 500 for any failure.
        """
        url = f"{self.base_url}/{path}"
        try:
            response = await self.client.get(url, params=params)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # Only a genuine upstream 404 means "resource does not exist";
            # every other status is a server-side problem.
            if not_found_prefix is not None and e.response.status_code == 404:
                raise HTTPException(
                    status_code=404, detail=f"{not_found_prefix}: {str(e)}"
                ) from e
            raise HTTPException(
                status_code=500, detail=f"{error_prefix}: {str(e)}"
            ) from e
        except Exception as e:
            # Timeouts, connection errors, invalid JSON, ...
            raise HTTPException(
                status_code=500, detail=f"{error_prefix}: {str(e)}"
            ) from e

    async def _search_by_patient(
        self, resource_type: str, patient_id: str, error_prefix: str
    ) -> dict:
        """Search ``resource_type`` for up to 100 resources of one patient."""
        params = {
            "patient": patient_id,
            "_count": 100
        }
        return await self._get_json(resource_type, params, error_prefix=error_prefix)

    async def search_patients(self, limit: int = 10, offset: int = 0) -> dict:
        """Search for patients in HAPI FHIR Test Server.

        ``limit`` is sent as ``_count`` and ``offset`` as ``_getpagesoffset``.

        Raises:
            HTTPException: 500 on any failure.
        """
        params = {
            "_count": limit,
            "_getpagesoffset": offset
        }
        return await self._get_json("Patient", params, error_prefix="FHIR server error")

    async def get_patient_by_id(self, patient_id: str) -> dict:
        """Get a specific patient by ID from HAPI FHIR Test Server.

        Raises:
            HTTPException: 404 when the server answers 404 for this id,
                500 for any other failure (network error, upstream 5xx, ...).
        """
        from urllib.parse import quote

        # The id becomes a URL path segment: escape it so characters such as
        # "/", "?" or "#" cannot change which upstream resource is requested.
        safe_id = quote(patient_id, safe="")
        return await self._get_json(
            f"Patient/{safe_id}",
            error_prefix="FHIR server error",
            not_found_prefix="Patient not found",
        )

    async def get_patient_observations(self, patient_id: str) -> dict:
        """Get observations (vital signs, lab results) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_by_patient(
            "Observation", patient_id, "Error fetching observations"
        )

    async def get_patient_medications(self, patient_id: str) -> dict:
        """Get medications (``MedicationRequest`` resources) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_by_patient(
            "MedicationRequest", patient_id, "Error fetching medications"
        )

    async def get_patient_conditions(self, patient_id: str) -> dict:
        """Get conditions (diagnoses) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_by_patient(
            "Condition", patient_id, "Error fetching conditions"
        )

    async def get_patient_encounters(self, patient_id: str) -> dict:
        """Get encounters (visits) for a patient.

        Raises:
            HTTPException: 500 on any failure.
        """
        return await self._search_by_patient(
            "Encounter", patient_id, "Error fetching encounters"
        )
# Module-level FHIRIntegration instance used by the route handlers below.
# Constructing it creates the underlying httpx.AsyncClient when this module
# is imported.
fhir_integration = FHIRIntegration()
@router.get("/fhir/patients")
async def get_fhir_patients(
    limit: int = 10,
    offset: int = 0,
    current_user: User = Depends(get_current_user)
):
    """Get patients from HAPI FHIR Test Server.

    Fetches one page of ``Patient`` resources and flattens each one into a
    dict with the keys ``fhir_id``, ``full_name``, ``gender``,
    ``date_of_birth``, ``address``, ``phone`` and ``email``.

    Args:
        limit: Page size forwarded to the FHIR search.
        offset: Page offset forwarded to the FHIR search.
        current_user: Resolved by the ``get_current_user`` dependency; not
            otherwise used here.

    Returns:
        ``{"patients": [...], "total": <bundle total or page size>, "count": <page size>}``

    Raises:
        HTTPException: whatever ``search_patients`` raised, or 500 if the
            transformation itself fails.
    """
    try:
        patients_data = await fhir_integration.search_patients(limit, offset)

        # Transform FHIR patients to our format
        transformed_patients = []
        for entry in patients_data.get("entry", []):
            fhir_patient = entry.get("resource", {})

            patient_info = {
                "fhir_id": fhir_patient.get("id"),
                "full_name": "",
                "gender": fhir_patient.get("gender", "unknown"),
                "date_of_birth": "",
                "address": "",
                "phone": "",
                "email": ""
            }

            # Name: given names followed by family name, for every name entry.
            if fhir_patient.get("name"):
                name_parts = []
                for name in fhir_patient["name"]:
                    if name.get("given"):
                        name_parts.extend(name["given"])
                    if name.get("family"):
                        name_parts.append(name["family"])
                patient_info["full_name"] = " ".join(name_parts)

            if fhir_patient.get("birthDate"):
                patient_info["date_of_birth"] = fhir_patient["birthDate"]

            # Address: lines, city, state, postal code, for every address entry.
            if fhir_patient.get("address"):
                address_parts = []
                for address in fhir_patient["address"]:
                    if address.get("line"):
                        address_parts.extend(address["line"])
                    for key in ("city", "state", "postalCode"):
                        if address.get(key):
                            address_parts.append(address[key])
                patient_info["address"] = ", ".join(address_parts)

            # Contact points: the last phone / email entry wins.
            for telecom in fhir_patient.get("telecom") or []:
                if telecom.get("system") == "phone":
                    patient_info["phone"] = telecom.get("value", "")
                elif telecom.get("system") == "email":
                    patient_info["email"] = telecom.get("value", "")

            transformed_patients.append(patient_info)

        return {
            "patients": transformed_patients,
            "total": patients_data.get("total", len(transformed_patients)),
            "count": len(transformed_patients)
        }
    except HTTPException:
        # Already carries the right status and detail (raised by
        # search_patients); wrapping it again would garble the message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching FHIR patients: {str(e)}")
@router.get("/fhir/patients/{patient_id}")
async def get_fhir_patient_details(
    patient_id: str,
    current_user: User = Depends(get_current_user)
):
    """Get detailed patient information from HAPI FHIR Test Server.

    Combines the ``Patient`` resource with the patient's observations,
    medication requests, conditions and encounters into one flat dict.
    Observations and conditions without ``code.text``, medications without
    ``medicationCodeableConcept.text`` and encounters without ``type`` are
    skipped.

    Args:
        patient_id: FHIR id of the patient.
        current_user: Resolved by the ``get_current_user`` dependency; not
            otherwise used here.

    Raises:
        HTTPException: whatever the FHIR client raised (e.g. 404 when the
            patient does not exist), or 500 if the transformation fails.
    """
    import asyncio

    try:
        # Fetch the patient first so a missing patient fails fast with a 404.
        patient_data = await fhir_integration.get_patient_by_id(patient_id)

        # The four lookups are independent of each other: run them concurrently.
        observations, medications, conditions, encounters = await asyncio.gather(
            fhir_integration.get_patient_observations(patient_id),
            fhir_integration.get_patient_medications(patient_id),
            fhir_integration.get_patient_conditions(patient_id),
            fhir_integration.get_patient_encounters(patient_id),
        )

        patient_info = {
            "fhir_id": patient_data.get("id"),
            "full_name": "",
            "gender": patient_data.get("gender", "unknown"),
            "date_of_birth": patient_data.get("birthDate", ""),
            "address": "",
            "phone": "",
            "email": "",
            "observations": [],
            "medications": [],
            "conditions": [],
            "encounters": []
        }

        # Name: given names followed by family name, for every name entry.
        if patient_data.get("name"):
            name_parts = []
            for name in patient_data["name"]:
                if name.get("given"):
                    name_parts.extend(name["given"])
                if name.get("family"):
                    name_parts.append(name["family"])
            patient_info["full_name"] = " ".join(name_parts)

        # Address: lines, city, state, postal code, for every address entry.
        if patient_data.get("address"):
            address_parts = []
            for address in patient_data["address"]:
                if address.get("line"):
                    address_parts.extend(address["line"])
                for key in ("city", "state", "postalCode"):
                    if address.get(key):
                        address_parts.append(address[key])
            patient_info["address"] = ", ".join(address_parts)

        # Contact points: the last phone / email entry wins.
        for telecom in patient_data.get("telecom") or []:
            if telecom.get("system") == "phone":
                patient_info["phone"] = telecom.get("value", "")
            elif telecom.get("system") == "email":
                patient_info["email"] = telecom.get("value", "")

        # Transform observations
        for obs in observations.get("entry", []):
            resource = obs.get("resource", {})
            if resource.get("code", {}).get("text"):
                quantity = resource.get("valueQuantity", {})
                patient_info["observations"].append({
                    "type": resource["code"]["text"],
                    "value": quantity.get("value"),
                    "unit": quantity.get("unit"),
                    "date": resource.get("effectiveDateTime", "")
                })

        # Transform medications
        for med in medications.get("entry", []):
            resource = med.get("resource", {})
            if resource.get("medicationCodeableConcept", {}).get("text"):
                # `or [{}]` also covers a present-but-empty list, which would
                # otherwise raise IndexError on [0].
                dosage_instructions = resource.get("dosageInstruction") or [{}]
                patient_info["medications"].append({
                    "name": resource["medicationCodeableConcept"]["text"],
                    "status": resource.get("status", ""),
                    "prescribed_date": resource.get("authoredOn", ""),
                    "dosage": dosage_instructions[0].get("text", "")
                })

        # Transform conditions
        for condition in conditions.get("entry", []):
            resource = condition.get("resource", {})
            if resource.get("code", {}).get("text"):
                patient_info["conditions"].append({
                    "name": resource["code"]["text"],
                    "status": resource.get("clinicalStatus", {}).get("text", ""),
                    "onset_date": resource.get("onsetDateTime", ""),
                    "severity": resource.get("severity", {}).get("text", "")
                })

        # Transform encounters
        for encounter in encounters.get("entry", []):
            resource = encounter.get("resource", {})
            if resource.get("type"):
                period = resource.get("period", {})
                patient_info["encounters"].append({
                    "type": resource["type"][0].get("text", ""),
                    "status": resource.get("status", ""),
                    "start_date": period.get("start", ""),
                    "end_date": period.get("end", ""),
                    "service_provider": resource.get("serviceProvider", {}).get("display", "")
                })

        return patient_info
    except HTTPException:
        # Keep the status chosen by the FHIR client (e.g. 404 for an unknown
        # patient) instead of flattening everything into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching FHIR patient details: {str(e)}")
@router.post("/fhir/import-patient/{patient_id}")
async def import_fhir_patient(
    patient_id: str,
    current_user: User = Depends(get_current_user)
):
    """Import a patient from HAPI FHIR Test Server to our database.

    Fetches the ``Patient`` resource, flattens it into the local patient
    document format, and inserts it into ``patients_collection`` with the
    current user as ``assigned_doctor_id``.

    Args:
        patient_id: FHIR id of the patient to import.
        current_user: Resolved by the ``get_current_user`` dependency; its
            ``id`` is stored as the assigned doctor.

    Returns:
        ``{"message": ..., "patient_id": <inserted document id>, "fhir_id": ...}``

    Raises:
        HTTPException: 400 when a document with the same ``fhir_id`` already
            exists, whatever ``get_patient_by_id`` raised (e.g. 404), or 500
            for any other failure.
    """
    try:
        from db.mongo import patients_collection

        # Get patient data from FHIR server
        patient_data = await fhir_integration.get_patient_by_id(patient_id)
        fhir_id = patient_data.get("id")

        # Check if patient already exists
        # NOTE(review): check-then-insert is not atomic; a unique index on
        # fhir_id would be needed to rule out duplicates under concurrency.
        existing_patient = await patients_collection.find_one({"fhir_id": fhir_id})
        if existing_patient:
            raise HTTPException(status_code=400, detail="Patient already exists in database")

        # Single timestamp so created_at == updated_at on a fresh import.
        now = datetime.now()
        transformed_patient = {
            "fhir_id": fhir_id,
            "full_name": "",
            "gender": patient_data.get("gender", "unknown"),
            "date_of_birth": patient_data.get("birthDate", ""),
            "address": "",
            "phone": "",
            "email": "",
            "source": "fhir_import",
            "status": "active",
            "assigned_doctor_id": str(current_user.id),
            "created_at": now,
            "updated_at": now
        }

        # Name: given names followed by family name, for every name entry.
        if patient_data.get("name"):
            name_parts = []
            for name in patient_data["name"]:
                if name.get("given"):
                    name_parts.extend(name["given"])
                if name.get("family"):
                    name_parts.append(name["family"])
            transformed_patient["full_name"] = " ".join(name_parts)

        # Address: lines, city, state, postal code, for every address entry.
        if patient_data.get("address"):
            address_parts = []
            for address in patient_data["address"]:
                if address.get("line"):
                    address_parts.extend(address["line"])
                for key in ("city", "state", "postalCode"):
                    if address.get(key):
                        address_parts.append(address[key])
            transformed_patient["address"] = ", ".join(address_parts)

        # Contact points: the last phone / email entry wins.
        for telecom in patient_data.get("telecom") or []:
            if telecom.get("system") == "phone":
                transformed_patient["phone"] = telecom.get("value", "")
            elif telecom.get("system") == "email":
                transformed_patient["email"] = telecom.get("value", "")

        # Insert patient into database
        result = await patients_collection.insert_one(transformed_patient)

        return {
            "message": "Patient imported successfully",
            "patient_id": str(result.inserted_id),
            "fhir_id": fhir_id
        }
    except HTTPException:
        # Deliberate responses (400 duplicate, 404 unknown patient) must reach
        # the client unchanged rather than being re-wrapped as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error importing FHIR patient: {str(e)}")