Spaces:
Paused
Paused
| """ | |
| Central coordination system for the Felix Framework. | |
| The central post manages communication and coordination between agents, | |
| implementing the hub of the spoke-based communication model from thefelix.md. | |
| Mathematical Foundation: | |
| - Spoke communication: O(N) message complexity vs O(N²) mesh topology | |
| - Maximum communication distance: R_top (helix outer radius) | |
| - Performance metrics for Hypothesis H2 validation and statistical analysis | |
| Key Features: | |
| - Agent registration and connection management | |
| - FIFO message queuing with guaranteed ordering | |
| - Performance metrics collection (throughput, latency, overhead ratios) | |
| - Scalability up to 133 agents (matching OpenSCAD model parameters) | |
| Mathematical references: | |
| - docs/mathematical_model.md, Section 5: Spoke geometry and communication complexity | |
| - docs/hypothesis_mathematics.md, Section H2: Communication overhead analysis and proofs | |
| - Theoretical proof of O(N) vs O(N²) scaling advantage in hypothesis documentation | |
| Implementation supports rigorous testing of Hypothesis H2 communication efficiency claims. | |
| """ | |
# Standard library imports
import time
import uuid
import random
import logging
from enum import Enum
from typing import Dict, List, Optional, Any, Tuple, TYPE_CHECKING
from dataclasses import dataclass, field
from collections import deque
from queue import Queue, Empty
import asyncio

# Memory system imports
from src.memory.knowledge_store import KnowledgeStore, KnowledgeEntry, KnowledgeType, ConfidenceLevel
from src.memory.task_memory import TaskMemory, TaskPattern, TaskOutcome
from src.memory.context_compression import ContextCompressor, CompressionStrategy

# Dynamic spawning imports - moved to avoid circular imports.
# These names are only needed for type annotations, so they are guarded by
# TYPE_CHECKING to break the import cycle at runtime.
if TYPE_CHECKING:
    from src.agents.llm_agent import LLMAgent
    from src.core.helix_geometry import HelixGeometry
    from src.llm.lm_studio_client import LMStudioClient
    from src.llm.token_budget import TokenBudgetManager

# Set up module-level logger
logger = logging.getLogger(__name__)
class MessageType(Enum):
    """Types of messages in the communication system."""
    TASK_REQUEST = "task_request"
    TASK_ASSIGNMENT = "task_assignment"
    STATUS_UPDATE = "status_update"
    TASK_COMPLETE = "task_complete"
    ERROR_REPORT = "error_report"


