Spaces:
Sleeping
Sleeping
| """ | |
| MediGuard AI — Langfuse Observability Tracer | |
| Wraps Langfuse v3 SDK for end-to-end tracing of the RAG pipeline. | |
| Silently no-ops when Langfuse is disabled or unreachable. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from contextlib import contextmanager | |
| from functools import lru_cache | |
| from typing import Any | |
| from src.settings import get_settings | |
| logger = logging.getLogger(__name__) | |
| try: | |
| from langfuse import Langfuse as _Langfuse | |
| except ImportError: | |
| _Langfuse = None # type: ignore[assignment,misc] | |
| class LangfuseTracer: | |
| """Thin wrapper around Langfuse for MediGuard pipeline tracing.""" | |
| def __init__(self, client: Any | None): | |
| self._client = client | |
| self._enabled = client is not None | |
| def enabled(self) -> bool: | |
| return self._enabled | |
| def trace(self, name: str, **kwargs: Any): | |
| """Create a new trace (top-level span).""" | |
| if not self._enabled: | |
| return _NullSpan() | |
| return self._client.trace(name=name, **kwargs) | |
| def span(self, trace, name: str, **kwargs): | |
| """Context manager for creating a span within a trace.""" | |
| if not self._enabled or trace is None: | |
| yield _NullSpan() | |
| return | |
| s = trace.span(name=name, **kwargs) | |
| try: | |
| yield s | |
| finally: | |
| s.end() | |
| def score(self, trace_id: str, name: str, value: float, comment: str = ""): | |
| """Attach a score to a trace (for evaluation feedback).""" | |
| if not self._enabled: | |
| return | |
| try: | |
| self._client.score(trace_id=trace_id, name=name, value=value, comment=comment) | |
| except Exception as exc: | |
| logger.warning("Langfuse score failed: %s", exc) | |
| def flush(self): | |
| if self._enabled: | |
| try: | |
| self._client.flush() | |
| except Exception as exc: | |
| logger.debug("Langfuse flush failed: %s", exc) | |
| class _NullSpan: | |
| """Dummy span object that silently swallows calls.""" | |
| def __getattr__(self, name: str): | |
| return lambda *a, **kw: _NullSpan() | |
| def end(self): | |
| pass | |
| def make_langfuse_tracer() -> LangfuseTracer: | |
| settings = get_settings() | |
| if not settings.langfuse.enabled or _Langfuse is None: | |
| logger.info("Langfuse tracing disabled") | |
| return LangfuseTracer(None) | |
| try: | |
| client = _Langfuse( | |
| public_key=settings.langfuse.public_key, | |
| secret_key=settings.langfuse.secret_key, | |
| host=settings.langfuse.host, | |
| ) | |
| logger.info("Langfuse connected (%s)", settings.langfuse.host) | |
| return LangfuseTracer(client) | |
| except Exception as exc: | |
| logger.warning("Langfuse unavailable: %s", exc) | |
| return LangfuseTracer(None) | |