# Source: Final_Assignment_Template / langfuse_tracking.py
# Author: hemantvirmani
# Commit 38bdec7: "Fixing langfuse issues"
"""Langfuse tracking integration for GAIA Benchmark Agent.
This module provides decorators and context managers for tracking:
- Agent execution sessions
- LLM invocations
- Tool calls
- Question processing
"""
import os
import functools
import time
from typing import Any, Optional, Dict
from contextlib import contextmanager
# Langfuse is an optional dependency: probe for it once at import time and
# record the outcome in LANGFUSE_AVAILABLE.
langfuse = None

try:
    from langfuse import Langfuse, observe
except ImportError:
    # SDK missing: leave Langfuse/observe unbound and flag tracking as off.
    LANGFUSE_AVAILABLE = False
    print("[INFO] Langfuse not installed. Tracking is disabled. Install with: pip install langfuse")
else:
    LANGFUSE_AVAILABLE = True
class LangfuseTracker:
    """Shared holder for the Langfuse client and the tracking on/off flag.

    ``__new__`` always hands back the same object, so every
    ``LangfuseTracker()`` expression refers to one tracker. ``__init__`` still
    runs on each construction and retries client creation while no client
    exists.
    """

    _instance = None
    _client = None
    _enabled = False

    def __new__(cls):
        # Create the single shared instance lazily, then keep reusing it.
        shared = cls._instance
        if shared is None:
            shared = super().__new__(cls)
            cls._instance = shared
        return shared

    def __init__(self):
        """Create the Langfuse client from environment variables when possible.

        Reads ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY`` and
        ``LANGFUSE_HOST`` (default ``https://us.cloud.langfuse.com``). Nothing
        happens if a client already exists or the SDK import failed.
        """
        if self._client is not None or not LANGFUSE_AVAILABLE:
            return

        pub = os.getenv("LANGFUSE_PUBLIC_KEY")
        sec = os.getenv("LANGFUSE_SECRET_KEY")
        endpoint = os.getenv("LANGFUSE_HOST", "https://us.cloud.langfuse.com")

        if not (pub and sec):
            print("[LANGFUSE] Tracking disabled. Set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY to enable.")
            return

        self._client = Langfuse(
            public_key=pub,
            secret_key=sec,
            host=endpoint,
        )
        self._enabled = True
        print(f"[LANGFUSE] Tracking enabled (host: {endpoint})")

    @property
    def enabled(self) -> bool:
        """Whether a client was created and the Langfuse SDK is importable."""
        return self._enabled and LANGFUSE_AVAILABLE

    @property
    def client(self):
        """The Langfuse client, or ``None`` while tracking is not enabled."""
        if self.enabled:
            return self._client
        return None
# Global tracker instance shared by the decorators and context managers in
# this module. Constructing it here runs LangfuseTracker.__init__ at import
# time, which is when the LANGFUSE_* environment variables are read.
tracker = LangfuseTracker()
def track_agent_execution(agent_type: str):
    """Decorator to track agent execution lifecycle.

    The decorated method is wrapped in a Langfuse ``observe`` span named
    ``"{agent_type}_Agent_Execution"``. The span records the (truncated)
    question, the file name, the (truncated) answer, the wall-clock duration
    and a ``success`` flag that is ``False`` when the answer text starts with
    ``"Error:"``. Exceptions are recorded on the span and re-raised unchanged.

    Whether to track is decided once, when the decorator is applied: if the
    tracker is disabled at that moment the original function is returned
    untouched.

    Args:
        agent_type: Type of agent (e.g., "LangGraph", "ReAct", "LlamaIndex")

    Returns:
        A decorator for methods with the shape
        ``(self, question, file_name=None, ...)``. ``file_name`` is always
        forwarded positionally, so the wrapped method must accept it.

    Usage:
        @track_agent_execution("LangGraph")
        def __call__(self, question: str, file_name: str = None) -> str:
            ...
    """

    def decorator(func):
        if not tracker.enabled:
            return func

        @observe(name=f"{agent_type}_Agent_Execution")
        @functools.wraps(func)
        def wrapper(self, question: str, file_name: str = None, *args, **kwargs):
            # Add metadata to current observation
            if tracker.client:
                tracker.client.update_current_span(
                    metadata={
                        "agent_type": agent_type,
                        "has_file": file_name is not None,
                        "file_name": file_name or "none",
                        "question_length": len(question),
                    },
                    input={"question": question[:500], "file_name": file_name},  # Limit question length
                )

            start_time = time.time()
            try:
                result = func(self, question, file_name, *args, **kwargs)

                # Update with output and success metrics
                if tracker.client:
                    # Agents may return non-string answers (None, numbers, ...).
                    # Inspect the text form so the bookkeeping can never raise
                    # and turn a successful call into a failure.
                    answer_text = str(result)
                    tracker.client.update_current_span(
                        output={"answer": answer_text[:500]},  # Limit answer length
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "success": not answer_text.startswith("Error:"),
                        },
                    )
                return result
            except Exception as e:
                # Track errors, then let the caller see the original exception
                if tracker.client:
                    tracker.client.update_current_span(
                        level="ERROR",
                        status_message=str(e),
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "error": str(e),
                        },
                    )
                raise

        return wrapper

    return decorator
