CPS-API / api /routes /patients.py
Ali2206's picture
Update api/routes/patients.py
6109265 verified
raw
history blame
19.1 kB
from fastapi import APIRouter, HTTPException, Depends, Query, status
from db.mongo import patients_collection
from core.security import get_current_user
from utils.db import create_indexes
from utils.helpers import calculate_age, standardize_language
from models.entities import Note
from datetime import datetime
from bson import ObjectId
from bson.errors import InvalidId
from typing import Optional, List, Dict
from pymongo import UpdateOne
from pymongo.errors import BulkWriteError
import json
from pathlib import Path
import glob
import uuid
import re
import logging
import time
import os
# Logging: root configuration at INFO level; each record carries the
# timestamp, level name, logger name and message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Router onto which the handlers defined in this module are registered.
router = APIRouter()

# Filesystem layout: BASE_DIR is three directory levels above this file and
# the FHIR bundles are read from BASE_DIR/output/fhir.
_MODULE_DIR = Path(__file__).resolve().parent
BASE_DIR = _MODULE_DIR.parent.parent
SYNTHEA_DATA_DIR = BASE_DIR / "output" / "fhir"

# Create the data directory at import time if it does not exist yet.
os.makedirs(SYNTHEA_DATA_DIR, exist_ok=True)
def _as_dict(value) -> dict:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _first_item(value) -> dict:
    """Return the first element of a non-empty list as a dict, otherwise ``{}``."""
    if isinstance(value, list) and value:
        return _as_dict(value[0])
    return {}


def _extract_patient(resource: dict) -> dict:
    """Flatten a ``Patient`` resource into the demographic fields that are stored."""
    name = _first_item(resource.get('name'))
    address = _first_item(resource.get('address'))
    # Construct the full name and remove any digits from it.
    raw_full_name = f"{' '.join(name.get('given') or [''])} {name.get('family') or ''}".strip()
    clean_full_name = re.sub(r'\d+', '', raw_full_name).strip()
    language = _as_dict(_first_item(resource.get('communication')).get('language'))
    return {
        'fhir_id': resource.get('id'),
        'full_name': clean_full_name,
        'gender': resource.get('gender', 'unknown'),
        'date_of_birth': resource.get('birthDate', ''),
        'address': ' '.join(address.get('line') or ['']),
        'city': address.get('city', ''),
        'state': address.get('state', ''),
        'postal_code': address.get('postalCode', ''),
        'country': address.get('country', ''),
        'marital_status': _as_dict(resource.get('maritalStatus')).get('text', ''),
        'language': standardize_language(language.get('text', '')),
        'source': 'synthea',
        'last_updated': datetime.utcnow().isoformat()
    }


def _extract_encounter(resource: dict) -> tuple:
    """Return ``(encounter, notes)`` built from an ``Encounter`` resource."""
    encounter_type = _first_item(resource.get('type'))
    period = resource.get('period', {})
    encounter = {
        'id': resource.get('id'),
        'type': encounter_type.get('text', ''),
        'status': resource.get('status'),
        'period': period,
        'service_provider': _as_dict(resource.get('serviceProvider')).get('display', '')
    }
    # Only notes that actually carry text are kept; the note date falls back
    # to the current UTC time when the encounter has no period start.
    notes = [
        {
            'date': _as_dict(period).get('start', datetime.utcnow().isoformat()),
            'type': encounter_type.get('text', 'Encounter Note'),
            'text': note.get('text'),
            'context': f"Encounter: {encounter.get('type')}",
            'author': 'System Generated'
        }
        for note in resource.get('note') or []
        if isinstance(note, dict) and note.get('text')
    ]
    return encounter, notes


def _extract_condition(resource: dict) -> dict:
    """Flatten a ``Condition`` resource into the stored condition record."""
    return {
        'id': resource.get('id'),
        'code': _as_dict(resource.get('code')).get('text', ''),
        'status': _as_dict(resource.get('clinicalStatus')).get('text', ''),
        'onset_date': resource.get('onsetDateTime'),
        'recorded_date': resource.get('recordedDate'),
        'verification_status': _as_dict(resource.get('verificationStatus')).get('text', '')
    }


