# Author: snikhilesh
# Commit: Deploy backend with monitoring infrastructure - Complete Medical AI Platform
# Revision: 13d5ab4 (verified)
"""
Medical Report Analysis Platform - Main Backend Application
Comprehensive AI-powered medical document analysis with multi-model processing
With HIPAA/GDPR Security & Compliance Features
"""
import logging
import os
import tempfile
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Any, Literal

from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Request, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# Import processing modules
from pdf_processor import PDFProcessor
from document_classifier import DocumentClassifier
from model_router import ModelRouter
from analysis_synthesizer import AnalysisSynthesizer
from security import get_security_manager, ComplianceValidator, DataEncryption
from clinical_synthesis_service import get_synthesis_service
# Import monitoring and infrastructure modules
from monitoring_service import get_monitoring_service
from model_versioning import get_versioning_system
from production_logging import get_medical_logger
from compliance_reporting import get_compliance_system
from admin_endpoints import admin_router
# Configure logging
# Root logger emits INFO and above as "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used for plain (non-structured) log lines in this file.
logger = logging.getLogger(__name__)
# Initialize FastAPI app
# The title/description/version given here are the application's declared metadata.
app = FastAPI(
    title="Medical Report Analysis Platform",
    description="HIPAA/GDPR Compliant AI-powered medical document analysis",
    version="2.0.0"
)
# CORS configuration
# NOTE(review): every origin, method and header is allowed while credentials are
# also enabled. That is very permissive for an API that handles medical data;
# restrict `allow_origins` to the known frontend origin(s) before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Add monitoring middleware
@app.middleware("http")
async def monitoring_middleware(request: Request, call_next):
    """
    Monitoring middleware for request tracking and performance measurement.

    For every request this middleware:
    - assigns a fresh UUID used to correlate the log records of that request
    - logs the start and the outcome through ``medical_logger``
    - reports latency and status code (or the error) to ``monitoring_service``

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The response produced by ``call_next``, unmodified.

    Raises:
        Exception: Anything raised downstream is recorded and then re-raised
            unchanged.
    """
    # perf_counter() is monotonic, so the measured latency cannot be skewed
    # (or go negative) when the system wall clock is adjusted mid-request.
    started = time.perf_counter()
    request_id = str(uuid.uuid4())
    method = request.method
    path = request.url.path

    def elapsed_ms() -> float:
        """Milliseconds elapsed since this request entered the middleware."""
        return (time.perf_counter() - started) * 1000

    # Log request start
    medical_logger.log_info("Request received", {
        "request_id": request_id,
        "method": method,
        "path": path,
        # request.client can be None, hence the fallback.
        "client": request.client.host if request.client else "unknown"
    })

    try:
        # Process request
        response = await call_next(request)
        latency_ms = elapsed_ms()

        # Track metrics
        monitoring_service.track_request(
            endpoint=path,
            latency_ms=latency_ms,
            status_code=response.status_code
        )

        # Log request completion
        medical_logger.log_info("Request completed", {
            "request_id": request_id,
            "method": method,
            "path": path,
            "status_code": response.status_code,
            "latency_ms": round(latency_ms, 2)
        })
        return response
    except Exception as e:
        # Latency is still recorded for failed requests.
        latency_ms = elapsed_ms()

        # Track error
        monitoring_service.track_error(
            endpoint=path,
            error_type=type(e).__name__,
            error_message=str(e)
        )

        # Log error
        medical_logger.log_error("Request failed", {
            "request_id": request_id,
            "method": method,
            "path": path,
            "error": str(e),
            "error_type": type(e).__name__,
            "latency_ms": round(latency_ms, 2)
        })

        # Re-raise so the framework's normal error handling still runs.
        raise
# Mount static files (frontend)
static_dir = Path(__file__).parent / "static"
assets_dir = static_dir / "assets"
# Check the directory that is actually mounted: StaticFiles verifies its
# directory when constructed, so a `static/` folder without an `assets/`
# sub-folder would otherwise abort the application at import time.
if assets_dir.is_dir():
    app.mount("/assets", StaticFiles(directory=assets_dir), name="assets")
    logger.info("Static files mounted successfully")
