"""
Analysis Synthesizer - Result Aggregation and Synthesis
Combines outputs from multiple specialized models
"""

import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Defaults used when an input dict omits a field, so that one malformed
# model result cannot abort the whole synthesis.
_DEFAULT_DOCUMENT_TYPE = "unknown"
_DEFAULT_MODEL_NAME = "unknown"
_DEFAULT_DOMAIN = "general"


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_confidence(value: Any) -> float:
    """Coerce a confidence value to ``float``; anything non-numeric is 0.0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated ``datetime.utcnow().isoformat()``
    (no UTC offset suffix) so consumers of the timestamp are unaffected.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class AnalysisSynthesizer:
    """
    Synthesizes results from multiple specialized models into
    a comprehensive medical document analysis

    Implements:
    - Result aggregation
    - Confidence calculation (priority-weighted average)
    - Clinical insights generation
    - Recommendation generation

    The fusion strategies registered in ``fusion_strategies`` are
    placeholders that are not implemented yet.
    """

    # Document-type specific "Clinical Review" recommendation: (text, priority).
    _CLINICAL_REVIEW = {
        "radiology": (
            "Radiologist review recommended for imaging findings confirmation",
            "high",
        ),
        "pathology": (
            "Pathologist verification required for tissue analysis",
            "high",
        ),
        "laboratory": (
            "Review laboratory values in context of patient history",
            "medium",
        ),
        "cardiology": (
            "Cardiologist review recommended for cardiac findings",
            "high",
        ),
    }

    def __init__(self):
        self.fusion_strategies = {
            "early": self._early_fusion,
            "late": self._late_fusion,
            "weighted": self._weighted_fusion,
        }
        logger.info("Analysis Synthesizer initialized")

    # ------------------------------------------------------------------
    # Small accessors that tolerate missing / malformed fields
    # ------------------------------------------------------------------

    @staticmethod
    def _document_type(classification: Any) -> str:
        """Return the classified document type, or ``"unknown"`` if absent."""
        value = _as_dict(classification).get("document_type")
        if not value:
            return _DEFAULT_DOCUMENT_TYPE
        return value if isinstance(value, str) else str(value)

    @staticmethod
    def _result_payload(result: Dict[str, Any]) -> Dict[str, Any]:
        """Return the ``"result"`` payload of a model result as a dict."""
        return _as_dict(result.get("result"))

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    async def synthesize(
        self,
        classification: Dict[str, Any],
        specialized_results: List[Dict[str, Any]],
        pdf_content: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Synthesize results from multiple models

        Only results whose ``"status"`` is ``"completed"`` are used. If there
        are none, or if synthesis raises, a fallback analysis is returned
        instead of propagating the error.

        Returns comprehensive analysis with:
        - Aggregated findings
        - Key insights
        - Recommendations
        - Confidence scores
        - Quality metrics and metadata
        """
        # Tracked outside the try block so the except path can still report it.
        failed_count = 0
        try:
            logger.info(
                "Synthesizing %d model results", len(specialized_results)
            )

            # Extract successful results
            successful_results = [
                r for r in specialized_results
                if r.get("status") == "completed"
            ]
            failed_count = len(specialized_results) - len(successful_results)

            if not successful_results:
                return self._generate_fallback_analysis(
                    classification, pdf_content, models_failed=failed_count
                )

            classification = _as_dict(classification)
            pdf_content = _as_dict(pdf_content)

            aggregated_findings = self._aggregate_by_domain(successful_results)
            insights = self._generate_insights(
                aggregated_findings, classification, pdf_content
            )
            overall_confidence = self._calculate_overall_confidence(
                successful_results
            )
            summary = self._generate_summary(
                classification, aggregated_findings, insights
            )
            recommendations = self._generate_recommendations(
                aggregated_findings, classification
            )

            # Use the same defaults as _aggregate_by_domain so a result that
            # lacks "domain"/"model_name" is reported instead of raising
            # KeyError and discarding the whole analysis.
            models_used = [
                {
                    "model": r.get("model_name", _DEFAULT_MODEL_NAME),
                    "domain": r.get("domain") or _DEFAULT_DOMAIN,
                    "confidence": _as_confidence(
                        self._result_payload(r).get("confidence", 0.0)
                    ),
                }
                for r in successful_results
            ]

            # Compile final analysis
            analysis = {
                "document_type": self._document_type(classification),
                "classification_confidence": classification.get(
                    "confidence", 0.0
                ),
                "overall_confidence": overall_confidence,
                "summary": summary,
                "aggregated_findings": aggregated_findings,
                "clinical_insights": insights,
                "recommendations": recommendations,
                "models_used": models_used,
                "quality_metrics": {
                    "models_executed": len(successful_results),
                    "models_failed": failed_count,
                    "overall_confidence": overall_confidence,
                },
                "metadata": {
                    "synthesis_timestamp": _utc_timestamp(),
                    "page_count": pdf_content.get("page_count", 0) or 0,
                    "has_images": len(pdf_content.get("images") or []) > 0,
                    "has_tables": len(pdf_content.get("tables") or []) > 0,
                },
            }

            logger.info("Synthesis completed successfully")
            return analysis

        except Exception as e:
            # Top-level boundary: never let synthesis errors reach the caller.
            # logger.exception keeps the traceback for diagnosis.
            logger.exception("Synthesis failed: %s", e)
            return self._generate_fallback_analysis(
                classification, pdf_content, models_failed=failed_count
            )

    # ------------------------------------------------------------------
    # Aggregation
    # ------------------------------------------------------------------

    def _aggregate_by_domain(
        self,
        results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Aggregate results by medical domain

        For every domain (default ``"general"``) collects the model names,
        the findings found under the payload keys ``"findings"``,
        ``"key_findings"`` and ``"analysis"``, the per-model confidence
        scores, and their arithmetic mean as ``"average_confidence"``.
        """
        aggregated: Dict[str, Any] = {}

        for result in results:
            domain = result.get("domain") or _DEFAULT_DOMAIN
            bucket = aggregated.setdefault(
                domain,
                {"models": [], "findings": [], "confidence_scores": []},
            )
            bucket["models"].append(
                result.get("model_name", _DEFAULT_MODEL_NAME)
            )

            payload = self._result_payload(result)
            findings = bucket["findings"]

            if "findings" in payload:
                findings.append(payload["findings"])

            if "key_findings" in payload:
                key_findings = payload["key_findings"]
                if isinstance(key_findings, (list, tuple, set, frozenset)):
                    findings.extend(key_findings)
                elif key_findings is not None:
                    # A single finding (e.g. a string) is kept whole; calling
                    # extend() on a string would split it into characters.
                    findings.append(key_findings)

            if "analysis" in payload:
                findings.append(payload["analysis"])

            bucket["confidence_scores"].append(
                _as_confidence(payload.get("confidence", 0.0))
            )

        # Calculate average confidence per domain
        for bucket in aggregated.values():
            scores = bucket["confidence_scores"]
            bucket["average_confidence"] = (
                sum(scores) / len(scores) if scores else 0.0
            )

        return aggregated

    # ------------------------------------------------------------------
    # Insights
    # ------------------------------------------------------------------

    def _generate_insights(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """Generate clinical insights from aggregated findings

        Emits, in order: a document-structure insight (only when the page
        count is positive), a classification insight, one insight per
        domain, and insights for embedded images / tables when present.
        Each insight is a dict with ``category``, ``insight``, ``importance``.
        """
        classification = _as_dict(classification)
        pdf_content = _as_dict(pdf_content)
        insights: List[Dict[str, str]] = []

        # Document structure insight
        page_count = pdf_content.get("page_count", 0) or 0
        if page_count > 0:
            depth = "comprehensive" if page_count > 5 else "standard"
            insights.append({
                "category": "Document Structure",
                "insight": f"Document contains {page_count} pages with {depth} documentation",
                "importance": "medium"
            })

        # Classification insight
        doc_label = self._document_type(classification).replace("_", " ").title()
        confidence = _as_confidence(classification.get("confidence"))
        insights.append({
            "category": "Document Classification",
            "insight": f"Document identified as {doc_label} with {confidence*100:.0f}% confidence",
            "importance": "high"
        })

        # Domain-specific insights
        for domain, data in aggregated_findings.items():
            avg_confidence = data.get("average_confidence", 0.0)
            model_count = len(data.get("models", []))
            insights.append({
                "category": str(domain).replace("_", " ").title(),
                "insight": f"Analysis completed by {model_count} specialized model(s) with {avg_confidence*100:.0f}% average confidence",
                "importance": "high" if avg_confidence > 0.8 else "medium"
            })

        # Data richness insights
        images = pdf_content.get("images") or []
        tables = pdf_content.get("tables") or []

        if images:
            insights.append({
                "category": "Multimodal Content",
                "insight": f"Document contains {len(images)} image(s) for enhanced analysis",
                "importance": "medium"
            })

        if tables:
            insights.append({
                "category": "Structured Data",
                "insight": f"Document contains {len(tables)} table(s) with structured information",
                "importance": "medium"
            })

        return insights

    # ------------------------------------------------------------------
    # Confidence
    # ------------------------------------------------------------------

    def _calculate_overall_confidence(self, results: List[Dict[str, Any]]) -> float:
        """Calculate weighted overall confidence score

        Results whose ``"priority"`` is ``"primary"`` weigh 1.5, all others
        1.0. Missing or non-numeric confidences count as 0.0. Returns 0.0
        for an empty list.
        """
        if not results:
            return 0.0

        weighted_sum = 0.0
        total_weight = 0.0
        for result in results:
            confidence = _as_confidence(
                self._result_payload(result).get("confidence", 0.0)
            )
            is_primary = result.get("priority", "secondary") == "primary"
            weight = 1.5 if is_primary else 1.0
            weighted_sum += confidence * weight
            total_weight += weight

        return weighted_sum / total_weight if total_weight > 0 else 0.0

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------

    def _generate_summary(
        self,
        classification: Dict[str, Any],
        aggregated_findings: Dict[str, Any],
        insights: List[Dict[str, str]]
    ) -> str:
        """Generate executive summary of analysis

        The summary has a header naming the document type, one paragraph
        per domain, an optional count of high-importance insights, and a
        closing clinical-review disclaimer.
        """
        doc_type = self._document_type(classification).replace("_", " ").title()

        summary_parts = [
            f"Medical Document Analysis: {doc_type}",
            f"\nThis document has been processed through our comprehensive AI analysis pipeline using {len(aggregated_findings)} specialized medical AI domain(s).",
        ]

        # Add domain summaries
        for domain, data in aggregated_findings.items():
            domain_name = str(domain).replace("_", " ").title()
            model_count = len(data.get("models", []))
            avg_conf = data.get("average_confidence", 0.0)
            if avg_conf > 0.8:
                verdict = "High confidence analysis completed."
            else:
                verdict = "Analysis completed with moderate confidence."

            summary_parts.append(
                f"\n\n{domain_name}: Analyzed by {model_count} model(s) with {avg_conf*100:.0f}% confidence. "
                f"{verdict}"
            )

        # Add insights summary
        high_importance = [i for i in insights if i.get("importance") == "high"]
        if high_importance:
            summary_parts.append(
                f"\n\nKey Findings: {len(high_importance)} high-priority insights identified for clinical review."
            )

        summary_parts.append(
            "\n\nThis analysis provides AI-assisted insights and should be reviewed by qualified healthcare professionals for clinical decision-making."
        )

        return "".join(summary_parts)

    # ------------------------------------------------------------------
    # Recommendations
    # ------------------------------------------------------------------

    def _generate_recommendations(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """Generate recommendations based on analysis

        Emits, in order: a document-type specific clinical-review
        recommendation (radiology, pathology, laboratory, cardiology only),
        two general recommendations, and a manual-review recommendation
        listing every domain whose average confidence is below 0.7.
        """
        recommendations: List[Dict[str, str]] = []

        # Classification-based recommendation
        review = self._CLINICAL_REVIEW.get(self._document_type(classification))
        if review is not None:
            text, priority = review
            recommendations.append({
                "category": "Clinical Review",
                "recommendation": text,
                "priority": priority
            })

        # General recommendations
        recommendations.append({
            "category": "Data Quality",
            "recommendation": "All AI-generated insights should be validated by qualified healthcare professionals",
            "priority": "high"
        })
        recommendations.append({
            "category": "Documentation",
            "recommendation": "Maintain this analysis report with patient medical records",
            "priority": "medium"
        })

        # Confidence-based recommendation
        low_confidence_domains = [
            str(domain)
            for domain, data in aggregated_findings.items()
            if data.get("average_confidence", 0.0) < 0.7
        ]
        if low_confidence_domains:
            recommendations.append({
                "category": "Analysis Quality",
                "recommendation": f"Lower confidence detected in {', '.join(low_confidence_domains)}. Consider manual review.",
                "priority": "medium"
            })

        return recommendations

    # ------------------------------------------------------------------
    # Fallback
    # ------------------------------------------------------------------

    def _generate_fallback_analysis(
        self,
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any],
        models_failed: int = 0
    ) -> Dict[str, Any]:
        """Generate fallback analysis when no models succeeded

        This is the last resort of ``synthesize`` (it is also called from
        its exception handler), so it tolerates missing keys and non-dict
        inputs rather than raising.

        Args:
            classification: Classifier output; ``document_type`` and
                ``confidence`` are read if present.
            pdf_content: Extracted PDF content; ``page_count`` is read
                if present.
            models_failed: Number of model results that did not complete,
                reported in ``quality_metrics``. Defaults to 0.
        """
        classification = _as_dict(classification)
        pdf_content = _as_dict(pdf_content)

        return {
            "document_type": self._document_type(classification),
            "classification_confidence": classification.get("confidence", 0.0),
            "overall_confidence": 0.0,
            "summary": "Analysis could not be completed. Document was classified but specialized model processing failed.",
            "aggregated_findings": {},
            "clinical_insights": [],
            "recommendations": [{
                "category": "Manual Review",
                "recommendation": "Manual review required - automated analysis unavailable",
                "priority": "high"
            }],
            "models_used": [],
            "quality_metrics": {
                "models_executed": 0,
                "models_failed": models_failed,
                "overall_confidence": 0.0
            },
            "metadata": {
                "synthesis_timestamp": _utc_timestamp(),
                "page_count": pdf_content.get("page_count", 0) or 0,
                "fallback": True
            }
        }

    # ------------------------------------------------------------------
    # Fusion strategies (placeholders)
    # ------------------------------------------------------------------

    def _early_fusion(self, results: List[Dict]) -> Optional[Dict]:
        """Early fusion strategy - combine features before analysis

        Not implemented yet; currently returns ``None``.
        """
        return None

    def _late_fusion(self, results: List[Dict]) -> Optional[Dict]:
        """Late fusion strategy - combine predictions after analysis

        Not implemented yet; currently returns ``None``.
        """
        return None

    def _weighted_fusion(self, results: List[Dict]) -> Optional[Dict]:
        """Weighted fusion strategy - weight by model confidence

        Not implemented yet; currently returns ``None``.
        """
        return None