# Source: felix-framework/src/agents/llm_agent.py
# (GitHub listing residue — author: jkbennitt, commit fb867c3,
#  "Clean hf-space branch and prepare for HuggingFace Spaces deployment")
"""
LLM-powered agent for the Felix Framework.
This module extends the base Agent class with language model capabilities,
enabling agents to process tasks using local LLM inference via LM Studio.
Key Features:
- Integration with LM Studio for local LLM inference
- Position-aware prompt engineering based on helix location
- Adaptive behavior based on geometric constraints
- Communication via spoke system with central coordination
- Built-in task processing and result sharing
The agent's behavior adapts based on its position on the helix:
- Top (wide): Broad exploration, high creativity
- Middle: Focused analysis, balanced processing
- Bottom (narrow): Precise synthesis, low temperature
"""
import time
import asyncio
import logging
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from agents.agent import Agent, AgentState
from core.helix_geometry import HelixGeometry
from llm.lm_studio_client import LMStudioClient, LLMResponse, RequestPriority
from llm.multi_server_client import LMStudioClientPool
from llm.token_budget import TokenBudgetManager, TokenAllocation
from communication.central_post import Message, MessageType
from pipeline.chunking import ChunkedResult, ProgressiveProcessor, ContentSummarizer
from agents.prompt_optimization import PromptOptimizer, PromptMetrics, PromptContext
logger = logging.getLogger(__name__)
@dataclass
class LLMTask:
    """A unit of work handed to an LLM agent."""
    task_id: str      # Unique identifier for this task
    description: str  # User-facing prompt describing what to do
    context: str = ""  # Optional background text appended to prompts
    metadata: Optional[Dict[str, Any]] = None  # Free-form extras; never left as None

    def __post_init__(self):
        # Normalize an omitted metadata mapping to an empty dict so
        # callers can always treat it as a dict.
        self.metadata = self.metadata if self.metadata is not None else {}
@dataclass
class LLMResult:
    """Result from LLM agent processing with chunking support."""
    agent_id: str
    task_id: str
    content: str
    position_info: Dict[str, float]
    llm_response: LLMResponse
    processing_time: float
    timestamp: float
    confidence: float = 0.0  # Confidence score (0.0 to 1.0)
    processing_stage: int = 1  # Stage number in helix descent
    # Chunking support fields
    is_chunked: bool = False  # Whether this result contains chunked output
    chunk_results: Optional[List[ChunkedResult]] = None  # List of chunk results if chunked
    total_chunks: int = 1  # Total number of chunks (1 for non-chunked)
    chunking_strategy: Optional[str] = None  # Strategy used for chunking (progressive/streaming)
    summary_fallback: Optional[str] = None  # Summary if content was truncated
    full_content_available: bool = True  # Whether full content is available or summarized

    def __post_init__(self):
        # FIXED: the original if/elif assigned [] on both branches;
        # a single None check is equivalent and clearer.
        if self.chunk_results is None:
            self.chunk_results = []

    def get_full_content(self) -> str:
        """Get full content, combining chunks in index order if necessary.

        Falls back to ``content`` when not chunked, when no chunks are
        stored, or when the combined chunk text is empty.
        """
        if not self.is_chunked:
            return self.content
        if not self.chunk_results:
            return self.content
        # Combine chunk content in order of chunk_index
        sorted_chunks = sorted(self.chunk_results, key=lambda x: x.chunk_index)
        combined_content = "".join(chunk.content_chunk for chunk in sorted_chunks)
        return combined_content if combined_content else self.content

    def get_content_summary(self) -> str:
        """Get a short summary, preferring summary_fallback when full content is unavailable."""
        if self.summary_fallback and not self.full_content_available:
            return self.summary_fallback
        content = self.get_full_content()
        if len(content) <= 200:
            return content
        return content[:200] + "..."

    def add_chunk(self, chunk: ChunkedResult) -> None:
        """Add a chunk result, flipping this result into chunked mode if needed."""
        if not self.is_chunked:
            self.is_chunked = True
        if self.chunk_results is None:
            self.chunk_results = []
        self.chunk_results.append(chunk)
        self.total_chunks = len(self.chunk_results)

    def is_complete(self) -> bool:
        """Check if all chunks are available for a chunked result.

        Non-chunked results are always complete; chunked results are
        complete once any stored chunk is marked final.
        """
        if not self.is_chunked:
            return True
        if not self.chunk_results:
            return False
        # Completion is signalled by the presence of a final chunk
        return any(chunk.is_final for chunk in self.chunk_results)

    def get_chunking_metadata(self) -> Dict[str, Any]:
        """Get metadata describing the chunking state of this result."""
        if not self.is_chunked:
            return {"chunked": False}
        return {
            "chunked": True,
            "total_chunks": self.total_chunks,
            "chunks_available": len(self.chunk_results) if self.chunk_results else 0,
            "strategy": self.chunking_strategy,
            "complete": self.is_complete(),
            "full_content_available": self.full_content_available,
            "has_summary_fallback": self.summary_fallback is not None
        }
