nwo-agent-graph / telemetry_buffer.py
CPater's picture
Upload 2 files
24b683e verified
Raw
History Blame Contribute Delete
7.77 kB
"""
telemetry_buffer.py
Per-robot-per-metric rolling buffers used by the agent graph's robot
poll loop. Each robot poll writes one reading into each metric's buffer;
when a buffer reaches WINDOW_SIZE, it becomes eligible for an EML
symbolic-law fit.
Also caches the most recent successful fit per (robot, metric) so that
graph nodes created from telemetry can carry the law as an attribute.
Design notes
------------
- Buffer is a deque(maxlen=WINDOW_SIZE), so old samples are evicted
automatically — no manual trimming.
- We rate-limit fits to at most one per FIT_COOLDOWN_SEC per
(robot, metric), measured from the last fit recorded via record_fit().
A 32-sample window at 30 s polling is ~16 minutes of data; refitting
more often than every 10 minutes is wasteful.
- A fit is only eligible if the buffer is full AND the cooldown has
elapsed AND at least REFIT_NEW_SAMPLES new readings have arrived
since the last fit. This prevents busy-loops on constant signals.
- All timestamps are monotonic seconds taken from time.monotonic(). Its
reference point is undefined, so only differences between timestamps
are meaningful — which is all EML's relative-time inputs need — and it
avoids wall-clock skew issues.
"""
from __future__ import annotations
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from threading import Lock
from typing import Iterable
# Module-level logger, registered under the fixed name "telemetry_buffer".
log = logging.getLogger("telemetry_buffer")
# ---------------------------------------------------------------------------
# Tunables
# ---------------------------------------------------------------------------
# Capacity of each rolling buffer. A (robot, metric) pair is only considered
# for a fit once its buffer holds this many readings.
WINDOW_SIZE = 32
# Minimum number of seconds between fits on the same (robot, metric);
# 600 s = 10 minutes.
FIT_COOLDOWN_SEC = 600
# Minimum number of new readings that must arrive after a fit before the
# same (robot, metric) becomes eligible for another one.
REFIT_NEW_SAMPLES = 8
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass
class Reading:
    """One telemetry sample: a polled value and the moment it was recorded."""

    # Timestamp in seconds on the monotonic clock.
    t: float
    # The sampled metric value.
    v: float
@dataclass
class Law:
    """Result of the latest successful EML fit for one (robot, metric) pair."""

    # Simplified, human-readable form, e.g. "0.5*log(t) + 10".
    expression: str
    # The unsimplified eml(...) form.
    raw_expression: str
    # Loss reported for the fit.
    final_loss: float
    # Size of the fitted expression tree.
    tree_size: int
    # Depth used for the fit.
    depth_used: int
    # Monotonic-clock time at which the fit was recorded.
    fit_at: float
    # Number of readings that had been seen when the fit was recorded.
    n_samples_when_fit: int
def _new_window() -> deque:
    """Build an empty rolling window capped at WINDOW_SIZE readings."""
    return deque(maxlen=WINDOW_SIZE)


@dataclass
class _BufferEntry:
    """Mutable bookkeeping held for a single (robot, metric) pair."""

    # Rolling window of readings; once full, appending drops the oldest.
    readings: deque = field(default_factory=_new_window)
    # Monotonic-clock time of the last recorded fit (0.0 until one exists).
    last_fit_at: float = 0.0
    # Value of total_samples at the moment of the last recorded fit.
    samples_at_last_fit: int = 0
    # Lifetime count of readings added; used for refit gating.
    total_samples: int = 0
    # Most recent recorded fit, or None if there has not been one.
    law: Law | None = None
