simonmorley's picture
Add nr-network-known-class-detector: v10 public-CVE cut (39 primitives, 9 chains, held-out ROC 0.9082)
215671c verified
Raw
History Blame Contribute Delete
3.57 kB
#!/usr/bin/env python3
"""nr-network-known-class-detector — scoreability-gated inference helper (Apache-2.0).
Self-contained: needs only numpy + joblib + scikit-learn (the version the model was trained with).
Loads `model.joblib` (a dict carrying the HistGradientBoostingClassifier + its feature contract) and
scores feature dicts produced by the NullRabbit `network-v1` featuriser (pcap + responses aggregates).
SCOREABILITY GATE: this is a network/resource-abuse detector. A record is *scoreable* only if it
carries at least one of the model's network features (pcap.* / resp.*) non-NaN. A record with no
network signal (e.g. a pure economic/DeFi bundle, or an empty dict) is returned `scoreable=False`
with no verdict — the model must not emit a confident score outside its domain.
The model is DIAGNOSTIC (trained on synthetic localnet reproductions of public attacks); see the
model card. Default decision threshold 0.5 (the classifier is isotonic-calibrated).
Usage:
from predict import load, predict
model = load("model.joblib")
out = predict(model, [{"pcap.packet_rate": 850.0, "resp.amp_ratio_max": 224.0, ...}])
# -> [{"scoreable": True, "score": 0.99, "verdict": "attack", "threshold": 0.5}]
"""
from __future__ import annotations
import joblib
import numpy as np
# Probability cut-off that predict() applies when the caller passes no threshold.
DEFAULT_THRESHOLD = 0.5
def load(path: str = "model.joblib") -> dict:
    """Load the model contract dict from a joblib artefact.

    Args:
        path: Location of the joblib file; defaults to ``model.joblib``.

    Returns:
        The loaded dict, which contains at least the ``"model"`` and
        ``"feature_names"`` keys.

    Raises:
        ValueError: If the loaded object is not a dict or lacks either
            required key.
    """
    # SECURITY: joblib.load unpickles the file, which can execute arbitrary
    # code. Only load artefacts obtained from a source you trust.
    m = joblib.load(path)
    # Explicit raise instead of `assert`: assertions are stripped under
    # `python -O`, which would let a malformed artefact through unchecked.
    if not isinstance(m, dict) or not {"model", "feature_names"}.issubset(m):
        raise ValueError("model.joblib is not the expected contract dict")
    return m
def _is_scoreable(feat: dict, names: list[str]) -> bool:
    """Report whether `feat` carries at least one usable model feature.

    A value is usable when its key appears in `names`, it is not None, and
    `float()` converts it to something other than NaN. Values that `float()`
    rejects with TypeError or ValueError are skipped.
    """
    known = frozenset(names)
    for key, raw in feat.items():
        # Keys outside the feature contract and explicit None carry no signal.
        if raw is None or key not in known:
            continue
        try:
            number = float(raw)
        except (TypeError, ValueError):
            # Not numeric (e.g. a list or a non-numeric string): ignore it.
            continue
        if not np.isnan(number):
            return True
    return False
def predict(model: dict, records: list[dict], threshold: float = DEFAULT_THRESHOLD) -> list[dict]:
    """Score network-v1 feature dicts and return one result dict per record.

    Results come back in input order. A record that fails the scoreability
    gate gets `scoreable=False` with `score` and `verdict` set to None. Every
    other record becomes one row with a column per entry of
    `model["feature_names"]`; features that are absent, None, or not
    convertible by `float()` stay NaN (the model is expected to accept NaN
    inputs). All rows are scored in a single `predict_proba` call and column 1
    is taken as the score.

    The reported score is rounded to 4 decimals; the verdict is "attack" when
    the unrounded probability is >= `threshold`, otherwise "benign".
    """
    feature_names = model["feature_names"]
    classifier = model["model"]
    column_of = {name: col for col, name in enumerate(feature_names)}
    width = len(feature_names)

    results: list[dict | None] = []
    matrix_rows = []  # one feature vector per scoreable record
    slots = []        # index in `results` that each vector belongs to
    for record in records:
        if _is_scoreable(record, feature_names):
            row = np.full(width, np.nan)
            for key, value in record.items():
                if value is None or key not in column_of:
                    continue
                try:
                    row[column_of[key]] = float(value)
                except (TypeError, ValueError):
                    # Non-numeric value: leave that cell as NaN.
                    continue
            matrix_rows.append(row)
            slots.append(len(results))
            # Placeholder, filled in once the batch has been scored.
            results.append(None)
        else:
            results.append(
                {"scoreable": False, "score": None, "verdict": None, "threshold": threshold}
            )

    if matrix_rows:
        probabilities = classifier.predict_proba(np.array(matrix_rows))[:, 1]
        for slot, prob in zip(slots, probabilities):
            if prob >= threshold:
                label = "attack"
            else:
                label = "benign"
            results[slot] = {
                "scoreable": True,
                "score": round(float(prob), 4),
                "verdict": label,
                "threshold": threshold,
            }
    return results
if __name__ == "__main__":
    import sys

    # An optional first CLI argument overrides the default artefact path.
    cli_args = sys.argv[1:]
    m = load(cli_args[0] if cli_args else "model.joblib")
    # Print a one-line summary of the loaded contract's metadata.
    summary = (
        f"loaded nr-network-known-class-detector: {len(m['feature_names'])} features, "
        f"corpus {m.get('corpus_bundle_count')} bundles, version {m.get('features_version')}, "
        f"sha {m.get('corpus_sha256')}"
    )
    print(summary)