AI-Pathlab / app.py
SuriRaja's picture
Update app.py
91afe9a verified
import streamlit as st
import numpy as np
import cv2
import os
import random
import tempfile
import joblib
from typing import Optional, Tuple, Dict, Any, List
# NEW: use components.html for the long HTML report
import streamlit.components.v1 as components
# (Optional) Face landmarks similar to earlier app
# Best-effort setup of the MediaPipe FaceMesh detector. Any failure while
# importing the package or constructing the detector leaves both `mp` and
# `face_mesh` set to None, so later code can test them before using them.
try:
    import mediapipe as mp
    mp_face_mesh = mp.solutions.face_mesh
    # Single still image, at most one face, refined landmarks.
    face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1,
                                      refine_landmarks=True, min_detection_confidence=0.5)
except Exception:
    # Deliberately broad: the detector is optional.
    mp = None
    face_mesh = None
# Live video capture (front-end) with WebRTC
from streamlit_webrtc import webrtc_streamer, WebRtcMode, RTCConfiguration
import av
from collections import deque
# Page-level Streamlit settings (title and wide layout).
st.set_page_config(
    page_title="Face-based Lab Test AI Report",
    layout="wide",
)

# ==============================
# WebRTC config
# ==============================
# ICE server list handed to the WebRTC streamer: a single STUN entry.
RTC_CONFIGURATION = RTCConfiguration(
    {
        "iceServers": [
            {"urls": ["stun:stun.l.google.com:19302"]},
        ],
    }
)
# ==============================
# Utility & UI helpers (copied style)
# ==============================
def get_risk_color(value: float, normal_range: Tuple[float, float]) -> Tuple[str, str, str]:
    """Classify *value* against an inclusive reference range.

    Args:
        value: The result to classify.
        normal_range: ``(low, high)`` bounds; both ends count as normal.

    Returns:
        A ``(level, icon, background_color)`` triple, where ``level`` is
        ``"Low"``, ``"High"`` or ``"Normal"``, ``icon`` is an emoji marker and
        ``background_color`` is a CSS hex color for the table row.
    """
    low, high = normal_range
    # The icons were stored as mis-decoded UTF-8 ("mojibake"); they are
    # restored here to the intended emoji.
    if value < low:
        return ("Low", "🔻", "#FFCCCC")
    if value > high:
        return ("High", "🔺", "#FFE680")
    return ("Normal", "✅", "#CCFFCC")
def build_table(title: str, rows: List[Tuple[str, float, Tuple[float, float]]]) -> str:
    """Render one titled HTML table of test results.

    Args:
        title: Heading shown above the table.
        rows: ``(label, value, (low, high))`` entries, one per table row.

    Returns:
        An HTML fragment (a ``<div>`` wrapping an ``<h4>`` and a ``<table>``).
        Each row is tinted and labelled using ``get_risk_color``.
    """
    th_open = '<th style="padding:8px;border:1px solid #ccc;">'
    td_open = '<td style="padding:6px;border:1px solid #ccc;">'

    # Collect fragments and join once at the end.
    parts = [
        '<div style="margin-bottom: 24px;">',
        f'<h4 style="margin: 8px 0;">{title}</h4>',
        '<table style="width:100%; border-collapse:collapse;">',
        '<thead><tr style="background:#f0f0f0;">',
    ]
    parts.extend(
        f'{th_open}{heading}</th>'
        for heading in ("Test", "Result", "Expected Range", "Level")
    )
    parts.append('</tr></thead><tbody>')

    for label, value, ref in rows:
        level, icon, bg = get_risk_color(value, ref)
        parts.append(
            f'<tr style="background:{bg};">'
            f'{td_open}{label}</td>'
            f'{td_open}{value:.2f}</td>'
            f'{td_open}{ref[0]} – {ref[1]}</td>'
            f'{td_open}{icon} {level}</td>'
            '</tr>'
        )

    parts.append('</tbody></table></div>')
    return "".join(parts)
def extract_basic_rgb_features(image_rgb: np.ndarray) -> List[float]:
    """Return the mean of channels 0, 1 and 2 as percentages of 255.

    Args:
        image_rgb: Array indexed as ``[row, col, channel]`` with at least
            three channels (named RGB by the caller).

    Returns:
        ``[red_percent, green_percent, blue_percent]`` as Python floats.
    """
    return [
        100 * float(np.mean(image_rgb[:, :, channel])) / 255.0
        for channel in range(3)
    ]
