"""Page class for page-level operations."""

import asyncio
import json
import logging
from typing import TYPE_CHECKING, TypeVar

from pydantic import BaseModel

from browser_use.actor.utils import get_key_info
from browser_use.dom.serializer.serializer import DOMTreeSerializer
from browser_use.dom.service import DomService
from browser_use.llm.messages import SystemMessage, UserMessage

T = TypeVar('T', bound=BaseModel)

if TYPE_CHECKING:
    from cdp_use.cdp.dom.commands import (
        DescribeNodeParameters,
        QuerySelectorAllParameters,
    )
    from cdp_use.cdp.emulation.commands import SetDeviceMetricsOverrideParameters
    from cdp_use.cdp.input.commands import (
        DispatchKeyEventParameters,
    )
    from cdp_use.cdp.page.commands import CaptureScreenshotParameters, NavigateParameters, NavigateToHistoryEntryParameters
    from cdp_use.cdp.runtime.commands import EvaluateParameters
    from cdp_use.cdp.target.commands import (
        AttachToTargetParameters,
        GetTargetInfoParameters,
    )
    from cdp_use.cdp.target.types import TargetInfo

    from browser_use.browser.session import BrowserSession
    from browser_use.llm.base import BaseChatModel

    from .element import Element
    from .mouse import Mouse

logger = logging.getLogger(__name__)


