| """ | |
| LLM-powered agent for the Felix Framework. | |
| This module extends the base Agent class with language model capabilities, | |
| enabling agents to process tasks using local LLM inference via LM Studio. | |
| Key Features: | |
| - Integration with LM Studio for local LLM inference | |
| - Position-aware prompt engineering based on helix location | |
| - Adaptive behavior based on geometric constraints | |
| - Communication via spoke system with central coordination | |
| - Built-in task processing and result sharing | |
| The agent's behavior adapts based on its position on the helix: | |
| - Top (wide): Broad exploration, high creativity | |
| - Middle: Focused analysis, balanced processing | |
| - Bottom (narrow): Precise synthesis, low temperature | |
| """ | |
| import time | |
| import asyncio | |
| import logging | |
| from typing import Optional, Dict, Any, List | |
| from dataclasses import dataclass | |
| from agents.agent import Agent, AgentState | |
| from core.helix_geometry import HelixGeometry | |
| from llm.lm_studio_client import LMStudioClient, LLMResponse, RequestPriority | |
| from llm.multi_server_client import LMStudioClientPool | |
| from llm.token_budget import TokenBudgetManager, TokenAllocation | |
| from communication.central_post import Message, MessageType | |
| from pipeline.chunking import ChunkedResult, ProgressiveProcessor, ContentSummarizer | |
| from agents.prompt_optimization import PromptOptimizer, PromptMetrics, PromptContext | |
| logger = logging.getLogger(__name__) | |
@dataclass
class LLMTask:
    """Task for LLM agent processing.

    Attributes:
        task_id: Unique identifier for the task.
        description: Human-readable task description (used as the user prompt).
        context: Optional extra context woven into the system prompt.
        metadata: Arbitrary task metadata; defaults to an empty dict.
    """
    # FIXED: added the missing @dataclass decorator. Without it the class had
    # only bare annotations (no generated __init__) and __post_init__ was
    # never invoked; `dataclass` is imported at module top for this purpose.
    task_id: str
    description: str
    context: str = ""
    # None default + __post_init__ avoids a shared mutable default dict.
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}
@dataclass
class LLMResult:
    """Result from LLM agent processing with chunking support.

    Carries the generated content plus position/confidence metadata, and can
    optionally aggregate multiple ChunkedResult pieces for long outputs.
    """
    # FIXED: added the missing @dataclass decorator — the class declares
    # fields with defaults and a __post_init__, both of which only work
    # under @dataclass (imported at module top).
    agent_id: str                    # Agent that produced this result
    task_id: str                     # Task that was processed
    content: str                     # Primary generated content
    position_info: Dict[str, float]  # Helix position snapshot at processing time
    llm_response: LLMResponse        # Raw LLM response backing this result
    processing_time: float           # Wall-clock duration of the LLM call (seconds)
    timestamp: float                 # Unix time the result was created
    confidence: float = 0.0  # Confidence score (0.0 to 1.0)
    processing_stage: int = 1  # Stage number in helix descent
    # Chunking support fields
    is_chunked: bool = False  # Whether this result contains chunked output
    chunk_results: Optional[List[ChunkedResult]] = None  # List of chunk results if chunked
    total_chunks: int = 1  # Total number of chunks (1 for non-chunked)
    chunking_strategy: Optional[str] = None  # Strategy used for chunking (progressive/streaming)
    summary_fallback: Optional[str] = None  # Summary if content was truncated
    full_content_available: bool = True  # Whether full content is available or summarized

    def __post_init__(self):
        # FIXED: the original if/elif assigned [] in both branches; a single
        # None check is equivalent. (None default + late init avoids a shared
        # mutable default list.)
        if self.chunk_results is None:
            self.chunk_results = []

    def get_full_content(self) -> str:
        """Get full content, combining chunks in index order if necessary."""
        if not self.is_chunked:
            return self.content
        if not self.chunk_results:
            return self.content
        # Combine chunk content in order
        sorted_chunks = sorted(self.chunk_results, key=lambda x: x.chunk_index)
        combined_content = "".join(chunk.content_chunk for chunk in sorted_chunks)
        # Fall back to the stored content if the chunks were all empty.
        return combined_content if combined_content else self.content

    def get_content_summary(self) -> str:
        """Get content summary, preferring summary_fallback if available."""
        if self.summary_fallback and not self.full_content_available:
            return self.summary_fallback
        content = self.get_full_content()
        if len(content) <= 200:
            return content
        return content[:200] + "..."

    def add_chunk(self, chunk: ChunkedResult) -> None:
        """Add a chunk result, marking this result as chunked."""
        if not self.is_chunked:
            self.is_chunked = True
        if self.chunk_results is None:
            self.chunk_results = []
        self.chunk_results.append(chunk)
        self.total_chunks = len(self.chunk_results)

    def is_complete(self) -> bool:
        """Check if all chunks are available for a chunked result."""
        if not self.is_chunked:
            return True
        if not self.chunk_results:
            return False
        # Complete once any received chunk carries the is_final flag.
        return any(chunk.is_final for chunk in self.chunk_results)

    def get_chunking_metadata(self) -> Dict[str, Any]:
        """Get metadata about the chunking process."""
        if not self.is_chunked:
            return {"chunked": False}
        return {
            "chunked": True,
            "total_chunks": self.total_chunks,
            "chunks_available": len(self.chunk_results) if self.chunk_results else 0,
            "strategy": self.chunking_strategy,
            "complete": self.is_complete(),
            "full_content_available": self.full_content_available,
            "has_summary_fallback": self.summary_fallback is not None
        }
