"""Service for importing data from external platforms."""

import os
import json
import time
import logging
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime

logger = logging.getLogger("agent_monitoring_server.services.platform.import")


class ImportService:
    """Service for importing data from connected platforms.

    All import results are appended to a JSON history file under
    ``logs/history/import_history.json`` (relative to the current working
    directory) via :meth:`_save_import_record`.
    """

    @staticmethod
    def _parse_import_limit(import_limit: str) -> int:
        """Parse and clamp a user-supplied record limit string.

        Rules (unchanged from the original inline logic):
          * ``"all"``  -> 100 (a conservative cap rather than "everything")
          * numeric    -> clamped to the range 1..500
          * <= 0       -> falls back to the default of 50 (with a warning)
          * unparsable -> falls back to the default of 50 (with a warning)

        NOTE(review): the public default of ``import_limit="1000"`` in
        :meth:`import_platform_data` always exceeds the 500 cap, so every
        default call logs the "exceeds maximum" warning — confirm intent.

        Args:
            import_limit: Raw limit value as received from the caller.

        Returns:
            An effective integer limit in the range 1..500.
        """
        if import_limit == "all":
            return 100  # More reasonable default for "all"
        try:
            limit = int(import_limit)
        except ValueError:
            logger.warning(f"Invalid limit format: {import_limit}, using default of 50")
            return 50
        if limit > 500:
            logger.warning(f"Requested limit {limit} exceeds maximum of 500, using 500 instead")
            return 500
        if limit <= 0:
            logger.warning(f"Invalid limit {limit}, using default of 50")
            return 50
        return limit

    @staticmethod
    def import_platform_data(
        import_traces: bool = False,
        import_logs: bool = False,
        import_system_cards: bool = False,
        import_metrics: bool = False,
        import_limit: str = "1000",
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Import data from a connected platform (currently Langfuse only).

        Each requested data type produces one record in the returned
        ``imports`` list; fetched data is written to JSON files under
        ``logs/`` in the current working directory.

        Args:
            import_traces: Fetch traces with full observation details.
            import_logs: Fetch traces without details; skipped when
                ``import_traces`` is set, since traces already include logs.
            import_system_cards: Placeholder — not yet implemented.
            import_metrics: Placeholder — not yet implemented.
            import_limit: Maximum records to fetch ("all" or a number);
                see :meth:`_parse_import_limit` for clamping rules.
            start_date: Currently unused — date filtering is not wired
                into the fetch calls yet.
            end_date: Currently unused (see ``start_date``).

        Returns:
            A status dict; on success it contains ``import_id`` and the
            per-data-type ``imports`` records, on failure an error message.
        """
        try:
            # Generate a unique ID and a single timestamp shared by all
            # records produced by this call.
            import_id = str(uuid.uuid4())
            current_time = datetime.now().isoformat()

            imports: List[Dict[str, Any]] = []

            limit = ImportService._parse_import_limit(import_limit)

            # Credentials come from environment-backed config. LANGFUSE_HOST
            # is imported for parity with the config module but not used here.
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return {
                    "status": "error",
                    "message": "Langfuse credentials not found. Please connect to Langfuse first.",
                    "timestamp": current_time
                }

            # --- Traces (full detail) ---------------------------------
            if import_traces:
                try:
                    from utils.fetch_langfuse_logs import fetch_all_logs

                    logs_dir = os.path.join(os.getcwd(), 'logs')
                    os.makedirs(logs_dir, exist_ok=True)

                    # Fetch traces (with per-trace details) and persist
                    # them to a JSON file in logs_dir.
                    filepath, traces = fetch_all_logs(
                        output_dir=logs_dir,
                        limit=limit,
                        fetch_details=True,
                        retry_delay=1.0
                    )

                    imports.append({
                        "id": str(uuid.uuid4()),
                        "timestamp": current_time,
                        "platform": "Langfuse",
                        "data_type": "Traces",
                        "record_count": len(traces),
                        "status": "complete",
                        "filepath": filepath,
                        "details": {
                            "trace_count": len(traces),
                            "first_trace_id": traces[0].get("id", "unknown") if traces else "none"
                        }
                    })
                except Exception as e:
                    # Record the failure instead of aborting the whole import.
                    logger.error(f"Error importing traces: {str(e)}")
                    imports.append({
                        "id": str(uuid.uuid4()),
                        "timestamp": current_time,
                        "platform": "Langfuse",
                        "data_type": "Traces",
                        "record_count": 0,
                        "status": "error",
                        "error": str(e)
                    })

            # --- Logs (traces without details) ------------------------
            # For Langfuse, logs are part of traces; only fetch separately
            # when traces themselves were not requested.
            if import_logs and not import_traces:
                try:
                    from utils.fetch_langfuse_logs import fetch_all_logs

                    logs_dir = os.path.join(os.getcwd(), 'logs')
                    os.makedirs(logs_dir, exist_ok=True)

                    # Fetch without detailed observations (lighter call).
                    filepath, logs = fetch_all_logs(
                        output_dir=logs_dir,
                        limit=limit,
                        fetch_details=False,
                        retry_delay=1.0
                    )

                    imports.append({
                        "id": str(uuid.uuid4()),
                        "timestamp": current_time,
                        "platform": "Langfuse",
                        "data_type": "Logs",
                        "record_count": len(logs),
                        "status": "complete",
                        "filepath": filepath
                    })
                except Exception as e:
                    logger.error(f"Error importing logs: {str(e)}")
                    imports.append({
                        "id": str(uuid.uuid4()),
                        "timestamp": current_time,
                        "platform": "Langfuse",
                        "data_type": "Logs",
                        "record_count": 0,
                        "status": "error",
                        "error": str(e)
                    })

            # --- Placeholders -----------------------------------------
            if import_system_cards:
                imports.append({
                    "id": str(uuid.uuid4()),
                    "timestamp": current_time,
                    "platform": "Langfuse",
                    "data_type": "System Cards",
                    "record_count": 0,
                    "status": "not_implemented",
                    "message": "System card import not yet implemented for Langfuse"
                })

            if import_metrics:
                imports.append({
                    "id": str(uuid.uuid4()),
                    "timestamp": current_time,
                    "platform": "Langfuse",
                    "data_type": "Performance Metrics",
                    "record_count": 0,
                    "status": "not_implemented",
                    "message": "Metrics import not yet implemented for Langfuse"
                })

            # Persist every per-data-type record to the import history.
            for import_record in imports:
                ImportService._save_import_record(import_record)

            return {
                "status": "success",
                "import_id": import_id,
                "timestamp": current_time,
                "imports": imports,
                "message": "Data import completed successfully"
            }
        except Exception as e:
            logger.error(f"Error importing platform data: {str(e)}")
            return {
                "status": "error",
                "message": f"Error importing platform data: {str(e)}",
                "timestamp": datetime.now().isoformat(),
                "error": str(e)
            }

    @staticmethod
    def import_traces_by_id(trace_ids: List[str]) -> Dict[str, Any]:
        """Import selected traces from Langfuse directly by their IDs.

        Fetches each trace and its observations (up to 100 per trace) via
        the Langfuse client, writes them all to a single JSON file under
        ``logs/``, and records the import in the history file.

        Args:
            trace_ids: Langfuse trace IDs to fetch; must be non-empty.

        Returns:
            A status dict with counts of successful/failed fetches and the
            output ``filepath`` on success, or an error message.
        """
        try:
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return {
                    "status": "error",
                    "message": "Langfuse credentials not found. Please connect to Langfuse first.",
                    "timestamp": datetime.now().isoformat()
                }

            if not trace_ids:
                return {
                    "status": "error",
                    "message": "No trace IDs provided to import",
                    "timestamp": datetime.now().isoformat()
                }

            from langfuse import Langfuse
            client = Langfuse(
                secret_key=LANGFUSE_SECRET_KEY,
                public_key=LANGFUSE_PUBLIC_KEY,
                host=LANGFUSE_HOST
            )

            import_id = str(uuid.uuid4())
            current_time = datetime.now().isoformat()

            logs_dir = os.path.join(os.getcwd(), 'logs')
            os.makedirs(logs_dir, exist_ok=True)

            from utils.fetch_langfuse_logs import convert_to_serializable

            imported_traces = []
            failed_traces = []

            for trace_id in trace_ids:
                logger.info(f"Fetching trace: {trace_id}")
                try:
                    trace_response = client.fetch_trace(trace_id)
                    trace_data = convert_to_serializable(trace_response)

                    # The observations response shape varies by SDK version:
                    # it may expose a .data attribute directly, or require
                    # model_dump() and a 'data' key lookup.
                    observations_response = client.fetch_observations(trace_id=trace_id, limit=100)
                    observations_data = []
                    if hasattr(observations_response, 'data'):
                        observations_data = convert_to_serializable(observations_response.data)
                    elif hasattr(observations_response, 'model_dump'):
                        observations_dict = convert_to_serializable(observations_response.model_dump())
                        if isinstance(observations_dict, dict) and 'data' in observations_dict:
                            observations_data = observations_dict['data']

                    trace_data['observations'] = observations_data
                    imported_traces.append(trace_data)

                    # Slight delay between traces to avoid rate limits.
                    time.sleep(0.5)
                except Exception as e:
                    logger.error(f"Error fetching trace {trace_id}: {str(e)}")
                    failed_traces.append({"id": trace_id, "error": str(e)})

            if imported_traces:
                filepath = os.path.join(logs_dir, f"imported_traces_{import_id}.json")
                with open(filepath, 'w') as f:
                    json.dump(imported_traces, f, indent=2)

                import_record = {
                    "id": import_id,
                    "timestamp": current_time,
                    "platform": "Langfuse",
                    "data_type": "Selected Traces",
                    "record_count": len(imported_traces),
                    "status": "complete",
                    "filepath": filepath,
                    "details": {
                        "successful_imports": len(imported_traces),
                        "failed_imports": len(failed_traces),
                        "trace_ids": trace_ids
                    }
                }
                ImportService._save_import_record(import_record)

                return {
                    "status": "success",
                    "import_id": import_id,
                    "timestamp": current_time,
                    "successful_count": len(imported_traces),
                    "failed_count": len(failed_traces),
                    "total_requested": len(trace_ids),
                    "filepath": filepath,
                    "failed_traces": failed_traces,
                    "message": f"Successfully imported {len(imported_traces)} traces"
                }
            else:
                return {
                    "status": "error",
                    "message": "No traces were successfully imported",
                    "timestamp": current_time,
                    "failed_traces": failed_traces
                }
        except Exception as e:
            logger.error(f"Error importing traces by ID: {str(e)}")
            return {
                "status": "error",
                "message": f"Error importing traces by ID: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }

    @staticmethod
    def import_selected_traces(filepath: str, trace_ids: List[str]) -> Dict[str, Any]:
        """Import selected traces from a previously-fetched JSON file.

        Reads ``filepath`` (expected to hold a JSON list of trace dicts),
        keeps only the traces whose ``id`` is in ``trace_ids``, writes them
        to a new file under ``logs/selected/``, and records the import.

        Args:
            filepath: Path to the source JSON file of traces.
            trace_ids: Trace IDs to keep; must be non-empty.

        Returns:
            A status dict with the new ``filepath`` and counts on success,
            or an error message on failure.
        """
        try:
            if not os.path.exists(filepath):
                return {
                    "status": "error",
                    "message": f"Import file not found: {filepath}",
                    "timestamp": datetime.now().isoformat()
                }

            if not trace_ids:
                return {
                    "status": "error",
                    "message": "No trace IDs provided to import",
                    "timestamp": datetime.now().isoformat()
                }

            with open(filepath, 'r') as f:
                all_traces = json.load(f)

            # Keep only traces whose id matches one of the requested IDs.
            selected_traces = []
            for trace in all_traces:
                trace_id = trace.get('id')
                if trace_id and trace_id in trace_ids:
                    selected_traces.append(trace)

            if not selected_traces:
                return {
                    "status": "error",
                    "message": "No matching traces found in the import file",
                    "timestamp": datetime.now().isoformat()
                }

            import_id = str(uuid.uuid4())
            current_time = datetime.now().isoformat()

            selected_dir = os.path.join(os.getcwd(), 'logs', 'selected')
            os.makedirs(selected_dir, exist_ok=True)

            selected_filepath = os.path.join(selected_dir, f"selected_traces_{import_id}.json")
            with open(selected_filepath, 'w') as f:
                json.dump(selected_traces, f, indent=2)

            import_record = {
                "id": import_id,
                "timestamp": current_time,
                "platform": "Langfuse",
                "data_type": "Selected Traces",
                "record_count": len(selected_traces),
                "status": "complete",
                "filepath": selected_filepath,
                "source_filepath": filepath,
                "selected_trace_ids": trace_ids
            }
            ImportService._save_import_record(import_record)

            return {
                "status": "success",
                "import_id": import_id,
                "timestamp": current_time,
                "selected_count": len(selected_traces),
                "total_count": len(all_traces),
                "filepath": selected_filepath,
                "message": f"Successfully imported {len(selected_traces)} traces"
            }
        except Exception as e:
            logger.error(f"Error importing selected traces: {str(e)}")
            return {
                "status": "error",
                "message": f"Error importing selected traces: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }

    @staticmethod
    def get_recent_imports() -> Dict[str, Any]:
        """Get the list of recent imports from the history file.

        Returns:
            A status dict with the history records (newest first) and
            their count, or an error message on failure.
        """
        try:
            imports = ImportService._load_import_history()
            return {
                "status": "success",
                "imports": imports,
                "count": len(imports),
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Error getting recent imports: {str(e)}")
            return {
                "status": "error",
                "message": f"Error getting recent imports: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }

    @staticmethod
    def _save_import_record(import_record: Dict[str, Any]) -> None:
        """Append an import record to the history file.

        Keeps at most the 50 most recent records (by timestamp). Any
        unreadable or malformed existing history is discarded and the
        file is rewritten from scratch (best-effort persistence).
        """
        history_dir = os.path.join(os.getcwd(), 'logs', 'history')
        os.makedirs(history_dir, exist_ok=True)

        history_file = os.path.join(history_dir, "import_history.json")

        existing_imports: List[Dict[str, Any]] = []
        if os.path.exists(history_file):
            try:
                with open(history_file, 'r') as f:
                    existing_imports = json.load(f)
            except Exception:
                # Unreadable/corrupt file: start with empty history.
                existing_imports = []
            # Valid JSON that isn't a list (e.g. a dict) would previously
            # crash on append — treat it the same as a corrupt file.
            if not isinstance(existing_imports, list):
                existing_imports = []

        existing_imports.append(import_record)

        # Keep only the most recent 50 imports.
        if len(existing_imports) > 50:
            existing_imports = sorted(
                existing_imports,
                key=lambda x: x.get('timestamp', ''),
                reverse=True
            )[:50]

        with open(history_file, 'w') as f:
            json.dump(existing_imports, f, indent=2)

    @staticmethod
    def _load_import_history() -> List[Dict[str, Any]]:
        """Load the import history, newest first.

        Returns:
            The history records sorted by timestamp descending, or an
            empty list if the file is missing or unreadable.
        """
        history_file = os.path.join(os.getcwd(), 'logs', 'history', "import_history.json")

        if not os.path.exists(history_file):
            return []

        try:
            with open(history_file, 'r') as f:
                history = json.load(f)
            # Sort by timestamp, newest first.
            history = sorted(history, key=lambda x: x.get('timestamp', ''), reverse=True)
            return history
        except Exception as e:
            logger.error(f"Error loading import history: {str(e)}")
            return []