Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from src import observability as obs | |
| class Governor: | |
| """Rate and budget guard for the conductor loop. | |
| Prevents runaway inference cascades by tracking calls, tokens, and (optional) | |
| spend per run and per turn. Small models are cheap, but a 'many small models | |
| posting to a shared board' topology is exactly what produces surprise bills — | |
| so the governor is the runtime safety valve (ADR-0007, ADR-0013). | |
| All token/cost limits default to off, so existing call-only behaviour is | |
| unchanged unless a scenario opts in. | |
| """ | |
| max_turns: int = 100 | |
| max_calls_per_turn: int = 8 | |
| max_total_calls: int = 500 | |
| max_total_tokens: int | None = None | |
| hourly_budget_usd: float | None = None | |
| _total_calls: int = field(default=0, init=False, repr=False) | |
| _calls_this_turn: int = field(default=0, init=False, repr=False) | |
| _current_turn: int = field(default=-1, init=False, repr=False) | |
| _total_tokens: int = field(default=0, init=False, repr=False) | |
| _spend_usd: float = field(default=0.0, init=False, repr=False) | |
| def begin_turn(self, turn: int) -> None: | |
| if turn != self._current_turn: | |
| self._calls_this_turn = 0 | |
| self._current_turn = turn | |
| def check(self, turn: int) -> None: | |
| if turn > self.max_turns: | |
| self._trip("max_turns", f"Turn cap {self.max_turns} reached") | |
| if self._total_calls >= self.max_total_calls: | |
| self._trip("max_total_calls", f"Total call cap {self.max_total_calls} reached") | |
| if self._calls_this_turn >= self.max_calls_per_turn: | |
| self._trip("max_calls_per_turn", f"Per-turn call cap {self.max_calls_per_turn} reached on turn {turn}") | |
| if self.max_total_tokens is not None and self._total_tokens >= self.max_total_tokens: | |
| self._trip("max_total_tokens", f"Total token cap {self.max_total_tokens} reached") | |
| if self.hourly_budget_usd is not None and self._spend_usd >= self.hourly_budget_usd: | |
| self._trip("hourly_budget_usd", f"Spend cap ${self.hourly_budget_usd:.2f} reached") | |
| def _trip(self, reason: str, message: str) -> None: | |
| """Record the budget trip as a metric + log, then raise the stop.""" | |
| obs.record_governor_trip(reason) | |
| obs.log( | |
| "governor.trip", | |
| level="warning", | |
| reason=reason, | |
| message=message, | |
| total_calls=self._total_calls, | |
| total_tokens=self._total_tokens, | |
| spend_usd=round(self._spend_usd, 4), | |
| ) | |
| raise BudgetExceeded(message, reason=reason) | |
| def record_call(self, tokens: int = 0, cost_usd: float = 0.0) -> None: | |
| self._calls_this_turn += 1 | |
| self._total_calls += 1 | |
| self._total_tokens += max(0, tokens) | |
| self._spend_usd += max(0.0, cost_usd) | |
| def reset(self) -> None: | |
| """Zero the counters but keep the configured limits. | |
| Used by Conductor.reset() between runs so budget config survives a restart | |
| (the old code re-ran __init__, which silently dropped any extra limits).""" | |
| self._total_calls = 0 | |
| self._calls_this_turn = 0 | |
| self._current_turn = -1 | |
| self._total_tokens = 0 | |
| self._spend_usd = 0.0 | |
| def stats(self) -> dict[str, int | float]: | |
| return { | |
| "total_calls": self._total_calls, | |
| "calls_this_turn": self._calls_this_turn, | |
| "current_turn": self._current_turn, | |
| "total_tokens": self._total_tokens, | |
| "spend_usd": round(self._spend_usd, 4), | |
| } | |
| def snapshot(self) -> dict[str, int | float | None]: | |
| """Read-only view of the live counters alongside their configured limits. | |
| Handy for UI surfaces that want to show "X of Y calls used" without | |
| reaching into private fields.""" | |
| return { | |
| **self.stats, | |
| "max_turns": self.max_turns, | |
| "max_calls_per_turn": self.max_calls_per_turn, | |
| "max_total_calls": self.max_total_calls, | |
| "max_total_tokens": self.max_total_tokens, | |
| "hourly_budget_usd": self.hourly_budget_usd, | |
| } | |
| class BudgetExceeded(RuntimeError): | |
| """Raised when a Governor bound trips. | |
| Carries a structured ``reason`` naming which bound tripped (one of | |
| ``max_turns`` / ``max_total_calls`` / ``max_calls_per_turn`` / | |
| ``max_total_tokens`` / ``hourly_budget_usd``) while ``str(exc)`` stays a | |
| human-readable message. Remains a ``RuntimeError`` subclass so existing | |
| generic handlers keep working.""" | |
| def __init__(self, message: str, *, reason: str | None = None) -> None: | |
| super().__init__(message) | |
| self.reason = reason | |