# rl_code_fix_env/src/sandbox/execution.py
import json
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
TIMEOUT_SECONDS = 30
MEMORY_LIMIT = "512m"
CPU_LIMIT = "1.0"
SANDBOX_IMAGE = "python-sandbox-image"
# True when docker CLI is available on this host
_DOCKER_AVAILABLE: Optional[bool] = None
def _docker_available() -> bool:
global _DOCKER_AVAILABLE
if _DOCKER_AVAILABLE is None:
_DOCKER_AVAILABLE = shutil.which("docker") is not None
return _DOCKER_AVAILABLE
PYTEST_REPORT_FILENAME = ".pytest_error_report.json"
def _parse_pytest_counts(logs: str) -> Tuple[Optional[int], Optional[int]]:
"""
Parse pytest output to extract pass/fail counts.
Returns:
(passed_count, total_count) or (None, None) if not found
"""
    # Counts appear in summaries such as "5 passed", "3 passed, 2 failed",
    # or pytest's usual "1 failed, 2 passed in 0.03s".
# Look for summary line like "====== 5 passed, 2 failed ======"
summary_match = re.search(r'(\d+)\s+passed[,.]?\s*(\d+)?\s*failed?', logs)
if summary_match:
passed = int(summary_match.group(1))
failed = int(summary_match.group(2)) if summary_match.group(2) else 0
return passed, passed + failed
# Look for "X passed" only
passed_match = re.search(r'(\d+)\s+passed', logs)
if passed_match:
passed = int(passed_match.group(1))
# Check if there's also failed count
failed_match = re.search(r'(\d+)\s+failed', logs)
if failed_match:
failed = int(failed_match.group(1))
return passed, passed + failed
else:
# Assume all tests passed if only "X passed" and no failed
return passed, passed
# Look for "X failed" only without passed
failed_only_match = re.search(r'(\d+)\s+failed', logs)
if failed_only_match:
failed = int(failed_only_match.group(1))
return 0, failed
return None, None
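# Illustrative expectations for _parse_pytest_counts (not exhaustive):
#   "=== 2 passed in 0.04s ==="            -> (2, 2)
#   "=== 1 failed, 2 passed in 0.05s ==="  -> (2, 3)
#   "=== 3 failed in 0.10s ==="            -> (0, 3)
#   no recognisable summary line           -> (None, None)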
def _build_docker_cmd(workspace_dir: str, test_file: str, extra_flags: Optional[List[str]] = None) -> List[str]:
"""
Centralised Docker command builder shared by run_test_file and
get_error_logs so sandbox flags are never duplicated or mismatched.
"""
return [
"docker", "run",
# -- Ephemeral Sandbox --
"--rm",
# -- Network Security --
"--network", "none",
# -- Resource Constraints --
"--memory", MEMORY_LIMIT,
"--cpus", CPU_LIMIT,
"--pids-limit", "50",
# -- File System Security --
"--read-only",
"--tmpfs", "/tmp",
"-v", f"{workspace_dir}:/app:rw",
"-w", "/app",
# -- Privilege Escalation Prevention --
"--user", "1000:1000",
"--cap-drop", "ALL",
"--security-opt", "no-new-privileges",
# -- Execution --
SANDBOX_IMAGE,
"pytest", test_file,
"--timeout=15",
        *(extra_flags or []),
]
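# Example (illustrative, hypothetical workspace path): _build_docker_cmd("/tmp/ws", "test_fix.py")
# assembles roughly:
#   docker run --rm --network none --memory 512m --cpus 1.0 --pids-limit 50 \
#       --read-only --tmpfs /tmp -v /tmp/ws:/app:rw -w /app \
#       --user 1000:1000 --cap-drop ALL --security-opt no-new-privileges \
#       python-sandbox-image pytest test_fix.py --timeout=15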
def run_test_file(
code_file: str,
test_file: str,
workspace_dir: str,
) -> Tuple[bool, str]:
"""
Execute a pytest test file using direct subprocess execution.
IMPORTANT: Direct execution is ALWAYS used (never Docker) because:
1. Docker containers don't have conftest.py for module aliasing (src.dataset issue)
2. Docker containers don't have access to the full project structure
3. Direct execution allows proper test discovery and import resolution
4. This enables SWE-bench task compatibility
Returns:
(passed, logs)
"""
# Always use direct execution - skip Docker entirely for SWE-bench compatibility
return _run_direct(code_file, test_file, workspace_dir)
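# Minimal caller sketch (hypothetical file names; the environment supplies the
# real workspace layout). Note that the first argument is the patched source
# text, not a path:
#   patched_source = Path("candidate_fix.py").read_text()
#   passed, logs = run_test_file(patched_source, "/path/to/test_problem.py", "/path/to/workspace")
#   if not passed:
#       print(logs)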
def _run_in_docker(
code_file: str,
test_file: str,
workspace_dir: str,
) -> Tuple[bool, str]:
"""Docker-sandboxed execution (original path)."""
report_path_in_container = f"/app/{PYTEST_REPORT_FILENAME}"
cmd = _build_docker_cmd(
workspace_dir,
test_file,
extra_flags=[
"--tb=short",
"--json-report",
f"--json-report-file={report_path_in_container}",
],
)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=TIMEOUT_SECONDS,
)
passed = result.returncode == 0
logs = result.stdout + "\n" + result.stderr
# Parse actual pass/fail counts from pytest output
passed_count, total_count = _parse_pytest_counts(logs)
# If we have counts, return them encoded in the logs for the environment to parse
if passed_count is not None and total_count is not None:
logs = f"[TEST_COUNTS] passed={passed_count} total={total_count}\n" + logs
return passed, logs
except subprocess.TimeoutExpired:
return False, "CRITICAL ERROR: Execution timed out."
except Exception as e:
return False, f"failed the sandbox execution: {e}"
def _run_direct(
code_file: str,
test_file: str,
workspace_dir: str,
) -> Tuple[bool, str]:
"""
    Direct pytest execution. This is the path run_test_file() always takes; it
    also works where Docker is unavailable (e.g. inside a container on HF Spaces).
Strategy: patch buggy.py in-place in the real workspace, run pytest from
the *repo root* so that `from src.dataset.problem_X.buggy import ...`
resolves correctly via PYTHONPATH, then restore the original file.
PYTHONPATH is explicitly set to repo_root so that `src.dataset.*` imports
always resolve, regardless of what the parent process inherited.
"""
workspace_path = Path(workspace_dir)
test_path = Path(test_file)
buggy_py = workspace_path / "buggy.py"
    # Detect the repo root: walk upward until we find pyproject.toml or src/
repo_root = workspace_path
for _ in range(8):
if (repo_root / "pyproject.toml").exists() or (repo_root / "src").is_dir():
break
repo_root = repo_root.parent
    # Build the subprocess environment: copy the parent env but pin PYTHONPATH
    # to the repo root and disable user-site/bytecode lookups to prevent
    # dual-site-packages import collisions (e.g. ImportPathMismatchError on conftest).
subprocess_env = os.environ.copy()
repo_root_str = str(repo_root)
subprocess_env["PYTHONPATH"] = repo_root_str
subprocess_env["PYTHONNOUSERSITE"] = "1"
subprocess_env["PYTHONDONTWRITEBYTECODE"] = "1"
subprocess_env.pop("PYTHONHOME", None)
    # Back up the original buggy.py and overwrite it with the patched source.
    # Note: despite its name, `code_file` holds the candidate code as a string,
    # not a path.
original_code: Optional[str] = None
if buggy_py.exists():
try:
original_code = buggy_py.read_text(encoding="utf-8")
buggy_py.write_text(code_file, encoding="utf-8")
except Exception as e:
return False, f"Failed to patch buggy.py in-place: {e}"
report_file = workspace_path / PYTEST_REPORT_FILENAME
    # Best-effort pre-import of conftest from the repo root as a sanity check.
    # This runs in a separate interpreter, so any src.dataset aliasing it sets
    # up does not carry over to the pytest process below; pytest loads
    # conftest.py itself via --rootdir.
    conftest_import_cmd = [
        sys.executable, "-c",
        f"import sys; sys.path.insert(0, r'{repo_root_str}'); import conftest",
    ]
try:
subprocess.run(conftest_import_cmd, capture_output=True, timeout=10, cwd=str(repo_root))
except Exception:
pass # Continue even if pre-import fails
cmd = [
"python", "-m", "pytest",
str(test_path), # absolute path to test.py
"--tb=short",
f"--timeout={TIMEOUT_SECONDS - 5}",
"--json-report",
f"--json-report-file={report_file}",
f"--rootdir={repo_root_str}", # ensures conftest.py at repo root is loaded
"-v",
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=TIMEOUT_SECONDS,
cwd=str(repo_root), # repo root so sys.path starts there
env=subprocess_env, # explicit PYTHONPATH repo root guaranteed
)
passed = result.returncode == 0
logs = result.stdout + ("\n" + result.stderr if result.stderr else "")
# Parse actual pass/fail counts from pytest output
passed_count, total_count = _parse_pytest_counts(logs)
# If we have counts, prepend them to logs for the environment to parse
if passed_count is not None and total_count is not None:
logs = f"[TEST_COUNTS] passed={passed_count} total={total_count}\n" + logs
return passed, logs.strip() or "(no output)"
except subprocess.TimeoutExpired:
return False, "CRITICAL ERROR: Execution timed out."
except Exception as e:
return False, f"Direct execution failed: {e}"
finally:
# Always restore the original buggy.py
if original_code is not None:
try:
buggy_py.write_text(original_code, encoding="utf-8")
except Exception:
pass
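# Assumed workspace layout for the direct path (illustrative; the actual
# problem directories come from the environment's dataset):
#   <repo_root>/
#       conftest.py                      # sets up the src.dataset alias
#       pyproject.toml  (or src/)        # marks the repo root for the upward walk
#       src/dataset/problem_X/buggy.py   # patched in place, then restored
#       src/dataset/problem_X/test.py    # the test_file passed in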
def get_error_logs(workspace_dir: str) -> Dict[str, object]:
"""
Read and parse the stderr / error logs produced by the last pytest run
inside the Docker sandbox.
Strategy (in priority order):
1. Parse the JSON report written by pytest-json-report plugin
(`<workspace_dir>/.pytest_error_report.json`).
2. Fall back to the plain-text `pytest_stderr.log` if present.
3. Return a structured error dict if neither file exists.
Args:
workspace_dir: Same host directory passed to `run_test_file()`.
Returns:
A dict with the following keys:
{
"source": "json_report" | "log_file" | "none",
"summary": str, # human-readable one-liner
"failed_tests": List[str], # test node IDs that failed
"errors": List[str], # collected error / traceback strings
"warnings": List[str], # pytest warnings
"raw": str | None, # raw file content (for debugging)
}
"""
workspace = Path(workspace_dir)
json_report_path = workspace / PYTEST_REPORT_FILENAME
    if json_report_path.exists():
        try:
            raw = json_report_path.read_text(encoding="utf-8")
            return _parse_json_report(json.loads(raw), raw=raw)
        except Exception:
            # Corrupted or unreadable JSON; fall through to the plain-text logs.
            pass
log_candidates = [
workspace / "pytest_stderr.log",
workspace / "pytest.log",
workspace / ".pytest.log",
]
for log_path in log_candidates:
if log_path.exists():
raw = log_path.read_text(encoding="utf-8", errors="replace")
return _parse_plain_log(raw)
return {
"source": "none",
"summary": "No error log found. Ensure run_test_file() was called first.",
"failed_tests": [],
"errors": [],
"warnings": [],
"raw": None,
}
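# Example return value (illustrative), when a JSON report exists:
#   {
#       "source": "json_report",
#       "summary": "2/3 passed, 1 failed",
#       "failed_tests": ["test_problem.py::test_edge_case"],
#       "errors": ["--- test_problem.py::test_edge_case ---\n..."],
#       "warnings": [],
#       "raw": "<json report text>",
#   }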
def _parse_json_report(report: dict, raw: str) -> Dict[str, object]:
"""Extract structured error info from a pytest-json-report dict."""
summary = report.get("summary", {})
tests = report.get("tests", [])
warnings = report.get("warnings", [])
failed_tests: List[str] = []
errors: List[str] = []
for test in tests:
if test.get("outcome") not in ("failed", "error"):
continue
node_id = test.get("nodeid", "<unknown>")
failed_tests.append(node_id)
# Collect longrepr (the actual traceback string)
call = test.get("call") or test.get("setup") or {}
longrepr = call.get("longrepr", "")
if longrepr:
errors.append(f"--- {node_id} ---\n{longrepr}")
total = summary.get("total", 0)
failed = summary.get("failed", 0) + summary.get("error", 0)
passed = summary.get("passed", 0)
summary_line = f"{passed}/{total} passed, {failed} failed"
warning_messages = [w.get("message", str(w)) for w in warnings]
return {
"source": "json_report",
"summary": summary_line,
"failed_tests": failed_tests,
"errors": errors,
"warnings": warning_messages,
"raw": raw,
}
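# The pytest-json-report structure consumed above looks roughly like (trimmed
# to the keys this module reads):
#   {
#     "summary": {"total": 3, "passed": 2, "failed": 1},
#     "tests": [{"nodeid": "test_x.py::test_a", "outcome": "failed",
#                "call": {"longrepr": "..."}}],
#     "warnings": [{"message": "..."}]
#   }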
def _parse_plain_log(raw: str) -> Dict[str, object]:
"""
Best-effort extraction from a plain pytest stdout/stderr log.
Looks for the standard FAILED / ERROR lines and short tracebacks.
"""
lines = raw.splitlines()
failed_tests: List[str] = []
errors: List[str] = []
warnings: List[str] = []
summary_line = ""
# Regex patterns for pytest plain output
FAILED_RE = re.compile(r"^FAILED\s+(.+)")
ERROR_RE = re.compile(r"^ERROR\s+(.+)")
WARNING_RE = re.compile(r"PytestWarning|DeprecationWarning|UserWarning")
    SUMMARY_RE = re.compile(r"^=+\s+(.+?)\s+=+$")
current_traceback: List[str] = []
in_traceback = False
    for line in lines:
        # A run of underscores marks a pytest failure-block header; it closes
        # the previous traceback (if any) and opens a new one.
        if line.startswith("__"):
            if current_traceback:
                errors.append("\n".join(current_traceback))
                current_traceback = []
            in_traceback = True
        # Lines prefixed with "E " are pytest assertion/error output; treat
        # them as part of a traceback even without a header line.
        elif line.startswith("E "):
            in_traceback = True
        if in_traceback:
            current_traceback.append(line)
m = FAILED_RE.match(line)
if m:
failed_tests.append(m.group(1).strip())
m = ERROR_RE.match(line)
if m:
failed_tests.append(m.group(1).strip())
if WARNING_RE.search(line):
warnings.append(line.strip())
m = SUMMARY_RE.match(line)
if m:
summary_line = m.group(1).strip()
# Flush any dangling traceback
if current_traceback:
errors.append("\n".join(current_traceback))
return {
"source": "log_file",
"summary": summary_line or f"{len(failed_tests)} test(s) failed",
"failed_tests": failed_tests,
"errors": errors,
"warnings": warnings,
"raw": raw,
}
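if __name__ == "__main__":
    # Illustrative smoke test (assumes nothing beyond this module): run the two
    # plain-text parsers over a hand-written pytest log and print the results.
    _sample_log = (
        "=================== test session starts ===================\n"
        "collected 3 items\n"
        "\n"
        "test_example.py::test_ok PASSED\n"
        "test_example.py::test_bad FAILED\n"
        "\n"
        "________________________ test_bad _________________________\n"
        "    def test_bad():\n"
        ">       assert 1 == 2\n"
        "E       assert 1 == 2\n"
        "\n"
        "FAILED test_example.py::test_bad - assert 1 == 2\n"
        "=============== 1 failed, 2 passed in 0.03s ===============\n"
    )
    print(_parse_pytest_counts(_sample_log))              # expected: (2, 3)
    print(_parse_plain_log(_sample_log)["failed_tests"])  # expected: one FAILED entry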