File size: 10,237 Bytes
f637227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8400d8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f637227
 
 
 
 
 
 
 
 
 
 
 
8400d8c
 
 
 
f637227
8400d8c
 
 
f637227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8400d8c
 
11f4e9a
8400d8c
 
 
 
 
 
11f4e9a
 
8400d8c
11f4e9a
 
 
 
 
 
 
 
f637227
8400d8c
 
 
 
26bc5b9
 
 
 
 
 
8400d8c
 
26bc5b9
 
 
 
 
 
 
 
 
8400d8c
 
 
 
 
 
 
 
 
 
 
11f4e9a
 
8400d8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""The Commentator — a universal "color commentary" observer.

It is scenario-agnostic by design: it summarises only the public ledger, so it drops
into *any* cast (a debate, a mystery, a guessing game, a living scene) with no engine
edits and no per-scenario flavour. Most agents are pure declarative config; the
commentator needs a handler for two things the generic turn cannot express:

  1. **Cadence, measured in rounds.** It holds its tongue until a configurable number
     of speaking *rounds* have passed since its last remark — where one round is
     approximated as "every known speaker has spoken once" (one beat per distinct cast
     speaker it has seen). The knob is ``commentary.rounds`` in the manifest (default 1),
     overridable at runtime via ``MAL_COMMENTATOR_ROUNDS``; the legacy
     ``MAL_COMMENTATOR_EVERY`` still pins an *absolute* beat count when set. It is polled
     every turn (``schedule.tick_every: 1``) and ABSTAINS (returns ``None``) until the
     threshold accrues, then delivers exactly one beat. The threshold is a *count* of
     beats, not a per-speaker quorum, so a stalled or errored speaker can never wedge the
     cadence (the illustrated/spoken media beat always eventually fires).

  2. **Media.** When it does speak it draws an image of the beat and says the line
     aloud, folding both onto its event — the :class:`FortuneTeller` tool pattern,
     for the ``image.render`` / ``tts.speak`` capabilities. Media is a garnish: a
     missing tool (before media is wired) or a failed call degrades the beat to text,
     never breaking the turn.

