import asyncio import enum import json import logging import os from typing import Generic, TypeVar try: from lmnr import Laminar # type: ignore except ImportError: Laminar = None # type: ignore from pydantic import BaseModel from browser_use.agent.views import ActionModel, ActionResult from browser_use.browser import BrowserSession from browser_use.browser.events import ( ClickElementEvent, CloseTabEvent, GetDropdownOptionsEvent, GoBackEvent, NavigateToUrlEvent, ScrollEvent, ScrollToTextEvent, SendKeysEvent, SwitchTabEvent, TypeTextEvent, UploadFileEvent, ) from browser_use.browser.views import BrowserError from browser_use.dom.service import EnhancedDOMTreeNode from browser_use.filesystem.file_system import FileSystem from browser_use.llm.base import BaseChatModel from browser_use.llm.messages import SystemMessage, UserMessage from browser_use.observability import observe_debug from browser_use.tools.registry.service import Registry from browser_use.tools.utils import get_click_description from browser_use.tools.views import ( ClickElementAction, CloseTabAction, DoneAction, ExtractAction, GetDropdownOptionsAction, InputTextAction, NavigateAction, NoParamsAction, ScrollAction, SearchAction, SelectDropdownOptionAction, SendKeysAction, StructuredOutputAction, SwitchTabAction, UploadFileAction, ) from browser_use.utils import time_execution_sync logger = logging.getLogger(__name__) # Import EnhancedDOMTreeNode and rebuild event models that have forward references to it # This must be done after all imports are complete ClickElementEvent.model_rebuild() TypeTextEvent.model_rebuild() ScrollEvent.model_rebuild() UploadFileEvent.model_rebuild() Context = TypeVar('Context') T = TypeVar('T', bound=BaseModel) def _detect_sensitive_key_name(text: str, sensitive_data: dict[str, str | dict[str, str]] | None) -> str | None: """Detect which sensitive key name corresponds to the given text value.""" if not sensitive_data or not text: return None # Collect all sensitive values and their keys for domain_or_key, content in sensitive_data.items(): if isinstance(content, dict): # New format: {domain: {key: value}} for key, value in content.items(): if value and value == text: return key elif content: # Old format: {key: value} if content == text: return domain_or_key return None def handle_browser_error(e: BrowserError) -> ActionResult: if e.long_term_memory is not None: if e.short_term_memory is not None: return ActionResult( extracted_content=e.short_term_memory, error=e.long_term_memory, include_extracted_content_only_once=True ) else: return ActionResult(error=e.long_term_memory) # Fallback to original error handling if long_term_memory is None logger.warning( '⚠️ A BrowserError was raised without long_term_memory - always set long_term_memory when raising BrowserError to propagate right messages to LLM.' ) raise e class Tools(Generic[Context]): def __init__( self, exclude_actions: list[str] = [], output_model: type[T] | None = None, display_files_in_done_text: bool = True, ): self.registry = Registry[Context](exclude_actions) self.display_files_in_done_text = display_files_in_done_text """Register all default browser actions""" self._register_done_action(output_model) # Basic Navigation Actions @self.registry.action( '', param_model=SearchAction, ) async def search(params: SearchAction, browser_session: BrowserSession): import urllib.parse # Encode query for URL safety encoded_query = urllib.parse.quote_plus(params.query) # Build search URL based on search engine search_engines = { 'duckduckgo': f'https://duckduckgo.com/?q={encoded_query}', 'google': f'https://www.google.com/search?q={encoded_query}&udm=14', 'bing': f'https://www.bing.com/search?q={encoded_query}', } if params.engine.lower() not in search_engines: return ActionResult(error=f'Unsupported search engine: {params.engine}. Options: duckduckgo, google, bing') search_url = search_engines[params.engine.lower()] # Simple tab logic: use current tab by default use_new_tab = False # Dispatch navigation event try: event = browser_session.event_bus.dispatch( NavigateToUrlEvent( url=search_url, new_tab=use_new_tab, ) ) await event await event.event_result(raise_if_any=True, raise_if_none=False) memory = f"Searched {params.engine.title()} for '{params.query}'" msg = f'🔍 {memory}' logger.info(msg) return ActionResult(extracted_content=memory, long_term_memory=memory) except Exception as e: logger.error(f'Failed to search {params.engine}: {e}') return ActionResult(error=f'Failed to search {params.engine} for "{params.query}": {str(e)}') @self.registry.action( '', param_model=NavigateAction, ) async def navigate(params: NavigateAction, browser_session: BrowserSession): try: # Dispatch navigation event event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url=params.url, new_tab=params.new_tab)) await event await event.event_result(raise_if_any=True, raise_if_none=False) if params.new_tab: memory = f'Opened new tab with URL {params.url}' msg = f'🔗 Opened new tab with url {params.url}' else: memory = f'Navigated to {params.url}' msg = f'🔗 {memory}' logger.info(msg) return ActionResult(extracted_content=msg, long_term_memory=memory) except Exception as e: error_msg = str(e) # Always log the actual error first for debugging browser_session.logger.error(f'❌ Navigation failed: {error_msg}') # Check if it's specifically a RuntimeError about CDP client if isinstance(e, RuntimeError) and 'CDP client not initialized' in error_msg: browser_session.logger.error('❌ Browser connection failed - CDP client not properly initialized') return ActionResult(error=f'Browser connection error: {error_msg}') # Check for network-related errors elif any( err in error_msg for err in [ 'ERR_NAME_NOT_RESOLVED', 'ERR_INTERNET_DISCONNECTED', 'ERR_CONNECTION_REFUSED', 'ERR_TIMED_OUT', 'net::', ] ): site_unavailable_msg = f'Navigation failed - site unavailable: {params.url}' browser_session.logger.warning(f'⚠️ {site_unavailable_msg} - {error_msg}') return ActionResult(error=site_unavailable_msg) else: # Return error in ActionResult instead of re-raising return ActionResult(error=f'Navigation failed: {str(e)}') @self.registry.action('', param_model=NoParamsAction) async def go_back(_: NoParamsAction, browser_session: BrowserSession): try: event = browser_session.event_bus.dispatch(GoBackEvent()) await event memory = 'Navigated back' msg = f'🔙 {memory}' logger.info(msg) return ActionResult(extracted_content=memory) except Exception as e: logger.error(f'Failed to dispatch GoBackEvent: {type(e).__name__}: {e}') error_msg = f'Failed to go back: {str(e)}' return ActionResult(error=error_msg) @self.registry.action('') async def wait(seconds: int = 3): # Cap wait time at maximum 30 seconds # Reduce the wait time by 3 seconds to account for the llm call which takes at least 3 seconds # So if the model decides to wait for 5 seconds, the llm call took at least 3 seconds, so we only need to wait for 2 seconds # Note by Mert: the above doesnt make sense because we do the LLM call right after this or this could be followed by another action after which we would like to wait # so I revert this. actual_seconds = min(max(seconds - 3, 0), 30) memory = f'Waited for {seconds} seconds' logger.info(f'🕒 waited for {seconds} second{"" if seconds == 1 else "s"}') await asyncio.sleep(actual_seconds) return ActionResult(extracted_content=memory, long_term_memory=memory) # Element Interaction Actions @self.registry.action( '', param_model=ClickElementAction, ) async def click(params: ClickElementAction, browser_session: BrowserSession): # Dispatch click event with node try: assert params.index != 0, ( 'Cannot click on element with index 0. If there are no interactive elements use wait(), refresh(), etc. to troubleshoot' ) # Look up the node from the selector map node = await browser_session.get_element_by_index(params.index) if node is None: msg = f'Element index {params.index} not available - page may have changed. Try refreshing browser state.' logger.warning(f'⚠️ {msg}') return ActionResult(extracted_content=msg) # Get description of clicked element element_desc = get_click_description(node) # Highlight the element being clicked (truly non-blocking) asyncio.create_task(browser_session.highlight_interaction_element(node)) event = browser_session.event_bus.dispatch(ClickElementEvent(node=node)) await event # Wait for handler to complete and get any exception or metadata click_metadata = await event.event_result(raise_if_any=True, raise_if_none=False) # Check if result contains validation error (e.g., trying to click elements.' in error_msg: try: return await dropdown_options( params=GetDropdownOptionsAction(index=params.index), browser_session=browser_session ) except Exception as dropdown_error: logger.debug( f'Failed to get dropdown options as shortcut during click on dropdown: {type(dropdown_error).__name__}: {dropdown_error}' ) return ActionResult(error=error_msg) # Build memory with element info memory = f'Clicked {element_desc}' logger.info(f'🖱️ {memory}') # Include click coordinates in metadata if available return ActionResult( extracted_content=memory, metadata=click_metadata if isinstance(click_metadata, dict) else None, ) except BrowserError as e: return handle_browser_error(e) except Exception as e: error_msg = f'Failed to click element {params.index}: {str(e)}' return ActionResult(error=error_msg) @self.registry.action( '', param_model=InputTextAction, ) async def input( params: InputTextAction, browser_session: BrowserSession, has_sensitive_data: bool = False, sensitive_data: dict[str, str | dict[str, str]] | None = None, ): # Look up the node from the selector map node = await browser_session.get_element_by_index(params.index) if node is None: msg = f'Element index {params.index} not available - page may have changed. Try refreshing browser state.' logger.warning(f'⚠️ {msg}') return ActionResult(extracted_content=msg) # Highlight the element being typed into (truly non-blocking) asyncio.create_task(browser_session.highlight_interaction_element(node)) # Dispatch type text event with node try: # Detect which sensitive key is being used sensitive_key_name = None if has_sensitive_data and sensitive_data: sensitive_key_name = _detect_sensitive_key_name(params.text, sensitive_data) event = browser_session.event_bus.dispatch( TypeTextEvent( node=node, text=params.text, clear=params.clear, is_sensitive=has_sensitive_data, sensitive_key_name=sensitive_key_name, ) ) await event input_metadata = await event.event_result(raise_if_any=True, raise_if_none=False) # Create message with sensitive data handling if has_sensitive_data: if sensitive_key_name: msg = f'Typed {sensitive_key_name}' log_msg = f'Typed <{sensitive_key_name}>' else: msg = 'Typed sensitive data' log_msg = 'Typed ' else: msg = f"Typed '{params.text}'" log_msg = f"Typed '{params.text}'" logger.debug(log_msg) # Include input coordinates in metadata if available return ActionResult( extracted_content=msg, long_term_memory=msg, metadata=input_metadata if isinstance(input_metadata, dict) else None, ) except BrowserError as e: return handle_browser_error(e) except Exception as e: # Log the full error for debugging logger.error(f'Failed to dispatch TypeTextEvent: {type(e).__name__}: {e}') error_msg = f'Failed to type text into element {params.index}: {e}' return ActionResult(error=error_msg) @self.registry.action( '', param_model=UploadFileAction, ) async def upload_file( params: UploadFileAction, browser_session: BrowserSession, available_file_paths: list[str], file_system: FileSystem ): # Check if file is in available_file_paths (user-provided or downloaded files) # For remote browsers (is_local=False), we allow absolute remote paths even if not tracked locally if params.path not in available_file_paths: # Also check if it's a recently downloaded file that might not be in available_file_paths yet downloaded_files = browser_session.downloaded_files if params.path not in downloaded_files: # Finally, check if it's a file in the FileSystem service if file_system and file_system.get_dir(): # Check if the file is actually managed by the FileSystem service # The path should be just the filename for FileSystem files file_obj = file_system.get_file(params.path) if file_obj: # File is managed by FileSystem, construct the full path file_system_path = str(file_system.get_dir() / params.path) params = UploadFileAction(index=params.index, path=file_system_path) else: # If browser is remote, allow passing a remote-accessible absolute path if not browser_session.is_local: pass else: msg = f'File path {params.path} is not available. To fix: The user must add this file path to the available_file_paths parameter when creating the Agent. Example: Agent(task="...", llm=llm, browser=browser, available_file_paths=["{params.path}"])' logger.error(f'❌ {msg}') return ActionResult(error=msg) else: # If browser is remote, allow passing a remote-accessible absolute path if not browser_session.is_local: pass else: msg = f'File path {params.path} is not available. To fix: The user must add this file path to the available_file_paths parameter when creating the Agent. Example: Agent(task="...", llm=llm, browser=browser, available_file_paths=["{params.path}"])' raise BrowserError(message=msg, long_term_memory=msg) # For local browsers, ensure the file exists on the local filesystem if browser_session.is_local: if not os.path.exists(params.path): msg = f'File {params.path} does not exist' return ActionResult(error=msg) # Get the selector map to find the node selector_map = await browser_session.get_selector_map() if params.index not in selector_map: msg = f'Element with index {params.index} does not exist.' return ActionResult(error=msg) node = selector_map[params.index] # Helper function to find file input near the selected element def find_file_input_near_element( node: EnhancedDOMTreeNode, max_height: int = 3, max_descendant_depth: int = 3 ) -> EnhancedDOMTreeNode | None: """Find the closest file input to the selected element.""" def find_file_input_in_descendants(n: EnhancedDOMTreeNode, depth: int) -> EnhancedDOMTreeNode | None: if depth < 0: return None if browser_session.is_file_input(n): return n for child in n.children_nodes or []: result = find_file_input_in_descendants(child, depth - 1) if result: return result return None current = node for _ in range(max_height + 1): # Check the current node itself if browser_session.is_file_input(current): return current # Check all descendants of the current node result = find_file_input_in_descendants(current, max_descendant_depth) if result: return result # Check all siblings and their descendants if current.parent_node: for sibling in current.parent_node.children_nodes or []: if sibling is current: continue if browser_session.is_file_input(sibling): return sibling result = find_file_input_in_descendants(sibling, max_descendant_depth) if result: return result current = current.parent_node if not current: break return None # Try to find a file input element near the selected element file_input_node = find_file_input_near_element(node) # Highlight the file input element if found (truly non-blocking) if file_input_node: asyncio.create_task(browser_session.highlight_interaction_element(file_input_node)) # If not found near the selected element, fallback to finding the closest file input to current scroll position if file_input_node is None: logger.info( f'No file upload element found near index {params.index}, searching for closest file input to scroll position' ) # Get current scroll position cdp_session = await browser_session.get_or_create_cdp_session() try: scroll_info = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': 'window.scrollY || window.pageYOffset || 0'}, session_id=cdp_session.session_id ) current_scroll_y = scroll_info.get('result', {}).get('value', 0) except Exception: current_scroll_y = 0 # Find all file inputs in the selector map and pick the closest one to scroll position closest_file_input = None min_distance = float('inf') for idx, element in selector_map.items(): if browser_session.is_file_input(element): # Get element's Y position if element.absolute_position: element_y = element.absolute_position.y distance = abs(element_y - current_scroll_y) if distance < min_distance: min_distance = distance closest_file_input = element if closest_file_input: file_input_node = closest_file_input logger.info(f'Found file input closest to scroll position (distance: {min_distance}px)') # Highlight the fallback file input element (truly non-blocking) asyncio.create_task(browser_session.highlight_interaction_element(file_input_node)) else: msg = 'No file upload element found on the page' logger.error(msg) raise BrowserError(msg) # TODO: figure out why this fails sometimes + add fallback hail mary, just look for any file input on page # Dispatch upload file event with the file input node try: event = browser_session.event_bus.dispatch(UploadFileEvent(node=file_input_node, file_path=params.path)) await event await event.event_result(raise_if_any=True, raise_if_none=False) msg = f'Successfully uploaded file to index {params.index}' logger.info(f'📁 {msg}') return ActionResult( extracted_content=msg, long_term_memory=f'Uploaded file {params.path} to element {params.index}', ) except Exception as e: logger.error(f'Failed to upload file: {e}') raise BrowserError(f'Failed to upload file: {e}') # Tab Management Actions @self.registry.action( 'Switch to another open tab by tab_id. Tab IDs are shown in browser state tabs list (last 4 chars of target_id). Use when you need to work with content in a different tab.', param_model=SwitchTabAction, ) async def switch(params: SwitchTabAction, browser_session: BrowserSession): # Simple switch tab logic try: target_id = await browser_session.get_target_id_from_tab_id(params.tab_id) event = browser_session.event_bus.dispatch(SwitchTabEvent(target_id=target_id)) await event new_target_id = await event.event_result(raise_if_any=False, raise_if_none=False) # Don't raise on errors if new_target_id: memory = f'Switched to tab #{new_target_id[-4:]}' else: memory = f'Switched to tab #{params.tab_id}' logger.info(f'🔄 {memory}') return ActionResult(extracted_content=memory, long_term_memory=memory) except Exception as e: logger.warning(f'Tab switch may have failed: {e}') memory = f'Attempted to switch to tab #{params.tab_id}' return ActionResult(extracted_content=memory, long_term_memory=memory) @self.registry.action( 'Close a tab by tab_id. Tab IDs are shown in browser state tabs list (last 4 chars of target_id). Use to clean up tabs you no longer need.', param_model=CloseTabAction, ) async def close(params: CloseTabAction, browser_session: BrowserSession): # Simple close tab logic try: target_id = await browser_session.get_target_id_from_tab_id(params.tab_id) # Dispatch close tab event - handle stale target IDs gracefully event = browser_session.event_bus.dispatch(CloseTabEvent(target_id=target_id)) await event await event.event_result(raise_if_any=False, raise_if_none=False) # Don't raise on errors memory = f'Closed tab #{params.tab_id}' logger.info(f'🗑️ {memory}') return ActionResult( extracted_content=memory, long_term_memory=memory, ) except Exception as e: # Handle stale target IDs gracefully logger.warning(f'Tab {params.tab_id} may already be closed: {e}') memory = f'Tab #{params.tab_id} closed (was already closed or invalid)' return ActionResult( extracted_content=memory, long_term_memory=memory, ) # Content Actions # TODO: Refactor to use events instead of direct page access # This action is temporarily disabled as it needs refactoring to use events @self.registry.action( """LLM extracts structured data from page markdown. Use when: on right page, know what to extract, haven't called before on same page+query. Can't get interactive elements. Set extract_links=True for URLs. Use start_from_char if truncated. If fails, use find_text instead.""", ) async def extract( params: ExtractAction, browser_session: BrowserSession, page_extraction_llm: BaseChatModel, file_system: FileSystem, ): # Constants MAX_CHAR_LIMIT = 30000 query = params['query'] if isinstance(params, dict) else params.query extract_links = params['extract_links'] if isinstance(params, dict) else params.extract_links start_from_char = params['start_from_char'] if isinstance(params, dict) else params.start_from_char # Extract clean markdown using the unified method try: from browser_use.dom.markdown_extractor import extract_clean_markdown content, content_stats = await extract_clean_markdown( browser_session=browser_session, extract_links=extract_links ) except Exception as e: raise RuntimeError(f'Could not extract clean markdown: {type(e).__name__}') # Original content length for processing final_filtered_length = content_stats['final_filtered_chars'] if start_from_char > 0: if start_from_char >= len(content): return ActionResult( error=f'start_from_char ({start_from_char}) exceeds content length {final_filtered_length} characters.' ) content = content[start_from_char:] content_stats['started_from_char'] = start_from_char # Smart truncation with context preservation truncated = False if len(content) > MAX_CHAR_LIMIT: # Try to truncate at a natural break point (paragraph, sentence) truncate_at = MAX_CHAR_LIMIT # Look for paragraph break within last 500 chars of limit paragraph_break = content.rfind('\n\n', MAX_CHAR_LIMIT - 500, MAX_CHAR_LIMIT) if paragraph_break > 0: truncate_at = paragraph_break else: # Look for sentence break within last 200 chars of limit sentence_break = content.rfind('.', MAX_CHAR_LIMIT - 200, MAX_CHAR_LIMIT) if sentence_break > 0: truncate_at = sentence_break + 1 content = content[:truncate_at] truncated = True next_start = (start_from_char or 0) + truncate_at content_stats['truncated_at_char'] = truncate_at content_stats['next_start_char'] = next_start # Add content statistics to the result original_html_length = content_stats['original_html_chars'] initial_markdown_length = content_stats['initial_markdown_chars'] chars_filtered = content_stats['filtered_chars_removed'] stats_summary = f"""Content processed: {original_html_length:,} HTML chars → {initial_markdown_length:,} initial markdown → {final_filtered_length:,} filtered markdown""" if start_from_char > 0: stats_summary += f' (started from char {start_from_char:,})' if truncated: stats_summary += f' → {len(content):,} final chars (truncated, use start_from_char={content_stats["next_start_char"]} to continue)' elif chars_filtered > 0: stats_summary += f' (filtered {chars_filtered:,} chars of noise)' system_prompt = """ You are an expert at extracting data from the markdown of a webpage. You will be given a query and the markdown of a webpage that has been filtered to remove noise and advertising content. - You are tasked to extract information from the webpage that is relevant to the query. - You should ONLY use the information available in the webpage to answer the query. Do not make up information or provide guess from your own knowledge. - If the information relevant to the query is not available in the page, your response should mention that. - If the query asks for all items, products, etc., make sure to directly list all of them. - If the content was truncated and you need more information, note that the user can use start_from_char parameter to continue from where truncation occurred. - Your output should present ALL the information relevant to the query in a concise way. - Do not answer in conversational format - directly output the relevant information or that the information is unavailable. """.strip() prompt = f'\n{query}\n\n\n\n{stats_summary}\n\n\n\n{content}\n' try: response = await asyncio.wait_for( page_extraction_llm.ainvoke([SystemMessage(content=system_prompt), UserMessage(content=prompt)]), timeout=120.0, ) current_url = await browser_session.get_current_page_url() extracted_content = ( f'\n{current_url}\n\n\n{query}\n\n\n{response.completion}\n' ) # Simple memory handling MAX_MEMORY_LENGTH = 1000 if len(extracted_content) < MAX_MEMORY_LENGTH: memory = extracted_content include_extracted_content_only_once = False else: file_name = await file_system.save_extracted_content(extracted_content) memory = f'Query: {query}\nContent in {file_name} and once in .' include_extracted_content_only_once = True logger.info(f'📄 {memory}') return ActionResult( extracted_content=extracted_content, include_extracted_content_only_once=include_extracted_content_only_once, long_term_memory=memory, ) except Exception as e: logger.debug(f'Error extracting content: {e}') raise RuntimeError(str(e)) @self.registry.action( """Scroll by pages (down=True/False, pages=0.5-10.0, default 1.0). Use index for scroll containers (dropdowns/custom UI). High pages (10) reaches bottom. Multi-page scrolls sequentially. Viewport-based height, fallback 1000px/page.""", param_model=ScrollAction, ) async def scroll(params: ScrollAction, browser_session: BrowserSession): try: # Look up the node from the selector map if index is provided # Special case: index 0 means scroll the whole page (root/body element) node = None if params.index is not None and params.index != 0: node = await browser_session.get_element_by_index(params.index) if node is None: # Element does not exist msg = f'Element index {params.index} not found in browser state' return ActionResult(error=msg) direction = 'down' if params.down else 'up' target = f'element {params.index}' if params.index is not None and params.index != 0 else '' # Get actual viewport height for more accurate scrolling try: cdp_session = await browser_session.get_or_create_cdp_session() metrics = await cdp_session.cdp_client.send.Page.getLayoutMetrics(session_id=cdp_session.session_id) # Use cssVisualViewport for the most accurate representation css_viewport = metrics.get('cssVisualViewport', {}) css_layout_viewport = metrics.get('cssLayoutViewport', {}) # Get viewport height, prioritizing cssVisualViewport viewport_height = int(css_viewport.get('clientHeight') or css_layout_viewport.get('clientHeight', 1000)) logger.debug(f'Detected viewport height: {viewport_height}px') except Exception as e: viewport_height = 1000 # Fallback to 1000px logger.debug(f'Failed to get viewport height, using fallback 1000px: {e}') # For multiple pages (>=1.0), scroll one page at a time to ensure each scroll completes if params.pages >= 1.0: import asyncio num_full_pages = int(params.pages) remaining_fraction = params.pages - num_full_pages completed_scrolls = 0 # Scroll one page at a time for i in range(num_full_pages): try: pixels = viewport_height # Use actual viewport height if not params.down: pixels = -pixels event = browser_session.event_bus.dispatch( ScrollEvent(direction=direction, amount=abs(pixels), node=node) ) await event await event.event_result(raise_if_any=True, raise_if_none=False) completed_scrolls += 1 # Small delay to ensure scroll completes before next one await asyncio.sleep(0.3) except Exception as e: logger.warning(f'Scroll {i + 1}/{num_full_pages} failed: {e}') # Continue with remaining scrolls even if one fails # Handle fractional page if present if remaining_fraction > 0: try: pixels = int(remaining_fraction * viewport_height) if not params.down: pixels = -pixels event = browser_session.event_bus.dispatch( ScrollEvent(direction=direction, amount=abs(pixels), node=node) ) await event await event.event_result(raise_if_any=True, raise_if_none=False) completed_scrolls += remaining_fraction except Exception as e: logger.warning(f'Fractional scroll failed: {e}') if params.pages == 1.0: long_term_memory = f'Scrolled {direction} {target} {viewport_height}px'.replace(' ', ' ') else: long_term_memory = f'Scrolled {direction} {target} {completed_scrolls:.1f} pages'.replace(' ', ' ') else: # For fractional pages <1.0, do single scroll pixels = int(params.pages * viewport_height) event = browser_session.event_bus.dispatch( ScrollEvent(direction='down' if params.down else 'up', amount=pixels, node=node) ) await event await event.event_result(raise_if_any=True, raise_if_none=False) long_term_memory = f'Scrolled {direction} {target} {params.pages} pages'.replace(' ', ' ') msg = f'🔍 {long_term_memory}' logger.info(msg) return ActionResult(extracted_content=msg, long_term_memory=long_term_memory) except Exception as e: logger.error(f'Failed to dispatch ScrollEvent: {type(e).__name__}: {e}') error_msg = 'Failed to execute scroll action.' return ActionResult(error=error_msg) @self.registry.action( '', param_model=SendKeysAction, ) async def send_keys(params: SendKeysAction, browser_session: BrowserSession): # Dispatch send keys event try: event = browser_session.event_bus.dispatch(SendKeysEvent(keys=params.keys)) await event await event.event_result(raise_if_any=True, raise_if_none=False) memory = f'Sent keys: {params.keys}' msg = f'⌨️ {memory}' logger.info(msg) return ActionResult(extracted_content=memory, long_term_memory=memory) except Exception as e: logger.error(f'Failed to dispatch SendKeysEvent: {type(e).__name__}: {e}') error_msg = f'Failed to send keys: {str(e)}' return ActionResult(error=error_msg) @self.registry.action('Scroll to text.') async def find_text(text: str, browser_session: BrowserSession): # type: ignore # Dispatch scroll to text event event = browser_session.event_bus.dispatch(ScrollToTextEvent(text=text)) try: # The handler returns None on success or raises an exception if text not found await event.event_result(raise_if_any=True, raise_if_none=False) memory = f'Scrolled to text: {text}' msg = f'🔍 {memory}' logger.info(msg) return ActionResult(extracted_content=memory, long_term_memory=memory) except Exception as e: # Text not found msg = f"Text '{text}' not found or not visible on page" logger.info(msg) return ActionResult( extracted_content=msg, long_term_memory=f"Tried scrolling to text '{text}' but it was not found", ) @self.registry.action( 'Get a screenshot of the current viewport. Use when: visual inspection needed, layout unclear, element positions uncertain, debugging UI issues, or verifying page state. Screenshot is included in the next browser_state No parameters are needed.', param_model=NoParamsAction, ) async def screenshot(_: NoParamsAction): """Request that a screenshot be included in the next observation""" memory = 'Requested screenshot for next observation' msg = f'📸 {memory}' logger.info(msg) # Return flag in metadata to signal that screenshot should be included return ActionResult( extracted_content=memory, metadata={'include_screenshot': True}, ) # Dropdown Actions @self.registry.action( '', param_model=GetDropdownOptionsAction, ) async def dropdown_options(params: GetDropdownOptionsAction, browser_session: BrowserSession): """Get all options from a native dropdown or ARIA menu""" # Look up the node from the selector map node = await browser_session.get_element_by_index(params.index) if node is None: msg = f'Element index {params.index} not available - page may have changed. Try refreshing browser state.' logger.warning(f'⚠️ {msg}') return ActionResult(extracted_content=msg) # Dispatch GetDropdownOptionsEvent to the event handler event = browser_session.event_bus.dispatch(GetDropdownOptionsEvent(node=node)) dropdown_data = await event.event_result(timeout=3.0, raise_if_none=True, raise_if_any=True) if not dropdown_data: raise ValueError('Failed to get dropdown options - no data returned') # Use structured memory from the handler return ActionResult( extracted_content=dropdown_data['short_term_memory'], long_term_memory=dropdown_data['long_term_memory'], include_extracted_content_only_once=True, ) @self.registry.action( 'Set the option of a