Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Benchmark CPU vs GPU compression time for Kompress (v1/compress backend). | |
| Uses KompressCompressor directly — same code path as POST /v1/compress. | |
| Tests multiple payload sizes, reports mean/min/max per device. | |
| Usage: | |
| python benchmarks/bench_cpu_vs_gpu.py | |
| python benchmarks/bench_cpu_vs_gpu.py --runs 20 | |
| python benchmarks/bench_cpu_vs_gpu.py --markdown > BENCHMARK.md | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import platform | |
| import statistics | |
| import sys | |
| import time | |
| from dataclasses import dataclass | |
| try: | |
| import torch | |
| except ImportError: | |
| torch = None # type: ignore[assignment] # revealed by torch.cuda.is_available check below | |
@dataclass
class BenchResult:
    """Timing samples and compression outcome for one device/payload pair.

    Instances are built with keyword arguments and the summary statistics
    are read as plain attributes (``result.mean_ms``), so the class is a
    dataclass and every statistic is a read-only property.
    """

    device: str  # label of the execution target, e.g. "cuda" or "CUDA+CPU"
    payload_label: str  # human-readable payload name; assigned by the caller
    n_words: int  # whitespace-separated word count of the payload
    runs: list[float]  # ms per run
    compression_ratio: float  # ratio reported by the last timed run

    @property
    def mean_ms(self) -> float:
        """Arithmetic mean of ``runs`` in ms (StatisticsError if empty)."""
        return statistics.mean(self.runs)

    @property
    def min_ms(self) -> float:
        """Fastest run in ms (ValueError if ``runs`` is empty)."""
        return min(self.runs)

    @property
    def max_ms(self) -> float:
        """Slowest run in ms (ValueError if ``runs`` is empty)."""
        return max(self.runs)

    @property
    def stddev_ms(self) -> float:
        """Sample standard deviation in ms; 0.0 when fewer than two runs."""
        return statistics.stdev(self.runs) if len(self.runs) > 1 else 0.0

    @property
    def median_ms(self) -> float:
        """Median run time in ms (StatisticsError if ``runs`` is empty)."""
        return statistics.median(self.runs)
def generate_payload(n_rows: int, seed: int = 42) -> str:
    """Build a synthetic ``search_files`` tool-output payload with ``n_rows`` rows.

    Each row is derived arithmetically from its index (path family cycles
    every three rows, language alternates, size and date wrap around), so
    the returned text depends only on ``n_rows``. The same payload is
    therefore produced for every device being compared.

    Args:
        n_rows: Number of file rows placed between the header and the
            closing ``]}``.
        seed: Passed to ``random.seed``. The rows draw no random numbers,
            so this only re-seeds the process-wide generator.

    Returns:
        Header, rows and closing bracket as whitespace-separated tokens
        joined by single spaces.
    """
    import random

    random.seed(seed)

    def _row(i: int) -> str:
        # Pick one of three path families based on the row index.
        bucket = i % 3
        if bucket == 0:
            path = f'"path":"src/services/payment_{i % 47}_handler.py"'
        elif bucket == 1:
            path = f'"path":"src/utils/helper_{i % 23}.py"'
        else:
            path = f'"path":"tests/unit/test_{i % 31}.py"'
        size = 500 + (i * 37) % 9000
        lang = '"typescript"' if i % 2 else '"python"'
        modified = f'"modified":"2024-01-{(i % 28) + 1:02d}"'
        return f' {{"type":"file",{path},"size":{size},"language":{lang},{modified}}},'

    pieces = ['{"tool":"search_files","status":"ok","count":%d,"results":[' % n_rows]
    pieces.extend(_row(i) for i in range(n_rows))
    pieces.append("]}")
    # Collapse every whitespace run so word counts are stable.
    return " ".join(" ".join(pieces).split())
