autoscan / core /scanner.py
Chris4K's picture
Upload 384 files
a2a5bfd verified
"""scan_repo() orchestration β€” shared by app.py (Gradio UI) and cli.py.
Uses ThreadPoolExecutor so independent scanners run in parallel. Each scanner
spawns its own subprocess; threads only coordinate on the findings list, so
there are no GIL-contention issues and no shared temp-file races.
"""
import os
import shutil
import tempfile
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from git import GitCommandError, Repo
from core.helpers import have_binary
from core.hf import hf_space_to_git
from core.models import dedup_findings, sort_findings
from core.scoring import score_finding
from rules import ALL_LLM, ALL_PERFORMANCE, ALL_SECURITY, ALL_SUPPLY_CHAIN
from scanners import (
agent_audit,
augustus,
azure_redteam,
bandit,
checkov,
cve_trigger,
deepteam,
detect_secrets,
fickling,
forbidden_files,
fuzzyai,
garak,
giskard_scan,
gitleaks,
gradio_state,
gradio_version,
grype,
guardrails_ai,
hadolint,
llm_guard,
modelscan,
nemo_guardrails,
osv_scanner,
picklescan,
pip_audit,
promptfoo,
pyrit,
readme_inject,
ruff_perf,
safety_check,
semgrep_pack,
socket_scanner,
trivy,
trufflehog,
vigil,
)
ProgressCB = Callable[[float, str], None] | None
def _prog(cb: ProgressCB, frac: float, desc: str = "") -> None:
if cb:
cb(min(frac, 0.99), desc)
# Lookup from scanner task label to the individual tool key that callers may
# list in ``tools``. Every label maps to itself except "ruff-perf", whose
# tool key is plain "ruff".
_TASK_TO_TOOL: dict = {
    label: ("ruff" if label == "ruff-perf" else label)
    for label in (
        "bandit",
        "detect-secrets",
        "forbidden-files",
        "pip-audit",
        "hadolint",
        "gitleaks",
        "ruff-perf",
        "agent-audit",
        "cve-trigger",
        "gradio-state",
        "gradio-version",
        "readme-inject",
        # new P0 security
        "modelscan",
        "picklescan",
        "fickling",
        "trivy",
        "trufflehog",
        "osv-scanner",
        "checkov",
        "grype",
        "socket",
        "safety",
        # new LLM static/dynamic
        "llm-guard",
        "garak",
        "deepteam",
        "promptfoo",
        "azure-redteam",
        "pyrit",
        "augustus",
        "fuzzyai",
        "giskard",
        "vigil",
        "nemo-guardrails",
        "guardrails-ai",
    )
}
def scan_repo(
    repo_url: str,
    hf_token: str | None = None,
    deep_history: bool = False,
    run_security: bool = True,
    run_performance: bool = True,
    run_llm: bool = True,
    run_supply_chain: bool = True,
    tools: frozenset | None = None,
    max_workers: int = 8,
    progress_cb: ProgressCB = None,
) -> tuple[list[dict], list[str]]:
    """Clone or copy *repo_url*, run all enabled scanners, return (findings, log).

    Args:
        repo_url: An ``http://`` / ``https://`` URL to clone, or the path of an
            existing local directory to copy. Surrounding whitespace and a
            trailing ``/`` are ignored for URLs.
        hf_token: Optional token forwarded to ``hf_space_to_git``. It is
            redacted from the clone-failure message placed in the log.
        deep_history: When true, clone the full history (otherwise depth 1),
            schedule ``gitleaks``, and pass the flag on to ``trufflehog``.
        run_security, run_performance, run_llm, run_supply_chain: Group
            switches; a group's scanners are only scheduled when its flag is
            true.
        tools: Optional frozenset of individual tool names
            (e.g. ``frozenset({"bandit", "ruff", "pip-audit"})``). When
            provided, the tasks scheduled by the ``run_*`` flags are filtered
            down to those tools; Semgrep rule packs are kept only if
            ``"semgrep"`` is in ``tools``. ``tools`` never re-enables a group
            whose ``run_*`` flag is false.
        max_workers: Thread-pool size; values below 1 are clamped to 1.
        progress_cb: Optional ``(fraction, description)`` callback.

    Returns:
        ``(findings, log)``. Each finding gets a ``"score"`` key and its
        ``"file"`` value prefixed with ``"{repo_url}#"``. When the source
        cannot be fetched the result is ``([], [message])``.

    The temporary working directory is always removed before returning.
    """
    work = tempfile.mkdtemp(prefix="scan_")
    log: list[str] = []
    try:
        # ── Step 1: fetch source ──────────────────────────────────────────────
        _prog(progress_cb, 0.01, "Fetching source…")
        # Normalise before testing the scheme so " https://…" is still a URL,
        # and require a real scheme so a local dir such as "httpd-conf" is
        # not mistaken for one.
        url = repo_url.strip().rstrip("/")
        is_local_dir = os.path.isdir(repo_url)
        if url.startswith(("http://", "https://")):
            git_url = hf_space_to_git(url, token=hf_token) or url
            clone_kwargs = {} if deep_history else {"depth": 1}
            try:
                Repo.clone_from(git_url, work, **clone_kwargs)
            except GitCommandError as exc:
                detail = str(exc)
                # Defence in depth: the git error text may echo the clone URL,
                # which presumably embeds the token. Redact before truncating
                # so a partially cut-off token cannot slip through either.
                if hf_token:
                    detail = detail.replace(hf_token, "***")
                return [], [f"git clone failed: {detail[:200]}"]
        elif is_local_dir:
            # copytree requires that the destination does not exist yet.
            shutil.rmtree(work)
            shutil.copytree(
                repo_url, work,
                ignore=shutil.ignore_patterns(
                    ".venv", "venv", "env", ".env",
                    ".pylint-venv", "pylint-venv",
                    ".pylint-results", ".mypy_cache",
                    ".pytest_cache", ".ruff_cache",
                    "node_modules", "__pycache__", ".git", "*.pyc",
                    "vindex_compiled",
                ),
            )
        else:
            return [], ["target is neither URL nor existing directory"]

        # Virtualenvs are excluded from the copied work dir, so scanners that
        # need the original environment/config get the original path for
        # local targets and the work dir otherwise.
        local_src = repo_url if is_local_dir else work

        # ── Step 2: build scanner task list ──────────────────────────────────
        tasks: list[tuple[str, Callable]] = []

        def _queue(label: str, runner: Callable, *args, **kwargs) -> None:
            # Each call has its own scope, so the closure binds this call's
            # runner/args (no late-binding surprises).
            tasks.append((label, lambda: runner(*args, **kwargs)))

        def _queue_semgrep(rule_sets) -> None:
            # One Semgrep task per (label, rules_path, category) rule pack.
            for label, rules_path, category in rule_sets:
                _queue(f"Semgrep:{label}", semgrep_pack,
                       rules_path, work, label, category)

        if run_security:
            _queue_semgrep(ALL_SECURITY)
            _queue("bandit", bandit, work)
            _queue("detect-secrets", detect_secrets, work)
            _queue("forbidden-files", forbidden_files, work)
            # pip-audit: for a local dir, pass the original path so the runner
            # can find and audit the installed environment.
            _queue("pip-audit", pip_audit, local_src)
            _queue("hadolint", hadolint, work)
            if deep_history:
                _queue("gitleaks", gitleaks, work)
            _queue("gradio-state", gradio_state, work)
            _queue("gradio-version", gradio_version, work)
            # cve-trigger is given only the work dir; no pip-audit results are
            # handed over, which keeps every task independent in the executor.
            _queue("cve-trigger", cve_trigger, work)
            # new P0 model-security scanners
            _queue("modelscan", modelscan, work)
            _queue("picklescan", picklescan, work)
            _queue("fickling", fickling, work)
            # new infrastructure / CVE scanners
            _queue("trivy", trivy, work)
            _queue("osv-scanner", osv_scanner, work)
            _queue("checkov", checkov, work)
            _queue("grype", grype, work)
            _queue("safety", safety_check, work)
            # secrets (TruffleHog)
            _queue("trufflehog", trufflehog, work, deep_history)

        if run_performance:
            _queue_semgrep(ALL_PERFORMANCE)
            _queue("ruff-perf", ruff_perf, work)

        if run_supply_chain:
            _queue_semgrep(ALL_SUPPLY_CHAIN)
            _queue("socket", socket_scanner, work)

        if run_llm:
            _queue_semgrep(ALL_LLM)
            _queue("agent-audit", agent_audit, work, config_dir=local_src)
            _queue("readme-inject", readme_inject, work)
            # new static LLM scanners
            _queue("llm-guard", llm_guard, work)
            _queue("vigil", vigil, work)
            _queue("nemo-guardrails", nemo_guardrails, work)
            _queue("guardrails-ai", guardrails_ai, work)
            # Dynamic red-team scanners: each receives its target URL from the
            # environment (empty string when unset), read once at build time.
            red_team = (
                ("garak", "GARAK_TARGET_URL", garak),
                ("deepteam", "DEEPTEAM_TARGET_URL", deepteam),
                ("promptfoo", "PROMPTFOO_TARGET_URL", promptfoo),
                ("azure-redteam", "AZURE_REDTEAM_TARGET_URL", azure_redteam),
                ("pyrit", "PYRIT_TARGET_URL", pyrit),
                ("augustus", "AUGUSTUS_TARGET_URL", augustus),
                ("fuzzyai", "FUZZYAI_TARGET_URL", fuzzyai),
                ("giskard", "GISKARD_TARGET_URL", giskard_scan),
            )
            for label, env_var, runner in red_team:
                _queue(label, runner, os.getenv(env_var, ""))

        # ── Filter to individual tools when caller specified them ─────────────
        if tools is not None:
            def _keep(label: str) -> bool:
                if label.startswith("Semgrep:"):
                    return "semgrep" in tools
                return _TASK_TO_TOOL.get(label, label) in tools

            tasks = [(lbl, fn) for lbl, fn in tasks if _keep(lbl)]

        total = len(tasks)

        # ── Step 3: run in parallel ───────────────────────────────────────────
        findings: list[dict] = []
        # ThreadPoolExecutor raises ValueError for max_workers <= 0; clamp so a
        # bad setting degrades to serial execution instead of aborting the scan.
        pool_size = None if max_workers is None else max(1, max_workers)
        with ThreadPoolExecutor(max_workers=pool_size) as pool:
            future_to_label = {pool.submit(fn): label for label, fn in tasks}
            for done, future in enumerate(as_completed(future_to_label), start=1):
                label = future_to_label[future]
                _prog(progress_cb, 0.05 + 0.90 * done / total,
                      f"{label} done…")
                try:
                    # all runners return (list, str)
                    fs, msg = future.result()
                    findings.extend(fs)
                    log.append(msg)
                except Exception as exc:  # noqa: BLE001
                    # One failing scanner must not abort the whole scan.
                    log.append(f"{label}: ERROR {exc}")

        # ── Step 4: dedup, sort, score, prefix paths ─────────────────────────
        findings = dedup_findings(findings)
        findings = sort_findings(findings)
        # Score everything first: score_finding receives the full findings
        # list, so paths are only rewritten once all scores are computed.
        for f in findings:
            f["score"] = score_finding(f, repo_url, findings)
        for f in findings:
            f["file"] = f"{repo_url}#{f['file']}"
        log.insert(0, f"OK ({len(findings)} unique findings)")
        return findings, log
    finally:
        shutil.rmtree(work, ignore_errors=True)