Add nr-network-known-class-detector: v10 public-CVE cut (39 primitives, 9 chains, held-out ROC 0.9082)
215671c verified | #!/usr/bin/env python3 | |
| """nr-network-known-class-detector — scoreability-gated inference helper (Apache-2.0). | |
| Self-contained: needs only numpy + joblib + scikit-learn (the version the model was trained with). | |
| Loads `model.joblib` (a dict carrying the HistGradientBoostingClassifier + its feature contract) and | |
| scores feature dicts produced by the NullRabbit `network-v1` featuriser (pcap + responses aggregates). | |
| SCOREABILITY GATE: this is a network/resource-abuse detector. A record is *scoreable* only if it | |
| carries at least one of the model's network features (pcap.* / resp.*) non-NaN. A record with no | |
| network signal (e.g. a pure economic/DeFi bundle, or an empty dict) is returned `scoreable=False` | |
| with no verdict — the model must not emit a confident score outside its domain. | |
| The model is DIAGNOSTIC (trained on synthetic localnet reproductions of public attacks); see the | |
| model card. Default decision threshold 0.5 (the classifier is isotonic-calibrated). | |
| Usage: | |
| from predict import load, predict | |
| model = load("model.joblib") | |
| out = predict(model, [{"pcap.packet_rate": 850.0, "resp.amp_ratio_max": 224.0, ...}]) | |
| # -> [{"scoreable": True, "score": 0.99, "verdict": "attack", "threshold": 0.5}] | |
| """ | |
| from __future__ import annotations | |
| import joblib | |
| import numpy as np | |
# Default cut-off that predict() compares the positive-class probability against
# when the caller does not pass an explicit `threshold`.
DEFAULT_THRESHOLD = 0.5
def load(path: str = "model.joblib") -> dict:
    """Load the model contract dict from a joblib artefact.

    Args:
        path: Location of the joblib file; defaults to ``model.joblib`` relative
            to the current working directory.

    Returns:
        The deserialised object, returned unchanged once it is confirmed to
        expose the ``"model"`` and ``"feature_names"`` keys.

    Raises:
        AssertionError: If either required key is missing from the loaded object.
    """
    # NOTE(security): joblib.load unpickles the file and can therefore execute
    # arbitrary code; only point this at artefacts from a trusted source.
    m = joblib.load(path)
    # Raised explicitly instead of via `assert` so the contract check still runs
    # under `python -O`; AssertionError is kept so existing callers observe the
    # same exception type and message as before.
    if not {"model", "feature_names"} <= set(m):
        raise AssertionError("model.joblib is not the expected contract dict")
    return m
| def _is_scoreable(feat: dict, names: list[str]) -> bool: | |
| nameset = set(names) | |
| for k, v in feat.items(): | |
| if k not in nameset or v is None: | |
| continue | |
| try: | |
| if not np.isnan(float(v)): | |
| return True | |
| except (TypeError, ValueError): | |
| continue | |
| return False | |
def predict(model: dict, records: list[dict], threshold: float = DEFAULT_THRESHOLD) -> list[dict]:
    """Score a list of network-v1 feature dicts. Unscoreable records get no verdict.

    `feature_names` in the contract is already the post-robust-guard set the model was fit on
    (34 features); build the vector over exactly those, NaN for anything absent (HGB is NaN-native).

    Args:
        model: Contract dict; ``model["model"]`` must offer ``predict_proba`` and
            ``model["feature_names"]`` fixes the column order of the input matrix.
        records: One feature dict per record. Keys not in ``feature_names`` are ignored.
        threshold: Probability at or above which a record is labelled ``"attack"``.

    Returns:
        One result dict per input record, in input order, with the keys
        ``scoreable``, ``score``, ``verdict`` and ``threshold``. Records that fail
        the scoreability gate carry ``score=None`` and ``verdict=None``.
    """
    names = model["feature_names"]
    clf = model["model"]
    column_of = {name: col for col, name in enumerate(names)}
    results: list[dict | None] = []
    rows, positions = [], []
    for i, feat in enumerate(records):
        if not _is_scoreable(feat, names):
            results.append({"scoreable": False, "score": None, "verdict": None, "threshold": threshold})
            continue
        vec = np.full(len(names), np.nan)
        for key, value in feat.items():
            if key not in column_of or value is None:
                continue
            try:
                vec[column_of[key]] = float(value)
            except (TypeError, ValueError, OverflowError):
                # Best effort: an unconvertible value (wrong type, non-numeric
                # string, or an int too large for a float) stays NaN instead of
                # aborting the whole batch.
                pass
        rows.append(vec)
        positions.append(i)
        # Placeholder keeps `results` aligned with `records`; filled in below.
        results.append(None)
    if rows:
        # All scoreable rows go through the classifier in a single call.
        # Column 1 is taken as the attack probability -- assumes the classifier's
        # classes are ordered (benign, attack); TODO confirm against the model card.
        proba = clf.predict_proba(np.array(rows))[:, 1]
        for pos, p in zip(positions, proba):
            # The verdict compares the unrounded probability; only the reported
            # score is rounded to four decimals.
            results[pos] = {"scoreable": True, "score": round(float(p), 4),
                            "verdict": "attack" if p >= threshold else "benign", "threshold": threshold}
    return results
if __name__ == "__main__":
    import sys

    # Smoke check: load the artefact and print a summary of its contract.
    # The first CLI argument, when present, overrides the default artefact path.
    cli_args = sys.argv[1:]
    artefact_path = cli_args[0] if cli_args else "model.joblib"
    contract = load(artefact_path)

    feature_count = len(contract['feature_names'])
    bundle_count = contract.get('corpus_bundle_count')
    features_version = contract.get('features_version')
    corpus_sha = contract.get('corpus_sha256')
    print(f"loaded nr-network-known-class-detector: {feature_count} features, "
          f"corpus {bundle_count} bundles, version {features_version}, "
          f"sha {corpus_sha}")