""" Per-stage latency and peak VRAM benchmark. Measures wall-clock latency and peak GPU memory across Stage 1, Stage 2, and Stage 3 on a single representative image. Warmup runs prime CUDA caches so the recorded runs reflect steady-state inference, not first-call overhead. The headline target for the abstract: Stage 3 total latency < 700 ms. Typical usage (library):: from src.evaluation.latency_benchmark import benchmark results = benchmark(Path("data/test_images/arkit_41159529_0000.jpg")) CLI usage:: python -m src.evaluation.latency_benchmark \\ --image data/test_images/arkit_41159529_0000.jpg \\ --output outputs/results/latency.csv """ from __future__ import annotations import argparse import csv import time from pathlib import Path from typing import Optional import numpy as np import torch from PIL import Image from ..config import N_LATENCY_RUNS, RESULTS_DIR from ..pipeline import Pipeline # --------------------------------------------------------------------------- # Core benchmark function # --------------------------------------------------------------------------- def benchmark( image_path: Path, n_warmup: int = 2, n_runs: int = 10, stages: list[int] | None = None, force_model: Optional[str] = None, ) -> dict: """Measure per-stage latency and peak VRAM on a single image. For each requested stage: 1. Run ``n_warmup`` times (results discarded) to prime CUDA kernel caches and avoid measuring JIT / first-call overhead. 2. Reset peak VRAM counter. 3. Run ``n_runs`` times, recording wall-clock time bracketed by ``torch.cuda.synchronize()`` calls so the timer measures only completed GPU work. 4. Capture ``torch.cuda.max_memory_allocated()`` as peak VRAM. All models are loaded once before the warmup loop and reused across stages, matching real deployment behaviour. Args: image_path: Path to the image file to benchmark. n_warmup: Number of throwaway runs before recording. Default 2. n_runs: Number of timed runs to average over. Default 10. stages: List of stage numbers to benchmark (subset of [1, 2, 3]). Defaults to ``[1, 2, 3]``. force_model: Passed to Pipeline — ``"moondream"`` or ``"qwen"`` to override the VRAM-based auto-selection. Returns: Dict keyed by stage number (int). Each value is a dict with:: mean_ms — mean wall-clock latency in milliseconds std_ms — standard deviation of latency min_ms — minimum observed latency max_ms — maximum observed latency peak_vram_mb — peak CUDA memory allocated during timed runs (MB) breakdown — sub-timing means in ms (stage-dependent keys) A top-level ``"meta"`` key holds image path, n_warmup, n_runs. Raises: FileNotFoundError: If ``image_path`` does not exist. ValueError: If ``stages`` contains a value outside [1, 2, 3]. """ if stages is None: stages = [1, 2, 3] bad = [s for s in stages if s not in (1, 2, 3)] if bad: raise ValueError(f"stages must be subset of {{1, 2, 3}}, got: {bad}") image_path = Path(image_path) if not image_path.exists(): raise FileNotFoundError(f"Image not found: {image_path}") frame_rgb = np.array(Image.open(image_path).convert("RGB")) # Load all models once before any timing starts. print(f"Loading pipeline models (force_model={force_model!r})...") pipeline = Pipeline(force_model=force_model) # Trigger lazy model loading now so first warmup isn't the load call. _preload_models(pipeline, stages, frame_rgb) results: dict = { "meta": { "image": str(image_path), "n_warmup": n_warmup, "n_runs": n_runs, } } for stage in stages: run_fn = _stage_runner(pipeline, stage, frame_rgb) print( f"\nStage {stage}: {n_warmup} warmup + {n_runs} timed runs...", flush=True, ) # ── Warmup ──────────────────────────────────────────────────────────── for _ in range(n_warmup): run_fn() # ── Timed runs ──────────────────────────────────────────────────────── if torch.cuda.is_available(): torch.cuda.reset_peak_memory_stats() wall_times_ms: list[float] = [] sub_timings: list[dict[str, float]] = [] for r in range(n_runs): if torch.cuda.is_available(): torch.cuda.synchronize() t0 = time.perf_counter() _, timing = run_fn() if torch.cuda.is_available(): torch.cuda.synchronize() elapsed_ms = (time.perf_counter() - t0) * 1000.0 wall_times_ms.append(elapsed_ms) sub_timings.append(timing) print(f" run {r + 1:2d}/{n_runs}: {elapsed_ms:7.1f} ms", flush=True) peak_vram_mb = ( torch.cuda.max_memory_allocated() / (1024 ** 2) if torch.cuda.is_available() else 0.0 ) arr = np.array(wall_times_ms) results[stage] = { "mean_ms": float(np.mean(arr)), "std_ms": float(np.std(arr)), "min_ms": float(np.min(arr)), "max_ms": float(np.max(arr)), "peak_vram_mb": peak_vram_mb, "breakdown": _mean_breakdown(sub_timings), } return results # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _preload_models( pipeline: Pipeline, stages: list[int], frame_rgb: np.ndarray, ) -> None: """Trigger lazy model loading with a single throwaway run per stage.""" print(" Preloading models (one throwaway run per stage)...", flush=True) if 1 in stages: pipeline.run_stage1(frame_rgb) if 2 in stages: pipeline.run_stage2(frame_rgb) if 3 in stages: pipeline.run_stage3(frame_rgb) print(" Models loaded.", flush=True) def _stage_runner(pipeline: Pipeline, stage: int, frame_rgb: np.ndarray): """Return a zero-argument callable that runs the requested stage. Returns ``(description, timing)`` so the caller can inspect sub-timings. Stage 2 and 3 return three values; we normalise to two here. """ if stage == 1: def _run(): desc, t = pipeline.run_stage1(frame_rgb) return desc, t elif stage == 2: def _run(): desc, _ctx, t = pipeline.run_stage2(frame_rgb) return desc, t else: def _run(): desc, _ctx, t = pipeline.run_stage3(frame_rgb) return desc, t return _run def _mean_breakdown(timings: list[dict[str, float]]) -> dict[str, float]: """Average each sub-timing key across all runs, converting to ms.""" if not timings: return {} keys = [k for k in timings[0] if k not in ("vram_mb", "n_detections")] result: dict[str, float] = {} for k in keys: vals = [t[k] * 1000.0 for t in timings if k in t] if vals: result[f"{k}_mean_ms"] = float(np.mean(vals)) return result # --------------------------------------------------------------------------- # CSV writer # --------------------------------------------------------------------------- _CSV_FIELDNAMES = [ "stage", "mean_ms", "std_ms", "min_ms", "max_ms", "peak_vram_mb", ] def write_csv(results: dict, output_csv: Path) -> None: """Write benchmark results to a CSV file. Args: results: Return value of :func:`benchmark`. output_csv: Destination path. """ output_csv.parent.mkdir(parents=True, exist_ok=True) rows = [] for stage in (1, 2, 3): if stage not in results: continue r = results[stage] rows.append({ "stage": stage, "mean_ms": round(r["mean_ms"], 2), "std_ms": round(r["std_ms"], 2), "min_ms": round(r["min_ms"], 2), "max_ms": round(r["max_ms"], 2), "peak_vram_mb": round(r["peak_vram_mb"], 1), }) with open(output_csv, "w", newline="", encoding="utf-8") as fh: writer = csv.DictWriter(fh, fieldnames=_CSV_FIELDNAMES) writer.writeheader() writer.writerows(rows) # --------------------------------------------------------------------------- # Pretty-print summary # --------------------------------------------------------------------------- def print_summary(results: dict) -> None: """Print a formatted three-stage comparison table to stdout. Highlights whether Stage 3 meets the <700 ms abstract target. Args: results: Return value of :func:`benchmark`. """ meta = results.get("meta", {}) n_runs = meta.get("n_runs", "?") image = Path(meta.get("image", "?")).name sep = "-" * 74 print(f"\n{sep}") print(f" Latency Benchmark — {image} ({n_runs} runs each)") print(sep) print( f" {'Stage':<22} {'Mean ms':>9} {'Std ms':>8} " f"{'Min ms':>8} {'Max ms':>8} {'Peak VRAM':>10}" ) print(sep) stage_labels = { 1: "Stage 1 (VLM only)", 2: "Stage 2 (VLM+Depth)", 3: "Stage 3 (VLM+Depth+YOLO)", } target_ms = 700.0 for stage in (1, 2, 3): if stage not in results: continue r = results[stage] tag = "" if stage == 3: tag = " ✓ <700ms" if r["mean_ms"] < target_ms else " ✗ >700ms" print( f" {stage_labels[stage]:<22} " f"{r['mean_ms']:>9.1f} " f"{r['std_ms']:>8.1f} " f"{r['min_ms']:>8.1f} " f"{r['max_ms']:>8.1f} " f"{r['peak_vram_mb']:>8.0f} MB" f"{tag}" ) print(sep) # Sub-timing breakdown for stage in (1, 2, 3): if stage not in results: continue bd = results[stage].get("breakdown", {}) if not bd: continue parts = " | ".join( f"{k.replace('_mean_ms', '')}: {v:.1f} ms" for k, v in sorted(bd.items()) ) print(f" S{stage} breakdown: {parts}") print(sep) # Abstract target verdict if 3 in results: mean3 = results[3]["mean_ms"] verdict = ( f"PASS ({mean3:.1f} ms < 700 ms)" if mean3 < target_ms else f"FAIL ({mean3:.1f} ms ≥ 700 ms)" ) print(f" Abstract target (Stage 3 < 700 ms): {verdict}") print(sep) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: p = argparse.ArgumentParser( description=( "Benchmark per-stage latency and peak VRAM. " "Headline target: Stage 3 total < 700 ms." ) ) p.add_argument( "--image", required=True, help="Path to the representative image to benchmark.", ) p.add_argument( "--output", default=None, help=( "Destination CSV " "(default: outputs/results/latency.csv)." ), ) p.add_argument( "--n-warmup", type=int, default=2, dest="n_warmup", help="Number of warmup runs before recording (default: 2).", ) p.add_argument( "--n-runs", type=int, default=N_LATENCY_RUNS, dest="n_runs", help=f"Number of timed runs to average (default: {N_LATENCY_RUNS}).", ) p.add_argument( "--stages", nargs="+", type=int, choices=[1, 2, 3], default=[1, 2, 3], help="Stages to benchmark (default: 1 2 3).", ) p.add_argument( "--force-model", choices=["moondream", "qwen"], default=None, dest="force_model", help="Override VRAM-based VLM selection.", ) return p.parse_args(argv) def main(argv: list[str] | None = None) -> None: """CLI entry point.""" args = _parse_args(argv) output_csv = Path(args.output) if args.output else RESULTS_DIR / "latency.csv" results = benchmark( image_path=Path(args.image), n_warmup=args.n_warmup, n_runs=args.n_runs, stages=sorted(set(args.stages)), force_model=args.force_model, ) print_summary(results) write_csv(results, output_csv) print(f"\n CSV written to: {output_csv}") if __name__ == "__main__": main()