# ---------------------------------------------------------------------------
# Main registry
# ---------------------------------------------------------------------------
class TelemetryBuffers:
    """Thread-safe registry of per-(robot, metric) rolling buffers.

    Mutations happen on the poll loop's thread (asyncio single-threaded
    in this Space), but reads may happen from other loops (expand, WS
    broadcast). The lock is cheap insurance.

    Entries are keyed by ``(str(robot_id), str(metric))``. Every method
    holds the lock for the whole of its access to the registry.
    """

    def __init__(self) -> None:
        """Create an empty registry with its own lock."""
        self._entries: dict[tuple[str, str], _BufferEntry] = {}
        self._lock = Lock()

    # -- mutations --------------------------------------------------------

    def add(self, robot_id: str, metric: str, value: float | None) -> None:
        """Append one reading to the (robot, metric) buffer.

        ``None`` values are silently ignored. Any other value is coerced
        with ``float()``; a value that cannot be coerced raises
        ``TypeError``/``ValueError`` and leaves the registry untouched.
        """
        if value is None:
            return
        # Coerce before touching the registry so that a bad value cannot
        # leave behind a freshly created, empty entry.
        reading_value = float(value)
        key = (str(robot_id), str(metric))
        with self._lock:
            # Look up first: setdefault() would build (and usually throw
            # away) a new entry and deque on every single call.
            entry = self._entries.get(key)
            if entry is None:
                entry = self._entries[key] = _BufferEntry()
            # Timestamp under the lock so that readings are appended in
            # non-decreasing time order even with concurrent writers.
            entry.readings.append(Reading(t=time.monotonic(), v=reading_value))
            entry.total_samples += 1

    def record_fit(
        self,
        robot_id: str,
        metric: str,
        *,
        expression: str,
        raw_expression: str,
        final_loss: float,
        tree_size: int,
        depth_used: int,
    ) -> None:
        """Cache a successful EML fit on the buffer and start the cooldown.

        If no buffer exists for the pair (for example it was pruned in the
        meantime) the fit is dropped; this is logged at debug level.
        """
        key = (str(robot_id), str(metric))
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                log.debug("record_fit: no buffer for %s/%s; fit discarded", *key)
                return
            now = time.monotonic()
            entry.law = Law(
                expression=expression,
                raw_expression=raw_expression,
                final_loss=final_loss,
                tree_size=tree_size,
                depth_used=depth_used,
                fit_at=now,
                n_samples_when_fit=entry.total_samples,
            )
            entry.last_fit_at = now
            entry.samples_at_last_fit = entry.total_samples

    # -- gating queries ---------------------------------------------------

    def should_fit(self, robot_id: str, metric: str) -> bool:
        """Return True if this (robot, metric) is eligible for a fit now.

        A pair is eligible when its buffer is full and either no fit has
        been recorded yet, or both FIT_COOLDOWN_SEC has elapsed and at
        least REFIT_NEW_SAMPLES readings have arrived since the last
        recorded fit.

        Note: within this class only record_fit() updates the gating
        state, so an attempt that is never recorded does not start a
        cooldown.
        """
        key = (str(robot_id), str(metric))
        with self._lock:
            entry = self._entries.get(key)
            if entry is None or len(entry.readings) < WINDOW_SIZE:
                return False
            # "Has a fit been recorded" is decided by the cached law, not
            # by comparing last_fit_at with 0: the monotonic clock's
            # reference point is undefined.
            if entry.law is None:
                return True
            if time.monotonic() - entry.last_fit_at < FIT_COOLDOWN_SEC:
                return False
            new_since_fit = entry.total_samples - entry.samples_at_last_fit
            return new_since_fit >= REFIT_NEW_SAMPLES

    # -- reads ------------------------------------------------------------

    def snapshot(
        self, robot_id: str, metric: str
    ) -> tuple[list[float], list[float]] | None:
        """Return (timestamps, values) for the current buffer contents.

        Timestamps are shifted so the oldest reading in the buffer is at
        0. Returns None when the pair is unknown or its buffer is empty.
        """
        key = (str(robot_id), str(metric))
        with self._lock:
            entry = self._entries.get(key)
            if entry is None or not entry.readings:
                return None
            # Copy under the lock; the arithmetic below needs no locking.
            readings = list(entry.readings)
        # Normalise time to start at 0 for the fitter — keeps inputs small.
        t0 = readings[0].t
        return [r.t - t0 for r in readings], [r.v for r in readings]

    def get_law(self, robot_id: str, metric: str) -> Law | None:
        """Return the most recent cached law for this (robot, metric), if any."""
        key = (str(robot_id), str(metric))
        with self._lock:
            entry = self._entries.get(key)
            return entry.law if entry is not None else None

    def all_laws_for_robot(self, robot_id: str) -> dict[str, Law]:
        """Return all known laws for a robot, keyed by metric name."""
        rid = str(robot_id)
        with self._lock:
            return {
                metric: entry.law
                for (robot, metric), entry in self._entries.items()
                if robot == rid and entry.law is not None
            }

    # -- maintenance ------------------------------------------------------

    def prune_robot(self, robot_id: str) -> None:
        """Forget all state for a robot (e.g. when it goes offline)."""
        rid = str(robot_id)
        with self._lock:
            # Collect the keys first; a dict cannot shrink while iterated.
            stale = [key for key in self._entries if key[0] == rid]
            for key in stale:
                del self._entries[key]

    def tracked_metrics(self, robot_id: str) -> Iterable[str]:
        """Return the metric names that currently have a buffer for a robot."""
        rid = str(robot_id)
        with self._lock:
            return [metric for (robot, metric) in self._entries if robot == rid]
# Module-level singleton (matches the pattern used elsewhere in the Space).
# Created at import time, so importers share this one registry instance.
telemetry_buffers = TelemetryBuffers()
# Names exported by `from telemetry_buffer import *`.
__all__ = [
    # Registry class and its shared instance.
    "TelemetryBuffers",
    "telemetry_buffers",
    # Data types.
    "Law",
    "Reading",
    # Tunables.
    "WINDOW_SIZE",
    "FIT_COOLDOWN_SEC",
    "REFIT_NEW_SAMPLES",
]