Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException, Depends, Query, status, Body | |
| from db.mongo import patients_collection | |
| from core.security import get_current_user | |
| from utils.db import create_indexes | |
| from utils.helpers import calculate_age, standardize_language | |
| from models.entities import Note, PatientCreate | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| from typing import Optional, List, Dict, Any | |
| from pymongo import UpdateOne, DeleteOne | |
| from pymongo.errors import BulkWriteError | |
| import json | |
| from pathlib import Path | |
| import glob | |
| import uuid | |
| import re | |
| import logging | |
| import time | |
| import os | |
| from pydantic import BaseModel, Field | |
# Process-wide logging: timestamp, level, logger name, then the message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Router that the endpoint functions in this module belong to.
router = APIRouter()

# Synthea FHIR bundles are read from "<three levels above this file>/output/fhir".
BASE_DIR = Path(__file__).resolve().parents[2]
SYNTHEA_DATA_DIR = BASE_DIR / "output" / "fhir"
# Created at import time so the directory always exists before an import runs.
os.makedirs(SYNTHEA_DATA_DIR, exist_ok=True)
| # Pydantic models for update validation | |
class ConditionUpdate(BaseModel):
    """Update payload for a single condition entry; every field is optional."""

    # Entry identifier; update_patient generates one when it is missing.
    id: Optional[str] = None
    code: Optional[str] = None
    status: Optional[str] = None
    onset_date: Optional[str] = None
    recorded_date: Optional[str] = None
    verification_status: Optional[str] = None
    notes: Optional[str] = None
class MedicationUpdate(BaseModel):
    """Update payload for a single medication entry; every field is optional."""

    # Entry identifier; update_patient generates one when it is missing.
    id: Optional[str] = None
    name: Optional[str] = None
    status: Optional[str] = None
    prescribed_date: Optional[str] = None
    requester: Optional[str] = None
    dosage: Optional[str] = None
class EncounterUpdate(BaseModel):
    """Update payload for a single encounter entry; every field is optional."""

    # Entry identifier; update_patient generates one when it is missing.
    id: Optional[str] = None
    type: Optional[str] = None
    status: Optional[str] = None
    # String-to-string mapping describing the encounter period.
    period: Optional[Dict[str, str]] = None
    service_provider: Optional[str] = None
class NoteUpdate(BaseModel):
    """Update payload for a single note entry; every field is optional."""

    # Entry identifier; update_patient generates one when it is missing.
    id: Optional[str] = None
    title: Optional[str] = None
    date: Optional[str] = None
    author: Optional[str] = None
    content: Optional[str] = None
class PatientUpdate(BaseModel):
    """Partial update for a patient record.

    Every field defaults to ``None``. update_patient skips demographic fields
    that are ``None`` and replaces a clinical list wholesale when one is given.
    """

    # Demographics
    full_name: Optional[str] = None
    gender: Optional[str] = None
    date_of_birth: Optional[str] = None
    address: Optional[str] = None
    city: Optional[str] = None
    state: Optional[str] = None
    postal_code: Optional[str] = None
    country: Optional[str] = None
    marital_status: Optional[str] = None
    language: Optional[str] = None

    # Clinical lists
    conditions: Optional[List[ConditionUpdate]] = None
    medications: Optional[List[MedicationUpdate]] = None
    encounters: Optional[List[EncounterUpdate]] = None
    notes: Optional[List[NoteUpdate]] = None
async def debug_patient_count():
    """Report how many patient documents exist, in total and per source.

    Returns:
        A dict with the ``total``, ``synthea`` and ``manual`` counts plus a
        human-readable ``message`` summarising them.

    Raises:
        HTTPException: 500 if any of the count queries fails.
    """
    try:
        total = await patients_collection.count_documents({})
        # Same query order as before: synthea first, then manual.
        by_source = {}
        for source_name in ("synthea", "manual"):
            by_source[source_name] = await patients_collection.count_documents(
                {"source": source_name}
            )
        synthea = by_source["synthea"]
        manual = by_source["manual"]
        summary = f"Found {total} total patients ({synthea} from synthea, {manual} manual)"
        return {
            "total": total,
            "synthea": synthea,
            "manual": manual,
            "message": summary
        }
    except Exception as e:
        failure = f"Error counting patients: {str(e)}"
        logger.error(failure)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure
        )
