Spaces:
Running
Running
| """ | |
| Service for handling trace operations with external platforms like Langfuse | |
| """ | |
| import os | |
| import json | |
| import time | |
| import logging | |
| import random | |
| import uuid | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| logger = logging.getLogger("agent_monitoring_server.services.platform.trace") | |
class TraceService:
    """Service for trace operations with external platforms"""

    # NOTE(review): the original methods were declared without `self`, which
    # only worked when called directly on the class; `@staticmethod` keeps
    # class-level calls identical and also makes instance-level calls safe.

    @staticmethod
    def get_trace_metadata(
        limit: int = 20,
        offset: int = 0,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get metadata for traces without downloading full details.

        Uses retry logic (exponential backoff with jitter) for handling
        500 errors from the Langfuse server.

        Args:
            limit: Maximum traces to return; clamped to 1..50 (default 20).
            offset: Zero-based offset, converted to Langfuse's 1-based pages.
            start_date: Optional ISO-8601 lower bound for trace timestamps.
            end_date: Optional ISO-8601 upper bound for trace timestamps.

        Returns:
            On success: dict with "status"="success", "traces", "count",
            "has_more" and "timestamp". On failure: dict with
            "status"="error", "message" and "timestamp".
        """
        # Get credentials from environment variables
        from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST

        if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
            return {
                "status": "error",
                "message": "Langfuse credentials not found. Please connect to Langfuse first.",
                "timestamp": datetime.now().isoformat()
            }

        # Enforce reasonable limits
        if limit > 50:
            logger.warning(f"Requested limit {limit} exceeds maximum of 50, using 50 instead")
            limit = 50
        elif limit <= 0:
            logger.warning(f"Invalid limit {limit}, using default of 20")
            limit = 20

        # Convert offset to page (Langfuse uses page-based pagination, not offset).
        # Page numbering starts at 1; offsets that are not a multiple of
        # `limit` round down to the containing page.
        page = (offset // limit) + 1
        logger.info(f"Fetching trace metadata (limit={limit}, page={page})")

        # Set up timestamp filters if provided; invalid dates are logged
        # and ignored rather than failing the whole request.
        from_timestamp = None
        to_timestamp = None
        if start_date:
            try:
                from_timestamp = datetime.fromisoformat(start_date)
            except ValueError:
                logger.warning(f"Invalid start_date format: {start_date}")
        if end_date:
            try:
                to_timestamp = datetime.fromisoformat(end_date)
            except ValueError:
                logger.warning(f"Invalid end_date format: {end_date}")

        # Retry logic for API calls: retry only on apparent server-side
        # errors, up to max_retries attempts.
        max_retries = 3
        retry_count = 0
        trace_metadata: List[Dict[str, Any]] = []
        traces_data: List[Any] = []
        while retry_count <= max_retries:
            try:
                # Initialize Langfuse client
                from langfuse import Langfuse
                client = Langfuse(
                    secret_key=LANGFUSE_SECRET_KEY,
                    public_key=LANGFUSE_PUBLIC_KEY,
                    host=LANGFUSE_HOST
                )

                # Fetch traces using page-based pagination
                traces_response = client.fetch_traces(
                    limit=limit,
                    page=page,
                    from_timestamp=from_timestamp,
                    to_timestamp=to_timestamp
                )

                # Convert response to serializable format. The SDK response
                # may expose `.data` directly or be a pydantic model.
                from utils.fetch_langfuse_logs import convert_to_serializable
                if hasattr(traces_response, 'data'):
                    traces_data = convert_to_serializable(traces_response.data)
                elif hasattr(traces_response, 'model_dump'):
                    traces_dict = convert_to_serializable(traces_response.model_dump())
                    if isinstance(traces_dict, dict) and 'data' in traces_dict:
                        traces_data = traces_dict['data']
                    else:
                        traces_data = [traces_dict]

                trace_metadata = TraceService._extract_trace_metadata(traces_data)
                # Success - break out of retry loop
                break
            except Exception as e:
                error_message = str(e)
                # Check if it's a 500 error from the server
                if "500" in error_message or "Internal Server Error" in error_message or "Memory limit" in error_message:
                    retry_count += 1
                    if retry_count <= max_retries:
                        # Exponential backoff with jitter
                        wait_time = (2 ** retry_count) + random.uniform(0, 1)
                        logger.warning(f"Langfuse server error, retrying in {wait_time:.2f} seconds (attempt {retry_count}/{max_retries})")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"Failed after {max_retries} retries: {error_message}")
                        return {
                            "status": "error",
                            "message": f"Langfuse server error after {max_retries} retries. The service may be experiencing high load.",
                            "timestamp": datetime.now().isoformat()
                        }
                else:
                    # For non-500 errors, don't retry
                    logger.error(f"Error fetching trace metadata: {error_message}")
                    return {
                        "status": "error",
                        "message": f"Error fetching trace metadata: {error_message}",
                        "timestamp": datetime.now().isoformat()
                    }

        # Check if we have trace data
        if not trace_metadata:
            return {
                "status": "error",
                "message": "No trace data returned from Langfuse",
                "timestamp": datetime.now().isoformat()
            }

        # Check if there are more traces available.
        # If we received fewer traces than requested, we've reached the end.
        has_more = len(traces_data) >= limit

        return {
            "status": "success",
            "traces": trace_metadata,
            "count": len(trace_metadata),
            "timestamp": datetime.now().isoformat(),
            "has_more": has_more
        }

    @staticmethod
    def _extract_trace_metadata(traces_data: List[Any]) -> List[Dict[str, Any]]:
        """
        Extract lightweight metadata dicts from serialized trace records.

        Non-dict entries are skipped with a warning. Each result carries:
        id, name, timestamp, status, level, metadata, model, user, session
        and tags (always a list).
        """
        trace_metadata: List[Dict[str, Any]] = []
        for trace in traces_data:
            # Ensure trace is a dictionary
            if not isinstance(trace, dict):
                logger.warning(f"Skipping non-dictionary trace: {trace}")
                continue
            # Extract basic metadata fields with safe dict access
            metadata = {
                "id": trace.get("id", "unknown"),
                "name": trace.get("name", "Unnamed Trace"),
                "timestamp": trace.get("timestamp"),
                "status": trace.get("status", "unknown"),
                "level": trace.get("level"),
                "metadata": trace.get("metadata", {})
            }
            # Model comes from the first observation when present,
            # otherwise from the trace itself (with proper type checking).
            observations = trace.get("observations")
            if isinstance(observations, list) and observations:
                first_obs = observations[0]
                metadata["model"] = first_obs.get("model") if isinstance(first_obs, dict) else None
            else:
                metadata["model"] = trace.get("model")
            # Include user/session information if available
            metadata["user"] = trace.get("userId")
            metadata["session"] = trace.get("sessionId")
            # Tags are normalized to a list (with proper type checking)
            tags = trace.get("tags")
            metadata["tags"] = tags if isinstance(tags, list) else []
            trace_metadata.append(metadata)
        return trace_metadata

    @staticmethod
    def get_trace_by_id(trace_id: str) -> Dict[str, Any]:
        """
        Get a specific trace by ID from Langfuse, including its observations.

        Args:
            trace_id: Langfuse trace identifier.

        Returns:
            On success: dict with "status"="success", "trace" (including an
            "observations" list and a "trace_url") and "timestamp".
            On failure: dict with "status"="error", "message", "timestamp".
        """
        try:
            # Get credentials from environment variables
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST

            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return {
                    "status": "error",
                    "message": "Langfuse credentials not found. Please connect to Langfuse first.",
                    "timestamp": datetime.now().isoformat()
                }

            # Initialize Langfuse client
            from langfuse import Langfuse
            client = Langfuse(
                secret_key=LANGFUSE_SECRET_KEY,
                public_key=LANGFUSE_PUBLIC_KEY,
                host=LANGFUSE_HOST
            )

            # Fetch trace with all details
            logger.info(f"Fetching trace with ID: {trace_id}")
            trace_response = client.fetch_trace(trace_id)

            # Get observations for this trace
            logger.info(f"Fetching observations for trace: {trace_id}")
            observations_response = client.fetch_observations(trace_id=trace_id, limit=100)

            # Convert response to serializable format
            from utils.fetch_langfuse_logs import convert_to_serializable
            trace_data = None
            observations_data = []

            # Convert trace data; a conversion failure here is fatal
            try:
                trace_data = convert_to_serializable(trace_response)
            except Exception as e:
                logger.error(f"Error converting trace response: {str(e)}")
                return {
                    "status": "error",
                    "message": f"Error processing trace data: {str(e)}",
                    "timestamp": datetime.now().isoformat()
                }

            # Convert observations data; continue even if this conversion fails
            try:
                if hasattr(observations_response, 'data'):
                    observations_data = convert_to_serializable(observations_response.data)
                elif hasattr(observations_response, 'model_dump'):
                    observations_dict = convert_to_serializable(observations_response.model_dump())
                    if isinstance(observations_dict, dict) and 'data' in observations_dict:
                        observations_data = observations_dict['data']
            except Exception as e:
                logger.error(f"Error converting observations response: {str(e)}")

            # Add observations to trace data
            if trace_data:
                trace_data["observations"] = observations_data
                # Generate a trace URL. NOTE(review): "unknown" is a
                # placeholder for the project slug, which is not available
                # here — confirm the real project ID with the Langfuse config.
                trace_url = f"{LANGFUSE_HOST}/project/unknown/traces/{trace_id}"
                trace_data["trace_url"] = trace_url
                return {
                    "status": "success",
                    "trace": trace_data,
                    "timestamp": datetime.now().isoformat()
                }
            else:
                return {
                    "status": "error",
                    "message": f"Trace with ID {trace_id} not found",
                    "timestamp": datetime.now().isoformat()
                }
        except Exception as e:
            logger.error(f"Error fetching trace by ID: {str(e)}")
            return {
                "status": "error",
                "message": f"Error fetching trace by ID: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }