| """Tracing and Observability Setup for Langfuse v3.0.0""" |
| import os |
| from typing import Optional |
| from langfuse import Langfuse, get_client |
| from langfuse.langchain import CallbackHandler |
|
|
|
|
| def initialize_langfuse() -> None: |
| """Initialize Langfuse client with proper configuration""" |
| try: |
| |
| Langfuse( |
| public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"), |
| secret_key=os.environ.get("LANGFUSE_SECRET_KEY"), |
| host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") |
| ) |
| print("Langfuse client initialized successfully") |
| except Exception as e: |
| print(f"Warning: Could not initialize Langfuse client: {e}") |
|
|
|
|
| |
| _CALLBACK_HANDLER: Optional[CallbackHandler] = None |
|
|
|
|
| def get_langfuse_callback_handler() -> Optional[CallbackHandler]: |
| """Get (or create) a singleton Langfuse callback handler for LangChain integration |
| |
| Best-practice (#2): Pass exactly **one** CallbackHandler into graph.invoke/stream so that |
| every nested LLM/tool span is correlated underneath the same root span. Re-using the |
| same instance avoids fragmenting traces when individual nodes try to create their own |
| handler. |
| """ |
| global _CALLBACK_HANDLER |
|
|
| try: |
| initialize_langfuse() |
| if _CALLBACK_HANDLER is None: |
| _CALLBACK_HANDLER = CallbackHandler() |
| return _CALLBACK_HANDLER |
| except Exception as e: |
| print(f"Warning: Could not create Langfuse callback handler: {e}") |
| return None |
|
|
|
|
| def trace_agent_execution(name: str, user_id: str | None = None, session_id: str | None = None): |
| """Context manager that opens a **root** span for the current user request. |
| |
| Follows Langfuse best practices (rules #2 & #3): |
| • exactly one root span per request |
| • attach `user_id` and `session_id` so that follow-up calls are stitched together |
| """ |
| try: |
| langfuse = get_client() |
| span_kwargs = {"name": name} |
| |
| span_cm = langfuse.start_as_current_span(**span_kwargs) |
|
|
| |
| class _TraceWrapper: |
| def __enter__(self): |
| |
| self._span = span_cm.__enter__() |
| |
| try: |
| langfuse.update_current_trace( |
| **{k: v for k, v in {"user_id": user_id, "session_id": session_id}.items() if v} |
| ) |
| except Exception: |
| |
| pass |
| return self._span |
|
|
| def __exit__(self, exc_type, exc_val, exc_tb): |
| return span_cm.__exit__(exc_type, exc_val, exc_tb) |
|
|
| return _TraceWrapper() |
| except Exception as e: |
| print(f"Warning: Could not create trace span: {e}") |
| |
| from contextlib import nullcontext |
|
|
| return nullcontext() |
|
|
|
|
| def update_trace_metadata(user_id: str = None, session_id: str = None, tags: list = None, **kwargs): |
| """Update current trace with metadata""" |
| try: |
| langfuse = get_client() |
| update_args = {} |
| |
| if user_id: |
| update_args["user_id"] = user_id |
| if session_id: |
| update_args["session_id"] = session_id |
| if tags: |
| update_args["tags"] = tags |
| if kwargs: |
| update_args.update(kwargs) |
| |
| langfuse.update_current_trace(**update_args) |
| except Exception as e: |
| print(f"Warning: Could not update trace metadata: {e}") |
|
|
|
|
| def flush_langfuse(): |
| """Flush Langfuse events (for short-lived applications)""" |
| try: |
| langfuse = get_client() |
| langfuse.flush() |
| except Exception as e: |
| print(f"Warning: Could not flush Langfuse events: {e}") |
|
|
|
|
| def shutdown_langfuse(): |
| """Shutdown Langfuse client (for application cleanup)""" |
| try: |
| langfuse = get_client() |
| langfuse.shutdown() |
| except Exception as e: |
| print(f"Warning: Could not shutdown Langfuse client: {e}") |
|
|
|
|
| |
| initialize_langfuse() |