| """Polynomial Anomoly Detector with GARCH method and raw error method |
| """ |
| |
|
|
import math

import numpy as np
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from .distance import Fourier
from ..utils.utility import zscore


class POLY(BaseDetector):
    """An elementary method to detect pointwise anomalies using polynomial approximation.

    A polynomial of a given degree is fitted window by window to the time
    series. A GARCH method is run on the difference between the approximation
    and the true value of the series to estimate the volatility at each point.
    A detector score is then derived for each point from the estimated
    volatility and the residual, measuring how normal the point is. An
    alternative method that only considers the absolute difference is also
    available.

    Parameters
    ----------
    power : int, optional (default=1)
        The degree of the polynomial fitted to the data.
    window : int, optional (default=200)
        The length of the window in which anomalies are detected.
    neighborhood : int, optional (default=max(100, 10 * window))
        The number of samples used to fit the polynomial for one subsequence.
        Since the time series may vary, the score for the subsequence
        (a, a + k) of length k is computed from a polynomial fitted only on
        the subsequence's neighborhood.
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Used when fitting to define the threshold
        on the decision function.
    normalize : bool, optional (default=True)
        If True, min-max scale the input series to [0, 1] before fitting.

    Attributes
    ----------
    estimator : dict of ``numpy.poly1d``
        The collection of fitted sub-estimators, keyed by window start index.
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.
    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.
    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
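
    Examples
    --------
    A minimal usage sketch. The synthetic series below is illustrative, and
    running it assumes the package's ``Fourier`` measure is importable:

    >>> import numpy as np
    >>> data = np.sin(np.linspace(0, 40 * np.pi, 4000))  # synthetic series
    >>> data[2000:2010] += 2.0                           # injected anomaly
    >>> clf = POLY(power=3, window=100)
    >>> _ = clf.fit(data)
    >>> scores = clf.decision_scores_                    # higher = more abnormal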
| """ |
    def __init__(self, power=1, window=200, neighborhood=None,
                 contamination=0.1, normalize=True):
        self.power = power
        self.window = window
        self.avg_window = None
        if neighborhood is None:
            self.neighborhood = max(10 * window, 100)
        else:
            self.neighborhood = neighborhood
        self.contamination = contamination
        self.normalize = normalize
        self.model_name = 'POLY'

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The input samples.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        if self.normalize:
            X = (X - X.min()) / (X.max() - X.min())
        X = X.squeeze()

        self._set_n_classes(y)

        self.X_train_ = X
        self.n_train_ = len(X)
        self.decision_scores_ = np.zeros([self.n_train_, 1])

        # Points in the initial segment are not scored; they only provide
        # context for the first fitted windows.
        self.n_initial_ = min(500, int(0.1 * self.n_train_))
        self.X_initial_ = X[:self.n_initial_]

        window = self.window
        if self.avg_window is not None:
            self.neighborhood = max(self.neighborhood, 6 * self.avg_window)
        if self.neighborhood > len(X):
            self.neighborhood = len(X)
        neighborhood = self.neighborhood

        N = math.floor(self.n_train_ / window)
        M = math.ceil(self.n_initial_ / window)
        data = self.X_train_
        power = self.power
        fit = {}

        # Fit one polynomial per window on the window's neighborhood
        # (excluding the window itself), handling the series boundaries.
        for i in range(M, N + 1):
            index = int(i * window)
            neighbor = int(neighborhood / 2)

            if index + neighbor < self.n_train_ and index - neighbor > 0:
                # Interior window: symmetric neighborhood around the window.
                x = np.concatenate((np.arange(index - neighbor, index),
                                    np.arange(index + window, index + neighbor)))
                y = np.concatenate((data[index - neighbor: index],
                                    data[index + window: index + neighbor]))
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
            elif index + neighbor >= self.n_train_ and index + window < self.n_train_:
                # Right boundary: shift the neighborhood left to stay in range.
                x = np.concatenate((np.arange(self.n_train_ - neighborhood, index),
                                    np.arange(index + window, self.n_train_)))
                y = np.concatenate((data[self.n_train_ - neighborhood: index],
                                    data[index + window: self.n_train_]))
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
            elif index + window >= self.n_train_:
                # Last (possibly partial) window: fit on preceding samples only.
                x = np.arange(self.n_train_ - neighborhood, index)
                y = data[self.n_train_ - neighborhood: index]
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
            elif index + window < neighborhood:
                # Left boundary: use the first ``neighborhood`` samples.
                x = np.concatenate((np.arange(0, index),
                                    np.arange(index + window, neighborhood)))
                y = np.concatenate((data[0: index],
                                    data[index + window: neighborhood]))
                try:
                    mymodel = np.poly1d(np.polyfit(x, y, power))
                except TypeError:
                    # x and y can disagree in length at the boundary; rebuild
                    # x so the lengths match before refitting.
                    x = np.concatenate((np.arange(0, index),
                                        np.arange(len(data[index + window: neighborhood]))))
                    mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel
            else:
                x = np.concatenate((np.arange(index - neighbor, index),
                                    np.arange(index + window, index + neighbor)))
                y = np.concatenate((data[index - neighbor: index],
                                    data[index + window: index + neighbor]))
                mymodel = np.poly1d(np.polyfit(x, y, power))
                fit['model' + str(index)] = mymodel

        # Stitch the per-window polynomial estimates into one series.
        Y = np.zeros(self.n_train_)
        for i in range(M, N):
            myline = np.arange(window * i, window * (i + 1))
            Y[window * i: window * (i + 1)] = fit['model' + str(i * window)](myline)
        if self.n_train_ % window != 0:
            # Evaluate the last model on the leftover partial window.
            x = np.arange(N * window, self.n_train_)
            Y[N * window:] = fit['model' + str(N * window)](x)
        self.estimation = Y
        self.estimator = fit

        # Score the residuals between the series and its estimate.
        measure = Fourier()
        measure.detector = self
        measure.set_param()
        self.decision_scores_ = self.decision_function(X, measure=measure)
        return self

    def decision_function(self, X=False, measure=None):
        """Derive the decision score based on the given distance measure.

        Parameters
        ----------
        X : numpy array of shape (n_samples, ), or False
            The input samples. If False, scores are computed on the stored
            training series.
        measure : object
            Object for the given distance measure, with methods to derive
            the score.

        Returns
        -------
        score : numpy array of shape (n_samples,)
            The anomaly score of each sample.
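
        Examples
        --------
        A sketch of the internal wiring, which ``fit`` performs automatically
        (``clf`` is a fitted ``POLY`` instance; skipped under doctest):

        >>> measure = Fourier()                               # doctest: +SKIP
        >>> measure.detector = clf                            # doctest: +SKIP
        >>> measure.set_param()                               # doctest: +SKIP
        >>> scores = clf.decision_function(measure=measure)   # doctest: +SKIP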
| """ |
        if not isinstance(X, bool):
            self.X_train_ = X
        estimation = self.estimation
        window = self.window
        n_train_ = self.n_train_
        score = np.zeros(n_train_)
        N = math.floor(self.n_train_ / window)
        M = math.ceil(self.n_initial_ / window)

        # Score each window against its polynomial estimate; the last
        # window may be shorter than ``window``.
        for i in range(M, N + 1):
            index = i * window
            if i == N:
                end = self.n_train_
            else:
                end = index + window
            score[index: end] = measure.measure(
                self.X_train_[index: end], estimation[index: end], index)

        self._mu = np.mean(self.decision_scores_)
        self._sigma = np.std(self.decision_scores_)
        return score

    def predict_proba(self, X, method='linear', measure=None):
        """Predict the probability of a sample being an outlier. Two
        approaches are possible:

        1. Simply use min-max conversion to linearly transform the outlier
           scores into the range [0, 1]. The model must be fitted first.
        2. Use unifying scores, see :cite:`kriegel2011interpreting`.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.
        method : str, optional (default='linear')
            Probability conversion method. It must be one of
            'linear' or 'unify'.
        measure : object, optional (default=None)
            Distance measure passed through to ``decision_function``.

        Returns
        -------
        outlier_probability : numpy array of shape (n_samples, n_classes)
            For each observation, the probability that it should be
            considered an outlier according to the fitted model, ranging
            in [0, 1]. Column 1 holds the outlier probability.
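
        Examples
        --------
        A sketch of the 'unify' conversion in isolation; ``mu`` and ``sigma``
        are hypothetical training-score statistics, not values produced by
        this detector:

        >>> import numpy as np
        >>> from scipy.special import erf
        >>> mu, sigma = 0.1, 0.05
        >>> test_scores = np.array([0.08, 0.30])
        >>> probs = erf((test_scores - mu) / (sigma * np.sqrt(2))).clip(0, 1)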
| """ |

        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        train_scores = self.decision_scores_

        # Refit on X and recompute scores; this also refreshes
        # self._mu and self._sigma used by the 'unify' method.
        self.fit(X)
        self.decision_function(measure=measure)
        test_scores = self.decision_scores_

        probs = np.zeros([X.shape[0], int(self._classes)])
        if method == 'linear':
            # Scale test scores by the min/max of the training scores.
            scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1))
            probs[:, 1] = scaler.transform(
                test_scores.reshape(-1, 1)).ravel().clip(0, 1)
            probs[:, 0] = 1 - probs[:, 1]
            return probs
        elif method == 'unify':
            # Convert scores to probabilities via the Gaussian error
            # function, using the training-score mean and std.
            pre_erf_score = (test_scores - self._mu) / (
                self._sigma * np.sqrt(2))
            erf_score = erf(pre_erf_score)
            probs[:, 1] = erf_score.clip(0, 1).ravel()
            probs[:, 0] = 1 - probs[:, 1]
            return probs
        else:
            raise ValueError(method,
                             'is not a valid probability conversion method')
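

if __name__ == '__main__':
    # Minimal smoke-test sketch. Assumptions: this module is executed as part
    # of its package (e.g. ``python -m <package>.poly``) so the relative
    # imports resolve, and the synthetic series below is purely illustrative.
    rng = np.random.default_rng(0)
    t = np.arange(4000)
    series = np.sin(2 * np.pi * t / 200) + 0.1 * rng.standard_normal(t.size)
    series[3000:3010] += 3.0  # injected point anomalies
    clf = POLY(power=3, window=100).fit(series)
    print('highest-scoring index:', int(np.argmax(clf.decision_scores_)))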