#!/usr/bin/env python3
"""
🕷️ MILITARY-GRADE HIERARCHICAL SPIDERWEB CRAWLER (v2)
NEW FEATURES:
✅ Result format selector: HTML / Markdown / Plain Text
✅ Toggleable CORS proxy: https://proxi.jammesop007.workers.dev/
✅ All prior 17 features preserved
✅ Log viewer in footer
✅ Optimized for 16GB RAM / 2 vCPU
"""
import os
import sys
import json
import tempfile
import subprocess
import hashlib
import re
import multiprocessing
import psutil
from pathlib import Path
from urllib.parse import urljoin, urlparse
from typing import Tuple, Dict, Any, List, Optional
import time
import gradio as gr
from loguru import logger
from fake_useragent import UserAgent

# === LOGGING SETUP ===
LOG_FILE = Path(tempfile.gettempdir()) / "mcp_spider.log"
if LOG_FILE.exists():
    with open(LOG_FILE, "r") as f:
        lines = f.readlines()
    if len(lines) > 10000:
        with open(LOG_FILE, "w") as f:
            f.writelines(lines[-5000:])
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add(LOG_FILE, rotation="10 MB", retention="1 week")
# === PLAYWRIGHT INSTALL ===
def install_playwright():
    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            browser.close()
    except Exception as e:
        logger.warning(f"Playwright not ready; installing: {e}")
        subprocess.run([sys.executable, "-m", "playwright", "install", "chromium"],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

install_playwright()

CACHE_DIR = Path(tempfile.gettempdir()) / "mcp_spider_cache"
CACHE_DIR.mkdir(exist_ok=True, parents=True)
CORS_PROXY_BASE = "https://proxi.jammesop007.workers.dev/"
# === SPIDER CODE ===
SPIDER_CODE = r'''
import sys
import json
import re
import logging
from urllib.parse import urljoin, urlparse
from scrapy.crawler import CrawlerProcess
from scrapy import Spider, Request
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

logging.getLogger('scrapy').setLevel(logging.WARNING)
ua = UserAgent()


class HierarchicalSpider(Spider):
    name = "hierarchical_spider"
    custom_settings = {
        "USER_AGENT": ua.random,
        "ROBOTSTXT_OBEY": False,
        "DOWNLOAD_DELAY": 0.5,
        "RANDOMIZE_DOWNLOAD_DELAY": 0.2,
        "CONCURRENT_REQUESTS_PER_DOMAIN": 16,
        "CONCURRENT_REQUESTS": 32,
        "COOKIES_ENABLED": True,
        "TELNETCONSOLE_ENABLED": False,
        # The "playwright" request meta flag only takes effect when the
        # scrapy-playwright download handlers are registered (assumes the
        # scrapy-playwright plugin is installed).
        "DOWNLOAD_HANDLERS": {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        },
        "PLAYWRIGHT_BROWSER_TYPE": "chromium",
        "PLAYWRIGHT_LAUNCH_OPTIONS": {
            "headless": True,
            "args": [
                "--no-sandbox",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
                "--disable-blink-features=AutomationControlled"
            ]
        },
        "RETRY_TIMES": 3,
        "RETRY_HTTP_CODES": [500, 502, 503, 504, 408, 429, 403],
    }
    def __init__(self,
                 start_url,
                 max_depth,
                 max_pages,
                 use_js_everywhere,
                 same_domain_only,
                 allow_patterns,
                 deny_patterns,
                 skip_verify_ssl,
                 use_cors_proxy,
                 **kwargs):
        super().__init__(**kwargs)
        self.start_url = start_url
        self.max_depth = int(max_depth)
        self.max_pages = int(max_pages)
        self.use_js_everywhere = use_js_everywhere
        self.same_domain_only = same_domain_only
        self.proxy_base = "https://proxi.jammesop007.workers.dev/"
        # Strip the proxy prefix (if present) before extracting the base domain.
        self.base_domain = urlparse(start_url.replace(self.proxy_base, "")).netloc
        self.pages_crawled = 0
        self.seen = set()
        self.failed_urls = []
        self.allow_patterns = allow_patterns.split("|") if allow_patterns else []
        self.deny_patterns = deny_patterns.split("|") if deny_patterns else []
        self.skip_verify_ssl = skip_verify_ssl
        self.use_cors_proxy = use_cors_proxy
        if self.skip_verify_ssl:
            import ssl
            ssl._create_default_https_context = ssl._create_unverified_context
    def start_requests(self):
        actual_start = self.proxy_base + self.start_url if self.use_cors_proxy else self.start_url
        yield Request(
            actual_start,
            callback=self.parse_page,
            errback=self.handle_error,
            meta={
                "depth": 0,
                "parent": None,
                "playwright": self.use_js_everywhere,
                "original_url": self.start_url
            },
            dont_filter=True
        )

    def should_follow(self, url: str) -> bool:
        if any(re.search(pat, url, re.IGNORECASE) for pat in self.deny_patterns):
            return False
        if self.allow_patterns and not any(re.search(pat, url, re.IGNORECASE) for pat in self.allow_patterns):
            return False
        return True

    def handle_error(self, failure):
        orig_url = failure.request.meta.get("original_url", failure.request.url)
        reason = str(failure.value)
        self.failed_urls.append({"url": orig_url, "error": reason, "parent": failure.request.meta.get("parent")})
        self.logger.error(f"Failed: {orig_url} - {reason}")
    def parse_page(self, response):
        if self.pages_crawled >= self.max_pages:
            return
        # Get original URL (without proxy)
        original_url = response.meta.get("original_url", response.url)
        if self.use_cors_proxy and response.url.startswith(self.proxy_base):
            displayed_url = original_url
        else:
            displayed_url = response.url
        if displayed_url in self.seen:
            return
        self.seen.add(displayed_url)
        ct = response.headers.get("Content-Type", b"").decode().lower()
        if "text/html" not in ct:
            return
        skip_keywords = ["/login", "/user/", "/auth", "/admin", "/signin", ".pdf", ".jpg", ".png", ".zip", ".exe"]
        if any(kw in displayed_url.lower() for kw in skip_keywords):
            return
        if not self.should_follow(displayed_url):
            return
        if self.same_domain_only:
            if urlparse(displayed_url).netloc != self.base_domain:
                return
        try:
            soup = BeautifulSoup(response.text, "html.parser")
        except Exception as e:
            self.logger.warning(f"Parse fail {displayed_url}: {e}")
            return
        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()
        cleaned_html = str(soup)
        text = soup.get_text(" ", strip=True)
        emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
        email = emails[0] if emails else ""
        try:
            from markdownify import markdownify as md
            markdown = md(cleaned_html, heading_style="ATX")
            markdown = re.sub(r'\n{3,}', '\n\n', markdown).strip()
        except Exception:
            markdown = text
        yield {
            "url": displayed_url,
            "title": response.css("title::text").get("") or "",
            "email": email,
            "html": cleaned_html[:5000],
            "markdown": markdown[:5000],
            "txt": text[:5000],
            "depth": response.meta["depth"],
            "parent": response.meta["parent"]
        }
        self.pages_crawled += 1
        if response.meta["depth"] < self.max_depth and self.pages_crawled < self.max_pages:
            for href in response.css("a[href]::attr(href)").getall():
                # Skip empty hrefs, anchors and javascript/mail/tel pseudo-links.
                if not href or href.startswith(("#", "javascript:", "mailto:", "tel:")):
                    continue
                full = urljoin(displayed_url, href)
                if full in self.seen:
                    continue
                if not self.should_follow(full):
                    continue
                if self.same_domain_only and urlparse(full).netloc != self.base_domain:
                    continue
                if self.use_cors_proxy:
                    request_url = self.proxy_base + full
                else:
                    request_url = full
                yield Request(
                    request_url,
                    callback=self.parse_page,
                    errback=self.handle_error,
                    meta={
                        "depth": response.meta["depth"] + 1,
                        "parent": displayed_url,
                        "playwright": self.use_js_everywhere,
                        "original_url": full
                    },
                    dont_filter=False
                )
    def closed(self, reason):
        # sys.argv[11] is the output path passed by the parent process;
        # failures are written alongside it.
        output_file = sys.argv[11]
        failed_file = output_file.replace(".json", "_failed.json")
        with open(failed_file, "w") as f:
            json.dump(self.failed_urls, f, indent=2)


if __name__ == "__main__":
    (
        start_url, max_depth, max_pages, use_js_everywhere, same_domain_only,
        allow_patterns, deny_patterns, skip_verify_ssl, use_cors_proxy, _, output_file
    ) = sys.argv[1:12]
    process = CrawlerProcess({
        "REQUEST_FINGERPRINTER_IMPLEMENTATION": "2.7",
        "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
        # Write crawled items to the JSON file the parent process expects.
        "FEEDS": {output_file: {"format": "json", "overwrite": True}},
    })
    process.crawl(
        HierarchicalSpider,
        start_url=start_url,
        max_depth=max_depth,
        max_pages=max_pages,
        use_js_everywhere=(use_js_everywhere.lower() == "true"),
        same_domain_only=(same_domain_only.lower() == "true"),
        allow_patterns=allow_patterns,
        deny_patterns=deny_patterns,
        skip_verify_ssl=(skip_verify_ssl.lower() == "true"),
        use_cors_proxy=(use_cors_proxy.lower() == "true"),
    )
    process.start()
'''
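# Sketch of the CLI contract for the generated spider script above; the layout
# mirrors the sys.argv unpacking in its __main__ block (booleans are passed as
# the strings "True"/"False"; <reserved> is currently unused and passed as "0"):
#
#   python spider.py <start_url> <max_depth> <max_pages> <use_js_everywhere> \
#       <same_domain_only> <allow_patterns> <deny_patterns> <skip_verify_ssl> \
#       <use_cors_proxy> <reserved> <output_file>
#
# Crawl results land in <output_file>; failures in <output_file stem>_failed.json.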
# === HELPER FUNCTIONS ===
def sanitize_url(url: str) -> str:
    url = url.strip()
    url = re.sub(r"^[^\w+]*", "", url)
    url = re.sub(r"[^\w/\.:-]*$", "", url)
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    if not urlparse(url).netloc:
        raise ValueError("Invalid URL")
    return url
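# Illustrative behaviour of sanitize_url (example values, not test fixtures):
#   sanitize_url("  web.mit.edu  ")       -> "https://web.mit.edu"
#   sanitize_url("https://web.mit.edu/")  -> "https://web.mit.edu/"
# A ValueError is raised when no network location can be extracted.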
def build_tree(pages: List[Dict]) -> Dict:
    url_to_node = {}
    root = None
    for page in pages:
        node = {**page, "children": []}
        url_to_node[page["url"]] = node
        if page.get("parent") is None:
            root = node
    if not root and url_to_node:
        root = next(iter(url_to_node.values()))
    for page in pages:
        if page["parent"] and page["parent"] in url_to_node:
            url_to_node[page["parent"]]["children"].append(url_to_node[page["url"]])
    return root or {}
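# Shape of the nested dict build_tree returns (illustrative values only; each
# node also carries the other page fields such as title, email and html):
#   {
#       "url": "https://web.mit.edu/", "depth": 0, "parent": None,
#       "children": [
#           {"url": "https://web.mit.edu/about/", "depth": 1,
#            "parent": "https://web.mit.edu/", "children": []},
#       ],
#   }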
def build_pyvis_graph(pages: List[Dict]) -> str:
    try:
        from pyvis.network import Network
    except ImportError:
        return "<p>Pyvis not installed. Run: pip install pyvis</p>"
    net = Network(height="600px", width="100%", bgcolor="#ffffff", font_color="black", directed=True)
    url_to_id = {p["url"]: i for i, p in enumerate(pages)}
    for p in pages:
        label = (p.get("title") or p["url"])[:40]
        net.add_node(url_to_id[p["url"]], label=label, title=p["url"], size=10 + 5 * (p.get("depth", 0)))
    for p in pages:
        if p.get("parent") and p["parent"] in url_to_id:
            net.add_edge(url_to_id[p["parent"]], url_to_id[p["url"]])
    net.set_options("""
    var options = {
      "physics": {"stabilization": {"iterations": 100}},
      "edges": {"arrows": {"to": {"enabled": true}}, "smooth": true}
    }
    """)
    tmp = tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode="w")
    net.write_html(tmp.name)
    return tmp.name
def export_data(pages: List[Dict], fmt: str, base_path: Path) -> str:
    if fmt == "json":
        out = base_path.with_suffix(".json")
        with open(out, "w") as f:
            json.dump(pages, f, indent=2)
    elif fmt == "csv":
        import csv
        out = base_path.with_suffix(".csv")
        with open(out, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["url", "title", "email", "depth", "parent"])
            writer.writeheader()
            for p in pages:
                writer.writerow({k: v for k, v in p.items() if k in writer.fieldnames})
    elif fmt == "graphml":
        import networkx as nx
        G = build_networkx_graph(pages)
        out = base_path.with_suffix(".graphml")
        nx.write_graphml(G, str(out))
    elif fmt == "markdown":
        out = base_path.with_suffix(".md")
        with open(out, "w") as f:
            for p in pages:
                f.write(f"## [{p.get('title', 'No Title')}]({p['url']})\n")
                f.write(f"Email: {p.get('email', 'N/A')}\n\n")
                f.write(f"{p.get('markdown', '')[:300]}...\n\n---\n\n")
    return str(out)
def build_networkx_graph(pages: List[Dict]):
    import networkx as nx
    G = nx.DiGraph()
    for p in pages:
        G.add_node(p["url"], title=p.get("title", ""), depth=p.get("depth", 0))
    for p in pages:
        if p.get("parent"):
            G.add_edge(p["parent"], p["url"])
    return G


def get_system_stats() -> str:
    cpu = psutil.cpu_percent()
    ram = psutil.virtual_memory().percent
    return f"CPU: {cpu:.1f}% | RAM: {ram:.1f}%"
def crawl_now(
    url: str,
    max_depth: int,
    max_pages: int,
    reqs_per_sec: float,
    use_js_everywhere: bool,
    same_domain_only: bool,
    show_only_failed: bool,
    allow_patterns: str,
    deny_patterns: str,
    skip_verify_ssl: bool,
    use_cors_proxy: bool,
    result_format: str,
    export_format: str,
) -> Tuple[str, str, str, str, str]:
    try:
        clean = sanitize_url(url)
        key_inputs = f"{clean}_{max_depth}_{max_pages}_{use_js_everywhere}_{same_domain_only}_{allow_patterns}_{deny_patterns}_{skip_verify_ssl}_{use_cors_proxy}"
        key = hashlib.md5(key_inputs.encode()).hexdigest()
        output_file = CACHE_DIR / f"{key}.json"
        failed_file = CACHE_DIR / f"{key}_failed.json"
        if not output_file.exists():
            script = CACHE_DIR / "spider.py"
            script.write_text(SPIDER_CODE)
            cmd = [
                sys.executable, str(script),
                clean, str(max_depth), str(max_pages), str(use_js_everywhere),
                str(same_domain_only), allow_patterns, deny_patterns, str(skip_verify_ssl),
                str(use_cors_proxy), "0", str(output_file)
            ]
            env = os.environ.copy()
            subprocess.run(cmd, check=True, env=env)
        if show_only_failed:
            if failed_file.exists():
                with open(failed_file, "r") as f:
                    failed = json.load(f)
                preview = f"❌ {len(failed)} failed links:\n"
                preview += "\n".join(f"- {item['url']} ({item['error']})" for item in failed[:20])
                full_json = json.dumps(failed, indent=2)
                graph_html = "<p>Failed links have no graph.</p>"
                export_path = str(failed_file)
            else:
                preview = "✅ No failed links."
                full_json = "[]"
                graph_html = "<p>No failures.</p>"
                export_path = ""
        else:
            with open(output_file, "r") as f:
                flat_data = json.load(f)
            if not isinstance(flat_data, list):
                flat_data = [flat_data]
            # Select result format for preview
            preview_lines = []
            for item in flat_data[:5]:  # preview first 5
                content = item.get(result_format, "")[:300]
                preview_lines.append(f"[{item.get('depth')}] {item.get('url')}\n{content}\n---")
            preview = f"✅ Showing '{result_format}' format (first 5 of {len(flat_data)} pages)\n\n" + "\n".join(preview_lines)
            tree = build_tree(flat_data)
            full_json = json.dumps(tree, indent=2)
            graph_path = build_pyvis_graph(flat_data)
            # build_pyvis_graph returns an HTML error string when pyvis is missing.
            if os.path.exists(graph_path):
                with open(graph_path, "r") as f:
                    graph_html = f.read()
            else:
                graph_html = graph_path
            export_base = CACHE_DIR / f"{key}_export"
            export_path = export_data(flat_data, export_format, export_base)
        stats = get_system_stats()
        return preview, full_json, graph_html, export_path, stats
    except Exception as e:
        logger.exception("Crawl failed")
        error_json = json.dumps({"error": str(e)}, indent=2)
        return f"❌ Error: {e}", error_json, "<p>Error</p>", "", get_system_stats()
# === GRADIO UI ===
with gr.Blocks(title="🕷️ Military-Grade Hierarchical Spiderweb Crawler") as app:
    gr.Markdown("# 🕸️ Military-Grade Hierarchical Spiderweb Crawler")
    with gr.Row():
        url = gr.Textbox(label="Start URL", value="https://web.mit.edu/")
        depth = gr.Number(label="Max Depth", value=2, minimum=0)
        pages = gr.Number(label="Max Pages", value=100, minimum=1)
        reqs_per_sec = gr.Number(label="Max Requests/sec", value=2, minimum=0.1)
    with gr.Row():
        js = gr.Checkbox(label="JS Rendering for All Pages", value=False)
        same_domain = gr.Checkbox(label="Same Domain Only", value=True)
        skip_ssl = gr.Checkbox(label="Skip SSL Verification", value=False)
        use_proxy = gr.Checkbox(label="Use CORS Proxy", value=False)
    with gr.Row():
        allow_pat = gr.Textbox(label="Allow URL Regex (| separated)", placeholder=r".*\.mit\.edu.*")
        deny_pat = gr.Textbox(label="Deny URL Regex (| separated)", placeholder=r"/admin|/login")
    with gr.Row():
        result_fmt = gr.Dropdown(
            choices=["txt", "markdown", "html"],
            value="txt",
            label="Result Content Format"
        )
        export_fmt = gr.Dropdown(
            choices=["json", "csv", "graphml", "markdown"],
            value="json",
            label="Export Format"
        )
    btn = gr.Button("🕷️ Start Crawl", variant="primary")
    with gr.Tab("Tree Preview"):
        preview_out = gr.Textbox(label="Crawl Summary", lines=10)
    with gr.Tab("Full Nested JSON"):
        json_out = gr.Code(language="json", label="Nested Tree", lines=20)
    with gr.Tab("Interactive Link Graph"):
        graph_out = gr.HTML(label="Network Visualization")
    with gr.Tab("Export File"):
        file_out = gr.File(label="Download Export")
    stats_display = gr.Textbox(label="System Stats (Live)", interactive=False, lines=1)

    # Log Viewer
    with gr.Accordion("📜 Live Log Viewer (Last 100 Lines)", open=False):
        log_viewer = gr.Textbox(label="", lines=10, max_lines=20, interactive=False)

    def update_log():
        if LOG_FILE.exists():
            with open(LOG_FILE, "r") as f:
                lines = f.readlines()
            return "".join(lines[-100:])
        return "No logs yet."

    app.load(fn=update_log, inputs=None, outputs=log_viewer, every=2)
    # Gradio inputs must be components, so show_only_failed is injected by a
    # small wrapper rather than being passed as a literal in the inputs list.
    btn.click(
        fn=lambda *args: crawl_now(*args[:6], False, *args[6:]),
        inputs=[
            url, depth, pages, reqs_per_sec, js, same_domain,
            allow_pat, deny_pat, skip_ssl, use_proxy,
            result_fmt, export_fmt
        ],
        outputs=[preview_out, json_out, graph_out, file_out, stats_display]
    )

    # Failed-only button (optional)
    with gr.Row():
        failed_btn = gr.Button("Show Only Failed Links")
    failed_btn.click(
        fn=lambda *args: crawl_now(*args[:6], True, *args[6:]),  # inject show_only_failed=True
        inputs=[
            url, depth, pages, reqs_per_sec, js, same_domain,
            allow_pat, deny_pat, skip_ssl, use_proxy,
            result_fmt, export_fmt
        ],
        outputs=[preview_out, json_out, graph_out, file_out, stats_display]
    )
| gr.Markdown("## π‘οΈ Features: Stealth, Multi-Domain, JS, Graph, CORS Proxy, Format Selector, Export") | |
| if __name__ == "__main__": | |
| multiprocessing.set_start_method("spawn", force=True) | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| show_api=True, | |
| mcp_server=True | |
| ) |