"""
SVM classifier — PRIMARY model per the report (§3.1.3 + §4.1).

Report quote:
    "the Support Vector Machine (SVM) was adopted as the core classifier"
    "classification using SVM to predict the associated MITRE ATT&CK techniques"

Inference logic (verbatim from MurshidUIPipeline.ipynb cell 16+19):
    scores = svm_model.named_steps["clf"].decision_function(
        svm_model.named_steps["pca"].transform(X_user)
    ).reshape(-1)
    pred = (scores >= thr_per_label).astype(int)
    margins = scores - thr_per_label
    conf = sigmoid(margins) * 100

The original notebook file is NOT modified.
"""
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
|
|
| import joblib |
| import numpy as np |
|
|
| from app.config import settings |
|
|
|
|
def _sigmoid(x: np.ndarray) -> np.ndarray:
    """Map margins to the open interval (0, 1) with the logistic function.

    The input is clamped to [-30, 30] before exponentiation so that
    ``np.exp`` never sees a large argument; beyond that range the result
    is already saturated.
    """
    bounded = np.clip(x, -30, 30)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator
|
|
|
|
class SVMModel:
    """
    Wrap the trained SVM pipeline together with its per-label thresholds.

    Structure of the .joblib pack (from notebook):
        svm_pack["model"]                → sklearn Pipeline (PCA + LinearSVC)
        svm_pack["thresholds_per_label"] → np.ndarray shape (n_techniques,)
    """

    def __init__(self, models_dir: Path | None = None) -> None:
        """
        Load the model pack and the label list from disk and validate them.

        Args:
            models_dir: Directory containing the artifacts. When omitted,
                ``settings.murshid_models_dir`` is used.

        Raises:
            FileNotFoundError: If the model pack or the labels file is missing.
            ValueError: If the labels file is not a JSON list, or the
                thresholds are not a 1-D array with one entry per label.
        """
        base = Path(models_dir or settings.murshid_models_dir).resolve()

        svm_path = base / settings.svm_joblib
        labels_path = base / settings.label_columns_json

        for p in (svm_path, labels_path):
            if not p.is_file():
                raise FileNotFoundError(f"Missing model file: {p}")

        # NOTE(security): joblib.load unpickles arbitrary Python objects.
        # Only point this at model files from a trusted source.
        svm_pack = joblib.load(svm_path)
        self._model = svm_pack["model"]
        self._thresholds = np.asarray(
            svm_pack["thresholds_per_label"], dtype=np.float64
        )

        with open(labels_path, encoding="utf-8") as f:
            names = json.load(f)
        # Fail at load time rather than with a KeyError/TypeError in predict().
        if not isinstance(names, list):
            raise ValueError(
                f"Label file {labels_path} must contain a JSON list, "
                f"got {type(names).__name__}"
            )
        self.technique_names: list[str] = names

        n = len(self.technique_names)
        # A 2-D thresholds array (e.g. shape (n, 1)) would pass a bare length
        # check and then broadcast the scores into a matrix in predict().
        if self._thresholds.ndim != 1:
            raise ValueError(
                f"SVM thresholds must be 1-D, got shape {self._thresholds.shape}"
            )
        if self._thresholds.shape[0] != n:
            raise ValueError(
                f"SVM thresholds length {self._thresholds.shape[0]} != {n} labels"
            )

    def predict(self, embedding_1d: np.ndarray) -> list[dict]:
        """
        Run SVM inference for a single embedding.

        The embedding is passed through the pipeline's "pca" step, scored by
        the "clf" step's ``decision_function``, and each score is compared
        with its per-label threshold. Confidence is ``sigmoid(margin) * 100``.

        Args:
            embedding_1d: Feature vector for one sample (any array-like);
                it is reshaped to a single row before scoring.

        Returns:
            One dict per technique, sorted by ``confidence_percent``
            descending, with keys: technique_id, predicted,
            confidence_percent, score, threshold, margin.

        Raises:
            ValueError: If the embedding is empty, or the classifier does not
                produce exactly one score per label.
        """
        # np.asarray accepts lists/tuples too; dtype is left untouched so the
        # pipeline sees exactly the values the caller supplied.
        X = np.asarray(embedding_1d).reshape(1, -1)
        if X.shape[1] == 0:
            raise ValueError("embedding_1d is empty")

        steps = self._model.named_steps
        reduced = steps["pca"].transform(X)
        scores = np.asarray(steps["clf"].decision_function(reduced)).reshape(-1)

        n_labels = len(self.technique_names)
        # Without this check a length-1 score vector would silently broadcast
        # against the thresholds instead of failing.
        if scores.shape[0] != n_labels:
            raise ValueError(
                f"SVM produced {scores.shape[0]} scores for {n_labels} labels"
            )

        pred = scores >= self._thresholds
        margins = scores - self._thresholds
        conf = _sigmoid(margins) * 100

        results = [
            {
                "technique_id": name,
                "predicted": bool(hit),
                "confidence_percent": round(float(c), 2),
                "score": round(float(s), 4),
                "threshold": round(float(t), 4),
                "margin": round(float(m), 4),
            }
            for name, hit, c, s, t, m in zip(
                self.technique_names, pred, conf, scores, self._thresholds, margins
            )
        ]

        return sorted(results, key=lambda r: r["confidence_percent"], reverse=True)
|
|