#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""V8 cipher-agnostic byte-amplification detector β end-to-end inference example.
Three-artefact collaboration. This script:
1. Downloads a bundle from the public NullRabbit/nr-bundles-public dataset
on Hugging Face.
2. Downloads the V8 model and the scoreability-gated inference helper
(``predict.py``) from this repository.
3. Loads the bundle manifest via the bundle-spec reference parser
(NullRabbitLabs/nr-bundle-spec, MIT).
4. Calls ``predict.score_bundle()`` to apply the scoreability gate and
produce a verdict.
A worked demonstration of the **spec → corpus → model** path: bundles on
disk are conformant with an open spec; the spec's reference parser loads
them; the scoreability-gated inference helper produces verdicts.
Dependencies::
pip install huggingface_hub pyarrow scikit-learn joblib numpy
pip install git+https://github.com/NullRabbitLabs/nr-bundle-spec.git
Usage::
python inference_example.py
Three bundles are scored: a known-attack (sui_F10_multi_get_objects_amp),
a known-benign with traffic (sui_BENIGN_reproducer_pipeline), and a
known-benign without traffic (sui_BENIGN_passive_fullnode) — the third
demonstrates the scoreability gate refusing to predict on empty bundles.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
from huggingface_hub import hf_hub_download, snapshot_download
# ─── Constants ──────────────────────────────────────────────────────
# Hugging Face repositories used by main(): the model repo supplies
# model.joblib and predict.py; the dataset repo supplies the bundles.
V8_MODEL_REPO: str = "NullRabbit/v8-cipher-agnostic"
DATASET_REPO: str = "NullRabbit/nr-bundles-public"

# Bundles scored by main(), one (corpus_id, expected primitive_id, label)
# tuple each: an attack, a scoreable benign, and an unscoreable benign.
SAMPLES: list[tuple[str, str, str]] = [
    ("crp_19d438471fec4229", "sui_F10_multi_get_objects_amp", "attack"),
    ("crp_8b85da89c4e34d4c", "sui_BENIGN_reproducer_pipeline", "benign"),
    ("crp_0598afb4d5e44fb9", "sui_BENIGN_passive_fullnode", "benign (passive)"),
]
def _load_module(name: str, path: str) -> "object":
    """Import the Python source file at ``path`` as a module named ``name``.

    The module is registered in ``sys.modules`` under ``name`` before its
    code is executed. If execution fails, the registration is removed again
    so that a half-initialised module is never left importable by name.

    Args:
        name: Key under which the module is registered in ``sys.modules``.
        path: Filesystem location of the source file to execute.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec (or no loader) can be built for
            ``path``, e.g. because its suffix is not a recognised module
            suffix.
        Exception: Whatever the module's own top-level code raises is
            propagated unchanged.
    """
    spec = importlib.util.spec_from_file_location(name, path)
    # spec_from_file_location returns None when no loader matches the file,
    # and a spec may legitimately carry no loader; fail with a clear error
    # instead of an AttributeError on None further down.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"cannot load module {name!r} from {str(path)!r}",
            name=name,
            path=str(path),
        )

    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Mirror the import system: a module whose execution failed must not
        # stay registered.
        sys.modules.pop(name, None)
        raise
    return module
def main() -> int:
    """Download the V8 model and the sample bundles, then print a verdict for each.

    Steps:
      1. Fetch ``model.joblib`` and ``predict.py`` from ``V8_MODEL_REPO``.
      2. Import ``predict.py`` via ``_load_module`` and load the model with
         its ``load_v8`` function.
      3. Download the bundles listed in ``SAMPLES`` from ``DATASET_REPO``.
      4. Call ``predict.score_bundle`` on each bundle directory and print
         the returned record, followed by deployment notes.

    Returns:
        Always ``0``; the caller uses it as the process exit status.
    """
    print("=== V8 cipher-agnostic byte-amplification detector ===")
    print(f" model repo: {V8_MODEL_REPO}")
    print(f" dataset repo: {DATASET_REPO}")
    print()

    # Pull the V8 model + predict.py (the scoreability-gated helper).
    # NOTE(security): predict.py is executed as code below, and model.joblib
    # is presumably unpickled by predict.load_v8 (joblib files are
    # pickle-based), so both must come from a trusted repository. No
    # ``revision`` is pinned, so whatever is latest in V8_MODEL_REPO runs.
    model_path = hf_hub_download(repo_id=V8_MODEL_REPO, filename="model.joblib")
    predict_path = hf_hub_download(repo_id=V8_MODEL_REPO, filename="predict.py")

    # Load the helper as a module + load V8 via the helper.
    predict = _load_module("v8_predict", predict_path)
    payload = predict.load_v8(model_path)
    print(f"V8 loaded: {type(payload['model']).__name__}, "
          f"{len(payload['feature_names'])} features, "
          f"manifest={payload['manifest_name']!r}")
    print()

    # Pull only the sample bundles, not the whole dataset.
    dataset_root = Path(snapshot_download(
        repo_id=DATASET_REPO, repo_type="dataset",
        allow_patterns=[f"{cid}/*" for cid, _, _ in SAMPLES],
    ))

    # Score each via the gated helper.
    for corpus_id, primitive_id_expected, label in SAMPLES:
        bundle_dir = dataset_root / corpus_id
        record = predict.score_bundle(bundle_dir, payload)
        print(f"--- {corpus_id} ({primitive_id_expected}) ---")
        print(f" ground_truth label: {label}")
        print(f" verdict: {record['verdict']}")
        if record["verdict"] == "unscoreable":
            # The gate declined to score; report why instead of a score.
            print(f" reason: {record['reason']}")
            print(f" n_responses_rows: {record.get('n_responses_rows', 0)}")
        else:
            print(f" V8 score: {record['v8_score']:.4f}")
            print(f" feature_coverage: {record['feature_coverage']}")
            print(f" n_responses_rows: {record['n_responses_rows']}")
            print(" features:")
            for k, v in record["features"].items():
                print(f" {k:<28s} {v:>12.4f}")
        print()

    print("=" * 72)
    print("Notes on V8 deployment")
    print("=" * 72)
    print("""
- predict.score_bundle() is the recommended consumption surface. The
scoreability gate refuses to predict on bundles where responses.parquet
is missing or zero-rows. Callers who want raw model output without the
gate should load model.joblib directly via joblib.load.
- feature_coverage=resp_only means raw packets.pcap is absent (as in the
public nr-bundles-public bundles). V8's two cardinality features default
to 0, which under-scores attacks relative to the model's training
expectation. For full-coverage inference, produce your own bundles per
nr-bundle-spec with raw pcap retained.
- V8 is a binary detector for the byte-amplification family. Attacks from
other vulnerability families (reconnaissance, service_misconfig,
auth_bypass, rate_limiter_bypass with simulateTransaction shape) will
score "benign" — this is correct behaviour, not a failure. Use the
multi-class softmax model NullRabbit/multiclass-folded for unified
attack-family detection.
""".strip())
    print("=" * 72)
    return 0
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    sys.exit(main())
|