File size: 3,568 Bytes
215671c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python3
"""nr-network-known-class-detector — scoreability-gated inference helper (Apache-2.0).

Self-contained: needs only numpy + joblib + scikit-learn (the version the model was trained with).
Loads `model.joblib` (a dict carrying the HistGradientBoostingClassifier + its feature contract) and
scores feature dicts produced by the NullRabbit `network-v1` featuriser (pcap + responses aggregates).

SCOREABILITY GATE: this is a network/resource-abuse detector. A record is *scoreable* only if it
carries at least one of the model's network features (pcap.* / resp.*) non-NaN. A record with no
network signal (e.g. a pure economic/DeFi bundle, or an empty dict) is returned `scoreable=False`
with no verdict — the model must not emit a confident score outside its domain.

The model is DIAGNOSTIC (trained on synthetic localnet reproductions of public attacks); see the
model card. Default decision threshold 0.5 (the classifier is isotonic-calibrated).

Usage:
    from predict import load, predict
    model = load("model.joblib")
    out = predict(model, [{"pcap.packet_rate": 850.0, "resp.amp_ratio_max": 224.0, ...}])
    # -> [{"scoreable": True, "score": 0.99, "verdict": "attack", "threshold": 0.5}]
"""
from __future__ import annotations

import joblib
import numpy as np

DEFAULT_THRESHOLD = 0.5


def load(path: str = "model.joblib") -> dict:
    m = joblib.load(path)
    assert {"model", "feature_names"} <= set(m), "model.joblib is not the expected contract dict"
    return m


def _is_scoreable(feat: dict, names: list[str]) -> bool:
    nameset = set(names)
    for k, v in feat.items():
        if k not in nameset or v is None:
            continue
        try:
            if not np.isnan(float(v)):
                return True
        except (TypeError, ValueError):
            continue
    return False


def predict(model: dict, records: list[dict], threshold: float = DEFAULT_THRESHOLD) -> list[dict]:
    """Score a list of network-v1 feature dicts. Unscoreable records get no verdict.

    `feature_names` in the contract is already the post-robust-guard set the model was fit on
    (34 features); build the vector over exactly those, NaN for anything absent (HGB is NaN-native).
    """
    names = model["feature_names"]
    clf = model["model"]
    idx = {n: i for i, n in enumerate(names)}

    out: list[dict | None] = []
    rows, pos = [], []
    for i, feat in enumerate(records):
        if not _is_scoreable(feat, names):
            out.append({"scoreable": False, "score": None, "verdict": None, "threshold": threshold})
            continue
        vec = np.full(len(names), np.nan)
        for k, v in feat.items():
            if k in idx and v is not None:
                try:
                    vec[idx[k]] = float(v)
                except (TypeError, ValueError):
                    pass
        rows.append(vec)
        pos.append(i)
        out.append(None)

    if rows:
        proba = clf.predict_proba(np.array(rows))[:, 1]
        for p_i, p in zip(pos, proba):
            out[p_i] = {"scoreable": True, "score": round(float(p), 4),
                        "verdict": "attack" if p >= threshold else "benign", "threshold": threshold}
    return out


if __name__ == "__main__":
    import sys
    m = load(sys.argv[1] if len(sys.argv) > 1 else "model.joblib")
    print(f"loaded nr-network-known-class-detector: {len(m['feature_names'])} features, "
          f"corpus {m.get('corpus_bundle_count')} bundles, version {m.get('features_version')}, "
          f"sha {m.get('corpus_sha256')}")