Spaces:
Sleeping
Sleeping
| """ | |
| Web Search tool — information search on the internet. | |
| Allows agents to search for up-to-date information online and read web page content. | |
| Supports DuckDuckGo (no API key), Serper (Google Search), and custom providers. | |
| Capabilities: | |
| - Query search | |
| - Automatic page download and parsing (fetch_content=True) | |
| - Reading a specific URL (url parameter) | |
| - Full JavaScript rendering via Selenium (use_selenium=True) | |
| """ | |
| import contextlib | |
| import html | |
| import json | |
| import logging | |
| import re | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| from abc import ABC, abstractmethod | |
| from collections import deque | |
| from typing import Any, ClassVar, Self | |
| from .base import BaseTool, ToolResult | |
| logger = logging.getLogger(__name__) | |
| # ============================================================ | |
| # HTML Parser — text extraction from HTML | |
| # ============================================================ | |
class SimpleHTMLParser:
    """
    Simple HTML-to-text parser without external dependencies.

    Extracts text content by removing tags, scripts, and styles with regular
    expressions. BeautifulSoup is recommended for higher quality parsing.
    """

    # Tags whose content should be completely removed
    REMOVE_TAGS: ClassVar[set[str]] = {
        "script",
        "style",
        "head",
        "meta",
        "link",
        "noscript",
        "iframe",
        "svg",
        "nav",
        "footer",
        "header",
    }

    # Tags after which a line break is needed
    BLOCK_TAGS: ClassVar[set[str]] = {
        "p",
        "div",
        "br",
        "h1",
        "h2",
        "h3",
        "h4",
        "h5",
        "h6",
        "li",
        "tr",
        "article",
        "section",
    }

    @classmethod
    def html_to_text(cls, html_content: str, max_length: int = 8000) -> str:
        """
        Convert HTML to plain text.

        Args:
            html_content: HTML string.
            max_length: Maximum result length (a truncation notice is appended
                when the text is cut).

        Returns:
            Extracted text, or an empty string for empty input.
        """
        if not html_content:
            return ""

        text = html_content

        # Remove content of script, style, etc. tags. The lookahead requires the
        # tag name to end right there, so "<nav" does not also match "<navbar>"
        # and swallow everything up to the next "</nav>".
        for tag in cls.REMOVE_TAGS:
            pattern = rf"<{tag}(?=[\s/>])[^>]*>.*?</{tag}\s*>"
            text = re.sub(pattern, " ", text, flags=re.IGNORECASE | re.DOTALL)

        # Remove HTML comments
        text = re.sub(r"<!--.*?-->", " ", text, flags=re.DOTALL)

        # Add line breaks around block tags (the tags themselves are stripped below)
        for tag in cls.BLOCK_TAGS:
            text = re.sub(rf"</{tag}>", f"</{tag}>\n", text, flags=re.IGNORECASE)
            text = re.sub(rf"<{tag}[^>]*/?>", f"\n<{tag}>", text, flags=re.IGNORECASE)

        # Replace <br> with newline
        text = re.sub(r"<br\s*/?>", "\n", text, flags=re.IGNORECASE)

        # Remove all remaining tags
        text = re.sub(r"<[^>]+>", " ", text)

        # Decode HTML entities
        text = html.unescape(text)

        # Normalise whitespace
        text = re.sub(r"[ \t]+", " ", text)  # Collapse multiple spaces
        text = re.sub(r"\n\s*\n+", "\n\n", text)  # Collapse multiple newlines
        text = text.strip()

        # Limit length
        if len(text) > max_length:
            text = text[:max_length] + "\n\n... (content truncated)"

        return text
| # ============================================================ | |
| # URL Fetcher — web page download and parsing | |
| # ============================================================ | |
class URLFetcher:
    """Utility for downloading and parsing web pages."""

    DEFAULT_HEADERS: ClassVar[dict[str, str]] = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        ),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
    }

    # urllib would also open file:// and ftp:// URLs; a web fetcher fed with
    # caller-supplied URLs must not read local files.
    ALLOWED_SCHEMES: ClassVar[frozenset[str]] = frozenset({"http", "https"})

    # Content types that can be converted to text (XHTML is advertised in Accept).
    SUPPORTED_CONTENT_TYPES: ClassVar[tuple[str, ...]] = (
        "text/html",
        "text/plain",
        "application/xhtml+xml",
    )

    # Minimum inner-HTML length for a container to be considered main content
    _MIN_MAIN_CONTENT_LENGTH: ClassVar[int] = 500

    # Candidate main-content containers, tried in order. The <div role="main">
    # pattern stops at the first "</div>", so nested divs cut it short; the
    # minimum-length check filters out such truncated matches.
    _MAIN_CONTENT_PATTERNS: ClassVar[tuple[re.Pattern[str], ...]] = (
        re.compile(r"<main\b[^>]*>(.*?)</main>", re.IGNORECASE | re.DOTALL),
        re.compile(r"<article\b[^>]*>(.*?)</article>", re.IGNORECASE | re.DOTALL),
        re.compile(
            r"""<div\b[^>]*\brole\s*=\s*["']?main["']?[^>]*>(.*?)</div>""",
            re.IGNORECASE | re.DOTALL,
        ),
    )

    def __init__(self, timeout: int = 15, max_content_length: int = 500_000):
        """
        Create URLFetcher.

        Args:
            timeout: Request timeout in seconds.
            max_content_length: Maximum size of downloaded content in bytes.
        """
        self._timeout = timeout
        self._max_content_length = max_content_length

    @staticmethod
    def _decode(raw_content: bytes, content_type: str) -> str:
        """Decode the body using the Content-Type charset, falling back to UTF-8."""
        charset = "utf-8"
        match = re.search(r"charset=([^\s;]+)", content_type, re.IGNORECASE)
        if match:
            # The charset value may be quoted: charset="utf-8"
            charset = match.group(1).strip("\"'")
        try:
            return raw_content.decode(charset, errors="replace")
        except LookupError:
            # Unknown or empty codec name
            return raw_content.decode("utf-8", errors="replace")

    @classmethod
    def _extract_main_html(cls, html_content: str) -> str:
        """Return the first sufficiently large main-content container, else the whole page."""
        for pattern in cls._MAIN_CONTENT_PATTERNS:
            match = pattern.search(html_content)
            if match and len(match.group(1)) > cls._MIN_MAIN_CONTENT_LENGTH:
                return match.group(1)
        return html_content

    def fetch(self, url: str) -> dict[str, Any]:
        """
        Download and parse a web page.

        Never raises for network or parsing problems; failures are reported
        through the ``error`` key.

        Args:
            url: Page URL (http or https).

        Returns:
            Dict with keys:
            - success: bool
            - url: str
            - title: str (if found)
            - content: str (text content)
            - error: str (if error)
        """
        result: dict[str, Any] = {
            "success": False,
            "url": url,
            "title": "",
            "content": "",
            "error": "",
        }

        try:
            scheme = urllib.parse.urlsplit(url).scheme.lower()
            if scheme not in self.ALLOWED_SCHEMES:
                result["error"] = f"Unsupported URL scheme: {scheme or '(none)'}"
                return result

            request = urllib.request.Request(url, headers=self.DEFAULT_HEADERS)
            with urllib.request.urlopen(request, timeout=self._timeout) as response:
                content_type = response.headers.get("Content-Type", "")
                lowered = content_type.lower()
                if not any(kind in lowered for kind in self.SUPPORTED_CONTENT_TYPES):
                    result["error"] = f"Unsupported content type: {content_type}"
                    return result

                # Read content with size limit
                raw_content = response.read(self._max_content_length)

            html_content = self._decode(raw_content, content_type)

            title_match = re.search(r"<title[^>]*>(.*?)</title>", html_content, re.IGNORECASE | re.DOTALL)
            if title_match:
                result["title"] = html.unescape(title_match.group(1).strip())

            result["content"] = SimpleHTMLParser.html_to_text(self._extract_main_html(html_content))
            result["success"] = True
        except urllib.error.HTTPError as e:
            result["error"] = f"HTTP Error {e.code}: {e.reason}"
        except urllib.error.URLError as e:
            result["error"] = f"URL Error: {e.reason}"
        except TimeoutError:
            result["error"] = f"Request timed out after {self._timeout} seconds"
        except (ValueError, OSError, UnicodeDecodeError) as e:
            result["error"] = f"Fetch error: {e}"

        return result
| # ============================================================ | |
| # Selenium Fetcher — full rendering via browser | |
| # ============================================================ | |
| class SeleniumFetcher: | |
| """ | |
| Fetcher based on Selenium WebDriver for full page rendering. | |
| Allows you to: | |
| - Render JavaScript (SPA, dynamic content) | |
| - Wait for elements to load | |
| - Scroll the page to load more content | |
| - Handle sites that block simple HTTP requests | |
| Requires: ``pip install selenium webdriver-manager`` | |
| Example: | |
| # Basic usage | |
| fetcher = SeleniumFetcher() | |
| result = fetcher.fetch("https://example.com") | |
| print(result["content"]) | |
| # With settings | |
| fetcher = SeleniumFetcher( | |
| headless=True, | |
| wait_timeout=15, | |
| scroll_to_bottom=True, | |
| browser="chrome", | |
| ) | |
| result = fetcher.fetch("https://spa-website.com") | |
| """ | |
| def __init__( | |
| self, | |
| *, | |
| headless: bool = True, | |
| browser: str = "chrome", | |
| wait_timeout: int = 15, | |
| page_load_timeout: int = 30, | |
| max_content_length: int = 500_000, | |
| scroll_to_bottom: bool = False, | |
| scroll_pause: float = 1.0, | |
| max_scrolls: int = 5, | |
| extra_wait: float = 2.0, | |
| user_agent: str | None = None, | |
| window_size: tuple[int, int] = (1920, 1080), | |
| proxy: str | None = None, | |
| disable_images: bool = False, | |
| ): | |
| """ | |
| Create SeleniumFetcher. | |
| Args: | |
| headless: Run browser without GUI (recommended for servers). | |
| browser: Browser type: "chrome", "firefox", "edge". | |
| wait_timeout: Element wait timeout (sec). | |
| page_load_timeout: Page load timeout (sec). | |
| max_content_length: Maximum extracted content length. | |
| scroll_to_bottom: Scroll to the bottom to load more content. | |
| scroll_pause: Pause between scrolls (sec). | |
| max_scrolls: Maximum number of scrolls. | |
| extra_wait: Additional wait after loading (sec). | |
| user_agent: Custom User-Agent (None = default). | |
| window_size: Browser window size (width, height). | |
| proxy: Proxy server (e.g. "http://proxy:8080"). | |
| disable_images: Disable image loading (faster). | |
| """ | |
| self._headless = headless | |
| self._browser = browser.lower() | |
| self._wait_timeout = wait_timeout | |
| self._page_load_timeout = page_load_timeout | |
| self._max_content_length = max_content_length | |
| self._scroll_to_bottom = scroll_to_bottom | |
| self._scroll_pause = scroll_pause | |
| self._max_scrolls = max_scrolls | |
| self._extra_wait = extra_wait | |
| self._user_agent = user_agent | |
| self._window_size = window_size | |
| self._proxy = proxy | |
| self._disable_images = disable_images | |
| # Lazy-initialized driver | |
| self._driver: Any = None | |
| def _ensure_dependencies(self) -> None: | |
| """Check that selenium and webdriver-manager are available.""" | |
| try: | |
| import selenium # noqa: F401 | |
| except ImportError as e: | |
| msg = "Selenium is required for SeleniumFetcher. Install it with: pip install selenium webdriver-manager" | |
| raise ImportError(msg) from e | |
| def _create_driver(self) -> Any: | |
| """Create and configure the WebDriver.""" | |
| self._ensure_dependencies() | |
| if self._browser == "chrome": | |
| return self._create_chrome_driver() | |
| if self._browser == "firefox": | |
| return self._create_firefox_driver() | |
| if self._browser == "edge": | |
| return self._create_edge_driver() | |
| msg = f"Unsupported browser: {self._browser}. Use 'chrome', 'firefox', or 'edge'." | |
| raise ValueError(msg) | |
| def _create_chrome_driver(self) -> Any: | |
| """Create a Chrome WebDriver.""" | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.chrome.service import Service | |
| options = Options() | |
| if self._headless: | |
| options.add_argument("--headless=new") | |
| options.add_argument("--no-sandbox") | |
| options.add_argument("--disable-dev-shm-usage") | |
| options.add_argument("--disable-gpu") | |
| options.add_argument(f"--window-size={self._window_size[0]},{self._window_size[1]}") | |
| options.add_argument("--disable-blink-features=AutomationControlled") | |
| if self._user_agent: | |
| options.add_argument(f"--user-agent={self._user_agent}") | |
| if self._proxy: | |
| options.add_argument(f"--proxy-server={self._proxy}") | |
| if self._disable_images: | |
| prefs = {"profile.managed_default_content_settings.images": 2} | |
| options.add_experimental_option("prefs", prefs) | |
| # Try webdriver-manager, fall back to system chromedriver | |
| try: | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| service = Service(ChromeDriverManager().install()) | |
| except Exception: # noqa: BLE001 | |
| logger.debug("webdriver-manager failed or not found, using system chromedriver") | |
| service = Service() | |
| return webdriver.Chrome(service=service, options=options) | |
| def _create_firefox_driver(self) -> Any: | |
| """Create a Firefox WebDriver.""" | |
| from selenium import webdriver | |
| from selenium.webdriver.firefox.options import Options | |
| from selenium.webdriver.firefox.service import Service | |
| options = Options() | |
| if self._headless: | |
| options.add_argument("--headless") | |
| options.set_preference("general.useragent.override", self._user_agent or "") | |
| if self._proxy: | |
| # Firefox proxy via preferences | |
| from urllib.parse import urlparse | |
| parsed = urlparse(self._proxy) | |
| options.set_preference("network.proxy.type", 1) | |
| options.set_preference("network.proxy.http", parsed.hostname) | |
| options.set_preference("network.proxy.http_port", parsed.port or 8080) | |
| if self._disable_images: | |
| options.set_preference("permissions.default.image", 2) | |
| try: | |
| from webdriver_manager.firefox import GeckoDriverManager | |
| service = Service(GeckoDriverManager().install()) | |
| except Exception: # noqa: BLE001 | |
| logger.debug("webdriver-manager failed or not found, using system geckodriver") | |
| service = Service() | |
| return webdriver.Firefox(service=service, options=options) | |
| def _create_edge_driver(self) -> Any: | |
| """Create an Edge WebDriver.""" | |
| from selenium import webdriver | |
| from selenium.webdriver.edge.options import Options | |
| from selenium.webdriver.edge.service import Service | |
| options = Options() | |
| if self._headless: | |
| options.add_argument("--headless=new") | |
| options.add_argument("--no-sandbox") | |
| options.add_argument("--disable-dev-shm-usage") | |
| options.add_argument(f"--window-size={self._window_size[0]},{self._window_size[1]}") | |
| if self._user_agent: | |
| options.add_argument(f"--user-agent={self._user_agent}") | |
| if self._proxy: | |
| options.add_argument(f"--proxy-server={self._proxy}") | |
| try: | |
| from webdriver_manager.microsoft import EdgeChromiumDriverManager | |
| service = Service(EdgeChromiumDriverManager().install()) | |
| except Exception: # noqa: BLE001 | |
| logger.debug("webdriver-manager failed or not found, using system msedgedriver") | |
| service = Service() | |
| return webdriver.Edge(service=service, options=options) | |
| def _get_driver(self) -> Any: | |
| """Get or create the WebDriver (lazy init).""" | |
| if self._driver is None: | |
| self._driver = self._create_driver() | |
| self._driver.set_page_load_timeout(self._page_load_timeout) | |
| self._driver.implicitly_wait(self._wait_timeout) | |
| return self._driver | |
| def _scroll_page(self, driver: Any) -> None: | |
| """Scroll the page to load dynamic content.""" | |
| import time | |
| last_height = driver.execute_script("return document.body.scrollHeight") | |
| for _ in range(self._max_scrolls): | |
| driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") | |
| time.sleep(self._scroll_pause) | |
| new_height = driver.execute_script("return document.body.scrollHeight") | |
| if new_height == last_height: | |
| break | |
| last_height = new_height | |
| def fetch(self, url: str) -> dict[str, Any]: | |
| """ | |
| Open a URL in the browser, wait for rendering, and extract content. | |
| Args: | |
| url: Page URL. | |
| Returns: | |
| Dict with keys: | |
| - success: bool | |
| - url: str | |
| - title: str | |
| - content: str (text content) | |
| - error: str (if error) | |
| """ | |
| import time | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "url": url, | |
| "title": "", | |
| "content": "", | |
| "error": "", | |
| } | |
| try: | |
| driver = self._get_driver() | |
| driver.get(url) | |
| # Additional wait for JS rendering | |
| if self._extra_wait > 0: | |
| time.sleep(self._extra_wait) | |
| # Scrolling to load content | |
| if self._scroll_to_bottom: | |
| self._scroll_page(driver) | |
| # Extract title | |
| result["title"] = driver.title or "" | |
| # Extract HTML after rendering | |
| html_content = driver.page_source | |
| # Try to find the main content | |
| main_content = html_content | |
| for tag in ["main", "article"]: | |
| match = re.search( | |
| rf"<{tag}[^>]*>(.*?)</{tag}>", | |
| html_content, | |
| re.IGNORECASE | re.DOTALL, | |
| ) | |
| min_main_content_length = 500 | |
| if match and len(match.group(1)) > min_main_content_length: | |
| main_content = match.group(1) | |
| break | |
| # Parse HTML to text | |
| content = SimpleHTMLParser.html_to_text(main_content, max_length=self._max_content_length) | |
| result["content"] = content | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| error_type = type(e).__name__ | |
| result["error"] = f"Selenium error ({error_type}): {e}" | |
| logger.debug("SeleniumFetcher error for %s: %s", url, result["error"]) | |
| return result | |
| def fetch_with_wait( | |
| self, | |
| url: str, | |
| wait_for_selector: str | None = None, | |
| wait_timeout: int | None = None, | |
| ) -> dict[str, Any]: | |
| """ | |
| Open a URL and wait for a specific CSS selector to appear. | |
| Args: | |
| url: Page URL. | |
| wait_for_selector: CSS selector of the element to wait for. | |
| wait_timeout: Wait timeout (sec). Defaults to self._wait_timeout. | |
| Returns: | |
| Dict with the result (same as fetch). | |
| """ | |
| import time | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "url": url, | |
| "title": "", | |
| "content": "", | |
| "error": "", | |
| } | |
| try: | |
| driver = self._get_driver() | |
| driver.get(url) | |
| if wait_for_selector: | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support import expected_conditions as EC # noqa: N812 | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| timeout = wait_timeout or self._wait_timeout | |
| WebDriverWait(driver, timeout).until( | |
| EC.presence_of_element_located((By.CSS_SELECTOR, wait_for_selector)) | |
| ) | |
| elif self._extra_wait > 0: | |
| time.sleep(self._extra_wait) | |
| if self._scroll_to_bottom: | |
| self._scroll_page(driver) | |
| result["title"] = driver.title or "" | |
| html_content = driver.page_source | |
| main_content = html_content | |
| for tag in ["main", "article"]: | |
| match = re.search( | |
| rf"<{tag}[^>]*>(.*?)</{tag}>", | |
| html_content, | |
| re.IGNORECASE | re.DOTALL, | |
| ) | |
| min_main_content_length = 500 | |
| if match and len(match.group(1)) > min_main_content_length: | |
| main_content = match.group(1) | |
| break | |
| content = SimpleHTMLParser.html_to_text(main_content, max_length=self._max_content_length) | |
| result["content"] = content | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| error_type = type(e).__name__ | |
| result["error"] = f"Selenium error ({error_type}): {e}" | |
| logger.debug("SeleniumFetcher.fetch_with_wait error for %s: %s", url, result["error"]) | |
| return result | |
| # ================================================================ | |
| # Browser Actions — page interaction | |
| # ================================================================ | |
| def click_element(self, selector: str, wait_timeout: int | None = None) -> dict[str, Any]: | |
| """ | |
| Click an element on the current page. | |
| Args: | |
| selector: CSS selector of the element. | |
| wait_timeout: Element wait timeout (sec). | |
| Returns: | |
| Dict with result: | |
| - success: bool | |
| - url: str (current URL after click) | |
| - title: str | |
| - clicked_text: str (text of the clicked element) | |
| - error: str | |
| """ | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "url": "", | |
| "title": "", | |
| "clicked_text": "", | |
| "error": "", | |
| } | |
| try: | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support import expected_conditions as EC # noqa: N812 | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| driver = self._get_driver() | |
| timeout = wait_timeout or self._wait_timeout | |
| element = WebDriverWait(driver, timeout).until(EC.element_to_be_clickable((By.CSS_SELECTOR, selector))) | |
| result["clicked_text"] = element.text or element.get_attribute("textContent") or "" | |
| element.click() | |
| import time | |
| time.sleep(1.0) # Wait for navigation/rendering | |
| result["url"] = driver.current_url | |
| result["title"] = driver.title or "" | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| result["error"] = f"Click error ({type(e).__name__}): {e}" | |
| return result | |
| def fill_input( | |
| self, | |
| selector: str, | |
| value: str, | |
| *, | |
| submit: bool = False, | |
| clear_first: bool = True, | |
| wait_timeout: int | None = None, | |
| ) -> dict[str, Any]: | |
| """ | |
| Fill an input field on the current page. | |
| Args: | |
| selector: CSS selector of the input field. | |
| value: Value to enter. | |
| submit: Press Enter after input. | |
| clear_first: Clear the field before input. | |
| wait_timeout: Element wait timeout (sec). | |
| Returns: | |
| Dict with result: | |
| - success: bool | |
| - url: str | |
| - title: str | |
| - error: str | |
| """ | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "url": "", | |
| "title": "", | |
| "error": "", | |
| } | |
| try: | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.common.keys import Keys | |
| from selenium.webdriver.support import expected_conditions as EC # noqa: N812 | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| driver = self._get_driver() | |
| timeout = wait_timeout or self._wait_timeout | |
| element = WebDriverWait(driver, timeout).until(EC.presence_of_element_located((By.CSS_SELECTOR, selector))) | |
| if clear_first: | |
| element.clear() | |
| element.send_keys(value) | |
| if submit: | |
| element.send_keys(Keys.RETURN) | |
| import time | |
| time.sleep(2.0) # Wait for form submission | |
| result["url"] = driver.current_url | |
| result["title"] = driver.title or "" | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| result["error"] = f"Fill error ({type(e).__name__}): {e}" | |
| return result | |
| def extract_links( | |
| self, | |
| selector: str = "a[href]", | |
| *, | |
| base_url_filter: str | None = None, | |
| max_links: int = 50, | |
| ) -> dict[str, Any]: | |
| """ | |
| Extract all links from the current page. | |
| Args: | |
| selector: CSS selector for finding links. | |
| base_url_filter: Base URL filter (only links starting with this URL). | |
| max_links: Maximum number of links. | |
| Returns: | |
| Dict with result: | |
| - success: bool | |
| - url: str (current URL) | |
| - links: list[dict] — list of links [{url, text, title}] | |
| - count: int | |
| - error: str | |
| """ | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "url": "", | |
| "links": [], | |
| "count": 0, | |
| "error": "", | |
| } | |
| try: | |
| from selenium.webdriver.common.by import By | |
| driver = self._get_driver() | |
| result["url"] = driver.current_url | |
| elements = driver.find_elements(By.CSS_SELECTOR, selector) | |
| links: list[dict[str, str]] = [] | |
| for elem in elements: | |
| href = elem.get_attribute("href") or "" | |
| if not href or href.startswith(("javascript:", "mailto:", "tel:", "#")): | |
| continue | |
| if base_url_filter and not href.startswith(base_url_filter): | |
| continue | |
| links.append( | |
| { | |
| "url": href, | |
| "text": (elem.text or "").strip()[:200], | |
| "title": (elem.get_attribute("title") or "").strip()[:200], | |
| } | |
| ) | |
| if len(links) >= max_links: | |
| break | |
| result["links"] = links | |
| result["count"] = len(links) | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| result["error"] = f"Extract links error ({type(e).__name__}): {e}" | |
| return result | |
| def execute_js(self, script: str) -> dict[str, Any]: | |
| """ | |
| Execute arbitrary JavaScript on the current page. | |
| Args: | |
| script: JavaScript code to execute. | |
| Returns: | |
| Dict with result: | |
| - success: bool | |
| - url: str | |
| - return_value: Any (script execution result) | |
| - error: str | |
| """ | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "url": "", | |
| "return_value": None, | |
| "error": "", | |
| } | |
| try: | |
| driver = self._get_driver() | |
| return_value = driver.execute_script(script) | |
| result["url"] = driver.current_url | |
| result["return_value"] = str(return_value) if return_value is not None else None | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| result["error"] = f"JS execution error ({type(e).__name__}): {e}" | |
| return result | |
| def get_current_url(self) -> str: | |
| """Get the current URL of the open page.""" | |
| try: | |
| driver = self._get_driver() | |
| except Exception: # noqa: BLE001 | |
| return "" | |
| else: | |
| return driver.current_url | |
| def get_page_content(self) -> dict[str, Any]: | |
| """ | |
| Extract text content from the current open page. | |
| Returns: | |
| Dict with keys: success, url, title, content, error. | |
| """ | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "url": "", | |
| "title": "", | |
| "content": "", | |
| "error": "", | |
| } | |
| try: | |
| driver = self._get_driver() | |
| result["url"] = driver.current_url | |
| result["title"] = driver.title or "" | |
| html_content = driver.page_source | |
| main_content = html_content | |
| for tag in ["main", "article"]: | |
| match = re.search( | |
| rf"<{tag}[^>]*>(.*?)</{tag}>", | |
| html_content, | |
| re.IGNORECASE | re.DOTALL, | |
| ) | |
| min_main_content_length = 500 | |
| if match and len(match.group(1)) > min_main_content_length: | |
| main_content = match.group(1) | |
| break | |
| result["content"] = SimpleHTMLParser.html_to_text(main_content, max_length=self._max_content_length) | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| result["error"] = f"Get content error ({type(e).__name__}): {e}" | |
| return result | |
| def crawl( | |
| self, | |
| start_url: str, | |
| *, | |
| max_pages: int = 10, | |
| max_depth: int = 2, | |
| url_filter: str | None = None, | |
| link_selector: str = "a[href]", | |
| extract_content: bool = True, | |
| ) -> dict[str, Any]: | |
| """ | |
| Recursively crawl a site, collecting page content. | |
| Args: | |
| start_url: Starting URL. | |
| max_pages: Maximum number of pages to crawl. | |
| max_depth: Maximum crawl depth. | |
| url_filter: URL filter (only links containing this substring). | |
| Default — the same domain as start_url. | |
| link_selector: CSS selector for finding links. | |
| extract_content: Whether to extract page text content. | |
| Returns: | |
| Dict with result: | |
| - success: bool | |
| - pages: list[dict] — [{url, title, content, depth, links_found}] | |
| - total_pages: int | |
| - error: str | |
| """ | |
| import time | |
| from urllib.parse import urlparse | |
| result: dict[str, Any] = { | |
| "success": False, | |
| "pages": [], | |
| "total_pages": 0, | |
| "error": "", | |
| } | |
| # Determine domain filter | |
| if url_filter is None: | |
| parsed = urlparse(start_url) | |
| url_filter = f"{parsed.scheme}://{parsed.netloc}" | |
| visited: set[str] = set() | |
| queue: deque[tuple[str, int]] = deque([(start_url, 0)]) # (url, depth) | |
| pages: list[dict[str, Any]] = [] | |
| try: | |
| while queue and len(pages) < max_pages: | |
| current_url, depth = queue.popleft() | |
| # Normalise URL (remove fragments) | |
| current_url = current_url.split("#")[0].rstrip("/") | |
| if current_url in visited: | |
| continue | |
| visited.add(current_url) | |
| # Open the page | |
| fetch_result = self.fetch(current_url) | |
| if not fetch_result["success"]: | |
| continue | |
| page_info: dict[str, Any] = { | |
| "url": current_url, | |
| "title": fetch_result["title"], | |
| "depth": depth, | |
| "links_found": 0, | |
| } | |
| if extract_content: | |
| page_info["content"] = fetch_result["content"] | |
| # Extract links if max depth not reached | |
| if depth < max_depth: | |
| links_result = self.extract_links( | |
| selector=link_selector, | |
| base_url_filter=url_filter, | |
| max_links=50, | |
| ) | |
| if links_result["success"]: | |
| page_info["links_found"] = links_result["count"] | |
| for link in links_result["links"]: | |
| link_url = link["url"].split("#")[0].rstrip("/") | |
| if link_url not in visited and len(queue) < max_pages * 2: | |
| queue.append((link_url, depth + 1)) | |
| pages.append(page_info) | |
| time.sleep(0.5) # Polite pause between requests | |
| result["pages"] = pages | |
| result["total_pages"] = len(pages) | |
| result["success"] = True | |
| except Exception as e: # noqa: BLE001 | |
| result["error"] = f"Crawl error ({type(e).__name__}): {e}" | |
| result["pages"] = pages | |
| result["total_pages"] = len(pages) | |
| return result | |
| def close(self) -> None: | |
| """Close the browser and release resources.""" | |
| if self._driver is not None: | |
| with contextlib.suppress(Exception): | |
| self._driver.quit() | |
| self._driver = None | |
| def __del__(self) -> None: | |
| """Close the browser when the object is deleted.""" | |
| self.close() | |
| def __enter__(self) -> Self: | |
| """Context manager support.""" | |
| return self | |
| def __exit__(self, *_args: object) -> None: | |
| """Close the browser when exiting the context manager.""" | |
| self.close() | |
| # ============================================================ | |
| # Search Providers | |
| # ============================================================ | |
class SearchProvider(ABC):
    """
    Abstract base class for search providers.

    Concrete providers must implement ``search``; the base class itself
    cannot be instantiated.
    """

    @abstractmethod
    def search(self, query: str, max_results: int = 5) -> list[dict[str, str]]:
        """
        Perform a search and return results.

        Args:
            query: Search query.
            max_results: Maximum number of results.

        Returns:
            List of dicts with keys 'title', 'url', 'snippet'.
        """
        ...
