File size: 12,978 Bytes
f440f03
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
"""Execution-based code evaluation helpers for coder benchmarks."""

from __future__ import annotations

import math
import os
import re
import shutil
import sqlite3
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path

try:
    import resource
except ImportError:  # pragma: no cover - non-POSIX fallback
    resource = None  # type: ignore[assignment]

# Matches one triple-backtick fenced block. The rest of the opening fence line
# is captured as ``lang`` (may be empty); everything up to the next closing
# fence is captured as ``code``. DOTALL lets the body span multiple lines and
# the non-greedy quantifier stops at the first closing fence.
_CODE_BLOCK_RE = re.compile(r"```(?P<lang>[^\n`]*)\n(?P<code>.*?)```", re.DOTALL)
# Default for ``CodeExecutionSpec.memory_limit_mb`` (megabytes).
DEFAULT_EXECUTION_MEMORY_LIMIT_MB = 512
# Default for ``CodeExecutionSpec.max_output_chars`` (characters kept per stream).
DEFAULT_EXECUTION_MAX_OUTPUT_CHARS = 12_000


@dataclass(frozen=True, slots=True)
class CodeExecutionSpec:
    """Immutable description of how one candidate answer should be executed."""

    # Language name; it is normalised with strip().lower() before dispatch.
    language: str
    # Optional harness code. For script languages it is appended after the
    # candidate code; for SQL it is combined with the candidate statement.
    test_code: str = ""
    # Wall-clock timeout handed to every subprocess invocation.
    timeout_seconds: float = 8.0
    # When true, only a syntax/compile check is performed instead of a full run.
    compile_only: bool = False
    # Address-space cap in MiB applied to subprocesses on POSIX (values below
    # 64 are raised to 64 when the limit is computed).
    memory_limit_mb: int = DEFAULT_EXECUTION_MEMORY_LIMIT_MB
    # Maximum number of characters of stdout/stderr kept in a failure result.
    max_output_chars: int = DEFAULT_EXECUTION_MAX_OUTPUT_CHARS


@dataclass(frozen=True, slots=True)
class CodeExecutionResult:
    """Immutable outcome of evaluating one candidate answer."""

    # Normalised language name the result refers to.
    language: str
    # False when the language or its toolchain is not supported/installed.
    available: bool
    # True only when every executed step finished successfully.
    passed: bool
    # Short human-readable status message (the module emits these in Latvian).
    summary: str
    # Process exit code when one is known: 0 on success, the failing code on a
    # non-zero exit, None otherwise (e.g. timeout or unsupported language).
    exit_code: int | None = None
    # Captured (possibly truncated) standard output of a failed step.
    stdout: str = ""
    # Captured (possibly truncated) standard error, or the SQLite error text.
    stderr: str = ""


# Fence tags that denote the same language. Used only as a fallback after an
# exact tag comparison, so existing exact matches keep their precedence.
_FENCE_LANGUAGE_ALIASES = (
    frozenset({"python", "py", "python3"}),
    frozenset({"javascript", "js"}),
    frozenset({"typescript", "ts"}),
    frozenset({"bash", "sh", "shell"}),
    frozenset({"rust", "rs"}),
    frozenset({"sql", "sqlite"}),
)


def extract_code_block(text: str, language: str | None = None) -> str:
    """Return the code of the most suitable fenced block found in *text*.

    Selection order:

    1. the first fence whose whole info string equals *language*
       (case-insensitive, surrounding whitespace ignored);
    2. the first fence whose first info-string word is *language* or a known
       alias of it (e.g. ``js`` for ``javascript``), which also covers info
       strings carrying extra attributes such as ``python title=x``;
    3. the first fenced block, whatever its tag.

    Args:
        text: Raw response text, possibly containing Markdown code fences.
        language: Preferred language name; ``None`` or blank means "no
            preference".

    Returns:
        The stripped body of the selected block, or the stripped *text* itself
        when it contains no fenced block at all.
    """
    matches = list(_CODE_BLOCK_RE.finditer(text))
    if not matches:
        return text.strip()

    wanted = (language or "").strip().lower()
    if wanted:
        tags = [found.group("lang").strip().lower() for found in matches]

        # Pass 1: exact comparison against the full info string.
        for found, tag in zip(matches, tags):
            if tag == wanted:
                return found.group("code").strip()

        # Pass 2: alias-aware comparison against the first info-string word.
        accepted = next(
            (group for group in _FENCE_LANGUAGE_ALIASES if wanted in group),
            frozenset({wanted}),
        )
        for found, tag in zip(matches, tags):
            first_word = tag.split(maxsplit=1)[0] if tag else ""
            if first_word in accepted:
                return found.group("code").strip()

    return matches[0].group("code").strip()


