File size: 6,471 Bytes
f844f16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
Observability module for Langfuse v3 integration with OpenTelemetry support.

This module provides:
- Single global CallbackHandler for LangChain integration
- Root span management for user requests
- Session and user tracking
- Background flushing for async operations
"""

import base64
import os
import threading
from contextlib import ExitStack, contextmanager
from typing import Optional, Dict, Any

from dotenv import load_dotenv

# Langfuse v3 imports
from langfuse import get_client
from langfuse.langchain import CallbackHandler

# OpenTelemetry imports for v3 compatibility
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# Load environment variables from the "env.local" dotenv file at import time
load_dotenv("env.local")

# Module-level singletons. Both start as None and are assigned by
# initialize_observability(); _tracer_provider is also read by
# shutdown_observability().
_langfuse_handler: Optional[CallbackHandler] = None
_tracer_provider: Optional[TracerProvider] = None

def initialize_observability() -> bool:
    """
    Initialize Langfuse observability with OTEL integration.

    Reads LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY and LANGFUSE_HOST from the
    environment, exports the matching OTEL_EXPORTER_OTLP_ENDPOINT and
    OTEL_EXPORTER_OTLP_HEADERS variables, installs a TracerProvider with an
    OTLP span exporter and creates the global CallbackHandler.

    The function can be called repeatedly: a tracer provider or callback
    handler that already exists is reused instead of being rebuilt.

    Returns:
        bool: True if initialization successful, False otherwise (missing
        environment variables or any exception during setup; the reason is
        printed)
    """
    global _langfuse_handler, _tracer_provider

    try:
        # Check required environment variables (order is kept for the message)
        required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"]
        settings = {var: os.getenv(var) for var in required_vars}
        missing_vars = [var for var, value in settings.items() if not value]

        if missing_vars:
            print(f"Warning: Missing required environment variables: {missing_vars}")
            return False

        # Basic-auth token: base64("<public key>:<secret key>")
        credentials = f"{settings['LANGFUSE_PUBLIC_KEY']}:{settings['LANGFUSE_SECRET_KEY']}"
        langfuse_auth = base64.b64encode(credentials.encode()).decode()

        # Drop trailing slashes so a host such as "https://host/" does not
        # produce a "//api/public/otel" endpoint.
        host = settings["LANGFUSE_HOST"].rstrip("/")

        # Configure OTEL environment (read by OTLPSpanExporter() below)
        os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = f"{host}/api/public/otel"
        os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {langfuse_auth}"

        # Setup OpenTelemetry tracer provider only once; the global is
        # published only after the provider is fully configured.
        if _tracer_provider is None:
            provider = TracerProvider()
            provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
            trace.set_tracer_provider(provider)
            _tracer_provider = provider

        # Create single global callback handler
        if _langfuse_handler is None:
            _langfuse_handler = CallbackHandler()

        print("✅ Langfuse observability initialized successfully")
        return True

    except Exception as e:
        print(f"❌ Failed to initialize observability: {e}")
        return False

def get_callback_handler() -> Optional[CallbackHandler]:
    """
    Return the shared Langfuse callback handler, initializing it on first use.

    When no handler exists yet, initialize_observability() is invoked; the
    handler is handed back only if that call reports success.

    Returns:
        CallbackHandler or None if not initialized
    """
    global _langfuse_handler

    # Fast path: the singleton already exists.
    if _langfuse_handler is not None:
        return _langfuse_handler

    # Lazy path: the global is re-read only after initialization has run.
    return _langfuse_handler if initialize_observability() else None

@contextmanager
def start_root_span(
    name: str,
    user_id: str,
    session_id: str,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Context manager for creating root spans with user and session tracking.

    Tracing is best-effort: if the span cannot be created, a warning is
    printed and None is yielded so the caller's code still runs. Exceptions
    raised inside the caller's ``with`` body are never swallowed; they
    propagate unchanged after the span has been closed.

    Args:
        name: Span name (e.g., "user-request")
        user_id: User identifier for session tracking
        session_id: Session identifier for conversation continuity
        metadata: Optional additional metadata, set on the trace together
            with the user, session and tags

    Yields:
        The span returned by client.start_as_current_span(), or None if the
        span could not be created
    """
    from contextlib import ExitStack  # local import keeps this block self-contained

    with ExitStack() as stack:
        span = None
        try:
            # Create root span with v3 API; the stack closes it on exit
            client = get_client()
            span = stack.enter_context(client.start_as_current_span(name=name))
        except Exception as e:
            print(f"Warning: Failed to create root span: {e}")

        if span is not None:
            trace_fields: Dict[str, Any] = {
                "user_id": user_id,
                "session_id": session_id,
                "tags": [
                    os.getenv("ENV", "dev"),  # Environment tag
                    "multi-agent-system",     # System identifier
                ],
            }
            if metadata:
                trace_fields["metadata"] = metadata
            try:
                span.update_trace(**trace_fields)
            except Exception as e:
                # A failed attribute update should not discard a working span
                print(f"Warning: Failed to update root span trace: {e}")

        # Single yield outside any try/except: an exception thrown in by the
        # caller's body must leave the generator instead of triggering a
        # second yield ("generator didn't stop after throw()").
        yield span

