multi-agent-lab / src /core /run_index.py
agharsallah
feat: Implement well-known typed fields for verdicts in output models
ce159dc
Raw
History Blame Contribute Delete
5.95 kB
"""Run index — a per-run summary projection over the append-only ledger.
This is a *projection* in the same spirit as :func:`src.core.projections.rebuild_stage`:
a pure fold over events that yields one :class:`RunSummary` per ``run_id``. It reads the
two engine bookend events — ``run.started`` (scenario / seed / cast / start time) and
``run.finished`` (reason / winner / token+turn totals / finish time) — and ignores the
rest. A run that has started but not finished yields a summary with the ``finished_*``
fields left ``None`` / zero.
Two implementations, one contract:
* :func:`index_runs` is the reference oracle — a pure function over any
``Iterable[Event]``, folding per ``run_id`` in first-seen order. Tests and offline
callers use this.
* :func:`index_runs_from_ledger` produces the *same* list straight from a ledger's
indexed queries (``runs()`` + ``events_for_run()``), so a hosted SQL backend never
has to stream every event through Python to list its runs.
The cast payload mirrors the enriched ``run.started`` shape (see ``src/core/events.py``):
``{agent_name: {"model_endpoint": str | None, "model_profile": str | None}}``.
"""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable
from pydantic import BaseModel, ConfigDict, Field
from src.core.events import Event
if TYPE_CHECKING: # avoid a hard import cycle / SQLAlchemy dependency on the offline path
from src.core.ledger import Ledger
class CastBinding(BaseModel):
"""The model a cast member is bound to (mirrors the ``run.started`` cast entry)."""
model_config = ConfigDict(extra="ignore")
model_endpoint: str | None = None
model_profile: str | None = None
class RunSummary(BaseModel):
"""A one-line summary of a single run, folded from its bookend events."""
model_config = ConfigDict(extra="forbid")
run_id: str
scenario: str = ""
seed: str = ""
session_id: str | None = None
cast: dict[str, CastBinding] = Field(default_factory=dict)
started_at: datetime | None = None
finished_at: datetime | None = None
reason: str | None = None
winner: str | None = None
winner_kind: str | None = None
"""Whether ``winner`` names a cast agent (``"agent"``) or a team label
(``"team"``); ``None`` for runs with no competition or no winner (ADR-0029)."""
winning_model: str | None = None
winning_models: list[str] = Field(default_factory=list)
"""Endpoint(s) behind the winner — the agent's model, or every member of a
winning team (``None`` entries dropped). ``winning_model`` stays the single
agent-winner endpoint for back-compat."""
turns: int = 0
tokens: int = 0
def _coerce_cast(raw: Any) -> dict[str, CastBinding]:
if not isinstance(raw, dict):
return {}
cast: dict[str, CastBinding] = {}
for name, binding in raw.items():
if isinstance(binding, dict):
cast[name] = CastBinding(**binding)
else:
cast[name] = CastBinding()
return cast
def _apply_started(summary: RunSummary, event: Event) -> None:
payload = event.payload
summary.scenario = payload.get("scenario", "") or ""
summary.seed = payload.get("seed", "") or ""
# Attribution lives on the envelope (stamped by the conductor on every event);
# the run.started payload copy is kept for human-readable traces.
summary.session_id = event.session_id or payload.get("session_id") or None
summary.cast = _coerce_cast(payload.get("cast"))
summary.started_at = event.created_at
def _apply_finished(summary: RunSummary, event: Event) -> None:
payload = event.payload
summary.reason = payload.get("reason")
summary.winner = payload.get("winner")
summary.winner_kind = payload.get("winner_kind")
summary.winning_model = payload.get("winning_model")
summary.winning_models = list(payload.get("winning_models") or [])
summary.turns = int(payload.get("turns", 0) or 0)
summary.tokens = int(payload.get("tokens", 0) or 0)
summary.finished_at = event.created_at
def index_runs(events: Iterable[Event]) -> list[RunSummary]:
"""Fold *events* into one :class:`RunSummary` per run, in first-seen order.
Pure reference oracle: ``run.started`` populates scenario/seed/cast/started_at and
``run.finished`` populates reason/winner/winning_model/turns/tokens/finished_at.
Runs appear in the order their *first* event is seen. All other event kinds are
ignored (they don't change the summary), but any event is enough to register a run.
"""
summaries: dict[str, RunSummary] = {}
for event in events:
summary = summaries.get(event.run_id)
if summary is None:
summary = RunSummary(run_id=event.run_id)
summaries[event.run_id] = summary
if event.kind == "run.started":
_apply_started(summary, event)
elif event.kind == "run.finished":
_apply_finished(summary, event)
return list(summaries.values())
def index_runs_from_ledger(ledger: "Ledger") -> list[RunSummary]:
"""Build the same :class:`RunSummary` list using a ledger's indexed queries.
Equivalent to ``index_runs(ledger.events)`` but reads run-scoped slices via
``ledger.runs()`` + ``ledger.events_for_run()`` so SQL backends keep the work in the
database's indexes rather than streaming the whole log through Python. ``index_runs``
remains the oracle the two paths are checked against.
"""
summaries: list[RunSummary] = []
for run_id in ledger.runs():
summary = RunSummary(run_id=run_id)
for event in ledger.events_for_run(run_id):
if event.kind == "run.started":
_apply_started(summary, event)
elif event.kind == "run.finished":
_apply_finished(summary, event)
summaries.append(summary)
return summaries