File size: 2,835 Bytes
1e732dd
 
 
 
 
 
 
 
 
 
 
 
696f787
1e732dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
696f787
 
1e732dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
MediGuard AI — Langfuse Observability Tracer

Wraps Langfuse v3 SDK for end-to-end tracing of the RAG pipeline.
Silently no-ops when Langfuse is disabled or unreachable.
"""

from __future__ import annotations

import logging
from contextlib import contextmanager
from functools import lru_cache
from typing import Any

from src.settings import get_settings

logger = logging.getLogger(__name__)

# Langfuse is an optional dependency: when the package cannot be imported,
# `_Langfuse` is set to None, and make_langfuse_tracer() checks for that
# before trying to build a client.
try:
    from langfuse import Langfuse as _Langfuse
except ImportError:
    _Langfuse = None  # type: ignore[assignment,misc]


class LangfuseTracer:
    """Thin wrapper around Langfuse for MediGuard pipeline tracing.

    Every method is best-effort: when the tracer is disabled, or when the
    wrapped client raises, the call degrades to a no-op (or a ``_NullSpan``)
    instead of propagating the tracing failure into the caller.
    """

    def __init__(self, client: Any | None):
        """Wrap *client*; passing ``None`` produces a disabled tracer."""
        self._client = client
        self._enabled = client is not None

    @property
    def enabled(self) -> bool:
        """Whether a client was supplied at construction time."""
        return self._enabled

    def trace(self, name: str, **kwargs: Any):
        """Create a new trace (top-level span).

        Returns the object produced by ``client.trace(name=name, **kwargs)``,
        or a ``_NullSpan`` when the tracer is disabled or the client call
        fails.
        """
        if not self._enabled:
            return _NullSpan()
        # NOTE(review): the module docstring mentions the v3 SDK; confirm the
        # installed client still exposes ``trace()``. The attribute lookup is
        # inside the ``try`` so a missing method degrades to a null span too.
        try:
            return self._client.trace(name=name, **kwargs)
        except Exception as exc:
            logger.warning("Langfuse trace failed: %s", exc)
            return _NullSpan()

    @contextmanager
    def span(self, trace, name: str, **kwargs):
        """Context manager for creating a span within a trace.

        Yields the object returned by ``trace.span(name=name, **kwargs)`` and
        calls its ``end()`` on exit. Yields a ``_NullSpan`` when the tracer is
        disabled, *trace* is ``None``, or span creation fails. Exceptions
        raised inside the ``with`` body always propagate unchanged.
        """
        if not self._enabled or trace is None:
            yield _NullSpan()
            return
        try:
            s = trace.span(name=name, **kwargs)
        except Exception as exc:
            logger.warning("Langfuse span failed: %s", exc)
            s = _NullSpan()
        # Only the tracing calls are guarded; the ``yield`` is deliberately
        # outside any ``except`` so the caller's own errors are never eaten.
        try:
            yield s
        finally:
            try:
                s.end()
            except Exception as exc:
                # A failing end() must not mask an exception from the body.
                logger.debug("Langfuse span end failed: %s", exc)

    def score(self, trace_id: str, name: str, value: float, comment: str = ""):
        """Attach a score to a trace (for evaluation feedback).

        No-op when disabled; client errors are logged at WARNING and dropped.
        """
        if not self._enabled:
            return
        try:
            self._client.score(trace_id=trace_id, name=name, value=value, comment=comment)
        except Exception as exc:
            logger.warning("Langfuse score failed: %s", exc)

    def flush(self):
        """Ask the client to flush; errors are logged at DEBUG and dropped."""
        if not self._enabled:
            return
        try:
            self._client.flush()
        except Exception as exc:
            logger.debug("Langfuse flush failed: %s", exc)


class _NullSpan:
    """No-op stand-in used in place of a real trace/span object.

    Any attribute that is not defined on the class resolves to a callable
    that accepts arbitrary arguments and returns a fresh ``_NullSpan``, so
    chained calls keep working without doing anything.
    """

    def end(self):
        """Do nothing; present so ``end()`` can be called like on a span."""
        return None

    def __getattr__(self, name: str):
        # Reached only for names that normal attribute lookup did not find.
        def _absorb(*args, **kwargs):
            return _NullSpan()

        return _absorb


@lru_cache(maxsize=1)
def make_langfuse_tracer() -> LangfuseTracer:
    """Build the tracer described by the application settings.

    The result is memoised by ``lru_cache(maxsize=1)``, so later calls return
    the same ``LangfuseTracer`` instance. A disabled tracer (wrapping
    ``None``) is returned when tracing is switched off in settings, when the
    ``langfuse`` import failed, or when constructing the client raises.
    """
    settings = get_settings()
    sdk_missing = _Langfuse is None
    if sdk_missing or not settings.langfuse.enabled:
        logger.info("Langfuse tracing disabled")
        return LangfuseTracer(None)

    try:
        cfg = settings.langfuse
        client = _Langfuse(
            public_key=cfg.public_key,
            secret_key=cfg.secret_key,
            host=cfg.host,
        )
        logger.info("Langfuse connected (%s)", cfg.host)
        tracer = LangfuseTracer(client)
    except Exception as exc:
        # Client construction is best-effort; fall back to a disabled tracer.
        logger.warning("Langfuse unavailable: %s", exc)
        tracer = LangfuseTracer(None)
    return tracer