| """ |
| This function is adapted from [TranAD] by [imperial-qore] |
| Original source: [https://github.com/imperial-qore/TranAD] |
| """ |
|
|
| from __future__ import division |
| from __future__ import print_function |
|
|
| import numpy as np |
| import math |
| import torch |
| import torch.nn.functional as F |
| from sklearn.utils import check_array |
| from sklearn.utils.validation import check_is_fitted |
| from torch import nn |
| from torch.nn import TransformerEncoder |
| from torch.nn import TransformerDecoder |
| from torch.utils.data import DataLoader |
| from sklearn.preprocessing import MinMaxScaler |
| import tqdm |
|
|
| from .base import BaseDetector |
| from ..utils.dataset import ReconstructDataset |
| from ..utils.torch_utility import EarlyStoppingTorch, get_gpu |
|
|
class PositionalEncoding(nn.Module):
    """Additive sinusoidal positional encoding followed by dropout.

    NOTE: unlike the canonical transformer encoding (sin on even channels,
    cos on odd ones), the sin and cos terms are summed into *every*
    channel. This is faithful to the upstream TranAD implementation and
    is kept as-is.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        positions = torch.arange(max_len, dtype=torch.float).unsqueeze(1)
        freqs = torch.exp(
            torch.arange(d_model, dtype=torch.float) * (-math.log(10000.0) / d_model)
        )
        angles = positions * freqs
        table = torch.sin(angles) + torch.cos(angles)
        # Shape (max_len, 1, d_model): sequence-first, broadcasts over batch.
        # Registered as a buffer so it moves with the module but is not trained.
        self.register_buffer("pe", table.unsqueeze(1))

    def forward(self, x, pos=0):
        # Add the encodings for positions [pos, pos + seq_len), then dropout.
        return self.dropout(x + self.pe[pos : pos + x.size(0), :])
|
|
class TransformerEncoderLayer(nn.Module):
    """Transformer encoder layer with residual self-attention and a
    LeakyReLU feed-forward block.

    Unlike ``nn.TransformerEncoderLayer`` there is no LayerNorm, matching
    the upstream TranAD architecture.
    """

    def __init__(self, d_model, nhead, dim_feedforward=16, dropout=0):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        # Fix: the original `nn.LeakyReLU(True)` passed True as
        # `negative_slope` (== 1.0), which turned the activation into the
        # identity and removed the feed-forward nonlinearity entirely.
        # The intended call is inplace=True with the default slope (0.01).
        self.activation = nn.LeakyReLU(inplace=True)

    def forward(self, src, *args, **kwargs):
        """Self-attention then feed-forward, each with a residual add.

        Extra positional/keyword args (masks passed by
        ``nn.TransformerEncoder``) are accepted and ignored.
        """
        src2 = self.self_attn(src, src, src)[0]
        src = src + self.dropout1(src2)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        return src
|
|
class TransformerDecoderLayer(nn.Module):
    """Transformer decoder layer: residual self-attention, residual
    cross-attention over the encoder memory, then a LeakyReLU
    feed-forward block.

    Unlike ``nn.TransformerDecoderLayer`` there is no LayerNorm, matching
    the upstream TranAD architecture.
    """

    def __init__(self, d_model, nhead, dim_feedforward=16, dropout=0):
        super(TransformerDecoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

        # Fix: the original `nn.LeakyReLU(True)` passed True as
        # `negative_slope` (== 1.0), which turned the activation into the
        # identity and removed the feed-forward nonlinearity entirely.
        # The intended call is inplace=True with the default slope (0.01).
        self.activation = nn.LeakyReLU(inplace=True)

    def forward(self, tgt, memory, *args, **kwargs):
        """Decode `tgt` against encoder `memory`.

        Extra positional/keyword args (masks passed by
        ``nn.TransformerDecoder``) are accepted and ignored.
        """
        tgt2 = self.self_attn(tgt, tgt, tgt)[0]
        tgt = tgt + self.dropout1(tgt2)
        tgt2 = self.multihead_attn(tgt, memory, memory)[0]
        tgt = tgt + self.dropout2(tgt2)
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        return tgt
|
|
class TranADModel(nn.Module):
    """Two-phase TranAD reconstruction network.

    Phase 1 reconstructs the window with an all-zero "focus" score;
    phase 2 re-runs a second decoder conditioned on the squared phase-1
    residual, sharpening attention on poorly reconstructed regions.
    """

    def __init__(self, batch_size, feats, win_size):
        super(TranADModel, self).__init__()
        self.name = "TranAD"
        self.batch = batch_size
        self.n_feats = feats
        self.n_window = win_size
        self.n = self.n_feats * self.n_window
        # Model width is doubled: input features concatenated with focus scores.
        width = 2 * feats
        self.pos_encoder = PositionalEncoding(width, 0.1, self.n_window)
        self.transformer_encoder = TransformerEncoder(
            TransformerEncoderLayer(
                d_model=width, nhead=feats, dim_feedforward=16, dropout=0.1
            ),
            1,
        )
        self.transformer_decoder1 = TransformerDecoder(
            TransformerDecoderLayer(
                d_model=width, nhead=feats, dim_feedforward=16, dropout=0.1
            ),
            1,
        )
        self.transformer_decoder2 = TransformerDecoder(
            TransformerDecoderLayer(
                d_model=width, nhead=feats, dim_feedforward=16, dropout=0.1
            ),
            1,
        )
        self.fcn = nn.Sequential(nn.Linear(width, feats), nn.Sigmoid())

    def encode(self, src, c, tgt):
        # Stack the window with its focus scores along the feature axis,
        # scale as in the transformer, position-encode, and run the encoder.
        combined = torch.cat((src, c), dim=2) * math.sqrt(self.n_feats)
        memory = self.transformer_encoder(self.pos_encoder(combined))
        # Widen the target to the doubled model width for the decoders.
        return tgt.repeat(1, 1, 2), memory

    def forward(self, src, tgt):
        # Phase 1: reconstruct with no anomaly focus (all-zero scores).
        focus = torch.zeros_like(src)
        x1 = self.fcn(self.transformer_decoder1(*self.encode(src, focus, tgt)))
        # Phase 2: focus on where phase 1 failed (squared residual).
        focus = (x1 - src) ** 2
        x2 = self.fcn(self.transformer_decoder2(*self.encode(src, focus, tgt)))
        return x1, x2
|
|
|
|
class TranAD(BaseDetector):
    """TranAD anomaly detector: trains a two-phase transformer
    reconstruction model on sliding windows and scores each time step by
    the phase-2 reconstruction error of the window's last element.
    """

    def __init__(self,
                 win_size=100,
                 feats=1,
                 batch_size=128,
                 epochs=50,
                 patience=3,
                 lr=1e-4,
                 validation_size=0.2
                 ):
        """
        Parameters
        ----------
        win_size : int
            Sliding-window length fed to the model.
        feats : int
            Number of input features (channels) per time step.
        batch_size : int
            Mini-batch size for training and scoring.
        epochs : int
            Maximum number of training epochs.
        patience : int
            Early-stopping patience, in epochs without improvement.
        lr : float
            AdamW learning rate.
        validation_size : float
            Fraction of the series (taken from the end) held out for
            validation.
        """
        super().__init__()

        # Scores from the most recent decision_function call.
        self.__anomaly_score = None

        self.cuda = True
        self.device = get_gpu(self.cuda)

        self.win_size = win_size
        self.batch_size = batch_size
        self.epochs = epochs
        self.feats = feats
        self.validation_size = validation_size

        self.model = TranADModel(
            batch_size=self.batch_size, feats=self.feats, win_size=self.win_size
        ).to(self.device)
        self.optimizer = torch.optim.AdamW(
            self.model.parameters(), lr=lr, weight_decay=1e-5
        )
        # Multiply the learning rate by 0.9 every 5 epochs.
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, 5, 0.9)
        self.criterion = nn.MSELoss()

        self.early_stopping = EarlyStoppingTorch(None, patience=patience)

    def fit(self, data):
        """Train on `data` using a chronological train/validation split.

        Early-stops on the mean validation loss when a validation set
        exists, otherwise on the mean training loss. `data` is assumed to
        be a (T, feats) array accepted by ReconstructDataset — TODO confirm.
        """
        split = int((1 - self.validation_size) * len(data))
        tsTrain = data[:split]
        tsValid = data[split:]

        train_loader = DataLoader(
            dataset=ReconstructDataset(tsTrain, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=True
        )

        valid_loader = DataLoader(
            dataset=ReconstructDataset(tsValid, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=False
        )

        for epoch in range(1, self.epochs + 1):
            self.model.train(mode=True)
            avg_loss = 0
            loop = tqdm.tqdm(
                enumerate(train_loader), total=len(train_loader), leave=True
            )
            for idx, (x, _) in loop:
                # Sanitize non-finite inputs so one bad window cannot
                # poison the gradients.
                if torch.isnan(x).any() or torch.isinf(x).any():
                    print("Input data contains nan or inf")
                    x = torch.nan_to_num(x)

                x = x.to(self.device)
                bs = x.shape[0]
                # (batch, window, feats) -> (window, batch, feats):
                # the transformer layers are sequence-first.
                x = x.permute(1, 0, 2)
                # Reconstruction target: the last time step of each window.
                elem = x[-1, :, :].view(1, bs, self.feats)

                self.optimizer.zero_grad()
                z = self.model(x, elem)
                # TranAD schedule: weight shifts from the phase-1 to the
                # phase-2 reconstruction as training progresses.
                loss = (1 / epoch) * self.criterion(z[0], elem) + (
                    1 - 1 / epoch
                ) * self.criterion(z[1], elem)
                # Fix: dropped retain_graph=True — the graph is rebuilt on
                # every iteration, so retaining it only held extra memory.
                loss.backward()

                self.optimizer.step()
                avg_loss += loss.cpu().item()
                loop.set_description(f"Training Epoch [{epoch}/{self.epochs}]")
                loop.set_postfix(loss=loss.item(), avg_loss=avg_loss / (idx + 1))

                if torch.isnan(loss):
                    print(f"Loss is nan at epoch {epoch}")
                    break

            if len(valid_loader) > 0:
                self.model.eval()
                avg_loss_val = 0
                loop = tqdm.tqdm(
                    enumerate(valid_loader), total=len(valid_loader), leave=True
                )
                with torch.no_grad():
                    for idx, (x, _) in loop:
                        if torch.isnan(x).any() or torch.isinf(x).any():
                            print("Input data contains nan or inf")
                            x = torch.nan_to_num(x)

                        x = x.to(self.device)
                        bs = x.shape[0]
                        x = x.permute(1, 0, 2)
                        elem = x[-1, :, :].view(1, bs, self.feats)

                        # Fix: removed a stray optimizer.zero_grad() here —
                        # no gradients are produced under torch.no_grad().
                        z = self.model(x, elem)
                        loss = (1 / epoch) * self.criterion(z[0], elem) + (
                            1 - 1 / epoch
                        ) * self.criterion(z[1], elem)

                        avg_loss_val += loss.cpu().item()
                        loop.set_description(f"Validation Epoch [{epoch}/{self.epochs}]")
                        loop.set_postfix(loss=loss.item(), avg_loss_val=avg_loss_val / (idx + 1))

            self.scheduler.step()
            # Early-stop on validation loss when available, else training loss.
            if len(valid_loader) > 0:
                avg_loss = avg_loss_val / len(valid_loader)
            else:
                avg_loss = avg_loss / len(train_loader)
            self.early_stopping(avg_loss, self.model)
            if self.early_stopping.early_stop:
                print(" Early stopping<<<")
                break

    def decision_function(self, data):
        """Return one anomaly score per time step of `data`.

        Each window's score is the feature-averaged phase-2 MSE of its
        last element; the score array is then padded at both edges (by
        replicating the first/last score) so its length matches `data`.
        """
        test_loader = DataLoader(
            dataset=ReconstructDataset(data, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=False
        )

        self.model.eval()
        scores = []
        loop = tqdm.tqdm(enumerate(test_loader), total=len(test_loader), leave=True)
        with torch.no_grad():
            for idx, (x, _) in loop:
                x = x.to(self.device)
                bs = x.shape[0]
                x = x.permute(1, 0, 2)
                elem = x[-1, :, :].view(1, bs, self.feats)

                # Score with the phase-2 (refined) reconstruction only.
                _, z = self.model(x, elem)

                # Per-sample error averaged over features ([0] drops the
                # singleton sequence dimension). `dim=` is the idiomatic
                # torch spelling of the former `axis=`.
                loss = torch.mean(F.mse_loss(z, elem, reduction="none")[0], dim=-1)
                scores.append(loss.cpu())

        scores = torch.cat(scores, dim=0)
        scores = scores.numpy()

        self.__anomaly_score = scores

        # Windowing yields fewer scores than time steps; pad the edges with
        # the nearest computed score so callers get len(data) values.
        if self.__anomaly_score.shape[0] < len(data):
            self.__anomaly_score = np.array([self.__anomaly_score[0]]*math.ceil((self.win_size-1)/2) +
                        list(self.__anomaly_score) + [self.__anomaly_score[-1]]*((self.win_size-1)//2))

        return self.__anomaly_score

    def anomaly_score(self) -> np.ndarray:
        """Return the scores computed by the last decision_function call."""
        return self.__anomaly_score

    def param_statistic(self, save_file):
        """No-op placeholder kept for the BaseDetector interface."""
        pass
|
|