web2api / scripts /stress_test.py
ohmyapi's picture
feat: align hosted Space deployment with latest upstream
77169b4
#!/usr/bin/env python3
"""
Web2API HF Space stress test.
Usage:
python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY
python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY --concurrency 3 --rounds 3
python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY --math-test
"""
import argparse
import json
import sys
import time
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
# ---------------------------------------------------------------------------
# Test prompts
# ---------------------------------------------------------------------------
# Default prompt: asks the model to reply with a fixed marker string.
SIMPLE_PROMPT = "Reply with exactly: STRESS_TEST_OK"
# Heavier prompt, selected with --math-test: a hard math problem (written in
# Chinese) followed by a request to answer in JSON, including questions about
# the model's name, version, knowledge cutoff, and vendor.
MATH_PROMPT = """\
首先我想请你回答一道困难的计算题:
设实数列 {x_n} 满足: x_0=0, x_1=3√2, x_2 是正整数,且
x_{n+1} = (1/∛4) x_n + ∛4 x_{n-1} + (1/2) x_{n-2} (n≥2).
问:这类数列中最少有多少个整数项?
计算出答案之后请使用 JSON 格式回答以下所有问题:
{
"math_answer": "上个计算题的答案",
"model_name": "你是什么模型",
"model_version": "版本号多少",
"knowledge_cutoff": "你的知识截止日期是什么时候",
"company": "训练和发布你的公司是什么"
}
"""
# PLACEHOLDER_FOR_APPEND
# ---------------------------------------------------------------------------
# Result tracking
# ---------------------------------------------------------------------------
@dataclass
class RequestResult:
    """Outcome of a single stress-test request.

    The first four fields identify the request and must be supplied; the
    remaining fields start at neutral defaults and are filled in while the
    request is processed.
    """

    # --- identification -------------------------------------------------
    round_idx: int  # zero-based round number
    req_idx: int  # zero-based index of the request within its round
    model: str
    stream: bool  # True when the request used streaming mode

    # --- outcome --------------------------------------------------------
    success: bool = False
    status: int = 0  # HTTP status code; stays 0 if no HTTP response was seen

    # --- timings, in seconds ---------------------------------------------
    ttfb: float = 0.0
    total_time: float = 0.0

    # --- payload / diagnostics --------------------------------------------
    content_preview: str = ""  # leading part of the returned text
    error: str = ""  # error body or exception message, if any
    error_pattern: str = ""  # label assigned to the error text, if any
# (substring, label) pairs, checked in this order; the first substring found
# in an error text decides its label.
ERROR_PATTERNS = [
    ("page.evaluate timeout", "page_evaluate_timeout"),
    ("no text token received", "first_token_timeout"),
    ("BrowserResourceInvalidError", "browser_resource_invalid"),
    ("Overloaded", "upstream_overloaded"),
    ("429", "rate_limited"),
    ("AccountFrozenError", "account_frozen"),
]


def classify_error(text: str) -> str:
    """Map an error text to a short label.

    The text is searched (case-sensitively) for each known substring in
    table order.

    Args:
        text: Error body or exception message.

    Returns:
        The label of the first matching entry, or ``"other"`` when no
        substring occurs in ``text``.
    """
    matching_labels = (label for needle, label in ERROR_PATTERNS if needle in text)
    return next(matching_labels, "other")
