"""Tests for execution/runner.py — MACPRunner."""
import asyncio
import pytest
import rustworkx as rx
import torch
from core.graph import RoleGraph
from execution.budget import BudgetConfig
from execution.runner import MACPResult, MACPRunner, RunnerConfig
def create_test_graph(nodes, edges):
"""Create a test RoleGraph."""
from core.agent import AgentProfile
g = rx.PyDiGraph()
id_to_idx = {}
agents = []
for nid in nodes:
if nid != "task":
idx = g.add_node({"id": nid})
id_to_idx[nid] = idx
# Create an AgentProfile for this agent node
agent = AgentProfile(agent_id=nid, display_name=f"Agent {nid.upper()}")
agents.append(agent)
connections = {n: [] for n in nodes}
for src, tgt in edges:
if src in id_to_idx and tgt in id_to_idx:
g.add_edge(id_to_idx[src], id_to_idx[tgt], {"weight": 1.0})
connections[src].append(tgt)
n = len(id_to_idx)
a_com = torch.zeros((n + 1, n + 1), dtype=torch.float32) # +1 for task node
# Add task node
task_idx = g.add_node({"id": "task"})
id_to_idx["task"] = task_idx
# Fill matrix
node_list = [nid for nid in nodes if nid != "task"]
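# Agents occupy rows/columns 0..n-1 of a_com in node_list order; index n is
# reserved for the task node and stays zero here.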
for i, src in enumerate(node_list):
for tgt in connections[src]:
if tgt in node_list:
j = node_list.index(tgt)
a_com[i, j] = 1.0
role_graph = RoleGraph(
node_ids=nodes,
role_connections=connections,
graph=g,
A_com=a_com,
task_node="task",
query="test query",
)
role_graph.agents = agents
return role_graph
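# For reference: create_test_graph(["a", "b"], [("a", "b")]) yields a 3x3 A_com
# (two agents plus the task node) with a_com[0, 1] == 1.0 for the a -> b edge.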
def create_simple_llm_caller(response_text="Test response"):
"""Create a simple synchronous LLM caller."""
def llm_caller(prompt: str) -> str:
return response_text
return llm_caller
def create_simple_async_llm_caller(response_text="Test response"):
"""Create a simple asynchronous LLM caller."""
async def async_llm_caller(prompt: str) -> str:
await asyncio.sleep(0.001) # Simulate delay
return response_text
return async_llm_caller
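# The synchronous tests below construct MACPRunner(llm_caller=...) and call
# run_round(); the async tests construct MACPRunner(async_llm_caller=...) and
# await arun_round().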
class TestMACPRunnerCreation:
"""Tests for MACPRunner creation."""
def test_basic_creation(self):
"""Basic creation."""
llm_caller = create_simple_llm_caller()
runner = MACPRunner(llm_caller=llm_caller)
assert runner is not None
assert runner.llm_caller is not None
def test_creation_with_config(self):
"""Creation with configuration."""
llm_caller = create_simple_llm_caller()
config = RunnerConfig(
timeout=30.0,
max_retries=3,
adaptive=True,
)
runner = MACPRunner(llm_caller=llm_caller, config=config)
assert runner.config.timeout == 30.0
assert runner.config.max_retries == 3
assert runner.config.adaptive
class TestSyncExecution:
"""Tests for synchronous execution."""
def test_run_simple(self):
"""Simple run."""
graph = create_test_graph(["a", "b"], [("a", "b")])
llm_caller = create_simple_llm_caller()
runner = MACPRunner(llm_caller=llm_caller)
result = runner.run_round(graph)
assert isinstance(result, MACPResult)
assert result.final_answer is not None
def test_run_linear_graph(self):
"""Run on a linear graph."""
graph = create_test_graph(["a", "b", "c"], [("a", "b"), ("b", "c")])
llm_caller = create_simple_llm_caller()
runner = MACPRunner(llm_caller=llm_caller)
result = runner.run_round(graph)
assert len(result.execution_order) == 3
assert result.final_answer is not None
def test_run_with_final_agent(self):
"""Run with a specified final agent."""
graph = create_test_graph(["a", "b"], [("a", "b")])
llm_caller = create_simple_llm_caller("final response")
runner = MACPRunner(llm_caller=llm_caller)
result = runner.run_round(graph, final_agent_id="b")
assert result.final_agent_id == "b"
class TestAsyncExecution:
"""Tests for asynchronous execution."""
@pytest.mark.asyncio
async def test_arun_simple(self):
"""Simple async run."""
graph = create_test_graph(["a", "b"], [("a", "b")])
async_llm_caller = create_simple_async_llm_caller()
runner = MACPRunner(async_llm_caller=async_llm_caller)
result = await runner.arun_round(graph)
assert isinstance(result, MACPResult)
assert result.final_answer is not None
@pytest.mark.asyncio
async def test_arun_parallel_execution(self):
"""Parallel async execution."""
# Diamond topology: a -> b and a -> c (parallel branches), both feeding d
graph = create_test_graph(
["a", "b", "c", "d"],
[("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")],
)
async_llm_caller = create_simple_async_llm_caller()
config = RunnerConfig(enable_parallel=True, adaptive=True)
runner = MACPRunner(async_llm_caller=async_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should have executed all agents
assert len(result.execution_order) == 4
# a should be first in execution order
assert result.execution_order[0] == "a"
class TestTimeouts:
"""Tests for timeout handling."""
@pytest.mark.asyncio
async def test_timeout_triggers(self):
"""Timeout triggers."""
graph = create_test_graph(["a"], [])
async def slow_llm_caller(prompt: str) -> str:
await asyncio.sleep(10.0) # Very slow
return "done"
config = RunnerConfig(timeout=0.1) # Short timeout
runner = MACPRunner(async_llm_caller=slow_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should still complete; the timeout may be recorded in the messages or errors
assert result is not None
@pytest.mark.asyncio
async def test_per_agent_timeout(self):
"""Per-agent timeout."""
graph = create_test_graph(["a", "b"], [("a", "b")])
call_count = 0
async def slow_llm_caller(prompt: str) -> str:
nonlocal call_count
call_count += 1
if call_count > 1:
await asyncio.sleep(10.0)
return "response"
config = RunnerConfig(timeout=0.1)
runner = MACPRunner(async_llm_caller=slow_llm_caller, config=config)
result = await runner.arun_round(graph)
# The first agent should succeed; the second may time out
assert result is not None
class TestRetries:
"""Tests for retry mechanism."""
@pytest.mark.asyncio
async def test_retry_on_failure(self):
"""Retry on failure."""
graph = create_test_graph(["a"], [])
attempt_count = 0
async def flaky_llm_caller(prompt: str) -> str:
nonlocal attempt_count
attempt_count += 1
if attempt_count < 3:
msg = "Temporary failure"
raise RuntimeError(msg)
return "success"
config = RunnerConfig(max_retries=5, adaptive=True)
runner = MACPRunner(async_llm_caller=flaky_llm_caller, config=config)
result = await runner.arun_round(graph)
assert attempt_count == 3
assert result.messages.get("a") == "success"
@pytest.mark.asyncio
async def test_max_retries_exceeded(self):
"""Max retries exceeded."""
graph = create_test_graph(["a"], [])
async def always_fails(prompt: str) -> str:
msg = "Always fails"
raise RuntimeError(msg)
config = RunnerConfig(max_retries=2, adaptive=True)
runner = MACPRunner(async_llm_caller=always_fails, config=config)
result = await runner.arun_round(graph)
# Should fail after max retries
assert result.errors is not None
@pytest.mark.asyncio
async def test_retry_with_backoff(self):
"""Retry with exponential backoff."""
graph = create_test_graph(["a"], [])
import time
timestamps = []
async def timing_llm_caller(prompt: str) -> str:
timestamps.append(time.time())
if len(timestamps) < 3:
msg = "Retry"
raise RuntimeError(msg)
return "done"
config = RunnerConfig(
max_retries=5,
retry_delay=0.1,
retry_backoff=2.0,
adaptive=True,
)
runner = MACPRunner(async_llm_caller=timing_llm_caller, config=config)
await runner.arun_round(graph)
# The gap between retries should grow under exponential backoff
if len(timestamps) >= 3:
delay1 = timestamps[1] - timestamps[0]
delay2 = timestamps[2] - timestamps[1]
assert delay2 > delay1 # Backoff should increase delay
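# Wall-clock based: with retry_delay=0.1 and retry_backoff=2.0 the second gap
# (~0.2 s) should exceed the first (~0.1 s); the length guard above avoids an
# IndexError if the run finished in fewer attempts.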
class TestBudgetControl:
"""Tests for budget control."""
@pytest.mark.asyncio
async def test_token_budget_respected(self):
"""Token budget is respected."""
graph = create_test_graph(["a", "b", "c"], [("a", "b"), ("b", "c")])
async def token_hungry_llm_caller(prompt: str) -> str:
# Simulate token usage
return "response " * 100 # Many tokens
budget_config = BudgetConfig()  # reasonable defaults; BudgetConfig does not require specific params
config = RunnerConfig(budget_config=budget_config, adaptive=True)
runner = MACPRunner(async_llm_caller=token_hungry_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should stop due to budget or complete with budget tracking
assert result.budget_summary is not None or result.total_tokens > 0
@pytest.mark.asyncio
async def test_budget_warning(self):
"""Warning when approaching budget."""
graph = create_test_graph(["a"], [])
async_llm_caller = create_simple_async_llm_caller()
budget_config = BudgetConfig()  # reasonable defaults
config = RunnerConfig(budget_config=budget_config)
runner = MACPRunner(async_llm_caller=async_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should complete successfully
assert result is not None
class TestMemoryUpdates:
"""Tests for agent memory updates."""
@pytest.mark.asyncio
async def test_state_propagation(self):
"""State is propagated between agents."""
graph = create_test_graph(["a", "b"], [("a", "b")])
async_llm_caller = create_simple_async_llm_caller()
runner = MACPRunner(async_llm_caller=async_llm_caller)
result = await runner.arun_round(graph)
# b should have received context from a
assert "b" in result.messages
assert result.messages["b"] is not None
@pytest.mark.asyncio
async def test_hidden_state_channels(self):
"""Hidden state channels."""
graph = create_test_graph(["a", "b"], [("a", "b")])
async_llm_caller = create_simple_async_llm_caller()
config = RunnerConfig(enable_hidden_channels=True)
runner = MACPRunner(async_llm_caller=async_llm_caller, config=config)
result = await runner.arun_round(graph)
assert result is not None
class TestAdaptiveMode:
"""Tests for adaptive mode."""
@pytest.mark.asyncio
async def test_adaptive_routing(self):
"""Adaptive routing."""
graph = create_test_graph(
["a", "b", "c"],
[("a", "b"), ("a", "c"), ("b", "c")],
)
async_llm_caller = create_simple_async_llm_caller()
config = RunnerConfig(adaptive=True)
runner = MACPRunner(async_llm_caller=async_llm_caller, config=config)
result = await runner.arun_round(graph)
assert len(result.execution_order) > 0
@pytest.mark.asyncio
async def test_adaptive_topology_change(self):
"""Adaptive topology change on error."""
graph = create_test_graph(
["a", "b", "fallback", "c"],
[("a", "b"), ("a", "fallback"), ("b", "c"), ("fallback", "c")],
)
call_count = 0
async def maybe_failing_llm_caller(prompt: str) -> str:
nonlocal call_count
call_count += 1
# Simulate an error on the second LLM call (expected to be agent b)
if call_count == 2:
msg = "b failed"
raise RuntimeError(msg)
return "response"
config = RunnerConfig(
adaptive=True,
max_retries=0,
)
runner = MACPRunner(async_llm_caller=maybe_failing_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should complete with some agents executed
assert result is not None
class TestErrorHandling:
"""Tests for error handling."""
@pytest.mark.asyncio
async def test_agent_exception_handled(self):
"""Agent exception is handled."""
graph = create_test_graph(["a", "b"], [("a", "b")])
call_count = 0
async def failing_llm_caller(prompt: str) -> str:
nonlocal call_count
call_count += 1
if call_count == 1:
msg = "Agent error"
raise ValueError(msg)
return "response"
config = RunnerConfig(max_retries=0)
runner = MACPRunner(async_llm_caller=failing_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should not crash, error should be recorded
assert result is not None
@pytest.mark.asyncio
async def test_on_error_fail_policy(self):
"""Error handling with retries."""
graph = create_test_graph(["a"], [])
async def failing_llm_caller(prompt: str) -> str:
msg = "Critical error"
raise RuntimeError(msg)
config = RunnerConfig(max_retries=0, adaptive=True)
runner = MACPRunner(async_llm_caller=failing_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should have error recorded
assert result.errors is not None or "[Error:" in str(result.messages.get("a", ""))
@pytest.mark.asyncio
async def test_on_error_skip_policy(self):
"""Error handling and continuing execution."""
graph = create_test_graph(["a", "b"], [("a", "b")])
call_count = 0
async def maybe_failing_llm_caller(prompt: str) -> str:
nonlocal call_count
call_count += 1
if call_count == 1:
msg = "a failed"
raise RuntimeError(msg)
return "response"
config = RunnerConfig(max_retries=0)
runner = MACPRunner(async_llm_caller=maybe_failing_llm_caller, config=config)
result = await runner.arun_round(graph)
# Should continue to agent b
assert result is not None
class TestMACPResult:
"""Tests for execution result."""
def test_result_structure(self):
"""Result structure."""
result = MACPResult(
messages={"a": "response"},
final_answer="final answer",
final_agent_id="a",
execution_order=["a"],
errors=[],
)
assert result.final_answer == "final answer"
assert result.final_agent_id == "a"
assert result.execution_order == ["a"]
assert result.errors == []
def test_result_with_metrics(self):
"""Result with metrics."""
from datetime import datetime
from execution.errors import ExecutionMetrics
metrics = ExecutionMetrics(
start_time=datetime.now(),
total_agents=1,
total_tokens=500,
)
result = MACPResult(
messages={"a": "response"},
final_answer="answer",
final_agent_id="a",
execution_order=["a"],
errors=[],
metrics=metrics,
total_tokens=500,
total_time=1.234,
)
assert result.total_tokens == 500
assert result.total_time == 1.234
class TestConditionalEdgesAdaptive:
"""Tests for conditional edges in adaptive mode."""
def test_condition_true_executes_target(self):
"""If condition is met — target agent is executed."""
graph = create_test_graph(["a", "b", "c"], [("a", "b"), ("b", "c")])
# Condition: a→b executes only if a responds with "ok"
graph.edge_conditions = {
("a", "b"): lambda ctx: "ok" in ctx.messages.get("a", ""),
}
llm_caller = create_simple_llm_caller("ok response")
config = RunnerConfig(adaptive=True)
runner = MACPRunner(llm_caller=llm_caller, config=config)
result = runner.run_round(graph)
assert "a" in result.execution_order
assert "b" in result.execution_order
def test_condition_false_skips_target(self):
"""If condition is not met — target agent is skipped."""
graph = create_test_graph(["a", "b", "c"], [("a", "b"), ("a", "c")])
# Condition: a→b executes only if a responds with "secret"
graph.edge_conditions = {
("a", "b"): lambda ctx: "secret" in ctx.messages.get("a", ""),
}
llm_caller = create_simple_llm_caller("normal response")
config = RunnerConfig(adaptive=True)
runner = MACPRunner(llm_caller=llm_caller, config=config)
result = runner.run_round(graph)
assert "a" in result.execution_order
# b should be skipped because its condition is not met
assert "b" not in result.execution_order
# c should execute (unconditional edge)
assert "c" in result.execution_order
@pytest.mark.asyncio
async def test_async_conditional_edges(self):
"""Conditional edges work in async mode."""
graph = create_test_graph(["a", "b", "c"], [("a", "b"), ("a", "c")])
graph.edge_conditions = {
("a", "b"): lambda ctx: ctx.source_succeeded(),
}
async_llm_caller = create_simple_async_llm_caller("response")
config = RunnerConfig(adaptive=True)
runner = MACPRunner(async_llm_caller=async_llm_caller, config=config)
result = await runner.arun_round(graph)
assert "a" in result.execution_order
assert "b" in result.execution_order
def test_topology_changed_count(self):
"""topology_changed_count increments when plan changes."""
graph = create_test_graph(["a", "b"], [("a", "b")])
graph.edge_conditions = {
("a", "b"): lambda ctx: ctx.source_succeeded(),
}
llm_caller = create_simple_llm_caller("response")
config = RunnerConfig(adaptive=True)
runner = MACPRunner(llm_caller=llm_caller, config=config)
result = runner.run_round(graph)
assert result is not None
assert isinstance(result.topology_changed_count, int)
def test_multiple_incoming_conditional_edges(self):
"""Multiple incoming conditional edges: B is not skipped until all are evaluated."""
graph = create_test_graph(
["a", "c", "b"],
[("a", "b"), ("c", "b")],
)
# Different callers for different agents
llm_callers = {
"a": lambda _: "fail result",
"c": lambda _: "good result",
"b": lambda _: "final response",
}
# a→b: condition NOT met (no "success" in a's response)
# c→b: condition MET ("good" in c's response)
graph.edge_conditions = {
("a", "b"): lambda ctx: "success" in ctx.messages.get("a", ""),
("c", "b"): lambda ctx: "good" in ctx.messages.get("c", ""),
}
config = RunnerConfig(adaptive=True)
runner = MACPRunner(
llm_caller=lambda _: "default",
llm_callers=llm_callers,
config=config,
)
result = runner.run_round(graph)
# b should execute because c→b condition is met
assert "a" in result.execution_order
assert "c" in result.execution_order
assert "b" in result.execution_order
def test_conditional_edges_with_hidden_states_and_chain(self):
"""
Complex test: conditional edges + hidden states + cascading chain.
Graph:
solver → reviewer → finalize (conditional edge solver→reviewer)
solver → alt_end (unconditional)
Scenario 1 (condition=True): solver("correct") → reviewer → finalize execute.
Scenario 2 (condition=False): solver("wrong") → reviewer + finalize are skipped,
alt_end executes.
Covers:
- Issue 1: hidden states + conditional edges
- Issue 2: plan does not stop after the last agent
- Issue 3: full chain executes after conditional transition
"""
# --- Scenario 1: condition met → full chain ---
graph1 = create_test_graph(
["solver", "reviewer", "finalize", "alt_end"],
[
("solver", "reviewer"),
("reviewer", "finalize"),
("solver", "alt_end"),
],
)
graph1.edge_conditions = {
("solver", "reviewer"): lambda ctx: "correct" in ctx.messages.get("solver", ""),
}
callers1 = {
"solver": lambda _: "answer is correct",
"reviewer": lambda _: "review passed",
"finalize": lambda _: "done",
"alt_end": lambda _: "alt",
}
config = RunnerConfig(adaptive=True)
runner1 = MACPRunner(
llm_caller=lambda _: "default",
llm_callers=callers1,
config=config,
)
result1 = runner1.run_round_with_hidden(graph1)
assert "solver" in result1.execution_order
assert "reviewer" in result1.execution_order
assert "finalize" in result1.execution_order # full chain
assert result1.hidden_states is not None
assert "solver" in result1.hidden_states
# --- Scenario 2: condition NOT met → cascading skip ---
graph2 = create_test_graph(
["solver", "reviewer", "finalize", "alt_end"],
[
("solver", "reviewer"),
("reviewer", "finalize"),
("solver", "alt_end"),
],
)
graph2.edge_conditions = {
("solver", "reviewer"): lambda ctx: "correct" in ctx.messages.get("solver", ""),
}
callers2 = {
"solver": lambda _: "answer is wrong",
"reviewer": lambda _: "review passed",
"finalize": lambda _: "done",
"alt_end": lambda _: "alt ending",
}
runner2 = MACPRunner(
llm_caller=lambda _: "default",
llm_callers=callers2,
config=config,
)
result2 = runner2.run_round_with_hidden(graph2)
assert "solver" in result2.execution_order
assert "reviewer" not in result2.execution_order # skipped
assert "finalize" not in result2.execution_order # cascaded skip
assert "alt_end" in result2.execution_order # unconditional path
if __name__ == "__main__":
pytest.main([__file__, "-v"])