""" Agent lifecycle management for the Felix Framework. This module implements autonomous agents that traverse the helical path, matching the behavior specified in the OpenSCAD model from thefelix.md. Mathematical Foundation: - Agent spawn distribution: T_i ~ U(0,1) (uniform random timing) - Position progression: r_i(τ) = r(T_i + (τ - T_i)) along helix path - Workload distribution analysis supports Hypothesis H1 validation - Agent density evolution: ρ(t,τ) for attention focusing (Hypothesis H3) Key Features: - Random spawn timing using configurable seeds (matches OpenSCAD rands() function) - Helix path traversal with precise position tracking - State machine for lifecycle management (WAITING → ACTIVE → COMPLETED) - Task assignment and processing with performance measurement Mathematical references: - docs/mathematical_model.md, Section 4: Agent distribution functions and density evolution - docs/hypothesis_mathematics.md, Section H1: Workload distribution statistical analysis - docs/hypothesis_mathematics.md, Section H3: Attention focusing mechanism and agent density Implementation supports testing of Hypotheses H1 (task distribution) and H3 (attention focusing). """ import random from enum import Enum from typing import Optional, List, Tuple, Any from src.core.helix_geometry import HelixGeometry class AgentState(Enum): """Agent lifecycle states.""" WAITING = "waiting" # Before spawn time SPAWNING = "spawning" # Transitioning to active ACTIVE = "active" # Processing along helix COMPLETED = "completed" # Reached end of helix FAILED = "failed" # Error condition class Agent: """ Autonomous agent that traverses the helical path. Agents spawn at random times and progress along the helix while processing assigned tasks. Position and state are tracked throughout the lifecycle. """ def __init__(self, agent_id: str, spawn_time: float, helix: HelixGeometry): """ Initialize agent with lifecycle parameters. Args: agent_id: Unique identifier for the agent spawn_time: Time when agent becomes active (0.0 to 1.0) helix: Helix geometry for path calculation Raises: ValueError: If parameters are invalid """ self._validate_initialization(agent_id, spawn_time) self.agent_id = agent_id self.spawn_time = spawn_time self.helix = helix self.state = AgentState.WAITING self.current_task: Optional[Any] = None self.current_position: Optional[Tuple[float, float, float]] = None self._progress: float = 0.0 self._spawn_timestamp: Optional[float] = None # Non-linear progression mechanics self._velocity: float = random.uniform(0.7, 1.3) # Base velocity multiplier self._pause_until: Optional[float] = None # Pause agent until this time self._acceleration: float = 1.0 # Current acceleration factor self._confidence_history: List[float] = [] # Track confidence over time def _validate_initialization(self, agent_id: str, spawn_time: float) -> None: """Validate agent initialization parameters.""" if not agent_id or agent_id.strip() == "": raise ValueError("agent_id cannot be empty") if not (0.0 <= spawn_time <= 1.0): raise ValueError("spawn_time must be between 0 and 1") def can_spawn(self, current_time: float) -> bool: """ Check if agent can spawn at the current time. Args: current_time: Current simulation time (0.0 to 1.0) Returns: True if agent can spawn, False otherwise """ return current_time >= self.spawn_time def spawn(self, current_time: float, task: Any) -> None: """ Spawn the agent and begin processing. Args: current_time: Current simulation time task: Task to assign to the agent Raises: ValueError: If spawn conditions are not met """ if not self.can_spawn(current_time): raise ValueError("Cannot spawn agent before spawn_time") if self.state != AgentState.WAITING: raise ValueError("Agent already spawned") # Agents always start at the top of the helix (progress = 0) self._progress = 0.0 self.state = AgentState.ACTIVE self.current_task = task self.current_position = self.helix.get_position(self._progress) self._spawn_timestamp = current_time # Record when agent actually spawned def update_position(self, current_time: float) -> None: """ Update agent position based on current time with non-linear progression. Args: current_time: Current simulation time Raises: ValueError: If agent hasn't been spawned """ if self.state == AgentState.WAITING: raise ValueError("Cannot update position of unspawned agent") if self.state in [AgentState.COMPLETED, AgentState.FAILED]: return # No further updates needed # Calculate progression from when agent actually spawned if self._spawn_timestamp is None: raise ValueError("Cannot update position: agent has not spawned") # Check for pauses if self._pause_until is not None and current_time < self._pause_until: return # Agent is paused, don't update position # Non-linear progression calculation base_progression_time = current_time - self._spawn_timestamp # Apply velocity and acceleration modifiers effective_velocity = self._velocity * self._acceleration adjusted_progression_time = base_progression_time * effective_velocity # Update progress with non-linear factors self._progress = min(adjusted_progression_time, 1.0) # Cap at 1.0 # Update position self.current_position = self.helix.get_position(self._progress) # Adaptive acceleration based on confidence history self._adapt_velocity_from_confidence() # Check for completion if self._progress >= 1.0: self.state = AgentState.COMPLETED @property def progress(self) -> float: """Get current progress along helix (0.0 to 1.0).""" return self._progress def get_position(self, current_time: float) -> Optional[Tuple[float, float, float]]: """ Get agent's current position on the helix. Args: current_time: Current simulation time Returns: (x, y, z) position tuple or None if not spawned """ if self.state == AgentState.WAITING: return None # Update position based on current time self.update_position(current_time) return self.current_position def get_task_id(self) -> Optional[str]: """Get ID of current task, if any.""" if self.current_task and hasattr(self.current_task, 'id'): return self.current_task.id return None def __str__(self) -> str: """String representation for debugging.""" return (f"Agent(id={self.agent_id}, spawn_time={self.spawn_time}, " f"state={self.state.value}, progress={self._progress:.3f})") def __repr__(self) -> str: """Detailed representation for debugging.""" return str(self) def _adapt_velocity_from_confidence(self) -> None: """Adapt agent velocity based on confidence history.""" if len(self._confidence_history) < 2: return # Need at least 2 confidence readings for trend analysis # Calculate confidence trend (recent vs earlier) recent_avg = sum(self._confidence_history[-2:]) / 2 earlier_avg = sum(self._confidence_history[:-2]) / max(1, len(self._confidence_history) - 2) confidence_trend = recent_avg - earlier_avg # Adjust acceleration based on confidence trend if confidence_trend > 0.1: # Improving confidence self._acceleration = min(1.5, self._acceleration * 1.1) # Speed up elif confidence_trend < -0.1: # Decreasing confidence self._acceleration = max(0.5, self._acceleration * 0.9) # Slow down # Else maintain current acceleration def record_confidence(self, confidence: float) -> None: """Record confidence score for adaptive progression.""" self._confidence_history.append(confidence) # Keep only recent history to prevent memory bloat if len(self._confidence_history) > 10: self._confidence_history = self._confidence_history[-10:] def pause_for_duration(self, duration: float, current_time: float) -> None: """Pause agent progression for specified duration.""" self._pause_until = current_time + duration def set_velocity_multiplier(self, velocity: float) -> None: """Set base velocity multiplier for this agent.""" self._velocity = max(0.1, min(3.0, velocity)) # Clamp to reasonable range @property def velocity(self) -> float: """Get current effective velocity (velocity * acceleration).""" return self._velocity * self._acceleration @property def is_paused(self) -> bool: """Check if agent is currently paused.""" return self._pause_until is not None def get_progression_info(self) -> dict: """Get detailed progression information for debugging.""" return { "velocity": self._velocity, "acceleration": self._acceleration, "effective_velocity": self.velocity, "is_paused": self.is_paused, "pause_until": self._pause_until, "confidence_history": self._confidence_history.copy(), "confidence_trend": ( self._confidence_history[-1] - self._confidence_history[0] if len(self._confidence_history) > 1 else 0.0 ) } def generate_spawn_times(count: int, seed: Optional[int] = None) -> List[float]: """ Generate random spawn times matching OpenSCAD model. Replicates the OpenSCAD function: node_start_times = rands(0, 1, number_of_nodes, random_seed); Args: count: Number of spawn times to generate seed: Random seed for reproducibility (matches OpenSCAD seed) Returns: List of spawn times in range [0.0, 1.0] """ if seed is not None: random.seed(seed) return [random.random() for _ in range(count)] def create_agents_from_spawn_times(spawn_times: List[float], helix: HelixGeometry) -> List[Agent]: """ Create agent instances from spawn time list. Args: spawn_times: List of spawn times for agents helix: Helix geometry for agent path calculation Returns: List of initialized Agent instances """ agents = [] for i, spawn_time in enumerate(spawn_times): agent_id = f"agent_{i:03d}" agent = Agent(agent_id=agent_id, spawn_time=spawn_time, helix=helix) agents.append(agent) return agents def create_openscad_agents(helix: HelixGeometry, number_of_nodes: int = 133, random_seed: int = 42069) -> List[Agent]: """ Create agents matching OpenSCAD model parameters. Uses the exact parameters from thefelix.md: - number_of_nodes = 133 - random_seed = 42069 Args: helix: Helix geometry for agent paths number_of_nodes: Number of agents to create random_seed: Random seed for spawn time generation Returns: List of Agent instances with OpenSCAD-compatible spawn times """ spawn_times = generate_spawn_times(count=number_of_nodes, seed=random_seed) return create_agents_from_spawn_times(spawn_times, helix)