import logging import os import asyncio from contextlib import AsyncExitStack from typing import Any, Callable, Optional logger = logging.getLogger(__name__) def get_mcp_start_timeout() -> float: """Return the startup timeout for optional MCP servers.""" try: return float(os.getenv("MCP_START_TIMEOUT_SECONDS", "5")) except ValueError: return 5.0 async def enter_optional_mcp( stack: AsyncExitStack, factory: Callable[[], Any], server_name: str, ): """Try to start an MCP server and degrade gracefully if it is unavailable.""" try: server = factory() timeout = get_mcp_start_timeout() if timeout <= 0: return await stack.enter_async_context(server) return await asyncio.wait_for(stack.enter_async_context(server), timeout=timeout) except asyncio.TimeoutError: logger.warning("%s startup timed out; continuing without it", server_name) return None except Exception as exc: logger.warning("%s unavailable; continuing without it: %s", server_name, exc) return None def connected_mcp_servers(*servers: Optional[Any]) -> list[Any]: """Return only successfully connected MCP servers.""" return [server for server in servers if server is not None]