from __future__ import annotations import subprocess import sys import tempfile from pathlib import Path def run_python_file(path: str | Path, *, timeout: int = 20) -> dict[str, str | int]: file_path = Path(path) return _run_python([str(file_path)], cwd=file_path.parent, timeout=timeout) def run_python_code(code: str, *, timeout: int = 20) -> dict[str, str | int]: with tempfile.TemporaryDirectory(prefix="gaia-python-") as tmpdir: file_path = Path(tmpdir) / "snippet.py" file_path.write_text(code, encoding="utf-8") return _run_python([str(file_path)], cwd=Path(tmpdir), timeout=timeout) def _run_python( args: list[str], *, cwd: Path, timeout: int, ) -> dict[str, str | int]: try: completed = subprocess.run( [sys.executable, *args], cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False, ) except subprocess.TimeoutExpired as exc: return { "exit_code": 124, "stdout": exc.stdout or "", "stderr": f"Python execution timed out after {timeout}s.", } return { "exit_code": completed.returncode, "stdout": completed.stdout, "stderr": completed.stderr, }