| |
| """ |
| Elderly HealthWatch AI Backend (FastAPI) |
| Pipeline: |
| - receive images |
| - run VLM (remote gradio / chat_fn) -> JSON feature vector + raw text |
| - run LLM (remote gradio /chat) -> structured risk JSON (per requested schema) |
| - continue rest of processing and store results |
| |
| Notes: |
| - Add gradio_client==1.13.2 (or another compatible 1.x) to requirements.txt |
| - If VLM/LLM Spaces are private, set HF_TOKEN in the environment for authentication. |
| - This final variant: |
| * logs raw VLM responses, |
| * always returns raw VLM output in API responses, |
| * extracts JSON from VLM via regex when possible, and |
| * sends either cleaned JSON or raw VLM string into LLM (and logs which was used). |
| - VLM calls were simplified to a single call (no retries). |
| """ |
|
|
| import io |
| import os |
| import uuid |
| import json |
| import asyncio |
| import logging |
| import traceback |
| import re |
| import time |
| from typing import Dict, Any, Optional, Tuple |
| from datetime import datetime |
|
|
| from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from PIL import Image |
| import numpy as np |
| import cv2 |
|
|
| |
| try: |
| from gradio_client import Client, handle_file |
| GRADIO_AVAILABLE = True |
| except Exception: |
| GRADIO_AVAILABLE = False |
|
|
| |
# Module-wide logger used by every pipeline stage below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("elderly_healthwatch")


# Remote Hugging Face Spaces used for inference; both overridable via env vars.
GRADIO_VLM_SPACE = os.getenv("GRADIO_SPACE", "developer0hye/Qwen3-VL-8B-Instruct")
LLM_GRADIO_SPACE = os.getenv("LLM_GRADIO_SPACE", "Tonic/med-gpt-oss-20b-demo")
# Optional token for private Spaces; None means anonymous access.
HF_TOKEN = os.getenv("HF_TOKEN", None)


# Default instruction sent to the VLM when callers do not supply a prompt.
DEFAULT_VLM_PROMPT = (
    "From the provided face/eye images, compute the required screening features "
    "(pallor, sclera yellowness, redness, mobility metrics, quality checks) "
    "and output a clean JSON feature vector only with values ranging as probabilities."
)


# Prompt scaffolding for the LLM /chat endpoint: identity, system and developer
# prompts together force a single strict-JSON object as the model's output.
LLM_MODEL_IDENTITY = os.getenv(
    "LLM_MODEL_IDENTITY",
    "You are GPT-Tonic, a large language model trained by TonicAI for clinical reasoning."
)
LLM_SYSTEM_PROMPT = os.getenv(
    "LLM_SYSTEM_PROMPT",
    "System: This assistant MUST ONLY OUTPUT a single valid JSON object as its response — no prose, no explanations, no code fences, no annotations. The JSON must follow the schema requested by the user."
)
LLM_DEVELOPER_PROMPT = os.getenv(
    "LLM_DEVELOPER_PROMPT",
    "Developer: Output ONLY a single valid JSON object with keys: risk_score, jaundice_probability, anemia_probability, hydration_issue_probability, neurological_issue_probability, summary, recommendation, confidence. Do NOT include any extra fields or natural language outside the JSON object."
)
|
|
| |
# Resolve a face-detector implementation at import time.
# Preference order: facenet_pytorch MTCNN, then the classic `mtcnn` package.
# _MTCNN_IMPL records which one imported successfully (None if neither did).
_MTCNN_IMPL = None
try:
    from facenet_pytorch import MTCNN as FacenetMTCNN
    _MTCNN_IMPL = "facenet_pytorch"
except Exception:
    FacenetMTCNN = None
    _MTCNN_IMPL = None


if _MTCNN_IMPL is None:
    try:
        from mtcnn import MTCNN as ClassicMTCNN
        _MTCNN_IMPL = "mtcnn"
    except Exception:
        ClassicMTCNN = None
|
|
def create_mtcnn_or_fallback():
    """Instantiate the best face detector this deployment can support.

    Tries, in order: facenet_pytorch MTCNN on CPU, the classic ``mtcnn``
    package, and finally OpenCV Haar cascades. The OpenCV fallback is
    returned as a dict with keys ``impl``/``face_cascade``/``eye_cascade``
    so callers can distinguish it. Returns None when nothing works.
    """
    # Preferred implementation: facenet_pytorch.
    if FacenetMTCNN is not None and _MTCNN_IMPL == "facenet_pytorch":
        try:
            return FacenetMTCNN(keep_all=False, device="cpu")
        except Exception:
            pass

    # Secondary implementation: the classic mtcnn package.
    if ClassicMTCNN is not None and _MTCNN_IMPL == "mtcnn":
        try:
            return ClassicMTCNN()
        except Exception:
            pass

    # Last resort: Haar cascade files shipped inside the cv2 package.
    try:
        cascade_dir = cv2.data.haarcascades
        face_xml = os.path.join(cascade_dir, "haarcascade_frontalface_default.xml")
        eye_xml = os.path.join(cascade_dir, "haarcascade_eye.xml")
        if os.path.exists(face_xml) and os.path.exists(eye_xml):
            return {
                "impl": "opencv",
                "face_cascade": cv2.CascadeClassifier(face_xml),
                "eye_cascade": cv2.CascadeClassifier(eye_xml),
            }
    except Exception:
        pass

    return None
|
|
# Detector instance created once at import time: an MTCNN object, an OpenCV
# fallback dict, or None when no detector could be constructed.
mtcnn = create_mtcnn_or_fallback()


app = FastAPI(title="Elderly HealthWatch AI Backend")
# Fully permissive CORS (any origin/method/header) — the frontend is served
# from a different origin. NOTE(review): tighten allow_origins for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# In-memory screening store keyed by screening_id; not persisted, so all
# records are lost on process restart.
screenings_db: Dict[str, Dict[str, Any]] = {}
|
|
| |
| |
| |
def load_image_from_bytes(bytes_data: bytes) -> Image.Image:
    """Decode raw uploaded bytes into an RGB PIL image."""
    buffer = io.BytesIO(bytes_data)
    return Image.open(buffer).convert("RGB")
