multi-agent-lab / tests /test_conductor.py
agharsallah
feat: Implement audience-only secret badge for Twenty Sprouts game
f6566bb
Raw
History Blame Contribute Delete
10.1 kB
from __future__ import annotations
from src.core.conductor import Conductor
from src.core.events import Event
from src.core.governor import Governor
from src.core.manifest import AgentManifest, ScheduleConfig
from src.scenarios.base import Scenario
from src.scenarios.thousand_token_wood import build_scenario
def _conductor() -> Conductor:
    """Return a new ``Conductor`` built on the ``thousand_token_wood`` scenario."""
    scenario = build_scenario()
    return Conductor(scenario=scenario)
class TestConductorReset:
    """Behaviour of ``Conductor.reset``: ledger contents, seed and turn counter."""

    def test_reset_clears_ledger(self):
        """Resetting twice leaves a short ledger that still has ``run.started``."""
        conductor = _conductor()
        for seed in ("seed-a", "seed-b"):
            conductor.reset(seed)
        seen_kinds = {event.kind for event in conductor.ledger.events}
        assert "run.started" in seen_kinds
        # Fewer than 10 events: nothing accumulated across the two resets.
        assert len(conductor.ledger.events) < 10

    def test_reset_writes_genesis_events(self):
        """A reset records both ``run.started`` and ``world.observed``."""
        conductor = _conductor()
        conductor.reset("forest awakens")
        recorded = [event.kind for event in conductor.ledger.events]
        for expected_kind in ("run.started", "world.observed"):
            assert expected_kind in recorded

    def test_reset_sets_turn_to_zero(self):
        """After stepping twice, a reset brings ``turn`` back to 0."""
        conductor = _conductor()
        for _ in range(2):
            conductor.step()
        conductor.reset("fresh start")
        assert conductor.turn == 0

    def test_reset_uses_seed_in_event(self):
        """The first ``run.started`` event carries the seed given to ``reset``."""
        seed = "unique-seed-xyz"
        conductor = _conductor()
        conductor.reset(seed)
        started = [
            event for event in conductor.ledger.events if event.kind == "run.started"
        ]
        assert started[0].payload["seed"] == seed
class TestConductorStep:
    """Behaviour of ``Conductor.step``: turn counting and ledger growth."""

    def test_step_increments_turn(self):
        """One step moves ``turn`` forward by exactly one."""
        conductor = _conductor()
        conductor.reset("seed")
        expected_turn = conductor.turn + 1
        conductor.step()
        assert conductor.turn == expected_turn

    def test_step_appends_events(self):
        """The ledger is longer after a step than before it."""
        conductor = _conductor()
        conductor.reset("seed")
        count_before = len(conductor.ledger.events)
        conductor.step()
        count_after = len(conductor.ledger.events)
        assert count_before < count_after

    def test_multiple_steps_accumulate(self):
        """Four steps leave at least five events in the ledger."""
        conductor = _conductor()
        conductor.reset("seed")
        for _ in range(4):
            conductor.step()
        # genesis + at least one per step
        assert len(conductor.ledger.events) >= 5

    def test_step_without_reset_auto_resets(self):
        """Stepping a never-reset conductor works and leaves events behind."""
        conductor = _conductor()
        conductor.step()  # should not raise
        assert len(conductor.ledger.events) > 0
class TestConductorInject:
    """Behaviour of ``Conductor.inject_user_event``."""

    def test_inject_appends_user_event(self):
        """Injecting text puts a ``user.injected`` event in the ledger."""
        conductor = _conductor()
        conductor.reset("seed")
        conductor.inject_user_event("a silver fish falls upward")
        recorded = [event.kind for event in conductor.ledger.events]
        assert "user.injected" in recorded

    def test_inject_text_preserved(self):
        """The newest ``user.injected`` event holds the injected text verbatim."""
        message = "strange message here"
        conductor = _conductor()
        conductor.reset("seed")
        conductor.inject_user_event(message)
        user_events = [
            event for event in conductor.ledger.events if event.kind == "user.injected"
        ]
        assert user_events[-1].payload["text"] == message
class TestConductorProjection:
    """Behaviour of the ``Conductor.projection`` view."""

    def test_projection_reflects_latest_events(self):
        """After a reset the seed is visible as ``seed`` or inside ``current_scene``."""
        seed = "the wood wakes"
        conductor = _conductor()
        conductor.reset(seed)
        view = conductor.projection
        # Either location is accepted; the second is only checked if the first fails.
        assert view.seed == seed or seed in view.current_scene
