Spaces:
Sleeping
Sleeping
| """MCP Server for browser-use - exposes browser automation capabilities via Model Context Protocol. | |
| This server provides tools for: | |
| - Running autonomous browser tasks with an AI agent | |
| - Direct browser control (navigation, clicking, typing, etc.) | |
| - Content extraction from web pages | |
| - File system operations | |
| Usage: | |
| uvx browser-use --mcp | |
| Or as an MCP server in Claude Desktop or other MCP clients: | |
| { | |
| "mcpServers": { | |
| "browser-use": { | |
| "command": "uvx", | |
| "args": ["browser-use[cli]", "--mcp"], | |
| "env": { | |
| "OPENAI_API_KEY": "sk-proj-1234567890" | |
| } | |
| } | |
| } | |
| } | |
| """ | |
import os
import sys

# Set environment variables BEFORE any browser_use imports to prevent early logging.
# Every browser_use import in this section is deliberately placed after these lines.
os.environ['BROWSER_USE_LOGGING_LEVEL'] = 'critical'
os.environ['BROWSER_USE_SETUP_LOGGING'] = 'false'

import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Any

# Configure logging for MCP mode - redirect to stderr but preserve critical diagnostics
logging.basicConfig(
    stream=sys.stderr, level=logging.WARNING, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', force=True
)

# psutil is optional; it is only used to inspect parent processes
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

# Add browser-use to path if running from source
sys.path.insert(0, str(Path(__file__).parent.parent))

# Import and configure logging to use stderr before other imports
from browser_use.logging_config import setup_logging

# Imported here rather than at the very top of the file: importing any browser_use
# module before the environment variables and sys.path entry above are in place
# would defeat both of them.
from browser_use.llm import ChatAWSBedrock
def _configure_mcp_server_logging():
    """Point browser-use and stdlib logging at stderr so stdout stays clean for JSON RPC.

    Sets the browser-use logging environment variables, runs ``setup_logging``
    against stderr, then replaces the handlers of the root logger and of every
    logger registered so far with one shared stderr handler at CRITICAL level.
    """
    # Keep browser-use quiet and stop it from installing its own handlers
    os.environ.update(
        {
            'BROWSER_USE_LOGGING_LEVEL': 'warning',
            'BROWSER_USE_SETUP_LOGGING': 'false',
        }
    )

    # Let browser-use build its logging configuration on stderr
    setup_logging(stream=sys.stderr, log_level='warning', force_setup=True)

    # One handler instance is shared by the root logger and all named loggers
    sink = logging.StreamHandler(sys.stderr)
    sink.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    root = logging.root
    root.handlers = []
    root.addHandler(sink)
    root.setLevel(logging.CRITICAL)

    # Snapshot the registry first: getLogger() may register placeholders while we iterate
    for logger_name in tuple(root.manager.loggerDict):
        named = logging.getLogger(logger_name)
        named.handlers = []
        named.setLevel(logging.CRITICAL)
        named.addHandler(sink)
        named.propagate = False
# Configure MCP server logging before the remaining browser_use imports below, so that
# log lines produced while those modules load are routed to stderr rather than stdout
_configure_mcp_server_logging()
# Additional suppression for MCP mode: logging.disable(logging.CRITICAL) switches off
# every logging call of severity CRITICAL and below for the whole process
logging.disable(logging.CRITICAL)
| # Import browser_use modules | |
| from browser_use import ActionModel, Agent | |
| from browser_use.browser import BrowserProfile, BrowserSession | |
| from browser_use.config import get_default_llm, get_default_profile, load_browser_use_config | |
| from browser_use.filesystem.file_system import FileSystem | |
| from browser_use.llm.openai.chat import ChatOpenAI | |
| from browser_use.tools.service import Tools | |
| logger = logging.getLogger(__name__) | |
| def _ensure_all_loggers_use_stderr(): | |
| """Ensure ALL loggers only output to stderr, not stdout.""" | |
| # Get the stderr handler | |
| stderr_handler = None | |
| for handler in logging.root.handlers: | |
| if hasattr(handler, 'stream') and handler.stream == sys.stderr: # type: ignore | |
| stderr_handler = handler | |
| break | |
| if not stderr_handler: | |
| stderr_handler = logging.StreamHandler(sys.stderr) | |
| stderr_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) | |
| # Configure root logger | |
| logging.root.handlers = [stderr_handler] | |
| logging.root.setLevel(logging.CRITICAL) | |
| # Configure all existing loggers | |
| for name in list(logging.root.manager.loggerDict.keys()): | |
| logger_obj = logging.getLogger(name) | |
| logger_obj.handlers = [stderr_handler] | |
| logger_obj.setLevel(logging.CRITICAL) | |
| logger_obj.propagate = False | |
# Ensure stderr logging after all imports
_ensure_all_loggers_use_stderr()

