File size: 20,211 Bytes
1cecfb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
#!/usr/bin/env python3
"""Tests for execute_code's strict / project execution modes.

The mode switch controls two things:
  - working directory: staging tmpdir (strict) vs session CWD (project)
  - interpreter:       sys.executable (strict) vs active venv's python (project)

Security-critical invariants — env scrubbing, tool whitelist, resource caps —
must apply identically in both modes. These tests guard all three layers.

Mode is sourced exclusively from ``code_execution.mode`` in config.yaml —
there is no env-var override. Tests patch ``_load_config`` directly.
"""

import json
import os
import sys
import unittest
from contextlib import contextmanager
from unittest.mock import patch

import pytest

# Set at module import time, ahead of the tools.code_execution_tool import below.
# NOTE(review): presumably the terminal backend is read from TERMINAL_ENV when
# that module is imported — confirm against tools.code_execution_tool.
os.environ["TERMINAL_ENV"] = "local"


@pytest.fixture(autouse=True)
def _force_local_terminal(monkeypatch):
    """Pin TERMINAL_ENV to "local" for each test (autouse fixture).

    Mirrors test_code_execution.py — guarantees the local backend under xdist.
    """
    monkeypatch.setenv("TERMINAL_ENV", "local")


from tools.code_execution_tool import (
    SANDBOX_ALLOWED_TOOLS,
    DEFAULT_EXECUTION_MODE,
    EXECUTION_MODES,
    _get_execution_mode,
    _is_usable_python,
    _resolve_child_cwd,
    _resolve_child_python,
    build_execute_code_schema,
    execute_code,
)


@contextmanager
def _mock_mode(mode):
    """Pin ``code_execution.mode`` to *mode* for the duration of the ``with`` body.

    Works by replacing ``tools.code_execution_tool._load_config`` with a stub
    that returns ``{"mode": mode}``; the original is restored on exit.
    """
    config_patcher = patch(
        "tools.code_execution_tool._load_config",
        return_value={"mode": mode},
    )
    with config_patcher:
        yield


def _mock_handle_function_call(function_name, function_args, task_id=None, user_task=None):
    """Stand-in tool dispatcher shared by the integration tests.

    Returns a canned JSON string for ``terminal`` and ``read_file``; any other
    tool name yields a JSON error object. ``function_args``, ``task_id`` and
    ``user_task`` are accepted for signature compatibility and ignored.
    """
    if function_name == "terminal":
        payload = {"output": "mock", "exit_code": 0}
    elif function_name == "read_file":
        payload = {"content": "line1\n", "total_lines": 1}
    else:
        payload = {"error": f"Unknown tool: {function_name}"}
    return json.dumps(payload)


# ---------------------------------------------------------------------------
# Mode resolution
# ---------------------------------------------------------------------------

class TestGetExecutionMode(unittest.TestCase):
    """_get_execution_mode is driven by config.yaml only (no env var surface)."""

    def _assert_resolves(self, config, expected):
        """Stub _load_config to return *config* and check the resolved mode."""
        with patch("tools.code_execution_tool._load_config", return_value=config):
            self.assertEqual(_get_execution_mode(), expected)

    def test_default_is_project(self):
        self.assertEqual(DEFAULT_EXECUTION_MODE, "project")

    def test_config_project(self):
        self._assert_resolves({"mode": "project"}, "project")

    def test_config_strict(self):
        self._assert_resolves({"mode": "strict"}, "strict")

    def test_config_case_insensitive(self):
        self._assert_resolves({"mode": "STRICT"}, "strict")

    def test_config_strips_whitespace(self):
        self._assert_resolves({"mode": "  project  "}, "project")

    def test_empty_config_falls_back_to_default(self):
        self._assert_resolves({}, DEFAULT_EXECUTION_MODE)

    def test_bogus_config_falls_back_to_default(self):
        self._assert_resolves({"mode": "banana"}, DEFAULT_EXECUTION_MODE)

    def test_none_config_falls_back_to_default(self):
        # str(None).lower() = "none" → not in EXECUTION_MODES → default
        self._assert_resolves({"mode": None}, DEFAULT_EXECUTION_MODE)

    def test_execution_modes_tuple(self):
        """Canonical set of modes — tests + config layer rely on this shape."""
        self.assertEqual(set(EXECUTION_MODES), {"project", "strict"})


