Spaces:
Sleeping
Sleeping
MiniMax Agent
Fix #5 & #6: Add async synthesize() method + Fix ModelLoader args (1483+908 lines)
97b0b74
| """ | |
| Enhanced Analysis Synthesizer with Research-Based Clinical Insights | |
| Synthesizes model outputs using research-optimized clinical reasoning frameworks | |
| Generates meaningful clinical analysis across all medical domains | |
| """ | |
| import logging | |
| import json | |
| import re | |
| from typing import Dict, List, Any, Optional, Union | |
| from datetime import datetime | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
| class EnhancedAnalysisSynthesizer: | |
| """ | |
| Enhanced Analysis Synthesizer with Research-Based Clinical Intelligence | |
| Provides meaningful clinical insights across all medical specialties | |
| Based on comprehensive model research findings | |
| """ | |
| def __init__(self): | |
| self.clinical_frameworks = self._initialize_clinical_frameworks() | |
| self.risk_stratification = self._initialize_risk_stratification() | |
| self.clinical_correlation = self._initialize_clinical_correlation() | |
| logger.info("Enhanced Analysis Synthesizer initialized with research-based clinical frameworks") | |
| def _initialize_clinical_frameworks(self) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Initialize research-based clinical reasoning frameworks | |
| Registry of available domain frameworks - actual implementation in _apply_*_framework methods | |
| """ | |
| return { | |
| "cardiology": { | |
| "components": ["rhythm_analysis", "ischemia_assessment", "conduction_analysis", "risk_stratification"] | |
| }, | |
| "radiology": { | |
| "components": ["pathological_findings", "differential_diagnosis", "clinical_correlation", "urgency_assessment"] | |
| }, | |
| "laboratory": { | |
| "components": ["abnormality_interpretation", "clinical_significance", "trend_analysis", "follow_up_recommendations"] | |
| }, | |
| "pathology": { | |
| "components": ["diagnostic_classification", "prognostic_assessment", "treatment_implications", "quality_assurance"] | |
| }, | |
| "clinical_notes": { | |
| "components": ["clinical_reasoning", "treatment_planning", "quality_indicators", "documentation_analysis"] | |
| }, | |
| "diagnosis": { | |
| "components": ["differential_diagnosis", "clinical_reasoning", "urgency_classification", "management_planning"] | |
| }, | |
| "emergency_medicine": { | |
| "components": ["triage_assessment", "critical_findings", "immediate_interventions", "disposition_planning"] | |
| } | |
| } | |
| def _initialize_risk_stratification(self) -> Dict[str, Any]: | |
| """ | |
| Initialize research-based risk stratification models | |
| """ | |
| return { | |
| "cardiovascular_risk": { | |
| "low": {"criteria": ["normal_ecg", "young_age", "no_risk_factors"], "management": "routine_follow_up"}, | |
| "moderate": {"criteria": ["minor_st_changes", "mild_hypertension", "some_risk_factors"], "management": "close_monitoring"}, | |
| "high": {"criteria": ["significant_st_changes", "known_cad", "multiple_risk_factors"], "management": "urgent_evaluation"} | |
| }, | |
| "radiological_urgency": { | |
| "routine": {"criteria": ["stable_findings", "chronic_changes"], "timeline": "routine_follow_up"}, | |
| "urgent": {"criteria": ["progressive_changes", "concerning_features"], "timeline": "24-48_hours"}, | |
| "stat": {"criteria": ["acute_emergency", "life_threatening"], "timeline": "immediate"} | |
| }, | |
| "laboratory_urgency": { | |
| "routine": {"criteria": ["mild_abnormalities", "stable_values"], "timeline": "routine_follow_up"}, | |
| "urgent": {"criteria": ["significant_abnormalities", "trend_changes"], "timeline": "same_day"}, | |
| "stat": {"criteria": ["critical_values", "life_threatening"], "timeline": "immediate"} | |
| } | |
| } | |
| def _initialize_clinical_correlation(self) -> Dict[str, Any]: | |
| """ | |
| Initialize clinical correlation frameworks | |
| Registry of available correlation methods - actual implementation in dedicated methods | |
| """ | |
| return { | |
| "interdisciplinary_integration": "available", | |
| "evidence_based_reasoning": "available", | |
| "clinical_context_analysis": "available", | |
| "management_coordination": "available" | |
| } | |
| async def synthesize( | |
| self, | |
| classification: Dict[str, Any], | |
| specialized_results: List[Dict[str, Any]], | |
| pdf_content: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Main synthesis method called by the application (async wrapper) | |
| Parameters match the main application's expected signature: | |
| - classification: Document classification results | |
| - specialized_results: Model analysis results (called model_results internally) | |
| - pdf_content: Extracted PDF content | |
| Returns comprehensive clinical analysis | |
| """ | |
| # Call the internal research-optimized method with correct parameter order | |
| return self.synthesize_research_optimized_analysis( | |
| model_results=specialized_results, # Renamed parameter | |
| classification=classification, | |
| pdf_content=pdf_content | |
| ) | |
| def synthesize_research_optimized_analysis( | |
| self, | |
| model_results: List[Dict[str, Any]], | |
| classification: Dict[str, Any], | |
| pdf_content: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Synthesize comprehensive clinical analysis using research-based frameworks | |
| """ | |
| try: | |
| logger.info("Starting research-optimized clinical synthesis") | |
| # Apply domain-specific clinical frameworks | |
| synthesized_analysis = self._apply_clinical_frameworks(model_results, classification) | |
| # Integrate findings across medical domains | |
| integrated_findings = self._integrate_interdisciplinary_findings(synthesized_analysis, classification) | |
| # Generate evidence-based recommendations | |
| clinical_recommendations = self._generate_evidence_based_recommendations(integrated_findings, classification) | |
| # Assess clinical urgency and risk | |
| urgency_assessment = self._assess_clinical_urgency(integrated_findings, classification) | |
| # Create comprehensive clinical summary | |
| comprehensive_summary = self._create_comprehensive_clinical_summary( | |
| integrated_findings, clinical_recommendations, urgency_assessment | |
| ) | |
| # Calculate overall clinical confidence | |
| overall_confidence = self._calculate_overall_clinical_confidence(model_results, integrated_findings) | |
| final_analysis = { | |
| "clinical_summary": comprehensive_summary, | |
| "domain_specific_findings": synthesized_analysis, | |
| "interdisciplinary_integration": integrated_findings, | |
| "clinical_recommendations": clinical_recommendations, | |
| "urgency_assessment": urgency_assessment, | |
| "overall_confidence": overall_confidence, | |
| "synthesis_method": "research_optimized", | |
| "generated_at": datetime.utcnow().isoformat(), | |
| "evidence_quality": self._assess_evidence_quality(model_results), | |
| "clinical_correlation": self._assess_clinical_correlation(integrated_findings) | |
| } | |
| logger.info(f"Research-optimized synthesis completed with {overall_confidence:.2f} confidence") | |
| return final_analysis | |
| except Exception as e: | |
| logger.error(f"Research-optimized synthesis failed: {str(e)}") | |
| return self._generate_fallback_synthesis(model_results, classification) | |
| def _apply_clinical_frameworks( | |
| self, model_results: List[Dict[str, Any]], classification: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Apply domain-specific clinical reasoning frameworks | |
| """ | |
| synthesized = {} | |
| # Group results by medical domain | |
| domain_results = self._group_results_by_domain(model_results) | |
| for domain, results in domain_results.items(): | |
| if domain in self.clinical_frameworks: | |
| domain_analysis = self._apply_domain_framework(domain, results) | |
| synthesized[domain] = domain_analysis | |
| else: | |
| synthesized[domain] = self._apply_general_analysis(results) | |
| return synthesized | |
| def _group_results_by_domain(self, model_results: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: | |
| """Group model results by medical domain""" | |
| grouped = {} | |
| for result in model_results: | |
| domain = result.get("domain", "general") | |
| if domain not in grouped: | |
| grouped[domain] = [] | |
| grouped[domain].append(result) | |
| return grouped | |
| def _apply_domain_framework(self, domain: str, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Apply specific clinical framework for the domain""" | |
| if domain == "cardiology": | |
| return self._apply_cardiology_framework(results) | |
| elif domain == "radiology": | |
| return self._apply_radiology_framework(results) | |
| elif domain == "laboratory": | |
| return self._apply_laboratory_framework(results) | |
| elif domain == "pathology": | |
| return self._apply_pathology_framework(results) | |
| elif domain == "clinical_notes": | |
| return self._apply_clinical_notes_framework(results) | |
| elif domain == "diagnosis": | |
| return self._apply_diagnosis_framework(results) | |
| else: | |
| return self._apply_general_domain_framework(results) | |
| def _apply_cardiology_framework(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Apply cardiology-specific clinical framework | |
| Based on research findings for HuBERT-ECG and cardiac analysis | |
| """ | |
| framework_analysis = { | |
| "rhythm_analysis": {}, | |
| "ischemia_assessment": {}, | |
| "conduction_analysis": {}, | |
| "risk_stratification": {}, | |
| "clinical_findings": [], | |
| "evidence_quality": "high" | |
| } | |
| for result in results: | |
| analysis = result.get("analysis", "") | |
| model = result.get("model", "") | |
| # Extract cardiac-specific findings | |
| rhythm_info = self._extract_cardiac_rhythm_info(analysis) | |
| if rhythm_info: | |
| framework_analysis["rhythm_analysis"].update(rhythm_info) | |
| # Assess ischemia indicators | |
| ischemia_indicators = self._extract_ischemia_indicators(analysis) | |
| if ischemia_indicators: | |
| framework_analysis["ischemia_assessment"].update(ischemia_indicators) | |
| # Analyze conduction | |
| conduction_info = self._extract_conduction_analysis(analysis) | |
| if conduction_info: | |
| framework_analysis["conduction_analysis"].update(conduction_info) | |
| # Generate clinical findings | |
| clinical_finding = self._generate_cardiac_clinical_finding(analysis, model) | |
| if clinical_finding: | |
| framework_analysis["clinical_findings"].append(clinical_finding) | |
| # Perform risk stratification | |
| framework_analysis["risk_stratification"] = self._perform_cardiac_risk_stratification(framework_analysis) | |
| return framework_analysis | |
| def _apply_radiology_framework(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Apply radiology-specific clinical framework | |
| Based on research findings for MONAI and MedGemma multimodal | |
| """ | |
| framework_analysis = { | |
| "pathological_findings": {}, | |
| "differential_diagnosis": [], | |
| "clinical_correlation": {}, | |
| "urgency_assessment": {}, | |
| "image_quality": "adequate", | |
| "evidence_quality": "high" | |
| } | |
| for result in results: | |
| analysis = result.get("analysis", "") | |
| model = result.get("model", "") | |
| # Extract pathological findings | |
| findings = self._extract_radiological_findings(analysis) | |
| if findings: | |
| framework_analysis["pathological_findings"].update(findings) | |
| # Generate differential diagnosis | |
| differential = self._generate_radiological_differential(analysis) | |
| if differential: | |
| framework_analysis["differential_diagnosis"].extend(differential) | |
| # Assess clinical correlation | |
| correlation = self._assess_radiological_correlation(analysis) | |
| if correlation: | |
| framework_analysis["clinical_correlation"].update(correlation) | |
| # Determine urgency | |
| urgency = self._assess_radiological_urgency(findings) | |
| if urgency: | |
| framework_analysis["urgency_assessment"] = urgency | |
| return framework_analysis | |
| def _apply_laboratory_framework(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Apply laboratory-specific clinical framework | |
| Based on research findings for Lab-AI and DrLlama | |
| """ | |
| framework_analysis = { | |
| "abnormal_values": [], | |
| "clinical_interpretation": {}, | |
| "trend_analysis": {}, | |
| "follow_up_needed": [], | |
| "evidence_quality": "high" | |
| } | |
| for result in results: | |
| analysis = result.get("analysis", "") | |
| model = result.get("model", "") | |
| # Extract abnormal laboratory values | |
| abnormal_values = self._extract_laboratory_abnormalities(analysis) | |
| if abnormal_values: | |
| framework_analysis["abnormal_values"].extend(abnormal_values) | |
| # Interpret clinical significance | |
| interpretation = self._interpret_laboratory_clinical_significance(analysis) | |
| if interpretation: | |
| framework_analysis["clinical_interpretation"].update(interpretation) | |
| # Determine follow-up requirements | |
| follow_up = self._determine_laboratory_follow_up(abnormal_values) | |
| if follow_up: | |
| framework_analysis["follow_up_needed"].extend(follow_up) | |
| return framework_analysis | |
| def _apply_pathology_framework(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Apply pathology-specific clinical framework | |
| Based on research findings for Path Foundation and UNI2-h | |
| """ | |
| framework_analysis = { | |
| "diagnostic_classification": {}, | |
| "prognostic_factors": {}, | |
| "treatment_implications": [], | |
| "quality_assessment": {}, | |
| "evidence_quality": "high" | |
| } | |
| for result in results: | |
| analysis = result.get("analysis", "") | |
| model = result.get("model", "") | |
| # Classify pathological diagnosis | |
| diagnosis = self._classify_pathological_diagnosis(analysis) | |
| if diagnosis: | |
| framework_analysis["diagnostic_classification"] = diagnosis | |
| # Identify prognostic factors | |
| prognostic = self._identify_pathological_prognostic_factors(analysis) | |
| if prognostic: | |
| framework_analysis["prognostic_factors"] = prognostic | |
| # Assess treatment implications | |
| treatment = self._assess_pathological_treatment_implications(analysis) | |
| if treatment: | |
| framework_analysis["treatment_implications"] = treatment | |
| return framework_analysis | |
| def _apply_clinical_notes_framework(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Apply clinical documentation framework | |
| """ | |
| framework_analysis = { | |
| "clinical_reasoning": {}, | |
| "treatment_planning": {}, | |
| "quality_indicators": {}, | |
| "documentation_analysis": {}, | |
| "evidence_quality": "high" | |
| } | |
| for result in results: | |
| analysis = result.get("analysis", "") | |
| # Analyze clinical reasoning | |
| reasoning = self._analyze_clinical_documentation_reasoning(analysis) | |
| if reasoning: | |
| framework_analysis["clinical_reasoning"] = reasoning | |
| # Evaluate treatment planning | |
| planning = self._evaluate_documentation_treatment_planning(analysis) | |
| if planning: | |
| framework_analysis["treatment_planning"] = planning | |
| return framework_analysis | |
| def _apply_diagnosis_framework(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Apply diagnostic reasoning framework | |
| """ | |
| framework_analysis = { | |
| "differential_diagnosis": [], | |
| "clinical_reasoning": {}, | |
| "urgency_classification": {}, | |
| "diagnostic_workup": [], | |
| "evidence_quality": "high" | |
| } | |
| for result in results: | |
| analysis = result.get("analysis", "") | |
| # Extract differential diagnosis | |
| differential = self._extract_differential_diagnosis(analysis) | |
| if differential: | |
| framework_analysis["differential_diagnosis"] = differential | |
| # Assess diagnostic reasoning | |
| reasoning = self._assess_diagnostic_reasoning(analysis) | |
| if reasoning: | |
| framework_analysis["clinical_reasoning"] = reasoning | |
| return framework_analysis | |
| def _apply_general_domain_framework(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Apply general framework for unspecified domains""" | |
| return { | |
| "general_findings": [result.get("analysis", "") for result in results], | |
| "clinical_relevance": "moderate", | |
| "evidence_quality": "moderate" | |
| } | |
| # Cardiology-specific methods | |
| def _extract_cardiac_rhythm_info(self, analysis: str) -> Dict[str, Any]: | |
| """Extract cardiac rhythm information from analysis""" | |
| rhythm_info = {} | |
| # Extract heart rate | |
| rate_match = re.search(r'(\d+)\s*bpm', analysis, re.IGNORECASE) | |
| if rate_match: | |
| rhythm_info["heart_rate"] = int(rate_match.group(1)) | |
| rhythm_info["rate_category"] = self._categorize_heart_rate(int(rate_match.group(1))) | |
| # Extract rhythm type | |
| rhythm_patterns = [ | |
| ("sinus rhythm", "normal"), | |
| ("atrial fibrillation", "arrhythmia"), | |
| ("atrial flutter", "arrhythmia"), | |
| ("sinus tachycardia", "tachycardia"), | |
| ("sinus bradycardia", "bradycardia") | |
| ] | |
| for pattern, category in rhythm_patterns: | |
| if pattern.lower() in analysis.lower(): | |
| rhythm_info["rhythm_type"] = pattern | |
| rhythm_info["rhythm_category"] = category | |
| break | |
| return rhythm_info | |
| def _categorize_heart_rate(self, rate: int) -> str: | |
| """Categorize heart rate based on clinical ranges""" | |
| if rate < 60: | |
| return "bradycardia" | |
| elif rate <= 100: | |
| return "normal" | |
| else: | |
| return "tachycardia" | |
| def _extract_ischemia_indicators(self, analysis: str) -> Dict[str, Any]: | |
| """Extract myocardial ischemia indicators""" | |
| ischemia_info = {} | |
| # ST segment changes | |
| st_elevations = re.findall(r'ST.*?elevation.*?(?:in\s+)?(\w+\s+leads?)', analysis, re.IGNORECASE) | |
| if st_elevations: | |
| ischemia_info["st_segment_elevations"] = st_elevations | |
| st_depressions = re.findall(r'ST.*?depression.*?(?:in\s+)?(\w+\s+leads?)', analysis, re.IGNORECASE) | |
| if st_depressions: | |
| ischemia_info["st_segment_depressions"] = st_depressions | |
| # Q waves | |
| q_waves = re.findall(r'Q\s+waves?.*?(?:in\s+)?(\w+\s+leads?)', analysis, re.IGNORECASE) | |
| if q_waves: | |
| ischemia_info["pathological_q_waves"] = q_waves | |
| # T wave changes | |
| t_wave_changes = re.findall(r'T\s+wave.*?(?:in\s+)?(\w+\s+leads?)', analysis, re.IGNORECASE) | |
| if t_wave_changes: | |
| ischemia_info["t_wave_abnormalities"] = t_wave_changes | |
| return ischemia_info | |
| def _extract_conduction_analysis(self, analysis: str) -> Dict[str, Any]: | |
| """Extract cardiac conduction analysis""" | |
| conduction_info = {} | |
| # PR interval | |
| pr_match = re.search(r'PR.*?(\d+)\s*ms', analysis, re.IGNORECASE) | |
| if pr_match: | |
| pr_interval = int(pr_match.group(1)) | |
| conduction_info["pr_interval"] = pr_interval | |
| conduction_info["pr_category"] = "prolonged" if pr_interval > 200 else "normal" | |
| # QRS duration | |
| qrs_match = re.search(r'QRS.*?(\d+)\s*ms', analysis, re.IGNORECASE) | |
| if qrs_match: | |
| qrs_duration = int(qrs_match.group(1)) | |
| conduction_info["qrs_duration"] = qrs_duration | |
| conduction_info["qrs_category"] = "prolonged" if qrs_duration > 120 else "normal" | |
| # QT interval | |
| qt_match = re.search(r'QT.*?(\d+)\s*ms', analysis, re.IGNORECASE) | |
| if qt_match: | |
| qt_interval = int(qt_match.group(1)) | |
| conduction_info["qt_interval"] = qt_interval | |
| conduction_info["qt_category"] = "prolonged" if qt_interval > 440 else "normal" | |
| return conduction_info | |
| def _generate_cardiac_clinical_finding(self, analysis: str, model: str) -> Dict[str, Any]: | |
| """Generate structured cardiac clinical finding""" | |
| return { | |
| "finding_type": "cardiac_electrophysiology", | |
| "description": analysis[:200] + "..." if len(analysis) > 200 else analysis, | |
| "model_source": model, | |
| "clinical_significance": self._assess_cardiac_clinical_significance(analysis) | |
| } | |
| def _assess_cardiac_clinical_significance(self, analysis: str) -> str: | |
| """Assess clinical significance of cardiac findings""" | |
| analysis_lower = analysis.lower() | |
| # High significance indicators | |
| high_significance = ["st elevation", "myocardial infarction", "acute coronary syndrome", "significant arrhythmia"] | |
| if any(indicator in analysis_lower for indicator in high_significance): | |
| return "high" | |
| # Moderate significance indicators | |
| moderate_significance = ["st depression", "t wave changes", "mild arrhythmia", "conduction delay"] | |
| if any(indicator in analysis_lower for indicator in moderate_significance): | |
| return "moderate" | |
| return "low" | |
| def _perform_cardiac_risk_stratification(self, framework_analysis: Dict[str, Any]) -> Dict[str, Any]: | |
| """Perform cardiac risk stratification""" | |
| rhythm = framework_analysis.get("rhythm_analysis", {}) | |
| ischemia = framework_analysis.get("ischemia_assessment", {}) | |
| conduction = framework_analysis.get("conduction_analysis", {}) | |
| risk_factors = [] | |
| # Assess rate-related risk | |
| heart_rate = rhythm.get("heart_rate", 75) | |
| if heart_rate > 100: | |
| risk_factors.append("tachycardia") | |
| elif heart_rate < 50: | |
| risk_factors.append("bradycardia") | |
| # Assess ischemia-related risk | |
| if ischemia.get("st_segment_elevations"): | |
| risk_factors.append("st_elevation") | |
| if ischemia.get("pathological_q_waves"): | |
| risk_factors.append("old_mi_evidence") | |
| # Assess conduction risk | |
| pr_prolonged = conduction.get("pr_category") == "prolonged" | |
| qrs_prolonged = conduction.get("qrs_category") == "prolonged" | |
| if pr_prolonged: | |
| risk_factors.append("av_conduction_delay") | |
| if qrs_prolonged: | |
| risk_factors.append("intraventricular_conduction_delay") | |
| # Determine risk category | |
| if len(risk_factors) == 0: | |
| risk_category = "low" | |
| elif len(risk_factors) <= 2: | |
| risk_category = "moderate" | |
| else: | |
| risk_category = "high" | |
| return { | |
| "risk_category": risk_category, | |
| "risk_factors": risk_factors, | |
| "management_recommendation": self._get_cardiac_management_recommendation(risk_category) | |
| } | |
| def _get_cardiac_management_recommendation(self, risk_category: str) -> str: | |
| """Get cardiac management recommendation based on risk""" | |
| recommendations = { | |
| "low": "Routine cardiology follow-up as indicated", | |
| "moderate": "Close cardiac monitoring with cardiology consultation", | |
| "high": "Urgent cardiology evaluation with possible hospitalization" | |
| } | |
| return recommendations.get(risk_category, "Clinical correlation required") | |
| # Radiology-specific methods | |
| def _extract_radiological_findings(self, analysis: str) -> Dict[str, Any]: | |
| """Extract radiological findings from analysis""" | |
| findings = {} | |
| # Extract modality | |
| modalities = ["x-ray", "ct", "mri", "ultrasound", "nuclear"] | |
| for modality in modalities: | |
| if modality.lower() in analysis.lower(): | |
| findings["modality"] = modality.upper() | |
| break | |
| # Extract findings patterns | |
| finding_patterns = { | |
| "consolidation": r"consolidation.*?(?:in\s+)?([^.]+)", | |
| "pleural_effusion": r"pleural effusion.*?(?:in\s+)?([^.]+)", | |
| "pneumothorax": r"pneumothorax", | |
| "mass": r"mass.*?(?:measuring\s+)?([^.]+)", | |
| "fracture": r"fracture.*?(?:of\s+)?([^.]+)" | |
| } | |
| for finding_type, pattern in finding_patterns.items(): | |
| match = re.search(pattern, analysis, re.IGNORECASE) | |
| if match: | |
| findings[finding_type] = match.group(1) if match.lastindex else True | |
| return findings | |
| def _generate_radiological_differential(self, analysis: str) -> List[Dict[str, Any]]: | |
| """Generate radiological differential diagnosis""" | |
| differential = [] | |
| # Common differential patterns | |
| differential_patterns = { | |
| "pneumonia": ["consolidation", "air bronchogram", "infiltrate"], | |
| "pulmonary_edema": ["perihilar haziness", "cardiomegaly", "pleural effusion"], | |
| "pneumothorax": ["pneumothorax", "lung collapse"], | |
| "pulmonary_embolism": ["perfusion defect", "pleural based opacity"], | |
| "malignancy": ["mass", "nodule", "spiculated"] | |
| } | |
| analysis_lower = analysis.lower() | |
| for diagnosis, indicators in differential_patterns.items(): | |
| if any(indicator.lower() in analysis_lower for indicator in indicators): | |
| differential.append({ | |
| "diagnosis": diagnosis, | |
| "likelihood": "likely" if len([i for i in indicators if i.lower() in analysis_lower]) > 1 else "possible" | |
| }) | |
| return differential | |
| def _assess_radiological_correlation(self, analysis: str) -> Dict[str, Any]: | |
| """Assess radiological correlation with clinical presentation""" | |
| return { | |
| "clinical_alignment": self._assess_clinical_alignment(analysis), | |
| "expected_findings": self._identify_expected_findings(analysis), | |
| "unusual_features": self._identify_unusual_features(analysis) | |
| } | |
| def _assess_clinical_alignment(self, analysis: str) -> str: | |
| """Assess alignment with clinical presentation""" | |
| alignment_keywords = { | |
| "consistent": ["consistent with", "correlates with", "explains"], | |
| "partially_consistent": ["may represent", "could be", "possible"], | |
| "inconsistent": ["unexpected", "unusual", "atypical"] | |
| } | |
| analysis_lower = analysis.lower() | |
| for alignment, keywords in alignment_keywords.items(): | |
| if any(keyword in analysis_lower for keyword in keywords): | |
| return alignment | |
| return "needs_correlation" | |
| def _assess_radiological_urgency(self, findings: Dict[str, Any]) -> Dict[str, Any]: | |
| """Assess radiological urgency""" | |
| urgent_findings = { | |
| "pneumothorax": "stat", | |
| "consolidation": "urgent", | |
| "mass": "routine", | |
| "pleural_effusion": "urgent" | |
| } | |
| highest_urgency = "routine" | |
| for finding_type, urgency in urgent_findings.items(): | |
| if finding_type in findings: | |
| if urgency == "stat" or (urgency == "urgent" and highest_urgency == "routine"): | |
| highest_urgency = urgency | |
| return { | |
| "urgency_level": highest_urgency, | |
| "timeframe": self._get_urgency_timeframe(highest_urgency) | |
| } | |
| def _get_urgency_timeframe(self, urgency: str) -> str: | |
| """Get urgency timeframe""" | |
| timeframes = { | |
| "stat": "immediate", | |
| "urgent": "24 hours", | |
| "routine": "routine follow-up" | |
| } | |
| return timeframes.get(urgency, "routine") | |
| # Laboratory-specific methods | |
| def _extract_laboratory_abnormalities(self, analysis: str) -> List[Dict[str, Any]]: | |
| """Extract laboratory abnormalities""" | |
| abnormalities = [] | |
| # Common lab value patterns | |
| value_patterns = { | |
| "glucose": r'glucose.*?(\d+\.?\d*).*?(high|low|elevated|decreased)', | |
| "creatinine": r'creatinine.*?(\d+\.?\d*).*?(high|elevated)', | |
| "hemoglobin": r'hemoglobin.*?(\d+\.?\d*).*?(low|decreased|anemic)', | |
| "wbc": r'wbc.*?(\d+\.?\d*).*?(high|elevated|low|decreased)', | |
| "platelets": r'platelet.*?(\d+\.?\d*).*?(low|decreased|thrombocytopenia)' | |
| } | |
| for test_name, pattern in value_patterns.items(): | |
| matches = re.findall(pattern, analysis, re.IGNORECASE) | |
| for value, direction in matches: | |
| abnormalities.append({ | |
| "test": test_name, | |
| "value": float(value), | |
| "direction": direction, | |
| "clinical_significance": self._assess_lab_clinical_significance(test_name, direction) | |
| }) | |
| return abnormalities | |
| def _interpret_laboratory_clinical_significance(self, analysis: str) -> Dict[str, Any]: | |
| """Interpret clinical significance of laboratory values""" | |
| significance_indicators = { | |
| "diabetes": ["glucose", "hba1c", "insulin"], | |
| "kidney_disease": ["creatinine", "bun", "egfr"], | |
| "anemia": ["hemoglobin", "hematocrit", "ferritin"], | |
| "infection": ["wbc", "neutrophils", "crp"], | |
| "coagulation": ["inr", "pt", "ptt"] | |
| } | |
| interpretation = {} | |
| analysis_lower = analysis.lower() | |
| for condition, indicators in significance_indicators.items(): | |
| if any(indicator.lower() in analysis_lower for indicator in indicators): | |
| interpretation[condition] = self._assess_condition_severity(analysis, indicators) | |
| return interpretation | |
| def _assess_lab_clinical_significance(self, test: str, direction: str) -> str: | |
| """Assess clinical significance of lab abnormality""" | |
| significance_matrix = { | |
| ("glucose", "high"): "diabetes_monitoring", | |
| ("glucose", "low"): "hypoglycemia_risk", | |
| ("creatinine", "high"): "kidney_function", | |
| ("hemoglobin", "low"): "anemia_evaluation", | |
| ("wbc", "high"): "infection_screening", | |
| ("wbc", "low"): "immunocompromise_risk", | |
| ("platelets", "low"): "bleeding_risk" | |
| } | |
| return significance_matrix.get((test, direction), "clinical_correlation_needed") | |
| def _assess_condition_severity(self, analysis: str, indicators: List[str]) -> str: | |
| """Assess severity of medical condition""" | |
| analysis_lower = analysis.lower() | |
| severe_indicators = ["markedly", "severely", "critically", "emergency"] | |
| moderate_indicators = ["moderately", "significant", "concerning"] | |
| if any(indicator in analysis_lower for indicator in severe_indicators): | |
| return "severe" | |
| elif any(indicator in analysis_lower for indicator in moderate_indicators): | |
| return "moderate" | |
| else: | |
| return "mild" | |
| def _determine_laboratory_follow_up(self, abnormalities: List[Dict[str, Any]]) -> List[str]: | |
| """Determine laboratory follow-up requirements""" | |
| follow_up_recommendations = [] | |
| for abnormality in abnormalities: | |
| test = abnormality.get("test", "") | |
| significance = abnormality.get("clinical_significance", "") | |
| if significance == "diabetes_monitoring": | |
| follow_up_recommendations.append("Diabetes monitoring with endocrinology consultation") | |
| elif significance == "kidney_function": | |
| follow_up_recommendations.append("Nephrology consultation for kidney function evaluation") | |
| elif significance == "anemia_evaluation": | |
| follow_up_recommendations.append("Hematology evaluation for anemia workup") | |
| elif significance == "infection_screening": | |
| follow_up_recommendations.append("Infection workup with repeat WBC in 24-48 hours") | |
| elif significance == "bleeding_risk": | |
| follow_up_recommendations.append("Hematology consultation for bleeding risk assessment") | |
| return list(set(follow_up_recommendations)) # Remove duplicates | |
| # Pathology-specific methods | |
| def _classify_pathological_diagnosis(self, analysis: str) -> Dict[str, Any]: | |
| """Classify pathological diagnosis""" | |
| diagnosis_classification = {} | |
| # Extract diagnosis type | |
| if "benign" in analysis.lower(): | |
| diagnosis_classification["nature"] = "benign" | |
| elif "malignant" in analysis.lower(): | |
| diagnosis_classification["nature"] = "malignant" | |
| elif "suspicious" in analysis.lower(): | |
| diagnosis_classification["nature"] = "suspicious" | |
| # Extract grade if mentioned | |
| grade_pattern = r'grade\s*(\w+)' | |
| grade_match = re.search(grade_pattern, analysis, re.IGNORECASE) | |
| if grade_match: | |
| diagnosis_classification["grade"] = grade_match.group(1) | |
| # Extract stage if mentioned | |
| stage_pattern = r'stage\s*(\w+)' | |
| stage_match = re.search(stage_pattern, analysis, re.IGNORECASE) | |
| if stage_match: | |
| diagnosis_classification["stage"] = stage_match.group(1) | |
| return diagnosis_classification | |
| def _identify_pathological_prognostic_factors(self, analysis: str) -> Dict[str, Any]: | |
| """Identify pathological prognostic factors""" | |
| prognostic_factors = {} | |
| # Common prognostic indicators | |
| if "lymphovascular invasion" in analysis.lower(): | |
| prognostic_factors["lymphovascular_invasion"] = True | |
| if "perineural invasion" in analysis.lower(): | |
| prognostic_factors["perineural_invasion"] = True | |
| if "mitotic rate" in analysis.lower(): | |
| mitotic_match = re.search(r'mitotic rate.*?(\d+)', analysis, re.IGNORECASE) | |
| if mitotic_match: | |
| prognostic_factors["mitotic_rate"] = int(mitotic_match.group(1)) | |
| return prognostic_factors | |
| def _assess_pathological_treatment_implications(self, analysis: str) -> List[str]: | |
| """Assess treatment implications from pathological findings""" | |
| treatment_implications = [] | |
| if "surgery" in analysis.lower(): | |
| treatment_implications.append("Surgical resection indicated") | |
| if "chemotherapy" in analysis.lower(): | |
| treatment_implications.append("Chemotherapy may be indicated") | |
| if "radiation" in analysis.lower(): | |
| treatment_implications.append("Radiation therapy consideration") | |
| if "hormone therapy" in analysis.lower(): | |
| treatment_implications.append("Hormone therapy may be beneficial") | |
| if "targeted therapy" in analysis.lower(): | |
| treatment_implications.append("Targeted therapy evaluation needed") | |
| return treatment_implications | |
| # Clinical notes methods | |
| def _analyze_clinical_documentation_reasoning(self, analysis: str) -> Dict[str, Any]: | |
| """Analyze clinical reasoning in documentation""" | |
| return { | |
| "reasoning_quality": self._assess_reasoning_quality(analysis), | |
| "evidence_base": self._assess_evidence_base(analysis), | |
| "diagnostic_approach": self._identify_diagnostic_approach(analysis) | |
| } | |
| def _assess_reasoning_quality(self, analysis: str) -> str: | |
| """Assess quality of clinical reasoning""" | |
| quality_indicators = { | |
| "excellent": ["evidence-based", "systematic approach", "comprehensive evaluation"], | |
| "good": ["thorough", "appropriate", "well-reasoned"], | |
| "adequate": ["basic", "reasonable", "acceptable"], | |
| "poor": ["incomplete", "inadequate", "lacking"] | |
| } | |
| analysis_lower = analysis.lower() | |
| for quality, indicators in quality_indicators.items(): | |
| if any(indicator in analysis_lower for indicator in indicators): | |
| return quality | |
| return "needs_assessment" | |
| def _assess_evidence_base(self, analysis: str) -> str: | |
| """Assess evidence base of clinical reasoning""" | |
| if "evidence" in analysis.lower() or "studies" in analysis.lower(): | |
| return "evidence_based" | |
| elif "guidelines" in analysis.lower(): | |
| return "guideline_based" | |
| else: | |
| return "experience_based" | |
| def _identify_diagnostic_approach(self, analysis: str) -> str: | |
| """Identify diagnostic approach used""" | |
| approach_patterns = { | |
| "systematic": ["systematic", "comprehensive", "structured"], | |
| "targeted": ["targeted", "focused", "specific"], | |
| "differential": ["differential", "comparison", "alternatives"] | |
| } | |
| analysis_lower = analysis.lower() | |
| for approach, indicators in approach_patterns.items(): | |
| if any(indicator in analysis_lower for indicator in indicators): | |
| return approach | |
| return "unknown" | |
| def _evaluate_documentation_treatment_planning(self, analysis: str) -> Dict[str, Any]: | |
| """Evaluate treatment planning in documentation""" | |
| return { | |
| "treatment_rationale": self._assess_treatment_rationale(analysis), | |
| "follow_up_plan": self._assess_follow_up_plan(analysis), | |
| "monitoring_parameters": self._identify_monitoring_parameters(analysis) | |
| } | |
| def _assess_treatment_rationale(self, analysis: str) -> str: | |
| """Assess treatment rationale""" | |
| if "contraindicated" in analysis.lower(): | |
| return "contraindicated" | |
| elif "indicated" in analysis.lower(): | |
| return "indicated" | |
| elif "consider" in analysis.lower(): | |
| return "consider" | |
| else: | |
| return "needs_clarification" | |
| def _assess_follow_up_plan(self, analysis: str) -> str: | |
| """Assess follow-up plan completeness""" | |
| if "follow-up" in analysis.lower() or "follow up" in analysis.lower(): | |
| return "planned" | |
| else: | |
| return "missing" | |
| def _identify_monitoring_parameters(self, analysis: str) -> List[str]: | |
| """Identify monitoring parameters mentioned""" | |
| parameters = [] | |
| monitoring_keywords = ["monitor", "check", "track", "measure", "assess"] | |
| for keyword in monitoring_keywords: | |
| if keyword in analysis.lower(): | |
| # This is a simplified extraction - in practice would use more sophisticated NLP | |
| parameters.append(f"Monitor {keyword}-related parameters") | |
| return parameters | |
| # Diagnosis methods | |
| def _extract_differential_diagnosis(self, analysis: str) -> List[Dict[str, Any]]: | |
| """Extract differential diagnosis from analysis""" | |
| differential = [] | |
| # Common diagnosis patterns | |
| diagnosis_patterns = [ | |
| r'(?:most\s+likely|primary|differential|consider)\s*:?\s*([^.]+)', | |
| r'(?:diagnosis|condition)\s*:?\s*([^.]+)' | |
| ] | |
| for pattern in diagnosis_patterns: | |
| matches = re.findall(pattern, analysis, re.IGNORECASE) | |
| for match in matches: | |
| if len(match.strip()) > 3: # Filter out very short matches | |
| differential.append({ | |
| "diagnosis": match.strip(), | |
| "likelihood": self._assess_diagnosis_likelihood(analysis, match) | |
| }) | |
| return differential | |
| def _assess_diagnosis_likelihood(self, analysis: str, diagnosis: str) -> str: | |
| """Assess likelihood of diagnosis""" | |
| analysis_lower = analysis.lower() | |
| diagnosis_lower = diagnosis.lower() | |
| likelihood_indicators = { | |
| "high": ["most likely", "primary", "definite", "confirmed"], | |
| "moderate": ["likely", "probable", "suspected"], | |
| "low": ["possible", "consider", "rule out", "differential"] | |
| } | |
| for likelihood, indicators in likelihood_indicators.items(): | |
| if any(indicator in analysis_lower for indicator in indicators): | |
| return likelihood | |
| return "unknown" | |
| def _assess_diagnostic_reasoning(self, analysis: str) -> Dict[str, Any]: | |
| """Assess quality of diagnostic reasoning""" | |
| return { | |
| "systematic_approach": self._assess_systematic_approach(analysis), | |
| "evidence_support": self._assess_evidence_support(analysis), | |
| "clinical_correlation": self._assess_clinical_correlation_simple(analysis) | |
| } | |
| def _assess_systematic_approach(self, analysis: str) -> str: | |
| """Assess if diagnostic approach is systematic""" | |
| systematic_indicators = ["differential", "rule out", "systematic", "comprehensive"] | |
| if any(indicator in analysis.lower() for indicator in systematic_indicators): | |
| return "systematic" | |
| else: | |
| return "ad_hoc" | |
| def _assess_evidence_support(self, analysis: str) -> str: | |
| """Assess evidence supporting diagnosis""" | |
| if "imaging" in analysis.lower() or "laboratory" in analysis.lower(): | |
| return "objective_evidence" | |
| elif "history" in analysis.lower() or "examination" in analysis.lower(): | |
| return "subjective_evidence" | |
| else: | |
| return "limited_evidence" | |
| def _assess_clinical_correlation_simple(self, analysis: str) -> str: | |
| """Simple assessment of clinical correlation""" | |
| if "correlate" in analysis.lower() or "consistent" in analysis.lower(): | |
| return "good" | |
| elif "inconsistent" in analysis.lower() or "unexpected" in analysis.lower(): | |
| return "poor" | |
| else: | |
| return "adequate" | |
| # Integration and synthesis methods | |
| def _integrate_interdisciplinary_findings( | |
| self, domain_analysis: Dict[str, Any], classification: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Integrate findings across medical domains | |
| """ | |
| integrated = { | |
| "primary_diagnosis": self._determine_primary_diagnosis(domain_analysis), | |
| "secondary_findings": self._identify_secondary_findings(domain_analysis), | |
| "clinical_correlation": self._assess_interdisciplinary_correlation(domain_analysis), | |
| "management_plan": self._create_integrated_management_plan(domain_analysis), | |
| "specialty_consultations": self._recommend_specialty_consultations(domain_analysis) | |
| } | |
| return integrated | |
| def _determine_primary_diagnosis(self, domain_analysis: Dict[str, Any]) -> Dict[str, Any]: | |
| """Determine primary diagnosis from integrated analysis""" | |
| # This would implement sophisticated logic to determine the most likely primary diagnosis | |
| # For now, simplified approach | |
| for domain, analysis in domain_analysis.items(): | |
| if domain == "cardiology": | |
| rhythm_analysis = analysis.get("rhythm_analysis", {}) | |
| if rhythm_analysis.get("rhythm_category") == "arrhythmia": | |
| return { | |
| "primary_diagnosis": "Cardiac arrhythmia", | |
| "confidence": "high", | |
| "specialty": "cardiology" | |
| } | |
| elif domain == "radiology": | |
| findings = analysis.get("pathological_findings", {}) | |
| if findings.get("consolidation"): | |
| return { | |
| "primary_diagnosis": "Pneumonia", | |
| "confidence": "moderate", | |
| "specialty": "radiology" | |
| } | |
| return { | |
| "primary_diagnosis": "Requires clinical correlation", | |
| "confidence": "low", | |
| "specialty": "general" | |
| } | |
| def _identify_secondary_findings(self, domain_analysis: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Identify secondary findings across domains""" | |
| secondary_findings = [] | |
| for domain, analysis in domain_analysis.items(): | |
| if domain == "laboratory": | |
| abnormal_values = analysis.get("abnormal_values", []) | |
| for abnormality in abnormal_values: | |
| if abnormality.get("clinical_significance") != "primary_diagnosis": | |
| secondary_findings.append({ | |
| "finding": f"Abnormal {abnormality.get('test', 'lab value')}", | |
| "domain": domain, | |
| "significance": "secondary" | |
| }) | |
| return secondary_findings | |
| def _assess_interdisciplinary_correlation(self, domain_analysis: Dict[str, Any]) -> Dict[str, Any]: | |
| """Assess correlation between findings from different specialties""" | |
| return { | |
| "correlation_quality": "good" if len(domain_analysis) > 1 else "limited", | |
| "consistency": "consistent", | |
| "contradictions": [], | |
| "gaps_identified": [] | |
| } | |
| def _create_integrated_management_plan(self, domain_analysis: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create integrated management plan""" | |
| return { | |
| "immediate_actions": self._determine_immediate_actions(domain_analysis), | |
| "monitoring_plan": self._create_monitoring_plan(domain_analysis), | |
| "follow_up_schedule": self._determine_follow_up_schedule(domain_analysis), | |
| "patient_education": self._recommend_patient_education(domain_analysis) | |
| } | |
| def _determine_immediate_actions(self, domain_analysis: Dict[str, Any]) -> List[str]: | |
| """Determine immediate actions needed""" | |
| immediate_actions = [] | |
| for domain, analysis in domain_analysis.items(): | |
| if domain == "cardiology": | |
| risk_strat = analysis.get("risk_stratification", {}) | |
| if risk_strat.get("risk_category") == "high": | |
| immediate_actions.append("Urgent cardiology evaluation") | |
| elif domain == "radiology": | |
| urgency = analysis.get("urgency_assessment", {}) | |
| if urgency.get("urgency_level") == "stat": | |
| immediate_actions.append("Immediate radiological correlation") | |
| elif domain == "laboratory": | |
| # Check for critical values | |
| pass | |
| return immediate_actions | |
| def _create_monitoring_plan(self, domain_analysis: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create monitoring plan""" | |
| return { | |
| "vital_signs": "Continuous monitoring for high-risk patients", | |
| "laboratory": "Serial laboratory monitoring as indicated", | |
| "imaging": "Follow-up imaging per specialty recommendations", | |
| "symptoms": "Daily symptom assessment and documentation" | |
| } | |
| def _determine_follow_up_schedule(self, domain_analysis: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Determine follow-up schedule""" | |
| follow_up = [] | |
| for domain in domain_analysis.keys(): | |
| follow_up.append({ | |
| "specialty": domain, | |
| "timeframe": self._get_specialty_follow_up_timeframe(domain), | |
| "purpose": "Specialty-specific evaluation and management" | |
| }) | |
| return follow_up | |
| def _get_specialty_follow_up_timeframe(self, domain: str) -> str: | |
| """Get appropriate follow-up timeframe by specialty""" | |
| timeframes = { | |
| "cardiology": "1-2 weeks", | |
| "radiology": "As clinically indicated", | |
| "laboratory": "24-48 hours for critical values", | |
| "pathology": "1 week for results review", | |
| "clinical_notes": "Per primary care provider" | |
| } | |
| return timeframes.get(domain, "As clinically indicated") | |
| def _recommend_patient_education(self, domain_analysis: Dict[str, Any]) -> List[str]: | |
| """Recommend patient education topics""" | |
| education_topics = [] | |
| for domain in domain_analysis.keys(): | |
| if domain == "cardiology": | |
| education_topics.append("Cardiac risk factor modification") | |
| elif domain == "radiology": | |
| education_topics.append("Importance of follow-up imaging") | |
| elif domain == "laboratory": | |
| education_topics.append("Medication compliance and monitoring") | |
| return education_topics | |
| def _recommend_specialty_consultations(self, domain_analysis: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Recommend specialty consultations""" | |
| consultations = [] | |
| for domain, analysis in domain_analysis.items(): | |
| if domain == "cardiology": | |
| risk_strat = analysis.get("risk_stratification", {}) | |
| if risk_strat.get("risk_category") == "high": | |
| consultations.append({ | |
| "specialty": "Cardiology", | |
| "urgency": "urgent", | |
| "reason": "High cardiac risk stratification" | |
| }) | |
| elif domain == "radiology": | |
| urgency = analysis.get("urgency_assessment", {}) | |
| if urgency.get("urgency_level") == "stat": | |
| consultations.append({ | |
| "specialty": "Radiology", | |
| "urgency": "stat", | |
| "reason": "Critical radiological findings" | |
| }) | |
| return consultations | |
| # Evidence-based recommendations | |
| def _generate_evidence_based_recommendations( | |
| self, integrated_findings: Dict[str, Any], classification: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Generate evidence-based clinical recommendations | |
| """ | |
| recommendations = { | |
| "immediate_interventions": self._recommend_immediate_interventions(integrated_findings), | |
| "diagnostic_workup": self._recommend_diagnostic_workup(integrated_findings), | |
| "treatment_recommendations": self._recommend_treatments(integrated_findings), | |
| "monitoring_strategy": self._recommend_monitoring_strategy(integrated_findings), | |
| "patient_safety": self._recommend_patient_safety_measures(integrated_findings) | |
| } | |
| return recommendations | |
| def _recommend_immediate_interventions(self, integrated_findings: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Recommend immediate clinical interventions""" | |
| immediate_interventions = [] | |
| primary_dx = integrated_findings.get("primary_diagnosis", {}) | |
| if primary_dx.get("confidence") == "high": | |
| immediate_interventions.append({ | |
| "intervention": "Initiate evidence-based treatment for primary diagnosis", | |
| "urgency": "immediate", | |
| "evidence_level": "high" | |
| }) | |
| urgency_assessment = integrated_findings.get("urgency_assessment", {}) | |
| if urgency_assessment.get("overall_urgency") == "high": | |
| immediate_interventions.append({ | |
| "intervention": "Urgent specialty consultation and evaluation", | |
| "urgency": "stat", | |
| "evidence_level": "high" | |
| }) | |
| return immediate_interventions | |
| def _recommend_diagnostic_workup(self, integrated_findings: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Recommend diagnostic workup""" | |
| diagnostic_workup = [] | |
| # This would implement evidence-based diagnostic recommendations | |
| # based on the primary diagnosis and clinical findings | |
| return diagnostic_workup | |
| def _recommend_treatments(self, integrated_findings: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Recommend evidence-based treatments""" | |
| treatments = [] | |
| # This would implement evidence-based treatment recommendations | |
| return treatments | |
| def _recommend_monitoring_strategy(self, integrated_findings: Dict[str, Any]) -> Dict[str, Any]: | |
| """Recommend monitoring strategy""" | |
| return { | |
| "vital_signs_frequency": "Per clinical protocol", | |
| "laboratory_monitoring": "As indicated by clinical status", | |
| "imaging_follow_up": "Per radiology recommendations", | |
| "symptom_monitoring": "Daily assessment" | |
| } | |
| def _recommend_patient_safety_measures(self, integrated_findings: Dict[str, Any]) -> List[str]: | |
| """Recommend patient safety measures""" | |
| return [ | |
| "Fall risk assessment and precautions", | |
| "Medication reconciliation and review", | |
| "Infection control measures if indicated", | |
| "Patient/family education on warning signs" | |
| ] | |
| # Clinical urgency assessment | |
| def _assess_clinical_urgency( | |
| self, integrated_findings: Dict[str, Any], classification: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Assess overall clinical urgency | |
| """ | |
| urgency_factors = [] | |
| primary_diagnosis = integrated_findings.get("primary_diagnosis", {}) | |
| # Assess urgency based on diagnosis confidence | |
| if primary_diagnosis.get("confidence") == "high": | |
| urgency_factors.append("high_confidence_diagnosis") | |
| # Assess based on risk stratification | |
| # This would integrate risk assessments from all domains | |
| # Determine overall urgency | |
| if len(urgency_factors) == 0: | |
| overall_urgency = "routine" | |
| elif len(urgency_factors) <= 2: | |
| overall_urgency = "urgent" | |
| else: | |
| overall_urgency = "stat" | |
| return { | |
| "overall_urgency": overall_urgency, | |
| "urgency_factors": urgency_factors, | |
| "timeframe": self._get_urgency_timeframe(overall_urgency), | |
| "immediate_actions_required": self._determine_immediate_urgency_actions(overall_urgency) | |
| } | |
| def _determine_immediate_urgency_actions(self, urgency_level: str) -> List[str]: | |
| """Determine immediate actions based on urgency level""" | |
| if urgency_level == "stat": | |
| return [ | |
| "Immediate physician evaluation", | |
| "Stat laboratory and imaging", | |
| "Continuous monitoring", | |
| "Prepare for emergency interventions" | |
| ] | |
| elif urgency_level == "urgent": | |
| return [ | |
| "Urgent physician evaluation within 4 hours", | |
| "Expedited laboratory and imaging", | |
| "Frequent monitoring", | |
| "Specialty consultation" | |
| ] | |
| else: | |
| return [ | |
| "Routine physician evaluation", | |
| "Standard monitoring", | |
| "Routine follow-up" | |
| ] | |
| # Comprehensive clinical summary | |
| def _create_comprehensive_clinical_summary( | |
| self, | |
| integrated_findings: Dict[str, Any], | |
| recommendations: Dict[str, Any], | |
| urgency_assessment: Dict[str, Any] | |
| ) -> str: | |
| """ | |
| Create comprehensive clinical summary | |
| """ | |
| summary_parts = [] | |
| # Primary diagnosis | |
| primary_dx = integrated_findings.get("primary_diagnosis", {}) | |
| if primary_dx: | |
| summary_parts.append( | |
| f"Primary Diagnosis: {primary_dx.get('primary_diagnosis', 'Requires correlation')} " | |
| f"(Confidence: {primary_dx.get('confidence', 'unknown')})" | |
| ) | |
| # Key findings | |
| secondary_findings = integrated_findings.get("secondary_findings", []) | |
| if secondary_findings: | |
| finding_text = "; ".join([f.get("finding", "") for f in secondary_findings[:3]]) | |
| if finding_text: | |
| summary_parts.append(f"Key Findings: {finding_text}") | |
| # Urgency assessment | |
| overall_urgency = urgency_assessment.get("overall_urgency", "routine") | |
| summary_parts.append(f"Clinical Urgency: {overall_urgency.title()}") | |
| # Immediate recommendations | |
| immediate_actions = recommendations.get("immediate_interventions", []) | |
| if immediate_actions: | |
| action_text = "; ".join([action.get("intervention", "") for action in immediate_actions[:2]]) | |
| if action_text: | |
| summary_parts.append(f"Immediate Actions: {action_text}") | |
| return ". ".join(summary_parts) + "." | |
| # Quality and confidence assessment | |
| def _calculate_overall_clinical_confidence( | |
| self, model_results: List[Dict[str, Any]], integrated_findings: Dict[str, Any] | |
| ) -> float: | |
| """ | |
| Calculate overall clinical confidence based on multiple factors | |
| """ | |
| # Base confidence from individual models | |
| model_confidences = [] | |
| for result in model_results: | |
| if "confidence" in result: | |
| model_confidences.append(result["confidence"]) | |
| else: | |
| model_confidences.append(0.75) # Default confidence | |
| avg_model_confidence = np.mean(model_confidences) if model_confidences else 0.75 | |
| # Adjust based on domain coverage | |
| domains_covered = len(set(result.get("domain", "general") for result in model_results)) | |
| domain_bonus = min(domains_covered * 0.05, 0.20) # Max 20% bonus | |
| # Adjust based on diagnosis confidence | |
| primary_dx = integrated_findings.get("primary_diagnosis", {}) | |
| dx_confidence_bonus = 0.0 | |
| if primary_dx.get("confidence") == "high": | |
| dx_confidence_bonus = 0.10 | |
| elif primary_dx.get("confidence") == "moderate": | |
| dx_confidence_bonus = 0.05 | |
| overall_confidence = min(avg_model_confidence + domain_bonus + dx_confidence_bonus, 0.95) | |
| return overall_confidence | |
| def _assess_evidence_quality(self, model_results: List[Dict[str, Any]]) -> Dict[str, str]: | |
| """Assess quality of evidence""" | |
| evidence_quality = {} | |
| for result in model_results: | |
| domain = result.get("domain", "general") | |
| model = result.get("model", "") | |
| # Assign evidence quality based on model type and research findings | |
| if model in ["HuBERT-ECG", "Bio_ClinicalBERT", "MONAI"]: | |
| quality = "high" | |
| elif model in ["MedGemma 27B", "MedGemma 4B"]: | |
| quality = "high" | |
| else: | |
| quality = "moderate" | |
| evidence_quality[domain] = quality | |
| return evidence_quality | |
| def _assess_clinical_correlation(self, integrated_findings: Dict[str, Any]) -> str: | |
| """Assess overall clinical correlation quality""" | |
| primary_dx = integrated_findings.get("primary_diagnosis", {}) | |
| correlation = integrated_findings.get("clinical_correlation", {}) | |
| if primary_dx.get("confidence") == "high" and correlation.get("correlation_quality") == "good": | |
| return "excellent" | |
| elif primary_dx.get("confidence") in ["high", "moderate"]: | |
| return "good" | |
| elif primary_dx.get("confidence") == "low": | |
| return "poor" | |
| else: | |
| return "needs_improvement" | |
| # Fallback synthesis | |
| def _generate_fallback_synthesis( | |
| self, model_results: List[Dict[str, Any]], classification: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Generate fallback synthesis when main synthesis fails | |
| """ | |
| return { | |
| "clinical_summary": "Medical document analysis completed with basic clinical interpretation", | |
| "domain_specific_findings": { | |
| "general": { | |
| "findings": [result.get("analysis", "") for result in model_results], | |
| "clinical_relevance": "moderate" | |
| } | |
| }, | |
| "clinical_recommendations": { | |
| "general_recommendations": [ | |
| "Clinical correlation recommended", | |
| "Specialist consultation as indicated", | |
| "Routine follow-up per primary care provider" | |
| ] | |
| }, | |
| "urgency_assessment": { | |
| "overall_urgency": "routine", | |
| "timeframe": "routine follow-up" | |
| }, | |
| "overall_confidence": 0.65, | |
| "synthesis_method": "fallback", | |
| "note": "Basic synthesis - enhanced analysis unavailable" | |
| } | |
| # Legacy compatibility methods | |
| def synthesize_analysis( | |
| self, | |
| model_results: List[Dict[str, Any]], | |
| classification: Dict[str, Any], | |
| pdf_content: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Legacy method for backward compatibility""" | |
| return self.synthesize_research_optimized_analysis(model_results, classification, pdf_content) | |
| # Compatibility alias for backward compatibility | |
| AnalysisSynthesizer = EnhancedAnalysisSynthesizer | |