# API Reference Complete API reference for the MCP Orchestration Platform. ## Core Classes ### MCPOrchestrator The main orchestration engine that manages MCP servers, connections, and tool execution. ```python from orchestration_platform.mcp_orchestrator import MCPOrchestrator ``` #### Constructor ```python MCPOrchestrator( config: Optional[Dict[str, Any]] = None, secrets_manager: Optional[SecretsManager] = None, debug: bool = False ) ``` **Parameters:** - `config`: Optional configuration dictionary - `secrets_manager`: Optional secrets manager instance - `debug`: Enable debug mode for verbose logging #### Methods ##### async initialize() Initialize the orchestrator with configuration and setup internal components. ```python await orchestrator.initialize() ``` **Returns:** `None` **Raises:** - `ConfigurationError`: If configuration is invalid - `InitializationError`: If initialization fails ##### async add_server(name: str, url: str, config: Optional[Dict] = None) -> bool Register a new MCP server with the orchestrator. ```python success = await orchestrator.add_server( "weather-server", "http://localhost:8001/mcp", {"timeout": 30} ) ``` **Parameters:** - `name`: Unique server identifier - `url`: MCP server endpoint URL - `config`: Optional server-specific configuration **Returns:** `bool` - True if registration successful **Raises:** - `ServerRegistrationError`: If server registration fails - `DuplicateServerError`: If server name already exists ##### async remove_server(name: str) -> bool Remove a registered server from the orchestrator. ```python success = await orchestrator.remove_server("weather-server") ``` **Parameters:** - `name`: Server identifier to remove **Returns:** `bool` - True if removal successful ##### async call_tool(server: str, tool: str, arguments: Dict[str, Any]) -> Dict[str, Any] Execute a tool on a registered MCP server. ```python result = await orchestrator.call_tool( "weather-server", "get_current_weather", {"location": "New York"} ) ``` **Parameters:** - `server`: Server identifier - `tool`: Tool name to execute - `arguments`: Tool arguments as dictionary **Returns:** `Dict[str, Any]` - Tool execution result **Raises:** - `ServerNotFoundError`: If server doesn't exist - `ToolNotFoundError`: If tool doesn't exist - `ToolExecutionError`: If tool execution fails - `CircuitBreakerError`: If circuit breaker is open ##### async list_all_tools() -> Dict[str, List[Dict[str, Any]]] Get catalog of all available tools across registered servers. ```python tools = await orchestrator.list_all_tools() # Returns: {"weather-server": [...], "crm-server": [...]} ``` **Returns:** `Dict[str, List[Dict[str, Any]]]` - Tool catalog by server ##### async get_server_status(server: str) -> Dict[str, Any] Get status information for a registered server. ```python status = await orchestrator.get_server_status("weather-server") ``` **Parameters:** - `server`: Server identifier **Returns:** `Dict[str, Any]` - Server status information ##### async health_check() -> Dict[str, Any] Perform comprehensive health check of the orchestrator. ```python health = await orchestrator.health_check() ``` **Returns:** `Dict[str, Any]` - Health status information ##### async cleanup() Clean up resources and close connections. ```python await orchestrator.cleanup() ``` ### SecretsManager Enterprise-grade secrets management with multiple backend support. ```python from orchestration_platform.secrets_manager import SecretsManager ``` #### Constructor ```python SecretsManager( backend: str = "local", config: Optional[Dict[str, Any]] = None ) ``` **Parameters:** - `backend`: Backend type ("local", "vault", "aws", "environment") - `config`: Backend-specific configuration #### Methods ##### async initialize() Initialize the secrets manager with specified backend. ```python secrets = SecretsManager(backend="local") await secrets.initialize() ``` **Raises:** - `SecretsBackendError`: If backend initialization fails - `ConfigurationError`: If configuration is invalid ##### async get_secret(key: str) -> str Retrieve a secret value. ```python api_key = await secrets.get_secret("WEATHER_API_KEY") ``` **Parameters:** - `key`: Secret key identifier **Returns:** `str` - Secret value **Raises:** - `SecretNotFoundError`: If secret doesn't exist - `AccessDeniedError`: If access is denied ##### async set_secret(key: str, value: str) -> bool Store a secret value. ```python await secrets.set_secret("DATABASE_PASSWORD", "secure_password") ``` **Parameters:** - `key`: Secret key identifier - `value`: Secret value to store **Returns:** `bool` - True if storage successful ##### async delete_secret(key: str) -> bool Delete a secret value. ```python await secrets.delete_secret("OLD_API_KEY") ``` **Parameters:** - `key`: Secret key identifier **Returns:** `bool` - True if deletion successful ##### async list_secrets(pattern: Optional[str] = None) -> List[str] List available secret keys. ```python keys = await secrets.list_secrets(pattern="api_*") ``` **Parameters:** - `pattern`: Optional glob pattern to filter keys **Returns:** `List[str]` - List of secret key names ##### async rotate_secret(key: str) -> bool Rotate a secret value. ```python await secrets.rotate_secret("JWT_SECRET") ``` **Parameters:** - `key`: Secret key identifier **Returns:** `bool` - True if rotation successful ### SessionManager Secure session management for user isolation and rate limiting. ```python from orchestration_platform.mcp_orchestrator import SessionManager ``` #### Constructor ```python SessionManager( orchestrator: MCPOrchestrator, session_ttl: int = 3600, max_sessions: int = 1000 ) ``` **Parameters:** - `orchestrator`: Parent orchestrator instance - `session_ttl`: Session time-to-live in seconds - `max_sessions`: Maximum concurrent sessions #### Methods ##### async create_session(user_id: str, permissions: List[str] = None) -> str Create a new user session. ```python session_id = await session_manager.create_session( "user123", permissions=["read", "execute"] ) ``` **Parameters:** - `user_id`: Unique user identifier - `permissions`: List of user permissions **Returns:** `str` - Session identifier ##### async get_session(session_id: str) -> Optional[Dict[str, Any]] Retrieve session information. ```python session = await session_manager.get_session(session_id) ``` **Parameters:** - `session_id`: Session identifier **Returns:** `Optional[Dict[str, Any]]` - Session information ##### async revoke_session(session_id: str) -> bool Revoke an active session. ```python await session_manager.revoke_session(session_id) ``` **Parameters:** - `session_id`: Session identifier **Returns:** `bool` - True if revocation successful ### ConnectionPool Advanced connection management with circuit breaker patterns. ```python from orchestration_platform.mcp_orchestrator import ConnectionPool ``` #### Constructor ```python ConnectionPool( server_url: str, max_connections: int = 10, min_connections: int = 2, connection_timeout: int = 30, health_check_interval: int = 30 ) ``` **Parameters:** - `server_url`: MCP server endpoint URL - `max_connections`: Maximum pool size - `min_connections`: Minimum pool size - `connection_timeout`: Connection timeout in seconds - `health_check_interval`: Health check interval in seconds #### Methods ##### async acquire() -> Connection Acquire a connection from the pool. ```python connection = await pool.acquire() try: result = await connection.execute_request(request) finally: await pool.release(connection) ``` **Returns:** `Connection` - Managed connection instance ##### async release(connection: Connection) Release a connection back to the pool. ```python await pool.release(connection) ``` **Parameters:** - `connection`: Connection to release ##### async health_check() -> Dict[str, Any] Get connection pool health status. ```python health = await pool.health_check() ``` **Returns:** `Dict[str, Any]` - Pool health information ### ToolManager Dynamic tool discovery and introspection system. ```python from orchestration_platform.mcp_orchestrator import ToolManager ``` #### Constructor ```python ToolManager(orchestrator: MCPOrchestrator) ``` **Parameters:** - `orchestrator`: Parent orchestrator instance #### Methods ##### async discover_tools(server_name: str) -> List[Dict[str, Any]] Discover available tools for a server. ```python tools = await tool_manager.discover_tools("weather-server") ``` **Parameters:** - `server_name`: Server identifier **Returns:** `List[Dict[str, Any]]` - Discovered tools ##### async validate_tool_call(server: str, tool: str, arguments: Dict[str, Any]) -> bool Validate tool call arguments against schema. ```python valid = await tool_manager.validate_tool_call( "weather-server", "get_current_weather", {"location": "New York"} ) ``` **Parameters:** - `server`: Server identifier - `tool`: Tool name - `arguments`: Arguments to validate **Returns:** `bool` - True if validation passes ##### async get_tool_schema(server: str, tool: str) -> Optional[Dict[str, Any]] Get tool input/output schema. ```python schema = await tool_manager.get_tool_schema("weather-server", "get_current_weather") ``` **Parameters:** - `server`: Server identifier - `tool`: Tool name **Returns:** `Optional[Dict[str, Any]]` - Tool schema ### MultiLayerCache High-performance caching with multiple storage layers. ```python from orchestration_platform.mcp_orchestrator import MultiLayerCache ``` #### Constructor ```python MultiLayerCache( layers: List[CacheLayer], default_ttl: int = 3600, max_size: int = 10000 ) ``` **Parameters:** - `layers`: List of cache layer configurations - `default_ttl`: Default time-to-live in seconds - `max_size`: Maximum cache size #### Methods ##### async get(key: str) -> Optional[Any] Get value from cache. ```python value = await cache.get("weather:new_york") ``` **Parameters:** - `key`: Cache key **Returns:** `Optional[Any]` - Cached value ##### async set(key: str, value: Any, ttl: Optional[int] = None) -> bool Set value in cache. ```python await cache.set("weather:new_york", weather_data, ttl=1800) ``` **Parameters:** - `key`: Cache key - `value`: Value to cache - `ttl`: Optional time-to-live in seconds **Returns:** `bool` - True if caching successful ##### async delete(key: str) -> bool Delete value from cache. ```python await cache.delete("weather:new_york") ``` **Parameters:** - `key`: Cache key **Returns:** `bool` - True if deletion successful ##### async clear() -> bool Clear all cached values. ```python await cache.clear() ``` **Returns:** `bool` - True if clear successful ## Event System ### Event Types ```python from orchestration_platform.mcp_orchestrator import EventType # Available event types EventType.SERVER_CONNECTED EventType.SERVER_DISCONNECTED EventType.TOOL_CALLED EventType.TOOL_COMPLETED EventType.TOOL_FAILED EventType.CIRCUIT_BREAKER_TRIPPED EventType.CACHE_HIT EventType.CACHE_MISS EventType.SESSION_CREATED EventType.SESSION_EXPIRED ``` ### Event Handlers ```python from orchestration_platform.mcp_orchestrator import EventHandler async def handle_tool_event(event): print(f"Tool called: {event.tool} on {event.server}") handler = EventHandler(handle_tool_event) orchestrator.add_event_handler(EventType.TOOL_CALLED, handler) ``` ## Data Models ### ServerStatus ```python @dataclass class ServerStatus: name: str url: str status: str # "healthy", "degraded", "unhealthy" last_health_check: str connection_count: int error_rate: float average_response_time: float ``` ### ToolInfo ```python @dataclass class ToolInfo: name: str description: str input_schema: Dict[str, Any] output_schema: Optional[Dict[str, Any]] examples: List[Dict[str, Any]] tags: List[str] server_name: str ``` ### SessionInfo ```python @dataclass class SessionInfo: session_id: str user_id: str created_at: str expires_at: str permissions: List[str] last_activity: str request_count: int ``` ## Error Classes ### OrchestratorError Base exception for all orchestrator errors. ```python class OrchestratorError(Exception): def __init__(self, message: str, details: Optional[Dict] = None): self.message = message self.details = details or {} super().__init__(self.message) ``` ### ConfigurationError Raised when configuration is invalid. ```python class ConfigurationError(OrchestratorError): pass ``` ### ServerRegistrationError Raised when server registration fails. ```python class ServerRegistrationError(OrchestratorError): pass ``` ### ToolExecutionError Raised when tool execution fails. ```python class ToolExecutionError(OrchestratorError): def __init__(self, message: str, server: str, tool: str, details: Optional[Dict] = None): super().__init__(message, details) self.server = server self.tool = tool ``` ### CircuitBreakerError Raised when circuit breaker is open. ```python class CircuitBreakerError(OrchestratorError): def __init__(self, message: str, server: str): super().__init__(message) self.server = server ``` ## Configuration Options ### Orchestrator Configuration ```python config = { "orchestrator": { "host": "localhost", "port": 7860, "max_connections": 100, "connection_timeout": 30, "request_timeout": 60, "max_retries": 3, "retry_delay": 1.0, "circuit_breaker": { "failure_threshold": 5, "recovery_timeout": 60, "half_open_max_calls": 3 } }, "cache": { "layers": [ {"type": "memory", "max_size": 1000}, {"type": "redis", "url": "redis://localhost:6379"} ], "default_ttl": 3600, "enable_compression": True }, "session": { "ttl": 3600, "max_sessions": 1000, "cleanup_interval": 300 }, "monitoring": { "prometheus": { "enabled": True, "port": 9090 }, "logging": { "level": "INFO", "format": "json", "file": "/var/log/orchestrator.log" } } } ``` ## Usage Examples ### Basic Server Registration ```python import asyncio from orchestration_platform.mcp_orchestrator import MCPOrchestrator async def main(): orchestrator = MCPOrchestrator() await orchestrator.initialize() # Register servers await orchestrator.add_server("weather", "http://localhost:8001/mcp") await orchestrator.add_server("crm", "http://localhost:8002/mcp") # Call tools result = await orchestrator.call_tool("weather", "get_current_weather", { "location": "New York" }) print(result) await orchestrator.cleanup() asyncio.run(main()) ``` ### With Secrets Management ```python import asyncio from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SecretsManager async def main(): # Setup secrets manager secrets = SecretsManager(backend="local") await secrets.initialize() await secrets.set_secret("WEATHER_API_KEY", "your-api-key") # Setup orchestrator with secrets orchestrator = MCPOrchestrator(secrets_manager=secrets) await orchestrator.initialize() # Use secret in server config await orchestrator.add_server("weather", "http://localhost:8001/mcp", { "auth": { "type": "api_key", "key": await secrets.get_secret("WEATHER_API_KEY") } }) await orchestrator.cleanup() asyncio.run(main()) ``` ### With Session Management ```python import asyncio from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SessionManager async def main(): orchestrator = MCPOrchestrator() await orchestrator.initialize() session_manager = SessionManager(orchestrator) # Create session session_id = await session_manager.create_session("user123", ["read", "execute"]) # Use session for tool calls result = await orchestrator.call_tool("weather", "get_current_weather", { "location": "New York" }) await orchestrator.cleanup() asyncio.run(main()) ``` ### Event Handling ```python import asyncio from orchestration_platform.mcp_orchestrator import MCPOrchestrator, EventType, EventHandler async def handle_events(event): if event.type == EventType.TOOL_CALLED: print(f"Tool called: {event.tool} on {event.server}") elif event.type == EventType.CIRCUIT_BREAKER_TRIPPED: print(f"Circuit breaker tripped: {event.server}") async def main(): orchestrator = MCPOrchestrator() await orchestrator.initialize() # Add event handler handler = EventHandler(handle_events) orchestrator.add_event_handler(EventType.TOOL_CALLED, handler) orchestrator.add_event_handler(EventType.CIRCUIT_BREAKER_TRIPPED, handler) await orchestrator.cleanup() asyncio.run(main())