File size: 13,137 Bytes
5412d82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
"""
Per-stage latency and peak VRAM benchmark.

Measures wall-clock latency and peak GPU memory across Stage 1, Stage 2,
and Stage 3 on a single representative image.  Warmup runs prime CUDA caches
so the recorded runs reflect steady-state inference, not first-call overhead.

The headline target for the abstract: Stage 3 total latency < 700 ms.

Typical usage (library)::

    from pathlib import Path

    from src.evaluation.latency_benchmark import benchmark
    results = benchmark(Path("data/test_images/arkit_41159529_0000.jpg"))

CLI usage::

    python -m src.evaluation.latency_benchmark \\
        --image data/test_images/arkit_41159529_0000.jpg \\
        --output outputs/results/latency.csv
"""

from __future__ import annotations

import argparse
import csv
import time
from pathlib import Path
from typing import Optional

import numpy as np
import torch
from PIL import Image

from ..config import N_LATENCY_RUNS, RESULTS_DIR
from ..pipeline import Pipeline


# ---------------------------------------------------------------------------
# Core benchmark function
# ---------------------------------------------------------------------------

def benchmark(
    image_path: Path,
    n_warmup: int = 2,
    n_runs: int = 10,
    stages: list[int] | None = None,
    force_model: Optional[str] = None,
) -> dict:
    """Measure per-stage latency and peak VRAM on a single image.

    For each requested stage:
      1. Run ``n_warmup`` times (results discarded) so the timed runs are not
         dominated by first-call overhead.
      2. Reset the CUDA peak-memory counter (when CUDA is available).
      3. Run ``n_runs`` times, recording wall-clock time bracketed by
         ``torch.cuda.synchronize()`` calls so the timer covers only
         completed GPU work.
      4. Read ``torch.cuda.max_memory_allocated()`` as peak VRAM.

    The pipeline is constructed once, and every requested stage is executed
    once via ``_preload_models`` before any warmup or timing starts.

    Args:
        image_path:  Path to the image file to benchmark.
        n_warmup:    Number of throwaway runs before recording.  Default 2.
        n_runs:      Number of timed runs to average over; must be >= 1.
                     Default 10.
        stages:      Stage numbers to benchmark (subset of [1, 2, 3]).
                     Defaults to ``[1, 2, 3]``.  Duplicates are ignored.
        force_model: Passed through to ``Pipeline`` unchanged.

    Returns:
        Dict keyed by stage number (int).  Each value is a dict with::

            mean_ms      - mean wall-clock latency in milliseconds
            std_ms       - population standard deviation of latency
            min_ms       - minimum observed latency
            max_ms       - maximum observed latency
            peak_vram_mb - peak CUDA memory allocated during timed runs (MB),
                           0.0 when CUDA is unavailable
            breakdown    - sub-timing means in ms (stage-dependent keys)

        A top-level ``"meta"`` key holds image path, n_warmup, n_runs.

    Raises:
        ValueError: If ``stages`` contains a value outside [1, 2, 3], or if
            ``n_runs`` is smaller than 1.
        FileNotFoundError: If ``image_path`` does not exist.
    """
    if stages is None:
        stages = [1, 2, 3]

    bad = [s for s in stages if s not in (1, 2, 3)]
    if bad:
        raise ValueError(f"stages must be subset of {{1, 2, 3}}, got: {bad}")

    # Fail fast: with no timed runs the statistics below are undefined, and
    # the failure would otherwise surface only after all models were loaded.
    if n_runs < 1:
        raise ValueError(f"n_runs must be >= 1, got: {n_runs}")

    # Benchmarking the same stage twice would only overwrite its own entry.
    stages = list(dict.fromkeys(stages))

    image_path = Path(image_path)
    if not image_path.exists():
        raise FileNotFoundError(f"Image not found: {image_path}")

    # Context manager closes the underlying file handle once decoded.
    with Image.open(image_path) as img:
        frame_rgb = np.array(img.convert("RGB"))

    # Load all models once before any timing starts.
    print(f"Loading pipeline models (force_model={force_model!r})...")
    pipeline = Pipeline(force_model=force_model)
    # Trigger lazy model loading now so first warmup isn't the load call.
    _preload_models(pipeline, stages, frame_rgb)

    # CUDA availability is queried once and reused for every run.
    use_cuda = torch.cuda.is_available()

    results: dict = {
        "meta": {
            "image": str(image_path),
            "n_warmup": n_warmup,
            "n_runs": n_runs,
        }
    }

    for stage in stages:
        run_fn = _stage_runner(pipeline, stage, frame_rgb)
        print(
            f"\nStage {stage}: {n_warmup} warmup + {n_runs} timed runs...",
            flush=True,
        )

        # Warmup: results are intentionally discarded.
        for _ in range(n_warmup):
            run_fn()

        # Timed runs: reset the peak counter so it reflects only this stage.
        if use_cuda:
            torch.cuda.reset_peak_memory_stats()

        wall_times_ms: list[float] = []
        sub_timings: list[dict[str, float]] = []

        for r in range(n_runs):
            # Drain pending GPU work so it is not billed to this run.
            if use_cuda:
                torch.cuda.synchronize()
            t0 = time.perf_counter()

            _, timing = run_fn()

            # Wait for the GPU to finish before stopping the clock.
            if use_cuda:
                torch.cuda.synchronize()
            elapsed_ms = (time.perf_counter() - t0) * 1000.0

            wall_times_ms.append(elapsed_ms)
            sub_timings.append(timing)
            print(f"  run {r + 1:2d}/{n_runs}: {elapsed_ms:7.1f} ms", flush=True)

        peak_vram_mb = (
            torch.cuda.max_memory_allocated() / (1024 ** 2) if use_cuda else 0.0
        )

        arr = np.array(wall_times_ms)
        results[stage] = {
            "mean_ms":      float(np.mean(arr)),
            "std_ms":       float(np.std(arr)),
            "min_ms":       float(np.min(arr)),
            "max_ms":       float(np.max(arr)),
            "peak_vram_mb": peak_vram_mb,
            "breakdown":    _mean_breakdown(sub_timings),
        }

    return results


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _preload_models(
    pipeline: Pipeline,
    stages: list[int],
    frame_rgb: np.ndarray,
) -> None:
    """Run every requested stage once, discarding the results.

    The stages are always visited in the fixed order 1, 2, 3 regardless of
    the order of ``stages``; a stage is executed only when its number is
    present in ``stages``.  The intent is to get any lazily loaded models
    initialised before timing begins.

    Args:
        pipeline:  Object exposing ``run_stage1`` / ``run_stage2`` /
                   ``run_stage3``.
        stages:    Stage numbers that will be benchmarked.
        frame_rgb: Image array handed to each stage call.
    """
    print("  Preloading models (one throwaway run per stage)...", flush=True)
    stage_methods = ((1, "run_stage1"), (2, "run_stage2"), (3, "run_stage3"))
    for stage_id, method_name in stage_methods:
        if stage_id not in stages:
            continue
        # Attribute is resolved only for requested stages.
        getattr(pipeline, method_name)(frame_rgb)
    print("  Models loaded.", flush=True)


