"""Run user-submitted code snippets in a subprocess with a timeout.

The module has two parts:

* ``CodeSanitizer`` rejects source text that matches a per-language list of
  forbidden regular expressions.
* ``CodeExecutorService`` writes accepted code to a scratch directory and runs
  it through the ``py_sandbox.py`` / ``js_sandbox.js`` wrapper scripts that
  live next to this file (their contents are not part of this module).

Java support is intentionally disabled (OOM mitigation); the disabled pieces
are kept as comments so they can be restored together.
"""

from __future__ import annotations

import asyncio
import logging
import os
import re
import shutil
import signal
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Optional

from app.config import get_settings

_logger = logging.getLogger(__name__)
_APP_DIR = Path(__file__).resolve().parent
_settings = get_settings()


class CodeSanitizer:
    """Regex-based deny-list applied to source text before it is executed.

    The check is purely textual (``re.search`` over the raw source), so it
    can both over-match (e.g. inside string literals) and be evaded; it is a
    first filter in front of the sandbox wrappers, not a sandbox by itself.
    """

    # language -> list of (regex, human-readable reason)
    _BLOCKED_PATTERNS: dict[str, list[tuple[str, str]]] = {
        "python": [
            (r"import\s+subprocess\b", "subprocess not allowed"),
            (r"from\s+subprocess\b", "subprocess not allowed"),
            (r"import\s+ctypes\b", "ctypes not allowed"),
            (r"from\s+ctypes\b", "ctypes not allowed"),
            (r"import\s+os\b", "os not allowed"),
            (r"from\s+os\b", "os not allowed"),
            (r"import\s+sys\b", "sys not allowed"),
            (r"from\s+sys\b", "sys not allowed"),
            (r"import\s+socket\b", "socket not allowed"),
            (r"from\s+socket\b", "socket not allowed"),
            (r"import\s+builtins\b", "builtins not allowed"),
            (r"from\s+builtins\b", "builtins not allowed"),
            (r"import\s+signal\b", "signal not allowed"),
            (r"from\s+signal\b", "signal not allowed"),
            (r"import\s+shutil\b", "shutil not allowed"),
            (r"from\s+shutil\b", "shutil not allowed"),
            (r"__import__\s*\(", "__import__() not allowed"),
            (r"exec\s*\(", "exec() not allowed"),
            (r"eval\s*\(", "eval() not allowed"),
            (r"compile\s*\(", "compile() not allowed"),
            (r"\.__subclasses__\s*\(\)", "subclass escape not allowed"),
            (r"\.__bases__", "__bases__ not allowed"),
            (r"\.__mro__", "__mro__ not allowed"),
            (r"\.__globals__", "__globals__ not allowed"),
            (r"\.__code__", "__code__ not allowed"),
            (r"\.__closure__", "__closure__ not allowed"),
            (r"\.__dict__", "__dict__ not allowed"),
            (r"\.__builtins__", "__builtins__ not allowed"),
            (r"\.__class__", "__class__ not allowed"),
        ],
        "javascript": [
            (r"require\s*\(", "require() not allowed"),
            (r"process\s*\.", "process not allowed"),
            (r"__dirname", "__dirname not allowed"),
            (r"__filename", "__filename not allowed"),
            (r"global\s*\.", "global not allowed"),
            (r"globalThis\s*\.", "globalThis not allowed"),
            (r"eval\s*\(", "eval() not allowed"),
            (r"Function\s*\(", "Function() constructor not allowed"),
            (r"child_process", "child_process not allowed"),
            (r"fs\s*\.", "fs not allowed"),
            (r"net\s*\.", "net not allowed"),
            (r"http\s*\.", "http not allowed"),
            (r"https\s*\.", "https not allowed"),
            (r"worker_threads", "worker_threads not allowed"),
            (r"Buffer\s*\.", "Buffer not allowed"),
        ],
        # "java": [  # DISABLED (OOM mitigation)
        #     (r"ProcessBuilder", "ProcessBuilder not allowed"),
        #     (r"Runtime\.exec", "Runtime.exec() not allowed"),
        #     (r"Runtime\.getRuntime\s*\(\s*\)", "Runtime.getRuntime() not allowed"),
        #     (r"System\.exit\s*\(", "System.exit() not allowed"),
        #     (r"System\.gc\s*\(", "System.gc() not allowed"),
        #     (r"File\s*\(", "File operations not allowed"),
        #     (r"FileInputStream", "FileInputStream not allowed"),
        #     (r"FileOutputStream", "FileOutputStream not allowed"),
        #     (r"FileReader", "FileReader not allowed"),
        #     (r"FileWriter", "FileWriter not allowed"),
        #     (r"Socket\s*\(", "Socket not allowed"),
        #     (r"ServerSocket\s*\(", "ServerSocket not allowed"),
        #     (r"URL\s*\(", "URL not allowed"),
        #     (r"Class\.forName", "Class.forName() not allowed"),
        #     (r"Thread\s*\(", "Thread not allowed"),
        #     (r"ThreadPoolExecutor", "ThreadPoolExecutor not allowed"),
        #     (r"Runtime\.", "Runtime not allowed"),
        #     (r"System\.getProperty", "System.getProperty not allowed"),
        #     (r"System\.getenv", "System.getenv not allowed"),
        # ],
    }

    @classmethod
    def sanitize(cls, code: str, language: str) -> tuple[bool, Optional[str]]:
        """Check *code* against the deny-list registered for *language*.

        Args:
            code: Source text to inspect.
            language: Key into ``_BLOCKED_PATTERNS``. A language without an
                entry has no patterns, so any code for it is accepted here.

        Returns:
            ``(True, None)`` when no pattern matches, otherwise
            ``(False, "Forbidden: <reason>")`` for the first matching pattern.
        """
        patterns = cls._BLOCKED_PATTERNS.get(language, [])
        for pattern, message in patterns:
            if re.search(pattern, code):
                return False, f"Forbidden: {message}"
        return True, None

    # @classmethod  # DISABLED (OOM mitigation)
    # def validate_java_class(cls, code: str) -> tuple[bool, Optional[str]]:
    #     if not re.search(r"public\s+class\s+Main\s*\{", code):
    #         return False, "Java code must have 'public class Main' with 'public static void main(String[] args)'"
    #     return True, None