# ---------------------------------------------------------------------------
# Interpreter resolver
# ---------------------------------------------------------------------------

class TestResolveChildPython(unittest.TestCase):
    """_resolve_child_python — picks the right interpreter per mode.

    Every test runs against an environment from which the ambient
    VIRTUAL_ENV and CONDA_PREFIX have been stripped, so results do not depend
    on whichever venv/conda env the developer happens to have activated.
    """

    def setUp(self):
        # _is_usable_python memoizes on path; start each test from a cold cache.
        _is_usable_python.cache_clear()

    @staticmethod
    def _env_with(**overrides):
        """Return a copy of os.environ without VIRTUAL_ENV/CONDA_PREFIX, plus *overrides*."""
        env = {
            key: value
            for key, value in os.environ.items()
            if key not in ("VIRTUAL_ENV", "CONDA_PREFIX")
        }
        env.update(overrides)
        return env

    @staticmethod
    def _make_fake_env(root):
        """Create ``<root>/bin/python`` and return its path as a string.

        The file is a symlink to the running interpreter so that the
        resolver's usability check actually passes.
        """
        import pathlib

        bin_dir = pathlib.Path(root) / "bin"
        bin_dir.mkdir()
        python = bin_dir / "python"
        python.symlink_to(sys.executable)
        return str(python)

    def test_strict_always_sys_executable(self):
        """Strict mode never leaves sys.executable, even if venv is set."""
        env = self._env_with(VIRTUAL_ENV="/some/venv")
        with patch.dict(os.environ, env, clear=True):
            self.assertEqual(_resolve_child_python("strict"), sys.executable)

    def test_project_with_no_venv_falls_back(self):
        """Project mode without VIRTUAL_ENV or CONDA_PREFIX → sys.executable."""
        with patch.dict(os.environ, self._env_with(), clear=True):
            self.assertEqual(_resolve_child_python("project"), sys.executable)

    def test_project_with_virtualenv_picks_venv_python(self):
        """Project mode + VIRTUAL_ENV pointing at a real venv → that python."""
        import tempfile

        with tempfile.TemporaryDirectory() as td:
            expected = self._make_fake_env(td)
            with patch.dict(os.environ, self._env_with(VIRTUAL_ENV=td), clear=True):
                self.assertEqual(_resolve_child_python("project"), expected)

    def test_project_with_broken_venv_falls_back(self):
        """VIRTUAL_ENV set but bin/python missing → sys.executable."""
        import tempfile

        with tempfile.TemporaryDirectory() as td:
            # No bin/python inside — broken venv.
            # NOTE(review): presumably the resolver tries CONDA_PREFIX once
            # VIRTUAL_ENV proves unusable; stripping the ambient value keeps
            # the expected sys.executable fallback independent of the shell.
            with patch.dict(os.environ, self._env_with(VIRTUAL_ENV=td), clear=True):
                self.assertEqual(_resolve_child_python("project"), sys.executable)

    def test_project_prefers_virtualenv_over_conda(self):
        """If both VIRTUAL_ENV and CONDA_PREFIX are set, VIRTUAL_ENV wins."""
        import tempfile

        with tempfile.TemporaryDirectory() as ve_td, tempfile.TemporaryDirectory() as conda_td:
            expected = self._make_fake_env(ve_td)
            self._make_fake_env(conda_td)
            env = self._env_with(VIRTUAL_ENV=ve_td, CONDA_PREFIX=conda_td)
            with patch.dict(os.environ, env, clear=True):
                self.assertEqual(_resolve_child_python("project"), expected)

    def test_is_usable_python_rejects_nonexistent(self):
        self.assertFalse(_is_usable_python("/does/not/exist/python"))

    def test_is_usable_python_accepts_real_python(self):
        self.assertTrue(_is_usable_python(sys.executable))


# ---------------------------------------------------------------------------
# CWD resolver
# ---------------------------------------------------------------------------

