""" Observability module for Langfuse v3 integration with OpenTelemetry support. This module provides: - Single global CallbackHandler for LangChain integration - Root span management for user requests - Session and user tracking - Background flushing for async operations """ import os import base64 from typing import Optional, Dict, Any from contextlib import contextmanager from dotenv import load_dotenv # Langfuse v3 imports from langfuse import get_client from langfuse.langchain import CallbackHandler # OpenTelemetry imports for v3 compatibility from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # Load environment variables load_dotenv("env.local") # Global callback handler instance (singleton) _langfuse_handler: Optional[CallbackHandler] = None _tracer_provider: Optional[TracerProvider] = None def initialize_observability() -> bool: """ Initialize Langfuse observability with OTEL integration. Returns: bool: True if initialization successful, False otherwise """ global _langfuse_handler, _tracer_provider try: # Check required environment variables required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: print(f"Warning: Missing required environment variables: {missing_vars}") return False # Setup OTEL integration for Langfuse v3 langfuse_auth = base64.b64encode( f"{os.getenv('LANGFUSE_PUBLIC_KEY')}:{os.getenv('LANGFUSE_SECRET_KEY')}".encode() ).decode() # Configure OTEL environment os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = f"{os.getenv('LANGFUSE_HOST')}/api/public/otel" os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {langfuse_auth}" # Setup OpenTelemetry tracer provider _tracer_provider = TracerProvider() _tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter())) trace.set_tracer_provider(_tracer_provider) # Create single global callback handler _langfuse_handler = CallbackHandler() print("✅ Langfuse observability initialized successfully") return True except Exception as e: print(f"❌ Failed to initialize observability: {e}") return False def get_callback_handler() -> Optional[CallbackHandler]: """ Get the global Langfuse callback handler. Returns: CallbackHandler or None if not initialized """ global _langfuse_handler if _langfuse_handler is None: if initialize_observability(): return _langfuse_handler return None return _langfuse_handler @contextmanager def start_root_span( name: str, user_id: str, session_id: str, metadata: Optional[Dict[str, Any]] = None ): """ Context manager for creating root spans with user and session tracking. Args: name: Span name (e.g., "user-request") user_id: User identifier for session tracking session_id: Session identifier for conversation continuity metadata: Optional additional metadata Yields: Langfuse span context or None if creation fails """ span = None try: # Create root span with v3 API client = get_client() span = client.start_as_current_span(name=name) span_context = span.__enter__() # Update trace with user and session information span_context.update_trace( user_id=user_id, session_id=session_id, tags=[ os.getenv("ENV", "dev"), # Environment tag "multi-agent-system" # System identifier ] ) # Add metadata if provided if metadata: span_context.update_trace(metadata=metadata) yield span_context except Exception as e: print(f"Warning: Failed to create root span: {e}") # Yield None so code doesn't break yield None finally: # Ensure proper cleanup if span is not None: try: span.__exit__(None, None, None) except Exception as e: print(f"Warning: Error closing span: {e}") def flush_traces(background: bool = True) -> None: """ Flush pending traces to Langfuse. Args: background: Whether to flush in background (non-blocking) """ try: client = get_client() client.flush() except Exception as e: print(f"Warning: Failed to flush traces: {e}") def shutdown_observability() -> None: """ Clean shutdown of observability components. """ global _tracer_provider try: # Flush any remaining traces flush_traces(background=False) # Shutdown tracer provider if _tracer_provider: _tracer_provider.shutdown() except Exception as e: print(f"Warning: Error during observability shutdown: {e}") # Agent span helpers for consistent naming @contextmanager def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None): """ Context manager for agent-level spans. Args: agent_name: Name of the agent (e.g., "lead", "research", "code") metadata: Optional metadata for the span """ span_name = f"agent/{agent_name}" span = None try: client = get_client() span = client.start_as_current_span(name=span_name) span_context = span.__enter__() if metadata: span_context.update_trace(metadata=metadata) yield span_context except Exception as e: print(f"Warning: Failed to create agent span for {agent_name}: {e}") yield None finally: if span is not None: try: span.__exit__(None, None, None) except Exception as e: print(f"Warning: Error closing agent span: {e}") @contextmanager def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None): """ Context manager for tool-level spans. Args: tool_name: Name of the tool (e.g., "tavily_search", "calculator") metadata: Optional metadata for the span """ span_name = f"tool/{tool_name}" span = None try: client = get_client() span = client.start_as_current_span(name=span_name) span_context = span.__enter__() if metadata: span_context.update_trace(metadata=metadata) yield span_context except Exception as e: print(f"Warning: Failed to create tool span for {tool_name}: {e}") yield None finally: if span is not None: try: span.__exit__(None, None, None) except Exception as e: print(f"Warning: Error closing tool span: {e}")