|
|
"""Tracing and Observability Setup for Langfuse v3.0.0""" |
|
|
import os |
|
|
from typing import Optional |
|
|
from langfuse import Langfuse, get_client |
|
|
from langfuse.langchain import CallbackHandler |
|
|
|
|
|
|
|
|
def initialize_langfuse() -> None:
    """Initialize the Langfuse client from environment variables.

    Reads ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY`` and
    ``LANGFUSE_HOST`` (falling back to ``https://cloud.langfuse.com`` when the
    host is unset or empty) and constructs a ``Langfuse`` client with them.
    The constructed instance is not stored here; the other helpers in this
    module obtain the client through ``get_client()``.

    This function never raises. A missing credential, or any exception raised
    while constructing the client, is reported on stdout as a warning.
    """
    public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
    secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
    # Treat an empty LANGFUSE_HOST the same as an unset one.
    host = os.environ.get("LANGFUSE_HOST") or "https://cloud.langfuse.com"

    # Both keys are needed for an authenticated client, so do not report
    # success when either one is absent.
    missing = [
        env_name
        for env_name, value in (
            ("LANGFUSE_PUBLIC_KEY", public_key),
            ("LANGFUSE_SECRET_KEY", secret_key),
        )
        if not value
    ]
    if missing:
        print(
            "Warning: Could not initialize Langfuse client: "
            f"missing environment variable(s) {', '.join(missing)}"
        )
        return

    try:
        Langfuse(public_key=public_key, secret_key=secret_key, host=host)
        print("Langfuse client initialized successfully")
    except Exception as e:
        print(f"Warning: Could not initialize Langfuse client: {e}")
|
|
|
|
|
|
|
|
|
|
|
# Module-wide cache for the LangChain callback handler; filled lazily by
# get_langfuse_callback_handler().
_CALLBACK_HANDLER: Optional[CallbackHandler] = None


def get_langfuse_callback_handler() -> Optional[CallbackHandler]:
    """Get (or create) a singleton Langfuse callback handler for LangChain integration.

    Best-practice (#2): Pass exactly **one** CallbackHandler into graph.invoke/stream so that
    every nested LLM/tool span is correlated underneath the same root span. Re-using the
    same instance avoids fragmenting traces when individual nodes try to create their own
    handler.

    Returns:
        The cached handler, or ``None`` when the handler could not be created.
        In that case a warning is printed and nothing is cached, so the next
        call tries again.
    """
    global _CALLBACK_HANDLER

    # Fast path: once a handler exists, hand it back without re-running the
    # client initialization (which would build a client and print each time).
    if _CALLBACK_HANDLER is not None:
        return _CALLBACK_HANDLER

    try:
        initialize_langfuse()
        _CALLBACK_HANDLER = CallbackHandler()
    except Exception as e:
        print(f"Warning: Could not create Langfuse callback handler: {e}")
        return None
    return _CALLBACK_HANDLER
|
|
|
|
|
|
|
|
def trace_agent_execution(name: str, user_id: str | None = None, session_id: str | None = None): |
|
|
"""Context manager that opens a **root** span for the current user request. |
|
|
|
|
|
Follows Langfuse best practices (rules #2 & #3): |
|
|
• exactly one root span per request |
|
|
• attach `user_id` and `session_id` so that follow-up calls are stitched together |
|
|
""" |
|
|
try: |
|
|
langfuse = get_client() |
|
|
span_kwargs = {"name": name} |
|
|
|
|
|
span_cm = langfuse.start_as_current_span(**span_kwargs) |
|
|
|
|
|
|
|
|
class _TraceWrapper: |
|
|
def __enter__(self): |
|
|
|
|
|
self._span = span_cm.__enter__() |
|
|
|
|
|
try: |
|
|
langfuse.update_current_trace( |
|
|
**{k: v for k, v in {"user_id": user_id, "session_id": session_id}.items() if v} |
|
|
) |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
return self._span |
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
|
return span_cm.__exit__(exc_type, exc_val, exc_tb) |
|
|
|
|
|
return _TraceWrapper() |
|
|
except Exception as e: |
|
|
print(f"Warning: Could not create trace span: {e}") |
|
|
|
|
|
from contextlib import nullcontext |
|
|
|
|
|
return nullcontext() |
|
|
|
|
|
|
|
|
def update_trace_metadata(user_id: str = None, session_id: str = None, tags: list = None, **kwargs): |
|
|
"""Update current trace with metadata""" |
|
|
try: |
|
|
langfuse = get_client() |
|
|
update_args = {} |
|
|
|
|
|
if user_id: |
|
|
update_args["user_id"] = user_id |
|
|
if session_id: |
|
|
update_args["session_id"] = session_id |
|
|
if tags: |
|
|
update_args["tags"] = tags |
|
|
if kwargs: |
|
|
update_args.update(kwargs) |
|
|
|
|
|
langfuse.update_current_trace(**update_args) |
|
|
except Exception as e: |
|
|
print(f"Warning: Could not update trace metadata: {e}") |
|
|
|
|
|
|
|
|
def flush_langfuse():
    """Flush pending Langfuse events (intended for short-lived applications).

    Never raises: a failure to obtain the client or to flush is printed as a
    warning instead.
    """
    try:
        get_client().flush()
    except Exception as err:
        print(f"Warning: Could not flush Langfuse events: {err}")
|
|
|
|
|
|
|
|
def shutdown_langfuse():
    """Shut down the Langfuse client (intended for application cleanup).

    Never raises: a failure to obtain the client or to shut it down is printed
    as a warning instead.
    """
    try:
        get_client().shutdown()
    except Exception as err:
        print(f"Warning: Could not shutdown Langfuse client: {err}")
|
|
|
|
|
|
|
|
|
|
|
# Import-time side effect: run the client initialization once as soon as this
# module is loaded.
initialize_langfuse()