| |
| """ |
| Web2API HF Space stress test. |
| |
| Usage: |
| python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY |
| python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY --concurrency 3 --rounds 3 |
| python scripts/stress_test.py --url https://ohmyapi-web2api.hf.space --key YOUR_KEY --math-test |
| """ |
|
|
| import argparse |
| import json |
| import sys |
| import time |
| import urllib.error |
| import urllib.request |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from dataclasses import dataclass, field |
|
|
| |
| |
| |
|
|
# Default prompt: used whenever --math-test is not passed. It asks for a fixed
# marker string so a healthy reply is trivial to recognise in the preview.
SIMPLE_PROMPT = "Reply with exactly: STRESS_TEST_OK"
|
|
| |
# Prompt selected by --math-test. The text (in Chinese) poses a hard
# recurrence-sequence problem and then asks the model to answer, in JSON,
# with the math answer plus its own name, version, knowledge cutoff and vendor.
MATH_PROMPT = """\
首先我想请你回答一道困难的计算题:
设实数列 {x_n} 满足: x_0=0, x_1=3√2, x_2 是正整数,且
x_{n+1} = (1/∛4) x_n + ∛4 x_{n-1} + (1/2) x_{n-2} (n≥2).
问:这类数列中最少有多少个整数项?

计算出答案之后请使用 JSON 格式回答以下所有问题:
{
"math_answer": "上个计算题的答案",
"model_name": "你是什么模型",
"model_version": "版本号多少",
"knowledge_cutoff": "你的知识截止日期是什么时候",
"company": "训练和发布你的公司是什么"
}
"""
|
|
| |
|
|
| |
| |
| |
|
|
@dataclass
class RequestResult:
    """Outcome of a single stress-test request.

    The first four fields identify the request and must be supplied; the
    remaining fields start at "nothing recorded" defaults and are filled in
    by the request functions as the call progresses.
    """

    # --- identity of the request ---
    round_idx: int  # zero-based round number (printed one-based)
    req_idx: int  # zero-based slot within the round (printed one-based)
    model: str  # model name sent in the payload
    stream: bool  # True when the streaming code path was used

    # --- outcome ---
    success: bool = False  # set by the request functions when reply text was received
    status: int = 0  # HTTP status code; stays 0 if no response was obtained
    ttfb: float = 0.0  # seconds until the first response/event was seen
    total_time: float = 0.0  # seconds for the whole request
    content_preview: str = ""  # leading part of the reply text
    error: str = ""  # truncated error body or exception text
    error_pattern: str = ""  # label produced by classify_error, "" if unset
|
|
|
|
# Ordered (substring, label) pairs. Order matters: the first substring found
# in an error text decides its label, so more specific markers come first.
ERROR_PATTERNS = [
    ("page.evaluate timeout", "page_evaluate_timeout"),
    ("no text token received", "first_token_timeout"),
    ("BrowserResourceInvalidError", "browser_resource_invalid"),
    ("Overloaded", "upstream_overloaded"),
    ("429", "rate_limited"),
    ("AccountFrozenError", "account_frozen"),
]


def classify_error(text: str) -> str:
    """Map an error text to a short label.

    Scans ``ERROR_PATTERNS`` in order and returns the label of the first
    entry whose substring occurs in ``text`` (case-sensitive). Returns
    ``"other"`` when nothing matches, including for an empty string.
    """
    matches = (label for needle, label in ERROR_PATTERNS if needle in text)
    return next(matches, "other")
