# autoscan/tests/test_scanner.py
# Uploaded by Chris4K ("Upload 384 files", commit a2a5bfd, verified)
"""Tests for core.scanner — scan_repo orchestration with parallel execution."""
from unittest.mock import MagicMock, patch
import pytest
def _mock_finding(tool="bandit", rule="B101", sev="WARNING", conf="likely",
file_="test.py", line=1, msg="test", cat="security") -> dict:
return dict(
tool=tool, rule=rule, severity=sev, confidence=conf,
file=file_, line=line, message=msg, owasp=["A01"],
category=cat, remediation="",
)
class TestScanRepoLocalDirectory:
    """scan_repo invoked on a local directory (pytest's ``tmp_path``).

    Every test that patches ``core.scanner`` empties all four pack lists
    (``ALL_SECURITY``, ``ALL_PERFORMANCE``, ``ALL_LLM``, ``ALL_SUPPLY_CHAIN``)
    so that only the explicitly mocked tools can contribute findings.
    """

    def test_local_dir_all_scanners_disabled(self, tmp_path):
        """Covers: temp dir creation, copytree, empty task list, ThreadPoolExecutor,
        dedup/sort, path prefix, final log entry."""
        (tmp_path / "dummy.py").write_text("x = 1\n", encoding="utf-8")
        from core.scanner import scan_repo

        findings, log = scan_repo(
            str(tmp_path), run_security=False, run_performance=False, run_llm=False
        )
        assert isinstance(findings, list)
        assert len(findings) == 0
        assert log[0].startswith("OK")

    def test_local_dir_returns_prefixed_file_paths(self, tmp_path):
        """File paths in findings are prefixed with the repo_url."""
        (tmp_path / "app.py").write_text("x = 1\n", encoding="utf-8")
        mock_f = _mock_finding(file_="app.py")
        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[],
            ALL_PERFORMANCE=[],
            ALL_LLM=[],
            ALL_SUPPLY_CHAIN=[],
            bandit=MagicMock(return_value=([mock_f], "bandit: 1")),
            detect_secrets=MagicMock(return_value=([], "ok")),
            forbidden_files=MagicMock(return_value=([], "ok")),
            pip_audit=MagicMock(return_value=([], "ok")),
            hadolint=MagicMock(return_value=([], "ok")),
        ):
            from core.scanner import scan_repo

            findings, _ = scan_repo(str(tmp_path), run_security=True,
                                    run_performance=False, run_llm=False)
        assert len(findings) == 1
        assert str(tmp_path) in findings[0]["file"]

    def test_progress_callback_is_called(self, tmp_path):
        """progress_cb receives at least one (fraction, description) call."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        calls = []
        from core.scanner import scan_repo

        scan_repo(
            str(tmp_path),
            run_security=False, run_performance=False, run_llm=False,
            progress_cb=lambda frac, desc: calls.append((frac, desc)),
        )
        assert len(calls) >= 1
        # Progress fractions should be between 0 and 1
        assert all(0.0 <= frac <= 1.0 for frac, _ in calls)

    def test_dedup_removes_duplicate_findings(self, tmp_path):
        """Two equal findings from one tool collapse into a single result."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        dup = _mock_finding()
        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[],
            ALL_PERFORMANCE=[],
            ALL_LLM=[],
            ALL_SUPPLY_CHAIN=[],
            bandit=MagicMock(return_value=([dup, dup.copy()], "ok")),
            detect_secrets=MagicMock(return_value=([], "ok")),
            forbidden_files=MagicMock(return_value=([], "ok")),
            pip_audit=MagicMock(return_value=([], "ok")),
            hadolint=MagicMock(return_value=([], "ok")),
        ):
            from core.scanner import scan_repo

            findings, _ = scan_repo(str(tmp_path), run_security=True,
                                    run_performance=False, run_llm=False)
        assert len(findings) == 1

    def test_scanner_exception_is_logged(self, tmp_path):
        """An exception raised by a tool shows up as an ERROR log entry."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")

        def failing_bandit(work):
            raise RuntimeError("disk full")

        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[],
            ALL_PERFORMANCE=[],
            ALL_LLM=[],
            ALL_SUPPLY_CHAIN=[],
            bandit=failing_bandit,
            detect_secrets=MagicMock(return_value=([], "ok")),
            forbidden_files=MagicMock(return_value=([], "ok")),
            pip_audit=MagicMock(return_value=([], "ok")),
            hadolint=MagicMock(return_value=([], "ok")),
        ):
            from core.scanner import scan_repo

            _, log = scan_repo(str(tmp_path), run_security=True,
                               run_performance=False, run_llm=False)
        error_lines = [entry for entry in log if "ERROR" in entry]
        assert len(error_lines) >= 1
        assert "disk full" in error_lines[0]

    def test_perf_scanner_invoked_when_enabled(self, tmp_path):
        """run_performance=True calls the (mocked) ruff_perf tool once."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        mock_ruff = MagicMock(return_value=([], "ruff: 0"))
        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[],
            ALL_PERFORMANCE=[],
            ALL_LLM=[],
            ALL_SUPPLY_CHAIN=[],
            ruff_perf=mock_ruff,
        ):
            from core.scanner import scan_repo

            scan_repo(str(tmp_path), run_security=False, run_performance=True,
                      run_llm=False)
        mock_ruff.assert_called_once()

    def test_llm_scanner_invoked_when_enabled(self, tmp_path):
        """run_llm=True calls the (mocked) agent_audit tool once."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        mock_agent = MagicMock(return_value=([], "agent-audit: 0"))
        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[],
            ALL_PERFORMANCE=[],
            ALL_LLM=[],
            ALL_SUPPLY_CHAIN=[],
            agent_audit=mock_agent,
        ):
            from core.scanner import scan_repo

            scan_repo(str(tmp_path), run_security=False, run_performance=False,
                      run_llm=True)
        mock_agent.assert_called_once()

    def test_log_contains_ok_prefix(self, tmp_path):
        """The first log entry starts with "OK" when nothing is enabled."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        from core.scanner import scan_repo

        _, log = scan_repo(str(tmp_path), run_security=False,
                           run_performance=False, run_llm=False)
        assert log[0].startswith("OK")

    def test_log_contains_finding_count(self, tmp_path):
        """The first log entry reports "0 unique" when there are no findings."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        from core.scanner import scan_repo

        _, log = scan_repo(str(tmp_path), run_security=False,
                           run_performance=False, run_llm=False)
        assert "0 unique" in log[0]

    def test_max_workers_param_accepted(self, tmp_path):
        """scan_repo accepts a max_workers keyword and still returns a list."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        from core.scanner import scan_repo

        findings, _ = scan_repo(str(tmp_path), run_security=False,
                                run_performance=False, run_llm=False,
                                max_workers=2)
        assert isinstance(findings, list)

    def test_copytree_excludes_venv(self, tmp_path):
        """A ``.venv`` directory in the target must not break the scan.

        NOTE(review): this only checks that the scan finishes with an "OK"
        log entry; it does not assert that ``.venv`` was left out of the copy.
        """
        (tmp_path / ".venv").mkdir()
        (tmp_path / ".venv" / "lib.py").write_text("x=1", encoding="utf-8")
        (tmp_path / "app.py").write_text("y=2", encoding="utf-8")
        from core.scanner import scan_repo

        # Should not fail even with .venv directory present
        _, log = scan_repo(str(tmp_path), run_security=False,
                           run_performance=False, run_llm=False)
        assert log[0].startswith("OK")
