Spaces:
Sleeping
Sleeping
| """Page class for page-level operations.""" | |
| from typing import TYPE_CHECKING, TypeVar | |
| from pydantic import BaseModel | |
| from browser_use.actor.utils import get_key_info | |
| from browser_use.dom.serializer.serializer import DOMTreeSerializer | |
| from browser_use.dom.service import DomService | |
| from browser_use.llm.messages import SystemMessage, UserMessage | |
| T = TypeVar('T', bound=BaseModel) | |
| if TYPE_CHECKING: | |
| from cdp_use.cdp.dom.commands import ( | |
| DescribeNodeParameters, | |
| QuerySelectorAllParameters, | |
| ) | |
| from cdp_use.cdp.emulation.commands import SetDeviceMetricsOverrideParameters | |
| from cdp_use.cdp.input.commands import ( | |
| DispatchKeyEventParameters, | |
| ) | |
| from cdp_use.cdp.page.commands import CaptureScreenshotParameters, NavigateParameters, NavigateToHistoryEntryParameters | |
| from cdp_use.cdp.runtime.commands import EvaluateParameters | |
| from cdp_use.cdp.target.commands import ( | |
| AttachToTargetParameters, | |
| GetTargetInfoParameters, | |
| ) | |
| from cdp_use.cdp.target.types import TargetInfo | |
| from browser_use.browser.session import BrowserSession | |
| from browser_use.llm.base import BaseChatModel | |
| from .element import Element | |
| from .mouse import Mouse | |
| class Page: | |
| """Page operations (tab or iframe).""" | |
| def __init__( | |
| self, browser_session: 'BrowserSession', target_id: str, session_id: str | None = None, llm: 'BaseChatModel | None' = None | |
| ): | |
| self._browser_session = browser_session | |
| self._client = browser_session.cdp_client | |
| self._target_id = target_id | |
| self._session_id: str | None = session_id | |
| self._mouse: 'Mouse | None' = None | |
| self._llm = llm | |
| async def _ensure_session(self) -> str: | |
| """Ensure we have a session ID for this target.""" | |
| if not self._session_id: | |
| params: 'AttachToTargetParameters' = {'targetId': self._target_id, 'flatten': True} | |
| result = await self._client.send.Target.attachToTarget(params) | |
| self._session_id = result['sessionId'] | |
| # Enable necessary domains | |
| import asyncio | |
| await asyncio.gather( | |
| self._client.send.Page.enable(session_id=self._session_id), | |
| self._client.send.DOM.enable(session_id=self._session_id), | |
| self._client.send.Runtime.enable(session_id=self._session_id), | |
| self._client.send.Network.enable(session_id=self._session_id), | |
| ) | |
| return self._session_id | |
| async def session_id(self) -> str: | |
| """Get the session ID for this target. | |
| @dev Pass this to an arbitrary CDP call | |
| """ | |
| return await self._ensure_session() | |
| async def mouse(self) -> 'Mouse': | |
| """Get the mouse interface for this target.""" | |
| if not self._mouse: | |
| session_id = await self._ensure_session() | |
| from .mouse import Mouse | |
| self._mouse = Mouse(self._browser_session, session_id, self._target_id) | |
| return self._mouse | |
| async def reload(self) -> None: | |
| """Reload the target.""" | |
| session_id = await self._ensure_session() | |
| await self._client.send.Page.reload(session_id=session_id) | |
| async def get_element(self, backend_node_id: int) -> 'Element': | |
| """Get an element by its backend node ID.""" | |
| session_id = await self._ensure_session() | |
| from .element import Element as Element_ | |
| return Element_(self._browser_session, backend_node_id, session_id) | |
| async def evaluate(self, page_function: str, *args) -> str: | |
| """Execute JavaScript in the target. | |
| Args: | |
| page_function: JavaScript code that MUST start with (...args) => format | |
| *args: Arguments to pass to the function | |
| Returns: | |
| String representation of the JavaScript execution result. | |
| Objects and arrays are JSON-stringified. | |
| """ | |
| session_id = await self._ensure_session() | |
| # Clean and fix common JavaScript string parsing issues | |
| page_function = self._fix_javascript_string(page_function) | |
| # Enforce arrow function format | |
| if not (page_function.startswith('(') and '=>' in page_function): | |
| raise ValueError(f'JavaScript code must start with (...args) => format. Got: {page_function[:50]}...') | |
| # Build the expression - call the arrow function with provided args | |
| if args: | |
| # Convert args to JSON representation for safe passing | |
| import json | |
| arg_strs = [json.dumps(arg) for arg in args] | |
| expression = f'({page_function})({", ".join(arg_strs)})' | |
| else: | |
| expression = f'({page_function})()' | |
| # Debug: print the actual expression being evaluated | |
| print(f'DEBUG: Evaluating JavaScript: {repr(expression)}') | |
| params: 'EvaluateParameters' = {'expression': expression, 'returnByValue': True, 'awaitPromise': True} | |
| result = await self._client.send.Runtime.evaluate( | |
| params, | |
| session_id=session_id, | |
| ) | |
| if 'exceptionDetails' in result: | |
| raise RuntimeError(f'JavaScript evaluation failed: {result["exceptionDetails"]}') | |
| value = result.get('result', {}).get('value') | |
| # Always return string representation | |
| if value is None: | |
| return '' | |
| elif isinstance(value, str): | |
| return value | |
| else: | |
| # Convert objects, numbers, booleans to string | |
| import json | |
| try: | |
| return json.dumps(value) if isinstance(value, (dict, list)) else str(value) | |
| except (TypeError, ValueError): | |
| return str(value) | |
| def _fix_javascript_string(self, js_code: str) -> str: | |
| """Fix common JavaScript string parsing issues when written as Python string.""" | |
| # Just do minimal, safe cleaning | |
| js_code = js_code.strip() | |
| # Only fix the most common and safe issues: | |
| # 1. Remove obvious Python string wrapper quotes if they exist | |
| if (js_code.startswith('"') and js_code.endswith('"')) or (js_code.startswith("'") and js_code.endswith("'")): | |
| # Check if it's a wrapped string (not part of JS syntax) | |
| inner = js_code[1:-1] | |
| if inner.count('"') + inner.count("'") == 0 or '() =>' in inner: | |
| js_code = inner | |
| # 2. Only fix clearly escaped quotes that shouldn't be | |
| # But be very conservative - only if we're sure it's a Python string artifact | |
| if '\\"' in js_code and js_code.count('\\"') > js_code.count('"'): | |
| js_code = js_code.replace('\\"', '"') | |
| if "\\'" in js_code and js_code.count("\\'") > js_code.count("'"): | |
| js_code = js_code.replace("\\'", "'") | |
| # 3. Basic whitespace normalization only | |
| js_code = js_code.strip() | |
| # Final validation - ensure it's not empty | |
| if not js_code: | |
| raise ValueError('JavaScript code is empty after cleaning') | |
| return js_code | |
| async def screenshot(self, format: str = 'jpeg', quality: int | None = None) -> str: | |
| """Take a screenshot and return base64 encoded image. | |
| Args: | |
| format: Image format ('jpeg', 'png', 'webp') | |
| quality: Quality 0-100 for JPEG format | |
| Returns: | |
| Base64-encoded image data | |
| """ | |
| session_id = await self._ensure_session() | |
| params: 'CaptureScreenshotParameters' = {'format': format} | |
| if quality is not None and format.lower() == 'jpeg': | |
| params['quality'] = quality | |
| result = await self._client.send.Page.captureScreenshot(params, session_id=session_id) | |
| return result['data'] | |
| async def press(self, key: str) -> None: | |
| """Press a key on the page (sends keyboard input to the focused element or page).""" | |
| session_id = await self._ensure_session() | |
| # Handle key combinations like "Control+A" | |
| if '+' in key: | |
| parts = key.split('+') | |
| modifiers = parts[:-1] | |
| main_key = parts[-1] | |
| # Calculate modifier bitmask | |
| modifier_value = 0 | |
| modifier_map = {'Alt': 1, 'Control': 2, 'Meta': 4, 'Shift': 8} | |
| for mod in modifiers: | |
| modifier_value |= modifier_map.get(mod, 0) | |
| # Press modifier keys | |
| for mod in modifiers: | |
| code, vk_code = get_key_info(mod) | |
| params: 'DispatchKeyEventParameters' = {'type': 'keyDown', 'key': mod, 'code': code} | |
| if vk_code is not None: | |
| params['windowsVirtualKeyCode'] = vk_code | |
| await self._client.send.Input.dispatchKeyEvent(params, session_id=session_id) | |
| # Press main key with modifiers bitmask | |
| main_code, main_vk_code = get_key_info(main_key) | |
| main_down_params: 'DispatchKeyEventParameters' = { | |
| 'type': 'keyDown', | |
| 'key': main_key, | |
| 'code': main_code, | |
| 'modifiers': modifier_value, | |
| } | |
| if main_vk_code is not None: | |
| main_down_params['windowsVirtualKeyCode'] = main_vk_code | |
| await self._client.send.Input.dispatchKeyEvent(main_down_params, session_id=session_id) | |
| main_up_params: 'DispatchKeyEventParameters' = { | |
| 'type': 'keyUp', | |
| 'key': main_key, | |
| 'code': main_code, | |
| 'modifiers': modifier_value, | |
| } | |
| if main_vk_code is not None: | |
| main_up_params['windowsVirtualKeyCode'] = main_vk_code | |
| await self._client.send.Input.dispatchKeyEvent(main_up_params, session_id=session_id) | |
| # Release modifier keys | |
| for mod in reversed(modifiers): | |
| code, vk_code = get_key_info(mod) | |
| release_params: 'DispatchKeyEventParameters' = {'type': 'keyUp', 'key': mod, 'code': code} | |
| if vk_code is not None: | |
| release_params['windowsVirtualKeyCode'] = vk_code | |
| await self._client.send.Input.dispatchKeyEvent(release_params, session_id=session_id) | |
| else: | |
| # Simple key press | |
| code, vk_code = get_key_info(key) | |
| key_down_params: 'DispatchKeyEventParameters' = {'type': 'keyDown', 'key': key, 'code': code} | |
| if vk_code is not None: | |
| key_down_params['windowsVirtualKeyCode'] = vk_code | |
| await self._client.send.Input.dispatchKeyEvent(key_down_params, session_id=session_id) | |
| key_up_params: 'DispatchKeyEventParameters' = {'type': 'keyUp', 'key': key, 'code': code} | |
| if vk_code is not None: | |
| key_up_params['windowsVirtualKeyCode'] = vk_code | |
| await self._client.send.Input.dispatchKeyEvent(key_up_params, session_id=session_id) | |
| async def set_viewport_size(self, width: int, height: int) -> None: | |
| """Set the viewport size.""" | |
| session_id = await self._ensure_session() | |
| params: 'SetDeviceMetricsOverrideParameters' = { | |
| 'width': width, | |
| 'height': height, | |
| 'deviceScaleFactor': 1.0, | |
| 'mobile': False, | |
| } | |
| await self._client.send.Emulation.setDeviceMetricsOverride( | |
| params, | |
| session_id=session_id, | |
| ) | |
| # Target properties (from CDP getTargetInfo) | |
| async def get_target_info(self) -> 'TargetInfo': | |
| """Get target information.""" | |
| params: 'GetTargetInfoParameters' = {'targetId': self._target_id} | |
| result = await self._client.send.Target.getTargetInfo(params) | |
| return result['targetInfo'] | |
| async def get_url(self) -> str: | |
| """Get the current URL.""" | |
| info = await self.get_target_info() | |
| return info.get('url', '') | |
| async def get_title(self) -> str: | |
| """Get the current title.""" | |
| info = await self.get_target_info() | |
| return info.get('title', '') | |
| async def goto(self, url: str) -> None: | |
| """Navigate this target to a URL.""" | |
| session_id = await self._ensure_session() | |
| params: 'NavigateParameters' = {'url': url} | |
| await self._client.send.Page.navigate(params, session_id=session_id) | |
| async def navigate(self, url: str) -> None: | |
| """Alias for goto.""" | |
| await self.goto(url) | |
| async def go_back(self) -> None: | |
| """Navigate back in history.""" | |
| session_id = await self._ensure_session() | |
| try: | |
| # Get navigation history | |
| history = await self._client.send.Page.getNavigationHistory(session_id=session_id) | |
| current_index = history['currentIndex'] | |
| entries = history['entries'] | |
| # Check if we can go back | |
| if current_index <= 0: | |
| raise RuntimeError('Cannot go back - no previous entry in history') | |
| # Navigate to the previous entry | |
| previous_entry_id = entries[current_index - 1]['id'] | |
| params: 'NavigateToHistoryEntryParameters' = {'entryId': previous_entry_id} | |
| await self._client.send.Page.navigateToHistoryEntry(params, session_id=session_id) | |
| except Exception as e: | |
| raise RuntimeError(f'Failed to navigate back: {e}') | |
| async def go_forward(self) -> None: | |
| """Navigate forward in history.""" | |
| session_id = await self._ensure_session() | |
| try: | |
| # Get navigation history | |
| history = await self._client.send.Page.getNavigationHistory(session_id=session_id) | |
| current_index = history['currentIndex'] | |
| entries = history['entries'] | |
| # Check if we can go forward | |
| if current_index >= len(entries) - 1: | |
| raise RuntimeError('Cannot go forward - no next entry in history') | |
| # Navigate to the next entry | |
| next_entry_id = entries[current_index + 1]['id'] | |
| params: 'NavigateToHistoryEntryParameters' = {'entryId': next_entry_id} | |
| await self._client.send.Page.navigateToHistoryEntry(params, session_id=session_id) | |
| except Exception as e: | |
| raise RuntimeError(f'Failed to navigate forward: {e}') | |
| # Element finding methods (these would need to be implemented based on DOM queries) | |
| async def get_elements_by_css_selector(self, selector: str) -> list['Element']: | |
| """Get elements by CSS selector.""" | |
| session_id = await self._ensure_session() | |
| # Get document first | |
| doc_result = await self._client.send.DOM.getDocument(session_id=session_id) | |
| document_node_id = doc_result['root']['nodeId'] | |
| # Query selector all | |
| query_params: 'QuerySelectorAllParameters' = {'nodeId': document_node_id, 'selector': selector} | |
| result = await self._client.send.DOM.querySelectorAll(query_params, session_id=session_id) | |
| elements = [] | |
| from .element import Element as Element_ | |
| # Convert node IDs to backend node IDs | |
| for node_id in result['nodeIds']: | |
| # Get backend node ID | |
| describe_params: 'DescribeNodeParameters' = {'nodeId': node_id} | |
| node_result = await self._client.send.DOM.describeNode(describe_params, session_id=session_id) | |
| backend_node_id = node_result['node']['backendNodeId'] | |
| elements.append(Element_(self._browser_session, backend_node_id, session_id)) | |
| return elements | |
| # AI METHODS | |
| def dom_service(self) -> 'DomService': | |
| """Get the DOM service for this target.""" | |
| return DomService(self._browser_session) | |
| async def get_element_by_prompt(self, prompt: str, llm: 'BaseChatModel | None' = None) -> 'Element | None': | |
| """Get an element by a prompt.""" | |
| await self._ensure_session() | |
| llm = llm or self._llm | |
| if not llm: | |
| raise ValueError('LLM not provided') | |
| dom_service = self.dom_service | |
| enhanced_dom_tree = await dom_service.get_dom_tree(target_id=self._target_id) | |
| serialized_dom_state, _ = DOMTreeSerializer( | |
| enhanced_dom_tree, None, paint_order_filtering=True | |
| ).serialize_accessible_elements() | |
| llm_representation = serialized_dom_state.llm_representation() | |
| system_message = SystemMessage( | |
| content="""You are an AI created to find an element on a page by a prompt. | |
| <browser_state> | |
| Interactive Elements: All interactive elements will be provided in format as [index]<type>text</type> where | |
| - index: Numeric identifier for interaction | |
| - type: HTML element type (button, input, etc.) | |
| - text: Element description | |
| Examples: | |
| [33]<div>User form</div> | |
| [35]<button aria-label='Submit form'>Submit</button> | |
| Note that: | |
| - Only elements with numeric indexes in [] are interactive | |
| - (stacked) indentation (with \t) is important and means that the element is a (html) child of the element above (with a lower index) | |
| - Pure text elements without [] are not interactive. | |
| </browser_state> | |
| Your task is to find an element index (if any) that matches the prompt (written in <prompt> tag). | |
| If non of the elements matches the, return None. | |
| Before you return the element index, reason about the state and elements for a sentence or two.""" | |
| ) | |
| state_message = UserMessage( | |
| content=f""" | |
| <browser_state> | |
| {llm_representation} | |
| </browser_state> | |
| <prompt> | |
| {prompt} | |
| </prompt> | |
| """ | |
| ) | |
| class ElementResponse(BaseModel): | |
| # thinking: str | |
| element_highlight_index: int | None | |
| llm_response = await llm.ainvoke( | |
| [ | |
| system_message, | |
| state_message, | |
| ], | |
| output_format=ElementResponse, | |
| ) | |
| element_highlight_index = llm_response.completion.element_highlight_index | |
| if element_highlight_index is None or element_highlight_index not in serialized_dom_state.selector_map: | |
| return None | |
| element = serialized_dom_state.selector_map[element_highlight_index] | |
| from .element import Element as Element_ | |
| return Element_(self._browser_session, element.backend_node_id, self._session_id) | |
| async def must_get_element_by_prompt(self, prompt: str, llm: 'BaseChatModel | None' = None) -> 'Element': | |
| """Get an element by a prompt. | |
| @dev LLM can still return None, this just raises an error if the element is not found. | |
| """ | |
| element = await self.get_element_by_prompt(prompt, llm) | |
| if element is None: | |
| raise ValueError(f'No element found for prompt: {prompt}') | |
| return element | |
| async def extract_content(self, prompt: str, structured_output: type[T], llm: 'BaseChatModel | None' = None) -> T: | |
| """Extract structured content from the current page using LLM. | |
| Extracts clean markdown from the page and sends it to LLM for structured data extraction. | |
| Args: | |
| prompt: Description of what content to extract | |
| structured_output: Pydantic BaseModel class defining the expected output structure | |
| llm: Language model to use for extraction | |
| Returns: | |
| The structured BaseModel instance with extracted content | |
| """ | |
| llm = llm or self._llm | |
| if not llm: | |
| raise ValueError('LLM not provided') | |
| # Extract clean markdown using the same method as in tools/service.py | |
| try: | |
| content, content_stats = await self._extract_clean_markdown() | |
| except Exception as e: | |
| raise RuntimeError(f'Could not extract clean markdown: {type(e).__name__}') | |
| # System prompt for structured extraction | |
| system_prompt = """ | |
| You are an expert at extracting structured data from the markdown of a webpage. | |
| <input> | |
| You will be given a query and the markdown of a webpage that has been filtered to remove noise and advertising content. | |
| </input> | |
| <instructions> | |
| - You are tasked to extract information from the webpage that is relevant to the query. | |
| - You should ONLY use the information available in the webpage to answer the query. Do not make up information or provide guess from your own knowledge. | |
| - If the information relevant to the query is not available in the page, your response should mention that. | |
| - If the query asks for all items, products, etc., make sure to directly list all of them. | |
| - Return the extracted content in the exact structured format specified. | |
| </instructions> | |
| <output> | |
| - Your output should present ALL the information relevant to the query in the specified structured format. | |
| - Do not answer in conversational format - directly output the relevant information in the structured format. | |
| </output> | |
| """.strip() | |
| # Build prompt with just query and content | |
| prompt_content = f'<query>\n{prompt}\n</query>\n\n<webpage_content>\n{content}\n</webpage_content>' | |
| # Send to LLM with structured output | |
| import asyncio | |
| try: | |
| response = await asyncio.wait_for( | |
| llm.ainvoke( | |
| [SystemMessage(content=system_prompt), UserMessage(content=prompt_content)], output_format=structured_output | |
| ), | |
| timeout=120.0, | |
| ) | |
| # Return the structured output BaseModel instance | |
| return response.completion | |
| except Exception as e: | |
| raise RuntimeError(str(e)) | |
| async def _extract_clean_markdown(self, extract_links: bool = False) -> tuple[str, dict]: | |
| """Extract clean markdown from the current page using enhanced DOM tree. | |
| Uses the shared markdown extractor for consistency with tools/service.py. | |
| """ | |
| from browser_use.dom.markdown_extractor import extract_clean_markdown | |
| dom_service = self.dom_service | |
| return await extract_clean_markdown(dom_service=dom_service, target_id=self._target_id, extract_links=extract_links) | |