#!/usr/bin/env python3
"""
🕷️ MILITARY-GRADE HIERARCHICAL SPIDERWEB CRAWLER (v2)

NEW FEATURES:
✅ Result format selector: HTML / Markdown / Plain Text
✅ Toggleable CORS proxy: https://proxi.jammesop007.workers.dev/
✅ All prior 17 features preserved
✅ Log viewer in footer
✅ Optimized for 16GB RAM / 2 vCPU
"""

import os
import sys
import json
import tempfile
import subprocess
import hashlib
import re
import multiprocessing
import time
from pathlib import Path
from urllib.parse import urljoin, urlparse
from typing import Tuple, Dict, Any, List, Optional

import psutil
import gradio as gr
from loguru import logger
from fake_useragent import UserAgent

# === LOGGING SETUP ===
LOG_FILE = Path(tempfile.gettempdir()) / "mcp_spider.log"

# Trim an oversized log file before attaching the sinks.
if LOG_FILE.exists():
    with open(LOG_FILE, "r") as f:
        lines = f.readlines()
    if len(lines) > 10000:
        with open(LOG_FILE, "w") as f:
            f.writelines(lines[-5000:])

logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add(LOG_FILE, rotation="10 MB", retention="1 week")


# === PLAYWRIGHT INSTALL ===
def install_playwright():
    """Install the Chromium browser for Playwright if it cannot be launched."""
    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            browser.close()
    except Exception as e:
        logger.warning(f"Playwright not ready; installing: {e}")
        subprocess.run(
            [sys.executable, "-m", "playwright", "install", "chromium"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )


install_playwright()

CACHE_DIR = Path(tempfile.gettempdir()) / "mcp_spider_cache"
CACHE_DIR.mkdir(exist_ok=True, parents=True)

CORS_PROXY_BASE = "https://proxi.jammesop007.workers.dev/"

# === SPIDER CODE ===
# This script is written to disk and executed as a separate Scrapy subprocess.
SPIDER_CODE = r'''
import sys
import json
import re
import logging
from urllib.parse import urljoin, urlparse

from scrapy.crawler import CrawlerProcess
from scrapy import Spider, Request
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

logging.getLogger('scrapy').setLevel(logging.WARNING)

ua = UserAgent()

# Defined here as well because this code runs in its own process.
CORS_PROXY_BASE = "https://proxi.jammesop007.workers.dev/"


class HierarchicalSpider(Spider):
    name = "hierarchical_spider"

    custom_settings = {
        "USER_AGENT": ua.random,
        "ROBOTSTXT_OBEY": False,
        "DOWNLOAD_DELAY": 0.5,
        "RANDOMIZE_DOWNLOAD_DELAY": 0.2,
        "CONCURRENT_REQUESTS_PER_DOMAIN": 16,
        "CONCURRENT_REQUESTS": 32,
        "COOKIES_ENABLED": True,
        "TELNETCONSOLE_ENABLED": False,
        "PLAYWRIGHT_BROWSER_TYPE": "chromium",
        "PLAYWRIGHT_LAUNCH_OPTIONS": {
            "headless": True,
            "args": [
                "--no-sandbox",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
                "--disable-blink-features=AutomationControlled",
            ],
        },
        "RETRY_TIMES": 3,
        "RETRY_HTTP_CODES": [500, 502, 503, 504, 408, 429, 403],
        "FEEDS": {},  # populated at launch time with the requested output file
    }

    def __init__(self, start_url, max_depth, max_pages, use_js_everywhere,
                 same_domain_only, allow_patterns, deny_patterns,
                 skip_verify_ssl, use_cors_proxy, **kwargs):
        super().__init__(**kwargs)
        self.start_url = start_url
        self.max_depth = int(max_depth)
        self.max_pages = int(max_pages)
        self.use_js_everywhere = use_js_everywhere
        self.same_domain_only = same_domain_only
        self.base_domain = urlparse(start_url.replace(CORS_PROXY_BASE, "")).netloc
        self.pages_crawled = 0
        self.seen = set()
        self.failed_urls = []
        self.allow_patterns = allow_patterns.split("|") if allow_patterns else []
        self.deny_patterns = deny_patterns.split("|") if deny_patterns else []
        self.skip_verify_ssl = skip_verify_ssl
        self.use_cors_proxy = use_cors_proxy
        self.proxy_base = CORS_PROXY_BASE
        if self.skip_verify_ssl:
            import ssl
            ssl._create_default_https_context = ssl._create_unverified_context

    def start_requests(self):
        actual_start = (self.proxy_base + self.start_url) if self.use_cors_proxy else self.start_url
        yield Request(
            actual_start,
            callback=self.parse_page,
            errback=self.handle_error,
            meta={
                "depth": 0,
                "parent": None,
                "playwright": self.use_js_everywhere,
                "original_url": self.start_url,
            },
            dont_filter=True,
        )

    def should_follow(self, url: str) -> bool:
        if any(re.search(pat, url, re.IGNORECASE) for pat in self.deny_patterns):
            return False
        if self.allow_patterns and not any(re.search(pat, url, re.IGNORECASE) for pat in self.allow_patterns):
            return False
        return True

    def handle_error(self, failure):
        orig_url = failure.request.meta.get("original_url", failure.request.url)
        reason = str(failure.value)
        self.failed_urls.append({
            "url": orig_url,
            "error": reason,
            "parent": failure.request.meta.get("parent"),
        })
        self.logger.error(f"Failed: {orig_url} - {reason}")

    def parse_page(self, response):
        if self.pages_crawled >= self.max_pages:
            return

        # Report the original URL, not the proxied one.
        original_url = response.meta.get("original_url", response.url)
        if self.use_cors_proxy and response.url.startswith(self.proxy_base):
            displayed_url = original_url
        else:
            displayed_url = response.url

        if displayed_url in self.seen:
            return
        self.seen.add(displayed_url)

        ct = response.headers.get("Content-Type", b"").decode().lower()
        if "text/html" not in ct:
            return

        skip_keywords = ["/login", "/user/", "/auth", "/admin", "/signin",
                         ".pdf", ".jpg", ".png", ".zip", ".exe"]
        if any(kw in displayed_url.lower() for kw in skip_keywords):
            return
        if not self.should_follow(displayed_url):
            return
        if self.same_domain_only:
            if urlparse(displayed_url).netloc != self.base_domain:
                return

        try:
            soup = BeautifulSoup(response.text, "html.parser")
        except Exception as e:
            self.logger.warning(f"Parse fail {displayed_url}: {e}")
            return

        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()

        cleaned_html = str(soup)
        text = soup.get_text(" ", strip=True)
        emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
        email = emails[0] if emails else ""

        try:
            from markdownify import markdownify as md
            markdown = md(cleaned_html, heading_style="ATX")
            markdown = re.sub(r'\n{3,}', '\n\n', markdown).strip()
        except Exception:
            markdown = text

        yield {
            "url": displayed_url,
            "title": response.css("title::text").get("") or "",
            "email": email,
            "html": cleaned_html[:5000],
            "markdown": markdown[:5000],
            "txt": text[:5000],
            "depth": response.meta["depth"],
            "parent": response.meta["parent"],
        }
        self.pages_crawled += 1

        if response.meta["depth"] < self.max_depth and self.pages_crawled < self.max_pages:
            for href in response.css("a[href]::attr(href)").getall():
                if not href or href.startswith(("#", "javascript:", "mailto:", "tel:")):
                    continue
                full = urljoin(displayed_url, href)
                if full in self.seen:
                    continue
                if not self.should_follow(full):
                    continue
                if self.same_domain_only and urlparse(full).netloc != self.base_domain:
                    continue

                request_url = self.proxy_base + full if self.use_cors_proxy else full
                yield Request(
                    request_url,
                    callback=self.parse_page,
                    errback=self.handle_error,
                    meta={
                        "depth": response.meta["depth"] + 1,
                        "parent": displayed_url,
                        "playwright": self.use_js_everywhere,
                        "original_url": full,
                    },
                    dont_filter=False,
                )

    def closed(self, reason):
        # The results file path is the last CLI argument.
        output_file = sys.argv[11]
        failed_file = output_file.replace(".json", "_failed.json")
        with open(failed_file, "w") as f:
            json.dump(self.failed_urls, f, indent=2)


if __name__ == "__main__":
    (
        start_url,
        max_depth,
        max_pages,
        use_js_everywhere,
        same_domain_only,
        allow_patterns,
        deny_patterns,
        skip_verify_ssl,
        use_cors_proxy,
        _,
        output_file,
    ) = sys.argv[1:12]

    # Write crawled items to the results file expected by the UI process.
    HierarchicalSpider.custom_settings["FEEDS"] = {
        output_file: {"format": "json", "overwrite": True}
    }

    # The "playwright" request meta only takes effect if scrapy-playwright's
    # download handler is registered (assumes scrapy-playwright is installed).
    if use_js_everywhere.lower() == "true":
        HierarchicalSpider.custom_settings["DOWNLOAD_HANDLERS"] = {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        }

    process = CrawlerProcess({
        "REQUEST_FINGERPRINTER_IMPLEMENTATION": "2.7",
        "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
    })
    process.crawl(
        HierarchicalSpider,
        start_url=start_url,
        max_depth=max_depth,
        max_pages=max_pages,
        use_js_everywhere=(use_js_everywhere.lower() == "true"),
        same_domain_only=(same_domain_only.lower() == "true"),
        allow_patterns=allow_patterns,
        deny_patterns=deny_patterns,
        skip_verify_ssl=(skip_verify_ssl.lower() == "true"),
        use_cors_proxy=(use_cors_proxy.lower() == "true"),
    )
    process.start()
'''


# === HELPER FUNCTIONS ===
def sanitize_url(url: str) -> str:
    """Normalize user input into an absolute http(s) URL."""
    url = url.strip()
    url = re.sub(r"^[^\w+]*", "", url)
    url = re.sub(r"[^\w/\.:-]*$", "", url)
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    if not urlparse(url).netloc:
        raise ValueError("Invalid URL")
    return url


def build_tree(pages: List[Dict]) -> Dict:
    """Rebuild the parent/child hierarchy from the flat list of crawled pages."""
    url_to_node = {}
    root = None
    for page in pages:
        node = {**page, "children": []}
        url_to_node[page["url"]] = node
        if page.get("parent") is None:
            root = node
    if not root and url_to_node:
        root = next(iter(url_to_node.values()))
    for page in pages:
        if page["parent"] and page["parent"] in url_to_node:
            url_to_node[page["parent"]]["children"].append(url_to_node[page["url"]])
    return root or {}


def build_pyvis_graph(pages: List[Dict]) -> str:
    """Render the link graph with pyvis. Returns a path to an HTML file,
    or an HTML error message if pyvis is missing."""
    try:
        from pyvis.network import Network
    except ImportError:
        return "<p>Pyvis not installed. Run: pip install pyvis</p>"

    net = Network(height="600px", width="100%", bgcolor="#ffffff",
                  font_color="black", directed=True)
    url_to_id = {p["url"]: i for i, p in enumerate(pages)}
    for p in pages:
        label = (p.get("title") or p["url"])[:40]
        net.add_node(url_to_id[p["url"]], label=label, title=p["url"],
                     size=10 + 5 * (p.get("depth", 0)))
    for p in pages:
        if p.get("parent") and p["parent"] in url_to_id:
            net.add_edge(url_to_id[p["parent"]], url_to_id[p["url"]])
    net.set_options("""
    var options = {
      "physics": {"stabilization": {"iterations": 100}},
      "edges": {"arrows": {"to": {"enabled": true}}, "smooth": true}
    }
    """)
    tmp = tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode="w")
    net.write_html(tmp.name)
    return tmp.name


