File size: 8,504 Bytes
43a2563
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""Async latency/throughput load test for the emotion API.

Hammers ``POST /predict`` with a fixed concurrency and reports a latency
distribution (p50/p95/p99) and sustained throughput (req/s), in the same spirit
as the python-perf-optimizer benchmarker. Designed to run against the offline
stub so the numbers are reproducible without a model download.

It can drive an already-running server (``--url``) or, by default, spin up an
in-process ASGI server on an ephemeral port, benchmark it, and tear it down --
so ``python scripts/loadtest.py`` works from a clean checkout with no setup.

Usage:
    python scripts/loadtest.py                       # self-hosted, offline
    python scripts/loadtest.py --requests 5000 --concurrency 64
    python scripts/loadtest.py --url http://localhost:8000   # external server
    python scripts/loadtest.py --markdown             # emit the README table

Output (default) is a human-readable summary; ``--markdown`` emits the exact
benchmark table used in the README, and ``--json`` emits machine-readable stats.
"""
from __future__ import annotations

import argparse
import asyncio
import contextlib
import json
import math
import socket
import statistics
import sys
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional

# Make the project importable when run as a plain script (no install needed).
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

import logging

import httpx

# httpx logs one INFO line per request; under load that I/O dominates the
# measurement. Silence it (and uvicorn) so the benchmark times the server.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

SAMPLE_TEXTS = [
    "i can't stop smiling, today went better than i ever hoped",
    "i feel so let down after everything we had planned fell apart",
    "my hands are shaking, i really don't think i can walk in there",
    "how dare they take credit for the work i did all weekend",
    "i didn't expect a package on my doorstep and now i'm grinning",
    "i just feel really tender and grateful for the people around me",
    "this is fine i guess, nothing special happened today",
    "wow i genuinely did not see that coming at all",
]


def _percentile(values: List[float], pct: float) -> float:
    if not values:
        return float("nan")
    s = sorted(values)
    k = (len(s) - 1) * (pct / 100.0)
    lo = math.floor(k)
    hi = math.ceil(k)
    if lo == hi:
        return s[int(k)]
    return s[lo] + (s[hi] - s[lo]) * (k - lo)


def _free_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


@contextlib.contextmanager
def _ephemeral_server(host: str = "127.0.0.1"):
    """Run the app in a background uvicorn server for the duration of the block."""
    import os

    import uvicorn

    os.environ.setdefault("OFFLINE", "1")
    from app.main import create_app

    port = _free_port()
    config = uvicorn.Config(create_app(), host=host, port=port, log_level="warning")
    server = uvicorn.Server(config)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()
    # Wait for readiness.
    base = f"http://{host}:{port}"
    deadline = time.time() + 20
    with httpx.Client(timeout=2.0) as c:
        while time.time() < deadline:
            try:
                if c.get(f"{base}/healthz").status_code == 200:
                    break
            except httpx.HTTPError:
                time.sleep(0.05)
        else:
            raise RuntimeError("server did not become ready in time")
    try:
        yield base
    finally:
        server.should_exit = True
        thread.join(timeout=10)


async def _worker(
    client: httpx.AsyncClient,
    url: str,
    work: "asyncio.Queue[int]",
    latencies: List[float],
    errors: List[int],
) -> None:
    while True:
        try:
            i = work.get_nowait()
        except asyncio.QueueEmpty:
            return
        payload = {"text": SAMPLE_TEXTS[i % len(SAMPLE_TEXTS)]}
        t0 = time.perf_counter()
        try:
            r = await client.post(url, json=payload)
            dt = (time.perf_counter() - t0) * 1000.0  # ms
            if r.status_code == 200:
                latencies.append(dt)
            else:
                errors.append(r.status_code)
        except httpx.HTTPError:
            errors.append(-1)
        finally:
            work.task_done()


async def run_benchmark(
    base_url: str,
    total_requests: int,
    concurrency: int,
    warmup: int,
) -> Dict[str, float]:
    endpoint = base_url.rstrip("/") + "/predict"
    limits = httpx.Limits(max_connections=concurrency, max_keepalive_connections=concurrency)
    async with httpx.AsyncClient(timeout=30.0, limits=limits) as client:
        # Warm-up (not measured): primes connections and the batcher worker.
        for i in range(warmup):
            with contextlib.suppress(httpx.HTTPError):
                await client.post(endpoint, json={"text": SAMPLE_TEXTS[i % len(SAMPLE_TEXTS)]})

        work: "asyncio.Queue[int]" = asyncio.Queue()
        for i in range(total_requests):
            work.put_nowait(i)
        latencies: List[float] = []
        errors: List[int] = []

        start = time.perf_counter()
        workers = [
            asyncio.create_task(_worker(client, endpoint, work, latencies, errors))
            for _ in range(concurrency)
        ]
        await asyncio.gather(*workers)
        wall = time.perf_counter() - start

    ok = len(latencies)
    return {
        "requests": total_requests,
        "concurrency": concurrency,
        "ok": ok,
        "errors": len(errors),
        "wall_s": wall,
        "throughput_rps": ok / wall if wall > 0 else float("nan"),
        "mean_ms": statistics.fmean(latencies) if latencies else float("nan"),
        "p50_ms": _percentile(latencies, 50),
        "p95_ms": _percentile(latencies, 95),
        "p99_ms": _percentile(latencies, 99),
        "min_ms": min(latencies) if latencies else float("nan"),
        "max_ms": max(latencies) if latencies else float("nan"),
    }


def _print_human(s: Dict[str, float]) -> None:
    print("\nEmotion API load test (offline stub)")
    print("=" * 44)
    print(f"  requests       : {s['requests']}")
    print(f"  concurrency    : {s['concurrency']}")
    print(f"  ok / errors    : {s['ok']} / {s['errors']}")
    print(f"  wall time      : {s['wall_s']:.3f} s")
    print(f"  throughput     : {s['throughput_rps']:.1f} req/s")
    print(f"  latency mean   : {s['mean_ms']:.3f} ms")
    print(f"  latency p50    : {s['p50_ms']:.3f} ms")
    print(f"  latency p95    : {s['p95_ms']:.3f} ms")
    print(f"  latency p99    : {s['p99_ms']:.3f} ms")
    print(f"  latency range  : {s['min_ms']:.3f}{s['max_ms']:.3f} ms")


def _print_markdown(s: Dict[str, float]) -> None:
    print("\n| concurrency | throughput (req/s) | p50 (ms) | p95 (ms) | p99 (ms) |")
    print("|---|---|---|---|---|")
    print(
        f"| {s['concurrency']} | {s['throughput_rps']:.0f} | "
        f"{s['p50_ms']:.2f} | {s['p95_ms']:.2f} | {s['p99_ms']:.2f} |"
    )


def main(argv: Optional[List[str]] = None) -> None:
    ap = argparse.ArgumentParser(description="Latency/throughput load test for the emotion API.")
    ap.add_argument("--url", default=None, help="Base URL of a running server. If omitted, an in-process server is started.")
    ap.add_argument("--requests", type=int, default=2000, help="Total requests to send (default 2000).")
    ap.add_argument("--concurrency", type=int, default=32, help="Concurrent in-flight requests (default 32).")
    ap.add_argument("--warmup", type=int, default=50, help="Unmeasured warm-up requests (default 50).")
    ap.add_argument("--markdown", action="store_true", help="Emit the README benchmark table.")
    ap.add_argument("--json", action="store_true", help="Emit machine-readable JSON stats.")
    args = ap.parse_args(argv)

    def _bench(base: str) -> Dict[str, float]:
        return asyncio.run(run_benchmark(base, args.requests, args.concurrency, args.warmup))

    if args.url:
        stats = _bench(args.url)
    else:
        with _ephemeral_server() as base:
            stats = _bench(base)

    if args.json:
        print(json.dumps(stats, indent=2))
    elif args.markdown:
        _print_markdown(stats)
    else:
        _print_human(stats)


if __name__ == "__main__":
    main()