"""Sandboxed Python code executor for ODSE. Executes agent-written code in a restricted namespace with: - Whitelisted imports (pandas, numpy, sklearn, scipy, etc.) - Per-execution time limits - Captured stdout / stderr - Persistent namespace across calls (notebook-kernel semantics) """ from __future__ import annotations import builtins import io import signal import time import traceback from contextlib import redirect_stderr, redirect_stdout from typing import Any, Callable, Dict, List, Optional import numpy as np import pandas as pd from .models import ExecutionStatus, VariableInfo # ============================================================================ # Security: allowed imports and blocked builtins # ============================================================================ ALLOWED_MODULES: set[str] = { # Core data-science stack "numpy", "pandas", "sklearn", "scipy", "math", "statistics", # Standard-library utilities "collections", "itertools", "functools", "re", "json", "copy", "typing", "operator", "datetime", "time", "warnings", # sklearn sub-packages (non-exhaustive, top-level covers them) "sklearn.linear_model", "sklearn.ensemble", "sklearn.tree", "sklearn.svm", "sklearn.neighbors", "sklearn.naive_bayes", "sklearn.preprocessing", "sklearn.model_selection", "sklearn.metrics", "sklearn.pipeline", "sklearn.impute", "sklearn.decomposition", "sklearn.cluster", "sklearn.feature_selection", "sklearn.feature_extraction", # scipy sub-packages "scipy.stats", "scipy.sparse", "scipy.optimize", # Optional extras "xgboost", "lightgbm", "catboost", } BLOCKED_BUILTINS: set[str] = { "exec", "eval", "compile", # We provide safe alternatives "__import__", # Replaced by _safe_import "open", "input", # No file/terminal I/O "breakpoint", "exit", "quit", } # ============================================================================ # Execution result (internal data class) # ============================================================================ class ExecutionResult: """Immutable result of a single code execution.""" __slots__ = ("status", "stdout", "stderr", "execution_time_ms") def __init__( self, status: ExecutionStatus, stdout: str = "", stderr: str = "", execution_time_ms: float = 0.0, ) -> None: self.status = status self.stdout = stdout self.stderr = stderr self.execution_time_ms = execution_time_ms # ============================================================================ # Sandbox executor # ============================================================================ class _SandboxTimeout(Exception): """Raised when code execution exceeds the time limit.""" class SandboxExecutor: """Executes Python code in a sandboxed, persistent namespace. Simulates a Jupyter-notebook-style kernel: variables created in one ``execute()`` call are visible in subsequent calls. Parameters ---------- timeout_seconds : float Maximum wall-clock time per ``execute()`` call. max_output_chars : int Stdout/stderr truncation threshold. """ def __init__( self, timeout_seconds: float = 30.0, max_output_chars: int = 10_000, ) -> None: self.timeout_seconds = timeout_seconds self.max_output_chars = max_output_chars self._namespace: Dict[str, Any] = {} self._setup_done: bool = False # -- Properties ---------------------------------------------------------- @property def namespace(self) -> Dict[str, Any]: """Direct (read-only) view of the sandbox namespace.""" return self._namespace # -- Lifecycle ----------------------------------------------------------- def setup_namespace( self, *, train_df: pd.DataFrame, val_features: pd.DataFrame, test_features: pd.DataFrame, target_column: str, evaluate_fn: Callable, ) -> None: """Initialise the sandbox namespace with pre-loaded variables.""" self._namespace = { # Data "train_df": train_df.copy(), "val_features": val_features.copy(), "test_features": test_features.copy(), "target_column": target_column, # Libraries "pd": pd, "np": np, # Evaluation helper "evaluate": evaluate_fn, # print is captured via redirect_stdout "print": print, } self._namespace["__builtins__"] = self._make_safe_builtins() self._setup_done = True def reset(self) -> None: """Clear the namespace entirely.""" self._namespace.clear() self._setup_done = False # -- Code execution ------------------------------------------------------ def execute(self, code: str) -> ExecutionResult: """Execute *code* in the sandbox and return an ``ExecutionResult``.""" if not self._setup_done: return ExecutionResult( status=ExecutionStatus.ERROR, stderr="Sandbox not initialised - call setup_namespace() first.", ) stdout_buf = io.StringIO() stderr_buf = io.StringIO() start = time.perf_counter() try: with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf): self._exec_with_timeout(code) elapsed = (time.perf_counter() - start) * 1000 return ExecutionResult( status=ExecutionStatus.SUCCESS, stdout=self._truncate(stdout_buf.getvalue()), stderr=self._truncate(stderr_buf.getvalue()), execution_time_ms=elapsed, ) except _SandboxTimeout as exc: elapsed = (time.perf_counter() - start) * 1000 return ExecutionResult( status=ExecutionStatus.TIMEOUT, stdout=self._truncate(stdout_buf.getvalue()), stderr=str(exc), execution_time_ms=elapsed, ) except Exception: elapsed = (time.perf_counter() - start) * 1000 return ExecutionResult( status=ExecutionStatus.ERROR, stdout=self._truncate(stdout_buf.getvalue()), stderr=self._truncate(traceback.format_exc()), execution_time_ms=elapsed, ) # -- Introspection ------------------------------------------------------- def get_namespace_summary(self) -> List[VariableInfo]: """Return a summary of user-visible variables in the namespace.""" # Variables injected by the environment that agents shouldn't inspect hidden = { "__builtins__", "pd", "np", "evaluate", "target_column", "print", } summary: List[VariableInfo] = [] for name, value in self._namespace.items(): if name.startswith("_") or name in hidden: continue summary.append( VariableInfo( name=name, type_name=type(value).__name__, shape=getattr(value, "shape", None), preview=self._preview(value), ) ) return summary def get_predictions(self) -> Optional[np.ndarray]: """Retrieve ``predictions`` from the namespace (or ``None``).""" preds = self._namespace.get("predictions") if preds is None: return None try: return np.asarray(preds) except Exception: return None # -- Private helpers ----------------------------------------------------- def _exec_with_timeout(self, code: str) -> None: """Compile and exec *code* with a timeout. Uses SIGALRM on the main thread (hard kill), and falls back to a threading.Timer + ctypes interrupt on worker threads (e.g. inside uvicorn). """ import threading compiled = compile(code, "", "exec") is_main = threading.current_thread() is threading.main_thread() if is_main and hasattr(signal, "SIGALRM"): self._exec_with_sigalrm(compiled) else: self._exec_with_timer(compiled) def _exec_with_sigalrm(self, compiled: Any) -> None: """SIGALRM-based timeout (main thread only).""" def _alarm(signum, frame): # noqa: ARG001 raise _SandboxTimeout( f"Code execution exceeded {self.timeout_seconds}s time limit" ) old_handler = signal.signal(signal.SIGALRM, _alarm) signal.alarm(int(self.timeout_seconds)) try: exec(compiled, self._namespace) # noqa: S102 finally: signal.alarm(0) signal.signal(signal.SIGALRM, old_handler) def _exec_with_timer(self, compiled: Any) -> None: """threading.Timer-based timeout (works from any thread).""" import ctypes import threading tid = threading.current_thread().ident timed_out = False def _interrupt(): nonlocal timed_out timed_out = True if tid is not None: ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_ulong(tid), ctypes.py_object(KeyboardInterrupt), ) timer = threading.Timer(self.timeout_seconds, _interrupt) timer.start() try: exec(compiled, self._namespace) # noqa: S102 except KeyboardInterrupt: if timed_out: raise _SandboxTimeout( f"Code execution exceeded {self.timeout_seconds}s time limit" ) from None raise finally: timer.cancel() def _make_safe_builtins(self) -> Dict[str, Any]: """Build a restricted ``__builtins__`` dict.""" safe: Dict[str, Any] = {} for name in dir(builtins): if name not in BLOCKED_BUILTINS: safe[name] = getattr(builtins, name) # Provide a guarded import safe["__import__"] = self._safe_import return safe def _safe_import(self, name: str, *args: Any, **kwargs: Any) -> Any: """``__import__`` replacement that only allows whitelisted modules.""" top_level = name.split(".")[0] if name in ALLOWED_MODULES or top_level in ALLOWED_MODULES: return __import__(name, *args, **kwargs) raise ImportError( f"Module '{name}' is not allowed in the sandbox. " f"Allowed top-level modules: " f"{', '.join(sorted({m.split('.')[0] for m in ALLOWED_MODULES}))}" ) def _truncate(self, text: str) -> str: if len(text) <= self.max_output_chars: return text return text[: self.max_output_chars] + "\n... [output truncated]" @staticmethod def _preview(value: Any, max_len: int = 300) -> str: """Generate a short string preview of *value*.""" try: if isinstance(value, pd.DataFrame): return f"DataFrame(shape={value.shape}, cols={list(value.columns[:5])})" if isinstance(value, pd.Series): return f"Series(len={len(value)}, dtype={value.dtype})" if isinstance(value, np.ndarray): return f"ndarray(shape={value.shape}, dtype={value.dtype})" s = repr(value) return s[:max_len] if len(s) > max_len else s except Exception: return ""