def blur_for_display(img_rgb: np.ndarray, k: int = 35) -> np.ndarray:
    """Return a Gaussian-blurred copy of *img_rgb* for display.

    Args:
        img_rgb: Image array to blur.
        k: Requested kernel size; it is forced odd and raised to at least 3.

    Returns:
        The blurred image produced by ``cv2.GaussianBlur``.
    """
    kernel = max(3, k | 1)  # setting the low bit makes the size odd
    ksize = (kernel, kernel)
    return cv2.GaussianBlur(img_rgb, ksize, 0)
# ==============================
# Model loading (guarded)
# ==============================
def safe_load_joblib(path: str):
    """Load a joblib-serialized object, or return None if that is not possible.

    Args:
        path: Filesystem path of the serialized object.

    Returns:
        The deserialized object, or ``None`` when the file does not exist or
        loading raises. A failed load is logged with its traceback instead of
        being swallowed silently, so a corrupt or incompatible file is visible.
    """
    import logging  # function-scope import keeps this block self-contained

    try:
        if os.path.exists(path):
            # NOTE(security): joblib.load unpickles the file, which can run
            # arbitrary code. Only point this at trusted model files.
            return joblib.load(path)
    except Exception:
        # Still best-effort (callers treat None as "no model"), but no longer
        # invisible.
        logging.getLogger(__name__).warning(
            "Could not load joblib file %r", path, exc_info=True
        )
    return None
# Optional pre-trained models, loaded in this order. Each name is None when
# safe_load_joblib could not provide the object.
hemoglobin_model, hr_model, spo2_model = (
    safe_load_joblib(model_file)
    for model_file in (
        "hemoglobin_model_from_anemia_dataset.pkl",
        "heart_rate_model.pkl",
        "spo2_model_simulated.pkl",
    )
)

# Fixed R² figure displayed in the report header (as shown in your previous UI).
hemoglobin_r2 = 0.385
# ==============================
# Placeholder models
# ==============================
def random_model_predict(low: float, high: float) -> float:
    """Placeholder "model": draw one value uniformly between *low* and *high*.

    Args:
        low: One end of the sampling interval.
        high: The other end of the sampling interval.

    Returns:
        A float from ``random.uniform(low, high)``.
    """
    sample = random.uniform(low, high)
    return sample
# ==============================
# Image path (analyze image once, show only final blurred frame)
# ==============================
# Minimum number of decodable frames a video must contain to be analyzed.
_MIN_VIDEO_FRAMES = 30


def _estimate_resp_rate(heart_rate: float) -> int:
    """Derive the placeholder respiratory rate (12-14) from the heart rate."""
    return int(12 + abs(int(heart_rate) % 5 - 2))


def _simulate_lab_panel(hemoglobin: float) -> Dict[str, Any]:
    """Return the lab panel; every value except *hemoglobin* is a random placeholder."""
    return {
        "Hemoglobin": hemoglobin,
        "WBC Count": random_model_predict(4.0, 11.0),
        "Platelet Count": random_model_predict(150, 450),
        "Iron": random_model_predict(60, 170),
        "Ferritin": random_model_predict(30, 300),
        "TIBC": random_model_predict(250, 400),
        "Bilirubin": random_model_predict(0.3, 1.2),
        "Creatinine": random_model_predict(0.6, 1.2),
        "Urea": random_model_predict(7, 20),
        "Sodium": random_model_predict(135, 145),
        "Potassium": random_model_predict(3.5, 5.1),
        "TSH": random_model_predict(0.4, 4.0),
        "Cortisol": random_model_predict(5, 25),
        "FBS": random_model_predict(70, 110),
        "HbA1c": random_model_predict(4.0, 5.7),
        "Albumin": random_model_predict(3.5, 5.5),
        "BP Systolic": random_model_predict(90, 120),
        "BP Diastolic": random_model_predict(60, 80),
        "Temperature": random_model_predict(97, 99),
    }


