on-chain-anomaly / scripts /ml_models.py
robrtt's picture
first commit
63105b7
"""
ml_models.py
Anomaly detection on MEV-Boost relay data.
Two models:
1. IsolationForest — fast unsupervised anomaly detection
2. Autoencoder (PyTorch) — learns compressed representation of normal block patterns
Both detect ANOMALOUS BUILDER BEHAVIOR — not individual MEV types.
An anomaly here means: a block whose builder payment, gas usage, relay visibility,
or tx density pattern is statistically unusual compared to recent history.
This is honest anomaly detection on real data, not heuristic-labeled classification.
"""
import os
import logging
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import joblib
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Default location for persisted models: a "models" folder one level above the
# directory containing this file. The detectors' save()/load() methods fall
# back to files inside it when no explicit path is given.
MODELS_DIR = os.path.join(os.path.dirname(__file__), "..", "models")
# ─── Isolation Forest ─────────────────────────────────────────────────────
class IsolationForestDetector:
    """Isolation Forest anomaly detector operating on standardized features.

    Features are standardized with a ``StandardScaler`` and passed to
    scikit-learn's ``IsolationForest``. Raw ``score_samples`` values are mapped
    to ``[0, 1]`` using the raw-score range observed on the training data, so
    the score of a row does not depend on which other rows are scored with it.
    """

    def __init__(self, contamination: float = 0.05, n_estimators: int = 200, random_state: int = 42):
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            n_estimators=n_estimators,
            contamination=contamination,
            random_state=random_state,
            n_jobs=-1,
        )
        # Raw score range observed during fit(); used to normalize scores.
        self._raw_min: Optional[float] = None
        self._raw_max: Optional[float] = None
        self._is_fitted = False

    def fit(self, X: pd.DataFrame) -> "IsolationForestDetector":
        """Fit the scaler and the forest on ``X`` and calibrate score bounds.

        Returns ``self`` so calls can be chained.
        """
        Xs = self.scaler.fit_transform(X)
        self.model.fit(Xs)
        # Remember the training score range so later batches (including a
        # single row) are normalized against a fixed reference.
        raw = self.model.score_samples(Xs)
        self._raw_min = float(raw.min())
        self._raw_max = float(raw.max())
        self._is_fitted = True
        log.info(f"IsolationForest fitted on {len(X)} samples, {X.shape[1]} features")
        return self

    def score(self, X: pd.DataFrame) -> np.ndarray:
        """Anomaly scores in [0, 1]. Higher = more anomalous.

        Scores are normalized against the raw-score range seen during
        ``fit``; values outside that range are clipped. If no calibration is
        available (e.g. a model file saved before calibration was stored),
        the range of the current batch is used instead.
        """
        Xs = self.scaler.transform(X)
        raw = self.model.score_samples(Xs)
        lo = getattr(self, "_raw_min", None)
        hi = getattr(self, "_raw_max", None)
        if lo is None or hi is None:
            # Legacy fallback: batch-relative normalization.
            lo, hi = raw.min(), raw.max()
        # score_samples is lower for more abnormal rows, hence the inversion.
        scores = 1 - (raw - lo) / (hi - lo + 1e-9)
        return np.clip(scores, 0.0, 1.0).astype(float)

    def predict(self, X: pd.DataFrame, threshold: float = 0.6) -> np.ndarray:
        """Return 1 for rows whose score is at least ``threshold``, else 0."""
        return (self.score(X) >= threshold).astype(int)

    def save(self, path: Optional[str] = None) -> str:
        """Persist model, scaler and score calibration with joblib.

        Defaults to ``isolation_forest.joblib`` inside ``MODELS_DIR``.
        Returns the path written.
        """
        path = path or os.path.join(MODELS_DIR, "isolation_forest.joblib")
        parent = os.path.dirname(path)
        if parent:  # a bare filename has no directory to create
            os.makedirs(parent, exist_ok=True)
        joblib.dump(
            {
                "model": self.model,
                "scaler": self.scaler,
                "raw_min": getattr(self, "_raw_min", None),
                "raw_max": getattr(self, "_raw_max", None),
            },
            path,
        )
        log.info(f"IsolationForest saved to {path}")
        return path

    @classmethod
    def load(cls, path: Optional[str] = None) -> "IsolationForestDetector":
        """Load a detector previously written by :meth:`save`.

        NOTE: joblib files are pickle-based; only load files from a trusted
        source.
        """
        path = path or os.path.join(MODELS_DIR, "isolation_forest.joblib")
        obj = cls.__new__(cls)
        saved = joblib.load(path)
        obj.model = saved["model"]
        obj.scaler = saved["scaler"]
        # __init__ is bypassed, so restore the attributes it would have set.
        obj.contamination = getattr(obj.model, "contamination", None)
        obj._raw_min = saved.get("raw_min")
        obj._raw_max = saved.get("raw_max")
        obj._is_fitted = True
        log.info(f"IsolationForest loaded from {path}")
        return obj
