| |
| |
| """V8 cipher-agnostic byte-amplification detector — scoreability-gated inference. |
| |
| This is the **recommended consumption surface** for V8. It wraps the raw |
| ``CalibratedClassifierCV`` estimator with two production-side gates: |
| |
| 1. **Scoreability gate**: refuses to score bundles where |
| ``responses.parquet`` is missing or zero-rows. V8's training |
| distribution doesn't cover all-zero feature vectors and the |
| underlying estimator produces spurious high attack-scores on them |
| (typical for passive-workload bundles where the validator listens |
| without serving RPC). The gate returns an explicit "unscoreable" |
| verdict instead. |
| |
| 2. **Feature-coverage gate**: notes when the raw ``packets.pcap`` is |
| absent (as in the public ``nr-bundles-public`` bundles) and emits |
| a coverage flag with the score so callers can downweight or |
| ignore the prediction. V8's two cardinality features default to |
| 0 when raw pcap is absent, which under-scores attacks relative |
| to the model's training expectation. |
| |
| Callers who want raw model output without these gates should load |
| ``model.joblib`` directly via ``joblib.load`` — see the "Bypassing the |
| gate" section of the model card. |
| |
| Usage:: |
| |
| from predict import score_bundle, load_v8 |
| |
| payload = load_v8("/path/to/model.joblib") # or from hf_hub_download |
| record = score_bundle("/path/to/some/bundle_dir", payload) |
| if record["verdict"] == "unscoreable": |
| print(f"refused: {record['reason']}") |
| else: |
| print(f"V8 score: {record['v8_score']:.4f} ({record['verdict']})") |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Any |
|
|
| import joblib |
| import numpy as np |
| import pyarrow.parquet as pq |
|
|
| |
| |
| from bundle_spec import BundleManifest |
|
|
|
|
# Feature column names in the exact order the V8 estimator was trained on.
# score_bundle builds its input matrix by iterating this list, so the order
# is part of the model contract — do not re-sort.
# The two pcap.* cardinality features default to 0.0 when no raw capture is
# available (see the module docstring's feature-coverage gate).
V8_FEATURES = [
    "pcap.unique_dst_ports",
    "pcap.unique_src_ports",
    "resp.amp_ratio_max",
    "resp.amp_ratio_mean",
    "resp.amp_ratio_median",
    "resp.req_bytes_max",
    "resp.resp_bytes_max",
]
|
|
|
|
def load_v8(model_path: str | Path) -> dict[str, Any]:
    """Deserialize and return the V8 payload dict stored at *model_path*.

    The payload is the lineage dict written at training time; the fitted
    estimator is expected under its ``"model"`` key (see ``score_bundle``).
    """
    payload = joblib.load(model_path)
    return payload
|
|
|
|
def _extract_features(bundle_dir: Path) -> tuple[dict[str, float], int, bool]:
    """Compute the V8 feature values plus diagnostic flags for one bundle.

    Any feature that cannot be derived (missing file, empty table) keeps its
    0.0 default.

    Returns (features, n_responses_rows, has_packets_pcap).
    """
    feats = dict.fromkeys(V8_FEATURES, 0.0)
    row_count = 0

    resp_file = bundle_dir / "responses.parquet"
    if resp_file.is_file():
        tbl = pq.read_table(resp_file)
        row_count = tbl.num_rows
        if row_count:
            req_sizes = tbl.column("request_size_bytes").to_numpy()
            resp_sizes = tbl.column("response_size_bytes").to_numpy()
            feats["resp.req_bytes_max"] = float(req_sizes.max())
            feats["resp.resp_bytes_max"] = float(resp_sizes.max())
            # Rows with a zero-byte request get amplification ratio 0.0;
            # errstate silences the div-by-zero/invalid warnings that the
            # eagerly evaluated resp/req would otherwise emit.
            with np.errstate(divide="ignore", invalid="ignore"):
                amp = np.where(req_sizes > 0, resp_sizes / req_sizes, 0.0)
            feats["resp.amp_ratio_max"] = float(amp.max())
            feats["resp.amp_ratio_mean"] = float(amp.mean())
            feats["resp.amp_ratio_median"] = float(np.median(amp))

    # NOTE(review): only the pcap file's *presence* is recorded here — the
    # two pcap.* cardinality features are never populated by this function
    # and stay 0.0. Presumably intentional (callers downweight via the
    # coverage flag), but confirm against the training pipeline.
    pcap_present = (bundle_dir / "packets.pcap").is_file()

    return feats, row_count, pcap_present
|
|
|
|
| def score_bundle( |
| bundle_dir: str | Path, payload: dict[str, Any] |
| ) -> dict[str, Any]: |
| """Score a bundle through V8, with the scoreability gate applied. |
| |
| Returns a record with: |
| - ``verdict``: one of ``"attack"``, ``"benign"``, ``"unscoreable"``. |
| - ``v8_score``: P(attack) in [0, 1], or ``None`` if unscoreable. |
| - ``reason``: human-readable explanation when unscoreable. |
| - ``feature_coverage``: ``"full"`` or ``"resp_only"`` (raw pcap absent). |
| - ``corpus_id``, ``primitive_id``, ``ground_truth``: from manifest. |
| - ``features``: the 7 feature values as scored (zeros where absent). |
| - ``n_responses_rows``: number of rows in responses.parquet. |
| """ |
| bundle_dir = Path(bundle_dir) |
|
|
| manifest_path = bundle_dir / "manifest.json" |
| if not manifest_path.is_file(): |
| return { |
| "verdict": "unscoreable", |
| "reason": f"manifest.json not found at {manifest_path}", |
| "v8_score": None, |
| } |
| manifest = BundleManifest.model_validate_json(manifest_path.read_text()) |
|
|
| features, n_resp_rows, has_packets_pcap = _extract_features(bundle_dir) |
|
|
| |
| if n_resp_rows == 0: |
| return { |
| "verdict": "unscoreable", |
| "reason": ( |
| "responses.parquet is missing or zero-rows; V8 cannot score " |
| "bundles with no observed RPC traffic. Use a non-amplification-" |
| "family detector for passive-workload bundles, or compose with " |
| "the multi-class softmax model NullRabbit/multiclass-folded." |
| ), |
| "v8_score": None, |
| "corpus_id": manifest.corpus_id, |
| "primitive_id": manifest.primitive_id, |
| "n_responses_rows": 0, |
| "feature_coverage": "none", |
| } |
|
|
| |
| X = np.array([[features[name] for name in V8_FEATURES]]) |
| score = float(payload["model"].predict_proba(X)[0, 1]) |
| verdict = "attack" if score >= 0.5 else "benign" |
| coverage = "full" if has_packets_pcap else "resp_only" |
|
|
| return { |
| "verdict": verdict, |
| "v8_score": score, |
| "reason": None, |
| "corpus_id": manifest.corpus_id, |
| "primitive_id": manifest.primitive_id, |
| "ground_truth": ( |
| manifest.ground_truth_label.value |
| if hasattr(manifest.ground_truth_label, "value") |
| else str(manifest.ground_truth_label) |
| ), |
| "features": features, |
| "n_responses_rows": n_resp_rows, |
| "feature_coverage": coverage, |
| } |
|
|