Spaces:
Sleeping
Sleeping
| """ | |
| Medical Report Analysis Platform - Main Backend Application | |
| Comprehensive AI-powered medical document analysis with multi-model processing | |
| With HIPAA/GDPR Security & Compliance Features | |
| """ | |
| from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Request, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from pathlib import Path | |
| from typing import List, Dict, Optional, Any | |
| import os | |
| import tempfile | |
| import logging | |
| from datetime import datetime | |
| import uuid | |
| # Import processing modules | |
| from pdf_processor import PDFProcessor | |
| from document_classifier import DocumentClassifier | |
| from model_router import ModelRouter | |
| from analysis_synthesizer import AnalysisSynthesizer | |
| from security import get_security_manager, ComplianceValidator, DataEncryption | |
# Configure process-wide logging: INFO and above, with a timestamped format.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module logger used throughout this file.
logger = logging.getLogger(__name__)
# The FastAPI application object; title/description/version are its declared metadata.
app = FastAPI(
    version="2.0.0",
    title="Medical Report Analysis Platform",
    description="HIPAA/GDPR Compliant AI-powered medical document analysis",
)
# CORS configuration.
# Allowed origins come from the comma-separated CORS_ALLOW_ORIGINS environment
# variable (default "*"). Browsers refuse a literal "*" origin on credentialed
# requests, and Starlette's CORSMiddleware works around that by echoing back the
# requesting Origin (for preflights and cookie-bearing requests) - which would
# let any website make credentialed requests to this PHI-handling API.
# Credentials are therefore only allowed when an explicit origin list is set.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files (frontend).
static_dir = Path(__file__).parent / "static"
assets_dir = static_dir / "assets"
# StaticFiles raises RuntimeError at construction when its directory does not
# exist, so check the directory actually being mounted, not just its parent.
if assets_dir.is_dir():
    app.mount("/assets", StaticFiles(directory=assets_dir), name="assets")
    logger.info("Static files mounted successfully")
else:
    logger.warning(f"Static assets directory not found, not mounting /assets: {assets_dir}")
# Processing components for the analysis pipeline. They are created once at
# import time and the same instances are used by every request/background task.
pdf_processor: PDFProcessor = PDFProcessor()
document_classifier: DocumentClassifier = DocumentClassifier()
model_router: ModelRouter = ModelRouter()
analysis_synthesizer: AnalysisSynthesizer = AnalysisSynthesizer()

# Security / compliance components (auth + audit logging, compliance report,
# secure deletion of temporary uploads).
security_manager = get_security_manager()
compliance_validator: ComplianceValidator = ComplianceValidator()
data_encryption: DataEncryption = DataEncryption()
logger.info("Security and compliance features initialized")
# Request/Response Models
class AnalysisStatus(BaseModel):
    """Progress snapshot returned when a job is created or polled."""

    job_id: str  # identifier issued by the upload endpoint
    status: str  # this module writes "processing", "completed" or "failed"
    progress: float  # fraction complete; the pipeline writes values from 0.0 to 1.0
    message: str  # human-readable description of the current stage
class AnalysisResult(BaseModel):
    """Final output of a completed analysis job."""

    job_id: str
    document_type: str  # label taken from the classifier's output
    confidence: float  # classifier confidence for ``document_type``
    analysis: Dict[str, Any]  # synthesized analysis from the AnalysisSynthesizer
    specialized_results: List[Dict[str, Any]]  # one entry per routed model task
    summary: str  # the "summary" entry of ``analysis`` ("" when absent)
    timestamp: str  # completion time as an ISO-8601 string (naive UTC)
class HealthCheck(BaseModel):
    """Minimal liveness payload: status, service version and server time."""

    status: str
    version: str
    timestamp: str  # ISO-8601 string
# Process-local job registry: job_id -> job state dict ("status", "progress",
# "message", "result", "error", ...). It lives only in this process's memory and
# nothing in this module removes entries, so it grows for the process lifetime.
# Use Redis or a database in production.
job_tracker: Dict[str, Dict[str, Any]] = dict()
async def api_root():
    """API health check endpoint.

    Returns:
        HealthCheck with status "healthy", the version declared on the FastAPI
        ``app``, and the current UTC time as a naive ISO-8601 string.
    """
    return HealthCheck(
        status="healthy",
        # Report the app's declared version so this endpoint cannot drift from
        # the version published in the OpenAPI metadata.
        version=app.version,
        timestamp=datetime.utcnow().isoformat()
    )
