"""MCP Server for browser-use - exposes browser automation capabilities via Model Context Protocol. This server provides tools for: - Running autonomous browser tasks with an AI agent - Direct browser control (navigation, clicking, typing, etc.) - Content extraction from web pages - File system operations Usage: uvx browser-use --mcp Or as an MCP server in Claude Desktop or other MCP clients: { "mcpServers": { "browser-use": { "command": "uvx", "args": ["browser-use[cli]", "--mcp"], "env": { "OPENAI_API_KEY": "sk-proj-1234567890", } } } } """ import os import sys from browser_use.llm import ChatAWSBedrock # Set environment variables BEFORE any browser_use imports to prevent early logging os.environ['BROWSER_USE_LOGGING_LEVEL'] = 'critical' os.environ['BROWSER_USE_SETUP_LOGGING'] = 'false' import asyncio import json import logging import time from pathlib import Path from typing import Any # Configure logging for MCP mode - redirect to stderr but preserve critical diagnostics logging.basicConfig( stream=sys.stderr, level=logging.WARNING, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', force=True ) try: import psutil PSUTIL_AVAILABLE = True except ImportError: PSUTIL_AVAILABLE = False # Add browser-use to path if running from source sys.path.insert(0, str(Path(__file__).parent.parent)) # Import and configure logging to use stderr before other imports from browser_use.logging_config import setup_logging def _configure_mcp_server_logging(): """Configure logging for MCP server mode - redirect all logs to stderr to prevent JSON RPC interference.""" # Set environment to suppress browser-use logging during server mode os.environ['BROWSER_USE_LOGGING_LEVEL'] = 'warning' os.environ['BROWSER_USE_SETUP_LOGGING'] = 'false' # Prevent automatic logging setup # Configure logging to stderr for MCP mode - preserve warnings and above for troubleshooting setup_logging(stream=sys.stderr, log_level='warning', force_setup=True) # Also configure the root logger and all existing loggers to use stderr logging.root.handlers = [] stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logging.root.addHandler(stderr_handler) logging.root.setLevel(logging.CRITICAL) # Configure all existing loggers to use stderr and CRITICAL level for name in list(logging.root.manager.loggerDict.keys()): logger_obj = logging.getLogger(name) logger_obj.handlers = [] logger_obj.setLevel(logging.CRITICAL) logger_obj.addHandler(stderr_handler) logger_obj.propagate = False # Configure MCP server logging before any browser_use imports to capture early log lines _configure_mcp_server_logging() # Additional suppression - disable all logging completely for MCP mode logging.disable(logging.CRITICAL) # Import browser_use modules from browser_use import ActionModel, Agent from browser_use.browser import BrowserProfile, BrowserSession from browser_use.config import get_default_llm, get_default_profile, load_browser_use_config from browser_use.filesystem.file_system import FileSystem from browser_use.llm.openai.chat import ChatOpenAI from browser_use.tools.service import Tools logger = logging.getLogger(__name__) def _ensure_all_loggers_use_stderr(): """Ensure ALL loggers only output to stderr, not stdout.""" # Get the stderr handler stderr_handler = None for handler in logging.root.handlers: if hasattr(handler, 'stream') and handler.stream == sys.stderr: # type: ignore stderr_handler = handler break if not stderr_handler: stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) # Configure root logger logging.root.handlers = [stderr_handler] logging.root.setLevel(logging.CRITICAL) # Configure all existing loggers for name in list(logging.root.manager.loggerDict.keys()): logger_obj = logging.getLogger(name) logger_obj.handlers = [stderr_handler] logger_obj.setLevel(logging.CRITICAL) logger_obj.propagate = False # Ensure stderr logging after all imports _ensure_all_loggers_use_stderr() # Try to import MCP SDK try: import mcp.server.stdio import mcp.types as types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions MCP_AVAILABLE = True # Configure MCP SDK logging to stderr as well mcp_logger = logging.getLogger('mcp') mcp_logger.handlers = [] mcp_logger.addHandler(logging.root.handlers[0] if logging.root.handlers else logging.StreamHandler(sys.stderr)) mcp_logger.setLevel(logging.ERROR) mcp_logger.propagate = False except ImportError: MCP_AVAILABLE = False logger.error('MCP SDK not installed. Install with: pip install mcp') sys.exit(1) from browser_use.telemetry import MCPServerTelemetryEvent, ProductTelemetry from browser_use.utils import get_browser_use_version def get_parent_process_cmdline() -> str | None: """Get the command line of all parent processes up the chain.""" if not PSUTIL_AVAILABLE: return None try: cmdlines = [] current_process = psutil.Process() parent = current_process.parent() while parent: try: cmdline = parent.cmdline() if cmdline: cmdlines.append(' '.join(cmdline)) except (psutil.AccessDenied, psutil.NoSuchProcess): # Skip processes we can't access (like system processes) pass try: parent = parent.parent() except (psutil.AccessDenied, psutil.NoSuchProcess): # Can't go further up the chain break return ';'.join(cmdlines) if cmdlines else None except Exception: # If we can't get parent process info, just return None return None class BrowserUseServer: """MCP Server for browser-use capabilities.""" def __init__(self, session_timeout_minutes: int = 10): # Ensure all logging goes to stderr (in case new loggers were created) _ensure_all_loggers_use_stderr() self.server = Server('browser-use') self.config = load_browser_use_config() self.agent: Agent | None = None self.browser_session: BrowserSession | None = None self.tools: Tools | None = None self.llm: ChatOpenAI | None = None self.file_system: FileSystem | None = None self._telemetry = ProductTelemetry() self._start_time = time.time() # Session management self.active_sessions: dict[str, dict[str, Any]] = {} # session_id -> session info self.session_timeout_minutes = session_timeout_minutes self._cleanup_task: Any = None # Setup handlers self._setup_handlers() def _setup_handlers(self): """Setup MCP server handlers.""" @self.server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List all available browser-use tools.""" return [ # Agent tools # Direct browser control tools types.Tool( name='browser_navigate', description='Navigate to a URL in the browser', inputSchema={ 'type': 'object', 'properties': { 'url': {'type': 'string', 'description': 'The URL to navigate to'}, 'new_tab': {'type': 'boolean', 'description': 'Whether to open in a new tab', 'default': False}, }, 'required': ['url'], }, ), types.Tool( name='browser_click', description='Click an element on the page by its index', inputSchema={ 'type': 'object', 'properties': { 'index': { 'type': 'integer', 'description': 'The index of the link or element to click (from browser_get_state)', }, 'new_tab': { 'type': 'boolean', 'description': 'Whether to open any resulting navigation in a new tab', 'default': False, }, }, 'required': ['index'], }, ), types.Tool( name='browser_type', description='Type text into an input field', inputSchema={ 'type': 'object', 'properties': { 'index': { 'type': 'integer', 'description': 'The index of the input element (from browser_get_state)', }, 'text': {'type': 'string', 'description': 'The text to type'}, }, 'required': ['index', 'text'], }, ), types.Tool( name='browser_get_state', description='Get the current state of the page including all interactive elements', inputSchema={ 'type': 'object', 'properties': { 'include_screenshot': { 'type': 'boolean', 'description': 'Whether to include a screenshot of the current page', 'default': False, } }, }, ), types.Tool( name='browser_extract_content', description='Extract structured content from the current page based on a query', inputSchema={ 'type': 'object', 'properties': { 'query': {'type': 'string', 'description': 'What information to extract from the page'}, 'extract_links': { 'type': 'boolean', 'description': 'Whether to include links in the extraction', 'default': False, }, }, 'required': ['query'], }, ), types.Tool( name='browser_scroll', description='Scroll the page', inputSchema={ 'type': 'object', 'properties': { 'direction': { 'type': 'string', 'enum': ['up', 'down'], 'description': 'Direction to scroll', 'default': 'down', } }, }, ), types.Tool( name='browser_go_back', description='Go back to the previous page', inputSchema={'type': 'object', 'properties': {}}, ), # Tab management types.Tool( name='browser_list_tabs', description='List all open tabs', inputSchema={'type': 'object', 'properties': {}} ), types.Tool( name='browser_switch_tab', description='Switch to a different tab', inputSchema={ 'type': 'object', 'properties': {'tab_id': {'type': 'string', 'description': '4 Character Tab ID of the tab to switch to'}}, 'required': ['tab_id'], }, ), types.Tool( name='browser_close_tab', description='Close a tab', inputSchema={ 'type': 'object', 'properties': {'tab_id': {'type': 'string', 'description': '4 Character Tab ID of the tab to close'}}, 'required': ['tab_id'], }, ), # types.Tool( # name="browser_close", # description="Close the browser session", # inputSchema={ # "type": "object", # "properties": {} # } # ), types.Tool( name='retry_with_browser_use_agent', description='Retry a task using the browser-use agent. Only use this as a last resort if you fail to interact with a page multiple times.', inputSchema={ 'type': 'object', 'properties': { 'task': { 'type': 'string', 'description': 'The high-level goal and detailed step-by-step description of the task the AI browser agent needs to attempt, along with any relevant data needed to complete the task and info about previous attempts.', }, 'max_steps': { 'type': 'integer', 'description': 'Maximum number of steps an agent can take.', 'default': 100, }, 'model': { 'type': 'string', 'description': 'LLM model to use (e.g., gpt-4o, claude-3-opus-20240229)', 'default': 'gpt-4o', }, 'allowed_domains': { 'type': 'array', 'items': {'type': 'string'}, 'description': 'List of domains the agent is allowed to visit (security feature)', 'default': [], }, 'use_vision': { 'type': 'boolean', 'description': 'Whether to use vision capabilities (screenshots) for the agent', 'default': True, }, }, 'required': ['task'], }, ), # Browser session management tools types.Tool( name='browser_list_sessions', description='List all active browser sessions with their details and last activity time', inputSchema={'type': 'object', 'properties': {}}, ), types.Tool( name='browser_close_session', description='Close a specific browser session by its ID', inputSchema={ 'type': 'object', 'properties': { 'session_id': { 'type': 'string', 'description': 'The browser session ID to close (get from browser_list_sessions)', } }, 'required': ['session_id'], }, ), types.Tool( name='browser_close_all', description='Close all active browser sessions and clean up resources', inputSchema={'type': 'object', 'properties': {}}, ), ] @self.server.list_resources() async def handle_list_resources() -> list[types.Resource]: """List available resources (none for browser-use).""" return [] @self.server.list_prompts() async def handle_list_prompts() -> list[types.Prompt]: """List available prompts (none for browser-use).""" return [] @self.server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]: """Handle tool execution.""" start_time = time.time() error_msg = None try: result = await self._execute_tool(name, arguments or {}) return [types.TextContent(type='text', text=result)] except Exception as e: error_msg = str(e) logger.error(f'Tool execution failed: {e}', exc_info=True) return [types.TextContent(type='text', text=f'Error: {str(e)}')] finally: # Capture telemetry for tool calls duration = time.time() - start_time self._telemetry.capture( MCPServerTelemetryEvent( version=get_browser_use_version(), action='tool_call', tool_name=name, duration_seconds=duration, error_message=error_msg, ) ) async def _execute_tool(self, tool_name: str, arguments: dict[str, Any]) -> str: """Execute a browser-use tool.""" # Agent-based tools if tool_name == 'retry_with_browser_use_agent': return await self._retry_with_browser_use_agent( task=arguments['task'], max_steps=arguments.get('max_steps', 100), model=arguments.get('model', 'gpt-4o'), allowed_domains=arguments.get('allowed_domains', []), use_vision=arguments.get('use_vision', True), ) # Browser session management tools (don't require active session) if tool_name == 'browser_list_sessions': return await self._list_sessions() elif tool_name == 'browser_close_session': return await self._close_session(arguments['session_id']) elif tool_name == 'browser_close_all': return await self._close_all_sessions() # Direct browser control tools (require active session) elif tool_name.startswith('browser_'): # Ensure browser session exists if not self.browser_session: await self._init_browser_session() if tool_name == 'browser_navigate': return await self._navigate(arguments['url'], arguments.get('new_tab', False)) elif tool_name == 'browser_click': return await self._click(arguments['index'], arguments.get('new_tab', False)) elif tool_name == 'browser_type': return await self._type_text(arguments['index'], arguments['text']) elif tool_name == 'browser_get_state': return await self._get_browser_state(arguments.get('include_screenshot', False)) elif tool_name == 'browser_extract_content': return await self._extract_content(arguments['query'], arguments.get('extract_links', False)) elif tool_name == 'browser_scroll': return await self._scroll(arguments.get('direction', 'down')) elif tool_name == 'browser_go_back': return await self._go_back() elif tool_name == 'browser_close': return await self._close_browser() elif tool_name == 'browser_list_tabs': return await self._list_tabs() elif tool_name == 'browser_switch_tab': return await self._switch_tab(arguments['tab_id']) elif tool_name == 'browser_close_tab': return await self._close_tab(arguments['tab_id']) return f'Unknown tool: {tool_name}' async def _init_browser_session(self, allowed_domains: list[str] | None = None, **kwargs): """Initialize browser session using config""" if self.browser_session: return # Ensure all logging goes to stderr before browser initialization _ensure_all_loggers_use_stderr() logger.debug('Initializing browser session...') # Get profile config profile_config = get_default_profile(self.config) # Merge profile config with defaults and overrides profile_data = { 'downloads_path': str(Path.home() / 'Downloads' / 'browser-use-mcp'), 'wait_between_actions': 0.5, 'keep_alive': True, 'user_data_dir': '~/.config/browseruse/profiles/default', 'device_scale_factor': 1.0, 'disable_security': False, 'headless': False, **profile_config, # Config values override defaults } # Tool parameter overrides (highest priority) if allowed_domains is not None: profile_data['allowed_domains'] = allowed_domains # Merge any additional kwargs that are valid BrowserProfile fields for key, value in kwargs.items(): profile_data[key] = value # Create browser profile profile = BrowserProfile(**profile_data) # Create browser session self.browser_session = BrowserSession(browser_profile=profile) await self.browser_session.start() # Track the session for management self._track_session(self.browser_session) # Create tools for direct actions self.tools = Tools() # Initialize LLM from config llm_config = get_default_llm(self.config) if api_key := llm_config.get('api_key'): self.llm = ChatOpenAI( model=llm_config.get('model', 'gpt-4o-mini'), api_key=api_key, temperature=llm_config.get('temperature', 0.7), # max_tokens=llm_config.get('max_tokens'), ) # Initialize FileSystem for extraction actions file_system_path = profile_config.get('file_system_path', '~/.browser-use-mcp') self.file_system = FileSystem(base_dir=Path(file_system_path).expanduser()) logger.debug('Browser session initialized') async def _retry_with_browser_use_agent( self, task: str, max_steps: int = 100, model: str = 'gpt-4o', allowed_domains: list[str] | None = None, use_vision: bool = True, ) -> str: """Run an autonomous agent task.""" logger.debug(f'Running agent task: {task}') # Get LLM config llm_config = get_default_llm(self.config) # Get LLM provider model_provider = llm_config.get('model_provider') or os.getenv('MODEL_PROVIDER') # 如果model_provider不等于空,且等Bedrock if model_provider and model_provider.lower() == 'bedrock': llm_model = llm_config.get('model') or os.getenv('MODEL') or 'us.anthropic.claude-sonnet-4-20250514-v1:0' aws_region = llm_config.get('region') or os.getenv('REGION') if not aws_region: aws_region = 'us-east-1' llm = ChatAWSBedrock( model=llm_model, # or any Bedrock model aws_region=aws_region, aws_sso_auth=True, ) else: api_key = llm_config.get('api_key') or os.getenv('OPENAI_API_KEY') if not api_key: return 'Error: OPENAI_API_KEY not set in config or environment' # Override model if provided in tool call if model != llm_config.get('model', 'gpt-4o'): llm_model = model else: llm_model = llm_config.get('model', 'gpt-4o') llm = ChatOpenAI( model=llm_model, api_key=api_key, temperature=llm_config.get('temperature', 0.7), ) # Get profile config and merge with tool parameters profile_config = get_default_profile(self.config) # Override allowed_domains if provided in tool call if allowed_domains is not None: profile_config['allowed_domains'] = allowed_domains # Create browser profile using config profile = BrowserProfile(**profile_config) # Create and run agent agent = Agent( task=task, llm=llm, browser_profile=profile, use_vision=use_vision, ) try: history = await agent.run(max_steps=max_steps) # Format results results = [] results.append(f'Task completed in {len(history.history)} steps') results.append(f'Success: {history.is_successful()}') # Get final result if available final_result = history.final_result() if final_result: results.append(f'\nFinal result:\n{final_result}') # Include any errors errors = history.errors() if errors: results.append(f'\nErrors encountered:\n{json.dumps(errors, indent=2)}') # Include URLs visited urls = history.urls() if urls: # Filter out None values and convert to strings valid_urls = [str(url) for url in urls if url is not None] if valid_urls: results.append(f'\nURLs visited: {", ".join(valid_urls)}') return '\n'.join(results) except Exception as e: logger.error(f'Agent task failed: {e}', exc_info=True) return f'Agent task failed: {str(e)}' finally: # Clean up await agent.close() async def _navigate(self, url: str, new_tab: bool = False) -> str: """Navigate to a URL.""" if not self.browser_session: return 'Error: No browser session active' # Update session activity self._update_session_activity(self.browser_session.id) from browser_use.browser.events import NavigateToUrlEvent if new_tab: event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=url, new_tab=True)) await event return f'Opened new tab with URL: {url}' else: event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=url)) await event return f'Navigated to: {url}' async def _click(self, index: int, new_tab: bool = False) -> str: """Click an element by index.""" if not self.browser_session: return 'Error: No browser session active' # Update session activity self._update_session_activity(self.browser_session.id) # Get the element element = await self.browser_session.get_dom_element_by_index(index) if not element: return f'Element with index {index} not found' if new_tab: # For links, extract href and open in new tab href = element.attributes.get('href') if href: # Convert relative href to absolute URL state = await self.browser_session.get_browser_state_summary() current_url = state.url if href.startswith('/'): # Relative URL - construct full URL from urllib.parse import urlparse parsed = urlparse(current_url) full_url = f'{parsed.scheme}://{parsed.netloc}{href}' else: full_url = href # Open link in new tab from browser_use.browser.events import NavigateToUrlEvent event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=full_url, new_tab=True)) await event return f'Clicked element {index} and opened in new tab {full_url[:20]}...' else: # For non-link elements, just do a normal click # Opening in new tab without href is not reliably supported from browser_use.browser.events import ClickElementEvent event = self.browser_session.event_bus.dispatch(ClickElementEvent(node=element)) await event return f'Clicked element {index} (new tab not supported for non-link elements)' else: # Normal click from browser_use.browser.events import ClickElementEvent event = self.browser_session.event_bus.dispatch(ClickElementEvent(node=element)) await event return f'Clicked element {index}' async def _type_text(self, index: int, text: str) -> str: """Type text into an element.""" if not self.browser_session: return 'Error: No browser session active' element = await self.browser_session.get_dom_element_by_index(index) if not element: return f'Element with index {index} not found' from browser_use.browser.events import TypeTextEvent # Conservative heuristic to detect potentially sensitive data # Only flag very obvious patterns to minimize false positives is_potentially_sensitive = len(text) >= 6 and ( # Email pattern: contains @ and a domain-like suffix ('@' in text and '.' in text.split('@')[-1] if '@' in text else False) # Mixed alphanumeric with reasonable complexity (likely API keys/tokens) or ( len(text) >= 16 and any(char.isdigit() for char in text) and any(char.isalpha() for char in text) and any(char in '.-_' for char in text) ) ) # Use generic key names to avoid information leakage about detection patterns sensitive_key_name = None if is_potentially_sensitive: if '@' in text and '.' in text.split('@')[-1]: sensitive_key_name = 'email' else: sensitive_key_name = 'credential' event = self.browser_session.event_bus.dispatch( TypeTextEvent(node=element, text=text, is_sensitive=is_potentially_sensitive, sensitive_key_name=sensitive_key_name) ) await event if is_potentially_sensitive: if sensitive_key_name: return f'Typed <{sensitive_key_name}> into element {index}' else: return f'Typed into element {index}' else: return f"Typed '{text}' into element {index}" async def _get_browser_state(self, include_screenshot: bool = False) -> str: """Get current browser state.""" if not self.browser_session: return 'Error: No browser session active' state = await self.browser_session.get_browser_state_summary() result = { 'url': state.url, 'title': state.title, 'tabs': [{'url': tab.url, 'title': tab.title} for tab in state.tabs], 'interactive_elements': [], } # Add interactive elements with their indices for index, element in state.dom_state.selector_map.items(): elem_info = { 'index': index, 'tag': element.tag_name, 'text': element.get_all_children_text(max_depth=2)[:100], } if element.attributes.get('placeholder'): elem_info['placeholder'] = element.attributes['placeholder'] if element.attributes.get('href'): elem_info['href'] = element.attributes['href'] result['interactive_elements'].append(elem_info) if include_screenshot and state.screenshot: result['screenshot'] = state.screenshot return json.dumps(result, indent=2) async def _extract_content(self, query: str, extract_links: bool = False) -> str: """Extract content from current page.""" if not self.llm: return 'Error: LLM not initialized (set OPENAI_API_KEY)' if not self.file_system: return 'Error: FileSystem not initialized' if not self.browser_session: return 'Error: No browser session active' if not self.tools: return 'Error: Tools not initialized' state = await self.browser_session.get_browser_state_summary() # Use the extract action # Create a dynamic action model that matches the tools's expectations from pydantic import create_model # Create action model dynamically ExtractAction = create_model( 'ExtractAction', __base__=ActionModel, extract=dict[str, Any], ) # Use model_validate because Pyright does not understand the dynamic model action = ExtractAction.model_validate( { 'extract': {'query': query, 'extract_links': extract_links}, } ) action_result = await self.tools.act( action=action, browser_session=self.browser_session, page_extraction_llm=self.llm, file_system=self.file_system, ) return action_result.extracted_content or 'No content extracted' async def _scroll(self, direction: str = 'down') -> str: """Scroll the page.""" if not self.browser_session: return 'Error: No browser session active' from browser_use.browser.events import ScrollEvent # Scroll by a standard amount (500 pixels) event = self.browser_session.event_bus.dispatch( ScrollEvent( direction=direction, # type: ignore amount=500, ) ) await event return f'Scrolled {direction}' async def _go_back(self) -> str: """Go back in browser history.""" if not self.browser_session: return 'Error: No browser session active' from browser_use.browser.events import GoBackEvent event = self.browser_session.event_bus.dispatch(GoBackEvent()) await event return 'Navigated back' async def _close_browser(self) -> str: """Close the browser session.""" if self.browser_session: from browser_use.browser.events import BrowserStopEvent event = self.browser_session.event_bus.dispatch(BrowserStopEvent()) await event self.browser_session = None self.tools = None return 'Browser closed' return 'No browser session to close' async def _list_tabs(self) -> str: """List all open tabs.""" if not self.browser_session: return 'Error: No browser session active' tabs_info = await self.browser_session.get_tabs() tabs = [] for i, tab in enumerate(tabs_info): tabs.append({'tab_id': tab.target_id[-4:], 'url': tab.url, 'title': tab.title or ''}) return json.dumps(tabs, indent=2) async def _switch_tab(self, tab_id: str) -> str: """Switch to a different tab.""" if not self.browser_session: return 'Error: No browser session active' from browser_use.browser.events import SwitchTabEvent target_id = await self.browser_session.get_target_id_from_tab_id(tab_id) event = self.browser_session.event_bus.dispatch(SwitchTabEvent(target_id=target_id)) await event state = await self.browser_session.get_browser_state_summary() return f'Switched to tab {tab_id}: {state.url}' async def _close_tab(self, tab_id: str) -> str: """Close a specific tab.""" if not self.browser_session: return 'Error: No browser session active' from browser_use.browser.events import CloseTabEvent target_id = await self.browser_session.get_target_id_from_tab_id(tab_id) event = self.browser_session.event_bus.dispatch(CloseTabEvent(target_id=target_id)) await event current_url = await self.browser_session.get_current_page_url() return f'Closed tab # {tab_id}, now on {current_url}' def _track_session(self, session: BrowserSession) -> None: """Track a browser session for management.""" self.active_sessions[session.id] = { 'session': session, 'created_at': time.time(), 'last_activity': time.time(), 'url': getattr(session, 'current_url', None), } def _update_session_activity(self, session_id: str) -> None: """Update the last activity time for a session.""" if session_id in self.active_sessions: self.active_sessions[session_id]['last_activity'] = time.time() async def _list_sessions(self) -> str: """List all active browser sessions.""" if not self.active_sessions: return 'No active browser sessions' sessions_info = [] for session_id, session_data in self.active_sessions.items(): session = session_data['session'] created_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(session_data['created_at'])) last_activity = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(session_data['last_activity'])) # Check if session is still active is_active = hasattr(session, 'cdp_client') and session.cdp_client is not None sessions_info.append( { 'session_id': session_id, 'created_at': created_at, 'last_activity': last_activity, 'active': is_active, 'current_url': session_data.get('url', 'Unknown'), 'age_minutes': (time.time() - session_data['created_at']) / 60, } ) return json.dumps(sessions_info, indent=2) async def _close_session(self, session_id: str) -> str: """Close a specific browser session.""" if session_id not in self.active_sessions: return f'Session {session_id} not found' session_data = self.active_sessions[session_id] session = session_data['session'] try: # Close the session if hasattr(session, 'kill'): await session.kill() elif hasattr(session, 'close'): await session.close() # Remove from tracking del self.active_sessions[session_id] # If this was the current session, clear it if self.browser_session and self.browser_session.id == session_id: self.browser_session = None self.tools = None return f'Successfully closed session {session_id}' except Exception as e: return f'Error closing session {session_id}: {str(e)}' async def _close_all_sessions(self) -> str: """Close all active browser sessions.""" if not self.active_sessions: return 'No active sessions to close' closed_count = 0 errors = [] for session_id in list(self.active_sessions.keys()): try: result = await self._close_session(session_id) if 'Successfully closed' in result: closed_count += 1 else: errors.append(f'{session_id}: {result}') except Exception as e: errors.append(f'{session_id}: {str(e)}') # Clear current session references self.browser_session = None self.tools = None result = f'Closed {closed_count} sessions' if errors: result += f'. Errors: {"; ".join(errors)}' return result async def _cleanup_expired_sessions(self) -> None: """Background task to clean up expired sessions.""" current_time = time.time() timeout_seconds = self.session_timeout_minutes * 60 expired_sessions = [] for session_id, session_data in self.active_sessions.items(): last_activity = session_data['last_activity'] if current_time - last_activity > timeout_seconds: expired_sessions.append(session_id) for session_id in expired_sessions: try: await self._close_session(session_id) logger.info(f'Auto-closed expired session {session_id}') except Exception as e: logger.error(f'Error auto-closing session {session_id}: {e}') async def _start_cleanup_task(self) -> None: """Start the background cleanup task.""" async def cleanup_loop(): while True: try: await self._cleanup_expired_sessions() # Check every 2 minutes await asyncio.sleep(120) except Exception as e: logger.error(f'Error in cleanup task: {e}') await asyncio.sleep(120) self._cleanup_task = asyncio.create_task(cleanup_loop()) async def run(self): """Run the MCP server.""" # Start the cleanup task await self._start_cleanup_task() async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name='browser-use', server_version='0.1.0', capabilities=self.server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) async def main(session_timeout_minutes: int = 10): if not MCP_AVAILABLE: print('MCP SDK is required. Install with: pip install mcp', file=sys.stderr) sys.exit(1) server = BrowserUseServer(session_timeout_minutes=session_timeout_minutes) server._telemetry.capture( MCPServerTelemetryEvent( version=get_browser_use_version(), action='start', parent_process_cmdline=get_parent_process_cmdline(), ) ) try: await server.run() finally: duration = time.time() - server._start_time server._telemetry.capture( MCPServerTelemetryEvent( version=get_browser_use_version(), action='stop', duration_seconds=duration, parent_process_cmdline=get_parent_process_cmdline(), ) ) server._telemetry.flush() if __name__ == '__main__': asyncio.run(main())