File size: 5,083 Bytes
3f7b296
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e1959d5
 
 
3f7b296
 
 
 
 
 
e1959d5
3f7b296
 
 
 
e1959d5
3f7b296
 
 
 
e1959d5
3f7b296
 
 
 
 
 
 
 
 
 
 
 
 
 
e1959d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f7b296
e1959d5
3f7b296
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e1959d5
3f7b296
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""In-memory telemetry store β€” the self-contained backend for the Gradio panel.

Logs, spans, and metric points are kept in bounded ring buffers (oldest dropped
once full) plus a cumulative counter table. This is deliberately *not* an
external collector: the whole monitoring story lives in-process so the live demo
shows logs, traces, and charts with nothing to deploy. The Telemetry tab reads
straight off the accessors here.

Thread-safe: Gradio serves requests on a pool, and OTEL ends spans from whatever
thread ran them, so every mutation takes the lock.
"""

from __future__ import annotations

import threading
import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SpanRecord:
    """Flat, display-ready snapshot of one completed span.

    Every field is a plain value so a record can be rendered or serialised
    without reaching back into the tracing objects it was built from.
    """

    # Identity of the span and its place in the trace.
    name: str
    trace_id: str
    span_id: str
    # The annotation allows None; presumably that marks a root span — confirm
    # against the code that builds these records.
    parent_id: str | None

    # Timing. The ``_ms`` suffix suggests milliseconds; nothing here enforces
    # that, nor that ``duration_ms == end_ms - start_ms``.
    start_ms: float
    end_ms: float
    duration_ms: float

    # Outcome label, stored as whatever string the producer passes in.
    status: str

    # Free-form span attributes; each record gets its own fresh dict by default.
    attributes: dict = field(default_factory=dict)


@dataclass
class MetricPoint:
    """A single recorded metric sample: a named value, a timestamp, and labels."""

    # Metric name; used to filter points and to group counter totals.
    name: str
    # The sampled value (for counters, the increment that gets accumulated).
    value: float
    # Timestamp supplied by the caller; presumably wall-clock seconds — confirm
    # against the code that creates points.
    ts: float
    # Optional label set; each point gets its own fresh dict by default.
    labels: dict = field(default_factory=dict)


class TelemetryStore:
    """Bounded, thread-safe buffers of recent logs / spans / metrics.

    Logs and spans are each held in a ``deque`` capped at ``capacity`` entries;
    metric points get ``capacity * 4``. Once a buffer is full the oldest entry
    is dropped on every append. Counter metrics are additionally accumulated in
    a plain dict that is *not* bounded. Every method takes the internal lock,
    so the store can be shared between threads.
    """

    def __init__(self, capacity: int = 4000) -> None:
        """Create an empty store.

        Args:
            capacity: Maximum number of logs and of spans retained; the metric
                buffer holds four times as many points.
        """
        self._lock = threading.Lock()
        self._logs: deque[dict] = deque(maxlen=capacity)
        self._spans: deque[SpanRecord] = deque(maxlen=capacity)
        self._metrics: deque[MetricPoint] = deque(maxlen=capacity * 4)
        self._counters: dict[tuple, float] = {}
        # Monotonic ingest counter — a cheap "has anything changed since I last looked?"
        # signal so the Telemetry tab can skip recomputing/repainting on an idle tick.
        self._rev = 0

    # ── ingest ──────────────────────────────────────────────────────────────

    def add_log(self, record: dict) -> None:
        """Append one log record and bump the revision."""
        with self._lock:
            self._logs.append(record)
            self._rev += 1

    def add_span(self, span: SpanRecord) -> None:
        """Append one finished span and bump the revision."""
        with self._lock:
            self._spans.append(span)
            self._rev += 1

    def add_metric(self, point: MetricPoint, *, counter: bool = False) -> None:
        """Append one metric point and bump the revision.

        Args:
            point: The sample to record.
            counter: When true, ``point.value`` is also added to the cumulative
                total kept for this ``(name, labels)`` combination.
        """
        with self._lock:
            self._metrics.append(point)
            self._rev += 1
            if counter:
                # Sort the label items so the key does not depend on dict order.
                key = (point.name, tuple(sorted(point.labels.items())))
                self._counters[key] = self._counters.get(key, 0.0) + point.value

    # ── read (for the UI) ───────────────────────────────────────────────────

    @staticmethod
    def _tail(buffer: deque, n: int) -> list:
        """Return the last ``n`` items of ``buffer`` as a list, oldest first.

        A non-positive ``n`` yields an empty list. The guard is required because
        ``seq[-0:]`` is ``seq[0:]`` — i.e. the *whole* sequence — and a negative
        ``n`` would turn the slice into "drop the first ``|n|`` items".
        """
        if n <= 0:
            return []
        return list(buffer)[-n:]

    def recent_logs(self, n: int = 200) -> list[dict]:
        """Return up to the ``n`` newest log records, oldest first.

        ``n <= 0`` returns an empty list.
        """
        with self._lock:
            return self._tail(self._logs, n)

    def recent_spans(self, n: int = 200) -> list[SpanRecord]:
        """Return up to the ``n`` newest spans, oldest first.

        ``n <= 0`` returns an empty list.
        """
        with self._lock:
            return self._tail(self._spans, n)

    def metric_points(self, name: str | None = None, limit: int | None = None) -> list[MetricPoint]:
        """Recorded points, optionally filtered by ``name``.

        With ``limit`` set, only the most recent ``limit`` matching points are returned
        (still chronological) — this bounds both the scan and the chart payload for
        high-frequency metrics like agent-turn latency, which would otherwise grow until
        the buffer is full. A ``limit`` of zero or less returns an empty list.

        Args:
            name: Only return points with this metric name; ``None`` matches all.
            limit: Maximum number of points to return; ``None`` means no cap.
        """
        with self._lock:
            if limit is None:
                return [p for p in self._metrics if name is None or p.name == name]
            if limit <= 0:
                # Without this guard the loop below would append one point
                # before it ever compared against the limit.
                return []
            newest_first: list[MetricPoint] = []
            for p in reversed(self._metrics):  # newest-first, stop once we have `limit`
                if name is None or p.name == name:
                    newest_first.append(p)
                    if len(newest_first) >= limit:
                        break
            newest_first.reverse()  # back to chronological order
            return newest_first

    def revision(self) -> int:
        """Monotonic ingest counter — bumped on every log/span/metric and on ``clear``."""
        with self._lock:
            return self._rev

    def counter_totals(self) -> dict[str, float]:
        """Cumulative total per metric name, summed across label sets."""
        totals: dict[str, float] = {}
        with self._lock:
            for (name, _labels), value in self._counters.items():
                totals[name] = totals.get(name, 0.0) + value
        return totals

    def counters(self) -> dict[tuple, float]:
        """Raw cumulative counters keyed by (name, sorted-label-tuple).

        Returns a shallow copy, so callers can iterate it without the lock.
        """
        with self._lock:
            return dict(self._counters)

    def clear(self) -> None:
        """Drop all logs, spans, metric points and counter totals."""
        with self._lock:
            self._logs.clear()
            self._spans.clear()
            self._metrics.clear()
            self._counters.clear()
            self._rev += 1  # a clear is a change too — let the UI repaint to empty


def now_ts() -> float:
    """Return the current wall-clock time in seconds, as given by ``time.time()``.

    Kept as a thin wrapper so callers can take timestamps without importing
    ``time`` themselves.
    """
    current = time.time()
    return current