"""
Grok2API 并发性能测试脚本

测试不同并发级别下的API性能表现
"""
| |
|
import argparse
import asyncio
import json
import statistics
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import aiohttp
| |
|
| |
|
class ConcurrencyTester:
    """Concurrency load tester for an OpenAI-compatible chat-completions endpoint.

    Fires batches of requests at ``/v1/chat/completions`` and reports
    success rate, throughput, and latency percentiles.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None):
        """
        Args:
            base_url: API base URL; a trailing slash is stripped.
            api_key: optional Bearer token; omitted from headers when falsy.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        # Accumulated per-request result dicts; callers may reset between runs.
        self.results: List[Dict[str, Any]] = []

    async def test_request(self, session: aiohttp.ClientSession, request_id: int) -> Dict[str, Any]:
        """Send one chat-completion request and classify the outcome.

        Returns a dict with ``status`` of "success", "error", "timeout", or
        "exception", plus ``elapsed`` wall-clock seconds and detail fields.
        Never raises — all failures are captured in the result dict.
        """
        url = f"{self.base_url}/v1/chat/completions"

        headers = {
            "Content-Type": "application/json"
        }
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        payload = {
            "model": "grok-3-fast",
            "messages": [
                {"role": "user", "content": f"测试请求 #{request_id},请简短回复OK"}
            ],
            "stream": False,
            "max_tokens": 10
        }

        start_time = time.time()

        try:
            # Fix: aiohttp deprecates passing a bare number as `timeout`;
            # wrap it in ClientTimeout explicitly.
            async with session.post(
                url, json=payload, headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                status = response.status

                if status == 200:
                    data = await response.json()
                    elapsed = time.time() - start_time

                    return {
                        "id": request_id,
                        "status": "success",
                        "http_status": status,
                        "elapsed": elapsed,
                        "response_length": len(json.dumps(data))
                    }
                else:
                    elapsed = time.time() - start_time
                    error_text = await response.text()

                    return {
                        "id": request_id,
                        "status": "error",
                        "http_status": status,
                        "elapsed": elapsed,
                        # Truncate to keep result dicts small.
                        "error": error_text[:200]
                    }

        except asyncio.TimeoutError:
            elapsed = time.time() - start_time
            return {
                "id": request_id,
                "status": "timeout",
                "elapsed": elapsed,
                "error": "Request timeout"
            }

        except Exception as e:
            # Catch-all boundary: any client/connection error is recorded,
            # not raised, so one bad request never aborts the whole run.
            elapsed = time.time() - start_time
            return {
                "id": request_id,
                "status": "exception",
                "elapsed": elapsed,
                "error": str(e)
            }

    async def run_concurrent_test(self, concurrency: int, total_requests: int):
        """Run `total_requests` requests in batches of `concurrency`.

        Performs one warm-up request (discarded), then gathers requests in
        fixed-size batches, extending ``self.results`` as batches complete,
        and finally prints the statistics for the run.
        """
        print(f"\n{'='*60}")
        print(f"📊 测试配置:并发数 {concurrency}, 总请求数 {total_requests}")
        print(f"{'='*60}")

        # Cap both the total and per-host connection pool at the test's
        # concurrency so aiohttp itself doesn't throttle below it.
        connector = aiohttp.TCPConnector(limit=concurrency, limit_per_host=concurrency)
        timeout = aiohttp.ClientTimeout(total=60)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:

            # Warm-up request (id 0); its result is intentionally discarded.
            print("🔥 预热中...")
            await self.test_request(session, 0)

            print(f"🚀 开始并发测试...")
            start_time = time.time()

            tasks = []
            for i in range(1, total_requests + 1):
                task = asyncio.create_task(self.test_request(session, i))
                tasks.append(task)

                # Flush a full batch before scheduling more, bounding
                # in-flight requests to `concurrency`.
                if len(tasks) >= concurrency:
                    results = await asyncio.gather(*tasks)
                    self.results.extend(results)
                    tasks = []

                print(f" 进度: {i}/{total_requests} ({i/total_requests*100:.1f}%)", end='\r')

            # Drain the final partial batch, if any.
            if tasks:
                results = await asyncio.gather(*tasks)
                self.results.extend(results)

            total_time = time.time() - start_time

            self.print_statistics(concurrency, total_requests, total_time)

    def print_statistics(self, concurrency: int, total_requests: int, total_time: float):
        """Print success/error counts, throughput, and latency percentiles
        for the results accumulated in ``self.results``."""
        success_results = [r for r in self.results if r["status"] == "success"]
        error_results = [r for r in self.results if r["status"] != "success"]

        success_count = len(success_results)
        error_count = len(error_results)

        if success_results:
            # Sort once and index for percentiles (original sorted twice);
            # clamp the index so a tiny sample can never go out of range.
            latencies = sorted(r["elapsed"] for r in success_results)
            n = len(latencies)
            avg_latency = statistics.mean(latencies)
            min_latency = latencies[0]
            max_latency = latencies[-1]
            p50_latency = statistics.median(latencies)
            p95_latency = latencies[min(int(n * 0.95), n - 1)]
            p99_latency = latencies[min(int(n * 0.99), n - 1)]
        else:
            avg_latency = min_latency = max_latency = p50_latency = p95_latency = p99_latency = 0

        throughput = total_requests / total_time if total_time > 0 else 0

        print(f"\n\n{'='*60}")
        print(f"📈 测试结果统计")
        print(f"{'='*60}")
        print(f" 测试时间: {total_time:.2f}s")
        print(f" 总请求数: {total_requests}")
        print(f" 并发数: {concurrency}")
        print(f"")
        print(f" 成功请求: {success_count} ({success_count/total_requests*100:.1f}%)")
        print(f" 失败请求: {error_count} ({error_count/total_requests*100:.1f}%)")
        print(f"")
        print(f" 吞吐量: {throughput:.2f} req/s")
        print(f"")
        print(f" 延迟统计:")
        print(f" 最小: {min_latency*1000:.0f}ms")
        print(f" 平均: {avg_latency*1000:.0f}ms")
        print(f" 最大: {max_latency*1000:.0f}ms")
        print(f" P50: {p50_latency*1000:.0f}ms")
        print(f" P95: {p95_latency*1000:.0f}ms")
        print(f" P99: {p99_latency*1000:.0f}ms")

        # Group failures by status kind for a compact breakdown.
        if error_results:
            print(f"\n ⚠️ 错误详情:")
            error_types = {}
            for r in error_results:
                error_type = r.get("status", "unknown")
                error_types[error_type] = error_types.get(error_type, 0) + 1

            for error_type, count in error_types.items():
                print(f" {error_type}: {count}")

        print(f"{'='*60}\n")

        self.print_performance_rating(throughput, avg_latency)

    def print_performance_rating(self, throughput: float, avg_latency: float):
        """Print a star rating for throughput (req/s) and mean latency (s)."""
        print(f"🎯 性能评级:")

        if throughput >= 100:
            rating = "⭐⭐⭐⭐⭐ 优秀"
        elif throughput >= 60:
            rating = "⭐⭐⭐⭐ 良好"
        elif throughput >= 30:
            rating = "⭐⭐⭐ 中等"
        elif throughput >= 10:
            rating = "⭐⭐ 较低"
        else:
            rating = "⭐ 需优化"

        print(f" 吞吐量 ({throughput:.1f} req/s): {rating}")

        if avg_latency < 0.5:
            rating = "⭐⭐⭐⭐⭐ 优秀"
        elif avg_latency < 1.0:
            rating = "⭐⭐⭐⭐ 良好"
        elif avg_latency < 2.0:
            rating = "⭐⭐⭐ 中等"
        elif avg_latency < 5.0:
            rating = "⭐⭐ 较高"
        else:
            rating = "⭐ 需优化"

        print(f" 平均延迟 ({avg_latency*1000:.0f}ms): {rating}")
        print()
| |
|
| |
|
async def main():
    """CLI entry point: parse options, then run one test or the ladder."""
    parser = argparse.ArgumentParser(description='Grok2API 并发性能测试')
    parser.add_argument('--url', default='http://localhost:8000', help='API 基础URL')
    parser.add_argument('--key', default='', help='API Key(可选)')
    parser.add_argument('-c', '--concurrency', type=int, default=10, help='并发数')
    parser.add_argument('-n', '--requests', type=int, default=50, help='总请求数')
    parser.add_argument('--multi-test', action='store_true', help='运行多级并发测试')
    opts = parser.parse_args()

    banner = f"""
╔══════════════════════════════════════════════════════════╗
║              Grok2API 并发性能测试工具                      ║
╚══════════════════════════════════════════════════════════╝

🔗 测试目标: {opts.url}
🔑 API Key: {'已设置' if opts.key else '未设置'}
⏰ 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    print(banner)

    runner = ConcurrencyTester(opts.url, opts.key)

    if not opts.multi_test:
        # Single run at the requested concurrency level.
        await runner.run_concurrent_test(opts.concurrency, opts.requests)
    else:
        # Escalating ladder: (concurrency, total requests) per stage,
        # with a short pause between stages to let the server settle.
        for workers, volume in [(5, 20), (10, 50), (20, 100), (50, 200)]:
            runner.results = []
            await runner.run_concurrent_test(workers, volume)
            await asyncio.sleep(2)

    print(f"\n✅ 测试完成!")
    print(f"⏰ 结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
| |
|
| |
|
| | if __name__ == "__main__": |
| | try: |
| | asyncio.run(main()) |
| | except KeyboardInterrupt: |
| | print("\n\n⚠️ 测试被用户中断") |
| | except Exception as e: |
| | print(f"\n\n❌ 测试失败: {e}") |
| |
|