def _build_report_html(test_values: Dict[str, Any], spo2: float,
                       heart_rate: float, rr: int) -> str:
    """Assemble the full HTML report: result tables, summary list and lab call-to-action."""
    sections = [
        f'<div style="font-size:14px;color:#888;margin-bottom:10px;">Hemoglobin R² Score: {hemoglobin_r2:.2f}</div>',
        build_table("🩸 Hematology", [
            ("Hemoglobin", test_values["Hemoglobin"], (13.5, 17.5)),
            ("WBC Count", test_values["WBC Count"], (4.0, 11.0)),
            ("Platelet Count", test_values["Platelet Count"], (150, 450)),
        ]),
        build_table("🧬 Iron Panel", [
            ("Iron", test_values["Iron"], (60, 170)),
            ("Ferritin", test_values["Ferritin"], (30, 300)),
            ("TIBC", test_values["TIBC"], (250, 400)),
        ]),
        build_table("🧬 Liver & Kidney", [
            ("Bilirubin", test_values["Bilirubin"], (0.3, 1.2)),
            ("Creatinine", test_values["Creatinine"], (0.6, 1.2)),
            ("Urea", test_values["Urea"], (7, 20)),
        ]),
        build_table("🧪 Electrolytes", [
            ("Sodium", test_values["Sodium"], (135, 145)),
            ("Potassium", test_values["Potassium"], (3.5, 5.1)),
        ]),
        build_table("🧁 Metabolic & Thyroid", [
            ("FBS", test_values["FBS"], (70, 110)),
            ("HbA1c", test_values["HbA1c"], (4.0, 5.7)),
            ("TSH", test_values["TSH"], (0.4, 4.0)),
        ]),
        build_table("❤️ Vitals", [
            ("SpO2", spo2, (95, 100)),
            ("Heart Rate", heart_rate, (60, 100)),
            ("Respiratory Rate", rr, (12, 20)),
            ("Temperature", test_values["Temperature"], (97, 99)),
            ("BP Systolic", test_values["BP Systolic"], (90, 120)),
            ("BP Diastolic", test_values["BP Diastolic"], (60, 80)),
        ]),
        build_table("🩹 Other Indicators", [
            ("Cortisol", test_values["Cortisol"], (5, 25)),
            ("Albumin", test_values["Albumin"], (3.5, 5.5)),
        ]),
    ]

    # Plain-language findings, in a fixed order.
    findings = []
    if test_values["Hemoglobin"] < 13.5:
        findings.append("<li>Your hemoglobin is a bit low — this could mean mild anemia.</li>")
    if test_values["Iron"] < 60 or test_values["Ferritin"] < 30:
        findings.append("<li>Low iron storage detected — consider an iron profile test.</li>")
    if test_values["Bilirubin"] > 1.2:
        findings.append("<li>Elevated bilirubin — possible jaundice. Recommend LFT.</li>")
    if test_values["HbA1c"] > 5.7:
        findings.append("<li>High HbA1c — prediabetes indication. Recommend glucose check.</li>")
    if spo2 < 95:
        findings.append("<li>Low SpO₂ — suggest retesting with a pulse oximeter.</li>")

    sections.append(
        "<div style='margin-top:20px;padding:12px;border:1px dashed #999;background:#fcfcfc;'>"
        "<h4>📝 Summary for You</h4><ul>"
        + "".join(findings)
        + "</ul><p><strong>💡 Tip:</strong> This is an AI-based estimate. Please follow up with a lab.</p></div>"
    )
    sections.append(
        "<br><div style='margin-top:20px;padding:12px;border:2px solid #2d87f0;background:#f2faff;text-align:center;border-radius:8px;'>"
        "<h4>📞 Book a Lab Test</h4><p>Prefer confirmation? Find certified labs near you.</p>"
        "<button style='padding:10px 20px;background:#007BFF;color:#fff;border:none;border-radius:5px;cursor:pointer;'>Find Labs Near Me</button></div>"
    )
    return "".join(sections)