| class LLMAgent(Agent): | |
| """ | |
| LLM-powered agent that processes tasks using language models. | |
| Extends the base Agent class with LLM capabilities, providing | |
| position-aware prompt engineering and adaptive behavior based | |
| on the agent's location on the helix. | |
| """ | |
    def __init__(self, agent_id: str, spawn_time: float, helix: HelixGeometry,
                 llm_client, agent_type: str = "general",
                 temperature_range: Optional[tuple] = None, max_tokens: Optional[int] = None,
                 token_budget_manager: Optional[TokenBudgetManager] = None,
                 prompt_optimizer: Optional[PromptOptimizer] = None):
        """
        Initialize LLM agent.

        Args:
            agent_id: Unique identifier for the agent
            spawn_time: Time when agent becomes active (0.0 to 1.0)
            helix: Helix geometry for path calculation
            llm_client: LM Studio client or client pool for LLM inference
            agent_type: Agent specialization (research, analysis, synthesis, critic)
            temperature_range: (min, max) temperature based on helix position;
                defaults per agent type when None
            max_tokens: Per-call output token cap; defaults per agent type when None
            token_budget_manager: Optional budget manager for adaptive token allocation
            prompt_optimizer: Optional prompt optimization system for learning
        """
        super().__init__(agent_id, spawn_time, helix)
        self.llm_client = llm_client
        self.agent_type = agent_type
        # FIXED: Set appropriate defaults based on agent type (aligns with TokenBudgetManager)
        if temperature_range is None:
            if agent_type == "research":
                self.temperature_range = (0.4, 0.9)  # High creativity for exploration
            elif agent_type == "analysis":
                self.temperature_range = (0.2, 0.7)  # Balanced for processing
            elif agent_type == "synthesis":
                self.temperature_range = (0.1, 0.5)  # Lower for focused synthesis
            elif agent_type == "critic":
                self.temperature_range = (0.1, 0.6)  # Low-medium for critique
            else:
                self.temperature_range = (0.1, 0.9)  # Default fallback
        else:
            self.temperature_range = temperature_range
        # Token caps mirror each role's expected output size.
        if max_tokens is None:
            if agent_type == "research":
                self.max_tokens = 200  # Small for bullet points
            elif agent_type == "analysis":
                self.max_tokens = 400  # Medium for structured analysis
            elif agent_type == "synthesis":
                self.max_tokens = 1000  # Large for comprehensive output
            elif agent_type == "critic":
                self.max_tokens = 150  # Small for focused feedback
            else:
                self.max_tokens = 500  # Default fallback
        else:
            self.max_tokens = max_tokens
        self.token_budget_manager = token_budget_manager
        self.prompt_optimizer = prompt_optimizer
        # Initialize token budget if manager provided
        if self.token_budget_manager:
            self.token_budget_manager.initialize_agent_budget(agent_id, agent_type, self.max_tokens)
        # LLM-specific state
        self.processing_results: List[LLMResult] = []
        self.total_tokens_used = 0
        self.total_processing_time = 0.0
        self.processing_stage = 0  # Current processing stage in helix descent
        # Communication state
        self.shared_context: Dict[str, Any] = {}
        self.received_messages: List[Dict[str, Any]] = []
        # Emergent behavior tracking
        self.influenced_by: List[str] = []  # Agent IDs that influenced this agent
        self.influence_strength: Dict[str, float] = {}  # How much each agent influenced this one
        self.collaboration_history: List[Dict[str, Any]] = []  # History of collaborations
