""" MCP Manager for CodeAct Agent. Manages MCP (Model Context Protocol) tools and servers. """ import os import sys import types from pathlib import Path from typing import Dict, List, Optional, Any from rich.console import Console class MCPManager: """Manages MCP (Model Context Protocol) tools and servers.""" def __init__(self, console_display=None): self.mcp_functions = {} self.console = console_display.console if console_display else Console() def has_mcp_functions(self) -> bool: """Check if MCP functions are available.""" return bool(self.mcp_functions) def group_tools_by_server(self, mcp_tools: Dict[str, dict]) -> Dict[str, List[Dict]]: """Group MCP tools by server name.""" servers = {} for tool_name, tool_info in mcp_tools.items(): server_name = tool_info.get('server', 'unknown') if server_name not in servers: servers[server_name] = [] servers[server_name].append({ 'name': tool_name, 'description': tool_info.get('description', 'No description') }) return servers def add_mcp(self, config_path: str = "./mcp_config.yaml", tool_registry=None) -> None: """Add MCP tools from configuration file.""" try: import asyncio import yaml except ImportError as e: raise ImportError(f"Required packages not available: {e}. Install with: pip install pyyaml") from e try: import nest_asyncio from mcp import ClientSession from mcp.client.stdio import StdioServerParameters, stdio_client from mcp.client.streamable_http import streamablehttp_client from langchain_mcp_adapters.tools import _list_all_tools nest_asyncio.apply() except ImportError as e: raise ImportError(f"MCP packages not available: {e}. Install with: pip install mcp langchain-mcp-adapters") from e def discover_mcp_tools_sync(server_params: StdioServerParameters) -> List[dict]: """Discover available tools from MCP server synchronously.""" try: async def _discover_async(): async with stdio_client(server_params) as (reader, writer): async with ClientSession(reader, writer) as session: await session.initialize() tools_result = await session.list_tools() tools = tools_result.tools if hasattr(tools_result, "tools") else tools_result print(tools) discovered_tools = [] for tool in tools: if hasattr(tool, "name"): # Ensure description is never empty or None description = getattr(tool, 'description', None) if not description or description.strip() == "": # Generate description from tool name formatted_name = tool.name.replace('_', ' ').title() description = f"MCP tool: {formatted_name}" discovered_tools.append({ "name": tool.name, "description": description, "inputSchema": tool.inputSchema, }) else: print(f"Warning: Skipping tool with no name attribute: {tool}") return discovered_tools return asyncio.run(_discover_async()) except Exception as e: print(f"Failed to discover tools: {e}") return [] def discover_remote_mcp_tools_sync(url: str) -> List[dict]: """Discover available tools from remote MCP server synchronously.""" try: async def _discover_remote_async(): async with streamablehttp_client(url) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() tools = await _list_all_tools(session) discovered_tools = [] for tool in tools: if hasattr(tool, "name"): # Ensure description is never empty or None description = getattr(tool, 'description', None) if not description or description.strip() == "": # Generate description from tool name formatted_name = tool.name.replace('_', ' ').title() description = f"MCP tool: {formatted_name}" discovered_tools.append({ "name": tool.name, "description": description, "inputSchema": tool.inputSchema, }) else: print(f"Warning: Skipping tool with no name attribute: {tool}") return discovered_tools return asyncio.run(_discover_remote_async()) except Exception as e: print(f"Failed to discover remote tools from {url}: {e}") return [] def make_mcp_wrapper(cmd: str, args: List[str], tool_name: str, doc: str, env_vars: dict = None): """Create a synchronous wrapper for an async MCP tool call.""" def sync_tool_wrapper(**kwargs): """Synchronous wrapper for MCP tool execution.""" try: server_params = StdioServerParameters(command=cmd, args=args, env=env_vars) async def async_tool_call(): async with stdio_client(server_params) as (reader, writer): async with ClientSession(reader, writer) as session: await session.initialize() result = await session.call_tool(tool_name, kwargs) content = result.content[0] if hasattr(content, "model_dump_json"): return content.model_dump_json() elif hasattr(content, "json"): return content.json() return content.text try: loop = asyncio.get_running_loop() return loop.create_task(async_tool_call()) except RuntimeError: return asyncio.run(async_tool_call()) except Exception as e: raise RuntimeError(f"MCP tool execution failed for '{tool_name}': {e}") from e sync_tool_wrapper.__name__ = tool_name sync_tool_wrapper.__doc__ = doc return sync_tool_wrapper def make_remote_mcp_wrapper(url: str, tool_name: str, doc: str): """Create a synchronous wrapper for an async remote MCP tool call.""" def sync_tool_wrapper(**kwargs): """Synchronous wrapper for remote MCP tool execution.""" try: async def async_tool_call(): async with streamablehttp_client(url) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() result = await session.call_tool(tool_name, kwargs) content = result.content[0] if hasattr(content, "model_dump_json"): return content.model_dump_json() elif hasattr(content, "json"): return content.json() return content.text try: loop = asyncio.get_running_loop() return loop.create_task(async_tool_call()) except RuntimeError: return asyncio.run(async_tool_call()) except Exception as e: raise RuntimeError(f"Remote MCP tool execution failed for '{tool_name}': {e}") from e sync_tool_wrapper.__name__ = tool_name sync_tool_wrapper.__doc__ = doc return sync_tool_wrapper # Load and validate configuration try: config_content = Path(config_path).read_text(encoding="utf-8") cfg = yaml.safe_load(config_content) or {} except FileNotFoundError: raise FileNotFoundError(f"MCP config file not found: {config_path}") from None except yaml.YAMLError as e: raise yaml.YAMLError(f"Invalid YAML in MCP config: {e}") from e mcp_servers = cfg.get("mcp_servers", {}) if not mcp_servers: print("Warning: No MCP servers found in configuration") return # Process each MCP server configuration for server_name, server_meta in mcp_servers.items(): if not server_meta.get("enabled", True): continue # Check if this is a remote server configuration remote_url = server_meta.get("url") if remote_url: # Handle remote MCP server self._process_remote_server(server_name, server_meta, remote_url, tool_registry, discover_remote_mcp_tools_sync, make_remote_mcp_wrapper) continue # Handle local MCP server (existing logic) # Validate command configuration cmd_list = server_meta.get("command", []) if not cmd_list or not isinstance(cmd_list, list): print(f"Warning: Invalid command configuration for server '{server_name}'") continue cmd, *args = cmd_list # Process environment variables env_vars = server_meta.get("env", {}) if env_vars: processed_env = {} for key, value in env_vars.items(): if isinstance(value, str) and value.startswith("${") and value.endswith("}"): var_name = value[2:-1] processed_env[key] = os.getenv(var_name, "") else: processed_env[key] = value env_vars = processed_env # Create module namespace for this MCP server mcp_module_name = f"mcp_servers.{server_name}" if mcp_module_name not in sys.modules: sys.modules[mcp_module_name] = types.ModuleType(mcp_module_name) server_module = sys.modules[mcp_module_name] tools_config = server_meta.get("tools", []) # Auto-discover tools if not manually configured if not tools_config: try: server_params = StdioServerParameters(command=cmd, args=args, env=env_vars) tools_config = discover_mcp_tools_sync(server_params) if tools_config: print(f"šŸ” Discovered {len(tools_config)} tools from {server_name} MCP server") else: print(f"Warning: No tools discovered from {server_name} MCP server") continue except Exception as e: print(f"Failed to discover tools for {server_name}: {e}") continue # Register each tool tools_added = 0 for tool_meta in tools_config: if isinstance(tool_meta, dict) and "biomni_name" in tool_meta: # Manual tool definition (Biomni-style) tool_name = tool_meta.get("biomni_name") description = tool_meta.get("description", f"MCP tool: {tool_name}") parameters = tool_meta.get("parameters", {}) required_param_names = [] for param_name, param_spec in parameters.items(): if param_spec.get("required", False): required_param_names.append(param_name) else: # Auto-discovered tool tool_name = tool_meta.get("name") description = tool_meta.get("description", "") # Ensure description is never empty if not description or description.strip() == "": formatted_name = tool_name.replace('_', ' ').title() description = f"MCP tool: {formatted_name}" input_schema = tool_meta.get("inputSchema", {}) parameters = input_schema.get("properties", {}) required_param_names = input_schema.get("required", []) if not tool_name: print(f"Warning: Skipping tool with no name in {server_name}") continue # Create wrapper function wrapper_function = make_mcp_wrapper(cmd, args, tool_name, description, env_vars) # Add to module namespace setattr(server_module, tool_name, wrapper_function) # Store in MCP functions registry with parameter information self.mcp_functions[tool_name] = { "function": wrapper_function, "server": server_name, "module": mcp_module_name, "description": description, "required_parameters": [], # Will be populated below "optional_parameters": [] # Will be populated below } # Register with tool registry if available if tool_registry: from .tool_registry import ToolRegistry # Create tool schema with proper parameter information required_params = [] optional_params = [] for param_name, param_spec in parameters.items(): param_info = { "name": param_name, "type": param_spec.get("type", "string"), "description": param_spec.get("description", f"Parameter {param_name}"), } # Extract enum/literal values if present if "enum" in param_spec: param_info["enum"] = param_spec["enum"] # Handle anyOf schemas (common for optional literal types) if "anyOf" in param_spec: # Look for enum in anyOf schemas for schema_option in param_spec["anyOf"]: if "enum" in schema_option: param_info["enum"] = schema_option["enum"] # Update type if specified if "type" in schema_option: param_info["type"] = schema_option["type"] break # Handle oneOf schemas (alternative union syntax) if "oneOf" in param_spec: # Look for enum in oneOf schemas for schema_option in param_spec["oneOf"]: if "enum" in schema_option: param_info["enum"] = schema_option["enum"] if "type" in schema_option: param_info["type"] = schema_option["type"] break # Determine if parameter is required based on: # 1. Explicit required list (if provided) # 2. If no default value is present in the schema is_required = (param_name in required_param_names) or ("default" not in param_spec) if is_required: required_params.append(param_info) else: param_info["default"] = param_spec.get("default") optional_params.append(param_info) # Create complete tool schema tool_schema = { "name": tool_name, "description": description, "required_parameters": required_params, "optional_parameters": optional_params, "module": mcp_module_name, } success = tool_registry.register_tool(tool_schema, mcp_module_name) if success: tool_registry._name_to_function[tool_name] = wrapper_function tools_added += 1 # Update MCP functions registry with parameter information self.mcp_functions[tool_name]["required_parameters"] = required_params self.mcp_functions[tool_name]["optional_parameters"] = optional_params if tools_added > 0: print(f"āœ… Added {tools_added} MCP tools from {server_name} server") print(f"šŸ› ļø Total MCP tools loaded: {len(self.mcp_functions)}") def _process_remote_server(self, server_name: str, server_meta: dict, remote_url: str, tool_registry, discover_remote_mcp_tools_sync, make_remote_mcp_wrapper): """Process a remote MCP server configuration.""" import sys import types # Create module namespace for this remote MCP server mcp_module_name = f"mcp_servers.{server_name}" if mcp_module_name not in sys.modules: sys.modules[mcp_module_name] = types.ModuleType(mcp_module_name) server_module = sys.modules[mcp_module_name] tools_config = server_meta.get("tools", []) # Auto-discover tools if not manually configured if not tools_config: try: tools_config = discover_remote_mcp_tools_sync(remote_url) if tools_config: print(f"šŸ” Discovered {len(tools_config)} tools from {server_name} remote MCP server") else: print(f"Warning: No tools discovered from {server_name} remote MCP server") return except Exception as e: print(f"Failed to discover tools for remote {server_name}: {e}") return # Register each tool tools_added = 0 for tool_meta in tools_config: if isinstance(tool_meta, dict) and "biomni_name" in tool_meta: # Manual tool definition (Biomni-style) tool_name = tool_meta.get("biomni_name") description = tool_meta.get("description", f"Remote MCP tool: {tool_name}") parameters = tool_meta.get("parameters", {}) required_param_names = [] for param_name, param_spec in parameters.items(): if param_spec.get("required", False): required_param_names.append(param_name) else: # Auto-discovered tool tool_name = tool_meta.get("name") description = tool_meta.get("description", "") # Ensure description is never empty if not description or description.strip() == "": formatted_name = tool_name.replace('_', ' ').title() description = f"Remote MCP tool: {formatted_name}" input_schema = tool_meta.get("inputSchema", {}) parameters = input_schema.get("properties", {}) required_param_names = input_schema.get("required", []) if not tool_name: print(f"Warning: Skipping tool with no name in remote {server_name}") continue # Create wrapper function for remote tool wrapper_function = make_remote_mcp_wrapper(remote_url, tool_name, description) # Add to module namespace setattr(server_module, tool_name, wrapper_function) # Store in MCP functions registry with parameter information self.mcp_functions[tool_name] = { "function": wrapper_function, "server": server_name, "module": mcp_module_name, "description": description, "required_parameters": [], # Will be populated below "optional_parameters": [], # Will be populated below "remote_url": remote_url } # Register with tool registry if available if tool_registry: from .tool_registry import ToolRegistry # Create tool schema with proper parameter information required_params = [] optional_params = [] for param_name, param_spec in parameters.items(): param_info = { "name": param_name, "type": param_spec.get("type", "string"), "description": param_spec.get("description", f"Parameter {param_name}"), } # Extract enum/literal values if present if "enum" in param_spec: param_info["enum"] = param_spec["enum"] # Handle anyOf schemas (common for optional literal types) if "anyOf" in param_spec: # Look for enum in anyOf schemas for schema_option in param_spec["anyOf"]: if "enum" in schema_option: param_info["enum"] = schema_option["enum"] # Update type if specified if "type" in schema_option: param_info["type"] = schema_option["type"] break # Handle oneOf schemas (alternative union syntax) if "oneOf" in param_spec: # Look for enum in oneOf schemas for schema_option in param_spec["oneOf"]: if "enum" in schema_option: param_info["enum"] = schema_option["enum"] if "type" in schema_option: param_info["type"] = schema_option["type"] break # Determine if parameter is required based on: # 1. Explicit required list (if provided) # 2. If no default value is present in the schema is_required = (param_name in required_param_names) or ("default" not in param_spec) if is_required: required_params.append(param_info) else: param_info["default"] = param_spec.get("default") optional_params.append(param_info) # Create complete tool schema tool_schema = { "name": tool_name, "description": description, "required_parameters": required_params, "optional_parameters": optional_params, "module": mcp_module_name, } success = tool_registry.register_tool(tool_schema, mcp_module_name) if success: tool_registry._name_to_function[tool_name] = wrapper_function tools_added += 1 # Update MCP functions registry with parameter information self.mcp_functions[tool_name]["required_parameters"] = required_params self.mcp_functions[tool_name]["optional_parameters"] = optional_params if tools_added > 0: print(f"āœ… Added {tools_added} remote MCP tools from {server_name} server") def list_mcp_tools(self) -> Dict[str, dict]: """List all loaded MCP tools.""" return self.mcp_functions.copy() def remove_mcp_tool(self, tool_name: str, tool_registry=None) -> bool: """Remove an MCP tool by name.""" if not self.has_mcp_functions() or tool_name not in self.mcp_functions: return False # Remove from tool registry if tool_registry: tool_registry.remove_tool_by_name(tool_name) # Remove from MCP functions del self.mcp_functions[tool_name] return True def show_mcp_status(self) -> None: """Display detailed MCP status information to the user.""" if not self.has_mcp_functions(): self.console.print("šŸ”— No MCP tools loaded") return mcp_tools = self.mcp_functions if not mcp_tools: self.console.print("šŸ”— MCP system initialized but no tools loaded") return # Group tools by server servers = self.group_tools_by_server(mcp_tools) # Display server information self.console.print(f"\nšŸ”— MCP Status Report:") self.console.print(f" šŸ“Š Total servers: {len(servers)}") self.console.print(f" šŸ› ļø Total MCP tools: {len(mcp_tools)}") for server_name, tools in servers.items(): self.console.print(f"\n šŸ“” Server: {server_name}") self.console.print(f" Status: āœ… Active ({len(tools)} tools)") for tool in tools: self.console.print(f" • {tool['name']}: {tool['description']}") def get_mcp_summary(self) -> Dict[str, any]: """Get a summary of MCP tools for programmatic access.""" if not self.has_mcp_functions(): return {"total_tools": 0, "servers": {}, "tools": {}} mcp_tools = self.mcp_functions # Group tools by server but only get tool names servers = {} for tool_name, tool_info in mcp_tools.items(): server_name = tool_info.get('server', 'unknown') if server_name not in servers: servers[server_name] = [] servers[server_name].append(tool_name) return { "total_tools": len(mcp_tools), "total_servers": len(servers), "servers": servers, "tools": {name: info.get('description', '') for name, info in mcp_tools.items()} }