"""Patient API routes: CRUD endpoints, clinical notes, and Synthea FHIR import."""
from fastapi import APIRouter, HTTPException, Depends, Query, status, Body
from db.mongo import patients_collection
from core.security import get_current_user
from utils.db import create_indexes
from utils.helpers import calculate_age, standardize_language
from models.entities import Note, PatientCreate
from datetime import datetime
from bson import ObjectId
from bson.errors import InvalidId
from typing import Optional, List, Dict, Any
from pymongo import UpdateOne, DeleteOne
from pymongo.errors import BulkWriteError
import json
from pathlib import Path
import glob
import uuid
import re
import logging
import time
import os
from pydantic import BaseModel, Field

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)

router = APIRouter()

# Configuration
BASE_DIR = Path(__file__).resolve().parent.parent.parent
SYNTHEA_DATA_DIR = BASE_DIR / "output" / "fhir"
os.makedirs(SYNTHEA_DATA_DIR, exist_ok=True)

# Synthea writes hospital/practitioner bundles alongside patient bundles;
# files matching this pattern are not patient records and are skipped on import.
_NON_PATIENT_FILE_RE = re.compile(r'(hospitalInformation|practitionerInformation)\d+\.json$')


# Pydantic models for update validation
class ConditionUpdate(BaseModel):
    """Partial condition entry accepted by PUT /patients/{patient_id}."""
    id: Optional[str] = None
    code: Optional[str] = None
    status: Optional[str] = None
    onset_date: Optional[str] = None
    recorded_date: Optional[str] = None
    verification_status: Optional[str] = None
    notes: Optional[str] = None


class MedicationUpdate(BaseModel):
    """Partial medication entry accepted by PUT /patients/{patient_id}."""
    id: Optional[str] = None
    name: Optional[str] = None
    status: Optional[str] = None
    prescribed_date: Optional[str] = None
    requester: Optional[str] = None
    dosage: Optional[str] = None


class EncounterUpdate(BaseModel):
    """Partial encounter entry accepted by PUT /patients/{patient_id}."""
    id: Optional[str] = None
    type: Optional[str] = None
    status: Optional[str] = None
    period: Optional[Dict[str, str]] = None
    service_provider: Optional[str] = None


class NoteUpdate(BaseModel):
    """Partial note entry accepted by PUT /patients/{patient_id}."""
    id: Optional[str] = None
    title: Optional[str] = None
    date: Optional[str] = None
    author: Optional[str] = None
    content: Optional[str] = None


class PatientUpdate(BaseModel):
    """Body of PUT /patients/{patient_id}.

    Fields left as None are not touched. A provided list replaces the stored
    array for that field in its entirety.
    """
    full_name: Optional[str] = None
    gender: Optional[str] = None
    date_of_birth: Optional[str] = None
    address: Optional[str] = None
    city: Optional[str] = None
    state: Optional[str] = None
    postal_code: Optional[str] = None
    country: Optional[str] = None
    marital_status: Optional[str] = None
    language: Optional[str] = None
    conditions: Optional[List[ConditionUpdate]] = None
    medications: Optional[List[MedicationUpdate]] = None
    encounters: Optional[List[EncounterUpdate]] = None
    notes: Optional[List[NoteUpdate]] = None


def _patient_id_query(patient_id: str) -> Dict[str, Any]:
    """Build a Mongo filter that matches a patient by id.

    The filter always matches ``fhir_id``. It also matches ``_id`` when
    *patient_id* is a valid ObjectId string. Calling ``ObjectId()``
    unconditionally would raise ``InvalidId`` for UUID-style FHIR ids.
    """
    if ObjectId.is_valid(patient_id):
        return {
            "$or": [
                {"_id": ObjectId(patient_id)},
                {"fhir_id": patient_id}
            ]
        }
    return {"fhir_id": patient_id}


def _first(items: Any) -> Dict[str, Any]:
    """Return the first dict of a FHIR list field.

    Returns {} when the field is missing, empty, or not a list of dicts.
    """
    if isinstance(items, list) and items and isinstance(items[0], dict):
        return items[0]
    return {}


