from collections.abc import Callable

import torch
from torch import nn

from transformers.utils import add_start_docstrings

from ...activations import ACT2FN
from ...modeling_layers import GradientCheckpointingLayer
from ...modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling
from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
from ...processing_utils import Unpack
from ...utils import TransformersKwargs, auto_docstring, torch_int
from ...utils.generic import check_model_inputs
from .configuration_multimodal2 import Multimodal2Config, Multimodal2TextConfig, Multimodal2VisionConfig


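# A minimal sketch of what the eager path below computes (assuming the usual
# (batch, num_heads, seq_len, head_dim) layout for query/key/value):
#   attn_weights = softmax(Q @ K^T * scaling + attention_mask)   # (batch, num_heads, seq_len, seq_len)
#   attn_output  = attn_weights @ V                              # (batch, num_heads, seq_len, head_dim)
# The output is then transposed back to (batch, seq_len, num_heads, head_dim) so the caller
# can flatten the head dimension. Non-eager backends (e.g. sdpa or flash attention) are
# dispatched through ALL_ATTENTION_FUNCTIONS instead of this function.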
def eager_attention_forward(
    module: nn.Module,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: torch.Tensor | None,
    scaling: float,
    dropout: float = 0.0,
    **kwargs: Unpack[TransformersKwargs],
):
    attn_weights = torch.matmul(query, key.transpose(-1, -2)) * scaling
    if attention_mask is not None:
        attn_weights = attn_weights + attention_mask
    attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query.dtype)
    attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training)

    attn_output = torch.matmul(attn_weights, value)
    attn_output = attn_output.transpose(1, 2).contiguous()
    return attn_output, attn_weights


class Multimodal2VisionAttention(nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper"""

    def __init__(self, config: Multimodal2VisionConfig | Multimodal2TextConfig):
        super().__init__()
        self.config = config
        self.embed_dim = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.embed_dim // self.num_heads
        if self.head_dim * self.num_heads != self.embed_dim:
            raise ValueError(
                f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:"
                f" {self.num_heads})."
            )
        self.scale = self.head_dim**-0.5
        self.dropout = config.attention_dropout
        self.is_causal = False

        self.k_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.v_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.q_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.out_proj = nn.Linear(self.embed_dim, self.embed_dim)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> tuple[torch.Tensor, torch.Tensor | None]:
        """Input shape: Batch x Time x Channel"""

        batch_size, seq_length, embed_dim = hidden_states.shape

        queries = self.q_proj(hidden_states)
        keys = self.k_proj(hidden_states)
        values = self.v_proj(hidden_states)

        # split the hidden dimension into (num_heads, head_dim) and move heads to dim 1
        queries = queries.view(batch_size, seq_length, -1, self.head_dim).transpose(1, 2)
        keys = keys.view(batch_size, seq_length, -1, self.head_dim).transpose(1, 2)
        values = values.view(batch_size, seq_length, -1, self.head_dim).transpose(1, 2)

        # pick the configured attention backend, falling back to the eager implementation
        attention_interface: Callable = eager_attention_forward
        if self.config._attn_implementation != "eager":
            attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]

        attn_output, attn_weights = attention_interface(
            self,
            queries,
            keys,
            values,
            attention_mask,
            scaling=self.scale,
            dropout=0.0 if not self.training else self.dropout,
            **kwargs,
        )

        # merge the heads back into a single hidden dimension and project out
        attn_output = attn_output.reshape(batch_size, seq_length, -1).contiguous()
        attn_output = self.out_proj(attn_output)

        return attn_output, attn_weights


class Multimodal2VisionMLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.activation_fn = ACT2FN[config.hidden_act]
        self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        hidden_states = self.fc1(hidden_states)
        hidden_states = self.activation_fn(hidden_states)
        hidden_states = self.fc2(hidden_states)
        return hidden_states


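# Each encoder layer below is a pre-norm Transformer block: LayerNorm -> self-attention -> residual add,
# followed by LayerNorm -> MLP -> residual add. As an illustrative note (not tied to any checkpoint),
# hidden_states keeps the shape (batch, seq_len, hidden_size) throughout the block.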
class Multimodal2VisionEncoderLayer(GradientCheckpointingLayer):
    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.hidden_size
        self.self_attn = Multimodal2VisionAttention(config)
        self.layer_norm1 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
        self.mlp = Multimodal2VisionMLP(config)
        self.layer_norm2 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        **kwargs: Unpack[TransformersKwargs],
    ) -> torch.FloatTensor:
        residual = hidden_states

        hidden_states = self.layer_norm1(hidden_states)
        hidden_states, _ = self.self_attn(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
            **kwargs,
        )
        hidden_states = residual + hidden_states

        residual = hidden_states
        hidden_states = self.layer_norm2(hidden_states)
        hidden_states = self.mlp(hidden_states)
        hidden_states = residual + hidden_states

        return hidden_states


class Multimodal2VisionEncoder(nn.Module):
    """
    Transformer encoder consisting of `config.num_hidden_layers` self-attention layers. Each layer is a
    [`Multimodal2VisionEncoderLayer`].

    Args:
        config: Multimodal2VisionConfig
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.layers = nn.ModuleList([Multimodal2VisionEncoderLayer(config) for _ in range(config.num_hidden_layers)])
        self.gradient_checkpointing = False

    def forward(
        self,
        inputs_embeds,
        attention_mask: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> BaseModelOutput:
        r"""
        Args:
            inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
                Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation.
                This is useful if you want more control over how to convert `input_ids` indices into associated vectors
                than the model's internal embedding lookup matrix.
            attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:

                - 1 for tokens that are **not masked**,
                - 0 for tokens that are **masked**.

                [What are attention masks?](../glossary#attention-mask)
        """
        hidden_states = inputs_embeds
        for encoder_layer in self.layers:
            hidden_states = encoder_layer(
                hidden_states,
                attention_mask,
                **kwargs,
            )

        return BaseModelOutput(
            last_hidden_state=hidden_states,
        )


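# The patch embedding below is a Conv2d with kernel_size == stride == patch_size, so each
# non-overlapping patch maps to one embedding vector. As a worked example (illustrative config
# values, not this model's defaults): image_size=224 and patch_size=32 give (224 // 32) ** 2 = 49
# patches, plus one learned class token, for 50 position embeddings. For larger inputs,
# `interpolate_pos_encoding` bicubically resamples the 7x7 position grid (e.g. to 14x14 for a
# 448x448 image) so the pre-trained position embeddings can be reused at higher resolution.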
class Multimodal2VisionEmbeddings(nn.Module):
    def __init__(self, config: Multimodal2VisionConfig):
        super().__init__()
        self.config = config
        self.embed_dim = config.hidden_size
        self.image_size = config.image_size
        self.patch_size = config.patch_size

        self.class_embedding = nn.Parameter(torch.randn(self.embed_dim))

        self.patch_embedding = nn.Conv2d(
            in_channels=config.num_channels,
            out_channels=self.embed_dim,
            kernel_size=self.patch_size,
            stride=self.patch_size,
            bias=False,
        )

        self.num_patches = (self.image_size // self.patch_size) ** 2
        self.num_positions = self.num_patches + 1
        self.position_embedding = nn.Embedding(self.num_positions, self.embed_dim)
        self.register_buffer("position_ids", torch.arange(self.num_positions).expand((1, -1)), persistent=False)

    def interpolate_pos_encoding(self, embeddings: torch.Tensor, height: int, width: int) -> torch.Tensor:
        """
        Interpolates the pre-trained position encodings so the model can be used on higher-resolution images.
        This method also supports torch.jit tracing.

        Adapted from:
        - https://github.com/facebookresearch/dino/blob/de9ee3df6cf39fac952ab558447af1fa1365362a/vision_transformer.py#L174-L194, and
        - https://github.com/facebookresearch/dinov2/blob/e1277af2ba9496fbadf7aec6eba56e8d882d1e35/dinov2/models/vision_transformer.py#L179-L211
        """

        num_patches = embeddings.shape[1] - 1
        position_embedding = self.position_embedding.weight.unsqueeze(0)
        num_positions = position_embedding.shape[1] - 1

        # always interpolate when tracing so the exported model works for dynamic input shapes
        if not torch.jit.is_tracing() and num_patches == num_positions and height == width:
            return self.position_embedding(self.position_ids)

        class_pos_embed = position_embedding[:, :1]
        patch_pos_embed = position_embedding[:, 1:]

        dim = embeddings.shape[-1]

        new_height = height // self.patch_size
        new_width = width // self.patch_size

        sqrt_num_positions = torch_int(num_positions**0.5)
        patch_pos_embed = patch_pos_embed.reshape(1, sqrt_num_positions, sqrt_num_positions, dim)
        patch_pos_embed = patch_pos_embed.permute(0, 3, 1, 2)

        patch_pos_embed = nn.functional.interpolate(
            patch_pos_embed,
            size=(new_height, new_width),
            mode="bicubic",
            align_corners=False,
        )

        patch_pos_embed = patch_pos_embed.permute(0, 2, 3, 1).view(1, -1, dim)

        return torch.cat((class_pos_embed, patch_pos_embed), dim=1)

    def forward(self, pixel_values: torch.FloatTensor, interpolate_pos_encoding=False) -> torch.Tensor:
        batch_size, _, height, width = pixel_values.shape
        if not interpolate_pos_encoding and (height != self.image_size or width != self.image_size):
            raise ValueError(
                f"Input image size ({height}*{width}) doesn't match model ({self.image_size}*{self.image_size})."
            )
        target_dtype = self.patch_embedding.weight.dtype
        patch_embeds = self.patch_embedding(pixel_values.to(dtype=target_dtype))  # (batch, embed_dim, grid_h, grid_w)
        patch_embeds = patch_embeds.flatten(2).transpose(1, 2)  # (batch, num_patches, embed_dim)

        class_embeds = self.class_embedding.expand(batch_size, 1, -1)
        embeddings = torch.cat([class_embeds, patch_embeds], dim=1)
        if interpolate_pos_encoding:
            embeddings = embeddings + self.interpolate_pos_encoding(embeddings, height, width)
        else:
            embeddings = embeddings + self.position_embedding(self.position_ids)
        return embeddings


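# Full vision tower: pixel values -> patch + position embeddings -> pre-LayerNorm -> stacked
# encoder layers -> the first (class token) hidden state is taken as the pooled output and passed
# through a final LayerNorm. This mirrors the usual CLIP-style vision transformer layout.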
class Multimodal2VisionTransformer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        embed_dim = config.hidden_size

        self.embeddings = Multimodal2VisionEmbeddings(config)
        # note: the "pre_layrnorm" spelling is kept as-is; renaming the attribute would change state dict keys
        self.pre_layrnorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
        self.encoder = Multimodal2VisionEncoder(config)
        self.post_layernorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)

    @auto_docstring
    def forward(
        self,
        pixel_values: torch.FloatTensor | None = None,
        interpolate_pos_encoding: bool | None = False,
        **kwargs: Unpack[TransformersKwargs],
    ) -> BaseModelOutputWithPooling:
        if pixel_values is None:
            raise ValueError("You have to specify pixel_values")

        hidden_states = self.embeddings(pixel_values, interpolate_pos_encoding=interpolate_pos_encoding)
        hidden_states = self.pre_layrnorm(hidden_states)

        encoder_outputs: BaseModelOutput = self.encoder(
            inputs_embeds=hidden_states,
            **kwargs,
        )

        last_hidden_state = encoder_outputs.last_hidden_state
        # pool by taking the hidden state of the first (class) token
        pooled_output = last_hidden_state[:, 0, :]
        pooled_output = self.post_layernorm(pooled_output)

        return BaseModelOutputWithPooling(
            last_hidden_state=last_hidden_state,
            pooler_output=pooled_output,
        )


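# The flags below advertise which attention backends the model supports; `_can_record_outputs`
# presumably tells the `check_model_inputs` decorator which sub-modules to hook so that per-layer
# hidden states and attention weights can be collected when the caller requests them.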
@auto_docstring
class Multimodal2VisionPreTrainedModel(PreTrainedModel):
    config: Multimodal2Config
    base_model_prefix = "multimodal2_vision"
    input_modalities = ("image", "text")
    supports_gradient_checkpointing = True
    _supports_sdpa = True
    _supports_flash_attn = True
    _supports_flex_attn = True
    _supports_attention_backend = True
    _can_record_outputs = {
        "hidden_states": Multimodal2VisionEncoderLayer,
        "attentions": Multimodal2VisionAttention,
    }

    @torch.no_grad()
    def _init_weights(self, module):
        """Initialize the weights"""
        if isinstance(module, Multimodal2VisionMLP):
            pass


MULTIMODAL2_VISION_START_DOCSTRING = "doc"


@add_start_docstrings("New doc", MULTIMODAL2_VISION_START_DOCSTRING)
class Multimodal2VisionModel(Multimodal2VisionPreTrainedModel):
    config: Multimodal2VisionConfig
    main_input_name = "pixel_values"
    input_modalities = ("image",)
    _no_split_modules = ["Multimodal2VisionEncoderLayer"]

    def __init__(self, config: Multimodal2VisionConfig):
        super().__init__(config)
        self.vision_model = Multimodal2VisionTransformer(config)

        self.post_init()

    def get_input_embeddings(self) -> nn.Module:
        return self.vision_model.embeddings.patch_embedding

    @check_model_inputs(tie_last_hidden_states=False)
    @auto_docstring
    def forward(
        self,
        pixel_values: torch.FloatTensor | None = None,
        interpolate_pos_encoding: bool = False,
        **kwargs: Unpack[TransformersKwargs],
    ) -> BaseModelOutputWithPooling:
        r"""
        Example:

        ```python
        >>> from PIL import Image
        >>> import requests
        >>> from transformers import AutoProcessor, Multimodal2VisionModel

        >>> model = Multimodal2VisionModel.from_pretrained("openai/multimodal2-vit-base-patch32")
        >>> processor = AutoProcessor.from_pretrained("openai/multimodal2-vit-base-patch32")

        >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg"
        >>> image = Image.open(requests.get(url, stream=True).raw)

        >>> inputs = processor(images=image, return_tensors="pt")

        >>> outputs = model(**inputs)
        >>> last_hidden_state = outputs.last_hidden_state
        >>> pooled_output = outputs.pooler_output  # pooled CLS states
        ```"""

        return self.vision_model(
            pixel_values=pixel_values,
            interpolate_pos_encoding=interpolate_pos_encoding,
            **kwargs,
        )