def _flush_pending_traces() -> None:
    """Flush the Langfuse client, printing (not raising) any failure."""
    try:
        client = get_client()
        client.flush()
    except Exception as e:
        print(f"Warning: Failed to flush traces: {e}")


def flush_traces(background: bool = True) -> None:
    """
    Flush pending traces to Langfuse.

    Failures are reported with a printed warning and never raised.

    Args:
        background: Whether to flush in background (non-blocking). When True
            the flush runs on a separate thread and this call returns
            immediately; when False the call blocks until the flush returns.
    """
    if not background:
        _flush_pending_traces()
        return

    import threading  # local import keeps this block self-contained

    try:
        # Non-daemon thread: interpreter shutdown waits for it, so a flush
        # started just before exit is not cut short.
        threading.Thread(target=_flush_pending_traces, name="langfuse-flush").start()
    except RuntimeError:
        # Thread could not be started; fall back to a blocking flush
        _flush_pending_traces()

def shutdown_observability() -> None:
    """
    Shut down the observability components cleanly.

    Performs a blocking flush of outstanding traces and then shuts down the
    module's tracer provider if one is set. Any error is printed as a
    warning rather than raised.
    """
    global _tracer_provider

    try:
        # Blocking flush first so nothing is pending when the provider stops
        flush_traces(background=False)

        provider = _tracer_provider
        if not provider:
            return
        provider.shutdown()

    except Exception as err:
        print(f"Warning: Error during observability shutdown: {err}")

# Agent span helpers for consistent naming
@contextmanager
def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for agent-level spans.

    The span is named "agent/<agent_name>". Tracing is best-effort: if the
    span cannot be created, a warning is printed and None is yielded.
    Exceptions raised inside the caller's ``with`` body are never swallowed;
    they propagate unchanged after the span has been closed.

    Args:
        agent_name: Name of the agent (e.g., "lead", "research", "code")
        metadata: Optional metadata, passed to span.update_trace()

    Yields:
        The span returned by client.start_as_current_span(), or None if the
        span could not be created
    """
    from contextlib import ExitStack  # local import keeps this block self-contained

    span_name = f"agent/{agent_name}"

    with ExitStack() as stack:
        span = None
        try:
            client = get_client()
            span = stack.enter_context(client.start_as_current_span(name=span_name))
        except Exception as e:
            print(f"Warning: Failed to create agent span for {agent_name}: {e}")

        if span is not None and metadata:
            try:
                # NOTE(review): update_trace() is the trace-level call; if
                # span-level metadata is intended, confirm whether
                # span.update(metadata=...) should be used instead.
                span.update_trace(metadata=metadata)
            except Exception as e:
                # A failed metadata update should not discard a working span
                print(f"Warning: Failed to set metadata on agent span for {agent_name}: {e}")

        # Single yield outside any try/except: an exception thrown in by the
        # caller's body must leave the generator instead of triggering a
        # second yield ("generator didn't stop after throw()").
        yield span

@contextmanager
def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for tool-level spans.

    The span is named "tool/<tool_name>". Tracing is best-effort: if the
    span cannot be created, a warning is printed and None is yielded.
    Exceptions raised inside the caller's ``with`` body are never swallowed;
    they propagate unchanged after the span has been closed.

    Args:
        tool_name: Name of the tool (e.g., "tavily_search", "calculator")
        metadata: Optional metadata, passed to span.update_trace()

    Yields:
        The span returned by client.start_as_current_span(), or None if the
        span could not be created
    """
    from contextlib import ExitStack  # local import keeps this block self-contained

    span_name = f"tool/{tool_name}"

    with ExitStack() as stack:
        span = None
        try:
            client = get_client()
            span = stack.enter_context(client.start_as_current_span(name=span_name))
        except Exception as e:
            print(f"Warning: Failed to create tool span for {tool_name}: {e}")

        if span is not None and metadata:
            try:
                # NOTE(review): update_trace() is the trace-level call; if
                # span-level metadata is intended, confirm whether
                # span.update(metadata=...) should be used instead.
                span.update_trace(metadata=metadata)
            except Exception as e:
                # A failed metadata update should not discard a working span
                print(f"Warning: Failed to set metadata on tool span for {tool_name}: {e}")

        # Single yield outside any try/except: an exception thrown in by the
        # caller's body must leave the generator instead of triggering a
        # second yield ("generator didn't stop after throw()").
        yield span