It never calls a peer and never reads another mind — it summarises only the public
ledger, exactly like every other agent. Drop ``rafters-critic`` into a scenario's
``cast`` to switch it on; remove it and the engine never knows it existed (ADR-0011).
"""

from __future__ import annotations

import os

from src import observability as obs
from src.agents.base import ManifestAgent
from src.core.events import Event
from src.core.projections import StageProjection
from src.core.registry import register_handler

# Public, ledger-visible "a cast member said something" kinds — mirrors
# ``base._SPEECH_KINDS``. The commentator's own ``commentary.posted`` is deliberately
# absent, so a remark never counts toward the next quorum (self-trigger guard #2; guard
# #1 is ``subscribes_to: []`` in the manifest, so it is never event-woken at all).
_SPEECH_KINDS = frozenset({"agent.spoke", "agent.thought", "oracle.spoke", "world.observed"})

# Event kind of the commentator's own remarks; the beat counter restarts after the most
# recent event of this kind authored by the agent itself.
_COMMENTARY_KIND = "commentary.posted"
# Rounds to wait between remarks when the manifest carries no ``commentary`` section.
_DEFAULT_ROUNDS = 1


def _env_int(name: str) -> int | None:
    """Read env var *name* as an integer clamped to a minimum of 1.

    Returns ``None`` when the variable is unset or its value does not parse as an
    integer, so callers can fall through to their own default.
    """
    value = os.environ.get(name)
    if value is not None:
        try:
            parsed = int(value)
        except ValueError:
            # Garbage is treated the same as "unset".
            pass
        else:
            return parsed if parsed > 1 else 1
    return None


@register_handler("commentator")
class Commentator(ManifestAgent):
    """Universal color commentary on a round-paced beat counter, with an illustrated, spoken beat."""

    # ── cadence ───────────────────────────────────────────────────────────────

    def _rounds(self) -> int:
        """Number of speaking rounds to wait between remarks (never below 1).

        ``MAL_COMMENTATOR_ROUNDS`` wins whenever it parses as an integer. Otherwise the
        manifest's ``commentary.rounds`` applies, clamped to 1 so a zero or negative
        setting cannot stall the cadence; a manifest without a commentary section
        falls back to the module default."""
        override = _env_int("MAL_COMMENTATOR_ROUNDS")
        if override is None:
            commentary = self.manifest.commentary
            if not commentary:
                return _DEFAULT_ROUNDS
            return max(1, commentary.rounds)
        return override

    def _round_size(self, events: tuple[Event, ...]) -> int:
        """Count the distinct cast members, other than this agent, who speak in *events*.

        Only actors listed in ``cast_names`` with at least one speech-kind event are
        counted, so the size of a "round" tracks who has actually spoken: silent
        observers and the commentator itself never inflate it."""
        cast = set(self.cast_names)
        seen = set()
        for event in events:
            if event.kind not in _SPEECH_KINDS:
                continue
            if event.actor in cast and event.actor != self.name:
                seen.add(event.actor)
        return len(seen)

    def _every(self, events: tuple[Event, ...]) -> int:
        """Speech beats that must accumulate before the next remark (never below 1).

        When the legacy ``MAL_COMMENTATOR_EVERY`` is set to an integer it is used as an
        absolute beat count. Otherwise the threshold is ``rounds × round_size``: that
        many rounds of everyone-who-has-spoken speaking once. Because this is a plain
        count rather than a per-speaker quorum, one speaker going silent cannot block
        the commentary."""
        pinned = _env_int("MAL_COMMENTATOR_EVERY")
        if pinned is None:
            beats = self._rounds() * self._round_size(events)
            return beats if beats > 1 else 1
        return pinned

    def _window_since_last(self, events: tuple[Event, ...]) -> tuple[Event, ...]:
        """Slice of *events* that follows this agent's latest remark.

        When the agent has not remarked yet, the whole sequence is returned."""
        # Walk backwards so the first match is the most recent remark.
        for index in range(len(events) - 1, -1, -1):
            candidate = events[index]
            if candidate.kind == _COMMENTARY_KIND and candidate.actor == self.name:
                return events[index + 1 :]
        return events[0:]

    def _beats_since_last(self, events: tuple[Event, ...]) -> int:
        """Tally speech events by other cast members since this agent's latest remark."""
        cast = set(self.cast_names)
        tally = 0
        for event in self._window_since_last(events):
            if event.kind in _SPEECH_KINDS and event.actor in cast and event.actor != self.name:
                tally += 1
        return tally

    def _ready(self, events: tuple[Event, ...]) -> bool:
        """True once enough fresh speech has landed since the last beat to chime in."""
        return self._beats_since_last(events) >= self._every(events)

    # ── prompt steering ─────────────────────────────────────────────────────────

    def _build_extra_prompt(self, projection: StageProjection, recent_events: tuple[Event, ...]) -> str:
        """Steer the model toward a genuinely funny one-line heckle of the beat.

        Small models can't be funny on the word "funny" alone β€” they default to
        cheerful narration. So we hand them a comedian's recipe: latch onto one
        concrete detail, then break it with a twist (absurd comparison, deadpan
        undercut, or mock-serious overreaction). Specific + surprising = the laugh."""
        return (
            "YOUR JOB\n"
            "Heckle the beat above with ONE short, funny line β€” the kind that gets a laugh, "
            "not a polite nod. Work the bit like this:\n"
            "- Grab ONE specific thing the cast just did β€” a prop, a word, a choice β€” and make "
            "THAT the target. Never a vague 'well, that happened'.\n"
            "- Then break it: an absurd comparison, a deadpan undercut, or a mock-serious "
            "overreaction. The twist is where the laugh lives β€” surprise beats cleverness.\n"
            "- Punch up at the drama, never down at a person. Affectionate, never cruel.\n"
            "- ONE sentence. No narration, no stage directions, no quotation marks, no lists, "
            "no emoji, no setup-then-punchline. Just the line, like you shouted it from the rafters."
        )

    # ── turn ──────────────────────────────────────────────────────────────────

    def act(
        self,
        run_id: str,
        turn: int,
        projection: StageProjection,
        recent_events: tuple[Event, ...],
    ) -> Event | None:
        """Deliver one commentary beat when the cadence allows, garnished with media.

        Returns ``None`` (abstains) until ``_ready`` reports enough fresh speech, or
        when the inherited turn itself yields no event. Otherwise returns the event
        produced by the inherited turn. When that event's ``text`` payload is
        non-empty, an ``image`` and/or ``audio`` entry is added to the payload for each
        media tool that returned a usable ref; a missing or failing tool simply leaves
        the beat as text.
        """
        # Hold until enough fresh speech beats have landed since the last remark.
        if not self._ready(recent_events):
            return None
        # The generic turn writes the funny line (offline → the curated stub keyed on
        # this agent's name); kind is constrained to ``commentary.posted`` by may_emit.
        event = super().act(run_id, turn, projection, recent_events)
        if event is None:
            # The inherited turn abstained; there is nothing to illustrate or voice.
            return None
        # An explicit ``None`` text must read as empty, not as the literal string "None"
        # (which ``str(None)`` would produce and which would then be drawn and spoken).
        raw_text = event.payload.get("text")
        summary = "" if raw_text is None else str(raw_text).strip()
        if not summary:
            return event

        # Draw + voice the beat. Best-effort: a missing/failed tool leaves the beat as
        # text, exactly like a media-less offline run. The slug keys the file under the
        # run so the hybrid transport can serve it (or inline a data: URI offline).
        slug = f"{turn:03d}-{event.id[:8]}"
        image = self._media_ref("image.render", prompt=summary, run_id=run_id, slug=f"{slug}-img")
        if image:
            event.payload["image"] = {"src": image["src"], "alt": summary[:120]}
        audio = self._media_ref("tts.speak", text=summary, run_id=run_id, slug=f"{slug}-tts")
        if audio:
            event.payload["audio"] = {"src": audio["src"], "mime": audio.get("mime", "")}
        return event

    def _media_ref(self, tool: str, **params) -> dict | None:
        """Best-effort media via a capability-checked tool; ``None`` on absence or failure.

        Returns the tool's ref dict (``{"src", "mime", ...}``) only when it carries a
        usable ``src``. A tool that isn't registered (before media is wired) or a failed
        generation degrades the beat to text β€” it must never drop the turn."""
        if self.tools is None or tool not in self.manifest.tools or not self.tools.has(tool):
            return None
        try:
            result = self.call_tool(tool, **params)
        except Exception as exc:  # noqa: BLE001 β€” media is garnish; a failure must not drop the beat
            obs.log("commentator.media_skip", level="warning", agent=self.name, tool=tool, error=str(exc))
            return None
        return result if (result or {}).get("src") else None