QuickAgent / BrowsingAgent /BrowsingAgent.py
varun324242's picture
Upload 58 files
0d3af20 verified
import json
import re
import logging
from agency_swarm.agents import Agent
from typing_extensions import override
import base64
from .tools.SearchAndScrape import SearchAndScrape
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from .tools.util import highlight_elements_with_labels, get_web_driver, set_web_driver
from agency_swarm.tools.oai import FileSearch
class BrowsingAgent(Agent):
SCREENSHOT_FILE_NAME = "screenshot.jpg"
def __init__(self, selenium_config=None, **kwargs):
from .tools.util.selenium import set_selenium_config
super().__init__(
name="BrowsingAgent",
description="This agent is designed to perform web searches and navigate web pages.",
instructions="""
I am a browsing agent that can:
1. Perform Google searches
2. Navigate web pages
3. Take screenshots
4. Highlight and interact with page elements
Use my search capabilities to find information and my navigation tools to explore web pages.
""",
files_folder="./files",
schemas_folder="./schemas",
tools=[SearchAndScrape],
tools_folder="./tools",
temperature=0,
max_prompt_tokens=16000,
model="groq/llama-3.3-70b-versatile",
**kwargs
)
if selenium_config is not None:
set_selenium_config(selenium_config)
self.prev_message = ""
@override
def response_validator(self, message):
from .tools.util.selenium import get_web_driver, set_web_driver
from .tools.util import highlight_elements_with_labels, remove_highlight_and_labels
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
# Filter out everything in square brackets
filtered_message = re.sub(r'\[.*?\]', '', message).strip()
if filtered_message and self.prev_message == filtered_message:
raise ValueError("Do not repeat yourself. If you are stuck, try a different approach or search in google for the page you are looking for directly.")
self.prev_message = filtered_message
if "[send screenshot]" in message.lower():
wd = get_web_driver()
remove_highlight_and_labels(wd)
self.take_screenshot()
response_text = "Here is the screenshot of the current web page:"
elif '[highlight clickable elements]' in message.lower():
wd = get_web_driver()
highlight_elements_with_labels(wd, 'a, button, div[onclick], div[role="button"], div[tabindex], '
'span[onclick], span[role="button"], span[tabindex]')
self._shared_state.set("elements_highlighted", 'a, button, div[onclick], div[role="button"], div[tabindex], '
'span[onclick], span[role="button"], span[tabindex]')
self.take_screenshot()
all_elements = wd.find_elements(By.CSS_SELECTOR, '.highlighted-element')
all_element_texts = [element.text for element in all_elements]
element_texts_json = {}
for i, element_text in enumerate(all_element_texts):
element_texts_json[str(i + 1)] = self.remove_unicode(element_text)
element_texts_json = {k: v for k, v in element_texts_json.items() if v}
element_texts_formatted = ", ".join([f"{k}: {v}" for k, v in element_texts_json.items()])
response_text = ("Here is the screenshot of the current web page with highlighted clickable elements. \n\n"
"Texts of the elements are: " + element_texts_formatted + ".\n\n"
"Elements without text are not shown, but are available on screenshot. \n"
"Please make sure to analyze the screenshot to find the clickable element you need to click on.")
elif '[highlight text fields]' in message.lower():
wd = get_web_driver()
highlight_elements_with_labels(wd, 'input, textarea')
self._shared_state.set("elements_highlighted", "input, textarea")
self.take_screenshot()
all_elements = wd.find_elements(By.CSS_SELECTOR, '.highlighted-element')
all_element_texts = [element.text for element in all_elements]
element_texts_json = {}
for i, element_text in enumerate(all_element_texts):
element_texts_json[str(i + 1)] = self.remove_unicode(element_text)
element_texts_formatted = ", ".join([f"{k}: {v}" for k, v in element_texts_json.items()])
response_text = ("Here is the screenshot of the current web page with highlighted text fields: \n"
"Texts of the elements are: " + element_texts_formatted + ".\n"
"Please make sure to analyze the screenshot to find the text field you need to fill.")
elif '[highlight dropdowns]' in message.lower():
wd = get_web_driver()
highlight_elements_with_labels(wd, 'select')
self._shared_state.set("elements_highlighted", "select")
self.take_screenshot()
all_elements = wd.find_elements(By.CSS_SELECTOR, '.highlighted-element')
all_selector_values = {}
i = 0
for element in all_elements:
select = Select(element)
options = select.options
selector_values = {}
for j, option in enumerate(options):
selector_values[str(j)] = option.text
if j > 10:
break
all_selector_values[str(i + 1)] = selector_values
all_selector_values = {k: v for k, v in all_selector_values.items() if v}
all_selector_values_formatted = ", ".join([f"{k}: {v}" for k, v in all_selector_values.items()])
response_text = ("Here is the screenshot with highlighted dropdowns. \n"
"Selector values are: " + all_selector_values_formatted + ".\n"
"Please make sure to analyze the screenshot to find the dropdown you need to select.")
else:
return message
set_web_driver(wd)
content = self.create_response_content(response_text)
raise ValueError(content)
def take_screenshot(self):
from .tools.util.selenium import get_web_driver
from .tools.util import get_b64_screenshot
wd = get_web_driver()
screenshot = get_b64_screenshot(wd)
screenshot_data = base64.b64decode(screenshot)
with open(self.SCREENSHOT_FILE_NAME, "wb") as screenshot_file:
screenshot_file.write(screenshot_data)
def create_response_content(self, response_text):
with open(self.SCREENSHOT_FILE_NAME, "rb") as file:
file_id = self.client.files.create(
file=file,
purpose="vision",
).id
content = [
{"type": "text", "text": response_text},
{
"type": "image_file",
"image_file": {"file_id": file_id}
}
]
return content
# Function to check for Unicode escape sequences
def remove_unicode(self, data):
return re.sub(r'[^\x00-\x7F]+', '', data)
def run_search_and_scrape(self, query):
"""Run the SearchAndScrape tool and process the results."""
tool = SearchAndScrape(query=query)
result = tool.run()
logging.info(f"Search and Scrape result: {result}")
return result