| | import copy |
| | import warnings |
| | from typing import Any, Dict, List, Optional, Union |
| |
|
| | import torch |
| | import torchvision.transforms.functional as F |
| | from transformers import BatchEncoding |
| | from transformers.processing_utils import ProcessorMixin |
| |
|
| |
|
| | def to_tensor(x): |
| | """ |
| | Convert a nested structure of numpy arrays or tensors (including lists and tuples of them) |
| | into a tensor. Assumes that all nested structures can be converted into a tensor directly. |
| | |
| | :param x: Nested structure containing numpy arrays, tensors, lists, or tuples |
| | :return: torch.Tensor |
| | """ |
| | with warnings.catch_warnings(): |
| | |
| | warnings.filterwarnings( |
| | "error", |
| | category=UserWarning, |
| | message=".*Creating a tensor from a list of numpy.ndarrays is extremely slow.*", |
| | ) |
| | try: |
| | return torch.Tensor(x) |
| | except Exception: |
| | if isinstance(x, list): |
| | return torch.stack([to_tensor(item) for item in x]) |
| | else: |
| | raise TypeError("Unsupported type for conversion to tensor") |
| |
|
| |
|
| | def truncate( |
| | encoding: Dict[str, List[List[Any]]], max_length: int, truncation_side: str = "right", preserve: bool = False |
| | ) -> Dict[str, List[List[Any]]]: |
| | """ |
| | Truncate the sequences in the encoding to the specified maximum length. |
| | |
| | This function is designed to process batch of sequences represented in the encoding dictionary. |
| | Depending on the chosen strategy, sequences are either truncated with loss of residual data or with preservation |
| | and incorporation of residual data into the batch. |
| | |
| | Args: |
| | encoding (`Mapping`): |
| | A dictionary where each key-value pair consists of a feature name and its corresponding batch of sequences. |
| | The sequences are expected to be lists. |
| | max_length (`int`): |
| | The maximum allowable length for the sequences. |
| | truncation_side (`str`, **optional**): |
| | The strategy to use for truncation. Can be `"left"` or `"right"`. Defaults to `"right"`. |
| | preserve (`bool`, **optional**): |
| | Whether to preserve the residual data by adding them as new sequences in the batch. Defaults to `False`. |
| | |
| | Returns: |
| | `Dict[str, List[List[Any]]]`: |
| | A dictionary with the same keys as the input `encoding`, containing the truncated batch of sequences. |
| | If `preserve` is set to `True`, the batch size may increase due to the addition of new sequences formed |
| | from the residual data. |
| | |
| | Example: |
| | |
| | >>> encoding = {'feature1': [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]} |
| | >>> truncate(encoding, 3, preserve=False) |
| | {'feature1': [[1, 2, 3], [6, 7, 8]]} |
| | |
| | >>> truncate(encoding, 3, preserve=True) |
| | {'feature1': [[1, 2, 3], [4, 5], [6, 7, 8], [9, 10]]} |
| | """ |
| | truncated_encoding = {} |
| |
|
| | for key, sequences in encoding.items(): |
| | if not all(isinstance(seq, list) for seq in sequences): |
| | raise TypeError(f"All sequences under key {key} should be of type list.") |
| |
|
| | truncated_sequences = [] |
| |
|
| | for seq in sequences: |
| | if len(seq) <= max_length: |
| | truncated_sequences.append(seq) |
| | continue |
| |
|
| | if preserve: |
| | if truncation_side == "right": |
| | truncated_sequences.extend([seq[i : i + max_length] for i in range(0, len(seq), max_length)]) |
| | elif truncation_side == "left": |
| | n = len(seq) // max_length + int(len(seq) % max_length > 0) |
| | low, high = len(seq) - n * max_length, len(seq) |
| | truncated_sequences.extend( |
| | [seq[max(0, i - max_length) : i] for i in range(high, low, -max_length)] |
| | ) |
| | else: |
| | raise ValueError(f"Invalid truncation_side: {truncation_side}") |
| | else: |
| | if truncation_side == "right": |
| | truncated_sequences.append(seq[:max_length]) |
| | elif truncation_side == "left": |
| | truncated_sequences.append(seq[-max_length:]) |
| |
|
| | truncated_encoding[key] = truncated_sequences |
| |
|
| | return truncated_encoding |
| |
|
| |
|
| | def pad(encoding: Dict[str, List[List[Any]]], target_length: int) -> Dict[str, List[List[Any]]]: |
| | """ |
| | Pad the sequences in the encoding to the specified maximum length. |
| | |
| | This function is designed to process batch of sequences represented in the encoding dictionary. |
| | The padding value is set to be the first element in the sequence. |
| | |
| | Args: |
| | encoding (`Mapping`): |
| | A dictionary where each key-value pair consists of a feature name and its corresponding batch of sequences. |
| | The sequences are expected to be lists. |
| | target_length (`int`): |
| | The desired length for the sequences. |
| | |
| | Returns: |
| | `Dict[str, List[List[Any]]]`: |
| | A dictionary with the same keys as the input `encoding`, containing the padded batch of sequences. |
| | An additional key `attention_mask` is added to the dictionary to indicate the positions of the non-padding |
| | elements with 1s and the padding elements with 0s. If the input `encoding` already contains an |
| | `attention_mask` key, the corresponding mask will be updated such that the original masking is preserved, |
| | and the newly added padding elements will be masked with 0s. In other words, the resulting |
| | `attention_mask` is a logical "AND" between the provided mask and the mask created due to padding, ensuring |
| | that any element masked originally remains masked. |
| | |
| | Example: |
| | |
| | >>> encoding = {'feature1': [[1, 2], [3, 4, 5]]} |
| | >>> pad(encoding, 4) |
| | {'feature1': [[1, 2, 1, 1], [3, 4, 5, 3]], 'attention_mask': [[1, 1, 0, 0], [1, 1, 1, 0]]} |
| | |
| | >>> encoding = {'feature1': [[1, 2], [3, 4, 5]], "attention_mask": [[1, 0], [0, 1, 1]]} |
| | >>> pad(encoding, 4) |
| | {'feature1': [[1, 2, 1, 1], [3, 4, 5, 3]], 'attention_mask': [[1, 0, 0, 0], [0, 1, 1, 0]]} |
| | """ |
| | padded_encoding = {} |
| |
|
| | for key, sequences in encoding.items(): |
| | if not all(isinstance(seq, (list, torch.Tensor)) for seq in sequences): |
| | raise TypeError(f"All sequences under key {key} should be of type list or tensor.") |
| | if key == "attention_mask": |
| | continue |
| |
|
| | padded_sequences = [] |
| | pad_mask = [] |
| |
|
| | for seq in sequences: |
| | pad_len = target_length - len(seq) |
| | padded_seq = list(seq) + [seq[0]] * max(0, pad_len) |
| | mask = [1] * len(seq) + [0] * max(0, pad_len) |
| |
|
| | padded_sequences.append(padded_seq) |
| | pad_mask.append(mask) |
| |
|
| | padded_encoding[key] = padded_sequences |
| |
|
| | if "attention_mask" in encoding: |
| | padded_encoding["attention_mask"] = [ |
| | [a * (b[i] if i < len(b) else 0) for i, a in enumerate(row)] |
| | for row, b in zip(pad_mask, encoding["attention_mask"]) |
| | ] |
| | else: |
| | padded_encoding["attention_mask"] = pad_mask |
| |
|
| | return padded_encoding |
| |
|
| |
|
| | class JatProcessor(ProcessorMixin): |
| | r""" |
| | JAT processor which wraps a CLIP image processor and a BERT tokenizer into a single processor. |
| | |
| | [`JatProcessor`] offers all the functionalities of [`CLIPImageProcessor`] and [`BertTokenizerFast`]. See the |
| | [`~JatProcessor.__call__`] and [`~JatProcessor.decode`] for more information. |
| | |
| | Args: |
| | image_processor ([`AutoImageProcessor`]): |
| | The image processor is a required input. |
| | tokenizer ([`AutoTokenizer`]): |
| | The tokenizer is a required input. |
| | """ |
| |
|
| | attributes = ["image_processor", "tokenizer"] |
| | image_processor_class = "AutoImageProcessor" |
| | tokenizer_class = "AutoTokenizer" |
| |
|
| | DONT_TRUNCATE_OR_PAD = {"pixel_values"} |
| |
|
| | def __init__(self, image_processor, tokenizer): |
| | super().__init__(image_processor, tokenizer) |
| | self.current_processor = self.image_processor |
| |
|
| | def _truncate_and_pad( |
| | self, |
| | encoding: dict, |
| | padding: Union[bool, str], |
| | truncation: Union[bool, str], |
| | truncation_side: str = "right", |
| | max_length: Optional[int] = None, |
| | ) -> dict: |
| | |
| | if max_length is None: |
| | max_length = self.tokenizer.model_max_length |
| |
|
| | |
| | excluded = {key: value for key, value in encoding.items() if key in self.DONT_TRUNCATE_OR_PAD} |
| | encoding = {key: value for key, value in encoding.items() if key not in self.DONT_TRUNCATE_OR_PAD} |
| |
|
| | |
| | if truncation in [True, "lossy"]: |
| | encoding = truncate(encoding, max_length, truncation_side, preserve=False) |
| | elif truncation == "preserve": |
| | encoding = truncate(encoding, max_length, truncation_side, preserve=True) |
| | elif truncation in [False, "do_not_truncate"]: |
| | pass |
| | else: |
| | raise ValueError("Invalid truncation strategy:" + str(truncation)) |
| |
|
| | |
| | if padding in [True, "longest"]: |
| | target_length = max(len(seq) for sequences in encoding.values() for seq in sequences) |
| | encoding = pad(encoding, target_length) |
| | elif padding == "max_length": |
| | encoding = pad(encoding, max_length) |
| | elif padding in [False, "do_not_pad"]: |
| | pass |
| | else: |
| | raise ValueError("Invalid padding strategy:" + str(padding)) |
| |
|
| | |
| | encoding.update(excluded) |
| |
|
| | |
| | |
| | if "image_observations" in encoding: |
| | encoding["image_observations"] = to_tensor(encoding["image_observations"]) |
| |
|
| | return encoding |
| |
|
| | def __call__( |
| | self, |
| | text=None, |
| | images=None, |
| | continuous_observations=None, |
| | discrete_observations=None, |
| | text_observations=None, |
| | image_observations=None, |
| | continuous_actions=None, |
| | discrete_actions=None, |
| | rewards=None, |
| | return_tensors=None, |
| | **kwargs, |
| | ): |
| | """ |
| | Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text` |
| | and `kwargs` arguments to BertTokenizerFast's [`~BertTokenizerFast.__call__`] if `text` is not `None` to encode |
| | the text. To prepare the image(s), this method forwards the `images` and `kwrags` arguments to |
| | CLIPImageProcessor's [`~CLIPImageProcessor.__call__`] if `images` is not `None`. Please refer to the doctsring |
| | of the above two methods for more information. |
| | |
| | Args: |
| | text (`str`, `List[str]`, `List[List[str]]`): |
| | The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings |
| | (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set |
| | `is_split_into_words=True` (to lift the ambiguity with a batch of sequences). |
| | images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, |
| | `List[np.ndarray]`, `List[torch.Tensor]`): |
| | The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch |
| | tensor. In case of a NumPy array/PyTorch tensor, each image should be of shape (C, H, W), where C is a |
| | number of channels, H and W are image height and width. |
| | continuous_observations (`List[List[List[float]]]`): |
| | The continuous observations or batch of continuous observations to be encoded. |
| | discrete_observations (`List[List[List[int]]]`): |
| | The discrete observations or batch of discrete observations to be encoded. |
| | text_observations (`List[List[str]]`): |
| | The text observations or batch of text observations to be encoded. |
| | image_observations (`List[List[PIL.Image.Image]]`, `List[List[np.ndarray]]`, `List[List[torch.Tensor]]`): |
| | The image observations or batch of image observations to be encoded. |
| | continuous_actions (`List[List[List[float]]]`): |
| | The continuous actions or batch of continuous actions to be encoded. |
| | discrete_actions (``List[List[int]]`): |
| | The discrete actions or batch of discrete actions to be encoded. |
| | rewards (``List[List[float]]`): |
| | The rewards or batch of rewards to be encoded. |
| | return_tensors (`str` or [`~utils.TensorType`], *optional*): |
| | If set, will return tensors of a particular framework. Acceptable values are: |
| | |
| | - `'tf'`: Return TensorFlow `tf.constant` objects. |
| | - `'pt'`: Return PyTorch `torch.Tensor` objects. |
| | - `'np'`: Return NumPy `np.ndarray` objects. |
| | - `'jax'`: Return JAX `jnp.ndarray` objects. |
| | |
| | Returns: |
| | [`BatchEncoding`]: A [`BatchEncoding`] with the following fields: |
| | |
| | - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`. |
| | - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when |
| | `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not |
| | `None`). |
| | - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`. |
| | """ |
| | |
| | padding = kwargs.pop("padding", False) |
| | truncation = kwargs.pop("truncation", False) |
| | truncation_side = kwargs.pop("truncation_side", "right") |
| | max_length = kwargs.pop("max_length", None) |
| |
|
| | |
| | if text is not None and not isinstance(text, list): |
| | text = [text] |
| |
|
| | encoding = {} |
| | if text is not None: |
| | encoding["input_ids"] = self.tokenizer(text, **kwargs)["input_ids"] |
| | if images is not None: |
| | encoding["pixel_values"] = self.image_processor(images, **kwargs).pixel_values |
| | if continuous_observations is not None: |
| | encoding["continuous_observations"] = copy.deepcopy(continuous_observations) |
| | if discrete_observations is not None: |
| | encoding["discrete_observations"] = copy.deepcopy(discrete_observations) |
| | if text_observations is not None: |
| | if "discrete_observations" not in encoding: |
| | raise ValueError("discrete_observations must be provided if text_observations is provided") |
| | for batch_idx, sequence in enumerate(text_observations): |
| | encoded_text = self.tokenizer(sequence, max_length=64, padding="max_length")["input_ids"] |
| | for timestep, text_tokens in enumerate(encoded_text): |
| | encoding["discrete_observations"][batch_idx][timestep].extend(text_tokens) |
| | if image_observations is not None: |
| | image_observations = [[(F.to_tensor(im) - 0.5) / 0.5 for im in ep] for ep in image_observations] |
| | encoding["image_observations"] = image_observations |
| | if continuous_actions is not None: |
| | encoding["continuous_actions"] = copy.deepcopy(continuous_actions) |
| | if discrete_actions is not None: |
| | encoding["discrete_actions"] = copy.deepcopy(discrete_actions) |
| |
|
| | if rewards is not None: |
| | encoding["rewards"] = [[float(r) for r in ep] for ep in rewards] |
| |
|
| | |
| | if text is not None and images is not None: |
| | if max_length is None: |
| | max_length = self.tokenizer.model_max_length |
| | max_length -= (224 // 16) ** 2 |
| | elif ( |
| | continuous_observations is not None |
| | or discrete_observations is not None |
| | or text_observations is not None |
| | or image_observations is not None |
| | ): |
| | if max_length is None: |
| | max_length = self.tokenizer.model_max_length |
| | max_length //= 2 |
| |
|
| | encoding = self._truncate_and_pad(encoding, padding, truncation, truncation_side, max_length) |
| |
|
| | return BatchEncoding(encoding, tensor_type=return_tensors) |
| |
|
| | def batch_decode(self, *args, **kwargs): |
| | """ |
| | This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please |
| | refer to the docstring of this method for more information. |
| | """ |
| | return self.tokenizer.batch_decode(*args, **kwargs) |
| |
|
| | def decode(self, *args, **kwargs): |
| | """ |
| | This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to |
| | the docstring of this method for more information. |
| | """ |
| | return self.tokenizer.decode(*args, **kwargs) |
| |
|
| | def pad(self, *args, **kwargs): |
| | inputs = args[0] |
| | keys = [key for key in inputs[0].keys() if inputs[0][key] is not None] |
| | inputs = {key: [arg[key] for arg in inputs] for key in keys} |
| | elmt = next(iter(inputs.values())) |
| | if isinstance(elmt[0], torch.Tensor) and not isinstance(elmt, torch.Tensor): |
| | encoding = {key: torch.stack(inputs[key]) for key in inputs.keys()} |
| | else: |
| | encoding = self._truncate_and_pad( |
| | inputs, padding=kwargs.get("padding", False), truncation=False, max_length=kwargs.get("max_length") |
| | ) |
| |
|
| | return BatchEncoding(encoding, tensor_type=kwargs.get("return_tensors")) |
| |
|
| | @property |
| | def model_input_names(self): |
| | return [ |
| | "input_ids", |
| | "attention_mask", |
| | "pixel_values", |
| | "continuous_observations", |
| | "discrete_observations", |
| | "image_observations", |
| | "continuous_actions", |
| | "discrete_actions", |
| | "rewards", |
| | ] |
| |
|
| |
|
| | JatProcessor.register_for_auto_class("AutoProcessor") |
| |
|