| def get_position_info(self, current_time: float) -> Dict[str, float]: | |
| """ | |
| Get detailed position information for the agent. | |
| Args: | |
| current_time: Current simulation time | |
| Returns: | |
| Dictionary with position details | |
| """ | |
| position = self.get_position(current_time) | |
| if position is None: | |
| return {} | |
| x, y, z = position | |
| radius = self.helix.get_radius(z) | |
| depth_ratio = z / self.helix.height | |
| return { | |
| "x": x, | |
| "y": y, | |
| "z": z, | |
| "radius": radius, | |
| "depth_ratio": depth_ratio, | |
| "progress": self._progress | |
| } | |
| def get_adaptive_temperature(self, current_time: float) -> float: | |
| """ | |
| Calculate temperature based on helix position. | |
| Higher temperature (more creative) at top of helix, | |
| lower temperature (more focused) at bottom. | |
| Args: | |
| current_time: Current simulation time | |
| Returns: | |
| Temperature value for LLM | |
| """ | |
| position_info = self.get_position_info(current_time) | |
| depth_ratio = position_info.get("depth_ratio", 0.0) | |
| # Invert depth ratio: top (0.0) = high temp, bottom (1.0) = low temp | |
| inverted_ratio = 1.0 - depth_ratio | |
| min_temp, max_temp = self.temperature_range | |
| temperature = min_temp + (max_temp - min_temp) * inverted_ratio | |
| return max(min_temp, min(max_temp, temperature)) | |
| def calculate_confidence(self, current_time: float, content: str, stage: int) -> float: | |
| """ | |
| Calculate confidence score based on agent type, helix position, and content quality. | |
| Agent types have different confidence ranges to ensure proper workflow: | |
| - Research agents: 0.3-0.6 (gather info, don't make final decisions) | |
| - Analysis agents: 0.4-0.7 (process info, prepare for synthesis) | |
| - Synthesis agents: 0.6-0.95 (create final output) | |
| - Critic agents: 0.5-0.8 (provide feedback) | |
| Args: | |
| current_time: Current simulation time | |
| content: Generated content to evaluate | |
| stage: Processing stage number | |
| Returns: | |
| Confidence score (0.0 to 1.0) | |
| """ | |
| position_info = self.get_position_info(current_time) | |
| depth_ratio = position_info.get("depth_ratio", 0.0) | |
| # Base confidence range based on agent type | |
| if self.agent_type == "research": | |
| # Research agents max out at 0.6 - they gather info, don't make final decisions | |
| base_confidence = 0.3 + (depth_ratio * 0.3) # 0.3-0.6 range | |
| max_confidence = 0.6 | |
| elif self.agent_type == "analysis": | |
| # Analysis agents: 0.4-0.7 - process info but don't synthesize | |
| base_confidence = 0.4 + (depth_ratio * 0.3) # 0.4-0.7 range | |
| max_confidence = 0.7 | |
| elif self.agent_type == "synthesis": | |
| # Synthesis agents: 0.6-0.95 - create final comprehensive output | |
| base_confidence = 0.6 + (depth_ratio * 0.35) # 0.6-0.95 range | |
| max_confidence = 0.95 | |
| elif self.agent_type == "critic": | |
| # Critic agents: 0.5-0.8 - provide feedback and validation | |
| base_confidence = 0.5 + (depth_ratio * 0.3) # 0.5-0.8 range | |
| max_confidence = 0.8 | |
| else: | |
| # Default fallback | |
| base_confidence = 0.3 + (depth_ratio * 0.4) | |
| max_confidence = 0.7 | |
| # Content quality bonus (up to 0.1 additional) | |
| content_quality = self._analyze_content_quality(content) | |
| content_bonus = content_quality * 0.1 | |
| # Processing stage bonus (up to 0.05 additional) | |
| stage_bonus = min(stage * 0.005, 0.05) | |
| # Historical consistency bonus (up to 0.05 additional) | |
| consistency_bonus = self._calculate_consistency_bonus() * 0.05 | |
| total_confidence = base_confidence + content_bonus + stage_bonus + consistency_bonus | |
| # Store debug info for potential display | |
| self._last_confidence_breakdown = { | |
| "base_confidence": base_confidence, | |
| "content_bonus": content_bonus, | |
| "stage_bonus": stage_bonus, | |
| "consistency_bonus": consistency_bonus, | |
| "total_before_cap": total_confidence, | |
| "max_confidence": max_confidence, | |
| "final_confidence": min(max(total_confidence, 0.0), max_confidence) | |
| } | |
| return min(max(total_confidence, 0.0), max_confidence) | |
| def _analyze_content_quality(self, content: str) -> float: | |
| """ | |
| Analyze content quality using multiple heuristics. | |
| Args: | |
| content: Content to analyze | |
| Returns: | |
| Quality score (0.0 to 1.0) | |
| """ | |
| if not content or len(content.strip()) == 0: | |
| return 0.0 | |
| content_lower = content.lower() | |
| quality_score = 0.0 | |
| # Length appropriateness (0.25 weight) | |
| content_length = len(content) | |
| if 100 <= content_length <= 2000: | |
| length_score = 1.0 | |
| elif content_length < 100: | |
| length_score = content_length / 100.0 | |
| else: # Very long content | |
| length_score = max(0.3, 2000.0 / content_length) | |
| quality_score += length_score * 0.25 | |
| # Structure indicators (0.25 weight) | |
| structure_indicators = [ | |
| '\n\n' in content, # Paragraphs | |
| '.' in content, # Sentences | |
| any(word in content_lower for word in ['analysis', 'research', 'conclusion', 'summary']), | |
| content.count('.') >= 3, # Multiple sentences | |
| ] | |
| structure_score = sum(structure_indicators) / len(structure_indicators) | |
| quality_score += structure_score * 0.25 | |
| # Content depth indicators (0.25 weight) | |
| depth_indicators = [ | |
| any(word in content_lower for word in ['because', 'therefore', 'however', 'moreover', 'furthermore']), | |
| any(word in content_lower for word in ['data', 'evidence', 'study', 'research', 'analysis']), | |
| any(word in content_lower for word in ['consider', 'suggest', 'indicate', 'demonstrate']), | |
| len(content.split()) > 50, # Substantial word count | |
| ] | |
| depth_score = sum(depth_indicators) / len(depth_indicators) | |
| quality_score += depth_score * 0.25 | |
| # Specificity indicators (0.25 weight) | |
| specificity_indicators = [ | |
| any(char.isdigit() for char in content), # Contains numbers/data | |
| content.count(',') > 2, # Complex sentences with details | |
| any(word in content_lower for word in ['specific', 'particular', 'detail', 'example']), | |
| '"' in content or "'" in content, # Quotes or citations | |
| ] | |
| specificity_score = sum(specificity_indicators) / len(specificity_indicators) | |
| quality_score += specificity_score * 0.25 | |
| return min(quality_score, 1.0) | |
| def _calculate_consistency_bonus(self) -> float: | |
| """ | |
| Calculate consistency bonus based on confidence history stability. | |
| Returns: | |
| Consistency bonus (0.0 to 1.0) | |
| """ | |
| if len(self._confidence_history) < 3: | |
| return 0.5 # Neutral for insufficient data | |
| # Calculate confidence variance (lower variance = more consistent) | |
| recent_confidences = self._confidence_history[-3:] | |
| avg_confidence = sum(recent_confidences) / len(recent_confidences) | |
| variance = sum((c - avg_confidence) ** 2 for c in recent_confidences) / len(recent_confidences) | |
| # Convert variance to consistency bonus (lower variance = higher bonus) | |
| consistency_bonus = max(0.0, 1.0 - (variance * 10)) # Scale variance appropriately | |
| return min(consistency_bonus, 1.0) | |
    def create_position_aware_prompt(self, task: LLMTask, current_time: float) -> tuple[str, int]:
        """
        Create system prompt that adapts to agent's helix position with token budget.

        Enhanced with prompt optimization that learns from performance metrics:
        a stored high-scoring prompt replaces the freshly generated one when
        its score clears 0.7.

        Args:
            task: Task to process
            current_time: Current simulation time

        Returns:
            Tuple of (position-aware system prompt, token budget for this stage)
        """
        position_info = self.get_position_info(current_time)
        depth_ratio = position_info.get("depth_ratio", 0.0)
        # Determine prompt context based on agent type and position
        prompt_context = self._get_prompt_context(depth_ratio)
        # Get token allocation if budget manager is available
        token_allocation = None
        stage_token_budget = self.max_tokens  # Default fallback when no budget manager
        if self.token_budget_manager:
            # processing_stage is incremented after each LLM call, so the
            # upcoming stage number is processing_stage + 1.
            token_allocation = self.token_budget_manager.calculate_stage_allocation(
                self.agent_id, depth_ratio, self.processing_stage + 1
            )
            stage_token_budget = token_allocation.stage_budget
        # Add shared context from other agents
        context_summary = ""
        if self.shared_context:
            context_summary = "\n\nShared Context from Other Agents:\n"
            for key, value in self.shared_context.items():
                context_summary += f"- {key}: {value}\n"
        # Generate prompt ID for optimization tracking
        prompt_id = f"{self.agent_type}_{prompt_context.value}_stage_{self.processing_stage}"
        # Check if we have an optimized prompt available
        optimized_prompt = None
        if self.prompt_optimizer:
            recommendations = self.prompt_optimizer.get_optimization_recommendations(prompt_context)
            if recommendations.get("best_prompts"):
                # best_prompts entries appear to be (prompt_text, score) pairs
                # judging by the indexing below.
                best_prompt = recommendations["best_prompts"][0]
                if best_prompt[1] > 0.7:  # High performance threshold
                    optimized_prompt = best_prompt[0]
                    logger.debug(f"Using optimized prompt for {prompt_id} (score: {best_prompt[1]:.3f})")
        # Create base system prompt
        if optimized_prompt:
            base_prompt = optimized_prompt
        else:
            base_prompt = self.llm_client.create_agent_system_prompt(
                agent_type=self.agent_type,
                position_info=position_info,
                task_context=f"{task.context}{context_summary}"
            )
        # Add token budget guidance if available
        if token_allocation:
            budget_guidance = f"\n\nToken Budget Guidance:\n{token_allocation.style_guidance}"
            if token_allocation.compression_ratio > 0.5:
                budget_guidance += f"\nCompress previous insights by ~{token_allocation.compression_ratio:.0%} while preserving key points."
            enhanced_prompt = base_prompt + budget_guidance
        else:
            enhanced_prompt = base_prompt
        return enhanced_prompt, stage_token_budget
