Spaces:
Sleeping
Sleeping
| """ | |
Production Gradio app — Gemma 4 E2B IT + Depth + YOLO.

Designed for real-world assistive use, not academic demonstration:

- Always runs Stage 3 (VLM + Depth + YOLO) — best spatial quality.
- Browser TTS via Web Speech API reads every new description aloud.
- Auto Mode fires inference automatically on scene change.
- Description history shows the last 3 outputs for quick review.
- Dark accessible theme with large text.

Requirements
------------
Run in the production conda environment (scene-prod), NOT the academic one.
See requirements_prod.txt for setup instructions.

Launch::

    conda activate scene-prod
    python -m src.ui.app_prod

Environment variables::

    HF_TOKEN=hf_...        # required for gated Gemma 4 model
    PROD_SHARE=1           # set to expose a public Gradio tunnel
| """ | |
| from __future__ import annotations | |
| import difflib | |
| import textwrap | |
| import time | |
| from collections import deque | |
| from typing import Any | |
| import gradio as gr | |
| import matplotlib | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| matplotlib.use("Agg") | |
| import matplotlib.cm as cm # noqa: E402 | |
| from ..config import DEPTH_MAX_CM, DEPTH_MIN_CM | |
| from ..depth_context import build_depth_context | |
| from ..models.depth import DepthEstimator | |
| from ..models.detector import ObjectDetector | |
| from ..models.gemma4 import Gemma4VLM | |
| from ..pipeline import Pipeline, _sync, _vram_mb | |
# ── Constants ────────────────────────────────────────────────────────────────
# Tunables for Auto Mode and the description history.
_FRAME_DIFF_THRESHOLD: float = 4.0    # grayscale MAD to trigger inference
_SIMILARITY_THRESHOLD: float = 0.80   # SequenceMatcher ratio to suppress TTS
_HISTORY_LEN: int = 3                 # number of past descriptions to keep
_AUTO_INTERVAL_S: float = 6.0         # seconds between auto-describe ticks

# RGB colours for detection boxes; the drawing code cycles through them by
# detection index, so colours repeat after the eighth box.
_BOX_PALETTE: list[tuple[int, int, int]] = [
    (255, 56, 56),    # red
    (56, 168, 255),   # blue
    (56, 255, 101),   # green
    (255, 178, 29),   # orange
    (180, 56, 255),   # purple
    (255, 56, 200),   # pink
    (29, 220, 220),   # cyan
    (255, 225, 56),   # yellow
]
# Web Speech API JS snippet — injected via Gradio's js= parameter.
# Reads `text` aloud, cancelling any in-progress utterance first.
# Empty or whitespace-only text is not spoken. The function always returns
# `text` unchanged, so the component wired as its output keeps its value.
_TTS_JS = """
(text) => {
    if (!text || text.trim() === "") return text;
    window.speechSynthesis.cancel();
    const u = new SpeechSynthesisUtterance(text);
    u.rate = 0.92;
    u.pitch = 1.0;
    window.speechSynthesis.speak(u);
    return text;
}
"""
# ── Global pipeline (loaded once at startup) ─────────────────────────────────
# Process-wide pipeline instance; stays None until the first request for it.
_PIPELINE: Pipeline | None = None


def _get_pipeline() -> Pipeline:
    """Return the shared pipeline, constructing it on first use.

    The instance is created with ``force_model="gemma4"`` and stored in the
    module-level ``_PIPELINE`` so later calls reuse it.
    """
    global _PIPELINE
    shared = _PIPELINE
    if shared is not None:
        return shared
    shared = Pipeline(force_model="gemma4")
    _PIPELINE = shared
    return shared