class DuckDuckGoProvider(SearchProvider):
    """
    Search provider via the DuckDuckGo Instant Answers API.

    Does not require an API key. Uses the public DuckDuckGo API.
    Limitation: returns only instant answers and related topics,
    not full search results (this is an API limitation).
    Serper or TavilyProvider is recommended for full search results.
    """

    def __init__(self, timeout: int = 10):
        """
        Create DuckDuckGoProvider.

        Args:
            timeout: Request timeout in seconds.
        """
        self._timeout = timeout
        self._base_url = "https://api.duckduckgo.com/"

    @staticmethod
    def _iter_topics(items: Any) -> Any:
        """Yield topic dicts, flattening category groups of the form {"Name", "Topics": [...]}."""
        if not isinstance(items, list):
            return
        for item in items:
            if not isinstance(item, dict):
                continue
            nested = item.get("Topics")
            if isinstance(nested, list):
                yield from (topic for topic in nested if isinstance(topic, dict))
            else:
                yield item

    @classmethod
    def _parse_results(cls, data: Any, max_results: int) -> list[dict[str, str]]:
        """Convert a decoded API response into at most ``max_results`` result dicts."""
        results: list[dict[str, str]] = []
        if not isinstance(data, dict) or max_results <= 0:
            return results

        # Abstract (main answer) comes first
        if data.get("Abstract"):
            results.append(
                {
                    "title": data.get("Heading") or "DuckDuckGo Answer",
                    "url": data.get("AbstractURL") or "",
                    "snippet": data.get("Abstract") or "",
                }
            )

        # Related topics, then direct results (rare for regular queries).
        # Entries are filtered before counting, so unusable entries do not
        # consume the result budget.
        for key in ("RelatedTopics", "Results"):
            for item in cls._iter_topics(data.get(key)):
                if len(results) >= max_results:
                    return results
                text = item.get("Text")
                if not isinstance(text, str) or not text:
                    continue
                results.append(
                    {
                        "title": text[:80],
                        "url": item.get("FirstURL") or "",
                        "snippet": text,
                    }
                )

        return results[:max_results]

    def search(self, query: str, max_results: int = 5) -> list[dict[str, str]]:
        """
        Perform a search via DuckDuckGo.

        Network and decoding failures are logged and yield an empty list; this
        method does not raise for them.

        Args:
            query: Search query.
            max_results: Maximum number of results.

        Returns:
            List of search results (dicts with 'title', 'url', 'snippet').
        """
        if max_results <= 0:
            return []

        # Build URL for API request
        params = urllib.parse.urlencode(
            {
                "q": query,
                "format": "json",
                "no_html": "1",
                "skip_disambig": "1",
            }
        )
        request = urllib.request.Request(
            f"{self._base_url}?{params}",
            headers={"User-Agent": "RustworkxFramework/1.0"},
        )

        try:
            with urllib.request.urlopen(request, timeout=self._timeout) as response:
                data = json.loads(response.read().decode("utf-8"))
        except (urllib.error.URLError, ValueError, OSError) as exc:
            # Best-effort provider: report the failure instead of hiding it
            logging.getLogger(__name__).warning("DuckDuckGo search failed for %r: %s", query, exc)
            return []

        return self._parse_results(data, max_results)
