# felix-framework/examples/colony_design.py
# jkbennitt
# Clean hf-space branch and prepare for HuggingFace Spaces deployment
# fb867c3
#!/usr/bin/env python3
"""
Colony Design Demo using Felix Framework Geometric Orchestration.

This demo showcases how the Felix Framework's helix-based multi-agent system
can be used to collaboratively design a self-sustaining human colony on a hypothetical
exoplanet, Kepler-999z, addressing ecological, scientific, energy, governance, and
philosophical challenges. It demonstrates the geometric orchestration approach as an
alternative to traditional multi-agent systems.

The demo creates a team of specialized LLM agents that work together to produce a
comprehensive colony design plan, with agents spawning at different times and converging
naturally through the helix geometry toward a final synthesis.

Usage:
    python examples/colony_design.py

Requirements:
    - LM Studio running with a model loaded (http://localhost:1234)
    - openai Python package installed
"""
import sys
import time
import asyncio
import argparse
from typing import List, Dict, Any, Optional
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from core.helix_geometry import HelixGeometry
from llm.lm_studio_client import LMStudioClient, LMStudioConnectionError, RequestPriority
from llm.token_budget import TokenBudgetManager
from agents.llm_agent import LLMTask
from agents.specialized_agents import create_specialized_team
from communication.central_post import CentralPost, AgentFactory
from communication.spoke import SpokeManager
class FelixColonyDesigner:
    """
    Colony design system using Felix geometric orchestration.

    Demonstrates how helix-based agent coordination can create a comprehensive
    plan for a self-sustaining human colony on an exoplanet through natural
    convergence rather than explicit workflow management.
    """

    def __init__(self, lm_studio_url: str = "http://localhost:1234/v1",
                 random_seed: Optional[int] = None,
                 strict_mode: bool = False, max_concurrent_agents: int = 4):
        """
        Initialize the Felix colony design system.

        Args:
            lm_studio_url: LM Studio API endpoint
            random_seed: Seed for randomization (None for truly random behavior)
            strict_mode: Enable strict token budgets for lightweight models
            max_concurrent_agents: Maximum concurrent agent processing
        """
        self.random_seed = random_seed
        self.strict_mode = strict_mode
        self.max_concurrent_agents = max_concurrent_agents

        # Create helix geometry (OpenSCAD parameters)
        self.helix = HelixGeometry(
            top_radius=33.0,
            bottom_radius=0.001,
            height=33.0,
            turns=33
        )

        # Initialize LLM client
        self.llm_client = LMStudioClient(
            base_url=lm_studio_url,
            max_concurrent_requests=max_concurrent_agents
        )

        # Initialize communication system
        self.central_post = CentralPost(max_agents=20, enable_metrics=True)
        self.spoke_manager = SpokeManager(self.central_post)

        # Initialize token budget manager (smaller budgets in strict mode).
        # NOTE(review): base_budget is larger than max_budget in both modes
        # (1200 vs 800, 400 vs 150) - confirm this is what TokenBudgetManager expects.
        self.token_budget_manager = TokenBudgetManager(
            base_budget=1200 if not strict_mode else 400,
            min_budget=150 if not strict_mode else 50,
            max_budget=800 if not strict_mode else 150,
            strict_mode=strict_mode
        )

        # Initialize agent factory for dynamic agent creation
        self.agent_factory = AgentFactory(
            helix=self.helix,
            llm_client=self.llm_client,
            token_budget_manager=self.token_budget_manager,
            random_seed=random_seed
        )

        # Agent team (populated by create_colony_design_team)
        self.agents = []

        print("Felix Colony Designer initialized")
        # Separate the two radii; without a separator they print fused together.
        print(f"Helix: {self.helix.turns} turns, "
              f"{self.helix.top_radius}→{self.helix.bottom_radius} radius")
        if random_seed is not None:
            print(f"Random seed: {random_seed} (deterministic behavior)")
        else:
            print("Random seed: None (truly random behavior)")

    def test_lm_studio_connection(self) -> bool:
        """Test connection to LM Studio.

        Returns:
            True when the client reports a working connection, False when it
            reports failure or raises LMStudioConnectionError.
        """
        try:
            if self.llm_client.test_connection():
                print("✓ LM Studio connection successful")
                return True
            print("✗ LM Studio connection failed")
            return False
        except LMStudioConnectionError as e:
            print(f"✗ LM Studio connection error: {e}")
            return False

    def create_colony_design_team(self, complexity: str = "complex") -> None:
        """
        Create specialized team for colony design with randomized spawn times.

        The created agents replace ``self.agents`` and are each registered with
        the spoke manager.

        Args:
            complexity: Task complexity level (default: complex)
        """
        print(f"\nCreating {complexity} complexity colony design team...")

        # Create specialized agents with default roles, relying on task description for guidance
        self.agents = create_specialized_team(
            helix=self.helix,
            llm_client=self.llm_client,
            task_complexity=complexity,
            token_budget_manager=self.token_budget_manager,
            random_seed=self.random_seed
        )

        # Register agents with communication system
        for agent in self.agents:
            self.spoke_manager.register_agent(agent)

        print(f"Created team of {len(self.agents)} specialized agents:")
        for agent in self.agents:
            print(f" - {agent.agent_id} ({agent.agent_type}) spawns at t={agent.spawn_time:.3f}")
        print(f"Agent spawn times are {'deterministic' if self.random_seed is not None else 'randomized'}")

    async def run_colony_design_session_async(self, simulation_time: float = 1.0) -> Dict[str, Any]:
        """
        Run collaborative colony design session with true parallel processing.

        Async message processing is started on the central post before the run
        and is always shut down afterwards, even if processing raises.

        Args:
            simulation_time: Duration of simulation (0.0 to 1.0)

        Returns:
            Results from the design session: topic, participating agents,
            final output (or None) and a ``session_stats`` summary.
        """
        topic = "Design a self-sustaining human colony on Kepler-999z"
        print(f"\n{'='*60}")
        print("FELIX PARALLEL COLONY DESIGN SESSION")
        print(f"Topic: {topic}")
        print(f"Mode: {'STRICT (Lightweight)' if self.strict_mode else 'NORMAL'}")
        print(f"{'='*60}")

        # Start async message processing
        await self.central_post.start_async_processing(max_concurrent_processors=2)

        try:
            # Create the main task
            main_task = LLMTask(
                task_id="colony_design_001",
                description=(
                    "Design a self-sustaining human colony for 10,000 people on Kepler-999z, an exoplanet with "
                    "Earth-like conditions but a silicon-oxygen polymer-based ecosystem and non-linear time dilation "
                    "effects near pulsar-induced geological sites. Address: (1) sustainable ecological integration "
                    "with silicon-based lifeforms, (2) safe utilization of time dilation sites for research while "
                    "mitigating cognitive risks, (3) an energy-independent system leveraging the planet’s technetium-99 "
                    "and pulsar radiation, (4) a governance model balancing human needs and ethical considerations for "
                    "potentially sentient lifeforms, and (5) a speculative technological breakthrough grounded in 2025 "
                    "science to address one major challenge. Reflect on the philosophical implications of colonizing a "
                    "planet with non-linear time and sentient silicon life."
                ),
                context=(
                    "This is a collaborative project. Agents must contribute specialized expertise (e.g., astrobiology, "
                    "astrophysics, engineering, ethics) to create a comprehensive colony design plan. The plan must integrate "
                    "all components cohesively, ensuring human survival, scientific advancement, and ethical coexistence. "
                    "Agents should adapt their roles to address specific aspects of the task as needed."
                )
            )

            # Track session results
            results = {
                "topic": topic,
                "agents_participated": [],
                "processing_timeline": [],
                "final_output": None,
                "session_stats": {},
                "parallel_processing": True
            }

            session_start = time.perf_counter()

            # Performance targets (tight in strict mode, relaxed otherwise)
            if self.strict_mode:
                target_completion_time = 30.0  # 30 seconds max
                target_total_tokens = 2000  # 2000 tokens max
            else:
                target_completion_time = simulation_time * 60  # More relaxed
                target_total_tokens = 15000

            print("\nStarting PARALLEL geometric orchestration...")
            print(f"Target: {'<30s, <2000 tokens' if self.strict_mode else 'Normal performance'}")

            # Run parallel processing from t=0.0 in steps of 0.05
            final_result = await self._run_parallel_processing(
                main_task, 0.0, 0.05, simulation_time, results,
                target_completion_time, target_total_tokens
            )

            # Measured before cleanup so shutdown time is not counted
            session_duration = time.perf_counter() - session_start
        finally:
            # Cleanup async resources even when the run fails part-way
            await self.central_post.shutdown_async()
            if hasattr(self.llm_client, 'close_async'):
                await self.llm_client.close_async()

        # Final statistics
        total_tokens = sum(a.get("tokens_used", 0) for a in results["agents_participated"])
        results["session_stats"] = {
            "total_duration": session_duration,
            "simulation_time": simulation_time,
            "simulation_complete": final_result is not None,
            "total_tokens_used": total_tokens,
            "agents_created": len(self.agents),
            "agents_participated": len(results["agents_participated"]),
            "total_messages_processed": self.central_post.total_messages_processed,
            "llm_client_stats": self.llm_client.get_usage_stats(),
            "strict_mode": self.strict_mode,
            "performance_targets_met": {
                "time_target": session_duration < target_completion_time,
                "token_target": total_tokens < target_total_tokens
            }
        }
        return results

    def run_colony_design_session(self, simulation_time: float = 1.0) -> Dict[str, Any]:
        """
        Run collaborative colony design session.

        Synchronous wrapper that drives the async session with ``asyncio.run``.

        Args:
            simulation_time: Duration of simulation (0.0 to 1.0)

        Returns:
            Results from the design session
        """
        return asyncio.run(self.run_colony_design_session_async(simulation_time))

    def _collect_ready_agents(self, main_task: "LLMTask", current_time: float,
                              time_step: float) -> List:
        """Spawn agents that are due and return the agents to process this step."""
        ready_agents = []
        for agent in self.agents:
            if (agent.can_spawn(current_time) and
                    agent.state.value == "waiting"):
                print(f"\n[t={current_time:.2f}] 🌀 Spawning {agent.agent_id} ({agent.agent_type})")
                agent.spawn(current_time, main_task)
                ready_agents.append(agent)
            elif agent.state.value == "active":
                agent.update_position(current_time)
                # Process every few steps or first stage
                if current_time % 0.1 < time_step or agent.processing_stage == 0:
                    ready_agents.append(agent)
        return ready_agents

    async def _run_parallel_processing(self, main_task: "LLMTask", current_time: float,
                                       time_step: float, simulation_time: float,
                                       results: Dict[str, Any], target_completion_time: float,
                                       target_total_tokens: int) -> Optional[Dict[str, Any]]:
        """
        Run the core parallel processing loop.

        Advances simulation time in ``time_step`` increments, processing ready
        agents in batches, until a result is accepted, a strict-mode target is
        exceeded, or ``current_time`` passes ``simulation_time * 10``.

        Args:
            main_task: Task handed to every agent.
            current_time: Simulation time to start from.
            time_step: Simulation time increment per loop iteration.
            simulation_time: Requested simulation duration.
            results: Session results dict; ``agents_participated`` and
                ``final_output`` are updated in place.
            target_completion_time: Wall-clock limit in seconds (strict mode only).
            target_total_tokens: Token limit (strict mode only).

        Returns:
            ``results["final_output"]`` when the loop was stopped early (this is
            still None if it stopped because a strict-mode target was exceeded),
            otherwise None.
        """
        simulation_complete = False
        max_timeout = simulation_time * 10
        # Wall-clock reference for the strict-mode time target; it must cover the
        # whole run, not a single step.
        run_start = time.perf_counter()

        while current_time <= max_timeout and not simulation_complete:
            # Collect agents ready for processing
            ready_agents = self._collect_ready_agents(main_task, current_time, time_step)

            # Process agents in parallel batches
            if ready_agents:
                # Guard against a non-positive concurrency limit (range step must be >= 1)
                batch_size = max(1, min(len(ready_agents), self.max_concurrent_agents))
                for i in range(0, len(ready_agents), batch_size):
                    batch = ready_agents[i:i + batch_size]

                    # Process batch in parallel
                    print(f"[t={current_time:.2f}] 🚀 Processing {len(batch)} agents in parallel")
                    batch_results = await self._process_agent_batch_async(
                        batch, main_task, current_time
                    )

                    # A result may end the run only once a synthesis agent is
                    # active or simulation time has passed 0.9
                    synthesis_agents_ready = any(
                        agent.agent_type == "synthesis" and agent.state.value == "active"
                        for agent in self.agents
                    )
                    can_complete = synthesis_agents_ready or current_time > 0.9

                    # batch_results holds only successful results
                    for result in batch_results:
                        agent = result['agent']
                        llm_result = result['llm_result']

                        message = agent.share_result_to_central(llm_result)
                        await self.central_post.queue_message_async(message)

                        # Check for high-confidence acceptance
                        if can_complete and self.central_post.accept_high_confidence_result(message):
                            print(f" ✅ HIGH CONFIDENCE: {agent.agent_id} ({agent.agent_type}) result accepted!")
                            print(" 🎯 PARALLEL SIMULATION COMPLETE!")
                            results["final_output"] = {
                                "content": llm_result.content,
                                "agent_id": agent.agent_id,
                                "agent_type": agent.agent_type,
                                "confidence": llm_result.confidence,
                                "stage": llm_result.processing_stage,
                                "timestamp": current_time
                            }
                            # NOTE: the accepted result is not added to
                            # agents_participated because of this break.
                            simulation_complete = True
                            break

                        # Track participation
                        if llm_result.confidence >= 0.6:
                            results["agents_participated"].append({
                                "agent_id": agent.agent_id,
                                "agent_type": agent.agent_type,
                                # simulation time of this processing step
                                "spawn_time": current_time,
                                "position_info": llm_result.position_info,
                                "tokens_used": llm_result.llm_response.tokens_used,
                                "confidence": llm_result.confidence,
                                "stage": llm_result.processing_stage
                            })

                    if simulation_complete:
                        break

                    # Check performance targets in strict mode
                    if self.strict_mode:
                        current_duration = time.perf_counter() - run_start
                        current_tokens = sum(
                            a.get("tokens_used", 0) for a in results["agents_participated"]
                        )
                        if current_duration > target_completion_time or current_tokens > target_total_tokens:
                            print("\n⚠️ Performance target exceeded - completing simulation")
                            simulation_complete = True
                            break

            current_time += time_step

        return results["final_output"] if simulation_complete else None

    async def _process_agent_batch_async(self, agents: List, main_task: "LLMTask",
                                         current_time: float) -> List[Dict[str, Any]]:
        """
        Process a batch of agents in parallel.

        Args:
            agents: Agents to run concurrently.
            main_task: Task passed to each agent.
            current_time: Current simulation time.

        Returns:
            One dict per agent that succeeded, with keys ``agent``,
            ``llm_result`` and ``success``. Failures are printed and dropped.
        """
        async def process_single_agent(agent):
            try:
                priority = RequestPriority.HIGH if self.strict_mode else RequestPriority.NORMAL
                result = await agent.process_task_with_llm_async(
                    main_task, current_time, priority
                )
                pos_info = agent.get_position_info(current_time)
                depth = pos_info.get("depth_ratio", 0.0)
                print(f" ✓ {agent.agent_id} completed (depth: {depth:.2f}, "
                      f"confidence: {result.confidence:.2f}, tokens: {result.llm_response.tokens_used})")
                return {
                    "agent": agent,
                    "llm_result": result,
                    "success": True
                }
            except Exception as e:
                # Best effort: one failing agent must not abort the whole batch
                print(f" ✗ {agent.agent_id} failed: {e}")
                return {
                    "agent": agent,
                    "llm_result": None,
                    "success": False,
                    "error": str(e)
                }

        results = await asyncio.gather(
            *[process_single_agent(agent) for agent in agents],
            return_exceptions=True
        )
        # Keep only successful result dicts; returned exceptions are filtered out too
        return [r for r in results if isinstance(r, dict) and r.get("success", False)]

    def display_results(self, results: Dict[str, Any]) -> None:
        """Display session results in a readable format.

        Args:
            results: Session results as returned by the session runners.
        """
        print(f"\n{'='*60}")
        print("SESSION RESULTS")
        print(f"{'='*60}")

        stats = results["session_stats"]
        print(f"Topic: {results['topic']}")
        print(f"Duration: {stats['total_duration']:.2f} seconds")
        print(f"Simulation Time: {stats['simulation_time']:.2f} units (completed: {stats['simulation_complete']})")
        print(f"Agents: {stats['agents_participated']}/{stats['agents_created']} participated")
        print(f"Tokens: {stats['total_tokens_used']} total")
        print(f"Messages: {stats['total_messages_processed']} processed")

        if hasattr(self, 'token_budget_manager'):
            budget_status = self.token_budget_manager.get_system_status()
            print(f"Token Efficiency: {budget_status['system_efficiency']:.1%} "
                  f"({budget_status['total_used']}/{budget_status['total_allocated']} allocated)")

        print("\nAgent Participation Timeline:")
        for agent_info in results["agents_participated"]:
            print(f" {agent_info['spawn_time']:.2f}s: {agent_info['agent_id']} ({agent_info['agent_type']})")
            print(f" Depth: {agent_info['position_info'].get('depth_ratio', 0):.2f}, "
                  f"Tokens: {agent_info['tokens_used']}")

        final_output = results["final_output"]
        if final_output:
            print(f"\n{'='*60}")
            print("FINAL COLONY DESIGN PLAN")
            print(f"{'='*60}")
            print(final_output["content"])
            print(f"\n[Generated by {final_output['agent_id']} "
                  f"with confidence {final_output['confidence']:.2f} "
                  f"at stage {final_output['stage']} "
                  f"at time t={final_output['timestamp']:.2f}]")
        else:
            print("\nNo final synthesis output was generated.")

    def save_results(self, results: Dict[str, Any], output_file: Optional[str] = None) -> None:
        """Save results to file.

        Args:
            results: Session results; only ``topic``, ``agents_participated``,
                ``session_stats`` and ``final_output`` are written.
            output_file: Destination path. Defaults to
                ``colony_design_session_<unix timestamp>.json``.
        """
        import json

        if output_file is None:
            timestamp = int(time.time())
            output_file = f"colony_design_session_{timestamp}.json"

        # Build the payload first so a missing key cannot leave an empty file behind
        serializable_results = {
            "topic": results["topic"],
            "agents_participated": results["agents_participated"],
            "session_stats": results["session_stats"],
            "final_output": results["final_output"]
        }
        with open(output_file, 'w', encoding="utf-8") as f:
            json.dump(serializable_results, f, indent=2)
        print(f"\nResults saved to: {output_file}")
