Spaces:
Configuration error
Configuration error
| import asyncio | |
| import logging | |
| from datetime import datetime | |
| import sys | |
| import os | |
| import json | |
| from typing import List, Dict, Any | |
| from concurrent.futures import ThreadPoolExecutor | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..')) | |
| from src.agents.aethero_agent_bootstrap import BaseAetheroAgent | |
| from src.agents.error_handler import ErrorHandler, ErrorContext | |
| from src.agents.agent_bus import AgentBus, Message | |
| from src.monitoring.monitor import AetheroMonitor | |
class StressTestAgent(BaseAetheroAgent):
    """Agent whose tasks simulate latency, a transient allocation and optional failure."""

    async def process_task(self, task_data: Dict[str, Any], asl_context: Dict[str, Any]) -> Dict[str, Any]:
        """Process one simulated task.

        Args:
            task_data: Task description. Keys read here: ``processing_time``
                (seconds to sleep, default 0.1), ``memory_size`` (length of a
                throwaway string, default 1000), ``should_fail`` (default
                False) and ``task_id`` (used only in the failure message).
            asl_context: Passed through unchanged into the returned dict.

        Returns:
            A dict with ``status``, ``result``, ``task_data``, ``asl_context``
            and an ISO-formatted ``timestamp``.

        Raises:
            ValueError: If ``task_data["should_fail"]`` is truthy.
        """
        # Stand-in for work that takes a variable amount of time.
        delay = task_data.get("processing_time", 0.1)
        await asyncio.sleep(delay)

        # Stand-in for memory pressure: build a string and drop it right away.
        filler_length = task_data.get("memory_size", 1000)
        _ = " " * filler_length

        failure_requested = task_data.get("should_fail", False)
        if failure_requested:
            raise ValueError(f"Task {task_data.get('task_id')} failed")

        processed_by = f"Processed by {self.agent_id}"
        finished_at = datetime.now().isoformat()
        return {
            "status": "success",
            "result": processed_by,
            "task_data": task_data,
            "asl_context": asl_context,
            "timestamp": finished_at,
        }
async def generate_large_payload(size_kb: int) -> Dict[str, Any]:
    """Build a message payload carrying ``size_kb * 1024`` ``"x"`` characters.

    Args:
        size_kb: Requested payload size in units of 1024 characters.

    Returns:
        A dict with the filler string under ``data`` and, under ``metadata``,
        the requested ``size`` plus an ISO-formatted creation ``timestamp``.
    """
    chars_per_kb = 1024
    filler = "x" * (size_kb * chars_per_kb)
    metadata = {"size": size_kb, "timestamp": datetime.now().isoformat()}
    return {"data": filler, "metadata": metadata}
