File size: 7,896 Bytes
3f7b296
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""Observability facade β€” the one import every layer uses.

Usage at any call site::

    from src import observability as obs

    log = obs.get_logger(__name__)

    with obs.span("llm.call", **{"gen_ai.request.model": model}):
        ...
        obs.add_span_attrs(**{"gen_ai.usage.output_tokens": n})
        obs.record_llm_call(model, prompt_tokens=p, completion_tokens=n, cost_usd=c)

    obs.log("event.append", kind=event.kind, actor=event.actor)

This module owns the singletons (settings, in-memory store, tracer) and keeps the
public surface tiny and stable so instrumentation across the codebase never
touches the OpenTelemetry SDK directly. Structured logging + tracing + in-process
metrics all flow through here; the Gradio Telemetry panel reads from
:func:`telemetry_store`. See ADR-0024.
"""

from __future__ import annotations

import logging
from contextlib import contextmanager
from dataclasses import replace
from typing import Iterator

from .config import ObservabilitySettings
from .context import bind, current_context, set_context
from .store import MetricPoint, SpanRecord, TelemetryStore, now_ts

# Public surface of the facade: the only names star-imports expose. The list is
# grouped by concern; the re-exported names come from .context, .config and
# .store (see the imports above).
__all__ = [
    # lifecycle
    "configure",
    # logging
    "get_logger",
    "log",
    # tracing
    "span",
    "add_span_attrs",
    # in-process metrics
    "incr",
    "observe",
    "record_llm_call",
    "record_agent_turn",
    "record_governor_trip",
    # run/turn context helpers (re-exported from .context)
    "bind",
    "set_context",
    "current_context",
    # singleton accessors
    "telemetry_store",
    "settings",
    # re-exported types (from .config and .store)
    "ObservabilitySettings",
    "SpanRecord",
    "MetricPoint",
]

_configured = False
_settings: ObservabilitySettings | None = None
_store: TelemetryStore | None = None
_tracer = None

# Logging extras may not clobber these built-in LogRecord attributes.
_LOG_RESERVED = frozenset(
    {
        "name",
        "msg",
        "args",
        "levelname",
        "levelno",
        "pathname",
        "filename",
        "module",
        "exc_info",
        "exc_text",
        "stack_info",
        "lineno",
        "funcName",
        "created",
        "msecs",
        "relativeCreated",
        "thread",
        "threadName",
        "processName",
        "process",
        "taskName",
        "message",
    }
)


def configure(
    level: str | None = None,
    fmt: str | None = None,
    tracing: str | None = None,
    *,
    force: bool = False,
) -> None:
    """Set up logging and tracing for this process; repeat calls are no-ops.

    Settings start from the ``MAL_*`` environment variables
    (``ObservabilitySettings.from_env``). Any explicitly passed argument wins
    over its env value: ``level`` is upper-cased, while ``fmt`` and ``tracing``
    are lower-cased. Safe to call from any entrypoint (app, Modal, a smoke
    script). Pass ``force=True`` to rebuild everything, which also replaces the
    module's :class:`TelemetryStore` with a new one.
    """
    global _configured, _settings, _store, _tracer
    if _configured and not force:
        return

    resolved = ObservabilitySettings.from_env()

    requested = {
        "level": None if level is None else level.upper(),
        "fmt": None if fmt is None else fmt.lower(),
        "tracing": None if tracing is None else tracing.lower(),
    }
    overrides = {field: value for field, value in requested.items() if value is not None}
    if overrides:
        resolved = replace(resolved, **overrides)

    _settings = resolved
    _store = TelemetryStore(capacity=resolved.store_capacity)

    # The setup modules are imported only when configuration actually runs
    # (presumably to keep module import cheap / avoid cycles — confirm).
    from .logging_setup import setup_logging

    setup_logging(resolved, _store)

    from .tracing import init_tracing

    _tracer = init_tracing(resolved, _store)

    # Flip the flag last so a failure above leaves the facade unconfigured.
    _configured = True


def _ensure() -> None:
    """Run :func:`configure` with defaults unless it has already succeeded."""
    if _configured:
        return
    configure()


def settings() -> ObservabilitySettings:
    """Return the active settings, configuring with defaults on first use."""
    _ensure()
    current = _settings
    assert current is not None
    return current


def telemetry_store() -> TelemetryStore:
    """Return the in-memory store that feeds the Gradio Telemetry panel.

    Configures the facade with defaults first if nothing has done so yet.
    """
    _ensure()
    store = _store
    assert store is not None
    return store


# ── logging ─────────────────────────────────────────────────────────────────


def get_logger(name: str) -> logging.Logger:
    """Return the stdlib logger called *name*, configuring logging first if needed."""
    _ensure()
    named_logger = logging.getLogger(name)
    return named_logger


def log(
    event: str,
    level: str | int = "info",
    *,
    logger: str = "mal",
    msg: str | None = None,
    **fields,
) -> None:
    """Emit one structured record: an ``event`` name plus arbitrary ``fields``.

    Args:
        event: Event name; stored on the record as ``event`` and used as the
            message text when *msg* is not given.
        level: A level name (case-insensitive, e.g. ``"info"``, ``"WARNING"``)
            or a numeric level such as ``logging.DEBUG``. Unknown names fall
            back to ``INFO``.
        logger: Name of the stdlib logger to emit on.
        msg: Optional human-readable message; defaults to *event*.
        **fields: Extra structured fields attached to the record.

    Field names that collide with reserved ``LogRecord`` attributes are
    suffixed with ``_`` (more underscores if that name is also taken), so
    logging never raises and no field silently overwrites another. This
    function does not add run/turn/agent context itself; the configured
    logging setup is expected to attach it (NOTE(review): confirm in
    logging_setup).
    """
    _ensure()
    lg = logging.getLogger(logger)
    if isinstance(level, int):
        lvl = level
    else:
        # getLevelName maps a registered name to its int, otherwise returns a
        # "Level X" string, which we treat as unknown.
        resolved = logging.getLevelName(level.upper())
        lvl = resolved if isinstance(resolved, int) else logging.INFO
    if not lg.isEnabledFor(lvl):
        return
    extra: dict[str, object] = {"event": event}
    for key, value in fields.items():
        safe = key
        if key in _LOG_RESERVED:
            safe = f"{key}_"
            # Avoid clobbering a caller field that already uses the suffixed name.
            while safe in fields or safe in extra:
                safe += "_"
        extra[safe] = value
    lg.log(lvl, msg if msg is not None else event, extra=extra)


# ── tracing ─────────────────────────────────────────────────────────────────


def _attr_value(value: object) -> object:
    if isinstance(value, (str, bool, int, float)):
        return value
    if isinstance(value, (list, tuple)):
        return [str(v) for v in value]
    import json

    try:
        return json.dumps(value, default=str)
    except (TypeError, ValueError):
        return str(value)


@contextmanager
def span(name: str, **attributes) -> Iterator[object]:
    """Open a span named *name* with *attributes*; nesting is automatic.

    Yields the span, or ``None`` if tracing is off. Attributes whose value is
    ``None`` are skipped; the rest are coerced with :func:`_attr_value`. If the
    body raises, the exception is recorded on the span once, the status is set
    to ERROR (description ``"<Type>: <message>"``), and the exception is
    re-raised.
    """
    _ensure()
    if _tracer is None:
        yield None
        return
    # start_as_current_span records exceptions and sets ERROR status by default;
    # turn that off so the handler below is the single place this happens
    # (otherwise every failure shows up as two exception events).
    # Assumes init_tracing returns an OpenTelemetry Tracer — confirm.
    with _tracer.start_as_current_span(
        name, record_exception=False, set_status_on_exception=False
    ) as sp:
        for key, value in attributes.items():
            if value is not None:
                sp.set_attribute(key, _attr_value(value))
        try:
            yield sp
        except Exception as exc:  # noqa: BLE001 - record then re-raise
            from opentelemetry.trace import Status, StatusCode

            sp.record_exception(exc)
            sp.set_status(Status(StatusCode.ERROR, f"{type(exc).__name__}: {exc}"))
            raise


def add_span_attrs(**attrs) -> None:
    """Set *attrs* on whichever span is currently active.

    Does nothing when tracing is off or the current span is not recording.
    ``None`` values are skipped; the rest are coerced with :func:`_attr_value`.
    """
    _ensure()
    if _tracer is None:
        return
    from opentelemetry import trace

    active = trace.get_current_span()
    recording = active is not None and active.is_recording()
    if not recording:
        return
    for attr_name, attr_value in attrs.items():
        if attr_value is None:
            continue
        active.set_attribute(attr_name, _attr_value(attr_value))


# ── metrics (in-process, feeding the UI charts) ──────────────────────────────


def incr(name: str, value: float = 1, **labels) -> None:
    """Add *value* to the cumulative counter *name* (e.g. ``llm.calls``).

    The point is timestamped now, carries *labels*, and is handed to the
    telemetry store with ``counter=True``.
    """
    _ensure()
    assert _store is not None
    point = MetricPoint(name=name, value=float(value), ts=now_ts(), labels=labels)
    _store.add_metric(point, counter=True)


def observe(name: str, value: float, **labels) -> None:
    """Record one histogram-style sample for *name* (e.g. ``agent.turn.seconds``).

    The point is timestamped now, carries *labels*, and is handed to the
    telemetry store with ``counter=False``.
    """
    _ensure()
    assert _store is not None
    sample = MetricPoint(name=name, value=float(value), ts=now_ts(), labels=labels)
    _store.add_metric(sample, counter=False)


def record_llm_call(
    model: str,
    prompt_tokens: int | None = 0,
    completion_tokens: int | None = 0,
    cost_usd: float | None = 0.0,
) -> None:
    """Record one LLM call's counters: call count, token totals, and spend.

    Every counter is labelled with *model*. A ``None`` token count or cost
    (e.g. when the caller has no usage data) counts as 0, so instrumentation
    never raises into the calling code path.
    """
    incr("llm.calls", 1, model=model)
    incr("llm.tokens.input", prompt_tokens or 0, model=model)
    incr("llm.tokens.output", completion_tokens or 0, model=model)
    incr("llm.cost_usd", cost_usd or 0.0, model=model)


def record_agent_turn(agent: str, seconds: float) -> None:
    """Record how many *seconds* one turn of *agent* took."""
    observe(name="agent.turn.seconds", value=seconds, agent=agent)


def record_governor_trip(reason: str) -> None:
    """Count one governor budget bound tripping, labelled with *reason*."""
    incr(name="governor.trips", value=1, reason=reason)