gMAS / examples /benchmark_vs_langgraph.py
Артём Боярских
chore: initial commit
3193174
"""
Benchmark: gMAS vs LangGraph
=============================
Runs equivalent workflows on both frameworks and compares
execution time and token usage:
Test 1 — Single agent (Math Solver)
Test 2 — Chain of 3 (Analyzer → Solver → Formatter)
Test 3 — Fan-in (2→1) (Algebraic + Vieta → Aggregator)
Test 4 — Chain of 7 (Parser → Classifier → Strategist → Solver → Verifier → Interpreter → Reporter)
Test 5 — Fan-out (1→3→1) (Planner → [Algebraic, Graphical, Numerical] → Synthesizer)
Test 6 — Chain of 15 (Reader → Extractor → Categorizer → Assumptions → Planner → Modeler →
Simplifier → Solver → UnitChecker → VerifierA → VerifierB →
Interpreter → EdgeCases → Reviewer → Reporter)
gMAS uses the modern ``structured_llm_caller`` API so the LLM receives
proper ``system`` / ``user`` roles natively (no flat-string hacks).
For your own projects prefer ``MACPRunner(structured_llm_caller=...)``
over the legacy ``llm_caller(str)->str`` interface — see the
``MACPRunner`` docstring for a full example.
Configure your LLM via environment variables:
LLM_API_KEY – API key for your LLM provider
LLM_BASE_URL – Base URL (e.g. http://localhost:8000/v1)
LLM_MODEL – Model name (e.g. gpt-4o-mini)
Run (single pass):
python -m examples.benchmark_vs_langgraph
Run N passes and average the results:
python -m examples.benchmark_vs_langgraph --runs 10
"""
import argparse
import asyncio
import json
import os
import sys
import time
from datetime import UTC, datetime
from pathlib import Path
from unicodedata import normalize
from openai import AsyncOpenAI, OpenAI
from builder import BuilderConfig, GraphBuilder
from execution import MACPRunner, RunnerConfig, StreamEventType
# ── Helpers ───────────────────────────────────────────────────────────────────
def _safe(text: str) -> str:
    """Return an ASCII-only rendering of *text* for consoles with limited encodings.

    The text is NFKC-normalized first, so compatibility characters
    (ligatures, superscripts, circled digits, ...) fold to their plain
    equivalents where one exists. Whatever is still outside ASCII after
    that is replaced with ``?``.
    """
    folded = normalize("NFKC", text)
    ascii_bytes = folded.encode("ascii", errors="replace")
    return ascii_bytes.decode("ascii")
def _header(title: str) -> None:
    """Print *title* framed by two 64-character horizontal rules.

    A blank line is emitted before the top rule so consecutive sections
    are visually separated.
    """
    rule = "─" * 64
    print(f"\n{rule}", f" {title}", rule, sep="\n")
def _validate_llm_config() -> tuple[str, str, str]:
    """Fetch the LLM connection settings from the environment.

    Returns:
        ``(api_key, base_url, model)`` read from ``LLM_API_KEY``,
        ``LLM_BASE_URL`` and ``LLM_MODEL``.

    Exits:
        Calls ``sys.exit(1)`` after printing a setup hint to stderr when
        any of the three variables is unset or empty.
    """
    required = ("LLM_API_KEY", "LLM_BASE_URL", "LLM_MODEL")
    values = {name: os.getenv(name) for name in required}

    # An empty string counts as "not set", same as a missing variable.
    unset = [name for name in required if not values[name]]
    if not unset:
        return values["LLM_API_KEY"], values["LLM_BASE_URL"], values["LLM_MODEL"]  # type: ignore[return-value]

    help_lines = (
        "ERROR: Required environment variables are not set.",
        f" Missing: {', '.join(unset)}",
        "",
        "Set them before running:",
        " export LLM_API_KEY=your-api-key",
        " export LLM_BASE_URL=http://localhost:8000/v1",
        " export LLM_MODEL=gpt-4o-mini",
    )
    for line in help_lines:
        print(line, file=sys.stderr)
    sys.exit(1)
# ── Token-tracking LLM wrapper ────────────────────────────────────────────────
class TrackedLLM:
    """
    OpenAI-compatible chat client that counts tokens and calls.

    Every LLM request made by either side of the benchmark goes through
    one instance of this class, so both frameworks use the same HTTP
    clients and model, and ``total_tokens`` / ``call_count`` can be
    compared directly.

    This wrapper exists for measurement only. In your own projects the
    built-in factory helpers can be used instead::

        from execution import (
            create_openai_structured_caller,
            create_openai_async_structured_caller,
        )

        runner = MACPRunner(
            structured_llm_caller=create_openai_structured_caller(...),
            async_structured_llm_caller=create_openai_async_structured_caller(...),
        )
    """

    def __init__(self, api_key: str, base_url: str, model: str) -> None:
        self._client = OpenAI(api_key=api_key, base_url=base_url)
        self._async_client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self._model = model
        # Running totals since construction or the last reset().
        self.total_tokens: int = 0
        self.call_count: int = 0

    # ------------------------------------------------------------------
    def reset(self) -> None:
        """Zero the token and call counters."""
        self.total_tokens = 0
        self.call_count = 0

    # ------------------------------------------------------------------
    # Shared plumbing for the four public entry points below.
    # ------------------------------------------------------------------
    @staticmethod
    def _as_messages(system: str, user: str) -> list[dict[str, str]]:
        """Build the chat message list; the system message is omitted when empty."""
        user_message = {"role": "user", "content": user}
        if not system:
            return [user_message]
        return [{"role": "system", "content": system}, user_message]

    def _request_kwargs(self, messages: list[dict[str, str]], max_tokens: int) -> dict:
        """Keyword arguments shared by every chat-completion request."""
        return {
            "model": self._model,
            "messages": messages,
            "temperature": 0.7,
            "max_tokens": max_tokens,
        }

    def _consume(self, response) -> str:
        """Extract the reply text from *response* and update the counters."""
        # A missing/None content is reported as an empty string.
        text = response.choices[0].message.content or ""
        # Responses without a usage record contribute zero tokens.
        self.total_tokens += response.usage.total_tokens if response.usage else 0
        self.call_count += 1
        return text

    # ------------------------------------------------------------------
    def chat(self, system: str, user: str, max_tokens: int = 1024) -> str:
        """Send one system + user exchange synchronously and return the reply text."""
        kwargs = self._request_kwargs(self._as_messages(system, user), max_tokens)
        return self._consume(self._client.chat.completions.create(**kwargs))

    # ------------------------------------------------------------------
    async def achat(self, system: str, user: str, max_tokens: int = 1024) -> str:
        """Awaitable counterpart of :meth:`chat`, using the async client."""
        kwargs = self._request_kwargs(self._as_messages(system, user), max_tokens)
        return self._consume(await self._async_client.chat.completions.create(**kwargs))

    # ------------------------------------------------------------------
    # gMAS caller factories (with token tracking)
    #
    # These mirror the framework's ``create_openai_structured_caller`` /
    # ``create_openai_async_structured_caller`` but add the per-call token
    # and call-count tracking the benchmark needs.
    # ------------------------------------------------------------------
    def structured_caller(self, max_tokens: int = 1024):
        """Return a sync ``messages -> str`` callable suitable as ``structured_llm_caller``."""

        def _caller(messages: list[dict[str, str]]) -> str:
            kwargs = self._request_kwargs(messages, max_tokens)
            return self._consume(self._client.chat.completions.create(**kwargs))

        return _caller

    def async_structured_caller(self, max_tokens: int = 1024):
        """Return an async ``messages -> str`` callable suitable as ``async_structured_llm_caller``."""

        async def _caller(messages: list[dict[str, str]]) -> str:
            kwargs = self._request_kwargs(messages, max_tokens)
            return self._consume(await self._async_client.chat.completions.create(**kwargs))

        return _caller
