Spaces:
Sleeping
Sleeping
| """ | |
| Pipeline orchestrator for incremental stage evaluation. | |
| Stages: | |
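| Stage 1 — VLM only (baseline): raw image → description | |
| Stage 2 — VLM + Depth Anything V2 (core contribution): | |
| depth context (scene layout) prepended to VLM query → richer description | |
| Stage 3 — VLM + Depth Anything V2 + YOLOv8n (enhanced): | |
| per-object measurements added to the depth context → grounded description | |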
| Models are lazily loaded on first use and reused across calls so the | |
| cost is paid once per session, not per image. | |
| """ | |
| import time | |
| from typing import Optional | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from .config import ASSISTIVE_PROMPT | |
| from .depth_context import build_depth_context | |
| from .models.depth import DepthEstimator | |
| from .models.detector import ObjectDetector | |
| from .models.vlm import VLM | |
| # --------------------------------------------------------------------------- | |
| # Timing helpers | |
| # --------------------------------------------------------------------------- | |
| def _sync() -> None: | |
| """Flush all pending CUDA kernels so wall-clock timestamps are accurate.""" | |
| if torch.cuda.is_available(): | |
| torch.cuda.synchronize() | |
| def _vram_mb() -> float: | |
| """Return currently allocated CUDA VRAM in megabytes.""" | |
| if torch.cuda.is_available(): | |
| return float(torch.cuda.memory_allocated() / 1024 ** 2) | |
| return 0.0 | |
| # --------------------------------------------------------------------------- | |
| # Pipeline class | |
| # --------------------------------------------------------------------------- | |
class Pipeline:
    """Orchestrates the incremental depth-aware scene description stages.

    Models are created lazily on first use and cached on the instance:

    - Stage 1 uses the VLM only.
    - Stage 2 additionally uses ``DepthEstimator`` (VLM reused if already loaded).
    - Stage 3 additionally uses ``ObjectDetector`` (VLM and depth estimator
      reused if already loaded).

    Only ``run_stage3`` refreshes the ``last_*`` attributes; ``run_stage1``
    and ``run_stage2`` leave them untouched.

    Example::

        pipe = Pipeline()
        description, timing = pipe.run_stage1(frame_rgb)
        description, context, timing = pipe.run_stage2(frame_rgb)
        description, context, timing = pipe.run_stage3(frame_rgb)
    """

    def __init__(self, force_model: Optional[str] = None) -> None:
        """Create a Pipeline with no models loaded yet.

        Args:
            force_model: Override VLM selection. Pass ``"moondream"``,
                ``"qwen"``, or ``"gemma4"``. ``"gemma4"`` requires the
                production environment (transformers >= 5.5.0).
        """
        self._vlm = None  # VLM | Gemma4VLM
        self._depth: Optional[DepthEstimator] = None
        self._detector: Optional[ObjectDetector] = None
        self._force_model = force_model

        # Cache last inference intermediates for the UI AR overlay.
        # Written by run_stage3 only.
        self.last_depth_np: Optional[np.ndarray] = None
        self.last_boxes: Optional[np.ndarray] = None
        self.last_classes: list[str] = []
        self.last_confs: list[float] = []

    # ── Lazy accessors ────────────────────────────────────────────────────────

    def _get_vlm(self):
        """Return the cached VLM, loading it on first call.

        ``force_model == "gemma4"`` selects ``Gemma4VLM``, imported here so
        the module is only required when that model is requested; any other
        value is forwarded to ``VLM(force_model=...)``.
        """
        if self._vlm is None:
            if self._force_model == "gemma4":
                from .models.gemma4 import Gemma4VLM

                self._vlm = Gemma4VLM()
            else:
                self._vlm = VLM(force_model=self._force_model)
        return self._vlm

    def _get_depth(self) -> "DepthEstimator":
        """Return the cached ``DepthEstimator``, creating it on first call."""
        if self._depth is None:
            self._depth = DepthEstimator()
        return self._depth

    def _get_detector(self) -> "ObjectDetector":
        """Return the cached ``ObjectDetector``, creating it on first call."""
        if self._detector is None:
            self._detector = ObjectDetector()
        return self._detector

    # ── Timing helper ─────────────────────────────────────────────────────────

    @staticmethod
    def _timed(fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` and return ``(result, elapsed_seconds)``.

        ``_sync()`` runs immediately before the timer starts and again before
        it stops, so pending CUDA work is not attributed to the wrong section.
        """
        _sync()
        started = time.perf_counter()
        result = fn(*args, **kwargs)
        _sync()
        return result, time.perf_counter() - started

    # ── Public stage functions ────────────────────────────────────────────────

    def run_stage1(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, dict[str, float]]:
        """Stage 1 — VLM only (baseline).

        Passes the raw image directly to the VLM with the standard assistive
        prompt. No depth information is used. This is the reference output
        that Stage 2 is measured against via BERTScore.

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response.
            timing: Dict with keys:
                ``vlm_s`` — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the stage, measured from
                    after model lookup and PIL conversion.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
        """
        vlm = self._get_vlm()
        image = Image.fromarray(frame_rgb)

        _sync()
        t_start = time.perf_counter()

        description, vlm_s = self._timed(vlm.query_vlm, image, ASSISTIVE_PROMPT)

        total_s = time.perf_counter() - t_start
        timing: dict[str, float] = {
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
        }
        return description, timing

    def run_stage2(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, str, dict[str, float]]:
        """Stage 2 — VLM + Depth Anything V2 (core contribution).

        Estimates a depth map, builds a scene-layout preamble (no object
        detector yet — per-object measurements are added in Stage 3), and
        prepends the preamble to the VLM query so the model has explicit
        spatial context.

        The preamble follows this format::

            You have access to 3D scene geometry from a depth sensor.
            Measurements:
            - Scene layout: foreground (32%), midground (45%), background (23%)
            Question: Describe this scene for a visually impaired person...

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response with spatial awareness.
            depth_context: The structured depth preamble that was injected,
                useful for logging and evaluation.
            timing: Dict with keys:
                ``depth_s`` — seconds spent in depth estimation.
                ``vlm_s`` — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the stage, measured from
                    after model lookup and PIL conversion.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
        """
        vlm = self._get_vlm()
        depth_est = self._get_depth()
        image = Image.fromarray(frame_rgb)

        _sync()
        t_start = time.perf_counter()

        # ── Depth estimation ──────────────────────────────────────────────────
        depth_np, depth_s = self._timed(depth_est.estimate_depth, frame_rgb)

        # ── Build depth context preamble ──────────────────────────────────────
        # No detector at Stage 2 — pass empty detections so build_depth_context
        # emits only the scene layout line. Per-object lines are added in
        # Stage 3 once YOLOv8n is introduced.
        depth_context = build_depth_context(
            frame_rgb=frame_rgb,
            depth_np=depth_np,
            boxes=[],
            classes=[],
            confidences=[],
        )

        # ── VLM query with depth preamble prepended ───────────────────────────
        full_question = f"{depth_context}\n\nQuestion: {ASSISTIVE_PROMPT}"
        description, vlm_s = self._timed(vlm.query_vlm, image, full_question)

        total_s = time.perf_counter() - t_start
        timing: dict[str, float] = {
            "depth_s": depth_s,
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
        }
        return description, depth_context, timing

    def run_stage3(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, str, dict[str, float]]:
        """Stage 3 — VLM + Depth Anything V2 + YOLOv8n (enhanced).

        Adds precise per-object spatial measurements to the depth context by
        grounding each YOLO detection in the depth map. The VLM receives both
        the image and a preamble that names every detected object with its
        distance, physical size, and horizontal position.

        The depth map and detections are also stored on ``last_depth_np``,
        ``last_boxes``, ``last_classes`` and ``last_confs`` so the UI can
        build AR overlays without re-running the models.

        The preamble follows this format::

            You have access to 3D scene geometry from a depth sensor.
            Measurements:
            - Object 1: cup (confidence 92%), depth ~35 cm, size ~8x10 cm, centre
            - Object 2: laptop (confidence 87%), depth ~65 cm, size ~35x25 cm, right
            - Scene layout: foreground (32%), midground (45%), background (23%)
            Question: Describe this scene for a visually impaired person...

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response with per-object spatial context.
            depth_context: The structured depth preamble that was injected,
                useful for logging and evaluation.
            timing: Dict with keys:
                ``depth_s`` — seconds spent in depth estimation.
                ``yolo_s`` — seconds spent in YOLO detection.
                ``vlm_s`` — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the stage, measured from
                    after model lookup and PIL conversion.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
                ``n_detections`` — number of objects detected by YOLO
                    (``len(classes)`` stored as a float).
        """
        vlm = self._get_vlm()
        depth_est = self._get_depth()
        detector = self._get_detector()
        image = Image.fromarray(frame_rgb)

        _sync()
        t_start = time.perf_counter()

        # ── Depth estimation ──────────────────────────────────────────────────
        depth_np, depth_s = self._timed(depth_est.estimate_depth, frame_rgb)

        # ── YOLO detection ────────────────────────────────────────────────────
        (boxes, classes, confidences), yolo_s = self._timed(
            detector.detect, frame_rgb
        )

        # Cache intermediates so the UI can build AR overlays without re-running
        self.last_depth_np = depth_np
        self.last_boxes = boxes
        self.last_classes = list(classes)
        self.last_confs = list(confidences)

        # ── Build depth context with per-object measurements ──────────────────
        depth_context = build_depth_context(
            frame_rgb=frame_rgb,
            depth_np=depth_np,
            boxes=boxes,
            classes=classes,
            confidences=confidences,
        )

        # ── VLM query with depth preamble prepended ───────────────────────────
        full_question = f"{depth_context}\n\nQuestion: {ASSISTIVE_PROMPT}"
        description, vlm_s = self._timed(vlm.query_vlm, image, full_question)

        total_s = time.perf_counter() - t_start
        timing: dict[str, float] = {
            "depth_s": depth_s,
            "yolo_s": yolo_s,
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
            "n_detections": float(len(classes)),
        }
        return description, depth_context, timing