# DepthLens / src / evaluation / latency_benchmark.py
# Rishabh Jain
# Initial upload — depth-aware scene description system
# 5412d82
"""
Per-stage latency and peak VRAM benchmark.

Measures wall-clock latency and peak GPU memory across Stage 1, Stage 2,
and Stage 3 on a single representative image. Warmup runs prime CUDA caches
so the recorded runs reflect steady-state inference, not first-call overhead.

The headline target for the abstract: Stage 3 total latency < 700 ms.

Typical usage (library)::

    from src.evaluation.latency_benchmark import benchmark
    results = benchmark(Path("data/test_images/arkit_41159529_0000.jpg"))

CLI usage::

    python -m src.evaluation.latency_benchmark \\
        --image data/test_images/arkit_41159529_0000.jpg \\
        --output outputs/results/latency.csv
"""
from __future__ import annotations
import argparse
import csv
import time
from pathlib import Path
from typing import Optional
import numpy as np
import torch
from PIL import Image
from ..config import N_LATENCY_RUNS, RESULTS_DIR
from ..pipeline import Pipeline
# ---------------------------------------------------------------------------
# Core benchmark function
# ---------------------------------------------------------------------------
def benchmark(
    image_path: Path,
    n_warmup: int = 2,
    n_runs: int = 10,
    stages: list[int] | None = None,
    force_model: Optional[str] = None,
) -> dict:
    """Measure per-stage latency and peak VRAM on a single image.

    For each requested stage:

    1. Run ``n_warmup`` times (results discarded) to prime CUDA kernel
       caches and avoid measuring JIT / first-call overhead.
    2. Reset peak VRAM counter.
    3. Run ``n_runs`` times, recording wall-clock time bracketed by
       ``torch.cuda.synchronize()`` calls so the timer measures only
       completed GPU work.
    4. Capture ``torch.cuda.max_memory_allocated()`` as peak VRAM.

    The pipeline is constructed once and one throwaway run per requested
    stage is executed before the warmup loop, so lazy model loading is not
    part of any warmup or timed run.

    Args:
        image_path: Path to the image file to benchmark (``str`` is also
            accepted; it is coerced to ``Path``).
        n_warmup: Number of throwaway runs before recording. Default 2.
        n_runs: Number of timed runs to average over. Must be >= 1.
            Default 10.
        stages: List of stage numbers to benchmark (subset of [1, 2, 3]).
            Defaults to ``[1, 2, 3]``.
        force_model: Passed to Pipeline — ``"moondream"`` or ``"qwen"`` to
            override the VRAM-based auto-selection.

    Returns:
        Dict keyed by stage number (int). Each value is a dict with::

            mean_ms       — mean wall-clock latency in milliseconds
            std_ms        — standard deviation of latency (population, ddof=0)
            min_ms        — minimum observed latency
            max_ms        — maximum observed latency
            peak_vram_mb  — peak CUDA memory allocated during timed runs (MB);
                            0.0 when CUDA is unavailable
            breakdown     — sub-timing means in ms (stage-dependent keys)

        A top-level ``"meta"`` key holds image path, n_warmup, n_runs.

    Raises:
        FileNotFoundError: If ``image_path`` does not exist.
        ValueError: If ``stages`` contains a value outside [1, 2, 3], or if
            ``n_runs`` is smaller than 1.
    """
    if stages is None:
        stages = [1, 2, 3]
    bad = [s for s in stages if s not in (1, 2, 3)]
    if bad:
        raise ValueError(f"stages must be subset of {{1, 2, 3}}, got: {bad}")
    # Fail fast: with zero timed runs the statistics below are undefined
    # (np.min on an empty array raises), and that would only surface after
    # the expensive model load and warmup.
    if n_runs < 1:
        raise ValueError(f"n_runs must be >= 1, got: {n_runs}")

    image_path = Path(image_path)
    if not image_path.exists():
        raise FileNotFoundError(f"Image not found: {image_path}")
    # The context manager closes the underlying file handle as soon as the
    # pixels have been copied into the array.
    with Image.open(image_path) as img:
        frame_rgb = np.array(img.convert("RGB"))

    # Load all models once before any timing starts.
    print(f"Loading pipeline models (force_model={force_model!r})...")
    pipeline = Pipeline(force_model=force_model)
    # Trigger lazy model loading now so first warmup isn't the load call.
    _preload_models(pipeline, stages, frame_rgb)

    # CUDA availability does not change during the run; query it once.
    use_cuda = torch.cuda.is_available()

    results: dict = {
        "meta": {
            "image": str(image_path),
            "n_warmup": n_warmup,
            "n_runs": n_runs,
        }
    }

    for stage in stages:
        run_fn = _stage_runner(pipeline, stage, frame_rgb)
        print(
            f"\nStage {stage}: {n_warmup} warmup + {n_runs} timed runs...",
            flush=True,
        )

        # ── Warmup ──────────────────────────────────────────────────────
        for _ in range(n_warmup):
            run_fn()

        # ── Timed runs ──────────────────────────────────────────────────
        if use_cuda:
            # Start the peak counter fresh so warmup allocations of this
            # stage (and earlier stages) are not attributed to timed runs.
            torch.cuda.reset_peak_memory_stats()

        wall_times_ms: list[float] = []
        sub_timings: list[dict[str, float]] = []
        for r in range(n_runs):
            if use_cuda:
                torch.cuda.synchronize()
            t0 = time.perf_counter()
            _, timing = run_fn()
            if use_cuda:
                # Wait for queued GPU work so the timer covers it.
                torch.cuda.synchronize()
            elapsed_ms = (time.perf_counter() - t0) * 1000.0
            wall_times_ms.append(elapsed_ms)
            sub_timings.append(timing)
            print(f" run {r + 1:2d}/{n_runs}: {elapsed_ms:7.1f} ms", flush=True)

        peak_vram_mb = (
            torch.cuda.max_memory_allocated() / (1024 ** 2) if use_cuda else 0.0
        )

        arr = np.array(wall_times_ms)
        results[stage] = {
            "mean_ms": float(np.mean(arr)),
            "std_ms": float(np.std(arr)),
            "min_ms": float(np.min(arr)),
            "max_ms": float(np.max(arr)),
            "peak_vram_mb": peak_vram_mb,
            "breakdown": _mean_breakdown(sub_timings),
        }

    return results
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _preload_models(
    pipeline: Pipeline,
    stages: list[int],
    frame_rgb: np.ndarray,
) -> None:
    """Run each requested stage once so lazy model loading happens up front.

    Stages are always visited in the order 1, 2, 3 regardless of the order
    in ``stages``; the return values of the throwaway runs are discarded.

    Args:
        pipeline: Object exposing ``run_stage1`` / ``run_stage2`` /
            ``run_stage3``.
        stages: Stage numbers that will be benchmarked.
        frame_rgb: Image array handed to every stage call.
    """
    print(" Preloading models (one throwaway run per stage)...", flush=True)
    stage_methods = ((1, "run_stage1"), (2, "run_stage2"), (3, "run_stage3"))
    for stage_no, method_name in stage_methods:
        if stage_no in stages:
            # Attribute is resolved only for requested stages.
            getattr(pipeline, method_name)(frame_rgb)
    print(" Models loaded.", flush=True)
