""" Medical Report Analysis Platform - Main Backend Application Comprehensive AI-powered medical document analysis with multi-model processing With HIPAA/GDPR Security & Compliance Features """ from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Request, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from pathlib import Path from typing import List, Dict, Optional, Any import os import tempfile import logging from datetime import datetime import uuid # Import processing modules from pdf_processor import PDFProcessor from document_classifier import DocumentClassifier from model_router import ModelRouter from analysis_synthesizer import AnalysisSynthesizer from security import get_security_manager, ComplianceValidator, DataEncryption # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Initialize FastAPI app app = FastAPI( title="Medical Report Analysis Platform", description="HIPAA/GDPR Compliant AI-powered medical document analysis", version="2.0.0" ) # CORS configuration app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure appropriately for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Mount static files (frontend) static_dir = Path(__file__).parent / "static" if static_dir.exists(): app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets") logger.info("Static files mounted successfully") # Initialize processing components pdf_processor = PDFProcessor() document_classifier = DocumentClassifier() model_router = ModelRouter() analysis_synthesizer = AnalysisSynthesizer() # Initialize security components security_manager = get_security_manager() compliance_validator = ComplianceValidator() data_encryption = DataEncryption() logger.info("Security and compliance features initialized") # Request/Response Models class AnalysisStatus(BaseModel): job_id: str status: str progress: float message: str class AnalysisResult(BaseModel): job_id: str document_type: str confidence: float analysis: Dict[str, Any] specialized_results: List[Dict[str, Any]] summary: str timestamp: str class HealthCheck(BaseModel): status: str version: str timestamp: str # In-memory job tracking (use Redis/database in production) job_tracker: Dict[str, Dict[str, Any]] = {} @app.get("/api", response_model=HealthCheck) async def api_root(): """API health check endpoint""" return HealthCheck( status="healthy", version="1.0.0", timestamp=datetime.utcnow().isoformat() ) @app.get("/") async def root(): """Serve frontend""" static_dir = Path(__file__).parent / "static" index_file = static_dir / "index.html" if index_file.exists(): return FileResponse(index_file) else: return {"message": "Medical Report Analysis Platform API", "version": "1.0.0"} @app.get("/health") async def health_check(): """Detailed health check with component status""" return { "status": "healthy", "components": { "pdf_processor": "ready", "classifier": "ready", "model_router": "ready", "synthesizer": "ready", "security": "ready", "compliance": "active" }, "timestamp": datetime.utcnow().isoformat() } @app.get("/compliance-status") async def get_compliance_status(): """Get HIPAA/GDPR compliance status""" return compliance_validator.check_compliance() @app.post("/auth/login") async def login(email: str, password: str): """ User authentication endpoint In production, validate credentials against secure database """ # Demo authentication - in production, validate against database logger.warning("Demo authentication - implement secure auth in production") # For demo, accept any credentials user_id = str(uuid.uuid4()) token = security_manager.create_access_token(user_id, email) return { "access_token": token, "token_type": "bearer", "user_id": user_id, "email": email } @app.post("/analyze", response_model=AnalysisStatus) async def analyze_document( request: Request, file: UploadFile = File(...), background_tasks: BackgroundTasks = BackgroundTasks(), current_user: Dict[str, Any] = Depends(security_manager.get_current_user) ): """ Upload and analyze a medical document with audit logging This endpoint initiates the two-layer processing: - Layer 1: PDF extraction and classification - Layer 2: Specialized model analysis Security: Logs all PHI access for HIPAA compliance """ # Generate unique job ID job_id = str(uuid.uuid4()) # Audit log: Document upload client_ip = request.client.host if request.client else "unknown" security_manager.audit_logger.log_phi_access( user_id=current_user.get("user_id", "unknown"), document_id=job_id, action="UPLOAD", ip_address=client_ip ) # Validate file type if not file.filename.lower().endswith('.pdf'): raise HTTPException( status_code=400, detail="Only PDF files are supported" ) # Initialize job tracking job_tracker[job_id] = { "status": "processing", "progress": 0.0, "filename": file.filename, "user_id": current_user.get("user_id"), "created_at": datetime.utcnow().isoformat() } try: # Save uploaded file temporarily with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: content = await file.read() tmp_file.write(content) tmp_file_path = tmp_file.name # Schedule background processing background_tasks.add_task( process_document_pipeline, job_id, tmp_file_path, file.filename, current_user.get("user_id") ) logger.info(f"Analysis job {job_id} created for file: {file.filename}") return AnalysisStatus( job_id=job_id, status="processing", progress=0.0, message="Document uploaded successfully. Analysis in progress." ) except Exception as e: logger.error(f"Error creating analysis job: {str(e)}") job_tracker[job_id]["status"] = "failed" job_tracker[job_id]["error"] = str(e) # Audit log: Failed upload security_manager.audit_logger.log_access( user_id=current_user.get("user_id", "unknown"), action="UPLOAD_FAILED", resource=f"document:{job_id}", ip_address=client_ip, status="FAILED", details={"error": str(e)} ) raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") @app.get("/status/{job_id}", response_model=AnalysisStatus) async def get_analysis_status(job_id: str): """Get the current status of an analysis job""" if job_id not in job_tracker: raise HTTPException(status_code=404, detail="Job not found") job_data = job_tracker[job_id] return AnalysisStatus( job_id=job_id, status=job_data["status"], progress=job_data.get("progress", 0.0), message=job_data.get("message", "Processing...") ) @app.get("/results/{job_id}", response_model=AnalysisResult) async def get_analysis_results(job_id: str): """Retrieve the analysis results for a completed job""" if job_id not in job_tracker: raise HTTPException(status_code=404, detail="Job not found") job_data = job_tracker[job_id] if job_data["status"] != "completed": raise HTTPException( status_code=400, detail=f"Analysis not completed. Current status: {job_data['status']}" ) return AnalysisResult(**job_data["result"]) @app.get("/supported-models") async def get_supported_models(): """Get list of supported medical AI models by domain""" return { "domains": { "clinical_notes": { "models": ["MedGemma 27B", "Bio_ClinicalBERT"], "tasks": ["summarization", "entity_extraction", "coding"] }, "radiology": { "models": ["MedGemma 4B Multimodal", "MONAI"], "tasks": ["vqa", "report_generation", "segmentation"] }, "pathology": { "models": ["Path Foundation", "UNI2-h"], "tasks": ["slide_classification", "embedding_generation"] }, "cardiology": { "models": ["HuBERT-ECG"], "tasks": ["ecg_analysis", "event_prediction"] }, "laboratory": { "models": ["DrLlama", "Lab-AI"], "tasks": ["normalization", "explanation"] }, "drug_interactions": { "models": ["CatBoost DDI", "DrugGen"], "tasks": ["interaction_classification"] }, "diagnosis": { "models": ["MedGemma 27B"], "tasks": ["differential_diagnosis", "triage"] }, "coding": { "models": ["Rayyan Med Coding", "ICD-10 Predictors"], "tasks": ["icd10_extraction", "cpt_coding"] }, "mental_health": { "models": ["MentalBERT"], "tasks": ["screening", "sentiment_analysis"] } } } async def process_document_pipeline(job_id: str, file_path: str, filename: str, user_id: str = "unknown"): """ Background task for processing medical documents through the full pipeline Pipeline stages: 1. PDF Extraction (text, images, tables) 2. Document Classification 3. Intelligent Routing 4. Specialized Model Analysis 5. Result Synthesis Security: All stages logged for HIPAA compliance """ try: # Stage 1: PDF Processing job_tracker[job_id]["progress"] = 0.1 job_tracker[job_id]["message"] = "Extracting content from PDF..." logger.info(f"Job {job_id}: Starting PDF extraction") pdf_content = await pdf_processor.extract_content(file_path) # Stage 2: Document Classification job_tracker[job_id]["progress"] = 0.3 job_tracker[job_id]["message"] = "Classifying document type..." logger.info(f"Job {job_id}: Classifying document") classification = await document_classifier.classify(pdf_content) # Audit log: Classification complete security_manager.audit_logger.log_phi_access( user_id=user_id, document_id=job_id, action="CLASSIFY", ip_address="internal" ) # Stage 3: Model Routing job_tracker[job_id]["progress"] = 0.4 job_tracker[job_id]["message"] = "Routing to specialized models..." logger.info(f"Job {job_id}: Routing to models - {classification['document_type']}") model_tasks = model_router.route(classification, pdf_content) # Stage 4: Specialized Analysis job_tracker[job_id]["progress"] = 0.5 job_tracker[job_id]["message"] = "Running specialized analysis..." logger.info(f"Job {job_id}: Running {len(model_tasks)} specialized models") specialized_results = [] for i, task in enumerate(model_tasks): result = await model_router.execute_task(task) specialized_results.append(result) progress = 0.5 + (0.3 * (i + 1) / len(model_tasks)) job_tracker[job_id]["progress"] = progress # Stage 5: Result Synthesis job_tracker[job_id]["progress"] = 0.9 job_tracker[job_id]["message"] = "Synthesizing results..." logger.info(f"Job {job_id}: Synthesizing results") final_analysis = await analysis_synthesizer.synthesize( classification, specialized_results, pdf_content ) # Complete job_tracker[job_id]["progress"] = 1.0 job_tracker[job_id]["status"] = "completed" job_tracker[job_id]["message"] = "Analysis complete" job_tracker[job_id]["result"] = { "job_id": job_id, "document_type": classification["document_type"], "confidence": classification["confidence"], "analysis": final_analysis, "specialized_results": specialized_results, "summary": final_analysis.get("summary", ""), "timestamp": datetime.utcnow().isoformat() } logger.info(f"Job {job_id}: Analysis completed successfully") # Audit log: Analysis complete security_manager.audit_logger.log_phi_access( user_id=user_id, document_id=job_id, action="ANALYSIS_COMPLETE", ip_address="internal" ) # Secure cleanup of temporary file data_encryption.secure_delete(file_path) except Exception as e: logger.error(f"Job {job_id}: Analysis failed - {str(e)}") job_tracker[job_id]["status"] = "failed" job_tracker[job_id]["message"] = f"Analysis failed: {str(e)}" job_tracker[job_id]["error"] = str(e) # Audit log: Analysis failed security_manager.audit_logger.log_access( user_id=user_id, action="ANALYSIS_FAILED", resource=f"document:{job_id}", ip_address="internal", status="FAILED", details={"error": str(e)} ) # Cleanup on error if os.path.exists(file_path): data_encryption.secure_delete(file_path) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)