| """ |
| Moirai model for anomaly detection using zero-shot forecasting. |
| Adapted from test_anomaly.py approach for TSB-AD framework. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| import torch |
| from torch.utils.data import DataLoader |
| from tqdm import tqdm |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
| from gluonts.dataset.pandas import PandasDataset |
| from gluonts.dataset.split import split |
| from uni2ts.model.moirai import MoiraiForecast, MoiraiModule |
|
|
| from .base import BaseDetector |
| from ..utils.dataset import MoiraiWindowedDataset |
|
|
|
|
| class Moirai(BaseDetector): |
| def __init__(self, |
| win_size=96, |
| model_path="Salesforce/moirai-1.0-R-small", |
| num_samples=100, |
| device='cuda:0', |
| use_score=False, |
| threshold=0.5): |
| """ |
| Initialize Moirai anomaly detector. |
| |
| Args: |
| win_size (int): Window size for context and prediction |
| model_path (str): Path to pretrained Moirai model |
| num_samples (int): Number of forecast samples |
| device (str): Device to run model on |
| use_score (bool): Whether to use raw scores or threshold |
| threshold (float): Threshold for binary classification |
| """ |
| self.model_name = 'Moirai' |
| self.win_size = win_size |
| self.model_path = model_path |
| self.num_samples = num_samples |
| self.device = torch.device(device if torch.cuda.is_available() else 'cpu') |
| self.use_score = use_score |
| self.threshold = threshold |
| self.decision_scores_ = None |
|
|
| def fit(self, data): |
| """ |
| Fit the Moirai model and compute anomaly scores. |
| |
| Args: |
| data: Input time series data (1D or 2D numpy array) |
| """ |
| try: |
| |
| if data.ndim == 1: |
| data = data.reshape(-1, 1) |
| |
| print(f"Moirai: Processing data with shape {data.shape}") |
| |
| |
| dataset = MoiraiWindowedDataset( |
| data=data, |
| win_size=self.win_size, |
| step=self.win_size, |
| normalize=False |
| ) |
| |
| print(f"Moirai: Created {len(dataset)} windows") |
| |
| if len(dataset) == 0: |
| print("Warning: No valid windows created. Data might be too short.") |
| self.decision_scores_ = np.zeros(len(data)) |
| return |
| |
| |
| data_loader = DataLoader( |
| dataset=dataset, |
| batch_size=1, |
| shuffle=False, |
| drop_last=False |
| ) |
| |
| all_predictions = [] |
| all_targets = [] |
| |
| print("Processing windows with Moirai model...") |
| |
| for i, (context, target) in enumerate(tqdm(data_loader, desc="Processing windows", unit="window")): |
| |
| scores = self._process_window(context.squeeze(0).numpy(), target.squeeze(0).numpy(), i) |
| all_predictions.append(scores) |
| all_targets.append(target.squeeze(0).numpy()) |
| |
| |
| if all_predictions: |
| print("Computing anomaly scores...") |
| |
| combined_predictions = np.concatenate(all_predictions, axis=0) |
| combined_targets = np.concatenate(all_targets, axis=0) |
| |
| |
| if combined_targets.ndim == 1 or combined_predictions.ndim == 1: |
| |
| if combined_targets.ndim != combined_predictions.ndim: |
| |
| if combined_predictions.ndim == 1 and combined_targets.ndim == 2: |
| combined_predictions = combined_predictions.reshape(-1, 1) |
| elif combined_targets.ndim == 1 and combined_predictions.ndim == 2: |
| combined_targets = combined_targets.reshape(-1, 1) |
| |
| if combined_targets.shape != combined_predictions.shape: |
| print(f"Shape mismatch: targets {combined_targets.shape}, predictions {combined_predictions.shape}") |
| |
| if combined_targets.ndim == 2: |
| combined_targets = combined_targets[:, 0] |
| if combined_predictions.ndim == 2: |
| combined_predictions = combined_predictions[:, 0] |
| |
| anomaly_scores = (combined_targets - combined_predictions) ** 2 |
| if anomaly_scores.ndim == 2: |
| anomaly_scores = np.mean(anomaly_scores, axis=1) |
| else: |
| |
| if combined_targets.shape != combined_predictions.shape: |
| print(f"Shape mismatch: targets {combined_targets.shape}, predictions {combined_predictions.shape}") |
| |
| min_features = min(combined_targets.shape[1], combined_predictions.shape[1]) |
| combined_targets = combined_targets[:, :min_features] |
| combined_predictions = combined_predictions[:, :min_features] |
| |
| anomaly_scores = np.mean((combined_targets - combined_predictions) ** 2, axis=1) |
| |
| |
| print("Padding scores to original data length...") |
| self.decision_scores_ = self._pad_scores_to_original_length( |
| anomaly_scores, len(data), dataset.get_window_info() |
| ) |
| else: |
| print("Warning: No predictions generated") |
| self.decision_scores_ = np.zeros(len(data)) |
| |
| except Exception as e: |
| print(f"Error in Moirai.fit(): {str(e)}") |
| import traceback |
| traceback.print_exc() |
| self.decision_scores_ = np.zeros(len(data)) |
|
|
| def _process_window(self, context, target, window_index): |
| """ |
| Process a single window following the test_anomaly.py approach. |
| |
| Args: |
| context: Context data for the window (win_size, n_features) |
| target: Target data for the window (win_size, n_features) |
| window_index: Index of the current window |
| |
| Returns: |
| predictions: Forecasted values for the target period |
| """ |
| try: |
| |
| tqdm.write(f"Processing window {window_index + 1}") |
| |
| |
| if context.ndim == 1: |
| context = context.reshape(-1, 1) |
| if target.ndim == 1: |
| target = target.reshape(-1, 1) |
| |
| |
| full_window = np.vstack([context, target]) |
| |
| |
| feature_df = pd.DataFrame(full_window) |
| |
| |
| if feature_df.shape[1] == 1: |
| feature_df.columns = ['target'] |
| target_col = 'target' |
| feature_cols = [] |
| else: |
| |
| feature_df.columns = [f'target_{i}' for i in range(feature_df.shape[1])] |
| target_col = feature_df.columns.tolist() |
| feature_cols = [] |
| |
| |
| timestamp_range = pd.date_range( |
| start=pd.Timestamp('2023-01-01 10:00:00'), |
| periods=len(feature_df), |
| freq='T' |
| ) |
| feature_df.index = timestamp_range |
| feature_df['unique_id'] = window_index |
| |
| |
| moirai_df = feature_df.reset_index().rename(columns={'index': 'timestamp'}) |
| |
| if isinstance(target_col, list): |
| |
| ds = PandasDataset.from_long_dataframe( |
| moirai_df, |
| target=target_col, |
| item_id="unique_id", |
| timestamp="timestamp", |
| ) |
| else: |
| |
| if feature_cols: |
| ds = PandasDataset.from_long_dataframe( |
| moirai_df, |
| target=target_col, |
| item_id="unique_id", |
| timestamp="timestamp", |
| feat_dynamic_real=feature_cols, |
| ) |
| else: |
| ds = PandasDataset.from_long_dataframe( |
| moirai_df, |
| target=target_col, |
| item_id="unique_id", |
| timestamp="timestamp", |
| ) |
| |
| |
| test_size = self.win_size |
| _, test_template = split(ds, offset=-test_size) |
| |
| test_data = test_template.generate_instances( |
| prediction_length=self.win_size, |
| windows=1, |
| distance=self.win_size, |
| max_history=self.win_size, |
| ) |
| |
| |
| |
| target_dim = target.shape[1] if target.ndim > 1 else 1 |
| |
| model = MoiraiForecast( |
| module=MoiraiModule.from_pretrained(self.model_path), |
| prediction_length=self.win_size, |
| context_length=self.win_size, |
| patch_size="auto", |
| num_samples=self.num_samples, |
| target_dim=target_dim, |
| feat_dynamic_real_dim=ds.num_feat_dynamic_real, |
| past_feat_dynamic_real_dim=ds.num_past_feat_dynamic_real, |
| ) |
| |
| |
| predictor = model.create_predictor(batch_size=1, device=self.device) |
| forecasts = predictor.predict(test_data.input) |
| forecasts = list(forecasts) |
| |
| |
| predictions = np.median(forecasts[0].samples, axis=0) |
| |
| return predictions |
| |
| except Exception as e: |
| print(f"Error processing window {window_index}: {str(e)}") |
| |
| target_shape = (self.win_size, target.shape[1]) if target.ndim > 1 else (self.win_size,) |
| return np.zeros(target_shape) |
|
|
| def _pad_scores_to_original_length(self, scores, original_length, window_info): |
| """ |
| Pad anomaly scores to match the original data length. |
| |
| Args: |
| scores: Computed anomaly scores from windows |
| original_length: Length of the original input data |
| window_info: Information about windowing strategy |
| |
| Returns: |
| padded_scores: Scores padded to original length |
| """ |
| padded_scores = np.zeros(original_length) |
| |
| win_size = window_info['win_size'] |
| step = window_info['step'] |
| |
| |
| score_windows = scores.reshape(-1, win_size) |
| for i, score_window in enumerate(tqdm(score_windows, desc="Padding scores", unit="window")): |
| start_idx = i * step + win_size |
| end_idx = start_idx + win_size |
| |
| if end_idx <= original_length: |
| padded_scores[start_idx:end_idx] = score_window |
| elif start_idx < original_length: |
| |
| remaining = original_length - start_idx |
| padded_scores[start_idx:] = score_window[:remaining] |
| |
| |
| if len(scores) > 0: |
| first_score = np.mean(scores[:win_size]) if len(scores) >= win_size else np.mean(scores) |
| padded_scores[:win_size] = first_score |
| |
| return padded_scores |
|
|
| def decision_function(self, X): |
| """ |
| Not used for zero-shot approach, present for API consistency. |
| """ |
| return self.decision_scores_ |
|
|