class Page:
    """Page operations (tab or iframe).

    Wraps a single CDP target. A CDP session is attached lazily on first use
    (see ``_ensure_session``) unless a ``session_id`` is supplied up front.
    """

    # Bit values combined into the ``modifiers`` field of key events sent by ``press``.
    _MODIFIER_BITS = {'Alt': 1, 'Control': 2, 'Meta': 4, 'Shift': 8}

    # Upper bound, in seconds, for the LLM call made by ``extract_content``.
    _EXTRACT_TIMEOUT_SECONDS = 120.0

    def __init__(
        self, browser_session: 'BrowserSession', target_id: str, session_id: str | None = None, llm: 'BaseChatModel | None' = None
    ):
        """Bind the page to a browser session and a CDP target.

        Args:
            browser_session: Session whose ``cdp_client`` is used for all CDP calls.
            target_id: CDP target this page operates on.
            session_id: Existing CDP session ID to reuse; when omitted, a session
                is attached on first use.
            llm: Default language model for the AI helper methods.
        """
        self._browser_session = browser_session
        self._client = browser_session.cdp_client
        self._target_id = target_id
        self._session_id: str | None = session_id
        self._mouse: 'Mouse | None' = None
        self._llm = llm

    async def _ensure_session(self) -> str:
        """Ensure we have a session ID for this target.

        Attaches to the target and enables the Page, DOM, Runtime and Network
        domains the first time it is called without a known session ID.

        Returns:
            The session ID for this target.
        """
        if self._session_id:
            return self._session_id

        params: 'AttachToTargetParameters' = {'targetId': self._target_id, 'flatten': True}
        result = await self._client.send.Target.attachToTarget(params)
        session_id: str = result['sessionId']
        self._session_id = session_id

        # Enable necessary domains
        await asyncio.gather(
            self._client.send.Page.enable(session_id=session_id),
            self._client.send.DOM.enable(session_id=session_id),
            self._client.send.Runtime.enable(session_id=session_id),
            self._client.send.Network.enable(session_id=session_id),
        )
        return session_id

    @property
    async def session_id(self) -> str:
        """Get the session ID for this target.

        This is an async property: accessing it yields a coroutine that must be awaited.

        @dev Pass this to an arbitrary CDP call
        """
        return await self._ensure_session()

    @property
    async def mouse(self) -> 'Mouse':
        """Get the mouse interface for this target (created once, then reused).

        This is an async property: accessing it yields a coroutine that must be awaited.
        """
        if not self._mouse:
            session_id = await self._ensure_session()
            from .mouse import Mouse

            self._mouse = Mouse(self._browser_session, session_id, self._target_id)
        return self._mouse

    async def reload(self) -> None:
        """Reload the target."""
        session_id = await self._ensure_session()
        await self._client.send.Page.reload(session_id=session_id)

    async def get_element(self, backend_node_id: int) -> 'Element':
        """Get an element by its backend node ID."""
        session_id = await self._ensure_session()
        from .element import Element as Element_

        return Element_(self._browser_session, backend_node_id, session_id)

    async def evaluate(self, page_function: str, *args) -> str:
        """Execute JavaScript in the target.

        Args:
            page_function: JavaScript code that MUST start with (...args) => format
            *args: Arguments to pass to the function; each is JSON-encoded
                before being spliced into the call expression.

        Returns:
            String representation of the JavaScript execution result.
            ``None``/undefined becomes ``''``; dicts and lists are JSON-stringified;
            other values go through ``str()``.

        Raises:
            ValueError: If the code is empty or is not in arrow-function form.
            RuntimeError: If the evaluation reports ``exceptionDetails``.
        """
        session_id = await self._ensure_session()

        # Clean and fix common JavaScript string parsing issues
        page_function = self._fix_javascript_string(page_function)

        # Enforce arrow function format
        if not (page_function.startswith('(') and '=>' in page_function):
            raise ValueError(f'JavaScript code must start with (...args) => format. Got: {page_function[:50]}...')

        # Call the arrow function with the JSON-encoded arguments (none -> "()")
        call_args = ', '.join(json.dumps(arg) for arg in args)
        expression = f'({page_function})({call_args})'

        logger.debug('Evaluating JavaScript: %r', expression)

        params: 'EvaluateParameters' = {'expression': expression, 'returnByValue': True, 'awaitPromise': True}
        result = await self._client.send.Runtime.evaluate(
            params,
            session_id=session_id,
        )

        if 'exceptionDetails' in result:
            raise RuntimeError(f'JavaScript evaluation failed: {result["exceptionDetails"]}')

        value = result.get('result', {}).get('value')

        # Always return string representation
        if value is None:
            return ''
        if isinstance(value, str):
            return value
        if isinstance(value, (dict, list)):
            try:
                return json.dumps(value)
            except (TypeError, ValueError):
                return str(value)
        return str(value)

    def _fix_javascript_string(self, js_code: str) -> str:
        """Fix common JavaScript string parsing issues when written as Python string.

        Performs only minimal cleaning: strips whitespace, removes an obvious
        pair of wrapper quotes, and unescapes quotes when *every* occurrence of
        that quote character is backslash-escaped.

        Args:
            js_code: Raw JavaScript source.

        Returns:
            The cleaned JavaScript source.

        Raises:
            ValueError: If nothing is left after cleaning.
        """
        js_code = js_code.strip()

        # 1. Remove obvious Python string wrapper quotes if they exist
        if (js_code.startswith('"') and js_code.endswith('"')) or (js_code.startswith("'") and js_code.endswith("'")):
            # Check if it's a wrapped string (not part of JS syntax)
            inner = js_code[1:-1]
            if inner.count('"') + inner.count("'") == 0 or '() =>' in inner:
                js_code = inner

        # 2. Unescape quotes only when every occurrence of that quote is escaped.
        # With no unescaped quote of that kind, none of them delimits a string,
        # so the backslashes are redundant or an artifact. An escaped quote also
        # counts as a quote, so the escaped count can never exceed the total;
        # equality is the strongest test available. Code containing an escaped
        # backslash is skipped, since '\\"' there would be misread as '\"'.
        if '\\\\' not in js_code:
            for quote in ('"', "'"):
                escaped = '\\' + quote
                if escaped in js_code and js_code.count(escaped) == js_code.count(quote):
                    js_code = js_code.replace(escaped, quote)

        # 3. Basic whitespace normalization only
        js_code = js_code.strip()

        # Final validation - ensure it's not empty
        if not js_code:
            raise ValueError('JavaScript code is empty after cleaning')

        return js_code

    async def screenshot(self, format: str = 'jpeg', quality: int | None = None) -> str:
        """Take a screenshot and return base64 encoded image.

        Args:
            format: Image format ('jpeg', 'png', 'webp')
            quality: Quality 0-100 for JPEG format; ignored for other formats

        Returns:
            Base64-encoded image data
        """
        session_id = await self._ensure_session()

        params: 'CaptureScreenshotParameters' = {'format': format}
        if quality is not None and format.lower() == 'jpeg':
            params['quality'] = quality

        result = await self._client.send.Page.captureScreenshot(params, session_id=session_id)
        return result['data']

    @staticmethod
    def _split_key_combo(key: str) -> tuple[list[str], str]:
        """Split a combination such as "Control+Shift+A" into (modifiers, main key).

        A trailing "++" means the main key is the plus key itself
        (e.g. "Control++"). Empty modifier names are dropped.
        """
        if key.endswith('++'):
            head, main_key = key[:-2], '+'
        else:
            head, _, main_key = key.rpartition('+')
        modifiers = [part for part in head.split('+') if part]
        return modifiers, main_key

    async def _dispatch_key_event(self, session_id: str, event_type: str, key: str, modifiers: int | None = None) -> None:
        """Send one keyDown/keyUp event for ``key``, optionally with a modifier bitmask."""
        code, vk_code = get_key_info(key)
        params: 'DispatchKeyEventParameters' = {'type': event_type, 'key': key, 'code': code}
        if modifiers is not None:
            params['modifiers'] = modifiers
        if vk_code is not None:
            params['windowsVirtualKeyCode'] = vk_code
        await self._client.send.Input.dispatchKeyEvent(params, session_id=session_id)

    async def press(self, key: str) -> None:
        """Press a key on the page (sends keyboard input to the focused element or page).

        Args:
            key: A single key name, or a "+"-separated combination like
                "Control+A". A lone "+" presses the plus key.
        """
        session_id = await self._ensure_session()

        # Simple key press (a lone '+' is the plus key, not a separator)
        if '+' not in key or key == '+':
            await self._dispatch_key_event(session_id, 'keyDown', key)
            await self._dispatch_key_event(session_id, 'keyUp', key)
            return

        # Handle key combinations like "Control+A"
        modifiers, main_key = self._split_key_combo(key)

        # Calculate modifier bitmask (unknown modifier names contribute nothing)
        modifier_value = 0
        for mod in modifiers:
            modifier_value |= self._MODIFIER_BITS.get(mod, 0)

        # Press modifier keys
        for mod in modifiers:
            await self._dispatch_key_event(session_id, 'keyDown', mod)

        # Press main key with modifiers bitmask
        await self._dispatch_key_event(session_id, 'keyDown', main_key, modifier_value)
        await self._dispatch_key_event(session_id, 'keyUp', main_key, modifier_value)

        # Release modifier keys in reverse order
        for mod in reversed(modifiers):
            await self._dispatch_key_event(session_id, 'keyUp', mod)

    async def set_viewport_size(self, width: int, height: int) -> None:
        """Set the viewport size (device scale factor 1.0, non-mobile)."""
        session_id = await self._ensure_session()

        params: 'SetDeviceMetricsOverrideParameters' = {
            'width': width,
            'height': height,
            'deviceScaleFactor': 1.0,
            'mobile': False,
        }
        await self._client.send.Emulation.setDeviceMetricsOverride(
            params,
            session_id=session_id,
        )

    # Target properties (from CDP getTargetInfo)
    async def get_target_info(self) -> 'TargetInfo':
        """Get target information."""
        params: 'GetTargetInfoParameters' = {'targetId': self._target_id}
        result = await self._client.send.Target.getTargetInfo(params)
        return result['targetInfo']

    async def get_url(self) -> str:
        """Get the current URL ('' if the target info has none)."""
        info = await self.get_target_info()
        return info.get('url', '')

    async def get_title(self) -> str:
        """Get the current title ('' if the target info has none)."""
        info = await self.get_target_info()
        return info.get('title', '')

    async def goto(self, url: str) -> None:
        """Navigate this target to a URL."""
        session_id = await self._ensure_session()

        params: 'NavigateParameters' = {'url': url}
        await self._client.send.Page.navigate(params, session_id=session_id)

    async def navigate(self, url: str) -> None:
        """Alias for goto."""
        await self.goto(url)

    async def _navigate_history(self, offset: int, direction: str, neighbour: str) -> None:
        """Move ``offset`` entries through the navigation history.

        Args:
            offset: -1 for back, +1 for forward.
            direction: Word used in error messages ('back' / 'forward').
            neighbour: Word used in error messages ('previous' / 'next').

        Raises:
            RuntimeError: If there is no such history entry or a CDP call fails.
        """
        session_id = await self._ensure_session()

        try:
            # Get navigation history
            history = await self._client.send.Page.getNavigationHistory(session_id=session_id)
            entries = history['entries']
            target_index = history['currentIndex'] + offset

            # Check that the requested entry exists
            if not 0 <= target_index < len(entries):
                raise RuntimeError(f'Cannot go {direction} - no {neighbour} entry in history')

            params: 'NavigateToHistoryEntryParameters' = {'entryId': entries[target_index]['id']}
            await self._client.send.Page.navigateToHistoryEntry(params, session_id=session_id)

        except Exception as e:
            raise RuntimeError(f'Failed to navigate {direction}: {e}') from e

    async def go_back(self) -> None:
        """Navigate back in history.

        Raises:
            RuntimeError: If there is no previous entry or navigation fails.
        """
        await self._navigate_history(-1, 'back', 'previous')

    async def go_forward(self) -> None:
        """Navigate forward in history.

        Raises:
            RuntimeError: If there is no next entry or navigation fails.
        """
        await self._navigate_history(1, 'forward', 'next')

    # Element finding methods (these would need to be implemented based on DOM queries)
    async def get_elements_by_css_selector(self, selector: str) -> list['Element']:
        """Get elements by CSS selector.

        Queries from the document root and returns one ``Element`` per match,
        in the order reported by ``DOM.querySelectorAll``.
        """
        session_id = await self._ensure_session()

        # Get document first
        doc_result = await self._client.send.DOM.getDocument(session_id=session_id)
        document_node_id = doc_result['root']['nodeId']

        # Query selector all
        query_params: 'QuerySelectorAllParameters' = {'nodeId': document_node_id, 'selector': selector}
        result = await self._client.send.DOM.querySelectorAll(query_params, session_id=session_id)

        from .element import Element as Element_

        elements = []
        # Convert node IDs to backend node IDs
        for node_id in result['nodeIds']:
            describe_params: 'DescribeNodeParameters' = {'nodeId': node_id}
            node_result = await self._client.send.DOM.describeNode(describe_params, session_id=session_id)
            backend_node_id = node_result['node']['backendNodeId']
            elements.append(Element_(self._browser_session, backend_node_id, session_id))

        return elements

    # AI METHODS

    @property
    def dom_service(self) -> 'DomService':
        """Get the DOM service for this target (a new instance on every access)."""
        return DomService(self._browser_session)

    async def get_element_by_prompt(self, prompt: str, llm: 'BaseChatModel | None' = None) -> 'Element | None':
        """Get an element by a prompt.

        Serializes the page's DOM tree, asks the LLM which indexed element
        matches ``prompt``, and resolves that index through the selector map.

        Args:
            prompt: Natural-language description of the wanted element.
            llm: Model to use; falls back to the one given at construction.

        Returns:
            The matching element, or None if the LLM returns no index or an
            index that is not in the selector map.

        Raises:
            ValueError: If no LLM is available.
        """
        session_id = await self._ensure_session()
        llm = llm or self._llm

        if not llm:
            raise ValueError('LLM not provided')

        dom_service = self.dom_service

        enhanced_dom_tree = await dom_service.get_dom_tree(target_id=self._target_id)

        serialized_dom_state, _ = DOMTreeSerializer(
            enhanced_dom_tree, None, paint_order_filtering=True
        ).serialize_accessible_elements()

        llm_representation = serialized_dom_state.llm_representation()

        # The prompt names the <type> wrapper and the <prompt> tag explicitly,
        # so the model can tell element markup apart from the user's request.
        system_message = SystemMessage(
            content="""You are an AI created to find an element on a page by a prompt.

Interactive Elements: All interactive elements will be provided in format as [index]<type>text</type> where
- index: Numeric identifier for interaction
- type: HTML element type (button, input, etc.)
- text: Element description

Examples:
[33]<div>User form</div>
\t[35]<button>Submit</button>

Note that:
- Only elements with numeric indexes in [] are interactive
- (stacked) indentation (with \t) is important and means that the element is a (html) child of the element above (with a lower index)
- Pure text elements without [] are not interactive.

Your task is to find an element index (if any) that matches the prompt (written in <prompt> tag).

If none of the elements matches the prompt, return None.

Before you return the element index, reason about the state and elements for a sentence or two."""
        )

        state_message = UserMessage(
            content=f"""
<browser_state>
{llm_representation}
</browser_state>

<prompt>
{prompt}
</prompt>
"""
        )

        class ElementResponse(BaseModel):
            # thinking: str
            element_highlight_index: int | None

        llm_response = await llm.ainvoke(
            [
                system_message,
                state_message,
            ],
            output_format=ElementResponse,
        )

        element_highlight_index = llm_response.completion.element_highlight_index

        if element_highlight_index is None or element_highlight_index not in serialized_dom_state.selector_map:
            return None

        element = serialized_dom_state.selector_map[element_highlight_index]

        from .element import Element as Element_

        return Element_(self._browser_session, element.backend_node_id, session_id)

    async def must_get_element_by_prompt(self, prompt: str, llm: 'BaseChatModel | None' = None) -> 'Element':
        """Get an element by a prompt.

        @dev LLM can still return None, this just raises an error if the element is not found.

        Raises:
            ValueError: If no element is found for the prompt (or no LLM is available).
        """
        element = await self.get_element_by_prompt(prompt, llm)
        if element is None:
            raise ValueError(f'No element found for prompt: {prompt}')

        return element

    async def extract_content(self, prompt: str, structured_output: type[T], llm: 'BaseChatModel | None' = None) -> T:
        """Extract structured content from the current page using LLM.

        Extracts clean markdown from the page and sends it to LLM for structured data extraction.

        Args:
            prompt: Description of what content to extract
            structured_output: Pydantic BaseModel class defining the expected output structure
            llm: Language model to use for extraction; falls back to the one
                given at construction

        Returns:
            The structured BaseModel instance with extracted content

        Raises:
            ValueError: If no LLM is available.
            RuntimeError: If markdown extraction fails, or the LLM call fails or
                times out.
        """
        llm = llm or self._llm

        if not llm:
            raise ValueError('LLM not provided')

        # Extract clean markdown using the same method as in tools/service.py
        try:
            content, _ = await self._extract_clean_markdown()
        except Exception as e:
            raise RuntimeError(f'Could not extract clean markdown: {type(e).__name__}') from e

        # System prompt for structured extraction
        system_prompt = """
You are an expert at extracting structured data from the markdown of a webpage.

You will be given a query and the markdown of a webpage that has been filtered to remove noise and advertising content.

- You are tasked to extract information from the webpage that is relevant to the query.
- You should ONLY use the information available in the webpage to answer the query. Do not make up information or provide guess from your own knowledge.
- If the information relevant to the query is not available in the page, your response should mention that.
- If the query asks for all items, products, etc., make sure to directly list all of them.
- Return the extracted content in the exact structured format specified.

- Your output should present ALL the information relevant to the query in the specified structured format.
- Do not answer in conversational format - directly output the relevant information in the structured format.
""".strip()

        # Build prompt with just query and content, each in its own tag so the
        # model can tell where the query ends and the page content begins
        prompt_content = f'<query>\n{prompt}\n</query>\n\n<content>\n{content}\n</content>'

        # Send to LLM with structured output
        try:
            response = await asyncio.wait_for(
                llm.ainvoke(
                    [SystemMessage(content=system_prompt), UserMessage(content=prompt_content)], output_format=structured_output
                ),
                timeout=self._EXTRACT_TIMEOUT_SECONDS,
            )
        except asyncio.TimeoutError as e:
            # str(TimeoutError()) is empty, so spell the failure out explicitly
            raise RuntimeError(f'LLM extraction timed out after {self._EXTRACT_TIMEOUT_SECONDS:.0f} seconds') from e
        except Exception as e:
            raise RuntimeError(str(e)) from e

        # Return the structured output BaseModel instance
        return response.completion

    async def _extract_clean_markdown(self, extract_links: bool = False) -> tuple[str, dict]:
        """Extract clean markdown from the current page using enhanced DOM tree.

        Uses the shared markdown extractor for consistency with tools/service.py.

        Args:
            extract_links: Forwarded unchanged to the shared extractor.

        Returns:
            Whatever the shared extractor returns; annotated here as a
            ``(markdown, stats)`` tuple.
        """
        from browser_use.dom.markdown_extractor import extract_clean_markdown

        dom_service = self.dom_service
        return await extract_clean_markdown(dom_service=dom_service, target_id=self._target_id, extract_links=extract_links)