| def _get_prompt_context(self, depth_ratio: float) -> PromptContext: | |
| """ | |
| Determine prompt context based on agent type and helix position. | |
| Args: | |
| depth_ratio: Agent's depth ratio on helix (0.0 = top, 1.0 = bottom) | |
| Returns: | |
| PromptContext enum value | |
| """ | |
| if self.agent_type == "research": | |
| return PromptContext.RESEARCH_EARLY if depth_ratio < 0.3 else PromptContext.RESEARCH_MID | |
| elif self.agent_type == "analysis": | |
| return PromptContext.ANALYSIS_MID if depth_ratio < 0.7 else PromptContext.ANALYSIS_LATE | |
| elif self.agent_type == "synthesis": | |
| return PromptContext.SYNTHESIS_LATE | |
| elif self.agent_type == "critic": | |
| return PromptContext.GENERAL # Critics can work at any stage | |
| else: | |
| return PromptContext.GENERAL | |
    def _record_prompt_metrics(self, prompt_text: str, prompt_context: PromptContext,
                               result: LLMResult) -> None:
        """
        Record prompt performance metrics for optimization learning.

        No-op when no prompt optimizer is configured.

        Args:
            prompt_text: The full system prompt used
            prompt_context: Context category for the prompt
            result: LLM result containing performance data
        """
        if not self.prompt_optimizer:
            return
        # Calculate token efficiency
        # NOTE(review): "efficiency" here is just capped confidence whenever
        # any tokens were used — it does not scale with token count; confirm
        # this proxy is intentional.
        tokens_used = getattr(result.llm_response, 'tokens_used', 0)
        token_efficiency = min(result.confidence, 0.8) if tokens_used > 0 else 0.0
        # Determine if truncation occurred (approximate)
        truncation_occurred = (
            tokens_used >= self.max_tokens * 0.95 or  # Used most of token budget
            result.llm_response.content.endswith("...") or  # Ends with ellipsis
            len(result.llm_response.content) < 50  # Very short response
        )
        # Create metrics
        metrics = PromptMetrics(
            output_quality=result.confidence,  # Use confidence as proxy for quality
            confidence=result.confidence,
            completion_time=result.processing_time,
            token_efficiency=token_efficiency,
            truncation_occurred=truncation_occurred,
            context=prompt_context
        )
        # Generate prompt ID for tracking (same scheme as create_position_aware_prompt)
        prompt_id = f"{self.agent_type}_{prompt_context.value}_stage_{self.processing_stage}"
        # Record metrics
        optimization_result = self.prompt_optimizer.record_prompt_execution(
            prompt_id, prompt_text, metrics
        )
        if optimization_result.get("optimization_triggered"):
            logger.info(f"Prompt optimization triggered for {prompt_id}")
    async def process_task_with_llm_async(self, task: LLMTask, current_time: float,
                                          priority: RequestPriority = RequestPriority.NORMAL) -> LLMResult:
        """
        Asynchronously process task using LLM with position-aware prompting.

        Side effects: increments processing_stage, records confidence, logs
        token usage with the budget manager, feeds prompt metrics to the
        optimizer, and appends the result to processing_results.

        Args:
            task: Task to process
            current_time: Current simulation time
            priority: Request priority for LLM processing

        Returns:
            LLM processing result
        """
        start_time = time.perf_counter()
        # Get position-aware prompts, token budget, and temperature
        system_prompt, stage_token_budget = self.create_position_aware_prompt(task, current_time)
        temperature = self.get_adaptive_temperature(current_time)
        position_info = self.get_position_info(current_time)
        # Ensure stage budget doesn't exceed agent's max_tokens
        effective_token_budget = min(stage_token_budget, self.max_tokens)
        # Process with LLM using coordinated token budget (ASYNC)
        # Use multi-server client pool if available, otherwise use regular client
        if isinstance(self.llm_client, LMStudioClientPool):
            llm_response = await self.llm_client.complete_for_agent_type(
                agent_type=self.agent_type,
                agent_id=self.agent_id,
                system_prompt=system_prompt,
                user_prompt=task.description,
                temperature=temperature,
                max_tokens=effective_token_budget,
                priority=priority
            )
        else:
            llm_response = await self.llm_client.complete_async(
                agent_id=self.agent_id,
                system_prompt=system_prompt,
                user_prompt=task.description,
                temperature=temperature,
                max_tokens=effective_token_budget,
                priority=priority
            )
        end_time = time.perf_counter()
        processing_time = end_time - start_time
        # Increment processing stage
        self.processing_stage += 1
        # Calculate confidence based on position and content
        confidence = self.calculate_confidence(current_time, llm_response.content, self.processing_stage)
        # Record confidence for adaptive progression
        self.record_confidence(confidence)
        # Create result
        result = LLMResult(
            agent_id=self.agent_id,
            task_id=task.task_id,
            content=llm_response.content,
            position_info=position_info,
            llm_response=llm_response,
            processing_time=processing_time,
            timestamp=time.time(),
            confidence=confidence,
            processing_stage=self.processing_stage
        )
        # Record token usage with budget manager
        if self.token_budget_manager:
            self.token_budget_manager.record_usage(self.agent_id, llm_response.tokens_used)
        # Record prompt metrics for optimization
        prompt_context = self._get_prompt_context(position_info.get("depth_ratio", 0.0))
        self._record_prompt_metrics(system_prompt, prompt_context, result)
        # Update statistics
        self.processing_results.append(result)
        self.total_tokens_used += llm_response.tokens_used
        self.total_processing_time += processing_time
        logger.info(f"Agent {self.agent_id} processed task {task.task_id} "
                    f"at depth {position_info.get('depth_ratio', 0):.2f} "
                    f"in {processing_time:.2f}s (async)")
        return result
    def process_task_with_llm(self, task: LLMTask, current_time: float) -> LLMResult:
        """
        Process task using LLM with position-aware prompting (sync wrapper).

        Mirrors process_task_with_llm_async, but when a client pool is in use
        it picks one server synchronously instead of awaiting the pool.

        Args:
            task: Task to process
            current_time: Current simulation time

        Returns:
            LLM processing result

        Raises:
            RuntimeError: If a client pool is in use and no server is
                available for this agent's type.
        """
        start_time = time.perf_counter()
        # Get position-aware prompts, token budget, and temperature
        system_prompt, stage_token_budget = self.create_position_aware_prompt(task, current_time)
        temperature = self.get_adaptive_temperature(current_time)
        position_info = self.get_position_info(current_time)
        # Ensure stage budget doesn't exceed agent's max_tokens
        effective_token_budget = min(stage_token_budget, self.max_tokens)
        # Process with LLM using coordinated token budget (SYNC)
        # Note: Multi-server client pool requires async, so fall back to first available server
        if isinstance(self.llm_client, LMStudioClientPool):
            # For sync calls with pool, use the first available client
            server_name = self.llm_client.get_server_for_agent_type(self.agent_type)
            if server_name and server_name in self.llm_client.clients:
                client = self.llm_client.clients[server_name]
                llm_response = client.complete(
                    agent_id=self.agent_id,
                    system_prompt=system_prompt,
                    user_prompt=task.description,
                    temperature=temperature,
                    max_tokens=effective_token_budget
                )
            else:
                raise RuntimeError(f"No available server for agent type: {self.agent_type}")
        else:
            llm_response = self.llm_client.complete(
                agent_id=self.agent_id,
                system_prompt=system_prompt,
                user_prompt=task.description,
                temperature=temperature,
                max_tokens=effective_token_budget
            )
        end_time = time.perf_counter()
        processing_time = end_time - start_time
        # Increment processing stage
        self.processing_stage += 1
        # Calculate confidence based on position and content
        confidence = self.calculate_confidence(current_time, llm_response.content, self.processing_stage)
        # Record confidence for adaptive progression
        self.record_confidence(confidence)
        # Create result
        result = LLMResult(
            agent_id=self.agent_id,
            task_id=task.task_id,
            content=llm_response.content,
            position_info=position_info,
            llm_response=llm_response,
            processing_time=processing_time,
            timestamp=time.time(),
            confidence=confidence,
            processing_stage=self.processing_stage
        )
        # Record token usage with budget manager
        if self.token_budget_manager:
            self.token_budget_manager.record_usage(self.agent_id, llm_response.tokens_used)
        # Record prompt metrics for optimization
        prompt_context = self._get_prompt_context(position_info.get("depth_ratio", 0.0))
        self._record_prompt_metrics(system_prompt, prompt_context, result)
        # Update statistics
        self.processing_results.append(result)
        self.total_tokens_used += llm_response.tokens_used
        self.total_processing_time += processing_time
        logger.info(f"Agent {self.agent_id} processed task {task.task_id} "
                    f"at depth {position_info.get('depth_ratio', 0):.2f} "
                    f"in {processing_time:.2f}s")
        return result
