| import json |
| from datetime import datetime |
| from typing import List, Dict, Optional, Union |
|
|
| class PatientDataExtractor: |
| """Class to extract all fields from a FHIR Patient resource in a Bundle response.""" |
| |
| def __init__(self, patient_data: str): |
| """Initialize with patient data in JSON string format.""" |
| self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data |
| self.patients = self._extract_patients() |
| self.current_patient_idx = 0 |
| |
| def _extract_patients(self) -> List[Dict]: |
| """Extract all patient entries from the Bundle.""" |
| if self.data.get("resourceType") != "Bundle" or "entry" not in self.data: |
| raise ValueError("Invalid FHIR Bundle format") |
| return [entry["resource"] for entry in self.data["entry"] if entry["resource"]["resourceType"] == "Patient"] |
|
|
| def set_patient_by_index(self, index: int) -> bool: |
| """Set the current patient by index. Returns True if successful.""" |
| if 0 <= index < len(self.patients): |
| self.current_patient_idx = index |
| return True |
| return False |
|
|
| def set_patient_by_id(self, patient_id: str) -> bool: |
| """Set the current patient by FHIR Patient ID. Returns True if successful.""" |
| for i, patient in enumerate(self.patients): |
| if patient["id"] == patient_id: |
| self.current_patient_idx = i |
| return True |
| return False |
|
|
| def _get_current_patient(self) -> Dict: |
| """Get the currently selected patient resource.""" |
| return self.patients[self.current_patient_idx] |
|
|
| |
| def get_id(self) -> str: |
| """Extract FHIR Patient ID.""" |
| return self._get_current_patient().get("id", "") |
|
|
| def get_resource_type(self) -> str: |
| """Extract resource type (should always be 'Patient').""" |
| return self._get_current_patient().get("resourceType", "") |
|
|
| def get_meta_last_updated(self) -> str: |
| """Extract last updated timestamp from meta.""" |
| return self._get_current_patient().get("meta", {}).get("lastUpdated", "") |
|
|
| def get_meta_profile(self) -> List[str]: |
| """Extract profile URIs from meta.""" |
| return self._get_current_patient().get("meta", {}).get("profile", []) |
|
|
| def get_text_div(self) -> str: |
| """Extract generated text narrative (div content).""" |
| return self._get_current_patient().get("text", {}).get("div", "") |
|
|
| |
| def get_first_name(self) -> str: |
| """Extract patient's first name.""" |
| patient = self._get_current_patient() |
| names = patient.get("name", []) |
| for name in names: |
| if name.get("use") == "official" and "given" in name: |
| return name["given"][0] |
| return "" |
|
|
| def get_last_name(self) -> str: |
| """Extract patient's last name.""" |
| patient = self._get_current_patient() |
| names = patient.get("name", []) |
| for name in names: |
| if name.get("use") == "official" and "family" in name: |
| return name["family"] |
| return "" |
|
|
| def get_middle_initial(self) -> str: |
| """Extract patient's middle initial (second given name initial if present).""" |
| patient = self._get_current_patient() |
| names = patient.get("name", []) |
| for name in names: |
| if name.get("use") == "official" and "given" in name and len(name["given"]) > 1: |
| return name["given"][1][0] |
| return "" |
|
|
| def get_name_prefix(self) -> str: |
| """Extract patient's name prefix (e.g., Mr., Mrs.).""" |
| patient = self._get_current_patient() |
| names = patient.get("name", []) |
| for name in names: |
| if name.get("use") == "official" and "prefix" in name: |
| return name["prefix"][0] |
| return "" |
|
|
| def get_maiden_name(self) -> str: |
| """Extract patient's maiden name if available.""" |
| patient = self._get_current_patient() |
| names = patient.get("name", []) |
| for name in names: |
| if name.get("use") == "maiden" and "family" in name: |
| return name["family"] |
| return "" |
|
|
| |
| def get_dob(self) -> str: |
| """Extract patient's date of birth.""" |
| return self._get_current_patient().get("birthDate", "") |
|
|
| def get_age(self) -> str: |
| """Calculate patient's age based on birth date.""" |
| dob = self.get_dob() |
| if not dob: |
| return "" |
| birth_date = datetime.strptime(dob, "%Y-%m-%d") |
| today = datetime.now() |
| age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day)) |
| return str(age) |
|
|
| def get_gender(self) -> str: |
| """Extract patient's gender.""" |
| return self._get_current_patient().get("gender", "").capitalize() |
|
|
| def get_birth_sex(self) -> str: |
| """Extract patient's birth sex from extensions.""" |
| patient = self._get_current_patient() |
| for ext in patient.get("extension", []): |
| if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex": |
| return ext.get("valueCode", "") |
| return "" |
|
|
| def get_multiple_birth(self) -> Union[bool, None]: |
| """Extract multiple birth status.""" |
| return self._get_current_patient().get("multipleBirthBoolean", None) |
|
|
| |
| def get_address_line(self) -> str: |
| """Extract patient's street address.""" |
| patient = self._get_current_patient() |
| addresses = patient.get("address", []) |
| return addresses[0]["line"][0] if addresses and "line" in addresses[0] else "" |
|
|
| def get_city(self) -> str: |
| """Extract patient's city.""" |
| patient = self._get_current_patient() |
| addresses = patient.get("address", []) |
| return addresses[0]["city"] if addresses and "city" in addresses[0] else "" |
|
|
| def get_state(self) -> str: |
| """Extract patient's state.""" |
| patient = self._get_current_patient() |
| addresses = patient.get("address", []) |
| return addresses[0]["state"] if addresses and "state" in addresses[0] else "" |
|
|
| def get_zip_code(self) -> str: |
| """Extract patient's postal code.""" |
| patient = self._get_current_patient() |
| addresses = patient.get("address", []) |
| return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else "" |
|
|
| def get_country(self) -> str: |
| """Extract patient's country.""" |
| patient = self._get_current_patient() |
| addresses = patient.get("address", []) |
| return addresses[0]["country"] if addresses and "country" in addresses[0] else "" |
|
|
| def get_geolocation(self) -> Dict[str, float]: |
| """Extract geolocation (latitude and longitude) from address extension.""" |
| patient = self._get_current_patient() |
| addresses = patient.get("address", []) |
| if not addresses: |
| return {"latitude": None, "longitude": None} |
| for ext in addresses[0].get("extension", []): |
| if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation": |
| geo = {} |
| for sub_ext in ext.get("extension", []): |
| if sub_ext.get("url") == "latitude": |
| geo["latitude"] = sub_ext.get("valueDecimal") |
| elif sub_ext.get("url") == "longitude": |
| geo["longitude"] = sub_ext.get("valueDecimal") |
| return geo |
| return {"latitude": None, "longitude": None} |
|
|
| |
| def get_phone(self) -> str: |
| """Extract patient's phone number.""" |
| patient = self._get_current_patient() |
| telecoms = patient.get("telecom", []) |
| for telecom in telecoms: |
| if telecom.get("system") == "phone" and telecom.get("use") == "home": |
| return telecom.get("value", "") |
| return "" |
|
|
| |
| def get_identifiers(self) -> Dict[str, str]: |
| """Extract all identifiers (e.g., SSN, MRN, Driver's License).""" |
| patient = self._get_current_patient() |
| identifiers = patient.get("identifier", []) |
| id_dict = {} |
| for id_entry in identifiers: |
| id_type = id_entry.get("type", {}).get("text", "Unknown") |
| id_dict[id_type] = id_entry.get("value", "") |
| return id_dict |
|
|
| |
| def get_race(self) -> str: |
| """Extract patient's race from extensions.""" |
| patient = self._get_current_patient() |
| for ext in patient.get("extension", []): |
| if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race": |
| for sub_ext in ext.get("extension", []): |
| if sub_ext.get("url") == "text": |
| return sub_ext.get("valueString", "") |
| return "" |
|
|
| def get_ethnicity(self) -> str: |
| """Extract patient's ethnicity from extensions.""" |
| patient = self._get_current_patient() |
| for ext in patient.get("extension", []): |
| if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity": |
| for sub_ext in ext.get("extension", []): |
| if sub_ext.get("url") == "text": |
| return sub_ext.get("valueString", "") |
| return "" |
|
|
| def get_mothers_maiden_name(self) -> str: |
| """Extract patient's mother's maiden name from extensions.""" |
| patient = self._get_current_patient() |
| for ext in patient.get("extension", []): |
| if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName": |
| return ext.get("valueString", "") |
| return "" |
|
|
| def get_birth_place(self) -> Dict[str, str]: |
| """Extract patient's birth place from extensions.""" |
| patient = self._get_current_patient() |
| for ext in patient.get("extension", []): |
| if ext.get("url") == "http://hl7.org/fhir/StructureDefinition/patient-birthPlace": |
| addr = ext.get("valueAddress", {}) |
| return { |
| "city": addr.get("city", ""), |
| "state": addr.get("state", ""), |
| "country": addr.get("country", "") |
| } |
| return {"city": "", "state": "", "country": ""} |
|
|
| def get_disability_adjusted_life_years(self) -> Optional[float]: |
| """Extract disability-adjusted life years from extensions.""" |
| patient = self._get_current_patient() |
| for ext in patient.get("extension", []): |
| if ext.get("url") == "http://synthetichealth.github.io/synthea/disability-adjusted-life-years": |
| return ext.get("valueDecimal") |
| return None |
|
|
| def get_quality_adjusted_life_years(self) -> Optional[float]: |
| """Extract quality-adjusted life years from extensions.""" |
| patient = self._get_current_patient() |
| for ext in patient.get("extension", []): |
| if ext.get("url") == "http://synthetichealth.github.io/synthea/quality-adjusted-life-years": |
| return ext.get("valueDecimal") |
| return None |
|
|
| |
| def get_marital_status(self) -> str: |
| """Extract patient's marital status.""" |
| patient = self._get_current_patient() |
| status = patient.get("maritalStatus", {}).get("text", "") |
| return status if status else patient.get("maritalStatus", {}).get("coding", [{}])[0].get("display", "") |
|
|
| |
| def get_language(self) -> str: |
| """Extract patient's preferred language.""" |
| patient = self._get_current_patient() |
| comms = patient.get("communication", []) |
| return comms[0]["language"]["text"] if comms and "language" in comms[0] else "" |
|
|
| |
| def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]: |
| """Extract all available data for the current patient.""" |
| return { |
| "id": self.get_id(), |
| "resource_type": self.get_resource_type(), |
| "meta_last_updated": self.get_meta_last_updated(), |
| "meta_profile": self.get_meta_profile(), |
| "text_div": self.get_text_div(), |
| "first_name": self.get_first_name(), |
| "last_name": self.get_last_name(), |
| "middle_initial": self.get_middle_initial(), |
| "name_prefix": self.get_name_prefix(), |
| "maiden_name": self.get_maiden_name(), |
| "dob": self.get_dob(), |
| "age": self.get_age(), |
| "gender": self.get_gender(), |
| "birth_sex": self.get_birth_sex(), |
| "multiple_birth": self.get_multiple_birth(), |
| "address_line": self.get_address_line(), |
| "city": self.get_city(), |
| "state": self.get_state(), |
| "zip_code": self.get_zip_code(), |
| "country": self.get_country(), |
| "geolocation": self.get_geolocation(), |
| "phone": self.get_phone(), |
| "identifiers": self.get_identifiers(), |
| "race": self.get_race(), |
| "ethnicity": self.get_ethnicity(), |
| "mothers_maiden_name": self.get_mothers_maiden_name(), |
| "birth_place": self.get_birth_place(), |
| "disability_adjusted_life_years": self.get_disability_adjusted_life_years(), |
| "quality_adjusted_life_years": self.get_quality_adjusted_life_years(), |
| "marital_status": self.get_marital_status(), |
| "language": self.get_language() |
| } |
|
|
| def get_patient_dict(self) -> Dict[str, str]: |
| """Return a dictionary of patient data mapped to discharge form fields (for app.py compatibility).""" |
| patient_data = self.get_all_patient_data() |
| return { |
| "first_name": patient_data["first_name"], |
| "last_name": patient_data["last_name"], |
| "middle_initial": patient_data["middle_initial"], |
| "dob": patient_data["dob"], |
| "age": patient_data["age"], |
| "sex": patient_data["gender"], |
| "address": patient_data["address_line"], |
| "city": patient_data["city"], |
| "state": patient_data["state"], |
| "zip_code": patient_data["zip_code"], |
| "doctor_first_name": "", |
| "doctor_last_name": "", |
| "doctor_middle_initial": "", |
| "hospital_name": "", |
| "doctor_address": "", |
| "doctor_city": "", |
| "doctor_state": "", |
| "doctor_zip": "", |
| "admission_date": "", |
| "referral_source": "", |
| "admission_method": "", |
| "discharge_date": "", |
| "discharge_reason": "", |
| "date_of_death": "", |
| "diagnosis": "", |
| "procedures": "", |
| "medications": "", |
| "preparer_name": "", |
| "preparer_job_title": "" |
| } |
|
|
| def get_all_patients(self) -> List[Dict[str, str]]: |
| """Return a list of dictionaries for all patients (for app.py).""" |
| original_idx = self.current_patient_idx |
| all_patients = [] |
| for i in range(len(self.patients)): |
| self.set_patient_by_index(i) |
| all_patients.append(self.get_patient_dict()) |
| self.set_patient_by_index(original_idx) |
| return all_patients |
|
|
| def get_patient_ids(self) -> List[str]: |
| """Return a list of all patient IDs in the Bundle.""" |
| return [patient["id"] for patient in self.patients] |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |