# Source: smgp/src/agent_dir/multiple_tries.py
# Last commit: 4de915b ("working on agent") by muhammadmaazuddin
# This file collects the code from our multiple attempts at getting element screenshots to work.
# It is kept here for reference only and is not currently used.
# class ElementScreenshotParams(BaseModel):
# selector: str = Field(..., description="A CSS selector for the element to screenshot.")
# @tools.action(
# description="Take a screenshot of an element at the given CSS selector and return status",
# param_model=ElementScreenshotParams,
# )
# async def element_screenshot(params: ElementScreenshotParams, browser_session: BrowserSession) -> ActionResult:
# # This is a dummy implementation
# print('element_screenshot ---------------')
# print(params)
# return ActionResult(extracted_content=json.dumps({
# "success": True,
# "selector": params.selector,
# }))
# @tools.action(
# description="""
# A robust tool to capture screenshots of web elements.
# Args:
# params (ElementScreenshotParams): Parameters containing selectors, filename, highlight, padding, scroll_if_needed, fallback_to_full_page.
# browser_session (BrowserSession): The active browser session.
# Returns:
# ActionResult: Contains extracted_content, long_term_memory, vision_content, or error.
# """,
# param_model=ElementScreenshotParams,
# )
# async def element_screenshot(params: ElementScreenshotParams, browser_session: BrowserSession) -> ActionResult:
# page = await browser_session.get_current_page()
# try:
# session_base = getattr(browser_session, 'file_system_path', None)
# if session_base:
# base_path = session_base
# else:
# base_path = os.getcwd()
# parsed_url = urlparse(await page.get_url())
# website_name = parsed_url.netloc.replace('www.', '').replace('.', '_').replace(':', '_')
# timestamp = int(time.time())
# screenshot_dir = os.path.join(base_path, "tempImgs", f"{website_name}-{timestamp}")
# os.makedirs(screenshot_dir, exist_ok=True)
# output_path = os.path.join(screenshot_dir, params.filename)
# except Exception as e:
# print(e)
# output_path = os.path.join(os.path.abspath('.'), params.filename)
# element = None
# used_selector = None
# error_messages = []
# for selector in params.selectors:
# try:
# element = await page.query_selector(selector)
# if element:
# used_selector = selector
# break
# except Exception as e:
# error_messages.append(f"{selector}: {str(e)}")
# if not element:
# return ActionResult(error=f"Could not find any element using selectors: {params.selectors}. Errors: {'; '.join(error_messages)}")
# try:
# clip_obj = dict(element).get('clip')
# if not clip_obj or clip_obj.get('x') is None:
# return ActionResult(error=f"Could not get element bounds for selector '{used_selector}'")
# try:
# await page.screenshot(
# path=output_path,
# clip={
# 'x': clip_obj['x'] - params.padding,
# 'y': clip_obj['y'] - params.padding,
# 'width': clip_obj['width'] + (2 * params.padding),
# 'height': clip_obj['height'] + (2 * params.padding)
# }
# )
# except Exception as e:
# return ActionResult(error=f"Screenshot failed for selector '{used_selector}': {str(e)}")
# success_msg = f"Element screenshot saved at: {output_path} (selector: '{used_selector}')"
# return ActionResult(
# extracted_content=success_msg,
# include_in_memory=True,
# long_term_memory=f"Element screenshot taken: {used_selector} -> {output_path}",
# vision_content=[{"type": "image", "path": output_path}]
# )
# except PlaywrightTimeoutError:
# return ActionResult(error=f"Element screenshot failed: Timeout waiting for element '{used_selector}' to be visible or stable.")
# except Exception as e:
# return ActionResult(error=f"Element screenshot failed for selector '{used_selector}': {str(e)}")
# @tools.action(
# description="""
# Finds and returns a web page element based on a natural language description.
# Captures a base64 screenshot of the element and returns it along with the backend_node_id.
# Args:
# params (FindElementByPromptParams): Parameters containing the query string.
# browser_session (BrowserSession): The active browser session.
# Returns:
# ActionResult: Result containing element_base64, backend_node_id, and screenshot_path.
# """,
# param_model=FindElementByPromptParams,
# )
# async def find_element_by_prompt(params: FindElementByPromptParams, browser_session: BrowserSession) -> ActionResult:
# page = await browser_session.get_current_page()
# try:
# # CRITICAL: Refresh DOM tree BEFORE finding element
# print('🔄 Loading initial DOM tree...')
# await page.dom_service.get_dom_tree(target_id=page._target_id)
# print('✅ Initial DOM tree loaded ---------')
# print(params.query)
# # Find element using prompt
# element = await page.must_get_element_by_prompt(params.query, llm=get_model("browser_agent_openrouter:google/gemini-2.5-flash"))
# print(f'✅ Element found: {element}')
# # Refresh DOM tree again before taking screenshot
# print('🔄 Refreshing DOM tree for screenshot...')
# await page.dom_service.get_dom_tree(target_id=page._target_id)
# print('✅ DOM tree refreshed')
# backend_node_id = element._backend_node_id
# # Create fresh element reference and ensure DOM is ready before screenshot
# try:
# print("🔄 Preparing for screenshot operation...")
# print(f"Backend node ID: {element._backend_node_id}")
# await page.dom_service.get_dom_tree(target_id=page._target_id)
# await page._ensure_session() # Make sure session is active
# fresh_element = await page.get_element(backend_node_id=element._backend_node_id)
# print("✅ Got fresh element reference ",fresh_element)
# # Ensure we have a valid session and document
# session_id = await page._ensure_session()
# # Get a fresh document first
# print("🔄 Getting fresh document...")
# doc_result = await page._client.send.DOM.getDocument(
# session_id=session_id
# )
# root_node = doc_result['root']
# print("✅ Got fresh document root")
# # Now get our element's full node info
# print("🔄 Describing node...")
# node_result = await page._client.send.DOM.describeNode(
# params={'backendNodeId': element._backend_node_id},
# session_id=session_id
# )
# print("✅ Got node description")
# # Take screenshot with fresh context
# print("📸 Taking screenshot with fresh context...")
# element_base64 = await fresh_element.screenshot()
# print("✅ Got screenshot data, ", element_base64[:30], "...")
# if not element_base64:
# raise ValueError("Screenshot returned empty data")
# print('✅ Screenshot captured successfully')
# except Exception as screenshot_error:
# print(f"⚠️ Screenshot error: {type(screenshot_error).__name__}: {screenshot_error}")
# raise
# # Get a fresh reference for basic info
# # print('🔄 Refreshing DOM tree for basic info...')
# # await page.dom_service.get_dom_tree(target_id=page._target_id)
# # basic_info_element = await page.get_element(backend_node_id=element.backend_node_id)
# # basic_info = await basic_info_element.get_basic_info()
# # backend_node_id = basic_info.get('backendNodeId')
# # print(f"✅ Element info: backend_node_id={backend_node_id}, nodeName={basic_info.get('nodeName')}")
# # # Try to get bounding box with another fresh reference
# # bbox = None
# # try:
# # print('🔄 Refreshing DOM tree for bounding box...')
# # await page.dom_service.get_dom_tree(target_id=page._target_id)
# # bbox_element = await page.get_element(backend_node_id=backend_node_id)
# # bbox = await bbox_element.get_bounding_box()
# # print(f"✅ Element bounding box: {bbox}")
# # except Exception as bbox_error:
# # print(f"⚠️ Could not get bounding box: {bbox_error}")
# # # Bbox is optional, we can continue without it
# # Save the base64 screenshot to tempImgs folder
# try:
# session_base = getattr(browser_session, 'file_system_path', None)
# base_path = session_base if session_base else os.getcwd()
# screenshot_dir = os.path.join(base_path, "tempImgs", "finding")
# os.makedirs(screenshot_dir, exist_ok=True)
# # Save base64 to file
# screenshot_filename = f"element_{backend_node_id}.png"
# screenshot_path = os.path.join(screenshot_dir, screenshot_filename)
# # Decode base64 and save as PNG file
# try:
# import base64
# screenshot_data = base64.b64decode(element_base64)
# with open(screenshot_path, 'wb') as f:
# f.write(screenshot_data)
# print(f"Screenshot saved to: {screenshot_path}")
# except Exception as save_error:
# print(f"Error saving screenshot: {save_error}")
# screenshot_path = None
# except Exception as e:
# screenshot_path = None
# result_data = {
# "element_base64": element_base64,
# "backend_node_id": backend_node_id,
# "screenshot_path": screenshot_path,
# }
# # Wrap the dictionary in an ActionResult
# return ActionResult(extracted_content=json.dumps(result_data))
# except Exception as e:
# error_data = {
# "element_base64": None,
# "backend_node_id": None,
# "screenshot_path": None,
# "reason": f"llm_error: {e}"
# }
# # Also wrap errors in an ActionResult
# return ActionResult(error=json.dumps(error_data))
# @tools.action(
# description="""
# Adds or removes a visual highlight (red border) around a specified element.
# Args:
# params (HighlightElementParams): Parameters containing selector and remove flag.
# browser_session (BrowserSession): The active browser session.
# Returns:
# dict: Result containing ok, selector, error.
# """,
# param_model=HighlightElementParams,
# )
# async def highlight_element(params: HighlightElementParams, browser_session: BrowserSession) -> dict:
# page = await browser_session.get_current_page()
# selector = params.selector
# remove = params.remove
# try:
# element = await page.query_selector(selector)
# return {"ok": True, "selector": selector}
# except Exception as e:
# return {"ok": False, "selector": selector, "error": str(e)}
# @tools.action(
# description="""
# Gets the position and dimensions of an element on the page.
# Args:
# params (GetBoundingBoxParams): Parameters containing selector.
# browser_session (BrowserSession): The active browser session.
# Returns:
# dict: Element's bounding box containing x, y, width, height, error.
# """,
# param_model=GetBoundingBoxParams,
# )
# async def get_bounding_box(params: GetBoundingBoxParams, browser_session: BrowserSession) -> dict:
# page = await browser_session.get_current_page()
# selector = params.selector
# js = """
# (sel) => {
# const el = document.querySelector(sel);
# if (!el) return { x: null, y: null, width: null, height: null };
# const r = el.getBoundingClientRect();
# return { x: r.x, y: r.y, width: r.width, height: r.height };
# }
# """
# try:
# return await page.evaluate(js, selector)
# except Exception as e:
# return {"error": str(e)}
# @tools.action(
# description="""
# Takes a screenshot of a specific region of the webpage using provided coordinates.
# Args:
# params (ElementScreenshotClipParams): Parameters containing clip and filename.
# browser_session (BrowserSession, optional): The active browser session.
# Returns:
# dict: Result containing ok, path, clip, error.
# """,
# param_model=ElementScreenshotClipParams,
# )
# async def element_screenshot_clip(params: ElementScreenshotClipParams, browser_session: BrowserSession = None) -> dict:
# if browser_session is None:
# return {"ok": False, "error": "No browser session provided"}
# page = await browser_session.get_current_page()
# try:
# # Set up the output path
# session_base = getattr(browser_session, 'file_system_path', None)
# base_path = session_base if session_base else os.getcwd()
# # Create unique subdirectory for this session's screenshots
# parsed_url = urlparse(await page.get_url())
# website_name = parsed_url.netloc.replace('www.', '').replace('.', '_').replace(':', '_')
# timestamp = int(time.time())
# screenshot_dir = os.path.join(base_path, "tempImgs", f"{website_name}-{timestamp}")
# os.makedirs(screenshot_dir, exist_ok=True)
# output_path = os.path.join(screenshot_dir, params.filename)
# # Take the screenshot
# await page.screenshot(path=output_path, clip=clip)
# return {
# "ok": True,
# "path": output_path,
# "clip": clip
# }
# except Exception as e:
# return {"ok": False, "error": str(e)}
# @tools.action(
# description="""
# Verifies that a screenshot corresponds to the element found by a natural language query.
# Args:
# params (VerifyElementVisualParams): Parameters containing query, screenshot_path, tolerance.
# browser_session (BrowserSession): The active browser session.
# Returns:
# dict: Verification result containing verified, selector, bbox, screenshot, reason.
# """,
# param_model=VerifyElementVisualParams,
# )
# async def verify_element_visual(params: VerifyElementVisualParams, browser_session: BrowserSession) -> dict:
# found = await find_element_by_prompt(query, browser_session)
# selector = found.get('selector')
# if not selector:
# return {"verified": False, "reason": f"Could not find element for query: {query}"}
# bbox = await get_bounding_box(selector, browser_session)
# if not bbox or bbox.get('width') is None:
# return {"verified": False, "reason": "Could not get element bounds"}
# try:
# img = Image.open(screenshot_path)
# w, h = img.size
# except Exception as e:
# return {"verified": False, "reason": f"Could not load screenshot: {e}"}
# bw = int(round(bbox['width']))
# bh = int(round(bbox['height']))
# if abs(bw - w) <= tolerance and abs(bh - h) <= tolerance:
# return {
# "verified": True,
# "selector": selector,
# "bbox": bbox,
# "screenshot": screenshot_path
# }
# else:
# return {
# "verified": False,
# "reason": f"Size mismatch: element {bw}x{bh} vs screenshot {w}x{h}"
# }
# class FindByAccessibilityParams(BaseModel):
# query: str = Field(..., description="Natural language description of the element to find (e.g., 'main navigation menu', 'login button').")
# @tools.action(
# description="""
# Finds elements using accessibility tree for more accurate semantic matching.
# This approach mimics how screen readers interpret the page, making it more reliable
# for finding semantically meaningful elements.
# Args:
# params (FindByAccessibilityParams): Parameters containing the search query
# browser_session (BrowserSession): The active browser session
# Returns:
# ActionResult: Contains element details including role, name, and selector
# """,
# param_model=FindByAccessibilityParams
# )
# async def find_by_accessibility(params: FindByAccessibilityParams, browser_session: BrowserSession) -> ActionResult:
# page = await browser_session.get_current_page()
# try:
# # Get accessibility snapshot
# print("🔄 Getting accessibility snapshot...")
# accessibility_tree = await page.accessibility.snapshot()
# if not accessibility_tree:
# return ActionResult(error="Failed to get accessibility snapshot")
# print("✅ Got accessibility snapshot")
# # Use LLM to analyze the tree
# print("🧠 Analyzing with LLM...")
# llm = get_model("browser_agent_openrouter:google/gemini-2.5-flash")
# prompt = f"""
# Given this accessibility tree:
# {json.dumps(accessibility_tree, indent=2)}
# Find the element that best matches this description: "{params.query}"
# Return a JSON object with these fields:
# - role: The ARIA role of the element
# - name: The accessible name
# - selector: A CSS selector that uniquely identifies this element
# - confidence: Number between 0-100 indicating match confidence
# - reasoning: Brief explanation of why this element matches
# Example:
# {{
# "role": "button",
# "name": "Sign up",
# "selector": "#signup-button",
# "confidence": 95,
# "reasoning": "Exact match for button with 'Sign up' text"
# }}
# """
# analysis = await llm(prompt)
# print("✅ LLM analysis complete")
# try:
# result = json.loads(analysis)
# except json.JSONDecodeError:
# return ActionResult(error="Failed to parse LLM response as JSON")
# # Verify the element exists
# print("🔍 Verifying element...")
# element = None
# if result.get("selector"):
# try:
# element = await page.query_selector(result["selector"])
# except Exception as e:
# print(f"⚠️ Selector verification failed: {e}")
# if not element and result.get("role") and result.get("name"):
# try:
# element = await page.get_by_role(result["role"], name=result["name"]).element_handle()
# except Exception as e:
# print(f"⚠️ Role/name verification failed: {e}")
# if not element:
# return ActionResult(error="Could not verify element existence")
# print("✅ Element verified")
# # Just return the element if found
# return ActionResult(
# extracted_content=json.dumps({
# "element": {
# "role": result.get("role"),
# "name": result.get("name"),
# "selector": result.get("selector")
# }
# }),
# include_in_memory=True
# )
# except Exception as e:
# return ActionResult(
# error=f"Accessibility search failed: {str(e)}"
# )