"""
Defines the three required modeling approaches for agent trace anomaly detection:

1. NaiveBaseline    : majority-class classifier (always predicts the most common label)
2. ClassicalMLModel : XGBoost gradient boosting on handcrafted trace features
3. TraceTransformer : DistilBERT fine-tuned on raw trace text as a sequence classifier

Each model exposes a consistent interface:
    .fit(X, y, ...)    : train the model; returns self
    .predict(X)        : np.ndarray of labels
    .predict_proba(X)  : np.ndarray of probabilities, shape (n_samples, 2)
    .save(path)
    .load(path)        : classmethod
"""

import json
import os

import joblib
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader, Dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
)
from xgboost import XGBClassifier

class NaiveBaseline:
    """
    Majority-class classifier. Predicts the most frequent label in the
    training data. Serves as the lower-bound baseline.
    """

    def __init__(self):
        self.majority_class = 0
        self.class_prior = 0.5

    def fit(self, X, y):
        """Learn the majority class from the training labels."""
        y = np.array(y)
        counts = np.bincount(y)
        self.majority_class = int(np.argmax(counts))
        self.class_prior = counts[self.majority_class] / len(y)
        print(f"[NaiveBaseline] Majority class = {self.majority_class} "
              f"(prior = {self.class_prior:.3f})")
        return self

    def predict(self, X) -> np.ndarray:
        n = len(X) if hasattr(X, "__len__") else X.shape[0]
        return np.full(n, self.majority_class)

    def predict_proba(self, X) -> np.ndarray:
        n = len(X) if hasattr(X, "__len__") else X.shape[0]
        proba = np.zeros((n, 2))
        proba[:, self.majority_class] = self.class_prior
        proba[:, 1 - self.majority_class] = 1 - self.class_prior
        return proba

    def save(self, path: str):
        # `or "."` guards against bare filenames, where dirname() returns "".
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        joblib.dump({"majority_class": self.majority_class,
                     "class_prior": self.class_prior}, path)

    @classmethod
    def load(cls, path: str):
        data = joblib.load(path)
        model = cls()
        model.majority_class = data["majority_class"]
        model.class_prior = data["class_prior"]
        return model
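

# A minimal smoke test for the baseline (illustrative only; the labels below
# are synthetic, not real trace annotations).
def _demo_naive_baseline():
    y = np.array([0, 0, 0, 1])
    model = NaiveBaseline().fit(None, y)  # X is unused by this model
    assert model.predict(np.zeros((4, 2))).tolist() == [0, 0, 0, 0]
    print(model.predict_proba(np.zeros((2, 2))))  # [[0.75, 0.25], [0.75, 0.25]]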


class ClassicalMLModel:
    """
    XGBoost classifier trained on handcrafted trace features.
    Handles class imbalance via scale_pos_weight.
    """

    def __init__(self, params: dict = None):
        default_params = {
            "n_estimators": 300,
            "max_depth": 6,
            "learning_rate": 0.1,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "min_child_weight": 3,
            "gamma": 0.1,
            "eval_metric": "logloss",
            "random_state": 42,
            "n_jobs": -1,
        }
        if params:
            default_params.update(params)
        self.params = default_params
        self.model = None
        self.feature_names = None

    def fit(self, X, y, X_val=None, y_val=None):
        """
        Train XGBoost, with early stopping when a validation set is given.
        Automatically computes scale_pos_weight for class imbalance.
        """
        y = np.array(y)
        n_neg = np.sum(y == 0)
        n_pos = np.sum(y == 1)
        scale = n_neg / n_pos if n_pos > 0 else 1.0
        self.params["scale_pos_weight"] = scale

        print(f"[XGBoost] Training with scale_pos_weight = {scale:.2f}")
        print(f"[XGBoost] Class distribution: 0={n_neg}, 1={n_pos}")

        if isinstance(X, pd.DataFrame):
            self.feature_names = list(X.columns)

        self.model = XGBClassifier(**self.params)

        fit_params = {}
        if X_val is not None and y_val is not None:
            fit_params["eval_set"] = [(X_val, y_val)]
            # Early stopping requires an eval_set. In xgboost >= 1.6 the round
            # count is an estimator parameter rather than a fit() kwarg; the
            # 20 rounds used here is an arbitrary default.
            self.model.set_params(early_stopping_rounds=20)

        self.model.fit(X, y, verbose=50, **fit_params)
        return self

    def predict(self, X) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X) -> np.ndarray:
        return self.model.predict_proba(X)

    def get_feature_importance(self) -> pd.Series:
        """Return feature importances as a Series sorted in descending order."""
        names = self.feature_names or [
            f"f{i}" for i in range(len(self.model.feature_importances_))
        ]
        return pd.Series(
            self.model.feature_importances_, index=names
        ).sort_values(ascending=False)

    def save(self, path: str):
        # `or "."` guards against bare filenames, where dirname() returns "".
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        joblib.dump({
            "model": self.model,
            "params": self.params,
            "feature_names": self.feature_names,
        }, path)

    @classmethod
    def load(cls, path: str):
        data = joblib.load(path)
        instance = cls(params=data["params"])
        instance.model = data["model"]
        instance.feature_names = data["feature_names"]
        return instance
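

# Illustrative sketch on synthetic features (not the real handcrafted trace
# features); the column names below are invented for the demo.
def _demo_classical_ml():
    rng = np.random.default_rng(42)
    X = pd.DataFrame(rng.normal(size=(300, 4)),
                     columns=["n_steps", "n_tool_calls", "n_errors", "latency"])
    y = (X["n_errors"] > 0.8).astype(int).to_numpy()  # imbalanced by construction
    model = ClassicalMLModel({"n_estimators": 30}).fit(X, y)
    print(model.get_feature_importance())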


class TraceDataset(Dataset):
    """PyTorch dataset for tokenized agent traces."""

    def __init__(self, texts: list[str], labels: list[int], tokenizer,
                 max_length: int = 512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "label": torch.tensor(self.labels[idx], dtype=torch.long),
        }
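

# Shape check for one encoded item (illustrative; downloads the DistilBERT
# tokenizer on first use, and the two trace strings are invented).
def _demo_trace_dataset():
    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    ds = TraceDataset(["step 1: call search tool", "step 2: loop detected"],
                      [0, 1], tokenizer, max_length=32)
    item = ds[0]
    print(item["input_ids"].shape)       # torch.Size([32])
    print(item["attention_mask"].shape)  # torch.Size([32])
    print(item["label"])                 # tensor(0)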


class TraceTransformer:
    """
    DistilBERT fine-tuned for binary classification on raw agent trace text.
    Traces longer than max_length tokens are truncated from the right, so the
    model only sees the beginning of very long execution traces.
    """

    def __init__(
        self,
        model_name: str = "distilbert-base-uncased",
        max_length: int = 512,
        batch_size: int = 16,
        learning_rate: float = 2e-5,
        num_epochs: int = 3,
        warmup_ratio: float = 0.1,
        device: str = None,
    ):
        self.model_name = model_name
        self.max_length = max_length
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.num_epochs = num_epochs
        self.warmup_ratio = warmup_ratio
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")

        self.tokenizer = None
        self.model = None

    def _init_model(self):
        """Lazily initialize the tokenizer and model."""
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name, num_labels=2
        ).to(self.device)

    def fit(self, X_train: list[str], y_train: list[int],
            X_val: list[str] = None, y_val: list[int] = None):
        """
        Fine-tune DistilBERT on trace text.
        Handles class imbalance via weighted cross-entropy.
        """
        self._init_model()

        y_arr = np.array(y_train)
        n_neg = np.sum(y_arr == 0)
        n_pos = np.sum(y_arr == 1)
        weight_pos = n_neg / n_pos if n_pos > 0 else 1.0
        class_weights = torch.tensor([1.0, weight_pos], dtype=torch.float).to(self.device)
        loss_fn = nn.CrossEntropyLoss(weight=class_weights)

        print(f"[Transformer] Training on {len(X_train)} samples, device={self.device}")
        print(f"[Transformer] Class weights: [1.0, {weight_pos:.2f}]")

        train_ds = TraceDataset(X_train, y_train, self.tokenizer, self.max_length)
        train_loader = DataLoader(train_ds, batch_size=self.batch_size, shuffle=True)

        val_loader = None
        if X_val and y_val:
            val_ds = TraceDataset(X_val, y_val, self.tokenizer, self.max_length)
            val_loader = DataLoader(val_ds, batch_size=self.batch_size)

        optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.learning_rate)
        total_steps = len(train_loader) * self.num_epochs
        warmup_steps = int(total_steps * self.warmup_ratio)
        scheduler = get_linear_schedule_with_warmup(
            optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps
        )

        best_val_f1 = 0.0
        best_state = None

        for epoch in range(self.num_epochs):
            self.model.train()
            total_loss = 0.0
            n_correct = 0
            n_total = 0

            for batch in train_loader:
                input_ids = batch["input_ids"].to(self.device)
                attn_mask = batch["attention_mask"].to(self.device)
                labels = batch["label"].to(self.device)

                outputs = self.model(input_ids=input_ids, attention_mask=attn_mask)
                loss = loss_fn(outputs.logits, labels)

                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
                scheduler.step()

                total_loss += loss.item()
                preds = outputs.logits.argmax(dim=-1)
                n_correct += (preds == labels).sum().item()
                n_total += len(labels)

            train_acc = n_correct / n_total
            avg_loss = total_loss / len(train_loader)
            print(f"  Epoch {epoch + 1}/{self.num_epochs} - "
                  f"loss: {avg_loss:.4f}, train_acc: {train_acc:.4f}", end="")

            # Track the best epoch by macro F1 on the validation set.
            if val_loader:
                val_preds, val_labels = self._predict_loader(val_loader)
                val_f1 = f1_score(val_labels, val_preds, average="macro")
                print(f", val_f1: {val_f1:.4f}")

                if val_f1 > best_val_f1:
                    best_val_f1 = val_f1
                    best_state = {k: v.cpu().clone()
                                  for k, v in self.model.state_dict().items()}
            else:
                print()

        # Restore the best checkpoint seen during training, if any.
        if best_state:
            self.model.load_state_dict(best_state)
            self.model.to(self.device)
            print(f"[Transformer] Restored best model (val_f1 = {best_val_f1:.4f})")

        return self

    def _predict_loader(self, loader):
        """Run inference over a dataloader; return predictions and labels."""
        self.model.eval()
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for batch in loader:
                input_ids = batch["input_ids"].to(self.device)
                attn_mask = batch["attention_mask"].to(self.device)
                labels = batch["label"]

                outputs = self.model(input_ids=input_ids, attention_mask=attn_mask)
                preds = outputs.logits.argmax(dim=-1).cpu().numpy()
                all_preds.extend(preds)
                all_labels.extend(labels.numpy())

        return np.array(all_preds), np.array(all_labels)

    def predict(self, X: list[str]) -> np.ndarray:
        """Predict labels for a list of raw trace texts."""
        # Dummy labels: TraceDataset requires them, but they are ignored here.
        ds = TraceDataset(X, [0] * len(X), self.tokenizer, self.max_length)
        loader = DataLoader(ds, batch_size=self.batch_size)
        preds, _ = self._predict_loader(loader)
        return preds

    def predict_proba(self, X: list[str]) -> np.ndarray:
        """Return class probabilities for a list of raw trace texts."""
        self.model.eval()
        ds = TraceDataset(X, [0] * len(X), self.tokenizer, self.max_length)
        loader = DataLoader(ds, batch_size=self.batch_size)

        all_probs = []
        with torch.no_grad():
            for batch in loader:
                input_ids = batch["input_ids"].to(self.device)
                attn_mask = batch["attention_mask"].to(self.device)
                outputs = self.model(input_ids=input_ids, attention_mask=attn_mask)
                probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()
                all_probs.append(probs)

        return np.vstack(all_probs)

    def save(self, path: str):
        """Save the model, tokenizer, and wrapper config to a directory."""
        os.makedirs(path, exist_ok=True)
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)

        config = {
            "model_name": self.model_name,
            "max_length": self.max_length,
            "batch_size": self.batch_size,
        }
        with open(os.path.join(path, "trace_config.json"), "w") as f:
            json.dump(config, f)

    @classmethod
    def load(cls, path: str, device: str = None):
        """Load a saved model from a directory."""
        with open(os.path.join(path, "trace_config.json")) as f:
            config = json.load(f)

        instance = cls(
            model_name=config["model_name"],
            max_length=config["max_length"],
            batch_size=config["batch_size"],
            device=device,
        )
        instance.tokenizer = AutoTokenizer.from_pretrained(path)
        instance.model = AutoModelForSequenceClassification.from_pretrained(path).to(
            instance.device
        )
        return instance
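

# One-epoch fine-tune on a toy corpus (illustrative; the strings are invented
# and the run downloads pretrained DistilBERT weights).
def _demo_trace_transformer():
    texts = ["tool call returned ok"] * 8 + ["ERROR timeout, retrying in a loop"] * 8
    labels = [0] * 8 + [1] * 8
    clf = TraceTransformer(max_length=32, batch_size=4, num_epochs=1)
    clf.fit(texts, labels)
    print(clf.predict(["ERROR timeout, retrying in a loop"]))
    print(clf.predict_proba(["tool call returned ok"]))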
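

if __name__ == "__main__":
    # Lightweight smoke test of the shared fit/predict/predict_proba interface;
    # the transformer and tokenizer demos are skipped here because they download
    # pretrained weights.
    _demo_naive_baseline()
    _demo_classical_ml()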