class SerperProvider(SearchProvider):
    """
    Search provider via the Serper API (Google Search).
    Requires an API key from https://serper.dev/
    Returns full Google Search results. When the response carries an answer
    box, it is placed first in the result list.
    """

    def __init__(self, api_key: str, timeout: int = 10):
        """
        Create SerperProvider.
        Args:
            api_key: Serper API key.
            timeout: Request timeout in seconds.
        """
        self._api_key = api_key
        self._timeout = timeout
        self._base_url = "https://google.serper.dev/search"

    def search(self, query: str, max_results: int = 5) -> list[dict[str, str]]:
        """
        Perform a search via Serper (Google).
        Args:
            query: Search query.
            max_results: Maximum number of results.
        Returns:
            List of dicts with "title", "url" and "snippet" keys: the answer
            box (if any) followed by organic results, at most ``max_results``
            entries in total. Empty when ``max_results`` is not positive, the
            request fails, or the response is not a JSON object.
        """
        if max_results <= 0:
            return []

        try:
            # Build request
            payload = json.dumps({"q": query, "num": max_results}).encode("utf-8")
            request = urllib.request.Request(
                self._base_url,
                data=payload,
                headers={
                    "X-API-KEY": self._api_key,
                    "Content-Type": "application/json",
                },
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=self._timeout) as response:
                data = json.loads(response.read().decode("utf-8"))
        except (urllib.error.URLError, ValueError, KeyError, OSError) as exc:
            # Best effort: report "no results" but do not hide the failure.
            # The API key is deliberately kept out of the log message.
            logger.warning("Serper search failed for %r: %s", query, exc)
            return []

        if not isinstance(data, dict):
            logger.warning("Serper returned an unexpected payload for %r", query)
            return []

        results: list[dict[str, str]] = []

        # The answer box goes first unconditionally. Previously it was only
        # added when the organic results had not already filled the quota,
        # which in practice dropped it.
        answer = data.get("answerBox")
        if answer and isinstance(answer, dict):
            results.append(
                {
                    "title": answer.get("title", "Featured Answer"),
                    "url": answer.get("link", ""),
                    "snippet": answer.get("snippet", answer.get("answer", "")),
                }
            )

        # Organic results fill the remaining slots.
        results.extend(
            {
                "title": item.get("title", ""),
                "url": item.get("link", ""),
                "snippet": item.get("snippet", ""),
            }
            for item in (data.get("organic") or [])[:max_results]
            if isinstance(item, dict)
        )

        return results[:max_results]
