| """ |
| MCP Manager for CodeAct Agent. |
| Manages MCP (Model Context Protocol) tools and servers. |
| """ |
|
|
| import os |
| import sys |
| import types |
| from pathlib import Path |
| from typing import Dict, List, Optional, Any |
| from rich.console import Console |
|
|
|
|
class MCPManager:
    """Manages MCP (Model Context Protocol) tools and servers.

    Tools are taken from a YAML configuration (or discovered from the server
    itself), wrapped in synchronous callables, attached to a synthetic
    ``mcp_servers.<server_name>`` module in ``sys.modules`` and recorded in
    ``mcp_functions``. When a tool registry is supplied, every tool is also
    registered there.

    Attributes:
        mcp_functions: Maps a tool name to a metadata dict with the keys
            ``function``, ``server``, ``module``, ``description``,
            ``required_parameters`` and ``optional_parameters`` (plus
            ``remote_url`` for tools served over HTTP).
        console: Console object used by ``show_mcp_status`` for output.
    """

    # NOTE(review): the pictographs in the user-facing messages were
    # mis-encoded in the text this class was restored from. The check mark,
    # bullet, antenna and tools glyphs were decoded from the residue; the
    # remaining ones are best-fit replacements - confirm against upstream.

    def __init__(self, console_display=None):
        """Create an empty manager.

        Args:
            console_display: Optional object exposing a ``console`` attribute.
                When omitted (or falsy) a new ``rich`` ``Console`` is created.
        """
        self.mcp_functions: Dict[str, dict] = {}
        self.console = console_display.console if console_display else Console()

    def has_mcp_functions(self) -> bool:
        """Return True when at least one MCP tool has been loaded."""
        return bool(self.mcp_functions)

    def group_tools_by_server(self, mcp_tools: Dict[str, dict]) -> Dict[str, List[Dict]]:
        """Group MCP tools by the server that provides them.

        Args:
            mcp_tools: Mapping of tool name to tool metadata, in the shape of
                ``mcp_functions``.

        Returns:
            Mapping of server name to a list of ``{"name", "description"}``
            dicts. Tools without a ``server`` entry are filed under
            ``"unknown"``; a missing description becomes ``"No description"``.
        """
        servers: Dict[str, List[Dict]] = {}
        for tool_name, tool_info in mcp_tools.items():
            servers.setdefault(tool_info.get('server', 'unknown'), []).append({
                'name': tool_name,
                'description': tool_info.get('description', 'No description'),
            })
        return servers

    def add_mcp(self, config_path: str = "./mcp_config.yaml", tool_registry=None) -> None:
        """Load MCP tools described by a YAML configuration file.

        The file is expected to hold a top-level ``mcp_servers`` mapping of
        server name to server settings. The settings read here are:

        * ``enabled`` - servers with a falsy value are skipped (default True).
        * ``url`` - when set, the server is reached over streamable HTTP.
        * ``command`` - otherwise, a non-empty list ``[executable, *args]``
          used to start a stdio server.
        * ``env`` - environment for a stdio server; a value written as
          ``${NAME}`` is replaced by that environment variable (``""`` if it
          is unset).
        * ``tools`` - optional explicit tool list; when empty or missing the
          tools are discovered by querying the server.

        Args:
            config_path: Path of the YAML configuration file.
            tool_registry: Optional registry; when given, every tool is also
                registered through ``register_tool`` and its callable stored
                in ``_name_to_function``.

        Raises:
            ImportError: If PyYAML or the MCP client packages are missing.
            FileNotFoundError: If ``config_path`` does not exist.
            yaml.YAMLError: If the file is not valid YAML.
        """
        try:
            import asyncio
            import yaml
        except ImportError as e:
            raise ImportError(f"Required packages not available: {e}. Install with: pip install pyyaml") from e

        try:
            import nest_asyncio
            from mcp import ClientSession
            from mcp.client.stdio import StdioServerParameters, stdio_client
            from mcp.client.streamable_http import streamablehttp_client
            from langchain_mcp_adapters.tools import _list_all_tools
            # Makes asyncio.run() usable while an event loop is already
            # running; every synchronous helper below depends on this.
            nest_asyncio.apply()
        except ImportError as e:
            raise ImportError(f"MCP packages not available: {e}. Install with: pip install mcp langchain-mcp-adapters") from e

        def describe_tools(tools) -> List[dict]:
            """Turn MCP tool objects into name/description/inputSchema dicts."""
            discovered_tools = []
            for tool in tools:
                if not hasattr(tool, "name"):
                    print(f"Warning: Skipping tool with no name attribute: {tool}")
                    continue
                description = getattr(tool, 'description', None)
                if not description or description.strip() == "":
                    # Fall back to a readable form of the tool name.
                    formatted_name = tool.name.replace('_', ' ').title()
                    description = f"MCP tool: {formatted_name}"
                discovered_tools.append({
                    "name": tool.name,
                    "description": description,
                    "inputSchema": tool.inputSchema,
                })
            return discovered_tools

        def render_result(result):
            """Serialize the first content item of a tool call result."""
            if not result.content:
                # A tool may return no content items at all.
                return ""
            content = result.content[0]
            if hasattr(content, "model_dump_json"):
                return content.model_dump_json()
            if hasattr(content, "json"):
                return content.json()
            return content.text

        def discover_mcp_tools_sync(server_params: StdioServerParameters) -> List[dict]:
            """Discover available tools from a stdio MCP server synchronously.

            Returns an empty list (after printing the reason) on any failure.
            """
            async def _discover_async():
                async with stdio_client(server_params) as (reader, writer):
                    async with ClientSession(reader, writer) as session:
                        await session.initialize()
                        tools_result = await session.list_tools()
                        tools = tools_result.tools if hasattr(tools_result, "tools") else tools_result
                        return describe_tools(tools)

            try:
                return asyncio.run(_discover_async())
            except Exception as e:
                print(f"Failed to discover tools: {e}")
                return []

        def discover_remote_mcp_tools_sync(url: str) -> List[dict]:
            """Discover available tools from a remote MCP server synchronously.

            Returns an empty list (after printing the reason) on any failure.
            """
            async def _discover_remote_async():
                async with streamablehttp_client(url) as (read, write, _):
                    async with ClientSession(read, write) as session:
                        await session.initialize()
                        return describe_tools(await _list_all_tools(session))

            try:
                return asyncio.run(_discover_remote_async())
            except Exception as e:
                print(f"Failed to discover remote tools from {url}: {e}")
                return []

        def make_mcp_wrapper(cmd: str, args: List[str], tool_name: str, doc: str, env_vars: Optional[dict] = None):
            """Create a synchronous wrapper around a stdio MCP tool call."""

            def sync_tool_wrapper(**kwargs):
                """Synchronous wrapper for MCP tool execution."""
                try:
                    server_params = StdioServerParameters(command=cmd, args=args, env=env_vars)

                    async def async_tool_call():
                        # A fresh server process and session per call.
                        async with stdio_client(server_params) as (reader, writer):
                            async with ClientSession(reader, writer) as session:
                                await session.initialize()
                                return render_result(await session.call_tool(tool_name, kwargs))

                    # Always block for the result. Scheduling a task on a
                    # running loop would hand the caller an un-awaited Task
                    # instead of the tool output.
                    return asyncio.run(async_tool_call())
                except Exception as e:
                    raise RuntimeError(f"MCP tool execution failed for '{tool_name}': {e}") from e

            sync_tool_wrapper.__name__ = tool_name
            sync_tool_wrapper.__doc__ = doc
            return sync_tool_wrapper

        def make_remote_mcp_wrapper(url: str, tool_name: str, doc: str):
            """Create a synchronous wrapper around a remote MCP tool call."""

            def sync_tool_wrapper(**kwargs):
                """Synchronous wrapper for remote MCP tool execution."""
                try:
                    async def async_tool_call():
                        async with streamablehttp_client(url) as (read, write, _):
                            async with ClientSession(read, write) as session:
                                await session.initialize()
                                return render_result(await session.call_tool(tool_name, kwargs))

                    # See make_mcp_wrapper: block instead of returning a Task.
                    return asyncio.run(async_tool_call())
                except Exception as e:
                    raise RuntimeError(f"Remote MCP tool execution failed for '{tool_name}': {e}") from e

            sync_tool_wrapper.__name__ = tool_name
            sync_tool_wrapper.__doc__ = doc
            return sync_tool_wrapper

        try:
            config_content = Path(config_path).read_text(encoding="utf-8")
            cfg = yaml.safe_load(config_content) or {}
        except FileNotFoundError:
            raise FileNotFoundError(f"MCP config file not found: {config_path}") from None
        except yaml.YAMLError as e:
            raise yaml.YAMLError(f"Invalid YAML in MCP config: {e}") from e

        mcp_servers = cfg.get("mcp_servers", {})
        if not mcp_servers:
            print("Warning: No MCP servers found in configuration")
            return

        for server_name, server_meta in mcp_servers.items():
            if not isinstance(server_meta, dict):
                print(f"Warning: Invalid configuration for server '{server_name}'")
                continue
            if not server_meta.get("enabled", True):
                continue

            remote_url = server_meta.get("url")
            if remote_url:
                self._process_remote_server(
                    server_name,
                    server_meta,
                    remote_url,
                    tool_registry,
                    discover_remote_mcp_tools_sync,
                    make_remote_mcp_wrapper,
                )
                continue

            cmd_list = server_meta.get("command", [])
            if not cmd_list or not isinstance(cmd_list, list):
                print(f"Warning: Invalid command configuration for server '{server_name}'")
                continue
            cmd, *args = cmd_list
            env_vars = self._resolve_env(server_meta.get("env", {}))

            # Loop variables are bound as defaults so the callables do not
            # depend on later iterations.
            tools_config = self._load_tools_config(
                server_name,
                server_meta,
                lambda cmd=cmd, args=args, env=env_vars: discover_mcp_tools_sync(
                    StdioServerParameters(command=cmd, args=args, env=env)
                ),
            )
            if tools_config is None:
                continue

            self._register_tools(
                server_name,
                tools_config,
                lambda name, doc, cmd=cmd, args=args, env=env_vars: make_mcp_wrapper(cmd, args, name, doc, env),
                tool_registry,
            )

        print(f"🛠️ Total MCP tools loaded: {len(self.mcp_functions)}")

    def _process_remote_server(self, server_name: str, server_meta: dict, remote_url: str, tool_registry, discover_remote_mcp_tools_sync, make_remote_mcp_wrapper):
        """Load the tools of one remote (URL-based) MCP server.

        Args:
            server_name: Name of the server in the configuration.
            server_meta: Settings of that server; only ``tools`` is read here.
            remote_url: URL of the server.
            tool_registry: Optional registry to register the tools with.
            discover_remote_mcp_tools_sync: Callable ``(url) -> list`` used to
                discover tools when none are configured.
            make_remote_mcp_wrapper: Callable ``(url, tool_name, doc)`` that
                returns the callable for a tool.
        """
        tools_config = self._load_tools_config(
            server_name,
            server_meta,
            lambda: discover_remote_mcp_tools_sync(remote_url),
            remote=True,
        )
        if tools_config is None:
            return

        self._register_tools(
            server_name,
            tools_config,
            lambda name, doc: make_remote_mcp_wrapper(remote_url, name, doc),
            tool_registry,
            remote_url=remote_url,
        )

    @staticmethod
    def _resolve_env(env_vars):
        """Replace ``${NAME}`` values with the matching environment variable.

        Falsy input is returned unchanged; unset variables become ``""``.
        """
        if not env_vars:
            return env_vars
        resolved = {}
        for key, value in env_vars.items():
            if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
                resolved[key] = os.getenv(value[2:-1], "")
            else:
                resolved[key] = value
        return resolved

    @staticmethod
    def _load_tools_config(server_name: str, server_meta: dict, discover, remote: bool = False):
        """Return the configured tool list, discovering it when none is set.

        Returns None (after printing the reason) when discovery fails or
        yields no tools.
        """
        tools_config = server_meta.get("tools", [])
        if tools_config:
            return tools_config

        try:
            tools_config = discover()
        except Exception as e:
            if remote:
                print(f"Failed to discover tools for remote {server_name}: {e}")
            else:
                print(f"Failed to discover tools for {server_name}: {e}")
            return None

        flavour = "remote MCP server" if remote else "MCP server"
        if not tools_config:
            print(f"Warning: No tools discovered from {server_name} {flavour}")
            return None
        print(f"🔍 Discovered {len(tools_config)} tools from {server_name} {flavour}")
        return tools_config

    @staticmethod
    def _parse_tool_meta(tool_meta, default_prefix: str):
        """Extract name, description, parameters and required names of a tool.

        Two layouts are understood: entries keyed by ``biomni_name`` (with a
        ``parameters`` mapping whose specs carry a ``required`` flag) and
        entries keyed by ``name`` (with a JSON-schema style ``inputSchema``).

        Returns:
            ``(tool_name, description, parameters, required_param_names)``,
            or None when the entry is not a mapping or has no name.
        """
        if not isinstance(tool_meta, dict):
            return None

        if "biomni_name" in tool_meta:
            tool_name = tool_meta.get("biomni_name")
            if not tool_name:
                return None
            description = tool_meta.get("description", f"{default_prefix}: {tool_name}")
            parameters = tool_meta.get("parameters") or {}
            required_param_names = [
                param_name
                for param_name, param_spec in parameters.items()
                if param_spec.get("required", False)
            ]
            return tool_name, description, parameters, required_param_names

        tool_name = tool_meta.get("name")
        # The name must be checked before it is used to build a description.
        if not tool_name:
            return None
        description = tool_meta.get("description", "")
        if not description or description.strip() == "":
            formatted_name = tool_name.replace('_', ' ').title()
            description = f"{default_prefix}: {formatted_name}"
        input_schema = tool_meta.get("inputSchema") or {}
        parameters = input_schema.get("properties") or {}
        required_param_names = input_schema.get("required") or []
        return tool_name, description, parameters, required_param_names

    @staticmethod
    def _split_parameters(parameters: dict, required_param_names):
        """Split parameter specs into required and optional descriptions.

        Each description holds ``name``, ``type`` and ``description`` and,
        when available, ``enum``. For ``anyOf``/``oneOf`` specs the options
        are scanned up to and including the first one that declares a type;
        that type, and the last ``enum`` seen on the way, are used. Optional
        descriptions also carry ``default``.

        Returns:
            ``(required_params, optional_params)``.
        """
        required_params = []
        optional_params = []
        for param_name, param_spec in parameters.items():
            param_info = {
                "name": param_name,
                "type": param_spec.get("type", "string"),
                "description": param_spec.get("description", f"Parameter {param_name}"),
            }
            if "enum" in param_spec:
                param_info["enum"] = param_spec["enum"]

            for union_key in ("anyOf", "oneOf"):
                for schema_option in param_spec.get(union_key) or ():
                    if not isinstance(schema_option, dict):
                        continue
                    if "enum" in schema_option:
                        param_info["enum"] = schema_option["enum"]
                    if "type" in schema_option:
                        param_info["type"] = schema_option["type"]
                        break

            # A parameter counts as required when it is listed as such OR
            # when it has no default to fall back on.
            if (param_name in required_param_names) or ("default" not in param_spec):
                required_params.append(param_info)
            else:
                param_info["default"] = param_spec.get("default")
                optional_params.append(param_info)
        return required_params, optional_params

    def _register_tools(self, server_name: str, tools_config, make_wrapper, tool_registry, remote_url=None) -> int:
        """Create, expose and record the callables for one server's tools.

        Args:
            server_name: Name of the server providing the tools.
            tools_config: Iterable of tool entries (see ``_parse_tool_meta``).
            make_wrapper: Callable ``(tool_name, description)`` returning the
                callable that executes the tool.
            tool_registry: Optional registry to register the tools with.
            remote_url: URL of the server for remote servers, else None.

        Returns:
            Number of tools accepted by ``tool_registry`` (0 without one).
        """
        is_remote = remote_url is not None
        origin = f"remote {server_name}" if is_remote else server_name
        default_prefix = "Remote MCP tool" if is_remote else "MCP tool"

        # Synthetic module through which the tool callables are reachable.
        mcp_module_name = f"mcp_servers.{server_name}"
        if mcp_module_name not in sys.modules:
            sys.modules[mcp_module_name] = types.ModuleType(mcp_module_name)
        server_module = sys.modules[mcp_module_name]

        tools_added = 0
        for tool_meta in tools_config:
            parsed = self._parse_tool_meta(tool_meta, default_prefix)
            if parsed is None:
                print(f"Warning: Skipping tool with no name in {origin}")
                continue
            tool_name, description, parameters, required_param_names = parsed
            required_params, optional_params = self._split_parameters(parameters, required_param_names)

            wrapper_function = make_wrapper(tool_name, description)
            setattr(server_module, tool_name, wrapper_function)

            tool_entry = {
                "function": wrapper_function,
                "server": server_name,
                "module": mcp_module_name,
                "description": description,
                "required_parameters": required_params,
                "optional_parameters": optional_params,
            }
            if is_remote:
                tool_entry["remote_url"] = remote_url
            self.mcp_functions[tool_name] = tool_entry

            if tool_registry:
                tool_schema = {
                    "name": tool_name,
                    "description": description,
                    "required_parameters": required_params,
                    "optional_parameters": optional_params,
                    "module": mcp_module_name,
                }
                if tool_registry.register_tool(tool_schema, mcp_module_name):
                    tool_registry._name_to_function[tool_name] = wrapper_function
                    tools_added += 1

        if tools_added > 0:
            if is_remote:
                print(f"✅ Added {tools_added} remote MCP tools from {server_name} server")
            else:
                print(f"✅ Added {tools_added} MCP tools from {server_name} server")
        return tools_added

    def list_mcp_tools(self) -> Dict[str, dict]:
        """Return a shallow copy of the loaded MCP tools."""
        return self.mcp_functions.copy()

    def remove_mcp_tool(self, tool_name: str, tool_registry=None) -> bool:
        """Remove an MCP tool by name.

        Args:
            tool_name: Name of the tool to remove.
            tool_registry: Optional registry; when given, the tool is also
                removed there through ``remove_tool_by_name``.

        Returns:
            True when the tool was loaded and has been removed, else False.
        """
        if tool_name not in self.mcp_functions:
            return False
        if tool_registry:
            tool_registry.remove_tool_by_name(tool_name)
        del self.mcp_functions[tool_name]
        return True

    def show_mcp_status(self) -> None:
        """Print an overview of the loaded servers and tools to the console."""
        if not self.has_mcp_functions():
            self.console.print("📭 No MCP tools loaded")
            return

        mcp_tools = self.mcp_functions
        servers = self.group_tools_by_server(mcp_tools)

        self.console.print("\n📊 MCP Status Report:")
        self.console.print(f" 📁 Total servers: {len(servers)}")
        self.console.print(f" 🛠️ Total MCP tools: {len(mcp_tools)}")

        for server_name, tools in servers.items():
            self.console.print(f"\n 📡 Server: {server_name}")
            self.console.print(f" Status: ✅ Active ({len(tools)} tools)")
            for tool in tools:
                self.console.print(f" • {tool['name']}: {tool['description']}")

    def get_mcp_summary(self) -> Dict[str, Any]:
        """Summarize the loaded MCP tools for programmatic access.

        Returns:
            Dict with ``total_tools``, ``total_servers``, ``servers`` (server
            name -> list of tool names) and ``tools`` (tool name ->
            description).
        """
        if not self.has_mcp_functions():
            return {"total_tools": 0, "total_servers": 0, "servers": {}, "tools": {}}

        mcp_tools = self.mcp_functions
        servers: Dict[str, List[str]] = {}
        for tool_name, tool_info in mcp_tools.items():
            servers.setdefault(tool_info.get('server', 'unknown'), []).append(tool_name)

        return {
            "total_tools": len(mcp_tools),
            "total_servers": len(servers),
            "servers": servers,
            "tools": {name: info.get('description', '') for name, info in mcp_tools.items()},
        }