File size: 6,320 Bytes
6e8f9d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""V8 cipher-agnostic byte-amplification detector — scoreability-gated inference.

This is the **recommended consumption surface** for V8. It wraps the raw
``CalibratedClassifierCV`` estimator with two production-side gates:

  1. **Scoreability gate**: refuses to score bundles where
     ``responses.parquet`` is missing or zero-rows. V8's training
     distribution doesn't cover all-zero feature vectors and the
     underlying estimator produces spurious high attack-scores on them
     (typical for passive-workload bundles where the validator listens
     without serving RPC). The gate returns an explicit "unscoreable"
     verdict instead.

  2. **Feature-coverage gate**: notes when the raw ``packets.pcap`` is
     absent (as in the public ``nr-bundles-public`` bundles) and emits
     a coverage flag with the score so callers can downweight or
     ignore the prediction. V8's two cardinality features default to
     0 when raw pcap is absent, which under-scores attacks relative
     to the model's training expectation.

Callers who want raw model output without these gates should load
``model.joblib`` directly via ``joblib.load`` — see the "Bypassing the
gate" section of the model card.

Usage::

    from predict import score_bundle, load_v8

    payload = load_v8("/path/to/model.joblib")  # or from hf_hub_download
    record = score_bundle("/path/to/some/bundle_dir", payload)
    if record["verdict"] == "unscoreable":
        print(f"refused: {record['reason']}")
    else:
        print(f"V8 score: {record['v8_score']:.4f} ({record['verdict']})")
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import joblib
import numpy as np
import pyarrow.parquet as pq

# nr-bundle-spec — the reference parser. Pip-install via
#   pip install git+https://github.com/NullRabbitLabs/nr-bundle-spec.git
from bundle_spec import BundleManifest


V8_FEATURES = [
    "pcap.unique_dst_ports",
    "pcap.unique_src_ports",
    "resp.amp_ratio_max",
    "resp.amp_ratio_mean",
    "resp.amp_ratio_median",
    "resp.req_bytes_max",
    "resp.resp_bytes_max",
]


def load_v8(model_path: str | Path) -> dict[str, Any]:
    """Load the V8 lineage-dict payload from a joblib file."""
    return joblib.load(model_path)


def _extract_features(bundle_dir: Path) -> tuple[dict[str, float], int, bool]:
    """Extract V8 features + diagnostic flags from a bundle.

    Returns (features, n_responses_rows, has_packets_pcap).
    """
    features = {name: 0.0 for name in V8_FEATURES}

    responses_path = bundle_dir / "responses.parquet"
    n_resp_rows = 0
    if responses_path.is_file():
        table = pq.read_table(responses_path)
        n_resp_rows = table.num_rows
        if n_resp_rows > 0:
            req = table.column("request_size_bytes").to_numpy()
            resp = table.column("response_size_bytes").to_numpy()
            features["resp.req_bytes_max"] = float(req.max())
            features["resp.resp_bytes_max"] = float(resp.max())
            with np.errstate(divide="ignore", invalid="ignore"):
                ratios = np.where(req > 0, resp / req, 0.0)
            features["resp.amp_ratio_max"] = float(ratios.max())
            features["resp.amp_ratio_mean"] = float(ratios.mean())
            features["resp.amp_ratio_median"] = float(np.median(ratios))

    has_packets_pcap = (bundle_dir / "packets.pcap").is_file()
    # If raw pcap is present, callers can implement the cardinality
    # feature extraction; this helper does not parse pcaps. The two
    # pcap.unique_*_ports features stay at 0.0 — emitting a coverage
    # warning to the caller is the gate's job.
    return features, n_resp_rows, has_packets_pcap


def score_bundle(
    bundle_dir: str | Path, payload: dict[str, Any]
) -> dict[str, Any]:
    """Score a bundle through V8, with the scoreability gate applied.

    Returns a record with:
      - ``verdict``: one of ``"attack"``, ``"benign"``, ``"unscoreable"``.
      - ``v8_score``: P(attack) in [0, 1], or ``None`` if unscoreable.
      - ``reason``: human-readable explanation when unscoreable.
      - ``feature_coverage``: ``"full"`` or ``"resp_only"`` (raw pcap absent).
      - ``corpus_id``, ``primitive_id``, ``ground_truth``: from manifest.
      - ``features``: the 7 feature values as scored (zeros where absent).
      - ``n_responses_rows``: number of rows in responses.parquet.
    """
    bundle_dir = Path(bundle_dir)

    manifest_path = bundle_dir / "manifest.json"
    if not manifest_path.is_file():
        return {
            "verdict": "unscoreable",
            "reason": f"manifest.json not found at {manifest_path}",
            "v8_score": None,
        }
    manifest = BundleManifest.model_validate_json(manifest_path.read_text())

    features, n_resp_rows, has_packets_pcap = _extract_features(bundle_dir)

    # Scoreability gate
    if n_resp_rows == 0:
        return {
            "verdict": "unscoreable",
            "reason": (
                "responses.parquet is missing or zero-rows; V8 cannot score "
                "bundles with no observed RPC traffic. Use a non-amplification-"
                "family detector for passive-workload bundles, or compose with "
                "the multi-class softmax model NullRabbit/multiclass-folded."
            ),
            "v8_score": None,
            "corpus_id": manifest.corpus_id,
            "primitive_id": manifest.primitive_id,
            "n_responses_rows": 0,
            "feature_coverage": "none",
        }

    # Score
    X = np.array([[features[name] for name in V8_FEATURES]])
    score = float(payload["model"].predict_proba(X)[0, 1])
    verdict = "attack" if score >= 0.5 else "benign"
    coverage = "full" if has_packets_pcap else "resp_only"

    return {
        "verdict": verdict,
        "v8_score": score,
        "reason": None,
        "corpus_id": manifest.corpus_id,
        "primitive_id": manifest.primitive_id,
        "ground_truth": (
            manifest.ground_truth_label.value
            if hasattr(manifest.ground_truth_label, "value")
            else str(manifest.ground_truth_label)
        ),
        "features": features,
        "n_responses_rows": n_resp_rows,
        "feature_coverage": coverage,
    }