class TestResolveChildCwd(unittest.TestCase):
    """_resolve_child_cwd — staging dir in strict mode, session dir in project mode."""

    # Placeholder staging path handed to the resolver in every test.
    _STAGING = "/tmp/staging"

    def test_strict_uses_staging_dir(self):
        resolved = _resolve_child_cwd("strict", self._STAGING)
        self.assertEqual(resolved, self._STAGING)

    def test_project_without_terminal_cwd_uses_getcwd(self):
        # patch.dict snapshots os.environ and restores it on exit, so dropping
        # the variable inside the block stays local to this test.
        with patch.dict(os.environ):
            os.environ.pop("TERMINAL_CWD", None)
            resolved = _resolve_child_cwd("project", self._STAGING)
            self.assertEqual(resolved, os.getcwd())

    def test_project_uses_terminal_cwd_when_set(self):
        import tempfile

        with tempfile.TemporaryDirectory() as td, patch.dict(os.environ, {"TERMINAL_CWD": td}):
            resolved = _resolve_child_cwd("project", self._STAGING)
            self.assertEqual(resolved, td)

    def test_project_bogus_terminal_cwd_falls_back_to_getcwd(self):
        bogus = {"TERMINAL_CWD": "/does/not/exist/anywhere"}
        with patch.dict(os.environ, bogus):
            resolved = _resolve_child_cwd("project", self._STAGING)
            self.assertEqual(resolved, os.getcwd())

    def test_project_expands_tilde(self):
        import pathlib

        expected_home = str(pathlib.Path.home())
        with patch.dict(os.environ, {"TERMINAL_CWD": "~"}):
            resolved = _resolve_child_cwd("project", self._STAGING)
            self.assertEqual(resolved, expected_home)


# ---------------------------------------------------------------------------
# Schema description
# ---------------------------------------------------------------------------

class TestModeAwareSchema(unittest.TestCase):
    """The execute_code tool description varies with the execution mode."""

    @staticmethod
    def _description(**schema_kwargs):
        """Build the schema with the given kwargs and return its description text."""
        return build_execute_code_schema(**schema_kwargs)["description"]

    def test_strict_description_mentions_temp_dir(self):
        self.assertIn("temp dir", self._description(mode="strict"))

    def test_project_description_mentions_session_and_venv(self):
        text = self._description(mode="project")
        for keyword in ("session", "venv"):
            self.assertIn(keyword, text)

    def test_neither_description_uses_sandbox_language(self):
        """REGRESSION GUARD for commit 39b83f34.

        Agents on local backends wrongly concluded they were sandboxed and
        declined networking tasks. The tool description must never mention
        'sandbox', 'isolated' or 'cloud' again, in any mode.
        """
        banned = ("sandbox", "isolated", "cloud")
        for mode in EXECUTION_MODES:
            desc = self._description(mode=mode).lower()
            for forbidden in banned:
                self.assertNotIn(
                    forbidden,
                    desc,
                    f"mode={mode}: '{forbidden}' leaked into description",
                )

    def test_descriptions_are_similar_length(self):
        """Both modes should have roughly the same-size description."""
        strict_len, project_len = (
            len(self._description(mode=mode)) for mode in ("strict", "project")
        )
        self.assertLess(abs(strict_len - project_len), 200)

    def test_default_mode_reads_config(self):
        """build_execute_code_schema() with mode=None reads config.yaml."""
        for mode, marker in (("strict", "temp dir"), ("project", "session")):
            with _mock_mode(mode):
                self.assertIn(marker, self._description())


# ---------------------------------------------------------------------------
# Integration: what actually happens when execute_code runs per mode
# ---------------------------------------------------------------------------