async def root():
    """Serve the frontend.

    Returns ``static/index.html`` when it exists; otherwise a small JSON banner
    naming the API and the version declared on the FastAPI ``app``.
    """
    # Reuse the module-level ``static_dir`` instead of recomputing (and
    # shadowing) it here.
    index_file = static_dir / "index.html"
    if index_file.exists():
        return FileResponse(index_file)
    return {"message": "Medical Report Analysis Platform API", "version": app.version}
async def health_check():
    """Return a detailed health report with a per-component status map.

    The component statuses are fixed strings; no component is actually probed.
    """
    component_status = {
        "pdf_processor": "ready",
        "classifier": "ready",
        "model_router": "ready",
        "synthesizer": "ready",
        "security": "ready",
        "compliance": "active",
    }
    now = datetime.utcnow().isoformat()
    return {"status": "healthy", "components": component_status, "timestamp": now}
async def get_compliance_status():
    """Return the HIPAA/GDPR compliance report from the module's ComplianceValidator."""
    report = compliance_validator.check_compliance()
    return report
async def login(email: str, password: str):
    """
    User authentication endpoint (DEMO ONLY).

    WARNING: credentials are NOT verified. Any non-blank email/password pair is
    accepted and issued a bearer token bound to a freshly generated random
    ``user_id``. In production, validate credentials against a secure user
    store before issuing a token.

    NOTE(review): as plain ``str`` parameters these are most likely read from
    the query string by FastAPI, which exposes the password in URLs and access
    logs - confirm against the route registration and consider a request body.

    Args:
        email: Identity to embed in the token; must not be blank.
        password: Must not be empty (otherwise unused in the demo flow).

    Returns:
        dict with ``access_token``, ``token_type`` ("bearer"), ``user_id`` and ``email``.

    Raises:
        HTTPException: 400 if email or password is blank.
    """
    # Demo authentication - in production, validate against database
    logger.warning("Demo authentication - implement secure auth in production")
    # Even the demo flow must not mint tokens for an empty identity.
    if not email.strip() or not password:
        raise HTTPException(status_code=400, detail="Email and password are required")
    # For demo, accept any (non-blank) credentials
    user_id = str(uuid.uuid4())
    token = security_manager.create_access_token(user_id, email)
    return {
        "access_token": token,
        "token_type": "bearer",
        "user_id": user_id,
        "email": email
    }
async def analyze_document(
    request: Request,
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = BackgroundTasks(),
    current_user: Dict[str, Any] = Depends(security_manager.get_current_user)
):
    """
    Upload and analyze a medical document with audit logging.

    This endpoint initiates the two-layer processing:
    - Layer 1: PDF extraction and classification
    - Layer 2: Specialized model analysis

    The upload is written to a temporary file and handed to
    ``process_document_pipeline`` as a background task; callers poll the
    returned ``job_id`` for progress and results.

    Security: Logs all PHI access for HIPAA compliance. If the job cannot be
    created, the temporary copy of the upload is securely deleted so no PHI is
    left behind on disk.

    Args:
        request: Incoming request; only used for the client IP in audit logs.
        file: Uploaded document; its filename must end in ``.pdf``.
        background_tasks: Receives the pipeline task (injected by FastAPI).
        current_user: Authenticated user dict; ``user_id`` is read from it.

    Returns:
        AnalysisStatus with status "processing" and progress 0.0.

    Raises:
        HTTPException: 400 if the file has no name or is not a ``.pdf``;
            500 if the upload could not be saved or scheduled.
    """
    # Generate unique job ID
    job_id = str(uuid.uuid4())

    # Audit log: document upload (recorded before validation, so rejected
    # attempts are logged as well)
    client_ip = request.client.host if request.client else "unknown"
    security_manager.audit_logger.log_phi_access(
        user_id=current_user.get("user_id", "unknown"),
        document_id=job_id,
        action="UPLOAD",
        ip_address=client_ip
    )

    # Validate file type. UploadFile.filename can be None/empty; without this
    # guard ``.lower()`` would raise AttributeError and surface as a 500.
    filename = file.filename or ""
    if not filename.lower().endswith('.pdf'):
        raise HTTPException(
            status_code=400,
            detail="Only PDF files are supported"
        )

    # Initialize job tracking
    job_tracker[job_id] = {
        "status": "processing",
        "progress": 0.0,
        "filename": file.filename,
        "user_id": current_user.get("user_id"),
        "created_at": datetime.utcnow().isoformat()
    }

    tmp_file_path: Optional[str] = None
    try:
        # delete=False: the file must outlive this request, because the
        # background task reads it after the response has been sent.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            # Record the path first so a failed read/write can still be cleaned up.
            tmp_file_path = tmp_file.name
            content = await file.read()
            tmp_file.write(content)

        # Schedule background processing
        background_tasks.add_task(
            process_document_pipeline,
            job_id,
            tmp_file_path,
            file.filename,
            current_user.get("user_id")
        )
    except Exception as e:
        logger.error(f"Error creating analysis job: {str(e)}")
        job_tracker[job_id]["status"] = "failed"
        job_tracker[job_id]["error"] = str(e)

        # Do not leave a (possibly partial) copy of the PHI-bearing upload on disk.
        if tmp_file_path is not None and os.path.exists(tmp_file_path):
            try:
                data_encryption.secure_delete(tmp_file_path)
            except Exception:
                logger.exception(f"Job {job_id}: failed to securely delete temporary upload")

        # Audit log: Failed upload
        security_manager.audit_logger.log_access(
            user_id=current_user.get("user_id", "unknown"),
            action="UPLOAD_FAILED",
            resource=f"document:{job_id}",
            ip_address=client_ip,
            status="FAILED",
            details={"error": str(e)}
        )
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") from e

    logger.info(f"Analysis job {job_id} created for file: {file.filename}")
    return AnalysisStatus(
        job_id=job_id,
        status="processing",
        progress=0.0,
        message="Document uploaded successfully. Analysis in progress."
    )
