""" Service for handling connections to external platforms like Langfuse """ import os import logging import uuid from typing import Dict, Any, Optional from datetime import datetime logger = logging.getLogger("agent_monitoring_server.services.platform.connection") class ConnectionService: """Service for connecting to external platforms""" @staticmethod def connect_platform(platform_name: str, api_key: str, secret_key: Optional[str] = None, host: Optional[str] = None) -> Dict[str, Any]: """Connect to an external platform like Langfuse""" try: if platform_name.lower() == 'langfuse': # Validate the credentials by attempting to connect to Langfuse from langfuse import Langfuse # Use default host if not provided if not host: host = "https://cloud.langfuse.com" # Initialize Langfuse client with provided credentials client = Langfuse( secret_key=secret_key, public_key=api_key, host=host ) # Try to fetch a trace to verify connection try: # Just fetch a single trace to verify connection traces_response = client.fetch_traces(limit=1) # If we get here, connection was successful # Store credentials securely (should be in a database in production) from dotenv import load_dotenv, set_key # Update .env file with the new credentials dotenv_path = os.path.join(os.getcwd(), '.env') load_dotenv(dotenv_path) set_key(dotenv_path, "LANGFUSE_PUBLIC_KEY", api_key) set_key(dotenv_path, "LANGFUSE_SECRET_KEY", secret_key) set_key(dotenv_path, "LANGFUSE_HOST", host) # Set the credentials in current environment os.environ["LANGFUSE_PUBLIC_KEY"] = api_key os.environ["LANGFUSE_SECRET_KEY"] = secret_key os.environ["LANGFUSE_HOST"] = host # Create basic auth string import base64 auth_string = base64.b64encode(f"{api_key}:{secret_key}".encode()).decode() os.environ["LANGFUSE_AUTH"] = auth_string return { "status": "success", "platform": platform_name, "message": f"Successfully connected to {platform_name}", "connection_id": str(uuid.uuid4()), "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error validating Langfuse credentials: {str(e)}") return { "status": "error", "platform": platform_name, "message": f"Failed to connect to {platform_name}: {str(e)}", "error": str(e) } else: # Handle other platforms return { "status": "error", "platform": platform_name, "message": f"Connection to platform '{platform_name}' not implemented yet", "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error connecting to platform: {str(e)}") return { "status": "error", "platform": platform_name, "message": f"Error connecting to {platform_name}: {str(e)}", "error": str(e) }