def analyze_face(image_bgr: Optional[np.ndarray]) -> Tuple[str, Optional[np.ndarray]]:
    """Analyze a single face image and build the HTML report.

    Hemoglobin comes from ``hemoglobin_model`` when it is loaded and its
    prediction succeeds; otherwise it is a random placeholder. Every other
    reported value is a random placeholder.

    Args:
        image_bgr: Image in BGR channel order, or ``None``.

    Returns:
        ``(html, preview)``. On success ``preview`` is the blurred RGB frame.
        When no image is given, or the face detector is available and finds
        no face, ``html`` is an error message and ``preview`` is ``None``.
    """
    if image_bgr is None:
        return "<div style='color:red;'>⚠️ Error: No image provided.</div>", None

    frame_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

    # Face detection is optional; it is skipped when MediaPipe is unavailable.
    if face_mesh is not None and mp is not None:
        result = face_mesh.process(frame_rgb)
        if not result.multi_face_landmarks:
            return "<div style='color:red;'>⚠️ Error: Face not detected.</div>", None

    features = extract_basic_rgb_features(frame_rgb)

    hb_pred: Optional[float] = None
    if hemoglobin_model is not None:
        try:
            hb_pred = float(hemoglobin_model.predict([features])[0])
        except Exception:
            hb_pred = None  # fall through to the placeholder below
    if hb_pred is None:
        hb_pred = float(random_model_predict(12.5, 15.0))

    heart_rate = float(random_model_predict(60, 100))
    spo2 = float(random_model_predict(95, 100))
    rr = _estimate_resp_rate(heart_rate)

    test_values = _simulate_lab_panel(hb_pred)
    html_output = _build_report_html(test_values, spo2, heart_rate, rr)

    # Return the single final frame (blurred for display only)
    return html_output, blur_for_display(frame_rgb)


# ==============================
# Video path (assemble once; show only final blurred keyframe)
# ==============================
def analyze_video(video_path: Optional[str]) -> Tuple[str, Optional[np.ndarray]]:
    """Analyze a face video and build the HTML report.

    Only the first frame is used for the estimate and the preview. The video
    must contain at least ``_MIN_VIDEO_FRAMES`` readable frames.

    Args:
        video_path: Path of the video file, or ``None``.

    Returns:
        ``(html, preview)``. On success ``preview`` is the blurred first frame
        in RGB. When the path is missing or the video is too short, ``html``
        is an error message and ``preview`` is ``None``.
    """
    if not video_path or not os.path.exists(video_path):
        return "<div style='color:red;'>⚠️ Face video missing or unreadable.</div>", None

    cap = cv2.VideoCapture(video_path)
    frame_sample = None
    frames = 0
    try:
        # Stop as soon as the minimum is reached: only the first frame and the
        # "at least N frames" fact are needed, so decoding the rest is wasted.
        while frames < _MIN_VIDEO_FRAMES:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_sample is None:
                frame_sample = frame.copy()
            frames += 1
    finally:
        cap.release()

    if frame_sample is None or frames < _MIN_VIDEO_FRAMES:
        return "<div style='color:red;'>⚠️ Video too short; please record ~20–30s.</div>", None

    # Placeholder HR/SpO2 (replace with proper rPPG)
    hr_value = float(random_model_predict(65, 85))
    try:
        gray = cv2.cvtColor(frame_sample, cv2.COLOR_BGR2GRAY)
        brightness_var = float(np.std(gray) / 255.0)
        patch = frame_sample[100:150, 100:150]
        # An empty patch (frame smaller than the window) uses a neutral 0.5.
        skin_tone_index = float(np.mean(patch) / 255.0) if patch.size else 0.5
        spo2 = float(98.0 - (1.5 * brightness_var - 0.5 * (skin_tone_index - 0.5)))
        spo2 = float(np.clip(spo2, 92.0, 100.0))
    except Exception:
        spo2 = float(random_model_predict(95, 100))

    rr = _estimate_resp_rate(hr_value)

    test_values = _simulate_lab_panel(random_model_predict(12.5, 15.0))
    frame_rgb = cv2.cvtColor(frame_sample, cv2.COLOR_BGR2RGB)
    html_output = _build_report_html(test_values, spo2, hr_value, rr)

    return html_output, blur_for_display(frame_rgb)
