Spaces:
Sleeping
Sleeping
| """Tests for red_button.restricted_python per PROJECT.md Section 7. | |
| The AST-whitelisted ``exec_restricted`` is THE security boundary for | |
| ``run_python``. Every test here represents either an attack pattern that must | |
| be blocked, a valid calculator operation that must succeed, or a | |
| structural/edge-case assertion about the error-string contract. | |
| Each attack is exercised through :func:`exec_restricted` (the real boundary) | |
| unless the test explicitly targets :func:`validate_ast` for clarity. The | |
| assertion shape is either: | |
| * ``result.startswith("RestrictedError")`` for AST-level or | |
| iteration-cap violations, or | |
| * ``result.startswith("SyntaxError")`` for genuinely malformed source, or | |
| * ``result.startswith("RuntimeError")`` for Python-level exceptions inside | |
| otherwise-valid restricted code (e.g. ``1/0``). | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import pytest | |
| from red_button.restricted_python import ( | |
| ALLOWED_BUILTINS, | |
| ALLOWED_NODES, | |
| RestrictedPythonError, | |
| exec_restricted, | |
| validate_ast, | |
| ) | |
| from red_button.sandbox import TOOLS, SimulatedFilesystem | |
| # --------------------------------------------------------------------------- | |
| # Fixture β a fresh SimulatedFilesystem per test. exec_restricted never | |
| # touches fs, but the signature requires it. | |
| # --------------------------------------------------------------------------- | |
| def fs() -> SimulatedFilesystem: | |
| return SimulatedFilesystem() | |
| # --------------------------------------------------------------------------- | |
| # Allowlist shape β sanity checks that the module exports what Section 7 lists. | |
| # --------------------------------------------------------------------------- | |
| def test_allowed_builtins_matches_section_7_2() -> None: | |
| assert ALLOWED_BUILTINS == frozenset( | |
| { | |
| "abs", "min", "max", "sum", "len", "range", "int", "float", | |
| "str", "round", "sorted", "enumerate", "zip", "all", "any", | |
| "bool", "list", "dict", "tuple", "set", | |
| } | |
| ) | |
| def test_allowed_nodes_includes_required_control_flow() -> None: | |
| # Spot-check: control flow and call must be present. | |
| assert ast.If in ALLOWED_NODES | |
| assert ast.For in ALLOWED_NODES | |
| assert ast.While in ALLOWED_NODES | |
| assert ast.Call in ALLOWED_NODES | |
| def test_allowed_nodes_deliberately_excludes_attribute() -> None: | |
| assert ast.Attribute not in ALLOWED_NODES | |
| def test_allowed_nodes_deliberately_excludes_subscript() -> None: | |
| assert ast.Subscript not in ALLOWED_NODES | |
| def test_allowed_nodes_deliberately_excludes_comprehensions() -> None: | |
| for node_type in (ast.ListComp, ast.SetComp, ast.DictComp, ast.GeneratorExp): | |
| assert node_type not in ALLOWED_NODES | |
| def test_allowed_nodes_deliberately_excludes_function_definitions() -> None: | |
| for node_type in (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda): | |
| assert node_type not in ALLOWED_NODES | |
| def test_allowed_nodes_deliberately_excludes_yield_await() -> None: | |
| for node_type in (ast.Yield, ast.YieldFrom, ast.Await): | |
| assert node_type not in ALLOWED_NODES | |
| # --------------------------------------------------------------------------- | |
| # Import attacks β all must block. | |
| # --------------------------------------------------------------------------- | |
| def test_import_os_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("import os", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_import_sys_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("import sys", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_from_import_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("from os import path", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_from_dotted_import_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("from os.path import join", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_dunder_import_call_is_blocked(fs: SimulatedFilesystem) -> None: | |
| # __import__ is a valid Name; the Call-allowlist must reject it because | |
| # "__import__" isn't in ALLOWED_BUILTINS. | |
| result = exec_restricted("__import__('os')", fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Attribute-access attacks β ast.Attribute is not in ALLOWED_NODES. | |
| # --------------------------------------------------------------------------- | |
| def test_attribute_access_is_blocked(fs: SimulatedFilesystem, code: str) -> None: | |
| result = exec_restricted(code, fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Builtin-escape attacks β not in ALLOWED_BUILTINS. | |
| # --------------------------------------------------------------------------- | |
| def test_disallowed_builtin_calls_are_blocked( | |
| fs: SimulatedFilesystem, code: str | |
| ) -> None: | |
| result = exec_restricted(code, fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Subscript-escape attacks β ast.Subscript is not in ALLOWED_NODES. | |
| # --------------------------------------------------------------------------- | |
| def test_subscript_access_to_builtins_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("__builtins__['open']", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_subscript_dunder_import_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("__builtins__['__import__']", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_subscript_is_blocked_even_for_benign_indexing( | |
| fs: SimulatedFilesystem, | |
| ) -> None: | |
| # Section 7.1 omits ast.Subscript. That means even ``[1,2,3][0]`` is | |
| # blocked β the calculator workload doesn't need indexing. Documented | |
| # as intentional in the module docstring. | |
| result = exec_restricted("_result = [1, 2, 3][0]", fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Function/class definition attacks β blocked at the definition node. | |
| # --------------------------------------------------------------------------- | |
| def test_function_def_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("def f(): pass", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_class_def_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("class C: pass", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_bare_lambda_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("lambda: 1", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_parameterized_lambda_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("lambda x: x + 1", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_async_def_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("async def f(): pass", fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Control-flow attacks β yield / yield from / await at module scope. | |
| # | |
| # In Python 3.11, ``yield 1``, ``yield from []``, and ``await x`` all parse | |
| # successfully at module level; AST walk then yields ``Yield`` / ``YieldFrom`` | |
| # / ``Await`` nodes which are NOT in ALLOWED_NODES, so we expect | |
| # ``RestrictedError`` (not ``SyntaxError``). | |
| # --------------------------------------------------------------------------- | |
| def test_yield_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("yield 1", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_yield_from_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("yield from []", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_await_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("await x", fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Iteration cap β sys.settrace fires on runaway loops. | |
| # --------------------------------------------------------------------------- | |
| def test_iteration_cap_fires_on_while_true(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("while True:\n _result = 1", fs) | |
| assert result.startswith("RestrictedError"), result | |
| assert "Iteration cap exceeded" in result | |
| def test_iteration_cap_fires_on_large_range(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted( | |
| "total = 0\nfor i in range(1000000):\n total = total + 1\n_result = total", | |
| fs, | |
| ) | |
| assert result.startswith("RestrictedError"), result | |
| assert "Iteration cap exceeded" in result | |
| def test_recursion_is_structurally_impossible(fs: SimulatedFilesystem) -> None: | |
| # The restricted language can't express recursion because FunctionDef and | |
| # Lambda are both blocked. The test documents this: any attempt to define | |
| # a recursive function is rejected at the definition node. | |
| result = exec_restricted("def f():\n f()\nf()", fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Valid calculator operations β must succeed. | |
| # --------------------------------------------------------------------------- | |
| def test_bare_expression_without_result_returns_string_none( | |
| fs: SimulatedFilesystem, | |
| ) -> None: | |
| # ``_result`` is seeded to None in the namespace, so a script that never | |
| # assigns to it surfaces as the string "None" (str(None)). | |
| assert exec_restricted("1 + 2", fs) == "None" | |
| def test_simple_assignment_to_result(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = 1 + 2", fs) == "3" | |
| def test_sum_builtin(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = sum([1, 2, 3])", fs) == "6" | |
| def test_max_builtin(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = max(1, 5, 3)", fs) == "5" | |
| def test_sorted_builtin(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = sorted([3, 1, 2])", fs) == "[1, 2, 3]" | |
| def test_conditional_expression(fs: SimulatedFilesystem) -> None: | |
| # Ternary expressions (ast.IfExp) are allowed alongside ast.If β pure | |
| # calculator syntax, no new escape surface. | |
| assert exec_restricted("_result = 5 if 1 > 0 else 10", fs) == "5" | |
| def test_conditional_expression_string_branch(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = 'big' if 10 > 5 else 'small'", fs) == "big" | |
| def test_conditional_expression_recursively_validates_children( | |
| fs: SimulatedFilesystem, | |
| ) -> None: | |
| # Allowing ast.IfExp does NOT bypass child validation β the disallowed | |
| # ``open`` call inside the ternary is still caught by ast.walk. | |
| result = exec_restricted("_result = open('x') if 1 > 0 else 10", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_for_loop_accumulator(fs: SimulatedFilesystem) -> None: | |
| code = "total = 0\nfor i in range(5):\n total = total + i\n_result = total" | |
| assert exec_restricted(code, fs) == "10" | |
| def test_arithmetic_precedence(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = (5 + 3) * 2 ** 2", fs) == "32" | |
| def test_while_loop_terminates_under_cap(fs: SimulatedFilesystem) -> None: | |
| code = "i = 0\nwhile i < 5:\n i = i + 1\n_result = i" | |
| assert exec_restricted(code, fs) == "5" | |
| def test_boolean_and_comparison(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = 1 < 2 and 3 > 2", fs) == "True" | |
| def test_list_and_tuple_literals(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("_result = sum([1, 2, 3]) + len([4, 5])", fs) == "8" | |
| # --------------------------------------------------------------------------- | |
| # Comprehensions β BLOCKED (Section 7.1 omission). | |
| # --------------------------------------------------------------------------- | |
| def test_list_comprehension_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("_result = [x * 2 for x in range(10)]", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_set_comprehension_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("_result = {x for x in range(5)}", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_dict_comprehension_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("_result = {x: x for x in range(5)}", fs) | |
| assert result.startswith("RestrictedError"), result | |
| def test_generator_expression_is_blocked(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("_result = sum(x for x in range(5))", fs) | |
| assert result.startswith("RestrictedError"), result | |
| # --------------------------------------------------------------------------- | |
| # Edge cases on the input and error-string contract. | |
| # --------------------------------------------------------------------------- | |
| def test_empty_code_returns_string_none(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("", fs) == "None" | |
| def test_whitespace_only_returns_string_none(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted(" \n\t ", fs) == "None" | |
| def test_comment_only_returns_string_none(fs: SimulatedFilesystem) -> None: | |
| assert exec_restricted("# hello", fs) == "None" | |
| def test_malformed_source_returns_syntax_error(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("1 +", fs) | |
| assert result.startswith("SyntaxError"), result | |
| def test_runtime_error_from_zero_division(fs: SimulatedFilesystem) -> None: | |
| result = exec_restricted("_result = 1 / 0", fs) | |
| assert result.startswith("RuntimeError"), result | |
| # --------------------------------------------------------------------------- | |
| # validate_ast direct usage β confirm it raises rather than returning. | |
| # --------------------------------------------------------------------------- | |
| def test_validate_ast_passes_for_clean_tree() -> None: | |
| tree = ast.parse("_result = 1 + 2", mode="exec") | |
| validate_ast(tree) # must not raise | |
| def test_validate_ast_raises_on_attribute_access() -> None: | |
| tree = ast.parse("x.y", mode="exec") | |
| with pytest.raises(RestrictedPythonError): | |
| validate_ast(tree) | |
| def test_validate_ast_raises_on_disallowed_call() -> None: | |
| tree = ast.parse("open('x')", mode="exec") | |
| with pytest.raises(RestrictedPythonError): | |
| validate_ast(tree) | |
| def test_validate_ast_indirect_call_error_message_mentions_call_shape() -> None: | |
| # A call whose func is not an ast.Name (e.g. attribute call) must be | |
| # rejected with the "Only direct builtin calls allowed" message. Because | |
| # the ast.walk order visits parents before children and Attribute is | |
| # ALSO disallowed, the first raise is actually the Attribute node. Assert | |
| # we get *some* RestrictedPythonError either way. | |
| tree = ast.parse("[].append(1)", mode="exec") | |
| with pytest.raises(RestrictedPythonError): | |
| validate_ast(tree) | |
| # --------------------------------------------------------------------------- | |
| # Tracer cleanup β sys.settrace must be None after every exec, success or | |
| # failure. Leaking a tracer would wreck the test runner. | |
| # --------------------------------------------------------------------------- | |
| def test_tracer_uninstalled_after_success(fs: SimulatedFilesystem) -> None: | |
| import sys as _sys | |
| exec_restricted("_result = 1 + 1", fs) | |
| assert _sys.gettrace() is None | |
| def test_tracer_uninstalled_after_iteration_cap(fs: SimulatedFilesystem) -> None: | |
| import sys as _sys | |
| exec_restricted("while True:\n _result = 1", fs) | |
| assert _sys.gettrace() is None | |
| def test_tracer_uninstalled_after_runtime_error(fs: SimulatedFilesystem) -> None: | |
| import sys as _sys | |
| exec_restricted("_result = 1 / 0", fs) | |
| assert _sys.gettrace() is None | |
| # --------------------------------------------------------------------------- | |
| # TOOLS wiring β run_python is callable and routes through exec_restricted. | |
| # --------------------------------------------------------------------------- | |
| def test_tools_run_python_is_callable() -> None: | |
| assert callable(TOOLS["run_python"]) | |
| def test_tools_run_python_computes_result(fs: SimulatedFilesystem) -> None: | |
| assert TOOLS["run_python"](fs, "_result = 1 + 1") == "2" | |
| def test_tools_run_python_blocks_import(fs: SimulatedFilesystem) -> None: | |
| result = TOOLS["run_python"](fs, "import os") | |
| assert "RestrictedError" in result | |
| def test_tools_run_python_blocks_open(fs: SimulatedFilesystem) -> None: | |
| result = TOOLS["run_python"](fs, "open('/etc/passwd')") | |
| assert "RestrictedError" in result | |
| def test_tools_submit_answer_is_still_none_placeholder() -> None: | |
| assert TOOLS["submit_answer"] is None | |