def benchmark_device(
    payload: str,
    device: str,
    n_runs: int,
    warmup: int = 2,
) -> BenchResult:
    """Time ``KompressCompressor.compress`` on ``payload`` for one device.

    The cached model is unloaded before the run so the requested device
    loads fresh, and unloaded again afterwards. The second unload sits in
    a ``finally`` so a failing warmup or timed run does not leave the
    model loaded for whatever benchmark runs next.

    Args:
        payload: Text to compress.
        device: Device string handed to ``KompressConfig`` (this file
            passes "cuda").
        n_runs: Number of timed runs.
        warmup: Number of untimed runs executed first.

    Returns:
        A ``BenchResult`` with per-run wall-clock times in milliseconds and
        the ``compression_ratio`` of the last timed run (1.0 when
        ``n_runs`` is 0). ``payload_label`` is left empty for the caller.

    Raises:
        Whatever ``KompressCompressor`` raises; the model is still unloaded.
    """
    from headroom.transforms.kompress_compressor import (
        KompressCompressor,
        KompressConfig,
        unload_kompress_model,
    )

    # Clear model cache so each device loads fresh
    unload_kompress_model()
    try:
        compressor = KompressCompressor(config=KompressConfig(device=device))

        # Warmup runs (not recorded)
        for _ in range(warmup):
            compressor.compress(payload)

        # Timed runs
        runs: list[float] = []
        ratio = 1.0
        for _ in range(n_runs):
            t0 = time.perf_counter()
            result = compressor.compress(payload)
            runs.append((time.perf_counter() - t0) * 1000)
            ratio = result.compression_ratio
    finally:
        # Cleanup must happen on the failure path as well.
        unload_kompress_model()

    return BenchResult(
        device=device,
        payload_label="",
        n_words=len(payload.split()),
        runs=runs,
        compression_ratio=ratio,
    )
def benchmark_onnx(
    payload: str,
    providers: list[str],
    n_runs: int,
    warmup: int = 2,
) -> BenchResult:
    """Time the ONNX INT8 Kompress model on ``payload`` with pinned providers.

    Builds the ONNX session directly (bypassing the cached helper) so the
    provider list can be pinned to CPU only or CUDA+CPU without monkey-
    patching the module. The resulting model/tokenizer pair is stored in
    ``_kompress_cache`` under ``HF_MODEL_ID``.

    Each timed pass splits the payload into 350-word chunks, tokenizes each
    chunk (truncated to 512 tokens), asks the model for a keep mask and maps
    the kept tokens back to word indices.

    Args:
        payload: Text to compress.
        providers: ONNX Runtime provider list, e.g.
            ["CPUExecutionProvider"] or
            ["CUDAExecutionProvider", "CPUExecutionProvider"].
        n_runs: Number of timed runs.
        warmup: Number of warmup runs (not recorded).

    Returns:
        A ``BenchResult`` whose ``device`` is the provider names joined by
        "+" with the "ExecutionProvider" suffix removed, and whose
        ``compression_ratio`` is the fraction of words kept in the last
        timed pass (1.0 for an empty payload or ``n_runs`` of 0).

    Raises:
        RuntimeError: If ``CUDAExecutionProvider`` was requested but the
            created session did not register it. Without this check the
            timings of a silent CPU fallback would be reported as GPU
            results.
    """
    import onnxruntime as ort
    from huggingface_hub import hf_hub_download
    from transformers import AutoTokenizer
    from headroom.transforms.kompress_compressor import (
        HF_MODEL_ID,
        _OnnxModel,
        _kompress_cache,
        _kompress_lock,
    )
    from headroom.onnx_runtime import create_cpu_session_options

    chunk_size = 350  # words per tokenizer call
    using_gpu = "CUDAExecutionProvider" in providers

    # Build (or rebuild) the cached ONNX session with the requested providers
    with _kompress_lock:
        onnx_path = hf_hub_download(HF_MODEL_ID, "onnx/kompress-int8.onnx")
        sess_options = (
            create_cpu_session_options(ort)
            if not using_gpu
            else ort.SessionOptions()
        )
        session = ort.InferenceSession(
            onnx_path, sess_options, providers=providers
        )
        if using_gpu:
            # The session can be created without the CUDA provider (e.g.
            # missing CUDA/cuDNN libraries); verify what was registered.
            active = session.get_providers()
            if "CUDAExecutionProvider" not in active:
                raise RuntimeError(
                    "CUDAExecutionProvider was requested but the ONNX Runtime "
                    f"session only registered {active}"
                )
        model = _OnnxModel(session)
        tokenizer = AutoTokenizer.from_pretrained("answerdotai/ModernBERT-base")
        _kompress_cache[HF_MODEL_ID] = (model, tokenizer, "onnx")

    words = payload.split()
    n_words = len(words)

    def _run_once() -> float:
        """Run one full pass over the payload; return the fraction of words kept."""
        kept_ids: set[int] = set()
        for chunk_start in range(0, n_words, chunk_size):
            chunk = words[chunk_start : chunk_start + chunk_size]
            encoding = tokenizer(
                chunk,
                is_split_into_words=True,
                truncation=True,
                max_length=512,
                padding=True,
                return_tensors="np",
            )
            word_ids = encoding.word_ids(batch_index=0)
            keep_mask = model.get_keep_mask(
                encoding["input_ids"], encoding["attention_mask"]
            )
            # Only the first row is read; presumably the mask is
            # (batch, tokens) with a batch of one — confirm in _OnnxModel.
            mask_row = keep_mask[0]
            for idx, wid in enumerate(word_ids):
                # Tokens with no word id are skipped.
                if wid is not None and bool(mask_row[idx]):
                    kept_ids.add(wid + chunk_start)
        return len(kept_ids) / n_words if n_words else 1.0

    # Warmup
    for _ in range(warmup):
        _run_once()

    # Timed runs
    runs: list[float] = []
    ratio = 1.0
    for _ in range(n_runs):
        t0 = time.perf_counter()
        ratio = _run_once()
        runs.append((time.perf_counter() - t0) * 1000)

    return BenchResult(
        device="+".join(p.replace("ExecutionProvider", "") for p in providers),
        payload_label="",
        n_words=n_words,
        runs=runs,
        compression_ratio=ratio,
    )
