Spaces:
Paused
Paused
File size: 23,968 Bytes
fb867c3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 |
#!/usr/bin/env python3
"""
Performance Benchmark: Felix vs Linear Multi-Agent Systems.
This benchmark compares the Felix Framework's geometric orchestration
against traditional linear multi-agent approaches for LLM-powered tasks.
The comparison helps validate whether helix-based coordination provides
measurable advantages over sequential processing for real-world tasks.
Usage:
python examples/benchmark_comparison.py --task "research quantum computing"
python examples/benchmark_comparison.py --task-file tasks.txt --runs 5
Requirements:
- LM Studio running with a model loaded
- openai Python package installed
"""
import sys
import time
import json
import asyncio
import argparse
import statistics
from typing import List, Dict, Any, Optional
from pathlib import Path
from dataclasses import dataclass, asdict
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from core.helix_geometry import HelixGeometry
from llm.lm_studio_client import LMStudioClient, LMStudioConnectionError
from agents.llm_agent import LLMTask
from agents.specialized_agents import create_specialized_team
from communication.central_post import CentralPost
from communication.spoke import SpokeManager
@dataclass
class BenchmarkResult:
"""Results from a single benchmark run."""
approach: str
task_description: str
run_id: int
total_time: float
total_tokens: int
final_output: str
output_quality_score: float
agent_count: int
communication_messages: int
memory_usage_estimate: float
success: bool
error_message: Optional[str] = None
@dataclass
class BenchmarkSummary:
"""Summary statistics across multiple runs."""
approach: str
task_description: str
run_count: int
avg_total_time: float
avg_tokens: float
avg_quality_score: float
time_std_dev: float
tokens_std_dev: float
quality_std_dev: float
success_rate: float
best_output: str
worst_output: str
class LinearMultiAgentSystem:
"""
Traditional linear multi-agent system for comparison.
Processes tasks sequentially through a pipeline of agents,
representing the traditional approach to multi-agent coordination.
"""
def __init__(self, llm_client: LMStudioClient):
"""Initialize linear system."""
self.llm_client = llm_client
self.central_post = CentralPost(max_agents=10, enable_metrics=True)
def process_task_linear(self, task: LLMTask) -> Dict[str, Any]:
"""
Process task through linear pipeline.
Args:
task: Task to process
Returns:
Processing results
"""
start_time = time.perf_counter()
results = []
# Stage 1: Research (single agent)
try:
research_response = self.llm_client.complete(
agent_id="linear_research",
system_prompt="You are a research agent. Gather comprehensive information about the given topic.",
user_prompt=task.description,
temperature=0.7
)
results.append(("research", research_response))
except Exception as e:
return {"success": False, "error": f"Research stage failed: {e}"}
# Stage 2: Analysis (single agent, uses research results)
try:
analysis_prompt = f"Analyze the following research findings and organize them:\n\n{research_response.content}"
analysis_response = self.llm_client.complete(
agent_id="linear_analysis",
system_prompt="You are an analysis agent. Process and organize information from research.",
user_prompt=analysis_prompt,
temperature=0.5
)
results.append(("analysis", analysis_response))
except Exception as e:
return {"success": False, "error": f"Analysis stage failed: {e}"}
# Stage 3: Synthesis (single agent, uses all previous results)
try:
synthesis_prompt = f"""Create a final comprehensive response based on:
Research: {research_response.content}
Analysis: {analysis_response.content}
Original task: {task.description}"""
synthesis_response = self.llm_client.complete(
agent_id="linear_synthesis",
system_prompt="You are a synthesis agent. Create the final comprehensive output.",
user_prompt=synthesis_prompt,
temperature=0.3
)
results.append(("synthesis", synthesis_response))
except Exception as e:
return {"success": False, "error": f"Synthesis stage failed: {e}"}
end_time = time.perf_counter()
# Calculate metrics
total_tokens = sum(r[1].tokens_used for r in results)
total_time = end_time - start_time
final_output = synthesis_response.content
return {
"success": True,
"total_time": total_time,
"total_tokens": total_tokens,
"final_output": final_output,
"agent_count": 3, # Linear: 1 per stage
"communication_messages": 2, # Research->Analysis, Analysis->Synthesis
"memory_usage_estimate": 100.0, # Simple sequential memory
"stage_results": results
}
class FelixMultiAgentSystem:
"""
Felix geometric orchestration system.
Processes tasks using helix-based agent coordination with
natural convergence and spoke-based communication.
"""
def __init__(self, llm_client: LMStudioClient):
"""Initialize Felix system."""
self.llm_client = llm_client
self.helix = HelixGeometry(
top_radius=33.0,
bottom_radius=0.001,
height=33.0,
turns=33
)
self.central_post = CentralPost(max_agents=20, enable_metrics=True)
self.spoke_manager = SpokeManager(self.central_post)
def process_task_felix(self, task: LLMTask) -> Dict[str, Any]:
"""
Process task using Felix geometric orchestration.
Args:
task: Task to process
Returns:
Processing results
"""
start_time = time.perf_counter()
# Create specialized team
agents = create_specialized_team(
helix=self.helix,
llm_client=self.llm_client,
task_complexity="medium"
)
# Register agents
for agent in agents:
self.spoke_manager.register_agent(agent)
# Run geometric orchestration simulation
current_time = 0.0
time_step = 0.05
simulation_time = 1.0
final_output = None
agent_results = []
while current_time <= simulation_time and not final_output:
# Process agents based on spawn timing
for agent in agents:
if (agent.can_spawn(current_time) and
agent.state.value == "waiting"):
try:
# Spawn and process
agent.spawn(current_time, task)
result = agent.process_task_with_llm(task, current_time)
# Share results with central post via spoke communication
message = agent.share_result_to_central(result)
self.spoke_manager.send_message(agent.agent_id, message)
# Central post will handle distribution through spoke system
agent_results.append(result)
# Check for final synthesis
if agent.agent_type == "synthesis":
if hasattr(agent, 'finalize_output'):
final_output_data = agent.finalize_output(result)
final_output = final_output_data["content"]
else:
final_output = result.content
except Exception as e:
return {"success": False, "error": f"Agent {agent.agent_id} failed: {e}"}
# Update positions and process communication
for agent in agents:
if agent.state.value == "active":
agent.update_position(current_time)
self.spoke_manager.process_all_messages()
current_time += time_step
end_time = time.perf_counter()
# Calculate metrics
total_tokens = sum(r.llm_response.tokens_used for r in agent_results)
total_time = end_time - start_time
if not final_output and agent_results:
# Fallback: use last synthesis result
synthesis_results = [r for r in agent_results if "synthesis" in r.agent_id]
final_output = synthesis_results[-1].content if synthesis_results else agent_results[-1].content
return {
"success": True,
"total_time": total_time,
"total_tokens": total_tokens,
"final_output": final_output or "No final output generated",
"agent_count": len(agents),
"communication_messages": self.central_post.total_messages_processed,
"memory_usage_estimate": len(agents) * 20.0, # Geometric memory overhead
"agent_results": agent_results
}
class BenchmarkRunner:
"""
Main benchmark runner that coordinates comparisons.
Runs both systems on the same tasks and collects performance metrics
for statistical comparison.
"""
def __init__(self, lm_studio_url: str = "http://localhost:1234/v1"):
"""Initialize benchmark runner."""
self.llm_client = LMStudioClient(base_url=lm_studio_url)
self.linear_system = LinearMultiAgentSystem(self.llm_client)
self.felix_system = FelixMultiAgentSystem(self.llm_client)
print("Benchmark Runner initialized")
def test_connection(self) -> bool:
"""Test LM Studio connection."""
try:
if self.llm_client.test_connection():
print("✓ LM Studio connection successful")
return True
else:
print("✗ LM Studio connection failed")
return False
except LMStudioConnectionError as e:
print(f"✗ LM Studio connection error: {e}")
return False
def calculate_quality_score(self, output: str, task_description: str) -> float:
"""
Calculate simple quality score for output.
Args:
output: Generated output
task_description: Original task
Returns:
Quality score (0.0 to 1.0)
"""
# Simple heuristics for quality assessment
length_score = min(len(output) / 1000, 1.0) # Longer is better up to 1000 chars
# Check for task-relevant content
task_words = task_description.lower().split()
output_lower = output.lower()
relevance_score = sum(1 for word in task_words if word in output_lower) / len(task_words)
# Structure score (check for organized content)
structure_indicators = ["introduction", "conclusion", "summary", "analysis", "research"]
structure_score = sum(0.1 for indicator in structure_indicators if indicator in output_lower)
structure_score = min(structure_score, 0.5)
return (length_score + relevance_score + structure_score) / 2.5
def run_single_benchmark(self, task_description: str, approach: str, run_id: int) -> BenchmarkResult:
"""
Run single benchmark for one approach.
Args:
task_description: Task to perform
approach: "linear" or "felix"
run_id: Run identifier
Returns:
Benchmark result
"""
print(f" Running {approach} approach (run {run_id})...")
task = LLMTask(
task_id=f"benchmark_{run_id}",
description=task_description,
context="This is a benchmark comparison task."
)
try:
if approach == "linear":
results = self.linear_system.process_task_linear(task)
else: # felix
results = self.felix_system.process_task_felix(task)
if not results["success"]:
return BenchmarkResult(
approach=approach,
task_description=task_description,
run_id=run_id,
total_time=0.0,
total_tokens=0,
final_output="",
output_quality_score=0.0,
agent_count=0,
communication_messages=0,
memory_usage_estimate=0.0,
success=False,
error_message=results.get("error", "Unknown error")
)
quality_score = self.calculate_quality_score(results["final_output"], task_description)
return BenchmarkResult(
approach=approach,
task_description=task_description,
run_id=run_id,
total_time=results["total_time"],
total_tokens=results["total_tokens"],
final_output=results["final_output"],
output_quality_score=quality_score,
agent_count=results["agent_count"],
communication_messages=results["communication_messages"],
memory_usage_estimate=results["memory_usage_estimate"],
success=True
)
except Exception as e:
return BenchmarkResult(
approach=approach,
task_description=task_description,
run_id=run_id,
total_time=0.0,
total_tokens=0,
final_output="",
output_quality_score=0.0,
agent_count=0,
communication_messages=0,
memory_usage_estimate=0.0,
success=False,
error_message=str(e)
)
def run_benchmark_comparison(self, task_description: str, runs: int = 3) -> Dict[str, Any]:
"""
Run complete benchmark comparison.
Args:
task_description: Task to benchmark
runs: Number of runs per approach
Returns:
Comparison results
"""
print(f"\n{'='*60}")
print(f"BENCHMARK COMPARISON")
print(f"Task: {task_description}")
print(f"Runs per approach: {runs}")
print(f"{'='*60}")
all_results = []
# Run linear approach
print(f"\nRunning Linear Pipeline Approach...")
for run_id in range(runs):
result = self.run_single_benchmark(task_description, "linear", run_id)
all_results.append(result)
if result.success:
print(f" Run {run_id}: {result.total_time:.2f}s, {result.total_tokens} tokens, quality={result.output_quality_score:.2f}")
else:
print(f" Run {run_id}: FAILED - {result.error_message}")
# Run Felix approach
print(f"\nRunning Felix Geometric Orchestration...")
for run_id in range(runs):
result = self.run_single_benchmark(task_description, "felix", run_id)
all_results.append(result)
if result.success:
print(f" Run {run_id}: {result.total_time:.2f}s, {result.total_tokens} tokens, quality={result.output_quality_score:.2f}")
else:
print(f" Run {run_id}: FAILED - {result.error_message}")
# Analyze results
return self.analyze_benchmark_results(all_results, task_description)
def analyze_benchmark_results(self, results: List[BenchmarkResult], task_description: str) -> Dict[str, Any]:
"""Analyze and summarize benchmark results."""
# Separate by approach
linear_results = [r for r in results if r.approach == "linear" and r.success]
felix_results = [r for r in results if r.approach == "felix" and r.success]
# Calculate summaries
summaries = {}
for approach, approach_results in [("linear", linear_results), ("felix", felix_results)]:
if approach_results:
times = [r.total_time for r in approach_results]
tokens = [r.total_tokens for r in approach_results]
qualities = [r.output_quality_score for r in approach_results]
# Find best and worst outputs
best_result = max(approach_results, key=lambda r: r.output_quality_score)
worst_result = min(approach_results, key=lambda r: r.output_quality_score)
summary = BenchmarkSummary(
approach=approach,
task_description=task_description,
run_count=len(approach_results),
avg_total_time=statistics.mean(times),
avg_tokens=statistics.mean(tokens),
avg_quality_score=statistics.mean(qualities),
time_std_dev=statistics.stdev(times) if len(times) > 1 else 0.0,
tokens_std_dev=statistics.stdev(tokens) if len(tokens) > 1 else 0.0,
quality_std_dev=statistics.stdev(qualities) if len(qualities) > 1 else 0.0,
success_rate=len(approach_results) / sum(1 for r in results if r.approach == approach),
best_output=best_result.final_output,
worst_output=worst_result.final_output
)
summaries[approach] = summary
return {
"task_description": task_description,
"raw_results": results,
"summaries": summaries,
"comparison": self._compare_approaches(summaries) if len(summaries) == 2 else None
}
def _compare_approaches(self, summaries: Dict[str, BenchmarkSummary]) -> Dict[str, Any]:
"""Compare the two approaches statistically."""
linear = summaries["linear"]
felix = summaries["felix"]
comparison = {
"time_improvement": ((linear.avg_total_time - felix.avg_total_time) / linear.avg_total_time) * 100,
"token_efficiency": ((linear.avg_tokens - felix.avg_tokens) / linear.avg_tokens) * 100,
"quality_improvement": ((felix.avg_quality_score - linear.avg_quality_score) / linear.avg_quality_score) * 100,
"winner_by_time": "felix" if felix.avg_total_time < linear.avg_total_time else "linear",
"winner_by_quality": "felix" if felix.avg_quality_score > linear.avg_quality_score else "linear",
"winner_by_tokens": "felix" if felix.avg_tokens < linear.avg_tokens else "linear"
}
return comparison
def display_results(self, analysis: Dict[str, Any]) -> None:
"""Display benchmark results."""
print(f"\n{'='*60}")
print(f"BENCHMARK RESULTS")
print(f"{'='*60}")
summaries = analysis["summaries"]
print(f"\nTask: {analysis['task_description']}")
print(f"\nPerformance Summary:")
for approach, summary in summaries.items():
print(f"\n{approach.upper()} APPROACH:")
print(f" Success Rate: {summary.success_rate:.1%}")
print(f" Avg Time: {summary.avg_total_time:.2f}s (±{summary.time_std_dev:.2f})")
print(f" Avg Tokens: {summary.avg_tokens:.0f} (±{summary.tokens_std_dev:.0f})")
print(f" Avg Quality: {summary.avg_quality_score:.3f} (±{summary.quality_std_dev:.3f})")
if analysis["comparison"]:
comp = analysis["comparison"]
print(f"\nCOMPARISON:")
print(f" Time: Felix is {comp['time_improvement']:+.1f}% vs Linear")
print(f" Tokens: Felix uses {comp['token_efficiency']:+.1f}% tokens vs Linear")
print(f" Quality: Felix is {comp['quality_improvement']:+.1f}% quality vs Linear")
print(f" Best Time: {comp['winner_by_time']}")
print(f" Best Quality: {comp['winner_by_quality']}")
print(f" Best Token Efficiency: {comp['winner_by_tokens']}")
# Show best outputs
for approach, summary in summaries.items():
print(f"\n{'='*60}")
print(f"BEST OUTPUT - {approach.upper()}")
print(f"{'='*60}")
print(summary.best_output[:500] + ("..." if len(summary.best_output) > 500 else ""))
def save_results(self, analysis: Dict[str, Any], output_file: str) -> None:
"""Save benchmark results to JSON file."""
# Convert dataclasses to dicts for JSON serialization
serializable_analysis = {
"task_description": analysis["task_description"],
"raw_results": [asdict(r) for r in analysis["raw_results"]],
"summaries": {k: asdict(v) for k, v in analysis["summaries"].items()},
"comparison": analysis["comparison"]
}
with open(output_file, 'w') as f:
json.dump(serializable_analysis, f, indent=2)
print(f"\nResults saved to: {output_file}")
def main():
"""Main function for benchmark comparison."""
parser = argparse.ArgumentParser(description="Felix vs Linear Multi-Agent Benchmark")
parser.add_argument("--task", help="Task description to benchmark")
parser.add_argument("--task-file", help="File containing task descriptions (one per line)")
parser.add_argument("--runs", type=int, default=3, help="Number of runs per approach")
parser.add_argument("--lm-studio-url", default="http://localhost:1234/v1",
help="LM Studio API URL")
parser.add_argument("--output", help="Save results to JSON file")
args = parser.parse_args()
if not args.task and not args.task_file:
parser.error("Must provide either --task or --task-file")
# Create benchmark runner
runner = BenchmarkRunner(lm_studio_url=args.lm_studio_url)
# Test connection
if not runner.test_connection():
print("\nPlease ensure LM Studio is running with a model loaded.")
sys.exit(1)
# Get tasks to benchmark
tasks = []
if args.task:
tasks = [args.task]
elif args.task_file:
try:
with open(args.task_file, 'r') as f:
tasks = [line.strip() for line in f if line.strip()]
except Exception as e:
print(f"Error reading task file: {e}")
sys.exit(1)
# Run benchmarks
all_analyses = []
for i, task in enumerate(tasks):
print(f"\n{'#'*60}")
print(f"BENCHMARK {i+1}/{len(tasks)}")
print(f"{'#'*60}")
analysis = runner.run_benchmark_comparison(task, runs=args.runs)
all_analyses.append(analysis)
runner.display_results(analysis)
if args.output:
output_file = args.output if len(tasks) == 1 else f"{args.output}_{i+1}.json"
runner.save_results(analysis, output_file)
print(f"\nBenchmark comparison completed!")
if __name__ == "__main__":
main() |