Spaces:
Sleeping
Sleeping
| # coding: utf-8 | |
| # Copyright (c) 2025 inclusionAI. | |
| import re | |
| import time | |
| import traceback | |
| from typing import Optional | |
| from examples.tools.browsers.util.dom import DOMElementNode | |
| from aworld.logs.util import logger | |
| from aworld.utils import import_package | |
| class DomUtil: | |
| def __init__(self): | |
| import_package("playwright") | |
| async def async_click_element(page, element_node: DOMElementNode, **kwargs) -> Optional[str]: | |
| from playwright.async_api import ElementHandle as AElementHandle, BrowserContext as ABrowserContext | |
| try: | |
| element_handle: AElementHandle = await DomUtil.async_get_locate_element(page, element_node) | |
| if element_handle is None: | |
| raise Exception(f'Element: {repr(element_node)} not found') | |
| bound = await element_handle.bounding_box() | |
| try: | |
| # todo: iframe. | |
| center_x = bound['x'] + bound['width'] / 2 | |
| center_y = bound['y'] + bound['height'] / 2 | |
| try: | |
| browser: ABrowserContext = kwargs.get('browser') | |
| async with browser.expect_page() as new_page_info: | |
| await page.mouse.click(center_x, center_y) | |
| await page.mouse.click(center_x, center_y) | |
| await page.wait_for_load_state() | |
| except: | |
| logger.warning(traceback.format_exc()) | |
| except: | |
| logger.info(f"click {element_handle}!!") | |
| if await element_handle.text_content(): | |
| browser: ABrowserContext = kwargs.get('browser') | |
| if browser: | |
| try: | |
| async with browser.expect_page() as new_page_info: | |
| await page.click(f"text={element_handle.text_content()}") | |
| page = await new_page_info.value | |
| await page.wait_for_load_state() | |
| except: | |
| logger.warning(traceback.format_exc()) | |
| else: | |
| await element_handle.click() | |
| await page.wait_for_load_state() | |
| else: | |
| await element_handle.click() | |
| await page.wait_for_load_state() | |
| except Exception as e: | |
| logger.error(traceback.format_exc()) | |
| raise Exception(f'Failed to click element: {repr(element_node)}. Error: {str(e)}') | |
| def click_element(page, element_node: DOMElementNode, **kwargs) -> Optional[str]: | |
| from playwright.sync_api import ElementHandle, BrowserContext | |
| try: | |
| element_handle: ElementHandle = DomUtil.get_locate_element(page, element_node) | |
| if element_handle is None: | |
| raise Exception(f'Element: {repr(element_node)} not found') | |
| bound = element_handle.bounding_box() | |
| try: | |
| # todo: iframe. | |
| center_x = bound['x'] + bound['width'] / 2 | |
| center_y = bound['y'] + bound['height'] / 2 | |
| try: | |
| browser: BrowserContext = kwargs.get('browser') | |
| with browser.expect_page() as new_page_info: | |
| page.mouse.click(center_x, center_y) | |
| page = new_page_info.value | |
| page.wait_for_load_state() | |
| except: | |
| logger.warning(traceback.format_exc()) | |
| except: | |
| logger.info(f"click {element_handle}!!") | |
| if element_handle.text_content(): | |
| browser: BrowserContext = kwargs.get('browser') | |
| if browser: | |
| try: | |
| with browser.expect_page() as new_page_info: | |
| page.click(f"text={element_handle.text_content()}") | |
| page = new_page_info.value | |
| page.wait_for_load_state() | |
| except: | |
| logger.warning(traceback.format_exc()) | |
| else: | |
| element_handle.click() | |
| page.wait_for_load_state() | |
| else: | |
| element_handle.click() | |
| page.wait_for_load_state() | |
| except Exception as e: | |
| logger.error(traceback.format_exc()) | |
| raise Exception(f'Failed to click element: {repr(element_node)}. Error: {str(e)}') | |
| async def async_get_locate_element(current_frame, element: DOMElementNode): | |
| # Start with the target element and collect all parents, return Optional[AElementHandle] | |
| from playwright.async_api import FrameLocator as AFrameLocator | |
| parents: list[DOMElementNode] = [] | |
| current = element | |
| while current.parent is not None: | |
| parent = current.parent | |
| parents.append(parent) | |
| current = parent | |
| # Reverse the parents list to process from top to bottom | |
| parents.reverse() | |
| # Process all iframe parents in sequence | |
| iframes = [item for item in parents if item.tag_name == 'iframe'] | |
| for parent in iframes: | |
| css_selector = DomUtil._enhanced_css_selector_for_element( | |
| parent, | |
| include_dynamic_attributes=True, | |
| ) | |
| current_frame = current_frame.frame_locator(css_selector) | |
| css_selector = DomUtil._enhanced_css_selector_for_element( | |
| element, include_dynamic_attributes=True | |
| ) | |
| try: | |
| if isinstance(current_frame, AFrameLocator): | |
| element_handle = await current_frame.locator(css_selector).element_handle() | |
| return element_handle | |
| else: | |
| # Try to scroll into view if hidden | |
| element_handle = await current_frame.query_selector(css_selector) | |
| if element_handle: | |
| await element_handle.scroll_into_view_if_needed() | |
| return element_handle | |
| return None | |
| except Exception as e: | |
| logger.error(f'Failed to locate element: {str(e)}') | |
| return None | |
| def get_locate_element(current_frame, element: DOMElementNode): | |
| # Start with the target element and collect all parents | |
| from playwright.sync_api import FrameLocator | |
| parents: list[DOMElementNode] = [] | |
| current = element | |
| while current.parent is not None: | |
| parent = current.parent | |
| parents.append(parent) | |
| current = parent | |
| # Reverse the parents list to process from top to bottom | |
| parents.reverse() | |
| # Process all iframe parents in sequence | |
| iframes = [item for item in parents if item.tag_name == 'iframe'] | |
| for parent in iframes: | |
| css_selector = DomUtil._enhanced_css_selector_for_element( | |
| parent, | |
| include_dynamic_attributes=True, | |
| ) | |
| current_frame = current_frame.frame_locator(css_selector) | |
| css_selector = DomUtil._enhanced_css_selector_for_element( | |
| element, include_dynamic_attributes=True | |
| ) | |
| try: | |
| if isinstance(current_frame, FrameLocator): | |
| element_handle = current_frame.locator(css_selector).element_handle() | |
| return element_handle | |
| else: | |
| # Try to scroll into view if hidden | |
| element_handle = current_frame.query_selector(css_selector) | |
| if element_handle: | |
| element_handle.scroll_into_view_if_needed() | |
| return element_handle | |
| return None | |
| except Exception as e: | |
| logger.error(f'Failed to locate element: {str(e)}') | |
| return None | |
| def wait_for_stable_network(page, **kwargs): | |
| pending_requests = set() | |
| last_activity = time.time() | |
| # Define relevant resource types and content types | |
| RELEVANT_RESOURCE_TYPES = { | |
| 'document', | |
| 'stylesheet', | |
| 'image', | |
| 'font', | |
| 'script', | |
| 'iframe', | |
| } | |
| RELEVANT_CONTENT_TYPES = { | |
| 'text/html', | |
| 'text/css', | |
| 'application/javascript', | |
| 'image/', | |
| 'font/', | |
| 'application/json', | |
| } | |
| # Additional patterns to filter out | |
| IGNORED_URL_PATTERNS = { | |
| # Analytics and tracking | |
| 'analytics', | |
| 'tracking', | |
| 'telemetry', | |
| 'beacon', | |
| 'metrics', | |
| # Ad-related | |
| 'doubleclick', | |
| 'adsystem', | |
| 'adserver', | |
| 'advertising', | |
| # Social media widgets | |
| 'facebook.com/plugins', | |
| 'platform.twitter', | |
| 'linkedin.com/embed', | |
| # Live chat and support | |
| 'livechat', | |
| 'zendesk', | |
| 'intercom', | |
| 'crisp.chat', | |
| 'hotjar', | |
| # Push notifications | |
| 'push-notifications', | |
| 'onesignal', | |
| 'pushwoosh', | |
| # Background sync/heartbeat | |
| 'heartbeat', | |
| 'ping', | |
| 'alive', | |
| # WebRTC and streaming | |
| 'webrtc', | |
| 'rtmp://', | |
| 'wss://', | |
| # Common CDNs for dynamic content | |
| 'cloudfront.net', | |
| 'fastly.net', | |
| } | |
| def on_request(request): | |
| # Filter by resource type | |
| if request.resource_type not in RELEVANT_RESOURCE_TYPES: | |
| return | |
| # Filter out streaming, websocket, and other real-time requests | |
| if request.resource_type in { | |
| 'websocket', | |
| 'media', | |
| 'eventsource', | |
| 'manifest', | |
| 'other', | |
| }: | |
| return | |
| # Filter out by URL patterns | |
| url = request.url.lower() | |
| if any(pattern in url for pattern in IGNORED_URL_PATTERNS): | |
| return | |
| # Filter out data URLs and blob URLs | |
| if url.startswith(('data:', 'blob:')): | |
| return | |
| # Filter out requests with certain headers | |
| headers = request.headers | |
| if headers.get('purpose') == 'prefetch' or headers.get('sec-fetch-dest') in [ | |
| 'video', | |
| 'audio', | |
| ]: | |
| return | |
| nonlocal last_activity | |
| pending_requests.add(request) | |
| last_activity = time.time() | |
| def on_response(response): | |
| request = response.request | |
| if request not in pending_requests: | |
| return | |
| # Filter by content type if available | |
| content_type = response.headers.get('content-type', '').lower() | |
| # Skip if content type indicates streaming or real-time data | |
| if any(t in content_type | |
| for t in [ | |
| 'streaming', | |
| 'video', | |
| 'audio', | |
| 'webm', | |
| 'mp4', | |
| 'event-stream', | |
| 'websocket', | |
| 'protobuf']): | |
| pending_requests.remove(request) | |
| return | |
| # Only process relevant content types | |
| if not any(ct in content_type for ct in RELEVANT_CONTENT_TYPES): | |
| pending_requests.remove(request) | |
| return | |
| # Skip if response is too large (likely not essential for page load) | |
| content_length = response.headers.get('content-length') | |
| if content_length and int(content_length) > 5 * 1024 * 1024: # 5MB | |
| pending_requests.remove(request) | |
| return | |
| nonlocal last_activity | |
| pending_requests.remove(request) | |
| last_activity = time.time() | |
| # Attach event listeners | |
| page.on('request', on_request) | |
| page.on('response', on_response) | |
| try: | |
| start_time = time.time() | |
| while True: | |
| time.sleep(0.1) | |
| now = time.time() | |
| if len(pending_requests) == 0 and (now - last_activity) >= kwargs.get('idle_wait_time', 0.5): | |
| break | |
| if now - start_time > kwargs.get('max_wait_time', 5): | |
| logger.debug( | |
| f'Network timeout after {kwargs.get("max_wait_time", 5)}s with {len(pending_requests)} ' | |
| f'pending requests: {[r.url for r in pending_requests]}' | |
| ) | |
| break | |
| finally: | |
| # Clean up event listeners | |
| page.remove_listener('request', on_request) | |
| page.remove_listener('response', on_response) | |
| logger.debug(f'Network stabilized for {kwargs.get("idle_wait_time", 0.5)} seconds') | |
| def _enhanced_css_selector_for_element(element: DOMElementNode, include_dynamic_attributes: bool = True) -> str: | |
| """Creates a CSS selector for a DOM element, handling various edge cases and special characters. | |
| Args: | |
| element: The DOM element to create a selector for | |
| Returns: | |
| A valid CSS selector string | |
| """ | |
| try: | |
| # Get base selector from XPath | |
| css_selector = DomUtil._convert_simple_xpath_to_css_selector(element.xpath) | |
| # Handle class attributes | |
| if 'class' in element.attributes and element.attributes['class'] and include_dynamic_attributes: | |
| # Define a regex pattern for valid class names in CSS | |
| valid_class_name_pattern = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_-]*$') | |
| # Iterate through the class attribute values | |
| classes = element.attributes['class'].split() | |
| for class_name in classes: | |
| # Skip empty class names | |
| if not class_name.strip(): | |
| continue | |
| # Check if the class name is valid | |
| if valid_class_name_pattern.match(class_name): | |
| # Append the valid class name to the CSS selector | |
| css_selector += f'.{class_name}' | |
| else: | |
| # Skip invalid class names | |
| continue | |
| # Expanded set of safe attributes that are stable and useful for selection | |
| SAFE_ATTRIBUTES = { | |
| # Data attributes (if they're stable in your application) | |
| 'id', | |
| # Standard HTML attributes | |
| 'name', | |
| 'type', | |
| 'placeholder', | |
| # Accessibility attributes | |
| 'aria-label', | |
| 'aria-labelledby', | |
| 'aria-describedby', | |
| 'role', | |
| # Common form attributes | |
| 'for', | |
| 'autocomplete', | |
| 'required', | |
| 'readonly', | |
| # Media attributes | |
| 'alt', | |
| 'title', | |
| 'src', | |
| # Custom stable attributes (add any application-specific ones) | |
| 'href', | |
| 'target', | |
| } | |
| if include_dynamic_attributes: | |
| dynamic_attributes = { | |
| 'data-id', | |
| 'data-qa', | |
| 'data-cy', | |
| 'data-testid', | |
| } | |
| SAFE_ATTRIBUTES.update(dynamic_attributes) | |
| # Handle other attributes | |
| for attribute, value in element.attributes.items(): | |
| if attribute == 'class': | |
| continue | |
| # Skip invalid attribute names | |
| if not attribute.strip(): | |
| continue | |
| if attribute not in SAFE_ATTRIBUTES: | |
| continue | |
| # Escape special characters in attribute names | |
| safe_attribute = attribute.replace(':', r'\:') | |
| # Handle different value cases | |
| if value == '': | |
| css_selector += f'[{safe_attribute}]' | |
| elif any(char in value for char in '"\'<>`\n\r\t'): | |
| # Use contains for values with special characters | |
| # Regex-substitute *any* whitespace with a single space, then strip. | |
| collapsed_value = re.sub(r'\s+', ' ', value).strip() | |
| # Escape embedded double-quotes. | |
| safe_value = collapsed_value.replace('"', '\\"') | |
| css_selector += f'[{safe_attribute}*="{safe_value}"]' | |
| else: | |
| css_selector += f'[{safe_attribute}="{value}"]' | |
| return css_selector | |
| except Exception: | |
| # Fallback to a more basic selector if something goes wrong | |
| tag_name = element.tag_name or '*' | |
| return f"{tag_name}[highlight_index='{element.highlight_index}']" | |
| def _convert_simple_xpath_to_css_selector(xpath: str) -> str: | |
| """Converts simple XPath expressions to CSS selectors.""" | |
| if not xpath: | |
| return '' | |
| # Remove leading slash if present | |
| xpath = xpath.lstrip('/') | |
| # Split into parts | |
| parts = xpath.split('/') | |
| css_parts = [] | |
| for part in parts: | |
| if not part: | |
| continue | |
| # Handle index notation [n] | |
| if '[' in part: | |
| base_part = part[: part.find('[')] | |
| index_part = part[part.find('['):] | |
| # Handle multiple indices | |
| indices = [i.strip('[]') for i in index_part.split(']')[:-1]] | |
| for idx in indices: | |
| try: | |
| # Handle numeric indices | |
| if idx.isdigit(): | |
| index = int(idx) - 1 | |
| base_part += f':nth-of-type({index + 1})' | |
| # Handle last() function | |
| elif idx == 'last()': | |
| base_part += ':last-of-type' | |
| # Handle position() functions | |
| elif 'position()' in idx: | |
| if '>1' in idx: | |
| base_part += ':nth-of-type(n+2)' | |
| except ValueError: | |
| continue | |
| css_parts.append(base_part) | |
| else: | |
| css_parts.append(part) | |
| base_selector = ' > '.join(css_parts) | |
| return base_selector | |