# Aethero_github / Aethero_App / tests / test_system_integration.py
# xvadur's picture
# Add complete Aethero_App and aethero_protocol directories
# 46f737d
"""
End-to-End Integration Tests for AetheroOS
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta, UTC
from typing import Dict, Any, List
from unittest.mock import AsyncMock, MagicMock, patch

import docker
import httpx
import pytest

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestSystemIntegration:
    """End-to-end flow tests for AetheroOS, run entirely against mocks.

    No network traffic is generated: every test drives the ``http_client``
    fixture, whose ``get``/``post`` coroutines hand back pre-configured
    response doubles. Each test primes those doubles with the status code
    and JSON payload it expects before issuing the call.
    """

    @pytest.fixture
    def http_client(self):
        """Return a mock async HTTP client with one shared response per verb.

        ``get`` and ``post`` are awaitable (``AsyncMock`` creates them on
        attribute access) and each resolves to a single reusable response
        double. The doubles are ``MagicMock`` instances so that
        ``response.json()`` is a plain synchronous call, mirroring
        ``httpx.Response.json()``; an ``AsyncMock`` response would force the
        tests to ``await`` it, which a real httpx response does not support.
        """
        mock_client = AsyncMock()
        mock_client.get.return_value = MagicMock()
        mock_client.post.return_value = MagicMock()
        return mock_client

    @pytest.fixture(scope="class")
    def mock_responses(self):
        """Return the canned JSON payloads, keyed by the endpoint they imitate."""
        return {
            "plan": {"task_id": "test_123"},
            "search": {"resources": ["resource1", "resource2"]},
            "analyze": {"analysis": "test analysis"},
            "synthesis": {"result": "test result"},
            "state": {"id": "state_123", "data": {"test": "data"}},
            "metrics": {"status": "success"},
            "alerts": [{"labels": {"alertname": "HighErrorRate"}}],
            "validation": {"metrics": {"accuracy": 0.9}, "findings": ["finding1"]},
            "graph": {"nodes": [{"id": "test_agent", "state": "processing"}]},
        }

    @pytest.mark.asyncio
    async def test_agent_communication_flow(self, http_client, mock_responses):
        """Test complete agent communication pipeline with mocked responses"""
        task_data = {
            "directive": "Test directive for agent communication",
            "priority": "high",
            "context": {"test": True},
        }

        # Configure mock responses; the GET double keeps status 200 throughout
        # and only its JSON payload is swapped between stages.
        http_client.post.return_value.status_code = 200
        http_client.post.return_value.json.return_value = mock_responses["plan"]
        http_client.get.return_value.status_code = 200

        # Submit to planner agent
        response = await http_client.post(
            "http://localhost:8000/api/v1/plan",
            json=task_data,
        )
        assert response.status_code == 200
        plan_result = response.json()
        task_id = plan_result["task_id"]

        # Verify scout agent processing
        http_client.get.return_value.json.return_value = mock_responses["search"]
        response = await http_client.get(
            f"http://localhost:8001/api/v1/search?task_id={task_id}"
        )
        assert response.status_code == 200
        scout_result = response.json()
        assert "resources" in scout_result

        # Verify analyst agent processing
        http_client.get.return_value.json.return_value = mock_responses["analyze"]
        response = await http_client.get(
            f"http://localhost:8002/api/v1/analyze?task_id={task_id}"
        )
        assert response.status_code == 200
        analysis_result = response.json()
        assert "analysis" in analysis_result

        # Verify final synthesis
        http_client.get.return_value.json.return_value = mock_responses["synthesis"]
        response = await http_client.get(
            f"http://localhost:8004/api/v1/synthesis?task_id={task_id}"
        )
        assert response.status_code == 200
        synthesis_result = response.json()
        assert "result" in synthesis_result

    @pytest.mark.asyncio
    async def test_memory_system_persistence(self, http_client, mock_responses):
        """Test Aethero_Mem data persistence and retrieval with mocked responses"""
        test_data = {
            "agent_id": "test_agent",
            "timestamp": datetime.now(UTC).isoformat(),
            "state": "processing",
            "data": {"test": "data"},
        }

        # Configure mock responses
        http_client.post.return_value.status_code = 201
        http_client.post.return_value.json.return_value = mock_responses["state"]
        http_client.get.return_value.status_code = 200
        http_client.get.return_value.json.return_value = mock_responses["state"]

        # Store state
        response = await http_client.post(
            "http://localhost:9091/api/v1/states",
            json=test_data,
        )
        assert response.status_code == 201
        result = response.json()
        state_id = result["id"]

        # Verify immediate retrieval
        response = await http_client.get(
            f"http://localhost:9091/api/v1/states/{state_id}"
        )
        assert response.status_code == 200
        stored_data = response.json()
        assert stored_data["data"]["test"] == "data"

        # Verify persistence (no need to wait in mocked tests)
        response = await http_client.get(
            f"http://localhost:9091/api/v1/states/{state_id}"
        )
        assert response.status_code == 200
        persisted_data = response.json()
        assert persisted_data == stored_data

    @pytest.mark.asyncio
    async def test_metrics_collection_flow(self, http_client, mock_responses):
        """Test metrics collection and monitoring integration with mocked responses"""
        # Generate test metrics
        test_metrics = {
            "agent_response_time_seconds": 0.1,
            "agent_memory_usage_bytes": 1024,
            "agent_cpu_usage_percent": 5.0,
        }

        # Configure mock responses
        http_client.post.return_value.status_code = 200
        http_client.post.return_value.json.return_value = mock_responses["metrics"]
        http_client.get.return_value.status_code = 200
        http_client.get.return_value.json.return_value = {
            "data": {
                "result": [{"value": [1234567890, "0.1"]}],
            },
        }

        # Push metrics
        response = await http_client.post(
            "http://localhost:9091/metrics/job/test_agent",
            json=test_metrics,
        )
        assert response.status_code == 200

        # Verify in Prometheus
        response = await http_client.get(
            "http://localhost:9090/api/v1/query",
            params={"query": 'agent_response_time_seconds{job="test_agent"}'},
        )
        assert response.status_code == 200
        result = response.json()
        assert len(result["data"]["result"]) > 0

    @pytest.mark.asyncio
    async def test_alert_triggering(self, http_client, mock_responses):
        """Test alert triggering and notification flow with mocked responses"""
        # Trigger test alert condition
        test_metrics = {
            "agent_error_total": 100,  # Should trigger error rate alert
            "job": "test_agent",
        }

        # Configure mock responses
        http_client.post.return_value.status_code = 200
        http_client.post.return_value.json.return_value = mock_responses["metrics"]
        http_client.get.return_value.status_code = 200
        http_client.get.return_value.json.return_value = mock_responses["alerts"]

        # Push alert-triggering metrics
        response = await http_client.post(
            "http://localhost:9091/metrics/job/test_agent",
            json=test_metrics,
        )
        assert response.status_code == 200

        # No need to wait in mocked tests
        # Verify alert in AlertManager
        response = await http_client.get(
            "http://localhost:9093/api/v2/alerts"
        )
        assert response.status_code == 200
        alerts = response.json()
        assert any(
            alert["labels"].get("alertname") == "HighErrorRate"
            for alert in alerts
        )

    @pytest.mark.asyncio
    async def test_reflection_integration(self, http_client, mock_responses):
        """Test reflection agent integration with DeepEval using mocked responses"""
        # Submit test output for evaluation
        test_output = {
            "agent_id": "test_agent",
            "output": {
                "result": "test_result",
                "confidence": 0.8,
            },
            "context": {
                "task_type": "test",
                "priority": "high",
            },
        }

        # Configure mock responses; the storage lookup returns a list of results.
        http_client.post.return_value.status_code = 200
        http_client.post.return_value.json.return_value = mock_responses["validation"]
        http_client.get.return_value.status_code = 200
        http_client.get.return_value.json.return_value = [mock_responses["validation"]]

        # Request evaluation
        response = await http_client.post(
            "http://localhost:8005/api/v1/validate",
            json=test_output,
        )
        assert response.status_code == 200
        eval_result = response.json()
        assert "metrics" in eval_result
        assert "findings" in eval_result

        # Verify reflection result storage
        response = await http_client.get(
            "http://localhost:9091/api/v1/reflections?agent_id=test_agent"
        )
        assert response.status_code == 200
        stored_results = response.json()
        assert len(stored_results) > 0

    @pytest.mark.asyncio
    async def test_visualization_updates(self, http_client, mock_responses):
        """Test LangGraph visualization updates with mocked responses"""
        state_update = {
            "agent_id": "test_agent",
            "state": "processing",
            "asl_tags": {"purpose": "test"},
        }

        # Configure mock responses
        http_client.post.return_value.status_code = 200
        http_client.get.return_value.status_code = 200
        http_client.get.return_value.json.return_value = mock_responses["graph"]

        # Send state update
        response = await http_client.post(
            "http://localhost:8080/api/v1/state",
            json=state_update,
        )
        assert response.status_code == 200

        # Verify visualization update
        response = await http_client.get(
            "http://localhost:8080/api/v1/graph"
        )
        assert response.status_code == 200
        graph_data = response.json()
        assert any(
            node["id"] == "test_agent" and node["state"] == "processing"
            for node in graph_data["nodes"]
        )

    @pytest.mark.asyncio
    async def test_system_recovery(self, http_client, mock_responses):
        """Test system recovery after component failure with mocked responses"""
        # Configure mock responses for failure state: the query reports "0".
        http_client.get.return_value.status_code = 200
        http_client.get.return_value.json.return_value = {
            "data": {
                "result": [{"value": [1234567890, "0"]}],
            },
        }

        # Verify system detection of failure
        response = await http_client.get(
            "http://localhost:9090/api/v1/query",
            params={"query": 'up{job="aethero_mem"}'},
        )
        assert response.status_code == 200
        result = response.json()
        assert result["data"]["result"][0]["value"][1] == "0"

        # Configure mock responses for recovery state
        http_client.get.return_value.status_code = 200

        # Verify system recovery
        response = await http_client.get(
            "http://localhost:9091/health"
        )
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_data_consistency(self, http_client, mock_responses):
        """Test data consistency across system components with mocked responses"""
        decision = {
            "decision_id": f"test_dec_{datetime.now(UTC).timestamp()}",
            "agent_id": "test_agent",
            "decision": "test_decision",
            "context": {"test": True},
        }

        # Configure mock responses: both verbs echo the submitted decision.
        http_client.post.return_value.status_code = 201
        http_client.post.return_value.json.return_value = decision
        http_client.get.return_value.status_code = 200
        http_client.get.return_value.json.return_value = decision

        # Store decision
        response = await http_client.post(
            "http://localhost:9091/api/v1/decisions",
            json=decision,
        )
        assert response.status_code == 201
        stored_decision = response.json()

        # Verify in memory system
        response = await http_client.get(
            f"http://localhost:9091/api/v1/decisions/{decision['decision_id']}"
        )
        assert response.status_code == 200
        mem_decision = response.json()
        assert mem_decision == stored_decision

        # Verify in reflection system
        response = await http_client.get(
            f"http://localhost:8005/api/v1/context/{decision['decision_id']}"
        )
        assert response.status_code == 200
        reflection_context = response.json()
        assert reflection_context["decision_id"] == decision["decision_id"]
if __name__ == "__main__":
    # pytest.main() returns the run's exit code; raise SystemExit with it so
    # running this file directly exits non-zero when any test fails.
    raise SystemExit(pytest.main(["-v", __file__]))