from fastapi import FastAPI, File, UploadFile, HTTPException, Body from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel import cv2 import numpy as np import tempfile import os from io import BytesIO from PIL import Image import uvicorn import traceback import json from typing import List, Dict, Optional import re # Load .env file for local development. # Search from this file's directory upward so it works whether the server # is launched from project root (uvicorn backend.app:app) or from # inside the backend/ folder (python app.py). try: from dotenv import load_dotenv _here = os.path.dirname(os.path.abspath(__file__)) # Try backend/.env first, then project root .env for _env_path in [ os.path.join(_here, ".env"), os.path.join(_here, "..", ".env"), ]: if os.path.isfile(_env_path): load_dotenv(_env_path) print(f"✅ Loaded .env from: {os.path.abspath(_env_path)}") break else: print("⚠️ No .env file found. Set GEMINI_API_KEY in your environment.") except ImportError: pass try: from .inference import infer_aw_contour, analyze_frame, analyze_video_frame, infer_cervix_bbox except ImportError: from inference import infer_aw_contour, analyze_frame, analyze_video_frame, infer_cervix_bbox # Import Google Gemini (optional - graceful degradation if not installed) try: import google.generativeai as genai GEMINI_AVAILABLE = True except ImportError: GEMINI_AVAILABLE = False print("⚠️ google-generativeai not installed. LLM endpoints will be unavailable.") app = FastAPI(title="Pathora Colposcopy API", version="1.0.0") # Add CORS middleware to allow requests from frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize Gemini if available GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") or os.getenv("VITE_GEMINI_API_KEY") if GEMINI_AVAILABLE and GEMINI_API_KEY: try: genai.configure(api_key=GEMINI_API_KEY) print("✅ Gemini AI configured successfully") except Exception as e: print(f"⚠️ Failed to configure Gemini: {e}") GEMINI_AVAILABLE = False elif GEMINI_AVAILABLE: print("⚠️ GEMINI_API_KEY not found in environment variables") def get_supported_gemini_models() -> List[str]: """Return model names that support generateContent for this API key.""" if not GEMINI_AVAILABLE or not GEMINI_API_KEY: return [] discovered: List[str] = [] try: for model in genai.list_models(): methods = getattr(model, "supported_generation_methods", []) or [] if "generateContent" not in methods: continue raw_name = getattr(model, "name", "") if not raw_name: continue discovered.append(raw_name) # Some SDK calls accept short names while discovery returns models/. if raw_name.startswith("models/"): discovered.append(raw_name[len("models/"):]) except Exception as e: print(f"⚠️ Could not list Gemini models: {e}") return [] # De-duplicate while preserving order. unique_models: List[str] = [] seen = set() for name in discovered: if name not in seen: unique_models.append(name) seen.add(name) return unique_models # Cache models that fail due to quota so we skip them on subsequent requests. QUOTA_BLOCKED_MODELS: set[str] = set() def get_ordered_model_candidates(available_models: List[str]) -> List[str]: """Order models by preference and exclude quota-blocked models.""" preferred_models = [ # Put models that are usually available on free keys first. "models/gemini-2.5-flash", "gemini-2.5-flash", "models/gemini-flash-latest", "gemini-flash-latest", "models/gemini-2.5-flash-lite", "gemini-2.5-flash-lite", "models/gemini-flash-lite-latest", "gemini-flash-lite-latest", # Keep older families as fallback. "models/gemini-2.0-flash", "gemini-2.0-flash", "models/gemini-2.0-flash-lite", "gemini-2.0-flash-lite", "models/gemini-1.5-flash", "gemini-1.5-flash", "models/gemini-1.5-pro", "gemini-1.5-pro", "models/gemini-pro-latest", "gemini-pro-latest", "models/gemini-pro", "gemini-pro", ] available = [m for m in available_models if m not in QUOTA_BLOCKED_MODELS] ordered = [m for m in preferred_models if m in available] ordered.extend(m for m in available if m not in ordered) return ordered # Pydantic models for LLM endpoints class ChatMessage(BaseModel): role: str text: str class ChatRequest(BaseModel): message: str history: List[ChatMessage] = [] system_prompt: Optional[str] = None class ReportGenerationRequest(BaseModel): patient_data: Dict exam_findings: Dict images: Optional[List[str]] = [] # base64 encoded images system_prompt: Optional[str] = None class SPAStaticFiles(StaticFiles): async def get_response(self, path: str, scope): response = await super().get_response(path, scope) if response.status_code == 404: return await super().get_response("index.html", scope) return response @app.get("/health") async def health_check(): """Health check endpoint""" available_models = get_supported_gemini_models() return { "status": "healthy", "service": "Pathora Colposcopy API", "ai_models": { "acetowhite_model": "loaded", "cervix_model": "loaded" }, "llm": { "gemini_available": GEMINI_AVAILABLE, "api_key_configured": bool(GEMINI_API_KEY), "available_models": available_models } } @app.get("/api/health") async def api_health_check(): """Health check endpoint under /api for HF Spaces compatibility.""" return await health_check() @app.post("/api/chat") async def chat_endpoint(request: ChatRequest): """ LLM Chat endpoint for conversational AI assistant Args: request: ChatRequest with message, history, and optional system_prompt Returns: JSON with AI response """ if not GEMINI_AVAILABLE: raise HTTPException( status_code=503, detail="Gemini AI is not available. Install google-generativeai package." ) if not GEMINI_API_KEY: raise HTTPException( status_code=503, detail="GEMINI_API_KEY not configured in environment variables" ) try: # Use system prompt or default system_prompt = request.system_prompt or """You are Pathora AI — a specialist colposcopy assistant. \ Provide expert guidance on examination techniques, findings interpretation, and management guidelines. \ Be professional, evidence-based, and concise.""" # Prefer modern fast models, then fall back to any model exposed by this key. available_models = get_supported_gemini_models() if not available_models: raise Exception( "No Gemini models with generateContent are available for this API key. " "Check API key permissions and Gemini API enablement." ) model_names = get_ordered_model_candidates(available_models) print(f"✅ Chat available models: {available_models}") print(f"✅ Chat candidate models: {model_names}") response_text = None used_model = None for model_name in model_names: try: print(f"🔄 Trying chat model: {model_name}") # Initialize Gemini model model = genai.GenerativeModel( model_name=model_name, system_instruction=system_prompt ) # Build conversation history chat_history = [] for msg in request.history: role = "model" if msg.role == "bot" else "user" chat_history.append({ "role": role, "parts": [msg.text] }) # Start chat with history chat = model.start_chat(history=chat_history) # Send message and get response response = chat.send_message(request.message) response_text = response.text used_model = model_name print(f"✅ Successfully used chat model: {model_name}") break except Exception as model_err: err_str = str(model_err) if "429" in err_str or "quota exceeded" in err_str.lower(): QUOTA_BLOCKED_MODELS.add(model_name) print(f"⏭️ Skipping quota-blocked chat model: {model_name}") print(f"⚠️ Chat model {model_name} failed: {err_str}") continue if not response_text: raise Exception("All model attempts failed. Please check API key and model availability.") return JSONResponse({ "status": "success", "response": response_text, "model": used_model }) except Exception as e: error_msg = str(e) print(f"❌ Chat error: {error_msg}") traceback.print_exc() # Provide more helpful error messages if "API key" in error_msg or "authentication" in error_msg.lower(): detail = "API key authentication failed. Please add GEMINI_API_KEY to HF Space secrets." elif "not found" in error_msg.lower() or "404" in error_msg: detail = f"Gemini model not available. Error: {error_msg}. Please verify API key." else: detail = f"Chat error: {error_msg}" raise HTTPException(status_code=500, detail=detail) @app.post("/api/generate-report") async def generate_report_endpoint(request: ReportGenerationRequest): """ Generate colposcopy report using LLM based on patient data and exam findings Args: request: ReportGenerationRequest with patient data, exam findings, and images Returns: JSON with generated report """ if not GEMINI_AVAILABLE: raise HTTPException( status_code=503, detail="Gemini AI is not available. Install google-generativeai package." ) if not GEMINI_API_KEY: raise HTTPException( status_code=503, detail="GEMINI_API_KEY not configured in environment variables" ) try: # Use system prompt from frontend if provided, otherwise use a strict JSON-forcing default system_prompt = request.system_prompt or """You are an expert colposcopy AI assistant acting as a specialist gynaecologist. Analyse ALL the clinical data provided and return ONLY a valid JSON object — no markdown, no extra text, no code fences. The JSON must have EXACTLY these 10 keys and no others: { "examQuality": "", "transformationZone": "", "acetowL": "", "nativeFindings": "<2-3 sentence summary of native view findings>", "aceticFindings": "<2-3 sentence summary of acetic acid findings>", "biopsySites": "", "biopsyNotes": "", "colposcopicFindings": "", "treatmentPlan": "", "followUp": "" }""" # Build a clean data prompt — just present the clinical data, # the system_instruction above enforces the output format. prompt_parts = [] prompt_parts.append("PATIENT DATA:") prompt_parts.append(json.dumps(request.patient_data, indent=2)) prompt_parts.append("\n\nEXAMINATION FINDINGS & OBSERVATIONS:") prompt_parts.append(json.dumps(request.exam_findings, indent=2)) prompt_parts.append(""" Based on all the above clinical data, return ONLY the JSON object with exactly these 10 keys: examQuality, transformationZone, acetowL, nativeFindings, aceticFindings, biopsySites, biopsyNotes, colposcopicFindings, treatmentPlan, followUp Do NOT include any other keys. Do NOT wrap in markdown. Return raw JSON only.""") full_prompt = "\n".join(prompt_parts) # Prefer modern fast models, then fall back to any model exposed by this key. available_models = get_supported_gemini_models() if not available_models: raise Exception( "No Gemini models with generateContent are available for this API key. " "Check API key permissions and Gemini API enablement." ) model_names = get_ordered_model_candidates(available_models) print(f"✅ Report available models: {available_models}") print(f"✅ Report candidate models: {model_names}") response_text = None used_model = None for model_name in model_names: try: print(f"🔄 Trying model: {model_name}") model = genai.GenerativeModel( model_name=model_name, system_instruction=system_prompt ) response = model.generate_content(full_prompt) response_text = response.text used_model = model_name print(f"✅ Successfully used model: {model_name}") break except Exception as model_err: err_str = str(model_err) if "429" in err_str or "quota exceeded" in err_str.lower(): QUOTA_BLOCKED_MODELS.add(model_name) print(f"⏭️ Skipping quota-blocked report model: {model_name}") print(f"⚠️ Model {model_name} failed: {err_str}") continue if not response_text: raise Exception("All model attempts failed. Please check API key and model availability.") # Ensure response_text is valid JSON before returning try: # Strip markdown if present cleaned_text = response_text.strip() if cleaned_text.startswith('```'): cleaned_text = re.sub(r'^```[a-z]*\n?', '', cleaned_text, flags=re.IGNORECASE) cleaned_text = re.sub(r'\n?```\s*$', '', cleaned_text) cleaned_text = cleaned_text.strip() # Parse to verify it's valid JSON parsed_json = json.loads(cleaned_text) print(f"✅ Report is valid JSON with keys: {list(parsed_json.keys())}") # Return as JSON object (not string) so it's properly encoded by FastAPI return JSONResponse({ "status": "success", "report": cleaned_text, # Backward-compatible JSON string "report_json": parsed_json, # Structured payload for robust frontend mapping "model": used_model }) except json.JSONDecodeError as je: print(f"⚠️ Response is not valid JSON: {je}") print(f"Response text: {response_text[:500]}") raise Exception(f"Gemini returned invalid JSON: {str(je)}") except Exception as e: error_msg = str(e) print(f"❌ Report generation error: {error_msg}") traceback.print_exc() if "API key" in error_msg or "authentication" in error_msg.lower(): detail = "API key authentication failed. Please check GEMINI_API_KEY in HF Space secrets." elif "not found" in error_msg.lower() or "404" in error_msg: detail = f"Gemini model not available. Error: {error_msg}. Please verify API key has access to Gemini models." else: detail = f"Report generation error: {error_msg}" raise HTTPException(status_code=500, detail=detail) @app.post("/api/infer-aw-contour") async def infer_aw_contour_endpoint(file: UploadFile = File(...), conf_threshold: float = 0.4): """ Inference endpoint for Acetowhite contour detection Args: file: Image file (jpg, png, etc.) conf_threshold: Confidence threshold for YOLO model (0.0-1.0) Returns: JSON with base64 encoded result image """ try: # Read image file image_data = await file.read() print(f"✅ File received, size: {len(image_data)} bytes") # Try to open image - this will work regardless of content type try: image = Image.open(BytesIO(image_data)) print(f"✅ Image opened, mode: {image.mode}, size: {image.size}") except Exception as e: print(f"❌ Image open error: {e}") traceback.print_exc() raise HTTPException(status_code=400, detail=f"Invalid image file: {str(e)}") # Convert to numpy array and BGR format (OpenCV uses BGR) # Handle different image modes if image.mode == 'RGBA': # Convert RGBA to RGB image = image.convert('RGB') elif image.mode != 'RGB': # Convert other modes to RGB image = image.convert('RGB') frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) print(f"✅ Frame converted, shape: {frame.shape}") # Run inference - returns dict with 'overlay', 'contours', 'detections', etc. print(f"🔄 Running infer_aw_contour with conf_threshold={conf_threshold}") result = infer_aw_contour(frame, conf_threshold=conf_threshold) print(f"✅ Inference complete, detections: {result['detections']}") # Convert result overlay back to RGB for JSON serialization if result["overlay"] is not None: result_rgb = cv2.cvtColor(result["overlay"], cv2.COLOR_BGR2RGB) result_image = Image.fromarray(result_rgb) # Encode to base64 buffer = BytesIO() result_image.save(buffer, format="PNG") buffer.seek(0) import base64 image_base64 = base64.b64encode(buffer.getvalue()).decode() print(f"✅ Image encoded to base64, size: {len(image_base64)} chars") else: image_base64 = None print("⚠️ No overlay returned from inference") return JSONResponse({ "status": "success", "message": "Inference completed successfully", "result_image": image_base64, "contours": result["contours"], "detections": result["detections"], "confidence_threshold": conf_threshold }) except Exception as e: print(f"❌ EXCEPTION in infer_aw_contour:") traceback.print_exc() raise HTTPException(status_code=500, detail=f"Error during inference: {str(e)}") @app.post("/api/batch-infer") async def batch_infer(files: list[UploadFile] = File(...), conf_threshold: float = 0.4): """ Batch inference endpoint for multiple images Args: files: List of image files conf_threshold: Confidence threshold for YOLO model Returns: JSON with results for all images """ results = [] for file in files: try: image_data = await file.read() image = Image.open(BytesIO(image_data)) # Handle different image modes if image.mode == 'RGBA': image = image.convert('RGB') elif image.mode != 'RGB': image = image.convert('RGB') frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) # Run inference - returns dict with 'overlay', 'contours', 'detections', etc. result = infer_aw_contour(frame, conf_threshold=conf_threshold) if result["overlay"] is not None: result_rgb = cv2.cvtColor(result["overlay"], cv2.COLOR_BGR2RGB) result_image = Image.fromarray(result_rgb) buffer = BytesIO() result_image.save(buffer, format="PNG") buffer.seek(0) import base64 image_base64 = base64.b64encode(buffer.getvalue()).decode() else: image_base64 = None results.append({ "filename": file.filename, "status": "success", "result_image": image_base64, "contours": result["contours"], "detections": result["detections"] }) except Exception as e: results.append({ "filename": file.filename, "status": "error", "error": str(e) }) return JSONResponse({ "status": "completed", "total_files": len(results), "results": results }) @app.post("/infer/image") async def infer_image(file: UploadFile = File(...)): """ Single image inference endpoint for cervix detection/quality. """ try: contents = await file.read() nparr = np.frombuffer(contents, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) result = analyze_frame(frame) return JSONResponse(content=result) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/infer/video") async def infer_video(file: UploadFile = File(...)): """ Video inference endpoint for cervix detection/quality (frame-by-frame). """ try: with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp.write(await file.read()) temp_path = tmp.name cap = cv2.VideoCapture(temp_path) responses = [] frame_count = 0 while True: ret, frame = cap.read() if not ret: break result = analyze_video_frame(frame) responses.append({ "frame": frame_count, "status": result["status"], "quality_percent": result["quality_percent"] }) frame_count += 1 cap.release() os.remove(temp_path) return JSONResponse(content={ "total_frames": frame_count, "results": responses }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/infer-cervix-bbox") async def infer_cervix_bbox_endpoint(file: UploadFile = File(...), conf_threshold: float = 0.4): """ Cervix bounding box detection endpoint for annotation. Detects cervix location and returns bounding boxes. Args: file: Image file (jpg, png, etc.) conf_threshold: Confidence threshold for YOLO model (0.0-1.0) Returns: JSON with base64 encoded annotated image and bounding box coordinates """ try: # Read image file image_data = await file.read() # Try to open image try: image = Image.open(BytesIO(image_data)) except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid image file: {str(e)}") # Convert to numpy array and BGR format (OpenCV uses BGR) if image.mode == 'RGBA': image = image.convert('RGB') elif image.mode != 'RGB': image = image.convert('RGB') frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) # Run inference result = infer_cervix_bbox(frame, conf_threshold=conf_threshold) # Convert result overlay back to RGB for JSON serialization if result["overlay"] is not None: result_rgb = cv2.cvtColor(result["overlay"], cv2.COLOR_BGR2RGB) result_image = Image.fromarray(result_rgb) # Encode to base64 buffer = BytesIO() result_image.save(buffer, format="PNG") buffer.seek(0) import base64 image_base64 = base64.b64encode(buffer.getvalue()).decode() else: image_base64 = None return JSONResponse({ "status": "success", "message": "Cervix bounding box detection completed", "result_image": image_base64, "bounding_boxes": result["bounding_boxes"], "detections": result["detections"], "frame_width": result["frame_width"], "frame_height": result["frame_height"], "confidence_threshold": conf_threshold }) except Exception as e: raise HTTPException(status_code=500, detail=f"Error during cervix bbox inference: {str(e)}") # Serve the built frontend if present (Space/Docker runtime) frontend_dist = os.path.join(os.path.dirname(__file__), "..", "dist") if os.path.isdir(frontend_dist): app.mount("/", SPAStaticFiles(directory=frontend_dist, html=True), name="frontend") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)