class CodeExecutorService:
    """Execute sanitized snippets in a child process under a time limit.

    Limits are read once from the application settings at construction time.
    At most ``code_exec_max_concurrent`` executions run at the same time.
    """

    # language -> name of the source file written into the scratch directory.
    # Doubles as the list of languages this service accepts.
    _SOURCE_FILENAMES: dict[str, str] = {
        "python": "code.py",
        "javascript": "code.js",
        # "java": "Main.java"  DISABLED (OOM mitigation)
    }

    def __init__(self) -> None:
        """Load limits from settings and make sure the work directory exists."""
        self._max_execution_time = _settings.code_exec_max_time
        self._max_output_bytes = _settings.code_exec_max_output
        # NOTE(review): stored but not referenced anywhere in this module;
        # presumably enforced by the sandbox wrappers - confirm.
        self._max_memory_mb = _settings.code_exec_max_memory
        self._semaphore = asyncio.Semaphore(_settings.code_exec_max_concurrent)
        self._workdir = Path(_settings.code_exec_workdir)
        self._workdir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _result(
        language: str,
        *,
        success: bool = False,
        output: str = "",
        error: Optional[str] = None,
        exit_code: Optional[int] = None,
        execution_time_ms: Optional[float] = None,
        timed_out: bool = False,
    ) -> dict:
        """Build the response dict so every code path returns the same keys."""
        return {
            "success": success,
            "output": output,
            "error": error,
            "exit_code": exit_code,
            "execution_time_ms": execution_time_ms,
            "language": language,
            "timed_out": timed_out,
        }

    async def execute(
        self,
        code: str,
        language: str,
        timeout: Optional[int] = None,
    ) -> dict:
        """Sanitize and run *code*, returning a result dict.

        Args:
            code: Source text to run.
            language: ``"python"`` or ``"javascript"``.
            timeout: Requested time limit in seconds. It is capped at the
                configured maximum; ``None`` or a non-positive value selects
                the configured maximum.

        Returns:
            A dict with the keys ``success``, ``output``, ``error``,
            ``exit_code``, ``execution_time_ms``, ``language`` and
            ``timed_out``. Failures are reported through this dict rather
            than by raising.
        """
        # Reject unknown languages up front; otherwise they would pass the
        # sanitizer (no patterns) and fail later as an opaque KeyError.
        if language not in self._SOURCE_FILENAMES:
            return self._result(language, error=f"Unsupported language: {language}")

        allowed, reason = CodeSanitizer.sanitize(code, language)
        if not allowed:
            return self._result(language, error=reason)

        # if language == "java":  # DISABLED (OOM mitigation)
        #     valid, err = CodeSanitizer.validate_java_class(code)
        #     if not valid:
        #         return {
        #             "success": False, "output": "", "error": err,
        #             "exit_code": None, "execution_time_ms": None,
        #             "language": language, "timed_out": False,
        #         }

        limit = self._max_execution_time
        # A negative timeout would make communicate() time out immediately.
        exec_timeout = min(timeout, limit) if timeout and timeout > 0 else limit

        async with self._semaphore:
            run_dir: Optional[Path] = None
            start_time = time.monotonic()
            try:
                run_dir = Path(tempfile.mkdtemp(dir=self._workdir))
                filename = self._filename_for(language)
                (run_dir / filename).write_text(code, encoding="utf-8")

                cmd = self._build_command(language, run_dir, filename)
                # The subprocess call blocks, so keep it off the event loop.
                result = await asyncio.to_thread(
                    self._run_subprocess,
                    cmd,
                    exec_timeout,
                )
                elapsed_ms = round((time.monotonic() - start_time) * 1000, 2)

                if result["timed_out"]:
                    return self._result(
                        language,
                        output=result["stdout"],
                        error=f"Execution timed out after {exec_timeout}s",
                        exit_code=result["exit_code"],
                        execution_time_ms=elapsed_ms,
                        timed_out=True,
                    )

                return self._result(
                    language,
                    # A None exit code compares unequal to 0, i.e. failure.
                    success=result["exit_code"] == 0,
                    output=result["stdout"],
                    error=result["stderr"] or None,
                    exit_code=result["exit_code"],
                    execution_time_ms=elapsed_ms,
                )
            except FileNotFoundError as exc:
                _logger.error("Runtime not found: %s", exc)
                return self._result(
                    language,
                    error=f"Runtime not found: {exc.filename}",
                )
            except Exception as exc:
                # Boundary handler: callers expect a result dict, not a raise.
                _logger.exception("Executor error")
                return self._result(language, error=f"Internal error: {exc}")
            finally:
                if run_dir is not None:
                    # Best-effort cleanup; ignore_errors already swallows
                    # failures, including an already-missing directory.
                    shutil.rmtree(run_dir, ignore_errors=True)

    def _run_subprocess(
        self,
        cmd: list[str],
        timeout: float,
    ) -> dict:
        """Run *cmd* to completion or kill it after *timeout* seconds.

        Args:
            cmd: Argument vector passed to ``subprocess.Popen``.
            timeout: Seconds to wait before the process is killed.

        Returns:
            A dict with ``stdout`` and ``stderr`` (decoded as UTF-8 with
            replacement, each capped at the configured output size),
            ``exit_code`` and ``timed_out``.

        Raises:
            FileNotFoundError: If the executable in *cmd* does not exist.
        """
        is_windows = os.name == "nt"
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # Give the child its own session/process group on POSIX. Without
            # this it shares our group, and the killpg() below would SIGKILL
            # this server process as well.
            start_new_session=not is_windows,
        )
        try:
            stdout_bytes, stderr_bytes = proc.communicate(timeout=timeout)
            timed_out = False
        except subprocess.TimeoutExpired:
            try:
                if is_windows:
                    proc.kill()
                else:
                    # The child leads its own group, so pgid == pid; this
                    # also takes down anything the snippet spawned.
                    os.killpg(proc.pid, signal.SIGKILL)
            except OSError:
                # Group already gone or not signalable: fall back to the
                # direct child.
                proc.kill()
            # Reap the process and collect whatever was written before the kill.
            stdout_bytes, stderr_bytes = proc.communicate()
            timed_out = True

        return {
            "stdout": self._decode_capped(stdout_bytes),
            "stderr": self._decode_capped(stderr_bytes),
            "exit_code": proc.returncode,
            "timed_out": timed_out,
        }

    def _decode_capped(self, data: Optional[bytes]) -> str:
        """Truncate *data* to the output byte limit, then decode it as UTF-8."""
        if not data:
            return ""
        # Cut the bytes (not the decoded text) so the limit really is in
        # bytes; a character split by the cut decodes to U+FFFD.
        return data[: self._max_output_bytes].decode("utf-8", errors="replace")

    async def check_runtimes(self) -> dict[str, str]:
        """Report, per language, whether its interpreter is found on PATH.

        Returns:
            A mapping of language name to ``"found at <path>"`` or
            ``"missing"``.
        """
        status = {}
        # "java" DISABLED (OOM mitigation)
        for lang, runtime in [("python", "python3"), ("javascript", "node")]:
            path = shutil.which(runtime)
            status[lang] = f"found at {path}" if path else "missing"
        return status

    @staticmethod
    def _filename_for(language: str) -> str:
        """Return the source file name used for *language*.

        Raises:
            KeyError: If *language* is not supported.
        """
        return CodeExecutorService._SOURCE_FILENAMES[language]

    @staticmethod
    def _build_command(language: str, run_dir: Path, filename: str) -> list[str]:
        """Return the argv that runs ``run_dir / filename`` via its wrapper.

        Returns an empty list for a language without a configured runtime;
        ``execute`` rejects such languages before this method is reached.
        """
        sandbox_py = _APP_DIR / "py_sandbox.py"
        sandbox_js = _APP_DIR / "js_sandbox.js"
        if language == "python":
            return ["python3", str(sandbox_py), str(run_dir / filename)]
        elif language == "javascript":
            return ["node", str(sandbox_js), str(run_dir / filename)]
        # elif language == "java":  # DISABLED (OOM mitigation)
        #     return ["sh", "-c",
        #             f"cd {run_dir} && javac Main.java 2>&1 && java -XX:CompressedClassSpaceSize=64m -Xmx96m Main 2>&1"]
        return []