def _stage_runner(pipeline: Pipeline, stage: int, frame_rgb: np.ndarray):
    """Build a zero-argument callable that executes one pipeline stage.

    The callable always returns a ``(description, timing)`` pair. Stage 1
    already yields two values; stages 2 and 3 yield three, of which the
    middle one is dropped here. Any ``stage`` other than 1 or 2 is treated
    as stage 3.
    """
    if stage == 1:
        def _run():
            description, timing = pipeline.run_stage1(frame_rgb)
            return description, timing

        return _run

    # Stages 2 and 3 share the same three-value shape; only the method differs.
    method_name = "run_stage2" if stage == 2 else "run_stage3"

    def _run():
        description, _middle, timing = getattr(pipeline, method_name)(frame_rgb)
        return description, timing

    return _run
def _mean_breakdown(timings: list[dict[str, float]]) -> dict[str, float]:
    """Average each sub-timing key across all runs, scaled by 1000.

    Keys are collected from every run (first-seen order), not only from the
    first one, so a sub-timing that is first reported in a later run is
    still included. Each mean is taken over the runs that contain the key.
    The keys ``"vram_mb"`` and ``"n_detections"`` are skipped.

    Args:
        timings: One sub-timing dict per timed run. Values are multiplied
            by 1000.0 — presumably seconds converted to milliseconds;
            confirm against the pipeline's timing dicts.

    Returns:
        Dict mapping ``"<key>_mean_ms"`` to the mean scaled value; empty
        when ``timings`` is empty.
    """
    if not timings:
        return {}

    skipped = ("vram_mb", "n_detections")
    # dict.fromkeys de-duplicates while preserving first-seen order.
    keys = dict.fromkeys(
        key for run in timings for key in run if key not in skipped
    )

    result: dict[str, float] = {}
    for key in keys:
        vals = [run[key] * 1000.0 for run in timings if key in run]
        result[f"{key}_mean_ms"] = float(np.mean(vals))
    return result
