# performance_test.py
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict
class PerformanceTester:
    """Load-tests an agent service over HTTP.

    Registers batches of test agents, submits tasks while measuring
    request latency, and reports mean/median/P95/P99 statistics.
    """

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {token}"}

    async def register_test_agents(self, count: int) -> List[str]:
        '''Register multiple test agents concurrently; return their ids.'''
        async with aiohttp.ClientSession() as session:

            async def _register_one(i: int) -> dict:
                # One agent registration; `async with` releases the response
                # connection back to the pool (the original gathered raw
                # post() coroutines and never released them).
                agent_data = {
                    "name": f"TestAgent_{i}",
                    "version": "1.0.0",
                    "capabilities": ["REASONING", "COLLABORATION"],
                    "tags": ["test", "performance"],
                    "resources": {"cpu_cores": 0.5, "memory_mb": 256}
                }
                async with session.post(
                    f"{self.base_url}/api/v2/agents/register",
                    json=agent_data,
                    headers=self.headers
                ) as response:
                    return await response.json()

            results = await asyncio.gather(
                *(_register_one(i) for i in range(count))
            )
        # Skip failed registrations instead of raising KeyError on them.
        return [data["agent_id"] for data in results if "agent_id" in data]

    async def submit_test_tasks(self, count: int) -> List[float]:
        '''Submit multiple tasks sequentially and measure per-request latency.

        Latency covers connect + response headers (body read excluded),
        matching the original measurement point.
        '''
        latencies: List[float] = []
        async with aiohttp.ClientSession() as session:
            for i in range(count):
                task_data = {
                    "task_type": "test_task",
                    "priority": 5,
                    "payload": {"test_id": i},
                    "required_capabilities": ["REASONING"]
                }
                start_time = time.time()
                async with session.post(
                    f"{self.base_url}/api/v2/tasks/submit",
                    json=task_data,
                    headers=self.headers
                ) as response:
                    latencies.append(time.time() - start_time)
                    if response.status != 200:
                        print(f"Task submission failed: {await response.text()}")
        return latencies

    async def run_load_test(self, agent_count: int, task_count: int,
                            concurrent_tasks: int = 10):
        '''Run a load test: register agents, fan out tasks, report stats.

        Returns a dict with total_requests and avg/p50/p95/p99 latencies.
        '''
        print(f"Starting load test with {agent_count} agents and {task_count} tasks")
        # Register agents
        print("Registering agents...")
        agent_ids = await self.register_test_agents(agent_count)
        print(f"Registered {len(agent_ids)} agents")
        # Submit tasks with concurrency control
        print(f"Submitting {task_count} tasks...")
        semaphore = asyncio.Semaphore(concurrent_tasks)

        async def submit_with_limit():
            # NOTE: each call opens its own ClientSession (one per task);
            # kept for behavioral parity, but a shared session would be cheaper.
            async with semaphore:
                return await self.submit_test_tasks(1)

        tasks = [submit_with_limit() for _ in range(task_count)]
        results = await asyncio.gather(*tasks)
        # Flatten results
        all_latencies = [lat for result in results for lat in result]
        # Guard: mean/median raise StatisticsError on empty input, and
        # quantiles() requires at least 2 data points.
        if not all_latencies:
            print("\nNo latencies recorded; nothing to report.")
            return {
                "total_requests": 0,
                "avg_latency": 0.0,
                "p50_latency": 0.0,
                "p95_latency": 0.0,
                "p99_latency": 0.0
            }
        avg_latency = statistics.mean(all_latencies)
        p50_latency = statistics.median(all_latencies)
        if len(all_latencies) >= 2:
            p95_latency = statistics.quantiles(all_latencies, n=20)[18]   # 95th percentile
            p99_latency = statistics.quantiles(all_latencies, n=100)[98]  # 99th percentile
        else:
            # Single sample: every percentile is that sample.
            p95_latency = p99_latency = all_latencies[0]
        print("\nPerformance Results:")
        print(f"Total requests: {len(all_latencies)}")
        print(f"Average latency: {avg_latency:.3f}s")
        print(f"P50 latency: {p50_latency:.3f}s")
        print(f"P95 latency: {p95_latency:.3f}s")
        print(f"P99 latency: {p99_latency:.3f}s")
        return {
            "total_requests": len(all_latencies),
            "avg_latency": avg_latency,
            "p50_latency": p50_latency,
            "p95_latency": p95_latency,
            "p99_latency": p99_latency
        }
async def main():
    """Run a series of escalating load-test scenarios against a local service."""
    tester = PerformanceTester("http://localhost:8080", "valid_token")
    # Each scenario: (agent_count, task_count, concurrent_tasks)
    scenarios = [
        (10, 100, 10),    # 10 agents, 100 tasks, 10 concurrent
        (50, 500, 20),    # 50 agents, 500 tasks, 20 concurrent
        (100, 1000, 50),  # 100 agents, 1000 tasks, 50 concurrent
    ]
    for agent_count, task_count, concurrent in scenarios:
        print(f"\n{'='*50}")
        # run_load_test prints its own report; the returned dict was unused.
        await tester.run_load_test(agent_count, task_count, concurrent)
        print(f"{'='*50}\n")


if __name__ == "__main__":
    asyncio.run(main())