| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
import os
from typing import List, Tuple, Union

import torch
import transformers
from transformers import T5EncoderModel, T5TokenizerFast

from Cosmos.utils import log
| |
|
| | transformers.logging.set_verbosity_error() |
| |
|
| |
|
class CosmosT5TextEncoder(torch.nn.Module):
    """Handles T5 text encoding operations."""

    def __init__(self, model_name: str = "google-t5/t5-11b", device: str = "cuda", cache_dir: str = "~/.cache"):
        """Initializes the T5 tokenizer and encoder.

        Args:
            model_name: The name of the T5 model to use.
            device: The device to use for computations.
            cache_dir: Directory used to cache downloaded model weights. A
                leading "~" is expanded to the user's home directory. If
                loading from this location fails, falls back to the
                transformers default cache location.
        """
        super().__init__()
        # from_pretrained joins cache_dir with os.path and does NOT expand "~",
        # which would otherwise create a literal "~" directory in the CWD.
        cache_dir = os.path.expanduser(cache_dir)
        try:
            self.tokenizer = T5TokenizerFast.from_pretrained(model_name, cache_dir=cache_dir)
            self.text_encoder = T5EncoderModel.from_pretrained(model_name, cache_dir=cache_dir).to(device)
        except Exception as e:
            # Best-effort fallback: retry with the default HF cache location
            # rather than failing outright on a bad/unwritable cache_dir.
            log.warning(f"Failed to load T5 model using cache_dir '{cache_dir}', falling back to default location: {e}")
            self.tokenizer = T5TokenizerFast.from_pretrained(model_name)
            self.text_encoder = T5EncoderModel.from_pretrained(model_name).to(device)
        self.text_encoder.eval()
        self.device = device

    @torch.inference_mode()
    def encode_prompts(
        self, prompts: Union[str, List[str]], max_length: int = 512
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Encodes text prompts into hidden state representations using a T5 encoder.

        This function tokenizes the input prompts, processes them through a T5 text
        encoder, and returns the last hidden states. The encoded outputs beyond the
        actual sequence length are zero-padded. All prompts in a batch are padded
        to max_length.

        Args:
            prompts: Input text to encode. Can be a single string or a list of strings.
            max_length: Maximum sequence length for tokenization and padding. Longer
                sequences will be truncated. Defaults to 512.

        Returns:
            tuple[torch.Tensor, torch.Tensor]: A tuple containing:
                - Encoded text embeddings of shape (batch_size, max_length, hidden_size),
                  zeroed at padding positions.
                - Attention mask of shape (batch_size, max_length) as returned by the
                  tokenizer (1 for real tokens, 0 for padding).

        Raises:
            ValueError: If the input prompts list is empty.

        Example:
            >>> encoder = CosmosT5TextEncoder()
            >>> prompts = ["Hello world", "Another example"]
            >>> embeddings, mask = encoder.encode_prompts(prompts, max_length=128)
        """
        if isinstance(prompts, str):
            prompts = [prompts]

        if not prompts:
            raise ValueError("The input prompt list is empty.")

        batch_encoding = self.tokenizer.batch_encode_plus(
            prompts,
            return_tensors="pt",
            truncation=True,
            padding="max_length",
            max_length=max_length,
            return_length=True,
            return_offsets_mapping=False,
        )

        input_ids = batch_encoding.input_ids.to(self.device)
        attn_mask = batch_encoding.attention_mask.to(self.device)

        outputs = self.text_encoder(input_ids=input_ids, attention_mask=attn_mask)

        encoded_text = outputs.last_hidden_state
        # Zero out embeddings at padding positions. With padding="max_length"
        # the mask is a contiguous run of 1s followed by 0s, so masking where
        # attn_mask == 0 is equivalent to the per-row "zero beyond length"
        # loop, but vectorized.
        encoded_text.masked_fill_(attn_mask.unsqueeze(-1) == 0, 0)

        return encoded_text, attn_mask
| |
|