File size: 7,285 Bytes
f844f16 df7388a f844f16 df7388a f844f16 df7388a f844f16 df7388a f844f16 df7388a f844f16 df7388a f844f16 df7388a f844f16 df7388a f844f16 df7388a f844f16 df7388a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
"""
Observability module for Langfuse v3 integration with OpenTelemetry support.
This module provides:
- Single global CallbackHandler for LangChain integration
- Root span management for user requests
- Session and user tracking
- Background flushing for async operations
"""
import os
import base64
from typing import Optional, Dict, Any
from contextlib import contextmanager
from dotenv import load_dotenv
# Langfuse v3 imports
from langfuse import get_client
from langfuse.langchain import CallbackHandler
# OpenTelemetry imports for v3 compatibility
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# Load environment variables from "env.local" at import time, before any of
# the os.getenv() lookups in this module run.
load_dotenv("env.local")
# Module-level singletons. Both start as None and are assigned by
# initialize_observability().
# Shared Langfuse callback handler, handed out by get_callback_handler().
_langfuse_handler: Optional[CallbackHandler] = None
# OpenTelemetry tracer provider, shut down by shutdown_observability().
_tracer_provider: Optional[TracerProvider] = None
def initialize_observability() -> bool:
    """
    Initialize Langfuse observability with OTEL integration.

    Reads LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY and LANGFUSE_HOST from the
    environment, exports the matching OTEL_EXPORTER_OTLP_* variables, installs
    a tracer provider and creates the global callback handler.

    Returns:
        bool: True if initialization successful, False otherwise (a required
        variable is missing or any setup step raised).
    """
    global _langfuse_handler, _tracer_provider
    try:
        # Check required environment variables (each one is read exactly once)
        required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"]
        settings = {var: os.getenv(var) for var in required_vars}
        missing_vars = [var for var in required_vars if not settings[var]]
        if missing_vars:
            print(f"Warning: Missing required environment variables: {missing_vars}")
            return False

        # Setup OTEL integration for Langfuse v3 (HTTP Basic credentials)
        credentials = f"{settings['LANGFUSE_PUBLIC_KEY']}:{settings['LANGFUSE_SECRET_KEY']}"
        langfuse_auth = base64.b64encode(credentials.encode()).decode()

        # A host configured with a trailing slash must not produce a
        # "//api/public/otel" endpoint, so normalize it before joining.
        host = settings["LANGFUSE_HOST"].rstrip("/")

        # Configure OTEL environment
        os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = f"{host}/api/public/otel"
        os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {langfuse_auth}"

        # Setup OpenTelemetry tracer provider. The exporter is constructed
        # after the variables above are set and takes no explicit arguments.
        _tracer_provider = TracerProvider()
        _tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
        trace.set_tracer_provider(_tracer_provider)

        # Create single global callback handler
        _langfuse_handler = CallbackHandler()
        print("✅ Langfuse observability initialized successfully")
        return True
    except Exception as e:
        # Observability is optional: report the failure instead of raising.
        print(f"❌ Failed to initialize observability: {e}")
        return False
def get_callback_handler() -> Optional[CallbackHandler]:
    """
    Return the shared Langfuse callback handler, initializing it on first use.

    Returns:
        CallbackHandler, or None when no handler exists and
        initialize_observability() reports failure.
    """
    # Lazy setup: initialization is only attempted while no handler exists.
    if _langfuse_handler is None and not initialize_observability():
        return None
    return _langfuse_handler