# Try to import MCP SDK
try:
    import mcp.server.stdio
    import mcp.types as types
    from mcp.server import NotificationOptions, Server
    from mcp.server.models import InitializationOptions

    MCP_AVAILABLE = True

    # Configure MCP SDK logging to stderr as well
    mcp_logger = logging.getLogger('mcp')
    mcp_logger.handlers = []
    mcp_logger.addHandler(logging.root.handlers[0] if logging.root.handlers else logging.StreamHandler(sys.stderr))
    mcp_logger.setLevel(logging.ERROR)
    mcp_logger.propagate = False
except ImportError:
    MCP_AVAILABLE = False
    # The call above pins every logger at CRITICAL, so an ERROR record would be
    # dropped and the process would exit without explanation. Write the hint
    # straight to stderr (stdout is reserved for the JSON RPC stream).
    print('MCP SDK not installed. Install with: pip install mcp', file=sys.stderr)
    sys.exit(1)
| from browser_use.telemetry import MCPServerTelemetryEvent, ProductTelemetry | |
| from browser_use.utils import get_browser_use_version | |
def get_parent_process_cmdline() -> str | None:
    """Return the command lines of all ancestor processes, joined with ``;``.

    Returns ``None`` when psutil is unavailable, when no ancestor command line
    could be read, or when anything unexpected goes wrong while walking the chain.
    """
    if not PSUTIL_AVAILABLE:
        return None

    try:
        chain = []
        ancestor = psutil.Process().parent()

        while ancestor:
            try:
                argv = ancestor.cmdline()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # Processes we may not inspect (e.g. system processes) are skipped
                argv = None
            if argv:
                chain.append(' '.join(argv))

            try:
                ancestor = ancestor.parent()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # Cannot climb any further
                break

        return ';'.join(chain) if chain else None
    except Exception:
        # Parent process information is best-effort only
        return None
| class BrowserUseServer: | |
| """MCP Server for browser-use capabilities.""" | |
def __init__(self, session_timeout_minutes: int = 10):
    """Create the MCP server object, load configuration and register the handlers.

    Args:
        session_timeout_minutes: Stored as ``self.session_timeout_minutes``.
    """
    # New loggers may have appeared since import time; push them to stderr too
    _ensure_all_loggers_use_stderr()

    self.server = Server('browser-use')
    self.config = load_browser_use_config()

    # Components that are filled in later, once a browser session is started
    self.agent: Agent | None = None
    self.browser_session: BrowserSession | None = None
    self.tools: Tools | None = None
    self.llm: ChatOpenAI | None = None
    self.file_system: FileSystem | None = None

    self._telemetry = ProductTelemetry()
    self._start_time = time.time()

    # Session bookkeeping: session_id -> session info
    self.active_sessions: dict[str, dict[str, Any]] = {}
    self.session_timeout_minutes = session_timeout_minutes
    self._cleanup_task: Any = None

    # Handlers go last so they see a fully initialised instance
    self._setup_handlers()
