#!/usr/bin/env python3
"""
πŸ•·οΈ TRUE HIERARCHICAL SPIDERWEB CRAWLER
- Outputs nested JSON tree: {url, title, ..., children: [...]}
- Each node knows its parent implicitly via structure
- Skips binaries, login pages, DNS errors
- Optimized for 16GB RAM / 2 vCPU
- MCP server on 0.0.0.0:7860
"""
import os
import sys
import json
import tempfile
import subprocess
import hashlib
import re
import multiprocessing
from pathlib import Path
from urllib.parse import urljoin, urlparse
from typing import Tuple
import gradio as gr
from loguru import logger
# === LOG AUTO-CLEAN ===
LOG_FILE = Path(tempfile.gettempdir()) / "mcp_spider.log"
if LOG_FILE.exists():
with open(LOG_FILE, "r") as f:
lines = f.readlines()
if len(lines) > 10000:
with open(LOG_FILE, "w") as f:
f.writelines(lines[-5000:])
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add(LOG_FILE, rotation="10 MB")
# === PLAYWRIGHT INSTALL ===
def install_playwright():
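    # Probe Chromium with a throwaway headless launch; if that fails, fetch the browser via the Playwright CLI.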
try:
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
p.chromium.launch(headless=True).close()
except Exception:
subprocess.run([sys.executable, "-m", "playwright", "install", "chromium"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
install_playwright()
CACHE_DIR = Path(tempfile.gettempdir()) / "mcp_spider_cache"
CACHE_DIR.mkdir(exist_ok=True, parents=True)
# === SPIDER THAT TRACKS PARENT ===
SPIDER_CODE = r'''
import sys
import json
import re
from urllib.parse import urljoin, urlparse
from scrapy.crawler import CrawlerProcess
from scrapy import Spider, Request
from bs4 import BeautifulSoup
class HierarchicalSpider(Spider):
name = "hierarchical_spider"
custom_settings = {
"USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
"ROBOTSTXT_OBEY": False,
"DOWNLOAD_DELAY": 0.0,
"CONCURRENT_REQUESTS_PER_DOMAIN": 16,
"CONCURRENT_REQUESTS": 32,
"COOKIES_ENABLED": False,
"TELNETCONSOLE_ENABLED": False,
"PLAYWRIGHT_BROWSER_TYPE": "chromium",
"PLAYWRIGHT_LAUNCH_OPTIONS": {"headless": True, "args": ["--no-sandbox"]},
"RETRY_TIMES": 2,
"RETRY_HTTP_CODES": [500, 502, 503, 504, 408, 429],
"FEEDS": {sys.argv[6]: {"format": "json", "overwrite": True}},
}
def __init__(self, start_url, max_depth, max_pages, use_js, **kwargs):
super().__init__(**kwargs)
self.start_url = start_url
self.max_depth = int(max_depth)
self.max_pages = int(max_pages)
self.use_js = use_js
self.base_domain = urlparse(start_url).netloc
self.pages_crawled = 0
self.seen = set()
def start_requests(self):
yield Request(
self.start_url,
callback=self.parse_page,
meta={"depth": 0, "parent": None, "playwright": self.use_js},
dont_filter=True
)
def parse_page(self, response):
if self.pages_crawled >= self.max_pages:
return
url = response.url
if url in self.seen:
return
self.seen.add(url)
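        # Skip non-HTML responses and obvious binary / login URLs (see module docstring).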
ct = response.headers.get("Content-Type", b"").decode().lower()
if "text/html" not in ct:
return
if any(x in url.lower() for x in ["/login", "/user/", "/auth", ".pdf", ".jpg", ".png", ".zip"]):
return
try:
soup = BeautifulSoup(response.text, "html.parser")
        except Exception:
return
for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
tag.decompose()
text = soup.get_text(" ", strip=True)
emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
email = emails[0] if emails else ""
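        # Prefer a Markdown rendering of the page; fall back to plain text if markdownify is missing or fails.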
try:
from markdownify import markdownify as md
markdown = md(str(soup), heading_style="ATX")
markdown = re.sub(r'\n{3,}', '\n\n', markdown)
markdown = re.sub(r'[ \t]+', ' ', markdown).strip()
        except Exception:
markdown = text
yield {
"url": url,
"title": response.css("title::text").get("") or "",
"email": email,
"raw_text": text[:5000],
"markdown": markdown[:5000],
"depth": response.meta["depth"],
"parent": response.meta["parent"] # βœ… TRACK PARENT
}
self.pages_crawled += 1
if response.meta["depth"] < self.max_depth and self.pages_crawled < self.max_pages:
for href in response.css("a[href]::attr(href)").getall():
if not href or href.startswith(("#", "javascript:", "mailto:", "tel:")):
continue
full = urljoin(url, href)
parsed = urlparse(full)
if parsed.netloc == self.base_domain and full not in self.seen:
yield Request(
full,
callback=self.parse_page,
meta={
"depth": response.meta["depth"] + 1,
"parent": url, # βœ… PASS PARENT
"playwright": self.use_js
},
dont_filter=False
)
if __name__ == "__main__":
    # argv: [1]=start_url [2]=max_depth [3]=max_pages [4]=use_js [5]=unused placeholder [6]=output feed path (read by FEEDS above)
    start_url = sys.argv[1]
    max_depth = sys.argv[2]
    max_pages = sys.argv[3]
    use_js = sys.argv[4].lower() == "true"
    process = CrawlerProcess()
    process.crawl(HierarchicalSpider, start_url=start_url, max_depth=max_depth,
                  max_pages=max_pages, use_js=use_js)
    process.start()
'''
def build_tree(pages: list) -> dict:
"""Convert flat list with parent refs into nested tree."""
url_to_node = {}
root = None
# First pass: create all nodes
for page in pages:
node = {**page, "children": []}
url_to_node[page["url"]] = node
if page.get("parent") is None:
root = node
# Second pass: attach children
for page in pages:
if page["parent"] and page["parent"] in url_to_node:
url_to_node[page["parent"]]["children"].append(url_to_node[page["url"]])
return root or (url_to_node[next(iter(url_to_node))] if url_to_node else {})
def run_crawl(start_url: str, max_depth: int, max_pages: int, use_js: bool) -> Tuple[str, str, str]:
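    # Cache results per (url, depth, pages, js) combination so repeat requests reuse an earlier crawl.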
key = hashlib.md5(f"{start_url}_{max_depth}_{max_pages}_{use_js}".encode()).hexdigest()
output_file = CACHE_DIR / f"{key}.json"
if not output_file.exists():
script = CACHE_DIR / "spider.py"
script.write_text(SPIDER_CODE)
        cmd = [
            sys.executable, str(script),
            # "0" is an unused placeholder argument so the output path stays at argv[6].
            start_url, str(int(max_depth)), str(int(max_pages)), str(use_js), "0", str(output_file)
        ]
        subprocess.run(cmd, check=True)
    # Scrapy may not write the feed file at all when the crawl yields no items.
    if not output_file.exists():
        return "❌ No pages could be crawled for this URL.", json.dumps({"error": "no pages crawled"}, indent=2), None
    # Read flat data
    with open(output_file, "r") as f:
        flat_data = json.load(f)
    if not isinstance(flat_data, list):
        flat_data = [flat_data]
# Build tree
tree = build_tree(flat_data)
full_json_str = json.dumps(tree, indent=2)
# Preview
    preview = f"✅ Built hierarchical tree (depth ≤ {max_depth})\n"
preview += f"Root: {tree.get('url', 'N/A')}\n"
preview += f"Total nodes: {len(flat_data)}\n\n"
    def _summarize(node, depth=0, max_show=10, out=None):
        # Use None rather than a mutable default argument; every caller passes its own list.
        if out is None:
            out = []
        if len(out) >= max_show:
            return
indent = " " * depth
title = node.get("title", "N/A")
email = node.get("email", "no email")
out.append(f"{indent}β”œβ”€ [{node.get('depth', '?')}] {title} ({email})")
for child in node.get("children", [])[:3]:
_summarize(child, depth + 1, max_show, out)
summary_lines = []
_summarize(tree, out=summary_lines)
preview += "\n".join(summary_lines[:15])
return preview, full_json_str, str(output_file)
def sanitize_url(url: str) -> str:
    url = url.strip()
    # Strip stray leading punctuation (e.g. quotes or brackets pasted along with the URL).
    url = re.sub(r"^[^\w+]*", "", url)
    # Strip trailing characters that cannot end a bare URL.
    url = re.sub(r"[^\w/\.:-]*$", "", url)
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    if not urlparse(url).netloc:
        raise ValueError(f"Invalid URL: {url!r}")
    return url
def crawl_now(url: str, max_depth: int, max_pages: int, use_js: bool) -> Tuple[str, str, str]:
try:
clean = sanitize_url(url)
return run_crawl(clean, max_depth, max_pages, use_js)
    except Exception as e:
        error_json = json.dumps({"error": str(e)}, indent=2)
        # Return None for the file output so gr.File is not handed an empty path.
        return f"❌ Error: {e}", error_json, None
# === GRADIO UI ===
with gr.Blocks(title="🕷️ Hierarchical Spiderweb Crawler") as app:
gr.Markdown("# πŸ•ΈοΈ True Hierarchical Spiderweb Crawler")
with gr.Row():
url = gr.Textbox(label="Start URL", value="https://web.mit.edu/")
        depth = gr.Number(label="Max Depth", value=2, minimum=0, precision=0)
        pages = gr.Number(label="Max Pages", value=100, minimum=1, precision=0)
with gr.Row():
js = gr.Checkbox(label="JS Rendering", value=False)
domain = gr.Textbox(value="Same Domain Only", interactive=False)
    btn = gr.Button("🕷️ Start Recursive Crawl", variant="primary")
with gr.Tab("Tree Preview"):
preview_out = gr.Textbox(label="Hierarchical Summary", lines=15)
with gr.Tab("Full Nested JSON"):
json_out = gr.Code(language="json", label="Spiderweb Tree (Copy-Paste Ready)", lines=25)
file_out = gr.File(label="Download Nested JSON")
btn.click(
fn=crawl_now,
inputs=[url, depth, pages, js],
outputs=[preview_out, json_out, file_out]
)
gr.Markdown("## 🌐 Output Structure")
gr.Markdown("""
```json
{
"url": "...",
"title": "...",
"email": "...",
"depth": 0,
"parent": null,
"children": [
{
"url": "...",
"title": "...",
"depth": 1,
"parent": "...",
"children": [...]
}
]
}
```
""")
if __name__ == "__main__":
multiprocessing.set_start_method("spawn", force=True)
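    # mcp_server=True also exposes the app as an MCP endpoint (requires a Gradio build with MCP support, e.g. the gradio[mcp] extra).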
app.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True, show_api=True)