Spaces:
Sleeping
Sleeping
| import asyncio | |
| from typing import Optional, Dict, Any | |
| from .transport import StreamableHttpMCPClient, load_server_config | |
class MCPClientManager:
    """
    Manages a single long-lived MCP client session.

    - Initializes once on demand
    - Reuses the same session for subsequent calls
    - Can be closed explicitly if needed
    """

    def __init__(self) -> None:
        # The active client, or None until the first successful initialization
        # (and again after close()).
        self._client: Optional[StreamableHttpMCPClient] = None
        # Serializes initialization so concurrent callers share one client.
        self._init_lock = asyncio.Lock()

    def mcp_session_id(self) -> Optional[str]:
        """Return the current client's session id, or None if there is no client."""
        return self._client.mcp_session_id if self._client else None

    def _is_ready(self) -> bool:
        """Return True when a client exists and reports a session id."""
        return bool(self._client and self._client.mcp_session_id)

    async def ensure_initialized(self) -> None:
        """
        Create and initialize the client if no ready client exists yet.

        Returns immediately when a client with a session id is already
        present. Otherwise a new client is built from ``load_server_config()``,
        its context is entered, and ``initialize()`` is awaited.

        Raises:
            Whatever ``__aenter__()`` or ``initialize()`` raises. If
            ``initialize()`` fails, the freshly opened client is exited before
            the error propagates so its resources are not leaked.
        """
        if self._is_ready():
            return
        async with self._init_lock:
            # Re-check under the lock: another task may have finished
            # initialization while this one was waiting.
            if self._is_ready():
                return
            # NOTE(review): if self._client exists here but has no session id,
            # it is replaced below without being closed -- confirm whether the
            # superseded client should be exited.
            server_config = load_server_config()
            client = StreamableHttpMCPClient(server_config)
            # Manually enter context to open HTTP session; it stays open for
            # the lifetime of the manager and is exited in close().
            await client.__aenter__()
            try:
                await client.initialize()
            except BaseException as exc:
                # The context was entered but the client will never be stored
                # on self, so close() could not reach it: exit it here.
                try:
                    await client.__aexit__(type(exc), exc, exc.__traceback__)
                except Exception:
                    # Cleanup is best-effort; the original error matters more.
                    pass
                raise
            try:
                await client.send_initialized_notification()
            except Exception:
                # Non-fatal if notification fails
                pass
            self._client = client

    async def call(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Invoke ``method`` on the managed client, initializing it first if needed.

        Args:
            method: Name passed through to the client's ``call``.
            params: Parameters for the call; ``None`` (or any falsy value) is
                sent as an empty dict.

        Returns:
            The value returned by the client's ``call``.
        """
        await self.ensure_initialized()
        return await self._client.call(method, params or {})

    async def close(self) -> None:
        """
        Exit the managed client's context, if any, and forget the client.

        Errors raised while exiting are suppressed. A later ``call`` or
        ``ensure_initialized`` will create a fresh client.
        """
        if self._client:
            client = self._client
            # Detach first so the manager never holds a half-closed client.
            self._client = None
            try:
                await client.__aexit__(None, None, None)
            except Exception:
                # Best-effort shutdown: closing must not raise to the caller.
                pass
# Module-level shared manager; created lazily by get_mcp_manager().
_singleton: Optional[MCPClientManager] = None


def get_mcp_manager() -> MCPClientManager:
    """Return the shared MCPClientManager, constructing it on first use."""
    global _singleton
    manager = _singleton
    if manager is None:
        manager = MCPClientManager()
        _singleton = manager
    return manager