Spaces:
Sleeping
Sleeping
| """ | |
| BitNet b1.58 async stress test. | |
| Fires N concurrent requests against the llama-server OpenAI-compatible API | |
| and collects per-request timing, token throughput, and error stats. | |
| Uses asyncio + aiohttp for non-blocking I/O. | |
| Usage: | |
| python stress_test.py [--url URL] [--requests N] [--concurrency C] [--warmup W] | |
| """ | |
| import sys | |
| import time | |
| import json | |
| import asyncio | |
| import statistics | |
| import argparse | |
| from datetime import datetime | |
| import aiohttp | |
| import colorama | |
| colorama.init(autoreset=True) | |
| C = colorama.Fore | |
| S = colorama.Style | |
| DIM = colorama.Style.DIM | |
| PAYLOAD = { | |
| "model": "bitnet", | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": ( | |
| "Explain the concept of neural networks in simple terms. " | |
| "Include how they are trained and what makes them different " | |
| "from traditional computer programs." | |
| ) | |
| } | |
| ], | |
| "max_tokens": 80, | |
| "temperature": 0.7, | |
| } | |
| DESCRIPTION = "Medium chat completion (32 tok prompt, 80 tok generate)" | |
| def log(msg, color=C.WHITE, bright=False): | |
| style = S.BRIGHT if bright else "" | |
| ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] | |
| print(f"{DIM}{ts}{S.RESET_ALL} {style}{color}{msg}{S.RESET_ALL}") | |
| # --------------------------------------------------------------------------- | |
| # Per-request worker | |
| # --------------------------------------------------------------------------- | |
| async def do_request(session, req_id, url, timeout): | |
| start = time.perf_counter() | |
| try: | |
| async with session.post( | |
| url, | |
| json=PAYLOAD, | |
| timeout=aiohttp.ClientTimeout(total=timeout), | |
| ) as resp: | |
| elapsed = time.perf_counter() - start | |
| status = resp.status | |
| body = await resp.json() | |
| if status == 200 and "choices" in body: | |
| usage = body.get("usage", {}) | |
| pt = usage.get("prompt_tokens", 0) | |
| ct = usage.get("completion_tokens", 0) | |
| tt = usage.get("total_tokens", 0) | |
| tok_sec = ct / elapsed if elapsed > 0 else 0 | |
| content = body.get("choices", [{}])[0].get("message", {}).get("content", "") | |
| ok = True | |
| err = None | |
| ctext = content[:55].replace("\n", " ") | |
| log( | |
| f"[#{req_id:03d}] {C.GREEN}OK{C.RESET} " | |
| f"{elapsed:6.1f}s {ct:3d} tok {tok_sec:5.1f} tok/s " | |
| f"\"{ctext}...\"", | |
| ) | |
| else: | |
| pt = ct = tt = 0 | |
| tok_sec = 0 | |
| ok = False | |
| err = body.get("error", {}).get("message", f"HTTP {status}") | |
| content = "" | |
| log(f"[#{req_id:03d}] {C.RED}FAIL{C.RESET} {elapsed:6.1f}s {err}", C.RED) | |
| except asyncio.TimeoutError: | |
| elapsed = time.perf_counter() - start | |
| ok = False | |
| err = "TIMEOUT" | |
| pt = ct = tt = 0 | |
| tok_sec = 0 | |
| content = "" | |
| log(f"[#{req_id:03d}] {C.RED}TIMEOUT{C.RESET} {elapsed:6.1f}s", C.RED) | |
| except Exception as e: | |
| elapsed = time.perf_counter() - start | |
| ok = False | |
| err = str(e)[:70] | |
| pt = ct = tt = 0 | |
| tok_sec = 0 | |
| content = "" | |
| log(f"[#{req_id:03d}] {C.RED}ERROR{C.RESET} {elapsed:6.1f}s {err}", C.RED) | |
| return { | |
| "id": req_id, | |
| "ok": ok, | |
| "status": status if "status" in locals() else 0, | |
| "elapsed": elapsed, | |
| "prompt_tokens": pt, | |
| "completion_tokens": ct, | |
| "total_tokens": tt, | |
| "tokens_per_sec": tok_sec, | |
| "content_len": len(content), | |
| "error": err, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Report | |
| # --------------------------------------------------------------------------- | |
| def print_audit(results, wall_time, url, n_req, concurrency, warmup): | |
| ok_r = [r for r in results if r["ok"]] | |
| fail_r = [r for r in results if not r["ok"]] | |
| elapsed_ok = [r["elapsed"] for r in ok_r] | |
| tok_sec_ok = [r["tokens_per_sec"] for r in ok_r] | |
| ct_ok = [r["completion_tokens"] for r in ok_r] | |
| pt_ok = [r["prompt_tokens"] for r in ok_r] | |
| total_prompt = sum(pt_ok) | |
| total_completion = sum(ct_ok) | |
| p95 = sorted(elapsed_ok)[int(len(elapsed_ok) * 0.95)] if elapsed_ok else 0 | |
| # Determine if we have slot info | |
| concurrency_note = f"{concurrency} concurrent (asyncio)" | |
| print() | |
| log("=" * 68, C.MAGENTA, bright=True) | |
| log(" STRESS TEST AUDIT REPORT", C.MAGENTA, bright=True) | |
| log("=" * 68, C.MAGENTA, bright=True) | |
| print() | |
| rows = [ | |
| ("Target URL", url), | |
| ("Description", DESCRIPTION), | |
| ("Total requests", str(n_req)), | |
| ("Concurrency", concurrency_note), | |
| ("Warmup requests", str(warmup)), | |
| ("Wall clock", f"{wall_time:.1f}s"), | |
| ] | |
| for k, v in rows: | |
| log(f" {C.YELLOW}{k:<22}{S.RESET_ALL} {v}") | |
| print() | |
| log(f" {C.CYAN}{'RESULTS':<22}{S.RESET_ALL}", bright=True) | |
| log(f" {'Succeeded':<22} {C.GREEN}{len(ok_r):>4}{S.RESET_ALL} ({len(ok_r)/max(len(results),1)*100:5.1f}%)", C.GREEN) | |
| log(f" {'Failed':<22} {C.RED}{len(fail_r):>4}{S.RESET_ALL} ({len(fail_r)/max(len(results),1)*100:5.1f}%)", C.RED) | |
| print() | |
| log(f" {C.CYAN}{'LATENCY (seconds)':<22}{S.RESET_ALL}", bright=True) | |
| if elapsed_ok: | |
| log(f" {'Average':<22} {statistics.mean(elapsed_ok):>6.2f}") | |
| log(f" {'Median':<22} {statistics.median(elapsed_ok):>6.2f}") | |
| log(f" {'Min':<22} {C.GREEN}{min(elapsed_ok):>6.2f}{S.RESET_ALL}") | |
| log(f" {'Max':<22} {C.RED}{max(elapsed_ok):>6.2f}{S.RESET_ALL}") | |
| log(f" {'P95':<22} {p95:>6.2f}") | |
| print() | |
| log(f" {C.CYAN}{'TOKEN THROUGHPUT (tok/s)':<22}{S.RESET_ALL}", bright=True) | |
| if tok_sec_ok: | |
| log(f" {'Average':<22} {statistics.mean(tok_sec_ok):>6.1f}") | |
| log(f" {'Median':<22} {statistics.median(tok_sec_ok):>6.1f}") | |
| log(f" {'Min':<22} {C.RED}{min(tok_sec_ok):>6.1f}{S.RESET_ALL}") | |
| log(f" {'Max':<22} {C.GREEN}{max(tok_sec_ok):>6.1f}{S.RESET_ALL}") | |
| print() | |
| log(f" {C.CYAN}{'TOKEN COUNTS':<22}{S.RESET_ALL}", bright=True) | |
| log(f" {'Total prompt tokens':<22} {total_prompt:>6}") | |
| log(f" {'Total completion tok':<22} {total_completion:>6}") | |
| log(f" {'Total combined':<22} {total_prompt + total_completion:>6}") | |
| if ct_ok: | |
| log(f" {'Avg completion/req':<22} {statistics.mean(ct_ok):>6.1f}") | |
| print() | |
| log(f" {C.CYAN}{'SYSTEM THROUGHPUT':<22}{S.RESET_ALL}", bright=True) | |
| agg_tok_sec = total_completion / wall_time if wall_time > 0 else 0 | |
| req_per_min = len(ok_r) / (wall_time / 60) if wall_time > 0 else 0 | |
| log(f" {'Completion tok/s':<22} {C.GREEN}{agg_tok_sec:>6.1f}{S.RESET_ALL}") | |
| log(f" {'Requests/min':<22} {req_per_min:>6.1f}") | |
| print() | |
| if fail_r: | |
| log(f" {C.RED}{'ERROR BREAKDOWN':<22}{S.RESET_ALL}", bright=True) | |
| counts = {} | |
| for r in fail_r: | |
| key = r["error"] or "UNKNOWN" | |
| counts[key] = counts.get(key, 0) + 1 | |
| for err, cnt in sorted(counts.items(), key=lambda x: -x[1]): | |
| log(f" {err:<30} {C.RED}{cnt}{S.RESET_ALL}") | |
| # ------------------------------------------------------------------ | |
| # Findings & recommendations | |
| # ------------------------------------------------------------------ | |
| print() | |
| log("=" * 68, C.MAGENTA, bright=True) | |
| log(" ANALYSIS", C.MAGENTA, bright=True) | |
| log("=" * 68, C.MAGENTA, bright=True) | |
| findings = [] | |
| if elapsed_ok: | |
| single = min(elapsed_ok) | |
| tail = max(elapsed_ok) | |
| if tail > single * 1.5: | |
| findings.append( | |
| f"{C.YELLOW}• High tail latency: max {tail:.1f}s vs min {single:.1f}s. " | |
| f"Requests queue behind each other.{S.RESET_ALL}" | |
| ) | |
| if fail_r: | |
| findings.append( | |
| f"{C.RED}• {len(fail_r)}/{len(results)} requests failed — " | |
| f"check server health and resource limits.{S.RESET_ALL}" | |
| ) | |
| findings.append( | |
| f"{C.GREEN}• Server runs with 2 CPU cores, 2 parallel slots " | |
| f"(--parallel 2), continuous batching enabled.{S.RESET_ALL}" | |
| ) | |
| if agg_tok_speed := globals().get("agg_tok_sec", 0): | |
| if agg_tok_speed < 5: | |
| findings.append( | |
| f"{C.YELLOW}• System throughput ({agg_tok_speed:.1f} tok/s) is low — " | |
| f"limited by 2-core CPU constraint.{S.RESET_ALL}" | |
| ) | |
| else: | |
| findings.append( | |
| f"{C.GREEN}• System throughput ({agg_tok_speed:.1f} tok/s) " | |
| f"is acceptable for CPU inference.{S.RESET_ALL}" | |
| ) | |
| for f_text in findings: | |
| log(f" {f_text}") | |
| print() | |
| log("=" * 68, C.MAGENTA, bright=True) | |
| print() | |
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
| async def main(): | |
| parser = argparse.ArgumentParser( | |
| description="BitNet b1.58 async stress test", | |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| ) | |
| parser.add_argument("--url", default="http://localhost:8080/v1/chat/completions") | |
| parser.add_argument("--requests", type=int, default=50) | |
| parser.add_argument("--concurrency", type=int, default=10) | |
| parser.add_argument("--timeout", type=int, default=120) | |
| parser.add_argument("--warmup", type=int, default=0, | |
| help="Number of warmup requests (results discarded)") | |
| args = parser.parse_args() | |
| print() | |
| log("=" * 68, C.CYAN, bright=True) | |
| log(" BitNet b1.58 Async Stress Test", C.CYAN, bright=True) | |
| log(f" {args.requests} requests | {args.concurrency} concurrent " | |
| f"| {args.warmup} warmup | {args.url}", C.CYAN) | |
| log("=" * 68, C.CYAN, bright=True) | |
| print() | |
| connector = aiohttp.TCPConnector(limit=args.concurrency, force_close=True) | |
| async with aiohttp.ClientSession(connector=connector) as session: | |
| all_reqs = list(range(1, args.requests + 1 + args.warmup)) | |
| # Warmup phase | |
| for req_id in all_reqs[:args.warmup]: | |
| await do_request(session, req_id, args.url, args.timeout) | |
| # Main test | |
| wall_start = time.perf_counter() | |
| tasks = [ | |
| asyncio.create_task( | |
| do_request(session, req_id, args.url, args.timeout) | |
| ) | |
| for req_id in all_reqs[args.warmup:] | |
| ] | |
| results = await asyncio.gather(*tasks) | |
| wall_time = time.perf_counter() - wall_start | |
| print_audit(results, wall_time, args.url, args.requests, | |
| args.concurrency, args.warmup) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |