Spaces:
Running on Zero
Running on Zero
agharsallah
feat(commentator): update cadence logic to use a fixed count instead of per-speaker quorum
"""Agent base protocol and manifest-driven agent base class.

Two layers:

  Agent (ABC) — the minimal interface the conductor requires. Any object
  with an ``act()`` method and a ``name`` is a valid agent. Kept minimal so
  simple stubs and deterministic test agents stay trivial.

  ManifestAgent — extends Agent with a manifest, per-profile model routing,
  layered memory (episodic / salience), reflection, structured output, and
  capability-checked tools. A manifest + this base is usually all a new agent
  needs; only special behaviour requires a subclass (override
  ``_build_extra_prompt``). This is the workhorse of the modular system.

Backward compatibility: Phase-0/1 agents extend Agent directly and are
unaffected. The conductor checks ``getattr(agent, "manifest", None)`` to
decide whether manifest-based routing applies.
"""
| from __future__ import annotations | |
| import re | |
| from abc import ABC, abstractmethod | |
| from typing import TYPE_CHECKING | |
| from src import observability as obs | |
| from src.core.context import ContextBuilder | |
| from src.core.events import Event | |
| from src.core.manifest import AgentManifest | |
| from src.core.memory import EpisodicMemory, ReflectionTracker, SalienceMemory | |
| from src.core.projections import StageProjection | |
| from src.core.structured import ( | |
| AgentOutputError, | |
| build_output_model, | |
| clean_clue, | |
| extract_reasoning, | |
| is_usable_line, | |
| json_instruction, | |
| parse_agent_output, | |
| ) | |
| from src.models.provider import is_model_error | |
| from src.models.router import ModelRouter | |
| if TYPE_CHECKING: | |
| from src.core.memory_index import MemoryIndex | |
| from src.tools.registry import ToolRegistry | |
# One module-level context builder, reused by ManifestAgent.act() to assemble
# each turn's prompt context via ``_ctx.build(...)``.
_ctx = ContextBuilder()
# System-level memory event every reflecting agent may emit, independent of its
# domain `may_emit` grant — reflection compacts memory, it is not a world action.
_REFLECTION_KIND = "agent.reflected"

# Anti-loop corrective re-ask: when the whole cast shares one backend, every agent
# samples the same model and they drift toward the same line. Rather than silently
# DROP a near-duplicate turn (which sidelines that agent from the round — one model
# repeating made it look like a single agent monopolised the show), we nudge once for
# something new and only skip if it still repeats. Keeps the round collaborative.
_ANTI_REPEAT_NUDGE = (
    "\n\nIMPORTANT: another voice just said something almost identical to your draft. "
    "Say something DIFFERENT this turn — a fresh image, a new beat, your own angle. "
    "Do not echo or paraphrase a line that was already spoken."
)

# Live fallback when structured output fails: ask for a plain spoken line, NOT JSON.
# Weak/reasoning models echo a JSON schema (and its example) and leak their reasoning;
# asking for prose gives a clean line we can strip and ship.
_PROSE_FALLBACK = (
    "\n\nNow say your line aloud — one or two vivid, in-character sentences and nothing else. "
    "No JSON, no labels, no analysis, no quotation marks. "
    "Never name or spell the secret word you were given; only describe it."
)

# Kinds we de-duplicate so the cast advances the tale instead of echoing the same line
# (small models ignore "never repeat"; this enforces it). `world.observed` is included:
# the seedkeeper narrates the world every turn, so without it a weak model loops on the
# same scene line (and other agents parrot it back). Verdicts (the closing ruling) and
# reflections (private memory compaction) are excluded — they are not table chatter.
_SPEECH_KINDS = frozenset({"agent.spoke", "oracle.spoke", "agent.thought", "world.observed"})

