Spaces:
Configuration error
Configuration error
| import asyncio | |
| import logging | |
| import time | |
| import psutil | |
| import statistics | |
| import pytest | |
| from datetime import datetime | |
| from typing import Dict, Any, List | |
| from concurrent.futures import ThreadPoolExecutor | |
| import sys | |
| import os | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..')) | |
| from src.agents.aethero_agent_bootstrap import BaseAetheroAgent | |
| from src.agents.agent_bus import AgentBus, Message | |
| from src.monitoring.monitor import AetheroMonitor | |
| class ScaleTestAgent(BaseAetheroAgent): | |
| """Agent implementation for scale testing""" | |
| async def process_task(self, task_data: Dict[str, Any], asl_context: Dict[str, Any]) -> Dict[str, Any]: | |
| """Process task with configurable load simulation""" | |
| # Simulate CPU load | |
| if task_data.get("cpu_intensive", False): | |
| self._simulate_cpu_load(task_data.get("cpu_load_duration", 0.1)) | |
| # Simulate memory allocation | |
| if task_data.get("memory_intensive", False): | |
| self._simulate_memory_load(task_data.get("memory_size_mb", 1)) | |
| return { | |
| "status": "success", | |
| "result": f"Processed by {self.agent_id}", | |
| "task_data": task_data, | |
| "asl_context": asl_context, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| def _simulate_cpu_load(self, duration: float): | |
| """Simulate CPU-intensive task""" | |
| start_time = time.time() | |
| while time.time() - start_time < duration: | |
| _ = [i * i for i in range(1000)] | |
| def _simulate_memory_load(self, size_mb: int): | |
| """Simulate memory-intensive task""" | |
| # Allocate memory (1MB = 1024 * 1024 bytes) | |
| _ = bytearray(size_mb * 1024 * 1024) | |
| class PerformanceMetrics: | |
| """Track performance metrics during scale testing""" | |
| def __init__(self): | |
| self.response_times = [] | |
| self.cpu_usage = [] | |
| self.memory_usage = [] | |
| self.error_count = 0 | |
| self.success_count = 0 | |
| def add_response_time(self, time_ms: float): | |
| self.response_times.append(time_ms) | |
| def add_system_metrics(self, cpu: float, memory: float): | |
| self.cpu_usage.append(cpu) | |
| self.memory_usage.append(memory) | |
| def increment_error(self): | |
| self.error_count += 1 | |
| def increment_success(self): | |
| self.success_count += 1 | |
| def get_summary(self) -> Dict[str, Any]: | |
| """Get summary of performance metrics""" | |
| return { | |
| "response_times": { | |
| "avg": statistics.mean(self.response_times) if self.response_times else 0, | |
| "p95": statistics.quantiles(self.response_times, n=20)[18] if len(self.response_times) >= 20 else 0, | |
| "max": max(self.response_times) if self.response_times else 0 | |
| }, | |
| "system_metrics": { | |
| "cpu_avg": statistics.mean(self.cpu_usage) if self.cpu_usage else 0, | |
| "memory_avg": statistics.mean(self.memory_usage) if self.memory_usage else 0 | |
| }, | |
| "success_rate": self.success_count / (self.success_count + self.error_count) if (self.success_count + self.error_count) > 0 else 0 | |
| } | |
| async def create_large_message(size_mb: int) -> Dict[str, Any]: | |
| """Create a message with specified size""" | |
| return { | |
| "data": "x" * (size_mb * 1024 * 1024), # 1MB = 1024 * 1024 bytes | |
| "metadata": { | |
| "size_mb": size_mb, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| } | |
| async def test_concurrent_agents( | |
| agent_bus: AgentBus, | |
| logger: logging.Logger, | |
| test_config: Dict[str, Any], | |
| test_agent_count: int, | |
| test_tasks_per_agent: int | |
| ): | |
| """Test system with multiple concurrent agents""" | |
| metrics = PerformanceMetrics() | |
| # Create agents | |
| agents = [] | |
| for i in range(test_agent_count): | |
| agent = ScaleTestAgent( | |
| f"scale_agent_{i}", | |
| test_config, | |
| logger, | |
| agent_bus | |
| ) | |
| agents.append(agent) | |
| # Create tasks | |
| tasks = [] | |
| for agent in agents: | |
| for j in range(test_tasks_per_agent): | |
| task_data = { | |
| "task_id": f"task_{j}", | |
| "cpu_intensive": True, | |
| "cpu_load_duration": 0.1, | |
| "memory_intensive": True, | |
| "memory_size_mb": 1 | |
| } | |
| async def execute_task(agent, task_data): | |
| try: | |
| start_time = time.time() | |
| result = await agent.execute_task(task_data, {}) | |
| metrics.add_response_time((time.time() - start_time) * 1000) | |
| metrics.increment_success() | |
| return result | |
| except Exception as e: | |
| metrics.increment_error() | |
| logger.error(f"Task execution failed: {str(e)}") | |
| raise | |
| tasks.append(execute_task(agent, task_data)) | |
| # Execute tasks concurrently | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| return results, metrics | |
| async def test_large_messages( | |
| agent_bus: AgentBus, | |
| logger: logging.Logger, | |
| test_sizes: List[int] | |
| ): | |
| """Test handling of large message payloads""" | |
| metrics = PerformanceMetrics() | |
| results = [] | |
| for size in test_sizes: | |
| try: | |
| message = await create_large_message(size) | |
| start_time = time.time() | |
| await agent_bus.publish( | |
| topic="large_message_test", | |
| message=message, | |
| asl_tags={"size_mb": size} | |
| ) | |
| metrics.add_response_time((time.time() - start_time) * 1000) | |
| results.append({"size_mb": size, "status": "success"}) | |
| metrics.increment_success() | |
| except Exception as e: | |
| logger.error(f"Large message test failed for size {size}MB: {str(e)}") | |
| results.append({"size_mb": size, "status": "failed", "error": str(e)}) | |
| metrics.increment_error() | |
| return results, metrics | |
| async def test_network_latency( | |
| agent_bus: AgentBus, | |
| logger: logging.Logger, | |
| test_message_count: int, | |
| test_message_size: int | |
| ): | |
| """Test system behavior with network latency simulation""" | |
| metrics = PerformanceMetrics() | |
| # Simulate network latency | |
| original_publish = agent_bus.publish | |
| async def delayed_publish(*args, **kwargs): | |
| await asyncio.sleep(0.05) # Simulate 50ms network latency | |
| return await original_publish(*args, **kwargs) | |
| agent_bus.publish = delayed_publish | |
| try: | |
| tasks = [] | |
| for i in range(test_message_count): | |
| message = { | |
| "data": "x" * (test_message_size * 1024), | |
| "sequence": i | |
| } | |
| async def send_message(message): | |
| try: | |
| start_time = time.time() | |
| await agent_bus.publish( | |
| topic="latency_test", | |
| message=message, | |
| asl_tags={"test": "latency"} | |
| ) | |
| metrics.add_response_time((time.time() - start_time) * 1000) | |
| metrics.increment_success() | |
| except Exception as e: | |
| metrics.increment_error() | |
| raise | |
| tasks.append(send_message(message)) | |
| await asyncio.gather(*tasks) | |
| finally: | |
| agent_bus.publish = original_publish | |
| return metrics | |
| async def test_all_scale_scenarios( | |
| agent_bus: AgentBus, | |
| logger: logging.Logger, | |
| test_config: Dict[str, Any], | |
| test_agent_count: int, | |
| test_tasks_per_agent: int, | |
| test_sizes: List[int], | |
| test_message_count: int, | |
| test_message_size: int | |
| ): | |
| """Run all scale tests in sequence""" | |
| try: | |
| logger.info("Starting scale testing suite...") | |
| # Test concurrent agents | |
| logger.info("\nTesting concurrent agents...") | |
| agent_results, agent_metrics = await test_concurrent_agents( | |
| agent_bus, logger, test_config, | |
| test_agent_count, test_tasks_per_agent | |
| ) | |
| logger.info(f"Concurrent agents metrics: {agent_metrics.get_summary()}") | |
| # Test large messages | |
| logger.info("\nTesting large message handling...") | |
| message_results, message_metrics = await test_large_messages( | |
| agent_bus, logger, test_sizes | |
| ) | |
| logger.info(f"Large message results: {message_results}") | |
| logger.info(f"Message handling metrics: {message_metrics.get_summary()}") | |
| # Test network latency | |
| logger.info("\nTesting network latency handling...") | |
| latency_metrics = await test_network_latency( | |
| agent_bus, logger, | |
| test_message_count, test_message_size | |
| ) | |
| logger.info(f"Network latency metrics: {latency_metrics.get_summary()}") | |
| logger.info("\nAll scale tests completed successfully!") | |
| return { | |
| "agent_metrics": agent_metrics.get_summary(), | |
| "message_metrics": message_metrics.get_summary(), | |
| "latency_metrics": latency_metrics.get_summary() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in scale testing: {str(e)}") | |
| raise | |
| if __name__ == "__main__": | |
| pytest.main([__file__, "-v"]) | |