def _extract_medication(resource: dict) -> dict:
    """Flatten a ``MedicationRequest`` resource into the stored medication record."""
    return {
        'id': resource.get('id'),
        'name': _as_dict(resource.get('medicationCodeableConcept')).get('text', ''),
        'status': resource.get('status'),
        'prescribed_date': resource.get('authoredOn'),
        'requester': _as_dict(resource.get('requester')).get('display', ''),
        'dosage': _first_item(resource.get('dosageInstruction')).get('text', '')
    }


async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict]:
    """Convert one parsed FHIR bundle into a flat patient document.

    Walks ``bundle['entry']`` and collects the ``Patient`` resource together
    with every ``Encounter`` (and its notes), ``Condition`` and
    ``MedicationRequest`` resource. Other resource types are ignored. If the
    bundle contains several ``Patient`` resources the last one wins.

    Missing, empty or ``null`` nested values (for example ``"name": []``) are
    treated as absent instead of raising, so a sparse resource is still
    imported with empty fields rather than being dropped.

    Args:
        bundle: The decoded JSON content of a bundle file.
        file_path: Path of the source file; used only in log messages.

    Returns:
        The patient document (demographics plus ``notes``, ``conditions``,
        ``medications``, ``encounters`` and ``import_date``), or ``None`` when
        the bundle is malformed or contains no usable ``Patient`` resource.
    """
    logger.debug(f"Processing patient from file: {file_path}")

    # Validate bundle structure
    if not isinstance(bundle, dict) or 'entry' not in bundle:
        logger.error(f"Invalid FHIR bundle structure in {file_path}")
        return None

    patient_data = {}
    notes = []
    conditions = []
    medications = []
    encounters = []

    for entry in bundle.get('entry') or []:
        # Non-dict entries/resources end up as {} and are skipped just below.
        resource = _as_dict(_as_dict(entry).get('resource'))
        resource_type = resource.get('resourceType')
        if not resource_type:
            logger.warning(f"Skipping entry with missing resourceType in {file_path}")
            continue
        try:
            if resource_type == 'Patient':
                patient_data = _extract_patient(resource)
            elif resource_type == 'Encounter':
                encounter, encounter_notes = _extract_encounter(resource)
                encounters.append(encounter)
                notes.extend(encounter_notes)
            elif resource_type == 'Condition':
                conditions.append(_extract_condition(resource))
            elif resource_type == 'MedicationRequest':
                medications.append(_extract_medication(resource))
        except Exception as e:
            # Deliberate best-effort boundary: one malformed resource must not
            # abort the rest of the bundle.
            logger.error(f"Error processing {resource_type} in {file_path}: {str(e)}")
            continue

    if patient_data:
        patient_data.update({
            'notes': notes,
            'conditions': conditions,
            'medications': medications,
            'encounters': encounters,
            'import_date': datetime.utcnow().isoformat()
        })
        logger.info(f"Successfully processed patient {patient_data.get('fhir_id')} from {file_path}")
        return patient_data

    logger.warning(f"No valid patient data found in {file_path}")
    return None