# ── gMAS graph builder helper ─────────────────────────────────────────────────
def _run_gmas(
    llm: TrackedLLM,
    build_fn,
    final_agent_id: str,
    max_tokens: int = 1024,
    *,
    parallel: bool = False,
) -> dict:
    """
    Build a gMAS graph, run it through ``MACPRunner`` and report the metrics.

    The timer starts before ``build_fn()`` is called, so graph construction
    is included in the measured time. Two execution paths exist:

    * ``parallel=False`` - the runner's synchronous ``stream()`` is
      consumed; used for the single-agent and chain tests.
    * ``parallel=True`` - the runner's ``astream()`` is consumed under
      ``asyncio.run`` with ``enable_parallel=True`` in the runner config;
      used for the fan-in / fan-out tests.

    Both paths set ``adaptive=False``. The reported output is the content
    of the last ``AGENT_OUTPUT`` event emitted for ``final_agent_id``
    (an empty string when no such event is seen).

    Args:
        llm: Tracked LLM client; its counters are reset at the start.
        build_fn: Zero-argument callable returning the graph to execute.
        final_agent_id: ID of the agent whose output is the final answer.
        max_tokens: Per-call output token limit handed to the LLM callers.
        parallel: Select the async/parallel execution path.

    Returns:
        Dict with keys ``framework``, ``time``, ``tokens``, ``calls``
        and ``output``.
    """

    def _pick_output(event, current: str) -> str:
        # Only AGENT_OUTPUT events from the final agent replace the answer.
        is_final = (
            event.event_type == StreamEventType.AGENT_OUTPUT
            and getattr(event, "agent_id", "") == final_agent_id
        )
        return getattr(event, "content", "") if is_final else current

    llm.reset()
    started = time.perf_counter()
    graph = build_fn()

    if not parallel:
        # Synchronous path: single agent and chains.
        runner = MACPRunner(
            structured_llm_caller=llm.structured_caller(max_tokens=max_tokens),
            config=RunnerConfig(
                timeout=180.0,
                adaptive=False,
                update_states=True,
                broadcast_task_to_all=False,
            ),
        )
        output = ""
        for event in runner.stream(graph, final_agent_id=final_agent_id):
            output = _pick_output(event, output)
    else:
        # Async path with parallel execution enabled: fan-in / fan-out.
        runner = MACPRunner(
            structured_llm_caller=llm.structured_caller(max_tokens=max_tokens),
            async_structured_llm_caller=llm.async_structured_caller(max_tokens=max_tokens),
            config=RunnerConfig(
                timeout=180.0,
                adaptive=False,
                enable_parallel=True,
                update_states=True,
                broadcast_task_to_all=False,
            ),
        )

        async def _collect() -> str:
            answer = ""
            async for event in runner.astream(graph, final_agent_id=final_agent_id):
                answer = _pick_output(event, answer)
            return answer

        output = asyncio.run(_collect())

    elapsed = time.perf_counter() - started
    return {
        "framework": "gMAS",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": output,
    }
# ── Test 1: Single agent ──────────────────────────────────────────────────────
def _build_single_gmas(llm: TrackedLLM, problem: str):
    """Test 1, gMAS side: one "solver" agent fed directly by the task node.

    Returns the result dict produced by ``_run_gmas``.
    """

    def _graph():
        gb = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        gb.add_task(query=problem, description="Math problem")
        solver_spec = {
            "agent_id": "solver",
            "display_name": "Math Solver",
            # Per the benchmark author's note, the framework prefixes the
            # persona with "You are ", so only the role itself is given here.
            "persona": "a mathematician who solves problems step by step",
            "description": "Solve the equation and state both roots clearly.",
        }
        gb.add_agent(**solver_spec)
        gb.connect_task_to_agents(agent_ids=["solver"], bidirectional=False)
        return gb.build()

    return _run_gmas(llm, _graph, "solver")
def _build_single_langgraph(llm: TrackedLLM, problem: str) -> dict:
    """Test 1, LangGraph side: a one-node graph whose node calls ``llm.chat``.

    Returns a result dict (``framework``, ``time``, ``tokens``, ``calls``,
    ``output``), or a dict carrying an ``error`` key when ``langgraph``
    cannot be imported.
    """
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed β€” pip install langgraph"}

    class State(TypedDict):
        input: str
        output: str

    def solver_node(state: State) -> dict:
        answer = llm.chat(
            "You are a mathematician. Solve problems step by step.",
            state["input"],
        )
        return {"output": answer}

    llm.reset()
    started = time.perf_counter()

    workflow = StateGraph(cast("Any", State))
    workflow.add_node("solver", solver_node)
    workflow.set_entry_point("solver")
    workflow.set_finish_point("solver")
    final_state = workflow.compile().invoke({"input": problem, "output": ""})

    elapsed = time.perf_counter() - started
    return {
        "framework": "langgraph",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": final_state["output"],
    }
# ── Test 2: Chain of 3 agents ─────────────────────────────────────────────────
def _build_chain_gmas(llm: TrackedLLM, problem: str) -> dict:
    """Test 2, gMAS side: task -> analyzer -> solver -> formatter."""

    # (agent_id, display_name, persona, description), in execution order.
    stages = (
        ("analyzer", "Analyzer", "an analyst", "Identify the equation type and best solution method."),
        ("solver", "Solver", "a mathematician", "Solve the problem step by step."),
        ("formatter", "Formatter", "a presenter", "Format the final answer clearly and concisely."),
    )

    def _graph():
        gb = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        gb.add_task(query=problem, description="Math problem")
        for stage in stages:
            gb.add_agent(*stage)
        # Only the first stage is wired to the task node.
        gb.connect_task_to_agents(agent_ids=["analyzer"], bidirectional=False)
        for upstream, downstream in zip(stages, stages[1:]):
            gb.add_workflow_edge(upstream[0], downstream[0])
        return gb.build()

    return _run_gmas(llm, _graph, "formatter")
def _build_chain_langgraph(llm: TrackedLLM, problem: str) -> dict:
    """Test 2, LangGraph side: analyzer -> solver -> formatter.

    Each node writes its reply into its own state key; the next node reads
    it from there. Returns a result dict, or an ``error`` dict when
    ``langgraph`` cannot be imported.
    """
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed β€” pip install langgraph"}

    class State(TypedDict):
        input: str
        step1: str
        step2: str
        output: str

    def analyzer(s: State) -> dict:
        analysis = llm.chat(
            "Identify the equation type and best solution method.",
            f"Problem: {s['input']}",
        )
        return {"step1": analysis}

    def solver(s: State) -> dict:
        solution = llm.chat(
            "Solve step by step.",
            f"Problem: {s['input']}\nAnalysis: {s['step1']}",
        )
        return {"step2": solution}

    def formatter(s: State) -> dict:
        formatted = llm.chat("Format the answer clearly.", f"Solution: {s['step2']}")
        return {"output": formatted}

    llm.reset()
    started = time.perf_counter()

    workflow = StateGraph(cast("Any", State))
    workflow.add_node("analyzer", analyzer)
    workflow.add_node("solver", solver)
    workflow.add_node("formatter", formatter)
    workflow.add_edge("analyzer", "solver")
    workflow.add_edge("solver", "formatter")
    workflow.set_entry_point("analyzer")
    workflow.set_finish_point("formatter")

    blank_state = {"input": problem, "step1": "", "step2": "", "output": ""}
    final_state = workflow.compile().invoke(blank_state)

    elapsed = time.perf_counter() - started
    return {
        "framework": "langgraph",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": final_state["output"],
    }
# ── Test 3: Fan-in (2→1) ──────────────────────────────────────────────────────
#
# Topology: [Algebraic, Vieta] → Aggregator
#
# Both solvers receive only the task input (not each other's output).
# gMAS uses astream() with enable_parallel=True (adaptive=False),
# so [algebraic, vieta] are executed concurrently via asyncio.gather —
# matching LangGraph's START → [algebraic, vieta] → aggregator branches.
# Fair data-flow comparison.
def _build_fanin_gmas(llm: TrackedLLM, problem: str) -> dict:
    """Test 3, gMAS side: task -> [algebraic, vieta] -> aggregator.

    Runs through ``_run_gmas`` with ``parallel=True``.
    """

    # (agent_id, display_name, persona, description)
    solvers = (
        ("algebraic", "Algebraic Solver", "an algebraist", "Solve using the discriminant formula (D = bΒ²-4ac)."),
        ("vieta", "Vieta Solver", "a Vieta specialist", "Solve using Vieta's formulas (sum and product of roots)."),
    )
    aggregator = (
        "aggregator",
        "Aggregator",
        "a verifier",
        "Compare both solutions and provide the definitive answer.",
    )

    def _graph():
        gb = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        gb.add_task(query=problem, description="Math problem")
        for spec in (*solvers, aggregator):
            gb.add_agent(*spec)
        # Both solvers hang off the task node; neither sees the other's output.
        gb.connect_task_to_agents(agent_ids=["algebraic", "vieta"], bidirectional=False)
        for solver_spec in solvers:
            gb.add_workflow_edge(solver_spec[0], "aggregator")
        return gb.build()

    return _run_gmas(llm, _graph, "aggregator", parallel=True)
def _build_fanin_langgraph(llm: TrackedLLM, problem: str) -> dict:
    """Test 3, LangGraph side: START -> [algebraic, vieta] -> aggregator -> END.

    Both solver nodes read only ``input``; the aggregator reads both
    solver results. Returns a result dict, or an ``error`` dict when
    ``langgraph`` cannot be imported.
    """
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import END, START, StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed β€” pip install langgraph"}

    class State(TypedDict):
        input: str
        method1: str
        method2: str
        output: str

    def algebraic(s: State) -> dict:
        reply = llm.chat(
            "Solve using the discriminant formula (D = bΒ²-4ac).",
            f"Problem: {s['input']}",
        )
        return {"method1": reply}

    def vieta(s: State) -> dict:
        reply = llm.chat("Solve using Vieta's formulas.", f"Problem: {s['input']}")
        return {"method2": reply}

    def aggregator(s: State) -> dict:
        both_methods = f"Method 1 (discriminant):\n{s['method1']}\n\nMethod 2 (Vieta):\n{s['method2']}"
        verdict = llm.chat(
            "Compare both solutions and provide the definitive answer.",
            both_methods,
        )
        return {"output": verdict}

    llm.reset()
    started = time.perf_counter()

    workflow = StateGraph(cast("Any", State))
    workflow.add_node("algebraic", algebraic)
    workflow.add_node("vieta", vieta)
    workflow.add_node("aggregator", aggregator)
    # Two branches leave START, one per solver, and both join at the
    # aggregator - mirroring the gMAS topology task -> [algebraic, vieta]
    # -> aggregator.
    workflow.add_edge(START, "algebraic")
    workflow.add_edge(START, "vieta")
    workflow.add_edge("algebraic", "aggregator")
    workflow.add_edge("vieta", "aggregator")
    workflow.add_edge("aggregator", END)

    blank_state = {"input": problem, "method1": "", "method2": "", "output": ""}
    final_state = workflow.compile().invoke(blank_state)

    elapsed = time.perf_counter() - started
    return {
        "framework": "langgraph",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": final_state["output"],
    }