# ─── Autoencoder ──────────────────────────────────────────────────────────
class _AE(nn.Module):
    """Fully connected autoencoder with a mirrored encoder/decoder.

    The encoder maps ``input_dim`` through each width in ``hidden_dims``
    (Linear + ReLU per step). The decoder walks the same widths in reverse and
    finishes with a plain Linear layer back to ``input_dim``.
    """

    def __init__(self, input_dim: int, hidden_dims: list[int]):
        super().__init__()

        # Encoder: input_dim -> h1 -> ... -> bottleneck, ReLU after every layer.
        enc_widths = [input_dim, *hidden_dims]
        encoder_modules = []
        for fan_in, fan_out in zip(enc_widths, enc_widths[1:]):
            encoder_modules.extend((nn.Linear(fan_in, fan_out), nn.ReLU()))
        self.encoder = nn.Sequential(*encoder_modules)

        # Decoder: bottleneck -> ... -> h1 -> input_dim. Only the intermediate
        # layers get a ReLU; the output layer is linear.
        dec_widths = [enc_widths[-1], *reversed(hidden_dims[:-1]), input_dim]
        dec_pairs = list(zip(dec_widths, dec_widths[1:]))
        decoder_modules = []
        for fan_in, fan_out in dec_pairs[:-1]:
            decoder_modules.extend((nn.Linear(fan_in, fan_out), nn.ReLU()))
        final_in, final_out = dec_pairs[-1]
        decoder_modules.append(nn.Linear(final_in, final_out))
        self.decoder = nn.Sequential(*decoder_modules)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Encode ``x`` and return its reconstruction."""
        latent = self.encoder(x)
        return self.decoder(latent)
class AutoencoderDetector:
    """Anomaly detector that scores rows by autoencoder reconstruction error.

    Inputs are standardized with a ``StandardScaler``, an ``_AE`` network is
    trained to reconstruct them, and each row's mean squared reconstruction
    error is converted into a score in ``[0, 1]`` relative to the 95th
    percentile of the training errors.
    """

    def __init__(
        self,
        hidden_dims: Optional[list[int]] = None,
        epochs: int = 50,
        lr: float = 1e-3,
        batch_size: int = 64,
        device: Optional[str] = None,
    ):
        self.hidden_dims = hidden_dims or [32, 16, 8]
        self.epochs = epochs
        self.lr = lr
        self.batch_size = batch_size
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.scaler = StandardScaler()
        self.model: Optional[_AE] = None
        self._threshold: float = 0.0
        self._is_fitted = False

    def fit(self, X: pd.DataFrame) -> "AutoencoderDetector":
        """Train the scaler and autoencoder on ``X`` and calibrate the threshold.

        Returns ``self`` so calls can be chained.
        """
        Xs = self.scaler.fit_transform(X).astype(np.float32)
        input_dim = Xs.shape[1]
        self.model = _AE(input_dim, self.hidden_dims).to(self.device)
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        criterion = nn.MSELoss()
        tensor = torch.tensor(Xs, device=self.device)
        loader = DataLoader(TensorDataset(tensor), batch_size=self.batch_size, shuffle=True)

        self.model.train()
        for epoch in range(self.epochs):
            epoch_loss = 0.0
            for (batch,) in loader:
                recon = self.model(batch)
                loss = criterion(recon, batch)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
            if (epoch + 1) % 10 == 0:
                log.info(f" Epoch {epoch+1}/{self.epochs} loss={epoch_loss/len(loader):.6f}")

        # The 95th percentile of training errors is the reference point for scoring.
        errors = self._reconstruction_errors(Xs)
        self._threshold = float(np.percentile(errors, 95))
        self._is_fitted = True
        log.info(f"Autoencoder fitted. Threshold (p95): {self._threshold:.6f}")
        return self

    def _reconstruction_errors(self, Xs: np.ndarray) -> np.ndarray:
        """Return the per-row mean squared error between ``Xs`` and its reconstruction."""
        self.model.eval()
        with torch.no_grad():
            t = torch.tensor(Xs, device=self.device)
            recon = self.model(t).cpu().numpy()
        return np.mean((Xs - recon) ** 2, axis=1)

    def score(self, X: pd.DataFrame) -> np.ndarray:
        """Anomaly scores in [0, 1]. Higher = more anomalous.

        An error equal to the calibrated threshold maps to 0.5; errors at or
        above twice the threshold saturate at 1.0.
        """
        Xs = self.scaler.transform(X).astype(np.float32)
        errors = self._reconstruction_errors(Xs)
        scores = errors / (self._threshold * 2 + 1e-9)
        return np.clip(scores, 0, 1).astype(float)

    def predict(self, X: pd.DataFrame, threshold: float = 0.5) -> np.ndarray:
        """Return 1 for rows whose score is at least ``threshold``, else 0."""
        return (self.score(X) >= threshold).astype(int)

    def save(self, path: Optional[str] = None) -> str:
        """Persist weights, scaler statistics, threshold and hyperparameters.

        Defaults to ``autoencoder.pt`` inside ``MODELS_DIR``. Returns the path
        written.

        Raises:
            RuntimeError: if no model has been trained or loaded yet.
        """
        if getattr(self, "model", None) is None:
            raise RuntimeError("AutoencoderDetector has no trained model to save; call fit() first")
        path = path or os.path.join(MODELS_DIR, "autoencoder.pt")
        parent = os.path.dirname(path)
        if parent:  # a bare filename has no directory to create
            os.makedirs(parent, exist_ok=True)
        torch.save({
            "model_state": self.model.state_dict(),
            "scaler_mean": self.scaler.mean_,
            "scaler_scale": self.scaler.scale_,
            "hidden_dims": self.hidden_dims,
            "threshold": self._threshold,
            "input_dim": self.model.encoder[0].in_features,
            # Training hyperparameters, so a loaded detector can be re-fitted.
            "epochs": getattr(self, "epochs", 50),
            "lr": getattr(self, "lr", 1e-3),
            "batch_size": getattr(self, "batch_size", 64),
        }, path)
        log.info(f"Autoencoder saved to {path}")
        return path

    @classmethod
    def load(cls, path: Optional[str] = None) -> "AutoencoderDetector":
        """Load a detector previously written by :meth:`save` onto the CPU.

        Files written before hyperparameters were stored are still accepted;
        the constructor defaults are used for the missing values.
        """
        path = path or os.path.join(MODELS_DIR, "autoencoder.pt")
        # NOTE(security): weights_only=False unpickles arbitrary objects and is
        # required here because the checkpoint stores NumPy arrays. Only load
        # checkpoints from a trusted source.
        saved = torch.load(path, map_location="cpu", weights_only=False)
        # Go through __init__ so every attribute fit() relies on exists.
        obj = cls(
            hidden_dims=saved["hidden_dims"],
            epochs=saved.get("epochs", 50),
            lr=saved.get("lr", 1e-3),
            batch_size=saved.get("batch_size", 64),
            device="cpu",
        )
        obj.hidden_dims = saved["hidden_dims"]
        # Rebuild just enough scaler state for transform() to work.
        obj.scaler.mean_ = np.asarray(saved["scaler_mean"])
        obj.scaler.scale_ = np.asarray(saved["scaler_scale"])
        obj.scaler.n_features_in_ = len(saved["scaler_mean"])
        obj._threshold = saved["threshold"]
        obj.model = _AE(saved["input_dim"], saved["hidden_dims"])
        obj.model.load_state_dict(saved["model_state"])
        obj._is_fitted = True
        log.info(f"Autoencoder loaded from {path}")
        return obj
# ─── Ensemble ─────────────────────────────────────────────────────────────
class EnsembleDetector:
    """Weighted combination of the Isolation Forest and autoencoder detectors.

    The ensemble score is ``if_weight * if_score + ae_weight * ae_score``.
    The weights are used as given and are not normalized.
    """

    def __init__(self, if_weight: float = 0.5, ae_weight: float = 0.5):
        self.if_detector = IsolationForestDetector()
        self.ae_detector = AutoencoderDetector()
        self.if_weight = if_weight
        self.ae_weight = ae_weight

    def fit(self, X: pd.DataFrame) -> "EnsembleDetector":
        """Fit both member detectors on ``X`` and return ``self``."""
        log.info("Training IsolationForest...")
        self.if_detector.fit(X)
        log.info("Training Autoencoder...")
        self.ae_detector.fit(X)
        return self

    def score(self, X: pd.DataFrame) -> np.ndarray:
        """Return the weighted sum of both detectors' scores for each row."""
        if_scores = self.if_detector.score(X)
        ae_scores = self.ae_detector.score(X)
        return self.if_weight * if_scores + self.ae_weight * ae_scores

    def predict(self, X: pd.DataFrame, threshold: float = 0.55) -> np.ndarray:
        """Return 1 for rows whose ensemble score is at least ``threshold``, else 0."""
        return (self.score(X) >= threshold).astype(int)

    def save(self) -> None:
        """Save both member detectors to their default locations.

        The ensemble weights are not persisted; pass them to :meth:`load`.
        """
        self.if_detector.save()
        self.ae_detector.save()

    @classmethod
    def load(cls, if_weight: float = 0.5, ae_weight: float = 0.5) -> "EnsembleDetector":
        """Load both member detectors from their default locations.

        Args:
            if_weight: weight applied to the Isolation Forest score.
            ae_weight: weight applied to the autoencoder score.

        The weights default to 0.5 each; pass the values used at training time
        to restore a non-default weighting, since ``save`` does not store them.
        """
        # __init__ is bypassed so no throwaway detectors are constructed.
        obj = cls.__new__(cls)
        obj.if_detector = IsolationForestDetector.load()
        obj.ae_detector = AutoencoderDetector.load()
        obj.if_weight = if_weight
        obj.ae_weight = ae_weight
        return obj
