| """ |
| This function is adapted from [pyod] by [yzhao062] |
| Original source: [https://github.com/yzhao062/pyod] |
| """ |
|
|
| from __future__ import division |
| from __future__ import print_function |
|
|
import copy
import math

import numpy as np
import torch
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from torch import nn
|
|
| from .feature import Window |
| from .base import BaseDetector |
| from ..utils.stat_models import pairwise_distances_no_broadcast |
| from ..utils.dataset import TSDataset |
| from ..utils.utility import get_activation_by_name |
|
|
| class InnerAutoencoder(nn.Module): |
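    """Symmetric multilayer-perceptron autoencoder used internally by
    ``AutoEncoder``. For example, with ``n_features=100`` and
    ``hidden_neurons=(128, 64)``, the encoder maps 100 -> 128 -> 64 and the
    decoder mirrors it back with 64 -> 128 -> 100.
    """
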
| def __init__(self, |
| n_features, |
| hidden_neurons=(128, 64), |
| dropout_rate=0.2, |
| batch_norm=True, |
                 hidden_activation='relu'):
        super(InnerAutoencoder, self).__init__()

        self.n_features = n_features
        self.dropout_rate = dropout_rate
        self.batch_norm = batch_norm
        self.hidden_activation = hidden_activation

        # Layer widths: the encoder runs from n_features down through the
        # hidden widths; the decoder mirrors them back out.
        self.layers_neurons_encoder_ = [self.n_features, *hidden_neurons]
        self.layers_neurons_decoder_ = self.layers_neurons_encoder_[::-1]

        self.activation = get_activation_by_name(hidden_activation)

        self.encoder = nn.Sequential()
        self.decoder = nn.Sequential()

        # Each encoder stage: linear -> (optional batch norm) -> activation
        # -> dropout.
        for idx, layer in enumerate(self.layers_neurons_encoder_[:-1]):
            self.encoder.add_module(
                "linear" + str(idx),
                torch.nn.Linear(layer, self.layers_neurons_encoder_[idx + 1]))
            if batch_norm:
                self.encoder.add_module(
                    "batch_norm" + str(idx),
                    nn.BatchNorm1d(self.layers_neurons_encoder_[idx + 1]))
            self.encoder.add_module(self.hidden_activation + str(idx),
                                    self.activation)
            self.encoder.add_module("dropout" + str(idx),
                                    torch.nn.Dropout(dropout_rate))

        # The decoder mirrors the encoder, but batch norm and dropout are
        # skipped after the final (output) layer.
        for idx, layer in enumerate(self.layers_neurons_decoder_[:-1]):
            self.decoder.add_module(
                "linear" + str(idx),
                torch.nn.Linear(layer, self.layers_neurons_decoder_[idx + 1]))
            if batch_norm and idx < len(self.layers_neurons_decoder_[:-1]) - 1:
                self.decoder.add_module(
                    "batch_norm" + str(idx),
                    nn.BatchNorm1d(self.layers_neurons_decoder_[idx + 1]))
            self.decoder.add_module(self.hidden_activation + str(idx),
                                    self.activation)
            if idx < len(self.layers_neurons_decoder_[:-1]) - 1:
                self.decoder.add_module("dropout" + str(idx),
                                        torch.nn.Dropout(dropout_rate))
|
|
    def forward(self, x):
        # Reconstruct the input from its compressed representation.
        x = self.encoder(x)
        x = self.decoder(x)
        return x
|
|
| class AutoEncoder(BaseDetector): |
| """Auto Encoder (AE) is a type of neural networks for learning useful data |
| representations in an unsupervised manner. Similar to PCA, AE could be used |
| to detect outlying objects in the data by calculating the reconstruction |
| errors. See :cite:`aggarwal2015outlier` Chapter 3 for details. |
| |
| Notes |
| ----- |
| This is the PyTorch version of AutoEncoder. |
| The documentation is not finished! |
| |
| Parameters |
| ---------- |
    hidden_neurons : list, optional (default=[64, 32])
        The number of neurons per hidden layer. With the default, the
        network has the structure [n_features, 64, 32, 64, n_features].
| |
| hidden_activation : str, optional (default='relu') |
| Activation function to use for hidden layers. |
| All hidden layers are forced to use the same type of activation. |
| See https://pytorch.org/docs/stable/nn.html for details. |
| |
    batch_norm : boolean, optional (default=True)
        Whether to apply Batch Normalization.
        See https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm1d.html
| |
| learning_rate : float, optional (default=1e-3) |
| Learning rate for the optimizer. This learning_rate is given to |
| an Adam optimizer (torch.optim.Adam). |
| See https://pytorch.org/docs/stable/generated/torch.optim.Adam.html |
| |
| epochs : int, optional (default=100) |
| Number of epochs to train the model. |
| |
| batch_size : int, optional (default=32) |
| Number of samples per gradient update. |
| |
| dropout_rate : float in (0., 1), optional (default=0.2) |
| The dropout to be used across all layers. |
| |
| weight_decay : float, optional (default=1e-5) |
| The weight decay for Adam optimizer. |
| See https://pytorch.org/docs/stable/generated/torch.optim.Adam.html |
| |
| preprocessing : bool, optional (default=True) |
| If True, apply standardization on the data. |
| |
    loss_fn : obj, optional (default=torch.nn.MSELoss)
        Loss function instance which implements torch.nn._Loss.
        One of https://pytorch.org/docs/stable/nn.html#loss-functions
        or a custom loss. Custom losses are currently unstable.
| |
    verbose : bool, optional (default=False)
        Verbosity mode. If True, the model summary and the training loss
        per epoch are printed.
| |
    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random
        number generator; If RandomState instance, random_state is the
        random number generator; If None, the random number generator is
        the RandomState instance used by ``np.random``.
        !CURRENTLY NOT SUPPORTED.!
| |
| contamination : float in (0., 0.5), optional (default=0.1) |
| The amount of contamination of the data set, i.e. |
| the proportion of outliers in the data set. When fitting this is used |
| to define the threshold on the decision function. |
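
    device : str or torch.device, optional (default=None)
        The device on which the model runs. If None, ``cuda:0`` is used
        when available, otherwise ``cpu``.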
| |
| Attributes |
| ---------- |
    model : torch.nn.Module
        The underlying autoencoder network in PyTorch.

    best_model_dict : dict
        The model state dict from the epoch with the lowest training
        loss; it is loaded back into ``model`` after training.
| |
| decision_scores_ : numpy array of shape (n_samples,) |
| The outlier scores of the training data. |
| The higher, the more abnormal. Outliers tend to have higher |
| scores. This value is available once the detector is |
| fitted. |
| |
| threshold_ : float |
| The threshold is based on ``contamination``. It is the |
| ``n_samples * contamination`` most abnormal samples in |
| ``decision_scores_``. The threshold is calculated for generating |
| binary outlier labels. |
| |
| labels_ : int, either 0 or 1 |
| The binary labels of the training data. 0 stands for inliers |
| and 1 for outliers/anomalies. It is generated by applying |
| ``threshold_`` on ``decision_scores_``. |
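
    Examples
    --------
    A minimal usage sketch on synthetic data. The import path is
    hypothetical and should be adjusted to wherever this module lives::

        import numpy as np
        from .auto_encoder import AutoEncoder  # hypothetical import path

        rng = np.random.default_rng(0)
        X = rng.normal(size=(1000, 1))   # univariate series, shape (n, 1)

        clf = AutoEncoder(slidingWindow=100, epochs=10)
        clf.fit(X)
        scores = clf.decision_scores_    # reconstruction-error scores
        labels = clf.labels_             # 0 = inlier, 1 = outlier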
| """ |
|
|
    def __init__(self,
                 slidingWindow=100,
                 hidden_neurons=None,
                 hidden_activation='relu',
                 batch_norm=True,
                 learning_rate=1e-3,
                 epochs=100,
                 batch_size=32,
                 dropout_rate=0.2,
                 weight_decay=1e-5,
                 preprocessing=True,
                 loss_fn=None,
                 verbose=False,
                 contamination=0.1,
                 device=None):
        super(AutoEncoder, self).__init__(contamination=contamination)
|
|
| |
| self.slidingWindow = slidingWindow |
| self.hidden_neurons = hidden_neurons |
| self.hidden_activation = hidden_activation |
| self.batch_norm = batch_norm |
| self.learning_rate = learning_rate |
| self.epochs = epochs |
| self.batch_size = batch_size |
| self.dropout_rate = dropout_rate |
| self.weight_decay = weight_decay |
| self.preprocessing = preprocessing |
| self.loss_fn = loss_fn |
        self.verbose = verbose
        self.device = device

        # Default loss: mean squared reconstruction error.
        if self.loss_fn is None:
            self.loss_fn = torch.nn.MSELoss()

        # Default device: first CUDA device when available, otherwise CPU.
        if self.device is None:
            self.device = torch.device(
                "cuda:0" if torch.cuda.is_available() else "cpu")

        if self.hidden_neurons is None:
            self.hidden_neurons = [64, 32]

| def fit(self, X, y=None): |
| """Fit detector. y is ignored in unsupervised methods. |
| |
| Parameters |
| ---------- |
| X : numpy array of shape (n_samples, n_features) |
| The input samples. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| self : object |
| Fitted estimator. |
| """ |
        n_samples, n_features = X.shape

        # A univariate series is first converted into a matrix of
        # sliding-window subsequences, one training sample per window.
        if n_features == 1:
            X = Window(window=self.slidingWindow).convert(X)

        X = check_array(X)
        self._set_n_classes(y)

        n_samples, n_features = X.shape[0], X.shape[1]
        # Scale each subsequence (row) to [0, 1] independently.
        X = MinMaxScaler(feature_range=(0, 1)).fit_transform(X.T).T

        # Optionally standardize with the training mean/std; the std is
        # floored to avoid division by zero on constant features.
        if self.preprocessing:
            self.mean, self.std = np.mean(X, axis=0), np.std(X, axis=0)
            self.std = np.where(self.std == 0, 1e-8, self.std)
            train_set = TSDataset(X=X, mean=self.mean, std=self.std)
        else:
            train_set = TSDataset(X=X)

        train_loader = torch.utils.data.DataLoader(train_set,
                                                   batch_size=self.batch_size,
                                                   shuffle=True,
                                                   drop_last=True)

        self.model = InnerAutoencoder(
            n_features=n_features,
            hidden_neurons=self.hidden_neurons,
            dropout_rate=self.dropout_rate,
            batch_norm=self.batch_norm,
            hidden_activation=self.hidden_activation)

        self.model = self.model.to(self.device)
        if self.verbose:
            print(self.model)

        self._train_autoencoder(train_loader)

        # Restore the weights from the best epoch before scoring.
        self.model.load_state_dict(self.best_model_dict)
        self.decision_scores_ = self.decision_function(X)

        self._process_decision_scores()
        return self
|
|
| def _train_autoencoder(self, train_loader): |
| """Internal function to train the autoencoder |
| |
| Parameters |
| ---------- |
| train_loader : torch dataloader |
| Train data. |
| """ |
        optimizer = torch.optim.Adam(
            self.model.parameters(), lr=self.learning_rate,
            weight_decay=self.weight_decay)

        self.best_loss = float('inf')
        self.best_model_dict = None

        for epoch in range(self.epochs):
            overall_loss = []
            for data, data_idx in train_loader:
                data = data.to(self.device).float()
                loss = self.loss_fn(data, self.model(data))

                self.model.zero_grad()
                loss.backward()
                optimizer.step()
                overall_loss.append(loss.item())
            if self.verbose:
                print('epoch {epoch}: training loss {train_loss}'.format(
                    epoch=epoch, train_loss=np.mean(overall_loss)))

            # Track the best epoch. The state dict is deep-copied because it
            # holds references to the live parameter tensors; without a copy
            # the snapshot would keep changing as training continues.
            if np.mean(overall_loss) <= self.best_loss:
                self.best_loss = np.mean(overall_loss)
                self.best_model_dict = copy.deepcopy(self.model.state_dict())
|
|
| def decision_function(self, X): |
| """Predict raw anomaly score of X using the fitted detector. |
| |
| The anomaly score of an input sample is computed based on different |
| detector algorithms. For consistency, outliers are assigned with |
| larger anomaly scores. |
| |
| Parameters |
| ---------- |
        X : numpy array of shape (n_samples, n_features)
            The input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.
| |
| Returns |
| ------- |
| anomaly_scores : numpy array of shape (n_samples,) |
| The anomaly score of the input samples. |
| """ |
        check_is_fitted(self, ['model', 'best_model_dict'])

        n_samples, n_features = X.shape

        # Apply the same sliding-window conversion and per-subsequence
        # scaling that were used in ``fit``.
        if n_features == 1:
            X = Window(window=self.slidingWindow).convert(X)

        X = check_array(X)
        X = MinMaxScaler(feature_range=(0, 1)).fit_transform(X.T).T

        if self.preprocessing:
            dataset = TSDataset(X=X, mean=self.mean, std=self.std)
        else:
            dataset = TSDataset(X=X)

        dataloader = torch.utils.data.DataLoader(dataset,
                                                 batch_size=self.batch_size,
                                                 shuffle=False)

        # Disable dropout and use the running batch-norm statistics.
        self.model.eval()

        # The outlier score of a sample is the distance between the sample
        # and its reconstruction.
        outlier_scores = np.zeros([X.shape[0], ])
        with torch.no_grad():
            for data, data_idx in dataloader:
                data_cuda = data.to(self.device).float()
                outlier_scores[data_idx] = pairwise_distances_no_broadcast(
                    data, self.model(data_cuda).cpu().numpy())

        # If the input was windowed, there is one score per subsequence, not
        # per timestamp; pad both ends by repeating the edge scores so the
        # output length matches the input series.
        if outlier_scores.shape[0] < n_samples:
            outlier_scores = np.array(
                [outlier_scores[0]] * math.ceil((self.slidingWindow - 1) / 2) +
                list(outlier_scores) +
                [outlier_scores[-1]] * ((self.slidingWindow - 1) // 2))
|
|
| return outlier_scores |