# ── Test 4: Chain of 7 agents ─────────────────────────────────────────────────
# parser → classifier → strategist → solver → verifier → interpreter → reporter
# Stage table for the 7-agent chain, in execution order.
# Each entry is (agent_id, display_name, persona, description).
_CHAIN7_AGENTS = [
    ("parser", "Parser", "a problem parser",
     "Restate the math problem clearly, identify all given values and unknowns."),
    ("classifier", "Classifier", "an equation classifier",
     "Identify the equation type (linear, quadratic, etc.) and its key properties (degree, discriminant sign)."),
    ("strategist", "Strategist", "a solution strategist",
     "Choose the best solution method and outline the plan step-by-step before any calculation."),
    ("solver", "Solver", "a mathematician",
     "Execute the chosen method and compute the roots with full working shown."),
    ("verifier", "Verifier", "a solution verifier",
     "Substitute each root back into the original equation and confirm both sides equal zero."),
    ("interpreter", "Interpreter", "a math interpreter",
     "Explain what the roots mean in the context of the problem (real/complex, sign, magnitude)."),
    ("reporter", "Reporter", "a technical writer",
     "Write the final concise answer: state both roots, verify they are correct, one sentence summary."),
]
def _build_chain7_gmas(llm: TrackedLLM, problem: str) -> dict:
    """Test 4, gMAS side: the seven ``_CHAIN7_AGENTS`` stages wired as a linear chain."""
    stage_ids = [spec[0] for spec in _CHAIN7_AGENTS]

    def _graph():
        gb = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        gb.add_task(query=problem, description="Math problem")
        for spec in _CHAIN7_AGENTS:
            gb.add_agent(*spec)
        # The task node feeds only the head of the chain.
        gb.connect_task_to_agents(agent_ids=[stage_ids[0]], bidirectional=False)
        for upstream, downstream in zip(stage_ids, stage_ids[1:]):
            gb.add_workflow_edge(upstream, downstream)
        return gb.build()

    # max_tokens=512 per agent call, the same limit the LangGraph variant
    # of this test passes to llm.chat.
    return _run_gmas(llm, _graph, "reporter", max_tokens=512)
def _build_chain7_langgraph(llm: TrackedLLM, problem: str) -> dict:
    """Test 4, LangGraph side: the seven ``_CHAIN7_AGENTS`` stages as a linear graph.

    The state has one string field per stage plus ``input``. Every node
    sees the problem and the previous stage's reply (the first node sees
    the problem only) and writes its own reply under its stage id.
    Returns a result dict, or an ``error`` dict when ``langgraph`` cannot
    be imported.
    """
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed β€” pip install langgraph"}

    keys = ["input", *(spec[0] for spec in _CHAIN7_AGENTS)]
    State = TypedDict("State", {key: str for key in keys})  # type: ignore[misc]
    # Each stage's description doubles as its system prompt.
    system_prompts = {spec[0]: spec[3] for spec in _CHAIN7_AGENTS}

    def _make_node(agent_id: str, prev_key: str, out_key: str):
        def _node(s):
            if prev_key == "input":
                user_msg = f"Problem: {s['input']}"
            else:
                user_msg = f"Problem: {s['input']}\n\nPrevious step ({prev_key}):\n{s[prev_key]}"
            # max_tokens=512: same per-call limit as the gMAS variant.
            return {out_key: llm.chat(system_prompts[agent_id], user_msg, max_tokens=512)}

        _node.__name__ = agent_id
        return _node

    llm.reset()
    started = time.perf_counter()

    workflow = StateGraph(cast("Any", State))
    stage_ids = [spec[0] for spec in _CHAIN7_AGENTS]
    upstream = "input"
    for stage in stage_ids:
        workflow.add_node(stage, _make_node(stage, upstream, stage))
        upstream = stage
    for src, dst in zip(stage_ids, stage_ids[1:]):
        workflow.add_edge(src, dst)
    workflow.set_entry_point(stage_ids[0])
    workflow.set_finish_point(stage_ids[-1])

    init_state = {key: "" for key in keys}
    init_state["input"] = problem
    final_state = workflow.compile().invoke(init_state)

    elapsed = time.perf_counter() - started
    return {
        "framework": "langgraph",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": final_state["reporter"],
    }
# ── Test 5: Fan-out (1→3→1) ───────────────────────────────────────────────────
#
# Topology: Planner → [Algebraic, Graphical, Numerical] → Synthesizer
#
# Planner runs first, then three solvers (each sees planner output + task),
# finally Synthesizer aggregates all three.
# gMAS uses astream() with enable_parallel=True (adaptive=False),
# so [algebraic, graphical, numerical] are executed concurrently via
# asyncio.gather — matching LangGraph's fan-out/fan-in branches.
# Fair data-flow comparison.
def _build_fanout_gmas(llm: TrackedLLM, problem: str) -> dict:
    """Test 5, gMAS side: task -> planner -> [algebraic, graphical, numerical] -> synthesizer.

    Runs through ``_run_gmas`` with ``parallel=True``.
    """

    # (agent_id, display_name, persona, description), registered in this order.
    planner = (
        "planner",
        "Planner",
        "a solution planner",
        "Analyze the equation, identify its type, and outline three independent solution strategies.",
    )
    solvers = (
        (
            "algebraic",
            "Algebraic Solver",
            "an algebraist",
            "Solve using the quadratic formula (discriminant method). Show full working.",
        ),
        (
            "graphical",
            "Graphical Solver",
            "a graph analyst",
            "Describe how to find roots graphically: vertex, axis of symmetry, x-intercepts.",
        ),
        (
            "numerical",
            "Numerical Solver",
            "a numerical methods expert",
            "Solve using a numerical approach (e.g. Newton's method). Show iterations.",
        ),
    )
    synthesizer = (
        "synthesizer",
        "Synthesizer",
        "a result synthesizer",
        "Compare all three solutions, verify consistency, and provide the definitive answer.",
    )

    def _graph():
        gb = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        gb.add_task(query=problem, description="Math problem")
        for spec in (planner, *solvers, synthesizer):
            gb.add_agent(*spec)
        gb.connect_task_to_agents(agent_ids=["planner"], bidirectional=False)
        solver_ids = [spec[0] for spec in solvers]
        # Fan-out: planner feeds each solver.
        for solver_id in solver_ids:
            gb.add_workflow_edge("planner", solver_id)
        # Fan-in: each solver feeds the synthesizer.
        for solver_id in solver_ids:
            gb.add_workflow_edge(solver_id, "synthesizer")
        return gb.build()

    return _run_gmas(llm, _graph, "synthesizer", parallel=True)
def _build_fanout_langgraph(llm: TrackedLLM, problem: str) -> dict:
    """Test 5, LangGraph side: START -> planner -> three solvers -> synthesizer -> END.

    Every solver node reads the problem and the planner's plan from state;
    the synthesizer reads the three solver results. Returns a result dict,
    or an ``error`` dict when ``langgraph`` cannot be imported.
    """
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import END, START, StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed β€” pip install langgraph"}

    class State(TypedDict):
        input: str
        plan: str
        algebraic: str
        graphical: str
        numerical: str
        output: str

    def _problem_with_plan(s: State) -> str:
        # User message shared by the three solver nodes.
        return f"Problem: {s['input']}\n\nPlan:\n{s['plan']}"

    def planner(s: State) -> dict:
        plan = llm.chat(
            "Analyze the equation, identify its type, and outline three independent solution strategies.",
            f"Problem: {s['input']}",
        )
        return {"plan": plan}

    def algebraic(s: State) -> dict:
        reply = llm.chat(
            "Solve using the quadratic formula (discriminant method). Show full working.",
            _problem_with_plan(s),
        )
        return {"algebraic": reply}

    def graphical(s: State) -> dict:
        reply = llm.chat(
            "Describe how to find roots graphically: vertex, axis of symmetry, x-intercepts.",
            _problem_with_plan(s),
        )
        return {"graphical": reply}

    def numerical(s: State) -> dict:
        reply = llm.chat(
            "Solve using a numerical approach (e.g. Newton's method). Show iterations.",
            _problem_with_plan(s),
        )
        return {"numerical": reply}

    def synthesizer(s: State) -> dict:
        all_three = f"Algebraic:\n{s['algebraic']}\n\nGraphical:\n{s['graphical']}\n\nNumerical:\n{s['numerical']}"
        verdict = llm.chat(
            "Compare all three solutions, verify consistency, and provide the definitive answer.",
            all_three,
        )
        return {"output": verdict}

    llm.reset()
    started = time.perf_counter()

    workflow = StateGraph(cast("Any", State))
    workflow.add_node("planner", planner)
    workflow.add_node("algebraic", algebraic)
    workflow.add_node("graphical", graphical)
    workflow.add_node("numerical", numerical)
    workflow.add_node("synthesizer", synthesizer)

    # Same shape as the gMAS graph:
    #   planner -> [algebraic, graphical, numerical] -> synthesizer
    # Solvers get the planner's output through state, never a sibling's.
    solver_names = ("algebraic", "graphical", "numerical")
    workflow.add_edge(START, "planner")
    for name in solver_names:
        workflow.add_edge("planner", name)
    for name in solver_names:
        workflow.add_edge(name, "synthesizer")
    workflow.add_edge("synthesizer", END)

    blank_state = {
        "input": problem,
        "plan": "",
        "algebraic": "",
        "graphical": "",
        "numerical": "",
        "output": "",
    }
    final_state = workflow.compile().invoke(blank_state)

    elapsed = time.perf_counter() - started
    return {
        "framework": "langgraph",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": final_state["output"],
    }
# ── Test 6: Chain of 15 agents ────────────────────────────────────────────────
#
# reader → extractor → categorizer → assumptions → planner → modeler →
# simplifier → solver → unit_check → verifier_a → verifier_b →
# interpreter → edge_cases → reviewer → reporter
#
# Task: classic two-trains word problem.
# The correct numerical answer emerges as early as the "solver" step;
# subsequent agents verify, interpret and summarise it.
# Goal: measure raw throughput (time + tokens) of a 15-node sequential chain.
# Word problem used by the 15-agent chain test; sentences are joined with
# single spaces into one prompt string.
TRAIN_PROBLEM = " ".join(
    (
        "Two trains leave stations A and B at the same time, travelling towards each other.",
        "The distance between the stations is 300 km.",
        "Train A travels at 60 km/h and Train B at 80 km/h.",
        "When do they meet, and how far from station A is the meeting point?",
    )
)
# Stage table for the 15-agent chain, in execution order.
# Each entry is (agent_id, display_name, persona, description).
_CHAIN15_AGENTS = [
    ("reader", "Reader", "a careful problem reader",
     "Read the problem and restate it in your own words without changing any numbers or facts."),
    ("extractor", "Extractor", "a data extractor",
     "List every given value and unknown from the problem as bullet points."),
    ("categorizer", "Categorizer", "a problem categorizer",
     "Identify the mathematical category (e.g. kinematics, relative motion) and the specific sub-type."),
    ("assumptions", "Assumptions", "an assumption analyst",
     "State all implicit assumptions (constant speed, straight track, simultaneous departure, point-like trains)."),
    ("planner", "Planner", "a solution planner",
     "Write a numbered step-by-step plan to solve the problem before doing any arithmetic."),
    ("modeler", "Modeler", "a mathematical modeler",
     "Translate the plan into equations using variables (e.g. let t = time in hours, d = distance from A)."),
    ("simplifier", "Simplifier", "an algebra simplifier",
     "Simplify the system of equations down to a single equation in one unknown. Show every step."),
    ("solver", "Solver", "a mathematician",
     "Solve the simplified equation and compute the numerical answer (time and distance). Show full working."),
    ("unit_check", "UnitChecker", "a dimensional analyst",
     "Verify the units of the answer are consistent (km, h, km/h) and that the magnitudes are physically sensible."),
    ("verifier_a", "VerifierA", "a solution verifier",
     "Substitute the answer back into the original equations and confirm both sides are equal."),
    ("verifier_b", "VerifierB", "a cross-checker",
     "Use an alternative approach (closing-speed formula: t = D / (v1+v2)) to independently verify the meeting time."),
    ("interpreter", "Interpreter", "a results interpreter",
     "Express the answer in plain language: time in hours and minutes, distance from each station."),
    ("edge_cases", "EdgeCases", "an edge-case analyst",
     "Briefly discuss how the answer changes if one train is faster or the stations are farther apart (1–2 sentences)."),
    ("reviewer", "Reviewer", "a critical reviewer",
     "Review all previous steps for arithmetic errors or logical gaps and flag any issues found."),
    ("reporter", "Reporter", "a technical writer",
     "Write the final concise answer: meeting time, meeting point distance from A, one-sentence summary."),
]
def _build_chain15_gmas(llm: TrackedLLM, problem: str) -> dict:
    """Test 6, gMAS side: the fifteen ``_CHAIN15_AGENTS`` stages wired as a linear chain."""
    stage_ids = [spec[0] for spec in _CHAIN15_AGENTS]

    def _graph():
        gb = GraphBuilder(BuilderConfig(include_task_node=True, validate=True))
        gb.add_task(query=problem, description="Word problem")
        for spec in _CHAIN15_AGENTS:
            gb.add_agent(*spec)
        # The task node feeds only the head of the chain.
        gb.connect_task_to_agents(agent_ids=[stage_ids[0]], bidirectional=False)
        for upstream, downstream in zip(stage_ids, stage_ids[1:]):
            gb.add_workflow_edge(upstream, downstream)
        return gb.build()

    # max_tokens=256 per agent call (15 calls x 256), the same limit the
    # LangGraph variant of this test passes to llm.chat.
    return _run_gmas(llm, _graph, "reporter", max_tokens=256)
def _build_chain15_langgraph(llm: TrackedLLM, problem: str) -> dict:
    """Test 6, LangGraph side: the fifteen ``_CHAIN15_AGENTS`` stages as a linear graph.

    The state has one string field per stage plus ``input``. Every node
    sees the problem and the previous stage's reply (the first node sees
    the problem only) and writes its own reply under its stage id.
    Returns a result dict, or an ``error`` dict when ``langgraph`` cannot
    be imported.
    """
    try:
        from typing import Any, TypedDict, cast

        from langgraph.graph import StateGraph
    except ImportError:
        return {"framework": "langgraph", "error": "langgraph not installed β€” pip install langgraph"}

    keys = ["input", *(spec[0] for spec in _CHAIN15_AGENTS)]
    State = TypedDict("State", {key: str for key in keys})  # type: ignore[misc]
    # Each stage's description doubles as its system prompt.
    system_prompts = {spec[0]: spec[3] for spec in _CHAIN15_AGENTS}

    def _make_node(agent_id: str, prev_key: str, out_key: str):
        def _node(s):
            if prev_key == "input":
                user_msg = f"Problem: {s['input']}"
            else:
                user_msg = f"Problem: {s['input']}\n\nPrevious step ({prev_key}):\n{s[prev_key]}"
            # max_tokens=256: same per-call limit as the gMAS variant.
            return {out_key: llm.chat(system_prompts[agent_id], user_msg, max_tokens=256)}

        _node.__name__ = agent_id
        return _node

    llm.reset()
    started = time.perf_counter()

    workflow = StateGraph(cast("Any", State))
    stage_ids = [spec[0] for spec in _CHAIN15_AGENTS]
    upstream = "input"
    for stage in stage_ids:
        workflow.add_node(stage, _make_node(stage, upstream, stage))
        upstream = stage
    for src, dst in zip(stage_ids, stage_ids[1:]):
        workflow.add_edge(src, dst)
    workflow.set_entry_point(stage_ids[0])
    workflow.set_finish_point(stage_ids[-1])

    init_state = {key: "" for key in keys}
    init_state["input"] = problem
    final_state = workflow.compile().invoke(init_state)

    elapsed = time.perf_counter() - started
    return {
        "framework": "langgraph",
        "time": elapsed,
        "tokens": llm.total_tokens,
        "calls": llm.call_count,
        "output": final_state["reporter"],
    }
# ── Results printer ───────────────────────────────────────────────────────────
def _print_pair(lg: dict, gmas: dict) -> None:
    """Print the LangGraph-vs-gMAS comparison for one test.

    Args:
        lg: LangGraph result dict. When it carries an ``error`` key the
            LangGraph side is reported as skipped and only the gMAS
            numbers are printed.
        gmas: gMAS result dict with ``time``, ``tokens`` and ``calls``.

    The delta row is the relative saving of gMAS against LangGraph in
    percent (positive means gMAS used less). The winner line names the
    framework with the strictly smaller value; equal values are reported
    as ``tie`` rather than being credited to either side.
    """

    def _winner(gmas_value, lg_value) -> str:
        if gmas_value < lg_value:
            return "gMAS"
        if gmas_value > lg_value:
            return "langgraph"
        return "tie"

    lg_err = lg.get("error")
    if lg_err:
        print(f" [SKIP] LangGraph β€” {lg_err}")
        print(f" [gMAS] {gmas['time']:>6.2f}s {gmas['tokens']:>6} tok {gmas['calls']:>2} calls")
        return

    lg_time, lg_tok = lg["time"], lg["tokens"]
    gm_time, gm_tok = gmas["time"], gmas["tokens"]

    # Denominators are clamped so a zero LangGraph measurement cannot
    # cause a division by zero.
    time_delta = (lg_time - gm_time) / max(lg_time, 1e-9) * 100
    tok_delta = (lg_tok - gm_tok) / max(lg_tok, 1) * 100
    time_winner = _winner(gm_time, lg_time)
    tok_winner = _winner(gm_tok, lg_tok)

    print(f" {'Framework':<14} {'Time':>8} {'Tokens':>8} {'Calls':>5}")
    print(f" {'─' * 14} {'─' * 8} {'─' * 8} {'─' * 5}")
    print(f" {'LangGraph':<14} {lg_time:>7.2f}s {lg_tok:>8} {lg['calls']:>5}")
    print(f" {'gMAS':<14} {gm_time:>7.2f}s {gm_tok:>8} {gmas['calls']:>5}")
    print(f" {'delta':<14} {time_delta:>+7.1f}% {tok_delta:>+7.1f}%")
    print(f" Winner: time={time_winner} tokens={tok_winner}")