| def _safe_unload_kompress() -> None: | |
| """Best-effort unload of cached Kompress model; never raises.""" | |
| try: | |
| from headroom.transforms.kompress_compressor import unload_kompress_model | |
| unload_kompress_model() | |
| except Exception: # noqa: BLE001 # cleanup must not propagate | |
| pass | |
def _release_gpu_resources() -> None:
    """Drop the cached Kompress model, then empty the CUDA allocator cache.

    Called after each payload's GPU run so a failure on one payload does
    not strand allocations for subsequent ones. The allocator step is
    skipped when torch is missing or CUDA is unavailable. A ``RuntimeError``
    from ``torch.cuda.empty_cache()`` is reported on stderr instead of
    being propagated.
    """
    _safe_unload_kompress()

    cuda_ready = torch is not None and torch.cuda.is_available()
    if not cuda_ready:
        return

    try:
        torch.cuda.empty_cache()
    except RuntimeError as cache_err:
        warning = f" WARNING: torch.cuda.empty_cache() failed: {cache_err}"
        print(warning, file=sys.stderr)
def _run_gpu_benchmark(
    payload: str,
    label: str,
    n_runs: int,
    warmup: int,
) -> BenchResult:
    """Benchmark on GPU: ONNX INT8 via CUDA first, PyTorch FP32 as fallback.

    A ``RuntimeError`` from the ONNX attempt is reported on stderr and the
    PyTorch path on device "cuda" is tried instead. GPU resources are
    released on every exit path, successful or not.

    Args:
        payload: Text to compress.
        label: Payload name, used only in the out-of-memory message.
        n_runs: Number of timed runs.
        warmup: Number of untimed warmup runs.

    Raises:
        RuntimeError: If the PyTorch fallback runs out of GPU memory. Other
            exceptions from the fallback propagate unchanged.
    """
    onnx_gpu_providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    try:
        try:
            return benchmark_onnx(payload, onnx_gpu_providers, n_runs, warmup)
        except RuntimeError as onnx_err:
            note = (
                f" NOTE: ONNX CUDA unavailable ({onnx_err}), falling back to PyTorch FP32"
            )
            print(note, file=sys.stderr)

        try:
            return benchmark_device(payload, "cuda", n_runs, warmup)
        except torch.cuda.OutOfMemoryError as oom:
            message = (
                f"GPU benchmark failed for {label} — out of memory: {oom}. "
                f"Try a smaller payload or run `nvidia-smi` to free GPU memory."
            )
            raise RuntimeError(message) from oom
    finally:
        _release_gpu_resources()