|
|
|
|
| |
| |
| |
|
|
def do_non_stream_request(base_url: str, api_key: str, model: str, prompt: str, timeout: int) -> RequestResult:
    """Send one non-streaming chat-completion request and record its outcome.

    POSTs ``prompt`` as a single user message to
    ``{base_url}/claude/v1/chat/completions`` with ``"stream": False``.

    Args:
        base_url: Base URL of the instance; a trailing slash is tolerated.
        api_key: Sent as a ``Bearer`` token in the ``Authorization`` header.
        model: Model name placed in the request payload.
        prompt: Text of the user message.
        timeout: Timeout in seconds handed to ``urllib.request.urlopen``.

    Returns:
        A ``RequestResult`` whose ``round_idx``/``req_idx`` are left at 0 for
        the caller to fill in. ``success`` is true only when the first choice
        carries non-blank string content. On every failure ``error`` holds up
        to 500 characters of the response body or exception text, and
        ``error_pattern`` holds the label from ``classify_error`` computed on
        the untruncated text.

    Network, HTTP and decoding problems are reported through the returned
    result rather than raised.
    """
    result = RequestResult(0, 0, model, stream=False)
    url = f"{base_url.rstrip('/')}/claude/v1/chat/completions"
    payload = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
    }).encode()
    req = urllib.request.Request(
        url,
        data=payload,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    t0 = time.monotonic()
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result.ttfb = time.monotonic() - t0
            result.status = resp.status
            # errors="replace": a stray non-UTF-8 byte must not hide the reply.
            body = resp.read().decode("utf-8", errors="replace")
        result.total_time = time.monotonic() - t0

        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            data = None  # e.g. an HTML error page served with a 2xx status

        # Walk choices[0].message.content defensively: "choices": [] and
        # "content": null are treated as "no content" instead of raising.
        content = ""
        if isinstance(data, dict):
            choices = data.get("choices")
            first = choices[0] if isinstance(choices, list) and choices else None
            message = first.get("message") if isinstance(first, dict) else None
            candidate = message.get("content") if isinstance(message, dict) else None
            if isinstance(candidate, str):
                content = candidate

        result.content_preview = content[:200]
        result.success = bool(content.strip())
        if not result.success:
            # A 2xx reply without usable text is still a failure; keep the
            # body so the report shows why and the pattern can be counted.
            result.error = body[:500]
            result.error_pattern = classify_error(body)
    except urllib.error.HTTPError as e:
        result.total_time = time.monotonic() - t0
        result.status = e.code
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            # Error body could not be read (connection dropped); fall back to
            # the exception text so this handler itself never raises.
            body = str(e)
        result.error = body[:500]
        # Classify the full body: the marker may sit past the 500-char preview.
        result.error_pattern = classify_error(body)
    except Exception as e:
        result.total_time = time.monotonic() - t0
        message_text = str(e) or repr(e)  # some exceptions stringify to ""
        result.error = message_text[:500]
        result.error_pattern = classify_error(message_text)
    return result
|
|
|
|
def do_stream_request(base_url: str, api_key: str, model: str, prompt: str, timeout: int) -> RequestResult:
    """Send one streaming chat-completion request and record its outcome.

    POSTs ``prompt`` as a single user message to
    ``{base_url}/claude/v1/chat/completions`` with ``"stream": True`` and
    reads the reply as server-sent events, concatenating every
    ``choices[0].delta.content`` string until ``[DONE]`` or end of stream.

    Args:
        base_url: Base URL of the instance; a trailing slash is tolerated.
        api_key: Sent as a ``Bearer`` token in the ``Authorization`` header.
        model: Model name placed in the request payload.
        prompt: Text of the user message.
        timeout: Timeout in seconds handed to ``urllib.request.urlopen``.

    Returns:
        A ``RequestResult`` whose ``round_idx``/``req_idx`` are left at 0 for
        the caller to fill in. ``ttfb`` is the time to the first ``data``
        event; ``success`` is true when at least one text delta arrived.
        ``error``/``error_pattern`` are filled from in-stream ``error``
        payloads when present, otherwise (if no text arrived) from whatever
        non-content lines the server sent.

    Network, HTTP and decoding problems are reported through the returned
    result rather than raised.
    """
    result = RequestResult(0, 0, model, stream=True)
    url = f"{base_url.rstrip('/')}/claude/v1/chat/completions"
    payload = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
    }).encode()
    req = urllib.request.Request(
        url,
        data=payload,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    t0 = time.monotonic()
    collected: list[str] = []
    stream_errors: list[str] = []  # "error" payloads carried inside the stream
    leftovers: list[str] = []  # unparsed / non-event lines, kept for diagnostics
    max_leftovers = 20  # bound memory if the server sends a long non-SSE body
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result.status = resp.status
            saw_event = False
            for raw_line in resp:
                line = raw_line.decode("utf-8", errors="replace").strip()
                if not line or line.startswith(":"):
                    continue  # blank separator or SSE comment / keep-alive
                if not line.startswith("data:"):
                    if len(leftovers) < max_leftovers:
                        leftovers.append(line)
                    continue
                # The space after "data:" is optional in the SSE format.
                data_str = line[5:].lstrip()
                if not data_str:
                    continue
                if data_str == "[DONE]":
                    break
                if not saw_event:
                    result.ttfb = time.monotonic() - t0
                    saw_event = True
                try:
                    chunk = json.loads(data_str)
                except json.JSONDecodeError:
                    # Not JSON: possibly a plain-text error; keep it around.
                    if len(leftovers) < max_leftovers:
                        leftovers.append(data_str)
                    continue
                if not isinstance(chunk, dict):
                    continue
                err = chunk.get("error")
                if err:
                    stream_errors.append(
                        err if isinstance(err, str) else json.dumps(err, ensure_ascii=False)
                    )
                # Chunks may carry "choices": [] (e.g. usage-only chunks) or a
                # null delta/content; none of those may abort the stream.
                choices = chunk.get("choices")
                first = choices[0] if isinstance(choices, list) and choices else None
                delta = first.get("delta") if isinstance(first, dict) else None
                text = delta.get("content") if isinstance(delta, dict) else None
                if isinstance(text, str) and text:
                    collected.append(text)
        result.total_time = time.monotonic() - t0
        result.content_preview = "".join(collected)[:200]
        result.success = bool(collected)

        if stream_errors:
            # Explicit error events are recorded even if some text arrived.
            detail = " | ".join(stream_errors)
        elif not collected:
            detail = " ".join(leftovers) or "stream ended without any content"
        else:
            detail = ""
        if detail:
            result.error = detail[:500]
            result.error_pattern = classify_error(detail)
    except urllib.error.HTTPError as e:
        result.total_time = time.monotonic() - t0
        result.status = e.code
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            # Error body could not be read (connection dropped); fall back to
            # the exception text so this handler itself never raises.
            body = str(e)
        result.error = body[:500]
        # Classify the full body: the marker may sit past the 500-char preview.
        result.error_pattern = classify_error(body)
    except Exception as e:
        result.total_time = time.monotonic() - t0
        # Keep any error event seen before the connection failed.
        detail = " | ".join([*stream_errors, str(e) or repr(e)])
        result.error = detail[:500]
        result.error_pattern = classify_error(detail)
    return result
|
|
|
|
| |
| |
| |
|
|
def run_single(args, round_idx: int, req_idx: int, prompt: str) -> RequestResult:
    """Run one request for the given round/slot and tag the result with them.

    ``args`` is the parsed CLI namespace; its ``stream`` flag selects the
    streaming or non-streaming request function, and ``url``, ``key``,
    ``model`` and ``timeout`` are forwarded to it.
    """
    if args.stream:
        send = do_stream_request
    else:
        send = do_non_stream_request
    outcome = send(args.url, args.key, args.model, prompt, args.timeout)
    # The request functions create the result with placeholder indices.
    outcome.round_idx, outcome.req_idx = round_idx, req_idx
    return outcome
|
|
|
|
def print_result(r: RequestResult) -> None:
    """Print a one-line report for a single request.

    The line shows OK/FAIL, the one-based round and request numbers, model,
    mode, HTTP status, timings, the error pattern (if any) and an 80-character
    preview: the reply text on success, the error text otherwise.
    """
    status = "OK" if r.success else "FAIL"
    mode = "stream" if r.stream else "non-stream"
    detail = r.content_preview if r.success else r.error
    # Flatten line breaks for errors as well as content: error bodies are
    # often multi-line JSON/HTML and would otherwise split the report line.
    preview = detail.replace("\r", " ").replace("\n", " ")[:80]
    pattern = f" [{r.error_pattern}]" if r.error_pattern else ""
    print(
        f" [{status}] R{r.round_idx+1}-{r.req_idx+1} "
        f"{r.model} {mode} "
        f"HTTP {r.status} "
        f"ttfb={r.ttfb:.1f}s total={r.total_time:.1f}s"
        f"{pattern} "
        f"| {preview}"
    )
|
|
|
|
def print_summary(results: list[RequestResult]) -> None:
    """Print aggregate statistics for all collected results.

    Reports the success/failure counts, average/p50/p95 total time and
    average/p50 TTFB over successful requests, a count per error pattern
    (most frequent first), and a PASS/FAIL verdict based solely on the
    number of ``page_evaluate_timeout`` results.
    """
    succeeded = [r for r in results if r.success]
    total = len(results)
    ok = len(succeeded)
    fail = total - ok
    durations = sorted(r.total_time for r in succeeded)
    first_bytes = sorted(r.ttfb for r in succeeded if r.ttfb > 0)
    rule = "=" * 60

    print(f"\n{rule}")
    print(f"SUMMARY: {ok}/{total} succeeded, {fail} failed")
    if durations:
        count = len(durations)
        mean = sum(durations) / count
        median = durations[count // 2]
        p95 = durations[int(count * 0.95)]
        print(f" Total time — avg={mean:.1f}s p50={median:.1f}s p95={p95:.1f}s")
        if first_bytes:
            ttfb_mean = sum(first_bytes) / len(first_bytes)
            ttfb_median = first_bytes[len(first_bytes) // 2]
            print(f" TTFB — avg={ttfb_mean:.1f}s p50={ttfb_median:.1f}s")

    # Tally error patterns; results without a pattern are not counted.
    tally: dict[str, int] = {}
    for r in results:
        label = r.error_pattern
        if not label:
            continue
        tally[label] = tally.get(label, 0) + 1
    if tally:
        print(" Error patterns:")
        # Stable sort by descending count: ties keep first-seen order.
        ranked = sorted(tally.items(), key=lambda item: -item[1])
        for label, hits in ranked:
            print(f" {label}: {hits}")

    page_eval = tally.get("page_evaluate_timeout", 0)
    print(f"\n page.evaluate timeout occurrences: {page_eval}")
    if page_eval != 0:
        print(f" FAIL: {page_eval} page.evaluate timeout(s) detected!")
    else:
        print(" PASS: No page.evaluate timeout detected")
    print(rule)
|
|
|
|
def main() -> None:
    """Parse CLI options, run the stress rounds, print the summary and exit.

    Each round submits ``--concurrency`` requests to a fresh thread pool and
    prints every result as soon as it completes; consecutive rounds are
    separated by a 2-second pause. The process exits with status 1 when any
    request was classified as ``page_evaluate_timeout`` and 0 otherwise.
    """
    cli = argparse.ArgumentParser(description="Web2API stress test")
    add = cli.add_argument
    add("--url", required=True, help="Base URL of the Web2API instance")
    add("--key", required=True, help="API key")
    add("--model", default="claude-sonnet-4.6", help="Model to test")
    add("--concurrency", type=int, default=3, help="Concurrent requests per round")
    add("--rounds", type=int, default=3, help="Number of rounds")
    # --stream / --no-stream share one destination; streaming is the default.
    add("--stream", action="store_true", default=True, help="Use streaming (default)")
    add("--no-stream", dest="stream", action="store_false", help="Use non-streaming")
    add("--math-test", action="store_true", help="Use the hard math + JSON test case")
    add("--timeout", type=int, default=600, help="Per-request timeout in seconds")
    args = cli.parse_args()

    if args.math_test:
        prompt = MATH_PROMPT
    else:
        prompt = SIMPLE_PROMPT
    all_results: list[RequestResult] = []

    print(f"Stress test: {args.rounds} rounds x {args.concurrency} concurrent")
    print(f"Target: {args.url}")
    print(f"Model: {args.model} Stream: {args.stream} Math: {args.math_test}")
    print(f"Timeout: {args.timeout}s")
    print()

    for round_idx in range(args.rounds):
        print(f"--- Round {round_idx + 1}/{args.rounds} ---")
        with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
            pending = [
                pool.submit(run_single, args, round_idx, slot, prompt)
                for slot in range(args.concurrency)
            ]
            # Report in completion order, not submission order.
            for finished in as_completed(pending):
                outcome = finished.result()
                print_result(outcome)
                all_results.append(outcome)

        # Pause between rounds, but not after the last one.
        if round_idx < args.rounds - 1:
            time.sleep(2)

    print_summary(all_results)

    saw_page_eval_timeout = any(
        r.error_pattern == "page_evaluate_timeout" for r in all_results
    )
    sys.exit(1 if saw_page_eval_timeout else 0)


if __name__ == "__main__":
    main()
|
|