class TavilyProvider(SearchProvider):
    """
    Search provider via the Tavily API (as in LangGraph).
    Requires an API key from https://tavily.com/
    Returns results with the option to get full page content.
    """

    def __init__(
        self,
        api_key: str,
        timeout: int = 30,
        *,
        include_answer: bool = True,
        search_depth: str = "basic",  # "basic" or "advanced"
    ):
        """
        Create TavilyProvider.
        Args:
            api_key: Tavily API key.
            timeout: Request timeout in seconds.
            include_answer: Include the AI-generated answer.
            search_depth: Search depth ("basic" or "advanced").
        """
        self._api_key = api_key
        self._timeout = timeout
        self._include_answer = include_answer
        self._search_depth = search_depth
        self._base_url = "https://api.tavily.com/search"

    def search(self, query: str, max_results: int = 5) -> list[dict[str, str]]:
        """
        Perform a search via Tavily.
        Args:
            query: Search query.
            max_results: Maximum number of results.
        Returns:
            List of dicts with "title", "url" and "snippet" keys: the AI
            answer (if returned) followed by regular results, at most
            ``max_results`` entries in total. Empty when ``max_results`` is
            not positive, the request fails, or the response is not a JSON
            object.
        """
        if max_results <= 0:
            return []

        try:
            # NOTE(review): the key travels in the JSON body; Tavily may prefer
            # an Authorization header nowadays - confirm against current docs.
            payload = json.dumps(
                {
                    "api_key": self._api_key,
                    "query": query,
                    "max_results": max_results,
                    "include_answer": self._include_answer,
                    "search_depth": self._search_depth,
                }
            ).encode("utf-8")
            request = urllib.request.Request(
                self._base_url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=self._timeout) as response:
                data = json.loads(response.read().decode("utf-8"))
        except (urllib.error.URLError, ValueError, KeyError, OSError) as exc:
            # Best effort: report "no results" but do not hide the failure.
            # The API key is deliberately kept out of the log message.
            logger.warning("Tavily search failed for %r: %s", query, exc)
            return []

        if not isinstance(data, dict):
            logger.warning("Tavily returned an unexpected payload for %r", query)
            return []

        results: list[dict[str, str]] = []

        # Add AI answer if present
        if data.get("answer"):
            results.append(
                {
                    "title": "Tavily AI Answer",
                    "url": "",
                    "snippet": data["answer"],
                }
            )

        # Regular results follow the answer.
        results.extend(
            {
                "title": item.get("title", ""),
                "url": item.get("url", ""),
                "snippet": item.get("content", ""),
            }
            for item in (data.get("results") or [])[:max_results]
            if isinstance(item, dict)
        )

        return results[:max_results]
| # ============================================================ | |
| # WebSearchTool — main tool | |
| # ============================================================ | |
| class WebSearchTool(BaseTool): | |
| """ | |
| Tool for searching information on the internet with the ability to read pages. | |
| Allows agents to: | |
| 1. Search for information by query (action="search") | |
| 2. Read a specific URL (action="fetch") | |
| 3. Click elements on the page (action="click", requires Selenium) | |
| 4. Fill forms (action="fill", requires Selenium) | |
| 5. Extract links from the page (action="extract_links", requires Selenium) | |
| 6. Execute JavaScript (action="execute_js", requires Selenium) | |
| 7. Recursively crawl a site (action="crawl", requires Selenium) | |
| 8. Get the content of the current page (action="get_content", requires Selenium) | |
| All events are logged via the framework callback system (on_tool_start/end/error). | |
| Example: | |
| # Basic usage — search only (fast) | |
| tool = WebSearchTool() | |
| result = tool.execute(query="Python async await tutorial") | |
| # With automatic page download | |
| tool = WebSearchTool(fetch_content=True) | |
| result = tool.execute(query="Python asyncio best practices") | |
| # Reading a specific URL | |
| result = tool.execute(url="https://docs.python.org/3/library/asyncio.html") | |
| # With Selenium — full browser | |
| tool = WebSearchTool( | |
| use_selenium=True, | |
| fetch_content=True, | |
| selenium_config={"headless": True, "browser": "chrome"}, | |
| ) | |
| # Click element | |
| result = tool.execute(action="click", selector="button.submit") | |
| # Fill form | |
| result = tool.execute(action="fill", selector="input[name=q]", value="Python", submit=True) | |
| # Extract links | |
| result = tool.execute(action="extract_links", url="https://example.com") | |
| # Execute JavaScript | |
| result = tool.execute(action="execute_js", js_code="return document.title") | |
| # Recursive site crawl | |
| result = tool.execute(action="crawl", url="https://docs.python.org", max_depth=2, max_pages=5) | |
| """ | |
def __init__(
    self,
    provider: SearchProvider | None = None,
    max_results: int = 5,
    max_content_length: int = 4000,
    *,
    fetch_content: bool = False,
    timeout: int = 15,
    use_selenium: bool = False,
    selenium_config: dict[str, Any] | None = None,
    selenium_fetcher: SeleniumFetcher | None = None,
    callback_manager: Any | None = None,
):
    """
    Configure the tool.
    Args:
        provider: Search backend; a DuckDuckGoProvider is created when omitted.
        max_results: Default cap on the number of search results.
        max_content_length: Cap on the page content included per page.
        fetch_content: Whether searches download page content by default.
        timeout: Request timeout in seconds.
        use_selenium: Fetch pages through a Selenium browser.
        selenium_config: Keyword arguments for the SeleniumFetcher that is
            created when use_selenium is set and no instance is supplied.
        selenium_fetcher: Existing SeleniumFetcher to use; supplying one
            turns Selenium mode on regardless of use_selenium.
        callback_manager: CallbackManager that receives tool events.
            When None, it is looked up from the context at emit time.
    """
    # Search backend and the plain (non-browser) page fetcher.
    self._provider = provider if provider else DuckDuckGoProvider(timeout=timeout)
    self._fetcher = URLFetcher(timeout=timeout, max_content_length=500_000)

    # Plain settings.
    self._timeout = timeout
    self._max_results = max_results
    self._max_content_length = max_content_length
    self._fetch_content = fetch_content
    self._callback_manager = callback_manager

    # Selenium support: a ready-made fetcher wins over the config dict.
    browser: SeleniumFetcher | None = None
    browser_enabled = use_selenium
    if selenium_fetcher is not None:
        browser = selenium_fetcher
        browser_enabled = True
    elif use_selenium:
        browser = SeleniumFetcher(**(selenium_config or {}))
    self._use_selenium = browser_enabled
    self._selenium_fetcher = browser