def main():
    """Parse the command line, run one colony design session and report it."""
    cli = argparse.ArgumentParser(description="Felix Framework Colony Design Demo")

    # (flag, add_argument keyword arguments), in the order shown by --help
    option_specs = (
        ("--complexity", {"choices": ["simple", "medium", "complex"],
                          "default": "complex",
                          "help": "Task complexity level"}),
        ("--simulation-time", {"type": float, "default": 1.0,
                               "help": "Simulation duration (0.0 to 1.0)"}),
        ("--lm-studio-url", {"default": "http://localhost:1234/v1",
                             "help": "LM Studio API URL"}),
        ("--save-output", {"help": "Save results to file"}),
        ("--random-seed", {"type": int,
                           "help": "Random seed for reproducibility (omit for truly random)"}),
        ("--strict-mode", {"action": "store_true",
                           "help": "Enable strict token budgets for lightweight models"}),
        ("--max-concurrent", {"type": int, "default": 4,
                              "help": "Maximum concurrent agents (default: 4)"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    # Build the designer from the parsed options
    colony = FelixColonyDesigner(
        lm_studio_url=opts.lm_studio_url,
        random_seed=opts.random_seed,
        strict_mode=opts.strict_mode,
        max_concurrent_agents=opts.max_concurrent,
    )

    # Bail out early when LM Studio cannot be reached
    reachable = colony.test_lm_studio_connection()
    if not reachable:
        print("\nPlease ensure LM Studio is running with a model loaded.")
        sys.exit(1)

    # Assemble the team, run the session, then show and optionally save the outcome
    colony.create_colony_design_team(complexity=opts.complexity)
    session = colony.run_colony_design_session(simulation_time=opts.simulation_time)
    colony.display_results(session)
    if opts.save_output:
        colony.save_results(session, opts.save_output)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()