"""MCP URL Scraper: a Gradio app that doubles as an MCP server.

Exposes two tools: scrape_url (single page) and scrape_many (batch).
"""

import json
import os
import re
from urllib.parse import urljoin, urlparse

import gradio as gr
import httpx
from bs4 import BeautifulSoup

def _is_valid_url(url: str) -> bool:
    try:
        u = urlparse(url.strip())
        return u.scheme in {"http", "https"} and bool(u.netloc)
    except Exception:
        return False
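# Illustrative checks for the validator above (not a test suite):
#   _is_valid_url("https://example.com")  -> True   (http/https scheme + host)
#   _is_valid_url("ftp://example.com")    -> False  (scheme not allowed)
#   _is_valid_url("not a url")            -> False  (no scheme, no host)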
def _clean_text(s: str) -> str:
    return re.sub(r"\s+", " ", s or "").strip()


def _extract_main_text(html: str) -> str:
    """
    Lightweight "main text" extraction (no heavy ML deps):
    - remove script/style/noscript
    - remove nav/footer/header/aside
    - prefer <main> or <article>, otherwise <body>
    """
    soup = BeautifulSoup(html, "lxml")

    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()

    for selector in ["nav", "footer", "header", "aside"]:
        for tag in soup.select(selector):
            tag.decompose()

    container = soup.find("main") or soup.find("article") or soup.body or soup
    text = container.get_text(" ", strip=True)
    return _clean_text(text)
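# Illustrative behavior on a tiny, hypothetical document:
#   _extract_main_text("<body><nav>menu</nav><main><p>Hello  world</p></main></body>")
#   -> "Hello world"   (nav dropped, <main> preferred, whitespace collapsed)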
def _extract_title(html: str) -> str:
    soup = BeautifulSoup(html, "lxml")
    if soup.title and soup.title.string:
        return _clean_text(soup.title.string)
    h1 = soup.find("h1")
    return _clean_text(h1.get_text(strip=True)) if h1 else ""


def _extract_links(base_url: str, html: str, limit: int = 50) -> list[dict]:
    soup = BeautifulSoup(html, "lxml")
    links = []
    seen = set()

    for a in soup.find_all("a", href=True):
        href = a.get("href", "").strip()
        if not href:
            continue
        # Resolve relative hrefs against the page URL and drop fragments.
        abs_url = urljoin(base_url, href)
        abs_url = abs_url.split("#", 1)[0]

        if not _is_valid_url(abs_url):
            continue
        if abs_url in seen:
            continue

        seen.add(abs_url)
        links.append(
            {
                "url": abs_url,
                "text": _clean_text(a.get_text(" ", strip=True))[:200],
            }
        )
        if len(links) >= limit:
            break

    return links


def _safe_truncate(s: str, max_chars: int) -> str:
    if len(s) <= max_chars:
        return s
    return s[: max_chars - 3] + "..."
def scrape_url(
    url: str,
    *,
    mode: str = "text",
    timeout_s: int = 20,
    max_chars: int = 12000,
    follow_redirects: bool = True,
    user_agent: str = "Mozilla/5.0 (compatible; GradioMCPUrlScraper/1.0)",
) -> dict:
    """
    Fetch and scrape a URL.

    Parameters:
        url: The http(s) URL to fetch.
        mode: One of:
            - "text": returns title + extracted main text
            - "html": returns raw HTML (truncated)
            - "links": returns list of outgoing links (url + anchor text)
            - "all": returns title + text + links + html (truncated)
        timeout_s: Request timeout in seconds.
        max_chars: Maximum characters returned for large fields.
        follow_redirects: Whether to follow redirects.
        user_agent: Custom User-Agent header.

    Returns:
        A JSON-serializable dict with fields depending on mode.
    """
    url = (url or "").strip()
    if not _is_valid_url(url):
        return {"ok": False, "error": "Invalid URL. Must start with http:// or https://", "url": url}

    # Validate mode before fetching so a bad value doesn't cost an HTTP round trip.
    mode = (mode or "text").strip().lower()
    if mode not in {"text", "html", "links", "all"}:
        return {"ok": False, "error": f"Invalid mode '{mode}'. Use text|html|links|all.", "url": url}

    headers = {"User-Agent": user_agent}
    try:
        with httpx.Client(headers=headers, timeout=timeout_s, follow_redirects=follow_redirects) as client:
            r = client.get(url)
            content_type = (r.headers.get("content-type") or "").lower()
            # httpx decodes the body to text; non-HTML responses simply yield
            # little or no extractable structure downstream.
            html = r.text

            out: dict = {
                "ok": True,
                "url": str(r.url),
                "status_code": r.status_code,
                "content_type": content_type,
            }

            title = _extract_title(html)
            if title:
                out["title"] = title

            if mode in {"text", "all"}:
                out["text"] = _safe_truncate(_extract_main_text(html), max_chars)

            if mode in {"links", "all"}:
                # Resolve links against the final (post-redirect) URL.
                out["links"] = _extract_links(str(r.url), html, limit=50)

            if mode in {"html", "all"}:
                out["html"] = _safe_truncate(html, max_chars)

            return out

    except httpx.HTTPError as e:
        return {"ok": False, "error": f"HTTP error: {type(e).__name__}: {e}", "url": url}
    except Exception as e:
        return {"ok": False, "error": f"Unexpected error: {type(e).__name__}: {e}", "url": url}
def scrape_many(urls_json: str, mode: str = "text") -> list[dict]:
    """
    Scrape multiple URLs in one call.

    Parameters:
        urls_json: JSON array of URLs, e.g. ["https://example.com", "https://example.org"]
        mode: text|html|links|all

    Returns:
        List of scrape_url() results. Only the first 25 URLs are processed.
    """
    try:
        urls = json.loads(urls_json)
        if not isinstance(urls, list):
            raise ValueError("urls_json must be a JSON array")
    except Exception as e:
        return [{"ok": False, "error": f"Invalid JSON array: {e}", "url": ""}]

    return [scrape_url(str(u), mode=mode) for u in urls[:25]]
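# Illustrative batch call; note the argument is a JSON string, not a list:
#
#   batch = scrape_many('["https://example.com", "https://example.org"]', mode="links")
#   for item in batch:
#       print(item["url"], item["ok"])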
def _scrape_for_ui(url: str, mode: str, timeout_s: float, max_chars: float) -> dict:
    """Adapter for the UI: Gradio passes inputs positionally, but scrape_url
    takes its options as keyword-only arguments."""
    return scrape_url(url, mode=mode, timeout_s=int(timeout_s), max_chars=int(max_chars))


with gr.Blocks(title="MCP URL Scraper") as demo:
    gr.Markdown(
        """
        # MCP URL Scraper (Gradio + Hugging Face Spaces)
        - Use the UI to scrape a single URL
        - Or connect as an MCP server (tools: `scrape_url`, `scrape_many`)
        """
    )

    with gr.Row():
        url_in = gr.Textbox(label="URL", placeholder="https://example.com", scale=3)
        mode_in = gr.Dropdown(["text", "links", "html", "all"], value="text", label="Mode", scale=1)

    with gr.Row():
        timeout_in = gr.Slider(5, 60, value=20, step=1, label="Timeout (s)")
        maxchars_in = gr.Slider(1000, 50000, value=12000, step=1000, label="Max chars returned")

    run_btn = gr.Button("Scrape")
    out_json = gr.JSON(label="Result")

    run_btn.click(
        fn=_scrape_for_ui,  # wrapper needed: scrape_url's options are keyword-only
        inputs=[url_in, mode_in, timeout_in, maxchars_in],
        outputs=[out_json],
    )
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.getenv("PORT", "7860")),
        ssr_mode=False,
        mcp_server=True,
    )
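# Connecting an MCP client (a sketch, not authoritative): with mcp_server=True,
# Gradio serves the MCP endpoint under /gradio_api/mcp/ -- check the running
# app's "View API" page for the exact URL for your Gradio version.
#
#   {
#     "mcpServers": {
#       "url-scraper": {
#         "url": "http://localhost:7860/gradio_api/mcp/sse"
#       }
#     }
#   }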