"""Pipeline orchestrator for incremental stage evaluation.

Stages:
    Stage 1 — VLM only (baseline):
        raw image → description
    Stage 2 — VLM + Depth Anything V2 (core contribution):
        depth context (scene layout) prepended to VLM query → richer
        description
    Stage 3 — VLM + Depth Anything V2 + YOLOv8n (enhanced):
        per-object measurements added to the depth context before the
        VLM query

Models are lazily loaded on first use and reused across calls so the cost
is paid once per session, not per image.
"""

import time
from typing import Optional

import numpy as np
import torch
from PIL import Image

from .config import ASSISTIVE_PROMPT
from .depth_context import build_depth_context
from .models.depth import DepthEstimator
from .models.detector import ObjectDetector
from .models.vlm import VLM


# ---------------------------------------------------------------------------
# Timing helpers
# ---------------------------------------------------------------------------

def _sync() -> None:
    """Wait for queued CUDA work so wall-clock timestamps are accurate.

    Does nothing when CUDA is unavailable.
    """
    if not torch.cuda.is_available():
        return
    torch.cuda.synchronize()


def _vram_mb() -> float:
    """Return the currently allocated CUDA memory in megabytes.

    Returns ``0.0`` when CUDA is unavailable.
    """
    if not torch.cuda.is_available():
        return 0.0
    allocated_bytes = torch.cuda.memory_allocated()
    return float(allocated_bytes / 1024 ** 2)


def _timed(fn, *args, **kwargs) -> tuple:
    """Call ``fn(*args, **kwargs)`` and measure its synchronized duration.

    ``_sync()`` runs immediately before the clock starts and again right
    after ``fn`` returns, so pending GPU work is not attributed to the
    wrong step.

    Returns:
        ``(result, elapsed_s)`` — the value returned by ``fn`` and the
        elapsed ``time.perf_counter()`` seconds.
    """
    _sync()
    began = time.perf_counter()
    result = fn(*args, **kwargs)
    _sync()
    return result, time.perf_counter() - began


# ---------------------------------------------------------------------------
# Pipeline class
# ---------------------------------------------------------------------------