async def test_concurrent_processing(agent: StressTestAgent, num_tasks: int):
    """Run ``num_tasks`` tasks on ``agent`` concurrently and return the successes.

    Args:
        agent: Agent whose ``execute_task`` coroutine is invoked once per task.
        num_tasks: Number of tasks to launch together.

    Returns:
        The results of the tasks that finished without raising, in launch
        order. Failed or cancelled tasks are dropped, so the list may be
        shorter than ``num_tasks``.
    """
    # Each task gets its own task_data / asl_context dicts so nothing is
    # shared between concurrent executions.
    pending = [
        agent.execute_task(
            {
                "task_id": f"concurrent_task_{i}",
                "processing_time": 0.1,
                "memory_size": 1000,
            },
            {"pipeline_id": "stress_test"},
        )
        for i in range(num_tasks)
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    # With return_exceptions=True a cancelled task shows up as a
    # CancelledError instance, which derives from BaseException (not
    # Exception) on Python 3.8+. Filter on BaseException so it is not
    # mistaken for a completed result.
    return [outcome for outcome in outcomes if not isinstance(outcome, BaseException)]
async def test_large_messages(agent_bus: AgentBus, sizes_kb: List[int]):
    """Publish one generated payload per requested size and report each outcome.

    Args:
        agent_bus: Bus whose ``publish`` coroutine receives every payload on
            the ``large_message_test`` topic.
        sizes_kb: Payload sizes to try, in the order given.

    Returns:
        One dict per size: ``{"size_kb", "status": "success"}`` when the
        publish call returned, or ``{"size_kb", "status": "failed", "error"}``
        when it raised.
    """
    report = []
    for size_kb in sizes_kb:
        payload = await generate_large_payload(size_kb)
        try:
            await agent_bus.publish(
                topic="large_message_test",
                message=payload,
                asl_tags={"size_kb": size_kb},
            )
        except Exception as exc:
            # A failed publish is recorded rather than aborting the sweep.
            entry = {"size_kb": size_kb, "status": "failed", "error": str(exc)}
        else:
            entry = {"size_kb": size_kb, "status": "success"}
        report.append(entry)
    return report
async def test_network_interruption(agent: StressTestAgent, agent_bus: AgentBus):
    """Execute one task while ``agent_bus.publish`` is slowed down by 2 seconds.

    The bus's ``publish`` attribute is temporarily replaced by a wrapper that
    sleeps before delegating to the real implementation; the original is put
    back whatever the outcome.

    Args:
        agent: Agent whose ``execute_task`` coroutine is exercised.
        agent_bus: Bus whose ``publish`` is wrapped for the duration of the call.

    Returns:
        ``{"status": "success", "result": ...}`` if the task finished, or
        ``{"status": "failed", "error": ...}`` if it raised.
    """
    real_publish = agent_bus.publish

    async def slow_publish(*args, **kwargs):
        # Hold every publish back for two seconds before forwarding it.
        await asyncio.sleep(2)
        forwarded = await real_publish(*args, **kwargs)
        return forwarded

    agent_bus.publish = slow_publish
    try:
        outcome = await agent.execute_task(
            {"task_id": "network_test", "processing_time": 0.5},
            {"pipeline_id": "network_test"},
        )
    except Exception as exc:
        return {"status": "failed", "error": str(exc)}
    else:
        return {"status": "success", "result": outcome}
    finally:
        # Always undo the patch so later scenarios see the real publish.
        agent_bus.publish = real_publish
async def test_multi_agent_workflow(num_agents: int, tasks_per_agent: int):
    """Create several agents on one bus and run a concurrent batch on each in turn.

    Args:
        num_agents: Number of ``StressTestAgent`` instances to create.
        tasks_per_agent: Batch size passed to ``test_concurrent_processing``
            for every agent.

    Returns:
        A flat list of the results returned for all agents, in agent order.
    """
    shared_bus = AgentBus()

    # Build the agents; each gets its own named logger and the shared bus.
    fleet = []
    for index in range(num_agents):
        fleet.append(
            StressTestAgent(
                f"stress_agent_{index}",
                {"pipeline_id": "multi_agent_test"},
                logging.getLogger(f"stress_agent_{index}"),
                shared_bus,
            )
        )

    # Agents are driven one after another; only the tasks within a single
    # agent's batch run concurrently.
    completed = []
    for member in fleet:
        batch = await test_concurrent_processing(member, tasks_per_agent)
        completed.extend(batch)
    return completed
async def test_system_recovery():
    """Check that an agent can still run a task after a forced failure.

    Two steps run against a freshly created agent: a task flagged
    ``should_fail`` and then an ordinary task.

    Returns:
        A list with one entry per step. The first has ``test`` set to
        ``"agent_failure"`` and status ``"expected_failure"`` (with the error
        text) when the call raised, or ``"no_exception_raised"`` when it did
        not. The second has ``test`` set to ``"recovery"`` and status
        ``"success"`` (with the result) or ``"failed"`` (with the error text).
    """
    agent_bus = AgentBus()
    # Not used again in this function; the instantiation is kept because the
    # original performed it. NOTE(review): confirm whether constructing the
    # monitor has side effects before removing it.
    monitor = AetheroMonitor()
    agent = StressTestAgent(
        "recovery_agent",
        {"pipeline_id": "recovery_test"},
        logging.getLogger("recovery_agent"),
        agent_bus,
    )
    results = []

    # Test 1: Agent failure and recovery
    try:
        await agent.execute_task({"should_fail": True}, {})
    except Exception as e:
        results.append({"test": "agent_failure", "status": "expected_failure", "error": str(e)})
    else:
        # The call returned instead of raising (for example if execute_task
        # handles the error internally). Record it so the report never
        # silently omits this step.
        results.append({"test": "agent_failure", "status": "no_exception_raised"})

    # Test 2: Recovery after failure
    try:
        result = await agent.execute_task({"task_id": "recovery_task"}, {})
    except Exception as e:
        results.append({"test": "recovery", "status": "failed", "error": str(e)})
    else:
        results.append({"test": "recovery", "status": "success", "result": result})

    return results
async def run_thorough_tests():
    """Run every stress scenario in sequence, logging the outcome of each.

    Monitoring is started before the first scenario and is stopped and awaited
    once the scenarios end, whether they completed or raised.

    Raises:
        Exception: Any error from set-up, a scenario or the monitor task is
            logged with its traceback and re-raised.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("thorough_tests")
    try:
        logger.info("Starting thorough testing suite...")

        # Initialize components
        agent_bus = AgentBus()
        monitor = AetheroMonitor()
        agent = StressTestAgent(
            "stress_test_agent",
            {"pipeline_id": "thorough_test"},
            logger,
            agent_bus,
        )

        # Start monitoring
        monitor_task = asyncio.create_task(monitor.start_monitoring(interval=5))
        try:
            # 1. Test concurrent processing
            logger.info("\nTesting concurrent processing...")
            concurrent_results = await test_concurrent_processing(agent, 10)
            logger.info(f"Concurrent processing results: {len(concurrent_results)} tasks completed")

            # 2. Test large messages
            logger.info("\nTesting large message handling...")
            message_results = await test_large_messages(agent_bus, [1, 10, 100])
            logger.info(f"Large message results: {json.dumps(message_results, indent=2)}")

            # 3. Test network interruption handling
            logger.info("\nTesting network interruption handling...")
            network_result = await test_network_interruption(agent, agent_bus)
            logger.info(f"Network interruption test result: {json.dumps(network_result, indent=2)}")

            # 4. Test multi-agent workflow
            logger.info("\nTesting multi-agent workflow...")
            workflow_results = await test_multi_agent_workflow(3, 5)
            logger.info(f"Multi-agent workflow results: {len(workflow_results)} total tasks completed")

            # 5. Test system recovery
            logger.info("\nTesting system recovery...")
            recovery_results = await test_system_recovery()
            logger.info(f"Recovery test results: {json.dumps(recovery_results, indent=2)}")
        finally:
            # Stop monitoring even when a scenario raised, so the monitor
            # task is not left running behind the failure.
            # NOTE(review): presumably start_monitoring exits once `running`
            # is falsy - confirm against AetheroMonitor.
            monitor.running = False
            await monitor_task

        logger.info("\nAll thorough tests completed successfully!")
    except Exception as e:
        # logger.exception keeps the original message and adds the traceback.
        logger.exception("Error in thorough testing: %s", e)
        raise


if __name__ == "__main__":
    asyncio.run(run_thorough_tests())