def print_markdown_report(
    results: list[tuple[BenchResult, BenchResult]],
    env_info: dict[str, str],
) -> str:
    """Render the benchmark results as a markdown document.

    Despite the name, nothing is printed: the report is returned as a
    string for the caller to print or write.

    Args:
        results: One ``(cpu_result, gpu_result)`` pair per payload. The row
            label is taken from the CPU result.
        env_info: Environment facts listed as bullets, in insertion order.

    Returns:
        Markdown with an environment list, a two-rows-per-payload results
        table and a speedup summary. The summary bullets are omitted when
        no pair has a positive GPU mean.
    """

    def _stat_cells(r: BenchResult) -> str:
        # Cells shared by the CPU and GPU rows, from Words to Compression.
        return (
            f"{r.n_words:,} | {r.mean_ms:.1f} | "
            f"{r.median_ms:.1f} | {r.min_ms:.1f} | {r.max_ms:.1f} | "
            f"{r.stddev_ms:.1f} | {1 - r.compression_ratio:.1%}"
        )

    out = [
        "# CPU vs GPU Compression Benchmark (INT8)",
        "",
        "Kompress (ModernBERT) compression time comparison — ONNX INT8 on both sides.",
        "Same code path as `POST /v1/compress` via `KompressCompressor`.",
        "",
        "## Environment",
        "",
    ]
    out.extend(f"- **{key}**: {value}" for key, value in env_info.items())
    out += [
        "",
        "## Results",
        "",
        "| Payload | Device | Words | Mean (ms) | Median (ms) | Min (ms) | Max (ms) |"
        " StdDev (ms) | Compression | Speedup |",
        "|---------|--------|-------|-----------|-------------|----------|----------|"
        "-------------|-------------|---------|",
    ]

    speedups = []
    for cpu_r, gpu_r in results:
        name = cpu_r.payload_label
        if gpu_r.mean_ms > 0:
            factor = cpu_r.mean_ms / gpu_r.mean_ms
            speedups.append(factor)
            speedup_cell = f"**{factor:.2f}x**"
        else:
            # A non-positive GPU mean has no meaningful speedup.
            speedup_cell = "—"
        out.append(f"| {name} | CPU INT8 | {_stat_cells(cpu_r)} | — |")
        out.append(f"| {name} | GPU INT8 | {_stat_cells(gpu_r)} | {speedup_cell} |")

    out += ["", "## Summary", ""]
    if speedups:
        out.append(f"- **Mean speedup**: {statistics.mean(speedups):.2f}x")
        out.append(f"- **Best speedup**: {max(speedups):.2f}x")
        out.append(f"- **Worst speedup**: {min(speedups):.2f}x")
    out.append("")
    return "\n".join(out)
