Spaces:
Sleeping
Sleeping
| """ | |
| engine.py: Probe loading and scan orchestration. | |
| Responsibilities: | |
| * Discover and parse the YAML probe packs into :class:`Probe` objects | |
| (:func:`load_probes`). Packs ship inside the package but a caller can point | |
| at any directory to extend or replace the battery. | |
| * Run a battery against a :class:`Provider`, apply each probe's detector, and | |
| assemble a :class:`ScanResult` (:class:`Scanner`). | |
| The engine is intentionally thin: all the security knowledge lives in the YAML | |
| packs and the detectors, and all the rendering lives in the reporters. That | |
| separation is what makes the tool easy to audit and extend. | |
| """ | |
| from __future__ import annotations | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Optional | |
| import yaml | |
| from .detectors import get_detector | |
| from .models import ( | |
| Finding, | |
| Probe, | |
| ProbeOutcome, | |
| ScanResult, | |
| utcnow_iso, | |
| ) | |
| from .providers import Provider | |
| DEFAULT_PROBE_DIR = Path(__file__).parent / "probes" | |
| def load_probes( | |
| probe_dir: Optional[Path] = None, | |
| categories: Optional[Iterable[str]] = None, | |
| ) -> List[Probe]: | |
| """Load every probe from the YAML packs in ``probe_dir``. | |
| Args: | |
| probe_dir: Directory of ``*.yaml`` probe packs. Defaults to the packs | |
| bundled with the package. | |
| categories: Optional allow-list of category names to include. ``None`` | |
| loads everything. | |
| Returns: | |
| Probes sorted by ``(category, id)`` for stable, reproducible runs. | |
| Raises: | |
| FileNotFoundError: if the directory does not exist. | |
| ValueError: if a pack is malformed or a probe references an unknown | |
| detector (fail fast: a broken pack must not silently shrink the | |
| battery). | |
| """ | |
| probe_dir = Path(probe_dir) if probe_dir else DEFAULT_PROBE_DIR | |
| if not probe_dir.is_dir(): | |
| raise FileNotFoundError(f"Probe directory not found: {probe_dir}") | |
| wanted = set(categories) if categories else None | |
| probes: List[Probe] = [] | |
| seen_ids: Dict[str, str] = {} | |
| for path in sorted(probe_dir.glob("*.y*ml")): | |
| with open(path, "r", encoding="utf-8") as fh: | |
| data = yaml.safe_load(fh) or {} | |
| category = data.get("category") | |
| if not category: | |
| raise ValueError(f"Probe pack {path.name} is missing a 'category'.") | |
| if wanted is not None and category not in wanted: | |
| continue | |
| pack_owasp = data.get("owasp", "") | |
| for raw in data.get("probes", []): | |
| raw.setdefault("owasp", pack_owasp) | |
| probe = Probe.from_dict(raw, category=category) | |
| # Validate the detector reference eagerly. | |
| get_detector(probe.detector) | |
| if probe.id in seen_ids: | |
| raise ValueError( | |
| f"Duplicate probe id {probe.id!r} in {path.name} " | |
| f"(already defined in {seen_ids[probe.id]})." | |
| ) | |
| seen_ids[probe.id] = path.name | |
| probes.append(probe) | |
| if wanted: | |
| missing = wanted - {p.category for p in probes} | |
| if missing: | |
| raise ValueError( | |
| f"Requested categories not found: {', '.join(sorted(missing))}." | |
| ) | |
| return sorted(probes, key=lambda p: (p.category, p.id)) | |
| def available_categories(probe_dir: Optional[Path] = None) -> List[str]: | |
| """List the probe categories available in ``probe_dir``.""" | |
| return sorted({p.category for p in load_probes(probe_dir)}) | |
| class Scanner: | |
| """Runs a probe battery against a target provider.""" | |
| def __init__( | |
| self, | |
| provider: Provider, | |
| probes: Optional[List[Probe]] = None, | |
| *, | |
| probe_dir: Optional[Path] = None, | |
| categories: Optional[Iterable[str]] = None, | |
| scanner_version: str = "", | |
| ): | |
| self.provider = provider | |
| self.probes = ( | |
| probes if probes is not None else load_probes(probe_dir, categories) | |
| ) | |
| self.scanner_version = scanner_version | |
| def run_probe(self, probe: Probe) -> ProbeOutcome: | |
| """Execute one probe end-to-end: query the provider, judge, package.""" | |
| response = self.provider.complete(probe.prompt, context=probe.context) | |
| detector = get_detector(probe.detector) | |
| result = detector(probe, response) | |
| if not result.failed: | |
| return ProbeOutcome(probe=probe, response=response, failed=False) | |
| finding = Finding( | |
| probe_id=probe.id, | |
| category=probe.category, | |
| name=probe.name, | |
| severity=probe.severity, | |
| description=probe.description, | |
| evidence=result.evidence, | |
| remediation=probe.remediation, | |
| prompt=probe.prompt, | |
| response=response, | |
| owasp=probe.owasp, | |
| detector=probe.detector, | |
| ) | |
| return ProbeOutcome(probe=probe, response=response, failed=True, finding=finding) | |
| def run(self) -> ScanResult: | |
| """Run the full battery and return an aggregate result.""" | |
| started = utcnow_iso() | |
| outcomes = [self.run_probe(p) for p in self.probes] | |
| finished = utcnow_iso() | |
| return ScanResult( | |
| target=self.provider.name, | |
| started_at=started, | |
| finished_at=finished, | |
| outcomes=outcomes, | |
| scanner_version=self.scanner_version, | |
| ) | |