#!/usr/bin/env python3
"""
Tests for the code execution sandbox (programmatic tool calling).

These tests monkeypatch handle_function_call so they don't require API keys
or a running terminal backend. They verify the core sandbox mechanics:
UDS socket lifecycle, hermes_tools generation, timeout enforcement,
output capping, tool call counting, and error propagation.

Run with: python -m pytest tests/test_code_execution.py -v
    or: python tests/test_code_execution.py
"""

import json
import os
import re
import sys
import time
import threading
import unittest
from unittest.mock import patch, MagicMock

import pytest

from tools.code_execution_tool import (
    SANDBOX_ALLOWED_TOOLS,
    execute_code,
    generate_hermes_tools_module,
    check_sandbox_requirements,
    build_execute_code_schema,
    EXECUTE_CODE_SCHEMA,
    _TOOL_DOC_LINES,
)

pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")

# Extracts parameter names from a stub signature string by matching the word
# before each colon, e.g. "pattern: str, target: str = ..." -> pattern, target.
_STUB_PARAM_RE = re.compile(r'(\w+)\s*:')


def _mock_handle_function_call(function_name, function_args, task_id=None, user_task=None):
    """Mock dispatcher that returns canned responses for each tool.

    Args:
        function_name: Name of the tool being invoked.
        function_args: Dict of arguments for the tool; only ``command``
            (terminal) and ``path`` (write_file) are read.
        task_id: Accepted for signature compatibility; unused.
        user_task: Accepted for signature compatibility; unused.

    Returns:
        A JSON string with a canned payload for known tools, or a JSON
        object with an ``error`` key for any other tool name.
    """
    if function_name == "terminal":
        cmd = function_args.get("command", "")
        return json.dumps({"output": f"mock output for: {cmd}", "exit_code": 0})
    if function_name == "web_search":
        return json.dumps({"results": [{"url": "https://example.com", "title": "Example", "description": "A test result"}]})
    if function_name == "read_file":
        return json.dumps({"content": "line 1\nline 2\nline 3\n", "total_lines": 3})
    if function_name == "write_file":
        return json.dumps({"status": "ok", "path": function_args.get("path", "")})
    if function_name == "search_files":
        return json.dumps({"matches": [{"file": "test.py", "line": 1, "text": "match"}]})
    if function_name == "patch":
        return json.dumps({"status": "ok", "replacements": 1})
    if function_name == "web_extract":
        return json.dumps("# Extracted content\nSome text from the page.")
    return json.dumps({"error": f"Unknown tool in mock: {function_name}"})


class TestSandboxRequirements(unittest.TestCase):
    """Sanity checks for sandbox availability and the static tool schema."""

    def test_available_on_posix(self):
        if sys.platform != "win32":
            self.assertTrue(check_sandbox_requirements())

    def test_schema_is_valid(self):
        self.assertEqual(EXECUTE_CODE_SCHEMA["name"], "execute_code")
        self.assertIn("code", EXECUTE_CODE_SCHEMA["parameters"]["properties"])
        self.assertIn("code", EXECUTE_CODE_SCHEMA["parameters"]["required"])


class TestHermesToolsGeneration(unittest.TestCase):
    """Checks on the source text returned by generate_hermes_tools_module."""

    def test_generates_all_allowed_tools(self):
        src = generate_hermes_tools_module(list(SANDBOX_ALLOWED_TOOLS))
        for tool in SANDBOX_ALLOWED_TOOLS:
            self.assertIn(f"def {tool}(", src)

    def test_generates_subset(self):
        src = generate_hermes_tools_module(["terminal", "web_search"])
        self.assertIn("def terminal(", src)
        self.assertIn("def web_search(", src)
        self.assertNotIn("def read_file(", src)

    def test_empty_list_generates_nothing(self):
        src = generate_hermes_tools_module([])
        self.assertNotIn("def terminal(", src)
        self.assertIn("def _call(", src)  # infrastructure still present

    def test_non_allowed_tools_ignored(self):
        src = generate_hermes_tools_module(["vision_analyze", "terminal"])
        self.assertIn("def terminal(", src)
        self.assertNotIn("def vision_analyze(", src)

    def test_rpc_infrastructure_present(self):
        src = generate_hermes_tools_module(["terminal"])
        self.assertIn("HERMES_RPC_SOCKET", src)
        self.assertIn("AF_UNIX", src)
        self.assertIn("def _connect(", src)
        self.assertIn("def _call(", src)

    def test_convenience_helpers_present(self):
        """Verify json_parse, shell_quote, and retry helpers are generated."""
        src = generate_hermes_tools_module(["terminal"])
        self.assertIn("def json_parse(", src)
        self.assertIn("def shell_quote(", src)
        self.assertIn("def retry(", src)
        self.assertIn("import json, os, socket, shlex, time", src)


@unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
class TestExecuteCode(unittest.TestCase):
    """Integration tests using the mock dispatcher."""

    def _run(self, code, enabled_tools=None):
        """Run ``code`` via execute_code with handle_function_call mocked.

        Args:
            code: Python source to execute in the sandbox.
            enabled_tools: Tool names to expose; a falsy value means every
                name in SANDBOX_ALLOWED_TOOLS.

        Returns:
            The execute_code result, parsed from JSON.
        """
        # Full integration run; only the tool dispatcher is mocked, at the
        # model_tools level.
        with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
            result = execute_code(
                code=code,
                task_id="test-task",
                enabled_tools=enabled_tools or list(SANDBOX_ALLOWED_TOOLS),
            )
        return json.loads(result)

    def test_basic_print(self):
        """Script that just prints -- no tool calls."""
        result = self._run('print("hello world")')
        self.assertEqual(result["status"], "success")
        self.assertIn("hello world", result["output"])
        self.assertEqual(result["tool_calls_made"], 0)

    def test_repo_root_modules_are_importable(self):
        """Sandboxed scripts can import modules that live at the repo root."""
        result = self._run('import hermes_constants; print(hermes_constants.__file__)')
        self.assertEqual(result["status"], "success")
        self.assertIn("hermes_constants.py", result["output"])

    def test_single_tool_call(self):
        """Script calls terminal and prints the result."""
        code = """
from hermes_tools import terminal
result = terminal("echo hello")
print(result.get("output", ""))
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        self.assertIn("mock output for: echo hello", result["output"])
        self.assertEqual(result["tool_calls_made"], 1)

    def test_multi_tool_chain(self):
        """Script calls multiple tools sequentially."""
        code = """
from hermes_tools import terminal, read_file
r1 = terminal("ls")
r2 = read_file("test.py")
print(f"terminal: {r1['output'][:20]}")
print(f"file lines: {r2['total_lines']}")
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        self.assertEqual(result["tool_calls_made"], 2)

    def test_syntax_error(self):
        """Script with a syntax error returns error status."""
        result = self._run("def broken(")
        self.assertEqual(result["status"], "error")
        self.assertIn("SyntaxError", result.get("error", "") + result.get("output", ""))

    def test_runtime_exception(self):
        """Script with a runtime error returns error status."""
        result = self._run("raise ValueError('test error')")
        self.assertEqual(result["status"], "error")

    def test_excluded_tool_returns_error(self):
        """Script importing a tool that is not enabled ends with error status."""
        code = """
from hermes_tools import terminal
result = terminal("echo hi")
print(result)
"""
        # Only enable web_search -- terminal should be excluded
        result = self._run(code, enabled_tools=["web_search"])
        # terminal won't be in hermes_tools.py, so import fails
        self.assertEqual(result["status"], "error")

    def test_empty_code(self):
        """Empty code string returns an error."""
        result = json.loads(execute_code("", task_id="test"))
        self.assertIn("error", result)

    def test_output_captured(self):
        """Multiple print statements are captured in order."""
        code = """
for i in range(5):
    print(f"line {i}")
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        for i in range(5):
            self.assertIn(f"line {i}", result["output"])

    def test_stderr_on_error(self):
        """Traceback from stderr is included in the response."""
        code = """
import sys
print("before error")
raise RuntimeError("deliberate crash")
"""
        result = self._run(code)
        self.assertEqual(result["status"], "error")
        self.assertIn("before error", result["output"])
        self.assertIn("RuntimeError", result.get("error", "") + result.get("output", ""))

    def test_timeout_enforcement(self):
        """Script that sleeps too long is killed."""
        code = "import time; time.sleep(999)"
        with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
            # Override config to use a very short timeout
            with patch("tools.code_execution_tool._load_config",
                       return_value={"timeout": 2, "max_tool_calls": 50}):
                result = json.loads(execute_code(
                    code=code,
                    task_id="test-task",
                    enabled_tools=list(SANDBOX_ALLOWED_TOOLS),
                ))
        self.assertEqual(result["status"], "timeout")
        self.assertIn("timed out", result.get("error", ""))

    def test_web_search_tool(self):
        """Script calls web_search and processes results."""
        code = """
