Spaces:
Build error
Build error
| from dataclasses import dataclass | |
| from typing import List, Optional, Tuple, Union, Dict | |
| from copy import deepcopy | |
| import re | |
| import math | |
| import torch | |
| import torch.utils.checkpoint | |
| from torch import nn | |
| import matplotlib.pyplot as plt | |
| from transformers import PreTrainedModel | |
| from transformers.activations import ACT2FN | |
| from transformers.cache_utils import Cache | |
| from transformers.modeling_outputs import ModelOutput | |
| from transformers.utils import ( | |
| add_start_docstrings, | |
| add_start_docstrings_to_model_forward, | |
| logging, | |
| replace_return_docstrings, | |
| ) | |
| from transformers.models.auto import AutoModel, AutoModelForCausalLM | |
| from .configuration_gecko import GeckoConfig | |
| logger = logging.get_logger(__name__) | |
| _CONFIG_FOR_DOC = "GeckoConfig" | |
| class GeckoCausalLMOutputWithPast(ModelOutput): | |
| """ | |
| Base class for Llava causal language model (or autoregressive) outputs. | |
| Args: | |
| loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided): | |
| Language modeling loss (for next-token prediction). | |
| logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`): | |
| Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax). | |
| past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`): | |
| Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape | |
| `(batch_size, num_heads, sequence_length, embed_size_per_head)`) | |
| Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see | |
| `past_key_values` input) to speed up sequential decoding. | |
| hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`): | |
| Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, + | |
| one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`. | |
| Hidden-states of the model at the output of each layer plus the optional initial embedding outputs. | |
| attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`): | |
| Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length, | |
| sequence_length)`. | |
| Attentions weights after the attention softmax, used to compute the weighted average in the self-attention | |
| heads. | |
| image_hidden_states (`tuple(torch.FloatTensor)`, *optional*): | |
| Tuple of `torch.FloatTensor` (one for the output of the image embeddings, `(batch_size, num_images, | |
| sequence_length, hidden_size)`. | |
| image_hidden_states of the model produced by the vision encoder, and optionally by the perceiver | |
| """ | |
| loss: Optional[torch.FloatTensor] = None | |
| logits: torch.FloatTensor = None | |
| past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None | |
| hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| attentions: Optional[Tuple[torch.FloatTensor]] = None | |
| image_hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
| class GeckoPreTrainedModel(PreTrainedModel): | |
| config_class = GeckoConfig | |
| base_model_prefix = "model" | |
| supports_gradient_checkpointing = True | |
| _no_split_modules = ["GeckoVisionAttention"] | |
| _skip_keys_device_placement = "past_key_values" | |
| _supports_flash_attn_2 = True | |
| def _init_weights(self, module): | |
| std = ( | |
| self.config.intializer_range if hasattr(self.config, "intializer_range") else self.config.text_config.initializer_range | |
| ) | |
| if hasattr(module, "class_embedding"): | |
| module.class_embedding.data.normal_(mean=0.0, std=std) | |
| if isinstance(module, (nn.Linear, nn.Conv2d)): | |
| module.weight.data.normal_(mean=0.0, std=std) | |
| if module.bias is not None: | |
| module.bias.data.zero_() | |
| elif isinstance(module, nn.Embedding): | |
| module.weight.data.normal_(mean=0.0, std=std) | |
| if module.padding_idx is not None: | |
| module.weight.data[module.padding_idx].zero_() | |
| def _supports_sdpa(self): | |
| return self.language_model._supports_sdpa | |
| class PositionalEncoding2D(nn.Module): | |
| def __init__(self, config: GeckoConfig): | |
| """ | |
| :param channels: The last dimension of the tensor you want to apply pos emb to. | |
| """ | |
| super(PositionalEncoding2D, self).__init__() | |
| if config.positional_information == "2d_before": | |
| channels = config.vision_config.hidden_size | |
| else: | |
| channels = config.text_config.hidden_size | |
| self.org_channels = channels | |
| channels = int(math.ceil(channels / 4) * 2) | |
| self.channels = channels | |
| inv_freq = 1.0 / (10000 ** (torch.arange(0, channels, 2).float() / channels)) | |
| self.register_buffer("inv_freq", inv_freq) | |
| self.register_buffer("cached_penc", None, persistent=False) | |
| def get_emb(self, sin_inp): | |
| """ | |
| Gets a base embedding for one dimension with sin and cos intertwined | |
| """ | |
| emb = torch.stack((sin_inp.sin(), sin_inp.cos()), dim=-1) | |
| return torch.flatten(emb, -2, -1) | |
| def forward(self, tensor): | |
| """ | |
| :param tensor: A 4d tensor of size (x, y, num_tokens, ch) | |
| :return: Positional Encoding Matrix of size (x, y, num_tokens, ch) | |
| """ | |
| if len(tensor.shape) != 4: | |
| raise RuntimeError("The input tensor has to be 4d!") | |
| if self.cached_penc is not None and self.cached_penc.shape == tensor.shape: | |
| return self.cached_penc | |
| self.cached_penc = None | |
| x, y, num_tokens, orig_ch = tensor.shape | |
| pos_x = torch.arange(x, device=tensor.device, dtype=self.inv_freq.dtype) | |
| pos_y = torch.arange(y, device=tensor.device, dtype=self.inv_freq.dtype) | |
| sin_inp_x = torch.einsum("i,j->ij", pos_x, self.inv_freq) | |
| sin_inp_y = torch.einsum("i,j->ij", pos_y, self.inv_freq) | |
| emb_x = self.get_emb(sin_inp_x).unsqueeze(1) | |
| emb_y = self.get_emb(sin_inp_y) | |
| emb = torch.zeros( | |
| (x, y, self.channels * 2), | |
| device=tensor.device, | |
| dtype=tensor.dtype, | |
| ) | |
| emb[:, :, : self.channels] = emb_x | |
| emb[:, :, self.channels : 2 * self.channels] = emb_y | |
| self.cached_penc = emb[:, :, None, :orig_ch].repeat(1, 1, num_tokens, 1) | |
| return self.cached_penc | |
| class GeckoMultiModalProjector(nn.Module): | |
| def __init__(self, config: GeckoConfig): | |
| super().__init__() | |
| self.linear_1 = nn.Linear(config.vision_config.hidden_size, config.text_config.hidden_size, bias=True) | |
| self.act = ACT2FN[config.projector_hidden_act] | |
| self.linear_2 = nn.Linear(config.text_config.hidden_size, config.text_config.hidden_size, bias=True) | |
| def forward(self, image_features): | |
| hidden_states = self.linear_1(image_features) | |
| hidden_states = self.act(hidden_states) | |
| hidden_states = self.linear_2(hidden_states) | |
| return hidden_states | |
| class GeckoForConditionalGeneration(GeckoPreTrainedModel): | |
| def __init__(self, config: GeckoConfig, vision_tower=None, language_model=None, multimodal_projector=None): | |
| super().__init__(config) | |
| self.vision_tower = AutoModel.from_config(config.vision_config) if vision_tower is None else vision_tower | |
| self.positional_encoding = PositionalEncoding2D(config) if '2d' in config.positional_information else None | |
| self.multi_modal_projector = GeckoMultiModalProjector(config) | |
| self.vocab_size = config.vocab_size | |
| self.language_model = AutoModelForCausalLM.from_config( | |
| config.text_config, attn_implementation=config._attn_implementation | |
| ) if language_model is None else language_model | |
| self.pad_token_id = self.config.pad_token_id if self.config.pad_token_id is not None else -1 | |
| self.post_init() | |
| def load_text_encoder(self, processor): | |
| self.tokenizer = processor.tokenizer | |
| self.clip_tokenizer = processor.clip_tokenizer | |
| self.eos_token_id = [self.tokenizer.eos_token_id, self.tokenizer.convert_tokens_to_ids("<|eot_id|>")] | |
| self.encoder_type = self.config.vision_config.model_type | |
| if 'clip' in self.encoder_type: | |
| self.encoder = AutoModel.from_pretrained('openai/clip-vit-large-patch14-336') | |
| elif 'siglip' in self.encoder_type: | |
| self.encoder = AutoModel.from_pretrained("google/siglip-so400m-patch14-384") | |
| else: | |
| raise ValueError(f"Vision model {self.config.vision_config.model_type} is not supported.") | |
| def get_input_embeddings(self): | |
| return self.language_model.get_input_embeddings() | |
| def set_input_embeddings(self, value): | |
| self.language_model.set_input_embeddings(value) | |
| def get_output_embeddings(self): | |
| return self.language_model.get_output_embeddings() | |
| def set_output_embeddings(self, new_embeddings): | |
| self.language_model.set_output_embeddings(new_embeddings) | |
| def set_decoder(self, decoder): | |
| self.language_model.set_decoder(decoder) | |
| def get_decoder(self): | |
| return self.language_model.get_decoder() | |
| def tie_weights(self): | |
| return self.language_model.tie_weights() | |
| def resize_token_embeddings(self, new_num_tokens: Optional[int] = None, pad_to_multiple_of=None) -> nn.Embedding: | |
| model_embeds = self.language_model.resize_token_embeddings(new_num_tokens, pad_to_multiple_of) | |
| # update vocab size | |
| self.config.text_config.vocab_size = model_embeds.num_embeddings | |
| self.config.vocab_size = model_embeds.num_embeddings | |
| self.vocab_size = model_embeds.num_embeddings | |
| return model_embeds | |
| # def _get_highest_similarity(self, cls_token, keyword_hidden_states, top_patches): | |
| # num_patches, embed_dim = cls_token.shape | |
| # batch_size, sequence_length, hidden_size = keyword_hidden_states.shape | |
| # assert embed_dim == hidden_size, f"The embedding dimension of cls token and keyword hidden states do not match. Dimension of cls token: {embed_dim} and dimension of keyword hidden states: {hidden_size}." | |
| # keyword_hidden_states = keyword_hidden_states.squeeze(0) | |
| # # calculate the similarity between the cls token and the keyword hidden states | |
| # similarity_score = torch.matmul(cls_token, keyword_hidden_states.T) # shape: (num_patches, sequence_length) | |
| # similarity_score = similarity_score.mean(dim=1) # shape: (num_patches) | |
| # # take the index of the patch with the highest similarity score | |
| # patch_index = torch.topk(similarity_score, top_patches).indices | |
| # return patch_index | |
| # def _select_patches(self, image_features, keyword_hidden_states, top_patches=1): | |
| # selected_patches = [] | |
| # # iterate through each image | |
| # for image in image_features: | |
| # if keyword_hidden_states is not None: | |
| # # take the first token of each patch | |
| # cls_token = image[:, 0, :].squeeze(1) | |
| # # get the index of the patch with the highest similarity score | |
| # patch_index = self._get_highest_similarity(cls_token, keyword_hidden_states, top_patches) | |
| # else: | |
| # top_patches = image.shape[0] | |
| # patch_index = torch.arange(top_patches) | |
| # # select the patch with the highest similarity score | |
| # if self.multimodal_projector == 'mlp': | |
| # image = image[patch_index, 1:, :].reshape(-1, image.shape[-1]).type(self.vision_tower.dtype) | |
| # elif self.multimodal_projector == 'perceiver': | |
| # image = image[patch_index, :, :].reshape(-1, image.shape[-1]).type(self.vision_tower.dtype) | |
| # else: | |
| # raise ValueError(f"Multimodal projector {self.multimodal_projector} is not supported.") | |
| # selected_patches.append(image) | |
| # return selected_patches # shape: list with shape of num_images, each element of shape (num_tokens * num_patches_i, embed_dim) | |
| # def _input_to_vision_tower(self, pixel_values): | |
| # output = [] | |
| # for i in range(len(pixel_values)): | |
| # num_patches = pixel_values[i].shape[0] | |
| # pixel_batch_size = 2 | |
| # processed_pixel_values | |
| # def _input_to_multimodal_projector(self, selected_image_features): | |
| # output = [] | |
| # for selected_image in selected_image_features: | |
| # selected_image = self.multi_modal_projector(selected_image) | |
| # output.append(selected_image) | |
| # return output # shape: list with shape of num_images, each element of shape (num_patches_i, num_tokens, embed_dim) where i is the index of the image | |
| # def _process_keyword_input(self, keyword_input_ids, maximum_keyword_tokens=10): | |
| # self.language_model.eval() | |
| # with torch.no_grad(): | |
| # output_ids = self.language_model.generate(input_ids=keyword_input_ids, return_dict_in_generate=True, max_new_tokens=maximum_keyword_tokens) | |
| # output_ids = output_ids.sequences[:, keyword_input_ids.shape[-1]:] | |
| # self.language_model.train() | |
| # # conditions | |
| # if output_ids[0, 0:2].tolist() == [35581, 25]: # condition where the output is in the form Keyword: <keyword> | |
| # keyword_ids = output_ids[:, 2:-1] | |
| # if keyword_ids[0, 0].item() == 482: | |
| # return None | |
| # return self.get_input_embeddings()(keyword_ids) | |
| # else: # output | |
| # return None | |
| def generate_keywords(self, keywords_text, criteria='template'): | |
| keywords_text = keywords_text.lstrip('\n') | |
| first_sentence = keywords_text.split('.')[0] + '.' | |
| if re.search(r'are (.+?)\.', first_sentence): | |
| objects = re.search(r'are (.+?)\.', first_sentence).group(1).split(' and ') | |
| elif re.search(r'is (.+?)\.', first_sentence): | |
| objects = [re.search(r'is (.+?)\.', first_sentence).group(1)] | |
| else: | |
| objects = [] | |
| def generate_template(object, description): | |
| if object[0] in ['a', 'e', 'i', 'o', 'u']: | |
| return f'An {object}, which {description}' | |
| else: | |
| return f'A {object}, which {description}' | |
| descriptions = [] | |
| keywords = [] | |
| for i, obj in enumerate(objects): | |
| keywords.append(obj) | |
| if criteria == 'word': | |
| descriptions.append([obj]) | |
| elif criteria == 'template': | |
| descriptions.append([f'a photo of {obj}']) | |
| elif criteria == 'description': | |
| # pattern = rf"'{obj}':(.*?)('|\Z)" | |
| # match = re.search(pattern, keywords_text, re.DOTALL) | |
| # if match: | |
| # # Extract the feature keywords_text and clean it up | |
| # feature_text = match.group(1).strip() | |
| # # Split on new lines and strip each line | |
| # feature_list = [generate_template(obj, line.strip('* ').strip()) for line in feature_text.split('\n') if line.strip()] | |
| # descriptions.append(feature_list) | |
| # The problem of the above code is that it does not work for the case where the object is not found in the text | |
| # make it more general | |
| features = re.findall(r"\* (.+)", keywords_text, re.MULTILINE) | |
| descriptions.append([generate_template(obj, feature) for feature in features[i * len(features) // len(objects): (i + 1) * len(features) // len(objects)]]) | |
| else: | |
| raise ValueError(f'invalid criteria: {criteria}') | |
| return keywords, descriptions | |
| def _merge_input_ids_with_image_features(self, image_features, inputs_embeds, input_ids, attention_mask, labels): | |
| num_images = len(image_features) | |
| num_image_tokens = torch.tensor([x.shape[0] for x in image_features], device=self.vision_tower.device, dtype=torch.int64) # total image tokens | |
| embed_dim = image_features[0].shape[-1] | |
| batch_size, sequence_length = input_ids.shape | |
| left_padding = not torch.sum(input_ids[:, -1] == torch.tensor(self.pad_token_id)) | |
| # 1. Create a mask to know where special image tokens are | |
| special_image_token_mask = input_ids == self.config.image_token_index | |
| # num_special_image_tokens = torch.sum(special_image_token_mask, dim=-1) | |
| # Compute the maximum embed dimension | |
| # max_embed_dim = (num_special_image_tokens.max() * (num_image_tokens - 1)) + sequence_length | |
| max_embed_dim = torch.sum(num_image_tokens) - num_images + sequence_length | |
| batch_indices, non_image_indices = torch.where(input_ids != self.config.image_token_index) | |
| _, image_indices = torch.where(input_ids == self.config.image_token_index) | |
| # 2. Compute the positions where text should be written | |
| # Calculate new positions for text tokens in merged image-text sequence. | |
| # `special_image_token_mask` identifies image tokens. Each image token will be replaced by `nb_text_tokens_per_images - 1` text tokens. | |
| # `torch.cumsum` computes how each image token shifts subsequent text token positions. | |
| # - 1 to adjust for zero-based indexing, as `cumsum` inherently increases indices by one. | |
| image_token_mask = special_image_token_mask * 1 | |
| image_token_mask[0, image_indices] = num_image_tokens - 1 | |
| # for i, index in enumerate(image_indices): | |
| # special_image_token_mask[0, index] = num_image_tokens[i] - 1 | |
| new_token_positions = torch.cumsum((image_token_mask) + 1, -1) - 1 | |
| # new_token_positions = torch.cumsum((special_image_token_mask * (num_image_patches - 1) + 1), -1) - 1 | |
| nb_image_pad = max_embed_dim - 1 - new_token_positions[:, -1] | |
| if left_padding: | |
| new_token_positions += nb_image_pad[:, None] # offset for left padding | |
| text_to_overwrite = new_token_positions[batch_indices, non_image_indices] | |
| # 3. Create the full embedding, already padded to the maximum position | |
| final_embedding = torch.zeros( | |
| batch_size, max_embed_dim, embed_dim, dtype=inputs_embeds.dtype, device=inputs_embeds.device | |
| ) | |
| final_attention_mask = torch.zeros( | |
| batch_size, max_embed_dim, dtype=attention_mask.dtype, device=inputs_embeds.device | |
| ) | |
| if labels is not None: | |
| final_labels = torch.full( | |
| (batch_size, max_embed_dim), self.config.ignore_index, dtype=input_ids.dtype, device=input_ids.device | |
| ) | |
| # In case the Vision model or the Language model has been offloaded to CPU, we need to manually | |
| # set the corresponding tensors into their correct target device. | |
| target_device = inputs_embeds.device | |
| batch_indices, non_image_indices, text_to_overwrite = ( | |
| batch_indices.to(target_device), | |
| non_image_indices.to(target_device), | |
| text_to_overwrite.to(target_device), | |
| ) | |
| attention_mask = attention_mask.to(target_device) | |
| # 4. Fill the embeddings based on the mask. If we have ["hey" "<image>", "how", "are"] | |
| # we need to index copy on [0, 577, 578, 579] for the text and [1:576] for the image features | |
| final_embedding[batch_indices, text_to_overwrite] = inputs_embeds[batch_indices, non_image_indices] | |
| final_attention_mask[batch_indices, text_to_overwrite] = attention_mask[batch_indices, non_image_indices] | |
| if labels is not None: | |
| final_labels[batch_indices, text_to_overwrite] = labels[batch_indices, non_image_indices] | |
| # 5. Fill the embeddings corresponding to the images. Anything that is still zeros needs filling | |
| image_to_overwrite = torch.all(final_embedding == 0, dim=-1) | |
| image_to_overwrite &= image_to_overwrite.cumsum(-1) - 1 >= nb_image_pad[:, None].to(target_device) | |
| if image_to_overwrite.sum() != torch.sum(num_image_tokens): | |
| raise ValueError( | |
| f"The input provided to the model are wrong. The number of image tokens is {torch.sum(special_image_token_mask)} while" | |
| f" the number of image given to the model is {num_images}. This prevents correct indexing and breaks batch generation." | |
| ) | |
| final_embedding[image_to_overwrite] = torch.cat([image_patches for image_patches in image_features], dim=0).to(target_device) | |
| # final_embedding[image_to_overwrite] = image_features.contiguous().reshape(-1, embed_dim).to(target_device) | |
| final_attention_mask |= image_to_overwrite | |
| position_ids = (final_attention_mask.cumsum(-1) - 1).masked_fill_((final_attention_mask == 0), 1) | |
| if labels is None: | |
| final_labels = None | |
| return final_embedding, final_attention_mask, final_labels, position_ids | |
| def forward( | |
| self, | |
| input_ids: torch.LongTensor = None, | |
| pixel_values: List[torch.FloatTensor] = None, | |
| coords: List[torch.FloatTensor] = None, | |
| attention_mask: Optional[torch.Tensor] = None, | |
| position_ids: Optional[torch.LongTensor] = None, | |
| past_key_values: Optional[List[torch.FloatTensor]] = None, | |
| inputs_embeds: Optional[torch.FloatTensor] = None, | |
| keyword_prompt_input_ids: torch.LongTensor = None, | |
| vision_feature_select_strategy: Optional[str] = None, | |
| vision_feature_layer: Optional[int] = None, | |
| patch_picking_strategy: Optional[str] = None, | |
| topk: Optional[int] = None, | |
| keyword_criteria: Optional[str] = None, | |
| positional_information: Optional[str] = None, | |
| labels: Optional[torch.LongTensor] = None, | |
| use_cache: Optional[bool] = None, | |
| output_attentions: Optional[bool] = None, | |
| output_hidden_states: Optional[bool] = None, | |
| return_dict: Optional[bool] = None, | |
| visualize_patches: Optional[bool] = None, | |
| visualize_topk_patches: Optional[bool] = None, | |
| print_keyword: Optional[bool] = None, | |
| print_topk_patches: Optional[bool] = None, | |
| ) -> Union[Tuple, GeckoCausalLMOutputWithPast]: | |
| """ | |
| Parameters: | |
| text_inputs: Dict | |
| Output of tokenizer for text data. A dictionary containing the following keys: | |
| - input_ids: torch.LongTensor of shape (batch_size, sequence_length) | |
| - attention_mask: torch.LongTensor of shape (batch_size, sequence_length) | |
| - token_type_ids: torch.LongTensor of shape (batch_size, sequence_length) | |
| keyword_inputs: Dict | |
| Output of tokenizer for keyword data. A dictionary containing the following keys: | |
| - input_ids: torch.LongTensor of shape (batch_size, sequence_length) | |
| - attention_mask: torch.LongTensor of shape (batch_size, sequence_length) | |
| - token_type_ids: torch.LongTensor of shape (batch_size, sequence_length) | |
| image_inputs: Dict | |
| Output of ImageProcessor for image data. A dictionary containing the following keys: | |
| - pixel_values: torch.FloatTensor of shape (num_images, num_patches, num_tokens, embed_dim) | |
| - coords: List of shape (batch_size, num_images) | |
| """ | |
| # processing image and text inputs | |
| output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions | |
| output_hidden_states = ( | |
| output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states | |
| ) | |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict | |
| vision_feature_layer = ( | |
| vision_feature_layer if vision_feature_layer is not None else self.config.vision_feature_layer | |
| ) | |
| vision_feature_select_strategy = ( | |
| vision_feature_select_strategy if vision_feature_select_strategy is not None else self.config.vision_feature_select_strategy | |
| ) | |
| patch_picking_strategy = patch_picking_strategy if patch_picking_strategy is not None else self.config.patch_picking_strategy | |
| topk = topk if topk is not None else self.config.topk | |
| keyword_criteria = keyword_criteria if keyword_criteria is not None else self.config.keyword_criteria | |
| positional_information = positional_information if positional_information is not None else self.config.positional_information | |
| visualize_patches = visualize_patches if visualize_patches is not None else self.config.visualize_patches | |
| visualize_topk_patches = visualize_topk_patches if visualize_topk_patches is not None else self.config.visualize_topk_patches | |
| print_keyword = print_keyword if print_keyword is not None else self.config.print_keyword | |
| print_topk_patches = print_topk_patches if print_topk_patches is not None else self.config.print_topk_patches | |
| if inputs_embeds is None: | |
| # 1. Extra the input embeddings | |
| inputs_embeds = self.get_input_embeddings()(input_ids) | |
| # 2. Merge text and images | |
| if pixel_values is not None and input_ids.shape[1] != 1: | |
| with torch.no_grad(): | |
| keyword_input_ids = self.language_model.generate(keyword_prompt_input_ids, return_dict_in_generate=True, max_new_tokens=1024, eos_token_id=self.eos_token_id) | |
| keyword_input_ids = keyword_input_ids.sequences[:, keyword_prompt_input_ids.shape[-1]:] | |
| keyword_text = self.tokenizer.decode(keyword_input_ids[0], skip_special_tokens=True) | |
| # print(keyword_text) | |
| generated_keywords, generated_descriptions = self.generate_keywords(keyword_text, criteria=keyword_criteria) | |
| all_text_features = [] | |
| for descriptions in generated_descriptions: | |
| one_text_features = [] | |
| for description in descriptions: | |
| keyword_ids = self.clip_tokenizer(description, return_tensors='pt') | |
| keyword_ids = {k: v.to(self.device) for k, v in keyword_ids.items()} | |
| text_features = self.encoder.get_text_features(**keyword_ids) | |
| one_text_features.append(text_features / text_features.norm(p=2, dim=-1, keepdim=True)) | |
| all_text_features.append(torch.cat(one_text_features, dim=0)) | |
| pixel_values = [pixel_value.to(self.vision_tower.device, dtype=self.vision_tower.dtype) for pixel_value in pixel_values] | |
| selected_image_features = [] | |
| selected_coords = [] | |
| for p, pixel_value in enumerate(pixel_values): # iterate through each image | |
| print_keyword_text = f'Keywords (criteria: {keyword_criteria}):\n' | |
| all_hidden_states = self.vision_tower(pixel_value, output_hidden_states=True).hidden_states # tuple of size (num_layers, num_patch, num_tokens, vison_embed_dim) | |
| if patch_picking_strategy == 'last_layer': | |
| hidden_states = [all_hidden_states[-1]] | |
| elif patch_picking_strategy == 'across_layers': | |
| hidden_states = deepcopy(all_hidden_states) | |
| top_patches = [0] | |
| for i, text_feature in enumerate(all_text_features): | |
| print_keyword_text += f' {i+1}: ' + "\n ".join(generated_descriptions[i]) + '\n' | |
| top_index = [] | |
| for hidden_state in hidden_states: # iterate through each layer | |
| if 'clip' in self.encoder_type: | |
| if vision_feature_select_strategy == 'cls': | |
| image_features = self.encoder.visual_projection(self.encoder.vision_model.post_layernorm(hidden_state[1:, 0, :])) # (num_patch-1, embed_dim) | |
| elif vision_feature_select_strategy == 'image_features': | |
| image_features = self.encoder.visual_projection(self.encoder.vision_model.post_layernorm(hidden_state[1:, 1:, :])) # (num_patch-1 * num_tokens, embed_dim) | |
| num_tokens = hidden_state.shape[1] - 1 | |
| elif 'siglip' in self.encoder_type: | |
| if vision_feature_select_strategy == 'cls': | |
| image_features = self.encoder.vision_model.head(self.encoder.vision_model.post_layernorm(hidden_state[1:, :, :])) # (num_patch-1, embed_dim) | |
| elif vision_feature_select_strategy == 'image_features': | |
| image_features = self.encoder.vision_model.post_layernorm(hidden_state[1:, :, :]) # (num_patch-1 * num_tokens, embed_dim) | |
| num_tokens = hidden_state.shape[1] | |
| image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True) | |
| if vision_feature_select_strategy == 'cls': | |
| similarity_score = torch.matmul(image_features, text_feature.T).mean(dim=1) # (num_patch-1) | |
| if patch_picking_strategy == 'across_layers': | |
| index = torch.topk(similarity_score, 1).indices | |
| top_index.append(index.item()+1) | |
| elif patch_picking_strategy == 'last_layer': | |
| index = torch.topk(similarity_score, math.ceil(topk / len(all_text_features))).indices + 1 # take top k patches | |
| top_index += index.tolist() | |
| elif vision_feature_select_strategy == 'image_features': | |
| image_features = image_features.flatten(0, 1) | |
| similarity_score = torch.matmul(image_features, text_feature.T).mean(dim=1) # (num_patch-1 * num_tokens) | |
| index = torch.topk(similarity_score, 100).indices # take top 100 tokens | |
| patch_index = torch.floor(index / num_tokens) # get the patch index | |
| count = torch.nn.functional.one_hot(patch_index.to(torch.int64)).sum(dim=0) # count the occurrences of each patch | |
| if patch_picking_strategy == 'across_layers': | |
| top_count = torch.topk(count, 1).indices # take top 1 | |
| top_index.append(top_count.item()+1) | |
| elif patch_picking_strategy == 'last_layer': | |
| top_count = torch.topk(count, math.ceil(topk / len(all_text_features))).indices + 1 | |
| top_index += top_count.tolist() | |
| if visualize_patches and patch_picking_strategy == 'across_layers': | |
| if 'clip' in self.encoder_type: | |
| (x, y) = (5, 5) | |
| elif 'siglip' in self.encoder_type: | |
| (x, y) = (7, 4) | |
| fig, axs = plt.subplots(y, x, figsize=(x * 2, y * 2)) | |
| fig.suptitle(f'keyword: {generated_keywords[i]}') | |
| for k, index in enumerate(top_index): | |
| axs[k // x, k % x].imshow(pixel_value[index].to(torch.float32).cpu().numpy().transpose(1, 2, 0)) | |
| axs[k // x, k % x].set_title(f'Layer {k}') | |
| axs[k // x, k % x].axis('off') | |
| plt.show() | |
| if patch_picking_strategy == 'across_layers': | |
| top_patches += torch.topk(torch.bincount(torch.tensor(top_index, dtype=torch.int64)), math.ceil(topk / len(all_text_features))).indices.to(dtype=torch.int64).tolist() | |
| elif patch_picking_strategy == 'last_layer': | |
| top_patches += top_index | |
| topk_patches = list(set(top_patches)) | |
| if visualize_topk_patches: | |
| fig, axs = plt.subplots(1, len(topk_patches), figsize=(len(topk_patches) * 2, 2)) | |
| fig.suptitle(f'top-{len(topk_patches)} patches') | |
| for m, topk_patch in enumerate(topk_patches): | |
| axs[m].imshow(pixel_value[topk_patch].to(torch.float32).cpu().numpy().transpose(1, 2, 0)) | |
| axs[m].axis('off') | |
| plt.show() | |
| if 'clip' in self.encoder_type: | |
| selected_image_features.append(all_hidden_states[vision_feature_layer][topk_patches, 1:, :]) | |
| elif 'siglip' in self.encoder_type: | |
| selected_image_features.append(all_hidden_states[vision_feature_layer][topk_patches, :, :]) | |
| selected_coords.append([coords[p][q-1] for q in topk_patches[1:]]) | |
| # if isinstance(pixel_values, list): | |
| # pixel_values = torch.cat([x for x in pixel_values if x is not None], dim=0) | |
| if print_keyword: | |
| print(print_keyword_text) | |
| multimodal_projector_features = [] | |
| for x, (selected_image_feature, selected_coord) in enumerate(zip(selected_image_features, selected_coords)): | |
| print(f'image {x+1}: {selected_coord}') | |
| if '2d' in positional_information: | |
| max_width = max(selected_coord, key= lambda x: x[0])[0] + 1 | |
| max_height = max(selected_coord, key= lambda x: x[1])[1] + 1 | |
| positional_encoding = self.positional_encoding(torch.ones((max_width, max_height, selected_image_feature.shape[1], self.positional_encoding.org_channels), dtype=self.dtype, device=self.device)) | |
| accumulate = [] | |
| for i, top_patch in enumerate(selected_image_feature): | |
| if positional_information == '2d_before' and i != 0: | |
| top_patch += positional_encoding[selected_coord[i-1][0], selected_coord[i-1][1], :, :] | |
| aligned_image_feature = self.multi_modal_projector(top_patch) | |
| if positional_information == '2d_after' and i != 0: | |
| aligned_image_feature += positional_encoding[selected_coord[i-1][0], selected_coord[i-1][1], :, :] | |
| accumulate.append(aligned_image_feature) | |
| if i == 0: | |
| accumulate.append(self.get_input_embeddings()(self.tokenizer(', ', padding=False, truncation=False, max_length=None, return_tensors='pt')['input_ids'].to(device=self.device)[0, 1:])) | |
| continue | |
| if positional_information == 'explicit': | |
| accumulate.append(self.get_input_embeddings()(self.tokenizer(f' at {str(selected_coord[i-1])}, ', padding=False, truncation=False, max_length=None, return_tensors='pt')['input_ids'].to(device=self.device)[0, 1:])) | |
| else: | |
| accumulate.append(self.get_input_embeddings()(self.tokenizer(f', ', padding=False, truncation=False, max_length=None, return_tensors='pt')['input_ids'].to(device=self.device)[0, 1:])) | |
| multimodal_projector_features.append(torch.cat(accumulate, dim=0)) # dimension of (num_selected_patch * num_tokens-1 + num_selected_patch * sep_len - 1) -> (num_selected_patch * num_tokens - 1) as sep_len = 1 | |
| assert len(selected_image_features) == len(multimodal_projector_features), f"The number of selected image features and image features do not match. Dimension of selected image features: {len(selected_image_features)} and dimension of image features: {len(multimodal_projector_features)}." | |
| # print(multimodal_projector_features[0].shape) | |
| inputs_embeds, attention_mask, labels, position_ids = self._merge_input_ids_with_image_features( | |
| multimodal_projector_features, inputs_embeds, input_ids, attention_mask, labels | |
| ) | |
| if labels is None: | |
| labels = torch.full_like(attention_mask, self.config.ignore_index).to(torch.long) | |
| else: | |
| # In case input_ids.shape[1] == 1 & pixel_values==None & past_key_values != None, we are in the case of | |
| # generation with cache | |
| if past_key_values is not None and pixel_values is not None and input_ids.shape[1] == 1: | |
| # Retrieve the first layer to inspect the logits and mask out the hidden states | |
| # that are set to 0 | |
| first_layer_past_key_value = past_key_values[0][0][:, :, :, 0] | |
| # Sum all dimensions of head_dim (-2) to avoid random errors such as: https://github.com/huggingface/transformers/pull/28032#issuecomment-1863691941 | |
| batch_index, non_attended_tokens = torch.where(first_layer_past_key_value.float().sum(-2) == 0) | |
| # Get the target length | |
| target_seqlen = first_layer_past_key_value.shape[-1] + 1 | |
| extended_attention_mask = torch.ones( | |
| (attention_mask.shape[0], target_seqlen - attention_mask.shape[1]), | |
| dtype=attention_mask.dtype, | |
| device=attention_mask.device, | |
| ) | |
| # Filter out only the tokens that can be un-attended, this can happen | |
| # if one uses Llava + Fused modules where the cache on the | |
| # first iteration is already big enough, or if one passes custom cache | |
| valid_indices = non_attended_tokens < extended_attention_mask.size(-1) | |
| new_batch_index = batch_index[valid_indices] | |
| new_non_attended_tokens = non_attended_tokens[valid_indices] | |
| # Zero-out the places where we don't need to attend | |
| extended_attention_mask[new_batch_index, new_non_attended_tokens] = 0 | |
| attention_mask = torch.cat((attention_mask, extended_attention_mask), dim=1) | |
| position_ids = torch.sum(attention_mask, dim=1).unsqueeze(-1) - 1 | |
| outputs = self.language_model( | |
| attention_mask=attention_mask, | |
| position_ids=position_ids, | |
| past_key_values=past_key_values, | |
| inputs_embeds=inputs_embeds, | |
| use_cache=use_cache, | |
| output_attentions=output_attentions, | |
| output_hidden_states=output_hidden_states, | |
| return_dict=return_dict, | |
| ) | |
| logits = outputs[0] | |
| batch_shift = 100 | |
| loss = None | |
| if labels is not None: | |
| # Shift so that tokens < n predict n | |
| if attention_mask is not None: | |
| shift_attention_mask = attention_mask[..., 1:] | |
| logits_shape = logits.shape | |
| labels_shape = labels.shape | |
| shift_attention_mask_shape = shift_attention_mask.shape | |
| for i in range(0, shift_attention_mask.shape[1], batch_shift): | |
| shift_logits = logits[..., i:min(i+batch_shift, logits_shape[1]-1), :][shift_attention_mask[..., i:min(i+batch_shift, shift_attention_mask_shape[1])].to(logits.device) != 0].contiguous() | |
| shift_labels = labels[..., i+1:min(i+batch_shift+1, labels_shape[1])][shift_attention_mask[..., i:min(i+batch_shift, shift_attention_mask_shape[1])].to(labels.device) != 0].contiguous() | |
| else: | |
| shift_logits = logits[..., :-1, :].contiguous() | |
| shift_labels = labels[..., 1:].contiguous() | |
| # Flatten the tokens | |
| loss_fct = nn.CrossEntropyLoss() | |
| loss = loss_fct( | |
| shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1).to(shift_logits.device) | |
| ) | |
| if not return_dict: | |
| output = (logits,) + outputs[1:] | |
| return (loss,) + output if loss is not None else output | |
| return GeckoCausalLMOutputWithPast( | |
| loss=loss, | |
| logits=logits, | |
| past_key_values=outputs.past_key_values, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| def prepare_inputs_for_generation( | |
| self, input_ids, past_key_values=None, inputs_embeds=None, pixel_values=None, attention_mask=None, keyword_prompt_input_ids=None, coords=None, **kwargs | |
| ): | |
| if past_key_values is not None: | |
| if isinstance(past_key_values, Cache): | |
| cache_length = past_key_values.get_seq_length() | |
| past_length = past_key_values.seen_tokens | |
| else: | |
| cache_length = past_length = past_key_values[0][0].shape[2] | |
| # Keep only the unprocessed tokens: | |
| # 1 - If the length of the attention_mask exceeds the length of input_ids, then we are in a setting where | |
| # some of the inputs are exclusively passed as part of the cache (e.g. when passing input_embeds as | |
| # input) | |
| if attention_mask is not None and attention_mask.shape[1] > input_ids.shape[1]: | |
| input_ids = input_ids[:, -(attention_mask.shape[1] - past_length) :] | |
| # 2 - If the past_length is smaller than input_ids', then input_ids holds all input tokens. We can discard | |
| # input_ids based on the past_length. | |
| elif past_length < input_ids.shape[1]: | |
| input_ids = input_ids[:, past_length:] | |
| # 3 - Otherwise (past_length >= input_ids.shape[1]), let's assume input_ids only has unprocessed tokens. | |
| elif self.config.image_token_index in input_ids: | |
| input_ids = input_ids[:, input_ids.shape[1] - 1 :] | |
| # If the cache has seen more tokens than it can hold, then the cache has a size limit. Let's discard the | |
| # older attention values, as their corresponding values are not part of the input. | |
| if cache_length < past_length and attention_mask is not None: | |
| attention_mask = attention_mask[:, -(cache_length + input_ids.shape[1]) :] | |
| position_ids = kwargs.get("position_ids", None) | |
| if attention_mask is not None and position_ids is None: | |
| # create position_ids on the fly for batch generation | |
| position_ids = attention_mask.long().cumsum(-1) - 1 | |
| position_ids.masked_fill_(attention_mask == 0, 1) | |
| if past_key_values: | |
| position_ids = position_ids[:, -input_ids.shape[1] :] | |
| # if `inputs_embeds` are passed, we only want to use them in the 1st generation step | |
| if inputs_embeds is not None and past_key_values is None: | |
| model_inputs = {"inputs_embeds": inputs_embeds} | |
| else: | |
| model_inputs = {"input_ids": input_ids} | |
| model_inputs.update( | |
| { | |
| "position_ids": position_ids, | |
| "past_key_values": past_key_values, | |
| "use_cache": kwargs.get("use_cache"), | |
| "attention_mask": attention_mask, | |
| "pixel_values": pixel_values, | |
| "keyword_prompt_input_ids": keyword_prompt_input_ids, | |
| "coords": coords, | |
| "topk": kwargs.get("topk"), | |
| "vision_feature_select_strategy": kwargs.get("vision_feature_select_strategy"), | |
| "vision_feature_layer": kwargs.get("vision_feature_layer"), | |
| "patch_picking_strategy": kwargs.get("patch_picking_strategy"), | |
| "keyword_criteria": kwargs.get("keyword_criteria"), | |
| "positional_information": kwargs.get("positional_information"), | |
| "visualize_patches": kwargs.get("visualize_patches"), | |
| "visualize_topk_patches": kwargs.get("visualize_topk_patches"), | |
| "print_keyword": kwargs.get("print_keyword"), | |
| "print_topk_patches": kwargs.get("print_topk_patches"), | |
| } | |
| ) | |
| return model_inputs | |
| def _reorder_cache(self, *args, **kwargs): | |
| return self.language_model._reorder_cache(*args, **kwargs) | |