Spaces:
Sleeping
Sleeping
| """Element class for element operations.""" | |
| import asyncio | |
| from typing import TYPE_CHECKING, Literal, Union | |
| from cdp_use.client import logger | |
| from typing_extensions import TypedDict | |
| if TYPE_CHECKING: | |
| from cdp_use.cdp.dom.commands import ( | |
| DescribeNodeParameters, | |
| FocusParameters, | |
| GetAttributesParameters, | |
| GetBoxModelParameters, | |
| PushNodesByBackendIdsToFrontendParameters, | |
| RequestChildNodesParameters, | |
| ResolveNodeParameters, | |
| ) | |
| from cdp_use.cdp.input.commands import ( | |
| DispatchMouseEventParameters, | |
| ) | |
| from cdp_use.cdp.input.types import MouseButton | |
| from cdp_use.cdp.page.commands import CaptureScreenshotParameters | |
| from cdp_use.cdp.page.types import Viewport | |
| from cdp_use.cdp.runtime.commands import CallFunctionOnParameters | |
| from browser_use.browser.session import BrowserSession | |
# Type definitions for element operations
# Keyboard modifier names accepted by Element.click(); click() folds them into
# the CDP `modifiers` bitmask (Alt=1, Control=2, Meta=4, Shift=8).
ModifierType = Literal['Alt', 'Control', 'Meta', 'Shift']
class Position(TypedDict):
    """2D position coordinates.

    Element.drag_to() uses this both as a point handed directly to mouse
    events (``target`` / ``source_position``) and as an offset added to the
    target element's box origin (``target_position``).
    """

    # Horizontal coordinate.
    x: float
    # Vertical coordinate.
    y: float
class BoundingBox(TypedDict):
    """Element bounding box with position and dimensions.

    Element.get_bounding_box() builds this from the content quad returned by
    ``DOM.getBoxModel``: the box is the axis-aligned envelope of the quad's
    four corners.
    """

    # Smallest x among the quad corners (left edge).
    x: float
    # Smallest y among the quad corners (top edge).
    y: float
    # Largest x minus smallest x.
    width: float
    # Largest y minus smallest y.
    height: float
class ElementInfo(TypedDict):
    """Basic information about a DOM element.

    NOTE(review): nothing in the visible part of this file constructs this
    type; the field notes below are inferred from the field names and the CDP
    ``DOM.Node`` vocabulary and should be confirmed against the producer.
    """

    # Presumably the CDP backend node id (same kind of id Element wraps).
    backendNodeId: int
    # Presumably the frontend DOM node id, or None when not resolved.
    nodeId: int | None
    nodeName: str
    nodeType: int
    nodeValue: str | None
    # Attribute name -> value mapping.
    attributes: dict[str, str]
    # Box of the element, or None when unavailable.
    boundingBox: BoundingBox | None
    # Presumably an error description when gathering the info failed.
    error: str | None
| class Element: | |
| """Element operations using BackendNodeId.""" | |
| def __init__( | |
| self, | |
| browser_session: 'BrowserSession', | |
| backend_node_id: int, | |
| session_id: str | None = None, | |
| ): | |
| self._browser_session = browser_session | |
| self._client = browser_session.cdp_client | |
| self._backend_node_id = backend_node_id | |
| self._session_id = session_id | |
async def _get_node_id(self) -> int:
    """Return the frontend DOM node id for this element's backend node id.

    Sends ``DOM.pushNodesByBackendIdsToFrontend`` with the single wrapped
    backend id and returns the first id of the reply.
    """
    reply = await self._client.send.DOM.pushNodesByBackendIdsToFrontend(
        {'backendNodeIds': [self._backend_node_id]},
        session_id=self._session_id,
    )
    node_ids = reply['nodeIds']
    return node_ids[0]
