Spaces:
No application file
No application file
| # This file contains code from several earlier attempts at getting element screenshots to work. | |
| # It is kept here for reference only and is not currently used. | |
| # class ElementScreenshotParams(BaseModel): | |
| # selector: str = Field(..., description="A CSS selector for the element to screenshot.") | |
| # @tools.action( | |
| # description="Take a screenshot of an element at the given CSS selector and return status", | |
| # param_model=ElementScreenshotParams, | |
| # ) | |
| # async def element_screenshot(params: ElementScreenshotParams, browser_session: BrowserSession) -> ActionResult: | |
| # # This is a dummy implementation | |
| # print('element_screenshot ---------------') | |
| # print(params) | |
| # return ActionResult(extracted_content=json.dumps({ | |
| # "success": True, | |
| # "selector": params.selector, | |
| # })) | |
| # @tools.action( | |
| # description=""" | |
| # A robust tool to capture screenshots of web elements. | |
| # Args: | |
| # params (ElementScreenshotParams): Parameters containing selectors, filename, highlight, padding, scroll_if_needed, fallback_to_full_page. | |
| # browser_session (BrowserSession): The active browser session. | |
| # Returns: | |
| # ActionResult: Contains extracted_content, long_term_memory, vision_content, or error. | |
| # """, | |
| # param_model=ElementScreenshotParams, | |
| # ) | |
| # async def element_screenshot(params: ElementScreenshotParams, browser_session: BrowserSession) -> ActionResult: | |
| # page = await browser_session.get_current_page() | |
| # try: | |
| # session_base = getattr(browser_session, 'file_system_path', None) | |
| # if session_base: | |
| # base_path = session_base | |
| # else: | |
| # base_path = os.getcwd() | |
| # parsed_url = urlparse(await page.get_url()) | |
| # website_name = parsed_url.netloc.replace('www.', '').replace('.', '_').replace(':', '_') | |
| # timestamp = int(time.time()) | |
| # screenshot_dir = os.path.join(base_path, "tempImgs", f"{website_name}-{timestamp}") | |
| # os.makedirs(screenshot_dir, exist_ok=True) | |
| # output_path = os.path.join(screenshot_dir, params.filename) | |
| # except Exception as e: | |
| # print(e) | |
| # output_path = os.path.join(os.path.abspath('.'), params.filename) | |
| # element = None | |
| # used_selector = None | |
| # error_messages = [] | |
| # for selector in params.selectors: | |
| # try: | |
| # element = await page.query_selector(selector) | |
| # if element: | |
| # used_selector = selector | |
| # break | |
| # except Exception as e: | |
| # error_messages.append(f"{selector}: {str(e)}") | |
| # if not element: | |
| # return ActionResult(error=f"Could not find any element using selectors: {params.selectors}. Errors: {'; '.join(error_messages)}") | |
| # try: | |
| # clip_obj = dict(element).get('clip') | |
| # if not clip_obj or clip_obj.get('x') is None: | |
| # return ActionResult(error=f"Could not get element bounds for selector '{used_selector}'") | |
| # try: | |
| # await page.screenshot( | |
| # path=output_path, | |
| # clip={ | |
| # 'x': clip_obj['x'] - params.padding, | |
| # 'y': clip_obj['y'] - params.padding, | |
| # 'width': clip_obj['width'] + (2 * params.padding), | |
| # 'height': clip_obj['height'] + (2 * params.padding) | |
| # } | |
| # ) | |
| # except Exception as e: | |
| # return ActionResult(error=f"Screenshot failed for selector '{used_selector}': {str(e)}") | |
| # success_msg = f"Element screenshot saved at: {output_path} (selector: '{used_selector}')" | |
| # return ActionResult( | |
| # extracted_content=success_msg, | |
| # include_in_memory=True, | |
| # long_term_memory=f"Element screenshot taken: {used_selector} -> {output_path}", | |
| # vision_content=[{"type": "image", "path": output_path}] | |
| # ) | |
| # except PlaywrightTimeoutError: | |
| # return ActionResult(error=f"Element screenshot failed: Timeout waiting for element '{used_selector}' to be visible or stable.") | |
| # except Exception as e: | |
| # return ActionResult(error=f"Element screenshot failed for selector '{used_selector}': {str(e)}") | |
| # @tools.action( | |
| # description=""" | |
| # Finds and returns a web page element based on a natural language description. | |
| # Captures a base64 screenshot of the element and returns it along with the backend_node_id. | |
| # Args: | |
| # params (FindElementByPromptParams): Parameters containing the query string. | |
| # browser_session (BrowserSession): The active browser session. | |
| # Returns: | |
| # ActionResult: Result containing element_base64, backend_node_id, and screenshot_path. | |
| # """, | |
| # param_model=FindElementByPromptParams, | |
| # ) | |
| # async def find_element_by_prompt(params: FindElementByPromptParams, browser_session: BrowserSession) -> ActionResult: | |
| # page = await browser_session.get_current_page() | |
| # try: | |
| # # CRITICAL: Refresh DOM tree BEFORE finding element | |
| # print('π Loading initial DOM tree...') | |
| # await page.dom_service.get_dom_tree(target_id=page._target_id) | |
| # print('β Initial DOM tree loaded ---------') | |
| # print(params.query) | |
| # # Find element using prompt | |
| # element = await page.must_get_element_by_prompt(params.query, llm=get_model("browser_agent_openrouter:google/gemini-2.5-flash")) | |
| # print(f'β Element found: {element}') | |
| # # Refresh DOM tree again before taking screenshot | |
| # print('π Refreshing DOM tree for screenshot...') | |
| # await page.dom_service.get_dom_tree(target_id=page._target_id) | |
| # print('β DOM tree refreshed') | |
| # backend_node_id = element._backend_node_id | |
| # # Create fresh element reference and ensure DOM is ready before screenshot | |
| # try: | |
| # print("π Preparing for screenshot operation...") | |
| # print(f"Backend node ID: {element._backend_node_id}") | |
| # await page.dom_service.get_dom_tree(target_id=page._target_id) | |
| # await page._ensure_session() # Make sure session is active | |
| # fresh_element = await page.get_element(backend_node_id=element._backend_node_id) | |
| # print("β Got fresh element reference ",fresh_element) | |
| # # Ensure we have a valid session and document | |
| # session_id = await page._ensure_session() | |
| # # Get a fresh document first | |
| # print("π Getting fresh document...") | |
| # doc_result = await page._client.send.DOM.getDocument( | |
| # session_id=session_id | |
| # ) | |
| # root_node = doc_result['root'] | |
| # print("β Got fresh document root") | |
| # # Now get our element's full node info | |
| # print("π Describing node...") | |
| # node_result = await page._client.send.DOM.describeNode( | |
| # params={'backendNodeId': element._backend_node_id}, | |
| # session_id=session_id | |
| # ) | |
| # print("β Got node description") | |
| # # Take screenshot with fresh context | |
| # print("📸 Taking screenshot with fresh context...") | |
| # element_base64 = await fresh_element.screenshot() | |
| # print("β Got screenshot data, ", element_base64[:30], "...") | |
| # if not element_base64: | |
| # raise ValueError("Screenshot returned empty data") | |
| # print('β Screenshot captured successfully') | |
| # except Exception as screenshot_error: | |
| # print(f"⚠️ Screenshot error: {type(screenshot_error).__name__}: {screenshot_error}") | |
| # raise | |
| # # Get a fresh reference for basic info | |
| # # print('π Refreshing DOM tree for basic info...') | |
| # # await page.dom_service.get_dom_tree(target_id=page._target_id) | |
| # # basic_info_element = await page.get_element(backend_node_id=element.backend_node_id) | |
| # # basic_info = await basic_info_element.get_basic_info() | |
| # # backend_node_id = basic_info.get('backendNodeId') | |
| # # print(f"β Element info: backend_node_id={backend_node_id}, nodeName={basic_info.get('nodeName')}") | |
| # # # Try to get bounding box with another fresh reference | |
| # # bbox = None | |
| # # try: | |
| # # print('π Refreshing DOM tree for bounding box...') | |
| # # await page.dom_service.get_dom_tree(target_id=page._target_id) | |
| # # bbox_element = await page.get_element(backend_node_id=backend_node_id) | |
| # # bbox = await bbox_element.get_bounding_box() | |
| # # print(f"β Element bounding box: {bbox}") | |
| # # except Exception as bbox_error: | |
| # # print(f"⚠️ Could not get bounding box: {bbox_error}") | |
| # # # Bbox is optional, we can continue without it | |
| # # Save the base64 screenshot to tempImgs folder | |
| # try: | |
| # session_base = getattr(browser_session, 'file_system_path', None) | |
| # base_path = session_base if session_base else os.getcwd() | |
| # screenshot_dir = os.path.join(base_path, "tempImgs", "finding") | |
| # os.makedirs(screenshot_dir, exist_ok=True) | |
| # # Save base64 to file | |
| # screenshot_filename = f"element_{backend_node_id}.png" | |
| # screenshot_path = os.path.join(screenshot_dir, screenshot_filename) | |
| # # Decode base64 and save as PNG file | |
| # try: | |
| # import base64 | |
| # screenshot_data = base64.b64decode(element_base64) | |
| # with open(screenshot_path, 'wb') as f: | |
| # f.write(screenshot_data) | |
| # print(f"Screenshot saved to: {screenshot_path}") | |
| # except Exception as save_error: | |
| # print(f"Error saving screenshot: {save_error}") | |
| # screenshot_path = None | |
| # except Exception as e: | |
| # screenshot_path = None | |
| # result_data = { | |
| # "element_base64": element_base64, | |
| # "backend_node_id": backend_node_id, | |
| # "screenshot_path": screenshot_path, | |
| # } | |
| # # Wrap the dictionary in an ActionResult | |
| # return ActionResult(extracted_content=json.dumps(result_data)) | |
| # except Exception as e: | |
| # error_data = { | |
| # "element_base64": None, | |
| # "backend_node_id": None, | |
| # "screenshot_path": None, | |
| # "reason": f"llm_error: {e}" | |
| # } | |
| # # Also wrap errors in an ActionResult | |
| # return ActionResult(error=json.dumps(error_data)) | |
| # @tools.action( | |
| # description=""" | |
| # Adds or removes a visual highlight (red border) around a specified element. | |
| # Args: | |
| # params (HighlightElementParams): Parameters containing selector and remove flag. | |
| # browser_session (BrowserSession): The active browser session. | |
| # Returns: | |
| # dict: Result containing ok, selector, error. | |
| # """, | |
| # param_model=HighlightElementParams, | |
| # ) | |
| # async def highlight_element(params: HighlightElementParams, browser_session: BrowserSession) -> dict: | |
| # page = await browser_session.get_current_page() | |
| # selector = params.selector | |
| # remove = params.remove | |
| # try: | |
| # element = await page.query_selector(selector) | |
| # return {"ok": True, "selector": selector} | |
| # except Exception as e: | |
| # return {"ok": False, "selector": selector, "error": str(e)} | |
| # @tools.action( | |
| # description=""" | |
| # Gets the position and dimensions of an element on the page. | |
| # Args: | |
| # params (GetBoundingBoxParams): Parameters containing selector. | |
| # browser_session (BrowserSession): The active browser session. | |
| # Returns: | |
| # dict: Element's bounding box containing x, y, width, height, error. | |
| # """, | |
| # param_model=GetBoundingBoxParams, | |
| # ) | |
| # async def get_bounding_box(params: GetBoundingBoxParams, browser_session: BrowserSession) -> dict: | |
| # page = await browser_session.get_current_page() | |
| # selector = params.selector | |
| # js = """ | |
| # (sel) => { | |
| # const el = document.querySelector(sel); | |
| # if (!el) return { x: null, y: null, width: null, height: null }; | |
| # const r = el.getBoundingClientRect(); | |
| # return { x: r.x, y: r.y, width: r.width, height: r.height }; | |
| # } | |
| # """ | |
| # try: | |
| # return await page.evaluate(js, selector) | |
| # except Exception as e: | |
| # return {"error": str(e)} | |
| # @tools.action( | |
| # description=""" | |
| # Takes a screenshot of a specific region of the webpage using provided coordinates. | |
| # Args: | |
| # params (ElementScreenshotClipParams): Parameters containing clip and filename. | |
| # browser_session (BrowserSession, optional): The active browser session. | |
| # Returns: | |
| # dict: Result containing ok, path, clip, error. | |
| # """, | |
| # param_model=ElementScreenshotClipParams, | |
| # ) | |
| # async def element_screenshot_clip(params: ElementScreenshotClipParams, browser_session: BrowserSession = None) -> dict: | |
| # if browser_session is None: | |
| # return {"ok": False, "error": "No browser session provided"} | |
| # page = await browser_session.get_current_page() | |
| # try: | |
| # # Set up the output path | |
| # session_base = getattr(browser_session, 'file_system_path', None) | |
| # base_path = session_base if session_base else os.getcwd() | |
| # # Create unique subdirectory for this session's screenshots | |
| # parsed_url = urlparse(await page.get_url()) | |
| # website_name = parsed_url.netloc.replace('www.', '').replace('.', '_').replace(':', '_') | |
| # timestamp = int(time.time()) | |
| # screenshot_dir = os.path.join(base_path, "tempImgs", f"{website_name}-{timestamp}") | |
| # os.makedirs(screenshot_dir, exist_ok=True) | |
| # output_path = os.path.join(screenshot_dir, params.filename) | |
| # # Take the screenshot | |
| # await page.screenshot(path=output_path, clip=clip) | |
| # return { | |
| # "ok": True, | |
| # "path": output_path, | |
| # "clip": clip | |
| # } | |
| # except Exception as e: | |
| # return {"ok": False, "error": str(e)} | |
| # @tools.action( | |
| # description=""" | |
| # Verifies that a screenshot corresponds to the element found by a natural language query. | |
| # Args: | |
| # params (VerifyElementVisualParams): Parameters containing query, screenshot_path, tolerance. | |
| # browser_session (BrowserSession): The active browser session. | |
| # Returns: | |
| # dict: Verification result containing verified, selector, bbox, screenshot, reason. | |
| # """, | |
| # param_model=VerifyElementVisualParams, | |
| # ) | |
| # async def verify_element_visual(params: VerifyElementVisualParams, browser_session: BrowserSession) -> dict: | |
| # found = await find_element_by_prompt(query, browser_session) | |
| # selector = found.get('selector') | |
| # if not selector: | |
| # return {"verified": False, "reason": f"Could not find element for query: {query}"} | |
| # bbox = await get_bounding_box(selector, browser_session) | |
| # if not bbox or bbox.get('width') is None: | |
| # return {"verified": False, "reason": "Could not get element bounds"} | |
| # try: | |
| # img = Image.open(screenshot_path) | |
| # w, h = img.size | |
| # except Exception as e: | |
| # return {"verified": False, "reason": f"Could not load screenshot: {e}"} | |
| # bw = int(round(bbox['width'])) | |
| # bh = int(round(bbox['height'])) | |
| # if abs(bw - w) <= tolerance and abs(bh - h) <= tolerance: | |
| # return { | |
| # "verified": True, | |
| # "selector": selector, | |
| # "bbox": bbox, | |
| # "screenshot": screenshot_path | |
| # } | |
| # else: | |
| # return { | |
| # "verified": False, | |
| # "reason": f"Size mismatch: element {bw}x{bh} vs screenshot {w}x{h}" | |
| # } | |
| # class FindByAccessibilityParams(BaseModel): | |
| # query: str = Field(..., description="Natural language description of the element to find (e.g., 'main navigation menu', 'login button').") | |
| # @tools.action( | |
| # description=""" | |
| # Finds elements using accessibility tree for more accurate semantic matching. | |
| # This approach mimics how screen readers interpret the page, making it more reliable | |
| # for finding semantically meaningful elements. | |
| # Args: | |
| # params (FindByAccessibilityParams): Parameters containing the search query | |
| # browser_session (BrowserSession): The active browser session | |
| # Returns: | |
| # ActionResult: Contains element details including role, name, and selector | |
| # """, | |
| # param_model=FindByAccessibilityParams | |
| # ) | |
| # async def find_by_accessibility(params: FindByAccessibilityParams, browser_session: BrowserSession) -> ActionResult: | |
| # page = await browser_session.get_current_page() | |
| # try: | |
| # # Get accessibility snapshot | |
| # print("π Getting accessibility snapshot...") | |
| # accessibility_tree = await page.accessibility.snapshot() | |
| # if not accessibility_tree: | |
| # return ActionResult(error="Failed to get accessibility snapshot") | |
| # print("β Got accessibility snapshot") | |
| # # Use LLM to analyze the tree | |
| # print("🧠 Analyzing with LLM...") | |
| # llm = get_model("browser_agent_openrouter:google/gemini-2.5-flash") | |
| # prompt = f""" | |
| # Given this accessibility tree: | |
| # {json.dumps(accessibility_tree, indent=2)} | |
| # Find the element that best matches this description: "{params.query}" | |
| # Return a JSON object with these fields: | |
| # - role: The ARIA role of the element | |
| # - name: The accessible name | |
| # - selector: A CSS selector that uniquely identifies this element | |
| # - confidence: Number between 0-100 indicating match confidence | |
| # - reasoning: Brief explanation of why this element matches | |
| # Example: | |
| # {{ | |
| # "role": "button", | |
| # "name": "Sign up", | |
| # "selector": "#signup-button", | |
| # "confidence": 95, | |
| # "reasoning": "Exact match for button with 'Sign up' text" | |
| # }} | |
| # """ | |
| # analysis = await llm(prompt) | |
| # print("β LLM analysis complete") | |
| # try: | |
| # result = json.loads(analysis) | |
| # except json.JSONDecodeError: | |
| # return ActionResult(error="Failed to parse LLM response as JSON") | |
| # # Verify the element exists | |
| # print("π Verifying element...") | |
| # element = None | |
| # if result.get("selector"): | |
| # try: | |
| # element = await page.query_selector(result["selector"]) | |
| # except Exception as e: | |
| # print(f"⚠️ Selector verification failed: {e}") | |
| # if not element and result.get("role") and result.get("name"): | |
| # try: | |
| # element = await page.get_by_role(result["role"], name=result["name"]).element_handle() | |
| # except Exception as e: | |
| # print(f"⚠️ Role/name verification failed: {e}") | |
| # if not element: | |
| # return ActionResult(error="Could not verify element existence") | |
| # print("β Element verified") | |
| # # Just return the element if found | |
| # return ActionResult( | |
| # extracted_content=json.dumps({ | |
| # "element": { | |
| # "role": result.get("role"), | |
| # "name": result.get("name"), | |
| # "selector": result.get("selector") | |
| # } | |
| # }), | |
| # include_in_memory=True | |
| # ) | |
| # except Exception as e: | |
| # return ActionResult( | |
| # error=f"Accessibility search failed: {str(e)}" | |
| # ) | |