class TestScanRepoInvalidTarget:
    """scan_repo called with filesystem paths that are expected not to exist."""

    def test_nonexistent_path_returns_error(self):
        """No findings, and some log entry mentions "neither URL nor"."""
        from core.scanner import scan_repo

        results, messages = scan_repo("/nonexistent/path/xyz_does_not_exist_abc")
        assert results == []
        matching = [entry for entry in messages if "neither URL nor" in entry]
        assert matching

    def test_nonexistent_returns_empty_findings(self):
        """The findings half of the result is an empty list."""
        from core.scanner import scan_repo

        results, _messages = scan_repo("/no/such/dir")
        assert results == []
class TestScanRepoGitUrl:
    """scan_repo with URL targets; ``core.scanner.Repo`` is mocked in every test."""

    def test_git_clone_failure_returns_error_log(self):
        """A GitCommandError from clone_from gives no findings and an error log."""
        from git import GitCommandError
        from core.scanner import scan_repo

        with patch("core.scanner.Repo") as repo_cls:
            repo_cls.clone_from.side_effect = GitCommandError("clone", "failed")
            results, messages = scan_repo("https://example.com/nonexistent/repo")

        assert results == []
        assert any("git clone failed" in entry for entry in messages)

    def test_shallow_clone_uses_depth_1_by_default(self):
        """Without deep_history, clone_from receives ``depth=1``."""
        from core.scanner import scan_repo

        with patch("core.scanner.Repo") as repo_cls:
            repo_cls.clone_from.return_value = MagicMock()
            scan_repo(
                "https://huggingface.co/spaces/ns/name",
                run_security=False,
                run_performance=False,
                run_llm=False,
            )

        # Index 1 of call_args is the keyword-argument dict of the last call.
        clone_kwargs = repo_cls.clone_from.call_args[1]
        assert clone_kwargs.get("depth") == 1

    def test_deep_history_clone_has_no_depth(self):
        """With deep_history=True, clone_from gets no ``depth`` keyword."""
        from core.scanner import scan_repo

        with patch("core.scanner.Repo") as repo_cls:
            repo_cls.clone_from.return_value = MagicMock()
            scan_repo(
                "https://huggingface.co/spaces/ns/name",
                deep_history=True,
                run_security=False,
                run_performance=False,
                run_llm=False,
            )

        clone_kwargs = repo_cls.clone_from.call_args[1]
        assert "depth" not in clone_kwargs

    def test_hf_space_url_converted_to_git_url(self):
        """The first positional argument to clone_from still names the Space."""
        from core.scanner import scan_repo

        with patch("core.scanner.Repo") as repo_cls:
            repo_cls.clone_from.return_value = MagicMock()
            scan_repo(
                "https://huggingface.co/spaces/owner/myspace",
                run_security=False,
                run_performance=False,
                run_llm=False,
            )

        # The git URL passed must come from hf_space_to_git
        assert repo_cls.clone_from.called
        positional = repo_cls.clone_from.call_args[0]
        url_used = positional[0]
        assert "huggingface.co/spaces/owner/myspace" in url_used