@pytest.mark.skipif(sys.platform == "win32", reason="execute_code is POSIX-only")
class TestExecuteCodeModeIntegration(unittest.TestCase):
    """End-to-end: run real child scripts and check where and how they ran."""

    # Child script shared by the two hermes_tools import regressions.
    _TERMINAL_ROUNDTRIP = (
        "from hermes_tools import terminal\n"
        "r = terminal('echo x')\n"
        "print(r.get('output', 'MISSING'))\n"
    )

    def _run(self, code, mode, enabled_tools=None, extra_env=None):
        """Execute *code* under *mode* and return the decoded JSON result.

        The config mode is pinned, *extra_env* (if any) is layered onto
        os.environ, and tool dispatch is routed to the module's mock handler.
        *enabled_tools* defaults to every entry of SANDBOX_ALLOWED_TOOLS.
        """
        overrides = extra_env if extra_env else {}
        tools = enabled_tools if enabled_tools else list(SANDBOX_ALLOWED_TOOLS)
        with _mock_mode(mode), patch.dict(os.environ, overrides), patch(
            "model_tools.handle_function_call",
            side_effect=_mock_handle_function_call,
        ):
            raw = execute_code(
                code=code,
                task_id=f"test-{mode}",
                enabled_tools=tools,
            )
        return json.loads(raw)

    def test_strict_mode_runs_in_tmpdir(self):
        """Strict mode: script's os.getcwd() is the staging tmpdir."""
        result = self._run("import os; print(os.getcwd())", mode="strict")
        self.assertEqual(result["status"], "success")
        self.assertIn("hermes_sandbox_", result["output"])

    def test_project_mode_runs_in_session_cwd(self):
        """Project mode: script's os.getcwd() is the session's working dir."""
        import tempfile

        with tempfile.TemporaryDirectory() as td:
            result = self._run(
                "import os; print(os.getcwd())",
                mode="project",
                extra_env={"TERMINAL_CWD": td},
            )
            self.assertEqual(result["status"], "success")
            # Resolve symlinks (macOS /tmp → /private/tmp) on both sides
            reported = os.path.realpath(result["output"].strip())
            self.assertEqual(reported, os.path.realpath(td))

    def test_project_mode_interpreter_is_venv_python(self):
        """Project mode: sys.executable inside the child is the venv's python
        when VIRTUAL_ENV is set to a real venv."""
        # The hermes-agent venv is always active during tests, so the child's
        # interpreter usually equals the parent's sys.executable too. The
        # check is only that the resolver chose a path under the venv (or
        # fell back to sys.executable), not that the two differ.
        result = self._run("import sys; print(sys.executable)", mode="project")
        self.assertEqual(result["status"], "success")
        output = result["output"].strip()
        ve = os.environ.get("VIRTUAL_ENV", "").strip()
        if not ve:
            # Nothing further to check without an active venv.
            return
        acceptable = output.startswith(ve) or output == sys.executable
        self.assertTrue(
            acceptable,
            f"project-mode python should be under VIRTUAL_ENV={ve} or sys.executable={sys.executable}, got {output}",
        )

    def test_project_mode_can_still_import_hermes_tools(self):
        """Regression: hermes_tools still importable from non-tmpdir CWD.

        This is the PYTHONPATH fix — without it, switching to session CWD
        breaks `from hermes_tools import terminal`.
        """
        import tempfile

        with tempfile.TemporaryDirectory() as td:
            result = self._run(
                self._TERMINAL_ROUNDTRIP,
                mode="project",
                extra_env={"TERMINAL_CWD": td},
            )
            self.assertEqual(result["status"], "success")
            self.assertIn("mock", result["output"])

    def test_strict_mode_can_still_import_hermes_tools(self):
        """Regression: strict mode's tmpdir CWD still works for imports."""
        result = self._run(self._TERMINAL_ROUNDTRIP, mode="strict")
        self.assertEqual(result["status"], "success")
        self.assertIn("mock", result["output"])


# ---------------------------------------------------------------------------
# SECURITY-CRITICAL regression guards
#
# These MUST pass in both strict and project mode. The whole tiered-mode
# proposition rests on the claim that switching from strict to project only
# changes CWD + interpreter, not the security posture.
# ---------------------------------------------------------------------------

