"""
Defines all three required modeling approaches for agent trace anomaly detection:
1. NaiveBaseline : majority-class classifier (always predicts the most common label)
2. ClassicalMLModel : XGBoost gradient boosting on handcrafted trace features
3. TraceTransformer : DistilBERT fine-tuned on raw trace text as a sequence classifier
Each model exposes a consistent interface:
    .fit(X, y, ...)   : train the model, optionally with a validation set
.predict(X) : np.ndarray of labels
.predict_proba(X) : np.ndarray of probabilities
.save(path)
.load(path) (classmethod)
"""
import json
import os
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader, Dataset
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
get_linear_schedule_with_warmup,
)
from xgboost import XGBClassifier
# Naive Baseline
class NaiveBaseline:
"""
Majority-class classifier. Predicts the most frequent label in training data.
This serves as the lower-bound baseline.
"""
def __init__(self):
self.majority_class = 0
self.class_prior = 0.5
def fit(self, X, y):
"""Learn the majority class from training labels."""
y = np.array(y)
counts = np.bincount(y)
self.majority_class = int(np.argmax(counts))
self.class_prior = counts[self.majority_class] / len(y)
print(f"[NaiveBaseline] Majority class = {self.majority_class} "
f"(prior = {self.class_prior:.3f})")
return self
def predict(self, X) -> np.ndarray:
n = len(X) if hasattr(X, "__len__") else X.shape[0]
return np.full(n, self.majority_class)
def predict_proba(self, X) -> np.ndarray:
n = len(X) if hasattr(X, "__len__") else X.shape[0]
proba = np.zeros((n, 2))
proba[:, self.majority_class] = self.class_prior
proba[:, 1 - self.majority_class] = 1 - self.class_prior
return proba
def save(self, path: str):
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
joblib.dump({"majority_class": self.majority_class, "class_prior": self.class_prior}, path)
@classmethod
def load(cls, path: str):
data = joblib.load(path)
model = cls()
model.majority_class = data["majority_class"]
model.class_prior = data["class_prior"]
return model
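# Example (illustrative): with training labels [0, 0, 0, 1] the baseline learns
# majority class 0 with prior 0.75, so predict() returns all zeros and
# predict_proba() returns rows of [0.75, 0.25] regardless of the input features.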
# Classical ML Model (XGBoost)
class ClassicalMLModel:
"""
XGBoost classifier trained on handcrafted trace features.
Handles class imbalance via scale_pos_weight.
"""
def __init__(self, params: dict = None):
default_params = {
"n_estimators": 300,
"max_depth": 6,
"learning_rate": 0.1,
"subsample": 0.8,
"colsample_bytree": 0.8,
"min_child_weight": 3,
"gamma": 0.1,
"eval_metric": "logloss",
"random_state": 42,
"n_jobs": -1,
}
if params:
default_params.update(params)
self.params = default_params
self.model = None
self.feature_names = None
def fit(self, X, y, X_val=None, y_val=None):
"""
        Train XGBoost with optional early stopping on a validation set.
Automatically computes scale_pos_weight for class imbalance.
"""
y = np.array(y)
n_neg = np.sum(y == 0)
n_pos = np.sum(y == 1)
scale = n_neg / n_pos if n_pos > 0 else 1.0
self.params["scale_pos_weight"] = scale
print(f"[XGBoost] Training with scale_pos_weight = {scale:.2f}")
print(f"[XGBoost] Class distribution: 0={n_neg}, 1={n_pos}")
if isinstance(X, pd.DataFrame):
self.feature_names = list(X.columns)
        fit_params = {}
        if X_val is not None and y_val is not None:
            # enable early stopping only when a validation set is supplied;
            # 20 rounds is an assumed default and can be tuned
            self.params.setdefault("early_stopping_rounds", 20)
            fit_params["eval_set"] = [(X_val, y_val)]
        self.model = XGBClassifier(**self.params)
        self.model.fit(X, y, verbose=50, **fit_params)
return self
def predict(self, X) -> np.ndarray:
return self.model.predict(X)
def predict_proba(self, X) -> np.ndarray:
return self.model.predict_proba(X)
def get_feature_importance(self) -> pd.Series:
"""Return feature importances as a sorted Series."""
names = self.feature_names or [f"f{i}" for i in range(len(self.model.feature_importances_))]
return pd.Series(
self.model.feature_importances_, index=names
).sort_values(ascending=False)
def save(self, path: str):
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
joblib.dump({
"model": self.model,
"params": self.params,
"feature_names": self.feature_names,
}, path)
@classmethod
def load(cls, path: str):
data = joblib.load(path)
instance = cls(params=data["params"])
instance.model = data["model"]
instance.feature_names = data["feature_names"]
return instance
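# Typical usage (illustrative sketch; feature extraction happens elsewhere in the
# pipeline, and the path below is an example rather than a fixed location):
#   clf = ClassicalMLModel().fit(X_train, y_train, X_val, y_val)
#   anomaly_prob = clf.predict_proba(X_test)[:, 1]
#   print(clf.get_feature_importance().head(10))
#   clf.save("models/xgb.joblib")
#   clf = ClassicalMLModel.load("models/xgb.joblib")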
# Deep Learning Model (Transformer) - DistilBERT fine-tuned on raw trace text
class TraceDataset(Dataset):
"""PyTorch dataset for tokenized agent traces."""
def __init__(self, texts: list[str], labels: list[int], tokenizer, max_length: int = 512):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
encoding = self.tokenizer(
self.texts[idx],
max_length=self.max_length,
padding="max_length",
truncation=True,
return_tensors="pt",
)
return {
"input_ids": encoding["input_ids"].squeeze(0),
"attention_mask": encoding["attention_mask"].squeeze(0),
"label": torch.tensor(self.labels[idx], dtype=torch.long),
}
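# Example item (illustrative, assuming a DistilBERT tokenizer loaded via
# AutoTokenizer.from_pretrained and the default max_length=512):
#   ds = TraceDataset(["step 1: call search tool ..."], [0], tokenizer)
#   ds[0]["input_ids"].shape      -> torch.Size([512])
#   ds[0]["attention_mask"].shape -> torch.Size([512])
#   ds[0]["label"]                -> tensor(0)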
class TraceTransformer:
"""
DistilBERT fine-tuned for binary classification on raw agent trace text.
    Traces longer than max_length tokens are truncated, so only the leading
    portion of each execution trace is used as the anomaly signal.
"""
def __init__(
self,
model_name: str = "distilbert-base-uncased",
max_length: int = 512,
batch_size: int = 16,
learning_rate: float = 2e-5,
num_epochs: int = 3,
warmup_ratio: float = 0.1,
device: str = None,
):
self.model_name = model_name
self.max_length = max_length
self.batch_size = batch_size
self.learning_rate = learning_rate
self.num_epochs = num_epochs
self.warmup_ratio = warmup_ratio
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.tokenizer = None
self.model = None
def _init_model(self):
"""Lazy-init tokenizer and model."""
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(
self.model_name, num_labels=2
).to(self.device)
def fit(self, X_train: list[str], y_train: list[int],
X_val: list[str] = None, y_val: list[int] = None):
"""
Fine-tune DistilBERT on trace text.
Handles class imbalance via weighted cross-entropy.
"""
self._init_model()
# compute class weights
y_arr = np.array(y_train)
n_neg = np.sum(y_arr == 0)
n_pos = np.sum(y_arr == 1)
weight_pos = n_neg / n_pos if n_pos > 0 else 1.0
class_weights = torch.tensor([1.0, weight_pos], dtype=torch.float).to(self.device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights)
print(f"[Transformer] Training on {len(X_train)} samples, device={self.device}")
print(f"[Transformer] Class weights: [1.0, {weight_pos:.2f}]")
train_ds = TraceDataset(X_train, y_train, self.tokenizer, self.max_length)
train_loader = DataLoader(train_ds, batch_size=self.batch_size, shuffle=True)
val_loader = None
        if X_val is not None and y_val is not None:
val_ds = TraceDataset(X_val, y_val, self.tokenizer, self.max_length)
val_loader = DataLoader(val_ds, batch_size=self.batch_size)
optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.learning_rate)
total_steps = len(train_loader) * self.num_epochs
warmup_steps = int(total_steps * self.warmup_ratio)
scheduler = get_linear_schedule_with_warmup(
optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps
)
best_val_f1 = 0.0
best_state = None
for epoch in range(self.num_epochs):
self.model.train()
total_loss = 0.0
n_correct = 0
n_total = 0
for batch in train_loader:
input_ids = batch["input_ids"].to(self.device)
attn_mask = batch["attention_mask"].to(self.device)
labels = batch["label"].to(self.device)
outputs = self.model(input_ids=input_ids, attention_mask=attn_mask)
loss = loss_fn(outputs.logits, labels)
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
optimizer.step()
scheduler.step()
total_loss += loss.item()
preds = outputs.logits.argmax(dim=-1)
n_correct += (preds == labels).sum().item()
n_total += len(labels)
train_acc = n_correct / n_total
avg_loss = total_loss / len(train_loader)
print(f" Epoch {epoch + 1}/{self.num_epochs} — "
f"loss: {avg_loss:.4f}, train_acc: {train_acc:.4f}", end="")
# validation
if val_loader:
val_preds, val_labels = self._predict_loader(val_loader)
                val_f1 = f1_score(val_labels, val_preds, average="macro")
print(f", val_f1: {val_f1:.4f}")
if val_f1 > best_val_f1:
best_val_f1 = val_f1
best_state = {k: v.cpu().clone() for k, v in self.model.state_dict().items()}
else:
print()
# restore best model
if best_state:
self.model.load_state_dict(best_state)
self.model.to(self.device)
print(f"[Transformer] Restored best model (val_f1 = {best_val_f1:.4f})")
return self
def _predict_loader(self, loader):
"""Run inference on a dataloader, return predictions and labels."""
self.model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
for batch in loader:
input_ids = batch["input_ids"].to(self.device)
attn_mask = batch["attention_mask"].to(self.device)
labels = batch["label"]
outputs = self.model(input_ids=input_ids, attention_mask=attn_mask)
preds = outputs.logits.argmax(dim=-1).cpu().numpy()
all_preds.extend(preds)
all_labels.extend(labels.numpy())
return np.array(all_preds), np.array(all_labels)
def predict(self, X: list[str]) -> np.ndarray:
"""Predict labels for a list of raw trace texts."""
        ds = TraceDataset(X, [0] * len(X), self.tokenizer, self.max_length)  # dummy labels, unused at inference
loader = DataLoader(ds, batch_size=self.batch_size)
preds, _ = self._predict_loader(loader)
return preds
def predict_proba(self, X: list[str]) -> np.ndarray:
"""Return class probabilities for a list of raw trace texts."""
self.model.eval()
ds = TraceDataset(X, [0] * len(X), self.tokenizer, self.max_length)
loader = DataLoader(ds, batch_size=self.batch_size)
all_probs = []
with torch.no_grad():
for batch in loader:
input_ids = batch["input_ids"].to(self.device)
attn_mask = batch["attention_mask"].to(self.device)
outputs = self.model(input_ids=input_ids, attention_mask=attn_mask)
probs = torch.softmax(outputs.logits, dim=-1).cpu().numpy()
all_probs.append(probs)
return np.vstack(all_probs)
def save(self, path: str):
"""Save model and tokenizer to directory."""
os.makedirs(path, exist_ok=True)
self.model.save_pretrained(path)
self.tokenizer.save_pretrained(path)
# save config
config = {
"model_name": self.model_name,
"max_length": self.max_length,
"batch_size": self.batch_size,
}
with open(os.path.join(path, "trace_config.json"), "w") as f:
json.dump(config, f)
@classmethod
def load(cls, path: str, device: str = None):
"""Load a saved model from directory."""
with open(os.path.join(path, "trace_config.json")) as f:
config = json.load(f)
instance = cls(
model_name=path, # load from local dir
max_length=config["max_length"],
batch_size=config["batch_size"],
device=device,
)
instance.tokenizer = AutoTokenizer.from_pretrained(path)
instance.model = AutoModelForSequenceClassification.from_pretrained(path).to(instance.device)
return instance
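# Minimal smoke test (illustrative sketch, not part of the training pipeline):
# exercises the shared fit/predict/predict_proba interface on synthetic data.
# TraceTransformer is skipped because it downloads DistilBERT weights and is far
# slower to fine-tune than the other two models.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    X_demo = pd.DataFrame(rng.normal(size=(200, 5)), columns=[f"f{i}" for i in range(5)])
    y_demo = (X_demo["f0"] + rng.normal(scale=0.5, size=200) > 0).astype(int).to_numpy()

    for demo_model in (NaiveBaseline(), ClassicalMLModel()):
        demo_model.fit(X_demo, y_demo)
        demo_preds = demo_model.predict(X_demo)
        demo_probs = demo_model.predict_proba(X_demo)
        print(f"{type(demo_model).__name__}: preds {demo_preds.shape}, probs {demo_probs.shape}")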