"""Page class for page-level operations."""
from typing import TYPE_CHECKING, TypeVar
from pydantic import BaseModel
from browser_use.actor.utils import get_key_info
from browser_use.dom.serializer.serializer import DOMTreeSerializer
from browser_use.dom.service import DomService
from browser_use.llm.messages import SystemMessage, UserMessage
T = TypeVar('T', bound=BaseModel)
if TYPE_CHECKING:
from cdp_use.cdp.dom.commands import (
DescribeNodeParameters,
QuerySelectorAllParameters,
)
from cdp_use.cdp.emulation.commands import SetDeviceMetricsOverrideParameters
from cdp_use.cdp.input.commands import (
DispatchKeyEventParameters,
)
from cdp_use.cdp.page.commands import CaptureScreenshotParameters, NavigateParameters, NavigateToHistoryEntryParameters
from cdp_use.cdp.runtime.commands import EvaluateParameters
from cdp_use.cdp.target.commands import (
AttachToTargetParameters,
GetTargetInfoParameters,
)
from cdp_use.cdp.target.types import TargetInfo
from browser_use.browser.session import BrowserSession
from browser_use.llm.base import BaseChatModel
from .element import Element
from .mouse import Mouse
class Page:
"""Page operations (tab or iframe)."""
def __init__(
self, browser_session: 'BrowserSession', target_id: str, session_id: str | None = None, llm: 'BaseChatModel | None' = None
):
self._browser_session = browser_session
self._client = browser_session.cdp_client
self._target_id = target_id
self._session_id: str | None = session_id
self._mouse: 'Mouse | None' = None
self._llm = llm
async def _ensure_session(self) -> str:
"""Ensure we have a session ID for this target."""
if not self._session_id:
params: 'AttachToTargetParameters' = {'targetId': self._target_id, 'flatten': True}
result = await self._client.send.Target.attachToTarget(params)
self._session_id = result['sessionId']
# Enable necessary domains
import asyncio
await asyncio.gather(
self._client.send.Page.enable(session_id=self._session_id),
self._client.send.DOM.enable(session_id=self._session_id),
self._client.send.Runtime.enable(session_id=self._session_id),
self._client.send.Network.enable(session_id=self._session_id),
)
return self._session_id
@property
async def session_id(self) -> str:
"""Get the session ID for this target.
@dev Pass this to an arbitrary CDP call
"""
return await self._ensure_session()
@property
async def mouse(self) -> 'Mouse':
"""Get the mouse interface for this target."""
if not self._mouse:
session_id = await self._ensure_session()
from .mouse import Mouse
self._mouse = Mouse(self._browser_session, session_id, self._target_id)
return self._mouse
async def reload(self) -> None:
"""Reload the target."""
session_id = await self._ensure_session()
await self._client.send.Page.reload(session_id=session_id)
async def get_element(self, backend_node_id: int) -> 'Element':
"""Get an element by its backend node ID."""
session_id = await self._ensure_session()
from .element import Element as Element_
return Element_(self._browser_session, backend_node_id, session_id)
async def evaluate(self, page_function: str, *args) -> str:
"""Execute JavaScript in the target.
Args:
page_function: JavaScript code that MUST start with (...args) => format
*args: Arguments to pass to the function
Returns:
String representation of the JavaScript execution result.
Objects and arrays are JSON-stringified.
"""
session_id = await self._ensure_session()
# Clean and fix common JavaScript string parsing issues
page_function = self._fix_javascript_string(page_function)
# Enforce arrow function format
if not (page_function.startswith('(') and '=>' in page_function):
raise ValueError(f'JavaScript code must start with (...args) => format. Got: {page_function[:50]}...')
# Build the expression - call the arrow function with provided args
if args:
# Convert args to JSON representation for safe passing
import json
arg_strs = [json.dumps(arg) for arg in args]
expression = f'({page_function})({", ".join(arg_strs)})'
else:
expression = f'({page_function})()'
# Debug: print the actual expression being evaluated
print(f'DEBUG: Evaluating JavaScript: {repr(expression)}')
params: 'EvaluateParameters' = {'expression': expression, 'returnByValue': True, 'awaitPromise': True}
result = await self._client.send.Runtime.evaluate(
params,
session_id=session_id,
)
if 'exceptionDetails' in result:
raise RuntimeError(f'JavaScript evaluation failed: {result["exceptionDetails"]}')
value = result.get('result', {}).get('value')
# Always return string representation
if value is None:
return ''
elif isinstance(value, str):
return value
else:
# Convert objects, numbers, booleans to string
import json
try:
return json.dumps(value) if isinstance(value, (dict, list)) else str(value)
except (TypeError, ValueError):
return str(value)
def _fix_javascript_string(self, js_code: str) -> str:
"""Fix common JavaScript string parsing issues when written as Python string."""
# Just do minimal, safe cleaning
js_code = js_code.strip()
# Only fix the most common and safe issues:
# 1. Remove obvious Python string wrapper quotes if they exist
if (js_code.startswith('"') and js_code.endswith('"')) or (js_code.startswith("'") and js_code.endswith("'")):
# Check if it's a wrapped string (not part of JS syntax)
inner = js_code[1:-1]
if inner.count('"') + inner.count("'") == 0 or '() =>' in inner:
js_code = inner
# 2. Only fix clearly escaped quotes that shouldn't be
# But be very conservative - only if we're sure it's a Python string artifact
if '\\"' in js_code and js_code.count('\\"') > js_code.count('"'):
js_code = js_code.replace('\\"', '"')
if "\\'" in js_code and js_code.count("\\'") > js_code.count("'"):
js_code = js_code.replace("\\'", "'")
# 3. Basic whitespace normalization only
js_code = js_code.strip()
# Final validation - ensure it's not empty
if not js_code:
raise ValueError('JavaScript code is empty after cleaning')
return js_code
async def screenshot(self, format: str = 'jpeg', quality: int | None = None) -> str:
"""Take a screenshot and return base64 encoded image.
Args:
format: Image format ('jpeg', 'png', 'webp')
quality: Quality 0-100 for JPEG format
Returns:
Base64-encoded image data
"""
session_id = await self._ensure_session()
params: 'CaptureScreenshotParameters' = {'format': format}
if quality is not None and format.lower() == 'jpeg':
params['quality'] = quality
result = await self._client.send.Page.captureScreenshot(params, session_id=session_id)
return result['data']
async def press(self, key: str) -> None:
"""Press a key on the page (sends keyboard input to the focused element or page)."""
session_id = await self._ensure_session()
# Handle key combinations like "Control+A"
if '+' in key:
parts = key.split('+')
modifiers = parts[:-1]
main_key = parts[-1]
# Calculate modifier bitmask
modifier_value = 0
modifier_map = {'Alt': 1, 'Control': 2, 'Meta': 4, 'Shift': 8}
for mod in modifiers:
modifier_value |= modifier_map.get(mod, 0)
# Press modifier keys
for mod in modifiers:
code, vk_code = get_key_info(mod)
params: 'DispatchKeyEventParameters' = {'type': 'keyDown', 'key': mod, 'code': code}
if vk_code is not None:
params['windowsVirtualKeyCode'] = vk_code
await self._client.send.Input.dispatchKeyEvent(params, session_id=session_id)
# Press main key with modifiers bitmask
main_code, main_vk_code = get_key_info(main_key)
main_down_params: 'DispatchKeyEventParameters' = {
'type': 'keyDown',
'key': main_key,
'code': main_code,
'modifiers': modifier_value,
}
if main_vk_code is not None:
main_down_params['windowsVirtualKeyCode'] = main_vk_code
await self._client.send.Input.dispatchKeyEvent(main_down_params, session_id=session_id)
main_up_params: 'DispatchKeyEventParameters' = {
'type': 'keyUp',
'key': main_key,
'code': main_code,
'modifiers': modifier_value,
}
if main_vk_code is not None:
main_up_params['windowsVirtualKeyCode'] = main_vk_code
await self._client.send.Input.dispatchKeyEvent(main_up_params, session_id=session_id)
# Release modifier keys
for mod in reversed(modifiers):
code, vk_code = get_key_info(mod)
release_params: 'DispatchKeyEventParameters' = {'type': 'keyUp', 'key': mod, 'code': code}
if vk_code is not None:
release_params['windowsVirtualKeyCode'] = vk_code
await self._client.send.Input.dispatchKeyEvent(release_params, session_id=session_id)
else:
# Simple key press
code, vk_code = get_key_info(key)
key_down_params: 'DispatchKeyEventParameters' = {'type': 'keyDown', 'key': key, 'code': code}
if vk_code is not None:
key_down_params['windowsVirtualKeyCode'] = vk_code
await self._client.send.Input.dispatchKeyEvent(key_down_params, session_id=session_id)
key_up_params: 'DispatchKeyEventParameters' = {'type': 'keyUp', 'key': key, 'code': code}
if vk_code is not None:
key_up_params['windowsVirtualKeyCode'] = vk_code
await self._client.send.Input.dispatchKeyEvent(key_up_params, session_id=session_id)
async def set_viewport_size(self, width: int, height: int) -> None:
"""Set the viewport size."""
session_id = await self._ensure_session()
params: 'SetDeviceMetricsOverrideParameters' = {
'width': width,
'height': height,
'deviceScaleFactor': 1.0,
'mobile': False,
}
await self._client.send.Emulation.setDeviceMetricsOverride(
params,
session_id=session_id,
)
# Target properties (from CDP getTargetInfo)
async def get_target_info(self) -> 'TargetInfo':
"""Get target information."""
params: 'GetTargetInfoParameters' = {'targetId': self._target_id}
result = await self._client.send.Target.getTargetInfo(params)
return result['targetInfo']
async def get_url(self) -> str:
"""Get the current URL."""
info = await self.get_target_info()
return info.get('url', '')
async def get_title(self) -> str:
"""Get the current title."""
info = await self.get_target_info()
return info.get('title', '')
async def goto(self, url: str) -> None:
"""Navigate this target to a URL."""
session_id = await self._ensure_session()
params: 'NavigateParameters' = {'url': url}
await self._client.send.Page.navigate(params, session_id=session_id)
async def navigate(self, url: str) -> None:
"""Alias for goto."""
await self.goto(url)
async def go_back(self) -> None:
"""Navigate back in history."""
session_id = await self._ensure_session()
try:
# Get navigation history
history = await self._client.send.Page.getNavigationHistory(session_id=session_id)
current_index = history['currentIndex']
entries = history['entries']
# Check if we can go back
if current_index <= 0:
raise RuntimeError('Cannot go back - no previous entry in history')
# Navigate to the previous entry
previous_entry_id = entries[current_index - 1]['id']
params: 'NavigateToHistoryEntryParameters' = {'entryId': previous_entry_id}
await self._client.send.Page.navigateToHistoryEntry(params, session_id=session_id)
except Exception as e:
raise RuntimeError(f'Failed to navigate back: {e}')
async def go_forward(self) -> None:
"""Navigate forward in history."""
session_id = await self._ensure_session()
try:
# Get navigation history
history = await self._client.send.Page.getNavigationHistory(session_id=session_id)
current_index = history['currentIndex']
entries = history['entries']
# Check if we can go forward
if current_index >= len(entries) - 1:
raise RuntimeError('Cannot go forward - no next entry in history')
# Navigate to the next entry
next_entry_id = entries[current_index + 1]['id']
params: 'NavigateToHistoryEntryParameters' = {'entryId': next_entry_id}
await self._client.send.Page.navigateToHistoryEntry(params, session_id=session_id)
except Exception as e:
raise RuntimeError(f'Failed to navigate forward: {e}')
# Element finding methods (these would need to be implemented based on DOM queries)
async def get_elements_by_css_selector(self, selector: str) -> list['Element']:
"""Get elements by CSS selector."""
session_id = await self._ensure_session()
# Get document first
doc_result = await self._client.send.DOM.getDocument(session_id=session_id)
document_node_id = doc_result['root']['nodeId']
# Query selector all
query_params: 'QuerySelectorAllParameters' = {'nodeId': document_node_id, 'selector': selector}
result = await self._client.send.DOM.querySelectorAll(query_params, session_id=session_id)
elements = []
from .element import Element as Element_
# Convert node IDs to backend node IDs
for node_id in result['nodeIds']:
# Get backend node ID
describe_params: 'DescribeNodeParameters' = {'nodeId': node_id}
node_result = await self._client.send.DOM.describeNode(describe_params, session_id=session_id)
backend_node_id = node_result['node']['backendNodeId']
elements.append(Element_(self._browser_session, backend_node_id, session_id))
return elements
# AI METHODS
@property
def dom_service(self) -> 'DomService':
"""Get the DOM service for this target."""
return DomService(self._browser_session)
async def get_element_by_prompt(self, prompt: str, llm: 'BaseChatModel | None' = None) -> 'Element | None':
"""Get an element by a prompt."""
await self._ensure_session()
llm = llm or self._llm
if not llm:
raise ValueError('LLM not provided')
dom_service = self.dom_service
enhanced_dom_tree = await dom_service.get_dom_tree(target_id=self._target_id)
serialized_dom_state, _ = DOMTreeSerializer(
enhanced_dom_tree, None, paint_order_filtering=True
).serialize_accessible_elements()
llm_representation = serialized_dom_state.llm_representation()
system_message = SystemMessage(
content="""You are an AI created to find an element on a page by a prompt.
Interactive Elements: All interactive elements will be provided in format as [index]text where
- index: Numeric identifier for interaction
- type: HTML element type (button, input, etc.)
- text: Element description
Examples:
[33]
User form
[35]
Note that:
- Only elements with numeric indexes in [] are interactive
- (stacked) indentation (with \t) is important and means that the element is a (html) child of the element above (with a lower index)
- Pure text elements without [] are not interactive.
Your task is to find an element index (if any) that matches the prompt (written in tag).
If non of the elements matches the, return None.
Before you return the element index, reason about the state and elements for a sentence or two."""
)
state_message = UserMessage(
content=f"""
{llm_representation}
{prompt}
"""
)
class ElementResponse(BaseModel):
# thinking: str
element_highlight_index: int | None
llm_response = await llm.ainvoke(
[
system_message,
state_message,
],
output_format=ElementResponse,
)
element_highlight_index = llm_response.completion.element_highlight_index
if element_highlight_index is None or element_highlight_index not in serialized_dom_state.selector_map:
return None
element = serialized_dom_state.selector_map[element_highlight_index]
from .element import Element as Element_
return Element_(self._browser_session, element.backend_node_id, self._session_id)
async def must_get_element_by_prompt(self, prompt: str, llm: 'BaseChatModel | None' = None) -> 'Element':
"""Get an element by a prompt.
@dev LLM can still return None, this just raises an error if the element is not found.
"""
element = await self.get_element_by_prompt(prompt, llm)
if element is None:
raise ValueError(f'No element found for prompt: {prompt}')
return element
async def extract_content(self, prompt: str, structured_output: type[T], llm: 'BaseChatModel | None' = None) -> T:
"""Extract structured content from the current page using LLM.
Extracts clean markdown from the page and sends it to LLM for structured data extraction.
Args:
prompt: Description of what content to extract
structured_output: Pydantic BaseModel class defining the expected output structure
llm: Language model to use for extraction
Returns:
The structured BaseModel instance with extracted content
"""
llm = llm or self._llm
if not llm:
raise ValueError('LLM not provided')
# Extract clean markdown using the same method as in tools/service.py
try:
content, content_stats = await self._extract_clean_markdown()
except Exception as e:
raise RuntimeError(f'Could not extract clean markdown: {type(e).__name__}')
# System prompt for structured extraction
system_prompt = """
You are an expert at extracting structured data from the markdown of a webpage.
You will be given a query and the markdown of a webpage that has been filtered to remove noise and advertising content.
- You are tasked to extract information from the webpage that is relevant to the query.
- You should ONLY use the information available in the webpage to answer the query. Do not make up information or provide guess from your own knowledge.
- If the information relevant to the query is not available in the page, your response should mention that.
- If the query asks for all items, products, etc., make sure to directly list all of them.
- Return the extracted content in the exact structured format specified.
""".strip()
# Build prompt with just query and content
prompt_content = f'\n{prompt}\n\n\n\n{content}\n'
# Send to LLM with structured output
import asyncio
try:
response = await asyncio.wait_for(
llm.ainvoke(
[SystemMessage(content=system_prompt), UserMessage(content=prompt_content)], output_format=structured_output
),
timeout=120.0,
)
# Return the structured output BaseModel instance
return response.completion
except Exception as e:
raise RuntimeError(str(e))
async def _extract_clean_markdown(self, extract_links: bool = False) -> tuple[str, dict]:
"""Extract clean markdown from the current page using enhanced DOM tree.
Uses the shared markdown extractor for consistency with tools/service.py.
"""
from browser_use.dom.markdown_extractor import extract_clean_markdown
dom_service = self.dom_service
return await extract_clean_markdown(dom_service=dom_service, target_id=self._target_id, extract_links=extract_links)