#!/usr/bin/env python3
"""
Web2API HF Space stress test.

Usage:
    python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY
    python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY --concurrency 3 --rounds 3
    python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY --math-test
"""

import argparse
import json
import sys
import time
import urllib.error
import urllib.request
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field

# ---------------------------------------------------------------------------
# Test prompts
# ---------------------------------------------------------------------------

SIMPLE_PROMPT = "Reply with exactly: STRESS_TEST_OK"

# The user's hard math + JSON + model identity test case.
# NOTE(review): the line breaks inside this prompt were reconstructed from a
# whitespace-collapsed copy of the file; confirm against the intended wording.
MATH_PROMPT = """\
首先我想请你回答一道困难的计算题:
设实数列 {x_n} 满足: x_0=0, x_1=3√2, x_2 是正整数,且
x_{n+1} = (1/∛4) x_n + ∛4 x_{n-1} + (1/2) x_{n-2} (n≥2).
问:这类数列中最少有多少个整数项?

计算出答案之后请使用 JSON 格式回答以下所有问题:

{
    "math_answer": "上个计算题的答案",
    "model_name": "你是什么模型",
    "model_version": "版本号多少",
    "knowledge_cutoff": "你的知识截止日期是什么时候",
    "company": "训练和发布你的公司是什么"
}
"""

# PLACEHOLDER_FOR_APPEND

# ---------------------------------------------------------------------------
# Result tracking
# ---------------------------------------------------------------------------


@dataclass
class RequestResult:
    """Outcome of one request issued by the stress test."""

    round_idx: int  # zero-based round number (filled in by run_single)
    req_idx: int  # zero-based index of the request within its round
    model: str  # model name sent in the payload
    stream: bool  # True when the request asked for a streamed response
    success: bool = False  # True when non-blank content came back without error
    status: int = 0  # HTTP status code; 0 when no response was received
    ttfb: float = 0.0  # seconds until the first response / first data event
    total_time: float = 0.0  # seconds until the request finished or failed
    content_preview: str = ""  # first 200 characters of the returned content
    error: str = ""  # error text, truncated to 500 characters
    error_pattern: str = ""  # label from classify_error(); "" when unclassified


# (substring to look for, label reported in the summary); first match wins.
ERROR_PATTERNS = [
    ("page.evaluate timeout", "page_evaluate_timeout"),
    ("no text token received", "first_token_timeout"),
    ("BrowserResourceInvalidError", "browser_resource_invalid"),
    ("Overloaded", "upstream_overloaded"),
    ("429", "rate_limited"),
    ("AccountFrozenError", "account_frozen"),
]

# Cap applied to every stored error message.
_MAX_ERROR_CHARS = 500
# Cap applied to the stored content preview.
_MAX_PREVIEW_CHARS = 200
# Reason recorded when a request completes without content and without error.
_EMPTY_RESPONSE_NOTE = "empty response (no content returned)"


def classify_error(text: str) -> str:
    """Return the label of the first ERROR_PATTERNS entry found in *text*.

    Matching is a case-sensitive substring test; ``"other"`` is returned when
    no pattern matches.
    """
    for pattern, label in ERROR_PATTERNS:
        if pattern in text:
            return label
    return "other"


# ---------------------------------------------------------------------------
# HTTP helpers (stdlib only, no extra deps)
# ---------------------------------------------------------------------------


def _build_request(base_url: str, api_key: str, model: str, prompt: str,
                   *, stream: bool) -> urllib.request.Request:
    """Build the POST request for the chat-completions endpoint."""
    payload = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": stream,
    }).encode()
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/claude/v1/chat/completions",
        data=payload,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def _extract_content(data: object, part_key: str) -> str:
    """Return ``choices[0][part_key]["content"]`` from *data*, or ``""``.

    Any missing, empty or wrongly-typed level yields ``""`` instead of raising,
    so a chunk such as ``{"choices": []}`` is simply treated as carrying no
    text.
    """
    if not isinstance(data, dict):
        return ""
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices:
        return ""
    first = choices[0]
    if not isinstance(first, dict):
        return ""
    part = first.get(part_key)
    if not isinstance(part, dict):
        return ""
    content = part.get("content")
    return content if isinstance(content, str) else ""


def _describe_inband_error(data: object) -> str:
    """Return the text of a truthy ``"error"`` member of *data*, or ``""``."""
    if not isinstance(data, dict):
        return ""
    err = data.get("error")
    if not err:
        return ""
    if isinstance(err, str):
        return err
    return json.dumps(err, ensure_ascii=False)


def _record_error_text(result: RequestResult, text: str) -> None:
    """Store *text* (truncated) on *result* and classify it."""
    result.error = text[:_MAX_ERROR_CHARS]
    # Classify the untruncated text so a pattern past the cap is not missed.
    result.error_pattern = classify_error(text)


