Spaces:
Runtime error
Runtime error
| import asyncio | |
| import base64 | |
| import json | |
| from typing import Generic, Optional, TypeVar | |
| from browser_use import Browser as BrowserUseBrowser | |
| from browser_use import BrowserConfig | |
| from browser_use.browser.context import BrowserContext, BrowserContextConfig | |
| from browser_use.dom.service import DomService | |
| from pydantic import Field, field_validator | |
| from pydantic_core.core_schema import ValidationInfo | |
| from app.config import config | |
| from app.llm import LLM | |
| from app.tool.base import BaseTool, ToolResult | |
| from app.tool.web_search import WebSearch | |
# Human/LLM-facing description of the browser tool; assigned to
# ``BrowserUseTool.description`` below.
_BROWSER_DESCRIPTION = """\
A powerful browser automation tool that allows interaction with web pages through various actions.
* This tool provides commands for controlling a browser session, navigating web pages, and extracting information
* It maintains state across calls, keeping the browser session alive until explicitly closed
* Use this when you need to browse websites, fill forms, click buttons, extract content, or perform web searches
* Each action requires specific parameters as defined in the tool's dependencies
Key capabilities include:
* Navigation: Go to specific URLs, go back, search the web, or refresh pages
* Interaction: Click elements, input text, select from dropdowns, send keyboard commands
* Scrolling: Scroll up/down by pixel amount or scroll to specific text
* Content extraction: Extract and analyze content from web pages based on specific goals
* Tab management: Switch between tabs, open new tabs, or close tabs
Note: When using element indices, refer to the numbered elements shown in the current browser state.
"""

