Speedofmastery's picture
Merge Landrun + Browser-Use + Chromium with AI agent support (without binary files)
d7b3d84
"""MCP Server for browser-use - exposes browser automation capabilities via Model Context Protocol.
This server provides tools for:
- Running autonomous browser tasks with an AI agent
- Direct browser control (navigation, clicking, typing, etc.)
- Content extraction from web pages
- File system operations
Usage:
    uvx browser-use --mcp

Or as an MCP server in Claude Desktop or other MCP clients:
    {
        "mcpServers": {
            "browser-use": {
                "command": "uvx",
                "args": ["browser-use[cli]", "--mcp"],
                "env": {
                    "OPENAI_API_KEY": "sk-proj-1234567890"
                }
            }
        }
    }
"""
import os
import sys

# Set environment variables BEFORE any browser_use imports to prevent early logging.
# NOTE: every `browser_use` import in this section must stay below these two lines
# (and below the sys.path tweak further down) for that guarantee to hold.
os.environ['BROWSER_USE_LOGGING_LEVEL'] = 'critical'
os.environ['BROWSER_USE_SETUP_LOGGING'] = 'false'

import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Any

# Configure logging for MCP mode - redirect to stderr but preserve critical diagnostics
# (stdout is reserved for the JSON-RPC stream).
logging.basicConfig(
    stream=sys.stderr, level=logging.WARNING, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', force=True
)

# psutil is optional; it is only used to report parent process command lines.
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

# Add browser-use to path if running from source
sys.path.insert(0, str(Path(__file__).parent.parent))

# Import and configure logging to use stderr before other imports
from browser_use.logging_config import setup_logging

# Imported only after the logging environment variables and sys.path are in place.
from browser_use.llm import ChatAWSBedrock
def _configure_mcp_server_logging():
    """Route every logger to stderr so log output cannot corrupt the JSON-RPC stream on stdout.

    Sets the browser-use logging environment variables, runs ``setup_logging``
    against stderr, then replaces the handlers of the root logger and of every
    logger registered so far with a single shared stderr handler. All of those
    loggers end up at CRITICAL level with propagation disabled.
    """
    # Keep browser-use from installing its own handlers while in server mode.
    os.environ.update(
        {
            'BROWSER_USE_LOGGING_LEVEL': 'warning',
            'BROWSER_USE_SETUP_LOGGING': 'false',  # Prevent automatic logging setup
        }
    )

    # Let browser-use configure itself against stderr first.
    setup_logging(stream=sys.stderr, log_level='warning', force_setup=True)

    # One handler instance is shared by the root logger and all known loggers.
    shared_handler = logging.StreamHandler(sys.stderr)
    shared_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    root_logger = logging.root
    root_logger.handlers = [shared_handler]
    root_logger.setLevel(logging.CRITICAL)

    # Snapshot the names first: getLogger() may add entries to loggerDict.
    for logger_name in tuple(logging.root.manager.loggerDict):
        existing = logging.getLogger(logger_name)
        existing.handlers = [shared_handler]
        existing.setLevel(logging.CRITICAL)
        existing.propagate = False
# Configure MCP server logging before the remaining browser_use imports below so
# that log lines emitted while they load go to stderr
# (browser_use.logging_config itself has already been imported at this point).
_configure_mcp_server_logging()
# Additional suppression - disable all logging completely for MCP mode.
# NOTE: logging.disable(logging.CRITICAL) drops every record at CRITICAL severity
# and below, so logger calls made after this line produce no output unless
# logging is re-enabled elsewhere.
logging.disable(logging.CRITICAL)
# Import browser_use modules
from browser_use import ActionModel, Agent
from browser_use.browser import BrowserProfile, BrowserSession
from browser_use.config import get_default_llm, get_default_profile, load_browser_use_config
from browser_use.filesystem.file_system import FileSystem
from browser_use.llm.openai.chat import ChatOpenAI
from browser_use.tools.service import Tools
# Module-level logger used by the server code below.
logger = logging.getLogger(__name__)
def _ensure_all_loggers_use_stderr():
    """Force the root logger and every registered logger onto a single stderr handler.

    Reuses the first root handler whose ``stream`` equals ``sys.stderr`` if one
    exists, otherwise creates a new formatted ``StreamHandler``. Every logger is
    then set to CRITICAL level, given only that handler, and has propagation
    switched off.
    """
    # Look for a root handler that already writes to stderr.
    shared_handler = next(
        (
            candidate
            for candidate in logging.root.handlers
            if hasattr(candidate, 'stream') and candidate.stream == sys.stderr  # type: ignore
        ),
        None,
    )
    if shared_handler is None:
        shared_handler = logging.StreamHandler(sys.stderr)
        shared_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    # Root logger first ...
    logging.root.handlers = [shared_handler]
    logging.root.setLevel(logging.CRITICAL)

    # ... then every logger known to the logging manager (names are snapshotted
    # because getLogger() may register new entries while we iterate).
    for logger_name in tuple(logging.root.manager.loggerDict):
        known_logger = logging.getLogger(logger_name)
        known_logger.handlers = [shared_handler]
        known_logger.setLevel(logging.CRITICAL)
        known_logger.propagate = False


