File size: 4,546 Bytes
fe36046 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
"""Tracing and Observability Setup for Langfuse v3.0.0"""
import os
from typing import Optional
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler
def initialize_langfuse() -> None:
"""Initialize Langfuse client with proper configuration"""
try:
# Initialize Langfuse client
Langfuse(
public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
)
print("Langfuse client initialized successfully")
except Exception as e:
print(f"Warning: Could not initialize Langfuse client: {e}")
# Singleton for Langfuse CallbackHandler to ensure a single handler per request
_CALLBACK_HANDLER: Optional[CallbackHandler] = None
def get_langfuse_callback_handler() -> Optional[CallbackHandler]:
"""Get (or create) a singleton Langfuse callback handler for LangChain integration
Best-practice (#2): Pass exactly **one** CallbackHandler into graph.invoke/stream so that
every nested LLM/tool span is correlated underneath the same root span. Re-using the
same instance avoids fragmenting traces when individual nodes try to create their own
handler.
"""
global _CALLBACK_HANDLER # noqa: PLW0603 – module-level singleton is intentional
try:
initialize_langfuse()
if _CALLBACK_HANDLER is None:
_CALLBACK_HANDLER = CallbackHandler()
return _CALLBACK_HANDLER
except Exception as e:
print(f"Warning: Could not create Langfuse callback handler: {e}")
return None
def trace_agent_execution(name: str, user_id: str | None = None, session_id: str | None = None):
"""Context manager that opens a **root** span for the current user request.
Follows Langfuse best practices (rules #2 & #3):
• exactly one root span per request
• attach `user_id` and `session_id` so that follow-up calls are stitched together
"""
try:
langfuse = get_client()
span_kwargs = {"name": name}
# Open the span as context manager so everything inside is automatically nested
span_cm = langfuse.start_as_current_span(**span_kwargs)
# Wrap the CM so that we can update the trace metadata *after* it was started
class _TraceWrapper:
def __enter__(self):
# Enter the span
self._span = span_cm.__enter__()
# Immediately enrich it with session/user information
try:
langfuse.update_current_trace(
**{k: v for k, v in {"user_id": user_id, "session_id": session_id}.items() if v}
)
except Exception:
# Ignore update failures – tracing must never break business logic
pass
return self._span
def __exit__(self, exc_type, exc_val, exc_tb):
return span_cm.__exit__(exc_type, exc_val, exc_tb)
return _TraceWrapper()
except Exception as e:
print(f"Warning: Could not create trace span: {e}")
# Gracefully degrade – return dummy context manager
from contextlib import nullcontext
return nullcontext() # type: ignore
def update_trace_metadata(user_id: str = None, session_id: str = None, tags: list = None, **kwargs):
"""Update current trace with metadata"""
try:
langfuse = get_client()
update_args = {}
if user_id:
update_args["user_id"] = user_id
if session_id:
update_args["session_id"] = session_id
if tags:
update_args["tags"] = tags
if kwargs:
update_args.update(kwargs)
langfuse.update_current_trace(**update_args)
except Exception as e:
print(f"Warning: Could not update trace metadata: {e}")
def flush_langfuse():
"""Flush Langfuse events (for short-lived applications)"""
try:
langfuse = get_client()
langfuse.flush()
except Exception as e:
print(f"Warning: Could not flush Langfuse events: {e}")
def shutdown_langfuse():
"""Shutdown Langfuse client (for application cleanup)"""
try:
langfuse = get_client()
langfuse.shutdown()
except Exception as e:
print(f"Warning: Could not shutdown Langfuse client: {e}")
# Initialize Langfuse on module import
initialize_langfuse() |