class LLMAgent(Agent):
    """
    LLM-powered agent that processes tasks using language models.

    Extends the base Agent class with LLM capabilities, providing
    position-aware prompt engineering and adaptive behavior based
    on the agent's location on the helix.
    """

    def __init__(self, agent_id: str, spawn_time: float, helix: HelixGeometry,
                 llm_client, agent_type: str = "general",
                 temperature_range: Optional[tuple] = None, max_tokens: Optional[int] = None,
                 token_budget_manager: Optional[TokenBudgetManager] = None,
                 prompt_optimizer: Optional[PromptOptimizer] = None):
        """
        Initialize LLM agent.

        Args:
            agent_id: Unique identifier for the agent
            spawn_time: Time when agent becomes active (0.0 to 1.0)
            helix: Helix geometry for path calculation
            llm_client: LM Studio client or client pool for LLM inference
            agent_type: Agent specialization (research, analysis, synthesis, critic)
            temperature_range: (min, max) temperature based on helix position;
                None selects a type-specific default
            max_tokens: Per-completion token cap; None selects a type-specific default
            token_budget_manager: Optional budget manager for adaptive token allocation
            prompt_optimizer: Optional prompt optimization system for learning
        """
        super().__init__(agent_id, spawn_time, helix)
        self.llm_client = llm_client
        self.agent_type = agent_type
        # FIXED: Set appropriate defaults based on agent type (aligns with TokenBudgetManager)
        if temperature_range is None:
            if agent_type == "research":
                self.temperature_range = (0.4, 0.9)  # High creativity for exploration
            elif agent_type == "analysis":
                self.temperature_range = (0.2, 0.7)  # Balanced for processing
            elif agent_type == "synthesis":
                self.temperature_range = (0.1, 0.5)  # Lower for focused synthesis
            elif agent_type == "critic":
                self.temperature_range = (0.1, 0.6)  # Low-medium for critique
            else:
                self.temperature_range = (0.1, 0.9)  # Default fallback
        else:
            self.temperature_range = temperature_range
        if max_tokens is None:
            if agent_type == "research":
                self.max_tokens = 200  # Small for bullet points
            elif agent_type == "analysis":
                self.max_tokens = 400  # Medium for structured analysis
            elif agent_type == "synthesis":
                self.max_tokens = 1000  # Large for comprehensive output
            elif agent_type == "critic":
                self.max_tokens = 150  # Small for focused feedback
            else:
                self.max_tokens = 500  # Default fallback
        else:
            self.max_tokens = max_tokens
        self.token_budget_manager = token_budget_manager
        self.prompt_optimizer = prompt_optimizer
        # Initialize token budget if manager provided
        if self.token_budget_manager:
            self.token_budget_manager.initialize_agent_budget(agent_id, agent_type, self.max_tokens)
        # LLM-specific state
        self.processing_results: List[LLMResult] = []  # All results produced so far
        self.total_tokens_used = 0
        self.total_processing_time = 0.0
        self.processing_stage = 0  # Current processing stage in helix descent
        # Communication state
        self.shared_context: Dict[str, Any] = {}  # Summaries received from peers
        self.received_messages: List[Dict[str, Any]] = []
        # Emergent behavior tracking
        self.influenced_by: List[str] = []  # Agent IDs that influenced this agent
        self.influence_strength: Dict[str, float] = {}  # How much each agent influenced this one
        self.collaboration_history: List[Dict[str, Any]] = []  # History of collaborations

    def get_position_info(self, current_time: float) -> Dict[str, float]:
        """
        Get detailed position information for the agent.

        Args:
            current_time: Current simulation time

        Returns:
            Dictionary with position details, or {} when the agent has no
            position yet (e.g. before spawn).
        """
        position = self.get_position(current_time)
        if position is None:
            return {}
        x, y, z = position
        radius = self.helix.get_radius(z)
        # depth_ratio: 0.0 at the top of the helix, 1.0 at the bottom
        depth_ratio = z / self.helix.height
        return {
            "x": x,
            "y": y,
            "z": z,
            "radius": radius,
            "depth_ratio": depth_ratio,
            "progress": self._progress  # NOTE(review): _progress comes from base Agent
        }

    def get_adaptive_temperature(self, current_time: float) -> float:
        """
        Calculate temperature based on helix position.

        Higher temperature (more creative) at top of helix,
        lower temperature (more focused) at bottom.

        Args:
            current_time: Current simulation time

        Returns:
            Temperature value for LLM, clamped to temperature_range
        """
        position_info = self.get_position_info(current_time)
        depth_ratio = position_info.get("depth_ratio", 0.0)
        # Invert depth ratio: top (0.0) = high temp, bottom (1.0) = low temp
        inverted_ratio = 1.0 - depth_ratio
        min_temp, max_temp = self.temperature_range
        temperature = min_temp + (max_temp - min_temp) * inverted_ratio
        return max(min_temp, min(max_temp, temperature))

    def calculate_confidence(self, current_time: float, content: str, stage: int) -> float:
        """
        Calculate confidence score based on agent type, helix position, and content quality.

        Agent types have different confidence ranges to ensure proper workflow:
        - Research agents: 0.3-0.6 (gather info, don't make final decisions)
        - Analysis agents: 0.4-0.7 (process info, prepare for synthesis)
        - Synthesis agents: 0.6-0.95 (create final output)
        - Critic agents: 0.5-0.8 (provide feedback)

        Args:
            current_time: Current simulation time
            content: Generated content to evaluate
            stage: Processing stage number

        Returns:
            Confidence score (0.0 to 1.0), capped at the type-specific maximum
        """
        position_info = self.get_position_info(current_time)
        depth_ratio = position_info.get("depth_ratio", 0.0)
        # Base confidence range based on agent type
        if self.agent_type == "research":
            # Research agents max out at 0.6 - they gather info, don't make final decisions
            base_confidence = 0.3 + (depth_ratio * 0.3)  # 0.3-0.6 range
            max_confidence = 0.6
        elif self.agent_type == "analysis":
            # Analysis agents: 0.4-0.7 - process info but don't synthesize
            base_confidence = 0.4 + (depth_ratio * 0.3)  # 0.4-0.7 range
            max_confidence = 0.7
        elif self.agent_type == "synthesis":
            # Synthesis agents: 0.6-0.95 - create final comprehensive output
            base_confidence = 0.6 + (depth_ratio * 0.35)  # 0.6-0.95 range
            max_confidence = 0.95
        elif self.agent_type == "critic":
            # Critic agents: 0.5-0.8 - provide feedback and validation
            base_confidence = 0.5 + (depth_ratio * 0.3)  # 0.5-0.8 range
            max_confidence = 0.8
        else:
            # Default fallback
            base_confidence = 0.3 + (depth_ratio * 0.4)
            max_confidence = 0.7
        # Content quality bonus (up to 0.1 additional)
        content_quality = self._analyze_content_quality(content)
        content_bonus = content_quality * 0.1
        # Processing stage bonus (up to 0.05 additional)
        stage_bonus = min(stage * 0.005, 0.05)
        # Historical consistency bonus (up to 0.05 additional)
        consistency_bonus = self._calculate_consistency_bonus() * 0.05
        total_confidence = base_confidence + content_bonus + stage_bonus + consistency_bonus
        # Store debug info for potential display
        self._last_confidence_breakdown = {
            "base_confidence": base_confidence,
            "content_bonus": content_bonus,
            "stage_bonus": stage_bonus,
            "consistency_bonus": consistency_bonus,
            "total_before_cap": total_confidence,
            "max_confidence": max_confidence,
            "final_confidence": min(max(total_confidence, 0.0), max_confidence)
        }
        return min(max(total_confidence, 0.0), max_confidence)

    def _analyze_content_quality(self, content: str) -> float:
        """
        Analyze content quality using multiple heuristics.

        Four equally-weighted (0.25 each) heuristic groups: length
        appropriateness, structure, depth, and specificity.

        Args:
            content: Content to analyze

        Returns:
            Quality score (0.0 to 1.0)
        """
        if not content or len(content.strip()) == 0:
            return 0.0
        content_lower = content.lower()
        quality_score = 0.0
        # Length appropriateness (0.25 weight)
        content_length = len(content)
        if 100 <= content_length <= 2000:
            length_score = 1.0
        elif content_length < 100:
            length_score = content_length / 100.0
        else:  # Very long content
            length_score = max(0.3, 2000.0 / content_length)
        quality_score += length_score * 0.25
        # Structure indicators (0.25 weight)
        structure_indicators = [
            '\n\n' in content,  # Paragraphs
            '.' in content,  # Sentences
            any(word in content_lower for word in ['analysis', 'research', 'conclusion', 'summary']),
            content.count('.') >= 3,  # Multiple sentences
        ]
        structure_score = sum(structure_indicators) / len(structure_indicators)
        quality_score += structure_score * 0.25
        # Content depth indicators (0.25 weight)
        depth_indicators = [
            any(word in content_lower for word in ['because', 'therefore', 'however', 'moreover', 'furthermore']),
            any(word in content_lower for word in ['data', 'evidence', 'study', 'research', 'analysis']),
            any(word in content_lower for word in ['consider', 'suggest', 'indicate', 'demonstrate']),
            len(content.split()) > 50,  # Substantial word count
        ]
        depth_score = sum(depth_indicators) / len(depth_indicators)
        quality_score += depth_score * 0.25
        # Specificity indicators (0.25 weight)
        specificity_indicators = [
            any(char.isdigit() for char in content),  # Contains numbers/data
            content.count(',') > 2,  # Complex sentences with details
            any(word in content_lower for word in ['specific', 'particular', 'detail', 'example']),
            '"' in content or "'" in content,  # Quotes or citations
        ]
        specificity_score = sum(specificity_indicators) / len(specificity_indicators)
        quality_score += specificity_score * 0.25
        return min(quality_score, 1.0)

    def _calculate_consistency_bonus(self) -> float:
        """
        Calculate consistency bonus based on confidence history stability.

        Returns:
            Consistency bonus (0.0 to 1.0); 0.5 when fewer than three
            confidence samples exist.
        """
        # NOTE(review): _confidence_history is maintained by the base Agent
        # (via record_confidence) — confirm against agents/agent.py
        if len(self._confidence_history) < 3:
            return 0.5  # Neutral for insufficient data
        # Calculate confidence variance (lower variance = more consistent)
        recent_confidences = self._confidence_history[-3:]
        avg_confidence = sum(recent_confidences) / len(recent_confidences)
        variance = sum((c - avg_confidence) ** 2 for c in recent_confidences) / len(recent_confidences)
        # Convert variance to consistency bonus (lower variance = higher bonus)
        consistency_bonus = max(0.0, 1.0 - (variance * 10))  # Scale variance appropriately
        return min(consistency_bonus, 1.0)

    def create_position_aware_prompt(self, task: LLMTask, current_time: float) -> tuple[str, int]:
        """
        Create system prompt that adapts to agent's helix position with token budget.

        Enhanced with prompt optimization that learns from performance metrics.

        Args:
            task: Task to process
            current_time: Current simulation time

        Returns:
            Tuple of (position-aware system prompt, token budget for this stage)
        """
        position_info = self.get_position_info(current_time)
        depth_ratio = position_info.get("depth_ratio", 0.0)
        # Determine prompt context based on agent type and position
        prompt_context = self._get_prompt_context(depth_ratio)
        # Get token allocation if budget manager is available
        token_allocation = None
        stage_token_budget = self.max_tokens  # Default fallback
        if self.token_budget_manager:
            token_allocation = self.token_budget_manager.calculate_stage_allocation(
                self.agent_id, depth_ratio, self.processing_stage + 1
            )
            stage_token_budget = token_allocation.stage_budget
        # Add shared context from other agents
        context_summary = ""
        if self.shared_context:
            context_summary = "\n\nShared Context from Other Agents:\n"
            for key, value in self.shared_context.items():
                context_summary += f"- {key}: {value}\n"
        # Generate prompt ID for optimization tracking
        prompt_id = f"{self.agent_type}_{prompt_context.value}_stage_{self.processing_stage}"
        # Check if we have an optimized prompt available
        optimized_prompt = None
        if self.prompt_optimizer:
            recommendations = self.prompt_optimizer.get_optimization_recommendations(prompt_context)
            if recommendations.get("best_prompts"):
                # best_prompts entries are (prompt_text, score) pairs
                best_prompt = recommendations["best_prompts"][0]
                if best_prompt[1] > 0.7:  # High performance threshold
                    optimized_prompt = best_prompt[0]
                    logger.debug(f"Using optimized prompt for {prompt_id} (score: {best_prompt[1]:.3f})")
        # Create base system prompt
        if optimized_prompt:
            base_prompt = optimized_prompt
        else:
            base_prompt = self.llm_client.create_agent_system_prompt(
                agent_type=self.agent_type,
                position_info=position_info,
                task_context=f"{task.context}{context_summary}"
            )
        # Add token budget guidance if available
        if token_allocation:
            budget_guidance = f"\n\nToken Budget Guidance:\n{token_allocation.style_guidance}"
            if token_allocation.compression_ratio > 0.5:
                budget_guidance += f"\nCompress previous insights by ~{token_allocation.compression_ratio:.0%} while preserving key points."
            enhanced_prompt = base_prompt + budget_guidance
        else:
            enhanced_prompt = base_prompt
        return enhanced_prompt, stage_token_budget

    def _get_prompt_context(self, depth_ratio: float) -> PromptContext:
        """
        Determine prompt context based on agent type and helix position.

        Args:
            depth_ratio: Agent's depth ratio on helix (0.0 = top, 1.0 = bottom)

        Returns:
            PromptContext enum value
        """
        if self.agent_type == "research":
            return PromptContext.RESEARCH_EARLY if depth_ratio < 0.3 else PromptContext.RESEARCH_MID
        elif self.agent_type == "analysis":
            return PromptContext.ANALYSIS_MID if depth_ratio < 0.7 else PromptContext.ANALYSIS_LATE
        elif self.agent_type == "synthesis":
            return PromptContext.SYNTHESIS_LATE
        elif self.agent_type == "critic":
            return PromptContext.GENERAL  # Critics can work at any stage
        else:
            return PromptContext.GENERAL

    def _record_prompt_metrics(self, prompt_text: str, prompt_context: PromptContext,
                               result: LLMResult) -> None:
        """
        Record prompt performance metrics for optimization learning.

        No-op when no prompt optimizer is configured.

        Args:
            prompt_text: The full system prompt used
            prompt_context: Context category for the prompt
            result: LLM result containing performance data
        """
        if not self.prompt_optimizer:
            return
        # Calculate token efficiency (confidence capped at 0.8 serves as proxy)
        tokens_used = getattr(result.llm_response, 'tokens_used', 0)
        token_efficiency = min(result.confidence, 0.8) if tokens_used > 0 else 0.0
        # Determine if truncation occurred (approximate)
        truncation_occurred = (
            tokens_used >= self.max_tokens * 0.95 or  # Used most of token budget
            result.llm_response.content.endswith("...") or  # Ends with ellipsis
            len(result.llm_response.content) < 50  # Very short response
        )
        # Create metrics
        metrics = PromptMetrics(
            output_quality=result.confidence,  # Use confidence as proxy for quality
            confidence=result.confidence,
            completion_time=result.processing_time,
            token_efficiency=token_efficiency,
            truncation_occurred=truncation_occurred,
            context=prompt_context
        )
        # Generate prompt ID for tracking
        prompt_id = f"{self.agent_type}_{prompt_context.value}_stage_{self.processing_stage}"
        # Record metrics
        optimization_result = self.prompt_optimizer.record_prompt_execution(
            prompt_id, prompt_text, metrics
        )
        if optimization_result.get("optimization_triggered"):
            logger.info(f"Prompt optimization triggered for {prompt_id}")

    async def process_task_with_llm_async(self, task: LLMTask, current_time: float,
                                          priority: RequestPriority = RequestPriority.NORMAL) -> LLMResult:
        """
        Asynchronously process task using LLM with position-aware prompting.

        NOTE(review): this method and process_task_with_llm share most of
        their logic and should be kept in sync when either changes.

        Args:
            task: Task to process
            current_time: Current simulation time
            priority: Request priority for LLM processing

        Returns:
            LLM processing result
        """
        start_time = time.perf_counter()
        # Get position-aware prompts, token budget, and temperature
        system_prompt, stage_token_budget = self.create_position_aware_prompt(task, current_time)
        temperature = self.get_adaptive_temperature(current_time)
        position_info = self.get_position_info(current_time)
        # Ensure stage budget doesn't exceed agent's max_tokens
        effective_token_budget = min(stage_token_budget, self.max_tokens)
        # Process with LLM using coordinated token budget (ASYNC)
        # Use multi-server client pool if available, otherwise use regular client
        if isinstance(self.llm_client, LMStudioClientPool):
            llm_response = await self.llm_client.complete_for_agent_type(
                agent_type=self.agent_type,
                agent_id=self.agent_id,
                system_prompt=system_prompt,
                user_prompt=task.description,
                temperature=temperature,
                max_tokens=effective_token_budget,
                priority=priority
            )
        else:
            llm_response = await self.llm_client.complete_async(
                agent_id=self.agent_id,
                system_prompt=system_prompt,
                user_prompt=task.description,
                temperature=temperature,
                max_tokens=effective_token_budget,
                priority=priority
            )
        end_time = time.perf_counter()
        processing_time = end_time - start_time
        # Increment processing stage
        self.processing_stage += 1
        # Calculate confidence based on position and content
        confidence = self.calculate_confidence(current_time, llm_response.content, self.processing_stage)
        # Record confidence for adaptive progression
        self.record_confidence(confidence)
        # Create result
        result = LLMResult(
            agent_id=self.agent_id,
            task_id=task.task_id,
            content=llm_response.content,
            position_info=position_info,
            llm_response=llm_response,
            processing_time=processing_time,
            timestamp=time.time(),
            confidence=confidence,
            processing_stage=self.processing_stage
        )
        # Record token usage with budget manager
        if self.token_budget_manager:
            self.token_budget_manager.record_usage(self.agent_id, llm_response.tokens_used)
        # Record prompt metrics for optimization
        prompt_context = self._get_prompt_context(position_info.get("depth_ratio", 0.0))
        self._record_prompt_metrics(system_prompt, prompt_context, result)
        # Update statistics
        self.processing_results.append(result)
        self.total_tokens_used += llm_response.tokens_used
        self.total_processing_time += processing_time
        logger.info(f"Agent {self.agent_id} processed task {task.task_id} "
                    f"at depth {position_info.get('depth_ratio', 0):.2f} "
                    f"in {processing_time:.2f}s (async)")
        return result

    def process_task_with_llm(self, task: LLMTask, current_time: float) -> LLMResult:
        """
        Process task using LLM with position-aware prompting (sync wrapper).

        Args:
            task: Task to process
            current_time: Current simulation time

        Returns:
            LLM processing result

        Raises:
            RuntimeError: When a client pool is in use and no server is
                available for this agent type.
        """
        start_time = time.perf_counter()
        # Get position-aware prompts, token budget, and temperature
        system_prompt, stage_token_budget = self.create_position_aware_prompt(task, current_time)
        temperature = self.get_adaptive_temperature(current_time)
        position_info = self.get_position_info(current_time)
        # Ensure stage budget doesn't exceed agent's max_tokens
        effective_token_budget = min(stage_token_budget, self.max_tokens)
        # Process with LLM using coordinated token budget (SYNC)
        # Note: Multi-server client pool requires async, so fall back to first available server
        if isinstance(self.llm_client, LMStudioClientPool):
            # For sync calls with pool, use the first available client
            server_name = self.llm_client.get_server_for_agent_type(self.agent_type)
            if server_name and server_name in self.llm_client.clients:
                client = self.llm_client.clients[server_name]
                llm_response = client.complete(
                    agent_id=self.agent_id,
                    system_prompt=system_prompt,
                    user_prompt=task.description,
                    temperature=temperature,
                    max_tokens=effective_token_budget
                )
            else:
                raise RuntimeError(f"No available server for agent type: {self.agent_type}")
        else:
            llm_response = self.llm_client.complete(
                agent_id=self.agent_id,
                system_prompt=system_prompt,
                user_prompt=task.description,
                temperature=temperature,
                max_tokens=effective_token_budget
            )
        end_time = time.perf_counter()
        processing_time = end_time - start_time
        # Increment processing stage
        self.processing_stage += 1
        # Calculate confidence based on position and content
        confidence = self.calculate_confidence(current_time, llm_response.content, self.processing_stage)
        # Record confidence for adaptive progression
        self.record_confidence(confidence)
        # Create result
        result = LLMResult(
            agent_id=self.agent_id,
            task_id=task.task_id,
            content=llm_response.content,
            position_info=position_info,
            llm_response=llm_response,
            processing_time=processing_time,
            timestamp=time.time(),
            confidence=confidence,
            processing_stage=self.processing_stage
        )
        # Record token usage with budget manager
        if self.token_budget_manager:
            self.token_budget_manager.record_usage(self.agent_id, llm_response.tokens_used)
        # Record prompt metrics for optimization
        prompt_context = self._get_prompt_context(position_info.get("depth_ratio", 0.0))
        self._record_prompt_metrics(system_prompt, prompt_context, result)
        # Update statistics
        self.processing_results.append(result)
        self.total_tokens_used += llm_response.tokens_used
        self.total_processing_time += processing_time
        logger.info(f"Agent {self.agent_id} processed task {task.task_id} "
                    f"at depth {position_info.get('depth_ratio', 0):.2f} "
                    f"in {processing_time:.2f}s")
        return result

    # Legacy method alias for backward compatibility
    async def process_task_async(self, task: LLMTask, current_time: float) -> LLMResult:
        """
        Asynchronously process task using LLM (legacy method).

        Delegates to process_task_with_llm_async with NORMAL priority.

        Args:
            task: Task to process
            current_time: Current simulation time

        Returns:
            LLM processing result
        """
        return await self.process_task_with_llm_async(task, current_time, RequestPriority.NORMAL)

    def share_result_to_central(self, result: LLMResult) -> Message:
        """
        Create message to share result with central post.

        Args:
            result: Processing result to share

        Returns:
            Message for central post communication (STATUS_UPDATE type)
        """
        # Handle tokens_used for chunked vs non-chunked results
        tokens_used = 0
        if result.is_chunked and result.chunk_results:
            # Sum tokens from all chunks
            tokens_used = sum(chunk.metadata.get("tokens_used", 0) for chunk in result.chunk_results)
        elif result.llm_response:
            tokens_used = result.llm_response.tokens_used
        content_data = {
            "type": "AGENT_RESULT",
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "task_id": result.task_id,
            "content": result.get_content_summary(),  # Use summary for chunked content
            "full_content_available": result.full_content_available,
            "position_info": result.position_info,
            "tokens_used": tokens_used,
            "processing_time": result.processing_time,
            "confidence": result.confidence,
            "processing_stage": result.processing_stage,
            "summary": self._create_result_summary(result)
        }
        # Add chunking metadata if applicable
        if result.is_chunked:
            content_data["chunking_metadata"] = result.get_chunking_metadata()
            content_data["chunking_strategy"] = result.chunking_strategy
            # For streaming synthesis, make chunks available to synthesis agents
            if result.chunking_strategy == "streaming" and result.chunk_results:
                content_data["streaming_chunks"] = [
                    {
                        "chunk_index": chunk.chunk_index,
                        "aspect": chunk.metadata.get("aspect", f"Section {chunk.chunk_index + 1}"),
                        "content": chunk.content_chunk,
                        "is_final": chunk.is_final,
                        "timestamp": chunk.timestamp
                    }
                    for chunk in result.chunk_results
                ]
        return Message(
            sender_id=self.agent_id,
            message_type=MessageType.STATUS_UPDATE,
            content=content_data,
            timestamp=result.timestamp
        )

    def _create_result_summary(self, result: LLMResult) -> str:
        """Create concise one-line summary of a processing result."""
        content_preview = result.content[:100] + "..." if len(result.content) > 100 else result.content
        depth = result.position_info.get("depth_ratio", 0.0)
        return f"[{self.agent_type.upper()} @ depth {depth:.2f}] {content_preview}"

    def receive_shared_context(self, message: Dict[str, Any]) -> None:
        """
        Receive and store shared context from other agents.

        Only AGENT_RESULT messages contribute to shared_context; all
        messages are appended to received_messages.

        Args:
            message: Shared context message
        """
        self.received_messages.append(message)
        # Extract relevant context
        if message.get("type") == "AGENT_RESULT":
            key = f"{message.get('agent_type', 'unknown')}_{message.get('agent_id', '')}"
            self.shared_context[key] = message.get("summary", "")

    def influence_agent_behavior(self, other_agent: "LLMAgent", influence_type: str, strength: float) -> None:
        """
        Influence another agent's behavior based on collaboration.

        Args:
            other_agent: Agent to influence
            influence_type: Type of influence ('accelerate', 'slow', 'pause', 'redirect')
            strength: Influence strength (0.0 to 1.0)
        """
        if strength <= 0.0 or other_agent.agent_id == self.agent_id:
            return  # No influence or self-influence
        # Record the influence relationship
        if other_agent.agent_id not in self.influence_strength:
            self.influence_strength[other_agent.agent_id] = 0.0
        self.influence_strength[other_agent.agent_id] += strength * 0.1  # Cumulative influence
        if self.agent_id not in other_agent.influenced_by:
            other_agent.influenced_by.append(self.agent_id)
        # Apply influence based on type and agent compatibility
        compatibility = self._calculate_agent_compatibility(other_agent)
        effective_strength = strength * compatibility
        # NOTE(review): velocity / set_velocity_multiplier / pause_for_duration
        # are base Agent APIs — confirm semantics against agents/agent.py
        if influence_type == "accelerate" and effective_strength > 0.3:
            # Speed up the other agent if they're compatible
            current_velocity = other_agent.velocity
            other_agent.set_velocity_multiplier(min(current_velocity * 1.2, 2.0))
        elif influence_type == "slow" and effective_strength > 0.4:
            # Slow down if there's strong incompatibility
            current_velocity = other_agent.velocity
            other_agent.set_velocity_multiplier(max(current_velocity * 0.8, 0.3))
        elif influence_type == "pause" and effective_strength > 0.6:
            # Pause for consideration of conflicting approaches
            other_agent.pause_for_duration(0.1 * effective_strength, 0.0)  # Brief pause
        # Record collaboration
        self.collaboration_history.append({
            "timestamp": time.time(),
            "other_agent": other_agent.agent_id,
            "influence_type": influence_type,
            "strength": strength,
            "effective_strength": effective_strength,
            "compatibility": compatibility
        })

    def _calculate_agent_compatibility(self, other_agent: "LLMAgent") -> float:
        """
        Calculate compatibility between this agent and another.

        Combines a type-pair compatibility table with similarity of
        recent confidence trends.

        Args:
            other_agent: Other agent to assess compatibility with

        Returns:
            Compatibility score (0.0 to 1.0)
        """
        # Type compatibility matrix (unordered pairs; reverse looked up below)
        type_compatibility = {
            ("research", "research"): 0.8,  # Research agents collaborate well
            ("research", "analysis"): 0.9,  # Research feeds analysis
            ("research", "synthesis"): 0.7,  # Research provides raw material
            ("research", "critic"): 0.6,  # Some tension but productive
            ("analysis", "analysis"): 0.7,  # Analysis agents can complement
            ("analysis", "synthesis"): 0.9,  # Analysis feeds synthesis
            ("analysis", "critic"): 0.8,  # Analysis benefits from critique
            ("synthesis", "synthesis"): 0.5,  # May compete for final output
            ("synthesis", "critic"): 0.8,  # Synthesis benefits from review
            ("critic", "critic"): 0.6,  # Critics can disagree
        }
        # Get base compatibility from types
        type_pair = (self.agent_type, other_agent.agent_type)
        reverse_type_pair = (other_agent.agent_type, self.agent_type)
        base_compatibility = type_compatibility.get(
            type_pair, type_compatibility.get(reverse_type_pair, 0.5)
        )
        # Modify based on confidence histories
        if (len(self._confidence_history) > 2 and
                len(other_agent._confidence_history) > 2):
            my_trend = self._confidence_history[-1] - self._confidence_history[-2]
            their_trend = other_agent._confidence_history[-1] - other_agent._confidence_history[-2]
            # Agents with similar confidence trends are more compatible
            trend_similarity = 1.0 - abs(my_trend - their_trend)
            base_compatibility = (base_compatibility + trend_similarity) / 2
        return max(0.0, min(base_compatibility, 1.0))

    def assess_collaboration_opportunities(self, available_agents: List["LLMAgent"],
                                           current_time: float) -> List[Dict[str, Any]]:
        """
        Assess opportunities for collaboration with other agents.

        Args:
            available_agents: List of other agents available for collaboration
            current_time: Current simulation time

        Returns:
            List of collaboration opportunities with recommendations,
            sorted best-first.
        """
        opportunities = []
        for other_agent in available_agents:
            if other_agent.agent_id == self.agent_id or other_agent.state != AgentState.ACTIVE:
                continue
            compatibility = self._calculate_agent_compatibility(other_agent)
            # Skip if compatibility is too low
            if compatibility < 0.3:
                continue
            # Assess potential collaboration based on current states
            opportunity = {
                "agent_id": other_agent.agent_id,
                "agent_type": other_agent.agent_type,
                "compatibility": compatibility,
                "recommended_influence": self._recommend_influence_type(other_agent),
                "confidence": other_agent._confidence_history[-1] if other_agent._confidence_history else 0.5,
                "distance": abs(self._progress - other_agent._progress)
            }
            opportunities.append(opportunity)
        # Sort by potential value (compatibility * confidence, adjusted for distance)
        opportunities.sort(key=lambda x: x["compatibility"] * x["confidence"] * (1.0 - x["distance"] * 0.5), reverse=True)
        return opportunities

    def _recommend_influence_type(self, other_agent: "LLMAgent") -> str:
        """
        Recommend type of influence to apply to another agent.

        Args:
            other_agent: Agent to recommend influence for

        Returns:
            Recommended influence type ('accelerate', 'slow', or 'pause')
        """
        if not other_agent._confidence_history:
            return "accelerate"  # Default to acceleration for new agents
        other_confidence = other_agent._confidence_history[-1]
        my_confidence = self._confidence_history[-1] if self._confidence_history else 0.5
        # If other agent has low confidence and I have high confidence, accelerate them
        if other_confidence < 0.5 and my_confidence > 0.7:
            return "accelerate"
        # If other agent has much higher confidence, slow down to learn from them
        elif other_confidence > my_confidence + 0.3:
            return "slow"
        # If confidence gap is large and we're incompatible, suggest pause
        elif abs(other_confidence - my_confidence) > 0.4:
            return "pause"
        # Default to acceleration for collaborative growth
        return "accelerate"

    def get_agent_stats(self) -> Dict[str, Any]:
        """
        Get agent performance statistics including emergent behavior metrics.

        Returns:
            Dictionary with performance metrics; includes a "token_budget"
            key only when a budget manager is configured and reports status.
        """
        stats = {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "state": self.state.value,
            "spawn_time": self.spawn_time,
            "progress": self._progress,
            "total_tasks_processed": len(self.processing_results),
            "total_tokens_used": self.total_tokens_used,
            "total_processing_time": self.total_processing_time,
            "average_processing_time": (self.total_processing_time / len(self.processing_results)
                                        if self.processing_results else 0.0),
            "messages_received": len(self.received_messages),
            "shared_context_items": len(self.shared_context),
            # Emergent behavior metrics
            "influenced_by_count": len(self.influenced_by),
            "influences_given": len(self.influence_strength),
            "total_influence_received": sum(self.influence_strength.values()),
            "collaboration_count": len(self.collaboration_history),
            "velocity": self.velocity,
            "confidence_history": self._confidence_history.copy(),
            "progression_info": self.get_progression_info()
        }
        # Add token budget information if available
        if self.token_budget_manager:
            budget_status = self.token_budget_manager.get_agent_status(self.agent_id)
            if budget_status:
                stats["token_budget"] = budget_status
        return stats
