Spaces:
Configuration error
Configuration error
| """ | |
| Load and Performance Tests for AetheroOS | |
| """ | |
| import pytest | |
| import asyncio | |
| pytestmark = pytest.mark.skip(reason="aiohttp not supported on Python 3.13") | |
| # import aiohttp temporarily removed due to Python 3.13 compatibility issues | |
| import time | |
| from datetime import datetime | |
| import logging | |
| import statistics | |
| from typing import Dict, List, Any | |
| import concurrent.futures | |
| import psutil | |
| import docker | |
# Configure the root logger at import time so INFO-level messages emitted
# while this module runs are not filtered out.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)
class TestLoadPerformance:
    """Load, resource-usage and resilience checks for a running AetheroOS stack.

    Every test talks to live services: HTTP endpoints on ``localhost:8000``
    and ``localhost:9091``, and the local Docker daemon.

    NOTE(review): no ``@pytest.fixture`` or async-test decorators are visible
    on these methods in this copy of the file. pytest only injects
    ``http_client`` / ``docker_client`` when they are registered as fixtures,
    and ``async def`` tests need an async plugin to be awaited -- confirm how
    the module is wired up before re-enabling it.
    """

    @staticmethod
    async def _timed_request(
        open_request,
        expected_status: int,
        read_json: bool = False,
    ) -> Dict[str, Any]:
        """Run one HTTP request and report whether it succeeded and how long it took.

        Args:
            open_request: Zero-argument callable returning the async context
                manager for the request (e.g. ``lambda: client.get(url)``).
                It is called inside the ``try`` so that errors raised while
                creating the request are recorded as failures too.
            expected_status: Status code that counts as success.
            read_json: When true, also decode the response body as JSON.

        Returns:
            ``{"success": True, "duration": s}`` on success, otherwise
            ``{"success": False, "duration": s, "error": str(exc)}``.
        """
        # perf_counter is monotonic, so measured intervals cannot go negative
        # or jump if the wall clock is adjusted mid-test.
        started = time.perf_counter()
        try:
            async with open_request() as response:
                # A wrong status raises AssertionError, which is recorded as
                # a failed request below rather than aborting the whole test.
                assert response.status == expected_status
                if read_json:
                    await response.json()
                elapsed = time.perf_counter() - started
            return {"success": True, "duration": elapsed}
        except Exception as exc:
            return {
                "success": False,
                "duration": time.perf_counter() - started,
                "error": str(exc),
            }

    @staticmethod
    def _summarize(results: List[Dict[str, Any]]):
        """Return ``(success_rate, durations_of_successful_requests)``.

        An empty ``results`` list yields a success rate of ``0.0`` instead of
        raising ``ZeroDivisionError``.
        """
        durations = [r["duration"] for r in results if r["success"]]
        success_rate = len(durations) / len(results) if results else 0.0
        return success_rate, durations

    async def http_client(self):
        """Yield an ``aiohttp.ClientSession`` that is closed after use.

        The module-level ``import aiohttp`` is commented out, so the package
        is resolved here; if it cannot be imported the requesting test is
        skipped instead of failing with ``NameError``.
        """
        aiohttp = pytest.importorskip("aiohttp")
        async with aiohttp.ClientSession() as session:
            yield session

    def docker_client(self):
        """Return a Docker client configured from the environment."""
        return docker.from_env()

    async def test_concurrent_agent_operations(self, http_client):
        """Test system performance under concurrent agent operations"""
        num_requests = 100
        concurrent_limit = 10

        # Cap the number of in-flight requests. The semaphore is acquired
        # before the timer starts, so queueing time is not counted as latency.
        in_flight = asyncio.Semaphore(concurrent_limit)

        async def make_request(task_id: int) -> Dict[str, Any]:
            task_data = {
                "directive": f"Test directive {task_id}",
                "priority": "high",
                "context": {"test_id": task_id},
            }
            async with in_flight:
                return await self._timed_request(
                    lambda: http_client.post(
                        "http://localhost:8000/api/v1/plan",
                        json=task_data,
                    ),
                    expected_status=200,
                    read_json=True,
                )

        # gather() returns one result per request, in submission order, so no
        # completed request is dropped from the statistics.
        results = await asyncio.gather(
            *(make_request(i) for i in range(num_requests))
        )

        # Analyze results
        success_rate, durations = self._summarize(results)
        assert success_rate >= 0.95, f"Success rate below threshold: {success_rate}"
        assert statistics.mean(durations) < 2.0, "Average response time too high"

    async def test_memory_system_load(self, http_client):
        """Test memory system performance under load"""
        num_operations = 1000
        batch_size = 50

        async def batch_operation(batch_id: int) -> List[Dict[str, Any]]:
            # Requests inside one batch run sequentially; batches run
            # concurrently with each other.
            outcomes = []
            for i in range(batch_size):
                data = {
                    "agent_id": f"test_agent_{batch_id}_{i}",
                    # NOTE: utcnow() is deprecated since Python 3.12. It is
                    # kept because an aware datetime would change the
                    # serialized timestamp (adds a UTC offset suffix).
                    "timestamp": datetime.utcnow().isoformat(),
                    "state": "processing",
                    "data": {"test": f"data_{batch_id}_{i}"},
                }
                outcomes.append(
                    await self._timed_request(
                        lambda payload=data: http_client.post(
                            "http://localhost:9091/api/v1/states",
                            json=payload,
                        ),
                        expected_status=201,
                    )
                )
            return outcomes

        # Execute batched operations
        batch_results = await asyncio.gather(
            *(
                batch_operation(offset // batch_size)
                for offset in range(0, num_operations, batch_size)
            )
        )
        results = [outcome for batch in batch_results for outcome in batch]

        # Analyze results
        success_rate, durations = self._summarize(results)
        assert success_rate >= 0.95, f"Memory system success rate below threshold: {success_rate}"
        assert statistics.mean(durations) < 0.1, "Memory system average response time too high"

    async def test_metrics_collection_performance(self, http_client):
        """Test metrics collection system under load"""
        num_metrics = 1000
        collection_interval = 0.1  # seconds between consecutive sends

        async def send_metrics(metric_id: int) -> Dict[str, Any]:
            metrics = {
                "test_metric": metric_id,
                "timestamp": time.time(),  # wall-clock time is part of the payload
                "value": metric_id % 100,
            }
            return await self._timed_request(
                lambda: http_client.post(
                    "http://localhost:9091/metrics/job/load_test",
                    json=metrics,
                ),
                expected_status=200,
            )

        # Send metrics with controlled rate: start one request, then pause
        # before starting the next. Requests may overlap if they are slow.
        tasks = []
        for i in range(num_metrics):
            tasks.append(asyncio.create_task(send_metrics(i)))
            await asyncio.sleep(collection_interval)
        results = await asyncio.gather(*tasks)

        # Analyze results
        success_rate, durations = self._summarize(results)
        assert success_rate >= 0.95, f"Metrics collection success rate below threshold: {success_rate}"
        assert statistics.mean(durations) < 0.05, "Metrics collection average time too high"

    def test_system_resource_usage(self, docker_client):
        """Test system resource usage under normal operation"""
        containers = docker_client.containers.list(
            filters={"name": "aetheros_"}
        )
        # If no container matches the filter the loop body never runs and the
        # test passes without checking anything.
        for container in containers:
            stats = container.stats(stream=False)
            cpu_stats = stats["cpu_stats"]
            precpu_stats = stats["precpu_stats"]

            # Calculate CPU usage as the container's share of the system CPU
            # time elapsed between the previous and the current sample.
            cpu_delta = (
                cpu_stats["cpu_usage"]["total_usage"]
                - precpu_stats["cpu_usage"]["total_usage"]
            )
            system_delta = (
                cpu_stats["system_cpu_usage"]
                - precpu_stats["system_cpu_usage"]
            )
            # Identical system counters in both samples would otherwise raise
            # ZeroDivisionError; report that case as 0% usage.
            cpu_usage = (cpu_delta / system_delta) * 100.0 if system_delta > 0 else 0.0

            # Calculate memory usage as a percentage of the reported limit,
            # guarding against a zero limit.
            memory_stats = stats["memory_stats"]
            memory_limit = memory_stats["limit"]
            memory_usage = (
                memory_stats["usage"] / memory_limit * 100.0 if memory_limit else 0.0
            )

            assert cpu_usage < 80.0, f"High CPU usage in {container.name}: {cpu_usage}%"
            assert memory_usage < 80.0, f"High memory usage in {container.name}: {memory_usage}%"

    async def test_network_resilience(self, http_client, docker_client):
        """Test system resilience under network stress"""
        # Simulate network latency
        container = docker_client.containers.get("aetheros_mem")
        injected = container.exec_run(
            "tc qdisc add dev eth0 root netem delay 100ms 10ms distribution normal"
        )
        # If the tc command failed, no latency is applied and a pass below
        # says nothing about resilience. Warn rather than fail, so behaviour
        # stays best-effort as before.
        exit_code = getattr(injected, "exit_code", None)
        if exit_code:
            logger.warning(
                "Latency injection exited with code %s; test runs without added delay",
                exit_code,
            )
        try:
            # Test operations under latency
            async def probe_health() -> Dict[str, Any]:
                return await self._timed_request(
                    lambda: http_client.get("http://localhost:9091/health"),
                    expected_status=200,
                )

            # Execute multiple operations
            results = await asyncio.gather(*(probe_health() for _ in range(50)))

            # Analyze results
            success_rate, _ = self._summarize(results)
            assert success_rate >= 0.9, f"Network resilience test failed: {success_rate}"
        finally:
            # Remove network latency
            container.exec_run("tc qdisc del dev eth0 root")

    async def test_recovery_time(self, http_client, docker_client):
        """Test system recovery time after component restart"""
        container = docker_client.containers.get("aetheros_mem")
        # Stop container
        container.stop()
        # Record start time (monotonic, so clock adjustments cannot skew it)
        start_time = time.monotonic()
        # Start container
        container.start()

        # Wait for recovery: poll the health endpoint until it answers 200 or
        # the 30 second timeout expires.
        recovered = False
        while time.monotonic() - start_time < 30:
            try:
                async with http_client.get(
                    "http://localhost:9091/health"
                ) as response:
                    if response.status == 200:
                        recovered = True
                        break
            except Exception as exc:
                # Request errors are expected while the service restarts.
                # Catching Exception (not a bare except) lets task
                # cancellation and KeyboardInterrupt propagate.
                logger.debug("Health probe failed during recovery: %s", exc)
            # Back off after every unsuccessful probe, including non-200
            # responses, so the loop never spins without yielding.
            await asyncio.sleep(0.5)

        recovery_time = time.monotonic() - start_time
        assert recovered, "System failed to recover"
        assert recovery_time < 10, f"Recovery time too long: {recovery_time}s"
if __name__ == "__main__":
    # pytest.main() returns the exit code of the run. Pass it to the
    # interpreter so a failing run is visible to the shell / CI instead of the
    # script always exiting with status 0.
    raise SystemExit(pytest.main(["-v", __file__]))