def _setup_handlers(self):
    """Register the MCP request handlers on ``self.server``.

    Each nested coroutine is attached through the matching decorator of the
    low-level MCP ``Server`` (``list_tools``, ``list_resources``,
    ``list_prompts``, ``call_tool``). Without those decorators the functions
    would only be local definitions and the server would expose nothing.
    """

    @self.server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List all available browser-use tools."""
        return [
            # Agent tools
            # Direct browser control tools
            types.Tool(
                name='browser_navigate',
                description='Navigate to a URL in the browser',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'url': {'type': 'string', 'description': 'The URL to navigate to'},
                        'new_tab': {'type': 'boolean', 'description': 'Whether to open in a new tab', 'default': False},
                    },
                    'required': ['url'],
                },
            ),
            types.Tool(
                name='browser_click',
                description='Click an element on the page by its index',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'index': {
                            'type': 'integer',
                            'description': 'The index of the link or element to click (from browser_get_state)',
                        },
                        'new_tab': {
                            'type': 'boolean',
                            'description': 'Whether to open any resulting navigation in a new tab',
                            'default': False,
                        },
                    },
                    'required': ['index'],
                },
            ),
            types.Tool(
                name='browser_type',
                description='Type text into an input field',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'index': {
                            'type': 'integer',
                            'description': 'The index of the input element (from browser_get_state)',
                        },
                        'text': {'type': 'string', 'description': 'The text to type'},
                    },
                    'required': ['index', 'text'],
                },
            ),
            types.Tool(
                name='browser_get_state',
                description='Get the current state of the page including all interactive elements',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'include_screenshot': {
                            'type': 'boolean',
                            'description': 'Whether to include a screenshot of the current page',
                            'default': False,
                        }
                    },
                },
            ),
            types.Tool(
                name='browser_extract_content',
                description='Extract structured content from the current page based on a query',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'query': {'type': 'string', 'description': 'What information to extract from the page'},
                        'extract_links': {
                            'type': 'boolean',
                            'description': 'Whether to include links in the extraction',
                            'default': False,
                        },
                    },
                    'required': ['query'],
                },
            ),
            types.Tool(
                name='browser_scroll',
                description='Scroll the page',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'direction': {
                            'type': 'string',
                            'enum': ['up', 'down'],
                            'description': 'Direction to scroll',
                            'default': 'down',
                        }
                    },
                },
            ),
            types.Tool(
                name='browser_go_back',
                description='Go back to the previous page',
                inputSchema={'type': 'object', 'properties': {}},
            ),
            # Tab management
            types.Tool(
                name='browser_list_tabs', description='List all open tabs', inputSchema={'type': 'object', 'properties': {}}
            ),
            types.Tool(
                name='browser_switch_tab',
                description='Switch to a different tab',
                inputSchema={
                    'type': 'object',
                    'properties': {'tab_id': {'type': 'string', 'description': '4 Character Tab ID of the tab to switch to'}},
                    'required': ['tab_id'],
                },
            ),
            types.Tool(
                name='browser_close_tab',
                description='Close a tab',
                inputSchema={
                    'type': 'object',
                    'properties': {'tab_id': {'type': 'string', 'description': '4 Character Tab ID of the tab to close'}},
                    'required': ['tab_id'],
                },
            ),
            # types.Tool(
            # 	name="browser_close",
            # 	description="Close the browser session",
            # 	inputSchema={
            # 		"type": "object",
            # 		"properties": {}
            # 	}
            # ),
            types.Tool(
                name='retry_with_browser_use_agent',
                description='Retry a task using the browser-use agent. Only use this as a last resort if you fail to interact with a page multiple times.',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'task': {
                            'type': 'string',
                            'description': 'The high-level goal and detailed step-by-step description of the task the AI browser agent needs to attempt, along with any relevant data needed to complete the task and info about previous attempts.',
                        },
                        'max_steps': {
                            'type': 'integer',
                            'description': 'Maximum number of steps an agent can take.',
                            'default': 100,
                        },
                        'model': {
                            'type': 'string',
                            'description': 'LLM model to use (e.g., gpt-4o, claude-3-opus-20240229)',
                            'default': 'gpt-4o',
                        },
                        'allowed_domains': {
                            'type': 'array',
                            'items': {'type': 'string'},
                            'description': 'List of domains the agent is allowed to visit (security feature)',
                            'default': [],
                        },
                        'use_vision': {
                            'type': 'boolean',
                            'description': 'Whether to use vision capabilities (screenshots) for the agent',
                            'default': True,
                        },
                    },
                    'required': ['task'],
                },
            ),
            # Browser session management tools
            types.Tool(
                name='browser_list_sessions',
                description='List all active browser sessions with their details and last activity time',
                inputSchema={'type': 'object', 'properties': {}},
            ),
            types.Tool(
                name='browser_close_session',
                description='Close a specific browser session by its ID',
                inputSchema={
                    'type': 'object',
                    'properties': {
                        'session_id': {
                            'type': 'string',
                            'description': 'The browser session ID to close (get from browser_list_sessions)',
                        }
                    },
                    'required': ['session_id'],
                },
            ),
            types.Tool(
                name='browser_close_all',
                description='Close all active browser sessions and clean up resources',
                inputSchema={'type': 'object', 'properties': {}},
            ),
        ]

    @self.server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        """List available resources (none for browser-use)."""
        return []

    @self.server.list_prompts()
    async def handle_list_prompts() -> list[types.Prompt]:
        """List available prompts (none for browser-use)."""
        return []

    @self.server.call_tool()
    async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
        """Run one tool and wrap its text result; failures become an 'Error: ...' text reply."""
        start_time = time.time()
        error_msg = None
        try:
            result = await self._execute_tool(name, arguments or {})
            return [types.TextContent(type='text', text=result)]
        except Exception as e:
            error_msg = str(e)
            logger.error(f'Tool execution failed: {e}', exc_info=True)
            return [types.TextContent(type='text', text=f'Error: {str(e)}')]
        finally:
            # Capture telemetry for every tool call, successful or not
            duration = time.time() - start_time
            self._telemetry.capture(
                MCPServerTelemetryEvent(
                    version=get_browser_use_version(),
                    action='tool_call',
                    tool_name=name,
                    duration_seconds=duration,
                    error_message=error_msg,
                )
            )
| async def _execute_tool(self, tool_name: str, arguments: dict[str, Any]) -> str: | |
| """Execute a browser-use tool.""" | |
| # Agent-based tools | |
| if tool_name == 'retry_with_browser_use_agent': | |
| return await self._retry_with_browser_use_agent( | |
| task=arguments['task'], | |
| max_steps=arguments.get('max_steps', 100), | |
| model=arguments.get('model', 'gpt-4o'), | |
| allowed_domains=arguments.get('allowed_domains', []), | |
| use_vision=arguments.get('use_vision', True), | |
| ) | |
| # Browser session management tools (don't require active session) | |
| if tool_name == 'browser_list_sessions': | |
| return await self._list_sessions() | |
| elif tool_name == 'browser_close_session': | |
| return await self._close_session(arguments['session_id']) | |
| elif tool_name == 'browser_close_all': | |
| return await self._close_all_sessions() | |
| # Direct browser control tools (require active session) | |
| elif tool_name.startswith('browser_'): | |
| # Ensure browser session exists | |
| if not self.browser_session: | |
| await self._init_browser_session() | |
| if tool_name == 'browser_navigate': | |
| return await self._navigate(arguments['url'], arguments.get('new_tab', False)) | |
| elif tool_name == 'browser_click': | |
| return await self._click(arguments['index'], arguments.get('new_tab', False)) | |
| elif tool_name == 'browser_type': | |
| return await self._type_text(arguments['index'], arguments['text']) | |
| elif tool_name == 'browser_get_state': | |
| return await self._get_browser_state(arguments.get('include_screenshot', False)) | |
| elif tool_name == 'browser_extract_content': | |
| return await self._extract_content(arguments['query'], arguments.get('extract_links', False)) | |
| elif tool_name == 'browser_scroll': | |
| return await self._scroll(arguments.get('direction', 'down')) | |
| elif tool_name == 'browser_go_back': | |
| return await self._go_back() | |
| elif tool_name == 'browser_close': | |
| return await self._close_browser() | |
| elif tool_name == 'browser_list_tabs': | |
| return await self._list_tabs() | |
| elif tool_name == 'browser_switch_tab': | |
| return await self._switch_tab(arguments['tab_id']) | |
| elif tool_name == 'browser_close_tab': | |
| return await self._close_tab(arguments['tab_id']) | |
| return f'Unknown tool: {tool_name}' | |
async def _init_browser_session(self, allowed_domains: list[str] | None = None, **kwargs):
    """Start the shared browser session from config, if none is running yet.

    Args:
        allowed_domains: When not ``None``, overrides the profile's ``allowed_domains``.
        **kwargs: Extra profile fields; applied last, so they take precedence over everything else.
    """
    if self.browser_session:
        return

    # Browser start-up may create loggers; keep them off stdout
    _ensure_all_loggers_use_stderr()

    logger.debug('Initializing browser session...')

    profile_config = get_default_profile(self.config)

    # Built-in defaults; values from the configured profile replace them
    builtin_defaults = {
        'downloads_path': str(Path.home() / 'Downloads' / 'browser-use-mcp'),
        'wait_between_actions': 0.5,
        'keep_alive': True,
        'user_data_dir': '~/.config/browseruse/profiles/default',
        'device_scale_factor': 1.0,
        'disable_security': False,
        'headless': False,
    }
    profile_data = {**builtin_defaults, **profile_config}

    # Tool parameter overrides (highest priority)
    if allowed_domains is not None:
        profile_data['allowed_domains'] = allowed_domains
    profile_data.update(kwargs)

    # The session is stored on the instance before it is started
    self.browser_session = BrowserSession(browser_profile=BrowserProfile(**profile_data))
    await self.browser_session.start()

    # Register the session for the session-management tools
    self._track_session(self.browser_session)

    # Tools object used by the direct actions
    self.tools = Tools()

    # An LLM is only created when the config carries an API key
    llm_config = get_default_llm(self.config)
    api_key = llm_config.get('api_key')
    if api_key:
        self.llm = ChatOpenAI(
            model=llm_config.get('model', 'gpt-4o-mini'),
            api_key=api_key,
            temperature=llm_config.get('temperature', 0.7),
            # max_tokens=llm_config.get('max_tokens'),
        )

    # FileSystem used by the extraction action
    storage_root = profile_config.get('file_system_path', '~/.browser-use-mcp')
    self.file_system = FileSystem(base_dir=Path(storage_root).expanduser())

    logger.debug('Browser session initialized')
async def _retry_with_browser_use_agent(
    self,
    task: str,
    max_steps: int = 100,
    model: str = 'gpt-4o',
    allowed_domains: list[str] | None = None,
    use_vision: bool = True,
) -> str:
    """Run an autonomous agent task and return a textual report.

    Args:
        task: Description of what the agent should do.
        max_steps: Maximum number of steps passed to ``agent.run``.
        model: Model name used for the OpenAI path (ignored for Bedrock,
            which reads its model from config or the ``MODEL`` env var).
        allowed_domains: When not ``None``, overrides the profile's ``allowed_domains``.
        use_vision: Passed through to the agent.

    Returns:
        A multi-line summary (steps, success flag, final result, errors, URLs),
        or an ``'Error: ...'`` / ``'Agent task failed: ...'`` message.
    """
    logger.debug(f'Running agent task: {task}')

    # Get LLM config
    llm_config = get_default_llm(self.config)

    # Get LLM provider
    model_provider = llm_config.get('model_provider') or os.getenv('MODEL_PROVIDER')

    # Use Bedrock when a provider is configured and it is 'bedrock' (case-insensitive)
    if model_provider and model_provider.lower() == 'bedrock':
        llm_model = llm_config.get('model') or os.getenv('MODEL') or 'us.anthropic.claude-sonnet-4-20250514-v1:0'
        aws_region = llm_config.get('region') or os.getenv('REGION') or 'us-east-1'
        llm = ChatAWSBedrock(
            model=llm_model,  # or any Bedrock model
            aws_region=aws_region,
            aws_sso_auth=True,
        )
    else:
        api_key = llm_config.get('api_key') or os.getenv('OPENAI_API_KEY')
        if not api_key:
            return 'Error: OPENAI_API_KEY not set in config or environment'

        # The tool-call model is always the one used. The previous comparison
        # against the configured model picked `model` when they differed and an
        # equal value when they matched, so it always resolved to `model`.
        llm = ChatOpenAI(
            model=model,
            api_key=api_key,
            temperature=llm_config.get('temperature', 0.7),
        )

    # Work on a copy: the dict returned by get_default_profile may be shared with
    # self.config, and writing allowed_domains into it would leak this call's
    # override into later agent runs and browser sessions.
    profile_config = dict(get_default_profile(self.config))

    # Override allowed_domains if provided in tool call
    if allowed_domains is not None:
        profile_config['allowed_domains'] = allowed_domains

    # Create browser profile using config
    profile = BrowserProfile(**profile_config)

    # Create and run agent
    agent = Agent(
        task=task,
        llm=llm,
        browser_profile=profile,
        use_vision=use_vision,
    )

    try:
        history = await agent.run(max_steps=max_steps)

        # Format results
        results = [
            f'Task completed in {len(history.history)} steps',
            f'Success: {history.is_successful()}',
        ]

        # Get final result if available
        final_result = history.final_result()
        if final_result:
            results.append(f'\nFinal result:\n{final_result}')

        # Include any errors
        errors = history.errors()
        if errors:
            results.append(f'\nErrors encountered:\n{json.dumps(errors, indent=2)}')

        # Include URLs visited
        urls = history.urls()
        if urls:
            # Filter out None values and convert to strings
            valid_urls = [str(url) for url in urls if url is not None]
            if valid_urls:
                results.append(f'\nURLs visited: {", ".join(valid_urls)}')

        return '\n'.join(results)

    except Exception as e:
        logger.error(f'Agent task failed: {e}', exc_info=True)
        return f'Agent task failed: {str(e)}'
    finally:
        # Always release the agent's resources, on success and on failure
        await agent.close()
| async def _navigate(self, url: str, new_tab: bool = False) -> str: | |
| """Navigate to a URL.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| # Update session activity | |
| self._update_session_activity(self.browser_session.id) | |
| from browser_use.browser.events import NavigateToUrlEvent | |
| if new_tab: | |
| event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=url, new_tab=True)) | |
| await event | |
| return f'Opened new tab with URL: {url}' | |
| else: | |
| event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=url)) | |
| await event | |
| return f'Navigated to: {url}' | |
| async def _click(self, index: int, new_tab: bool = False) -> str: | |
| """Click an element by index.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| # Update session activity | |
| self._update_session_activity(self.browser_session.id) | |
| # Get the element | |
| element = await self.browser_session.get_dom_element_by_index(index) | |
| if not element: | |
| return f'Element with index {index} not found' | |
| if new_tab: | |
| # For links, extract href and open in new tab | |
| href = element.attributes.get('href') | |
| if href: | |
| # Convert relative href to absolute URL | |
| state = await self.browser_session.get_browser_state_summary() | |
| current_url = state.url | |
| if href.startswith('/'): | |
| # Relative URL - construct full URL | |
| from urllib.parse import urlparse | |
| parsed = urlparse(current_url) | |
| full_url = f'{parsed.scheme}://{parsed.netloc}{href}' | |
| else: | |
| full_url = href | |
| # Open link in new tab | |
| from browser_use.browser.events import NavigateToUrlEvent | |
| event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=full_url, new_tab=True)) | |
| await event | |
| return f'Clicked element {index} and opened in new tab {full_url[:20]}...' | |
| else: | |
| # For non-link elements, just do a normal click | |
| # Opening in new tab without href is not reliably supported | |
| from browser_use.browser.events import ClickElementEvent | |
| event = self.browser_session.event_bus.dispatch(ClickElementEvent(node=element)) | |
| await event | |
| return f'Clicked element {index} (new tab not supported for non-link elements)' | |
| else: | |
| # Normal click | |
| from browser_use.browser.events import ClickElementEvent | |
| event = self.browser_session.event_bus.dispatch(ClickElementEvent(node=element)) | |
| await event | |
| return f'Clicked element {index}' | |
| async def _type_text(self, index: int, text: str) -> str: | |
| """Type text into an element.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| element = await self.browser_session.get_dom_element_by_index(index) | |
| if not element: | |
| return f'Element with index {index} not found' | |
| from browser_use.browser.events import TypeTextEvent | |
| # Conservative heuristic to detect potentially sensitive data | |
| # Only flag very obvious patterns to minimize false positives | |
| is_potentially_sensitive = len(text) >= 6 and ( | |
| # Email pattern: contains @ and a domain-like suffix | |
| ('@' in text and '.' in text.split('@')[-1] if '@' in text else False) | |
| # Mixed alphanumeric with reasonable complexity (likely API keys/tokens) | |
| or ( | |
| len(text) >= 16 | |
| and any(char.isdigit() for char in text) | |
| and any(char.isalpha() for char in text) | |
| and any(char in '.-_' for char in text) | |
| ) | |
| ) | |
| # Use generic key names to avoid information leakage about detection patterns | |
| sensitive_key_name = None | |
| if is_potentially_sensitive: | |
| if '@' in text and '.' in text.split('@')[-1]: | |
| sensitive_key_name = 'email' | |
| else: | |
| sensitive_key_name = 'credential' | |
| event = self.browser_session.event_bus.dispatch( | |
| TypeTextEvent(node=element, text=text, is_sensitive=is_potentially_sensitive, sensitive_key_name=sensitive_key_name) | |
| ) | |
| await event | |
| if is_potentially_sensitive: | |
| if sensitive_key_name: | |
| return f'Typed <{sensitive_key_name}> into element {index}' | |
| else: | |
| return f'Typed <sensitive> into element {index}' | |
| else: | |
| return f"Typed '{text}' into element {index}" | |
| async def _get_browser_state(self, include_screenshot: bool = False) -> str: | |
| """Get current browser state.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| state = await self.browser_session.get_browser_state_summary() | |
| result = { | |
| 'url': state.url, | |
| 'title': state.title, | |
| 'tabs': [{'url': tab.url, 'title': tab.title} for tab in state.tabs], | |
| 'interactive_elements': [], | |
| } | |
| # Add interactive elements with their indices | |
| for index, element in state.dom_state.selector_map.items(): | |
| elem_info = { | |
| 'index': index, | |
| 'tag': element.tag_name, | |
| 'text': element.get_all_children_text(max_depth=2)[:100], | |
| } | |
| if element.attributes.get('placeholder'): | |
| elem_info['placeholder'] = element.attributes['placeholder'] | |
| if element.attributes.get('href'): | |
| elem_info['href'] = element.attributes['href'] | |
| result['interactive_elements'].append(elem_info) | |
| if include_screenshot and state.screenshot: | |
| result['screenshot'] = state.screenshot | |
| return json.dumps(result, indent=2) | |
| async def _extract_content(self, query: str, extract_links: bool = False) -> str: | |
| """Extract content from current page.""" | |
| if not self.llm: | |
| return 'Error: LLM not initialized (set OPENAI_API_KEY)' | |
| if not self.file_system: | |
| return 'Error: FileSystem not initialized' | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| if not self.tools: | |
| return 'Error: Tools not initialized' | |
| state = await self.browser_session.get_browser_state_summary() | |
| # Use the extract action | |
| # Create a dynamic action model that matches the tools's expectations | |
| from pydantic import create_model | |
| # Create action model dynamically | |
| ExtractAction = create_model( | |
| 'ExtractAction', | |
| __base__=ActionModel, | |
| extract=dict[str, Any], | |
| ) | |
| # Use model_validate because Pyright does not understand the dynamic model | |
| action = ExtractAction.model_validate( | |
| { | |
| 'extract': {'query': query, 'extract_links': extract_links}, | |
| } | |
| ) | |
| action_result = await self.tools.act( | |
| action=action, | |
| browser_session=self.browser_session, | |
| page_extraction_llm=self.llm, | |
| file_system=self.file_system, | |
| ) | |
| return action_result.extracted_content or 'No content extracted' | |
| async def _scroll(self, direction: str = 'down') -> str: | |
| """Scroll the page.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| from browser_use.browser.events import ScrollEvent | |
| # Scroll by a standard amount (500 pixels) | |
| event = self.browser_session.event_bus.dispatch( | |
| ScrollEvent( | |
| direction=direction, # type: ignore | |
| amount=500, | |
| ) | |
| ) | |
| await event | |
| return f'Scrolled {direction}' | |
| async def _go_back(self) -> str: | |
| """Go back in browser history.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| from browser_use.browser.events import GoBackEvent | |
| event = self.browser_session.event_bus.dispatch(GoBackEvent()) | |
| await event | |
| return 'Navigated back' | |
| async def _close_browser(self) -> str: | |
| """Close the browser session.""" | |
| if self.browser_session: | |
| from browser_use.browser.events import BrowserStopEvent | |
| event = self.browser_session.event_bus.dispatch(BrowserStopEvent()) | |
| await event | |
| self.browser_session = None | |
| self.tools = None | |
| return 'Browser closed' | |
| return 'No browser session to close' | |
| async def _list_tabs(self) -> str: | |
| """List all open tabs.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| tabs_info = await self.browser_session.get_tabs() | |
| tabs = [] | |
| for i, tab in enumerate(tabs_info): | |
| tabs.append({'tab_id': tab.target_id[-4:], 'url': tab.url, 'title': tab.title or ''}) | |
| return json.dumps(tabs, indent=2) | |
| async def _switch_tab(self, tab_id: str) -> str: | |
| """Switch to a different tab.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| from browser_use.browser.events import SwitchTabEvent | |
| target_id = await self.browser_session.get_target_id_from_tab_id(tab_id) | |
| event = self.browser_session.event_bus.dispatch(SwitchTabEvent(target_id=target_id)) | |
| await event | |
| state = await self.browser_session.get_browser_state_summary() | |
| return f'Switched to tab {tab_id}: {state.url}' | |
| async def _close_tab(self, tab_id: str) -> str: | |
| """Close a specific tab.""" | |
| if not self.browser_session: | |
| return 'Error: No browser session active' | |
| from browser_use.browser.events import CloseTabEvent | |
| target_id = await self.browser_session.get_target_id_from_tab_id(tab_id) | |
| event = self.browser_session.event_bus.dispatch(CloseTabEvent(target_id=target_id)) | |
| await event | |
| current_url = await self.browser_session.get_current_page_url() | |
| return f'Closed tab # {tab_id}, now on {current_url}' | |
| def _track_session(self, session: BrowserSession) -> None: | |
| """Track a browser session for management.""" | |
| self.active_sessions[session.id] = { | |
| 'session': session, | |
| 'created_at': time.time(), | |
| 'last_activity': time.time(), | |
| 'url': getattr(session, 'current_url', None), | |
| } | |
def _update_session_activity(self, session_id: str) -> None:
    """Refresh the ``last_activity`` timestamp of a tracked session.

    Session ids that are not present in ``self.active_sessions`` are ignored.
    """
    if session_id not in self.active_sessions:
        return
    self.active_sessions[session_id]['last_activity'] = time.time()
async def _list_sessions(self) -> str:
    """List all tracked browser sessions.

    Returns:
        ``'No active browser sessions'`` when nothing is tracked; otherwise a
        JSON array (indent=2) with one object per session containing
        ``session_id``, ``created_at`` and ``last_activity`` (local time,
        ``YYYY-MM-DD HH:MM:SS``), ``active``, ``current_url`` and
        ``age_minutes``.
    """
    if not self.active_sessions:
        return 'No active browser sessions'

    timestamp_format = '%Y-%m-%d %H:%M:%S'
    # Read the clock once so every session's age uses the same reference.
    now = time.time()

    sessions_info = []
    for session_id, session_data in self.active_sessions.items():
        session = session_data['session']
        sessions_info.append(
            {
                'session_id': session_id,
                'created_at': time.strftime(timestamp_format, time.localtime(session_data['created_at'])),
                'last_activity': time.strftime(timestamp_format, time.localtime(session_data['last_activity'])),
                # A session counts as active while it still holds a CDP client.
                'active': getattr(session, 'cdp_client', None) is not None,
                # The tracking record stores the 'url' key even when no URL is
                # known (value None), so a .get() default would never apply;
                # fall back explicitly so the listing shows 'Unknown'.
                'current_url': session_data.get('url') or 'Unknown',
                'age_minutes': (now - session_data['created_at']) / 60,
            }
        )
    return json.dumps(sessions_info, indent=2)
async def _close_session(self, session_id: str) -> str:
    """Close one tracked browser session and stop tracking it.

    Returns a human-readable status string: a "not found" message for an
    unknown id, a success message once the session has been shut down and
    removed from ``self.active_sessions``, or an error message when anything
    raised along the way (in which case the session stays tracked).
    """
    if session_id not in self.active_sessions:
        return f'Session {session_id} not found'

    target = self.active_sessions[session_id]['session']

    try:
        # Prefer kill() when the session offers it; otherwise use close().
        if hasattr(target, 'kill'):
            await target.kill()
        elif hasattr(target, 'close'):
            await target.close()

        # Stop tracking the session now that it has been shut down.
        self.active_sessions.pop(session_id)

        # Drop the server's own references when this was the current session.
        if self.browser_session and self.browser_session.id == session_id:
            self.browser_session = None
            self.tools = None

        return f'Successfully closed session {session_id}'
    except Exception as exc:
        return f'Error closing session {session_id}: {exc!s}'
async def _close_all_sessions(self) -> str:
    """Close every tracked browser session and report the outcome.

    Returns ``'No active sessions to close'`` when nothing is tracked;
    otherwise a summary with the number of sessions closed, followed by the
    per-session error messages, if any. The server's current session and
    tools references are cleared after the attempt.
    """
    if not self.active_sessions:
        return 'No active sessions to close'

    closed_total = 0
    failures = []

    # Iterate over a snapshot of the ids: closing a session removes it from
    # the tracking dict.
    for sid in list(self.active_sessions):
        try:
            outcome = await self._close_session(sid)
        except Exception as exc:
            failures.append(f'{sid}: {str(exc)}')
            continue

        if 'Successfully closed' in outcome:
            closed_total += 1
        else:
            failures.append(f'{sid}: {outcome}')

    # Clear current session references
    self.browser_session = None
    self.tools = None

    summary = f'Closed {closed_total} sessions'
    if failures:
        summary += f'. Errors: {"; ".join(failures)}'
    return summary
async def _cleanup_expired_sessions(self) -> None:
    """Close every tracked session idle for longer than the configured timeout.

    A session is expired when more than ``self.session_timeout_minutes``
    minutes have passed since its ``last_activity`` timestamp. Each expired
    session is closed through ``_close_session``; failures are logged and do
    not stop the remaining sessions from being processed.
    """
    current_time = time.time()
    timeout_seconds = self.session_timeout_minutes * 60

    # Collect the ids first: closing a session mutates active_sessions, which
    # must not happen while iterating over it.
    expired_sessions = [
        session_id
        for session_id, session_data in self.active_sessions.items()
        if current_time - session_data['last_activity'] > timeout_seconds
    ]

    for session_id in expired_sessions:
        try:
            result = await self._close_session(session_id)
        except Exception as e:
            logger.error(f'Error auto-closing session {session_id}: {e}')
            continue

        # _close_session reports failures through its return string rather
        # than by raising, so the result has to be inspected before a
        # successful auto-close is logged.
        if 'Successfully closed' in result:
            logger.info(f'Auto-closed expired session {session_id}')
        else:
            logger.error(f'Error auto-closing session {session_id}: {result}')
async def _start_cleanup_task(self) -> None:
    """Launch the background task that periodically closes expired sessions.

    The created task is stored on ``self._cleanup_task``.
    """
    # Check every 2 minutes
    pause_seconds = 120

    async def cleanup_loop():
        while True:
            try:
                await self._cleanup_expired_sessions()
            except Exception as exc:
                # A failed pass is logged and retried after the usual pause,
                # so the loop keeps running.
                logger.error(f'Error in cleanup task: {exc}')
            await asyncio.sleep(pause_seconds)

    self._cleanup_task = asyncio.create_task(cleanup_loop())
async def run(self):
    """Start the session cleanup task, then run the MCP server over stdio."""
    # Start the cleanup task
    await self._start_cleanup_task()

    async with mcp.server.stdio.stdio_server() as streams:
        read_stream, write_stream = streams
        capabilities = self.server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name='browser-use',
            server_version='0.1.0',
            capabilities=capabilities,
        )
        await self.server.run(read_stream, write_stream, init_options)
async def main(session_timeout_minutes: int = 10):
    """Create the browser-use MCP server and run it.

    Exits with status 1 when the MCP SDK is not available. A 'start'
    telemetry event is captured before the server runs; a 'stop' event
    carrying the run duration is captured and telemetry is flushed once the
    server returns or raises.

    Args:
        session_timeout_minutes: Passed through to ``BrowserUseServer``.
    """
    if not MCP_AVAILABLE:
        print('MCP SDK is required. Install with: pip install mcp', file=sys.stderr)
        sys.exit(1)

    server = BrowserUseServer(session_timeout_minutes=session_timeout_minutes)
    telemetry = server._telemetry

    start_event = MCPServerTelemetryEvent(
        version=get_browser_use_version(),
        action='start',
        parent_process_cmdline=get_parent_process_cmdline(),
    )
    telemetry.capture(start_event)

    try:
        await server.run()
    finally:
        # Measure the run duration before building the stop event.
        elapsed = time.time() - server._start_time
        stop_event = MCPServerTelemetryEvent(
            version=get_browser_use_version(),
            action='stop',
            duration_seconds=elapsed,
            parent_process_cmdline=get_parent_process_cmdline(),
        )
        telemetry.capture(stop_event)
        telemetry.flush()
if __name__ == '__main__':
    # Executed as a script: run the server with main()'s default arguments.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)