@router.post("/import", status_code=status.HTTP_201_CREATED)
async def import_patients(
    limit: int = Query(100, ge=1, le=1000),
    current_user: dict = Depends(get_current_user)
):
    """Import patient bundles from ``SYNTHEA_DATA_DIR`` into the patients collection.

    Up to ``limit`` ``*.json`` files are read (hospital/practitioner
    information files are excluded), converted with
    ``process_synthea_patient`` and upserted by ``fhir_id`` using
    ``$setOnInsert``, so existing documents are left untouched.

    Files are processed in sorted path order so that repeated calls with the
    same ``limit`` select the same files, and ``duration_seconds`` is measured
    after the database write so it covers the whole request.

    Args:
        limit: Maximum number of files to process (1-1000).
        current_user: Authenticated user; must have role ``admin`` or ``doctor``.

    Returns:
        A summary dict with ``status``, ``imported``, ``errors``,
        ``request_id``, ``duration_seconds`` and, when a write was attempted,
        ``upserted`` / ``existing`` (plus ``write_errors`` on partial failure).

    Raises:
        HTTPException: 403 for other roles; 500 when the data directory is
            missing, the bulk write fails outright, or any other error occurs.
    """
    request_id = str(uuid.uuid4())
    logger.info(f"Starting import request {request_id} by user {current_user.get('email')}")
    start_time = time.time()

    if current_user.get('role') not in ['admin', 'doctor']:
        logger.warning(f"Unauthorized import attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators and doctors can import data"
        )

    try:
        await create_indexes()

        if not SYNTHEA_DATA_DIR.exists():
            logger.error(f"Synthea data directory not found: {SYNTHEA_DATA_DIR}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Data directory not found"
            )

        # Filter out non-patient files; sort because glob order is arbitrary
        # and the list is truncated to `limit` below.
        files = sorted(
            f for f in glob.glob(str(SYNTHEA_DATA_DIR / "*.json"))
            if not re.search(r'(hospitalInformation|practitionerInformation)\d+\.json$', f)
        )
        if not files:
            logger.warning("No valid patient JSON files found in synthea data directory")
            return {
                "status": "success",
                "message": "No patient data files found",
                "imported": 0,
                "request_id": request_id
            }

        operations = []
        imported = 0
        errors = []

        for file_path in files[:limit]:
            try:
                logger.debug(f"Processing file: {file_path}")

                # Check file accessibility
                if not os.path.exists(file_path):
                    logger.error(f"File not found: {file_path}")
                    errors.append(f"File not found: {file_path}")
                    continue

                # Check file size
                file_size = os.path.getsize(file_path)
                if file_size == 0:
                    logger.warning(f"Empty file: {file_path}")
                    errors.append(f"Empty file: {file_path}")
                    continue

                with open(file_path, 'r', encoding='utf-8') as f:
                    try:
                        bundle = json.load(f)
                    except json.JSONDecodeError as je:
                        logger.error(f"Invalid JSON in {file_path}: {str(je)}")
                        errors.append(f"Invalid JSON in {file_path}: {str(je)}")
                        continue

                patient = await process_synthea_patient(bundle, file_path)
                if patient:
                    if not patient.get('fhir_id'):
                        logger.warning(f"Missing FHIR ID in patient data from {file_path}")
                        errors.append(f"Missing FHIR ID in {file_path}")
                        continue
                    # $setOnInsert + upsert: insert new patients, never
                    # overwrite one that already exists.
                    operations.append(UpdateOne(
                        {"fhir_id": patient['fhir_id']},
                        {"$setOnInsert": patient},
                        upsert=True
                    ))
                    imported += 1
                else:
                    logger.warning(f"No valid patient data in {file_path}")
                    errors.append(f"No valid patient data in {file_path}")
            except Exception as e:
                # Per-file boundary: record the failure and move on.
                logger.error(f"Error processing {file_path}: {str(e)}")
                errors.append(f"Error in {file_path}: {str(e)}")
                continue

        response = {
            "status": "success",
            "imported": imported,
            "errors": errors,
            "request_id": request_id,
            # Placeholder to keep the key position; refreshed before returning.
            "duration_seconds": time.time() - start_time
        }

        if operations:
            try:
                result = await patients_collection.bulk_write(operations, ordered=False)
                response.update({
                    "upserted": result.upserted_count,
                    "existing": len(operations) - result.upserted_count
                })
                logger.info(f"Import request {request_id} completed: {imported} patients processed, "
                            f"{result.upserted_count} upserted, {len(errors)} errors")
            except BulkWriteError as bwe:
                logger.error(f"Partial bulk write failure for request {request_id}: {str(bwe.details)}")
                response.update({
                    "upserted": bwe.details.get('nUpserted', 0),
                    "existing": len(operations) - bwe.details.get('nUpserted', 0),
                    "write_errors": [
                        f"Index {err['index']}: {err['errmsg']}" for err in bwe.details.get('writeErrors', [])
                    ]
                })
                logger.info(f"Import request {request_id} partially completed: {imported} patients processed, "
                            f"{response['upserted']} upserted, {len(errors)} errors")
            except Exception as e:
                logger.error(f"Bulk write failed for request {request_id}: {str(e)}")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Database operation failed: {str(e)}"
                )
        else:
            logger.info(f"Import request {request_id} completed: No new patients to import, {len(errors)} errors")
            response["message"] = "No new patients found to import"

        # Measure after the database write so the figure covers the whole request.
        response["duration_seconds"] = time.time() - start_time
        return response

    except HTTPException:
        # Already a proper HTTP error; do not wrap it in the generic 500 below.
        raise
    except Exception as e:
        logger.error(f"Import request {request_id} failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}"
        )
