Spaces:
Running
Running
| """ | |
| Extended tests for tools: CodeInterpreterTool, FileSearchTool, ShellTool extras, | |
| and FunctionTool edge cases. | |
| """ | |
| import tempfile | |
| from pathlib import Path | |
| import pytest | |
| from tools.code_interpreter import CodeInterpreterTool | |
| from tools.file_search import FileSearchTool | |
| # βββββββββββββββββββββββββββ CodeInterpreterTool ββββββββββββββββββββββββββββββ | |
| class TestCodeInterpreterToolInit: | |
| def test_default_init(self): | |
| tool = CodeInterpreterTool() | |
| assert tool._timeout == 30 | |
| assert tool._max_output_size == 8192 | |
| assert tool._safe_mode is True | |
| def test_custom_init(self): | |
| tool = CodeInterpreterTool(timeout=10, max_output_size=1024, safe_mode=False) | |
| assert tool._timeout == 10 | |
| assert tool._safe_mode is False | |
| def test_name_and_description(self): | |
| tool = CodeInterpreterTool() | |
| assert tool.name == "code_interpreter" | |
| assert "Python" in tool.description or "code" in tool.description.lower() | |
| def test_parameters_schema(self): | |
| tool = CodeInterpreterTool() | |
| schema = tool.parameters_schema | |
| assert schema["type"] == "object" | |
| assert "code" in schema["properties"] | |
| class TestCodeInterpreterExecution: | |
| def setup_method(self): | |
| self.tool = CodeInterpreterTool(timeout=5) | |
| def test_simple_print(self): | |
| result = self.tool.execute(code="print('hello world')") | |
| assert result.success is True | |
| assert "hello world" in result.output | |
| def test_arithmetic(self): | |
| result = self.tool.execute(code="print(2 + 2)") | |
| assert result.success is True | |
| assert "4" in result.output | |
| def test_expression_eval(self): | |
| result = self.tool.execute(code="2 + 2") | |
| assert result.success is True | |
| def test_multiline_code(self): | |
| code = """ | |
| x = 10 | |
| y = 20 | |
| print(x + y) | |
| """ | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "30" in result.output | |
| def test_loop(self): | |
| code = "for i in range(3): print(i)" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "0" in result.output | |
| assert "2" in result.output | |
| def test_function_definition_and_call(self): | |
| code = """ | |
| def add(a, b): | |
| return a + b | |
| print(add(3, 4)) | |
| """ | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "7" in result.output | |
| def test_math_module(self): | |
| # math is pre-loaded in safe globals, no import needed | |
| code = "print(math.floor(3.7))" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "3" in result.output | |
| def test_json_module(self): | |
| # json is pre-loaded in safe globals | |
| code = "d = {'a': 1}; print(json.dumps(d))" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert '"a"' in result.output | |
| def test_empty_code(self): | |
| result = self.tool.execute(code="") | |
| assert result.success is False | |
| assert result.error is not None | |
| def test_no_code_kwarg(self): | |
| result = self.tool.execute() | |
| assert result.success is False | |
| def test_syntax_error(self): | |
| result = self.tool.execute(code="def broken(:") | |
| assert result.success is False | |
| assert result.error is not None | |
| def test_name_error(self): | |
| result = self.tool.execute(code="print(undefined_variable)") | |
| assert result.success is False | |
| def test_zero_division(self): | |
| result = self.tool.execute(code="print(1 / 0)") | |
| assert result.success is False | |
| assert "ZeroDivision" in str(result.error) | |
| def test_output_truncation(self): | |
| tool = CodeInterpreterTool(max_output_size=50) | |
| code = "print('x' * 1000)" | |
| result = tool.execute(code=code) | |
| assert result.success is True | |
| assert "truncated" in result.output or len(result.output) <= 100 | |
| def test_stderr_captured(self): | |
| # In unsafe mode sys is available | |
| tool = CodeInterpreterTool(safe_mode=False) | |
| code = "import sys; sys.stderr.write('error message\\n')" | |
| result = tool.execute(code=code) | |
| # stderr is captured and may be appended to output | |
| assert result.success is True | |
| def test_stderr_with_exception_includes_stderr_in_error(self): | |
| """Line 252: error_msg includes stderr output when an exception occurs after stderr write.""" | |
| tool = CodeInterpreterTool(safe_mode=False) | |
| # Write to stderr then raise an exception | |
| code = "import sys; sys.stderr.write('stderr content'); raise ValueError('test error')" | |
| result = tool.execute(code=code) | |
| assert result.success is False | |
| # The error message should contain stderr content | |
| assert result.error is not None | |
| def test_no_output_returns_placeholder(self): | |
| result = self.tool.execute(code="x = 42") | |
| assert result.success is True | |
| assert result.output is not None | |
| def test_statistics_module(self): | |
| # statistics is pre-loaded in safe globals | |
| code = "print(statistics.mean([1, 2, 3, 4, 5]))" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "3" in result.output | |
| def test_unsafe_mode(self): | |
| """In unsafe mode, more builtins are available.""" | |
| tool = CodeInterpreterTool(safe_mode=False) | |
| result = tool.execute(code="print(len([1, 2, 3]))") | |
| assert result.success is True | |
| assert "3" in result.output | |
| def test_safe_builtins_available(self): | |
| """Common builtins should be available in safe mode.""" | |
| code = "print(sorted([3, 1, 2]))" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| def test_list_comprehension(self): | |
| code = "result = [x**2 for x in range(5)]; print(result)" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "16" in result.output | |
| def test_exception_in_code(self): | |
| code = "raise ValueError('test error')" | |
| result = self.tool.execute(code=code) | |
| assert result.success is False | |
| assert "ValueError" in str(result.error) | |
| def test_itertools_available(self): | |
| # itertools is pre-loaded in safe globals | |
| code = "pairs = list(itertools.combinations([1,2,3], 2)); print(len(pairs))" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "3" in result.output | |
| def test_datetime_available(self): | |
| # datetime is pre-loaded in safe globals | |
| code = "print(datetime.datetime(2024, 1, 1).year)" | |
| result = self.tool.execute(code=code) | |
| assert result.success is True | |
| assert "2024" in result.output | |
| # βββββββββββββββββββββββββββ FileSearchTool βββββββββββββββββββββββββββββββββββ | |
| def file_tree(tmp_path): | |
| """Create a small directory tree for testing.""" | |
| (tmp_path / "docs").mkdir() | |
| (tmp_path / "src").mkdir() | |
| (tmp_path / "src" / "sub").mkdir() | |
| (tmp_path / "README.md").write_text("# Project\nThis is a project.", encoding="utf-8") | |
| (tmp_path / "docs" / "guide.md").write_text("# Guide\nSome guide text.", encoding="utf-8") | |
| (tmp_path / "src" / "main.py").write_text("def main():\n print('hello')\n", encoding="utf-8") | |
| (tmp_path / "src" / "utils.py").write_text("def helper():\n return 42\n", encoding="utf-8") | |
| (tmp_path / "src" / "sub" / "deep.py").write_text("x = 1\n", encoding="utf-8") | |
| (tmp_path / ".hidden").write_text("hidden file", encoding="utf-8") | |
| return tmp_path | |
| class TestFileSearchToolInit: | |
| def test_default_base_dir(self): | |
| with tempfile.TemporaryDirectory() as tmp: | |
| tool = FileSearchTool(base_directory=tmp) | |
| assert tool._base_directory.exists() | |
| def test_name_and_description(self): | |
| tool = FileSearchTool() | |
| assert tool.name == "file_search" | |
| assert "search" in tool.description.lower() or "file" in tool.description.lower() | |
| def test_parameters_schema(self): | |
| tool = FileSearchTool() | |
| schema = tool.parameters_schema | |
| assert schema["type"] == "object" | |
| assert "pattern" in schema["properties"] | |
| def test_allowed_extensions(self): | |
| tool = FileSearchTool(allowed_extensions=[".py", ".md"]) | |
| assert tool._allowed_extensions is not None | |
| assert ".py" in tool._allowed_extensions | |
| assert ".txt" not in tool._allowed_extensions | |
| class TestFileSearchToolFindFiles: | |
| def test_find_all_files(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*") | |
| assert result.success is True | |
| assert "README.md" in result.output or "guide.md" in result.output | |
| def test_find_py_files(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.py") | |
| assert result.success is True | |
| assert "main.py" in result.output | |
| def test_find_md_files(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.md") | |
| assert result.success is True | |
| assert "README.md" in result.output | |
| def test_no_files_found(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.xyz") | |
| assert result.success is True | |
| assert "No files found" in result.output | |
| def test_max_results_limit(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree, max_results=2) | |
| result = tool.execute(pattern="*.py") | |
| assert result.success is True | |
| def test_allowed_extensions_filter(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree, allowed_extensions=[".md"]) | |
| result = tool.execute(pattern="*") | |
| assert result.success is True | |
| # Only .md files should appear | |
| assert "main.py" not in result.output | |
| assert "README.md" in result.output | |
| def test_hidden_files_excluded(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*") | |
| assert result.success is True | |
| assert ".hidden" not in result.output | |
| def test_search_in_subdirectory(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.py", directory="src") | |
| assert result.success is True | |
| assert "main.py" in result.output | |
| def test_search_invalid_subdirectory(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*", directory="nonexistent_dir") | |
| assert result.success is False | |
| def test_search_outside_base_dir_rejected(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*", directory="../..") | |
| assert result.success is False | |
| class TestFileSearchToolContentSearch: | |
| def test_find_by_content(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.py", query="def main") | |
| assert result.success is True | |
| assert "main.py" in result.output | |
| def test_find_by_content_no_match(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.py", query="zzz_nonexistent_string_zzz") | |
| assert result.success is True | |
| assert "No matches" in result.output | |
| def test_find_by_regex(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.py", query=r"def \w+", regex=True) | |
| assert result.success is True | |
| def test_invalid_regex(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.py", query="[invalid(regex", regex=True) | |
| assert result.success is False | |
| assert result.error is not None | |
| assert "Invalid regex" in result.error | |
| def test_content_search_case_insensitive(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="*.md", query="PROJECT") | |
| assert result.success is True | |
| # README.md contains "Project" β case-insensitive match | |
| assert "README.md" in result.output | |
| def test_many_matches_limited(self, file_tree): | |
| """Test that total_matches limit works.""" | |
| # Create file with many matches | |
| big_file = file_tree / "big.py" | |
| big_file.write_text("x = 1\n" * 200, encoding="utf-8") | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(pattern="big.py", query="x = 1") | |
| assert result.success is True | |
| class TestFileSearchToolReadFile: | |
| def test_read_existing_file(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(read_file="README.md") | |
| assert result.success is True | |
| assert "Project" in result.output | |
| def test_read_nonexistent_file(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(read_file="nonexistent.txt") | |
| assert result.success is False | |
| def test_read_large_file_truncated(self, file_tree): | |
| large_file = file_tree / "large.txt" | |
| large_file.write_text("x" * 20000, encoding="utf-8") | |
| tool = FileSearchTool(base_directory=file_tree, max_read_size=100) | |
| result = tool.execute(read_file="large.txt") | |
| assert result.success is True | |
| assert "truncated" in result.output | |
| def test_read_file_outside_base_rejected(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(read_file="/etc/passwd") | |
| assert result.success is False | |
| def test_read_directory_fails(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(read_file="src") | |
| assert result.success is False | |
| def test_read_nested_file(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| result = tool.execute(read_file="src/main.py") | |
| assert result.success is True | |
| assert "main" in result.output | |
| class TestFileSearchToolPathSafety: | |
| def test_is_path_safe_inside(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| safe_path = file_tree / "README.md" | |
| assert tool._is_path_safe(safe_path) is True | |
| def test_is_path_safe_outside(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) | |
| outside = Path("/nonexistent_outside_dir/outside.txt") | |
| # Path outside file_tree - just check it returns bool | |
| result = tool._is_path_safe(outside) | |
| assert isinstance(result, bool) | |
| def test_extension_allowed_all(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree) # no allowed_extensions | |
| assert tool._is_extension_allowed(Path("file.xyz")) is True | |
| def test_extension_allowed_filtered(self, file_tree): | |
| tool = FileSearchTool(base_directory=file_tree, allowed_extensions=[".py"]) | |
| assert tool._is_extension_allowed(Path("file.py")) is True | |
| assert tool._is_extension_allowed(Path("file.md")) is False | |
| class TestFileSearchDepthLimit: | |
| def test_max_depth_zero(self, file_tree): | |
| """With max_depth=0, should only search top-level.""" | |
| tool = FileSearchTool(base_directory=file_tree, max_depth=0) | |
| result = tool.execute(pattern="*.py") | |
| assert result.success is True | |
| # No .py files at top level | |
| assert "main.py" not in result.output or "No files" in result.output | |
| class TestFileSearchToolMissingCoverage: | |
| """Tests to cover remaining missing lines in file_search.py.""" | |
| def test_is_path_safe_oserror(self, file_tree): | |
| """Cover lines 120-121: OSError during resolve β return False.""" | |
| from unittest.mock import MagicMock | |
| tool = FileSearchTool(base_directory=file_tree) | |
| # Mock a path that raises OSError on resolve | |
| mock_path = MagicMock(spec=Path) | |
| mock_path.resolve.side_effect = OSError("resolve failed") | |
| result = tool._is_path_safe(mock_path) | |
| assert result is False | |
| def test_read_file_permission_error(self, tmp_path): | |
| """Cover lines 169-174: PermissionError reading file.""" | |
| from pathlib import Path as _Path | |
| from unittest.mock import patch | |
| base = tmp_path | |
| test_file = base / "secret.txt" | |
| test_file.write_text("secret content", encoding="utf-8") | |
| tool = FileSearchTool(base_directory=base) | |
| with patch.object(_Path, "open", side_effect=PermissionError("no access")): | |
| result = tool._read_file_content(test_file) | |
| assert result.success is False | |
| assert result.error is not None | |
| assert "Permission denied" in result.error or "no access" in result.error | |
| def test_read_file_oserror(self, tmp_path): | |
| """Cover lines 175-180: OSError reading file.""" | |
| from pathlib import Path as _Path | |
| from unittest.mock import patch | |
| base = tmp_path | |
| test_file = base / "broken.txt" | |
| test_file.write_text("content", encoding="utf-8") | |
| tool = FileSearchTool(base_directory=base) | |
| with patch.object(_Path, "open", side_effect=OSError("disk error")): | |
| result = tool._read_file_content(test_file) | |
| assert result.success is False | |
| assert result.error is not None | |
| assert "Error reading file" in result.error | |
| def test_find_files_unsafe_path_skipped(self, tmp_path): | |
| """Cover line 200: unsafe paths are skipped during find.""" | |
| base = tmp_path | |
| (base / "file.py").write_text("x = 1", encoding="utf-8") | |
| tool = FileSearchTool(base_directory=base) | |
| from unittest.mock import patch | |
| # Make _is_path_safe always return False so files get skipped | |
| with patch.object(tool, "_is_path_safe", return_value=False): | |
| files = tool._find_files("*.py", base) | |
| # Files should be skipped since _is_path_safe returned False for them | |
| assert files == [] | |
| def test_find_files_permission_error(self, tmp_path): | |
| """Cover lines 213-214: PermissionError in find_files is swallowed.""" | |
| base = tmp_path | |
| tool = FileSearchTool(base_directory=base) | |
| from unittest.mock import patch | |
| with patch.object(Path, "iterdir", side_effect=PermissionError("no access")): | |
| result = tool._find_files("*.py", base) | |
| # PermissionError is caught β empty list returned | |
| assert result == [] | |
| def test_find_files_oserror(self, tmp_path): | |
| """Cover lines 215-216: OSError in find_files is swallowed.""" | |
| base = tmp_path | |
| tool = FileSearchTool(base_directory=base) | |
| from unittest.mock import patch | |
| with patch.object(Path, "iterdir", side_effect=OSError("disk error")): | |
| result = tool._find_files("*.py", base) | |
| assert result == [] | |
| def test_search_in_file_too_large(self, tmp_path): | |
| """Cover line 225: file too large β return empty matches.""" | |
| base = tmp_path | |
| test_file = base / "huge.py" | |
| test_file.write_text("x = 1\n" * 10, encoding="utf-8") | |
| tool = FileSearchTool(base_directory=base, max_file_size=5) # very small limit | |
| matches = tool._search_in_file(test_file, "x = 1", use_regex=False) | |
| assert matches == [] | |
| def test_search_in_file_oserror(self, tmp_path): | |
| """Cover lines 238-239: OSError during file read in _search_in_file.""" | |
| from pathlib import Path as _Path | |
| from unittest.mock import patch | |
| base = tmp_path | |
| test_file = base / "test.py" | |
| test_file.write_text("x = 1\n", encoding="utf-8") | |
| tool = FileSearchTool(base_directory=base) | |
| with patch.object(_Path, "open", side_effect=OSError("disk error")): | |
| matches = tool._search_in_file(test_file, "x", use_regex=False) | |
| assert matches == [] | |
| def test_content_search_total_matches_limit(self, tmp_path): | |
| """Cover lines 329-331: total matches limit reached.""" | |
| from tools.file_search import MAX_MATCHES_PER_FILE, MAX_TOTAL_MATCHES | |
| base = tmp_path | |
| # Each file can contribute at most MAX_MATCHES_PER_FILE (100) matches | |
| # We need at least MAX_TOTAL_MATCHES // MAX_MATCHES_PER_FILE + 1 files | |
| num_files = MAX_TOTAL_MATCHES // MAX_MATCHES_PER_FILE + 2 | |
| for i in range(num_files): | |
| f = base / f"file{i:02d}.txt" | |
| f.write_text("match\n" * MAX_MATCHES_PER_FILE, encoding="utf-8") | |
| tool = FileSearchTool(base_directory=base) | |
| result = tool.execute(pattern="*.txt", query="match") | |
| assert result.success is True | |
| assert "search limited to" in result.output | |
| # βββββββββββββββββββββββββββ ShellTool Unix path βββββββββββββββββββββββββββββ | |
| class TestShellToolUnixPath: | |
| def test_execute_uses_unix_sh_on_non_windows(self): | |
| """Line 130: ShellTool uses /bin/sh on non-Windows.""" | |
| from unittest.mock import MagicMock, patch | |
| from tools.shell import ShellTool | |
| tool = ShellTool() | |
| mock_result = MagicMock() | |
| mock_result.returncode = 0 | |
| mock_result.stdout = "hello from unix" | |
| mock_result.stderr = "" | |
| with patch("tools.shell.sys.platform", "linux"), \ | |
| patch("tools.shell.subprocess.run", return_value=mock_result) as mock_run: | |
| result = tool.execute(command="echo hello") | |
| assert result.success is True | |
| # Verify /bin/sh was used | |
| call_kwargs = mock_run.call_args | |
| assert call_kwargs is not None | |
| # βββββββββββββββββββββββββββ FunctionTool param_type=empty βββββββββββββββββββ | |
| class TestFunctionToolParamTypeEmpty: | |
| def test_extract_params_schema_with_no_annotation_in_hints(self): | |
| """Line 61: param_type = str when param_type is inspect.Parameter.empty.""" | |
| import inspect | |
| from unittest.mock import patch | |
| from tools.function_calling import _extract_parameters_schema | |
| def my_func(x, y=5): | |
| pass | |
| # Patch get_type_hints to return inspect.Parameter.empty for 'x' | |
| with patch("tools.function_calling.get_type_hints", return_value={"x": inspect.Parameter.empty}): | |
| schema = _extract_parameters_schema(my_func) | |
| # Should have handled the empty annotation gracefully | |
| assert "properties" in schema | |
| assert "x" in schema["properties"] | |