#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""V8 cipher-agnostic byte-amplification detector — scoreability-gated inference.
This is the **recommended consumption surface** for V8. It wraps the raw
``CalibratedClassifierCV`` estimator with two production-side gates:
1. **Scoreability gate**: refuses to score bundles where
``responses.parquet`` is missing or zero-rows. V8's training
distribution doesn't cover all-zero feature vectors and the
underlying estimator produces spurious high attack-scores on them
(typical for passive-workload bundles where the validator listens
without serving RPC). The gate returns an explicit "unscoreable"
verdict instead.
2. **Feature-coverage gate**: notes when the raw ``packets.pcap`` is
absent (as in the public ``nr-bundles-public`` bundles) and emits
a coverage flag with the score so callers can downweight or
ignore the prediction. V8's two cardinality features default to
0 when raw pcap is absent, which under-scores attacks relative
to the model's training expectation.
Callers who want raw model output without these gates should load
``model.joblib`` directly via ``joblib.load`` — see the "Bypassing the
gate" section of the model card.
Usage::

    from predict import score_bundle, load_v8

    payload = load_v8("/path/to/model.joblib")  # or from hf_hub_download
    record = score_bundle("/path/to/some/bundle_dir", payload)
    if record["verdict"] == "unscoreable":
        print(f"refused: {record['reason']}")
    else:
        print(f"V8 score: {record['v8_score']:.4f} ({record['verdict']})")
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import joblib
import numpy as np
import pyarrow.parquet as pq
# nr-bundle-spec — the reference parser. Pip-install via
# pip install git+https://github.com/NullRabbitLabs/nr-bundle-spec.git
from bundle_spec import BundleManifest
# Names of the seven V8 model inputs. Order is significant: score_bundle
# builds the feature row handed to the estimator by iterating this list, so
# it presumably has to match the column order the model was trained with
# (verify against the model card before reordering).
V8_FEATURES: list[str] = [
    # Cardinality features from the raw packet capture. This module does not
    # parse pcaps, so _extract_features leaves both at 0.0.
    "pcap.unique_dst_ports",
    "pcap.unique_src_ports",
    # Derived from the request_size_bytes / response_size_bytes columns of
    # responses.parquet.
    "resp.amp_ratio_max",
    "resp.amp_ratio_mean",
    "resp.amp_ratio_median",
    "resp.req_bytes_max",
    "resp.resp_bytes_max",
]
def load_v8(model_path: str | Path) -> dict[str, Any]:
    """Deserialize the V8 payload stored in a joblib file.

    ``model_path`` is handed to ``joblib.load`` unchanged and the resulting
    object is returned as-is; no validation of its contents happens here.
    ``score_bundle`` later reads the fitted estimator from the ``"model"``
    key of this payload.

    SECURITY: joblib files are pickle-based, so loading one can execute
    arbitrary code. Only call this on model files from a source you trust.
    """
    payload = joblib.load(model_path)
    return payload
def _extract_features(bundle_dir: Path) -> tuple[dict[str, float], int, bool]:
    """Compute the response-derived V8 inputs for one bundle directory.

    Every name in ``V8_FEATURES`` starts at 0.0. When ``responses.parquet``
    exists and holds at least one row, the five ``resp.*`` entries are filled
    from its ``request_size_bytes`` and ``response_size_bytes`` columns. The
    two ``pcap.*`` entries are never modified here, because this helper does
    not parse pcaps; it only reports whether ``packets.pcap`` is present so
    the caller can decide how to flag coverage.

    Returns a 3-tuple ``(features, n_responses_rows, has_packets_pcap)``.
    """
    feature_values = dict.fromkeys(V8_FEATURES, 0.0)
    row_count = 0

    parquet_file = bundle_dir / "responses.parquet"
    if parquet_file.is_file():
        responses = pq.read_table(parquet_file)
        row_count = responses.num_rows
        if row_count > 0:
            request_sizes = responses.column("request_size_bytes").to_numpy()
            response_sizes = responses.column("response_size_bytes").to_numpy()
            feature_values["resp.req_bytes_max"] = float(request_sizes.max())
            feature_values["resp.resp_bytes_max"] = float(response_sizes.max())

            # Rows whose request size is not positive get a ratio of 0.0
            # instead of inf/nan; numpy's divide warnings are silenced for
            # exactly that division. Those 0.0 rows still take part in the
            # max/mean/median below.
            with np.errstate(divide="ignore", invalid="ignore"):
                amplification = np.where(
                    request_sizes > 0, response_sizes / request_sizes, 0.0
                )
            feature_values.update(
                {
                    "resp.amp_ratio_max": float(amplification.max()),
                    "resp.amp_ratio_mean": float(amplification.mean()),
                    "resp.amp_ratio_median": float(np.median(amplification)),
                }
            )

    pcap_present = (bundle_dir / "packets.pcap").is_file()
    return feature_values, row_count, pcap_present
# The two V8 inputs that can only be derived from the raw packet capture.
_PCAP_FEATURE_NAMES = ("pcap.unique_dst_ports", "pcap.unique_src_ports")