from hermes_tools import web_search
results = web_search("test query")
print(f"Found {len(results.get('results', []))} results")
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        self.assertIn("Found 1 results", result["output"])

    def test_json_parse_helper(self):
        """json_parse handles control characters that json.loads(strict=True) rejects."""
        # Raw string: the \t and \n escapes must be interpreted by the
        # sandboxed script, not by this test module.
        code = r"""
from hermes_tools import json_parse
# This JSON has a literal tab character which strict mode rejects
text = '{"body": "line1\tline2\nline3"}'
result = json_parse(text)
print(result["body"])
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        self.assertIn("line1", result["output"])

    def test_shell_quote_helper(self):
        """shell_quote properly escapes dangerous characters."""
        code = """
from hermes_tools import shell_quote
# String with backticks, quotes, and special chars
dangerous = '`rm -rf /` && $(whoami) "hello"'
escaped = shell_quote(dangerous)
print(escaped)
# Verify it's wrapped in single quotes with proper escaping
assert "rm -rf" in escaped
assert escaped.startswith("'")
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")

    def test_retry_helper_success(self):
        """retry returns on first success."""
        code = """
from hermes_tools import retry
counter = [0]
def flaky():
    counter[0] += 1
    return f"ok on attempt {counter[0]}"
result = retry(flaky)
print(result)
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        self.assertIn("ok on attempt 1", result["output"])

    def test_retry_helper_eventual_success(self):
        """retry retries on failure and succeeds eventually."""
        code = """
from hermes_tools import retry
counter = [0]
def flaky():
    counter[0] += 1
    if counter[0] < 3:
        raise ConnectionError(f"fail {counter[0]}")
    return "success"
result = retry(flaky, max_attempts=3, delay=0.01)
print(result)
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        self.assertIn("success", result["output"])

    def test_retry_helper_all_fail(self):
        """retry raises the last error when all attempts fail."""
        code = """
from hermes_tools import retry
def always_fail():
    raise ValueError("nope")
try:
    retry(always_fail, max_attempts=2, delay=0.01)
    print("should not reach here")
except ValueError as e:
    print(f"caught: {e}")
"""
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        self.assertIn("caught: nope", result["output"])


class TestStubSchemaDrift(unittest.TestCase):
    """Verify that _TOOL_STUBS in code_execution_tool.py stay in sync with
    the real tool schemas registered in tools/registry.py.

    If a tool gains a new parameter but the sandbox stub isn't updated,
    the LLM will try to use the parameter (it sees it in the system prompt)
    and get a TypeError. This test catches that drift.
    """

    # Parameters that are internal (injected by the handler, not user-facing)
    _INTERNAL_PARAMS = {"task_id", "user_task"}
    # Parameters intentionally blocked in the sandbox
    _BLOCKED_TERMINAL_PARAMS = {"background", "check_interval", "pty"}

    def test_stubs_cover_all_schema_params(self):
        """Every user-facing parameter in the real schema must appear in the
        corresponding _TOOL_STUBS entry."""
        from tools.code_execution_tool import _TOOL_STUBS

        # Import the registry and trigger tool registration
        from tools.registry import registry
        import tools.file_tools  # noqa: F401 - registers read_file, write_file, patch, search_files
        import tools.web_tools  # noqa: F401 - registers web_search, web_extract

        for tool_name, (_, sig, _, _) in _TOOL_STUBS.items():
            entry = registry._tools.get(tool_name)
            if not entry:
                # Tool might not be registered yet (e.g., terminal uses a
                # different registration path). Skip gracefully.
                continue

            schema_props = entry.schema.get("parameters", {}).get("properties", {})
            schema_params = set(schema_props.keys()) - self._INTERNAL_PARAMS
            if tool_name == "terminal":
                schema_params -= self._BLOCKED_TERMINAL_PARAMS

            # Extract parameter names from the stub signature string
            stub_params = set(_STUB_PARAM_RE.findall(sig))

            missing = schema_params - stub_params
            self.assertEqual(
                missing, set(),
                f"Stub for '{tool_name}' is missing parameters that exist in "
                f"the real schema: {missing}. Update _TOOL_STUBS in "
                f"code_execution_tool.py to include them."
            )

    def test_stubs_pass_all_params_to_rpc(self):
        """The args_dict_expr in each stub must include every parameter from
        the signature, so that all params are actually sent over RPC."""
        from tools.code_execution_tool import _TOOL_STUBS

        for tool_name, (_, sig, _, args_expr) in _TOOL_STUBS.items():
            stub_params = set(_STUB_PARAM_RE.findall(sig))
            # Check that each param name appears in the args dict expression
            for param in stub_params:
                self.assertIn(
                    f'"{param}"',
                    args_expr,
                    f"Stub for '{tool_name}' has parameter '{param}' in its "
                    f"signature but doesn't pass it in the args dict: {args_expr}"
                )

    def test_search_files_target_uses_current_values(self):
        """search_files stub should use 'content'/'files', not old 'grep'/'find'."""
        from tools.code_execution_tool import _TOOL_STUBS

        _, sig, doc, _ = _TOOL_STUBS["search_files"]
        self.assertIn('"content"', sig,
                      "search_files stub should default target to 'content', not 'grep'")
        self.assertNotIn('"grep"', sig,
                         "search_files stub still uses obsolete 'grep' target value")
        self.assertNotIn('"find"', doc,
                         "search_files stub docstring still uses obsolete 'find' target value")

    def test_generated_module_accepts_all_params(self):
        """The generated hermes_tools.py module should accept all current params
        without TypeError when called with keyword arguments."""
        src = generate_hermes_tools_module(list(SANDBOX_ALLOWED_TOOLS))

        # Compile the generated module to check for syntax errors
        compile(src, "hermes_tools.py", "exec")

        # Verify specific parameter signatures are in the source
        # search_files must accept context, offset, output_mode
        self.assertIn("context", src)
        self.assertIn("offset", src)
        self.assertIn("output_mode", src)

        # patch must accept mode and patch params
        self.assertIn("mode", src)


