Speedofmastery's picture
Merge Landrun + Browser-Use + Chromium with AI agent support (without binary files)
d7b3d84
"""MCP (Model Context Protocol) tool wrapper for browser-use.
This module provides integration between MCP tools and browser-use's action registry system.
MCP tools are dynamically discovered and registered as browser-use actions.
"""
import asyncio
import logging
from typing import Any
from pydantic import Field, create_model
from browser_use.agent.views import ActionResult
from browser_use.tools.registry.service import Registry
logger = logging.getLogger(__name__)
try:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import TextContent, Tool
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
logger.warning('MCP SDK not installed. Install with: pip install mcp')
class MCPToolWrapper:
"""Wrapper to integrate MCP tools as browser-use actions."""
def __init__(self, registry: Registry, mcp_command: str, mcp_args: list[str] | None = None):
"""Initialize MCP tool wrapper.
Args:
registry: Browser-use action registry to register MCP tools
mcp_command: Command to start MCP server (e.g., "npx")
mcp_args: Arguments for MCP command (e.g., ["@playwright/mcp@latest"])
"""
if not MCP_AVAILABLE:
raise ImportError('MCP SDK not installed. Install with: pip install mcp')
self.registry = registry
self.mcp_command = mcp_command
self.mcp_args = mcp_args or []
self.session: ClientSession | None = None
self._tools: dict[str, Tool] = {}
self._registered_actions: set[str] = set()
self._shutdown_event = asyncio.Event()
async def connect(self):
"""Connect to MCP server and discover available tools."""
if self.session:
return # Already connected
logger.info(f'πŸ”Œ Connecting to MCP server: {self.mcp_command} {" ".join(self.mcp_args)}')
# Create server parameters
server_params = StdioServerParameters(command=self.mcp_command, args=self.mcp_args, env=None)
# Connect to the MCP server
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
self.session = session
# Initialize the connection
await session.initialize()
# Discover available tools
tools_response = await session.list_tools()
self._tools = {tool.name: tool for tool in tools_response.tools}
logger.info(f'πŸ“¦ Discovered {len(self._tools)} MCP tools: {list(self._tools.keys())}')
# Register all discovered tools as actions
for tool_name, tool in self._tools.items():
self._register_tool_as_action(tool_name, tool)
# Keep session alive while tools are being used
await self._keep_session_alive()
async def _keep_session_alive(self):
"""Keep the MCP session alive."""
# This will block until the session is closed
# In practice, you'd want to manage this lifecycle better
try:
await self._shutdown_event.wait()
except asyncio.CancelledError:
pass
def _register_tool_as_action(self, tool_name: str, tool: Tool):
"""Register an MCP tool as a browser-use action.
Args:
tool_name: Name of the MCP tool
tool: MCP Tool object with schema information
"""
if tool_name in self._registered_actions:
return # Already registered
# Parse tool parameters to create Pydantic model
param_fields = {}
if tool.inputSchema:
# MCP tools use JSON Schema for parameters
properties = tool.inputSchema.get('properties', {})
required = set(tool.inputSchema.get('required', []))
for param_name, param_schema in properties.items():
# Convert JSON Schema type to Python type
param_type = self._json_schema_to_python_type(param_schema)
# Determine if field is required
if param_name in required:
default = ... # Required field
else:
default = param_schema.get('default', None)
# Add field description if available
field_kwargs = {}
if 'description' in param_schema:
field_kwargs['description'] = param_schema['description']
param_fields[param_name] = (param_type, Field(default, **field_kwargs))
# Create Pydantic model for the tool parameters
param_model = create_model(f'{tool_name}_Params', **param_fields) if param_fields else None
# Determine if this is a browser-specific tool
is_browser_tool = tool_name.startswith('browser_')
domains = None
# Note: page_filter has been removed since we no longer use Page objects
# Create wrapper function for the MCP tool
async def mcp_action_wrapper(**kwargs):
"""Wrapper function that calls the MCP tool."""
if not self.session:
raise RuntimeError(f'MCP session not connected for tool {tool_name}')
# Extract parameters (excluding special injected params)
special_params = {
'page',
'browser_session',
'context',
'page_extraction_llm',
'file_system',
'available_file_paths',
'has_sensitive_data',
'browser',
'browser_context',
}
tool_params = {k: v for k, v in kwargs.items() if k not in special_params}
logger.debug(f'πŸ”§ Calling MCP tool {tool_name} with params: {tool_params}')
try:
# Call the MCP tool
result = await self.session.call_tool(tool_name, tool_params)
# Convert MCP result to ActionResult
# MCP tools return results in various formats
if hasattr(result, 'content'):
# Handle structured content responses
if isinstance(result.content, list):
# Multiple content items
content_parts = []
for item in result.content:
if isinstance(item, TextContent):
content_parts.append(item.text) # type: ignore[reportAttributeAccessIssue]
else:
content_parts.append(str(item))
extracted_content = '\n'.join(content_parts)
else:
extracted_content = str(result.content)
else:
# Direct result
extracted_content = str(result)
return ActionResult(extracted_content=extracted_content)
except Exception as e:
logger.error(f'❌ MCP tool {tool_name} failed: {e}')
return ActionResult(extracted_content=f'MCP tool {tool_name} failed: {str(e)}', error=str(e))
# Set function name for better debugging
mcp_action_wrapper.__name__ = tool_name
mcp_action_wrapper.__qualname__ = f'mcp.{tool_name}'
# Register the action with browser-use
description = tool.description or f'MCP tool: {tool_name}'
# Use the decorator to register the action
decorated_wrapper = self.registry.action(description=description, param_model=param_model, domains=domains)(
mcp_action_wrapper
)
self._registered_actions.add(tool_name)
logger.info(f'βœ… Registered MCP tool as action: {tool_name}')
async def disconnect(self):
"""Disconnect from the MCP server and clean up resources."""
self._shutdown_event.set()
if self.session:
# Session cleanup will be handled by the context manager
self.session = None
def _json_schema_to_python_type(self, schema: dict) -> Any:
"""Convert JSON Schema type to Python type.
Args:
schema: JSON Schema definition
Returns:
Python type corresponding to the schema
"""
json_type = schema.get('type', 'string')
type_mapping = {
'string': str,
'number': float,
'integer': int,
'boolean': bool,
'array': list,
'object': dict,
}
base_type = type_mapping.get(json_type, str)
# Handle nullable types
if schema.get('nullable', False):
return base_type | None
return base_type
# Convenience function for easy integration
async def register_mcp_tools(registry: Registry, mcp_command: str, mcp_args: list[str] | None = None) -> MCPToolWrapper:
"""Register MCP tools with a browser-use registry.
Args:
registry: Browser-use action registry
mcp_command: Command to start MCP server
mcp_args: Arguments for MCP command
Returns:
MCPToolWrapper instance (connected)
Example:
```python
from browser_use import Tools
from browser_use.mcp.tools import register_mcp_tools
tools = Tools()
# Register Playwright MCP tools
mcp = await register_mcp_tools(tools.registry, 'npx', ['@playwright/mcp@latest', '--headless'])
# Now all MCP tools are available as browser-use actions
```
"""
wrapper = MCPToolWrapper(registry, mcp_command, mcp_args)
await wrapper.connect()
return wrapper