# NOTE(review): unlike /import and the note-creation endpoint in this module,
# this handler declares no get_current_user dependency — confirm that
# unauthenticated access to patient listings is intended.
@router.get("/patients", response_model=List[dict])
async def list_patients(
    search: Optional[str] = Query(None),
    min_notes: int = Query(0, ge=0),
    min_conditions: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
    skip: int = Query(0, ge=0)
):
    """List imported patients (``source == "synthea"``) with optional filters.

    Args:
        search: Case-insensitive substring matched against ``full_name``, or an
            exact ``fhir_id``. The text is matched literally: it is escaped
            before being used as a regular expression, so characters such as
            ``(`` or ``*`` neither break the query nor act as wildcards.
        min_notes: Only return patients with at least this many notes.
        min_conditions: Only return patients with at least this many conditions.
        limit: Maximum number of patients to return (1-500).
        skip: Number of matching patients to skip.

    Returns:
        A list of patient summaries including the full clinical arrays, the
        computed ``age`` and a ``stats`` dict with array lengths.

    Raises:
        HTTPException: 500 when the query or result conversion fails.
    """
    logger.info(f"Listing patients with search: {search}, limit: {limit}, skip: {skip}")

    query = {"source": "synthea"}
    if search:
        query["$or"] = [
            # Escape the user input so it is treated as literal text.
            {"full_name": {"$regex": re.escape(search), "$options": "i"}},
            {"fhir_id": search}
        ]
    # "array has at least N elements" is expressed as "element N-1 exists".
    if min_notes > 0:
        query[f"notes.{min_notes-1}"] = {"$exists": True}
    if min_conditions > 0:
        query[f"conditions.{min_conditions-1}"] = {"$exists": True}

    # Removed $slice to return full arrays for the frontend
    projection = {
        "fhir_id": 1,
        "full_name": 1,
        "gender": 1,
        "date_of_birth": 1,
        "city": 1,
        "state": 1,
        "conditions": 1,
        "medications": 1,
        "encounters": 1,
        "notes": 1
    }

    try:
        cursor = patients_collection.find(query, projection).skip(skip).limit(limit)
        patients = []
        async for patient in cursor:
            notes = patient.get("notes", [])
            conditions = patient.get("conditions", [])
            medications = patient.get("medications", [])
            patients.append({
                "id": str(patient["_id"]),
                "fhir_id": patient.get("fhir_id"),
                "full_name": patient.get("full_name"),
                "gender": patient.get("gender"),
                "date_of_birth": patient.get("date_of_birth"),
                "city": patient.get("city"),
                "state": patient.get("state"),
                "conditions": conditions,
                "medications": medications,
                "encounters": patient.get("encounters", []),
                "notes": notes,
                "age": calculate_age(patient.get("date_of_birth")),
                "stats": {
                    "notes": len(notes),
                    "conditions": len(conditions),
                    "medications": len(medications)
                }
            })
        logger.info(f"Retrieved {len(patients)} patients")
        return patients
    except Exception as e:
        logger.error(f"Failed to list patients: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve patients: {str(e)}"
        )
