# File size: 11,756 Bytes
# fede53c
"""Sandboxed Python code executor for ODSE.

Executes agent-written code in a restricted namespace with:
- Whitelisted imports (pandas, numpy, sklearn, scipy, etc.)
- Per-execution time limits
- Captured stdout / stderr
- Persistent namespace across calls (notebook-kernel semantics)
"""

from __future__ import annotations

import builtins
import io
import signal
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from typing import Any, Callable, Dict, List, Optional

import numpy as np
import pandas as pd

from .models import ExecutionStatus, VariableInfo


# ============================================================================
# Security: allowed imports and blocked builtins
# ============================================================================

# Names accepted by SandboxExecutor._safe_import.  A module is importable when
# either its full dotted name or its top-level package appears here, so the
# dotted entries below are explicit documentation rather than extra grants.
ALLOWED_MODULES: set[str] = {
    # Third-party data-science stack
    "numpy",
    "pandas",
    "scipy",
    "sklearn",
    # Optional gradient-boosting extras
    "catboost",
    "lightgbm",
    "xgboost",
    # Standard-library: numerics
    "math",
    "statistics",
    # Standard-library: general utilities
    "collections",
    "copy",
    "datetime",
    "functools",
    "itertools",
    "json",
    "operator",
    "re",
    "time",
    "typing",
    "warnings",
    # sklearn sub-packages (non-exhaustive, top-level covers them)
    *(
        f"sklearn.{sub}"
        for sub in (
            "cluster",
            "decomposition",
            "ensemble",
            "feature_extraction",
            "feature_selection",
            "impute",
            "linear_model",
            "metrics",
            "model_selection",
            "naive_bayes",
            "neighbors",
            "pipeline",
            "preprocessing",
            "svm",
            "tree",
        )
    ),
    # scipy sub-packages
    *(f"scipy.{sub}" for sub in ("optimize", "sparse", "stats")),
}

# Names from the ``builtins`` module that are withheld from sandboxed code.
BLOCKED_BUILTINS: set[str] = {
    "exec", "eval", "compile",  # We provide safe alternatives
    "__import__",               # Replaced by _safe_import
    "open", "input",            # No file/terminal I/O
    "breakpoint", "exit", "quit",
    # ``builtins.__loader__`` (and ``__spec__.loader``) is the built-in
    # importer; leaving it reachable lets sandboxed code load built-in
    # modules directly and so sidestep the import whitelist.
    "__loader__", "__spec__",
}

# ============================================================================
# Execution result (internal data class)
# ============================================================================

class ExecutionResult:
    """Result of a single code execution.

    ``__slots__`` fixes the set of attributes (instances carry no
    ``__dict__``); the attributes themselves stay assignable, so treat
    instances as read-only by convention.

    Parameters
    ----------
    status : ExecutionStatus
        Outcome category of the run.
    stdout : str
        Captured standard output.
    stderr : str
        Captured standard error, or an error/timeout message.
    execution_time_ms : float
        Wall-clock duration of the run in milliseconds.
    """

    __slots__ = ("status", "stdout", "stderr", "execution_time_ms")

    def __init__(
        self,
        status: ExecutionStatus,
        stdout: str = "",
        stderr: str = "",
        execution_time_ms: float = 0.0,
    ) -> None:
        self.status = status
        self.stdout = stdout
        self.stderr = stderr
        self.execution_time_ms = execution_time_ms

    def __repr__(self) -> str:
        """Return a constructor-style representation for logs and debugging."""
        fields = ", ".join(
            f"{name}={getattr(self, name)!r}" for name in self.__slots__
        )
        return f"{type(self).__name__}({fields})"

# ============================================================================
# Sandbox executor
# ============================================================================

class _SandboxTimeout(Exception):
    """Raised when code execution exceeds the time limit.

    Raised from the SIGALRM handler and from the timer-based fallback in
    ``SandboxExecutor``; ``SandboxExecutor.execute`` catches it and reports
    ``ExecutionStatus.TIMEOUT`` with the exception message as ``stderr``.
    """