# ── Main benchmark ────────────────────────────────────────────────────────────
# Quadratic used by tests 1-5.
MATH_PROBLEM = "Solve the equation: 3x^2 - 7x + 2 = 0. Find both roots."

# Benchmark registry, one entry per test, in run order. Each entry is
#   (console title, result key, LangGraph runner, gMAS runner, problem text)
# and both runners are called as ``runner(llm, problem)``.
TESTS = [
    ("Test 1 β€” Single Agent (Math Solver)", "single_agent",
     _build_single_langgraph, _build_single_gmas, MATH_PROBLEM),
    ("Test 2 β€” Chain of 3 (Analyzer -> Solver -> Formatter)", "chain_3",
     _build_chain_langgraph, _build_chain_gmas, MATH_PROBLEM),
    ("Test 3 β€” Fan-in (Algebraic + Vieta -> Aggregator)", "fan_in",
     _build_fanin_langgraph, _build_fanin_gmas, MATH_PROBLEM),
    ("Test 4 β€” Chain-7 (Parser->Classifier->Strategist->Solver->Verifier->Interpreter->Reporter)", "chain_7",
     _build_chain7_langgraph, _build_chain7_gmas, MATH_PROBLEM),
    ("Test 5 β€” Fan-out (Planner -> [Algebraic, Graphical, Numerical] -> Synthesizer)", "fan_out",
     _build_fanout_langgraph, _build_fanout_gmas, MATH_PROBLEM),
    ("Test 6 β€” Chain-15 ("
     "Reader->Extractor->Categorizer->Assumptions->Planner->Modeler->"
     "Simplifier->Solver->UnitChecker->VerifierA->VerifierB->"
     "Interpreter->EdgeCases->Reviewer->Reporter)", "chain_15",
     _build_chain15_langgraph, _build_chain15_gmas, TRAIN_PROBLEM),
]
def _connect_llm(api_key: str, base_url: str, model: str) -> TrackedLLM:
    """Create the shared ``TrackedLLM`` and smoke-test it with a one-word prompt.

    Returns the client once the model has answered with non-blank text.
    Calls ``sys.exit(1)`` after an error message on stderr when the reply
    is blank or when anything inside the check raises an ``Exception``.
    """
    client = TrackedLLM(api_key=api_key, base_url=base_url, model=model)
    print("\n [*] Testing LLM connection...")
    try:
        reply = client.chat("", "Say OK").strip()
        if reply:
            # Only the first 60 characters are echoed, sanitized for the console.
            print(f" [+] LLM OK β€” responded: {_safe(reply[:60])}")
        else:
            print("ERROR: LLM returned an empty response. Check your configuration.", file=sys.stderr)
            sys.exit(1)
    except Exception as exc:
        # SystemExit is not an Exception subclass, so the exit above passes through.
        print(f"ERROR: Cannot connect to LLM: {exc}", file=sys.stderr)
        sys.exit(1)
    return client
def _run_all_tests(llm: TrackedLLM) -> tuple[list[dict], list[tuple]]:
    """
    Run every registered LangGraph/gMAS pair once.

    Args:
        llm: Client handed to each builder together with the problem text.

    Returns:
        ``(results, summary_rows)``. ``results`` holds the LangGraph and gMAS
        result dicts for every test, in that order, each tagged with a
        ``"test"`` key. ``summary_rows`` holds one
        ``(test_key, lg_time, gmas_time, lg_tokens, gmas_tokens)`` tuple per
        test whose LangGraph result carries no ``"error"`` key.
    """
    collected: list[dict] = []
    rows: list[tuple] = []

    for title, test_key, lg_fn, gmas_fn, problem in TESTS:
        _header(title)

        # LangGraph side: may report an error instead of timings.
        print(" Running LangGraph...", end=" ", flush=True)
        lg_result = lg_fn(llm, problem)
        lg_result["test"] = test_key
        if "error" in lg_result:
            print(f"skipped ({lg_result['error']})")
        else:
            print(f"done ({lg_result['time']:.2f}s, {lg_result['tokens']} tok)")

        # gMAS side.
        print(" Running gMAS... ", end=" ", flush=True)
        gmas_result = gmas_fn(llm, problem)
        gmas_result["test"] = test_key
        print(f"done ({gmas_result['time']:.2f}s, {gmas_result['tokens']} tok)")
        print()

        _print_pair(lg_result, gmas_result)
        collected += [lg_result, gmas_result]

        # A failed LangGraph run is logged but left out of the summary table.
        if "error" in lg_result:
            continue
        rows.append(
            (
                test_key,
                lg_result["time"],
                gmas_result["time"],
                lg_result["tokens"],
                gmas_result["tokens"],
            )
        )

    return collected, rows
