"""Execution-based code evaluation helpers for coder benchmarks.""" from __future__ import annotations import math import os import re import shutil import sqlite3 import subprocess import tempfile from dataclasses import dataclass from pathlib import Path try: import resource except ImportError: # pragma: no cover - non-POSIX fallback resource = None # type: ignore[assignment] _CODE_BLOCK_RE = re.compile(r"```(?P[^\n`]*)\n(?P.*?)```", re.DOTALL) DEFAULT_EXECUTION_MEMORY_LIMIT_MB = 512 DEFAULT_EXECUTION_MAX_OUTPUT_CHARS = 12_000 @dataclass(frozen=True, slots=True) class CodeExecutionSpec: language: str test_code: str = "" timeout_seconds: float = 8.0 compile_only: bool = False memory_limit_mb: int = DEFAULT_EXECUTION_MEMORY_LIMIT_MB max_output_chars: int = DEFAULT_EXECUTION_MAX_OUTPUT_CHARS @dataclass(frozen=True, slots=True) class CodeExecutionResult: language: str available: bool passed: bool summary: str exit_code: int | None = None stdout: str = "" stderr: str = "" def extract_code_block(text: str, language: str | None = None) -> str: matches = list(_CODE_BLOCK_RE.finditer(text)) if not matches: return text.strip() normalized_language = (language or "").strip().lower() if normalized_language: for match in matches: fence_language = match.group("lang").strip().lower() if fence_language == normalized_language: return match.group("code").strip() return matches[0].group("code").strip() def evaluate_code_response(response_text: str, spec: CodeExecutionSpec) -> CodeExecutionResult: language = spec.language.strip().lower() code = extract_code_block(response_text, language=language) if not code: return CodeExecutionResult( language=language, available=True, passed=False, summary="Atbildē nav atrasts izpildāms koda bloks.", ) if language == "python": python_path = shutil.which("python3") or shutil.which("python") if python_path is None: return _unsupported_language_result(language, "python nav pieejams.") command = ( [python_path, "-I", "-B", "-s", "main.py"] if not spec.compile_only else [python_path, "-I", "-B", "-s", "-m", "py_compile", "main.py"] ) return _run_script_eval( language=language, command=command, file_name="main.py", source=_build_source(code, spec.test_code, "#"), spec=spec, ) if language in {"javascript", "js"}: node_path = shutil.which("node") if node_path is None: return _unsupported_language_result(language, "node nav pieejams.") command = ( [node_path, "main.js"] if not spec.compile_only else [node_path, "--check", "main.js"] ) return _run_script_eval( language=language, command=command, file_name="main.js", source=_build_source(code, spec.test_code, "//"), spec=spec, ) if language in {"typescript", "ts"}: return _run_typescript_eval(code, spec) if language in {"bash", "sh"}: bash_path = shutil.which("bash") if bash_path is None: return _unsupported_language_result(language, "bash nav pieejams.") command = [bash_path, "main.sh"] if not spec.compile_only else [bash_path, "-n", "main.sh"] return _run_script_eval( language=language, command=command, file_name="main.sh", source=_build_source(code, spec.test_code, "#"), spec=spec, ) if language == "rust": return _run_rust_eval(code, spec) if language == "sql": return _run_sql_eval(code, spec) return _unsupported_language_result( language, "Valoda execution evals režīmā vēl nav atbalstīta." ) def _build_source(code: str, test_code: str, comment_prefix: str) -> str: source = code.strip() tests = test_code.strip() if not tests: return source + "\n" return f"{source}\n\n{comment_prefix} execution harness\n{tests}\n" def _run_script_eval( *, language: str, command: list[str], file_name: str, source: str, spec: CodeExecutionSpec, ) -> CodeExecutionResult: with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as tmp_dir: workspace = Path(tmp_dir) file_path = workspace / file_name file_path.write_text(source, encoding="utf-8") result = _run_command(command, cwd=workspace, spec=spec, language=language) if result is None: return CodeExecutionResult( language=language, available=True, passed=True, summary=f"{language} kods izpildījās veiksmīgi.", exit_code=0, ) return result def _run_typescript_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult: tsc_path = shutil.which("tsc") if tsc_path is None: return _unsupported_language_result("typescript", "tsc nav pieejams.") node_path = shutil.which("node") if not spec.compile_only and node_path is None: return _unsupported_language_result("typescript", "node nav pieejams TypeScript izpildei.") with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as tmp_dir: workspace = Path(tmp_dir) source_path = workspace / "main.ts" source_path.write_text(_build_source(code, spec.test_code, "//"), encoding="utf-8") compile_result = _run_command( [ tsc_path, "--pretty", "false", "--target", "ES2020", "--module", "commonjs", "main.ts", ], cwd=workspace, spec=spec, language="typescript", ) if compile_result is not None: return compile_result if spec.compile_only: return CodeExecutionResult( language="typescript", available=True, passed=True, summary="TypeScript kods veiksmīgi sakompilējās.", exit_code=0, ) run_result = _run_command( [node_path, "main.js"], cwd=workspace, spec=spec, language="typescript" ) if run_result is None: return CodeExecutionResult( language="typescript", available=True, passed=True, summary="TypeScript kods veiksmīgi sakompilējās un izpildījās.", exit_code=0, ) return run_result def _run_rust_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult: rustc_path = shutil.which("rustc") if rustc_path is None: return _unsupported_language_result("rust", "rustc nav pieejams.") with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as tmp_dir: workspace = Path(tmp_dir) source_path = workspace / "main.rs" binary_path = workspace / "main" source_path.write_text(_build_source(code, spec.test_code, "//"), encoding="utf-8") compile_result = _run_command( [rustc_path, "main.rs", "-o", str(binary_path)], cwd=workspace, spec=spec, language="rust", ) if compile_result is not None: return compile_result if spec.compile_only: return CodeExecutionResult( language="rust", available=True, passed=True, summary="Rust kods veiksmīgi sakompilējās.", exit_code=0, ) run_result = _run_command([str(binary_path)], cwd=workspace, spec=spec, language="rust") if run_result is None: return CodeExecutionResult( language="rust", available=True, passed=True, summary="Rust kods veiksmīgi sakompilējās un izpildījās.", exit_code=0, ) return run_result def _run_sql_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult: try: with tempfile.TemporaryDirectory(prefix="maris-sql-eval-") as tmp_dir: workspace = Path(tmp_dir) connection = sqlite3.connect(":memory:") try: connection.execute("PRAGMA foreign_keys = ON") script = _build_sql_script(code, spec.test_code, compile_only=spec.compile_only) connection.executescript(script) finally: connection.close() workspace.mkdir(parents=True, exist_ok=True) except sqlite3.Error as exc: return CodeExecutionResult( language="sql", available=True, passed=False, summary="SQL execution eval neizdevās.", stderr=str(exc), ) return CodeExecutionResult( language="sql", available=True, passed=True, summary="SQL skripts veiksmīgi validējās un izpildījās.", exit_code=0, ) def _build_sql_script(code: str, test_code: str, *, compile_only: bool) -> str: candidate = code.strip().rstrip(";") harness = test_code.strip() if harness and "{{CODE}}" in harness: return harness.replace("{{CODE}}", candidate) if compile_only: if harness: return f"{harness}\nEXPLAIN QUERY PLAN {candidate};\n" return f"EXPLAIN QUERY PLAN {candidate};\n" if harness: return f"{harness}\n{candidate};\n" return candidate + ";\n" def _run_command( command: list[str], *, cwd: Path, spec: CodeExecutionSpec, language: str, ) -> CodeExecutionResult | None: try: completed = subprocess.run( # noqa: S603 command, cwd=str(cwd), check=False, capture_output=True, text=True, timeout=spec.timeout_seconds, stdin=subprocess.DEVNULL, env=_build_isolated_env(cwd), preexec_fn=_build_subprocess_preexec(spec), ) except subprocess.TimeoutExpired as exc: return CodeExecutionResult( language=language, available=True, passed=False, summary="Execution eval pārsniedza laika limitu.", stdout=_truncate_output(exc.stdout or "", spec.max_output_chars), stderr=_truncate_output(exc.stderr or "", spec.max_output_chars), ) if completed.returncode == 0: return None return CodeExecutionResult( language=language, available=True, passed=False, summary="Execution eval neizdevās.", exit_code=completed.returncode, stdout=_truncate_output(completed.stdout, spec.max_output_chars), stderr=_truncate_output(completed.stderr, spec.max_output_chars), ) def _build_isolated_env(workspace: Path) -> dict[str, str]: env: dict[str, str] = { "HOME": str(workspace), "TMPDIR": str(workspace), "TEMP": str(workspace), "TMP": str(workspace), "PYTHONNOUSERSITE": "1", "PYTHONDONTWRITEBYTECODE": "1", "PYTHONIOENCODING": "utf-8", "NODE_DISABLE_COLORS": "1", "CI": "1", } for key in ("PATH", "SYSTEMROOT", "SystemRoot", "WINDIR", "ComSpec"): value = os.environ.get(key) if value: env[key] = value return env def _build_subprocess_preexec(spec: CodeExecutionSpec): if os.name != "posix" or resource is None: return None memory_limit_bytes = max(spec.memory_limit_mb, 64) * 1024 * 1024 cpu_limit_seconds = max(2, math.ceil(spec.timeout_seconds) + 1) def _apply_limits() -> None: os.setsid() resource.setrlimit(resource.RLIMIT_CPU, (cpu_limit_seconds, cpu_limit_seconds)) resource.setrlimit(resource.RLIMIT_AS, (memory_limit_bytes, memory_limit_bytes)) resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) resource.setrlimit(resource.RLIMIT_FSIZE, (8 * 1024 * 1024, 8 * 1024 * 1024)) resource.setrlimit(resource.RLIMIT_NOFILE, (64, 64)) if hasattr(resource, "RLIMIT_NPROC"): resource.setrlimit(resource.RLIMIT_NPROC, (32, 32)) return _apply_limits def _truncate_output(value: str, max_chars: int) -> str: if len(value) <= max_chars: return value return value[:max_chars] + "\n...[truncated]" def _unsupported_language_result(language: str, reason: str) -> CodeExecutionResult: return CodeExecutionResult( language=language, available=False, passed=False, summary=reason, )