#!/usr/bin/env python3
"""Benchmark CPU vs GPU compression time for Kompress (v1/compress backend).

Uses KompressCompressor directly — same code path as POST /v1/compress.
Tests multiple payload sizes, reports mean/min/max per device.

Usage:
    python benchmarks/bench_cpu_vs_gpu.py
    python benchmarks/bench_cpu_vs_gpu.py --runs 20
    python benchmarks/bench_cpu_vs_gpu.py --gpu 1
    python benchmarks/bench_cpu_vs_gpu.py --markdown > BENCHMARK.md
"""

from __future__ import annotations

import argparse
import json
import platform
import statistics
import sys
import time
from dataclasses import dataclass

try:
    import torch
except ImportError:
    # Absence is reported by the explicit `torch is None` check in main().
    torch = None  # type: ignore[assignment]


@dataclass
class BenchResult:
    """Timings and compression ratio for one (device, payload) combination.

    All ``*_ms`` properties are derived from ``runs``; ``mean_ms``,
    ``min_ms``, ``max_ms`` and ``median_ms`` require at least one recorded run.
    """

    device: str  # device / provider label the run was executed with
    payload_label: str  # human-readable payload name, filled in by main()
    n_words: int  # whitespace-separated word count of the payload
    runs: list[float]  # ms per run
    compression_ratio: float  # ratio reported by the last timed run

    @property
    def mean_ms(self) -> float:
        """Arithmetic mean of the timed runs, in milliseconds."""
        return statistics.mean(self.runs)

    @property
    def min_ms(self) -> float:
        """Fastest timed run, in milliseconds."""
        return min(self.runs)

    @property
    def max_ms(self) -> float:
        """Slowest timed run, in milliseconds."""
        return max(self.runs)

    @property
    def stddev_ms(self) -> float:
        """Sample standard deviation of the runs; 0.0 with fewer than two runs."""
        return statistics.stdev(self.runs) if len(self.runs) > 1 else 0.0

    @property
    def median_ms(self) -> float:
        """Median of the timed runs, in milliseconds."""
        return statistics.median(self.runs)


def generate_payload(n_rows: int, seed: int = 42) -> str:
    """Generate a tool-output-style payload with ``n_rows`` entries.

    Each row is derived purely from its index (path family, size, language and
    modified date cycle with different periods), so the payload is fully
    deterministic: the same ``n_rows`` always yields the same string, which is
    what a fair CPU vs GPU comparison on identical input needs.

    Args:
        n_rows: Number of file rows in the payload.
        seed: Accepted for backward compatibility only. No random numbers are
            drawn, so the value does not influence the output, and the global
            ``random`` state is deliberately left untouched.

    Returns:
        The payload with all whitespace runs collapsed to single spaces. The
        final row keeps its trailing comma, so the text is JSON-like rather
        than strictly valid JSON.
    """
    lines = ['{"tool":"search_files","status":"ok","count":%d,"results":[' % n_rows]
    for i in range(n_rows):
        # Rotate through three path families so rows are not one repeated template.
        family = i % 3
        if family == 0:
            path = f'"path":"src/services/payment_{i % 47}_handler.py"'
        elif family == 1:
            path = f'"path":"src/utils/helper_{i % 23}.py"'
        else:
            path = f'"path":"tests/unit/test_{i % 31}.py"'
        size = 500 + (i * 37) % 9000
        lang = '"python"' if i % 2 == 0 else '"typescript"'
        modified = f'"modified":"2024-01-{(i % 28) + 1:02d}"'
        lines.append(
            f' {{"type":"file",{path},"size":{size},"language":{lang},{modified}}},'
        )
    lines.append("]}")
    # Normalise whitespace: every run of whitespace becomes a single space.
    return " ".join(" ".join(lines).split())


