| """Execution-based code evaluation helpers for coder benchmarks.""" |
|
|
| from __future__ import annotations |
|
|
| import math |
| import os |
| import re |
| import shutil |
| import sqlite3 |
| import subprocess |
| import tempfile |
| from dataclasses import dataclass |
| from pathlib import Path |
|
|
| try: |
| import resource |
| except ImportError: |
| resource = None |
|
|
# Matches one Markdown fenced code block. ``lang`` captures whatever follows
# the opening fence on the same line (possibly empty); ``code`` captures
# everything up to the next closing fence (non-greedy, newlines included).
_CODE_BLOCK_RE = re.compile(r"```(?P<lang>[^\n`]*)\n(?P<code>.*?)```", re.DOTALL)
# Default value of ``CodeExecutionSpec.memory_limit_mb``.
DEFAULT_EXECUTION_MEMORY_LIMIT_MB = 512
# Default value of ``CodeExecutionSpec.max_output_chars``.
DEFAULT_EXECUTION_MAX_OUTPUT_CHARS = 12_000
|
|
|
|
@dataclass(frozen=True, slots=True)
class CodeExecutionSpec:
    """Immutable description of how one model response should be evaluated."""

    # Language name; the evaluator strips and lower-cases it before dispatch.
    language: str

    # Harness text appended after the extracted code (for SQL it is placed
    # before the candidate, or used as a template containing ``{{CODE}}``).
    test_code: str = ""

    # Wall-clock timeout handed to every spawned command; also used to derive
    # the CPU-time limit of the child process.
    timeout_seconds: float = 8.0

    # When true, only a syntax check / compilation is performed.
    compile_only: bool = False

    # Address-space limit for child processes, in MiB (at least 64 is applied).
    memory_limit_mb: int = DEFAULT_EXECUTION_MEMORY_LIMIT_MB

    # Maximum number of characters kept from captured stdout / stderr.
    max_output_chars: int = DEFAULT_EXECUTION_MAX_OUTPUT_CHARS
|
|
|
|
| @dataclass(frozen=True, slots=True) |
| class CodeExecutionResult: |
| language: str |
| available: bool |
| passed: bool |
| summary: str |
| exit_code: int | None = None |
| stdout: str = "" |
| stderr: str = "" |
|
|
|
|
# Groups of fence labels that are treated as naming the same language.
_FENCE_LANGUAGE_ALIASES = (
    frozenset({"python", "py", "python3"}),
    frozenset({"javascript", "js"}),
    frozenset({"typescript", "ts"}),
    frozenset({"bash", "sh", "shell"}),
    frozenset({"rust", "rs"}),
    frozenset({"sql", "sqlite"}),
)


def extract_code_block(text: str, language: str | None = None) -> str:
    """Return the most relevant fenced code block found in *text*.

    Selection order:

    1. If *text* contains no fenced block, the whole text (stripped) is
       returned.
    2. If *language* is given, the first block whose fence label equals it
       exactly (case-insensitive) wins.
    3. Otherwise the first block whose fence label's first word is
       *language* or one of its known aliases (e.g. ``js``/``javascript``)
       wins. This also covers labels that carry extra attributes after the
       language name.
    4. Otherwise the first block in the text is returned.

    Args:
        text: Raw response text, possibly containing Markdown code fences.
        language: Preferred language label, or ``None`` for no preference.

    Returns:
        The selected code with surrounding whitespace removed.
    """
    matches = list(_CODE_BLOCK_RE.finditer(text))
    if not matches:
        return text.strip()

    wanted = (language or "").strip().lower()
    if wanted:
        labels = [match.group("lang").strip().lower() for match in matches]

        # Pass 1: exact label match keeps the historical behaviour.
        for match, label in zip(matches, labels):
            if label == wanted:
                return match.group("code").strip()

        # Pass 2: accept aliases and labels with trailing attributes so that
        # a differently-labelled block of the right language beats an
        # unrelated first block.
        accepted = next(
            (group for group in _FENCE_LANGUAGE_ALIASES if wanted in group),
            frozenset({wanted}),
        )
        for match, label in zip(matches, labels):
            first_word = label.split(maxsplit=1)[0] if label else ""
            if first_word in accepted:
                return match.group("code").strip()

    return matches[0].group("code").strip()