async def create_patient(
    patient_data: PatientCreate,
    current_user: dict = Depends(get_current_user)
):
    """Create a new patient in the database.

    Args:
        patient_data: Validated payload describing the new patient.
        current_user: Authenticated user resolved by ``get_current_user``;
            its ``role`` must be ``admin`` or ``doctor``.

    Returns:
        The stored document, with Mongo's ``_id`` replaced by a string ``id``.

    Raises:
        HTTPException: 403 for any other role; 500 if the insert or the
            read-back of the new document fails.
    """
    logger.info(f"Creating new patient by user {current_user.get('email')}")
    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized create attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can create patients"
        )
    try:
        # Prepare the patient document
        patient_doc = patient_data.dict()
        now = datetime.utcnow().isoformat()
        # Add system-generated fields
        patient_doc.update({
            "fhir_id": str(uuid.uuid4()),
            "import_date": now,
            "last_updated": now,
            "source": "manual",
            "created_by": current_user.get('email')
        })
        # Ensure arrays exist even if empty. A field that is present but None
        # is normalised as well, so readers can always treat these as lists.
        for field in ('conditions', 'medications', 'encounters', 'notes'):
            if patient_doc.get(field) is None:
                patient_doc[field] = []
        # Insert the patient document
        result = await patients_collection.insert_one(patient_doc)
        # Read the document back so the response reflects what was stored
        created_patient = await patients_collection.find_one(
            {"_id": result.inserted_id}
        )
        if not created_patient:
            logger.error("Failed to retrieve created patient")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to retrieve created patient"
            )
        # Swap the ObjectId for a string id in the response
        created_patient["id"] = str(created_patient.pop("_id"))
        logger.info(f"Successfully created patient {created_patient['fhir_id']}")
        return created_patient
    except HTTPException:
        # Already a well-formed API error; do not re-wrap it below.
        raise
    except Exception as e:
        logger.error(f"Failed to create patient: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create patient: {str(e)}"
        )