def benchmark_device(
    payload: str,
    device: str,
    n_runs: int,
    warmup: int = 2,
) -> BenchResult:
    """Run KompressCompressor on payload with given device, return timings.

    Args:
        payload: Text to compress.
        device: Device string handed to ``KompressConfig(device=...)``.
        n_runs: Number of timed runs.
        warmup: Number of warmup runs (not recorded).

    Returns:
        A BenchResult with an empty ``payload_label`` (the caller sets it) and
        the ``compression_ratio`` reported by the last timed run.
    """
    from headroom.transforms.kompress_compressor import (
        KompressCompressor,
        KompressConfig,
        unload_kompress_model,
    )

    # Clear model cache so each device loads fresh
    unload_kompress_model()
    try:
        compressor = KompressCompressor(config=KompressConfig(device=device))

        # Warmup runs (not recorded)
        for _ in range(warmup):
            compressor.compress(payload)

        # Timed runs
        runs = []
        ratio = 1.0
        for _ in range(n_runs):
            t0 = time.perf_counter()
            result = compressor.compress(payload)
            elapsed = (time.perf_counter() - t0) * 1000
            runs.append(elapsed)
            ratio = result.compression_ratio
    finally:
        # Cleanup even when compression fails, so a broken run does not leave
        # the model cached for the next benchmark.
        unload_kompress_model()

    return BenchResult(
        device=device,
        payload_label="",
        n_words=len(payload.split()),
        runs=runs,
        compression_ratio=ratio,
    )


def benchmark_onnx(
    payload: str,
    providers: list[str],
    n_runs: int,
    warmup: int = 2,
    device_id: int = 0,
) -> BenchResult:
    """Run Kompress-style compression forcing the ONNX INT8 backend.

    Builds the ONNX session directly (bypassing the cached helper) so the
    provider list can be pinned to CPU only or CUDA+CPU without monkey-
    patching the module.

    Args:
        payload: Text to compress.
        providers: ONNX Runtime provider list, e.g. ["CPUExecutionProvider"]
            or ["CUDAExecutionProvider", "CPUExecutionProvider"].
        n_runs: Number of timed runs.
        warmup: Number of warmup runs (not recorded).
        device_id: CUDA device index passed to CUDAExecutionProvider as a
            provider option. Ignored when CUDA is not in ``providers``.

    Returns:
        A BenchResult whose ``device`` is the provider names joined with "+"
        (with the "ExecutionProvider" suffix removed) and whose
        ``compression_ratio`` is the fraction of words kept.

    Raises:
        RuntimeError: If CUDAExecutionProvider was requested but the created
            session did not activate it. Without this check the run would be
            timed on another provider while being reported as a GPU result.
    """
    import onnxruntime as ort
    from huggingface_hub import hf_hub_download
    from transformers import AutoTokenizer

    from headroom.transforms.kompress_compressor import (
        HF_MODEL_ID,
        _OnnxModel,
        _kompress_cache,
        _kompress_lock,
    )
    from headroom.onnx_runtime import create_cpu_session_options

    using_gpu = "CUDAExecutionProvider" in providers
    # Attach the device index to the CUDA provider; other providers stay as names.
    ort_providers = [
        (name, {"device_id": device_id}) if name == "CUDAExecutionProvider" else name
        for name in providers
    ]

    # Build (or rebuild) the cached ONNX session with the requested providers
    with _kompress_lock:
        onnx_path = hf_hub_download(HF_MODEL_ID, "onnx/kompress-int8.onnx")
        sess_options = (
            create_cpu_session_options(ort) if not using_gpu else ort.SessionOptions()
        )
        session = ort.InferenceSession(
            onnx_path, sess_options, providers=ort_providers
        )
        if using_gpu:
            active = session.get_providers()
            if "CUDAExecutionProvider" not in active:
                raise RuntimeError(
                    "CUDAExecutionProvider was requested but ONNX Runtime "
                    f"activated {active} instead"
                )
        model = _OnnxModel(session)
        tokenizer = AutoTokenizer.from_pretrained("answerdotai/ModernBERT-base")
        # Register in the shared cache so the unload helpers release it later.
        _kompress_cache[HF_MODEL_ID] = (model, tokenizer, "onnx")

    # Now compress using the model directly, mimicking KompressCompressor.compress
    chunk_size = 350
    words = payload.split()
    n_words = len(words)

    def _run_once() -> float:
        """Compress the payload once and return the kept-word ratio."""
        kept_ids: set[int] = set()
        for chunk_start in range(0, n_words, chunk_size):
            chunk_words = words[chunk_start : chunk_start + chunk_size]
            encoding = tokenizer(
                chunk_words,
                is_split_into_words=True,
                truncation=True,
                max_length=512,
                padding=True,
                return_tensors="np",
            )
            input_ids = encoding["input_ids"]
            attention_mask = encoding["attention_mask"]
            word_ids = encoding.word_ids(batch_index=0)
            keep_mask = model.get_keep_mask(input_ids, attention_mask)
            mask_list = keep_mask[0]
            for idx, wid in enumerate(word_ids):
                # Positions with no word id (wid is None) are skipped.
                if wid is None:
                    continue
                if bool(mask_list[idx]):
                    # wid is chunk-local; offset it to a payload-wide word index.
                    kept_ids.add(wid + chunk_start)
        return len(kept_ids) / n_words if n_words else 1.0

    # Warmup
    for _ in range(warmup):
        _run_once()

    # Timed runs
    runs = []
    ratio = 1.0
    for _ in range(n_runs):
        t0 = time.perf_counter()
        ratio = _run_once()
        elapsed = (time.perf_counter() - t0) * 1000
        runs.append(elapsed)

    return BenchResult(
        device="+".join(p.replace("ExecutionProvider", "") for p in providers),
        payload_label="",
        n_words=n_words,
        runs=runs,
        compression_ratio=ratio,
    )


