# NOTE(review): the three lines below were non-Python web-page/commit residue
# accidentally committed at the top of the file; preserved as comments so the
# module parses.
# xvadur's picture
# Add complete Aethero_App and aethero_protocol directories
# 46f737d
import asyncio
import logging
import time
import psutil
import statistics
import pytest
from datetime import datetime
from typing import Dict, Any, List
from concurrent.futures import ThreadPoolExecutor
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from src.agents.aethero_agent_bootstrap import BaseAetheroAgent
from src.agents.agent_bus import AgentBus, Message
from src.monitoring.monitor import AetheroMonitor
class ScaleTestAgent(BaseAetheroAgent):
    """Agent implementation for scale testing"""

    async def process_task(self, task_data: Dict[str, Any], asl_context: Dict[str, Any]) -> Dict[str, Any]:
        """Process task with configurable load simulation"""
        # Synthetic CPU burn, opt-in via the task payload.
        if task_data.get("cpu_intensive", False):
            self._simulate_cpu_load(task_data.get("cpu_load_duration", 0.1))
        # Synthetic memory pressure, also opt-in via the payload.
        if task_data.get("memory_intensive", False):
            self._simulate_memory_load(task_data.get("memory_size_mb", 1))
        return {
            "status": "success",
            "result": f"Processed by {self.agent_id}",
            "task_data": task_data,
            "asl_context": asl_context,
            "timestamp": datetime.now().isoformat(),
        }

    def _simulate_cpu_load(self, duration: float):
        """Busy-loop for roughly ``duration`` seconds to burn CPU."""
        deadline = time.time() + duration
        while time.time() < deadline:
            _ = [n * n for n in range(1000)]

    def _simulate_memory_load(self, size_mb: int):
        """Allocate (and immediately discard) ``size_mb`` megabytes."""
        # 1 MB == 1024 * 1024 bytes; the buffer is freed when this returns.
        _ = bytearray(size_mb * 1024 * 1024)
class PerformanceMetrics:
    """Collect latency samples, system-resource samples, and outcome counters
    while a scale test runs, and aggregate them on demand."""

    def __init__(self):
        self.response_times = []  # per-operation latencies, in milliseconds
        self.cpu_usage = []       # sampled CPU readings
        self.memory_usage = []    # sampled memory readings
        self.error_count = 0
        self.success_count = 0

    def add_response_time(self, time_ms: float):
        """Record one operation latency in milliseconds."""
        self.response_times.append(time_ms)

    def add_system_metrics(self, cpu: float, memory: float):
        """Record one CPU/memory sample pair."""
        self.cpu_usage.append(cpu)
        self.memory_usage.append(memory)

    def increment_error(self):
        """Count one failed operation."""
        self.error_count += 1

    def increment_success(self):
        """Count one successful operation."""
        self.success_count += 1

    def get_summary(self) -> Dict[str, Any]:
        """Return aggregated latency, system, and success-rate statistics."""
        times = self.response_times
        total = self.success_count + self.error_count
        # 20-quantiles yield 19 cut points; index 18 is the 95th percentile.
        # Requires a reasonable sample count, hence the >= 20 guard.
        p95 = statistics.quantiles(times, n=20)[18] if len(times) >= 20 else 0
        return {
            "response_times": {
                "avg": statistics.mean(times) if times else 0,
                "p95": p95,
                "max": max(times) if times else 0,
            },
            "system_metrics": {
                "cpu_avg": statistics.mean(self.cpu_usage) if self.cpu_usage else 0,
                "memory_avg": statistics.mean(self.memory_usage) if self.memory_usage else 0,
            },
            "success_rate": self.success_count / total if total > 0 else 0,
        }
async def create_large_message(size_mb: int) -> Dict[str, Any]:
    """Build a test payload of ``size_mb`` megabytes of 'x' characters,
    tagged with its size and a creation timestamp."""
    payload = "x" * (size_mb * 1024 * 1024)  # 1 MB == 1024 * 1024 chars
    return {
        "data": payload,
        "metadata": {
            "size_mb": size_mb,
            "timestamp": datetime.now().isoformat(),
        },
    }
