rajkumarrawal's picture
Initial commit
2ec0d39
|
raw
history blame
17.2 kB

API Reference

Complete API reference for the MCP Orchestration Platform.

Core Classes

MCPOrchestrator

The main orchestration engine that manages MCP servers, connections, and tool execution.

from orchestration_platform.mcp_orchestrator import MCPOrchestrator

Constructor

MCPOrchestrator(
    config: Optional[Dict[str, Any]] = None,
    secrets_manager: Optional[SecretsManager] = None,
    debug: bool = False
)

Parameters:

  • config: Optional configuration dictionary
  • secrets_manager: Optional secrets manager instance
  • debug: Enable debug mode for verbose logging

Methods

async initialize()

Initialize the orchestrator with configuration and setup internal components.

await orchestrator.initialize()

Returns: None

Raises:

  • ConfigurationError: If configuration is invalid
  • InitializationError: If initialization fails
async add_server(name: str, url: str, config: Optional[Dict] = None) -> bool

Register a new MCP server with the orchestrator.

success = await orchestrator.add_server(
    "weather-server", 
    "http://localhost:8001/mcp",
    {"timeout": 30}
)

Parameters:

  • name: Unique server identifier
  • url: MCP server endpoint URL
  • config: Optional server-specific configuration

Returns: bool - True if registration successful

Raises:

  • ServerRegistrationError: If server registration fails
  • DuplicateServerError: If server name already exists
async remove_server(name: str) -> bool

Remove a registered server from the orchestrator.

success = await orchestrator.remove_server("weather-server")

Parameters:

  • name: Server identifier to remove

Returns: bool - True if removal successful

async call_tool(server: str, tool: str, arguments: Dict[str, Any]) -> Dict[str, Any]

Execute a tool on a registered MCP server.

result = await orchestrator.call_tool(
    "weather-server",
    "get_current_weather",
    {"location": "New York"}
)

Parameters:

  • server: Server identifier
  • tool: Tool name to execute
  • arguments: Tool arguments as dictionary

Returns: Dict[str, Any] - Tool execution result

Raises:

  • ServerNotFoundError: If server doesn't exist
  • ToolNotFoundError: If tool doesn't exist
  • ToolExecutionError: If tool execution fails
  • CircuitBreakerError: If circuit breaker is open
async list_all_tools() -> Dict[str, List[Dict[str, Any]]]

Get catalog of all available tools across registered servers.

tools = await orchestrator.list_all_tools()
# Returns: {"weather-server": [...], "crm-server": [...]}

Returns: Dict[str, List[Dict[str, Any]]] - Tool catalog by server

async get_server_status(server: str) -> Dict[str, Any]

Get status information for a registered server.

status = await orchestrator.get_server_status("weather-server")

Parameters:

  • server: Server identifier

Returns: Dict[str, Any] - Server status information

async health_check() -> Dict[str, Any]

Perform comprehensive health check of the orchestrator.

health = await orchestrator.health_check()

Returns: Dict[str, Any] - Health status information

async cleanup()

Clean up resources and close connections.

await orchestrator.cleanup()

SecretsManager

Enterprise-grade secrets management with multiple backend support.

from orchestration_platform.secrets_manager import SecretsManager

Constructor

SecretsManager(
    backend: str = "local",
    config: Optional[Dict[str, Any]] = None
)

Parameters:

  • backend: Backend type ("local", "vault", "aws", "environment")
  • config: Backend-specific configuration

Methods

async initialize()

Initialize the secrets manager with specified backend.

secrets = SecretsManager(backend="local")
await secrets.initialize()

Raises:

  • SecretsBackendError: If backend initialization fails
  • ConfigurationError: If configuration is invalid
async get_secret(key: str) -> str

Retrieve a secret value.

api_key = await secrets.get_secret("WEATHER_API_KEY")

Parameters:

  • key: Secret key identifier

Returns: str - Secret value

Raises:

  • SecretNotFoundError: If secret doesn't exist
  • AccessDeniedError: If access is denied
async set_secret(key: str, value: str) -> bool

Store a secret value.

await secrets.set_secret("DATABASE_PASSWORD", "secure_password")

Parameters:

  • key: Secret key identifier
  • value: Secret value to store

Returns: bool - True if storage successful

async delete_secret(key: str) -> bool

Delete a secret value.

await secrets.delete_secret("OLD_API_KEY")

Parameters:

  • key: Secret key identifier

Returns: bool - True if deletion successful

async list_secrets(pattern: Optional[str] = None) -> List[str]

List available secret keys.

keys = await secrets.list_secrets(pattern="api_*")

Parameters:

  • pattern: Optional glob pattern to filter keys

Returns: List[str] - List of secret key names

async rotate_secret(key: str) -> bool

Rotate a secret value.

await secrets.rotate_secret("JWT_SECRET")

Parameters:

  • key: Secret key identifier

Returns: bool - True if rotation successful

SessionManager

Secure session management for user isolation and rate limiting.

from orchestration_platform.mcp_orchestrator import SessionManager

Constructor

