| """scan_repo() orchestration — shared by app.py (Gradio UI) and cli.py.
|
|
|
| Uses ThreadPoolExecutor so independent scanners run in parallel. Each scanner
|
| spawns its own subprocess; threads only coordinate on the findings list, so
|
| there are no GIL-contention issues and no shared temp-file races.
|
| """
|
| import os
|
| import shutil
|
| import tempfile
|
| from collections.abc import Callable
|
| from concurrent.futures import ThreadPoolExecutor, as_completed
|
| from pathlib import Path
|
|
|
| from git import GitCommandError, Repo
|
|
|
| from core.helpers import have_binary
|
| from core.hf import hf_space_to_git
|
| from core.models import dedup_findings, sort_findings
|
| from core.scoring import score_finding
|
| from rules import ALL_LLM, ALL_PERFORMANCE, ALL_SECURITY, ALL_SUPPLY_CHAIN
|
| from scanners import (
|
| agent_audit,
|
| augustus,
|
| azure_redteam,
|
| bandit,
|
| checkov,
|
| cve_trigger,
|
| deepteam,
|
| detect_secrets,
|
| fickling,
|
| forbidden_files,
|
| fuzzyai,
|
| garak,
|
| giskard_scan,
|
| gitleaks,
|
| gradio_state,
|
| gradio_version,
|
| grype,
|
| guardrails_ai,
|
| hadolint,
|
| llm_guard,
|
| modelscan,
|
| nemo_guardrails,
|
| osv_scanner,
|
| picklescan,
|
| pip_audit,
|
| promptfoo,
|
| pyrit,
|
| readme_inject,
|
| ruff_perf,
|
| safety_check,
|
| semgrep_pack,
|
| socket_scanner,
|
| trivy,
|
| trufflehog,
|
| vigil,
|
| )
|
|
|
| ProgressCB = Callable[[float, str], None] | None
|
|
|
|
|
| def _prog(cb: ProgressCB, frac: float, desc: str = "") -> None:
|
| if cb:
|
| cb(min(frac, 0.99), desc)
|
|
|
|
|
|
|
| _TASK_TO_TOOL: dict = {
|
| "bandit": "bandit",
|
| "detect-secrets": "detect-secrets",
|
| "forbidden-files": "forbidden-files",
|
| "pip-audit": "pip-audit",
|
| "hadolint": "hadolint",
|
| "gitleaks": "gitleaks",
|
| "ruff-perf": "ruff",
|
| "agent-audit": "agent-audit",
|
| "cve-trigger": "cve-trigger",
|
| "gradio-state": "gradio-state",
|
| "gradio-version": "gradio-version",
|
| "readme-inject": "readme-inject",
|
|
|
| "modelscan": "modelscan",
|
| "picklescan": "picklescan",
|
| "fickling": "fickling",
|
| "trivy": "trivy",
|
| "trufflehog": "trufflehog",
|
| "osv-scanner": "osv-scanner",
|
| "checkov": "checkov",
|
| "grype": "grype",
|
| "socket": "socket",
|
| "safety": "safety",
|
|
|
| "llm-guard": "llm-guard",
|
| "garak": "garak",
|
| "deepteam": "deepteam",
|
| "promptfoo": "promptfoo",
|
| "azure-redteam": "azure-redteam",
|
| "pyrit": "pyrit",
|
| "augustus": "augustus",
|
| "fuzzyai": "fuzzyai",
|
| "giskard": "giskard",
|
| "vigil": "vigil",
|
| "nemo-guardrails": "nemo-guardrails",
|
| "guardrails-ai": "guardrails-ai",
|
| }
|
|
|
|
|
def scan_repo(
    repo_url: str,
    hf_token: str | None = None,
    deep_history: bool = False,
    run_security: bool = True,
    run_performance: bool = True,
    run_llm: bool = True,
    run_supply_chain: bool = True,
    tools: frozenset | None = None,
    max_workers: int = 8,
    progress_cb: ProgressCB = None,
) -> tuple[list[dict], list[str]]:
    """Clone or copy *repo_url*, run all enabled scanners, return (findings, log).

    Args:
        repo_url: Either an ``http://`` / ``https://`` URL to clone, or the
            path of an existing local directory to copy.
        hf_token: Optional token forwarded to ``hf_space_to_git``.
        deep_history: Clone the full history instead of ``depth=1``; also
            enables the gitleaks task and is passed on to trufflehog.
        run_security: Enable the security task group.
        run_performance: Enable the performance task group.
        run_llm: Enable the LLM task group.
        run_supply_chain: Enable the supply-chain task group.
        tools: Optional frozenset of individual tool names
            (e.g. ``frozenset({"bandit", "ruff", "pip-audit"})``).  The
            ``run_*`` flags decide which tasks are candidates; when ``tools``
            is given, a candidate is kept only if its tool name (task label
            mapped through ``_TASK_TO_TOOL``) is in the set.  Semgrep rule
            packs are kept only when ``"semgrep"`` is in the set.
        max_workers: Size of the thread pool that runs the tasks.
        progress_cb: Optional ``cb(fraction, description)`` progress hook.

    Returns:
        ``(findings, log)``.  On success the findings are de-duplicated,
        sorted, scored and have their ``file`` prefixed with
        ``"{repo_url}#"``; ``log`` starts with an ``OK (...)`` summary line.
        If the source cannot be fetched, ``([], [error_message])`` is
        returned instead.

    All tasks are submitted to one thread pool and run concurrently.  The
    temporary working directory is always removed before returning.
    """
    work = tempfile.mkdtemp(prefix="scan_")
    log: list[str] = []

    try:
        _prog(progress_cb, 0.01, "Fetching source…")

        # Strip *before* testing the scheme so leading whitespace does not
        # hide a URL, and require a real scheme so a local directory whose
        # name merely starts with "http" is copied rather than cloned.
        url = repo_url.strip()
        if url.startswith(("http://", "https://")):
            url = url.rstrip("/")
            git_url = hf_space_to_git(url, token=hf_token) or url
            clone_kwargs = {} if deep_history else {"depth": 1}
            try:
                Repo.clone_from(git_url, work, **clone_kwargs)
            except GitCommandError as exc:
                detail = str(exc)
                if hf_token:
                    # The error text may echo the clone URL; redact the token
                    # (before truncating) in case the URL carries it.
                    detail = detail.replace(hf_token, "***")
                return [], [f"git clone failed: {detail[:200]}"]
        elif os.path.isdir(repo_url):
            # Copy straight into the directory mkdtemp() created instead of
            # deleting and recreating it, so the path never goes missing.
            # NOTE(review): the ".env" pattern also excludes *files* named
            # ".env", not only virtualenv directories -- confirm intended.
            shutil.copytree(
                repo_url,
                work,
                dirs_exist_ok=True,
                ignore=shutil.ignore_patterns(
                    ".venv", "venv", "env", ".env",
                    ".pylint-venv", "pylint-venv",
                    ".pylint-results", ".mypy_cache",
                    ".pytest_cache", ".ruff_cache",
                    "node_modules", "__pycache__", ".git", "*.pyc",
                    "vindex_compiled",
                ),
            )
        else:
            return [], ["target is neither URL nor existing directory"]

        # pip-audit and agent-audit are pointed at the caller's original
        # directory when the target is local, otherwise at the working copy.
        origin_dir = repo_url if os.path.isdir(repo_url) else work

        tasks: list[tuple[str, Callable]] = []

        def add_semgrep(packs) -> None:
            """Queue one Semgrep task per (label, rules_path, category) pack."""
            # Loop values are bound as lambda defaults to avoid late binding.
            tasks.extend(
                (
                    f"Semgrep:{label}",
                    lambda rp=rules_path, lb=label, cat=category: semgrep_pack(rp, work, lb, cat),
                )
                for label, rules_path, category in packs
            )

        if run_security:
            add_semgrep(ALL_SECURITY)
            tasks += [
                ("bandit", lambda: bandit(work)),
                ("detect-secrets", lambda: detect_secrets(work)),
                ("forbidden-files", lambda: forbidden_files(work)),
                ("pip-audit", lambda: pip_audit(origin_dir)),
                ("hadolint", lambda: hadolint(work)),
            ]
            if deep_history:
                tasks.append(("gitleaks", lambda: gitleaks(work)))
            tasks += [
                ("gradio-state", lambda: gradio_state(work)),
                ("gradio-version", lambda: gradio_version(work)),
                ("cve-trigger", lambda: cve_trigger(work)),
                ("modelscan", lambda: modelscan(work)),
                ("picklescan", lambda: picklescan(work)),
                ("fickling", lambda: fickling(work)),
                ("trivy", lambda: trivy(work)),
                ("osv-scanner", lambda: osv_scanner(work)),
                ("checkov", lambda: checkov(work)),
                ("grype", lambda: grype(work)),
                ("safety", lambda: safety_check(work)),
                ("trufflehog", lambda: trufflehog(work, deep_history)),
            ]

        if run_performance:
            add_semgrep(ALL_PERFORMANCE)
            tasks.append(("ruff-perf", lambda: ruff_perf(work)))

        if run_supply_chain:
            add_semgrep(ALL_SUPPLY_CHAIN)
            tasks.append(("socket", lambda: socket_scanner(work)))

        if run_llm:
            add_semgrep(ALL_LLM)
            tasks += [
                ("agent-audit", lambda: agent_audit(work, config_dir=origin_dir)),
                ("readme-inject", lambda: readme_inject(work)),
                ("llm-guard", lambda: llm_guard(work)),
                ("vigil", lambda: vigil(work)),
                ("nemo-guardrails", lambda: nemo_guardrails(work)),
                ("guardrails-ai", lambda: guardrails_ai(work)),
            ]
            # These scanners take a target URL read from the environment
            # (empty string when the variable is unset).
            endpoint_scanners = (
                ("garak", "GARAK_TARGET_URL", garak),
                ("deepteam", "DEEPTEAM_TARGET_URL", deepteam),
                ("promptfoo", "PROMPTFOO_TARGET_URL", promptfoo),
                ("azure-redteam", "AZURE_REDTEAM_TARGET_URL", azure_redteam),
                ("pyrit", "PYRIT_TARGET_URL", pyrit),
                ("augustus", "AUGUSTUS_TARGET_URL", augustus),
                ("fuzzyai", "FUZZYAI_TARGET_URL", fuzzyai),
                ("giskard", "GISKARD_TARGET_URL", giskard_scan),
            )
            for label, env_var, scanner in endpoint_scanners:
                target_url = os.getenv(env_var, "")
                tasks.append((label, lambda u=target_url, fn=scanner: fn(u)))

        if tools is not None:

            def _keep(label: str) -> bool:
                """Return True if the task with this label was requested."""
                if label.startswith("Semgrep:"):
                    return "semgrep" in tools
                return _TASK_TO_TOOL.get(label, label) in tools

            tasks = [(label, fn) for label, fn in tasks if _keep(label)]

        total = len(tasks)
        findings: list[dict] = []
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            future_to_label = {pool.submit(fn): label for label, fn in tasks}
            for done, future in enumerate(as_completed(future_to_label), start=1):
                label = future_to_label[future]
                _prog(progress_cb, 0.05 + 0.90 * done / total, f"{label} done…")
                try:
                    # Each scanner is expected to return (findings, message).
                    fs, msg = future.result()
                    findings.extend(fs)
                    log.append(msg)
                except Exception as exc:
                    # Deliberate per-scanner boundary: one failing scanner is
                    # logged and must not abort the remaining ones.
                    log.append(f"{label}: ERROR {exc}")

        findings = sort_findings(dedup_findings(findings))
        # score_finding receives the whole list, so the "file" prefix is
        # applied only after every score has been computed.
        for f in findings:
            f["score"] = score_finding(f, repo_url, findings)
        for f in findings:
            f["file"] = f"{repo_url}#{f['file']}"

        log.insert(0, f"OK ({len(findings)} unique findings)")
        return findings, log

    finally:
        shutil.rmtree(work, ignore_errors=True)
|
|
|