# ---------------------------------------------------------------------------
# build_execute_code_schema
# ---------------------------------------------------------------------------

class TestBuildExecuteCodeSchema(unittest.TestCase):
    """Tests for build_execute_code_schema — the dynamic schema generator."""

    def test_default_includes_all_tools(self):
        schema = build_execute_code_schema()
        desc = schema["description"]
        for name, _ in _TOOL_DOC_LINES:
            self.assertIn(name, desc, f"Default schema should mention '{name}'")

    def test_schema_structure(self):
        schema = build_execute_code_schema()
        self.assertEqual(schema["name"], "execute_code")
        self.assertIn("parameters", schema)
        self.assertIn("code", schema["parameters"]["properties"])
        self.assertEqual(schema["parameters"]["required"], ["code"])

    def test_subset_only_lists_enabled_tools(self):
        enabled = {"terminal", "read_file"}
        schema = build_execute_code_schema(enabled)
        desc = schema["description"]
        self.assertIn("terminal(", desc)
        self.assertIn("read_file(", desc)
        self.assertNotIn("web_search(", desc)
        self.assertNotIn("web_extract(", desc)
        self.assertNotIn("write_file(", desc)

    def test_single_tool(self):
        schema = build_execute_code_schema({"terminal"})
        desc = schema["description"]
        self.assertIn("terminal(", desc)
        self.assertNotIn("web_search(", desc)

    def test_import_examples_prefer_web_search_and_terminal(self):
        enabled = {"web_search", "terminal", "read_file"}
        schema = build_execute_code_schema(enabled)
        code_desc = schema["parameters"]["properties"]["code"]["description"]
        self.assertIn("web_search", code_desc)
        self.assertIn("terminal", code_desc)

    def test_import_examples_fallback_when_no_preferred(self):
        """When neither web_search nor terminal are enabled, falls back to
        sorted first two tools."""
        enabled = {"read_file", "write_file", "patch"}
        schema = build_execute_code_schema(enabled)
        code_desc = schema["parameters"]["properties"]["code"]["description"]
        # Should use sorted first 2: patch, read_file
        self.assertIn("patch", code_desc)
        self.assertIn("read_file", code_desc)

    def test_empty_set_produces_valid_description(self):
        """build_execute_code_schema(set()) must not produce 'import , ...'
        in the code property description."""
        schema = build_execute_code_schema(set())
        code_desc = schema["parameters"]["properties"]["code"]["description"]
        self.assertNotIn("import , ...", code_desc,
                         "Empty enabled set produces broken import syntax in description")

    def test_real_scenario_all_sandbox_tools_disabled(self):
        """Reproduce the exact code path from model_tools.py:231-234.

        Scenario: user runs `hermes tools code_execution` (only code_execution
        toolset enabled). tools_to_include = {"execute_code"}.

        model_tools.py does:
            sandbox_enabled = SANDBOX_ALLOWED_TOOLS & tools_to_include
            dynamic_schema = build_execute_code_schema(sandbox_enabled)

        SANDBOX_ALLOWED_TOOLS = {web_search, web_extract, read_file, write_file,
                                 search_files, patch, terminal}
        tools_to_include = {"execute_code"}
        intersection = empty set
        """
        # Simulate model_tools.py:233
        tools_to_include = {"execute_code"}
        sandbox_enabled = SANDBOX_ALLOWED_TOOLS & tools_to_include

        self.assertEqual(sandbox_enabled, set(),
                         "Intersection should be empty when only execute_code is enabled")

        schema = build_execute_code_schema(sandbox_enabled)
        code_desc = schema["parameters"]["properties"]["code"]["description"]
        self.assertNotIn("import , ...", code_desc,
                         "Bug: broken import syntax sent to the model")

    def test_real_scenario_only_vision_enabled(self):
        """Another real path: user runs `hermes tools code_execution,vision`.

        tools_to_include = {"execute_code", "vision_analyze"}
        SANDBOX_ALLOWED_TOOLS has neither, so intersection is empty.
        """
        tools_to_include = {"execute_code", "vision_analyze"}
        sandbox_enabled = SANDBOX_ALLOWED_TOOLS & tools_to_include

        self.assertEqual(sandbox_enabled, set())

        schema = build_execute_code_schema(sandbox_enabled)
        code_desc = schema["parameters"]["properties"]["code"]["description"]
        self.assertNotIn("import , ...", code_desc)

    def test_description_mentions_limits(self):
        schema = build_execute_code_schema()
        desc = schema["description"]
        self.assertIn("5-minute timeout", desc)
        self.assertIn("50KB", desc)
        self.assertIn("50 tool calls", desc)

    def test_description_mentions_helpers(self):
        schema = build_execute_code_schema()
        desc = schema["description"]
        self.assertIn("json_parse", desc)
        self.assertIn("shell_quote", desc)
        self.assertIn("retry", desc)

    def test_none_defaults_to_all_tools(self):
        schema_none = build_execute_code_schema(None)
        schema_all = build_execute_code_schema(SANDBOX_ALLOWED_TOOLS)
        self.assertEqual(schema_none["description"], schema_all["description"])