# ==============================
# WebRTC frame collector (no progress timer to avoid constant re-renders)
# ==============================
class HRCollectorVideoProcessor:
    """Buffer incoming video frames while recording and dump them to an MP4 file.

    ``recv`` is the per-frame callback; ``start`` and ``stop_and_dump_to_file``
    are driven from the UI. streamlit-webrtc documents that frame callbacks
    run on a thread other than the Streamlit script thread, so the frame
    buffer and the recording flag are guarded by a lock.
    """

    def __init__(self):
        import threading  # function-scope import keeps this block self-contained

        self.frames = deque(maxlen=30 * 90)  # up to ~90s @30fps
        self.recording = False
        self._lock = threading.Lock()

    def recv(self, frame: "av.VideoFrame") -> "av.VideoFrame":
        """Store the frame (BGR) if recording and pass it back unchanged."""
        img = frame.to_ndarray(format="bgr24")
        with self._lock:
            if self.recording:
                self.frames.append(img)
        return av.VideoFrame.from_ndarray(img, format="bgr24")

    def start(self):
        """Discard any buffered frames and begin recording."""
        with self._lock:
            # Clear before enabling so a frame arriving in between is kept.
            self.frames.clear()
            self.recording = True

    def stop_and_dump_to_file(self) -> Optional[str]:
        """Stop recording and write the buffered frames to a temporary MP4.

        Returns:
            The path of the written file, or ``None`` when fewer than 30
            frames were buffered or the video writer could not be opened.
            The caller owns the returned file.
        """
        with self._lock:
            self.recording = False
            # Snapshot so encoding neither blocks recv() nor iterates a deque
            # that another thread could still be mutating.
            snapshot = list(self.frames)

        if len(snapshot) < 30:  # ~1s
            return None

        h, w = snapshot[0].shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        tmp_path = tmp.name
        tmp.close()

        out = cv2.VideoWriter(tmp_path, fourcc, 30.0, (w, h))
        opened = out.isOpened()
        try:
            if opened:
                for image in snapshot:
                    out.write(image)
        finally:
            out.release()

        if not opened:
            # Do not leave an empty temp file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            return None
        return tmp_path
# ==============================
# Streamlit UI
# ==============================
st.markdown("""
# 🧠 Face-Based Lab Test AI Report
Use **Image** (Hb) or **Video** (HR). Only a single **blurred preview** is shown **after** analysis to avoid flicker.
""")

# Session state to hold captured assets between form submits
if "captured_image_bgr" not in st.session_state:
    st.session_state.captured_image_bgr = None
if "captured_video_path" not in st.session_state:
    st.session_state.captured_video_path = None
# Buffer raw camera bytes in case user forgets to click "Save Image"
if "img_bytes_buffer" not in st.session_state:
    st.session_state.img_bytes_buffer = None

mode = st.radio("Choose Input Mode", ["Image", "Video"], horizontal=True)
col_left, col_right = st.columns([1.2, 1], gap="large")

# Only one of the two Analyze buttons is rendered per run; default both to
# False so the dispatch at the bottom needs no `in locals()` checks.
analyze_image = False
analyze_video_btn = False

