"""Streamlit app: face image / video -> placeholder "lab test" HTML report.

Only the hemoglobin value can come from a trained model (when its pickle is
present and prediction succeeds); every other figure is drawn at random from
a fixed range and is NOT a measurement.
"""

import logging
import os
import random
import tempfile
import threading
from collections import deque
from typing import Any, Dict, List, Optional, Tuple

import streamlit as st
import numpy as np
import cv2
import joblib

# components.html is used for the long HTML report
import streamlit.components.v1 as components

# (Optional) face landmarks. Any failure while importing or initialising
# mediapipe simply disables face detection instead of crashing the app.
try:
    import mediapipe as mp

    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(
        static_image_mode=True,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
    )
except Exception:
    mp = None
    face_mesh = None

# Live video capture (front-end) with WebRTC
from streamlit_webrtc import webrtc_streamer, WebRtcMode, RTCConfiguration
import av

logger = logging.getLogger(__name__)

st.set_page_config(page_title="Face-based Lab Test AI Report", layout="wide")

# ==============================
# WebRTC config
# ==============================
RTC_CONFIGURATION = RTCConfiguration(
    {"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]}
)

# Minimum number of frames a video / live capture must contain to be used.
_MIN_VIDEO_FRAMES = 30

# ==============================
# Utility & UI helpers
# ==============================
_TH_STYLE = "padding:8px;border:1px solid #ccc;text-align:left;"
_TD_STYLE = "padding:6px;border:1px solid #ccc;"


def get_risk_color(value: float, normal_range: Tuple[float, float]) -> Tuple[str, str, str]:
    """Classify *value* against the inclusive *normal_range*.

    Returns:
        ``(level, icon, background_color)`` where level is ``"Low"``,
        ``"High"`` or ``"Normal"``.
    """
    low, high = normal_range
    if value < low:
        return ("Low", "🔻", "#FFCCCC")
    if value > high:
        return ("High", "🔺", "#FFE680")
    return ("Normal", "✅", "#CCFFCC")


def build_table(title: str, rows: List[Tuple[str, float, Tuple[float, float]]]) -> str:
    """Render one report panel as an HTML table.

    Args:
        title: Panel heading, inserted into the markup as-is.
        rows: ``(label, value, (low, high))`` triples; each row is coloured
            according to :func:`get_risk_color`.

    Returns:
        A self-contained HTML fragment (inline styles only, because the report
        is displayed through ``components.html``).
    """
    parts = [
        '<div style="margin-bottom:24px;">',
        f'<h4 style="margin:8px 0;">{title}</h4>',
        '<table style="width:100%;border-collapse:collapse;">',
        '<thead><tr style="background:#f0f0f0;">',
        f'<th style="{_TH_STYLE}">Test</th>',
        f'<th style="{_TH_STYLE}">Result</th>',
        f'<th style="{_TH_STYLE}">Expected Range</th>',
        f'<th style="{_TH_STYLE}">Level</th>',
        '</tr></thead><tbody>',
    ]
    for label, value, ref in rows:
        level, icon, bg = get_risk_color(value, ref)
        parts.append(
            f'<tr style="background:{bg};">'
            f'<td style="{_TD_STYLE}">{label}</td>'
            f'<td style="{_TD_STYLE}">{value:.2f}</td>'
            f'<td style="{_TD_STYLE}">{ref[0]} – {ref[1]}</td>'
            f'<td style="{_TD_STYLE}">{icon} {level}</td>'
            f'</tr>'
        )
    parts.append('</tbody></table></div>')
    return "".join(parts)


def extract_basic_rgb_features(image_rgb: np.ndarray) -> List[float]:
    """Return the mean of channels 0, 1, 2 as percentages of 255."""
    return [100 * float(np.mean(image_rgb[:, :, channel])) / 255.0 for channel in range(3)]


def blur_for_display(img_rgb: np.ndarray, k: int = 35) -> np.ndarray:
    """Return a Gaussian-blurred copy of *img_rgb* for the privacy preview."""
    k = max(3, k | 1)  # GaussianBlur needs an odd kernel size
    return cv2.GaussianBlur(img_rgb, (k, k), 0)


def _error_html(message: str) -> str:
    """Wrap a user-facing error message in the report's error markup."""
    return f"<div style='color:red;'>⚠️ {message}</div>"


# ==============================
# Model loading (guarded)
# ==============================
def safe_load_joblib(path: str) -> Optional[Any]:
    """Load a joblib artifact, returning ``None`` if it is missing or broken.

    NOTE: joblib files are pickles; only load artifacts shipped with the app.
    """
    if not os.path.exists(path):
        return None
    try:
        return joblib.load(path)
    except Exception:
        # Best effort: the app falls back to placeholder values without it.
        logger.warning("Could not load model %r; using placeholder values.", path, exc_info=True)
        return None


hemoglobin_model = safe_load_joblib("hemoglobin_model_from_anemia_dataset.pkl")
hr_model = safe_load_joblib("heart_rate_model.pkl")
spo2_model = safe_load_joblib("spo2_model_simulated.pkl")
hemoglobin_r2 = 0.385  # hard-coded score displayed at the top of the report


# ==============================
# Placeholder models
# ==============================
def random_model_predict(low: float, high: float) -> float:
    """Placeholder "model": a uniform random value in ``[low, high]``."""
    return random.uniform(low, high)


# Ranges the placeholder values are drawn from (one entry per reported test).
_PLACEHOLDER_RANGES: Dict[str, Tuple[float, float]] = {
    "Hemoglobin": (12.5, 15.0),
    "WBC Count": (4.0, 11.0),
    "Platelet Count": (150, 450),
    "Iron": (60, 170),
    "Ferritin": (30, 300),
    "TIBC": (250, 400),
    "Bilirubin": (0.3, 1.2),
    "Creatinine": (0.6, 1.2),
    "Urea": (7, 20),
    "Sodium": (135, 145),
    "Potassium": (3.5, 5.1),
    "TSH": (0.4, 4.0),
    "Cortisol": (5, 25),
    "FBS": (70, 110),
    "HbA1c": (4.0, 5.7),
    "Albumin": (3.5, 5.5),
    "BP Systolic": (90, 120),
    "BP Diastolic": (60, 80),
    "Temperature": (97, 99),
}

# Report layout: (panel title, ((row label, expected range), ...)).
_REPORT_PANELS: Tuple[Tuple[str, Tuple[Tuple[str, Tuple[float, float]], ...]], ...] = (
    ("🩸 Hematology", (
        ("Hemoglobin", (13.5, 17.5)),
        ("WBC Count", (4.0, 11.0)),
        ("Platelet Count", (150, 450)),
    )),
    ("🧬 Iron Panel", (
        ("Iron", (60, 170)),
        ("Ferritin", (30, 300)),
        ("TIBC", (250, 400)),
    )),
    ("🧬 Liver & Kidney", (
        ("Bilirubin", (0.3, 1.2)),
        ("Creatinine", (0.6, 1.2)),
        ("Urea", (7, 20)),
    )),
    ("🧪 Electrolytes", (
        ("Sodium", (135, 145)),
        ("Potassium", (3.5, 5.1)),
    )),
    ("🧠 Metabolic & Thyroid", (
        ("FBS", (70, 110)),
        ("HbA1c", (4.0, 5.7)),
        ("TSH", (0.4, 4.0)),
    )),
    ("❤️ Vitals", (
        ("SpO2", (95, 100)),
        ("Heart Rate", (60, 100)),
        ("Respiratory Rate", (12, 20)),
        ("Temperature", (97, 99)),
        ("BP Systolic", (90, 120)),
        ("BP Diastolic", (60, 80)),
    )),
    ("🩹 Other Indicators", (
        ("Cortisol", (5, 25)),
        ("Albumin", (3.5, 5.5)),
    )),
)


def _random_lab_values(hemoglobin: Optional[float] = None) -> Dict[str, Any]:
    """Draw one placeholder value per test; *hemoglobin* overrides the draw."""
    values: Dict[str, Any] = {
        name: random_model_predict(low, high)
        for name, (low, high) in _PLACEHOLDER_RANGES.items()
    }
    if hemoglobin is not None:
        values["Hemoglobin"] = hemoglobin
    return values


def _respiratory_rate_from_hr(heart_rate: float) -> int:
    """Placeholder respiratory rate (12-14) derived from the heart rate."""
    return int(12 + abs(int(heart_rate) % 5 - 2))


def _build_report_html(test_values: Dict[str, Any], spo2: float, heart_rate: float, rr: int) -> str:
    """Assemble the full HTML report shared by the image and video paths."""
    values = dict(test_values)
    values.update({"SpO2": spo2, "Heart Rate": heart_rate, "Respiratory Rate": rr})

    parts = [
        "<div style='font-family:sans-serif;'>",
        f"<div style='font-size:18px;font-weight:bold;margin-bottom:12px;'>"
        f"Hemoglobin R² Score: {hemoglobin_r2:.2f}</div>",
    ]
    parts.extend(
        build_table(title, [(label, values[label], ref) for label, ref in rows])
        for title, rows in _REPORT_PANELS
    )
    parts.append(
        "<div style='margin-top:20px;padding:12px;border:1px dashed #999;background:#fcfcfc;'>"
        "<h4>📝 Summary for You</h4>"
        "<p>💡 Tip: This is an AI-based estimate. Please follow up with a lab.</p>"
        "</div>"
    )
    parts.append("<br>")
    parts.append(
        "<div style='margin-top:20px;padding:12px;border:2px solid #2d87f0;"
        "background:#f2faff;text-align:center;border-radius:8px;'>"
        "<h4>📞 Book a Lab Test</h4>"
        "<p>Prefer confirmation? Find certified labs near you.</p>"
        "</div>"
    )
    parts.append("</div>")
    return "".join(parts)


# ==============================
# Image path (analyze image once, show only final blurred frame)
# ==============================
def analyze_face(image_bgr: Optional[np.ndarray]) -> Tuple[str, Optional[np.ndarray]]:
    """Build the report for a single BGR image.

    Returns:
        ``(html, blurred_rgb_frame)``; on failure the HTML is an error message
        and the frame is ``None``.
    """
    if image_bgr is None:
        return _error_html("Error: No image provided."), None

    frame_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

    # Face detection is only enforced when mediapipe is available.
    if face_mesh is not None and mp is not None:
        result = face_mesh.process(frame_rgb)
        if not result.multi_face_landmarks:
            return _error_html("Error: Face not detected."), None

    features = extract_basic_rgb_features(frame_rgb)
    hb_pred: Optional[float] = None
    if hemoglobin_model is not None:
        try:
            hb_pred = float(hemoglobin_model.predict([features])[0])
        except Exception:
            # Best effort: fall through to the placeholder value below.
            logger.warning("Hemoglobin model prediction failed.", exc_info=True)
    if hb_pred is None:
        hb_pred = float(random_model_predict(12.5, 15.0))

    heart_rate = float(random_model_predict(60, 100))
    spo2 = float(random_model_predict(95, 100))
    rr = _respiratory_rate_from_hr(heart_rate)

    html_output = _build_report_html(_random_lab_values(hemoglobin=hb_pred), spo2, heart_rate, rr)
    # Return the single final frame (blurred for display only)
    return html_output, blur_for_display(frame_rgb)


# ==============================
# Video path (assemble once; show only final blurred keyframe)
# ==============================
def analyze_video(video_path: Optional[str]) -> Tuple[str, Optional[np.ndarray]]:
    """Build the report for a video file, using its first frame as key frame.

    Returns:
        ``(html, blurred_rgb_key_frame)``; on failure the HTML is an error
        message and the frame is ``None``.
    """
    if not video_path or not os.path.exists(video_path):
        return _error_html("Face video missing or unreadable."), None

    cap = cv2.VideoCapture(video_path)
    frame_sample = None
    frames = 0
    try:
        # Only the first frame and the "at least N frames" fact are needed,
        # so stop decoding as soon as the threshold is reached.
        while frames < _MIN_VIDEO_FRAMES:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_sample is None:
                frame_sample = frame.copy()
            frames += 1
    finally:
        cap.release()

    if frame_sample is None or frames < _MIN_VIDEO_FRAMES:
        return _error_html("Video too short; please record ~20–30s."), None

    # Placeholder HR/SpO2 (replace with proper rPPG)
    try:
        hr_value = float(random_model_predict(65, 85))
        gray = cv2.cvtColor(frame_sample, cv2.COLOR_BGR2GRAY)
        brightness_var = float(np.std(gray) / 255.0)
        patch = frame_sample[100:150, 100:150]
        # The patch is empty when the frame is smaller than 100 px.
        skin_tone_index = float(np.mean(patch) / 255.0) if patch.size else 0.5
        spo2 = float(98.0 - (1.5 * brightness_var - 0.5 * (skin_tone_index - 0.5)))
        spo2 = float(np.clip(spo2, 92.0, 100.0))
    except Exception:
        logger.warning("Key-frame heuristics failed; using random vitals.", exc_info=True)
        hr_value = float(random_model_predict(65, 85))
        spo2 = float(random_model_predict(95, 100))

    rr = _respiratory_rate_from_hr(hr_value)
    frame_rgb = cv2.cvtColor(frame_sample, cv2.COLOR_BGR2RGB)

    html_output = _build_report_html(_random_lab_values(), spo2, hr_value, rr)
    return html_output, blur_for_display(frame_rgb)


# ==============================
# WebRTC frame collector (no progress timer to avoid constant re-renders)
# ==============================
class HRCollectorVideoProcessor:
    """Buffers incoming WebRTC frames while recording and dumps them to MP4.

    ``recv`` is invoked by the WebRTC worker while ``start`` and
    ``stop_and_dump_to_file`` are called from the Streamlit script, so access
    to the buffer and the recording flag is serialised with a lock.
    """

    def __init__(self):
        self.frames = deque(maxlen=30 * 90)  # up to ~90s @30fps
        self.recording = False
        self._lock = threading.Lock()

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Pass the frame through, keeping a BGR copy while recording."""
        img = frame.to_ndarray(format="bgr24")
        with self._lock:
            if self.recording:
                self.frames.append(img)
        return av.VideoFrame.from_ndarray(img, format="bgr24")

    def start(self):
        """Discard any previous capture and begin recording."""
        with self._lock:
            self.frames.clear()
            self.recording = True

    def stop_and_dump_to_file(self) -> Optional[str]:
        """Stop recording and write the buffered frames to a temp ``.mp4``.

        Returns:
            Path of the written file, or ``None`` when fewer than
            ``_MIN_VIDEO_FRAMES`` frames were captured or the writer could not
            be opened. The caller owns (and should eventually delete) the file.
        """
        with self._lock:
            self.recording = False
            # Snapshot so a late recv() cannot mutate what we iterate over.
            captured = list(self.frames)

        if len(captured) < _MIN_VIDEO_FRAMES:  # ~1s
            return None

        h, w = captured[0].shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        tmp_path = tmp.name
        tmp.close()

        out = cv2.VideoWriter(tmp_path, fourcc, 30.0, (w, h))
        if not out.isOpened():
            out.release()
            os.remove(tmp_path)
            logger.warning("Could not open a video writer for %r.", tmp_path)
            return None
        try:
            for captured_frame in captured:
                out.write(captured_frame)
        finally:
            out.release()
        return tmp_path


# ==============================
# Streamlit UI
# ==============================
def _decode_image(raw_bytes: bytes) -> Optional[np.ndarray]:
    """Decode encoded image bytes to a BGR array (``None`` if undecodable)."""
    return cv2.imdecode(np.frombuffer(raw_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)


def _remember_video(path: str) -> None:
    """Store *path* as the video to analyse, deleting the temp file it replaces."""
    previous = st.session_state.captured_video_path
    if previous and previous != path and os.path.exists(previous):
        try:
            os.remove(previous)
        except OSError:
            logger.debug("Could not remove stale temp video %r.", previous, exc_info=True)
    st.session_state.captured_video_path = path


st.markdown(
    """
# 🧠 Face-Based Lab Test AI Report
Use **Image** (Hb) or **Video** (HR). Only a single **blurred preview** is shown **after** analysis to avoid flicker.
"""
)

# Session state to hold captured assets between form submits.
# img_bytes_buffer keeps raw camera bytes in case the user forgets "Save Image".
for _state_key in ("captured_image_bgr", "captured_video_path", "img_bytes_buffer"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None

mode = st.radio("Choose Input Mode", ["Image", "Video"], horizontal=True)

# Only the button belonging to the active mode is rendered; default the other.
analyze_image = False
analyze_video_btn = False

col_left, col_right = st.columns([1.2, 1], gap="large")

with col_left:
    if mode == "Image":
        st.subheader("📸 Face / Eye Image")

        # Wrap inputs in a form so widget changes don't re-run the app.
        # NOTE(review): the Source radio therefore only takes effect after a
        # form submit - confirm this is the intended UX.
        with st.form("img_form", clear_on_submit=False):
            img_source = st.radio("Source", ["Camera", "Upload"], horizontal=True, key="img_source")
            uploaded_img = None

            if img_source == "Camera":
                cam_img = st.camera_input("Capture image", key="camera_img")
                if cam_img is not None:
                    st.session_state.img_bytes_buffer = cam_img.getvalue()
            else:
                file_up = st.file_uploader("Upload JPG/PNG", type=["jpg", "jpeg", "png"])
                if file_up is not None:
                    # getvalue() is independent of the stream position.
                    uploaded_img = file_up.getvalue()

            submitted_img = st.form_submit_button("Save Image")

        if submitted_img:
            # Prefer explicit upload; else fall back to auto-buffered camera bytes
            raw_bytes = uploaded_img if uploaded_img else st.session_state.img_bytes_buffer
            if raw_bytes:
                img_bgr = _decode_image(raw_bytes)
                if img_bgr is not None:
                    st.session_state.captured_image_bgr = img_bgr
                    st.success("Image saved. Now click **Analyze Image** below.")
                else:
                    st.warning("Could not decode the image. Please recapture/upload.")
            else:
                st.warning("No image provided.")

        analyze_image = st.button("🔍 Analyze Image", type="primary", use_container_width=True)

    else:
        st.subheader("📽 Face Video")

        with st.form("vid_form", clear_on_submit=False):
            vid_source = st.radio("Source", ["Camera (Live)", "Upload"], horizontal=True, key="vid_source")
            up_vid = None

            if vid_source == "Upload":
                up_vid = st.file_uploader("Upload MP4/AVI/MOV", type=["mp4", "avi", "mov"])
            else:
                st.write("Start → record ~20–30s → Stop & Use")

                if "hr_processor" not in st.session_state:
                    st.session_state.webrtc_ctx = None
                    st.session_state.hr_processor = HRCollectorVideoProcessor()

                # Close over a local: the factory may run outside the script
                # thread, where st.session_state is not reliably accessible.
                hr_processor = st.session_state.hr_processor

                ctx = webrtc_streamer(
                    key="hr-webrtc",
                    mode=WebRtcMode.SENDRECV,
                    rtc_configuration=RTC_CONFIGURATION,
                    media_stream_constraints={"video": True, "audio": False},
                    video_processor_factory=lambda: hr_processor,
                )

                c1, c2 = st.columns(2)
                with c1:
                    start_clicked = st.form_submit_button("Start Recording")
                with c2:
                    stop_clicked = st.form_submit_button("Stop & Use")

                if start_clicked and ctx.state.playing:
                    hr_processor.start()
                    st.info("Recording started...")

                if stop_clicked:
                    dump_path = hr_processor.stop_and_dump_to_file()
                    if dump_path and os.path.exists(dump_path):
                        # Persist immediately: only one submit button is True
                        # per rerun, so a later "Save Video" click would no
                        # longer see this path.
                        _remember_video(dump_path)
                        st.success("Video captured. Now click **Analyze Video** below.")
                    else:
                        st.warning("Captured video too short. Please record ~20–30 seconds.")

            # Store the selected video after form submit
            submitted_vid = st.form_submit_button("Save Video")

        if submitted_vid:
            if vid_source == "Upload":
                if up_vid is not None:
                    # Write the temp file only on save (not on every rerun)
                    # and keep the original extension.
                    suffix = os.path.splitext(up_vid.name)[1] or ".mp4"
                    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmpf:
                        tmpf.write(up_vid.getvalue())
                    _remember_video(tmpf.name)
                    st.success("Video saved. Now click **Analyze Video** below.")
                else:
                    st.warning("No video uploaded.")
            elif not st.session_state.captured_video_path:
                st.warning("No video captured yet.")

        analyze_video_btn = st.button("🔍 Analyze Video", type="primary", use_container_width=True)

with col_right:
    st.subheader("🔒 Blurred Preview (shown only after analysis)")
    preview_placeholder = st.empty()
    st.markdown("---")
    # Single HTML report placeholder
    report_placeholder = st.empty()


def _show_result(html: str, frame_rgb_blurred: Optional[np.ndarray], caption: str) -> None:
    """Fill the preview and report placeholders with one analysis result."""
    if frame_rgb_blurred is not None:
        preview_placeholder.image(frame_rgb_blurred, caption=caption, use_container_width=True)
    with report_placeholder:
        components.html(html, height=1200, scrolling=True)


# Run analysis only when the explicit Analyze buttons are pressed
if mode == "Image" and analyze_image:
    # If user forgot "Save Image", try auto-buffer
    if st.session_state.captured_image_bgr is None and st.session_state.img_bytes_buffer is not None:
        buffered_bgr = _decode_image(st.session_state.img_bytes_buffer)
        if buffered_bgr is not None:
            st.session_state.captured_image_bgr = buffered_bgr

    report_html, blurred = analyze_face(st.session_state.captured_image_bgr)
    _show_result(report_html, blurred, "Blurred Image")

elif mode == "Video" and analyze_video_btn:
    report_html, blurred = analyze_video(st.session_state.captured_video_path)
    _show_result(report_html, blurred, "Blurred Key Frame")