# Initialize processing components
# These module-level singletons are created once at import time and are shared
# by the request handlers and the background pipeline defined in this file.
pdf_processor = PDFProcessor()
document_classifier = DocumentClassifier()
model_router = ModelRouter()
analysis_synthesizer = AnalysisSynthesizer()
synthesis_service = get_synthesis_service()
# Initialize security components
# security_manager supplies the `get_current_user` dependency, token creation
# and the audit logger used by the endpoints in this file.
security_manager = get_security_manager()
compliance_validator = ComplianceValidator()
data_encryption = DataEncryption()
logger.info("Security and compliance features initialized")
# Initialize monitoring and infrastructure services
# monitoring_service and medical_logger are the names the HTTP monitoring
# middleware reads at request time.
monitoring_service = get_monitoring_service()
versioning_system = get_versioning_system()
medical_logger = get_medical_logger("medical_ai_platform")
compliance_system = get_compliance_system()
logger.info("Monitoring and infrastructure services initialized")
# Include admin router
app.include_router(admin_router)
# ================================
# STARTUP & MONITORING INITIALIZATION
# ================================
@app.on_event("startup")
async def startup_event():
    """
    Run the platform's startup sequence.

    Starts the monitoring service, registers the known model versions with the
    versioning system, logs the effective system configuration and finishes
    with a best-effort health probe. A failing probe is logged but does not
    abort startup.
    """
    log_info = medical_logger.log_info

    log_info("Starting Medical AI Platform initialization", {
        "version": "2.0.0",
        "timestamp": datetime.utcnow().isoformat()
    })

    # Monitoring
    monitoring_service.start_monitoring()
    log_info("Monitoring service initialized", {
        "cache_enabled": True,
        "alert_threshold": 0.05  # 5% error rate
    })

    # Model versioning: every model is registered at the same version/source.
    registered_model_ids = (
        "bio_clinical_bert",
        "biogpt",
        "pubmed_bert",
        "hubert_ecg",
        "monai_unetr",
        "medgemma_2b",
    )
    for registered_id in registered_model_ids:
        versioning_system.register_model_version(
            model_id=registered_id,
            version="1.0.0",
            metadata={"source": "HuggingFace"}
        )
    log_info("Model versioning initialized", {
        "total_models": len(registered_model_ids)
    })

    # Compliance reporting (only announced here; nothing is started)
    log_info("Compliance reporting system initialized", {
        "standards": ["HIPAA", "GDPR"],
        "audit_enabled": True
    })

    # System configuration snapshot. The GPU and token flags only reflect
    # whether the corresponding environment variable is set.
    environment = os.getenv("ENVIRONMENT", "production")
    log_info("System configuration loaded", {
        "environment": environment,
        "gpu_available": "CUDA_VISIBLE_DEVICES" in os.environ,
        "hf_token_configured": "HF_TOKEN" in os.environ,
        "monitoring_enabled": True,
        "compliance_enabled": True,
        "versioning_enabled": True,
        "security_features": [
            "PHI_removal",
            "audit_logging",
            "encryption_at_rest",
            "access_control"
        ]
    })

    # Probe critical components; failures are reported, not raised.
    try:
        probe = monitoring_service.get_system_health()
        log_info("Health check successful", {
            "status": probe["status"],
            "components_ready": True
        })
    except Exception as probe_error:
        medical_logger.log_error("Health check failed during startup", {
            "error": str(probe_error)
        })

    log_info("Medical AI Platform startup complete", {
        "status": "ready",
        "timestamp": datetime.utcnow().isoformat()
    })
# Check HF_TOKEN availability (optional for most models)
# The token is only read from the environment and its presence logged; the
# value itself is never written to the log.
HF_TOKEN = os.getenv("HF_TOKEN", None)
if HF_TOKEN:
    logger.info("HF_TOKEN found - gated models available")
else:
    # A missing token is treated as a normal configuration, not an error.
    logger.info("HF_TOKEN not configured - using public models (Bio_ClinicalBERT, BioGPT, etc.)")
    logger.info("This is normal - most HuggingFace models are public and don't require authentication")
# Request/Response Models
# Progress report for an analysis job, returned by the upload and status endpoints.
class AnalysisStatus(BaseModel):
    # Identifier generated when the document was uploaded.
    job_id: str
    # Job state; this file sets "processing", "completed" or "failed".
    status: str
    # Fraction of the pipeline finished; this file writes values from 0.0 to 1.0.
    progress: float
    # Human-readable description of the current stage or outcome.
    message: str
# Final output of a completed analysis job, as stored under the job's "result" key.
class AnalysisResult(BaseModel):
    # Identifier of the job that produced this result.
    job_id: str
    # Document type taken from the classifier output.
    document_type: str
    # Classification confidence taken from the classifier output.
    confidence: float
    # Synthesized analysis returned by the analysis synthesizer.
    analysis: Dict[str, Any]
    # One entry per specialized model task that was executed.
    specialized_results: List[Dict[str, Any]]
    # The synthesizer's "summary" field, or an empty string when it is absent.
    summary: str
    # ISO-8601 timestamp recorded when the result was assembled.
    timestamp: str