with col_left:
    if mode == "Image":
        st.subheader("📸 Face / Eye Image")
        # Wrap inputs in a form so widget changes don't re-run the app
        with st.form("img_form", clear_on_submit=False):
            img_source = st.radio("Source", ["Camera", "Upload"], horizontal=True, key="img_source")
            uploaded_img = None
            if img_source == "Camera":
                # Auto-buffer the captured bytes
                cam_img = st.camera_input("Capture image", key="camera_img")
                if cam_img is not None:
                    st.session_state.img_bytes_buffer = cam_img.getvalue()
            else:
                file_up = st.file_uploader("Upload JPG/PNG", type=["jpg", "jpeg", "png"])
                if file_up is not None:
                    uploaded_img = file_up.read()
            submitted_img = st.form_submit_button("Save Image")

        if submitted_img:
            # Prefer explicit upload; else fall back to auto-buffered camera bytes
            raw_bytes = uploaded_img if uploaded_img else st.session_state.img_bytes_buffer
            if raw_bytes:
                arr = np.frombuffer(raw_bytes, dtype=np.uint8)
                img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
                if img_bgr is not None:
                    st.session_state.captured_image_bgr = img_bgr
                    st.success("Image saved. Now click **Analyze Image** below.")
                else:
                    st.warning("Could not decode the image. Please recapture/upload.")
            else:
                st.warning("No image provided.")

        analyze_image = st.button("🔍 Analyze Image", type="primary", use_container_width=True)
    else:
        st.subheader("📽 Face Video")
        with st.form("vid_form", clear_on_submit=False):
            vid_source = st.radio("Source", ["Camera (Live)", "Upload"], horizontal=True, key="vid_source")
            temp_video_path = None
            if vid_source == "Upload":
                up_vid = st.file_uploader("Upload MP4/AVI/MOV", type=["mp4", "avi", "mov"])
                if up_vid is not None:
                    # Keep the real extension instead of always writing ".mp4".
                    suffix = os.path.splitext(up_vid.name)[1].lower() or ".mp4"
                    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmpf:
                        tmpf.write(up_vid.read())
                        temp_video_path = tmpf.name
            else:
                st.write("Start → record ~20–30s → Stop & Use")
                if "webrtc_ctx" not in st.session_state:
                    st.session_state.webrtc_ctx = None
                if "hr_processor" not in st.session_state:
                    st.session_state.hr_processor = HRCollectorVideoProcessor()
                # Resolve the processor here so the factory does not have to
                # read st.session_state from wherever the streamer calls it.
                hr_processor = st.session_state.hr_processor
                ctx = webrtc_streamer(
                    key="hr-webrtc",
                    mode=WebRtcMode.SENDRECV,
                    rtc_configuration=RTC_CONFIGURATION,
                    media_stream_constraints={"video": True, "audio": False},
                    video_processor_factory=lambda: hr_processor,
                )
                c1, c2 = st.columns(2)
                with c1:
                    start_clicked = st.form_submit_button("Start Recording")
                with c2:
                    stop_clicked = st.form_submit_button("Stop & Use")
                if start_clicked and ctx.state.playing:
                    hr_processor.start()
                    st.info("Recording started...")
                if stop_clicked:
                    dump_path = hr_processor.stop_and_dump_to_file()
                    if dump_path and os.path.exists(dump_path):
                        temp_video_path = dump_path
                        # Persist immediately. Only one submit button is True
                        # per run, so waiting for "Save Video" (a later run,
                        # where temp_video_path is None again) would lose the
                        # capture and Analyze Video would never see it.
                        st.session_state.captured_video_path = dump_path
                        st.success("Video captured. Now click **Analyze Video** below.")
                    else:
                        st.warning("Captured video too short. Please record ~20–30 seconds.")
            # Store selected/captured video after form submit
            submitted_vid = st.form_submit_button("Save Video")

        if submitted_vid:
            if temp_video_path:
                st.session_state.captured_video_path = temp_video_path
            elif vid_source == "Upload":
                st.warning("No video uploaded.")
            elif st.session_state.captured_video_path is None:
                st.warning("No video captured yet.")

        analyze_video_btn = st.button("🔍 Analyze Video", type="primary", use_container_width=True)

with col_right:
    st.subheader("🔒 Blurred Preview (shown only after analysis)")
    preview_placeholder = st.empty()

st.markdown("---")
# Single HTML report placeholder
report_placeholder = st.empty()

# Run analysis only when the explicit Analyze buttons are pressed
if mode == "Image" and analyze_image:
    # If user forgot "Save Image", try auto-buffer
    if st.session_state.captured_image_bgr is None and st.session_state.img_bytes_buffer is not None:
        arr = np.frombuffer(st.session_state.img_bytes_buffer, dtype=np.uint8)
        img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
        if img_bgr is not None:
            st.session_state.captured_image_bgr = img_bgr
    html, frame_rgb_blurred = analyze_face(st.session_state.captured_image_bgr)
    if frame_rgb_blurred is not None:
        preview_placeholder.image(frame_rgb_blurred, caption="Blurred Image", use_container_width=True)
    with report_placeholder:
        components.html(html, height=1200, scrolling=True)
elif mode == "Video" and analyze_video_btn:
    html, frame_rgb_blurred = analyze_video(st.session_state.captured_video_path)
    if frame_rgb_blurred is not None:
        preview_placeholder.image(frame_rgb_blurred, caption="Blurred Key Frame", use_container_width=True)
    with report_placeholder:
        components.html(html, height=1200, scrolling=True)