| | |
| | """ |
| | Shared Medical Extraction Utilities |
| | Centralized medical entity extraction logic to ensure consistency across all processors |
| | """ |
| |
|
| | import re |
| | from typing import Dict, Any, List |
| | import json |
| |
|
| | class MedicalExtractor: |
| | """Centralized medical entity extraction with consistent patterns""" |
| | |
| | def __init__(self): |
| | |
| | self.conditions_patterns = [ |
| | "hypertension", "diabetes", "diabetes mellitus", "type 2 diabetes", "type 1 diabetes", |
| | "pneumonia", "asthma", "copd", "chronic obstructive pulmonary disease", |
| | "depression", "anxiety", "arthritis", "rheumatoid arthritis", "osteoarthritis", |
| | "cancer", "stroke", "heart disease", "coronary artery disease", "myocardial infarction", |
| | "kidney disease", "chronic kidney disease", "liver disease", "hepatitis", |
| | "chest pain", "acute coronary syndrome", "angina", "atrial fibrillation", |
| | "congestive heart failure", "heart failure", "cardiomyopathy", |
| | "hyperlipidemia", "high cholesterol", "obesity", "metabolic syndrome" |
| | ] |
| | |
| | |
| | self.medication_patterns = [ |
| | r"([a-zA-Z]+(?:pril|sartan|olol|pine|statin|formin|cillin))\s+(\d+(?:\.\d+)?)\s*(mg|g|ml|units?)\s+(daily|twice daily|bid|tid|qid|once daily)", |
| | r"(aspirin|lisinopril|atorvastatin|metformin|insulin|warfarin|prednisone|omeprazole)\s+(\d+(?:\.\d+)?)\s*(mg|g|ml|units?)", |
| | r"([a-zA-Z]+)\s+(\d+(?:\.\d+)?)\s*(mg|g|ml|units?)\s+(daily|twice daily|bid|tid|qid)" |
| | ] |
| | |
| | |
| | self.vital_patterns = [ |
| | (r"bp:?\s*(\d{2,3}/\d{2,3})", "Blood Pressure"), |
| | (r"blood pressure:?\s*(\d{2,3}/\d{2,3})", "Blood Pressure"), |
| | (r"hr:?\s*(\d{2,3})", "Heart Rate"), |
| | (r"heart rate:?\s*(\d{2,3})", "Heart Rate"), |
| | (r"temp:?\s*(\d{2,3}(?:\.\d)?)", "Temperature"), |
| | (r"temperature:?\s*(\d{2,3}(?:\.\d)?)", "Temperature"), |
| | (r"o2 sat:?\s*(\d{2,3}%)", "O2 Saturation"), |
| | (r"oxygen saturation:?\s*(\d{2,3}%)", "O2 Saturation") |
| | ] |
| | |
| | |
| | self.procedures_keywords = [ |
| | "ecg", "ekg", "electrocardiogram", "x-ray", "ct scan", "mri", "ultrasound", |
| | "blood test", "lab work", "biopsy", "endoscopy", "colonoscopy", |
| | "surgery", "operation", "procedure", "catheterization", "angiography" |
| | ] |
| | |
| | def extract_all_entities(self, text: str, processing_mode: str = "standard") -> Dict[str, Any]: |
| | """ |
| | Extract all medical entities from text using consistent patterns |
| | |
| | Args: |
| | text: Medical text to analyze |
| | processing_mode: Processing mode for confidence scoring |
| | |
| | Returns: |
| | Dictionary with all extracted entities |
| | """ |
| | return { |
| | "patient_info": self.extract_patient_info(text), |
| | "date_of_birth": self.extract_date_of_birth(text), |
| | "conditions": self.extract_conditions(text), |
| | "medications": self.extract_medications(text), |
| | "vitals": self.extract_vitals(text), |
| | "procedures": self.extract_procedures(text), |
| | "confidence_score": self.calculate_confidence_score(text, processing_mode), |
| | "extraction_quality": self.assess_extraction_quality(text), |
| | "processing_mode": processing_mode |
| | } |
| | |
| | def extract_patient_info(self, text: str) -> str: |
| | """Extract patient information with consistent patterns""" |
| | text_lower = text.lower() |
| | |
| | |
| | patterns = [ |
| | r"patient:\s*([^\n\r,]+)", |
| | r"name:\s*([^\n\r,]+)", |
| | r"pt\.?\s*([^\n\r,]+)", |
| | r"mr\.?\s*([^\n\r,]+)", |
| | r"patient name:\s*([^\n\r,]+)" |
| | ] |
| | |
| | for pattern in patterns: |
| | match = re.search(pattern, text_lower) |
| | if match: |
| | name = match.group(1).strip().title() |
| | |
| | if (len(name) > 2 and |
| | not any(word in name.lower() for word in ['unknown', 'patient', 'test', 'sample']) and |
| | re.match(r'^[a-zA-Z\s]+$', name)): |
| | return name |
| | |
| | return "Unknown Patient" |
| | |
| | def extract_date_of_birth(self, text: str) -> str: |
| | """Extract date of birth with multiple formats""" |
| | text_lower = text.lower() |
| | |
| | |
| | dob_patterns = [ |
| | r"dob:?\s*([^\n\r]+)", |
| | r"date of birth:?\s*([^\n\r]+)", |
| | r"born:?\s*([^\n\r]+)", |
| | r"birth date:?\s*([^\n\r]+)" |
| | ] |
| | |
| | for pattern in dob_patterns: |
| | match = re.search(pattern, text_lower) |
| | if match: |
| | dob = match.group(1).strip() |
| | |
| | if re.match(r'\d{1,2}[/-]\d{1,2}[/-]\d{4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}|[a-zA-Z]+ \d{1,2}, \d{4}', dob): |
| | return dob |
| | |
| | return "Not specified" |
| | |
| | def extract_conditions(self, text: str) -> List[str]: |
| | """Extract medical conditions with context""" |
| | text_lower = text.lower() |
| | found_conditions = [] |
| | |
| | for condition in self.conditions_patterns: |
| | if condition in text_lower: |
| | |
| | condition_pattern = rf"([^\n\r]*{re.escape(condition)}[^\n\r]*)" |
| | context_match = re.search(condition_pattern, text_lower) |
| | if context_match: |
| | context = context_match.group(1).strip().title() |
| | if context not in found_conditions and len(context) > len(condition): |
| | found_conditions.append(context) |
| | elif condition.title() not in found_conditions: |
| | found_conditions.append(condition.title()) |
| | |
| | return found_conditions[:5] |
| | |
| | def extract_medications(self, text: str) -> List[str]: |
| | """Extract medications with dosages using consistent patterns""" |
| | medications = [] |
| | |
| | for pattern in self.medication_patterns: |
| | matches = re.finditer(pattern, text, re.IGNORECASE) |
| | for match in matches: |
| | if len(match.groups()) >= 3: |
| | med_name = match.group(1).title() |
| | dose = match.group(2) |
| | unit = match.group(3).lower() |
| | frequency = match.group(4) if len(match.groups()) >= 4 else "" |
| | |
| | full_med = f"{med_name} {dose}{unit} {frequency}".strip() |
| | if full_med not in medications: |
| | medications.append(full_med) |
| | |
| | return medications[:5] |
| | |
| | def extract_vitals(self, text: str) -> List[str]: |
| | """Extract vital signs with consistent formatting""" |
| | vitals = [] |
| | |
| | for pattern, vital_type in self.vital_patterns: |
| | matches = re.finditer(pattern, text, re.IGNORECASE) |
| | for match in matches: |
| | vital_value = match.group(1) |
| | |
| | if vital_type == "Blood Pressure": |
| | vitals.append(f"Blood Pressure: {vital_value}") |
| | elif vital_type == "Heart Rate": |
| | vitals.append(f"Heart Rate: {vital_value} bpm") |
| | elif vital_type == "Temperature": |
| | vitals.append(f"Temperature: {vital_value}°F") |
| | elif vital_type == "O2 Saturation": |
| | vitals.append(f"O2 Saturation: {vital_value}") |
| | |
| | return vitals[:4] |
| | |
| | def extract_procedures(self, text: str) -> List[str]: |
| | """Extract procedures with consistent naming""" |
| | procedures = [] |
| | text_lower = text.lower() |
| | |
| | for procedure in self.procedures_keywords: |
| | if procedure in text_lower: |
| | procedures.append(procedure.title()) |
| | |
| | return procedures[:3] |
| | |
| | def calculate_confidence_score(self, text: str, processing_mode: str) -> float: |
| | """Calculate confidence score based on text quality and processing mode""" |
| | base_confidence = { |
| | "rule_based": 0.75, |
| | "ollama": 0.85, |
| | "modal": 0.94, |
| | "huggingface": 0.88, |
| | "standard": 0.80 |
| | } |
| | |
| | confidence = base_confidence.get(processing_mode, 0.80) |
| | |
| | |
| | if len(text) > 500: |
| | confidence += 0.05 |
| | if len(text) > 1000: |
| | confidence += 0.05 |
| | |
| | |
| | medical_keywords = ["patient", "diagnosis", "medication", "treatment", "clinical"] |
| | keyword_count = sum(1 for keyword in medical_keywords if keyword.lower() in text.lower()) |
| | confidence += keyword_count * 0.02 |
| | |
| | return min(0.98, confidence) |
| | |
| | def assess_extraction_quality(self, text: str) -> Dict[str, Any]: |
| | """Assess the quality of extraction based on text content""" |
| | |
| | patient = self.extract_patient_info(text) |
| | dob = self.extract_date_of_birth(text) |
| | conditions = self.extract_conditions(text) |
| | medications = self.extract_medications(text) |
| | vitals = self.extract_vitals(text) |
| | procedures = self.extract_procedures(text) |
| | |
| | return { |
| | "patient_identified": patient != "Unknown Patient", |
| | "dob_found": dob != "Not specified", |
| | "conditions_count": len(conditions), |
| | "medications_count": len(medications), |
| | "vitals_count": len(vitals), |
| | "procedures_count": len(procedures), |
| | "total_entities": len(conditions) + len(medications) + len(vitals) + len(procedures), |
| | "detailed_medications": sum(1 for med in medications if any(unit in med.lower() for unit in ['mg', 'g', 'ml'])), |
| | "has_vital_signs": len(vitals) > 0, |
| | "comprehensive_analysis": len(conditions) > 0 and len(medications) > 0 |
| | } |
| | |
| | def count_entities(self, extracted_data: Dict[str, Any]) -> int: |
| | """Count total entities consistently across the system""" |
| | return (len(extracted_data.get("conditions", [])) + |
| | len(extracted_data.get("medications", [])) + |
| | len(extracted_data.get("vitals", [])) + |
| | len(extracted_data.get("procedures", []))) |
| | |
| | def format_for_pydantic(self, extracted_data: Dict[str, Any]) -> Dict[str, Any]: |
| | """Format extracted data for Pydantic model compatibility""" |
| | return { |
| | "patient": extracted_data.get("patient_info", "Unknown Patient"), |
| | "date_of_birth": extracted_data.get("date_of_birth", "Not specified"), |
| | "conditions": extracted_data.get("conditions", []), |
| | "medications": extracted_data.get("medications", []), |
| | "vitals": extracted_data.get("vitals", []), |
| | "procedures": extracted_data.get("procedures", []), |
| | "confidence_score": extracted_data.get("confidence_score", 0.80), |
| | "extraction_quality": extracted_data.get("extraction_quality", {}), |
| | "_processing_metadata": { |
| | "mode": extracted_data.get("processing_mode", "standard"), |
| | "total_entities": self.count_entities(extracted_data), |
| | "extraction_timestamp": "2025-06-06T12:00:00Z" |
| | } |
| | } |
| |
|
| | |
| | medical_extractor = MedicalExtractor() |
| |
|
| | |
| | def extract_medical_entities(text: str, processing_mode: str = "standard") -> Dict[str, Any]: |
| | """Extract medical entities using the shared extractor""" |
| | return medical_extractor.extract_all_entities(text, processing_mode) |
| |
|
| | def count_entities(extracted_data: Dict[str, Any]) -> int: |
| | """Count entities using the shared method""" |
| | return medical_extractor.count_entities(extracted_data) |
| |
|
| | def format_for_pydantic(extracted_data: Dict[str, Any]) -> Dict[str, Any]: |
| | """Format for Pydantic using the shared method""" |
| | return medical_extractor.format_for_pydantic(extracted_data) |
| |
|
| | def calculate_quality_score(extracted_data: Dict[str, Any]) -> float: |
| | """Calculate quality score based on entity richness""" |
| | entity_count = count_entities(extracted_data) |
| | patient_found = bool(extracted_data.get("patient_info") and |
| | extracted_data.get("patient_info") != "Unknown Patient") |
| | |
| | base_score = 0.7 |
| | entity_bonus = min(0.25, entity_count * 0.04) |
| | patient_bonus = 0.05 if patient_found else 0 |
| | |
| | return min(0.98, base_score + entity_bonus + patient_bonus) |
| |
|
| | |
| | __all__ = [ |
| | "MedicalExtractor", |
| | "medical_extractor", |
| | "extract_medical_entities", |
| | "count_entities", |
| | "format_for_pydantic", |
| | "calculate_quality_score" |
| | ] |