Spaces:
Build error
Build error
| # performance_test.py | |
| import asyncio | |
| import aiohttp | |
| import time | |
| import statistics | |
| from typing import List, Dict | |
| class PerformanceTester: | |
| def __init__(self, base_url: str, token: str): | |
| self.base_url = base_url | |
| self.headers = {"Authorization": f"Bearer {token}"} | |
| async def register_test_agents(self, count: int) -> List[str]: | |
| '''Register multiple test agents''' | |
| agent_ids = [] | |
| async with aiohttp.ClientSession() as session: | |
| tasks = [] | |
| for i in range(count): | |
| agent_data = { | |
| "name": f"TestAgent_{i}", | |
| "version": "1.0.0", | |
| "capabilities": ["REASONING", "COLLABORATION"], | |
| "tags": ["test", "performance"], | |
| "resources": {"cpu_cores": 0.5, "memory_mb": 256} | |
| } | |
| task = session.post( | |
| f"{self.base_url}/api/v2/agents/register", | |
| json=agent_data, | |
| headers=self.headers | |
| ) | |
| tasks.append(task) | |
| responses = await asyncio.gather(*tasks) | |
| for response in responses: | |
| data = await response.json() | |
| agent_ids.append(data["agent_id"]) | |
| return agent_ids | |
| async def submit_test_tasks(self, count: int) -> List[float]: | |
| '''Submit multiple tasks and measure latency''' | |
| latencies = [] | |
| async with aiohttp.ClientSession() as session: | |
| for i in range(count): | |
| task_data = { | |
| "task_type": "test_task", | |
| "priority": 5, | |
| "payload": {"test_id": i}, | |
| "required_capabilities": ["REASONING"] | |
| } | |
| start_time = time.time() | |
| response = await session.post( | |
| f"{self.base_url}/api/v2/tasks/submit", | |
| json=task_data, | |
| headers=self.headers | |
| ) | |
| latency = time.time() - start_time | |
| latencies.append(latency) | |
| if response.status != 200: | |
| print(f"Task submission failed: {await response.text()}") | |
| return latencies | |
| async def run_load_test(self, agent_count: int, task_count: int, | |
| concurrent_tasks: int = 10): | |
| '''Run a load test''' | |
| print(f"Starting load test with {agent_count} agents and {task_count} tasks") | |
| # Register agents | |
| print("Registering agents...") | |
| agent_ids = await self.register_test_agents(agent_count) | |
| print(f"Registered {len(agent_ids)} agents") | |
| # Submit tasks with concurrency control | |
| print(f"Submitting {task_count} tasks...") | |
| semaphore = asyncio.Semaphore(concurrent_tasks) | |
| async def submit_with_limit(): | |
| async with semaphore: | |
| return await self.submit_test_tasks(1) | |
| tasks = [submit_with_limit() for _ in range(task_count)] | |
| results = await asyncio.gather(*tasks) | |
| # Flatten results | |
| all_latencies = [lat for result in results for lat in result] | |
| # Calculate statistics | |
| avg_latency = statistics.mean(all_latencies) | |
| p50_latency = statistics.median(all_latencies) | |
| p95_latency = statistics.quantiles(all_latencies, n=20)[18] # 95th percentile | |
| p99_latency = statistics.quantiles(all_latencies, n=100)[98] # 99th percentile | |
| print(f"\nPerformance Results:") | |
| print(f"Total requests: {len(all_latencies)}") | |
| print(f"Average latency: {avg_latency:.3f}s") | |
| print(f"P50 latency: {p50_latency:.3f}s") | |
| print(f"P95 latency: {p95_latency:.3f}s") | |
| print(f"P99 latency: {p99_latency:.3f}s") | |
| return { | |
| "total_requests": len(all_latencies), | |
| "avg_latency": avg_latency, | |
| "p50_latency": p50_latency, | |
| "p95_latency": p95_latency, | |
| "p99_latency": p99_latency | |
| } | |
| async def main(): | |
| tester = PerformanceTester("http://localhost:8080", "valid_token") | |
| # Run different load scenarios | |
| scenarios = [ | |
| (10, 100, 10), # 10 agents, 100 tasks, 10 concurrent | |
| (50, 500, 20), # 50 agents, 500 tasks, 20 concurrent | |
| (100, 1000, 50), # 100 agents, 1000 tasks, 50 concurrent | |
| ] | |
| for agent_count, task_count, concurrent in scenarios: | |
| print(f"\n{'='*50}") | |
| results = await tester.run_load_test(agent_count, task_count, concurrent) | |
| print(f"{'='*50}\n") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |