diff --git "a/backend/app.py" "b/backend/app.py" --- "a/backend/app.py" +++ "b/backend/app.py" @@ -1,1337 +1,1337 @@ -import os -import shutil - -for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]: - shutil.rmtree(d, ignore_errors=True) - -os.environ["HF_HOME"] = "/tmp/huggingface" -os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface" -os.environ["TORCH_HOME"] = "/tmp/torch" -os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" -os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" - -import json -import importlib.util -import uuid -import datetime -import numpy as np -import torch -import cv2 -import joblib -import torch.nn as nn -import torchvision.transforms as transforms -import torchvision.models as models -from io import BytesIO -from PIL import Image as PILImage -from fastapi import FastAPI, File, UploadFile, Form -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse, FileResponse -import tensorflow as tf -from model_histo import BreastCancerClassifier -from fastapi.staticfiles import StaticFiles -import uvicorn -try: - from reportlab.lib.pagesizes import letter - from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage - from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle - from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY - from reportlab.lib.units import inch - from reportlab.lib.colors import navy, black - REPORTLAB_AVAILABLE = True -except ImportError: - REPORTLAB_AVAILABLE = False -from ultralytics import YOLO -from sklearn.preprocessing import MinMaxScaler -from model import MWT as create_model -from augmentations import Augmentations -from huggingface_hub import InferenceClient - -# ===================================================== - -# SETUP TEMP DIRS AND ENV - -# ===================================================== - -for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch"]: - shutil.rmtree(d, ignore_errors=True) - -os.environ["HF_HOME"] = "/tmp/huggingface" -os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface" -os.environ["TORCH_HOME"] = "/tmp/torch" -os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" -os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" - -# ===================================================== - -# HUGGING FACE CLIENT SETUP - -# ===================================================== - -HF_MODEL_ID = "BioMistral/BioMistral-7B" -hf_token = os.getenv("HF_TOKEN") -client = None - -if hf_token: - try: - client = InferenceClient(model=HF_MODEL_ID, token=hf_token) - print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}") - except Exception as e: - print("⚠️ Failed to initialize Hugging Face client:", e) -else: - print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.") - -def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence): - """Generate a brief medical interpretation using Mistral.""" - if not client: - return "⚠️ Hugging Face client not initialized — skipping summary." - - try: - prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation. - -Observed Cell Counts: -- {abnormal_cells} Abnormal Cells -- {normal_cells} Normal Cells - -Write a 2-3 sentence professional medical assessment focusing on: -1. Cell count analysis -2. Abnormality ratio ({abnormal_cells/(abnormal_cells + normal_cells)*100:.1f}%) -3. Clinical significance - -Use objective, scientific language suitable for a pathology report.""" - - # Use streaming to avoid StopIteration - response = client.text_generation( - prompt, - max_new_tokens=200, - temperature=0.7, - stream=False, - details=True, - stop_sequences=["\n\n", "###"] - ) - - # Handle different response formats - if hasattr(response, 'generated_text'): - return response.generated_text.strip() - elif isinstance(response, dict): - return response.get('generated_text', '').strip() - elif isinstance(response, str): - return response.strip() - - # Fallback summary if response format is unexpected - ratio = abnormal_cells / (abnormal_cells + normal_cells) * 100 if (abnormal_cells + normal_cells) > 0 else 0 - return f"Analysis shows {abnormal_cells} abnormal cells ({ratio:.1f}%) and {normal_cells} normal cells." - - except Exception as e: - # Provide a structured fallback summary instead of error message - total = abnormal_cells + normal_cells - if total == 0: - return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters." - - ratio = (abnormal_cells / total) * 100 - severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low" - - return f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) among {total} total cells, indicating {severity} abnormality ratio." - - -def generate_mwt_summary(predicted_label, confidences, avg_confidence): - """Generate a short MWT-specific interpretation using the HF client when available.""" - if not client: - return "⚠️ Hugging Face client not initialized — skipping AI interpretation." - - try: - prompt = f""" -You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence professional interpretation suitable for embedding in a diagnostic report. - -Result: -- Predicted label: {predicted_label} -- Class probabilities: {json.dumps(confidences)} - -Provide guidance on the significance of the result and any suggested next steps in plain, objective language. -""" - - response = client.text_generation( - prompt, - max_new_tokens=120, - temperature=0.2, - stream=False, - details=True, - stop_sequences=["\n\n", "###"] - ) - - if hasattr(response, 'generated_text'): - return response.generated_text.strip() - elif isinstance(response, dict): - return response.get('generated_text', '').strip() - elif isinstance(response, str): - return response.strip() - - return f"Result: {predicted_label}." - except Exception as e: - return f"Quantitative result: {predicted_label}." - - -def generate_cin_summary(predicted_grade, confidences, avg_confidence): - """Generate a short CIN-specific interpretation using the HF client when available.""" - if not client: - return "⚠️ Hugging Face client not initialized — skipping AI interpretation." - - try: - prompt = f""" -You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence professional interpretation suitable for a diagnostic report. - -Result: -- Predicted grade: {predicted_grade} -- Class probabilities: {json.dumps(confidences)} - -Provide a brief statement about clinical significance and suggested next steps (e.g., further colposcopic evaluation) in objective, clinical language. -""" - - response = client.text_generation( - prompt, - max_new_tokens=140, - temperature=0.2, - stream=False, - details=True, - stop_sequences=["\n\n", "###"] - ) - - if hasattr(response, 'generated_text'): - return response.generated_text.strip() - elif isinstance(response, dict): - return response.get('generated_text', '').strip() - elif isinstance(response, str): - return response.strip() - - return f"Result: {predicted_grade}." - except Exception: - return f"Quantitative result: {predicted_grade}." - - -# ===================================================== - -# FASTAPI SETUP - -# ===================================================== - -app = FastAPI(title="Pathora Medical Diagnostic API") - -app.add_middleware( - CORSMiddleware, - allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - expose_headers=["*"] # Allow access to response headers -) - -# Use /tmp for outputs in Hugging Face Spaces (writable directory) -OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs") -os.makedirs(OUTPUT_DIR, exist_ok=True) - -# Create image outputs dir -IMAGES_DIR = os.path.join(OUTPUT_DIR, "images") -os.makedirs(IMAGES_DIR, exist_ok=True) - -app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs") - -# Mount public sample images from frontend dist (Vite copies public/ to dist/ root) -# Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev) -FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist") -if not os.path.isdir(FRONTEND_DIST_CHECK): - FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist")) - -for sample_dir in ["cyto", "colpo", "histo"]: - sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir) - if os.path.isdir(sample_path): - app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir) - print(f"✅ Mounted /{sample_dir} from {sample_path}") - else: - print(f"⚠️ Sample directory not found: {sample_path}") - -# Mount other static assets (logos, banners) from dist root -for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]: - static_path = os.path.join(FRONTEND_DIST_CHECK, static_file) - if os.path.isfile(static_path): - print(f"✅ Static file available: /{static_file}") - -device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - -# ===================================================== - -# MODEL LOADS - -# ===================================================== - -print("🔹 Loading YOLO model...") -yolo_model = YOLO("best2.pt") - -print("🔹 Loading MWT model...") -mwt_model = create_model(num_classes=2).to(device) -mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device)) -mwt_model.eval() -mwt_class_names = ["Negative", "Positive"] - -print("🔹 Loading CIN model...") -try: - clf = joblib.load("logistic_regression_model.pkl") -except Exception as e: - print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}") - clf = None - -yolo_colposcopy = YOLO("yolo_colposcopy.pt") - -# Load the Manalife Pathora colposcopy fusion pipeline -COLPO_INFERENCE_PATH = os.path.join(os.path.dirname(__file__), "Colpo", "inference.py") -colpo_inference = None - -try: - spec = importlib.util.spec_from_file_location("colpo_inference", COLPO_INFERENCE_PATH) - if spec and spec.loader: - colpo_inference = importlib.util.module_from_spec(spec) - spec.loader.exec_module(colpo_inference) - print("✅ Loaded Manalife Pathora colposcopy inference module.") - else: - raise ImportError("Invalid import spec for Colpo inference module") -except Exception as e: - colpo_inference = None - print(f"⚠️ Could not load Manalife Pathora colposcopy inference: {e}") - -# ===================================================== - -# RESNET FEATURE EXTRACTORS FOR CIN - -# ===================================================== - -def build_resnet(model_name="resnet50"): - if model_name == "resnet50": - model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT) - elif model_name == "resnet101": - model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT) - elif model_name == "resnet152": - model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT) - model.eval().to(device) - return ( - nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool), - model.layer1, model.layer2, model.layer3, model.layer4, - ) -gap = nn.AdaptiveAvgPool2d((1, 1)) -gmp = nn.AdaptiveMaxPool2d((1, 1)) -resnet50_blocks = build_resnet("resnet50") -resnet101_blocks = build_resnet("resnet101") -resnet152_blocks = build_resnet("resnet152") - -transform = transforms.Compose([ -transforms.ToPILImage(), -transforms.Resize((224, 224)), -transforms.ToTensor(), -transforms.Normalize(mean=[0.485, 0.456, 0.406], -std=[0.229, 0.224, 0.225]), -]) - - -def preprocess_for_mwt(image_np): - img = cv2.resize(image_np, (224, 224)) - img = Augmentations.Normalization((0, 1))(img) - img = np.array(img, np.float32) - img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) - img = img.transpose(2, 0, 1) - img = np.expand_dims(img, axis=0) - return torch.Tensor(img) - -def extract_cbf_features(blocks, img_t): - block1, block2, block3, block4, block5 = blocks - with torch.no_grad(): - f1 = block1(img_t) - f2 = block2(f1) - f3 = block3(f2) - f4 = block4(f3) - f5 = block5(f4) - p1 = gmp(f1).view(-1) - p2 = gmp(f2).view(-1) - p3 = gap(f3).view(-1) - p4 = gap(f4).view(-1) - p5 = gap(f5).view(-1) - return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy() - -# ===================================================== -# Model 4: Histopathology Classifier (TensorFlow) -# ===================================================== -print("🔹 Attempting to load Breast Cancer Histopathology model...") - -try: - classifier = BreastCancerClassifier(fine_tune=False) - - # Safely handle Hugging Face token auth - hf_token = os.getenv("HF_TOKEN") - if hf_token: - if classifier.authenticate_huggingface(): - print("✅ Hugging Face authentication successful.") - else: - print("⚠️ Warning: Hugging Face authentication failed, using local model only.") - else: - print("⚠️ HF_TOKEN not found in environment — Path Foundation model cannot be loaded.") - print(" To use histopathology predictions, set the HF_TOKEN environment variable.") - - # Load Path Foundation model - print("Attempting to load Path Foundation model...") - if classifier.load_path_foundation(): - print("✅ Loaded Path Foundation base model.") - else: - print("⚠️ WARNING: Could not load Path Foundation base model.") - print(" Histopathology predictions will not work without this model.") - print(" Set HF_TOKEN environment variable to enable this feature.") - - # Load trained histopathology model - model_path = "histopathology_trained_model.keras" - if os.path.exists(model_path): - classifier.model = tf.keras.models.load_model(model_path) - print(f"✅ Loaded local histopathology model: {model_path}") - else: - print(f"⚠️ Model file not found: {model_path}") - print(" Histopathology predictions will not work without the trained weights.") - -except Exception as e: - classifier = None - print(f"❌ Error initializing histopathology model: {e}") - import traceback - traceback.print_exc() - -def predict_histopathology(image): - if classifier is None: - return {"error": "Histopathology model not available."} - - try: - if image.mode != "RGB": - image = image.convert("RGB") - image = image.resize((224, 224)) - img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0) - - # Check if path_foundation model is loaded - if classifier.path_foundation is None: - return {"error": "Histopathology prediction failed: Path Foundation model not loaded. Ensure HF_TOKEN is set and the model can be downloaded."} - - embeddings = classifier.extract_embeddings(img_array) - - # Check if model is loaded - if classifier.model is None: - return {"error": "Histopathology prediction failed: Classification model not loaded."} - - prediction_proba = classifier.model.predict(embeddings, verbose=0)[0] - predicted_class = int(np.argmax(prediction_proba)) - class_names = ["Benign", "Malignant"] - - # Return confidence as dictionary with both class probabilities (like MWT/CIN) - confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))} - avg_confidence = float(np.max(prediction_proba)) * 100 - - return { - "model_used": "Histopathology Classifier", - "prediction": class_names[predicted_class], - "confidence": confidences, - "summary": { - "ai_interpretation": f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue.", - }, - } - except Exception as e: - return {"error": f"Histopathology prediction failed: {e}"} - - -# ===================================================== - -# MAIN ENDPOINT - -# ===================================================== - - -@app.post("/predict/") -async def predict(model_name: str = Form(...), file: UploadFile = File(...)): - print(f"Received prediction request - model: {model_name}, file: {file.filename}") - - # Validate model name - if model_name not in ["yolo", "mwt", "cin", "histopathology", "manalife_pathora_model"]: - return JSONResponse( - content={ - "error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology, manalife_pathora_model" - }, - status_code=400 - ) - - # Validate and read file - if not file.filename: - return JSONResponse( - content={"error": "No file provided"}, - status_code=400 - ) - - contents = await file.read() - if len(contents) == 0: - return JSONResponse( - content={"error": "Empty file provided"}, - status_code=400 - ) - - # Attempt to open and validate image - try: - image = PILImage.open(BytesIO(contents)).convert("RGB") - image_np = np.array(image) - if image_np.size == 0: - raise ValueError("Empty image array") - print(f"Successfully loaded image, shape: {image_np.shape}") - except Exception as e: - return JSONResponse( - content={"error": f"Invalid image file: {str(e)}"}, - status_code=400 - ) - - if model_name == "yolo": - results = yolo_model(image) - detections_json = results[0].to_json() - detections = json.loads(detections_json) - - abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal") - normal_cells = sum(1 for d in detections if d["name"] == "normal") - avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0 - - ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence) - - output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg" - output_path = os.path.join(IMAGES_DIR, output_filename) - results[0].save(filename=output_path) - - return { - "model_used": "YOLO Detection", - "detections": detections, - "annotated_image_url": f"/outputs/images/{output_filename}", - "summary": { - "abnormal_cells": abnormal_cells, - "normal_cells": normal_cells, - "ai_interpretation": ai_summary, - }, - } - - elif model_name == "mwt": - tensor = preprocess_for_mwt(image_np) - with torch.no_grad(): - output = mwt_model(tensor.to(device)).cpu() - probs = torch.softmax(output, dim=1)[0] - confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)} - predicted_label = mwt_class_names[int(torch.argmax(probs).item())] - # Average / primary confidence for display - avg_confidence = float(torch.max(probs).item()) * 100 - - # Generate a brief AI interpretation using the Mistral client (if available) - ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence) - - return { - "model_used": "MWT Classifier", - "prediction": predicted_label, - "confidence": confidences, - "summary": { - "ai_interpretation": ai_interp, - }, - } - - elif model_name == "cin": - if clf is None: - return JSONResponse( - content={"error": "CIN classifier not available on server."}, - status_code=503, - ) - - # Decode uploaded image and run colposcopy detector - nparr = np.frombuffer(contents, np.uint8) - img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) - results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False) - if len(results[0].boxes) == 0: - return {"error": "No cervix detected"} - - x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy()) - crop = img[y1:y2, x1:x2] - crop = cv2.resize(crop, (224, 224)) - img_t = transform(crop).unsqueeze(0).to(device) - - # Extract features from multiple ResNet backbones - f50 = extract_cbf_features(resnet50_blocks, img_t) - f101 = extract_cbf_features(resnet101_blocks, img_t) - f152 = extract_cbf_features(resnet152_blocks, img_t) - features = np.concatenate([f50, f101, f152]).reshape(1, -1) - - # Scale and predict - X_scaled = MinMaxScaler().fit_transform(features) - - # Ensure classifier supports probability outputs - try: - proba = clf.predict_proba(X_scaled)[0] - except Exception as e: - return JSONResponse( - content={"error": "CIN classifier does not support probability estimates (predict_proba)."}, - status_code=503, - ) - - num_classes = int(len(proba)) - - # Handle different classifier output sizes: - # - If 3 classes: map directly to CIN1/CIN2/CIN3 - # - If 2 classes: apply a conservative heuristic to split High-grade into CIN2/CIN3 - if num_classes == 3: - classes = ["CIN1", "CIN2", "CIN3"] - confidences = {classes[i]: float(proba[i]) for i in range(3)} - predicted_idx = int(np.argmax(proba)) - predicted_label = classes[predicted_idx] - avg_confidence = float(np.max(proba)) * 100 - mapping_used = "direct_3class" - elif num_classes == 2: - # Binary model detected (e.g., Low-grade vs High-grade). We'll convert to CIN1/CIN2/CIN3 - # Heuristic: - # - CIN1 <- low_grade_prob - # - Split high_grade_prob into CIN2 and CIN3 based on how confident 'high' is. - # * If high <= 0.6 -> mostly CIN2 - # * If high >= 0.8 -> mostly CIN3 - # * Between 0.6 and 0.8 -> interpolate - low_prob = float(proba[0]) - high_prob = float(proba[1]) - - if high_prob <= 0.6: - cin3_factor = 0.0 - elif high_prob >= 0.8: - cin3_factor = 1.0 - else: - cin3_factor = (high_prob - 0.6) / 0.2 - - cin1 = low_prob - cin3 = high_prob * cin3_factor - cin2 = high_prob - cin3 - - confidences = {"CIN1": cin1, "CIN2": cin2, "CIN3": cin3} - # pick highest of the mapped three as primary prediction - predicted_label = max(confidences.items(), key=lambda x: x[1])[0] - avg_confidence = float(max(confidences.values())) * 100 - mapping_used = "binary_to_3class_heuristic" - else: - return JSONResponse( - content={ - "error": "CIN classifier must output 2-class (Low/High) or 3-class probabilities (CIN1, CIN2, CIN3).", - "detected_num_classes": num_classes, - }, - status_code=503, - ) - - # Generate AI interpretation using Mistral client (if available) - ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence) - - response = { - "model_used": "CIN Classifier", - "prediction": predicted_label, - "confidence": confidences, - "summary": { - "ai_interpretation": ai_interp, - }, - } - - # If we used the binary->3class heuristic, include a diagnostic field so callers know it was mapped - if 'mapping_used' in locals() and mapping_used == 'binary_to_3class_heuristic': - response["mapping_used"] = mapping_used - response["mapping_note"] = ( - "The server mapped a binary Low/High classifier to CIN1/CIN2/CIN3 using a heuristic split. " - "This is an approximation — for clinical use please supply a native 3-class model." - ) - - return response - elif model_name == "manalife_pathora_model": - if colpo_inference is None or not hasattr(colpo_inference, "run_inference"): - return JSONResponse( - content={"error": "Pathora colposcopy model not available on server."}, - status_code=503, - ) - - # Save the incoming image to disk for the fusion pipeline - input_name = f"colpo_input_{uuid.uuid4().hex[:8]}.png" - input_path = os.path.join(IMAGES_DIR, input_name) - with open(input_path, "wb") as f: - f.write(contents) - - try: - fusion_result = colpo_inference.run_inference(input_path) - except Exception as e: - return JSONResponse( - content={"error": f"Colposcopy fusion inference failed: {e}"}, - status_code=500, - ) - - prob_abn = float(fusion_result.get("probability_abnormal", 0.0)) - acet_present = int(fusion_result.get("acet_present", 0)) - decision_text = fusion_result.get("decision", "Decision unavailable") - - overlay_src = fusion_result.get("overlay_image") - overlay_url = None - if overlay_src and os.path.isfile(overlay_src): - overlay_name = f"colpo_overlay_{uuid.uuid4().hex[:8]}.png" - overlay_dst = os.path.join(IMAGES_DIR, overlay_name) - try: - shutil.copy(overlay_src, overlay_dst) - overlay_url = f"/outputs/images/{overlay_name}" - except Exception as copy_err: - print(f"⚠️ Failed to copy overlay image: {copy_err}") - - # Fallback: expose the raw input if overlay is missing - if not overlay_url: - overlay_url = f"/outputs/images/{input_name}" - - return { - "model_used": "Manalife_Pathora_model", - "decision": decision_text, - "probability_abnormal": round(prob_abn, 3), - "acet_present": acet_present, - "annotated_image_url": overlay_url, - "summary": { - "model_used": "Manalife_Pathora_model", - "decision": decision_text, - "probability_abnormal": round(prob_abn, 3), - "acet_present": "Yes" if acet_present else "No", - "ai_interpretation": decision_text, - }, - } - elif model_name == "histopathology": - result = predict_histopathology(image) - return result - - - else: - return JSONResponse(content={"error": "Invalid model name"}, status_code=400) - -# ===================================================== - -# ROUTES - -# ===================================================== - -def create_designed_pdf(pdf_path, report_data, analysis_summary_json, annotated_image_path=None): - doc = SimpleDocTemplate(pdf_path, pagesize=letter, - rightMargin=72, leftMargin=72, - topMargin=72, bottomMargin=18) - styles = getSampleStyleSheet() - story = [] - - styles.add(ParagraphStyle(name='Title', fontSize=20, fontName='Helvetica-Bold', alignment=TA_CENTER, textColor=navy)) - styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold', spaceBefore=10, spaceAfter=6)) - styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12)) - styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold', textColor=navy, spaceBefore=6, spaceAfter=4)) - - patient = report_data['patient'] - analysis = report_data.get('analysis', {}) - - # Safely parse analysis_summary_json - try: - ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {} - except (json.JSONDecodeError, TypeError): - ai_summary = {} - - # Determine report type based on model used - model_used = ai_summary.get('model_used', '') - if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower(): - report_type = "CYTOLOGY" - report_title = "Cytology Report" - elif 'MWT' in model_used or 'mwt' in str(model_used).lower(): - # MWT is a cytology classifier; use a clearer report title for MWT results - report_type = "CYTOLOGY" - report_title = "Cytology Analysis Report" - elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower(): - report_type = "COLPOSCOPY" - report_title = "Colposcopy Report" - elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower(): - report_type = "HISTOPATHOLOGY" - report_title = "Histopathology Report" - else: - report_type = "CYTOLOGY" - report_title = "Medical Analysis Report" - - # Header - story.append(Paragraph("MANALIFE AI", styles['Title'])) - story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall'])) - story.append(Spacer(1, 0.3*inch)) - story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading'])) - story.append(Paragraph(report_title, styles['Section'])) - story.append(Spacer(1, 0.2*inch)) - - # Report ID and Date - story.append(Paragraph(f"Report ID: {report_data.get('report_id', 'N/A')}", styles['NormalSmall'])) - story.append(Paragraph(f"Generated: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall'])) - story.append(Spacer(1, 0.2*inch)) - - # Patient Information Section - story.append(Paragraph("Patient Information", styles['Section'])) - story.append(Paragraph(f"Patient ID: {patient.get('id', 'N/A')}", styles['NormalSmall'])) - story.append(Paragraph(f"Exam Date: {patient.get('exam_date', 'N/A')}", styles['NormalSmall'])) - story.append(Paragraph(f"Physician: {patient.get('physician', 'N/A')}", styles['NormalSmall'])) - story.append(Paragraph(f"Facility: {patient.get('facility', 'N/A')}", styles['NormalSmall'])) - story.append(Spacer(1, 0.2*inch)) - - # Sample Information Section - story.append(Paragraph("Sample Information", styles['Section'])) - story.append(Paragraph(f"Specimen Type: {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall'])) - story.append(Paragraph(f"Clinical History: {patient.get('clinical_history', 'N/A')}", styles['NormalSmall'])) - story.append(Spacer(1, 0.2*inch)) - - # AI Analysis Section - story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section'])) - story.append(Paragraph("System: Manalife AI System — Automated Analysis", styles['NormalSmall'])) - - # Add metrics based on report type - if report_type == "HISTOPATHOLOGY": - # For histopathology, show Benign/Malignant confidence - confidence_dict = ai_summary.get('confidence', {}) - if isinstance(confidence_dict, dict): - benign_conf = confidence_dict.get('Benign', 0) * 100 - malignant_conf = confidence_dict.get('Malignant', 0) * 100 - story.append(Paragraph(f"Benign Confidence: {benign_conf:.2f}%", styles['NormalSmall'])) - story.append(Paragraph(f"Malignant Confidence: {malignant_conf:.2f}%", styles['NormalSmall'])) - elif report_type == "CYTOLOGY": - # For cytology and MWT, show class confidences if available, otherwise show abnormal/normal cells - confidence_dict = ai_summary.get('confidence', {}) - if isinstance(confidence_dict, dict) and confidence_dict: - for cls, val in confidence_dict.items(): - conf_pct = val * 100 if isinstance(val, (int, float)) else 0 - story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall'])) - else: - if 'abnormal_cells' in ai_summary: - story.append(Paragraph(f"Abnormal Cells: {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall'])) - if 'normal_cells' in ai_summary: - story.append(Paragraph(f"Normal Cells: {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall'])) - else: - # For CIN/Colposcopy, show class confidences - confidence_dict = ai_summary.get('confidence', {}) - if isinstance(confidence_dict, dict): - for cls, val in confidence_dict.items(): - conf_pct = val * 100 if isinstance(val, (int, float)) else 0 - story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall'])) - - story.append(Spacer(1, 0.1*inch)) - story.append(Paragraph("AI Interpretation:", styles['NormalSmall'])) - story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall'])) - story.append(Spacer(1, 0.2*inch)) - - # If an annotated image path was provided and exists on disk, embed it - if annotated_image_path: - try: - if os.path.isfile(annotated_image_path): - story.append(Spacer(1, 0.1*inch)) - # Determine image pixel size and scale to a reasonable width for PDF - try: - from PIL import Image as PILImageLocal - with PILImageLocal.open(annotated_image_path) as im: - img_w, img_h = im.size - except Exception: - img_w, img_h = (800, 600) - - # Display width in points (ReportLab uses points; 1 inch = 72 points). Assume 96 DPI for pixel->inch. - display_width_px = max(300, min(img_w, 800)) - width_points = min(5 * inch, (display_width_px / 96.0) * inch) - - img = ReportLabImage(annotated_image_path, width=width_points, kind='proportional') - story.append(img) - story.append(Spacer(1, 0.2*inch)) - except Exception as e: - # Don't fail the whole PDF creation if image embedding fails - print(f"⚠️ Could not embed annotated image in PDF: {e}") - - # Doctor's Notes - story.append(Paragraph("Doctor's Notes", styles['Section'])) - story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall'])) - story.append(Spacer(1, 0.2*inch)) - - # Recommendations - story.append(Paragraph("RECOMMENDATIONS", styles['Section'])) - story.append(Paragraph("Continue routine screening as per standard guidelines. Follow up as directed by your physician.", styles['NormalSmall'])) - story.append(Spacer(1, 0.3*inch)) - - # Signatures - story.append(Paragraph("Signatures", styles['Section'])) - story.append(Paragraph("Rajesh Venugopal, Physician", styles['NormalSmall'])) - #story.append(Paragraph("", styles['NormalSmall'])) - story.append(Spacer(1, 0.1*inch)) - story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall'])) - - doc.build(story) - - - -@app.post("/reports/") -async def generate_report( - patient_id: str = Form(...), - exam_date: str = Form(...), - metadata: str = Form(...), - notes: str = Form(None), - analysis_id: str = Form(None), - analysis_summary: str = Form(None), - file: UploadFile = File(None), -): - """Generate a structured medical report from analysis results and metadata.""" - try: - # Create reports directory if it doesn't exist - reports_dir = os.path.join(OUTPUT_DIR, "reports") - os.makedirs(reports_dir, exist_ok=True) - - # Generate unique report ID - report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}" - report_dir = os.path.join(reports_dir, report_id) - os.makedirs(report_dir, exist_ok=True) - - # Parse metadata - metadata_dict = json.loads(metadata) - - # Get analysis results - assuming stored in memory or retrievable - # TODO: Implement analysis results storage/retrieval - - # Construct report data - report_data = { - "report_id": report_id, - "generated_at": datetime.datetime.now().isoformat(), - "patient": { - "id": patient_id, - "exam_date": exam_date, - **metadata_dict - }, - "analysis": { - "id": analysis_id, - # If the analysis_id is actually an annotated image URL, store it for report embedding - "annotated_image_url": analysis_id, - # TODO: Add actual analysis results - }, - "doctor_notes": notes - } - - # Save report data - report_json = os.path.join(report_dir, "report.json") - with open(report_json, "w", encoding="utf-8") as f: - json.dump(report_data, f, indent=2, ensure_ascii=False) - - # We'll create PDF later (after parsing analysis_summary and resolving - # any annotated/input image). Initialize pdf_url here. - pdf_url = None - - # Parse analysis_summary to get AI results - try: - ai_summary = json.loads(analysis_summary) if analysis_summary else {} - except (json.JSONDecodeError, TypeError): - ai_summary = {} - - # Resolve annotated image: prefer AI/analysis annotated image; if none, - # save the uploaded input image (if provided) into the report folder - # and use that as the embedded image. - annotated_img = ai_summary.get('annotated_image_url') or report_data.get("analysis", {}).get("annotated_image_url") or "" - annotated_img_full = "" - annotated_img_local = None - - if annotated_img: - # If it's an outputs path served by StaticFiles, map to local file - if isinstance(annotated_img, str) and annotated_img.startswith('/outputs/'): - rel = annotated_img[len('/outputs/'):].lstrip('/') - annotated_img_local = os.path.join(OUTPUT_DIR, rel) - annotated_img_full = annotated_img - else: - # keep absolute URLs as-is for HTML - annotated_img_full = annotated_img if isinstance(annotated_img, str) else '' - - # If no annotated image provided, but an input file was uploaded, save it - if not annotated_img_full and file is not None and getattr(file, 'filename', None): - try: - input_filename = f"input_image{os.path.splitext(file.filename)[1] or '.jpg'}" - input_path = os.path.join(report_dir, input_filename) - contents = await file.read() - with open(input_path, 'wb') as out_f: - out_f.write(contents) - annotated_img_full = f"/outputs/reports/{report_id}/{input_filename}" - annotated_img_local = input_path - except Exception as e: - print(f"⚠️ Failed to save uploaded input image for report: {e}") - - # Ensure annotated_img_full has a leading slash if it's a relative path - if annotated_img_full and not annotated_img_full.startswith(('http://', 'https://')): - annotated_img_full = annotated_img_full if annotated_img_full.startswith('/') else '/' + annotated_img_full - - # If we have a local annotated image but it's stored in the shared images folder - # (e.g. /outputs/images/...), copy it into this report's folder so the HTML/PDF - # can reference the image relative to the report directory. This also makes the - # image visible when opening report.html directly from disk (file://). - try: - if annotated_img_local: - annotated_img_local_abs = os.path.abspath(annotated_img_local) - report_dir_abs = os.path.abspath(report_dir) - # If the image is not already in the report directory, copy it there - if not os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs: - src_basename = os.path.basename(annotated_img_local_abs) - dest_name = f"annotated_{src_basename}" - dest_path = os.path.join(report_dir, dest_name) - try: - shutil.copy2(annotated_img_local_abs, dest_path) - annotated_img_local = dest_path - annotated_img_full = f"/outputs/reports/{report_id}/{dest_name}" - except Exception as e: - # If copy fails, keep using the original annotated_img_full (may be served by StaticFiles) - print(f"⚠️ Failed to copy annotated image into report folder: {e}") - except Exception: - pass - - # Now attempt to create the PDF (passing the local annotated image path - # so the PDF writer can embed it). If annotation is remote or not - # available, PDF creation will still proceed without the image. - if REPORTLAB_AVAILABLE: - try: - pdf_path = os.path.join(report_dir, "report.pdf") - create_designed_pdf(pdf_path, report_data, analysis_summary, annotated_img_local) - pdf_url = f"/outputs/reports/{report_id}/report.pdf" - except Exception as e: - print(f"Error creating designed PDF: {e}") - pdf_url = None - - # Determine report type based on analysis summary or model used - model_used = ai_summary.get('model_used', '') - if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower(): - report_type = "Cytology" - report_title = "Cytology Report" - elif 'MWT' in model_used or 'mwt' in str(model_used).lower() or 'mwt' in str(analysis_id).lower(): - # MWT is a cytology classifier — use clearer title - report_type = "Cytology" - report_title = "Cytology Analysis Report" - elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower(): - report_type = "Colposcopy" - report_title = "Colposcopy Report" - elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower(): - report_type = "Histopathology" - report_title = "Histopathology Report" - else: - # Default fallback - report_type = "Cytology" - report_title = "Medical Analysis Report" - - # Build analysis metrics HTML based on report type - if report_type == "Histopathology": - # For histopathology, show Benign/Malignant confidence from the confidence dict - confidence_dict = ai_summary.get('confidence', {}) - benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0 - malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0 - - analysis_metrics_html = f""" - SystemManalife AI System — Automated Analysis - Benign Confidence{benign_conf:.2f}% - Malignant Confidence{malignant_conf:.2f}% - """ - elif report_type == "Cytology": - # For cytology (YOLO) or MWT, show class confidences if provided, else abnormal/normal counts - confidence_dict = ai_summary.get('confidence', {}) - if isinstance(confidence_dict, dict) and confidence_dict: - confidence_rows = "" - for cls, val in confidence_dict.items(): - conf_pct = val * 100 if isinstance(val, (int, float)) else 0 - confidence_rows += f"{cls} Confidence{conf_pct:.2f}%\n " - analysis_metrics_html = f""" - SystemManalife AI System — Automated Analysis - {confidence_rows} - """ - else: - analysis_metrics_html = f""" - SystemManalife AI System — Automated Analysis - Abnormal Cells{ai_summary.get('abnormal_cells', 'N/A')} - Normal Cells{ai_summary.get('normal_cells', 'N/A')} - """ - else: - # For CIN/Colposcopy or other models, show generic confidence - confidence_dict = ai_summary.get('confidence', {}) - confidence_rows = "" - if isinstance(confidence_dict, dict): - for cls, val in confidence_dict.items(): - conf_pct = val * 100 if isinstance(val, (int, float)) else 0 - confidence_rows += f"{cls} Confidence{conf_pct:.2f}%\n " - - analysis_metrics_html = f""" - SystemManalife AI System — Automated Analysis - {confidence_rows} - """ - - # Build final HTML including download links and embedded annotated image - report_html = os.path.join(report_dir, "report.html") - json_url = f"/outputs/reports/{report_id}/report.json" - html_url = f"/outputs/reports/{report_id}/report.html" - - # annotated_img_full was computed earlier; ensure it's defined and set - # annotated_img (used by the HTML template conditional) accordingly. - if 'annotated_img_full' not in locals() or not annotated_img_full: - annotated_img_full = '' - annotated_img = annotated_img_full - - # Compute a display path for the HTML. Prefer a relative filename when the - # annotated image is copied into the same report folder. This makes the - # HTML work when opened directly from disk (or via HF file viewer). - annotated_img_display = annotated_img_full - try: - if annotated_img_local: - annotated_img_local_abs = os.path.abspath(annotated_img_local) - report_dir_abs = os.path.abspath(report_dir) - # If the annotated image resides in the report folder, reference by basename - if os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs: - annotated_img_display = os.path.basename(annotated_img_local_abs) - else: - # If annotated image is inside the outputs/reports// path but not same - # absolute path (edge cases), make it relative to the report dir - prefix = f"/outputs/reports/{report_id}/" - if isinstance(annotated_img_full, str) and annotated_img_full.startswith(prefix): - annotated_img_display = annotated_img_full[len(prefix):] - except Exception: - annotated_img_display = annotated_img_full - - download_pdf_btn = f'' if pdf_url else '' - - # Format generated time - generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p') - - html_content = f""" - - - - - {report_title} — Manalife AI - - - -
-
-
- - Manalife Logo -
-
-

MANALIFE AI — Medical Analysis

-
Advanced cytological colposcopy and histopathology reporting
-
contact@manalife.ai • +1 (555) 123-4567
-
-
- -
-
-
-
MEDICAL ANALYSIS REPORT OF {report_type.upper()}
-

{report_title}

-
-
-
Report ID: {report_id}
-
Generated: {generated_time}
-
-
- -
- -
-
-
Patient Information
- - - - - -
Patient ID{patient_id}
Exam Date{exam_date}
Physician{metadata_dict.get('physician', 'N/A')}
Facility{metadata_dict.get('facility', 'N/A')}
-
- -
-
Sample Information
- - - - - -
Specimen Type{metadata_dict.get('specimen_type', 'N/A')}
Clinical History{metadata_dict.get('clinical_history', 'N/A')}
Collected{exam_date}
Reported{generated_time}
-
- -
-
AI-Assisted Analysis
- - {analysis_metrics_html} -
-
-
AI Interpretation:
-
{ai_summary.get('ai_interpretation', 'No AI interpretation available.')}
-
-
- - {'
Annotated Analysis Image
Annotated Analysis Result
' if annotated_img else ''} - -
-
Doctor\'s Notes
-

{notes or 'No additional notes provided.'}

-
- -
-
Recommendations
-

Continue routine screening as per standard guidelines. Follow up as directed by your physician.

-
- -
-
Signatures
-
-
-
Rajesh Venugopal
-
Physician
-
-
-
-
- - -
- -
- {download_pdf_btn} - -
-
- -""" - - with open(report_html, "w", encoding="utf-8") as f: - f.write(html_content) - - # Update report.json to include the resolved annotated image url so callers can find it - try: - report_data['analysis']['annotated_image_url'] = annotated_img_full or '' - with open(report_json, 'w', encoding='utf-8') as f: - json.dump(report_data, f, indent=2, ensure_ascii=False) - except Exception as e: - print(f"⚠️ Failed to update report.json with annotated image url: {e}") - - return { - "report_id": report_id, - "json_url": json_url, - "html_url": html_url, - "pdf_url": pdf_url, - } - - except Exception as e: - return JSONResponse( - content={"error": f"Failed to generate report: {str(e)}"}, - status_code=500 - ) - -@app.get("/reports/{report_id}") -async def get_report(report_id: str): - """Fetch a generated report by ID.""" - report_dir = os.path.join(OUTPUT_DIR, "reports", report_id) - report_json = os.path.join(report_dir, "report.json") - - if not os.path.exists(report_json): - return JSONResponse( - content={"error": "Report not found"}, - status_code=404 - ) - - with open(report_json, "r") as f: - report_data = json.load(f) - - return report_data - -@app.get("/reports") -async def list_reports(patient_id: str = None): - """List all generated reports, optionally filtered by patient ID.""" - reports_dir = os.path.join(OUTPUT_DIR, "reports") - if not os.path.exists(reports_dir): - return {"reports": []} - - reports = [] - for report_id in os.listdir(reports_dir): - report_json = os.path.join(reports_dir, report_id, "report.json") - if os.path.exists(report_json): - with open(report_json, "r") as f: - report_data = json.load(f) - if not patient_id or report_data["patient"]["id"] == patient_id: - reports.append({ - "report_id": report_id, - "patient_id": report_data["patient"]["id"], - "exam_date": report_data["patient"]["exam_date"], - "generated_at": report_data["generated_at"] - }) - - return {"reports": sorted(reports, key=lambda r: r["generated_at"], reverse=True)} - -@app.get("/models") -def get_models(): - return {"available_models": ["yolo", "mwt", "cin", "histopathology"]} - -@app.get("/health") -def health(): - return {"message": "Pathora Medical Diagnostic API is running!"} - -# FRONTEND - -# ===================================================== - - -# Serve frontend only if it has been built; avoid startup failure when dist/ is missing. -FRONTEND_DIST = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist")) - -# Check if frontend/dist exists in /app (Docker), otherwise check relative to script location -if not os.path.isdir(FRONTEND_DIST): - # Fallback for Docker: frontend is copied to ./frontend/dist during build - FRONTEND_DIST = os.path.join(os.path.dirname(__file__), "frontend/dist") - -ASSETS_DIR = os.path.join(FRONTEND_DIST, "assets") - -if os.path.isdir(ASSETS_DIR): - app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets") -else: - print("ℹ️ Frontend assets directory not found — skipping /assets mount.") - -@app.get("/") -async def serve_frontend(): - index_path = os.path.join(FRONTEND_DIST, "index.html") - if os.path.isfile(index_path): - return FileResponse(index_path) - return JSONResponse({"message": "Backend is running. Frontend build not found."}) - -@app.get("/{file_path:path}") -async def serve_static_files(file_path: str): - """Serve static files from frontend dist (images, logos, etc.)""" - # Skip API routes - if file_path.startswith(("predict", "reports", "models", "health", "outputs", "assets", "cyto", "colpo", "histo")): - return JSONResponse({"error": "Not found"}, status_code=404) - - # Try to serve file from dist root - static_file = os.path.join(FRONTEND_DIST, file_path) - if os.path.isfile(static_file): - return FileResponse(static_file) - - # Fallback to index.html for client-side routing - index_path = os.path.join(FRONTEND_DIST, "index.html") - if os.path.isfile(index_path): - return FileResponse(index_path) - - return JSONResponse({"error": "Not found"}, status_code=404) - -if __name__ == "__main__": - # Use PORT provided by the environment (Hugging Face Spaces sets PORT=7860) - port = int(os.environ.get("PORT", 7860)) - uvicorn.run(app, host="0.0.0.0", port=port) +import os +import shutil + +for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]: + shutil.rmtree(d, ignore_errors=True) + +os.environ["HF_HOME"] = "/tmp/huggingface" +os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface" +os.environ["TORCH_HOME"] = "/tmp/torch" +os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" +os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" + +import json +import importlib.util +import uuid +import datetime +import numpy as np +import torch +import cv2 +import joblib +import torch.nn as nn +import torchvision.transforms as transforms +import torchvision.models as models +from io import BytesIO +from PIL import Image as PILImage +from fastapi import FastAPI, File, UploadFile, Form +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, FileResponse +import tensorflow as tf +from model_histo import BreastCancerClassifier +from fastapi.staticfiles import StaticFiles +import uvicorn +try: + from reportlab.lib.pagesizes import letter + from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage + from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle + from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY + from reportlab.lib.units import inch + from reportlab.lib.colors import navy, black + REPORTLAB_AVAILABLE = True +except ImportError: + REPORTLAB_AVAILABLE = False +from ultralytics import YOLO +from sklearn.preprocessing import MinMaxScaler +from model import MWT as create_model +from augmentations import Augmentations +from huggingface_hub import InferenceClient + +# ===================================================== + +# SETUP TEMP DIRS AND ENV + +# ===================================================== + +for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch"]: + shutil.rmtree(d, ignore_errors=True) + +os.environ["HF_HOME"] = "/tmp/huggingface" +os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface" +os.environ["TORCH_HOME"] = "/tmp/torch" +os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" +os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" + +# ===================================================== + +# HUGGING FACE CLIENT SETUP + +# ===================================================== + +HF_MODEL_ID = "BioMistral/BioMistral-7B" +hf_token = os.getenv("HF_TOKEN") +client = None + +if hf_token: + try: + client = InferenceClient(model=HF_MODEL_ID, token=hf_token) + print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}") + except Exception as e: + print("⚠️ Failed to initialize Hugging Face client:", e) +else: + print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.") + +def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence): + """Generate a brief medical interpretation using Mistral.""" + if not client: + return "⚠️ Hugging Face client not initialized — skipping summary." + + try: + prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation. + +Observed Cell Counts: +- {abnormal_cells} Abnormal Cells +- {normal_cells} Normal Cells + +Write a 2-3 sentence professional medical assessment focusing on: +1. Cell count analysis +2. Abnormality ratio ({abnormal_cells/(abnormal_cells + normal_cells)*100:.1f}%) +3. Clinical significance + +Use objective, scientific language suitable for a pathology report.""" + + # Use streaming to avoid StopIteration + response = client.text_generation( + prompt, + max_new_tokens=200, + temperature=0.7, + stream=False, + details=True, + stop_sequences=["\n\n", "###"] + ) + + # Handle different response formats + if hasattr(response, 'generated_text'): + return response.generated_text.strip() + elif isinstance(response, dict): + return response.get('generated_text', '').strip() + elif isinstance(response, str): + return response.strip() + + # Fallback summary if response format is unexpected + ratio = abnormal_cells / (abnormal_cells + normal_cells) * 100 if (abnormal_cells + normal_cells) > 0 else 0 + return f"Analysis shows {abnormal_cells} abnormal cells ({ratio:.1f}%) and {normal_cells} normal cells." + + except Exception as e: + # Provide a structured fallback summary instead of error message + total = abnormal_cells + normal_cells + if total == 0: + return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters." + + ratio = (abnormal_cells / total) * 100 + severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low" + + return f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) among {total} total cells, indicating {severity} abnormality ratio." + + +def generate_mwt_summary(predicted_label, confidences, avg_confidence): + """Generate a short MWT-specific interpretation using the HF client when available.""" + if not client: + return "⚠️ Hugging Face client not initialized — skipping AI interpretation." + + try: + prompt = f""" +You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence professional interpretation suitable for embedding in a diagnostic report. + +Result: +- Predicted label: {predicted_label} +- Class probabilities: {json.dumps(confidences)} + +Provide guidance on the significance of the result and any suggested next steps in plain, objective language. +""" + + response = client.text_generation( + prompt, + max_new_tokens=120, + temperature=0.2, + stream=False, + details=True, + stop_sequences=["\n\n", "###"] + ) + + if hasattr(response, 'generated_text'): + return response.generated_text.strip() + elif isinstance(response, dict): + return response.get('generated_text', '').strip() + elif isinstance(response, str): + return response.strip() + + return f"Result: {predicted_label}." + except Exception as e: + return f"Quantitative result: {predicted_label}." + + +def generate_cin_summary(predicted_grade, confidences, avg_confidence): + """Generate a short CIN-specific interpretation using the HF client when available.""" + if not client: + return "⚠️ Hugging Face client not initialized — skipping AI interpretation." + + try: + prompt = f""" +You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence professional interpretation suitable for a diagnostic report. + +Result: +- Predicted grade: {predicted_grade} +- Class probabilities: {json.dumps(confidences)} + +Provide a brief statement about clinical significance and suggested next steps (e.g., further colposcopic evaluation) in objective, clinical language. +""" + + response = client.text_generation( + prompt, + max_new_tokens=140, + temperature=0.2, + stream=False, + details=True, + stop_sequences=["\n\n", "###"] + ) + + if hasattr(response, 'generated_text'): + return response.generated_text.strip() + elif isinstance(response, dict): + return response.get('generated_text', '').strip() + elif isinstance(response, str): + return response.strip() + + return f"Result: {predicted_grade}." + except Exception: + return f"Quantitative result: {predicted_grade}." + + +# ===================================================== + +# FASTAPI SETUP + +# ===================================================== + +app = FastAPI(title="Pathora Medical Diagnostic API") + +app.add_middleware( + CORSMiddleware, + allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + expose_headers=["*"] # Allow access to response headers +) + +# Use /tmp for outputs in Hugging Face Spaces (writable directory) +OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs") +os.makedirs(OUTPUT_DIR, exist_ok=True) + +# Create image outputs dir +IMAGES_DIR = os.path.join(OUTPUT_DIR, "images") +os.makedirs(IMAGES_DIR, exist_ok=True) + +app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs") + +# Mount public sample images from frontend dist (Vite copies public/ to dist/ root) +# Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev) +FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist") +if not os.path.isdir(FRONTEND_DIST_CHECK): + FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist")) + +for sample_dir in ["cyto", "colpo", "histo"]: + sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir) + if os.path.isdir(sample_path): + app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir) + print(f"✅ Mounted /{sample_dir} from {sample_path}") + else: + print(f"⚠️ Sample directory not found: {sample_path}") + +# Mount other static assets (logos, banners) from dist root +for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]: + static_path = os.path.join(FRONTEND_DIST_CHECK, static_file) + if os.path.isfile(static_path): + print(f"✅ Static file available: /{static_file}") + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + +# ===================================================== + +# MODEL LOADS + +# ===================================================== + +print("🔹 Loading YOLO model...") +yolo_model = YOLO("best2.pt") + +print("🔹 Loading MWT model...") +mwt_model = create_model(num_classes=2).to(device) +mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device)) +mwt_model.eval() +mwt_class_names = ["Negative", "Positive"] + +print("🔹 Loading CIN model...") +try: + clf = joblib.load("logistic_regression_model.pkl") +except Exception as e: + print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}") + clf = None + +yolo_colposcopy = YOLO("yolo_colposcopy.pt") + +# Load the Manalife Pathora colposcopy fusion pipeline +COLPO_INFERENCE_PATH = os.path.join(os.path.dirname(__file__), "Colpo", "inference.py") +colpo_inference = None + +try: + spec = importlib.util.spec_from_file_location("colpo_inference", COLPO_INFERENCE_PATH) + if spec and spec.loader: + colpo_inference = importlib.util.module_from_spec(spec) + spec.loader.exec_module(colpo_inference) + print("✅ Loaded Manalife Pathora colposcopy inference module.") + else: + raise ImportError("Invalid import spec for Colpo inference module") +except Exception as e: + colpo_inference = None + print(f"⚠️ Could not load Manalife Pathora colposcopy inference: {e}") + +# ===================================================== + +# RESNET FEATURE EXTRACTORS FOR CIN + +# ===================================================== + +def build_resnet(model_name="resnet50"): + if model_name == "resnet50": + model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT) + elif model_name == "resnet101": + model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT) + elif model_name == "resnet152": + model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT) + model.eval().to(device) + return ( + nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool), + model.layer1, model.layer2, model.layer3, model.layer4, + ) +gap = nn.AdaptiveAvgPool2d((1, 1)) +gmp = nn.AdaptiveMaxPool2d((1, 1)) +resnet50_blocks = build_resnet("resnet50") +resnet101_blocks = build_resnet("resnet101") +resnet152_blocks = build_resnet("resnet152") + +transform = transforms.Compose([ +transforms.ToPILImage(), +transforms.Resize((224, 224)), +transforms.ToTensor(), +transforms.Normalize(mean=[0.485, 0.456, 0.406], +std=[0.229, 0.224, 0.225]), +]) + + +def preprocess_for_mwt(image_np): + img = cv2.resize(image_np, (224, 224)) + img = Augmentations.Normalization((0, 1))(img) + img = np.array(img, np.float32) + img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) + img = img.transpose(2, 0, 1) + img = np.expand_dims(img, axis=0) + return torch.Tensor(img) + +def extract_cbf_features(blocks, img_t): + block1, block2, block3, block4, block5 = blocks + with torch.no_grad(): + f1 = block1(img_t) + f2 = block2(f1) + f3 = block3(f2) + f4 = block4(f3) + f5 = block5(f4) + p1 = gmp(f1).view(-1) + p2 = gmp(f2).view(-1) + p3 = gap(f3).view(-1) + p4 = gap(f4).view(-1) + p5 = gap(f5).view(-1) + return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy() + +# ===================================================== +# Model 4: Histopathology Classifier (TensorFlow) +# ===================================================== +print("🔹 Attempting to load Breast Cancer Histopathology model...") + +try: + classifier = BreastCancerClassifier(fine_tune=False) + + # Safely handle Hugging Face token auth + hf_token = os.getenv("HF_TOKEN") + if hf_token: + if classifier.authenticate_huggingface(): + print("✅ Hugging Face authentication successful.") + else: + print("⚠️ Warning: Hugging Face authentication failed, using local model only.") + else: + print("⚠️ HF_TOKEN not found in environment — Path Foundation model cannot be loaded.") + print(" To use histopathology predictions, set the HF_TOKEN environment variable.") + + # Load Path Foundation model + print("Attempting to load Path Foundation model...") + if classifier.load_path_foundation(): + print("✅ Loaded Path Foundation base model.") + else: + print("⚠️ WARNING: Could not load Path Foundation base model.") + print(" Histopathology predictions will not work without this model.") + print(" Set HF_TOKEN environment variable to enable this feature.") + + # Load trained histopathology model + model_path = "histopathology_trained_model.keras" + if os.path.exists(model_path): + classifier.model = tf.keras.models.load_model(model_path) + print(f"✅ Loaded local histopathology model: {model_path}") + else: + print(f"⚠️ Model file not found: {model_path}") + print(" Histopathology predictions will not work without the trained weights.") + +except Exception as e: + classifier = None + print(f"❌ Error initializing histopathology model: {e}") + import traceback + traceback.print_exc() + +def predict_histopathology(image): + if classifier is None: + return {"error": "Histopathology model not available."} + + try: + if image.mode != "RGB": + image = image.convert("RGB") + image = image.resize((224, 224)) + img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0) + + # Check if path_foundation model is loaded + if classifier.path_foundation is None: + return {"error": "Histopathology prediction failed: Path Foundation model not loaded. Ensure HF_TOKEN is set and the model can be downloaded."} + + embeddings = classifier.extract_embeddings(img_array) + + # Check if model is loaded + if classifier.model is None: + return {"error": "Histopathology prediction failed: Classification model not loaded."} + + prediction_proba = classifier.model.predict(embeddings, verbose=0)[0] + predicted_class = int(np.argmax(prediction_proba)) + class_names = ["Benign", "Malignant"] + + # Return confidence as dictionary with both class probabilities (like MWT/CIN) + confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))} + avg_confidence = float(np.max(prediction_proba)) * 100 + + return { + "model_used": "Histopathology Classifier", + "prediction": class_names[predicted_class], + "confidence": confidences, + "summary": { + "ai_interpretation": f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue.", + }, + } + except Exception as e: + return {"error": f"Histopathology prediction failed: {e}"} + + +# ===================================================== + +# MAIN ENDPOINT + +# ===================================================== + + +@app.post("/predict/") +async def predict(model_name: str = Form(...), file: UploadFile = File(...)): + print(f"Received prediction request - model: {model_name}, file: {file.filename}") + + # Validate model name + if model_name not in ["yolo", "mwt", "cin", "histopathology", "manalife_pathora_model"]: + return JSONResponse( + content={ + "error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology, manalife_pathora_model" + }, + status_code=400 + ) + + # Validate and read file + if not file.filename: + return JSONResponse( + content={"error": "No file provided"}, + status_code=400 + ) + + contents = await file.read() + if len(contents) == 0: + return JSONResponse( + content={"error": "Empty file provided"}, + status_code=400 + ) + + # Attempt to open and validate image + try: + image = PILImage.open(BytesIO(contents)).convert("RGB") + image_np = np.array(image) + if image_np.size == 0: + raise ValueError("Empty image array") + print(f"Successfully loaded image, shape: {image_np.shape}") + except Exception as e: + return JSONResponse( + content={"error": f"Invalid image file: {str(e)}"}, + status_code=400 + ) + + if model_name == "yolo": + results = yolo_model(image) + detections_json = results[0].to_json() + detections = json.loads(detections_json) + + abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal") + normal_cells = sum(1 for d in detections if d["name"] == "normal") + avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0 + + ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence) + + output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg" + output_path = os.path.join(IMAGES_DIR, output_filename) + results[0].save(filename=output_path) + + return { + "model_used": "YOLO Detection", + "detections": detections, + "annotated_image_url": f"/outputs/images/{output_filename}", + "summary": { + "abnormal_cells": abnormal_cells, + "normal_cells": normal_cells, + "ai_interpretation": ai_summary, + }, + } + + elif model_name == "mwt": + tensor = preprocess_for_mwt(image_np) + with torch.no_grad(): + output = mwt_model(tensor.to(device)).cpu() + probs = torch.softmax(output, dim=1)[0] + confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)} + predicted_label = mwt_class_names[int(torch.argmax(probs).item())] + # Average / primary confidence for display + avg_confidence = float(torch.max(probs).item()) * 100 + + # Generate a brief AI interpretation using the Mistral client (if available) + ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence) + + return { + "model_used": "MWT Classifier", + "prediction": predicted_label, + "confidence": confidences, + "summary": { + "ai_interpretation": ai_interp, + }, + } + + elif model_name == "cin": + if clf is None: + return JSONResponse( + content={"error": "CIN classifier not available on server."}, + status_code=503, + ) + + # Decode uploaded image and run colposcopy detector + nparr = np.frombuffer(contents, np.uint8) + img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False) + if len(results[0].boxes) == 0: + return {"error": "No cervix detected"} + + x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy()) + crop = img[y1:y2, x1:x2] + crop = cv2.resize(crop, (224, 224)) + img_t = transform(crop).unsqueeze(0).to(device) + + # Extract features from multiple ResNet backbones + f50 = extract_cbf_features(resnet50_blocks, img_t) + f101 = extract_cbf_features(resnet101_blocks, img_t) + f152 = extract_cbf_features(resnet152_blocks, img_t) + features = np.concatenate([f50, f101, f152]).reshape(1, -1) + + # Scale and predict + X_scaled = MinMaxScaler().fit_transform(features) + + # Ensure classifier supports probability outputs + try: + proba = clf.predict_proba(X_scaled)[0] + except Exception as e: + return JSONResponse( + content={"error": "CIN classifier does not support probability estimates (predict_proba)."}, + status_code=503, + ) + + num_classes = int(len(proba)) + + # Handle different classifier output sizes: + # - If 3 classes: map directly to CIN1/CIN2/CIN3 + # - If 2 classes: apply a conservative heuristic to split High-grade into CIN2/CIN3 + if num_classes == 3: + classes = ["CIN1", "CIN2", "CIN3"] + confidences = {classes[i]: float(proba[i]) for i in range(3)} + predicted_idx = int(np.argmax(proba)) + predicted_label = classes[predicted_idx] + avg_confidence = float(np.max(proba)) * 100 + mapping_used = "direct_3class" + elif num_classes == 2: + # Binary model detected (e.g., Low-grade vs High-grade). We'll convert to CIN1/CIN2/CIN3 + # Heuristic: + # - CIN1 <- low_grade_prob + # - Split high_grade_prob into CIN2 and CIN3 based on how confident 'high' is. + # * If high <= 0.6 -> mostly CIN2 + # * If high >= 0.8 -> mostly CIN3 + # * Between 0.6 and 0.8 -> interpolate + low_prob = float(proba[0]) + high_prob = float(proba[1]) + + if high_prob <= 0.6: + cin3_factor = 0.0 + elif high_prob >= 0.8: + cin3_factor = 1.0 + else: + cin3_factor = (high_prob - 0.6) / 0.2 + + cin1 = low_prob + cin3 = high_prob * cin3_factor + cin2 = high_prob - cin3 + + confidences = {"CIN1": cin1, "CIN2": cin2, "CIN3": cin3} + # pick highest of the mapped three as primary prediction + predicted_label = max(confidences.items(), key=lambda x: x[1])[0] + avg_confidence = float(max(confidences.values())) * 100 + mapping_used = "binary_to_3class_heuristic" + else: + return JSONResponse( + content={ + "error": "CIN classifier must output 2-class (Low/High) or 3-class probabilities (CIN1, CIN2, CIN3).", + "detected_num_classes": num_classes, + }, + status_code=503, + ) + + # Generate AI interpretation using Mistral client (if available) + ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence) + + response = { + "model_used": "CIN Classifier", + "prediction": predicted_label, + "confidence": confidences, + "summary": { + "ai_interpretation": ai_interp, + }, + } + + # If we used the binary->3class heuristic, include a diagnostic field so callers know it was mapped + if 'mapping_used' in locals() and mapping_used == 'binary_to_3class_heuristic': + response["mapping_used"] = mapping_used + response["mapping_note"] = ( + "The server mapped a binary Low/High classifier to CIN1/CIN2/CIN3 using a heuristic split. " + "This is an approximation — for clinical use please supply a native 3-class model." + ) + + return response + elif model_name == "manalife_pathora_model": + if colpo_inference is None or not hasattr(colpo_inference, "run_inference"): + return JSONResponse( + content={"error": "Pathora colposcopy model not available on server."}, + status_code=503, + ) + + # Save the incoming image to disk for the fusion pipeline + input_name = f"colpo_input_{uuid.uuid4().hex[:8]}.png" + input_path = os.path.join(IMAGES_DIR, input_name) + with open(input_path, "wb") as f: + f.write(contents) + + try: + fusion_result = colpo_inference.run_inference(input_path) + except Exception as e: + return JSONResponse( + content={"error": f"Colposcopy fusion inference failed: {e}"}, + status_code=500, + ) + + prob_abn = float(fusion_result.get("probability_abnormal", 0.0)) + acet_present = int(fusion_result.get("acet_present", 0)) + decision_text = fusion_result.get("decision", "Decision unavailable") + + overlay_src = fusion_result.get("overlay_image") + overlay_url = None + if overlay_src and os.path.isfile(overlay_src): + overlay_name = f"colpo_overlay_{uuid.uuid4().hex[:8]}.png" + overlay_dst = os.path.join(IMAGES_DIR, overlay_name) + try: + shutil.copy(overlay_src, overlay_dst) + overlay_url = f"/outputs/images/{overlay_name}" + except Exception as copy_err: + print(f"⚠️ Failed to copy overlay image: {copy_err}") + + # Fallback: expose the raw input if overlay is missing + if not overlay_url: + overlay_url = f"/outputs/images/{input_name}" + + return { + "model_used": "Manalife_Pathora_model", + "decision": decision_text, + "probability_abnormal": round(prob_abn, 3), + "acet_present": acet_present, + "annotated_image_url": overlay_url, + "summary": { + "model_used": "Manalife_Pathora_model", + "decision": decision_text, + "probability_abnormal": round(prob_abn, 3), + "acet_present": "Yes" if acet_present else "No", + "ai_interpretation": decision_text, + }, + } + elif model_name == "histopathology": + result = predict_histopathology(image) + return result + + + else: + return JSONResponse(content={"error": "Invalid model name"}, status_code=400) + +# ===================================================== + +# ROUTES + +# ===================================================== + +def create_designed_pdf(pdf_path, report_data, analysis_summary_json, annotated_image_path=None): + doc = SimpleDocTemplate(pdf_path, pagesize=letter, + rightMargin=72, leftMargin=72, + topMargin=72, bottomMargin=18) + styles = getSampleStyleSheet() + story = [] + + styles.add(ParagraphStyle(name='Title', fontSize=20, fontName='Helvetica-Bold', alignment=TA_CENTER, textColor=navy)) + styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold', spaceBefore=10, spaceAfter=6)) + styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12)) + styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold', textColor=navy, spaceBefore=6, spaceAfter=4)) + + patient = report_data['patient'] + analysis = report_data.get('analysis', {}) + + # Safely parse analysis_summary_json + try: + ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {} + except (json.JSONDecodeError, TypeError): + ai_summary = {} + + # Determine report type based on model used + model_used = ai_summary.get('model_used', '') + if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower(): + report_type = "CYTOLOGY" + report_title = "Cytology Report" + elif 'MWT' in model_used or 'mwt' in str(model_used).lower(): + # MWT is a cytology classifier; use a clearer report title for MWT results + report_type = "CYTOLOGY" + report_title = "Cytology Analysis Report" + elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower(): + report_type = "COLPOSCOPY" + report_title = "Colposcopy Report" + elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower(): + report_type = "HISTOPATHOLOGY" + report_title = "Histopathology Report" + else: + report_type = "CYTOLOGY" + report_title = "Medical Analysis Report" + + # Header + story.append(Paragraph("MANALIFE AI", styles['Title'])) + story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall'])) + story.append(Spacer(1, 0.3*inch)) + story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading'])) + story.append(Paragraph(report_title, styles['Section'])) + story.append(Spacer(1, 0.2*inch)) + + # Report ID and Date + story.append(Paragraph(f"Report ID: {report_data.get('report_id', 'N/A')}", styles['NormalSmall'])) + story.append(Paragraph(f"Generated: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall'])) + story.append(Spacer(1, 0.2*inch)) + + # Patient Information Section + story.append(Paragraph("Patient Information", styles['Section'])) + story.append(Paragraph(f"Patient ID: {patient.get('id', 'N/A')}", styles['NormalSmall'])) + story.append(Paragraph(f"Exam Date: {patient.get('exam_date', 'N/A')}", styles['NormalSmall'])) + story.append(Paragraph(f"Physician: {patient.get('physician', 'N/A')}", styles['NormalSmall'])) + story.append(Paragraph(f"Facility: {patient.get('facility', 'N/A')}", styles['NormalSmall'])) + story.append(Spacer(1, 0.2*inch)) + + # Sample Information Section + story.append(Paragraph("Sample Information", styles['Section'])) + story.append(Paragraph(f"Specimen Type: {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall'])) + story.append(Paragraph(f"Clinical History: {patient.get('clinical_history', 'N/A')}", styles['NormalSmall'])) + story.append(Spacer(1, 0.2*inch)) + + # AI Analysis Section + story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section'])) + story.append(Paragraph("System: Manalife AI System — Automated Analysis", styles['NormalSmall'])) + + # Add metrics based on report type + if report_type == "HISTOPATHOLOGY": + # For histopathology, show Benign/Malignant confidence + confidence_dict = ai_summary.get('confidence', {}) + if isinstance(confidence_dict, dict): + benign_conf = confidence_dict.get('Benign', 0) * 100 + malignant_conf = confidence_dict.get('Malignant', 0) * 100 + story.append(Paragraph(f"Benign Confidence: {benign_conf:.2f}%", styles['NormalSmall'])) + story.append(Paragraph(f"Malignant Confidence: {malignant_conf:.2f}%", styles['NormalSmall'])) + elif report_type == "CYTOLOGY": + # For cytology and MWT, show class confidences if available, otherwise show abnormal/normal cells + confidence_dict = ai_summary.get('confidence', {}) + if isinstance(confidence_dict, dict) and confidence_dict: + for cls, val in confidence_dict.items(): + conf_pct = val * 100 if isinstance(val, (int, float)) else 0 + story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall'])) + else: + if 'abnormal_cells' in ai_summary: + story.append(Paragraph(f"Abnormal Cells: {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall'])) + if 'normal_cells' in ai_summary: + story.append(Paragraph(f"Normal Cells: {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall'])) + else: + # For CIN/Colposcopy, show class confidences + confidence_dict = ai_summary.get('confidence', {}) + if isinstance(confidence_dict, dict): + for cls, val in confidence_dict.items(): + conf_pct = val * 100 if isinstance(val, (int, float)) else 0 + story.append(Paragraph(f"{cls} Confidence: {conf_pct:.2f}%", styles['NormalSmall'])) + + story.append(Spacer(1, 0.1*inch)) + story.append(Paragraph("AI Interpretation:", styles['NormalSmall'])) + story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall'])) + story.append(Spacer(1, 0.2*inch)) + + # If an annotated image path was provided and exists on disk, embed it + if annotated_image_path: + try: + if os.path.isfile(annotated_image_path): + story.append(Spacer(1, 0.1*inch)) + # Determine image pixel size and scale to a reasonable width for PDF + try: + from PIL import Image as PILImageLocal + with PILImageLocal.open(annotated_image_path) as im: + img_w, img_h = im.size + except Exception: + img_w, img_h = (800, 600) + + # Display width in points (ReportLab uses points; 1 inch = 72 points). Assume 96 DPI for pixel->inch. + display_width_px = max(300, min(img_w, 800)) + width_points = min(5 * inch, (display_width_px / 96.0) * inch) + + img = ReportLabImage(annotated_image_path, width=width_points, kind='proportional') + story.append(img) + story.append(Spacer(1, 0.2*inch)) + except Exception as e: + # Don't fail the whole PDF creation if image embedding fails + print(f"⚠️ Could not embed annotated image in PDF: {e}") + + # Doctor's Notes + story.append(Paragraph("Doctor's Notes", styles['Section'])) + story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall'])) + story.append(Spacer(1, 0.2*inch)) + + # Recommendations + story.append(Paragraph("RECOMMENDATIONS", styles['Section'])) + story.append(Paragraph("Continue routine screening as per standard guidelines. Follow up as directed by your physician.", styles['NormalSmall'])) + story.append(Spacer(1, 0.3*inch)) + + # Signatures + story.append(Paragraph("Signatures", styles['Section'])) + story.append(Paragraph("Rajesh Venugopal, Physician", styles['NormalSmall'])) + #story.append(Paragraph("", styles['NormalSmall'])) + story.append(Spacer(1, 0.1*inch)) + story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall'])) + + doc.build(story) + + + +@app.post("/reports/") +async def generate_report( + patient_id: str = Form(...), + exam_date: str = Form(...), + metadata: str = Form(...), + notes: str = Form(None), + analysis_id: str = Form(None), + analysis_summary: str = Form(None), + file: UploadFile = File(None), +): + """Generate a structured medical report from analysis results and metadata.""" + try: + # Create reports directory if it doesn't exist + reports_dir = os.path.join(OUTPUT_DIR, "reports") + os.makedirs(reports_dir, exist_ok=True) + + # Generate unique report ID + report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}" + report_dir = os.path.join(reports_dir, report_id) + os.makedirs(report_dir, exist_ok=True) + + # Parse metadata + metadata_dict = json.loads(metadata) + + # Get analysis results - assuming stored in memory or retrievable + # TODO: Implement analysis results storage/retrieval + + # Construct report data + report_data = { + "report_id": report_id, + "generated_at": datetime.datetime.now().isoformat(), + "patient": { + "id": patient_id, + "exam_date": exam_date, + **metadata_dict + }, + "analysis": { + "id": analysis_id, + # If the analysis_id is actually an annotated image URL, store it for report embedding + "annotated_image_url": analysis_id, + # TODO: Add actual analysis results + }, + "doctor_notes": notes + } + + # Save report data + report_json = os.path.join(report_dir, "report.json") + with open(report_json, "w", encoding="utf-8") as f: + json.dump(report_data, f, indent=2, ensure_ascii=False) + + # We'll create PDF later (after parsing analysis_summary and resolving + # any annotated/input image). Initialize pdf_url here. + pdf_url = None + + # Parse analysis_summary to get AI results + try: + ai_summary = json.loads(analysis_summary) if analysis_summary else {} + except (json.JSONDecodeError, TypeError): + ai_summary = {} + + # Resolve annotated image: prefer AI/analysis annotated image; if none, + # save the uploaded input image (if provided) into the report folder + # and use that as the embedded image. + annotated_img = ai_summary.get('annotated_image_url') or report_data.get("analysis", {}).get("annotated_image_url") or "" + annotated_img_full = "" + annotated_img_local = None + + if annotated_img: + # If it's an outputs path served by StaticFiles, map to local file + if isinstance(annotated_img, str) and annotated_img.startswith('/outputs/'): + rel = annotated_img[len('/outputs/'):].lstrip('/') + annotated_img_local = os.path.join(OUTPUT_DIR, rel) + annotated_img_full = annotated_img + else: + # keep absolute URLs as-is for HTML + annotated_img_full = annotated_img if isinstance(annotated_img, str) else '' + + # If no annotated image provided, but an input file was uploaded, save it + if not annotated_img_full and file is not None and getattr(file, 'filename', None): + try: + input_filename = f"input_image{os.path.splitext(file.filename)[1] or '.jpg'}" + input_path = os.path.join(report_dir, input_filename) + contents = await file.read() + with open(input_path, 'wb') as out_f: + out_f.write(contents) + annotated_img_full = f"/outputs/reports/{report_id}/{input_filename}" + annotated_img_local = input_path + except Exception as e: + print(f"⚠️ Failed to save uploaded input image for report: {e}") + + # Ensure annotated_img_full has a leading slash if it's a relative path + if annotated_img_full and not annotated_img_full.startswith(('http://', 'https://')): + annotated_img_full = annotated_img_full if annotated_img_full.startswith('/') else '/' + annotated_img_full + + # If we have a local annotated image but it's stored in the shared images folder + # (e.g. /outputs/images/...), copy it into this report's folder so the HTML/PDF + # can reference the image relative to the report directory. This also makes the + # image visible when opening report.html directly from disk (file://). + try: + if annotated_img_local: + annotated_img_local_abs = os.path.abspath(annotated_img_local) + report_dir_abs = os.path.abspath(report_dir) + # If the image is not already in the report directory, copy it there + if not os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs: + src_basename = os.path.basename(annotated_img_local_abs) + dest_name = f"annotated_{src_basename}" + dest_path = os.path.join(report_dir, dest_name) + try: + shutil.copy2(annotated_img_local_abs, dest_path) + annotated_img_local = dest_path + annotated_img_full = f"/outputs/reports/{report_id}/{dest_name}" + except Exception as e: + # If copy fails, keep using the original annotated_img_full (may be served by StaticFiles) + print(f"⚠️ Failed to copy annotated image into report folder: {e}") + except Exception: + pass + + # Now attempt to create the PDF (passing the local annotated image path + # so the PDF writer can embed it). If annotation is remote or not + # available, PDF creation will still proceed without the image. + if REPORTLAB_AVAILABLE: + try: + pdf_path = os.path.join(report_dir, "report.pdf") + create_designed_pdf(pdf_path, report_data, analysis_summary, annotated_img_local) + pdf_url = f"/outputs/reports/{report_id}/report.pdf" + except Exception as e: + print(f"Error creating designed PDF: {e}") + pdf_url = None + + # Determine report type based on analysis summary or model used + model_used = ai_summary.get('model_used', '') + if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower(): + report_type = "Cytology" + report_title = "Cytology Report" + elif 'MWT' in model_used or 'mwt' in str(model_used).lower() or 'mwt' in str(analysis_id).lower(): + # MWT is a cytology classifier — use clearer title + report_type = "Cytology" + report_title = "Cytology Analysis Report" + elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower(): + report_type = "Colposcopy" + report_title = "Colposcopy Report" + elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower(): + report_type = "Histopathology" + report_title = "Histopathology Report" + else: + # Default fallback + report_type = "Cytology" + report_title = "Medical Analysis Report" + + # Build analysis metrics HTML based on report type + if report_type == "Histopathology": + # For histopathology, show Benign/Malignant confidence from the confidence dict + confidence_dict = ai_summary.get('confidence', {}) + benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0 + malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0 + + analysis_metrics_html = f""" + SystemManalife AI System — Automated Analysis + Benign Confidence{benign_conf:.2f}% + Malignant Confidence{malignant_conf:.2f}% + """ + elif report_type == "Cytology": + # For cytology (YOLO) or MWT, show class confidences if provided, else abnormal/normal counts + confidence_dict = ai_summary.get('confidence', {}) + if isinstance(confidence_dict, dict) and confidence_dict: + confidence_rows = "" + for cls, val in confidence_dict.items(): + conf_pct = val * 100 if isinstance(val, (int, float)) else 0 + confidence_rows += f"{cls} Confidence{conf_pct:.2f}%\n " + analysis_metrics_html = f""" + SystemManalife AI System — Automated Analysis + {confidence_rows} + """ + else: + analysis_metrics_html = f""" + SystemManalife AI System — Automated Analysis + Abnormal Cells{ai_summary.get('abnormal_cells', 'N/A')} + Normal Cells{ai_summary.get('normal_cells', 'N/A')} + """ + else: + # For CIN/Colposcopy or other models, show generic confidence + confidence_dict = ai_summary.get('confidence', {}) + confidence_rows = "" + if isinstance(confidence_dict, dict): + for cls, val in confidence_dict.items(): + conf_pct = val * 100 if isinstance(val, (int, float)) else 0 + confidence_rows += f"{cls} Confidence{conf_pct:.2f}%\n " + + analysis_metrics_html = f""" + SystemManalife AI System — Automated Analysis + {confidence_rows} + """ + + # Build final HTML including download links and embedded annotated image + report_html = os.path.join(report_dir, "report.html") + json_url = f"/outputs/reports/{report_id}/report.json" + html_url = f"/outputs/reports/{report_id}/report.html" + + # annotated_img_full was computed earlier; ensure it's defined and set + # annotated_img (used by the HTML template conditional) accordingly. + if 'annotated_img_full' not in locals() or not annotated_img_full: + annotated_img_full = '' + annotated_img = annotated_img_full + + # Compute a display path for the HTML. Prefer a relative filename when the + # annotated image is copied into the same report folder. This makes the + # HTML work when opened directly from disk (or via HF file viewer). + annotated_img_display = annotated_img_full + try: + if annotated_img_local: + annotated_img_local_abs = os.path.abspath(annotated_img_local) + report_dir_abs = os.path.abspath(report_dir) + # If the annotated image resides in the report folder, reference by basename + if os.path.commonpath([annotated_img_local_abs, report_dir_abs]) == report_dir_abs: + annotated_img_display = os.path.basename(annotated_img_local_abs) + else: + # If annotated image is inside the outputs/reports// path but not same + # absolute path (edge cases), make it relative to the report dir + prefix = f"/outputs/reports/{report_id}/" + if isinstance(annotated_img_full, str) and annotated_img_full.startswith(prefix): + annotated_img_display = annotated_img_full[len(prefix):] + except Exception: + annotated_img_display = annotated_img_full + + download_pdf_btn = f'' if pdf_url else '' + + # Format generated time + generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p') + + html_content = f""" + + + + + {report_title} — Manalife AI + + + +
+
+
+ + Manalife Logo +
+
+

MANALIFE AI — Medical Analysis

+
Advanced cytological colposcopy and histopathology reporting
+
contact@manalife.ai • +1 (555) 123-4567
+
+
+ +
+
+
+
MEDICAL ANALYSIS REPORT OF {report_type.upper()}
+

{report_title}

+
+
+
Report ID: {report_id}
+
Generated: {generated_time}
+
+
+ +
+ +
+
+
Patient Information
+ + + + + +
Patient ID{patient_id}
Exam Date{exam_date}
Physician{metadata_dict.get('physician', 'N/A')}
Facility{metadata_dict.get('facility', 'N/A')}
+
+ +
+
Sample Information
+ + + + + +
Specimen Type{metadata_dict.get('specimen_type', 'N/A')}
Clinical History{metadata_dict.get('clinical_history', 'N/A')}
Collected{exam_date}
Reported{generated_time}
+
+ +
+
AI-Assisted Analysis
+ + {analysis_metrics_html} +
+
+
AI Interpretation:
+
{ai_summary.get('ai_interpretation', 'No AI interpretation available.')}
+
+
+ + {'
Annotated Analysis Image
Annotated Analysis Result
' if annotated_img else ''} + +
+
Doctor\'s Notes
+

{notes or 'No additional notes provided.'}

+
+ +
+
Recommendations
+

Continue routine screening as per standard guidelines. Follow up as directed by your physician.

+
+ +
+
Signatures
+
+
+
Rajesh Venugopal
+
Physician
+
+
+
+
+ + +
+ +
+ {download_pdf_btn} + +
+
+ +""" + + with open(report_html, "w", encoding="utf-8") as f: + f.write(html_content) + + # Update report.json to include the resolved annotated image url so callers can find it + try: + report_data['analysis']['annotated_image_url'] = annotated_img_full or '' + with open(report_json, 'w', encoding='utf-8') as f: + json.dump(report_data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f"⚠️ Failed to update report.json with annotated image url: {e}") + + return { + "report_id": report_id, + "json_url": json_url, + "html_url": html_url, + "pdf_url": pdf_url, + } + + except Exception as e: + return JSONResponse( + content={"error": f"Failed to generate report: {str(e)}"}, + status_code=500 + ) + +@app.get("/reports/{report_id}") +async def get_report(report_id: str): + """Fetch a generated report by ID.""" + report_dir = os.path.join(OUTPUT_DIR, "reports", report_id) + report_json = os.path.join(report_dir, "report.json") + + if not os.path.exists(report_json): + return JSONResponse( + content={"error": "Report not found"}, + status_code=404 + ) + + with open(report_json, "r") as f: + report_data = json.load(f) + + return report_data + +@app.get("/reports") +async def list_reports(patient_id: str = None): + """List all generated reports, optionally filtered by patient ID.""" + reports_dir = os.path.join(OUTPUT_DIR, "reports") + if not os.path.exists(reports_dir): + return {"reports": []} + + reports = [] + for report_id in os.listdir(reports_dir): + report_json = os.path.join(reports_dir, report_id, "report.json") + if os.path.exists(report_json): + with open(report_json, "r") as f: + report_data = json.load(f) + if not patient_id or report_data["patient"]["id"] == patient_id: + reports.append({ + "report_id": report_id, + "patient_id": report_data["patient"]["id"], + "exam_date": report_data["patient"]["exam_date"], + "generated_at": report_data["generated_at"] + }) + + return {"reports": sorted(reports, key=lambda r: r["generated_at"], reverse=True)} + +@app.get("/models") +def get_models(): + return {"available_models": ["yolo", "mwt", "cin", "histopathology"]} + +@app.get("/health") +def health(): + return {"message": "Pathora Medical Diagnostic API is running!"} + +# FRONTEND + +# ===================================================== + + +# Serve frontend only if it has been built; avoid startup failure when dist/ is missing. +FRONTEND_DIST = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist")) + +# Check if frontend/dist exists in /app (Docker), otherwise check relative to script location +if not os.path.isdir(FRONTEND_DIST): + # Fallback for Docker: frontend is copied to ./frontend/dist during build + FRONTEND_DIST = os.path.join(os.path.dirname(__file__), "frontend/dist") + +ASSETS_DIR = os.path.join(FRONTEND_DIST, "assets") + +if os.path.isdir(ASSETS_DIR): + app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets") +else: + print("ℹ️ Frontend assets directory not found — skipping /assets mount.") + +@app.get("/") +async def serve_frontend(): + index_path = os.path.join(FRONTEND_DIST, "index.html") + if os.path.isfile(index_path): + return FileResponse(index_path) + return JSONResponse({"message": "Backend is running. Frontend build not found."}) + +@app.get("/{file_path:path}") +async def serve_static_files(file_path: str): + """Serve static files from frontend dist (images, logos, etc.)""" + # Skip API routes + if file_path.startswith(("predict", "reports", "models", "health", "outputs", "assets", "cyto", "colpo", "histo")): + return JSONResponse({"error": "Not found"}, status_code=404) + + # Try to serve file from dist root + static_file = os.path.join(FRONTEND_DIST, file_path) + if os.path.isfile(static_file): + return FileResponse(static_file) + + # Fallback to index.html for client-side routing + index_path = os.path.join(FRONTEND_DIST, "index.html") + if os.path.isfile(index_path): + return FileResponse(index_path) + + return JSONResponse({"error": "Not found"}, status_code=404) + +if __name__ == "__main__": + # Use PORT provided by the environment (Hugging Face Spaces sets PORT=7860) + port = int(os.environ.get("PORT", 7860)) + uvicorn.run(app, host="0.0.0.0", port=port)