#!/usr/bin/env python3 """ Performance Benchmark: Felix vs Linear Multi-Agent Systems. This benchmark compares the Felix Framework's geometric orchestration against traditional linear multi-agent approaches for LLM-powered tasks. The comparison helps validate whether helix-based coordination provides measurable advantages over sequential processing for real-world tasks. Usage: python examples/benchmark_comparison.py --task "research quantum computing" python examples/benchmark_comparison.py --task-file tasks.txt --runs 5 Requirements: - LM Studio running with a model loaded - openai Python package installed """ import sys import time import json import asyncio import argparse import statistics from typing import List, Dict, Any, Optional from pathlib import Path from dataclasses import dataclass, asdict # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from core.helix_geometry import HelixGeometry from llm.lm_studio_client import LMStudioClient, LMStudioConnectionError from agents.llm_agent import LLMTask from agents.specialized_agents import create_specialized_team from communication.central_post import CentralPost from communication.spoke import SpokeManager @dataclass class BenchmarkResult: """Results from a single benchmark run.""" approach: str task_description: str run_id: int total_time: float total_tokens: int final_output: str output_quality_score: float agent_count: int communication_messages: int memory_usage_estimate: float success: bool error_message: Optional[str] = None @dataclass class BenchmarkSummary: """Summary statistics across multiple runs.""" approach: str task_description: str run_count: int avg_total_time: float avg_tokens: float avg_quality_score: float time_std_dev: float tokens_std_dev: float quality_std_dev: float success_rate: float best_output: str worst_output: str class LinearMultiAgentSystem: """ Traditional linear multi-agent system for comparison. Processes tasks sequentially through a pipeline of agents, representing the traditional approach to multi-agent coordination. """ def __init__(self, llm_client: LMStudioClient): """Initialize linear system.""" self.llm_client = llm_client self.central_post = CentralPost(max_agents=10, enable_metrics=True) def process_task_linear(self, task: LLMTask) -> Dict[str, Any]: """ Process task through linear pipeline. Args: task: Task to process Returns: Processing results """ start_time = time.perf_counter() results = [] # Stage 1: Research (single agent) try: research_response = self.llm_client.complete( agent_id="linear_research", system_prompt="You are a research agent. Gather comprehensive information about the given topic.", user_prompt=task.description, temperature=0.7 ) results.append(("research", research_response)) except Exception as e: return {"success": False, "error": f"Research stage failed: {e}"} # Stage 2: Analysis (single agent, uses research results) try: analysis_prompt = f"Analyze the following research findings and organize them:\n\n{research_response.content}" analysis_response = self.llm_client.complete( agent_id="linear_analysis", system_prompt="You are an analysis agent. Process and organize information from research.", user_prompt=analysis_prompt, temperature=0.5 ) results.append(("analysis", analysis_response)) except Exception as e: return {"success": False, "error": f"Analysis stage failed: {e}"} # Stage 3: Synthesis (single agent, uses all previous results) try: synthesis_prompt = f"""Create a final comprehensive response based on: Research: {research_response.content} Analysis: {analysis_response.content} Original task: {task.description}""" synthesis_response = self.llm_client.complete( agent_id="linear_synthesis", system_prompt="You are a synthesis agent. Create the final comprehensive output.", user_prompt=synthesis_prompt, temperature=0.3 ) results.append(("synthesis", synthesis_response)) except Exception as e: return {"success": False, "error": f"Synthesis stage failed: {e}"} end_time = time.perf_counter() # Calculate metrics total_tokens = sum(r[1].tokens_used for r in results) total_time = end_time - start_time final_output = synthesis_response.content return { "success": True, "total_time": total_time, "total_tokens": total_tokens, "final_output": final_output, "agent_count": 3, # Linear: 1 per stage "communication_messages": 2, # Research->Analysis, Analysis->Synthesis "memory_usage_estimate": 100.0, # Simple sequential memory "stage_results": results } class FelixMultiAgentSystem: """ Felix geometric orchestration system. Processes tasks using helix-based agent coordination with natural convergence and spoke-based communication. """ def __init__(self, llm_client: LMStudioClient): """Initialize Felix system.""" self.llm_client = llm_client self.helix = HelixGeometry( top_radius=33.0, bottom_radius=0.001, height=33.0, turns=33 ) self.central_post = CentralPost(max_agents=20, enable_metrics=True) self.spoke_manager = SpokeManager(self.central_post) def process_task_felix(self, task: LLMTask) -> Dict[str, Any]: """ Process task using Felix geometric orchestration. Args: task: Task to process Returns: Processing results """ start_time = time.perf_counter() # Create specialized team agents = create_specialized_team( helix=self.helix, llm_client=self.llm_client, task_complexity="medium" ) # Register agents for agent in agents: self.spoke_manager.register_agent(agent) # Run geometric orchestration simulation current_time = 0.0 time_step = 0.05 simulation_time = 1.0 final_output = None agent_results = [] while current_time <= simulation_time and not final_output: # Process agents based on spawn timing for agent in agents: if (agent.can_spawn(current_time) and agent.state.value == "waiting"): try: # Spawn and process agent.spawn(current_time, task) result = agent.process_task_with_llm(task, current_time) # Share results with central post via spoke communication message = agent.share_result_to_central(result) self.spoke_manager.send_message(agent.agent_id, message) # Central post will handle distribution through spoke system agent_results.append(result) # Check for final synthesis if agent.agent_type == "synthesis": if hasattr(agent, 'finalize_output'): final_output_data = agent.finalize_output(result) final_output = final_output_data["content"] else: final_output = result.content except Exception as e: return {"success": False, "error": f"Agent {agent.agent_id} failed: {e}"} # Update positions and process communication for agent in agents: if agent.state.value == "active": agent.update_position(current_time) self.spoke_manager.process_all_messages() current_time += time_step end_time = time.perf_counter() # Calculate metrics total_tokens = sum(r.llm_response.tokens_used for r in agent_results) total_time = end_time - start_time if not final_output and agent_results: # Fallback: use last synthesis result synthesis_results = [r for r in agent_results if "synthesis" in r.agent_id] final_output = synthesis_results[-1].content if synthesis_results else agent_results[-1].content return { "success": True, "total_time": total_time, "total_tokens": total_tokens, "final_output": final_output or "No final output generated", "agent_count": len(agents), "communication_messages": self.central_post.total_messages_processed, "memory_usage_estimate": len(agents) * 20.0, # Geometric memory overhead "agent_results": agent_results } class BenchmarkRunner: """ Main benchmark runner that coordinates comparisons. Runs both systems on the same tasks and collects performance metrics for statistical comparison. """ def __init__(self, lm_studio_url: str = "http://localhost:1234/v1"): """Initialize benchmark runner.""" self.llm_client = LMStudioClient(base_url=lm_studio_url) self.linear_system = LinearMultiAgentSystem(self.llm_client) self.felix_system = FelixMultiAgentSystem(self.llm_client) print("Benchmark Runner initialized") def test_connection(self) -> bool: """Test LM Studio connection.""" try: if self.llm_client.test_connection(): print("✓ LM Studio connection successful") return True else: print("✗ LM Studio connection failed") return False except LMStudioConnectionError as e: print(f"✗ LM Studio connection error: {e}") return False def calculate_quality_score(self, output: str, task_description: str) -> float: """ Calculate simple quality score for output. Args: output: Generated output task_description: Original task Returns: Quality score (0.0 to 1.0) """ # Simple heuristics for quality assessment length_score = min(len(output) / 1000, 1.0) # Longer is better up to 1000 chars # Check for task-relevant content task_words = task_description.lower().split() output_lower = output.lower() relevance_score = sum(1 for word in task_words if word in output_lower) / len(task_words) # Structure score (check for organized content) structure_indicators = ["introduction", "conclusion", "summary", "analysis", "research"] structure_score = sum(0.1 for indicator in structure_indicators if indicator in output_lower) structure_score = min(structure_score, 0.5) return (length_score + relevance_score + structure_score) / 2.5 def run_single_benchmark(self, task_description: str, approach: str, run_id: int) -> BenchmarkResult: """ Run single benchmark for one approach. Args: task_description: Task to perform approach: "linear" or "felix" run_id: Run identifier Returns: Benchmark result """ print(f" Running {approach} approach (run {run_id})...") task = LLMTask( task_id=f"benchmark_{run_id}", description=task_description, context="This is a benchmark comparison task." ) try: if approach == "linear": results = self.linear_system.process_task_linear(task) else: # felix results = self.felix_system.process_task_felix(task) if not results["success"]: return BenchmarkResult( approach=approach, task_description=task_description, run_id=run_id, total_time=0.0, total_tokens=0, final_output="", output_quality_score=0.0, agent_count=0, communication_messages=0, memory_usage_estimate=0.0, success=False, error_message=results.get("error", "Unknown error") ) quality_score = self.calculate_quality_score(results["final_output"], task_description) return BenchmarkResult( approach=approach, task_description=task_description, run_id=run_id, total_time=results["total_time"], total_tokens=results["total_tokens"], final_output=results["final_output"], output_quality_score=quality_score, agent_count=results["agent_count"], communication_messages=results["communication_messages"], memory_usage_estimate=results["memory_usage_estimate"], success=True ) except Exception as e: return BenchmarkResult( approach=approach, task_description=task_description, run_id=run_id, total_time=0.0, total_tokens=0, final_output="", output_quality_score=0.0, agent_count=0, communication_messages=0, memory_usage_estimate=0.0, success=False, error_message=str(e) ) def run_benchmark_comparison(self, task_description: str, runs: int = 3) -> Dict[str, Any]: """ Run complete benchmark comparison. Args: task_description: Task to benchmark runs: Number of runs per approach Returns: Comparison results """ print(f"\n{'='*60}") print(f"BENCHMARK COMPARISON") print(f"Task: {task_description}") print(f"Runs per approach: {runs}") print(f"{'='*60}") all_results = [] # Run linear approach print(f"\nRunning Linear Pipeline Approach...") for run_id in range(runs): result = self.run_single_benchmark(task_description, "linear", run_id) all_results.append(result) if result.success: print(f" Run {run_id}: {result.total_time:.2f}s, {result.total_tokens} tokens, quality={result.output_quality_score:.2f}") else: print(f" Run {run_id}: FAILED - {result.error_message}") # Run Felix approach print(f"\nRunning Felix Geometric Orchestration...") for run_id in range(runs): result = self.run_single_benchmark(task_description, "felix", run_id) all_results.append(result) if result.success: print(f" Run {run_id}: {result.total_time:.2f}s, {result.total_tokens} tokens, quality={result.output_quality_score:.2f}") else: print(f" Run {run_id}: FAILED - {result.error_message}") # Analyze results return self.analyze_benchmark_results(all_results, task_description) def analyze_benchmark_results(self, results: List[BenchmarkResult], task_description: str) -> Dict[str, Any]: """Analyze and summarize benchmark results.""" # Separate by approach linear_results = [r for r in results if r.approach == "linear" and r.success] felix_results = [r for r in results if r.approach == "felix" and r.success] # Calculate summaries summaries = {} for approach, approach_results in [("linear", linear_results), ("felix", felix_results)]: if approach_results: times = [r.total_time for r in approach_results] tokens = [r.total_tokens for r in approach_results] qualities = [r.output_quality_score for r in approach_results] # Find best and worst outputs best_result = max(approach_results, key=lambda r: r.output_quality_score) worst_result = min(approach_results, key=lambda r: r.output_quality_score) summary = BenchmarkSummary( approach=approach, task_description=task_description, run_count=len(approach_results), avg_total_time=statistics.mean(times), avg_tokens=statistics.mean(tokens), avg_quality_score=statistics.mean(qualities), time_std_dev=statistics.stdev(times) if len(times) > 1 else 0.0, tokens_std_dev=statistics.stdev(tokens) if len(tokens) > 1 else 0.0, quality_std_dev=statistics.stdev(qualities) if len(qualities) > 1 else 0.0, success_rate=len(approach_results) / sum(1 for r in results if r.approach == approach), best_output=best_result.final_output, worst_output=worst_result.final_output ) summaries[approach] = summary return { "task_description": task_description, "raw_results": results, "summaries": summaries, "comparison": self._compare_approaches(summaries) if len(summaries) == 2 else None } def _compare_approaches(self, summaries: Dict[str, BenchmarkSummary]) -> Dict[str, Any]: """Compare the two approaches statistically.""" linear = summaries["linear"] felix = summaries["felix"] comparison = { "time_improvement": ((linear.avg_total_time - felix.avg_total_time) / linear.avg_total_time) * 100, "token_efficiency": ((linear.avg_tokens - felix.avg_tokens) / linear.avg_tokens) * 100, "quality_improvement": ((felix.avg_quality_score - linear.avg_quality_score) / linear.avg_quality_score) * 100, "winner_by_time": "felix" if felix.avg_total_time < linear.avg_total_time else "linear", "winner_by_quality": "felix" if felix.avg_quality_score > linear.avg_quality_score else "linear", "winner_by_tokens": "felix" if felix.avg_tokens < linear.avg_tokens else "linear" } return comparison def display_results(self, analysis: Dict[str, Any]) -> None: """Display benchmark results.""" print(f"\n{'='*60}") print(f"BENCHMARK RESULTS") print(f"{'='*60}") summaries = analysis["summaries"] print(f"\nTask: {analysis['task_description']}") print(f"\nPerformance Summary:") for approach, summary in summaries.items(): print(f"\n{approach.upper()} APPROACH:") print(f" Success Rate: {summary.success_rate:.1%}") print(f" Avg Time: {summary.avg_total_time:.2f}s (±{summary.time_std_dev:.2f})") print(f" Avg Tokens: {summary.avg_tokens:.0f} (±{summary.tokens_std_dev:.0f})") print(f" Avg Quality: {summary.avg_quality_score:.3f} (±{summary.quality_std_dev:.3f})") if analysis["comparison"]: comp = analysis["comparison"] print(f"\nCOMPARISON:") print(f" Time: Felix is {comp['time_improvement']:+.1f}% vs Linear") print(f" Tokens: Felix uses {comp['token_efficiency']:+.1f}% tokens vs Linear") print(f" Quality: Felix is {comp['quality_improvement']:+.1f}% quality vs Linear") print(f" Best Time: {comp['winner_by_time']}") print(f" Best Quality: {comp['winner_by_quality']}") print(f" Best Token Efficiency: {comp['winner_by_tokens']}") # Show best outputs for approach, summary in summaries.items(): print(f"\n{'='*60}") print(f"BEST OUTPUT - {approach.upper()}") print(f"{'='*60}") print(summary.best_output[:500] + ("..." if len(summary.best_output) > 500 else "")) def save_results(self, analysis: Dict[str, Any], output_file: str) -> None: """Save benchmark results to JSON file.""" # Convert dataclasses to dicts for JSON serialization serializable_analysis = { "task_description": analysis["task_description"], "raw_results": [asdict(r) for r in analysis["raw_results"]], "summaries": {k: asdict(v) for k, v in analysis["summaries"].items()}, "comparison": analysis["comparison"] } with open(output_file, 'w') as f: json.dump(serializable_analysis, f, indent=2) print(f"\nResults saved to: {output_file}") def main(): """Main function for benchmark comparison.""" parser = argparse.ArgumentParser(description="Felix vs Linear Multi-Agent Benchmark") parser.add_argument("--task", help="Task description to benchmark") parser.add_argument("--task-file", help="File containing task descriptions (one per line)") parser.add_argument("--runs", type=int, default=3, help="Number of runs per approach") parser.add_argument("--lm-studio-url", default="http://localhost:1234/v1", help="LM Studio API URL") parser.add_argument("--output", help="Save results to JSON file") args = parser.parse_args() if not args.task and not args.task_file: parser.error("Must provide either --task or --task-file") # Create benchmark runner runner = BenchmarkRunner(lm_studio_url=args.lm_studio_url) # Test connection if not runner.test_connection(): print("\nPlease ensure LM Studio is running with a model loaded.") sys.exit(1) # Get tasks to benchmark tasks = [] if args.task: tasks = [args.task] elif args.task_file: try: with open(args.task_file, 'r') as f: tasks = [line.strip() for line in f if line.strip()] except Exception as e: print(f"Error reading task file: {e}") sys.exit(1) # Run benchmarks all_analyses = [] for i, task in enumerate(tasks): print(f"\n{'#'*60}") print(f"BENCHMARK {i+1}/{len(tasks)}") print(f"{'#'*60}") analysis = runner.run_benchmark_comparison(task, runs=args.runs) all_analyses.append(analysis) runner.display_results(analysis) if args.output: output_file = args.output if len(tasks) == 1 else f"{args.output}_{i+1}.json" runner.save_results(analysis, output_file) print(f"\nBenchmark comparison completed!") if __name__ == "__main__": main()