mealgraph / observability.py
moazeldegwy's picture
Rename module to mealgraph; add CC BY-NC 4.0 LICENSE; slim Space metadata
e28d52e
"""Observability: LangSmith passthrough + lightweight in-process metrics.
Three pieces:
1. **LangSmith tracing** — opt-in via the standard ``LANGCHAIN_TRACING_V2``
environment variable. LangGraph picks it up automatically; this module
only surfaces a one-line confirmation at startup.
2. **MetricsCollector** — wraps :class:`utils.ParseMetrics` and adds
per-agent latency, call count, and fallback-rate counters. The Gradio
app renders these as a live system-health panel.
3. **Span context manager** — ad-hoc timing inside agents, logged through
the standard ``mealgraph`` logger so the timing line is filterable
like everything else.
"""
from __future__ import annotations
import os
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from threading import Lock
from typing import Dict, Iterator, Optional
from logging_setup import get_logger
from utils import get_parse_metrics
_logger = get_logger("observability")
# ---------------------------------------------------------------------------
# LangSmith env passthrough
# ---------------------------------------------------------------------------
def init_langsmith(project: Optional[str] = None) -> bool:
    """Check the LangSmith environment and log whether tracing is active.

    Tracing is considered requested when ``LANGCHAIN_TRACING_V2`` equals
    ``true``, ``1`` or ``yes`` (case-insensitive). It is only reported as
    enabled when ``LANGCHAIN_API_KEY`` is also non-empty.

    Args:
        project: Project name to use. When falsy, the current value of
            ``LANGCHAIN_PROJECT`` is used, falling back to ``Nutrition-MAS``.

    Returns:
        ``True`` when tracing is requested and an API key is present; in that
        case ``LANGCHAIN_PROJECT`` is (re)written with the resolved project
        name. ``False`` when tracing is not requested or the key is missing.
    """
    tracing_flag = os.environ.get("LANGCHAIN_TRACING_V2", "").lower()
    if tracing_flag not in {"true", "1", "yes"}:
        return False

    resolved_project = project or os.environ.get("LANGCHAIN_PROJECT", "Nutrition-MAS")
    key = os.environ.get("LANGCHAIN_API_KEY", "")

    enabled = bool(key)
    if enabled:
        os.environ["LANGCHAIN_PROJECT"] = resolved_project
        # Only the trailing characters of the key are written to the log.
        _logger.info(
            "📈 LangSmith tracing enabled (project=%s, key=…%s)",
            resolved_project,
            key[-4:],
        )
    else:
        _logger.warning("LANGCHAIN_TRACING_V2 set but no LANGCHAIN_API_KEY; skipping.")
    return enabled
# ---------------------------------------------------------------------------
# In-process metrics
# ---------------------------------------------------------------------------
@dataclass
class AgentMetric:
    """Running counters for one agent or tool name."""

    calls: int = 0  # number of recorded invocations
    total_seconds: float = 0.0  # sum of every recorded duration
    last_seconds: float = 0.0  # duration of the most recent invocation
    errors: int = 0  # invocations recorded with ``error=True``


@dataclass
class MetricsCollector:
    """Aggregate per-agent + per-tool counters. Process-singleton.

    Every public method takes the internal lock while it reads or mutates
    the ``agents`` / ``tools`` tables.
    """

    agents: Dict[str, AgentMetric] = field(default_factory=dict)
    tools: Dict[str, AgentMetric] = field(default_factory=dict)
    # The lock is plumbing, not data: keep it out of repr() and == so that
    # collectors compare by their counters and print cleanly.
    _lock: Lock = field(default_factory=Lock, repr=False, compare=False)

    def _record(
        self,
        table: Dict[str, AgentMetric],
        name: str,
        seconds: float,
        error: bool,
    ) -> None:
        """Fold one timed invocation of *name* into *table* under the lock."""
        with self._lock:
            metric = table.setdefault(name, AgentMetric())
            metric.calls += 1
            metric.total_seconds += seconds
            metric.last_seconds = seconds
            if error:
                metric.errors += 1

    def record_agent(self, name: str, seconds: float, *, error: bool = False) -> None:
        """Record one agent invocation lasting *seconds*.

        Args:
            name: Agent identifier; a new entry is created on first use.
            seconds: Duration of the invocation.
            error: Whether the invocation should count as an error.
        """
        self._record(self.agents, name, seconds, error)

    def record_tool(self, name: str, seconds: float, *, error: bool = False) -> None:
        """Record one tool invocation lasting *seconds*.

        Args:
            name: Tool identifier; a new entry is created on first use.
            seconds: Duration of the invocation.
            error: Whether the invocation should count as an error.
        """
        self._record(self.tools, name, seconds, error)

    def snapshot(self) -> Dict[str, Dict[str, dict]]:
        """Return a JSON-serialisable snapshot including parse metrics.

        The result has three keys: ``"agents"`` and ``"tools"`` map each
        recorded name to a copy of its counters, and ``"parsing"`` carries
        the counters read from ``get_parse_metrics()``. Every recorded name
        is included; no names are filtered out.
        """
        pm = get_parse_metrics()
        with self._lock:
            return {
                # Copy each counter dict so callers cannot mutate live state.
                "agents": {name: vars(m).copy() for name, m in self.agents.items()},
                "tools": {name: vars(m).copy() for name, m in self.tools.items()},
                "parsing": {
                    "native": pm.native_parses,
                    "fallback": pm.fallback_parses,
                    "failure": pm.schema_failures,
                    "by_model": dict(pm.by_model),
                },
            }

    def reset(self) -> None:
        """Drop all agent and tool counters (parse metrics are not touched)."""
        with self._lock:
            self.agents.clear()
            self.tools.clear()
# Single module-level collector; ``span`` records into this same instance.
_collector = MetricsCollector()


def get_metrics() -> MetricsCollector:
    """Return the module-level :class:`MetricsCollector` instance."""
    return _collector
# ---------------------------------------------------------------------------
# Timing span
# ---------------------------------------------------------------------------
@contextmanager
def span(label: str, *, kind: str = "agent") -> Iterator[None]:
    """Measure the wrapped block, feed the collector, and log the duration.

    ``kind`` selects where the timing goes: ``'agent'`` and ``'tool'`` are
    recorded in the module-level collector under ``label``; any other value
    (e.g. ``'misc'``) is only logged. An ``Exception`` raised inside the
    block marks the span as an error and is re-raised unchanged.
    """
    began = time.perf_counter()
    failed = False
    try:
        yield
    except Exception:
        failed = True
        raise
    finally:
        elapsed = time.perf_counter() - began

        # Choose the sink first, then record once.
        if kind == "agent":
            recorder = _collector.record_agent
        elif kind == "tool":
            recorder = _collector.record_tool
        else:
            recorder = None

        if recorder is not None:
            recorder(label, elapsed, error=failed)

        _logger.debug("⏱️ %s span %r took %.3fs (err=%s)", kind, label, elapsed, failed)
# Public names of this module; this list controls what a star-import exposes.
__all__ = [
"AgentMetric",
"MetricsCollector",
"get_metrics",
"init_langsmith",
"span",
]