Spaces:
Running on Zero
Running on Zero
agharsallah
feat: Enhance telemetry store with revision tracking and optimize UI refresh logic
"""In-memory telemetry store — the self-contained backend for the Gradio panel.

Logs, spans, and metric points are kept in bounded ring buffers (oldest dropped
once full) plus a cumulative counter table. This is deliberately *not* an
external collector: the whole monitoring story lives in-process so the live demo
shows logs, traces, and charts with nothing to deploy. The Telemetry tab reads
straight off the accessors here.

Thread-safe: Gradio serves requests on a pool, and OTEL ends spans from whatever
thread ran them, so every mutation (and every read) takes the lock.
"""
| from __future__ import annotations | |
| import threading | |
| import time | |
| from collections import deque | |
| from dataclasses import dataclass, field | |
@dataclass
class SpanRecord:
    """A finished span, flattened for display and JSON-friendliness.

    ``@dataclass`` is required here: without it the bare annotations create no
    ``__init__`` and ``field(default_factory=dict)`` would just sit on the class
    as an inert ``Field`` object instead of giving each record its own dict.
    """

    name: str
    trace_id: str
    span_id: str
    # ``None`` when the span has no parent.
    parent_id: str | None
    # Timing fields; the ``_ms`` suffix suggests milliseconds.
    # NOTE(review): confirm units/epoch against whatever builds these records.
    start_ms: float
    end_ms: float
    duration_ms: float
    # Status label as a string. NOTE(review): exact values are set by the producer.
    status: str
    # Extra span attributes; default_factory gives every record a fresh dict.
    attributes: dict = field(default_factory=dict)
@dataclass
class MetricPoint:
    """A single recorded metric sample: a named value at a timestamp, plus labels.

    ``@dataclass`` is required: without it the annotations alone give no
    field-accepting ``__init__`` and ``labels`` would be a shared ``Field`` object.
    """

    name: str
    value: float
    # NOTE(review): presumably wall-clock seconds (e.g. from ``now_ts``) — confirm.
    ts: float
    # Label key/values; each point gets its own dict via default_factory.
    labels: dict = field(default_factory=dict)
class TelemetryStore:
    """Bounded, thread-safe buffers of recent logs / spans / metrics.

    Every public method runs under a single ``threading.Lock``, so appends from
    worker threads and snapshot reads from the UI never interleave.
    """

    def __init__(self, capacity: int = 4000) -> None:
        """Create empty buffers.

        Args:
            capacity: Max number of logs, and of spans, retained; the metric
                buffer holds ``4 * capacity`` points. When a buffer is full,
                each append drops its oldest entry. A negative value makes
                ``deque`` raise ``ValueError``.
        """
        self._lock = threading.Lock()
        self._logs: deque[dict] = deque(maxlen=capacity)
        self._spans: deque[SpanRecord] = deque(maxlen=capacity)
        self._metrics: deque[MetricPoint] = deque(maxlen=capacity * 4)
        # Cumulative totals keyed by (metric name, sorted label items). Unlike
        # the ring buffers these are never evicted; only ``clear`` resets them.
        self._counters: dict[tuple, float] = {}
        # Monotonic ingest counter — a cheap "has anything changed since I last
        # looked?" signal so the Telemetry tab can skip recomputing/repainting
        # on an idle tick.
        self._rev = 0

    @staticmethod
    def _tail(buf: deque, n: int) -> list:
        """Return the newest ``n`` items of ``buf``, oldest first.

        ``n <= 0`` yields ``[]``. A bare ``[-n:]`` slice would return the whole
        buffer for ``n == 0`` (since ``-0 == 0``). Caller must hold the lock.
        """
        if n <= 0:
            return []
        return list(buf)[-n:]

    # ── ingest ──────────────────────────────────────────────────────────────

    def add_log(self, record: dict) -> None:
        """Append a log record (stored by reference) and bump the revision."""
        with self._lock:
            self._logs.append(record)
            self._rev += 1

    def add_span(self, span: SpanRecord) -> None:
        """Append a finished span and bump the revision."""
        with self._lock:
            self._spans.append(span)
            self._rev += 1

    def add_metric(self, point: MetricPoint, *, counter: bool = False) -> None:
        """Append a metric point and bump the revision.

        Args:
            point: The sample to record.
            counter: If true, also add ``point.value`` to the cumulative total
                for ``(point.name, point.labels)``.

        Any exception from building or updating the counter (e.g. unhashable
        label values) is raised before anything is recorded, so the buffers,
        counters, and revision stay consistent.
        """
        key = (point.name, tuple(sorted(point.labels.items()))) if counter else None
        with self._lock:
            # Counter update first: it is the only step that can fail (hashing
            # the key, adding the value). The deque append below cannot.
            if key is not None:
                self._counters[key] = self._counters.get(key, 0.0) + point.value
            self._metrics.append(point)
            self._rev += 1

    # ── read (for the UI) ───────────────────────────────────────────────────

    def recent_logs(self, n: int = 200) -> list[dict]:
        """Return up to the ``n`` most recent log records, oldest first.

        ``n <= 0`` returns ``[]``.
        """
        with self._lock:
            return self._tail(self._logs, n)

    def recent_spans(self, n: int = 200) -> list[SpanRecord]:
        """Return up to the ``n`` most recent spans, oldest first.

        ``n <= 0`` returns ``[]``.
        """
        with self._lock:
            return self._tail(self._spans, n)

    def metric_points(self, name: str | None = None, limit: int | None = None) -> list[MetricPoint]:
        """Recorded points, optionally filtered by ``name``.

        With ``limit`` set, only the most recent ``limit`` matching points are
        returned (still chronological). This bounds both the scan and the chart
        payload for high-frequency metrics like agent-turn latency, which would
        otherwise grow until the buffer is full. ``limit <= 0`` returns ``[]``.
        """
        with self._lock:
            if limit is None:
                return [p for p in self._metrics if name is None or p.name == name]
            if limit <= 0:
                return []
            out: list[MetricPoint] = []
            for p in reversed(self._metrics):  # newest-first, stop once we have `limit`
                if name is None or p.name == name:
                    out.append(p)
                    if len(out) >= limit:
                        break
            out.reverse()
            return out

    def revision(self) -> int:
        """Monotonic ingest counter — bumped on every log/span/metric and on ``clear``."""
        with self._lock:
            return self._rev

    def counter_totals(self) -> dict[str, float]:
        """Cumulative total per metric name, summed across label sets."""
        totals: dict[str, float] = {}
        with self._lock:
            for (name, _labels), value in self._counters.items():
                totals[name] = totals.get(name, 0.0) + value
        return totals

    def counters(self) -> dict[tuple, float]:
        """Raw cumulative counters keyed by (name, sorted-label-tuple); a copy."""
        with self._lock:
            return dict(self._counters)

    def clear(self) -> None:
        """Drop all logs, spans, metric points, and counters; bump the revision."""
        with self._lock:
            self._logs.clear()
            self._spans.clear()
            self._metrics.clear()
            self._counters.clear()
            self._rev += 1  # a clear is a change too — let the UI repaint to empty
def now_ts() -> float:
    """Return the current wall-clock time as float seconds since the epoch.

    A tiny wrapper so callers can stamp telemetry without importing ``time``.
    """
    seconds = time.time()
    return seconds