# ---------------------------------------------------------------------------
# Environment variable filtering (security critical)
# ---------------------------------------------------------------------------

@unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
class TestEnvVarFiltering(unittest.TestCase):
    """Verify that execute_code filters environment variables correctly.

    The child process should NOT receive API keys, tokens, or secrets.
    It should receive safe vars like PATH, HOME, LANG, etc.
    """

    def _get_child_env(self, extra_env=None):
        """Run a script that dumps its environment and return the env dict.

        Args:
            extra_env: Optional mapping of variables added to os.environ for
                the duration of the run only.

        Returns:
            The child process environment as a dict, decoded from the JSON
            the script prints.
        """
        code = (
            "import os, json\n"
            "print(json.dumps(dict(os.environ)))\n"
        )
        # patch.dict snapshots os.environ on entry and restores it on exit,
        # including when execute_code raises.
        with patch.dict(os.environ, extra_env or {}), \
             patch("model_tools.handle_function_call", return_value='{}'), \
             patch("tools.code_execution_tool._load_config",
                   return_value={"timeout": 10, "max_tool_calls": 50}):
            raw = execute_code(code, task_id="test-env",
                               enabled_tools=list(SANDBOX_ALLOWED_TOOLS))

        result = json.loads(raw)
        self.assertEqual(result["status"], "success", result.get("error", ""))
        return json.loads(result["output"].strip())

    def test_api_keys_excluded(self):
        child_env = self._get_child_env({
            "OPENAI_API_KEY": "sk-secret123",
            "ANTHROPIC_API_KEY": "sk-ant-secret",
            "FIRECRAWL_API_KEY": "fc-secret",
        })
        self.assertNotIn("OPENAI_API_KEY", child_env)
        self.assertNotIn("ANTHROPIC_API_KEY", child_env)
        self.assertNotIn("FIRECRAWL_API_KEY", child_env)

    def test_tokens_excluded(self):
        child_env = self._get_child_env({
            "GITHUB_TOKEN": "ghp_secret",
            "MODAL_TOKEN_ID": "tok-123",
            "MODAL_TOKEN_SECRET": "tok-sec",
        })
        self.assertNotIn("GITHUB_TOKEN", child_env)
        self.assertNotIn("MODAL_TOKEN_ID", child_env)
        self.assertNotIn("MODAL_TOKEN_SECRET", child_env)

    def test_password_vars_excluded(self):
        child_env = self._get_child_env({
            "DB_PASSWORD": "hunter2",
            "MY_PASSWD": "secret",
            "AUTH_CREDENTIAL": "cred",
        })
        self.assertNotIn("DB_PASSWORD", child_env)
        self.assertNotIn("MY_PASSWD", child_env)
        self.assertNotIn("AUTH_CREDENTIAL", child_env)

    def test_path_included(self):
        child_env = self._get_child_env()
        self.assertIn("PATH", child_env)

    def test_home_included(self):
        child_env = self._get_child_env()
        self.assertIn("HOME", child_env)

    def test_hermes_rpc_socket_injected(self):
        child_env = self._get_child_env()
        self.assertIn("HERMES_RPC_SOCKET", child_env)

    def test_pythondontwritebytecode_set(self):
        child_env = self._get_child_env()
        self.assertEqual(child_env.get("PYTHONDONTWRITEBYTECODE"), "1")

    def test_timezone_injected_when_set(self):
        with patch.dict(os.environ, {"HERMES_TIMEZONE": "America/New_York"}):
            child_env = self._get_child_env()
        self.assertEqual(child_env.get("TZ"), "America/New_York")

    def test_timezone_not_set_when_empty(self):
        # patch.dict with no values only snapshots/restores os.environ, so
        # the pop below is undone when the block exits.
        with patch.dict(os.environ):
            os.environ.pop("HERMES_TIMEZONE", None)
            child_env = self._get_child_env()
        if "TZ" in child_env:
            self.assertNotEqual(child_env["TZ"], "")