async def delete_patient(
    patient_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Delete a patient from the database.

    Args:
        patient_id: Either the Mongo ``_id`` (as a hex string) or the
            patient's ``fhir_id``.
        current_user: Authenticated user resolved by ``get_current_user``;
            its ``role`` must be ``admin`` or ``doctor``.

    Returns:
        ``None`` on success.

    Raises:
        HTTPException: 403 for any other role, 404 if no patient matches,
            500 if the deletion fails.
    """
    logger.info(f"Deleting patient {patient_id} by user {current_user.get('email')}")
    # NOTE(review): the role check admits doctors, so the 403 message now says
    # so. If deletion was meant to be admin-only, tighten this list instead.
    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized delete attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can delete patients"
        )
    try:
        # Build the query based on whether patient_id is a valid ObjectId
        query = {"fhir_id": patient_id}
        if ObjectId.is_valid(patient_id):
            query = {
                "$or": [
                    {"_id": ObjectId(patient_id)},
                    {"fhir_id": patient_id}
                ]
            }
        # Check if patient exists
        patient = await patients_collection.find_one(query)
        if not patient:
            logger.warning(f"Patient not found for deletion: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )
        # Delete by the _id just found, so the document that was checked is
        # exactly the one removed even if the $or query matches several.
        result = await patients_collection.delete_one({"_id": patient["_id"]})
        if result.deleted_count == 0:
            logger.error(f"Failed to delete patient {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to delete patient"
            )
        logger.info(f"Successfully deleted patient {patient_id}")
        return None
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to delete patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete patient: {str(e)}"
        )
def _first_or_empty(items) -> dict:
    """Return the first element of *items* when it is a dict, else ``{}``."""
    if isinstance(items, list) and items and isinstance(items[0], dict):
        return items[0]
    return {}


async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict]:
    """Flatten one FHIR bundle into a single patient document.

    Patient, Encounter, Condition and MedicationRequest resources are read;
    other resource types are ignored. A resource that raises while being
    processed is logged and skipped.

    Args:
        bundle: Parsed JSON bundle; must be a dict with an ``entry`` key.
        file_path: Path the bundle was loaded from (used in log messages only).

    Returns:
        The patient document, including ``notes``, ``conditions``,
        ``medications`` and ``encounters`` lists, or ``None`` when the bundle
        is malformed or holds no usable Patient resource.
    """
    logger.debug(f"Processing patient from file: {file_path}")
    patient_data = {}
    notes = []
    conditions = []
    medications = []
    encounters = []
    # Validate bundle structure
    if not isinstance(bundle, dict) or 'entry' not in bundle:
        logger.error(f"Invalid FHIR bundle structure in {file_path}")
        return None
    for entry in bundle.get('entry', []):
        resource = entry.get('resource', {})
        resource_type = resource.get('resourceType')
        if not resource_type:
            logger.warning(f"Skipping entry with missing resourceType in {file_path}")
            continue
        try:
            if resource_type == 'Patient':
                # An empty list ("name": []) used to raise IndexError and drop
                # the whole patient; it now falls back to an empty dict.
                name = _first_or_empty(resource.get('name'))
                address = _first_or_empty(resource.get('address'))
                communication = _first_or_empty(resource.get('communication'))
                # Construct full name and remove numbers
                raw_full_name = f"{' '.join(name.get('given', ['']))} {name.get('family', '')}".strip()
                clean_full_name = re.sub(r'\d+', '', raw_full_name).strip()
                patient_data = {
                    'fhir_id': resource.get('id'),
                    'full_name': clean_full_name,
                    'gender': resource.get('gender', 'unknown'),
                    'date_of_birth': resource.get('birthDate', ''),
                    'address': ' '.join(address.get('line', [''])),
                    'city': address.get('city', ''),
                    'state': address.get('state', ''),
                    'postal_code': address.get('postalCode', ''),
                    'country': address.get('country', ''),
                    'marital_status': resource.get('maritalStatus', {}).get('text', ''),
                    'language': standardize_language(communication.get('language', {}).get('text', '')),
                    'source': 'synthea',
                    'last_updated': datetime.utcnow().isoformat()
                }
            elif resource_type == 'Encounter':
                encounter_type = _first_or_empty(resource.get('type'))
                encounter = {
                    'id': resource.get('id'),
                    'type': encounter_type.get('text', ''),
                    'status': resource.get('status'),
                    'period': resource.get('period', {}),
                    'service_provider': resource.get('serviceProvider', {}).get('display', '')
                }
                encounters.append(encounter)
                # Every non-empty encounter note becomes a patient-level note
                for note in resource.get('note', []):
                    if note.get('text'):
                        notes.append({
                            'date': resource.get('period', {}).get('start', datetime.utcnow().isoformat()),
                            'type': encounter_type.get('text', 'Encounter Note'),
                            'text': note.get('text'),
                            'context': f"Encounter: {encounter.get('type')}",
                            'author': 'System Generated'
                        })
            elif resource_type == 'Condition':
                conditions.append({
                    'id': resource.get('id'),
                    'code': resource.get('code', {}).get('text', ''),
                    'status': resource.get('clinicalStatus', {}).get('text', ''),
                    'onset_date': resource.get('onsetDateTime'),
                    'recorded_date': resource.get('recordedDate'),
                    'verification_status': resource.get('verificationStatus', {}).get('text', '')
                })
            elif resource_type == 'MedicationRequest':
                medications.append({
                    'id': resource.get('id'),
                    'name': resource.get('medicationCodeableConcept', {}).get('text', ''),
                    'status': resource.get('status'),
                    'prescribed_date': resource.get('authoredOn'),
                    'requester': resource.get('requester', {}).get('display', ''),
                    'dosage': _first_or_empty(resource.get('dosageInstruction')).get('text', '')
                })
        except Exception as e:
            # Best effort: one bad resource must not abort the whole bundle.
            logger.error(f"Error processing {resource_type} in {file_path}: {str(e)}")
            continue
    if patient_data:
        patient_data.update({
            'notes': notes,
            'conditions': conditions,
            'medications': medications,
            'encounters': encounters,
            'import_date': datetime.utcnow().isoformat()
        })
        logger.info(f"Successfully processed patient {patient_data.get('fhir_id')} from {file_path}")
        return patient_data
    logger.warning(f"No valid patient data found in {file_path}")
    return None
async def import_patients(
    limit: int = Query(100, ge=1, le=1000),
    current_user: dict = Depends(get_current_user)
):
    """Import Synthea patient bundles from ``SYNTHEA_DATA_DIR`` into MongoDB.

    Up to ``limit`` JSON files are parsed and queued as upserts keyed on
    ``fhir_id``. The upserts use ``$setOnInsert``, so a patient that already
    exists is left untouched.

    Args:
        limit: Maximum number of files to process in this request.
        current_user: Authenticated user resolved by ``get_current_user``;
            its ``role`` must be ``admin`` or ``doctor``.

    Returns:
        A summary dict with ``status``, ``imported`` (patients queued),
        ``errors``, ``request_id`` and ``duration_seconds``, plus
        ``upserted``/``existing`` (and ``write_errors`` on a partial failure)
        when a bulk write ran, or ``message`` when nothing was queued.

    Raises:
        HTTPException: 403 for any other role; 500 if the data directory is
            missing, the bulk write fails outright, or anything unexpected
            happens.
    """
    request_id = str(uuid.uuid4())
    logger.info(f"Starting import request {request_id} by user {current_user.get('email')}")
    start_time = time.time()
    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized import attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can import data"
        )
    try:
        await create_indexes()
        if not SYNTHEA_DATA_DIR.exists():
            logger.error(f"Synthea data directory not found: {SYNTHEA_DATA_DIR}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Data directory not found"
            )
        # Filter out non-patient files. glob order depends on the filesystem,
        # so sort to make "the first `limit` files" reproducible across runs.
        files = sorted(
            f for f in glob.glob(str(SYNTHEA_DATA_DIR / "*.json"))
            if not re.search(r'(hospitalInformation|practitionerInformation)\d+\.json$', f)
        )
        if not files:
            logger.warning("No valid patient JSON files found in synthea data directory")
            return {
                "status": "success",
                "message": "No patient data files found",
                "imported": 0,
                "request_id": request_id
            }
        operations = []
        imported = 0
        errors = []
        for file_path in files[:limit]:
            try:
                logger.debug(f"Processing file: {file_path}")
                # Check file accessibility
                if not os.path.exists(file_path):
                    logger.error(f"File not found: {file_path}")
                    errors.append(f"File not found: {file_path}")
                    continue
                # Check file size
                if os.path.getsize(file_path) == 0:
                    logger.warning(f"Empty file: {file_path}")
                    errors.append(f"Empty file: {file_path}")
                    continue
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        bundle = json.load(f)
                except json.JSONDecodeError as je:
                    logger.error(f"Invalid JSON in {file_path}: {str(je)}")
                    errors.append(f"Invalid JSON in {file_path}: {str(je)}")
                    continue
                patient = await process_synthea_patient(bundle, file_path)
                if not patient:
                    logger.warning(f"No valid patient data in {file_path}")
                    errors.append(f"No valid patient data in {file_path}")
                    continue
                if not patient.get('fhir_id'):
                    logger.warning(f"Missing FHIR ID in patient data from {file_path}")
                    errors.append(f"Missing FHIR ID in {file_path}")
                    continue
                # $setOnInsert: only write when no document has this fhir_id yet
                operations.append(UpdateOne(
                    {"fhir_id": patient['fhir_id']},
                    {"$setOnInsert": patient},
                    upsert=True
                ))
                imported += 1
            except Exception as e:
                # One unreadable file must not abort the whole import.
                logger.error(f"Error processing {file_path}: {str(e)}")
                errors.append(f"Error in {file_path}: {str(e)}")
                continue
        response = {
            "status": "success",
            "imported": imported,
            "errors": errors,
            "request_id": request_id,
            "duration_seconds": time.time() - start_time
        }
        if operations:
            try:
                result = await patients_collection.bulk_write(operations, ordered=False)
                response.update({
                    "upserted": result.upserted_count,
                    "existing": len(operations) - result.upserted_count
                })
                logger.info(f"Import request {request_id} completed: {imported} patients processed, "
                            f"{result.upserted_count} upserted, {len(errors)} errors")
            except BulkWriteError as bwe:
                # Unordered bulk write: some operations may have succeeded.
                logger.error(f"Partial bulk write failure for request {request_id}: {str(bwe.details)}")
                upserted = bwe.details.get('nUpserted', 0)
                response.update({
                    "upserted": upserted,
                    "existing": len(operations) - upserted,
                    "write_errors": [
                        f"Index {err['index']}: {err['errmsg']}" for err in bwe.details.get('writeErrors', [])
                    ]
                })
                logger.info(f"Import request {request_id} partially completed: {imported} patients processed, "
                            f"{response['upserted']} upserted, {len(errors)} errors")
            except Exception as e:
                logger.error(f"Bulk write failed for request {request_id}: {str(e)}")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Database operation failed: {str(e)}"
                )
        else:
            logger.info(f"Import request {request_id} completed: No new patients to import, {len(errors)} errors")
            response["message"] = "No new patients found to import"
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Import request {request_id} failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}"
        )
async def list_patients(
    search: Optional[str] = Query(None),
    min_notes: int = Query(0, ge=0),
    min_conditions: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
    skip: int = Query(0, ge=0)
):
    """List patients, optionally filtered by a search term and minimum counts.

    Args:
        search: Case-insensitive substring of ``full_name``, or an exact
            ``fhir_id``. It is matched literally, not as a regular expression.
        min_notes: Only return patients with at least this many notes.
        min_conditions: Only return patients with at least this many conditions.
        limit: Page size.
        skip: Number of matching patients to skip.

    Returns:
        A list of patient summaries, each with its clinical lists, a computed
        ``age`` and a ``stats`` dict of list lengths.

    Raises:
        HTTPException: 500 if the query fails.
    """
    logger.info(f"Listing patients with search: {search}, limit: {limit}, skip: {skip}")
    query = {}
    if search:
        # Escape the user-supplied term: raw input in $regex lets metacharacters
        # change the match and makes an invalid pattern fail the whole request.
        query["$or"] = [
            {"full_name": {"$regex": re.escape(search), "$options": "i"}},
            {"fhir_id": search}
        ]
    # "array has an element at index n-1" is equivalent to "length >= n"
    if min_notes > 0:
        query[f"notes.{min_notes-1}"] = {"$exists": True}
    if min_conditions > 0:
        query[f"conditions.{min_conditions-1}"] = {"$exists": True}
    projection = {
        "fhir_id": 1,
        "full_name": 1,
        "gender": 1,
        "date_of_birth": 1,
        "city": 1,
        "state": 1,
        "conditions": 1,
        "medications": 1,
        "encounters": 1,
        "notes": 1,
        "source": 1
    }
    try:
        cursor = patients_collection.find(query, projection).skip(skip).limit(limit)
        patients = []
        async for patient in cursor:
            notes = patient.get("notes", [])
            conditions = patient.get("conditions", [])
            medications = patient.get("medications", [])
            patients.append({
                "id": str(patient["_id"]),
                "fhir_id": patient.get("fhir_id"),
                "full_name": patient.get("full_name"),
                "gender": patient.get("gender"),
                "date_of_birth": patient.get("date_of_birth"),
                "city": patient.get("city"),
                "state": patient.get("state"),
                "conditions": conditions,
                "medications": medications,
                "encounters": patient.get("encounters", []),
                "notes": notes,
                "source": patient.get("source", "unknown"),
                "age": calculate_age(patient.get("date_of_birth")),
                "stats": {
                    "notes": len(notes),
                    "conditions": len(conditions),
                    "medications": len(medications)
                }
            })
        logger.info(f"Retrieved {len(patients)} patients")
        return patients
    except Exception as e:
        logger.error(f"Failed to list patients: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve patients: {str(e)}"
        )
async def get_patient(patient_id: str):
    """Fetch one patient, grouped into demographics, clinical data and metadata.

    Args:
        patient_id: Either the Mongo ``_id`` (as a hex string) or the
            patient's ``fhir_id``.

    Returns:
        A dict with ``demographics``, ``clinical_data`` and ``metadata``
        sections.

    Raises:
        HTTPException: 404 if no patient matches; 400 if a ``ValueError``
            surfaces while building the response; 500 on any other failure.
    """
    logger.info(f"Retrieving patient: {patient_id}")
    try:
        # Only build an ObjectId when the id is a valid one. ObjectId() raises
        # InvalidId (not ValueError) otherwise, which made every lookup by
        # fhir_id fail with a 500.
        query = {"fhir_id": patient_id}
        if ObjectId.is_valid(patient_id):
            query = {
                "$or": [
                    {"_id": ObjectId(patient_id)},
                    {"fhir_id": patient_id}
                ]
            }
        patient = await patients_collection.find_one(query)
        if not patient:
            logger.warning(f"Patient not found: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )
        response = {
            "demographics": {
                "id": str(patient["_id"]),
                "fhir_id": patient.get("fhir_id"),
                "full_name": patient.get("full_name"),
                "gender": patient.get("gender"),
                "date_of_birth": patient.get("date_of_birth"),
                "age": calculate_age(patient.get("date_of_birth")),
                "address": {
                    "line": patient.get("address"),
                    "city": patient.get("city"),
                    "state": patient.get("state"),
                    "postal_code": patient.get("postal_code"),
                    "country": patient.get("country")
                },
                "marital_status": patient.get("marital_status"),
                "language": patient.get("language")
            },
            "clinical_data": {
                "notes": patient.get("notes", []),
                "conditions": patient.get("conditions", []),
                "medications": patient.get("medications", []),
                "encounters": patient.get("encounters", [])
            },
            "metadata": {
                "source": patient.get("source"),
                "import_date": patient.get("import_date"),
                "last_updated": patient.get("last_updated")
            }
        }
        logger.info(f"Successfully retrieved patient: {patient_id}")
        return response
    except HTTPException:
        # Let the 404 above through instead of re-wrapping it as a 500.
        raise
    except ValueError as ve:
        logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient ID format"
        )
    except Exception as e:
        logger.error(f"Failed to retrieve patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve patient: {str(e)}"
        )
async def add_note(
    patient_id: str,
    note: Note,
    current_user: dict = Depends(get_current_user)
):
    """Append a note to a patient's record.

    Args:
        patient_id: Either the Mongo ``_id`` (as a hex string) or the
            patient's ``fhir_id``.
        note: Validated note payload; ``author`` and ``timestamp`` are set
            here from the current user and the current UTC time.
        current_user: Authenticated user resolved by ``get_current_user``;
            its ``role`` must be ``doctor`` or ``admin``.

    Returns:
        ``{"status": "success", "message": "Note added"}``.

    Raises:
        HTTPException: 403 for any other role, 404 if no patient matches,
            400 if a ``ValueError`` surfaces, 500 on any other failure.
    """
    logger.info(f"Adding note for patient {patient_id} by user {current_user.get('email')}")
    if current_user.get('role') not in ['doctor', 'admin']:
        logger.warning(f"Unauthorized note addition attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only clinicians can add notes"
        )
    try:
        note_data = note.dict()
        note_data.update({
            "author": current_user.get('full_name', 'System'),
            "timestamp": datetime.utcnow().isoformat()
        })
        # Only build an ObjectId when the id is a valid one. ObjectId() raises
        # InvalidId (not ValueError) otherwise, which made every request that
        # used a fhir_id fail with a 500.
        query = {"fhir_id": patient_id}
        if ObjectId.is_valid(patient_id):
            query = {
                "$or": [
                    {"_id": ObjectId(patient_id)},
                    {"fhir_id": patient_id}
                ]
            }
        result = await patients_collection.update_one(
            query,
            {
                "$push": {"notes": note_data},
                "$set": {"last_updated": datetime.utcnow().isoformat()}
            }
        )
        # matched_count says whether the patient exists; modified_count only
        # says whether the document changed.
        if result.matched_count == 0:
            logger.warning(f"Patient not found for note addition: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )
        logger.info(f"Note added successfully for patient {patient_id}")
        return {"status": "success", "message": "Note added"}
    except HTTPException:
        # Let the 404 above through instead of re-wrapping it as a 500.
        raise
    except ValueError as ve:
        logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient ID format"
        )
    except Exception as e:
        logger.error(f"Failed to add note for patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add note: {str(e)}"
        )
async def update_patient(
    patient_id: str,
    update_data: PatientUpdate,
    current_user: dict = Depends(get_current_user)
):
    """Update a patient's record in the database.

    Demographic fields that are not ``None`` are overwritten. Each clinical
    list that is provided (conditions, medications, encounters, notes)
    replaces the stored list as a whole; items without an ``id`` get a new
    UUID, and items with no fields set are dropped.

    Args:
        patient_id: Either the Mongo ``_id`` (as a hex string) or the
            patient's ``fhir_id``.
        update_data: Validated partial update.
        current_user: Authenticated user resolved by ``get_current_user``;
            its ``role`` must be ``admin`` or ``doctor``.

    Returns:
        The updated patient as a flat dict, with ``id`` as a string.

    Raises:
        HTTPException: 403 for any other role, 404 if no patient matches,
            400 if a list item lacks its required field (or a ``ValueError``
            surfaces), 500 on any other failure.
    """
    logger.info(f"Updating patient {patient_id} by user {current_user.get('email')}")
    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized update attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can update patients"
        )
    try:
        # Build the query based on whether patient_id is a valid ObjectId
        query = {"fhir_id": patient_id}
        if ObjectId.is_valid(patient_id):
            query = {
                "$or": [
                    {"_id": ObjectId(patient_id)},
                    {"fhir_id": patient_id}
                ]
            }
        # Check if patient exists
        patient = await patients_collection.find_one(query)
        if not patient:
            logger.warning(f"Patient not found for update: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )
        # From here on address the document by its _id, so the update and the
        # read-back hit exactly the document found above.
        by_id = {"_id": patient["_id"]}
        # Prepare update operations
        update_ops = {"$set": {"last_updated": datetime.utcnow().isoformat()}}
        # Handle demographic updates
        demographics = {
            "full_name": update_data.full_name,
            "gender": update_data.gender,
            "date_of_birth": update_data.date_of_birth,
            "address": update_data.address,
            "city": update_data.city,
            "state": update_data.state,
            "postal_code": update_data.postal_code,
            "country": update_data.country,
            "marital_status": update_data.marital_status,
            "language": update_data.language
        }
        for key, value in demographics.items():
            if value is not None:
                update_ops["$set"][key] = value
        # Handle array updates (conditions, medications, encounters, notes)
        array_fields = {
            "conditions": update_data.conditions,
            "medications": update_data.medications,
            "encounters": update_data.encounters,
            "notes": update_data.notes
        }
        # Per list: the key every item must carry, and its label for the 400
        required_fields = {
            "conditions": ("code", "Condition code"),
            "medications": ("name", "Medication name"),
            "encounters": ("type", "Encounter type"),
            "notes": ("content", "Note content")
        }
        for field, items in array_fields.items():
            if items is None:
                continue
            required_key, label = required_fields[field]
            updated_items = []
            for item in items:
                item_dict = item.dict(exclude_unset=True)
                if not item_dict:
                    continue
                # Generate ID for new items
                if not item_dict.get("id"):
                    item_dict["id"] = str(uuid.uuid4())
                # Validate required fields
                if not item_dict.get(required_key):
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"{label} is required for {field}"
                    )
                updated_items.append(item_dict)
            # Replace the entire array
            update_ops["$set"][field] = updated_items
        # Perform the update
        result = await patients_collection.update_one(by_id, update_ops)
        if result.matched_count == 0:
            logger.warning(f"Patient not found for update: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )
        # Retrieve and return the updated patient
        updated_patient = await patients_collection.find_one(by_id)
        if not updated_patient:
            logger.error(f"Failed to retrieve updated patient: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to retrieve updated patient"
            )
        response = {
            "id": str(updated_patient["_id"]),
            "fhir_id": updated_patient.get("fhir_id"),
            "full_name": updated_patient.get("full_name"),
            "gender": updated_patient.get("gender"),
            "date_of_birth": updated_patient.get("date_of_birth"),
            "address": updated_patient.get("address"),
            "city": updated_patient.get("city"),
            "state": updated_patient.get("state"),
            "postal_code": updated_patient.get("postal_code"),
            "country": updated_patient.get("country"),
            "marital_status": updated_patient.get("marital_status"),
            "language": updated_patient.get("language"),
            "conditions": updated_patient.get("conditions", []),
            "medications": updated_patient.get("medications", []),
            "encounters": updated_patient.get("encounters", []),
            "notes": updated_patient.get("notes", []),
            "source": updated_patient.get("source"),
            "import_date": updated_patient.get("import_date"),
            "last_updated": updated_patient.get("last_updated")
        }
        logger.info(f"Successfully updated patient {patient_id}")
        return response
    except ValueError as ve:
        logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient ID format"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to update patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update patient: {str(e)}"
        )
# Export the router as 'patients' for api.__init__.py.
# This is a plain alias: 'patients' and 'router' are the same APIRouter object.
patients = router