| |
| """Tests for execute_code's strict / project execution modes. |
| |
| The mode switch controls two things: |
| - working directory: staging tmpdir (strict) vs session CWD (project) |
| - interpreter: sys.executable (strict) vs active venv's python (project) |
| |
| Security-critical invariants — env scrubbing, tool whitelist, resource caps — |
| must apply identically in both modes. These tests guard all three layers. |
| |
| Mode is sourced exclusively from ``code_execution.mode`` in config.yaml — |
| there is no env-var override. Tests patch ``_load_config`` directly. |
| """ |
|
|
| import json |
| import os |
| import sys |
| import unittest |
| from contextlib import contextmanager |
| from unittest.mock import patch |
|
|
| import pytest |
|
|
| os.environ["TERMINAL_ENV"] = "local" |
|
|
|
|
| @pytest.fixture(autouse=True) |
| def _force_local_terminal(monkeypatch): |
| """Mirror test_code_execution.py — guarantee local backend under xdist.""" |
| monkeypatch.setenv("TERMINAL_ENV", "local") |
|
|
|
|
| from tools.code_execution_tool import ( |
| SANDBOX_ALLOWED_TOOLS, |
| DEFAULT_EXECUTION_MODE, |
| EXECUTION_MODES, |
| _get_execution_mode, |
| _is_usable_python, |
| _resolve_child_cwd, |
| _resolve_child_python, |
| build_execute_code_schema, |
| execute_code, |
| ) |
|
|
|
|
| @contextmanager |
| def _mock_mode(mode): |
| """Context manager that pins code_execution.mode to the given value.""" |
| with patch("tools.code_execution_tool._load_config", |
| return_value={"mode": mode}): |
| yield |
|
|
|
|
| def _mock_handle_function_call(function_name, function_args, task_id=None, user_task=None): |
| """Minimal mock dispatcher reused across tests.""" |
| if function_name == "terminal": |
| return json.dumps({"output": "mock", "exit_code": 0}) |
| if function_name == "read_file": |
| return json.dumps({"content": "line1\n", "total_lines": 1}) |
| return json.dumps({"error": f"Unknown tool: {function_name}"}) |
|
|
|
|
| |
| |
| |
|
|
| class TestGetExecutionMode(unittest.TestCase): |
| """_get_execution_mode reads config.yaml only (no env var surface).""" |
|
|
| def test_default_is_project(self): |
| self.assertEqual(DEFAULT_EXECUTION_MODE, "project") |
|
|
| def test_config_project(self): |
| with patch("tools.code_execution_tool._load_config", |
| return_value={"mode": "project"}): |
| self.assertEqual(_get_execution_mode(), "project") |
|
|
| def test_config_strict(self): |
| with patch("tools.code_execution_tool._load_config", |
| return_value={"mode": "strict"}): |
| self.assertEqual(_get_execution_mode(), "strict") |
|
|
| def test_config_case_insensitive(self): |
| with patch("tools.code_execution_tool._load_config", |
| return_value={"mode": "STRICT"}): |
| self.assertEqual(_get_execution_mode(), "strict") |
|
|
| def test_config_strips_whitespace(self): |
| with patch("tools.code_execution_tool._load_config", |
| return_value={"mode": " project "}): |
| self.assertEqual(_get_execution_mode(), "project") |
|
|
| def test_empty_config_falls_back_to_default(self): |
| with patch("tools.code_execution_tool._load_config", return_value={}): |
| self.assertEqual(_get_execution_mode(), DEFAULT_EXECUTION_MODE) |
|
|
| def test_bogus_config_falls_back_to_default(self): |
| with patch("tools.code_execution_tool._load_config", |
| return_value={"mode": "banana"}): |
| self.assertEqual(_get_execution_mode(), DEFAULT_EXECUTION_MODE) |
|
|
| def test_none_config_falls_back_to_default(self): |
| with patch("tools.code_execution_tool._load_config", |
| return_value={"mode": None}): |
| |
| self.assertEqual(_get_execution_mode(), DEFAULT_EXECUTION_MODE) |
|
|
| def test_execution_modes_tuple(self): |
| """Canonical set of modes — tests + config layer rely on this shape.""" |
| self.assertEqual(set(EXECUTION_MODES), {"project", "strict"}) |
|
|
|
|
| |
| |
| |
|
|
| class TestResolveChildPython(unittest.TestCase): |
| """_resolve_child_python — picks the right interpreter per mode.""" |
|
|
| def test_strict_always_sys_executable(self): |
| """Strict mode never leaves sys.executable, even if venv is set.""" |
| with patch.dict(os.environ, {"VIRTUAL_ENV": "/some/venv"}): |
| self.assertEqual(_resolve_child_python("strict"), sys.executable) |
|
|
| def test_project_with_no_venv_falls_back(self): |
| """Project mode without VIRTUAL_ENV or CONDA_PREFIX → sys.executable.""" |
| env = {k: v for k, v in os.environ.items() |
| if k not in ("VIRTUAL_ENV", "CONDA_PREFIX")} |
| with patch.dict(os.environ, env, clear=True): |
| self.assertEqual(_resolve_child_python("project"), sys.executable) |
|
|
| def test_project_with_virtualenv_picks_venv_python(self): |
| """Project mode + VIRTUAL_ENV pointing at a real venv → that python.""" |
| import tempfile, pathlib |
| with tempfile.TemporaryDirectory() as td: |
| fake_venv = pathlib.Path(td) |
| (fake_venv / "bin").mkdir() |
| |
| (fake_venv / "bin" / "python").symlink_to(sys.executable) |
| with patch.dict(os.environ, {"VIRTUAL_ENV": str(fake_venv)}): |
| |
| _is_usable_python.cache_clear() |
| result = _resolve_child_python("project") |
| self.assertEqual(result, str(fake_venv / "bin" / "python")) |
|
|
| def test_project_with_broken_venv_falls_back(self): |
| """VIRTUAL_ENV set but bin/python missing → sys.executable.""" |
| import tempfile |
| with tempfile.TemporaryDirectory() as td: |
| |
| with patch.dict(os.environ, {"VIRTUAL_ENV": td}): |
| _is_usable_python.cache_clear() |
| self.assertEqual(_resolve_child_python("project"), sys.executable) |
|
|
| def test_project_prefers_virtualenv_over_conda(self): |
| """If both VIRTUAL_ENV and CONDA_PREFIX are set, VIRTUAL_ENV wins.""" |
| import tempfile, pathlib |
| with tempfile.TemporaryDirectory() as ve_td, tempfile.TemporaryDirectory() as conda_td: |
| ve = pathlib.Path(ve_td) |
| (ve / "bin").mkdir() |
| (ve / "bin" / "python").symlink_to(sys.executable) |
|
|
| conda = pathlib.Path(conda_td) |
| (conda / "bin").mkdir() |
| (conda / "bin" / "python").symlink_to(sys.executable) |
|
|
| with patch.dict(os.environ, {"VIRTUAL_ENV": str(ve), "CONDA_PREFIX": str(conda)}): |
| _is_usable_python.cache_clear() |
| result = _resolve_child_python("project") |
| self.assertEqual(result, str(ve / "bin" / "python")) |
|
|
| def test_is_usable_python_rejects_nonexistent(self): |
| _is_usable_python.cache_clear() |
| self.assertFalse(_is_usable_python("/does/not/exist/python")) |
|
|
| def test_is_usable_python_accepts_real_python(self): |
| _is_usable_python.cache_clear() |
| self.assertTrue(_is_usable_python(sys.executable)) |
|
|
|
|
| |
| |
| |
|
|
| class TestResolveChildCwd(unittest.TestCase): |
|
|
| def test_strict_uses_staging_dir(self): |
| self.assertEqual(_resolve_child_cwd("strict", "/tmp/staging"), "/tmp/staging") |
|
|
| def test_project_without_terminal_cwd_uses_getcwd(self): |
| env = {k: v for k, v in os.environ.items() if k != "TERMINAL_CWD"} |
| with patch.dict(os.environ, env, clear=True): |
| self.assertEqual(_resolve_child_cwd("project", "/tmp/staging"), os.getcwd()) |
|
|
| def test_project_uses_terminal_cwd_when_set(self): |
| import tempfile |
| with tempfile.TemporaryDirectory() as td: |
| with patch.dict(os.environ, {"TERMINAL_CWD": td}): |
| self.assertEqual(_resolve_child_cwd("project", "/tmp/staging"), td) |
|
|
| def test_project_bogus_terminal_cwd_falls_back_to_getcwd(self): |
| with patch.dict(os.environ, {"TERMINAL_CWD": "/does/not/exist/anywhere"}): |
| self.assertEqual(_resolve_child_cwd("project", "/tmp/staging"), os.getcwd()) |
|
|
| def test_project_expands_tilde(self): |
| import pathlib |
| home = str(pathlib.Path.home()) |
| with patch.dict(os.environ, {"TERMINAL_CWD": "~"}): |
| self.assertEqual(_resolve_child_cwd("project", "/tmp/staging"), home) |
|
|
|
|
| |
| |
| |
|
|
| class TestModeAwareSchema(unittest.TestCase): |
|
|
| def test_strict_description_mentions_temp_dir(self): |
| desc = build_execute_code_schema(mode="strict")["description"] |
| self.assertIn("temp dir", desc) |
|
|
| def test_project_description_mentions_session_and_venv(self): |
| desc = build_execute_code_schema(mode="project")["description"] |
| self.assertIn("session", desc) |
| self.assertIn("venv", desc) |
|
|
| def test_neither_description_uses_sandbox_language(self): |
| """REGRESSION GUARD for commit 39b83f34. |
| |
| Agents on local backends falsely believed they were sandboxed and |
| refused networking tasks. Do not reintroduce any 'sandbox' / |
| 'isolated' / 'cloud' language in the tool description. |
| """ |
| for mode in EXECUTION_MODES: |
| desc = build_execute_code_schema(mode=mode)["description"].lower() |
| for forbidden in ("sandbox", "isolated", "cloud"): |
| self.assertNotIn(forbidden, desc, |
| f"mode={mode}: '{forbidden}' leaked into description") |
|
|
| def test_descriptions_are_similar_length(self): |
| """Both modes should have roughly the same-size description.""" |
| strict = len(build_execute_code_schema(mode="strict")["description"]) |
| project = len(build_execute_code_schema(mode="project")["description"]) |
| self.assertLess(abs(strict - project), 200) |
|
|
| def test_default_mode_reads_config(self): |
| """build_execute_code_schema() with mode=None reads config.yaml.""" |
| with _mock_mode("strict"): |
| desc = build_execute_code_schema()["description"] |
| self.assertIn("temp dir", desc) |
| with _mock_mode("project"): |
| desc = build_execute_code_schema()["description"] |
| self.assertIn("session", desc) |
|
|
|
|
| |
| |
| |
|
|
| @pytest.mark.skipif(sys.platform == "win32", reason="execute_code is POSIX-only") |
| class TestExecuteCodeModeIntegration(unittest.TestCase): |
| """End-to-end: verify the subprocess actually runs where we expect.""" |
|
|
| def _run(self, code, mode, enabled_tools=None, extra_env=None): |
| env_overrides = extra_env or {} |
| with _mock_mode(mode): |
| with patch.dict(os.environ, env_overrides): |
| with patch("model_tools.handle_function_call", |
| side_effect=_mock_handle_function_call): |
| raw = execute_code( |
| code=code, |
| task_id=f"test-{mode}", |
| enabled_tools=enabled_tools or list(SANDBOX_ALLOWED_TOOLS), |
| ) |
| return json.loads(raw) |
|
|
| def test_strict_mode_runs_in_tmpdir(self): |
| """Strict mode: script's os.getcwd() is the staging tmpdir.""" |
| result = self._run("import os; print(os.getcwd())", mode="strict") |
| self.assertEqual(result["status"], "success") |
| self.assertIn("hermes_sandbox_", result["output"]) |
|
|
| def test_project_mode_runs_in_session_cwd(self): |
| """Project mode: script's os.getcwd() is the session's working dir.""" |
| import tempfile |
| with tempfile.TemporaryDirectory() as td: |
| result = self._run( |
| "import os; print(os.getcwd())", |
| mode="project", |
| extra_env={"TERMINAL_CWD": td}, |
| ) |
| self.assertEqual(result["status"], "success") |
| |
| self.assertEqual( |
| os.path.realpath(result["output"].strip()), |
| os.path.realpath(td), |
| ) |
|
|
| def test_project_mode_interpreter_is_venv_python(self): |
| """Project mode: sys.executable inside the child is the venv's python |
| when VIRTUAL_ENV is set to a real venv.""" |
| |
| |
| |
| |
| result = self._run("import sys; print(sys.executable)", mode="project") |
| self.assertEqual(result["status"], "success") |
| |
| output = result["output"].strip() |
| ve = os.environ.get("VIRTUAL_ENV", "").strip() |
| if ve: |
| self.assertTrue( |
| output.startswith(ve) or output == sys.executable, |
| f"project-mode python should be under VIRTUAL_ENV={ve} or sys.executable={sys.executable}, got {output}", |
| ) |
|
|
| def test_project_mode_can_still_import_hermes_tools(self): |
| """Regression: hermes_tools still importable from non-tmpdir CWD. |
| |
| This is the PYTHONPATH fix — without it, switching to session CWD |
| breaks `from hermes_tools import terminal`. |
| """ |
| import tempfile |
| with tempfile.TemporaryDirectory() as td: |
| code = ( |
| "from hermes_tools import terminal\n" |
| "r = terminal('echo x')\n" |
| "print(r.get('output', 'MISSING'))\n" |
| ) |
| result = self._run(code, mode="project", extra_env={"TERMINAL_CWD": td}) |
| self.assertEqual(result["status"], "success") |
| self.assertIn("mock", result["output"]) |
|
|
| def test_strict_mode_can_still_import_hermes_tools(self): |
| """Regression: strict mode's tmpdir CWD still works for imports.""" |
| code = ( |
| "from hermes_tools import terminal\n" |
| "r = terminal('echo x')\n" |
| "print(r.get('output', 'MISSING'))\n" |
| ) |
| result = self._run(code, mode="strict") |
| self.assertEqual(result["status"], "success") |
| self.assertIn("mock", result["output"]) |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| @pytest.mark.skipif(sys.platform == "win32", reason="execute_code is POSIX-only") |
| class TestSecurityInvariantsAcrossModes(unittest.TestCase): |
|
|
| def _run(self, code, mode): |
| with _mock_mode(mode): |
| with patch("model_tools.handle_function_call", |
| side_effect=_mock_handle_function_call): |
| raw = execute_code( |
| code=code, |
| task_id=f"test-sec-{mode}", |
| enabled_tools=list(SANDBOX_ALLOWED_TOOLS), |
| ) |
| return json.loads(raw) |
|
|
| def test_api_keys_scrubbed_in_strict_mode(self): |
| code = ( |
| "import os\n" |
| "print('KEY=' + os.environ.get('OPENAI_API_KEY', 'MISSING'))\n" |
| "print('TOK=' + os.environ.get('ANTHROPIC_API_KEY', 'MISSING'))\n" |
| ) |
| with patch.dict(os.environ, { |
| "OPENAI_API_KEY": "sk-should-not-leak", |
| "ANTHROPIC_API_KEY": "ant-should-not-leak", |
| }): |
| result = self._run(code, mode="strict") |
| self.assertEqual(result["status"], "success") |
| self.assertIn("KEY=MISSING", result["output"]) |
| self.assertIn("TOK=MISSING", result["output"]) |
| self.assertNotIn("sk-should-not-leak", result["output"]) |
| self.assertNotIn("ant-should-not-leak", result["output"]) |
|
|
| def test_api_keys_scrubbed_in_project_mode(self): |
| """CRITICAL: the project-mode default does NOT leak user credentials.""" |
| code = ( |
| "import os\n" |
| "print('KEY=' + os.environ.get('OPENAI_API_KEY', 'MISSING'))\n" |
| "print('TOK=' + os.environ.get('ANTHROPIC_API_KEY', 'MISSING'))\n" |
| "print('SEC=' + os.environ.get('GITHUB_TOKEN', 'MISSING'))\n" |
| ) |
| with patch.dict(os.environ, { |
| "OPENAI_API_KEY": "sk-should-not-leak", |
| "ANTHROPIC_API_KEY": "ant-should-not-leak", |
| "GITHUB_TOKEN": "ghp-should-not-leak", |
| }): |
| result = self._run(code, mode="project") |
| self.assertEqual(result["status"], "success") |
| for needle in ("KEY=MISSING", "TOK=MISSING", "SEC=MISSING"): |
| self.assertIn(needle, result["output"]) |
| for leaked in ("sk-should-not-leak", "ant-should-not-leak", "ghp-should-not-leak"): |
| self.assertNotIn(leaked, result["output"]) |
|
|
| def test_secret_substrings_scrubbed_in_project_mode(self): |
| """SECRET/PASSWORD/CREDENTIAL/PASSWD/AUTH filters still apply.""" |
| code = ( |
| "import os\n" |
| "for k in ('MY_SECRET', 'DB_PASSWORD', 'VAULT_CREDENTIAL', " |
| "'LDAP_PASSWD', 'AUTH_TOKEN'):\n" |
| " print(f'{k}=' + os.environ.get(k, 'MISSING'))\n" |
| ) |
| with patch.dict(os.environ, { |
| "MY_SECRET": "secret-should-not-leak", |
| "DB_PASSWORD": "password-should-not-leak", |
| "VAULT_CREDENTIAL": "cred-should-not-leak", |
| "LDAP_PASSWD": "passwd-should-not-leak", |
| "AUTH_TOKEN": "auth-should-not-leak", |
| }): |
| result = self._run(code, mode="project") |
| self.assertEqual(result["status"], "success") |
| for leaked in ("secret-should-not-leak", "password-should-not-leak", |
| "cred-should-not-leak", "passwd-should-not-leak", |
| "auth-should-not-leak"): |
| self.assertNotIn(leaked, result["output"]) |
|
|
| def test_tool_whitelist_enforced_in_strict_mode(self): |
| """A script cannot RPC-call tools outside SANDBOX_ALLOWED_TOOLS.""" |
| |
| self.assertNotIn("execute_code", SANDBOX_ALLOWED_TOOLS) |
| code = ( |
| "import hermes_tools as ht\n" |
| "print('execute_code_available:', hasattr(ht, 'execute_code'))\n" |
| "print('delegate_task_available:', hasattr(ht, 'delegate_task'))\n" |
| ) |
| result = self._run(code, mode="strict") |
| self.assertEqual(result["status"], "success") |
| self.assertIn("execute_code_available: False", result["output"]) |
| self.assertIn("delegate_task_available: False", result["output"]) |
|
|
| def test_tool_whitelist_enforced_in_project_mode(self): |
| """CRITICAL: project mode does NOT widen the tool whitelist.""" |
| code = ( |
| "import hermes_tools as ht\n" |
| "print('execute_code_available:', hasattr(ht, 'execute_code'))\n" |
| "print('delegate_task_available:', hasattr(ht, 'delegate_task'))\n" |
| ) |
| result = self._run(code, mode="project") |
| self.assertEqual(result["status"], "success") |
| self.assertIn("execute_code_available: False", result["output"]) |
| self.assertIn("delegate_task_available: False", result["output"]) |
|
|
|
|
| if __name__ == "__main__": |
| unittest.main() |
|
|