# Type variable for the optional, caller-defined context object that a
# ``BrowserUseTool`` instance can carry (see ``tool_context``).
Context = TypeVar("Context")
class BrowserUseTool(BaseTool, Generic[Context]):
    """Tool that dispatches named browser actions onto a ``browser_use`` session.

    The browser and its context are created lazily on the first call to
    ``execute`` and are kept on the instance until ``cleanup`` is called.
    ``execute`` and ``cleanup`` both run under ``self.lock``, so actions on a
    single tool instance are serialised.
    """

    name: str = "browser_use"
    description: str = _BROWSER_DESCRIPTION
    # JSON-schema style description of the arguments accepted by ``execute``.
    # ``dependencies`` maps each action to the arguments it needs.
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "go_to_url",
                    "click_element",
                    "input_text",
                    "scroll_down",
                    "scroll_up",
                    "scroll_to_text",
                    "send_keys",
                    "get_dropdown_options",
                    "select_dropdown_option",
                    "go_back",
                    # ``execute`` has a "refresh" branch; it must be listed
                    # here or callers bound to the schema can never reach it.
                    "refresh",
                    "web_search",
                    "wait",
                    "extract_content",
                    "switch_tab",
                    "open_tab",
                    "close_tab",
                ],
                "description": "The browser action to perform",
            },
            "url": {
                "type": "string",
                "description": "URL for 'go_to_url' or 'open_tab' actions",
            },
            "index": {
                "type": "integer",
                "description": "Element index for 'click_element', 'input_text', 'get_dropdown_options', or 'select_dropdown_option' actions",
            },
            "text": {
                "type": "string",
                "description": "Text for 'input_text', 'scroll_to_text', or 'select_dropdown_option' actions",
            },
            "scroll_amount": {
                "type": "integer",
                "description": "Pixels to scroll (positive for down, negative for up) for 'scroll_down' or 'scroll_up' actions",
            },
            "tab_id": {
                "type": "integer",
                "description": "Tab ID for 'switch_tab' action",
            },
            "query": {
                "type": "string",
                "description": "Search query for 'web_search' action",
            },
            "goal": {
                "type": "string",
                "description": "Extraction goal for 'extract_content' action",
            },
            "keys": {
                "type": "string",
                "description": "Keys to send for 'send_keys' action",
            },
            "seconds": {
                "type": "integer",
                "description": "Seconds to wait for 'wait' action",
            },
        },
        "required": ["action"],
        "dependencies": {
            "go_to_url": ["url"],
            "click_element": ["index"],
            "input_text": ["index", "text"],
            "switch_tab": ["tab_id"],
            "open_tab": ["url"],
            "close_tab": [],
            "scroll_down": ["scroll_amount"],
            "scroll_up": ["scroll_amount"],
            "scroll_to_text": ["text"],
            "send_keys": ["keys"],
            "get_dropdown_options": ["index"],
            "select_dropdown_option": ["index", "text"],
            "go_back": [],
            "refresh": [],
            "web_search": ["query"],
            "wait": ["seconds"],
            "extract_content": ["goal"],
        },
    }

    # Serialises ``execute`` and ``cleanup`` on this instance.
    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
    # Lazily created by ``_ensure_browser_initialized``; reset by ``cleanup``.
    browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
    context: Optional[BrowserContext] = Field(default=None, exclude=True)
    dom_service: Optional[DomService] = Field(default=None, exclude=True)
    # Backend used by the 'web_search' action.
    web_search_tool: WebSearch = Field(default_factory=WebSearch, exclude=True)

    # Context for generic functionality
    tool_context: Optional[Context] = Field(default=None, exclude=True)

    # LLM used by the 'extract_content' action.
    llm: Optional[LLM] = Field(default_factory=LLM)

    @field_validator("parameters", mode="before")
    @classmethod
    def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict:
        """Reject an empty ``parameters`` value.

        Args:
            v: The raw value supplied for the ``parameters`` field.
            info: Pydantic validation info (unused).

        Returns:
            ``v`` unchanged.

        Raises:
            ValueError: If ``v`` is falsy (``None`` or an empty dict).
        """
        if not v:
            raise ValueError("Parameters cannot be empty")
        return v

    async def _ensure_browser_initialized(self) -> BrowserContext:
        """Ensure browser and context are initialized.

        Creates the browser on first use from ``config.browser_config`` (when
        set) layered over the defaults ``headless=False`` and
        ``disable_security=True``, then creates a context and a ``DomService``
        for the context's current page.

        Returns:
            The active ``BrowserContext``.
        """
        if self.browser is None:
            browser_config_kwargs = {"headless": False, "disable_security": True}

            if config.browser_config:
                from browser_use.browser.browser import ProxySettings

                # handle proxy settings.
                if config.browser_config.proxy and config.browser_config.proxy.server:
                    browser_config_kwargs["proxy"] = ProxySettings(
                        server=config.browser_config.proxy.server,
                        username=config.browser_config.proxy.username,
                        password=config.browser_config.proxy.password,
                    )

                browser_attrs = [
                    "headless",
                    "disable_security",
                    "extra_chromium_args",
                    "chrome_instance_path",
                    "wss_url",
                    "cdp_url",
                ]

                for attr in browser_attrs:
                    value = getattr(config.browser_config, attr, None)
                    if value is not None:
                        # Skip empty lists so they do not override the
                        # library defaults; every other non-None value wins.
                        if not isinstance(value, list) or value:
                            browser_config_kwargs[attr] = value

            self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs))

        if self.context is None:
            context_config = BrowserContextConfig()

            # if there is context config in the config, use it.
            if (
                config.browser_config
                and hasattr(config.browser_config, "new_context_config")
                and config.browser_config.new_context_config
            ):
                context_config = config.browser_config.new_context_config

            self.context = await self.browser.new_context(context_config)
            self.dom_service = DomService(await self.context.get_current_page())

        return self.context

    async def execute(
        self,
        action: str,
        url: Optional[str] = None,
        index: Optional[int] = None,
        text: Optional[str] = None,
        scroll_amount: Optional[int] = None,
        tab_id: Optional[int] = None,
        query: Optional[str] = None,
        goal: Optional[str] = None,
        keys: Optional[str] = None,
        seconds: Optional[int] = None,
        **kwargs,
    ) -> ToolResult:
        """
        Execute a specified browser action.

        Args:
            action: The browser action to perform
            url: URL for 'go_to_url' or 'open_tab'
            index: Element index for click, input and dropdown actions
            text: Text for 'input_text', 'scroll_to_text' or
                'select_dropdown_option'
            scroll_amount: Pixels to scroll; defaults to the configured
                window height when omitted
            tab_id: Tab ID for switch_tab action
            query: Search query for 'web_search'
            goal: Extraction goal for content extraction
            keys: Keys to send for keyboard actions
            seconds: Seconds to wait (defaults to 3)
            **kwargs: Additional arguments (ignored)

        Returns:
            ToolResult with the action's output or error. Exceptions raised
            while performing the action are caught and reported through the
            result's ``error`` field rather than propagated.
        """
        async with self.lock:
            try:
                context = await self._ensure_browser_initialized()

                # Get max content length from config
                max_content_length = getattr(
                    config.browser_config, "max_content_length", 2000
                )

                # Navigation actions
                if action == "go_to_url":
                    if not url:
                        return ToolResult(
                            error="URL is required for 'go_to_url' action"
                        )
                    page = await context.get_current_page()
                    await page.goto(url)
                    await page.wait_for_load_state()
                    return ToolResult(output=f"Navigated to {url}")

                elif action == "go_back":
                    await context.go_back()
                    return ToolResult(output="Navigated back")

                elif action == "refresh":
                    await context.refresh_page()
                    return ToolResult(output="Refreshed current page")

                elif action == "web_search":
                    if not query:
                        return ToolResult(
                            error="Query is required for 'web_search' action"
                        )
                    # Run the search, then point the browser at the top hit so
                    # follow-up actions operate on that page.
                    search_response = await self.web_search_tool.execute(
                        query=query, fetch_content=True, num_results=1
                    )
                    results = getattr(search_response, "results", None)
                    if not results:
                        # Nothing to navigate to; hand back whatever the search
                        # tool reported instead of failing on results[0].
                        return search_response
                    # Navigate to the first search result
                    first_search_result = results[0]
                    url_to_navigate = first_search_result.url

                    page = await context.get_current_page()
                    await page.goto(url_to_navigate)
                    await page.wait_for_load_state()

                    return search_response

                # Element interaction actions
                elif action == "click_element":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'click_element' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    download_path = await context._click_element_node(element)
                    output = f"Clicked element at index {index}"
                    if download_path:
                        output += f" - Downloaded file to {download_path}"
                    return ToolResult(output=output)

                elif action == "input_text":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'input_text' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    await context._input_text_element_node(element, text)
                    return ToolResult(
                        output=f"Input '{text}' into element at index {index}"
                    )

                elif action == "scroll_down" or action == "scroll_up":
                    direction = 1 if action == "scroll_down" else -1
                    amount = (
                        scroll_amount
                        if scroll_amount is not None
                        else context.config.browser_window_size["height"]
                    )
                    await context.execute_javascript(
                        f"window.scrollBy(0, {direction * amount});"
                    )
                    return ToolResult(
                        output=f"Scrolled {'down' if direction > 0 else 'up'} by {amount} pixels"
                    )

                elif action == "scroll_to_text":
                    if not text:
                        return ToolResult(
                            error="Text is required for 'scroll_to_text' action"
                        )
                    page = await context.get_current_page()
                    try:
                        locator = page.get_by_text(text, exact=False)
                        await locator.scroll_into_view_if_needed()
                        return ToolResult(output=f"Scrolled to text: '{text}'")
                    except Exception as e:
                        return ToolResult(error=f"Failed to scroll to text: {str(e)}")

                elif action == "send_keys":
                    if not keys:
                        return ToolResult(
                            error="Keys are required for 'send_keys' action"
                        )
                    page = await context.get_current_page()
                    await page.keyboard.press(keys)
                    return ToolResult(output=f"Sent keys: {keys}")

                elif action == "get_dropdown_options":
                    if index is None:
                        return ToolResult(
                            error="Index is required for 'get_dropdown_options' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    # Resolve the element by XPath inside the page and read
                    # its <option> entries; yields null if nothing matches.
                    options = await page.evaluate(
                        """
                        (xpath) => {
                            const select = document.evaluate(xpath, document, null,
                                XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                            if (!select) return null;
                            return Array.from(select.options).map(opt => ({
                                text: opt.text,
                                value: opt.value,
                                index: opt.index
                            }));
                        }
                        """,
                        element.xpath,
                    )
                    return ToolResult(output=f"Dropdown options: {options}")

                elif action == "select_dropdown_option":
                    if index is None or not text:
                        return ToolResult(
                            error="Index and text are required for 'select_dropdown_option' action"
                        )
                    element = await context.get_dom_element_by_index(index)
                    if not element:
                        return ToolResult(error=f"Element with index {index} not found")
                    page = await context.get_current_page()
                    # Playwright only auto-detects XPath for selectors that
                    # start with "//" or ".."; the explicit engine prefix keeps
                    # other XPath forms from being parsed as CSS.
                    await page.select_option(f"xpath={element.xpath}", label=text)
                    return ToolResult(
                        output=f"Selected option '{text}' from dropdown at index {index}"
                    )

                # Content extraction actions
                elif action == "extract_content":
                    if not goal:
                        return ToolResult(
                            error="Goal is required for 'extract_content' action"
                        )

                    page = await context.get_current_page()
                    import markdownify

                    content = markdownify.markdownify(await page.content())

                    # Only the first ``max_content_length`` characters of the
                    # page are sent to the LLM.
                    prompt = f"""\
Your task is to extract the content of the page. You will be given a page and a goal, and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format.
Extraction goal: {goal}
Page content:
{content[:max_content_length]}
"""
                    messages = [{"role": "system", "content": prompt}]

                    # Define extraction function schema
                    extraction_function = {
                        "type": "function",
                        "function": {
                            "name": "extract_content",
                            "description": "Extract specific information from a webpage based on a goal",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "extracted_content": {
                                        "type": "object",
                                        "description": "The content extracted from the page according to the goal",
                                        "properties": {
                                            "text": {
                                                "type": "string",
                                                "description": "Text content extracted from the page",
                                            },
                                            "metadata": {
                                                "type": "object",
                                                "description": "Additional metadata about the extracted content",
                                                "properties": {
                                                    "source": {
                                                        "type": "string",
                                                        "description": "Source of the extracted content",
                                                    }
                                                },
                                            },
                                        },
                                    }
                                },
                                "required": ["extracted_content"],
                            },
                        },
                    }

                    # Use LLM to extract content with required function calling
                    response = await self.llm.ask_tool(
                        messages,
                        tools=[extraction_function],
                        tool_choice="required",
                    )

                    if response and response.tool_calls:
                        args = json.loads(response.tool_calls[0].function.arguments)
                        extracted_content = args.get("extracted_content", {})
                        return ToolResult(
                            output=f"Extracted from page:\n{extracted_content}\n"
                        )

                    return ToolResult(output="No content was extracted from the page.")

                # Tab management actions
                elif action == "switch_tab":
                    if tab_id is None:
                        return ToolResult(
                            error="Tab ID is required for 'switch_tab' action"
                        )
                    await context.switch_to_tab(tab_id)
                    page = await context.get_current_page()
                    await page.wait_for_load_state()
                    return ToolResult(output=f"Switched to tab {tab_id}")

                elif action == "open_tab":
                    if not url:
                        return ToolResult(error="URL is required for 'open_tab' action")
                    await context.create_new_tab(url)
                    return ToolResult(output=f"Opened new tab with {url}")

                elif action == "close_tab":
                    await context.close_current_tab()
                    return ToolResult(output="Closed current tab")

                # Utility actions
                elif action == "wait":
                    seconds_to_wait = seconds if seconds is not None else 3
                    await asyncio.sleep(seconds_to_wait)
                    return ToolResult(output=f"Waited for {seconds_to_wait} seconds")

                else:
                    return ToolResult(error=f"Unknown action: {action}")

            except Exception as e:
                # Tool boundary: report any failure as a ToolResult error
                # instead of raising into the caller.
                return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")

    async def get_current_state(
        self, context: Optional[BrowserContext] = None
    ) -> ToolResult:
        """
        Get the current browser state as a ToolResult.
        If context is not provided, uses self.context.

        Returns:
            ToolResult whose ``output`` is a JSON document (URL, title, tabs,
            interactive elements, scroll info, viewport height) and whose
            ``base64_image`` is a base64-encoded full-page JPEG screenshot.
            On failure, or when no context exists, a ToolResult with ``error``.
        """
        try:
            # Use provided context or fall back to self.context
            ctx = context or self.context
            if not ctx:
                return ToolResult(error="Browser context not initialized")

            state = await ctx.get_state()

            # Prefer the viewport height reported by the state; otherwise
            # fall back to the configured window height, else 0.
            viewport_height = 0
            if hasattr(state, "viewport_info") and state.viewport_info:
                viewport_height = state.viewport_info.height
            elif hasattr(ctx, "config") and hasattr(ctx.config, "browser_window_size"):
                viewport_height = ctx.config.browser_window_size.get("height", 0)

            # Take a screenshot for the state
            page = await ctx.get_current_page()

            await page.bring_to_front()
            await page.wait_for_load_state()

            screenshot = await page.screenshot(
                full_page=True, animations="disabled", type="jpeg", quality=100
            )

            screenshot = base64.b64encode(screenshot).decode("utf-8")

            # Build the state info with all required fields
            state_info = {
                "url": state.url,
                "title": state.title,
                "tabs": [tab.model_dump() for tab in state.tabs],
                "help": "[0], [1], [2], etc., represent clickable indices corresponding to the elements listed. Clicking on these indices will navigate to or interact with the respective content behind them.",
                "interactive_elements": (
                    state.element_tree.clickable_elements_to_string()
                    if state.element_tree
                    else ""
                ),
                "scroll_info": {
                    "pixels_above": getattr(state, "pixels_above", 0),
                    "pixels_below": getattr(state, "pixels_below", 0),
                    "total_height": getattr(state, "pixels_above", 0)
                    + getattr(state, "pixels_below", 0)
                    + viewport_height,
                },
                "viewport_height": viewport_height,
            }

            return ToolResult(
                output=json.dumps(state_info, indent=4, ensure_ascii=False),
                base64_image=screenshot,
            )
        except Exception as e:
            return ToolResult(error=f"Failed to get browser state: {str(e)}")

    async def cleanup(self):
        """Clean up browser resources.

        Closes the context and then the browser under ``self.lock`` and
        clears the stored references. The browser is still closed when
        closing the context raises; the exception then propagates.
        """
        async with self.lock:
            try:
                if self.context is not None:
                    await self.context.close()
            finally:
                self.context = None
                self.dom_service = None
                if self.browser is not None:
                    try:
                        await self.browser.close()
                    finally:
                        self.browser = None

    def __del__(self):
        """Ensure cleanup when object is destroyed."""
        if self.browser is not None or self.context is not None:
            try:
                asyncio.run(self.cleanup())
            except RuntimeError:
                # Fall back to a dedicated loop, and always close it so a
                # failing cleanup does not leak the loop.
                loop = asyncio.new_event_loop()
                try:
                    loop.run_until_complete(self.cleanup())
                finally:
                    loop.close()

    @classmethod
    def create_with_context(cls, context: Context) -> "BrowserUseTool[Context]":
        """Factory method to create a BrowserUseTool with a specific context.

        Args:
            context: Value stored on the new tool as ``tool_context``.

        Returns:
            A newly constructed tool with ``tool_context`` set.
        """
        tool = cls()
        tool.tool_context = context
        return tool