# Source: Hugging Face Space file "cloud / app.py" (commit 3bfde28, uploaded by dpv007; raw view, 45.6 kB)
# app.py
"""
Elderly HealthWatch AI Backend (FastAPI)
Pipeline:
- receive images
- run VLM (remote gradio / chat_fn) -> JSON feature vector + raw text
- run LLM (remote gradio /chat) -> structured risk JSON (per requested schema)
- continue rest of processing and store results
Notes:
- Add gradio_client==1.13.2 (or another compatible 1.x) to requirements.txt
- If VLM/LLM Spaces are private, set HF_TOKEN in the environment for authentication.
- This final variant:
* logs raw VLM responses,
* always returns raw VLM output in API responses,
* extracts JSON from VLM via regex when possible, and
* sends either cleaned JSON or raw VLM string into LLM (and logs which was used).
- VLM calls were simplified to a single call (no retries).
"""
import io
import os
import uuid
import json
import asyncio
import logging
import traceback
import re
import time
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
import numpy as np
import cv2 # opencv-python-headless expected installed
# Optional gradio client (for VLM + LLM calls)
try:
from gradio_client import Client, handle_file # type: ignore
GRADIO_AVAILABLE = True
except Exception:
GRADIO_AVAILABLE = False
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("elderly_healthwatch")
# Configuration for remote VLM and LLM spaces (change to your target Space names)
GRADIO_VLM_SPACE = os.getenv("GRADIO_SPACE", "developer0hye/Qwen3-VL-8B-Instruct")
LLM_GRADIO_SPACE = os.getenv("LLM_GRADIO_SPACE", "Tonic/med-gpt-oss-20b-demo")
# Optional Hugging Face token; required only when the Spaces above are private.
HF_TOKEN = os.getenv("HF_TOKEN", None)
# Default VLM prompt (sent with the face + eye images when no prompt override is given)
DEFAULT_VLM_PROMPT = (
    "From the provided face/eye images, compute the required screening features "
    "(pallor, sclera yellowness, redness, mobility metrics, quality checks) "
    "and output a clean JSON feature vector only with values ranging as probabilities."
)
# Default LLM prompts / metadata (stricter: force JSON-only output)
LLM_MODEL_IDENTITY = os.getenv(
    "LLM_MODEL_IDENTITY",
    "You are GPT-Tonic, a large language model trained by TonicAI for clinical reasoning."
)
LLM_SYSTEM_PROMPT = os.getenv(
    "LLM_SYSTEM_PROMPT",
    "System: This assistant MUST ONLY OUTPUT a single valid JSON object as its response — no prose, no explanations, no code fences, no annotations. The JSON must follow the schema requested by the user."
)
LLM_DEVELOPER_PROMPT = os.getenv(
    "LLM_DEVELOPER_PROMPT",
    "Developer: Output ONLY a single valid JSON object with keys: risk_score, jaundice_probability, anemia_probability, hydration_issue_probability, neurological_issue_probability, summary, recommendation, confidence. Do NOT include any extra fields or natural language outside the JSON object."
)
# Try MTCNN libs; fallback to OpenCV haar cascades
_MTCNN_IMPL = None
try:
from facenet_pytorch import MTCNN as FacenetMTCNN # type: ignore
_MTCNN_IMPL = "facenet_pytorch"
except Exception:
FacenetMTCNN = None
_MTCNN_IMPL = None
if _MTCNN_IMPL is None:
try:
from mtcnn import MTCNN as ClassicMTCNN # type: ignore
_MTCNN_IMPL = "mtcnn"
except Exception:
ClassicMTCNN = None
def create_mtcnn_or_fallback():
    """Instantiate the best available face detector.

    Preference order: facenet_pytorch MTCNN (CPU), the classic ``mtcnn``
    package, then OpenCV Haar cascades. Returns either a detector object,
    a dict describing the OpenCV fallback (keys: impl, face_cascade,
    eye_cascade), or None when no backend could be set up.

    NOTE: the ``_MTCNN_IMPL == ...`` check must stay first in each
    condition — ``ClassicMTCNN`` is only bound when its import was
    attempted, so evaluating it first could raise NameError.
    """
    if _MTCNN_IMPL == "facenet_pytorch" and FacenetMTCNN is not None:
        try:
            return FacenetMTCNN(keep_all=False, device="cpu")
        except Exception:
            pass
    if _MTCNN_IMPL == "mtcnn" and ClassicMTCNN is not None:
        try:
            return ClassicMTCNN()
        except Exception:
            pass
    # Last resort: Haar cascade files shipped with opencv-python.
    try:
        cascades_dir = cv2.data.haarcascades
        face_xml = os.path.join(cascades_dir, "haarcascade_frontalface_default.xml")
        eye_xml = os.path.join(cascades_dir, "haarcascade_eye.xml")
        if os.path.exists(face_xml) and os.path.exists(eye_xml):
            return {
                "impl": "opencv",
                "face_cascade": cv2.CascadeClassifier(face_xml),
                "eye_cascade": cv2.CascadeClassifier(eye_xml)
            }
    except Exception:
        pass
    return None
# Instantiate the detector once at import time; may be None when no backend works.
mtcnn = create_mtcnn_or_fallback()
app = FastAPI(title="Elderly HealthWatch AI Backend")
# Fully permissive CORS so any browser frontend can call this demo API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# In-memory DB for demo (screening_id -> record dict; lost on restart)
screenings_db: Dict[str, Dict[str, Any]] = {}
# -----------------------
# Utility helpers
# -----------------------
def load_image_from_bytes(bytes_data: bytes) -> Image.Image:
    """Decode raw upload bytes into an RGB PIL image."""
    buffer = io.BytesIO(bytes_data)
    return Image.open(buffer).convert("RGB")
def estimate_eye_openness_from_detection(confidence: float) -> float:
    """Map a detector confidence to a 0..1 eye-openness proxy.

    The confidence is inflated by 15% and clamped into [0, 1].
    Input that cannot be coerced to float yields 0.0.
    """
    try:
        scaled = float(confidence) * 1.15
    except Exception:
        return 0.0
    return min(1.0, max(0.0, scaled))
