# Deploy AgentGraph: Complete agent monitoring and knowledge graph system (commit c2ea5ed)
"""
Service for handling trace operations with external platforms like Langfuse
"""
import os
import json
import time
import logging
import random
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime
logger = logging.getLogger("agent_monitoring_server.services.platform.trace")
class TraceService:
    """Service for trace operations with external platforms"""

    @staticmethod
    def _parse_timestamp(value: Optional[str], field_name: str) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp string into a datetime.

        Returns None (with a warning logged) when the value is missing or
        malformed, so an invalid date filter degrades to "no filter" rather
        than failing the whole request.
        """
        if not value:
            return None
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            logger.warning(f"Invalid {field_name} format: {value}")
            return None

    @staticmethod
    def _extract_metadata(trace: Dict[str, Any]) -> Dict[str, Any]:
        """Build the lightweight metadata summary for a single trace dict.

        Only safe `.get` access is used because the upstream serializer may
        omit any field.
        """
        metadata = {
            "id": trace.get("id", "unknown"),
            "name": trace.get("name", "Unnamed Trace"),
            "timestamp": trace.get("timestamp"),
            "status": trace.get("status", "unknown"),
            "level": trace.get("level"),
            "metadata": trace.get("metadata", {}),
        }
        # Model comes from the first observation when present; otherwise fall
        # back to a top-level "model" key (alternate payload shape).
        observations = trace.get("observations")
        if isinstance(observations, list) and observations:
            first_obs = observations[0]
            metadata["model"] = first_obs.get("model") if isinstance(first_obs, dict) else None
        else:
            metadata["model"] = trace.get("model")
        # Include user/session identifiers if available
        metadata["user"] = trace.get("userId")
        metadata["session"] = trace.get("sessionId")
        # Tags must be a list; anything else is normalized to empty.
        tags = trace.get("tags")
        metadata["tags"] = tags if isinstance(tags, list) else []
        return metadata

    @staticmethod
    def get_trace_metadata(
        limit: int = 20,
        offset: int = 0,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get metadata for traces without downloading full details.

        Uses retry logic (exponential backoff with jitter) for handling
        500 errors from the Langfuse server.

        Args:
            limit: Max traces per page; clamped to 50, invalid values -> 20.
            offset: Row offset, converted to Langfuse's 1-based page number.
            start_date: Optional ISO-8601 lower bound for trace timestamps.
            end_date: Optional ISO-8601 upper bound for trace timestamps.

        Returns:
            On success: {"status": "success", "traces", "count",
            "timestamp", "has_more"}. On failure: {"status": "error",
            "message", "timestamp"}.
        """
        # Credentials come from env-backed config; imported lazily so this
        # module can load even when config is not yet available.
        from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
        if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
            return {
                "status": "error",
                "message": "Langfuse credentials not found. Please connect to Langfuse first.",
                "timestamp": datetime.now().isoformat()
            }

        # Enforce reasonable limits
        if limit > 50:
            logger.warning(f"Requested limit {limit} exceeds maximum of 50, using 50 instead")
            limit = 50
        elif limit <= 0:
            logger.warning(f"Invalid limit {limit}, using default of 20")
            limit = 20

        # Langfuse uses page-based pagination (1-based), not row offsets.
        page = (offset // limit) + 1
        logger.info(f"Fetching trace metadata (limit={limit}, page={page})")

        # Optional timestamp filters; invalid input is logged and ignored.
        from_timestamp = TraceService._parse_timestamp(start_date, "start_date")
        to_timestamp = TraceService._parse_timestamp(end_date, "end_date")

        # Retry loop for transient server-side failures. Initialized up
        # front so the post-loop checks never depend on locals() sniffing.
        max_retries = 3
        retry_count = 0
        traces_data: List[Any] = []
        trace_metadata: List[Dict[str, Any]] = []
        while retry_count <= max_retries:
            try:
                # Initialize Langfuse client (lazy import keeps langfuse optional)
                from langfuse import Langfuse
                client = Langfuse(
                    secret_key=LANGFUSE_SECRET_KEY,
                    public_key=LANGFUSE_PUBLIC_KEY,
                    host=LANGFUSE_HOST
                )
                # Fetch traces using page-based pagination
                traces_response = client.fetch_traces(
                    limit=limit,
                    page=page,
                    from_timestamp=from_timestamp,
                    to_timestamp=to_timestamp
                )
                # Convert response to a serializable format; the response may
                # expose .data directly or require a model_dump round-trip.
                from utils.fetch_langfuse_logs import convert_to_serializable
                traces_data = []
                if hasattr(traces_response, 'data'):
                    traces_data = convert_to_serializable(traces_response.data)
                elif hasattr(traces_response, 'model_dump'):
                    traces_dict = convert_to_serializable(traces_response.model_dump())
                    if isinstance(traces_dict, dict) and 'data' in traces_dict:
                        traces_data = traces_dict['data']
                    else:
                        traces_data = [traces_dict]
                # Reduce each trace to the metadata fields we expose.
                trace_metadata = []
                for trace in traces_data:
                    if not isinstance(trace, dict):
                        logger.warning(f"Skipping non-dictionary trace: {trace}")
                        continue
                    trace_metadata.append(TraceService._extract_metadata(trace))
                # Success - break out of retry loop
                break
            except Exception as e:
                error_message = str(e)
                # Only server-side failures (500 / memory pressure) are retried.
                if "500" in error_message or "Internal Server Error" in error_message or "Memory limit" in error_message:
                    retry_count += 1
                    if retry_count <= max_retries:
                        # Exponential backoff with jitter
                        wait_time = (2 ** retry_count) + random.uniform(0, 1)
                        logger.warning(f"Langfuse server error, retrying in {wait_time:.2f} seconds (attempt {retry_count}/{max_retries})")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"Failed after {max_retries} retries: {error_message}")
                        return {
                            "status": "error",
                            "message": f"Langfuse server error after {max_retries} retries. The service may be experiencing high load.",
                            "timestamp": datetime.now().isoformat()
                        }
                else:
                    # For non-500 errors, don't retry
                    logger.error(f"Error fetching trace metadata: {error_message}")
                    return {
                        "status": "error",
                        "message": f"Error fetching trace metadata: {error_message}",
                        "timestamp": datetime.now().isoformat()
                    }

        # Empty result (nothing returned, or every entry was malformed).
        if not trace_metadata:
            return {
                "status": "error",
                "message": "No trace data returned from Langfuse",
                "timestamp": datetime.now().isoformat()
            }

        # A full page suggests more traces may be available; a short page
        # means we've reached the end.
        has_more = len(traces_data) >= limit
        return {
            "status": "success",
            "traces": trace_metadata,
            "count": len(trace_metadata),
            "timestamp": datetime.now().isoformat(),
            "has_more": has_more
        }

    @staticmethod
    def get_trace_by_id(trace_id: str) -> Dict[str, Any]:
        """
        Get a specific trace by ID from Langfuse, with its observations.

        Args:
            trace_id: The Langfuse trace identifier.

        Returns:
            On success: {"status": "success", "trace", "timestamp"} where
            the trace dict carries "observations" and a "trace_url". On
            failure: {"status": "error", "message", "timestamp"}.
        """
        try:
            # Credentials come from env-backed config (lazy import, as above).
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return {
                    "status": "error",
                    "message": "Langfuse credentials not found. Please connect to Langfuse first.",
                    "timestamp": datetime.now().isoformat()
                }
            # Initialize Langfuse client
            from langfuse import Langfuse
            client = Langfuse(
                secret_key=LANGFUSE_SECRET_KEY,
                public_key=LANGFUSE_PUBLIC_KEY,
                host=LANGFUSE_HOST
            )
            # Fetch trace with all details
            logger.info(f"Fetching trace with ID: {trace_id}")
            trace_response = client.fetch_trace(trace_id)
            # Get observations for this trace
            logger.info(f"Fetching observations for trace: {trace_id}")
            observations_response = client.fetch_observations(trace_id=trace_id, limit=100)
            # Convert response to serializable format
            from utils.fetch_langfuse_logs import convert_to_serializable
            trace_data = None
            observations_data: List[Any] = []
            # Convert trace data; a conversion failure is fatal for this call.
            try:
                trace_data = convert_to_serializable(trace_response)
            except Exception as e:
                logger.error(f"Error converting trace response: {str(e)}")
                return {
                    "status": "error",
                    "message": f"Error processing trace data: {str(e)}",
                    "timestamp": datetime.now().isoformat()
                }
            # Convert observations data; failures here are non-fatal and the
            # trace is returned without observations.
            try:
                if hasattr(observations_response, 'data'):
                    observations_data = convert_to_serializable(observations_response.data)
                elif hasattr(observations_response, 'model_dump'):
                    observations_dict = convert_to_serializable(observations_response.model_dump())
                    if isinstance(observations_dict, dict) and 'data' in observations_dict:
                        observations_data = observations_dict['data']
            except Exception as e:
                logger.error(f"Error converting observations response: {str(e)}")
                # Continue even if observations conversion fails
            # Attach observations and a UI link to the trace payload.
            if trace_data:
                trace_data["observations"] = observations_data
                # NOTE(review): project slug is not known here, so the URL
                # uses a placeholder segment — confirm against the Langfuse UI.
                trace_url = f"{LANGFUSE_HOST}/project/unknown/traces/{trace_id}"
                trace_data["trace_url"] = trace_url
                return {
                    "status": "success",
                    "trace": trace_data,
                    "timestamp": datetime.now().isoformat()
                }
            else:
                return {
                    "status": "error",
                    "message": f"Trace with ID {trace_id} not found",
                    "timestamp": datetime.now().isoformat()
                }
        except Exception as e:
            logger.error(f"Error fetching trace by ID: {str(e)}")
            return {
                "status": "error",
                "message": f"Error fetching trace by ID: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }