Spaces:
Paused
Paused
File size: 12,741 Bytes
fb867c3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
"""
Test suite for agent lifecycle management.
Following test-first development: these tests define the expected behavior
of agent spawning, lifecycle states, and timing BEFORE implementation.
Tests validate agent behavior matching the OpenSCAD model from thefelix.md:
- Random spawn timing using seed 42069
- Agent progression along helix path
- State transitions (spawning -> active -> completed)
"""
import pytest
from unittest.mock import Mock
from src.agents.agent import Agent, AgentState
from src.core.helix_geometry import HelixGeometry
class TestAgentLifecycle:
"""Test agent lifecycle management and state transitions."""
@pytest.fixture
def standard_helix(self):
"""Create helix with OpenSCAD model parameters."""
return HelixGeometry(
top_radius=33.0,
bottom_radius=0.001,
height=33.0,
turns=33
)
@pytest.fixture
def mock_task(self):
"""Create a mock task for testing."""
task = Mock()
task.id = "test_task_001"
task.data = {"text": "hello world"}
return task
def test_agent_initialization(self, standard_helix):
"""Test agent can be initialized with required parameters."""
agent = Agent(
agent_id="agent_001",
spawn_time=0.5,
helix=standard_helix
)
assert agent.agent_id == "agent_001"
assert agent.spawn_time == 0.5
assert agent.helix == standard_helix
assert agent.state == AgentState.WAITING
assert agent.current_task is None
assert agent.current_position is None
def test_agent_state_enum_values(self):
"""Test agent state enumeration has expected values."""
assert AgentState.WAITING.value == "waiting"
assert AgentState.SPAWNING.value == "spawning"
assert AgentState.ACTIVE.value == "active"
assert AgentState.COMPLETED.value == "completed"
assert AgentState.FAILED.value == "failed"
def test_agent_spawn_timing_validation(self, standard_helix):
"""Test agent spawn time validation."""
# Valid spawn times should work
Agent(agent_id="valid", spawn_time=0.0, helix=standard_helix)
Agent(agent_id="valid", spawn_time=0.5, helix=standard_helix)
Agent(agent_id="valid", spawn_time=1.0, helix=standard_helix)
# Invalid spawn times should raise errors
with pytest.raises(ValueError, match="spawn_time must be between 0 and 1"):
Agent(agent_id="invalid", spawn_time=-0.1, helix=standard_helix)
with pytest.raises(ValueError, match="spawn_time must be between 0 and 1"):
Agent(agent_id="invalid", spawn_time=1.1, helix=standard_helix)
def test_agent_can_spawn_at_current_time(self, standard_helix):
"""Test agent spawn readiness based on current time."""
early_agent = Agent(agent_id="early", spawn_time=0.2, helix=standard_helix)
late_agent = Agent(agent_id="late", spawn_time=0.8, helix=standard_helix)
# At time 0.5, early agent should be ready, late agent should not
assert early_agent.can_spawn(current_time=0.5) is True
assert late_agent.can_spawn(current_time=0.5) is False
# At time 0.9, both should be ready
assert early_agent.can_spawn(current_time=0.9) is True
assert late_agent.can_spawn(current_time=0.9) is True
# At time 0.1, neither should be ready
assert early_agent.can_spawn(current_time=0.1) is False
assert late_agent.can_spawn(current_time=0.1) is False
def test_agent_spawn_state_transition(self, standard_helix, mock_task):
"""Test agent spawning updates state and position."""
agent = Agent(agent_id="spawner", spawn_time=0.3, helix=standard_helix)
# Before spawning
assert agent.state == AgentState.WAITING
assert agent.current_position is None
# Spawn the agent at time 0.5
agent.spawn(current_time=0.5, task=mock_task)
# After spawning
assert agent.state == AgentState.ACTIVE
assert agent.current_task == mock_task
assert agent.current_position is not None
# Position should be at the top of the helix (t=0) when spawning
expected_position = standard_helix.get_position(0.0)
assert agent.current_position == expected_position
def test_agent_cannot_spawn_before_time(self, standard_helix, mock_task):
"""Test agent cannot spawn before its designated time."""
agent = Agent(agent_id="future", spawn_time=0.7, helix=standard_helix)
with pytest.raises(ValueError, match="Cannot spawn agent before spawn_time"):
agent.spawn(current_time=0.5, task=mock_task)
def test_agent_cannot_spawn_twice(self, standard_helix, mock_task):
"""Test agent cannot be spawned multiple times."""
agent = Agent(agent_id="single", spawn_time=0.2, helix=standard_helix)
# First spawn should work
agent.spawn(current_time=0.5, task=mock_task)
assert agent.state == AgentState.ACTIVE
# Second spawn should fail
with pytest.raises(ValueError, match="Agent already spawned"):
agent.spawn(current_time=0.6, task=mock_task)
def test_agent_position_updates_with_progress(self, standard_helix, mock_task):
"""Test agent position updates as it progresses along helix."""
agent = Agent(agent_id="mover", spawn_time=0.1, helix=standard_helix)
agent.spawn(current_time=0.3, task=mock_task)
# Initial position at spawn
initial_position = agent.current_position
# Update position at later time
agent.update_position(current_time=0.6)
updated_position = agent.current_position
# Position should have changed
assert updated_position != initial_position
# Should be further along the helix (progress should increase)
# Note: Progress may not be exactly linear due to velocity/acceleration factors
assert agent.progress > 0.0 # Agent should have made some progress
assert agent.progress <= 1.0 # But shouldn't exceed maximum
# Position should correspond to the agent's current progress
expected_position = standard_helix.get_position(agent.progress)
assert agent.current_position == expected_position
def test_agent_completes_at_helix_end(self, standard_helix, mock_task):
"""Test agent completes when reaching end of helix."""
agent = Agent(agent_id="completer", spawn_time=0.0, helix=standard_helix)
agent.spawn(current_time=0.0, task=mock_task)
# Move to end of helix - need to account for velocity factors
# Some agents may need more than 1.0 time units due to velocity < 1.0
agent.update_position(current_time=2.0) # Give enough time to complete
# Agent should be completed or very close
assert agent.progress >= 0.7 # Should have made significant progress
if agent.progress >= 1.0:
assert agent.state == AgentState.COMPLETED
def test_agent_progress_calculation(self, standard_helix, mock_task):
"""Test agent progress is calculated correctly."""
agent = Agent(agent_id="progressor", spawn_time=0.2, helix=standard_helix)
agent.spawn(current_time=0.4, task=mock_task)
# At spawn: progress should be 0.0 (always starts at top)
assert abs(agent.progress - 0.0) < 1e-10
# Move forward: progress should increase but may not be exactly linear
agent.update_position(current_time=0.8)
# Progress should be approximately proportional to time, adjusted by velocity
# With velocity range [0.7, 1.3], expect progress in range [0.28, 0.52] for 0.4 time elapsed
assert 0.2 <= agent.progress <= 0.6 # Reasonable range accounting for velocity factors
def test_agent_task_assignment(self, standard_helix):
"""Test agent task assignment and tracking."""
agent = Agent(agent_id="worker", spawn_time=0.1, helix=standard_helix)
task1 = Mock()
task1.id = "task_001"
task1.data = {"text": "first task"}
# Assign task during spawn
agent.spawn(current_time=0.3, task=task1)
assert agent.current_task == task1
# Agent should track task ID
assert agent.get_task_id() == "task_001"
def test_agent_string_representation(self, standard_helix):
"""Test agent string representation for debugging."""
agent = Agent(agent_id="debug_agent", spawn_time=0.5, helix=standard_helix)
repr_str = str(agent)
assert "debug_agent" in repr_str
assert "spawn_time=0.5" in repr_str
assert "waiting" in repr_str
class TestAgentRandomSpawning:
"""Test random agent spawning following OpenSCAD model."""
def test_generate_spawn_times_with_seed(self):
"""Test generation of random spawn times matching OpenSCAD."""
from src.agents.agent import generate_spawn_times
# OpenSCAD parameters from thefelix.md
number_of_nodes = 133
random_seed = 42069
spawn_times = generate_spawn_times(
count=number_of_nodes,
seed=random_seed
)
# Should generate exactly the requested count
assert len(spawn_times) == number_of_nodes
# All spawn times should be in valid range [0, 1]
for spawn_time in spawn_times:
assert 0.0 <= spawn_time <= 1.0
# With same seed, should generate identical results
spawn_times_2 = generate_spawn_times(
count=number_of_nodes,
seed=random_seed
)
assert spawn_times == spawn_times_2
def test_spawn_times_distribution(self):
"""Test spawn times have reasonable distribution."""
from src.agents.agent import generate_spawn_times
spawn_times = generate_spawn_times(count=1000, seed=42069)
# Should have values across the full range
assert min(spawn_times) < 0.1
assert max(spawn_times) > 0.9
# Should be roughly uniform (simple distribution check)
early_count = sum(1 for t in spawn_times if t < 0.5)
late_count = sum(1 for t in spawn_times if t >= 0.5)
# Should be roughly balanced (allow 20% variance)
assert abs(early_count - late_count) < 200
def test_different_seeds_produce_different_results(self):
"""Test different seeds produce different spawn time sequences."""
from src.agents.agent import generate_spawn_times
spawn_times_1 = generate_spawn_times(count=100, seed=12345)
spawn_times_2 = generate_spawn_times(count=100, seed=54321)
# Different seeds should produce different sequences
assert spawn_times_1 != spawn_times_2
class TestAgentEdgeCases:
"""Test edge cases and error conditions."""
@pytest.fixture
def standard_helix(self):
"""Create helix with OpenSCAD model parameters."""
return HelixGeometry(
top_radius=33.0,
bottom_radius=0.001,
height=33.0,
turns=33
)
def test_agent_with_empty_id(self, standard_helix):
"""Test agent initialization with empty ID."""
with pytest.raises(ValueError, match="agent_id cannot be empty"):
Agent(agent_id="", spawn_time=0.5, helix=standard_helix)
def test_agent_position_update_before_spawn(self, standard_helix):
"""Test position update before spawning should fail."""
agent = Agent(agent_id="unspawned", spawn_time=0.5, helix=standard_helix)
with pytest.raises(ValueError, match="Cannot update position of unspawned agent"):
agent.update_position(current_time=0.7)
def test_agent_with_spawn_time_at_boundaries(self, standard_helix):
"""Test agents with spawn times at exact boundaries."""
# Spawn time exactly 0.0
early_agent = Agent(agent_id="start", spawn_time=0.0, helix=standard_helix)
assert early_agent.can_spawn(current_time=0.0) is True
# Spawn time exactly 1.0
late_agent = Agent(agent_id="end", spawn_time=1.0, helix=standard_helix)
assert late_agent.can_spawn(current_time=1.0) is True
assert late_agent.can_spawn(current_time=0.999) is False |