# ---------------------------------------------------------------------------
# HTTP helpers (stdlib only, no extra deps)
# ---------------------------------------------------------------------------
def do_non_stream_request(base_url: str, api_key: str, model: str, prompt: str, timeout: int) -> RequestResult:
    """Send one non-streaming chat completion request and record the outcome.

    POSTs a single user message to ``{base_url}/claude/v1/chat/completions``
    with ``"stream": False`` and inspects the JSON reply.

    Args:
        base_url: Base URL of the service; a trailing slash is tolerated.
        api_key: Sent as a ``Bearer`` token in the ``Authorization`` header.
        model: Model name placed in the request payload.
        prompt: Text of the single user message.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        A ``RequestResult`` whose ``round_idx``/``req_idx`` are 0 (the caller
        is expected to fill them in). ``success`` is True only when the reply
        carries non-blank text in ``choices[0].message.content``. Any failure
        (HTTP error, transport error, invalid URL, reply without usable
        content) is reported through ``error``/``error_pattern`` instead of
        being raised.
    """
    result = RequestResult(0, 0, model, stream=False)
    url = f"{base_url.rstrip('/')}/claude/v1/chat/completions"
    payload = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
    }).encode()
    t0 = time.monotonic()
    try:
        # Built inside the try block so an unusable URL is recorded as a
        # failed request rather than raised to the caller.
        req = urllib.request.Request(
            url,
            data=payload,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result.ttfb = time.monotonic() - t0
            body = resp.read().decode("utf-8", errors="replace")
            result.total_time = time.monotonic() - t0
            result.status = resp.status

        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            data = None  # handled below as "no usable content"

        # Walk choices[0].message.content defensively: an empty choices list
        # or a null content must not raise.
        choices = data.get("choices") if isinstance(data, dict) else None
        first = choices[0] if isinstance(choices, list) and choices else None
        message = first.get("message") if isinstance(first, dict) else None
        content = message.get("content") if isinstance(message, dict) else None
        if not isinstance(content, str):
            content = ""

        result.content_preview = content[:200]
        result.success = bool(content.strip())
        if not result.success:
            # A 2xx reply without text is still a failure; keep the body so
            # the failure can be classified and shown.
            snippet = body[:500]
            result.error = snippet
            result.error_pattern = classify_error(snippet)
    except urllib.error.HTTPError as e:
        result.total_time = time.monotonic() - t0
        result.status = e.code
        # errors="replace": a non-UTF-8 error page must not raise from here.
        body = e.read().decode("utf-8", errors="replace")[:500]
        result.error = body
        result.error_pattern = classify_error(body)
    except Exception as e:
        # Boundary handler: every other failure becomes a failed result.
        result.total_time = time.monotonic() - t0
        result.error = str(e)[:500]
        result.error_pattern = classify_error(str(e))
    return result
def do_stream_request(base_url: str, api_key: str, model: str, prompt: str, timeout: int) -> RequestResult:
    """Send one streaming chat completion request and record the outcome.

    POSTs a single user message to ``{base_url}/claude/v1/chat/completions``
    with ``"stream": True`` and reads the reply line by line, collecting the
    ``choices[0].delta.content`` text of every ``data:`` line until
    ``[DONE]`` or end of stream.

    Args:
        base_url: Base URL of the service; a trailing slash is tolerated.
        api_key: Sent as a ``Bearer`` token in the ``Authorization`` header.
        model: Model name placed in the request payload.
        prompt: Text of the single user message.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        A ``RequestResult`` whose ``round_idx``/``req_idx`` are 0 (the caller
        is expected to fill them in). ``ttfb`` is the time until the first
        non-empty ``data:`` line other than ``[DONE]``. ``success`` is True
        when text was collected and no error payload was seen in the stream.
        Failures are reported through ``error``/``error_pattern`` instead of
        being raised.
    """
    result = RequestResult(0, 0, model, stream=True)
    url = f"{base_url.rstrip('/')}/claude/v1/chat/completions"
    payload = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
    }).encode()
    t0 = time.monotonic()
    collected: list[str] = []
    stream_error = ""  # raw data line that carried an "error" member
    unparsed = ""  # last data line that was not valid JSON (diagnostics only)
    try:
        # Built inside the try block so an unusable URL is recorded as a
        # failed request rather than raised to the caller.
        req = urllib.request.Request(
            url,
            data=payload,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result.status = resp.status
            first_token = False
            for raw_line in resp:
                line = raw_line.decode("utf-8", errors="replace").strip()
                # The space after "data:" is optional in an event stream.
                if not line.startswith("data:"):
                    continue
                data_str = line[5:].strip()
                if not data_str:
                    continue
                if data_str == "[DONE]":
                    break
                if not first_token:
                    result.ttfb = time.monotonic() - t0
                    first_token = True
                try:
                    chunk = json.loads(data_str)
                except json.JSONDecodeError:
                    # Malformed lines do not fail the request; remember the
                    # last one in case nothing else explains a failure.
                    unparsed = data_str
                    continue
                if not isinstance(chunk, dict):
                    continue
                # NOTE(review): assumes in-stream failures arrive as a JSON
                # object with an "error" member (OpenAI style) — confirm
                # against the server.
                if chunk.get("error"):
                    stream_error = data_str
                    continue
                # Chunks may carry an empty "choices" list or a null delta;
                # those hold no text and must not abort the stream.
                choices = chunk.get("choices")
                first = choices[0] if isinstance(choices, list) and choices else None
                delta = first.get("delta") if isinstance(first, dict) else None
                text = delta.get("content") if isinstance(delta, dict) else None
                if isinstance(text, str) and text:
                    collected.append(text)
        result.total_time = time.monotonic() - t0
        result.content_preview = "".join(collected)[:200]
        result.success = bool(collected) and not stream_error
        if not result.success:
            # The HTTP status is already 2xx here, so the only explanation
            # for the failure is inside the stream itself.
            detail = (stream_error or unparsed or "stream ended without any content")[:500]
            result.error = detail
            result.error_pattern = classify_error(detail)
    except urllib.error.HTTPError as e:
        result.total_time = time.monotonic() - t0
        result.status = e.code
        # errors="replace": a non-UTF-8 error page must not raise from here.
        body = e.read().decode("utf-8", errors="replace")[:500]
        result.error = body
        result.error_pattern = classify_error(body)
    except Exception as e:
        # Boundary handler: every other failure becomes a failed result.
        result.total_time = time.monotonic() - t0
        result.error = str(e)[:500]
        result.error_pattern = classify_error(str(e))
    return result
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def run_single(args, round_idx: int, req_idx: int, prompt: str) -> RequestResult:
    """Execute one request and tag the result with its position in the run.

    Args:
        args: Parsed CLI options; reads ``stream``, ``url``, ``key``,
            ``model`` and ``timeout``.
        round_idx: Zero-based round number stored on the result.
        req_idx: Zero-based request index stored on the result.
        prompt: Prompt text to send.

    Returns:
        The ``RequestResult`` produced by the streaming or non-streaming
        request helper, with ``round_idx`` and ``req_idx`` filled in.
    """
    if args.stream:
        outcome = do_stream_request(args.url, args.key, args.model, prompt, args.timeout)
    else:
        outcome = do_non_stream_request(args.url, args.key, args.model, prompt, args.timeout)
    outcome.round_idx, outcome.req_idx = round_idx, req_idx
    return outcome
def print_result(r: RequestResult) -> None:
    """Print a one-line report for a single request.

    The line shows OK/FAIL, the 1-based round and request numbers, model,
    mode, HTTP status, timings, the error label (if any) and a preview of at
    most 80 characters: the returned content on success, the error text on
    failure.

    Args:
        r: The result to report.
    """
    status = "OK" if r.success else "FAIL"
    mode = "stream" if r.stream else "non-stream"
    raw_preview = r.content_preview if r.success else r.error
    # Flatten line breaks for both content and error text so that each
    # request always occupies exactly one output line.
    preview = raw_preview.replace("\r", " ").replace("\n", " ")[:80]
    pattern = f" [{r.error_pattern}]" if r.error_pattern else ""
    print(
        f" [{status}] R{r.round_idx+1}-{r.req_idx+1} "
        f"{r.model} {mode} "
        f"HTTP {r.status} "
        f"ttfb={r.ttfb:.1f}s total={r.total_time:.1f}s"
        f"{pattern} "
        f"| {preview}"
    )
def print_summary(results: list[RequestResult]) -> None:
    """Print aggregate statistics for a finished run.

    Reports the success/failure counts, timing statistics over the
    successful requests (average, p50, p95 of total time; average and p50 of
    the non-zero TTFB values), a per-label count of error patterns ordered by
    frequency, and a PASS/FAIL verdict based solely on the number of
    ``page_evaluate_timeout`` errors.

    Args:
        results: All request results collected during the run.
    """
    separator = "=" * 60
    succeeded = [r for r in results if r.success]
    total = len(results)
    ok = len(succeeded)
    fail = total - ok

    print(f"\n{separator}")
    print(f"SUMMARY: {ok}/{total} succeeded, {fail} failed")

    # Timing statistics only make sense when at least one request succeeded.
    durations = sorted(r.total_time for r in succeeded)
    first_bytes = sorted(r.ttfb for r in succeeded if r.ttfb > 0)
    if durations:
        count = len(durations)
        avg = sum(durations) / count
        p50 = durations[count // 2]
        p95 = durations[int(count * 0.95)]
        print(f" Total time — avg={avg:.1f}s p50={p50:.1f}s p95={p95:.1f}s")
        if first_bytes:
            ttfb_avg = sum(first_bytes) / len(first_bytes)
            ttfb_p50 = first_bytes[len(first_bytes) // 2]
            print(f" TTFB — avg={ttfb_avg:.1f}s p50={ttfb_p50:.1f}s")

    # Count how often each error label occurred.
    tally: dict[str, int] = {}
    for r in results:
        label = r.error_pattern
        if not label:
            continue
        tally[label] = tally.get(label, 0) + 1
    if tally:
        print(" Error patterns:")
        # Most frequent first; ties keep first-seen order (stable sort).
        for label, occurrences in sorted(tally.items(), key=lambda item: -item[1]):
            print(f" {label}: {occurrences}")

    page_eval = tally.get("page_evaluate_timeout", 0)
    print(f"\n page.evaluate timeout occurrences: {page_eval}")
    if page_eval != 0:
        print(f" FAIL: {page_eval} page.evaluate timeout(s) detected!")
    else:
        print(" PASS: No page.evaluate timeout detected")
    print(f"{separator}")
def main() -> None:
    """Parse CLI options, run the stress rounds, print a summary, and exit.

    Each round submits ``--concurrency`` requests to a thread pool, prints
    every result as it completes, and pauses 2 seconds before the next
    round. After the last round a summary is printed and the process exits
    with status 1 if any request was labelled ``page_evaluate_timeout``,
    otherwise 0.
    """

    def positive_int(raw: str) -> int:
        """argparse type: accept only integers >= 1."""
        try:
            value = int(raw)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
        if value < 1:
            raise argparse.ArgumentTypeError(f"must be >= 1, got {value}")
        return value

    parser = argparse.ArgumentParser(description="Web2API stress test")
    parser.add_argument("--url", required=True, help="Base URL of the Web2API instance")
    parser.add_argument("--key", required=True, help="API key")
    parser.add_argument("--model", default="claude-sonnet-4.6", help="Model to test")
    # The thread pool rejects max_workers < 1, so validate at parse time and
    # report a normal usage error instead of a traceback.
    parser.add_argument("--concurrency", type=positive_int, default=3, help="Concurrent requests per round")
    parser.add_argument("--rounds", type=int, default=3, help="Number of rounds")
    parser.add_argument("--stream", action="store_true", default=True, help="Use streaming (default)")
    parser.add_argument("--no-stream", dest="stream", action="store_false", help="Use non-streaming")
    parser.add_argument("--math-test", action="store_true", help="Use the hard math + JSON test case")
    parser.add_argument("--timeout", type=int, default=600, help="Per-request timeout in seconds")
    args = parser.parse_args()

    prompt = MATH_PROMPT if args.math_test else SIMPLE_PROMPT
    all_results: list[RequestResult] = []

    print(f"Stress test: {args.rounds} rounds x {args.concurrency} concurrent")
    print(f"Target: {args.url}")
    print(f"Model: {args.model} Stream: {args.stream} Math: {args.math_test}")
    print(f"Timeout: {args.timeout}s")
    print()

    for round_idx in range(args.rounds):
        print(f"--- Round {round_idx + 1}/{args.rounds} ---")
        with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
            # Map each future to its request index so a crashed worker can
            # still be attributed to the right request.
            futures = {
                pool.submit(run_single, args, round_idx, i, prompt): i
                for i in range(args.concurrency)
            }
            for future in as_completed(futures):
                try:
                    r = future.result()
                except Exception as exc:
                    # A worker raised instead of returning a result. Record
                    # it as a failed request so the remaining requests and
                    # the final summary are not lost.
                    detail = f"{type(exc).__name__}: {exc}"[:500]
                    r = RequestResult(
                        round_idx,
                        futures[future],
                        args.model,
                        stream=args.stream,
                        error=detail,
                        error_pattern=classify_error(detail),
                    )
                print_result(r)
                all_results.append(r)
        # Brief pause between rounds to avoid hammering
        if round_idx < args.rounds - 1:
            time.sleep(2)

    print_summary(all_results)

    # Exit code: 0 if no page.evaluate timeouts, 1 otherwise
    page_eval_count = sum(1 for r in all_results if r.error_pattern == "page_evaluate_timeout")
    sys.exit(1 if page_eval_count > 0 else 0)


if __name__ == "__main__":
    main()