# This file contains code from several attempts to get element screenshots working. # It is kept here for reference only and is not currently used. # class ElementScreenshotParams(BaseModel): # selector: str = Field(..., description="A CSS selector for the element to screenshot.") # @tools.action( # description="Take a screenshot of an element at the given CSS selector and return status", # param_model=ElementScreenshotParams, # ) # async def element_screenshot(params: ElementScreenshotParams, browser_session: BrowserSession) -> ActionResult: # # This is a dummy implementation # print('element_screenshot ---------------') # print(params) # return ActionResult(extracted_content=json.dumps({ # "success": True, # "selector": params.selector, # })) # @tools.action( # description=""" # A robust tool to capture screenshots of web elements. # Args: # params (ElementScreenshotParams): Parameters containing selectors, filename, highlight, padding, scroll_if_needed, fallback_to_full_page. # browser_session (BrowserSession): The active browser session. # Returns: # ActionResult: Contains extracted_content, long_term_memory, vision_content, or error. 
# """, # param_model=ElementScreenshotParams, # ) # async def element_screenshot(params: ElementScreenshotParams, browser_session: BrowserSession) -> ActionResult: # page = await browser_session.get_current_page() # try: # session_base = getattr(browser_session, 'file_system_path', None) # if session_base: # base_path = session_base # else: # base_path = os.getcwd() # parsed_url = urlparse(await page.get_url()) # website_name = parsed_url.netloc.replace('www.', '').replace('.', '_').replace(':', '_') # timestamp = int(time.time()) # screenshot_dir = os.path.join(base_path, "tempImgs", f"{website_name}-{timestamp}") # os.makedirs(screenshot_dir, exist_ok=True) # output_path = os.path.join(screenshot_dir, params.filename) # except Exception as e: # print(e) # output_path = os.path.join(os.path.abspath('.'), params.filename) # element = None # used_selector = None # error_messages = [] # for selector in params.selectors: # try: # element = await page.query_selector(selector) # if element: # used_selector = selector # break # except Exception as e: # error_messages.append(f"{selector}: {str(e)}") # if not element: # return ActionResult(error=f"Could not find any element using selectors: {params.selectors}. 
# Errors: {'; '.join(error_messages)}") # try: # clip_obj = dict(element).get('clip') # if not clip_obj or clip_obj.get('x') is None: # return ActionResult(error=f"Could not get element bounds for selector '{used_selector}'") # try: # await page.screenshot( # path=output_path, # clip={ # 'x': clip_obj['x'] - params.padding, # 'y': clip_obj['y'] - params.padding, # 'width': clip_obj['width'] + (2 * params.padding), # 'height': clip_obj['height'] + (2 * params.padding) # } # ) # except Exception as e: # return ActionResult(error=f"Screenshot failed for selector '{used_selector}': {str(e)}") # success_msg = f"Element screenshot saved at: {output_path} (selector: '{used_selector}')" # return ActionResult( # extracted_content=success_msg, # include_in_memory=True, # long_term_memory=f"Element screenshot taken: {used_selector} -> {output_path}", # vision_content=[{"type": "image", "path": output_path}] # ) # except PlaywrightTimeoutError: # return ActionResult(error=f"Element screenshot failed: Timeout waiting for element '{used_selector}' to be visible or stable.") # except Exception as e: # return ActionResult(error=f"Element screenshot failed for selector '{used_selector}': {str(e)}") # @tools.action( # description=""" # Finds and returns a web page element based on a natural language description. # Captures a base64 screenshot of the element and returns it along with the backend_node_id. # Args: # params (FindElementByPromptParams): Parameters containing the query string. # browser_session (BrowserSession): The active browser session. # Returns: # ActionResult: Result containing element_base64, backend_node_id, and screenshot_path. 
# """, # param_model=FindElementByPromptParams, # ) # async def find_element_by_prompt(params: FindElementByPromptParams, browser_session: BrowserSession) -> ActionResult: # page = await browser_session.get_current_page() # try: # # CRITICAL: Refresh DOM tree BEFORE finding element # print('🔄 Loading initial DOM tree...') # await page.dom_service.get_dom_tree(target_id=page._target_id) # print('✅ Initial DOM tree loaded ---------') # print(params.query) # # Find element using prompt # element = await page.must_get_element_by_prompt(params.query, llm=get_model("browser_agent_openrouter:google/gemini-2.5-flash")) # print(f'✅ Element found: {element}') # # Refresh DOM tree again before taking screenshot # print('🔄 Refreshing DOM tree for screenshot...') # await page.dom_service.get_dom_tree(target_id=page._target_id) # print('✅ DOM tree refreshed') # backend_node_id = element._backend_node_id # # Create fresh element reference and ensure DOM is ready before screenshot # try: # print("🔄 Preparing for screenshot operation...") # print(f"Backend node ID: {element._backend_node_id}") # await page.dom_service.get_dom_tree(target_id=page._target_id) # await page._ensure_session() # Make sure session is active # fresh_element = await page.get_element(backend_node_id=element._backend_node_id) # print("✅ Got fresh element reference ",fresh_element) # # Ensure we have a valid session and document # session_id = await page._ensure_session() # # Get a fresh document first # print("🔄 Getting fresh document...") # doc_result = await page._client.send.DOM.getDocument( # session_id=session_id # ) # root_node = doc_result['root'] # print("✅ Got fresh document root") # # Now get our element's full node info # print("🔄 Describing node...") # node_result = await page._client.send.DOM.describeNode( # params={'backendNodeId': element._backend_node_id}, # session_id=session_id # ) # print("✅ Got node description") # # Take screenshot with fresh context # print("📸 Taking screenshot with 
# fresh context...") # element_base64 = await fresh_element.screenshot() # print("✅ Got screenshot data, ", element_base64[:30], "...") # if not element_base64: # raise ValueError("Screenshot returned empty data") # print('✅ Screenshot captured successfully') # except Exception as screenshot_error: # print(f"⚠️ Screenshot error: {type(screenshot_error).__name__}: {screenshot_error}") # raise # # Get a fresh reference for basic info # # print('🔄 Refreshing DOM tree for basic info...') # # await page.dom_service.get_dom_tree(target_id=page._target_id) # # basic_info_element = await page.get_element(backend_node_id=element.backend_node_id) # # basic_info = await basic_info_element.get_basic_info() # # backend_node_id = basic_info.get('backendNodeId') # # print(f"✅ Element info: backend_node_id={backend_node_id}, nodeName={basic_info.get('nodeName')}") # # # Try to get bounding box with another fresh reference # # bbox = None # # try: # # print('🔄 Refreshing DOM tree for bounding box...') # # await page.dom_service.get_dom_tree(target_id=page._target_id) # # bbox_element = await page.get_element(backend_node_id=backend_node_id) # # bbox = await bbox_element.get_bounding_box() # # print(f"✅ Element bounding box: {bbox}") # # except Exception as bbox_error: # # print(f"⚠️ Could not get bounding box: {bbox_error}") # # # Bbox is optional, we can continue without it # # Save the base64 screenshot to tempImgs folder # try: # session_base = getattr(browser_session, 'file_system_path', None) # base_path = session_base if session_base else os.getcwd() # screenshot_dir = os.path.join(base_path, "tempImgs", "finding") # os.makedirs(screenshot_dir, exist_ok=True) # # Save base64 to file # screenshot_filename = f"element_{backend_node_id}.png" # screenshot_path = os.path.join(screenshot_dir, screenshot_filename) # # Decode base64 and save as PNG file # try: # import base64 # screenshot_data = base64.b64decode(element_base64) # with open(screenshot_path, 'wb') as f: # 
# f.write(screenshot_data) # print(f"Screenshot saved to: {screenshot_path}") # except Exception as save_error: # print(f"Error saving screenshot: {save_error}") # screenshot_path = None # except Exception as e: # screenshot_path = None # result_data = { # "element_base64": element_base64, # "backend_node_id": backend_node_id, # "screenshot_path": screenshot_path, # } # # Wrap the dictionary in an ActionResult # return ActionResult(extracted_content=json.dumps(result_data)) # except Exception as e: # error_data = { # "element_base64": None, # "backend_node_id": None, # "screenshot_path": None, # "reason": f"llm_error: {e}" # } # # Also wrap errors in an ActionResult # return ActionResult(error=json.dumps(error_data)) # @tools.action( # description=""" # Adds or removes a visual highlight (red border) around a specified element. # Args: # params (HighlightElementParams): Parameters containing selector and remove flag. # browser_session (BrowserSession): The active browser session. # Returns: # dict: Result containing ok, selector, error. # """, # param_model=HighlightElementParams, # ) # async def highlight_element(params: HighlightElementParams, browser_session: BrowserSession) -> dict: # page = await browser_session.get_current_page() # selector = params.selector # remove = params.remove # try: # element = await page.query_selector(selector) # return {"ok": True, "selector": selector} # except Exception as e: # return {"ok": False, "selector": selector, "error": str(e)} # @tools.action( # description=""" # Gets the position and dimensions of an element on the page. # Args: # params (GetBoundingBoxParams): Parameters containing selector. # browser_session (BrowserSession): The active browser session. # Returns: # dict: Element's bounding box containing x, y, width, height, error. 
# """, # param_model=GetBoundingBoxParams, # ) # async def get_bounding_box(params: GetBoundingBoxParams, browser_session: BrowserSession) -> dict: # page = await browser_session.get_current_page() # selector = params.selector # js = """ # (sel) => { # const el = document.querySelector(sel); # if (!el) return { x: null, y: null, width: null, height: null }; # const r = el.getBoundingClientRect(); # return { x: r.x, y: r.y, width: r.width, height: r.height }; # } # """ # try: # return await page.evaluate(js, selector) # except Exception as e: # return {"error": str(e)} # @tools.action( # description=""" # Takes a screenshot of a specific region of the webpage using provided coordinates. # Args: # params (ElementScreenshotClipParams): Parameters containing clip and filename. # browser_session (BrowserSession, optional): The active browser session. # Returns: # dict: Result containing ok, path, clip, error. # """, # param_model=ElementScreenshotClipParams, # ) # async def element_screenshot_clip(params: ElementScreenshotClipParams, browser_session: BrowserSession = None) -> dict: # if browser_session is None: # return {"ok": False, "error": "No browser session provided"} # page = await browser_session.get_current_page() # try: # # Set up the output path # session_base = getattr(browser_session, 'file_system_path', None) # base_path = session_base if session_base else os.getcwd() # # Create unique subdirectory for this session's screenshots # parsed_url = urlparse(await page.get_url()) # website_name = parsed_url.netloc.replace('www.', '').replace('.', '_').replace(':', '_') # timestamp = int(time.time()) # screenshot_dir = os.path.join(base_path, "tempImgs", f"{website_name}-{timestamp}") # os.makedirs(screenshot_dir, exist_ok=True) # output_path = os.path.join(screenshot_dir, params.filename) # # Take the screenshot # await page.screenshot(path=output_path, clip=clip) # return { # "ok": True, # "path": output_path, # "clip": clip # } # except Exception as e: # return 
# {"ok": False, "error": str(e)} # @tools.action( # description=""" # Verifies that a screenshot corresponds to the element found by a natural language query. # Args: # params (VerifyElementVisualParams): Parameters containing query, screenshot_path, tolerance. # browser_session (BrowserSession): The active browser session. # Returns: # dict: Verification result containing verified, selector, bbox, screenshot, reason. # """, # param_model=VerifyElementVisualParams, # ) # async def verify_element_visual(params: VerifyElementVisualParams, browser_session: BrowserSession) -> dict: # found = await find_element_by_prompt(query, browser_session) # selector = found.get('selector') # if not selector: # return {"verified": False, "reason": f"Could not find element for query: {query}"} # bbox = await get_bounding_box(selector, browser_session) # if not bbox or bbox.get('width') is None: # return {"verified": False, "reason": "Could not get element bounds"} # try: # img = Image.open(screenshot_path) # w, h = img.size # except Exception as e: # return {"verified": False, "reason": f"Could not load screenshot: {e}"} # bw = int(round(bbox['width'])) # bh = int(round(bbox['height'])) # if abs(bw - w) <= tolerance and abs(bh - h) <= tolerance: # return { # "verified": True, # "selector": selector, # "bbox": bbox, # "screenshot": screenshot_path # } # else: # return { # "verified": False, # "reason": f"Size mismatch: element {bw}x{bh} vs screenshot {w}x{h}" # } # class FindByAccessibilityParams(BaseModel): # query: str = Field(..., description="Natural language description of the element to find (e.g., 'main navigation menu', 'login button').") # @tools.action( # description=""" # Finds elements using accessibility tree for more accurate semantic matching. # This approach mimics how screen readers interpret the page, making it more reliable # for finding semantically meaningful elements. 
# Args: # params (FindByAccessibilityParams): Parameters containing the search query # browser_session (BrowserSession): The active browser session # Returns: # ActionResult: Contains element details including role, name, and selector # """, # param_model=FindByAccessibilityParams # ) # async def find_by_accessibility(params: FindByAccessibilityParams, browser_session: BrowserSession) -> ActionResult: # page = await browser_session.get_current_page() # try: # # Get accessibility snapshot # print("🔄 Getting accessibility snapshot...") # accessibility_tree = await page.accessibility.snapshot() # if not accessibility_tree: # return ActionResult(error="Failed to get accessibility snapshot") # print("✅ Got accessibility snapshot") # # Use LLM to analyze the tree # print("🧠 Analyzing with LLM...") # llm = get_model("browser_agent_openrouter:google/gemini-2.5-flash") # prompt = f""" # Given this accessibility tree: # {json.dumps(accessibility_tree, indent=2)} # Find the element that best matches this description: "{params.query}" # Return a JSON object with these fields: # - role: The ARIA role of the element # - name: The accessible name # - selector: A CSS selector that uniquely identifies this element # - confidence: Number between 0-100 indicating match confidence # - reasoning: Brief explanation of why this element matches # Example: # {{ # "role": "button", # "name": "Sign up", # "selector": "#signup-button", # "confidence": 95, # "reasoning": "Exact match for button with 'Sign up' text" # }} # """ # analysis = await llm(prompt) # print("✅ LLM analysis complete") # try: # result = json.loads(analysis) # except json.JSONDecodeError: # return ActionResult(error="Failed to parse LLM response as JSON") # # Verify the element exists # print("🔍 Verifying element...") # element = None # if result.get("selector"): # try: # element = await page.query_selector(result["selector"]) # except Exception as e: # print(f"⚠️ Selector verification failed: {e}") # if not element and 
# result.get("role") and result.get("name"): # try: # element = await page.get_by_role(result["role"], name=result["name"]).element_handle() # except Exception as e: # print(f"⚠️ Role/name verification failed: {e}") # if not element: # return ActionResult(error="Could not verify element existence") # print("✅ Element verified") # # Just return the element if found # return ActionResult( # extracted_content=json.dumps({ # "element": { # "role": result.get("role"), # "name": result.get("name"), # "selector": result.get("selector") # } # }), # include_in_memory=True # ) # except Exception as e: # return ActionResult( # error=f"Accessibility search failed: {str(e)}" # )