async def get_analysis_status(job_id: str):
    """Report the current status, progress and stage message of an analysis job.

    NOTE(review): no authentication or ownership check is applied here; any
    caller holding a job_id can read its status.

    Raises:
        HTTPException: 404 if ``job_id`` is not tracked.
    """
    job = job_tracker.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    current_progress = job.get("progress", 0.0)
    current_message = job.get("message", "Processing...")
    return AnalysisStatus(
        job_id=job_id,
        status=job["status"],
        progress=current_progress,
        message=current_message,
    )
async def get_analysis_results(job_id: str):
    """Return the stored result of a completed analysis job.

    NOTE(review): no authentication or ownership check is applied here; any
    caller holding a job_id can read the (PHI-derived) results.

    Raises:
        HTTPException: 404 if ``job_id`` is not tracked; 400 if the job has
            not reached status "completed".
    """
    job = job_tracker.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    status = job["status"]
    if status == "completed":
        return AnalysisResult(**job["result"])
    raise HTTPException(
        status_code=400,
        detail=f"Analysis not completed. Current status: {status}",
    )
async def get_supported_models():
    """List the medical AI models and tasks advertised for each domain.

    The catalogue is static; a fresh dict (with fresh lists) is built per call,
    so callers may mutate the response safely.
    """
    # (domain, models, tasks) in the order they are reported.
    catalogue = (
        ("clinical_notes", ("MedGemma 27B", "Bio_ClinicalBERT"),
         ("summarization", "entity_extraction", "coding")),
        ("radiology", ("MedGemma 4B Multimodal", "MONAI"),
         ("vqa", "report_generation", "segmentation")),
        ("pathology", ("Path Foundation", "UNI2-h"),
         ("slide_classification", "embedding_generation")),
        ("cardiology", ("HuBERT-ECG",),
         ("ecg_analysis", "event_prediction")),
        ("laboratory", ("DrLlama", "Lab-AI"),
         ("normalization", "explanation")),
        ("drug_interactions", ("CatBoost DDI", "DrugGen"),
         ("interaction_classification",)),
        ("diagnosis", ("MedGemma 27B",),
         ("differential_diagnosis", "triage")),
        ("coding", ("Rayyan Med Coding", "ICD-10 Predictors"),
         ("icd10_extraction", "cpt_coding")),
        ("mental_health", ("MentalBERT",),
         ("screening", "sentiment_analysis")),
    )
    domains = {
        domain: {"models": list(models), "tasks": list(tasks)}
        for domain, models, tasks in catalogue
    }
    return {"domains": domains}
def _report_progress(job_id: str, progress: float, message: Optional[str] = None) -> None:
    """Record a job's progress fraction and, when given, its current stage message."""
    job = job_tracker[job_id]
    job["progress"] = progress
    if message is not None:
        job["message"] = message


