# NOTE(review): removed non-Python web-page artifacts (Hugging Face Spaces
# header/status lines, file-size line, commit hash, and a line-number gutter
# row) that had been pasted into the source and would be a SyntaxError.
"""
Service for handling trace operations with external platforms like Langfuse
"""
import os
import json
import time
import logging
import random
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime
logger = logging.getLogger("agent_monitoring_server.services.platform.trace")
class TraceService:
    """Service for trace operations with external platforms (Langfuse).

    All public operations are stateless staticmethods that build a fresh
    Langfuse client per call from credentials in ``utils.config`` (imported
    lazily so the module loads without them).  Every method returns a plain
    dict with a ``status`` key ("success" or "error") and an ISO
    ``timestamp``, so callers never need to catch exceptions.
    """

    # Page-size policy enforced in get_trace_metadata.
    MAX_LIMIT = 50
    DEFAULT_LIMIT = 20
    # Retry budget for transient Langfuse server-side (500-style) errors.
    MAX_RETRIES = 3

    @staticmethod
    def _error_response(message: str) -> Dict[str, Any]:
        """Build the standard error payload shared by all endpoints."""
        return {
            "status": "error",
            "message": message,
            "timestamp": datetime.now().isoformat(),
        }

    @staticmethod
    def _parse_iso_date(value: Optional[str], label: str) -> Optional[datetime]:
        """Parse an ISO-8601 string; log and return None on invalid input."""
        if not value:
            return None
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            logger.warning(f"Invalid {label} format: {value}")
            return None

    @staticmethod
    def _summarize_trace(trace: Dict[str, Any]) -> Dict[str, Any]:
        """Extract the lightweight metadata exposed for a single trace dict."""
        summary: Dict[str, Any] = {
            "id": trace.get("id", "unknown"),
            "name": trace.get("name", "Unnamed Trace"),
            "timestamp": trace.get("timestamp"),
            "status": trace.get("status", "unknown"),
            "level": trace.get("level"),
            "metadata": trace.get("metadata", {}),
        }
        # Model name: prefer the first observation's model when observations
        # are present; otherwise fall back to a trace-level "model" field.
        observations = trace.get("observations")
        if isinstance(observations, list) and observations:
            first_obs = observations[0]
            summary["model"] = first_obs.get("model") if isinstance(first_obs, dict) else None
        else:
            summary["model"] = trace.get("model")
        summary["user"] = trace.get("userId")
        summary["session"] = trace.get("sessionId")
        tags = trace.get("tags")
        summary["tags"] = tags if isinstance(tags, list) else []
        return summary

    @staticmethod
    def get_trace_metadata(
        limit: int = 20,
        offset: int = 0,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Get metadata for traces without downloading full details.

        Args:
            limit: Page size; values > 50 are clamped to 50 and values <= 0
                fall back to the default of 20 (both logged).
            offset: Row offset, converted to Langfuse's 1-based page number.
            start_date: Optional ISO-8601 lower bound; invalid values are
                logged and ignored.
            end_date: Optional ISO-8601 upper bound; same handling.

        Returns:
            On success: ``{"status": "success", "traces": [...], "count": int,
            "timestamp": iso, "has_more": bool}``; otherwise an error dict.

        Retries up to 3 times with exponential backoff + jitter when the
        Langfuse server reports a 500-style error.
        """
        # Credentials are imported lazily so this module loads without them.
        from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
        if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
            return TraceService._error_response(
                "Langfuse credentials not found. Please connect to Langfuse first."
            )

        # Enforce reasonable limits.
        if limit > TraceService.MAX_LIMIT:
            logger.warning(f"Requested limit {limit} exceeds maximum of 50, using 50 instead")
            limit = TraceService.MAX_LIMIT
        elif limit <= 0:
            logger.warning(f"Invalid limit {limit}, using default of 20")
            limit = TraceService.DEFAULT_LIMIT

        # Langfuse uses 1-based page pagination rather than row offsets.
        page = (offset // limit) + 1
        logger.info(f"Fetching trace metadata (limit={limit}, page={page})")

        from_timestamp = TraceService._parse_iso_date(start_date, "start_date")
        to_timestamp = TraceService._parse_iso_date(end_date, "end_date")

        # Initialized up front so the post-loop checks are always safe
        # (the original code probed locals() for this).
        traces_data: List[Any] = []
        trace_metadata: List[Dict[str, Any]] = []

        max_retries = TraceService.MAX_RETRIES
        retry_count = 0
        while retry_count <= max_retries:
            try:
                from langfuse import Langfuse
                client = Langfuse(
                    secret_key=LANGFUSE_SECRET_KEY,
                    public_key=LANGFUSE_PUBLIC_KEY,
                    host=LANGFUSE_HOST,
                )
                # Fetch traces using page-based pagination.
                traces_response = client.fetch_traces(
                    limit=limit,
                    page=page,
                    from_timestamp=from_timestamp,
                    to_timestamp=to_timestamp,
                )

                # Normalise the SDK response into a list of plain dicts.
                from utils.fetch_langfuse_logs import convert_to_serializable
                if hasattr(traces_response, 'data'):
                    traces_data = convert_to_serializable(traces_response.data)
                elif hasattr(traces_response, 'model_dump'):
                    traces_dict = convert_to_serializable(traces_response.model_dump())
                    if isinstance(traces_dict, dict) and 'data' in traces_dict:
                        traces_data = traces_dict['data']
                    else:
                        traces_data = [traces_dict]

                trace_metadata = []
                for trace in traces_data:
                    if not isinstance(trace, dict):
                        logger.warning(f"Skipping non-dictionary trace: {trace}")
                        continue
                    trace_metadata.append(TraceService._summarize_trace(trace))

                # Success - break out of the retry loop.
                break
            except Exception as e:
                error_message = str(e)
                server_side = (
                    "500" in error_message
                    or "Internal Server Error" in error_message
                    or "Memory limit" in error_message
                )
                if not server_side:
                    # Non-transient failure: don't retry.
                    logger.error(f"Error fetching trace metadata: {error_message}")
                    return TraceService._error_response(
                        f"Error fetching trace metadata: {error_message}"
                    )
                retry_count += 1
                if retry_count > max_retries:
                    logger.error(f"Failed after {max_retries} retries: {error_message}")
                    return TraceService._error_response(
                        f"Langfuse server error after {max_retries} retries. "
                        "The service may be experiencing high load."
                    )
                # Exponential backoff with jitter before the next attempt.
                wait_time = (2 ** retry_count) + random.uniform(0, 1)
                logger.warning(
                    f"Langfuse server error, retrying in {wait_time:.2f} seconds "
                    f"(attempt {retry_count}/{max_retries})"
                )
                time.sleep(wait_time)

        if not trace_metadata:
            return TraceService._error_response("No trace data returned from Langfuse")

        # Heuristic: a full page suggests more results may be available.
        has_more = len(traces_data) >= limit
        return {
            "status": "success",
            "traces": trace_metadata,
            "count": len(trace_metadata),
            "timestamp": datetime.now().isoformat(),
            "has_more": has_more,
        }

    @staticmethod
    def get_trace_by_id(trace_id: str) -> Dict[str, Any]:
        """
        Get a specific trace by ID from Langfuse.

        Fetches the trace plus up to 100 of its observations, converts both
        to serializable form, and attaches an ``observations`` list and a
        ``trace_url`` to the trace payload.

        Returns:
            ``{"status": "success", "trace": {...}, "timestamp": iso}`` on
            success; an error dict when credentials are missing, the trace is
            not found, or conversion fails.
        """
        try:
            # Credentials are imported lazily so this module loads without them.
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return TraceService._error_response(
                    "Langfuse credentials not found. Please connect to Langfuse first."
                )

            from langfuse import Langfuse
            client = Langfuse(
                secret_key=LANGFUSE_SECRET_KEY,
                public_key=LANGFUSE_PUBLIC_KEY,
                host=LANGFUSE_HOST,
            )

            logger.info(f"Fetching trace with ID: {trace_id}")
            trace_response = client.fetch_trace(trace_id)

            logger.info(f"Fetching observations for trace: {trace_id}")
            observations_response = client.fetch_observations(trace_id=trace_id, limit=100)

            from utils.fetch_langfuse_logs import convert_to_serializable

            # Trace conversion failure is fatal for this request.
            try:
                trace_data = convert_to_serializable(trace_response)
            except Exception as e:
                logger.error(f"Error converting trace response: {str(e)}")
                return TraceService._error_response(
                    f"Error processing trace data: {str(e)}"
                )

            # Observation conversion is best-effort: a trace without its
            # observations is still useful.
            observations_data: List[Any] = []
            try:
                if hasattr(observations_response, 'data'):
                    observations_data = convert_to_serializable(observations_response.data)
                elif hasattr(observations_response, 'model_dump'):
                    observations_dict = convert_to_serializable(observations_response.model_dump())
                    if isinstance(observations_dict, dict) and 'data' in observations_dict:
                        observations_data = observations_dict['data']
            except Exception as e:
                logger.error(f"Error converting observations response: {str(e)}")

            if not trace_data:
                return TraceService._error_response(
                    f"Trace with ID {trace_id} not found"
                )

            trace_data["observations"] = observations_data
            # NOTE(review): the project slug is hard-coded as "unknown" because
            # the real project id is not available from this response — the
            # generated URL may need fixing once the project id is known.
            trace_data["trace_url"] = f"{LANGFUSE_HOST}/project/unknown/traces/{trace_id}"
            return {
                "status": "success",
                "trace": trace_data,
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error fetching trace by ID: {str(e)}")
            return TraceService._error_response(
                f"Error fetching trace by ID: {str(e)}"
            )