def track_llm_call(model_name: str):
    """Decorator that records an LLM invocation as a Langfuse generation.

    The wrapped callable runs inside an ``observe`` observation of type
    ``"generation"`` named ``"LLM_Call_{model_name}"``. On success the model
    name and latency are attached; on failure the error is attached with level
    ``"ERROR"`` and the exception is re-raised. If the tracker is disabled
    when the decorator is applied, the function is returned unwrapped.

    Args:
        model_name: Name of the LLM model being called

    Usage:
        @track_llm_call("gemini-1.5-flash")
        def _assistant(self, state):
            response = self.llm_client_with_tools.invoke(state["messages"])
            ...
    """

    def decorator(func):
        if tracker.enabled:

            @observe(as_type="generation", name=f"LLM_Call_{model_name}")
            @functools.wraps(func)
            def traced(*call_args, **call_kwargs):
                began = time.time()
                try:
                    response = func(*call_args, **call_kwargs)
                    # Attach model name and latency to the current generation
                    if tracker.client:
                        timing = {"latency_seconds": time.time() - began}
                        tracker.client.update_current_generation(
                            model=model_name,
                            metadata=timing,
                        )
                    return response
                except Exception as exc:
                    if tracker.client:
                        failure = {
                            "latency_seconds": time.time() - began,
                            "error": str(exc),
                        }
                        tracker.client.update_current_generation(
                            level="ERROR",
                            status_message=str(exc),
                            metadata=failure,
                        )
                    raise

            return traced

        # Tracking is off: hand the function back untouched.
        return func

    return decorator
def track_tool_call(tool_name: str):
    """Decorator to track tool/function calls.

    The wrapped callable runs inside a Langfuse ``observe`` span named
    ``"Tool_{tool_name}"``. The span records a bounded preview of the inputs
    (at most 3 positional and 5 keyword arguments, each rendered with
    ``str()`` and cut to 100 characters), a 500-character preview of the
    result plus its full text length, and the execution time. Exceptions are
    recorded on the span with level ``"ERROR"`` and re-raised unchanged.

    If the tracker is disabled when the decorator is applied, the function is
    returned unwrapped.

    Args:
        tool_name: Name of the tool being called

    Usage:
        @track_tool_call("websearch")
        def websearch(query: str, num_results: int = 5):
            ...
    """

    def decorator(func):
        if not tracker.enabled:
            return func

        @observe(name=f"Tool_{tool_name}")
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Capture input parameters
            if tracker.client:
                tracker.client.update_current_span(
                    input={
                        "tool": tool_name,
                        # Positional args get the same treatment as kwargs:
                        # stringified and truncated, so a large payload passed
                        # positionally is not shipped in full with the span.
                        "args": [str(a)[:100] for a in args[:3]],  # Limit args
                        "kwargs": {k: str(v)[:100] for k, v in list(kwargs.items())[:5]},  # Limit kwargs
                    },
                    metadata={"tool_name": tool_name},
                )

            start_time = time.time()
            try:
                result = func(*args, **kwargs)

                # Track output
                result_str = str(result)
                if tracker.client:
                    tracker.client.update_current_span(
                        output={
                            "result_preview": result_str[:500],
                            "result_length": len(result_str),
                        },
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "success": True,
                        },
                    )
                return result
            except Exception as e:
                if tracker.client:
                    tracker.client.update_current_span(
                        level="ERROR",
                        status_message=str(e),
                        metadata={
                            "execution_time_seconds": time.time() - start_time,
                            "error": str(e),
                        },
                    )
                raise

        return wrapper

    return decorator
