webscraper / tools /structural_check.py
bluedragonDC
🚀 Deploy: Sniper MCP Forensic Scraper + Gradio
5ddaa4f
Raw
History Blame Contribute Delete
4.32 kB
import json
import os
import sys
from pathlib import Path
def _dict_section(container, key):
    """Return ``container[key]`` if it is a dict, else an empty dict (covers missing keys and JSON null)."""
    value = container.get(key)
    return value if isinstance(value, dict) else {}


def _numeric_or_default(value, default):
    """Coerce *value* to float; return *default* when it is None or not numeric."""
    if value is None:
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def analyze_structural_data(recon_json_path, raw_data=None):
    """
    Parses Stage 0 raw JSON (from file or dict) and produces a structured
    'Execution Blueprint' for Stage 1 to use as a configuration source.

    Args:
        recon_json_path: Path to the recon JSON file. Only read when
            ``raw_data`` is falsy; also used (file name only) for
            ``meta.source``. May be None when ``raw_data`` is supplied.
        raw_data: Already-parsed recon dict. Takes precedence over the file
            when truthy.

    Returns:
        On success, a dict with the keys ``meta``, ``adaptation_profile``,
        ``target_hints`` and ``risk_profile``. On failure, a dict of the form
        ``{"status": "error", "message": ...}`` (no source given, unreadable
        or invalid JSON, or a JSON root that is not an object).
    """
    if raw_data:
        data = raw_data
    elif recon_json_path and os.path.exists(recon_json_path):
        try:
            with open(recon_json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            return {"status": "error", "message": f"Failed to parse JSON: {str(e)}"}
    else:
        return {"status": "error", "message": "No data source provided"}

    if not isinstance(data, dict):
        # A JSON list/scalar root has no sections to read; report it the same
        # way as the other input failures instead of raising AttributeError.
        return {"status": "error", "message": "Recon data must be a JSON object"}

    # --- EXTRACT STRUCTURAL INTELLIGENCE ---
    # Sections that are missing or null are treated as empty.
    infra = _dict_section(data, "infrastructure")
    security = _dict_section(data, "security_audit")
    topology = _dict_section(data, "topology")
    blueprint = _dict_section(data, "blueprint")
    wcs_compat = _dict_section(data, "wcs_compatibility")
    responsive = _dict_section(data, "responsive")

    # Normalise the vendor once so every decision below sees the same value:
    # missing, null and blank vendors all become "Unknown", and matching is
    # case-insensitive throughout.
    raw_vendor = security.get("vendor")
    vendor = str(raw_vendor).strip() if raw_vendor else ""
    if not vendor:
        vendor = "Unknown"
    vendor_lc = vendor.lower()

    # 1. Determine Stealth Level Requirements
    known_wafs = ("cloudflare", "akamai", "datadome", "incapsula", "kasada")
    behind_known_waf = any(waf in vendor_lc for waf in known_wafs)

    consistency_notes = security.get("environment_consistency") or []
    if isinstance(consistency_notes, str):
        # A bare string would otherwise be scanned character by character.
        consistency_notes = [consistency_notes]
    webdriver_leak = any("webdriver" in str(note).lower() for note in consistency_notes)

    # Webdriver leakage or a known WAF both call for the aggressive posture.
    stealth_mode = "STEALTH_PARANOID" if (webdriver_leak or behind_known_waf) else "NORMAL"

    # 2. Network observation strategy: the XHR hook is only used when the
    # posture is not paranoid (which already implies no known WAF).
    network_mode = "XHR_HOOK" if stealth_mode != "STEALTH_PARANOID" else "DOM_POLLING"

    # 3. Timing Adjustments (based on latency and thread lag)
    latency = _numeric_or_default(infra.get("cdn_latency_avg"), 100)
    thread_lag = _numeric_or_default(wcs_compat.get("main_thread_lag_ms"), 0)
    base_wait = 2000  # Default 2s
    if latency > 200 or thread_lag > 50:
        base_wait = 5000  # Slow site, wait longer
    elif latency < 50:
        base_wait = 1000  # Very fast site

    # 4. Target Selectors (Prioritized)
    # The first recommended entry is taken as the priority selector.
    recommended = blueprint.get("recommended_selectors") or []
    priority_selector = None
    if isinstance(recommended, (list, tuple)) and recommended and isinstance(recommended[0], dict):
        priority_selector = recommended[0].get("attr")

    # 5. Advanced Browser Recommendation
    # Tier mapping:
    # - Enhanced: Camoufox (High Risk/Strict)
    # - Normal: Chrome (Standard WAF)
    # - Playwright: Chromium (Low Risk)
    is_strict = bool(security.get("strict_mode")) or "kasada" in vendor_lc or "akamai" in vendor_lc
    if is_strict:
        rec_browser = "camoufox"
    elif "cloudflare" in vendor_lc or "incapsula" in vendor_lc:
        rec_browser = "chrome"
    else:
        rec_browser = "chromium"

    # --- COMPILE ADAPTIVE BLUEPRINT ---
    source_name = Path(recon_json_path).name if recon_json_path else "memory_buffer"
    blueprint_output = {
        "meta": {
            "source": source_name,
            "generated_for": "Stage 1: Capture",
        },
        "adaptation_profile": {
            "stealth_level": stealth_mode,
            "wait_ms": base_wait,
            "observer_strategy": network_mode,
            "recommended_browser": rec_browser,
            "device_category": responsive.get("category", "desktop"),
            "concurrency_limit": 1 if "cloudflare" in vendor_lc else 3,
        },
        "target_hints": {
            "page_type": topology.get("type", "UNKNOWN"),
            "best_selector_attr": priority_selector,
            "expected_links": topology.get("total", 0),
        },
        "risk_profile": {
            "soft_ban_risk": infra.get("soft_ban_detected", False),
            # Any named vendor counts as a detected WAF; an absent vendor does not.
            "waf_detected": vendor_lc != "unknown",
        },
    }
    return blueprint_output
if __name__ == "__main__":
    # CLI entry point: the first positional argument is the recon JSON path.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: py tools/structural_check.py <path_to_recon_json>")
        sys.exit(1)

    # Emit the resulting blueprint (or error dict) as indented JSON on stdout.
    report = analyze_structural_data(cli_args[0])
    print(json.dumps(report, indent=4))