class Pipeline:
    """Orchestrates the depth-aware scene description pipeline.

    Each model is created the first time a stage needs it and is then kept
    on the instance:

    - Stage 1 loads the VLM only.
    - Stage 2 additionally loads DepthEstimator (VLM reused if already
      loaded).
    - Stage 3 additionally loads ObjectDetector.

    Example::

        pipe = Pipeline()
        description, timing = pipe.run_stage1(frame_rgb)
        description, context, timing = pipe.run_stage2(frame_rgb)
        description, context, timing = pipe.run_stage3(frame_rgb)
    """

    def __init__(self, force_model: Optional[str] = None) -> None:
        """Create a Pipeline with no models loaded yet.

        Args:
            force_model: Override VLM selection. Pass ``"moondream"``,
                ``"qwen"``, or ``"gemma4"``. ``"gemma4"`` requires the
                production environment (transformers >= 5.5.0).
        """
        self._vlm = None  # VLM | Gemma4VLM
        self._depth: Optional[DepthEstimator] = None
        self._detector: Optional[ObjectDetector] = None
        self._force_model = force_model

        # Intermediates of the most recent Stage 3 run, kept for the UI AR
        # overlay.
        self.last_depth_np: Optional[np.ndarray] = None
        self.last_boxes: Optional[np.ndarray] = None
        self.last_classes: list[str] = []
        self.last_confs: list[float] = []

    # ── Lazy accessors ───────────────────────────────────────────────────

    def _get_vlm(self):
        """Return the cached VLM, constructing it on the first call.

        ``force_model == "gemma4"`` selects ``Gemma4VLM``, imported only at
        that point; any other value is forwarded to ``VLM``.
        """
        if self._vlm is not None:
            return self._vlm

        if self._force_model == "gemma4":
            from .models.gemma4 import Gemma4VLM

            self._vlm = Gemma4VLM()
        else:
            self._vlm = VLM(force_model=self._force_model)
        return self._vlm

    def _get_depth(self) -> DepthEstimator:
        """Return the cached DepthEstimator, constructing it on first call."""
        if self._depth is not None:
            return self._depth
        self._depth = DepthEstimator()
        return self._depth

    def _get_detector(self) -> ObjectDetector:
        """Return the cached ObjectDetector, constructing it on first call."""
        if self._detector is not None:
            return self._detector
        self._detector = ObjectDetector()
        return self._detector

    # ── Public stage functions ───────────────────────────────────────────

    def run_stage1(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, dict[str, float]]:
        """Stage 1 — VLM only (baseline).

        Passes the raw image directly to the VLM with the standard
        assistive prompt. No depth information is used. This is the
        reference output that Stage 2 is measured against via BERTScore.

        Model loading and the numpy → PIL conversion happen before the
        stage clock starts, so they are not part of ``total_s``.

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response.
            timing: Dict with keys:
                ``vlm_s``   — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the full stage.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
        """
        model = self._get_vlm()
        pil_image = Image.fromarray(frame_rgb)

        _sync()
        stage_began = time.perf_counter()

        description, vlm_s = _timed(
            model.query_vlm, pil_image, ASSISTIVE_PROMPT
        )
        total_s = time.perf_counter() - stage_began

        timing: dict[str, float] = {
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
        }
        return description, timing

    def run_stage2(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, str, dict[str, float]]:
        """Stage 2 — VLM + Depth Anything V2 (core contribution).

        Estimates a depth map, builds a scene-layout preamble (no object
        detector yet — per-object measurements are added in Stage 3), and
        prepends the preamble to the VLM query so the model has explicit
        spatial context.

        The preamble follows this format::

            You have access to 3D scene geometry from a depth sensor.
            Measurements:
            - Scene layout: foreground (32%), midground (45%), background (23%)

            Question: Describe this scene for a visually impaired person...

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response with spatial awareness.
            depth_context: The structured depth preamble that was injected,
                useful for logging and evaluation.
            timing: Dict with keys:
                ``depth_s`` — seconds spent in depth estimation.
                ``vlm_s``   — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the full stage.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
        """
        model = self._get_vlm()
        depth_model = self._get_depth()
        pil_image = Image.fromarray(frame_rgb)

        _sync()
        stage_began = time.perf_counter()

        # ── Depth estimation ─────────────────────────────────────────────
        depth_np, depth_s = _timed(depth_model.estimate_depth, frame_rgb)

        # ── Build depth context preamble ─────────────────────────────────
        # No detector at Stage 2 — pass empty detections so
        # build_depth_context emits only the scene layout line. Per-object
        # lines are added in Stage 3 once YOLOv8n is introduced.
        depth_context = build_depth_context(
            frame_rgb=frame_rgb,
            depth_np=depth_np,
            boxes=[],
            classes=[],
            confidences=[],
        )

        # ── VLM query with depth preamble prepended ──────────────────────
        full_question = f"{depth_context}\n\nQuestion: {ASSISTIVE_PROMPT}"
        description, vlm_s = _timed(
            model.query_vlm, pil_image, full_question
        )
        total_s = time.perf_counter() - stage_began

        timing: dict[str, float] = {
            "depth_s": depth_s,
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
        }
        return description, depth_context, timing

    def run_stage3(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, str, dict[str, float]]:
        """Stage 3 — VLM + Depth Anything V2 + YOLOv8n (enhanced).

        Adds precise per-object spatial measurements to the depth context
        by grounding each YOLO detection in the depth map. The VLM receives
        both the image and a preamble that names every detected object with
        its distance, physical size, and horizontal position.

        The depth map and detections of this run are also stored on the
        instance (``last_depth_np``, ``last_boxes``, ``last_classes``,
        ``last_confs``) so the UI can build AR overlays without re-running
        the models.

        The preamble follows this format::

            You have access to 3D scene geometry from a depth sensor.
            Measurements:
            - Object 1: cup (confidence 92%), depth ~35 cm, size ~8x10 cm, centre
            - Object 2: laptop (confidence 87%), depth ~65 cm, size ~35x25 cm, right
            - Scene layout: foreground (32%), midground (45%), background (23%)

            Question: Describe this scene for a visually impaired person...

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response with per-object spatial context.
            depth_context: The structured depth preamble that was injected,
                useful for logging and evaluation.
            timing: Dict with keys:
                ``depth_s``      — seconds spent in depth estimation.
                ``yolo_s``       — seconds spent in YOLO detection.
                ``vlm_s``        — seconds spent in VLM inference.
                ``total_s``      — wall-clock seconds for the full stage.
                ``vram_mb``      — CUDA memory allocated at end of stage (MB).
                ``n_detections`` — number of objects detected by YOLO.
        """
        model = self._get_vlm()
        depth_model = self._get_depth()
        yolo = self._get_detector()
        pil_image = Image.fromarray(frame_rgb)

        _sync()
        stage_began = time.perf_counter()

        # ── Depth estimation ─────────────────────────────────────────────
        depth_np, depth_s = _timed(depth_model.estimate_depth, frame_rgb)

        # ── YOLO detection ───────────────────────────────────────────────
        (boxes, classes, confidences), yolo_s = _timed(
            yolo.detect, frame_rgb
        )

        # Cache intermediates so the UI can build AR overlays without
        # re-running. Done before the context is built, matching the order
        # in which the results become available.
        self.last_depth_np = depth_np
        self.last_boxes = boxes
        self.last_classes = list(classes)
        self.last_confs = list(confidences)

        # ── Build depth context with per-object measurements ─────────────
        depth_context = build_depth_context(
            frame_rgb=frame_rgb,
            depth_np=depth_np,
            boxes=boxes,
            classes=classes,
            confidences=confidences,
        )

        # ── VLM query with depth preamble prepended ──────────────────────
        full_question = f"{depth_context}\n\nQuestion: {ASSISTIVE_PROMPT}"
        description, vlm_s = _timed(
            model.query_vlm, pil_image, full_question
        )
        total_s = time.perf_counter() - stage_began

        timing: dict[str, float] = {
            "depth_s": depth_s,
            "yolo_s": yolo_s,
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
            # Stored as float so every value in the dict has one type.
            "n_detections": float(len(classes)),
        }
        return description, depth_context, timing