def _safe_unload_kompress() -> None:
    """Best-effort unload of cached Kompress model; never raises."""
    try:
        from headroom.transforms.kompress_compressor import unload_kompress_model

        unload_kompress_model()
    except Exception:  # noqa: BLE001  # cleanup must not propagate
        pass


def _release_gpu_resources() -> None:
    """Release GPU resources (model cache + CUDA allocator); never raises.

    Called after each payload's GPU run so a failure on one payload doesn't
    strand allocations for subsequent ones.
    """
    _safe_unload_kompress()
    if torch is not None and torch.cuda.is_available():
        try:
            torch.cuda.empty_cache()
        except RuntimeError as cache_err:
            print(
                f" WARNING: torch.cuda.empty_cache() failed: {cache_err}",
                file=sys.stderr,
            )


def _run_gpu_benchmark(
    payload: str,
    label: str,
    n_runs: int,
    warmup: int,
    gpu_idx: int = 0,
) -> BenchResult:
    """Run GPU benchmark with ONNX CUDA → PyTorch FP32 fallback chain.

    Tries ONNX INT8 on CUDAExecutionProvider first. If that fails for any
    reason (e.g. cuDNN missing on Windows, onnxruntime not installed, or the
    CUDA provider not being activated), falls back to ``benchmark_device`` on
    the "cuda" device. GPU resources are released afterwards in all cases.

    Args:
        payload: Text to compress.
        label: Payload label, used only in error messages.
        n_runs: Number of timed runs.
        warmup: Number of warmup runs (not recorded).
        gpu_idx: CUDA device index for the ONNX CUDA provider. The PyTorch
            fallback uses the plain "cuda" device, i.e. the current CUDA
            device, which main() selects with ``torch.cuda.set_device``.

    Raises:
        RuntimeError: If the fallback path runs out of GPU memory. Other
            errors from the fallback path propagate unchanged.
    """
    try:
        try:
            return benchmark_onnx(
                payload,
                ["CUDAExecutionProvider", "CPUExecutionProvider"],
                n_runs,
                warmup,
                device_id=gpu_idx,
            )
        # Any failure of the optional ONNX path should trigger the fallback,
        # not abort the benchmark; the cause is reported on stderr.
        except Exception as onnx_err:  # noqa: BLE001
            print(
                f" NOTE: ONNX CUDA unavailable ({onnx_err}), falling back to PyTorch FP32",
                file=sys.stderr,
            )
        try:
            return benchmark_device(payload, "cuda", n_runs, warmup)
        except torch.cuda.OutOfMemoryError as oom:
            raise RuntimeError(
                f"GPU benchmark failed for {label} — out of memory: {oom}. "
                f"Try a smaller payload or run `nvidia-smi` to free GPU memory."
            ) from oom
    finally:
        _release_gpu_resources()


def _format_report_row(
    label: str, device_label: str, result: BenchResult, speedup: str
) -> str:
    """Render one markdown table row for a benchmark result."""
    return (
        f"| {label} | {device_label} | {result.n_words:,} | {result.mean_ms:.1f} | "
        f"{result.median_ms:.1f} | {result.min_ms:.1f} | {result.max_ms:.1f} | "
        f"{result.stddev_ms:.1f} | {1 - result.compression_ratio:.1%} | "
        f"{speedup} |"
    )


