|
|
""" |
|
|
Observability module for Langfuse v3 integration with OpenTelemetry support. |
|
|
|
|
|
This module provides: |
|
|
- Single global CallbackHandler for LangChain integration |
|
|
- Root span management for user requests |
|
|
- Session and user tracking |
|
|
- Background flushing for async operations |
|
|
""" |
|
|
|
|
|
import os |
|
|
import base64 |
|
|
from typing import Optional, Dict, Any |
|
|
from contextlib import contextmanager |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
from langfuse import get_client |
|
|
from langfuse.langchain import CallbackHandler |
|
|
|
|
|
|
|
|
from opentelemetry import trace |
|
|
from opentelemetry.sdk.trace import TracerProvider |
|
|
from opentelemetry.sdk.trace.export import SimpleSpanProcessor |
|
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter |
|
|
|
|
|
|
|
|
# Pull configuration from the local env file into the process environment
# before anything below reads it.
load_dotenv(dotenv_path="env.local")

# Module-wide singletons; both stay None until initialize_observability()
# has populated them.
_langfuse_handler: Optional[CallbackHandler] = None
_tracer_provider: Optional[TracerProvider] = None
|
|
|
|
|
def initialize_observability() -> bool:
    """
    Initialize Langfuse observability with OTEL integration.

    Reads LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY and LANGFUSE_HOST from the
    environment, exports OTEL_EXPORTER_OTLP_ENDPOINT and
    OTEL_EXPORTER_OTLP_HEADERS derived from them, installs a TracerProvider
    with an OTLP span exporter and creates the global CallbackHandler.

    The function is safe to call more than once: a tracer provider or
    callback handler created by an earlier call is reused instead of being
    rebuilt, so a retry after a partial failure does not create a second
    provider.

    Returns:
        bool: True if initialization successful, False otherwise
    """
    global _langfuse_handler, _tracer_provider

    try:
        required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"]
        settings = {var: os.getenv(var) for var in required_vars}
        missing_vars = [var for var in required_vars if not settings[var]]

        if missing_vars:
            print(f"Warning: Missing required environment variables: {missing_vars}")
            return False

        # HTTP Basic credentials: base64("<public key>:<secret key>").
        langfuse_auth = base64.b64encode(
            f"{settings['LANGFUSE_PUBLIC_KEY']}:{settings['LANGFUSE_SECRET_KEY']}".encode()
        ).decode()

        # Drop trailing slashes so a host such as "https://host/" does not
        # produce "https://host//api/public/otel".
        host = settings["LANGFUSE_HOST"].rstrip("/")
        os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{host}/api/public/otel"
        os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"

        # Build and install the provider only once. A second provider would
        # not replace the one already registered with OpenTelemetry, and
        # shutdown_observability() would then act on the wrong object.
        if _tracer_provider is None:
            provider = TracerProvider()
            provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
            trace.set_tracer_provider(provider)
            _tracer_provider = provider

        if _langfuse_handler is None:
            _langfuse_handler = CallbackHandler()

        print("✅ Langfuse observability initialized successfully")
        return True

    except Exception as e:
        # Observability is best-effort: report the problem and let the
        # application continue without tracing.
        print(f"❌ Failed to initialize observability: {e}")
        return False
|
|
|
|
|
def get_callback_handler() -> Optional[CallbackHandler]:
    """
    Return the shared Langfuse callback handler, creating it on first use.

    If no handler exists yet, initialize_observability() is invoked once;
    when that reports failure, None is returned.

    Returns:
        CallbackHandler or None if not initialized
    """
    # Fast path: a handler already exists.
    if _langfuse_handler is not None:
        return _langfuse_handler

    # Lazy path: try to set everything up now.
    if not initialize_observability():
        return None

    return _langfuse_handler