@dataclass
class Message:
    """
    Message structure for communication between agents and central post.

    Attributes:
        sender_id: ID of the sending agent (or "central_post").
        message_type: Category of the message (see MessageType).
        content: Arbitrary payload dictionary.
        timestamp: Simulation/wall-clock time the message was created.
        message_id: Unique identifier, auto-generated per instance.
    """
    sender_id: str
    message_type: MessageType
    content: Dict[str, Any]
    timestamp: float
    # default_factory ensures every message gets its own fresh UUID;
    # without @dataclass this field() call would never be honored.
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
class CentralPost:
    """
    Central coordination system managing all agent communication.

    The central post acts as the hub in the spoke-based communication model,
    processing messages from agents and coordinating task assignments.
    """

    def __init__(self, max_agents: int = 133, enable_metrics: bool = False,
                 enable_memory: bool = True, memory_db_path: str = "felix_memory.db"):
        """
        Initialize central post with configuration parameters.

        Args:
            max_agents: Maximum number of concurrent agent connections
            enable_metrics: Whether to collect performance metrics
            enable_memory: Whether to enable persistent memory systems
            memory_db_path: Path to the memory database file
        """
        self.max_agents = max_agents
        self.enable_metrics = enable_metrics
        self.enable_memory = enable_memory

        # Connection management
        self._registered_agents: Dict[str, str] = {}   # agent_id -> connection_id
        self._connection_times: Dict[str, float] = {}  # agent_id -> registration_time

        # Message processing (sync and async)
        self._message_queue: Queue = Queue()
        # Lazily created inside a running event loop (see _ensure_async_queue).
        self._async_message_queue: Optional[asyncio.Queue] = None
        self._processed_messages: List[Message] = []
        self._async_processors: List[asyncio.Task] = []

        # Performance metrics (for Hypothesis H2)
        self._metrics_enabled = enable_metrics
        self._start_time = time.time()
        self._total_messages_processed = 0
        self._processing_times: List[float] = []
        self._overhead_ratios: List[float] = []
        self._scaling_metrics: Dict[int, float] = {}

        # Memory systems (Priority 5: Memory and Context Persistence)
        self._memory_enabled = enable_memory
        if enable_memory:
            self.knowledge_store = KnowledgeStore(memory_db_path)
            self.task_memory = TaskMemory(memory_db_path)
            self.context_compressor = ContextCompressor()
        else:
            self.knowledge_store = None
            self.task_memory = None
            self.context_compressor = None

        # System state
        self._is_active = True

    @property
    def active_connections(self) -> int:
        """Number of currently registered agents."""
        return len(self._registered_agents)

    @property
    def message_queue_size(self) -> int:
        """Number of pending messages in the sync queue."""
        return self._message_queue.qsize()

    @property
    def is_active(self) -> bool:
        """Whether the central post is active and accepting connections."""
        return self._is_active

    @property
    def total_messages_processed(self) -> int:
        """Total number of messages processed so far."""
        return self._total_messages_processed

    def register_agent(self, agent) -> str:
        """
        Register an agent with the central post.

        Args:
            agent: Agent instance to register (must expose an ``agent_id`` attribute)

        Returns:
            Connection ID for the registered agent

        Raises:
            ValueError: If maximum connections exceeded or agent already registered
        """
        if self.active_connections >= self.max_agents:
            raise ValueError("Maximum agent connections exceeded")
        if agent.agent_id in self._registered_agents:
            raise ValueError(f"Agent {agent.agent_id} already registered")

        # Create unique connection ID
        connection_id = str(uuid.uuid4())

        # Register agent
        self._registered_agents[agent.agent_id] = connection_id
        self._connection_times[agent.agent_id] = time.time()

        return connection_id

    def deregister_agent(self, agent_id: str) -> bool:
        """
        Deregister an agent from the central post.

        Args:
            agent_id: ID of agent to deregister

        Returns:
            True if successfully deregistered, False if not found
        """
        if agent_id not in self._registered_agents:
            return False

        # Remove agent registration
        del self._registered_agents[agent_id]
        del self._connection_times[agent_id]

        return True

    def is_agent_registered(self, agent_id: str) -> bool:
        """
        Check if an agent is currently registered.

        Args:
            agent_id: ID of agent to check

        Returns:
            True if agent is registered, False otherwise
        """
        return agent_id in self._registered_agents

    async def _ensure_async_queue(self) -> asyncio.Queue:
        """Ensure async message queue is initialized (must run inside a loop)."""
        if self._async_message_queue is None:
            self._async_message_queue = asyncio.Queue(maxsize=1000)
        return self._async_message_queue

    def queue_message(self, message: Message) -> str:
        """
        Queue a message for processing (sync).

        Args:
            message: Message to queue

        Returns:
            Message ID for tracking

        Raises:
            RuntimeError: If the central post has been shut down
            ValueError: If the sender is not a registered agent
        """
        if not self._is_active:
            raise RuntimeError("Central post is not active")

        # Validate sender is registered ("central_post" itself is always allowed)
        if message.sender_id != "central_post" and message.sender_id not in self._registered_agents:
            raise ValueError(f"Message from unregistered agent: {message.sender_id}")

        # Queue message
        self._message_queue.put(message)
        return message.message_id

    async def queue_message_async(self, message: Message) -> str:
        """
        Queue a message for async processing.

        Args:
            message: Message to queue

        Returns:
            Message ID for tracking

        Raises:
            RuntimeError: If the central post has been shut down
            ValueError: If the sender is not a registered agent
        """
        if not self._is_active:
            raise RuntimeError("Central post is not active")

        # Validate sender is registered ("central_post" itself is always allowed)
        if message.sender_id != "central_post" and message.sender_id not in self._registered_agents:
            raise ValueError(f"Message from unregistered agent: {message.sender_id}")

        # Queue message asynchronously
        async_queue = await self._ensure_async_queue()
        await async_queue.put(message)
        return message.message_id

    def has_pending_messages(self) -> bool:
        """
        Check if there are messages waiting to be processed.

        Returns:
            True if messages are pending, False otherwise
        """
        return not self._message_queue.empty()

    def process_next_message(self) -> Optional[Message]:
        """
        Process the next message in the queue (FIFO order).

        Returns:
            Processed message, or None if queue is empty
        """
        try:
            # Get next message
            start_time = time.time() if self._metrics_enabled else None
            message = self._message_queue.get_nowait()

            # Process message (placeholder - actual processing depends on message type)
            self._handle_message(message)

            # Record metrics (explicit None check: a 0.0 timestamp is still valid)
            if self._metrics_enabled and start_time is not None:
                processing_time = time.time() - start_time
                self._processing_times.append(processing_time)

            # Track processed message
            self._processed_messages.append(message)
            self._total_messages_processed += 1

            return message
        except Empty:
            return None

    async def process_next_message_async(self) -> Optional[Message]:
        """
        Process the next message in the async queue (FIFO order).

        Returns:
            Processed message, or None if queue is empty
        """
        try:
            async_queue = await self._ensure_async_queue()

            # Try to get message without blocking
            try:
                message = async_queue.get_nowait()
            except asyncio.QueueEmpty:
                return None

            # Get next message
            start_time = time.time() if self._metrics_enabled else None

            # Process message asynchronously
            await self._handle_message_async(message)

            # Record metrics (explicit None check: a 0.0 timestamp is still valid)
            if self._metrics_enabled and start_time is not None:
                processing_time = time.time() - start_time
                self._processing_times.append(processing_time)

            # Track processed message
            self._processed_messages.append(message)
            self._total_messages_processed += 1

            return message
        except Exception as e:
            logger.error(f"Async message processing failed: {e}")
            return None

    def _handle_message(self, message: Message) -> None:
        """
        Handle specific message types (internal processing).

        Args:
            message: Message to handle
        """
        # Message type-specific handling
        if message.message_type == MessageType.TASK_REQUEST:
            self._handle_task_request(message)
        elif message.message_type == MessageType.STATUS_UPDATE:
            self._handle_status_update(message)
        elif message.message_type == MessageType.TASK_COMPLETE:
            self._handle_task_completion(message)
        elif message.message_type == MessageType.ERROR_REPORT:
            self._handle_error_report(message)
        # Add more handlers as needed

    async def _handle_message_async(self, message: Message) -> None:
        """
        Handle specific message types asynchronously (internal processing).

        Args:
            message: Message to handle
        """
        # Message type-specific async handling
        if message.message_type == MessageType.TASK_REQUEST:
            await self._handle_task_request_async(message)
        elif message.message_type == MessageType.STATUS_UPDATE:
            await self._handle_status_update_async(message)
        elif message.message_type == MessageType.TASK_COMPLETE:
            await self._handle_task_completion_async(message)
        elif message.message_type == MessageType.ERROR_REPORT:
            await self._handle_error_report_async(message)
        # Add more handlers as needed

    def _handle_task_request(self, message: Message) -> None:
        """Handle task request from agent."""
        # Placeholder for task assignment logic
        pass

    def _handle_status_update(self, message: Message) -> None:
        """Handle status update from agent."""
        # Placeholder for status tracking logic
        pass

    def _handle_task_completion(self, message: Message) -> None:
        """Handle task completion notification."""
        # Placeholder for completion processing logic
        pass

    def _handle_error_report(self, message: Message) -> None:
        """Handle error report from agent."""
        # Placeholder for error handling logic
        pass

    # Async message handlers
    async def _handle_task_request_async(self, message: Message) -> None:
        """Handle task request from agent asynchronously."""
        # Async task assignment logic
        pass

    async def _handle_status_update_async(self, message: Message) -> None:
        """Handle status update from agent asynchronously."""
        # Async status tracking logic
        pass

    async def _handle_task_completion_async(self, message: Message) -> None:
        """Handle task completion notification asynchronously."""
        # Async completion processing logic
        pass

    async def _handle_error_report_async(self, message: Message) -> None:
        """Handle error report from agent asynchronously."""
        # Async error handling logic
        pass

    # Performance metrics methods (for Hypothesis H2)
    def get_current_time(self) -> float:
        """Get current timestamp for performance measurements."""
        return time.time()

    def get_message_throughput(self) -> float:
        """
        Calculate message processing throughput.

        Returns:
            Messages processed per second (0.0 when metrics are disabled
            or nothing has been processed yet)
        """
        if not self._metrics_enabled or self._total_messages_processed == 0:
            return 0.0

        elapsed_time = time.time() - self._start_time
        if elapsed_time == 0:
            return 0.0

        return self._total_messages_processed / elapsed_time

    def measure_communication_overhead(self, num_messages: int, processing_time: float) -> float:
        """
        Measure communication overhead vs processing time.

        Args:
            num_messages: Number of messages in the measurement
            processing_time: Actual processing time for comparison
                (currently unused; kept for interface stability)

        Returns:
            Estimated communication overhead time
        """
        if not self._metrics_enabled:
            return 0.0

        # Estimate overhead from the average observed per-message handling time
        if self._processing_times:
            avg_msg_time = sum(self._processing_times) / len(self._processing_times)
            communication_overhead = avg_msg_time * num_messages
            return communication_overhead

        return 0.0

    def record_overhead_ratio(self, overhead_ratio: float) -> None:
        """
        Record overhead ratio for hypothesis validation.

        Args:
            overhead_ratio: Communication overhead / processing time ratio
        """
        if self._metrics_enabled:
            self._overhead_ratios.append(overhead_ratio)

    def get_average_overhead_ratio(self) -> float:
        """
        Get average overhead ratio across all measurements.

        Returns:
            Average overhead ratio (0.0 when nothing recorded)
        """
        if not self._overhead_ratios:
            return 0.0
        return sum(self._overhead_ratios) / len(self._overhead_ratios)

    def record_scaling_metric(self, agent_count: int, processing_time: float) -> None:
        """
        Record scaling performance metric.

        Args:
            agent_count: Number of agents in the test
            processing_time: Time to process messages from all agents
        """
        if self._metrics_enabled:
            self._scaling_metrics[agent_count] = processing_time

    def get_scaling_metrics(self) -> Dict[int, float]:
        """
        Get scaling performance metrics.

        Returns:
            Dictionary mapping agent count to processing time (a copy,
            so callers cannot mutate internal state)
        """
        return self._scaling_metrics.copy()

    async def start_async_processing(self, max_concurrent_processors: int = 3) -> None:
        """Start async message processors."""
        for i in range(max_concurrent_processors):
            processor = asyncio.create_task(self._async_message_processor(f"processor_{i}"))
            self._async_processors.append(processor)

    async def _async_message_processor(self, processor_id: str) -> None:
        """Individual async message processor; runs until shutdown."""
        while self._is_active:
            try:
                message = await self.process_next_message_async()
                if message is None:
                    # No messages to process, wait briefly
                    await asyncio.sleep(0.01)
                    continue
                logger.debug(f"Processor {processor_id} handled message {message.message_id}")
            except Exception as e:
                logger.error(f"Async processor {processor_id} error: {e}")
                await asyncio.sleep(0.1)  # Brief recovery delay

    def shutdown(self) -> None:
        """Shutdown the central post and disconnect all agents."""
        self._is_active = False

        # Clear all connections
        self._registered_agents.clear()
        self._connection_times.clear()

        # Clear message queue
        while not self._message_queue.empty():
            try:
                self._message_queue.get_nowait()
            except Empty:
                break

    async def shutdown_async(self) -> None:
        """Shutdown async components."""
        self._is_active = False

        # Cancel async processors
        for processor in self._async_processors:
            processor.cancel()

        # Wait for processors to finish
        if self._async_processors:
            await asyncio.gather(*self._async_processors, return_exceptions=True)
        self._async_processors.clear()

    def get_performance_summary(self) -> Dict[str, Any]:
        """
        Get comprehensive performance summary for analysis.

        Returns:
            Dictionary containing all performance metrics
        """
        if not self._metrics_enabled:
            return {"metrics_enabled": False}

        return {
            "metrics_enabled": True,
            "total_messages_processed": self._total_messages_processed,
            "message_throughput": self.get_message_throughput(),
            "average_overhead_ratio": self.get_average_overhead_ratio(),
            "scaling_metrics": self.get_scaling_metrics(),
            "active_connections": self.active_connections,
            "uptime": time.time() - self._start_time,
            "async_processors": len(self._async_processors),
            "async_queue_size": self._async_message_queue.qsize() if self._async_message_queue else 0
        }

    def accept_high_confidence_result(self, message: Message, min_confidence: float = 0.8) -> bool:
        """
        Accept agent results that meet minimum confidence threshold.

        This implements the natural selection aspect of the helix model -
        only high-quality results from synthesis agents deep in the helix
        are accepted as final output from the central coordination system.

        Args:
            message: Message containing agent result
            min_confidence: Minimum confidence threshold (0.0 to 1.0)

        Returns:
            True if result was accepted, False if rejected
        """
        if message.message_type != MessageType.STATUS_UPDATE:
            return False

        content = message.content
        confidence = content.get("confidence", 0.0)
        depth_ratio = content.get("position_info", {}).get("depth_ratio", 0.0)
        agent_type = content.get("agent_type", "")

        # Only synthesis agents can produce final output
        if agent_type != "synthesis":
            return False

        # Synthesis agents should be deep in the helix (>0.7) with high confidence
        if depth_ratio >= 0.7 and confidence >= min_confidence:
            # Accept the result - add to processed messages
            self._processed_messages.append(message)
            self._total_messages_processed += 1
            return True
        else:
            # Reject the result
            return False

    # Memory Integration Methods (Priority 5: Memory and Context Persistence)
    def store_agent_result_as_knowledge(self, agent_id: str, content: str,
                                        confidence: float, domain: str = "general",
                                        tags: Optional[List[str]] = None) -> bool:
        """
        Store agent result as knowledge in the persistent knowledge base.

        Args:
            agent_id: ID of the agent producing the result
            content: Content of the result to store
            confidence: Confidence level of the result (0.0 to 1.0)
            domain: Domain/category for the knowledge
            tags: Optional tags for the knowledge entry

        Returns:
            True if knowledge was stored successfully, False otherwise
        """
        if not self._memory_enabled or not self.knowledge_store:
            return False

        try:
            # Convert numeric confidence to ConfidenceLevel enum
            if confidence >= 0.8:
                confidence_level = ConfidenceLevel.HIGH
            elif confidence >= 0.6:
                confidence_level = ConfidenceLevel.MEDIUM
            else:
                confidence_level = ConfidenceLevel.LOW

            # Store in knowledge base using correct method signature
            entry_id = self.knowledge_store.store_knowledge(
                knowledge_type=KnowledgeType.TASK_RESULT,
                content={"result": content, "confidence": confidence},
                confidence_level=confidence_level,
                source_agent=agent_id,
                domain=domain,
                tags=tags
            )
            return entry_id is not None
        except Exception as e:
            logger.error(f"Failed to store knowledge from agent {agent_id}: {e}")
            return False

    def retrieve_relevant_knowledge(self, domain: Optional[str] = None,
                                    knowledge_type: Optional[KnowledgeType] = None,
                                    keywords: Optional[List[str]] = None,
                                    min_confidence: Optional[ConfidenceLevel] = None,
                                    limit: int = 10) -> List[KnowledgeEntry]:
        """
        Retrieve relevant knowledge from the knowledge base.

        Args:
            domain: Filter by domain
            knowledge_type: Filter by knowledge type
            keywords: Keywords to search for
            min_confidence: Minimum confidence level
            limit: Maximum number of entries to return

        Returns:
            List of relevant knowledge entries (empty when memory is disabled
            or the lookup fails)
        """
        if not self._memory_enabled or not self.knowledge_store:
            return []

        try:
            # Local import to avoid widening the module-level import surface;
            # path matches the module-top "src.memory" imports.
            from src.memory.knowledge_store import KnowledgeQuery

            query = KnowledgeQuery(
                knowledge_types=[knowledge_type] if knowledge_type else None,
                domains=[domain] if domain else None,
                content_keywords=keywords,
                min_confidence=min_confidence,
                limit=limit
            )
            return self.knowledge_store.retrieve_knowledge(query)
        except Exception as e:
            logger.error(f"Failed to retrieve knowledge: {e}")
            return []

    def get_task_strategy_recommendations(self, task_description: str,
                                          task_type: str = "general",
                                          complexity: str = "MODERATE") -> Dict[str, Any]:
        """
        Get strategy recommendations based on task memory.

        Args:
            task_description: Description of the task
            task_type: Type of task (e.g., "research", "analysis", "synthesis")
            complexity: Task complexity level ("SIMPLE", "MODERATE", "COMPLEX", "VERY_COMPLEX")

        Returns:
            Dictionary containing strategy recommendations (empty on failure)
        """
        if not self._memory_enabled or not self.task_memory:
            return {}

        try:
            # Path matches the module-top "src.memory" imports.
            from src.memory.task_memory import TaskComplexity

            # Convert string complexity to enum; unknown values fall back to MODERATE
            complexity_enum = TaskComplexity.MODERATE
            if complexity.upper() == "SIMPLE":
                complexity_enum = TaskComplexity.SIMPLE
            elif complexity.upper() == "COMPLEX":
                complexity_enum = TaskComplexity.COMPLEX
            elif complexity.upper() == "VERY_COMPLEX":
                complexity_enum = TaskComplexity.VERY_COMPLEX

            return self.task_memory.recommend_strategy(
                task_description=task_description,
                task_type=task_type,
                complexity=complexity_enum
            )
        except Exception as e:
            logger.error(f"Failed to get strategy recommendations: {e}")
            return {}

    def compress_large_context(self, context: str,
                               strategy: CompressionStrategy = CompressionStrategy.EXTRACTIVE_SUMMARY,
                               target_size: Optional[int] = None):
        """
        Compress large context using the context compression system.

        Args:
            context: Content to compress
            strategy: Compression strategy to use
            target_size: Optional target size for compression

        Returns:
            CompressedContext object or None if compression failed
        """
        if not self._memory_enabled or not self.context_compressor:
            return None

        try:
            # Convert string context to dict format expected by compressor
            context_dict = {"main_content": context}
            return self.context_compressor.compress_context(
                context=context_dict,
                target_size=target_size,
                strategy=strategy
            )
        except Exception as e:
            logger.error(f"Failed to compress context: {e}")
            return None

    def get_memory_summary(self) -> Dict[str, Any]:
        """
        Get summary of memory system status and contents.

        Returns:
            Dictionary with memory system summary
        """
        if not self._memory_enabled:
            return {
                "knowledge_entries": 0,
                "task_patterns": 0,
                "memory_enabled": False
            }

        try:
            summary: Dict[str, Any] = {"memory_enabled": True}

            if self.knowledge_store:
                # Get knowledge entry count using proper query.
                # Path matches the module-top "src.memory" imports.
                from src.memory.knowledge_store import KnowledgeQuery
                query = KnowledgeQuery(limit=1000)
                all_knowledge = self.knowledge_store.retrieve_knowledge(query)
                summary["knowledge_entries"] = len(all_knowledge)

                # Get domain breakdown
                domains: Dict[str, int] = {}
                for entry in all_knowledge:
                    domains[entry.domain] = domains.get(entry.domain, 0) + 1
                summary["knowledge_by_domain"] = domains
            else:
                summary["knowledge_entries"] = 0
                summary["knowledge_by_domain"] = {}

            if self.task_memory:
                # Get task pattern count and summary
                memory_summary = self.task_memory.get_memory_summary()
                summary["task_patterns"] = memory_summary.get("total_patterns", 0)
                summary["task_executions"] = memory_summary.get("total_executions", 0)

                # Handle success rate calculation from outcome distribution
                outcome_dist = memory_summary.get("outcome_distribution", {})
                total_executions = sum(outcome_dist.values()) if outcome_dist else 0
                if total_executions > 0:
                    successful_outcomes = outcome_dist.get("success", 0) + outcome_dist.get("partial_success", 0)
                    summary["success_rate"] = successful_outcomes / total_executions
                else:
                    summary["success_rate"] = 0.0

                # Get top task types
                summary["top_task_types"] = memory_summary.get("top_task_types", {})
                summary["success_by_complexity"] = memory_summary.get("success_by_complexity", {})
            else:
                summary["task_patterns"] = 0
                summary["task_executions"] = 0
                summary["success_rate"] = 0.0
                summary["top_task_types"] = {}
                summary["success_by_complexity"] = {}

            return summary
        except Exception as e:
            logger.error(f"Failed to get memory summary: {e}")
            return {
                "knowledge_entries": 0,
                "task_patterns": 0,
                "memory_enabled": True,
                "error": str(e)
            }
