Spaces:
Sleeping
Sleeping
| """Async latency/throughput load test for the emotion API. | |
| Hammers ``POST /predict`` with a fixed concurrency and reports a latency | |
| distribution (p50/p95/p99) and sustained throughput (req/s), in the same spirit | |
| as the python-perf-optimizer benchmarker. Designed to run against the offline | |
| stub so the numbers are reproducible without a model download. | |
| It can drive an already-running server (``--url``) or, by default, spin up an | |
| in-process ASGI server on an ephemeral port, benchmark it, and tear it down -- | |
| so ``python scripts/loadtest.py`` works from a clean checkout with no setup. | |
| Usage: | |
| python scripts/loadtest.py # self-hosted, offline | |
| python scripts/loadtest.py --requests 5000 --concurrency 64 | |
| python scripts/loadtest.py --url http://localhost:8000 # external server | |
| python scripts/loadtest.py --markdown # emit the README table | |
| Output (default) is a human-readable summary; ``--markdown`` emits the exact | |
| benchmark table used in the README, and ``--json`` emits machine-readable stats. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import contextlib | |
| import json | |
| import math | |
| import socket | |
| import statistics | |
| import sys | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| # Make the project importable when run as a plain script (no install needed). | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) | |
| import logging | |
| import httpx | |
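# httpx logs one INFO line per request; under load that I/O would dominate the
# measurement. Raise httpx/httpcore to WARNING so the benchmark times the
# server rather than client-side logging (uvicorn's own output is quieted
# separately via log_level="warning" in _ephemeral_server).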
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("httpcore").setLevel(logging.WARNING) | |
# Request bodies for the load test. Both the warm-up and the workers send
# SAMPLE_TEXTS[i % len(SAMPLE_TEXTS)] for request index i, cycling through the
# list so every run uses the same payload mix. The sentences appear to be
# chosen to span different emotions (plus one neutral line).
SAMPLE_TEXTS = [
    "i can't stop smiling, today went better than i ever hoped",
    "i feel so let down after everything we had planned fell apart",
    "my hands are shaking, i really don't think i can walk in there",
    "how dare they take credit for the work i did all weekend",
    "i didn't expect a package on my doorstep and now i'm grinning",
    "i just feel really tender and grateful for the people around me",
    "this is fine i guess, nothing special happened today",
    "wow i genuinely did not see that coming at all",
]
| def _percentile(values: List[float], pct: float) -> float: | |
| if not values: | |
| return float("nan") | |
| s = sorted(values) | |
| k = (len(s) - 1) * (pct / 100.0) | |
| lo = math.floor(k) | |
| hi = math.ceil(k) | |
| if lo == hi: | |
| return s[int(k)] | |
| return s[lo] + (s[hi] - s[lo]) * (k - lo) | |
| def _free_port() -> int: | |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
| s.bind(("127.0.0.1", 0)) | |
| return s.getsockname()[1] | |
@contextlib.contextmanager
def _ephemeral_server(host: str = "127.0.0.1"):
    """Run the app in a background uvicorn server for the duration of the block.

    Usage: ``with _ephemeral_server() as base: ...``. Here ``base`` is the
    server's root URL, ``http://<host>:<port>``, on a freshly chosen port.

    ``OFFLINE`` is set to ``"1"`` in the process environment unless the caller
    already set it (``setdefault``), and it is left set afterwards. On exit
    (normal, exceptional, or a failed start-up) the server is told to stop and
    its thread is joined for up to 10 s.

    Raises:
        RuntimeError: if ``GET /healthz`` does not return 200 within ~20 s,
            or the server thread dies before that happens.
    """
    import os

    import uvicorn

    # NOTE(review): presumably app.main reads OFFLINE to select the stub model
    # instead of downloading one; confirm. It is set before the import in case
    # the flag is read at import time.
    os.environ.setdefault("OFFLINE", "1")
    from app.main import create_app

    port = _free_port()
    config = uvicorn.Config(create_app(), host=host, port=port, log_level="warning")
    server = uvicorn.Server(config)
    # Daemon thread, so a hung server cannot keep the interpreter alive.
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    base = f"http://{host}:{port}"
    try:
        # Poll /healthz until it answers 200. Sleep between *every* attempt,
        # not only after connection errors, so a non-200 reply cannot turn
        # this into a busy loop. A monotonic clock keeps the deadline immune
        # to wall-clock adjustments.
        deadline = time.monotonic() + 20
        with httpx.Client(timeout=2.0) as c:
            while True:
                if not thread.is_alive():
                    # e.g. uvicorn failed to bind; no point waiting out the deadline.
                    raise RuntimeError("server thread exited before becoming ready")
                try:
                    if c.get(f"{base}/healthz").status_code == 200:
                        break
                except httpx.HTTPError:
                    pass  # e.g. not listening yet; retry below
                if time.monotonic() >= deadline:
                    raise RuntimeError("server did not become ready in time")
                time.sleep(0.05)
        yield base
    finally:
        # Runs on normal exit, on an exception from the with-body, and when
        # readiness fails, so the background server is never left running.
        server.should_exit = True
        thread.join(timeout=10)
async def _worker(
    client: httpx.AsyncClient,
    url: str,
    work: "asyncio.Queue[int]",
    latencies: List[float],
    errors: List[int],
) -> None:
    """Keep POSTing to ``url`` until ``work`` is empty.

    Each queued integer selects a text from ``SAMPLE_TEXTS`` (round-robin by
    index). Results are recorded as follows:

    - A 200 reply appends its round-trip time in milliseconds to ``latencies``.
    - Any other status code is appended to ``errors``.
    - An ``httpx.HTTPError`` is recorded in ``errors`` as ``-1``.

    ``work.task_done()`` is called once per item taken.
    """
    while not work.empty():
        # There is no await between empty() and get_nowait(), so no other
        # worker on this event loop can take the item in between.
        idx = work.get_nowait()
        body = {"text": SAMPLE_TEXTS[idx % len(SAMPLE_TEXTS)]}
        started = time.perf_counter()
        try:
            resp = await client.post(url, json=body)
            elapsed_ms = (time.perf_counter() - started) * 1000.0
            if resp.status_code != 200:
                errors.append(resp.status_code)
            else:
                latencies.append(elapsed_ms)
        except httpx.HTTPError:
            errors.append(-1)  # request failed without an HTTP status
        finally:
            work.task_done()
async def run_benchmark(
    base_url: str,
    total_requests: int,
    concurrency: int,
    warmup: int,
) -> Dict[str, float]:
    """Load-test ``POST <base_url>/predict`` and return summary statistics.

    First, ``warmup`` requests are sent one at a time. They are not measured,
    and their ``httpx.HTTPError``s are ignored. Then ``total_requests``
    requests are drained by ``concurrency`` worker tasks sharing one client,
    whose connection pool is capped at ``concurrency`` connections.

    Returns:
        A dict with these keys:

        - ``requests``, ``concurrency``, ``ok``, ``errors``, ``wall_s``.
        - ``throughput_rps``: successful requests per second of wall time.
        - Latency stats in milliseconds over successful requests only:
          ``mean_ms``, ``p50_ms``, ``p95_ms``, ``p99_ms``, ``min_ms``,
          ``max_ms``. These are ``nan`` when nothing succeeded.

    Raises:
        ValueError: if ``concurrency`` < 1, or if ``total_requests`` or
            ``warmup`` is negative. With no workers, nothing would ever drain
            the queue and every request would silently go unsent.
    """
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")
    if total_requests < 0:
        raise ValueError(f"total_requests must be >= 0, got {total_requests}")
    if warmup < 0:
        raise ValueError(f"warmup must be >= 0, got {warmup}")

    endpoint = base_url.rstrip("/") + "/predict"
    limits = httpx.Limits(max_connections=concurrency, max_keepalive_connections=concurrency)
    async with httpx.AsyncClient(timeout=30.0, limits=limits) as client:
        # Warm-up (not measured): primes connections and the batcher worker.
        for i in range(warmup):
            with contextlib.suppress(httpx.HTTPError):
                await client.post(endpoint, json={"text": SAMPLE_TEXTS[i % len(SAMPLE_TEXTS)]})

        # Pre-fill the queue so workers can drain it with get_nowait() and
        # exit as soon as it is empty.
        work: "asyncio.Queue[int]" = asyncio.Queue()
        for i in range(total_requests):
            work.put_nowait(i)

        latencies: List[float] = []
        errors: List[int] = []
        start = time.perf_counter()
        workers = [
            asyncio.create_task(_worker(client, endpoint, work, latencies, errors))
            for _ in range(concurrency)
        ]
        await asyncio.gather(*workers)
        wall = time.perf_counter() - start

    ok = len(latencies)
    nan = float("nan")
    return {
        "requests": total_requests,
        "concurrency": concurrency,
        "ok": ok,
        "errors": len(errors),
        "wall_s": wall,
        "throughput_rps": ok / wall if wall > 0 else nan,
        "mean_ms": statistics.fmean(latencies) if latencies else nan,
        "p50_ms": _percentile(latencies, 50),
        "p95_ms": _percentile(latencies, 95),
        "p99_ms": _percentile(latencies, 99),
        "min_ms": min(latencies) if latencies else nan,
        "max_ms": max(latencies) if latencies else nan,
    }
| def _print_human(s: Dict[str, float]) -> None: | |
| print("\nEmotion API load test (offline stub)") | |
| print("=" * 44) | |
| print(f" requests : {s['requests']}") | |
| print(f" concurrency : {s['concurrency']}") | |
| print(f" ok / errors : {s['ok']} / {s['errors']}") | |
| print(f" wall time : {s['wall_s']:.3f} s") | |
| print(f" throughput : {s['throughput_rps']:.1f} req/s") | |
| print(f" latency mean : {s['mean_ms']:.3f} ms") | |
| print(f" latency p50 : {s['p50_ms']:.3f} ms") | |
| print(f" latency p95 : {s['p95_ms']:.3f} ms") | |
| print(f" latency p99 : {s['p99_ms']:.3f} ms") | |
| print(f" latency range : {s['min_ms']:.3f} – {s['max_ms']:.3f} ms") | |
| def _print_markdown(s: Dict[str, float]) -> None: | |
| print("\n| concurrency | throughput (req/s) | p50 (ms) | p95 (ms) | p99 (ms) |") | |
| print("|---|---|---|---|---|") | |
| print( | |
| f"| {s['concurrency']} | {s['throughput_rps']:.0f} | " | |
| f"{s['p50_ms']:.2f} | {s['p95_ms']:.2f} | {s['p99_ms']:.2f} |" | |
| ) | |
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: parse ``argv``, run the load test, print stats.

    With ``--url`` the benchmark targets that server. Otherwise an in-process
    server is started for the duration of the run.

    Output format:

    - ``--json``: JSON stats.
    - ``--markdown``: the README table.
    - Neither: a human-readable summary.

    ``--json`` takes precedence if both flags are given.

    Exits with status 2 (via argparse) if ``--concurrency`` < 1 or
    ``--requests``/``--warmup`` is negative.
    """
    ap = argparse.ArgumentParser(description="Latency/throughput load test for the emotion API.")
    ap.add_argument("--url", default=None, help="Base URL of a running server. If omitted, an in-process server is started.")
    ap.add_argument("--requests", type=int, default=2000, help="Total requests to send (default 2000).")
    ap.add_argument("--concurrency", type=int, default=32, help="Concurrent in-flight requests (default 32).")
    ap.add_argument("--warmup", type=int, default=50, help="Unmeasured warm-up requests (default 50).")
    ap.add_argument("--markdown", action="store_true", help="Emit the README benchmark table.")
    ap.add_argument("--json", action="store_true", help="Emit machine-readable JSON stats.")
    args = ap.parse_args(argv)

    # Reject nonsensical sizes before any server is started. With
    # --concurrency 0 no worker would ever send a request, and the report
    # would misleadingly show 0 ok / 0 errors.
    if args.concurrency < 1:
        ap.error("--concurrency must be >= 1")
    if args.requests < 0:
        ap.error("--requests must be >= 0")
    if args.warmup < 0:
        ap.error("--warmup must be >= 0")

    def _bench(base: str) -> Dict[str, float]:
        # Each run gets a fresh event loop in this (main) thread.
        return asyncio.run(run_benchmark(base, args.requests, args.concurrency, args.warmup))

    if args.url:
        stats = _bench(args.url)
    else:
        with _ephemeral_server() as base:
            stats = _bench(base)

    if args.json:
        print(json.dumps(stats, indent=2))
    elif args.markdown:
        _print_markdown(stats)
    else:
        _print_human(stats)
# Script entry point. main() is called with argv=None, so argparse reads sys.argv.
if __name__ == "__main__":
    main()