Spaces:
Running
Running
| """ | |
| telemetry_buffer.py | |
| Per-robot-per-metric rolling buffers used by the agent graph's robot | |
| poll loop. Each robot poll writes one reading into each metric's buffer; | |
| when a buffer reaches WINDOW_SIZE, it becomes eligible for an EML | |
| symbolic-law fit. | |
| Also caches the most recent successful fit per (robot, metric) so that | |
| graph nodes created from telemetry can carry the law as an attribute. | |
| Design notes | |
| ------------ | |
| - Buffer is a deque(maxlen=WINDOW_SIZE), so old samples are evicted | |
| automatically — no manual trimming. | |
| - We rate-limit fits to at most one per FIT_COOLDOWN_SEC per | |
| (robot, metric). A 32-sample window at 30 s polling is ~16 minutes of | |
| data; refitting more often than every 10 minutes is wasteful. | |
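| - A fit is only eligible if the buffer is full AND the cooldown has | |
| elapsed AND at least REFIT_NEW_SAMPLES new readings have arrived | |
| since the last fit. This prevents busy-loops on constant signals. | |
| Note that the cooldown and new-sample gates only apply once a fit | |
| has been recorded via record_fit(); an attempt that never records | |
| a fit leaves the buffer eligible on every poll. | |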
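| - All timestamps are monotonic seconds from time.monotonic(), whose | |
| reference point is undefined (only differences are meaningful). That | |
| suffices for EML's relative-time inputs, because snapshot() rebases | |
| each window to start at 0, and it avoids wall-clock skew issues. | |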
| """ | |
from __future__ import annotations

import logging
import math
import time
from collections import deque
from dataclasses import dataclass, field
from threading import Lock
from typing import Iterable
# Module logger. The explicit literal name (rather than __name__) keeps log
# configuration keyed on "telemetry_buffer" however this module is imported.
log: logging.Logger = logging.getLogger("telemetry_buffer")
| # --------------------------------------------------------------------------- | |
| # Tunables | |
| # --------------------------------------------------------------------------- | |
| WINDOW_SIZE = 32 # readings per metric before a fit is triggered | |
| FIT_COOLDOWN_SEC = 600 # 10 minutes between fits on the same (robot, metric) | |
| REFIT_NEW_SAMPLES = 8 # require N new samples since last fit before next | |
| # --------------------------------------------------------------------------- | |
| # Data types | |
| # --------------------------------------------------------------------------- | |
@dataclass
class Reading:
    """A single polled telemetry value with its monotonic timestamp.

    ``@dataclass`` supplies the ``Reading(t=..., v=...)`` constructor that
    ``TelemetryBuffers.add`` relies on.
    """

    t: float  # time.monotonic() seconds at which the reading was stored
    v: float  # the polled value, already coerced with float()
@dataclass
class Law:
    """The most recent successful EML fit for a (robot, metric) pair.

    Built by ``TelemetryBuffers.record_fit`` using keyword arguments;
    ``@dataclass`` supplies that constructor plus ``__eq__``/``__repr__``.
    """

    expression: str  # simplified form ("0.5*log(t) + 10")
    raw_expression: str  # raw eml(...) form
    final_loss: float  # final loss value passed to record_fit()
    tree_size: int  # expression-tree size passed to record_fit()
    depth_used: int  # depth value passed to record_fit()
    fit_at: float  # monotonic time when fit was recorded
    n_samples_when_fit: int  # how many readings had been seen when fit
@dataclass
class _BufferEntry:
    """Per-(robot, metric) state.

    Must be a dataclass: ``field(default_factory=...)`` only gives each
    instance its own deque when processed by ``@dataclass``. Otherwise
    ``readings`` would be a shared ``Field`` object on the class.
    """

    # Rolling window; deque(maxlen=...) evicts the oldest reading on overflow.
    readings: deque = field(default_factory=lambda: deque(maxlen=WINDOW_SIZE))
    # Monotonic time of the last recorded fit (0.0 until one is recorded).
    last_fit_at: float = 0.0
    # Value of total_samples at the moment the last fit was recorded.
    samples_at_last_fit: int = 0
    total_samples: int = 0  # lifetime count for refit gating
    # Most recent successful fit; None until record_fit() stores one.
    law: Law | None = None
| # --------------------------------------------------------------------------- | |
| # Main registry | |
| # --------------------------------------------------------------------------- | |
class TelemetryBuffers:
    """Thread-safe registry of per-(robot, metric) buffers.

    Mutations happen on the poll loop's thread (asyncio single-threaded
    in this Space), but reads may happen from other loops (expand, WS
    broadcast). Every public method takes ``self._lock`` around its access
    to the shared map; the lock is cheap insurance.

    Robot ids and metric names are passed through ``str()`` before being
    used as keys, so e.g. an integer robot id and its string form share
    one buffer.
    """

    def __init__(self) -> None:
        # (robot_id, metric) -> per-pair buffer state.
        self._entries: dict[tuple[str, str], _BufferEntry] = {}
        self._lock = Lock()

    @staticmethod
    def _key(robot_id: str, metric: str) -> tuple[str, str]:
        """Normalise a (robot, metric) pair into an internal map key."""
        return (str(robot_id), str(metric))

    # -- mutations --------------------------------------------------------

    def add(self, robot_id: str, metric: str, value: float | None) -> None:
        """Append one reading stamped with the current monotonic time.

        ``None`` values are ignored silently. Non-finite values (NaN, +/-inf)
        are dropped with a debug log. Otherwise a single bad reading would
        stay in the window for up to WINDOW_SIZE polls and be handed to the
        fitter by ``snapshot()``.

        Raises:
            Whatever ``float(value)`` raises (e.g. TypeError, ValueError).
            The conversion happens before any buffer is created, so bad
            input does not leave an empty entry behind.
        """
        if value is None:
            return
        v = float(value)
        if not math.isfinite(v):
            log.debug(
                "ignoring non-finite reading %r for %s/%s", value, robot_id, metric
            )
            return
        key = self._key(robot_id, metric)
        now = time.monotonic()
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                # Not setdefault(): that would build a throwaway
                # _BufferEntry (and deque) on every single call.
                entry = self._entries[key] = _BufferEntry()
            entry.readings.append(Reading(t=now, v=v))
            entry.total_samples += 1

    def record_fit(
        self,
        robot_id: str,
        metric: str,
        *,
        expression: str,
        raw_expression: str,
        final_loss: float,
        tree_size: int,
        depth_used: int,
    ) -> None:
        """Cache a successful EML fit on the buffer and arm the refit gates.

        If no buffer exists for the pair (e.g. it was removed by
        ``prune_robot``), the fit is discarded and a debug message is logged.
        """
        key = self._key(robot_id, metric)
        now = time.monotonic()
        with self._lock:
            entry = self._entries.get(key)
            if entry is not None:
                entry.law = Law(
                    expression=expression,
                    raw_expression=raw_expression,
                    final_loss=final_loss,
                    tree_size=tree_size,
                    depth_used=depth_used,
                    fit_at=now,
                    n_samples_when_fit=entry.total_samples,
                )
                entry.last_fit_at = now
                entry.samples_at_last_fit = entry.total_samples
                return
        # Logged outside the lock; nothing was stored.
        log.debug("no buffer for %s/%s; discarding fit %r", key[0], key[1], expression)

    # -- gating queries ---------------------------------------------------

    def should_fit(self, robot_id: str, metric: str) -> bool:
        """Return True if this (robot, metric) is eligible for a new fit now.

        The window must be full. Once a fit has been recorded, at least
        FIT_COOLDOWN_SEC must also have elapsed and at least
        REFIT_NEW_SAMPLES readings must have arrived since it. Attempts that
        never reach ``record_fit()`` do not arm these gates.
        """
        key = self._key(robot_id, metric)
        now = time.monotonic()
        with self._lock:
            entry = self._entries.get(key)
            if entry is None or len(entry.readings) < WINDOW_SIZE:
                return False
            # entry.law is set only by record_fit(), together with
            # last_fit_at, so it marks "fitted before" without assuming
            # time.monotonic() > 0 (its reference point is undefined).
            if entry.law is None:
                return True
            if now - entry.last_fit_at < FIT_COOLDOWN_SEC:
                return False
            new_since_fit = entry.total_samples - entry.samples_at_last_fit
            return new_since_fit >= REFIT_NEW_SAMPLES

    # -- reads ------------------------------------------------------------

    def snapshot(
        self, robot_id: str, metric: str
    ) -> tuple[list[float], list[float]] | None:
        """Return (timestamps, values) for the current buffer contents.

        Timestamps are rebased so the oldest reading is at t=0. Returns None
        when the pair is unknown or its buffer is empty.
        """
        key = self._key(robot_id, metric)
        with self._lock:
            entry = self._entries.get(key)
            if entry is None or not entry.readings:
                return None
            readings = list(entry.readings)
        # Normalise time to start at 0 for the fitter; keeps inputs small.
        t0 = readings[0].t
        return [r.t - t0 for r in readings], [r.v for r in readings]

    def get_law(self, robot_id: str, metric: str) -> Law | None:
        """Return the most recent cached law for this (robot, metric), if any."""
        key = self._key(robot_id, metric)
        with self._lock:
            entry = self._entries.get(key)
            return entry.law if entry is not None else None

    def all_laws_for_robot(self, robot_id: str) -> dict[str, Law]:
        """All known laws for a robot, keyed by metric name."""
        rid = str(robot_id)
        with self._lock:
            return {
                m: entry.law
                for (r, m), entry in self._entries.items()
                if r == rid and entry.law is not None
            }

    # -- maintenance ------------------------------------------------------

    def prune_robot(self, robot_id: str) -> None:
        """Forget all state for a robot (e.g. when it goes offline)."""
        rid = str(robot_id)
        with self._lock:
            # Collect keys first: the dict cannot shrink while iterated.
            for key in [k for k in self._entries if k[0] == rid]:
                del self._entries[key]

    def tracked_metrics(self, robot_id: str) -> Iterable[str]:
        """Return a list of the metric names currently buffered for a robot."""
        rid = str(robot_id)
        with self._lock:
            return [m for (r, m) in self._entries if r == rid]
# Shared registry instance: created once at import time and reused by every
# importer in the process (the singleton pattern used elsewhere in the Space).
telemetry_buffers: TelemetryBuffers = TelemetryBuffers()
# Public API surface of this module.
__all__ = [
    # registry class and its shared instance
    "TelemetryBuffers",
    "telemetry_buffers",
    # record types
    "Law",
    "Reading",
    # gating tunables
    "WINDOW_SIZE",
    "FIT_COOLDOWN_SEC",
    "REFIT_NEW_SAMPLES",
]