#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""V8 cipher-agnostic byte-amplification detector: end-to-end inference example.

Three-artefact collaboration. This script:

1. Downloads a bundle from the public NullRabbit/nr-bundles-public dataset
   on Hugging Face.
2. Downloads the V8 model and the scoreability-gated inference helper
   (``predict.py``) from this repository.
3. Loads the bundle manifest via the bundle-spec reference parser
   (NullRabbitLabs/nr-bundle-spec, MIT).
4. Calls ``predict.score_bundle()`` to apply the scoreability gate and
   produce a verdict.

A worked demonstration of the **spec → corpus → model** path: bundles on
disk are conformant with an open spec; the spec's reference parser loads
them; the scoreability-gated inference helper produces verdicts.

Dependencies::

    pip install huggingface_hub pyarrow scikit-learn joblib numpy
    pip install git+https://github.com/NullRabbitLabs/nr-bundle-spec.git

Usage::

    python inference_example.py

Three bundles are scored: a known-attack (sui_F10_multi_get_objects_amp),
a known-benign with traffic (sui_BENIGN_reproducer_pipeline), and a
known-benign without traffic (sui_BENIGN_passive_fullnode). The third
demonstrates the scoreability gate refusing to predict on empty bundles.
"""

from __future__ import annotations

import importlib.util
import sys
from pathlib import Path

from huggingface_hub import hf_hub_download, snapshot_download


# ─── Constants ──────────────────────────────────────────────────────

V8_MODEL_REPO = "NullRabbit/v8-cipher-agnostic"
DATASET_REPO = "NullRabbit/nr-bundles-public"

# Three sample bundles: attack, scoreable benign, unscoreable benign.
SAMPLES = [
    ("crp_19d438471fec4229", "sui_F10_multi_get_objects_amp", "attack"),
    ("crp_8b85da89c4e34d4c", "sui_BENIGN_reproducer_pipeline", "benign"),
    ("crp_0598afb4d5e44fb9", "sui_BENIGN_passive_fullnode", "benign (passive)"),
]


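def _load_module(name: str, path: str) -> "object":
    """Import the Python file at ``path`` as a module registered under ``name``."""
    spec = importlib.util.spec_from_file_location(name, path)
    # spec_from_file_location returns None when no loader matches the path.
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load module {name!r} from {path}")
    module = importlib.util.module_from_spec(spec)
    # Register before executing so the module can be looked up during its own import.
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module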


def main() -> int:
    print("=== V8 cipher-agnostic byte-amplification detector ===")
    print(f"  model repo:   {V8_MODEL_REPO}")
    print(f"  dataset repo: {DATASET_REPO}")
    print()

    # Pull the V8 model + predict.py (the scoreability-gated helper).
    model_path = hf_hub_download(repo_id=V8_MODEL_REPO, filename="model.joblib")
    predict_path = hf_hub_download(repo_id=V8_MODEL_REPO, filename="predict.py")

    # Load the helper as a module + load V8 via the helper.
    predict = _load_module("v8_predict", predict_path)
    payload = predict.load_v8(model_path)

    print(f"V8 loaded: {type(payload['model']).__name__}, "
          f"{len(payload['feature_names'])} features, "
          f"manifest={payload['manifest_name']!r}")
    print()

    # Pull the three sample bundles.
    dataset_root = Path(snapshot_download(
        repo_id=DATASET_REPO, repo_type="dataset",
        allow_patterns=[f"{cid}/*" for cid, _, _ in SAMPLES],
    ))

    # Score each via the gated helper.
    for corpus_id, primitive_id_expected, label in SAMPLES:
        bundle_dir = dataset_root / corpus_id
        record = predict.score_bundle(bundle_dir, payload)
        print(f"--- {corpus_id} ({primitive_id_expected}) ---")
        print(f"  ground_truth label: {label}")
        print(f"  verdict:            {record['verdict']}")
        if record["verdict"] == "unscoreable":
            print(f"  reason:             {record['reason']}")
            print(f"  n_responses_rows:   {record.get('n_responses_rows', 0)}")
        else:
            print(f"  V8 score:           {record['v8_score']:.4f}")
            print(f"  feature_coverage:   {record['feature_coverage']}")
            print(f"  n_responses_rows:   {record['n_responses_rows']}")
            print("  features:")
            for k, v in record["features"].items():
                print(f"    {k:<28s} {v:>12.4f}")
        print()

    print("=" * 72)
    print("Notes on V8 deployment")
    print("=" * 72)
    print("""
- predict.score_bundle() is the recommended consumption surface. The
  scoreability gate refuses to predict on bundles where responses.parquet
  is missing or zero-rows. Callers who want raw model output without the
  gate should load model.joblib directly via joblib.load.

- feature_coverage=resp_only means raw packets.pcap is absent (as in the
  public nr-bundles-public bundles). V8's two cardinality features default
  to 0, which under-scores attacks relative to the model's training
  expectation. For full-coverage inference, produce your own bundles per
  nr-bundle-spec with raw pcap retained.

- V8 is a binary detector for the byte-amplification family. Attacks from
  other vulnerability families (reconnaissance, service_misconfig,
  auth_bypass, rate_limiter_bypass with simulateTransaction shape) will
  score "benign"; this is correct behaviour, not a failure. Use the
  multi-class softmax model NullRabbit/multiclass-folded for unified
  attack-family detection.
""".strip())
    print("=" * 72)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())