""" End-to-End Integration Tests for AetheroOS """ import pytest import asyncio import httpx import json from datetime import datetime, timedelta, UTC import logging from typing import Dict, Any, List import docker logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) from unittest.mock import AsyncMock, patch class TestSystemIntegration: @pytest.fixture def http_client(self): # Create mock client mock_client = AsyncMock() mock_client.get = AsyncMock() mock_client.post = AsyncMock() # Set up response mocks get_response = AsyncMock() get_response.json = AsyncMock() post_response = AsyncMock() post_response.json = AsyncMock() # Configure default returns mock_client.get.return_value = get_response mock_client.post.return_value = post_response # Return synchronously since we'll mock the async behavior return mock_client @pytest.fixture(scope="class") def mock_responses(self): return { "plan": {"task_id": "test_123"}, "search": {"resources": ["resource1", "resource2"]}, "analyze": {"analysis": "test analysis"}, "synthesis": {"result": "test result"}, "state": {"id": "state_123", "data": {"test": "data"}}, "metrics": {"status": "success"}, "alerts": [{"labels": {"alertname": "HighErrorRate"}}], "validation": {"metrics": {"accuracy": 0.9}, "findings": ["finding1"]}, "graph": {"nodes": [{"id": "test_agent", "state": "processing"}]} } @pytest.mark.asyncio async def test_agent_communication_flow(self, http_client, mock_responses): """Test complete agent communication pipeline with mocked responses""" task_data = { "directive": "Test directive for agent communication", "priority": "high", "context": {"test": True} } # Configure mock responses http_client.post.return_value.status_code = 200 http_client.post.return_value.json.return_value = mock_responses["plan"] http_client.get.return_value.status_code = 200 # Submit to planner agent response = await http_client.post( "http://localhost:8000/api/v1/plan", json=task_data ) assert response.status_code == 200 plan_result = await response.json() task_id = plan_result["task_id"] # Configure mock for scout agent http_client.get.return_value.json.return_value = mock_responses["search"] # Verify scout agent processing response = await http_client.get( f"http://localhost:8001/api/v1/search?task_id={task_id}" ) assert response.status_code == 200 scout_result = await response.json() assert "resources" in scout_result # Configure mock for analyst agent http_client.get.return_value.json.return_value = mock_responses["analyze"] # Verify analyst agent processing response = await http_client.get( f"http://localhost:8002/api/v1/analyze?task_id={task_id}" ) assert response.status_code == 200 analysis_result = await response.json() assert "analysis" in analysis_result # Configure mock for synthesis http_client.get.return_value.json.return_value = mock_responses["synthesis"] # Verify final synthesis response = await http_client.get( f"http://localhost:8004/api/v1/synthesis?task_id={task_id}" ) assert response.status_code == 200 synthesis_result = await response.json() assert "result" in synthesis_result @pytest.mark.asyncio async def test_memory_system_persistence(self, http_client, mock_responses): """Test Aethero_Mem data persistence and retrieval with mocked responses""" test_data = { "agent_id": "test_agent", "timestamp": datetime.now(UTC).isoformat(), "state": "processing", "data": {"test": "data"} } # Configure mock responses http_client.post.return_value.status_code = 201 http_client.post.return_value.json.return_value = mock_responses["state"] http_client.get.return_value.status_code = 200 http_client.get.return_value.json.return_value = mock_responses["state"] # Store state response = await http_client.post( "http://localhost:9091/api/v1/states", json=test_data ) assert response.status_code == 201 result = await response.json() state_id = result["id"] # Verify immediate retrieval response = await http_client.get( f"http://localhost:9091/api/v1/states/{state_id}" ) assert response.status_code == 200 stored_data = await response.json() assert stored_data["data"]["test"] == "data" # Verify persistence (no need to wait in mocked tests) response = await http_client.get( f"http://localhost:9091/api/v1/states/{state_id}" ) assert response.status_code == 200 persisted_data = await response.json() assert persisted_data == stored_data @pytest.mark.asyncio async def test_metrics_collection_flow(self, http_client, mock_responses): """Test metrics collection and monitoring integration with mocked responses""" # Generate test metrics test_metrics = { "agent_response_time_seconds": 0.1, "agent_memory_usage_bytes": 1024, "agent_cpu_usage_percent": 5.0 } # Configure mock responses http_client.post.return_value.status_code = 200 http_client.post.return_value.json.return_value = mock_responses["metrics"] http_client.get.return_value.status_code = 200 http_client.get.return_value.json.return_value = { "data": { "result": [{"value": [1234567890, "0.1"]}] } } # Push metrics response = await http_client.post( "http://localhost:9091/metrics/job/test_agent", json=test_metrics ) assert response.status_code == 200 # Verify in Prometheus response = await http_client.get( "http://localhost:9090/api/v1/query", params={"query": 'agent_response_time_seconds{job="test_agent"}'} ) assert response.status_code == 200 result = await response.json() assert len(result["data"]["result"]) > 0 @pytest.mark.asyncio async def test_alert_triggering(self, http_client, mock_responses): """Test alert triggering and notification flow with mocked responses""" # Trigger test alert condition test_metrics = { "agent_error_total": 100, # Should trigger error rate alert "job": "test_agent" } # Configure mock responses http_client.post.return_value.status_code = 200 http_client.post.return_value.json.return_value = mock_responses["metrics"] http_client.get.return_value.status_code = 200 http_client.get.return_value.json.return_value = mock_responses["alerts"] # Push alert-triggering metrics response = await http_client.post( "http://localhost:9091/metrics/job/test_agent", json=test_metrics ) assert response.status_code == 200 # No need to wait in mocked tests # Verify alert in AlertManager response = await http_client.get( "http://localhost:9093/api/v2/alerts" ) assert response.status_code == 200 alerts = await response.json() assert any( alert["labels"].get("alertname") == "HighErrorRate" for alert in alerts ) @pytest.mark.asyncio async def test_reflection_integration(self, http_client, mock_responses): """Test reflection agent integration with DeepEval using mocked responses""" # Submit test output for evaluation test_output = { "agent_id": "test_agent", "output": { "result": "test_result", "confidence": 0.8 }, "context": { "task_type": "test", "priority": "high" } } # Configure mock responses http_client.post.return_value.status_code = 200 http_client.post.return_value.json.return_value = mock_responses["validation"] http_client.get.return_value.status_code = 200 http_client.get.return_value.json.return_value = [mock_responses["validation"]] # Request evaluation response = await http_client.post( "http://localhost:8005/api/v1/validate", json=test_output ) assert response.status_code == 200 eval_result = await response.json() assert "metrics" in eval_result assert "findings" in eval_result # Verify reflection result storage response = await http_client.get( f"http://localhost:9091/api/v1/reflections?agent_id=test_agent" ) assert response.status_code == 200 stored_results = await response.json() assert len(stored_results) > 0 @pytest.mark.asyncio async def test_visualization_updates(self, http_client, mock_responses): """Test LangGraph visualization updates with mocked responses""" state_update = { "agent_id": "test_agent", "state": "processing", "asl_tags": {"purpose": "test"} } # Configure mock responses http_client.post.return_value.status_code = 200 http_client.get.return_value.status_code = 200 http_client.get.return_value.json.return_value = mock_responses["graph"] # Send state update response = await http_client.post( "http://localhost:8080/api/v1/state", json=state_update ) assert response.status_code == 200 # Verify visualization update response = await http_client.get( "http://localhost:8080/api/v1/graph" ) assert response.status_code == 200 graph_data = await response.json() assert any( node["id"] == "test_agent" and node["state"] == "processing" for node in graph_data["nodes"] ) @pytest.mark.asyncio async def test_system_recovery(self, http_client, mock_responses): """Test system recovery after component failure with mocked responses""" # Configure mock responses for failure state http_client.get.return_value.status_code = 200 http_client.get.return_value.json.return_value = { "data": { "result": [{"value": [1234567890, "0"]}] } } # Verify system detection of failure response = await http_client.get( "http://localhost:9090/api/v1/query", params={"query": 'up{job="aethero_mem"}'} ) assert response.status_code == 200 result = await response.json() assert result["data"]["result"][0]["value"][1] == "0" # Configure mock responses for recovery state http_client.get.return_value.status_code = 200 # Verify system recovery response = await http_client.get( "http://localhost:9091/health" ) assert response.status_code == 200 @pytest.mark.asyncio async def test_data_consistency(self, http_client, mock_responses): """Test data consistency across system components with mocked responses""" decision = { "decision_id": f"test_dec_{datetime.now(UTC).timestamp()}", "agent_id": "test_agent", "decision": "test_decision", "context": {"test": True} } # Configure mock responses http_client.post.return_value.status_code = 201 http_client.post.return_value.json.return_value = decision http_client.get.return_value.status_code = 200 http_client.get.return_value.json.return_value = decision # Store decision response = await http_client.post( "http://localhost:9091/api/v1/decisions", json=decision ) assert response.status_code == 201 stored_decision = await response.json() # Verify in memory system response = await http_client.get( f"http://localhost:9091/api/v1/decisions/{decision['decision_id']}" ) assert response.status_code == 200 mem_decision = await response.json() assert mem_decision == stored_decision # Verify in reflection system response = await http_client.get( f"http://localhost:8005/api/v1/context/{decision['decision_id']}" ) assert response.status_code == 200 reflection_context = await response.json() assert reflection_context["decision_id"] == decision["decision_id"] if __name__ == "__main__": pytest.main(["-v", __file__])