def print_markdown_report(
    results: list[tuple[BenchResult, BenchResult]],
    env_info: dict[str, str],
) -> str:
    """Generate markdown report from benchmark results.

    Despite the name, nothing is printed: the report is returned as a string.

    Args:
        results: ``(cpu_result, gpu_result)`` pairs, one per payload.
        env_info: Environment description rendered as a bullet list.

    Returns:
        The markdown document. Speedup is CPU mean / GPU mean; pairs whose GPU
        mean is not positive show "—" and are left out of the summary.
    """
    lines = [
        "# CPU vs GPU Compression Benchmark (INT8)",
        "",
        "Kompress (ModernBERT) compression time comparison — ONNX INT8 on both sides.",
        "Same code path as `POST /v1/compress` via `KompressCompressor`.",
        "",
        "## Environment",
        "",
    ]
    lines.extend(f"- **{k}**: {v}" for k, v in env_info.items())
    lines.extend(
        [
            "",
            "## Results",
            "",
            "| Payload | Device | Words | Mean (ms) | Median (ms) | Min (ms) | Max (ms) |"
            " StdDev (ms) | Compression | Speedup |",
            "|---------|--------|-------|-----------|-------------|----------|----------|"
            "-------------|-------------|---------|",
        ]
    )

    speedups = []
    for cpu_r, gpu_r in results:
        label = cpu_r.payload_label
        if gpu_r.mean_ms > 0:
            ratio = cpu_r.mean_ms / gpu_r.mean_ms
            speedups.append(ratio)
            speedup_str = f"**{ratio:.2f}x**"
        else:
            # A non-positive GPU mean has no meaningful speedup.
            speedup_str = "—"
        lines.append(_format_report_row(label, "CPU INT8", cpu_r, "—"))
        lines.append(_format_report_row(label, "GPU INT8", gpu_r, speedup_str))

    lines.extend(["", "## Summary", ""])
    if speedups:
        lines.append(f"- **Mean speedup**: {statistics.mean(speedups):.2f}x")
        lines.append(f"- **Best speedup**: {max(speedups):.2f}x")
        lines.append(f"- **Worst speedup**: {min(speedups):.2f}x")
    lines.append("")
    return "\n".join(lines)


