from __future__ import annotations import json import math import os import signal import subprocess import sys import tempfile from dataclasses import dataclass DEFAULT_ADDRESS_SPACE_BYTES = 512 * 1024 * 1024 DEFAULT_FILE_BYTES = 2 * 1024 * 1024 DEFAULT_OPEN_FILES = 64 DEFAULT_PROCESSES = 32 _RUNNER_BOOTSTRAP = r""" import json import runpy import sys try: import resource except ImportError: # pragma: no cover resource = None def _set_limit(name, value): if resource is None or not hasattr(resource, name): return limit = int(value) try: _, current_hard = resource.getrlimit(getattr(resource, name)) soft = min(limit, current_hard) if current_hard >= 0 else limit resource.setrlimit(getattr(resource, name), (soft, current_hard)) except (OSError, ValueError): return config = json.loads(sys.argv[1]) _set_limit("RLIMIT_CORE", 0) _set_limit("RLIMIT_CPU", config["cpu_seconds"]) _set_limit("RLIMIT_FSIZE", config["file_bytes"]) _set_limit("RLIMIT_NOFILE", config["open_files"]) _set_limit("RLIMIT_AS", config["address_space_bytes"]) _set_limit("RLIMIT_NPROC", config["processes"]) mode = config["mode"] if mode == "script": script = sys.argv[2] sys.argv = sys.argv[2:] runpy.run_path(script, run_name="__main__") elif mode == "code": sys.argv = ["-c"] exec(config["code"], {"__name__": "__main__"}) else: # pragma: no cover raise SystemExit(f"Unsupported execution mode: {mode}") """ @dataclass(frozen=True) class PythonRunResult: returncode: int stdout: str stderr: str timed_out: bool = False def _safe_env(workspace_dir: str) -> dict[str, str]: return { "HOME": workspace_dir, "TMPDIR": workspace_dir, "LANG": "C.UTF-8", "LC_ALL": "C.UTF-8", "PATH": "", "PYTHONDONTWRITEBYTECODE": "1", "PYTHONHASHSEED": "0", "PYTHONIOENCODING": "utf-8", "PYTHONNOUSERSITE": "1", } def _limit_config(timeout_s: float) -> dict[str, int]: return { "cpu_seconds": max(1, int(math.ceil(timeout_s)) + 1), "file_bytes": DEFAULT_FILE_BYTES, "open_files": DEFAULT_OPEN_FILES, "address_space_bytes": DEFAULT_ADDRESS_SPACE_BYTES, "processes": DEFAULT_PROCESSES, } def _read_limited_text(handle, limit: int) -> str: handle.seek(0) data = handle.read(limit + 1) if isinstance(data, bytes): return data.decode("utf-8", errors="replace")[:limit] return str(data)[:limit] def _terminate_process(proc: subprocess.Popen[bytes]) -> None: if proc.poll() is not None: return if os.name != "nt": try: os.killpg(proc.pid, signal.SIGKILL) return except ProcessLookupError: return proc.kill() def _run_python_command( config: dict[str, object], *, cwd: str, argv: list[str], timeout_s: float, stdout_limit: int, stderr_limit: int, ) -> PythonRunResult: command = [ sys.executable, "-I", "-B", "-c", _RUNNER_BOOTSTRAP, json.dumps(config, ensure_ascii=True), *argv, ] start_new_session = os.name != "nt" with tempfile.TemporaryFile() as stdout_file, tempfile.TemporaryFile() as stderr_file: proc = subprocess.Popen( command, cwd=cwd, env=_safe_env(cwd), stdin=subprocess.DEVNULL, stdout=stdout_file, stderr=stderr_file, start_new_session=start_new_session, ) timed_out = False try: proc.wait(timeout=timeout_s) except subprocess.TimeoutExpired: timed_out = True _terminate_process(proc) proc.wait() return PythonRunResult( returncode=proc.returncode if proc.returncode is not None else -1, stdout=_read_limited_text(stdout_file, stdout_limit), stderr=_read_limited_text(stderr_file, stderr_limit), timed_out=timed_out, ) def run_python_script( script_name: str, *, cwd: str, args: list[str], timeout_s: float, stdout_limit: int, stderr_limit: int, ) -> PythonRunResult: config = {"mode": "script", **_limit_config(timeout_s)} return _run_python_command( config, cwd=cwd, argv=[script_name, *args], timeout_s=timeout_s, stdout_limit=stdout_limit, stderr_limit=stderr_limit, ) def run_python_code( code: str, *, cwd: str, timeout_s: float, stdout_limit: int, stderr_limit: int, ) -> PythonRunResult: config = {"mode": "code", "code": code, **_limit_config(timeout_s)} return _run_python_command( config, cwd=cwd, argv=[], timeout_s=timeout_s, stdout_limit=stdout_limit, stderr_limit=stderr_limit, )