SessionManager(
    orchestrator: MCPOrchestrator,
    session_ttl: int = 3600,
    max_sessions: int = 1000
)

Parameters:

  • orchestrator: Parent orchestrator instance
  • session_ttl: Session time-to-live in seconds
  • max_sessions: Maximum concurrent sessions

Methods

async create_session(user_id: str, permissions: List[str] = None) -> str

Create a new user session.

session_id = await session_manager.create_session(
    "user123",
    permissions=["read", "execute"]
)

Parameters:

  • user_id: Unique user identifier
  • permissions: List of user permissions

Returns: str - Session identifier

async get_session(session_id: str) -> Optional[Dict[str, Any]]

Retrieve session information.

session = await session_manager.get_session(session_id)

Parameters:

  • session_id: Session identifier

Returns: Optional[Dict[str, Any]] - Session information

async revoke_session(session_id: str) -> bool

Revoke an active session.

await session_manager.revoke_session(session_id)

Parameters:

  • session_id: Session identifier

Returns: bool - True if revocation successful

ConnectionPool

Advanced connection management with circuit breaker patterns.

from orchestration_platform.mcp_orchestrator import ConnectionPool

Constructor

ConnectionPool(
    server_url: str,
    max_connections: int = 10,
    min_connections: int = 2,
    connection_timeout: int = 30,
    health_check_interval: int = 30
)

Parameters:

  • server_url: MCP server endpoint URL
  • max_connections: Maximum pool size
  • min_connections: Minimum pool size
  • connection_timeout: Connection timeout in seconds
  • health_check_interval: Health check interval in seconds

Methods

async acquire() -> Connection

Acquire a connection from the pool.

connection = await pool.acquire()
try:
    result = await connection.execute_request(request)
finally:
    await pool.release(connection)

Returns: Connection - Managed connection instance

async release(connection: Connection)

Release a connection back to the pool.

await pool.release(connection)

Parameters:

  • connection: Connection to release
async health_check() -> Dict[str, Any]

Get connection pool health status.

health = await pool.health_check()

Returns: Dict[str, Any] - Pool health information

ToolManager

Dynamic tool discovery and introspection system.

from orchestration_platform.mcp_orchestrator import ToolManager

Constructor

ToolManager(orchestrator: MCPOrchestrator)

Parameters:

  • orchestrator: Parent orchestrator instance

Methods

async discover_tools(server_name: str) -> List[Dict[str, Any]]

Discover available tools for a server.

tools = await tool_manager.discover_tools("weather-server")

Parameters:

  • server_name: Server identifier

Returns: List[Dict[str, Any]] - Discovered tools

async validate_tool_call(server: str, tool: str, arguments: Dict[str, Any]) -> bool

Validate tool call arguments against schema.

valid = await tool_manager.validate_tool_call(
    "weather-server",
    "get_current_weather",
    {"location": "New York"}
)

Parameters:

  • server: Server identifier
  • tool: Tool name
  • arguments: Arguments to validate

Returns: bool - True if validation passes

async get_tool_schema(server: str, tool: str) -> Optional[Dict[str, Any]]

Get tool input/output schema.

schema = await tool_manager.get_tool_schema("weather-server", "get_current_weather")

Parameters:

  • server: Server identifier
  • tool: Tool name

Returns: Optional[Dict[str, Any]] - Tool schema

MultiLayerCache

High-performance caching with multiple storage layers.

from orchestration_platform.mcp_orchestrator import MultiLayerCache

Constructor

MultiLayerCache(
    layers: List[CacheLayer],
    default_ttl: int = 3600,
    max_size: int = 10000
)

Parameters:

  • layers: List of cache layer configurations
  • default_ttl: Default time-to-live in seconds
  • max_size: Maximum cache size

Methods

async get(key: str) -> Optional[Any]

Get value from cache.

value = await cache.get("weather:new_york")

Parameters:

  • key: Cache key

Returns: Optional[Any] - Cached value

async set(key: str, value: Any, ttl: Optional[int] = None) -> bool

Set value in cache.

await cache.set("weather:new_york", weather_data, ttl=1800)

Parameters:

  • key: Cache key
  • value: Value to cache
  • ttl: Optional time-to-live in seconds

Returns: bool - True if caching successful

async delete(key: str) -> bool

Delete value from cache.

await cache.delete("weather:new_york")

Parameters:

  • key: Cache key

Returns: bool - True if deletion successful

async clear() -> bool

Clear all cached values.

await cache.clear()

Returns: bool - True if clear successful

Event System

Event Types

from orchestration_platform.mcp_orchestrator import EventType

# Available event types
EventType.SERVER_CONNECTED
EventType.SERVER_DISCONNECTED
EventType.TOOL_CALLED
EventType.TOOL_COMPLETED
EventType.TOOL_FAILED
EventType.CIRCUIT_BREAKER_TRIPPED
EventType.CACHE_HIT
EventType.CACHE_MISS
EventType.SESSION_CREATED
EventType.SESSION_EXPIRED

Event Handlers

