"""Tests for core.scanner — scan_repo orchestration with parallel execution.""" from unittest.mock import MagicMock, patch import pytest def _mock_finding(tool="bandit", rule="B101", sev="WARNING", conf="likely", file_="test.py", line=1, msg="test", cat="security") -> dict: return dict( tool=tool, rule=rule, severity=sev, confidence=conf, file=file_, line=line, message=msg, owasp=["A01"], category=cat, remediation="", ) class TestScanRepoLocalDirectory: def test_local_dir_all_scanners_disabled(self, tmp_path): """Covers: temp dir creation, copytree, empty task list, ThreadPoolExecutor, dedup/sort, path prefix, final log entry.""" (tmp_path / "dummy.py").write_text("x = 1\n", encoding="utf-8") from core.scanner import scan_repo findings, log = scan_repo( str(tmp_path), run_security=False, run_performance=False, run_llm=False ) assert isinstance(findings, list) assert len(findings) == 0 assert log[0].startswith("OK") def test_local_dir_returns_prefixed_file_paths(self, tmp_path): """File paths in findings are prefixed with the repo_url.""" (tmp_path / "app.py").write_text("x = 1\n", encoding="utf-8") mock_f = _mock_finding(file_="app.py") with patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[], bandit=MagicMock(return_value=([mock_f], "bandit: 1")), detect_secrets=MagicMock(return_value=([], "ok")), forbidden_files=MagicMock(return_value=([], "ok")), pip_audit=MagicMock(return_value=([], "ok")), hadolint=MagicMock(return_value=([], "ok")), ): from core.scanner import scan_repo findings, log = scan_repo(str(tmp_path), run_security=True, run_performance=False, run_llm=False) assert len(findings) == 1 assert str(tmp_path) in findings[0]["file"] def test_progress_callback_is_called(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") calls = [] from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=False, run_performance=False, run_llm=False, progress_cb=lambda frac, desc: calls.append((frac, desc)), ) assert len(calls) >= 1 # Progress fractions should be between 0 and 1 assert all(0.0 <= f <= 1.0 for f, _ in calls) def test_dedup_removes_duplicate_findings(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") dup = _mock_finding() with patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[], bandit=MagicMock(return_value=([dup, dup.copy()], "ok")), detect_secrets=MagicMock(return_value=([], "ok")), forbidden_files=MagicMock(return_value=([], "ok")), pip_audit=MagicMock(return_value=([], "ok")), hadolint=MagicMock(return_value=([], "ok")), ): from core.scanner import scan_repo findings, _ = scan_repo(str(tmp_path), run_security=True, run_performance=False, run_llm=False) assert len(findings) == 1 def test_scanner_exception_is_logged(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") def failing_bandit(work): raise RuntimeError("disk full") with patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[], bandit=failing_bandit, detect_secrets=MagicMock(return_value=([], "ok")), forbidden_files=MagicMock(return_value=([], "ok")), pip_audit=MagicMock(return_value=([], "ok")), hadolint=MagicMock(return_value=([], "ok")), ): from core.scanner import scan_repo findings, log = scan_repo(str(tmp_path), run_security=True, run_performance=False, run_llm=False) error_lines = [l for l in log if "ERROR" in l] assert len(error_lines) >= 1 assert "disk full" in error_lines[0] def test_perf_scanner_invoked_when_enabled(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mock_ruff = MagicMock(return_value=([], "ruff: 0")) with patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[], ruff_perf=mock_ruff, ): from core.scanner import scan_repo scan_repo(str(tmp_path), run_security=False, run_performance=True, run_llm=False) mock_ruff.assert_called_once() def test_llm_scanner_invoked_when_enabled(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mock_agent = MagicMock(return_value=([], "agent-audit: 0")) with patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[], agent_audit=mock_agent, ): from core.scanner import scan_repo scan_repo(str(tmp_path), run_security=False, run_performance=False, run_llm=True) mock_agent.assert_called_once() def test_log_contains_ok_prefix(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") from core.scanner import scan_repo _, log = scan_repo(str(tmp_path), run_security=False, run_performance=False, run_llm=False) assert log[0].startswith("OK") def test_log_contains_finding_count(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") from core.scanner import scan_repo _, log = scan_repo(str(tmp_path), run_security=False, run_performance=False, run_llm=False) assert "0 unique" in log[0] def test_max_workers_param_accepted(self, tmp_path): (tmp_path / "f.py").write_text("x=1", encoding="utf-8") from core.scanner import scan_repo findings, log = scan_repo(str(tmp_path), run_security=False, run_performance=False, run_llm=False, max_workers=2) assert isinstance(findings, list) def test_copytree_excludes_venv(self, tmp_path): """Confirms copytree ignore patterns: .venv dir is not included.""" (tmp_path / ".venv").mkdir() (tmp_path / ".venv" / "lib.py").write_text("x=1", encoding="utf-8") (tmp_path / "app.py").write_text("y=2", encoding="utf-8") from core.scanner import scan_repo # Should not fail even with .venv directory present findings, log = scan_repo(str(tmp_path), run_security=False, run_performance=False, run_llm=False) assert log[0].startswith("OK") class TestScanRepoInvalidTarget: def test_nonexistent_path_returns_error(self): from core.scanner import scan_repo findings, log = scan_repo("/nonexistent/path/xyz_does_not_exist_abc") assert findings == [] assert any("neither URL nor" in l for l in log) def test_nonexistent_returns_empty_findings(self): from core.scanner import scan_repo findings, log = scan_repo("/no/such/dir") assert findings == [] class TestScanRepoGitUrl: def test_git_clone_failure_returns_error_log(self): from git import GitCommandError from core.scanner import scan_repo with patch("core.scanner.Repo") as mock_repo: mock_repo.clone_from.side_effect = GitCommandError("clone", "failed") findings, log = scan_repo("https://example.com/nonexistent/repo") assert findings == [] assert any("git clone failed" in l for l in log) def test_shallow_clone_uses_depth_1_by_default(self): from core.scanner import scan_repo with patch("core.scanner.Repo") as mock_repo: mock_repo.clone_from.return_value = MagicMock() scan_repo( "https://huggingface.co/spaces/ns/name", run_security=False, run_performance=False, run_llm=False, ) call_kwargs = mock_repo.clone_from.call_args[1] assert call_kwargs.get("depth") == 1 def test_deep_history_clone_has_no_depth(self): from core.scanner import scan_repo with patch("core.scanner.Repo") as mock_repo: mock_repo.clone_from.return_value = MagicMock() scan_repo( "https://huggingface.co/spaces/ns/name", deep_history=True, run_security=False, run_performance=False, run_llm=False, ) # When deep_history=True, clone_from is called without depth keyword call_kwargs = mock_repo.clone_from.call_args[1] assert "depth" not in call_kwargs def test_hf_space_url_converted_to_git_url(self): """hf_space_to_git transforms the URL before passing to Repo.clone_from.""" from core.scanner import scan_repo with patch("core.scanner.Repo") as mock_repo: mock_repo.clone_from.return_value = MagicMock() scan_repo( "https://huggingface.co/spaces/owner/myspace", run_security=False, run_performance=False, run_llm=False, ) # The git URL passed must come from hf_space_to_git assert mock_repo.clone_from.called cloned_url = mock_repo.clone_from.call_args[0][0] assert "huggingface.co/spaces/owner/myspace" in cloned_url class TestScanRepoSemgrepTaskBranches: """Cover scanner.py lines touched only when semgrep_pack is added to tasks (ALL_SECURITY / ALL_PERFORMANCE / ALL_LLM are non-empty) and when semgrep_pack returns a plain list (not a tuple).""" def test_security_semgrep_task_added_and_list_result_merged(self, tmp_path): """ALL_SECURITY entry → lambda added (line 87) and list result merged (line 126).""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") fake_finding = _mock_finding(tool="Semgrep") mock_semgrep = MagicMock(return_value=([fake_finding], "ok")) with patch.multiple( "core.scanner", ALL_SECURITY=[("SecPack", tmp_path / "rules.yaml", "security")], ALL_PERFORMANCE=[], ALL_LLM=[], ALL_SUPPLY_CHAIN=[], semgrep_pack=mock_semgrep, bandit=MagicMock(return_value=([], "ok")), detect_secrets=MagicMock(return_value=([], "ok")), forbidden_files=MagicMock(return_value=([], "ok")), pip_audit=MagicMock(return_value=([], "ok")), hadolint=MagicMock(return_value=([], "ok")), ): from core.scanner import scan_repo findings, log = scan_repo(str(tmp_path), run_security=True, run_performance=False, run_llm=False) mock_semgrep.assert_called_once() assert len(findings) == 1 def test_performance_semgrep_task_added_from_all_performance(self, tmp_path): """ALL_PERFORMANCE entry → for-loop body executed (line 98).""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mock_semgrep = MagicMock(return_value=([], "ok")) with patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[("PerfPack", tmp_path / "perf.yaml", "performance")], ALL_LLM=[], ALL_SUPPLY_CHAIN=[], semgrep_pack=mock_semgrep, ruff_perf=MagicMock(return_value=([], "ok")), ): from core.scanner import scan_repo scan_repo(str(tmp_path), run_security=False, run_performance=True, run_llm=False) mock_semgrep.assert_called_once() def test_llm_semgrep_task_added_from_all_llm(self, tmp_path): """ALL_LLM entry → for-loop body executed (line 103).""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mock_semgrep = MagicMock(return_value=([], "ok")) with patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[("LLMPack", tmp_path / "llm.yaml", "llm")], ALL_SUPPLY_CHAIN=[], semgrep_pack=mock_semgrep, agent_audit=MagicMock(return_value=([], "ok")), ): from core.scanner import scan_repo scan_repo(str(tmp_path), run_security=False, run_performance=False, run_llm=True) mock_semgrep.assert_called_once() def test_deep_history_with_security_adds_gitleaks_task(self): """deep_history=True + run_security=True → gitleaks task added (line 94).""" mock_gitleaks = MagicMock(return_value=([], "gitleaks: 0")) with patch("core.scanner.Repo") as mock_repo, \ patch.multiple( "core.scanner", ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[], bandit=MagicMock(return_value=([], "ok")), detect_secrets=MagicMock(return_value=([], "ok")), forbidden_files=MagicMock(return_value=([], "ok")), pip_audit=MagicMock(return_value=([], "ok")), hadolint=MagicMock(return_value=([], "ok")), gitleaks=mock_gitleaks, ): mock_repo.clone_from.return_value = MagicMock() from core.scanner import scan_repo findings, log = scan_repo( "https://huggingface.co/spaces/ns/name", deep_history=True, run_security=True, run_performance=False, run_llm=False, ) mock_gitleaks.assert_called_once() assert log[0].startswith("OK") # --------------------------------------------------------------------------- # TestScanRepoToolsParam — per-tool ``tools`` frozenset parameter # --------------------------------------------------------------------------- class TestScanRepoToolsParam: """Tests for the ``tools`` kwarg added in v5: restricts which individual tools execute regardless of the run_security/performance/llm flags.""" def _security_mocks(self): """Return a dict of MagicMock patches for all security-group tools.""" return dict( ALL_SECURITY=[], ALL_PERFORMANCE=[], ALL_LLM=[], ALL_SUPPLY_CHAIN=[], bandit=MagicMock(return_value=([], "bandit: 0")), detect_secrets=MagicMock(return_value=([], "detect-secrets: 0")), forbidden_files=MagicMock(return_value=([], "forbidden-files: 0")), pip_audit=MagicMock(return_value=([], "pip-audit: 0")), hadolint=MagicMock(return_value=([], "hadolint: 0")), ruff_perf=MagicMock(return_value=([], "ruff: 0")), agent_audit=MagicMock(return_value=([], "agent-audit: 0")), ) def test_tools_none_runs_all_enabled_tools(self, tmp_path): """tools=None (default) → all tools enabled by run_* flags execute.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() mocks["bandit"] = MagicMock(return_value=([_mock_finding()], "bandit: 1")) with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo findings, log = scan_repo( str(tmp_path), run_security=True, run_performance=False, run_llm=False, tools=None, ) mocks["bandit"].assert_called_once() assert len(findings) == 1 def test_tools_bandit_only_skips_other_security_tools(self, tmp_path): """tools=frozenset({'bandit'}) → only bandit runs, pip-audit/hadolint etc. skipped.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=True, run_performance=False, run_llm=False, tools=frozenset({"bandit"}), ) mocks["bandit"].assert_called_once() mocks["pip_audit"].assert_not_called() mocks["hadolint"].assert_not_called() mocks["detect_secrets"].assert_not_called() def test_tools_pip_audit_only(self, tmp_path): """tools=frozenset({'pip-audit'}) → only pip-audit executes.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=True, run_performance=False, run_llm=False, tools=frozenset({"pip-audit"}), ) mocks["pip_audit"].assert_called_once() mocks["bandit"].assert_not_called() mocks["ruff_perf"].assert_not_called() def test_tools_ruff_only_skips_security_tools(self, tmp_path): """tools=frozenset({'ruff'}) → ruff-perf runs, security tools do not.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=False, run_performance=True, run_llm=False, tools=frozenset({"ruff"}), ) mocks["ruff_perf"].assert_called_once() mocks["bandit"].assert_not_called() mocks["agent_audit"].assert_not_called() def test_tools_agent_audit_only(self, tmp_path): """tools=frozenset({'agent-audit'}) → only agent-audit runs.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=False, run_performance=False, run_llm=True, tools=frozenset({"agent-audit"}), ) mocks["agent_audit"].assert_called_once() mocks["bandit"].assert_not_called() mocks["ruff_perf"].assert_not_called() def test_tools_semgrep_only_skips_bandit(self, tmp_path): """tools=frozenset({'semgrep'}) → Semgrep tasks run, bandit does not.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mock_semgrep = MagicMock(return_value=([], "ok")) mocks = self._security_mocks() mocks["semgrep_pack"] = mock_semgrep mocks["ALL_SECURITY"] = [("SecPack", tmp_path / "rules.yaml", "security")] with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=True, run_performance=False, run_llm=False, tools=frozenset({"semgrep"}), ) mock_semgrep.assert_called_once() mocks["bandit"].assert_not_called() def test_tools_without_semgrep_skips_semgrep_tasks(self, tmp_path): """tools without 'semgrep' → Semgrep tasks are filtered out.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mock_semgrep = MagicMock(return_value=([], "ok")) mocks = self._security_mocks() mocks["semgrep_pack"] = mock_semgrep mocks["ALL_SECURITY"] = [("SecPack", tmp_path / "rules.yaml", "security")] with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=True, run_performance=False, run_llm=False, tools=frozenset({"bandit"}), ) mock_semgrep.assert_not_called() mocks["bandit"].assert_called_once() def test_tools_multiple_tools(self, tmp_path): """tools with bandit+pip-audit → both run, others skipped.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), run_security=True, run_performance=False, run_llm=False, tools=frozenset({"bandit", "pip-audit"}), ) mocks["bandit"].assert_called_once() mocks["pip_audit"].assert_called_once() mocks["hadolint"].assert_not_called() mocks["detect_secrets"].assert_not_called() def test_tools_empty_frozenset_runs_nothing(self, tmp_path): """tools=frozenset() → all tasks filtered → no findings, OK log.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo findings, log = scan_repo( str(tmp_path), run_security=True, run_performance=True, run_llm=True, tools=frozenset(), ) assert findings == [] assert log[0].startswith("OK") mocks["bandit"].assert_not_called() mocks["ruff_perf"].assert_not_called() mocks["agent_audit"].assert_not_called() def test_tools_gitleaks_only_with_deep_history(self, tmp_path): """tools={'gitleaks'} with deep_history=True → gitleaks runs.""" mocks = self._security_mocks() mock_gitleaks = MagicMock(return_value=([], "gitleaks: 0")) mocks["gitleaks"] = mock_gitleaks with patch("core.scanner.Repo") as mock_repo, \ patch.multiple("core.scanner", **mocks): mock_repo.clone_from.return_value = MagicMock() from core.scanner import scan_repo scan_repo( "https://huggingface.co/spaces/ns/name", deep_history=True, run_security=True, run_performance=False, run_llm=False, tools=frozenset({"gitleaks"}), ) mock_gitleaks.assert_called_once() mocks["bandit"].assert_not_called() def test_tools_gitleaks_without_deep_history_not_added(self, tmp_path): """gitleaks is never added when deep_history=False, even if in tools.""" (tmp_path / "f.py").write_text("x=1", encoding="utf-8") mocks = self._security_mocks() mock_gitleaks = MagicMock(return_value=([], "gitleaks: 0")) mocks["gitleaks"] = mock_gitleaks with patch.multiple("core.scanner", **mocks): from core.scanner import scan_repo scan_repo( str(tmp_path), deep_history=False, run_security=True, run_performance=False, run_llm=False, tools=frozenset({"gitleaks"}), ) mock_gitleaks.assert_not_called()