"""
Medical Report Analysis Platform - Main Backend Application

Comprehensive AI-powered medical document analysis with multi-model processing
and HIPAA/GDPR security and compliance features.
"""

from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Request, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from pathlib import Path
from typing import List, Dict, Optional, Any
import os
import tempfile
import logging
from datetime import datetime
import uuid

from pdf_processor import PDFProcessor
from document_classifier import DocumentClassifier
from model_router import ModelRouter
from analysis_synthesizer import AnalysisSynthesizer
from security import get_security_manager, ComplianceValidator, DataEncryption

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Medical Report Analysis Platform",
    description="HIPAA/GDPR Compliant AI-powered medical document analysis",
    version="2.0.0"
)

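# NOTE: the wildcard CORS policy below is convenient for local development, but a
# HIPAA/GDPR deployment should restrict allow_origins to known frontend hosts.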
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

static_dir = Path(__file__).parent / "static"
if static_dir.exists():
    app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets")
    logger.info("Static files mounted successfully")

# Core document-processing components
pdf_processor = PDFProcessor()
document_classifier = DocumentClassifier()
model_router = ModelRouter()
analysis_synthesizer = AnalysisSynthesizer()

# Security and compliance components
security_manager = get_security_manager()
compliance_validator = ComplianceValidator()
data_encryption = DataEncryption()

logger.info("Security and compliance features initialized")


class AnalysisStatus(BaseModel):
    job_id: str
    status: str
    progress: float
    message: str


class AnalysisResult(BaseModel):
    job_id: str
    document_type: str
    confidence: float
    analysis: Dict[str, Any]
    specialized_results: List[Dict[str, Any]]
    summary: str
    timestamp: str


class HealthCheck(BaseModel):
    status: str
    version: str
    timestamp: str


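# In-memory job store; entries are lost on restart. A production deployment would
# likely back this with a persistent, access-controlled store (e.g. Redis or a database).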
job_tracker: Dict[str, Dict[str, Any]] = {}


@app.get("/api", response_model=HealthCheck)
async def api_root():
    """API health check endpoint"""
    return HealthCheck(
        status="healthy",
        version="2.0.0",
        timestamp=datetime.utcnow().isoformat()
    )


@app.get("/")
async def root():
    """Serve frontend"""
    static_dir = Path(__file__).parent / "static"
    index_file = static_dir / "index.html"

    if index_file.exists():
        return FileResponse(index_file)
    else:
        return {"message": "Medical Report Analysis Platform API", "version": "2.0.0"}


@app.get("/health")
async def health_check():
    """Detailed health check with component status"""
    return {
        "status": "healthy",
        "components": {
            "pdf_processor": "ready",
            "classifier": "ready",
            "model_router": "ready",
            "synthesizer": "ready",
            "security": "ready",
            "compliance": "active"
        },
        "timestamp": datetime.utcnow().isoformat()
    }


@app.get("/compliance-status")
async def get_compliance_status():
    """Get HIPAA/GDPR compliance status"""
    return compliance_validator.check_compliance()


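# NOTE: as written, FastAPI treats `email` and `password` as query parameters; a
# production implementation would likely accept credentials in a request body or
# via an OAuth2 password form instead of the demo flow below.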
@app.post("/auth/login")
async def login(email: str, password: str):
    """
    User authentication endpoint
    In production, validate credentials against a secure database
    """
    logger.warning("Demo authentication - implement secure auth in production")

    user_id = str(uuid.uuid4())
    token = security_manager.create_access_token(user_id, email)

    return {
        "access_token": token,
        "token_type": "bearer",
        "user_id": user_id,
        "email": email
    }


@app.post("/analyze", response_model=AnalysisStatus)
async def analyze_document(
    request: Request,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    current_user: Dict[str, Any] = Depends(security_manager.get_current_user)
):
    """
    Upload and analyze a medical document with audit logging

    This endpoint initiates the two-layer processing:
    - Layer 1: PDF extraction and classification
    - Layer 2: Specialized model analysis

    Security: Logs all PHI access for HIPAA compliance
    """
    job_id = str(uuid.uuid4())

    # Audit the upload for HIPAA compliance
    client_ip = request.client.host if request.client else "unknown"
    security_manager.audit_logger.log_phi_access(
        user_id=current_user.get("user_id", "unknown"),
        document_id=job_id,
        action="UPLOAD",
        ip_address=client_ip
    )

    if not file.filename or not file.filename.lower().endswith('.pdf'):
        raise HTTPException(
            status_code=400,
            detail="Only PDF files are supported"
        )

    job_tracker[job_id] = {
        "status": "processing",
        "progress": 0.0,
        "filename": file.filename,
        "user_id": current_user.get("user_id"),
        "created_at": datetime.utcnow().isoformat()
    }

    try:
        # Persist the upload to a temporary file for background processing
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            content = await file.read()
            tmp_file.write(content)
            tmp_file_path = tmp_file.name

        background_tasks.add_task(
            process_document_pipeline,
            job_id,
            tmp_file_path,
            file.filename,
            current_user.get("user_id")
        )

        logger.info(f"Analysis job {job_id} created for file: {file.filename}")

        return AnalysisStatus(
            job_id=job_id,
            status="processing",
            progress=0.0,
            message="Document uploaded successfully. Analysis in progress."
        )

    except Exception as e:
        logger.error(f"Error creating analysis job: {str(e)}")
        job_tracker[job_id]["status"] = "failed"
        job_tracker[job_id]["error"] = str(e)

        # Audit the failed upload for HIPAA compliance
        security_manager.audit_logger.log_access(
            user_id=current_user.get("user_id", "unknown"),
            action="UPLOAD_FAILED",
            resource=f"document:{job_id}",
            ip_address=client_ip,
            status="FAILED",
            details={"error": str(e)}
        )

        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")


@app.get("/status/{job_id}", response_model=AnalysisStatus)
async def get_analysis_status(job_id: str):
    """Get the current status of an analysis job"""
    if job_id not in job_tracker:
        raise HTTPException(status_code=404, detail="Job not found")

    job_data = job_tracker[job_id]

    return AnalysisStatus(
        job_id=job_id,
        status=job_data["status"],
        progress=job_data.get("progress", 0.0),
        message=job_data.get("message", "Processing...")
    )


@app.get("/results/{job_id}", response_model=AnalysisResult)
async def get_analysis_results(job_id: str):
    """Retrieve the analysis results for a completed job"""
    if job_id not in job_tracker:
        raise HTTPException(status_code=404, detail="Job not found")

    job_data = job_tracker[job_id]

    if job_data["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail=f"Analysis not completed. Current status: {job_data['status']}"
        )

    return AnalysisResult(**job_data["result"])


@app.get("/supported-models")
async def get_supported_models():
    """Get list of supported medical AI models by domain"""
    return {
        "domains": {
            "clinical_notes": {
                "models": ["MedGemma 27B", "Bio_ClinicalBERT"],
                "tasks": ["summarization", "entity_extraction", "coding"]
            },
            "radiology": {
                "models": ["MedGemma 4B Multimodal", "MONAI"],
                "tasks": ["vqa", "report_generation", "segmentation"]
            },
            "pathology": {
                "models": ["Path Foundation", "UNI2-h"],
                "tasks": ["slide_classification", "embedding_generation"]
            },
            "cardiology": {
                "models": ["HuBERT-ECG"],
                "tasks": ["ecg_analysis", "event_prediction"]
            },
            "laboratory": {
                "models": ["DrLlama", "Lab-AI"],
                "tasks": ["normalization", "explanation"]
            },
            "drug_interactions": {
                "models": ["CatBoost DDI", "DrugGen"],
                "tasks": ["interaction_classification"]
            },
            "diagnosis": {
                "models": ["MedGemma 27B"],
                "tasks": ["differential_diagnosis", "triage"]
            },
            "coding": {
                "models": ["Rayyan Med Coding", "ICD-10 Predictors"],
                "tasks": ["icd10_extraction", "cpt_coding"]
            },
            "mental_health": {
                "models": ["MentalBERT"],
                "tasks": ["screening", "sentiment_analysis"]
            }
        }
    }


async def process_document_pipeline(job_id: str, file_path: str, filename: str, user_id: str = "unknown"):
    """
    Background task for processing medical documents through the full pipeline

    Pipeline stages:
    1. PDF Extraction (text, images, tables)
    2. Document Classification
    3. Intelligent Routing
    4. Specialized Model Analysis
    5. Result Synthesis

    Security: All stages logged for HIPAA compliance
    """
    try:
        # Stage 1: PDF extraction (text, images, tables)
        job_tracker[job_id]["progress"] = 0.1
        job_tracker[job_id]["message"] = "Extracting content from PDF..."
        logger.info(f"Job {job_id}: Starting PDF extraction")

        pdf_content = await pdf_processor.extract_content(file_path)

        # Stage 2: Document classification
        job_tracker[job_id]["progress"] = 0.3
        job_tracker[job_id]["message"] = "Classifying document type..."
        logger.info(f"Job {job_id}: Classifying document")

        classification = await document_classifier.classify(pdf_content)

        # Audit PHI access for HIPAA compliance
        security_manager.audit_logger.log_phi_access(
            user_id=user_id,
            document_id=job_id,
            action="CLASSIFY",
            ip_address="internal"
        )

        # Stage 3: Intelligent routing to specialized models
        job_tracker[job_id]["progress"] = 0.4
        job_tracker[job_id]["message"] = "Routing to specialized models..."
        logger.info(f"Job {job_id}: Routing to models - {classification['document_type']}")

        model_tasks = model_router.route(classification, pdf_content)

        # Stage 4: Specialized model analysis
        job_tracker[job_id]["progress"] = 0.5
        job_tracker[job_id]["message"] = "Running specialized analysis..."
        logger.info(f"Job {job_id}: Running {len(model_tasks)} specialized models")

        specialized_results = []
        for i, task in enumerate(model_tasks):
            result = await model_router.execute_task(task)
            specialized_results.append(result)
            progress = 0.5 + (0.3 * (i + 1) / len(model_tasks))
            job_tracker[job_id]["progress"] = progress

        # Stage 5: Result synthesis
        job_tracker[job_id]["progress"] = 0.9
        job_tracker[job_id]["message"] = "Synthesizing results..."
        logger.info(f"Job {job_id}: Synthesizing results")

        final_analysis = await analysis_synthesizer.synthesize(
            classification,
            specialized_results,
            pdf_content
        )

        # Mark the job as complete and store the result
        job_tracker[job_id]["progress"] = 1.0
        job_tracker[job_id]["status"] = "completed"
        job_tracker[job_id]["message"] = "Analysis complete"
        job_tracker[job_id]["result"] = {
            "job_id": job_id,
            "document_type": classification["document_type"],
            "confidence": classification["confidence"],
            "analysis": final_analysis,
            "specialized_results": specialized_results,
            "summary": final_analysis.get("summary", ""),
            "timestamp": datetime.utcnow().isoformat()
        }

        logger.info(f"Job {job_id}: Analysis completed successfully")

        security_manager.audit_logger.log_phi_access(
            user_id=user_id,
            document_id=job_id,
            action="ANALYSIS_COMPLETE",
            ip_address="internal"
        )

        # Securely delete the temporary PDF now that processing is done
        data_encryption.secure_delete(file_path)

    except Exception as e:
        logger.error(f"Job {job_id}: Analysis failed - {str(e)}")
        job_tracker[job_id]["status"] = "failed"
        job_tracker[job_id]["message"] = f"Analysis failed: {str(e)}"
        job_tracker[job_id]["error"] = str(e)

        # Audit the failure for HIPAA compliance
        security_manager.audit_logger.log_access(
            user_id=user_id,
            action="ANALYSIS_FAILED",
            resource=f"document:{job_id}",
            ip_address="internal",
            status="FAILED",
            details={"error": str(e)}
        )

        # Ensure the temporary PDF is securely removed even on failure
        if os.path.exists(file_path):
            data_encryption.secure_delete(file_path)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)

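# Illustrative client flow (a sketch, not invoked by the application): upload a PDF,
# poll the job status, then fetch the results. Assumes the service is reachable at
# http://localhost:7860, that `requests` is installed, and that /analyze expects a
# bearer token issued by /auth/login.
#
#   import time
#   import requests
#
#   base = "http://localhost:7860"
#   token = requests.post(
#       base + "/auth/login",
#       params={"email": "demo@example.com", "password": "demo"},
#   ).json()["access_token"]
#   headers = {"Authorization": f"Bearer {token}"}
#
#   with open("report.pdf", "rb") as fh:
#       job = requests.post(
#           base + "/analyze",
#           files={"file": ("report.pdf", fh, "application/pdf")},
#           headers=headers,
#       ).json()
#
#   while requests.get(f"{base}/status/{job['job_id']}").json()["status"] == "processing":
#       time.sleep(2)
#
#   print(requests.get(f"{base}/results/{job['job_id']}").json())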