""" Central coordination system for the Felix Framework. The central post manages communication and coordination between agents, implementing the hub of the spoke-based communication model from thefelix.md. Mathematical Foundation: - Spoke communication: O(N) message complexity vs O(N²) mesh topology - Maximum communication distance: R_top (helix outer radius) - Performance metrics for Hypothesis H2 validation and statistical analysis Key Features: - Agent registration and connection management - FIFO message queuing with guaranteed ordering - Performance metrics collection (throughput, latency, overhead ratios) - Scalability up to 133 agents (matching OpenSCAD model parameters) Mathematical references: - docs/mathematical_model.md, Section 5: Spoke geometry and communication complexity - docs/hypothesis_mathematics.md, Section H2: Communication overhead analysis and proofs - Theoretical proof of O(N) vs O(N²) scaling advantage in hypothesis documentation Implementation supports rigorous testing of Hypothesis H2 communication efficiency claims. """ import time import uuid import random import logging from enum import Enum from typing import Dict, List, Optional, Any, Tuple, TYPE_CHECKING from dataclasses import dataclass, field from collections import deque from queue import Queue, Empty import asyncio # Memory system imports from src.memory.knowledge_store import KnowledgeStore, KnowledgeEntry, KnowledgeType, ConfidenceLevel from src.memory.task_memory import TaskMemory, TaskPattern, TaskOutcome from src.memory.context_compression import ContextCompressor, CompressionStrategy # Dynamic spawning imports - moved to avoid circular imports if TYPE_CHECKING: from src.agents.llm_agent import LLMAgent from src.core.helix_geometry import HelixGeometry from src.llm.lm_studio_client import LMStudioClient from src.llm.token_budget import TokenBudgetManager # Set up logging logger = logging.getLogger(__name__) class MessageType(Enum): """Types of messages in the communication system.""" TASK_REQUEST = "task_request" TASK_ASSIGNMENT = "task_assignment" STATUS_UPDATE = "status_update" TASK_COMPLETE = "task_complete" ERROR_REPORT = "error_report" @dataclass class Message: """Message structure for communication between agents and central post.""" sender_id: str message_type: MessageType content: Dict[str, Any] timestamp: float message_id: str = field(default_factory=lambda: str(uuid.uuid4())) class CentralPost: """ Central coordination system managing all agent communication. The central post acts as the hub in the spoke-based communication model, processing messages from agents and coordinating task assignments. """ def __init__(self, max_agents: int = 133, enable_metrics: bool = False, enable_memory: bool = True, memory_db_path: str = "felix_memory.db"): """ Initialize central post with configuration parameters. Args: max_agents: Maximum number of concurrent agent connections enable_metrics: Whether to collect performance metrics enable_memory: Whether to enable persistent memory systems memory_db_path: Path to the memory database file """ self.max_agents = max_agents self.enable_metrics = enable_metrics self.enable_memory = enable_memory # Connection management self._registered_agents: Dict[str, str] = {} # agent_id -> connection_id self._connection_times: Dict[str, float] = {} # agent_id -> registration_time # Message processing (sync and async) self._message_queue: Queue = Queue() self._async_message_queue: Optional[asyncio.Queue] = None # Lazy initialization self._processed_messages: List[Message] = [] self._async_processors: List[asyncio.Task] = [] # Performance metrics (for Hypothesis H2) self._metrics_enabled = enable_metrics self._start_time = time.time() self._total_messages_processed = 0 self._processing_times: List[float] = [] self._overhead_ratios: List[float] = [] self._scaling_metrics: Dict[int, float] = {} # Memory systems (Priority 5: Memory and Context Persistence) self._memory_enabled = enable_memory if enable_memory: self.knowledge_store = KnowledgeStore(memory_db_path) self.task_memory = TaskMemory(memory_db_path) self.context_compressor = ContextCompressor() else: self.knowledge_store = None self.task_memory = None self.context_compressor = None # System state self._is_active = True @property def active_connections(self) -> int: """Get number of currently registered agents.""" return len(self._registered_agents) @property def message_queue_size(self) -> int: """Get number of pending messages in queue.""" return self._message_queue.qsize() @property def is_active(self) -> bool: """Check if central post is active and accepting connections.""" return self._is_active @property def total_messages_processed(self) -> int: """Get total number of messages processed.""" return self._total_messages_processed def register_agent(self, agent) -> str: """ Register an agent with the central post. Args: agent: Agent instance to register Returns: Connection ID for the registered agent Raises: ValueError: If maximum connections exceeded or agent already registered """ if self.active_connections >= self.max_agents: raise ValueError("Maximum agent connections exceeded") if agent.agent_id in self._registered_agents: raise ValueError(f"Agent {agent.agent_id} already registered") # Create unique connection ID connection_id = str(uuid.uuid4()) # Register agent self._registered_agents[agent.agent_id] = connection_id self._connection_times[agent.agent_id] = time.time() return connection_id def deregister_agent(self, agent_id: str) -> bool: """ Deregister an agent from the central post. Args: agent_id: ID of agent to deregister Returns: True if successfully deregistered, False if not found """ if agent_id not in self._registered_agents: return False # Remove agent registration del self._registered_agents[agent_id] del self._connection_times[agent_id] return True def is_agent_registered(self, agent_id: str) -> bool: """ Check if an agent is currently registered. Args: agent_id: ID of agent to check Returns: True if agent is registered, False otherwise """ return agent_id in self._registered_agents async def _ensure_async_queue(self) -> asyncio.Queue: """Ensure async message queue is initialized.""" if self._async_message_queue is None: self._async_message_queue = asyncio.Queue(maxsize=1000) return self._async_message_queue def queue_message(self, message: Message) -> str: """ Queue a message for processing (sync). Args: message: Message to queue Returns: Message ID for tracking """ if not self._is_active: raise RuntimeError("Central post is not active") # Validate sender is registered if message.sender_id != "central_post" and message.sender_id not in self._registered_agents: raise ValueError(f"Message from unregistered agent: {message.sender_id}") # Queue message self._message_queue.put(message) return message.message_id async def queue_message_async(self, message: Message) -> str: """ Queue a message for async processing. Args: message: Message to queue Returns: Message ID for tracking """ if not self._is_active: raise RuntimeError("Central post is not active") # Validate sender is registered if message.sender_id != "central_post" and message.sender_id not in self._registered_agents: raise ValueError(f"Message from unregistered agent: {message.sender_id}") # Queue message asynchronously async_queue = await self._ensure_async_queue() await async_queue.put(message) return message.message_id def has_pending_messages(self) -> bool: """ Check if there are messages waiting to be processed. Returns: True if messages are pending, False otherwise """ return not self._message_queue.empty() def process_next_message(self) -> Optional[Message]: """ Process the next message in the queue (FIFO order). Returns: Processed message, or None if queue is empty """ try: # Get next message start_time = time.time() if self._metrics_enabled else None message = self._message_queue.get_nowait() # Process message (placeholder - actual processing depends on message type) self._handle_message(message) # Record metrics if self._metrics_enabled and start_time: processing_time = time.time() - start_time self._processing_times.append(processing_time) # Track processed message self._processed_messages.append(message) self._total_messages_processed += 1 return message except Empty: return None async def process_next_message_async(self) -> Optional[Message]: """ Process the next message in the async queue (FIFO order). Returns: Processed message, or None if queue is empty """ try: async_queue = await self._ensure_async_queue() # Try to get message without blocking try: message = async_queue.get_nowait() except asyncio.QueueEmpty: return None # Get next message start_time = time.time() if self._metrics_enabled else None # Process message asynchronously await self._handle_message_async(message) # Record metrics if self._metrics_enabled and start_time: processing_time = time.time() - start_time self._processing_times.append(processing_time) # Track processed message self._processed_messages.append(message) self._total_messages_processed += 1 return message except Exception as e: logger.error(f"Async message processing failed: {e}") return None def _handle_message(self, message: Message) -> None: """ Handle specific message types (internal processing). Args: message: Message to handle """ # Message type-specific handling if message.message_type == MessageType.TASK_REQUEST: self._handle_task_request(message) elif message.message_type == MessageType.STATUS_UPDATE: self._handle_status_update(message) elif message.message_type == MessageType.TASK_COMPLETE: self._handle_task_completion(message) elif message.message_type == MessageType.ERROR_REPORT: self._handle_error_report(message) # Add more handlers as needed async def _handle_message_async(self, message: Message) -> None: """ Handle specific message types asynchronously (internal processing). Args: message: Message to handle """ # Message type-specific async handling if message.message_type == MessageType.TASK_REQUEST: await self._handle_task_request_async(message) elif message.message_type == MessageType.STATUS_UPDATE: await self._handle_status_update_async(message) elif message.message_type == MessageType.TASK_COMPLETE: await self._handle_task_completion_async(message) elif message.message_type == MessageType.ERROR_REPORT: await self._handle_error_report_async(message) # Add more handlers as needed def _handle_task_request(self, message: Message) -> None: """Handle task request from agent.""" # Placeholder for task assignment logic pass def _handle_status_update(self, message: Message) -> None: """Handle status update from agent.""" # Placeholder for status tracking logic pass def _handle_task_completion(self, message: Message) -> None: """Handle task completion notification.""" # Placeholder for completion processing logic pass def _handle_error_report(self, message: Message) -> None: """Handle error report from agent.""" # Placeholder for error handling logic pass # Async message handlers async def _handle_task_request_async(self, message: Message) -> None: """Handle task request from agent asynchronously.""" # Async task assignment logic pass async def _handle_status_update_async(self, message: Message) -> None: """Handle status update from agent asynchronously.""" # Async status tracking logic pass async def _handle_task_completion_async(self, message: Message) -> None: """Handle task completion notification asynchronously.""" # Async completion processing logic pass async def _handle_error_report_async(self, message: Message) -> None: """Handle error report from agent asynchronously.""" # Async error handling logic pass # Performance metrics methods (for Hypothesis H2) def get_current_time(self) -> float: """Get current timestamp for performance measurements.""" return time.time() def get_message_throughput(self) -> float: """ Calculate message processing throughput. Returns: Messages processed per second """ if not self._metrics_enabled or self._total_messages_processed == 0: return 0.0 elapsed_time = time.time() - self._start_time if elapsed_time == 0: return 0.0 return self._total_messages_processed / elapsed_time def measure_communication_overhead(self, num_messages: int, processing_time: float) -> float: """ Measure communication overhead vs processing time. Args: num_messages: Number of messages in the measurement processing_time: Actual processing time for comparison Returns: Communication overhead time """ if not self._metrics_enabled: return 0.0 # Simulate communication overhead calculation if self._processing_times: avg_msg_time = sum(self._processing_times) / len(self._processing_times) communication_overhead = avg_msg_time * num_messages return communication_overhead return 0.0 def record_overhead_ratio(self, overhead_ratio: float) -> None: """ Record overhead ratio for hypothesis validation. Args: overhead_ratio: Communication overhead / processing time ratio """ if self._metrics_enabled: self._overhead_ratios.append(overhead_ratio) def get_average_overhead_ratio(self) -> float: """ Get average overhead ratio across all measurements. Returns: Average overhead ratio """ if not self._overhead_ratios: return 0.0 return sum(self._overhead_ratios) / len(self._overhead_ratios) def record_scaling_metric(self, agent_count: int, processing_time: float) -> None: """ Record scaling performance metric. Args: agent_count: Number of agents in the test processing_time: Time to process messages from all agents """ if self._metrics_enabled: self._scaling_metrics[agent_count] = processing_time def get_scaling_metrics(self) -> Dict[int, float]: """ Get scaling performance metrics. Returns: Dictionary mapping agent count to processing time """ return self._scaling_metrics.copy() async def start_async_processing(self, max_concurrent_processors: int = 3) -> None: """Start async message processors.""" for i in range(max_concurrent_processors): processor = asyncio.create_task(self._async_message_processor(f"processor_{i}")) self._async_processors.append(processor) async def _async_message_processor(self, processor_id: str) -> None: """Individual async message processor.""" while self._is_active: try: message = await self.process_next_message_async() if message is None: # No messages to process, wait briefly await asyncio.sleep(0.01) continue logger.debug(f"Processor {processor_id} handled message {message.message_id}") except Exception as e: logger.error(f"Async processor {processor_id} error: {e}") await asyncio.sleep(0.1) # Brief recovery delay def shutdown(self) -> None: """Shutdown the central post and disconnect all agents.""" self._is_active = False # Clear all connections self._registered_agents.clear() self._connection_times.clear() # Clear message queue while not self._message_queue.empty(): try: self._message_queue.get_nowait() except Empty: break async def shutdown_async(self) -> None: """Shutdown async components.""" self._is_active = False # Cancel async processors for processor in self._async_processors: processor.cancel() # Wait for processors to finish if self._async_processors: await asyncio.gather(*self._async_processors, return_exceptions=True) self._async_processors.clear() def get_performance_summary(self) -> Dict[str, Any]: """ Get comprehensive performance summary for analysis. Returns: Dictionary containing all performance metrics """ if not self._metrics_enabled: return {"metrics_enabled": False} return { "metrics_enabled": True, "total_messages_processed": self._total_messages_processed, "message_throughput": self.get_message_throughput(), "average_overhead_ratio": self.get_average_overhead_ratio(), "scaling_metrics": self.get_scaling_metrics(), "active_connections": self.active_connections, "uptime": time.time() - self._start_time, "async_processors": len(self._async_processors), "async_queue_size": self._async_message_queue.qsize() if self._async_message_queue else 0 } def accept_high_confidence_result(self, message: Message, min_confidence: float = 0.8) -> bool: """ Accept agent results that meet minimum confidence threshold. This implements the natural selection aspect of the helix model - only high-quality results from synthesis agents deep in the helix are accepted as final output from the central coordination system. Args: message: Message containing agent result min_confidence: Minimum confidence threshold (0.0 to 1.0) Returns: True if result was accepted, False if rejected """ if message.message_type != MessageType.STATUS_UPDATE: return False content = message.content confidence = content.get("confidence", 0.0) depth_ratio = content.get("position_info", {}).get("depth_ratio", 0.0) agent_type = content.get("agent_type", "") # Only synthesis agents can produce final output if agent_type != "synthesis": return False # Synthesis agents should be deep in the helix (>0.7) with high confidence if depth_ratio >= 0.7 and confidence >= min_confidence: # Accept the result - add to processed messages self._processed_messages.append(message) self._total_messages_processed += 1 return True else: # Reject the result return False # Memory Integration Methods (Priority 5: Memory and Context Persistence) def store_agent_result_as_knowledge(self, agent_id: str, content: str, confidence: float, domain: str = "general", tags: Optional[List[str]] = None) -> bool: """ Store agent result as knowledge in the persistent knowledge base. Args: agent_id: ID of the agent producing the result content: Content of the result to store confidence: Confidence level of the result (0.0 to 1.0) domain: Domain/category for the knowledge tags: Optional tags for the knowledge entry Returns: True if knowledge was stored successfully, False otherwise """ if not self._memory_enabled or not self.knowledge_store: return False try: # Convert confidence to ConfidenceLevel enum if confidence >= 0.8: confidence_level = ConfidenceLevel.HIGH elif confidence >= 0.6: confidence_level = ConfidenceLevel.MEDIUM else: confidence_level = ConfidenceLevel.LOW # Store in knowledge base using correct method signature entry_id = self.knowledge_store.store_knowledge( knowledge_type=KnowledgeType.TASK_RESULT, content={"result": content, "confidence": confidence}, confidence_level=confidence_level, source_agent=agent_id, domain=domain, tags=tags ) return entry_id is not None except Exception as e: logger.error(f"Failed to store knowledge from agent {agent_id}: {e}") return False def retrieve_relevant_knowledge(self, domain: Optional[str] = None, knowledge_type: Optional[KnowledgeType] = None, keywords: Optional[List[str]] = None, min_confidence: Optional[ConfidenceLevel] = None, limit: int = 10) -> List[KnowledgeEntry]: """ Retrieve relevant knowledge from the knowledge base. Args: domain: Filter by domain knowledge_type: Filter by knowledge type keywords: Keywords to search for min_confidence: Minimum confidence level limit: Maximum number of entries to return Returns: List of relevant knowledge entries """ if not self._memory_enabled or not self.knowledge_store: return [] try: from memory.knowledge_store import KnowledgeQuery query = KnowledgeQuery( knowledge_types=[knowledge_type] if knowledge_type else None, domains=[domain] if domain else None, content_keywords=keywords, min_confidence=min_confidence, limit=limit ) return self.knowledge_store.retrieve_knowledge(query) except Exception as e: logger.error(f"Failed to retrieve knowledge: {e}") return [] def get_task_strategy_recommendations(self, task_description: str, task_type: str = "general", complexity: str = "MODERATE") -> Dict[str, Any]: """ Get strategy recommendations based on task memory. Args: task_description: Description of the task task_type: Type of task (e.g., "research", "analysis", "synthesis") complexity: Task complexity level ("SIMPLE", "MODERATE", "COMPLEX", "VERY_COMPLEX") Returns: Dictionary containing strategy recommendations """ if not self._memory_enabled or not self.task_memory: return {} try: from memory.task_memory import TaskComplexity # Convert string complexity to enum complexity_enum = TaskComplexity.MODERATE if complexity.upper() == "SIMPLE": complexity_enum = TaskComplexity.SIMPLE elif complexity.upper() == "COMPLEX": complexity_enum = TaskComplexity.COMPLEX elif complexity.upper() == "VERY_COMPLEX": complexity_enum = TaskComplexity.VERY_COMPLEX return self.task_memory.recommend_strategy( task_description=task_description, task_type=task_type, complexity=complexity_enum ) except Exception as e: logger.error(f"Failed to get strategy recommendations: {e}") return {} def compress_large_context(self, context: str, strategy: CompressionStrategy = CompressionStrategy.EXTRACTIVE_SUMMARY, target_size: Optional[int] = None): """ Compress large context using the context compression system. Args: context: Content to compress strategy: Compression strategy to use target_size: Optional target size for compression Returns: CompressedContext object or None if compression failed """ if not self._memory_enabled or not self.context_compressor: return None try: # Convert string context to dict format expected by compressor context_dict = {"main_content": context} return self.context_compressor.compress_context( context=context_dict, target_size=target_size, strategy=strategy ) except Exception as e: logger.error(f"Failed to compress context: {e}") return None def get_memory_summary(self) -> Dict[str, Any]: """ Get summary of memory system status and contents. Returns: Dictionary with memory system summary """ if not self._memory_enabled: return { "knowledge_entries": 0, "task_patterns": 0, "memory_enabled": False } try: summary: Dict[str, Any] = {"memory_enabled": True} if self.knowledge_store: # Get knowledge entry count using proper query from memory.knowledge_store import KnowledgeQuery query = KnowledgeQuery(limit=1000) all_knowledge = self.knowledge_store.retrieve_knowledge(query) summary["knowledge_entries"] = len(all_knowledge) # Get domain breakdown domains: Dict[str, int] = {} for entry in all_knowledge: domains[entry.domain] = domains.get(entry.domain, 0) + 1 summary["knowledge_by_domain"] = domains else: summary["knowledge_entries"] = 0 summary["knowledge_by_domain"] = {} if self.task_memory: # Get task pattern count and summary memory_summary = self.task_memory.get_memory_summary() summary["task_patterns"] = memory_summary.get("total_patterns", 0) summary["task_executions"] = memory_summary.get("total_executions", 0) # Handle success rate calculation from outcome distribution outcome_dist = memory_summary.get("outcome_distribution", {}) total_executions = sum(outcome_dist.values()) if outcome_dist else 0 if total_executions > 0: successful_outcomes = outcome_dist.get("success", 0) + outcome_dist.get("partial_success", 0) summary["success_rate"] = successful_outcomes / total_executions else: summary["success_rate"] = 0.0 # Get top task types summary["top_task_types"] = memory_summary.get("top_task_types", {}) summary["success_by_complexity"] = memory_summary.get("success_by_complexity", {}) else: summary["task_patterns"] = 0 summary["task_executions"] = 0 summary["success_rate"] = 0.0 summary["top_task_types"] = {} summary["success_by_complexity"] = {} return summary except Exception as e: logger.error(f"Failed to get memory summary: {e}") return { "knowledge_entries": 0, "task_patterns": 0, "memory_enabled": True, "error": str(e) } class AgentFactory: """ Factory for creating agents dynamically based on task needs. The AgentFactory allows the central post to spawn new agents as needed during the helix processing, enabling emergent behavior and adaptive team composition. """ def __init__(self, helix: "HelixGeometry", llm_client: "LMStudioClient", token_budget_manager: Optional["TokenBudgetManager"] = None, random_seed: Optional[int] = None, enable_dynamic_spawning: bool = True, max_agents: int = 15, token_budget_limit: int = 10000): """ Initialize the agent factory. Args: helix: Helix geometry for new agents llm_client: LM Studio client for new agents token_budget_manager: Optional token budget manager random_seed: Seed for random spawn time generation enable_dynamic_spawning: Enable intelligent agent spawning max_agents: Maximum number of agents for dynamic spawning token_budget_limit: Token budget limit for dynamic spawning """ self.helix = helix self.llm_client = llm_client self.token_budget_manager = token_budget_manager self.random_seed = random_seed self._agent_counter = 0 self.enable_dynamic_spawning = enable_dynamic_spawning # Initialize dynamic spawning system if enabled if enable_dynamic_spawning: # Import here to avoid circular imports from agents.dynamic_spawning import DynamicSpawning self.dynamic_spawner = DynamicSpawning( agent_factory=self, confidence_threshold=0.7, max_agents=max_agents, token_budget_limit=token_budget_limit ) else: self.dynamic_spawner = None if random_seed is not None: random.seed(random_seed) def create_research_agent(self, domain: str = "general", spawn_time_range: Tuple[float, float] = (0.0, 0.3)) -> "LLMAgent": """Create a research agent with random spawn time in specified range.""" from agents.specialized_agents import ResearchAgent spawn_time = random.uniform(*spawn_time_range) agent_id = f"dynamic_research_{self._agent_counter:03d}" self._agent_counter += 1 return ResearchAgent( agent_id=agent_id, spawn_time=spawn_time, helix=self.helix, llm_client=self.llm_client, research_domain=domain, token_budget_manager=self.token_budget_manager, max_tokens=800 ) def create_analysis_agent(self, analysis_type: str = "general", spawn_time_range: Tuple[float, float] = (0.2, 0.7)) -> "LLMAgent": """Create an analysis agent with random spawn time in specified range.""" from agents.specialized_agents import AnalysisAgent spawn_time = random.uniform(*spawn_time_range) agent_id = f"dynamic_analysis_{self._agent_counter:03d}" self._agent_counter += 1 return AnalysisAgent( agent_id=agent_id, spawn_time=spawn_time, helix=self.helix, llm_client=self.llm_client, analysis_type=analysis_type, token_budget_manager=self.token_budget_manager, max_tokens=800 ) def create_critic_agent(self, review_focus: str = "general", spawn_time_range: Tuple[float, float] = (0.5, 0.8)) -> "LLMAgent": """Create a critic agent with random spawn time in specified range.""" from agents.specialized_agents import CriticAgent spawn_time = random.uniform(*spawn_time_range) agent_id = f"dynamic_critic_{self._agent_counter:03d}" self._agent_counter += 1 return CriticAgent( agent_id=agent_id, spawn_time=spawn_time, helix=self.helix, llm_client=self.llm_client, review_focus=review_focus, token_budget_manager=self.token_budget_manager, max_tokens=800 ) def create_synthesis_agent(self, output_format: str = "general", spawn_time_range: Tuple[float, float] = (0.7, 0.95)) -> "LLMAgent": """Create a synthesis agent with random spawn time in specified range.""" from agents.specialized_agents import SynthesisAgent spawn_time = random.uniform(*spawn_time_range) agent_id = f"dynamic_synthesis_{self._agent_counter:03d}" self._agent_counter += 1 return SynthesisAgent( agent_id=agent_id, spawn_time=spawn_time, helix=self.helix, llm_client=self.llm_client, output_format=output_format, token_budget_manager=self.token_budget_manager, max_tokens=1200 # Increased for comprehensive blog posts ) def assess_team_needs(self, processed_messages: List[Message], current_time: float, current_agents: Optional[List["LLMAgent"]] = None) -> List["LLMAgent"]: """ Assess current team composition and suggest new agents if needed. Enhanced with DynamicSpawning system that provides: - Confidence monitoring with trend analysis - Content analysis for contradictions and gaps - Team size optimization based on task complexity - Resource-aware spawning decisions Falls back to basic heuristics if dynamic spawning is disabled. Args: processed_messages: Messages processed so far current_time: Current simulation time current_agents: List of currently active agents Returns: List of recommended new agents to spawn """ # Use dynamic spawning if enabled and available if self.enable_dynamic_spawning and self.dynamic_spawner: return self.dynamic_spawner.analyze_and_spawn( processed_messages, current_agents or [], current_time ) # Fallback to basic heuristics for backward compatibility return self._assess_team_needs_basic(processed_messages, current_time) def _assess_team_needs_basic(self, processed_messages: List[Message], current_time: float) -> List["LLMAgent"]: """ Basic team assessment for backward compatibility. This implements simple heuristics when dynamic spawning is disabled. """ recommended_agents = [] if not processed_messages: return recommended_agents # Analyze recent messages for patterns recent_messages = [msg for msg in processed_messages if msg.timestamp > current_time - 0.2] # Last 0.2 time units if not recent_messages: return recommended_agents # Check for consistent low confidence low_confidence_count = sum(1 for msg in recent_messages if msg.content.get("confidence", 1.0) < 0.6) if low_confidence_count >= 2: # Spawn critic agent to improve quality critic = self.create_critic_agent( review_focus="quality_improvement", spawn_time_range=(current_time + 0.1, current_time + 0.3) ) recommended_agents.append(critic) # Check for gaps in research domains research_domains = set() for msg in recent_messages: if "research_domain" in msg.content: research_domains.add(msg.content["research_domain"]) # If only general research, add technical research if len(research_domains) == 1 and "general" in research_domains: technical_research = self.create_research_agent( domain="technical", spawn_time_range=(current_time + 0.05, current_time + 0.2) ) recommended_agents.append(technical_research) # Check for need for alternative synthesis synthesis_count = sum(1 for msg in recent_messages if msg.content.get("agent_type") == "synthesis") if synthesis_count == 0 and current_time > 0.6: # Late in process but no synthesis yet synthesis = self.create_synthesis_agent( output_format="comprehensive", spawn_time_range=(current_time + 0.1, current_time + 0.25) ) recommended_agents.append(synthesis) return recommended_agents def get_spawning_summary(self) -> Dict[str, Any]: """ Get summary of dynamic spawning activity. Returns: Dictionary with spawning statistics and activity """ if self.enable_dynamic_spawning and self.dynamic_spawner: return self.dynamic_spawner.get_spawning_summary() else: return { "dynamic_spawning_enabled": False, "total_spawns": 0, "spawns_by_type": {}, "average_priority": 0.0, "spawning_reasons": [] }