| def _get_callback_manager(self) -> Any | None: | |
| """Get callback manager (from constructor or from context).""" | |
| if self._callback_manager is not None: | |
| return self._callback_manager | |
| try: | |
| from callbacks.context import get_callback_manager | |
| return get_callback_manager() | |
| except Exception: # noqa: BLE001 | |
| return None | |
| def _emit_tool_start(self, action: str, arguments: dict[str, Any] | None = None) -> None: | |
| """Send tool execution start event.""" | |
| from uuid import uuid4 | |
| cb = self._get_callback_manager() | |
| if cb is not None: | |
| with contextlib.suppress(Exception): | |
| cb.on_tool_start( | |
| uuid4(), | |
| tool_name=self.name, | |
| action=action, | |
| arguments=arguments or {}, | |
| ) | |
| def _emit_tool_end( | |
| self, | |
| action: str, | |
| *, | |
| success: bool = True, | |
| output_size: int = 0, | |
| duration_ms: float = 0.0, | |
| result_summary: str = "", | |
| ) -> None: | |
| """Send tool execution completion event.""" | |
| from uuid import uuid4 | |
| cb = self._get_callback_manager() | |
| if cb is not None: | |
| with contextlib.suppress(Exception): | |
| cb.on_tool_end( | |
| uuid4(), | |
| tool_name=self.name, | |
| action=action, | |
| success=success, | |
| output_size=output_size, | |
| duration_ms=duration_ms, | |
| result_summary=result_summary, | |
| ) | |
| def _emit_tool_error(self, action: str, error: Exception) -> None: | |
| """Send tool execution error event.""" | |
| from uuid import uuid4 | |
| cb = self._get_callback_manager() | |
| if cb is not None: | |
| with contextlib.suppress(Exception): | |
| cb.on_tool_error( | |
| uuid4(), | |
| tool_name=self.name, | |
| action=action, | |
| error_type=type(error).__name__, | |
| error_message=str(error), | |
| ) | |
| def name(self) -> str: | |
| return "web_search" | |
| def description(self) -> str: | |
| base = ( | |
| "Search the web and interact with web pages. " | |
| "Use 'query' to search for information. " | |
| "Use 'url' to read a specific web page. " | |
| "Set 'fetch_content=true' to automatically read the content of search results. " | |
| "Returns search results with titles, URLs, snippets, and optionally full page content." | |
| ) | |
| if self._use_selenium: | |
| base += ( | |
| "\n\nThis tool uses a real browser (Selenium) and supports advanced actions:\n" | |
| "- action='click': Click an element by CSS selector.\n" | |
| "- action='fill': Fill an input field and optionally submit.\n" | |
| "- action='extract_links': Extract all links from the current/specified page.\n" | |
| "- action='execute_js': Execute JavaScript code on the current page.\n" | |
| "- action='crawl': Recursively crawl a website collecting content.\n" | |
| "- action='get_content': Get text content of the current page.\n" | |
| "Use 'wait_for_selector' to wait for a specific element before extracting content." | |
| ) | |
| return base | |
| def parameters_schema(self) -> dict[str, Any]: | |
| action_enum = ["search", "fetch"] | |
| action_description = "Action to perform. Default: auto-detected from query/url." | |
| if self._use_selenium: | |
| action_enum = [ | |
| "search", | |
| "fetch", | |
| "click", | |
| "fill", | |
| "extract_links", | |
| "execute_js", | |
| "crawl", | |
| "get_content", | |
| ] | |
| action_description = ( | |
| "Browser action to perform. Default: auto-detected from query/url. " | |
| "Use 'click' to click elements, 'fill' to fill forms, " | |
| "'extract_links' to get all links, 'execute_js' to run JavaScript, " | |
| "'crawl' to recursively browse a site, 'get_content' to read current page." | |
| ) | |
| properties: dict[str, Any] = { | |
| "query": { | |
| "type": "string", | |
| "description": "Search query. Returns search results with titles, URLs, and snippets.", | |
| }, | |
| "url": { | |
| "type": "string", | |
| "description": "URL of a specific web page to read/open.", | |
| }, | |
| "fetch_content": { | |
| "type": "boolean", | |
| "description": ( | |
| "If true, automatically fetch and include full content of found pages. Default: false (faster)." | |
| ), | |
| }, | |
| "max_results": { | |
| "type": "integer", | |
| "description": f"Maximum number of search results. Default: {self._max_results}", | |
| }, | |
| "action": { | |
| "type": "string", | |
| "enum": action_enum, | |
| "description": action_description, | |
| }, | |
| } | |
| if self._use_selenium: | |
| properties.update( | |
| { | |
| "selector": { | |
| "type": "string", | |
| "description": ( | |
| "CSS selector for click/fill actions. " | |
| "Examples: 'button.submit', 'input[name=q]', '#login-btn', 'a.nav-link'." | |
| ), | |
| }, | |
| "value": { | |
| "type": "string", | |
| "description": "Value to type into an input field (for action='fill').", | |
| }, | |
| "submit": { | |
| "type": "boolean", | |
| "description": "Press Enter after filling input (for action='fill'). Default: false.", | |
| }, | |
| "js_code": { | |
| "type": "string", | |
| "description": ( | |
| "JavaScript code to execute on the page (for action='execute_js'). " | |
| "Use 'return ...' to get a value back." | |
| ), | |
| }, | |
| "wait_for_selector": { | |
| "type": "string", | |
| "description": ( | |
| "CSS selector to wait for before extracting content. " | |
| "Useful for SPA pages. Example: '.main-content', '#article-body'." | |
| ), | |
| }, | |
| "max_depth": { | |
| "type": "integer", | |
| "description": "Maximum crawl depth (for action='crawl'). Default: 2.", | |
| }, | |
| "max_pages": { | |
| "type": "integer", | |
| "description": "Maximum pages to crawl (for action='crawl'). Default: 10.", | |
| }, | |
| "url_filter": { | |
| "type": "string", | |
| "description": ( | |
| "URL prefix filter for crawl/extract_links. " | |
| "Only links starting with this prefix are followed. " | |
| "Default: same domain as start URL." | |
| ), | |
| }, | |
| } | |
| ) | |
| return { | |
| "type": "object", | |
| "properties": properties, | |
| "required": [], | |
| } | |
| def _format_search_results( | |
| self, | |
| results: list[dict[str, str]], | |
| *, | |
| with_content: bool = False, | |
| ) -> str: | |
| """Format search results for output.""" | |
| if not results: | |
| return "No results found for the query." | |
| lines = [f"Found {len(results)} result(s):\n"] | |
| for i, result in enumerate(results, 1): | |
| title = result.get("title", "Untitled") | |
| url = result.get("url", "") | |
| snippet = result.get("snippet", "") | |
| content = result.get("content", "") | |
| lines.append(f"[{i}] {title}") | |
| if url: | |
| lines.append(f" URL: {url}") | |
| if content and with_content: | |
| truncated = content[: self._max_content_length] | |
| if len(content) > self._max_content_length: | |
| truncated += "\n ... (content truncated)" | |
| lines.append(f"\n --- Page Content ---\n {truncated}\n") | |
| elif snippet: | |
| lines.append(f" {snippet}") | |
| lines.append("") | |
| return "\n".join(lines).strip() | |
def _get_active_fetcher(self) -> URLFetcher | SeleniumFetcher:
    """Pick the fetcher to use: the Selenium one when enabled and present, else the plain one."""
    browser = self._selenium_fetcher
    selenium_ready = self._use_selenium and browser is not None
    return browser if selenium_ready else self._fetcher
def _fetch_url(
    self,
    url: str,
    wait_for_selector: str | None = None,
) -> ToolResult:
    """
    Fetch one URL and wrap the page text in a ToolResult.
    Args:
        url: Address of the page to download.
        wait_for_selector: CSS selector to wait for; only used when the
            active fetcher is a SeleniumFetcher.
    Returns:
        Successful ToolResult with title, URL and content, or a failed one
        carrying the error text.
    """
    import time as _time

    started_at = _time.monotonic()
    self._emit_tool_start("fetch", {"url": url, "wait_for_selector": wait_for_selector})
    active = self._get_active_fetcher()
    try:
        wait_in_browser = isinstance(active, SeleniumFetcher) and wait_for_selector
        page = (
            active.fetch_with_wait(url, wait_for_selector=wait_for_selector)
            if wait_in_browser
            else active.fetch(url)
        )
        duration = (_time.monotonic() - started_at) * 1000

        if page["success"]:
            # Header lines first, then a blank line and the content itself.
            header = [f"Title: {page['title']}"] if page["title"] else []
            header.append(f"URL: {url}")
            if self._use_selenium:
                header.append("(Rendered with Selenium browser)")
            text = "\n".join([*header, "", "--- Page Content ---", page["content"]])
            self._emit_tool_end(
                "fetch",
                success=True,
                output_size=len(text),
                duration_ms=duration,
                result_summary=f"Fetched {url} ({len(page['content'])} chars)",
            )
            return ToolResult(tool_name=self.name, success=True, output=text)

        self._emit_tool_end(
            "fetch",
            success=False,
            duration_ms=duration,
            result_summary=f"Failed: {page['error']}",
        )
        return ToolResult(
            tool_name=self.name,
            success=False,
            error=f"Failed to fetch URL: {page['error']}",
        )
    except Exception as e:  # noqa: BLE001
        self._emit_tool_error("fetch", e)
        return ToolResult(tool_name=self.name, success=False, error=str(e))
| def _fetch_page_content(self, page_url: str) -> dict[str, Any] | None: | |
| """Download the content of a single page via the active fetcher.""" | |
| fetcher = self._get_active_fetcher() | |
| fetched = fetcher.fetch(page_url) | |
| if fetched["success"]: | |
| return fetched | |
| return None | |
| # ================================================================ | |
| # Browser Actions (require Selenium) | |
| # ================================================================ | |
def _require_selenium(self, action: str) -> SeleniumFetcher:
    """
    Return the Selenium fetcher needed by a browser action.
    Raises:
        RuntimeError: If the tool has no Selenium fetcher.
    """
    browser = self._selenium_fetcher
    if browser is not None:
        return browser
    raise RuntimeError(
        f"Action '{action}' requires Selenium. "
        "Initialize WebSearchTool with use_selenium=True or provide selenium_fetcher."
    )
def _execute_click(self, selector: str, wait_timeout: int | None = None) -> ToolResult:
    """
    Click the element matching a CSS selector in the browser.
    Args:
        selector: CSS selector of the element to click.
        wait_timeout: Passed through to the Selenium fetcher.
    Returns:
        ToolResult describing the clicked element and the resulting page,
        or a failed ToolResult with the error text.
    """
    import time as _time

    t0 = _time.monotonic()
    self._emit_tool_start("click", {"selector": selector})
    try:
        browser = self._require_selenium("click")
        outcome = browser.click_element(selector, wait_timeout=wait_timeout)
        took_ms = (_time.monotonic() - t0) * 1000

        if not outcome["success"]:
            failure = f"Click failed: {outcome['error']}"
            self._emit_tool_end(
                "click",
                success=False,
                duration_ms=took_ms,
                result_summary=failure,
            )
            return ToolResult(tool_name=self.name, success=False, error=failure)

        report = [f"Clicked element: '{selector}'"]
        if outcome["clicked_text"]:
            # Long element texts are cut to keep the output compact.
            report.append(f"Element text: {outcome['clicked_text'][:200]}")
        report.append(f"Current URL: {outcome['url']}")
        report.append(f"Page title: {outcome['title']}")
        output = "\n".join(report)

        self._emit_tool_end(
            "click",
            success=True,
            output_size=len(output),
            duration_ms=took_ms,
            result_summary=f"Clicked '{selector}' -> {outcome['url']}",
        )
        return ToolResult(tool_name=self.name, success=True, output=output)
    except Exception as e:  # noqa: BLE001
        self._emit_tool_error("click", e)
        return ToolResult(tool_name=self.name, success=False, error=str(e))