class _CostingAgent:
    """Minimal agent that reports a per-call cost — stands in for the live gateway.

    Every instance carries the same ``last_usage`` figures, and ``act`` always
    returns a ``world.observed`` event, so a test can check that usage reported
    by an agent shows up in the governor's stats.
    """

    # Same value as ``manifest.name``. The other stub agents in this file
    # (``_ExplodingAgent``, ``_SpeakingAgent``) expose ``name`` as well, so all
    # three stubs now present the same attributes.
    # NOTE(review): whether the conductor reads ``agent.name`` is not visible here.
    name = "coster"
    manifest = AgentManifest(
        name="coster",
        persona="p",
        may_emit=["world.observed"],
        schedule=ScheduleConfig(tick_every=1),
    )

    def __init__(self) -> None:
        # Fixed usage figures: 15 tokens in total and a non-zero dollar cost.
        self.last_usage = {
            "prompt_tokens": 10,
            "completion_tokens": 5,
            "total_tokens": 15,
            "cost_usd": 0.002,
        }

    def act(self, run_id, turn, projection, recent_events) -> Event:
        """Return a ``world.observed`` event stamped with ``run_id`` and ``turn``.

        ``projection`` and ``recent_events`` are accepted but not used.
        """
        return Event(
            run_id=run_id,
            turn=turn,
            kind="world.observed",
            actor="coster",
            payload={"text": "x"},
        )
class TestConductorCostMetering:
    """Checks that usage reported by agents ends up in the governor's stats."""

    def test_live_cost_reaches_governor(self):
        """Cost and tokens from the agent's ``last_usage`` appear in governor stats."""
        # On the live path the agent carries real cost on last_usage; the conductor
        # must plumb it into the Governor so hourly_budget_usd is enforceable.
        cast = (_CostingAgent(),)
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        conductor = Conductor(scenario=scenario, governor=Governor())
        conductor.reset("seed")
        conductor.step()
        assert conductor.governor.stats["spend_usd"] > 0
        assert conductor.governor.stats["total_tokens"] >= 15

    def test_offline_cost_stays_zero(self):
        """With the default scenario, spend is still exactly zero after a step."""
        # The deterministic stub reports no cost; spend must remain 0.
        conductor = _conductor()
        conductor.reset("seed")
        conductor.step()
        assert conductor.governor.stats["spend_usd"] == 0.0
class _ExplodingAgent:
    """Stub agent whose ``act`` always raises ``RuntimeError("kaboom")``.

    Stands in for a flaky live model call or a memory-index hiccup (the live
    failure that silenced the whole spy cast).
    """

    name = "boom"
    manifest = AgentManifest(
        name="boom",
        persona="p",
        may_emit=["agent.spoke"],
        schedule=ScheduleConfig(tick_every=1),
    )

    def __init__(self) -> None:
        # No usage to report: this agent never completes a call.
        self.last_usage: dict = {}

    def act(self, run_id, turn, projection, recent_events) -> Event:
        """Fail unconditionally; none of the arguments are looked at."""
        raise RuntimeError("kaboom")
class _SpeakingAgent:
    """Stub agent that always answers with an ``agent.spoke`` event saying "hi"."""

    name = "speaker"
    manifest = AgentManifest(
        name="speaker",
        persona="p",
        may_emit=["agent.spoke"],
        schedule=ScheduleConfig(tick_every=1),
    )

    def __init__(self) -> None:
        # Starts with an empty usage record.
        self.last_usage: dict = {}

    def act(self, run_id, turn, projection, recent_events) -> Event:
        """Return an ``agent.spoke`` event stamped with ``run_id`` and ``turn``.

        ``projection`` and ``recent_events`` are accepted but not used.
        """
        spoken = Event(
            run_id=run_id,
            turn=turn,
            kind="agent.spoke",
            actor="speaker",
            payload={"text": "hi"},
        )
        return spoken
class TestConductorResilience:
    """One agent failing must not stop the others; budget stops must still surface."""

    def test_one_agent_crash_does_not_silence_the_cast(self):
        """The speaker still speaks in a step where boom raises before it."""
        # boom is scheduled FIRST: the old loop aborted the tick after it raised,
        # so every later agent went silent (the "only spy-cara talks" symptom).
        cast = (_ExplodingAgent(), _SpeakingAgent())
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        conductor = Conductor(scenario=scenario, governor=Governor())
        conductor.reset("seed")
        conductor.step()
        spoken_events = [
            event for event in conductor.ledger.events if event.kind == "agent.spoke"
        ]
        speaker_was_heard = any(event.actor == "speaker" for event in spoken_events)
        assert speaker_was_heard, "the rest of the cast must still act"
        # The failure is recorded, with boom named on the newest entry.
        assert conductor.agent_errors
        assert conductor.agent_errors[-1]["agent"] == "boom"

    def test_budget_exceeded_still_propagates(self):
        """``BudgetExceeded`` escapes ``step`` instead of being absorbed."""
        # Resilience must not swallow the governor's intentional stop: with a
        # per-turn cap of 1, the SECOND agent trips BudgetExceeded inside
        # _run_agent — exactly the branch resilience must re-raise, not absorb.
        import pytest

        from src.core.governor import BudgetExceeded

        cast = (_SpeakingAgent(), _SpeakingAgent())
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        capped_governor = Governor(max_calls_per_turn=1)
        conductor = Conductor(scenario=scenario, governor=capped_governor)
        conductor.reset("seed")
        with pytest.raises(BudgetExceeded):
            conductor.step()