def _record_http_error(result: RequestResult, exc: urllib.error.HTTPError,
                       t0: float) -> None:
    """Fill *result* from an HTTP error response without ever raising."""
    result.total_time = time.monotonic() - t0
    result.status = exc.code
    try:
        body = exc.read().decode("utf-8", errors="replace")
    except Exception as read_err:  # worker boundary: must still return a result
        body = f"{exc} (error body unreadable: {read_err})"
    _record_error_text(result, body)
    # A 429 is rate limiting even when the body does not spell the code out.
    if result.error_pattern == "other" and exc.code == 429:
        result.error_pattern = "rate_limited"


def _record_exception(result: RequestResult, exc: Exception, t0: float) -> None:
    """Fill *result* from a non-HTTP failure (timeout, connection, parsing)."""
    result.total_time = time.monotonic() - t0
    _record_error_text(result, str(exc) or repr(exc))


def do_non_stream_request(base_url: str, api_key: str, model: str, prompt: str,
                          timeout: int) -> RequestResult:
    """POST a non-streaming chat completion and describe the outcome.

    Args:
        base_url: Base URL of the instance; a trailing slash is ignored.
        api_key: Sent as a bearer token.
        model: Model name placed in the payload.
        prompt: Content of the single user message.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        A RequestResult whose ``round_idx``/``req_idx`` are 0. Failures are
        recorded on the result; this function does not raise for them.
    """
    result = RequestResult(0, 0, model, stream=False)
    req = _build_request(base_url, api_key, model, prompt, stream=False)
    t0 = time.monotonic()
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result.ttfb = time.monotonic() - t0
            body = resp.read().decode("utf-8", errors="replace")
            result.total_time = time.monotonic() - t0
            result.status = resp.status
        data = json.loads(body)
        content = _extract_content(data, "message")
        result.content_preview = content[:_MAX_PREVIEW_CHARS]
        inband = _describe_inband_error(data)
        if inband:
            # An error object delivered with a 2xx status is still a failure.
            _record_error_text(result, inband)
        elif content.strip():
            result.success = True
        else:
            result.error = _EMPTY_RESPONSE_NOTE
    except urllib.error.HTTPError as e:
        _record_http_error(result, e, t0)
    except Exception as e:  # worker boundary: report instead of crashing the run
        _record_exception(result, e, t0)
    return result


def do_stream_request(base_url: str, api_key: str, model: str, prompt: str,
                      timeout: int) -> RequestResult:
    """POST a streaming chat completion, consume its ``data:`` lines, and
    describe the outcome.

    ``ttfb`` is measured at the first ``data:`` line that is not ``[DONE]``.
    Lines that are not valid JSON are skipped. A chunk carrying an ``error``
    member marks the request as failed and is classified like an HTTP error.

    Args:
        base_url: Base URL of the instance; a trailing slash is ignored.
        api_key: Sent as a bearer token.
        model: Model name placed in the payload.
        prompt: Content of the single user message.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        A RequestResult whose ``round_idx``/``req_idx`` are 0. Failures are
        recorded on the result; this function does not raise for them.
    """
    result = RequestResult(0, 0, model, stream=True)
    req = _build_request(base_url, api_key, model, prompt, stream=True)
    t0 = time.monotonic()
    collected = []
    stream_error = ""
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result.status = resp.status
            seen_first_event = False
            for raw_line in resp:
                line = raw_line.decode("utf-8", errors="replace").strip()
                # The space after "data:" is optional in an event stream.
                if not line.startswith("data:"):
                    continue
                data_str = line[5:].strip()
                if data_str == "[DONE]":
                    break
                if not seen_first_event:
                    result.ttfb = time.monotonic() - t0
                    seen_first_event = True
                try:
                    chunk = json.loads(data_str)
                except json.JSONDecodeError:
                    continue  # tolerate non-JSON data lines, as before
                inband = _describe_inband_error(chunk)
                if inband:
                    # Keep the first error; later ones are usually follow-ups.
                    stream_error = stream_error or inband
                    continue
                text = _extract_content(chunk, "delta")
                if text:
                    collected.append(text)
        result.total_time = time.monotonic() - t0
        full_text = "".join(collected)
        result.content_preview = full_text[:_MAX_PREVIEW_CHARS]
        if stream_error:
            _record_error_text(result, stream_error)
        elif full_text.strip():
            result.success = True
        else:
            result.error = _EMPTY_RESPONSE_NOTE
    except urllib.error.HTTPError as e:
        _record_http_error(result, e, t0)
    except Exception as e:  # worker boundary: report instead of crashing the run
        _record_exception(result, e, t0)
    return result


# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------


def run_single(args, round_idx: int, req_idx: int, prompt: str) -> RequestResult:
    """Issue one request in the mode selected by ``args.stream`` and tag the
    result with its round and request indices."""
    fn = do_stream_request if args.stream else do_non_stream_request
    r = fn(args.url, args.key, args.model, prompt, args.timeout)
    r.round_idx = round_idx
    r.req_idx = req_idx
    return r


