import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, Optional

# Vendor name fragments, lower-cased so every vendor check in this module is
# case-insensitive and agrees with the others.
_KNOWN_WAFS = ("cloudflare", "akamai", "datadome", "incapsula", "kasada")
_STRICT_WAFS = ("kasada", "akamai")
_STANDARD_WAFS = ("cloudflare", "incapsula")


def _section(data: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Return ``data[key]`` when it is a dict, otherwise an empty dict."""
    value = data.get(key)
    return value if isinstance(value, dict) else {}


def _as_number(value: Any, default: float) -> float:
    """Return *value* when it is an int/float, otherwise *default*."""
    return value if isinstance(value, (int, float)) else default


def analyze_structural_data(
    recon_json_path: Optional[str],
    raw_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Parses Stage 0 raw JSON (from file or dict) and produces a structured
    'Execution Blueprint' for Stage 1 to use as a configuration source.

    Args:
        recon_json_path: Path of the Stage 0 JSON file. Only read when
            ``raw_data`` is empty/None and the path exists. Its file name is
            recorded as ``meta.source`` whenever a path is given.
        raw_data: Already-parsed Stage 0 data. Takes precedence over the file
            when truthy; an empty dict is treated as "not provided".

    Returns:
        The blueprint dict with the keys ``meta``, ``adaptation_profile``,
        ``target_hints`` and ``risk_profile``; or, on failure, a dict of the
        form ``{"status": "error", "message": ...}``.
    """
    if raw_data:
        data = raw_data
    elif recon_json_path and os.path.exists(recon_json_path):
        try:
            with open(recon_json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError, RecursionError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {"status": "error", "message": f"Failed to parse JSON: {e}"}
    else:
        return {"status": "error", "message": "No data source provided"}

    if not isinstance(data, dict):
        # Every lookup below needs a mapping; report instead of crashing.
        return {
            "status": "error",
            "message": (
                "Expected a JSON object at the top level, "
                f"got {type(data).__name__}"
            ),
        }

    # --- EXTRACT STRUCTURAL INTELLIGENCE ---
    # _section() also tolerates sections that are present but null.
    infra = _section(data, "infrastructure")
    security = _section(data, "security_audit")
    topology = _section(data, "topology")
    blueprint = _section(data, "blueprint")
    wcs_compat = _section(data, "wcs_compatibility")
    responsive = _section(data, "responsive")

    # A missing, null or empty vendor all mean "Unknown".
    vendor = security.get("vendor") or "Unknown"
    vendor_lc = str(vendor).lower()

    # 1. Determine Stealth Level Requirements
    env_flags = security.get("environment_consistency") or []
    if isinstance(env_flags, str):
        # A bare string is a single finding, not a sequence of characters.
        env_flags = [env_flags]
    webdriver_leak = any("webdriver" in str(flag).lower() for flag in env_flags)
    known_waf = any(waf in vendor_lc for waf in _KNOWN_WAFS)
    stealth_mode = "STEALTH_PARANOID" if (webdriver_leak or known_waf) else "NORMAL"

    # 2. Observer strategy: hooking XHR is only attempted outside paranoid
    # mode (paranoid mode already implies "known WAF or webdriver leak").
    network_mode = "DOM_POLLING" if stealth_mode == "STEALTH_PARANOID" else "XHR_HOOK"

    # 3. Timing Adjustments (based on latency and thread lag)
    latency = _as_number(infra.get("cdn_latency_avg"), 100)
    thread_lag = _as_number(wcs_compat.get("main_thread_lag_ms"), 0)
    if latency > 200 or thread_lag > 50:
        base_wait = 5000  # Slow site, wait longer
    elif latency < 50:
        base_wait = 1000  # Very fast site
    else:
        base_wait = 2000  # Default 2s

    # 4. Target Selectors (Prioritized)
    # The first entry is used as-is; presumably the producer already orders
    # the list by priority -- verify against Stage 0.
    recommended = blueprint.get("recommended_selectors") or []
    priority_selector = None
    if isinstance(recommended, list) and recommended:
        first = recommended[0]
        if isinstance(first, dict):
            priority_selector = first.get("attr")

    # 5. Advanced Browser Recommendation
    # Tier mapping:
    # - Enhanced: Camoufox (High Risk/Strict)
    # - Normal: Chrome (Standard WAF)
    # - Playwright: Chromium (Low Risk)
    # NOTE(review): Datadome counts as a known WAF for the stealth level but
    # falls through to "chromium" here -- confirm that is intended.
    is_strict = bool(security.get("strict_mode", False)) or any(
        waf in vendor_lc for waf in _STRICT_WAFS
    )
    if is_strict:
        rec_browser = "camoufox"
    elif any(waf in vendor_lc for waf in _STANDARD_WAFS):
        rec_browser = "chrome"
    else:
        rec_browser = "chromium"

    # --- COMPILE ADAPTIVE BLUEPRINT ---
    source_name = Path(recon_json_path).name if recon_json_path else "memory_buffer"

    return {
        "meta": {
            "source": source_name,
            "generated_for": "Stage 1: Capture",
        },
        "adaptation_profile": {
            "stealth_level": stealth_mode,
            "wait_ms": base_wait,
            "observer_strategy": network_mode,
            "recommended_browser": rec_browser,
            "device_category": responsive.get("category", "desktop"),
            "concurrency_limit": 1 if "cloudflare" in vendor_lc else 3,
        },
        "target_hints": {
            "page_type": topology.get("type", "UNKNOWN"),
            "best_selector_attr": priority_selector,
            "expected_links": topology.get("total", 0),
        },
        "risk_profile": {
            "soft_ban_risk": infra.get("soft_ban_detected", False),
            # Compare the defaulted vendor so that a missing vendor is not
            # reported as a detected WAF.
            "waf_detected": vendor_lc != "unknown",
        },
    }


def _main(argv):
    """CLI entry point: print the blueprint for the JSON file named in argv[1]."""
    if len(argv) < 2:
        print("Usage: py tools/structural_check.py <recon_json_path>")
        return 1
    result = analyze_structural_data(argv[1])
    print(json.dumps(result, indent=4))
    return 0


if __name__ == "__main__":
    sys.exit(_main(sys.argv))