rajkumarrawal's picture
Initial commit
2ec0d39

A newer version of the Gradio SDK is available: 6.1.0

Upgrade

API Reference

Complete API reference for the MCP Orchestration Platform.

Core Classes

MCPOrchestrator

The main orchestration engine that manages MCP servers, connections, and tool execution.

from orchestration_platform.mcp_orchestrator import MCPOrchestrator

Constructor

MCPOrchestrator(
    config: Optional[Dict[str, Any]] = None,
    secrets_manager: Optional[SecretsManager] = None,
    debug: bool = False
)

Parameters:

  • config: Optional configuration dictionary
  • secrets_manager: Optional secrets manager instance
  • debug: Enable debug mode for verbose logging

Methods

async initialize()

Initialize the orchestrator with configuration and setup internal components.

await orchestrator.initialize()

Returns: None

Raises:

  • ConfigurationError: If configuration is invalid
  • InitializationError: If initialization fails

async add_server(name: str, url: str, config: Optional[Dict] = None) -> bool

Register a new MCP server with the orchestrator.

success = await orchestrator.add_server(
    "weather-server", 
    "http://localhost:8001/mcp",
    {"timeout": 30}
)

Parameters:

  • name: Unique server identifier
  • url: MCP server endpoint URL
  • config: Optional server-specific configuration

Returns: bool - True if registration successful

Raises:

  • ServerRegistrationError: If server registration fails
  • DuplicateServerError: If server name already exists

async remove_server(name: str) -> bool

Remove a registered server from the orchestrator.

success = await orchestrator.remove_server("weather-server")

Parameters:

  • name: Server identifier to remove

Returns: bool - True if removal successful

async call_tool(server: str, tool: str, arguments: Dict[str, Any]) -> Dict[str, Any]

Execute a tool on a registered MCP server.

result = await orchestrator.call_tool(
    "weather-server",
    "get_current_weather",
    {"location": "New York"}
)

Parameters:

  • server: Server identifier
  • tool: Tool name to execute
  • arguments: Tool arguments as dictionary

Returns: Dict[str, Any] - Tool execution result

Raises:

  • ServerNotFoundError: If server doesn't exist
  • ToolNotFoundError: If tool doesn't exist
  • ToolExecutionError: If tool execution fails
  • CircuitBreakerError: If circuit breaker is open

async list_all_tools() -> Dict[str, List[Dict[str, Any]]]

Get catalog of all available tools across registered servers.

tools = await orchestrator.list_all_tools()
# Returns: {"weather-server": [...], "crm-server": [...]}

Returns: Dict[str, List[Dict[str, Any]]] - Tool catalog by server

async get_server_status(server: str) -> Dict[str, Any]

Get status information for a registered server.

status = await orchestrator.get_server_status("weather-server")

Parameters:

  • server: Server identifier

Returns: Dict[str, Any] - Server status information

async health_check() -> Dict[str, Any]

Perform comprehensive health check of the orchestrator.

health = await orchestrator.health_check()

Returns: Dict[str, Any] - Health status information

async cleanup()

Clean up resources and close connections.

await orchestrator.cleanup()

SecretsManager

Enterprise-grade secrets management with multiple backend support.

from orchestration_platform.secrets_manager import SecretsManager

Constructor

SecretsManager(
    backend: str = "local",
    config: Optional[Dict[str, Any]] = None
)

Parameters:

  • backend: Backend type ("local", "vault", "aws", "environment")
  • config: Backend-specific configuration

Methods

async initialize()

Initialize the secrets manager with specified backend.

secrets = SecretsManager(backend="local")
await secrets.initialize()

Raises:

  • SecretsBackendError: If backend initialization fails
  • ConfigurationError: If configuration is invalid

async get_secret(key: str) -> str

Retrieve a secret value.

api_key = await secrets.get_secret("WEATHER_API_KEY")

Parameters:

  • key: Secret key identifier

Returns: str - Secret value

Raises:

  • SecretNotFoundError: If secret doesn't exist
  • AccessDeniedError: If access is denied

async set_secret(key: str, value: str) -> bool

Store a secret value.

await secrets.set_secret("DATABASE_PASSWORD", "secure_password")

Parameters:

  • key: Secret key identifier
  • value: Secret value to store

Returns: bool - True if storage successful

async delete_secret(key: str) -> bool

Delete a secret value.

await secrets.delete_secret("OLD_API_KEY")

Parameters:

  • key: Secret key identifier

Returns: bool - True if deletion successful

async list_secrets(pattern: Optional[str] = None) -> List[str]

