"""
Service for handling trace operations with external platforms like Langfuse
"""

import os
import json
import time
import logging
import random
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime

logger = logging.getLogger("agent_monitoring_server.services.platform.trace")


class TraceService:
    """Service for trace operations with external platforms"""

    # Upper bound applied to the caller-supplied page size.
    MAX_LIMIT = 50
    # Page size used when the caller supplies a non-positive limit.
    DEFAULT_LIMIT = 20
    # Number of retries after the first failed attempt (so 4 attempts in total).
    MAX_RETRIES = 3

    @staticmethod
    def _error_response(message: str) -> Dict[str, Any]:
        """Build the error payload shape returned by every public method."""
        return {
            "status": "error",
            "message": message,
            "timestamp": datetime.now().isoformat(),
        }

    @staticmethod
    def _normalize_limit(limit: int) -> int:
        """Clamp *limit* to ``1..MAX_LIMIT``; non-positive values become the default."""
        if limit > TraceService.MAX_LIMIT:
            logger.warning(
                "Requested limit %s exceeds maximum of %s, using %s instead",
                limit,
                TraceService.MAX_LIMIT,
                TraceService.MAX_LIMIT,
            )
            return TraceService.MAX_LIMIT
        if limit <= 0:
            logger.warning(
                "Invalid limit %s, using default of %s",
                limit,
                TraceService.DEFAULT_LIMIT,
            )
            return TraceService.DEFAULT_LIMIT
        return limit

    @staticmethod
    def _page_for_offset(offset: int, limit: int) -> int:
        """Convert a row offset into a 1-based page number.

        Negative offsets are treated as 0 so the result is never below 1.
        *limit* must already be positive (see ``_normalize_limit``).
        """
        return (max(offset, 0) // limit) + 1

    @staticmethod
    def _parse_iso_timestamp(value: Optional[str], field_name: str) -> Optional[datetime]:
        """Parse an ISO-8601 string, returning ``None`` when absent or invalid.

        A trailing ``Z`` is rewritten to ``+00:00`` because
        ``datetime.fromisoformat`` only accepts the ``Z`` suffix from
        Python 3.11 onwards. Invalid values are logged and ignored so the
        query simply runs without that filter.
        """
        if not value:
            return None
        text = str(value).strip()
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        try:
            return datetime.fromisoformat(text)
        except (ValueError, TypeError):
            logger.warning("Invalid %s format: %s", field_name, value)
            return None

    @staticmethod
    def _is_server_error(message: str) -> bool:
        """Return True when an exception message looks like a retryable server failure."""
        markers = ("500", "Internal Server Error", "Memory limit")
        return any(marker in message for marker in markers)

    @staticmethod
    def _extract_items(response: Any, to_serializable: Any, wrap_bare_payload: bool = False) -> Any:
        """Pull the serializable ``data`` payload out of an SDK response.

        Args:
            response: Object returned by the SDK; either exposes ``data``
                or ``model_dump()``.
            to_serializable: Callable converting SDK objects to plain data.
            wrap_bare_payload: When the dumped payload has no ``data`` key,
                return ``[payload]`` instead of an empty list.

        Returns:
            The converted payload, or ``[]`` when nothing usable was found.
        """
        items: Any = []
        if hasattr(response, "data"):
            items = to_serializable(response.data)
        elif hasattr(response, "model_dump"):
            payload = to_serializable(response.model_dump())
            if isinstance(payload, dict) and "data" in payload:
                items = payload["data"]
            elif wrap_bare_payload:
                items = [payload]
        # A missing payload is treated as "no items" rather than crashing later.
        return items if items is not None else []

    @staticmethod
    def _summarize_trace(trace: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce a full trace dictionary to the metadata fields exposed to callers."""
        summary = {
            "id": trace.get("id", "unknown"),
            "name": trace.get("name", "Unnamed Trace"),
            "timestamp": trace.get("timestamp"),
            "status": trace.get("status", "unknown"),
            "level": trace.get("level"),
            "metadata": trace.get("metadata", {}),
        }

        # Prefer the model of the first observation; fall back to the
        # trace-level field only when there are no observations at all.
        observations = trace.get("observations")
        if isinstance(observations, list) and observations:
            first_obs = observations[0]
            summary["model"] = first_obs.get("model") if isinstance(first_obs, dict) else None
        else:
            summary["model"] = trace.get("model")

        summary["user"] = trace.get("userId")
        summary["session"] = trace.get("sessionId")

        tags = trace.get("tags")
        summary["tags"] = tags if isinstance(tags, list) else []
        return summary

    @staticmethod
    def get_trace_metadata(
        limit: int = 20,
        offset: int = 0,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get metadata for traces without downloading full details

        Uses retry logic (exponential backoff with jitter) for handling
        500 errors from the server.

        Args:
            limit: Page size; values above 50 are capped and non-positive
                values fall back to 20.
            offset: Row offset, converted to a 1-based page number.
                Negative values are treated as 0.
            start_date: Optional ISO-8601 lower bound; ignored if invalid.
            end_date: Optional ISO-8601 upper bound; ignored if invalid.

        Returns:
            On success: ``status`` ("success"), ``traces``, ``count``,
            ``timestamp`` and ``has_more``. On failure (including an empty
            result): ``status`` ("error"), ``message`` and ``timestamp``.
        """
        # Get credentials from environment variables
        from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST

        if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
            return TraceService._error_response(
                "Langfuse credentials not found. Please connect to Langfuse first."
            )

        # Enforce reasonable limits
        limit = TraceService._normalize_limit(limit)

        # Langfuse uses page-based pagination, not offset; pages start at 1
        page = TraceService._page_for_offset(offset, limit)
        logger.info("Fetching trace metadata (limit=%s, page=%s)", limit, page)

        # Set up timestamp filters if provided
        from_timestamp = TraceService._parse_iso_timestamp(start_date, "start_date")
        to_timestamp = TraceService._parse_iso_timestamp(end_date, "end_date")

        max_retries = TraceService.MAX_RETRIES
        retry_count = 0
        client = None
        traces_data: Any = []
        trace_metadata: List[Dict[str, Any]] = []

        while True:
            try:
                if client is None:
                    # Build the client once and reuse it across retries.
                    from langfuse import Langfuse
                    client = Langfuse(
                        secret_key=LANGFUSE_SECRET_KEY,
                        public_key=LANGFUSE_PUBLIC_KEY,
                        host=LANGFUSE_HOST
                    )

                # Fetch traces using page-based pagination
                traces_response = client.fetch_traces(
                    limit=limit,
                    page=page,
                    from_timestamp=from_timestamp,
                    to_timestamp=to_timestamp
                )

                # Convert response to serializable format
                from utils.fetch_langfuse_logs import convert_to_serializable
                traces_data = TraceService._extract_items(
                    traces_response, convert_to_serializable, wrap_bare_payload=True
                )

                # Extract only the metadata we need
                trace_metadata = []
                for trace in traces_data:
                    if not isinstance(trace, dict):
                        logger.warning("Skipping non-dictionary trace: %s", trace)
                        continue
                    trace_metadata.append(TraceService._summarize_trace(trace))

                # Success - break out of retry loop
                break

            except Exception as e:
                error_message = str(e)

                if not TraceService._is_server_error(error_message):
                    # For non-500 errors, don't retry
                    logger.error("Error fetching trace metadata: %s", error_message)
                    return TraceService._error_response(
                        f"Error fetching trace metadata: {error_message}"
                    )

                retry_count += 1
                if retry_count > max_retries:
                    logger.error("Failed after %s retries: %s", max_retries, error_message)
                    return TraceService._error_response(
                        f"Langfuse server error after {max_retries} retries. "
                        "The service may be experiencing high load."
                    )

                # Exponential backoff with jitter
                wait_time = (2 ** retry_count) + random.uniform(0, 1)
                logger.warning(
                    "Langfuse server error, retrying in %.2f seconds (attempt %s/%s)",
                    wait_time,
                    retry_count,
                    max_retries,
                )
                time.sleep(wait_time)

        # An empty page is reported as an error
        if not trace_metadata:
            return TraceService._error_response("No trace data returned from Langfuse")

        # If we received fewer traces than requested, we've reached the end
        has_more = len(traces_data) >= limit

        return {
            "status": "success",
            "traces": trace_metadata,
            "count": len(trace_metadata),
            "timestamp": datetime.now().isoformat(),
            "has_more": has_more
        }

    @staticmethod
    def get_trace_by_id(trace_id: str) -> Dict[str, Any]:
        """
        Get a specific trace by ID from Langfuse

        The trace is fetched together with up to 100 of its observations,
        which are attached under the ``observations`` key. A failure while
        converting observations is logged and the trace is still returned.

        Args:
            trace_id: Identifier of the trace to fetch; must be non-empty.

        Returns:
            On success: ``status`` ("success"), ``trace`` and ``timestamp``.
            On any failure: ``status`` ("error"), ``message`` and
            ``timestamp``. Exceptions are caught and reported, not raised.
        """
        # Reject blank IDs up front instead of spending two API calls on them.
        if trace_id is None or not str(trace_id).strip():
            return TraceService._error_response("Trace ID must be a non-empty string")

        try:
            # Get credentials from environment variables
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST

            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return TraceService._error_response(
                    "Langfuse credentials not found. Please connect to Langfuse first."
                )

            # Initialize Langfuse client
            from langfuse import Langfuse
            client = Langfuse(
                secret_key=LANGFUSE_SECRET_KEY,
                public_key=LANGFUSE_PUBLIC_KEY,
                host=LANGFUSE_HOST
            )

            # Fetch trace with all details
            logger.info("Fetching trace with ID: %s", trace_id)
            trace_response = client.fetch_trace(trace_id)

            # Get observations for this trace
            logger.info("Fetching observations for trace: %s", trace_id)
            observations_response = client.fetch_observations(trace_id=trace_id, limit=100)

            # Convert response to serializable format
            from utils.fetch_langfuse_logs import convert_to_serializable

            try:
                trace_data = convert_to_serializable(trace_response)
            except Exception as e:
                logger.error("Error converting trace response: %s", e)
                return TraceService._error_response(f"Error processing trace data: {str(e)}")

            # Observations are best-effort: continue even if conversion fails
            observations_data: Any = []
            try:
                observations_data = TraceService._extract_items(
                    observations_response, convert_to_serializable
                )
            except Exception as e:
                logger.error("Error converting observations response: %s", e)

            if not trace_data:
                return TraceService._error_response(f"Trace with ID {trace_id} not found")

            trace_data["observations"] = observations_data

            # Strip a trailing slash so the generated URL never contains "//project".
            host = str(LANGFUSE_HOST or "").rstrip("/")
            trace_data["trace_url"] = f"{host}/project/unknown/traces/{trace_id}"

            return {
                "status": "success",
                "trace": trace_data,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error("Error fetching trace by ID: %s", e)
            return TraceService._error_response(f"Error fetching trace by ID: {str(e)}")