"""Conductor: the stage manager who raises the curtain and drives the loop.

The conductor plays two roles:

  Initiator (t=0): takes the seed, writes genesis events, configures the
  cast.  This is where the scenario is translated into running state.

  Driver (t>0): each tick it decides who acts, checks the governor, fires
  the heartbeat.  Pull-based scheduling (the conductor pulls the next unit
  of work) gives a natural throttle and a natural pause point.

Scheduling is hybrid:
  1. Subscription-based: when an event is appended, agents that declared
     that event kind in their manifest.subscribes_to are queued to react.
  2. Tick-based: agents with manifest.schedule.tick_every also fire on a
     fixed interval regardless of subscriptions.
  3. Scenario fallback: if no agent has a manifest, the scenario's legacy
     schedule() method is used (backward-compatible with Phase 0/1 scenarios).
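
A minimal sketch of the first two rules, assuming a hypothetical ``ToyAgent``
stand-in (the real agents carry a manifest; the names below are illustrative
only, not this module's API):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyAgent:
    # Hypothetical stand-in for an agent plus its manifest; not the real Agent class.
    name: str
    subscribes_to: set = field(default_factory=set)
    tick_every: Optional[int] = None  # None: never ticks; 0: fires every turn


def subscribers_for(agents, kind, actor):
    # Agents queued to react to an appended event, skipping the event's own author.
    return [a.name for a in agents if kind in a.subscribes_to and a.name != actor]


def tick_due(agents, turn):
    # Agents whose fixed interval fires on this turn.
    return [
        a.name
        for a in agents
        if a.tick_every is not None and (a.tick_every == 0 or turn % a.tick_every == 0)
    ]


cast = [
    ToyAgent("optimist", subscribes_to={"user.injected"}, tick_every=1),
    ToyAgent("skeptic", subscribes_to={"agent.spoke"}),
    ToyAgent("judge", subscribes_to={"agent.spoke"}, tick_every=3),
]

print(subscribers_for(cast, "agent.spoke", actor="skeptic"))  # ['judge']
print(tick_due(cast, turn=3))  # ['optimist', 'judge']
print(tick_due(cast, turn=4))  # ['optimist']
```

The skeptic is skipped for its own ``agent.spoke`` event, which mirrors the
self-reaction guard in ``_notify_subscribers``.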

Long-running support (ADR-0013):
  * Two clocks: wall-clock cadence is the caller's concern; sim-time is the
    `turn`.  ``step(n_ticks=N)`` advances N sim-ticks in one call, so a wall-clock
    cron ("one episode per hour") maps to ``step(n_ticks=60)``.
  * ``restore()`` resumes a persisted run from the ledger tail.
  * ``snapshot_every`` periodically checkpoints a SQLite-backed ledger.
  * Per-agent token usage is metered into the governor for budget enforcement.
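
As a rough illustration of how the two clocks and ``snapshot_every`` interact,
the sketch below lists the turns at which a checkpoint would be due during one
``step()`` call.  ``snapshot_turns`` is a hypothetical helper, and it assumes the
run already exists, a snapshot path is configured, and every tick advances the
turn by exactly one:

```python
def snapshot_turns(start_turn, n_ticks, snapshot_every):
    # Turns at which a checkpoint would be due during one step() call.
    due = []
    turn = start_turn
    for _ in range(max(1, n_ticks)):  # step() always advances at least one tick
        turn += 1
        if snapshot_every and turn % snapshot_every == 0:
            due.append(turn)
    return due


# One hourly cron invocation mapped to 60 sim-ticks, checkpointing every 25 turns.
print(snapshot_turns(start_turn=0, n_ticks=60, snapshot_every=25))  # [25, 50]
print(snapshot_turns(start_turn=60, n_ticks=60, snapshot_every=25))  # [75, 100]
print(snapshot_turns(start_turn=0, n_ticks=60, snapshot_every=None))  # []
```

Checkpoints follow sim-time only, so a slower or faster wall-clock cadence
changes when they happen but not which turns they land on.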

The observer is decoupled: the conductor notifies it after every append but
the observer never participates in cognition.
"""

from __future__ import annotations

import logging
import time
from collections import deque
from pathlib import Path
from typing import TYPE_CHECKING
from uuid import uuid4

from src import observability as obs
from src.core.events import Event, normalize_session_id
from src.core.governor import BudgetExceeded, Governor
from src.core.ledger import Ledger
from src.core.projections import StageProjection, rebuild_stage
from src.scenarios.base import Scenario

if TYPE_CHECKING:
    from src.agents.base import Agent
    from src.core.observer import Observer

logger = logging.getLogger(__name__)


class Conductor:
    def __init__(
        self,
        scenario: Scenario,
        governor: Governor | None = None,
        ledger: Ledger | None = None,
        observer: "Observer | None" = None,
        snapshot_every: int | None = None,
        snapshot_path: str | Path | None = None,
    ) -> None:
        self.scenario = scenario
        self.ledger = ledger or Ledger()
        self.governor = governor or Governor()
        self.observer = observer
        self.snapshot_every = snapshot_every
        self.snapshot_path = snapshot_path
        self.run_id = str(uuid4())
        # The browser/user session driving the current run (normalized, untrusted
        # input), stamped onto every event this conductor appends (see _append).
        self.session_id: str | None = None
        self.turn = 0
        self._trigger_queue: deque[tuple["Agent", Event]] = deque()
        # Actors still to act in the CURRENT turn: the queue ``step_one`` drains one
        # at a time so the UI can show each agent the moment it responds, instead of
        # waiting for the whole turn (ADR-0023).  ``step()`` does not use it.
        self._pending: deque["Agent"] = deque()
        # Agents that failed to act this run, newest last. A single agent's crash
        # is isolated (the rest of the cast still acts) and recorded here for the UI
        # and tests, never swallowed silently (ADR-0023).
        self.agent_errors: list[dict[str, str]] = []

    # ── projection ────────────────────────────────────────────────────────────

    @property
    def projection(self) -> StageProjection:
        # Run-scoped: the live stage shows only the current run, even though the
        # ledger is a shared, append-only store of every run (ADR-0009).
        return rebuild_stage(self.ledger.events, self.run_id)

    # ── lifecycle ─────────────────────────────────────────────────────────────

    def _cast_map(self) -> dict[str, dict[str, str | None]]:
        """Snapshot of each agent's model binding, keyed by agent name.

        Recorded on ``run.started`` so a run is self-describing: the trace alone
        says which models played which parts (handy for sponsor-track receipts).
        Agents without a manifest (Phase-0/1 fallback) are reported as unbound.
        """
        cast: dict[str, dict[str, str | None]] = {}
        for agent in self.scenario.agents:
            name = getattr(agent, "name", agent.__class__.__name__)
            manifest = getattr(agent, "manifest", None)
            endpoint = getattr(manifest, "model_endpoint", None)
            profile = getattr(manifest, "model_profile", None)
            cast[name] = {
                "model_endpoint": endpoint,
                "model_profile": profile,
                # Resolve the *concrete* model this agent routes to (endpoint key or tier),
                # so the trace (and the winner attribution / Hall of Fame downstream) names
                # a real model even for profile-bound agents whose ``model_endpoint`` is None.
                "model": self._resolve_model_name(agent, endpoint or profile),
            }
        return cast

    @staticmethod
    def _resolve_model_name(agent: object, route_key: str | None) -> str | None:
        """Best-effort concrete model name for *agent*'s route key via its router.

        Defensive: any resolution hiccup (no router, catalogue unavailable) degrades to
        ``None`` rather than breaking ``run.started``."""
        router = getattr(agent, "router", None)
        if router is None or not route_key:
            return None
        try:
            return router.model_for(route_key)
        except Exception:  # pragma: no cover - never let model resolution break a run start
            return None

    def reset(self, seed: str, *, session_id: str | None = None) -> None:
        # NOTE: we no longer wipe the ledger. It is a shared, persistent, append-only
        # store (ADR-0009), so a reset mints a *new* run rather than destroying prior
        # ones.  Only the in-conductor transient state for the old run is cleared.
        #
        # ``session_id`` (optional) attributes the run to the browser/user that started
        # it, and is stamped onto ``run.started`` so the per-user Archive can list "my runs"
        # without a side table (ADR-0014: every view is a projection of the log).
        if self.observer:
            self.observer.reset()
        self._trigger_queue.clear()
        self._pending.clear()
        self.agent_errors.clear()
        self.run_id = str(uuid4())
        # Normalize at the engine boundary: the id originates client-side
        # (localStorage), so malformed/oversized values degrade to None here
        # rather than reaching the ledger or the memory index.
        self.session_id = normalize_session_id(session_id)
        self.turn = 0
        self.governor.reset()
        goal = getattr(self.scenario, "goal", "")
        scenario_name = getattr(self.scenario, "name", type(self.scenario).__name__)
        cast = self._cast_map()
        obs.set_context(run_id=self.run_id, turn=self.turn)
        obs.log("run.started", run_id=self.run_id, seed=seed, goal=goal, scenario=scenario_name)
        payload: dict = {"seed": seed, "goal": goal, "scenario": scenario_name, "cast": cast}
        # Stamp the arena contract so the run is self-describing forever (ADR-0029):
        # the leaderboard reads competition.kind to know which runs produce winners
        # and how to attribute them.  None (legacy/test scenarios) behaves like 'none'.
        competition = getattr(self.scenario, "competition", None)
        if competition is not None:
            payload["competition"] = competition.model_dump()
        if self.session_id:
            payload["session_id"] = self.session_id
        genesis_start = Event(
            run_id=self.run_id,
            turn=self.turn,
            kind="run.started",
            actor="conductor",
            payload=payload,
        )
        self._append(genesis_start)
        for event in self.scenario.genesis(self.run_id, self.turn, seed):
            self._append(event)

    def finalize(
        self,
        reason: str,
        *,
        winner: str | None = None,
        winning_model: str | None = None,
        winner_kind: str | None = None,
        winning_models: list[str] | None = None,
    ) -> Event | None:
        """Close the current run with a ``run.finished`` event.

        Idempotent-safe: if this run already has a ``run.finished`` event we return
        the existing one rather than emitting a duplicate.  ``turns`` and ``tokens``
        are read from the governor's live counters.

        One scoped exception (the curtain call): when the show was cut short by a budget
        bound (the run was already finalized ``"budget"`` with no winner) and the judge
        *then* rules (``reason == "verdict"`` with a winner), we append a corrective
        ``run.finished`` so the win is attributed (ADR-0029).  The leaderboard reads
        ``run.finished`` last-wins (``run_index``), so the ruling, not the truncation,
        names the winner.  Every other repeat call stays a true no-op.

        Attribution (ADR-0029): ``winner`` is a cast agent name (``winner_kind:
        "agent"``) or a team label (``winner_kind: "team"``).  ``winning_model`` keeps
        its original meaning (a single cast agent's endpoint, populated only for an
        agent winner), while ``winning_models`` lists the endpoint(s) behind the
        winner (every member of a winning team).  All keys are additive.
        """
        existing = [e for e in self.ledger.events_for_run(self.run_id) if e.kind == "run.finished"]
        if existing:
            prior = existing[-1]
            supersedes_budget_close = (
                reason == "verdict"
                and bool(winner)
                and prior.payload.get("reason") == "budget"
                and not prior.payload.get("winner")
            )
            if not supersedes_budget_close:
                return prior
        stats = self.governor.stats
        finished = Event(
            run_id=self.run_id,
            turn=self.turn,
            kind="run.finished",
            actor="conductor",
            payload={
                "reason": reason,
                "winner": winner,
                "winner_kind": winner_kind,
                "winning_model": winning_model,
                "winning_models": list(winning_models or []),
                "turns": int(stats.get("current_turn", self.turn) or self.turn),
                "tokens": int(stats.get("total_tokens", 0) or 0),
            },
        )
        obs.log(
            "run.finished",
            run_id=self.run_id,
            reason=reason,
            winner=winner,
            winner_kind=winner_kind,
            winning_model=winning_model,
            turns=finished.payload["turns"],
            tokens=finished.payload["tokens"],
        )
        return self._append(finished)

    def restore(self) -> bool:
        """Resume a persisted run: adopt the ledger's run_id and last turn.

        The ledger rehydrates its own events from disk (e.g.
        ``SQLiteLedger.from_file``); this re-points the conductor at that tail so
        the next ``step()`` continues the run rather than starting fresh.  Returns
        True when there was state to restore."""
        events = self.ledger.events
        if not events:
            return False
        last = events[-1]
        self.run_id = last.run_id
        self.turn = last.turn
        self._trigger_queue.clear()
        # Drop actors still queued from whatever run this conductor held before.
        self._pending.clear()
        self.governor.reset()
        return True

    def step(self, n_ticks: int = 1) -> None:
        """Advance the simulation by *n_ticks* sim-ticks (default 1, minimum 1).

        When the current run has no events yet, the first tick performs genesis
        instead of acting (preserving the original auto-reset behaviour)."""
        for _ in range(max(1, n_ticks)):
            if not self.ledger.events_for_run(self.run_id):
                self.reset(self.scenario.default_seed)
                continue
            try:
                self._tick()
            except BudgetExceeded:
                # Close the run on the ledger before the stop propagates: a headless
                # run that hits a budget bound should still be self-describing.
                self.finalize("budget")
                raise
            self._maybe_snapshot()

    def step_one(self) -> bool:
        """Advance exactly ONE actor, opening a new turn when the queue is empty.

        This is the streaming counterpart to :meth:`step`: ``step`` runs a whole turn
        (every scheduled agent) before returning, so the UI only sees the result once
        the last mind has spoken; ``step_one`` produces a single event per call, so each
        agent appears the moment it responds.  Turn semantics are preserved: a new turn
        opens (incrementing ``turn``, checking the governor, queuing this turn's
        subscription + tick actors) only when the previous turn's queue drains, and
        subscribers an agent triggers are absorbed into the same turn (mirroring the
        ``_tick`` drain loop).

        Returns True when it produced an event (or performed genesis), False when the
        opened turn had no actors.  May raise :class:`BudgetExceeded` like ``step``."""
        if not self.ledger.events_for_run(self.run_id):
            self.reset(self.scenario.default_seed)
            return True

        try:
            if not self._pending:
                self.turn += 1
                self.governor.begin_turn(self.turn)
                self.governor.check(self.turn)
                obs.set_context(turn=self.turn)
                self._pending.extend(agent for agent, _ in self._trigger_queue)
                self._trigger_queue.clear()
                self._pending.extend(self._tick_scheduled_agents())
                if not self._pending:
                    return False

            agent = self._pending.popleft()
            self._run_agent(agent, self.projection)
        except BudgetExceeded:
            self.finalize("budget")
            raise
        # Absorb subscribers this agent's event just triggered into the current turn,
        # so a subscription cascade still resolves within the turn (as in ``_tick``).
        while self._trigger_queue:
            triggered, _ = self._trigger_queue.popleft()
            self._pending.append(triggered)
        self._maybe_snapshot()
        return True

    def peek_next_actor_name(self) -> str | None:
        """Best-effort name of the agent the next :meth:`step_one` will run.

        A pure read (it never mutates the queue or the turn) used by the UI to show a
        "who's thinking…" hint while a model call is in flight.  Mirrors ``step_one``'s
        own pull order: an already-queued agent first, then a subscription-triggered
        one, then (when the queue is empty and the next call would open a fresh turn)
        the first tick-scheduled agent for ``turn + 1``.  Returns ``None`` when nothing
        is queued and no agent ticks on the next turn (the show is effectively idle)."""
        if self._pending:
            return getattr(self._pending[0], "name", None)
        if self._trigger_queue:
            return getattr(self._trigger_queue[0][0], "name", None)
        next_turn = self.turn + 1
        for agent in self.scenario.agents:
            manifest = getattr(agent, "manifest", None)
            if manifest is None:
                continue
            tick_every = manifest.schedule.tick_every
            if tick_every is not None and (tick_every == 0 or next_turn % tick_every == 0):
                return getattr(agent, "name", None)
        return None

    def force_verdict(self) -> Event | None:
        """Cut the show short and have the judge rule *now*, on the whole run.

        The curtain call: the visitor pressed "Start judging", or a budget/turn limit
        ended the cast's run.  We silence the cast (drain any queued or in-flight
        competitor turns so no further mind speaks), then run the scenario's judge(s)
        so a ``judge.verdict`` (carrying a ``winner`` via the competition handler)
        lands and reads every event of this run.

        Crucially the judge runs *un-gated*: a verdict must still land even when the
        very budget that ended the show is already spent, so the round resolves on a
        ruling rather than a silent halt.  Idempotent: if this run already has a
        verdict, it is returned unchanged.  Returns the verdict event, or ``None`` when
        the scenario has no judge to rule.
        """
        existing = next(
            (e for e in self.ledger.events_for_run(self.run_id) if e.kind == "judge.verdict"),
            None,
        )
        if existing is not None:
            return existing
        judges = [a for a in self.scenario.agents if getattr(getattr(a, "manifest", None), "role", None) == "judge"]
        if not judges:
            return None
        # Silence the cast: no queued subscription/tick competitor acts after this.
        self._pending.clear()
        self._trigger_queue.clear()
        # Advance the sim-clock once to mark the curtain call, but DON'T gate on the
        # governor: the judge must rule even when the show ended on a spent budget.
        self.turn += 1
        obs.set_context(turn=self.turn)
        obs.log("run.judging", run_id=self.run_id, turn=self.turn, judges=[getattr(j, "name", "") for j in judges])
        projection = self.projection
        for judge in judges:
            # The curtain call must produce a ruling: tell a reactive judge (one that
            # otherwise abstains until its win condition) to rule unconditionally now.
            judge._forced = True
            try:
                self._run_agent(judge, projection, check_budget=False)
            finally:
                judge._forced = False
        return next(
            (e for e in reversed(self.ledger.events_for_run(self.run_id)) if e.kind == "judge.verdict"),
            None,
        )

    def inject_user_event(self, text: str, label: str | None = None) -> None:
        self.turn += 1
        payload: dict[str, str] = {"text": text}
        if label:
            payload["label"] = label
        self._append(
            Event(
                run_id=self.run_id,
                turn=self.turn,
                kind="user.injected",
                actor="visitor",
                payload=payload,
            )
        )

    # ── internal ──────────────────────────────────────────────────────────────

    def _tick(self) -> None:
        self.turn += 1
        self.governor.begin_turn(self.turn)
        self.governor.check(self.turn)
        obs.set_context(turn=self.turn)

        projection = self.projection

        with obs.span("turn", **{"mal.turn": self.turn}):
            # ── phase 1: event-triggered (subscription) agents ────────────────
            while self._trigger_queue:
                agent, _trigger = self._trigger_queue.popleft()
                self._run_agent(agent, projection)

            # ── phase 2: tick-based scheduled agents ──────────────────────────
            for agent in self._tick_scheduled_agents():
                self._run_agent(agent, projection)

    def _run_agent(self, agent: "Agent", projection: StageProjection, *, check_budget: bool = True) -> None:
        if check_budget:
            self.governor.check(self.turn)  # before the span: a budget stop is not an agent turn
        name = getattr(agent, "name", agent.__class__.__name__)
        start = time.perf_counter()
        with obs.bind(agent=name), obs.span("agent.turn", **{"mal.agent": name, "mal.turn": self.turn}):
            try:
                event = agent.act(
                    run_id=self.run_id,
                    turn=self.turn,
                    projection=projection,
                    # Run-scoped: the ledger holds EVERY run (shared store, ADR-0026);
                    # an agent's memory/context must never recall another run's (or
                    # another user's) discussion.
                    recent_events=self.ledger.events_for_run(self.run_id),
                )
            except BudgetExceeded:
                raise  # an intentional stop from the governor; never swallow it
            except Exception as exc:  # noqa: BLE001  # one agent's crash must not silence the cast
                self._note_agent_error(agent, exc)
                return
            # An agent may ABSTAIN by returning None: it was invoked (e.g. a judge woken by
            # every spoken line, watching for an end condition) but has nothing to emit this
            # time. No event, no budget charge; the show simply continues. Distinct from an
            # error: a deliberate "not yet," not a failed turn.
            if event is None:
                return
            # Tolerate an agent whose ``last_usage`` is missing or explicitly None.
            usage = getattr(agent, "last_usage", None) or {}
            tokens = int(usage.get("total_tokens", 0) or 0)
            cost_usd = float(usage.get("cost_usd", 0.0) or 0.0)
            obs.add_span_attrs(**{"event.kind": event.kind, "mal.tokens": tokens, "mal.cost_usd": cost_usd})
            self.governor.record_call(tokens=tokens, cost_usd=cost_usd)
            self._append(event)
            projection.apply(event)
        obs.record_agent_turn(name, time.perf_counter() - start)

    def _note_agent_error(self, agent: "Agent", exc: Exception) -> None:
        """Record (and log) an agent's failed turn without aborting the tick.

        Resilience over silence: if one mind throws (a flaky model call, a memory
        index hiccup), the others still get their turn this round, and the failure
        is visible on ``agent_errors`` rather than crashing the whole loop."""
        name = getattr(agent, "name", agent.__class__.__name__)
        self.agent_errors.append({"turn": str(self.turn), "agent": name, "error": str(exc)})
        logger.warning("agent %s failed on turn %d: %s", name, self.turn, exc, exc_info=exc)
        obs.log("agent.error", level="warning", agent=name, turn=self.turn, error=str(exc))

    def _maybe_snapshot(self) -> None:
        if not self.snapshot_every or not self.snapshot_path:
            return
        if self.turn % self.snapshot_every != 0:
            return
        snapshot_to = getattr(self.ledger, "snapshot_to", None)
        if callable(snapshot_to):
            snapshot_to(self.snapshot_path)

    def _append(self, event: Event) -> Event:
        # Stamp the session onto the envelope at the single append chokepoint, so
        # *every* action in a run is attributable/filterable by who drove it;
        # agents and scenarios never have to know sessions exist.
        if self.session_id and event.session_id is None:
            event = event.model_copy(update={"session_id": self.session_id})
        appended = self.ledger.append(event)
        obs.log(
            "event.append", level="debug", id=appended.id, kind=appended.kind, actor=appended.actor, turn=appended.turn
        )
        if self.observer:
            self.observer.consume(appended)
        self._notify_subscribers(appended)
        return appended

    def _notify_subscribers(self, event: Event) -> None:
        """Queue agents that subscribe to this event kind, but never an agent for its OWN
        event.

        An agent that both subscribes to and emits a kind (e.g. the devil's-advocate
        subscribes to ``agent.spoke`` and now speaks one) would otherwise re-trigger itself
        on its own line, cascading until the per-turn call cap trips and starving later
        agents (the judge) of their turn. Self-reaction is never intended, so we skip it;
        a subscriber still reacts to every *peer's* event."""
        for agent in self.scenario.agents:
            manifest = getattr(agent, "manifest", None)
            if manifest and event.kind in manifest.subscribes_to and event.actor != getattr(agent, "name", None):
                self._trigger_queue.append((agent, event))

    def _tick_scheduled_agents(self) -> list["Agent"]:
        """Return agents that should fire this turn based on their tick schedule.

        Falls back to the scenario's legacy schedule() method for agents
        without a manifest, preserving full backward compatibility.
        """
        manifest_agents = [a for a in self.scenario.agents if getattr(a, "manifest", None)]
        legacy_agents = [a for a in self.scenario.agents if not getattr(a, "manifest", None)]

        result: list[Agent] = []

        # Manifest-driven tick scheduling
        for agent in manifest_agents:
            tick_every = agent.manifest.schedule.tick_every
            if tick_every is not None and (tick_every == 0 or self.turn % tick_every == 0):
                result.append(agent)

        # Legacy scenario scheduling (backward-compatible)
        if legacy_agents:
            scheduled = self.scenario.schedule(self.turn)
            result.extend(a for a in scheduled if a in legacy_agents)

        return result