"""scan_repo() orchestration — shared by app.py (Gradio UI) and cli.py. Uses ThreadPoolExecutor so independent scanners run in parallel. Each scanner spawns its own subprocess; threads only coordinate on the findings list, so there are no GIL-contention issues and no shared temp-file races. """ import os import shutil import tempfile from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from git import GitCommandError, Repo from core.helpers import have_binary from core.hf import hf_space_to_git from core.models import dedup_findings, sort_findings from core.scoring import score_finding from rules import ALL_LLM, ALL_PERFORMANCE, ALL_SECURITY, ALL_SUPPLY_CHAIN from scanners import ( agent_audit, augustus, azure_redteam, bandit, checkov, cve_trigger, deepteam, detect_secrets, fickling, forbidden_files, fuzzyai, garak, giskard_scan, gitleaks, gradio_state, gradio_version, grype, guardrails_ai, hadolint, llm_guard, modelscan, nemo_guardrails, osv_scanner, picklescan, pip_audit, promptfoo, pyrit, readme_inject, ruff_perf, safety_check, semgrep_pack, socket_scanner, trivy, trufflehog, vigil, ) ProgressCB = Callable[[float, str], None] | None def _prog(cb: ProgressCB, frac: float, desc: str = "") -> None: if cb: cb(min(frac, 0.99), desc) # Map task label prefixes/names to individual tool keys _TASK_TO_TOOL: dict = { "bandit": "bandit", "detect-secrets": "detect-secrets", "forbidden-files": "forbidden-files", "pip-audit": "pip-audit", "hadolint": "hadolint", "gitleaks": "gitleaks", "ruff-perf": "ruff", "agent-audit": "agent-audit", "cve-trigger": "cve-trigger", "gradio-state": "gradio-state", "gradio-version": "gradio-version", "readme-inject": "readme-inject", # new P0 security "modelscan": "modelscan", "picklescan": "picklescan", "fickling": "fickling", "trivy": "trivy", "trufflehog": "trufflehog", "osv-scanner": "osv-scanner", "checkov": "checkov", "grype": "grype", "socket": "socket", "safety": "safety", # new LLM static/dynamic "llm-guard": "llm-guard", "garak": "garak", "deepteam": "deepteam", "promptfoo": "promptfoo", "azure-redteam": "azure-redteam", "pyrit": "pyrit", "augustus": "augustus", "fuzzyai": "fuzzyai", "giskard": "giskard", "vigil": "vigil", "nemo-guardrails": "nemo-guardrails", "guardrails-ai": "guardrails-ai", } def scan_repo( repo_url: str, hf_token: str | None = None, deep_history: bool = False, run_security: bool = True, run_performance: bool = True, run_llm: bool = True, run_supply_chain: bool = True, tools: frozenset | None = None, max_workers: int = 8, progress_cb: ProgressCB = None, ) -> tuple[list[dict], list[str]]: """Clone or copy *repo_url*, run all enabled scanners, return (findings, log). ``tools`` is an optional frozenset of individual tool names (e.g. ``frozenset({"bandit", "ruff", "pip-audit"})``). When provided only those tools run (the ``run_*`` flags still gate the group-level semgrep passes unless "semgrep" is also in ``tools``). Scanners within each phase run concurrently in a thread pool. """ work = tempfile.mkdtemp(prefix="scan_") log: list[str] = [] try: # ── Step 1: fetch source ────────────────────────────────────────────── _prog(progress_cb, 0.01, "Fetching source…") if repo_url.startswith("http"): url = repo_url.strip().rstrip("/") git_url = hf_space_to_git(url, token=hf_token) or url try: if deep_history: Repo.clone_from(git_url, work) else: Repo.clone_from(git_url, work, depth=1) except GitCommandError as exc: return [], [f"git clone failed: {str(exc)[:200]}"] elif os.path.isdir(repo_url): shutil.rmtree(work) shutil.copytree( repo_url, work, ignore=shutil.ignore_patterns( ".venv", "venv", "env", ".env", ".pylint-venv", "pylint-venv", ".pylint-results", ".mypy_cache", ".pytest_cache", ".ruff_cache", "node_modules", "__pycache__", ".git", "*.pyc", "vindex_compiled", ), ) else: return [], ["target is neither URL nor existing directory"] # ── Step 2: build scanner task list ────────────────────────────────── tasks: list[tuple[str, Callable]] = [] if run_security: for label, rules_path, category in ALL_SECURITY: tasks.append((f"Semgrep:{label}", lambda rp=rules_path, l=label, c=category: semgrep_pack(rp, work, l, c))) tasks.append(("bandit", lambda: bandit(work))) tasks.append(("detect-secrets", lambda: detect_secrets(work))) tasks.append(("forbidden-files", lambda: forbidden_files(work))) # pip-audit: when scanning a local dir, the venv is excluded from # the copytree work dir, so pass the original path as a fallback so # the runner can find and audit the installed environment. _pip_audit_base = repo_url if os.path.isdir(repo_url) else work tasks.append(("pip-audit", lambda b=_pip_audit_base: pip_audit(b))) tasks.append(("hadolint", lambda: hadolint(work))) if deep_history: tasks.append(("gitleaks", lambda: gitleaks(work))) tasks.append(("gradio-state", lambda: gradio_state(work))) tasks.append(("gradio-version", lambda: gradio_version(work))) # cve-trigger runs after pip-audit; pass None so it re-runs pip-audit # internally (keeps the parallel executor architecture clean) tasks.append(("cve-trigger", lambda: cve_trigger(work))) # new P0 model-security scanners tasks.append(("modelscan", lambda: modelscan(work))) tasks.append(("picklescan", lambda: picklescan(work))) tasks.append(("fickling", lambda: fickling(work))) # new infrastructure / CVE scanners tasks.append(("trivy", lambda: trivy(work))) tasks.append(("osv-scanner", lambda: osv_scanner(work))) tasks.append(("checkov", lambda: checkov(work))) tasks.append(("grype", lambda: grype(work))) tasks.append(("safety", lambda: safety_check(work))) # secrets (TruffleHog) tasks.append(("trufflehog", lambda: trufflehog(work, deep_history))) if run_performance: for label, rules_path, category in ALL_PERFORMANCE: tasks.append((f"Semgrep:{label}", lambda rp=rules_path, l=label, c=category: semgrep_pack(rp, work, l, c))) tasks.append(("ruff-perf", lambda: ruff_perf(work))) if run_supply_chain: for label, rules_path, category in ALL_SUPPLY_CHAIN: tasks.append((f"Semgrep:{label}", lambda rp=rules_path, l=label, c=category: semgrep_pack(rp, work, l, c))) tasks.append(("socket", lambda: socket_scanner(work))) if run_llm: for label, rules_path, category in ALL_LLM: tasks.append((f"Semgrep:{label}", lambda rp=rules_path, l=label, c=category: semgrep_pack(rp, work, l, c))) _agent_audit_cwd = repo_url if os.path.isdir(repo_url) else work tasks.append(("agent-audit", lambda c=_agent_audit_cwd: agent_audit(work, config_dir=c))) tasks.append(("readme-inject", lambda: readme_inject(work))) # new static LLM scanners tasks.append(("llm-guard", lambda: llm_guard(work))) tasks.append(("vigil", lambda: vigil(work))) tasks.append(("nemo-guardrails", lambda: nemo_guardrails(work))) tasks.append(("guardrails-ai", lambda: guardrails_ai(work))) # dynamic red-team scanners (only run if endpoint env vars are set) _garak_url = os.getenv("GARAK_TARGET_URL", "") _deepteam_url = os.getenv("DEEPTEAM_TARGET_URL", "") _promptfoo_url = os.getenv("PROMPTFOO_TARGET_URL", "") _azure_url = os.getenv("AZURE_REDTEAM_TARGET_URL", "") _pyrit_url = os.getenv("PYRIT_TARGET_URL", "") _augustus_url = os.getenv("AUGUSTUS_TARGET_URL", "") _fuzzyai_url = os.getenv("FUZZYAI_TARGET_URL", "") _giskard_url = os.getenv("GISKARD_TARGET_URL", "") tasks.append(("garak", lambda u=_garak_url: garak(u))) tasks.append(("deepteam", lambda u=_deepteam_url: deepteam(u))) tasks.append(("promptfoo", lambda u=_promptfoo_url: promptfoo(u))) tasks.append(("azure-redteam", lambda u=_azure_url: azure_redteam(u))) tasks.append(("pyrit", lambda u=_pyrit_url: pyrit(u))) tasks.append(("augustus", lambda u=_augustus_url: augustus(u))) tasks.append(("fuzzyai", lambda u=_fuzzyai_url: fuzzyai(u))) tasks.append(("giskard", lambda u=_giskard_url: giskard_scan(u))) # ── Filter to individual tools when caller specified them ───────────── if tools is not None: def _keep(label: str) -> bool: if label.startswith("Semgrep:"): return "semgrep" in tools return _TASK_TO_TOOL.get(label, label) in tools tasks = [(lbl, fn) for lbl, fn in tasks if _keep(lbl)] n = len(tasks) completed_count = 0 # ── Step 3: run in parallel ─────────────────────────────────────────── findings: list[dict] = [] with ThreadPoolExecutor(max_workers=max_workers) as pool: future_to_label = {pool.submit(fn): label for label, fn in tasks} for future in as_completed(future_to_label): label = future_to_label[future] completed_count += 1 # nosemgrep: string-concat-in-loop _prog(progress_cb, 0.05 + 0.90 * completed_count / n, f"{label} done…") try: result = future.result() # all runners return (list, str) fs, msg = result findings.extend(fs) log.append(msg) except Exception as exc: # noqa: BLE001 log.append(f"{label}: ERROR {exc}") # ── Step 4: dedup, sort, score, prefix paths ───────────────────────── findings = dedup_findings(findings) findings = sort_findings(findings) for f in findings: f["score"] = score_finding(f, repo_url, findings) for f in findings: f["file"] = f"{repo_url}#{f['file']}" log.insert(0, f"OK ({len(findings)} unique findings)") return findings, log finally: shutil.rmtree(work, ignore_errors=True)