| # Legacy method alias for backward compatibility | |
| async def process_task_async(self, task: LLMTask, current_time: float) -> LLMResult: | |
| """ | |
| Asynchronously process task using LLM (legacy method). | |
| Args: | |
| task: Task to process | |
| current_time: Current simulation time | |
| Returns: | |
| LLM processing result | |
| """ | |
| return await self.process_task_with_llm_async(task, current_time, RequestPriority.NORMAL) | |
    def share_result_to_central(self, result: LLMResult) -> Message:
        """
        Create message to share result with central post.

        The payload carries the content summary (not full chunked content);
        streaming chunks are attached separately for synthesis agents.

        Args:
            result: Processing result to share

        Returns:
            Message for central post communication
        """
        # Handle tokens_used for chunked vs non-chunked results
        tokens_used = 0
        if result.is_chunked and result.chunk_results:
            # Sum tokens from all chunks
            tokens_used = sum(chunk.metadata.get("tokens_used", 0) for chunk in result.chunk_results)
        elif result.llm_response:
            tokens_used = result.llm_response.tokens_used
        content_data = {
            "type": "AGENT_RESULT",
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "task_id": result.task_id,
            "content": result.get_content_summary(),  # Use summary for chunked content
            "full_content_available": result.full_content_available,
            "position_info": result.position_info,
            "tokens_used": tokens_used,
            "processing_time": result.processing_time,
            "confidence": result.confidence,
            "processing_stage": result.processing_stage,
            "summary": self._create_result_summary(result)
        }
        # Add chunking metadata if applicable
        if result.is_chunked:
            content_data["chunking_metadata"] = result.get_chunking_metadata()
            content_data["chunking_strategy"] = result.chunking_strategy
            # For streaming synthesis, make chunks available to synthesis agents
            if result.chunking_strategy == "streaming" and result.chunk_results:
                content_data["streaming_chunks"] = [
                    {
                        "chunk_index": chunk.chunk_index,
                        "aspect": chunk.metadata.get("aspect", f"Section {chunk.chunk_index + 1}"),
                        "content": chunk.content_chunk,
                        "is_final": chunk.is_final,
                        "timestamp": chunk.timestamp
                    }
                    for chunk in result.chunk_results
                ]
        return Message(
            sender_id=self.agent_id,
            message_type=MessageType.STATUS_UPDATE,
            content=content_data,
            timestamp=result.timestamp
        )