# Ensure stderr logging after all imports
_ensure_all_loggers_use_stderr()
# Try to import MCP SDK; the server cannot run without it.
try:
    import mcp.server.stdio
    import mcp.types as types
    from mcp.server import NotificationOptions, Server
    from mcp.server.models import InitializationOptions
except ImportError:
    MCP_AVAILABLE = False
    # Logging has been globally disabled earlier in this module
    # (logging.disable(logging.CRITICAL)), so logger.error alone would exit
    # silently. Write the hint straight to stderr; stdout is reserved for JSON-RPC.
    print('MCP SDK not installed. Install with: pip install mcp', file=sys.stderr)
    logger.error('MCP SDK not installed. Install with: pip install mcp')
    sys.exit(1)
else:
    MCP_AVAILABLE = True
    # Configure MCP SDK logging to stderr as well, reusing the root handler when there is one.
    mcp_logger = logging.getLogger('mcp')
    mcp_logger.handlers = []
    mcp_logger.addHandler(logging.root.handlers[0] if logging.root.handlers else logging.StreamHandler(sys.stderr))
    mcp_logger.setLevel(logging.ERROR)
    mcp_logger.propagate = False
from browser_use.telemetry import MCPServerTelemetryEvent, ProductTelemetry
from browser_use.utils import get_browser_use_version
def get_parent_process_cmdline() -> str | None:
    """Return the command lines of all ancestor processes, joined with ``;``.

    Walks from the current process's parent upwards. Each ancestor's argv is
    joined with spaces; ancestors whose command line cannot be read or is empty
    are skipped.

    Returns:
        The joined string, or ``None`` when psutil is unavailable, no command
        line could be collected, or any unexpected error occurs.
    """
    if not PSUTIL_AVAILABLE:
        return None

    try:
        collected = []
        ancestor = psutil.Process().parent()
        while ancestor:
            # Processes we may not inspect (e.g. system processes) are skipped.
            try:
                argv = ancestor.cmdline()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                argv = None
            if argv:
                collected.append(' '.join(argv))

            try:
                ancestor = ancestor.parent()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # Can't go further up the chain
                break

        return ';'.join(collected) if collected else None
    except Exception:
        # Best effort only: this is diagnostic data.
        return None
class BrowserUseServer:
    """MCP Server for browser-use capabilities."""

    def __init__(self, session_timeout_minutes: int = 10):
        """Create the MCP server object and register its handlers.

        Args:
            session_timeout_minutes: Idle time after which a tracked browser
                session is considered expired by the cleanup task.
        """
        # Loggers created since import time must also be pointed at stderr.
        _ensure_all_loggers_use_stderr()

        # Core collaborators.
        self.server = Server('browser-use')
        self.config = load_browser_use_config()
        self._telemetry = ProductTelemetry()
        self._start_time = time.time()

        # Populated lazily once a browser session or agent is needed.
        self.agent: Agent | None = None
        self.browser_session: BrowserSession | None = None
        self.tools: Tools | None = None
        self.llm: ChatOpenAI | None = None
        self.file_system: FileSystem | None = None

        # Session management: session_id -> session info
        self.active_sessions: dict[str, dict[str, Any]] = {}
        self.session_timeout_minutes = session_timeout_minutes
        self._cleanup_task: Any = None

        # Register the MCP handlers last, once all state exists.
        self._setup_handlers()