# ---------------------------------------------------------------------------
# execute_code edge cases
# ---------------------------------------------------------------------------

class TestExecuteCodeEdgeCases(unittest.TestCase):
    """Edge cases around execute_code input validation and tool selection."""

    def test_windows_returns_error(self):
        """On Windows (or when SANDBOX_AVAILABLE is False), returns error JSON."""
        with patch("tools.code_execution_tool.SANDBOX_AVAILABLE", False):
            result = json.loads(execute_code("print('hi')", task_id="test"))
        self.assertIn("error", result)
        self.assertIn("Windows", result["error"])

    def test_whitespace_only_code(self):
        result = json.loads(execute_code(" \n\t ", task_id="test"))
        self.assertIn("error", result)
        self.assertIn("No code", result["error"])

    @unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
    def test_none_enabled_tools_uses_all(self):
        """When enabled_tools is None, all sandbox tools should be available."""
        code = (
            "from hermes_tools import terminal, web_search, read_file\n"
            "print('all imports ok')\n"
        )
        with patch("model_tools.handle_function_call",
                   return_value=json.dumps({"ok": True})):
            result = json.loads(execute_code(code, task_id="test-none",
                                             enabled_tools=None))
        self.assertEqual(result["status"], "success")
        self.assertIn("all imports ok", result["output"])

    @unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
    def test_empty_enabled_tools_uses_all(self):
        """When enabled_tools is [] (empty), all sandbox tools should be available."""
        code = (
            "from hermes_tools import terminal, web_search\n"
            "print('imports ok')\n"
        )
        with patch("model_tools.handle_function_call",
                   return_value=json.dumps({"ok": True})):
            result = json.loads(execute_code(code, task_id="test-empty",
                                             enabled_tools=[]))
        self.assertEqual(result["status"], "success")
        self.assertIn("imports ok", result["output"])

    @unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
    def test_nonoverlapping_tools_fallback(self):
        """When enabled_tools has no overlap with SANDBOX_ALLOWED_TOOLS,
        should fall back to all allowed tools."""
        code = (
            "from hermes_tools import terminal\n"
            "print('fallback ok')\n"
        )
        with patch("model_tools.handle_function_call",
                   return_value=json.dumps({"ok": True})):
            result = json.loads(execute_code(
                code, task_id="test-nonoverlap",
                enabled_tools=["vision_analyze", "browser_snapshot"],
            ))
        self.assertEqual(result["status"], "success")
        self.assertIn("fallback ok", result["output"])


