Spaces:
Sleeping
Sleeping
| """ | |
| Service for handling connections to external platforms like Langfuse | |
| """ | |
| import os | |
| import logging | |
| import uuid | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime | |
| logger = logging.getLogger("agent_monitoring_server.services.platform.connection") | |
class ConnectionService:
    """Service for connecting to external platforms.

    Langfuse is the only platform handled here; every other platform name
    produces a "not implemented yet" error response.
    """

    @staticmethod
    def connect_platform(
        platform_name: str,
        api_key: str,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Connect to an external platform like Langfuse.

        For Langfuse the credentials are verified by fetching a single trace.
        On success they are written to the ``.env`` file in the current
        working directory and exported into ``os.environ`` (including a
        base64 ``LANGFUSE_AUTH`` string built from ``api_key:secret_key``).

        Args:
            platform_name: Name of the platform; compared case-insensitively.
            api_key: Platform API key (passed to Langfuse as the public key).
            secret_key: Platform secret key. Required for Langfuse.
            host: Base URL of the platform. Defaults to
                ``https://cloud.langfuse.com`` for Langfuse when empty.

        Returns:
            A dict with at least ``status`` (``"success"`` or ``"error"``),
            ``platform`` and ``message``. Success responses also carry
            ``connection_id`` and ``timestamp``; failures carry ``error``
            (or ``timestamp`` for unsupported platforms). This method does
            not raise: every exception is converted into an error response.
        """
        try:
            if platform_name.lower() != 'langfuse':
                # Handle other platforms
                return {
                    "status": "error",
                    "platform": platform_name,
                    "message": f"Connection to platform '{platform_name}' not implemented yet",
                    "timestamp": datetime.now().isoformat()
                }

            # Reject missing credentials up front: a None secret_key cannot be
            # stored in os.environ and would otherwise surface as a misleading
            # connection failure (or be encoded as the literal "None").
            if not api_key or not secret_key:
                detail = "both api_key and secret_key are required"
                return {
                    "status": "error",
                    "platform": platform_name,
                    "message": f"Failed to connect to {platform_name}: {detail}",
                    "error": detail
                }

            # Imported lazily so this module can be loaded without langfuse.
            from langfuse import Langfuse

            # Use default host if not provided
            if not host:
                host = "https://cloud.langfuse.com"

            # Initialize Langfuse client with provided credentials
            client = Langfuse(
                secret_key=secret_key,
                public_key=api_key,
                host=host
            )

            # Fetch a single trace purely to verify the connection; the
            # result itself is not needed.
            try:
                client.fetch_traces(limit=1)
            except Exception as e:
                logger.error("Error validating Langfuse credentials: %s", e)
                return {
                    "status": "error",
                    "platform": platform_name,
                    "message": f"Failed to connect to {platform_name}: {str(e)}",
                    "error": str(e)
                }

            # Persisting happens outside the validation try-block so that a
            # file-system failure is not reported as a credential problem.
            ConnectionService._persist_langfuse_credentials(api_key, secret_key, host)

            return {
                "status": "success",
                "platform": platform_name,
                "message": f"Successfully connected to {platform_name}",
                "connection_id": str(uuid.uuid4()),
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("Error connecting to platform: %s", e)
            return {
                "status": "error",
                "platform": platform_name,
                "message": f"Error connecting to {platform_name}: {str(e)}",
                "error": str(e)
            }

    @staticmethod
    def _persist_langfuse_credentials(api_key: str, secret_key: str, host: str) -> None:
        """Store validated Langfuse credentials in ``.env`` and ``os.environ``."""
        # Store credentials securely (should be in a database in production)
        import base64
        from dotenv import load_dotenv, set_key

        # Update .env file with the new credentials
        dotenv_path = os.path.join(os.getcwd(), '.env')
        load_dotenv(dotenv_path)
        set_key(dotenv_path, "LANGFUSE_PUBLIC_KEY", api_key)
        set_key(dotenv_path, "LANGFUSE_SECRET_KEY", secret_key)
        set_key(dotenv_path, "LANGFUSE_HOST", host)

        # Set the credentials in current environment
        os.environ["LANGFUSE_PUBLIC_KEY"] = api_key
        os.environ["LANGFUSE_SECRET_KEY"] = secret_key
        os.environ["LANGFUSE_HOST"] = host

        # Create basic auth string
        auth_string = base64.b64encode(f"{api_key}:{secret_key}".encode()).decode()
        os.environ["LANGFUSE_AUTH"] = auth_string