""" Medical Report Analysis Platform - Main Backend Application Comprehensive AI-powered medical document analysis with multi-model processing With HIPAA/GDPR Security & Compliance Features """ from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Request, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from pathlib import Path from typing import List, Dict, Optional, Any, Literal import os import tempfile import logging from datetime import datetime import uuid # Import processing modules from pdf_processor import PDFProcessor from document_classifier import DocumentClassifier from model_router import ModelRouter from analysis_synthesizer import AnalysisSynthesizer from security import get_security_manager, ComplianceValidator, DataEncryption from clinical_synthesis_service import get_synthesis_service # Import monitoring and infrastructure modules from monitoring_service import get_monitoring_service from model_versioning import get_versioning_system from production_logging import get_medical_logger, EventCategory from compliance_reporting import get_compliance_system from admin_endpoints import admin_router # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Initialize FastAPI app app = FastAPI( title="Medical Report Analysis Platform", description="HIPAA/GDPR Compliant AI-powered medical document analysis", version="2.0.0" ) # CORS configuration app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure appropriately for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Add monitoring middleware @app.middleware("http") async def monitoring_middleware(request: Request, call_next): """ Monitoring middleware for request tracking and performance measurement Tracks: - Request latency - Error rates - Cache performance - Model performance """ start_time = datetime.utcnow() request_id = str(uuid.uuid4()) # Log request start medical_logger.info("Request received", category=EventCategory.SYSTEM_EVENT, details={ "request_id": request_id, "method": request.method, "path": request.url.path, "client": request.client.host if request.client else "unknown" }) try: # Process request response = await call_next(request) # Calculate latency end_time = datetime.utcnow() latency_ms = (end_time - start_time).total_seconds() * 1000 # Track metrics monitoring_service.track_request( endpoint=request.url.path, latency_ms=latency_ms, status_code=response.status_code ) # Log request completion medical_logger.info("Request completed", category=EventCategory.SYSTEM_EVENT, details={ "request_id": request_id, "method": request.method, "path": request.url.path, "status_code": response.status_code, "latency_ms": round(latency_ms, 2) }) return response except Exception as e: # Calculate latency for failed request end_time = datetime.utcnow() latency_ms = (end_time - start_time).total_seconds() * 1000 # Track error monitoring_service.track_error( endpoint=request.url.path, error_type=type(e).__name__, error_message=str(e) ) # Log error medical_logger.error("Request failed", category=EventCategory.ERROR_EVENT, details={"request_id": request_id, "method": request.method, "path": request.url.path, "error": str(e), "error_type": type(e).__name__, "latency_ms": round(latency_ms, 2)}) # Re-raise the exception raise # Mount static files (frontend) static_dir = Path(__file__).parent / "static" if static_dir.exists(): app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets") logger.info("Static files mounted successfully") # Initialize processing components pdf_processor = PDFProcessor() document_classifier = DocumentClassifier() model_router = ModelRouter() analysis_synthesizer = AnalysisSynthesizer() synthesis_service = get_synthesis_service() # Initialize security components security_manager = get_security_manager() compliance_validator = ComplianceValidator() data_encryption = DataEncryption() logger.info("Security and compliance features initialized") # Initialize monitoring and infrastructure services monitoring_service = get_monitoring_service() versioning_system = get_versioning_system() medical_logger = get_medical_logger("medical_ai_platform") compliance_system = get_compliance_system() logger.info("Monitoring and infrastructure services initialized") # Include admin router app.include_router(admin_router) # ================================ # STARTUP & MONITORING INITIALIZATION # ================================ @app.on_event("startup") async def startup_event(): """ Initialize all monitoring services and log system configuration on startup Ensures all infrastructure components are ready before accepting requests """ medical_logger.info("Starting Medical AI Platform initialization", category=EventCategory.SYSTEM_EVENT, details={ "version": "2.0.0", "timestamp": datetime.utcnow().isoformat() }) # Initialize monitoring service monitoring_service.start_monitoring() medical_logger.info("Monitoring service initialized", category=EventCategory.SYSTEM_EVENT, details={ "cache_enabled": True, "alert_threshold": 0.05 }) # Initialize versioning system - default models already loaded in constructor # Additional models can be registered here if needed model_versions = [ {"model_id": "bio_clinical_bert", "version": "1.0.0", "source": "HuggingFace"}, {"model_id": "biogpt", "version": "1.0.0", "source": "HuggingFace"}, {"model_id": "pubmed_bert", "version": "1.0.0", "source": "HuggingFace"}, {"model_id": "hubert_ecg", "version": "1.0.0", "source": "HuggingFace"}, {"model_id": "monai_unetr", "version": "1.0.0", "source": "HuggingFace"}, {"model_id": "medgemma_2b", "version": "1.0.0", "source": "HuggingFace"} ] # Register additional models using the correct method for model_config in model_versions: versioning_system.model_registry.register_model( model_id=model_config["model_id"], version=model_config["version"], model_name=model_config["model_id"].replace("_", " ").title(), model_path=f"huggingface/{model_config['model_id']}", metadata={"source": model_config["source"]} ) medical_logger.info("Model versioning initialized", category=EventCategory.SYSTEM_EVENT, details={ "total_models": len(model_versions) }) # Initialize compliance reporting medical_logger.info("Compliance reporting system initialized", category=EventCategory.COMPLIANCE_EVENT, details={ "standards": ["HIPAA", "GDPR"], "audit_enabled": True }) # Log system configuration system_config = { "environment": os.getenv("ENVIRONMENT", "production"), "gpu_available": os.getenv("CUDA_VISIBLE_DEVICES") is not None, "hf_token_configured": os.getenv("HF_TOKEN") is not None, "monitoring_enabled": True, "compliance_enabled": True, "versioning_enabled": True, "security_features": [ "PHI_removal", "audit_logging", "encryption_at_rest", "access_control" ] } medical_logger.info("System configuration loaded", category=EventCategory.SYSTEM_EVENT, details=system_config) # Test critical components try: health_status = monitoring_service.get_system_health() medical_logger.info("Health check successful", category=EventCategory.SYSTEM_EVENT, details={ "status": health_status["status"], "components_ready": True }) except Exception as e: medical_logger.error("Health check failed during startup", category=EventCategory.ERROR_EVENT, details={ "error": str(e) }) medical_logger.info("Medical AI Platform startup complete", category=EventCategory.SYSTEM_EVENT, details={ "status": "ready", "timestamp": datetime.utcnow().isoformat() }) # Check HF_TOKEN availability (optional for most models) HF_TOKEN = os.getenv("HF_TOKEN", None) if HF_TOKEN: logger.info("HF_TOKEN found - gated models available") else: logger.info("HF_TOKEN not configured - using public models (Bio_ClinicalBERT, BioGPT, etc.)") logger.info("This is normal - most HuggingFace models are public and don't require authentication") # Request/Response Models class AnalysisStatus(BaseModel): job_id: str status: str progress: float message: str class AnalysisResult(BaseModel): job_id: str document_type: str confidence: float analysis: Dict[str, Any] specialized_results: List[Dict[str, Any]] summary: str timestamp: str class HealthCheck(BaseModel): status: str version: str timestamp: str # In-memory job tracking (use Redis/database in production) job_tracker: Dict[str, Dict[str, Any]] = {} @app.get("/api", response_model=HealthCheck) async def api_root(): """API health check endpoint""" return HealthCheck( status="healthy", version="1.0.0", timestamp=datetime.utcnow().isoformat() ) @app.get("/") async def root(): """Serve frontend""" static_dir = Path(__file__).parent / "static" index_file = static_dir / "index.html" if index_file.exists(): return FileResponse(index_file) else: return {"message": "Medical Report Analysis Platform API", "version": "1.0.0"} @app.get("/health") async def health_check(): """Detailed health check with component status and monitoring""" system_health = monitoring_service.get_system_health() return { "status": system_health["status"], "components": { "pdf_processor": "ready", "classifier": "ready", "model_router": "ready", "synthesizer": "ready", "security": "ready", "compliance": "active", "monitoring": "active", "versioning": "active" }, "monitoring": { "uptime_seconds": system_health["uptime_seconds"], "error_rate": system_health["error_rate"], "active_alerts": system_health["active_alerts"], "critical_alerts": system_health["critical_alerts"] }, "timestamp": datetime.utcnow().isoformat() } @app.get("/health/dashboard") async def get_health_dashboard(): """ Comprehensive health dashboard with real-time monitoring metrics Returns: - System status and uptime - Pipeline health metrics - Model performance statistics - Error rates and alerts - Cache performance - Recent alerts and warnings - Compliance status Used by admin UI for real-time monitoring and system oversight """ try: # Get system health system_health = monitoring_service.get_system_health() # Get cache statistics cache_stats = monitoring_service.get_cache_statistics() # Get recent alerts recent_alerts = monitoring_service.get_recent_alerts(limit=10) # Get model performance metrics model_metrics = {} try: active_models = versioning_system.list_model_versions() for model_info in active_models[:10]: # Top 10 models model_id = model_info.get("model_id") if model_id: perf = versioning_system.get_model_performance(model_id) if perf: model_metrics[model_id] = { "version": model_info.get("version", "unknown"), "total_inferences": perf.get("total_inferences", 0), "avg_latency_ms": perf.get("avg_latency_ms", 0), "error_rate": perf.get("error_rate", 0.0), "last_used": perf.get("last_used", "never") } except Exception as e: medical_logger.warning("Failed to get model metrics", category=EventCategory.PERFORMANCE_EVENT, details={"error": str(e)}) # Get pipeline statistics pipeline_stats = { "total_jobs_processed": len(job_tracker), "completed_jobs": sum(1 for job in job_tracker.values() if job.get("status") == "completed"), "failed_jobs": sum(1 for job in job_tracker.values() if job.get("status") == "failed"), "processing_jobs": sum(1 for job in job_tracker.values() if job.get("status") == "processing"), "success_rate": 0.0 } if pipeline_stats["total_jobs_processed"] > 0: pipeline_stats["success_rate"] = ( pipeline_stats["completed_jobs"] / pipeline_stats["total_jobs_processed"] ) # Get synthesis statistics synthesis_stats = {} try: synthesis_stats = synthesis_service.get_synthesis_statistics() except Exception as e: medical_logger.warning("Failed to get synthesis stats", category=EventCategory.PERFORMANCE_EVENT, details={"error": str(e)}) # Compliance overview compliance_overview = { "hipaa_compliant": True, "gdpr_compliant": True, "audit_logging_active": True, "phi_removal_active": True, "encryption_enabled": True } # Construct comprehensive dashboard dashboard = { "status": "operational" if system_health["status"] == "healthy" else "degraded", "timestamp": datetime.utcnow().isoformat(), "system": { "uptime_seconds": system_health["uptime_seconds"], "uptime_human": f"{system_health['uptime_seconds'] // 3600}h {(system_health['uptime_seconds'] % 3600) // 60}m", "error_rate": system_health["error_rate"], "total_requests": system_health["total_requests"], "error_threshold": 0.05, "status": system_health["status"] }, "pipeline": pipeline_stats, "models": { "total_registered": len(model_metrics), "performance": model_metrics }, "synthesis": { "total_syntheses": synthesis_stats.get("total_syntheses", 0), "avg_confidence": synthesis_stats.get("avg_confidence", 0.0), "requiring_review": synthesis_stats.get("requiring_review", 0), "avg_processing_time_ms": synthesis_stats.get("avg_processing_time_ms", 0) }, "cache": { "total_entries": cache_stats.get("total_entries", 0), "hit_rate": cache_stats.get("hit_rate", 0.0), "hits": cache_stats.get("hits", 0), "misses": cache_stats.get("misses", 0), "memory_usage_mb": cache_stats.get("memory_usage_mb", 0), "avg_retrieval_time_ms": cache_stats.get("avg_retrieval_time_ms", 0) }, "alerts": { "active_count": system_health["active_alerts"], "critical_count": system_health["critical_alerts"], "recent": recent_alerts }, "compliance": compliance_overview, "components": { "pdf_processor": "operational", "document_classifier": "operational", "model_router": "operational", "synthesis_engine": "operational", "security_layer": "operational", "monitoring_system": "operational", "versioning_system": "operational", "compliance_reporting": "operational" } } return dashboard except Exception as e: medical_logger.error("Dashboard generation failed", category=EventCategory.ERROR_EVENT, details={"error": str(e), "timestamp": datetime.utcnow().isoformat()}) # Return minimal dashboard on error return { "status": "error", "timestamp": datetime.utcnow().isoformat(), "error": "Failed to generate complete dashboard", "message": str(e) } @app.get("/ai-models-health") async def ai_models_health_check(): """Check AI model loading status and performance""" try: # Test model loader from model_loader import get_model_loader model_loader = get_model_loader() # Test model loading test_result = await model_loader.test_model_loading() return { "status": "healthy" if test_result.get("models_loaded", 0) > 0 else "degraded", "ai_models": { "total_configured": test_result.get("total_models", 0), "successfully_loaded": test_result.get("models_loaded", 0), "failed_to_load": test_result.get("models_failed", 0), "loading_errors": test_result.get("errors", []), "device": test_result.get("device", "unknown"), "pytorch_version": test_result.get("pytorch_version", "unknown") }, "timestamp": datetime.utcnow().isoformat() } except Exception as e: return { "status": "error", "ai_models": { "error": str(e), "models_loaded": 0, "device": "unknown" }, "timestamp": datetime.utcnow().isoformat() } @app.get("/compliance-status") async def get_compliance_status(): """Get HIPAA/GDPR compliance status""" return compliance_validator.check_compliance() @app.post("/auth/login") async def login(email: str, password: str): """ User authentication endpoint In production, validate credentials against secure database """ # Demo authentication - in production, validate against database logger.warning("Demo authentication - implement secure auth in production") # For demo, accept any credentials user_id = str(uuid.uuid4()) token = security_manager.create_access_token(user_id, email) return { "access_token": token, "token_type": "bearer", "user_id": user_id, "email": email } @app.post("/analyze", response_model=AnalysisStatus) async def analyze_document( request: Request, file: UploadFile = File(...), background_tasks: BackgroundTasks = BackgroundTasks(), current_user: Dict[str, Any] = Depends(security_manager.get_current_user) ): """ Upload and analyze a medical document with audit logging This endpoint initiates the two-layer processing: - Layer 1: PDF extraction and classification - Layer 2: Specialized model analysis Security: Logs all PHI access for HIPAA compliance """ # Generate unique job ID job_id = str(uuid.uuid4()) # Audit log: Document upload client_ip = request.client.host if request.client else "unknown" security_manager.audit_logger.log_phi_access( user_id=current_user.get("user_id", "unknown"), document_id=job_id, action="UPLOAD", ip_address=client_ip ) # Validate file type if not file.filename.lower().endswith('.pdf'): raise HTTPException( status_code=400, detail="Only PDF files are supported" ) # Initialize job tracking job_tracker[job_id] = { "status": "processing", "progress": 0.0, "filename": file.filename, "user_id": current_user.get("user_id"), "created_at": datetime.utcnow().isoformat() } try: # Save uploaded file temporarily with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: content = await file.read() tmp_file.write(content) tmp_file_path = tmp_file.name # Schedule background processing background_tasks.add_task( process_document_pipeline, job_id, tmp_file_path, file.filename, current_user.get("user_id") ) logger.info(f"Analysis job {job_id} created for file: {file.filename}") return AnalysisStatus( job_id=job_id, status="processing", progress=0.0, message="Document uploaded successfully. Analysis in progress." ) except Exception as e: logger.error(f"Error creating analysis job: {str(e)}") job_tracker[job_id]["status"] = "failed" job_tracker[job_id]["error"] = str(e) # Audit log: Failed upload security_manager.audit_logger.log_access( user_id=current_user.get("user_id", "unknown"), action="UPLOAD_FAILED", resource=f"document:{job_id}", ip_address=client_ip, status="FAILED", details={"error": str(e)} ) raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") @app.get("/status/{job_id}", response_model=AnalysisStatus) async def get_analysis_status(job_id: str): """Get the current status of an analysis job""" if job_id not in job_tracker: raise HTTPException(status_code=404, detail="Job not found") job_data = job_tracker[job_id] return AnalysisStatus( job_id=job_id, status=job_data["status"], progress=job_data.get("progress", 0.0), message=job_data.get("message", "Processing...") ) @app.get("/results/{job_id}", response_model=AnalysisResult) async def get_analysis_results(job_id: str): """Retrieve the analysis results for a completed job""" if job_id not in job_tracker: raise HTTPException(status_code=404, detail="Job not found") job_data = job_tracker[job_id] if job_data["status"] != "completed": raise HTTPException( status_code=400, detail=f"Analysis not completed. Current status: {job_data['status']}" ) return AnalysisResult(**job_data["result"]) @app.get("/supported-models") async def get_supported_models(): """Get list of supported medical AI models by domain""" return { "domains": { "clinical_notes": { "models": ["MedGemma 27B", "Bio_ClinicalBERT"], "tasks": ["summarization", "entity_extraction", "coding"] }, "radiology": { "models": ["MedGemma 4B Multimodal", "MONAI"], "tasks": ["vqa", "report_generation", "segmentation"] }, "pathology": { "models": ["Path Foundation", "UNI2-h"], "tasks": ["slide_classification", "embedding_generation"] }, "cardiology": { "models": ["HuBERT-ECG"], "tasks": ["ecg_analysis", "event_prediction"] }, "laboratory": { "models": ["DrLlama", "Lab-AI"], "tasks": ["normalization", "explanation"] }, "drug_interactions": { "models": ["CatBoost DDI", "DrugGen"], "tasks": ["interaction_classification"] }, "diagnosis": { "models": ["MedGemma 27B"], "tasks": ["differential_diagnosis", "triage"] }, "coding": { "models": ["Rayyan Med Coding", "ICD-10 Predictors"], "tasks": ["icd10_extraction", "cpt_coding"] }, "mental_health": { "models": ["MentalBERT"], "tasks": ["screening", "sentiment_analysis"] } } } async def process_document_pipeline(job_id: str, file_path: str, filename: str, user_id: str = "unknown"): """ Background task for processing medical documents through the full pipeline Pipeline stages: 1. PDF Extraction (text, images, tables) 2. Document Classification 3. Intelligent Routing 4. Specialized Model Analysis 5. Result Synthesis Security: All stages logged for HIPAA compliance """ try: # Stage 1: PDF Processing job_tracker[job_id]["progress"] = 0.1 job_tracker[job_id]["message"] = "Extracting content from PDF..." logger.info(f"Job {job_id}: Starting PDF extraction") pdf_content = await pdf_processor.extract_content(file_path) # Stage 2: Document Classification job_tracker[job_id]["progress"] = 0.3 job_tracker[job_id]["message"] = "Classifying document type..." logger.info(f"Job {job_id}: Classifying document") classification = await document_classifier.classify(pdf_content) # Audit log: Classification complete security_manager.audit_logger.log_phi_access( user_id=user_id, document_id=job_id, action="CLASSIFY", ip_address="internal" ) # Stage 3: Model Routing job_tracker[job_id]["progress"] = 0.4 job_tracker[job_id]["message"] = "Routing to specialized models..." logger.info(f"Job {job_id}: Routing to models - {classification['document_type']}") model_tasks = model_router.route(classification, pdf_content) # Stage 4: Specialized Analysis job_tracker[job_id]["progress"] = 0.5 job_tracker[job_id]["message"] = "Running specialized analysis..." logger.info(f"Job {job_id}: Running {len(model_tasks)} specialized models") specialized_results = [] for i, task in enumerate(model_tasks): result = await model_router.execute_task(task) specialized_results.append(result) progress = 0.5 + (0.3 * (i + 1) / len(model_tasks)) job_tracker[job_id]["progress"] = progress # Stage 5: Result Synthesis job_tracker[job_id]["progress"] = 0.9 job_tracker[job_id]["message"] = "Synthesizing results..." logger.info(f"Job {job_id}: Synthesizing results") final_analysis = await analysis_synthesizer.synthesize( classification, specialized_results, pdf_content ) # Complete job_tracker[job_id]["progress"] = 1.0 job_tracker[job_id]["status"] = "completed" job_tracker[job_id]["message"] = "Analysis complete" job_tracker[job_id]["result"] = { "job_id": job_id, "document_type": classification["document_type"], "confidence": classification["confidence"], "analysis": final_analysis, "specialized_results": specialized_results, "summary": final_analysis.get("summary", ""), "timestamp": datetime.utcnow().isoformat() } logger.info(f"Job {job_id}: Analysis completed successfully") # Audit log: Analysis complete security_manager.audit_logger.log_phi_access( user_id=user_id, document_id=job_id, action="ANALYSIS_COMPLETE", ip_address="internal" ) # Secure cleanup of temporary file data_encryption.secure_delete(file_path) except Exception as e: logger.error(f"Job {job_id}: Analysis failed - {str(e)}") job_tracker[job_id]["status"] = "failed" job_tracker[job_id]["message"] = f"Analysis failed: {str(e)}" job_tracker[job_id]["error"] = str(e) # Audit log: Analysis failed security_manager.audit_logger.log_access( user_id=user_id, action="ANALYSIS_FAILED", resource=f"document:{job_id}", ip_address="internal", status="FAILED", details={"error": str(e)} ) # Cleanup on error if os.path.exists(file_path): data_encryption.secure_delete(file_path) # ================================ # CLINICAL SYNTHESIS ENDPOINTS # ================================ class SynthesisRequest(BaseModel): """Request model for clinical synthesis""" modality: str structured_data: Dict[str, Any] model_outputs: List[Dict[str, Any]] = [] summary_type: Literal["clinician", "patient"] = "clinician" class MultiModalSynthesisRequest(BaseModel): """Request model for multi-modal synthesis""" modalities_data: Dict[str, Dict[str, Any]] summary_type: Literal["clinician", "patient"] = "clinician" @app.post("/synthesize") async def synthesize_clinical_summary( request: SynthesisRequest, current_user: Dict[str, Any] = Depends(security_manager.get_current_user) ): """ Generate clinical summary from structured medical data Supports: - Clinician-level technical summaries - Patient-friendly explanations - Confidence-based recommendations - All medical modalities (ECG, radiology, laboratory, clinical notes) Security: Requires authentication, logs all synthesis requests """ try: user_id = current_user.get("user_id", "unknown") logger.info(f"Synthesis request from user {user_id}: {request.modality} ({request.summary_type})") # Audit log security_manager.audit_logger.log_access( user_id=user_id, action="SYNTHESIS_REQUEST", resource=f"synthesis:{request.modality}", ip_address="internal", status="INITIATED", details={"summary_type": request.summary_type} ) # Perform synthesis result = await synthesis_service.synthesize_clinical_summary( modality=request.modality, structured_data=request.structured_data, model_outputs=request.model_outputs, summary_type=request.summary_type, user_id=user_id ) # Audit log: Success security_manager.audit_logger.log_access( user_id=user_id, action="SYNTHESIS_COMPLETE", resource=f"synthesis:{result.get('synthesis_id')}", ip_address="internal", status="SUCCESS", details={ "confidence": result.get("confidence_scores", {}).get("overall_confidence", 0.0), "requires_review": result.get("requires_review", False) } ) return result except Exception as e: logger.error(f"Synthesis failed: {str(e)}") # Audit log: Failure security_manager.audit_logger.log_access( user_id=current_user.get("user_id", "unknown"), action="SYNTHESIS_FAILED", resource=f"synthesis:{request.modality}", ip_address="internal", status="FAILED", details={"error": str(e)} ) raise HTTPException(status_code=500, detail=f"Synthesis failed: {str(e)}") @app.post("/synthesize/multi-modal") async def synthesize_multi_modal( request: MultiModalSynthesisRequest, current_user: Dict[str, Any] = Depends(security_manager.get_current_user) ): """ Generate integrated clinical summary from multiple medical modalities Combines ECG, radiology, laboratory, and clinical notes into unified assessment Security: Requires authentication, logs all synthesis requests """ try: user_id = current_user.get("user_id", "unknown") modalities = list(request.modalities_data.keys()) logger.info(f"Multi-modal synthesis request from user {user_id}: {modalities}") # Audit log security_manager.audit_logger.log_access( user_id=user_id, action="MULTI_MODAL_SYNTHESIS", resource=f"synthesis:multi-modal", ip_address="internal", status="INITIATED", details={"modalities": modalities, "summary_type": request.summary_type} ) # Perform multi-modal synthesis result = await synthesis_service.synthesize_multi_modal( modalities_data=request.modalities_data, summary_type=request.summary_type, user_id=user_id ) # Audit log: Success security_manager.audit_logger.log_access( user_id=user_id, action="MULTI_MODAL_SYNTHESIS_COMPLETE", resource=f"synthesis:multi-modal", ip_address="internal", status="SUCCESS", details={ "modalities": modalities, "overall_confidence": result.get("overall_confidence", 0.0) } ) return result except Exception as e: logger.error(f"Multi-modal synthesis failed: {str(e)}") # Audit log: Failure security_manager.audit_logger.log_access( user_id=current_user.get("user_id", "unknown"), action="MULTI_MODAL_SYNTHESIS_FAILED", resource=f"synthesis:multi-modal", ip_address="internal", status="FAILED", details={"error": str(e)} ) raise HTTPException(status_code=500, detail=f"Multi-modal synthesis failed: {str(e)}") @app.get("/synthesize/history") async def get_synthesis_history( limit: int = 100, current_user: Dict[str, Any] = Depends(security_manager.get_current_user) ): """ Get synthesis history for audit purposes Security: Returns only current user's synthesis history """ user_id = current_user.get("user_id", "unknown") history = synthesis_service.get_synthesis_history(user_id=user_id, limit=limit) return { "user_id": user_id, "total_syntheses": len(history), "history": history } @app.get("/synthesize/statistics") async def get_synthesis_statistics( current_user: Dict[str, Any] = Depends(security_manager.get_current_user) ): """ Get synthesis service usage statistics Provides insights into: - Total syntheses performed - Average confidence scores - Review requirements - Processing times """ stats = synthesis_service.get_synthesis_statistics() return { "statistics": stats, "timestamp": datetime.utcnow().isoformat() } # ================================ # END CLINICAL SYNTHESIS ENDPOINTS # ================================ # Catch-all route for React Router (single-page application) - MUST BE LAST @app.get("/{full_path:path}") async def serve_react_app(full_path: str): """Serve React app for any non-API routes""" static_dir = Path(__file__).parent / "static" index_file = static_dir / "index.html" # Check if this is an API route or static file if (full_path.startswith(('api', 'health', 'analyze', 'status', 'results', 'supported-models', 'compliance-status', 'assets'))): raise HTTPException(status_code=404, detail="API endpoint not found") # Serve React app for everything else (client-side routing) if index_file.exists(): return FileResponse(index_file) else: raise HTTPException(status_code=404, detail="React app not found") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)