| """Langfuse tracking integration for GAIA Benchmark Agent. |
| |
| This module provides decorators and context managers for tracking: |
| - Agent execution sessions |
| - LLM invocations |
| - Tool calls |
| - Question processing |
| """ |
|
|
| import os |
| import functools |
| import time |
| from typing import Any, Optional, Dict |
| from contextlib import contextmanager |
|
|
| |
# NOTE(review): this module-level `langfuse` name is never read or reassigned
# anywhere in this file — it looks like dead code, or a placeholder for
# callers doing `from <module> import langfuse`; confirm before removing.
langfuse = None


# Optional dependency: when the langfuse package is missing, tracking
# degrades to a no-op (every decorator below checks `tracker.enabled`
# before wrapping) instead of breaking the agent at import time.
try:
    from langfuse import Langfuse, observe
    LANGFUSE_AVAILABLE = True
except ImportError:
    LANGFUSE_AVAILABLE = False
    print("[INFO] Langfuse not installed. Tracking is disabled. Install with: pip install langfuse")
|
|
|
|
class LangfuseTracker:
    """Singleton that manages the shared Langfuse client and tracking state.

    Every ``LangfuseTracker()`` call returns the same instance. The client
    is built from the ``LANGFUSE_*`` environment variables; configuration is
    retried on each instantiation until a client exists, so credentials set
    after import are still picked up.
    """

    _instance = None          # the one shared instance
    _client = None            # Langfuse client; None until configured
    _enabled = False          # True only once a client has been created
    _warned_disabled = False  # ensures the "disabled" notice prints only once

    def __new__(cls):
        # Standard singleton: create on first call, reuse afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize the Langfuse client if not already initialized.

        No-op when langfuse is not installed or a client already exists.
        """
        if self._client is None and LANGFUSE_AVAILABLE:
            public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
            secret_key = os.getenv("LANGFUSE_SECRET_KEY")
            host = os.getenv("LANGFUSE_HOST", "https://us.cloud.langfuse.com")

            if public_key and secret_key:
                self._client = Langfuse(
                    public_key=public_key,
                    secret_key=secret_key,
                    host=host
                )
                self._enabled = True
                print(f"[LANGFUSE] Tracking enabled (host: {host})")
            elif not LangfuseTracker._warned_disabled:
                # Without this guard, re-instantiating the singleton with
                # missing credentials re-printed this notice every time.
                LangfuseTracker._warned_disabled = True
                print("[LANGFUSE] Tracking disabled. Set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY to enable.")

    @property
    def enabled(self) -> bool:
        """True when a client was built and the langfuse package is importable."""
        return self._enabled and LANGFUSE_AVAILABLE

    @property
    def client(self):
        """The shared Langfuse client, or None when tracking is disabled."""
        return self._client if self.enabled else None
|
|
|
|
| |
# Module-level singleton used by every decorator and context manager below.
tracker = LangfuseTracker()
|
|
|
|
def track_agent_execution(agent_type: str):
    """Decorator to track agent execution lifecycle.

    Args:
        agent_type: Type of agent (e.g., "LangGraph", "ReAct", "LlamaIndex")

    Usage:
        @track_agent_execution("LangGraph")
        def __call__(self, question: str, file_name: str = None) -> str:
            ...
    """
    def decorator(func):
        # Decided at decoration time: when tracking is disabled the
        # function is returned untouched, with zero runtime overhead.
        if not tracker.enabled:
            return func

        @observe(name=f"{agent_type}_Agent_Execution")
        @functools.wraps(func)
        def wrapper(self, question: str, file_name: str = None, *args, **kwargs):
            # Record inputs before execution (question truncated to keep
            # span payloads small).
            if tracker.client:
                tracker.client.update_current_span(
                    metadata={
                        "agent_type": agent_type,
                        "has_file": file_name is not None,
                        "file_name": file_name or "none",
                        "question_length": len(question)
                    },
                    input={"question": question[:500], "file_name": file_name}
                )

            start_time = time.time()
            try:
                result = func(self, question, file_name, *args, **kwargs)

                if tracker.client:
                    # Coerce to str first: the original called
                    # result.startswith(...) directly, which raised
                    # AttributeError inside the tracking wrapper whenever
                    # the agent returned a non-string result.
                    answer = str(result)
                    tracker.client.update_current_span(
                        output={"answer": answer[:500]},
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "success": not answer.startswith("Error:")
                        }
                    )
                return result
            except Exception as e:
                # Mark the span as failed, then propagate the exception.
                if tracker.client:
                    tracker.client.update_current_span(
                        level="ERROR",
                        status_message=str(e),
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "error": str(e)
                        }
                    )
                raise

        return wrapper
    return decorator
|
|
|
|
def track_llm_call(model_name: str):
    """Decorator to track LLM invocations.

    Args:
        model_name: Name of the LLM model being called

    Usage:
        @track_llm_call("gemini-1.5-flash")
        def _assistant(self, state):
            response = self.llm_client_with_tools.invoke(state["messages"])
            ...
    """
    def decorator(func):
        # Tracking disabled at decoration time -> hand back the function
        # unwrapped so calls carry no overhead.
        if not tracker.enabled:
            return func

        @observe(as_type="generation", name=f"LLM_Call_{model_name}")
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started_at = time.time()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                # Attach failure details to the generation, then re-raise
                # so the caller still sees the original exception.
                if tracker.client:
                    tracker.client.update_current_generation(
                        level="ERROR",
                        status_message=str(exc),
                        metadata={
                            "latency_seconds": time.time() - started_at,
                            "error": str(exc)
                        }
                    )
                raise
            else:
                # Success: record the model name and observed latency.
                if tracker.client:
                    tracker.client.update_current_generation(
                        model=model_name,
                        metadata={"latency_seconds": time.time() - started_at}
                    )
                return result

        return wrapper
    return decorator
|
|
|
|
def track_tool_call(tool_name: str):
    """Decorator to track tool/function calls.

    Args:
        tool_name: Name of the tool being called

    Usage:
        @track_tool_call("websearch")
        def websearch(query: str, num_results: int = 5):
            ...
    """
    def decorator(func):
        # Decoration-time check: no wrapper overhead when tracking is off.
        if not tracker.enabled:
            return func

        @observe(name=f"Tool_{tool_name}")
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if tracker.client:
                tracker.client.update_current_span(
                    input={
                        "tool": tool_name,
                        # Stringify-and-truncate positional args the same
                        # way kwargs are handled below: the original logged
                        # raw objects, which may not be serializable by the
                        # tracing backend.
                        "args": [str(a)[:100] for a in args[:3]],
                        "kwargs": {k: str(v)[:100] for k, v in list(kwargs.items())[:5]}
                    },
                    metadata={"tool_name": tool_name}
                )

            start_time = time.time()
            try:
                result = func(*args, **kwargs)

                result_str = str(result)
                if tracker.client:
                    tracker.client.update_current_span(
                        output={
                            "result_preview": result_str[:500],
                            "result_length": len(result_str)
                        },
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "success": True
                        }
                    )

                return result
            except Exception as e:
                # Record the failure on the span, then re-raise so callers
                # still see the original exception.
                if tracker.client:
                    tracker.client.update_current_span(
                        level="ERROR",
                        status_message=str(e),
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "error": str(e)
                        }
                    )
                raise

        return wrapper
    return decorator
|
|
|
|
@contextmanager
def track_session(session_name: str, metadata: Optional[Dict[str, Any]] = None):
    """Context manager to track a complete session (batch processing).

    Args:
        session_name: Name of the session (e.g., "Test_Run", "Full_Submission")
        metadata: Optional metadata dict

    Usage:
        with track_session("Test_Run", {"agent": "LangGraph", "questions": 20}):
            # Run agent on questions
            ...
    """
    if not tracker.enabled:
        yield
        return

    session_span = tracker.client.start_as_current_span(
        name=session_name,
        metadata=metadata or {},
    )
    with session_span as span:
        try:
            yield span
        finally:
            # Push buffered events to the backend before leaving the session.
            if tracker.client:
                tracker.client.flush()
|
|
|
|
@contextmanager
def track_question_processing(task_id: str, question: str):
    """Context manager to track individual question processing.

    Args:
        task_id: Unique task identifier
        question: Question text

    Usage:
        with track_question_processing(task_id, question_text) as span:
            answer = agent(question_text)
            span.update(output={"answer": answer})
    """
    if not tracker.enabled:
        yield None
        return

    # The span name carries only a short prefix of the task id; the full id
    # is kept in the input/metadata payloads (question text truncated).
    with tracker.client.start_as_current_span(
        name=f"Question_{task_id[:8]}",
        input={"task_id": task_id, "question": question[:300]},
        metadata={"task_id": task_id},
    ) as span:
        yield span
|
|
|
|
| |
def create_span(name: str, input_data: Optional[Dict] = None, metadata: Optional[Dict] = None):
    """Create a manual span for tracking custom operations.

    Args:
        name: Span name
        input_data: Input data dict
        metadata: Metadata dict

    Returns:
        Span context manager or None if tracking disabled
    """
    client = tracker.client
    # tracker.client is None exactly when tracking is disabled.
    if client is None:
        return None

    return client.start_span(name=name, input=input_data, metadata=metadata)
|
|