|
|
|
|
|
@contextmanager
def start_root_span(
    name: str,
    user_id: str,
    session_id: str,
    metadata: Optional[Dict[str, Any]] = None,
):
    """
    Context manager for creating root spans with user and session tracking.

    Span creation is best-effort: if the span cannot be created or
    configured, a warning is printed and the body still runs with None.
    Exceptions raised inside the ``with`` body are never swallowed; they
    are passed to the span's own ``__exit__`` and then propagate unchanged.

    Args:
        name: Span name (e.g., "user-request")
        user_id: User identifier for session tracking
        session_id: Session identifier for conversation continuity
        metadata: Optional additional metadata

    Yields:
        Langfuse span context or None if creation fails
    """
    span_cm = None
    span_context = None
    entered = False

    # Setup lives in its own try block. Keeping the ``yield`` out of it
    # ensures that an error raised by the caller's body is not mistaken for
    # a span-creation failure (which previously led to a second ``yield``
    # and a RuntimeError from contextlib that hid the real exception).
    try:
        span_cm = get_client().start_as_current_span(name=name)
        span_context = span_cm.__enter__()
        entered = True

        span_context.update_trace(
            user_id=user_id,
            session_id=session_id,
            tags=[
                os.getenv("ENV", "dev"),
                "multi-agent-system",
            ],
        )

        if metadata:
            span_context.update_trace(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create root span: {e}")
        span_context = None

    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        # Remember the failure so the span's context manager sees it, just
        # as it would under a plain ``with`` statement, then re-raise.
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        # Only exit a span that was actually entered.
        if entered:
            try:
                span_cm.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing span: {e}")
|
|
|
|
|
def flush_traces(background: bool = True) -> None:
    """
    Flush pending traces to Langfuse.

    Failures are reported as warnings and never raised.

    Args:
        background: Whether to flush in background (non-blocking). When
            True the flush runs on a separate thread and this function
            returns immediately; when False the flush completes before
            returning.
    """
    import threading

    def _flush() -> None:
        try:
            get_client().flush()
        except Exception as e:
            print(f"Warning: Failed to flush traces: {e}")

    if not background:
        _flush()
        return

    try:
        # Non-daemon on purpose: the interpreter waits for non-daemon
        # threads at exit, so a flush started just before shutdown is not
        # cut short.
        threading.Thread(target=_flush, name="langfuse-flush").start()
    except Exception:
        # Could not start a worker thread; flush inline rather than drop it.
        _flush()
|
|
|
|
|
def shutdown_observability() -> None:
    """
    Clean shutdown of observability components.

    Flushes pending traces synchronously, then shuts down the tracer
    provider held by this module, if any. The module's reference to the
    provider is cleared, so calling this function again does not shut the
    same provider down twice. Errors are reported as warnings and never
    raised.
    """
    global _tracer_provider

    # Take ownership of the provider and clear the global up front, so the
    # reference is dropped even if shutting it down fails.
    provider, _tracer_provider = _tracer_provider, None

    try:
        flush_traces(background=False)
    except Exception as e:
        print(f"Warning: Error during observability shutdown: {e}")

    if provider is not None:
        try:
            provider.shutdown()
        except Exception as e:
            print(f"Warning: Error during observability shutdown: {e}")
|
|
|
|
|
|
|
|
@contextmanager
def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for agent-level spans.

    The span is named "agent/<agent_name>". Span creation is best-effort:
    on failure a warning is printed and the body runs with None. Exceptions
    raised inside the ``with`` body are passed to the span's ``__exit__``
    and then propagate unchanged.

    Args:
        agent_name: Name of the agent (e.g., "lead", "research", "code")
        metadata: Optional metadata for the span

    Yields:
        The span context, or None if the span could not be created
    """
    span_name = f"agent/{agent_name}"
    span_cm = None
    span_context = None
    entered = False

    # Setup is isolated from the ``yield`` so that errors from the caller's
    # body are not caught here and answered with a second ``yield`` (which
    # contextlib reports as a RuntimeError, hiding the original error).
    try:
        span_cm = get_client().start_as_current_span(name=span_name)
        span_context = span_cm.__enter__()
        entered = True

        if metadata:
            # NOTE(review): update_trace is the call the original code used;
            # confirm whether span-level metadata was intended instead.
            span_context.update_trace(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create agent span for {agent_name}: {e}")
        span_context = None

    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        # Hand the failure to the span's context manager, then re-raise.
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        # Only exit a span that was actually entered.
        if entered:
            try:
                span_cm.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing agent span: {e}")
|
|
|
|
|
@contextmanager
def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for tool-level spans.

    The span is named "tool/<tool_name>". Span creation is best-effort: on
    failure a warning is printed and the body runs with None. Exceptions
    raised inside the ``with`` body are passed to the span's ``__exit__``
    and then propagate unchanged.

    Args:
        tool_name: Name of the tool (e.g., "tavily_search", "calculator")
        metadata: Optional metadata for the span

    Yields:
        The span context, or None if the span could not be created
    """
    span_name = f"tool/{tool_name}"
    span_cm = None
    span_context = None
    entered = False

    # Setup is isolated from the ``yield`` so that errors from the caller's
    # body are not caught here and answered with a second ``yield`` (which
    # contextlib reports as a RuntimeError, hiding the original error).
    try:
        span_cm = get_client().start_as_current_span(name=span_name)
        span_context = span_cm.__enter__()
        entered = True

        if metadata:
            # NOTE(review): update_trace is the call the original code used;
            # confirm whether span-level metadata was intended instead.
            span_context.update_trace(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create tool span for {tool_name}: {e}")
        span_context = None

    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        # Hand the failure to the span's context manager, then re-raise.
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        # Only exit a span that was actually entered.
        if entered:
            try:
                span_cm.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing tool span: {e}")