| """ | |
| Benchmark: gMAS vs LangGraph | |
| ============================= | |
| Runs equivalent workflows on both frameworks and compares | |
| execution time and token usage: | |
| Test 1 β Single agent (Math Solver) | |
| Test 2 β Chain of 3 (Analyzer β Solver β Formatter) | |
| Test 3 β Fan-in (2β1) (Algebraic + Vieta β Aggregator) | |
| Test 4 β Chain of 7 (Parser β Classifier β Strategist β Solver β Verifier β Interpreter β Reporter) | |
| Test 5 β Fan-out (1β3β1) (Planner β [Algebraic, Graphical, Numerical] β Synthesizer) | |
| Test 6 β Chain of 15 (Reader β Extractor β Categorizer β Assumptions β Planner β Modeler β | |
| Simplifier β Solver β UnitChecker β VerifierA β VerifierB β | |
| Interpreter β EdgeCases β Reviewer β Reporter) | |
| gMAS uses the modern ``structured_llm_caller`` API so the LLM receives | |
| proper ``system`` / ``user`` roles natively (no flat-string hacks). | |
| For your own projects prefer ``MACPRunner(structured_llm_caller=...)`` | |
| over the legacy ``llm_caller(str)->str`` interface β see the | |
| ``MACPRunner`` docstring for a full example. | |
| Configure your LLM via environment variables: | |
| LLM_API_KEY β API key for your LLM provider | |
| LLM_BASE_URL β Base URL (e.g. http://localhost:8000/v1) | |
| LLM_MODEL β Model name (e.g. gpt-4o-mini) | |
| Run (single pass): | |
| python -m examples.benchmark_vs_langgraph | |
| Run N passes and average the results: | |
| python -m examples.benchmark_vs_langgraph --runs 10 | |
| """ | |
import argparse
import asyncio
import json
import os
import sys
import time
from datetime import UTC, datetime
from pathlib import Path
from unicodedata import normalize

from openai import AsyncOpenAI, OpenAI

from builder import BuilderConfig, GraphBuilder
from execution import MACPRunner, RunnerConfig, StreamEventType
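
# NOTE: ``datetime.UTC`` (imported above) requires Python 3.11+.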


# ── Helpers ───────────────────────────────────────────────────────────────────


def _safe(text: str) -> str:
    """Sanitize text for Windows cp1252 console – replace non-ASCII chars."""
    return normalize("NFKC", text).encode("ascii", errors="replace").decode("ascii")


def _header(title: str) -> None:
    bar = "─" * 64
    print(f"\n{bar}")
    print(f" {title}")
    print(bar)


def _validate_llm_config() -> tuple[str, str, str]:
    """Read LLM credentials from env vars. Exit with a clear message if missing."""
    api_key = os.getenv("LLM_API_KEY")
    base_url = os.getenv("LLM_BASE_URL")
    model = os.getenv("LLM_MODEL")
    missing = [v for v, val in [("LLM_API_KEY", api_key), ("LLM_BASE_URL", base_url), ("LLM_MODEL", model)] if not val]
    if missing:
        print("ERROR: Required environment variables are not set.", file=sys.stderr)
        print(f"  Missing: {', '.join(missing)}", file=sys.stderr)
        print(file=sys.stderr)
        print("Set them before running:", file=sys.stderr)
        print("  export LLM_API_KEY=your-api-key", file=sys.stderr)
        print("  export LLM_BASE_URL=http://localhost:8000/v1", file=sys.stderr)
        print("  export LLM_MODEL=gpt-4o-mini", file=sys.stderr)
        sys.exit(1)
    return api_key, base_url, model  # type: ignore[return-value]


# ── Token-tracking LLM wrapper ────────────────────────────────────────────────
class TrackedLLM:
    """
    OpenAI-compatible client that tracks token usage across calls.

    Used by both gMAS and LangGraph sides of the benchmark so the
    comparison is perfectly fair (same HTTP client, same model).

    For **your own projects** you don't need this class – use the
    built-in factory helpers instead::

        from execution import (
            create_openai_structured_caller,
            create_openai_async_structured_caller,
        )

        runner = MACPRunner(
            structured_llm_caller=create_openai_structured_caller(...),
            async_structured_llm_caller=create_openai_async_structured_caller(...),
        )
    """

    def __init__(self, api_key: str, base_url: str, model: str) -> None:
        self._client = OpenAI(api_key=api_key, base_url=base_url)
        self._async_client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self._model = model
        self.total_tokens: int = 0
        self.call_count: int = 0

    # ------------------------------------------------------------------
    def reset(self) -> None:
        self.total_tokens = 0
        self.call_count = 0

    # ------------------------------------------------------------------
    def chat(self, system: str, user: str, max_tokens: int = 1024) -> str:
        """Call the LLM with explicit system + user roles."""
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user})
        response = self._client.chat.completions.create(
            model=self._model,
            messages=messages,
            temperature=0.7,
            max_tokens=max_tokens,
        )
        text = response.choices[0].message.content or ""
        self.total_tokens += response.usage.total_tokens if response.usage else 0
        self.call_count += 1
        return text

    # ------------------------------------------------------------------
    async def achat(self, system: str, user: str, max_tokens: int = 1024) -> str:
        """Async version of chat() – used by gMAS parallel execution via astream()."""
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user})
        response = await self._async_client.chat.completions.create(
            model=self._model,
            messages=messages,
            temperature=0.7,
            max_tokens=max_tokens,
        )
        text = response.choices[0].message.content or ""
        self.total_tokens += response.usage.total_tokens if response.usage else 0
        self.call_count += 1
        return text
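
    # The counter updates above need no lock: the async path runs on asyncio's
    # single-threaded event loop, so increments stay consistent even when
    # agents execute concurrently via asyncio.gather.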
    # ------------------------------------------------------------------
    # gMAS caller factories (with token tracking)
    #
    # These mirror the framework's ``create_openai_structured_caller`` /
    # ``create_openai_async_structured_caller`` but add per-call token
    # and call-count tracking needed for benchmarking.
    # ------------------------------------------------------------------
    def structured_caller(self, max_tokens: int = 1024):
        """Return a ``structured_llm_caller`` for MACPRunner (sync, tracked)."""

        def _caller(messages: list[dict[str, str]]) -> str:
            response = self._client.chat.completions.create(
                model=self._model,
                messages=messages,
                temperature=0.7,
                max_tokens=max_tokens,
            )
            text = response.choices[0].message.content or ""
            self.total_tokens += response.usage.total_tokens if response.usage else 0
            self.call_count += 1
            return text

        return _caller

    def async_structured_caller(self, max_tokens: int = 1024):
        """Return an ``async_structured_llm_caller`` for MACPRunner (async, tracked)."""

        async def _caller(messages: list[dict[str, str]]) -> str:
            response = await self._async_client.chat.completions.create(
                model=self._model,
                messages=messages,
                temperature=0.7,
                max_tokens=max_tokens,
            )
            text = response.choices[0].message.content or ""
            self.total_tokens += response.usage.total_tokens if response.usage else 0
            self.call_count += 1
            return text

        return _caller
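

# The structured callers above receive OpenAI-style message lists from the
# runner, e.g. [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}],
# and pass them to chat.completions.create unchanged.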


# ── gMAS graph builder helper ─────────────────────────────────────────────────
def _run_gmas(
    llm: TrackedLLM,
    build_fn,
    final_agent_id: str,
    max_tokens: int = 1024,
    *,
    parallel: bool = False,
) -> dict:
    """
    Build graph, execute via MACPRunner, return result dict.

    For **sequential** topologies (single agent, chains) uses synchronous
    ``stream()`` – zero async overhead, minimal latency.

    For **parallel** topologies (fan-in, fan-out) uses ``astream()`` with
    ``enable_parallel=True`` so independent agents run concurrently via
    ``asyncio.gather`` – matching LangGraph's parallel branch semantics.

    Neither mode uses ``adaptive=True`` (that is for conditional edges,
    pruning, and fallbacks – not relevant to this benchmark).

    Args:
        llm: Tracked LLM client used for all agent calls.
        build_fn: Zero-argument callable that constructs and returns
            the :class:`~graph.AgentGraph` for this test.
        final_agent_id: ID of the agent whose output is the final answer.
        max_tokens: Per-agent output token limit (must match LangGraph side).
        parallel: When ``True``, use async execution with parallel agent
            groups. Set for fan-in / fan-out topologies.
    """
    llm.reset()
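    # Timing starts before graph construction so both frameworks are measured
    # end-to-end (build + execute) – the LangGraph functions below likewise
    # time StateGraph building, compile() and invoke() together.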
    t0 = time.perf_counter()
    graph = build_fn()

    if parallel:
        # Async mode with parallel groups – for fan-in / fan-out topologies.
        runner = MACPRunner(
            structured_llm_caller=llm.structured_caller(max_tokens=max_tokens),
            async_structured_llm_caller=llm.async_structured_caller(max_tokens=max_tokens),
            config=RunnerConfig(
                timeout=180.0,
                adaptive=False,
                enable_parallel=True,
                update_states=True,
                broadcast_task_to_all=False,
            ),
        )

        async def _run() -> str:
            output = ""
            async for event in runner.astream(graph, final_agent_id=final_agent_id):
                if (
                    event.event_type == StreamEventType.AGENT_OUTPUT
                    and getattr(event, "agent_id", "") == final_agent_id
                ):
                    output = getattr(event, "content", "")
            return output

        output = asyncio.run(_run())
    else:
        # Synchronous mode – for single agent and chains.
        runner = MACPRunner(
            structured_llm_caller=llm.structured_caller(max_tokens=max_tokens),
            config=RunnerConfig(
                timeout=180.0,
                adaptive=False,
                update_states=True,
                broadcast_task_to_all=False,
            ),
        )
        output = ""
        for event in runner.stream(graph, final_agent_id=final_agent_id):
            if event.event_type == StreamEventType.AGENT_OUTPUT and getattr(event, "agent_id", "") == final_agent_id:
                output = getattr(event, "content", "")

    elapsed = time.perf_counter() - t0
    return {
        "framework": "gMAS",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": output,
    }


# ── Test 1: Single agent ──────────────────────────────────────────────────────
def _build_single_gmas(llm: TrackedLLM, problem: str) -> dict:
    def _build():
        builder = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        builder.add_task(query=problem, description="Math problem")
        builder.add_agent(
            agent_id="solver",
            display_name="Math Solver",
            # persona is prefixed with "You are " by the framework,
            # so we pass just the role – same semantic as the LangGraph side.
            persona="a mathematician who solves problems step by step",
            description="Solve the equation and state both roots clearly.",
        )
        builder.connect_task_to_agents(agent_ids=["solver"], bidirectional=False)
        return builder.build()

    return _run_gmas(llm, _build, "solver")


def _build_single_langgraph(llm: TrackedLLM, problem: str) -> dict:
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed – pip install langgraph"}

    class State(TypedDict):
        input: str
        output: str

    def solver_node(state: State) -> dict:
        return {
            "output": llm.chat(
                "You are a mathematician. Solve problems step by step.",
                state["input"],
            )
        }

    llm.reset()
    t0 = time.perf_counter()
    g = StateGraph(cast("Any", State))
    g.add_node("solver", solver_node)
    g.set_entry_point("solver")
    g.set_finish_point("solver")
    result = g.compile().invoke({"input": problem, "output": ""})
    return {
        "framework": "langgraph",
        "time": time.perf_counter() - t0,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": result["output"],
    }


# ── Test 2: Chain of 3 agents ─────────────────────────────────────────────────
def _build_chain_gmas(llm: TrackedLLM, problem: str) -> dict:
    def _build():
        builder = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        builder.add_task(query=problem, description="Math problem")
        builder.add_agent("analyzer", "Analyzer", "an analyst", "Identify the equation type and best solution method.")
        builder.add_agent("solver", "Solver", "a mathematician", "Solve the problem step by step.")
        builder.add_agent("formatter", "Formatter", "a presenter", "Format the final answer clearly and concisely.")
        builder.connect_task_to_agents(agent_ids=["analyzer"], bidirectional=False)
        builder.add_workflow_edge("analyzer", "solver")
        builder.add_workflow_edge("solver", "formatter")
        return builder.build()

    return _run_gmas(llm, _build, "formatter")


def _build_chain_langgraph(llm: TrackedLLM, problem: str) -> dict:
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed – pip install langgraph"}

    class State(TypedDict):
        input: str
        step1: str
        step2: str
        output: str

    def analyzer(s: State) -> dict:
        return {"step1": llm.chat("Identify the equation type and best solution method.", f"Problem: {s['input']}")}

    def solver(s: State) -> dict:
        return {"step2": llm.chat("Solve step by step.", f"Problem: {s['input']}\nAnalysis: {s['step1']}")}

    def formatter(s: State) -> dict:
        return {"output": llm.chat("Format the answer clearly.", f"Solution: {s['step2']}")}

    llm.reset()
    t0 = time.perf_counter()
    g = StateGraph(cast("Any", State))
    for name, fn in [("analyzer", analyzer), ("solver", solver), ("formatter", formatter)]:
        g.add_node(name, fn)
    g.add_edge("analyzer", "solver")
    g.add_edge("solver", "formatter")
    g.set_entry_point("analyzer")
    g.set_finish_point("formatter")
    result = g.compile().invoke({"input": problem, "step1": "", "step2": "", "output": ""})
    return {
        "framework": "langgraph",
        "time": time.perf_counter() - t0,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": result["output"],
    }


# ── Test 3: Fan-in (2→1) ──────────────────────────────────────────────────────
#
# Topology: [Algebraic, Vieta] → Aggregator
#
# Both solvers receive only the task input (not each other's output).
# gMAS uses astream() with enable_parallel=True (adaptive=False),
# so [algebraic, vieta] are executed concurrently via asyncio.gather –
# matching LangGraph's START → [algebraic, vieta] → aggregator branches.
# Fair data-flow comparison.
def _build_fanin_gmas(llm: TrackedLLM, problem: str) -> dict:
    def _build():
        builder = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        builder.add_task(query=problem, description="Math problem")
        builder.add_agent(
            "algebraic", "Algebraic Solver", "an algebraist", "Solve using the discriminant formula (D = b²-4ac)."
        )
        builder.add_agent(
            "vieta", "Vieta Solver", "a Vieta specialist", "Solve using Vieta's formulas (sum and product of roots)."
        )
        builder.add_agent(
            "aggregator", "Aggregator", "a verifier", "Compare both solutions and provide the definitive answer."
        )
        builder.connect_task_to_agents(agent_ids=["algebraic", "vieta"], bidirectional=False)
        builder.add_workflow_edge("algebraic", "aggregator")
        builder.add_workflow_edge("vieta", "aggregator")
        return builder.build()

    return _run_gmas(llm, _build, "aggregator", parallel=True)


def _build_fanin_langgraph(llm: TrackedLLM, problem: str) -> dict:
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import END, START, StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed – pip install langgraph"}

    class State(TypedDict):
        input: str
        method1: str
        method2: str
        output: str

    def algebraic(s: State) -> dict:
        return {"method1": llm.chat("Solve using the discriminant formula (D = b²-4ac).", f"Problem: {s['input']}")}

    def vieta(s: State) -> dict:
        return {"method2": llm.chat("Solve using Vieta's formulas.", f"Problem: {s['input']}")}

    def aggregator(s: State) -> dict:
        return {
            "output": llm.chat(
                "Compare both solutions and provide the definitive answer.",
                f"Method 1 (discriminant):\n{s['method1']}\n\nMethod 2 (Vieta):\n{s['method2']}",
            )
        }

    llm.reset()
    t0 = time.perf_counter()
    g = StateGraph(cast("Any", State))
    g.add_node("algebraic", algebraic)
    g.add_node("vieta", vieta)
    g.add_node("aggregator", aggregator)
    # Fan-in: both solvers start from START (parallel branches), both feed aggregator.
    # Each solver receives only the input – they don't see each other's output.
    # This matches the gMAS topology: task → [algebraic, vieta] → aggregator.
    g.add_edge(START, "algebraic")
    g.add_edge(START, "vieta")
    g.add_edge("algebraic", "aggregator")
    g.add_edge("vieta", "aggregator")
    g.add_edge("aggregator", END)
    result = g.compile().invoke({"input": problem, "method1": "", "method2": "", "output": ""})
    return {
        "framework": "langgraph",
        "time": time.perf_counter() - t0,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": result["output"],
    }


# ── Test 4: Chain of 7 agents ─────────────────────────────────────────────────
# parser → classifier → strategist → solver → verifier → interpreter → reporter
_CHAIN7_AGENTS = [
    (
        "parser",
        "Parser",
        "a problem parser",
        "Restate the math problem clearly, identify all given values and unknowns.",
    ),
    (
        "classifier",
        "Classifier",
        "an equation classifier",
        "Identify the equation type (linear, quadratic, etc.) and its key properties (degree, discriminant sign).",
    ),
    (
        "strategist",
        "Strategist",
        "a solution strategist",
        "Choose the best solution method and outline the plan step-by-step before any calculation.",
    ),
    ("solver", "Solver", "a mathematician", "Execute the chosen method and compute the roots with full working shown."),
    (
        "verifier",
        "Verifier",
        "a solution verifier",
        "Substitute each root back into the original equation and confirm both sides equal zero.",
    ),
    (
        "interpreter",
        "Interpreter",
        "a math interpreter",
        "Explain what the roots mean in the context of the problem (real/complex, sign, magnitude).",
    ),
    (
        "reporter",
        "Reporter",
        "a technical writer",
        "Write the final concise answer: state both roots, verify they are correct, one sentence summary.",
    ),
]


def _build_chain7_gmas(llm: TrackedLLM, problem: str) -> dict:
    def _build():
        builder = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        builder.add_task(query=problem, description="Math problem")
        for agent_id, display, persona, desc in _CHAIN7_AGENTS:
            builder.add_agent(agent_id, display, persona, desc)
        builder.connect_task_to_agents(agent_ids=[_CHAIN7_AGENTS[0][0]], bidirectional=False)
        for i in range(len(_CHAIN7_AGENTS) - 1):
            builder.add_workflow_edge(_CHAIN7_AGENTS[i][0], _CHAIN7_AGENTS[i + 1][0])
        return builder.build()

    # Both gMAS and LangGraph sides use max_tokens=512 per agent call
    # so token budgets are equal and the comparison is fair.
    return _run_gmas(llm, _build, "reporter", max_tokens=512)


def _build_chain7_langgraph(llm: TrackedLLM, problem: str) -> dict:
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed – pip install langgraph"}

    keys = ["input"] + [a[0] for a in _CHAIN7_AGENTS]
    State = TypedDict("State", dict.fromkeys(keys, str))  # type: ignore[misc]
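    # dict.fromkeys(keys, str) maps every key to the str type, so this functional
    # TypedDict is equivalent to declaring one ``str`` field per agent plus "input".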
    system_prompts = {a[0]: a[3] for a in _CHAIN7_AGENTS}

    # max_tokens=512 matches the gMAS side – equal budget for fair comparison
    def _make_node(agent_id: str, prev_key: str, out_key: str):
        def _node(s):
            user_msg = (
                f"Problem: {s['input']}\n\nPrevious step ({prev_key}):\n{s[prev_key]}"
                if prev_key != "input"
                else f"Problem: {s['input']}"
            )
            return {out_key: llm.chat(system_prompts[agent_id], user_msg, max_tokens=512)}

        _node.__name__ = agent_id
        return _node

    llm.reset()
    t0 = time.perf_counter()
    g = StateGraph(cast("Any", State))
    agent_ids = [a[0] for a in _CHAIN7_AGENTS]
    prev_keys = ["input", *agent_ids[:-1]]
    for agent_id, prev_key in zip(agent_ids, prev_keys, strict=False):
        g.add_node(agent_id, _make_node(agent_id, prev_key, agent_id))
    for i in range(len(agent_ids) - 1):
        g.add_edge(agent_ids[i], agent_ids[i + 1])
    g.set_entry_point(agent_ids[0])
    g.set_finish_point(agent_ids[-1])
    init_state = dict.fromkeys(keys, "")
    init_state["input"] = problem
    result = g.compile().invoke(init_state)
    return {
        "framework": "langgraph",
        "time": time.perf_counter() - t0,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": result["reporter"],
    }


# ── Test 5: Fan-out (1→3→1) ───────────────────────────────────────────────────
#
# Topology: Planner → [Algebraic, Graphical, Numerical] → Synthesizer
#
# Planner runs first, then three solvers (each sees planner output + task),
# finally Synthesizer aggregates all three.
# gMAS uses astream() with enable_parallel=True (adaptive=False),
# so [algebraic, graphical, numerical] are executed concurrently via
# asyncio.gather – matching LangGraph's fan-out/fan-in branches.
# Fair data-flow comparison.
def _build_fanout_gmas(llm: TrackedLLM, problem: str) -> dict:
    def _build():
        builder = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        builder.add_task(query=problem, description="Math problem")
        builder.add_agent(
            "planner",
            "Planner",
            "a solution planner",
            "Analyze the equation, identify its type, and outline three independent solution strategies.",
        )
        builder.add_agent(
            "algebraic",
            "Algebraic Solver",
            "an algebraist",
            "Solve using the quadratic formula (discriminant method). Show full working.",
        )
        builder.add_agent(
            "graphical",
            "Graphical Solver",
            "a graph analyst",
            "Describe how to find roots graphically: vertex, axis of symmetry, x-intercepts.",
        )
        builder.add_agent(
            "numerical",
            "Numerical Solver",
            "a numerical methods expert",
            "Solve using a numerical approach (e.g. Newton's method). Show iterations.",
        )
        builder.add_agent(
            "synthesizer",
            "Synthesizer",
            "a result synthesizer",
            "Compare all three solutions, verify consistency, and provide the definitive answer.",
        )
        builder.connect_task_to_agents(agent_ids=["planner"], bidirectional=False)
        # fan-out: planner → three solvers
        builder.add_workflow_edge("planner", "algebraic")
        builder.add_workflow_edge("planner", "graphical")
        builder.add_workflow_edge("planner", "numerical")
        # fan-in: three solvers → synthesizer
        builder.add_workflow_edge("algebraic", "synthesizer")
        builder.add_workflow_edge("graphical", "synthesizer")
        builder.add_workflow_edge("numerical", "synthesizer")
        return builder.build()

    return _run_gmas(llm, _build, "synthesizer", parallel=True)


def _build_fanout_langgraph(llm: TrackedLLM, problem: str) -> dict:
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import END, START, StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed – pip install langgraph"}

    class State(TypedDict):
        input: str
        plan: str
        algebraic: str
        graphical: str
        numerical: str
        output: str

    def planner(s: State) -> dict:
        return {
            "plan": llm.chat(
                "Analyze the equation, identify its type, and outline three independent solution strategies.",
                f"Problem: {s['input']}",
            )
        }

    def algebraic(s: State) -> dict:
        return {
            "algebraic": llm.chat(
                "Solve using the quadratic formula (discriminant method). Show full working.",
                f"Problem: {s['input']}\n\nPlan:\n{s['plan']}",
            )
        }

    def graphical(s: State) -> dict:
        return {
            "graphical": llm.chat(
                "Describe how to find roots graphically: vertex, axis of symmetry, x-intercepts.",
                f"Problem: {s['input']}\n\nPlan:\n{s['plan']}",
            )
        }

    def numerical(s: State) -> dict:
        return {
            "numerical": llm.chat(
                "Solve using a numerical approach (e.g. Newton's method). Show iterations.",
                f"Problem: {s['input']}\n\nPlan:\n{s['plan']}",
            )
        }

    def synthesizer(s: State) -> dict:
        return {
            "output": llm.chat(
                "Compare all three solutions, verify consistency, and provide the definitive answer.",
                f"Algebraic:\n{s['algebraic']}\n\nGraphical:\n{s['graphical']}\n\nNumerical:\n{s['numerical']}",
            )
        }

    llm.reset()
    t0 = time.perf_counter()
    g = StateGraph(cast("Any", State))
    g.add_node("planner", planner)
    g.add_node("algebraic", algebraic)
    g.add_node("graphical", graphical)
    g.add_node("numerical", numerical)
    g.add_node("synthesizer", synthesizer)
    # Fan-out / fan-in topology matching gMAS:
    #   planner → [algebraic, graphical, numerical] → synthesizer
    # Each solver receives planner's output (via state), not the previous solver's.
    g.add_edge(START, "planner")
    g.add_edge("planner", "algebraic")
    g.add_edge("planner", "graphical")
    g.add_edge("planner", "numerical")
    g.add_edge("algebraic", "synthesizer")
    g.add_edge("graphical", "synthesizer")
    g.add_edge("numerical", "synthesizer")
    g.add_edge("synthesizer", END)
    result = g.compile().invoke(
        {
            "input": problem,
            "plan": "",
            "algebraic": "",
            "graphical": "",
            "numerical": "",
            "output": "",
        }
    )
    return {
        "framework": "langgraph",
        "time": time.perf_counter() - t0,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": result["output"],
    }


# ── Test 6: Chain of 15 agents ────────────────────────────────────────────────
#
# reader → extractor → categorizer → assumptions → planner → modeler →
# simplifier → solver → unit_check → verifier_a → verifier_b →
# interpreter → edge_cases → reviewer → reporter
#
# Task: classic two-trains word problem.
# The correct numerical answer emerges as early as the "solver" step;
# subsequent agents verify, interpret and summarise it.
# Goal: measure raw throughput (time + tokens) of a 15-node sequential chain.
TRAIN_PROBLEM = (
    "Two trains leave stations A and B at the same time, travelling towards each other. "
    "The distance between the stations is 300 km. "
    "Train A travels at 60 km/h and Train B at 80 km/h. "
    "When do they meet, and how far from station A is the meeting point?"
)
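# Expected answer, for sanity-checking outputs: closing speed 60 + 80 = 140 km/h,
# so t = 300 / 140 = 15/7 h ≈ 2 h 9 min, and the meeting point is
# 60 × 15/7 ≈ 128.6 km from station A.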

_CHAIN15_AGENTS = [
    (
        "reader",
        "Reader",
        "a careful problem reader",
        "Read the problem and restate it in your own words without changing any numbers or facts.",
    ),
    (
        "extractor",
        "Extractor",
        "a data extractor",
        "List every given value and unknown from the problem as bullet points.",
    ),
    (
        "categorizer",
        "Categorizer",
        "a problem categorizer",
        "Identify the mathematical category (e.g. kinematics, relative motion) and the specific sub-type.",
    ),
    (
        "assumptions",
        "Assumptions",
        "an assumption analyst",
        "State all implicit assumptions (constant speed, straight track, simultaneous departure, point-like trains).",
    ),
    (
        "planner",
        "Planner",
        "a solution planner",
        "Write a numbered step-by-step plan to solve the problem before doing any arithmetic.",
    ),
    (
        "modeler",
        "Modeler",
        "a mathematical modeler",
        "Translate the plan into equations using variables (e.g. let t = time in hours, d = distance from A).",
    ),
    (
        "simplifier",
        "Simplifier",
        "an algebra simplifier",
        "Simplify the system of equations down to a single equation in one unknown. Show every step.",
    ),
    (
        "solver",
        "Solver",
        "a mathematician",
        "Solve the simplified equation and compute the numerical answer (time and distance). Show full working.",
    ),
    (
        "unit_check",
        "UnitChecker",
        "a dimensional analyst",
        "Verify the units of the answer are consistent (km, h, km/h) and that the magnitudes are physically sensible.",
    ),
    (
        "verifier_a",
        "VerifierA",
        "a solution verifier",
        "Substitute the answer back into the original equations and confirm both sides are equal.",
    ),
    (
        "verifier_b",
        "VerifierB",
        "a cross-checker",
        (
            "Use an alternative approach (closing-speed formula: t = D / (v1+v2)) "
            "to independently verify the meeting time."
        ),
    ),
    (
        "interpreter",
        "Interpreter",
        "a results interpreter",
        "Express the answer in plain language: time in hours and minutes, distance from each station.",
    ),
    (
        "edge_cases",
        "EdgeCases",
        "an edge-case analyst",
        (
            "Briefly discuss how the answer changes if one train is faster "
            "or the stations are farther apart (1–2 sentences)."
        ),
    ),
    (
        "reviewer",
        "Reviewer",
        "a critical reviewer",
        "Review all previous steps for arithmetic errors or logical gaps and flag any issues found.",
    ),
    (
        "reporter",
        "Reporter",
        "a technical writer",
        "Write the final concise answer: meeting time, meeting point distance from A, one-sentence summary.",
    ),
]


def _build_chain15_gmas(llm: TrackedLLM, problem: str) -> dict:
    def _build():
        builder = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        builder.add_task(query=problem, description="Word problem")
        for agent_id, display, persona, desc in _CHAIN15_AGENTS:
            builder.add_agent(agent_id, display, persona, desc)
        builder.connect_task_to_agents(agent_ids=[_CHAIN15_AGENTS[0][0]], bidirectional=False)
        for i in range(len(_CHAIN15_AGENTS) - 1):
            builder.add_workflow_edge(_CHAIN15_AGENTS[i][0], _CHAIN15_AGENTS[i + 1][0])
        return builder.build()

    # max_tokens=256 per agent – 15 calls × 256 matches the LangGraph side budget.
    return _run_gmas(llm, _build, "reporter", max_tokens=256)


def _build_chain15_langgraph(llm: TrackedLLM, problem: str) -> dict:
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed – pip install langgraph"}

    keys = ["input"] + [a[0] for a in _CHAIN15_AGENTS]
    State = TypedDict("State", dict.fromkeys(keys, str))  # type: ignore[misc]
    system_prompts = {a[0]: a[3] for a in _CHAIN15_AGENTS}

    # max_tokens=256 matches the gMAS side – equal budget for fair comparison.
    def _make_node(agent_id: str, prev_key: str, out_key: str):
        def _node(s):
            user_msg = (
                f"Problem: {s['input']}\n\nPrevious step ({prev_key}):\n{s[prev_key]}"
                if prev_key != "input"
                else f"Problem: {s['input']}"
            )
            return {out_key: llm.chat(system_prompts[agent_id], user_msg, max_tokens=256)}

        _node.__name__ = agent_id
        return _node

    llm.reset()
    t0 = time.perf_counter()
    g = StateGraph(cast("Any", State))
    agent_ids = [a[0] for a in _CHAIN15_AGENTS]
    prev_keys = ["input", *agent_ids[:-1]]
    for agent_id, prev_key in zip(agent_ids, prev_keys, strict=False):
        g.add_node(agent_id, _make_node(agent_id, prev_key, agent_id))
    for i in range(len(agent_ids) - 1):
        g.add_edge(agent_ids[i], agent_ids[i + 1])
    g.set_entry_point(agent_ids[0])
    g.set_finish_point(agent_ids[-1])
    init_state = dict.fromkeys(keys, "")
    init_state["input"] = problem
    result = g.compile().invoke(init_state)
    return {
        "framework": "langgraph",
        "time": time.perf_counter() - t0,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": result["reporter"],
    }


# ── Results printer ───────────────────────────────────────────────────────────
def _print_pair(lg: dict, gmas: dict) -> None:
    """Print a side-by-side row for one test."""
    lg_err = lg.get("error")
    if lg_err:
        print(f" [SKIP] LangGraph – {lg_err}")
        print(f" [gMAS] {gmas['time']:>6.2f}s {gmas['tokens']:>6} tok {gmas['calls']:>2} calls")
        return
    lg_time, lg_tok = lg["time"], lg["tokens"]
    gm_time, gm_tok = gmas["time"], gmas["tokens"]
    time_delta = (lg_time - gm_time) / max(lg_time, 1e-9) * 100
    tok_delta = (lg_tok - gm_tok) / max(lg_tok, 1) * 100
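    # Positive delta means gMAS wins: e.g. LangGraph 10.0s vs gMAS 8.0s gives
    # time_delta = (10.0 - 8.0) / 10.0 * 100 = +20.0% (gMAS 20% faster).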
    time_winner = "gMAS" if gm_time < lg_time else "langgraph"
    tok_winner = "gMAS" if gm_tok < lg_tok else "langgraph"
    print(f" {'Framework':<14} {'Time':>8} {'Tokens':>8} {'Calls':>5}")
    print(f" {'─' * 14} {'─' * 8} {'─' * 8} {'─' * 5}")
    print(f" {'LangGraph':<14} {lg_time:>7.2f}s {lg_tok:>8} {lg['calls']:>5}")
    print(f" {'gMAS':<14} {gm_time:>7.2f}s {gm_tok:>8} {gmas['calls']:>5}")
    print(f" {'delta':<14} {time_delta:>+7.1f}% {tok_delta:>+7.1f}%")
    print(f" Winner: time={time_winner} tokens={tok_winner}")


# ── Main benchmark ────────────────────────────────────────────────────────────
MATH_PROBLEM = "Solve the equation: 3x^2 - 7x + 2 = 0. Find both roots."
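# Known roots, for sanity-checking outputs: D = (-7)² - 4·3·2 = 25,
# x = (7 ± 5) / 6, i.e. x = 2 and x = 1/3.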

TESTS = [
    (
        "Test 1 – Single Agent (Math Solver)",
        "single_agent",
        _build_single_langgraph,
        _build_single_gmas,
        MATH_PROBLEM,
    ),
    (
        "Test 2 – Chain of 3 (Analyzer -> Solver -> Formatter)",
        "chain_3",
        _build_chain_langgraph,
        _build_chain_gmas,
        MATH_PROBLEM,
    ),
    (
        "Test 3 – Fan-in (Algebraic + Vieta -> Aggregator)",
        "fan_in",
        _build_fanin_langgraph,
        _build_fanin_gmas,
        MATH_PROBLEM,
    ),
    (
        "Test 4 – Chain-7 (Parser->Classifier->Strategist->Solver->Verifier->Interpreter->Reporter)",
        "chain_7",
        _build_chain7_langgraph,
        _build_chain7_gmas,
        MATH_PROBLEM,
    ),
    (
        "Test 5 – Fan-out (Planner -> [Algebraic, Graphical, Numerical] -> Synthesizer)",
        "fan_out",
        _build_fanout_langgraph,
        _build_fanout_gmas,
        MATH_PROBLEM,
    ),
    (
        "Test 6 – Chain-15 (Reader->Extractor->Categorizer->Assumptions->Planner->Modeler->"
        "Simplifier->Solver->UnitChecker->VerifierA->VerifierB->Interpreter->EdgeCases->Reviewer->Reporter)",
        "chain_15",
        _build_chain15_langgraph,
        _build_chain15_gmas,
        TRAIN_PROBLEM,
    ),
]


def _connect_llm(api_key: str, base_url: str, model: str) -> TrackedLLM:
    """Create TrackedLLM and verify connectivity. Exits on failure."""
    llm = TrackedLLM(api_key=api_key, base_url=base_url, model=model)
    print("\n [*] Testing LLM connection...")
    try:
        ping = llm.chat("", "Say OK")
        if not ping.strip():
            print("ERROR: LLM returned an empty response. Check your configuration.", file=sys.stderr)
            sys.exit(1)
        print(f" [+] LLM OK – responded: {_safe(ping.strip()[:60])}")
    except Exception as exc:
        print(f"ERROR: Cannot connect to LLM: {exc}", file=sys.stderr)
        sys.exit(1)
    return llm


def _run_all_tests(llm: TrackedLLM) -> tuple[list[dict], list[tuple]]:
    """Execute all benchmark test pairs. Returns (results, summary_rows)."""
    results: list[dict] = []
    summary_rows: list[tuple] = []
    for title, test_key, lg_fn, gmas_fn, problem in TESTS:
        _header(title)
        print(" Running LangGraph...", end=" ", flush=True)
        lg = lg_fn(llm, problem)
        lg["test"] = test_key
        if "error" not in lg:
            print(f"done ({lg['time']:.2f}s, {lg['tokens']} tok)")
        else:
            print(f"skipped ({lg['error']})")
        print(" Running gMAS...     ", end=" ", flush=True)
        gmas = gmas_fn(llm, problem)
        gmas["test"] = test_key
        print(f"done ({gmas['time']:.2f}s, {gmas['tokens']} tok)")
        print()
        _print_pair(lg, gmas)
        results.extend([lg, gmas])
        if "error" not in lg:
            summary_rows.append((test_key, lg["time"], gmas["time"], lg["tokens"], gmas["tokens"]))
    return results, summary_rows


def _print_summary(summary_rows: list[tuple]) -> None:
    """Print grand summary table with averages."""
    if not summary_rows:
        return
    _header("Summary")
    print(f" {'Test':<16} {'LG time':>8} {'gMAS time':>9} {'LG tok':>7} {'gMAS tok':>8} Time winner")
    print(f" {'─' * 16} {'─' * 8} {'─' * 9} {'─' * 7} {'─' * 8} {'─' * 12}")
    for key, lt, gt, ltok, gtok in summary_rows:
        winner = "gMAS" if gt < lt else "LangGraph"
        print(f" {key:<16} {lt:>7.2f}s {gt:>8.2f}s {ltok:>7} {gtok:>8} {winner}")
    avg_time_delta = sum((r[1] - r[2]) / max(r[1], 1e-9) for r in summary_rows) / len(summary_rows) * 100
    avg_tok_delta = sum((r[3] - r[4]) / max(r[3], 1) for r in summary_rows) / len(summary_rows) * 100
    print("\n Average improvement for gMAS vs LangGraph:")
    print(f"   time  : {avg_time_delta:+.1f}% (positive = gMAS faster)")
    print(f"   tokens: {avg_tok_delta:+.1f}% (positive = gMAS cheaper)")


def _save_log(model: str, base_url: str, results: list[dict]) -> Path:
    """Serialise results to a timestamped JSON file and return its path."""
    output = {
        "timestamp": datetime.now(UTC).isoformat(),
        "model": model,
        "base_url": base_url,
        "problem": MATH_PROBLEM,
        "results": results,
    }
    log_dir = Path("benchmark_logs")
    log_dir.mkdir(exist_ok=True)
    log_file = log_dir / f"benchmark_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json"
    log_file.write_text(json.dumps(output, ensure_ascii=False, indent=2, default=str), encoding="utf-8")
    return log_file
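

# Each entry in the saved ``results`` list is one framework-run dict:
# {"framework", "test", "time", "tokens", "calls", "output"} – or
# {"framework", "test", "error"} when LangGraph is not installed.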
def run_benchmark(num_runs: int = 1) -> dict:
    """
    Run all benchmark pairs and print a comparison table.

    Args:
        num_runs: Number of passes. When > 1 the benchmark runs the full
            suite ``num_runs`` times and reports averaged metrics.
    """
    _header("Benchmark: gMAS vs LangGraph")
    api_key, base_url, model = _validate_llm_config()
    print(f" LLM provider : {base_url}")
    print(f" Model        : {model}")
    print(f" Problem      : {MATH_PROBLEM}")
    if num_runs > 1:
        print(f" Runs         : {num_runs}")
    llm = _connect_llm(api_key, base_url, model)

    if num_runs <= 1:
        # ── single pass ──────────────────────────────────────────────
        results, summary_rows = _run_all_tests(llm)
        _print_summary(summary_rows)
        log_file = _save_log(model, base_url, results)
        print(f"\n Results saved -> {log_file}")
        return {
            "timestamp": datetime.now(UTC).isoformat(),
            "model": model,
            "base_url": base_url,
            "problem": MATH_PROBLEM,
            "results": results,
        }

    # ── multi-pass: collect, average, report ─────────────────────────
    accumulated: dict[str, dict[str, list]] = {}
    all_results: list[dict] = []
    for run_idx in range(1, num_runs + 1):
        _header(f"Run {run_idx}/{num_runs}")
        results, summary_rows = _run_all_tests(llm)
        all_results.extend(results)
        for key, lt, gt, ltok, gtok in summary_rows:
            acc = accumulated.setdefault(key, {"lg_time": [], "gm_time": [], "lg_tok": [], "gm_tok": []})
            acc["lg_time"].append(lt)
            acc["gm_time"].append(gt)
            acc["lg_tok"].append(ltok)
            acc["gm_tok"].append(gtok)

    # ── averaged summary ─────────────────────────────────────────────
    _header(f"Averaged Summary ({num_runs} runs)")
    print(f" {'Test':<16} {'LG time':>8} {'gMAS time':>9} {'LG tok':>7} {'gMAS tok':>8} Time winner")
    print(f" {'─' * 16} {'─' * 8} {'─' * 9} {'─' * 7} {'─' * 8} {'─' * 12}")
    avg_rows: list[tuple] = []
    for key, acc in accumulated.items():
        lt = sum(acc["lg_time"]) / len(acc["lg_time"])
        gt = sum(acc["gm_time"]) / len(acc["gm_time"])
        ltok = sum(acc["lg_tok"]) / len(acc["lg_tok"])
        gtok = sum(acc["gm_tok"]) / len(acc["gm_tok"])
        winner = "gMAS" if gt < lt else "LangGraph"
        print(f" {key:<16} {lt:>7.2f}s {gt:>8.2f}s {ltok:>7.0f} {gtok:>8.0f} {winner}")
        avg_rows.append((key, lt, gt, ltok, gtok))

    if avg_rows:
        avg_time_delta = sum((r[1] - r[2]) / max(r[1], 1e-9) for r in avg_rows) / len(avg_rows) * 100
        avg_tok_delta = sum((r[3] - r[4]) / max(r[3], 1) for r in avg_rows) / len(avg_rows) * 100
        print(f"\n Average improvement for gMAS vs LangGraph ({num_runs} runs):")
        print(f"   time  : {avg_time_delta:+.1f}% (positive = gMAS faster)")
        print(f"   tokens: {avg_tok_delta:+.1f}% (positive = gMAS cheaper)")

    log_file = _save_log(model, base_url, all_results)
    print(f"\n Results saved -> {log_file}")
    return {
        "timestamp": datetime.now(UTC).isoformat(),
        "model": model,
        "base_url": base_url,
        "problem": MATH_PROBLEM,
        "num_runs": num_runs,
        "results": all_results,
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Benchmark gMAS vs LangGraph")
    parser.add_argument(
        "--runs",
        type=int,
        default=1,
        metavar="N",
        help="Number of full benchmark passes (default: 1). Results are averaged.",
    )
    args = parser.parse_args()
    run_benchmark(num_runs=args.runs)