# ── AR overlay helpers ───────────────────────────────────────────────────────
def _draw_annotated(
    frame_rgb: np.ndarray,
    depth_np: np.ndarray | None,
    boxes: np.ndarray,
    classes: list[str],
    confidences: list[float],
) -> np.ndarray:
    """Render detection boxes and captions onto a copy of the frame.

    Each detection gets a three-pixel outline in a palette colour and a
    filled caption reading ``<class>[ ~<distance> cm] <confidence>%``. The
    distance part appears only when ``depth_np`` is given and the box
    overlaps it.

    Args:
        frame_rgb: Frame to annotate; the input array is not modified.
        depth_np: Optional depth map, indexed with the frame's pixel
            coordinates. NOTE(review): the formula treats values as 0-255
            with larger meaning nearer — confirm against the depth model.
        boxes: One ``(x1, y1, x2, y2)`` row per detection, in pixels.
        classes: Class name for each detection.
        confidences: Score for each detection, shown as a percentage.

    Returns:
        The annotated image as a new array.
    """
    canvas = Image.fromarray(frame_rgb).convert("RGB")
    painter = ImageDraw.Draw(canvas)
    width, height = canvas.size

    # "arial.ttf" is not installed everywhere; fall back to PIL's built-in font.
    try:
        font = ImageFont.truetype("arial.ttf", size=max(14, width // 50))
    except (IOError, OSError):
        font = ImageFont.load_default()

    padding = 3
    for index, (box, name, score) in enumerate(zip(boxes, classes, confidences)):
        left, top, right, bottom = (int(v) for v in box)
        colour = _BOX_PALETTE[index % len(_BOX_PALETTE)]

        # Three nested one-pixel rectangles give a thick outline.
        for grow in (0, 1, 2):
            painter.rectangle(
                [left - grow, top - grow, right + grow, bottom + grow],
                outline=colour,
            )

        distance = ""
        if depth_np is not None:
            # Clamp the box to the image before sampling the depth map.
            rows = slice(max(0, top), min(height - 1, bottom) + 1)
            cols = slice(max(0, left), min(width - 1, right) + 1)
            patch = depth_np[rows, cols]
            if patch.size > 0:
                median = float(np.median(patch))
                dist_cm = DEPTH_MAX_CM - (median / 255.0) * (DEPTH_MAX_CM - DEPTH_MIN_CM)
                distance = f" ~{dist_cm:.0f} cm"

        caption = f"{name}{distance} {int(score * 100)}%"
        text_box = painter.textbbox((left, top), caption, font=font)
        painter.rectangle(
            [
                text_box[0] - padding,
                text_box[1] - padding,
                text_box[2] + padding,
                text_box[3] + padding,
            ],
            fill=colour,
        )
        painter.text((left, top), caption, fill=(255, 255, 255), font=font)

    return np.array(canvas)
def _depth_overlay(frame_rgb: np.ndarray, depth_np: np.ndarray, alpha: float = 0.50) -> np.ndarray:
    """Blend a plasma-coloured rendering of the depth map over the RGB frame.

    Args:
        frame_rgb: Frame the overlay is blended onto.
        depth_np: Depth map; values are divided by 255 before colour
            mapping. Its height and width must match ``frame_rgb`` for the
            blend to broadcast.
        alpha: Weight of the coloured depth layer; the frame gets
            ``1 - alpha``.

    Returns:
        The blended image as a ``uint8`` array.
    """
    try:
        # Colormap registry, available since Matplotlib 3.5.
        plasma = matplotlib.colormaps["plasma"]
    except AttributeError:
        # Older Matplotlib has no registry; ``cm.get_cmap`` still exists there
        # (it was deprecated in 3.7 and removed in 3.9).
        plasma = cm.get_cmap("plasma")

    # Drop the alpha channel of the RGBA colormap output.
    coloured = (plasma(depth_np / 255.0)[..., :3] * 255).astype(np.uint8)
    blended = (
        alpha * coloured.astype(np.float32)
        + (1.0 - alpha) * frame_rgb.astype(np.float32)
    )
    return blended.astype(np.uint8)
def _placeholder(message: str, w: int = 640, h: int = 360) -> np.ndarray:
    """Build a dark-grey image showing *message* in yellow text.

    The message is wrapped at 38 characters. Each line is centred
    horizontally; the first line sits 20 px above the vertical midpoint and
    later lines follow at 28 px intervals. A black copy offset by one pixel
    serves as a drop shadow.

    Args:
        message: Text to display.
        w: Image width in pixels.
        h: Image height in pixels.

    Returns:
        The rendered image as an ``(h, w, 3)`` array.
    """
    canvas = Image.new("RGB", (w, h), (30, 30, 30))
    painter = ImageDraw.Draw(canvas)

    # "arial.ttf" is not installed everywhere; fall back to PIL's built-in font.
    try:
        font = ImageFont.truetype("arial.ttf", size=max(16, w // 38))
    except (IOError, OSError):
        font = ImageFont.load_default()

    first_line_y = h // 2 - 20
    for row, text in enumerate(textwrap.wrap(message, width=38)):
        extent = painter.textbbox((0, 0), text, font=font)
        text_width = extent[2] - extent[0]
        x = (w - text_width) // 2
        y = first_line_y + row * 28
        painter.text((x + 1, y + 1), text, fill=(0, 0, 0), font=font)
        painter.text((x, y), text, fill=(255, 220, 60), font=font)

    return np.array(canvas)
# ── Change detection ─────────────────────────────────────────────────────────
def _scene_changed(prev: np.ndarray | None, curr: np.ndarray) -> bool:
    """Report whether *curr* differs noticeably from *prev*.

    Both frames are shrunk to 128x72 thumbnails and averaged over the
    channel axis. The scene counts as changed when the mean absolute
    difference between the thumbnails reaches ``_FRAME_DIFF_THRESHOLD``.
    With no previous frame the scene always counts as changed.
    """
    if prev is None:
        return True

    def _gray_thumb(frame: np.ndarray) -> np.ndarray:
        """Downscale *frame* and average its channels into one plane."""
        small = Image.fromarray(frame).resize((128, 72))
        return np.array(small).mean(axis=2).astype(np.float32)

    difference = np.abs(_gray_thumb(curr) - _gray_thumb(prev))
    return float(np.mean(difference)) >= _FRAME_DIFF_THRESHOLD
def _too_similar(a: str, b: str) -> bool:
    """Report whether two descriptions are near-duplicates.

    The comparison is case-insensitive and uses the ``SequenceMatcher``
    ratio against ``_SIMILARITY_THRESHOLD``. An empty string on either side
    is never considered similar.
    """
    if a and b:
        ratio = difflib.SequenceMatcher(None, a.lower(), b.lower()).ratio()
        return ratio >= _SIMILARITY_THRESHOLD
    return False
# ── Core inference ───────────────────────────────────────────────────────────
def _run_stage3(frame_rgb: np.ndarray) -> tuple[np.ndarray, np.ndarray, str, str, str]:
    """Run Stage 3 inference and return all UI outputs.

    Args:
        frame_rgb: Frame to describe.

    Returns:
        ``(annotated_frame, depth_map, description, depth_context,
        timing_text)``. When there are no detections the annotated frame is
        a plain copy of the input; when no depth map is cached the depth
        view is a plain copy as well.
    """
    pipeline = _get_pipeline()
    description, depth_context, timing = pipeline.run_stage3(frame_rgb)

    # Reuse intermediates cached by run_stage3 — no second depth/YOLO pass.
    depth_np = pipeline.last_depth_np
    boxes_np = pipeline.last_boxes
    if boxes_np is None:
        boxes_np = np.empty((0, 4), dtype=np.float32)
    # Guard the class and confidence caches like the boxes, so a run with
    # nothing cached cannot fail on len(None).
    classes = pipeline.last_classes if pipeline.last_classes is not None else []
    confs = pipeline.last_confs if pipeline.last_confs is not None else []

    if len(boxes_np) > 0:
        annotated = _draw_annotated(frame_rgb, depth_np, boxes_np, classes, confs)
    else:
        annotated = frame_rgb.copy()

    if depth_np is not None:
        depth_vis = _depth_overlay(frame_rgb, depth_np)
    else:
        depth_vis = frame_rgb.copy()

    n_det = len(classes)
    # Depth and YOLO timings are optional in the timing dict; the rest are
    # indexed directly, as before.
    timing_text = (
        f"Total {timing['total_s']*1000:.0f} ms | "
        f"Depth {timing.get('depth_s',0)*1000:.0f} ms | "
        f"YOLO {timing.get('yolo_s',0)*1000:.0f} ms ({n_det} obj) | "
        f"VLM {timing['vlm_s']*1000:.0f} ms | "
        f"VRAM {timing['vram_mb']:.0f} MB"
    )
    return annotated, depth_vis, description, depth_context, timing_text
# ── Gradio callbacks ─────────────────────────────────────────────────────────
def on_describe(
    input_image: np.ndarray | None,
    history: list[str],
) -> tuple:
    """Handle a click on the manual "Describe Scene" button.

    Args:
        input_image: Uploaded or captured image, or None when nothing has
            been provided yet.
        history: Most-recent-first list of earlier descriptions.

    Returns:
        A 7-tuple: annotated image, depth image, description, depth context,
        timing text, updated history list, history rendered as markdown.
        Without an image, both image slots hold a placeholder and the text
        slots are empty.
    """
    if input_image is None:
        blank = _placeholder("Upload or capture an image first")
        return blank, blank, "", "", "", history, ""

    frame = input_image.astype(np.uint8)
    annotated, depth_vis, description, depth_ctx, timing = _run_stage3(frame)

    # Newest first, capped at _HISTORY_LEN; the caller's list is not mutated.
    entries = [description, *history] if description else list(history)
    entries = entries[:_HISTORY_LEN]

    rendered = "\n\n---\n\n".join(
        f"**[{rank}]** {text}" for rank, text in enumerate(entries, start=1)
    )
    return annotated, depth_vis, description, depth_ctx, timing, entries, rendered
def on_auto_tick(
    live_frame: np.ndarray | None,
    prev_description: str,
    prev_frame_ref: np.ndarray | None,
    history: list[str],
) -> tuple:
    """Auto Mode timer callback — runs only when the scene has changed.

    Args:
        live_frame: Latest frame from the streaming camera input, or None
            when no frame is available yet.
        prev_description: The description currently shown.
        prev_frame_ref: Frame used for the last inference run; it is the
            reference for change detection.
        history: Most-recent-first list of earlier descriptions.

    Returns:
        A 10-tuple in the order the timer event is wired: annotated image,
        depth image, description, depth context, timing/status text,
        previous-description state, previous-frame state, history state,
        history markdown, status badge.
    """

    def _skip(status: str) -> tuple:
        """Keep panels and state as they are; only report *status*."""
        return (
            gr.update(), gr.update(),   # image panels
            prev_description,           # keep previous description
            gr.update(),                # depth context
            f"⏸ {status}",              # timing / status
            prev_description,           # state: prev_description
            prev_frame_ref,             # state: prev frame
            history,                    # state: history
            gr.update(),                # history markdown
            f"**{status}**",            # status badge
        )

    if live_frame is None:
        return _skip("Waiting for camera…")

    frame_rgb = live_frame.astype(np.uint8)
    if not _scene_changed(prev_frame_ref, frame_rgb):
        return _skip("No change detected")

    annotated, depth_vis, description, depth_ctx, timing = _run_stage3(frame_rgb)

    if _too_similar(description, prev_description):
        # The pixels moved but the wording barely did: refresh the visuals
        # and the reference frame, keep the old description and history.
        return (
            annotated, depth_vis,
            prev_description,
            depth_ctx, timing,
            prev_description, frame_rgb,
            history, gr.update(),
            "**Minor shift — description unchanged**",
        )

    # Newest first, capped at _HISTORY_LEN; the caller's list is not mutated.
    new_history = list(history)
    if description:
        new_history.insert(0, description)
        new_history = new_history[:_HISTORY_LEN]
    history_md = "\n\n---\n\n".join(
        f"**[{i+1}]** {d}" for i, d in enumerate(new_history)
    )
    return (
        annotated, depth_vis,
        description, depth_ctx, timing,
        description, frame_rgb,
        new_history, history_md,
        "**Scene updated**",
    )
def on_toggle_auto(is_active: bool) -> tuple:
    """Toggle the Auto Mode timer and swap visible webcam inputs.

    Args:
        is_active: Whether Auto Mode is on before the click.

    Returns:
        A 6-tuple: new active flag, timer update, button label update,
        status text, snapshot-input visibility update, streaming-input
        visibility update. The snapshot input is shown only while Auto Mode
        is off, the streaming input only while it is on.
    """
    new_active = not is_active
    label = "⏹ Stop Auto" if new_active else "▶ Auto Mode"
    status = "🔴 **Auto — describes every scene change**" if new_active else ""
    return (
        new_active,
        gr.update(active=new_active),
        gr.update(value=label),
        status,
        gr.update(visible=not new_active),   # snapshot input
        gr.update(visible=new_active),       # streaming input
    )
# ── UI construction ──────────────────────────────────────────────────────────
def build_ui() -> gr.Blocks:
    """Build and return the production Gradio Blocks interface.

    Layout: a left column with the camera inputs and the two action buttons,
    and a right column with the annotated and depth images, the description
    box, and collapsible panels for depth context, timing and history.

    Events: the describe button runs ``on_describe``, the auto button runs
    ``on_toggle_auto``, and the timer runs ``on_auto_tick``. Both the button
    and the timer chain a browser-side text-to-speech step afterwards.
    """
    # Auto Mode re-emits the previous description on ticks where nothing
    # changed or the new text was too similar. Speaking it unconditionally
    # would cancel and restart the utterance on every tick, so the timer
    # chain remembers the last text it spoke and skips repeats.
    auto_tts_js = """
    (text) => {
        if (!text || text.trim() === "") return text;
        if (window.__sceneDescriberLastSpoken === text) return text;
        window.__sceneDescriberLastSpoken = text;
        window.speechSynthesis.cancel();
        const u = new SpeechSynthesisUtterance(text);
        u.rate = 0.92;
        u.pitch = 1.0;
        window.speechSynthesis.speak(u);
        return text;
    }
    """

    with gr.Blocks(title="Scene Describer — Gemma 4") as demo:

        # ── Header ───────────────────────────────────────────────────────────
        gr.Markdown(
            "# Scene Describer\n"
            "### Spatial scene understanding for visually impaired users\n"
            "Powered by **Gemma 4 E2B IT** · Depth Anything V2 · YOLOv8n"
        )

        # ── Hidden state ─────────────────────────────────────────────────────
        auto_active = gr.State(False)
        prev_desc_st = gr.State("")
        prev_frame_st = gr.State(None)
        history_st = gr.State([])
        timer = gr.Timer(value=_AUTO_INTERVAL_S, active=False)

        with gr.Row():

            # ── Left: input controls ─────────────────────────────────────────
            with gr.Column(scale=1, min_width=300):
                gr.Markdown("### Camera")

                snapshot_in = gr.Image(
                    label="Upload or webcam snapshot",
                    sources=["webcam", "upload"],
                    type="numpy",
                    height=320,
                )
                # Hidden until Auto Mode is switched on.
                streaming_in = gr.Image(
                    label="Live camera (Auto Mode)",
                    sources=["webcam"],
                    type="numpy",
                    streaming=True,
                    height=320,
                    visible=False,
                )

                with gr.Row():
                    describe_btn = gr.Button(
                        "Describe Scene",
                        variant="primary",
                        scale=2,
                        size="lg",
                    )
                    auto_btn = gr.Button(
                        "▶ Auto Mode",
                        variant="secondary",
                        scale=1,
                        size="lg",
                    )

                auto_status = gr.Markdown(
                    value="",
                    elem_classes=["status-badge"],
                )

                gr.Markdown(
                    "**Auto Mode** uses your live camera and describes the "
                    "scene whenever it changes. Each description is read "
                    "aloud automatically."
                )

            # ── Right: outputs ───────────────────────────────────────────────
            with gr.Column(scale=2):
                with gr.Row():
                    annotated_out = gr.Image(
                        label="Objects detected",
                        type="numpy",
                        height=250,
                        interactive=False,
                    )
                    depth_out = gr.Image(
                        label="Depth map",
                        type="numpy",
                        height=250,
                        interactive=False,
                    )

                description_out = gr.Textbox(
                    label="Scene description",
                    lines=6,
                    interactive=False,
                    elem_classes=["description-box"],
                    placeholder="The description will appear here and be read aloud…",
                )

                with gr.Accordion("Depth measurements (injected preamble)", open=False):
                    depth_ctx_out = gr.Textbox(
                        label="",
                        lines=6,
                        interactive=False,
                        show_label=False,
                    )

                with gr.Accordion("Performance", open=False):
                    timing_out = gr.Textbox(
                        label="",
                        lines=2,
                        interactive=False,
                        show_label=False,
                        elem_classes=["timing-box"],
                    )

                with gr.Accordion(f"Last {_HISTORY_LEN} descriptions", open=False):
                    history_md_out = gr.Markdown(
                        value="",
                        elem_classes=["history-box"],
                    )

        # ── Wiring ───────────────────────────────────────────────────────────

        # Manual describe button
        describe_btn.click(
            fn=on_describe,
            inputs=[snapshot_in, history_st],
            outputs=[
                annotated_out, depth_out,
                description_out, depth_ctx_out, timing_out,
                history_st, history_md_out,
            ],
        ).then(
            # After the Python callback, trigger TTS in the browser. A manual
            # request is always spoken, even if the text repeats.
            fn=None,
            inputs=[description_out],
            outputs=[description_out],
            js=_TTS_JS,
        )

        # Auto Mode toggle
        auto_btn.click(
            fn=on_toggle_auto,
            inputs=[auto_active],
            outputs=[
                auto_active, timer, auto_btn,
                auto_status, snapshot_in, streaming_in,
            ],
        )

        # Timer tick
        timer.tick(
            fn=on_auto_tick,
            inputs=[streaming_in, prev_desc_st, prev_frame_st, history_st],
            outputs=[
                annotated_out, depth_out,
                description_out, depth_ctx_out, timing_out,
                prev_desc_st, prev_frame_st,
                history_st, history_md_out,
                auto_status,
            ],
        ).then(
            # Speak only descriptions that have not been spoken yet.
            fn=None,
            inputs=[description_out],
            outputs=[description_out],
            js=auto_tts_js,
        )

    return demo
# ── Entry point ──────────────────────────────────────────────────────────────
def main() -> None:
    """Load the pipeline, build the UI, and launch.

    The server binds to 127.0.0.1:7862. A public Gradio tunnel is opened
    only when the ``PROD_SHARE`` environment variable is set to something
    other than an explicit "off" value (``0``, ``false``, ``no``, ``off``).
    """
    import os

    # Eagerly load all models so the first request is fast.
    print("Initialising pipeline (Gemma 4 + Depth + YOLO)…")
    pipeline = _get_pipeline()
    pipeline._get_depth()
    pipeline._get_detector()
    print("All models loaded. Starting UI…")

    # bool() on the raw string would treat "0" or "false" as enabled and
    # expose the app publicly by accident, so honour explicit falsy values.
    share_flag = os.environ.get("PROD_SHARE", "").strip().lower()
    share = share_flag not in {"", "0", "false", "no", "off"}

    demo = build_ui()
    demo.launch(
        server_name="127.0.0.1",
        server_port=7862,   # separate port from the academic app
        share=share,
        favicon_path=None,
        theme=gr.themes.Base(
            primary_hue=gr.themes.colors.violet,
            secondary_hue=gr.themes.colors.slate,
            neutral_hue=gr.themes.colors.slate,
            font=gr.themes.GoogleFont("Inter"),
        ),
        css="""
        .description-box textarea { font-size: 1.15rem !important; line-height: 1.6 !important; }
        .history-box { font-size: 0.95rem; }
        .status-badge { font-size: 0.9rem; color: #aaa; }
        .timing-box textarea { font-size: 0.8rem !important; font-family: monospace; }
        """,
    )


if __name__ == "__main__":
    main()