from orchestration_platform.mcp_orchestrator import EventHandler

async def handle_tool_event(event):
    print(f"Tool called: {event.tool} on {event.server}")

handler = EventHandler(handle_tool_event)
orchestrator.add_event_handler(EventType.TOOL_CALLED, handler)

Data Models

ServerStatus

@dataclass
class ServerStatus:
    name: str
    url: str
    status: str  # "healthy", "degraded", "unhealthy"
    last_health_check: str
    connection_count: int
    error_rate: float
    average_response_time: float

ToolInfo

@dataclass
class ToolInfo:
    name: str
    description: str
    input_schema: Dict[str, Any]
    output_schema: Optional[Dict[str, Any]]
    examples: List[Dict[str, Any]]
    tags: List[str]
    server_name: str

SessionInfo

@dataclass
class SessionInfo:
    session_id: str
    user_id: str
    created_at: str
    expires_at: str
    permissions: List[str]
    last_activity: str
    request_count: int

Error Classes

OrchestratorError

Base exception for all orchestrator errors.

class OrchestratorError(Exception):
    def __init__(self, message: str, details: Optional[Dict] = None):
        self.message = message
        self.details = details or {}
        super().__init__(self.message)

ConfigurationError

Raised when configuration is invalid.

class ConfigurationError(OrchestratorError):
    pass

ServerRegistrationError

Raised when server registration fails.

class ServerRegistrationError(OrchestratorError):
    pass

ToolExecutionError

Raised when tool execution fails.

class ToolExecutionError(OrchestratorError):
    def __init__(self, message: str, server: str, tool: str, details: Optional[Dict] = None):
        super().__init__(message, details)
        self.server = server
        self.tool = tool

CircuitBreakerError

Raised when circuit breaker is open.

class CircuitBreakerError(OrchestratorError):
    def __init__(self, message: str, server: str):
        super().__init__(message)
        self.server = server

Configuration Options

Orchestrator Configuration

config = {
    "orchestrator": {
        "host": "localhost",
        "port": 7860,
        "max_connections": 100,
        "connection_timeout": 30,
        "request_timeout": 60,
        "max_retries": 3,
        "retry_delay": 1.0,
        "circuit_breaker": {
            "failure_threshold": 5,
            "recovery_timeout": 60,
            "half_open_max_calls": 3
        }
    },
    "cache": {
        "layers": [
            {"type": "memory", "max_size": 1000},
            {"type": "redis", "url": "redis://localhost:6379"}
        ],
        "default_ttl": 3600,
        "enable_compression": True
    },
    "session": {
        "ttl": 3600,
        "max_sessions": 1000,
        "cleanup_interval": 300
    },
    "monitoring": {
        "prometheus": {
            "enabled": True,
            "port": 9090
        },
        "logging": {
            "level": "INFO",
            "format": "json",
            "file": "/var/log/orchestrator.log"
        }
    }
}

Usage Examples

Basic Server Registration

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator

async def main():
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()
    
    # Register servers
    await orchestrator.add_server("weather", "http://localhost:8001/mcp")
    await orchestrator.add_server("crm", "http://localhost:8002/mcp")
    
    # Call tools
    result = await orchestrator.call_tool("weather", "get_current_weather", {
        "location": "New York"
    })
    
    print(result)
    
    await orchestrator.cleanup()

asyncio.run(main())

With Secrets Management

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SecretsManager

async def main():
    # Setup secrets manager
    secrets = SecretsManager(backend="local")
    await secrets.initialize()
    await secrets.set_secret("WEATHER_API_KEY", "your-api-key")
    
    # Setup orchestrator with secrets
    orchestrator = MCPOrchestrator(secrets_manager=secrets)
    await orchestrator.initialize()
    
    # Use secret in server config
    await orchestrator.add_server("weather", "http://localhost:8001/mcp", {
        "auth": {
            "type": "api_key",
            "key": await secrets.get_secret("WEATHER_API_KEY")
        }
    })
    
    await orchestrator.cleanup()

asyncio.run(main())

With Session Management

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SessionManager

async def main():
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()
    
    session_manager = SessionManager(orchestrator)
    
    # Create session
    session_id = await session_manager.create_session("user123", ["read", "execute"])
    
    # Use session for tool calls
    result = await orchestrator.call_tool("weather", "get_current_weather", {
        "location": "New York"
    })
    
    await orchestrator.cleanup()

asyncio.run(main())

Event Handling

import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, EventType, EventHandler

async def handle_events(event):
    if event.type == EventType.TOOL_CALLED:
        print(f"Tool called: {event.tool} on {event.server}")
    elif event.type == EventType.CIRCUIT_BREAKER_TRIPPED:
        print(f"Circuit breaker tripped: {event.server}")

async def main():
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()
    
    # Add event handler
    handler = EventHandler(handle_events)
    orchestrator.add_event_handler(EventType.TOOL_CALLED, handler)
    orchestrator.add_event_handler(EventType.CIRCUIT_BREAKER_TRIPPED, handler)
    
    await orchestrator.cleanup()

asyncio.run(main())