| def _create_result_summary(self, result: LLMResult) -> str: | |
| """Create concise summary of processing result.""" | |
| content_preview = result.content[:100] + "..." if len(result.content) > 100 else result.content | |
| depth = result.position_info.get("depth_ratio", 0.0) | |
| return f"[{self.agent_type.upper()} @ depth {depth:.2f}] {content_preview}" | |
| def receive_shared_context(self, message: Dict[str, Any]) -> None: | |
| """ | |
| Receive and store shared context from other agents. | |
| Args: | |
| message: Shared context message | |
| """ | |
| self.received_messages.append(message) | |
| # Extract relevant context | |
| if message.get("type") == "AGENT_RESULT": | |
| key = f"{message.get('agent_type', 'unknown')}_{message.get('agent_id', '')}" | |
| self.shared_context[key] = message.get("summary", "") | |
    def influence_agent_behavior(self, other_agent: "LLMAgent", influence_type: str, strength: float) -> None:
        """
        Influence another agent's behavior based on collaboration.

        Args:
            other_agent: Agent to influence
            influence_type: Type of influence ('accelerate', 'slow', 'pause', 'redirect')
            strength: Influence strength (0.0 to 1.0)
        """
        if strength <= 0.0 or other_agent.agent_id == self.agent_id:
            return  # No influence or self-influence
        # Record the influence relationship
        if other_agent.agent_id not in self.influence_strength:
            self.influence_strength[other_agent.agent_id] = 0.0
        self.influence_strength[other_agent.agent_id] += strength * 0.1  # Cumulative influence
        if self.agent_id not in other_agent.influenced_by:
            other_agent.influenced_by.append(self.agent_id)
        # Apply influence based on type and agent compatibility
        compatibility = self._calculate_agent_compatibility(other_agent)
        effective_strength = strength * compatibility
        if influence_type == "accelerate" and effective_strength > 0.3:
            # Speed up the other agent if they're compatible
            # NOTE(review): this reads other_agent.velocity but passes the
            # scaled value to set_velocity_multiplier — confirm velocity and
            # the multiplier share the same scale.
            current_velocity = other_agent.velocity
            other_agent.set_velocity_multiplier(min(current_velocity * 1.2, 2.0))
        elif influence_type == "slow" and effective_strength > 0.4:
            # Slow down if there's strong incompatibility
            current_velocity = other_agent.velocity
            other_agent.set_velocity_multiplier(max(current_velocity * 0.8, 0.3))
        elif influence_type == "pause" and effective_strength > 0.6:
            # Pause for consideration of conflicting approaches
            other_agent.pause_for_duration(0.1 * effective_strength, 0.0)  # Brief pause
        # NOTE(review): 'redirect' is listed in the docstring but has no
        # handler here — verify whether it is intentionally a no-op.
        # Record collaboration
        self.collaboration_history.append({
            "timestamp": time.time(),
            "other_agent": other_agent.agent_id,
            "influence_type": influence_type,
            "strength": strength,
            "effective_strength": effective_strength,
            "compatibility": compatibility
        })
