Spaces:
Running
Running
| """ | |
| Observability layer for RAG pipeline instrumentation. | |
| Provides context managers for LangFuse tracing. | |
| """ | |
| import logging | |
| import os | |
| import time | |
| from contextlib import contextmanager | |
| from typing import Any, Dict, Optional | |
| try: | |
| from langfuse import Langfuse as _Langfuse | |
| except ImportError: | |
| _Langfuse = None | |
| logger = logging.getLogger(__name__) | |
class RAGObserver:
    """
    Centralized observer for the RAG pipeline.

    Owns the LangFuse client and exposes context managers that record one
    top-level trace per request and one child span per pipeline step. When
    tracing is disabled (explicitly, because the langfuse package or the API
    keys are missing, or because client construction failed) every method
    degrades to a no-op so callers never need to branch on availability.

    Usage pattern — instrument inside RAGOrchestrator.run(), not in main.py:

        with observer.trace_request("rag_query", query=query_text) as trace:
            with observer.trace_step(trace, "retrieval") as span:
                result = hybrid_retriever.retrieve(...)
                span["output"] = {"chunks": len(result)}
    """

    def __init__(
        self,
        enabled: bool = True,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
    ):
        """
        Create the observer and, when possible, the underlying LangFuse client.

        Args:
            enabled: If False, all tracing is no-op (demo mode, tests)
            public_key: LangFuse public key (defaults to LANGFUSE_PUBLIC_KEY env var)
            secret_key: LangFuse secret key (defaults to LANGFUSE_SECRET_KEY env var)

        After construction ``self.enabled`` is True only if a client was
        actually created; ``self.client`` is None otherwise.
        """
        self.enabled = enabled
        self.client: Optional[Any] = None

        if not self.enabled:
            return

        if _Langfuse is None:
            logger.warning("langfuse package not installed; observability disabled")
            self.enabled = False
            return

        resolved_public_key = public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
        resolved_secret_key = secret_key or os.getenv("LANGFUSE_SECRET_KEY")
        if not (resolved_public_key and resolved_secret_key):
            self.enabled = False
            logger.warning("LangFuse keys not found; observability disabled")
            return

        try:
            self.client = _Langfuse(
                public_key=resolved_public_key,
                secret_key=resolved_secret_key,
                host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
            )
        except Exception as e:
            # Deliberately broad: observability is best-effort and must never
            # prevent the application from starting.
            logger.error("Failed to initialize LangFuse: %s; observability disabled", e)
            self.enabled = False
        else:
            logger.info("LangFuse observability enabled")

    @contextmanager
    def trace_request(
        self,
        name: str,
        query: str = "",
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """
        Context manager for a top-level request trace.

        One trace per query request — child spans live inside this.

        IMPORTANT: This is the top-level trace object. Use trace.span() for
        individual pipeline steps. Never call client.trace() per step — that
        creates disconnected traces in the LangFuse UI.

        Args:
            name: Trace name passed to the client.
            query: Query text, recorded as ``{"query": query}`` trace input.
            metadata: Optional metadata; ``total_ms`` is added on exit.

        Yields:
            The trace object, or None when tracing is disabled.

        Any exception raised inside the ``with`` body is recorded as
        ``{"error": str(e)}`` trace output and then re-raised.

        Usage:
            with observer.trace_request("rag_query", query=query_text) as trace:
                with observer.trace_step(trace, "retrieval") as span:
                    chunks = retriever.retrieve(query)
                    span["chunks_retrieved"] = len(chunks)
        """
        if not self.enabled or not self.client:
            yield None
            return

        trace = self.client.trace(
            name=name,
            input={"query": query},
            metadata=metadata or {},
        )
        start = time.time()
        error_output: Optional[Dict[str, Any]] = None
        try:
            yield trace
        except Exception as e:
            error_output = {"error": str(e)}
            raise
        finally:
            # Exactly one update per request, on success and on failure alike.
            update_kwargs: Dict[str, Any] = {
                "metadata": {
                    **(metadata or {}),
                    "total_ms": round((time.time() - start) * 1000, 2),
                },
            }
            if error_output is not None:
                update_kwargs["output"] = error_output
            trace.update(**update_kwargs)

    @contextmanager
    def trace_step(
        self,
        trace,
        step_name: str,
        input_data: Optional[Dict[str, Any]] = None,
    ):
        """
        Context manager for a child span within a request trace.

        Attach to the trace returned by trace_request().

        Args:
            trace: The top-level trace object from trace_request()
            step_name: Name of the pipeline step (e.g. "retrieval", "generation")
            input_data: Optional input metadata for this step

        Yields:
            A mutable dict the caller fills with step output. ``latency_ms``
            is added to it on exit, whether or not tracing is enabled.

        Any exception raised inside the ``with`` body ends the span with
        ``{"error": str(e)}`` as output and is then re-raised.
        """
        output: Dict[str, Any] = {}
        start = time.time()

        if not self.enabled or trace is None:
            # No-op mode still measures latency so callers can rely on the key.
            try:
                yield output
            finally:
                output["latency_ms"] = round((time.time() - start) * 1000, 2)
            return

        span = trace.span(name=step_name, input=input_data or {})
        error: Optional[Exception] = None
        try:
            yield output
        except Exception as e:
            error = e
            raise
        finally:
            latency_ms = round((time.time() - start) * 1000, 2)
            output["latency_ms"] = latency_ms
            # End the span exactly once; a second end() on the error path
            # would report the normal output after the error payload.
            if error is not None:
                span.end(
                    output={"error": str(error)},
                    metadata={"latency_ms": latency_ms},
                )
            else:
                span.end(output=output)

    def flush_async(self) -> None:
        """
        Flush pending traces to LangFuse in a background thread.

        Call this after the HTTP response is sent — never block the hot path.
        Does nothing when no client was created.

        In FastAPI, use a BackgroundTask:
            from fastapi import BackgroundTasks
            background_tasks.add_task(observer.flush_async)
        """
        if not self.client:
            return
        import threading

        # Daemon thread so a pending flush never keeps the process alive.
        threading.Thread(target=self.client.flush, daemon=True).start()

    def flush(self) -> None:
        """Synchronous flush — only use in shutdown/test contexts, not request handlers."""
        if self.client:
            self.client.flush()
# Process-wide observer instance; stays None until first requested or
# explicitly initialized.
_observer_instance: Optional[RAGObserver] = None


def get_observer() -> RAGObserver:
    """
    Return the shared RAGObserver, creating it on first use.

    On first call the observer is built with tracing enabled unless the
    DOC_PROFILE environment variable equals "demo". Later calls return the
    same instance without re-reading the environment.
    """
    global _observer_instance
    if _observer_instance is not None:
        return _observer_instance

    demo_mode = os.getenv("DOC_PROFILE") == "demo"
    _observer_instance = RAGObserver(enabled=not demo_mode)
    return _observer_instance
def init_observer(enabled: bool = True) -> RAGObserver:
    """
    Build a fresh RAGObserver and install it as the shared instance.

    Any previously stored instance is replaced (useful for testing).

    Args:
        enabled: Forwarded to RAGObserver; False makes all tracing a no-op.

    Returns:
        The newly created observer.
    """
    global _observer_instance
    fresh_observer = RAGObserver(enabled=enabled)
    _observer_instance = fresh_observer
    return fresh_observer