Spaces:
Sleeping
Sleeping
| """ | |
| Per-stage latency and peak VRAM benchmark. | |
| Measures wall-clock latency and peak GPU memory across Stage 1, Stage 2, | |
| and Stage 3 on a single representative image. Warmup runs prime CUDA caches | |
| so the recorded runs reflect steady-state inference, not first-call overhead. | |
| The headline target for the abstract: Stage 3 total latency < 700 ms. | |
| Typical usage (library):: | |
| from src.evaluation.latency_benchmark import benchmark | |
| results = benchmark(Path("data/test_images/arkit_41159529_0000.jpg")) | |
| CLI usage:: | |
| python -m src.evaluation.latency_benchmark \\ | |
| --image data/test_images/arkit_41159529_0000.jpg \\ | |
| --output outputs/results/latency.csv | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from ..config import N_LATENCY_RUNS, RESULTS_DIR | |
| from ..pipeline import Pipeline | |
| # --------------------------------------------------------------------------- | |
| # Core benchmark function | |
| # --------------------------------------------------------------------------- | |
| def benchmark( | |
| image_path: Path, | |
| n_warmup: int = 2, | |
| n_runs: int = 10, | |
| stages: list[int] | None = None, | |
| force_model: Optional[str] = None, | |
| ) -> dict: | |
| """Measure per-stage latency and peak VRAM on a single image. | |
| For each requested stage: | |
| 1. Run ``n_warmup`` times (results discarded) to prime CUDA kernel | |
| caches and avoid measuring JIT / first-call overhead. | |
| 2. Reset peak VRAM counter. | |
| 3. Run ``n_runs`` times, recording wall-clock time bracketed by | |
| ``torch.cuda.synchronize()`` calls so the timer measures only | |
| completed GPU work. | |
| 4. Capture ``torch.cuda.max_memory_allocated()`` as peak VRAM. | |
| All models are loaded once before the warmup loop and reused across | |
| stages, matching real deployment behaviour. | |
| Args: | |
| image_path: Path to the image file to benchmark. | |
| n_warmup: Number of throwaway runs before recording. Default 2. | |
| n_runs: Number of timed runs to average over. Default 10. | |
| stages: List of stage numbers to benchmark (subset of [1, 2, 3]). | |
| Defaults to ``[1, 2, 3]``. | |
| force_model: Passed to Pipeline — ``"moondream"`` or ``"qwen"`` to | |
| override the VRAM-based auto-selection. | |
| Returns: | |
| Dict keyed by stage number (int). Each value is a dict with:: | |
| mean_ms — mean wall-clock latency in milliseconds | |
| std_ms — standard deviation of latency | |
| min_ms — minimum observed latency | |
| max_ms — maximum observed latency | |
| peak_vram_mb — peak CUDA memory allocated during timed runs (MB) | |
| breakdown — sub-timing means in ms (stage-dependent keys) | |
| A top-level ``"meta"`` key holds image path, n_warmup, n_runs. | |
| Raises: | |
| FileNotFoundError: If ``image_path`` does not exist. | |
| ValueError: If ``stages`` contains a value outside [1, 2, 3]. | |
| """ | |
| if stages is None: | |
| stages = [1, 2, 3] | |
| bad = [s for s in stages if s not in (1, 2, 3)] | |
| if bad: | |
| raise ValueError(f"stages must be subset of {{1, 2, 3}}, got: {bad}") | |
| image_path = Path(image_path) | |
| if not image_path.exists(): | |
| raise FileNotFoundError(f"Image not found: {image_path}") | |
| frame_rgb = np.array(Image.open(image_path).convert("RGB")) | |
| # Load all models once before any timing starts. | |
| print(f"Loading pipeline models (force_model={force_model!r})...") | |
| pipeline = Pipeline(force_model=force_model) | |
| # Trigger lazy model loading now so first warmup isn't the load call. | |
| _preload_models(pipeline, stages, frame_rgb) | |
| results: dict = { | |
| "meta": { | |
| "image": str(image_path), | |
| "n_warmup": n_warmup, | |
| "n_runs": n_runs, | |
| } | |
| } | |
| for stage in stages: | |
| run_fn = _stage_runner(pipeline, stage, frame_rgb) | |
| print( | |
| f"\nStage {stage}: {n_warmup} warmup + {n_runs} timed runs...", | |
| flush=True, | |
| ) | |
| # ── Warmup ──────────────────────────────────────────────────────────── | |
| for _ in range(n_warmup): | |
| run_fn() | |
| # ── Timed runs ──────────────────────────────────────────────────────── | |
| if torch.cuda.is_available(): | |
| torch.cuda.reset_peak_memory_stats() | |
| wall_times_ms: list[float] = [] | |
| sub_timings: list[dict[str, float]] = [] | |
| for r in range(n_runs): | |
| if torch.cuda.is_available(): | |
| torch.cuda.synchronize() | |
| t0 = time.perf_counter() | |
| _, timing = run_fn() | |
| if torch.cuda.is_available(): | |
| torch.cuda.synchronize() | |
| elapsed_ms = (time.perf_counter() - t0) * 1000.0 | |
| wall_times_ms.append(elapsed_ms) | |
| sub_timings.append(timing) | |
| print(f" run {r + 1:2d}/{n_runs}: {elapsed_ms:7.1f} ms", flush=True) | |
| peak_vram_mb = ( | |
| torch.cuda.max_memory_allocated() / (1024 ** 2) | |
| if torch.cuda.is_available() | |
| else 0.0 | |
| ) | |
| arr = np.array(wall_times_ms) | |
| results[stage] = { | |
| "mean_ms": float(np.mean(arr)), | |
| "std_ms": float(np.std(arr)), | |
| "min_ms": float(np.min(arr)), | |
| "max_ms": float(np.max(arr)), | |
| "peak_vram_mb": peak_vram_mb, | |
| "breakdown": _mean_breakdown(sub_timings), | |
| } | |
| return results | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def _preload_models(
    pipeline: Pipeline,
    stages: list[int],
    frame_rgb: np.ndarray,
) -> None:
    """Force lazy model loading by running each requested stage once.

    Stages are visited in the fixed order 1, 2, 3, whatever the order of
    ``stages``. Each stage's output is discarded, and stage numbers outside
    1-3 are ignored. Progress messages go to stdout.
    """
    print(" Preloading models (one throwaway run per stage)...", flush=True)
    for stage_no in (1, 2, 3):
        if stage_no not in stages:
            continue
        # Look the method up only for requested stages, as the original did.
        stage_fn = getattr(pipeline, f"run_stage{stage_no}")
        stage_fn(frame_rgb)
    print(" Models loaded.", flush=True)
def _stage_runner(pipeline: Pipeline, stage: int, frame_rgb: np.ndarray):
    """Return a zero-argument callable that runs one pipeline stage.

    The callable returns ``(description, timing)`` so the caller can inspect
    sub-timings. ``run_stage1`` returns two values. ``run_stage2`` and
    ``run_stage3`` return three, and the middle value is discarded here so
    every stage has the same two-value shape.

    Args:
        pipeline: Object providing ``run_stage1``/``run_stage2``/``run_stage3``.
        stage: Stage number, 1, 2 or 3.
        frame_rgb: Frame passed unchanged to the stage method on every call.

    Raises:
        ValueError: If ``stage`` is not 1, 2 or 3. Previously any other value
            silently fell through to Stage 3.
    """
    if stage == 1:
        def _run():
            desc, timing = pipeline.run_stage1(frame_rgb)
            return desc, timing
    elif stage == 2:
        def _run():
            desc, _ctx, timing = pipeline.run_stage2(frame_rgb)
            return desc, timing
    elif stage == 3:
        def _run():
            desc, _ctx, timing = pipeline.run_stage3(frame_rgb)
            return desc, timing
    else:
        raise ValueError(f"stage must be 1, 2, or 3, got: {stage!r}")
    return _run
def _mean_breakdown(timings: list[dict[str, float]]) -> dict[str, float]:
    """Average each sub-timing key across all runs, converting to ms.

    Keys are collected from every run, in first-seen order, not just from
    the first run. A sub-timing that only shows up in later runs is
    therefore still reported. Each key is averaged over the runs that
    contain it. ``vram_mb`` and ``n_detections`` are skipped, presumably
    because they are not durations.

    Args:
        timings: One sub-timing dict per timed run. Values are multiplied by
            1000, so they are assumed to be seconds (confirm against the
            Pipeline's timing dicts).

    Returns:
        ``{"<key>_mean_ms": mean}`` for every included key; ``{}`` if
        ``timings`` is empty.
    """
    skip = {"vram_mb", "n_detections"}
    # dict.fromkeys gives an ordered union of keys across runs.
    keys = dict.fromkeys(k for t in timings for k in t if k not in skip)
    # Every key came from at least one run, so each value list is non-empty.
    return {
        f"{k}_mean_ms": float(np.mean([t[k] * 1000.0 for t in timings if k in t]))
        for k in keys
    }
| # --------------------------------------------------------------------------- | |
| # CSV writer | |
| # --------------------------------------------------------------------------- | |
| _CSV_FIELDNAMES = [ | |
| "stage", "mean_ms", "std_ms", "min_ms", "max_ms", "peak_vram_mb", | |
| ] | |
| def write_csv(results: dict, output_csv: Path) -> None: | |
| """Write benchmark results to a CSV file. | |
| Args: | |
| results: Return value of :func:`benchmark`. | |
| output_csv: Destination path. | |
| """ | |
| output_csv.parent.mkdir(parents=True, exist_ok=True) | |
| rows = [] | |
| for stage in (1, 2, 3): | |
| if stage not in results: | |
| continue | |
| r = results[stage] | |
| rows.append({ | |
| "stage": stage, | |
| "mean_ms": round(r["mean_ms"], 2), | |
| "std_ms": round(r["std_ms"], 2), | |
| "min_ms": round(r["min_ms"], 2), | |
| "max_ms": round(r["max_ms"], 2), | |
| "peak_vram_mb": round(r["peak_vram_mb"], 1), | |
| }) | |
| with open(output_csv, "w", newline="", encoding="utf-8") as fh: | |
| writer = csv.DictWriter(fh, fieldnames=_CSV_FIELDNAMES) | |
| writer.writeheader() | |
| writer.writerows(rows) | |
| # --------------------------------------------------------------------------- | |
| # Pretty-print summary | |
| # --------------------------------------------------------------------------- | |
def print_summary(results: dict, target_ms: float = 700.0) -> None:
    """Print a formatted per-stage comparison table to stdout.

    Highlights whether Stage 3's mean latency meets the latency target
    (the abstract's < 700 ms by default).

    Args:
        results: Return value of :func:`benchmark`.
        target_ms: Stage 3 mean-latency target in milliseconds. Default 700.
    """
    meta = results.get("meta", {})
    n_runs = meta.get("n_runs", "?")
    image = Path(meta.get("image", "?")).name
    # ':g' renders 700.0 as "700", keeping the default output unchanged.
    target = f"{target_ms:g}"
    stage_labels = {
        1: "Stage 1 (VLM only)",
        2: "Stage 2 (VLM+Depth)",
        3: "Stage 3 (VLM+Depth+YOLO)",
    }
    # Size the label column to the longest label so every row lines up.
    # A fixed width of 22 was narrower than the 24-character Stage 3 label.
    label_w = max(len(label) for label in stage_labels.values())
    present = [s for s in (1, 2, 3) if s in results]
    sep = "-" * 74

    print(f"\n{sep}")
    print(f" Latency Benchmark — {image} ({n_runs} runs each)")
    print(sep)
    print(
        f" {'Stage':<{label_w}} {'Mean ms':>9} {'Std ms':>8} "
        f"{'Min ms':>8} {'Max ms':>8} {'Peak VRAM':>10}"
    )
    print(sep)
    for stage in present:
        r = results[stage]
        tag = ""
        if stage == 3:
            # "≥" because a mean of exactly the target does not meet "<".
            tag = (
                f" ✓ <{target}ms" if r["mean_ms"] < target_ms
                else f" ✗ ≥{target}ms"
            )
        # VRAM cell: 7 digits + " MB" = 10 chars, matching the header column.
        print(
            f" {stage_labels[stage]:<{label_w}} "
            f"{r['mean_ms']:>9.1f} "
            f"{r['std_ms']:>8.1f} "
            f"{r['min_ms']:>8.1f} "
            f"{r['max_ms']:>8.1f} "
            f"{r['peak_vram_mb']:>7.0f} MB"
            f"{tag}"
        )
    print(sep)

    # Sub-timing breakdown
    for stage in present:
        bd = results[stage].get("breakdown", {})
        if not bd:
            continue
        parts = " | ".join(
            f"{k.replace('_mean_ms', '')}: {v:.1f} ms"
            for k, v in sorted(bd.items())
        )
        print(f" S{stage} breakdown: {parts}")
    print(sep)

    # Abstract target verdict
    if 3 in results:
        mean3 = results[3]["mean_ms"]
        verdict = (
            f"PASS ({mean3:.1f} ms < {target} ms)"
            if mean3 < target_ms
            else f"FAIL ({mean3:.1f} ms ≥ {target} ms)"
        )
        print(f" Abstract target (Stage 3 < {target} ms): {verdict}")
        print(sep)
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the benchmark's command-line parser and parse ``argv``.

    ``argv=None`` makes argparse read ``sys.argv[1:]``. Recognised flags:
    ``--image`` (required), ``--output``, ``--n-warmup``, ``--n-runs``,
    ``--stages`` and ``--force-model``. Each flag's help text states its
    default.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark per-stage latency and peak VRAM. "
        "Headline target: Stage 3 total < 700 ms."
    )
    add = parser.add_argument

    add("--image", required=True,
        help="Path to the representative image to benchmark.")
    add("--output", default=None,
        help="Destination CSV (default: outputs/results/latency.csv).")
    add("--n-warmup", dest="n_warmup", type=int, default=2,
        help="Number of warmup runs before recording (default: 2).")
    add("--n-runs", dest="n_runs", type=int, default=N_LATENCY_RUNS,
        help=f"Number of timed runs to average (default: {N_LATENCY_RUNS}).")
    add("--stages", nargs="+", type=int, choices=[1, 2, 3], default=[1, 2, 3],
        help="Stages to benchmark (default: 1 2 3).")
    add("--force-model", dest="force_model", choices=["moondream", "qwen"],
        default=None, help="Override VRAM-based VLM selection.")

    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: benchmark, print the summary, write the CSV.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            means use the real command line.
    """
    args = _parse_args(argv)
    if args.output:
        output_csv = Path(args.output)
    else:
        output_csv = RESULTS_DIR / "latency.csv"
    # sorted(set(...)) collapses repeats such as ``--stages 3 1 3`` to [1, 3].
    requested_stages = sorted(set(args.stages))
    results = benchmark(
        image_path=Path(args.image),
        n_warmup=args.n_warmup,
        n_runs=args.n_runs,
        stages=requested_stages,
        force_model=args.force_model,
    )
    print_summary(results)
    write_csv(results, output_csv)
    print(f"\n CSV written to: {output_csv}")


if __name__ == "__main__":
    main()