"""Async latency/throughput load test for the emotion API.
Hammers ``POST /predict`` with a fixed concurrency and reports a latency
distribution (p50/p95/p99) and sustained throughput (req/s), in the same spirit
as the python-perf-optimizer benchmarker. Designed to run against the offline
stub so the numbers are reproducible without a model download.
It can drive an already-running server (``--url``) or, by default, spin up an
in-process ASGI server on an ephemeral port, benchmark it, and tear it down --
so ``python scripts/loadtest.py`` works from a clean checkout with no setup.
Usage:
    python scripts/loadtest.py                                   # self-hosted, offline
    python scripts/loadtest.py --requests 5000 --concurrency 64
    python scripts/loadtest.py --url http://localhost:8000       # external server
    python scripts/loadtest.py --markdown                        # emit the README table
Output (default) is a human-readable summary; ``--markdown`` emits the exact
benchmark table used in the README, and ``--json`` emits machine-readable stats.
"""
from __future__ import annotations
import argparse
import asyncio
import contextlib
import json
import math
import socket
import statistics
import sys
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional
# Make the project importable when run as a plain script (no install needed).
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import logging
import httpx
# httpx logs one INFO line per request; under load that I/O dominates the
# measurement. Silence httpx and httpcore here (the self-hosted uvicorn server
# is started with log_level="warning") so the benchmark times the server.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
# Fixed pool of request texts. Requests pick an entry by ``index % len(...)``,
# so every run sends the same payload sequence and results stay comparable.
SAMPLE_TEXTS: List[str] = [
    "i can't stop smiling, today went better than i ever hoped",
    "i feel so let down after everything we had planned fell apart",
    "my hands are shaking, i really don't think i can walk in there",
    "how dare they take credit for the work i did all weekend",
    "i didn't expect a package on my doorstep and now i'm grinning",
    "i just feel really tender and grateful for the people around me",
    "this is fine i guess, nothing special happened today",
    "wow i genuinely did not see that coming at all",
]
def _percentile(values: List[float], pct: float) -> float:
    """Return the ``pct``-th percentile of ``values`` by linear interpolation.

    The input is sorted into a copy; the fractional rank
    ``(len - 1) * pct / 100`` is then interpolated between its two
    neighbouring order statistics.

    Args:
        values: Samples in any order. The list is not modified.
        pct: Percentile to compute, within ``[0, 100]``.

    Returns:
        The interpolated percentile, or ``nan`` when ``values`` is empty.

    Raises:
        ValueError: If ``pct`` is outside ``[0, 100]`` (including NaN).
    """
    # Without this guard a negative pct yields a negative index, which Python
    # resolves from the end of the list and silently returns a wrong sample;
    # pct > 100 would surface as an unhelpful IndexError.
    if not 0.0 <= pct <= 100.0:
        raise ValueError(f"pct must be within [0, 100], got {pct!r}")
    if not values:
        return float("nan")

    ordered = sorted(values)
    rank = (len(ordered) - 1) * (pct / 100.0)
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        # The rank lands exactly on a sample; nothing to interpolate.
        return ordered[lower]
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (rank - lower)
def _free_port() -> int:
    """Return a TCP port number the OS reports as free on 127.0.0.1.

    Binding to port 0 lets the OS pick the port. The probe socket is closed
    before returning, so the port is not reserved: another process could
    claim it before the caller binds.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        _address, port = probe.getsockname()
        return port
    finally:
        probe.close()
@contextlib.contextmanager
def _ephemeral_server(host: str = "127.0.0.1"):
    """Run the app in a background uvicorn server for the duration of the block.

    Starts ``app.main.create_app()`` under ``uvicorn.Server`` in a daemon
    thread on a port obtained from ``_free_port()``, then polls
    ``GET /healthz`` until it answers 200.

    Args:
        host: Interface the server binds to and the readiness probe targets.

    Yields:
        The server's base URL, e.g. ``http://127.0.0.1:54321``.

    Raises:
        RuntimeError: If the server thread exits during startup, or the
            health check does not return 200 within 20 seconds.
    """
    import os

    import uvicorn

    # setdefault keeps an OFFLINE value the caller already exported. It is set
    # before the app import -- presumably because the app reads it at
    # import/creation time; verify against app.main.
    os.environ.setdefault("OFFLINE", "1")
    from app.main import create_app

    port = _free_port()
    config = uvicorn.Config(create_app(), host=host, port=port, log_level="warning")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    base = f"http://{host}:{port}"
    # Readiness polling sits inside the try/finally so a startup failure also
    # signals the server thread to stop.
    try:
        # Monotonic clock: wall-clock adjustments cannot stretch or cut the wait.
        deadline = time.monotonic() + 20
        with httpx.Client(timeout=2.0) as c:
            while time.monotonic() < deadline:
                if not thread.is_alive():
                    # The server already exited; waiting out the deadline is pointless.
                    raise RuntimeError("server thread exited before becoming ready")
                try:
                    if c.get(f"{base}/healthz").status_code == 200:
                        break
                except httpx.HTTPError:
                    pass
                # Sleep after every unsuccessful probe, including non-200
                # replies, so the loop never busy-spins.
                time.sleep(0.05)
            else:
                raise RuntimeError("server did not become ready in time")
        yield base
    finally:
        server.should_exit = True
        thread.join(timeout=10)
async def _worker(
    client: httpx.AsyncClient,
    url: str,
    work: "asyncio.Queue[int]",
    latencies: List[float],
    errors: List[int],
) -> None:
    """Drain ``work``, sending one ``POST`` per queued index.

    Each index selects a text from ``SAMPLE_TEXTS`` (modulo its length).
    A 200 response appends its latency in milliseconds to ``latencies``.
    Any other status appends that status code to ``errors``, and an
    ``httpx.HTTPError`` appends ``-1``. Returns once the queue is empty.
    """
    sample_count = len(SAMPLE_TEXTS)
    while True:
        try:
            index = work.get_nowait()
        except asyncio.QueueEmpty:
            break

        body = {"text": SAMPLE_TEXTS[index % sample_count]}
        started = time.perf_counter()
        try:
            response = await client.post(url, json=body)
        except httpx.HTTPError:
            # Transport-level failure: no status code to record.
            errors.append(-1)
        else:
            elapsed_ms = (time.perf_counter() - started) * 1000.0
            status = response.status_code
            if status != 200:
                errors.append(status)
            else:
                latencies.append(elapsed_ms)
        finally:
            work.task_done()
async def run_benchmark(
    base_url: str,
    total_requests: int,
    concurrency: int,
    warmup: int,
) -> Dict[str, float]:
    """Load-test ``<base_url>/predict`` and return summary statistics.

    Sends ``warmup`` unmeasured sequential requests, then queues
    ``total_requests`` indices and drains them with ``concurrency`` worker
    tasks sharing one ``httpx.AsyncClient``.

    Returns:
        A dict with ``requests``, ``concurrency``, ``ok``, ``errors``,
        ``wall_s``, ``throughput_rps`` and latency stats in milliseconds
        (``mean_ms``, ``p50_ms``, ``p95_ms``, ``p99_ms``, ``min_ms``,
        ``max_ms``). Latency stats are ``nan`` when no request succeeded.
    """
    target = f"{base_url.rstrip('/')}/predict"
    pool = httpx.Limits(max_connections=concurrency, max_keepalive_connections=concurrency)
    sample_count = len(SAMPLE_TEXTS)

    async with httpx.AsyncClient(timeout=30.0, limits=pool) as client:
        # Warm-up (not measured): primes connections and the batcher worker.
        for n in range(warmup):
            try:
                await client.post(target, json={"text": SAMPLE_TEXTS[n % sample_count]})
            except httpx.HTTPError:
                pass

        queue: "asyncio.Queue[int]" = asyncio.Queue()
        for n in range(total_requests):
            queue.put_nowait(n)

        latencies: List[float] = []
        errors: List[int] = []

        began = time.perf_counter()
        tasks = [
            asyncio.create_task(_worker(client, target, queue, latencies, errors))
            for _ in range(concurrency)
        ]
        await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - began

    succeeded = len(latencies)
    nan = float("nan")
    if latencies:
        mean_ms = statistics.fmean(latencies)
        fastest = min(latencies)
        slowest = max(latencies)
    else:
        mean_ms = fastest = slowest = nan

    # Key order is kept as-is: it is the field order of the --json output.
    return {
        "requests": total_requests,
        "concurrency": concurrency,
        "ok": succeeded,
        "errors": len(errors),
        "wall_s": elapsed,
        "throughput_rps": succeeded / elapsed if elapsed > 0 else nan,
        "mean_ms": mean_ms,
        "p50_ms": _percentile(latencies, 50),
        "p95_ms": _percentile(latencies, 95),
        "p99_ms": _percentile(latencies, 99),
        "min_ms": fastest,
        "max_ms": slowest,
    }
def _print_human(s: Dict[str, float]) -> None:
    """Print the benchmark stats mapping ``s`` as a plain-text report on stdout."""
    report = [
        "\nEmotion API load test (offline stub)",
        "=" * 44,
        f" requests : {s['requests']}",
        f" concurrency : {s['concurrency']}",
        f" ok / errors : {s['ok']} / {s['errors']}",
        f" wall time : {s['wall_s']:.3f} s",
        f" throughput : {s['throughput_rps']:.1f} req/s",
        f" latency mean : {s['mean_ms']:.3f} ms",
        f" latency p50 : {s['p50_ms']:.3f} ms",
        f" latency p95 : {s['p95_ms']:.3f} ms",
        f" latency p99 : {s['p99_ms']:.3f} ms",
        f" latency range : {s['min_ms']:.3f} – {s['max_ms']:.3f} ms",
    ]
    for line in report:
        print(line)
def _print_markdown(s: Dict[str, float]) -> None:
    """Print a one-row Markdown table of the stats in ``s`` on stdout.

    Columns are concurrency, throughput (rounded to a whole number) and the
    p50/p95/p99 latencies with two decimals.
    """
    cells = [
        f"{s['concurrency']}",
        f"{s['throughput_rps']:.0f}",
        f"{s['p50_ms']:.2f}",
        f"{s['p95_ms']:.2f}",
        f"{s['p99_ms']:.2f}",
    ]
    print("\n| concurrency | throughput (req/s) | p50 (ms) | p95 (ms) | p99 (ms) |")
    print("|---|---|---|---|---|")
    print("| " + " | ".join(cells) + " |")
def main(argv: Optional[List[str]] = None) -> None:
    """Parse CLI arguments, run the benchmark and print the results.

    With ``--url`` the benchmark targets that server; otherwise an in-process
    server is started for the duration of the run. Output is a human-readable
    summary by default, a Markdown table with ``--markdown``, or JSON with
    ``--json`` (``--json`` wins if both are given).

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: With code 2 when an argument is invalid (argparse).
    """
    ap = argparse.ArgumentParser(description="Latency/throughput load test for the emotion API.")
    ap.add_argument("--url", default=None, help="Base URL of a running server. If omitted, an in-process server is started.")
    ap.add_argument("--requests", type=int, default=2000, help="Total requests to send (default 2000).")
    ap.add_argument("--concurrency", type=int, default=32, help="Concurrent in-flight requests (default 32).")
    ap.add_argument("--warmup", type=int, default=50, help="Unmeasured warm-up requests (default 50).")
    ap.add_argument("--markdown", action="store_true", help="Emit the README benchmark table.")
    ap.add_argument("--json", action="store_true", help="Emit machine-readable JSON stats.")
    args = ap.parse_args(argv)

    # Reject values that would make the run meaningless. With concurrency < 1
    # no worker task is created, so nothing would be measured at all.
    if args.requests < 1:
        ap.error("--requests must be at least 1")
    if args.concurrency < 1:
        ap.error("--concurrency must be at least 1")
    if args.warmup < 0:
        ap.error("--warmup must not be negative")

    def _bench(base: str) -> Dict[str, float]:
        return asyncio.run(run_benchmark(base, args.requests, args.concurrency, args.warmup))

    if args.url:
        stats = _bench(args.url)
    else:
        with _ephemeral_server() as base:
            stats = _bench(base)

    if args.json:
        # Stats are NaN when no request succeeded. json.dumps would emit the
        # bare token ``NaN``, which is not valid JSON, so map it to null.
        printable = {
            key: (None if isinstance(value, float) and not math.isfinite(value) else value)
            for key, value in stats.items()
        }
        print(json.dumps(printable, indent=2))
    elif args.markdown:
        _print_markdown(stats)
    else:
        _print_human(stats)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()