# Minimal liveness payload returned by the `/api` endpoint.
class HealthCheck(BaseModel):
    # Health indicator string.
    status: str
    # Version string reported by the API.
    version: str
    # ISO-8601 timestamp of when the response was generated.
    timestamp: str
# In-memory job tracking (use Redis/database in production)
# Maps job_id -> mutable job record (status, progress, message, result, ...).
# Being a plain module-level dict, its contents live only as long as this process.
job_tracker: Dict[str, Dict[str, Any]] = {}
@app.get("/api", response_model=HealthCheck)
async def api_root():
    """
    API health check endpoint.

    Returns:
        HealthCheck: a fixed "healthy" status, the application's declared
        version and the current UTC timestamp.
    """
    return HealthCheck(
        status="healthy",
        # Report the version the application was declared with instead of a
        # second hard-coded literal that can drift out of sync.
        version=app.version,
        timestamp=datetime.utcnow().isoformat()
    )
@app.get("/")
async def root():
    """
    Serve the frontend entry point.

    Returns:
        The bundled ``static/index.html`` when it exists; otherwise a small
        JSON document naming the API and its version.
    """
    index_file = Path(__file__).parent / "static" / "index.html"
    if index_file.exists():
        return FileResponse(index_file)
    # No frontend bundle present: identify the API instead. The version comes
    # from the application object so it cannot disagree with the declared one.
    return {"message": "Medical Report Analysis Platform API", "version": app.version}
@app.get("/health")
async def health_check():
    """
    Report overall health together with core monitoring figures.

    The component states are static labels; the status and the "monitoring"
    section are read from ``monitoring_service.get_system_health()``.
    """
    health = monitoring_service.get_system_health()

    component_states = {
        "pdf_processor": "ready",
        "classifier": "ready",
        "model_router": "ready",
        "synthesizer": "ready",
        "security": "ready",
        "compliance": "active",
        "monitoring": "active",
        "versioning": "active"
    }
    # Keys copied verbatim from the monitoring snapshot into the response.
    monitoring_keys = ("uptime_seconds", "error_rate", "active_alerts", "critical_alerts")

    return {
        "status": health["status"],
        "components": component_states,
        "monitoring": {key: health[key] for key in monitoring_keys},
        "timestamp": datetime.utcnow().isoformat()
    }
def _format_uptime(uptime_seconds) -> str:
    """Render an uptime in seconds as "<hours>h <minutes>m" using whole numbers."""
    # Truncate first so a float uptime does not render as e.g. "1.0h 2.0m".
    hours, remainder = divmod(int(uptime_seconds), 3600)
    return f"{hours}h {remainder // 60}m"


def _collect_model_metrics() -> Dict[str, Dict[str, Any]]:
    """Gather performance figures for up to ten registered model versions (best effort)."""
    metrics: Dict[str, Dict[str, Any]] = {}
    try:
        for model_info in versioning_system.list_model_versions()[:10]:  # Top 10 models
            model_id = model_info.get("model_id")
            if not model_id:
                continue
            perf = versioning_system.get_model_performance(model_id)
            if not perf:
                continue
            metrics[model_id] = {
                "version": model_info.get("version", "unknown"),
                "total_inferences": perf.get("total_inferences", 0),
                "avg_latency_ms": perf.get("avg_latency_ms", 0),
                "error_rate": perf.get("error_rate", 0.0),
                "last_used": perf.get("last_used", "never")
            }
    except Exception as e:
        # Whatever was collected before the failure is still returned.
        medical_logger.log_warning("Failed to get model metrics", {"error": str(e)})
    return metrics


def _summarize_pipeline_jobs() -> Dict[str, Any]:
    """Count tracked jobs per status in a single pass and derive the success rate."""
    counts = {"completed": 0, "failed": 0, "processing": 0}
    for job in job_tracker.values():
        status = job.get("status")
        if status in counts:
            counts[status] += 1
    total = len(job_tracker)
    return {
        "total_jobs_processed": total,
        "completed_jobs": counts["completed"],
        "failed_jobs": counts["failed"],
        "processing_jobs": counts["processing"],
        # Guard against division by zero when no job has been submitted yet.
        "success_rate": counts["completed"] / total if total > 0 else 0.0
    }


def _collect_synthesis_stats() -> Dict[str, Any]:
    """Fetch synthesis statistics, falling back to an empty dict on failure."""
    try:
        return synthesis_service.get_synthesis_statistics()
    except Exception as e:
        medical_logger.log_warning("Failed to get synthesis stats", {"error": str(e)})
        return {}