def evaluate_code_response(response_text: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Extract the candidate code from *response_text* and evaluate it per *spec*.

    TypeScript, Rust and SQL are handed to their dedicated runners. Python,
    JavaScript and Bash share the single-file script runner and differ only in
    interpreter, file name, comment prefix and command-line flags.

    Args:
        response_text: Full model response, usually containing a fenced block.
        spec: Execution settings (language, harness, limits, compile-only flag).

    Returns:
        A ``CodeExecutionResult``; ``available`` is False when the language is
        unknown or its interpreter cannot be found on ``PATH``.
    """
    language = spec.language.strip().lower()
    code = extract_code_block(response_text, language=language)
    if not code:
        return CodeExecutionResult(
            language=language,
            available=True,
            passed=False,
            summary="Atbildē nav atrasts izpildāms koda bloks.",
        )

    # Languages that have their own runner.
    if language in {"typescript", "ts"}:
        return _run_typescript_eval(code, spec)
    if language == "rust":
        return _run_rust_eval(code, spec)
    if language == "sql":
        return _run_sql_eval(code, spec)

    # Single-file script languages: collect what differs, then run once below.
    if language == "python":
        interpreter = shutil.which("python3") or shutil.which("python")
        missing_reason = "python nav pieejams."
        file_name, comment_prefix = "main.py", "#"
        run_flags = ["-I", "-B", "-s"]
        check_flags = ["-I", "-B", "-s", "-m", "py_compile"]
    elif language in {"javascript", "js"}:
        interpreter = shutil.which("node")
        missing_reason = "node nav pieejams."
        file_name, comment_prefix = "main.js", "//"
        run_flags = []
        check_flags = ["--check"]
    elif language in {"bash", "sh"}:
        interpreter = shutil.which("bash")
        missing_reason = "bash nav pieejams."
        file_name, comment_prefix = "main.sh", "#"
        run_flags = []
        check_flags = ["-n"]
    else:
        return _unsupported_language_result(
            language, "Valoda execution evals režīmā vēl nav atbalstīta."
        )

    if interpreter is None:
        return _unsupported_language_result(language, missing_reason)

    flags = check_flags if spec.compile_only else run_flags
    return _run_script_eval(
        language=language,
        command=[interpreter, *flags, file_name],
        file_name=file_name,
        source=_build_source(code, spec.test_code, comment_prefix),
        spec=spec,
    )


def _build_source(code: str, test_code: str, comment_prefix: str) -> str:
    """Assemble the file content: candidate code plus an optional test harness.

    Both inputs are stripped. When a harness is present it follows the code
    after a blank line and a ``<comment_prefix> execution harness`` marker
    line. The result always ends with exactly one newline.
    """
    sections = [code.strip()]
    harness = test_code.strip()
    if harness:
        sections.append(f"{comment_prefix} execution harness\n{harness}")
    return "\n\n".join(sections) + "\n"


def _run_script_eval(
    *,
    language: str,
    command: list[str],
    file_name: str,
    source: str,
    spec: CodeExecutionSpec,
) -> CodeExecutionResult:
    """Write *source* into a throw-away workspace and run *command* there.

    Args:
        language: Language label copied into the result.
        command: Argument vector to execute; it is run with the workspace as
            working directory, so it can refer to *file_name* relatively.
        file_name: Name of the file created inside the workspace.
        source: Text written to that file (UTF-8).
        spec: Execution settings forwarded to ``_run_command``.

    Returns:
        The failure result produced by ``_run_command``, or a passing result
        with exit code 0 when the command succeeded.
    """
    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as tmp_dir:
        workspace = Path(tmp_dir)
        (workspace / file_name).write_text(source, encoding="utf-8")
        failure = _run_command(command, cwd=workspace, spec=spec, language=language)

    if failure is not None:
        return failure
    return CodeExecutionResult(
        language=language,
        available=True,
        passed=True,
        summary=f"{language} kods izpildījās veiksmīgi.",
        exit_code=0,
    )


def _run_typescript_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Compile the candidate with ``tsc`` and, unless compile-only, run it with ``node``.

    Returns an "unavailable" result when ``tsc`` is missing, or when ``node``
    is missing and execution was requested. A failing compile or run step is
    returned as produced by ``_run_command``.
    """
    tsc_path = shutil.which("tsc")
    if tsc_path is None:
        return _unsupported_language_result("typescript", "tsc nav pieejams.")
    node_path = shutil.which("node")
    if node_path is None and not spec.compile_only:
        return _unsupported_language_result("typescript", "node nav pieejams TypeScript izpildei.")

    compile_command = [
        tsc_path,
        "--pretty",
        "false",
        "--target",
        "ES2020",
        "--module",
        "commonjs",
        "main.ts",
    ]

    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as tmp_dir:
        workspace = Path(tmp_dir)
        (workspace / "main.ts").write_text(
            _build_source(code, spec.test_code, "//"), encoding="utf-8"
        )

        failure = _run_command(
            compile_command, cwd=workspace, spec=spec, language="typescript"
        )
        if failure is not None:
            return failure

        if spec.compile_only:
            summary = "TypeScript kods veiksmīgi sakompilējās."
        else:
            # The compile step is expected to have produced main.js next to main.ts.
            failure = _run_command(
                [node_path, "main.js"], cwd=workspace, spec=spec, language="typescript"
            )
            if failure is not None:
                return failure
            summary = "TypeScript kods veiksmīgi sakompilējās un izpildījās."

    return CodeExecutionResult(
        language="typescript",
        available=True,
        passed=True,
        summary=summary,
        exit_code=0,
    )


def _run_rust_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Compile the candidate with ``rustc`` and, unless compile-only, run the binary.

    Returns an "unavailable" result when ``rustc`` is not on ``PATH``. A
    failing compile or run step is returned as produced by ``_run_command``.
    """
    rustc_path = shutil.which("rustc")
    if rustc_path is None:
        return _unsupported_language_result("rust", "rustc nav pieejams.")

    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as tmp_dir:
        workspace = Path(tmp_dir)
        executable = str(workspace / "main")
        (workspace / "main.rs").write_text(
            _build_source(code, spec.test_code, "//"), encoding="utf-8"
        )

        failure = _run_command(
            [rustc_path, "main.rs", "-o", executable],
            cwd=workspace,
            spec=spec,
            language="rust",
        )
        if failure is not None:
            return failure

        if spec.compile_only:
            summary = "Rust kods veiksmīgi sakompilējās."
        else:
            failure = _run_command([executable], cwd=workspace, spec=spec, language="rust")
            if failure is not None:
                return failure
            summary = "Rust kods veiksmīgi sakompilējās un izpildījās."

    return CodeExecutionResult(
        language="rust",
        available=True,
        passed=True,
        summary=summary,
        exit_code=0,
    )


def _run_sql_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Execute the candidate SQL against a fresh in-memory SQLite database.

    The script built by ``_build_sql_script`` runs with foreign-key
    enforcement switched on. ``spec.timeout_seconds`` is enforced through a
    SQLite progress handler, so a runaway statement is interrupted instead of
    blocking the caller indefinitely.

    Args:
        code: Candidate SQL extracted from the response.
        spec: Execution settings; ``test_code``, ``compile_only`` and
            ``timeout_seconds`` are used here.

    Returns:
        A passing result when the script ran without error; otherwise a failed
        result whose ``stderr`` carries the SQLite error text. A run that was
        interrupted by the deadline uses the same timeout summary as
        subprocess-based evaluations.
    """
    import time  # local import: ``time`` is not among this module's top-level imports

    deadline = time.monotonic() + spec.timeout_seconds
    timed_out = False

    def _abort_when_overdue() -> int:
        # A non-zero return value makes SQLite abort the running statement.
        nonlocal timed_out
        if time.monotonic() >= deadline:
            timed_out = True
            return 1
        return 0

    try:
        connection = sqlite3.connect(":memory:")
        try:
            # Invoked every 10,000 VM instructions: cheap, yet frequent enough
            # to stop an endless query shortly after the deadline.
            connection.set_progress_handler(_abort_when_overdue, 10_000)
            connection.execute("PRAGMA foreign_keys = ON")
            script = _build_sql_script(code, spec.test_code, compile_only=spec.compile_only)
            connection.executescript(script)
        finally:
            connection.close()
    except sqlite3.Error as exc:
        summary = (
            "Execution eval pārsniedza laika limitu."
            if timed_out
            else "SQL execution eval neizdevās."
        )
        return CodeExecutionResult(
            language="sql",
            available=True,
            passed=False,
            summary=summary,
            stderr=str(exc),
        )
    return CodeExecutionResult(
        language="sql",
        available=True,
        passed=True,
        summary="SQL skripts veiksmīgi validējās un izpildījās.",
        exit_code=0,
    )


def _build_sql_script(code: str, test_code: str, *, compile_only: bool) -> str:
    """Combine the candidate SQL and the optional harness into one script.

    The candidate is stripped and loses its trailing semicolons. If the
    harness contains the ``{{CODE}}`` placeholder, the candidate is substituted
    into it and the harness is returned as is. Otherwise the candidate
    (prefixed with ``EXPLAIN QUERY PLAN`` in compile-only mode) is terminated
    with ``;`` and placed after the harness, when there is one.
    """
    candidate = code.strip().rstrip(";")
    harness = test_code.strip()

    # An empty harness can never contain the placeholder, so no extra guard.
    if "{{CODE}}" in harness:
        return harness.replace("{{CODE}}", candidate)

    prefix = "EXPLAIN QUERY PLAN " if compile_only else ""
    statement = f"{prefix}{candidate};\n"
    return f"{harness}\n{statement}" if harness else statement


def _decode_captured_stream(stream: str | bytes | None) -> str:
    """Normalise a captured subprocess stream (text, bytes or None) to text."""
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        return stream.decode("utf-8", errors="replace")
    return stream


def _run_command(
    command: list[str],
    *,
    cwd: Path,
    spec: CodeExecutionSpec,
    language: str,
) -> CodeExecutionResult | None:
    """Run *command* in *cwd* with an isolated environment and resource limits.

    Args:
        command: Argument vector to execute (no shell involved).
        cwd: Working directory for the child process; also used to build its
            environment.
        spec: Supplies the wall-clock timeout, the resource limits and the
            output truncation length.
        language: Language label copied into a failure result.

    Returns:
        ``None`` when the command exited with status 0; otherwise a failed
        ``CodeExecutionResult`` describing the timeout or the non-zero exit,
        with stdout/stderr truncated to ``spec.max_output_chars``.
    """
    try:
        completed = subprocess.run(  # noqa: S603
            command,
            cwd=str(cwd),
            check=False,
            capture_output=True,
            text=True,
            timeout=spec.timeout_seconds,
            stdin=subprocess.DEVNULL,
            env=_build_isolated_env(cwd),
            preexec_fn=_build_subprocess_preexec(spec),
        )
    except subprocess.TimeoutExpired as exc:
        # Output attached to TimeoutExpired can be bytes even though text=True
        # was requested (or None when nothing was read), so it is decoded
        # before being truncated and stored in the str-typed result fields.
        return CodeExecutionResult(
            language=language,
            available=True,
            passed=False,
            summary="Execution eval pārsniedza laika limitu.",
            stdout=_truncate_output(_decode_captured_stream(exc.stdout), spec.max_output_chars),
            stderr=_truncate_output(_decode_captured_stream(exc.stderr), spec.max_output_chars),
        )
    if completed.returncode == 0:
        return None
    return CodeExecutionResult(
        language=language,
        available=True,
        passed=False,
        summary="Execution eval neizdevās.",
        exit_code=completed.returncode,
        stdout=_truncate_output(completed.stdout, spec.max_output_chars),
        stderr=_truncate_output(completed.stderr, spec.max_output_chars),
    )


def _build_isolated_env(workspace: Path) -> dict[str, str]:
    """Build the minimal environment handed to evaluated subprocesses.

    Home and temp-directory variables point at *workspace*, a few fixed
    switches are set for Python, Node and CI detection, and only a short list
    of variables (``PATH`` plus Windows system locations) is copied from the
    current environment, each one only when it is set and non-empty.
    """
    workspace_dir = str(workspace)
    env: dict[str, str] = dict.fromkeys(("HOME", "TMPDIR", "TEMP", "TMP"), workspace_dir)
    env.update(
        PYTHONNOUSERSITE="1",
        PYTHONDONTWRITEBYTECODE="1",
        PYTHONIOENCODING="utf-8",
        NODE_DISABLE_COLORS="1",
        CI="1",
    )

    passthrough = ("PATH", "SYSTEMROOT", "SystemRoot", "WINDIR", "ComSpec")
    inherited = {name: os.environ.get(name) for name in passthrough}
    env.update({name: value for name, value in inherited.items() if value})
    return env


def _build_subprocess_preexec(spec: CodeExecutionSpec):
    """Return a ``preexec_fn`` that confines the child process, or ``None``.

    ``None`` is returned when the platform is not POSIX or the ``resource``
    module is unavailable. Otherwise the returned callable starts a new
    session and applies these limits (soft and hard set to the same value):

    * CPU time: ``ceil(timeout_seconds) + 1`` seconds, at least 2;
    * address space: ``memory_limit_mb`` MiB, at least 64 MiB;
    * core dumps disabled;
    * created file size: 8 MiB;
    * open file descriptors: 64;
    * processes: 32, where ``RLIMIT_NPROC`` exists.

    Each requested value is clamped to the hard limit the process already
    has. Asking for more than the inherited hard limit makes ``setrlimit``
    raise ``ValueError``, which would otherwise abort every subprocess launch
    in environments that run with tighter limits.
    """
    if os.name != "posix" or resource is None:
        return None

    memory_limit_bytes = max(spec.memory_limit_mb, 64) * 1024 * 1024
    cpu_limit_seconds = max(2, math.ceil(spec.timeout_seconds) + 1)
    file_size_limit_bytes = 8 * 1024 * 1024

    def _set_limit(kind: int, wanted: int) -> None:
        # Never request more than the current hard limit: an unprivileged
        # process cannot raise it, and the existing limit is stricter anyway.
        _, hard = resource.getrlimit(kind)
        if hard != resource.RLIM_INFINITY:
            wanted = min(wanted, hard)
        resource.setrlimit(kind, (wanted, wanted))

    def _apply_limits() -> None:
        os.setsid()
        _set_limit(resource.RLIMIT_CPU, cpu_limit_seconds)
        _set_limit(resource.RLIMIT_AS, memory_limit_bytes)
        _set_limit(resource.RLIMIT_CORE, 0)
        _set_limit(resource.RLIMIT_FSIZE, file_size_limit_bytes)
        _set_limit(resource.RLIMIT_NOFILE, 64)
        if hasattr(resource, "RLIMIT_NPROC"):
            _set_limit(resource.RLIMIT_NPROC, 32)

    return _apply_limits


def _truncate_output(value: str, max_chars: int) -> str:
    """Cap *value* at *max_chars* characters.

    Text that fits is returned unchanged; longer text is cut to its first
    *max_chars* characters and a ``...[truncated]`` marker is appended on a
    new line.
    """
    if len(value) > max_chars:
        head = value[:max_chars]
        return head + "\n...[truncated]"
    return value


def _unsupported_language_result(language: str, reason: str) -> CodeExecutionResult:
    """Build the result reported when *language* cannot be evaluated here.

    The result is marked as neither available nor passed, and *reason* becomes
    its summary.
    """
    outcome = CodeExecutionResult(
        language=language,
        summary=reason,
        available=False,
        passed=False,
    )
    return outcome