class SandboxExecutor:
    """Executes Python code in a sandboxed, persistent namespace.

    Simulates a Jupyter-notebook-style kernel: variables created in one
    ``execute()`` call are visible in subsequent calls.

    Parameters
    ----------
    timeout_seconds : float
        Maximum wall-clock time per ``execute()`` call.  Fractional values
        (including values below one second) are honoured.
    max_output_chars : int
        Stdout/stderr truncation threshold.
    """

    def __init__(
        self,
        timeout_seconds: float = 30.0,
        max_output_chars: int = 10_000,
    ) -> None:
        self.timeout_seconds = timeout_seconds
        self.max_output_chars = max_output_chars
        self._namespace: Dict[str, Any] = {}
        self._setup_done: bool = False

    # -- Properties ----------------------------------------------------------

    @property
    def namespace(self) -> Dict[str, Any]:
        """Return the live sandbox namespace.

        This is the very dict passed to ``exec`` (not a copy), so mutating
        it changes what sandboxed code sees.  The property itself has no
        setter.
        """
        return self._namespace

    # -- Lifecycle -----------------------------------------------------------

    def setup_namespace(
        self,
        *,
        train_df: pd.DataFrame,
        val_features: pd.DataFrame,
        test_features: pd.DataFrame,
        target_column: str,
        evaluate_fn: Callable,
    ) -> None:
        """Initialise the sandbox namespace with pre-loaded variables.

        Any previous namespace is replaced.  The three DataFrames are
        copied, so sandboxed code cannot mutate the caller's objects.

        Parameters
        ----------
        train_df, val_features, test_features : pd.DataFrame
            Exposed to sandboxed code under the same names.
        target_column : str
            Exposed as ``target_column``.
        evaluate_fn : Callable
            Exposed as ``evaluate``.
        """
        self._namespace = {
            # Data
            "train_df": train_df.copy(),
            "val_features": val_features.copy(),
            "test_features": test_features.copy(),
            "target_column": target_column,
            # Libraries
            "pd": pd,
            "np": np,
            # Evaluation helper
            "evaluate": evaluate_fn,
            # print is captured via redirect_stdout
            "print": print,
        }
        self._namespace["__builtins__"] = self._make_safe_builtins()
        self._setup_done = True

    def reset(self) -> None:
        """Clear the namespace entirely; ``setup_namespace()`` is needed again."""
        self._namespace.clear()
        self._setup_done = False

    # -- Code execution ------------------------------------------------------

    def execute(self, code: str) -> ExecutionResult:
        """Execute *code* in the sandbox and return an ``ExecutionResult``.

        Returns
        -------
        ExecutionResult
            ``SUCCESS`` with captured stdout/stderr; ``TIMEOUT`` with the
            timeout message as stderr; or ``ERROR`` with the formatted
            traceback as stderr (also used when the sandbox has not been
            set up, and for compile errors in *code*).

        Notes
        -----
        A ``KeyboardInterrupt`` that is not caused by the sandbox timer is
        deliberately allowed to propagate to the caller.
        """
        if not self._setup_done:
            return ExecutionResult(
                status=ExecutionStatus.ERROR,
                stderr="Sandbox not initialised - call setup_namespace() first.",
            )

        stdout_buf = io.StringIO()
        stderr_buf = io.StringIO()
        start = time.perf_counter()

        try:
            with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
                self._exec_with_timeout(code)
        except _SandboxTimeout as exc:
            status = ExecutionStatus.TIMEOUT
            stderr = str(exc)
        except (Exception, SystemExit):
            # SystemExit is a BaseException: without catching it here,
            # sandboxed ``raise SystemExit`` would escape into the host.
            status = ExecutionStatus.ERROR
            stderr = self._truncate(traceback.format_exc())
        else:
            status = ExecutionStatus.SUCCESS
            stderr = self._truncate(stderr_buf.getvalue())

        elapsed = (time.perf_counter() - start) * 1000
        return ExecutionResult(
            status=status,
            stdout=self._truncate(stdout_buf.getvalue()),
            stderr=stderr,
            execution_time_ms=elapsed,
        )

    # -- Introspection -------------------------------------------------------

    def get_namespace_summary(self) -> List[VariableInfo]:
        """Return a summary of user-visible variables in the namespace.

        Names starting with an underscore and the environment-injected
        helpers are skipped.
        """
        # Variables injected by the environment that agents shouldn't inspect
        hidden = {
            "__builtins__", "pd", "np", "evaluate",
            "target_column", "print",
        }
        summary: List[VariableInfo] = []
        for name, value in self._namespace.items():
            if name.startswith("_") or name in hidden:
                continue
            summary.append(
                VariableInfo(
                    name=name,
                    type_name=type(value).__name__,
                    shape=getattr(value, "shape", None),
                    preview=self._preview(value),
                )
            )
        return summary

    def get_predictions(self) -> Optional[np.ndarray]:
        """Retrieve ``predictions`` from the namespace (or ``None``).

        ``None`` is returned both when the variable is absent/``None`` and
        when it cannot be converted with ``np.asarray``.
        """
        preds = self._namespace.get("predictions")
        if preds is None:
            return None
        try:
            return np.asarray(preds)
        except Exception:
            # Best effort: an unconvertible value is treated as "no predictions".
            return None

    # -- Private helpers -----------------------------------------------------

    def _exec_with_timeout(self, code: str) -> None:
        """Compile and exec *code* with a timeout.

        On the main thread of a platform with SIGALRM the interval-timer
        path is used (the handler raises ``_SandboxTimeout`` inside the
        running code).  Otherwise it falls back to a threading.Timer +
        ctypes interrupt, which also works on worker threads (e.g. inside
        uvicorn).

        Raises
        ------
        _SandboxTimeout
            If execution exceeds ``timeout_seconds``.
        SyntaxError
            If *code* does not compile.
        """
        import threading

        compiled = compile(code, "<sandbox>", "exec")
        # Signal handlers can only be installed from the main thread.
        is_main = threading.current_thread() is threading.main_thread()
        has_alarm = hasattr(signal, "SIGALRM") and hasattr(signal, "setitimer")

        if is_main and has_alarm:
            self._exec_with_sigalrm(compiled)
        else:
            self._exec_with_timer(compiled)

    def _exec_with_sigalrm(self, compiled: Any) -> None:
        """SIGALRM-based timeout (main thread only).

        Uses ``signal.setitimer`` rather than ``signal.alarm`` so that
        fractional timeouts are respected; ``alarm(int(0.5))`` would be
        ``alarm(0)``, which disables the limit altogether.
        """
        def _alarm(signum, frame):  # noqa: ARG001
            raise _SandboxTimeout(
                f"Code execution exceeded {self.timeout_seconds}s time limit"
            )

        old_handler = signal.signal(signal.SIGALRM, _alarm)
        try:
            # Armed inside the try so the handler is restored even if the
            # timer value is rejected.
            signal.setitimer(signal.ITIMER_REAL, self.timeout_seconds)
            exec(compiled, self._namespace)  # noqa: S102
        finally:
            signal.setitimer(signal.ITIMER_REAL, 0)
            # signal.signal() reports None for a handler that was not
            # installed from Python; that value cannot be passed back in.
            signal.signal(
                signal.SIGALRM,
                old_handler if old_handler is not None else signal.SIG_DFL,
            )

    def _exec_with_timer(self, compiled: Any) -> None:
        """threading.Timer-based timeout (works from any thread).

        A timer thread asks the interpreter to raise ``KeyboardInterrupt``
        asynchronously in the executing thread; that exception is then
        translated into ``_SandboxTimeout``.
        """
        import ctypes
        import threading

        tid = threading.current_thread().ident
        timed_out = False

        def _interrupt():
            nonlocal timed_out
            # Flag first so the except-clause below can tell our interrupt
            # apart from a KeyboardInterrupt of any other origin.
            timed_out = True
            if tid is not None:
                ctypes.pythonapi.PyThreadState_SetAsyncExc(
                    ctypes.c_ulong(tid),
                    ctypes.py_object(KeyboardInterrupt),
                )

        timer = threading.Timer(self.timeout_seconds, _interrupt)
        timer.start()
        try:
            exec(compiled, self._namespace)  # noqa: S102
        except KeyboardInterrupt:
            if timed_out:
                raise _SandboxTimeout(
                    f"Code execution exceeded {self.timeout_seconds}s time limit"
                ) from None
            # Not ours: let it propagate unchanged.
            raise
        finally:
            timer.cancel()

    def _make_safe_builtins(self) -> Dict[str, Any]:
        """Build a restricted ``__builtins__`` dict.

        Copies every attribute of the ``builtins`` module except those in
        ``BLOCKED_BUILTINS`` and installs the guarded ``__import__``.
        """
        safe: Dict[str, Any] = {
            name: getattr(builtins, name)
            for name in dir(builtins)
            if name not in BLOCKED_BUILTINS
        }
        # Provide a guarded import
        safe["__import__"] = self._safe_import
        return safe

    def _safe_import(self, name: str, *args: Any, **kwargs: Any) -> Any:
        """``__import__`` replacement that only allows whitelisted modules.

        A module is allowed when its full dotted name or its top-level
        package is in ``ALLOWED_MODULES``.

        Raises
        ------
        ImportError
            If *name* is not whitelisted.
        """
        top_level = name.split(".")[0]
        if name in ALLOWED_MODULES or top_level in ALLOWED_MODULES:
            return __import__(name, *args, **kwargs)
        raise ImportError(
            f"Module '{name}' is not allowed in the sandbox. "
            f"Allowed top-level modules: "
            f"{', '.join(sorted({m.split('.')[0] for m in ALLOWED_MODULES}))}"
        )

    def _truncate(self, text: str) -> str:
        """Cap *text* at ``max_output_chars``, appending a truncation marker."""
        if len(text) <= self.max_output_chars:
            return text
        return text[: self.max_output_chars] + "\n... [output truncated]"

    @staticmethod
    def _preview(value: Any, max_len: int = 300) -> str:
        """Generate a short string preview of *value*.

        DataFrames, Series and ndarrays get a structural one-liner; anything
        else gets its ``repr`` cut to *max_len* characters.  A failing
        ``repr`` yields ``"<unprintable>"``.
        """
        try:
            if isinstance(value, pd.DataFrame):
                return f"DataFrame(shape={value.shape}, cols={list(value.columns[:5])})"
            if isinstance(value, pd.Series):
                return f"Series(len={len(value)}, dtype={value.dtype})"
            if isinstance(value, np.ndarray):
                return f"ndarray(shape={value.shape}, dtype={value.dtype})"
            return repr(value)[:max_len]
        except Exception:
            return "<unprintable>"