Ali2206 commited on
Commit
76a9f83
·
verified ·
1 Parent(s): ff07255

Update api/routes/patients.py

Browse files
Files changed (1) hide show
  1. api/routes/patients.py +463 -0
api/routes/patients.py CHANGED
@@ -0,0 +1,463 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException, Depends, Query, status
2
+ from ...db.mongo import patients_collection
3
+ from ...core.security import get_current_user
4
+ from ...utils.db import create_indexes
5
+ from ...utils.helpers import calculate_age, standardize_language
6
+ from ...models.entities import Note
7
+ from datetime import datetime
8
+ from bson import ObjectId
9
+ from bson.errors import InvalidId
10
+ from typing import Optional, List, Dict
11
+ from pymongo import UpdateOne
12
+ from pymongo.errors import BulkWriteError
13
+ import json
14
+ from pathlib import Path
15
+ import glob
16
+ import uuid
17
+ import re
18
+ import logging
19
+
20
+ # Configure logging
21
+ logging.basicConfig(
22
+ level=logging.INFO,
23
+ format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
24
+ )
25
+ logger = logging.getLogger(__name__)
26
+
27
+ router = APIRouter()
28
+
29
+ # Configuration
30
+ BASE_DIR = Path(__file__).resolve().parent.parent.parent
31
+ SYNTHEA_DATA_DIR = BASE_DIR / "output" / "fhir"
32
+ os.makedirs(SYNTHEA_DATA_DIR, exist_ok=True)
33
+
34
+ async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict]:
35
+ logger.debug(f"Processing patient from file: {file_path}")
36
+ patient_data = {}
37
+ notes = []
38
+ conditions = []
39
+ medications = []
40
+ encounters = []
41
+
42
+ # Validate bundle structure
43
+ if not isinstance(bundle, dict) or 'entry' not in bundle:
44
+ logger.error(f"Invalid FHIR bundle structure in {file_path}")
45
+ return None
46
+
47
+ for entry in bundle.get('entry', []):
48
+ resource = entry.get('resource', {})
49
+ resource_type = resource.get('resourceType')
50
+
51
+ if not resource_type:
52
+ logger.warning(f"Skipping entry with missing resourceType in {file_path}")
53
+ continue
54
+
55
+ try:
56
+ if resource_type == 'Patient':
57
+ name = resource.get('name', [{}])[0]
58
+ address = resource.get('address', [{}])[0]
59
+
60
+ patient_data = {
61
+ 'fhir_id': resource.get('id'),
62
+ 'full_name': f"{' '.join(name.get('given', ['']))} {name.get('family', '')}".strip(),
63
+ 'gender': resource.get('gender', 'unknown'),
64
+ 'date_of_birth': resource.get('birthDate', ''),
65
+ 'address': ' '.join(address.get('line', [''])),
66
+ 'city': address.get('city', ''),
67
+ 'state': address.get('state', ''),
68
+ 'postal_code': address.get('postalCode', ''),
69
+ 'country': address.get('country', ''),
70
+ 'marital_status': resource.get('maritalStatus', {}).get('text', ''),
71
+ 'language': standardize_language(resource.get('communication', [{}])[0].get('language', {}).get('text', '')),
72
+ 'source': 'synthea',
73
+ 'last_updated': datetime.utcnow().isoformat()
74
+ }
75
+
76
+ elif resource_type == 'Encounter':
77
+ encounter = {
78
+ 'id': resource.get('id'),
79
+ 'type': resource.get('type', [{}])[0].get('text', ''),
80
+ 'status': resource.get('status'),
81
+ 'period': resource.get('period', {}),
82
+ 'service_provider': resource.get('serviceProvider', {}).get('display', '')
83
+ }
84
+ encounters.append(encounter)
85
+
86
+ for note in resource.get('note', []):
87
+ if note.get('text'):
88
+ notes.append({
89
+ 'date': resource.get('period', {}).get('start', datetime.utcnow().isoformat()),
90
+ 'type': resource.get('type', [{}])[0].get('text', 'Encounter Note'),
91
+ 'text': note.get('text'),
92
+ 'context': f"Encounter: {encounter.get('type')}",
93
+ 'author': 'System Generated'
94
+ })
95
+
96
+ elif resource_type == 'Condition':
97
+ conditions.append({
98
+ 'id': resource.get('id'),
99
+ 'code': resource.get('code', {}).get('text', ''),
100
+ 'status': resource.get('clinicalStatus', {}).get('text', ''),
101
+ 'onset_date': resource.get('onsetDateTime'),
102
+ 'recorded_date': resource.get('recordedDate'),
103
+ 'verification_status': resource.get('verificationStatus', {}).get('text', '')
104
+ })
105
+
106
+ elif resource_type == 'MedicationRequest':
107
+ medications.append({
108
+ 'id': resource.get('id'),
109
+ 'name': resource.get('medicationCodeableConcept', {}).get('text', ''),
110
+ 'status': resource.get('status'),
111
+ 'prescribed_date': resource.get('authoredOn'),
112
+ 'requester': resource.get('requester', {}).get('display', ''),
113
+ 'dosage': resource.get('dosageInstruction', [{}])[0].get('text', '')
114
+ })
115
+
116
+ except Exception as e:
117
+ logger.error(f"Error processing {resource_type} in {file_path}: {str(e)}")
118
+ continue
119
+
120
+ if patient_data:
121
+ patient_data.update({
122
+ 'notes': notes,
123
+ 'conditions': conditions,
124
+ 'medications': medications,
125
+ 'encounters': encounters,
126
+ 'import_date': datetime.utcnow().isoformat()
127
+ })
128
+ logger.info(f"Successfully processed patient {patient_data.get('fhir_id')} from {file_path}")
129
+ return patient_data
130
+ logger.warning(f"No valid patient data found in {file_path}")
131
+ return None
132
+
133
+ @router.post("/import", status_code=status.HTTP_201_CREATED)
134
+ async def import_patients(
135
+ limit: int = Query(100, ge=1, le=1000),
136
+ current_user: dict = Depends(get_current_user)
137
+ ):
138
+ request_id = str(uuid.uuid4())
139
+ logger.info(f"Starting import request {request_id} by user {current_user.get('email')}")
140
+ start_time = time.time()
141
+
142
+ if current_user.get('role') not in ['admin', 'doctor']:
143
+ logger.warning(f"Unauthorized import attempt by {current_user.get('email')}")
144
+ raise HTTPException(
145
+ status_code=status.HTTP_403_FORBIDDEN,
146
+ detail="Only administrators and doctors can import data"
147
+ )
148
+
149
+ try:
150
+ await create_indexes()
151
+
152
+ if not SYNTHEA_DATA_DIR.exists():
153
+ logger.error(f"Synthea data directory not found: {SYNTHEA_DATA_DIR}")
154
+ raise HTTPException(
155
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
156
+ detail="Data directory not found"
157
+ )
158
+
159
+ # Filter out non-patient files
160
+ files = [
161
+ f for f in glob.glob(str(SYNTHEA_DATA_DIR / "*.json"))
162
+ if not re.search(r'(hospitalInformation|practitionerInformation)\d+\.json$', f)
163
+ ]
164
+ if not files:
165
+ logger.warning("No valid patient JSON files found in synthea data directory")
166
+ return {
167
+ "status": "success",
168
+ "message": "No patient data files found",
169
+ "imported": 0,
170
+ "request_id": request_id
171
+ }
172
+
173
+ operations = []
174
+ imported = 0
175
+ errors = []
176
+
177
+ for file_path in files[:limit]:
178
+ try:
179
+ logger.debug(f"Processing file: {file_path}")
180
+
181
+ # Check file accessibility
182
+ if not os.path.exists(file_path):
183
+ logger.error(f"File not found: {file_path}")
184
+ errors.append(f"File not found: {file_path}")
185
+ continue
186
+
187
+ # Check file size
188
+ file_size = os.path.getsize(file_path)
189
+ if file_size == 0:
190
+ logger.warning(f"Empty file: {file_path}")
191
+ errors.append(f"Empty file: {file_path}")
192
+ continue
193
+
194
+ with open(file_path, 'r', encoding='utf-8') as f:
195
+ try:
196
+ bundle = json.load(f)
197
+ except json.JSONDecodeError as je:
198
+ logger.error(f"Invalid JSON in {file_path}: {str(je)}")
199
+ errors.append(f"Invalid JSON in {file_path}: {str(je)}")
200
+ continue
201
+
202
+ patient = await process_synthea_patient(bundle, file_path)
203
+ if patient:
204
+ if not patient.get('fhir_id'):
205
+ logger.warning(f"Missing FHIR ID in patient data from {file_path}")
206
+ errors.append(f"Missing FHIR ID in {file_path}")
207
+ continue
208
+
209
+ operations.append(UpdateOne(
210
+ {"fhir_id": patient['fhir_id']},
211
+ {"$setOnInsert": patient},
212
+ upsert=True
213
+ ))
214
+ imported += 1
215
+ else:
216
+ logger.warning(f"No valid patient data in {file_path}")
217
+ errors.append(f"No valid patient data in {file_path}")
218
+
219
+ except Exception as e:
220
+ logger.error(f"Error processing {file_path}: {str(e)}")
221
+ errors.append(f"Error in {file_path}: {str(e)}")
222
+ continue
223
+
224
+ response = {
225
+ "status": "success",
226
+ "imported": imported,
227
+ "errors": errors,
228
+ "request_id": request_id,
229
+ "duration_seconds": time.time() - start_time
230
+ }
231
+
232
+ if operations:
233
+ try:
234
+ result = await patients_collection.bulk_write(operations, ordered=False)
235
+ response.update({
236
+ "upserted": result.upserted_count,
237
+ "existing": len(operations) - result.upserted_count
238
+ })
239
+ logger.info(f"Import request {request_id} completed: {imported} patients processed, "
240
+ f"{result.upserted_count} upserted, {len(errors)} errors")
241
+ except BulkWriteError as bwe:
242
+ logger.error(f"Partial bulk write failure for request {request_id}: {str(bwe.details)}")
243
+ response.update({
244
+ "upserted": bwe.details.get('nUpserted', 0),
245
+ "existing": len(operations) - bwe.details.get('nUpserted', 0),
246
+ "write_errors": [
247
+ f"Index {err['index']}: {err['errmsg']}" for err in bwe.details.get('writeErrors', [])
248
+ ]
249
+ })
250
+ logger.info(f"Import request {request_id} partially completed: {imported} patients processed, "
251
+ f"{response['upserted']} upserted, {len(errors)} errors")
252
+ except Exception as e:
253
+ logger.error(f"Bulk write failed for request {request_id}: {str(e)}")
254
+ raise HTTPException(
255
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
256
+ detail=f"Database operation failed: {str(e)}"
257
+ )
258
+ else:
259
+ logger.info(f"Import request {request_id} completed: No new patients to import, {len(errors)} errors")
260
+ response["message"] = "No new patients found to import"
261
+
262
+ return response
263
+
264
+ except HTTPException:
265
+ raise
266
+ except Exception as e:
267
+ logger.error(f"Import request {request_id} failed: {str(e)}", exc_info=True)
268
+ raise HTTPException(
269
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
270
+ detail=f"Import failed: {str(e)}"
271
+ )
272
+
273
+ @router.get("/patients", response_model=List[dict])
274
+ async def list_patients(
275
+ search: Optional[str] = Query(None),
276
+ min_notes: int = Query(0, ge=0),
277
+ min_conditions: int = Query(0, ge=0),
278
+ limit: int = Query(100, ge=1, le=500),
279
+ skip: int = Query(0, ge=0)
280
+ ):
281
+ logger.info(f"Listing patients with search: {search}, limit: {limit}, skip: {skip}")
282
+ query = {"source": "synthea"}
283
+
284
+ if search:
285
+ query["$or"] = [
286
+ {"full_name": {"$regex": search, "$options": "i"}},
287
+ {"fhir_id": search}
288
+ ]
289
+
290
+ if min_notes > 0:
291
+ query[f"notes.{min_notes-1}"] = {"$exists": True}
292
+
293
+ if min_conditions > 0:
294
+ query[f"conditions.{min_conditions-1}"] = {"$exists": True}
295
+
296
+ # Removed $slice to return full arrays for the frontend
297
+ projection = {
298
+ "fhir_id": 1,
299
+ "full_name": 1,
300
+ "gender": 1,
301
+ "date_of_birth": 1,
302
+ "city": 1,
303
+ "state": 1,
304
+ "conditions": 1,
305
+ "medications": 1,
306
+ "encounters": 1,
307
+ "notes": 1
308
+ }
309
+
310
+ try:
311
+ cursor = patients_collection.find(query, projection).skip(skip).limit(limit)
312
+ patients = []
313
+
314
+ async for patient in cursor:
315
+ patients.append({
316
+ "id": str(patient["_id"]),
317
+ "fhir_id": patient.get("fhir_id"),
318
+ "full_name": patient.get("full_name"),
319
+ "gender": patient.get("gender"),
320
+ "date_of_birth": patient.get("date_of_birth"),
321
+ "city": patient.get("city"),
322
+ "state": patient.get("state"),
323
+ "conditions": patient.get("conditions", []),
324
+ "medications": patient.get("medications", []),
325
+ "encounters": patient.get("encounters", []),
326
+ "notes": patient.get("notes", []),
327
+ "age": calculate_age(patient.get("date_of_birth")),
328
+ "stats": {
329
+ "notes": len(patient.get("notes", [])),
330
+ "conditions": len(patient.get("conditions", [])),
331
+ "medications": len(patient.get("medications", []))
332
+ }
333
+ })
334
+
335
+ logger.info(f"Retrieved {len(patients)} patients")
336
+ return patients
337
+
338
+ except Exception as e:
339
+ logger.error(f"Failed to list patients: {str(e)}")
340
+ raise HTTPException(
341
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
342
+ detail=f"Failed to retrieve patients: {str(e)}"
343
+ )
344
+
345
+ @router.get("/patients/{patient_id}", response_model=dict)
346
+ async def get_patient(patient_id: str):
347
+ logger.info(f"Retrieving patient: {patient_id}")
348
+ try:
349
+ patient = await patients_collection.find_one({
350
+ "$or": [
351
+ {"_id": ObjectId(patient_id)},
352
+ {"fhir_id": patient_id}
353
+ ]
354
+ })
355
+
356
+ if not patient:
357
+ logger.warning(f"Patient not found: {patient_id}")
358
+ raise HTTPException(
359
+ status_code=status.HTTP_404_NOT_FOUND,
360
+ detail="Patient not found"
361
+ )
362
+
363
+ response = {
364
+ "demographics": {
365
+ "id": str(patient["_id"]),
366
+ "fhir_id": patient.get("fhir_id"),
367
+ "full_name": patient.get("full_name"),
368
+ "gender": patient.get("gender"),
369
+ "date_of_birth": patient.get("date_of_birth"),
370
+ "age": calculate_age(patient.get("date_of_birth")),
371
+ "address": {
372
+ "line": patient.get("address"),
373
+ "city": patient.get("city"),
374
+ "state": patient.get("state"),
375
+ "postal_code": patient.get("postal_code"),
376
+ "country": patient.get("country")
377
+ },
378
+ "marital_status": patient.get("marital_status"),
379
+ "language": patient.get("language")
380
+ },
381
+ "clinical_data": {
382
+ "notes": patient.get("notes", []),
383
+ "conditions": patient.get("conditions", []),
384
+ "medications": patient.get("medications", []),
385
+ "encounters": patient.get("encounters", [])
386
+ },
387
+ "metadata": {
388
+ "source": patient.get("source"),
389
+ "import_date": patient.get("import_date"),
390
+ "last_updated": patient.get("last_updated")
391
+ }
392
+ }
393
+
394
+ logger.info(f"Successfully retrieved patient: {patient_id}")
395
+ return response
396
+
397
+ except ValueError as ve:
398
+ logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
399
+ raise HTTPException(
400
+ status_code=status.HTTP_400_BAD_REQUEST,
401
+ detail="Invalid patient ID format"
402
+ )
403
+ except Exception as e:
404
+ logger.error(f"Failed to retrieve patient {patient_id}: {str(e)}")
405
+ raise HTTPException(
406
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
407
+ detail=f"Failed to retrieve patient: {str(e)}"
408
+ )
409
+
410
+ @router.post("/patients/{patient_id}/notes", status_code=status.HTTP_201_CREATED)
411
+ async def add_note(
412
+ patient_id: str,
413
+ note: Note,
414
+ current_user: dict = Depends(get_current_user)
415
+ ):
416
+ logger.info(f"Adding note for patient {patient_id} by user {current_user.get('email')}")
417
+ if current_user.get('role') not in ['doctor', 'admin']:
418
+ logger.warning(f"Unauthorized note addition attempt by {current_user.get('email')}")
419
+ raise HTTPException(
420
+ status_code=status.HTTP_403_FORBIDDEN,
421
+ detail="Only clinicians can add notes"
422
+ )
423
+
424
+ try:
425
+ note_data = note.dict()
426
+ note_data.update({
427
+ "author": current_user.get('full_name', 'System'),
428
+ "timestamp": datetime.utcnow().isoformat()
429
+ })
430
+
431
+ result = await patients_collection.update_one(
432
+ {"$or": [
433
+ {"_id": ObjectId(patient_id)},
434
+ {"fhir_id": patient_id}
435
+ ]},
436
+ {
437
+ "$push": {"notes": note_data},
438
+ "$set": {"last_updated": datetime.utcnow().isoformat()}
439
+ }
440
+ )
441
+
442
+ if result.modified_count == 0:
443
+ logger.warning(f"Patient not found for note addition: {patient_id}")
444
+ raise HTTPException(
445
+ status_code=status.HTTP_404_NOT_FOUND,
446
+ detail="Patient not found"
447
+ )
448
+
449
+ logger.info(f"Note added successfully for patient {patient_id}")
450
+ return {"status": "success", "message": "Note added"}
451
+
452
+ except ValueError as ve:
453
+ logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}")
454
+ raise HTTPException(
455
+ status_code=status.HTTP_400_BAD_REQUEST,
456
+ detail="Invalid patient ID format"
457
+ )
458
+ except Exception as e:
459
+ logger.error(f"Failed to add note for patient {patient_id}: {str(e)}")
460
+ raise HTTPException(
461
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
462
+ detail=f"Failed to add note: {str(e)}"
463
+ )