| def _calculate_agent_compatibility(self, other_agent: "LLMAgent") -> float: | |
| """ | |
| Calculate compatibility between this agent and another. | |
| Args: | |
| other_agent: Other agent to assess compatibility with | |
| Returns: | |
| Compatibility score (0.0 to 1.0) | |
| """ | |
| # Type compatibility matrix | |
| type_compatibility = { | |
| ("research", "research"): 0.8, # Research agents collaborate well | |
| ("research", "analysis"): 0.9, # Research feeds analysis | |
| ("research", "synthesis"): 0.7, # Research provides raw material | |
| ("research", "critic"): 0.6, # Some tension but productive | |
| ("analysis", "analysis"): 0.7, # Analysis agents can complement | |
| ("analysis", "synthesis"): 0.9, # Analysis feeds synthesis | |
| ("analysis", "critic"): 0.8, # Analysis benefits from critique | |
| ("synthesis", "synthesis"): 0.5, # May compete for final output | |
| ("synthesis", "critic"): 0.8, # Synthesis benefits from review | |
| ("critic", "critic"): 0.6, # Critics can disagree | |
| } | |
| # Get base compatibility from types | |
| type_pair = (self.agent_type, other_agent.agent_type) | |
| reverse_type_pair = (other_agent.agent_type, self.agent_type) | |
| base_compatibility = type_compatibility.get( | |
| type_pair, type_compatibility.get(reverse_type_pair, 0.5) | |
| ) | |
| # Modify based on confidence histories | |
| if (len(self._confidence_history) > 2 and | |
| len(other_agent._confidence_history) > 2): | |
| my_trend = self._confidence_history[-1] - self._confidence_history[-2] | |
| their_trend = other_agent._confidence_history[-1] - other_agent._confidence_history[-2] | |
| # Agents with similar confidence trends are more compatible | |
| trend_similarity = 1.0 - abs(my_trend - their_trend) | |
| base_compatibility = (base_compatibility + trend_similarity) / 2 | |
| return max(0.0, min(base_compatibility, 1.0)) | |
def assess_collaboration_opportunities(self, available_agents: List["LLMAgent"],
                                       current_time: float) -> List[Dict[str, Any]]:
    """
    Assess opportunities for collaboration with other agents.

    Skips self and non-active agents, drops pairs below a 0.3 compatibility
    floor, and ranks the rest by compatibility * confidence, discounted by
    helix-progress distance.

    Args:
        available_agents: List of other agents available for collaboration
        current_time: Current simulation time

    Returns:
        List of collaboration opportunities with recommendations
    """
    opportunities: List[Dict[str, Any]] = []

    for candidate in available_agents:
        # Never collaborate with ourselves or with inactive agents.
        if candidate.agent_id == self.agent_id or candidate.state != AgentState.ACTIVE:
            continue

        compatibility = self._calculate_agent_compatibility(candidate)
        if compatibility < 0.3:
            continue  # Not compatible enough to be worth pursuing

        history = candidate._confidence_history
        opportunities.append({
            "agent_id": candidate.agent_id,
            "agent_type": candidate.agent_type,
            "compatibility": compatibility,
            "recommended_influence": self._recommend_influence_type(candidate),
            "confidence": history[-1] if history else 0.5,
            "distance": abs(self._progress - candidate._progress),
        })

    def _expected_value(opp: Dict[str, Any]) -> float:
        # Closer agents (smaller progress gap) are discounted less.
        return opp["compatibility"] * opp["confidence"] * (1.0 - opp["distance"] * 0.5)

    opportunities.sort(key=_expected_value, reverse=True)
    return opportunities
