#!/usr/bin/env python3
"""nr-network-known-class-detector — scoreability-gated inference helper (Apache-2.0).

Self-contained: needs only numpy + joblib + scikit-learn (the version the model was
trained with). Loads `model.joblib` (a dict carrying the HistGradientBoostingClassifier
+ its feature contract) and scores feature dicts produced by the NullRabbit
`network-v1` featuriser (pcap + responses aggregates).

SCOREABILITY GATE: this is a network/resource-abuse detector. A record is *scoreable*
only if it carries at least one of the model's network features (pcap.* / resp.*)
non-NaN. A record with no network signal (e.g. a pure economic/DeFi bundle, or an
empty dict) is returned `scoreable=False` with no verdict — the model must not emit a
confident score outside its domain.

The model is DIAGNOSTIC (trained on synthetic localnet reproductions of public
attacks); see the model card. Default decision threshold 0.5 (the classifier is
isotonic-calibrated).

Usage:
    from predict import load, predict
    model = load("model.joblib")
    out = predict(model, [{"pcap.packet_rate": 850.0, "resp.amp_ratio_max": 224.0, ...}])
    # -> [{"scoreable": True, "score": 0.99, "verdict": "attack", "threshold": 0.5}]
"""
from __future__ import annotations

import numpy as np

try:
    import joblib
except ImportError:
    # Only load() needs joblib; predict() works on an already-loaded contract dict.
    # load() raises a clear ImportError when this is None.
    joblib = None

DEFAULT_THRESHOLD = 0.5

# Keys the contract dict must carry for predict() to work.
_REQUIRED_KEYS = frozenset({"model", "feature_names"})


class ModelContractError(ValueError, AssertionError):
    """Raised by load() when the loaded object is not the expected contract dict.

    Subclasses AssertionError as well as ValueError so that callers written against
    the earlier assert-based check keep catching it.
    """


def load(path: str = "model.joblib") -> dict:
    """Load the model contract dict from *path* and check its required keys.

    SECURITY: joblib.load unpickles the file, which can execute arbitrary code.
    Only load model files from a source you trust.

    Args:
        path: Location of the joblib file; defaults to "model.joblib".

    Returns:
        The loaded dict, containing at least "model" and "feature_names".

    Raises:
        ImportError: joblib is not installed.
        ModelContractError: the loaded object lacks "model" or "feature_names".
    """
    if joblib is None:
        raise ImportError("joblib is required to load the model file")
    m = joblib.load(path)
    try:
        present = set(m)
    except TypeError:
        # Not iterable at all (e.g. a bare estimator): certainly not the contract dict.
        present = set()
    # Explicit raise rather than assert: asserts are stripped under `python -O`.
    if not _REQUIRED_KEYS <= present:
        raise ModelContractError("model.joblib is not the expected contract dict")
    return m


def _as_float(value: object) -> float:
    """Return *value* as a float, or NaN when it is None or cannot be converted.

    This is the single definition of a "usable" feature value, shared by the
    scoreability gate and the feature-vector builder.
    """
    if value is None:
        return float("nan")
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: e.g. an int too large for a float. Treated like any other
        # unparseable value instead of aborting the whole batch.
        return float("nan")


def _is_scoreable(feat: dict, names: list[str]) -> bool:
    """Return True if *feat* holds a non-NaN value for at least one name in *names*."""
    nameset = set(names)
    return any(k in nameset and not np.isnan(_as_float(v)) for k, v in feat.items())


def _vectorise(feat: dict, idx: dict[str, int], width: int) -> np.ndarray:
    """Build a length-*width* vector from *feat*; NaN where a feature is absent or unusable.

    Keys of *feat* that are not in *idx* are ignored.
    """
    vec = np.full(width, np.nan)
    for k, v in feat.items():
        col = idx.get(k)
        if col is not None:
            vec[col] = _as_float(v)
    return vec


def predict(model: dict, records: list[dict], threshold: float = DEFAULT_THRESHOLD) -> list[dict]:
    """Score a list of network-v1 feature dicts. Unscoreable records get no verdict.

    `feature_names` in the contract is already the post-robust-guard set the model
    was fit on (34 features); build the vector over exactly those, NaN for anything
    absent (HGB is NaN-native).

    Args:
        model: Contract dict with "model" (an object exposing `predict_proba`) and
            "feature_names" (the ordered feature list).
        records: Feature dicts to score.
        threshold: Scores greater than or equal to this are labelled "attack".

    Returns:
        One dict per input record, in input order, with keys "scoreable", "score",
        "verdict" and "threshold". Unscoreable records have score and verdict None.
    """
    names = model["feature_names"]
    clf = model["model"]
    idx = {n: i for i, n in enumerate(names)}

    out: list[dict | None] = []
    rows, pos = [], []
    for i, feat in enumerate(records):
        vec = _vectorise(feat, idx, len(names))
        # Gate on the vector actually fed to the model: an all-NaN row means the
        # record carries no usable model feature.
        if np.isnan(vec).all():
            out.append({"scoreable": False, "score": None, "verdict": None, "threshold": threshold})
            continue
        rows.append(vec)
        pos.append(i)
        out.append(None)  # placeholder, filled in after the batched predict below

    if rows:
        # Column 1 is used as the attack probability.
        # NOTE(review): assumes the positive class is second in clf.classes_ — confirm.
        proba = clf.predict_proba(np.array(rows))[:, 1]
        for p_i, p in zip(pos, proba):
            # The verdict is decided on the unrounded probability; the reported
            # score is rounded to 4 places.
            out[p_i] = {
                "scoreable": True,
                "score": round(float(p), 4),
                "verdict": "attack" if p >= threshold else "benign",
                "threshold": threshold,
            }
    return out


def _main(argv: list[str]) -> None:
    """Load the model named by argv[1] (default "model.joblib") and print a summary."""
    m = load(argv[1] if len(argv) > 1 else "model.joblib")
    print(f"loaded nr-network-known-class-detector: {len(m['feature_names'])} features, "
          f"corpus {m.get('corpus_bundle_count')} bundles, version {m.get('features_version')}, "
          f"sha {m.get('corpus_sha256')}")


if __name__ == "__main__":
    import sys

    _main(sys.argv)