List available secret keys.

keys = await secrets.list_secrets(pattern="api_*")

Parameters:

  • pattern: Optional glob pattern to filter keys

Returns: List[str] - List of secret key names

async rotate_secret(key: str) -> bool

Rotate a secret value.

await secrets.rotate_secret("JWT_SECRET")

Parameters:

  • key: Secret key identifier

Returns: bool - True if rotation successful

SessionManager

Secure session management for user isolation and rate limiting.

from orchestration_platform.mcp_orchestrator import SessionManager

Constructor

SessionManager(
    orchestrator: MCPOrchestrator,
    session_ttl: int = 3600,
    max_sessions: int = 1000
)

Parameters:

  • orchestrator: Parent orchestrator instance
  • session_ttl: Session time-to-live in seconds
  • max_sessions: Maximum concurrent sessions

Methods

async create_session(user_id: str, permissions: Optional[List[str]] = None) -> str

Create a new user session.

session_id = await session_manager.create_session(
    "user123",
    permissions=["read", "execute"]
)

Parameters:

  • user_id: Unique user identifier
  • permissions: List of user permissions

Returns: str - Session identifier

async get_session(session_id: str) -> Optional[Dict[str, Any]]

Retrieve session information.

session = await session_manager.get_session(session_id)

Parameters:

  • session_id: Session identifier

Returns: Optional[Dict[str, Any]] - Session information

async revoke_session(session_id: str) -> bool

Revoke an active session.

await session_manager.revoke_session(session_id)

Parameters:

  • session_id: Session identifier

Returns: bool - True if revocation successful

ConnectionPool

Advanced connection management with circuit breaker patterns.

from orchestration_platform.mcp_orchestrator import ConnectionPool

Constructor

ConnectionPool(
    server_url: str,
    max_connections: int = 10,
    min_connections: int = 2,
    connection_timeout: int = 30,
    health_check_interval: int = 30
)

Parameters:

  • server_url: MCP server endpoint URL
  • max_connections: Maximum pool size
  • min_connections: Minimum pool size
  • connection_timeout: Connection timeout in seconds
  • health_check_interval: Health check interval in seconds

Methods

async acquire() -> Connection

Acquire a connection from the pool.

connection = await pool.acquire()
try:
    result = await connection.execute_request(request)
finally:
    await pool.release(connection)

Returns: Connection - Managed connection instance

async release(connection: Connection)

Release a connection back to the pool.

await pool.release(connection)

Parameters:

  • connection: Connection to release

async health_check() -> Dict[str, Any]

Get connection pool health status.

health = await pool.health_check()

Returns: Dict[str, Any] - Pool health information

ToolManager

Dynamic tool discovery and introspection system.

from orchestration_platform.mcp_orchestrator import ToolManager

Constructor

ToolManager(orchestrator: MCPOrchestrator)

Parameters:

  • orchestrator: Parent orchestrator instance

Methods

async discover_tools(server_name: str) -> List[Dict[str, Any]]

Discover available tools for a server.

tools = await tool_manager.discover_tools("weather-server")

Parameters:

  • server_name: Server identifier

Returns: List[Dict[str, Any]] - Discovered tools

async validate_tool_call(server: str, tool: str, arguments: Dict[str, Any]) -> bool

Validate tool call arguments against schema.

valid = await tool_manager.validate_tool_call(
    "weather-server",
    "get_current_weather",
    {"location": "New York"}
)

Parameters:

  • server: Server identifier
  • tool: Tool name
  • arguments: Arguments to validate

Returns: bool - True if validation passes

async get_tool_schema(server: str, tool: str) -> Optional[Dict[str, Any]]

Get tool input/output schema.

schema = await tool_manager.get_tool_schema("weather-server", "get_current_weather")

Parameters:

  • server: Server identifier
  • tool: Tool name

Returns: Optional[Dict[str, Any]] - Tool schema

MultiLayerCache

High-performance caching with multiple storage layers.

from orchestration_platform.mcp_orchestrator import MultiLayerCache

Constructor

MultiLayerCache(
    layers: List[CacheLayer],
    default_ttl: int = 3600,
    max_size: int = 10000
)

Parameters:

  • layers: List of cache layer configurations
  • default_ttl: Default time-to-live in seconds
  • max_size: Maximum cache size

Methods

async get(key: str) -> Optional[Any]

Get value from cache.

value = await cache.get("weather:new_york")

Parameters:

  • key: Cache key

Returns: Optional[Any] - Cached value

async set(key: str, value: Any, ttl: Optional[int] = None) -> bool

