Spaces:
Running
Running
| import json | |
| import os | |
| import sys | |
| from pathlib import Path | |
# Lower-cased WAF vendor names whose presence forces the paranoid stealth tier.
_KNOWN_WAFS = ("cloudflare", "akamai", "datadome", "incapsula", "kasada")


def _section(data, key):
    """Return data[key] when it is a dict, otherwise an empty dict."""
    value = data.get(key)
    return value if isinstance(value, dict) else {}


def _number(value, default):
    """Return value when it is an int/float (bools excluded), else default."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return default
    return value


def analyze_structural_data(recon_json_path, raw_data=None):
    """
    Parse Stage 0 raw JSON (from file or dict) and produce a structured
    'Execution Blueprint' for Stage 1 to use as a configuration source.

    Args:
        recon_json_path: Path to the recon JSON file. Only read when
            raw_data is falsy. Its file name is reported as the blueprint
            source whenever the path is truthy, even if raw_data was used.
        raw_data: Optional already-parsed recon dict. When truthy it takes
            precedence over the file.

    Returns:
        The blueprint dict with the keys "meta", "adaptation_profile",
        "target_hints" and "risk_profile", or
        {"status": "error", "message": ...} when no usable data source is
        available, the file cannot be read/parsed, or the data is not a
        JSON object.
    """
    if raw_data:
        data = raw_data
    elif recon_json_path and os.path.exists(recon_json_path):
        try:
            with open(recon_json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {"status": "error", "message": f"Failed to parse JSON: {str(e)}"}
    else:
        return {"status": "error", "message": "No data source provided"}

    if not isinstance(data, dict):
        # Valid JSON may still be a list/scalar; every lookup below needs a dict.
        return {"status": "error", "message": "Recon data must be a JSON object"}

    # --- EXTRACT STRUCTURAL INTELLIGENCE ---
    # _section() also tolerates sections that are present but null/non-dict.
    infra = _section(data, "infrastructure")
    security = _section(data, "security_audit")
    topology = _section(data, "topology")
    blueprint = _section(data, "blueprint")
    wcs_compat = _section(data, "wcs_compatibility")
    responsive = _section(data, "responsive")

    # A missing, null or empty vendor is normalised to "Unknown" so that
    # every vendor-based decision below sees the same value.
    vendor = security.get("vendor") or "Unknown"
    vendor_lc = str(vendor).lower()
    has_known_waf = any(waf in vendor_lc for waf in _KNOWN_WAFS)

    # 1. Determine Stealth Level Requirements
    consistency = security.get("environment_consistency") or []
    if not isinstance(consistency, (list, tuple)):
        consistency = [consistency]
    # str() guards against non-string entries in the consistency report.
    webdriver_leak = any("webdriver" in str(item).lower() for item in consistency)
    stealth_mode = "STEALTH_PARANOID" if (webdriver_leak or has_known_waf) else "NORMAL"

    # 2. Observer strategy: XHR hooking is only used outside paranoid mode.
    # A known WAF already forces paranoid mode, so no separate WAF check is
    # needed here.
    network_mode = "XHR_HOOK" if stealth_mode != "STEALTH_PARANOID" else "DOM_POLLING"

    # 3. Timing Adjustments (based on latency and thread lag)
    latency = _number(infra.get("cdn_latency_avg"), 100)
    thread_lag = _number(wcs_compat.get("main_thread_lag_ms"), 0)
    if latency > 200 or thread_lag > 50:
        base_wait = 5000  # Slow site, wait longer
    elif latency < 50:
        base_wait = 1000  # Very fast site
    else:
        base_wait = 2000  # Default 2s

    # 4. Target Selectors (Prioritized)
    # The first recommended entry is used as-is; presumably the list is
    # already ordered by priority upstream (TODO confirm).
    recommended = blueprint.get("recommended_selectors") or []
    priority_selector = None
    if isinstance(recommended, (list, tuple)) and recommended:
        first_entry = recommended[0]
        if isinstance(first_entry, dict):
            priority_selector = first_entry.get("attr")

    # 5. Advanced Browser Recommendation
    # Tier mapping:
    # - Enhanced: Camoufox (High Risk/Strict)
    # - Normal: Chrome (Standard WAF)
    # - Playwright: Chromium (Low Risk)
    # Vendor matching is case-insensitive, consistent with the stealth check.
    is_strict = (
        bool(security.get("strict_mode", False))
        or "kasada" in vendor_lc
        or "akamai" in vendor_lc
    )
    if is_strict:
        rec_browser = "camoufox"
    elif "cloudflare" in vendor_lc or "incapsula" in vendor_lc:
        rec_browser = "chrome"
    else:
        rec_browser = "chromium"

    # --- COMPILE ADAPTIVE BLUEPRINT ---
    source_name = Path(recon_json_path).name if recon_json_path else "memory_buffer"
    blueprint_output = {
        "meta": {
            "source": source_name,
            "generated_for": "Stage 1: Capture",
        },
        "adaptation_profile": {
            "stealth_level": stealth_mode,
            "wait_ms": base_wait,
            "observer_strategy": network_mode,
            "recommended_browser": rec_browser,
            "device_category": responsive.get("category", "desktop"),
            "concurrency_limit": 1 if "cloudflare" in vendor_lc else 3,
        },
        "target_hints": {
            "page_type": topology.get("type", "UNKNOWN"),
            "best_selector_attr": priority_selector,
            "expected_links": topology.get("total", 0),
        },
        "risk_profile": {
            "soft_ban_risk": infra.get("soft_ban_detected", False),
            # Compare the normalised vendor: the raw lookup returned None
            # for a missing key, which wrongly counted as "WAF detected".
            "waf_detected": vendor_lc != "unknown",
        },
    }
    return blueprint_output
if __name__ == "__main__":
    # CLI entry point: the first positional argument is the recon JSON path.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: py tools/structural_check.py <path_to_recon_json>")
        sys.exit(1)
    # Emit the blueprint (or the error dict) as JSON indented by 4 spaces.
    rendered = json.dumps(analyze_structural_data(cli_args[0]), indent=4)
    print(rendered)