Spaces:
Paused
Paused
| """ | |
| Test suite for communication system. | |
| Following test-first development: these tests define the expected behavior | |
| of spoke-based communication between agents and the central post BEFORE implementation. | |
| Tests validate communication patterns matching the OpenSCAD model from thefelix.md: | |
| - Spoke-based message passing from agents to central post | |
| - Central post coordination and response handling | |
| - Message queuing and delivery guarantees | |
| - Performance metrics for communication overhead (Hypothesis H2) | |
| """ | |
| import pytest | |
| from unittest.mock import Mock | |
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| # Import future communication classes (to be implemented) | |
| from src.communication.central_post import CentralPost, Message, MessageType | |
| from src.communication.spoke import Spoke, SpokeConnection | |
| from src.agents.agent import Agent, AgentState | |
| from src.core.helix_geometry import HelixGeometry | |
| # Removed TestMessage class to avoid pytest collection warning | |
| class TestCentralPost: | |
| """Test central coordination system and message handling.""" | |
| def central_post(self): | |
| """Create central post instance for testing.""" | |
| return CentralPost(max_agents=133) | |
| def standard_helix(self): | |
| """Create helix with OpenSCAD model parameters.""" | |
| return HelixGeometry( | |
| top_radius=33.0, | |
| bottom_radius=0.001, | |
| height=33.0, | |
| turns=33 | |
| ) | |
| def test_central_post_initialization(self, central_post): | |
| """Test central post can be initialized with configuration.""" | |
| assert central_post.max_agents == 133 | |
| assert central_post.active_connections == 0 | |
| assert central_post.message_queue_size == 0 | |
| assert central_post.is_active is True | |
| def test_agent_registration(self, central_post, standard_helix): | |
| """Test agents can register with central post.""" | |
| agent = Agent(agent_id="test_001", spawn_time=0.5, helix=standard_helix) | |
| # Register agent | |
| connection_id = central_post.register_agent(agent) | |
| assert connection_id is not None | |
| assert central_post.active_connections == 1 | |
| assert central_post.is_agent_registered(agent.agent_id) is True | |
| def test_agent_deregistration(self, central_post, standard_helix): | |
| """Test agents can deregister from central post.""" | |
| agent = Agent(agent_id="test_002", spawn_time=0.3, helix=standard_helix) | |
| # Register then deregister | |
| connection_id = central_post.register_agent(agent) | |
| success = central_post.deregister_agent(agent.agent_id) | |
| assert success is True | |
| assert central_post.active_connections == 0 | |
| assert central_post.is_agent_registered(agent.agent_id) is False | |
| def test_message_queuing(self, central_post, standard_helix): | |
| """Test central post can queue messages from agents.""" | |
| # Register agent first | |
| agent = Agent("agent_001", 0.3, standard_helix) | |
| central_post.register_agent(agent) | |
| message = Message( | |
| sender_id="agent_001", | |
| message_type=MessageType.TASK_REQUEST, | |
| content={"task_type": "word_count", "data": "hello world"}, | |
| timestamp=0.5 | |
| ) | |
| # Queue message | |
| message_id = central_post.queue_message(message) | |
| assert message_id is not None | |
| assert central_post.message_queue_size == 1 | |
| assert central_post.has_pending_messages() is True | |
| def test_message_processing_order(self, central_post, standard_helix): | |
| """Test messages are processed in order (FIFO).""" | |
| # Register agents first | |
| for i in [1, 2, 3]: | |
| agent = Agent(f"agent_{i:03d}", 0.1, standard_helix) | |
| central_post.register_agent(agent) | |
| # Queue multiple messages | |
| msg1 = Message("agent_001", MessageType.TASK_REQUEST, {}, 0.1) | |
| msg2 = Message("agent_002", MessageType.STATUS_UPDATE, {}, 0.2) | |
| msg3 = Message("agent_003", MessageType.TASK_COMPLETE, {}, 0.3) | |
| id1 = central_post.queue_message(msg1) | |
| id2 = central_post.queue_message(msg2) | |
| id3 = central_post.queue_message(msg3) | |
| # Process messages - should come out in order | |
| processed1 = central_post.process_next_message() | |
| processed2 = central_post.process_next_message() | |
| processed3 = central_post.process_next_message() | |
| assert processed1.sender_id == "agent_001" | |
| assert processed2.sender_id == "agent_002" | |
| assert processed3.sender_id == "agent_003" | |
| assert central_post.message_queue_size == 0 | |
| def test_maximum_agent_limit(self, central_post, standard_helix): | |
| """Test central post enforces maximum agent connections.""" | |
| # Set lower limit for testing | |
| central_post.max_agents = 2 | |
| agent1 = Agent("agent_001", 0.1, standard_helix) | |
| agent2 = Agent("agent_002", 0.2, standard_helix) | |
| agent3 = Agent("agent_003", 0.3, standard_helix) | |
| # First two should succeed | |
| conn1 = central_post.register_agent(agent1) | |
| conn2 = central_post.register_agent(agent2) | |
| assert conn1 is not None | |
| assert conn2 is not None | |
| assert central_post.active_connections == 2 | |
| # Third should fail | |
| with pytest.raises(ValueError, match="Maximum agent connections exceeded"): | |
| central_post.register_agent(agent3) | |
| class TestSpokeConnection: | |
| """Test spoke-based communication channels.""" | |
| def standard_helix(self): | |
| """Create helix with OpenSCAD model parameters.""" | |
| return HelixGeometry( | |
| top_radius=33.0, | |
| bottom_radius=0.001, | |
| height=33.0, | |
| turns=33 | |
| ) | |
| def central_post(self): | |
| """Create central post for spoke testing.""" | |
| return CentralPost(max_agents=10) | |
| def test_agent(self, standard_helix): | |
| """Create agent for spoke testing.""" | |
| return Agent(agent_id="spoke_test", spawn_time=0.4, helix=standard_helix) | |
| def test_spoke_creation(self, test_agent, central_post): | |
| """Test spoke connection can be created between agent and central post.""" | |
| spoke = Spoke(agent=test_agent, central_post=central_post) | |
| assert spoke.agent_id == test_agent.agent_id | |
| assert spoke.central_post == central_post | |
| assert spoke.is_connected is True | |
| assert spoke.message_count == 0 | |
| def test_spoke_message_sending(self, test_agent, central_post): | |
| """Test messages can be sent through spoke connection.""" | |
| spoke = Spoke(agent=test_agent, central_post=central_post) | |
| # Send message through spoke | |
| message = Message( | |
| sender_id=test_agent.agent_id, | |
| message_type=MessageType.TASK_REQUEST, | |
| content={"request": "word_counting_task"}, | |
| timestamp=0.6 | |
| ) | |
| message_id = spoke.send_message(message) | |
| assert message_id is not None | |
| assert spoke.message_count == 1 | |
| assert central_post.message_queue_size == 1 | |
| def test_spoke_bidirectional_communication(self, test_agent, central_post): | |
| """Test spoke allows bidirectional communication.""" | |
| spoke = Spoke(agent=test_agent, central_post=central_post) | |
| # Agent sends message to central post | |
| request = Message( | |
| sender_id=test_agent.agent_id, | |
| message_type=MessageType.TASK_REQUEST, | |
| content={"task": "process_data"}, | |
| timestamp=0.5 | |
| ) | |
| spoke.send_message(request) | |
| # Central post processes and responds | |
| processed_msg = central_post.process_next_message() | |
| response = Message( | |
| sender_id="central_post", | |
| message_type=MessageType.TASK_ASSIGNMENT, | |
| content={"task_id": "task_123", "data": "sample data"}, | |
| timestamp=0.51 | |
| ) | |
| # Send response back through spoke | |
| response_id = spoke.receive_message(response) | |
| assert response_id is not None | |
| assert len(spoke.get_received_messages()) == 1 | |
| def test_spoke_connection_lifecycle(self, test_agent, central_post): | |
| """Test spoke connection establishment and teardown.""" | |
| spoke = Spoke(agent=test_agent, central_post=central_post) | |
| # Initial state | |
| assert spoke.is_connected is True | |
| assert central_post.active_connections == 1 | |
| # Disconnect | |
| spoke.disconnect() | |
| assert spoke.is_connected is False | |
| assert central_post.active_connections == 0 | |
| # Reconnect | |
| spoke.reconnect() | |
| assert spoke.is_connected is True | |
| assert central_post.active_connections == 1 | |
| def test_spoke_message_reliability(self, test_agent, central_post): | |
| """Test spoke ensures reliable message delivery.""" | |
| spoke = Spoke(agent=test_agent, central_post=central_post) | |
| # Send message with delivery confirmation | |
| message = Message( | |
| sender_id=test_agent.agent_id, | |
| message_type=MessageType.STATUS_UPDATE, | |
| content={"status": "processing", "progress": 0.5}, | |
| timestamp=0.7 | |
| ) | |
| message_id = spoke.send_message_reliable(message) | |
| # Should get delivery confirmation | |
| assert message_id is not None | |
| assert spoke.is_message_delivered(message_id) is True | |
| assert spoke.get_delivery_time(message_id) is not None | |
| class TestMessageTypes: | |
| """Test different message types and their handling.""" | |
| def test_message_type_enumeration(self): | |
| """Test message types are properly defined.""" | |
| assert MessageType.TASK_REQUEST.value == "task_request" | |
| assert MessageType.TASK_ASSIGNMENT.value == "task_assignment" | |
| assert MessageType.STATUS_UPDATE.value == "status_update" | |
| assert MessageType.TASK_COMPLETE.value == "task_complete" | |
| assert MessageType.ERROR_REPORT.value == "error_report" | |
| def test_message_creation_with_types(self): | |
| """Test messages can be created with different types.""" | |
| # Task request message | |
| task_msg = Message( | |
| sender_id="agent_001", | |
| message_type=MessageType.TASK_REQUEST, | |
| content={"task_type": "word_count"}, | |
| timestamp=0.5 | |
| ) | |
| assert task_msg.message_type == MessageType.TASK_REQUEST | |
| assert task_msg.sender_id == "agent_001" | |
| # Status update message | |
| status_msg = Message( | |
| sender_id="agent_002", | |
| message_type=MessageType.STATUS_UPDATE, | |
| content={"progress": 0.75, "current_position": [1.2, 3.4, 5.6]}, | |
| timestamp=0.8 | |
| ) | |
| assert status_msg.message_type == MessageType.STATUS_UPDATE | |
| assert status_msg.content["progress"] == 0.75 | |
| class TestCommunicationPerformance: | |
| """Test communication system performance metrics for Hypothesis H2.""" | |
| def standard_helix(self): | |
| """Create helix with OpenSCAD model parameters.""" | |
| return HelixGeometry( | |
| top_radius=33.0, | |
| bottom_radius=0.001, | |
| height=33.0, | |
| turns=33 | |
| ) | |
| def performance_central_post(self): | |
| """Create central post configured for performance testing.""" | |
| return CentralPost(max_agents=133, enable_metrics=True) | |
| def test_message_throughput_measurement(self, performance_central_post, standard_helix): | |
| """Test system can measure message throughput.""" | |
| # Register agents first | |
| for i in range(100): | |
| agent = Agent(f"agent_{i:03d}", 0.1, standard_helix) | |
| performance_central_post.register_agent(agent) | |
| # Send batch of messages | |
| messages = [] | |
| for i in range(100): | |
| msg = Message( | |
| sender_id=f"agent_{i:03d}", | |
| message_type=MessageType.STATUS_UPDATE, | |
| content={"progress": i/100}, | |
| timestamp=i/1000 | |
| ) | |
| messages.append(msg) | |
| # Queue all messages and measure processing time | |
| start_time = performance_central_post.get_current_time() | |
| for msg in messages: | |
| performance_central_post.queue_message(msg) | |
| # Process all messages | |
| while performance_central_post.has_pending_messages(): | |
| performance_central_post.process_next_message() | |
| end_time = performance_central_post.get_current_time() | |
| # Check performance metrics | |
| throughput = performance_central_post.get_message_throughput() | |
| processing_time = end_time - start_time | |
| assert throughput > 0 | |
| assert processing_time > 0 | |
| assert performance_central_post.total_messages_processed == 100 | |
| def test_communication_overhead_tracking(self, performance_central_post): | |
| """Test system tracks communication overhead vs processing time.""" | |
| # This validates Hypothesis H2: spoke communication reduces overhead | |
| # Simulate agent processing with communication | |
| agent_processing_time = 0.1 # seconds | |
| communication_time = performance_central_post.measure_communication_overhead( | |
| num_messages=10, | |
| processing_time=agent_processing_time | |
| ) | |
| # Communication overhead should be measurable | |
| assert communication_time >= 0 | |
| overhead_ratio = communication_time / agent_processing_time | |
| # Store metrics for hypothesis validation | |
| performance_central_post.record_overhead_ratio(overhead_ratio) | |
| # Should be able to retrieve metrics | |
| avg_overhead = performance_central_post.get_average_overhead_ratio() | |
| assert avg_overhead >= 0 | |
| def test_scalability_with_multiple_agents(self, performance_central_post, standard_helix): | |
| """Test communication system scales with increasing agent count.""" | |
| agent_counts = [10, 50, 100, 133] # Up to OpenSCAD model size | |
| for count in agent_counts: | |
| # Create fresh central post for each iteration to avoid registration conflicts | |
| test_central_post = CentralPost(max_agents=count, enable_metrics=True) | |
| # Register agents for this test | |
| for i in range(count): | |
| agent = Agent(f"scale_agent_{i:03d}", 0.1, standard_helix) | |
| test_central_post.register_agent(agent) | |
| # Measure performance with different agent counts | |
| start_time = test_central_post.get_current_time() | |
| # Simulate concurrent messages from multiple agents | |
| for i in range(count): | |
| msg = Message( | |
| sender_id=f"scale_agent_{i:03d}", | |
| message_type=MessageType.STATUS_UPDATE, | |
| content={"agent_count": count}, | |
| timestamp=start_time | |
| ) | |
| test_central_post.queue_message(msg) | |
| # Process all messages | |
| while test_central_post.has_pending_messages(): | |
| test_central_post.process_next_message() | |
| end_time = test_central_post.get_current_time() | |
| # Record scaling metrics | |
| processing_time = end_time - start_time | |
| test_central_post.record_scaling_metric(count, processing_time) | |
| # Store metrics in main central post for final validation | |
| performance_central_post.record_scaling_metric(count, processing_time) | |
| # Should have scaling data for analysis | |
| scaling_data = performance_central_post.get_scaling_metrics() | |
| assert len(scaling_data) == len(agent_counts) | |
| # Check that system handles maximum load (133 agents) | |
| max_load_time = scaling_data[133] # Time for 133 agents | |
| assert max_load_time < 1.0 # Should process within 1 second | |
| class TestCommunicationIntegration: | |
| """Test integration between agents, spokes, and central post.""" | |
| def integration_setup(self): | |
| """Create full communication system for integration testing.""" | |
| helix = HelixGeometry(33.0, 0.001, 33.0, 33) | |
| central_post = CentralPost(max_agents=10) | |
| # Create multiple agents | |
| agents = [] | |
| spokes = [] | |
| for i in range(5): | |
| agent = Agent(f"integration_agent_{i:03d}", i/10, helix) | |
| spoke = Spoke(agent=agent, central_post=central_post) | |
| agents.append(agent) | |
| spokes.append(spoke) | |
| return { | |
| 'helix': helix, | |
| 'central_post': central_post, | |
| 'agents': agents, | |
| 'spokes': spokes | |
| } | |
| def test_full_communication_workflow(self, integration_setup): | |
| """Test complete workflow: agent spawn, communication, task processing.""" | |
| setup = integration_setup | |
| central_post = setup['central_post'] | |
| agents = setup['agents'] | |
| spokes = setup['spokes'] | |
| # Simulate time progression with communication | |
| current_time = 0.5 | |
| for i, (agent, spoke) in enumerate(zip(agents, spokes)): | |
| if agent.can_spawn(current_time): | |
| # Agent spawns and requests task | |
| mock_task = Mock() | |
| mock_task.id = f"integration_task_{i}" | |
| agent.spawn(current_time, mock_task) | |
| # Send task request through spoke | |
| request = Message( | |
| sender_id=agent.agent_id, | |
| message_type=MessageType.TASK_REQUEST, | |
| content={"request_type": "word_count"}, | |
| timestamp=current_time | |
| ) | |
| spoke.send_message(request) | |
| # Central post processes all requests | |
| task_assignments = [] | |
| while central_post.has_pending_messages(): | |
| msg = central_post.process_next_message() | |
| if msg.message_type == MessageType.TASK_REQUEST: | |
| # Generate task assignment | |
| assignment = Message( | |
| sender_id="central_post", | |
| message_type=MessageType.TASK_ASSIGNMENT, | |
| content={ | |
| "task_id": f"task_{len(task_assignments)}", | |
| "data": "sample text for processing" | |
| }, | |
| timestamp=current_time + 0.01 | |
| ) | |
| task_assignments.append(assignment) | |
| # Should have generated assignments for spawned agents | |
| spawned_count = sum(1 for agent in agents if agent.state != AgentState.WAITING) | |
| assert len(task_assignments) == spawned_count | |
| assert central_post.message_queue_size == 0 | |
| # Validate communication system is operational | |
| assert central_post.active_connections == len(spokes) | |
| assert all(spoke.is_connected for spoke in spokes) |