def _execute_fill(
    self,
    selector: str,
    value: str,
    *,
    submit: bool = False,
    wait_timeout: int | None = None,
) -> ToolResult:
    """
    Type a value into the input matching a CSS selector.
    Args:
        selector: CSS selector of the input element.
        value: Text to enter.
        submit: Passed through to the Selenium fetcher as its submit flag.
        wait_timeout: Passed through to the Selenium fetcher.
    Returns:
        ToolResult describing what was filled and the resulting page, or a
        failed ToolResult with the error text.
    """
    import time as _time

    t0 = _time.monotonic()
    self._emit_tool_start("fill", {"selector": selector, "value": value, "submit": submit})
    try:
        browser = self._require_selenium("fill")
        outcome = browser.fill_input(selector, value, submit=submit, wait_timeout=wait_timeout)
        took_ms = (_time.monotonic() - t0) * 1000

        if not outcome["success"]:
            failure = f"Fill failed: {outcome['error']}"
            self._emit_tool_end(
                "fill",
                success=False,
                duration_ms=took_ms,
                result_summary=failure,
            )
            return ToolResult(tool_name=self.name, success=False, error=failure)

        output = "\n".join(
            (
                f"Filled '{selector}' with value: '{value}'",
                f"Submitted: {submit}",
                f"Current URL: {outcome['url']}",
                f"Page title: {outcome['title']}",
            )
        )
        self._emit_tool_end(
            "fill",
            success=True,
            output_size=len(output),
            duration_ms=took_ms,
            result_summary=f"Filled '{selector}' with '{value[:50]}'",
        )
        return ToolResult(tool_name=self.name, success=True, output=output)
    except Exception as e:  # noqa: BLE001
        self._emit_tool_error("fill", e)
        return ToolResult(tool_name=self.name, success=False, error=str(e))
def _execute_extract_links(
    self,
    url: str | None = None,
    *,
    selector: str = "a[href]",
    url_filter: str | None = None,
    max_links: int = 50,
) -> ToolResult:
    """
    Extract links from the page.
    Args:
        url: Page to open first; when omitted, links are taken from the page
            that is currently open in the browser.
        selector: CSS selector used to find link elements.
        url_filter: Passed to the fetcher as its base URL filter.
        max_links: Maximum number of links to request from the fetcher.
    Returns:
        ToolResult with a numbered list of links, or a failed ToolResult
        with the error text.
    """
    import time as _time

    start = _time.monotonic()
    self._emit_tool_start("extract_links", {"url": url, "selector": selector})
    try:
        fetcher = self._require_selenium("extract_links")

        # If URL is specified - open the page first
        if url:
            fetch_result = fetcher.fetch(url)
            if not fetch_result["success"]:
                # Report the elapsed time here as well, like every other
                # failure path; it used to be left at the 0.0 default.
                self._emit_tool_end(
                    "extract_links",
                    success=False,
                    duration_ms=(_time.monotonic() - start) * 1000,
                    result_summary=f"Failed to open {url}",
                )
                return ToolResult(
                    tool_name=self.name,
                    success=False,
                    error=f"Failed to open URL: {fetch_result['error']}",
                )

        result = fetcher.extract_links(
            selector=selector,
            base_url_filter=url_filter,
            max_links=max_links,
        )
        elapsed_ms = (_time.monotonic() - start) * 1000

        if not result["success"]:
            self._emit_tool_end(
                "extract_links",
                success=False,
                duration_ms=elapsed_ms,
                result_summary=f"Extract failed: {result['error']}",
            )
            return ToolResult(
                tool_name=self.name,
                success=False,
                error=f"Extract links failed: {result['error']}",
            )

        lines = [f"Found {result['count']} link(s) on {result['url']}:\n"]
        for i, link in enumerate(result["links"], 1):
            text = link.get("text", "").strip() or "(no text)"
            lines.append(f"[{i}] {text}")
            lines.append(f" URL: {link['url']}")
            if link.get("title"):
                lines.append(f" Title: {link['title']}")
            # Blank line between entries; the trailing one is stripped below.
            lines.append("")
        output = "\n".join(lines).strip()

        self._emit_tool_end(
            "extract_links",
            success=True,
            output_size=len(output),
            duration_ms=elapsed_ms,
            result_summary=f"Extracted {result['count']} links from {result['url']}",
        )
        return ToolResult(tool_name=self.name, success=True, output=output)
    except Exception as e:  # noqa: BLE001
        self._emit_tool_error("extract_links", e)
        return ToolResult(tool_name=self.name, success=False, error=str(e))
def _execute_js(self, js_code: str) -> ToolResult:
    """
    Run JavaScript on the page currently open in the browser.
    Args:
        js_code: Script source to execute.
    Returns:
        ToolResult with the page URL and the script's return value (or a
        note that there was none), or a failed ToolResult with the error.
    """
    import time as _time

    t0 = _time.monotonic()
    # Only the first 200 characters of the script go into the start event.
    self._emit_tool_start("execute_js", {"js_code": js_code[:200]})
    try:
        browser = self._require_selenium("execute_js")
        outcome = browser.execute_js(js_code)
        took_ms = (_time.monotonic() - t0) * 1000

        if not outcome["success"]:
            self._emit_tool_end(
                "execute_js",
                success=False,
                duration_ms=took_ms,
                result_summary=f"JS error: {outcome['error']}",
            )
            return ToolResult(
                tool_name=self.name,
                success=False,
                error=f"JavaScript execution failed: {outcome['error']}",
            )

        returned = outcome["return_value"]
        value_line = "(no return value)" if returned is None else f"Return value: {returned}"
        output = "\n".join([f"JavaScript executed on: {outcome['url']}", value_line])

        self._emit_tool_end(
            "execute_js",
            success=True,
            output_size=len(output),
            duration_ms=took_ms,
            result_summary="JS executed successfully",
        )
        return ToolResult(tool_name=self.name, success=True, output=output)
    except Exception as e:  # noqa: BLE001
        self._emit_tool_error("execute_js", e)
        return ToolResult(tool_name=self.name, success=False, error=str(e))
def _execute_crawl(
    self,
    start_url: str,
    *,
    max_pages: int = 10,
    max_depth: int = 2,
    url_filter: str | None = None,
) -> ToolResult:
    """
    Crawl a site through the browser and list the visited pages.
    Args:
        start_url: Page the crawl starts from.
        max_pages: Passed through to the fetcher's crawl.
        max_depth: Passed through to the fetcher's crawl.
        url_filter: Passed through to the fetcher's crawl.
    Returns:
        ToolResult listing every crawled page with its (possibly truncated)
        content. The crawl only counts as failed when the fetcher reports
        failure and returned no pages at all; a reported error alongside
        pages is appended to the output as a warning.
    """
    import time as _time

    t0 = _time.monotonic()
    crawl_args = {
        "url": start_url,
        "max_pages": max_pages,
        "max_depth": max_depth,
        "url_filter": url_filter,
    }
    self._emit_tool_start("crawl", crawl_args)
    try:
        browser = self._require_selenium("crawl")
        summary = browser.crawl(
            start_url,
            max_pages=max_pages,
            max_depth=max_depth,
            url_filter=url_filter,
            extract_content=True,
        )
        took_ms = (_time.monotonic() - t0) * 1000

        if not summary["success"] and not summary["pages"]:
            failure = f"Crawl failed: {summary['error']}"
            self._emit_tool_end(
                "crawl",
                success=False,
                duration_ms=took_ms,
                result_summary=failure,
            )
            return ToolResult(tool_name=self.name, success=False, error=failure)

        report = [f"Crawled {summary['total_pages']} page(s) starting from {start_url}:\n"]
        for index, page in enumerate(summary["pages"], 1):
            report.extend(
                (
                    f"[{index}] {page.get('title', 'Untitled')}",
                    f" URL: {page['url']}",
                    f" Depth: {page['depth']}, Links found: {page.get('links_found', 0)}",
                )
            )
            body = page.get("content", "")
            if body:
                clipped = body[: self._max_content_length]
                if len(body) > self._max_content_length:
                    clipped += "\n ... (content truncated)"
                report.append(f"\n --- Page Content ---\n {clipped}\n")
            # Blank line between pages.
            report.append("")
        if summary.get("error"):
            report.append(f"Note: Crawl completed with warning: {summary['error']}")
        output = "\n".join(report).strip()

        self._emit_tool_end(
            "crawl",
            success=True,
            output_size=len(output),
            duration_ms=took_ms,
            result_summary=f"Crawled {summary['total_pages']} pages from {start_url}",
        )
        return ToolResult(tool_name=self.name, success=True, output=output)
    except Exception as e:  # noqa: BLE001
        self._emit_tool_error("crawl", e)
        return ToolResult(tool_name=self.name, success=False, error=str(e))
def _execute_get_content(self) -> ToolResult:
    """
    Return the text of the page currently open in the browser.
    Returns:
        ToolResult with title, URL and page content, or a failed ToolResult
        with the error text.
    """
    import time as _time

    t0 = _time.monotonic()
    self._emit_tool_start("get_content", {})
    try:
        browser = self._require_selenium("get_content")
        page = browser.get_page_content()
        took_ms = (_time.monotonic() - t0) * 1000

        if page["success"]:
            # Header lines first, then a blank line and the content itself.
            header = [f"Title: {page['title']}"] if page["title"] else []
            header.append(f"URL: {page['url']}")
            output = "\n".join([*header, "", "--- Page Content ---", page["content"]])
            self._emit_tool_end(
                "get_content",
                success=True,
                output_size=len(output),
                duration_ms=took_ms,
                result_summary=f"Got content from {page['url']} ({len(page['content'])} chars)",
            )
            return ToolResult(tool_name=self.name, success=True, output=output)

        self._emit_tool_end(
            "get_content",
            success=False,
            duration_ms=took_ms,
            result_summary=f"Failed: {page['error']}",
        )
        return ToolResult(
            tool_name=self.name,
            success=False,
            error=f"Get content failed: {page['error']}",
        )
    except Exception as e:  # noqa: BLE001
        self._emit_tool_error("get_content", e)
        return ToolResult(tool_name=self.name, success=False, error=str(e))
