#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""V8 cipher-agnostic byte-amplification detector — scoreability-gated inference.

This is the **recommended consumption surface** for V8. It wraps the raw
``CalibratedClassifierCV`` estimator with two production-side gates:

1. **Scoreability gate**: refuses to score bundles where
   ``responses.parquet`` is missing or zero-rows. V8's training
   distribution doesn't cover all-zero feature vectors and the
   underlying estimator produces spurious high attack-scores on them
   (typical for passive-workload bundles where the validator listens
   without serving RPC). The gate returns an explicit "unscoreable"
   verdict instead.

2. **Feature-coverage gate**: notes when the raw ``packets.pcap`` is
   absent (as in the public ``nr-bundles-public`` bundles) and emits a
   coverage flag with the score so callers can downweight or ignore
   the prediction. V8's two cardinality features default to 0 when raw
   pcap is absent, which under-scores attacks relative to the model's
   training expectation.

   Note that this module does not parse pcaps itself: the two
   ``pcap.unique_*_ports`` features are 0.0 in the scored vector even
   when ``packets.pcap`` is present. The coverage flag reports only
   whether the file exists.

Callers who want raw model output without these gates should load
``model.joblib`` directly via ``joblib.load`` — see the "Bypassing the
gate" section of the model card.

Usage::

    from predict import score_bundle, load_v8

    payload = load_v8("/path/to/model.joblib")  # or from hf_hub_download
    record = score_bundle("/path/to/some/bundle_dir", payload)
    if record["verdict"] == "unscoreable":
        print(f"refused: {record['reason']}")
    else:
        print(f"V8 score: {record['v8_score']:.4f} ({record['verdict']})")
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import joblib
import numpy as np
import pyarrow.parquet as pq

# nr-bundle-spec — the reference parser. Pip-install via
#   pip install git+https://github.com/NullRabbitLabs/nr-bundle-spec.git
from bundle_spec import BundleManifest

# Feature names in the exact order they are laid out in the row that
# ``score_bundle`` hands to ``predict_proba``.
V8_FEATURES = [
    "pcap.unique_dst_ports",
    "pcap.unique_src_ports",
    "resp.amp_ratio_max",
    "resp.amp_ratio_mean",
    "resp.amp_ratio_median",
    "resp.req_bytes_max",
    "resp.resp_bytes_max",
]


def load_v8(model_path: str | Path) -> dict[str, Any]:
    """Load the V8 lineage-dict payload from a joblib file.

    ``score_bundle`` reads the estimator from the ``"model"`` key of the
    returned payload.

    SECURITY: ``joblib.load`` unpickles the file, which can execute
    arbitrary code. Only load model files from a source you trust.
    """
    return joblib.load(model_path)


def _extract_features(bundle_dir: Path) -> tuple[dict[str, float], int, bool]:
    """Extract V8 features + diagnostic flags from a bundle.

    Returns (features, n_responses_rows, has_packets_pcap).

    ``features`` always contains every name in ``V8_FEATURES``; entries
    that could not be computed stay at 0.0. ``n_responses_rows`` is 0
    when ``responses.parquet`` is absent.
    """
    features = dict.fromkeys(V8_FEATURES, 0.0)

    responses_path = bundle_dir / "responses.parquet"
    n_resp_rows = 0
    if responses_path.is_file():
        table = pq.read_table(responses_path)
        n_resp_rows = table.num_rows
        if n_resp_rows > 0:
            req = table.column("request_size_bytes").to_numpy()
            resp = table.column("response_size_bytes").to_numpy()
            features["resp.req_bytes_max"] = float(req.max())
            features["resp.resp_bytes_max"] = float(resp.max())
            # Rows with a non-positive request size get ratio 0.0. The
            # division is still evaluated for every row before np.where
            # selects, hence the suppressed divide/invalid warnings.
            with np.errstate(divide="ignore", invalid="ignore"):
                ratios = np.where(req > 0, resp / req, 0.0)
            features["resp.amp_ratio_max"] = float(ratios.max())
            features["resp.amp_ratio_mean"] = float(ratios.mean())
            features["resp.amp_ratio_median"] = float(np.median(ratios))

    has_packets_pcap = (bundle_dir / "packets.pcap").is_file()
    # If raw pcap is present, callers can implement the cardinality
    # feature extraction; this helper does not parse pcaps. The two
    # pcap.unique_*_ports features stay at 0.0 — emitting a coverage
    # warning to the caller is the gate's job.

    return features, n_resp_rows, has_packets_pcap


