"""Element class for element operations."""

import asyncio
import json
import re
from typing import TYPE_CHECKING, Literal, Union

from cdp_use.client import logger
from typing_extensions import TypedDict

if TYPE_CHECKING:
    from cdp_use.cdp.dom.commands import (
        DescribeNodeParameters,
        FocusParameters,
        GetAttributesParameters,
        GetBoxModelParameters,
        PushNodesByBackendIdsToFrontendParameters,
        RequestChildNodesParameters,
        ResolveNodeParameters,
    )
    from cdp_use.cdp.input.commands import (
        DispatchMouseEventParameters,
    )
    from cdp_use.cdp.input.types import MouseButton
    from cdp_use.cdp.page.commands import CaptureScreenshotParameters
    from cdp_use.cdp.page.types import Viewport
    from cdp_use.cdp.runtime.commands import CallFunctionOnParameters

    from browser_use.browser.session import BrowserSession

# Type definitions for element operations
ModifierType = Literal['Alt', 'Control', 'Meta', 'Shift']

# CDP `modifiers` bit values used by Input.dispatchMouseEvent / dispatchKeyEvent.
_MODIFIER_BITS = {'Alt': 1, 'Control': 2, 'Meta': 4, 'Shift': 8}
_SHIFT = _MODIFIER_BITS['Shift']

# Characters typed with Shift held: char -> (unshifted base key, Windows virtual key code).
_SHIFT_CHARS: dict[str, tuple[str, int]] = {
    '!': ('1', 49),
    '@': ('2', 50),
    '#': ('3', 51),
    '$': ('4', 52),
    '%': ('5', 53),
    '^': ('6', 54),
    '&': ('7', 55),
    '*': ('8', 56),
    '(': ('9', 57),
    ')': ('0', 48),
    '_': ('-', 189),
    '+': ('=', 187),
    '{': ('[', 219),
    '}': (']', 221),
    '|': ('\\', 220),
    ':': (';', 186),
    '"': ("'", 222),
    '<': (',', 188),
    '>': ('.', 190),
    '?': ('/', 191),
    '~': ('`', 192),
}

# Punctuation typed without Shift: char -> Windows virtual key code.
_NO_SHIFT_CHARS: dict[str, int] = {
    ' ': 32,
    '-': 189,
    '=': 187,
    '[': 219,
    ']': 221,
    '\\': 220,
    ';': 186,
    "'": 222,
    ',': 188,
    '.': 190,
    '/': 191,
    '`': 192,
}

# Physical key `code` values for punctuation (shifted and unshifted share a key).
_KEY_CODES: dict[str, str] = {
    ' ': 'Space',
    '.': 'Period',
    ',': 'Comma',
    '-': 'Minus',
    '_': 'Minus',  # Underscore uses Minus with Shift
    '@': 'Digit2',  # @ uses Digit2 with Shift
    '!': 'Digit1',  # ! uses Digit1 with Shift (not 'Exclamation')
    '?': 'Slash',  # ? uses Slash with Shift
    ':': 'Semicolon',  # : uses Semicolon with Shift
    ';': 'Semicolon',
    '(': 'Digit9',  # ( uses Digit9 with Shift
    ')': 'Digit0',  # ) uses Digit0 with Shift
    '[': 'BracketLeft',
    ']': 'BracketRight',
    '{': 'BracketLeft',  # { uses BracketLeft with Shift
    '}': 'BracketRight',  # } uses BracketRight with Shift
    '/': 'Slash',
    '\\': 'Backslash',
    '=': 'Equal',
    '+': 'Equal',  # + uses Equal with Shift
    '*': 'Digit8',  # * uses Digit8 with Shift
    '&': 'Digit7',  # & uses Digit7 with Shift
    '%': 'Digit5',  # % uses Digit5 with Shift
    '$': 'Digit4',  # $ uses Digit4 with Shift
    '#': 'Digit3',  # # uses Digit3 with Shift
    '^': 'Digit6',  # ^ uses Digit6 with Shift
    '~': 'Backquote',  # ~ uses Backquote with Shift
    '`': 'Backquote',
    '"': 'Quote',  # " uses Quote with Shift
    "'": 'Quote',
    '<': 'Comma',  # < uses Comma with Shift
    '>': 'Period',  # > uses Period with Shift
    '|': 'Backslash',  # | uses Backslash with Shift
}

_TEXT_NODE_TYPE = 3  # DOM Node.TEXT_NODE


def _attribute_pairs_to_dict(flat: list[str]) -> dict[str, str]:
    """Convert a flat ``[name1, value1, name2, value2, ...]`` list into a dict.

    A trailing name without a value is ignored.
    """
    return dict(zip(flat[0::2], flat[1::2]))


def _vk_for_letter(char: str) -> int:
    """Return the code point of ``char``'s uppercase form, or 0 if that form is not one character.

    ``str.upper()`` can expand a single character into several (``'ß'`` becomes
    ``'SS'``); calling ``ord()`` on the result would raise ``TypeError``.
    """
    upper = char.upper()
    return ord(upper) if len(upper) == 1 else 0