# -----------------------
# Regex-based robust extractor (used for both VLM raw parsing & LLM raw parsing)
# -----------------------
def extract_json_via_regex(raw_text: str) -> Dict[str, Any]:
    """
    Extract numeric fields and text fields from the first {...} block found in raw_text.
    Returns a dict with:
    - risk_score (0..100)
    - jaundice_probability (0..1)
    - anemia_probability (0..1)
    - hydration_issue_probability (0..1)
    - neurological_issue_probability (0..1)
    - confidence (0..1)
    - summary (string)
    - recommendation (string)

    Raises:
        ValueError: when raw_text contains no {...} block at all.

    Missing or unparseable fields default to 0.0 / "" — they never raise.
    """
    # Greedy match: spans from the FIRST '{' to the LAST '}' in the text,
    # so prose wrapped around a JSON object is tolerated.
    match = re.search(r"\{[\s\S]*\}", raw_text)
    if not match:
        raise ValueError("No JSON-like block found in text")
    block = match.group(0)
    def find_number_for_key(key: str) -> Optional[float]:
        # Patterns 1-3: numeric value (optionally quoted, optionally "%"-suffixed)
        # after "key":, 'key': or bare key:. Patterns 4-5: any quoted string value,
        # attempted as a float afterwards.
        patterns = [
            rf'"{key}"\s*:\s*["\']?\s*([-+]?\d+(\.\d+)?)\s*%?\s*["\']?',
            rf"'{key}'\s*:\s*['\"]?\s*([-+]?\d+(\.\d+)?)\s*%?\s*['\"]?",
            rf'\b{key}\b\s*:\s*["\']?\s*([-+]?\d+(\.\d+)?)\s*%?\s*["\']?',
            rf'"{key}"\s*:\s*["\']([^"\']+)["\']',
            rf"'{key}'\s*:\s*['\"]([^'\"]+)['\"]"
        ]
        for pat in patterns:
            m = re.search(pat, block, flags=re.IGNORECASE)
            if not m:
                continue
            g = m.group(1)
            if g is None:
                continue
            # Strip a trailing percent sign so "85%" parses as 85.0
            # (normalize_prob/normalize_risk below rescale it).
            s = str(g).strip().replace("%", "").strip()
            try:
                return float(s)
            except Exception:
                # NOTE(review): a non-numeric match aborts the search entirely
                # (returns None) rather than trying the remaining patterns.
                return None
        return None
    def find_text_for_key(key: str) -> str:
        # Double-quoted value, then single-quoted, then bare value up to
        # newline / comma / closing brace.
        m = re.search(rf'"{key}"\s*:\s*"([^"]*)"', block, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip()
        m = re.search(rf"'{key}'\s*:\s*'([^']*)'", block, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip()
        m = re.search(rf'\b{key}\b\s*:\s*([^\n,}}]+)', block, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip().strip('",')
        return ""
    raw_risk = find_number_for_key("risk_score")
    raw_jaundice = find_number_for_key("jaundice_probability")
    raw_anemia = find_number_for_key("anemia_probability")
    raw_hydration = find_number_for_key("hydration_issue_probability")
    raw_neuro = find_number_for_key("neurological_issue_probability")
    raw_conf = find_number_for_key("confidence")
    def normalize_prob(v: Optional[float]) -> float:
        # Accept either 0..1 probabilities or 0..100 percentages; missing -> 0.0.
        if v is None:
            return 0.0
        if v > 1.0 and v <= 100.0:
            return max(0.0, min(1.0, v / 100.0))
        if v > 100.0:
            return 1.0
        return max(0.0, min(1.0, v))
    jaundice_probability = normalize_prob(raw_jaundice)
    anemia_probability = normalize_prob(raw_anemia)
    hydration_issue_probability = normalize_prob(raw_hydration)
    neurological_issue_probability = normalize_prob(raw_neuro)
    confidence = normalize_prob(raw_conf)
    def normalize_risk(v: Optional[float]) -> float:
        # Risk is reported on a 0..100 scale; values <= 1.0 are treated as
        # fractions and scaled up, everything is clamped to [0, 100].
        if v is None:
            return 0.0
        if v <= 1.0:
            return round(max(0.0, min(100.0, v * 100.0)), 2)
        if v > 1.0 and v <= 100.0:
            return round(max(0.0, min(100.0, v)), 2)
        return round(max(0.0, min(100.0, v if v < float('inf') else 100.0)), 2)
    risk_score = normalize_risk(raw_risk)
    summary = find_text_for_key("summary")
    recommendation = find_text_for_key("recommendation")
    out = {
        "risk_score": risk_score,
        "jaundice_probability": round(jaundice_probability, 4),
        "anemia_probability": round(anemia_probability, 4),
        "hydration_issue_probability": round(hydration_issue_probability, 4),
        "neurological_issue_probability": round(neurological_issue_probability, 4),
        "confidence": round(confidence, 4),
        "summary": summary,
        "recommendation": recommendation
    }
    return out
# -----------------------
# Gradio / VLM helper (single-call, no retries)
# -----------------------
def get_gradio_client_for_space(space: str) -> Client:
    """Build a gradio Client for `space`, authenticating with HF_TOKEN when set.

    Raises:
        RuntimeError: when the gradio_client package is not installed.
    """
    if not GRADIO_AVAILABLE:
        raise RuntimeError("gradio_client not installed in this environment. Add gradio_client to requirements.txt.")
    kwargs = {"hf_token": HF_TOKEN} if HF_TOKEN else {}
    return Client(space, **kwargs)
def run_vlm_and_get_features(face_path: str, eye_path: str, prompt: Optional[str] = None) -> Tuple[Optional[Dict[str, Any]], str]:
    """
    Synchronous call to remote VLM (gradio /chat_fn).

    Args:
        face_path: path to the saved face image (must exist).
        eye_path: path to the saved eye image (must exist).
        prompt: optional override for DEFAULT_VLM_PROMPT.

    Returns:
        (parsed_features_dict_or_None, raw_text_response_str)

    Raises:
        FileNotFoundError: when either image path is missing.
        RuntimeError: when gradio_client is unavailable or the remote call fails.

    Simplified: single call (no retries). Attempts json.loads then regex extraction.
    """
    prompt = prompt or DEFAULT_VLM_PROMPT
    if not os.path.exists(face_path) or not os.path.exists(eye_path):
        raise FileNotFoundError("Face or eye image path missing for VLM call.")
    if not GRADIO_AVAILABLE:
        raise RuntimeError("gradio_client not available in this environment.")
    client = get_gradio_client_for_space(GRADIO_VLM_SPACE)
    message = {"text": prompt, "files": [handle_file(face_path), handle_file(eye_path)]}
    # SINGLE CALL (no retries)
    try:
        logger.info("Calling VLM Space %s", GRADIO_VLM_SPACE)
        result = client.predict(message=message, history=[], api_name="/chat_fn")
    except Exception as e:
        logger.exception("VLM call failed (no retries)")
        raise RuntimeError(f"VLM call failed: {e}")
    # Normalize result into a dict `out` and a plain-text `raw_text`.
    raw_text = ""
    if not result:
        logger.warning("VLM returned empty result object")
        raw_text = ""
    else:
        if isinstance(result, (list, tuple)):
            out = result[0]
        elif isinstance(result, dict):
            out = result
        else:
            out = {"text": str(result)}
        # BUGFIX: the first element of a list/tuple result may itself be a plain
        # string (or any non-dict); previously out.get(...) then raised
        # AttributeError. Coerce so the lookups below are always safe.
        if not isinstance(out, dict):
            out = {"text": str(out)}
        text_out = out.get("text") or out.get("output") or ""
        # Defensive: some Spaces return structured payloads under "text";
        # coerce so .strip()/slicing below cannot fail.
        if not isinstance(text_out, str):
            text_out = str(text_out)
        raw_text = text_out
        logger.info("VLM response object (debug): %s", out)
        # If files present but text empty, log it explicitly
        if ("files" in out) and (not text_out.strip()):
            logger.warning("VLM returned no text AND files: %s", out.get("files"))
    # Log raw VLM output for debugging/auditing
    logger.info("VLM raw output (length=%d):\n%s", len(raw_text or ""), (raw_text[:1000] + "...") if raw_text and len(raw_text) > 1000 else (raw_text or "<EMPTY>"))
    # Try to parse JSON first (fast path)
    parsed_features = None
    try:
        parsed_features = json.loads(raw_text) if raw_text and raw_text.strip() else None
        if parsed_features is not None and not isinstance(parsed_features, dict):
            parsed_features = None
    except Exception:
        parsed_features = None
    # If json.loads failed or returned None, try regex-based extraction
    if parsed_features is None and raw_text and raw_text.strip():
        try:
            parsed_features = extract_json_via_regex(raw_text)
            logger.info("VLM regex-extracted features:\n%s", json.dumps(parsed_features, indent=2, ensure_ascii=False))
        except Exception as e:
            logger.info("VLM regex extraction failed or found nothing: %s", str(e))
            parsed_features = None
    if parsed_features is None:
        logger.info("VLM parsed features: None (will fallback to sending '{}' or raw string to LLM).")
    else:
        logger.info("VLM parsed features (final): %s", json.dumps(parsed_features, ensure_ascii=False))
    # Always return raw_text (may be empty string) and parsed_features (or None)
    return parsed_features, (raw_text or "")
# -----------------------
# Gradio / LLM helper (defensive, with retry + clamps)
# -----------------------
def run_llm_on_vlm(vlm_features_or_raw: Any,
                   max_new_tokens: int = 1024,
                   temperature: float = 0.0,
                   reasoning_effort: str = "medium",
                   model_identity: Optional[str] = None,
                   system_prompt: Optional[str] = None,
                   developer_prompt: Optional[str] = None) -> Dict[str, Any]:
    """
    Call the remote LLM Space's /chat endpoint with defensive input handling and a single retry.
    - Logs the VLM raw string and the chosen payload.
    - Sends cleaned JSON (json.dumps(vlm_features)) if vlm_features_or_raw is dict, else sends raw string.
    - Uses regex to extract the final JSON from LLM raw output.

    Returns:
        A dict with clamped fields (risk_score 0..100, probabilities 0..1,
        confidence 0..1, summary/recommendation strings) plus
        "<field>_was_missing" booleans (always False in this implementation).

    Raises:
        RuntimeError: when gradio_client is missing, or both attempts fail.
    """
    if not GRADIO_AVAILABLE:
        raise RuntimeError("gradio_client not installed. Add gradio_client to requirements.txt")
    # Try to import AppError for specific handling; fallback to Exception if unavailable
    try:
        from gradio_client import AppError  # type: ignore
    except Exception:
        AppError = Exception  # fallback: first except clause then catches everything
    client = get_gradio_client_for_space(LLM_GRADIO_SPACE)
    model_identity = model_identity or LLM_MODEL_IDENTITY
    system_prompt = system_prompt or LLM_SYSTEM_PROMPT
    developer_prompt = developer_prompt or LLM_DEVELOPER_PROMPT
    # Decide what to send to LLM and log the raw input
    if isinstance(vlm_features_or_raw, str):
        vlm_raw_str = vlm_features_or_raw
        logger.info("LLM input will be RAW VLM STRING (len=%d)", len(vlm_raw_str or ""))
        # Empty/blank raw strings are replaced by "{}" so the prompt stays well-formed.
        vlm_json_str_to_send = vlm_raw_str if vlm_raw_str and vlm_raw_str.strip() else "{}"
    else:
        vlm_raw_str = json.dumps(vlm_features_or_raw, ensure_ascii=False) if vlm_features_or_raw else "{}"
        logger.info("LLM input will be CLEANED VLM JSON (len=%d)", len(vlm_raw_str))
        vlm_json_str_to_send = vlm_raw_str
    # Build instruction payload (schema + the VLM output wrapped in sentinels)
    instruction = (
        "\n\nSTRICT INSTRUCTIONS (READ CAREFULLY):\n"
        "1) OUTPUT ONLY a single valid JSON object and nothing else — no prose, no explanation, no code fences.\n"
        "2) The JSON MUST include these keys: risk_score, jaundice_probability, anemia_probability, "
        "hydration_issue_probability, neurological_issue_probability, summary, recommendation, confidence.\n"
        "3) Use numeric values for probabilities (0..1) and for risk_score (0..100). Use strings for summary and recommendation.\n"
        "4) Do NOT mention disease names in summary or recommendation; use neutral wording only.\n"
        "If you cannot estimate a value, set it to null.\n\n"
        "Now, based on the VLM output below, produce ONLY the JSON object described above.\n\n"
        "===BEGIN VLM OUTPUT===\n"
        f"{vlm_json_str_to_send}\n"
        "===END VLM OUTPUT===\n\n"
    )
    # Defensive coercion / clamps
    try_max_new_tokens = int(max_new_tokens) if max_new_tokens is not None else 1024
    if try_max_new_tokens <= 0:
        try_max_new_tokens = 1024
    try_temperature = float(temperature) if temperature is not None else 0.0
    # Some Spaces validate temperature >= 0.1
    if try_temperature < 0.1:
        try_temperature = 0.1
    # max_new_tokens is sent as float — presumably the remote Space's input
    # component expects a number, not an int; TODO confirm against the Space API.
    predict_kwargs = dict(
        input_data=instruction,
        max_new_tokens=float(try_max_new_tokens),
        model_identity=model_identity,
        system_prompt=system_prompt,
        developer_prompt=developer_prompt,
        reasoning_effort=reasoning_effort,
        temperature=float(try_temperature),
        top_p=0.9,
        top_k=50,
        repetition_penalty=1.0,
        api_name="/chat"
    )
    last_exc = None
    # Two attempts; on first failure the params are softened (temp 0.2, 512 tokens).
    for attempt in (1, 2):
        try:
            logger.info("Calling LLM Space %s (attempt %d) with temperature=%s, max_new_tokens=%s",
                        LLM_GRADIO_SPACE, attempt, predict_kwargs.get("temperature"), predict_kwargs.get("max_new_tokens"))
            result = client.predict(**predict_kwargs)
            # normalize to string
            if isinstance(result, (dict, list)):
                text_out = json.dumps(result)
            else:
                text_out = str(result)
            if not text_out or len(text_out.strip()) == 0:
                raise RuntimeError("LLM returned empty response")
            logger.info("LLM raw output (len=%d):\n%s", len(text_out or ""), (text_out[:2000] + "...") if len(text_out) > 2000 else text_out)
            # parse with regex extractor (may raise)
            parsed = None
            try:
                parsed = extract_json_via_regex(text_out)
            except Exception:
                # fallback: attempt json.loads naive
                try:
                    parsed = json.loads(text_out)
                    if not isinstance(parsed, dict):
                        parsed = None
                except Exception:
                    parsed = None
            if parsed is None:
                raise ValueError("Failed to extract JSON from LLM output")
            # pretty log parsed JSON
            try:
                logger.info("LLM parsed JSON:\n%s", json.dumps(parsed, indent=2, ensure_ascii=False))
            except Exception:
                logger.info("LLM parsed JSON (raw dict): %s", str(parsed))
            # defensive clamps (same as extractor expectations)
            def safe_prob(val):
                # Coerce to float and clamp into [0, 1]; anything unparseable -> 0.0.
                try:
                    v = float(val)
                    return max(0.0, min(1.0, v))
                except Exception:
                    return 0.0
            for k in [
                "jaundice_probability",
                "anemia_probability",
                "hydration_issue_probability",
                "neurological_issue_probability"
            ]:
                parsed[k] = safe_prob(parsed.get(k, 0.0))
            try:
                rs = float(parsed.get("risk_score", 0.0))
                parsed["risk_score"] = round(max(0.0, min(100.0, rs)), 2)
            except Exception:
                parsed["risk_score"] = 0.0
            parsed["confidence"] = safe_prob(parsed.get("confidence", 0.0))
            parsed["summary"] = str(parsed.get("summary", "") or "").strip()
            parsed["recommendation"] = str(parsed.get("recommendation", "") or "").strip()
            # Flag fields for downstream consumers; currently always False since
            # the clamps above have already filled in defaults.
            for k in [
                "jaundice_probability",
                "anemia_probability",
                "hydration_issue_probability",
                "neurological_issue_probability",
                "confidence",
                "risk_score"
            ]:
                parsed[f"{k}_was_missing"] = False
            return parsed
        except AppError as app_e:
            # Remote-side validation error (e.g. parameter out of range).
            logger.exception("LLM AppError (remote validation failed) on attempt %d: %s", attempt, str(app_e))
            last_exc = app_e
            if attempt == 1:
                predict_kwargs["temperature"] = 0.2
                predict_kwargs["max_new_tokens"] = float(512)
                logger.info("Retrying LLM call with temperature=0.2 and max_new_tokens=512")
                continue
            else:
                raise RuntimeError(f"LLM call failed (AppError): {app_e}")
        except Exception as e:
            logger.exception("LLM call failed on attempt %d: %s", attempt, str(e))
            last_exc = e
            if attempt == 1:
                predict_kwargs["temperature"] = 0.2
                predict_kwargs["max_new_tokens"] = float(512)
                continue
            raise RuntimeError(f"LLM call failed: {e}")
    # Unreachable in practice (both attempts either return or raise), kept as a guard.
    raise RuntimeError(f"LLM call ultimately failed: {last_exc}")
# -----------------------
# API endpoints
# -----------------------
@app.get("/")
async def read_root():
return {"message": "Elderly HealthWatch AI Backend"}
@app.get("/health")
async def health_check():
impl = None
if mtcnn is None:
impl = "none"
elif isinstance(mtcnn, dict) and mtcnn.get("impl") == "opencv":
impl = "opencv_haar_fallback"
else:
impl = _MTCNN_IMPL
return {
"status": "healthy",
"detector": impl,
"vlm_available": GRADIO_AVAILABLE,
"vlm_space": GRADIO_VLM_SPACE,
"llm_space": LLM_GRADIO_SPACE
}
@app.post("/api/v1/validate-eye-photo")
async def validate_eye_photo(image: UploadFile = File(...)):
if mtcnn is None:
raise HTTPException(status_code=500, detail="No face detector available in this deployment.")
try:
content = await image.read()
if not content:
raise HTTPException(status_code=400, detail="Empty file uploaded.")
pil_img = load_image_from_bytes(content)
img_arr = np.asarray(pil_img) # RGB
if not isinstance(mtcnn, dict) and _MTCNN_IMPL == "facenet_pytorch":
try:
boxes, probs, landmarks = mtcnn.detect(pil_img, landmarks=True)
if boxes is None or len(boxes) == 0:
return {"valid": False, "face_detected": False, "eye_openness_score": 0.0,
"message_english": "No face detected. Please ensure your face is clearly visible in the frame.",
"message_hindi": "कोई चेहरा नहीं मिला। कृपया सुनिश्चित करें कि आपका चेहरा फ्रेम में स्पष्ट रूप से दिखाई दे रहा है।"}
prob = float(probs[0]) if probs is not None else 0.0
lm = landmarks[0] if landmarks is not None else None
if lm is not None and len(lm) >= 2:
left_eye = {"x": float(lm[0][0]), "y": float(lm[0][1])}
right_eye = {"x": float(lm[1][0]), "y": float(lm[1][1])}
else:
left_eye = right_eye = None
eye_openness_score = estimate_eye_openness_from_detection(prob)
is_valid = eye_openness_score >= 0.3
return {"valid": bool(is_valid), "face_detected": True, "eye_openness_score": round(eye_openness_score, 2),
"message_english": "Photo looks good! Eyes are properly open." if is_valid else "Eyes appear to be closed or partially closed. Please open your eyes wide and try again.",
"message_hindi": "फोटो अच्छी है! आंखें ठीक से खुली हैं।" if is_valid else "आंखें बंद या आंशिक रूप से बंद दिखाई दे रही हैं। कृपया अपनी आंखें चौड़ी खोलें और पुनः प्रयास करें।",
"eye_landmarks": {"left_eye": left_eye, "right_eye": right_eye}}
except Exception:
traceback.print_exc()
raise HTTPException(status_code=500, detail="Face detector failed during inference.")
if not isinstance(mtcnn, dict) and _MTCNN_IMPL == "mtcnn":
try:
detections = mtcnn.detect_faces(img_arr)
except Exception:
detections = mtcnn.detect_faces(pil_img)
if not detections:
return {"valid": False, "face_detected": False, "eye_openness_score": 0.0,
"message_english": "No face detected. Please ensure your face is clearly visible in the frame.",
"message_hindi": "कोई चेहरा नहीं मिला। कृपया सुनिश्चित करें कि आपका चेहरा फ्रेम में स्पष्ट रूप से दिखाई दे रहा है।"}
face = detections[0]
keypoints = face.get("keypoints", {})
left_eye = keypoints.get("left_eye")
right_eye = keypoints.get("right_eye")
confidence = float(face.get("confidence", 0.0))
eye_openness_score = estimate_eye_openness_from_detection(confidence)
is_valid = eye_openness_score >= 0.3
return {"valid": bool(is_valid), "face_detected": True, "eye_openness_score": round(eye_openness_score, 2),
"message_english": "Photo looks good! Eyes are properly open." if is_valid else "Eyes appear to be closed or partially closed. Please open your eyes wide and try again.",
"message_hindi": "फोटो अच्छी है! आंखें ठीक से खुली हैं।" if is_valid else "आंखें बंद या आंशिक रूप से बंद दिखाई दे रही हैं। कृपया अपनी आंखें चौड़ी खोलें और पुनः प्रयास करें।",
"eye_landmarks": {"left_eye": left_eye, "right_eye": right_eye}}
if isinstance(mtcnn, dict) and mtcnn.get("impl") == "opencv":
try:
gray = cv2.cvtColor(img_arr, cv2.COLOR_RGB2GRAY)
face_cascade = mtcnn["face_cascade"]
eye_cascade = mtcnn["eye_cascade"]
faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(60, 60))
if len(faces) == 0:
return {"valid": False, "face_detected": False, "eye_openness_score": 0.0,
"message_english": "No face detected. Please ensure your face is clearly visible in the frame.",
"message_hindi": "कोई चेहरा नहीं मिला। कृपया सुनिश्चित करें कि आपका चेहरा फ्रेम में स्पष्ट रूप से दिखाई दे रहा है।"}
(x, y, w, h) = faces[0]
roi_gray = gray[y:y+h, x:x+w]
eyes = eye_cascade.detectMultiScale(roi_gray, scaleFactor=1.1, minNeighbors=5, minSize=(20, 10))
eye_openness_score = 1.0 if len(eyes) >= 1 else 0.0
is_valid = eye_openness_score >= 0.3
left_eye = None
right_eye = None
if len(eyes) >= 1:
ex, ey, ew, eh = eyes[0]
cx = float(x + ex + ew/2)
cy = float(y + ey + eh/2)
left_eye = {"x": cx, "y": cy}
return {"valid": bool(is_valid), "face_detected": True, "eye_openness_score": round(eye_openness_score, 2),
"message_english": "Photo looks good! Eyes are detected." if is_valid else "Eyes not detected. Please open your eyes wide and try again.",
"message_hindi": "फोटो अच्छी है! आंखें मिलीं।" if is_valid else "आंखें नहीं मिलीं। कृपया अपनी आंखें चौड़ी खोलें और पुनः प्रयास करें।",
"eye_landmarks": {"left_eye": left_eye, "right_eye": right_eye}}
except Exception:
traceback.print_exc()
raise HTTPException(status_code=500, detail="OpenCV fallback detector failed.")
raise HTTPException(status_code=500, detail="Invalid detector configuration.")
except HTTPException:
raise
except Exception as e:
traceback.print_exc()
return {"valid": False, "face_detected": False, "eye_openness_score": 0.0,
"message_english": "Error processing image. Please try again.",
"message_hindi": "छवि प्रोसेस करने में त्रुटि। कृपया पुनः प्रयास करें।",
"error": str(e)}
@app.post("/api/v1/upload")
async def upload_images(
background_tasks: BackgroundTasks,
face_image: UploadFile = File(...),
eye_image: UploadFile = File(...)
):
"""
Save images and enqueue background processing. VLM -> LLM runs inside process_screening.
"""
try:
screening_id = str(uuid.uuid4())
now = datetime.utcnow().isoformat() + "Z"
tmp_dir = "/tmp/elderly_healthwatch"
os.makedirs(tmp_dir, exist_ok=True)
face_path = os.path.join(tmp_dir, f"{screening_id}_face.jpg")
eye_path = os.path.join(tmp_dir, f"{screening_id}_eye.jpg")
face_bytes = await face_image.read()
eye_bytes = await eye_image.read()
with open(face_path, "wb") as f:
f.write(face_bytes)
with open(eye_path, "wb") as f:
f.write(eye_bytes)
screenings_db[screening_id] = {
"id": screening_id,
"timestamp": now,
"face_image_path": face_path,
"eye_image_path": eye_path,
"status": "queued",
"quality_metrics": {},
"ai_results": {},
"disease_predictions": [],
"recommendations": {}
}
background_tasks.add_task(process_screening, screening_id)
return {"screening_id": screening_id}
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Failed to upload images: {e}")
@app.post("/api/v1/analyze/{screening_id}")
async def analyze_screening(screening_id: str, background_tasks: BackgroundTasks):
if screening_id not in screenings_db:
raise HTTPException(status_code=404, detail="Screening not found")
if screenings_db[screening_id].get("status") == "processing":
return {"message": "Already processing"}
screenings_db[screening_id]["status"] = "queued"
background_tasks.add_task(process_screening, screening_id)
return {"message": "Analysis enqueued"}
@app.get("/api/v1/status/{screening_id}")
async def get_status(screening_id: str):
if screening_id not in screenings_db:
raise HTTPException(status_code=404, detail="Screening not found")
status = screenings_db[screening_id].get("status", "unknown")
progress = 50 if status == "processing" else (100 if status == "completed" else 0)
return {"screening_id": screening_id, "status": status, "progress": progress}
@app.get("/api/v1/results/{screening_id}")
async def get_results(screening_id: str):
if screening_id not in screenings_db:
raise HTTPException(status_code=404, detail="Screening not found")
# Ensure vlm_raw is always present in ai_results for debugging
entry = screenings_db[screening_id]
entry.setdefault("ai_results", {})
entry["ai_results"].setdefault("vlm_raw", entry.get("ai_results", {}).get("vlm_raw", ""))
return entry
@app.get("/api/v1/history/{user_id}")
async def get_history(user_id: str):
history = [s for s in screenings_db.values() if s.get("user_id") == user_id]
return {"screenings": history}
# -----------------------
# Immediate VLM -> LLM routes (return vitals in one call)
# -----------------------
@app.post("/api/v1/get-vitals")
async def get_vitals_from_upload(
face_image: UploadFile = File(...),
eye_image: UploadFile = File(...)
):
"""
Run VLM -> LLM pipeline synchronously (but off the event loop) and return:
{ vlm_parsed_features, vlm_raw_output, llm_structured_risk }
"""
if not GRADIO_AVAILABLE:
raise HTTPException(status_code=500, detail="VLM/LLM client not available in this deployment.")
# save files to a temp directory
try:
tmp_dir = "/tmp/elderly_healthwatch"
os.makedirs(tmp_dir, exist_ok=True)
uid = str(uuid.uuid4())
face_path = os.path.join(tmp_dir, f"{uid}_face.jpg")
eye_path = os.path.join(tmp_dir, f"{uid}_eye.jpg")
face_bytes = await face_image.read()
eye_bytes = await eye_image.read()
with open(face_path, "wb") as f:
f.write(face_bytes)
with open(eye_path, "wb") as f:
f.write(eye_bytes)
except Exception as e:
logger.exception("Failed saving uploaded images")
raise HTTPException(status_code=500, detail=f"Failed saving images: {e}")
try:
# Run VLM (off the event loop)
vlm_features, vlm_raw = await asyncio.to_thread(run_vlm_and_get_features, face_path, eye_path)
# Log VLM outputs (already logged inside run_vlm..., but additional context)
logger.info("get_vitals_from_upload - VLM raw (snippet): %s", (vlm_raw[:500] + "...") if vlm_raw else "<EMPTY>")
logger.info("get_vitals_from_upload - VLM parsed features: %s", json.dumps(vlm_features, indent=2, ensure_ascii=False) if vlm_features else "None")
# Decide what to feed to LLM: prefer cleaned JSON if available, else raw VLM string
if vlm_features:
llm_input = json.dumps(vlm_features, ensure_ascii=False)
logger.info("Feeding CLEANED VLM JSON to LLM (len=%d).", len(llm_input))
else:
llm_input = vlm_raw if vlm_raw and vlm_raw.strip() else "{}"
logger.info("Feeding RAW VLM STRING to LLM (len=%d).", len(llm_input))
# Run LLM (off the event loop)
structured_risk = await asyncio.to_thread(run_llm_on_vlm, llm_input)
# Return merged result (includes raw VLM output for debugging)
return {
"vlm_raw_output": vlm_raw,
"vlm_parsed_features": vlm_features,
"llm_structured_risk": structured_risk
}
except Exception as e:
logger.exception("get_vitals_from_upload pipeline failed")
raise HTTPException(status_code=500, detail=f"Pipeline failed: {e}")
@app.post("/api/v1/get-vitals/{screening_id}")
async def get_vitals_for_screening(screening_id: str):
"""
Re-run VLM->LLM on images already stored for `screening_id` in screenings_db.
Useful for re-processing or debugging.
"""
if screening_id not in screenings_db:
raise HTTPException(status_code=404, detail="Screening not found")
entry = screenings_db[screening_id]
face_path = entry.get("face_image_path")
eye_path = entry.get("eye_image_path")
if not (face_path and os.path.exists(face_path) and eye_path and os.path.exists(eye_path)):
raise HTTPException(status_code=400, detail="Stored images missing for this screening")
try:
# Run VLM off the event loop
vlm_features, vlm_raw = await asyncio.to_thread(run_vlm_and_get_features, face_path, eye_path)
logger.info("get_vitals_for_screening(%s) - VLM raw (snippet): %s", screening_id, (vlm_raw[:500] + "...") if vlm_raw else "<EMPTY>")
logger.info("get_vitals_for_screening(%s) - VLM parsed features: %s", screening_id, json.dumps(vlm_features, indent=2, ensure_ascii=False) if vlm_features else "None")
if vlm_features:
llm_input = json.dumps(vlm_features, ensure_ascii=False)
logger.info("Feeding CLEANED VLM JSON to LLM (len=%d).", len(llm_input))
else:
llm_input = vlm_raw if vlm_raw and vlm_raw.strip() else "{}"
logger.info("Feeding RAW VLM STRING to LLM (len=%d).", len(llm_input))
structured_risk = await asyncio.to_thread(run_llm_on_vlm, llm_input)
# Optionally store this run's outputs back into the DB for inspection
entry.setdefault("ai_results", {})
entry["ai_results"].update({
"vlm_parsed_features": vlm_features,
"vlm_raw": vlm_raw,
"structured_risk": structured_risk,
"last_vitals_run": datetime.utcnow().isoformat() + "Z"
})
return {
"screening_id": screening_id,
"vlm_raw_output": vlm_raw,
"vlm_parsed_features": vlm_features,
"llm_structured_risk": structured_risk
}
except Exception as e:
logger.exception("get_vitals_for_screening pipeline failed")
raise HTTPException(status_code=500, detail=f"Pipeline failed: {e}")
# -----------------------
# Main background pipeline (upload -> process_screening)
# -----------------------
def _default_structured_risk() -> Dict[str, Any]:
    """Zeroed structured-risk dict used when the LLM step fails or returns nothing usable."""
    return {
        "risk_score": 0.0,
        "jaundice_probability": 0.0,
        "anemia_probability": 0.0,
        "hydration_issue_probability": 0.0,
        "neurological_issue_probability": 0.0,
        "summary": "",
        "recommendation": "",
        "confidence": 0.0
    }


def _detect_face_quality(face_img: "Image.Image") -> Dict[str, Any]:
    """
    Run the available face detector on `face_img` and return quick quality metrics.

    Supports the three detector setups initialised elsewhere in this module:
      - facenet_pytorch MTCNN (`_MTCNN_IMPL == "facenet_pytorch"`): boxes + landmarks;
      - the `mtcnn` package (`_MTCNN_IMPL == "mtcnn"`): detections with keypoints;
      - an OpenCV Haar-cascade fallback (`mtcnn` is a dict with impl == "opencv").

    Returns a dict with: detection flag, confidence, coarse quality score,
    eye coordinates (right eye only available on some paths), brightness and
    a blur estimate. Detector failures are logged and treated as "no face".
    """
    face_detected = False
    face_confidence = 0.0
    left_eye_coord = right_eye_coord = None

    # Neural detectors (a detector object, not the OpenCV fallback dict).
    if mtcnn is not None and not isinstance(mtcnn, dict) and _MTCNN_IMPL in ("facenet_pytorch", "mtcnn"):
        try:
            if _MTCNN_IMPL == "facenet_pytorch":
                boxes, probs, landmarks = mtcnn.detect(face_img, landmarks=True)
                if boxes is not None and len(boxes) > 0:
                    face_detected = True
                    face_confidence = float(probs[0]) if probs is not None else 0.0
                    if landmarks is not None:
                        lm = landmarks[0]
                        if len(lm) >= 2:
                            # facenet_pytorch landmark order starts: left eye, right eye.
                            left_eye_coord = {"x": float(lm[0][0]), "y": float(lm[0][1])}
                            right_eye_coord = {"x": float(lm[1][0]), "y": float(lm[1][1])}
            else:
                arr = np.asarray(face_img)
                detections = mtcnn.detect_faces(arr)
                if detections:
                    face_detected = True
                    face_confidence = float(detections[0].get("confidence", 0.0))
                    keypoints = detections[0].get("keypoints", {})
                    left_eye_coord = keypoints.get("left_eye")
                    right_eye_coord = keypoints.get("right_eye")
        except Exception:
            logger.exception("Face detection (%s) failed", _MTCNN_IMPL)

    # OpenCV Haar-cascade fallback (`mtcnn` carries the cascades in a dict).
    if isinstance(mtcnn, dict) and mtcnn.get("impl") == "opencv":
        try:
            arr = np.asarray(face_img)
            gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
            faces = mtcnn["face_cascade"].detectMultiScale(
                gray, scaleFactor=1.1, minNeighbors=4, minSize=(60, 60)
            )
            if len(faces) > 0:
                face_detected = True
                (x, y, w, h) = faces[0]
                # Heuristic confidence: relative face area, scaled and capped at 1.0.
                face_confidence = min(1.0, (w * h) / (arr.shape[0] * arr.shape[1]) * 4.0)
                roi_gray = gray[y:y + h, x:x + w]
                eyes = mtcnn["eye_cascade"].detectMultiScale(
                    roi_gray, scaleFactor=1.1, minNeighbors=5, minSize=(20, 10)
                )
                if len(eyes) >= 1:
                    ex, ey, ew, eh = eyes[0]
                    left_eye_coord = {"x": float(x + ex + ew / 2), "y": float(y + ey + eh / 2)}
        except Exception:
            logger.exception("OpenCV face detection failed")

    face_quality_score = 0.85 if face_detected and face_confidence > 0.6 else 0.45
    gray_arr = np.asarray(face_img.convert("L"))
    return {
        "face_detected": face_detected,
        "face_confidence": round(face_confidence, 3),
        "face_quality_score": round(face_quality_score, 2),
        "eye_coords": {"left_eye": left_eye_coord, "right_eye": right_eye_coord},
        "face_brightness": int(np.mean(gray_arr)),
        # NOTE(review): plain grayscale variance is a contrast proxy rather than a
        # true blur metric (Laplacian variance would be); kept for compatibility.
        "face_blur_estimate": int(np.var(gray_arr))
    }


async def process_screening(screening_id: str):
    """
    Main background pipeline for one screening:
      1. load the stored face/eye images,
      2. compute quick detector-based quality metrics,
      3. run the VLM -> parsed feature dict (or None) + raw text,
      4. run the LLM on the parsed features (preferred) or the raw VLM text
         to obtain a structured risk JSON,
      5. derive disease predictions / recommendations and mark the entry done.

    All outputs are merged into ``screenings_db[screening_id]``; on any fatal
    error the entry is marked status="failed" with the error message.
    The blocking VLM/LLM gradio calls are pushed off the event loop with
    ``asyncio.to_thread`` (consistent with the /vitals endpoint).
    """
    try:
        if screening_id not in screenings_db:
            logger.error("[process_screening] screening %s not found", screening_id)
            return
        entry = screenings_db[screening_id]
        entry["status"] = "processing"
        logger.info("[process_screening] Starting %s", screening_id)

        face_path = entry.get("face_image_path")
        eye_path = entry.get("eye_image_path")
        if not (face_path and os.path.exists(face_path)):
            raise RuntimeError("Face image missing")
        if not (eye_path and os.path.exists(eye_path)):
            raise RuntimeError("Eye image missing")
        face_img = Image.open(face_path).convert("RGB")
        # Validate the eye image is decodable up front (the VLM only needs the path).
        Image.open(eye_path).convert("RGB")

        # Basic detection + quality metrics (facenet/mtcnn/opencv).
        entry["quality_metrics"] = _detect_face_quality(face_img)

        # --------------------------
        # RUN VLM -> vlm_features (dict or None) + vlm_raw (string)
        # --------------------------
        vlm_features: Optional[Dict[str, Any]] = None
        vlm_raw = ""
        try:
            # Blocking gradio_client call -> run it in a worker thread.
            vlm_features, vlm_raw = await asyncio.to_thread(
                run_vlm_and_get_features, face_path, eye_path
            )
            entry.setdefault("ai_results", {}).update({
                "vlm_parsed_features": vlm_features,
                "vlm_raw": vlm_raw
            })
        except Exception as e:
            logger.exception("VLM feature extraction failed")
            entry.setdefault("ai_results", {}).update({"vlm_error": str(e)})
            vlm_features = None
            vlm_raw = ""

        logger.info(
            "process_screening(%s) - VLM raw (snippet): %s",
            screening_id, (vlm_raw[:500] + "...") if vlm_raw else "<EMPTY>"
        )
        logger.info(
            "process_screening(%s) - VLM parsed features: %s",
            screening_id,
            json.dumps(vlm_features, indent=2, ensure_ascii=False) if vlm_features else "None"
        )

        # --------------------------
        # RUN LLM on parsed features (preferred) or raw text -> structured risk
        # --------------------------
        structured_risk = None
        try:
            if vlm_features:
                # Prefer the cleaned JSON feature vector.
                llm_input = json.dumps(vlm_features, ensure_ascii=False)
            else:
                # Fall back to the raw VLM string (may be empty -> "{}").
                llm_input = vlm_raw if vlm_raw and vlm_raw.strip() else "{}"
            structured_risk = await asyncio.to_thread(run_llm_on_vlm, llm_input)
            entry.setdefault("ai_results", {}).update({"structured_risk": structured_risk})
        except Exception as e:
            logger.exception("LLM processing failed")
            entry.setdefault("ai_results", {}).update({"llm_error": str(e)})
            structured_risk = None
        if not isinstance(structured_risk, dict):
            # Guard: run_llm_on_vlm may return None/non-dict; keep .get() below safe.
            structured_risk = _default_structured_risk()

        entry.setdefault("ai_results", {}).update({"processing_time_ms": 1200})

        # Simple placeholder inference derived from the structured risk values.
        disease_predictions = [
            {
                "condition": "Anemia-like-signs",
                "risk_level": "Medium" if structured_risk.get("anemia_probability", 0.0) > 0.5 else "Low",
                "probability": structured_risk.get("anemia_probability", 0.0),
                "confidence": structured_risk.get("confidence", 0.0)
            },
            {
                "condition": "Jaundice-like-signs",
                "risk_level": "Medium" if structured_risk.get("jaundice_probability", 0.0) > 0.5 else "Low",
                "probability": structured_risk.get("jaundice_probability", 0.0),
                "confidence": structured_risk.get("confidence", 0.0)
            }
        ]
        recommendations = {
            "action_needed": "consult" if structured_risk.get("risk_score", 0.0) > 30.0 else "monitor",
            "message_english": structured_risk.get("recommendation", "")
            or "Please follow up with a health professional if concerns persist.",
            "message_hindi": ""
        }
        entry.update({
            "status": "completed",
            "disease_predictions": disease_predictions,
            "recommendations": recommendations
        })
        logger.info("[process_screening] Completed %s", screening_id)
    except Exception as e:
        logger.exception("[process_screening] Failed for %s", screening_id)
        if screening_id in screenings_db:
            screenings_db[screening_id]["status"] = "failed"
            screenings_db[screening_id]["error"] = str(e)
# -----------------------
# Run server (for local debugging)
# -----------------------
if __name__ == "__main__":
    import uvicorn

    # Host/port are overridable via env so the same entry point works locally and
    # on platforms that inject PORT; defaults are unchanged (0.0.0.0:7860, the
    # port HF Spaces expects).
    uvicorn.run(
        "app:app",
        host=os.getenv("HOST", "0.0.0.0"),
        port=int(os.getenv("PORT", "7860")),
        reload=False,
    )