# DepthLens / src / pipeline.py
# Rishabh Jain
# Prepare Space for ZeroGPU deployment
# 842d4e9
"""
Pipeline orchestrator for incremental stage evaluation.
Stages:
Stage 1 β€” VLM only (baseline): raw image β†’ description
Stage 2 β€” VLM + Depth Anything V2 (core contribution):
depth context (scene layout) prepended to VLM query β†’ richer description
Models are lazily loaded on first use and reused across calls so the
cost is paid once per session, not per image.
"""
import time
from typing import Optional
import numpy as np
import torch
from PIL import Image
from .config import ASSISTIVE_PROMPT
from .depth_context import build_depth_context
from .models.depth import DepthEstimator
from .models.detector import ObjectDetector
from .models.vlm import VLM
# ---------------------------------------------------------------------------
# Timing helpers
# ---------------------------------------------------------------------------
def _sync() -> None:
"""Flush all pending CUDA kernels so wall-clock timestamps are accurate."""
if torch.cuda.is_available():
torch.cuda.synchronize()
def _vram_mb() -> float:
"""Return currently allocated CUDA VRAM in megabytes."""
if torch.cuda.is_available():
return float(torch.cuda.memory_allocated() / 1024 ** 2)
return 0.0
# ---------------------------------------------------------------------------
# Pipeline class
# ---------------------------------------------------------------------------
class Pipeline:
    """Orchestrates the depth-aware scene description pipeline.

    Models are loaded lazily and cached on the instance:

    - Stage 1 loads the VLM only.
    - Stage 2 additionally loads DepthEstimator (VLM reused if already loaded).
    - Stage 3 additionally loads ObjectDetector.

    Only ``run_stage3`` writes the ``last_*`` attributes. After a Stage 1 or
    Stage 2 call they still hold whatever the most recent Stage 3 call stored
    (or their initial empty values if Stage 3 has never run).

    Example::

        pipe = Pipeline()
        description, timing = pipe.run_stage1(frame_rgb)
        description, context, timing = pipe.run_stage2(frame_rgb)
        description, context, timing = pipe.run_stage3(frame_rgb)
    """

    def __init__(self, force_model: Optional[str] = None) -> None:
        """Create a Pipeline with no models loaded yet.

        Args:
            force_model: Override VLM selection. Pass ``"moondream"``,
                ``"qwen"``, or ``"gemma4"``. ``"gemma4"`` requires the
                production environment (transformers >= 5.5.0).
        """
        self._vlm = None  # VLM | Gemma4VLM
        self._depth: Optional[DepthEstimator] = None
        self._detector: Optional[ObjectDetector] = None
        self._force_model = force_model

        # Cache last inference intermediates for the UI AR overlay
        self.last_depth_np: Optional[np.ndarray] = None
        self.last_boxes: Optional[np.ndarray] = None
        self.last_classes: list[str] = []
        self.last_confs: list[float] = []

    # ── Lazy accessors ────────────────────────────────────────────────────────

    def _get_vlm(self):
        """Return the cached VLM, loading it on first call.

        ``force_model == "gemma4"`` selects ``Gemma4VLM`` (imported here, on
        demand); any other value is forwarded to ``VLM(force_model=...)``.
        """
        if self._vlm is None:
            if self._force_model == "gemma4":
                from .models.gemma4 import Gemma4VLM

                self._vlm = Gemma4VLM()
            else:
                self._vlm = VLM(force_model=self._force_model)
        return self._vlm

    def _get_depth(self) -> "DepthEstimator":
        """Return the cached DepthEstimator, constructing it on first call."""
        if self._depth is None:
            self._depth = DepthEstimator()
        return self._depth

    def _get_detector(self) -> "ObjectDetector":
        """Return the cached ObjectDetector, constructing it on first call."""
        if self._detector is None:
            self._detector = ObjectDetector()
        return self._detector

    # ── Timing helper ─────────────────────────────────────────────────────────

    @staticmethod
    def _timed(fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` and measure how long it takes.

        ``_sync()`` runs immediately before the timer starts and again right
        after ``fn`` returns, before the elapsed time is read.

        Returns:
            A ``(result, elapsed_s)`` pair: whatever ``fn`` returned and the
            ``time.perf_counter`` difference in seconds.
        """
        _sync()
        started = time.perf_counter()
        result = fn(*args, **kwargs)
        _sync()
        return result, time.perf_counter() - started

    # ── Public stage functions ────────────────────────────────────────────────

    def run_stage1(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, dict[str, float]]:
        """Stage 1 — VLM only (baseline).

        Passes the raw image directly to the VLM with the standard assistive
        prompt. No depth information is used. This is the reference output
        that Stage 2 is measured against via BERTScore.

        Model loading and the numpy → PIL conversion happen before the
        ``total_s`` timer starts, so they are not included in the timings.

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response.
            timing: Dict with keys:
                ``vlm_s`` — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the full stage.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
        """
        vlm = self._get_vlm()
        image = Image.fromarray(frame_rgb)

        _sync()
        t_start = time.perf_counter()

        description, vlm_s = self._timed(vlm.query_vlm, image, ASSISTIVE_PROMPT)
        total_s = time.perf_counter() - t_start

        timing: dict[str, float] = {
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
        }
        return description, timing

    def run_stage2(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, str, dict[str, float]]:
        """Stage 2 — VLM + Depth Anything V2 (core contribution).

        Estimates a depth map, builds a scene-layout preamble (no object
        detector yet — per-object measurements are added in Stage 3), and
        prepends the preamble to the VLM query so the model has explicit
        spatial context.

        The preamble follows this format::

            You have access to 3D scene geometry from a depth sensor.
            Measurements:
            - Scene layout: foreground (32%), midground (45%), background (23%)

            Question: Describe this scene for a visually impaired person...

        This method does not update the ``last_*`` cache attributes.

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response with spatial awareness.
            depth_context: The structured depth preamble that was injected,
                useful for logging and evaluation.
            timing: Dict with keys:
                ``depth_s`` — seconds spent in depth estimation.
                ``vlm_s`` — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the full stage.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
        """
        vlm = self._get_vlm()
        depth_est = self._get_depth()
        image = Image.fromarray(frame_rgb)

        _sync()
        t_start = time.perf_counter()

        # ── Depth estimation ──────────────────────────────────────────────────
        depth_np, depth_s = self._timed(depth_est.estimate_depth, frame_rgb)

        # ── Build depth context preamble ──────────────────────────────────────
        # No detector at Stage 2 — pass empty detections so build_depth_context
        # emits only the scene layout line. Per-object lines are added in
        # Stage 3 once YOLOv8n is introduced.
        depth_context = build_depth_context(
            frame_rgb=frame_rgb,
            depth_np=depth_np,
            boxes=[],
            classes=[],
            confidences=[],
        )

        # ── VLM query with depth preamble prepended ───────────────────────────
        full_question = f"{depth_context}\n\nQuestion: {ASSISTIVE_PROMPT}"
        description, vlm_s = self._timed(vlm.query_vlm, image, full_question)
        total_s = time.perf_counter() - t_start

        timing: dict[str, float] = {
            "depth_s": depth_s,
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
        }
        return description, depth_context, timing

    def run_stage3(
        self,
        frame_rgb: np.ndarray,
    ) -> tuple[str, str, dict[str, float]]:
        """Stage 3 — VLM + Depth Anything V2 + YOLOv8n (enhanced).

        Adds precise per-object spatial measurements to the depth context by
        grounding each YOLO detection in the depth map. The VLM receives both
        the image and a preamble that names every detected object with its
        distance, physical size, and horizontal position.

        The preamble follows this format::

            You have access to 3D scene geometry from a depth sensor.
            Measurements:
            - Object 1: cup (confidence 92%), depth ~35 cm, size ~8x10 cm, centre
            - Object 2: laptop (confidence 87%), depth ~65 cm, size ~35x25 cm, right
            - Scene layout: foreground (32%), midground (45%), background (23%)

            Question: Describe this scene for a visually impaired person...

        Side effect: stores the depth map and detections of this frame in
        ``last_depth_np``, ``last_boxes``, ``last_classes`` and ``last_confs``.

        Args:
            frame_rgb: uint8 RGB numpy array of shape (H, W, 3).

        Returns:
            description: VLM text response with per-object spatial context.
            depth_context: The structured depth preamble that was injected,
                useful for logging and evaluation.
            timing: Dict with keys:
                ``depth_s`` — seconds spent in depth estimation.
                ``yolo_s`` — seconds spent in YOLO detection.
                ``vlm_s`` — seconds spent in VLM inference.
                ``total_s`` — wall-clock seconds for the full stage.
                ``vram_mb`` — CUDA memory allocated at end of stage (MB).
                ``n_detections`` — number of objects detected by YOLO.
        """
        vlm = self._get_vlm()
        depth_est = self._get_depth()
        detector = self._get_detector()
        image = Image.fromarray(frame_rgb)

        _sync()
        t_start = time.perf_counter()

        # ── Depth estimation ──────────────────────────────────────────────────
        depth_np, depth_s = self._timed(depth_est.estimate_depth, frame_rgb)

        # ── YOLO detection ────────────────────────────────────────────────────
        (boxes, classes, confidences), yolo_s = self._timed(
            detector.detect, frame_rgb
        )

        # Cache intermediates so the UI can build AR overlays without re-running
        self.last_depth_np = depth_np
        self.last_boxes = boxes
        self.last_classes = list(classes)
        self.last_confs = list(confidences)

        # ── Build depth context with per-object measurements ──────────────────
        depth_context = build_depth_context(
            frame_rgb=frame_rgb,
            depth_np=depth_np,
            boxes=boxes,
            classes=classes,
            confidences=confidences,
        )

        # ── VLM query with depth preamble prepended ───────────────────────────
        full_question = f"{depth_context}\n\nQuestion: {ASSISTIVE_PROMPT}"
        description, vlm_s = self._timed(vlm.query_vlm, image, full_question)
        total_s = time.perf_counter() - t_start

        timing: dict[str, float] = {
            "depth_s": depth_s,
            "yolo_s": yolo_s,
            "vlm_s": vlm_s,
            "total_s": total_s,
            "vram_mb": _vram_mb(),
            # Stored as float so every value in the timing dict has one type.
            "n_detections": float(len(classes)),
        }
        return description, depth_context, timing