File size: 25,781 Bytes
0d7db8e 5424fe6 0d7db8e ba6dd5f 42e32ed ba6dd5f a196e34 0d7db8e 5424fe6 0d7db8e 42e32ed a196e34 a2ca0e0 ba6dd5f 42e32ed 0d7db8e ba6dd5f 42e32ed 0d7db8e 5424fe6 0d7db8e 42e32ed 0d7db8e 32a601f 0d7db8e 5424fe6 42e32ed a2ca0e0 42e32ed 0d7db8e 0b0d9be ba6dd5f 0d7db8e 42e32ed a2ca0e0 42e32ed 0d7db8e a2ca0e0 f637227 a2ca0e0 f637227 a2ca0e0 f637227 a2ca0e0 0d7db8e 0b0d9be ba6dd5f 42e32ed a2ca0e0 42e32ed 5424fe6 a2ca0e0 a196e34 a2ca0e0 0d0c561 a2ca0e0 0d7db8e a2ca0e0 42e32ed 0d7db8e 42e32ed 0d7db8e 42e32ed a2ca0e0 ce159dc a2ca0e0 ce159dc 6432854 ce159dc a2ca0e0 6432854 a2ca0e0 ce159dc a2ca0e0 ce159dc a2ca0e0 ce159dc a2ca0e0 5424fe6 0d7db8e 5424fe6 0d7db8e 5424fe6 a2ca0e0 5424fe6 a2ca0e0 5424fe6 42e32ed 0b0d9be a2ca0e0 0b0d9be a2ca0e0 0b0d9be a2ca0e0 0b0d9be f6566bb 6432854 0460922 6432854 ade9df5 42e32ed ade9df5 0d7db8e 42e32ed ade9df5 42e32ed 0d7db8e 5424fe6 a196e34 5424fe6 a196e34 5424fe6 a196e34 5424fe6 6432854 a196e34 a2ca0e0 a196e34 0460922 a196e34 5424fe6 ba6dd5f a196e34 ba6dd5f 5424fe6 0d7db8e a2ca0e0 0d7db8e a196e34 0d7db8e f6566bb 0d7db8e f6566bb 0d7db8e 5424fe6 0d7db8e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 | """Conductor β the stage manager who raises the curtain and drives the loop.
The conductor plays two roles:
Initiator (t=0): takes the seed, writes genesis events, configures the
cast. This is where the scenario is translated into running state.
Driver (t>0): each tick it decides who acts, checks the governor, fires
the heartbeat. Pull-based scheduling — the conductor pulls the next unit
of work — gives a natural throttle and a natural pause point.
Scheduling is hybrid:
1. Subscription-based: when an event is appended, agents that declared
that event kind in their manifest.subscribes_to are queued to react.
2. Tick-based: agents with manifest.schedule.tick_every also fire on a
fixed interval regardless of subscriptions.
3. Scenario fallback: if no agent has a manifest, the scenario's legacy
schedule() method is used (backward-compatible with Phase 0/1 scenarios).
Long-running support (ADR-0013):
* Two clocks — wall-clock cadence is the caller's concern; sim-time is the
`turn`. ``step(n_ticks=N)`` advances N sim-ticks in one call, so a wall-clock
cron ("one episode per hour") maps to ``step(n_ticks=60)``.
* ``restore()`` resumes a persisted run from the ledger tail.
* ``snapshot_every`` periodically checkpoints a SQLite-backed ledger.
* Per-agent token usage is metered into the governor for budget enforcement.
The observer is decoupled: the conductor notifies it after every append but
the observer never participates in cognition.
"""
from __future__ import annotations
import logging
import time
from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING
from uuid import uuid4
from src import observability as obs
from src.core.events import Event, normalize_session_id
from src.core.governor import BudgetExceeded, Governor
from src.core.ledger import Ledger
from src.core.projections import StageProjection, rebuild_stage
from src.scenarios.base import Scenario
if TYPE_CHECKING:
from src.agents.base import Agent
from src.core.observer import Observer
logger = logging.getLogger(__name__)
class Conductor:
    """Stage manager for one scenario: opens runs and drives the turn loop.

    A conductor owns one *current* run (``run_id``) on a ledger that may hold
    many runs. Every event goes through :meth:`_append`, which stamps the
    session, notifies the observer and queues subscribed agents. Agents are run
    either a whole turn at a time (:meth:`step`) or one actor at a time
    (:meth:`step_one`).
    """

    def __init__(
        self,
        scenario: Scenario,
        governor: Governor | None = None,
        ledger: Ledger | None = None,
        observer: "Observer | None" = None,
        snapshot_every: int | None = None,
        snapshot_path: str | Path | None = None,
    ) -> None:
        """Wire the conductor to its scenario and (optional) collaborators.

        Args:
            scenario: Supplies the cast (``agents``), ``genesis`` events and the
                legacy ``schedule`` fallback.
            governor: Budget/turn governor; a default ``Governor()`` when None.
            ledger: Event store; a default ``Ledger()`` when None.
            observer: Optional observer notified after every append.
            snapshot_every: Snapshot when ``turn`` is a multiple of this value.
            snapshot_path: Destination handed to the ledger's ``snapshot_to``.
        """
        self.scenario = scenario
        # ``is None`` rather than ``or``: a caller-supplied collaborator must be
        # kept even when it happens to be falsy (e.g. an empty ledger whose class
        # defines ``__len__``) — otherwise it would be silently swapped for a
        # fresh default and the caller's store would never be written to.
        self.ledger = ledger if ledger is not None else Ledger()
        self.governor = governor if governor is not None else Governor()
        self.observer = observer
        self.snapshot_every = snapshot_every
        self.snapshot_path = snapshot_path
        self.run_id = str(uuid4())
        # The browser/user session driving the current run (normalized, untrusted
        # input) — stamped onto every event this conductor appends (see _append).
        self.session_id: str | None = None
        self.turn = 0
        self._trigger_queue: deque[tuple["Agent", Event]] = deque()
        # Actors still to act in the CURRENT turn — the queue ``step_one`` drains one
        # at a time so the UI can show each agent the moment it responds, instead of
        # waiting for the whole turn (ADR-0023). ``step()`` does not use it.
        self._pending: deque["Agent"] = deque()
        # Agents that failed to act this run, newest last — a single agent's crash
        # is isolated (the rest of the cast still acts) and recorded here for the UI
        # and tests, never swallowed silently (ADR-0023).
        self.agent_errors: list[dict[str, str]] = []

    # ── projection ────────────────────────────────────────────────────────────
    @property
    def projection(self) -> StageProjection:
        """Stage projection rebuilt from the ledger for the current run only."""
        # Run-scoped: the live stage shows only the current run, even though the
        # ledger is a shared, append-only store of every run (ADR-0009).
        return rebuild_stage(self.ledger.events, self.run_id)

    # ── lifecycle ─────────────────────────────────────────────────────────────
    def _cast_map(self) -> dict[str, dict[str, str | None]]:
        """Snapshot of each agent's model binding, keyed by agent name.

        Recorded on ``run.started`` so a run is self-describing — the trace alone
        says which models played which parts (handy for sponsor-track receipts).
        Agents without a manifest (Phase-0/1 fallback) are reported as unbound.
        """
        cast: dict[str, dict[str, str | None]] = {}
        for agent in self.scenario.agents:
            name = getattr(agent, "name", agent.__class__.__name__)
            manifest = getattr(agent, "manifest", None)
            endpoint = getattr(manifest, "model_endpoint", None)
            profile = getattr(manifest, "model_profile", None)
            cast[name] = {
                "model_endpoint": endpoint,
                "model_profile": profile,
                # Resolve the *concrete* model this agent routes to (endpoint key or tier),
                # so the trace — and the winner attribution / Hall of Fame downstream — names
                # a real model even for profile-bound agents whose ``model_endpoint`` is None.
                "model": self._resolve_model_name(agent, endpoint or profile),
            }
        return cast

    @staticmethod
    def _resolve_model_name(agent: object, route_key: str | None) -> str | None:
        """Best-effort concrete model name for *agent*'s route key via its router.

        Defensive: any resolution hiccup (no router, catalogue unavailable) degrades to
        ``None`` rather than breaking ``run.started``.
        """
        router = getattr(agent, "router", None)
        if router is None or not route_key:
            return None
        try:
            return router.model_for(route_key)
        except Exception:  # pragma: no cover - never let model resolution break a run start
            return None

    def reset(self, seed: str, *, session_id: str | None = None) -> None:
        """Start a new run: mint a ``run_id``, clear transient state, write genesis.

        Args:
            seed: Scenario seed recorded on ``run.started`` and passed to
                ``scenario.genesis``.
            session_id: Optional client-supplied session; normalized before use.
        """
        # NOTE: we no longer wipe the ledger — it is a shared, persistent, append-only
        # store (ADR-0009), so a reset mints a *new* run rather than destroying prior
        # ones. Only the in-conductor transient state for the old run is cleared.
        #
        # ``session_id`` (optional) attributes the run to the browser/user that started
        # it — stamped onto ``run.started`` so the per-user Archive can list "my runs"
        # without a side table (ADR-0014: every view is a projection of the log).
        if self.observer:
            self.observer.reset()
        self._trigger_queue.clear()
        self._pending.clear()
        self.agent_errors.clear()
        self.run_id = str(uuid4())
        # Normalize at the engine boundary: the id originates client-side
        # (localStorage), so malformed/oversized values degrade to None here
        # rather than reaching the ledger or the memory index.
        self.session_id = normalize_session_id(session_id)
        self.turn = 0
        self.governor.reset()
        goal = getattr(self.scenario, "goal", "")
        scenario_name = getattr(self.scenario, "name", type(self.scenario).__name__)
        cast = self._cast_map()
        obs.set_context(run_id=self.run_id, turn=self.turn)
        obs.log("run.started", run_id=self.run_id, seed=seed, goal=goal, scenario=scenario_name)
        payload: dict = {"seed": seed, "goal": goal, "scenario": scenario_name, "cast": cast}
        # Stamp the arena contract so the run is self-describing forever (ADR-0029):
        # the leaderboard reads competition.kind to know which runs produce winners
        # and how to attribute them. None (legacy/test scenarios) behaves like 'none'.
        competition = getattr(self.scenario, "competition", None)
        if competition is not None:
            payload["competition"] = competition.model_dump()
        if self.session_id:
            payload["session_id"] = self.session_id
        genesis_start = Event(
            run_id=self.run_id,
            turn=self.turn,
            kind="run.started",
            actor="conductor",
            payload=payload,
        )
        self._append(genesis_start)
        for event in self.scenario.genesis(self.run_id, self.turn, seed):
            self._append(event)

    def finalize(
        self,
        reason: str,
        *,
        winner: str | None = None,
        winning_model: str | None = None,
        winner_kind: str | None = None,
        winning_models: list[str] | None = None,
    ) -> Event | None:
        """Close the current run with a ``run.finished`` event.

        Idempotent-safe: if this run already has a ``run.finished`` event we return
        the existing one rather than emitting a duplicate. ``turns`` and ``tokens``
        are read from the governor's live counters.

        One scoped exception (the curtain call): when the show was cut short by a budget
        bound — the run was already finalized ``"budget"`` with no winner — and the judge
        *then* rules (``reason == "verdict"`` with a winner), we append a corrective
        ``run.finished`` so the win is attributed (ADR-0029). The leaderboard reads
        ``run.finished`` last-wins (``run_index``), so the ruling, not the truncation,
        names the winner. Every other repeat call stays a true no-op.

        Attribution (ADR-0029): ``winner`` is a cast agent name (``winner_kind:
        "agent"``) or a team label (``winner_kind: "team"``). ``winning_model`` keeps
        its original meaning — a single cast agent's endpoint, populated only for an
        agent winner — while ``winning_models`` lists the endpoint(s) behind the
        winner (every member of a winning team). All keys are additive.

        Returns:
            The appended ``run.finished`` event, or the pre-existing one when the
            call is a no-op.
        """
        existing = [e for e in self.ledger.events_for_run(self.run_id) if e.kind == "run.finished"]
        if existing:
            prior = existing[-1]
            supersedes_budget_close = (
                reason == "verdict"
                and bool(winner)
                and prior.payload.get("reason") == "budget"
                and not prior.payload.get("winner")
            )
            if not supersedes_budget_close:
                return prior
        stats = self.governor.stats
        finished = Event(
            run_id=self.run_id,
            turn=self.turn,
            kind="run.finished",
            actor="conductor",
            payload={
                "reason": reason,
                "winner": winner,
                "winner_kind": winner_kind,
                "winning_model": winning_model,
                "winning_models": list(winning_models or []),
                # Fall back to the conductor's own turn when the governor reports
                # a missing/zero counter.
                "turns": int(stats.get("current_turn", self.turn) or self.turn),
                "tokens": int(stats.get("total_tokens", 0) or 0),
            },
        )
        obs.log(
            "run.finished",
            run_id=self.run_id,
            reason=reason,
            winner=winner,
            winner_kind=winner_kind,
            winning_model=winning_model,
            turns=finished.payload["turns"],
            tokens=finished.payload["tokens"],
        )
        return self._append(finished)

    def restore(self) -> bool:
        """Resume a persisted run: adopt the ledger's run_id and last turn.

        The ledger rehydrates its own events from disk (e.g.
        ``SQLiteLedger.from_file``); this re-points the conductor at that tail so
        the next ``step()`` continues the run rather than starting fresh.

        Returns:
            True when there was state to restore, False for an empty ledger.
        """
        events = self.ledger.events
        if not events:
            return False
        last = events[-1]
        self.run_id = last.run_id
        self.turn = last.turn
        # Drop ALL per-run transient state, mirroring ``reset()``: actors still
        # queued in ``_pending`` (or errors recorded) for the run we were pointed
        # at before must not bleed into the resumed run via ``step_one``.
        self._trigger_queue.clear()
        self._pending.clear()
        self.agent_errors.clear()
        self.governor.reset()
        return True

    def step(self, n_ticks: int = 1) -> None:
        """Advance the simulation by *n_ticks* sim-ticks (default 1).

        With an empty ledger, the first tick performs genesis instead of acting
        (preserving the original auto-reset behaviour). Values below 1 are
        treated as a single tick.

        Raises:
            BudgetExceeded: re-raised after the run has been finalized ``"budget"``.
        """
        for _ in range(max(1, n_ticks)):
            if not self.ledger.events_for_run(self.run_id):
                self.reset(self.scenario.default_seed)
                continue
            try:
                self._tick()
            except BudgetExceeded:
                # Close the run on the ledger before the stop propagates — a headless
                # run that hits a budget bound should still be self-describing.
                self.finalize("budget")
                raise
            self._maybe_snapshot()

    def step_one(self) -> bool:
        """Advance exactly ONE actor, opening a new turn when the queue is empty.

        This is the streaming counterpart to :meth:`step`: ``step`` runs a whole turn
        (every scheduled agent) before returning, so the UI only sees the result once
        the last mind has spoken; ``step_one`` runs a single actor per call, so each
        agent appears the moment it responds. Turn semantics are preserved — a new turn
        opens (incrementing ``turn``, checking the governor, queuing this turn's
        subscription + tick actors) only when the previous turn's queue drains, and
        subscribers an agent triggers are absorbed into the same turn (mirroring the
        ``_tick`` drain loop).

        Returns:
            True when an actor was run (or genesis was performed) — note the actor
            may have abstained or failed, in which case no event was appended;
            False when the opened turn had no actors.

        Raises:
            BudgetExceeded: like ``step``, after finalizing the run ``"budget"``.
        """
        if not self.ledger.events_for_run(self.run_id):
            self.reset(self.scenario.default_seed)
            return True
        try:
            if not self._pending:
                self.turn += 1
                self.governor.begin_turn(self.turn)
                self.governor.check(self.turn)
                obs.set_context(turn=self.turn)
                self._pending.extend(agent for agent, _ in self._trigger_queue)
                self._trigger_queue.clear()
                self._pending.extend(self._tick_scheduled_agents())
            if not self._pending:
                return False
            agent = self._pending.popleft()
            self._run_agent(agent, self.projection)
        except BudgetExceeded:
            self.finalize("budget")
            raise
        # Absorb subscribers this agent's event just triggered into the current turn,
        # so a subscription cascade still resolves within the turn (as in ``_tick``).
        while self._trigger_queue:
            triggered, _ = self._trigger_queue.popleft()
            self._pending.append(triggered)
        self._maybe_snapshot()
        return True

    def peek_next_actor_name(self) -> str | None:
        """Best-effort name of the agent the next :meth:`step_one` will run.

        A pure read (it never mutates the queue or the turn) used by the UI to show a
        "who's thinking…" hint while a model call is in flight. Mirrors ``step_one``'s
        own pull order: an already-queued agent first, then a subscription-triggered
        one, then — when the queue is empty and the next call would open a fresh turn —
        the first tick-scheduled agent for ``turn + 1``. Returns ``None`` when nothing
        is queued and no agent ticks on the next turn (the show is effectively idle).
        Only manifest-driven agents are considered for the tick case.
        """
        if self._pending:
            return getattr(self._pending[0], "name", None)
        if self._trigger_queue:
            return getattr(self._trigger_queue[0][0], "name", None)
        next_turn = self.turn + 1
        for agent in self.scenario.agents:
            manifest = getattr(agent, "manifest", None)
            if manifest is None:
                continue
            tick_every = manifest.schedule.tick_every
            # ``tick_every == 0`` fires every turn; testing it first also keeps the
            # modulo from dividing by zero.
            if tick_every is not None and (tick_every == 0 or next_turn % tick_every == 0):
                return getattr(agent, "name", None)
        return None

    def force_verdict(self) -> Event | None:
        """Cut the show short and have the judge rule *now*, on the whole run.

        The curtain call: the visitor pressed "Start judging", or a budget/turn limit
        ended the cast's run. We silence the cast (drain any queued or in-flight
        competitor turns so no further mind speaks), then run the scenario's judge(s)
        so a ``judge.verdict`` — carrying a ``winner`` via the competition handler —
        lands and reads every event of this run.

        Crucially the judge runs *un-gated*: a verdict must still land even when the
        very budget that ended the show is already spent, so the round resolves on a
        ruling rather than a silent halt. Idempotent: if this run already has a
        verdict, it is returned unchanged. Returns the verdict event, or ``None`` when
        the scenario has no judge to rule (or no judge emitted a verdict).
        """
        existing = next(
            (e for e in self.ledger.events_for_run(self.run_id) if e.kind == "judge.verdict"),
            None,
        )
        if existing is not None:
            return existing
        judges = [a for a in self.scenario.agents if getattr(getattr(a, "manifest", None), "role", None) == "judge"]
        if not judges:
            return None
        # Silence the cast: no queued subscription/tick competitor acts after this.
        self._pending.clear()
        self._trigger_queue.clear()
        # Advance the sim-clock once to mark the curtain call, but DON'T gate on the
        # governor — the judge must rule even when the show ended on a spent budget.
        self.turn += 1
        obs.set_context(turn=self.turn)
        obs.log("run.judging", run_id=self.run_id, turn=self.turn, judges=[getattr(j, "name", "") for j in judges])
        projection = self.projection
        for judge in judges:
            # The curtain call must produce a ruling: tell a reactive judge (one that
            # otherwise abstains until its win condition) to rule unconditionally now.
            judge._forced = True
            try:
                self._run_agent(judge, projection, check_budget=False)
            finally:
                judge._forced = False
        return next(
            (e for e in reversed(self.ledger.events_for_run(self.run_id)) if e.kind == "judge.verdict"),
            None,
        )

    def inject_user_event(self, text: str, label: str | None = None) -> None:
        """Append a ``user.injected`` event from the visitor on a new turn.

        Args:
            text: The visitor's text, stored under ``payload["text"]``.
            label: Optional label, stored under ``payload["label"]`` when truthy.
        """
        self.turn += 1
        payload: dict[str, str] = {"text": text}
        if label:
            payload["label"] = label
        self._append(
            Event(
                run_id=self.run_id,
                turn=self.turn,
                kind="user.injected",
                actor="visitor",
                payload=payload,
            )
        )

    # ── internal ──────────────────────────────────────────────────────────────
    def _tick(self) -> None:
        """Run one whole turn: subscription-triggered agents, then tick-scheduled ones.

        Raises:
            BudgetExceeded: propagated from the governor checks.
        """
        self.turn += 1
        self.governor.begin_turn(self.turn)
        self.governor.check(self.turn)
        obs.set_context(turn=self.turn)
        # One projection for the whole turn; ``_run_agent`` folds each new event in.
        projection = self.projection
        with obs.span("turn", **{"mal.turn": self.turn}):
            # ── phase 1: event-triggered (subscription) agents ────────────────
            # The queue may grow while draining: each appended event can enqueue
            # further subscribers, which are handled in this same turn.
            while self._trigger_queue:
                agent, _trigger = self._trigger_queue.popleft()
                self._run_agent(agent, projection)
            # ── phase 2: tick-based scheduled agents ──────────────────────────
            for agent in self._tick_scheduled_agents():
                self._run_agent(agent, projection)

    def _run_agent(self, agent: "Agent", projection: StageProjection, *, check_budget: bool = True) -> None:
        """Invoke one agent, meter its usage, and append the event it returns.

        Args:
            agent: The agent to run.
            projection: Stage projection handed to the agent and updated in place
                with the appended event.
            check_budget: When False the governor gate is skipped (used by
                ``force_verdict`` so a judge can rule on a spent budget).

        Raises:
            BudgetExceeded: from the governor gate or from the agent itself; any
                other agent exception is recorded via ``_note_agent_error``.
        """
        if check_budget:
            self.governor.check(self.turn)  # before the span: a budget stop is not an agent turn
        name = getattr(agent, "name", agent.__class__.__name__)
        start = time.perf_counter()
        with obs.bind(agent=name), obs.span("agent.turn", **{"mal.agent": name, "mal.turn": self.turn}):
            try:
                event = agent.act(
                    run_id=self.run_id,
                    turn=self.turn,
                    projection=projection,
                    # Run-scoped: the ledger holds EVERY run (shared store, ADR-0026);
                    # an agent's memory/context must never recall another run's — or
                    # another user's — discussion.
                    recent_events=self.ledger.events_for_run(self.run_id),
                )
            except BudgetExceeded:
                raise  # an intentional stop from the governor — never swallow it
            except Exception as exc:  # noqa: BLE001 — one agent's crash must not silence the cast
                self._note_agent_error(agent, exc)
                return
            # An agent may ABSTAIN by returning None — it was invoked (e.g. a judge woken by
            # every spoken line, watching for an end condition) but has nothing to emit this
            # time. No event, no budget charge; the show simply continues. Distinct from an
            # error: a deliberate "not yet," not a failed turn.
            if event is None:
                return
            # ``or {}``: an agent that exposes ``last_usage = None`` (attribute present
            # but unset) must not crash the loop here, outside the isolation above.
            usage = getattr(agent, "last_usage", None) or {}
            tokens = int(usage.get("total_tokens", 0) or 0)
            cost_usd = float(usage.get("cost_usd", 0.0) or 0.0)
            obs.add_span_attrs(**{"event.kind": event.kind, "mal.tokens": tokens, "mal.cost_usd": cost_usd})
            self.governor.record_call(tokens=tokens, cost_usd=cost_usd)
            # Fold in the event as stored (session-stamped by ``_append``), so the
            # live projection matches what a rebuild from the ledger would show.
            appended = self._append(event)
            projection.apply(appended)
            obs.record_agent_turn(name, time.perf_counter() - start)

    def _note_agent_error(self, agent: "Agent", exc: Exception) -> None:
        """Record (and log) an agent's failed turn without aborting the tick.

        Resilience over silence: if one mind throws (a flaky model call, a memory
        index hiccup), the others still get their turn this round, and the failure
        is visible on ``agent_errors`` rather than crashing the whole loop.
        """
        name = getattr(agent, "name", agent.__class__.__name__)
        self.agent_errors.append({"turn": str(self.turn), "agent": name, "error": str(exc)})
        logger.warning("agent %s failed on turn %d: %s", name, self.turn, exc, exc_info=exc)
        obs.log("agent.error", level="warning", agent=name, turn=self.turn, error=str(exc))

    def _maybe_snapshot(self) -> None:
        """Snapshot the ledger when configured and ``turn`` is a multiple of ``snapshot_every``.

        A no-op when either snapshot setting is unset/falsy, or when the ledger
        has no callable ``snapshot_to``.
        """
        if not self.snapshot_every or not self.snapshot_path:
            return
        if self.turn % self.snapshot_every != 0:
            return
        snapshot_to = getattr(self.ledger, "snapshot_to", None)
        if callable(snapshot_to):
            snapshot_to(self.snapshot_path)

    def _append(self, event: Event) -> Event:
        """Append *event* to the ledger, then notify the observer and subscribers.

        Returns:
            The event as returned by ``ledger.append``.
        """
        # Stamp the session onto the envelope at the single append chokepoint, so
        # *every* action in a run is attributable/filterable by who drove it —
        # agents and scenarios never have to know sessions exist.
        if self.session_id and event.session_id is None:
            event = event.model_copy(update={"session_id": self.session_id})
        appended = self.ledger.append(event)
        obs.log(
            "event.append", level="debug", id=appended.id, kind=appended.kind, actor=appended.actor, turn=appended.turn
        )
        if self.observer:
            self.observer.consume(appended)
        self._notify_subscribers(appended)
        return appended

    def _notify_subscribers(self, event: Event) -> None:
        """Queue agents that subscribe to this event kind — but never an agent for its OWN
        event.

        An agent that both subscribes to and emits a kind (e.g. the devil's-advocate
        subscribes to ``agent.spoke`` and now speaks one) would otherwise re-trigger itself
        on its own line, cascading until the per-turn call cap trips — starving later
        agents (the judge) of their turn. Self-reaction is never intended, so we skip it;
        a subscriber still reacts to every *peer's* event.
        """
        for agent in self.scenario.agents:
            manifest = getattr(agent, "manifest", None)
            if manifest and event.kind in manifest.subscribes_to and event.actor != getattr(agent, "name", None):
                self._trigger_queue.append((agent, event))

    def _tick_scheduled_agents(self) -> list["Agent"]:
        """Return agents that should fire this turn based on their tick schedule.

        Falls back to the scenario's legacy schedule() method for agents
        without a manifest — preserving full backward compatibility.
        Manifest-driven agents come first, in cast order, then legacy ones.
        """
        manifest_agents = [a for a in self.scenario.agents if getattr(a, "manifest", None)]
        legacy_agents = [a for a in self.scenario.agents if not getattr(a, "manifest", None)]
        result: list[Agent] = []
        # Manifest-driven tick scheduling
        for agent in manifest_agents:
            tick_every = agent.manifest.schedule.tick_every
            # 0 means "every turn"; checked first so the modulo never divides by zero.
            if tick_every is not None and (tick_every == 0 or self.turn % tick_every == 0):
                result.append(agent)
        # Legacy scenario scheduling (backward-compatible)
        if legacy_agents:
            scheduled = self.scenario.schedule(self.turn)
            result.extend(a for a in scheduled if a in legacy_agents)
        return result
|