def export_data(pages: List[Dict], fmt: str, base_path: Path) -> str:
    """Write the crawled pages to disk in the requested export format."""
    if fmt == "json":
        out = base_path.with_suffix(".json")
        with open(out, "w") as f:
            json.dump(pages, f, indent=2)
    elif fmt == "csv":
        import csv
        out = base_path.with_suffix(".csv")
        with open(out, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["url", "title", "email", "depth", "parent"])
            writer.writeheader()
            for p in pages:
                writer.writerow({k: v for k, v in p.items() if k in writer.fieldnames})
    elif fmt == "graphml":
        import networkx as nx
        G = build_networkx_graph(pages)
        out = base_path.with_suffix(".graphml")
        nx.write_graphml(G, str(out))
    elif fmt == "markdown":
        out = base_path.with_suffix(".md")
        with open(out, "w") as f:
            for p in pages:
                f.write(f"## [{p.get('title', 'No Title')}]({p['url']})\n")
                f.write(f"Email: {p.get('email', 'N/A')}\n\n")
                f.write(f"{p.get('markdown', '')[:300]}...\n\n---\n\n")
    return str(out)


def build_networkx_graph(pages: List[Dict]):
    import networkx as nx
    G = nx.DiGraph()
    for p in pages:
        G.add_node(p["url"], title=p.get("title", ""), depth=p.get("depth", 0))
    for p in pages:
        if p.get("parent"):
            G.add_edge(p["parent"], p["url"])
    return G