def _print_summary(summary_rows: list[tuple]) -> None:
"""Print grand summary table with averages."""
if not summary_rows:
return
_header("Summary")
print(f" {'Test':<16} {'LG time':>8} {'gMAS time':>9} {'LG tok':>7} {'gMAS tok':>8} Time winner")
print(f" {'─' * 16} {'─' * 8} {'─' * 9} {'─' * 7} {'─' * 8} {'─' * 12}")
for key, lt, gt, ltok, gtok in summary_rows:
winner = "gMAS" if gt < lt else "LangGraph"
print(f" {key:<16} {lt:>7.2f}s {gt:>8.2f}s {ltok:>7} {gtok:>8} {winner}")
avg_time_delta = sum((r[1] - r[2]) / max(r[1], 1e-9) for r in summary_rows) / len(summary_rows) * 100
avg_tok_delta = sum((r[3] - r[4]) / max(r[3], 1) for r in summary_rows) / len(summary_rows) * 100
print("\n Average improvement for gMAS vs LangGraph:")
print(f" time : {avg_time_delta:+.1f}% (positive = gMAS faster)")
print(f" tokens : {avg_tok_delta:+.1f}% (positive = gMAS cheaper)")
def _save_log(model: str, base_url: str, results: list[dict]) -> Path:
    """
    Write the benchmark results to ``benchmark_logs/benchmark_<UTC stamp>.json``.

    Args:
        model: Model name recorded in the log.
        base_url: Provider base URL recorded in the log.
        results: Result dicts to store under the ``"results"`` key.

    Returns:
        Path of the JSON file that was written.
    """
    payload = {
        "timestamp": datetime.now(UTC).isoformat(),
        "model": model,
        "base_url": base_url,
        "problem": MATH_PROBLEM,
        "results": results,
    }

    # The directory is relative to the current working directory.
    target_dir = Path("benchmark_logs")
    target_dir.mkdir(exist_ok=True)

    stamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
    target = target_dir / f"benchmark_{stamp}.json"

    # ``default=str`` stringifies any value json cannot encode natively.
    serialized = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
    target.write_text(serialized, encoding="utf-8")
    return target
def run_benchmark(num_runs: int = 1) -> dict:
    """
    Run the whole benchmark suite, print comparison tables and save a JSON log.

    Args:
        num_runs: Number of full passes. Values of 1 or less run the suite
            once; larger values repeat it ``num_runs`` times and report
            per-test averages.

    Returns:
        A dict with ``timestamp``, ``model``, ``base_url``, ``problem`` and
        ``results`` (all result dicts from every pass); multi-pass runs also
        include ``num_runs``.
    """
    _header("Benchmark: gMAS vs LangGraph")
    api_key, base_url, model = _validate_llm_config()
    print(f" LLM provider : {base_url}")
    print(f" Model : {model}")
    print(f" Problem : {MATH_PROBLEM}")
    if num_runs > 1:
        print(f" Runs : {num_runs}")

    llm = _connect_llm(api_key, base_url, model)

    # Single pass: run once, summarise, save, return.
    if num_runs <= 1:
        results, summary_rows = _run_all_tests(llm)
        _print_summary(summary_rows)
        log_file = _save_log(model, base_url, results)
        print(f"\n Results saved -> {log_file}")
        return {
            "timestamp": datetime.now(UTC).isoformat(),
            "model": model,
            "base_url": base_url,
            "problem": MATH_PROBLEM,
            "results": results,
        }

    # Multi-pass: gather per-test samples across all passes.
    metric_names = ("lg_time", "gm_time", "lg_tok", "gm_tok")
    samples: dict[str, dict[str, list]] = {}
    all_results: list[dict] = []
    for pass_no in range(1, num_runs + 1):
        _header(f"Run {pass_no}/{num_runs}")
        pass_results, pass_rows = _run_all_tests(llm)
        all_results.extend(pass_results)
        for key, *values in pass_rows:
            bucket = samples.setdefault(key, {metric: [] for metric in metric_names})
            for metric, value in zip(metric_names, values):
                bucket[metric].append(value)

    def mean(values: list) -> float:
        # Every bucket receives at least one sample when it is created.
        return sum(values) / len(values)

    # Averaged table, one line per test in first-seen order.
    _header(f"Averaged Summary ({num_runs} runs)")
    print(f" {'Test':<16} {'LG time':>8} {'gMAS time':>9} {'LG tok':>7} {'gMAS tok':>8} Time winner")
    print(f" {'─' * 16} {'─' * 8} {'─' * 9} {'─' * 7} {'─' * 8} {'─' * 12}")
    averaged: list[tuple] = []
    for key, bucket in samples.items():
        lg_secs = mean(bucket["lg_time"])
        gm_secs = mean(bucket["gm_time"])
        lg_tokens = mean(bucket["lg_tok"])
        gm_tokens = mean(bucket["gm_tok"])
        if gm_secs < lg_secs:
            faster = "gMAS"
        else:
            faster = "LangGraph"
        print(f" {key:<16} {lg_secs:>7.2f}s {gm_secs:>8.2f}s {lg_tokens:>7.0f} {gm_tokens:>8.0f} {faster}")
        averaged.append((key, lg_secs, gm_secs, lg_tokens, gm_tokens))

    if averaged:
        row_count = len(averaged)
        # Relative gain per test against the LangGraph average.
        time_gains = [(row[1] - row[2]) / max(row[1], 1e-9) for row in averaged]
        token_gains = [(row[3] - row[4]) / max(row[3], 1) for row in averaged]
        avg_time_delta = sum(time_gains) / row_count * 100
        avg_tok_delta = sum(token_gains) / row_count * 100
        print(f"\n Average improvement for gMAS vs LangGraph ({num_runs} runs):")
        print(f" time : {avg_time_delta:+.1f}% (positive = gMAS faster)")
        print(f" tokens : {avg_tok_delta:+.1f}% (positive = gMAS cheaper)")

    log_file = _save_log(model, base_url, all_results)
    print(f"\n Results saved -> {log_file}")
    return {
        "timestamp": datetime.now(UTC).isoformat(),
        "model": model,
        "base_url": base_url,
        "problem": MATH_PROBLEM,
        "num_runs": num_runs,
        "results": all_results,
    }
if __name__ == "__main__":
    # CLI entry point: the only option is the number of passes.
    cli = argparse.ArgumentParser(description="Benchmark gMAS vs LangGraph")
    cli.add_argument(
        "--runs",
        metavar="N",
        type=int,
        default=1,
        help="Number of full benchmark passes (default: 1). Results are averaged.",
    )
    run_benchmark(num_runs=cli.parse_args().runs)