#!/usr/bin/env python3
"""
🕷️ TRUE HIERARCHICAL SPIDERWEB CRAWLER
- Outputs a nested JSON tree: {url, title, ..., children: [...]}
- Each node knows its parent implicitly via structure
- Skips binaries, login pages, DNS errors
- Optimized for 16GB RAM / 2 vCPU
- MCP server on 0.0.0.0:7860
"""
import os
import sys
import json
import tempfile
import subprocess
import hashlib
import re
import multiprocessing
from pathlib import Path
from urllib.parse import urljoin, urlparse
from typing import Tuple, Dict, Any

import gradio as gr
from loguru import logger
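
# Third-party packages used here and in the generated spider script (presumably
# pinned in the Space's requirements.txt): gradio, loguru, scrapy, scrapy-playwright,
# playwright, beautifulsoup4, markdownify.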
# === LOG AUTO-CLEAN ===
LOG_FILE = Path(tempfile.gettempdir()) / "mcp_spider.log"
if LOG_FILE.exists():
    with open(LOG_FILE, "r") as f:
        lines = f.readlines()
    if len(lines) > 10000:
        with open(LOG_FILE, "w") as f:
            f.writelines(lines[-5000:])

logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add(LOG_FILE, rotation="10 MB")
# === PLAYWRIGHT INSTALL ===
def install_playwright():
    """Install the Chromium browser for Playwright on first run if it is missing."""
    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            p.chromium.launch(headless=True).close()
    except Exception:
        subprocess.run(
            [sys.executable, "-m", "playwright", "install", "chromium"],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )

install_playwright()

CACHE_DIR = Path(tempfile.gettempdir()) / "mcp_spider_cache"
CACHE_DIR.mkdir(exist_ok=True, parents=True)
# === SPIDER THAT TRACKS PARENT ===
SPIDER_CODE = r'''
import sys
import json
import re
from urllib.parse import urljoin, urlparse

from scrapy.crawler import CrawlerProcess
from scrapy import Spider, Request
from bs4 import BeautifulSoup

class HierarchicalSpider(Spider):
    name = "hierarchical_spider"
    custom_settings = {
        "USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        "ROBOTSTXT_OBEY": False,
        "DOWNLOAD_DELAY": 0.0,
        "CONCURRENT_REQUESTS_PER_DOMAIN": 16,
        "CONCURRENT_REQUESTS": 32,
        "COOKIES_ENABLED": False,
        "TELNETCONSOLE_ENABLED": False,
        # scrapy-playwright needs its download handlers and the asyncio reactor
        # registered for the "playwright" request meta key to have any effect.
        "DOWNLOAD_HANDLERS": {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        },
        "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
        "PLAYWRIGHT_BROWSER_TYPE": "chromium",
        "PLAYWRIGHT_LAUNCH_OPTIONS": {"headless": True, "args": ["--no-sandbox"]},
        "RETRY_TIMES": 2,
        "RETRY_HTTP_CODES": [500, 502, 503, 504, 408, 429],
        # sys.argv[6] is the JSON output path passed in by run_crawl()
        "FEEDS": {sys.argv[6]: {"format": "json", "overwrite": True}},
    }

    def __init__(self, start_url, max_depth, max_pages, use_js, **kwargs):
        super().__init__(**kwargs)
        self.start_url = start_url
        self.max_depth = int(max_depth)
        self.max_pages = int(max_pages)
        self.use_js = use_js
        self.base_domain = urlparse(start_url).netloc
        self.pages_crawled = 0
        self.seen = set()

    def start_requests(self):
        yield Request(
            self.start_url,
            callback=self.parse_page,
            meta={"depth": 0, "parent": None, "playwright": self.use_js},
            dont_filter=True,
        )

    def parse_page(self, response):
        if self.pages_crawled >= self.max_pages:
            return
        url = response.url
        if url in self.seen:
            return
        self.seen.add(url)
        ct = response.headers.get("Content-Type", b"").decode().lower()
        if "text/html" not in ct:
            return
        if any(x in url.lower() for x in ["/login", "/user/", "/auth", ".pdf", ".jpg", ".png", ".zip"]):
            return
        try:
            soup = BeautifulSoup(response.text, "html.parser")
        except Exception:
            return
        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()
        text = soup.get_text(" ", strip=True)
        emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
        email = emails[0] if emails else ""
        try:
            from markdownify import markdownify as md
            markdown = md(str(soup), heading_style="ATX")
            markdown = re.sub(r'\n{3,}', '\n\n', markdown)
            markdown = re.sub(r'[ \t]+', ' ', markdown).strip()
        except Exception:
            markdown = text
        yield {
            "url": url,
            "title": response.css("title::text").get("") or "",
            "email": email,
            "raw_text": text[:5000],
            "markdown": markdown[:5000],
            "depth": response.meta["depth"],
            "parent": response.meta["parent"],  # track parent URL for tree building
        }
        self.pages_crawled += 1
        if response.meta["depth"] < self.max_depth and self.pages_crawled < self.max_pages:
            for href in response.css("a[href]::attr(href)").getall():
                if not href or href.startswith(("#", "javascript:", "mailto:", "tel:")):
                    continue
                full = urljoin(url, href)
                parsed = urlparse(full)
                if parsed.netloc == self.base_domain and full not in self.seen:
                    yield Request(
                        full,
                        callback=self.parse_page,
                        meta={
                            "depth": response.meta["depth"] + 1,
                            "parent": url,  # pass this page's URL down as the child's parent
                            "playwright": self.use_js,
                        },
                        dont_filter=False,
                    )

if __name__ == "__main__":
    start_url = sys.argv[1]
    max_depth = sys.argv[2]
    max_pages = sys.argv[3]
    use_js = sys.argv[4].lower() == "true"
    output_file = sys.argv[6]
    process = CrawlerProcess()
    process.crawl(HierarchicalSpider, start_url=start_url, max_depth=max_depth, max_pages=max_pages, use_js=use_js)
    process.start()
'''
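
# The spider script above is written to disk and launched as a subprocess by run_crawl()
# below with positional arguments:
#   <start_url> <max_depth> <max_pages> <use_js> <reserved> <output_file>
# sys.argv[5] is an unused placeholder ("0" in the current call); sys.argv[6] is read by
# the FEEDS setting to export the flat page list as JSON.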

def build_tree(pages: list) -> dict:
    """Convert a flat list of pages with parent references into a nested tree."""
    url_to_node = {}
    root = None
    # First pass: create all nodes
    for page in pages:
        node = {**page, "children": []}
        url_to_node[page["url"]] = node
        if page.get("parent") is None:
            root = node
    # Second pass: attach children to their parents
    for page in pages:
        if page["parent"] and page["parent"] in url_to_node:
            url_to_node[page["parent"]]["children"].append(url_to_node[page["url"]])
    return root or (url_to_node[next(iter(url_to_node))] if url_to_node else {})
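
# Illustrative example with hypothetical data: a flat export such as
#   [{"url": "https://example.com/", "parent": None, ...},
#    {"url": "https://example.com/about", "parent": "https://example.com/", ...}]
# becomes a single root node for "https://example.com/" whose "children" list
# contains the "/about" node (itself carrying an empty "children" list).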

def run_crawl(start_url: str, max_depth: int, max_pages: int, use_js: bool) -> Tuple[str, str, str]:
    key = hashlib.md5(f"{start_url}_{max_depth}_{max_pages}_{use_js}".encode()).hexdigest()
    output_file = CACHE_DIR / f"{key}.json"
    if not output_file.exists():
        script = CACHE_DIR / "spider.py"
        script.write_text(SPIDER_CODE)
        cmd = [
            sys.executable, str(script),
            start_url, str(max_depth), str(max_pages), str(use_js), "0", str(output_file),
        ]
        subprocess.run(cmd, check=True)
    # Read flat data
    with open(output_file, "r") as f:
        flat_data = json.load(f)
    if not isinstance(flat_data, list):
        flat_data = [flat_data]

    # Build tree
    tree = build_tree(flat_data)
    full_json_str = json.dumps(tree, indent=2)

    # Preview
    preview = f"✅ Built hierarchical tree (depth ≤ {max_depth})\n"
    preview += f"Root: {tree.get('url', 'N/A')}\n"
    preview += f"Total nodes: {len(flat_data)}\n\n"

    def _summarize(node, depth=0, max_show=10, out=None):
        # default to None rather than a mutable [] so repeated calls don't share state
        if out is None:
            out = []
        if len(out) >= max_show:
            return
        indent = " " * depth
        title = node.get("title", "N/A")
        email = node.get("email", "no email")
        out.append(f"{indent}└─ [{node.get('depth', '?')}] {title} ({email})")
        for child in node.get("children", [])[:3]:
            _summarize(child, depth + 1, max_show, out)

    summary_lines = []
    _summarize(tree, out=summary_lines)
    preview += "\n".join(summary_lines[:15])
    return preview, full_json_str, str(output_file)
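
# Illustrative usage with hypothetical values:
#   preview, tree_json, path = run_crawl("https://example.com", 2, 100, False)
# Repeat calls with the same parameters reuse the cached JSON file keyed by the
# md5 hash above instead of launching a new crawl.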

def sanitize_url(url: str) -> str:
    url = url.strip()
    url = re.sub(r"^[^\w+]*", "", url)      # strip leading characters that cannot start a URL
    url = re.sub(r"[^\w/\.:-]*$", "", url)  # strip trailing punctuation/junk
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    if not urlparse(url).netloc:
        raise ValueError("Invalid URL")
    return url
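
# Example (illustrative): sanitize_url("  web.mit.edu/ ") -> "https://web.mit.edu/"
# Leading/trailing junk is stripped and an https:// scheme is prepended when missing.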

def crawl_now(url: str, max_depth: int, max_pages: int, use_js: bool) -> Tuple[str, str, str]:
    try:
        clean = sanitize_url(url)
        # gr.Number delivers floats; cast here so the spider's int() conversions succeed
        return run_crawl(clean, int(max_depth), int(max_pages), bool(use_js))
    except Exception as e:
        error_json = json.dumps({"error": str(e)}, indent=2)
        return f"❌ Error: {e}", error_json, None  # None keeps gr.File empty on failure

# === GRADIO UI ===
with gr.Blocks(title="🕷️ Hierarchical Spiderweb Crawler") as app:
    gr.Markdown("# 🕸️ True Hierarchical Spiderweb Crawler")
    with gr.Row():
        url = gr.Textbox(label="Start URL", value="https://web.mit.edu/")
        depth = gr.Number(label="Max Depth", value=2, minimum=0)
        pages = gr.Number(label="Max Pages", value=100, minimum=1)
    with gr.Row():
        js = gr.Checkbox(label="JS Rendering", value=False)
        domain = gr.Textbox(value="Same Domain Only", interactive=False)
    btn = gr.Button("🕷️ Start Recursive Crawl", variant="primary")

    with gr.Tab("Tree Preview"):
        preview_out = gr.Textbox(label="Hierarchical Summary", lines=15)
    with gr.Tab("Full Nested JSON"):
        json_out = gr.Code(language="json", label="Spiderweb Tree (Copy-Paste Ready)", lines=25)
        file_out = gr.File(label="Download Nested JSON")

    btn.click(
        fn=crawl_now,
        inputs=[url, depth, pages, js],
        outputs=[preview_out, json_out, file_out],
    )
| gr.Markdown("## π Output Structure") | |
| gr.Markdown(""" | |
| ```json | |
| { | |
| "url": "...", | |
| "title": "...", | |
| "email": "...", | |
| "depth": 0, | |
| "parent": null, | |
| "children": [ | |
| { | |
| "url": "...", | |
| "title": "...", | |
| "depth": 1, | |
| "parent": "...", | |
| "children": [...] | |
| } | |
| ] | |
| } | |
| ``` | |
| """) | |

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn", force=True)
    app.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True, show_api=True)