from datetime import datetime from typing import List, Dict, Optional from api.utils.fhir_client import HAPIFHIRClient from db.mongo import db class HAPIFHIRIntegrationService: """ Service to integrate HAPI FHIR data with your existing database """ def __init__(self): self.fhir_client = HAPIFHIRClient() def _validate_patient_data_completeness(self, patient: Dict, require_medical_data: bool = False) -> Dict[str, any]: """ Validate if a patient has complete data Args: patient: Patient data dictionary require_medical_data: Whether to require medical data (observations, medications, conditions) Returns: Dict with validation results: { "is_complete": bool, "missing_fields": List[str], "has_medical_data": bool, "validation_score": float (0-1) } """ required_demographic_fields = [ 'full_name', 'gender', 'date_of_birth', 'address' ] optional_demographic_fields = [ 'phone', 'email', 'marital_status', 'language' ] medical_data_fields = [ 'observations', 'medications', 'conditions' ] missing_fields = [] validation_score = 0.0 total_fields = len(required_demographic_fields) + len(optional_demographic_fields) present_fields = 0 # Check required demographic fields for field in required_demographic_fields: value = patient.get(field, '') if not value or (isinstance(value, str) and value.strip() == ''): missing_fields.append(field) else: present_fields += 1 # Check optional demographic fields for field in optional_demographic_fields: value = patient.get(field, '') if value and (not isinstance(value, str) or value.strip() != ''): present_fields += 1 # Check medical data has_medical_data = False if 'clinical_data' in patient: clinical_data = patient['clinical_data'] for field in medical_data_fields: if field in clinical_data and clinical_data[field]: has_medical_data = True break # Calculate validation score validation_score = present_fields / total_fields if total_fields > 0 else 0.0 # Determine if patient is complete is_complete = len(missing_fields) == 0 and validation_score >= 0.7 # If medical data is required, check if patient has it if require_medical_data and not has_medical_data: is_complete = False missing_fields.append('medical_data') return { "is_complete": is_complete, "missing_fields": missing_fields, "has_medical_data": has_medical_data, "validation_score": validation_score, "demographic_completeness": present_fields / len(required_demographic_fields + optional_demographic_fields) if (len(required_demographic_fields) + len(optional_demographic_fields)) > 0 else 0.0 } async def import_patients_from_hapi(self, limit: int = 20, require_medical_data: bool = False, min_completeness_score: float = 0.7) -> dict: """ Import patients from HAPI FHIR Test Server with data completeness validation Args: limit: Number of patients to fetch from HAPI FHIR require_medical_data: Whether to require patients to have medical data min_completeness_score: Minimum validation score (0-1) for a patient to be considered complete """ try: print(f"Fetching {limit} patients from HAPI FHIR...") patients = self.fhir_client.get_patients(limit=limit) if not patients: print("No patients found in HAPI FHIR") return { "imported_count": 0, "skipped_count": 0, "filtered_count": 0, "total_found": 0, "imported_patients": [], "skipped_patients": [], "filtered_patients": [], "validation_summary": {}, "errors": [] } print(f"Found {len(patients)} patients, checking for duplicates and data completeness...") imported_count = 0 skipped_count = 0 filtered_count = 0 imported_patients = [] skipped_patients = [] filtered_patients = [] errors = [] validation_summary = { "total_processed": len(patients), "complete_patients": 0, "incomplete_patients": 0, "with_medical_data": 0, "without_medical_data": 0, "average_completeness_score": 0.0 } total_completeness_score = 0.0 for patient in patients: try: # Check if patient already exists by multiple criteria existing = await db.patients.find_one({ "$or": [ {"fhir_id": patient['fhir_id']}, {"full_name": patient['full_name'], "date_of_birth": patient['date_of_birth']}, {"demographics.fhir_id": patient['fhir_id']} ] }) if existing: skipped_count += 1 skipped_patients.append(patient['full_name']) print(f"Patient {patient['full_name']} already exists (fhir_id: {patient['fhir_id']}), skipping...") continue # Enhance patient data with additional FHIR data enhanced_patient = await self._enhance_patient_data(patient) # Validate data completeness validation_result = self._validate_patient_data_completeness( enhanced_patient, require_medical_data=require_medical_data ) # Update validation summary total_completeness_score += validation_result["validation_score"] if validation_result["has_medical_data"]: validation_summary["with_medical_data"] += 1 else: validation_summary["without_medical_data"] += 1 # Check if patient meets completeness criteria if not validation_result["is_complete"] or validation_result["validation_score"] < min_completeness_score: filtered_count += 1 filtered_patients.append({ "name": patient['full_name'], "fhir_id": patient['fhir_id'], "missing_fields": validation_result["missing_fields"], "completeness_score": validation_result["validation_score"], "has_medical_data": validation_result["has_medical_data"] }) print(f"Patient {patient['full_name']} filtered out - missing: {validation_result['missing_fields']}, score: {validation_result['validation_score']:.2f}") validation_summary["incomplete_patients"] += 1 continue validation_summary["complete_patients"] += 1 # Insert into database result = await db.patients.insert_one(enhanced_patient) if result.inserted_id: imported_count += 1 imported_patients.append({ "name": patient['full_name'], "fhir_id": patient['fhir_id'], "completeness_score": validation_result["validation_score"], "has_medical_data": validation_result["has_medical_data"] }) print(f"Imported patient: {patient['full_name']} (ID: {result.inserted_id}, Score: {validation_result['validation_score']:.2f})") except Exception as e: error_msg = f"Error importing patient {patient.get('full_name', 'Unknown')}: {e}" errors.append(error_msg) print(error_msg) continue # Calculate average completeness score if validation_summary["total_processed"] > 0: validation_summary["average_completeness_score"] = total_completeness_score / validation_summary["total_processed"] print(f"Import completed: {imported_count} imported, {skipped_count} skipped, {filtered_count} filtered out") print(f"Validation summary: {validation_summary}") return { "imported_count": imported_count, "skipped_count": skipped_count, "filtered_count": filtered_count, "total_found": len(patients), "imported_patients": imported_patients, "skipped_patients": skipped_patients, "filtered_patients": filtered_patients, "validation_summary": validation_summary, "errors": errors } except Exception as e: print(f"Error importing patients: {e}") return { "imported_count": 0, "skipped_count": 0, "filtered_count": 0, "total_found": 0, "imported_patients": [], "skipped_patients": [], "filtered_patients": [], "validation_summary": {}, "errors": [str(e)] } async def _enhance_patient_data(self, patient: Dict) -> Dict: """ Enhance patient data with additional FHIR resources """ try: patient_id = patient['fhir_id'] # Fetch additional data from HAPI FHIR observations = self.fhir_client.get_patient_observations(patient_id) medications = self.fhir_client.get_patient_medications(patient_id) conditions = self.fhir_client.get_patient_conditions(patient_id) # Structure the enhanced patient data enhanced_patient = { # Basic demographics **patient, # Clinical data 'demographics': { 'id': patient['id'], 'fhir_id': patient['fhir_id'], 'full_name': patient['full_name'], 'gender': patient['gender'], 'date_of_birth': patient['date_of_birth'], 'address': patient['address'], 'phone': patient.get('phone', ''), 'email': patient.get('email', ''), 'marital_status': patient.get('marital_status', 'Unknown'), 'language': patient.get('language', 'English') }, 'clinical_data': { 'observations': observations, 'medications': medications, 'conditions': conditions, 'notes': [], # Will be populated separately 'encounters': [] # Will be populated separately }, 'metadata': { 'source': 'hapi_fhir', 'import_date': datetime.now().isoformat(), 'last_updated': datetime.now().isoformat(), 'fhir_server': 'https://hapi.fhir.org/baseR4' } } return enhanced_patient except Exception as e: print(f"Error enhancing patient data: {e}") return patient async def sync_patient_data(self, patient_id: str) -> bool: """ Sync a specific patient's data from HAPI FHIR """ try: # Get patient from HAPI FHIR patient = self.fhir_client.get_patient_by_id(patient_id) if not patient: print(f"Patient {patient_id} not found in HAPI FHIR") return False # Enhance with additional data enhanced_patient = await self._enhance_patient_data(patient) # Update in database result = await db.patients.update_one( {"fhir_id": patient_id}, {"$set": enhanced_patient}, upsert=True ) if result.modified_count > 0 or result.upserted_id: print(f"Synced patient: {patient['full_name']}") return True else: print(f"No changes for patient: {patient['full_name']}") return False except Exception as e: print(f"Error syncing patient {patient_id}: {e}") return False async def get_patient_statistics(self) -> Dict: """ Get statistics about imported patients """ try: total_patients = await db.patients.count_documents({}) hapi_patients = await db.patients.count_documents({"source": "hapi_fhir"}) # Get sample patient data and convert ObjectId to string sample_patient = await db.patients.find_one({"source": "hapi_fhir"}) if sample_patient: # Convert ObjectId to string for JSON serialization sample_patient['_id'] = str(sample_patient['_id']) stats = { 'total_patients': total_patients, 'hapi_fhir_patients': hapi_patients, 'sample_patient': sample_patient } return stats except Exception as e: print(f"Error getting statistics: {e}") return {} async def get_hapi_patients(self, limit: int = 50) -> List[Dict]: """ Get patients from HAPI FHIR without importing them """ try: patients = self.fhir_client.get_patients(limit=limit) return patients except Exception as e: print(f"Error fetching HAPI patients: {e}") return [] async def get_hapi_patient_details(self, patient_id: str) -> Optional[Dict]: """ Get detailed information for a specific HAPI FHIR patient """ try: patient = self.fhir_client.get_patient_by_id(patient_id) if not patient: return None # Get additional data observations = self.fhir_client.get_patient_observations(patient_id) medications = self.fhir_client.get_patient_medications(patient_id) conditions = self.fhir_client.get_patient_conditions(patient_id) return { 'patient': patient, 'observations': observations, 'medications': medications, 'conditions': conditions } except Exception as e: print(f"Error fetching patient details: {e}") return None