# ---------------------------------------------------------------------------
# CSV writer
# ---------------------------------------------------------------------------
# Column order of the latency CSV; every row written by write_csv has
# exactly these keys.
_CSV_FIELDNAMES = [
    "stage", "mean_ms", "std_ms", "min_ms", "max_ms", "peak_vram_mb",
]


def write_csv(results: dict, output_csv: Path) -> None:
    """Write benchmark results to a CSV file.

    One row is written per stage present in ``results`` (in the order
    1, 2, 3). Latencies are rounded to 2 decimals and peak VRAM to 1. The
    ``"meta"`` and per-stage ``"breakdown"`` entries are not written.
    Missing parent directories are created.

    Args:
        results: Return value of :func:`benchmark`.
        output_csv: Destination path (``Path`` or ``str``).
    """
    # Coerce so callers may pass a plain string, matching how benchmark()
    # treats its image_path argument.
    output_csv = Path(output_csv)
    output_csv.parent.mkdir(parents=True, exist_ok=True)

    rows = [
        {
            "stage": stage,
            "mean_ms": round(results[stage]["mean_ms"], 2),
            "std_ms": round(results[stage]["std_ms"], 2),
            "min_ms": round(results[stage]["min_ms"], 2),
            "max_ms": round(results[stage]["max_ms"], 2),
            "peak_vram_mb": round(results[stage]["peak_vram_mb"], 1),
        }
        for stage in (1, 2, 3)
        if stage in results
    ]

    # newline="" lets the csv module control line endings itself.
    with open(output_csv, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=_CSV_FIELDNAMES)
        writer.writeheader()
        writer.writerows(rows)
