#!/usr/bin/env python3
"""
πŸ•·οΈ MILITARY-GRADE HIERARCHICAL SPIDERWEB CRAWLER (v2)
NEW FEATURES:
βœ… Result format selector: HTML / Markdown / Plain Text
βœ… Toggleable CORS proxy: https://proxi.jammesop007.workers.dev/
βœ… All prior 17 features preserved
βœ… Log viewer in footer
βœ… Optimized for 16GB RAM / 2 vCPU
"""
import os
import sys
import json
import tempfile
import subprocess
import hashlib
import re
import multiprocessing
import psutil
from pathlib import Path
from urllib.parse import urljoin, urlparse
from typing import Tuple, Dict, Any, List, Optional
import time
import gradio as gr
from loguru import logger
from fake_useragent import UserAgent
# === LOGGING SETUP ===
LOG_FILE = Path(tempfile.gettempdir()) / "mcp_spider.log"
if LOG_FILE.exists():
    with open(LOG_FILE, "r") as f:
        lines = f.readlines()
    if len(lines) > 10000:
        # Trim the log in place, keeping only the most recent half
        with open(LOG_FILE, "w") as f:
            f.writelines(lines[-5000:])
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add(LOG_FILE, rotation="10 MB", retention="1 week")
# === PLAYWRIGHT INSTALL ===
def install_playwright():
    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            browser.close()
    except Exception as e:
        logger.warning(f"Playwright not ready; installing: {e}")
        subprocess.run([sys.executable, "-m", "playwright", "install", "chromium"],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
install_playwright()
CACHE_DIR = Path(tempfile.gettempdir()) / "mcp_spider_cache"
CACHE_DIR.mkdir(exist_ok=True, parents=True)
CORS_PROXY_BASE = "https://proxi.jammesop007.workers.dev/"
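# The proxy is used as a plain URL prefix: the spider requests
# CORS_PROXY_BASE + "https://example.com/" and reports the un-prefixed URL.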
# === SPIDER CODE ===
SPIDER_CODE = r'''
import sys
import json
import re
import logging
from urllib.parse import urljoin, urlparse
from scrapy.crawler import CrawlerProcess
from scrapy import Spider, Request
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
logging.getLogger('scrapy').setLevel(logging.WARNING)
ua = UserAgent()
class HierarchicalSpider(Spider):
    name = "hierarchical_spider"
    custom_settings = {
        "USER_AGENT": ua.random,
        "ROBOTSTXT_OBEY": False,
        "DOWNLOAD_DELAY": 0.5,
        "RANDOMIZE_DOWNLOAD_DELAY": 0.2,
        "CONCURRENT_REQUESTS_PER_DOMAIN": 16,
        "CONCURRENT_REQUESTS": 32,
        "COOKIES_ENABLED": True,
        "TELNETCONSOLE_ENABLED": False,
        # scrapy-playwright only honours the "playwright" request meta key when
        # its download handler is installed; non-playwright requests are
        # delegated to the default handler.
        "DOWNLOAD_HANDLERS": {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        },
        "PLAYWRIGHT_BROWSER_TYPE": "chromium",
        "PLAYWRIGHT_LAUNCH_OPTIONS": {
            "headless": True,
            "args": [
                "--no-sandbox",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
                "--disable-blink-features=AutomationControlled"
            ]
        },
        "RETRY_TIMES": 3,
        "RETRY_HTTP_CODES": [500, 502, 503, 504, 408, 429, 403],
    }
    def __init__(self,
                 start_url,
                 max_depth,
                 max_pages,
                 use_js_everywhere,
                 same_domain_only,
                 allow_patterns,
                 deny_patterns,
                 skip_verify_ssl,
                 use_cors_proxy,
                 **kwargs):
        super().__init__(**kwargs)
        self.start_url = start_url
        self.max_depth = int(max_depth)
        self.max_pages = int(max_pages)
        self.use_js_everywhere = use_js_everywhere
        self.same_domain_only = same_domain_only
        # CORS_PROXY_BASE belongs to the parent app, not this standalone
        # script, so strip any proxy prefix with a local constant.
        self.proxy_base = "https://proxi.jammesop007.workers.dev/"
        self.base_domain = urlparse(start_url.replace(self.proxy_base, "")).netloc
        self.pages_crawled = 0
        self.seen = set()
        self.failed_urls = []
        self.allow_patterns = allow_patterns.split("|") if allow_patterns else []
        self.deny_patterns = deny_patterns.split("|") if deny_patterns else []
        self.skip_verify_ssl = skip_verify_ssl
        self.use_cors_proxy = use_cors_proxy
        if self.skip_verify_ssl:
            import ssl
            # Affects stdlib HTTPS clients; Scrapy's Twisted downloader does
            # not verify certificates by default anyway.
            ssl._create_default_https_context = ssl._create_unverified_context
    def start_requests(self):
        actual_start = self.proxy_base + self.start_url if self.use_cors_proxy else self.start_url
        yield Request(
            actual_start,
            callback=self.parse_page,
            errback=self.handle_error,
            meta={
                "depth": 0,
                "parent": None,
                "playwright": self.use_js_everywhere,
                "original_url": self.start_url
            },
            dont_filter=True
        )
    def should_follow(self, url: str) -> bool:
        if any(re.search(pat, url, re.IGNORECASE) for pat in self.deny_patterns):
            return False
        if self.allow_patterns and not any(re.search(pat, url, re.IGNORECASE) for pat in self.allow_patterns):
            return False
        return True
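    # Example (mirrors the UI placeholders): allow_patterns=r".*\.mit\.edu.*"
    # keeps the crawl on *.mit.edu hosts; deny_patterns=r"/admin|/login" is
    # split on "|" into two separate deny rules.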
    def handle_error(self, failure):
        orig_url = failure.request.meta.get("original_url", failure.request.url)
        reason = str(failure.value)
        self.failed_urls.append({"url": orig_url, "error": reason, "parent": failure.request.meta.get("parent")})
        self.logger.error(f"Failed: {orig_url} - {reason}")
    def parse_page(self, response):
        if self.pages_crawled >= self.max_pages:
            return
        # Report the original URL (without the proxy prefix)
        original_url = response.meta.get("original_url", response.url)
        if self.use_cors_proxy and response.url.startswith(self.proxy_base):
            displayed_url = original_url
        else:
            displayed_url = response.url
        if displayed_url in self.seen:
            return
        self.seen.add(displayed_url)
        ct = response.headers.get("Content-Type", b"").decode().lower()
        if "text/html" not in ct:
            return
        skip_keywords = ["/login", "/user/", "/auth", "/admin", "/signin", ".pdf", ".jpg", ".png", ".zip", ".exe"]
        if any(kw in displayed_url.lower() for kw in skip_keywords):
            return
        if not self.should_follow(displayed_url):
            return
        if self.same_domain_only:
            if urlparse(displayed_url).netloc != self.base_domain:
                return
        try:
            soup = BeautifulSoup(response.text, "html.parser")
        except Exception as e:
            self.logger.warning(f"Parse fail {displayed_url}: {e}")
            return
        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()
        cleaned_html = str(soup)
        text = soup.get_text(" ", strip=True)
        emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
        email = emails[0] if emails else ""
        try:
            from markdownify import markdownify as md
            markdown = md(cleaned_html, heading_style="ATX")
            markdown = re.sub(r'\n{3,}', '\n\n', markdown).strip()
        except Exception:
            markdown = text
        yield {
            "url": displayed_url,
            "title": response.css("title::text").get("") or "",
            "email": email,
            "html": cleaned_html[:5000],
            "markdown": markdown[:5000],
            "txt": text[:5000],
            "depth": response.meta["depth"],
            "parent": response.meta["parent"]
        }
        self.pages_crawled += 1
        if response.meta["depth"] < self.max_depth and self.pages_crawled < self.max_pages:
            for href in response.css("a[href]::attr(href)").getall():
                if not href or href.startswith(("#", "javascript:", "mailto:", "tel:")):
                    continue
                full = urljoin(displayed_url, href)
                if full in self.seen:
                    continue
                if not self.should_follow(full):
                    continue
                if self.same_domain_only and urlparse(full).netloc != self.base_domain:
                    continue
                if self.use_cors_proxy:
                    request_url = self.proxy_base + full
                else:
                    request_url = full
                yield Request(
                    request_url,
                    callback=self.parse_page,
                    errback=self.handle_error,
                    meta={
                        "depth": response.meta["depth"] + 1,
                        "parent": displayed_url,
                        "playwright": self.use_js_everywhere,
                        "original_url": full
                    },
                    dont_filter=False
                )
    def closed(self, reason):
        # argv layout matches the unpack in __main__: the output path is the
        # 11th positional argument.
        output_file = sys.argv[11]
        failed_file = output_file.replace(".json", "_failed.json")
        with open(failed_file, "w") as f:
            json.dump(self.failed_urls, f, indent=2)
if __name__ == "__main__":
(
start_url, max_depth, max_pages, use_js_everywhere, same_domain_only,
allow_patterns, deny_patterns, skip_verify_ssl, use_cors_proxy, _, output_file
) = sys.argv[1:12]
process = CrawlerProcess({
"REQUEST_FINGERPRINTER_IMPLEMENTATION": "2.7",
"TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
})
process.crawl(
HierarchicalSpider,
start_url=start_url,
max_depth=max_depth,
max_pages=max_pages,
use_js_everywhere=(use_js_everywhere.lower() == "true"),
same_domain_only=(same_domain_only.lower() == "true"),
allow_patterns=allow_patterns,
deny_patterns=deny_patterns,
skip_verify_ssl=(skip_verify_ssl.lower() == "true"),
use_cors_proxy=(use_cors_proxy.lower() == "true"),
)
process.start()
'''
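# SPIDER_CODE is written to CACHE_DIR and executed as a subprocess (see
# crawl_now): Scrapy's Twisted reactor cannot be restarted within one
# process, so each crawl gets a fresh interpreter.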
# === HELPER FUNCTIONS ===
def sanitize_url(url: str) -> str:
    url = url.strip()
    url = re.sub(r"^[^\w+]*", "", url)
    url = re.sub(r"[^\w/\.:-]*$", "", url)
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    if not urlparse(url).netloc:
        raise ValueError("Invalid URL")
    return url
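# e.g. sanitize_url("(example.com/docs),") -> "https://example.com/docs"
# (stray punctuation around a pasted URL is stripped, the scheme is added).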
def build_tree(pages: List[Dict]) -> Dict:
    url_to_node = {}
    root = None
    for page in pages:
        node = {**page, "children": []}
        url_to_node[page["url"]] = node
        if page.get("parent") is None:
            root = node
    if not root and url_to_node:
        root = next(iter(url_to_node.values()))
    for page in pages:
        if page["parent"] and page["parent"] in url_to_node:
            url_to_node[page["parent"]]["children"].append(url_to_node[page["url"]])
    return root or {}
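# Output shape: the root page dict gains a "children" list of child page
# dicts, recursively, e.g. {"url": ..., "depth": 0, "children": [{...}]}.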
def build_pyvis_graph(pages: List[Dict]) -> str:
    """Render the link graph to an HTML file and return its path."""
    tmp = tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode="w")
    try:
        from pyvis.network import Network
    except ImportError:
        # The caller reads the returned value as a file path, so the fallback
        # message must also live in a file.
        tmp.write("<p>Pyvis not installed. Run: pip install pyvis</p>")
        tmp.close()
        return tmp.name
    net = Network(height="600px", width="100%", bgcolor="#ffffff", font_color="black", directed=True)
    url_to_id = {p["url"]: i for i, p in enumerate(pages)}
    for p in pages:
        label = (p.get("title") or p["url"])[:40]
        net.add_node(url_to_id[p["url"]], label=label, title=p["url"], size=10 + 5 * (p.get("depth", 0)))
    for p in pages:
        if p.get("parent") and p["parent"] in url_to_id:
            net.add_edge(url_to_id[p["parent"]], url_to_id[p["url"]])
    net.set_options("""
    var options = {
        "physics": {"stabilization": {"iterations": 100}},
        "edges": {"arrows": {"to": {"enabled": true}}, "smooth": true}
    }
    """)
    tmp.close()
    net.write_html(tmp.name)
    return tmp.name
def export_data(pages: List[Dict], fmt: str, base_path: Path) -> str:
    if fmt == "json":
        out = base_path.with_suffix(".json")
        with open(out, "w") as f:
            json.dump(pages, f, indent=2)
    elif fmt == "csv":
        import csv
        out = base_path.with_suffix(".csv")
        with open(out, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["url", "title", "email", "depth", "parent"])
            writer.writeheader()
            for p in pages:
                writer.writerow({k: v for k, v in p.items() if k in writer.fieldnames})
    elif fmt == "graphml":
        import networkx as nx
        G = build_networkx_graph(pages)
        out = base_path.with_suffix(".graphml")
        nx.write_graphml(G, str(out))
    elif fmt == "markdown":
        out = base_path.with_suffix(".md")
        with open(out, "w") as f:
            for p in pages:
                f.write(f"## [{p.get('title', 'No Title')}]({p['url']})\n")
                f.write(f"Email: {p.get('email', 'N/A')}\n\n")
                f.write(f"{p.get('markdown', '')[:300]}...\n\n---\n\n")
    else:
        raise ValueError(f"Unknown export format: {fmt}")
    return str(out)
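# e.g. export_data(pages, "csv", CACHE_DIR / "abc_export") writes
# ".../abc_export.csv" and returns that path for the Gradio File output.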
def build_networkx_graph(pages: List[Dict]):
    import networkx as nx
    G = nx.DiGraph()
    for p in pages:
        G.add_node(p["url"], title=p.get("title", ""), depth=p.get("depth", 0))
    for p in pages:
        if p.get("parent"):
            G.add_edge(p["parent"], p["url"])
    return G
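# A GraphML export produced from this graph can be re-loaded elsewhere with
# networkx, e.g. G = nx.read_graphml("..._export.graphml").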
def get_system_stats() -> str:
    cpu = psutil.cpu_percent()
    ram = psutil.virtual_memory().percent
    return f"CPU: {cpu:.1f}% | RAM: {ram:.1f}%"
def crawl_now(
    url: str,
    max_depth: int,
    max_pages: int,
    reqs_per_sec: float,
    use_js_everywhere: bool,
    same_domain_only: bool,
    show_only_failed: bool,
    allow_patterns: str,
    deny_patterns: str,
    skip_verify_ssl: bool,
    use_cors_proxy: bool,
    result_format: str,
    export_format: str,
) -> Tuple[str, str, str, str, str]:
    try:
        clean = sanitize_url(url)
        key_inputs = f"{clean}_{max_depth}_{max_pages}_{use_js_everywhere}_{same_domain_only}_{allow_patterns}_{deny_patterns}_{skip_verify_ssl}_{use_cors_proxy}"
        key = hashlib.md5(key_inputs.encode()).hexdigest()
        output_file = CACHE_DIR / f"{key}.json"
        failed_file = CACHE_DIR / f"{key}_failed.json"
        if not output_file.exists():
            script = CACHE_DIR / "spider.py"
            script.write_text(SPIDER_CODE)
            cmd = [
                sys.executable, str(script),
                clean, str(max_depth), str(max_pages), str(use_js_everywhere),
                str(same_domain_only), allow_patterns, deny_patterns, str(skip_verify_ssl),
                str(use_cors_proxy), "0", str(output_file)
            ]
            env = os.environ.copy()
            subprocess.run(cmd, check=True, env=env)
        if show_only_failed:
            if failed_file.exists():
                with open(failed_file, "r") as f:
                    failed = json.load(f)
                preview = f"❌ {len(failed)} failed links:\n"
                preview += "\n".join(f"- {item['url']} ({item['error']})" for item in failed[:20])
                full_json = json.dumps(failed, indent=2)
                graph_html = "<p>Failed links have no graph.</p>"
                export_path = str(failed_file)
            else:
                preview = "✅ No failed links."
                full_json = "[]"
                graph_html = "<p>No failures.</p>"
                export_path = ""
        else:
            with open(output_file, "r") as f:
                flat_data = json.load(f)
            if not isinstance(flat_data, list):
                flat_data = [flat_data]
            # Select result format for preview
            preview_lines = []
            for item in flat_data[:5]:  # preview first 5
                content = item.get(result_format, "")[:300]
                preview_lines.append(f"[{item.get('depth')}] {item.get('url')}\n{content}\n---")
            preview = f"✅ Showing '{result_format}' format (first 5 of {len(flat_data)} pages)\n\n" + "\n".join(preview_lines)
            tree = build_tree(flat_data)
            full_json = json.dumps(tree, indent=2)
            graph_path = build_pyvis_graph(flat_data)
            with open(graph_path, "r") as f:
                graph_html = f.read()
            export_base = CACHE_DIR / f"{key}_export"
            export_path = export_data(flat_data, export_format, export_base)
        stats = get_system_stats()
        return preview, full_json, graph_html, export_path, stats
    except Exception as e:
        logger.exception("Crawl failed")
        error_json = json.dumps({"error": str(e)}, indent=2)
        return f"❌ Error: {e}", error_json, "<p>Error</p>", "", get_system_stats()
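# crawl_now returns (preview, nested_json, graph_html, export_path, stats),
# matching the five Gradio outputs wired up below.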
# === GRADIO UI ===
with gr.Blocks(title="🕷️ Military-Grade Hierarchical Spiderweb Crawler") as app:
    gr.Markdown("# 🕸️ Military-Grade Hierarchical Spiderweb Crawler")
    with gr.Row():
        url = gr.Textbox(label="Start URL", value="https://web.mit.edu/")
        depth = gr.Number(label="Max Depth", value=2, minimum=0)
        pages = gr.Number(label="Max Pages", value=100, minimum=1)
        reqs_per_sec = gr.Number(label="Max Requests/sec", value=2, minimum=0.1)
    with gr.Row():
        js = gr.Checkbox(label="JS Rendering for All Pages", value=False)
        same_domain = gr.Checkbox(label="Same Domain Only", value=True)
        skip_ssl = gr.Checkbox(label="Skip SSL Verification", value=False)
        use_proxy = gr.Checkbox(label="Use CORS Proxy", value=False)
    with gr.Row():
        allow_pat = gr.Textbox(label="Allow URL Regex (| separated)", placeholder=r".*\.mit\.edu.*")
        deny_pat = gr.Textbox(label="Deny URL Regex (| separated)", placeholder=r"/admin|/login")
    with gr.Row():
        result_fmt = gr.Dropdown(
            choices=["txt", "markdown", "html"],
            value="txt",
            label="Result Content Format"
        )
        export_fmt = gr.Dropdown(
            choices=["json", "csv", "graphml", "markdown"],
            value="json",
            label="Export Format"
        )
    btn = gr.Button("🕷️ Start Crawl", variant="primary")
    with gr.Tab("Tree Preview"):
        preview_out = gr.Textbox(label="Crawl Summary", lines=10)
    with gr.Tab("Full Nested JSON"):
        json_out = gr.Code(language="json", label="Nested Tree", lines=20)
    with gr.Tab("Interactive Link Graph"):
        graph_out = gr.HTML(label="Network Visualization")
    with gr.Tab("Export File"):
        file_out = gr.File(label="Download Export")
    stats_display = gr.Textbox(label="System Stats (Live)", interactive=False, lines=1)
    # Log Viewer
    with gr.Accordion("📜 Live Log Viewer (Last 100 Lines)", open=False):
        log_viewer = gr.Textbox(label="", lines=10, max_lines=20, interactive=False)

    def update_log():
        if LOG_FILE.exists():
            with open(LOG_FILE, "r") as f:
                lines = f.readlines()
            return "".join(lines[-100:])
        return "No logs yet."

    app.load(fn=update_log, inputs=None, outputs=log_viewer, every=2)
    btn.click(
        fn=crawl_now,
        inputs=[
            url, depth, pages, reqs_per_sec, js, same_domain,
            gr.State(False),  # show_only_failed: inputs must be components, not literals
            allow_pat, deny_pat, skip_ssl, use_proxy,
            result_fmt, export_fmt
        ],
        outputs=[preview_out, json_out, graph_out, file_out, stats_display]
    )
    # Failed-only button (optional)
    with gr.Row():
        failed_btn = gr.Button("🔍 Show Only Failed Links")
    failed_btn.click(
        # Inject show_only_failed=True at its place in crawl_now's signature
        # (7th parameter, right after same_domain_only)
        fn=lambda *args: crawl_now(*args[:6], True, *args[6:]),
        inputs=[
            url, depth, pages, reqs_per_sec, js, same_domain,
            allow_pat, deny_pat, skip_ssl, use_proxy,
            result_fmt, export_fmt
        ],
        outputs=[preview_out, json_out, graph_out, file_out, stats_display]
    )
    gr.Markdown("## 🛡️ Features: Stealth, Multi-Domain, JS, Graph, CORS Proxy, Format Selector, Export")
if __name__ == "__main__":
multiprocessing.set_start_method("spawn", force=True)
app.launch(
server_name="0.0.0.0",
server_port=7860,
show_api=True,
mcp_server=True
)
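# A minimal sketch of driving the crawl remotely with gradio_client (the
# api_name below is an assumption; check the running app's "Use via API"
# panel for the real endpoint name):
#
#   from gradio_client import Client
#   client = Client("http://localhost:7860/")
#   preview, tree_json, graph, export, stats = client.predict(
#       "https://web.mit.edu/", 2, 100, 2, False, True, False,
#       "", "", False, False, "txt", "json", api_name="/crawl_now")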