@contextmanager
def start_root_span(
    name: str,
    user_id: str,
    session_id: str,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Context manager for creating root spans with user and session tracking.

    Span creation is best-effort: if anything goes wrong while setting the
    span up, a warning is printed and None is yielded so the caller's code
    still runs. Exceptions raised inside the caller's ``with`` body are never
    swallowed or replaced; they propagate unchanged after the span is closed.

    Args:
        name: Span name (e.g., "user-request")
        user_id: User identifier for session tracking
        session_id: Session identifier for conversation continuity
        metadata: Optional additional metadata

    Yields:
        Langfuse span context or None if creation fails
    """
    span = None          # underlying context manager, set only once entered
    span_context = None  # value handed to the caller; None when setup failed

    # Setup lives in its own try block. A generator-based context manager may
    # yield only once, so the `yield` below must not share an except clause
    # that yields again.
    try:
        # Create root span with v3 API
        client = get_client()
        pending = client.start_as_current_span(name=name)
        entered = pending.__enter__()
        span = pending  # from here on the span must be closed in `finally`
        # Update trace with user and session information
        entered.update_trace(
            user_id=user_id,
            session_id=session_id,
            tags=[
                os.getenv("ENV", "dev"),  # Environment tag
                "multi-agent-system",     # System identifier
            ],
        )
        # Add metadata if provided
        if metadata:
            entered.update_trace(metadata=metadata)
        span_context = entered
    except Exception as e:
        print(f"Warning: Failed to create root span: {e}")

    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        # Remember the caller's failure so it can be passed to the span's
        # __exit__, then let it propagate untouched.
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        # Ensure proper cleanup
        if span is not None:
            try:
                span.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing span: {e}")
def flush_traces(background: bool = True) -> None:
    """
    Flush pending traces to Langfuse.

    Failures are reported as a printed warning and never raised.

    Args:
        background: Whether to flush in background (non-blocking). When True
            the flush runs on a separate, non-daemon thread and this function
            returns immediately; when False the call returns only after the
            flush attempt has finished.
    """
    def _flush() -> None:
        # Shared by both modes so they warn identically on failure.
        try:
            client = get_client()
            client.flush()
        except Exception as e:
            print(f"Warning: Failed to flush traces: {e}")

    if not background:
        _flush()
        return

    # Function-scope import keeps this block self-contained.
    import threading

    try:
        # Non-daemon on purpose: the interpreter waits for the thread at exit,
        # so a background flush is not cut short by process shutdown.
        threading.Thread(target=_flush, name="langfuse-flush", daemon=False).start()
    except RuntimeError:
        # The thread could not be started; fall back to a blocking flush
        # rather than dropping the request.
        _flush()
def shutdown_observability() -> None:
    """
    Tear down the observability components.

    Performs a blocking flush of pending traces and then shuts down the
    tracer provider if one has been set. Any error is reported as a printed
    warning rather than raised.
    """
    try:
        # Send whatever is still pending before tearing anything down
        flush_traces(background=False)
        provider = _tracer_provider
        if provider:
            # Shutdown tracer provider
            provider.shutdown()
    except Exception as exc:
        print(f"Warning: Error during observability shutdown: {exc}")
# Agent span helpers for consistent naming
@contextmanager
def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for agent-level spans.

    The span is named ``agent/<agent_name>``. Span creation is best-effort:
    on failure a warning is printed and None is yielded. Exceptions raised
    inside the caller's ``with`` body propagate unchanged after the span is
    closed.

    Args:
        agent_name: Name of the agent (e.g., "lead", "research", "code")
        metadata: Optional metadata for the span

    Yields:
        Langfuse span context or None if creation fails
    """
    span_name = f"agent/{agent_name}"
    span = None          # underlying context manager, set only once entered
    span_context = None  # value handed to the caller; None when setup failed

    # Setup is kept apart from the `yield`: a generator-based context manager
    # may yield only once, so the yield cannot share an except clause that
    # yields again.
    try:
        client = get_client()
        pending = client.start_as_current_span(name=span_name)
        entered = pending.__enter__()
        span = pending  # from here on the span must be closed in `finally`
        if metadata:
            # NOTE(review): this goes through update_trace(), so it presumably
            # lands on the trace rather than on this span - confirm intended.
            entered.update_trace(metadata=metadata)
        span_context = entered
    except Exception as e:
        print(f"Warning: Failed to create agent span for {agent_name}: {e}")

    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        # Remember the caller's failure so it can be passed to the span's
        # __exit__, then let it propagate untouched.
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        if span is not None:
            try:
                span.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing agent span: {e}")
@contextmanager
def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for tool-level spans.

    The span is named ``tool/<tool_name>``. Span creation is best-effort: on
    failure a warning is printed and None is yielded. Exceptions raised inside
    the caller's ``with`` body propagate unchanged after the span is closed.

    Args:
        tool_name: Name of the tool (e.g., "tavily_search", "calculator")
        metadata: Optional metadata for the span

    Yields:
        Langfuse span context or None if creation fails
    """
    span_name = f"tool/{tool_name}"
    span = None          # underlying context manager, set only once entered
    span_context = None  # value handed to the caller; None when setup failed

    # Setup is kept apart from the `yield`: a generator-based context manager
    # may yield only once, so the yield cannot share an except clause that
    # yields again.
    try:
        client = get_client()
        pending = client.start_as_current_span(name=span_name)
        entered = pending.__enter__()
        span = pending  # from here on the span must be closed in `finally`
        if metadata:
            # NOTE(review): this goes through update_trace(), so it presumably
            # lands on the trace rather than on this span - confirm intended.
            entered.update_trace(metadata=metadata)
        span_context = entered
    except Exception as e:
        print(f"Warning: Failed to create tool span for {tool_name}: {e}")

    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        # Remember the caller's failure so it can be passed to the span's
        # __exit__, then let it propagate untouched.
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        if span is not None:
            try:
                span.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing tool span: {e}")