Spaces:
Sleeping
Sleeping
File size: 4,015 Bytes
c2ea5ed | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 | """
Service for handling connections to external platforms like Langfuse
"""
import os
import logging
import uuid
from typing import Dict, Any, Optional
from datetime import datetime
logger = logging.getLogger("agent_monitoring_server.services.platform.connection")
class ConnectionService:
"""Service for connecting to external platforms"""
@staticmethod
def connect_platform(platform_name: str, api_key: str, secret_key: Optional[str] = None, host: Optional[str] = None) -> Dict[str, Any]:
"""Connect to an external platform like Langfuse"""
try:
if platform_name.lower() == 'langfuse':
# Validate the credentials by attempting to connect to Langfuse
from langfuse import Langfuse
# Use default host if not provided
if not host:
host = "https://cloud.langfuse.com"
# Initialize Langfuse client with provided credentials
client = Langfuse(
secret_key=secret_key,
public_key=api_key,
host=host
)
# Try to fetch a trace to verify connection
try:
# Just fetch a single trace to verify connection
traces_response = client.fetch_traces(limit=1)
# If we get here, connection was successful
# Store credentials securely (should be in a database in production)
from dotenv import load_dotenv, set_key
# Update .env file with the new credentials
dotenv_path = os.path.join(os.getcwd(), '.env')
load_dotenv(dotenv_path)
set_key(dotenv_path, "LANGFUSE_PUBLIC_KEY", api_key)
set_key(dotenv_path, "LANGFUSE_SECRET_KEY", secret_key)
set_key(dotenv_path, "LANGFUSE_HOST", host)
# Set the credentials in current environment
os.environ["LANGFUSE_PUBLIC_KEY"] = api_key
os.environ["LANGFUSE_SECRET_KEY"] = secret_key
os.environ["LANGFUSE_HOST"] = host
# Create basic auth string
import base64
auth_string = base64.b64encode(f"{api_key}:{secret_key}".encode()).decode()
os.environ["LANGFUSE_AUTH"] = auth_string
return {
"status": "success",
"platform": platform_name,
"message": f"Successfully connected to {platform_name}",
"connection_id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error validating Langfuse credentials: {str(e)}")
return {
"status": "error",
"platform": platform_name,
"message": f"Failed to connect to {platform_name}: {str(e)}",
"error": str(e)
}
else:
# Handle other platforms
return {
"status": "error",
"platform": platform_name,
"message": f"Connection to platform '{platform_name}' not implemented yet",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error connecting to platform: {str(e)}")
return {
"status": "error",
"platform": platform_name,
"message": f"Error connecting to {platform_name}: {str(e)}",
"error": str(e)
} |