def main() -> None:
    """Run the CPU vs GPU benchmark from the command line.

    Steps: parse arguments, verify that PyTorch with CUDA is usable and
    that the selected GPU has enough free memory, benchmark each payload
    size on CPU (ONNX INT8) and on GPU, then emit the markdown report.

    With ``--markdown`` the report goes to stdout; otherwise it is echoed
    to stderr and written to ``BENCHMARK.md`` in the working directory.
    Progress messages always go to stderr.

    Raises:
        ValueError: If ``--runs`` is not positive or ``--warmup`` is
            negative.
        RuntimeError: If PyTorch/CUDA is unavailable, the GPU index is out
            of range, the GPU lacks free memory, a benchmark fails, or the
            report cannot be written.
    """
    parser = argparse.ArgumentParser(description="CPU vs GPU Kompress benchmark")
    parser.add_argument(
        "--runs",
        type=int,
        default=10,
        help="Timed runs per config (positive integer)",
    )
    parser.add_argument(
        "--warmup",
        type=int,
        default=2,
        # 0 is accepted by the validation below, so say so.
        help="Warmup runs not recorded (non-negative integer)",
    )
    parser.add_argument(
        "--markdown", action="store_true", help="Output full markdown report"
    )
    parser.add_argument(
        "--gpu",
        type=int,
        default=0,
        help="CUDA device index to benchmark (0-based, default 0)",
    )
    args = parser.parse_args()

    if args.runs <= 0:
        raise ValueError("--runs must be a positive integer")
    if args.warmup < 0:
        raise ValueError("--warmup must be a non-negative integer")

    if torch is None:
        raise RuntimeError("PyTorch not installed. Install with: pip install torch")
    if not torch.cuda.is_available():
        raise RuntimeError(
            "CUDA not available. GPU benchmark requires CUDA-enabled PyTorch."
        )

    device_count = torch.cuda.device_count()
    if device_count == 0:
        raise RuntimeError("No CUDA devices found.")
    gpu_idx = args.gpu
    if gpu_idx < 0 or gpu_idx >= device_count:
        raise RuntimeError(
            f"--gpu {gpu_idx} out of range; valid indices: 0..{device_count - 1}"
        )

    # Validate GPU has enough free memory for the Kompress model.
    # Model is ~50MB safetensors + ~200MB ModernBERT runtime; require
    # 1GB free to leave headroom for activations across batch sizes.
    MIN_FREE_GPU_MEM_MIB = 1024
    try:
        free_mem, total_mem = torch.cuda.mem_get_info(gpu_idx)
    except (RuntimeError, AssertionError) as e:
        raise RuntimeError(
            f"Could not query memory for GPU {gpu_idx}: {e}. "
            f"Device may be in MIG/MPS mode or otherwise unavailable."
        ) from e
    free_mib = free_mem // (1024 * 1024)
    total_mib = total_mem // (1024 * 1024)
    if free_mib < MIN_FREE_GPU_MEM_MIB:
        raise RuntimeError(
            f"GPU {gpu_idx} has only {free_mib} MiB free, need "
            f">= {MIN_FREE_GPU_MEM_MIB} MiB for Kompress model. "
            f"Free up GPU memory or pick a different device with --gpu N."
        )

    # Make the validated device the current one so the PyTorch "cuda" path
    # and torch.cuda.empty_cache() act on it. Previously --gpu was checked
    # and reported but never applied.
    # NOTE(review): the ONNX CUDA provider is created without an explicit
    # device_id — confirm it targets the same GPU when --gpu is non-zero.
    torch.cuda.set_device(gpu_idx)

    if device_count > 1:
        print(
            f"NOTE: {device_count} CUDA devices detected, using device {gpu_idx}. "
            f"Pass --gpu N to select a different device.",
            file=sys.stderr,
        )

    gpu_name = torch.cuda.get_device_name(gpu_idx)
    env_info = {
        "CPU": platform.processor() or "Unknown",
        "GPU": f"{gpu_name} ({total_mib} MiB)",
        "PyTorch": torch.__version__,
        "CUDA": torch.version.cuda or "N/A",
        "Python": platform.python_version(),
        "OS": f"{platform.system()} {platform.release()}",
        "Runs per config": str(args.runs),
        "Warmup runs": str(args.warmup),
    }

    payload_sizes = [
        ("Small (20 rows)", 20),
        ("Medium (100 rows)", 100),
        ("Large (300 rows)", 300),
        ("XLarge (500 rows)", 500),
    ]

    results: list[tuple[BenchResult, BenchResult]] = []
    for label, n_rows in payload_sizes:
        payload = generate_payload(n_rows)
        # The label already contains the row count; do not print it twice.
        print(f"\n--- {label} ---", file=sys.stderr)

        # CPU benchmark — ONNX INT8 on CPUExecutionProvider (quantized)
        print(" CPU (ONNX INT8)...", file=sys.stderr)
        try:
            cpu_result = benchmark_onnx(
                payload, ["CPUExecutionProvider"], args.runs, args.warmup
            )
        except RuntimeError as e:
            raise RuntimeError(
                f"CPU benchmark failed for {label}: {e}"
            ) from e
        finally:
            _safe_unload_kompress()
        cpu_result.payload_label = label
        print(
            f" CPU: mean={cpu_result.mean_ms:.1f}ms median={cpu_result.median_ms:.1f}ms",
            file=sys.stderr,
        )

        # GPU benchmark — ONNX INT8 on CUDAExecutionProvider (quantized)
        # Fall back to PyTorch FP32 if ONNX CUDA is not available.
        print(" GPU (ONNX INT8)...", file=sys.stderr)
        gpu_result = _run_gpu_benchmark(payload, label, args.runs, args.warmup)
        gpu_result.payload_label = label
        speedup = (
            cpu_result.mean_ms / gpu_result.mean_ms
            if gpu_result.mean_ms > 0
            else float("inf")
        )
        print(
            f" GPU: mean={gpu_result.mean_ms:.1f}ms median={gpu_result.median_ms:.1f}ms "
            f"speedup={speedup:.2f}x",
            file=sys.stderr,
        )
        results.append((cpu_result, gpu_result))

    report = print_markdown_report(results, env_info)
    if args.markdown:
        print(report)
    else:
        # Print summary table to stderr, write full report to BENCHMARK.md
        print(report, file=sys.stderr)
        out_path = "BENCHMARK.md"
        try:
            with open(out_path, "w", encoding="utf-8") as f:
                f.write(report)
                f.write("\n")
        except OSError as e:
            raise RuntimeError(
                f"Failed to write {out_path}: {e}. Check disk space and permissions."
            ) from e
        print(f"\nReport written to {out_path}", file=sys.stderr)


# Run the benchmark only when executed as a script, not on import.
if __name__ == "__main__":
    main()