# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List, Optional, Union

import numpy as np
import torch
from transformers import AutoTokenizer

from cosmos_predict1.utils import log


def get_tokenizer_path(model_family: str, is_instruct_model: bool = False) -> str:
    """
    Get the tokenizer path from the model family and instruct model flag.

    Args:
        model_family (str): The model family.
        is_instruct_model (bool): Whether the model is an instruct model.

    Returns:
        str: The tokenizer path.
    """
    model_family = model_family.lower()
    if model_family == "mistral":
        return "mistralai/Mistral-Nemo-Instruct-2407"
    if model_family == "llama3":
        model_path = "meta-llama/Meta-Llama-3-8B"
    elif model_family == "llama3.1":
        model_path = "meta-llama/Llama-3.1-8B"
    else:
        raise ValueError(f"Unsupported model family: {model_family}")
    suffix = "-Instruct" if is_instruct_model else ""
    return f"{model_path}{suffix}"
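
# Example (illustrative, not part of the original module): the mapping above is
# deterministic, so the resolved Hugging Face repo id can be read off directly.
#
#     >>> get_tokenizer_path("llama3.1", is_instruct_model=True)
#     'meta-llama/Llama-3.1-8B-Instruct'
#     >>> get_tokenizer_path("mistral")
#     'mistralai/Mistral-Nemo-Instruct-2407'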


class TextTokenizer:
    """
    Text tokenizer class built on HuggingFace's fast (Rust-based) tokenizers.
    """

    def __init__(
        self,
        model_family: str,
        is_instruct_model: bool,
        local_path: Optional[str] = None,
    ):
        """
        Initialize the TextTokenizer.

        Args:
            model_family (str): The model family.
            is_instruct_model (bool): Whether the model is an instruct model.
            local_path (Optional[str]): The local path to the tokenizer. If not provided, the tokenizer
                is downloaded from the remote path resolved by `get_tokenizer_path`.
        """
        if local_path is None:
            tokenizer_path = get_tokenizer_path(model_family, is_instruct_model)
        else:
            tokenizer_path = local_path

        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_path, use_fast=True)
        self.stop_tokens = {
            self.tokenizer.eos_token_id,
        }
        self.model_family = model_family
        self.is_instruct_model = is_instruct_model
        self.eos_id = self.tokenizer.eos_token_id
        if self.tokenizer.pad_token is None:
            if model_family.startswith("llama"):
                self.pad_id = 128004  # "<|finetune_right_pad_id|>"
            elif model_family == "mistral":
                self.pad_id = 10  # "<pad>"
            elif model_family == "pixtral":
                self.pad_id = 11  # "<pad>"
            else:
                raise ValueError(f"pad_id not defined for model_family {model_family}")
        else:
            self.pad_id = self.tokenizer.pad_token_id

    def tokenize(self, text: str, *, add_special_tokens: bool = False, **kwargs) -> List[str]:
        """
        Converts a string into a sequence of tokens, replacing unknown tokens with the `unk_token`.

        Args:
            text (`str`):
                The sequence to be encoded.
            add_special_tokens (`bool`, *optional*, defaults to `False`):
                Whether or not to add the special tokens associated with the corresponding model.

        Returns:
            `List[str]`: The list of tokens.
        """
        return self.tokenizer.tokenize(text, add_special_tokens=add_special_tokens, **kwargs)
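
    # Example (illustrative; the exact token strings depend on the loaded
    # vocabulary, so the output below is hypothetical):
    #
    #     >>> tok = TextTokenizer("mistral", is_instruct_model=True)
    #     >>> tok.tokenize("Hello world")
    #     ['Hello', '▁world']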

    def encode(
        self,
        text: Union[str, List[str], List[int]],
        *,  # Enforce keyword-only arguments
        add_special_tokens: bool = True,
        padding: Union[bool, str] = False,
        truncation: Optional[Union[bool, str]] = None,
        max_length: Optional[int] = None,
        stride: int = 0,
        return_tensors: Optional[str] = None,
        **kwargs,
    ) -> List[int]:
| """ | |
| Converts a string to a sequence of ids (integer), using the tokenizer and vocabulary. | |
| Args: | |
| text (`str`, `List[str]` or `List[int]`): | |
| The first sequence to be encoded. This can be a string, a list of strings (tokenized string using the | |
| `tokenize` method) or a list of integers (tokenized string ids using the `convert_tokens_to_ids` | |
| method). | |
| add_special_tokens (`bool`, *optional*, defaults to `True`): | |
| Whether or not to add special tokens when encoding the sequences. This will use the underlying | |
| `PretrainedTokenizerBase.build_inputs_with_special_tokens` function, which defines which tokens are | |
| automatically added to the input ids. This is usefull if you want to add `bos` or `eos` tokens | |
| automatically. | |
| padding (`bool`, `str`, *optional*, defaults to `False`): | |
| Activates and controls padding. Accepts the following values: | |
| - `True` or `'longest'`: Pad to the longest sequence in the batch (or no padding if only a single | |
| sequence if provided). | |
| - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum | |
| acceptable input length for the model if that argument is not provided. | |
| - `False` or `'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of different | |
| lengths). | |
| truncation (`bool`, `str`, *optional*, defaults to `False`): | |
| Activates and controls truncation. Accepts the following values: | |
| - `True` or `'longest_first'`: Truncate to a maximum length specified with the argument `max_length` or | |
| to the maximum acceptable input length for the model if that argument is not provided. This will | |
| truncate token by token, removing a token from the longest sequence in the pair if a pair of | |
| sequences (or a batch of pairs) is provided. | |
| - `'only_first'`: Truncate to a maximum length specified with the argument `max_length` or to the | |
| maximum acceptable input length for the model if that argument is not provided. This will only | |
| truncate the first sequence of a pair if a pair of sequences (or a batch of pairs) is provided. | |
| - `'only_second'`: Truncate to a maximum length specified with the argument `max_length` or to the | |
| maximum acceptable input length for the model if that argument is not provided. This will only | |
| truncate the second sequence of a pair if a pair of sequences (or a batch of pairs) is provided. | |
| - `False` or `'do_not_truncate'` (default): No truncation (i.e., can output batch with sequence lengths | |
| greater than the model maximum admissible input size). | |
| max_length (`int`, *optional*): | |
| Controls the maximum length to use by one of the truncation/padding parameters. | |
| If left unset or set to `None`, this will use the predefined model maximum length if a maximum length | |
| is required by one of the truncation/padding parameters. If the model has no specific maximum input | |
| length (like XLNet) truncation/padding to a maximum length will be deactivated. | |
| stride (`int`, *optional*, defaults to 0): | |
| If set to a number along with `max_length`, the overflowing tokens returned when | |
| `return_overflowing_tokens=True` will contain some tokens from the end of the truncated sequence | |
| returned to provide some overlap between truncated and overflowing sequences. The value of this | |
| argument defines the number of overlapping tokens. | |
| is_split_into_words (`bool`, *optional*, defaults to `False`): | |
| Whether or not the input is already pre-tokenized (e.g., split into words). If set to `True`, the | |
| tokenizer assumes the input is already split into words (for instance, by splitting it on whitespace) | |
| which it will tokenize. This is useful for NER or token classification. | |
| pad_to_multiple_of (`int`, *optional*): | |
| If set will pad the sequence to a multiple of the provided value. Requires `padding` to be activated. | |
| This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability | |
| `>= 7.5` (Volta). | |
| return_tensors (`str` or [`~utils.TensorType`], *optional*): | |
| If set, will return tensors instead of list of python integers. Acceptable values are: | |
| - `'tf'`: Return TensorFlow `tf.constant` objects. | |
| - `'pt'`: Return PyTorch `torch.Tensor` objects. | |
| - `'np'`: Return Numpy `np.ndarray` objects. | |
| """ | |
        return self.tokenizer.encode(
            text,
            add_special_tokens=add_special_tokens,
            padding=padding,
            truncation=truncation,
            max_length=max_length,
            stride=stride,
            return_tensors=return_tensors,
            **kwargs,  # forward documented kwargs such as is_split_into_words / pad_to_multiple_of
        )
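
    # Example (illustrative; the exact ids depend on the loaded vocabulary, so
    # the output below is hypothetical):
    #
    #     >>> tok = TextTokenizer("mistral", is_instruct_model=True)
    #     >>> tok.encode("Hello world", add_special_tokens=False)
    #     [23325, 2294]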

    def decode(
        self,
        token_ids: Union[int, List[int], "np.ndarray", "torch.Tensor"],
        *,  # Enforce keyword-only arguments
        skip_special_tokens: bool = False,
        clean_up_tokenization_spaces: Optional[bool] = None,
        **kwargs,
    ) -> str:
| """ | |
| Converts a sequence of ids in a string, using the tokenizer and vocabulary with options to remove special | |
| tokens and clean up tokenization spaces. | |
| Args: | |
| token_ids (`Union[int, List[int], np.ndarray, torch.Tensor, tf.Tensor]`): | |
| List of tokenized input ids. Can be obtained using the `__call__` method. | |
| skip_special_tokens (`bool`, *optional*, defaults to `False`): | |
| Whether or not to remove special tokens in the decoding. | |
| clean_up_tokenization_spaces (`bool`, *optional*): | |
| Whether or not to clean up the tokenization spaces. If `None`, will default to | |
| `self.clean_up_tokenization_spaces`. | |
| kwargs (additional keyword arguments, *optional*): | |
| Will be passed to the underlying model specific decode method. | |
| Returns: | |
| `str`: The decoded sentence. | |
| """ | |
        return self.tokenizer.decode(
            token_ids,
            skip_special_tokens=skip_special_tokens,
            clean_up_tokenization_spaces=clean_up_tokenization_spaces,
            **kwargs,
        )
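
    # Example (illustrative round trip; this holds for typical inputs, though
    # tokenization is not guaranteed to be lossless for arbitrary text):
    #
    #     >>> ids = tok.encode("Hello world", add_special_tokens=False)
    #     >>> tok.decode(ids)
    #     'Hello world'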

    def apply_chat_template(
        self,
        conversation: Union[List[Dict[str, str]], List[List[Dict[str, str]]]],
        *,
        add_generation_prompt: bool = False,
        tokenize: bool = True,
        padding: bool = False,
        truncation: bool = False,
        max_length: Optional[int] = None,
        return_tensors: Optional[str] = None,
        return_dict: bool = False,
        return_assistant_tokens_mask: bool = False,
        generation_prefix: str = "",
        tokenizer_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
| """ | |
| Converts a list of dictionaries with `"role"` and `"content"` keys to a list of token | |
| ids. This method is intended for use with chat models, and will read the tokenizer's chat_template attribute to determine the format and control tokens to use when converting. | |
| More details can be found at https://huggingface.co/docs/transformers/main/en/internal/tokenization_utils#transformers.PreTrainedTokenizerBase.apply_chat_template | |
| Args: | |
| conversation (Union[List[Dict[str, str]], List[List[Dict[str, str]]]]): A list of dicts | |
| with "role" and "content" keys, representing the chat history so far. | |
| add_generation_prompt (bool, *optional*): | |
| If this is set, a prompt with the token(s) that indicate | |
| the start of an assistant message will be appended to the formatted output. This is useful when you want to generate a response from the model. | |
| Note that this argument will be passed to the chat template, and so it must be supported in the | |
| template for this argument to have any effect. | |
| continue_final_message (bool, *optional*): | |
| If this is set, the chat will be formatted so that the final | |
| message in the chat is open-ended, without any EOS tokens. The model will continue this message | |
| rather than starting a new one. This allows you to "prefill" part of | |
| the model's response for it. Cannot be used at the same time as `add_generation_prompt`. | |
| tokenize (`bool`, defaults to `True`): | |
| Whether to tokenize the output. If `False`, the output will be a string. | |
| padding (`bool`, defaults to `False`): | |
| Whether to pad sequences to the maximum length. Has no effect if tokenize is `False`. | |
| truncation (`bool`, defaults to `False`): | |
| Whether to truncate sequences at the maximum length. Has no effect if tokenize is `False`. | |
| max_length (`int`, *optional*): | |
| Maximum length (in tokens) to use for padding or truncation. Has no effect if tokenize is `False`. If | |
| not specified, the tokenizer's `max_length` attribute will be used as a default. | |
| return_tensors (`str` or [`~utils.TensorType`], *optional*): | |
| If set, will return tensors of a particular framework. Has no effect if tokenize is `False`. Acceptable | |
| values are: | |
| - `'tf'`: Return TensorFlow `tf.Tensor` objects. | |
| - `'pt'`: Return PyTorch `torch.Tensor` objects. | |
| - `'np'`: Return NumPy `np.ndarray` objects. | |
| - `'jax'`: Return JAX `jnp.ndarray` objects. | |
| return_dict (`bool`, defaults to `False`): | |
| Whether to return a dictionary with named outputs. Has no effect if tokenize is `False`. | |
| generation_prefix (str): Prefix to add before asking model to generate. Helpful to guide the generation. Defaults to "". | |
| tokenizer_kwargs (`Dict[str: Any]`, *optional*): Additional kwargs to pass to the tokenizer. | |
| return_assistant_tokens_mask (`bool`, defaults to `False`): | |
| Whether to return a mask of the assistant generated tokens. For tokens generated by the assistant, | |
| the mask will contain 1. For user and system tokens, the mask will contain 0. | |
| This functionality is only available for chat templates that support it via the `{% generation %}` keyword. | |
| **kwargs: Additional kwargs to pass to the template renderer. Will be accessible by the chat template. | |
| Returns: | |
| `Union[List[int], Dict]`: A list of token ids representing the tokenized chat so far, including control tokens. This | |
| output is ready to pass to the model, either directly or via methods like `generate()`. If `return_dict` is | |
| set, will return a dict of tokenizer outputs instead. | |
| """ | |
        if not self.is_instruct_model:
            raise ValueError(
                "apply_chat_template is only supported for instruct models. You should pass argument "
                "is_instruct_model=True to the TextTokenizer constructor."
            )
        # Since generation_prefix is appended to the formatted text at the end, ensure the settings are compatible.
        if generation_prefix:
            assert not tokenize, "tokenize must be False when generation_prefix is provided."
            assert add_generation_prompt, "add_generation_prompt must be set when generation_prefix is provided."
        formatted_text: Union[str, List[int]] = self.tokenizer.apply_chat_template(
            conversation,
            add_generation_prompt=add_generation_prompt,
            tokenize=tokenize,
            padding=padding,
            truncation=truncation,
            max_length=max_length,
            return_tensors=return_tensors,
            return_dict=return_dict,
            return_assistant_tokens_mask=return_assistant_tokens_mask,
            tokenizer_kwargs=tokenizer_kwargs,
            **kwargs,
        )
        if generation_prefix:
            formatted_text += generation_prefix
            log.debug(
                f"Adding generation prefix: {generation_prefix} to the formatted text\n"
                f"Formatted text: {formatted_text}"
            )
        return formatted_text
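

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module). It assumes the
    # Hugging Face Hub is reachable and that you have access to the referenced
    # checkpoint; pass `local_path=...` instead to use an offline copy.
    tokenizer = TextTokenizer(model_family="mistral", is_instruct_model=True)

    # Round-trip a plain string through encode/decode.
    ids = tokenizer.encode("Hello, world!", add_special_tokens=True)
    print(ids)
    print(tokenizer.decode(ids, skip_special_tokens=True))

    # Format a single-turn conversation into a generation-ready prompt string.
    # `tokenize=False` is required because `generation_prefix` is set.
    conversation = [
        {"role": "user", "content": "Describe a sunrise over the ocean."},
    ]
    prompt = tokenizer.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        tokenize=False,
        generation_prefix="A sunrise",
    )
    print(prompt)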