@router.get("/debug/count")
async def debug_patient_count():
    """Debug endpoint to verify patient counts.

    Returns total, synthea-sourced and manually-created patient counts.
    NOTE(review): no auth dependency is attached here; confirm this route is
    not exposed in production.
    """
    try:
        total = await patients_collection.count_documents({})
        synthea = await patients_collection.count_documents({"source": "synthea"})
        manual = await patients_collection.count_documents({"source": "manual"})

        return {
            "total": total,
            "synthea": synthea,
            "manual": manual,
            "message": f"Found {total} total patients ({synthea} from synthea, {manual} manual)"
        }
    except Exception as e:
        logger.error(f"Error counting patients: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error counting patients: {str(e)}"
        )


@router.post("/patients", status_code=status.HTTP_201_CREATED)
async def create_patient(
    patient_data: PatientCreate,
    current_user: dict = Depends(get_current_user)
):
    """Create a new patient in the database.

    Only users with role 'admin' or 'doctor' may create patients (403 otherwise).
    Adds system fields (fhir_id, import_date, last_updated, source='manual',
    created_by). Returns the stored document, with ``_id`` exposed as a
    string ``id``.
    """
    logger.info(f"Creating new patient by user {current_user.get('email')}")

    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized create attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can create patients"
        )

    try:
        # Prepare the patient document
        patient_doc = patient_data.dict()
        now = datetime.utcnow().isoformat()

        # Add system-generated fields
        patient_doc.update({
            "fhir_id": str(uuid.uuid4()),
            "import_date": now,
            "last_updated": now,
            "source": "manual",
            "created_by": current_user.get('email')
        })

        # Ensure arrays exist even if absent or explicitly None, so readers can
        # rely on list semantics (len(), iteration).
        for field in ['conditions', 'medications', 'encounters', 'notes']:
            if patient_doc.get(field) is None:
                patient_doc[field] = []

        # Insert the patient document
        result = await patients_collection.insert_one(patient_doc)

        # Return the created patient with the generated ID
        created_patient = await patients_collection.find_one(
            {"_id": result.inserted_id}
        )

        if not created_patient:
            logger.error("Failed to retrieve created patient")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to retrieve created patient"
            )

        created_patient["id"] = str(created_patient["_id"])
        del created_patient["_id"]

        logger.info(f"Successfully created patient {created_patient['fhir_id']}")
        return created_patient

    except HTTPException:
        # Don't re-wrap our own HTTP errors into a generic 500.
        raise
    except Exception as e:
        logger.error(f"Failed to create patient: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create patient: {str(e)}"
        )