| def _recommend_influence_type(self, other_agent: "LLMAgent") -> str: | |
| """ | |
| Recommend type of influence to apply to another agent. | |
| Args: | |
| other_agent: Agent to recommend influence for | |
| Returns: | |
| Recommended influence type | |
| """ | |
| if not other_agent._confidence_history: | |
| return "accelerate" # Default to acceleration for new agents | |
| other_confidence = other_agent._confidence_history[-1] | |
| my_confidence = self._confidence_history[-1] if self._confidence_history else 0.5 | |
| # If other agent has low confidence and I have high confidence, accelerate them | |
| if other_confidence < 0.5 and my_confidence > 0.7: | |
| return "accelerate" | |
| # If other agent has much higher confidence, slow down to learn from them | |
| elif other_confidence > my_confidence + 0.3: | |
| return "slow" | |
| # If confidence gap is large and we're incompatible, suggest pause | |
| elif abs(other_confidence - my_confidence) > 0.4: | |
| return "pause" | |
| # Default to acceleration for collaborative growth | |
| return "accelerate" | |
def get_agent_stats(self) -> Dict[str, Any]:
    """
    Get agent performance statistics including emergent behavior metrics.

    Returns:
        Dictionary with performance metrics
    """
    task_count = len(self.processing_results)
    # Guard the average against division by zero before any tasks complete.
    avg_time = self.total_processing_time / task_count if task_count else 0.0

    stats: Dict[str, Any] = {
        "agent_id": self.agent_id,
        "agent_type": self.agent_type,
        "state": self.state.value,
        "spawn_time": self.spawn_time,
        "progress": self._progress,
        "total_tasks_processed": task_count,
        "total_tokens_used": self.total_tokens_used,
        "total_processing_time": self.total_processing_time,
        "average_processing_time": avg_time,
        "messages_received": len(self.received_messages),
        "shared_context_items": len(self.shared_context),
        # Emergent behavior metrics
        "influenced_by_count": len(self.influenced_by),
        "influences_given": len(self.influence_strength),
        "total_influence_received": sum(self.influence_strength.values()),
        "collaboration_count": len(self.collaboration_history),
        "velocity": self.velocity,
        "confidence_history": list(self._confidence_history),
        "progression_info": self.get_progression_info(),
    }

    # Attach token-budget status only when a budget manager is wired in
    # and has data for this agent.
    manager = self.token_budget_manager
    if manager:
        budget_status = manager.get_agent_status(self.agent_id)
        if budget_status:
            stats["token_budget"] = budget_status

    return stats
def create_llm_agents(helix: HelixGeometry, llm_client: LMStudioClient,
                      agent_configs: List[Dict[str, Any]]) -> List[LLMAgent]:
    """
    Create multiple LLM agents with specified configurations.

    Each config dict must carry "agent_id" and "spawn_time"; "agent_type"
    and "temperature_range" fall back to defaults when absent.

    Args:
        helix: Helix geometry for agent paths
        llm_client: LM Studio client for LLM inference
        agent_configs: List of agent configuration dictionaries

    Returns:
        List of configured LLM agents
    """
    return [
        LLMAgent(
            agent_id=config["agent_id"],
            spawn_time=config["spawn_time"],
            helix=helix,
            llm_client=llm_client,
            agent_type=config.get("agent_type", "general"),
            temperature_range=config.get("temperature_range", (0.1, 0.9)),
        )
        for config in agent_configs
    ]
def create_specialized_agent_configs(num_research: int = 3, num_analysis: int = 2,
                                     num_synthesis: int = 1, random_seed: int = 42069) -> List[Dict[str, Any]]:
    """
    Create agent configurations for typical Felix multi-agent task.

    Spawn-time windows stagger the pipeline: research agents spawn early
    (0.0-0.4), analysis agents mid-run (0.3-0.7), synthesis agents late
    (0.6-1.0). Temperature ranges narrow down the pipeline accordingly.

    Args:
        num_research: Number of research agents (spawn early)
        num_analysis: Number of analysis agents (spawn mid)
        num_synthesis: Number of synthesis agents (spawn late)
        random_seed: Random seed for spawn timing

    Returns:
        List of agent configuration dictionaries
    """
    import random

    # Use a private RNG instead of random.seed() so the caller's global
    # random-module state is not clobbered as a side effect. random.Random(seed)
    # produces the same Mersenne-Twister sequence as seeding the module-level
    # functions, so outputs for a given seed are unchanged.
    rng = random.Random(random_seed)

    # Per-phase spec: (agent_type, count, spawn window, temperature range).
    phases = [
        ("research", num_research, (0.0, 0.4), (0.3, 0.9)),
        ("analysis", num_analysis, (0.3, 0.7), (0.2, 0.7)),
        ("synthesis", num_synthesis, (0.6, 1.0), (0.1, 0.5)),
    ]

    configs: List[Dict[str, Any]] = []
    agent_id = 0
    for agent_type, count, (spawn_lo, spawn_hi), temp_range in phases:
        for _ in range(count):
            configs.append({
                "agent_id": f"{agent_type}_{agent_id:03d}",
                "spawn_time": rng.uniform(spawn_lo, spawn_hi),
                "agent_type": agent_type,
                "temperature_range": temp_range,
            })
            agent_id += 1
    return configs