def _ground_truth_of(manifest: Any) -> Any:
    """Return the manifest's ground-truth label as a plain value."""
    label = manifest.ground_truth_label
    # Enum-like labels expose ``.value``; anything else is stringified.
    return label.value if hasattr(label, "value") else str(label)


def _unscoreable_record(
    reason: str,
    *,
    corpus_id: Any = None,
    primitive_id: Any = None,
    ground_truth: Any = None,
    n_responses_rows: int | None = None,
) -> dict[str, Any]:
    """Build an "unscoreable" record carrying the same keys as a scored one."""
    return {
        "verdict": "unscoreable",
        "v8_score": None,
        "reason": reason,
        "corpus_id": corpus_id,
        "primitive_id": primitive_id,
        "ground_truth": ground_truth,
        "features": None,
        "n_responses_rows": n_responses_rows,
        "feature_coverage": "none",
    }


def score_bundle(
    bundle_dir: str | Path,
    payload: dict[str, Any],
    *,
    threshold: float = 0.5,
    pcap_features: dict[str, float] | None = None,
) -> dict[str, Any]:
    """Score a bundle through V8, with the scoreability gate applied.

    Args:
        bundle_dir: Directory holding ``manifest.json`` and, optionally,
            ``responses.parquet`` / ``packets.pcap``.
        payload: Loaded model payload; the fitted estimator is read from
            ``payload["model"]``.
        threshold: Scores at or above this value yield the ``"attack"``
            verdict. Defaults to 0.5.
        pcap_features: Optional caller-computed values for the two
            ``pcap.unique_*_ports`` features. This module does not parse
            pcaps, so without this argument both are scored as 0.0.

    Returns a record that carries the same keys on every path:

    - ``verdict``: one of ``"attack"``, ``"benign"``, ``"unscoreable"``.
    - ``v8_score``: P(attack) in [0, 1], or ``None`` if unscoreable.
    - ``reason``: human-readable explanation when unscoreable, else ``None``.
    - ``feature_coverage``: ``"full"`` only when both cardinality features
      were supplied via ``pcap_features``; ``"resp_only"`` when the bundle
      was scored with them left at 0.0 (even if ``packets.pcap`` exists,
      since it is not parsed here); ``"none"`` when unscoreable.
    - ``corpus_id``, ``primitive_id``, ``ground_truth``: from the manifest
      (``None`` when the manifest is missing).
    - ``features``: the 7 feature values as scored, or ``None`` when
      nothing was scored.
    - ``n_responses_rows``: number of rows in responses.parquet (``None``
      when the manifest is missing and the file was never inspected).

    Raises:
        ValueError: if ``pcap_features`` contains a name other than the two
            ``pcap.*`` V8 features.

    Errors raised while parsing an existing ``manifest.json`` or reading
    ``responses.parquet`` are not caught here and propagate to the caller.
    """
    if pcap_features is not None:
        unexpected = sorted(set(pcap_features) - set(_PCAP_FEATURE_NAMES))
        if unexpected:
            raise ValueError(f"unexpected pcap feature name(s): {unexpected}")

    bundle_dir = Path(bundle_dir)
    manifest_path = bundle_dir / "manifest.json"
    if not manifest_path.is_file():
        return _unscoreable_record(f"manifest.json not found at {manifest_path}")

    # Read explicitly as UTF-8 so parsing does not depend on the platform's
    # locale default encoding.
    manifest = BundleManifest.model_validate_json(
        manifest_path.read_text(encoding="utf-8")
    )
    features, n_resp_rows, _has_packets_pcap = _extract_features(bundle_dir)

    # Scoreability gate: refuse rather than score an all-zero feature row.
    if n_resp_rows == 0:
        return _unscoreable_record(
            (
                "responses.parquet is missing or zero-rows; V8 cannot score "
                "bundles with no observed RPC traffic. Use a non-amplification-"
                "family detector for passive-workload bundles, or compose with "
                "the multi-class softmax model NullRabbit/multiclass-folded."
            ),
            corpus_id=manifest.corpus_id,
            primitive_id=manifest.primitive_id,
            ground_truth=_ground_truth_of(manifest),
            n_responses_rows=0,
        )

    # Coverage reflects what was actually scored: the mere presence of
    # packets.pcap does not populate the cardinality features, so "full" is
    # reported only when the caller provided both of them.
    coverage = "resp_only"
    if pcap_features is not None:
        features.update({name: float(val) for name, val in pcap_features.items()})
        if all(name in pcap_features for name in _PCAP_FEATURE_NAMES):
            coverage = "full"

    # Single-row feature matrix in V8_FEATURES order. Column 1 of
    # predict_proba is taken as P(attack) -- assumes the estimator's positive
    # class sits at index 1; confirm against the model card.
    X = np.array([[features[name] for name in V8_FEATURES]])
    score = float(payload["model"].predict_proba(X)[0, 1])

    return {
        "verdict": "attack" if score >= threshold else "benign",
        "v8_score": score,
        "reason": None,
        "corpus_id": manifest.corpus_id,
        "primitive_id": manifest.primitive_id,
        "ground_truth": _ground_truth_of(manifest),
        "features": features,
        "n_responses_rows": n_resp_rows,
        "feature_coverage": coverage,
    }
|