Spaces:
Sleeping
Sleeping
from datetime import datetime
from typing import Any, Dict, List, Optional

from api.utils.fhir_client import HAPIFHIRClient
from db.mongo import db
class HAPIFHIRIntegrationService:
    """
    Service to integrate HAPI FHIR data with your existing database.

    Patients are fetched through ``HAPIFHIRClient``, enriched with their
    observations, medications and conditions, validated for completeness and
    stored in the ``db.patients`` collection.
    """

    def __init__(self):
        self.fhir_client = HAPIFHIRClient()

    @staticmethod
    def _empty_import_result(errors: Optional[List[str]] = None) -> Dict[str, Any]:
        """Return the import-result shape with all counts at zero and the given errors."""
        return {
            "imported_count": 0,
            "skipped_count": 0,
            "filtered_count": 0,
            "total_found": 0,
            "imported_patients": [],
            "skipped_patients": [],
            "filtered_patients": [],
            "validation_summary": {},
            "errors": list(errors) if errors else [],
        }

    @staticmethod
    def _duplicate_criteria(patient: Dict) -> List[Dict[str, Any]]:
        """Build the ``$or`` clauses used to detect an already-imported patient."""
        fhir_id = patient['fhir_id']
        criteria: List[Dict[str, Any]] = [
            {"fhir_id": fhir_id},
            {"demographics.fhir_id": fhir_id},
        ]
        full_name = patient.get('full_name')
        date_of_birth = patient.get('date_of_birth')
        # Only match on name + birth date when both are known; otherwise a missing
        # value would match unrelated records that are missing it as well.
        if full_name and date_of_birth:
            criteria.append({"full_name": full_name, "date_of_birth": date_of_birth})
        return criteria

    def _validate_patient_data_completeness(
        self,
        patient: Dict,
        require_medical_data: bool = False,
        min_score: float = 0.7,
    ) -> Dict[str, Any]:
        """
        Validate if a patient has complete data.

        Args:
            patient: Patient data dictionary. Demographic fields are read from the
                top level; medical data from ``patient["clinical_data"]``.
            require_medical_data: Whether to require medical data (observations,
                medications, conditions).
            min_score: Minimum fraction (0-1) of demographic fields (required +
                optional) that must be present for the patient to be complete.

        Returns:
            Dict with validation results:
            {
                "is_complete": bool,
                "missing_fields": List[str],
                "has_medical_data": bool,
                "validation_score": float (0-1),
                "demographic_completeness": float (0-1, equal to validation_score)
            }
        """
        required_demographic_fields = ('full_name', 'gender', 'date_of_birth', 'address')
        optional_demographic_fields = ('phone', 'email', 'marital_status', 'language')
        medical_data_fields = ('observations', 'medications', 'conditions')

        def _is_present(value: Any) -> bool:
            # None, empty containers and blank/whitespace-only strings count as missing.
            if isinstance(value, str):
                return value.strip() != ''
            return bool(value)

        missing_fields = [
            field for field in required_demographic_fields
            if not _is_present(patient.get(field))
        ]
        present_fields = (len(required_demographic_fields) - len(missing_fields)) + sum(
            1 for field in optional_demographic_fields if _is_present(patient.get(field))
        )
        total_fields = len(required_demographic_fields) + len(optional_demographic_fields)
        validation_score = present_fields / total_fields if total_fields else 0.0

        # clinical_data may be absent or None (e.g. when enrichment failed).
        clinical_data = patient.get('clinical_data') or {}
        has_medical_data = isinstance(clinical_data, dict) and any(
            clinical_data.get(field) for field in medical_data_fields
        )

        is_complete = not missing_fields and validation_score >= min_score
        if require_medical_data and not has_medical_data:
            is_complete = False
            missing_fields.append('medical_data')

        return {
            "is_complete": is_complete,
            "missing_fields": missing_fields,
            "has_medical_data": has_medical_data,
            "validation_score": validation_score,
            "demographic_completeness": validation_score,
        }

    async def import_patients_from_hapi(self, limit: int = 20, require_medical_data: bool = False, min_completeness_score: float = 0.7) -> Dict[str, Any]:
        """
        Import patients from HAPI FHIR Test Server with data completeness validation.

        Each fetched patient is checked for an existing record (by FHIR id, or by
        name + date of birth when both are known), enriched with clinical data,
        validated, and inserted into ``db.patients`` only if it is complete.

        Args:
            limit: Number of patients to fetch from HAPI FHIR.
            require_medical_data: Whether to require patients to have medical data.
            min_completeness_score: Minimum validation score (0-1) for a patient to
                be considered complete.

        Returns:
            Dict with ``imported_count``, ``skipped_count``, ``filtered_count``,
            ``total_found``, the matching per-patient lists, a
            ``validation_summary`` and per-patient ``errors``. On a fatal error
            all counts are 0 and ``errors`` holds the exception text.
        """
        try:
            print(f"Fetching {limit} patients from HAPI FHIR...")
            patients = self.fhir_client.get_patients(limit=limit)
            if not patients:
                print("No patients found in HAPI FHIR")
                return self._empty_import_result()

            print(f"Found {len(patients)} patients, checking for duplicates and data completeness...")
            imported_patients: List[Dict[str, Any]] = []
            skipped_patients: List[str] = []
            filtered_patients: List[Dict[str, Any]] = []
            errors: List[str] = []
            validation_summary: Dict[str, Any] = {
                # Number of patients fetched, including duplicates and failures.
                "total_processed": len(patients),
                "complete_patients": 0,
                "incomplete_patients": 0,
                "with_medical_data": 0,
                "without_medical_data": 0,
                "average_completeness_score": 0.0,
            }
            total_completeness_score = 0.0
            # Only patients that reached validation contribute a score, so the
            # average is taken over them rather than over total_processed.
            validated_count = 0

            for patient in patients:
                try:
                    fhir_id = patient['fhir_id']
                    full_name = patient.get('full_name', 'Unknown')

                    existing = await db.patients.find_one({"$or": self._duplicate_criteria(patient)})
                    if existing:
                        skipped_patients.append(full_name)
                        print(f"Patient {full_name} already exists (fhir_id: {fhir_id}), skipping...")
                        continue

                    enhanced_patient = await self._enhance_patient_data(patient)
                    validation_result = self._validate_patient_data_completeness(
                        enhanced_patient,
                        require_medical_data=require_medical_data,
                        min_score=min_completeness_score,
                    )
                    score = validation_result["validation_score"]
                    has_medical_data = validation_result["has_medical_data"]

                    total_completeness_score += score
                    validated_count += 1
                    if has_medical_data:
                        validation_summary["with_medical_data"] += 1
                    else:
                        validation_summary["without_medical_data"] += 1

                    if not validation_result["is_complete"]:
                        filtered_patients.append({
                            "name": full_name,
                            "fhir_id": fhir_id,
                            "missing_fields": validation_result["missing_fields"],
                            "completeness_score": score,
                            "has_medical_data": has_medical_data,
                        })
                        print(f"Patient {full_name} filtered out - missing: {validation_result['missing_fields']}, score: {score:.2f}")
                        validation_summary["incomplete_patients"] += 1
                        continue

                    validation_summary["complete_patients"] += 1
                    result = await db.patients.insert_one(enhanced_patient)
                    if result.inserted_id:
                        imported_patients.append({
                            "name": full_name,
                            "fhir_id": fhir_id,
                            "completeness_score": score,
                            "has_medical_data": has_medical_data,
                        })
                        print(f"Imported patient: {full_name} (ID: {result.inserted_id}, Score: {score:.2f})")
                except Exception as e:
                    # Best-effort per patient: record the failure and keep going.
                    error_msg = f"Error importing patient {patient.get('full_name', 'Unknown')}: {e}"
                    errors.append(error_msg)
                    print(error_msg)

            if validated_count:
                validation_summary["average_completeness_score"] = total_completeness_score / validated_count

            imported_count = len(imported_patients)
            skipped_count = len(skipped_patients)
            filtered_count = len(filtered_patients)
            print(f"Import completed: {imported_count} imported, {skipped_count} skipped, {filtered_count} filtered out")
            print(f"Validation summary: {validation_summary}")
            return {
                "imported_count": imported_count,
                "skipped_count": skipped_count,
                "filtered_count": filtered_count,
                "total_found": len(patients),
                "imported_patients": imported_patients,
                "skipped_patients": skipped_patients,
                "filtered_patients": filtered_patients,
                "validation_summary": validation_summary,
                "errors": errors,
            }
        except Exception as e:
            print(f"Error importing patients: {e}")
            return self._empty_import_result([str(e)])

    async def _enhance_patient_data(self, patient: Dict) -> Dict:
        """
        Enhance patient data with additional FHIR resources.

        Returns a copy of ``patient`` with ``demographics``, ``clinical_data``
        (observations, medications, conditions) and ``metadata`` sections added.
        If ``fhir_id`` is missing or a FHIR call fails, the original ``patient``
        is returned unchanged.
        """
        try:
            patient_id = patient['fhir_id']
            # Fetch additional data from HAPI FHIR
            observations = self.fhir_client.get_patient_observations(patient_id)
            medications = self.fhir_client.get_patient_medications(patient_id)
            conditions = self.fhir_client.get_patient_conditions(patient_id)
            # One timestamp so import_date and last_updated agree on first import.
            now = datetime.now().isoformat()
            return {
                # Basic demographics (kept at top level for validation/queries)
                **patient,
                # Missing demographics stay None here; completeness validation
                # reports them instead of discarding the clinical data.
                'demographics': {
                    'id': patient.get('id'),
                    'fhir_id': patient_id,
                    'full_name': patient.get('full_name'),
                    'gender': patient.get('gender'),
                    'date_of_birth': patient.get('date_of_birth'),
                    'address': patient.get('address'),
                    'phone': patient.get('phone', ''),
                    'email': patient.get('email', ''),
                    'marital_status': patient.get('marital_status', 'Unknown'),
                    'language': patient.get('language', 'English'),
                },
                'clinical_data': {
                    'observations': observations,
                    'medications': medications,
                    'conditions': conditions,
                    'notes': [],  # Will be populated separately
                    'encounters': [],  # Will be populated separately
                },
                'metadata': {
                    'source': 'hapi_fhir',
                    'import_date': now,
                    'last_updated': now,
                    'fhir_server': 'https://hapi.fhir.org/baseR4',
                },
            }
        except Exception as e:
            print(f"Error enhancing patient data: {e}")
            return patient

    @staticmethod
    def _build_sync_update(enhanced_patient: Dict) -> Dict[str, Any]:
        """Build an upsert update that refreshes a patient but keeps its first import date."""
        fields = dict(enhanced_patient)
        metadata = fields.pop('metadata', None)
        if not isinstance(metadata, dict):
            # No structured metadata (e.g. enrichment failed): plain $set.
            if metadata is not None:
                fields['metadata'] = metadata
            return {"$set": fields}
        metadata = dict(metadata)
        import_date = metadata.pop('import_date', None)
        # Dotted paths let $setOnInsert own metadata.import_date without
        # conflicting with the $set of the remaining metadata keys.
        fields.update({f"metadata.{key}": value for key, value in metadata.items()})
        update: Dict[str, Any] = {"$set": fields}
        if import_date is not None:
            update["$setOnInsert"] = {"metadata.import_date": import_date}
        return update

    async def sync_patient_data(self, patient_id: str) -> bool:
        """
        Sync a specific patient's data from HAPI FHIR.

        Upserts the enriched patient keyed on ``fhir_id``. On an existing record
        ``metadata.import_date`` is preserved; only a newly inserted record gets one.

        Returns:
            True if a document was modified or inserted, False otherwise
            (patient not found, no change, or an error).
        """
        try:
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                print(f"Patient {patient_id} not found in HAPI FHIR")
                return False
            enhanced_patient = await self._enhance_patient_data(patient)
            result = await db.patients.update_one(
                {"fhir_id": patient_id},
                self._build_sync_update(enhanced_patient),
                upsert=True,
            )
            display_name = patient.get('full_name', patient_id)
            if result.modified_count > 0 or result.upserted_id is not None:
                print(f"Synced patient: {display_name}")
                return True
            print(f"No changes for patient: {display_name}")
            return False
        except Exception as e:
            print(f"Error syncing patient {patient_id}: {e}")
            return False

    async def get_patient_statistics(self) -> Dict:
        """
        Get statistics about imported patients.

        Returns:
            Dict with ``total_patients``, ``hapi_fhir_patients`` and one
            ``sample_patient`` (``_id`` stringified), or ``{}`` on error.
        """
        try:
            # Enriched records carry the source under metadata.source; also accept
            # a top-level source in case the raw patient dict includes one.
            hapi_filter = {"$or": [{"source": "hapi_fhir"}, {"metadata.source": "hapi_fhir"}]}
            total_patients = await db.patients.count_documents({})
            hapi_patients = await db.patients.count_documents(hapi_filter)
            sample_patient = await db.patients.find_one(hapi_filter)
            if sample_patient and '_id' in sample_patient:
                # Convert ObjectId to string for JSON serialization
                sample_patient['_id'] = str(sample_patient['_id'])
            return {
                'total_patients': total_patients,
                'hapi_fhir_patients': hapi_patients,
                'sample_patient': sample_patient,
            }
        except Exception as e:
            print(f"Error getting statistics: {e}")
            return {}

    async def get_hapi_patients(self, limit: int = 50) -> List[Dict]:
        """Get patients from HAPI FHIR without importing them (``[]`` on error or no data)."""
        try:
            return self.fhir_client.get_patients(limit=limit) or []
        except Exception as e:
            print(f"Error fetching HAPI patients: {e}")
            return []

    async def get_hapi_patient_details(self, patient_id: str) -> Optional[Dict]:
        """
        Get detailed information for a specific HAPI FHIR patient.

        Returns:
            Dict with ``patient``, ``observations``, ``medications`` and
            ``conditions``, or None if the patient is not found or a call fails.
        """
        try:
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                return None
            return {
                'patient': patient,
                'observations': self.fhir_client.get_patient_observations(patient_id),
                'medications': self.fhir_client.get_patient_medications(patient_id),
                'conditions': self.fhir_client.get_patient_conditions(patient_id),
            }
        except Exception as e:
            print(f"Error fetching patient details: {e}")
            return None