llm-ready-data / app /services /code_executor_service.py
Soumik-404's picture
feat: oom
4b54fab
Raw
History Blame Contribute Delete
11 kB
from __future__ import annotations
import asyncio
import logging
import os
import re
import shutil
import signal
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Optional
from app.config import get_settings
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Directory containing this file; the sandbox runner scripts (py_sandbox.py,
# js_sandbox.js) are resolved relative to it when building commands.
_APP_DIR = Path(__file__).resolve().parent
# Application settings, fetched once at import time. CodeExecutorService reads
# its execution limits and working directory from this object.
_settings = get_settings()
# Python modules whose import is rejected, listed in the order they are checked.
_BLOCKED_PYTHON_MODULES = (
    "subprocess",
    "ctypes",
    "os",
    "sys",
    "socket",
    "builtins",
    "signal",
    "shutil",
)


def _python_import_patterns(module: str) -> list[tuple[str, str]]:
    """Return the (regex, message) pairs that block importing *module*.

    The ``import`` pattern walks a comma-separated module list so that the
    blocked module is caught wherever it appears in the statement
    (``import json, os`` or ``import json as j, os``), not only when it is
    the first name after ``import``.
    """
    name = re.escape(module)
    message = f"{module} not allowed"
    return [
        (rf"import\s+(?:[\w.]+(?:\s+as\s+\w+)?\s*,\s*)*{name}\b", message),
        (rf"from\s+{name}\b", message),
    ]


class CodeSanitizer:
    """Regex-based pre-screen for user-submitted source code.

    Each language maps to an ordered list of ``(regex, message)`` pairs; the
    first pattern found anywhere in the code rejects the submission. This is
    a best-effort textual filter (it also matches inside strings and
    comments) and is not a substitute for the sandbox the code runs in.
    """

    _BLOCKED_PATTERNS: dict[str, list[tuple[str, str]]] = {
        "python": [
            # Import bans, two patterns (import / from) per blocked module.
            *(
                entry
                for module in _BLOCKED_PYTHON_MODULES
                for entry in _python_import_patterns(module)
            ),
            # Dynamic code loading / evaluation.
            (r"__import__\s*\(", "__import__() not allowed"),
            (r"exec\s*\(", "exec() not allowed"),
            (r"eval\s*\(", "eval() not allowed"),
            (r"compile\s*\(", "compile() not allowed"),
            # Introspection attributes commonly used for sandbox escapes.
            (r"\.__subclasses__\s*\(\)", "subclass escape not allowed"),
            (r"\.__bases__", "__bases__ not allowed"),
            (r"\.__mro__", "__mro__ not allowed"),
            (r"\.__globals__", "__globals__ not allowed"),
            (r"\.__code__", "__code__ not allowed"),
            (r"\.__closure__", "__closure__ not allowed"),
            (r"\.__dict__", "__dict__ not allowed"),
            (r"\.__builtins__", "__builtins__ not allowed"),
            (r"\.__class__", "__class__ not allowed"),
        ],
        "javascript": [
            (r"require\s*\(", "require() not allowed"),
            (r"process\s*\.", "process not allowed"),
            (r"__dirname", "__dirname not allowed"),
            (r"__filename", "__filename not allowed"),
            (r"global\s*\.", "global not allowed"),
            (r"globalThis\s*\.", "globalThis not allowed"),
            (r"eval\s*\(", "eval() not allowed"),
            (r"Function\s*\(", "Function() constructor not allowed"),
            (r"child_process", "child_process not allowed"),
            (r"fs\s*\.", "fs not allowed"),
            (r"net\s*\.", "net not allowed"),
            (r"http\s*\.", "http not allowed"),
            (r"https\s*\.", "https not allowed"),
            (r"worker_threads", "worker_threads not allowed"),
            (r"Buffer\s*\.", "Buffer not allowed"),
        ],
        # "java": [ # DISABLED (OOM mitigation)
        # (r"ProcessBuilder", "ProcessBuilder not allowed"),
        # (r"Runtime\.exec", "Runtime.exec() not allowed"),
        # (r"Runtime\.getRuntime\s*\(\s*\)", "Runtime.getRuntime() not allowed"),
        # (r"System\.exit\s*\(", "System.exit() not allowed"),
        # (r"System\.gc\s*\(", "System.gc() not allowed"),
        # (r"File\s*\(", "File operations not allowed"),
        # (r"FileInputStream", "FileInputStream not allowed"),
        # (r"FileOutputStream", "FileOutputStream not allowed"),
        # (r"FileReader", "FileReader not allowed"),
        # (r"FileWriter", "FileWriter not allowed"),
        # (r"Socket\s*\(", "Socket not allowed"),
        # (r"ServerSocket\s*\(", "ServerSocket not allowed"),
        # (r"URL\s*\(", "URL not allowed"),
        # (r"Class\.forName", "Class.forName() not allowed"),
        # (r"Thread\s*\(", "Thread not allowed"),
        # (r"ThreadPoolExecutor", "ThreadPoolExecutor not allowed"),
        # (r"Runtime\.", "Runtime not allowed"),
        # (r"System\.getProperty", "System.getProperty not allowed"),
        # (r"System\.getenv", "System.getenv not allowed"),
        # ],
    }

    @classmethod
    def sanitize(cls, code: str, language: str) -> tuple[bool, Optional[str]]:
        """Screen *code* against the blocked patterns for *language*.

        Args:
            code: Source text submitted by the user.
            language: Key into the pattern table (e.g. ``"python"``).

        Returns:
            ``(True, None)`` when no blocked pattern is found, including when
            the language has no pattern list at all; otherwise
            ``(False, "Forbidden: <message>")`` for the first pattern, in
            table order, that matches.
        """
        patterns = cls._BLOCKED_PATTERNS.get(language, [])
        for pattern, message in patterns:
            if re.search(pattern, code):
                return False, f"Forbidden: {message}"
        return True, None

    # @classmethod # DISABLED (OOM mitigation)
    # def validate_java_class(cls, code: str) -> tuple[bool, Optional[str]]:
    # if not re.search(r"public\s+class\s+Main\s*\{", code):
    # return False, "Java code must have 'public class Main' with 'public static void main(String[] args)'"
    # return True, None
class CodeExecutorService:
    """Run user-submitted code snippets through per-language sandbox scripts.

    Each run writes the code to a fresh temporary directory under the
    configured working directory, launches the language runtime on the
    matching sandbox script, and reports output, exit code and timing.
    Concurrency is bounded by a semaphore sized from settings.
    """

    # Source filename written into the run directory for each supported
    # language. "java": "Main.java" is DISABLED (OOM mitigation).
    _SOURCE_FILENAMES = {"python": "code.py", "javascript": "code.js"}

    def __init__(self) -> None:
        """Read execution limits from settings and ensure the workdir exists."""
        self._max_execution_time = _settings.code_exec_max_time
        self._max_output_bytes = _settings.code_exec_max_output
        # NOTE(review): stored but not enforced anywhere in this class.
        self._max_memory_mb = _settings.code_exec_max_memory
        self._semaphore = asyncio.Semaphore(_settings.code_exec_max_concurrent)
        self._workdir = Path(_settings.code_exec_workdir)
        self._workdir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _failure(language: str, error: Optional[str]) -> dict:
        """Build the result dict for a run that never produced an exit code."""
        return {
            "success": False, "output": "", "error": error,
            "exit_code": None, "execution_time_ms": None,
            "language": language, "timed_out": False,
        }

    async def execute(
        self,
        code: str,
        language: str,
        timeout: Optional[int] = None,
    ) -> dict:
        """Sanitize and run *code*, returning a result dict.

        Args:
            code: Source text to execute.
            language: ``"python"`` or ``"javascript"``.
            timeout: Requested wall-clock limit in seconds; capped at the
                configured maximum, which is also used when this is falsy.

        Returns:
            A dict with keys ``success``, ``output``, ``error``,
            ``exit_code``, ``execution_time_ms``, ``language`` and
            ``timed_out``. Failures are reported through this dict rather
            than raised.
        """
        # Reject unknown languages up front instead of letting the filename
        # lookup fail later and surface as an opaque "Internal error".
        if language not in self._SOURCE_FILENAMES:
            return self._failure(language, f"Unsupported language: {language}")

        sanitized, err = CodeSanitizer.sanitize(code, language)
        if not sanitized:
            return self._failure(language, err)
        # if language == "java": # DISABLED (OOM mitigation)
        # valid, err = CodeSanitizer.validate_java_class(code)
        # if not valid:
        # return {
        # "success": False, "output": "", "error": err,
        # "exit_code": None, "execution_time_ms": None,
        # "language": language, "timed_out": False,
        # }
        exec_timeout = min(timeout or self._max_execution_time, self._max_execution_time)
        async with self._semaphore:
            run_dir = None
            start_time = time.monotonic()
            try:
                run_dir = Path(tempfile.mkdtemp(dir=self._workdir))
                filename = self._filename_for(language)
                src_path = run_dir / filename
                src_path.write_text(code, encoding="utf-8")
                cmd = self._build_command(language, run_dir, filename)
                # The subprocess call blocks, so run it off the event loop.
                result = await asyncio.to_thread(
                    self._run_subprocess, cmd, exec_timeout,
                )
                elapsed_ms = (time.monotonic() - start_time) * 1000
                if result["timed_out"]:
                    return {
                        "success": False,
                        "output": result["stdout"],
                        "error": f"Execution timed out after {exec_timeout}s",
                        "exit_code": result["exit_code"],
                        "execution_time_ms": round(elapsed_ms, 2),
                        "language": language,
                        "timed_out": True,
                    }
                return {
                    "success": result["exit_code"] == 0,
                    "output": result["stdout"],
                    "error": result["stderr"] or None,
                    "exit_code": result["exit_code"],
                    "execution_time_ms": round(elapsed_ms, 2),
                    "language": language,
                    "timed_out": False,
                }
            except FileNotFoundError as exc:
                _logger.error("Runtime not found: %s", exc)
                return self._failure(language, f"Runtime not found: {exc.filename}")
            except Exception as exc:
                # Service boundary: report the failure to the caller rather
                # than propagating; the traceback goes to the log.
                _logger.exception("Executor error")
                return self._failure(language, f"Internal error: {exc}")
            finally:
                if run_dir is not None:
                    shutil.rmtree(run_dir, ignore_errors=True)

    def _run_subprocess(
        self,
        cmd: list[str],
        timeout: float,
    ) -> dict:
        """Run *cmd* to completion or until *timeout* seconds elapse.

        Returns:
            A dict with ``stdout`` and ``stderr`` (decoded as UTF-8 with
            replacement, each capped at the configured output size),
            ``exit_code`` and ``timed_out``.

        Raises:
            FileNotFoundError: If the executable in *cmd* cannot be found.
        """
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # Give the child its own session/process group on POSIX so the
            # killpg() below targets only the child and its descendants.
            # Without this the child shares this process's group and a
            # timeout would SIGKILL the service itself.
            start_new_session=(os.name != "nt"),
        )
        try:
            stdout_bytes, stderr_bytes = proc.communicate(timeout=timeout)
            timed_out = False
        except subprocess.TimeoutExpired:
            try:
                if os.name == "nt":
                    proc.kill()
                else:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            except OSError:
                # Group lookup/kill failed (e.g. already gone); fall back to
                # killing the direct child.
                proc.kill()
            # Reap the process and collect whatever output it produced.
            stdout_bytes, stderr_bytes = proc.communicate()
            timed_out = True
        max_out = self._max_output_bytes
        # The cap is expressed in bytes, so truncate before decoding; this
        # also avoids decoding output that is about to be discarded.
        return {
            "stdout": (stdout_bytes or b"")[:max_out].decode("utf-8", errors="replace"),
            "stderr": (stderr_bytes or b"")[:max_out].decode("utf-8", errors="replace"),
            "exit_code": proc.returncode,
            "timed_out": timed_out,
        }

    async def check_runtimes(self) -> dict[str, str]:
        """Report, per language, where its runtime is on PATH or ``"missing"``."""
        status = {}
        for lang, runtime in [("python", "python3"), ("javascript", "node")]:  # "java" DISABLED (OOM mitigation)
            path = shutil.which(runtime)
            status[lang] = f"found at {path}" if path else "missing"
        return status

    @staticmethod
    def _filename_for(language: str) -> str:
        """Return the source filename for *language*.

        Raises:
            KeyError: If the language is not supported.
        """
        return CodeExecutorService._SOURCE_FILENAMES[language]

    @staticmethod
    def _build_command(language: str, run_dir: Path, filename: str) -> list[str]:
        """Return the argv that runs *filename* through its sandbox script.

        Returns an empty list for a language with no command defined.
        """
        sandbox_py = _APP_DIR / "py_sandbox.py"
        sandbox_js = _APP_DIR / "js_sandbox.js"
        if language == "python":
            return ["python3", str(sandbox_py), str(run_dir / filename)]
        elif language == "javascript":
            return ["node", str(sandbox_js), str(run_dir / filename)]
        # elif language == "java": # DISABLED (OOM mitigation)
        # return ["sh", "-c",
        # f"cd {run_dir} && javac Main.java 2>&1 && java -XX:CompressedClassSpaceSize=64m -Xmx96m Main 2>&1"]
        return []