| """OpenTelemetry tracing wired to the in-memory store (and optional console). |
| |
| Builds a real :class:`~opentelemetry.sdk.trace.TracerProvider` so spans nest |
| automatically via OTEL context — the conductor opens ``run``/``turn``, the agent |
| opens ``agent.turn``, the provider opens ``llm.call``, and the parent/child links |
| fall out without anyone passing ids around. |
| |
| Finished spans are captured by :class:`_StoreSpanProcessor` into the |
| :class:`~src.observability.store.TelemetryStore` (the Telemetry tab's trace view). |
| When ``tracing`` includes ``console`` a standard ``ConsoleSpanExporter`` also |
| prints them. OTEL is imported lazily inside :func:`init_tracing` so importing the |
| package stays cheap. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import TYPE_CHECKING |
|
|
| from .config import ObservabilitySettings |
| from .store import SpanRecord, TelemetryStore |
|
|
| if TYPE_CHECKING: |
| from opentelemetry.trace import Tracer |
|
|
| _SERVICE_NAME = "multi-agent-land" |
|
|
|
|
| def _make_store_processor(store: TelemetryStore, text_limit: int): |
| from opentelemetry.sdk.trace import ReadableSpan |
| from opentelemetry.sdk.trace.export import SpanProcessor |
|
|
| def _coerce(value: object) -> object: |
| if isinstance(value, str) and len(value) > text_limit: |
| return value[:text_limit] + "…" |
| return value |
|
|
| class _StoreSpanProcessor(SpanProcessor): |
| def on_start(self, span, parent_context=None) -> None: |
| return None |
|
|
| def on_end(self, span: "ReadableSpan") -> None: |
| ctx = span.get_span_context() |
| parent = span.parent |
| start = span.start_time or 0 |
| end = span.end_time or start |
| store.add_span( |
| SpanRecord( |
| name=span.name, |
| trace_id=format(ctx.trace_id, "032x"), |
| span_id=format(ctx.span_id, "016x"), |
| parent_id=format(parent.span_id, "016x") if parent else None, |
| start_ms=start / 1e6, |
| end_ms=end / 1e6, |
| duration_ms=(end - start) / 1e6, |
| status=span.status.status_code.name if span.status else "UNSET", |
| attributes={k: _coerce(v) for k, v in dict(span.attributes or {}).items()}, |
| ) |
| ) |
|
|
| def shutdown(self) -> None: |
| return None |
|
|
| def force_flush(self, timeout_millis: int = 30000) -> bool: |
| return True |
|
|
| return _StoreSpanProcessor() |
|
|
|
|
| def init_tracing(settings: ObservabilitySettings, store: TelemetryStore) -> "Tracer | None": |
| """Set up the global tracer provider per ``settings.tracing``; return a tracer. |
| |
| ``off`` leaves the API's default no-op provider in place (zero overhead). |
| """ |
| if settings.tracing == "off": |
| from opentelemetry import trace |
|
|
| return trace.get_tracer(_SERVICE_NAME) |
|
|
| from opentelemetry import trace |
| from opentelemetry.sdk.resources import Resource |
| from opentelemetry.sdk.trace import TracerProvider |
| from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor |
|
|
| |
| |
| existing = trace.get_tracer_provider() |
| if isinstance(existing, TracerProvider): |
| provider = existing |
| else: |
| provider = TracerProvider(resource=Resource.create({"service.name": _SERVICE_NAME})) |
| trace.set_tracer_provider(provider) |
|
|
| if settings.tracing in ("memory", "both"): |
| provider.add_span_processor(_make_store_processor(store, settings.store_text_limit)) |
| if settings.tracing in ("console", "both"): |
| provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) |
|
|
| return trace.get_tracer(_SERVICE_NAME) |
|
|