# API Reference
Complete API reference for the MCP Orchestration Platform.
## Core Classes
### MCPOrchestrator
The main orchestration engine that manages MCP servers, connections, and tool execution.
```python
from orchestration_platform.mcp_orchestrator import MCPOrchestrator
```
#### Constructor
```python
MCPOrchestrator(
    config: Optional[Dict[str, Any]] = None,
    secrets_manager: Optional[SecretsManager] = None,
    debug: bool = False
)
```
**Parameters:**
- `config`: Optional configuration dictionary
- `secrets_manager`: Optional secrets manager instance
- `debug`: Enable debug mode for verbose logging
#### Methods
##### async initialize()
Initialize the orchestrator with its configuration and set up internal components.
```python
await orchestrator.initialize()
```
**Returns:** `None`
**Raises:**
- `ConfigurationError`: If configuration is invalid
- `InitializationError`: If initialization fails
##### async add_server(name: str, url: str, config: Optional[Dict] = None) -> bool
Register a new MCP server with the orchestrator.
```python
success = await orchestrator.add_server(
    "weather-server",
    "http://localhost:8001/mcp",
    {"timeout": 30}
)
```
**Parameters:**
- `name`: Unique server identifier
- `url`: MCP server endpoint URL
- `config`: Optional server-specific configuration
**Returns:** `bool` - True if registration successful
**Raises:**
- `ServerRegistrationError`: If server registration fails
- `DuplicateServerError`: If server name already exists
##### async remove_server(name: str) -> bool
Remove a registered server from the orchestrator.
```python
success = await orchestrator.remove_server("weather-server")
```
**Parameters:**
- `name`: Server identifier to remove
**Returns:** `bool` - True if removal successful
##### async call_tool(server: str, tool: str, arguments: Dict[str, Any]) -> Dict[str, Any]
Execute a tool on a registered MCP server.
```python
result = await orchestrator.call_tool(
    "weather-server",
    "get_current_weather",
    {"location": "New York"}
)
```
**Parameters:**
- `server`: Server identifier
- `tool`: Tool name to execute
- `arguments`: Tool arguments as dictionary
**Returns:** `Dict[str, Any]` - Tool execution result
**Raises:**
- `ServerNotFoundError`: If server doesn't exist
- `ToolNotFoundError`: If tool doesn't exist
- `ToolExecutionError`: If tool execution fails
- `CircuitBreakerError`: If circuit breaker is open
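Because these exceptions share a common base class (see Error Classes), a caller can handle the specific cases first and fall back to the base class. The following is a minimal sketch, not the platform's own code: the two exception classes are local stand-ins that mirror the documented definitions, and `fake_call_tool` and `call_with_fallback` are hypothetical helpers used only so the example runs on its own.

```python
import asyncio
from typing import Any, Dict, Optional


# Local stand-ins mirroring the documented error classes (assumption: same shape).
class OrchestratorError(Exception):
    def __init__(self, message: str, details: Optional[Dict] = None):
        self.message = message
        self.details = details or {}
        super().__init__(self.message)


class CircuitBreakerError(OrchestratorError):
    def __init__(self, message: str, server: str):
        super().__init__(message)
        self.server = server


async def fake_call_tool(server: str, tool: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for orchestrator.call_tool that always fails."""
    raise CircuitBreakerError(f"circuit open for {server}", server)


async def call_with_fallback(server: str, tool: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    try:
        return await fake_call_tool(server, tool, arguments)
    except CircuitBreakerError as exc:
        # Most specific error first: report which server is being shielded.
        return {"ok": False, "reason": "circuit_open", "server": exc.server}
    except OrchestratorError as exc:
        # Base class last: covers every other orchestrator error.
        return {"ok": False, "reason": exc.message}


print(asyncio.run(call_with_fallback("weather-server", "get_current_weather", {})))
```

Ordering the `except` clauses from most to least specific matters here, since a leading `except OrchestratorError` would swallow the circuit breaker case.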
##### async list_all_tools() -> Dict[str, List[Dict[str, Any]]]
Get catalog of all available tools across registered servers.
```python
tools = await orchestrator.list_all_tools()
# Returns: {"weather-server": [...], "crm-server": [...]}
```
**Returns:** `Dict[str, List[Dict[str, Any]]]` - Tool catalog by server
##### async get_server_status(server: str) -> Dict[str, Any]
Get status information for a registered server.
```python
status = await orchestrator.get_server_status("weather-server")
```
**Parameters:**
- `server`: Server identifier
**Returns:** `Dict[str, Any]` - Server status information
##### async health_check() -> Dict[str, Any]
Perform comprehensive health check of the orchestrator.
```python
health = await orchestrator.health_check()
```
**Returns:** `Dict[str, Any]` - Health status information
##### async cleanup()
Clean up resources and close connections.
```python
await orchestrator.cleanup()
```
### SecretsManager
Secrets management with a selectable backend (`local`, `vault`, `aws`, or `environment`).
```python
from orchestration_platform.secrets_manager import SecretsManager
```
#### Constructor
```python
SecretsManager(
    backend: str = "local",
    config: Optional[Dict[str, Any]] = None
)
```
**Parameters:**
- `backend`: Backend type ("local", "vault", "aws", "environment")
- `config`: Backend-specific configuration
#### Methods
##### async initialize()
Initialize the secrets manager with specified backend.
```python
secrets = SecretsManager(backend="local")
await secrets.initialize()
```
**Raises:**
- `SecretsBackendError`: If backend initialization fails
- `ConfigurationError`: If configuration is invalid
##### async get_secret(key: str) -> str
Retrieve a secret value.
```python
api_key = await secrets.get_secret("WEATHER_API_KEY")
```
**Parameters:**
- `key`: Secret key identifier
**Returns:** `str` - Secret value
**Raises:**
- `SecretNotFoundError`: If secret doesn't exist
- `AccessDeniedError`: If access is denied
##### async set_secret(key: str, value: str) -> bool
Store a secret value.
```python
await secrets.set_secret("DATABASE_PASSWORD", "secure_password")
```
**Parameters:**
- `key`: Secret key identifier
- `value`: Secret value to store
**Returns:** `bool` - True if storage successful
##### async delete_secret(key: str) -> bool
Delete a secret value.
```python
await secrets.delete_secret("OLD_API_KEY")
```
**Parameters:**
- `key`: Secret key identifier
**Returns:** `bool` - True if deletion successful
##### async list_secrets(pattern: Optional[str] = None) -> List[str]
List available secret keys.
```python
keys = await secrets.list_secrets(pattern="api_*")
```
**Parameters:**
- `pattern`: Optional glob pattern to filter keys
**Returns:** `List[str]` - List of secret key names
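The exact matching rules for `pattern` are not spelled out here. As a rough illustration, assuming shell-style glob semantics like those of Python's `fnmatch` module, the filtering could behave like the sketch below. `filter_keys` is a hypothetical helper, not part of `SecretsManager`.

```python
from fnmatch import fnmatchcase
from typing import List, Optional


def filter_keys(keys: List[str], pattern: Optional[str] = None) -> List[str]:
    """Return keys matching a shell-style glob; no pattern means all keys."""
    if pattern is None:
        return list(keys)
    # fnmatchcase is case-sensitive on every platform, unlike fnmatch.fnmatch.
    return [key for key in keys if fnmatchcase(key, pattern)]


keys = ["api_weather", "api_crm", "WEATHER_API_KEY", "db_password"]
print(filter_keys(keys, "api_*"))  # ['api_weather', 'api_crm']
```

Under this assumption, `"api_*"` would not match an upper-case key such as `WEATHER_API_KEY`; a pattern like `"*API*"` would.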
##### async rotate_secret(key: str) -> bool
Rotate a secret value.
```python
await secrets.rotate_secret("JWT_SECRET")
```
**Parameters:**
- `key`: Secret key identifier
**Returns:** `bool` - True if rotation successful
### SessionManager
Secure session management for user isolation and rate limiting.
```python
from orchestration_platform.mcp_orchestrator import SessionManager
```
#### Constructor
```python
SessionManager(
    orchestrator: MCPOrchestrator,
    session_ttl: int = 3600,
    max_sessions: int = 1000
)
```
**Parameters:**
- `orchestrator`: Parent orchestrator instance
- `session_ttl`: Session time-to-live in seconds
- `max_sessions`: Maximum concurrent sessions
#### Methods
##### async create_session(user_id: str, permissions: Optional[List[str]] = None) -> str
Create a new user session.
```python
session_id = await session_manager.create_session(
    "user123",
    permissions=["read", "execute"]
)
```
**Parameters:**
- `user_id`: Unique user identifier
- `permissions`: List of user permissions
**Returns:** `str` - Session identifier
##### async get_session(session_id: str) -> Optional[Dict[str, Any]]
Retrieve session information.
```python
session = await session_manager.get_session(session_id)
```
**Parameters:**
- `session_id`: Session identifier
**Returns:** `Optional[Dict[str, Any]]` - Session information
##### async revoke_session(session_id: str) -> bool
Revoke an active session.
```python
await session_manager.revoke_session(session_id)
```
**Parameters:**
- `session_id`: Session identifier
**Returns:** `bool` - True if revocation successful
### ConnectionPool
Connection pooling with periodic health checks and circuit breaker protection.
```python
from orchestration_platform.mcp_orchestrator import ConnectionPool
```
#### Constructor
```python
ConnectionPool(
    server_url: str,
    max_connections: int = 10,
    min_connections: int = 2,
    connection_timeout: int = 30,
    health_check_interval: int = 30
)
```
**Parameters:**
- `server_url`: MCP server endpoint URL
- `max_connections`: Maximum pool size
- `min_connections`: Minimum pool size
- `connection_timeout`: Connection timeout in seconds
- `health_check_interval`: Health check interval in seconds
#### Methods
##### async acquire() -> Connection
Acquire a connection from the pool.
```python
connection = await pool.acquire()
try:
    result = await connection.execute_request(request)
finally:
    await pool.release(connection)
```
**Returns:** `Connection` - Managed connection instance
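The acquire/try/finally pattern can be wrapped in an async context manager so that callers cannot forget the release step. This is a sketch under assumptions: `pooled_connection` is a hypothetical helper, and `FakePool` is a stand-in exposing only the documented `acquire()` and `release()` methods so the example runs without the platform installed.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Stand-in for ConnectionPool exposing only acquire() and release()."""

    def __init__(self):
        self.in_use = 0

    async def acquire(self):
        self.in_use += 1
        return object()  # placeholder for a Connection

    async def release(self, connection):
        self.in_use -= 1


@asynccontextmanager
async def pooled_connection(pool):
    """Acquire a connection and guarantee it is released on exit."""
    connection = await pool.acquire()
    try:
        yield connection
    finally:
        await pool.release(connection)


async def demo():
    pool = FakePool()
    async with pooled_connection(pool) as connection:
        assert pool.in_use == 1  # the connection is checked out inside the block
    return pool.in_use


print(asyncio.run(demo()))  # 0: the connection went back to the pool
```

The `finally` clause runs even if the body of the `async with` block raises, which is the same guarantee the explicit try/finally form gives.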
##### async release(connection: Connection)
Release a connection back to the pool.
```python
await pool.release(connection)
```
**Parameters:**
- `connection`: Connection to release
##### async health_check() -> Dict[str, Any]
Get connection pool health status.
```python
health = await pool.health_check()
```
**Returns:** `Dict[str, Any]` - Pool health information
### ToolManager
Dynamic tool discovery and introspection system.
```python
from orchestration_platform.mcp_orchestrator import ToolManager
```
#### Constructor
```python
ToolManager(orchestrator: MCPOrchestrator)
```
**Parameters:**
- `orchestrator`: Parent orchestrator instance
#### Methods
##### async discover_tools(server_name: str) -> List[Dict[str, Any]]
Discover available tools for a server.
```python
tools = await tool_manager.discover_tools("weather-server")
```
**Parameters:**
- `server_name`: Server identifier
**Returns:** `List[Dict[str, Any]]` - Discovered tools
##### async validate_tool_call(server: str, tool: str, arguments: Dict[str, Any]) -> bool
Validate tool call arguments against schema.
```python
valid = await tool_manager.validate_tool_call(
    "weather-server",
    "get_current_weather",
    {"location": "New York"}
)
```
**Parameters:**
- `server`: Server identifier
- `tool`: Tool name
- `arguments`: Arguments to validate
**Returns:** `bool` - True if validation passes
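To make the idea of schema validation concrete, here is a minimal sketch that assumes tool input schemas are JSON-Schema-like dictionaries with `properties` and `required` keys. That shape is an assumption, and `find_argument_errors` is a hypothetical helper covering only required fields and basic types, not a full JSON Schema validator.

```python
from typing import Any, Dict, List

# Maps JSON Schema type names to Python types (small subset, for illustration).
_JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def find_argument_errors(schema: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments pass."""
    errors = []
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required argument: {name}")
    for name, value in arguments.items():
        expected = schema.get("properties", {}).get(name, {}).get("type")
        python_type = _JSON_TYPES.get(expected)
        if python_type is None:
            continue  # unknown or unspecified type: nothing to check
        # bool is a subclass of int in Python, so reject it for numeric types.
        bool_as_number = isinstance(value, bool) and expected in ("integer", "number")
        if bool_as_number or not isinstance(value, python_type):
            errors.append(f"{name}: expected {expected}, got {type(value).__name__}")
    return errors


schema = {
    "type": "object",
    "properties": {"location": {"type": "string"}},
    "required": ["location"],
}
print(find_argument_errors(schema, {"location": "New York"}))  # []
print(find_argument_errors(schema, {}))  # ['missing required argument: location']
```

Returning a list of messages rather than a bare boolean is a design choice for the sketch; a boolean result like the one documented above corresponds to checking whether the list is empty.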
##### async get_tool_schema(server: str, tool: str) -> Optional[Dict[str, Any]]
Get tool input/output schema.
```python
schema = await tool_manager.get_tool_schema("weather-server", "get_current_weather")
```
**Parameters:**
- `server`: Server identifier
- `tool`: Tool name
**Returns:** `Optional[Dict[str, Any]]` - Tool schema
### MultiLayerCache
High-performance caching with multiple storage layers.
```python
from orchestration_platform.mcp_orchestrator import MultiLayerCache
```
#### Constructor
```python
MultiLayerCache(
    layers: List[CacheLayer],
    default_ttl: int = 3600,
    max_size: int = 10000
)
```
**Parameters:**
- `layers`: List of cache layer configurations
- `default_ttl`: Default time-to-live in seconds
- `max_size`: Maximum cache size
#### Methods
##### async get(key: str) -> Optional[Any]
Get value from cache.
```python
value = await cache.get("weather:new_york")
```
**Parameters:**
- `key`: Cache key
**Returns:** `Optional[Any]` - Cached value
##### async set(key: str, value: Any, ttl: Optional[int] = None) -> bool
Set value in cache.
```python
await cache.set("weather:new_york", weather_data, ttl=1800)
```
**Parameters:**
- `key`: Cache key
- `value`: Value to cache
- `ttl`: Optional time-to-live in seconds
**Returns:** `bool` - True if caching successful
##### async delete(key: str) -> bool
Delete value from cache.
```python
await cache.delete("weather:new_york")
```
**Parameters:**
- `key`: Cache key
**Returns:** `bool` - True if deletion successful
##### async clear() -> bool
Clear all cached values.
```python
await cache.clear()
```
**Returns:** `bool` - True if clear successful
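How the layers interact is not described above. One common arrangement, shown here purely as an illustrative sketch, checks the layers from fastest to slowest and copies a value into the faster layers when it is found further down. `LayeredLookup` is a hypothetical toy class built on plain dictionaries; it is not the `MultiLayerCache` implementation and omits TTLs, size limits, and async I/O.

```python
from typing import Any, Dict, List, Optional


class LayeredLookup:
    """Toy read-through lookup over ordered layers (fastest first)."""

    def __init__(self, layers: List[Dict[str, Any]]):
        self.layers = layers

    def get(self, key: str) -> Optional[Any]:
        for index, layer in enumerate(self.layers):
            if key in layer:
                value = layer[key]
                # Promote the value into every faster layer above the hit.
                for faster in self.layers[:index]:
                    faster[key] = value
                return value
        return None  # miss in every layer

    def set(self, key: str, value: Any) -> None:
        # Write-through: store in all layers so they stay consistent.
        for layer in self.layers:
            layer[key] = value


memory, remote = {}, {"weather:new_york": {"temp_c": 21}}
cache = LayeredLookup([memory, remote])
print(cache.get("weather:new_york"))  # {'temp_c': 21}, found in the second layer
print("weather:new_york" in memory)   # True, promoted to the first layer
```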
## Event System
### Event Types
```python
from orchestration_platform.mcp_orchestrator import EventType
# Available event types
EventType.SERVER_CONNECTED
EventType.SERVER_DISCONNECTED
EventType.TOOL_CALLED
EventType.TOOL_COMPLETED
EventType.TOOL_FAILED
EventType.CIRCUIT_BREAKER_TRIPPED
EventType.CACHE_HIT
EventType.CACHE_MISS
EventType.SESSION_CREATED
EventType.SESSION_EXPIRED
```
### Event Handlers
```python
from orchestration_platform.mcp_orchestrator import EventHandler, EventType


async def handle_tool_event(event):
    print(f"Tool called: {event.tool} on {event.server}")


# `orchestrator` is an initialized MCPOrchestrator instance
handler = EventHandler(handle_tool_event)
orchestrator.add_event_handler(EventType.TOOL_CALLED, handler)
```
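Conceptually, registering a handler means storing a callback under an event type and awaiting it when a matching event is emitted. The sketch below illustrates that idea only; `DemoEventType`, `DemoDispatcher`, and its `emit` method are hypothetical names invented for this example and say nothing about how the orchestrator dispatches events internally.

```python
import asyncio
from collections import defaultdict
from enum import Enum, auto


class DemoEventType(Enum):
    TOOL_CALLED = auto()
    TOOL_FAILED = auto()


class DemoDispatcher:
    """Toy registry mapping event types to async handler callables."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def add_event_handler(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def emit(self, event_type, payload):
        # Await each handler for this type in registration order.
        for handler in list(self._handlers.get(event_type, [])):
            await handler(payload)


async def demo():
    seen = []

    async def record(payload):
        seen.append(payload["tool"])

    dispatcher = DemoDispatcher()
    dispatcher.add_event_handler(DemoEventType.TOOL_CALLED, record)
    await dispatcher.emit(DemoEventType.TOOL_CALLED, {"tool": "get_current_weather"})
    await dispatcher.emit(DemoEventType.TOOL_FAILED, {"tool": "ignored"})  # no handler
    return seen


print(asyncio.run(demo()))  # ['get_current_weather']
```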
## Data Models
### ServerStatus
```python
@dataclass
class ServerStatus:
    name: str
    url: str
    status: str  # "healthy", "degraded", "unhealthy"
    last_health_check: str
    connection_count: int
    error_rate: float
    average_response_time: float
```
### ToolInfo
```python
@dataclass
class ToolInfo:
    name: str
    description: str
    input_schema: Dict[str, Any]
    output_schema: Optional[Dict[str, Any]]
    examples: List[Dict[str, Any]]
    tags: List[str]
    server_name: str
```
### SessionInfo
```python
@dataclass
class SessionInfo:
    session_id: str
    user_id: str
    created_at: str
    expires_at: str
    permissions: List[str]
    last_activity: str
    request_count: int
```
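The timestamp fields in `SessionInfo` are plain strings, and their format is not stated. Assuming they are ISO 8601 timestamps, an expiry check against a session TTL could be sketched as follows; `make_expiry` and `is_expired` are hypothetical helpers, and the 3600-second default simply echoes the `session_ttl` default of `SessionManager`.

```python
from datetime import datetime, timedelta, timezone


def make_expiry(created_at: datetime, ttl_seconds: int = 3600) -> str:
    """Compute an ISO 8601 expiry timestamp from a creation time and a TTL."""
    return (created_at + timedelta(seconds=ttl_seconds)).isoformat()


def is_expired(expires_at: str, now: datetime) -> bool:
    """Return True once `now` has reached the stored expiry timestamp."""
    return now >= datetime.fromisoformat(expires_at)


created = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
expires_at = make_expiry(created, ttl_seconds=3600)
print(expires_at)  # 2024-01-01T13:00:00+00:00
print(is_expired(expires_at, created + timedelta(minutes=59)))  # False
print(is_expired(expires_at, created + timedelta(minutes=60)))  # True
```

Using timezone-aware datetimes throughout avoids comparing naive and aware values, which Python rejects with a `TypeError`.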
## Error Classes
### OrchestratorError
Base exception for all orchestrator errors.
```python
class OrchestratorError(Exception):
    def __init__(self, message: str, details: Optional[Dict] = None):
        self.message = message
        self.details = details or {}
        super().__init__(self.message)
```
### ConfigurationError
Raised when configuration is invalid.
```python
class ConfigurationError(OrchestratorError):
    pass
```
### ServerRegistrationError
Raised when server registration fails.
```python
class ServerRegistrationError(OrchestratorError):
    pass
```
### ToolExecutionError
Raised when tool execution fails.
```python
class ToolExecutionError(OrchestratorError):
    def __init__(self, message: str, server: str, tool: str, details: Optional[Dict] = None):
        super().__init__(message, details)
        self.server = server
        self.tool = tool
```
### CircuitBreakerError
Raised when circuit breaker is open.
```python
class CircuitBreakerError(OrchestratorError):
    def __init__(self, message: str, server: str):
        super().__init__(message)
        self.server = server
```
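For readers unfamiliar with the pattern, a circuit breaker is typically a small state machine: closed (calls allowed), open (calls rejected), and half-open (a few trial calls allowed after a cool-down). The sketch below is a generic, simplified version of that pattern, not the platform's implementation. The parameter names follow the `circuit_breaker` configuration keys (`failure_threshold`, `recovery_timeout`, `half_open_max_calls`); the class, its methods, and the injectable `clock` are assumptions made for illustration.

```python
import time


class CircuitBreaker:
    """Minimal closed -> open -> half-open state machine."""

    def __init__(self, failure_threshold=5, recovery_timeout=60.0,
                 half_open_max_calls=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self._clock = clock
        self._state = "closed"
        self._failures = 0
        self._opened_at = 0.0
        self._trial_calls = 0

    def allow_request(self) -> bool:
        if self._state == "open":
            if self._clock() - self._opened_at < self.recovery_timeout:
                return False  # still cooling down
            # Cool-down elapsed: let a limited number of trial calls through.
            self._state = "half_open"
            self._trial_calls = 0
        if self._state == "half_open":
            if self._trial_calls >= self.half_open_max_calls:
                return False
            self._trial_calls += 1
        return True

    def record_success(self) -> None:
        # Simplification: any success closes the breaker and resets the count.
        self._state = "closed"
        self._failures = 0

    def record_failure(self) -> None:
        self._failures += 1
        # A failed trial call, or too many failures, opens the breaker.
        if self._state == "half_open" or self._failures >= self.failure_threshold:
            self._state = "open"
            self._opened_at = self._clock()


now = [0.0]  # fake clock so the example does not need to sleep
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=60, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()        # threshold reached: breaker opens
print(breaker.allow_request())  # False
now[0] = 61.0                   # recovery_timeout has elapsed
print(breaker.allow_request())  # True (half-open trial call)
```

In this model, a caller would raise something like `CircuitBreakerError` whenever `allow_request()` returns `False`.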
## Configuration Options
### Orchestrator Configuration
```python
config = {
    "orchestrator": {
        "host": "localhost",
        "port": 7860,
        "max_connections": 100,
        "connection_timeout": 30,
        "request_timeout": 60,
        "max_retries": 3,
        "retry_delay": 1.0,
        "circuit_breaker": {
            "failure_threshold": 5,
            "recovery_timeout": 60,
            "half_open_max_calls": 3
        }
    },
    "cache": {
        "layers": [
            {"type": "memory", "max_size": 1000},
            {"type": "redis", "url": "redis://localhost:6379"}
        ],
        "default_ttl": 3600,
        "enable_compression": True
    },
    "session": {
        "ttl": 3600,
        "max_sessions": 1000,
        "cleanup_interval": 300
    },
    "monitoring": {
        "prometheus": {
            "enabled": True,
            "port": 9090
        },
        "logging": {
            "level": "INFO",
            "format": "json",
            "file": "/var/log/orchestrator.log"
        }
    }
}
```
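The `max_retries` and `retry_delay` options suggest a retry loop around failing requests. The sketch below shows one plausible reading, with two explicit assumptions: `max_retries` counts retries after the first attempt, and `retry_delay` is a fixed pause in seconds rather than an exponential backoff. `call_with_retries` is a hypothetical helper, not a platform function.

```python
import asyncio


async def call_with_retries(operation, max_retries=3, retry_delay=1.0):
    """Await operation(); on failure, retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return await operation()
        except Exception:
            # A real implementation would likely catch only transient errors.
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)  # fixed delay (assumed)


async def demo():
    calls = {"count": 0}

    async def flaky():
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("temporary failure")
        return "ok"

    result = await call_with_retries(flaky, max_retries=3, retry_delay=0.01)
    return result, calls["count"]


print(asyncio.run(demo()))  # ('ok', 3)
```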
## Usage Examples
### Basic Server Registration
```python
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator


async def main():
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()

    # Register servers
    await orchestrator.add_server("weather", "http://localhost:8001/mcp")
    await orchestrator.add_server("crm", "http://localhost:8002/mcp")

    # Call tools
    result = await orchestrator.call_tool("weather", "get_current_weather", {
        "location": "New York"
    })
    print(result)

    await orchestrator.cleanup()


asyncio.run(main())
```
### With Secrets Management
```python
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator
from orchestration_platform.secrets_manager import SecretsManager


async def main():
    # Set up the secrets manager
    secrets = SecretsManager(backend="local")
    await secrets.initialize()
    await secrets.set_secret("WEATHER_API_KEY", "your-api-key")

    # Set up the orchestrator with the secrets manager
    orchestrator = MCPOrchestrator(secrets_manager=secrets)
    await orchestrator.initialize()

    # Use the secret in the server config
    await orchestrator.add_server("weather", "http://localhost:8001/mcp", {
        "auth": {
            "type": "api_key",
            "key": await secrets.get_secret("WEATHER_API_KEY")
        }
    })

    await orchestrator.cleanup()


asyncio.run(main())
```
### With Session Management
```python
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, SessionManager


async def main():
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()
    await orchestrator.add_server("weather", "http://localhost:8001/mcp")

    session_manager = SessionManager(orchestrator)

    # Create a session for the user
    session_id = await session_manager.create_session("user123", ["read", "execute"])

    # Call a tool while the session is active (call_tool itself takes no session ID)
    result = await orchestrator.call_tool("weather", "get_current_weather", {
        "location": "New York"
    })
    print(result)

    # Revoke the session once the work is done
    await session_manager.revoke_session(session_id)
    await orchestrator.cleanup()


asyncio.run(main())
```
### Event Handling
```python
import asyncio
from orchestration_platform.mcp_orchestrator import MCPOrchestrator, EventType, EventHandler


async def handle_events(event):
    if event.type == EventType.TOOL_CALLED:
        print(f"Tool called: {event.tool} on {event.server}")
    elif event.type == EventType.CIRCUIT_BREAKER_TRIPPED:
        print(f"Circuit breaker tripped: {event.server}")


async def main():
    orchestrator = MCPOrchestrator()
    await orchestrator.initialize()

    # Add event handler
    handler = EventHandler(handle_events)
    orchestrator.add_event_handler(EventType.TOOL_CALLED, handler)
    orchestrator.add_event_handler(EventType.CIRCUIT_BREAKER_TRIPPED, handler)

    await orchestrator.cleanup()


asyncio.run(main())