"""
Grok2API 并发性能测试脚本

测试不同并发级别下的API性能表现
"""
|
|
import argparse
import asyncio
import json
import statistics
import time
from datetime import datetime
from typing import List, Dict, Any

import aiohttp
|
|
|
|
class ConcurrencyTester:
    """Concurrency tester for an OpenAI-compatible chat-completions endpoint.

    Fires batches of requests at ``/v1/chat/completions``, records one result
    dict per request in ``self.results``, and prints aggregate latency and
    throughput statistics.
    """

    def __init__(self, base_url: str, api_key: str = None):
        # Strip a trailing slash so URL joining below never doubles one.
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        # Per-request result dicts; see test_request() for the schema.
        self.results: List[Dict[str, Any]] = []

    async def test_request(self, session: aiohttp.ClientSession, request_id: int) -> Dict[str, Any]:
        """Send one test request and return its result dict.

        The dict always contains "id", "status" ("success" / "error" /
        "timeout" / "exception") and "elapsed" (seconds). Success adds
        "http_status" and "response_length"; failures add "error".
        """
        url = f"{self.base_url}/v1/chat/completions"

        headers = {
            "Content-Type": "application/json"
        }
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        payload = {
            "model": "grok-3-fast",
            "messages": [
                {"role": "user", "content": f"测试请求 #{request_id},请简短回复OK"}
            ],
            "stream": False,
            "max_tokens": 10
        }

        # perf_counter() is monotonic: latencies cannot go negative or jump
        # when the wall clock is adjusted (time.time() can).
        start_time = time.perf_counter()

        try:
            # aiohttp 3.x deprecates passing a bare number as `timeout`;
            # wrap it in ClientTimeout explicitly.
            async with session.post(
                url,
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                status = response.status

                if status == 200:
                    data = await response.json()
                    elapsed = time.perf_counter() - start_time

                    return {
                        "id": request_id,
                        "status": "success",
                        "http_status": status,
                        "elapsed": elapsed,
                        "response_length": len(json.dumps(data))
                    }

                elapsed = time.perf_counter() - start_time
                error_text = await response.text()

                return {
                    "id": request_id,
                    "status": "error",
                    "http_status": status,
                    "elapsed": elapsed,
                    # Truncate huge error bodies so reports stay readable.
                    "error": error_text[:200]
                }

        except asyncio.TimeoutError:
            return {
                "id": request_id,
                "status": "timeout",
                "elapsed": time.perf_counter() - start_time,
                "error": "Request timeout"
            }

        except Exception as e:
            # Connection resets, DNS failures, JSON decode errors, etc.
            return {
                "id": request_id,
                "status": "exception",
                "elapsed": time.perf_counter() - start_time,
                "error": str(e)
            }

    async def run_concurrent_test(self, concurrency: int, total_requests: int):
        """Run ``total_requests`` requests in batches of ``concurrency`` and print stats."""
        print(f"\n{'='*60}")
        print(f"📊 测试配置:并发数 {concurrency}, 总请求数 {total_requests}")
        print(f"{'='*60}")

        connector = aiohttp.TCPConnector(limit=concurrency, limit_per_host=concurrency)
        timeout = aiohttp.ClientTimeout(total=60)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            # Warm-up request (id 0) is deliberately NOT appended to
            # self.results, so it does not skew the statistics.
            print("🔥 预热中...")
            await self.test_request(session, 0)

            print(f"🚀 开始并发测试...")
            start_time = time.perf_counter()

            tasks = []
            for i in range(1, total_requests + 1):
                tasks.append(asyncio.create_task(self.test_request(session, i)))

                # Gather in batches so at most `concurrency` requests
                # are in flight at any moment.
                if len(tasks) >= concurrency:
                    self.results.extend(await asyncio.gather(*tasks))
                    tasks = []

                print(f"  进度: {i}/{total_requests} ({i/total_requests*100:.1f}%)", end='\r')

            # Flush the final partial batch, if any.
            if tasks:
                self.results.extend(await asyncio.gather(*tasks))

            total_time = time.perf_counter() - start_time

            self.print_statistics(concurrency, total_requests, total_time)

    @staticmethod
    def _percentile(sorted_values: List[float], q: float) -> float:
        """Nearest-rank percentile of a non-empty, ascending-sorted list."""
        if len(sorted_values) == 1:
            return sorted_values[0]
        # int(n*q) < n for q < 1; min() guards against any rounding surprise.
        return sorted_values[min(int(len(sorted_values) * q), len(sorted_values) - 1)]

    def print_statistics(self, concurrency: int, total_requests: int, total_time: float):
        """Print aggregate statistics over the accumulated ``self.results``."""
        success_results = [r for r in self.results if r["status"] == "success"]
        error_results = [r for r in self.results if r["status"] != "success"]

        success_count = len(success_results)
        error_count = len(error_results)

        if success_results:
            # Sort once and reuse for min/max and both percentiles
            # (the original sorted the list twice).
            latencies = sorted(r["elapsed"] for r in success_results)
            avg_latency = statistics.mean(latencies)
            min_latency = latencies[0]
            max_latency = latencies[-1]
            p50_latency = statistics.median(latencies)
            p95_latency = self._percentile(latencies, 0.95)
            p99_latency = self._percentile(latencies, 0.99)
        else:
            avg_latency = min_latency = max_latency = p50_latency = p95_latency = p99_latency = 0

        throughput = total_requests / total_time if total_time > 0 else 0
        # Guard the success/failure percentages against total_requests == 0.
        denom = total_requests if total_requests > 0 else 1

        print(f"\n\n{'='*60}")
        print(f"📈 测试结果统计")
        print(f"{'='*60}")
        print(f"  测试时间: {total_time:.2f}s")
        print(f"  总请求数: {total_requests}")
        print(f"  并发数: {concurrency}")
        print(f"")
        print(f"  成功请求: {success_count} ({success_count/denom*100:.1f}%)")
        print(f"  失败请求: {error_count} ({error_count/denom*100:.1f}%)")
        print(f"")
        print(f"  吞吐量: {throughput:.2f} req/s")
        print(f"")
        print(f"  延迟统计:")
        print(f"    最小: {min_latency*1000:.0f}ms")
        print(f"    平均: {avg_latency*1000:.0f}ms")
        print(f"    最大: {max_latency*1000:.0f}ms")
        print(f"    P50: {p50_latency*1000:.0f}ms")
        print(f"    P95: {p95_latency*1000:.0f}ms")
        print(f"    P99: {p99_latency*1000:.0f}ms")

        if error_results:
            print(f"\n  ⚠️ 错误详情:")
            # Count failures grouped by status ("error" / "timeout" / "exception").
            error_types = {}
            for r in error_results:
                error_type = r.get("status", "unknown")
                error_types[error_type] = error_types.get(error_type, 0) + 1

            for error_type, count in error_types.items():
                print(f"    {error_type}: {count}")

        print(f"{'='*60}\n")

        self.print_performance_rating(throughput, avg_latency)

    def print_performance_rating(self, throughput: float, avg_latency: float):
        """Print a star rating for throughput (req/s) and average latency (s)."""
        print(f"🎯 性能评级:")

        # Throughput rating: thresholds are inclusive lower bounds.
        if throughput >= 100:
            rating = "⭐⭐⭐⭐⭐ 优秀"
        elif throughput >= 60:
            rating = "⭐⭐⭐⭐ 良好"
        elif throughput >= 30:
            rating = "⭐⭐⭐ 中等"
        elif throughput >= 10:
            rating = "⭐⭐ 较低"
        else:
            rating = "⭐ 需优化"

        print(f"  吞吐量 ({throughput:.1f} req/s): {rating}")

        # Latency rating: thresholds are exclusive upper bounds in seconds.
        if avg_latency < 0.5:
            rating = "⭐⭐⭐⭐⭐ 优秀"
        elif avg_latency < 1.0:
            rating = "⭐⭐⭐⭐ 良好"
        elif avg_latency < 2.0:
            rating = "⭐⭐⭐ 中等"
        elif avg_latency < 5.0:
            rating = "⭐⭐ 较高"
        else:
            rating = "⭐ 需优化"

        print(f"  平均延迟 ({avg_latency*1000:.0f}ms): {rating}")
        print()
|
|
|
|
async def main():
    """CLI entry point: parse arguments, print the banner, and drive the tester."""
    parser = argparse.ArgumentParser(description='Grok2API 并发性能测试')
    parser.add_argument('--url', default='http://localhost:8000', help='API 基础URL')
    parser.add_argument('--key', default='', help='API Key(可选)')
    parser.add_argument('-c', '--concurrency', type=int, default=10, help='并发数')
    parser.add_argument('-n', '--requests', type=int, default=50, help='总请求数')
    parser.add_argument('--multi-test', action='store_true', help='运行多级并发测试')
    args = parser.parse_args()

    banner = f"""
╔══════════════════════════════════════════════════════════╗
║          Grok2API 并发性能测试工具                        ║
╚══════════════════════════════════════════════════════════╝

🔗 测试目标: {args.url}
🔑 API Key: {'已设置' if args.key else '未设置'}
⏰ 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    print(banner)

    tester = ConcurrencyTester(args.url, args.key)

    if args.multi_test:
        # Escalating (concurrency, total requests) levels for the multi-test run.
        levels = [
            (5, 20),
            (10, 50),
            (20, 100),
            (50, 200),
        ]
        for level_concurrency, level_requests in levels:
            # Clear previous results so each level's statistics stand alone.
            tester.results = []
            await tester.run_concurrent_test(level_concurrency, level_requests)
            # Brief pause between levels to let the server settle.
            await asyncio.sleep(2)
    else:
        await tester.run_concurrent_test(args.concurrency, args.requests)

    print(f"\n✅ 测试完成!")
    print(f"⏰ 结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
|
|
|
| if __name__ == "__main__": |
| try: |
| asyncio.run(main()) |
| except KeyboardInterrupt: |
| print("\n\n⚠️ 测试被用户中断") |
| except Exception as e: |
| print(f"\n\n❌ 测试失败: {e}") |
|
|