def _setup_handlers(self):
    """Register the list-tools, list-resources, list-prompts and call-tool handlers on ``self.server``."""

    def make_tool(name, description, properties=None, required=None):
        # Build a types.Tool with an object schema; 'required' is only emitted when given.
        schema = {'type': 'object', 'properties': properties if properties is not None else {}}
        if required is not None:
            schema['required'] = required
        return types.Tool(name=name, description=description, inputSchema=schema)

    @self.server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List all available browser-use tools."""
        # Direct browser control tools
        control_tools = [
            make_tool(
                'browser_navigate',
                'Navigate to a URL in the browser',
                {
                    'url': {'type': 'string', 'description': 'The URL to navigate to'},
                    'new_tab': {'type': 'boolean', 'description': 'Whether to open in a new tab', 'default': False},
                },
                ['url'],
            ),
            make_tool(
                'browser_click',
                'Click an element on the page by its index',
                {
                    'index': {
                        'type': 'integer',
                        'description': 'The index of the link or element to click (from browser_get_state)',
                    },
                    'new_tab': {
                        'type': 'boolean',
                        'description': 'Whether to open any resulting navigation in a new tab',
                        'default': False,
                    },
                },
                ['index'],
            ),
            make_tool(
                'browser_type',
                'Type text into an input field',
                {
                    'index': {
                        'type': 'integer',
                        'description': 'The index of the input element (from browser_get_state)',
                    },
                    'text': {'type': 'string', 'description': 'The text to type'},
                },
                ['index', 'text'],
            ),
            make_tool(
                'browser_get_state',
                'Get the current state of the page including all interactive elements',
                {
                    'include_screenshot': {
                        'type': 'boolean',
                        'description': 'Whether to include a screenshot of the current page',
                        'default': False,
                    }
                },
            ),
            make_tool(
                'browser_extract_content',
                'Extract structured content from the current page based on a query',
                {
                    'query': {'type': 'string', 'description': 'What information to extract from the page'},
                    'extract_links': {
                        'type': 'boolean',
                        'description': 'Whether to include links in the extraction',
                        'default': False,
                    },
                },
                ['query'],
            ),
            make_tool(
                'browser_scroll',
                'Scroll the page',
                {
                    'direction': {
                        'type': 'string',
                        'enum': ['up', 'down'],
                        'description': 'Direction to scroll',
                        'default': 'down',
                    }
                },
            ),
            make_tool('browser_go_back', 'Go back to the previous page'),
        ]

        # Tab management
        tab_tools = [
            make_tool('browser_list_tabs', 'List all open tabs'),
            make_tool(
                'browser_switch_tab',
                'Switch to a different tab',
                {'tab_id': {'type': 'string', 'description': '4 Character Tab ID of the tab to switch to'}},
                ['tab_id'],
            ),
            make_tool(
                'browser_close_tab',
                'Close a tab',
                {'tab_id': {'type': 'string', 'description': '4 Character Tab ID of the tab to close'}},
                ['tab_id'],
            ),
        ]
        # NOTE: no 'browser_close' tool is advertised here (its definition was commented out).

        # Agent tools
        agent_tools = [
            make_tool(
                'retry_with_browser_use_agent',
                'Retry a task using the browser-use agent. Only use this as a last resort if you fail to interact with a page multiple times.',
                {
                    'task': {
                        'type': 'string',
                        'description': 'The high-level goal and detailed step-by-step description of the task the AI browser agent needs to attempt, along with any relevant data needed to complete the task and info about previous attempts.',
                    },
                    'max_steps': {
                        'type': 'integer',
                        'description': 'Maximum number of steps an agent can take.',
                        'default': 100,
                    },
                    'model': {
                        'type': 'string',
                        'description': 'LLM model to use (e.g., gpt-4o, claude-3-opus-20240229)',
                        'default': 'gpt-4o',
                    },
                    'allowed_domains': {
                        'type': 'array',
                        'items': {'type': 'string'},
                        'description': 'List of domains the agent is allowed to visit (security feature)',
                        'default': [],
                    },
                    'use_vision': {
                        'type': 'boolean',
                        'description': 'Whether to use vision capabilities (screenshots) for the agent',
                        'default': True,
                    },
                },
                ['task'],
            ),
        ]

        # Browser session management tools
        session_tools = [
            make_tool(
                'browser_list_sessions',
                'List all active browser sessions with their details and last activity time',
            ),
            make_tool(
                'browser_close_session',
                'Close a specific browser session by its ID',
                {
                    'session_id': {
                        'type': 'string',
                        'description': 'The browser session ID to close (get from browser_list_sessions)',
                    }
                },
                ['session_id'],
            ),
            make_tool('browser_close_all', 'Close all active browser sessions and clean up resources'),
        ]

        # Same order as the tools are documented above: control, tabs, agent, sessions.
        return [*control_tools, *tab_tools, *agent_tools, *session_tools]

    @self.server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        """List available resources (none for browser-use)."""
        return []

    @self.server.list_prompts()
    async def handle_list_prompts() -> list[types.Prompt]:
        """List available prompts (none for browser-use)."""
        return []

    @self.server.call_tool()
    async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
        """Run one tool call, turning any exception into an ``Error: ...`` text reply."""
        began = time.time()
        failure_text = None
        try:
            output = await self._execute_tool(name, arguments or {})
            return [types.TextContent(type='text', text=output)]
        except Exception as e:
            failure_text = str(e)
            logger.error(f'Tool execution failed: {e}', exc_info=True)
            return [types.TextContent(type='text', text=f'Error: {str(e)}')]
        finally:
            # Telemetry is recorded for successful and failed calls alike.
            elapsed = time.time() - began
            self._telemetry.capture(
                MCPServerTelemetryEvent(
                    version=get_browser_use_version(),
                    action='tool_call',
                    tool_name=name,
                    duration_seconds=elapsed,
                    error_message=failure_text,
                )
            )
async def _execute_tool(self, tool_name: str, arguments: dict[str, Any]) -> str:
"""Execute a browser-use tool."""
# Agent-based tools
if tool_name == 'retry_with_browser_use_agent':
return await self._retry_with_browser_use_agent(
task=arguments['task'],
max_steps=arguments.get('max_steps', 100),
model=arguments.get('model', 'gpt-4o'),
allowed_domains=arguments.get('allowed_domains', []),
use_vision=arguments.get('use_vision', True),
)
# Browser session management tools (don't require active session)
if tool_name == 'browser_list_sessions':
return await self._list_sessions()
elif tool_name == 'browser_close_session':
return await self._close_session(arguments['session_id'])
elif tool_name == 'browser_close_all':
return await self._close_all_sessions()
# Direct browser control tools (require active session)
elif tool_name.startswith('browser_'):
# Ensure browser session exists
if not self.browser_session:
await self._init_browser_session()
if tool_name == 'browser_navigate':
return await self._navigate(arguments['url'], arguments.get('new_tab', False))
elif tool_name == 'browser_click':
return await self._click(arguments['index'], arguments.get('new_tab', False))
elif tool_name == 'browser_type':
return await self._type_text(arguments['index'], arguments['text'])
elif tool_name == 'browser_get_state':
return await self._get_browser_state(arguments.get('include_screenshot', False))
elif tool_name == 'browser_extract_content':
return await self._extract_content(arguments['query'], arguments.get('extract_links', False))
elif tool_name == 'browser_scroll':
return await self._scroll(arguments.get('direction', 'down'))
elif tool_name == 'browser_go_back':
return await self._go_back()
elif tool_name == 'browser_close':
return await self._close_browser()
elif tool_name == 'browser_list_tabs':
return await self._list_tabs()
elif tool_name == 'browser_switch_tab':
return await self._switch_tab(arguments['tab_id'])
elif tool_name == 'browser_close_tab':
return await self._close_tab(arguments['tab_id'])
return f'Unknown tool: {tool_name}'
async def _init_browser_session(self, allowed_domains: list[str] | None = None, **kwargs):
    """Create and start the shared browser session from config, if none exists yet.

    Also creates the ``Tools`` instance, the extraction LLM (when an API key is
    available) and the ``FileSystem`` used by extraction.

    Args:
        allowed_domains: When not ``None``, overrides the profile's
            ``allowed_domains``.
        **kwargs: Extra profile fields, passed to ``BrowserProfile`` unchecked
            and taking precedence over defaults and config values.

    Raises:
        Exception: Whatever ``BrowserSession.start()`` raises; in that case
            ``self.browser_session`` is reset to ``None`` so a later call can retry.
    """
    if self.browser_session:
        return

    # Ensure all logging goes to stderr before browser initialization
    _ensure_all_loggers_use_stderr()
    logger.debug('Initializing browser session...')

    # Get profile config
    profile_config = get_default_profile(self.config)

    # Merge profile config with defaults and overrides
    profile_data = {
        'downloads_path': str(Path.home() / 'Downloads' / 'browser-use-mcp'),
        'wait_between_actions': 0.5,
        'keep_alive': True,
        'user_data_dir': '~/.config/browseruse/profiles/default',
        'device_scale_factor': 1.0,
        'disable_security': False,
        'headless': False,
        **profile_config,  # Config values override defaults
    }

    # Tool parameter overrides (highest priority)
    if allowed_domains is not None:
        profile_data['allowed_domains'] = allowed_domains

    # Additional kwargs are merged as-is; BrowserProfile decides whether they are valid.
    profile_data.update(kwargs)

    # Create browser profile
    profile = BrowserProfile(**profile_data)

    # Create browser session. If start() fails, drop the reference again:
    # otherwise the early-return guard above would treat the dead session
    # as usable on every later call.
    self.browser_session = BrowserSession(browser_profile=profile)
    try:
        await self.browser_session.start()
    except Exception:
        self.browser_session = None
        raise

    # Track the session for management
    self._track_session(self.browser_session)

    # Create tools for direct actions
    self.tools = Tools()

    # Initialize LLM from config, falling back to the OPENAI_API_KEY environment
    # variable the same way the agent tool does.
    llm_config = get_default_llm(self.config)
    if api_key := (llm_config.get('api_key') or os.getenv('OPENAI_API_KEY')):
        self.llm = ChatOpenAI(
            model=llm_config.get('model', 'gpt-4o-mini'),
            api_key=api_key,
            temperature=llm_config.get('temperature', 0.7),
            # max_tokens=llm_config.get('max_tokens'),
        )

    # Initialize FileSystem for extraction actions
    file_system_path = profile_config.get('file_system_path', '~/.browser-use-mcp')
    self.file_system = FileSystem(base_dir=Path(file_system_path).expanduser())

    logger.debug('Browser session initialized')
async def _retry_with_browser_use_agent(
    self,
    task: str,
    max_steps: int = 100,
    model: str = 'gpt-4o',
    allowed_domains: list[str] | None = None,
    use_vision: bool = True,
) -> str:
    """Run an autonomous agent task and return a textual summary.

    Args:
        task: Task description handed to the agent.
        max_steps: Maximum number of agent steps.
        model: Model name used for the OpenAI path (ignored for Bedrock,
            which takes its model from config or the ``MODEL`` env var).
        allowed_domains: Non-empty list to override the profile's allowed
            domains; ``None`` or an empty list keeps the configured value.
        use_vision: Whether the agent may use screenshots.

    Returns:
        A multi-line summary (steps, success flag, final result, errors,
        visited URLs), or an ``Error: ...`` / ``Agent task failed: ...`` message.
    """
    logger.debug(f'Running agent task: {task}')

    # Get LLM config
    llm_config = get_default_llm(self.config)

    # Get LLM provider
    model_provider = llm_config.get('model_provider') or os.getenv('MODEL_PROVIDER')

    # If a model provider is set and it is Bedrock, use the AWS Bedrock chat model
    if model_provider and model_provider.lower() == 'bedrock':
        llm_model = llm_config.get('model') or os.getenv('MODEL') or 'us.anthropic.claude-sonnet-4-20250514-v1:0'
        aws_region = llm_config.get('region') or os.getenv('REGION') or 'us-east-1'
        llm = ChatAWSBedrock(
            model=llm_model,  # or any Bedrock model
            aws_region=aws_region,
            aws_sso_auth=True,
        )
    else:
        api_key = llm_config.get('api_key') or os.getenv('OPENAI_API_KEY')
        if not api_key:
            return 'Error: OPENAI_API_KEY not set in config or environment'

        # The tool-call model always wins: the previous "override if different"
        # branch selected `model` in both of its arms.
        llm = ChatOpenAI(
            model=model,
            api_key=api_key,
            temperature=llm_config.get('temperature', 0.7),
        )

    # Work on a copy so the override below cannot leak into the configuration
    # (get_default_profile may hand back a dict it still owns).
    profile_config = dict(get_default_profile(self.config))

    # Override allowed_domains only when the caller supplied some. The tool
    # dispatcher passes [] by default, which must not wipe a configured allow-list.
    if allowed_domains:
        profile_config['allowed_domains'] = allowed_domains

    # Create browser profile using config
    profile = BrowserProfile(**profile_config)

    # Create and run agent
    agent = Agent(
        task=task,
        llm=llm,
        browser_profile=profile,
        use_vision=use_vision,
    )

    try:
        history = await agent.run(max_steps=max_steps)

        # Format results
        results = [
            f'Task completed in {len(history.history)} steps',
            f'Success: {history.is_successful()}',
        ]

        # Get final result if available
        final_result = history.final_result()
        if final_result:
            results.append(f'\nFinal result:\n{final_result}')

        # Include any errors
        errors = history.errors()
        if errors:
            results.append(f'\nErrors encountered:\n{json.dumps(errors, indent=2)}')

        # Include URLs visited, skipping None entries
        valid_urls = [str(url) for url in (history.urls() or []) if url is not None]
        if valid_urls:
            results.append(f'\nURLs visited: {", ".join(valid_urls)}')

        return '\n'.join(results)
    except Exception as e:
        logger.error(f'Agent task failed: {e}', exc_info=True)
        return f'Agent task failed: {str(e)}'
    finally:
        # Clean up the agent whether the run succeeded or not
        await agent.close()
async def _navigate(self, url: str, new_tab: bool = False) -> str:
"""Navigate to a URL."""
if not self.browser_session:
return 'Error: No browser session active'
# Update session activity
self._update_session_activity(self.browser_session.id)
from browser_use.browser.events import NavigateToUrlEvent
if new_tab:
event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=url, new_tab=True))
await event
return f'Opened new tab with URL: {url}'
else:
event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=url))
await event
return f'Navigated to: {url}'
async def _click(self, index: int, new_tab: bool = False) -> str:
"""Click an element by index."""
if not self.browser_session:
return 'Error: No browser session active'
# Update session activity
self._update_session_activity(self.browser_session.id)
# Get the element
element = await self.browser_session.get_dom_element_by_index(index)
if not element:
return f'Element with index {index} not found'
if new_tab:
# For links, extract href and open in new tab
href = element.attributes.get('href')
if href:
# Convert relative href to absolute URL
state = await self.browser_session.get_browser_state_summary()
current_url = state.url
if href.startswith('/'):
# Relative URL - construct full URL
from urllib.parse import urlparse
parsed = urlparse(current_url)
full_url = f'{parsed.scheme}://{parsed.netloc}{href}'
else:
full_url = href
# Open link in new tab
from browser_use.browser.events import NavigateToUrlEvent
event = self.browser_session.event_bus.dispatch(NavigateToUrlEvent(url=full_url, new_tab=True))
await event
return f'Clicked element {index} and opened in new tab {full_url[:20]}...'
else:
# For non-link elements, just do a normal click
# Opening in new tab without href is not reliably supported
from browser_use.browser.events import ClickElementEvent
event = self.browser_session.event_bus.dispatch(ClickElementEvent(node=element))
await event
return f'Clicked element {index} (new tab not supported for non-link elements)'
else:
# Normal click
from browser_use.browser.events import ClickElementEvent
event = self.browser_session.event_bus.dispatch(ClickElementEvent(node=element))
await event
return f'Clicked element {index}'
async def _type_text(self, index: int, text: str) -> str:
"""Type text into an element."""
if not self.browser_session:
return 'Error: No browser session active'
element = await self.browser_session.get_dom_element_by_index(index)
if not element:
return f'Element with index {index} not found'
from browser_use.browser.events import TypeTextEvent
# Conservative heuristic to detect potentially sensitive data
# Only flag very obvious patterns to minimize false positives
is_potentially_sensitive = len(text) >= 6 and (
# Email pattern: contains @ and a domain-like suffix
('@' in text and '.' in text.split('@')[-1] if '@' in text else False)
# Mixed alphanumeric with reasonable complexity (likely API keys/tokens)
or (
len(text) >= 16
and any(char.isdigit() for char in text)
and any(char.isalpha() for char in text)
and any(char in '.-_' for char in text)
)
)
# Use generic key names to avoid information leakage about detection patterns
sensitive_key_name = None
if is_potentially_sensitive:
if '@' in text and '.' in text.split('@')[-1]:
sensitive_key_name = 'email'
else:
sensitive_key_name = 'credential'
event = self.browser_session.event_bus.dispatch(
TypeTextEvent(node=element, text=text, is_sensitive=is_potentially_sensitive, sensitive_key_name=sensitive_key_name)
)
await event
if is_potentially_sensitive:
if sensitive_key_name:
return f'Typed <{sensitive_key_name}> into element {index}'
else:
return f'Typed <sensitive> into element {index}'
else:
return f"Typed '{text}' into element {index}"
async def _get_browser_state(self, include_screenshot: bool = False) -> str:
"""Get current browser state."""
if not self.browser_session:
return 'Error: No browser session active'
state = await self.browser_session.get_browser_state_summary()
result = {
'url': state.url,
'title': state.title,
'tabs': [{'url': tab.url, 'title': tab.title} for tab in state.tabs],
'interactive_elements': [],
}
# Add interactive elements with their indices
for index, element in state.dom_state.selector_map.items():
elem_info = {
'index': index,
'tag': element.tag_name,
'text': element.get_all_children_text(max_depth=2)[:100],
}
if element.attributes.get('placeholder'):
elem_info['placeholder'] = element.attributes['placeholder']
if element.attributes.get('href'):
elem_info['href'] = element.attributes['href']
result['interactive_elements'].append(elem_info)
if include_screenshot and state.screenshot:
result['screenshot'] = state.screenshot
return json.dumps(result, indent=2)
async def _extract_content(self, query: str, extract_links: bool = False) -> str:
"""Extract content from current page."""
if not self.llm:
return 'Error: LLM not initialized (set OPENAI_API_KEY)'
if not self.file_system:
return 'Error: FileSystem not initialized'
if not self.browser_session:
return 'Error: No browser session active'
if not self.tools:
return 'Error: Tools not initialized'
state = await self.browser_session.get_browser_state_summary()
# Use the extract action
# Create a dynamic action model that matches the tools's expectations
from pydantic import create_model
# Create action model dynamically
ExtractAction = create_model(
'ExtractAction',
__base__=ActionModel,
extract=dict[str, Any],
)
# Use model_validate because Pyright does not understand the dynamic model
action = ExtractAction.model_validate(
{
'extract': {'query': query, 'extract_links': extract_links},
}
)
action_result = await self.tools.act(
action=action,
browser_session=self.browser_session,
page_extraction_llm=self.llm,
file_system=self.file_system,
)
return action_result.extracted_content or 'No content extracted'
async def _scroll(self, direction: str = 'down') -> str:
"""Scroll the page."""
if not self.browser_session:
return 'Error: No browser session active'
from browser_use.browser.events import ScrollEvent
# Scroll by a standard amount (500 pixels)
event = self.browser_session.event_bus.dispatch(
ScrollEvent(
direction=direction, # type: ignore
amount=500,
)
)
await event
return f'Scrolled {direction}'
async def _go_back(self) -> str:
"""Go back in browser history."""
if not self.browser_session:
return 'Error: No browser session active'
from browser_use.browser.events import GoBackEvent
event = self.browser_session.event_bus.dispatch(GoBackEvent())
await event
return 'Navigated back'
async def _close_browser(self) -> str:
"""Close the browser session."""
if self.browser_session:
from browser_use.browser.events import BrowserStopEvent
event = self.browser_session.event_bus.dispatch(BrowserStopEvent())
await event
self.browser_session = None
self.tools = None
return 'Browser closed'
return 'No browser session to close'
async def _list_tabs(self) -> str:
"""List all open tabs."""
if not self.browser_session:
return 'Error: No browser session active'
tabs_info = await self.browser_session.get_tabs()
tabs = []
for i, tab in enumerate(tabs_info):
tabs.append({'tab_id': tab.target_id[-4:], 'url': tab.url, 'title': tab.title or ''})
return json.dumps(tabs, indent=2)
async def _switch_tab(self, tab_id: str) -> str:
"""Switch to a different tab."""
if not self.browser_session:
return 'Error: No browser session active'
from browser_use.browser.events import SwitchTabEvent
target_id = await self.browser_session.get_target_id_from_tab_id(tab_id)
event = self.browser_session.event_bus.dispatch(SwitchTabEvent(target_id=target_id))
await event
state = await self.browser_session.get_browser_state_summary()
return f'Switched to tab {tab_id}: {state.url}'
async def _close_tab(self, tab_id: str) -> str:
"""Close a specific tab."""
if not self.browser_session:
return 'Error: No browser session active'
from browser_use.browser.events import CloseTabEvent
target_id = await self.browser_session.get_target_id_from_tab_id(tab_id)
event = self.browser_session.event_bus.dispatch(CloseTabEvent(target_id=target_id))
await event
current_url = await self.browser_session.get_current_page_url()
return f'Closed tab # {tab_id}, now on {current_url}'
def _track_session(self, session: BrowserSession) -> None:
"""Track a browser session for management."""
self.active_sessions[session.id] = {
'session': session,
'created_at': time.time(),
'last_activity': time.time(),
'url': getattr(session, 'current_url', None),
}
def _update_session_activity(self, session_id: str) -> None:
"""Update the last activity time for a session."""
if session_id in self.active_sessions:
self.active_sessions[session_id]['last_activity'] = time.time()
async def _list_sessions(self) -> str:
"""List all active browser sessions."""
if not self.active_sessions:
return 'No active browser sessions'
sessions_info = []
for session_id, session_data in self.active_sessions.items():
session = session_data['session']
created_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(session_data['created_at']))
last_activity = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(session_data['last_activity']))
# Check if session is still active
is_active = hasattr(session, 'cdp_client') and session.cdp_client is not None
sessions_info.append(
{
'session_id': session_id,
'created_at': created_at,
'last_activity': last_activity,
'active': is_active,
'current_url': session_data.get('url', 'Unknown'),
'age_minutes': (time.time() - session_data['created_at']) / 60,
}
)
return json.dumps(sessions_info, indent=2)
async def _close_session(self, session_id: str) -> str:
"""Close a specific browser session."""
if session_id not in self.active_sessions:
return f'Session {session_id} not found'
session_data = self.active_sessions[session_id]
session = session_data['session']
try:
# Close the session
if hasattr(session, 'kill'):
await session.kill()
elif hasattr(session, 'close'):
await session.close()
# Remove from tracking
del self.active_sessions[session_id]
# If this was the current session, clear it
if self.browser_session and self.browser_session.id == session_id:
self.browser_session = None
self.tools = None
return f'Successfully closed session {session_id}'
except Exception as e:
return f'Error closing session {session_id}: {str(e)}'
async def _close_all_sessions(self) -> str:
"""Close all active browser sessions."""
if not self.active_sessions:
return 'No active sessions to close'
closed_count = 0
errors = []
for session_id in list(self.active_sessions.keys()):
try:
result = await self._close_session(session_id)
if 'Successfully closed' in result:
closed_count += 1
else:
errors.append(f'{session_id}: {result}')
except Exception as e:
errors.append(f'{session_id}: {str(e)}')
# Clear current session references
self.browser_session = None
self.tools = None
result = f'Closed {closed_count} sessions'
if errors:
result += f'. Errors: {"; ".join(errors)}'
return result
async def _cleanup_expired_sessions(self) -> None:
"""Background task to clean up expired sessions."""
current_time = time.time()
timeout_seconds = self.session_timeout_minutes * 60
expired_sessions = []
for session_id, session_data in self.active_sessions.items():
last_activity = session_data['last_activity']
if current_time - last_activity > timeout_seconds:
expired_sessions.append(session_id)
for session_id in expired_sessions:
try:
await self._close_session(session_id)
logger.info(f'Auto-closed expired session {session_id}')
except Exception as e:
logger.error(f'Error auto-closing session {session_id}: {e}')
async def _start_cleanup_task(self) -> None:
"""Start the background cleanup task."""
async def cleanup_loop():
while True:
try:
await self._cleanup_expired_sessions()
# Check every 2 minutes
await asyncio.sleep(120)
except Exception as e:
logger.error(f'Error in cleanup task: {e}')
await asyncio.sleep(120)
self._cleanup_task = asyncio.create_task(cleanup_loop())
async def run(self):
    """Run the MCP server over stdio until the transport closes.

    Starts the background session-cleanup task first and cancels it again
    when the server loop ends, whether normally or with an exception.
    """
    # Start the cleanup task
    await self._start_cleanup_task()

    try:
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name='browser-use',
                    server_version='0.1.0',
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
    finally:
        # The cleanup loop never finishes on its own; cancel it so it does not
        # outlive the server.
        cleanup_task = self._cleanup_task
        if cleanup_task is not None:
            cleanup_task.cancel()
            self._cleanup_task = None
async def main(session_timeout_minutes: int = 10):
    """Start the browser-use MCP server and record start/stop telemetry.

    Args:
        session_timeout_minutes: Idle timeout handed to ``BrowserUseServer``.
    """
    if not MCP_AVAILABLE:
        print('MCP SDK is required. Install with: pip install mcp', file=sys.stderr)
        sys.exit(1)

    server = BrowserUseServer(session_timeout_minutes=session_timeout_minutes)
    telemetry = server._telemetry

    start_event = MCPServerTelemetryEvent(
        version=get_browser_use_version(),
        action='start',
        parent_process_cmdline=get_parent_process_cmdline(),
    )
    telemetry.capture(start_event)

    try:
        await server.run()
    finally:
        # Always report how long the server was up, then flush telemetry.
        uptime = time.time() - server._start_time
        stop_event = MCPServerTelemetryEvent(
            version=get_browser_use_version(),
            action='stop',
            duration_seconds=uptime,
            parent_process_cmdline=get_parent_process_cmdline(),
        )
        telemetry.capture(stop_event)
        telemetry.flush()


if __name__ == '__main__':
    asyncio.run(main())