Spaces:
Running
Running
| """ | |
| MCP Manager for CodeAct Agent. | |
| Manages MCP (Model Context Protocol) tools and servers. | |
| """ | |
| import os | |
| import sys | |
| import types | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| from rich.console import Console | |
class MCPManager:
    """Manage MCP (Model Context Protocol) tools and servers.

    Tools are loaded from a YAML configuration file by :meth:`add_mcp`. Every
    loaded tool is wrapped in a synchronous Python callable, attached to a
    synthetic ``mcp_servers.<server_name>`` module in ``sys.modules`` and
    recorded in ``self.mcp_functions`` keyed by tool name.
    """

    # NOTE(review): several user-facing messages below start with glyphs that
    # look like mis-decoded emoji. They are kept as found; confirm the intended
    # characters before changing them.

    def __init__(self, console_display=None):
        """Create an empty manager.

        Args:
            console_display: Optional object exposing a ``console`` attribute
                used for status output. A fresh ``rich`` ``Console`` is created
                when it is omitted or falsy.
        """
        # tool name -> {"function", "server", "module", "description",
        # "required_parameters", "optional_parameters"[, "remote_url"]}
        self.mcp_functions = {}
        self.console = console_display.console if console_display else Console()

    def has_mcp_functions(self) -> bool:
        """Return True when at least one MCP tool is loaded."""
        return bool(self.mcp_functions)

    def group_tools_by_server(self, mcp_tools: Dict[str, dict]) -> Dict[str, List[Dict]]:
        """Group tool entries by the value of their ``'server'`` key.

        Args:
            mcp_tools: Mapping of tool name to tool metadata.

        Returns:
            Mapping of server name to a list of ``{'name', 'description'}``
            dicts. Tools without a ``'server'`` key are grouped under
            ``'unknown'``; a missing description becomes ``'No description'``.
        """
        servers: Dict[str, List[Dict]] = {}
        for tool_name, tool_info in mcp_tools.items():
            server_name = tool_info.get('server', 'unknown')
            servers.setdefault(server_name, []).append({
                'name': tool_name,
                'description': tool_info.get('description', 'No description'),
            })
        return servers

    # ------------------------------------------------------------------
    # Private helpers shared by the local and remote loading paths
    # ------------------------------------------------------------------

    @staticmethod
    def _describe_discovered_tools(tools) -> List[dict]:
        """Convert tool objects reported by a server into plain dicts.

        Objects without a ``name`` attribute are skipped with a warning. An
        empty or missing description is replaced by one derived from the name.
        """
        discovered_tools = []
        for tool in tools:
            if not hasattr(tool, "name"):
                print(f"Warning: Skipping tool with no name attribute: {tool}")
                continue
            description = getattr(tool, 'description', None)
            if not description or description.strip() == "":
                formatted_name = tool.name.replace('_', ' ').title()
                description = f"MCP tool: {formatted_name}"
            discovered_tools.append({
                "name": tool.name,
                "description": description,
                "inputSchema": tool.inputSchema,
            })
        return discovered_tools

    @staticmethod
    def _serialize_tool_result(result):
        """Return the first content item of a tool call result as a string.

        An empty content list yields ``""`` instead of an ``IndexError``.
        """
        contents = result.content
        if not contents:
            return ""
        first = contents[0]
        if hasattr(first, "model_dump_json"):
            return first.model_dump_json()
        if hasattr(first, "json"):
            return first.json()
        return first.text

    @staticmethod
    def _expand_env(env_vars):
        """Resolve ``${VAR}`` placeholders in an env mapping via ``os.getenv``.

        Falsy input (``None`` or ``{}``) is returned unchanged. Unset
        variables resolve to an empty string.
        """
        if not env_vars:
            return env_vars
        expanded = {}
        for key, value in env_vars.items():
            if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
                expanded[key] = os.getenv(value[2:-1], "")
            else:
                expanded[key] = value
        return expanded

    @staticmethod
    def _ensure_server_module(server_name: str):
        """Return ``(module_name, module)`` for a server, creating it if needed."""
        mcp_module_name = f"mcp_servers.{server_name}"
        if mcp_module_name not in sys.modules:
            sys.modules[mcp_module_name] = types.ModuleType(mcp_module_name)
        return mcp_module_name, sys.modules[mcp_module_name]

    @staticmethod
    def _resolve_tool_meta(tool_meta, label: str):
        """Extract ``(name, description, parameters, required_names)``.

        Handles both manual ("biomni_name") definitions and auto-discovered
        ones ("name" + "inputSchema"). Returns ``None`` when the entry is not
        a dict or has no usable name, so the caller can skip it. The name is
        checked *before* it is used to build a fallback description.
        """
        if not isinstance(tool_meta, dict):
            return None

        if "biomni_name" in tool_meta:
            # Manual tool definition (Biomni-style)
            tool_name = tool_meta.get("biomni_name")
            if not tool_name:
                return None
            description = tool_meta.get("description", f"{label}: {tool_name}")
            parameters = tool_meta.get("parameters", {}) or {}
            required_param_names = [
                param_name
                for param_name, param_spec in parameters.items()
                if param_spec.get("required", False)
            ]
        else:
            # Auto-discovered tool
            tool_name = tool_meta.get("name")
            if not tool_name:
                return None
            description = tool_meta.get("description", "")
            if not description or description.strip() == "":
                formatted_name = tool_name.replace('_', ' ').title()
                description = f"{label}: {formatted_name}"
            input_schema = tool_meta.get("inputSchema", {}) or {}
            parameters = input_schema.get("properties", {}) or {}
            required_param_names = input_schema.get("required", []) or []

        return tool_name, description, parameters, required_param_names

    @staticmethod
    def _build_param_lists(parameters: Dict[str, dict], required_param_names: List[str]):
        """Split a parameter schema into ``(required_params, optional_params)``.

        Each entry carries ``name``, ``type`` and ``description``, plus
        ``enum`` when the schema (or the first enum-bearing ``anyOf`` /
        ``oneOf`` option) declares one. A parameter is treated as required
        when it is listed in ``required_param_names`` or has no ``default``.
        """
        required_params = []
        optional_params = []
        for param_name, param_spec in parameters.items():
            param_info = {
                "name": param_name,
                "type": param_spec.get("type", "string"),
                "description": param_spec.get("description", f"Parameter {param_name}"),
            }
            if "enum" in param_spec:
                param_info["enum"] = param_spec["enum"]

            # anyOf is processed before oneOf so that oneOf wins when both
            # carry an enum, matching the order the keys were handled in.
            for union_key in ("anyOf", "oneOf"):
                for schema_option in param_spec.get(union_key) or ():
                    if "enum" in schema_option:
                        param_info["enum"] = schema_option["enum"]
                        if "type" in schema_option:
                            param_info["type"] = schema_option["type"]
                        break

            is_required = (param_name in required_param_names) or ("default" not in param_spec)
            if is_required:
                required_params.append(param_info)
            else:
                param_info["default"] = param_spec.get("default")
                optional_params.append(param_info)
        return required_params, optional_params

    def _register_server_tools(self, server_name: str, mcp_module_name: str, server_module,
                               tools_config, make_wrapper, tool_registry,
                               is_remote: bool = False, remote_url: Optional[str] = None) -> int:
        """Wrap and record every tool in ``tools_config`` for one server.

        Args:
            server_name: Name of the server as written in the configuration.
            mcp_module_name: Name of the synthetic module for this server.
            server_module: The module object the wrappers are attached to.
            tools_config: Iterable of manual or discovered tool definitions.
            make_wrapper: Callable ``(tool_name, description) -> callable``.
            tool_registry: Optional registry; when truthy each tool is also
                registered through ``register_tool``.
            is_remote: Selects the "remote" wording and adds ``remote_url``
                to the stored entry.
            remote_url: URL stored with remote tools.

        Returns:
            Number of tools the registry accepted (0 without a registry).
        """
        label = "Remote MCP tool" if is_remote else "MCP tool"
        origin = f"remote {server_name}" if is_remote else server_name
        tools_added = 0

        for tool_meta in tools_config:
            resolved = self._resolve_tool_meta(tool_meta, label)
            if resolved is None:
                print(f"Warning: Skipping tool with no name in {origin}")
                continue
            tool_name, description, parameters, required_param_names = resolved

            wrapper_function = make_wrapper(tool_name, description)
            setattr(server_module, tool_name, wrapper_function)

            # Parameter metadata is computed unconditionally so the entry in
            # mcp_functions is complete even when no registry is supplied.
            required_params, optional_params = self._build_param_lists(
                parameters, required_param_names
            )

            entry = {
                "function": wrapper_function,
                "server": server_name,
                "module": mcp_module_name,
                "description": description,
                "required_parameters": required_params,
                "optional_parameters": optional_params,
            }
            if is_remote:
                entry["remote_url"] = remote_url
            self.mcp_functions[tool_name] = entry

            if tool_registry:
                tool_schema = {
                    "name": tool_name,
                    "description": description,
                    "required_parameters": required_params,
                    "optional_parameters": optional_params,
                    "module": mcp_module_name,
                }
                if tool_registry.register_tool(tool_schema, mcp_module_name):
                    tool_registry._name_to_function[tool_name] = wrapper_function
                    tools_added += 1

        return tools_added

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def add_mcp(self, config_path: str = "./mcp_config.yaml", tool_registry=None) -> None:
        """Load MCP tools described in a YAML configuration file.

        The file is expected to contain an ``mcp_servers`` mapping. Entries
        with a ``url`` are treated as remote (streamable HTTP) servers; all
        others must provide a ``command`` list and are launched over stdio.
        When a server lists no ``tools``, they are discovered from the server.

        Args:
            config_path: Path to the YAML configuration file.
            tool_registry: Optional registry that also receives each tool.

        Raises:
            ImportError: If ``pyyaml`` or the MCP packages are not installed.
            FileNotFoundError: If ``config_path`` does not exist.
            yaml.YAMLError: If the file is not valid YAML.
        """
        try:
            import asyncio
            import yaml
        except ImportError as e:
            raise ImportError(f"Required packages not available: {e}. Install with: pip install pyyaml") from e

        try:
            import nest_asyncio
            from mcp import ClientSession
            from mcp.client.stdio import StdioServerParameters, stdio_client
            from mcp.client.streamable_http import streamablehttp_client
            from langchain_mcp_adapters.tools import _list_all_tools
            nest_asyncio.apply()
        except ImportError as e:
            raise ImportError(f"MCP packages not available: {e}. Install with: pip install mcp langchain-mcp-adapters") from e

        describe_tools = self._describe_discovered_tools
        serialize_result = self._serialize_tool_result

        def discover_mcp_tools_sync(server_params: StdioServerParameters) -> List[dict]:
            """Discover available tools from MCP server synchronously."""
            async def _discover_async():
                async with stdio_client(server_params) as (reader, writer):
                    async with ClientSession(reader, writer) as session:
                        await session.initialize()
                        tools_result = await session.list_tools()
                        tools = tools_result.tools if hasattr(tools_result, "tools") else tools_result
                        return describe_tools(tools)

            try:
                return asyncio.run(_discover_async())
            except Exception as e:
                print(f"Failed to discover tools: {e}")
                return []

        def discover_remote_mcp_tools_sync(url: str) -> List[dict]:
            """Discover available tools from remote MCP server synchronously."""
            async def _discover_remote_async():
                async with streamablehttp_client(url) as (read, write, _):
                    async with ClientSession(read, write) as session:
                        await session.initialize()
                        tools = await _list_all_tools(session)
                        return describe_tools(tools)

            try:
                return asyncio.run(_discover_remote_async())
            except Exception as e:
                print(f"Failed to discover remote tools from {url}: {e}")
                return []

        def make_mcp_wrapper(cmd: str, args: List[str], tool_name: str, doc: str, env_vars: dict = None):
            """Create a synchronous wrapper for an async MCP tool call."""
            def sync_tool_wrapper(**kwargs):
                """Synchronous wrapper for MCP tool execution."""
                try:
                    server_params = StdioServerParameters(command=cmd, args=args, env=env_vars)

                    async def async_tool_call():
                        async with stdio_client(server_params) as (reader, writer):
                            async with ClientSession(reader, writer) as session:
                                await session.initialize()
                                result = await session.call_tool(tool_name, kwargs)
                                return serialize_result(result)

                    # Always block for the result, even when called from
                    # inside a running loop: nest_asyncio.apply() above makes
                    # asyncio.run re-entrant, so a synchronous caller never
                    # receives an un-awaited Task.
                    return asyncio.run(async_tool_call())
                except Exception as e:
                    raise RuntimeError(f"MCP tool execution failed for '{tool_name}': {e}") from e

            sync_tool_wrapper.__name__ = tool_name
            sync_tool_wrapper.__doc__ = doc
            return sync_tool_wrapper

        def make_remote_mcp_wrapper(url: str, tool_name: str, doc: str):
            """Create a synchronous wrapper for an async remote MCP tool call."""
            def sync_tool_wrapper(**kwargs):
                """Synchronous wrapper for remote MCP tool execution."""
                try:
                    async def async_tool_call():
                        async with streamablehttp_client(url) as (read, write, _):
                            async with ClientSession(read, write) as session:
                                await session.initialize()
                                result = await session.call_tool(tool_name, kwargs)
                                return serialize_result(result)

                    # See make_mcp_wrapper: asyncio.run is re-entrant here.
                    return asyncio.run(async_tool_call())
                except Exception as e:
                    raise RuntimeError(f"Remote MCP tool execution failed for '{tool_name}': {e}") from e

            sync_tool_wrapper.__name__ = tool_name
            sync_tool_wrapper.__doc__ = doc
            return sync_tool_wrapper

        # Load and validate configuration
        try:
            config_content = Path(config_path).read_text(encoding="utf-8")
            cfg = yaml.safe_load(config_content) or {}
        except FileNotFoundError:
            raise FileNotFoundError(f"MCP config file not found: {config_path}") from None
        except yaml.YAMLError as e:
            raise yaml.YAMLError(f"Invalid YAML in MCP config: {e}") from e

        mcp_servers = cfg.get("mcp_servers", {})
        if not mcp_servers:
            print("Warning: No MCP servers found in configuration")
            return

        # Process each MCP server configuration
        for server_name, server_meta in mcp_servers.items():
            # An entry written as a bare key parses to None; treat it as
            # empty so it is reported as an invalid command below.
            server_meta = server_meta or {}
            if not server_meta.get("enabled", True):
                continue

            remote_url = server_meta.get("url")
            if remote_url:
                self._process_remote_server(
                    server_name, server_meta, remote_url, tool_registry,
                    discover_remote_mcp_tools_sync, make_remote_mcp_wrapper,
                )
                continue

            # Local (stdio) server: validate command configuration
            cmd_list = server_meta.get("command", [])
            if not cmd_list or not isinstance(cmd_list, list):
                print(f"Warning: Invalid command configuration for server '{server_name}'")
                continue
            cmd, *args = cmd_list

            env_vars = self._expand_env(server_meta.get("env", {}))
            mcp_module_name, server_module = self._ensure_server_module(server_name)

            tools_config = server_meta.get("tools", [])
            if not tools_config:
                # Auto-discover tools if not manually configured
                try:
                    server_params = StdioServerParameters(command=cmd, args=args, env=env_vars)
                    tools_config = discover_mcp_tools_sync(server_params)
                except Exception as e:
                    print(f"Failed to discover tools for {server_name}: {e}")
                    continue
                if not tools_config:
                    print(f"Warning: No tools discovered from {server_name} MCP server")
                    continue
                print(f"π Discovered {len(tools_config)} tools from {server_name} MCP server")

            # The closure is consumed within this iteration, so capturing the
            # loop variables cmd/args/env_vars here is safe.
            def make_wrapper(tool_name, description, _cmd=cmd, _args=args, _env=env_vars):
                return make_mcp_wrapper(_cmd, _args, tool_name, description, _env)

            tools_added = self._register_server_tools(
                server_name, mcp_module_name, server_module,
                tools_config, make_wrapper, tool_registry,
            )
            if tools_added > 0:
                print(f"β Added {tools_added} MCP tools from {server_name} server")

        print(f"π οΈ Total MCP tools loaded: {len(self.mcp_functions)}")

    def _process_remote_server(self, server_name: str, server_meta: dict, remote_url: str, tool_registry, discover_remote_mcp_tools_sync, make_remote_mcp_wrapper):
        """Load the tools of one remote MCP server.

        Args:
            server_name: Name of the server as written in the configuration.
            server_meta: Configuration mapping for this server; only its
                ``tools`` key is read here.
            remote_url: URL of the remote server.
            tool_registry: Optional registry that also receives each tool.
            discover_remote_mcp_tools_sync: Callable ``(url) -> list[dict]``
                used when no tools are configured manually.
            make_remote_mcp_wrapper: Callable
                ``(url, tool_name, doc) -> callable`` building the wrapper.
        """
        mcp_module_name, server_module = self._ensure_server_module(server_name)

        tools_config = server_meta.get("tools", [])
        if not tools_config:
            # Auto-discover tools if not manually configured
            try:
                tools_config = discover_remote_mcp_tools_sync(remote_url)
            except Exception as e:
                print(f"Failed to discover tools for remote {server_name}: {e}")
                return
            if not tools_config:
                print(f"Warning: No tools discovered from {server_name} remote MCP server")
                return
            print(f"π Discovered {len(tools_config)} tools from {server_name} remote MCP server")

        def make_wrapper(tool_name, description):
            return make_remote_mcp_wrapper(remote_url, tool_name, description)

        tools_added = self._register_server_tools(
            server_name, mcp_module_name, server_module,
            tools_config, make_wrapper, tool_registry,
            is_remote=True, remote_url=remote_url,
        )
        if tools_added > 0:
            print(f"β Added {tools_added} remote MCP tools from {server_name} server")

    # ------------------------------------------------------------------
    # Inspection and removal
    # ------------------------------------------------------------------

    def list_mcp_tools(self) -> Dict[str, dict]:
        """Return a shallow copy of the loaded-tools mapping."""
        return self.mcp_functions.copy()

    def remove_mcp_tool(self, tool_name: str, tool_registry=None) -> bool:
        """Remove an MCP tool by name.

        Args:
            tool_name: Name of the tool to remove.
            tool_registry: Optional registry; when truthy the tool is also
                removed there via ``remove_tool_by_name``.

        Returns:
            True when the tool was loaded and has been removed, else False.
        """
        if tool_name not in self.mcp_functions:
            return False
        if tool_registry:
            tool_registry.remove_tool_by_name(tool_name)
        del self.mcp_functions[tool_name]
        return True

    def show_mcp_status(self) -> None:
        """Print a per-server report of the loaded MCP tools to the console."""
        if not self.has_mcp_functions():
            self.console.print("π No MCP tools loaded")
            return

        mcp_tools = self.mcp_functions
        servers = self.group_tools_by_server(mcp_tools)

        self.console.print("\nπ MCP Status Report:")
        self.console.print(f" π Total servers: {len(servers)}")
        self.console.print(f" π οΈ Total MCP tools: {len(mcp_tools)}")
        for server_name, tools in servers.items():
            self.console.print(f"\n π‘ Server: {server_name}")
            self.console.print(f" Status: β Active ({len(tools)} tools)")
            for tool in tools:
                self.console.print(f" β’ {tool['name']}: {tool['description']}")

    def get_mcp_summary(self) -> Dict[str, Any]:
        """Return a summary of the loaded MCP tools for programmatic use.

        Returns:
            A dict with ``total_tools``, ``total_servers``, ``servers``
            (server name -> list of tool names) and ``tools`` (tool name ->
            description). The same keys are present when nothing is loaded.
        """
        if not self.has_mcp_functions():
            return {"total_tools": 0, "total_servers": 0, "servers": {}, "tools": {}}

        mcp_tools = self.mcp_functions
        servers: Dict[str, List[str]] = {}
        for tool_name, tool_info in mcp_tools.items():
            servers.setdefault(tool_info.get('server', 'unknown'), []).append(tool_name)

        return {
            "total_tools": len(mcp_tools),
            "total_servers": len(servers),
            "servers": servers,
            "tools": {name: info.get('description', '') for name, info in mcp_tools.items()},
        }