File size: 4,546 Bytes
fe36046
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""Tracing and Observability Setup for Langfuse v3.0.0"""
import os
from typing import Optional
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler


def initialize_langfuse() -> None:
    """Initialize Langfuse client with proper configuration"""
    try:
        # Initialize Langfuse client
        Langfuse(
            public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
            secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
            host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )
        print("Langfuse client initialized successfully")
    except Exception as e:
        print(f"Warning: Could not initialize Langfuse client: {e}")


# Singleton for Langfuse CallbackHandler to ensure a single handler per request
_CALLBACK_HANDLER: Optional[CallbackHandler] = None


def get_langfuse_callback_handler() -> Optional[CallbackHandler]:
    """Get (or create) a singleton Langfuse callback handler for LangChain integration

    Best-practice (#2): Pass exactly **one** CallbackHandler into graph.invoke/stream so that
    every nested LLM/tool span is correlated underneath the same root span. Re-using the
    same instance avoids fragmenting traces when individual nodes try to create their own
    handler.
    """
    global _CALLBACK_HANDLER  # noqa: PLW0603 – module-level singleton is intentional

    try:
        initialize_langfuse()
        if _CALLBACK_HANDLER is None:
            _CALLBACK_HANDLER = CallbackHandler()
        return _CALLBACK_HANDLER
    except Exception as e:
        print(f"Warning: Could not create Langfuse callback handler: {e}")
        return None


def trace_agent_execution(name: str, user_id: str | None = None, session_id: str | None = None):
    """Context manager that opens a **root** span for the current user request.

    Follows Langfuse best practices (rules #2 & #3):
        • exactly one root span per request
        • attach `user_id` and `session_id` so that follow-up calls are stitched together
    """
    try:
        langfuse = get_client()
        span_kwargs = {"name": name}
        # Open the span as context manager so everything inside is automatically nested
        span_cm = langfuse.start_as_current_span(**span_kwargs)

        # Wrap the CM so that we can update the trace metadata *after* it was started
        class _TraceWrapper:
            def __enter__(self):
                # Enter the span
                self._span = span_cm.__enter__()
                # Immediately enrich it with session/user information
                try:
                    langfuse.update_current_trace(
                        **{k: v for k, v in {"user_id": user_id, "session_id": session_id}.items() if v}
                    )
                except Exception:
                    # Ignore update failures – tracing must never break business logic
                    pass
                return self._span

            def __exit__(self, exc_type, exc_val, exc_tb):
                return span_cm.__exit__(exc_type, exc_val, exc_tb)

        return _TraceWrapper()
    except Exception as e:
        print(f"Warning: Could not create trace span: {e}")
        # Gracefully degrade – return dummy context manager
        from contextlib import nullcontext

        return nullcontext()  # type: ignore


def update_trace_metadata(user_id: str = None, session_id: str = None, tags: list = None, **kwargs):
    """Update current trace with metadata"""
    try:
        langfuse = get_client()
        update_args = {}
        
        if user_id:
            update_args["user_id"] = user_id
        if session_id:
            update_args["session_id"] = session_id
        if tags:
            update_args["tags"] = tags
        if kwargs:
            update_args.update(kwargs)
        
        langfuse.update_current_trace(**update_args)
    except Exception as e:
        print(f"Warning: Could not update trace metadata: {e}")


def flush_langfuse():
    """Flush Langfuse events (for short-lived applications)"""
    try:
        langfuse = get_client()
        langfuse.flush()
    except Exception as e:
        print(f"Warning: Could not flush Langfuse events: {e}")


def shutdown_langfuse():
    """Shutdown Langfuse client (for application cleanup)"""
    try:
        langfuse = get_client()
        langfuse.shutdown()
    except Exception as e:
        print(f"Warning: Could not shutdown Langfuse client: {e}")


# Initialize Langfuse on module import
initialize_langfuse()