v8-cipher-agnostic / predict.py
simonmorley's picture
V8 cipher-agnostic byte-amplification detector — initial release (2026-05-13)
6e8f9d6 verified
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""V8 cipher-agnostic byte-amplification detector — scoreability-gated inference.
This is the **recommended consumption surface** for V8. It wraps the raw
``CalibratedClassifierCV`` estimator with two production-side gates:
1. **Scoreability gate**: refuses to score bundles where
``responses.parquet`` is missing or zero-rows. V8's training
distribution doesn't cover all-zero feature vectors and the
underlying estimator produces spurious high attack-scores on them
(typical for passive-workload bundles where the validator listens
without serving RPC). The gate returns an explicit "unscoreable"
verdict instead.
2. **Feature-coverage gate**: notes when the raw ``packets.pcap`` is
absent (as in the public ``nr-bundles-public`` bundles) and emits
a coverage flag with the score so callers can downweight or
ignore the prediction. V8's two cardinality features default to
0 when raw pcap is absent, which under-scores attacks relative
to the model's training expectation.
Callers who want raw model output without these gates should load
``model.joblib`` directly via ``joblib.load`` — see the "Bypassing the
gate" section of the model card.
Usage::
from predict import score_bundle, load_v8
payload = load_v8("/path/to/model.joblib") # or from hf_hub_download
record = score_bundle("/path/to/some/bundle_dir", payload)
if record["verdict"] == "unscoreable":
print(f"refused: {record['reason']}")
else:
print(f"V8 score: {record['v8_score']:.4f} ({record['verdict']})")
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import joblib
import numpy as np
import pyarrow.parquet as pq
# nr-bundle-spec — the reference parser. Pip-install via
# pip install git+https://github.com/NullRabbitLabs/nr-bundle-spec.git
from bundle_spec import BundleManifest
# Names of the seven V8 input features. The order of this list is significant:
# score_bundle builds the model's input row by iterating it, so it defines the
# column order of the matrix handed to ``predict_proba``.
# The ``pcap.*`` entries are never filled in by _extract_features in this file
# (they stay 0.0); the ``resp.*`` entries are derived from responses.parquet.
V8_FEATURES = [
    "pcap.unique_dst_ports",
    "pcap.unique_src_ports",
    "resp.amp_ratio_max",
    "resp.amp_ratio_mean",
    "resp.amp_ratio_median",
    "resp.req_bytes_max",
    "resp.resp_bytes_max",
]
def load_v8(model_path: str | Path) -> dict[str, Any]:
    """Deserialize the V8 payload stored in a joblib file.

    Args:
        model_path: Location of the ``model.joblib`` file on disk.

    Returns:
        Whatever object the file contains, unvalidated. ``score_bundle``
        expects it to be a dict exposing the estimator under ``"model"``.

    Note:
        ``joblib.load`` unpickles its input, which can run arbitrary code.
        Only pass files obtained from a source you trust.
    """
    payload = joblib.load(model_path)
    return payload
def _extract_features(bundle_dir: Path) -> tuple[dict[str, float], int, bool]:
    """Build the V8 feature mapping and diagnostics for one bundle directory.

    Every feature starts at 0.0. When ``responses.parquet`` exists and holds
    at least one row, the five ``resp.*`` features are overwritten from its
    ``request_size_bytes`` / ``response_size_bytes`` columns.

    Returns:
        A ``(features, n_responses_rows, has_packets_pcap)`` triple, where
        ``n_responses_rows`` is 0 if the parquet file is absent and
        ``has_packets_pcap`` tells whether ``packets.pcap`` is a file.
    """
    feats = dict.fromkeys(V8_FEATURES, 0.0)
    row_count = 0

    parquet_file = bundle_dir / "responses.parquet"
    if parquet_file.is_file():
        responses = pq.read_table(parquet_file)
        row_count = responses.num_rows
        if row_count > 0:
            request_sizes = responses.column("request_size_bytes").to_numpy()
            response_sizes = responses.column("response_size_bytes").to_numpy()
            largest_request = float(request_sizes.max())
            largest_response = float(response_sizes.max())
            # Rows with a non-positive request size get ratio 0.0; the
            # division warnings they would trigger are silenced.
            with np.errstate(divide="ignore", invalid="ignore"):
                amplification = np.where(
                    request_sizes > 0, response_sizes / request_sizes, 0.0
                )
            feats.update(
                {
                    "resp.req_bytes_max": largest_request,
                    "resp.resp_bytes_max": largest_response,
                    "resp.amp_ratio_max": float(amplification.max()),
                    "resp.amp_ratio_mean": float(amplification.mean()),
                    "resp.amp_ratio_median": float(np.median(amplification)),
                }
            )

    # This helper only records whether the raw capture exists; it never
    # parses it. Both pcap.unique_*_ports features therefore remain 0.0 and
    # it is up to the caller to surface that as a coverage warning (or to
    # compute the cardinality features itself).
    pcap_present = bundle_dir.joinpath("packets.pcap").is_file()
    return feats, row_count, pcap_present
def score_bundle(
bundle_dir: str | Path, payload: dict[str, Any]
) -> dict[str, Any]:
"""Score a bundle through V8, with the scoreability gate applied.
Returns a record with:
- ``verdict``: one of ``"attack"``, ``"benign"``, ``"unscoreable"``.
- ``v8_score``: P(attack) in [0, 1], or ``None`` if unscoreable.
- ``reason``: human-readable explanation when unscoreable.
- ``feature_coverage``: ``"full"`` or ``"resp_only"`` (raw pcap absent).
- ``corpus_id``, ``primitive_id``, ``ground_truth``: from manifest.
- ``features``: the 7 feature values as scored (zeros where absent).
- ``n_responses_rows``: number of rows in responses.parquet.
"""
bundle_dir = Path(bundle_dir)
manifest_path = bundle_dir / "manifest.json"
if not manifest_path.is_file():
return {
"verdict": "unscoreable",
"reason": f"manifest.json not found at {manifest_path}",
"v8_score": None,
}
manifest = BundleManifest.model_validate_json(manifest_path.read_text())
features, n_resp_rows, has_packets_pcap = _extract_features(bundle_dir)
# Scoreability gate
if n_resp_rows == 0:
return {
"verdict": "unscoreable",
"reason": (
"responses.parquet is missing or zero-rows; V8 cannot score "
"bundles with no observed RPC traffic. Use a non-amplification-"
"family detector for passive-workload bundles, or compose with "
"the multi-class softmax model NullRabbit/multiclass-folded."
),
"v8_score": None,
"corpus_id": manifest.corpus_id,
"primitive_id": manifest.primitive_id,
"n_responses_rows": 0,
"feature_coverage": "none",
}
# Score
X = np.array([[features[name] for name in V8_FEATURES]])
score = float(payload["model"].predict_proba(X)[0, 1])
verdict = "attack" if score >= 0.5 else "benign"
coverage = "full" if has_packets_pcap else "resp_only"
return {
"verdict": verdict,
"v8_score": score,
"reason": None,
"corpus_id": manifest.corpus_id,
"primitive_id": manifest.primitive_id,
"ground_truth": (
manifest.ground_truth_label.value
if hasattr(manifest.ground_truth_label, "value")
else str(manifest.ground_truth_label)
),
"features": features,
"n_responses_rows": n_resp_rows,
"feature_coverage": coverage,
}