def _ground_truth(manifest: Any) -> Any:
    """Return the manifest's ground-truth label, unwrapping ``.value`` if present."""
    label = manifest.ground_truth_label
    return label.value if hasattr(label, "value") else str(label)


def _unscoreable_record(
    reason: str,
    *,
    manifest: Any = None,
    features: dict[str, float] | None = None,
    n_resp_rows: int | None = None,
) -> dict[str, Any]:
    """Build an "unscoreable" record with the same keys as a scored record."""
    has_manifest = manifest is not None
    return {
        "verdict": "unscoreable",
        "v8_score": None,
        "reason": reason,
        "corpus_id": manifest.corpus_id if has_manifest else None,
        "primitive_id": manifest.primitive_id if has_manifest else None,
        "ground_truth": _ground_truth(manifest) if has_manifest else None,
        "features": features,
        "n_responses_rows": n_resp_rows,
        "feature_coverage": "none",
    }


def score_bundle(
    bundle_dir: str | Path,
    payload: dict[str, Any],
    *,
    threshold: float = 0.5,
) -> dict[str, Any]:
    """Score a bundle through V8, with the scoreability gate applied.

    Args:
        bundle_dir: Directory holding ``manifest.json`` and, optionally,
            ``responses.parquet`` and ``packets.pcap``.
        payload: Payload as returned by ``load_v8``; the estimator is
            read from ``payload["model"]``.
        threshold: Scores greater than or equal to this value are
            labelled ``"attack"``. Defaults to 0.5.

    Returns a record that always carries the same keys:

    - ``verdict``: one of ``"attack"``, ``"benign"``, ``"unscoreable"``.
    - ``v8_score``: P(attack) in [0, 1], or ``None`` if unscoreable.
    - ``reason``: human-readable explanation when unscoreable, else ``None``.
    - ``feature_coverage``: ``"full"`` (``packets.pcap`` is present),
      ``"resp_only"`` (raw pcap absent), or ``"none"`` when unscoreable.
      ``"full"`` reports file presence only: the two
      ``pcap.unique_*_ports`` features are still 0.0 in the scored
      vector because pcaps are not parsed here.
    - ``corpus_id``, ``primitive_id``, ``ground_truth``: from manifest
      (``None`` when ``manifest.json`` is missing).
    - ``features``: the 7 feature values as scored (zeros where absent);
      ``None`` when ``manifest.json`` is missing.
    - ``n_responses_rows``: number of rows in responses.parquet;
      ``None`` when ``manifest.json`` is missing.

    Errors from manifest validation, parquet reading or the estimator
    are not caught here and propagate to the caller.
    """
    bundle_dir = Path(bundle_dir)

    manifest_path = bundle_dir / "manifest.json"
    if not manifest_path.is_file():
        return _unscoreable_record(f"manifest.json not found at {manifest_path}")

    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's locale encoding.
    manifest = BundleManifest.model_validate_json(
        manifest_path.read_text(encoding="utf-8")
    )

    features, n_resp_rows, has_packets_pcap = _extract_features(bundle_dir)

    # Scoreability gate
    if n_resp_rows == 0:
        return _unscoreable_record(
            "responses.parquet is missing or zero-rows; V8 cannot score "
            "bundles with no observed RPC traffic. Use a non-amplification-"
            "family detector for passive-workload bundles, or compose with "
            "the multi-class softmax model NullRabbit/multiclass-folded.",
            manifest=manifest,
            features=features,
            n_resp_rows=0,
        )

    # Score. Column 1 of predict_proba is taken as P(attack).
    # NOTE(review): assumes the estimator's positive class sits at
    # index 1 — confirm against the model card.
    X = np.array([[features[name] for name in V8_FEATURES]])
    score = float(payload["model"].predict_proba(X)[0, 1])

    return {
        "verdict": "attack" if score >= threshold else "benign",
        "v8_score": score,
        "reason": None,
        "corpus_id": manifest.corpus_id,
        "primitive_id": manifest.primitive_id,
        "ground_truth": _ground_truth(manifest),
        "features": features,
        "n_responses_rows": n_resp_rows,
        # NOTE(review): "full" is reported whenever packets.pcap exists,
        # even though the pcap-derived features above are still 0.0.
        "feature_coverage": "full" if has_packets_pcap else "resp_only",
    }