# ─── Evaluation ───────────────────────────────────────────────────────────
def evaluate_detector(
    detector,
    X: pd.DataFrame,
    y_true: pd.Series,
    threshold: float = 0.55,
    name: str = "Detector",
) -> dict:
    """Score ``detector`` against binary labels and return summary metrics.

    The detector must expose ``predict(X, threshold=...)`` and ``score(X)``.
    The returned dict holds ``name``, ``precision``, ``recall``, ``f1``,
    ``roc_auc`` and ``detection_rate`` (the fraction of rows flagged), each
    rounded to four decimals. ``roc_auc`` is reported as 0.0 when ``y_true``
    contains a single class. Precision, recall and F1 are also logged.
    """
    from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score

    def _r4(value):
        # All reported metrics are rounded to four decimals.
        return round(value, 4)

    y_pred = detector.predict(X, threshold=threshold)
    scores = detector.score(X)

    precision = _r4(precision_score(y_true, y_pred, zero_division=0))
    recall = _r4(recall_score(y_true, y_pred, zero_division=0))
    f1 = _r4(f1_score(y_true, y_pred, zero_division=0))

    # ROC AUC is only computed when both classes are present in the labels.
    if y_true.nunique() > 1:
        roc_auc = _r4(roc_auc_score(y_true, scores))
    else:
        roc_auc = 0.0

    flagged_fraction = _r4(y_pred.sum() / len(y_pred))

    metrics = {
        "name": name,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "roc_auc": roc_auc,
        "detection_rate": flagged_fraction,
    }
    log.info(f"{name}: precision={metrics['precision']} recall={metrics['recall']} f1={metrics['f1']}")
    return metrics
if __name__ == "__main__":
    # Script entry point: fetch data, train the ensemble on it, print a summary
    # of the resulting score distribution and save both models.
    import sys

    # Put the parent of this file's directory on sys.path so that
    # `scripts.data_pipeline` can be imported when the file is run directly.
    sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
    from scripts.data_pipeline import run_pipeline, get_feature_matrix

    logging.basicConfig(level=logging.INFO)

    df = run_pipeline(limit_per_relay=100)
    if df.empty:
        # The dash in this message was mis-encoded in the original source.
        print("No data — cannot train. Check network connectivity.")
        sys.exit(1)

    X = get_feature_matrix(df)
    ensemble = EnsembleDetector()
    ensemble.fit(X)

    # Scores are computed on the training data itself.
    scores = ensemble.score(X)
    print("\nScore distribution:")
    print(f" mean={scores.mean():.4f} std={scores.std():.4f}")
    print(f" min={scores.min():.4f} max={scores.max():.4f}")
    print(f" anomalies (>0.55): {(scores >= 0.55).sum()} / {len(scores)}")

    ensemble.save()
    print("\nModels saved.")