@router.get("/patients/{patient_id}", response_model=dict)
async def get_patient(patient_id: str):
    """Return one patient, looked up by MongoDB ``_id`` or by ``fhir_id``.

    The ``_id`` clause is only added to the query when ``patient_id`` is a
    well-formed ObjectId string, so a ``fhir_id`` can be used without the
    ObjectId conversion failing. A "not found" result is reported as 404
    rather than being swallowed by the generic error handler.

    Args:
        patient_id: Either the document's ObjectId (as a string) or its ``fhir_id``.

    Returns:
        A dict with ``demographics``, ``clinical_data`` and ``metadata`` sections.

    Raises:
        HTTPException: 404 when no patient matches; 400 when a ``ValueError``
            is raised during the lookup (handler kept from the original code);
            500 for any other failure.
    """
    logger.info(f"Retrieving patient: {patient_id}")
    try:
        # Build the lookup: always try fhir_id, and _id only when convertible.
        id_clauses = [{"fhir_id": patient_id}]
        if ObjectId.is_valid(patient_id):
            id_clauses.insert(0, {"_id": ObjectId(patient_id)})

        patient = await patients_collection.find_one({"$or": id_clauses})
        if not patient:
            logger.warning(f"Patient not found: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )

        response = {
            "demographics": {
                "id": str(patient["_id"]),
                "fhir_id": patient.get("fhir_id"),
                "full_name": patient.get("full_name"),
                "gender": patient.get("gender"),
                "date_of_birth": patient.get("date_of_birth"),
                "age": calculate_age(patient.get("date_of_birth")),
                "address": {
                    "line": patient.get("address"),
                    "city": patient.get("city"),
                    "state": patient.get("state"),
                    "postal_code": patient.get("postal_code"),
                    "country": patient.get("country")
                },
                "marital_status": patient.get("marital_status"),
                "language": patient.get("language")
            },
            "clinical_data": {
                "notes": patient.get("notes", []),
                "conditions": patient.get("conditions", []),
                "medications": patient.get("medications", []),
                "encounters": patient.get("encounters", [])
            },
            "metadata": {
                "source": patient.get("source"),
                "import_date": patient.get("import_date"),
                "last_updated": patient.get("last_updated")
            }
        }
        logger.info(f"Successfully retrieved patient: {patient_id}")
        return response
    except HTTPException:
        # Let the 404 above through instead of converting it into a 500.
        raise
    except ValueError as ve:
        logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient ID format"
        )
    except Exception as e:
        logger.error(f"Failed to retrieve patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve patient: {str(e)}"
        )
@router.post("/patients/{patient_id}/notes", status_code=status.HTTP_201_CREATED)
async def add_note(
    patient_id: str,
    note: Note,
    current_user: dict = Depends(get_current_user)
):
    """Append a note to a patient identified by MongoDB ``_id`` or ``fhir_id``.

    The note payload is extended with the author (the current user's
    ``full_name``, or ``'System'``) and a UTC timestamp, pushed onto the
    patient's ``notes`` array, and ``last_updated`` is refreshed.

    The ``_id`` clause is only used when ``patient_id`` is a well-formed
    ObjectId string, so a ``fhir_id`` works as an identifier, and an unknown
    patient yields 404 instead of being rewritten into a 500.

    Args:
        patient_id: Either the document's ObjectId (as a string) or its ``fhir_id``.
        note: The note payload from the request body.
        current_user: Authenticated user; must have role ``doctor`` or ``admin``.

    Returns:
        ``{"status": "success", "message": "Note added"}``.

    Raises:
        HTTPException: 403 for other roles; 404 when no patient matches; 400
            when a ``ValueError`` is raised during the update (handler kept
            from the original code); 500 for any other failure.
    """
    logger.info(f"Adding note for patient {patient_id} by user {current_user.get('email')}")

    if current_user.get('role') not in ['doctor', 'admin']:
        logger.warning(f"Unauthorized note addition attempt by {current_user.get('email')}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only clinicians can add notes"
        )

    try:
        note_data = note.dict()
        note_data.update({
            "author": current_user.get('full_name', 'System'),
            "timestamp": datetime.utcnow().isoformat()
        })

        # Build the lookup: always try fhir_id, and _id only when convertible.
        id_clauses = [{"fhir_id": patient_id}]
        if ObjectId.is_valid(patient_id):
            id_clauses.insert(0, {"_id": ObjectId(patient_id)})

        result = await patients_collection.update_one(
            {"$or": id_clauses},
            {
                "$push": {"notes": note_data},
                "$set": {"last_updated": datetime.utcnow().isoformat()}
            }
        )

        # matched_count states directly whether a patient was found.
        if result.matched_count == 0:
            logger.warning(f"Patient not found for note addition: {patient_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Patient not found"
            )

        logger.info(f"Note added successfully for patient {patient_id}")
        return {"status": "success", "message": "Note added"}
    except HTTPException:
        # Let the 404 above through instead of converting it into a 500.
        raise
    except ValueError as ve:
        logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient ID format"
        )
    except Exception as e:
        logger.error(f"Failed to add note for patient {patient_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add note: {str(e)}"
        )
# Export the router under the module-level name 'patients'. Per the original
# note this alias is what api.__init__.py imports — TODO confirm against that
# module, which is not visible from this file.
patients = router