@app.get("/health/dashboard")
async def get_health_dashboard():
    """
    Comprehensive health dashboard with monitoring metrics.

    Returns:
        A dict with these sections:
        - "system": status, uptime, error rate and request totals
        - "pipeline": job counts by status and the success rate
        - "models": performance of registered model versions
        - "synthesis": synthesis service statistics
        - "cache": cache statistics
        - "alerts": alert counts and the ten most recent alerts
        - "compliance" / "components": static status labels

        If building the dashboard fails, a minimal dict with status "error"
        and the error message is returned instead of raising.
    """
    try:
        # Core monitoring snapshots
        system_health = monitoring_service.get_system_health()
        cache_stats = monitoring_service.get_cache_statistics()
        recent_alerts = monitoring_service.get_recent_alerts(limit=10)

        # Secondary sections; the model and synthesis helpers degrade gracefully.
        model_metrics = _collect_model_metrics()
        pipeline_stats = _summarize_pipeline_jobs()
        synthesis_stats = _collect_synthesis_stats()

        # Compliance overview (static flags, not measured here)
        compliance_overview = {
            "hipaa_compliant": True,
            "gdpr_compliant": True,
            "audit_logging_active": True,
            "phi_removal_active": True,
            "encryption_enabled": True
        }

        # Construct comprehensive dashboard
        return {
            "status": "operational" if system_health["status"] == "healthy" else "degraded",
            "timestamp": datetime.utcnow().isoformat(),
            "system": {
                "uptime_seconds": system_health["uptime_seconds"],
                "uptime_human": _format_uptime(system_health["uptime_seconds"]),
                "error_rate": system_health["error_rate"],
                "total_requests": system_health["total_requests"],
                "error_threshold": 0.05,
                "status": system_health["status"]
            },
            "pipeline": pipeline_stats,
            "models": {
                "total_registered": len(model_metrics),
                "performance": model_metrics
            },
            "synthesis": {
                "total_syntheses": synthesis_stats.get("total_syntheses", 0),
                "avg_confidence": synthesis_stats.get("avg_confidence", 0.0),
                "requiring_review": synthesis_stats.get("requiring_review", 0),
                "avg_processing_time_ms": synthesis_stats.get("avg_processing_time_ms", 0)
            },
            "cache": {
                "total_entries": cache_stats.get("total_entries", 0),
                "hit_rate": cache_stats.get("hit_rate", 0.0),
                "hits": cache_stats.get("hits", 0),
                "misses": cache_stats.get("misses", 0),
                "memory_usage_mb": cache_stats.get("memory_usage_mb", 0),
                "avg_retrieval_time_ms": cache_stats.get("avg_retrieval_time_ms", 0)
            },
            "alerts": {
                "active_count": system_health["active_alerts"],
                "critical_count": system_health["critical_alerts"],
                "recent": recent_alerts
            },
            "compliance": compliance_overview,
            "components": {
                "pdf_processor": "operational",
                "document_classifier": "operational",
                "model_router": "operational",
                "synthesis_engine": "operational",
                "security_layer": "operational",
                "monitoring_system": "operational",
                "versioning_system": "operational",
                "compliance_reporting": "operational"
            }
        }
    except Exception as e:
        medical_logger.log_error("Dashboard generation failed", {
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        })
        # Return minimal dashboard on error
        return {
            "status": "error",
            "timestamp": datetime.utcnow().isoformat(),
            "error": "Failed to generate complete dashboard",
            "message": str(e)
        }