def _stage_runner(pipeline: Pipeline, stage: int, frame_rgb: np.ndarray):
    """Build a zero-argument callable that executes one pipeline stage.

    The callable always yields ``(description, timing)``.  ``run_stage1`` is
    unpacked as a two-value result; ``run_stage2`` / ``run_stage3`` are
    unpacked as three values and the middle one is dropped.  Any ``stage``
    other than 1 or 2 selects ``run_stage3``.
    """
    if stage == 1:
        def run_two_value_stage():
            description, timing = pipeline.run_stage1(frame_rgb)
            return description, timing

        return run_two_value_stage

    method_name = "run_stage2" if stage == 2 else "run_stage3"

    def run_three_value_stage():
        # The stage method is looked up at call time, as a direct call would.
        description, _context, timing = getattr(pipeline, method_name)(frame_rgb)
        return description, timing

    return run_three_value_stage


def _mean_breakdown(timings: list[dict[str, float]]) -> dict[str, float]:
    """Average each sub-timing key across all runs, scaled by 1000.

    Keys are collected from every run (not only the first), so a sub-timing
    that is missing from the first run but present in later ones is still
    reported.  Each mean is taken over the runs that actually contain the
    key.  The bookkeeping entries ``"vram_mb"`` and ``"n_detections"`` are
    skipped.

    Args:
        timings: One dict of sub-timings per run.  Values are multiplied by
            1000 to produce milliseconds, so they are presumably seconds -
            confirm against the Pipeline stage methods.

    Returns:
        Mapping ``"<key>_mean_ms"`` -> mean value, ordered by first
        appearance of each key.  Empty when ``timings`` is empty.
    """
    skipped = ("vram_mb", "n_detections")
    # dict.fromkeys de-duplicates while preserving first-seen order.
    keys = dict.fromkeys(
        key for run in timings for key in run if key not in skipped
    )
    result: dict[str, float] = {}
    for key in keys:
        # Non-empty by construction: each key came from at least one run.
        vals = [run[key] * 1000.0 for run in timings if key in run]
        result[f"{key}_mean_ms"] = float(np.mean(vals))
    return result


