from __future__ import annotations from dataclasses import dataclass, field from src import observability as obs @dataclass class Governor: """Rate and budget guard for the conductor loop. Prevents runaway inference cascades by tracking calls, tokens, and (optional) spend per run and per turn. Small models are cheap, but a 'many small models posting to a shared board' topology is exactly what produces surprise bills — so the governor is the runtime safety valve (ADR-0007, ADR-0013). All token/cost limits default to off, so existing call-only behaviour is unchanged unless a scenario opts in. """ max_turns: int = 100 max_calls_per_turn: int = 8 max_total_calls: int = 500 max_total_tokens: int | None = None hourly_budget_usd: float | None = None _total_calls: int = field(default=0, init=False, repr=False) _calls_this_turn: int = field(default=0, init=False, repr=False) _current_turn: int = field(default=-1, init=False, repr=False) _total_tokens: int = field(default=0, init=False, repr=False) _spend_usd: float = field(default=0.0, init=False, repr=False) def begin_turn(self, turn: int) -> None: if turn != self._current_turn: self._calls_this_turn = 0 self._current_turn = turn def check(self, turn: int) -> None: if turn > self.max_turns: self._trip("max_turns", f"Turn cap {self.max_turns} reached") if self._total_calls >= self.max_total_calls: self._trip("max_total_calls", f"Total call cap {self.max_total_calls} reached") if self._calls_this_turn >= self.max_calls_per_turn: self._trip("max_calls_per_turn", f"Per-turn call cap {self.max_calls_per_turn} reached on turn {turn}") if self.max_total_tokens is not None and self._total_tokens >= self.max_total_tokens: self._trip("max_total_tokens", f"Total token cap {self.max_total_tokens} reached") if self.hourly_budget_usd is not None and self._spend_usd >= self.hourly_budget_usd: self._trip("hourly_budget_usd", f"Spend cap ${self.hourly_budget_usd:.2f} reached") def _trip(self, reason: str, message: str) -> None: """Record the budget trip as a metric + log, then raise the stop.""" obs.record_governor_trip(reason) obs.log( "governor.trip", level="warning", reason=reason, message=message, total_calls=self._total_calls, total_tokens=self._total_tokens, spend_usd=round(self._spend_usd, 4), ) raise BudgetExceeded(message, reason=reason) def record_call(self, tokens: int = 0, cost_usd: float = 0.0) -> None: self._calls_this_turn += 1 self._total_calls += 1 self._total_tokens += max(0, tokens) self._spend_usd += max(0.0, cost_usd) def reset(self) -> None: """Zero the counters but keep the configured limits. Used by Conductor.reset() between runs so budget config survives a restart (the old code re-ran __init__, which silently dropped any extra limits).""" self._total_calls = 0 self._calls_this_turn = 0 self._current_turn = -1 self._total_tokens = 0 self._spend_usd = 0.0 @property def stats(self) -> dict[str, int | float]: return { "total_calls": self._total_calls, "calls_this_turn": self._calls_this_turn, "current_turn": self._current_turn, "total_tokens": self._total_tokens, "spend_usd": round(self._spend_usd, 4), } @property def snapshot(self) -> dict[str, int | float | None]: """Read-only view of the live counters alongside their configured limits. Handy for UI surfaces that want to show "X of Y calls used" without reaching into private fields.""" return { **self.stats, "max_turns": self.max_turns, "max_calls_per_turn": self.max_calls_per_turn, "max_total_calls": self.max_total_calls, "max_total_tokens": self.max_total_tokens, "hourly_budget_usd": self.hourly_budget_usd, } class BudgetExceeded(RuntimeError): """Raised when a Governor bound trips. Carries a structured ``reason`` naming which bound tripped (one of ``max_turns`` / ``max_total_calls`` / ``max_calls_per_turn`` / ``max_total_tokens`` / ``hourly_budget_usd``) while ``str(exc)`` stays a human-readable message. Remains a ``RuntimeError`` subclass so existing generic handlers keep working.""" def __init__(self, message: str, *, reason: str | None = None) -> None: super().__init__(message) self.reason = reason