"""
Moirai model for anomaly detection using zero-shot forecasting.

Adapted from the test_anomaly.py approach for the TSB-AD framework.
"""
import traceback
import warnings

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader

# NOTE(review): this silences *all* warnings process-wide as soon as the
# module is imported, not only those raised by GluonTS / uni2ts.
warnings.filterwarnings('ignore')

from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split
from uni2ts.model.moirai import MoiraiForecast, MoiraiModule

from .base import BaseDetector
from ..utils.dataset import MoiraiWindowedDataset


class Moirai(BaseDetector):
    """Zero-shot anomaly detector scoring points by Moirai forecast error.

    The series is cut into windows by ``MoiraiWindowedDataset``. For each
    window, Moirai forecasts ``win_size`` steps of the first channel from a
    ``win_size``-step context. The squared difference between the median
    forecast and the observed values is used as the anomaly score.
    """

    def __init__(self, win_size=96, model_path="Salesforce/moirai-1.0-R-small",
                 num_samples=100, device='cuda:0', use_score=False, threshold=0.5):
        """
        Initialize Moirai anomaly detector.

        Args:
            win_size (int): Window size for context and prediction
            model_path (str): Path to pretrained Moirai model
            num_samples (int): Number of forecast samples
            device (str): Device to run model on (falls back to CPU when CUDA
                is unavailable)
            use_score (bool): Whether to use raw scores or threshold
                (stored only; not read anywhere in this class)
            threshold (float): Threshold for binary classification
                (stored only; not read anywhere in this class)
        """
        self.model_name = 'Moirai'
        self.win_size = win_size
        self.model_path = model_path
        self.num_samples = num_samples
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
        self.use_score = use_score
        self.threshold = threshold
        self.decision_scores_ = None
        # Pretrained weights, loaded lazily on first use and shared by all windows.
        self._module = None

    def fit(self, data):
        """
        Fit the Moirai model and compute anomaly scores.

        Errors are deliberately swallowed (best-effort): on failure the
        traceback is printed and all scores are set to zero.

        Args:
            data: Input time series data (1D or 2D array-like)

        Returns:
            self, with ``decision_scores_`` set to one score per input step.
        """
        try:
            data = np.asarray(data)
            # Ensure data is in the right format
            if data.ndim == 1:
                data = data.reshape(-1, 1)

            print(f"Moirai: Processing data with shape {data.shape}")

            # Create windowed dataset following test_anomaly.py pattern
            dataset = MoiraiWindowedDataset(
                data=data,
                win_size=self.win_size,
                step=self.win_size,  # Non-overlapping windows
                normalize=False,  # Let Moirai handle normalization
            )

            print(f"Moirai: Created {len(dataset)} windows")

            if len(dataset) == 0:
                print("Warning: No valid windows created. Data might be too short.")
                self.decision_scores_ = np.zeros(len(data))
                return self

            data_loader = DataLoader(
                dataset=dataset,
                batch_size=1,
                shuffle=False,
                drop_last=False,
            )

            window_errors = []
            for i, (context, target) in enumerate(data_loader):
                context = context.squeeze(0).numpy()
                target = target.squeeze(0).numpy()
                predictions = self._process_window(context, target, i)

                # Only the first column is forecast by _process_window, so the
                # error must be taken against that column alone. Subtracting a
                # 1D forecast from a 2D target would instead broadcast to an
                # (M, M) matrix (univariate) or raise (multivariate).
                observed = target.reshape(len(target), -1)[:, 0]
                n = min(len(observed), len(predictions))
                window_errors.append((observed[:n] - predictions[:n]) ** 2)

            if not window_errors:
                print("Warning: No predictions generated")
                self.decision_scores_ = np.zeros(len(data))
                return self

            anomaly_scores = np.concatenate(window_errors)

            # Pad scores to match original data length
            self.decision_scores_ = self._pad_scores_to_original_length(
                anomaly_scores, len(data), dataset.get_window_info()
            )

        except Exception as e:
            print(f"Error in Moirai.fit(): {str(e)}")
            traceback.print_exc()
            self.decision_scores_ = np.zeros(len(data))

        return self

    def _get_module(self):
        """Return the pretrained Moirai module, loading it only on first call."""
        module = getattr(self, '_module', None)
        if module is None:
            module = MoiraiModule.from_pretrained(self.model_path)
            self._module = module
        return module

    def _process_window(self, context, target, window_index):
        """
        Forecast the target period of a single window with Moirai.

        Context and target rows are stacked into one frame. The first column
        is the forecast target; any remaining columns are passed as dynamic
        real covariates. Their target-period values are included in the frame,
        so they are presumably visible to the model; verify against GluonTS
        instance splitting.

        Args:
            context: Context data for the window, (win_size, n_features) or 1D
            target: Target data for the window, (win_size, n_features) or 1D
            window_index: Index of the current window (used as the item id)

        Returns:
            predictions: 1D array holding the median forecast of the first
            column over the target period. On any error, a zero vector of
            length ``win_size`` is returned instead.
        """
        try:
            # Ensure 2D shape
            if context.ndim == 1:
                context = context.reshape(-1, 1)
            if target.ndim == 1:
                target = target.reshape(-1, 1)

            # Combine context and target for full window (following test_anomaly.py)
            full_window = np.vstack([context, target])

            feature_df = pd.DataFrame(full_window)
            if feature_df.shape[1] == 1:
                feature_df.columns = ['target']
                target_col = 'target'
                feature_cols = []
            else:
                feature_df.columns = [f'feature_{i}' for i in range(feature_df.shape[1])]
                target_col = 'feature_0'  # Use first feature as target
                feature_cols = [f'feature_{i}' for i in range(1, feature_df.shape[1])]

            # Synthetic minute-level timestamps; 'min' replaces the 'T' alias
            # deprecated in pandas 2.2.
            timestamp_range = pd.date_range(
                start=pd.Timestamp('2023-01-01 10:00:00'),
                periods=len(feature_df),
                freq='min',
            )
            feature_df.index = timestamp_range
            feature_df['unique_id'] = window_index

            # Create GluonTS dataset
            moirai_df = feature_df.reset_index().rename(columns={'index': 'timestamp'})
            dataset_kwargs = dict(
                target=target_col,
                item_id="unique_id",
                timestamp="timestamp",
            )
            if feature_cols:
                dataset_kwargs['feat_dynamic_real'] = feature_cols
            ds = PandasDataset.from_long_dataframe(moirai_df, **dataset_kwargs)

            # Hold out the last win_size steps as the forecast horizon
            test_size = self.win_size
            _, test_template = split(ds, offset=-test_size)
            test_data = test_template.generate_instances(
                prediction_length=self.win_size,
                windows=1,
                distance=self.win_size,
                max_history=self.win_size,
            )

            # The pretrained weights are shared; only the light wrapper is
            # rebuilt because the covariate dimensions come from this dataset.
            model = MoiraiForecast(
                module=self._get_module(),
                prediction_length=self.win_size,
                context_length=self.win_size,
                patch_size="auto",
                num_samples=self.num_samples,
                target_dim=1,
                feat_dynamic_real_dim=ds.num_feat_dynamic_real,
                past_feat_dynamic_real_dim=ds.num_past_feat_dynamic_real,
            )

            predictor = model.create_predictor(batch_size=1, device=self.device)
            forecasts = list(predictor.predict(test_data.input))

            # Median over samples; flattened so callers always get 1D output.
            predictions = np.median(forecasts[0].samples, axis=0)
            return np.asarray(predictions).reshape(-1)

        except Exception as e:
            print(f"Error processing window {window_index}: {str(e)}")
            # NOTE(review): a zero forecast makes the window's error equal to
            # target**2, which can look like an anomaly; consider a neutral
            # fallback instead.
            return np.zeros(self.win_size)

    def _pad_scores_to_original_length(self, scores, original_length, window_info):
        """
        Pad anomaly scores to match the original data length.

        Scores are consumed in chunks of ``win_size``. Chunk ``i`` is placed at
        ``i * step + win_size``, i.e. after the window's context part. A
        shorter final chunk is allowed. Steps before the first forecast get the
        mean of the first chunk. Steps after the last forecast stay zero.

        Args:
            scores: 1D array of per-step anomaly scores from all windows
            original_length: Length of the original input data
            window_info: Mapping with at least 'win_size' and 'step' keys

        Returns:
            padded_scores: 1D array of length ``original_length``
        """
        padded_scores = np.zeros(original_length)
        win_size = window_info['win_size']
        step = window_info['step']

        for i, offset in enumerate(range(0, len(scores), win_size)):
            score_window = scores[offset:offset + win_size]
            start_idx = i * step + win_size  # Offset by win_size (context part)
            if start_idx >= original_length:
                continue
            end_idx = min(start_idx + len(score_window), original_length)
            padded_scores[start_idx:end_idx] = score_window[:end_idx - start_idx]

        # Fill beginning (context part) with first window's average
        if len(scores) > 0:
            padded_scores[:win_size] = np.mean(scores[:win_size])

        return padded_scores

    def decision_function(self, X):
        """
        Return the scores computed by the last ``fit`` call.

        ``X`` is ignored. This zero-shot detector scores during ``fit``; the
        method exists for API consistency.
        """
        return self.decision_scores_