|
|
|
|
def evaluate_code_response(response_text: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Extract the code from *response_text* and evaluate it as *spec* asks.

    The language in *spec* is normalised (stripped, lower-cased) and used both
    to pick the code block and to choose the evaluation pipeline. Python,
    JavaScript and Bash are run as single-file scripts; TypeScript, Rust and
    SQL have dedicated pipelines.

    Returns:
        The evaluation result. ``available`` is ``False`` when the language is
        not supported or the required interpreter cannot be found.
    """
    language = spec.language.strip().lower()
    code = extract_code_block(response_text, language=language)
    if not code:
        return CodeExecutionResult(
            language=language,
            available=True,
            passed=False,
            summary="Atbildē nav atrasts izpildāms koda bloks.",
        )

    # Languages with their own compile/run pipelines.
    if language in {"typescript", "ts"}:
        return _run_typescript_eval(code, spec)
    if language == "rust":
        return _run_rust_eval(code, spec)
    if language == "sql":
        return _run_sql_eval(code, spec)

    # Single-file script languages: describe the interpreter, then share the
    # same launch code below.
    if language == "python":
        interpreter = shutil.which("python3") or shutil.which("python")
        missing_reason = "python nav pieejams."
        file_name, comment_prefix = "main.py", "#"
        run_args = ["-I", "-B", "-s", "main.py"]
        check_args = ["-I", "-B", "-s", "-m", "py_compile", "main.py"]
    elif language in {"javascript", "js"}:
        interpreter = shutil.which("node")
        missing_reason = "node nav pieejams."
        file_name, comment_prefix = "main.js", "//"
        run_args = ["main.js"]
        check_args = ["--check", "main.js"]
    elif language in {"bash", "sh"}:
        interpreter = shutil.which("bash")
        missing_reason = "bash nav pieejams."
        file_name, comment_prefix = "main.sh", "#"
        run_args = ["main.sh"]
        check_args = ["-n", "main.sh"]
    else:
        return _unsupported_language_result(
            language, "Valoda execution evals režīmā vēl nav atbalstīta."
        )

    if interpreter is None:
        return _unsupported_language_result(language, missing_reason)

    chosen_args = check_args if spec.compile_only else run_args
    return _run_script_eval(
        language=language,
        command=[interpreter, *chosen_args],
        file_name=file_name,
        source=_build_source(code, spec.test_code, comment_prefix),
        spec=spec,
    )
|
|
|
|
def _build_source(code: str, test_code: str, comment_prefix: str) -> str:
    """Join candidate code and an optional test harness into one source file.

    Both inputs are stripped. When a harness is present it is appended after a
    blank line and a ``<comment_prefix> execution harness`` marker line. The
    result always ends with a single newline.
    """
    sections = [code.strip()]
    harness = test_code.strip()
    if harness:
        # The leading "\n" produces the blank line between code and harness.
        sections.append(f"\n{comment_prefix} execution harness\n{harness}")
    return "\n".join(sections) + "\n"
|
|
|
|
def _run_script_eval(
    *,
    language: str,
    command: list[str],
    file_name: str,
    source: str,
    spec: CodeExecutionSpec,
) -> CodeExecutionResult:
    """Write *source* into a fresh temp directory and run *command* there.

    Args:
        language: Language label copied into the result.
        command: Full argv to execute; it refers to *file_name* relatively.
        file_name: Name of the file created inside the temp directory.
        source: Text written to that file (UTF-8).
        spec: Execution settings forwarded to the command runner.

    Returns:
        The failure reported by the command runner, or a passing result when
        the command succeeded.
    """
    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as scratch:
        workdir = Path(scratch)
        (workdir / file_name).write_text(source, encoding="utf-8")
        failure = _run_command(command, cwd=workdir, spec=spec, language=language)

    if failure is not None:
        return failure
    return CodeExecutionResult(
        language=language,
        available=True,
        passed=True,
        summary=f"{language} kods izpildījās veiksmīgi.",
        exit_code=0,
    )
|
|
|
|
def _run_typescript_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Compile TypeScript with ``tsc`` and, unless compile-only, run it on node.

    Returns an "unavailable" result when ``tsc`` is missing, or when ``node``
    is missing and execution was requested. Otherwise returns the first
    failing step's result, or a passing result when every step succeeded.
    """
    tsc_path = shutil.which("tsc")
    if tsc_path is None:
        return _unsupported_language_result("typescript", "tsc nav pieejams.")
    node_path = shutil.which("node")
    if node_path is None and not spec.compile_only:
        return _unsupported_language_result("typescript", "node nav pieejams TypeScript izpildei.")

    compile_command = [
        tsc_path,
        "--pretty",
        "false",
        "--target",
        "ES2020",
        "--module",
        "commonjs",
        "main.ts",
    ]

    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as scratch:
        workdir = Path(scratch)
        (workdir / "main.ts").write_text(
            _build_source(code, spec.test_code, "//"), encoding="utf-8"
        )
        failure = _run_command(compile_command, cwd=workdir, spec=spec, language="typescript")
        if failure is None and not spec.compile_only:
            # tsc wrote main.js next to main.ts; execute it.
            failure = _run_command(
                [node_path, "main.js"], cwd=workdir, spec=spec, language="typescript"
            )

    if failure is not None:
        return failure

    if spec.compile_only:
        summary = "TypeScript kods veiksmīgi sakompilējās."
    else:
        summary = "TypeScript kods veiksmīgi sakompilējās un izpildījās."
    return CodeExecutionResult(
        language="typescript",
        available=True,
        passed=True,
        summary=summary,
        exit_code=0,
    )
|
|
|
|
def _run_rust_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Compile Rust with ``rustc`` and, unless compile-only, run the binary.

    Returns an "unavailable" result when ``rustc`` is missing. Otherwise
    returns the first failing step's result, or a passing result when every
    step succeeded.
    """
    rustc_path = shutil.which("rustc")
    if rustc_path is None:
        return _unsupported_language_result("rust", "rustc nav pieejams.")

    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as scratch:
        workdir = Path(scratch)
        executable = str(workdir / "main")
        (workdir / "main.rs").write_text(
            _build_source(code, spec.test_code, "//"), encoding="utf-8"
        )
        failure = _run_command(
            [rustc_path, "main.rs", "-o", executable],
            cwd=workdir,
            spec=spec,
            language="rust",
        )
        if failure is None and not spec.compile_only:
            failure = _run_command([executable], cwd=workdir, spec=spec, language="rust")

    if failure is not None:
        return failure

    if spec.compile_only:
        summary = "Rust kods veiksmīgi sakompilējās."
    else:
        summary = "Rust kods veiksmīgi sakompilējās un izpildījās."
    return CodeExecutionResult(
        language="rust",
        available=True,
        passed=True,
        summary=summary,
        exit_code=0,
    )
|
|
|
|
def _run_sql_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Execute the candidate SQL against a throw-away in-memory SQLite database.

    The script built from *code* and ``spec.test_code`` is run with foreign-key
    enforcement switched on. Execution happens inside this process, so the
    wall-clock budget ``spec.timeout_seconds`` is enforced with an SQLite
    progress handler that aborts the running statement once the deadline has
    passed.

    Args:
        code: Candidate SQL extracted from the response.
        spec: Execution settings (harness, compile-only flag, timeout).

    Returns:
        A passing result when the script ran without an SQLite error;
        otherwise a failing result whose ``stderr`` holds the error message.
        A run aborted by the deadline reports the time-limit summary.
    """
    import time  # local import: ``time`` is not imported at module level

    deadline = time.monotonic() + spec.timeout_seconds
    timed_out = False

    def _abort_when_overdue() -> int:
        # A non-zero return value makes SQLite abort the current statement.
        nonlocal timed_out
        if time.monotonic() >= deadline:
            timed_out = True
            return 1
        return 0

    try:
        connection = sqlite3.connect(":memory:")
        try:
            # Invoke the deadline check every 1000 virtual-machine steps.
            connection.set_progress_handler(_abort_when_overdue, 1_000)
            connection.execute("PRAGMA foreign_keys = ON")
            script = _build_sql_script(code, spec.test_code, compile_only=spec.compile_only)
            connection.executescript(script)
        finally:
            connection.close()
    except sqlite3.Error as exc:
        summary = (
            "Execution eval pārsniedza laika limitu."
            if timed_out
            else "SQL execution eval neizdevās."
        )
        return CodeExecutionResult(
            language="sql",
            available=True,
            passed=False,
            summary=summary,
            stderr=str(exc),
        )
    return CodeExecutionResult(
        language="sql",
        available=True,
        passed=True,
        summary="SQL skripts veiksmīgi validējās un izpildījās.",
        exit_code=0,
    )
|
|
|
|
def _build_sql_script(code: str, test_code: str, *, compile_only: bool) -> str:
    """Compose the SQL script that will be executed for a candidate.

    * If the harness contains ``{{CODE}}``, every occurrence is replaced by
      the candidate and the harness is returned as-is (``compile_only`` is
      not consulted in this case).
    * Otherwise the candidate becomes a single trailing statement, prefixed
      with ``EXPLAIN QUERY PLAN`` in compile-only mode, and is placed after
      the harness when there is one.

    The candidate is stripped and loses any trailing semicolons before use.
    """
    candidate = code.strip().rstrip(";")
    harness = test_code.strip()

    # An empty harness can never contain the placeholder, so no extra check.
    if "{{CODE}}" in harness:
        return harness.replace("{{CODE}}", candidate)

    if compile_only:
        statement = f"EXPLAIN QUERY PLAN {candidate};\n"
    else:
        statement = f"{candidate};\n"
    return f"{harness}\n{statement}" if harness else statement
|
|
|
|
def _coerce_captured_text(captured: str | bytes | None) -> str:
    """Return captured process output as ``str``, whatever form it arrived in."""
    if captured is None:
        return ""
    if isinstance(captured, bytes):
        return captured.decode("utf-8", errors="replace")
    return captured


def _run_command(
    command: list[str],
    *,
    cwd: Path,
    spec: CodeExecutionSpec,
    language: str,
) -> CodeExecutionResult | None:
    """Run *command* inside *cwd* under the limits described by *spec*.

    The child gets an isolated environment, no stdin, and (where available)
    resource limits installed through ``preexec_fn``.

    Args:
        command: Full argv of the process to start.
        cwd: Working directory for the process.
        spec: Supplies the timeout, resource limits and output cap.
        language: Language label copied into a failure result.

    Returns:
        ``None`` when the command exits with status 0; otherwise a failing
        result carrying the (truncated) captured output. A timeout yields a
        failing result without an exit code.
    """
    try:
        completed = subprocess.run(
            command,
            cwd=str(cwd),
            check=False,
            capture_output=True,
            # Decode explicitly and leniently: relying on the parent's locale
            # encoding with strict errors would raise UnicodeDecodeError when
            # the child prints bytes that are not valid in that encoding.
            encoding="utf-8",
            errors="replace",
            timeout=spec.timeout_seconds,
            stdin=subprocess.DEVNULL,
            env=_build_isolated_env(cwd),
            preexec_fn=_build_subprocess_preexec(spec),
        )
    except subprocess.TimeoutExpired as exc:
        # NOTE(review): subprocess.run kills only the direct child on timeout;
        # processes that child started may outlive this call — confirm whether
        # the whole session should be terminated here.
        #
        # The partial output attached to TimeoutExpired may be bytes (or None)
        # even in text mode, so normalise it before truncating.
        return CodeExecutionResult(
            language=language,
            available=True,
            passed=False,
            summary="Execution eval pārsniedza laika limitu.",
            stdout=_truncate_output(_coerce_captured_text(exc.stdout), spec.max_output_chars),
            stderr=_truncate_output(_coerce_captured_text(exc.stderr), spec.max_output_chars),
        )
    if completed.returncode == 0:
        return None
    return CodeExecutionResult(
        language=language,
        available=True,
        passed=False,
        summary="Execution eval neizdevās.",
        exit_code=completed.returncode,
        stdout=_truncate_output(completed.stdout, spec.max_output_chars),
        stderr=_truncate_output(completed.stderr, spec.max_output_chars),
    )
|
|
|
|
def _build_isolated_env(workspace: Path) -> dict[str, str]:
    """Build the minimal environment handed to evaluated child processes.

    Home and temp-directory variables all point at *workspace*, a few fixed
    interpreter/CI switches are set, and only a short allow-list of variables
    is copied from the current process environment (when set and non-empty).
    """
    sandbox_dir = str(workspace)
    env: dict[str, str] = dict.fromkeys(("HOME", "TMPDIR", "TEMP", "TMP"), sandbox_dir)
    env.update(
        PYTHONNOUSERSITE="1",
        PYTHONDONTWRITEBYTECODE="1",
        PYTHONIOENCODING="utf-8",
        NODE_DISABLE_COLORS="1",
        CI="1",
    )

    # Variables the child needs in order to locate executables / system files.
    inherited_names = ("PATH", "SYSTEMROOT", "SystemRoot", "WINDIR", "ComSpec")
    env.update(
        {name: value for name in inherited_names if (value := os.environ.get(name))}
    )
    return env
|
|
|
|
def _build_subprocess_preexec(spec: CodeExecutionSpec):
    """Return a ``preexec_fn`` that sandboxes the child, or ``None``.

    ``None`` is returned when the platform is not POSIX or the ``resource``
    module is unavailable. Otherwise the returned callable, run in the child
    before exec, starts a new session and installs limits on CPU time,
    address space, core dumps, file size, open files and (where defined)
    process count.

    Each requested limit is clamped to the hard limit already in force:
    asking for more than the inherited hard limit makes ``setrlimit`` fail,
    which would abort every launch.

    Args:
        spec: Supplies ``memory_limit_mb`` (at least 64 MiB is applied) and
            ``timeout_seconds`` (CPU limit is its ceiling plus one second,
            at least two seconds).
    """
    if os.name != "posix" or resource is None:
        return None

    memory_limit_bytes = max(spec.memory_limit_mb, 64) * 1024 * 1024
    cpu_limit_seconds = max(2, math.ceil(spec.timeout_seconds) + 1)

    requested_limits = [
        (resource.RLIMIT_CPU, cpu_limit_seconds),
        (resource.RLIMIT_AS, memory_limit_bytes),
        (resource.RLIMIT_CORE, 0),
        (resource.RLIMIT_FSIZE, 8 * 1024 * 1024),
        (resource.RLIMIT_NOFILE, 64),
    ]
    if hasattr(resource, "RLIMIT_NPROC"):
        requested_limits.append((resource.RLIMIT_NPROC, 32))

    def _apply_limits() -> None:
        os.setsid()
        for kind, value in requested_limits:
            _, hard = resource.getrlimit(kind)
            # RLIM_INFINITY may be represented as -1, so test it explicitly
            # instead of relying on a numeric comparison.
            if hard != resource.RLIM_INFINITY and value > hard:
                value = hard
            resource.setrlimit(kind, (value, value))

    return _apply_limits
|
|
|
|
def _truncate_output(value: str, max_chars: int) -> str:
    """Cap *value* at *max_chars* characters.

    Text within the limit is returned unchanged; longer text is cut and a
    ``...[truncated]`` marker is appended on a new line.
    """
    if len(value) > max_chars:
        return value[:max_chars] + "\n...[truncated]"
    return value
|
|
|
|
def _unsupported_language_result(language: str, reason: str) -> CodeExecutionResult:
    """Build the result used when *language* cannot be evaluated here.

    The result is marked as neither available nor passed, and *reason* becomes
    its summary.
    """
    return CodeExecutionResult(
        summary=reason,
        passed=False,
        available=False,
        language=language,
    )
|
|