@contextmanager
def track_session(session_name: str, metadata: Optional[Dict[str, Any]] = None):
    """Context manager to track a complete session (batch processing).

    Opens a span named ``session_name`` via the client's
    ``start_as_current_span`` and yields it. Once the span's context has
    closed, the client is flushed, so the flush happens after the session span
    itself has ended. The flush runs even if the body raises; the exception
    then propagates unchanged.

    When tracking is disabled the body simply runs and ``None`` is yielded.

    Args:
        session_name: Name of the session (e.g., "Test_Run", "Full_Submission")
        metadata: Optional metadata dict

    Yields:
        The object produced by ``start_as_current_span``, or ``None`` when
        tracking is disabled.

    Usage:
        with track_session("Test_Run", {"agent": "LangGraph", "questions": 20}):
            # Run agent on questions
            ...
    """
    if not tracker.enabled:
        yield
        return

    client = tracker.client
    try:
        # Use start_as_current_span to create a root span/trace for the session
        with client.start_as_current_span(
            name=session_name,
            metadata=metadata or {},
        ) as span:
            yield span
    finally:
        # Flush only after the `with` above has exited: flushing while the
        # session span is still open would leave that span out of the flush.
        if client:
            client.flush()
class _NoOpSpan:
    """Falsy stand-in yielded when tracking is disabled.

    Any public method called on it accepts arbitrary arguments and does
    nothing, so ``span.update(...)`` is safe without a guard. It is falsy so
    callers that already write ``if span:`` keep skipping their tracking code.
    """

    def __bool__(self):
        return False

    def _ignore(self, *args, **kwargs):
        return None

    def __getattr__(self, name):
        # Only invoked for attributes that do not exist. Keep private/dunder
        # lookups failing normally so introspection is not confused.
        if name.startswith("_"):
            raise AttributeError(name)
        return self._ignore


@contextmanager
def track_question_processing(task_id: str, question: str):
    """Context manager to track individual question processing.

    Opens a span named ``"Question_"`` plus the first 8 characters of
    ``task_id``, with the task id and the first 300 characters of the question
    as input, and yields it.

    When tracking is disabled a falsy no-op span is yielded instead, so the
    usage below works in both modes without checking the span first.

    Args:
        task_id: Unique task identifier
        question: Question text

    Yields:
        The object produced by the client's ``start_as_current_span``, or a
        falsy no-op span when tracking is disabled.

    Usage:
        with track_question_processing(task_id, question_text) as span:
            answer = agent(question_text)
            span.update(output={"answer": answer})
    """
    if not tracker.enabled:
        yield _NoOpSpan()
        return

    with tracker.client.start_as_current_span(
        name=f"Question_{task_id[:8]}",
        input={"task_id": task_id, "question": question[:300]},
        metadata={"task_id": task_id},
    ) as span:
        yield span
# Convenience helper for opening a span by hand
def create_span(name: str, input_data: Optional[Dict] = None, metadata: Optional[Dict] = None):
    """Start a manual span for tracking a custom operation.

    Args:
        name: Span name
        input_data: Input data dict, forwarded as the span's ``input``
        metadata: Metadata dict

    Returns:
        Whatever the client's ``start_span`` returns, or ``None`` if tracking
        is disabled.

    NOTE(review): unlike the ``start_as_current_span`` calls elsewhere in this
    module, this is not used in a ``with`` block here; the caller presumably
    has to end the returned span itself -- confirm against the Langfuse SDK.
    """
    if tracker.enabled:
        return tracker.client.start_span(
            name=name,
            input=input_data,
            metadata=metadata,
        )
    return None