doc-ingestion / src /core /observability.py
vampokala's picture
Unit test errors resolved
ae86c40
"""
Observability layer for RAG pipeline instrumentation.
Provides decorators and context managers for LangFuse tracing.
"""
import logging
import os
import time
from contextlib import contextmanager
from typing import Any, Dict, Optional
try:
from langfuse import Langfuse as _Langfuse
except ImportError:
_Langfuse = None
logger = logging.getLogger(__name__)
class RAGObserver:
"""
Centralized observer for RAG pipeline.
Manages LangFuse client and provides tracing context managers.
Usage pattern — instrument inside RAGOrchestrator.run(), not in main.py:
with observer.trace_request("rag_query", query=query_text) as trace:
with observer.trace_step(trace, "retrieval") as span:
result = hybrid_retriever.retrieve(...)
span["output"] = {"chunks": len(result)}
"""
def __init__(
self,
enabled: bool = True,
public_key: Optional[str] = None,
secret_key: Optional[str] = None,
):
"""
Args:
enabled: If False, all tracing is no-op (demo mode, tests)
public_key: LangFuse public key (defaults to LANGFUSE_PUBLIC_KEY env var)
secret_key: LangFuse secret key (defaults to LANGFUSE_SECRET_KEY env var)
"""
self.enabled = enabled
self.client: Any | None = None
if self.enabled:
if _Langfuse is None:
logger.warning("langfuse package not installed; observability disabled")
self.enabled = False
return
try:
resolved_public_key = public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
resolved_secret_key = secret_key or os.getenv("LANGFUSE_SECRET_KEY")
if resolved_public_key and resolved_secret_key:
self.client = _Langfuse(
public_key=resolved_public_key,
secret_key=resolved_secret_key,
host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)
logger.info("LangFuse observability enabled")
else:
self.enabled = False
logger.warning("LangFuse keys not found; observability disabled")
except Exception as e:
logger.error(f"Failed to initialize LangFuse: {e}; observability disabled")
self.enabled = False
@contextmanager
def trace_request(
self,
name: str,
query: str = "",
metadata: Optional[Dict[str, Any]] = None,
):
"""
Context manager for a top-level request trace.
One trace per query request — child spans live inside this.
IMPORTANT: This is the top-level trace object. Use trace.span() for
individual pipeline steps. Never call client.trace() per step — that
creates disconnected traces in the LangFuse UI.
Usage:
with observer.trace_request("rag_query", query=query_text) as trace:
with observer.trace_step(trace, "retrieval") as span:
chunks = retriever.retrieve(query)
span["chunks_retrieved"] = len(chunks)
"""
if not self.enabled or not self.client:
yield None
return
trace = self.client.trace(
name=name,
input={"query": query},
metadata=metadata or {},
)
start = time.time()
try:
yield trace
except Exception as e:
trace.update(
output={"error": str(e)},
metadata={**(metadata or {}), "total_ms": (time.time() - start) * 1000},
)
raise
finally:
trace.update(
metadata={**(metadata or {}), "total_ms": round((time.time() - start) * 1000, 2)},
)
@contextmanager
def trace_step(
self,
trace,
step_name: str,
input_data: Optional[Dict[str, Any]] = None,
):
"""
Context manager for a child span within a request trace.
Attach to the trace returned by trace_request().
Args:
trace: The top-level trace object from trace_request()
step_name: Name of the pipeline step (e.g. "retrieval", "generation")
input_data: Optional input metadata for this step
"""
output: Dict[str, Any] = {}
start = time.time()
if not self.enabled or trace is None:
try:
yield output
finally:
output["latency_ms"] = round((time.time() - start) * 1000, 2)
return
span = trace.span(name=step_name, input=input_data or {})
try:
yield output
except Exception as e:
span.end(
output={"error": str(e)},
metadata={"latency_ms": round((time.time() - start) * 1000, 2)},
)
raise
finally:
output["latency_ms"] = round((time.time() - start) * 1000, 2)
span.end(output=output)
def flush_async(self) -> None:
"""
Flush pending traces to LangFuse in a background thread.
Call this after the HTTP response is sent — never block the hot path.
In FastAPI, use a BackgroundTask:
from fastapi import BackgroundTasks
background_tasks.add_task(observer.flush_async)
"""
if not self.client:
return
import threading
threading.Thread(target=self.client.flush, daemon=True).start()
def flush(self) -> None:
"""Synchronous flush — only use in shutdown/test contexts, not request handlers."""
if self.client:
self.client.flush()
# Global observer instance
_observer_instance: Optional[RAGObserver] = None
def get_observer() -> RAGObserver:
"""Singleton getter for RAGObserver."""
global _observer_instance
if _observer_instance is None:
enabled = os.getenv("DOC_PROFILE") != "demo"
_observer_instance = RAGObserver(enabled=enabled)
return _observer_instance
def init_observer(enabled: bool = True) -> RAGObserver:
"""Initialize the observer (useful for testing)."""
global _observer_instance
_observer_instance = RAGObserver(enabled=enabled)
return _observer_instance