# Aethero_github / Aethero_App / tests / test_thorough.py
# Source commit: 46f737d — "Add complete Aethero_App and aethero_protocol directories" (xvadur)
import asyncio
import logging
from datetime import datetime
import sys
import os
import json
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from src.agents.aethero_agent_bootstrap import BaseAetheroAgent
from src.agents.error_handler import ErrorHandler, ErrorContext
from src.agents.agent_bus import AgentBus, Message
from src.monitoring.monitor import AetheroMonitor
class StressTestAgent(BaseAetheroAgent):
    """Agent used by the stress suite: simulates work duration, memory use, and on-demand failures."""

    async def process_task(self, task_data: Dict[str, Any], asl_context: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate one unit of work whose behavior is driven entirely by *task_data*.

        Recognized task_data keys: ``processing_time`` (sleep seconds, default 0.1),
        ``memory_size`` (chars to allocate, default 1000), ``should_fail`` (raise if truthy),
        ``task_id`` (echoed into the failure message).
        """
        # Emulate a variable-length processing delay.
        await asyncio.sleep(task_data.get("processing_time", 0.1))

        # Emulate a transient memory allocation of the requested size.
        _ = " " * task_data.get("memory_size", 1000)

        # Fail on demand so error-handling / recovery paths can be exercised.
        if task_data.get("should_fail", False):
            raise ValueError(f"Task {task_data.get('task_id')} failed")

        return {
            "status": "success",
            "result": f"Processed by {self.agent_id}",
            "task_data": task_data,
            "asl_context": asl_context,
            "timestamp": datetime.now().isoformat(),
        }
async def generate_large_payload(size_kb: int) -> Dict[str, Any]:
    """Build a synthetic message payload of roughly *size_kb* kilobytes."""
    byte_count = size_kb * 1024
    return {
        "data": "x" * byte_count,
        "metadata": {
            "size": size_kb,
            "timestamp": datetime.now().isoformat(),
        },
    }
async def test_concurrent_processing(agent: StressTestAgent, num_tasks: int):
    """Fan out *num_tasks* identical tasks to *agent* concurrently and keep only successes."""
    # Build one coroutine per task; each task gets its own data/context dicts.
    pending = [
        agent.execute_task(
            {
                "task_id": f"concurrent_task_{idx}",
                "processing_time": 0.1,
                "memory_size": 1000,
            },
            {"pipeline_id": "stress_test"},
        )
        for idx in range(num_tasks)
    ]
    # return_exceptions=True keeps one failing task from cancelling the rest.
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    # Report only the tasks that completed without raising.
    return [outcome for outcome in outcomes if not isinstance(outcome, Exception)]
async def test_large_messages(agent_bus: AgentBus, sizes_kb: List[int]):
    """Publish payloads of the given sizes through *agent_bus*, recording per-size outcomes."""
    outcomes: List[Dict[str, Any]] = []
    for size_kb in sizes_kb:
        message = await generate_large_payload(size_kb)
        try:
            await agent_bus.publish(
                topic="large_message_test",
                message=message,
                asl_tags={"size_kb": size_kb},
            )
        except Exception as exc:
            # A failed publish is recorded, not raised — the sweep continues.
            outcomes.append({"size_kb": size_kb, "status": "failed", "error": str(exc)})
        else:
            outcomes.append({"size_kb": size_kb, "status": "success"})
    return outcomes
async def test_network_interruption(agent: StressTestAgent, agent_bus: AgentBus):
    """Temporarily slow down bus publishing to mimic a flaky network, then run one task."""
    real_publish = agent_bus.publish

    async def slow_publish(*args, **kwargs):
        # A fixed two-second pause before delegating stands in for network latency.
        await asyncio.sleep(2)
        return await real_publish(*args, **kwargs)

    # Monkeypatch the bus for the duration of this test only.
    agent_bus.publish = slow_publish
    try:
        outcome = await agent.execute_task(
            {"task_id": "network_test", "processing_time": 0.5},
            {"pipeline_id": "network_test"},
        )
    except Exception as exc:
        return {"status": "failed", "error": str(exc)}
    else:
        return {"status": "success", "result": outcome}
    finally:
        # Always restore the genuine publish method, even on failure.
        agent_bus.publish = real_publish
async def test_multi_agent_workflow(num_agents: int, tasks_per_agent: int):
    """Drive *num_agents* agents through the concurrent-processing scenario, one agent at a time."""
    bus = AgentBus()
    # All agents share one bus but get individual ids and loggers.
    agents = [
        StressTestAgent(
            f"stress_agent_{idx}",
            {"pipeline_id": "multi_agent_test"},
            logging.getLogger(f"stress_agent_{idx}"),
            bus,
        )
        for idx in range(num_agents)
    ]
    combined = []
    # Agents run their batches sequentially; tasks within a batch run concurrently.
    for agent in agents:
        combined.extend(await test_concurrent_processing(agent, tasks_per_agent))
    return combined
async def test_system_recovery():
    """Test system recovery after failures.

    Runs two scenarios against a fresh agent:
      1. a task forced to fail (the raised error is the expected outcome), then
      2. a normal task, proving the agent still processes work after a failure.

    Returns:
        List of per-scenario result dicts with ``test``/``status`` keys.
    """
    # Fix: removed an unused `AetheroMonitor()` local that was created and never
    # referenced anywhere in this function.
    agent_bus = AgentBus()
    agent = StressTestAgent(
        "recovery_agent",
        {"pipeline_id": "recovery_test"},
        logging.getLogger("recovery_agent"),
        agent_bus,
    )
    results = []

    # Test 1: Agent failure and recovery — the exception here is the expected result.
    try:
        await agent.execute_task({"should_fail": True}, {})
    except Exception as e:
        results.append({"test": "agent_failure", "status": "expected_failure", "error": str(e)})

    # Test 2: Recovery after failure — the agent must still handle a normal task.
    try:
        result = await agent.execute_task({"task_id": "recovery_task"}, {})
        results.append({"test": "recovery", "status": "success", "result": result})
    except Exception as e:
        results.append({"test": "recovery", "status": "failed", "error": str(e)})
    return results
async def run_thorough_tests():
    """Run all thorough tests.

    Sets up the shared bus, monitor, and a stress agent, then executes the five
    test scenarios in sequence. The background monitoring task is always stopped
    and awaited — even when a scenario raises — via the ``finally`` block.

    Raises:
        Exception: re-raises whatever a scenario raised, after logging it.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("thorough_tests")
    monitor = None
    monitor_task = None
    try:
        logger.info("Starting thorough testing suite...")
        # Initialize components
        agent_bus = AgentBus()
        monitor = AetheroMonitor()
        agent = StressTestAgent(
            "stress_test_agent",
            {"pipeline_id": "thorough_test"},
            logger,
            agent_bus
        )
        # Start background monitoring (polls every 5 seconds).
        monitor_task = asyncio.create_task(monitor.start_monitoring(interval=5))
        # 1. Test concurrent processing
        logger.info("\nTesting concurrent processing...")
        concurrent_results = await test_concurrent_processing(agent, 10)
        logger.info(f"Concurrent processing results: {len(concurrent_results)} tasks completed")
        # 2. Test large messages
        logger.info("\nTesting large message handling...")
        message_results = await test_large_messages(agent_bus, [1, 10, 100])
        logger.info(f"Large message results: {json.dumps(message_results, indent=2)}")
        # 3. Test network interruption handling
        logger.info("\nTesting network interruption handling...")
        network_result = await test_network_interruption(agent, agent_bus)
        logger.info(f"Network interruption test result: {json.dumps(network_result, indent=2)}")
        # 4. Test multi-agent workflow
        logger.info("\nTesting multi-agent workflow...")
        workflow_results = await test_multi_agent_workflow(3, 5)
        logger.info(f"Multi-agent workflow results: {len(workflow_results)} total tasks completed")
        # 5. Test system recovery
        logger.info("\nTesting system recovery...")
        recovery_results = await test_system_recovery()
        logger.info(f"Recovery test results: {json.dumps(recovery_results, indent=2)}")
        logger.info("\nAll thorough tests completed successfully!")
    except Exception as e:
        logger.error(f"Error in thorough testing: {str(e)}")
        raise
    finally:
        # Fix: the monitor task was previously stopped only on the success path,
        # leaking the background task whenever a scenario raised. Stop and await
        # it unconditionally here.
        if monitor is not None:
            monitor.running = False
        if monitor_task is not None:
            await monitor_task
# Entry point: drive the full async thorough-test suite to completion.
if __name__ == "__main__":
    asyncio.run(run_thorough_tests())