Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| """ | |
| Performance Testing Script for Multi-Agent Platform API | |
| Simulates load testing for various endpoints and WebSocket connections | |
| """ | |
| import asyncio | |
| import aiohttp | |
| import json | |
| import time | |
| import random | |
| import uuid | |
| from typing import Dict, List, Any, Optional | |
| from dataclasses import dataclass | |
| from statistics import mean, median, stdev | |
| import argparse | |
| class TestResult: | |
| endpoint: str | |
| method: str | |
| status_code: int | |
| response_time: float | |
| timestamp: float | |
| class PerformanceTester: | |
| def __init__(self, base_url: str, auth_token: str): | |
| self.base_url = base_url.rstrip('/') | |
| self.auth_token = auth_token | |
| self.results: List[TestResult] = [] | |
| self.session: Optional[aiohttp.ClientSession] = None | |
| async def __aenter__(self): | |
| self.session = aiohttp.ClientSession( | |
| headers={'Authorization': f'Bearer {self.auth_token}'} | |
| ) | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| if self.session: | |
| await self.session.close() | |
| async def make_request(self, method: str, endpoint: str, data: Dict = None) -> TestResult: | |
| """Make a single request and record metrics""" | |
| url = f"{self.base_url}{endpoint}" | |
| start_time = time.time() | |
| try: | |
| if method.upper() == 'GET': | |
| async with self.session.get(url) as response: | |
| response_time = time.time() - start_time | |
| return TestResult( | |
| endpoint=endpoint, | |
| method=method, | |
| status_code=response.status, | |
| response_time=response_time, | |
| timestamp=start_time | |
| ) | |
| elif method.upper() == 'POST': | |
| async with self.session.post(url, json=data) as response: | |
| response_time = time.time() - start_time | |
| return TestResult( | |
| endpoint=endpoint, | |
| method=method, | |
| status_code=response.status, | |
| response_time=response_time, | |
| timestamp=start_time | |
| ) | |
| except Exception as e: | |
| response_time = time.time() - start_time | |
| return TestResult( | |
| endpoint=endpoint, | |
| method=method, | |
| status_code=0, | |
| response_time=response_time, | |
| timestamp=start_time | |
| ) | |
| async def test_agent_registration(self, num_agents: int = 10) -> List[TestResult]: | |
| """Test agent registration endpoint""" | |
| print(f"Testing agent registration with {num_agents} agents...") | |
| tasks = [] | |
| for i in range(num_agents): | |
| agent_data = { | |
| "name": f"test_agent_{i}", | |
| "capabilities": ["task_execution", "data_processing"], | |
| "resources": {"cpu": 1, "memory": "512Mi"}, | |
| "metadata": {"test": True, "id": i} | |
| } | |
| task = self.make_request('POST', '/api/v1/agents', agent_data) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| self.results.extend(results) | |
| return results | |
| async def test_task_submission(self, num_tasks: int = 20) -> List[TestResult]: | |
| """Test task submission endpoint""" | |
| print(f"Testing task submission with {num_tasks} tasks...") | |
| tasks = [] | |
| for i in range(num_tasks): | |
| task_data = { | |
| "title": f"Test Task {i}", | |
| "description": f"This is test task number {i}", | |
| "priority": random.randint(1, 10), | |
| "required_capabilities": ["task_execution"], | |
| "resources": {"cpu": 0.5, "memory": "256Mi"}, | |
| "metadata": {"test": True, "id": i} | |
| } | |
| task = self.make_request('POST', '/api/v1/tasks', task_data) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| self.results.extend(results) | |
| return results | |
| async def test_list_endpoints(self, num_requests: int = 50) -> List[TestResult]: | |
| """Test list endpoints (agents, tasks, resources, conflicts)""" | |
| print(f"Testing list endpoints with {num_requests} requests...") | |
| endpoints = [ | |
| '/api/v1/agents', | |
| '/api/v1/tasks', | |
| '/api/v1/resources', | |
| '/api/v1/conflicts' | |
| ] | |
| tasks = [] | |
| for _ in range(num_requests): | |
| endpoint = random.choice(endpoints) | |
| task = self.make_request('GET', endpoint) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| self.results.extend(results) | |
| return results | |
| async def test_health_endpoint(self, num_requests: int = 100) -> List[TestResult]: | |
| """Test health endpoint""" | |
| print(f"Testing health endpoint with {num_requests} requests...") | |
| tasks = [] | |
| for _ in range(num_requests): | |
| task = self.make_request('GET', '/api/v1/health') | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| self.results.extend(results) | |
| return results | |
| async def test_dashboard_endpoints(self, num_requests: int = 30) -> List[TestResult]: | |
| """Test dashboard endpoints""" | |
| print(f"Testing dashboard endpoints with {num_requests} requests...") | |
| endpoints = [ | |
| '/api/v1/dashboard/summary', | |
| '/api/v1/dashboard/activity' | |
| ] | |
| tasks = [] | |
| for _ in range(num_requests): | |
| endpoint = random.choice(endpoints) | |
| task = self.make_request('GET', endpoint) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| self.results.extend(results) | |
| return results | |
| async def test_websocket_connections(self, num_connections: int = 10, duration: int = 30): | |
| """Test WebSocket connections""" | |
| print(f"Testing WebSocket connections with {num_connections} connections for {duration}s...") | |
| async def websocket_client(client_id: str): | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| async with session.ws_connect( | |
| f"{self.base_url.replace('http', 'ws')}/api/v1/ws/{client_id}" | |
| ) as ws: | |
| # Send subscription message | |
| await ws.send_json({ | |
| "type": "subscribe", | |
| "subscriptions": ["agent_activity", "task_activity"] | |
| }) | |
| # Keep connection alive and receive messages | |
| start_time = time.time() | |
| message_count = 0 | |
| while time.time() - start_time < duration: | |
| try: | |
| msg = await asyncio.wait_for(ws.receive_json(), timeout=1.0) | |
| message_count += 1 | |
| except asyncio.TimeoutError: | |
| continue | |
| print(f"Client {client_id}: Received {message_count} messages") | |
| except Exception as e: | |
| print(f"WebSocket client {client_id} error: {e}") | |
| # Create multiple WebSocket connections | |
| tasks = [] | |
| for i in range(num_connections): | |
| client_id = f"test_client_{i}" | |
| task = websocket_client(client_id) | |
| tasks.append(task) | |
| await asyncio.gather(*tasks) | |
| def generate_report(self) -> Dict[str, Any]: | |
| """Generate performance test report""" | |
| if not self.results: | |
| return {"error": "No test results available"} | |
| # Group results by endpoint | |
| endpoint_results = {} | |
| for result in self.results: | |
| if result.endpoint not in endpoint_results: | |
| endpoint_results[result.endpoint] = [] | |
| endpoint_results[result.endpoint].append(result) | |
| # Calculate statistics for each endpoint | |
| report = { | |
| "summary": { | |
| "total_requests": len(self.results), | |
| "successful_requests": len([r for r in self.results if r.status_code == 200]), | |
| "failed_requests": len([r for r in self.results if r.status_code != 200]), | |
| "total_duration": max(r.timestamp for r in self.results) - min(r.timestamp for r in self.results) | |
| }, | |
| "endpoints": {} | |
| } | |
| for endpoint, results in endpoint_results.items(): | |
| response_times = [r.response_time for r in results] | |
| status_codes = [r.status_code for r in results] | |
| report["endpoints"][endpoint] = { | |
| "total_requests": len(results), | |
| "successful_requests": len([r for r in results if r.status_code == 200]), | |
| "failed_requests": len([r for r in results if r.status_code != 200]), | |
| "response_time_stats": { | |
| "mean": mean(response_times), | |
| "median": median(response_times), | |
| "min": min(response_times), | |
| "max": max(response_times), | |
| "std_dev": stdev(response_times) if len(response_times) > 1 else 0 | |
| }, | |
| "status_code_distribution": { | |
| str(code): status_codes.count(code) for code in set(status_codes) | |
| } | |
| } | |
| return report | |
| def print_report(self, report: Dict[str, Any]): | |
| """Print formatted performance report""" | |
| print("\n" + "="*60) | |
| print("PERFORMANCE TEST REPORT") | |
| print("="*60) | |
| summary = report["summary"] | |
| print(f"\nSUMMARY:") | |
| print(f" Total Requests: {summary['total_requests']}") | |
| print(f" Successful: {summary['successful_requests']}") | |
| print(f" Failed: {summary['failed_requests']}") | |
| print(f" Success Rate: {summary['successful_requests']/summary['total_requests']*100:.2f}%") | |
| print(f" Total Duration: {summary['total_duration']:.2f}s") | |
| print(f"\nENDPOINT DETAILS:") | |
| for endpoint, stats in report["endpoints"].items(): | |
| print(f"\n {endpoint}:") | |
| print(f" Requests: {stats['total_requests']}") | |
| print(f" Success Rate: {stats['successful_requests']/stats['total_requests']*100:.2f}%") | |
| rt_stats = stats['response_time_stats'] | |
| print(f" Response Time (s):") | |
| print(f" Mean: {rt_stats['mean']:.3f}") | |
| print(f" Median: {rt_stats['median']:.3f}") | |
| print(f" Min: {rt_stats['min']:.3f}") | |
| print(f" Max: {rt_stats['max']:.3f}") | |
| print(f" Std Dev: {rt_stats['std_dev']:.3f}") | |
| print(f" Status Codes: {stats['status_code_distribution']}") | |
| async def main(): | |
| parser = argparse.ArgumentParser(description='Performance test for Multi-Agent Platform API') | |
| parser.add_argument('--base-url', default='http://localhost:8000', help='Base URL of the API') | |
| parser.add_argument('--auth-token', default='test-token', help='Authentication token') | |
| parser.add_argument('--agents', type=int, default=10, help='Number of agents to register') | |
| parser.add_argument('--tasks', type=int, default=20, help='Number of tasks to submit') | |
| parser.add_argument('--list-requests', type=int, default=50, help='Number of list requests') | |
| parser.add_argument('--health-requests', type=int, default=100, help='Number of health requests') | |
| parser.add_argument('--dashboard-requests', type=int, default=30, help='Number of dashboard requests') | |
| parser.add_argument('--websocket-connections', type=int, default=10, help='Number of WebSocket connections') | |
| parser.add_argument('--websocket-duration', type=int, default=30, help='WebSocket test duration in seconds') | |
| parser.add_argument('--output', help='Output file for JSON report') | |
| args = parser.parse_args() | |
| print("Starting Performance Test...") | |
| print(f"Target URL: {args.base_url}") | |
| print(f"Test Configuration:") | |
| print(f" Agents: {args.agents}") | |
| print(f" Tasks: {args.tasks}") | |
| print(f" List Requests: {args.list_requests}") | |
| print(f" Health Requests: {args.health_requests}") | |
| print(f" Dashboard Requests: {args.dashboard_requests}") | |
| print(f" WebSocket Connections: {args.websocket_connections}") | |
| print(f" WebSocket Duration: {args.websocket_duration}s") | |
| async with PerformanceTester(args.base_url, args.auth_token) as tester: | |
| # Run all tests | |
| await tester.test_health_endpoint(args.health_requests) | |
| await tester.test_agent_registration(args.agents) | |
| await tester.test_task_submission(args.tasks) | |
| await tester.test_list_endpoints(args.list_requests) | |
| await tester.test_dashboard_endpoints(args.dashboard_requests) | |
| # Test WebSocket connections | |
| await tester.test_websocket_connections(args.websocket_connections, args.websocket_duration) | |
| # Generate and print report | |
| report = tester.generate_report() | |
| tester.print_report(report) | |
| # Save report to file if specified | |
| if args.output: | |
| with open(args.output, 'w') as f: | |
| json.dump(report, f, indent=2) | |
| print(f"\nReport saved to: {args.output}") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |