# Provenance (scrape residue, kept as comments so the module parses):
# Hugging Face Space "Humanlearning" — commit f844f16 ("updated agent")
"""Tracing and Observability Setup for Langfuse v3.0.0"""
import os
from typing import Optional
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler
def initialize_langfuse() -> None:
    """Initialise the global Langfuse client from environment variables.

    Reads ``LANGFUSE_PUBLIC_KEY`` / ``LANGFUSE_SECRET_KEY`` / ``LANGFUSE_HOST``
    (the host falls back to the Langfuse cloud endpoint).  Any failure is
    reported via ``print`` and swallowed — tracing setup must never crash the
    application.
    """
    try:
        env = os.environ.get
        Langfuse(
            public_key=env("LANGFUSE_PUBLIC_KEY"),
            secret_key=env("LANGFUSE_SECRET_KEY"),
            host=env("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        )
        print("Langfuse client initialized successfully")
    except Exception as exc:
        print(f"Warning: Could not initialize Langfuse client: {exc}")
# Module-level singleton CallbackHandler: one shared handler per process so
# every graph.invoke/stream call correlates its spans under the same trace.
_CALLBACK_HANDLER: Optional[CallbackHandler] = None
def get_langfuse_callback_handler() -> Optional[CallbackHandler]:
    """Get (or create) a singleton Langfuse callback handler for LangChain integration.

    Best-practice (#2): Pass exactly **one** CallbackHandler into graph.invoke/stream so that
    every nested LLM/tool span is correlated underneath the same root span. Re-using the
    same instance avoids fragmenting traces when individual nodes try to create their own
    handler.

    Returns:
        The shared :class:`CallbackHandler`, or ``None`` if creation failed
        (failures are printed, never raised).
    """
    global _CALLBACK_HANDLER  # noqa: PLW0603 – module-level singleton is intentional
    try:
        if _CALLBACK_HANDLER is None:
            # Fix: only (re)initialize the Langfuse client on first use. The
            # previous version called initialize_langfuse() on EVERY invocation,
            # constructing a fresh Langfuse client per request even after the
            # singleton handler already existed.
            initialize_langfuse()
            _CALLBACK_HANDLER = CallbackHandler()
        return _CALLBACK_HANDLER
    except Exception as e:
        print(f"Warning: Could not create Langfuse callback handler: {e}")
        return None
def trace_agent_execution(name: str, user_id: str | None = None, session_id: str | None = None):
"""Context manager that opens a **root** span for the current user request.
Follows Langfuse best practices (rules #2 & #3):
• exactly one root span per request
• attach `user_id` and `session_id` so that follow-up calls are stitched together
"""
try:
langfuse = get_client()
span_kwargs = {"name": name}
# Open the span as context manager so everything inside is automatically nested
span_cm = langfuse.start_as_current_span(**span_kwargs)
# Wrap the CM so that we can update the trace metadata *after* it was started
class _TraceWrapper:
def __enter__(self):
# Enter the span
self._span = span_cm.__enter__()
# Immediately enrich it with session/user information
try:
langfuse.update_current_trace(
**{k: v for k, v in {"user_id": user_id, "session_id": session_id}.items() if v}
)
except Exception:
# Ignore update failures – tracing must never break business logic
pass
return self._span
def __exit__(self, exc_type, exc_val, exc_tb):
return span_cm.__exit__(exc_type, exc_val, exc_tb)
return _TraceWrapper()
except Exception as e:
print(f"Warning: Could not create trace span: {e}")
# Gracefully degrade – return dummy context manager
from contextlib import nullcontext
return nullcontext() # type: ignore
def update_trace_metadata(user_id: str = None, session_id: str = None, tags: list = None, **kwargs):
"""Update current trace with metadata"""
try:
langfuse = get_client()
update_args = {}
if user_id:
update_args["user_id"] = user_id
if session_id:
update_args["session_id"] = session_id
if tags:
update_args["tags"] = tags
if kwargs:
update_args.update(kwargs)
langfuse.update_current_trace(**update_args)
except Exception as e:
print(f"Warning: Could not update trace metadata: {e}")
def flush_langfuse():
    """Force-flush queued Langfuse events.

    Intended for short-lived processes (scripts, serverless handlers) that may
    exit before the background exporter drains its queue.  Errors are printed
    and swallowed so tracing can never break the caller.
    """
    try:
        get_client().flush()
    except Exception as e:
        print(f"Warning: Could not flush Langfuse events: {e}")
def shutdown_langfuse():
    """Shut the Langfuse client down cleanly during application teardown.

    Errors are printed and swallowed so cleanup can never break the caller.
    """
    try:
        get_client().shutdown()
    except Exception as e:
        print(f"Warning: Could not shutdown Langfuse client: {e}")
# Eagerly configure the Langfuse client at import time so it is ready before
# any handler or span is requested (failures are printed, never raised).
initialize_langfuse()