def _option_text(option_node: dict) -> str:
    """Return the visible text of an ``<option>`` node description.

    The text is taken from the option's child text nodes; when the description
    carries no such children, the option's own ``nodeValue`` is used instead.
    """
    text = ''.join(
        grandchild.get('nodeValue', '')
        for grandchild in option_node.get('children', [])
        if grandchild.get('nodeType') == _TEXT_NODE_TYPE
    ).strip()
    return text or option_node.get('nodeValue', '')


def _pick_click_point(quads: list, viewport_width: float, viewport_height: float) -> tuple[float, float]:
    """Choose a click point: the centre of the quad with the largest visible area.

    If no quad intersects the viewport the first quad is used. The returned
    point is clamped to lie inside the viewport.
    """
    best_quad = None
    best_area = 0
    for quad in quads:
        if len(quad) < 8:
            continue

        xs = quad[0:8:2]
        ys = quad[1:8:2]
        min_x, max_x = min(xs), max(xs)
        min_y, max_y = min(ys), max(ys)

        # Skip quads that lie completely outside the viewport
        if max_x < 0 or max_y < 0 or min_x > viewport_width or min_y > viewport_height:
            continue

        # Area of the quad's bounding rectangle clipped to the viewport
        visible_width = min(viewport_width, max_x) - max(0, min_x)
        visible_height = min(viewport_height, max_y) - max(0, min_y)
        visible_area = visible_width * visible_height

        if visible_area > best_area:
            best_area = visible_area
            best_quad = quad

    if not best_quad:
        # No visible quad found, use the first quad anyway
        best_quad = quads[0]

    center_x = sum(best_quad[i] for i in range(0, 8, 2)) / 4
    center_y = sum(best_quad[i] for i in range(1, 8, 2)) / 4

    # Ensure click point is within viewport bounds
    center_x = max(0, min(viewport_width - 1, center_x))
    center_y = max(0, min(viewport_height - 1, center_y))
    return center_x, center_y


class Position(TypedDict):
    """2D position coordinates."""

    x: float
    y: float


class BoundingBox(TypedDict):
    """Element bounding box with position and dimensions."""

    x: float
    y: float
    width: float
    height: float


class ElementInfo(TypedDict):
    """Basic information about a DOM element."""

    backendNodeId: int
    nodeId: int | None
    nodeName: str
    nodeType: int
    nodeValue: str | None
    attributes: dict[str, str]
    boundingBox: BoundingBox | None
    error: str | None  # set only when gathering the information failed