class TestScanRepoSemgrepTaskBranches:
    """Cover the scan_repo paths taken when a pack list is non-empty.

    Each Semgrep test puts one entry into exactly one of ``ALL_SECURITY`` /
    ``ALL_PERFORMANCE`` / ``ALL_LLM`` and checks that the mocked
    ``semgrep_pack`` is called once.  The last test covers the gitleaks task
    that is added for ``deep_history=True`` with ``run_security=True``.

    NOTE(review): the mocked ``semgrep_pack`` always returns a
    ``(findings, log)`` tuple; a bare-list return value is not exercised here.
    """

    def test_security_semgrep_task_added_and_list_result_merged(self, tmp_path):
        """ALL_SECURITY entry → semgrep_pack called once, its finding is returned."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        fake_finding = _mock_finding(tool="Semgrep")
        mock_semgrep = MagicMock(return_value=([fake_finding], "ok"))
        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[("SecPack", tmp_path / "rules.yaml", "security")],
            ALL_PERFORMANCE=[],
            ALL_LLM=[],
            ALL_SUPPLY_CHAIN=[],
            semgrep_pack=mock_semgrep,
            bandit=MagicMock(return_value=([], "ok")),
            detect_secrets=MagicMock(return_value=([], "ok")),
            forbidden_files=MagicMock(return_value=([], "ok")),
            pip_audit=MagicMock(return_value=([], "ok")),
            hadolint=MagicMock(return_value=([], "ok")),
        ):
            from core.scanner import scan_repo

            findings, _ = scan_repo(str(tmp_path), run_security=True,
                                    run_performance=False, run_llm=False)
        mock_semgrep.assert_called_once()
        assert len(findings) == 1

    def test_performance_semgrep_task_added_from_all_performance(self, tmp_path):
        """ALL_PERFORMANCE entry → semgrep_pack called once."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        mock_semgrep = MagicMock(return_value=([], "ok"))
        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[],
            ALL_PERFORMANCE=[("PerfPack", tmp_path / "perf.yaml", "performance")],
            ALL_LLM=[],
            ALL_SUPPLY_CHAIN=[],
            semgrep_pack=mock_semgrep,
            ruff_perf=MagicMock(return_value=([], "ok")),
        ):
            from core.scanner import scan_repo

            scan_repo(str(tmp_path), run_security=False, run_performance=True,
                      run_llm=False)
        mock_semgrep.assert_called_once()

    def test_llm_semgrep_task_added_from_all_llm(self, tmp_path):
        """ALL_LLM entry → semgrep_pack called once."""
        (tmp_path / "f.py").write_text("x=1", encoding="utf-8")
        mock_semgrep = MagicMock(return_value=([], "ok"))
        with patch.multiple(
            "core.scanner",
            ALL_SECURITY=[],
            ALL_PERFORMANCE=[],
            ALL_LLM=[("LLMPack", tmp_path / "llm.yaml", "llm")],
            ALL_SUPPLY_CHAIN=[],
            semgrep_pack=mock_semgrep,
            agent_audit=MagicMock(return_value=([], "ok")),
        ):
            from core.scanner import scan_repo

            scan_repo(str(tmp_path), run_security=False, run_performance=False,
                      run_llm=True)
        mock_semgrep.assert_called_once()

    def test_deep_history_with_security_adds_gitleaks_task(self):
        """deep_history=True + run_security=True → gitleaks called once."""
        mock_gitleaks = MagicMock(return_value=([], "gitleaks: 0"))
        with patch("core.scanner.Repo") as mock_repo, \
                patch.multiple(
                    "core.scanner",
                    ALL_SECURITY=[],
                    ALL_PERFORMANCE=[],
                    ALL_LLM=[],
                    # Emptied like in the sibling tests so no pack task is added.
                    ALL_SUPPLY_CHAIN=[],
                    bandit=MagicMock(return_value=([], "ok")),
                    detect_secrets=MagicMock(return_value=([], "ok")),
                    forbidden_files=MagicMock(return_value=([], "ok")),
                    pip_audit=MagicMock(return_value=([], "ok")),
                    hadolint=MagicMock(return_value=([], "ok")),
                    gitleaks=mock_gitleaks,
                ):
            mock_repo.clone_from.return_value = MagicMock()
            from core.scanner import scan_repo

            _, log = scan_repo(
                "https://huggingface.co/spaces/ns/name",
                deep_history=True,
                run_security=True,
                run_performance=False,
                run_llm=False,
            )
        mock_gitleaks.assert_called_once()
        assert log[0].startswith("OK")
