v8-cipher-agnostic / inference_example.py
simonmorley's picture
V8 cipher-agnostic byte-amplification detector — initial release (2026-05-13)
6e8f9d6 verified
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""V8 cipher-agnostic byte-amplification detector — end-to-end inference example.
Three-artefact collaboration. This script:
1. Downloads a bundle from the public NullRabbit/nr-bundles-public dataset
on Hugging Face.
2. Downloads the V8 model and the scoreability-gated inference helper
(``predict.py``) from this repository.
3. Loads the bundle manifest via the bundle-spec reference parser
(NullRabbitLabs/nr-bundle-spec, MIT).
4. Calls ``predict.score_bundle()`` to apply the scoreability gate and
produce a verdict.
A worked demonstration of the **spec → corpus → model** path: bundles on
disk are conformant with an open spec; the spec's reference parser loads
them; the scoreability-gated inference helper produces verdicts.
Dependencies::
pip install huggingface_hub pyarrow scikit-learn joblib numpy
pip install git+https://github.com/NullRabbitLabs/nr-bundle-spec.git
Usage::
python inference_example.py
Three bundles are scored: a known-attack (sui_F10_multi_get_objects_amp),
a known-benign with traffic (sui_BENIGN_reproducer_pipeline), and a
known-benign without traffic (sui_BENIGN_passive_fullnode) — the third
demonstrates the scoreability gate refusing to predict on empty bundles.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
from huggingface_hub import hf_hub_download, snapshot_download
# ─── Constants ──────────────────────────────────────────────────────
# Hugging Face repo that model.joblib and predict.py are downloaded from.
V8_MODEL_REPO = "NullRabbit/v8-cipher-agnostic"
# Hugging Face dataset repo that the sample bundles are downloaded from.
DATASET_REPO = "NullRabbit/nr-bundles-public"

# One (corpus_id, expected primitive_id, ground-truth label) tuple per demo
# bundle, in scoring order: attack, scoreable benign, unscoreable benign.
SAMPLES = [
    ("crp_19d438471fec4229", "sui_F10_multi_get_objects_amp", "attack"),
    ("crp_8b85da89c4e34d4c", "sui_BENIGN_reproducer_pipeline", "benign"),
    ("crp_0598afb4d5e44fb9", "sui_BENIGN_passive_fullnode", "benign (passive)"),
]
def _load_module(name: str, path: str) -> "object":
    """Import the Python source file at *path* as a module named *name*.

    The module is registered in ``sys.modules`` under *name* before its body
    is executed, and is returned once execution succeeds.

    Args:
        name: Name to register the module under in ``sys.modules``.
        path: Filesystem location of the source file to execute.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be derived from *path*
            (for example, a file extension no source loader recognises).
        Exception: Anything raised while running the module body propagates
            unchanged; the failed module is removed from ``sys.modules``.
    """
    spec = importlib.util.spec_from_file_location(name, path)
    # spec_from_file_location() returns None when it cannot choose a loader.
    # Fail with a clear ImportError rather than an AttributeError later on.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"cannot build an import spec for module {name!r} from {path!r}",
            name=name,
            path=str(path),
        )

    module = importlib.util.module_from_spec(spec)
    # Registered before execution, following the importlib recipe for
    # importing a source file directly.
    sys.modules[name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind for later imports.
        if sys.modules.get(name) is module:
            del sys.modules[name]
        raise
    return module
def _print_record(corpus_id, primitive_id_expected, label, record) -> None:
    """Print the verdict for one bundle, with score details when available.

    Args:
        corpus_id: Bundle identifier shown in the section header.
        primitive_id_expected: Expected primitive id, shown beside the id.
        label: Ground-truth label printed for comparison with the verdict.
        record: Result returned by ``predict.score_bundle()`` for the bundle.
    """
    print(f"--- {corpus_id} ({primitive_id_expected}) ---")
    print(f" ground_truth label: {label}")
    print(f" verdict: {record['verdict']}")
    if record["verdict"] == "unscoreable":
        # No score is available for this verdict; report the reason instead.
        print(f" reason: {record['reason']}")
        print(f" n_responses_rows: {record.get('n_responses_rows', 0)}")
    else:
        print(f" V8 score: {record['v8_score']:.4f}")
        print(f" feature_coverage: {record['feature_coverage']}")
        print(f" n_responses_rows: {record['n_responses_rows']}")
        print(" features:")
        for feature_name, feature_value in record["features"].items():
            print(f" {feature_name:<28s} {feature_value:>12.4f}")
    print()


def _print_deployment_notes() -> None:
    """Print the closing notes on consuming V8 in deployment."""
    print("=" * 72)
    print("Notes on V8 deployment")
    print("=" * 72)
    print("""
- predict.score_bundle() is the recommended consumption surface. The
scoreability gate refuses to predict on bundles where responses.parquet
is missing or zero-rows. Callers who want raw model output without the
gate should load model.joblib directly via joblib.load.
- feature_coverage=resp_only means raw packets.pcap is absent (as in the
public nr-bundles-public bundles). V8's two cardinality features default
to 0, which under-scores attacks relative to the model's training
expectation. For full-coverage inference, produce your own bundles per
nr-bundle-spec with raw pcap retained.
- V8 is a binary detector for the byte-amplification family. Attacks from
other vulnerability families (reconnaissance, service_misconfig,
auth_bypass, rate_limiter_bypass with simulateTransaction shape) will
score "benign" — this is correct behaviour, not a failure. Use the
multi-class softmax model NullRabbit/multiclass-folded for unified
attack-family detection.
""".strip())
    print("=" * 72)


def main() -> int:
    """Download V8 and the sample bundles, score each bundle, print results.

    Fetches ``model.joblib`` and ``predict.py`` from ``V8_MODEL_REPO``, loads
    the helper as a module, downloads the bundles listed in ``SAMPLES`` from
    ``DATASET_REPO``, scores each one with ``predict.score_bundle()`` and
    prints a per-bundle report followed by deployment notes.

    Returns:
        ``0`` once every sample has been scored and reported.
    """
    print("=== V8 cipher-agnostic byte-amplification detector ===")
    print(f" model repo: {V8_MODEL_REPO}")
    print(f" dataset repo: {DATASET_REPO}")
    print()

    # Pull the V8 model + predict.py (the scoreability-gated helper).
    model_path = hf_hub_download(repo_id=V8_MODEL_REPO, filename="model.joblib")
    predict_path = hf_hub_download(repo_id=V8_MODEL_REPO, filename="predict.py")

    # Load the helper as a module, then load V8 through the helper.
    predict = _load_module("v8_predict", predict_path)
    payload = predict.load_v8(model_path)
    print(f"V8 loaded: {type(payload['model']).__name__}, "
          f"{len(payload['feature_names'])} features, "
          f"manifest={payload['manifest_name']!r}")
    print()

    # Pull only the sample bundle directories, not the whole dataset.
    dataset_root = Path(snapshot_download(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        allow_patterns=[f"{cid}/*" for cid, _, _ in SAMPLES],
    ))

    # Score each bundle via the gated helper and report it.
    for corpus_id, primitive_id_expected, label in SAMPLES:
        bundle_dir = dataset_root / corpus_id
        record = predict.score_bundle(bundle_dir, payload)
        _print_record(corpus_id, primitive_id_expected, label, record)

    _print_deployment_notes()
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())