| async def _get_remote_object_id(self) -> str | None: | |
| """Get remote object ID for this element.""" | |
| node_id = await self._get_node_id() | |
| params: 'ResolveNodeParameters' = {'nodeId': node_id} | |
| result = await self._client.send.DOM.resolveNode(params, session_id=self._session_id) | |
| object_id = result['object'].get('objectId', None) | |
| if not object_id: | |
| return None | |
| return object_id | |
async def click(
    self,
    button: 'MouseButton' = 'left',
    click_count: int = 1,
    modifiers: list[ModifierType] | None = None,
) -> None:
    """Click the element with CDP mouse events, falling back to a JavaScript click.

    The element is first scrolled into view (best effort) so that the geometry
    read afterwards describes where the element is when the mouse events are
    dispatched. Geometry is taken from, in order, ``DOM.getContentQuads``,
    ``DOM.getBoxModel`` and ``getBoundingClientRect``. The quad with the
    largest area inside the layout viewport is clicked at its centre, clamped
    to the viewport. If no geometry can be obtained, or dispatching the mouse
    events raises, ``this.click()`` is invoked on the element instead.

    Args:
        button: Mouse button sent with the press/release events.
        click_count: ``clickCount`` sent with the press/release events.
        modifiers: Modifier names folded into the CDP bitmask
            (Alt=1, Control=2, Meta=4, Shift=8).

    Raises:
        RuntimeError: If every strategy fails; the underlying error is chained
            as ``__cause__``.
    """
    client = self._client
    session_id = self._session_id
    backend_node_id = self._backend_node_id

    async def js_click(settle: float) -> None:
        """Invoke ``this.click()`` on the element, then wait ``settle`` seconds."""
        resolved = await client.send.DOM.resolveNode(params={'backendNodeId': backend_node_id}, session_id=session_id)
        if 'object' not in resolved or 'objectId' not in resolved['object']:
            raise Exception('Failed to find DOM element based on backendNodeId, maybe page content changed?')
        await client.send.Runtime.callFunctionOn(
            params={
                'functionDeclaration': 'function() { this.click(); }',
                'objectId': resolved['object']['objectId'],
            },
            session_id=session_id,
        )
        await asyncio.sleep(settle)

    try:
        # Scroll BEFORE measuring: quads are viewport-relative, so measuring
        # first would yield a stale (and clamped) click point for elements
        # that start outside the viewport.
        try:
            await client.send.DOM.scrollIntoViewIfNeeded(params={'backendNodeId': backend_node_id}, session_id=session_id)
            await asyncio.sleep(0.05)  # Wait for scroll to complete
        except Exception:
            pass  # Best effort: detached or non-scrollable nodes are handled below

        # Get viewport dimensions for visibility checks
        layout_metrics = await client.send.Page.getLayoutMetrics(session_id=session_id)
        viewport_width = layout_metrics['layoutViewport']['clientWidth']
        viewport_height = layout_metrics['layoutViewport']['clientHeight']

        quads = []

        # Method 1: DOM.getContentQuads (best for inline elements and complex layouts)
        try:
            content_quads_result = await client.send.DOM.getContentQuads(
                params={'backendNodeId': backend_node_id}, session_id=session_id
            )
            if 'quads' in content_quads_result and content_quads_result['quads']:
                quads = content_quads_result['quads']
        except Exception:
            pass  # Fall through to the next geometry source

        # Method 2: DOM.getBoxModel content quad
        if not quads:
            try:
                box_model = await client.send.DOM.getBoxModel(
                    params={'backendNodeId': backend_node_id}, session_id=session_id
                )
                if 'model' in box_model and 'content' in box_model['model']:
                    content_quad = box_model['model']['content']
                    if len(content_quad) >= 8:
                        # x1, y1, x2, y2, x3, y3, x4, y4
                        quads = [list(content_quad[:8])]
            except Exception:
                pass  # Fall through to the next geometry source

        # Method 3: JavaScript getBoundingClientRect
        if not quads:
            try:
                resolved = await client.send.DOM.resolveNode(
                    params={'backendNodeId': backend_node_id}, session_id=session_id
                )
                if 'object' in resolved and 'objectId' in resolved['object']:
                    bounds_result = await client.send.Runtime.callFunctionOn(
                        params={
                            'functionDeclaration': """
                                function() {
                                    const rect = this.getBoundingClientRect();
                                    return {
                                        x: rect.left,
                                        y: rect.top,
                                        width: rect.width,
                                        height: rect.height
                                    };
                                }
                            """,
                            'objectId': resolved['object']['objectId'],
                            'returnByValue': True,
                        },
                        session_id=session_id,
                    )
                    if 'result' in bounds_result and 'value' in bounds_result['result']:
                        rect = bounds_result['result']['value']
                        x, y, w, h = rect['x'], rect['y'], rect['width'], rect['height']
                        # top-left, top-right, bottom-right, bottom-left
                        quads = [[x, y, x + w, y, x + w, y + h, x, y + h]]
            except Exception:
                pass  # No geometry at all: handled right below

        # Without any geometry the only option left is a JavaScript click
        if not quads:
            try:
                await js_click(0.05)
                return
            except Exception as js_e:
                raise Exception(f'Failed to click element: {js_e}') from js_e

        # Find the quad with the largest visible area inside the viewport
        best_quad = None
        best_area = 0
        for quad in quads:
            if len(quad) < 8:
                continue
            xs = quad[0:8:2]
            ys = quad[1:8:2]
            min_x, max_x = min(xs), max(xs)
            min_y, max_y = min(ys), max(ys)
            if max_x < 0 or max_y < 0 or min_x > viewport_width or min_y > viewport_height:
                continue  # Quad is completely outside viewport
            visible_width = min(viewport_width, max_x) - max(0, min_x)
            visible_height = min(viewport_height, max_y) - max(0, min_y)
            visible_area = visible_width * visible_height
            if visible_area > best_area:
                best_area = visible_area
                best_quad = quad

        if not best_quad:
            # No visible quad found, use the first quad anyway
            best_quad = quads[0]

        # Centre of the chosen quad, kept inside the viewport
        center_x = sum(best_quad[i] for i in range(0, 8, 2)) / 4
        center_y = sum(best_quad[i] for i in range(1, 8, 2)) / 4
        center_x = max(0, min(viewport_width - 1, center_x))
        center_y = max(0, min(viewport_height - 1, center_y))

        # Modifier bitmask for CDP; unknown names contribute nothing
        modifier_map = {'Alt': 1, 'Control': 2, 'Meta': 4, 'Shift': 8}
        modifier_value = 0
        for mod in modifiers or ():
            modifier_value |= modifier_map.get(mod, 0)

        async def send_button_event(event_type: str, timeout: float) -> bool:
            """Dispatch a press/release event; return False if it timed out."""
            try:
                await asyncio.wait_for(
                    client.send.Input.dispatchMouseEvent(
                        params={
                            'type': event_type,
                            'x': center_x,
                            'y': center_y,
                            'button': button,
                            'clickCount': click_count,
                            'modifiers': modifier_value,
                        },
                        session_id=session_id,
                    ),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is what wait_for raises on every
                # supported Python; the builtin TimeoutError only matches
                # it from 3.11 on.
                return False
            return True

        try:
            # Move mouse to element
            await client.send.Input.dispatchMouseEvent(
                params={'type': 'mouseMoved', 'x': center_x, 'y': center_y},
                session_id=session_id,
            )
            await asyncio.sleep(0.05)

            # Mouse down (1 second budget); only pause if the press went through
            if await send_button_event('mousePressed', 1.0):
                await asyncio.sleep(0.08)

            # Mouse up (3 second budget); a timeout here is tolerated
            await send_button_event('mouseReleased', 3.0)
        except Exception as e:
            # Mouse dispatch failed: fall back to a JavaScript click
            try:
                await js_click(0.1)
                return
            except Exception as js_e:
                raise Exception(f'Failed to click element: {e}') from js_e
    except Exception as e:
        raise RuntimeError(f'Failed to click element: {e}') from e
async def fill(self, value: str, clear: bool = True) -> None:
    """Type ``value`` into the element using CDP key events.

    The element is scrolled into view (best effort), resolved to a remote
    object, focused via ``_focus_element_simple`` and optionally cleared via
    ``_clear_text_field``. The text is then sent one character at a time as a
    keyDown / char / keyUp triple; ``'\\n'`` is sent as the Enter key.

    Args:
        value: Text to type.
        clear: When True, clear the field before typing.

    Raises:
        Exception: ``'Failed to fill element: ...'`` wrapping any failure, with
            the original exception chained as ``__cause__``.
    """
    try:
        cdp_client = self._client
        session_id = self._session_id
        backend_node_id = self._backend_node_id

        # Fail fast: the operation cannot complete without a session, so do
        # not scroll or otherwise touch the page first.
        if session_id is None:
            raise RuntimeError('Session ID is required for fill operation')

        async def send_key(params: dict) -> None:
            await cdp_client.send.Input.dispatchKeyEvent(params=params, session_id=session_id)

        async def press(key_fields: dict, char_fields: dict) -> None:
            """Send keyDown, a short pause, char, then keyUp for one key."""
            # keyDown/keyUp carry NO text; the char event carries the text.
            await send_key({'type': 'keyDown', **key_fields})
            await asyncio.sleep(0.001)  # Small delay to emulate human typing speed
            await send_key({'type': 'char', **char_fields})
            await send_key({'type': 'keyUp', **key_fields})

        # Scroll element into view
        try:
            await cdp_client.send.DOM.scrollIntoViewIfNeeded(params={'backendNodeId': backend_node_id}, session_id=session_id)
            await asyncio.sleep(0.01)
        except Exception as e:
            logger.warning(f'Failed to scroll element into view: {e}')

        # Get object ID for the element
        result = await cdp_client.send.DOM.resolveNode(
            params={'backendNodeId': backend_node_id},
            session_id=session_id,
        )
        if 'object' not in result or 'objectId' not in result['object']:
            raise RuntimeError('Failed to get object ID for element')
        object_id = result['object']['objectId']

        # Element centre, handed to the focus helper when available
        input_coordinates = None
        try:
            bounds_result = await cdp_client.send.Runtime.callFunctionOn(
                params={
                    'functionDeclaration': 'function() { return this.getBoundingClientRect(); }',
                    'objectId': object_id,
                    'returnByValue': True,
                },
                session_id=session_id,
            )
            if bounds_result.get('result', {}).get('value'):
                bounds = bounds_result['result']['value']  # type: ignore
                center_x = bounds['x'] + bounds['width'] / 2
                center_y = bounds['y'] + bounds['height'] / 2
                input_coordinates = {'input_x': center_x, 'input_y': center_y}
                logger.debug(f'Using element coordinates: x={center_x:.1f}, y={center_y:.1f}')
        except Exception as e:
            logger.debug(f'Could not get element coordinates: {e}')

        # Step 1: Focus the element (the helper's return value is not used here)
        await self._focus_element_simple(
            backend_node_id=backend_node_id,
            object_id=object_id,
            cdp_client=cdp_client,
            session_id=session_id,
            input_coordinates=input_coordinates,
        )

        # Step 2: Clear existing text if requested
        if clear:
            cleared_successfully = await self._clear_text_field(
                object_id=object_id, cdp_client=cdp_client, session_id=session_id
            )
            if not cleared_successfully:
                logger.warning('Text field clearing failed, typing may append to existing text')

        # Step 3: Type the text character by character
        logger.debug(f'Typing text character by character: "{value}"')
        for char in value:
            if char == '\n':
                # Newline is typed as Enter; the char event carries a carriage return
                await press(
                    {'key': 'Enter', 'code': 'Enter', 'windowsVirtualKeyCode': 13},
                    {'text': '\r', 'key': 'Enter'},
                )
            else:
                # Shifted characters are sent as base key + Shift modifier
                modifiers, vk_code, base_key = self._get_char_modifiers_and_vk(char)
                key_code = self._get_key_code_for_char(base_key)
                await press(
                    {
                        'key': base_key,
                        'code': key_code,
                        'modifiers': modifiers,
                        'windowsVirtualKeyCode': vk_code,
                    },
                    {'text': char, 'key': char},
                )
            # Add 18ms delay between keystrokes
            await asyncio.sleep(0.018)
    except Exception as e:
        raise Exception(f'Failed to fill element: {str(e)}') from e
async def hover(self) -> None:
    """Move the mouse to the centre of the element's bounding box.

    Raises:
        RuntimeError: If ``get_bounding_box()`` yields nothing.
    """
    box = await self.get_bounding_box()
    if not box:
        raise RuntimeError('Element is not visible or has no bounding box')
    centre = {
        'x': box['x'] + box['width'] / 2,
        'y': box['y'] + box['height'] / 2,
    }
    await self._client.send.Input.dispatchMouseEvent(
        {'type': 'mouseMoved', **centre},
        session_id=self._session_id,
    )
async def focus(self) -> None:
    """Give the element focus via ``DOM.focus`` using its frontend node id."""
    await self._client.send.DOM.focus(
        {'nodeId': await self._get_node_id()},
        session_id=self._session_id,
    )
async def check(self) -> None:
    """Toggle a checkbox/radio button by clicking it once with default options."""
    pending_click = self.click()
    await pending_click
async def select_option(self, values: str | list[str]) -> None:
    """Select option(s) of a ``<select>`` element by clicking matching options.

    The element is focused (best effort), its direct children are described
    via ``DOM.describeNode`` and every ``<option>`` child whose ``value``
    attribute or text is in ``values`` is clicked through a new ``Element``.

    This is a simplified approach: single- and multi-select elements are not
    treated differently.

    Args:
        values: One value/label, or a list of them.
    """
    if isinstance(values, str):
        values = [values]

    # Focus the element first
    try:
        await self.focus()
    except Exception:
        logger.warning('Failed to focus element')

    node_id = await self._get_node_id()

    # Request child nodes to get the options
    await self._client.send.DOM.requestChildNodes({'nodeId': node_id, 'depth': 1}, session_id=self._session_id)

    # Get the updated node description with children
    describe_result = await self._client.send.DOM.describeNode({'nodeId': node_id, 'depth': 1}, session_id=self._session_id)
    select_node = describe_result['node']

    for child in select_node.get('children', []):
        if child.get('nodeName', '').lower() != 'option':
            continue

        # CDP attributes arrive as a flat [name, value, name, value, ...] list
        attrs = child.get('attributes', [])
        option_attrs = dict(zip(attrs[::2], attrs[1::2]))
        option_value = option_attrs.get('value', '')
        option_text = child.get('nodeValue', '')

        # The visible label of an <option> lives in its text-node children
        # (nodeType 3), not in the element's own nodeValue.
        # NOTE(review): assumes describeNode includes those text children in
        # the reply at this depth — confirm against a live browser.
        label = ''.join(
            text_node.get('nodeValue', '')
            for text_node in child.get('children', [])
            if text_node.get('nodeType') == 3
        )

        candidates = [option_value, option_text]
        if label:
            candidates += [label, label.strip()]
        if not any(candidate in values for candidate in candidates):
            continue

        # Prefer the backendNodeId carried by the described child.
        # NOTE(review): CDP documents describeNode as not tracking nodes, so
        # the child's nodeId is presumably 0/absent; the nodeId lookup is kept
        # only as a fallback.
        option_backend_id = child.get('backendNodeId')
        if not option_backend_id:
            option_node_id = child.get('nodeId')
            if not option_node_id:
                continue
            option_backend_result = await self._client.send.DOM.describeNode(
                {'nodeId': option_node_id}, session_id=self._session_id
            )
            option_backend_id = option_backend_result['node']['backendNodeId']

        # Create an Element for the option and click it
        option_element = Element(self._browser_session, option_backend_id, self._session_id)
        await option_element.click()
async def drag_to(
    self,
    target: Union['Element', Position],
    source_position: Position | None = None,
    target_position: Position | None = None,
) -> None:
    """Drag from this element to another element or to a point.

    Args:
        target: Destination element, or a dict with ``x``/``y`` used as-is.
        source_position: Press point; defaults to the centre of this
            element's bounding box.
        target_position: Offset added to the target element's box origin;
            defaults to the centre of that box. Ignored when ``target`` is a
            position dict.

    Raises:
        RuntimeError: If a required bounding box is unavailable.
    """
    # Where the mouse button goes down
    if source_position:
        source_x, source_y = source_position['x'], source_position['y']
    else:
        source_box = await self.get_bounding_box()
        if not source_box:
            raise RuntimeError('Source element is not visible')
        source_x = source_box['x'] + source_box['width'] / 2
        source_y = source_box['y'] + source_box['height'] / 2

    # Where the mouse button is released
    if isinstance(target, dict) and 'x' in target and 'y' in target:
        target_x, target_y = target['x'], target['y']
    else:
        target_box = await target.get_bounding_box()
        if not target_box:
            raise RuntimeError('Target element is not visible')
        if target_position:
            offset_x, offset_y = target_position['x'], target_position['y']
        else:
            offset_x, offset_y = target_box['width'] / 2, target_box['height'] / 2
        target_x = target_box['x'] + offset_x
        target_y = target_box['y'] + offset_y

    # Press at the source, move to the target, release there
    gesture = (
        {'type': 'mousePressed', 'x': source_x, 'y': source_y, 'button': 'left'},
        {'type': 'mouseMoved', 'x': target_x, 'y': target_y},
        {'type': 'mouseReleased', 'x': target_x, 'y': target_y, 'button': 'left'},
    )
    for mouse_event in gesture:
        await self._client.send.Input.dispatchMouseEvent(mouse_event, session_id=self._session_id)
| # Element properties and queries | |
| async def get_attribute(self, name: str) -> str | None: | |
| """Get an attribute value.""" | |
| node_id = await self._get_node_id() | |
| params: 'GetAttributesParameters' = {'nodeId': node_id} | |
| result = await self._client.send.DOM.getAttributes(params, session_id=self._session_id) | |
| attributes = result['attributes'] | |
| for i in range(0, len(attributes), 2): | |
| if attributes[i] == name: | |
| return attributes[i + 1] | |
| return None | |
async def get_bounding_box(self) -> BoundingBox | None:
    """Return the axis-aligned box around the element's content quad.

    Returns None when ``DOM.getBoxModel`` gives no model, when the content
    quad has fewer than 8 values, or when any step raises.
    """
    try:
        reply = await self._client.send.DOM.getBoxModel(
            {'nodeId': await self._get_node_id()},
            session_id=self._session_id,
        )
        if 'model' not in reply:
            return None

        # Content quad layout: x1, y1, x2, y2, x3, y3, x4, y4
        quad = reply['model']['content']
        if len(quad) < 8:
            return None

        xs, ys = quad[0:8:2], quad[1:8:2]
        left, top = min(xs), min(ys)
        return BoundingBox(x=left, y=top, width=max(xs) - left, height=max(ys) - top)
    except Exception:
        return None
| async def screenshot(self, format: str = 'jpeg', quality: int | None = None) -> str: | |
| """Take a screenshot of this element and return base64 encoded image. | |
| Args: | |
| format: Image format ('jpeg', 'png', 'webp') | |
| quality: Quality 0-100 for JPEG format | |
| Returns: | |
| Base64-encoded image data | |
| """ | |
| # Get element's bounding box | |
| box = await self.get_bounding_box() | |
| if not box: | |
| raise RuntimeError('Element is not visible or has no bounding box') | |
| # Create viewport clip for the element | |
| viewport: 'Viewport' = {'x': box['x'], 'y': box['y'], 'width': box['width'], 'height': box['height'], 'scale': 1.0} | |
| # Prepare screenshot parameters | |
| params: 'CaptureScreenshotParameters' = {'format': format, 'clip': viewport} | |
| if quality is not None and format.lower() == 'jpeg': | |
| params['quality'] = quality | |
| # Take screenshot | |
| result = await self._client.send.Page.captureScreenshot(params, session_id=self._session_id) | |
| return result['data'] | |
async def evaluate(self, page_function: str, *args) -> str:
    """Run an arrow function against this element and return the result as text.

    The arrow function is rewritten into a classic ``function`` declaration
    before being handed to ``Runtime.callFunctionOn``, so ``this`` inside the
    code refers to the element.

    Args:
        page_function: JavaScript code that MUST start with (...args) => format
            (an ``async`` prefix is allowed)
        *args: Arguments to pass to the function

    Returns:
        String representation of the JavaScript execution result.
        ``None`` becomes ``''``; objects and arrays are JSON-stringified;
        everything else goes through ``str()``.

    Raises:
        RuntimeError: If the element has no remote object id, or the
            JavaScript evaluation reports exception details.
        ValueError: If the code is not in the accepted arrow-function format.

    Example:
        # Get element's text content
        text = await element.evaluate("() => this.textContent")

        # Set style with argument
        await element.evaluate("(color) => this.style.color = color", "red")

        # Async operations
        result = await element.evaluate("async () => { await new Promise(r => setTimeout(r, 100)); return this.id; }")
    """
    import json
    import re

    remote_id = await self._get_remote_object_id()
    if not remote_id:
        raise RuntimeError('Element has no remote object ID (element may be detached from DOM)')

    # Accept only "(...) => ..." or "async (...) => ..."
    page_function = page_function.strip()
    looks_like_arrow = '=>' in page_function and page_function.startswith(('(', 'async'))
    if not looks_like_arrow:
        raise ValueError(
            f'JavaScript code must start with (...args) => or async (...args) => format. Got: {page_function[:50]}...'
        )

    # Split off the optional async keyword, then parse "(params) => body"
    is_async = page_function.startswith('async')
    arrow_source = page_function[5:].strip() if is_async else page_function
    parsed = re.match(r'\s*\(([^)]*)\)\s*=>\s*(.+)', arrow_source, re.DOTALL)
    if parsed is None:
        raise ValueError(f'Could not parse arrow function: {page_function[:50]}...')
    param_list, body = (piece.strip() for piece in parsed.groups())

    # callFunctionOn needs 'function(...) { ... }'; an expression body gets an
    # explicit return, a braced body is used verbatim.
    keyword = 'async function' if is_async else 'function'
    if body.startswith('{'):
        declaration = f'{keyword}({param_list}) {body}'
    else:
        declaration = f'{keyword}({param_list}) {{ return {body}; }}'

    request = {
        'functionDeclaration': declaration,
        'objectId': remote_id,
        'returnByValue': True,
        'awaitPromise': True,
    }
    if args:
        from cdp_use.cdp.runtime.types import CallArgument

        # Each Python value is passed by value
        request['arguments'] = [CallArgument(value=arg) for arg in args]

    outcome = await self._client.send.Runtime.callFunctionOn(
        request,
        session_id=self._session_id,
    )
    if 'exceptionDetails' in outcome:
        raise RuntimeError(f'JavaScript evaluation failed: {outcome["exceptionDetails"]}')

    # Return string representation (matching Page.evaluate behavior)
    value = outcome.get('result', {}).get('value')
    if value is None:
        return ''
    if isinstance(value, str):
        return value
    if not isinstance(value, (dict, list)):
        return str(value)
    try:
        return json.dumps(value)
    except (TypeError, ValueError):
        return str(value)
| # Helpers for modifiers etc | |
def _get_char_modifiers_and_vk(self, char: str) -> tuple[int, int, str]:
    """Work out how to type ``char`` as a physical key press.

    Returns:
        (modifiers, windowsVirtualKeyCode, base_key) where ``modifiers`` is 8
        (Shift) for shifted symbols and uppercase letters and 0 otherwise, and
        ``base_key`` is the unshifted key that produces the character.
    """
    shift = 8
    plain = 0

    # Shifted symbol -> (unshifted key, virtual key code of that key)
    shifted_symbols = {
        '!': ('1', 49), '@': ('2', 50), '#': ('3', 51), '$': ('4', 52), '%': ('5', 53),
        '^': ('6', 54), '&': ('7', 55), '*': ('8', 56), '(': ('9', 57), ')': ('0', 48),
        '_': ('-', 189), '+': ('=', 187), '{': ('[', 219), '}': (']', 221), '|': ('\\', 220),
        ':': (';', 186), '"': ("'", 222), '<': (',', 188), '>': ('.', 190), '?': ('/', 191),
        '~': ('`', 192),
    }
    shifted_hit = shifted_symbols.get(char)
    if shifted_hit is not None:
        unshifted_key, vk = shifted_hit
        return (shift, vk, unshifted_key)

    # Letters share the virtual key code of their uppercase form
    if char.isupper():
        return (shift, ord(char), char.lower())
    if char.islower():
        return (plain, ord(char.upper()), char)

    # Digits map straight to their own code point
    if char.isdigit():
        return (plain, ord(char), char)

    # Unshifted punctuation and space
    plain_symbols = {
        ' ': 32, '-': 189, '=': 187, '[': 219, ']': 221, '\\': 220,
        ';': 186, "'": 222, ',': 188, '.': 190, '/': 191, '`': 192,
    }
    vk = plain_symbols.get(char)
    if vk is not None:
        return (plain, vk, char)

    # Anything else: use the code point (of the uppercase form for letters)
    fallback_vk = ord(char.upper()) if char.isalpha() else ord(char)
    return (plain, fallback_vk, char)
def _get_key_code_for_char(self, char: str) -> str:
    """Return the keyboard ``code`` (physical key name) for a character.

    Punctuation is looked up in a table in which a shifted symbol maps to the
    same key as its unshifted counterpart. ASCII letters become ``Key<UPPER>``
    and ASCII digits become ``Digit<n>``. Everything else, including
    non-ASCII letters and digits, yields ``'Unidentified'`` instead of a
    made-up code such as ``'KeyÉ'``.

    Args:
        char: The character (or base key) to translate.

    Returns:
        The key code string, or ``'Unidentified'`` when none applies.
    """
    key_codes = {
        ' ': 'Space',
        '.': 'Period',
        ',': 'Comma',
        '-': 'Minus',
        '_': 'Minus',  # Underscore uses Minus with Shift
        '@': 'Digit2',  # @ uses Digit2 with Shift
        '!': 'Digit1',  # ! uses Digit1 with Shift (not 'Exclamation')
        '?': 'Slash',  # ? uses Slash with Shift
        ':': 'Semicolon',  # : uses Semicolon with Shift
        ';': 'Semicolon',
        '(': 'Digit9',  # ( uses Digit9 with Shift
        ')': 'Digit0',  # ) uses Digit0 with Shift
        '[': 'BracketLeft',
        ']': 'BracketRight',
        '{': 'BracketLeft',  # { uses BracketLeft with Shift
        '}': 'BracketRight',  # } uses BracketRight with Shift
        '/': 'Slash',
        '\\': 'Backslash',
        '=': 'Equal',
        '+': 'Equal',  # + uses Equal with Shift
        '*': 'Digit8',  # * uses Digit8 with Shift
        '&': 'Digit7',  # & uses Digit7 with Shift
        '%': 'Digit5',  # % uses Digit5 with Shift
        '$': 'Digit4',  # $ uses Digit4 with Shift
        '#': 'Digit3',  # # uses Digit3 with Shift
        '^': 'Digit6',  # ^ uses Digit6 with Shift
        '~': 'Backquote',  # ~ uses Backquote with Shift
        '`': 'Backquote',
        '"': 'Quote',  # " uses Quote with Shift
        "'": 'Quote',
        '<': 'Comma',  # < uses Comma with Shift
        '>': 'Period',  # > uses Period with Shift
        '|': 'Backslash',  # | uses Backslash with Shift
    }

    mapped = key_codes.get(char)
    if mapped is not None:
        return mapped

    # Only ASCII letters/digits have Key*/Digit* codes; str.isalpha() and
    # str.isdigit() alone would also accept e.g. 'é' or Arabic-Indic digits.
    if char.isascii():
        if char.isalpha():
            return f'Key{char.upper()}'
        if char.isdigit():
            return f'Digit{char}'

    # Fallback for unknown characters
    return 'Unidentified'
async def _clear_text_field(self, object_id: str, cdp_client, session_id: str) -> bool:
    """Clear a text field, trying a JavaScript reset first and triple-click + Delete second.

    Args:
        object_id: Remote object id of the target element; it is bound as ``this``
            in every ``Runtime.callFunctionOn`` call made here.
        cdp_client: CDP client exposing ``send.Runtime`` and ``send.Input``.
        session_id: CDP session id forwarded with every command.

    Returns:
        True when the JavaScript strategy reads back an empty value, or when the
        triple-click + Delete events were all dispatched without raising (the
        fallback result is not read back). False when both strategies failed.
        Exceptions raised by either strategy are logged at debug level and
        swallowed so the next strategy can run.
    """
    # Strategy 1: Direct JavaScript value setting (most reliable for modern web apps)
    try:
        logger.debug('Clearing text field using JavaScript value setting')
        await cdp_client.send.Runtime.callFunctionOn(
            params={
                'functionDeclaration': """
                    function() {
                        // Try to select all text first (only works on text-like inputs)
                        // This handles cases where cursor is in the middle of text
                        try {
                            this.select();
                        } catch (e) {
                            // Some input types (date, color, number, etc.) don't support select()
                            // That's fine, we'll just clear the value directly
                        }
                        // Set value to empty
                        this.value = "";
                        // Dispatch events to notify frameworks like React
                        this.dispatchEvent(new Event("input", { bubbles: true }));
                        this.dispatchEvent(new Event("change", { bubbles: true }));
                        return this.value;
                    }
                """,
                'objectId': object_id,
                'returnByValue': True,
            },
            session_id=session_id,
        )

        # Verify clearing worked by reading the value back in a separate call,
        # after the input/change handlers above have had a chance to run.
        verify_result = await cdp_client.send.Runtime.callFunctionOn(
            params={
                'functionDeclaration': 'function() { return this.value; }',
                'objectId': object_id,
                'returnByValue': True,
            },
            session_id=session_id,
        )
        current_value = verify_result.get('result', {}).get('value', '')
        if not current_value:
            logger.debug('Text field cleared successfully using JavaScript')
            return True
        logger.debug(f'JavaScript clear partially failed, field still contains: "{current_value}"')
    except Exception as e:
        logger.debug(f'JavaScript clear failed: {e}')

    # Strategy 2: Triple-click + Delete (fallback for stubborn fields)
    try:
        logger.debug('Fallback: Clearing using triple-click + Delete')

        # Get element center coordinates for triple-click.
        # A DOMRect exposes x/y/width/height through prototype accessors, so
        # returning it directly with returnByValue serializes to an empty
        # object; copy the fields into a plain object instead.
        bounds_result = await cdp_client.send.Runtime.callFunctionOn(
            params={
                'functionDeclaration': """
                    function() {
                        const rect = this.getBoundingClientRect();
                        return { x: rect.x, y: rect.y, width: rect.width, height: rect.height };
                    }
                """,
                'objectId': object_id,
                'returnByValue': True,
            },
            session_id=session_id,
        )
        bounds = bounds_result.get('result', {}).get('value')
        if bounds:
            center_x = bounds['x'] + bounds['width'] / 2
            center_y = bounds['y'] + bounds['height'] / 2

            # Triple-click to select all text
            for mouse_event in ('mousePressed', 'mouseReleased'):
                await cdp_client.send.Input.dispatchMouseEvent(
                    params={
                        'type': mouse_event,
                        'x': center_x,
                        'y': center_y,
                        'button': 'left',
                        'clickCount': 3,
                    },
                    session_id=session_id,
                )

            # Delete selected text. The virtual key code (46 = VK_DELETE) is
            # required: without it Chromium dispatches the DOM key event but
            # does not perform the delete editing action.
            for key_event in ('keyDown', 'keyUp'):
                await cdp_client.send.Input.dispatchKeyEvent(
                    params={
                        'type': key_event,
                        'key': 'Delete',
                        'code': 'Delete',
                        'windowsVirtualKeyCode': 46,
                    },
                    session_id=session_id,
                )

            logger.debug('Text field cleared using triple-click + Delete')
            return True
    except Exception as e:
        logger.debug(f'Triple-click clear failed: {e}')

    # If all strategies failed
    logger.warning('All text clearing strategies failed')
    return False
async def _focus_element_simple(
    self, backend_node_id: int, object_id: str, cdp_client, session_id: str, input_coordinates=None
) -> bool:
    """Give the element input focus, falling back through three strategies.

    The strategies are attempted in order and the first one that completes
    without raising wins:

    1. ``DOM.focus`` addressed by ``backend_node_id``.
    2. ``this.focus()`` executed on ``object_id`` via ``Runtime.callFunctionOn``.
    3. A single left click at ``input_coordinates['input_x']`` /
       ``input_coordinates['input_y']``, only when coordinates were supplied.

    Args:
        backend_node_id: Backend node id passed to ``DOM.focus``.
        object_id: Remote object id used as ``this`` for the JavaScript focus.
        cdp_client: CDP client exposing ``send.DOM``, ``send.Runtime`` and ``send.Input``.
        session_id: CDP session id forwarded with every command.
        input_coordinates: Optional mapping with ``input_x`` and ``input_y`` keys
            giving the click position for the last-resort strategy.

    Returns:
        True as soon as one strategy completes; False when every strategy
        raised or no coordinates were available for the click.
    """
    # Strategy 1: CDP focus (most reliable)
    try:
        logger.debug('Focusing element using CDP focus')
        await cdp_client.send.DOM.focus(params={'backendNodeId': backend_node_id}, session_id=session_id)
    except Exception as cdp_error:
        logger.debug(f'CDP focus failed: {cdp_error}, trying JavaScript focus')
    else:
        logger.debug('Element focused successfully using CDP focus')
        return True

    # Strategy 2: JavaScript focus (fallback)
    try:
        logger.debug('Focusing element using JavaScript focus')
        js_focus_params = {
            'functionDeclaration': 'function() { this.focus(); }',
            'objectId': object_id,
        }
        await cdp_client.send.Runtime.callFunctionOn(params=js_focus_params, session_id=session_id)
    except Exception as js_error:
        logger.debug(f'JavaScript focus failed: {js_error}, trying click focus')
    else:
        logger.debug('Element focused successfully using JavaScript')
        return True

    # Strategy 3: Click to focus (last resort)
    try:
        if not input_coordinates:
            logger.debug('No coordinates available for click focus')
            return False

        logger.debug(f'Focusing element by clicking at coordinates: {input_coordinates}')
        click_x = input_coordinates['input_x']
        click_y = input_coordinates['input_y']

        # Press then release the left button on the element to focus it
        for phase in ('mousePressed', 'mouseReleased'):
            await cdp_client.send.Input.dispatchMouseEvent(
                params={
                    'type': phase,
                    'x': click_x,
                    'y': click_y,
                    'button': 'left',
                    'clickCount': 1,
                },
                session_id=session_id,
            )
    except Exception as click_error:
        logger.warning(f'All focus strategies failed: {click_error}')
        return False

    logger.debug('Element focused using click')
    return True
async def get_basic_info(self) -> ElementInfo:
    """Collect the element's node description, attributes and bounding box.

    Resolves the node id, asks ``DOM.describeNode`` for the node, fetches the
    bounding box, and converts CDP's flat ``[name, value, name, value, ...]``
    attribute list into a dict.

    Returns:
        An ``ElementInfo`` with ``error`` set to None on success. If any step
        raises an ``Exception``, a placeholder ``ElementInfo`` is returned
        instead, carrying only ``backendNodeId`` and the stringified error.
    """
    try:
        # Get basic node information
        node_id = await self._get_node_id()
        described = await self._client.send.DOM.describeNode({'nodeId': node_id}, session_id=self._session_id)
        node = described['node']

        # Get bounding box
        box = await self.get_bounding_box()

        # Pair each attribute name (even index) with the value that follows it;
        # a dangling trailing name without a value is dropped.
        flat_attributes = node.get('attributes', [])
        attributes: dict[str, str] = {
            flat_attributes[value_index - 1]: flat_attributes[value_index]
            for value_index in range(1, len(flat_attributes), 2)
        }

        return ElementInfo(
            backendNodeId=self._backend_node_id,
            nodeId=node_id,
            nodeName=node.get('nodeName', ''),
            nodeType=node.get('nodeType', 0),
            nodeValue=node.get('nodeValue'),
            attributes=attributes,
            boundingBox=box,
            error=None,
        )
    except Exception as exc:
        failure_message = str(exc)

    # Lookup failed: report the error in-band instead of raising
    return ElementInfo(
        backendNodeId=self._backend_node_id,
        nodeId=None,
        nodeName='',
        nodeType=0,
        nodeValue=None,
        attributes={},
        boundingBox=None,
        error=failure_message,
    )