| import numpy as np |
| import logging |
| import math |
| from stumpy import stumpi |
| from TSB_AD.models.base import BaseDetector |
| from TSB_AD.utils.utility import zscore |
|
|
| class Left_STAMPi(BaseDetector): |
|
|
| def __init__(self, n_init_train=100, window_size=50, normalize=True): |
| super().__init__() |
| self.n_init_train = n_init_train |
| self.window_size = window_size |
| self.normalize = normalize |
|
|
| def fit(self, X, y=None): |
| """Fit detector. y is ignored in unsupervised methods. |
| |
| Parameters |
| ---------- |
| X : numpy array of shape (n_samples, n_features) |
| The input samples. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| self : object |
| Fitted estimator. |
| """ |
| n_samples, n_features = X.shape |
| if self.normalize: |
| X = zscore(X, axis=0, ddof=0) |
|
|
| warmup = self.n_init_train |
| ws = self.window_size |
|
|
| if ws > warmup: |
| logging.warning(f"WARN: window_size is larger than n_init_train. Adjusting to n_init_train={warmup}.") |
| ws = warmup |
| if ws < 3: |
| logging.warning("WARN: window_size must be at least 3. Adjusting to 3.") |
| ws = 3 |
|
|
| self.stream = stumpi(X[:warmup, 0], m=ws, egress=False) |
| for point in X[warmup:, 0]: |
| self.stream.update(point) |
| |
| self.decision_scores_ = self.stream.left_P_ |
| self.decision_scores_[:warmup] = 0 |
|
|
| return self |
|
|
| def decision_function(self, X): |
| """Predict raw anomaly score of X using the fitted detector. |
| |
| Parameters |
| ---------- |
| X : numpy array of shape (n_samples, n_features) |
| The training input samples. |
| |
| Returns |
| ------- |
| anomaly_scores : numpy array of shape (n_samples,) |
| The anomaly score of the input samples. |
| """ |
| n_samples = X.shape[0] |
| padded_scores = self.pad_anomaly_scores(self.decision_scores_, n_samples, self.window_size) |
| return padded_scores |
|
|
| @staticmethod |
| def pad_anomaly_scores(scores, n_samples, window_size): |
| """ |
| Pads the anomaly scores to match the length of the input time series. |
| Padding is symmetric, using the first and last values. |
| """ |
| left_padding = [scores[0]] * math.ceil((window_size - 1) / 2) |
| right_padding = [scores[-1]] * ((window_size - 1) // 2) |
| padded_scores = np.array(left_padding + list(scores) + right_padding) |
|
|
| return padded_scores[:n_samples] |