@app.get("/ai-models-health")
async def ai_models_health_check():
    """
    Report whether the AI models can be loaded.

    Runs the model loader's self-test and summarises its report. Any failure,
    including a failure to import the loader, yields a payload with status
    "error" instead of an exception.
    """
    try:
        # Imported lazily, inside the try, so an import failure is reported too.
        from model_loader import get_model_loader

        report = await get_model_loader().test_model_loading()
        loaded_count = report.get("models_loaded", 0)
        overall = "healthy" if loaded_count > 0 else "degraded"
        return {
            "status": overall,
            "ai_models": {
                "total_configured": report.get("total_models", 0),
                "successfully_loaded": loaded_count,
                "failed_to_load": report.get("models_failed", 0),
                "loading_errors": report.get("errors", []),
                "device": report.get("device", "unknown"),
                "pytorch_version": report.get("pytorch_version", "unknown")
            },
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as load_error:
        failure_details = {
            "error": str(load_error),
            "models_loaded": 0,
            "device": "unknown"
        }
        return {
            "status": "error",
            "ai_models": failure_details,
            "timestamp": datetime.utcnow().isoformat()
        }
@app.get("/compliance-status")
async def get_compliance_status():
    """Return the compliance validator's current HIPAA/GDPR report as-is."""
    compliance_report = compliance_validator.check_compliance()
    return compliance_report
@app.post("/auth/login")
async def login(email: str, password: str):
    """
    Issue an access token for the given e-mail address (demo implementation).

    No credential check is performed: a new random user id is generated on
    every call and a token is created for it.

    NOTE(review): `password` is never inspected, and because both parameters
    are plain scalars they are presumably read from the query string, where
    they can end up in access logs. Replace with real credential validation
    and a request body before production.
    """
    # Demo authentication - in production, validate against database
    logger.warning("Demo authentication - implement secure auth in production")

    # For demo, accept any credentials
    new_user_id = str(uuid.uuid4())
    return {
        "access_token": security_manager.create_access_token(new_user_id, email),
        "token_type": "bearer",
        "user_id": new_user_id,
        "email": email
    }
@app.post("/analyze", response_model=AnalysisStatus)
async def analyze_document(
    request: Request,
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = BackgroundTasks(),
    current_user: Dict[str, Any] = Depends(security_manager.get_current_user)
):
    """
    Upload and analyze a medical document with audit logging.

    The upload is written to a temporary file and handed to
    ``process_document_pipeline`` as a background task, which performs:
    - Layer 1: PDF extraction and classification
    - Layer 2: Specialized model analysis

    Args:
        request: Incoming request; only used to obtain the client address.
        file: The uploaded document. Only names ending in ".pdf" are accepted.
        background_tasks: Task queue that runs the pipeline after the response.
        current_user: User record resolved by the security manager.

    Returns:
        AnalysisStatus: the new job id with status "processing".

    Raises:
        HTTPException: 400 when the file is not a PDF (or has no name),
            500 when the job could not be created.

    Security: the upload and any failure are written to the audit log.
    """
    # Generate unique job ID
    job_id = str(uuid.uuid4())

    # Audit log: Document upload
    client_ip = request.client.host if request.client else "unknown"
    security_manager.audit_logger.log_phi_access(
        user_id=current_user.get("user_id", "unknown"),
        document_id=job_id,
        action="UPLOAD",
        ip_address=client_ip
    )

    # Validate file type. The filename may be missing entirely; treat that
    # as "not a PDF" instead of failing with an AttributeError.
    filename = file.filename or ""
    if not filename.lower().endswith('.pdf'):
        raise HTTPException(
            status_code=400,
            detail="Only PDF files are supported"
        )

    # Initialize job tracking
    job_tracker[job_id] = {
        "status": "processing",
        "progress": 0.0,
        "filename": filename,
        "user_id": current_user.get("user_id"),
        "created_at": datetime.utcnow().isoformat()
    }

    # Tracked outside the try so the error path can remove a half-written file.
    tmp_file_path = None
    try:
        # Save uploaded file temporarily. delete=False keeps the file on disk
        # after the `with` block so the background task can read it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_file_path = tmp_file.name
            tmp_file.write(await file.read())

        # Schedule background processing
        background_tasks.add_task(
            process_document_pipeline,
            job_id,
            tmp_file_path,
            filename,
            current_user.get("user_id")
        )
        logger.info(f"Analysis job {job_id} created for file: {filename}")
        return AnalysisStatus(
            job_id=job_id,
            status="processing",
            progress=0.0,
            message="Document uploaded successfully. Analysis in progress."
        )
    except Exception as e:
        logger.error(f"Error creating analysis job: {str(e)}")
        job_tracker[job_id]["status"] = "failed"
        job_tracker[job_id]["error"] = str(e)

        # The pipeline will never run for this job, so nothing else would
        # delete the temporary copy of the uploaded document.
        if tmp_file_path and os.path.exists(tmp_file_path):
            try:
                data_encryption.secure_delete(tmp_file_path)
            except Exception:
                logger.exception(f"Failed to delete temporary file for job {job_id}")

        # Audit log: Failed upload
        security_manager.audit_logger.log_access(
            user_id=current_user.get("user_id", "unknown"),
            action="UPLOAD_FAILED",
            resource=f"document:{job_id}",
            ip_address=client_ip,
            status="FAILED",
            details={"error": str(e)}
        )
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") from e
@app.get("/status/{job_id}", response_model=AnalysisStatus)
async def get_analysis_status(job_id: str):
    """
    Look up the progress of an analysis job.

    Raises:
        HTTPException: 404 when no job with this id is tracked.

    NOTE(review): unlike the upload endpoint this route declares no
    current-user dependency, so any caller who knows a job id can read it.
    """
    try:
        record = job_tracker[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found")

    current_progress = record.get("progress", 0.0)
    current_message = record.get("message", "Processing...")
    return AnalysisStatus(
        job_id=job_id,
        status=record["status"],
        progress=current_progress,
        message=current_message
    )
@app.get("/results/{job_id}", response_model=AnalysisResult)
async def get_analysis_results(job_id: str):
    """
    Return the stored result of a finished analysis job.

    Raises:
        HTTPException: 404 when the job is unknown, 400 when it has not
            reached the "completed" state.

    NOTE(review): this route declares no current-user dependency and writes no
    audit record, although it returns the analysis of an uploaded document.
    """
    try:
        record = job_tracker[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found")

    if record["status"] == "completed":
        return AnalysisResult(**record["result"])

    raise HTTPException(
        status_code=400,
        detail=f"Analysis not completed. Current status: {record['status']}"
    )
@app.get("/supported-models")
async def get_supported_models():
    """Return the static catalogue of medical AI models and tasks, grouped by domain."""

    def entry(models, tasks):
        # Every domain is described by the same two keys.
        return {"models": models, "tasks": tasks}

    catalogue = {
        "clinical_notes": entry(
            ["MedGemma 27B", "Bio_ClinicalBERT"],
            ["summarization", "entity_extraction", "coding"],
        ),
        "radiology": entry(
            ["MedGemma 4B Multimodal", "MONAI"],
            ["vqa", "report_generation", "segmentation"],
        ),
        "pathology": entry(
            ["Path Foundation", "UNI2-h"],
            ["slide_classification", "embedding_generation"],
        ),
        "cardiology": entry(
            ["HuBERT-ECG"],
            ["ecg_analysis", "event_prediction"],
        ),
        "laboratory": entry(
            ["DrLlama", "Lab-AI"],
            ["normalization", "explanation"],
        ),
        "drug_interactions": entry(
            ["CatBoost DDI", "DrugGen"],
            ["interaction_classification"],
        ),
        "diagnosis": entry(
            ["MedGemma 27B"],
            ["differential_diagnosis", "triage"],
        ),
        "coding": entry(
            ["Rayyan Med Coding", "ICD-10 Predictors"],
            ["icd10_extraction", "cpt_coding"],
        ),
        "mental_health": entry(
            ["MentalBERT"],
            ["screening", "sentiment_analysis"],
        ),
    }
    return {"domains": catalogue}
async def process_document_pipeline(job_id: str, file_path: str, filename: str, user_id: str = "unknown"):
    """
    Background task for processing medical documents through the full pipeline.

    Pipeline stages:
    1. PDF Extraction
    2. Document Classification
    3. Intelligent Routing
    4. Specialized Model Analysis
    5. Result Synthesis

    Progress, status and the final result are written into
    ``job_tracker[job_id]``. Failures are recorded on the job and in the audit
    log; they are not re-raised.

    Args:
        job_id: Key of the job record in ``job_tracker``.
        file_path: Path of the temporary PDF; it is deleted when this task
            ends, whether it succeeded or failed.
        filename: Original upload name (currently not used by the pipeline).
        user_id: User recorded in the audit log entries.

    Security: classification, completion and failure are written to the audit log.
    """
    try:
        job = job_tracker[job_id]

        # Stage 1: PDF Processing
        job["progress"] = 0.1
        job["message"] = "Extracting content from PDF..."
        logger.info(f"Job {job_id}: Starting PDF extraction")
        pdf_content = await pdf_processor.extract_content(file_path)

        # Stage 2: Document Classification
        job["progress"] = 0.3
        job["message"] = "Classifying document type..."
        logger.info(f"Job {job_id}: Classifying document")
        classification = await document_classifier.classify(pdf_content)

        # Audit log: Classification complete
        security_manager.audit_logger.log_phi_access(
            user_id=user_id,
            document_id=job_id,
            action="CLASSIFY",
            ip_address="internal"
        )

        # Stage 3: Model Routing
        job["progress"] = 0.4
        job["message"] = "Routing to specialized models..."
        logger.info(f"Job {job_id}: Routing to models - {classification['document_type']}")
        model_tasks = model_router.route(classification, pdf_content)

        # Stage 4: Specialized Analysis
        job["progress"] = 0.5
        job["message"] = "Running specialized analysis..."
        logger.info(f"Job {job_id}: Running {len(model_tasks)} specialized models")
        specialized_results = []
        for completed, task in enumerate(model_tasks, start=1):
            specialized_results.append(await model_router.execute_task(task))
            # This stage spans the 0.5 -> 0.8 progress range.
            job["progress"] = 0.5 + (0.3 * completed / len(model_tasks))

        # Stage 5: Result Synthesis
        job["progress"] = 0.9
        job["message"] = "Synthesizing results..."
        logger.info(f"Job {job_id}: Synthesizing results")
        final_analysis = await analysis_synthesizer.synthesize(
            classification,
            specialized_results,
            pdf_content
        )

        # Complete
        job["progress"] = 1.0
        job["status"] = "completed"
        job["message"] = "Analysis complete"
        job["result"] = {
            "job_id": job_id,
            "document_type": classification["document_type"],
            "confidence": classification["confidence"],
            "analysis": final_analysis,
            "specialized_results": specialized_results,
            "summary": final_analysis.get("summary", ""),
            "timestamp": datetime.utcnow().isoformat()
        }
        logger.info(f"Job {job_id}: Analysis completed successfully")

        # Audit log: Analysis complete
        security_manager.audit_logger.log_phi_access(
            user_id=user_id,
            document_id=job_id,
            action="ANALYSIS_COMPLETE",
            ip_address="internal"
        )
    except Exception as e:
        logger.error(f"Job {job_id}: Analysis failed - {str(e)}")
        job_tracker[job_id]["status"] = "failed"
        job_tracker[job_id]["message"] = f"Analysis failed: {str(e)}"
        job_tracker[job_id]["error"] = str(e)

        # Audit log: Analysis failed
        security_manager.audit_logger.log_access(
            user_id=user_id,
            action="ANALYSIS_FAILED",
            resource=f"document:{job_id}",
            ip_address="internal",
            status="FAILED",
            details={"error": str(e)}
        )
    finally:
        # Secure cleanup of the temporary file. Doing it here guarantees the
        # upload is removed on every path, and a cleanup problem can no longer
        # turn an already completed job into a failed one.
        if os.path.exists(file_path):
            try:
                data_encryption.secure_delete(file_path)
            except Exception:
                logger.exception(f"Job {job_id}: Failed to delete temporary file")
# ================================
# CLINICAL SYNTHESIS ENDPOINTS
# ================================
class SynthesisRequest(BaseModel):
    """Request model for clinical synthesis"""
    # Name of the medical modality; forwarded unchanged to the synthesis service.
    modality: str
    # Structured findings for that modality (schema defined by the synthesis service).
    structured_data: Dict[str, Any]
    # Optional raw model outputs to take into account; defaults to none.
    model_outputs: List[Dict[str, Any]] = []
    # Target audience of the generated summary.
    summary_type: Literal["clinician", "patient"] = "clinician"
class MultiModalSynthesisRequest(BaseModel):
    """Request model for multi-modal synthesis"""
    # Maps each modality name to its data; the keys are reported in the audit log.
    modalities_data: Dict[str, Dict[str, Any]]
    # Target audience of the generated summary.
    summary_type: Literal["clinician", "patient"] = "clinician"
@app.post("/synthesize")
async def synthesize_clinical_summary(
    request: SynthesisRequest,
    current_user: Dict[str, Any] = Depends(security_manager.get_current_user)
):
    """
    Generate a clinical summary from structured medical data.

    The request is forwarded to the synthesis service for the requested
    modality and audience ("clinician" or "patient"). Initiation, success and
    failure are each written to the audit log.

    Raises:
        HTTPException: 500 when the synthesis (or its logging) fails.

    Security: the caller is resolved through the security manager's
    current-user dependency.
    """

    def audit(actor, action, resource, status, details):
        # All audit records of this endpoint share the "internal" address.
        security_manager.audit_logger.log_access(
            user_id=actor,
            action=action,
            resource=resource,
            ip_address="internal",
            status=status,
            details=details
        )

    try:
        user_id = current_user.get("user_id", "unknown")
        logger.info(f"Synthesis request from user {user_id}: {request.modality} ({request.summary_type})")

        audit(
            user_id,
            "SYNTHESIS_REQUEST",
            f"synthesis:{request.modality}",
            "INITIATED",
            {"summary_type": request.summary_type},
        )

        # Perform synthesis
        result = await synthesis_service.synthesize_clinical_summary(
            modality=request.modality,
            structured_data=request.structured_data,
            model_outputs=request.model_outputs,
            summary_type=request.summary_type,
            user_id=user_id
        )

        audit(
            user_id,
            "SYNTHESIS_COMPLETE",
            f"synthesis:{result.get('synthesis_id')}",
            "SUCCESS",
            {
                "confidence": result.get("confidence_scores", {}).get("overall_confidence", 0.0),
                "requires_review": result.get("requires_review", False)
            },
        )
        return result
    except Exception as failure:
        logger.error(f"Synthesis failed: {str(failure)}")
        audit(
            current_user.get("user_id", "unknown"),
            "SYNTHESIS_FAILED",
            f"synthesis:{request.modality}",
            "FAILED",
            {"error": str(failure)},
        )
        raise HTTPException(status_code=500, detail=f"Synthesis failed: {str(failure)}")
@app.post("/synthesize/multi-modal")
async def synthesize_multi_modal(
    request: MultiModalSynthesisRequest,
    current_user: Dict[str, Any] = Depends(security_manager.get_current_user)
):
    """
    Generate one integrated clinical summary from several modalities.

    All entries of ``modalities_data`` are passed to the synthesis service in
    a single call. Initiation, success and failure are each written to the
    audit log.

    Raises:
        HTTPException: 500 when the synthesis (or its logging) fails.

    Security: the caller is resolved through the security manager's
    current-user dependency.
    """
    audit_resource = "synthesis:multi-modal"

    def audit(actor, action, status, details):
        # Resource and address are identical for every record of this endpoint.
        security_manager.audit_logger.log_access(
            user_id=actor,
            action=action,
            resource=audit_resource,
            ip_address="internal",
            status=status,
            details=details
        )

    try:
        user_id = current_user.get("user_id", "unknown")
        modalities = list(request.modalities_data.keys())
        logger.info(f"Multi-modal synthesis request from user {user_id}: {modalities}")

        audit(
            user_id,
            "MULTI_MODAL_SYNTHESIS",
            "INITIATED",
            {"modalities": modalities, "summary_type": request.summary_type},
        )

        # Perform multi-modal synthesis
        result = await synthesis_service.synthesize_multi_modal(
            modalities_data=request.modalities_data,
            summary_type=request.summary_type,
            user_id=user_id
        )

        audit(
            user_id,
            "MULTI_MODAL_SYNTHESIS_COMPLETE",
            "SUCCESS",
            {
                "modalities": modalities,
                "overall_confidence": result.get("overall_confidence", 0.0)
            },
        )
        return result
    except Exception as failure:
        logger.error(f"Multi-modal synthesis failed: {str(failure)}")
        audit(
            current_user.get("user_id", "unknown"),
            "MULTI_MODAL_SYNTHESIS_FAILED",
            "FAILED",
            {"error": str(failure)},
        )
        raise HTTPException(status_code=500, detail=f"Multi-modal synthesis failed: {str(failure)}")
@app.get("/synthesize/history")
async def get_synthesis_history(
    limit: int = 100,
    current_user: Dict[str, Any] = Depends(security_manager.get_current_user)
):
    """
    Return the calling user's synthesis history.

    Args:
        limit: Maximum number of entries requested from the synthesis service.
        current_user: User record resolved by the security manager.

    Security: the history is queried with the caller's own user id only.
    """
    caller_id = current_user.get("user_id", "unknown")
    entries = synthesis_service.get_synthesis_history(user_id=caller_id, limit=limit)

    response = {"user_id": caller_id}
    response["total_syntheses"] = len(entries)
    response["history"] = entries
    return response
@app.get("/synthesize/statistics")
async def get_synthesis_statistics(
    current_user: Dict[str, Any] = Depends(security_manager.get_current_user)
):
    """
    Return the synthesis service's usage statistics with a UTC timestamp.

    The statistics object is passed through exactly as the service reports it;
    ``current_user`` is only required so that the caller must be resolved by
    the security manager.
    """
    service_stats = synthesis_service.get_synthesis_statistics()
    generated_at = datetime.utcnow().isoformat()
    return {"statistics": service_stats, "timestamp": generated_at}
# ================================
# END CLINICAL SYNTHESIS ENDPOINTS
# ================================
# Catch-all route for React Router (single-page application) - MUST BE LAST
@app.get("/{full_path:path}")
async def serve_react_app(full_path: str):
    """
    Serve the React app for any non-API route (client-side routing).

    Args:
        full_path: The requested path without its leading slash.

    Returns:
        ``static/index.html`` for every path that is not reserved for the API.

    Raises:
        HTTPException: 404 when the path belongs to an API/static namespace
            or when the frontend bundle is missing.
    """
    reserved_roots = {
        'api', 'health', 'analyze', 'status', 'results',
        'supported-models', 'compliance-status', 'assets',
    }
    # Compare the first path segment rather than a raw string prefix, so that
    # client-side routes which merely begin with a reserved word (for example
    # "healthcare" or "statuses") still reach the React app.
    first_segment = full_path.split("/", 1)[0]
    if first_segment in reserved_roots:
        raise HTTPException(status_code=404, detail="API endpoint not found")

    # Serve React app for everything else (client-side routing)
    index_file = Path(__file__).parent / "static" / "index.html"
    if not index_file.exists():
        raise HTTPException(status_code=404, detail="React app not found")
    return FileResponse(index_file)
# Development entry point: run the app with uvicorn when this file is executed directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running as a script.
    import uvicorn
    # Listens on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)