| """ |
| This function is adapted from [M2N2] by [Dongmin Kim et al.] |
| Original source: [https://github.com/carrtesy/M2N2] |
| Reimplemented by: [EmorZz1G] |
| """ |
| from tqdm import tqdm |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
|
|
| from .base import BaseDetector |
| from ..utils.torch_utility import EarlyStoppingTorch, get_gpu |
| from torch.utils.data import DataLoader |
| from ..utils.dataset import ReconstructDataset |
| from typing import Literal |
| |
|
|
| |
| ''' |
| Basic MLP implementation by: |
| Dongmin Kim (tommy.dm.kim@gmail.com) |
| ''' |
| class Detrender(nn.Module): |
| def __init__(self, num_features: int, gamma=0.99): |
| """ |
| :param num_features: the number of features or channels |
| :param eps: a value added for numerical stability |
| """ |
| super(Detrender, self).__init__() |
| self.num_features = num_features |
| self.gamma = gamma |
| self.mean = nn.Parameter(torch.zeros(1, 1, self.num_features), requires_grad=False) |
|
|
|
|
| def forward(self, x, mode:str): |
| if mode == 'norm': |
| x = self._normalize(x) |
| elif mode == 'denorm': |
| x = self._denormalize(x) |
| else: raise NotImplementedError |
| return x |
|
|
|
|
| def _update_statistics(self, x): |
| dim2reduce = tuple(range(0, x.ndim-1)) |
| mu = torch.mean(x, dim=dim2reduce, keepdim=True).detach() |
| self.mean.lerp_(mu, 1-self.gamma) |
|
|
| def _set_statistics(self, x:torch.Tensor): |
| self.mean = nn.Parameter(x, requires_grad=False) |
|
|
| def _normalize(self, x): |
| x = x - self.mean |
| return x |
|
|
| def _denormalize(self, x): |
| x = x + self.mean |
| return x |
|
|
|
|
| class MLP(nn.Module): |
| def __init__(self, seq_len, num_channels, latent_space_size, gamma, normalization="None"): |
| super().__init__() |
| self.L, self.C = seq_len, num_channels |
| self.encoder = Encoder(seq_len*num_channels, latent_space_size) |
| self.decoder = Decoder(seq_len*num_channels, latent_space_size) |
| self.normalization = normalization |
|
|
| if self.normalization == "Detrend": |
| self.use_normalizer = True |
| self.normalizer = Detrender(num_channels, gamma=gamma) |
| else: |
| self.use_normalizer = False |
|
|
|
|
| def forward(self, X): |
| B, L, C = X.shape |
| assert (L == self.L) and (C == self.C) |
|
|
| if self.use_normalizer: |
| X = self.normalizer(X, "norm") |
| |
| z = self.encoder(X.reshape(B, L*C)) |
| out = self.decoder(z).reshape(B, L, C) |
|
|
| if self.use_normalizer: |
| out = self.normalizer(out, "denorm") |
| return out |
|
|
|
|
| class Encoder(nn.Module): |
| def __init__(self, input_size, latent_space_size): |
| super().__init__() |
| self.linear1 = nn.Linear(input_size, input_size // 2) |
| self.relu1 = nn.ReLU() |
| self.linear2 = nn.Linear(input_size // 2, input_size // 4) |
| self.relu2 = nn.ReLU() |
| self.linear3 = nn.Linear(input_size // 4, latent_space_size) |
| self.relu3 = nn.ReLU() |
|
|
| def forward(self, x): |
| x = self.linear1(x) |
| x = self.relu1(x) |
| x = self.linear2(x) |
| x = self.relu2(x) |
| x = self.linear3(x) |
| x = self.relu3(x) |
| return x |
|
|
|
|
| class Decoder(nn.Module): |
| def __init__(self, input_size, latent_space_size): |
| super().__init__() |
| self.linear1 = nn.Linear(latent_space_size, input_size // 4) |
| self.relu1 = nn.ReLU() |
| self.linear2 = nn.Linear(input_size // 4, input_size // 2) |
| self.relu2 = nn.ReLU() |
| self.linear3 = nn.Linear(input_size // 2, input_size) |
|
|
| def forward(self, x): |
| x = self.linear1(x) |
| x = self.relu1(x) |
| x = self.linear2(x) |
| x = self.relu2(x) |
| out = self.linear3(x) |
| return out |
|
|
|
|
| class MLP_Trainer: |
| def __init__( |
| self, model, train_loader, valid_loader=None, |
| epochs=10, lr=1e-3, L2_reg=0, device='cuda' |
| ): |
| self.model = model |
| self.train_loader = train_loader |
| self.valid_loader = valid_loader |
| self.device = device |
| self.epochs = epochs |
| self.optimizer = torch.optim.AdamW( |
| params=self.model.parameters(), lr=lr, weight_decay=L2_reg) |
|
|
| def train(self): |
| train_iterator = tqdm( |
| range(1, self.epochs + 1), |
| total=self.epochs, |
| desc="training epochs", |
| leave=True |
| ) |
| if self.valid_loader is not None: |
| early_stop = EarlyStoppingTorch(patience=5) |
| for epoch in train_iterator: |
| train_stats = self.train_epoch() |
| if self.valid_loader is not None: |
| valid_loss = self.valid() |
| early_stop(valid_loss, self.model) |
| if early_stop.early_stop: |
| break |
|
|
| def train_epoch(self): |
| self.model.train() |
| train_summary = 0.0 |
| for i, batch_data in enumerate(self.train_loader): |
| train_log = self._process_batch(batch_data) |
| train_summary += train_log["summary"] |
| train_summary /= len(self.train_loader) |
| return train_summary |
|
|
| def _process_batch(self, batch_data) -> dict: |
| X = batch_data[0].to(self.device) |
| B, L, C = X.shape |
| |
| Xhat = self.model(X) |
| |
| self.optimizer.zero_grad() |
| loss = F.mse_loss(Xhat, X) |
| loss.backward() |
| self.optimizer.step() |
| out = { |
| "recon_loss": loss.item(), |
| "summary": loss.item(), |
| } |
| return out |
|
|
| @torch.no_grad() |
| def valid(self): |
| assert self.valid_loader is not None, 'cannot valid without any data' |
| self.model.eval() |
| for i, batch_data in enumerate(self.valid_loader): |
| X = batch_data[0].to(self.device) |
| Xhat = self.model(X) |
| loss = F.mse_loss(Xhat, X) |
| return loss.item() |
|
|
| class MLP_Tester: |
| def __init__(self, model, train_loader, test_loader, lr=1e-3, device='cuda'): |
| self.model = model |
| self.train_loader = train_loader |
| self.test_loader = test_loader |
| self.device = device |
| self.lr = lr |
|
|
| @torch.no_grad() |
| def offline(self, dataloader): |
| self.model.eval() |
| it = tqdm( |
| dataloader, |
| total=len(dataloader), |
| desc="offline inference", |
| leave=True |
| ) |
| recon_errors = [] |
| for i, batch_data in enumerate(it): |
| X = batch_data[0].to(self.device) |
| B, L, C = X.shape |
| Xhat = self.model(X) |
| recon_error = F.mse_loss(Xhat, X, reduction='none') |
| recon_error = recon_error.detach().cpu().numpy() |
| recon_errors.append(recon_error) |
| torch.cuda.empty_cache() |
| recon_errors = np.concatenate(recon_errors, axis=0) |
| anomaly_scores = recon_errors.mean(axis=2).reshape(-1) |
| return anomaly_scores |
|
|
| def online(self, dataloader, init_thr, normalization="None"): |
| self.model.train() |
| it = tqdm( |
| dataloader, |
| total=len(dataloader), |
| desc="online inference", |
| leave=True |
| ) |
| tau = init_thr |
| TT_optimizer = torch.optim.SGD( |
| [p for p in self.model.parameters()], lr=self.lr) |
|
|
| Xs, Xhats = [], [] |
| preds = [] |
| As, thrs = [], [] |
| update_count = 0 |
| for i, batch_data in enumerate(it): |
| X = batch_data[0].to(self.device) |
| B, L, C = X.shape |
| |
| if normalization == "Detrend": |
| self.model.normalizer._update_statistics(X) |
| |
| Xhat = self.model(X) |
| E = (Xhat-X)**2 |
| A = E.mean(dim=2) |
| |
| ytilde = (A >= tau).float() |
| pred = ytilde |
| |
| Xs.append(X) |
| Xhats.append(Xhat.clone().detach()) |
| |
| As.append(A.clone().detach()) |
| preds.append(pred.clone().detach()) |
| thrs.append(tau) |
| |
| TT_optimizer.zero_grad() |
| mask = (ytilde == 0) |
| recon_loss = (A * mask).mean() |
| recon_loss.backward() |
| TT_optimizer.step() |
| update_count += torch.sum(mask).item() |
| |
| anoscs = torch.cat(As, axis=0).reshape(-1).detach().cpu().numpy() |
| print('total update count:', update_count) |
| return anoscs |
|
|
| class M2N2(BaseDetector): |
| def __init__(self, |
| win_size=12, |
| stride=12, |
| num_channels=1, |
| batch_size=64, |
| epochs=10, |
| latent_dim=128, |
| lr=1e-3, |
| ttlr=1e-3, |
| normalization="Detrend", |
| gamma=0.99, |
| th=0.95, |
| valid_size=0.2, |
| infer_mode='online'): |
| self.model_name = 'M2N2' |
| self.normalization = normalization |
| self.device = get_gpu(True) |
| self.model = MLP( |
| seq_len=win_size, |
| num_channels=num_channels, |
| latent_space_size=latent_dim, |
| gamma=gamma, |
| normalization=normalization, |
| ).to(self.device) |
| |
| self.th = th |
| self.lr = lr |
| self.ttlr = ttlr |
| self.epochs = epochs |
| self.batch_size = batch_size |
| self.win_size = win_size |
| self.stride = stride |
| self.valid_size = valid_size |
| self.infer_mode = infer_mode |
| |
| def fit(self, data): |
| if self.valid_size is None: |
| self.train_loader = DataLoader( |
| dataset=ReconstructDataset( |
| data, window_size=self.win_size, stride=self.stride), |
| batch_size=self.batch_size, |
| shuffle=True |
| ) |
| self.valid_loader = None |
| else: |
| data_train = data[:int((1-self.valid_size)*len(data))] |
| data_valid = data[int((1-self.valid_size)*len(data)):] |
| self.train_loader = DataLoader( |
| dataset=ReconstructDataset( |
| data_train, window_size=self.win_size, stride=self.stride), |
| batch_size=self.batch_size, |
| shuffle=True |
| ) |
| self.valid_loader = DataLoader( |
| dataset=ReconstructDataset( |
| data_valid, window_size=self.win_size, stride=self.stride), |
| batch_size=self.batch_size, |
| shuffle=False, |
| ) |
|
|
| self.trainer = MLP_Trainer( |
| model=self.model, |
| train_loader=self.train_loader, |
| valid_loader=self.valid_loader, |
| epochs=self.epochs, |
| lr=self.lr, |
| device=self.device |
| ) |
| self.trainer.train() |
|
|
| self.tester = MLP_Tester( |
| model=self.model, |
| train_loader=self.train_loader, |
| test_loader=self.train_loader, |
| lr=self.ttlr, |
| device=self.device, |
| ) |
| train_anoscs = self.tester.offline(self.train_loader) |
| self.tau = np.quantile(train_anoscs, self.th) |
| print('tau', self.tau) |
|
|
| def decision_function(self, data): |
| self.test_loader = DataLoader( |
| dataset=ReconstructDataset( |
| data, window_size=self.win_size, stride=self.stride), |
| batch_size=self.batch_size, |
| shuffle=False, |
| ) |
| self.tester = MLP_Tester( |
| model=self.model, |
| train_loader=self.train_loader, |
| test_loader=self.test_loader, |
| lr=self.ttlr, |
| device=self.device, |
| ) |
| if self.infer_mode == 'online': |
| anoscs = self.tester.online( |
| self.test_loader, self.tau, |
| normalization=self.normalization) |
| else: |
| anoscs = self.tester.offline(self.test_loader) |
|
|
| self.decision_scores_ = pad_by_edge_value(anoscs, len(data), mode='right') |
| return self.decision_scores_ |
|
|
|
|
| def pad_by_edge_value(scores, target_len, mode: Literal['both', 'left', 'right']): |
| scores = np.array(scores).reshape(-1) |
| assert len(scores) <= target_len, f'the length of scores is more than target one' |
| print(f'origin length: {len(scores)}; target length: {target_len}') |
| current_len = scores.shape[0] |
| pad_total = max(target_len-current_len, 0) |
| if mode == 'left': |
| pad_before = pad_total |
| elif mode == 'right': |
| pad_before = 0 |
| else: |
| pad_before = pad_total // 2 + 1 |
| pad_after = pad_total - pad_before |
| padded_scores = np.pad(scores, (pad_before, pad_after), mode='edge') |
| return padded_scores |