class AgentFactory:
    """
    Factory for creating agents dynamically based on task needs.

    The AgentFactory allows the central post to spawn new agents
    as needed during the helix processing, enabling emergent behavior
    and adaptive team composition.
    """

    def __init__(self, helix: "HelixGeometry", llm_client: "LMStudioClient",
                 token_budget_manager: Optional["TokenBudgetManager"] = None,
                 random_seed: Optional[int] = None, enable_dynamic_spawning: bool = True,
                 max_agents: int = 15, token_budget_limit: int = 10000):
        """
        Initialize the agent factory.

        Args:
            helix: Helix geometry for new agents
            llm_client: LM Studio client for new agents
            token_budget_manager: Optional token budget manager
            random_seed: Seed for random spawn time generation
            enable_dynamic_spawning: Enable intelligent agent spawning
            max_agents: Maximum number of agents for dynamic spawning
            token_budget_limit: Token budget limit for dynamic spawning
        """
        self.helix = helix
        self.llm_client = llm_client
        self.token_budget_manager = token_budget_manager
        self.random_seed = random_seed
        self._agent_counter = 0
        self.enable_dynamic_spawning = enable_dynamic_spawning

        # Initialize dynamic spawning system if enabled
        if enable_dynamic_spawning:
            # Import here to avoid circular imports; path matches the
            # module-top "src.agents" imports.
            from src.agents.dynamic_spawning import DynamicSpawning
            self.dynamic_spawner = DynamicSpawning(
                agent_factory=self,
                confidence_threshold=0.7,
                max_agents=max_agents,
                token_budget_limit=token_budget_limit
            )
        else:
            self.dynamic_spawner = None

        # NOTE(review): this seeds the *global* random module, which affects
        # other users of `random` in the process; kept for backward
        # compatibility with existing deterministic test runs.
        if random_seed is not None:
            random.seed(random_seed)

    def create_research_agent(self, domain: str = "general",
                              spawn_time_range: Tuple[float, float] = (0.0, 0.3)) -> "LLMAgent":
        """Create a research agent with random spawn time in specified range."""
        from src.agents.specialized_agents import ResearchAgent

        spawn_time = random.uniform(*spawn_time_range)
        agent_id = f"dynamic_research_{self._agent_counter:03d}"
        self._agent_counter += 1

        return ResearchAgent(
            agent_id=agent_id,
            spawn_time=spawn_time,
            helix=self.helix,
            llm_client=self.llm_client,
            research_domain=domain,
            token_budget_manager=self.token_budget_manager,
            max_tokens=800
        )

    def create_analysis_agent(self, analysis_type: str = "general",
                              spawn_time_range: Tuple[float, float] = (0.2, 0.7)) -> "LLMAgent":
        """Create an analysis agent with random spawn time in specified range."""
        from src.agents.specialized_agents import AnalysisAgent

        spawn_time = random.uniform(*spawn_time_range)
        agent_id = f"dynamic_analysis_{self._agent_counter:03d}"
        self._agent_counter += 1

        return AnalysisAgent(
            agent_id=agent_id,
            spawn_time=spawn_time,
            helix=self.helix,
            llm_client=self.llm_client,
            analysis_type=analysis_type,
            token_budget_manager=self.token_budget_manager,
            max_tokens=800
        )

    def create_critic_agent(self, review_focus: str = "general",
                            spawn_time_range: Tuple[float, float] = (0.5, 0.8)) -> "LLMAgent":
        """Create a critic agent with random spawn time in specified range."""
        from src.agents.specialized_agents import CriticAgent

        spawn_time = random.uniform(*spawn_time_range)
        agent_id = f"dynamic_critic_{self._agent_counter:03d}"
        self._agent_counter += 1

        return CriticAgent(
            agent_id=agent_id,
            spawn_time=spawn_time,
            helix=self.helix,
            llm_client=self.llm_client,
            review_focus=review_focus,
            token_budget_manager=self.token_budget_manager,
            max_tokens=800
        )

    def create_synthesis_agent(self, output_format: str = "general",
                               spawn_time_range: Tuple[float, float] = (0.7, 0.95)) -> "LLMAgent":
        """Create a synthesis agent with random spawn time in specified range."""
        from src.agents.specialized_agents import SynthesisAgent

        spawn_time = random.uniform(*spawn_time_range)
        agent_id = f"dynamic_synthesis_{self._agent_counter:03d}"
        self._agent_counter += 1

        return SynthesisAgent(
            agent_id=agent_id,
            spawn_time=spawn_time,
            helix=self.helix,
            llm_client=self.llm_client,
            output_format=output_format,
            token_budget_manager=self.token_budget_manager,
            max_tokens=1200  # Increased for comprehensive blog posts
        )

    def assess_team_needs(self, processed_messages: List["Message"],
                          current_time: float, current_agents: Optional[List["LLMAgent"]] = None) -> List["LLMAgent"]:
        """
        Assess current team composition and suggest new agents if needed.

        Enhanced with DynamicSpawning system that provides:
        - Confidence monitoring with trend analysis
        - Content analysis for contradictions and gaps
        - Team size optimization based on task complexity
        - Resource-aware spawning decisions

        Falls back to basic heuristics if dynamic spawning is disabled.

        Args:
            processed_messages: Messages processed so far
            current_time: Current simulation time
            current_agents: List of currently active agents

        Returns:
            List of recommended new agents to spawn
        """
        # Use dynamic spawning if enabled and available
        if self.enable_dynamic_spawning and self.dynamic_spawner:
            return self.dynamic_spawner.analyze_and_spawn(
                processed_messages, current_agents or [], current_time
            )

        # Fallback to basic heuristics for backward compatibility
        return self._assess_team_needs_basic(processed_messages, current_time)

    def _assess_team_needs_basic(self, processed_messages: List["Message"],
                                 current_time: float) -> List["LLMAgent"]:
        """
        Basic team assessment for backward compatibility.

        This implements simple heuristics when dynamic spawning is disabled.
        """
        recommended_agents = []

        if not processed_messages:
            return recommended_agents

        # Analyze recent messages for patterns
        recent_messages = [msg for msg in processed_messages
                           if msg.timestamp > current_time - 0.2]  # Last 0.2 time units

        if not recent_messages:
            return recommended_agents

        # Check for consistent low confidence
        low_confidence_count = sum(1 for msg in recent_messages
                                   if msg.content.get("confidence", 1.0) < 0.6)
        if low_confidence_count >= 2:
            # Spawn critic agent to improve quality
            critic = self.create_critic_agent(
                review_focus="quality_improvement",
                spawn_time_range=(current_time + 0.1, current_time + 0.3)
            )
            recommended_agents.append(critic)

        # Check for gaps in research domains
        research_domains = set()
        for msg in recent_messages:
            if "research_domain" in msg.content:
                research_domains.add(msg.content["research_domain"])

        # If only general research, add technical research
        if len(research_domains) == 1 and "general" in research_domains:
            technical_research = self.create_research_agent(
                domain="technical",
                spawn_time_range=(current_time + 0.05, current_time + 0.2)
            )
            recommended_agents.append(technical_research)

        # Check for need for alternative synthesis
        synthesis_count = sum(1 for msg in recent_messages
                              if msg.content.get("agent_type") == "synthesis")
        if synthesis_count == 0 and current_time > 0.6:
            # Late in process but no synthesis yet
            synthesis = self.create_synthesis_agent(
                output_format="comprehensive",
                spawn_time_range=(current_time + 0.1, current_time + 0.25)
            )
            recommended_agents.append(synthesis)

        return recommended_agents

    def get_spawning_summary(self) -> Dict[str, Any]:
        """
        Get summary of dynamic spawning activity.

        Returns:
            Dictionary with spawning statistics and activity
        """
        if self.enable_dynamic_spawning and self.dynamic_spawner:
            return self.dynamic_spawner.get_spawning_summary()
        else:
            return {
                "dynamic_spawning_enabled": False,
                "total_spawns": 0,
                "spawns_by_type": {},
                "average_priority": 0.0,
                "spawning_reasons": []
            }