# ---------------------------------------------------------------------------
# CSV writer
# ---------------------------------------------------------------------------

# Column order of the CSV produced by write_csv.
_CSV_FIELDNAMES = [
    "stage", "mean_ms", "std_ms", "min_ms", "max_ms", "peak_vram_mb",
]


def write_csv(results: dict, output_csv: Path) -> None:
    """Write benchmark results to a CSV file.

    One row is written per stage present in ``results``, always in the order
    1, 2, 3.  Latency columns are rounded to 2 decimals and peak VRAM to 1.
    Missing parent directories are created.  Non-stage keys such as
    ``"meta"`` are ignored.

    Args:
        results:    Return value of :func:`benchmark`.
        output_csv: Destination path; a plain string is accepted as well.
    """
    # Coerce so that callers passing a str do not fail on ``.parent``.
    output_csv = Path(output_csv)
    output_csv.parent.mkdir(parents=True, exist_ok=True)

    rows = [
        {
            "stage":        stage,
            "mean_ms":      round(results[stage]["mean_ms"], 2),
            "std_ms":       round(results[stage]["std_ms"], 2),
            "min_ms":       round(results[stage]["min_ms"], 2),
            "max_ms":       round(results[stage]["max_ms"], 2),
            "peak_vram_mb": round(results[stage]["peak_vram_mb"], 1),
        }
        for stage in (1, 2, 3)
        if stage in results
    ]

    # newline="" lets the csv module control line endings itself.
    with output_csv.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=_CSV_FIELDNAMES)
        writer.writeheader()
        writer.writerows(rows)


# ---------------------------------------------------------------------------
# Pretty-print summary
# ---------------------------------------------------------------------------

def print_summary(results: dict) -> None:
    """Print a formatted per-stage comparison table to stdout.

    The table has one row per stage present in ``results`` (order 1, 2, 3),
    followed by one line per stage with its sub-timing breakdown (when
    non-empty) and, if Stage 3 was benchmarked, a PASS/FAIL verdict against
    the 700 ms target.

    Column widths are derived from the stage labels so that every row lines
    up with the header.

    Args:
        results: Return value of :func:`benchmark`.
    """
    meta = results.get("meta", {})
    n_runs = meta.get("n_runs", "?")
    image = Path(meta.get("image", "?")).name

    stage_labels = {
        1: "Stage 1  (VLM only)",
        2: "Stage 2  (VLM+Depth)",
        3: "Stage 3  (VLM+Depth+YOLO)",
    }
    target_ms = 700.0

    # Size the first column to the longest label; the last column is 11 wide
    # because each cell is rendered as an 8-character number plus " MB".
    label_w = max(len(label) for label in stage_labels.values())
    header = (
        f"  {'Stage':<{label_w}}  {'Mean ms':>9}  {'Std ms':>8}  "
        f"{'Min ms':>8}  {'Max ms':>8}  {'Peak VRAM':>11}"
    )
    sep = "-" * len(header)

    print(f"\n{sep}")
    print(f"  Latency Benchmark  —  {image}  ({n_runs} runs each)")
    print(sep)
    print(header)
    print(sep)

    for stage in (1, 2, 3):
        if stage not in results:
            continue
        r = results[stage]
        tag = ""
        if stage == 3:
            # Only Stage 3 carries the headline latency target.
            tag = (
                f"  ✓ <{target_ms:.0f}ms"
                if r["mean_ms"] < target_ms
                else f"  ✗ ≥{target_ms:.0f}ms"
            )
        print(
            f"  {stage_labels[stage]:<{label_w}}  "
            f"{r['mean_ms']:>9.1f}  "
            f"{r['std_ms']:>8.1f}  "
            f"{r['min_ms']:>8.1f}  "
            f"{r['max_ms']:>8.1f}  "
            f"{r['peak_vram_mb']:>8.0f} MB"
            f"{tag}"
        )

    print(sep)

    # Sub-timing breakdown
    for stage in (1, 2, 3):
        if stage not in results:
            continue
        bd = results[stage].get("breakdown", {})
        if not bd:
            continue
        parts = "  |  ".join(
            f"{k.replace('_mean_ms', '')}: {v:.1f} ms"
            for k, v in sorted(bd.items())
        )
        print(f"  S{stage} breakdown: {parts}")

    print(sep)

    # Abstract target verdict
    if 3 in results:
        mean3 = results[3]["mean_ms"]
        verdict = (
            f"PASS ({mean3:.1f} ms < {target_ms:.0f} ms)"
            if mean3 < target_ms
            else f"FAIL ({mean3:.1f} ms ≥ {target_ms:.0f} ms)"
        )
        print(f"  Abstract target (Stage 3 < {target_ms:.0f} ms):  {verdict}")
        print(sep)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def _int_at_least(minimum: int):
    """Return an argparse ``type=`` converter accepting ints >= ``minimum``."""

    def _convert(text: str) -> int:
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid int value: {text!r}"
            ) from None
        if value < minimum:
            raise argparse.ArgumentTypeError(
                f"must be >= {minimum}, got {value}"
            )
        return value

    return _convert