Set value in cache.

await cache.set("weather:new_york", weather_data, ttl=1800)

Parameters:

  • key: Cache key
  • value: Value to cache
  • ttl: Optional time-to-live in seconds

Returns: bool - True if caching successful

async delete(key: str) -> bool

Delete value from cache.

await cache.delete("weather:new_york")

Parameters:

  • key: Cache key

Returns: bool - True if deletion successful

async clear() -> bool

Clear all cached values.

await cache.clear()

Returns: bool - True if clear successful

Event System

Event Types

from orchestration_platform.mcp_orchestrator import EventType

# Available event types
EventType.SERVER_CONNECTED
EventType.SERVER_DISCONNECTED
EventType.TOOL_CALLED
EventType.TOOL_COMPLETED
EventType.TOOL_FAILED
EventType.CIRCUIT_BREAKER_TRIPPED
EventType.CACHE_HIT
EventType.CACHE_MISS
EventType.SESSION_CREATED
EventType.SESSION_EXPIRED

Event Handlers

from orchestration_platform.mcp_orchestrator import EventHandler

async def handle_tool_event(event):
    """Print which tool was called and on which server.

    Reads the ``tool`` and ``server`` attributes of ``event``.
    """
    message = f"Tool called: {event.tool} on {event.server}"
    print(message)

handler = EventHandler(handle_tool_event)
orchestrator.add_event_handler(EventType.TOOL_CALLED, handler)

Data Models

ServerStatus

@dataclass
class ServerStatus:
    """Status record for a registered MCP server."""

    name: str  # server identifier
    url: str  # server endpoint URL
    status: str  # "healthy", "degraded", "unhealthy"
    last_health_check: str  # timestamp as a string; exact format not shown here — confirm
    connection_count: int
    error_rate: float  # NOTE(review): fraction vs. percentage is not specified — confirm
    average_response_time: float  # NOTE(review): unit (seconds or ms) is not specified — confirm

ToolInfo

@dataclass
class ToolInfo:
    """Descriptive metadata for a single tool exposed by a server."""

    name: str
    description: str
    input_schema: Dict[str, Any]
    output_schema: Optional[Dict[str, Any]]  # presumably None when no output schema is declared — confirm
    examples: List[Dict[str, Any]]
    tags: List[str]
    server_name: str  # name of the server the tool belongs to

SessionInfo

@dataclass
class SessionInfo:
    """Record describing one user session."""

    session_id: str
    user_id: str
    created_at: str  # timestamp as a string; exact format not shown here — confirm
    expires_at: str  # timestamp as a string; exact format not shown here — confirm
    permissions: List[str]
    last_activity: str  # timestamp as a string; exact format not shown here — confirm
    request_count: int

Error Classes

OrchestratorError

Base exception for all orchestrator errors.

class OrchestratorError(Exception):
    """Root of the orchestrator exception hierarchy.

    Attributes:
        message: Human-readable description of the failure.
        details: Extra context for the failure; an empty dict when none
            (or any falsy value) is supplied.
    """

    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(message)
        self.message = message
        self.details = details if details else {}

ConfigurationError

Raised when configuration is invalid.

class ConfigurationError(OrchestratorError):
    """Raised when configuration is invalid."""

ServerRegistrationError

Raised when server registration fails.

class ServerRegistrationError(OrchestratorError):
    """Raised when server registration fails."""

ToolExecutionError

Raised when tool execution fails.

class ToolExecutionError(OrchestratorError):
    """Raised when tool execution fails.

    Attributes:
        server: Server value passed to the constructor.
        tool: Tool value passed to the constructor.
    """

    def __init__(self, message: str, server: str, tool: str, details: Optional[Dict] = None):
        super().__init__(message, details=details)
        self.server, self.tool = server, tool

CircuitBreakerError

Raised when circuit breaker is open.

class CircuitBreakerError(OrchestratorError):
    """Raised when circuit breaker is open.

    Args:
        message: Human-readable description of the failure.
        server: Server value to record on the exception.
        details: Optional extra context, forwarded to the base class.
            Defaults to None, so existing two-argument callers are unaffected.

    Attributes:
        server: Server value passed to the constructor.
    """

    def __init__(self, message: str, server: str, details: Optional[Dict] = None):
        # Forward details so this error can carry context like ToolExecutionError does.
        super().__init__(message, details)
        self.server = server

Configuration Options

Orchestrator Configuration

