""" This function is adapted from [donut] by [haowen-xu] Original source: [https://github.com/NetManAIOps/donut] """ from typing import Dict import numpy as np import torchinfo import torch from torch import nn, optim import tqdm import os, math import torch.nn.functional as F from torch.utils.data import DataLoader from typing import Tuple, Sequence, Union, Callable from ..utils.torch_utility import EarlyStoppingTorch, get_gpu from ..utils.dataset import ReconstructDataset class DonutModel(nn.Module): def __init__(self, input_dim, hidden_dim, latent_dim, mask_prob) -> None: super().__init__() """ Xu2018 :param input_dim: Should be window_size * features :param hidden_dims: :param latent_dim: """ self.latent_dim = latent_dim self.mask_prob = mask_prob encoder = VaeEncoder(input_dim, hidden_dim, latent_dim) decoder = VaeEncoder(latent_dim, hidden_dim, input_dim) self.vae = VAE(encoder=encoder, decoder=decoder, logvar_out=False) def forward(self, inputs: torch.Tensor) -> Tuple[torch.Tensor, ...]: # x: (B, T, D) x = inputs B, T, D = x.shape if self.training: # Randomly mask some inputs mask = torch.empty_like(x) mask.bernoulli_(1 - self.mask_prob) x = x * mask else: mask = None # Run the VAE x = x.view(x.shape[0], -1) mean_z, std_z, mean_x, std_x, sample_z = self.vae(x, return_latent_sample=True) # Reshape the outputs mean_x = mean_x.view(B, T, D) std_x = std_x.view(B, T, D) return mean_z, std_z, mean_x, std_x, sample_z, mask def sample_normal(mu: torch.Tensor, std_or_log_var: torch.Tensor, log_var: bool = False, num_samples: int = 1): # ln(σ) = 0.5 * ln(σ^2) -> σ = e^(0.5 * ln(σ^2)) if log_var: sigma = std_or_log_var.mul(0.5).exp_() else: sigma = std_or_log_var if num_samples == 1: eps = torch.randn_like(mu) # also copies device from mu else: eps = torch.rand((num_samples,) + mu.shape, dtype=mu.dtype, device=mu.device) mu = mu.unsqueeze(0) sigma = sigma.unsqueeze(0) # z = μ + σ * ϵ, with ϵ ~ N(0,I) return eps.mul(sigma).add_(mu) def normal_standard_normal_kl(mean: torch.Tensor, std_or_log_var: torch.Tensor, log_var: bool = False) -> torch.Tensor: if log_var: kl_loss = torch.sum(1 + std_or_log_var - mean.pow(2) - std_or_log_var.exp(), dim=-1) else: kl_loss = torch.sum(1 + torch.log(std_or_log_var.pow(2)) - mean.pow(2) - std_or_log_var.pow(2), dim=-1) return -0.5 * kl_loss def normal_normal_kl(mean_1: torch.Tensor, std_or_log_var_1: torch.Tensor, mean_2: torch.Tensor, std_or_log_var_2: torch.Tensor, log_var: bool = False) -> torch.Tensor: if log_var: return 0.5 * torch.sum(std_or_log_var_2 - std_or_log_var_1 + (torch.exp(std_or_log_var_1) + (mean_1 - mean_2)**2) / torch.exp(std_or_log_var_2) - 1, dim=-1) return torch.sum(torch.log(std_or_log_var_2) - torch.log(std_or_log_var_1) \ + 0.5 * (std_or_log_var_1**2 + (mean_1 - mean_2)**2) / std_or_log_var_2**2 - 0.5, dim=-1) class VAELoss(torch.nn.modules.loss._Loss): def __init__(self, size_average=None, reduce=None, reduction: str = 'mean', logvar_out: bool = True): super(VAELoss, self).__init__(size_average, reduce, reduction) self.logvar_out = logvar_out def forward(self, predictions: Tuple[torch.Tensor, ...], targets: Tuple[torch.Tensor, ...], *args, **kwargs) \ -> torch.Tensor: z_mean, z_std_or_log_var, x_dec_mean, x_dec_std = predictions[:4] if len(predictions) > 4: z_prior_mean, z_prior_std_or_logvar = predictions[4:] else: z_prior_mean, z_prior_std_or_logvar = None, None y, = targets # Gaussian nnl loss assumes multivariate normal with diagonal sigma # Alternatively we can use torch.distribution.Normal(x_dec_mean, 

class VAELoss(torch.nn.modules.loss._Loss):
    def __init__(self, size_average=None, reduce=None, reduction: str = 'mean', logvar_out: bool = True):
        super(VAELoss, self).__init__(size_average, reduce, reduction)
        self.logvar_out = logvar_out

    def forward(self, predictions: Tuple[torch.Tensor, ...], targets: Tuple[torch.Tensor, ...], *args, **kwargs) \
            -> torch.Tensor:
        z_mean, z_std_or_log_var, x_dec_mean, x_dec_std = predictions[:4]
        if len(predictions) > 4:
            z_prior_mean, z_prior_std_or_logvar = predictions[4:]
        else:
            z_prior_mean, z_prior_std_or_logvar = None, None

        y, = targets

        # The Gaussian NLL loss assumes a multivariate normal with diagonal sigma.
        # Alternatively we could use torch.distributions.Normal(x_dec_mean, x_dec_std).log_prob(y).sum(-1)
        # or torch.distributions.MultivariateNormal(mean, cov).log_prob(y).sum(-1)
        # with cov = torch.eye(feat_dim).repeat([1, bz, 1, 1]) * std.pow(2).unsqueeze(-1).
        # However, setting up a distribution object is unnecessary computational overhead.
        # Note that F.gaussian_nll_loss requires PyTorch > 1.9!
        nll_gauss = F.gaussian_nll_loss(x_dec_mean, y, x_dec_std.pow(2), reduction='none').sum(-1)
        # For PyTorch < 1.9 use:
        # nll_gauss = -torch.distributions.Normal(x_dec_mean, x_dec_std).log_prob(y).sum(-1)

        # KL loss
        if z_prior_mean is None and z_prior_std_or_logvar is None:
            # If no prior is given, we assume a standard normal
            kl_loss = normal_standard_normal_kl(z_mean, z_std_or_log_var, log_var=self.logvar_out)
        else:
            if z_prior_mean is None:
                z_prior_mean = torch.tensor(0, dtype=z_mean.dtype, device=z_mean.device)
            if z_prior_std_or_logvar is None:
                value = 0 if self.logvar_out else 1
                z_prior_std_or_logvar = torch.tensor(value, dtype=z_std_or_log_var.dtype,
                                                     device=z_std_or_log_var.device)

            kl_loss = normal_normal_kl(z_mean, z_std_or_log_var, z_prior_mean, z_prior_std_or_logvar,
                                       log_var=self.logvar_out)

        # Combine
        final_loss = nll_gauss + kl_loss

        if self.reduction == 'none':
            return final_loss
        elif self.reduction == 'mean':
            return torch.mean(final_loss)
        elif self.reduction == 'sum':
            return torch.sum(final_loss)


class MaskedVAELoss(VAELoss):
    def __init__(self, size_average=None, reduce=None, reduction: str = 'mean'):
        super(MaskedVAELoss, self).__init__(size_average, reduce, reduction, logvar_out=False)

    def forward(self, predictions: Tuple[torch.Tensor, ...], targets: Tuple[torch.Tensor, ...], *args, **kwargs) \
            -> torch.Tensor:
        mean_z, std_z, mean_x, std_x, sample_z, mask = predictions
        actual_x, = targets

        if mask is None:
            mean_z = mean_z.unsqueeze(1)
            std_z = std_z.unsqueeze(1)
            return super(MaskedVAELoss, self).forward((mean_z, std_z, mean_x, std_x), (actual_x,), *args, **kwargs)

        # If the loss is masked, one of the terms in the KL loss is weighted, so we can no longer compute it
        # exactly and have to use an MC approximation, as for the output likelihood.
        nll_output = torch.sum(mask * F.gaussian_nll_loss(mean_x, actual_x, std_x**2, reduction='none'), dim=-1)

        # This is p(z), i.e., the prior likelihood of z. The paper assumes p(z) = N(z | 0, I); constants are dropped.
        beta = torch.mean(mask, dim=(1, 2)).unsqueeze(-1)
        nll_prior = beta * 0.5 * torch.sum(sample_z * sample_z, dim=-1, keepdim=True)

        nll_approx = torch.sum(F.gaussian_nll_loss(mean_z, sample_z, std_z**2, reduction='none'), dim=-1,
                               keepdim=True)

        final_loss = nll_output + nll_prior - nll_approx

        if self.reduction == 'none':
            return final_loss
        elif self.reduction == 'mean':
            return torch.mean(final_loss)
        elif self.reduction == 'sum':
            return torch.sum(final_loss)

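# With a mask m, the masked loss above can be read as a single-sample Monte Carlo estimate of the
# negated modified ELBO (M-ELBO) from the Donut paper (Xu et al., 2018), with beta = mean(m), the
# fraction of unmasked points:
#   M-ELBO = E_{q(z|x)}[ sum_w m_w * log p(x_w | z) + beta * log p(z) - log q(z | x) ]
# nll_output, nll_prior and -nll_approx are the negatives of the three terms (constants dropped).
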

class MLP(torch.nn.Module):
    def __init__(self, input_features: int, hidden_layers: Union[int, Sequence[int]], output_features: int,
                 activation: Callable = torch.nn.Identity(), activation_after_last_layer: bool = False):
        super(MLP, self).__init__()

        self.activation = activation
        self.activation_after_last_layer = activation_after_last_layer

        if isinstance(hidden_layers, int):
            hidden_layers = [hidden_layers]

        layers = [input_features] + list(hidden_layers) + [output_features]
        self.layers = torch.nn.ModuleList([torch.nn.Linear(inp, out) for inp, out in zip(layers[:-1], layers[1:])])

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out = x
        for layer in self.layers[:-1]:
            out = layer(out)
            out = self.activation(out)

        out = self.layers[-1](out)
        if self.activation_after_last_layer:
            out = self.activation(out)

        return out


class VaeEncoder(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int, latent_dim: int):
        super(VaeEncoder, self).__init__()

        self.latent_dim = latent_dim
        self.mlp = MLP(input_dim, hidden_dim, 2 * latent_dim, activation=torch.nn.ReLU(),
                       activation_after_last_layer=False)
        self.softplus = torch.nn.Softplus()

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        # x: (B, T, D)
        mlp_out = self.mlp(x)

        mean, std = mlp_out.tensor_split(2, dim=-1)
        std = self.softplus(std)

        return mean, std


class VAE(torch.nn.Module):
    """
    VAE implementation that supports a normal distribution with diagonal covariance matrix
    in both the latent space and the output.
    """

    def __init__(self, encoder: torch.nn.Module, decoder: torch.nn.Module, logvar_out: bool = True):
        super(VAE, self).__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.log_var = logvar_out

    def forward(self, x: torch.Tensor, return_latent_sample: bool = False, num_samples: int = 1,
                force_sample: bool = False) -> Tuple[torch.Tensor, ...]:
        z_mu, z_std_or_log_var = self.encoder(x)

        if self.training or num_samples > 1 or force_sample:
            z_sample = sample_normal(z_mu, z_std_or_log_var, log_var=self.log_var, num_samples=num_samples)
        else:
            z_sample = z_mu

        x_dec_mean, x_dec_std = self.decoder(z_sample)

        if not return_latent_sample:
            return z_mu, z_std_or_log_var, x_dec_mean, x_dec_std

        return z_mu, z_std_or_log_var, x_dec_mean, x_dec_std, z_sample


class Donut():
    def __init__(self,
                 win_size=120,
                 input_c=1,
                 batch_size=128,      # 32, 128
                 grad_clip=10.0,
                 num_epochs=50,
                 mc_samples=1024,
                 hidden_dim=100,
                 latent_dim=8,
                 inject_ratio=0.01,
                 lr=1e-4,
                 l2_coff=1e-3,
                 patience=3,
                 validation_size=0):
        super().__init__()
        self.__anomaly_score = None

        self.cuda = True
        self.device = get_gpu(self.cuda)

        self.win_size = win_size
        self.input_c = input_c
        self.batch_size = batch_size
        self.grad_clip = grad_clip
        self.num_epochs = num_epochs
        self.mc_samples = mc_samples
        self.validation_size = validation_size

        input_dim = self.win_size * self.input_c
        self.model = DonutModel(input_dim=input_dim, hidden_dim=hidden_dim, latent_dim=latent_dim,
                                mask_prob=inject_ratio).to(self.device)

        self.optimizer = optim.AdamW(self.model.parameters(), lr=lr, weight_decay=l2_coff)
        self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=10, gamma=0.75)
        self.vaeloss = MaskedVAELoss()

        self.save_path = None
        self.early_stopping = EarlyStoppingTorch(save_path=self.save_path, patience=patience)

    def train(self, train_loader, epoch):
        self.model.train(mode=True)
        avg_loss = 0
        loop = tqdm.tqdm(enumerate(train_loader), total=len(train_loader), leave=True)
        for idx, (x, target) in loop:
            x, target = x.to(self.device), target.to(self.device)

            self.optimizer.zero_grad()
            output = self.model(x)
            loss = self.vaeloss(output, (target,))
            loss.backward()

            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip)
            self.optimizer.step()

            avg_loss += loss.cpu().item()
            loop.set_description(f'Training Epoch [{epoch}/{self.num_epochs}]')
            loop.set_postfix(loss=loss.item(), avg_loss=avg_loss / (idx + 1))

        return avg_loss / max(len(train_loader), 1)

    def valid(self, valid_loader, epoch):
        self.model.eval()
        avg_loss = 0
        loop = tqdm.tqdm(enumerate(valid_loader), total=len(valid_loader), leave=True)
        with torch.no_grad():
            for idx, (x, target) in loop:
                x, target = x.to(self.device), target.to(self.device)

                output = self.model(x)
                loss = self.vaeloss(output, (target,))

                avg_loss += loss.cpu().item()
                loop.set_description(f'Validation Epoch [{epoch}/{self.num_epochs}]')
                loop.set_postfix(loss=loss.item(), avg_loss=avg_loss / (idx + 1))

        return avg_loss / max(len(valid_loader), 1)

    def fit(self, data):
        tsTrain = data[:int((1 - self.validation_size) * len(data))]
        tsValid = data[int((1 - self.validation_size) * len(data)):]

        train_loader = DataLoader(
            dataset=ReconstructDataset(tsTrain, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=True
        )

        valid_loader = DataLoader(
            dataset=ReconstructDataset(tsValid, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=False
        )

        for epoch in range(1, self.num_epochs + 1):
            train_loss = self.train(train_loader, epoch)

            if len(valid_loader) > 0:
                valid_loss = self.valid(valid_loader, epoch)

            self.scheduler.step()

            if len(valid_loader) > 0:
                self.early_stopping(valid_loss, self.model)
            else:
                self.early_stopping(train_loss, self.model)

            if self.early_stopping.early_stop:
                print("   Early stopping<<<")
                break

    def decision_function(self, data):
        test_loader = DataLoader(
            dataset=ReconstructDataset(data, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=False
        )

        self.model.eval()
        scores = []
        loop = tqdm.tqdm(enumerate(test_loader), total=len(test_loader), leave=True)

        with torch.no_grad():
            for idx, (x, _) in loop:
                x = x.to(self.device)
                x_vae = x.view(x.shape[0], -1)
                B, T, D = x.shape

                # Draw mc_samples latent samples per window and score only the last time step
                res = self.model.vae(x_vae, return_latent_sample=False, num_samples=self.mc_samples)
                z_mu, z_std, x_dec_mean, x_dec_std = res

                x_dec_mean = x_dec_mean.view(self.mc_samples, B, T, D)
                x_dec_std = x_dec_std.view(self.mc_samples, B, T, D)

                # Average negative reconstruction log-likelihood of the last point over the MC samples
                nll_output = torch.sum(F.gaussian_nll_loss(x_dec_mean[:, :, -1, :], x[:, -1, :].unsqueeze(0),
                                                           x_dec_std[:, :, -1, :]**2, reduction='none'),
                                       dim=(0, 2))
                nll_output /= self.mc_samples

                scores.append(nll_output.cpu())
                loop.set_description(f'Testing: ')

        scores = torch.cat(scores, dim=0)
        scores = scores.numpy()

        assert scores.ndim == 1

        import shutil
        if self.save_path and os.path.exists(self.save_path):
            shutil.rmtree(self.save_path)

        self.__anomaly_score = scores

        # Pad the per-window scores to the length of the input series
        if self.__anomaly_score.shape[0] < len(data):
            self.__anomaly_score = np.array([self.__anomaly_score[0]] * math.ceil((self.win_size - 1) / 2) +
                                            list(self.__anomaly_score) +
                                            [self.__anomaly_score[-1]] * ((self.win_size - 1) // 2))

        return self.__anomaly_score

    def anomaly_score(self) -> np.ndarray:
        return self.__anomaly_score

    def get_y_hat(self) -> np.ndarray:
        return super().get_y_hat

    def param_statistic(self, save_file):
        # The summary input must match DonutModel.forward, which expects (B, T, D)
        model_stats = torchinfo.summary(self.model, (self.batch_size, self.win_size, self.input_c), verbose=0)
        with open(save_file, 'w') as f:
            f.write(str(model_stats))
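

if __name__ == "__main__":
    # Minimal smoke-test sketch on synthetic data (hypothetical parameter values), exercising the
    # DonutModel and MaskedVAELoss defined above with one forward/backward pass. It assumes the
    # package-relative imports at the top of this file resolve, i.e. the module is run from within
    # its package; the full Donut wrapper additionally needs ReconstructDataset and the GPU helpers.
    torch.manual_seed(0)
    B, T, D = 8, 120, 1                               # batch, window length, features
    model = DonutModel(input_dim=T * D, hidden_dim=100, latent_dim=8, mask_prob=0.01)
    loss_fn = MaskedVAELoss()
    model.train()
    x = torch.randn(B, T, D)                          # stands in for a batch of sliding windows
    loss = loss_fn(model(x), (x,))                    # masked VAE loss on the reconstruction
    loss.backward()
    print("smoke-test loss:", loss.item())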