def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line arguments for the latency benchmark.

    ``--n-runs`` must be at least 1 and ``--n-warmup`` at least 0; invalid
    values are reported as a normal argparse usage error instead of failing
    later inside the benchmark.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``image``, ``output``, ``n_warmup``, ``n_runs``,
        ``stages`` and ``force_model``.
    """
    p = argparse.ArgumentParser(
        description=(
            "Benchmark per-stage latency and peak VRAM. "
            "Headline target: Stage 3 total < 700 ms."
        )
    )
    p.add_argument(
        "--image",
        required=True,
        help="Path to the representative image to benchmark.",
    )
    p.add_argument(
        "--output",
        default=None,
        help=(
            "Destination CSV "
            "(default: outputs/results/latency.csv)."
        ),
    )
    p.add_argument(
        "--n-warmup",
        type=_int_at_least(0),
        default=2,
        dest="n_warmup",
        help="Number of warmup runs before recording (default: 2).",
    )
    p.add_argument(
        "--n-runs",
        # Zero timed runs would leave the latency statistics undefined.
        type=_int_at_least(1),
        default=N_LATENCY_RUNS,
        dest="n_runs",
        help=f"Number of timed runs to average (default: {N_LATENCY_RUNS}).",
    )
    p.add_argument(
        "--stages",
        nargs="+",
        type=int,
        choices=[1, 2, 3],
        default=[1, 2, 3],
        help="Stages to benchmark (default: 1 2 3).",
    )
    p.add_argument(
        "--force-model",
        choices=["moondream", "qwen"],
        default=None,
        dest="force_model",
        help="Override VRAM-based VLM selection.",
    )
    return p.parse_args(argv)


def main(argv: list[str] | None = None) -> None:
    """CLI entry point: parse arguments, benchmark, report, and save a CSV.

    Args:
        argv: Argument list forwarded to ``_parse_args``; ``None`` lets
            argparse fall back to the process arguments.
    """
    cli = _parse_args(argv)

    # An omitted (or empty) --output falls back to the results directory.
    if cli.output:
        destination = Path(cli.output)
    else:
        destination = RESULTS_DIR / "latency.csv"

    options = {
        "image_path": Path(cli.image),
        "n_warmup": cli.n_warmup,
        "n_runs": cli.n_runs,
        # De-duplicate and order the stages given on the command line.
        "stages": sorted(set(cli.stages)),
        "force_model": cli.force_model,
    }
    report = benchmark(**options)

    print_summary(report)
    write_csv(report, destination)
    print(f"\n  CSV written to: {destination}")


if __name__ == "__main__":
    main()