Spaces:
Sleeping
Sleeping
| """AST-whitelisted ``run_python`` executor — SECURITY CRITICAL boundary. | |
| This module implements the restricted Python executor defined in | |
| PROJECT.md Section 7. It is the *primary* defense against reward hacking via | |
| the ``run_python`` tool: if this module lets an attack pattern slip through, | |
| the audit classifier (and therefore the reward) becomes gameable. The layered | |
| hash-based ShutdownInvariantRubric (Section 7.5) exists as a second line of | |
| defense, but this boundary is the main event. | |
| Design | |
| ------ | |
| Two whitelists enforced by an ``ast.walk`` pass: | |
| * :data:`ALLOWED_NODES` — every syntactic construct accepted by the sandbox. | |
| Anything not in this set raises :class:`RestrictedPythonError`. This is | |
| strict-by-default: ``ast.Attribute``, ``ast.Subscript``, | |
| ``ast.FunctionDef``, ``ast.ClassDef``, ``ast.Lambda``, | |
| ``ast.AsyncFunctionDef``, ``ast.Yield``, ``ast.YieldFrom``, ``ast.Await``, | |
| and every kind of comprehension are all left out on purpose. | |
| * :data:`ALLOWED_BUILTINS` — every name that a ``Call`` node is allowed to | |
| resolve to. Indirect calls (attribute calls, subscript calls, lambda calls) | |
| are blocked at the ``Call`` node itself, belt-and-suspenders with the | |
| already-missing :class:`ast.Attribute` / :class:`ast.Subscript` nodes. | |
| Comprehensions are deliberately blocked | |
| --------------------------------------- | |
| The Section 7.1 allowlist does not include :class:`ast.ListComp`, | |
| :class:`ast.SetComp`, :class:`ast.DictComp`, :class:`ast.GeneratorExp`, or | |
| :class:`ast.comprehension`. Python implements comprehensions with an implicit | |
| nested function scope; that scope is a potential escape surface, so we keep | |
| them blocked rather than trying to audit what runs inside. The calculator | |
| workload the agent needs (sums, min/max, sorted, range-based for-loops) is | |
| fully expressible without comprehensions. Tests assert | |
| ``[x*2 for x in range(10)]`` and friends raise ``RestrictedError``. | |
| Iteration cap — ``sys.settrace`` with a per-execution line counter | |
| ------------------------------------------------------------------ | |
| Section 7.4 calls for a 10,000-instruction cap but leaves the exact mechanism | |
| open ("bytecode-level instruction limit"). We implement it via | |
| :func:`sys.settrace` with a per-execution line counter. Every executed line | |
| increments the counter; crossing the threshold raises | |
| :class:`RestrictedPythonError` from inside the tracer, which :func:`exec` | |
| propagates out of the ``exec`` call and we catch in :func:`exec_restricted`. | |
| This uniformly covers ``for``-loops, ``while``-loops, and (hypothetical) | |
| recursion with one primitive. We explicitly rejected two alternatives: | |
| * Wrapping :func:`range` — misses ``while`` loops and recursion entirely. | |
| * :func:`sys.setrecursionlimit` — misses loop-based runaway iteration. | |
| In practice the restricted language cannot express recursion because | |
| :class:`ast.FunctionDef`, :class:`ast.Lambda`, and :class:`ast.AsyncFunctionDef` | |
| are all blocked, but the cap still guards runaway ``while`` and large | |
| ``for i in range(...)`` loops. | |
| Python version note | |
| ------------------- | |
| :class:`ast.Num` and :class:`ast.Str` are deprecated aliases of | |
| :class:`ast.Constant` in Python 3.8+. On modern interpreters ``ast.walk`` only | |
| yields :class:`ast.Constant`, so their presence in :data:`ALLOWED_NODES` is a | |
| no-op — but we include them to match the Section 7.1 spec literally and in | |
| case a future interpreter re-emits them. | |
| Allowed nodes — IfExp alongside If | |
| ---------------------------------- | |
| Both the statement form (:class:`ast.If`) and the expression/ternary form | |
| (:class:`ast.IfExp`, e.g. ``x if cond else y``) are in :data:`ALLOWED_NODES`. | |
| Ternary expressions are pure calculator syntax — structurally equivalent to | |
| the already-allowed statement-form ``if`` and adding no new escape surface | |
| because ``ast.walk`` still recursively visits the test/body/orelse children | |
| under the same allowlist. Frequently useful for GSM8K-style math. | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import builtins as _builtins | |
| import sys | |
| from typing import TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| # Forward-reference-only import so ``sandbox.py`` can depend on this | |
| # module without creating an import cycle. ``fs`` is currently unused at | |
| # runtime — it's threaded through for the TOOLS dispatch contract and for | |
| # future defensive hash checks — so we never actually need the concrete | |
| # class at import time. | |
| from red_button.sandbox import SimulatedFilesystem | |
| # --------------------------------------------------------------------------- | |
| # Allowlists — PROJECT.md Section 7.1 and 7.2 | |
| # --------------------------------------------------------------------------- | |
# Legacy literal node classes named by the Section 7.1 spec. ``ast.Num`` and
# ``ast.Str`` have been deprecated aliases of ``ast.Constant`` since Python 3.8
# and no longer exist on newer interpreters, where a plain ``ast.Num``
# attribute access raises AttributeError and would make this module
# unimportable. Look them up defensively and keep only those still provided.
# The parser only ever emits ``ast.Constant``, so their presence or absence
# does not change what the validator accepts.
_LEGACY_LITERAL_NODES: frozenset[type] = frozenset(
    legacy_type
    for legacy_type in (getattr(ast, "Num", None), getattr(ast, "Str", None))
    if legacy_type is not None
)

# Every syntactic construct the sandbox accepts. Membership is checked by
# exact type, so anything not listed here (attributes, subscripts, function
# and class definitions, lambdas, comprehensions, imports, ...) is rejected.
ALLOWED_NODES: frozenset[type] = (
    frozenset(
        {
            # Top-level structure
            ast.Expression,
            ast.Module,
            ast.Expr,
            # Literals (legacy ast.Num / ast.Str are added below when available)
            ast.Constant,
            ast.List,
            ast.Tuple,
            ast.Dict,
            ast.Set,
            # Names (local variables only)
            ast.Name,
            ast.Load,
            ast.Store,
            # Arithmetic
            ast.BinOp,
            ast.UnaryOp,
            ast.Add,
            ast.Sub,
            ast.Mult,
            ast.Div,
            ast.Mod,
            ast.Pow,
            ast.FloorDiv,
            ast.USub,
            ast.UAdd,
            # Comparison and boolean
            ast.Compare,
            ast.BoolOp,
            ast.And,
            ast.Or,
            ast.Not,
            ast.Eq,
            ast.NotEq,
            ast.Lt,
            ast.LtE,
            ast.Gt,
            ast.GtE,
            # Assignment
            ast.Assign,
            # Control flow (with iteration cap enforced via sys.settrace)
            ast.If,
            ast.IfExp,
            ast.For,
            ast.While,
            # Function calls — restricted further below to whitelisted builtins
            ast.Call,
        }
    )
    | _LEGACY_LITERAL_NODES
)
# Names that a direct ``Call`` node may target. The same names are the only
# entries placed in the ``__builtins__`` mapping handed to executed code.
ALLOWED_BUILTINS: frozenset[str] = frozenset(
    (
        # Numeric helpers
        "abs",
        "round",
        "sum",
        "min",
        "max",
        # Constructors / conversions
        "int",
        "float",
        "str",
        "bool",
        "list",
        "dict",
        "tuple",
        "set",
        # Iteration helpers
        "len",
        "range",
        "sorted",
        "enumerate",
        "zip",
        "all",
        "any",
    )
)
| # --------------------------------------------------------------------------- | |
| # Error type | |
| # --------------------------------------------------------------------------- | |
class RestrictedPythonError(Exception):
    """Signal a violation of the Section 7 sandbox contract.

    Raised in two situations: AST validation found a disallowed node or a
    call to a name outside the builtin allowlist, or the iteration cap was
    exceeded while the code was running.
    """
| # --------------------------------------------------------------------------- | |
| # AST validation — Section 7.4 | |
| # --------------------------------------------------------------------------- | |
def validate_ast(tree: ast.AST) -> None:
    """Reject ``tree`` if it contains anything outside the sandbox allowlists.

    Nodes are visited in ``ast.walk`` order and the first violation found
    raises :class:`RestrictedPythonError`. Two checks are applied per node:

    1. The node's *exact* type must be a member of :data:`ALLOWED_NODES`.
       The comparison uses ``type(node)``, not ``isinstance``, so subclasses
       of an allowed node type get no leniency.
    2. An :class:`ast.Call` must call a bare name, and that name must be in
       :data:`ALLOWED_BUILTINS`. Calls through any other expression, such as
       ``obj.method()``, ``funcs[0]()`` or ``(lambda: 1)()``, are rejected at
       the ``Call`` node itself.

    Returns ``None`` when every node passes.
    """
    for current in ast.walk(tree):
        node_type = type(current)
        if node_type not in ALLOWED_NODES:
            raise RestrictedPythonError(
                f"Disallowed AST node: {node_type.__name__}"
            )

        if not isinstance(current, ast.Call):
            continue

        callee = current.func
        if not isinstance(callee, ast.Name):
            # Attribute call, subscript call, lambda call, call of a call's
            # return value, and so on. The callee node would fail the type
            # check when the walk reaches it, but rejecting here gives a
            # clearer error message.
            raise RestrictedPythonError(
                f"Only direct builtin calls allowed, got "
                f"{type(callee).__name__}"
            )

        if callee.id not in ALLOWED_BUILTINS:
            raise RestrictedPythonError(
                f"Disallowed builtin call: {callee.id}"
            )
| # --------------------------------------------------------------------------- | |
| # Execution — Section 7.4 | |
| # --------------------------------------------------------------------------- | |
| _ITERATION_CAP = 10_000 | |
| def exec_restricted(code: str, fs: "SimulatedFilesystem") -> str: | |
| """Execute ``code`` under the Section 7 restricted-Python contract. | |
| Contract summary: | |
| * Parse in ``exec`` mode; :class:`SyntaxError` -> ``"SyntaxError: ..."``. | |
| * Run :func:`validate_ast`; :class:`RestrictedPythonError` -> | |
| ``"RestrictedError: ..."``. | |
| * Execute with a restricted ``__builtins__`` mapping containing only the | |
| names in :data:`ALLOWED_BUILTINS`. | |
| * Install a :func:`sys.settrace` tracer that increments a per-execution | |
| line counter; on reaching :data:`_ITERATION_CAP` raise | |
| :class:`RestrictedPythonError` from inside the tracer. | |
| * Any :class:`RestrictedPythonError` raised during ``exec`` surfaces as | |
| ``"RestrictedError: ..."``. Any other :class:`Exception` surfaces as | |
| ``"RuntimeError: ..."``. The tracer is uninstalled in a ``finally``. | |
| * Return ``str(namespace['_result'])`` if ``_result`` was assigned, else | |
| ``"None"`` (because the initial namespace seeds ``_result = None`` and | |
| ``str(None) == "None"``). | |
| The ``fs`` argument is received for the TOOLS wiring contract but is NOT | |
| exposed to executed code β the restricted namespace only contains the | |
| scoped ``__builtins__`` and ``_result``. | |
| """ | |
| # 1. Parse. | |
| try: | |
| tree = ast.parse(code, mode="exec") | |
| except SyntaxError as e: | |
| return f"SyntaxError: {e}" | |
| # 2. Validate AST. | |
| try: | |
| validate_ast(tree) | |
| except RestrictedPythonError as e: | |
| return f"RestrictedError: {e}" | |
| # 3. Build the restricted builtins namespace. We pull from the real | |
| # ``builtins`` module rather than ``__builtins__`` (which is a module | |
| # at module scope but a dict under nested exec) to avoid the dict/module | |
| # ambiguity entirely. | |
| restricted_builtins: dict[str, object] = { | |
| name: getattr(_builtins, name) for name in ALLOWED_BUILTINS | |
| } | |
| namespace: dict[str, object] = { | |
| "__builtins__": restricted_builtins, | |
| "_result": None, | |
| } | |
| # 4. Install iteration-cap tracer. A list-of-one holds the counter so the | |
| # closure can mutate it without ``nonlocal`` gymnastics. The tracer | |
| # returns itself so that nested frames continue to trace (sys.settrace | |
| # semantics: the global tracer fires on 'call' events and must return a | |
| # local tracer for line/return/exception events inside that frame). | |
| counter = [0] | |
| def tracer(frame, event, arg): # type: ignore[no-untyped-def] | |
| if event == "line": | |
| counter[0] += 1 | |
| if counter[0] >= _ITERATION_CAP: | |
| raise RestrictedPythonError( | |
| f"Iteration cap exceeded: {_ITERATION_CAP} instructions" | |
| ) | |
| return tracer | |
| # 5. Compile and exec with the tracer armed. Always tear the tracer down | |
| # in a ``finally`` β leaving sys.settrace hot across calls would break | |
| # unrelated code and the test runner. | |
| try: | |
| compiled = compile(tree, "<restricted>", "exec") | |
| sys.settrace(tracer) | |
| try: | |
| exec(compiled, namespace) | |
| finally: | |
| sys.settrace(None) | |
| except RestrictedPythonError as e: | |
| # Iteration cap or a tracer-raised error. Must be caught *before* | |
| # the generic Exception handler so we don't mask it as RuntimeError. | |
| return f"RestrictedError: {e}" | |
| except Exception as e: # noqa: BLE001 β deliberate broad catch at the boundary | |
| return f"RuntimeError: {e}" | |
| # 6. Result. ``_result`` is always present (we seeded it), so this yields | |
| # ``"None"`` when the script didn't assign and ``str(value)`` otherwise. | |
| return str(namespace.get("_result")) | |