class TestConductorStepOne:
    """``step_one`` runs a single agent per call so the UI can show each mind as
    it responds, while keeping turn semantics and per-agent error isolation."""

    def test_one_event_per_call_with_turn_rollover(self):
        """Each call adds one event; the turn only advances once the queue drains."""
        cast = (_SpeakingAgent(), _SpeakingAgent())
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        conductor = Conductor(scenario=scenario, governor=Governor())
        conductor.reset("seed")
        base = len(conductor.ledger.events)
        # (events added so far, expected turn) after each successive call:
        #   1st call -> turn 1, first actor
        #   2nd call -> turn 1, second actor — still the same turn
        #   3rd call -> queue drained, so a NEW turn opens
        expectations = ((1, 1), (2, 1), (3, 2))
        for added, expected_turn in expectations:
            conductor.step_one()
            assert len(conductor.ledger.events) == base + added
            assert conductor.turn == expected_turn

    def test_step_one_isolates_a_failing_agent(self):
        """A failing first agent adds no event and does not block the next one."""
        # boom is first: its failed call produces no event but must not block the speaker.
        cast = (_ExplodingAgent(), _SpeakingAgent())
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        conductor = Conductor(scenario=scenario, governor=Governor())
        conductor.reset("seed")
        base = len(conductor.ledger.events)

        # pops boom -> raises internally -> recorded, no event appended
        conductor.step_one()
        assert len(conductor.ledger.events) == base
        assert conductor.agent_errors
        assert conductor.agent_errors[-1]["agent"] == "boom"

        # pops the speaker -> one real event
        conductor.step_one()
        assert len(conductor.ledger.events) == base + 1
        assert conductor.ledger.events[-1].actor == "speaker"

    def test_step_one_performs_genesis_on_empty_ledger(self):
        """On a never-reset conductor, ``step_one`` returns True and writes genesis."""
        conductor = _conductor()
        assert conductor.step_one() is True
        seen_kinds = {event.kind for event in conductor.ledger.events}
        assert {"run.started", "world.observed"} <= seen_kinds
class TestPeekNextActor:
    """``peek_next_actor_name`` backs the Show's "who's thinking…" hint: a pure
    read naming whoever the next ``step_one`` will run, without advancing the run."""

    def test_peeks_the_first_queued_agent(self):
        """With one agent already popped this turn, peek names the one still queued."""
        # Two tick-every-1 agents: after the first step_one pops the first, the second
        # sits queued for this same turn — peek must name it, not re-open a turn.
        cast = (_SpeakingAgent(), _ExplodingAgent())
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        conductor = Conductor(scenario=scenario, governor=Governor())
        conductor.reset("seed")
        conductor.step_one()  # pops "speaker"; "boom" remains queued for turn 1
        assert conductor.peek_next_actor_name() == "boom"

    def test_peek_does_not_advance_the_run(self):
        """Peeking twice changes neither the ledger length nor the turn."""
        cast = (_SpeakingAgent(),)
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        conductor = Conductor(scenario=scenario, governor=Governor())
        conductor.reset("seed")
        snapshot = (len(conductor.ledger.events), conductor.turn)
        for _ in range(2):
            conductor.peek_next_actor_name()
        assert (len(conductor.ledger.events), conductor.turn) == snapshot

    def test_peeks_next_turns_tick_actor_when_queue_is_empty(self):
        """With nothing queued, peek names the agent that ticks on the next turn."""
        # Queue drained between turns: peek looks ahead to who ticks on turn+1.
        cast = (_SpeakingAgent(),)
        scenario = Scenario(name="s", default_seed="seed", agents=cast)
        conductor = Conductor(scenario=scenario, governor=Governor())
        conductor.reset("seed")  # turn 0, nothing queued; "speaker" ticks every turn
        assert conductor.peek_next_actor_name() == "speaker"