""" Oracle Executor — Docker-backed sandbox for untrusted code execution. Supports Python and C++ submissions, enforces time and memory limits, and returns both CodeCourt-native status fields and hackathon-facing outcome data. """ from __future__ import annotations import io import os import platform import resource import tarfile import tempfile import time import subprocess from dataclasses import dataclass from typing import Optional try: import docker from docker.errors import DockerException except ImportError: # pragma: no cover - handled at runtime when dependency is missing docker = None DockerException = Exception LANGUAGE_CONFIG = { "python": { "image": "python:3.11-alpine", "source_name": "main.py", "run_cmd": ["sh", "-lc", "python3 /workspace/main.py < /workspace/stdin.txt"], "compile_cmd": None, }, "cpp": { "image": "gcc:13", "source_name": "main.cpp", "compile_cmd": ["sh", "-lc", "g++ -O2 -std=c++17 /workspace/main.cpp -o /workspace/main"], "run_cmd": ["sh", "-lc", "/workspace/main < /workspace/stdin.txt"], }, } @dataclass class ExecutionResult: status: str # 'pass' | 'fail' | 'tle' | 'mle' | 'error' stdout: str stderr: str timed_out: bool memory_exceeded: bool execution_time: float expected_output: Optional[str] = None memory_used_mb: float = 0.0 language: str = "python" outcome: str = "setter_wins" # 'solver_wins' | 'setter_wins' | 'compile_error' | 'time_limit' compile_error: bool = False @property def passed(self) -> bool: return self.status == "pass" class OracleExecutor: """ Secure Docker sandbox for executing untrusted code. Each execution happens in an isolated container with: - network disabled - memory cap - CPU quota - no-new-privileges - read-only root filesystem """ def __init__( self, time_limit: float = 2.0, memory_limit_mb: int = 256, default_language: str = "python", ): self.time_limit = time_limit self.memory_limit_mb = memory_limit_mb self.default_language = default_language self._client = None self._docker_checked = False self._docker_available = False def _get_client(self): if docker is None: return None if self._docker_checked: return self._client if self._docker_available else None self._docker_checked = True if self._client is None: client = None try: client = docker.from_env() client.ping() self._client = client self._docker_available = True except DockerException: try: client.close() except Exception: pass self._client = None self._docker_available = False return self._client def _validate_language(self, language: str) -> dict: if language not in LANGUAGE_CONFIG: raise ValueError(f"Unsupported language: {language}. Expected one of {sorted(LANGUAGE_CONFIG)}") return LANGUAGE_CONFIG[language] def _create_workspace_archive(self, files: dict[str, str]) -> bytes: buffer = io.BytesIO() with tarfile.open(fileobj=buffer, mode="w") as tar: for filename, content in files.items(): encoded = content.encode("utf-8") info = tarfile.TarInfo(name=filename) info.size = len(encoded) info.mode = 0o644 tar.addfile(info, io.BytesIO(encoded)) buffer.seek(0) return buffer.read() def _read_peak_memory_mb(self, container) -> float: try: stats = container.stats(stream=False) except DockerException: return 0.0 memory_usage = 0 if isinstance(stats, dict): memory_usage = ( stats.get("memory_stats", {}).get("max_usage") or stats.get("memory_stats", {}).get("usage") or 0 ) return round(memory_usage / (1024 * 1024), 3) def _status_from_exit(self, exit_code: int, expected_output: Optional[str], stdout: str, stderr: str) -> tuple[str, str]: if exit_code == 137: return "mle", "setter_wins" if exit_code != 0: compile_like = "syntaxerror" in stderr.lower() or "traceback" in stderr.lower() return ("error", "compile_error" if compile_like else "setter_wins") if expected_output is None: return "pass", "solver_wins" if stdout.strip() == expected_output.strip(): return "pass", "solver_wins" return "fail", "setter_wins" def _set_local_limits(self): if platform.system() == "Darwin": return mem_bytes = self.memory_limit_mb * 1024 * 1024 resource.setrlimit(resource.RLIMIT_AS, (mem_bytes, mem_bytes)) resource.setrlimit(resource.RLIMIT_CPU, (int(self.time_limit) + 1, int(self.time_limit) + 1)) def _run_local(self, command: list[str], files: dict[str, str]) -> tuple[int, str, str, float, float, bool]: start = time.time() with tempfile.TemporaryDirectory() as tmpdir: for filename, content in files.items(): with open(os.path.join(tmpdir, filename), "w", encoding="utf-8") as f: f.write(content) cmd_str = command[2].replace("/workspace", tmpdir) try: res = subprocess.run( cmd_str, shell=True, cwd=tmpdir, capture_output=True, timeout=max(self.time_limit + 0.5, 1.0), text=True, preexec_fn=None if platform.system() == "Darwin" else self._set_local_limits, ) elapsed = time.time() - start stderr = res.stderr.strip() if res.returncode != 0 and not stderr: stderr = f"Exit code: {res.returncode}" return res.returncode, res.stdout.strip(), stderr, elapsed, 10.0, False except subprocess.TimeoutExpired as e: elapsed = time.time() - start stdout = e.stdout if isinstance(e.stdout, str) else (e.stdout.decode() if e.stdout else "") stderr = e.stderr if isinstance(e.stderr, str) else (e.stderr.decode() if e.stderr else "") return 124, stdout.strip(), stderr.strip(), elapsed, 10.0, True def _run_container(self, image: str, command: list[str], files: dict[str, str]) -> tuple[int, str, str, float, float, bool]: if "SPACE_ID" in os.environ or self._get_client() is None: return self._run_local(command, files) client = self._get_client() container = None start = time.time() try: container = client.containers.create( image=image, command=command, working_dir="/workspace", mem_limit=f"{self.memory_limit_mb}m", network_disabled=True, read_only=True, nano_cpus=1_000_000_000, security_opt=["no-new-privileges:true"], cap_drop=["ALL"], pids_limit=128, detach=True, stdin_open=False, tty=False, ) container.put_archive("/workspace", self._create_workspace_archive(files)) container.start() timed_out = False try: result = container.wait(timeout=max(self.time_limit + 0.5, 1.0)) except Exception: timed_out = True container.kill() result = {"StatusCode": 124} stdout = container.logs(stdout=True, stderr=False).decode("utf-8", errors="replace").strip() stderr = container.logs(stdout=False, stderr=True).decode("utf-8", errors="replace").strip() elapsed = time.time() - start peak_memory_mb = self._read_peak_memory_mb(container) return int(result.get("StatusCode", 1)), stdout, stderr, elapsed, peak_memory_mb, timed_out finally: if container is not None: try: container.remove(force=True) except DockerException: pass def run( self, code: str, stdin_input: str, expected_output: Optional[str] = None, language: Optional[str] = None, ) -> ExecutionResult: """ Execute source code in Docker with strict sandboxing. """ chosen_language = language or self.default_language try: config = self._validate_language(chosen_language) except ValueError as exc: return ExecutionResult( status="error", stdout="", stderr=str(exc), timed_out=False, memory_exceeded=False, execution_time=0.0, expected_output=expected_output, language=chosen_language, outcome="compile_error", compile_error=True, ) source_name = config["source_name"] files = { source_name: code, "stdin.txt": stdin_input, } try: if config["compile_cmd"] is not None: compile_exit, _, compile_stderr, compile_time, compile_memory_mb, compile_timed_out = self._run_container( config["image"], config["compile_cmd"], files, ) if compile_timed_out: return ExecutionResult( status="tle", stdout="", stderr="Compilation timed out", timed_out=True, memory_exceeded=False, execution_time=compile_time, expected_output=expected_output, memory_used_mb=compile_memory_mb, language=chosen_language, outcome="time_limit", ) if compile_exit != 0: return ExecutionResult( status="error", stdout="", stderr=compile_stderr or f"Compilation failed with exit code {compile_exit}", timed_out=False, memory_exceeded=compile_exit == 137, execution_time=compile_time, expected_output=expected_output, memory_used_mb=compile_memory_mb, language=chosen_language, outcome="compile_error", compile_error=True, ) exit_code, stdout, stderr, elapsed, peak_memory_mb, timed_out = self._run_container( config["image"], config["run_cmd"], files, ) if timed_out or elapsed > self.time_limit: return ExecutionResult( status="tle", stdout=stdout, stderr=stderr or "Time Limit Exceeded", timed_out=True, memory_exceeded=False, execution_time=elapsed, expected_output=expected_output, memory_used_mb=peak_memory_mb, language=chosen_language, outcome="time_limit", ) status, outcome = self._status_from_exit(exit_code, expected_output, stdout, stderr) return ExecutionResult( status=status, stdout=stdout, stderr=stderr, timed_out=False, memory_exceeded=status == "mle", execution_time=elapsed, expected_output=expected_output, memory_used_mb=peak_memory_mb, language=chosen_language, outcome=outcome, compile_error=(outcome == "compile_error"), ) except RuntimeError as exc: return ExecutionResult( status="error", stdout="", stderr=str(exc), timed_out=False, memory_exceeded=False, execution_time=0.0, expected_output=expected_output, memory_used_mb=0.0, language=chosen_language, outcome="compile_error", compile_error=True, ) except DockerException as exc: return ExecutionResult( status="error", stdout="", stderr=f"Docker execution failed: {exc}", timed_out=False, memory_exceeded=False, execution_time=0.0, expected_output=expected_output, memory_used_mb=0.0, language=chosen_language, outcome="compile_error", compile_error=True, ) def run_against_tests( self, code: str, test_cases: list, language: Optional[str] = None, ) -> dict: """ Run code against multiple test cases. Returns both existing CodeCourt keys and hackathon-facing aggregate fields: - overall_status - pass_rate - avg_time - avg_memory_mb - outcome """ results = [] passed = 0 for i, tc in enumerate(test_cases): result = self.run( code=code, stdin_input=tc["input"], expected_output=tc.get("expected"), language=language, ) results.append( { "test_id": i + 1, "status": result.status, "passed": result.passed, "execution_time": result.execution_time, "memory_used_mb": result.memory_used_mb, "stdout": result.stdout, "stderr": result.stderr, "outcome": result.outcome, "language": result.language, } ) if result.passed: passed += 1 if passed == len(test_cases): overall_status = "pass" outcome = "solver_wins" elif any(item["status"] == "tle" for item in results): overall_status = "tle" outcome = "time_limit" elif any(item["outcome"] == "compile_error" for item in results): overall_status = "error" outcome = "compile_error" elif any(item["status"] == "mle" for item in results): overall_status = "mle" outcome = "setter_wins" else: overall_status = "fail" outcome = "setter_wins" total = len(test_cases) return { "overall_status": overall_status, "outcome": outcome, "passed": passed, "total": total, "pass_rate": passed / max(total, 1), "results": results, "avg_time": sum(item["execution_time"] for item in results) / max(total, 1), "avg_memory_mb": sum(item["memory_used_mb"] for item in results) / max(total, 1), }