cps-api-tx/api/services/fhir_integration.py
Ali2206's picture
device token
f0d524c
from datetime import datetime
from typing import Any, Dict, List, Optional

from api.utils.fhir_client import HAPIFHIRClient
from db.mongo import db
class HAPIFHIRIntegrationService:
    """Import, sync and inspect patients coming from a HAPI FHIR server.

    Patients are read through ``HAPIFHIRClient`` and stored in the ``patients``
    collection of the project database handle ``db``.

    NOTE(review): the FHIR client methods are called without ``await`` inside
    ``async`` methods, so they appear to be blocking calls — confirm whether
    they should be moved off the event loop.
    """

    # Demographic fields that must be non-empty for a patient to count as complete.
    _REQUIRED_DEMOGRAPHIC_FIELDS = ('full_name', 'gender', 'date_of_birth', 'address')
    # Demographic fields that only raise the completeness score.
    _OPTIONAL_DEMOGRAPHIC_FIELDS = ('phone', 'email', 'marital_status', 'language')
    # Keys of ``clinical_data`` that count as "medical data" when non-empty.
    _MEDICAL_DATA_FIELDS = ('observations', 'medications', 'conditions')

    def __init__(self):
        self.fhir_client = HAPIFHIRClient()

    @staticmethod
    def _empty_import_result(errors: Optional[List[str]] = None) -> dict:
        """Return the import result payload used when nothing was imported."""
        return {
            "imported_count": 0,
            "skipped_count": 0,
            "filtered_count": 0,
            "total_found": 0,
            "imported_patients": [],
            "skipped_patients": [],
            "filtered_patients": [],
            "validation_summary": {},
            "errors": list(errors) if errors else [],
        }

    def _fetch_clinical_resources(self, patient_id: str) -> Dict[str, Any]:
        """Fetch observations, medications and conditions for one patient."""
        return {
            'observations': self.fhir_client.get_patient_observations(patient_id),
            'medications': self.fhir_client.get_patient_medications(patient_id),
            'conditions': self.fhir_client.get_patient_conditions(patient_id),
        }

    def _validate_patient_data_completeness(
        self,
        patient: Dict,
        require_medical_data: bool = False,
        min_completeness_score: float = 0.7,
    ) -> Dict[str, Any]:
        """Score how complete a patient record is.

        Args:
            patient: Patient data dictionary.
            require_medical_data: When true, a patient without any
                observations, medications or conditions is never complete.
            min_completeness_score: Minimum validation score (0-1) needed for
                the patient to be considered complete.

        Returns:
            Dict with validation results:
            {
                "is_complete": bool,
                "missing_fields": List[str],
                "has_medical_data": bool,
                "validation_score": float (0-1),
                "demographic_completeness": float (0-1)
            }
        """
        def is_present(value) -> bool:
            # Whitespace-only strings count as missing, like empty values.
            if isinstance(value, str):
                return value.strip() != ''
            return bool(value)

        missing_fields = [
            field for field in self._REQUIRED_DEMOGRAPHIC_FIELDS
            if not is_present(patient.get(field))
        ]
        present_required = len(self._REQUIRED_DEMOGRAPHIC_FIELDS) - len(missing_fields)
        present_optional = sum(
            1 for field in self._OPTIONAL_DEMOGRAPHIC_FIELDS
            if is_present(patient.get(field))
        )

        # A missing, null or non-dict ``clinical_data`` simply means "no medical data".
        clinical_data = patient.get('clinical_data')
        has_medical_data = isinstance(clinical_data, dict) and any(
            clinical_data.get(field) for field in self._MEDICAL_DATA_FIELDS
        )

        total_fields = (
            len(self._REQUIRED_DEMOGRAPHIC_FIELDS) + len(self._OPTIONAL_DEMOGRAPHIC_FIELDS)
        )
        validation_score = (
            (present_required + present_optional) / total_fields if total_fields > 0 else 0.0
        )

        is_complete = not missing_fields and validation_score >= min_completeness_score

        if require_medical_data and not has_medical_data:
            is_complete = False
            missing_fields.append('medical_data')

        return {
            "is_complete": is_complete,
            "missing_fields": missing_fields,
            "has_medical_data": has_medical_data,
            "validation_score": validation_score,
            # Same ratio as the validation score; kept as a separate key for callers.
            "demographic_completeness": validation_score,
        }

    async def import_patients_from_hapi(
        self,
        limit: int = 20,
        require_medical_data: bool = False,
        min_completeness_score: float = 0.7,
    ) -> dict:
        """Import patients from HAPI FHIR, skipping duplicates and incomplete records.

        Args:
            limit: Number of patients to fetch from HAPI FHIR.
            require_medical_data: Whether to require patients to have medical data.
            min_completeness_score: Minimum validation score (0-1) for a patient
                to be considered complete.

        Returns:
            Dict with the imported / skipped / filtered counts and lists, a
            ``validation_summary`` and the list of per-patient ``errors``.
            Failures are reported in ``errors`` rather than raised.
        """
        try:
            print(f"Fetching {limit} patients from HAPI FHIR...")
            patients = self.fhir_client.get_patients(limit=limit)

            if not patients:
                print("No patients found in HAPI FHIR")
                return self._empty_import_result()

            print(f"Found {len(patients)} patients, checking for duplicates and data completeness...")

            imported_patients = []
            skipped_patients = []
            filtered_patients = []
            errors = []
            validation_summary = {
                "total_processed": len(patients),
                "complete_patients": 0,
                "incomplete_patients": 0,
                "with_medical_data": 0,
                "without_medical_data": 0,
                "average_completeness_score": 0.0,
            }
            total_completeness_score = 0.0

            for patient in patients:
                try:
                    # Check if patient already exists by multiple criteria
                    existing = await db.patients.find_one({
                        "$or": [
                            {"fhir_id": patient['fhir_id']},
                            {"full_name": patient['full_name'], "date_of_birth": patient['date_of_birth']},
                            {"demographics.fhir_id": patient['fhir_id']},
                        ]
                    })
                    if existing:
                        skipped_patients.append(patient['full_name'])
                        print(f"Patient {patient['full_name']} already exists (fhir_id: {patient['fhir_id']}), skipping...")
                        continue

                    # Enhance patient data with additional FHIR data
                    enhanced_patient = await self._enhance_patient_data(patient)

                    # The caller's threshold is applied inside the validator so
                    # that values below the default are honoured as well.
                    validation_result = self._validate_patient_data_completeness(
                        enhanced_patient,
                        require_medical_data=require_medical_data,
                        min_completeness_score=min_completeness_score,
                    )

                    total_completeness_score += validation_result["validation_score"]
                    if validation_result["has_medical_data"]:
                        validation_summary["with_medical_data"] += 1
                    else:
                        validation_summary["without_medical_data"] += 1

                    if not validation_result["is_complete"]:
                        filtered_patients.append({
                            "name": patient['full_name'],
                            "fhir_id": patient['fhir_id'],
                            "missing_fields": validation_result["missing_fields"],
                            "completeness_score": validation_result["validation_score"],
                            "has_medical_data": validation_result["has_medical_data"],
                        })
                        print(f"Patient {patient['full_name']} filtered out - missing: {validation_result['missing_fields']}, score: {validation_result['validation_score']:.2f}")
                        validation_summary["incomplete_patients"] += 1
                        continue

                    validation_summary["complete_patients"] += 1

                    # Insert into database
                    result = await db.patients.insert_one(enhanced_patient)
                    if result.inserted_id:
                        imported_patients.append({
                            "name": patient['full_name'],
                            "fhir_id": patient['fhir_id'],
                            "completeness_score": validation_result["validation_score"],
                            "has_medical_data": validation_result["has_medical_data"],
                        })
                        print(f"Imported patient: {patient['full_name']} (ID: {result.inserted_id}, Score: {validation_result['validation_score']:.2f})")
                except Exception as e:
                    # One bad record must not abort the whole batch.
                    error_msg = f"Error importing patient {patient.get('full_name', 'Unknown')}: {e}"
                    errors.append(error_msg)
                    print(error_msg)
                    continue

            # Average only over patients that were actually scored; duplicates
            # and errored records never contributed to the running total.
            validated_count = (
                validation_summary["complete_patients"] + validation_summary["incomplete_patients"]
            )
            if validated_count > 0:
                validation_summary["average_completeness_score"] = total_completeness_score / validated_count

            imported_count = len(imported_patients)
            skipped_count = len(skipped_patients)
            filtered_count = len(filtered_patients)

            print(f"Import completed: {imported_count} imported, {skipped_count} skipped, {filtered_count} filtered out")
            print(f"Validation summary: {validation_summary}")

            return {
                "imported_count": imported_count,
                "skipped_count": skipped_count,
                "filtered_count": filtered_count,
                "total_found": len(patients),
                "imported_patients": imported_patients,
                "skipped_patients": skipped_patients,
                "filtered_patients": filtered_patients,
                "validation_summary": validation_summary,
                "errors": errors,
            }
        except Exception as e:
            print(f"Error importing patients: {e}")
            return self._empty_import_result([str(e)])

    async def _enhance_patient_data(self, patient: Dict) -> Dict:
        """Attach demographics, clinical data and import metadata to a patient.

        Args:
            patient: Patient dictionary as returned by the FHIR client; it must
                contain ``fhir_id``.

        Returns:
            A new dict with the original keys plus ``demographics``,
            ``clinical_data`` and ``metadata``. If anything fails, the error is
            printed and the original ``patient`` is returned unchanged.
        """
        try:
            patient_id = patient['fhir_id']
            clinical_resources = self._fetch_clinical_resources(patient_id)

            # One timestamp so import_date and last_updated agree exactly.
            timestamp = datetime.now().isoformat()

            return {
                # Basic demographics
                **patient,
                # ``.get`` keeps the fetched clinical data even when a
                # demographic key is absent; validation reports the gap.
                'demographics': {
                    'id': patient.get('id'),
                    'fhir_id': patient_id,
                    'full_name': patient.get('full_name', ''),
                    'gender': patient.get('gender', ''),
                    'date_of_birth': patient.get('date_of_birth', ''),
                    'address': patient.get('address', ''),
                    'phone': patient.get('phone', ''),
                    'email': patient.get('email', ''),
                    'marital_status': patient.get('marital_status', 'Unknown'),
                    'language': patient.get('language', 'English'),
                },
                'clinical_data': {
                    **clinical_resources,
                    'notes': [],  # Will be populated separately
                    'encounters': [],  # Will be populated separately
                },
                'metadata': {
                    'source': 'hapi_fhir',
                    'import_date': timestamp,
                    'last_updated': timestamp,
                    'fhir_server': 'https://hapi.fhir.org/baseR4',
                },
            }
        except Exception as e:
            print(f"Error enhancing patient data: {e}")
            return patient

    async def sync_patient_data(self, patient_id: str) -> bool:
        """Refresh one patient from HAPI FHIR and upsert it into the database.

        Args:
            patient_id: FHIR id of the patient to sync.

        Returns:
            True when a document was modified or inserted; False when the
            patient was not found, nothing changed, or an error occurred.
        """
        try:
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                print(f"Patient {patient_id} not found in HAPI FHIR")
                return False

            enhanced_patient = await self._enhance_patient_data(patient)

            # NOTE(review): ``$set`` replaces the whole ``metadata`` sub-document,
            # so ``import_date`` is overwritten on every sync — confirm intended.
            result = await db.patients.update_one(
                {"fhir_id": patient_id},
                {"$set": enhanced_patient},
                upsert=True,
            )

            if result.modified_count > 0 or result.upserted_id:
                print(f"Synced patient: {patient['full_name']}")
                return True

            print(f"No changes for patient: {patient['full_name']}")
            return False
        except Exception as e:
            print(f"Error syncing patient {patient_id}: {e}")
            return False

    async def get_patient_statistics(self) -> Dict:
        """Return counts of stored patients plus one sample HAPI FHIR patient.

        Returns:
            Dict with ``total_patients``, ``hapi_fhir_patients`` and
            ``sample_patient`` (``None`` when no match), or ``{}`` on error.
        """
        try:
            # Enhanced documents record the origin under ``metadata.source``;
            # a top-level ``source`` is matched too in case the raw patient
            # dict carries one.
            hapi_filter = {
                "$or": [
                    {"source": "hapi_fhir"},
                    {"metadata.source": "hapi_fhir"},
                ]
            }

            total_patients = await db.patients.count_documents({})
            hapi_patients = await db.patients.count_documents(hapi_filter)

            sample_patient = await db.patients.find_one(hapi_filter)
            if sample_patient:
                # Convert ObjectId to string for JSON serialization
                sample_patient['_id'] = str(sample_patient['_id'])

            return {
                'total_patients': total_patients,
                'hapi_fhir_patients': hapi_patients,
                'sample_patient': sample_patient,
            }
        except Exception as e:
            print(f"Error getting statistics: {e}")
            return {}

    async def get_hapi_patients(self, limit: int = 50) -> List[Dict]:
        """Return up to ``limit`` patients from HAPI FHIR without importing them.

        Returns an empty list when the fetch fails.
        """
        try:
            return self.fhir_client.get_patients(limit=limit)
        except Exception as e:
            print(f"Error fetching HAPI patients: {e}")
            return []

    async def get_hapi_patient_details(self, patient_id: str) -> Optional[Dict]:
        """Return a HAPI FHIR patient together with its clinical resources.

        Args:
            patient_id: FHIR id of the patient to look up.

        Returns:
            Dict with ``patient``, ``observations``, ``medications`` and
            ``conditions``; ``None`` when the patient is not found or the
            fetch fails.
        """
        try:
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                return None

            return {
                'patient': patient,
                **self._fetch_clinical_resources(patient_id),
            }
        except Exception as e:
            print(f"Error fetching patient details: {e}")
            return None