File size: 7,285 Bytes
f844f16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df7388a
f844f16
df7388a
f844f16
 
 
df7388a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f844f16
 
 
 
df7388a
 
 
 
 
 
 
f844f16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df7388a
f844f16
 
 
df7388a
 
 
 
 
 
f844f16
 
 
df7388a
 
 
 
 
 
f844f16
 
 
 
 
 
 
 
 
 
 
df7388a
f844f16
 
 
df7388a
 
 
 
 
 
f844f16
 
df7388a
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Observability module for Langfuse v3 integration with OpenTelemetry support.

This module provides:
- Single global CallbackHandler for LangChain integration
- Root span management for user requests
- Session and user tracking
- Background flushing for async operations
"""

import os
import base64
from typing import Optional, Dict, Any
from contextlib import contextmanager
from dotenv import load_dotenv

# Langfuse v3 imports
from langfuse import get_client
from langfuse.langchain import CallbackHandler

# OpenTelemetry imports for v3 compatibility  
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# Load environment variables from the file "env.local".
# NOTE(review): this is a bare relative path (presumably resolved against the
# current working directory) and differs from the common ".env.local" name —
# confirm this is the intended file.
load_dotenv("env.local")

# Module-level singletons, populated by initialize_observability().
# Shared LangChain callback handler; None until initialization succeeds.
_langfuse_handler: Optional[CallbackHandler] = None
# OTEL tracer provider registered via trace.set_tracer_provider(); None until initialized.
_tracer_provider: Optional[TracerProvider] = None

def initialize_observability() -> bool:
    """
    Initialize Langfuse observability with OTEL integration.

    Requires LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY and LANGFUSE_HOST. Points
    the OTLP HTTP exporter (via OTEL_EXPORTER_OTLP_* environment variables) at
    ``{LANGFUSE_HOST}/api/public/otel`` with Basic auth, registers a global
    OTEL TracerProvider, and creates the shared LangChain CallbackHandler.

    Calling this again after a successful initialization is a no-op that
    returns True, so the global tracer provider is never installed twice.

    Returns:
        bool: True if initialization successful, False otherwise
    """
    global _langfuse_handler, _tracer_provider

    try:
        # Check required environment variables
        required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"]
        missing_vars = [var for var in required_vars if not os.getenv(var)]

        if missing_vars:
            print(f"Warning: Missing required environment variables: {missing_vars}")
            return False

        # Already initialized: don't register another provider/processor.
        if _langfuse_handler is not None:
            return True

        public_key = os.environ["LANGFUSE_PUBLIC_KEY"]
        secret_key = os.environ["LANGFUSE_SECRET_KEY"]
        # Strip trailing slashes so "https://host/" doesn't become "https://host//api/...".
        host = os.environ["LANGFUSE_HOST"].rstrip("/")

        # Setup OTEL integration for Langfuse v3 (HTTP Basic auth header value)
        langfuse_auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()

        # Configure OTEL environment (read by OTLPSpanExporter() below)
        os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = f"{host}/api/public/otel"
        os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {langfuse_auth}"

        # Reuse a provider registered by an earlier attempt that failed later on
        # (e.g. in CallbackHandler()), instead of registering a second one.
        # NOTE(review): the Langfuse v3 client may attach its own span processor
        # to the global provider, and SimpleSpanProcessor exports synchronously —
        # confirm spans aren't exported twice and that blocking export is intended.
        if _tracer_provider is None:
            provider = TracerProvider()
            provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
            trace.set_tracer_provider(provider)
            _tracer_provider = provider

        # Create single global callback handler
        _langfuse_handler = CallbackHandler()

        print("✅ Langfuse observability initialized successfully")
        return True

    except Exception as e:
        print(f"❌ Failed to initialize observability: {e}")
        return False

def get_callback_handler() -> Optional[CallbackHandler]:
    """
    Return the shared Langfuse callback handler, initializing it on first use.

    If no handler exists yet, ``initialize_observability()`` is invoked; when
    that reports failure, ``None`` is returned instead of a handler.

    Returns:
        CallbackHandler or None if not initialized
    """
    # Short-circuit: initialization is attempted only while no handler exists.
    if _langfuse_handler is None and not initialize_observability():
        return None
    return _langfuse_handler

@contextmanager
def start_root_span(
    name: str,
    user_id: str,
    session_id: str,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Context manager for creating root spans with user and session tracking.

    Opens a Langfuse span as the current span. It tags the span's trace with
    ``user_id``, ``session_id``, the environment (``ENV``, default ``"dev"``)
    and a system identifier, and attaches ``metadata`` to the trace when given.

    Observability failures never break the caller: if the span cannot be
    created or configured, a warning is printed and ``None`` is yielded.
    Exceptions raised inside the ``with`` body propagate unchanged and are
    passed to the span's ``__exit__`` so the span can record them.

    Args:
        name: Span name (e.g., "user-request")
        user_id: User identifier for session tracking
        session_id: Session identifier for conversation continuity
        metadata: Optional additional metadata

    Yields:
        Langfuse span context or None if creation fails
    """
    span_cm = None  # the span's context manager; set only once it has been entered
    span_context = None
    try:
        # Create root span with v3 API
        client = get_client()
        cm = client.start_as_current_span(name=name)
        span_context = cm.__enter__()
        span_cm = cm

        # Update trace with user and session information
        span_context.update_trace(
            user_id=user_id,
            session_id=session_id,
            tags=[
                os.getenv("ENV", "dev"),  # Environment tag
                "multi-agent-system"      # System identifier
            ]
        )

        # Add metadata if provided
        if metadata:
            span_context.update_trace(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create root span: {e}")
        # Yield None so code doesn't break
        span_context = None

    # Setup is done. Yield exactly once, outside the setup try-block, so an
    # exception from the caller's body is neither misreported as a creation
    # failure nor turned into contextlib's "generator didn't stop" RuntimeError.
    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        # Ensure proper cleanup, forwarding any body exception to the span.
        if span_cm is not None:
            try:
                span_cm.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing span: {e}")

def flush_traces(background: bool = True) -> None:
    """
    Flush pending traces to Langfuse.

    Flushing is best-effort: failures are printed as warnings, never raised.

    Args:
        background: Whether to flush in background (non-blocking). When True,
            the flush runs on a separate non-daemon thread and this call returns
            immediately. When False, it blocks until the flush has returned.
            If a thread cannot be started, the flush runs inline instead.
    """
    import threading  # local import keeps this helper self-contained

    def _flush() -> None:
        try:
            client = get_client()
            client.flush()
        except Exception as e:
            print(f"Warning: Failed to flush traces: {e}")

    if background:
        try:
            # Non-daemon, so interpreter exit waits for an in-flight flush
            # instead of dropping queued traces.
            threading.Thread(target=_flush, name="langfuse-flush", daemon=False).start()
            return
        except RuntimeError:
            # Thread creation can fail (e.g. at interpreter shutdown);
            # fall through to a blocking flush.
            pass

    _flush()

def shutdown_observability() -> None:
    """
    Flush outstanding traces, then shut down the OTEL tracer provider.

    Any exception along the way is reported as a printed warning and not
    re-raised.
    """
    try:
        # Blocking flush so nothing queued is lost before the provider stops.
        flush_traces(background=False)

        provider = _tracer_provider
        if provider:
            provider.shutdown()
    except Exception as err:
        print(f"Warning: Error during observability shutdown: {err}")

# Agent span helpers for consistent naming
@contextmanager
def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for agent-level spans.

    Opens a Langfuse span named ``agent/<agent_name>`` as the current span and
    attaches ``metadata`` to that span. If the span cannot be created or
    configured, a warning is printed and ``None`` is yielded so the caller keeps
    running. Exceptions raised inside the ``with`` body propagate unchanged and
    are passed to the span's ``__exit__`` so the span can record them.

    Args:
        agent_name: Name of the agent (e.g., "lead", "research", "code")
        metadata: Optional metadata for the span

    Yields:
        Langfuse span context, or None if span creation failed
    """
    span_name = f"agent/{agent_name}"
    span_cm = None  # set only after the span has been entered successfully
    span_context = None

    try:
        client = get_client()
        cm = client.start_as_current_span(name=span_name)
        span_context = cm.__enter__()
        span_cm = cm

        if metadata:
            # Span-level metadata; update_trace() would target the trace instead.
            span_context.update(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create agent span for {agent_name}: {e}")
        span_context = None

    # Yield exactly once, outside the setup try-block, so an exception from the
    # caller's body is not misreported or turned into a contextlib RuntimeError.
    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        if span_cm is not None:
            try:
                span_cm.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing agent span: {e}")

@contextmanager
def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None):
    """
    Context manager for tool-level spans.

    Opens a Langfuse span named ``tool/<tool_name>`` as the current span and
    attaches ``metadata`` to that span. If the span cannot be created or
    configured, a warning is printed and ``None`` is yielded so the caller keeps
    running. Exceptions raised inside the ``with`` body propagate unchanged and
    are passed to the span's ``__exit__`` so the span can record them.

    Args:
        tool_name: Name of the tool (e.g., "tavily_search", "calculator")
        metadata: Optional metadata for the span

    Yields:
        Langfuse span context, or None if span creation failed
    """
    span_name = f"tool/{tool_name}"
    span_cm = None  # set only after the span has been entered successfully
    span_context = None

    try:
        client = get_client()
        cm = client.start_as_current_span(name=span_name)
        span_context = cm.__enter__()
        span_cm = cm

        if metadata:
            # Span-level metadata; update_trace() would target the trace instead.
            span_context.update(metadata=metadata)
    except Exception as e:
        print(f"Warning: Failed to create tool span for {tool_name}: {e}")
        span_context = None

    # Yield exactly once, outside the setup try-block, so an exception from the
    # caller's body is not misreported or turned into a contextlib RuntimeError.
    exc_info = (None, None, None)
    try:
        yield span_context
    except BaseException as exc:
        exc_info = (type(exc), exc, exc.__traceback__)
        raise
    finally:
        if span_cm is not None:
            try:
                span_cm.__exit__(*exc_info)
            except Exception as e:
                print(f"Warning: Error closing tool span: {e}")