""" telemetry_buffer.py Per-robot-per-metric rolling buffers used by the agent graph's robot poll loop. Each robot poll writes one reading into each metric's buffer; when a buffer reaches WINDOW_SIZE, it becomes eligible for an EML symbolic-law fit. Also caches the most recent successful fit per (robot, metric) so that graph nodes created from telemetry can carry the law as an attribute. Design notes ------------ - Buffer is a deque(maxlen=WINDOW_SIZE), so old samples are evicted automatically — no manual trimming. - We rate-limit fits to at most one per FIT_COOLDOWN_SEC per (robot, metric). A 32-sample window at 30 s polling is ~16 minutes of data; refitting more often than every 10 minutes is wasteful. - A fit is only eligible if the buffer is full AND the cooldown has elapsed AND at least REFIT_NEW_SAMPLES new readings have arrived since the last fit. This prevents busy-loops on constant signals. - All timestamps are monotonic seconds (time.monotonic() from the process start), which is sufficient for EML's relative-time inputs and avoids wall-clock skew issues. """ from __future__ import annotations import logging import time from collections import deque from dataclasses import dataclass, field from threading import Lock from typing import Iterable log = logging.getLogger("telemetry_buffer") # --------------------------------------------------------------------------- # Tunables # --------------------------------------------------------------------------- WINDOW_SIZE = 32 # readings per metric before a fit is triggered FIT_COOLDOWN_SEC = 600 # 10 minutes between fits on the same (robot, metric) REFIT_NEW_SAMPLES = 8 # require N new samples since last fit before next # --------------------------------------------------------------------------- # Data types # --------------------------------------------------------------------------- @dataclass class Reading: """A single polled telemetry value with its monotonic timestamp.""" t: float v: float @dataclass class Law: """The most recent successful EML fit for a (robot, metric) pair.""" expression: str # simplified form ("0.5*log(t) + 10") raw_expression: str # raw eml(...) form final_loss: float tree_size: int depth_used: int fit_at: float # monotonic time when fit was recorded n_samples_when_fit: int # how many readings had been seen when fit @dataclass class _BufferEntry: """Per-(robot, metric) state.""" readings: deque = field(default_factory=lambda: deque(maxlen=WINDOW_SIZE)) last_fit_at: float = 0.0 samples_at_last_fit: int = 0 total_samples: int = 0 # lifetime count for refit gating law: Law | None = None # --------------------------------------------------------------------------- # Main registry # --------------------------------------------------------------------------- class TelemetryBuffers: """Thread-safe registry of per-(robot, metric) buffers. Mutations happen on the poll loop's thread (asyncio single-threaded in this Space), but reads may happen from other loops (expand, WS broadcast). The lock is cheap insurance. """ def __init__(self) -> None: self._entries: dict[tuple[str, str], _BufferEntry] = {} self._lock = Lock() # -- mutations -------------------------------------------------------- def add(self, robot_id: str, metric: str, value: float | None) -> None: """Append one reading. Silently ignores None values.""" if value is None: return key = (str(robot_id), str(metric)) now = time.monotonic() with self._lock: entry = self._entries.setdefault(key, _BufferEntry()) entry.readings.append(Reading(t=now, v=float(value))) entry.total_samples += 1 def record_fit( self, robot_id: str, metric: str, *, expression: str, raw_expression: str, final_loss: float, tree_size: int, depth_used: int, ) -> None: """Cache a successful EML fit on the buffer.""" key = (str(robot_id), str(metric)) now = time.monotonic() with self._lock: entry = self._entries.get(key) if entry is None: return entry.law = Law( expression=expression, raw_expression=raw_expression, final_loss=final_loss, tree_size=tree_size, depth_used=depth_used, fit_at=now, n_samples_when_fit=entry.total_samples, ) entry.last_fit_at = now entry.samples_at_last_fit = entry.total_samples # -- gating queries --------------------------------------------------- def should_fit(self, robot_id: str, metric: str) -> bool: """Is this (robot, metric) eligible for a new fit right now?""" key = (str(robot_id), str(metric)) now = time.monotonic() with self._lock: entry = self._entries.get(key) if entry is None: return False if len(entry.readings) < WINDOW_SIZE: return False if now - entry.last_fit_at < FIT_COOLDOWN_SEC and entry.last_fit_at > 0: return False new_since_fit = entry.total_samples - entry.samples_at_last_fit if entry.last_fit_at > 0 and new_since_fit < REFIT_NEW_SAMPLES: return False return True # -- reads ------------------------------------------------------------ def snapshot( self, robot_id: str, metric: str ) -> tuple[list[float], list[float]] | None: """Return (timestamps, values) for the current buffer contents.""" key = (str(robot_id), str(metric)) with self._lock: entry = self._entries.get(key) if entry is None or not entry.readings: return None ts = [r.t for r in entry.readings] vs = [r.v for r in entry.readings] # Normalise time to start at 0 for the fitter — keeps inputs small. t0 = ts[0] return [t - t0 for t in ts], vs def get_law(self, robot_id: str, metric: str) -> Law | None: """Return the most recent cached law for this (robot, metric), if any.""" key = (str(robot_id), str(metric)) with self._lock: entry = self._entries.get(key) return entry.law if entry else None def all_laws_for_robot(self, robot_id: str) -> dict[str, Law]: """All known laws for a robot, keyed by metric name.""" out: dict[str, Law] = {} rid = str(robot_id) with self._lock: for (r, m), entry in self._entries.items(): if r == rid and entry.law: out[m] = entry.law return out # -- maintenance ------------------------------------------------------ def prune_robot(self, robot_id: str) -> None: """Forget all state for a robot (e.g. when it goes offline).""" rid = str(robot_id) with self._lock: for key in [k for k in self._entries if k[0] == rid]: del self._entries[key] def tracked_metrics(self, robot_id: str) -> Iterable[str]: rid = str(robot_id) with self._lock: return [m for (r, m) in self._entries if r == rid] # Module-level singleton (matches the pattern used elsewhere in the Space). telemetry_buffers = TelemetryBuffers() __all__ = [ "TelemetryBuffers", "telemetry_buffers", "Law", "Reading", "WINDOW_SIZE", "FIT_COOLDOWN_SEC", "REFIT_NEW_SAMPLES", ]