# headroom / benchmarks / bench_cpu_vs_gpu.py
# tudragon154203
# benchmark: add ONNX INT8 CPU vs GPU comparison to KompressCompressor
# 7a69c29
#!/usr/bin/env python3
"""Benchmark CPU vs GPU compression time for Kompress (v1/compress backend).
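Times the ONNX INT8 Kompress model on CPU and on CUDA with a chunked inference
loop that mirrors KompressCompressor.compress (the code path behind
POST /v1/compress); the PyTorch FP32 fallback calls KompressCompressor directly.
Tests multiple payload sizes and reports mean/median/min/max/stddev per device.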
Usage:
python benchmarks/bench_cpu_vs_gpu.py
python benchmarks/bench_cpu_vs_gpu.py --runs 20
python benchmarks/bench_cpu_vs_gpu.py --markdown > BENCHMARK.md
"""
from __future__ import annotations
import argparse
import json
import platform
import statistics
import sys
import time
from dataclasses import dataclass
try:
import torch
except ImportError:
torch = None # type: ignore[assignment] # revealed by torch.cuda.is_available check below
@dataclass
class BenchResult:
    """Timing samples and compression outcome for one device/payload pairing.

    Every statistic is computed on demand from ``runs``; nothing is cached,
    so later changes to ``runs`` are reflected the next time a property is
    read. The instance is intentionally mutable: callers assign
    ``payload_label`` after construction.
    """

    device: str  # device / provider label the samples were taken on
    payload_label: str  # human-readable payload name
    n_words: int  # number of whitespace-separated words in the payload
    runs: list[float]  # duration of each timed run, in milliseconds
    compression_ratio: float  # ratio observed for the payload

    @property
    def median_ms(self) -> float:
        """Median run time in milliseconds."""
        return statistics.median(self.runs)

    @property
    def stddev_ms(self) -> float:
        """Sample standard deviation in milliseconds (0.0 below two samples)."""
        if len(self.runs) < 2:
            # statistics.stdev needs at least two data points.
            return 0.0
        return statistics.stdev(self.runs)

    @property
    def max_ms(self) -> float:
        """Slowest run time in milliseconds."""
        return max(self.runs)

    @property
    def min_ms(self) -> float:
        """Fastest run time in milliseconds."""
        return min(self.runs)

    @property
    def mean_ms(self) -> float:
        """Arithmetic mean run time in milliseconds."""
        return statistics.mean(self.runs)
def generate_payload(n_rows: int, seed: int = 42) -> str:
    """Build a deterministic, tool-output-style text payload with ``n_rows`` rows.

    The payload imitates the JSON result of a file-search tool: one header
    fragment, ``n_rows`` file records, and a closing ``]}``, all joined by
    single spaces. Paths, sizes, languages and dates cycle with the row
    index, so the text is repetitive but not a single repeated template.

    Every record ends with a comma, including the last one, so the result is
    JSON-like text rather than strictly valid JSON.

    Args:
        n_rows: Number of file records to emit.
        seed: Accepted for backward compatibility only. The payload is a pure
            function of ``n_rows`` (no random values are drawn), so the seed
            does not influence the output. The process-wide ``random`` state
            is deliberately left untouched.

    Returns:
        The payload as a single line of space-separated fragments.
    """
    del seed  # unused: the payload is fully determined by n_rows

    pieces = ['{"tool":"search_files","status":"ok","count":%d,"results":[' % n_rows]
    for i in range(n_rows):
        # Rotate through three path families so neighbouring rows differ.
        bucket = i % 3
        if bucket == 0:
            path = f"src/services/payment_{i % 47}_handler.py"
        elif bucket == 1:
            path = f"src/utils/helper_{i % 23}.py"
        else:
            path = f"tests/unit/test_{i % 31}.py"
        size = 500 + (i * 37) % 9000
        language = "python" if i % 2 == 0 else "typescript"
        day = (i % 28) + 1
        pieces.append(
            f'{{"type":"file","path":"{path}","size":{size},'
            f'"language":"{language}","modified":"2024-01-{day:02d}"}},'
        )
    pieces.append("]}")
    # No fragment contains whitespace, so a single-space join already yields
    # the normalized one-line form.
    return " ".join(pieces)
def benchmark_device(
    payload: str,
    device: str,
    n_runs: int,
    warmup: int = 2,
) -> BenchResult:
    """Time ``KompressCompressor.compress`` on ``payload`` for one device.

    The cached Kompress model is unloaded before the run so the requested
    device loads its own copy, and unloaded again afterwards, including when
    a run raises, so a failure does not leave a model resident.

    Args:
        payload: Text to compress.
        device: Device string forwarded to ``KompressConfig(device=...)``.
        n_runs: Number of timed runs.
        warmup: Number of untimed warmup runs executed first.

    Returns:
        A ``BenchResult`` with per-run wall-clock times in milliseconds and
        the compression ratio reported by the last timed run (1.0 when
        ``n_runs`` is 0). ``payload_label`` is left empty for the caller.
    """
    from headroom.transforms.kompress_compressor import (
        KompressCompressor,
        KompressConfig,
        unload_kompress_model,
    )

    # Start from an empty model cache so this device loads fresh.
    unload_kompress_model()
    runs: list[float] = []
    ratio = 1.0
    try:
        compressor = KompressCompressor(config=KompressConfig(device=device))

        # Warmup runs (not recorded).
        for _ in range(warmup):
            compressor.compress(payload)

        # Timed runs.
        for _ in range(n_runs):
            t0 = time.perf_counter()
            result = compressor.compress(payload)
            runs.append((time.perf_counter() - t0) * 1000)
            ratio = result.compression_ratio
    finally:
        # Always release the model, even if loading or a run failed.
        unload_kompress_model()

    return BenchResult(
        device=device,
        payload_label="",
        n_words=len(payload.split()),
        runs=runs,
        compression_ratio=ratio,
    )
def benchmark_onnx(
    payload: str,
    providers: list[str],
    n_runs: int,
    warmup: int = 2,
) -> BenchResult:
    """Time the ONNX INT8 Kompress model on the given execution providers.

    Builds the ONNX Runtime session directly (bypassing the cached helper) so
    the provider list can be pinned to CPU only or CUDA+CPU without monkey-
    patching the module, stores it in the Kompress model cache, and then runs
    a chunked inference loop that mimics ``KompressCompressor.compress``.

    ONNX Runtime can create a session without a requested provider instead
    of failing (it logs a warning and uses the remaining ones). To keep a
    "GPU" measurement from silently being a CPU measurement, the first
    requested provider must be among the session's active providers.

    Args:
        payload: Text to compress.
        providers: ONNX Runtime provider list, e.g.
            ["CPUExecutionProvider"] or
            ["CUDAExecutionProvider", "CPUExecutionProvider"].
        n_runs: Number of timed runs.
        warmup: Number of warmup runs (not recorded).

    Returns:
        A ``BenchResult`` whose ``device`` is the provider names joined by
        ``+`` with the ``ExecutionProvider`` suffix removed, and whose
        ``compression_ratio`` is kept words / total words from the last timed
        run (1.0 for an empty payload or when ``n_runs`` is 0).

    Raises:
        RuntimeError: If the first requested provider was not activated by
            ONNX Runtime.
    """
    import onnxruntime as ort
    from huggingface_hub import hf_hub_download
    from transformers import AutoTokenizer
    from headroom.transforms.kompress_compressor import (
        HF_MODEL_ID,
        _OnnxModel,
        _kompress_cache,
        _kompress_lock,
    )
    from headroom.onnx_runtime import create_cpu_session_options

    chunk_size = 350  # words per inference chunk

    # Build (or rebuild) the cached ONNX session with the requested providers.
    with _kompress_lock:
        onnx_path = hf_hub_download(HF_MODEL_ID, "onnx/kompress-int8.onnx")
        using_gpu = "CUDAExecutionProvider" in providers
        sess_options = (
            ort.SessionOptions() if using_gpu else create_cpu_session_options(ort)
        )
        session = ort.InferenceSession(onnx_path, sess_options, providers=providers)

        # Fail loudly if ONNX Runtime dropped the provider under test.
        active_providers = session.get_providers()
        if providers and providers[0] not in active_providers:
            raise RuntimeError(
                f"ONNX Runtime did not activate {providers[0]}; "
                f"active providers: {active_providers}"
            )

        model = _OnnxModel(session)
        tokenizer = AutoTokenizer.from_pretrained("answerdotai/ModernBERT-base")
        _kompress_cache[HF_MODEL_ID] = (model, tokenizer, "onnx")

    words = payload.split()
    n_words = len(words)

    def _run_once() -> float:
        """Run one full pass over the payload; return kept/total word ratio."""
        kept_ids: set[int] = set()
        for chunk_start in range(0, n_words, chunk_size):
            chunk_words = words[chunk_start : chunk_start + chunk_size]
            encoding = tokenizer(
                chunk_words,
                is_split_into_words=True,
                truncation=True,
                max_length=512,
                padding=True,
                return_tensors="np",
            )
            word_ids = encoding.word_ids(batch_index=0)
            keep_mask = model.get_keep_mask(
                encoding["input_ids"], encoding["attention_mask"]
            )
            mask_row = keep_mask[0]
            for idx, wid in enumerate(word_ids):
                # Tokens with no word id are skipped.
                if wid is None:
                    continue
                if bool(mask_row[idx]):
                    # word ids are chunk-relative; shift to payload positions.
                    kept_ids.add(wid + chunk_start)
        return len(kept_ids) / n_words if n_words else 1.0

    # Warmup (not recorded).
    for _ in range(warmup):
        _run_once()

    # Timed runs.
    runs: list[float] = []
    ratio = 1.0
    for _ in range(n_runs):
        t0 = time.perf_counter()
        ratio = _run_once()
        runs.append((time.perf_counter() - t0) * 1000)

    return BenchResult(
        device="+".join(p.replace("ExecutionProvider", "") for p in providers),
        payload_label="",
        n_words=n_words,
        runs=runs,
        compression_ratio=ratio,
    )
def _safe_unload_kompress() -> None:
"""Best-effort unload of cached Kompress model; never raises."""
try:
from headroom.transforms.kompress_compressor import unload_kompress_model
unload_kompress_model()
except Exception: # noqa: BLE001 # cleanup must not propagate
pass
def _release_gpu_resources() -> None:
    """Free GPU-side state: the cached model, then PyTorch's CUDA cache.

    Intended to run after each payload's GPU benchmark so that a failure on
    one payload does not strand allocations for the following ones. A failing
    ``torch.cuda.empty_cache()`` is reported on stderr instead of raised.
    """
    _safe_unload_kompress()

    cuda_usable = torch is not None and torch.cuda.is_available()
    if not cuda_usable:
        # Nothing to flush without a CUDA-enabled PyTorch.
        return

    try:
        torch.cuda.empty_cache()
    except RuntimeError as cache_err:
        warning = f" WARNING: torch.cuda.empty_cache() failed: {cache_err}"
        print(warning, file=sys.stderr)
def _run_gpu_benchmark(
    payload: str,
    label: str,
    n_runs: int,
    warmup: int,
) -> BenchResult:
    """Run the GPU benchmark with an ONNX CUDA -> PyTorch FP32 fallback chain.

    Tries ONNX INT8 on CUDAExecutionProvider first. If that raises
    ``RuntimeError`` (e.g. cuDNN missing on Windows), falls back to PyTorch
    FP32 on CUDA. GPU resources are released afterwards on every path.

    Args:
        payload: Text to compress.
        label: Payload label, used only in error messages.
        n_runs: Number of timed runs.
        warmup: Number of untimed warmup runs.

    Returns:
        The ``BenchResult`` of whichever path succeeded.

    Raises:
        RuntimeError: If the PyTorch fallback runs out of GPU memory; the
            message names the payload. Any other fallback failure propagates
            with its original exception type.
    """
    # torch may be None (import failed) and older PyTorch releases have no
    # torch.cuda.OutOfMemoryError; resolve it defensively so the except
    # clause can never raise AttributeError and hide the real failure.
    oom_class = getattr(getattr(torch, "cuda", None), "OutOfMemoryError", None)
    oom_errors = (oom_class,) if oom_class is not None else ()

    try:
        try:
            return benchmark_onnx(
                payload,
                ["CUDAExecutionProvider", "CPUExecutionProvider"],
                n_runs,
                warmup,
            )
        except RuntimeError as onnx_err:
            # NOTE(review): only RuntimeError triggers the fallback; other
            # exception types from the ONNX path propagate. Confirm intended.
            print(
                f" NOTE: ONNX CUDA unavailable ({onnx_err}), falling back to PyTorch FP32",
                file=sys.stderr,
            )

        try:
            return benchmark_device(payload, "cuda", n_runs, warmup)
        except oom_errors as oom:
            raise RuntimeError(
                f"GPU benchmark failed for {label} — out of memory: {oom}. "
                f"Try a smaller payload or run `nvidia-smi` to free GPU memory."
            ) from oom
    finally:
        _release_gpu_resources()
def print_markdown_report(
    results: list[tuple[BenchResult, BenchResult]],
    env_info: dict[str, str],
) -> str:
    """Build the markdown benchmark report and return it as a string.

    Despite the name, nothing is printed; the caller decides where the text
    goes.

    The GPU row is normally labelled ``GPU INT8``. When a GPU result carries
    a torch-style device string (``device`` starting with lowercase
    ``cuda``, which is what the PyTorch FP32 fallback produces), the row is
    labelled ``GPU FP32 (PyTorch)`` and a note is added below the table, so
    the report does not present FP32 timings as INT8.

    Args:
        results: One ``(cpu_result, gpu_result)`` pair per payload size.
        env_info: Ordered environment facts rendered as a bullet list.

    Returns:
        The report text; it ends with a newline.
    """
    lines: list[str] = [
        "# CPU vs GPU Compression Benchmark (INT8)",
        "",
        "Kompress (ModernBERT) compression time comparison — ONNX INT8 on both sides.",
        "Same code path as `POST /v1/compress` via `KompressCompressor`.",
        "",
        "## Environment",
        "",
    ]
    lines.extend(f"- **{key}**: {value}" for key, value in env_info.items())
    lines += [
        "",
        "## Results",
        "",
        "| Payload | Device | Words | Mean (ms) | Median (ms) | Min (ms) | Max (ms) |"
        " StdDev (ms) | Compression | Speedup |",
        "|---------|--------|-------|-----------|-------------|----------|----------|"
        "-------------|-------------|---------|",
    ]

    speedups: list[float] = []
    used_fallback = False
    for cpu_r, gpu_r in results:
        label = cpu_r.payload_label

        # A speedup is only defined for a strictly positive GPU mean.
        if gpu_r.mean_ms > 0:
            speedup = cpu_r.mean_ms / gpu_r.mean_ms
            speedups.append(speedup)
            speedup_str = f"**{speedup:.2f}x**"
        else:
            speedup_str = "—"

        # ONNX results are labelled from provider names ("CUDA+CPU"); the
        # PyTorch fallback keeps the torch device string ("cuda").
        is_fallback = str(gpu_r.device).startswith("cuda")
        used_fallback = used_fallback or is_fallback
        gpu_label = "GPU FP32 (PyTorch)" if is_fallback else "GPU INT8"

        lines.append(
            f"| {label} | CPU INT8 | {cpu_r.n_words:,} | {cpu_r.mean_ms:.1f} | "
            f"{cpu_r.median_ms:.1f} | {cpu_r.min_ms:.1f} | {cpu_r.max_ms:.1f} | "
            f"{cpu_r.stddev_ms:.1f} | {1 - cpu_r.compression_ratio:.1%} | — |"
        )
        lines.append(
            f"| {label} | {gpu_label} | {gpu_r.n_words:,} | {gpu_r.mean_ms:.1f} | "
            f"{gpu_r.median_ms:.1f} | {gpu_r.min_ms:.1f} | {gpu_r.max_ms:.1f} | "
            f"{gpu_r.stddev_ms:.1f} | {1 - gpu_r.compression_ratio:.1%} | "
            f"{speedup_str} |"
        )

    if used_fallback:
        lines.append("")
        lines.append(
            "> Note: rows labelled `GPU FP32 (PyTorch)` ran the PyTorch FP32 fallback "
            "because ONNX CUDA was unavailable; their speedup compares FP32 GPU "
            "against INT8 CPU."
        )

    lines += ["", "## Summary", ""]
    if speedups:
        lines.append(f"- **Mean speedup**: {statistics.mean(speedups):.2f}x")
        lines.append(f"- **Best speedup**: {max(speedups):.2f}x")
        lines.append(f"- **Worst speedup**: {min(speedups):.2f}x")
    lines.append("")
    return "\n".join(lines)
def main() -> None:
    """CLI entry point: benchmark ONNX INT8 Kompress on CPU vs GPU and report.

    Steps:
        1. Parse and validate ``--runs``, ``--warmup``, ``--markdown``, ``--gpu``.
        2. Check PyTorch/CUDA availability, the GPU index and free GPU memory,
           then make the selected GPU the current CUDA device.
        3. For each payload size, time the CPU run and then the GPU run.
        4. Emit the markdown report: to stdout with ``--markdown``; otherwise
           to stderr and to ``BENCHMARK.md`` in the current directory.

    Raises:
        SystemExit: On invalid ``--runs`` / ``--warmup`` (argparse usage error).
        RuntimeError: If PyTorch or CUDA is unavailable, the GPU index is out
            of range, GPU memory cannot be queried or is insufficient, a
            benchmark fails, or the report file cannot be written.
    """
    parser = argparse.ArgumentParser(description="CPU vs GPU Kompress benchmark")
    parser.add_argument(
        "--runs",
        type=int,
        default=10,
        help="Timed runs per config (positive integer)",
    )
    parser.add_argument(
        "--warmup",
        type=int,
        default=2,
        help="Warmup runs not recorded (non-negative integer)",
    )
    parser.add_argument(
        "--markdown", action="store_true", help="Output full markdown report"
    )
    parser.add_argument(
        "--gpu",
        type=int,
        default=0,
        help="CUDA device index to benchmark (0-based, default 0)",
    )
    args = parser.parse_args()

    # Report bad arguments as usage errors (exit status 2), not tracebacks.
    if args.runs <= 0:
        parser.error("--runs must be a positive integer")
    if args.warmup < 0:
        parser.error("--warmup must be a non-negative integer")

    if torch is None:
        raise RuntimeError("PyTorch not installed. Install with: pip install torch")
    if not torch.cuda.is_available():
        raise RuntimeError(
            "CUDA not available. GPU benchmark requires CUDA-enabled PyTorch."
        )
    device_count = torch.cuda.device_count()
    if device_count == 0:
        raise RuntimeError("No CUDA devices found.")
    gpu_idx = args.gpu
    if gpu_idx < 0 or gpu_idx >= device_count:
        raise RuntimeError(
            f"--gpu {gpu_idx} out of range; valid indices: 0..{device_count - 1}"
        )

    # Validate GPU has enough free memory for the Kompress model.
    # Model is ~50MB safetensors + ~200MB ModernBERT runtime; require
    # 1GB free to leave headroom for activations across batch sizes.
    MIN_FREE_GPU_MEM_MIB = 1024
    try:
        free_mem, total_mem = torch.cuda.mem_get_info(gpu_idx)
    except (RuntimeError, AssertionError) as e:
        raise RuntimeError(
            f"Could not query memory for GPU {gpu_idx}: {e}. "
            f"Device may be in MIG/MPS mode or otherwise unavailable."
        ) from e
    free_mib = free_mem // (1024 * 1024)
    total_mib = total_mem // (1024 * 1024)
    if free_mib < MIN_FREE_GPU_MEM_MIB:
        raise RuntimeError(
            f"GPU {gpu_idx} has only {free_mib} MiB free, need "
            f">= {MIN_FREE_GPU_MEM_MIB} MiB for Kompress model. "
            f"Free up GPU memory or pick a different device with --gpu N."
        )

    # Make the selected GPU the current CUDA device: the PyTorch path asks
    # for plain "cuda", which resolves to the current device.
    torch.cuda.set_device(gpu_idx)
    if gpu_idx != 0:
        # The ONNX session is built from provider names only (no device_id
        # provider option), so ONNX Runtime picks its own default device.
        print(
            f"WARNING: --gpu {gpu_idx} pins the PyTorch path only; the ONNX "
            f"Runtime CUDA provider is created without a device_id and may "
            f"run on its default device.",
            file=sys.stderr,
        )

    if device_count > 1:
        print(
            f"NOTE: {device_count} CUDA devices detected, using device {gpu_idx}. "
            f"Pass --gpu N to select a different device.",
            file=sys.stderr,
        )

    gpu_name = torch.cuda.get_device_name(gpu_idx)
    env_info = {
        "CPU": platform.processor() or "Unknown",
        "GPU": f"{gpu_name} ({total_mib} MiB)",
        "PyTorch": torch.__version__,
        "CUDA": torch.version.cuda or "N/A",
        "Python": platform.python_version(),
        "OS": f"{platform.system()} {platform.release()}",
        "Runs per config": str(args.runs),
        "Warmup runs": str(args.warmup),
    }

    payload_sizes = [
        ("Small (20 rows)", 20),
        ("Medium (100 rows)", 100),
        ("Large (300 rows)", 300),
        ("XLarge (500 rows)", 500),
    ]
    results: list[tuple[BenchResult, BenchResult]] = []
    for label, n_rows in payload_sizes:
        payload = generate_payload(n_rows)
        # The label already contains the row count.
        print(f"\n--- {label} ---", file=sys.stderr)

        # CPU benchmark — ONNX INT8 on CPUExecutionProvider (quantized)
        print(" CPU (ONNX INT8)...", file=sys.stderr)
        try:
            cpu_result = benchmark_onnx(
                payload, ["CPUExecutionProvider"], args.runs, args.warmup
            )
        except RuntimeError as e:
            raise RuntimeError(f"CPU benchmark failed for {label}: {e}") from e
        finally:
            _safe_unload_kompress()
        cpu_result.payload_label = label
        print(
            f" CPU: mean={cpu_result.mean_ms:.1f}ms median={cpu_result.median_ms:.1f}ms",
            file=sys.stderr,
        )

        # GPU benchmark — ONNX INT8 on CUDAExecutionProvider (quantized)
        # Fall back to PyTorch FP32 if ONNX CUDA is not available.
        print(" GPU (ONNX INT8)...", file=sys.stderr)
        gpu_result = _run_gpu_benchmark(payload, label, args.runs, args.warmup)
        gpu_result.payload_label = label
        speedup = (
            cpu_result.mean_ms / gpu_result.mean_ms
            if gpu_result.mean_ms > 0
            else float("inf")
        )
        print(
            f" GPU: mean={gpu_result.mean_ms:.1f}ms median={gpu_result.median_ms:.1f}ms "
            f"speedup={speedup:.2f}x",
            file=sys.stderr,
        )
        results.append((cpu_result, gpu_result))

    report = print_markdown_report(results, env_info)
    if args.markdown:
        print(report)
    else:
        # Print summary table to stderr, write full report to BENCHMARK.md
        print(report, file=sys.stderr)
        out_path = "BENCHMARK.md"
        try:
            with open(out_path, "w", encoding="utf-8") as f:
                f.write(report)
                f.write("\n")
        except OSError as e:
            raise RuntimeError(
                f"Failed to write {out_path}: {e}. Check disk space and permissions."
            ) from e
        print(f"\nReport written to {out_path}", file=sys.stderr)
# Run the benchmark only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()