|
|
def estimate_eye_openness_from_detection(confidence: float) -> float:
    """Map a face-detector confidence value to an eye-openness score.

    The heuristic scales the confidence by 1.15 (so a confident detection
    saturates to fully open) and clamps the result into [0.0, 1.0].

    Args:
        confidence: Detector confidence, typically in [0, 1]; anything
            convertible via float() is accepted.

    Returns:
        Openness estimate in [0.0, 1.0]; 0.0 when the input is not numeric.
    """
    try:
        conf = float(confidence)
    except (TypeError, ValueError):
        # Narrowed from a bare `except Exception`: only conversion failures
        # (None, non-numeric strings, wrong types) are expected here.
        return 0.0
    return min(max(conf * 1.15, 0.0), 1.0)
|
|
| |
| |
| |
def extract_json_via_regex(raw_text: str) -> Dict[str, Any]:
    """
    Extract numeric fields and text fields from the first {...} block found in raw_text.
    Returns a dict with:
    - risk_score (0..100)
    - jaundice_probability (0..1)
    - anemia_probability (0..1)
    - hydration_issue_probability (0..1)
    - neurological_issue_probability (0..1)
    - confidence (0..1)
    - summary (string)
    - recommendation (string)

    Raises:
        ValueError: if raw_text contains no brace-delimited block at all.
    """
    # Greedy match: grabs from the first '{' to the LAST '}' in the text,
    # so nested/multiple objects are swallowed into one search block.
    match = re.search(r"\{[\s\S]*\}", raw_text)
    if not match:
        raise ValueError("No JSON-like block found in text")
    block = match.group(0)

    def find_number_for_key(key: str) -> Optional[float]:
        # Patterns 1-3 match number-shaped values (optionally quoted and/or
        # percent-suffixed); patterns 4-5 fall back to quoted free text,
        # which is then attempted as a float.
        patterns = [
            rf'"{key}"\s*:\s*["\']?\s*([-+]?\d+(\.\d+)?)\s*%?\s*["\']?',
            rf"'{key}'\s*:\s*['\"]?\s*([-+]?\d+(\.\d+)?)\s*%?\s*['\"]?",
            rf'\b{key}\b\s*:\s*["\']?\s*([-+]?\d+(\.\d+)?)\s*%?\s*["\']?',
            rf'"{key}"\s*:\s*["\']([^"\']+)["\']',
            rf"'{key}'\s*:\s*['\"]([^'\"]+)['\"]"
        ]
        for pat in patterns:
            m = re.search(pat, block, flags=re.IGNORECASE)
            if not m:
                continue
            g = m.group(1)
            if g is None:
                continue
            # Strip percent signs so "85%" parses as 85.0.
            s = str(g).strip().replace("%", "").strip()
            try:
                return float(s)
            except Exception:
                # NOTE(review): a non-numeric capture returns None immediately
                # instead of trying the remaining patterns — intentional?
                return None
        return None

    def find_text_for_key(key: str) -> str:
        # Double-quoted value first, then single-quoted, then an unquoted
        # run terminated by newline, comma or closing brace.
        m = re.search(rf'"{key}"\s*:\s*"([^"]*)"', block, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip()
        m = re.search(rf"'{key}'\s*:\s*'([^']*)'", block, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip()
        m = re.search(rf'\b{key}\b\s*:\s*([^\n,}}]+)', block, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip().strip('",')
        return ""

    # Raw (un-normalized) values straight from the regex scan; any may be None.
    raw_risk = find_number_for_key("risk_score")
    raw_jaundice = find_number_for_key("jaundice_probability")
    raw_anemia = find_number_for_key("anemia_probability")
    raw_hydration = find_number_for_key("hydration_issue_probability")
    raw_neuro = find_number_for_key("neurological_issue_probability")
    raw_conf = find_number_for_key("confidence")

    def normalize_prob(v: Optional[float]) -> float:
        # Accept probabilities on either a 0..1 or 0..100 (percent) scale,
        # clamping the result to [0, 1]; None becomes 0.0.
        if v is None:
            return 0.0
        if v > 1.0 and v <= 100.0:
            return max(0.0, min(1.0, v / 100.0))
        if v > 100.0:
            return 1.0
        return max(0.0, min(1.0, v))

    jaundice_probability = normalize_prob(raw_jaundice)
    anemia_probability = normalize_prob(raw_anemia)
    hydration_issue_probability = normalize_prob(raw_hydration)
    neurological_issue_probability = normalize_prob(raw_neuro)
    confidence = normalize_prob(raw_conf)

    def normalize_risk(v: Optional[float]) -> float:
        # Risk score is reported on a 0..100 scale: fractional inputs (<=1)
        # are interpreted as probabilities and scaled up; everything is
        # clamped to [0, 100] and rounded to 2 decimals.
        if v is None:
            return 0.0
        if v <= 1.0:
            return round(max(0.0, min(100.0, v * 100.0)), 2)
        if v > 1.0 and v <= 100.0:
            return round(max(0.0, min(100.0, v)), 2)
        return round(max(0.0, min(100.0, v if v < float('inf') else 100.0)), 2)

    risk_score = normalize_risk(raw_risk)

    summary = find_text_for_key("summary")
    recommendation = find_text_for_key("recommendation")

    out = {
        "risk_score": risk_score,
        "jaundice_probability": round(jaundice_probability, 4),
        "anemia_probability": round(anemia_probability, 4),
        "hydration_issue_probability": round(hydration_issue_probability, 4),
        "neurological_issue_probability": round(neurological_issue_probability, 4),
        "confidence": round(confidence, 4),
        "summary": summary,
        "recommendation": recommendation
    }
    return out
|
|
| |
| |
| |
def get_gradio_client_for_space(space: str) -> Client:
    """Build a gradio Client for `space`, authenticating with HF_TOKEN when set."""
    if not GRADIO_AVAILABLE:
        raise RuntimeError("gradio_client not installed in this environment. Add gradio_client to requirements.txt.")
    kwargs = {"hf_token": HF_TOKEN} if HF_TOKEN else {}
    return Client(space, **kwargs)
|
|
def run_vlm_and_get_features(face_path: str, eye_path: str, prompt: Optional[str] = None) -> Tuple[Optional[Dict[str, Any]], str]:
    """
    Synchronous call to remote VLM (gradio /chat_fn). Returns tuple:
    (parsed_features_dict_or_None, raw_text_response_str)

    Simplified: single call (no retries). Attempts json.loads then regex extraction.

    Raises:
        FileNotFoundError: if either image path does not exist.
        RuntimeError: if gradio_client is unavailable or the remote call fails.
    """
    prompt = prompt or DEFAULT_VLM_PROMPT
    if not os.path.exists(face_path) or not os.path.exists(eye_path):
        raise FileNotFoundError("Face or eye image path missing for VLM call.")
    if not GRADIO_AVAILABLE:
        raise RuntimeError("gradio_client not available in this environment.")

    client = get_gradio_client_for_space(GRADIO_VLM_SPACE)
    # /chat_fn expects a multimodal message: text prompt plus uploaded files.
    message = {"text": prompt, "files": [handle_file(face_path), handle_file(eye_path)]}

    try:
        logger.info("Calling VLM Space %s", GRADIO_VLM_SPACE)
        result = client.predict(message=message, history=[], api_name="/chat_fn")
    except Exception as e:
        logger.exception("VLM call failed (no retries)")
        raise RuntimeError(f"VLM call failed: {e}")

    # Normalize the heterogeneous return shapes gradio endpoints produce.
    raw_text = ""
    if not result:
        logger.warning("VLM returned empty result object")
        raw_text = ""
    else:
        if isinstance(result, (list, tuple)):
            out = result[0]
        elif isinstance(result, dict):
            out = result
        else:
            out = {"text": str(result)}

        # BUGFIX: chat endpoints can return a list/tuple whose first element
        # is a plain string; calling .get() on it raised AttributeError.
        # Coerce any non-dict element into the expected {"text": ...} shape.
        if not isinstance(out, dict):
            out = {"text": str(out)}

        text_out = out.get("text") or out.get("output") or ""
        raw_text = text_out
        logger.info("VLM response object (debug): %s", out)

        # Flag the odd case of files with no accompanying text.
        if isinstance(out, dict) and ("files" in out) and (not text_out.strip()):
            logger.warning("VLM returned no text AND files: %s", out.get("files"))

    logger.info("VLM raw output (length=%d):\n%s", len(raw_text or ""), (raw_text[:1000] + "...") if raw_text and len(raw_text) > 1000 else (raw_text or "<EMPTY>"))

    # First attempt: strict JSON parse of the whole response (dicts only).
    parsed_features = None
    try:
        parsed_features = json.loads(raw_text) if raw_text and raw_text.strip() else None
        if parsed_features is not None and not isinstance(parsed_features, dict):
            parsed_features = None
    except Exception:
        parsed_features = None

    # Second attempt: regex extraction of the first {...} block.
    if parsed_features is None and raw_text and raw_text.strip():
        try:
            parsed_features = extract_json_via_regex(raw_text)
            logger.info("VLM regex-extracted features:\n%s", json.dumps(parsed_features, indent=2, ensure_ascii=False))
        except Exception as e:
            logger.info("VLM regex extraction failed or found nothing: %s", str(e))
            parsed_features = None

    if parsed_features is None:
        logger.info("VLM parsed features: None (will fallback to sending '{}' or raw string to LLM).")
    else:
        logger.info("VLM parsed features (final): %s", json.dumps(parsed_features, ensure_ascii=False))

    return parsed_features, (raw_text or "")
|
|
| |
| |
| |
def run_llm_on_vlm(vlm_features_or_raw: Any,
                   max_new_tokens: int = 1024,
                   temperature: float = 0.0,
                   reasoning_effort: str = "medium",
                   model_identity: Optional[str] = None,
                   system_prompt: Optional[str] = None,
                   developer_prompt: Optional[str] = None) -> Dict[str, Any]:
    """
    Call the remote LLM Space's /chat endpoint with defensive input handling and a single retry.
    - Logs the VLM raw string and the chosen payload.
    - Sends cleaned JSON (json.dumps(vlm_features)) if vlm_features_or_raw is dict, else sends raw string.
    - Uses regex to extract the final JSON from LLM raw output.

    Returns:
        Normalized dict with risk_score clamped to 0..100, the four condition
        probabilities and confidence clamped to 0..1, summary/recommendation
        coerced to stripped strings, plus "<key>_was_missing" flags (set to
        False unconditionally as currently written).

    Raises:
        RuntimeError: when gradio_client is missing or both attempts fail.
    """
    if not GRADIO_AVAILABLE:
        raise RuntimeError("gradio_client not installed. Add gradio_client to requirements.txt")

    # AppError only exists in newer gradio_client releases; degrade to the
    # base Exception so the dedicated except clause below still compiles.
    # NOTE(review): when the fallback fires, the second `except Exception`
    # branch becomes unreachable (both clauses catch the same type).
    try:
        from gradio_client import AppError
    except Exception:
        AppError = Exception

    client = get_gradio_client_for_space(LLM_GRADIO_SPACE)
    model_identity = model_identity or LLM_MODEL_IDENTITY
    system_prompt = system_prompt or LLM_SYSTEM_PROMPT
    developer_prompt = developer_prompt or LLM_DEVELOPER_PROMPT

    # Choose the payload embedded into the instruction: raw string as-is
    # (or "{}" when blank), otherwise the features dict re-serialized as JSON.
    if isinstance(vlm_features_or_raw, str):
        vlm_raw_str = vlm_features_or_raw
        logger.info("LLM input will be RAW VLM STRING (len=%d)", len(vlm_raw_str or ""))
        vlm_json_str_to_send = vlm_raw_str if vlm_raw_str and vlm_raw_str.strip() else "{}"
    else:
        vlm_raw_str = json.dumps(vlm_features_or_raw, ensure_ascii=False) if vlm_features_or_raw else "{}"
        logger.info("LLM input will be CLEANED VLM JSON (len=%d)", len(vlm_raw_str))
        vlm_json_str_to_send = vlm_raw_str

    # Strict-JSON instruction wrapping the VLM payload between sentinel markers.
    instruction = (
        "\n\nSTRICT INSTRUCTIONS (READ CAREFULLY):\n"
        "1) OUTPUT ONLY a single valid JSON object and nothing else — no prose, no explanation, no code fences.\n"
        "2) The JSON MUST include these keys: risk_score, jaundice_probability, anemia_probability, "
        "hydration_issue_probability, neurological_issue_probability, summary, recommendation, confidence.\n"
        "3) Use numeric values for probabilities (0..1) and for risk_score (0..100). Use strings for summary and recommendation.\n"
        "4) Do NOT mention disease names in summary or recommendation; use neutral wording only.\n"
        "If you cannot estimate a value, set it to null.\n\n"
        "Now, based on the VLM output below, produce ONLY the JSON object described above.\n\n"
        "===BEGIN VLM OUTPUT===\n"
        f"{vlm_json_str_to_send}\n"
        "===END VLM OUTPUT===\n\n"
    )

    # Sanitize generation parameters before the remote call.
    try_max_new_tokens = int(max_new_tokens) if max_new_tokens is not None else 1024
    if try_max_new_tokens <= 0:
        try_max_new_tokens = 1024

    try_temperature = float(temperature) if temperature is not None else 0.0

    # Remote Space rejects temperatures below 0.1, so the 0.0 default is
    # silently raised to the minimum accepted value.
    if try_temperature < 0.1:
        try_temperature = 0.1

    # max_new_tokens/temperature are passed as floats — presumably the remote
    # gradio sliders expect float inputs; verify against the Space's API.
    predict_kwargs = dict(
        input_data=instruction,
        max_new_tokens=float(try_max_new_tokens),
        model_identity=model_identity,
        system_prompt=system_prompt,
        developer_prompt=developer_prompt,
        reasoning_effort=reasoning_effort,
        temperature=float(try_temperature),
        top_p=0.9,
        top_k=50,
        repetition_penalty=1.0,
        api_name="/chat"
    )

    # At most two attempts; the retry uses gentler parameters (temp=0.2, 512 tokens).
    last_exc = None
    for attempt in (1, 2):
        try:
            logger.info("Calling LLM Space %s (attempt %d) with temperature=%s, max_new_tokens=%s",
                        LLM_GRADIO_SPACE, attempt, predict_kwargs.get("temperature"), predict_kwargs.get("max_new_tokens"))
            result = client.predict(**predict_kwargs)

            # Normalize the result into a single text string.
            if isinstance(result, (dict, list)):
                text_out = json.dumps(result)
            else:
                text_out = str(result)

            if not text_out or len(text_out.strip()) == 0:
                raise RuntimeError("LLM returned empty response")

            logger.info("LLM raw output (len=%d):\n%s", len(text_out or ""), (text_out[:2000] + "...") if len(text_out) > 2000 else text_out)

            # Primary: regex extraction; fallback: strict json.loads (dicts only).
            parsed = None
            try:
                parsed = extract_json_via_regex(text_out)
            except Exception:

                try:
                    parsed = json.loads(text_out)
                    if not isinstance(parsed, dict):
                        parsed = None
                except Exception:
                    parsed = None

            if parsed is None:
                raise ValueError("Failed to extract JSON from LLM output")

            try:
                logger.info("LLM parsed JSON:\n%s", json.dumps(parsed, indent=2, ensure_ascii=False))
            except Exception:
                logger.info("LLM parsed JSON (raw dict): %s", str(parsed))

            # Clamp any value to a valid probability; non-numeric -> 0.0.
            def safe_prob(val):
                try:
                    v = float(val)
                    return max(0.0, min(1.0, v))
                except Exception:
                    return 0.0

            for k in [
                "jaundice_probability",
                "anemia_probability",
                "hydration_issue_probability",
                "neurological_issue_probability"
            ]:
                parsed[k] = safe_prob(parsed.get(k, 0.0))

            # risk_score is clamped to 0..100 and rounded to 2 decimals.
            try:
                rs = float(parsed.get("risk_score", 0.0))
                parsed["risk_score"] = round(max(0.0, min(100.0, rs)), 2)
            except Exception:
                parsed["risk_score"] = 0.0

            parsed["confidence"] = safe_prob(parsed.get("confidence", 0.0))
            parsed["summary"] = str(parsed.get("summary", "") or "").strip()
            parsed["recommendation"] = str(parsed.get("recommendation", "") or "").strip()

            # NOTE(review): the *_was_missing flags are always False here,
            # even when the key was absent and defaulted above.
            for k in [
                "jaundice_probability",
                "anemia_probability",
                "hydration_issue_probability",
                "neurological_issue_probability",
                "confidence",
                "risk_score"
            ]:
                parsed[f"{k}_was_missing"] = False

            return parsed

        except AppError as app_e:
            # Remote-side validation failure: retry once with softer params.
            logger.exception("LLM AppError (remote validation failed) on attempt %d: %s", attempt, str(app_e))
            last_exc = app_e
            if attempt == 1:
                predict_kwargs["temperature"] = 0.2
                predict_kwargs["max_new_tokens"] = float(512)
                logger.info("Retrying LLM call with temperature=0.2 and max_new_tokens=512")
                continue
            else:
                raise RuntimeError(f"LLM call failed (AppError): {app_e}")
        except Exception as e:
            # Any other failure (network, empty output, unparseable JSON).
            logger.exception("LLM call failed on attempt %d: %s", attempt, str(e))
            last_exc = e
            if attempt == 1:
                predict_kwargs["temperature"] = 0.2
                predict_kwargs["max_new_tokens"] = float(512)
                continue
            raise RuntimeError(f"LLM call failed: {e}")

    # Defensive: only reachable if the loop exits without return/raise.
    raise RuntimeError(f"LLM call ultimately failed: {last_exc}")
|
|
| |
| |
| |
| @app.get("/") |
| async def read_root(): |
| return {"message": "Elderly HealthWatch AI Backend"} |
|
|
| @app.get("/health") |
| async def health_check(): |
| impl = None |
| if mtcnn is None: |
| impl = "none" |
| elif isinstance(mtcnn, dict) and mtcnn.get("impl") == "opencv": |
| impl = "opencv_haar_fallback" |
| else: |
| impl = _MTCNN_IMPL |
| return { |
| "status": "healthy", |
| "detector": impl, |
| "vlm_available": GRADIO_AVAILABLE, |
| "vlm_space": GRADIO_VLM_SPACE, |
| "llm_space": LLM_GRADIO_SPACE |
| } |
|
|
| @app.post("/api/v1/validate-eye-photo") |
| async def validate_eye_photo(image: UploadFile = File(...)): |
| if mtcnn is None: |
| raise HTTPException(status_code=500, detail="No face detector available in this deployment.") |
| try: |
| content = await image.read() |
| if not content: |
| raise HTTPException(status_code=400, detail="Empty file uploaded.") |
| pil_img = load_image_from_bytes(content) |
| img_arr = np.asarray(pil_img) |
|
|
| if not isinstance(mtcnn, dict) and _MTCNN_IMPL == "facenet_pytorch": |
| try: |
| boxes, probs, landmarks = mtcnn.detect(pil_img, landmarks=True) |
| if boxes is None or len(boxes) == 0: |
| return {"valid": False, "face_detected": False, "eye_openness_score": 0.0, |
| "message_english": "No face detected. Please ensure your face is clearly visible in the frame.", |
| "message_hindi": "कोई चेहरा नहीं मिला। कृपया सुनिश्चित करें कि आपका चेहरा फ्रेम में स्पष्ट रूप से दिखाई दे रहा है।"} |
| prob = float(probs[0]) if probs is not None else 0.0 |
| lm = landmarks[0] if landmarks is not None else None |
| if lm is not None and len(lm) >= 2: |
| left_eye = {"x": float(lm[0][0]), "y": float(lm[0][1])} |
| right_eye = {"x": float(lm[1][0]), "y": float(lm[1][1])} |
| else: |
| left_eye = right_eye = None |
| eye_openness_score = estimate_eye_openness_from_detection(prob) |
| is_valid = eye_openness_score >= 0.3 |
| return {"valid": bool(is_valid), "face_detected": True, "eye_openness_score": round(eye_openness_score, 2), |
| "message_english": "Photo looks good! Eyes are properly open." if is_valid else "Eyes appear to be closed or partially closed. Please open your eyes wide and try again.", |
| "message_hindi": "फोटो अच्छी है! आंखें ठीक से खुली हैं।" if is_valid else "आंखें बंद या आंशिक रूप से बंद दिखाई दे रही हैं। कृपया अपनी आंखें चौड़ी खोलें और पुनः प्रयास करें।", |
| "eye_landmarks": {"left_eye": left_eye, "right_eye": right_eye}} |
| except Exception: |
| traceback.print_exc() |
| raise HTTPException(status_code=500, detail="Face detector failed during inference.") |
|
|
| if not isinstance(mtcnn, dict) and _MTCNN_IMPL == "mtcnn": |
| try: |
| detections = mtcnn.detect_faces(img_arr) |
| except Exception: |
| detections = mtcnn.detect_faces(pil_img) |
| if not detections: |
| return {"valid": False, "face_detected": False, "eye_openness_score": 0.0, |
| "message_english": "No face detected. Please ensure your face is clearly visible in the frame.", |
| "message_hindi": "कोई चेहरा नहीं मिला। कृपया सुनिश्चित करें कि आपका चेहरा फ्रेम में स्पष्ट रूप से दिखाई दे रहा है।"} |
| face = detections[0] |
| keypoints = face.get("keypoints", {}) |
| left_eye = keypoints.get("left_eye") |
| right_eye = keypoints.get("right_eye") |
| confidence = float(face.get("confidence", 0.0)) |
| eye_openness_score = estimate_eye_openness_from_detection(confidence) |
| is_valid = eye_openness_score >= 0.3 |
| return {"valid": bool(is_valid), "face_detected": True, "eye_openness_score": round(eye_openness_score, 2), |
| "message_english": "Photo looks good! Eyes are properly open." if is_valid else "Eyes appear to be closed or partially closed. Please open your eyes wide and try again.", |
| "message_hindi": "फोटो अच्छी है! आंखें ठीक से खुली हैं।" if is_valid else "आंखें बंद या आंशिक रूप से बंद दिखाई दे रही हैं। कृपया अपनी आंखें चौड़ी खोलें और पुनः प्रयास करें।", |
| "eye_landmarks": {"left_eye": left_eye, "right_eye": right_eye}} |
|
|
| if isinstance(mtcnn, dict) and mtcnn.get("impl") == "opencv": |
| try: |
| gray = cv2.cvtColor(img_arr, cv2.COLOR_RGB2GRAY) |
| face_cascade = mtcnn["face_cascade"] |
| eye_cascade = mtcnn["eye_cascade"] |
| faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(60, 60)) |
| if len(faces) == 0: |
| return {"valid": False, "face_detected": False, "eye_openness_score": 0.0, |
| "message_english": "No face detected. Please ensure your face is clearly visible in the frame.", |
| "message_hindi": "कोई चेहरा नहीं मिला। कृपया सुनिश्चित करें कि आपका चेहरा फ्रेम में स्पष्ट रूप से दिखाई दे रहा है।"} |
| (x, y, w, h) = faces[0] |
| roi_gray = gray[y:y+h, x:x+w] |
| eyes = eye_cascade.detectMultiScale(roi_gray, scaleFactor=1.1, minNeighbors=5, minSize=(20, 10)) |
| eye_openness_score = 1.0 if len(eyes) >= 1 else 0.0 |
| is_valid = eye_openness_score >= 0.3 |
| left_eye = None |
| right_eye = None |
| if len(eyes) >= 1: |
| ex, ey, ew, eh = eyes[0] |
| cx = float(x + ex + ew/2) |
| cy = float(y + ey + eh/2) |
| left_eye = {"x": cx, "y": cy} |
| return {"valid": bool(is_valid), "face_detected": True, "eye_openness_score": round(eye_openness_score, 2), |
| "message_english": "Photo looks good! Eyes are detected." if is_valid else "Eyes not detected. Please open your eyes wide and try again.", |
| "message_hindi": "फोटो अच्छी है! आंखें मिलीं।" if is_valid else "आंखें नहीं मिलीं। कृपया अपनी आंखें चौड़ी खोलें और पुनः प्रयास करें।", |
| "eye_landmarks": {"left_eye": left_eye, "right_eye": right_eye}} |
| except Exception: |
| traceback.print_exc() |
| raise HTTPException(status_code=500, detail="OpenCV fallback detector failed.") |
|
|
| raise HTTPException(status_code=500, detail="Invalid detector configuration.") |
| except HTTPException: |
| raise |
| except Exception as e: |
| traceback.print_exc() |
| return {"valid": False, "face_detected": False, "eye_openness_score": 0.0, |
| "message_english": "Error processing image. Please try again.", |
| "message_hindi": "छवि प्रोसेस करने में त्रुटि। कृपया पुनः प्रयास करें।", |
| "error": str(e)} |
|
|
| @app.post("/api/v1/upload") |
| async def upload_images( |
| background_tasks: BackgroundTasks, |
| face_image: UploadFile = File(...), |
| eye_image: UploadFile = File(...) |
| ): |
| """ |
| Save images and enqueue background processing. VLM -> LLM runs inside process_screening. |
| """ |
| try: |
| screening_id = str(uuid.uuid4()) |
| now = datetime.utcnow().isoformat() + "Z" |
| tmp_dir = "/tmp/elderly_healthwatch" |
| os.makedirs(tmp_dir, exist_ok=True) |
| face_path = os.path.join(tmp_dir, f"{screening_id}_face.jpg") |
| eye_path = os.path.join(tmp_dir, f"{screening_id}_eye.jpg") |
| face_bytes = await face_image.read() |
| eye_bytes = await eye_image.read() |
| with open(face_path, "wb") as f: |
| f.write(face_bytes) |
| with open(eye_path, "wb") as f: |
| f.write(eye_bytes) |
| screenings_db[screening_id] = { |
| "id": screening_id, |
| "timestamp": now, |
| "face_image_path": face_path, |
| "eye_image_path": eye_path, |
| "status": "queued", |
| "quality_metrics": {}, |
| "ai_results": {}, |
| "disease_predictions": [], |
| "recommendations": {} |
| } |
| background_tasks.add_task(process_screening, screening_id) |
| return {"screening_id": screening_id} |
| except Exception as e: |
| traceback.print_exc() |
| raise HTTPException(status_code=500, detail=f"Failed to upload images: {e}") |
|
|
| @app.post("/api/v1/analyze/{screening_id}") |
| async def analyze_screening(screening_id: str, background_tasks: BackgroundTasks): |
| if screening_id not in screenings_db: |
| raise HTTPException(status_code=404, detail="Screening not found") |
| if screenings_db[screening_id].get("status") == "processing": |
| return {"message": "Already processing"} |
| screenings_db[screening_id]["status"] = "queued" |
| background_tasks.add_task(process_screening, screening_id) |
| return {"message": "Analysis enqueued"} |
|
|
| @app.get("/api/v1/status/{screening_id}") |
| async def get_status(screening_id: str): |
| if screening_id not in screenings_db: |
| raise HTTPException(status_code=404, detail="Screening not found") |
| status = screenings_db[screening_id].get("status", "unknown") |
| progress = 50 if status == "processing" else (100 if status == "completed" else 0) |
| return {"screening_id": screening_id, "status": status, "progress": progress} |
|
|
| @app.get("/api/v1/results/{screening_id}") |
| async def get_results(screening_id: str): |
| if screening_id not in screenings_db: |
| raise HTTPException(status_code=404, detail="Screening not found") |
| |
| entry = screenings_db[screening_id] |
| entry.setdefault("ai_results", {}) |
| entry["ai_results"].setdefault("vlm_raw", entry.get("ai_results", {}).get("vlm_raw", "")) |
| return entry |
|
|
| @app.get("/api/v1/history/{user_id}") |
| async def get_history(user_id: str): |
| history = [s for s in screenings_db.values() if s.get("user_id") == user_id] |
| return {"screenings": history} |
|
|
| |
| |
| |
| @app.post("/api/v1/get-vitals") |
| async def get_vitals_from_upload( |
| face_image: UploadFile = File(...), |
| eye_image: UploadFile = File(...) |
| ): |
| """ |
| Run VLM -> LLM pipeline synchronously (but off the event loop) and return: |
| { vlm_parsed_features, vlm_raw_output, llm_structured_risk } |
| """ |
| if not GRADIO_AVAILABLE: |
| raise HTTPException(status_code=500, detail="VLM/LLM client not available in this deployment.") |
|
|
| |
| try: |
| tmp_dir = "/tmp/elderly_healthwatch" |
| os.makedirs(tmp_dir, exist_ok=True) |
| uid = str(uuid.uuid4()) |
| face_path = os.path.join(tmp_dir, f"{uid}_face.jpg") |
| eye_path = os.path.join(tmp_dir, f"{uid}_eye.jpg") |
| face_bytes = await face_image.read() |
| eye_bytes = await eye_image.read() |
| with open(face_path, "wb") as f: |
| f.write(face_bytes) |
| with open(eye_path, "wb") as f: |
| f.write(eye_bytes) |
| except Exception as e: |
| logger.exception("Failed saving uploaded images") |
| raise HTTPException(status_code=500, detail=f"Failed saving images: {e}") |
|
|
| try: |
| |
| vlm_features, vlm_raw = await asyncio.to_thread(run_vlm_and_get_features, face_path, eye_path) |
|
|
| |
| logger.info("get_vitals_from_upload - VLM raw (snippet): %s", (vlm_raw[:500] + "...") if vlm_raw else "<EMPTY>") |
| logger.info("get_vitals_from_upload - VLM parsed features: %s", json.dumps(vlm_features, indent=2, ensure_ascii=False) if vlm_features else "None") |
|
|
| |
| if vlm_features: |
| llm_input = json.dumps(vlm_features, ensure_ascii=False) |
| logger.info("Feeding CLEANED VLM JSON to LLM (len=%d).", len(llm_input)) |
| else: |
| llm_input = vlm_raw if vlm_raw and vlm_raw.strip() else "{}" |
| logger.info("Feeding RAW VLM STRING to LLM (len=%d).", len(llm_input)) |
|
|
| |
| structured_risk = await asyncio.to_thread(run_llm_on_vlm, llm_input) |
|
|
| |
| return { |
| "vlm_raw_output": vlm_raw, |
| "vlm_parsed_features": vlm_features, |
| "llm_structured_risk": structured_risk |
| } |
| except Exception as e: |
| logger.exception("get_vitals_from_upload pipeline failed") |
| raise HTTPException(status_code=500, detail=f"Pipeline failed: {e}") |
|
|
| @app.post("/api/v1/get-vitals/{screening_id}") |
| async def get_vitals_for_screening(screening_id: str): |
| """ |
| Re-run VLM->LLM on images already stored for `screening_id` in screenings_db. |
| Useful for re-processing or debugging. |
| """ |
| if screening_id not in screenings_db: |
| raise HTTPException(status_code=404, detail="Screening not found") |
|
|
| entry = screenings_db[screening_id] |
| face_path = entry.get("face_image_path") |
| eye_path = entry.get("eye_image_path") |
| if not (face_path and os.path.exists(face_path) and eye_path and os.path.exists(eye_path)): |
| raise HTTPException(status_code=400, detail="Stored images missing for this screening") |
|
|
| try: |
| |
| vlm_features, vlm_raw = await asyncio.to_thread(run_vlm_and_get_features, face_path, eye_path) |
|
|
| logger.info("get_vitals_for_screening(%s) - VLM raw (snippet): %s", screening_id, (vlm_raw[:500] + "...") if vlm_raw else "<EMPTY>") |
| logger.info("get_vitals_for_screening(%s) - VLM parsed features: %s", screening_id, json.dumps(vlm_features, indent=2, ensure_ascii=False) if vlm_features else "None") |
|
|
| if vlm_features: |
| llm_input = json.dumps(vlm_features, ensure_ascii=False) |
| logger.info("Feeding CLEANED VLM JSON to LLM (len=%d).", len(llm_input)) |
| else: |
| llm_input = vlm_raw if vlm_raw and vlm_raw.strip() else "{}" |
| logger.info("Feeding RAW VLM STRING to LLM (len=%d).", len(llm_input)) |
|
|
| structured_risk = await asyncio.to_thread(run_llm_on_vlm, llm_input) |
|
|
| |
| entry.setdefault("ai_results", {}) |
| entry["ai_results"].update({ |
| "vlm_parsed_features": vlm_features, |
| "vlm_raw": vlm_raw, |
| "structured_risk": structured_risk, |
| "last_vitals_run": datetime.utcnow().isoformat() + "Z" |
| }) |
|
|
| return { |
| "screening_id": screening_id, |
| "vlm_raw_output": vlm_raw, |
| "vlm_parsed_features": vlm_features, |
| "llm_structured_risk": structured_risk |
| } |
| except Exception as e: |
| logger.exception("get_vitals_for_screening pipeline failed") |
| raise HTTPException(status_code=500, detail=f"Pipeline failed: {e}") |
|
|
| |
| |
| |
def _compute_quality_metrics(face_img: Image.Image) -> Dict[str, Any]:
    """
    Run whichever face detector is available and derive quick quality metrics.

    Dispatches on the module-level ``mtcnn`` object / ``_MTCNN_IMPL`` flag:
    - facenet_pytorch MTCNN: boxes/probs/landmarks API
    - mtcnn package: ``detect_faces`` returning dicts with keypoints
    - OpenCV fallback: ``mtcnn`` is a dict holding Haar cascades

    Returns a dict with detection flag, confidence, a rough quality score,
    eye coordinates (when found), and brightness/blur estimates. Detector
    failures are logged and treated as "no face detected".
    """
    face_detected = False
    face_confidence = 0.0
    left_eye_coord = right_eye_coord = None

    # --- neural detectors (facenet_pytorch or the `mtcnn` package) ---------
    if mtcnn is not None and not isinstance(mtcnn, dict) and _MTCNN_IMPL in ("facenet_pytorch", "mtcnn"):
        try:
            if _MTCNN_IMPL == "facenet_pytorch":
                boxes, probs, landmarks = mtcnn.detect(face_img, landmarks=True)
                if boxes is not None and len(boxes) > 0:
                    face_detected = True
                    face_confidence = float(probs[0]) if probs is not None else 0.0
                    if landmarks is not None:
                        lm = landmarks[0]
                        # facenet_pytorch landmark order: left eye first, right eye second
                        if len(lm) >= 2:
                            left_eye_coord = {"x": float(lm[0][0]), "y": float(lm[0][1])}
                            right_eye_coord = {"x": float(lm[1][0]), "y": float(lm[1][1])}
            else:
                arr = np.asarray(face_img)
                detections = mtcnn.detect_faces(arr)
                if detections:
                    face_detected = True
                    face_confidence = float(detections[0].get("confidence", 0.0))
                    k = detections[0].get("keypoints", {})
                    left_eye_coord = k.get("left_eye")
                    right_eye_coord = k.get("right_eye")
        except Exception:
            # Best-effort detection: log and continue with defaults.
            logger.exception("neural face detection failed")

    # --- OpenCV Haar-cascade fallback ---------------------------------------
    if isinstance(mtcnn, dict) and mtcnn.get("impl") == "opencv":
        try:
            arr = np.asarray(face_img)
            gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
            face_cascade = mtcnn["face_cascade"]
            eye_cascade = mtcnn["eye_cascade"]
            faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(60, 60))
            if len(faces) > 0:
                face_detected = True
                (x, y, w, h) = faces[0]
                # Heuristic confidence: fraction of the frame covered by the face box.
                face_confidence = min(1.0, (w*h) / (arr.shape[0]*arr.shape[1]) * 4.0)
                roi_gray = gray[y:y+h, x:x+w]
                eyes = eye_cascade.detectMultiScale(roi_gray, scaleFactor=1.1, minNeighbors=5, minSize=(20, 10))
                if len(eyes) >= 1:
                    ex, ey, ew, eh = eyes[0]
                    left_eye_coord = {"x": float(x + ex + ew/2), "y": float(y + ey + eh/2)}
        except Exception:
            logger.exception("opencv face detection failed")

    face_quality_score = 0.85 if face_detected and face_confidence > 0.6 else 0.45
    gray_arr = np.asarray(face_img.convert("L"))
    return {
        "face_detected": face_detected,
        "face_confidence": round(face_confidence, 3),
        "face_quality_score": round(face_quality_score, 2),
        "eye_coords": {"left_eye": left_eye_coord, "right_eye": right_eye_coord},
        "face_brightness": int(np.mean(gray_arr)),
        # NOTE(review): variance of the raw grayscale measures contrast, not
        # blur; the usual blur proxy is variance of the Laplacian — confirm
        # intent before changing, downstream consumers may expect this scale.
        "face_blur_estimate": int(np.var(gray_arr))
    }


async def process_screening(screening_id: str):
    """
    Main background pipeline for one screening.

    Steps:
    - load the stored face/eye images from ``screenings_db``
    - compute quick detector-based quality metrics (see _compute_quality_metrics)
    - run the VLM -> vlm_features (dict or None) + vlm_raw (string)
    - run the LLM on vlm_features (preferred) or vlm_raw -> structured risk JSON
    - derive disease predictions / recommendations and mark status "completed"

    All results are written into the module-level ``screenings_db``; nothing is
    returned. Any fatal error marks the screening "failed" with the error text.
    Per-stage (VLM/LLM) failures are recorded in ai_results and the pipeline
    continues with fallback values.
    """
    started = time.monotonic()  # wall-clock start for the real processing_time_ms
    try:
        if screening_id not in screenings_db:
            logger.error("[process_screening] screening %s not found", screening_id)
            return
        screenings_db[screening_id]["status"] = "processing"
        logger.info("[process_screening] Starting %s", screening_id)

        entry = screenings_db[screening_id]
        face_path = entry.get("face_image_path")
        eye_path = entry.get("eye_image_path")

        if not (face_path and os.path.exists(face_path)):
            raise RuntimeError("Face image missing")
        if not (eye_path and os.path.exists(eye_path)):
            raise RuntimeError("Eye image missing")

        face_img = Image.open(face_path).convert("RGB")
        # Loaded to validate the file is a readable image; the VLM itself
        # reads from the on-disk paths.
        eye_img = Image.open(eye_path).convert("RGB")

        quality_metrics = _compute_quality_metrics(face_img)
        screenings_db[screening_id]["quality_metrics"] = quality_metrics

        # ---- VLM: extract feature vector from the images -------------------
        vlm_features = None
        vlm_raw = None
        try:
            vlm_features, vlm_raw = run_vlm_and_get_features(face_path, eye_path)
            screenings_db[screening_id].setdefault("ai_results", {})
            screenings_db[screening_id]["ai_results"].update({
                "vlm_parsed_features": vlm_features,
                "vlm_raw": vlm_raw
            })
        except Exception as e:
            logger.exception("VLM feature extraction failed")
            screenings_db[screening_id].setdefault("ai_results", {})
            screenings_db[screening_id]["ai_results"].update({"vlm_error": str(e)})
            vlm_features = None
            vlm_raw = ""

        logger.info("process_screening(%s) - VLM raw (snippet): %s", screening_id, (vlm_raw[:500] + "...") if vlm_raw else "<EMPTY>")
        logger.info("process_screening(%s) - VLM parsed features: %s", screening_id, json.dumps(vlm_features, indent=2, ensure_ascii=False) if vlm_features else "None")

        # ---- LLM: turn features (or raw VLM text) into structured risk -----
        structured_risk = None
        try:
            if vlm_features:
                # Prefer the cleaned JSON feature vector when parsing succeeded.
                llm_input = json.dumps(vlm_features, ensure_ascii=False)
            else:
                # Fall back to the raw VLM string; "{}" keeps the LLM call valid.
                llm_input = vlm_raw if vlm_raw and vlm_raw.strip() else "{}"

            structured_risk = run_llm_on_vlm(llm_input)
            screenings_db[screening_id].setdefault("ai_results", {})
            screenings_db[screening_id]["ai_results"].update({"structured_risk": structured_risk})
        except Exception as e:
            logger.exception("LLM processing failed")
            screenings_db[screening_id].setdefault("ai_results", {})
            screenings_db[screening_id]["ai_results"].update({"llm_error": str(e)})
            # Neutral fallback so downstream .get() lookups stay well-defined.
            structured_risk = {
                "risk_score": 0.0,
                "jaundice_probability": 0.0,
                "anemia_probability": 0.0,
                "hydration_issue_probability": 0.0,
                "neurological_issue_probability": 0.0,
                "summary": "",
                "recommendation": "",
                "confidence": 0.0
            }

        # Real elapsed time (the previous hard-coded 1200 was a placeholder).
        screenings_db[screening_id].setdefault("ai_results", {})
        screenings_db[screening_id]["ai_results"].update({
            "processing_time_ms": int((time.monotonic() - started) * 1000)
        })

        disease_predictions = [
            {
                "condition": "Anemia-like-signs",
                "risk_level": "Medium" if structured_risk.get("anemia_probability", 0.0) > 0.5 else "Low",
                "probability": structured_risk.get("anemia_probability", 0.0),
                "confidence": structured_risk.get("confidence", 0.0)
            },
            {
                "condition": "Jaundice-like-signs",
                "risk_level": "Medium" if structured_risk.get("jaundice_probability", 0.0) > 0.5 else "Low",
                "probability": structured_risk.get("jaundice_probability", 0.0),
                "confidence": structured_risk.get("confidence", 0.0)
            }
        ]

        recommendations = {
            "action_needed": "consult" if structured_risk.get("risk_score", 0.0) > 30.0 else "monitor",
            "message_english": structured_risk.get("recommendation", "") or "Please follow up with a health professional if concerns persist.",
            "message_hindi": ""
        }

        screenings_db[screening_id].update({
            "status": "completed",
            "disease_predictions": disease_predictions,
            "recommendations": recommendations
        })

        logger.info("[process_screening] Completed %s", screening_id)
    except Exception as e:
        logger.exception("[process_screening] pipeline failed for %s", screening_id)
        if screening_id in screenings_db:
            screenings_db[screening_id]["status"] = "failed"
            screenings_db[screening_id]["error"] = str(e)
        else:
            logger.error("[process_screening] Failed for unknown screening %s: %s", screening_id, str(e))
|
|
| |
| |
| |
if __name__ == "__main__":
    # Local/dev entry point. Host and port can be overridden with the
    # conventional HOST / PORT environment variables; the defaults match
    # the previous hard-coded values (HF Spaces listens on 7860).
    import uvicorn
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "7860"))
    uvicorn.run("app:app", host=host, port=port, reload=False)
|
|