A newer version of the Gradio SDK is available:
6.1.0
API Reference
Complete API reference for the MCP Orchestration Platform.
Core Classes
MCPOrchestrator
The main orchestration engine that manages MCP servers, connections, and tool execution.
from orchestration_platform.mcp_orchestrator import MCPOrchestrator
Constructor
MCPOrchestrator(
config: Optional[Dict[str, Any]] = None,
secrets_manager: Optional[SecretsManager] = None,
debug: bool = False
)
Parameters:
config: Optional configuration dictionary
secrets_manager: Optional secrets manager instance
debug: Enable debug mode for verbose logging
Methods
async initialize()
Initialize the orchestrator with configuration and setup internal components.
await orchestrator.initialize()
Returns: None
Raises:
ConfigurationError: If configuration is invalid
InitializationError: If initialization fails
async add_server(name: str, url: str, config: Optional[Dict] = None) -> bool
Register a new MCP server with the orchestrator.
success = await orchestrator.add_server(
"weather-server",
"http://localhost:8001/mcp",
{"timeout": 30}
)
Parameters:
name: Unique server identifier
url: MCP server endpoint URL
config: Optional server-specific configuration
Returns: bool - True if registration successful
Raises:
ServerRegistrationError: If server registration fails
DuplicateServerError: If server name already exists
async remove_server(name: str) -> bool
Remove a registered server from the orchestrator.
success = await orchestrator.remove_server("weather-server")
Parameters:
name: Server identifier to remove
Returns: bool - True if removal successful
async call_tool(server: str, tool: str, arguments: Dict[str, Any]) -> Dict[str, Any]
Execute a tool on a registered MCP server.
result = await orchestrator.call_tool(
"weather-server",
"get_current_weather",
{"location": "New York"}
)
Parameters:
server: Server identifier
tool: Tool name to execute
arguments: Tool arguments as dictionary
Returns: Dict[str, Any] - Tool execution result
Raises:
ServerNotFoundError: If server doesn't exist
ToolNotFoundError: If tool doesn't exist
ToolExecutionError: If tool execution fails
CircuitBreakerError: If circuit breaker is open
async list_all_tools() -> Dict[str, List[Dict[str, Any]]]
Get catalog of all available tools across registered servers.
tools = await orchestrator.list_all_tools()
# Returns: {"weather-server": [...], "crm-server": [...]}
Returns: Dict[str, List[Dict[str, Any]]] - Tool catalog by server
async get_server_status(server: str) -> Dict[str, Any]
Get status information for a registered server.
status = await orchestrator.get_server_status("weather-server")
Parameters:
server: Server identifier
Returns: Dict[str, Any] - Server status information
async health_check() -> Dict[str, Any]
Perform comprehensive health check of the orchestrator.
health = await orchestrator.health_check()
Returns: Dict[str, Any] - Health status information
async cleanup()
Clean up resources and close connections.
await orchestrator.cleanup()
SecretsManager
Enterprise-grade secrets management with multiple backend support.
from orchestration_platform.secrets_manager import SecretsManager
Constructor
SecretsManager(
backend: str = "local",
config: Optional[Dict[str, Any]] = None
)
Parameters:
backend: Backend type ("local", "vault", "aws", "environment")
config: Backend-specific configuration
Methods
async initialize()
Initialize the secrets manager with specified backend.
secrets = SecretsManager(backend="local")
await secrets.initialize()
Raises:
SecretsBackendError: If backend initialization fails
ConfigurationError: If configuration is invalid
async get_secret(key: str) -> str
Retrieve a secret value.
api_key = await secrets.get_secret("WEATHER_API_KEY")
Parameters:
key: Secret key identifier
Returns: str - Secret value
Raises:
SecretNotFoundError: If secret doesn't exist
AccessDeniedError: If access is denied
async set_secret(key: str, value: str) -> bool
Store a secret value.
await secrets.set_secret("DATABASE_PASSWORD", "secure_password")
Parameters:
key: Secret key identifier
value: Secret value to store
Returns: bool - True if storage successful
async delete_secret(key: str) -> bool
Delete a secret value.
await secrets.delete_secret("OLD_API_KEY")
Parameters:
key: Secret key identifier
Returns: bool - True if deletion successful
async list_secrets(pattern: Optional[str] = None) -> List[str]
List available secret keys.
keys = await secrets.list_secrets(pattern="api_*")
Parameters:
pattern: Optional glob pattern to filter keys
Returns: List[str] - List of secret key names
async rotate_secret(key: str) -> bool
Rotate a secret value.
await secrets.rotate_secret("JWT_SECRET")
Parameters:
key: Secret key identifier
Returns: bool - True if rotation successful
SessionManager
Secure session management for user isolation and rate limiting.
from orchestration_platform.mcp_orchestrator import SessionManager
Constructor
SessionManager(
orchestrator: MCPOrchestrator,
session_ttl: int = 3600,
max_sessions: int = 1000
)
Parameters:
orchestrator: Parent orchestrator instance
session_ttl: Session time-to-live in seconds
max_sessions: Maximum concurrent sessions
Methods
async create_session(user_id: str, permissions: List[str] = None) -> str
Create a new user session.
session_id = await session_manager.create_session(
"user123",
permissions=["read", "execute"]
)
Parameters:
user_id: Unique user identifier
permissions: List of user permissions
Returns: str - Session identifier
async get_session(session_id: str) -> Optional[Dict[str, Any]]
Retrieve session information.
session = await session_manager.get_session(session_id)
Parameters:
session_id: Session identifier
Returns: Optional[Dict[str, Any]] - Session information
async revoke_session(session_id: str) -> bool
Revoke an active session.
await session_manager.revoke_session(session_id)
Parameters:
session_id: Session identifier
Returns: bool - True if revocation successful
ConnectionPool
Advanced connection management with circuit breaker patterns.
from orchestration_platform.mcp_orchestrator import ConnectionPool
Constructor
ConnectionPool(
server_url: str,
max_connections: int = 10,
min_connections: int = 2,
connection_timeout: int = 30,
health_check_interval: int = 30
)
Parameters:
server_url: MCP server endpoint URL
max_connections: Maximum pool size
min_connections: Minimum pool size
connection_timeout: Connection timeout in seconds
health_check_interval: Health check interval in seconds
Methods
async acquire() -> Connection
Acquire a connection from the pool.
connection = await pool.acquire()
try:
result = await connection.execute_request(request)
finally:
await pool.release(connection)
Returns: Connection - Managed connection instance
async release(connection: Connection)
Release a connection back to the pool.
await pool.release(connection)
Parameters:
connection: Connection to release
async health_check() -> Dict[str, Any]
Get connection pool health status.
health = await pool.health_check()
Returns: Dict[str, Any] - Pool health information
ToolManager
Dynamic tool discovery and introspection system.
from orchestration_platform.mcp_orchestrator import ToolManager
Constructor
ToolManager(orchestrator: MCPOrchestrator)
Parameters:
orchestrator: Parent orchestrator instance
Methods
async discover_tools(server_name: str) -> List[Dict[str, Any]]
Discover available tools for a server.
tools = await tool_manager.discover_tools("weather-server")
Parameters:
server_name: Server identifier
Returns: List[Dict[str, Any]] - Discovered tools
async validate_tool_call(server: str, tool: str, arguments: Dict[str, Any]) -> bool
Validate tool call arguments against schema.
valid = await tool_manager.validate_tool_call(
"weather-server",
"get_current_weather",
{"location": "New York"}
)
Parameters:
server: Server identifier
tool: Tool name
arguments: Arguments to validate
Returns: bool - True if validation passes
async get_tool_schema(server: str, tool: str) -> Optional[Dict[str, Any]]
Get tool input/output schema.
schema = await tool_manager.get_tool_schema("weather-server", "get_current_weather")
Parameters:
server: Server identifier
tool: Tool name
Returns: Optional[Dict[str, Any]] - Tool schema
MultiLayerCache
High-performance caching with multiple storage layers.
from orchestration_platform.mcp_orchestrator import MultiLayerCache
Constructor
MultiLayerCache(
layers: List[CacheLayer],
default_ttl: int = 3600,
max_size: int = 10000
)
Parameters:
layers: List of cache layer configurations
default_ttl: Default time-to-live in seconds
max_size: Maximum cache size
Methods
async get(key: str) -> Optional[Any]
Get value from cache.
value = await cache.get("weather:new_york")
Parameters:
key: Cache key
Returns: Optional[Any] - Cached value
async set(key: str, value: Any, ttl: Optional[int] = None) -> bool
Set value in cache.
await cache.set("weather:new_york", weather_data, ttl=1800)
Parameters:
key: Cache key
value: Value to cache
ttl: Optional time-to-live in seconds
Returns: bool - True if caching successful
async delete(key: str) -> bool
Delete value from cache.
await cache.delete("weather:new_york")
Parameters:
key: Cache key
Returns: bool - True if deletion successful
async clear() -> bool
Clear all cached values.
await cache.clear()
Returns: bool - True if clear successful
Event System
Event Types
from orchestration_platform.mcp_orchestrator import EventType
# Available event types
EventType.SERVER_CONNECTED
EventType.SERVER_DISCONNECTED
EventType.TOOL_CALLED
EventType.TOOL_COMPLETED
EventType.TOOL_FAILED
EventType.CIRCUIT_BREAKER_TRIPPED
EventType.CACHE_HIT
EventType.CACHE_MISS
EventType.SESSION_CREATED
EventType.SESSION_EXPIRED
Event Handlers
from orchestration_platform.mcp_orchestrator import EventHandler
async def handle_tool_event(event):
print(f"Tool called: {event.tool} on {event.server}")
handler = EventHandler(handle_tool_event)
orchestrator.add_event_handler(EventType.TOOL_CALLED, handler)
Data Models
ServerStatus
@dataclass
class ServerStatus:
name: str
url: str
status: str # "healthy", "degraded", "unhealthy"
last_health_check: str
connection_count: int
error_rate: float
average_response_time: float
ToolInfo
@dataclass
class ToolInfo:
name: str
description: str
input_schema: Dict[str, Any]
output_schema: Optional[Dict[str, Any]]
examples: List[Dict[str, Any]]
tags: List[str]
server_name: str
SessionInfo
@dataclass
class SessionInfo:
session_id: str
user_id: str
created_at: str
expires_at: str
permissions: List[str]
last_activity: str
request_count: int
Error Classes
OrchestratorError
Base exception for all orchestrator errors.
class OrchestratorError(Exception):
def __init__(self, message: str, details: Optional[Dict] = None):
self.message = message
self.details = details or {}
super().__init__(self.message)
ConfigurationError
Raised when configuration is invalid.
class ConfigurationError(OrchestratorError):
pass
ServerRegistrationError
Raised when server registration fails.
class ServerRegistrationError(OrchestratorError):
pass
ToolExecutionError
Raised when tool execution fails.
class ToolExecutionError(OrchestratorError):
def __init__(self, message: str, server: str, tool: str, details: Optional[Dict] = None):
super().__init__(message, details)
self.server = server
self.tool = tool
CircuitBreakerError
Raised when circuit breaker is open.
class CircuitBreakerError(OrchestratorError):
def __init__(self, message: str, server: str):
super().__init__(message)
self.server = server
Configuration Options
Orchestrator Configuration
config = {
"orchestrator": {
"host": "localhost",
"port": 7860,
"max_connections": 100,
"connection_timeout": 30,
"request_timeout": 60,
"max_retries": 3,
"retry_delay": 1.0,
"circuit_breaker": {
"failure_threshold": 5,
"recovery_timeout": 60,
"half_open_max_calls": 3
}
},
"cache": {
"layers": [
{"type": "memory", "max_size": 1000},
{"type": "redis", "url": "redis://localhost:6379"}
],
"default_ttl": 3600,
"enable_compression": True
},
"session": {
"ttl": 3600,
"max_sessions": 1000,
"cleanup_interval": 300
},
"monitoring": {
"prometheus": {
"enabled": True,
"port": 9090
},
"logging": {
"level": "INFO",
"format": "json",
"file": "/var/log/orchestrator.log"
}
}
}
Usage Examples
Basic Server Registration
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator
async def main():
orchestrator = MCPOrchestrator()
await orchestrator.initialize()
# Register servers
await orchestrator.add_server("weather", "http://localhost:8001/mcp")
await orchestrator.add_server("crm", "http://localhost:8002/mcp")
# Call tools
result = await orchestrator.call_tool("weather", "get_current_weather", {
"location": "New York"
})
print(result)
await orchestrator.cleanup()
asyncio.run(main())
With Secrets Management
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SecretsManager
async def main():
# Setup secrets manager
secrets = SecretsManager(backend="local")
await secrets.initialize()
await secrets.set_secret("WEATHER_API_KEY", "your-api-key")
# Setup orchestrator with secrets
orchestrator = MCPOrchestrator(secrets_manager=secrets)
await orchestrator.initialize()
# Use secret in server config
await orchestrator.add_server("weather", "http://localhost:8001/mcp", {
"auth": {
"type": "api_key",
"key": await secrets.get_secret("WEATHER_API_KEY")
}
})
await orchestrator.cleanup()
asyncio.run(main())
With Session Management
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SessionManager
async def main():
orchestrator = MCPOrchestrator()
await orchestrator.initialize()
session_manager = SessionManager(orchestrator)
# Create session
session_id = await session_manager.create_session("user123", ["read", "execute"])
# Use session for tool calls
result = await orchestrator.call_tool("weather", "get_current_weather", {
"location": "New York"
})
await orchestrator.cleanup()
asyncio.run(main())
Event Handling
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, EventType, EventHandler
async def handle_events(event):
if event.type == EventType.TOOL_CALLED:
print(f"Tool called: {event.tool} on {event.server}")
elif event.type == EventType.CIRCUIT_BREAKER_TRIPPED:
print(f"Circuit breaker tripped: {event.server}")
async def main():
orchestrator = MCPOrchestrator()
await orchestrator.initialize()
# Add event handler
handler = EventHandler(handle_events)
orchestrator.add_event_handler(EventType.TOOL_CALLED, handler)
orchestrator.add_event_handler(EventType.CIRCUIT_BREAKER_TRIPPED, handler)
await orchestrator.cleanup()
asyncio.run(main())