File size: 10,237 Bytes
f637227 8400d8c f637227 8400d8c f637227 8400d8c f637227 8400d8c 11f4e9a 8400d8c 11f4e9a 8400d8c 11f4e9a f637227 8400d8c 26bc5b9 8400d8c 26bc5b9 8400d8c 11f4e9a 8400d8c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | """The Commentator β a universal "color commentary" observer.
It is scenario-agnostic by design: it summarises only the public ledger, so it drops
into *any* cast (a debate, a mystery, a guessing game, a living scene) with no engine
edits and no per-scenario flavour. Most agents are pure declarative config; the
commentator needs a handler for two things the generic turn cannot express:
1. **Cadence, measured in rounds.** It holds its tongue until a configurable number
of speaking *rounds* have passed since its last remark β where one round is
approximated as "every known speaker has spoken once" (one beat per distinct cast
speaker it has seen). The knob is ``commentary.rounds`` in the manifest (default 1),
overridable at runtime via ``MAL_COMMENTATOR_ROUNDS``; the legacy
``MAL_COMMENTATOR_EVERY`` still pins an *absolute* beat count when set. It is polled
every turn (``schedule.tick_every: 1``) and ABSTAINS (returns ``None``) until the
threshold accrues, then delivers exactly one beat. The threshold is a *count* of
beats, not a per-speaker quorum, so a stalled or errored speaker can never wedge the
cadence (the illustrated/spoken media beat always eventually fires).
2. **Media.** When it does speak it draws an image of the beat and says the line
aloud, folding both onto its event β the :class:`FortuneTeller` tool pattern,
for the ``image.render`` / ``tts.speak`` capabilities. Media is a garnish: a
missing tool (before media is wired) or a failed call degrades the beat to text,
never breaking the turn.
It never calls a peer and never reads another mind β it summarises only the public
ledger, exactly like every other agent. Drop ``rafters-critic`` into a scenario's
``cast`` to switch it on; remove it and the engine never knows it existed (ADR-0011).
"""
from __future__ import annotations
import os
from src import observability as obs
from src.agents.base import ManifestAgent
from src.core.events import Event
from src.core.projections import StageProjection
from src.core.registry import register_handler
# Public, ledger-visible "a cast member said something" kinds β mirrors
# ``base._SPEECH_KINDS``. The commentator's own ``commentary.posted`` is deliberately
# absent, so a remark never counts toward the next quorum (self-trigger guard #2; guard
# #1 is ``subscribes_to: []`` in the manifest, so it is never event-woken at all).
_SPEECH_KINDS = frozenset({"agent.spoke", "agent.thought", "oracle.spoke", "world.observed"})
_COMMENTARY_KIND = "commentary.posted"
_DEFAULT_ROUNDS = 1
def _env_int(name: str) -> int | None:
"""A floored-at-1 positive int from env var *name*, or None if unset/garbage."""
raw = os.getenv(name)
if raw is None:
return None
try:
return max(1, int(raw))
except ValueError:
return None
@register_handler("commentator")
class Commentator(ManifestAgent):
"""Universal color commentary on a round-paced beat counter, with an illustrated, spoken beat."""
# ββ cadence βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _rounds(self) -> int:
"""How many speaking rounds must pass before the next remark (default 1).
Manifest ``commentary.rounds`` is the declared default; ``MAL_COMMENTATOR_ROUNDS``
overrides it at runtime (the user-facing knob). Floored at 1 so a bad value can't
wedge the cadence."""
env = _env_int("MAL_COMMENTATOR_ROUNDS")
if env is not None:
return env
cfg = self.manifest.commentary
return max(1, cfg.rounds) if cfg else _DEFAULT_ROUNDS
def _round_size(self, events: tuple[Event, ...]) -> int:
"""Distinct cast speakers (never self) seen so far β one round's worth of beats.
Self-calibrating: it counts only cast members who have actually spoken, so silent
observers and the critic itself don't inflate the round, and a scenario with three
speakers needs three beats per round where one with five needs five."""
cast = set(self.cast_names)
speakers = {e.actor for e in events if e.kind in _SPEECH_KINDS and e.actor in cast and e.actor != self.name}
return len(speakers)
def _every(self, events: tuple[Event, ...]) -> int:
"""How many public speech beats must land before the next remark.
Legacy ``MAL_COMMENTATOR_EVERY`` pins an *absolute* beat count when set (back-compat);
otherwise it is ``rounds Γ round_size`` β "this many rounds of everyone-speaks-once".
A plain count, not a per-speaker quorum: a stalled or errored speaker can never wedge
the cadence (the old quorum required *every* speaker who ever spoke to keep speaking,
so one silent agent blocked commentary forever β and starved the media beat with it).
Floored at 1."""
absolute = _env_int("MAL_COMMENTATOR_EVERY")
if absolute is not None:
return absolute
return max(1, self._rounds() * self._round_size(events))
def _window_since_last(self, events: tuple[Event, ...]) -> tuple[Event, ...]:
"""Events after this agent's most recent remark β its counter resets each beat."""
last = -1
for i, event in enumerate(events):
if event.kind == _COMMENTARY_KIND and event.actor == self.name:
last = i
return events[last + 1 :]
def _beats_since_last(self, events: tuple[Event, ...]) -> int:
"""Count cast speech beats (never self) since this critic's last remark."""
cast = set(self.cast_names)
return sum(
1
for e in self._window_since_last(events)
if e.kind in _SPEECH_KINDS and e.actor in cast and e.actor != self.name
)
def _ready(self, events: tuple[Event, ...]) -> bool:
"""True once enough fresh speech has landed since the last beat to chime in."""
return self._beats_since_last(events) >= self._every(events)
# ββ prompt steering βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _build_extra_prompt(self, projection: StageProjection, recent_events: tuple[Event, ...]) -> str:
"""Steer the model toward a genuinely funny one-line heckle of the beat.
Small models can't be funny on the word "funny" alone β they default to
cheerful narration. So we hand them a comedian's recipe: latch onto one
concrete detail, then break it with a twist (absurd comparison, deadpan
undercut, or mock-serious overreaction). Specific + surprising = the laugh."""
return (
"YOUR JOB\n"
"Heckle the beat above with ONE short, funny line β the kind that gets a laugh, "
"not a polite nod. Work the bit like this:\n"
"- Grab ONE specific thing the cast just did β a prop, a word, a choice β and make "
"THAT the target. Never a vague 'well, that happened'.\n"
"- Then break it: an absurd comparison, a deadpan undercut, or a mock-serious "
"overreaction. The twist is where the laugh lives β surprise beats cleverness.\n"
"- Punch up at the drama, never down at a person. Affectionate, never cruel.\n"
"- ONE sentence. No narration, no stage directions, no quotation marks, no lists, "
"no emoji, no setup-then-punchline. Just the line, like you shouted it from the rafters."
)
# ββ turn ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def act(
self,
run_id: str,
turn: int,
projection: StageProjection,
recent_events: tuple[Event, ...],
) -> Event | None:
# Hold until enough fresh speech beats have landed since the last remark.
if not self._ready(recent_events):
return None
# The generic turn writes the funny line (offline β the curated stub keyed on
# this agent's name); kind is constrained to ``commentary.posted`` by may_emit.
event = super().act(run_id, turn, projection, recent_events)
summary = str(event.payload.get("text", "")).strip()
if not summary:
return event
# Draw + voice the beat. Best-effort: a missing/failed tool leaves the beat as
# text, exactly like a media-less offline run. The slug keys the file under the
# run so the hybrid transport can serve it (or inline a data: URI offline).
slug = f"{turn:03d}-{event.id[:8]}"
image = self._media_ref("image.render", prompt=summary, run_id=run_id, slug=f"{slug}-img")
if image:
event.payload["image"] = {"src": image["src"], "alt": summary[:120]}
audio = self._media_ref("tts.speak", text=summary, run_id=run_id, slug=f"{slug}-tts")
if audio:
event.payload["audio"] = {"src": audio["src"], "mime": audio.get("mime", "")}
return event
def _media_ref(self, tool: str, **params) -> dict | None:
"""Best-effort media via a capability-checked tool; ``None`` on absence or failure.
Returns the tool's ref dict (``{"src", "mime", ...}``) only when it carries a
usable ``src``. A tool that isn't registered (before media is wired) or a failed
generation degrades the beat to text β it must never drop the turn."""
if self.tools is None or tool not in self.manifest.tools or not self.tools.has(tool):
return None
try:
result = self.call_tool(tool, **params)
except Exception as exc: # noqa: BLE001 β media is garnish; a failure must not drop the beat
obs.log("commentator.media_skip", level="warning", agent=self.name, tool=tool, error=str(exc))
return None
return result if (result or {}).get("src") else None
|