# ---------------------------------------------------------------------------
# Pretty-print summary
# ---------------------------------------------------------------------------
def print_summary(results: dict) -> None:
    """Print a formatted three-stage comparison table to stdout.

    Prints one row per stage present in ``results``, then the per-stage
    sub-timing breakdowns (when non-empty), then a PASS/FAIL verdict for
    Stage 3 against the 700 ms target. The label column is sized to the
    longest stage label so all rows stay aligned.

    Args:
        results: Return value of :func:`benchmark`.
    """
    meta = results.get("meta", {})
    n_runs = meta.get("n_runs", "?")
    image = Path(meta.get("image", "?")).name

    stage_labels = {
        1: "Stage 1 (VLM only)",
        2: "Stage 2 (VLM+Depth)",
        3: "Stage 3 (VLM+Depth+YOLO)",
    }
    target_ms = 700.0

    # The longest label is wider than the historical 22-character column;
    # derive the width from the labels so no row gets pushed out of line,
    # and widen the separator by the same amount.
    base_label_w = 22
    label_w = max(base_label_w, *(len(label) for label in stage_labels.values()))
    sep = "-" * (74 + label_w - base_label_w)

    present = [stage for stage in (1, 2, 3) if stage in results]

    print(f"\n{sep}")
    print(f" Latency Benchmark — {image} ({n_runs} runs each)")
    print(sep)
    print(
        f" {'Stage':<{label_w}} {'Mean ms':>9} {'Std ms':>8} "
        f"{'Min ms':>8} {'Max ms':>8} {'Peak VRAM':>10}"
    )
    print(sep)

    for stage in present:
        r = results[stage]
        tag = ""
        if stage == 3:
            # Only Stage 3 carries the headline latency target.
            tag = " ✓ <700ms" if r["mean_ms"] < target_ms else " ✗ >700ms"
        print(
            f" {stage_labels[stage]:<{label_w}} "
            f"{r['mean_ms']:>9.1f} "
            f"{r['std_ms']:>8.1f} "
            f"{r['min_ms']:>8.1f} "
            f"{r['max_ms']:>8.1f} "
            f"{r['peak_vram_mb']:>8.0f} MB"
            f"{tag}"
        )
    print(sep)

    # Sub-timing breakdown
    for stage in present:
        bd = results[stage].get("breakdown", {})
        if not bd:
            continue
        parts = " | ".join(
            f"{k.replace('_mean_ms', '')}: {v:.1f} ms"
            for k, v in sorted(bd.items())
        )
        print(f" S{stage} breakdown: {parts}")
    print(sep)

    # Abstract target verdict
    if 3 in results:
        mean3 = results[3]["mean_ms"]
        verdict = (
            f"PASS ({mean3:.1f} ms < 700 ms)"
            if mean3 < target_ms
            else f"FAIL ({mean3:.1f} ms ≥ 700 ms)"
        )
        print(f" Abstract target (Stage 3 < 700 ms): {verdict}")
        print(sep)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the benchmark CLI options.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``image``, ``output``, ``n_warmup``, ``n_runs``,
        ``stages`` and ``force_model``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Benchmark per-stage latency and peak VRAM. "
            "Headline target: Stage 3 total < 700 ms."
        )
    )

    # (flags, keyword options) in the order they appear in --help.
    option_specs = [
        (
            ("--image",),
            {
                "required": True,
                "help": "Path to the representative image to benchmark.",
            },
        ),
        (
            ("--output",),
            {
                "default": None,
                "help": "Destination CSV (default: outputs/results/latency.csv).",
            },
        ),
        (
            ("--n-warmup",),
            {
                "type": int,
                "default": 2,
                "dest": "n_warmup",
                "help": "Number of warmup runs before recording (default: 2).",
            },
        ),
        (
            ("--n-runs",),
            {
                "type": int,
                "default": N_LATENCY_RUNS,
                "dest": "n_runs",
                "help": f"Number of timed runs to average (default: {N_LATENCY_RUNS}).",
            },
        ),
        (
            ("--stages",),
            {
                "nargs": "+",
                "type": int,
                "choices": [1, 2, 3],
                "default": [1, 2, 3],
                "help": "Stages to benchmark (default: 1 2 3).",
            },
        ),
        (
            ("--force-model",),
            {
                "choices": ["moondream", "qwen"],
                "default": None,
                "dest": "force_model",
                "help": "Override VRAM-based VLM selection.",
            },
        ),
    ]
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)

    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> None:
    """CLI entry point: benchmark, print the summary table, write the CSV.

    Args:
        argv: Argument list forwarded to the parser; ``None`` means the
            process command line.
    """
    cli = _parse_args(argv)

    # Fall back to the project results directory when --output is omitted.
    if cli.output:
        output_csv = Path(cli.output)
    else:
        output_csv = RESULTS_DIR / "latency.csv"

    # De-duplicate and order the stages so repeated CLI values run once.
    requested_stages = sorted(set(cli.stages))

    results = benchmark(
        image_path=Path(cli.image),
        n_warmup=cli.n_warmup,
        n_runs=cli.n_runs,
        stages=requested_stages,
        force_model=cli.force_model,
    )

    print_summary(results)
    write_csv(results, output_csv)
    print(f"\n CSV written to: {output_csv}")
# Run the CLI only when this file is executed as a script / with ``-m``,
# not when it is imported as a library module.
if __name__ == "__main__":
    main()