Spaces:
Sleeping
Sleeping
| """MCP (Model Context Protocol) tool wrapper for browser-use. | |
| This module provides integration between MCP tools and browser-use's action registry system. | |
| MCP tools are dynamically discovered and registered as browser-use actions. | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Any | |
| from pydantic import Field, create_model | |
| from browser_use.agent.views import ActionResult | |
| from browser_use.tools.registry.service import Registry | |
| logger = logging.getLogger(__name__) | |
| try: | |
| from mcp import ClientSession, StdioServerParameters | |
| from mcp.client.stdio import stdio_client | |
| from mcp.types import TextContent, Tool | |
| MCP_AVAILABLE = True | |
| except ImportError: | |
| MCP_AVAILABLE = False | |
| logger.warning('MCP SDK not installed. Install with: pip install mcp') | |
| class MCPToolWrapper: | |
| """Wrapper to integrate MCP tools as browser-use actions.""" | |
| def __init__(self, registry: Registry, mcp_command: str, mcp_args: list[str] | None = None): | |
| """Initialize MCP tool wrapper. | |
| Args: | |
| registry: Browser-use action registry to register MCP tools | |
| mcp_command: Command to start MCP server (e.g., "npx") | |
| mcp_args: Arguments for MCP command (e.g., ["@playwright/mcp@latest"]) | |
| """ | |
| if not MCP_AVAILABLE: | |
| raise ImportError('MCP SDK not installed. Install with: pip install mcp') | |
| self.registry = registry | |
| self.mcp_command = mcp_command | |
| self.mcp_args = mcp_args or [] | |
| self.session: ClientSession | None = None | |
| self._tools: dict[str, Tool] = {} | |
| self._registered_actions: set[str] = set() | |
| self._shutdown_event = asyncio.Event() | |
| async def connect(self): | |
| """Connect to MCP server and discover available tools.""" | |
| if self.session: | |
| return # Already connected | |
| logger.info(f'π Connecting to MCP server: {self.mcp_command} {" ".join(self.mcp_args)}') | |
| # Create server parameters | |
| server_params = StdioServerParameters(command=self.mcp_command, args=self.mcp_args, env=None) | |
| # Connect to the MCP server | |
| async with stdio_client(server_params) as (read, write): | |
| async with ClientSession(read, write) as session: | |
| self.session = session | |
| # Initialize the connection | |
| await session.initialize() | |
| # Discover available tools | |
| tools_response = await session.list_tools() | |
| self._tools = {tool.name: tool for tool in tools_response.tools} | |
| logger.info(f'π¦ Discovered {len(self._tools)} MCP tools: {list(self._tools.keys())}') | |
| # Register all discovered tools as actions | |
| for tool_name, tool in self._tools.items(): | |
| self._register_tool_as_action(tool_name, tool) | |
| # Keep session alive while tools are being used | |
| await self._keep_session_alive() | |
| async def _keep_session_alive(self): | |
| """Keep the MCP session alive.""" | |
| # This will block until the session is closed | |
| # In practice, you'd want to manage this lifecycle better | |
| try: | |
| await self._shutdown_event.wait() | |
| except asyncio.CancelledError: | |
| pass | |
| def _register_tool_as_action(self, tool_name: str, tool: Tool): | |
| """Register an MCP tool as a browser-use action. | |
| Args: | |
| tool_name: Name of the MCP tool | |
| tool: MCP Tool object with schema information | |
| """ | |
| if tool_name in self._registered_actions: | |
| return # Already registered | |
| # Parse tool parameters to create Pydantic model | |
| param_fields = {} | |
| if tool.inputSchema: | |
| # MCP tools use JSON Schema for parameters | |
| properties = tool.inputSchema.get('properties', {}) | |
| required = set(tool.inputSchema.get('required', [])) | |
| for param_name, param_schema in properties.items(): | |
| # Convert JSON Schema type to Python type | |
| param_type = self._json_schema_to_python_type(param_schema) | |
| # Determine if field is required | |
| if param_name in required: | |
| default = ... # Required field | |
| else: | |
| default = param_schema.get('default', None) | |
| # Add field description if available | |
| field_kwargs = {} | |
| if 'description' in param_schema: | |
| field_kwargs['description'] = param_schema['description'] | |
| param_fields[param_name] = (param_type, Field(default, **field_kwargs)) | |
| # Create Pydantic model for the tool parameters | |
| param_model = create_model(f'{tool_name}_Params', **param_fields) if param_fields else None | |
| # Determine if this is a browser-specific tool | |
| is_browser_tool = tool_name.startswith('browser_') | |
| domains = None | |
| # Note: page_filter has been removed since we no longer use Page objects | |
| # Create wrapper function for the MCP tool | |
| async def mcp_action_wrapper(**kwargs): | |
| """Wrapper function that calls the MCP tool.""" | |
| if not self.session: | |
| raise RuntimeError(f'MCP session not connected for tool {tool_name}') | |
| # Extract parameters (excluding special injected params) | |
| special_params = { | |
| 'page', | |
| 'browser_session', | |
| 'context', | |
| 'page_extraction_llm', | |
| 'file_system', | |
| 'available_file_paths', | |
| 'has_sensitive_data', | |
| 'browser', | |
| 'browser_context', | |
| } | |
| tool_params = {k: v for k, v in kwargs.items() if k not in special_params} | |
| logger.debug(f'π§ Calling MCP tool {tool_name} with params: {tool_params}') | |
| try: | |
| # Call the MCP tool | |
| result = await self.session.call_tool(tool_name, tool_params) | |
| # Convert MCP result to ActionResult | |
| # MCP tools return results in various formats | |
| if hasattr(result, 'content'): | |
| # Handle structured content responses | |
| if isinstance(result.content, list): | |
| # Multiple content items | |
| content_parts = [] | |
| for item in result.content: | |
| if isinstance(item, TextContent): | |
| content_parts.append(item.text) # type: ignore[reportAttributeAccessIssue] | |
| else: | |
| content_parts.append(str(item)) | |
| extracted_content = '\n'.join(content_parts) | |
| else: | |
| extracted_content = str(result.content) | |
| else: | |
| # Direct result | |
| extracted_content = str(result) | |
| return ActionResult(extracted_content=extracted_content) | |
| except Exception as e: | |
| logger.error(f'β MCP tool {tool_name} failed: {e}') | |
| return ActionResult(extracted_content=f'MCP tool {tool_name} failed: {str(e)}', error=str(e)) | |
| # Set function name for better debugging | |
| mcp_action_wrapper.__name__ = tool_name | |
| mcp_action_wrapper.__qualname__ = f'mcp.{tool_name}' | |
| # Register the action with browser-use | |
| description = tool.description or f'MCP tool: {tool_name}' | |
| # Use the decorator to register the action | |
| decorated_wrapper = self.registry.action(description=description, param_model=param_model, domains=domains)( | |
| mcp_action_wrapper | |
| ) | |
| self._registered_actions.add(tool_name) | |
| logger.info(f'β Registered MCP tool as action: {tool_name}') | |
| async def disconnect(self): | |
| """Disconnect from the MCP server and clean up resources.""" | |
| self._shutdown_event.set() | |
| if self.session: | |
| # Session cleanup will be handled by the context manager | |
| self.session = None | |
| def _json_schema_to_python_type(self, schema: dict) -> Any: | |
| """Convert JSON Schema type to Python type. | |
| Args: | |
| schema: JSON Schema definition | |
| Returns: | |
| Python type corresponding to the schema | |
| """ | |
| json_type = schema.get('type', 'string') | |
| type_mapping = { | |
| 'string': str, | |
| 'number': float, | |
| 'integer': int, | |
| 'boolean': bool, | |
| 'array': list, | |
| 'object': dict, | |
| } | |
| base_type = type_mapping.get(json_type, str) | |
| # Handle nullable types | |
| if schema.get('nullable', False): | |
| return base_type | None | |
| return base_type | |
| # Convenience function for easy integration | |
| async def register_mcp_tools(registry: Registry, mcp_command: str, mcp_args: list[str] | None = None) -> MCPToolWrapper: | |
| """Register MCP tools with a browser-use registry. | |
| Args: | |
| registry: Browser-use action registry | |
| mcp_command: Command to start MCP server | |
| mcp_args: Arguments for MCP command | |
| Returns: | |
| MCPToolWrapper instance (connected) | |
| Example: | |
| ```python | |
| from browser_use import Tools | |
| from browser_use.mcp.tools import register_mcp_tools | |
| tools = Tools() | |
| # Register Playwright MCP tools | |
| mcp = await register_mcp_tools(tools.registry, 'npx', ['@playwright/mcp@latest', '--headless']) | |
| # Now all MCP tools are available as browser-use actions | |
| ``` | |
| """ | |
| wrapper = MCPToolWrapper(registry, mcp_command, mcp_args) | |
| await wrapper.connect() | |
| return wrapper | |