0xmoose committed on
Commit 20f9d00 · verified · 1 Parent(s): 3bd24c5

Update app.py

Files changed (1)
  1. app.py +159 -200
app.py CHANGED
@@ -1,265 +1,224 @@
+import os
 import re
-import socket
-import ipaddress
-from urllib.parse import urlparse, urljoin
+import json
+import asyncio
+from urllib.parse import urljoin, urlparse
 
 import httpx
 import gradio as gr
 from bs4 import BeautifulSoup
 
-try:
-    # Optional but recommended for cleaner article-style extraction
-    from readability import Document
-    HAS_READABILITY = True
-except Exception:
-    HAS_READABILITY = False
-
-
-# ----------------------------
-# Security / validation helpers
-# ----------------------------
-def _is_public_hostname(hostname: str) -> bool:
-    """
-    Resolve hostname and block private/loopback/link-local/reserved ranges.
-    Mitigates SSRF against internal networks via DNS.
-    """
-    if not hostname:
-        return False
-
+
+# --- Scraper core helpers ---
+
+def _is_valid_url(url: str) -> bool:
     try:
-        # Disallow obvious local hostnames
-        hn = hostname.strip().lower()
-        if hn in {"localhost", "localhost.localdomain"}:
-            return False
-
-        infos = socket.getaddrinfo(hostname, None)
-        ips = {info[4][0] for info in infos}
-
-        for ip_str in ips:
-            ip = ipaddress.ip_address(ip_str)
-            if (
-                ip.is_private
-                or ip.is_loopback
-                or ip.is_link_local
-                or ip.is_reserved
-                or ip.is_multicast
-            ):
-                return False
-        return True
+        u = urlparse(url.strip())
+        return u.scheme in {"http", "https"} and bool(u.netloc)
     except Exception:
         return False
 
 
-def _validate_url(url: str) -> str:
-    url = (url or "").strip()
-    if not url:
-        raise ValueError("URL is required.")
-
-    parsed = urlparse(url)
-    if parsed.scheme not in {"http", "https"}:
-        raise ValueError("Only http:// and https:// URLs are allowed.")
-
-    if not parsed.netloc:
-        raise ValueError("Invalid URL (missing hostname).")
-
-    # Block credentials in URL (e.g., http://user:pass@host)
-    if parsed.username or parsed.password:
-        raise ValueError("URLs containing credentials are not allowed.")
-
-    if not _is_public_hostname(parsed.hostname):
-        raise ValueError("Blocked hostname/IP (possible local/private network).")
-
-    return url
-
-
-# ----------------------------
-# Extraction helpers
-# ----------------------------
-def _strip_text(text: str) -> str:
-    text = re.sub(r"\n{3,}", "\n\n", text)
-    text = re.sub(r"[ \t]{2,}", " ", text)
-    return text.strip()
-
-
-def _extract_with_bs4(html: str, base_url: str):
-    soup = BeautifulSoup(html, "html.parser")
-
-    # Remove noisy tags
-    for tag in soup(["script", "style", "noscript", "iframe"]):
-        tag.decompose()
-
-    title = (soup.title.string.strip() if soup.title and soup.title.string else "")[:300]
-
-    # Basic meta
-    meta = {}
-    for m in soup.find_all("meta"):
-        name = (m.get("name") or m.get("property") or "").strip()
-        content = (m.get("content") or "").strip()
-        if name and content and name.lower() in {
-            "description",
-            "og:title",
-            "og:description",
-            "og:url",
-            "twitter:title",
-            "twitter:description",
-        }:
-            meta[name] = content[:500]
-
-    text = _strip_text(soup.get_text("\n"))
-
-    # Links
+def _clean_text(s: str) -> str:
+    s = re.sub(r"\s+", " ", s or "").strip()
+    return s
+
+
+def _extract_main_text(html: str) -> str:
+    """
+    Lightweight "main text" extraction (no heavy ML deps):
+    - remove script/style/nav/footer/header/aside
+    - prefer <main> or <article>, otherwise body
+    """
+    soup = BeautifulSoup(html, "lxml")
+
+    for tag in soup(["script", "style", "noscript"]):
+        tag.decompose()
+
+    for selector in ["nav", "footer", "header", "aside"]:
+        for tag in soup.select(selector):
+            tag.decompose()
+
+    container = soup.find("main") or soup.find("article") or soup.body or soup
+    text = container.get_text(" ", strip=True)
+    return _clean_text(text)
+
+
+def _extract_title(html: str) -> str:
+    soup = BeautifulSoup(html, "lxml")
+    if soup.title and soup.title.string:
+        return _clean_text(soup.title.string)
+    h1 = soup.find("h1")
+    return _clean_text(h1.get_text(strip=True)) if h1 else ""
+
+
+def _extract_links(base_url: str, html: str, limit: int = 50) -> list[dict]:
+    soup = BeautifulSoup(html, "lxml")
     links = []
+    seen = set()
+
     for a in soup.find_all("a", href=True):
         href = a.get("href", "").strip()
         if not href:
             continue
         abs_url = urljoin(base_url, href)
-        # keep only http(s)
-        if urlparse(abs_url).scheme in {"http", "https"}:
-            label = _strip_text(a.get_text(" "))[:200]
-            links.append({"text": label, "url": abs_url})
-
-    return title, meta, text, links
-
-
-def _extract_readable(html: str, base_url: str):
-    """
-    Use readability-lxml if available; fallback to BeautifulSoup extraction.
-    """
-    if not HAS_READABILITY:
-        return _extract_with_bs4(html, base_url)
-
-    doc = Document(html)
-    title = (doc.short_title() or "")[:300]
-    content_html = doc.summary(html_partial=True)
-    return _extract_with_bs4(content_html, base_url)
-
-
-# ----------------------------
-# Fetcher (with redirect checks)
-# ----------------------------
-def _fetch_html(url: str, timeout_s: float, max_bytes: int, user_agent: str, max_redirects: int = 5):
+        abs_url = abs_url.split("#", 1)[0]
+
+        if not _is_valid_url(abs_url):
+            continue
+        if abs_url in seen:
+            continue
+
+        seen.add(abs_url)
+        links.append(
+            {
+                "url": abs_url,
+                "text": _clean_text(a.get_text(" ", strip=True))[:200],
+            }
+        )
+        if len(links) >= limit:
+            break
+
+    return links
+
+
+def _safe_truncate(s: str, max_chars: int) -> str:
+    if len(s) <= max_chars:
+        return s
+    return s[: max_chars - 3] + "..."
+
+
+# --- MCP-exposed tool functions (type hints + docstrings help MCP clients) ---
+
+def scrape_url(
+    url: str,
+    # parameters stay positional so Gradio's click() can pass them in order
+    mode: str = "text",
+    timeout_s: int = 20,
+    max_chars: int = 12000,
+    follow_redirects: bool = True,
+    user_agent: str = "Mozilla/5.0 (compatible; GradioMCPUrlScraper/1.0)",
+) -> dict:
+    """
+    Fetch and scrape a URL.
+
+    Parameters:
+        url: The http(s) URL to fetch.
+        mode: One of:
+            - "text": returns title + extracted main text
+            - "html": returns raw HTML (truncated)
+            - "links": returns list of outgoing links (url + anchor text)
+            - "all": returns title + text + links + html (truncated)
+        timeout_s: Request timeout in seconds.
+        max_chars: Maximum characters returned for large fields.
+        follow_redirects: Whether to follow redirects.
+        user_agent: Custom User-Agent header.
+
+    Returns:
+        A JSON-serializable dict with fields depending on mode.
+    """
+    url = (url or "").strip()
+    if not _is_valid_url(url):
+        return {"ok": False, "error": "Invalid URL. Must start with http:// or https://", "url": url}
+
     headers = {"User-Agent": user_agent}
-    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
-
-    with httpx.Client(timeout=timeout_s, headers=headers, limits=limits, follow_redirects=False) as client:
-        current = url
-        for _ in range(max_redirects + 1):
-            r = client.get(current)
-            # Handle redirects manually so we can validate each hop
-            if 300 <= r.status_code < 400 and "location" in r.headers:
-                nxt = urljoin(current, r.headers["location"])
-                _validate_url(nxt)
-                current = nxt
-                continue
-
-            r.raise_for_status()
-
-            ctype = (r.headers.get("content-type") or "").lower()
-            if "text/html" not in ctype and "application/xhtml+xml" not in ctype:
-                raise ValueError(f"Unsupported content-type: {ctype or 'unknown'} (expected HTML)")
-
-            content = r.content
-            if len(content) > max_bytes:
-                raise ValueError(f"Response too large ({len(content)} bytes). Limit is {max_bytes} bytes.")
-
-            # Best-effort decode
-            try:
-                html = content.decode(r.encoding or "utf-8", errors="replace")
-            except Exception:
-                html = content.decode("utf-8", errors="replace")
-
-            return current, html
-
-    raise ValueError("Too many redirects.")
-
-
-# ----------------------------
-# MCP tool + UI function
-# ----------------------------
-def scrape_url(
-    url: str,
-    include_links: bool = True,
-    max_chars: int = 12000,
-    timeout_seconds: float = 15.0,
-    max_kb: int = 1024,
-    user_agent: str = "Mozilla/5.0 (compatible; HFSpaceScraper/1.0; +https://huggingface.co/spaces)"
-):
+    try:
+        with httpx.Client(headers=headers, timeout=timeout_s, follow_redirects=follow_redirects) as client:
+            r = client.get(url)
+            content_type = (r.headers.get("content-type") or "").lower()
+            html = r.text  # httpx decodes the body using the detected charset
+
+            out: dict = {
+                "ok": True,
+                "url": str(r.url),
+                "status_code": r.status_code,
+                "content_type": content_type,
+            }
+
+            # Always compute title if HTML-ish
+            title = _extract_title(html)
+            if title:
+                out["title"] = title
+
+            mode = (mode or "text").strip().lower()
+            if mode not in {"text", "html", "links", "all"}:
+                return {"ok": False, "error": f"Invalid mode '{mode}'. Use text|html|links|all.", "url": url}
+
+            if mode in {"text", "all"}:
+                text = _extract_main_text(html)
+                out["text"] = _safe_truncate(text, max_chars)
+
+            if mode in {"links", "all"}:
+                out["links"] = _extract_links(str(r.url), html, limit=50)
+
+            if mode in {"html", "all"}:
+                out["html"] = _safe_truncate(html, max_chars)
+
+            return out
+
+    except httpx.HTTPError as e:
+        return {"ok": False, "error": f"HTTP error: {type(e).__name__}: {str(e)}", "url": url}
+    except Exception as e:
+        return {"ok": False, "error": f"Unexpected error: {type(e).__name__}: {str(e)}", "url": url}
+
+
+def scrape_many(urls_json: str, mode: str = "text") -> list[dict]:
     """
-    Scrape a single web page and return clean text + metadata.
+    Scrape multiple URLs in one call.
 
-    Args:
-        url (str): The http(s) URL to fetch.
-        include_links (bool): If true, include extracted hyperlinks.
-        max_chars (int): Maximum number of characters returned for the main text.
-        timeout_seconds (float): Network timeout in seconds.
-        max_kb (int): Maximum HTML response size in kilobytes.
-        user_agent (str): User-Agent header to send.
+    Parameters:
+        urls_json: JSON array of URLs, e.g. ["https://example.com", "https://example.org"]
+        mode: text|html|links|all
 
     Returns:
-        dict: {final_url, title, meta, text, links}
+        List of scrape_url() results.
     """
-    url = _validate_url(url)
-    max_bytes = int(max_kb) * 1024
-
-    final_url, html = _fetch_html(
-        url=url,
-        timeout_s=float(timeout_seconds),
-        max_bytes=max_bytes,
-        user_agent=user_agent,
-    )
-
-    title, meta, text, links = _extract_readable(html, final_url)
-
-    text = text[: max(0, int(max_chars))]
-    if not include_links:
-        links = []
-
-    return {
-        "final_url": final_url,
-        "title": title,
-        "meta": meta,
-        "text": text,
-        "links": links[:200],  # cap link count
-        "note": "readability-lxml enabled" if HAS_READABILITY else "readability-lxml not installed; using basic extraction",
-    }
-
-
-# ----------------------------
-# Gradio UI
-# ----------------------------
-with gr.Blocks(title="URL Scraper (MCP)") as demo:
+    try:
+        urls = json.loads(urls_json)
+        if not isinstance(urls, list):
+            raise ValueError("urls_json must be a JSON array")
+    except Exception as e:
+        return [{"ok": False, "error": f"Invalid JSON array: {str(e)}", "url": ""}]
+
+    results = []
+    for u in urls[:25]:  # prevent abuse
+        results.append(scrape_url(str(u), mode=mode))
+    return results
+
+
+# --- Gradio UI ---
+
+with gr.Blocks(title="MCP URL Scraper") as demo:
     gr.Markdown(
         """
-        # URL Scraper (MCP-compatible)
-
-        - Paste a URL to get extracted text, title, metadata, and links.
-        - This Space also exposes the scraper as an **MCP tool**.
-
-        **MCP endpoint (after deploy):** `https://<your-space>.hf.space/gradio_api/mcp/`
+        # MCP URL Scraper (Gradio + Hugging Face Spaces)
+        - Use the UI to scrape a single URL
+        - Or connect as an MCP server (tools: `scrape_url`, `scrape_many`)
         """
     )
 
     with gr.Row():
-        url_in = gr.Textbox(label="URL", placeholder="https://example.com/article")
-    with gr.Row():
-        include_links_in = gr.Checkbox(label="Include links", value=True)
-        max_chars_in = gr.Slider(1000, 50000, value=12000, step=500, label="Max returned characters")
-    with gr.Accordion("Advanced", open=False):
-        timeout_in = gr.Slider(5, 60, value=15, step=1, label="Timeout (seconds)")
-        max_kb_in = gr.Slider(128, 4096, value=1024, step=128, label="Max HTML size (KB)")
-        ua_in = gr.Textbox(label="User-Agent", value="Mozilla/5.0 (compatible; HFSpaceScraper/1.0; +https://huggingface.co/spaces)")
-
-    scrape_btn = gr.Button("Scrape")
-
-    out = gr.JSON(label="Result")
-
-    scrape_btn.click(
+        url_in = gr.Textbox(label="URL", placeholder="https://example.com", scale=3)
+        mode_in = gr.Dropdown(["text", "links", "html", "all"], value="text", label="Mode", scale=1)
+
+    with gr.Row():
+        timeout_in = gr.Slider(5, 60, value=20, step=1, label="Timeout (s)")
+        maxchars_in = gr.Slider(1000, 50000, value=12000, step=1000, label="Max chars returned")
+
+    run_btn = gr.Button("Scrape")
+    out_json = gr.JSON(label="Result")
+
+    run_btn.click(
         fn=scrape_url,
-        inputs=[url_in, include_links_in, max_chars_in, timeout_in, max_kb_in, ua_in],
-        outputs=[out],
-        api_name="scrape_url",  # tool name in Gradio API (and MCP)
+        inputs=[url_in, mode_in, timeout_in, maxchars_in],
+        outputs=[out_json],
     )
 
 if __name__ == "__main__":
-    demo.launch(mcp_server=True)
+    # ssr_mode=False is a workaround users have reported for SSR-related quirks on Spaces
+    demo.launch(
+        server_name="0.0.0.0",
+        server_port=int(os.getenv("PORT", "7860")),
+        ssr_mode=False,
+        mcp_server=True,  # enables the MCP endpoints
+    )
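
For reference, a minimal smoke test of the two tools added in this commit. This is a sketch, not part of the commit: it assumes the file is importable as `app.py`, that `httpx`, `beautifulsoup4`, and `lxml` are installed, that network access is available, and it uses `example.com`/`example.org` as placeholder targets.

```python
# Hypothetical local check of the new tool functions; not part of commit 20f9d00.
from app import scrape_url, scrape_many

# Single URL: mode="all" returns title, text, links, and truncated html on success.
result = scrape_url("https://example.com", mode="all", timeout_s=10, max_chars=2000)
print(result["ok"], result.get("status_code"), result.get("title"))

# Batch helper: takes a JSON-encoded array of URLs, capped at 25 per call.
batch = scrape_many('["https://example.com", "https://example.org"]', mode="text")
print([(r.get("url"), r["ok"]) for r in batch])
```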
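And a small illustration of the link-extraction rules in `_extract_links` (fragment stripping, http(s)-only filtering, de-duplication, and the 50-link cap), again a sketch assuming the `lxml` parser is available:

```python
# Hypothetical check of _extract_links behavior; not part of commit 20f9d00.
from app import _extract_links

html = '<a href="/a#frag">One</a> <a href="/a">Dup</a> <a href="mailto:hi@example.com">Mail</a>'
print(_extract_links("https://example.com", html))
# Expected: [{'url': 'https://example.com/a', 'text': 'One'}]
# "/a#frag" and "/a" collapse to one entry once the fragment is stripped,
# and the mailto: link fails _is_valid_url, so it is skipped.
```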