def get_system_stats() -> str:
    cpu = psutil.cpu_percent()
    ram = psutil.virtual_memory().percent
    return f"CPU: {cpu:.1f}% | RAM: {ram:.1f}%"


def crawl_now(
    url: str,
    max_depth: int,
    max_pages: int,
    reqs_per_sec: float,
    use_js_everywhere: bool,
    same_domain_only: bool,
    show_only_failed: bool,
    allow_patterns: str,
    deny_patterns: str,
    skip_verify_ssl: bool,
    use_cors_proxy: bool,
    result_format: str,
    export_format: str,
) -> Tuple[str, str, str, str, str]:
    try:
        clean = sanitize_url(url)
        key_inputs = (
            f"{clean}_{max_depth}_{max_pages}_{use_js_everywhere}_{same_domain_only}"
            f"_{allow_patterns}_{deny_patterns}_{skip_verify_ssl}_{use_cors_proxy}"
        )
        key = hashlib.md5(key_inputs.encode()).hexdigest()
        output_file = CACHE_DIR / f"{key}.json"
        failed_file = CACHE_DIR / f"{key}_failed.json"

        # Run the Scrapy subprocess only on a cache miss.
        if not output_file.exists():
            script = CACHE_DIR / "spider.py"
            script.write_text(SPIDER_CODE)
            cmd = [
                sys.executable, str(script),
                clean,
                str(max_depth),
                str(max_pages),
                str(use_js_everywhere),
                str(same_domain_only),
                allow_patterns,
                deny_patterns,
                str(skip_verify_ssl),
                str(use_cors_proxy),
                "0",  # unused placeholder argument
                str(output_file),
            ]
            env = os.environ.copy()
            subprocess.run(cmd, check=True, env=env)

        if show_only_failed:
            if failed_file.exists():
                with open(failed_file, "r") as f:
                    failed = json.load(f)
                preview = f"❌ {len(failed)} failed links:\n"
                preview += "\n".join(f"- {item['url']} ({item['error']})" for item in failed[:20])
                full_json = json.dumps(failed, indent=2)
                graph_html = "<p>Failed links have no graph.</p>"
                export_path = str(failed_file)
            else:
                preview = "✅ No failed links."
                full_json = "[]"
                graph_html = "<p>No failures.</p>"
                export_path = ""
        else:
            with open(output_file, "r") as f:
                flat_data = json.load(f)
            if not isinstance(flat_data, list):
                flat_data = [flat_data]

            # Build the preview in the selected result format (first 5 pages).
            preview_lines = []
            for item in flat_data[:5]:
                content = item.get(result_format, "")[:300]
                preview_lines.append(f"[{item.get('depth')}] {item.get('url')}\n{content}\n---")
            preview = (
                f"✅ Showing '{result_format}' format (first 5 of {len(flat_data)} pages)\n\n"
                + "\n".join(preview_lines)
            )

            tree = build_tree(flat_data)
            full_json = json.dumps(tree, indent=2)

            graph_path = build_pyvis_graph(flat_data)
            if os.path.exists(graph_path):
                with open(graph_path, "r") as f:
                    graph_html = f.read()
            else:
                # build_pyvis_graph returned an HTML error message, not a file path.
                graph_html = graph_path

            export_base = CACHE_DIR / f"{key}_export"
            export_path = export_data(flat_data, export_format, export_base)

        stats = get_system_stats()
        return preview, full_json, graph_html, export_path, stats
    except Exception as e:
        logger.exception("Crawl failed")
        error_json = json.dumps({"error": str(e)}, indent=2)
        return f"❌ Error: {e}", error_json, "<p>Error</p>", "", get_system_stats()