@router.delete("/patients/{patient_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_patient(
    patient_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Delete a patient from the database.

    *patient_id* may be a Mongo ObjectId string or a fhir_id. Returns 404 if
    no matching patient exists.
    """
    logger.info(f"Deleting patient {patient_id} by user {current_user.get('email')}")

    # NOTE(review): the check admits 'doctor' but the error message says only
    # administrators may delete; confirm which policy is intended.
    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized delete attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can delete patients"
        )

    try:
        query = _patient_id_query(patient_id)

        # Check if patient exists
        patient = await patients_collection.find_one(query)
        if not patient:
            logger.warning(f"Patient not found for deletion: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )

        # Perform deletion
        result = await patients_collection.delete_one(query)

        if result.deleted_count == 0:
            logger.error(f"Failed to delete patient {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to delete patient"
            )

        logger.info(f"Successfully deleted patient {patient_id}")
        return None

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to delete patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete patient: {str(e)}"
        )


async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict]:
    """Flatten one Synthea FHIR bundle into a patient document.

    Reads the Patient, Encounter (plus its notes), Condition and
    MedicationRequest resources. A resource that fails to parse is logged and
    skipped.

    Args:
        bundle: Parsed JSON of a FHIR bundle; must be a dict with an 'entry' key.
        file_path: Source path, used only for log messages.

    Returns:
        The patient document, or None if the bundle is malformed or contains
        no Patient resource.
    """
    logger.debug(f"Processing patient from file: {file_path}")
    patient_data = {}
    notes = []
    conditions = []
    medications = []
    encounters = []

    # Validate bundle structure
    if not isinstance(bundle, dict) or 'entry' not in bundle:
        logger.error(f"Invalid FHIR bundle structure in {file_path}")
        return None

    for entry in bundle.get('entry', []):
        resource = entry.get('resource', {})
        resource_type = resource.get('resourceType')

        if not resource_type:
            logger.warning(f"Skipping entry with missing resourceType in {file_path}")
            continue

        try:
            if resource_type == 'Patient':
                # _first() tolerates empty lists, which `.get(k, [{}])[0]` does not.
                name = _first(resource.get('name'))
                address = _first(resource.get('address'))

                # Construct full name and remove numbers (Synthea appends digits)
                raw_full_name = f"{' '.join(name.get('given', ['']))} {name.get('family', '')}".strip()
                clean_full_name = re.sub(r'\d+', '', raw_full_name).strip()

                patient_data = {
                    'fhir_id': resource.get('id'),
                    'full_name': clean_full_name,
                    'gender': resource.get('gender', 'unknown'),
                    'date_of_birth': resource.get('birthDate', ''),
                    'address': ' '.join(address.get('line', [''])),
                    'city': address.get('city', ''),
                    'state': address.get('state', ''),
                    'postal_code': address.get('postalCode', ''),
                    'country': address.get('country', ''),
                    'marital_status': resource.get('maritalStatus', {}).get('text', ''),
                    'language': standardize_language(
                        _first(resource.get('communication')).get('language', {}).get('text', '')
                    ),
                    'source': 'synthea',
                    'last_updated': datetime.utcnow().isoformat()
                }

            elif resource_type == 'Encounter':
                encounter_type = _first(resource.get('type'))
                encounter = {
                    'id': resource.get('id'),
                    'type': encounter_type.get('text', ''),
                    'status': resource.get('status'),
                    'period': resource.get('period', {}),
                    'service_provider': resource.get('serviceProvider', {}).get('display', '')
                }
                encounters.append(encounter)

                for note in resource.get('note', []):
                    if note.get('text'):
                        notes.append({
                            'date': resource.get('period', {}).get('start', datetime.utcnow().isoformat()),
                            'type': encounter_type.get('text', 'Encounter Note'),
                            'text': note.get('text'),
                            'context': f"Encounter: {encounter.get('type')}",
                            'author': 'System Generated'
                        })

            elif resource_type == 'Condition':
                conditions.append({
                    'id': resource.get('id'),
                    'code': resource.get('code', {}).get('text', ''),
                    'status': resource.get('clinicalStatus', {}).get('text', ''),
                    'onset_date': resource.get('onsetDateTime'),
                    'recorded_date': resource.get('recordedDate'),
                    'verification_status': resource.get('verificationStatus', {}).get('text', '')
                })

            elif resource_type == 'MedicationRequest':
                medications.append({
                    'id': resource.get('id'),
                    'name': resource.get('medicationCodeableConcept', {}).get('text', ''),
                    'status': resource.get('status'),
                    'prescribed_date': resource.get('authoredOn'),
                    'requester': resource.get('requester', {}).get('display', ''),
                    'dosage': _first(resource.get('dosageInstruction')).get('text', '')
                })

        except Exception as e:
            logger.error(f"Error processing {resource_type} in {file_path}: {str(e)}")
            continue

    if patient_data:
        patient_data.update({
            'notes': notes,
            'conditions': conditions,
            'medications': medications,
            'encounters': encounters,
            'import_date': datetime.utcnow().isoformat()
        })
        logger.info(f"Successfully processed patient {patient_data.get('fhir_id')} from {file_path}")
        return patient_data

    logger.warning(f"No valid patient data found in {file_path}")
    return None


@router.post("/import", status_code=status.HTTP_201_CREATED)
async def import_patients(
    limit: int = Query(100, ge=1, le=1000),
    current_user: dict = Depends(get_current_user)
):
    """Import up to *limit* Synthea patient bundles from SYNTHEA_DATA_DIR.

    Each patient is upserted by fhir_id with ``$setOnInsert``, so existing
    patients are left untouched. Per-file problems are collected in
    ``errors`` rather than aborting the import. Files are processed in sorted
    order so that *limit* selects a deterministic subset.
    """
    request_id = str(uuid.uuid4())
    logger.info(f"Starting import request {request_id} by user {current_user.get('email')}")
    start_time = time.time()

    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized import attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can import data"
        )

    try:
        await create_indexes()

        if not SYNTHEA_DATA_DIR.exists():
            logger.error(f"Synthea data directory not found: {SYNTHEA_DATA_DIR}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Data directory not found"
            )

        # Filter out non-patient files
        files = sorted(
            f for f in glob.glob(str(SYNTHEA_DATA_DIR / "*.json"))
            if not _NON_PATIENT_FILE_RE.search(f)
        )

        if not files:
            logger.warning("No valid patient JSON files found in synthea data directory")
            return {
                "status": "success",
                "message": "No patient data files found",
                "imported": 0,
                "request_id": request_id
            }

        operations = []
        imported = 0
        errors = []

        for file_path in files[:limit]:
            try:
                logger.debug(f"Processing file: {file_path}")

                # Check file accessibility
                if not os.path.exists(file_path):
                    logger.error(f"File not found: {file_path}")
                    errors.append(f"File not found: {file_path}")
                    continue

                # Check file size
                file_size = os.path.getsize(file_path)
                if file_size == 0:
                    logger.warning(f"Empty file: {file_path}")
                    errors.append(f"Empty file: {file_path}")
                    continue

                with open(file_path, 'r', encoding='utf-8') as f:
                    try:
                        bundle = json.load(f)
                    except json.JSONDecodeError as je:
                        logger.error(f"Invalid JSON in {file_path}: {str(je)}")
                        errors.append(f"Invalid JSON in {file_path}: {str(je)}")
                        continue

                patient = await process_synthea_patient(bundle, file_path)
                if patient:
                    if not patient.get('fhir_id'):
                        logger.warning(f"Missing FHIR ID in patient data from {file_path}")
                        errors.append(f"Missing FHIR ID in {file_path}")
                        continue

                    # $setOnInsert: never overwrite a patient that already exists.
                    operations.append(UpdateOne(
                        {"fhir_id": patient['fhir_id']},
                        {"$setOnInsert": patient},
                        upsert=True
                    ))
                    imported += 1
                else:
                    logger.warning(f"No valid patient data in {file_path}")
                    errors.append(f"No valid patient data in {file_path}")

            except Exception as e:
                logger.error(f"Error processing {file_path}: {str(e)}")
                errors.append(f"Error in {file_path}: {str(e)}")
                continue

        response = {
            "status": "success",
            "imported": imported,
            "errors": errors,
            "request_id": request_id,
            "duration_seconds": time.time() - start_time
        }

        if operations:
            try:
                result = await patients_collection.bulk_write(operations, ordered=False)
                response.update({
                    "upserted": result.upserted_count,
                    "existing": len(operations) - result.upserted_count
                })
                logger.info(f"Import request {request_id} completed: {imported} patients processed, "
                            f"{result.upserted_count} upserted, {len(errors)} errors")
            except BulkWriteError as bwe:
                logger.error(f"Partial bulk write failure for request {request_id}: {str(bwe.details)}")
                response.update({
                    "upserted": bwe.details.get('nUpserted', 0),
                    "existing": len(operations) - bwe.details.get('nUpserted', 0),
                    "write_errors": [
                        f"Index {err['index']}: {err['errmsg']}"
                        for err in bwe.details.get('writeErrors', [])
                    ]
                })
                logger.info(f"Import request {request_id} partially completed: {imported} patients processed, "
                            f"{response['upserted']} upserted, {len(errors)} errors")
            except Exception as e:
                logger.error(f"Bulk write failed for request {request_id}: {str(e)}")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Database operation failed: {str(e)}"
                )
        else:
            logger.info(f"Import request {request_id} completed: No new patients to import, {len(errors)} errors")
            response["message"] = "No new patients found to import"

        return response

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Import request {request_id} failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}"
        )


@router.get("/patients", response_model=List[dict])
async def list_patients(
    search: Optional[str] = Query(None),
    min_notes: int = Query(0, ge=0),
    min_conditions: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
    skip: int = Query(0, ge=0)
):
    """List patients with optional filtering and pagination.

    *search* matches full_name as a case-insensitive literal substring, or
    fhir_id exactly. *min_notes* and *min_conditions* require at least that
    many array elements.
    NOTE(review): no auth dependency is attached here; confirm access control
    is enforced elsewhere (e.g. router-level dependencies).
    """
    logger.info(f"Listing patients with search: {search}, limit: {limit}, skip: {skip}")

    query = {}
    if search:
        query["$or"] = [
            # Escape user input: a raw $regex allows invalid patterns (500s)
            # and catastrophic-backtracking patterns (ReDoS).
            {"full_name": {"$regex": re.escape(search), "$options": "i"}},
            {"fhir_id": search}
        ]
    # "array.N exists" <=> array has at least N+1 elements.
    if min_notes > 0:
        query[f"notes.{min_notes-1}"] = {"$exists": True}
    if min_conditions > 0:
        query[f"conditions.{min_conditions-1}"] = {"$exists": True}

    projection = {
        "fhir_id": 1,
        "full_name": 1,
        "gender": 1,
        "date_of_birth": 1,
        "city": 1,
        "state": 1,
        "conditions": 1,
        "medications": 1,
        "encounters": 1,
        "notes": 1,
        "source": 1
    }

    try:
        cursor = patients_collection.find(query, projection).skip(skip).limit(limit)
        patients = []
        async for patient in cursor:
            patients.append({
                "id": str(patient["_id"]),
                "fhir_id": patient.get("fhir_id"),
                "full_name": patient.get("full_name"),
                "gender": patient.get("gender"),
                "date_of_birth": patient.get("date_of_birth"),
                "city": patient.get("city"),
                "state": patient.get("state"),
                "conditions": patient.get("conditions", []),
                "medications": patient.get("medications", []),
                "encounters": patient.get("encounters", []),
                "notes": patient.get("notes", []),
                "source": patient.get("source", "unknown"),
                "age": calculate_age(patient.get("date_of_birth")),
                "stats": {
                    "notes": len(patient.get("notes", [])),
                    "conditions": len(patient.get("conditions", [])),
                    "medications": len(patient.get("medications", []))
                }
            })
        logger.info(f"Retrieved {len(patients)} patients")
        return patients
    except Exception as e:
        logger.error(f"Failed to list patients: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve patients: {str(e)}"
        )


@router.get("/patients/{patient_id}", response_model=dict)
async def get_patient(patient_id: str):
    """Return one patient, grouped into demographics, clinical_data and metadata.

    *patient_id* may be a Mongo ObjectId string or a fhir_id. Returns 404 if
    not found.
    NOTE(review): no auth dependency is attached here; confirm access control
    is enforced elsewhere.
    """
    logger.info(f"Retrieving patient: {patient_id}")
    try:
        patient = await patients_collection.find_one(_patient_id_query(patient_id))
        if not patient:
            logger.warning(f"Patient not found: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )

        response = {
            "demographics": {
                "id": str(patient["_id"]),
                "fhir_id": patient.get("fhir_id"),
                "full_name": patient.get("full_name"),
                "gender": patient.get("gender"),
                "date_of_birth": patient.get("date_of_birth"),
                "age": calculate_age(patient.get("date_of_birth")),
                "address": {
                    "line": patient.get("address"),
                    "city": patient.get("city"),
                    "state": patient.get("state"),
                    "postal_code": patient.get("postal_code"),
                    "country": patient.get("country")
                },
                "marital_status": patient.get("marital_status"),
                "language": patient.get("language")
            },
            "clinical_data": {
                "notes": patient.get("notes", []),
                "conditions": patient.get("conditions", []),
                "medications": patient.get("medications", []),
                "encounters": patient.get("encounters", [])
            },
            "metadata": {
                "source": patient.get("source"),
                "import_date": patient.get("import_date"),
                "last_updated": patient.get("last_updated")
            }
        }
        logger.info(f"Successfully retrieved patient: {patient_id}")
        return response

    except HTTPException:
        # Preserve 404 instead of converting it into a 500 below.
        raise
    except Exception as e:
        logger.error(f"Failed to retrieve patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve patient: {str(e)}"
        )


@router.post("/{patient_id}/notes", status_code=status.HTTP_201_CREATED)
async def add_note(
    patient_id: str,
    note: Note,
    current_user: dict = Depends(get_current_user)
):
    """Append a clinical note to a patient's ``notes`` array.

    Only 'doctor' or 'admin' roles may add notes (403 otherwise). The note is
    stamped with the current user's full_name and a UTC timestamp. Returns
    404 if no patient matches *patient_id* (ObjectId or fhir_id).
    """
    logger.info(f"Adding note for patient {patient_id} by user {current_user.get('email')}")

    if current_user.get('role') not in ['doctor', 'admin']:
        logger.warning(f"Unauthorized note addition attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only clinicians can add notes"
        )

    try:
        note_data = note.dict()
        note_data.update({
            "author": current_user.get('full_name', 'System'),
            "timestamp": datetime.utcnow().isoformat()
        })

        result = await patients_collection.update_one(
            _patient_id_query(patient_id),
            {
                "$push": {"notes": note_data},
                "$set": {"last_updated": datetime.utcnow().isoformat()}
            }
        )

        # matched_count (not modified_count) is the "patient exists" signal.
        if result.matched_count == 0:
            logger.warning(f"Patient not found for note addition: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )

        logger.info(f"Note added successfully for patient {patient_id}")
        return {"status": "success", "message": "Note added"}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to add note for patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add note: {str(e)}"
        )


@router.put("/patients/{patient_id}", status_code=status.HTTP_200_OK)
async def update_patient(
    patient_id: str,
    update_data: PatientUpdate,
    current_user: dict = Depends(get_current_user)
):
    """Update a patient's record in the database.

    Only non-None demographic fields are written. Each provided clinical
    array (conditions/medications/encounters/notes) replaces the stored
    array. Entries without an id get a new UUID. Each entry must carry its
    required field (code/name/type/content), otherwise the request fails
    with 400. Returns the updated document in flat form.
    """
    logger.info(f"Updating patient {patient_id} by user {current_user.get('email')}")

    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized update attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can update patients"
        )

    try:
        query = _patient_id_query(patient_id)

        # Check if patient exists
        patient = await patients_collection.find_one(query)
        if not patient:
            logger.warning(f"Patient not found for update: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )

        # Prepare update operations
        update_ops = {"$set": {"last_updated": datetime.utcnow().isoformat()}}

        # Handle demographic updates
        demographics = {
            "full_name": update_data.full_name,
            "gender": update_data.gender,
            "date_of_birth": update_data.date_of_birth,
            "address": update_data.address,
            "city": update_data.city,
            "state": update_data.state,
            "postal_code": update_data.postal_code,
            "country": update_data.country,
            "marital_status": update_data.marital_status,
            "language": update_data.language
        }
        for key, value in demographics.items():
            if value is not None:
                update_ops["$set"][key] = value

        # Handle array updates (conditions, medications, encounters, notes)
        array_fields = {
            "conditions": update_data.conditions,
            "medications": update_data.medications,
            "encounters": update_data.encounters,
            "notes": update_data.notes
        }
        for field, items in array_fields.items():
            if items is None:
                continue
            updated_items = []
            for item in items:
                item_dict = item.dict(exclude_unset=True)
                if not item_dict:
                    continue

                # Generate ID for new items
                if not item_dict.get("id"):
                    item_dict["id"] = str(uuid.uuid4())

                # Validate required fields
                if field == "conditions" and not item_dict.get("code"):
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Condition code is required for {field}"
                    )
                if field == "medications" and not item_dict.get("name"):
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Medication name is required for {field}"
                    )
                if field == "encounters" and not item_dict.get("type"):
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Encounter type is required for {field}"
                    )
                if field == "notes" and not item_dict.get("content"):
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Note content is required for {field}"
                    )

                updated_items.append(item_dict)

            # Replace the entire array
            update_ops["$set"][field] = updated_items

        # Perform the update
        result = await patients_collection.update_one(query, update_ops)
        # The patient may have been deleted between find_one and update_one.
        if result.matched_count == 0:
            logger.warning(f"Patient not found for update: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )

        # Retrieve and return the updated patient
        updated_patient = await patients_collection.find_one(query)
        if not updated_patient:
            logger.error(f"Failed to retrieve updated patient: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to retrieve updated patient"
            )

        response = {
            "id": str(updated_patient["_id"]),
            "fhir_id": updated_patient.get("fhir_id"),
            "full_name": updated_patient.get("full_name"),
            "gender": updated_patient.get("gender"),
            "date_of_birth": updated_patient.get("date_of_birth"),
            "address": updated_patient.get("address"),
            "city": updated_patient.get("city"),
            "state": updated_patient.get("state"),
            "postal_code": updated_patient.get("postal_code"),
            "country": updated_patient.get("country"),
            "marital_status": updated_patient.get("marital_status"),
            "language": updated_patient.get("language"),
            "conditions": updated_patient.get("conditions", []),
            "medications": updated_patient.get("medications", []),
            "encounters": updated_patient.get("encounters", []),
            "notes": updated_patient.get("notes", []),
            "source": updated_patient.get("source"),
            "import_date": updated_patient.get("import_date"),
            "last_updated": updated_patient.get("last_updated")
        }

        logger.info(f"Successfully updated patient {patient_id}")
        return response

    except ValueError as ve:
        logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient ID format"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to update patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update patient: {str(e)}"
        )


# Export the router as 'patients' for api.__init__.py
patients = router