| # ================================================================ | |
| # Main execute | |
| # ================================================================ | |
def execute(  # noqa: PLR0912
    self,
    query: str = "",
    url: str = "",
    *,
    action: str = "",
    fetch_content: bool | None = None,
    max_results: int | None = None,
    wait_for_selector: str | None = None,
    selector: str = "",
    value: str = "",
    submit: bool = False,
    js_code: str = "",
    max_depth: int = 2,
    max_pages: int = 10,
    url_filter: str | None = None,
    **_kwargs: Any,
) -> ToolResult:
    """
    Entry point: run a search, read a page, or perform a browser action.
    When no action is given it is inferred from the first non-empty input,
    checked in this order: query -> "search", url -> "fetch",
    selector -> "click", js_code -> "execute_js". Any action name that is
    not handled explicitly is treated as a search.
    Args:
        query: Search query.
        url: URL to read/open.
        action: One of "search", "fetch", "click", "fill", "extract_links",
            "execute_js", "crawl", "get_content"; inferred when empty.
        fetch_content: Whether a search should download page content.
        max_results: Maximum number of search results.
        wait_for_selector: CSS selector to wait for.
        selector: CSS selector for click/fill (and the link selector for
            extract_links).
        value: Text for fill.
        submit: Press Enter after fill.
        js_code: JavaScript source for execute_js.
        max_depth: Maximum depth for crawl.
        max_pages: Maximum number of pages for crawl.
        url_filter: URL filter for crawl/extract_links.
    Returns:
        ToolResult of the chosen action, or a failed ToolResult when a
        parameter the action needs is missing.
    """

    def _reject(message: str) -> ToolResult:
        # Uniform failure result for missing or invalid input.
        return ToolResult(tool_name=self.name, success=False, error=message)

    # Infer the action from whichever input was supplied first.
    if not action:
        inference_order = (
            (query, "search"),
            (url, "fetch"),
            (selector, "click"),
            (js_code, "execute_js"),
        )
        for supplied, inferred in inference_order:
            if supplied:
                action = inferred
                break
        else:
            return _reject("No action, query, url, selector, or js_code provided.")

    # Browser actions.
    if action == "click":
        if selector:
            return self._execute_click(selector)
        return _reject("Action 'click' requires 'selector' parameter.")

    if action == "fill":
        if selector:
            return self._execute_fill(selector, value, submit=submit)
        return _reject("Action 'fill' requires 'selector' parameter.")

    if action == "extract_links":
        return self._execute_extract_links(
            url or None,
            selector=selector or "a[href]",
            url_filter=url_filter,
        )

    if action == "execute_js":
        if js_code:
            return self._execute_js(js_code)
        return _reject("Action 'execute_js' requires 'js_code' parameter.")

    if action == "crawl":
        if url:
            return self._execute_crawl(
                url,
                max_pages=max_pages,
                max_depth=max_depth,
                url_filter=url_filter,
            )
        return _reject("Action 'crawl' requires 'url' parameter.")

    if action == "get_content":
        return self._execute_get_content()

    # Reading a single page.
    if action == "fetch":
        if url:
            return self._fetch_url(url, wait_for_selector=wait_for_selector)
        return _reject("Action 'fetch' requires 'url' parameter.")

    # "search", and the fallback for any other action name.
    if query:
        return self._execute_search(
            query,
            fetch_content=fetch_content,
            max_results=max_results,
            wait_for_selector=wait_for_selector,
        )
    return _reject("No search query provided. Use 'query' to search or 'url' to read a page.")
def _execute_search(
    self,
    query: str,
    *,
    fetch_content: bool | None = None,
    max_results: int | None = None,
    wait_for_selector: str | None = None,  # noqa: ARG002
) -> ToolResult:
    """
    Run a query through the configured provider and format the hits.

    Args:
        query: Search query passed to the provider.
        fetch_content: Whether to download each result page; ``None`` means
            use the instance default (``self._fetch_content``).
        max_results: Requested number of results; ``None`` means use the
            instance default (``self._max_results``). Clamped to 1..10.
        wait_for_selector: Accepted for signature compatibility; not used here.

    Returns:
        A successful ToolResult with the formatted results (an empty result
        set is also reported as success, with a hint to rephrase), or a
        failed ToolResult describing a timeout, network, or search error.
    """
    import time as _time

    started_at = _time.monotonic()

    requested = self._max_results if max_results is None else max_results
    limit = max(1, min(requested, 10))
    include_content = self._fetch_content if fetch_content is None else fetch_content

    self._emit_tool_start(
        "search",
        {"query": query, "max_results": limit, "fetch_content": include_content},
    )

    try:
        hits = self._provider.search(query, limit)

        if hits:
            if include_content:
                # Enrich each hit in place with the downloaded page text.
                for hit in hits:
                    target = hit.get("url", "")
                    if not target:
                        continue
                    page = self._fetch_page_content(target)
                    if page is None:
                        continue
                    hit["content"] = page["content"]
                    # Only fill in a title the provider did not supply.
                    if page["title"] and not hit.get("title"):
                        hit["title"] = page["title"]
            text = self._format_search_results(hits, with_content=include_content)
            summary = f"Found {len(hits)} results for '{query}'"
        else:
            text = f"No results found for query: '{query}'. Try rephrasing your search or using different keywords."
            summary = "No results found"

        self._emit_tool_end(
            "search",
            success=True,
            output_size=len(text),
            duration_ms=(_time.monotonic() - started_at) * 1000,
            result_summary=summary,
        )
        return ToolResult(tool_name=self.name, success=True, output=text)
    except TimeoutError as exc:
        self._emit_tool_error("search", exc)
        return ToolResult(
            tool_name=self.name,
            success=False,
            error=f"Search timed out after {self._timeout} seconds",
        )
    except urllib.error.URLError as exc:
        self._emit_tool_error("search", exc)
        return ToolResult(
            tool_name=self.name,
            success=False,
            error=f"Network error: {exc.reason}",
        )
    except (ValueError, KeyError, OSError) as exc:
        self._emit_tool_error("search", exc)
        return ToolResult(
            tool_name=self.name,
            success=False,
            error=f"Search error: {exc}",
        )
def close(self) -> None:
    """Shut down the Selenium fetcher; a no-op when none is attached."""
    fetcher = self._selenium_fetcher
    if fetcher is None:
        return
    fetcher.close()
| def __del__(self) -> None: | |
| """Close resources on deletion.""" | |
| self.close() | |
| def __enter__(self) -> Self: | |
| """Context manager support.""" | |
| return self | |
| def __exit__(self, *_args: object) -> None: | |
| """Close resources when exiting the context manager.""" | |
| self.close() | |
| # ============================================================ | |
| # Factory for creating WebSearchTool from a dict config | |
| # ============================================================ | |
def _create_web_search_tool(**kwargs: Any) -> WebSearchTool:
    """
    Create a WebSearchTool from config parameters.

    All keyword arguments are forwarded to the WebSearchTool constructor,
    except for the ones consumed here:

    - ``provider``: either a ready provider object (passed through as is,
      including ``None``) or a provider name. ``"serper"`` and ``"tavily"``
      build the corresponding provider; ``"duckduckgo"`` / ``"ddg"`` build
      the DuckDuckGo provider. Names are matched case-insensitively,
      ignoring surrounding whitespace.
    - ``serper_api_key`` / ``tavily_api_key`` / ``api_key``: only consumed
      when the matching provider name is selected. The provider-specific
      key takes precedence over the generic ``api_key``.

    When a key-based provider is requested without a key, or the provider
    name is not recognised, the factory falls back to DuckDuckGo and logs
    a warning.

    Example config::

        {"name": "web_search", "use_selenium": True}
        {"name": "web_search", "provider": "serper", "fetch_content": True}

    Returns:
        The configured WebSearchTool instance.
    """
    provider = kwargs.pop("provider", None)

    if isinstance(provider, str):
        provider_name = provider.strip().lower()

        if provider_name in ("serper", "tavily"):
            # Pop BOTH spellings unconditionally. Chaining the pops with
            # `or` short-circuits and would leave a stray "api_key" in
            # kwargs, which would then be forwarded to WebSearchTool.
            specific_key = kwargs.pop(f"{provider_name}_api_key", None)
            generic_key = kwargs.pop("api_key", None)
            api_key = specific_key or generic_key

            if api_key:
                provider_cls = SerperProvider if provider_name == "serper" else TavilyProvider
                provider = provider_cls(api_key=api_key)
            else:
                logger.warning(
                    "Search provider %r requested without an API key; falling back to DuckDuckGo.",
                    provider_name,
                )
                provider = DuckDuckGoProvider()
        else:
            if provider_name not in ("duckduckgo", "ddg"):
                logger.warning(
                    "Unknown search provider %r; falling back to DuckDuckGo.",
                    provider,
                )
            provider = DuckDuckGoProvider()

    return WebSearchTool(provider=provider, **kwargs)
# Automatic factory registration on module import: importing this module
# registers the factory under the name "web_search" as a side effect.
# The import sits here, after the factory definition, rather than at the top
# of the file — hence the E402 waiver (presumably to avoid an import-order
# problem with .base; confirm before moving it).
from .base import register_tool_factory  # noqa: E402
register_tool_factory("web_search", _create_web_search_tool)