agharsallah
feat: Enhance telemetry store with revision tracking and optimize UI refresh logic
e1959d5
Raw
History Blame Contribute Delete
5.08 kB
"""In-memory telemetry store — the self-contained backend for the Gradio panel.
Logs, spans, and metric points are kept in bounded ring buffers (oldest dropped
once full) plus a cumulative counter table. This is deliberately *not* an
external collector: the whole monitoring story lives in-process so the live demo
shows logs, traces, and charts with nothing to deploy. The Telemetry tab reads
straight off the accessors here.
Thread-safe: Gradio serves requests on a pool, and OTEL ends spans from whatever
thread ran them, so every mutation takes the lock.
"""
from __future__ import annotations
import threading
import time
from collections import deque
from dataclasses import dataclass, field
@dataclass
class SpanRecord:
    """A finished span, flattened for display and JSON-friendliness."""

    # Human-readable span name.
    name: str
    # Identifiers kept as plain strings; ``parent_id`` is ``None`` when the
    # span has no parent.
    trace_id: str
    span_id: str
    parent_id: str | None
    # Timing fields; presumably milliseconds, per the ``_ms`` suffix
    # (NOTE(review): confirm the epoch/unit against the producer).
    start_ms: float
    end_ms: float
    duration_ms: float
    # Span status, stored as a string.
    status: str
    # Free-form span attributes; each instance gets its own fresh dict.
    attributes: dict = field(default_factory=dict)
@dataclass
class MetricPoint:
    """A single recorded metric sample."""

    # Metric name; used to filter points and to group cumulative counters.
    name: str
    # Sample value; for counters this is the increment added to the total.
    value: float
    # Timestamp of the sample (presumably wall-clock seconds, as returned by
    # ``now_ts`` -- NOTE(review): confirm against callers).
    ts: float
    # Label set for the sample; each instance gets its own fresh dict.
    labels: dict = field(default_factory=dict)
class TelemetryStore:
    """Bounded, thread-safe buffers of recent logs / spans / metrics.

    Logs, spans and metric points live in ``deque`` ring buffers created with
    ``maxlen``, so the oldest entries are dropped once a buffer is full.
    Cumulative counters are kept in a separate dict that is only reset by
    ``clear``. Every method takes the same lock before touching state.
    """

    def __init__(self, capacity: int = 4000) -> None:
        """Create an empty store.

        Args:
            capacity: Maximum number of logs and of spans retained; the
                metric buffer holds ``capacity * 4`` points.
        """
        self._lock = threading.Lock()
        self._logs: deque[dict] = deque(maxlen=capacity)
        self._spans: deque[SpanRecord] = deque(maxlen=capacity)
        self._metrics: deque[MetricPoint] = deque(maxlen=capacity * 4)
        # Cumulative counters keyed by (metric name, sorted label items).
        self._counters: dict[tuple, float] = {}
        # Monotonic ingest counter: a cheap "has anything changed since I last
        # looked?" signal so a reader can skip recomputing on an idle tick.
        self._rev = 0

    # ── ingest ──────────────────────────────────────────────────────────────
    def add_log(self, record: dict) -> None:
        """Append a log record and bump the revision."""
        with self._lock:
            self._logs.append(record)
            self._rev += 1

    def add_span(self, span: SpanRecord) -> None:
        """Append a finished span and bump the revision."""
        with self._lock:
            self._spans.append(span)
            self._rev += 1

    def add_metric(self, point: MetricPoint, *, counter: bool = False) -> None:
        """Append a metric point and bump the revision.

        Args:
            point: The sample to record.
            counter: When true, ``point.value`` is also added to the
                cumulative total for ``(point.name, labels)``. Label keys must
                be sortable and label values hashable, because the sorted
                label items form part of the dict key.
        """
        with self._lock:
            self._metrics.append(point)
            self._rev += 1
            if counter:
                key = (point.name, tuple(sorted(point.labels.items())))
                self._counters[key] = self._counters.get(key, 0.0) + point.value

    # ── read (for the UI) ───────────────────────────────────────────────────
    @staticmethod
    def _tail(buf: deque, n: int) -> list:
        """Return the last ``n`` items of ``buf`` in order; ``[]`` if ``n <= 0``."""
        # Guard explicitly: ``seq[-0:]`` is the *whole* sequence, and a
        # negative ``n`` would slice from the front instead of limiting.
        if n <= 0:
            return []
        return list(buf)[-n:]

    def recent_logs(self, n: int = 200) -> list[dict]:
        """Return up to the ``n`` most recent log records, oldest first."""
        with self._lock:
            return self._tail(self._logs, n)

    def recent_spans(self, n: int = 200) -> list[SpanRecord]:
        """Return up to the ``n`` most recent spans, oldest first."""
        with self._lock:
            return self._tail(self._spans, n)

    def metric_points(self, name: str | None = None, limit: int | None = None) -> list[MetricPoint]:
        """Return recorded points, optionally filtered by ``name``.

        Args:
            name: Only points with this metric name are returned; ``None``
                matches every point.
            limit: When set, only the most recent ``limit`` matching points
                are returned (still in chronological order), which bounds both
                the scan and the size of the result. ``None`` means no limit;
                a value of zero or less yields an empty list.

        Returns:
            A new list of matching points, oldest first.
        """
        with self._lock:
            if limit is None:
                return [p for p in self._metrics if name is None or p.name == name]
            if limit <= 0:
                # Without this guard the loop below would still collect one
                # point before noticing the limit was already reached.
                return []
            out: list[MetricPoint] = []
            for p in reversed(self._metrics):  # newest-first, stop at `limit`
                if name is None or p.name == name:
                    out.append(p)
                    if len(out) >= limit:
                        break
            out.reverse()  # back to chronological order
            return out

    def revision(self) -> int:
        """Return the monotonic ingest counter.

        It is bumped on every log/span/metric and on ``clear``.
        """
        with self._lock:
            return self._rev

    def counter_totals(self) -> dict[str, float]:
        """Return the cumulative total per metric name, summed across label sets."""
        totals: dict[str, float] = {}
        with self._lock:
            for (name, _labels), value in self._counters.items():
                totals[name] = totals.get(name, 0.0) + value
        return totals

    def counters(self) -> dict[tuple, float]:
        """Return a copy of the raw counters keyed by (name, sorted-label-tuple)."""
        with self._lock:
            return dict(self._counters)

    def clear(self) -> None:
        """Drop all logs, spans, metric points and counters."""
        with self._lock:
            self._logs.clear()
            self._spans.clear()
            self._metrics.clear()
            self._counters.clear()
            # A clear is a change too: bump so readers notice and repaint empty.
            self._rev += 1
def now_ts() -> float:
    """Return the current wall-clock time in seconds.

    Isolated here so callers don't need to import ``time`` themselves.
    """
    return time.time()