aspara / benchmarks /bench_metrics_api.py
github-actions[bot]
Deploy to HF Spaces
6b18711
Raw
History Blame Contribute Delete
7.33 kB
"""Benchmark for the /api/projects/{project}/runs/metrics endpoint.
Measures response time and size for both JSON and msgpack formats.
Uses httpx AsyncClient with ASGITransport so no server process is needed.
Usage:
uv run python benchmarks/bench_metrics_api.py
uv run python benchmarks/bench_metrics_api.py --runs 10 --metrics 20 --steps 2000
"""
from __future__ import annotations
import argparse
import asyncio
import json
import statistics
import tempfile
import time
from pathlib import Path
import httpx
import msgpack
from aspara.dashboard.dependencies import configure_data_dir
from aspara.dashboard.main import app
def create_test_data(data_dir: Path, n_runs: int, n_metrics: int, n_steps: int) -> str:
"""Create test JSONL data files.
Returns:
Project name used for the test data.
"""
project = "bench_project"
project_dir = data_dir / project
project_dir.mkdir(parents=True, exist_ok=True)
base_ts = 1700000000000 # Fixed base timestamp in ms
for r in range(n_runs):
run_name = f"run_{r}"
run_file = project_dir / f"{run_name}.jsonl"
meta_file = project_dir / f"{run_name}.meta.json"
# Write metrics JSONL
with run_file.open("w") as f:
for s in range(n_steps):
entry = {
"timestamp": base_ts + s * 1000,
"step": s,
"metrics": {f"metric_{m}": s * 0.1 + m for m in range(n_metrics)},
}
f.write(json.dumps(entry) + "\n")
# Write minimal metadata
meta = {
"run_id": run_name,
"tags": [],
"notes": "",
"params": {},
"config": {},
"artifacts": [],
"summary": {},
"is_finished": True,
"exit_code": 0,
"start_time": base_ts,
"finish_time": base_ts + n_steps * 1000,
}
meta_file.write_text(json.dumps(meta))
return project
async def bench_endpoint(
client: httpx.AsyncClient,
url: str,
n_warmup: int = 3,
n_iterations: int = 20,
) -> tuple[list[float], int]:
"""Benchmark a single endpoint.
Returns:
(list of response times in ms, response size in bytes)
"""
# Warmup
for _ in range(n_warmup):
resp = await client.get(url, headers={"X-Requested-With": "benchmark"})
assert resp.status_code == 200, f"Status {resp.status_code}: {resp.text[:200]}"
# Measure
times: list[float] = []
size = 0
for _ in range(n_iterations):
start = time.perf_counter()
resp = await client.get(url, headers={"X-Requested-With": "benchmark"})
elapsed = (time.perf_counter() - start) * 1000 # ms
times.append(elapsed)
size = len(resp.content)
return times, size
def bench_serialization_only(data: dict, n_iterations: int = 100) -> dict[str, list[float]]:
"""Benchmark raw serialization of the metrics dict.
Returns:
Dict mapping format name to list of times in ms.
"""
results: dict[str, list[float]] = {}
# JSON via stdlib
json_times: list[float] = []
for _ in range(n_iterations):
start = time.perf_counter()
json.dumps(data)
elapsed = (time.perf_counter() - start) * 1000
json_times.append(elapsed)
results["json (stdlib)"] = json_times
# msgpack
msgpack_times: list[float] = []
for _ in range(n_iterations):
start = time.perf_counter()
msgpack.packb(data, use_single_float=True)
elapsed = (time.perf_counter() - start) * 1000
msgpack_times.append(elapsed)
results["msgpack"] = msgpack_times
return results
def print_stats(label: str, times: list[float], size_kb: float | None = None) -> None:
"""Print timing statistics."""
median = statistics.median(times)
p95 = sorted(times)[int(len(times) * 0.95)]
mean = statistics.mean(times)
size_str = f" size={size_kb:.1f}KB" if size_kb is not None else ""
print(f" {label:30s} median={median:7.2f}ms p95={p95:7.2f}ms mean={mean:7.2f}ms{size_str}")
async def main() -> None:
parser = argparse.ArgumentParser(description="Benchmark metrics API endpoint")
parser.add_argument("--runs", type=int, default=5, help="Number of runs (default: 5)")
parser.add_argument("--metrics", type=int, default=10, help="Number of metrics per run (default: 10)")
parser.add_argument("--steps", type=int, default=1000, help="Number of steps per metric (default: 1000)")
parser.add_argument("--iterations", type=int, default=20, help="Number of benchmark iterations (default: 20)")
args = parser.parse_args()
print(f"Benchmark config: {args.runs} runs x {args.metrics} metrics x {args.steps} steps")
print(f"Iterations: {args.iterations}")
print()
with tempfile.TemporaryDirectory() as tmp:
data_dir = Path(tmp)
project = create_test_data(data_dir, args.runs, args.metrics, args.steps)
configure_data_dir(str(data_dir))
try:
run_names = ",".join(f"run_{r}" for r in range(args.runs))
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
url_json = f"/api/projects/{project}/runs/metrics?runs={run_names}&format=json"
url_msgpack = f"/api/projects/{project}/runs/metrics?runs={run_names}&format=msgpack"
print("--- Endpoint response time ---")
json_times, json_size = await bench_endpoint(client, url_json, n_iterations=args.iterations)
msgpack_times, msgpack_size = await bench_endpoint(client, url_msgpack, n_iterations=args.iterations)
print_stats("JSON (Pydantic+Rust)", json_times, json_size / 1024)
print_stats("msgpack", msgpack_times, msgpack_size / 1024)
json_median = statistics.median(json_times)
msgpack_median = statistics.median(msgpack_times)
print(f"\n JSON/msgpack ratio: {json_median / msgpack_median:.2f}x")
# Serialization-only benchmark
print("\n--- Serialization only (raw dict -> bytes) ---")
# Build a representative data dict
sample_data: dict = {"project": project, "metrics": {}}
for m in range(args.metrics):
metric_dict: dict = {}
for r in range(args.runs):
metric_dict[f"run_{r}"] = {
"steps": list(range(args.steps)),
"values": [i * 0.1 for i in range(args.steps)],
"timestamps": [1700000000000 + i * 1000 for i in range(args.steps)],
}
sample_data["metrics"][f"metric_{m}"] = metric_dict
ser_results = bench_serialization_only(sample_data, n_iterations=args.iterations)
for label, times in ser_results.items():
size = len(json.dumps(sample_data).encode()) / 1024 if "json" in label else len(msgpack.packb(sample_data, use_single_float=True)) / 1024
print_stats(label, times, size)
finally:
configure_data_dir(None)
if __name__ == "__main__":
asyncio.run(main())