@pytest.mark.asyncio
async def test_concurrent_agents(
    agent_bus: AgentBus,
    logger: logging.Logger,
    test_config: Dict[str, Any],
    test_agent_count: int,
    test_tasks_per_agent: int
):
    """Test system with multiple concurrent agents"""
    metrics = PerformanceMetrics()

    # Spin up the requested number of scale-test agents on the shared bus.
    agents = [
        ScaleTestAgent(f"scale_agent_{i}", test_config, logger, agent_bus)
        for i in range(test_agent_count)
    ]

    async def run_one(agent, payload):
        """Execute a single task, recording latency and outcome in metrics."""
        try:
            started = time.time()
            outcome = await agent.execute_task(payload, {})
            metrics.add_response_time((time.time() - started) * 1000)
            metrics.increment_success()
            return outcome
        except Exception as exc:
            metrics.increment_error()
            logger.error(f"Task execution failed: {str(exc)}")
            raise

    # Fan out tasks_per_agent CPU+memory-intensive tasks to every agent.
    pending = []
    for agent in agents:
        for idx in range(test_tasks_per_agent):
            payload = {
                "task_id": f"task_{idx}",
                "cpu_intensive": True,
                "cpu_load_duration": 0.1,
                "memory_intensive": True,
                "memory_size_mb": 1,
            }
            pending.append(run_one(agent, payload))

    # Run everything concurrently; failures come back as exception objects.
    results = await asyncio.gather(*pending, return_exceptions=True)
    return results, metrics
@pytest.mark.asyncio
async def test_large_messages(
    agent_bus: AgentBus,
    logger: logging.Logger,
    test_sizes: List[int]
):
    """Test handling of large message payloads"""
    metrics = PerformanceMetrics()
    outcomes = []
    for size in test_sizes:
        try:
            payload = await create_large_message(size)
            started = time.time()
            await agent_bus.publish(
                topic="large_message_test",
                message=payload,
                asl_tags={"size_mb": size}
            )
        except Exception as exc:
            # Record the failure but keep probing the remaining sizes.
            logger.error(f"Large message test failed for size {size}MB: {str(exc)}")
            outcomes.append({"size_mb": size, "status": "failed", "error": str(exc)})
            metrics.increment_error()
        else:
            metrics.add_response_time((time.time() - started) * 1000)
            outcomes.append({"size_mb": size, "status": "success"})
            metrics.increment_success()
    return outcomes, metrics
@pytest.mark.asyncio
async def test_network_latency(
    agent_bus: AgentBus,
    logger: logging.Logger,
    test_message_count: int,
    test_message_size: int
):
    """Test system behavior with network latency simulation"""
    metrics = PerformanceMetrics()

    # Monkey-patch the bus so every publish pays a simulated 50 ms delay.
    real_publish = agent_bus.publish

    async def delayed_publish(*args, **kwargs):
        await asyncio.sleep(0.05)  # Simulate 50ms network latency
        return await real_publish(*args, **kwargs)

    agent_bus.publish = delayed_publish

    async def send_one(payload):
        """Publish one payload, timing it into the shared metrics."""
        try:
            started = time.time()
            await agent_bus.publish(
                topic="latency_test",
                message=payload,
                asl_tags={"test": "latency"}
            )
            metrics.add_response_time((time.time() - started) * 1000)
            metrics.increment_success()
        except Exception:
            metrics.increment_error()
            raise

    try:
        await asyncio.gather(*(
            send_one({"data": "x" * (test_message_size * 1024), "sequence": seq})
            for seq in range(test_message_count)
        ))
    finally:
        # Always restore the unpatched publish, even if a send failed.
        agent_bus.publish = real_publish
    return metrics
@pytest.mark.asyncio
async def test_all_scale_scenarios(
    agent_bus: AgentBus,
    logger: logging.Logger,
    test_config: Dict[str, Any],
    test_agent_count: int,
    test_tasks_per_agent: int,
    test_sizes: List[int],
    test_message_count: int,
    test_message_size: int
):
    """Run all scale tests in sequence"""
    try:
        logger.info("Starting scale testing suite...")

        # Phase 1: many agents each running many concurrent tasks.
        logger.info("\nTesting concurrent agents...")
        _, agent_metrics = await test_concurrent_agents(
            agent_bus, logger, test_config,
            test_agent_count, test_tasks_per_agent
        )
        logger.info(f"Concurrent agents metrics: {agent_metrics.get_summary()}")

        # Phase 2: increasingly large payloads through the bus.
        logger.info("\nTesting large message handling...")
        message_results, message_metrics = await test_large_messages(
            agent_bus, logger, test_sizes
        )
        logger.info(f"Large message results: {message_results}")
        logger.info(f"Message handling metrics: {message_metrics.get_summary()}")

        # Phase 3: publishes under a simulated 50 ms network delay.
        logger.info("\nTesting network latency handling...")
        latency_metrics = await test_network_latency(
            agent_bus, logger,
            test_message_count, test_message_size
        )
        logger.info(f"Network latency metrics: {latency_metrics.get_summary()}")

        logger.info("\nAll scale tests completed successfully!")
        return {
            "agent_metrics": agent_metrics.get_summary(),
            "message_metrics": message_metrics.get_summary(),
            "latency_metrics": latency_metrics.get_summary()
        }
    except Exception as e:
        logger.error(f"Error in scale testing: {str(e)}")
        raise
if __name__ == "__main__":
    # Allow running this module directly: delegate to pytest in verbose mode.
    pytest.main([__file__, "-v"])