# Example configuration dictionary, grouped into orchestrator, cache,
# session, and monitoring sections.
config = {
    "orchestrator": {
        "host": "localhost",
        "port": 7860,
        "max_connections": 100,
        "connection_timeout": 30,  # presumably seconds, like ConnectionPool.connection_timeout — confirm
        "request_timeout": 60,  # presumably seconds — confirm
        "max_retries": 3,
        "retry_delay": 1.0,  # presumably seconds between retries — confirm
        "circuit_breaker": {
            "failure_threshold": 5,
            "recovery_timeout": 60,  # presumably seconds — confirm
            "half_open_max_calls": 3
        }
    },
    "cache": {
        # NOTE(review): whether list order sets lookup order is not stated — confirm
        "layers": [
            {"type": "memory", "max_size": 1000},
            {"type": "redis", "url": "redis://localhost:6379"}
        ],
        "default_ttl": 3600,  # presumably seconds, like MultiLayerCache.default_ttl — confirm
        "enable_compression": True
    },
    "session": {
        "ttl": 3600,  # presumably seconds, like SessionManager.session_ttl — confirm
        "max_sessions": 1000,
        "cleanup_interval": 300  # presumably seconds — confirm
    },
    "monitoring": {
        "prometheus": {
            "enabled": True,
            "port": 9090
        },
        "logging": {
            "level": "INFO",
            "format": "json",
            "file": "/var/log/orchestrator.log"
        }
    }
}

Usage Examples

Basic Server Registration

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator

async def main():
    """Register two MCP servers, call one tool, and print its result."""
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()

    try:
        # Register servers
        await orchestrator.add_server("weather", "http://localhost:8001/mcp")
        await orchestrator.add_server("crm", "http://localhost:8002/mcp")

        # Call tools
        result = await orchestrator.call_tool("weather", "get_current_weather", {
            "location": "New York"
        })

        print(result)
    finally:
        # Run cleanup even if registration or the tool call raises.
        await orchestrator.cleanup()

asyncio.run(main())

With Secrets Management

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator
# SecretsManager is documented under orchestration_platform.secrets_manager.
from orchestration_platform.secrets_manager import SecretsManager

async def main():
    """Store an API key in the secrets manager and use it in a server's auth config."""
    # Setup secrets manager
    secrets = SecretsManager(backend="local")
    await secrets.initialize()
    await secrets.set_secret("WEATHER_API_KEY", "your-api-key")

    # Setup orchestrator with secrets
    orchestrator = MCPOrchestrator(secrets_manager=secrets)
    await orchestrator.initialize()

    try:
        # Use secret in server config
        await orchestrator.add_server("weather", "http://localhost:8001/mcp", {
            "auth": {
                "type": "api_key",
                "key": await secrets.get_secret("WEATHER_API_KEY")
            }
        })
    finally:
        # Run cleanup even if the secret lookup or registration raises.
        await orchestrator.cleanup()

asyncio.run(main())

With Session Management

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SessionManager

async def main():
    """Create a user session, call a tool while it is active, then revoke it."""
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()

    try:
        # The server must be registered before its tools can be called;
        # otherwise call_tool raises ServerNotFoundError.
        await orchestrator.add_server("weather", "http://localhost:8001/mcp")

        session_manager = SessionManager(orchestrator)

        # Create session
        session_id = await session_manager.create_session("user123", ["read", "execute"])

        try:
            # call_tool takes no session argument in this reference, so the
            # session is only held open around the call.
            result = await orchestrator.call_tool("weather", "get_current_weather", {
                "location": "New York"
            })

            print(result)
        finally:
            # Do not leave the session active after the work is done.
            await session_manager.revoke_session(session_id)
    finally:
        # Run cleanup even if any step above raises.
        await orchestrator.cleanup()

asyncio.run(main())

Event Handling

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, EventType, EventHandler

async def handle_events(event):
    """Print a line for tool-called and circuit-breaker-tripped events.

    Events of any other type are ignored.
    """
    if event.type == EventType.TOOL_CALLED:
        print(f"Tool called: {event.tool} on {event.server}")
        return
    if event.type == EventType.CIRCUIT_BREAKER_TRIPPED:
        print(f"Circuit breaker tripped: {event.server}")

async def main():
    """Attach one shared handler to two event types, then shut down."""
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()

    # Add event handler: the same handler instance serves both event types.
    handler = EventHandler(handle_events)
    for event_type in (EventType.TOOL_CALLED, EventType.CIRCUIT_BREAKER_TRIPPED):
        orchestrator.add_event_handler(event_type, handler)

    await orchestrator.cleanup()

asyncio.run(main())