# Tokeniser for the repeat check: runs of lower-case letters, digits and apostrophes.
# Callers lower-case the text first, so upper-case letters never reach the pattern.
_WORD = re.compile(r"[a-z0-9']+")
# ── minimal interface ─────────────────────────────────────────────────────────
class Agent(ABC):
    """Minimal agent interface: a ``name`` and an ``act()`` method.

    Concrete agents must override :meth:`act`; the base class cannot be
    instantiated on its own.
    """

    # Identifier for this agent; subclasses supply it (as an attribute or a property).
    name: str

    @abstractmethod
    def act(
        self,
        run_id: str,
        turn: int,
        projection: StageProjection,
        recent_events: tuple[Event, ...],
    ) -> Event:
        """Take one turn and return the event it produces.

        Args:
            run_id: Identifier of the current run.
            turn: The turn number being played.
            projection: The stage projection supplied for this turn.
            recent_events: The events handed to the agent for this turn.

        Returns:
            The :class:`Event` emitted for this turn.

        Raises:
            NotImplementedError: Always, in the base class — subclasses override.
        """
        raise NotImplementedError
# ── manifest-driven base ──────────────────────────────────────────────────────
class ManifestAgent(Agent):
    """Base class for manifest-driven agents.

    Subclasses provide a class-level ``manifest``. Construction takes a
    :class:`ModelRouter` (per-profile model selection) and an optional
    :class:`ToolRegistry` (capability-checked tools). ``act()`` handles context
    assembly, memory, reflection, model routing, and structured output, so most
    agents need no extra code.
    """

    manifest: AgentManifest

    def __init__(
        self,
        router: ModelRouter,
        tools: "ToolRegistry | None" = None,
        memory_index: "MemoryIndex | None" = None,
    ) -> None:
        """Store the collaborators and initialise per-agent bookkeeping.

        Args:
            router: Resolves this agent's route key to a model provider.
            tools: Optional tool registry; ``None`` means no tools are attached.
            memory_index: Optional semantic index handed to salience memory.
        """
        self.router = router
        self.tools = tools
        # Optional semantic relevance index — a derived, rebuildable lens over the
        # ledger (ADR-0018). ``None`` (offline default) keeps salience on the
        # keyword path.
        self.memory_index = memory_index
        # Created lazily by _tracker() the first time reflection is checked.
        self._reflection_tracker: ReflectionTracker | None = None
        self.last_usage: dict[str, int] = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        # Scenario competition context (ADR-0029), injected by the registry when this
        # agent is assembled into a cast. ``None`` (the standalone default) means no
        # competition: the verdict-validation hook is inert, so non-judged scenarios
        # and bare-constructed agents behave exactly as before.
        self.competition = None
        self.cast_names: list[str] = []
        # The model behind the most recent generation, captured when the provider is
        # resolved and stamped onto the event in act()/reflection — so each line in the
        # ledger records the model that actually produced it, not just the intended one.
        self.last_model_profile: str | None = None
        self.last_model_id: str | None = None

    @property
    def name(self) -> str:  # type: ignore[override]
        """The agent's name, taken from its manifest."""
        return self.manifest.name

    # ── main turn ─────────────────────────────────────────────────────────────

    def act(
        self,
        run_id: str,
        turn: int,
        projection: StageProjection,
        recent_events: tuple[Event, ...],
    ) -> Event:
        """Play one turn and return the resulting event.

        When the manifest sets a reflection threshold and the tracker's
        ``observe()`` returns truthy, a reflection event is returned instead of
        a normal turn. Otherwise the prompt is assembled from context, the
        subclass's extra prompt and the tools block, a payload is resolved, and
        the anti-repeat check is applied before the event is built.

        Raises:
            AgentOutputError: When the corrective re-ask still repeats a recent
                line (live router only). Errors raised while resolving the
                payload propagate unchanged.
        """
        mem_cfg = self.manifest.memory
        # Reflection takes priority on its scheduled turn: compact memory into a
        # belief instead of acting on the world this turn.
        threshold = mem_cfg.reflection_threshold
        if threshold is not None and self._tracker(threshold).observe(recent_events):
            return self._emit_reflection(run_id, turn, recent_events)

        memory_text = self._recall(turn, projection, recent_events)
        context = _ctx.build(
            agent_name=self.manifest.name,
            persona=self.manifest.persona,
            projection=projection,
            all_events=recent_events,
            memory_window=mem_cfg.window,
            memory_text=memory_text,  # FIX: salience/episodic recall is now actually used
            # A judge gets the COMPLETE exchange to rule on; workers get the recent table to
            # react to (ContextBuilder picks the discussion block by role — ADR-0023 follow-up).
            role=self.manifest.role,
        )
        extra = self._build_extra_prompt(projection, recent_events)
        tools_block = self._tools_block()
        allowed = self._content_kinds()
        extra_fields = self.manifest.output_extra_fields or None
        # Empty sections are dropped so the prompt has no stray blank lines.
        base_prompt = "\n".join(filter(None, [context, extra, tools_block]))
        # DEBUG: the FULL prompt this agent will send to its model (a key user ask).
        obs.log("agent.prompt", level="debug", agent=self.manifest.name, allowed=allowed, prompt=base_prompt)
        parsed = self._resolve_payload(self.manifest.name, base_prompt, allowed, extra_fields)

        # Don't echo the table: if this line near-duplicates a recent spoken one, give the
        # agent ONE corrective re-ask for something new before skipping. A shared backend
        # makes the whole cast drift toward the same line, so dropping every duplicate
        # silently sidelines agents from the round; the nudge keeps each one talking. Only
        # if the re-ask still repeats do we skip the turn so the conversation advances.
        # Live only — the offline stub's curated catalogue is reproducible by design, and
        # de-duplicating its small set of lines would starve demos and tests of events.
        if (
            not getattr(self.router, "offline", False)
            and parsed.get("kind") in _SPEECH_KINDS
            and self._is_repeat(parsed.get("text", ""), recent_events)
        ):
            obs.log("agent.repeat_retry", agent=self.manifest.name, text=str(parsed.get("text", ""))[:120])
            parsed = self._resolve_payload(self.manifest.name, base_prompt + _ANTI_REPEAT_NUDGE, allowed, extra_fields)
            if parsed.get("kind") in _SPEECH_KINDS and self._is_repeat(parsed.get("text", ""), recent_events):
                obs.log("agent.repeat_skip", agent=self.manifest.name, text=str(parsed.get("text", ""))[:120])
                raise AgentOutputError(f"{self.manifest.name}: repeated a recent line — skipped to keep it moving")

        obs.log("agent.acted", agent=self.manifest.name, kind=parsed["kind"], text=str(parsed.get("text", ""))[:160])
        return Event(
            run_id=run_id,
            turn=turn,
            kind=parsed["kind"],
            actor=self.manifest.name,
            # "kind" lives on the event itself; everything else is payload.
            payload={k: v for k, v in parsed.items() if k != "kind"},
            model_profile=self.last_model_profile,
            model_id=self.last_model_id,
        )

    @staticmethod
    def _is_repeat(text: str, recent_events: tuple[Event, ...], *, look_back: int = 12, threshold: float = 0.8) -> bool:
        """True when *text* echoes a recent spoken line — exact (token set) or high overlap.

        Enforces the "say something new" rule small models ignore: peers' and the agent's
        own recent lines are compared by token-set Jaccard, so a verbatim or near-verbatim
        repeat is caught and skipped, keeping the conversation moving instead of looping.

        Args:
            text: The candidate line.
            recent_events: Events to compare against; only speech kinds are considered.
            look_back: How many of the most recent speech events to check.
            threshold: Minimum Jaccard similarity (intersection / union) that counts as a repeat.

        Returns:
            ``False`` when *text* has no word tokens or nothing recent overlaps enough.
        """
        tokens = set(_WORD.findall((text or "").lower()))
        if not tokens:
            return False
        spoken = [e for e in recent_events if e.kind in _SPEECH_KINDS][-look_back:]
        for event in spoken:
            prior = set(_WORD.findall(str(event.payload.get("text", "")).lower()))
            # `tokens` is non-empty here, so the union is never empty.
            if prior and len(tokens & prior) / len(tokens | prior) >= threshold:
                return True
        return False

    def _build_extra_prompt(
        self,
        projection: StageProjection,
        recent_events: tuple[Event, ...],
    ) -> str:
        """Override to inject scenario-specific instructions."""
        return ""

    # ── model routing ─────────────────────────────────────────────────────────

    @property
    def _route_key(self) -> str:
        """Router key for this agent: the explicit ``model_endpoint`` catalogue key
        when set (a specific served model), else the logical ``model_profile`` tier.

        The router accepts either — a catalogue key resolves to that model's live
        binding, a tier to the profile default — so an agent can be pinned to one
        concrete Modal model without the engine naming a model anywhere (ADR-0022)."""
        return self.manifest.model_endpoint or self.manifest.model_profile

    def _resolve_payload(
        self,
        role: str,
        prompt: str,
        allowed: list[str],
        extra_fields: list[str] | None,
    ) -> dict:
        """Produce a validated ``{kind, text, …}`` payload for *role*.

        Live path: ask the provider for a Pydantic model whose ``kind`` is
        constrained to *allowed* — validated by construction. When that fails (a
        small or reasoning model that won't emit clean JSON), DON'T re-prompt with
        the schema: weak models echo the instruction, copy the example, and leak
        their reasoning (and the secret word) into the line. Instead ask for a
        PLAIN-PROSE line, strip the thinking, and — if nothing usable survives —
        raise so the conductor skips the turn rather than ship ``…`` or junk.

        Offline path (deterministic stub, no ``complete_structured``): append the
        JSON instruction and run the tolerant parser as before. Token/cost usage
        is recorded from the provider in every path.

        On either path, a judge's verdict in a competition scenario is run through
        :meth:`_verify_verdict` — one corrective re-ask when the model names a winner
        outside the cast (ADR-0029), otherwise a no-op for every other agent.
        """
        wants_thought = bool(extra_fields and "thought" in extra_fields)
        provider = self.router.for_profile(self._route_key)
        self._record_model(provider)
        with obs.span("agent.resolve", **{"mal.agent": role, "mal.profile": self._route_key}):
            if hasattr(provider, "complete_structured"):
                model = build_output_model(allowed, extra_fields)

                def _structured(p: str) -> dict | None:
                    """One structured generation: a usable payload, or None to fall back."""
                    try:
                        result = provider.complete_structured(role, p, model)
                    except Exception as exc:  # deliberate best-effort: any failure means "use prose"
                        self.last_usage = dict(provider.last_usage)
                        # Keep the cause visible; the fallback itself is unchanged.
                        obs.log("agent.structured_failed", agent=role, text=repr(exc)[:200])
                        return None  # structured failed → caller falls through to prose
                    self.last_usage = dict(provider.last_usage)
                    payload = self._with_reasoning(result.model_dump(), provider, "", wants_thought)
                    return payload if is_usable_line(payload.get("text", "")) else None

                payload = _structured(prompt)
                if payload is not None:
                    payload = self._verify_verdict(prompt, payload, _structured)
                    obs.add_span_attrs(**{"resolve.path": "structured", "event.kind": payload.get("kind", "")})
                    return payload
                obs.add_span_attrs(**{"resolve.path": "prose_fallback"})
                return self._prose_fallback(role, prompt, allowed, wants_thought, provider)

            instruction = json_instruction(allowed, extra_fields=extra_fields)

            def _offline(p: str) -> dict:
                """One offline generation: parse the stub's output into a payload."""
                raw = provider.complete(role, f"{p}\n{instruction}")
                self.last_usage = dict(provider.last_usage)
                self._guard_model_error(role, raw)
                parsed = parse_agent_output(raw, allowed_kinds=allowed, fallback_kind=allowed[0])
                return self._with_reasoning(parsed, provider, raw, wants_thought)

            parsed = self._verify_verdict(prompt, _offline(prompt), _offline)
            obs.add_span_attrs(**{"resolve.path": "offline_parse", "event.kind": parsed.get("kind", "")})
            return parsed

    # ── verdict winner validation (ADR-0029) ──────────────────────────────────

    def _verify_verdict(self, prompt: str, payload: dict, regenerate) -> dict:
        """Validate a verdict's ``winner``/``scores``; re-ask once on a bad winner.

        ``scores`` is normalised in place (non-cast keys dropped, values clamped to
        0–10) and never re-asked — it is garnish. An out-of-cast ``winner`` triggers
        exactly one corrective regeneration via *regenerate*, with the token usage of
        both calls summed so the governor (ADR-0013) meters the retry. A second
        failure drops ``winner`` and stamps ``no_contest`` — the verdict *text* still
        ships, so the show always ends; only the leaderboard row is forfeited.

        For every non-judge agent, every ``kind: none`` scenario, and every standalone
        agent (no competition attached), :meth:`_validate_payload` returns ``None`` and
        this is a transparent pass-through.

        Args:
            prompt: The prompt that produced *payload*; the correction is appended to it.
            payload: The first generation's payload (may be mutated in place).
            regenerate: Callable taking a prompt and returning a payload dict or ``None``.
        """
        error = self._validate_payload(payload)
        if error is None:
            return payload
        # Snapshot before the retry overwrites self.last_usage.
        first_usage = dict(self.last_usage)
        corrective = (
            f"\n\nCORRECTION: your previous reply named an invalid winner. {error} "
            "Reply again with the same JSON object, changing only the 'winner' field."
        )
        retry = regenerate(prompt + corrective)
        self.last_usage = self._sum_usage(first_usage, self.last_usage)
        if retry is not None and is_usable_line(retry.get("text", "")) and self._validate_payload(retry) is None:
            return retry
        payload.pop("winner", None)
        payload["no_contest"] = True
        return payload

    def _validate_payload(self, parsed: dict) -> str | None:
        """Return an error string when a judge named an out-of-cast winner, else ``None``.

        Active only for a ``judge`` whose attached competition has ``kind != none`` and
        whose manifest declares a ``winner`` field. A missing/empty ``winner`` is *not*
        an error (the field is optional and the offline stub never emits it — determinism
        preserved); ``scores`` is normalised in place as a side effect (never an error).
        Scores that cannot be read as a number, or that are NaN, are dropped."""
        comp = self.competition
        if comp is None or getattr(comp, "kind", "none") == "none" or self.manifest.role != "judge":
            return None
        if "winner" not in (self.manifest.output_extra_fields or []):
            return None

        cast = set(self.cast_names)
        scores = parsed.get("scores")
        if isinstance(scores, dict):
            cleaned: dict[str, float] = {}
            for name, value in scores.items():
                if name not in cast:
                    continue
                try:
                    score = float(value)
                except (TypeError, ValueError):
                    continue
                # NaN never equals itself. Without this guard min(10.0, nan) returns
                # 10.0, so a garbage score would be recorded as a perfect one.
                if score != score:
                    continue
                cleaned[name] = max(0.0, min(10.0, score))
            parsed["scores"] = cleaned

        winner = parsed.get("winner")
        if winner in (None, ""):
            return None
        vocab = self._winner_vocab()
        if winner not in vocab:
            return f"'winner' must be exactly one of: {', '.join(vocab)}."
        return None

    def _winner_vocab(self) -> list[str]:
        """The valid winner vocabulary: every cast member, plus any team labels."""
        comp = self.competition
        # ``comp`` may be None; getattr then yields None and we fall back to no teams.
        teams = getattr(comp, "teams", None) or {}
        return list(self.cast_names) + list(teams.keys())

    @staticmethod
    def _sum_usage(first: dict, second: dict) -> dict:
        """Sum two token-usage dicts so a re-ask's cost is metered, not lost.

        Keys present in only one dict are kept; missing or falsy values count as 0.
        """
        keys = set(first) | set(second)
        return {k: int(first.get(k, 0) or 0) + int(second.get(k, 0) or 0) for k in keys}

    def _guard_model_error(self, role: str, raw: str) -> None:
        """Raise when *raw* is a provider failure sentinel, not a spoken line.

        ``complete()`` returns the ``[model error: …]`` sentinel instead of raising when
        a model call fails (a transient connection drop, a 5xx). Turning it back into an
        exception here hands the failure to the conductor's resilient loop, which skips
        this agent's turn and records it in ``agent_errors`` — so the error never reaches
        the stage as the agent's line (ADR-0023).

        Raises:
            AgentOutputError: When ``is_model_error(raw)`` is true.
        """
        if is_model_error(raw):
            raise AgentOutputError(f"{getattr(self, 'name', role)}: model call failed — {raw}")

    def _prose_fallback(self, role, prompt, allowed, wants_thought, provider) -> dict:
        """Re-prompt for a plain spoken line and clean it; skip the turn if it's junk.

        The payload's ``kind`` is the first entry of *allowed*. When a thought is
        wanted it comes from the provider's ``last_reasoning``, else reasoning
        extracted from the raw reply, else the residue left by ``clean_clue``;
        it is truncated to 600 characters.

        Raises:
            AgentOutputError: When the reply is a model-error sentinel or no
                usable line survives cleaning.
        """
        obs.log("agent.prose_fallback", agent=getattr(self, "name", role))
        raw = provider.complete(role, prompt + _PROSE_FALLBACK)
        self.last_usage = dict(provider.last_usage)
        self._guard_model_error(role, raw)
        clue, residue = clean_clue(raw)
        if not is_usable_line(clue):
            raise AgentOutputError(f"{getattr(self, 'name', role)}: no usable line from prose fallback")
        payload: dict = {"kind": allowed[0], "text": clue}
        if wants_thought:
            thought = (getattr(provider, "last_reasoning", "") or "").strip() or extract_reasoning(raw) or residue
            if thought:
                payload["thought"] = thought[:600]
        return payload

    @staticmethod
    def _with_reasoning(payload: dict, provider, raw: str, wants_thought: bool) -> dict:
        """Surface the model's *thinking* as the ``thought`` when it gave none.

        Reasoning models return their chain-of-thought separately
        (``provider.last_reasoning``, from vLLM's reasoning parser) or inline in
        ``<think>`` tags. When the agent wants a ``thought`` and the structured
        field was empty (the fallback path), we fill it from that reasoning so the
        UI's mind-reader has something real to show. It rides only on this event's
        payload — the blackboard and memory share ``text`` alone, so a peer never
        reads another mind's thinking.

        Returns:
            The same *payload* dict, mutated in place when a thought was added.
        """
        if not wants_thought or payload.get("thought"):
            return payload
        reasoning = (getattr(provider, "last_reasoning", "") or "").strip() or extract_reasoning(raw)
        if reasoning:
            payload["thought"] = reasoning
        return payload

    def _complete(self, role: str, prompt: str) -> str:
        """Route to the provider for this agent's profile and record token usage.

        Raises:
            AgentOutputError: When the provider returned a model-error sentinel.
        """
        provider = self.router.for_profile(self._route_key)
        self._record_model(provider)
        raw = provider.complete(role, prompt)
        self.last_usage = dict(provider.last_usage)
        self._guard_model_error(role, raw)
        return raw

    def _record_model(self, provider) -> None:
        """Remember the model behind the current generation so the emitted event can
        record it (the route key asked for + the concrete model that actually ran)."""
        self.last_model_profile = self._route_key
        # A missing or empty ``model_id`` is stored as None rather than "".
        self.last_model_id = getattr(provider, "model_id", "") or None

    # ── memory ────────────────────────────────────────────────────────────────

    def _recall(self, turn: int, projection: StageProjection, recent_events: tuple[Event, ...]) -> str:
        """Format this agent's memory for the prompt.

        Uses salience memory when the manifest enables it (queried with the
        projection's current scene), otherwise episodic memory bounded by the
        configured window.
        """
        cfg = self.manifest.memory
        if cfg.use_salience:
            # The index (when attached) upgrades only the relevance term to
            # semantic search; recency/importance and the visibility filter are
            # unchanged. With no index this is the keyword-Jaccard path.
            return SalienceMemory(
                self.manifest.name, top_k=cfg.salience_top_k, index=self.memory_index
            ).format_for_prompt(recent_events, current_turn=turn, query=projection.current_scene)
        return EpisodicMemory(self.manifest.name, max_recent=cfg.window).format_for_prompt(recent_events)

    def _tracker(self, threshold: int) -> ReflectionTracker:
        """Return the reflection tracker, creating it on first use.

        *threshold* is only applied when the tracker is first created; later
        calls return the existing tracker unchanged.
        """
        if self._reflection_tracker is None:
            self._reflection_tracker = ReflectionTracker(self.manifest.name, threshold)
        return self._reflection_tracker

    def _emit_reflection(self, run_id: str, turn: int, recent_events: tuple[Event, ...]) -> Event:
        """Ask the model to compress recent memory into one belief and return it as an event.

        The prompt is built from the persona and up to 20 recent episodic
        entries; the reply is parsed with the reflection kind as the only
        allowed (and fallback) kind.

        Raises:
            AgentOutputError: When the model call returned an error sentinel.
        """
        memory = EpisodicMemory(self.manifest.name, max_recent=20).format_for_prompt(recent_events)
        prompt = (
            f"IDENTITY\n{self.manifest.persona}\n\n"
            f"RECENT MEMORY (events you witnessed)\n{memory}\n\n"
            "TASK\nSynthesise the above into ONE short, high-level belief about yourself or the "
            "world. It will replace raw memories in your future context.\n\n"
            "OUTPUT FORMAT\nReply with a single JSON object and nothing else: "
            '{"kind": "agent.reflected", "text": "<one-sentence belief>"}'
        )
        raw = self._complete(self.manifest.name + "-reflect", prompt)
        parsed = parse_agent_output(raw, [_REFLECTION_KIND], _REFLECTION_KIND)
        return Event(
            run_id=run_id,
            turn=turn,
            kind=_REFLECTION_KIND,
            actor=self.manifest.name,
            payload={k: v for k, v in parsed.items() if k != "kind"},
            model_profile=self.last_model_profile,
            model_id=self.last_model_id,
        )

    # ── output authority ──────────────────────────────────────────────────────

    def _content_kinds(self) -> list[str]:
        """Domain kinds this agent may emit on a normal turn (excludes reflection).

        Falls back to ``["agent.spoke"]`` when the manifest grants nothing else.
        """
        kinds = [k for k in self.manifest.may_emit if k != _REFLECTION_KIND]
        return kinds or ["agent.spoke"]

    # ── tools ─────────────────────────────────────────────────────────────────

    def _tools_block(self) -> str:
        """Prompt section describing the granted tools, or ``""`` when there are none."""
        if self.tools is None or not self.manifest.tools:
            return ""
        described = self.tools.describe(self.manifest.tools)
        return f"AVAILABLE TOOLS\n{described}" if described else ""

    def call_tool(self, tool: str, **params) -> dict:
        """Capability-checked tool call. Raises if the manifest does not grant *tool*.

        Raises:
            RuntimeError: When no tool registry is attached to this agent.
        """
        if self.tools is None:
            raise RuntimeError(f"{self.manifest.name} has no tool registry attached")
        return self.tools.call(self.manifest.name, self.manifest, tool, params)