@pytest.mark.skipif(sys.platform == "win32", reason="execute_code is POSIX-only")
class TestSecurityInvariantsAcrossModes(unittest.TestCase):
    """Env scrubbing and the tool whitelist must hold in both execution modes."""

    # Child script reporting whether non-whitelisted tools are exposed.
    _WHITELIST_PROBE = (
        "import hermes_tools as ht\n"
        "print('execute_code_available:', hasattr(ht, 'execute_code'))\n"
        "print('delegate_task_available:', hasattr(ht, 'delegate_task'))\n"
    )

    def _run(self, code, mode):
        """Execute *code* under *mode* with the full allowed tool list; return decoded JSON."""
        with _mock_mode(mode), patch(
            "model_tools.handle_function_call",
            side_effect=_mock_handle_function_call,
        ):
            raw = execute_code(
                code=code,
                task_id=f"test-sec-{mode}",
                enabled_tools=list(SANDBOX_ALLOWED_TOOLS),
            )
        return json.loads(raw)

    def _run_with_env(self, code, mode, planted):
        """Run *code* while the *planted* variables are present in os.environ."""
        with patch.dict(os.environ, planted):
            return self._run(code, mode=mode)

    def _assert_whitelist_enforced(self, mode):
        """Check that neither execute_code nor delegate_task is exposed to the child."""
        result = self._run(self._WHITELIST_PROBE, mode=mode)
        self.assertEqual(result["status"], "success")
        for tool in ("execute_code", "delegate_task"):
            self.assertIn(f"{tool}_available: False", result["output"])

    def test_api_keys_scrubbed_in_strict_mode(self):
        planted = {
            "OPENAI_API_KEY": "sk-should-not-leak",
            "ANTHROPIC_API_KEY": "ant-should-not-leak",
        }
        code = (
            "import os\n"
            "print('KEY=' + os.environ.get('OPENAI_API_KEY', 'MISSING'))\n"
            "print('TOK=' + os.environ.get('ANTHROPIC_API_KEY', 'MISSING'))\n"
        )
        result = self._run_with_env(code, "strict", planted)
        self.assertEqual(result["status"], "success")
        for needle in ("KEY=MISSING", "TOK=MISSING"):
            self.assertIn(needle, result["output"])
        for leaked in planted.values():
            self.assertNotIn(leaked, result["output"])

    def test_api_keys_scrubbed_in_project_mode(self):
        """CRITICAL: the project-mode default does NOT leak user credentials."""
        planted = {
            "OPENAI_API_KEY": "sk-should-not-leak",
            "ANTHROPIC_API_KEY": "ant-should-not-leak",
            "GITHUB_TOKEN": "ghp-should-not-leak",
        }
        code = (
            "import os\n"
            "print('KEY=' + os.environ.get('OPENAI_API_KEY', 'MISSING'))\n"
            "print('TOK=' + os.environ.get('ANTHROPIC_API_KEY', 'MISSING'))\n"
            "print('SEC=' + os.environ.get('GITHUB_TOKEN', 'MISSING'))\n"
        )
        result = self._run_with_env(code, "project", planted)
        self.assertEqual(result["status"], "success")
        for needle in ("KEY=MISSING", "TOK=MISSING", "SEC=MISSING"):
            self.assertIn(needle, result["output"])
        for leaked in planted.values():
            self.assertNotIn(leaked, result["output"])

    def test_secret_substrings_scrubbed_in_project_mode(self):
        """SECRET/PASSWORD/CREDENTIAL/PASSWD/AUTH filters still apply."""
        planted = {
            "MY_SECRET": "secret-should-not-leak",
            "DB_PASSWORD": "password-should-not-leak",
            "VAULT_CREDENTIAL": "cred-should-not-leak",
            "LDAP_PASSWD": "passwd-should-not-leak",
            "AUTH_TOKEN": "auth-should-not-leak",
        }
        code = (
            "import os\n"
            "for k in ('MY_SECRET', 'DB_PASSWORD', 'VAULT_CREDENTIAL', "
            "'LDAP_PASSWD', 'AUTH_TOKEN'):\n"
            "    print(f'{k}=' + os.environ.get(k, 'MISSING'))\n"
        )
        result = self._run_with_env(code, "project", planted)
        self.assertEqual(result["status"], "success")
        for leaked in planted.values():
            self.assertNotIn(leaked, result["output"])

    def test_tool_whitelist_enforced_in_strict_mode(self):
        """A script cannot RPC-call tools outside SANDBOX_ALLOWED_TOOLS."""
        # execute_code is NOT in SANDBOX_ALLOWED_TOOLS (no recursion)
        self.assertNotIn("execute_code", SANDBOX_ALLOWED_TOOLS)
        self._assert_whitelist_enforced("strict")

    def test_tool_whitelist_enforced_in_project_mode(self):
        """CRITICAL: project mode does NOT widen the tool whitelist."""
        self._assert_whitelist_enforced("project")


# Allow running this file directly via the stdlib unittest runner.
if __name__ == "__main__":
    unittest.main()