class Element:
    """Element operations using BackendNodeId."""

    def __init__(
        self,
        browser_session: 'BrowserSession',
        backend_node_id: int,
        session_id: str | None = None,
    ):
        """Bind the element to a browser session.

        Args:
            browser_session: Session whose ``cdp_client`` is used for every command.
            backend_node_id: Backend node id identifying the element.
            session_id: Session id passed along with every CDP command.
        """
        self._browser_session = browser_session
        self._client = browser_session.cdp_client
        self._backend_node_id = backend_node_id
        self._session_id = session_id

    async def _get_node_id(self) -> int:
        """Get DOM node ID from backend node ID."""
        params: 'PushNodesByBackendIdsToFrontendParameters' = {'backendNodeIds': [self._backend_node_id]}
        result = await self._client.send.DOM.pushNodesByBackendIdsToFrontend(params, session_id=self._session_id)
        return result['nodeIds'][0]

    async def _get_remote_object_id(self) -> str | None:
        """Get remote object ID for this element, or None if the node resolves without one."""
        node_id = await self._get_node_id()
        params: 'ResolveNodeParameters' = {'nodeId': node_id}
        result = await self._client.send.DOM.resolveNode(params, session_id=self._session_id)
        return result['object'].get('objectId') or None

    async def _click_via_javascript(self, settle_delay: float) -> None:
        """Click the element by calling ``this.click()`` on it, then wait ``settle_delay`` seconds.

        Raises:
            Exception: If the backend node id can no longer be resolved to an object.
        """
        result = await self._client.send.DOM.resolveNode(
            params={'backendNodeId': self._backend_node_id}, session_id=self._session_id
        )
        if 'object' not in result or 'objectId' not in result['object']:
            raise Exception('Failed to find DOM element based on backendNodeId, maybe page content changed?')
        await self._client.send.Runtime.callFunctionOn(
            params={
                'functionDeclaration': 'function() { this.click(); }',
                'objectId': result['object']['objectId'],
            },
            session_id=self._session_id,
        )
        await asyncio.sleep(settle_delay)

    async def _get_click_quads(self) -> list:
        """Return the element's geometry as a list of 8-number quads, or [] if none could be obtained.

        Three sources are tried in order; a failure in one is logged and the
        next one is attempted.
        """
        # Method 1: Try DOM.getContentQuads first (best for inline elements and complex layouts)
        try:
            content_quads_result = await self._client.send.DOM.getContentQuads(
                params={'backendNodeId': self._backend_node_id}, session_id=self._session_id
            )
            if content_quads_result.get('quads'):
                return content_quads_result['quads']
        except Exception as e:
            logger.debug(f'DOM.getContentQuads failed: {e}')

        # Method 2: Fall back to DOM.getBoxModel
        try:
            box_model = await self._client.send.DOM.getBoxModel(
                params={'backendNodeId': self._backend_node_id}, session_id=self._session_id
            )
            if 'model' in box_model and 'content' in box_model['model']:
                content_quad = box_model['model']['content']
                if len(content_quad) >= 8:
                    # Box model content is already x1, y1, ..., x4, y4
                    return [list(content_quad[:8])]
        except Exception as e:
            logger.debug(f'DOM.getBoxModel failed: {e}')

        # Method 3: Fall back to JavaScript getBoundingClientRect
        try:
            result = await self._client.send.DOM.resolveNode(
                params={'backendNodeId': self._backend_node_id}, session_id=self._session_id
            )
            if 'object' in result and 'objectId' in result['object']:
                bounds_result = await self._client.send.Runtime.callFunctionOn(
                    params={
                        'functionDeclaration': """
                            function() {
                                const rect = this.getBoundingClientRect();
                                return {
                                    x: rect.left,
                                    y: rect.top,
                                    width: rect.width,
                                    height: rect.height
                                };
                            }
                        """,
                        'objectId': result['object']['objectId'],
                        'returnByValue': True,
                    },
                    session_id=self._session_id,
                )
                if 'result' in bounds_result and 'value' in bounds_result['result']:
                    rect = bounds_result['result']['value']
                    x, y, w, h = rect['x'], rect['y'], rect['width'], rect['height']
                    # top-left, top-right, bottom-right, bottom-left
                    return [[x, y, x + w, y, x + w, y + h, x, y + h]]
        except Exception as e:
            logger.debug(f'getBoundingClientRect fallback failed: {e}')

        return []

    async def click(
        self,
        button: 'MouseButton' = 'left',
        click_count: int = 1,
        modifiers: list[ModifierType] | None = None,
    ) -> None:
        """Click the element with real mouse events, falling back to a JavaScript click.

        Args:
            button: Mouse button to press.
            click_count: Click count reported in the press/release events.
            modifiers: Keyboard modifiers held during the click.

        Raises:
            RuntimeError: If neither the mouse click nor the JavaScript fallback succeeded.
        """
        try:
            # Scroll first: the geometry queried below is only meaningful for the
            # scroll position at which the mouse events are dispatched.
            try:
                await self._client.send.DOM.scrollIntoViewIfNeeded(
                    params={'backendNodeId': self._backend_node_id}, session_id=self._session_id
                )
                await asyncio.sleep(0.05)  # Wait for scroll to complete
            except Exception as scroll_error:
                logger.debug(f'Failed to scroll element into view: {scroll_error}')

            # Get viewport dimensions for visibility checks
            layout_metrics = await self._client.send.Page.getLayoutMetrics(session_id=self._session_id)
            viewport_width = layout_metrics['layoutViewport']['clientWidth']
            viewport_height = layout_metrics['layoutViewport']['clientHeight']

            quads = await self._get_click_quads()

            # If we still don't have quads, fall back to JS click
            if not quads:
                try:
                    await self._click_via_javascript(settle_delay=0.05)
                    return
                except Exception as js_e:
                    raise Exception(f'Failed to click element: {js_e}') from js_e

            center_x, center_y = _pick_click_point(quads, viewport_width, viewport_height)

            # Calculate modifier bitmask for CDP
            modifier_value = 0
            for mod in modifiers or []:
                modifier_value |= _MODIFIER_BITS.get(mod, 0)

            # Perform the click using CDP
            try:
                # Move mouse to element
                await self._client.send.Input.dispatchMouseEvent(
                    params={
                        'type': 'mouseMoved',
                        'x': center_x,
                        'y': center_y,
                    },
                    session_id=self._session_id,
                )
                await asyncio.sleep(0.05)

                # Mouse down
                try:
                    await asyncio.wait_for(
                        self._client.send.Input.dispatchMouseEvent(
                            params={
                                'type': 'mousePressed',
                                'x': center_x,
                                'y': center_y,
                                'button': button,
                                'clickCount': click_count,
                                'modifiers': modifier_value,
                            },
                            session_id=self._session_id,
                        ),
                        timeout=1.0,  # 1 second timeout for mousePressed
                    )
                    await asyncio.sleep(0.08)
                except TimeoutError:
                    pass  # Don't sleep if we timed out

                # Mouse up
                try:
                    await asyncio.wait_for(
                        self._client.send.Input.dispatchMouseEvent(
                            params={
                                'type': 'mouseReleased',
                                'x': center_x,
                                'y': center_y,
                                'button': button,
                                'clickCount': click_count,
                                'modifiers': modifier_value,
                            },
                            session_id=self._session_id,
                        ),
                        timeout=3.0,  # 3 second timeout for mouseReleased
                    )
                except TimeoutError:
                    pass

            except Exception as e:
                # Fall back to JavaScript click via CDP
                try:
                    await self._click_via_javascript(settle_delay=0.1)
                    return
                except Exception as js_e:
                    # Report the original mouse-event failure; keep the JS failure as the cause.
                    raise Exception(f'Failed to click element: {e}') from js_e

        except Exception as e:
            raise RuntimeError(f'Failed to click element: {e}') from e

    async def _press_enter(self, cdp_client, session_id: str) -> None:
        """Send a keyDown / char / keyUp sequence for the Enter key."""
        await cdp_client.send.Input.dispatchKeyEvent(
            params={
                'type': 'keyDown',
                'key': 'Enter',
                'code': 'Enter',
                'windowsVirtualKeyCode': 13,
            },
            session_id=session_id,
        )

        # Small delay to emulate human typing speed
        await asyncio.sleep(0.001)

        # Send char event with carriage return
        await cdp_client.send.Input.dispatchKeyEvent(
            params={
                'type': 'char',
                'text': '\r',
                'key': 'Enter',
            },
            session_id=session_id,
        )

        await cdp_client.send.Input.dispatchKeyEvent(
            params={
                'type': 'keyUp',
                'key': 'Enter',
                'code': 'Enter',
                'windowsVirtualKeyCode': 13,
            },
            session_id=session_id,
        )

    async def _type_character(self, cdp_client, session_id: str, char: str) -> None:
        """Send a keyDown / char / keyUp sequence that types a single character."""
        modifiers, vk_code, base_key = self._get_char_modifiers_and_vk(char)
        key_code = self._get_key_code_for_char(base_key)

        # Step 1: Send keyDown event (NO text parameter)
        await cdp_client.send.Input.dispatchKeyEvent(
            params={
                'type': 'keyDown',
                'key': base_key,
                'code': key_code,
                'modifiers': modifiers,
                'windowsVirtualKeyCode': vk_code,
            },
            session_id=session_id,
        )

        # Small delay to emulate human typing speed
        await asyncio.sleep(0.001)

        # Step 2: Send char event (WITH text parameter) - this is crucial for text input
        await cdp_client.send.Input.dispatchKeyEvent(
            params={
                'type': 'char',
                'text': char,
                'key': char,
            },
            session_id=session_id,
        )

        # Step 3: Send keyUp event (NO text parameter)
        await cdp_client.send.Input.dispatchKeyEvent(
            params={
                'type': 'keyUp',
                'key': base_key,
                'code': key_code,
                'modifiers': modifiers,
                'windowsVirtualKeyCode': vk_code,
            },
            session_id=session_id,
        )

    async def fill(self, value: str, clear: bool = True) -> None:
        """Focus the element and type ``value`` into it one key event at a time.

        Args:
            value: Text to type; ``'\\n'`` is sent as an Enter key press.
            clear: When true, try to empty the field before typing.

        Raises:
            Exception: If any step fails (including a missing session id); the
                message is prefixed with ``'Failed to fill element: '``.
        """
        try:
            # Use the existing CDP client and session
            cdp_client = self._client
            session_id = self._session_id
            backend_node_id = self._backend_node_id

            # Fail before issuing any browser command if there is no session to address
            if session_id is None:
                raise RuntimeError('Session ID is required for fill operation')

            # Track coordinates for the click-to-focus fallback
            input_coordinates = None

            # Scroll element into view
            try:
                await cdp_client.send.DOM.scrollIntoViewIfNeeded(params={'backendNodeId': backend_node_id}, session_id=session_id)
                await asyncio.sleep(0.01)
            except Exception as e:
                logger.warning(f'Failed to scroll element into view: {e}')

            # Get object ID for the element
            result = await cdp_client.send.DOM.resolveNode(
                params={'backendNodeId': backend_node_id},
                session_id=session_id,
            )
            if 'object' not in result or 'objectId' not in result['object']:
                raise RuntimeError('Failed to get object ID for element')
            object_id = result['object']['objectId']

            # Get element coordinates for focus
            try:
                bounds_result = await cdp_client.send.Runtime.callFunctionOn(
                    params={
                        'functionDeclaration': 'function() { return this.getBoundingClientRect(); }',
                        'objectId': object_id,
                        'returnByValue': True,
                    },
                    session_id=session_id,
                )
                if bounds_result.get('result', {}).get('value'):
                    bounds = bounds_result['result']['value']  # type: ignore
                    center_x = bounds['x'] + bounds['width'] / 2
                    center_y = bounds['y'] + bounds['height'] / 2
                    input_coordinates = {'input_x': center_x, 'input_y': center_y}
                    logger.debug(f'Using element coordinates: x={center_x:.1f}, y={center_y:.1f}')
            except Exception as e:
                logger.debug(f'Could not get element coordinates: {e}')

            # Step 1: Focus the element (best effort; typing proceeds either way)
            await self._focus_element_simple(
                backend_node_id=backend_node_id,
                object_id=object_id,
                cdp_client=cdp_client,
                session_id=session_id,
                input_coordinates=input_coordinates,
            )

            # Step 2: Clear existing text if requested
            if clear:
                cleared_successfully = await self._clear_text_field(
                    object_id=object_id, cdp_client=cdp_client, session_id=session_id
                )
                if not cleared_successfully:
                    logger.warning('Text field clearing failed, typing may append to existing text')

            # Step 3: Type the text character by character using proper human-like key events
            logger.debug(f'Typing text character by character: "{value}"')

            for char in value:
                if char == '\n':
                    # Handle newline characters as Enter key
                    await self._press_enter(cdp_client, session_id)
                else:
                    await self._type_character(cdp_client, session_id, char)

                # Add 18ms delay between keystrokes
                await asyncio.sleep(0.018)

        except Exception as e:
            raise Exception(f'Failed to fill element: {str(e)}') from e

    async def hover(self) -> None:
        """Move the mouse to the centre of the element.

        Raises:
            RuntimeError: If the element has no bounding box.
        """
        box = await self.get_bounding_box()
        if not box:
            raise RuntimeError('Element is not visible or has no bounding box')

        x = box['x'] + box['width'] / 2
        y = box['y'] + box['height'] / 2

        params: 'DispatchMouseEventParameters' = {'type': 'mouseMoved', 'x': x, 'y': y}
        await self._client.send.Input.dispatchMouseEvent(params, session_id=self._session_id)

    async def focus(self) -> None:
        """Focus the element."""
        node_id = await self._get_node_id()
        params: 'FocusParameters' = {'nodeId': node_id}
        await self._client.send.DOM.focus(params, session_id=self._session_id)

    async def check(self) -> None:
        """Toggle a checkbox/radio button by clicking it."""
        await self.click()

    async def select_option(self, values: str | list[str]) -> None:
        """Click every direct ``<option>`` child whose value or text is in ``values``.

        This is a simplified approach: only direct children of the element are
        considered, and single and multi selects are handled the same way.

        Args:
            values: One option value/text, or a list of them.
        """
        if isinstance(values, str):
            values = [values]

        # Focus the element first
        try:
            await self.focus()
        except Exception:
            logger.warning('Failed to focus element')

        node_id = await self._get_node_id()

        # Request child nodes to get the options
        params: 'RequestChildNodesParameters' = {'nodeId': node_id, 'depth': 1}
        await self._client.send.DOM.requestChildNodes(params, session_id=self._session_id)

        # Depth 2 so that each option's text nodes are part of the description
        describe_params: 'DescribeNodeParameters' = {'nodeId': node_id, 'depth': 2}
        describe_result = await self._client.send.DOM.describeNode(describe_params, session_id=self._session_id)
        select_node = describe_result['node']

        # Find and select matching options
        for child in select_node.get('children', []):
            if child.get('nodeName', '').lower() != 'option':
                continue

            option_attrs = _attribute_pairs_to_dict(child.get('attributes', []))
            option_value = option_attrs.get('value', '')
            option_text = _option_text(child)

            # An empty text never counts as a match; an empty value still can.
            matches = option_value in values or (bool(option_text) and option_text in values)
            if not matches:
                continue

            # The description normally carries the backend id already; only
            # fall back to a second describeNode when it does not.
            option_backend_id = child.get('backendNodeId')
            if not option_backend_id:
                option_node_id = child.get('nodeId')
                if not option_node_id:
                    continue
                option_describe_params: 'DescribeNodeParameters' = {'nodeId': option_node_id}
                option_backend_result = await self._client.send.DOM.describeNode(
                    option_describe_params, session_id=self._session_id
                )
                option_backend_id = option_backend_result['node']['backendNodeId']

            # Create an Element for the option and click it
            option_element = Element(self._browser_session, option_backend_id, self._session_id)
            await option_element.click()

    async def drag_to(
        self,
        target: Union['Element', Position],
        source_position: Position | None = None,
        target_position: Position | None = None,
    ) -> None:
        """Drag this element to another element or position.

        Args:
            target: Destination element, or absolute coordinates as a ``Position``.
            source_position: Absolute coordinates to press at; defaults to this
                element's centre.
            target_position: Offset from the target element's top-left corner;
                defaults to the target's centre. Ignored when ``target`` is a
                ``Position``.

        Raises:
            RuntimeError: If a required bounding box is unavailable.
        """
        # Get source coordinates
        if source_position:
            source_x = source_position['x']
            source_y = source_position['y']
        else:
            source_box = await self.get_bounding_box()
            if not source_box:
                raise RuntimeError('Source element is not visible')
            source_x = source_box['x'] + source_box['width'] / 2
            source_y = source_box['y'] + source_box['height'] / 2

        # Get target coordinates
        if isinstance(target, dict) and 'x' in target and 'y' in target:
            target_x = target['x']
            target_y = target['y']
        else:
            target_box = await target.get_bounding_box()
            if not target_box:
                raise RuntimeError('Target element is not visible')
            if target_position:
                target_x = target_box['x'] + target_position['x']
                target_y = target_box['y'] + target_position['y']
            else:
                target_x = target_box['x'] + target_box['width'] / 2
                target_y = target_box['y'] + target_box['height'] / 2

        # Perform drag operation
        await self._client.send.Input.dispatchMouseEvent(
            {'type': 'mousePressed', 'x': source_x, 'y': source_y, 'button': 'left'},
            session_id=self._session_id,
        )

        await self._client.send.Input.dispatchMouseEvent(
            {'type': 'mouseMoved', 'x': target_x, 'y': target_y},
            session_id=self._session_id,
        )

        await self._client.send.Input.dispatchMouseEvent(
            {'type': 'mouseReleased', 'x': target_x, 'y': target_y, 'button': 'left'},
            session_id=self._session_id,
        )

    # Element properties and queries
    async def get_attribute(self, name: str) -> str | None:
        """Get an attribute value, or None if the element has no such attribute."""
        node_id = await self._get_node_id()
        params: 'GetAttributesParameters' = {'nodeId': node_id}
        result = await self._client.send.DOM.getAttributes(params, session_id=self._session_id)

        # Attributes arrive as a flat [name, value, name, value, ...] list
        attributes = result['attributes']
        for attr_name, attr_value in zip(attributes[0::2], attributes[1::2]):
            if attr_name == name:
                return attr_value
        return None

    async def get_bounding_box(self) -> BoundingBox | None:
        """Get the bounding box of the element's content quad.

        Returns:
            The box, or None if the box model is unavailable or any step fails.
        """
        try:
            node_id = await self._get_node_id()
            params: 'GetBoxModelParameters' = {'nodeId': node_id}
            result = await self._client.send.DOM.getBoxModel(params, session_id=self._session_id)

            if 'model' not in result:
                return None

            # Get content box (first 8 values are content quad: x1,y1,x2,y2,x3,y3,x4,y4)
            content = result['model']['content']
            if len(content) < 8:
                return None

            # Calculate bounding box from quad
            x_coords = content[0:8:2]
            y_coords = content[1:8:2]

            x = min(x_coords)
            y = min(y_coords)
            width = max(x_coords) - x
            height = max(y_coords) - y

            return BoundingBox(x=x, y=y, width=width, height=height)

        except Exception:
            return None

    async def screenshot(self, format: str = 'jpeg', quality: int | None = None) -> str:
        """Take a screenshot of this element and return base64 encoded image.

        Args:
            format: Image format ('jpeg', 'png', 'webp')
            quality: Quality 0-100 for JPEG format

        Returns:
            Base64-encoded image data

        Raises:
            RuntimeError: If the element has no bounding box.
        """
        # Get element's bounding box
        box = await self.get_bounding_box()
        if not box:
            raise RuntimeError('Element is not visible or has no bounding box')

        # Create viewport clip for the element
        viewport: 'Viewport' = {'x': box['x'], 'y': box['y'], 'width': box['width'], 'height': box['height'], 'scale': 1.0}

        # Prepare screenshot parameters
        params: 'CaptureScreenshotParameters' = {'format': format, 'clip': viewport}

        if quality is not None and format.lower() == 'jpeg':
            params['quality'] = quality

        # Take screenshot
        result = await self._client.send.Page.captureScreenshot(params, session_id=self._session_id)

        return result['data']

    async def evaluate(self, page_function: str, *args) -> str:
        """Execute JavaScript code in the context of this element.

        The JavaScript code executes with 'this' bound to the element, allowing direct
        access to element properties and methods.

        Args:
            page_function: JavaScript code that MUST start with (...args) => format
            *args: Arguments to pass to the function

        Returns:
            String representation of the JavaScript execution result.
            Objects and arrays are JSON-stringified.

        Raises:
            RuntimeError: If the element has no remote object id or the script throws.
            ValueError: If ``page_function`` is not a parenthesised arrow function.

        Example:
            # Get element's text content
            text = await element.evaluate("() => this.textContent")

            # Set style with argument
            await element.evaluate("(color) => this.style.color = color", "red")

            # Get computed style
            color = await element.evaluate("() => getComputedStyle(this).color")

            # Async operations
            result = await element.evaluate("async () => { await new Promise(r => setTimeout(r, 100)); return this.id; }")
        """
        # Get remote object ID for this element
        object_id = await self._get_remote_object_id()
        if not object_id:
            raise RuntimeError('Element has no remote object ID (element may be detached from DOM)')

        # Validate arrow function format (allow async prefix)
        page_function = page_function.strip()
        if not ('=>' in page_function and page_function.startswith(('(', 'async'))):
            raise ValueError(
                f'JavaScript code must start with (...args) => or async (...args) => format. Got: {page_function[:50]}...'
            )

        # CallFunctionOn expects 'function(...args) { ... }' format, not arrow functions.
        # Convert '() => expression' to 'function() { return expression; }'
        # and '(x, y) => { statements }' to 'function(x, y) { statements }'.
        is_async = page_function.startswith('async')
        async_prefix = 'async ' if is_async else ''

        # Strip 'async' prefix if present for parsing
        func_to_parse = page_function[len('async') :].strip() if is_async else page_function

        arrow_match = re.match(r'\s*\(([^)]*)\)\s*=>\s*(.+)', func_to_parse, re.DOTALL)
        if not arrow_match:
            raise ValueError(f'Could not parse arrow function: {page_function[:50]}...')

        params_str = arrow_match.group(1).strip()  # e.g., '', 'x', 'x, y'
        body = arrow_match.group(2).strip()

        if body.startswith('{'):
            # Body already has braces, use as-is
            function_declaration = f'{async_prefix}function({params_str}) {body}'
        else:
            # Expression body needs an explicit return
            function_declaration = f'{async_prefix}function({params_str}) {{ return {body}; }}'

        # Build CallArgument list for args if provided
        call_arguments = []
        if args:
            from cdp_use.cdp.runtime.types import CallArgument

            call_arguments = [CallArgument(value=arg) for arg in args]

        # Prepare CallFunctionOn parameters
        params: 'CallFunctionOnParameters' = {
            'functionDeclaration': function_declaration,
            'objectId': object_id,
            'returnByValue': True,
            'awaitPromise': True,
        }

        if call_arguments:
            params['arguments'] = call_arguments

        # Execute the function on the element
        result = await self._client.send.Runtime.callFunctionOn(
            params,
            session_id=self._session_id,
        )

        # Handle exceptions
        if 'exceptionDetails' in result:
            raise RuntimeError(f'JavaScript evaluation failed: {result["exceptionDetails"]}')

        # Extract and return value
        value = result.get('result', {}).get('value')

        # Return string representation (matching Page.evaluate behavior)
        if value is None:
            return ''
        if isinstance(value, str):
            return value
        # Convert objects, numbers, booleans to string
        try:
            return json.dumps(value) if isinstance(value, (dict, list)) else str(value)
        except (TypeError, ValueError):
            return str(value)

    # Helpers for modifiers etc
    def _get_char_modifiers_and_vk(self, char: str) -> tuple[int, int, str]:
        """Get modifiers, virtual key code, and base key for a character.

        Characters whose uppercase form is not a single character (for example
        ``'ß'``) get virtual key code 0 instead of raising.

        Returns:
            (modifiers, windowsVirtualKeyCode, base_key)
        """
        # Shifted punctuation
        if char in _SHIFT_CHARS:
            base_key, vk_code = _SHIFT_CHARS[char]
            return (_SHIFT, vk_code, base_key)

        # Uppercase letters require Shift
        if char.isupper():
            return (_SHIFT, ord(char), char.lower())

        # Lowercase letters
        if char.islower():
            return (0, _vk_for_letter(char), char)

        # Numbers
        if char.isdigit():
            return (0, ord(char), char)

        # Special characters without Shift
        if char in _NO_SHIFT_CHARS:
            return (0, _NO_SHIFT_CHARS[char], char)

        # Fallback
        return (0, _vk_for_letter(char) if char.isalpha() else ord(char), char)

    def _get_key_code_for_char(self, char: str) -> str:
        """Get the proper key code for a character (like Playwright does).

        Returns ``'Unidentified'`` for anything that is neither mapped
        punctuation nor an ASCII letter or digit.
        """
        if char in _KEY_CODES:
            return _KEY_CODES[char]
        # 'KeyX' / 'DigitN' names are built only from ASCII letters and digits
        if char.isascii():
            if char.isalpha():
                return f'Key{char.upper()}'
            if char.isdigit():
                return f'Digit{char}'
        return 'Unidentified'

    async def _clear_text_field(self, object_id: str, cdp_client, session_id: str) -> bool:
        """Clear text field using multiple strategies, starting with the most reliable.

        Returns:
            True if a strategy completed (for the JavaScript strategy: the value
            read back empty), False if every strategy failed.
        """
        try:
            # Strategy 1: Direct JavaScript value setting (most reliable for modern web apps)
            logger.debug('Clearing text field using JavaScript value setting')

            await cdp_client.send.Runtime.callFunctionOn(
                params={
                    'functionDeclaration': """
                        function() {
                            // Try to select all text first (only works on text-like inputs)
                            // This handles cases where cursor is in the middle of text
                            try {
                                this.select();
                            } catch (e) {
                                // Some input types (date, color, number, etc.) don't support select()
                                // That's fine, we'll just clear the value directly
                            }
                            // Set value to empty
                            this.value = "";
                            // Dispatch events to notify frameworks like React
                            this.dispatchEvent(new Event("input", { bubbles: true }));
                            this.dispatchEvent(new Event("change", { bubbles: true }));
                            return this.value;
                        }
                    """,
                    'objectId': object_id,
                    'returnByValue': True,
                },
                session_id=session_id,
            )

            # Verify clearing worked by checking the value
            verify_result = await cdp_client.send.Runtime.callFunctionOn(
                params={
                    'functionDeclaration': 'function() { return this.value; }',
                    'objectId': object_id,
                    'returnByValue': True,
                },
                session_id=session_id,
            )

            current_value = verify_result.get('result', {}).get('value', '')
            if not current_value:
                logger.debug('Text field cleared successfully using JavaScript')
                return True
            logger.debug(f'JavaScript clear partially failed, field still contains: "{current_value}"')

        except Exception as e:
            logger.debug(f'JavaScript clear failed: {e}')

        # Strategy 2: Triple-click + Delete (fallback for stubborn fields)
        try:
            logger.debug('Fallback: Clearing using triple-click + Delete')

            # Get element center coordinates for triple-click
            bounds_result = await cdp_client.send.Runtime.callFunctionOn(
                params={
                    'functionDeclaration': 'function() { return this.getBoundingClientRect(); }',
                    'objectId': object_id,
                    'returnByValue': True,
                },
                session_id=session_id,
            )

            if bounds_result.get('result', {}).get('value'):
                bounds = bounds_result['result']['value']  # type: ignore
                center_x = bounds['x'] + bounds['width'] / 2
                center_y = bounds['y'] + bounds['height'] / 2

                # Triple-click to select all text
                await cdp_client.send.Input.dispatchMouseEvent(
                    params={
                        'type': 'mousePressed',
                        'x': center_x,
                        'y': center_y,
                        'button': 'left',
                        'clickCount': 3,
                    },
                    session_id=session_id,
                )
                await cdp_client.send.Input.dispatchMouseEvent(
                    params={
                        'type': 'mouseReleased',
                        'x': center_x,
                        'y': center_y,
                        'button': 'left',
                        'clickCount': 3,
                    },
                    session_id=session_id,
                )

                # Delete selected text
                await cdp_client.send.Input.dispatchKeyEvent(
                    params={
                        'type': 'keyDown',
                        'key': 'Delete',
                        'code': 'Delete',
                    },
                    session_id=session_id,
                )
                await cdp_client.send.Input.dispatchKeyEvent(
                    params={
                        'type': 'keyUp',
                        'key': 'Delete',
                        'code': 'Delete',
                    },
                    session_id=session_id,
                )

                logger.debug('Text field cleared using triple-click + Delete')
                return True

        except Exception as e:
            logger.debug(f'Triple-click clear failed: {e}')

        # If all strategies failed
        logger.warning('All text clearing strategies failed')
        return False

    async def _focus_element_simple(
        self, backend_node_id: int, object_id: str, cdp_client, session_id: str, input_coordinates=None
    ) -> bool:
        """Focus element using multiple strategies with robust fallbacks.

        Tries, in order: ``DOM.focus``, a JavaScript ``this.focus()`` call, and a
        mouse click at ``input_coordinates`` (a dict with ``input_x`` and
        ``input_y``), if given.

        Returns:
            True as soon as one strategy completes without raising, else False.
        """
        # Strategy 1: CDP focus (most reliable)
        try:
            logger.debug('Focusing element using CDP focus')
            await cdp_client.send.DOM.focus(params={'backendNodeId': backend_node_id}, session_id=session_id)
            logger.debug('Element focused successfully using CDP focus')
            return True
        except Exception as e:
            logger.debug(f'CDP focus failed: {e}, trying JavaScript focus')

        # Strategy 2: JavaScript focus (fallback)
        try:
            logger.debug('Focusing element using JavaScript focus')
            await cdp_client.send.Runtime.callFunctionOn(
                params={
                    'functionDeclaration': 'function() { this.focus(); }',
                    'objectId': object_id,
                },
                session_id=session_id,
            )
            logger.debug('Element focused successfully using JavaScript')
            return True
        except Exception as e:
            logger.debug(f'JavaScript focus failed: {e}, trying click focus')

        # Strategy 3: Click to focus (last resort)
        if not input_coordinates:
            logger.debug('No coordinates available for click focus')
            return False

        try:
            logger.debug(f'Focusing element by clicking at coordinates: {input_coordinates}')
            center_x = input_coordinates['input_x']
            center_y = input_coordinates['input_y']

            # Click on the element to focus it
            await cdp_client.send.Input.dispatchMouseEvent(
                params={
                    'type': 'mousePressed',
                    'x': center_x,
                    'y': center_y,
                    'button': 'left',
                    'clickCount': 1,
                },
                session_id=session_id,
            )
            await cdp_client.send.Input.dispatchMouseEvent(
                params={
                    'type': 'mouseReleased',
                    'x': center_x,
                    'y': center_y,
                    'button': 'left',
                    'clickCount': 1,
                },
                session_id=session_id,
            )
            logger.debug('Element focused using click')
            return True
        except Exception as e:
            logger.warning(f'All focus strategies failed: {e}')
            return False

    async def get_basic_info(self) -> ElementInfo:
        """Get basic information about the element including coordinates and properties.

        Never raises: on failure the returned ``ElementInfo`` has empty fields
        and ``error`` set to the exception text.
        """
        try:
            # Get basic node information
            node_id = await self._get_node_id()
            describe_result = await self._client.send.DOM.describeNode({'nodeId': node_id}, session_id=self._session_id)
            node_info = describe_result['node']

            # Get bounding box
            bounding_box = await self.get_bounding_box()

            # Get attributes as a proper dict
            attributes_dict: dict[str, str] = _attribute_pairs_to_dict(node_info.get('attributes', []))

            return ElementInfo(
                backendNodeId=self._backend_node_id,
                nodeId=node_id,
                nodeName=node_info.get('nodeName', ''),
                nodeType=node_info.get('nodeType', 0),
                nodeValue=node_info.get('nodeValue'),
                attributes=attributes_dict,
                boundingBox=bounding_box,
                error=None,
            )
        except Exception as e:
            return ElementInfo(
                backendNodeId=self._backend_node_id,
                nodeId=None,
                nodeName='',
                nodeType=0,
                nodeValue=None,
                attributes={},
                boundingBox=None,
                error=str(e),
            )