def main() -> None:
    """Parse CLI arguments, run the CPU and GPU benchmarks, emit the report.

    With ``--markdown`` the report goes to stdout; otherwise it is echoed to
    stderr and written to ``BENCHMARK.md`` in the current directory.

    Raises:
        ValueError: If ``--runs`` is not positive or ``--warmup`` is negative.
        RuntimeError: If PyTorch/CUDA is unavailable, the selected GPU is out
            of range or short on free memory, a benchmark fails, or the report
            file cannot be written.
    """
    parser = argparse.ArgumentParser(description="CPU vs GPU Kompress benchmark")
    parser.add_argument(
        "--runs",
        type=int,
        default=10,
        help="Timed runs per config (positive integer)",
    )
    parser.add_argument(
        "--warmup",
        type=int,
        default=2,
        help="Warmup runs not recorded (non-negative integer)",
    )
    parser.add_argument(
        "--markdown", action="store_true", help="Output full markdown report"
    )
    parser.add_argument(
        "--gpu",
        type=int,
        default=0,
        help="CUDA device index to benchmark (0-based, default 0)",
    )
    args = parser.parse_args()

    if args.runs <= 0:
        raise ValueError("--runs must be a positive integer")
    if args.warmup < 0:
        raise ValueError("--warmup must be a non-negative integer")

    if torch is None:
        raise RuntimeError("PyTorch not installed. Install with: pip install torch")
    if not torch.cuda.is_available():
        raise RuntimeError(
            "CUDA not available. GPU benchmark requires CUDA-enabled PyTorch."
        )

    device_count = torch.cuda.device_count()
    if device_count == 0:
        raise RuntimeError("No CUDA devices found.")
    gpu_idx = args.gpu
    if gpu_idx < 0 or gpu_idx >= device_count:
        raise RuntimeError(
            f"--gpu {gpu_idx} out of range; valid indices: 0..{device_count - 1}"
        )

    # Validate GPU has enough free memory for the Kompress model.
    # Model is ~50MB safetensors + ~200MB ModernBERT runtime; require
    # 1GB free to leave headroom for activations across batch sizes.
    MIN_FREE_GPU_MEM_MIB = 1024
    try:
        free_mem, total_mem = torch.cuda.mem_get_info(gpu_idx)
    except (RuntimeError, AssertionError) as e:
        raise RuntimeError(
            f"Could not query memory for GPU {gpu_idx}: {e}. "
            f"Device may be in MIG/MPS mode or otherwise unavailable."
        ) from e
    free_mib = free_mem // (1024 * 1024)
    total_mib = total_mem // (1024 * 1024)
    if free_mib < MIN_FREE_GPU_MEM_MIB:
        raise RuntimeError(
            f"GPU {gpu_idx} has only {free_mib} MiB free, need "
            f">= {MIN_FREE_GPU_MEM_MIB} MiB for Kompress model. "
            f"Free up GPU memory or pick a different device with --gpu N."
        )

    # Make the selected GPU the current CUDA device so the plain "cuda" device
    # used by the PyTorch fallback resolves to it, not to device 0.
    torch.cuda.set_device(gpu_idx)

    if device_count > 1:
        print(
            f"NOTE: {device_count} CUDA devices detected, using device {gpu_idx}. "
            f"Pass --gpu N to select a different device.",
            file=sys.stderr,
        )

    gpu_name = torch.cuda.get_device_name(gpu_idx)
    gpu_mem = total_mib

    env_info = {
        "CPU": platform.processor() or "Unknown",
        "GPU": f"{gpu_name} ({gpu_mem} MiB)",
        "PyTorch": torch.__version__,
        "CUDA": torch.version.cuda or "N/A",
        "Python": platform.python_version(),
        "OS": f"{platform.system()} {platform.release()}",
        "Runs per config": str(args.runs),
        "Warmup runs": str(args.warmup),
    }

    payload_sizes = [
        ("Small (20 rows)", 20),
        ("Medium (100 rows)", 100),
        ("Large (300 rows)", 300),
        ("XLarge (500 rows)", 500),
    ]

    results: list[tuple[BenchResult, BenchResult]] = []

    for label, n_rows in payload_sizes:
        payload = generate_payload(n_rows)
        print(f"\n--- {label} ({n_rows} rows) ---", file=sys.stderr)

        # CPU benchmark — ONNX INT8 on CPUExecutionProvider (quantized)
        print(" CPU (ONNX INT8)...", file=sys.stderr)
        try:
            cpu_result = benchmark_onnx(
                payload, ["CPUExecutionProvider"], args.runs, args.warmup
            )
        except RuntimeError as e:
            raise RuntimeError(f"CPU benchmark failed for {label}: {e}") from e
        finally:
            _safe_unload_kompress()
        cpu_result.payload_label = label
        print(
            f" CPU: mean={cpu_result.mean_ms:.1f}ms median={cpu_result.median_ms:.1f}ms",
            file=sys.stderr,
        )

        # GPU benchmark — ONNX INT8 on CUDAExecutionProvider (quantized)
        # Fall back to PyTorch FP32 if ONNX CUDA is not available.
        print(" GPU (ONNX INT8)...", file=sys.stderr)
        gpu_result = _run_gpu_benchmark(
            payload, label, args.runs, args.warmup, gpu_idx=gpu_idx
        )
        gpu_result.payload_label = label
        speedup = (
            cpu_result.mean_ms / gpu_result.mean_ms
            if gpu_result.mean_ms > 0
            else float("inf")
        )
        print(
            f" GPU: mean={gpu_result.mean_ms:.1f}ms median={gpu_result.median_ms:.1f}ms "
            f"speedup={speedup:.2f}x",
            file=sys.stderr,
        )

        results.append((cpu_result, gpu_result))

    report = print_markdown_report(results, env_info)

    if args.markdown:
        print(report)
    else:
        # Print summary table to stderr, write full report to BENCHMARK.md
        print(report, file=sys.stderr)
        out_path = "BENCHMARK.md"
        try:
            with open(out_path, "w", encoding="utf-8") as f:
                f.write(report)
                f.write("\n")
        except OSError as e:
            raise RuntimeError(
                f"Failed to write {out_path}: {e}. Check disk space and permissions."
            ) from e
        print(f"\nReport written to {out_path}", file=sys.stderr)


if __name__ == "__main__":
    main()