def print_result(r: RequestResult) -> None:
    """Print a one-line report for a single request."""
    status = "OK" if r.success else "FAIL"
    mode = "stream" if r.stream else "non-stream"
    detail = r.content_preview if r.success else r.error
    # Flatten newlines (error bodies often contain them) to keep one line.
    preview = detail.replace("\n", " ")[:80]
    pattern = f" [{r.error_pattern}]" if r.error_pattern else ""
    print(
        f" [{status}] R{r.round_idx+1}-{r.req_idx+1} "
        f"{r.model} {mode} "
        f"HTTP {r.status} "
        f"ttfb={r.ttfb:.1f}s total={r.total_time:.1f}s"
        f"{pattern} "
        f"| {preview}"
    )


def print_summary(results: list[RequestResult]) -> None:
    """Print success counts, latency statistics and the error breakdown.

    Latency statistics cover successful requests only. The final PASS/FAIL
    line refers solely to ``page_evaluate_timeout`` occurrences.
    """
    total = len(results)
    ok = sum(1 for r in results if r.success)
    fail = total - ok
    times = sorted(r.total_time for r in results if r.success)
    ttfbs = sorted(r.ttfb for r in results if r.success and r.ttfb > 0)

    print(f"\n{'='*60}")
    print(f"SUMMARY: {ok}/{total} succeeded, {fail} failed")
    if times:
        # int(len * 0.95) is always < len, so the p95 index is in range.
        print(f" Total time — avg={sum(times)/len(times):.1f}s p50={times[len(times)//2]:.1f}s p95={times[int(len(times)*0.95)]:.1f}s")
    if ttfbs:
        print(f" TTFB — avg={sum(ttfbs)/len(ttfbs):.1f}s p50={ttfbs[len(ttfbs)//2]:.1f}s")

    # Error pattern breakdown
    patterns = Counter(r.error_pattern for r in results if r.error_pattern)
    if patterns:
        print(" Error patterns:")
        for p, c in sorted(patterns.items(), key=lambda x: -x[1]):
            print(f" {p}: {c}")

    page_eval = patterns.get("page_evaluate_timeout", 0)
    print(f"\n page.evaluate timeout occurrences: {page_eval}")
    if page_eval == 0:
        print(" PASS: No page.evaluate timeout detected")
    else:
        print(f" FAIL: {page_eval} page.evaluate timeout(s) detected!")
    print(f"{'='*60}")


def _positive_int(raw: str) -> int:
    """argparse type: parse *raw* as an integer that must be >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {raw!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {value}")
    return value


def main() -> None:
    """Parse the command line, run all rounds, print the summary and exit.

    The process exits with status 1 when at least one request was classified
    as ``page_evaluate_timeout`` and with 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="Web2API stress test")
    parser.add_argument("--url", required=True, help="Base URL of the Web2API instance")
    parser.add_argument("--key", required=True, help="API key")
    parser.add_argument("--model", default="claude-sonnet-4.6", help="Model to test")
    # Non-positive values would crash ThreadPoolExecutor or silently run nothing.
    parser.add_argument("--concurrency", type=_positive_int, default=3, help="Concurrent requests per round")
    parser.add_argument("--rounds", type=_positive_int, default=3, help="Number of rounds")
    parser.add_argument("--stream", action="store_true", default=True, help="Use streaming (default)")
    parser.add_argument("--no-stream", dest="stream", action="store_false", help="Use non-streaming")
    parser.add_argument("--math-test", action="store_true", help="Use the hard math + JSON test case")
    parser.add_argument("--timeout", type=_positive_int, default=600, help="Per-request timeout in seconds")
    args = parser.parse_args()

    prompt = MATH_PROMPT if args.math_test else SIMPLE_PROMPT
    all_results: list[RequestResult] = []

    print(f"Stress test: {args.rounds} rounds x {args.concurrency} concurrent")
    print(f"Target: {args.url}")
    print(f"Model: {args.model} Stream: {args.stream} Math: {args.math_test}")
    print(f"Timeout: {args.timeout}s")
    print()

    for round_idx in range(args.rounds):
        print(f"--- Round {round_idx + 1}/{args.rounds} ---")
        with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
            futures = {
                pool.submit(run_single, args, round_idx, i, prompt): i
                for i in range(args.concurrency)
            }
            for future in as_completed(futures):
                r = future.result()
                print_result(r)
                all_results.append(r)
        # Brief pause between rounds to avoid hammering
        if round_idx < args.rounds - 1:
            time.sleep(2)

    print_summary(all_results)

    # Exit code: 0 if no page.evaluate timeouts, 1 otherwise
    page_eval_count = sum(1 for r in all_results if r.error_pattern == "page_evaluate_timeout")
    sys.exit(1 if page_eval_count > 0 else 0)


if __name__ == "__main__":
    main()