from __future__ import annotations

import re
from typing import Annotated, Dict, Literal, Tuple
from urllib.parse import urljoin, urlparse

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md

from app import _fetch_rate_limiter, _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc

TOOL_SUMMARY = (
    "Fetch a webpage and return clean Markdown, raw HTML, or a list of links, with max length and pagination via "
    "offset; if truncated, the output includes a notice with next_cursor for exact continuation. "
    "Use in combination with `Web_Search` to navigate the web."
)

ModeOption = Literal["markdown", "html", "url_scraper"]

def _http_get_enhanced(url: str, timeout: int | float = 30, *, skip_rate_limit: bool = False) -> requests.Response:
    """GET ``url`` with browser-like headers, translating common failures into
    ``RequestException``s that carry user-friendly messages."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    if not skip_rate_limit:
        _fetch_rate_limiter.acquire()
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
            stream=False,
        )
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout as exc:
        raise requests.exceptions.RequestException("Request timed out. The webpage took too long to respond.") from exc
    except requests.exceptions.ConnectionError as exc:
        raise requests.exceptions.RequestException("Connection error. Please check the URL and your internet connection.") from exc
    except requests.exceptions.HTTPError as exc:
        # HTTPError is only raised by raise_for_status(), so ``response`` is
        # guaranteed to be bound here.
        if response.status_code == 403:
            raise requests.exceptions.RequestException("Access forbidden. The website may be blocking automated requests.") from exc
        if response.status_code == 404:
            raise requests.exceptions.RequestException("Page not found. Please check the URL.") from exc
        if response.status_code == 429:
            raise requests.exceptions.RequestException("Rate limited. Please try again in a few minutes.") from exc
        raise requests.exceptions.RequestException(f"HTTP error {response.status_code}: {exc}") from exc
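

# Example (illustrative; assumes network access and an endpoint that serves HTML):
#   resp = _http_get_enhanced("https://example.com", timeout=10)
#   resp.status_code                 # -> 200
#   resp.headers.get("Content-Type") # e.g. "text/html; charset=UTF-8"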


def _normalize_whitespace(text: str) -> str:
    """Collapse runs of spaces/tabs and squeeze 3+ consecutive newlines to one blank line."""
    text = re.sub(r"[ \t\u00A0]+", " ", text)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip())
    return text.strip()


def _truncate(text: str, max_chars: int | None) -> Tuple[str, bool]:
    """Hard-truncate ``text`` to ``max_chars``; returns ``(text, was_truncated)``."""
    if max_chars is None or max_chars <= 0 or len(text) <= max_chars:
        return text, False
    return text[:max_chars].rstrip() + " …", True


def _shorten(text: str, limit: int) -> str:
    """Trim ``text`` to ``limit`` characters, appending an ellipsis if cut."""
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"


def _domain_of(url: str) -> str:
    """Best-effort extraction of the network location from a URL."""
    try:
        return urlparse(url).netloc or ""
    except Exception:
        return ""


def _normalize_mode(mode: str | None) -> ModeOption:
    """Convert UI-supplied labels into canonical mode values."""
    if not mode:
        return "markdown"
    normalized = mode.strip().lower()
    normalized = normalized.replace("-", "_").replace(" ", "_")
    if normalized in {"markdown", "markdown_mode", "md"}:
        return "markdown"
    if normalized in {"html", "html_mode"}:
        return "html"
    if normalized in {"url_scraper", "urlscraper", "url_mode", "scraper", "links", "link_mode"}:
        return "url_scraper"
    return "markdown"
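

# For example, the Gradio radio labels used by build_interface() map as follows:
#   _normalize_mode("Markdown Mode") -> "markdown"
#   _normalize_mode("URL Scraper")   -> "url_scraper"
#   _normalize_mode(None)            -> "markdown"  (default fallback)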


def _extract_links_from_soup(soup: BeautifulSoup, base_url: str) -> str:
    """Return a Markdown bullet list of the page's hyperlinks."""
    links = []
    for link in soup.find_all("a", href=True):
        href = link.get("href")
        text = link.get_text(strip=True)
        # Skip anchors without visible text and non-navigational targets
        # (fragment-only and javascript: pseudo-links).
        if not text or href.startswith(("#", "javascript:")):
            continue
        if href.startswith("http"):
            full_url = href
        elif href.startswith("//"):
            # Protocol-relative URL: assume https.
            full_url = "https:" + href
        else:
            # Root-relative or relative path: resolve against the page URL.
            full_url = urljoin(base_url, href)
        links.append(f"- [{text}]({full_url})")
    if not links:
        return "No links found on this page."
    title = soup.find("title")
    title_text = title.get_text(strip=True) if title else "Links from webpage"
    return f"# {title_text}\n\n" + "\n".join(links)
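

# Example output shape (illustrative):
#   # Example Domain
#
#   - [More information...](https://www.iana.org/domains/example)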


def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str, strip_selectors: str = "") -> str:
    """Convert a parsed page to Markdown, dropping chrome and caller-specified elements."""
    # Remove any caller-specified elements first.
    if strip_selectors:
        selectors = [s.strip() for s in strip_selectors.split(",") if s.strip()]
        for selector in selectors:
            try:
                for element in full_soup.select(selector):
                    element.decompose()
            except Exception:
                # Ignore invalid selectors rather than failing the whole fetch.
                continue
    # Always drop non-content chrome.
    for element in full_soup.select("script, style, nav, footer, header, aside"):
        element.decompose()
    main = (
        full_soup.find("main")
        or full_soup.find("article")
        or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I))
        or full_soup.find("body")
    )
    if not main:
        return "No main content found on the webpage."
    markdown_text = md(str(main), heading_style="ATX")
    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)
    markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text)  # drop empty links
    markdown_text = re.sub(r"[ \t]+", " ", markdown_text)
    markdown_text = markdown_text.strip()
    title = full_soup.find("title")
    if title and title.get_text(strip=True):
        markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}"
    return markdown_text or "No content could be extracted."
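

# Example (illustrative; exact whitespace depends on the markdownify version):
#   soup = BeautifulSoup("<html><title>Hi</title><body><main><p>Hello</p></main></body></html>", "lxml")
#   _fullpage_markdown_from_soup(soup, "https://example.com")
#   -> "# Hi\n\nHello"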


def _truncate_with_notice(content: str, max_chars: int, offset: int = 0) -> Tuple[str, Dict[str, object]]:
    """Truncate ``content`` to ``max_chars``, preferring paragraph or sentence
    boundaries, and append a notice carrying the continuation cursor.

    ``offset`` is the absolute position of ``content[0]`` within the full
    document, so ``next_cursor`` is always absolute and can be fed straight
    back as the next call's offset.
    """
    total_chars = offset + len(content)
    if len(content) <= max_chars:
        return content, {
            "truncated": False,
            "returned_chars": len(content),
            "total_chars_estimate": total_chars,
            "next_cursor": None,
        }
    truncated = content[:max_chars]
    # Prefer to cut at a paragraph break if one falls in the last 30% of the slice...
    last_paragraph = truncated.rfind("\n\n")
    if last_paragraph > max_chars * 0.7:
        truncated = truncated[:last_paragraph]
        cursor_pos = last_paragraph
    # ...otherwise at a sentence boundary near the end.
    elif "." in truncated[-100:]:
        last_period = truncated.rfind(".")
        if last_period > max_chars * 0.8:
            truncated = truncated[: last_period + 1]
            cursor_pos = last_period + 1
        else:
            cursor_pos = len(truncated)
    else:
        cursor_pos = len(truncated)
    metadata = {
        "truncated": True,
        "returned_chars": len(truncated),
        "total_chars_estimate": total_chars,
        "next_cursor": offset + cursor_pos,
    }
    truncated = truncated.rstrip()
    truncation_notice = (
        "\n\n---\n"
        f"**Content Truncated:** Showing {metadata['returned_chars']:,} of {metadata['total_chars_estimate']:,} characters "
        f"({metadata['returned_chars'] / metadata['total_chars_estimate'] * 100:.1f}%)\n"
        f"**Next cursor:** {metadata['next_cursor']} (use this value as the offset parameter to continue)\n"
        "---"
    )
    return truncated + truncation_notice, metadata
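

# Worked example (illustrative numbers): for a 10,000-char page fetched with
# max_chars=3000 and offset=3000, this function sees the 7,000-char tail. If it
# cuts at a paragraph break at relative position 2,850, the notice reports
# next_cursor=5850 (3000 + 2850), which resumes exactly where the slice ended.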


@autodoc(summary=TOOL_SUMMARY)
def Web_Fetch(
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    max_chars: Annotated[int, "Maximum characters to return (0 = no limit, full page content)."] = 3000,
    offset: Annotated[int, "Character offset to start from (for pagination, use next_cursor from previous call)."] = 0,
    strip_selectors: Annotated[str, "CSS selectors to remove (comma-separated, e.g., '.header, .footer, nav')."] = "",
    mode: Annotated[
        str,
        "Output mode: 'markdown' (default, clean content), 'html' (raw response), or 'url_scraper' (links list).",
    ] = "markdown",
) -> str:
    canonical_mode = _normalize_mode(mode)
    _log_call_start(
        "Web_Fetch",
        url=url,
        max_chars=max_chars,
        strip_selectors=strip_selectors,
        mode=canonical_mode,
        offset=offset,
    )
    if not url or not url.strip():
        result = "Please enter a valid URL."
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    try:
        # _http_get_enhanced already calls raise_for_status(), so any HTTP
        # error surfaces here as a RequestException with a friendly message.
        resp = _http_get_enhanced(url)
    except requests.exceptions.RequestException as exc:
        result = f"An error occurred: {exc}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        result = f"Unsupported content type for extraction: {ctype or 'unknown'}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    resp.encoding = resp.encoding or resp.apparent_encoding
    html = resp.text
    full_soup = BeautifulSoup(html, "lxml")
    # Raw HTML mode bypasses pagination and length limits entirely.
    if canonical_mode == "html":
        _log_call_end("Web_Fetch", f"chars={len(html)}, mode={canonical_mode}, offset=0 (ignored)")
        return html
    if canonical_mode == "url_scraper":
        full_result = _extract_links_from_soup(full_soup, final_url)
    else:  # "markdown"
        full_result = _fullpage_markdown_from_soup(full_soup, final_url, strip_selectors)

    if offset > 0:
        if offset >= len(full_result):
            result = (
                f"Offset {offset} exceeds content length ({len(full_result)} characters). "
                f"Content ends at position {len(full_result)}."
            )
            _log_call_end("Web_Fetch", _truncate_for_log(result))
            return result
        result = full_result[offset:]
    else:
        result = full_result

    if max_chars > 0 and len(result) > max_chars:
        # Pass the absolute offset so the truncation notice reports a cursor
        # relative to the full document rather than to this slice.
        result, _metadata = _truncate_with_notice(result, max_chars, offset)

    _log_call_end("Web_Fetch", f"chars={len(result)}, mode={canonical_mode}, offset={offset}")
    return result
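

# Pagination sketch (illustrative URL): fetch the first 3,000 characters, then
# continue from the cursor reported in the truncation notice.
#   page_1 = Web_Fetch("https://example.com/article", max_chars=3000)
#   # ...read "Next cursor: N" from page_1's truncation notice...
#   page_2 = Web_Fetch("https://example.com/article", max_chars=3000, offset=N)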


def build_interface() -> gr.Interface:
    return gr.Interface(
        fn=Web_Fetch,
        inputs=[
            gr.Textbox(label="URL", placeholder="https://example.com/article", max_lines=1),
            gr.Slider(
                minimum=0,
                maximum=10000,
                value=3000,
                step=100,
                label="Max Characters",
                info="0 = no limit (full page), default 3000",
            ),
            gr.Slider(
                minimum=0,
                maximum=100000,
                value=0,
                step=100,
                label="Offset",
                info="Character offset to start from (use next_cursor from previous call for pagination)",
            ),
            gr.Textbox(
                label="Strip Selectors",
                placeholder=".header, .footer, nav, .sidebar",
                value="",
                max_lines=1,
                info="CSS selectors to remove (comma-separated)",
            ),
            gr.Radio(
                label="Mode",
                choices=["Markdown Mode", "HTML Mode", "URL Scraper"],
                value="Markdown Mode",
                info="Markdown cleans content, HTML returns raw response, URL Scraper lists links.",
            ),
        ],
        outputs=gr.Markdown(label="Extracted Content"),
        title="Web Fetch",
        description=(
            "<div style=\"text-align:center\">Convert any webpage to Markdown, inspect the raw HTML response, or "
            "extract all links. Supports custom element removal, length limits, and pagination with offset.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )


__all__ = [
    "Web_Fetch",
    "build_interface",
    "_http_get_enhanced",
    "_fullpage_markdown_from_soup",
]
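

# Local smoke test (illustrative; because of the relative ._docstrings import,
# run this module as part of its package, substituting the real package and
# module names for the placeholders below):
#
#   python -c "from yourpkg.web_fetch import build_interface; build_interface().launch()"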