# ---------------------------------------------------------------------------
# TestScanRepoToolsParam — per-tool ``tools`` frozenset parameter
# ---------------------------------------------------------------------------
class TestScanRepoToolsParam:
"""Tests for the ``tools`` kwarg added in v5: restricts which individual
tools execute regardless of the run_security/performance/llm flags."""
def _security_mocks(self):
"""Return a dict of MagicMock patches for all security-group tools."""
return dict(
ALL_SECURITY=[],
ALL_PERFORMANCE=[],
ALL_LLM=[],
ALL_SUPPLY_CHAIN=[],
bandit=MagicMock(return_value=([], "bandit: 0")),
detect_secrets=MagicMock(return_value=([], "detect-secrets: 0")),
forbidden_files=MagicMock(return_value=([], "forbidden-files: 0")),
pip_audit=MagicMock(return_value=([], "pip-audit: 0")),
hadolint=MagicMock(return_value=([], "hadolint: 0")),
ruff_perf=MagicMock(return_value=([], "ruff: 0")),
agent_audit=MagicMock(return_value=([], "agent-audit: 0")),
)
def test_tools_none_runs_all_enabled_tools(self, tmp_path):
"""tools=None (default) → all tools enabled by run_* flags execute."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
mocks["bandit"] = MagicMock(return_value=([_mock_finding()], "bandit: 1"))
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
findings, log = scan_repo(
str(tmp_path),
run_security=True, run_performance=False, run_llm=False,
tools=None,
)
mocks["bandit"].assert_called_once()
assert len(findings) == 1
def test_tools_bandit_only_skips_other_security_tools(self, tmp_path):
"""tools=frozenset({'bandit'}) → only bandit runs, pip-audit/hadolint etc. skipped."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
run_security=True, run_performance=False, run_llm=False,
tools=frozenset({"bandit"}),
)
mocks["bandit"].assert_called_once()
mocks["pip_audit"].assert_not_called()
mocks["hadolint"].assert_not_called()
mocks["detect_secrets"].assert_not_called()
def test_tools_pip_audit_only(self, tmp_path):
"""tools=frozenset({'pip-audit'}) → only pip-audit executes."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
run_security=True, run_performance=False, run_llm=False,
tools=frozenset({"pip-audit"}),
)
mocks["pip_audit"].assert_called_once()
mocks["bandit"].assert_not_called()
mocks["ruff_perf"].assert_not_called()
def test_tools_ruff_only_skips_security_tools(self, tmp_path):
"""tools=frozenset({'ruff'}) → ruff-perf runs, security tools do not."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
run_security=False, run_performance=True, run_llm=False,
tools=frozenset({"ruff"}),
)
mocks["ruff_perf"].assert_called_once()
mocks["bandit"].assert_not_called()
mocks["agent_audit"].assert_not_called()
def test_tools_agent_audit_only(self, tmp_path):
"""tools=frozenset({'agent-audit'}) → only agent-audit runs."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
run_security=False, run_performance=False, run_llm=True,
tools=frozenset({"agent-audit"}),
)
mocks["agent_audit"].assert_called_once()
mocks["bandit"].assert_not_called()
mocks["ruff_perf"].assert_not_called()
def test_tools_semgrep_only_skips_bandit(self, tmp_path):
"""tools=frozenset({'semgrep'}) → Semgrep tasks run, bandit does not."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mock_semgrep = MagicMock(return_value=([], "ok"))
mocks = self._security_mocks()
mocks["semgrep_pack"] = mock_semgrep
mocks["ALL_SECURITY"] = [("SecPack", tmp_path / "rules.yaml", "security")]
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
run_security=True, run_performance=False, run_llm=False,
tools=frozenset({"semgrep"}),
)
mock_semgrep.assert_called_once()
mocks["bandit"].assert_not_called()
def test_tools_without_semgrep_skips_semgrep_tasks(self, tmp_path):
"""tools without 'semgrep' → Semgrep tasks are filtered out."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mock_semgrep = MagicMock(return_value=([], "ok"))
mocks = self._security_mocks()
mocks["semgrep_pack"] = mock_semgrep
mocks["ALL_SECURITY"] = [("SecPack", tmp_path / "rules.yaml", "security")]
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
run_security=True, run_performance=False, run_llm=False,
tools=frozenset({"bandit"}),
)
mock_semgrep.assert_not_called()
mocks["bandit"].assert_called_once()
def test_tools_multiple_tools(self, tmp_path):
"""tools with bandit+pip-audit → both run, others skipped."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
run_security=True, run_performance=False, run_llm=False,
tools=frozenset({"bandit", "pip-audit"}),
)
mocks["bandit"].assert_called_once()
mocks["pip_audit"].assert_called_once()
mocks["hadolint"].assert_not_called()
mocks["detect_secrets"].assert_not_called()
def test_tools_empty_frozenset_runs_nothing(self, tmp_path):
"""tools=frozenset() → all tasks filtered → no findings, OK log."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
findings, log = scan_repo(
str(tmp_path),
run_security=True, run_performance=True, run_llm=True,
tools=frozenset(),
)
assert findings == []
assert log[0].startswith("OK")
mocks["bandit"].assert_not_called()
mocks["ruff_perf"].assert_not_called()
mocks["agent_audit"].assert_not_called()
def test_tools_gitleaks_only_with_deep_history(self, tmp_path):
"""tools={'gitleaks'} with deep_history=True → gitleaks runs."""
mocks = self._security_mocks()
mock_gitleaks = MagicMock(return_value=([], "gitleaks: 0"))
mocks["gitleaks"] = mock_gitleaks
with patch("core.scanner.Repo") as mock_repo, \
patch.multiple("core.scanner", **mocks):
mock_repo.clone_from.return_value = MagicMock()
from core.scanner import scan_repo
scan_repo(
"https://huggingface.co/spaces/ns/name",
deep_history=True,
run_security=True, run_performance=False, run_llm=False,
tools=frozenset({"gitleaks"}),
)
mock_gitleaks.assert_called_once()
mocks["bandit"].assert_not_called()
def test_tools_gitleaks_without_deep_history_not_added(self, tmp_path):
"""gitleaks is never added when deep_history=False, even if in tools."""
(tmp_path / "f.py").write_text("x=1", encoding="utf-8")
mocks = self._security_mocks()
mock_gitleaks = MagicMock(return_value=([], "gitleaks: 0"))
mocks["gitleaks"] = mock_gitleaks
with patch.multiple("core.scanner", **mocks):
from core.scanner import scan_repo
scan_repo(
str(tmp_path),
deep_history=False,
run_security=True, run_performance=False, run_llm=False,
tools=frozenset({"gitleaks"}),
)
mock_gitleaks.assert_not_called()