# ---------------------------------------------------------------------------
# _load_config
# ---------------------------------------------------------------------------

class TestLoadConfig(unittest.TestCase):
    """Checks that _load_config returns a dict with and without a cli module."""

    def test_returns_empty_dict_when_cli_config_unavailable(self):
        from tools.code_execution_tool import _load_config
        # A None entry in sys.modules makes `import cli` raise ImportError.
        with patch.dict("sys.modules", {"cli": None}):
            result = _load_config()
        self.assertIsInstance(result, dict)

    def test_returns_code_execution_section(self):
        from tools.code_execution_tool import _load_config
        mock_cli = MagicMock()
        mock_cli.CLI_CONFIG = {"code_execution": {"timeout": 120, "max_tool_calls": 10}}
        with patch.dict("sys.modules", {"cli": mock_cli}):
            result = _load_config()
        self.assertIsInstance(result, dict)


# ---------------------------------------------------------------------------
# Interrupt event
# ---------------------------------------------------------------------------

@unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
class TestInterruptHandling(unittest.TestCase):
    """Checks that setting the terminal interrupt event aborts a running script."""

    def test_interrupt_event_stops_execution(self):
        """When _interrupt_event is set, execute_code should stop the script."""
        # Imported here rather than inside the helper thread so an import
        # failure fails the test directly instead of dying in a daemon thread.
        from tools.terminal_tool import _interrupt_event

        code = "import time; time.sleep(60); print('should not reach')"

        def set_interrupt_after_delay():
            time.sleep(1)
            _interrupt_event.set()

        t = threading.Thread(target=set_interrupt_after_delay, daemon=True)
        t.start()

        try:
            with patch("model_tools.handle_function_call",
                       return_value=json.dumps({"ok": True})), \
                 patch("tools.code_execution_tool._load_config",
                       return_value={"timeout": 30, "max_tool_calls": 50}):
                result = json.loads(execute_code(
                    code, task_id="test-interrupt",
                    enabled_tools=list(SANDBOX_ALLOWED_TOOLS),
                ))
            self.assertEqual(result["status"], "interrupted")
            self.assertIn("interrupted", result["output"])
        finally:
            # Join before clearing: if the helper has not fired yet, clearing
            # first would let it set the event again after cleanup.
            t.join(timeout=3)
            _interrupt_event.clear()


@unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
class TestHeadTailTruncation(unittest.TestCase):
    """Tests for head+tail truncation of large stdout in execute_code."""

    def _run(self, code):
        """Run ``code`` via execute_code with the mock dispatcher; return parsed JSON."""
        with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
            result = execute_code(
                code=code,
                task_id="test-task",
                enabled_tools=list(SANDBOX_ALLOWED_TOOLS),
            )
        return json.loads(result)

    def test_short_output_not_truncated(self):
        """Output under MAX_STDOUT_BYTES should not be truncated."""
        result = self._run('print("small output")')
        self.assertEqual(result["status"], "success")
        self.assertIn("small output", result["output"])
        self.assertNotIn("TRUNCATED", result["output"])

    def test_large_output_preserves_head_and_tail(self):
        """Output exceeding MAX_STDOUT_BYTES keeps both head and tail."""
        code = '''
# Print HEAD marker, then filler, then TAIL marker
print("HEAD_MARKER_START")
for i in range(15000):
    print(f"filler_line_{i:06d}_padding_to_fill_buffer")
print("TAIL_MARKER_END")
'''
        result = self._run(code)
        self.assertEqual(result["status"], "success")
        output = result["output"]
        # Head should be preserved
        self.assertIn("HEAD_MARKER_START", output)
        # Tail should be preserved (this is the key improvement)
        self.assertIn("TAIL_MARKER_END", output)
        # Truncation notice should be present
        self.assertIn("TRUNCATED", output)

    def test_truncation_notice_format(self):
        """Truncation notice includes character counts."""
        code = '''
for i in range(15000):
    print(f"padding_line_{i:06d}_xxxxxxxxxxxxxxxxxxxxxxxxxx")
'''
        result = self._run(code)
        output = result["output"]
        if "TRUNCATED" in output:
            self.assertIn("chars omitted", output)
            self.assertIn("total", output)


if __name__ == "__main__":
    unittest.main()