File size: 14,910 Bytes
13d5ab4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
"""
Analysis Synthesizer - Result Aggregation and Synthesis
Combines outputs from multiple specialized models
"""

import logging
from typing import Dict, List, Any, Optional
from datetime import datetime

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class AnalysisSynthesizer:
    """
    Synthesizes results from multiple specialized models into
    a comprehensive medical document analysis

    Implements:
    - Result aggregation (grouped per domain)
    - Confidence calibration (priority-weighted average)
    - Clinical insights, summary and recommendation generation

    The fusion strategies registered in ``fusion_strategies`` are
    placeholders: they are not implemented yet and return ``None``.
    """

    # Average confidence strictly above this is reported as "high".
    _HIGH_CONFIDENCE_THRESHOLD = 0.8
    # Domains whose average confidence is strictly below this trigger a
    # "consider manual review" recommendation.
    _LOW_CONFIDENCE_THRESHOLD = 0.7
    # Weights used by the overall-confidence average.
    _PRIMARY_WEIGHT = 1.5
    _DEFAULT_WEIGHT = 1.0
    # document_type -> (recommendation text, priority) for the
    # classification-based "Clinical Review" recommendation.
    _REVIEW_RECOMMENDATIONS = {
        "radiology": (
            "Radiologist review recommended for imaging findings confirmation",
            "high",
        ),
        "pathology": (
            "Pathologist verification required for tissue analysis",
            "high",
        ),
        "laboratory": (
            "Review laboratory values in context of patient history",
            "medium",
        ),
        "cardiology": (
            "Cardiologist review recommended for cardiac findings",
            "high",
        ),
    }

    def __init__(self):
        self.fusion_strategies = {
            "early": self._early_fusion,
            "late": self._late_fusion,
            "weighted": self._weighted_fusion
        }
        logger.info("Analysis Synthesizer initialized")

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()``
        (no UTC offset suffix) without calling the deprecated ``utcnow``.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _result_payload(result: Dict[str, Any]) -> Dict[str, Any]:
        """Return the ``"result"`` payload of a model record as a dict.

        A missing, ``None`` or non-dict payload yields an empty dict so
        that callers can use ``.get`` / ``in`` unconditionally.
        """
        payload = result.get("result")
        return payload if isinstance(payload, dict) else {}

    @staticmethod
    def _coerce_confidence(value: Any) -> float:
        """Convert a confidence value to ``float``; fall back to 0.0.

        ``None`` and values that ``float()`` rejects become 0.0 so one
        malformed model record cannot abort the whole synthesis.
        """
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def _describe_models_used(
        self,
        results: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Build the ``models_used`` list of the final analysis.

        Missing ``model_name`` / ``domain`` keys default to ``"unknown"``
        and ``"general"``, matching the defaults used during aggregation.
        """
        return [
            {
                "model": r.get("model_name", "unknown"),
                "domain": r.get("domain", "general"),
                "confidence": self._coerce_confidence(
                    self._result_payload(r).get("confidence", 0.0)
                ),
            }
            for r in results
        ]

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    async def synthesize(
        self,
        classification: Dict[str, Any],
        specialized_results: List[Dict[str, Any]],
        pdf_content: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Synthesize results from multiple models

        Only results whose ``"status"`` equals ``"completed"`` are used.
        If none qualify, or if any step raises, a fallback analysis is
        returned instead of propagating the error.

        Args:
            classification: Must provide ``"document_type"`` and
                ``"confidence"`` for the full analysis to be produced.
            specialized_results: One record per executed model.
            pdf_content: Extracted document content; ``"page_count"``,
                ``"images"`` and ``"tables"`` are read when present.

        Returns comprehensive analysis with:
        - Aggregated findings
        - Key insights
        - Recommendations
        - Confidence scores
        - Quality metrics and metadata
        """
        # Tracked outside the happy path so the fallback can report it too.
        models_failed = 0

        try:
            logger.info("Synthesizing %d model results", len(specialized_results))

            # Extract successful results
            successful_results = [
                r for r in specialized_results
                if r.get("status") == "completed"
            ]
            models_failed = len(specialized_results) - len(successful_results)

            if not successful_results:
                return self._generate_fallback_analysis(
                    classification,
                    pdf_content,
                    models_failed=models_failed
                )

            # Aggregate findings by domain
            aggregated_findings = self._aggregate_by_domain(successful_results)

            # Generate clinical insights
            insights = self._generate_insights(
                aggregated_findings,
                classification,
                pdf_content
            )

            # Calculate overall confidence
            overall_confidence = self._calculate_overall_confidence(successful_results)

            # Generate summary
            summary = self._generate_summary(
                classification,
                aggregated_findings,
                insights
            )

            # Generate recommendations
            recommendations = self._generate_recommendations(
                aggregated_findings,
                classification
            )

            # Compile final analysis
            analysis = {
                "document_type": classification["document_type"],
                "classification_confidence": classification["confidence"],
                "overall_confidence": overall_confidence,
                "summary": summary,
                "aggregated_findings": aggregated_findings,
                "clinical_insights": insights,
                "recommendations": recommendations,
                "models_used": self._describe_models_used(successful_results),
                "quality_metrics": {
                    "models_executed": len(successful_results),
                    "models_failed": models_failed,
                    "overall_confidence": overall_confidence
                },
                "metadata": {
                    "synthesis_timestamp": self._utc_timestamp(),
                    "page_count": pdf_content.get("page_count", 0),
                    # Truthiness also copes with a key that is present
                    # but set to None.
                    "has_images": bool(pdf_content.get("images")),
                    "has_tables": bool(pdf_content.get("tables"))
                }
            }

            logger.info("Synthesis completed successfully")

            return analysis

        except Exception as e:
            # Top-level boundary: log with traceback and degrade to the
            # fallback report rather than failing the caller.
            logger.exception("Synthesis failed: %s", e)
            return self._generate_fallback_analysis(
                classification,
                pdf_content,
                models_failed=models_failed
            )

    # ------------------------------------------------------------------
    # Aggregation and scoring
    # ------------------------------------------------------------------

    def _aggregate_by_domain(
        self,
        results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Aggregate results by medical domain

        Args:
            results: Model records. ``"domain"`` defaults to
                ``"general"`` and ``"model_name"`` to ``"unknown"``.

        Returns:
            Mapping of domain to a dict with ``"models"``, ``"findings"``,
            ``"confidence_scores"`` and ``"average_confidence"``.
        """
        aggregated: Dict[str, Any] = {}

        for result in results:
            domain = result.get("domain", "general")
            bucket = aggregated.setdefault(domain, {
                "models": [],
                "findings": [],
                "confidence_scores": []
            })

            bucket["models"].append(result.get("model_name", "unknown"))

            # Extract findings from result
            result_data = self._result_payload(result)
            findings = bucket["findings"]

            if "findings" in result_data:
                findings.append(result_data["findings"])

            if "key_findings" in result_data:
                key_findings = result_data["key_findings"]
                if isinstance(key_findings, (str, bytes)):
                    # extend() would split a string into characters.
                    findings.append(key_findings)
                elif key_findings is not None:
                    try:
                        findings.extend(key_findings)
                    except TypeError:
                        # Not iterable: keep it as a single finding.
                        findings.append(key_findings)

            if "analysis" in result_data:
                findings.append(result_data["analysis"])

            bucket["confidence_scores"].append(
                self._coerce_confidence(result_data.get("confidence", 0.0))
            )

        # Calculate average confidence per domain
        for bucket in aggregated.values():
            scores = bucket["confidence_scores"]
            bucket["average_confidence"] = sum(scores) / len(scores) if scores else 0.0

        return aggregated

    def _generate_insights(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """Generate clinical insights from aggregated findings

        Emits, in order: a document-structure insight (only when the page
        count is positive), a classification insight, one insight per
        domain, and insights for images / tables when present.

        Raises:
            KeyError: If ``classification`` lacks ``"document_type"`` or
                ``"confidence"``.
        """
        insights = []

        # Document structure insight
        page_count = pdf_content.get("page_count", 0) or 0
        if page_count > 0:
            depth = "comprehensive" if page_count > 5 else "standard"
            insights.append({
                "category": "Document Structure",
                "insight": f"Document contains {page_count} pages with {depth} documentation",
                "importance": "medium"
            })

        # Classification insight
        doc_type = classification["document_type"]
        confidence = classification["confidence"]
        insights.append({
            "category": "Document Classification",
            "insight": f"Document identified as {doc_type.replace('_', ' ').title()} with {confidence*100:.0f}% confidence",
            "importance": "high"
        })

        # Domain-specific insights
        for domain, data in aggregated_findings.items():
            avg_confidence = data.get("average_confidence", 0.0)
            model_count = len(data.get("models", []))
            is_high = avg_confidence > self._HIGH_CONFIDENCE_THRESHOLD

            insights.append({
                "category": domain.replace("_", " ").title(),
                "insight": f"Analysis completed by {model_count} specialized model(s) with {avg_confidence*100:.0f}% average confidence",
                "importance": "high" if is_high else "medium"
            })

        # Data richness insight
        images = pdf_content.get("images") or []
        tables = pdf_content.get("tables") or []

        if images:
            insights.append({
                "category": "Multimodal Content",
                "insight": f"Document contains {len(images)} image(s) for enhanced analysis",
                "importance": "medium"
            })

        if tables:
            insights.append({
                "category": "Structured Data",
                "insight": f"Document contains {len(tables)} table(s) with structured information",
                "importance": "medium"
            })

        return insights

    def _calculate_overall_confidence(self, results: List[Dict[str, Any]]) -> float:
        """Calculate weighted overall confidence score

        Results whose ``"priority"`` is ``"primary"`` weigh 1.5; all
        others weigh 1.0. Returns 0.0 for an empty list.
        """
        if not results:
            return 0.0

        weighted_sum = 0.0
        total_weight = 0.0

        for result in results:
            confidence = self._coerce_confidence(
                self._result_payload(result).get("confidence", 0.0)
            )
            is_primary = result.get("priority", "secondary") == "primary"
            weight = self._PRIMARY_WEIGHT if is_primary else self._DEFAULT_WEIGHT

            weighted_sum += confidence * weight
            total_weight += weight

        return weighted_sum / total_weight if total_weight > 0 else 0.0

    # ------------------------------------------------------------------
    # Narrative output
    # ------------------------------------------------------------------

    def _generate_summary(
        self,
        classification: Dict[str, Any],
        aggregated_findings: Dict[str, Any],
        insights: List[Dict[str, str]]
    ) -> str:
        """Generate executive summary of analysis

        The text consists of a title line, one paragraph per domain, an
        optional count of high-importance insights, and a closing
        disclaimer.

        Raises:
            KeyError: If ``classification`` lacks ``"document_type"``.
        """
        doc_type = classification["document_type"].replace("_", " ").title()

        summary_parts = [
            f"Medical Document Analysis: {doc_type}",
            f"\nThis document has been processed through our comprehensive AI analysis pipeline using {len(aggregated_findings)} specialized medical AI domain(s).",
        ]

        # Add domain summaries
        for domain, data in aggregated_findings.items():
            domain_name = domain.replace("_", " ").title()
            model_count = len(data.get("models", []))
            avg_conf = data.get("average_confidence", 0.0)

            if avg_conf > self._HIGH_CONFIDENCE_THRESHOLD:
                verdict = "High confidence analysis completed."
            else:
                verdict = "Analysis completed with moderate confidence."

            summary_parts.append(
                f"\n\n{domain_name}: Analyzed by {model_count} model(s) with {avg_conf*100:.0f}% confidence. "
                f"{verdict}"
            )

        # Add insights summary
        high_importance = [i for i in insights if i.get("importance") == "high"]
        if high_importance:
            summary_parts.append(
                f"\n\nKey Findings: {len(high_importance)} high-priority insights identified for clinical review."
            )

        summary_parts.append(
            "\n\nThis analysis provides AI-assisted insights and should be reviewed by qualified healthcare professionals for clinical decision-making."
        )

        return "".join(summary_parts)

    def _generate_recommendations(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """Generate recommendations based on analysis

        Order of the returned list: an optional document-type specific
        "Clinical Review" entry, two general entries, and an optional
        "Analysis Quality" entry naming low-confidence domains.

        Raises:
            KeyError: If ``classification`` lacks ``"document_type"``.
        """
        recommendations = []

        # Classification-based recommendations
        doc_type = classification["document_type"]
        review = self._REVIEW_RECOMMENDATIONS.get(doc_type)
        if review is not None:
            text, priority = review
            recommendations.append({
                "category": "Clinical Review",
                "recommendation": text,
                "priority": priority
            })

        # General recommendations
        recommendations.append({
            "category": "Data Quality",
            "recommendation": "All AI-generated insights should be validated by qualified healthcare professionals",
            "priority": "high"
        })

        recommendations.append({
            "category": "Documentation",
            "recommendation": "Maintain this analysis report with patient medical records",
            "priority": "medium"
        })

        # Confidence-based recommendations
        low_confidence_domains = [
            domain for domain, data in aggregated_findings.items()
            if data.get("average_confidence", 0.0) < self._LOW_CONFIDENCE_THRESHOLD
        ]

        if low_confidence_domains:
            recommendations.append({
                "category": "Analysis Quality",
                "recommendation": f"Lower confidence detected in {', '.join(low_confidence_domains)}. Consider manual review.",
                "priority": "medium"
            })

        return recommendations

    def _generate_fallback_analysis(
        self,
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any],
        models_failed: int = 0
    ) -> Dict[str, Any]:
        """Generate fallback analysis when no models succeeded

        This is the last-resort path, so it never indexes its inputs
        directly: missing keys (or non-dict inputs) fall back to
        ``"unknown"`` / ``0.0`` / ``0`` instead of raising.

        Args:
            classification: Classification result, possibly incomplete.
            pdf_content: Extracted document content, possibly incomplete.
            models_failed: Number of model runs that did not complete;
                reported in ``quality_metrics``. Defaults to 0.
        """
        if not isinstance(classification, dict):
            classification = {}
        if not isinstance(pdf_content, dict):
            pdf_content = {}

        return {
            "document_type": classification.get("document_type", "unknown"),
            "classification_confidence": classification.get("confidence", 0.0),
            "overall_confidence": 0.0,
            "summary": "Analysis could not be completed. Document was classified but specialized model processing failed.",
            "aggregated_findings": {},
            "clinical_insights": [],
            "recommendations": [{
                "category": "Manual Review",
                "recommendation": "Manual review required - automated analysis unavailable",
                "priority": "high"
            }],
            "models_used": [],
            "quality_metrics": {
                "models_executed": 0,
                "models_failed": models_failed,
                "overall_confidence": 0.0
            },
            "metadata": {
                "synthesis_timestamp": self._utc_timestamp(),
                "page_count": pdf_content.get("page_count", 0),
                "has_images": bool(pdf_content.get("images")),
                "has_tables": bool(pdf_content.get("tables")),
                "fallback": True
            }
        }

    # ------------------------------------------------------------------
    # Fusion strategy placeholders
    # ------------------------------------------------------------------

    def _early_fusion(self, results: List[Dict]) -> Optional[Dict]:
        """Early fusion strategy - combine features before analysis

        Placeholder: not implemented yet, always returns ``None``.
        """
        return None

    def _late_fusion(self, results: List[Dict]) -> Optional[Dict]:
        """Late fusion strategy - combine predictions after analysis

        Placeholder: not implemented yet, always returns ``None``.
        """
        return None

    def _weighted_fusion(self, results: List[Dict]) -> Optional[Dict]:
        """Weighted fusion strategy - weight by model confidence

        Placeholder: not implemented yet, always returns ``None``.
        """
        return None