"""
Time_RCD Processor for Time Series Preprocessing

This processor handles:
- Data windowing/sliding windows
- Normalization (per-feature z-score, computed over the whole series before windowing)
- Padding to window size multiples
- Creating attention masks

Usage:
    >>> from huggingface_time_rcd import TimeRCDProcessor
    >>> processor = TimeRCDProcessor(win_size=5000, normalize=True)
    >>> inputs = processor(time_series_data)
    >>> # inputs contains: {'time_series': tensor, 'attention_mask': tensor}
"""
| |
|
| | import numpy as np |
| | import torch |
| | from typing import Optional, Dict, Any |
| | from transformers import ProcessorMixin |
| |
|
| |
|
class TimeRCDProcessor(ProcessorMixin):
    """
    Processor for preparing time series data for the Time_RCD model.

    Intended to mimic the AnomalyClipDataset preprocessing pipeline. Calling
    the processor applies, in order:
    - Reshaping 1-D input into a single-feature column
    - Optional z-score normalization, computed per feature over the whole
      input series (i.e. before windowing, not per window)
    - Optional padding (repeating the last row) up to a multiple of win_size
    - Slicing into windows of win_size rows, one every stride rows
    - Building boolean attention masks that are False on padded rows

    Parameters
    ----------
    win_size : int, default=5000
        Window size for creating sliding windows. Must be positive.
    stride : int, default=None
        Stride for sliding windows. If None, uses win_size (non-overlapping).
        Must be positive.
    normalize : bool, default=True
        Whether to z-score normalize each feature (zero mean, unit variance)
    pad_to_multiple : bool, default=True
        Whether to pad data to make its length a multiple of win_size
    **kwargs
        Forwarded unchanged to ``ProcessorMixin.__init__``.

    Raises
    ------
    ValueError
        If win_size or stride is not positive.
    """

    def __init__(
        self,
        win_size: int = 5000,
        stride: Optional[int] = None,
        normalize: bool = True,
        pad_to_multiple: bool = True,
        **kwargs
    ):
        effective_stride = win_size if stride is None else stride

        # Fail fast with a clear message: a zero win_size would otherwise
        # surface as ZeroDivisionError while padding, a zero stride as an
        # opaque range() error, and negative values as silently empty output.
        if win_size <= 0:
            raise ValueError(f"win_size must be a positive integer, got {win_size!r}")
        if effective_stride <= 0:
            raise ValueError(f"stride must be a positive integer, got {effective_stride!r}")

        super().__init__(**kwargs)
        self.win_size = win_size
        self.stride = effective_stride
        self.normalize = normalize
        self.pad_to_multiple = pad_to_multiple

        # Keys of the dictionary returned by __call__.
        self.model_input_names = ["time_series", "attention_mask"]

    @property
    def attributes(self):
        """Return list of attribute names for serialization."""
        # NOTE(review): ProcessorMixin also defines `attributes` (there it
        # names wrapped sub-processors) -- confirm this override is compatible
        # with the installed transformers version.
        return ["win_size", "stride", "normalize", "pad_to_multiple"]

    def __call__(
        self,
        time_series: np.ndarray,
        return_tensors: Optional[str] = "pt",
    ) -> Dict[str, Any]:
        """
        Preprocess time series data.

        Parameters
        ----------
        time_series : np.ndarray
            Input time series data of shape (n_samples, n_features) or (n_samples,)
        return_tensors : str, optional
            "pt" returns PyTorch tensors (float32 windows, bool masks); any
            other value returns the NumPy arrays unchanged.

        Returns
        -------
        dict
            Dictionary containing:
            - 'time_series': windows, shape (n_windows, win_size, n_features)
            - 'attention_mask': shape (n_windows, win_size); True marks real
              rows, False marks rows added by padding
        """
        series = np.asarray(time_series)

        # A 1-D series is treated as a single feature column.
        if series.ndim == 1:
            series = series.reshape(-1, 1)

        # Normalization runs before padding so padded rows do not affect
        # the statistics.
        if self.normalize:
            series = self._normalize_data(series)

        if self.pad_to_multiple:
            series, padding_mask = self._pad_data_to_multiple(series)
        else:
            padding_mask = np.ones(series.shape[0], dtype=bool)

        windows, masks = self._create_windows(series, padding_mask)

        if return_tensors == "pt":
            windows = torch.tensor(windows, dtype=torch.float32)
            masks = torch.tensor(masks, dtype=torch.bool)

        return {
            "time_series": windows,
            "attention_mask": masks,
        }

    def _normalize_data(self, data: np.ndarray, epsilon: float = 1e-8) -> np.ndarray:
        """
        Z-score normalize each feature (column) over the whole series.

        Columns with zero standard deviation are divided by ``epsilon``
        instead, which avoids a division by zero (such columns become 0).
        """
        mean = np.mean(data, axis=0)
        std = np.std(data, axis=0)
        std = np.where(std == 0, epsilon, std)
        return (data - mean) / std

    def _pad_data_to_multiple(self, data: np.ndarray) -> tuple:
        """
        Pad data so its length is a multiple of win_size.

        Padding rows are copies of the last real row. Returns a tuple
        ``(padded_data, padding_mask)`` where the boolean mask is True for
        original rows and False for padded rows.
        """
        data_length = data.shape[0]
        remainder = data_length % self.win_size

        if remainder == 0:
            # Already aligned (this includes empty input): nothing to pad.
            return data, np.ones(data_length, dtype=bool)

        padding_length = self.win_size - remainder

        # Repeat the final row rather than zero-filling.
        last_row = data[-1:, :]
        padding_data = np.repeat(last_row, padding_length, axis=0)
        padded_data = np.vstack([data, padding_data])

        padding_mask = np.ones(data_length + padding_length, dtype=bool)
        padding_mask[data_length:] = False

        return padded_data, padding_mask

    def _create_windows(self, data: np.ndarray, padding_mask: np.ndarray) -> tuple:
        """
        Create sliding windows from time series data.

        Returns ``(windows, masks)``: windows of win_size rows starting every
        ``stride`` rows, and the matching slices of ``padding_mask``. Only
        full windows are produced; trailing rows that do not fill a window
        are dropped.
        """
        starts = range(0, len(data) - self.win_size + 1, self.stride)

        if not starts:
            # No full window fits. Return empty arrays that keep the usual
            # rank instead of a shapeless 1-D empty array.
            empty_windows = np.empty((0, self.win_size) + data.shape[1:], dtype=data.dtype)
            empty_masks = np.empty((0, self.win_size), dtype=padding_mask.dtype)
            return empty_windows, empty_masks

        windows = np.stack([data[start:start + self.win_size] for start in starts])
        masks = np.stack([padding_mask[start:start + self.win_size] for start in starts])
        return windows, masks

    def save_pretrained(self, save_directory: str):
        """
        Save processor configuration to directory.

        Creates ``save_directory`` if needed and writes
        ``preprocessor_config.json`` containing the processor settings plus
        the ``processor_type`` / ``auto_map`` entries.
        """
        import json
        import os

        os.makedirs(save_directory, exist_ok=True)

        config = {
            "processor_type": "TimeRCDProcessor",
            "auto_map": {
                "AutoProcessor": "processing_time_rcd.TimeRCDProcessor"
            },
            "win_size": self.win_size,
            "stride": self.stride,
            "normalize": self.normalize,
            "pad_to_multiple": self.pad_to_multiple,
        }

        config_path = os.path.join(save_directory, "preprocessor_config.json")
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs):
        """
        Load processor from pretrained configuration.

        Parameters
        ----------
        pretrained_model_name_or_path : str
            Local directory containing ``preprocessor_config.json``, or a
            Hugging Face Hub repo id to download that file from.
        **kwargs
            Only Hub download options (cache_dir, token, revision, ...) are
            used, and only when downloading; all other keys are ignored.

        Raises
        ------
        FileNotFoundError
            If the configuration cannot be obtained from the Hub. The
            underlying error is chained as ``__cause__``.
        """
        import json
        import os

        hub_option_names = (
            'cache_dir', 'force_download', 'proxies', 'resume_download',
            'token', 'revision', 'local_files_only', 'library_name',
            'library_version', 'user_agent', 'subfolder',
        )
        hf_hub_kwargs = {k: v for k, v in kwargs.items() if k in hub_option_names}

        config_file = os.path.join(pretrained_model_name_or_path, "preprocessor_config.json")

        if os.path.exists(config_file):
            # Local directory: read the config straight from disk.
            with open(config_file, "r", encoding="utf-8") as f:
                config = json.load(f)
        else:
            # Not a local path: treat the argument as a Hub repo id.
            # Imported here so purely local loads do not need the Hub client.
            from huggingface_hub import hf_hub_download

            try:
                config_file = hf_hub_download(
                    repo_id=pretrained_model_name_or_path,
                    filename="preprocessor_config.json",
                    **hf_hub_kwargs
                )
                with open(config_file, "r", encoding="utf-8") as f:
                    config = json.load(f)
            except Exception as e:
                raise FileNotFoundError(
                    f"Could not load preprocessor config from {pretrained_model_name_or_path}. "
                    f"Error: {e}"
                ) from e

        # These entries are metadata written by save_pretrained, not
        # constructor arguments.
        config.pop("processor_type", None)
        config.pop("auto_map", None)

        return cls(**config)
| |
|
| |
|