"""Observability: LangSmith passthrough + lightweight in-process metrics.
Three pieces:
1. **LangSmith tracing** — opt-in via the standard ``LANGCHAIN_TRACING_V2``
environment variable. LangGraph picks it up automatically; this module
only surfaces a one-line confirmation at startup.
2. **MetricsCollector** — wraps :class:`utils.ParseMetrics` and adds
per-agent latency, call count, and fallback-rate counters. The Gradio
app renders these as a live system-health panel.
3. **Span context manager** — ad-hoc timing inside agents, logged through
the standard ``mealgraph`` logger so the timing line is filterable
like everything else.
"""
from __future__ import annotations
import os
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from threading import Lock
from typing import Dict, Iterator, Optional
from logging_setup import get_logger
from utils import get_parse_metrics
_logger = get_logger("observability")
# ---------------------------------------------------------------------------
# LangSmith env passthrough
# ---------------------------------------------------------------------------
def init_langsmith(project: Optional[str] = None) -> bool:
    """Report whether LangSmith tracing is switched on via the environment.

    Nothing is configured here beyond ``LANGCHAIN_PROJECT``; the function
    only inspects the environment and logs the outcome.

    Args:
        project: Project name to use. When omitted, the current
            ``LANGCHAIN_PROJECT`` value is kept, falling back to
            ``"Nutrition-MAS"`` if that is unset.

    Returns:
        ``True`` when ``LANGCHAIN_TRACING_V2`` is truthy ("true", "1" or
        "yes", case-insensitive) *and* ``LANGCHAIN_API_KEY`` is non-empty;
        ``False`` otherwise.
    """
    flag = os.environ.get("LANGCHAIN_TRACING_V2", "").lower()
    if flag not in {"true", "1", "yes"}:
        return False

    api_key = os.environ.get("LANGCHAIN_API_KEY", "")
    if not api_key:
        _logger.warning("LANGCHAIN_TRACING_V2 set but no LANGCHAIN_API_KEY; skipping.")
        return False

    proj = project or os.environ.get("LANGCHAIN_PROJECT", "Nutrition-MAS")
    # Write the resolved name back so anything reading the environment later
    # sees the same project this function logged.
    os.environ["LANGCHAIN_PROJECT"] = proj

    # Only show a suffix when the key is long enough that four characters do
    # not reveal most (or all) of the secret in the log.
    key_hint = api_key[-4:] if len(api_key) > 8 else "****"
    _logger.info("📈 LangSmith tracing enabled (project=%s, key=…%s)", proj, key_hint)
    return True
# ---------------------------------------------------------------------------
# In-process metrics
# ---------------------------------------------------------------------------
@dataclass
class AgentMetric:
    """Running counters for one named agent or tool."""

    calls: int = 0  # number of recorded invocations
    total_seconds: float = 0.0  # sum of all recorded durations
    last_seconds: float = 0.0  # duration of the most recent invocation
    errors: int = 0  # invocations recorded with an error
@dataclass
class MetricsCollector:
    """Aggregate per-agent and per-tool counters. Process-singleton.

    Every public method holds ``_lock`` while it touches the counter
    dictionaries, so updates from concurrent callers do not interleave.

    Attributes:
        agents: Counters keyed by agent name.
        tools: Counters keyed by tool name.
    """

    agents: Dict[str, "AgentMetric"] = field(default_factory=dict)
    tools: Dict[str, "AgentMetric"] = field(default_factory=dict)
    # The lock is plumbing, not data: keep it out of ``repr`` and ``==`` so
    # two collectors holding the same counters compare equal.
    _lock: Lock = field(default_factory=Lock, repr=False, compare=False)

    def _record(
        self,
        bucket: Dict[str, "AgentMetric"],
        name: str,
        seconds: float,
        error: bool,
    ) -> None:
        """Add one observation for *name* to *bucket* under the lock."""
        with self._lock:
            metric = bucket.get(name)
            if metric is None:
                metric = bucket[name] = AgentMetric()
            metric.calls += 1
            metric.total_seconds += seconds
            metric.last_seconds = seconds
            if error:
                metric.errors += 1

    def record_agent(self, name: str, seconds: float, *, error: bool = False) -> None:
        """Record one agent call that took *seconds*; flag it if it failed."""
        self._record(self.agents, name, seconds, error)

    def record_tool(self, name: str, seconds: float, *, error: bool = False) -> None:
        """Record one tool call that took *seconds*; flag it if it failed."""
        self._record(self.tools, name, seconds, error)

    def snapshot(self) -> Dict[str, dict]:
        """Return a JSON-serialisable snapshot including parse metrics.

        The result has three keys: ``"agents"`` and ``"tools"`` map each
        name to a copy of its counters, and ``"parsing"`` carries the values
        read from ``get_parse_metrics()``.
        """
        pm = get_parse_metrics()
        with self._lock:
            # Copy each counter dict so callers cannot mutate live state.
            # Every recorded name is included; the keys here are agent/tool
            # names, never dataclass field names.
            agents = {name: dict(vars(metric)) for name, metric in self.agents.items()}
            tools = {name: dict(vars(metric)) for name, metric in self.tools.items()}
        return {
            "agents": agents,
            "tools": tools,
            "parsing": {
                "native": pm.native_parses,
                "fallback": pm.fallback_parses,
                "failure": pm.schema_failures,
                "by_model": dict(pm.by_model),
            },
        }

    def reset(self) -> None:
        """Drop all agent and tool counters."""
        with self._lock:
            self.agents.clear()
            self.tools.clear()
# The one collector instance shared by everything in this module.
_collector = MetricsCollector()


def get_metrics() -> MetricsCollector:
    """Return the module-level :class:`MetricsCollector` instance."""
    return _collector
# ---------------------------------------------------------------------------
# Timing span
# ---------------------------------------------------------------------------
@contextmanager
def span(label: str, *, kind: str = "agent") -> Iterator[None]:
    """Measure the wrapped block, feed the collector, and log the timing.

    ``kind`` selects where the measurement goes: 'agent' and 'tool' are
    recorded in the module-level collector, anything else (e.g. 'misc') is
    only written to the debug log. An ``Exception`` raised inside the block
    is flagged as an error and then propagates unchanged.
    """
    began = time.perf_counter()
    failed = False
    try:
        yield
    except Exception:
        failed = True
        raise
    finally:
        elapsed = time.perf_counter() - began
        recorders = {
            "agent": _collector.record_agent,
            "tool": _collector.record_tool,
        }
        record = recorders.get(kind)
        if record is not None:
            record(label, elapsed, error=failed)
        _logger.debug("⏱️ %s span %r took %.3fs (err=%s)", kind, label, elapsed, failed)
# Names exported by a star-import of this module.
__all__ = [
    "AgentMetric",
    "MetricsCollector",
    "get_metrics",
    "init_langsmith",
    "span",
]
|