Spaces:
Running
Running
| """ | |
| PatchHawk sandbox — Docker-based isolated code execution and | |
| three-stage patch validation pipeline. | |
| Execution constraints (Docker mode): | |
| --network none (no outbound connections) | |
| --memory 256m (hard memory cap) | |
| --cpus 0.5 (half a CPU core) | |
| Falls back to local subprocess when use_docker=False (dev/CI only). | |
| """ | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import functools | |
| from typing import Dict, Any, Optional | |
def is_docker_available() -> bool:
    """Report whether the Docker CLI exists and its daemon answers ``docker ps``.

    Any failure -- a missing binary, a non-zero exit status, or no reply
    within two seconds -- is reported as "not available".
    """
    probe_cmd = ["docker", "ps"]
    try:
        subprocess.run(probe_cmd, capture_output=True, check=True, timeout=2)
    except Exception:
        available = False
    else:
        available = True
    return available
| # ===================================================================== | |
| # Code Execution | |
| # ===================================================================== | |
def run_code(
    code: str,
    timeout_sec: int = 5,
    use_docker: bool = True,
) -> Dict[str, Any]:
    """Execute *code* in a throwaway sandbox directory and return telemetry.

    The code is written to ``script.py`` inside a fresh temporary directory
    and run either inside the ``patchhawk-sandbox:latest`` container (no
    network, 256 MB memory, half a CPU) or, when Docker is not requested or
    not reachable, with the host ``python3``.

    Args:
        code: Python source to execute.
        timeout_sec: Wall-clock limit for the run, in seconds.
        use_docker: Request Docker isolation. When Docker cannot be reached
            the run falls back to the host interpreter and prints a notice.

    Returns:
        A dict with the keys ``stdout``, ``stderr``, ``exit_code`` (``-1``
        when the run timed out or could not be started), ``network_blocked``
        (``True`` only when the code really ran under ``--network none``)
        and ``file_writes`` (sorted names of the entries found next to the
        script after the run).
    """
    import uuid  # function-scope import: only needed to name the container

    result: Dict[str, Any] = {
        "stdout": "",
        "stderr": "",
        "exit_code": -1,
        # Stays False until we know the Docker path is actually taken, so a
        # silent host fallback is never reported as network-isolated.
        "network_blocked": False,
        "file_writes": [],
    }
    temp_dir = tempfile.mkdtemp(prefix="patchhawk_sandbox_")
    container_name: Optional[str] = None
    try:
        # Writing happens inside the try so that a failure here (for example
        # source text that cannot be encoded as UTF-8) still cleans up the
        # temporary directory and is reported through the result dict.
        script_path = os.path.join(temp_dir, "script.py")
        with open(script_path, "w", encoding="utf-8") as f:
            f.write(code)

        # Check if Docker is actually usable
        effective_use_docker = use_docker and is_docker_available()
        result["network_blocked"] = effective_use_docker

        if effective_use_docker:
            # A unique name lets the timeout handler stop the container.
            container_name = f"patchhawk_sandbox_{uuid.uuid4().hex}"
            cmd = [
                "docker",
                "run",
                "--rm",
                "--name",
                container_name,
                "--network",
                "none",
                "--memory",
                "256m",
                "--cpus",
                "0.5",
                "-v",
                f"{temp_dir}:/app:rw",
                "patchhawk-sandbox:latest",
                "python",
                "/app/script.py",
            ]
        else:
            if use_docker:
                print("[SANDBOX] Docker requested but not available. Falling back to host python.")
            cmd = ["python3", script_path]

        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Untrusted code may print arbitrary bytes; keep the telemetry
            # instead of failing the whole run on a decode error.
            errors="replace",
            timeout=timeout_sec,
        )
        result["stdout"] = proc.stdout
        result["stderr"] = proc.stderr
        result["exit_code"] = proc.returncode

        # Record any new files the code wrote (sorted for stable output).
        result["file_writes"] = sorted(
            fname for fname in os.listdir(temp_dir) if fname != "script.py"
        )
    except subprocess.TimeoutExpired:
        result["stderr"] = "Execution timed out."
        if container_name is not None:
            # The timeout only kills the ``docker run`` client process; the
            # container itself has to be stopped explicitly. Best effort.
            try:
                subprocess.run(
                    ["docker", "kill", container_name],
                    capture_output=True,
                    timeout=10,
                )
            except (OSError, subprocess.SubprocessError):
                pass
    except Exception as exc:
        result["stderr"] = f"Execution error: {exc}"
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
    return result
