"""Tracing and Observability Setup for Langfuse v3.0.0"""

import os
from collections.abc import Iterator
from contextlib import ExitStack, contextmanager
from typing import Any, Optional

from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler


def initialize_langfuse() -> None:
    """Construct the Langfuse client from environment variables.

    Reads ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY`` and
    ``LANGFUSE_HOST`` (falling back to ``https://cloud.langfuse.com``).
    The constructed instance is not kept here; the other helpers in this
    module obtain the client through ``get_client()``.

    Any exception raised while constructing the client is reported on stdout
    and swallowed, so a tracing misconfiguration cannot stop the application.
    """
    try:
        Langfuse(
            public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
            secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
            host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        )
        print("Langfuse client initialized successfully")
    except Exception as e:
        print(f"Warning: Could not initialize Langfuse client: {e}")


# Module-level (process-wide) cache for the LangChain callback handler, so
# every caller shares the same instance instead of creating its own.
_CALLBACK_HANDLER: Optional[CallbackHandler] = None


def get_langfuse_callback_handler() -> Optional[CallbackHandler]:
    """Return the shared Langfuse ``CallbackHandler``, creating it on first use.

    Best-practice (#2): pass exactly **one** CallbackHandler into
    graph.invoke/stream so that every nested LLM/tool span is correlated
    underneath the same root span. Re-using the same instance avoids
    fragmenting traces when individual nodes try to create their own handler.

    Returns:
        The cached handler, or ``None`` if it could not be created. A failed
        attempt is not cached, so the next call tries again.
    """
    global _CALLBACK_HANDLER  # noqa: PLW0603 – module-level singleton is intentional

    # Fast path: do not re-run client initialization (and its log line) once
    # the handler exists.
    if _CALLBACK_HANDLER is not None:
        return _CALLBACK_HANDLER

    try:
        initialize_langfuse()
        _CALLBACK_HANDLER = CallbackHandler()
    except Exception as e:
        print(f"Warning: Could not create Langfuse callback handler: {e}")
        return None
    return _CALLBACK_HANDLER


@contextmanager
def trace_agent_execution(
    name: str,
    user_id: str | None = None,
    session_id: str | None = None,
) -> Iterator[Any]:
    """Context manager that opens a **root** span for the current user request.

    Follows Langfuse best practices (rules #2 & #3):
      • exactly one root span per request
      • attach `user_id` and `session_id` so that follow-up calls are
        stitched together

    Args:
        name: Name given to the span.
        user_id: Attached to the current trace when truthy.
        session_id: Attached to the current trace when truthy.

    Yields:
        The object returned by entering the Langfuse span, or ``None`` when
        the span could not be opened (tracing then degrades to a no-op).

    Tracing setup failures are printed as warnings and never raised.
    Exceptions raised by the caller's ``with`` body are forwarded to the
    span's ``__exit__`` and then propagate as usual.
    """
    # ExitStack lets us enter the span inside a guarded ``try`` while still
    # guaranteeing that its ``__exit__`` receives any exception raised by the
    # caller's ``with`` body.
    with ExitStack() as stack:
        span = None
        try:
            langfuse = get_client()
            span = stack.enter_context(langfuse.start_as_current_span(name=name))
        except Exception as e:
            # Gracefully degrade – the caller's block still runs, untraced.
            print(f"Warning: Could not create trace span: {e}")
        else:
            # Enrich the trace right after the span is open; only truthy
            # values are sent.
            trace_attrs = {
                key: value
                for key, value in (("user_id", user_id), ("session_id", session_id))
                if value
            }
            try:
                langfuse.update_current_trace(**trace_attrs)
            except Exception as e:
                # Tracing must never break business logic, but the failure
                # should at least be visible.
                print(f"Warning: Could not update trace metadata: {e}")
        yield span


def update_trace_metadata(
    user_id: str | None = None,
    session_id: str | None = None,
    tags: list | None = None,
    **kwargs,
) -> None:
    """Update the current trace with metadata.

    Args:
        user_id: Forwarded as ``user_id`` when truthy.
        session_id: Forwarded as ``session_id`` when truthy.
        tags: Forwarded as ``tags`` when truthy.
        **kwargs: Forwarded unchanged to ``update_current_trace``.

    Any failure is printed as a warning and swallowed.
    """
    try:
        langfuse = get_client()
        update_args = {
            key: value
            for key, value in (
                ("user_id", user_id),
                ("session_id", session_id),
                ("tags", tags),
            )
            if value
        }
        update_args.update(kwargs)
        langfuse.update_current_trace(**update_args)
    except Exception as e:
        print(f"Warning: Could not update trace metadata: {e}")


def flush_langfuse() -> None:
    """Flush Langfuse events (for short-lived applications).

    Any failure is printed as a warning and swallowed.
    """
    try:
        get_client().flush()
    except Exception as e:
        print(f"Warning: Could not flush Langfuse events: {e}")


def shutdown_langfuse() -> None:
    """Shutdown Langfuse client (for application cleanup).

    Any failure is printed as a warning and swallowed.
    """
    try:
        get_client().shutdown()
    except Exception as e:
        print(f"Warning: Could not shutdown Langfuse client: {e}")


# Initialize Langfuse on module import
initialize_langfuse()