from __future__ import annotations

import re
from typing import Annotated, Dict, Literal, Tuple
from urllib.parse import urljoin, urlparse

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md

from app import _fetch_rate_limiter, _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc


TOOL_SUMMARY = (
    "Fetch a webpage and return clean Markdown, raw HTML, or a list of links, with max length and pagination via "
    "offset; if truncated, the output includes a notice with next_cursor for exact continuation."
)

ModeOption = Literal["markdown", "html", "url_scraper"]


def _http_get_enhanced(url: str, timeout: int | float = 30, *, skip_rate_limit: bool = False) -> requests.Response:
    """GET *url* with browser-like headers, shared rate limiting, and friendlier error messages."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    if not skip_rate_limit:
        _fetch_rate_limiter.acquire()
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
            stream=False,
        )
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout as exc:
        raise requests.exceptions.RequestException("Request timed out. The webpage took too long to respond.") from exc
    except requests.exceptions.ConnectionError as exc:
        raise requests.exceptions.RequestException("Connection error. Please check the URL and your internet connection.") from exc
    except requests.exceptions.HTTPError as exc:
        status = exc.response.status_code if exc.response is not None else None
        if status == 403:
            raise requests.exceptions.RequestException("Access forbidden. The website may be blocking automated requests.") from exc
        if status == 404:
            raise requests.exceptions.RequestException("Page not found. Please check the URL.") from exc
        if status == 429:
            raise requests.exceptions.RequestException("Rate limited. Please try again in a few minutes.") from exc
        raise requests.exceptions.RequestException(f"HTTP error {status}: {exc}") from exc
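
# Illustrative usage sketch (URL is a placeholder):
#
#     resp = _http_get_enhanced("https://example.com/article", timeout=10)
#     print(resp.status_code, resp.headers.get("Content-Type"))
#
# Pass skip_rate_limit=True only when the caller has already acquired a slot
# from _fetch_rate_limiter for this request.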


def _normalize_whitespace(text: str) -> str:
    text = re.sub(r"[ \t\u00A0]+", " ", text)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip())
    return text.strip()


def _truncate(text: str, max_chars: int | None) -> Tuple[str, bool]:
    if max_chars is None or max_chars <= 0 or len(text) <= max_chars:
        return text, False
    return text[:max_chars].rstrip() + " …", True


def _shorten(text: str, limit: int) -> str:
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"


def _domain_of(url: str) -> str:
    try:
        return urlparse(url).netloc or ""
    except Exception:
        return ""


def _normalize_mode(mode: str | None) -> ModeOption:
    """Convert UI-supplied labels into canonical mode values."""
    if not mode:
        return "markdown"
    normalized = mode.strip().lower().replace("-", "_").replace(" ", "_")
    if normalized in {"markdown", "markdown_mode", "md"}:
        return "markdown"
    if normalized in {"html", "html_mode"}:
        return "html"
    if normalized in {"url_scraper", "urlscraper", "url_mode", "scraper", "links", "link_mode"}:
        return "url_scraper"
    return "markdown"


def _extract_links_from_soup(soup: BeautifulSoup, base_url: str) -> str:
    """Return a Markdown bullet list of the page's hyperlinks, resolved against *base_url*."""
    links = []
    for link in soup.find_all("a", href=True):
        href = link.get("href")
        text = link.get_text(strip=True)
        # urljoin handles absolute, scheme-relative (//...), root-relative (/...),
        # and relative hrefs uniformly, inheriting the scheme from base_url.
        full_url = urljoin(base_url, href)
        if text and href not in ("#", "javascript:void(0)"):
            links.append(f"- [{text}]({full_url})")
    if not links:
        return "No links found on this page."
    title = soup.find("title")
    title_text = title.get_text(strip=True) if title else "Links from webpage"
    return f"# {title_text}\n\n" + "\n".join(links)


def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str, strip_selectors: str = "") -> str:
    """Convert a parsed page to Markdown, preferring the main content region."""
    if strip_selectors:
        selectors = [s.strip() for s in strip_selectors.split(",") if s.strip()]
        for selector in selectors:
            try:
                for element in full_soup.select(selector):
                    element.decompose()
            except Exception:
                continue  # Ignore selectors the parser cannot handle.
    for element in full_soup.select("script, style, nav, footer, header, aside"):
        element.decompose()
    main = (
        full_soup.find("main")
        or full_soup.find("article")
        or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I))
        or full_soup.find("body")
    )
    if not main:
        return "No main content found on the webpage."
    markdown_text = md(str(main), heading_style="ATX")
    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)  # Collapse runs of blank lines.
    markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text)  # Drop links with empty text.
    markdown_text = re.sub(r"[ \t]+", " ", markdown_text)
    markdown_text = markdown_text.strip()
    title = full_soup.find("title")
    if title and title.get_text(strip=True):
        markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}"
    return markdown_text or "No content could be extracted."
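
# Illustrative call (the HTML and selectors are placeholders):
#
#     soup = BeautifulSoup("<html><body><main>...</main></body></html>", "lxml")
#     text = _fullpage_markdown_from_soup(soup, "https://example.com", ".sidebar, #comments")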


def _truncate_with_notice(content: str, max_chars: int, offset: int = 0) -> Tuple[str, Dict[str, object]]:
    """Truncate *content* to at most *max_chars*, preferring a paragraph or sentence
    boundary, and append a notice carrying the absolute next_cursor for continuation.

    *offset* is the absolute position of content[0] in the full document, so the
    reported cursor can be passed straight back as the next call's offset.
    """
    total_chars = offset + len(content)
    if len(content) <= max_chars:
        return content, {
            "truncated": False,
            "returned_chars": len(content),
            "total_chars_estimate": total_chars,
            "next_cursor": None,
        }
    truncated = content[:max_chars]
    # Prefer cutting at a paragraph break, then at a sentence end, provided the
    # boundary falls late enough in the window to avoid a very short return.
    last_paragraph = truncated.rfind("\n\n")
    if last_paragraph > max_chars * 0.7:
        truncated = truncated[:last_paragraph]
        cursor_pos = last_paragraph
    elif "." in truncated[-100:]:
        last_period = truncated.rfind(".")
        if last_period > max_chars * 0.8:
            truncated = truncated[: last_period + 1]
            cursor_pos = last_period + 1
        else:
            cursor_pos = len(truncated)
    else:
        cursor_pos = len(truncated)
    metadata = {
        "truncated": True,
        "returned_chars": len(truncated),
        "total_chars_estimate": total_chars,
        "next_cursor": offset + cursor_pos,
    }
    truncated = truncated.rstrip()
    truncation_notice = (
        "\n\n---\n"
        f"**Content Truncated:** Showing {metadata['returned_chars']:,} of {metadata['total_chars_estimate']:,} characters "
        f"({metadata['returned_chars'] / metadata['total_chars_estimate'] * 100:.1f}%)\n"
        f"**Next cursor:** {metadata['next_cursor']} (pass this value as the offset parameter to continue)\n"
        "---"
    )
    return truncated + truncation_notice, metadata
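
# Illustrative pagination flow (values hypothetical):
#
#     body, meta = _truncate_with_notice(long_markdown, 3000, offset=0)
#     if meta["truncated"]:
#         next_offset = meta["next_cursor"]  # pass back as Web_Fetch's offset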


@autodoc(summary=TOOL_SUMMARY)
def Web_Fetch(
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    max_chars: Annotated[int, "Maximum characters to return (0 = no limit, full page content)."] = 3000,
    offset: Annotated[int, "Character offset to start from (for pagination, use next_cursor from the previous call)."] = 0,
    strip_selectors: Annotated[str, "CSS selectors to remove (comma-separated, e.g. '.header, .footer, nav')."] = "",
    mode: Annotated[
        str,
        "Output mode: 'markdown' (default, clean content), 'html' (raw response), or 'url_scraper' (links list).",
    ] = "markdown",
) -> str:
    canonical_mode = _normalize_mode(mode)
    _log_call_start(
        "Web_Fetch",
        url=url,
        max_chars=max_chars,
        strip_selectors=strip_selectors,
        mode=canonical_mode,
        offset=offset,
    )
    if not url or not url.strip():
        result = "Please enter a valid URL."
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    try:
        resp = _http_get_enhanced(url)  # Raises with a friendly message on HTTP errors.
    except requests.exceptions.RequestException as exc:
        result = f"An error occurred: {exc}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        result = f"Unsupported content type for extraction: {ctype or 'unknown'}"
        _log_call_end("Web_Fetch", _truncate_for_log(result))
        return result
    resp.encoding = resp.encoding or resp.apparent_encoding
    html = resp.text
    full_soup = BeautifulSoup(html, "lxml")
    if canonical_mode == "html":
        _log_call_end("Web_Fetch", f"chars={len(html)}, mode={canonical_mode}, offset=0 (ignored)")
        return html
    if canonical_mode == "url_scraper":
        full_result = _extract_links_from_soup(full_soup, final_url)
    else:
        full_result = _fullpage_markdown_from_soup(full_soup, final_url, strip_selectors)

    if offset > 0:
        if offset >= len(full_result):
            result = (
                f"Offset {offset} exceeds content length ({len(full_result)} characters). "
                f"Content ends at position {len(full_result)}."
            )
            _log_call_end("Web_Fetch", _truncate_for_log(result))
            return result
        result = full_result[offset:]
    else:
        result = full_result

    if max_chars > 0 and len(result) > max_chars:
        # Forward the slice offset so the truncation notice reports an absolute cursor.
        result, _ = _truncate_with_notice(result, max_chars, offset=offset)

    _log_call_end("Web_Fetch", f"chars={len(result)}, mode={canonical_mode}, offset={offset}")
    return result
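
# Illustrative call sequence (URL and cursor value are placeholders):
#
#     page = Web_Fetch("https://example.com/article", max_chars=3000)
#     # If the output ends with a truncation notice, continue from its cursor:
#     more = Web_Fetch("https://example.com/article", max_chars=3000, offset=2874)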


def build_interface() -> gr.Interface:
    return gr.Interface(
        fn=Web_Fetch,
        inputs=[
            gr.Textbox(label="URL", placeholder="https://example.com/article", max_lines=1),
            gr.Slider(
                minimum=0,
                maximum=10000,
                value=3000,
                step=100,
                label="Max Characters",
                info="0 = no limit (full page), default 3000",
            ),
            gr.Slider(
                minimum=0,
                maximum=100000,
                value=0,
                step=100,
                label="Offset",
                info="Character offset to start from (use next_cursor from the previous call for pagination)",
            ),
            gr.Textbox(
                label="Strip Selectors",
                placeholder=".header, .footer, nav, .sidebar",
                value="",
                max_lines=1,
                info="CSS selectors to remove (comma-separated)",
            ),
            gr.Radio(
                label="Mode",
                choices=["Markdown Mode", "HTML Mode", "URL Scraper"],
                value="Markdown Mode",
                info="Markdown cleans content, HTML returns the raw response, URL Scraper lists links.",
            ),
        ],
        outputs=gr.Markdown(label="Extracted Content"),
        title="Web Fetch",
        description=(
            '<div style="text-align:center">Convert any webpage to Markdown, inspect the raw HTML response, or '
            "extract all links. Supports custom element removal, length limits, and pagination with offset.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )


__all__ = [
    "Web_Fetch",
    "build_interface",
    "_http_get_enhanced",
    "_fullpage_markdown_from_soup",
]
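
# A minimal local-run sketch (illustrative; this module uses a relative import,
# so launch it via its package rather than as a bare script):
#
#     from package.web_fetch import build_interface  # hypothetical package path
#     build_interface().launch()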