import numpy as np
from typing import List, Union, Tuple, Optional
import torch

from transformers.feature_extraction_utils import BatchFeature
from transformers.processing_utils import ProcessorMixin
from transformers.tokenization_utils_base import PaddingStrategy


def sp_encoding(timeseries: np.ndarray, eots_token: bool = True) -> Tuple[np.ndarray, str, dict]:
    """
    Encodes a time series with mean-offset and scaling normalization.

    Args:
        timeseries (np.ndarray): The raw 1-D time series data.
        eots_token (bool): Whether to append the closing <ts/> token to the prompt.

    Returns:
        result_timeseries (np.ndarray): The encoded time series as (value, mask) pairs, shape [seq_len, 2].
        prompt (str): The placeholder string with offset and scaling info.
        metadata (dict): Metadata containing the offset and scaling factor.
    """
    timeseries = np.array(timeseries)
    mean = np.mean(timeseries)
    scaled_timeseries = timeseries - mean
    # Rescale only when needed, so that all values fall within [-3.0, 3.0].
    scale_factor = 1.0
    if np.any(np.abs(scaled_timeseries) >= 3.0):
        scale_factor = np.max(np.abs(scaled_timeseries)) / 3.0
        scaled_timeseries /= scale_factor

    # The textual prefix exposes the normalization statistics to the language model.
    prompt = (
        f"[offset={-mean:.4f}|scaling={scale_factor:.4f}|length={len(timeseries)}"
        f"|max={max(timeseries):.4f}|min={min(timeseries):.4f}"
        f"|left={timeseries[0]:.4f}|right={timeseries[-1]:.4f}]<ts>"
    )
    if eots_token:
        prompt += '<ts/>'

    # Pair each value with a validity mask of 1.0, giving shape [seq_len, 2].
    result_timeseries = np.stack([scaled_timeseries, np.ones_like(scaled_timeseries)], axis=-1).reshape(-1, 2)

    return result_timeseries, prompt, {"offset": float(-mean), "scale_factor": float(scale_factor)}
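

# Illustrative sketch (not part of the original module): a minimal, manually run
# check of `sp_encoding`. The helper name and the sample data are assumptions.
def _sp_encoding_demo() -> None:
    ts = np.sin(np.arange(64) / 4.0) * 10.0 + 5.0  # 1-D series with offset mean and wide range
    encoded, prompt, meta = sp_encoding(ts)
    # Mean-centering plus rescaling keeps every value within [-3.0, 3.0].
    assert np.all(np.abs(encoded[:, 0]) <= 3.0 + 1e-9)
    # Each timestep carries a (value, mask) pair.
    assert encoded.shape == (len(ts), 2)
    print(prompt)  # "[offset=...|scaling=...|length=64|...]<ts><ts/>"
    print(meta)    # {"offset": ..., "scale_factor": ...}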


class Qwen3TSProcessor(ProcessorMixin):
    """
    A processor for ChatTS that integrates text prompt processing and time series encoding.
    """

    attributes = ["tokenizer"]
    feature_extractor_class = None
    tokenizer_class = "AutoTokenizer"

    def __init__(self, tokenizer=None, chat_template=None, **kwargs):
        """
        Args:
            tokenizer: An optional tokenizer for processing text prompts.
            chat_template: An optional chat template; falls back to the tokenizer's template when unset.
        """
        # Inherit the tokenizer's chat template unless one is given explicitly.
        if chat_template is None and tokenizer is not None and tokenizer.chat_template is not None:
            chat_template = tokenizer.chat_template
        self.chat_template = chat_template

        super().__init__(tokenizer=tokenizer, chat_template=chat_template, **kwargs)

    def __call__(
        self,
        text: Union[str, List[str]],
        timeseries: Optional[List[np.ndarray]] = None,
        padding: Union[bool, str, PaddingStrategy] = False,
        padding_side: str = 'left',
        vllm_flag: bool = False,
        tokenize: bool = True,
        **kwargs,
    ) -> BatchFeature:
        """
        Encodes prompts and their associated time series.

        Args:
            text (str or List[str]): The input prompt(s) containing <ts><ts/> placeholders.
            timeseries (List[np.ndarray], optional): Time series matched, in order, to the placeholders across all prompts.
            padding (bool or str or PaddingStrategy, optional): Passed to the tokenizer for text padding.
            padding_side (str, optional): Side on which the tokenizer pads; defaults to 'left'.
            vllm_flag (bool, optional): If True, leave prompts unchanged and return per-series token ids for vLLM.
            tokenize (bool, optional): If False, return the reconstructed prompts as plain text instead of tokenizing.
            **kwargs: Additional tokenizer parameters (e.g. return_tensors="pt").

        Returns:
            BatchFeature: Contains the tokenizer outputs (or reconstructed prompts) and the encoded time series.
        """
        if isinstance(text, str):
            text = [text]
        if timeseries is None:
            timeseries = []

        reconstructed_prompts = []
        concatenated_ts = None
        ts_tokens = []

        if vllm_flag:
            # vLLM path: keep the prompts as-is and tokenize each per-series
            # prefix separately so vLLM can splice it in itself.
            reconstructed_prompts = text

            encoded_ts_arrays = []
            for ts in timeseries:
                # Encode without the closing <ts/> token; vLLM appends it later.
                encoded_ts, ts_prompt, _ = sp_encoding(ts, eots_token=False)
                if self.tokenizer is not None:
                    tokens = self.tokenizer.encode(ts_prompt, add_special_tokens=False)
                    ts_tokens.append(tokens)
                encoded_ts_arrays.append(encoded_ts[None, ...])
        else:
            encoded_ts_arrays = []
            total_ts_cnt = 0
            for prompt in text:
                # Count the <ts><ts/> placeholders in this prompt and take the
                # matching slice of the (flat) time series list.
                last_ts_cnt = total_ts_cnt
                prompt_segments = prompt.split("<ts><ts/>")
                total_ts_cnt += len(prompt_segments) - 1

                # Interleave the text segments with the per-series prompts.
                reconstructed_prompt = prompt_segments[0]
                for i, ts in enumerate(timeseries[last_ts_cnt:total_ts_cnt]):
                    encoded_ts, ts_prompt, _ = sp_encoding(ts, eots_token=True)
                    reconstructed_prompt += ts_prompt + prompt_segments[i + 1]
                    encoded_ts_arrays.append(encoded_ts[None, ...])

                reconstructed_prompts.append(reconstructed_prompt)

            if total_ts_cnt != len(timeseries):
                raise ValueError(
                    f"Mismatch between <ts><ts/> placeholders ({total_ts_cnt}) "
                    f"and time series ({len(timeseries)})."
                )

        if len(encoded_ts_arrays) > 0:
            # Zero-pad every series to the longest one along the time axis, then
            # batch them into a single half-precision tensor.
            max_length = max(ts.shape[1] for ts in encoded_ts_arrays)
            padded_ts_arrays = [
                np.pad(ts, ((0, 0), (0, max_length - ts.shape[1]), (0, 0)), mode="constant", constant_values=0.0)
                for ts in encoded_ts_arrays
            ]
            concatenated_ts = np.concatenate(padded_ts_arrays, axis=0)
            concatenated_ts = torch.from_numpy(concatenated_ts).half()

        # Tokenize the reconstructed prompts, or pass them through as plain text.
        if tokenize and self.tokenizer is not None:
            tokenizer_outputs = self.tokenizer(reconstructed_prompts, padding=padding, padding_side=padding_side, **kwargs)
        else:
            tokenizer_outputs = {"text": reconstructed_prompts}

        outputs = tokenizer_outputs
        if vllm_flag:
            # Materialize the pairs; a bare zip iterator would be exhausted after a single pass.
            outputs["timeseries"] = list(zip(ts_tokens, encoded_ts_arrays))
        elif concatenated_ts is not None:
            outputs["timeseries"] = concatenated_ts

        return BatchFeature(data=outputs)

    def encode_timeseries(
        self,
        timeseries: Optional[List[np.ndarray]] = None,
    ) -> Optional[torch.Tensor]:
        """
        Encodes a list of time series without any text processing.

        Returns a half-precision tensor of shape [num_series, max_len, 2], or None
        if no time series are given.
        """
        if timeseries is None:
            timeseries = []

        concatenated_ts = None
        encoded_ts_arrays = []

        for ts in timeseries:
            encoded_ts, _, _ = sp_encoding(ts)
            encoded_ts_arrays.append(encoded_ts[None, ...])

        if len(encoded_ts_arrays) > 0:
            # Zero-pad to a common length along the time axis and batch, mirroring __call__.
            max_length = max(ts.shape[1] for ts in encoded_ts_arrays)
            padded_ts_arrays = [
                np.pad(ts, ((0, 0), (0, max_length - ts.shape[1]), (0, 0)), mode="constant", constant_values=0.0)
                for ts in encoded_ts_arrays
            ]
            concatenated_ts = np.concatenate(padded_ts_arrays, axis=0)
            concatenated_ts = torch.from_numpy(concatenated_ts).half()

        return concatenated_ts

    @property
    def model_input_names(self):
        """
        Define the input names expected by the model.
        """
        tokenizer_input_names = []
        if self.tokenizer and hasattr(self.tokenizer, "model_input_names"):
            tokenizer_input_names = self.tokenizer.model_input_names
        return list(dict.fromkeys(["processed_prompt", "time_series"] + tokenizer_input_names))

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the underlying tokenizer's
        [`~PreTrainedTokenizer.batch_decode`]. Please refer to the docstring of that
        method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the underlying tokenizer's
        [`~PreTrainedTokenizer.decode`]. Please refer to the docstring of that method
        for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)
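

# Illustrative usage sketch (an assumption, not part of the original module). The
# checkpoint path is a placeholder; any tokenizer that defines the <ts> and <ts/>
# special tokens should work. Keyword defaults mirror the processor's own.
def _processor_demo() -> None:
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("path/to/chatts-checkpoint")  # hypothetical path
    processor = Qwen3TSProcessor(tokenizer=tokenizer)

    # One <ts><ts/> placeholder is matched, in order, to one series.
    inputs = processor(
        text="The following is a CPU metric: <ts><ts/>. Is it anomalous?",
        timeseries=[np.random.randn(128)],
        padding=True,
        return_tensors="pt",
    )
    # `input_ids` comes from the tokenizer; `timeseries` is a half-precision
    # tensor of shape [num_series, max_len, 2].
    print(inputs["input_ids"].shape, inputs["timeseries"].shape)

    # Encoding series directly: shorter ones are zero-padded on the time axis,
    # so both the value and mask channels are 0.0 past their true length.
    batch = processor.encode_timeseries([np.random.randn(100), np.random.randn(64)])
    print(batch.shape, batch.dtype)  # torch.Size([2, 100, 2]) torch.float16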