def check_syntax(
    code: str,
    use_docker: bool = True,
) -> tuple:
    """Run ``python -m py_compile`` and return (ok: bool, error_msg: str).

    *code* is written to ``script.py`` in a fresh temporary directory and
    compiled either inside the ``patchhawk-sandbox:latest`` container or,
    when Docker is not requested or not reachable, with the host ``python3``.
    The temporary directory is always removed before returning.

    Args:
        code: Python source to compile.
        use_docker: Request the Docker sandbox for the compile step.

    Returns:
        ``(True, "")`` when the compiler exits with status 0, otherwise
        ``(False, message)`` where *message* is the compiler's stderr, a
        timeout notice, or a ``"Syntax check failed: ..."`` description of
        any other error (including failure to write the script).
    """
    temp_dir = tempfile.mkdtemp(prefix="patchhawk_syntax_")
    try:
        # Writing happens inside the try so the temporary directory is
        # removed, and the problem reported, even when the source cannot be
        # written (for example text that is not encodable as UTF-8).
        script_path = os.path.join(temp_dir, "script.py")
        with open(script_path, "w", encoding="utf-8") as f:
            f.write(code)

        effective_use_docker = use_docker and is_docker_available()
        if effective_use_docker:
            cmd = [
                "docker",
                "run",
                "--rm",
                "--network",
                "none",
                "--memory",
                "256m",
                "--cpus",
                "0.5",
                "-v",
                f"{temp_dir}:/app:rw",
                "patchhawk-sandbox:latest",
                "python",
                "-m",
                "py_compile",
                "/app/script.py",
            ]
        else:
            cmd = ["python3", "-m", "py_compile", script_path]

        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Keep the compiler message even if it echoes undecodable bytes.
            errors="replace",
            timeout=5,
        )
        if proc.returncode == 0:
            return True, ""
        return False, proc.stderr
    except subprocess.TimeoutExpired:
        return False, "Syntax check timed out"
    except Exception as exc:
        return False, f"Syntax check failed: {exc}"
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
| # ===================================================================== | |
| # Three-Stage Patch Validation | |
| # ===================================================================== | |
def validate_patch(
    scenario: dict,
    patch_code: str,
    use_docker: bool = True,
) -> tuple:
    """
    Validate *patch_code* against *scenario* in up to three stages.

    Returns (success: bool, message: str, details: dict).
    Stages:
    1. Syntax check — python -m py_compile on patched code
    2. Unit test execution — run unit_test_code against patched code
    3. Re-attack — compare malicious vs patched side-effects

    Args:
        scenario: Mapping describing the case. Keys read here are
            ``unit_test_code`` (optional), ``type``, ``attack_type`` and,
            for the re-attack stage, ``code_snippet``.
        patch_code: Source of the proposed patch.
        use_docker: Forwarded to ``check_syntax`` and ``run_code``.

    On failure ``details`` carries ``error`` (stage 1), ``test_output``
    (stage 2) or ``evidence`` (stage 3); on success it carries
    ``validation_log``.
    """
    import re  # function-scope import: only the re-attack heuristics need it

    details: dict = {}

    # ------------------------------------------------------------------
    # Stage 1 — Syntax
    # ------------------------------------------------------------------
    ok, err = check_syntax(patch_code, use_docker=use_docker)
    if not ok:
        details["error"] = err
        return False, "Syntax error", details

    # ------------------------------------------------------------------
    # Stage 2 — Unit test
    # ------------------------------------------------------------------
    test_code = scenario.get("unit_test_code")
    if test_code:
        # The patch is loaded as a module registered under the name 'code'
        # (presumably so the test code can import it by that name) and the
        # test code runs after it. The patch source is embedded with repr()
        # so that quotes, triple quotes and backslash escapes inside the
        # patch survive unchanged instead of corrupting the harness script.
        combined = (
            "import sys, types\n"
            "mod = types.ModuleType('code')\n"
            f"exec({patch_code!r}, mod.__dict__)\n"
            "sys.modules['code'] = mod\n"
            f"{test_code}\n"
            "print('TEST_SUCCESS')\n"
        )
        res = run_code(combined, timeout_sec=5, use_docker=use_docker)
        if res["exit_code"] != 0 or "TEST_SUCCESS" not in res["stdout"]:
            details["test_output"] = res["stderr"] or res["stdout"]
            return False, "Unit test failed", details

    # ------------------------------------------------------------------
    # Stage 3 — Re-attack verification
    # ------------------------------------------------------------------
    if scenario.get("type") == "true_positive" and scenario.get("attack_type"):
        attack_res = run_code(
            scenario["code_snippet"], timeout_sec=5, use_docker=use_docker
        )
        patch_res = run_code(patch_code, timeout_sec=5, use_docker=use_docker)
        vulnerability_remains = False

        # Check file-write side-effects: the attack wrote files and the
        # patched code writes at least as many.
        attack_writes = attack_res.get("file_writes", [])
        patch_writes = patch_res.get("file_writes", [])
        if attack_writes and len(patch_writes) >= len(attack_writes):
            vulnerability_remains = True

        # Per-attack-type heuristics: plain text checks on the patch source
        # and on the captured output (no syntax-tree analysis is involved).
        attack_type = scenario["attack_type"]
        if attack_type == "typosquatting":
            if "pythonn" in patch_res.get("stderr", ""):
                vulnerability_remains = True
        elif attack_type == "obfuscated_exec":
            if (
                attack_res["stdout"].strip()
                and patch_res["stdout"] == attack_res["stdout"]
            ):
                vulnerability_remains = True
        elif attack_type == "subprocess_backdoor":
            # Match the netcat tools as whole words only; a bare substring
            # test for "nc" also fires on names such as "func" or "encode".
            uses_netcat = re.search(r"\b(?:nc|ncat|netcat)\b", patch_code)
            if uses_netcat or "subprocess.call" in patch_code:
                vulnerability_remains = True
        elif attack_type == "socket_exfil":
            if "socket" in patch_code and "connect" in patch_code:
                vulnerability_remains = True
        elif attack_type == "env_hijack":
            if "os.environ" in patch_code and "PATH" in patch_code:
                vulnerability_remains = True

        if vulnerability_remains:
            details["evidence"] = {
                "attack_telemetry": attack_res,
                "patch_telemetry": patch_res,
            }
            return False, "Vulnerability still accessible", details

    details["validation_log"] = "All checks passed successfully."
    return True, "Patch is valid", details