async def process_document_pipeline(job_id: str, file_path: str, filename: str, user_id: str = "unknown"):
    """
    Background task that runs an uploaded document through the full pipeline.

    Pipeline stages (progress is written to ``job_tracker[job_id]``):
    1. PDF Extraction (text, images, tables) - progress 0.1
    2. Document Classification - 0.3
    3. Intelligent Routing - 0.4
    4. Specialized Model Analysis - 0.5, rising to 0.8 as each task finishes
    5. Result Synthesis - 0.9, then 1.0 on completion

    On success the job gets status "completed" and a ``result`` dict with the
    fields of ``AnalysisResult``. If any stage raises, the job gets status
    "failed" plus the error text and the exception is not re-raised.

    Security: PHI access is audit-logged after classification and on
    completion; failures are logged via ``log_access``. The temporary upload at
    ``file_path`` is securely deleted in a ``finally`` block, so it is removed
    on every path. A cleanup error is logged instead of turning a finished
    analysis into a failed one.

    Args:
        job_id: Key of an existing entry in ``job_tracker``.
        file_path: Temporary PDF written by the upload endpoint.
        filename: Original upload name (not used by the pipeline itself).
        user_id: Identity recorded in the audit logs.
    """
    try:
        # Stage 1: PDF Processing
        _report_progress(job_id, 0.1, "Extracting content from PDF...")
        logger.info(f"Job {job_id}: Starting PDF extraction")
        pdf_content = await pdf_processor.extract_content(file_path)

        # Stage 2: Document Classification
        _report_progress(job_id, 0.3, "Classifying document type...")
        logger.info(f"Job {job_id}: Classifying document")
        classification = await document_classifier.classify(pdf_content)

        # Audit log: Classification complete
        security_manager.audit_logger.log_phi_access(
            user_id=user_id,
            document_id=job_id,
            action="CLASSIFY",
            ip_address="internal"
        )

        # Stage 3: Model Routing
        _report_progress(job_id, 0.4, "Routing to specialized models...")
        logger.info(f"Job {job_id}: Routing to models - {classification['document_type']}")
        model_tasks = model_router.route(classification, pdf_content)

        # Stage 4: Specialized Analysis - progress advances from 0.5 to 0.8
        _report_progress(job_id, 0.5, "Running specialized analysis...")
        total_tasks = len(model_tasks)
        logger.info(f"Job {job_id}: Running {total_tasks} specialized models")
        specialized_results = []
        for done, task in enumerate(model_tasks, start=1):
            specialized_results.append(await model_router.execute_task(task))
            _report_progress(job_id, 0.5 + (0.3 * done / total_tasks))

        # Stage 5: Result Synthesis
        _report_progress(job_id, 0.9, "Synthesizing results...")
        logger.info(f"Job {job_id}: Synthesizing results")
        final_analysis = await analysis_synthesizer.synthesize(
            classification,
            specialized_results,
            pdf_content
        )

        # Assemble the result first and mark the job completed only once that
        # succeeded; otherwise a failure here (e.g. a missing classification key)
        # would first flip the job to "completed" at progress 1.0 before the
        # except block marks it failed.
        result = {
            "job_id": job_id,
            "document_type": classification["document_type"],
            "confidence": classification["confidence"],
            "analysis": final_analysis,
            "specialized_results": specialized_results,
            "summary": final_analysis.get("summary", ""),
            "timestamp": datetime.utcnow().isoformat()
        }
        job = job_tracker[job_id]
        job["result"] = result
        job["progress"] = 1.0
        job["message"] = "Analysis complete"
        job["status"] = "completed"
        logger.info(f"Job {job_id}: Analysis completed successfully")

        # Audit log: Analysis complete
        security_manager.audit_logger.log_phi_access(
            user_id=user_id,
            document_id=job_id,
            action="ANALYSIS_COMPLETE",
            ip_address="internal"
        )

    except Exception as e:
        # logger.exception keeps the traceback for diagnosing pipeline failures.
        logger.exception(f"Job {job_id}: Analysis failed - {str(e)}")
        job = job_tracker[job_id]
        job["status"] = "failed"
        job["message"] = f"Analysis failed: {str(e)}"
        job["error"] = str(e)

        # Audit log: Analysis failed
        security_manager.audit_logger.log_access(
            user_id=user_id,
            action="ANALYSIS_FAILED",
            resource=f"document:{job_id}",
            ip_address="internal",
            status="FAILED",
            details={"error": str(e)}
        )

    finally:
        # Secure cleanup of the temporary upload on every path, including when
        # the failure audit log above raises. A cleanup error is logged rather
        # than being allowed to mark an already-finished job as failed.
        if os.path.exists(file_path):
            try:
                data_encryption.secure_delete(file_path)
            except Exception:
                logger.exception(f"Job {job_id}: Failed to securely delete temporary file")
if __name__ == "__main__":
    # Direct-run entry point: serve the app with uvicorn on all interfaces, port 7860.
    import uvicorn

    host, port = "0.0.0.0", 7860
    uvicorn.run(app, host=host, port=port)