Ali2206 commited on
Commit
6832b60
·
verified ·
1 Parent(s): 49f3013

Update api/routes.py

Browse files
Files changed (1) hide show
  1. api/routes.py +38 -6
api/routes.py CHANGED
@@ -8,6 +8,7 @@ from bson import ObjectId
8
  from typing import Optional, List, Dict
9
  from pydantic import BaseModel, Field
10
  from pymongo import UpdateOne, InsertOne, IndexModel
 
11
  import httpx
12
  import os
13
  import json
@@ -17,6 +18,7 @@ from motor.motor_asyncio import AsyncIOMotorClient
17
  import logging
18
  import time
19
  import uuid
 
20
 
21
  # Configure logging
22
  logging.basicConfig(
@@ -54,6 +56,15 @@ class PatientUpdate(BaseModel):
54
  conditions: Optional[List[Condition]] = None
55
  medications: Optional[List[Medication]] = None
56
 
 
 
 
 
 
 
 
 
 
57
  # Indexes
58
  async def create_indexes():
59
  try:
@@ -81,6 +92,12 @@ def calculate_age(birth_date: str) -> Optional[int]:
81
  logger.warning(f"Invalid birth date format: {birth_date}, error: {str(e)}")
82
  return None
83
 
 
 
 
 
 
 
84
  async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict]:
85
  logger.debug(f"Processing patient from file: {file_path}")
86
  patient_data = {}
@@ -118,7 +135,7 @@ async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict
118
  'postal_code': address.get('postalCode', ''),
119
  'country': address.get('country', ''),
120
  'marital_status': resource.get('maritalStatus', {}).get('text', ''),
121
- 'language': resource.get('communication', [{}])[0].get('language', {}).get('text', ''),
122
  'source': 'synthea',
123
  'last_updated': datetime.utcnow()
124
  }
@@ -207,12 +224,16 @@ async def import_patients(
207
  detail="Data directory not found"
208
  )
209
 
210
- files = glob.glob(str(SYNTHEA_DATA_DIR / "*.json"))
 
 
 
 
211
  if not files:
212
- logger.warning("No JSON files found in synthea data directory")
213
  return {
214
  "status": "success",
215
- "message": "No data files found",
216
  "imported": 0,
217
  "request_id": request_id
218
  }
@@ -278,13 +299,24 @@ async def import_patients(
278
 
279
  if operations:
280
  try:
281
- result = await patients_collection.bulk_write(operations)
282
  response.update({
283
  "upserted": result.upserted_count,
284
  "existing": len(operations) - result.upserted_count
285
  })
286
  logger.info(f"Import request {request_id} completed: {imported} patients processed, "
287
  f"{result.upserted_count} upserted, {len(errors)} errors")
 
 
 
 
 
 
 
 
 
 
 
288
  except Exception as e:
289
  logger.error(f"Bulk write failed for request {request_id}: {str(e)}")
290
  raise HTTPException(
@@ -450,7 +482,7 @@ async def add_note(
450
  note_data = note.dict()
451
  note_data.update({
452
  "author": current_user.get('full_name', 'System'),
453
- "timestamp": datetime.utcnow()
454
  })
455
 
456
  result = await patients_collection.update_one(
 
8
  from typing import Optional, List, Dict
9
  from pydantic import BaseModel, Field
10
  from pymongo import UpdateOne, InsertOne, IndexModel
11
+ from pymongo.errors import BulkWriteError
12
  import httpx
13
  import os
14
  import json
 
18
  import logging
19
  import time
20
  import uuid
21
+ import re
22
 
23
  # Configure logging
24
  logging.basicConfig(
 
56
  conditions: Optional[List[Condition]] = None
57
  medications: Optional[List[Medication]] = None
58
 
59
+ # Language mapping for MongoDB compatibility
60
+ LANGUAGE_MAP = {
61
+ 'English (United States)': 'en',
62
+ 'English': 'en',
63
+ 'Spanish (United States)': 'es',
64
+ 'Spanish': 'es',
65
+ # Add more mappings as needed
66
+ }
67
+
68
  # Indexes
69
  async def create_indexes():
70
  try:
 
92
  logger.warning(f"Invalid birth date format: {birth_date}, error: {str(e)}")
93
  return None
94
 
95
+ def standardize_language(language: str) -> str:
96
+ """Convert language to MongoDB-compatible language code."""
97
+ if not language:
98
+ return 'en' # Default to English
99
+ return LANGUAGE_MAP.get(language, 'en')
100
+
101
  async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict]:
102
  logger.debug(f"Processing patient from file: {file_path}")
103
  patient_data = {}
 
135
  'postal_code': address.get('postalCode', ''),
136
  'country': address.get('country', ''),
137
  'marital_status': resource.get('maritalStatus', {}).get('text', ''),
138
+ 'language': standardize_language(resource.get('communication', [{}])[0].get('language', {}).get('text', '')),
139
  'source': 'synthea',
140
  'last_updated': datetime.utcnow()
141
  }
 
224
  detail="Data directory not found"
225
  )
226
 
227
+ # Filter out non-patient files
228
+ files = [
229
+ f for f in glob.glob(str(SYNTHEA_DATA_DIR / "*.json"))
230
+ if not re.search(r'(hospitalInformation|practitionerInformation)\d+\.json$', f)
231
+ ]
232
  if not files:
233
+ logger.warning("No valid patient JSON files found in synthea data directory")
234
  return {
235
  "status": "success",
236
+ "message": "No patient data files found",
237
  "imported": 0,
238
  "request_id": request_id
239
  }
 
299
 
300
  if operations:
301
  try:
302
+ result = await patients_collection.bulk_write(operations, ordered=False)
303
  response.update({
304
  "upserted": result.upserted_count,
305
  "existing": len(operations) - result.upserted_count
306
  })
307
  logger.info(f"Import request {request_id} completed: {imported} patients processed, "
308
  f"{result.upserted_count} upserted, {len(errors)} errors")
309
+ except BulkWriteError as bwe:
310
+ logger.error(f"Partial bulk write failure for request {request_id}: {str(bwe.details)}")
311
+ response.update({
312
+ "upserted": bwe.details.get('nUpserted', 0),
313
+ "existing": len(operations) - bwe.details.get('nUpserted', 0),
314
+ "write_errors": [
315
+ f"Index {err['index']}: {err['errmsg']}" for err in bwe.details.get('writeErrors', [])
316
+ ]
317
+ })
318
+ logger.info(f"Import request {request_id} partially completed: {imported} patients processed, "
319
+ f"{response['upserted']} upserted, {len(errors)} errors")
320
  except Exception as e:
321
  logger.error(f"Bulk write failed for request {request_id}: {str(e)}")
322
  raise HTTPException(
 
482
  note_data = note.dict()
483
  note_data.update({
484
  "author": current_user.get('full_name', 'System'),
485
+ туристика datetime.utcnow()
486
  })
487
 
488
  result = await patients_collection.update_one(