# === GRADIO UI ===
with gr.Blocks(title="🕷️ Military-Grade Hierarchical Spiderweb Crawler") as app:
    gr.Markdown("# 🕸️ Military-Grade Hierarchical Spiderweb Crawler")

    with gr.Row():
        url = gr.Textbox(label="Start URL", value="https://web.mit.edu/")
        depth = gr.Number(label="Max Depth", value=2, minimum=0)
        pages = gr.Number(label="Max Pages", value=100, minimum=1)
        reqs_per_sec = gr.Number(label="Max Requests/sec", value=2, minimum=0.1)

    with gr.Row():
        js = gr.Checkbox(label="JS Rendering for All Pages", value=False)
        same_domain = gr.Checkbox(label="Same Domain Only", value=True)
        skip_ssl = gr.Checkbox(label="Skip SSL Verification", value=False)
        use_proxy = gr.Checkbox(label="Use CORS Proxy", value=False)

    with gr.Row():
        allow_pat = gr.Textbox(label="Allow URL Regex (| separated)", placeholder=r".*\.mit\.edu.*")
        deny_pat = gr.Textbox(label="Deny URL Regex (| separated)", placeholder=r"/admin|/login")

    with gr.Row():
        result_fmt = gr.Dropdown(
            choices=["txt", "markdown", "html"],
            value="txt",
            label="Result Content Format",
        )
        export_fmt = gr.Dropdown(
            choices=["json", "csv", "graphml", "markdown"],
            value="json",
            label="Export Format",
        )

    btn = gr.Button("🕷️ Start Crawl", variant="primary")

    with gr.Tab("Tree Preview"):
        preview_out = gr.Textbox(label="Crawl Summary", lines=10)
    with gr.Tab("Full Nested JSON"):
        json_out = gr.Code(language="json", label="Nested Tree", lines=20)
    with gr.Tab("Interactive Link Graph"):
        graph_out = gr.HTML(label="Network Visualization")
    with gr.Tab("Export File"):
        file_out = gr.File(label="Download Export")

    stats_display = gr.Textbox(label="System Stats (Live)", interactive=False, lines=1)

    # Log Viewer
    with gr.Accordion("📜 Live Log Viewer (Last 100 Lines)", open=False):
        log_viewer = gr.Textbox(label="", lines=10, max_lines=20, interactive=False)

    def update_log():
        if LOG_FILE.exists():
            with open(LOG_FILE, "r") as f:
                lines = f.readlines()
            return "".join(lines[-100:])
        return "No logs yet."

    app.load(fn=update_log, inputs=None, outputs=log_viewer, every=2)

    btn.click(
        fn=lambda *args: crawl_now(*args[:6], False, *args[6:]),  # show_only_failed=False
        inputs=[
            url, depth, pages, reqs_per_sec,
            js, same_domain,
            allow_pat, deny_pat,
            skip_ssl, use_proxy,
            result_fmt, export_fmt,
        ],
        outputs=[preview_out, json_out, graph_out, file_out, stats_display],
    )

    # Failed-only button (optional)
    with gr.Row():
        failed_btn = gr.Button("🔍 Show Only Failed Links")

    failed_btn.click(
        fn=lambda *args: crawl_now(*args[:6], True, *args[6:]),  # inject show_only_failed=True
        inputs=[
            url, depth, pages, reqs_per_sec,
            js, same_domain,
            allow_pat, deny_pat,
            skip_ssl, use_proxy,
            result_fmt, export_fmt,
        ],
        outputs=[preview_out, json_out, graph_out, file_out, stats_display],
    )

    gr.Markdown("## 🛡️ Features: Stealth, Multi-Domain, JS, Graph, CORS Proxy, Format Selector, Export")

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn", force=True)
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_api=True,
        mcp_server=True,
    )