# Provenance: uploaded by Paper2Agent ("Upload 56 files", commit 8b54db1, verified).
"""
MCP Manager for CodeAct Agent.
Manages MCP (Model Context Protocol) tools and servers.
"""
import os
import sys
import types
from pathlib import Path
from typing import Dict, List, Optional, Any
from rich.console import Console
class MCPManager:
    """Manages MCP (Model Context Protocol) tools and servers.

    Tools loaded from local (stdio) or remote (streamable HTTP) MCP servers
    are wrapped in plain Python callables, attached to a synthetic
    ``mcp_servers.<server_name>`` module registered in ``sys.modules``, and
    recorded in ``self.mcp_functions`` keyed by tool name.
    """

    def __init__(self, console_display=None):
        """Create an empty manager.

        Args:
            console_display: Optional object exposing a ``console`` attribute
                used for status output. When omitted (or falsy) a new
                ``Console`` is created.
        """
        # tool name -> {"function", "server", "module", "description",
        #               "required_parameters", "optional_parameters", ...}
        self.mcp_functions = {}
        self.console = console_display.console if console_display else Console()

    def has_mcp_functions(self) -> bool:
        """Check if MCP functions are available."""
        return bool(self.mcp_functions)

    def group_tools_by_server(self, mcp_tools: Dict[str, dict]) -> Dict[str, List[Dict]]:
        """Group MCP tools by server name.

        Args:
            mcp_tools: Mapping of tool name to a tool-info dict. The optional
                ``server`` and ``description`` keys are read.

        Returns:
            Mapping of server name (``'unknown'`` when missing) to a list of
            ``{'name', 'description'}`` dicts, in input order.
        """
        servers: Dict[str, List[Dict]] = {}
        for tool_name, tool_info in mcp_tools.items():
            servers.setdefault(tool_info.get('server', 'unknown'), []).append({
                'name': tool_name,
                'description': tool_info.get('description', 'No description'),
            })
        return servers

    # ------------------------------------------------------------------
    # Private helpers shared by the local and remote code paths
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_discovered_tools(tools) -> List[dict]:
        """Convert tool objects returned by an MCP session into plain dicts."""
        discovered_tools = []
        for tool in tools:
            if not hasattr(tool, "name"):
                print(f"Warning: Skipping tool with no name attribute: {tool}")
                continue
            # Ensure description is never empty or None
            description = getattr(tool, 'description', None)
            if not description or not description.strip():
                formatted_name = tool.name.replace('_', ' ').title()
                description = f"MCP tool: {formatted_name}"
            discovered_tools.append({
                "name": tool.name,
                "description": description,
                "inputSchema": tool.inputSchema,
            })
        return discovered_tools

    @staticmethod
    def _extract_call_result(result):
        """Serialize the first content item of an MCP ``call_tool`` result."""
        contents = getattr(result, "content", None)
        if not contents:
            # Raised inside the wrapper's try block, so callers still see
            # the same RuntimeError type, just with a readable message.
            raise ValueError("tool returned no content")
        content = contents[0]
        if hasattr(content, "model_dump_json"):
            return content.model_dump_json()
        if hasattr(content, "json"):
            return content.json()
        return content.text

    @staticmethod
    def _resolve_env(env_vars):
        """Expand ``${VAR}`` values from ``os.environ`` (missing -> ``""``)."""
        if not env_vars:
            return env_vars
        resolved = {}
        for key, value in env_vars.items():
            if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
                resolved[key] = os.getenv(value[2:-1], "")
            else:
                resolved[key] = value
        return resolved

    @staticmethod
    def _get_server_module(mcp_module_name: str):
        """Return the synthetic module for a server, creating it if needed."""
        if mcp_module_name not in sys.modules:
            sys.modules[mcp_module_name] = types.ModuleType(mcp_module_name)
        return sys.modules[mcp_module_name]

    @staticmethod
    def _build_param_lists(parameters: dict, required_param_names: List[str]):
        """Split a JSON-schema ``properties`` mapping into required/optional lists."""
        required_params = []
        optional_params = []
        for param_name, param_spec in parameters.items():
            param_info = {
                "name": param_name,
                "type": param_spec.get("type", "string"),
                "description": param_spec.get("description", f"Parameter {param_name}"),
            }
            # Extract enum/literal values if present
            if "enum" in param_spec:
                param_info["enum"] = param_spec["enum"]
            # Union schemas (anyOf is common for optional literal types,
            # oneOf is the alternative syntax): take the first option that
            # carries an enum, along with its type if it declares one.
            for union_key in ("anyOf", "oneOf"):
                for schema_option in param_spec.get(union_key, ()):
                    if "enum" in schema_option:
                        param_info["enum"] = schema_option["enum"]
                        if "type" in schema_option:
                            param_info["type"] = schema_option["type"]
                        break
            # Required when explicitly listed, or when the schema gives no default.
            if param_name in required_param_names or "default" not in param_spec:
                required_params.append(param_info)
            else:
                param_info["default"] = param_spec.get("default")
                optional_params.append(param_info)
        return required_params, optional_params

    def _register_tools(self, server_name: str, mcp_module_name: str, tools_config,
                        tool_registry, make_wrapper, remote: bool = False,
                        extra_info: Optional[dict] = None) -> int:
        """Wrap and record every tool in ``tools_config``.

        Args:
            server_name: Name of the MCP server the tools belong to.
            mcp_module_name: Key of the synthetic module in ``sys.modules``.
            tools_config: Iterable of tool definitions (manual "biomni_name"
                style or auto-discovered ``name``/``inputSchema`` style).
            tool_registry: Optional registry exposing ``register_tool`` and
                ``_name_to_function``.
            make_wrapper: Callable ``(tool_name, description) -> callable``.
            remote: Selects the "Remote ..." wording of messages/descriptions.
            extra_info: Extra keys merged into each ``mcp_functions`` entry.

        Returns:
            Number of tools successfully registered with ``tool_registry``
            (always 0 when no registry is given).
        """
        label = "Remote MCP tool" if remote else "MCP tool"
        where = f"remote {server_name}" if remote else server_name
        server_module = self._get_server_module(mcp_module_name)
        tools_added = 0

        for tool_meta in tools_config:
            tool_name = tool_meta.get("biomni_name") if isinstance(tool_meta, dict) else None
            if tool_name:
                # Manual tool definition (Biomni-style)
                description = tool_meta.get("description", f"{label}: {tool_name}")
                parameters = tool_meta.get("parameters", {}) or {}
                required_param_names = [
                    param_name
                    for param_name, param_spec in parameters.items()
                    if param_spec.get("required", False)
                ]
            else:
                # Auto-discovered tool
                if isinstance(tool_meta, dict) and "biomni_name" not in tool_meta:
                    tool_name = tool_meta.get("name")
                # Validate the name BEFORE using it to build a description.
                if not tool_name:
                    print(f"Warning: Skipping tool with no name in {where}")
                    continue
                description = tool_meta.get("description", "")
                if not description or not description.strip():
                    formatted_name = tool_name.replace('_', ' ').title()
                    description = f"{label}: {formatted_name}"
                input_schema = tool_meta.get("inputSchema") or {}
                parameters = input_schema.get("properties", {}) or {}
                required_param_names = input_schema.get("required", []) or []

            wrapper_function = make_wrapper(tool_name, description)
            # Add to module namespace
            setattr(server_module, tool_name, wrapper_function)

            # Parameter info is recorded whether or not a registry is supplied.
            required_params, optional_params = self._build_param_lists(
                parameters, required_param_names
            )
            entry = {
                "function": wrapper_function,
                "server": server_name,
                "module": mcp_module_name,
                "description": description,
                "required_parameters": required_params,
                "optional_parameters": optional_params,
            }
            if extra_info:
                entry.update(extra_info)
            self.mcp_functions[tool_name] = entry

            # Register with tool registry if available
            if tool_registry:
                tool_schema = {
                    "name": tool_name,
                    "description": description,
                    "required_parameters": required_params,
                    "optional_parameters": optional_params,
                    "module": mcp_module_name,
                }
                if tool_registry.register_tool(tool_schema, mcp_module_name):
                    tool_registry._name_to_function[tool_name] = wrapper_function
                    tools_added += 1

        if tools_added > 0:
            kind = "remote MCP tools" if remote else "MCP tools"
            print(f"✅ Added {tools_added} {kind} from {server_name} server")
        return tools_added

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def add_mcp(self, config_path: str = "./mcp_config.yaml", tool_registry=None) -> None:
        """Add MCP tools from configuration file.

        The YAML file is expected to contain an ``mcp_servers`` mapping. Each
        enabled server is either remote (has a ``url``) or local (has a
        ``command`` list, optional ``env``). Tools come from the server's
        ``tools`` list or, when that is empty, from auto-discovery.

        Args:
            config_path: Path to the YAML configuration file.
            tool_registry: Optional registry to register each tool with.

        Raises:
            ImportError: If ``pyyaml`` or the MCP client packages are missing.
            FileNotFoundError: If ``config_path`` does not exist.
            yaml.YAMLError: If the configuration is not valid YAML.
        """
        try:
            import asyncio
            import yaml
        except ImportError as e:
            raise ImportError(f"Required packages not available: {e}. Install with: pip install pyyaml") from e
        try:
            import nest_asyncio
            from mcp import ClientSession
            from mcp.client.stdio import StdioServerParameters, stdio_client
            from mcp.client.streamable_http import streamablehttp_client
            from langchain_mcp_adapters.tools import _list_all_tools
            nest_asyncio.apply()
        except ImportError as e:
            raise ImportError(f"MCP packages not available: {e}. Install with: pip install mcp langchain-mcp-adapters") from e

        normalize = self._normalize_discovered_tools
        extract_result = self._extract_call_result

        def run_or_schedule(coro_factory):
            """Run the coroutine, or schedule it when a loop is already running."""
            # NOTE(review): with a running loop this returns an asyncio.Task,
            # not the tool result, so the "sync" wrappers are only synchronous
            # outside an event loop. Behavior kept as-is because callers may
            # be awaiting the task -- confirm before changing.
            try:
                loop = asyncio.get_running_loop()
                return loop.create_task(coro_factory())
            except RuntimeError:
                return asyncio.run(coro_factory())

        def discover_mcp_tools_sync(server_params: StdioServerParameters) -> List[dict]:
            """Discover available tools from MCP server synchronously."""
            async def _discover_async():
                async with stdio_client(server_params) as (reader, writer):
                    async with ClientSession(reader, writer) as session:
                        await session.initialize()
                        tools_result = await session.list_tools()
                        tools = tools_result.tools if hasattr(tools_result, "tools") else tools_result
                        return normalize(tools)
            try:
                return asyncio.run(_discover_async())
            except Exception as e:
                # Best effort: a server that cannot be queried yields no tools.
                print(f"Failed to discover tools: {e}")
                return []

        def discover_remote_mcp_tools_sync(url: str) -> List[dict]:
            """Discover available tools from remote MCP server synchronously."""
            async def _discover_remote_async():
                async with streamablehttp_client(url) as (read, write, _):
                    async with ClientSession(read, write) as session:
                        await session.initialize()
                        return normalize(await _list_all_tools(session))
            try:
                return asyncio.run(_discover_remote_async())
            except Exception as e:
                print(f"Failed to discover remote tools from {url}: {e}")
                return []

        def make_mcp_wrapper(cmd: str, args: List[str], tool_name: str, doc: str, env_vars: Optional[dict] = None):
            """Create a synchronous wrapper for an async MCP tool call."""
            def sync_tool_wrapper(**kwargs):
                """Synchronous wrapper for MCP tool execution."""
                try:
                    server_params = StdioServerParameters(command=cmd, args=args, env=env_vars)

                    async def async_tool_call():
                        # A fresh server process/session is used per call.
                        async with stdio_client(server_params) as (reader, writer):
                            async with ClientSession(reader, writer) as session:
                                await session.initialize()
                                result = await session.call_tool(tool_name, kwargs)
                                return extract_result(result)

                    return run_or_schedule(async_tool_call)
                except Exception as e:
                    raise RuntimeError(f"MCP tool execution failed for '{tool_name}': {e}") from e
            sync_tool_wrapper.__name__ = tool_name
            sync_tool_wrapper.__doc__ = doc
            return sync_tool_wrapper

        def make_remote_mcp_wrapper(url: str, tool_name: str, doc: str):
            """Create a synchronous wrapper for an async remote MCP tool call."""
            def sync_tool_wrapper(**kwargs):
                """Synchronous wrapper for remote MCP tool execution."""
                try:
                    async def async_tool_call():
                        async with streamablehttp_client(url) as (read, write, _):
                            async with ClientSession(read, write) as session:
                                await session.initialize()
                                result = await session.call_tool(tool_name, kwargs)
                                return extract_result(result)

                    return run_or_schedule(async_tool_call)
                except Exception as e:
                    raise RuntimeError(f"Remote MCP tool execution failed for '{tool_name}': {e}") from e
            sync_tool_wrapper.__name__ = tool_name
            sync_tool_wrapper.__doc__ = doc
            return sync_tool_wrapper

        # Load and validate configuration
        try:
            config_content = Path(config_path).read_text(encoding="utf-8")
            cfg = yaml.safe_load(config_content) or {}
        except FileNotFoundError:
            raise FileNotFoundError(f"MCP config file not found: {config_path}") from None
        except yaml.YAMLError as e:
            raise yaml.YAMLError(f"Invalid YAML in MCP config: {e}") from e

        mcp_servers = cfg.get("mcp_servers", {}) if isinstance(cfg, dict) else None
        if not mcp_servers or not isinstance(mcp_servers, dict):
            print("Warning: No MCP servers found in configuration")
            return

        # Process each MCP server configuration
        for server_name, server_meta in mcp_servers.items():
            if not isinstance(server_meta, dict):
                # e.g. "server_name:" with no body parses to None
                print(f"Warning: Invalid configuration for server '{server_name}'")
                continue
            if not server_meta.get("enabled", True):
                continue

            # Remote servers are identified by the presence of a URL.
            remote_url = server_meta.get("url")
            if remote_url:
                self._process_remote_server(
                    server_name, server_meta, remote_url, tool_registry,
                    discover_remote_mcp_tools_sync, make_remote_mcp_wrapper,
                )
                continue

            # Local server: validate command configuration
            cmd_list = server_meta.get("command", [])
            if not cmd_list or not isinstance(cmd_list, list):
                print(f"Warning: Invalid command configuration for server '{server_name}'")
                continue
            cmd, *args = cmd_list

            env_vars = self._resolve_env(server_meta.get("env", {}))

            # Create module namespace for this MCP server
            mcp_module_name = f"mcp_servers.{server_name}"
            self._get_server_module(mcp_module_name)

            tools_config = server_meta.get("tools", [])
            # Auto-discover tools if not manually configured
            if not tools_config:
                try:
                    server_params = StdioServerParameters(command=cmd, args=args, env=env_vars)
                    tools_config = discover_mcp_tools_sync(server_params)
                except Exception as e:
                    print(f"Failed to discover tools for {server_name}: {e}")
                    continue
                if not tools_config:
                    print(f"Warning: No tools discovered from {server_name} MCP server")
                    continue
                print(f"🔍 Discovered {len(tools_config)} tools from {server_name} MCP server")

            # The factory is invoked within this iteration, so capturing the
            # loop variables in the closure is safe.
            def local_wrapper_factory(tool_name, description, _cmd=cmd, _args=args, _env=env_vars):
                return make_mcp_wrapper(_cmd, _args, tool_name, description, _env)

            self._register_tools(
                server_name, mcp_module_name, tools_config, tool_registry,
                local_wrapper_factory, remote=False,
            )

        print(f"🛠️ Total MCP tools loaded: {len(self.mcp_functions)}")

    def _process_remote_server(self, server_name: str, server_meta: dict, remote_url: str, tool_registry, discover_remote_mcp_tools_sync, make_remote_mcp_wrapper):
        """Process a remote MCP server configuration.

        Args:
            server_name: Name of the server as given in the configuration.
            server_meta: The server's configuration mapping (``tools`` is read).
            remote_url: URL of the remote MCP server.
            tool_registry: Optional registry to register each tool with.
            discover_remote_mcp_tools_sync: Callable ``(url) -> list[dict]``
                used when no tools are configured manually.
            make_remote_mcp_wrapper: Callable
                ``(url, tool_name, doc) -> callable`` producing the wrapper.
        """
        # Create module namespace for this remote MCP server
        mcp_module_name = f"mcp_servers.{server_name}"
        self._get_server_module(mcp_module_name)

        tools_config = server_meta.get("tools", [])
        # Auto-discover tools if not manually configured
        if not tools_config:
            try:
                tools_config = discover_remote_mcp_tools_sync(remote_url)
            except Exception as e:
                print(f"Failed to discover tools for remote {server_name}: {e}")
                return
            if not tools_config:
                print(f"Warning: No tools discovered from {server_name} remote MCP server")
                return
            print(f"🔍 Discovered {len(tools_config)} tools from {server_name} remote MCP server")

        def remote_wrapper_factory(tool_name, description):
            return make_remote_mcp_wrapper(remote_url, tool_name, description)

        self._register_tools(
            server_name, mcp_module_name, tools_config, tool_registry,
            remote_wrapper_factory, remote=True,
            extra_info={"remote_url": remote_url},
        )

    # ------------------------------------------------------------------
    # Introspection and removal
    # ------------------------------------------------------------------

    def list_mcp_tools(self) -> Dict[str, dict]:
        """List all loaded MCP tools (shallow copy of the registry)."""
        return self.mcp_functions.copy()

    def remove_mcp_tool(self, tool_name: str, tool_registry=None) -> bool:
        """Remove an MCP tool by name.

        Args:
            tool_name: Name of the tool to remove.
            tool_registry: Optional registry; its ``remove_tool_by_name`` is
                called when given.

        Returns:
            ``True`` if the tool was known and removed, ``False`` otherwise.
        """
        if tool_name not in self.mcp_functions:
            return False
        # Remove from tool registry
        if tool_registry:
            tool_registry.remove_tool_by_name(tool_name)
        # Remove from MCP functions
        tool_info = self.mcp_functions.pop(tool_name)
        # Also detach the wrapper from the synthetic server module so the
        # removed tool is no longer reachable through it.
        server_module = sys.modules.get(tool_info.get("module"))
        if server_module is not None and hasattr(server_module, tool_name):
            delattr(server_module, tool_name)
        return True

    def show_mcp_status(self) -> None:
        """Display detailed MCP status information to the user."""
        if not self.has_mcp_functions():
            self.console.print("🔗 No MCP tools loaded")
            return
        mcp_tools = self.mcp_functions
        # Group tools by server
        servers = self.group_tools_by_server(mcp_tools)
        # Display server information
        self.console.print("\n🔗 MCP Status Report:")
        self.console.print(f" 📊 Total servers: {len(servers)}")
        self.console.print(f" 🛠️ Total MCP tools: {len(mcp_tools)}")
        for server_name, tools in servers.items():
            self.console.print(f"\n 📡 Server: {server_name}")
            self.console.print(f" Status: ✅ Active ({len(tools)} tools)")
            for tool in tools:
                self.console.print(f" • {tool['name']}: {tool['description']}")

    def get_mcp_summary(self) -> Dict[str, Any]:
        """Get a summary of MCP tools for programmatic access.

        Returns:
            Dict with ``total_tools``, ``total_servers``, ``servers`` (server
            name -> list of tool names) and ``tools`` (tool name ->
            description). All four keys are present even when nothing is
            loaded.
        """
        mcp_tools = self.mcp_functions
        # Group tools by server but only keep tool names
        servers: Dict[str, List[str]] = {}
        for tool_name, tool_info in mcp_tools.items():
            servers.setdefault(tool_info.get('server', 'unknown'), []).append(tool_name)
        return {
            "total_tools": len(mcp_tools),
            "total_servers": len(servers),
            "servers": servers,
            "tools": {name: info.get('description', '') for name, info in mcp_tools.items()},
        }