Spaces:
Running
Running
| """ | |
| Service for importing data from external platforms | |
| """ | |
| import os | |
| import json | |
| import time | |
| import logging | |
| import uuid | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| logger = logging.getLogger("agent_monitoring_server.services.platform.import") | |
class ImportService:
    """Service for importing data from connected platforms (currently Langfuse)."""

    @staticmethod
    def import_platform_data(
        import_traces: bool = False,
        import_logs: bool = False,
        import_system_cards: bool = False,
        import_metrics: bool = False,
        import_limit: str = "1000",
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Import data from a connected platform.

        Args:
            import_traces: fetch traces with full observation details.
            import_logs: fetch logs; skipped when ``import_traces`` is True
                because Langfuse delivers logs as part of traces.
            import_system_cards: placeholder; not yet implemented for Langfuse.
            import_metrics: placeholder; not yet implemented for Langfuse.
            import_limit: maximum record count as a string, or "all"
                (currently mapped to 100, not the true total). Clamped to [1, 500].
            start_date: currently unused; reserved for future date filtering.
            end_date: currently unused; reserved for future date filtering.

        Returns:
            Dict with "status" ("success"/"error"), "import_id", "timestamp"
            and the per-data-type "imports" records. NOTE: the top-level
            status is "success" even when individual sub-imports failed;
            callers must inspect each record's own "status".
        """

        def _parse_limit(raw: str) -> int:
            # Normalize the user-supplied limit string to an int in [1, 500].
            if raw == "all":
                return 100  # More reasonable default for "all"
            try:
                value = int(raw)
            except ValueError:
                logger.warning(f"Invalid limit format: {raw}, using default of 50")
                return 50
            if value > 500:
                logger.warning(f"Requested limit {value} exceeds maximum of 500, using 500 instead")
                return 500
            if value <= 0:
                logger.warning(f"Invalid limit {value}, using default of 50")
                return 50
            return value

        try:
            # Unique ID and timestamp shared by every record of this run.
            import_id = str(uuid.uuid4())
            current_time = datetime.now().isoformat()
            imports: List[Dict[str, Any]] = []
            limit = _parse_limit(import_limit)

            def _run_fetch(data_type: str, fetch_details: bool) -> Dict[str, Any]:
                # Fetch traces/logs via fetch_all_logs and build the import record.
                from utils.fetch_langfuse_logs import fetch_all_logs
                logs_dir = os.path.join(os.getcwd(), 'logs')
                os.makedirs(logs_dir, exist_ok=True)
                filepath, records = fetch_all_logs(
                    output_dir=logs_dir,
                    limit=limit,
                    fetch_details=fetch_details,
                    retry_delay=1.0
                )
                result: Dict[str, Any] = {
                    "id": str(uuid.uuid4()),
                    "timestamp": current_time,
                    "platform": "Langfuse",
                    "data_type": data_type,
                    "record_count": len(records),
                    "status": "complete",
                    "filepath": filepath
                }
                if fetch_details:
                    result["details"] = {
                        "trace_count": len(records),
                        "first_trace_id": records[0].get("id", "unknown") if records else "none"
                    }
                return result

            def _error_record(data_type: str, exc: Exception) -> Dict[str, Any]:
                # Uniform failure record for a single data type.
                return {
                    "id": str(uuid.uuid4()),
                    "timestamp": current_time,
                    "platform": "Langfuse",
                    "data_type": data_type,
                    "record_count": 0,
                    "status": "error",
                    "error": str(exc)
                }

            def _placeholder(data_type: str, message: str) -> Dict[str, Any]:
                # Uniform record for data types not yet supported on Langfuse.
                return {
                    "id": str(uuid.uuid4()),
                    "timestamp": current_time,
                    "platform": "Langfuse",
                    "data_type": data_type,
                    "record_count": 0,
                    "status": "not_implemented",
                    "message": message
                }

            # Credentials are imported lazily so the module loads without them.
            from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
            if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY:
                return {
                    "status": "error",
                    "message": "Langfuse credentials not found. Please connect to Langfuse first.",
                    "timestamp": current_time
                }

            if import_traces:
                try:
                    imports.append(_run_fetch("Traces", fetch_details=True))
                except Exception as e:
                    logger.error(f"Error importing traces: {str(e)}")
                    imports.append(_error_record("Traces", e))

            # Langfuse logs ride along with traces, so skip when traces were fetched.
            if import_logs and not import_traces:
                try:
                    imports.append(_run_fetch("Logs", fetch_details=False))
                except Exception as e:
                    logger.error(f"Error importing logs: {str(e)}")
                    imports.append(_error_record("Logs", e))

            if import_system_cards:
                imports.append(_placeholder(
                    "System Cards",
                    "System card import not yet implemented for Langfuse"
                ))

            if import_metrics:
                imports.append(_placeholder(
                    "Performance Metrics",
                    "Metrics import not yet implemented for Langfuse"
                ))

            # Persist every record to the on-disk import history.
            for import_record in imports:
                ImportService._save_import_record(import_record)

            return {
                "status": "success",
                "import_id": import_id,
                "timestamp": current_time,
                "imports": imports,
                "message": "Data import completed successfully"
            }
        except Exception as e:
            logger.error(f"Error importing platform data: {str(e)}")
            return {
                "status": "error",
                "message": f"Error importing platform data: {str(e)}",
                "timestamp": datetime.now().isoformat(),
                "error": str(e)
            }
| def import_traces_by_id(trace_ids: List[str]) -> Dict[str, Any]: | |
| """Import selected traces from Langfuse directly by their IDs""" | |
| try: | |
| # Get credentials from environment variables | |
| from utils.config import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST | |
| if not LANGFUSE_PUBLIC_KEY or not LANGFUSE_SECRET_KEY: | |
| return { | |
| "status": "error", | |
| "message": "Langfuse credentials not found. Please connect to Langfuse first.", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| # Validate that trace_ids is provided | |
| if not trace_ids: | |
| return { | |
| "status": "error", | |
| "message": "No trace IDs provided to import", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| # Initialize Langfuse client | |
| from langfuse import Langfuse | |
| client = Langfuse( | |
| secret_key=LANGFUSE_SECRET_KEY, | |
| public_key=LANGFUSE_PUBLIC_KEY, | |
| host=LANGFUSE_HOST | |
| ) | |
| # Create unique import ID and timestamp | |
| import_id = str(uuid.uuid4()) | |
| current_time = datetime.now().isoformat() | |
| # Create a directory for imported traces | |
| logs_dir = os.path.join(os.getcwd(), 'logs') | |
| os.makedirs(logs_dir, exist_ok=True) | |
| # Fetch each trace and its observations | |
| from utils.fetch_langfuse_logs import convert_to_serializable | |
| imported_traces = [] | |
| failed_traces = [] | |
| for trace_id in trace_ids: | |
| logger.info(f"Fetching trace: {trace_id}") | |
| try: | |
| # Fetch the trace | |
| trace_response = client.fetch_trace(trace_id) | |
| trace_data = convert_to_serializable(trace_response) | |
| # Fetch observations | |
| observations_response = client.fetch_observations(trace_id=trace_id, limit=100) | |
| observations_data = [] | |
| if hasattr(observations_response, 'data'): | |
| observations_data = convert_to_serializable(observations_response.data) | |
| elif hasattr(observations_response, 'model_dump'): | |
| observations_dict = convert_to_serializable(observations_response.model_dump()) | |
| if isinstance(observations_dict, dict) and 'data' in observations_dict: | |
| observations_data = observations_dict['data'] | |
| # Add observations to trace | |
| trace_data['observations'] = observations_data | |
| # Add to imported traces | |
| imported_traces.append(trace_data) | |
| # Add slight delay to avoid rate limits | |
| time.sleep(0.5) | |
| except Exception as e: | |
| logger.error(f"Error fetching trace {trace_id}: {str(e)}") | |
| failed_traces.append({"id": trace_id, "error": str(e)}) | |
| # If we have any imported traces, save them to a file | |
| if imported_traces: | |
| # Save to a file | |
| filepath = os.path.join(logs_dir, f"imported_traces_{import_id}.json") | |
| with open(filepath, 'w') as f: | |
| json.dump(imported_traces, f, indent=2) | |
| # Create import record | |
| import_record = { | |
| "id": import_id, | |
| "timestamp": current_time, | |
| "platform": "Langfuse", | |
| "data_type": "Selected Traces", | |
| "record_count": len(imported_traces), | |
| "status": "complete", | |
| "filepath": filepath, | |
| "details": { | |
| "successful_imports": len(imported_traces), | |
| "failed_imports": len(failed_traces), | |
| "trace_ids": trace_ids | |
| } | |
| } | |
| # Save import history | |
| ImportService._save_import_record(import_record) | |
| return { | |
| "status": "success", | |
| "import_id": import_id, | |
| "timestamp": current_time, | |
| "successful_count": len(imported_traces), | |
| "failed_count": len(failed_traces), | |
| "total_requested": len(trace_ids), | |
| "filepath": filepath, | |
| "failed_traces": failed_traces, | |
| "message": f"Successfully imported {len(imported_traces)} traces" | |
| } | |
| else: | |
| return { | |
| "status": "error", | |
| "message": "No traces were successfully imported", | |
| "timestamp": current_time, | |
| "failed_traces": failed_traces | |
| } | |
| except Exception as e: | |
| logger.error(f"Error importing traces by ID: {str(e)}") | |
| return { | |
| "status": "error", | |
| "message": f"Error importing traces by ID: {str(e)}", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| def import_selected_traces(filepath: str, trace_ids: List[str]) -> Dict[str, Any]: | |
| """Import selected traces from a file""" | |
| try: | |
| # Check if the file exists | |
| if not os.path.exists(filepath): | |
| return { | |
| "status": "error", | |
| "message": f"Import file not found: {filepath}", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| # Validate that trace_ids is provided | |
| if not trace_ids: | |
| return { | |
| "status": "error", | |
| "message": "No trace IDs provided to import", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| # Load the file contents | |
| with open(filepath, 'r') as f: | |
| all_traces = json.load(f) | |
| # Filter to just the selected traces | |
| selected_traces = [] | |
| for trace in all_traces: | |
| trace_id = trace.get('id') | |
| if trace_id and trace_id in trace_ids: | |
| selected_traces.append(trace) | |
| if not selected_traces: | |
| return { | |
| "status": "error", | |
| "message": "No matching traces found in the import file", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| # Create a new file with just the selected traces | |
| import_id = str(uuid.uuid4()) | |
| current_time = datetime.now().isoformat() | |
| # Create a directory for selected traces | |
| selected_dir = os.path.join(os.getcwd(), 'logs', 'selected') | |
| os.makedirs(selected_dir, exist_ok=True) | |
| # Save the selected traces | |
| selected_filepath = os.path.join(selected_dir, f"selected_traces_{import_id}.json") | |
| with open(selected_filepath, 'w') as f: | |
| json.dump(selected_traces, f, indent=2) | |
| # Record this import in the history | |
| import_record = { | |
| "id": import_id, | |
| "timestamp": current_time, | |
| "platform": "Langfuse", | |
| "data_type": "Selected Traces", | |
| "record_count": len(selected_traces), | |
| "status": "complete", | |
| "filepath": selected_filepath, | |
| "source_filepath": filepath, | |
| "selected_trace_ids": trace_ids | |
| } | |
| # Save import history | |
| ImportService._save_import_record(import_record) | |
| return { | |
| "status": "success", | |
| "import_id": import_id, | |
| "timestamp": current_time, | |
| "selected_count": len(selected_traces), | |
| "total_count": len(all_traces), | |
| "filepath": selected_filepath, | |
| "message": f"Successfully imported {len(selected_traces)} traces" | |
| } | |
| except Exception as e: | |
| logger.error(f"Error importing selected traces: {str(e)}") | |
| return { | |
| "status": "error", | |
| "message": f"Error importing selected traces: {str(e)}", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| def get_recent_imports() -> Dict[str, Any]: | |
| """Get list of recent imports""" | |
| try: | |
| # Load import history | |
| imports = ImportService._load_import_history() | |
| return { | |
| "status": "success", | |
| "imports": imports, | |
| "count": len(imports), | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting recent imports: {str(e)}") | |
| return { | |
| "status": "error", | |
| "message": f"Error getting recent imports: {str(e)}", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| def _save_import_record(import_record: Dict[str, Any]) -> None: | |
| """Save an import record to the history file""" | |
| # Create the directory for import history | |
| history_dir = os.path.join(os.getcwd(), 'logs', 'history') | |
| os.makedirs(history_dir, exist_ok=True) | |
| # Path to the history file | |
| history_file = os.path.join(history_dir, "import_history.json") | |
| # Load existing history | |
| existing_imports = [] | |
| if os.path.exists(history_file): | |
| try: | |
| with open(history_file, 'r') as f: | |
| existing_imports = json.load(f) | |
| except Exception: | |
| # If there's an error reading the file, start with empty history | |
| existing_imports = [] | |
| # Add the new import to the history | |
| existing_imports.append(import_record) | |
| # Keep only the most recent 50 imports | |
| if len(existing_imports) > 50: | |
| existing_imports = sorted(existing_imports, key=lambda x: x.get('timestamp', ''), reverse=True)[:50] | |
| # Save the updated history | |
| with open(history_file, 'w') as f: | |
| json.dump(existing_imports, f, indent=2) | |
| def _load_import_history() -> List[Dict[str, Any]]: | |
| """Load the import history from the history file""" | |
| # Path to the history file | |
| history_file = os.path.join(os.getcwd(), 'logs', 'history', "import_history.json") | |
| # Check if the file exists | |
| if not os.path.exists(history_file): | |
| return [] | |
| # Load the history | |
| try: | |
| with open(history_file, 'r') as f: | |
| history = json.load(f) | |
| # Sort by timestamp, newest first | |
| history = sorted(history, key=lambda x: x.get('timestamp', ''), reverse=True) | |
| return history | |
| except Exception as e: | |
| logger.error(f"Error loading import history: {str(e)}") | |
| return [] |