Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| from src.core.conductor import Conductor | |
| from src.core.events import Event | |
| from src.core.governor import Governor | |
| from src.core.manifest import AgentManifest, ScheduleConfig | |
| from src.scenarios.base import Scenario | |
| from src.scenarios.thousand_token_wood import build_scenario | |
| def _conductor() -> Conductor: | |
| return Conductor(scenario=build_scenario()) | |
| class TestConductorReset: | |
| def test_reset_clears_ledger(self): | |
| c = _conductor() | |
| c.reset("seed-a") | |
| c.reset("seed-b") | |
| kinds = {e.kind for e in c.ledger.events} | |
| assert "run.started" in kinds | |
| assert len(c.ledger.events) < 10 # not accumulating across resets | |
| def test_reset_writes_genesis_events(self): | |
| c = _conductor() | |
| c.reset("forest awakens") | |
| kinds = [e.kind for e in c.ledger.events] | |
| assert "run.started" in kinds | |
| assert "world.observed" in kinds | |
| def test_reset_sets_turn_to_zero(self): | |
| c = _conductor() | |
| c.step() | |
| c.step() | |
| c.reset("fresh start") | |
| assert c.turn == 0 | |
| def test_reset_uses_seed_in_event(self): | |
| c = _conductor() | |
| c.reset("unique-seed-xyz") | |
| seed_events = [e for e in c.ledger.events if e.kind == "run.started"] | |
| assert seed_events[0].payload["seed"] == "unique-seed-xyz" | |
| class TestConductorStep: | |
| def test_step_increments_turn(self): | |
| c = _conductor() | |
| c.reset("seed") | |
| initial = c.turn | |
| c.step() | |
| assert c.turn == initial + 1 | |
| def test_step_appends_events(self): | |
| c = _conductor() | |
| c.reset("seed") | |
| before = len(c.ledger.events) | |
| c.step() | |
| after = len(c.ledger.events) | |
| assert after > before | |
| def test_multiple_steps_accumulate(self): | |
| c = _conductor() | |
| c.reset("seed") | |
| for _ in range(4): | |
| c.step() | |
| assert len(c.ledger.events) >= 5 # genesis + at least one per step | |
| def test_step_without_reset_auto_resets(self): | |
| c = _conductor() | |
| c.step() # should not raise | |
| assert len(c.ledger.events) > 0 | |
| class TestConductorInject: | |
| def test_inject_appends_user_event(self): | |
| c = _conductor() | |
| c.reset("seed") | |
| c.inject_user_event("a silver fish falls upward") | |
| kinds = [e.kind for e in c.ledger.events] | |
| assert "user.injected" in kinds | |
| def test_inject_text_preserved(self): | |
| c = _conductor() | |
| c.reset("seed") | |
| c.inject_user_event("strange message here") | |
| injected = [e for e in c.ledger.events if e.kind == "user.injected"] | |
| assert injected[-1].payload["text"] == "strange message here" | |
| class TestConductorProjection: | |
| def test_projection_reflects_latest_events(self): | |
| c = _conductor() | |
| c.reset("the wood wakes") | |
| proj = c.projection | |
| assert proj.seed == "the wood wakes" or "the wood wakes" in proj.current_scene | |
| class _CostingAgent: | |
| """Minimal agent that reports a per-call cost — stands in for the live gateway.""" | |
| manifest = AgentManifest( | |
| name="coster", | |
| persona="p", | |
| may_emit=["world.observed"], | |
| schedule=ScheduleConfig(tick_every=1), | |
| ) | |
| def __init__(self) -> None: | |
| self.last_usage = {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15, "cost_usd": 0.002} | |
| def act(self, run_id, turn, projection, recent_events) -> Event: | |
| return Event(run_id=run_id, turn=turn, kind="world.observed", actor="coster", payload={"text": "x"}) | |
| class TestConductorCostMetering: | |
| def test_live_cost_reaches_governor(self): | |
| # On the live path the agent carries real cost on last_usage; the conductor | |
| # must plumb it into the Governor so hourly_budget_usd is enforceable. | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_CostingAgent(),)) | |
| c = Conductor(scenario=scenario, governor=Governor()) | |
| c.reset("seed") | |
| c.step() | |
| assert c.governor.stats["spend_usd"] > 0 | |
| assert c.governor.stats["total_tokens"] >= 15 | |
| def test_offline_cost_stays_zero(self): | |
| # The deterministic stub reports no cost; spend must remain 0. | |
| c = _conductor() | |
| c.reset("seed") | |
| c.step() | |
| assert c.governor.stats["spend_usd"] == 0.0 | |
| class _ExplodingAgent: | |
| """An agent whose turn always raises — stands in for a flaky live model call | |
| or a memory-index hiccup (the live failure that silenced the whole spy cast).""" | |
| name = "boom" | |
| manifest = AgentManifest(name="boom", persona="p", may_emit=["agent.spoke"], schedule=ScheduleConfig(tick_every=1)) | |
| def __init__(self) -> None: | |
| self.last_usage: dict = {} | |
| def act(self, run_id, turn, projection, recent_events) -> Event: | |
| raise RuntimeError("kaboom") | |
| class _SpeakingAgent: | |
| name = "speaker" | |
| manifest = AgentManifest( | |
| name="speaker", persona="p", may_emit=["agent.spoke"], schedule=ScheduleConfig(tick_every=1) | |
| ) | |
| def __init__(self) -> None: | |
| self.last_usage: dict = {} | |
| def act(self, run_id, turn, projection, recent_events) -> Event: | |
| return Event(run_id=run_id, turn=turn, kind="agent.spoke", actor="speaker", payload={"text": "hi"}) | |
| class TestConductorResilience: | |
| def test_one_agent_crash_does_not_silence_the_cast(self): | |
| # boom is scheduled FIRST: the old loop aborted the tick after it raised, | |
| # so every later agent went silent (the "only spy-cara talks" symptom). | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_ExplodingAgent(), _SpeakingAgent())) | |
| c = Conductor(scenario=scenario, governor=Governor()) | |
| c.reset("seed") | |
| c.step() | |
| spoke = [e for e in c.ledger.events if e.kind == "agent.spoke"] | |
| assert any(e.actor == "speaker" for e in spoke), "the rest of the cast must still act" | |
| assert c.agent_errors and c.agent_errors[-1]["agent"] == "boom" | |
| def test_budget_exceeded_still_propagates(self): | |
| # Resilience must not swallow the governor's intentional stop: with a | |
| # per-turn cap of 1, the SECOND agent trips BudgetExceeded inside | |
| # _run_agent — exactly the branch resilience must re-raise, not absorb. | |
| import pytest | |
| from src.core.governor import BudgetExceeded | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_SpeakingAgent(), _SpeakingAgent())) | |
| c = Conductor(scenario=scenario, governor=Governor(max_calls_per_turn=1)) | |
| c.reset("seed") | |
| with pytest.raises(BudgetExceeded): | |
| c.step() | |
| class TestConductorStepOne: | |
| """``step_one`` streams a single agent per call so the UI shows each mind as it | |
| responds, while preserving turn semantics and per-agent error isolation.""" | |
| def test_one_event_per_call_with_turn_rollover(self): | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_SpeakingAgent(), _SpeakingAgent())) | |
| c = Conductor(scenario=scenario, governor=Governor()) | |
| c.reset("seed") | |
| base = len(c.ledger.events) | |
| c.step_one() # turn 1, first actor | |
| assert len(c.ledger.events) == base + 1 | |
| assert c.turn == 1 | |
| c.step_one() # turn 1, second actor — still the same turn | |
| assert len(c.ledger.events) == base + 2 | |
| assert c.turn == 1 | |
| c.step_one() # queue drained → a NEW turn opens | |
| assert len(c.ledger.events) == base + 3 | |
| assert c.turn == 2 | |
| def test_step_one_isolates_a_failing_agent(self): | |
| # boom is first: its failed call produces no event but must not block the speaker. | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_ExplodingAgent(), _SpeakingAgent())) | |
| c = Conductor(scenario=scenario, governor=Governor()) | |
| c.reset("seed") | |
| base = len(c.ledger.events) | |
| c.step_one() # pops boom → raises internally → recorded, no event appended | |
| assert len(c.ledger.events) == base | |
| assert c.agent_errors and c.agent_errors[-1]["agent"] == "boom" | |
| c.step_one() # pops the speaker → one real event | |
| assert len(c.ledger.events) == base + 1 | |
| assert c.ledger.events[-1].actor == "speaker" | |
| def test_step_one_performs_genesis_on_empty_ledger(self): | |
| c = _conductor() | |
| assert c.step_one() is True | |
| kinds = {e.kind for e in c.ledger.events} | |
| assert "run.started" in kinds and "world.observed" in kinds | |
| class TestPeekNextActor: | |
| """``peek_next_actor_name`` powers the Show's "who's thinking…" hint: a pure read | |
| that names whoever the next ``step_one`` will run, without advancing the run.""" | |
| def test_peeks_the_first_queued_agent(self): | |
| # Two tick-every-1 agents: after the first step_one pops the first, the second | |
| # sits queued for this same turn — peek must name it, not re-open a turn. | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_SpeakingAgent(), _ExplodingAgent())) | |
| c = Conductor(scenario=scenario, governor=Governor()) | |
| c.reset("seed") | |
| c.step_one() # pops "speaker"; "boom" remains queued for turn 1 | |
| assert c.peek_next_actor_name() == "boom" | |
| def test_peek_does_not_advance_the_run(self): | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_SpeakingAgent(),)) | |
| c = Conductor(scenario=scenario, governor=Governor()) | |
| c.reset("seed") | |
| before_events, before_turn = len(c.ledger.events), c.turn | |
| c.peek_next_actor_name() | |
| c.peek_next_actor_name() | |
| assert (len(c.ledger.events), c.turn) == (before_events, before_turn) | |
| def test_peeks_next_turns_tick_actor_when_queue_is_empty(self): | |
| # Queue drained between turns: peek looks ahead to who ticks on turn+1. | |
| scenario = Scenario(name="s", default_seed="seed", agents=(_SpeakingAgent(),)) | |
| c = Conductor(scenario=scenario, governor=Governor()) | |
| c.reset("seed") # turn 0, nothing queued; "speaker" ticks every turn | |
| assert c.peek_next_actor_name() == "speaker" | |