File size: 4,932 Bytes
1ea0743
 
9918f43
1ea0743
9918f43
 
 
1ea0743
9918f43
 
 
1ea0743
9918f43
e28d52e
9918f43
1ea0743
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Observability: LangSmith passthrough + lightweight in-process metrics.

Three pieces:

1. **LangSmith tracing** — opt-in via the standard ``LANGCHAIN_TRACING_V2``
   environment variable. LangGraph picks it up automatically; this module
   only checks that ``LANGCHAIN_API_KEY`` is present, pins
   ``LANGCHAIN_PROJECT``, and logs a one-line confirmation at startup.

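2. **MetricsCollector** — lock-protected per-agent and per-tool counters
   (call count, total and last latency, error count). Its snapshot also
   folds in the parse counters from :func:`utils.get_parse_metrics`
   (native, fallback, and failure counts). The Gradio app renders these as
   a live system-health panel.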

3. **Span context manager** — ad-hoc timing inside agents, logged through
   the standard ``mealgraph`` logger so the timing line is filterable
   like everything else.
"""

from __future__ import annotations

import os
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from threading import Lock
from typing import Dict, Iterator, Optional

from logging_setup import get_logger
from utils import get_parse_metrics

_logger = get_logger("observability")


# ---------------------------------------------------------------------------
# LangSmith env passthrough
# ---------------------------------------------------------------------------
def init_langsmith(project: Optional[str] = None) -> bool:
    """Check the LangSmith environment and log whether tracing is active.

    LangChain/LangGraph configure tracing themselves from environment
    variables; this helper only validates them and logs a confirmation.

    Args:
        project: Project name to report and pin. When omitted (or empty),
            falls back to ``$LANGCHAIN_PROJECT`` and then ``"Nutrition-MAS"``.

    Returns:
        ``True`` when ``LANGCHAIN_TRACING_V2`` is one of "true"/"1"/"yes"
        (case-insensitive, surrounding whitespace ignored) AND
        ``LANGCHAIN_API_KEY`` is non-empty; ``False`` otherwise (a warning
        is logged when tracing is requested but the key is missing).

    Side effects:
        On success, writes the resolved project name into
        ``os.environ["LANGCHAIN_PROJECT"]``.
    """
    flag = os.environ.get("LANGCHAIN_TRACING_V2", "").strip().lower()
    if flag not in {"true", "1", "yes"}:
        return False
    api_key = os.environ.get("LANGCHAIN_API_KEY", "")
    proj = project or os.environ.get("LANGCHAIN_PROJECT", "Nutrition-MAS")
    if not api_key:
        _logger.warning("LANGCHAIN_TRACING_V2 set but no LANGCHAIN_API_KEY; skipping.")
        return False
    os.environ["LANGCHAIN_PROJECT"] = proj
    # Reveal a short suffix only when the key is long enough that the suffix
    # is a small fraction of the secret; short keys are fully masked.
    key_hint = api_key[-4:] if len(api_key) > 8 else "****"
    _logger.info("📈 LangSmith tracing enabled (project=%s, key=…%s)", proj, key_hint)
    return True


# ---------------------------------------------------------------------------
# In-process metrics
# ---------------------------------------------------------------------------
@dataclass
class AgentMetric:
    """Running totals for one timed label (an agent or a tool).

    Instances are mutated in place by the collector each time a call is
    recorded; every counter starts at zero.
    """

    # How many invocations have been recorded.
    calls: int = field(default=0)
    # Sum of every recorded duration, in seconds.
    total_seconds: float = field(default=0.0)
    # Duration of the most recently recorded invocation, in seconds.
    last_seconds: float = field(default=0.0)
    # How many recorded invocations were flagged as errors.
    errors: int = field(default=0)


@dataclass
class MetricsCollector:
    """Aggregate per-agent and per-tool timing counters.

    Every read and write of ``agents`` and ``tools`` happens under
    ``_lock``, so concurrent recorders do not lose increments. The module
    keeps one shared instance, handed out by ``get_metrics()``.
    """

    # label -> counters fed by record_agent()
    agents: Dict[str, AgentMetric] = field(default_factory=dict)
    # label -> counters fed by record_tool()
    tools: Dict[str, AgentMetric] = field(default_factory=dict)
    # Guards ``agents`` and ``tools``. Excluded from repr/eq: a lock carries
    # no state worth printing, and two collectors holding equal counters
    # should compare equal regardless of which lock object each owns.
    _lock: Lock = field(default_factory=Lock, repr=False, compare=False)

    def _record(
        self,
        bucket: Dict[str, AgentMetric],
        name: str,
        seconds: float,
        error: bool,
    ) -> None:
        """Fold one timed call into ``bucket[name]``, creating it if absent."""
        with self._lock:
            metric = bucket.setdefault(name, AgentMetric())
            metric.calls += 1
            metric.total_seconds += seconds
            metric.last_seconds = seconds
            if error:
                metric.errors += 1

    def record_agent(self, name: str, seconds: float, *, error: bool = False) -> None:
        """Record one agent call of ``seconds``; count it as an error if ``error``."""
        self._record(self.agents, name, seconds, error)

    def record_tool(self, name: str, seconds: float, *, error: bool = False) -> None:
        """Record one tool call of ``seconds``; count it as an error if ``error``."""
        self._record(self.tools, name, seconds, error)

    def snapshot(self) -> Dict[str, Dict[str, object]]:
        """Return a point-in-time copy of all counters, including parse metrics.

        Returns:
            ``{"agents": {label: {"calls", "total_seconds", "last_seconds",
            "errors"}}, "tools": {...same shape...}, "parsing": {"native",
            "fallback", "failure", "by_model"}}``. Per-label dicts are
            copies, so later recordings do not mutate a returned snapshot.
        """
        pm = get_parse_metrics()
        with self._lock:
            # NOTE(review): ``_lock`` does not protect ``pm``; its fields are
            # read without any synchronisation of their own. ``by_model`` is
            # only shallow-copied.
            return {
                "agents": {label: vars(m).copy() for label, m in self.agents.items()},
                "tools": {label: vars(m).copy() for label, m in self.tools.items()},
                "parsing": {
                    "native": pm.native_parses,
                    "fallback": pm.fallback_parses,
                    "failure": pm.schema_failures,
                    "by_model": dict(pm.by_model),
                },
            }

    def reset(self) -> None:
        """Drop all agent and tool counters (parse metrics are not touched)."""
        with self._lock:
            self.agents.clear()
            self.tools.clear()


# Shared collector for this module: ``span`` records into it and
# ``get_metrics`` hands it out, so every caller observes the same counters.
_collector = MetricsCollector()


def get_metrics() -> MetricsCollector:
    """Return the shared, module-level :class:`MetricsCollector` instance."""
    shared = _collector
    return shared


# ---------------------------------------------------------------------------
# Timing span
# ---------------------------------------------------------------------------
@contextmanager
def span(label: str, *, kind: str = "agent") -> Iterator[None]:
    """Measure how long the ``with`` body takes and report it.

    Args:
        label: Name the duration is recorded under (e.g. an agent or tool).
        kind: ``"agent"`` records into the collector's agent table and
            ``"tool"`` into its tool table; any other value (conventionally
            ``"misc"``) is only logged at debug level.

    The duration is recorded even when the body raises: the entry is then
    flagged as an error and the exception propagates unchanged. Only
    ``Exception`` subclasses count as errors; e.g. ``KeyboardInterrupt`` is
    still timed but recorded with ``error=False``.
    """
    failed = False
    t0 = time.perf_counter()
    try:
        yield
    except Exception:
        failed = True
        raise
    finally:
        elapsed = time.perf_counter() - t0
        if kind in ("agent", "tool"):
            record = _collector.record_agent if kind == "agent" else _collector.record_tool
            record(label, elapsed, error=failed)
        _logger.debug("⏱️  %s span %r took %.3fs (err=%s)", kind, label, elapsed, failed)


# Names exported by ``from <this module> import *``.
__all__ = ["AgentMetric", "MetricsCollector", "get_metrics", "init_langsmith", "span"]