| | """Tracing and Observability Setup for Langfuse v3.0.0""" |
| | import os |
| | from typing import Optional |
| | from langfuse import Langfuse, get_client |
| | from langfuse.langchain import CallbackHandler |
| |
|
| |
|
| | def initialize_langfuse() -> None: |
| | """Initialize Langfuse client with proper configuration""" |
| | try: |
| | |
| | Langfuse( |
| | public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"), |
| | secret_key=os.environ.get("LANGFUSE_SECRET_KEY"), |
| | host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") |
| | ) |
| | print("Langfuse client initialized successfully") |
| | except Exception as e: |
| | print(f"Warning: Could not initialize Langfuse client: {e}") |
| |
|
| |
|
| | |
| | _CALLBACK_HANDLER: Optional[CallbackHandler] = None |
| |
|
| |
|
| | def get_langfuse_callback_handler() -> Optional[CallbackHandler]: |
| | """Get (or create) a singleton Langfuse callback handler for LangChain integration |
| | |
| | Best-practice (#2): Pass exactly **one** CallbackHandler into graph.invoke/stream so that |
| | every nested LLM/tool span is correlated underneath the same root span. Re-using the |
| | same instance avoids fragmenting traces when individual nodes try to create their own |
| | handler. |
| | """ |
| | global _CALLBACK_HANDLER |
| |
|
| | try: |
| | initialize_langfuse() |
| | if _CALLBACK_HANDLER is None: |
| | _CALLBACK_HANDLER = CallbackHandler() |
| | return _CALLBACK_HANDLER |
| | except Exception as e: |
| | print(f"Warning: Could not create Langfuse callback handler: {e}") |
| | return None |
| |
|
| |
|
| | def trace_agent_execution(name: str, user_id: str | None = None, session_id: str | None = None): |
| | """Context manager that opens a **root** span for the current user request. |
| | |
| | Follows Langfuse best practices (rules #2 & #3): |
| | • exactly one root span per request |
| | • attach `user_id` and `session_id` so that follow-up calls are stitched together |
| | """ |
| | try: |
| | langfuse = get_client() |
| | span_kwargs = {"name": name} |
| | |
| | span_cm = langfuse.start_as_current_span(**span_kwargs) |
| |
|
| | |
| | class _TraceWrapper: |
| | def __enter__(self): |
| | |
| | self._span = span_cm.__enter__() |
| | |
| | try: |
| | langfuse.update_current_trace( |
| | **{k: v for k, v in {"user_id": user_id, "session_id": session_id}.items() if v} |
| | ) |
| | except Exception: |
| | |
| | pass |
| | return self._span |
| |
|
| | def __exit__(self, exc_type, exc_val, exc_tb): |
| | return span_cm.__exit__(exc_type, exc_val, exc_tb) |
| |
|
| | return _TraceWrapper() |
| | except Exception as e: |
| | print(f"Warning: Could not create trace span: {e}") |
| | |
| | from contextlib import nullcontext |
| |
|
| | return nullcontext() |
| |
|
| |
|
| | def update_trace_metadata(user_id: str = None, session_id: str = None, tags: list = None, **kwargs): |
| | """Update current trace with metadata""" |
| | try: |
| | langfuse = get_client() |
| | update_args = {} |
| | |
| | if user_id: |
| | update_args["user_id"] = user_id |
| | if session_id: |
| | update_args["session_id"] = session_id |
| | if tags: |
| | update_args["tags"] = tags |
| | if kwargs: |
| | update_args.update(kwargs) |
| | |
| | langfuse.update_current_trace(**update_args) |
| | except Exception as e: |
| | print(f"Warning: Could not update trace metadata: {e}") |
| |
|
| |
|
| | def flush_langfuse(): |
| | """Flush Langfuse events (for short-lived applications)""" |
| | try: |
| | langfuse = get_client() |
| | langfuse.flush() |
| | except Exception as e: |
| | print(f"Warning: Could not flush Langfuse events: {e}") |
| |
|
| |
|
| | def shutdown_langfuse(): |
| | """Shutdown Langfuse client (for application cleanup)""" |
| | try: |
| | langfuse = get_client() |
| | langfuse.shutdown() |
| | except Exception as e: |
| | print(f"Warning: Could not shutdown Langfuse client: {e}") |
| |
|
| |
|
| | |
| | initialize_langfuse() |