def create_llm_agents(helix: HelixGeometry, llm_client: LMStudioClient,
                      agent_configs: List[Dict[str, Any]]) -> List[LLMAgent]:
    """
    Create multiple LLM agents with specified configurations.

    Args:
        helix: Helix geometry for agent paths
        llm_client: LM Studio client for LLM inference
        agent_configs: List of agent configuration dictionaries; each needs
            "agent_id" and "spawn_time", and may supply "agent_type",
            "temperature_range", and "max_tokens"

    Returns:
        List of configured LLM agents
    """
    agents = []
    for config in agent_configs:
        agent = LLMAgent(
            agent_id=config["agent_id"],
            spawn_time=config["spawn_time"],
            helix=helix,
            llm_client=llm_client,
            agent_type=config.get("agent_type", "general"),
            # BUG FIX: previously defaulted to (0.1, 0.9), which silently
            # overrode the type-specific temperature defaults LLMAgent.__init__
            # selects when temperature_range is None. Pass None through so
            # unconfigured agents get their type-appropriate range.
            temperature_range=config.get("temperature_range"),
            # Generalization: allow configs to set a per-agent token cap;
            # None falls back to the type-specific default.
            max_tokens=config.get("max_tokens")
        )
        agents.append(agent)
    return agents
def create_specialized_agent_configs(num_research: int = 3, num_analysis: int = 2,
                                     num_synthesis: int = 1, random_seed: int = 42069) -> List[Dict[str, Any]]:
    """
    Create agent configurations for typical Felix multi-agent task.

    Research agents spawn early (0.0-0.4), analysis agents mid (0.3-0.7),
    and synthesis agents late (0.6-1.0); spawn times are drawn from a
    seeded RNG so the result is deterministic for a given seed.

    Args:
        num_research: Number of research agents (spawn early)
        num_analysis: Number of analysis agents (spawn mid)
        num_synthesis: Number of synthesis agents (spawn late)
        random_seed: Random seed for spawn timing

    Returns:
        List of agent configuration dictionaries
    """
    import random
    random.seed(random_seed)
    # (agent_type, count, spawn window, temperature range) per specialization
    group_specs = [
        ("research", num_research, (0.0, 0.4), (0.3, 0.9)),
        ("analysis", num_analysis, (0.3, 0.7), (0.2, 0.7)),
        ("synthesis", num_synthesis, (0.6, 1.0), (0.1, 0.5)),
    ]
    configs = []
    next_id = 0
    for agent_type, count, (spawn_lo, spawn_hi), temp_range in group_specs:
        for _ in range(count):
            configs.append({
                "agent_id": f"{agent_type}_{next_id:03d}",
                "spawn_time": random.uniform(spawn_lo, spawn_hi),
                "agent_type": agent_type,
                "temperature_range": temp_range,
            })
            next_id += 1
    return configs