# AgentGraph/backend/services/platform/connection_service.py
# wu981526092's picture
# 🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
# c2ea5ed
"""
Service for handling connections to external platforms like Langfuse
"""
import os
import logging
import uuid
from typing import Dict, Any, Optional
from datetime import datetime
# Module-level logger shared by everything in this file. The logger name is a
# hard-coded dotted string (not ``__name__``), so logging configuration that
# targets "agent_monitoring_server.services.platform.connection" applies here.
logger = logging.getLogger("agent_monitoring_server.services.platform.connection")
class ConnectionService:
    """Service for connecting to external platforms.

    Only Langfuse is implemented. Every failure is reported through an error
    dictionary returned to the caller; ``connect_platform`` does not raise.
    """

    @staticmethod
    def _error_response(platform_name: Any, message: str, error: Optional[str] = None) -> Dict[str, Any]:
        """Build an error payload; the ``error`` key is present only when given."""
        response: Dict[str, Any] = {
            "status": "error",
            "platform": platform_name,
            "message": message,
        }
        if error is not None:
            response["error"] = error
        response["timestamp"] = datetime.now().isoformat()
        return response

    @staticmethod
    def _persist_langfuse_credentials(api_key: str, secret_key: str, host: str) -> None:
        """Write Langfuse credentials to ``<cwd>/.env`` and to ``os.environ``."""
        # NOTE: credentials are stored in plain text; the original code flags
        # that a database should be used in production.
        import base64
        from dotenv import load_dotenv, set_key

        dotenv_path = os.path.join(os.getcwd(), '.env')
        load_dotenv(dotenv_path)

        credentials = {
            "LANGFUSE_PUBLIC_KEY": api_key,
            "LANGFUSE_SECRET_KEY": secret_key,
            "LANGFUSE_HOST": host,
        }
        for env_name, env_value in credentials.items():
            set_key(dotenv_path, env_name, env_value)
        # Mirror the values into the running process.
        os.environ.update(credentials)

        # Base64 of "public:secret", exposed as LANGFUSE_AUTH.
        os.environ["LANGFUSE_AUTH"] = base64.b64encode(
            f"{api_key}:{secret_key}".encode()
        ).decode()

    @staticmethod
    def connect_platform(platform_name: str, api_key: str, secret_key: Optional[str] = None, host: Optional[str] = None) -> Dict[str, Any]:
        """Connect to an external platform like Langfuse.

        For Langfuse the credentials are verified by fetching a single trace;
        on success they are persisted to ``.env`` in the current working
        directory and exported into ``os.environ``.

        Args:
            platform_name: Platform identifier, compared case-insensitively.
                Only ``"langfuse"`` is implemented.
            api_key: Public key for the platform.
            secret_key: Secret key for the platform. Required for Langfuse.
            host: Base URL of the platform; defaults to
                ``https://cloud.langfuse.com`` when omitted or empty.

        Returns:
            A dictionary with ``status`` (``"success"`` or ``"error"``),
            ``platform``, ``message`` and ``timestamp``. Successful responses
            also carry a freshly generated ``connection_id``; failures caused
            by an exception or by invalid credentials also carry ``error``.
        """
        try:
            if platform_name.lower() != 'langfuse':
                return ConnectionService._error_response(
                    platform_name,
                    f"Connection to platform '{platform_name}' not implemented yet",
                )

            # Reject incomplete credentials up front: a missing secret key
            # would otherwise fail only after the public key had already been
            # written to .env, leaving the file half-updated.
            if not api_key or not secret_key:
                reason = "both api_key (public key) and secret_key are required"
                return ConnectionService._error_response(
                    platform_name,
                    f"Failed to connect to {platform_name}: {reason}",
                    reason,
                )

            # Imported lazily so the module loads without langfuse installed.
            from langfuse import Langfuse

            host = host or "https://cloud.langfuse.com"
            client = Langfuse(
                secret_key=secret_key,
                public_key=api_key,
                host=host,
            )

            try:
                # Fetching one trace is enough to prove the credentials work.
                client.fetch_traces(limit=1)
                ConnectionService._persist_langfuse_credentials(api_key, secret_key, host)
            except Exception as e:
                logger.error("Error validating Langfuse credentials: %s", e)
                return ConnectionService._error_response(
                    platform_name,
                    f"Failed to connect to {platform_name}: {str(e)}",
                    str(e),
                )

            return {
                "status": "success",
                "platform": platform_name,
                "message": f"Successfully connected to {platform_name}",
                "connection_id": str(uuid.uuid4()),
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            # Boundary handler: callers expect an error payload, not a raise.
            logger.error("Error connecting to platform: %s", e)
            return ConnectionService._error_response(
                platform_name,
                f"Error connecting to {platform_name}: {str(e)}",
                str(e),
            )