murshid / murshid_backend /app /ml /svm_model.py
devorbit's picture
Initial deployment - secrets removed
26e1c2e
"""
SVM classifier — PRIMARY model per the report (§3.1.3 + §4.1).
Report quote:
"the Support Vector Machine (SVM) was adopted as the core classifier"
"classification using SVM to predict the associated MITRE ATT&CK techniques"
Inference logic (verbatim from MurshidUIPipeline.ipynb cell 16+19):
scores = svm_model.named_steps["clf"].decision_function(
svm_model.named_steps["pca"].transform(X_user)
).reshape(-1)
pred = (scores >= thr_per_label).astype(int)
margins = scores - thr_per_label
conf = sigmoid(margins) * 100
Original notebook file is NOT modified.
"""
from __future__ import annotations
import json
from pathlib import Path
import joblib
import numpy as np
from app.config import settings
def _sigmoid(x: np.ndarray) -> np.ndarray:
"""Probability calibration: sigmoid(margin) — notebook cell 17."""
x = np.clip(x, -30, 30)
return 1.0 / (1.0 + np.exp(-x))
class SVMModel:
    """
    Wrap the trained LinearSVC pipeline together with its per-label thresholds.

    Structure of the .joblib pack (from notebook):
        svm_pack["model"]                → sklearn Pipeline (PCA + LinearSVC)
        svm_pack["thresholds_per_label"] → np.ndarray shape (n_techniques,)

    Attributes:
        technique_names: label names loaded from the label-columns JSON file;
            position ``i`` corresponds to score/threshold ``i``.
    """

    def __init__(self, models_dir: Path | None = None) -> None:
        """
        Load the model pack and the label list from disk.

        Args:
            models_dir: directory holding the model files. Falls back to
                ``settings.murshid_models_dir`` when omitted (or falsy).

        Raises:
            FileNotFoundError: if the joblib pack or the labels JSON is missing.
            ValueError: if the thresholds are not a 1-D vector with exactly
                one entry per label.
        """
        base = Path(models_dir or settings.murshid_models_dir).resolve()
        svm_path = base / settings.svm_joblib
        labels_path = base / settings.label_columns_json

        for p in (svm_path, labels_path):
            if not p.is_file():
                raise FileNotFoundError(f"Missing model file: {p}")

        # NOTE(security): joblib.load unpickles arbitrary Python objects, so
        # the pack must only ever come from a trusted location.
        svm_pack = joblib.load(svm_path)
        self._model = svm_pack["model"]  # Pipeline(PCA → LinearSVC)
        self._thresholds = np.asarray(
            svm_pack["thresholds_per_label"], dtype=np.float64
        )

        with open(labels_path, encoding="utf-8") as f:
            self.technique_names: list[str] = json.load(f)

        # Reject scalar / 2-D thresholds here; otherwise they would only
        # surface later as broadcasting surprises inside predict().
        if self._thresholds.ndim != 1:
            raise ValueError(
                f"SVM thresholds must be 1-D, got shape {self._thresholds.shape}"
            )

        n = len(self.technique_names)
        if self._thresholds.shape[0] != n:
            raise ValueError(
                f"SVM thresholds length {self._thresholds.shape[0]} != {n} labels"
            )

    # ------------------------------------------------------------------
    def predict(self, embedding_1d: np.ndarray) -> list[dict]:
        """
        Run SVM inference following the notebook's logic.

        Args:
            embedding_1d: a single embedding vector; any array-like is
                accepted and reshaped to one row.

        Returns:
            One dict per technique with keys ``technique_id``, ``predicted``,
            ``confidence_percent``, ``score``, ``threshold`` and ``margin``,
            ordered by descending confidence. Entries whose rounded
            confidence ties are ordered by their unrounded margin, then by
            label position.

        Raises:
            ValueError: if the classifier does not return exactly one score
                per label.
        """
        X = np.asarray(embedding_1d).reshape(1, -1)

        # Apply PCA then the LinearSVC decision function (notebook cell 19).
        steps = self._model.named_steps
        scores = np.asarray(
            steps["clf"].decision_function(steps["pca"].transform(X))
        ).reshape(-1)

        n_labels = len(self.technique_names)
        if scores.shape[0] != n_labels:
            # Without this check a mismatched score vector would broadcast
            # silently or fail later with an unrelated IndexError.
            raise ValueError(
                f"SVM produced {scores.shape[0]} scores for {n_labels} labels"
            )

        pred = scores >= self._thresholds
        margins = scores - self._thresholds
        conf = _sigmoid(margins) * 100  # calibrated confidence (%)

        # Confidence is a monotone function of the margin, so ranking by the
        # raw margin keeps confidence_percent non-increasing while still
        # separating entries that round to the same percentage (e.g. 100.0).
        # The stable sort keeps label order for exactly equal margins.
        order = np.argsort(-margins, kind="stable").tolist()

        return [
            {
                "technique_id": self.technique_names[i],
                "predicted": bool(pred[i]),
                "confidence_percent": round(float(conf[i]), 2),
                "score": round(float(scores[i]), 4),
                "threshold": round(float(self._thresholds[i]), 4),
                "margin": round(float(margins[i]), 4),
            }
            for i in order
        ]