# Adopted from https://github.com/DAMO-NLP-SG/VideoLLaMA3/blob/main/videollama3/model/videollama3_encoder/modeling_videollama3_encoder.py # Adopted from https://github.com/huggingface/transformers/blob/main/src/transformers/models/qwen2_vl/modeling_qwen2_vl.py. # Below is the original copyright: # Copyright 2024 The Qwen team, Alibaba Group and the HuggingFace Inc. team. All rights reserved. # # This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX # and OPT implementations in this library. It has been modified from its # original forms to accommodate minor architectural differences compared # to GPT-NeoX and OPT used by the Meta AI team that trained the model. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """PyTorch ViSCoP vision encoder model. This file contains the implementation of visual probes and interaction modules""" import importlib.util import os.path as osp import math import warnings from einops import rearrange import torch import torch.nn as nn import torch.nn.functional as F import torch.utils.checkpoint from torch.nn.init import _calculate_fan_in_and_fan_out from transformers.activations import ACT2FN from transformers.modeling_utils import PreTrainedModel from transformers.utils import is_flash_attn_2_available if is_flash_attn_2_available(): from flash_attn import flash_attn_varlen_func else: flash_attn_varlen_func = None from .configuration_viscop_vision_encoder import ViSCoP_VisionEncoderConfig # try: # from .configuration_viscop_encoder import ViSCoP_VisionEncoderConfig # except ImportError: # spec = importlib.util.spec_from_file_location( # "configuration_videollama3_encoder", # osp.join(osp.dirname(__file__), "configuration_videollama3_encoder.py"), # ) # configuration_videollama3_encoder = importlib.util.module_from_spec(spec) # spec.loader.exec_module(configuration_videollama3_encoder) # Videollama3VisionEncoderConfig = getattr( # configuration_videollama3_encoder, # "Videollama3VisionEncoderConfig", # ) def _trunc_normal_(tensor, mean, std, a, b): # Cut & paste from PyTorch official master until it's in a few official releases - RW # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf def norm_cdf(x): # Computes standard normal cumulative distribution function return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0 if (mean < a - 2 * std) or (mean > b + 2 * std): warnings.warn( "mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " "The distribution of values may be incorrect.", stacklevel=2, ) # Values are generated by using a truncated uniform distribution and # then using the inverse CDF for the normal distribution. # Get upper and lower cdf values l = norm_cdf((a - mean) / std) u = norm_cdf((b - mean) / std) # Uniformly fill tensor with values from [l, u], then translate to # [2l-1, 2u-1]. tensor.uniform_(2 * l - 1, 2 * u - 1) # Use inverse cdf transform for normal distribution to get truncated # standard normal tensor.erfinv_() # Transform to proper mean, std tensor.mul_(std * math.sqrt(2.0)) tensor.add_(mean) # Clamp to ensure it's in the proper range tensor.clamp_(min=a, max=b) def trunc_normal_tf_( tensor: torch.Tensor, mean: float = 0.0, std: float = 1.0, a: float = -2.0, b: float = 2.0 ) -> torch.Tensor: """Fills the input Tensor with values drawn from a truncated normal distribution. The values are effectively drawn from the normal distribution :math:`\\mathcal{N}(\text{mean}, \text{std}^2)` with values outside :math:`[a, b]` redrawn until they are within the bounds. The method used for generating the random values works best when :math:`a \\leq \text{mean} \\leq b`. NOTE: this 'tf' variant behaves closer to Tensorflow / JAX impl where the bounds [a, b] are applied when sampling the normal distribution with mean=0, std=1.0 and the result is subsequently scaled and shifted by the mean and std args. Args: tensor: an n-dimensional `torch.Tensor` mean: the mean of the normal distribution std: the standard deviation of the normal distribution a: the minimum cutoff value b: the maximum cutoff value """ with torch.no_grad(): _trunc_normal_(tensor, 0, 1.0, a, b) tensor.mul_(std).add_(mean) def variance_scaling_(tensor, scale=1.0, mode="fan_in", distribution="normal"): fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor) if mode == "fan_in": denom = fan_in elif mode == "fan_out": denom = fan_out elif mode == "fan_avg": denom = (fan_in + fan_out) / 2 variance = scale / denom if distribution == "truncated_normal": # constant is stddev of standard normal truncated to (-2, 2) trunc_normal_tf_(tensor, std=math.sqrt(variance) / 0.87962566103423978) elif distribution == "normal": with torch.no_grad(): tensor.normal_(std=math.sqrt(variance)) elif distribution == "uniform": bound = math.sqrt(3 * variance) with torch.no_grad(): tensor.uniform_(-bound, bound) else: raise ValueError(f"invalid distribution {distribution}") def lecun_normal_(tensor): variance_scaling_(tensor, mode="fan_in", distribution="truncated_normal") def default_flax_embed_init(tensor): variance_scaling_(tensor, mode="fan_in", distribution="normal") # Copied from transformers.models.llama.modeling_llama.rotate_half def rotate_half(x): """Rotates half the hidden dims of the input.""" x1 = x[..., : x.shape[-1] // 2] x2 = x[..., x.shape[-1] // 2 :] return torch.cat((-x2, x1), dim=-1) def apply_rotary_pos_emb_vision(tensor: torch.Tensor, freqs: torch.Tensor) -> torch.Tensor: orig_dtype = tensor.dtype tensor = tensor.float() cos = freqs.cos() sin = freqs.sin() cos = cos.unsqueeze(1).repeat(1, 1, 2).unsqueeze(0).float() sin = sin.unsqueeze(1).repeat(1, 1, 2).unsqueeze(0).float() output = (tensor * cos) + (rotate_half(tensor) * sin) output = output.to(orig_dtype) return output class VisionRotaryEmbedding(nn.Module): def __init__(self, dim: int, theta: float = 10000.0) -> None: super().__init__() inv_freq = 1.0 / (theta ** (torch.arange(0, dim, 2, dtype=torch.float) / dim)) self.register_buffer("inv_freq", inv_freq, persistent=False) def forward(self, seqlen: int) -> torch.Tensor: seq = torch.arange(seqlen, device=self.inv_freq.device, dtype=self.inv_freq.dtype) freqs = torch.outer(seq, self.inv_freq) return freqs class VisionEmbeddings(nn.Module): def __init__(self, config: ViSCoP_VisionEncoderConfig): super().__init__() self.config = config self.embed_dim = config.hidden_size self.patch_size = config.patch_size self.patch_embedding = nn.Conv2d( in_channels=config.num_channels, out_channels=self.embed_dim, kernel_size=self.patch_size, stride=self.patch_size, padding="valid", ) def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: hidden_states = hidden_states.view( -1, self.config.num_channels, self.patch_size, self.patch_size ) patch_embeds = self.patch_embedding(hidden_states) # shape = [*, width, grid, grid] # embeddings = patch_embeds.flatten(2).transpose(1, 2) embeddings = patch_embeds.view(-1, self.embed_dim) return embeddings class VisionAttention(nn.Module): """Multi-headed attention from 'Attention Is All You Need' paper""" # Copied from transformers.models.clip.modeling_clip.CLIPAttention.__init__ def __init__(self, config): super().__init__() self.config = config self.embed_dim = config.hidden_size self.num_heads = config.num_attention_heads self.head_dim = self.embed_dim // self.num_heads if self.head_dim * self.num_heads != self.embed_dim: raise ValueError( f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" f" {self.num_heads})." ) self.scale = self.head_dim**-0.5 self.dropout = config.attention_dropout self.k_proj = nn.Linear(self.embed_dim, self.embed_dim) self.v_proj = nn.Linear(self.embed_dim, self.embed_dim) self.q_proj = nn.Linear(self.embed_dim, self.embed_dim) self.out_proj = nn.Linear(self.embed_dim, self.embed_dim) def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: torch.Tensor = None, ) -> torch.Tensor: """Input shape: Time x Channel""" q_len, _ = hidden_states.size() query_states = self.q_proj(hidden_states) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) query_states = query_states.view(q_len, self.num_heads, self.head_dim) key_states = key_states.view(q_len, self.num_heads, self.head_dim) value_states = value_states.view(q_len, self.num_heads, self.head_dim) query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0) key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0) attention_mask = torch.zeros([1, q_len, q_len], device=query_states.device, dtype=torch.bool) for i in range(1, len(cu_seqlens)): attention_mask[..., cu_seqlens[i - 1] : cu_seqlens[i], cu_seqlens[i - 1] : cu_seqlens[i]] = True query_states = query_states.transpose(0, 1) key_states = key_states.transpose(0, 1) value_states = value_states.transpose(0, 1) attn_weights = torch.matmul(query_states, key_states.transpose(1, 2)) / math.sqrt(self.head_dim) attn_weights = attn_weights + attention_mask # upcast attention to fp32 attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype) attn_weights = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training) attn_output = torch.matmul(attn_weights, value_states) attn_output = attn_output.transpose(0, 1) attn_output = attn_output.reshape(q_len, -1) attn_output = self.out_proj(attn_output) return attn_output class VisionFlashAttention2(VisionAttention): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Adapted from transformers.models.llama.modeling_llama.LlamaFlashAttention2.forward def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: torch.Tensor = None, ) -> torch.Tensor: q_len, _ = hidden_states.size() query_states = self.q_proj(hidden_states) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) # Flash attention requires the input to have the shape # batch_size x seq_length x head_dim x hidden_dim # therefore we just need to keep the original shape query_states = query_states.view(q_len, self.num_heads, self.head_dim) key_states = key_states.view(q_len, self.num_heads, self.head_dim) value_states = value_states.view(q_len, self.num_heads, self.head_dim) query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0) key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0) max_seqlen = (cu_seqlens[1:] - cu_seqlens[:-1]).max().item() attn_output = flash_attn_varlen_func(query_states, key_states, value_states, cu_seqlens, cu_seqlens, max_seqlen, max_seqlen).reshape( q_len, -1 ) attn_output = self.out_proj(attn_output) return attn_output class InteractionModule_CrossAttention_FA2(VisionAttention): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Adapted from transformers.models.llama.modeling_llama.LlamaFlashAttention2.forward def forward( self, visual_probes: torch.Tensor, hidden_states: torch.Tensor, cu_seqlensq: torch.Tensor, cu_seqlenskv: torch.Tensor, rotary_pos_emb: torch.Tensor = None, ) -> torch.Tensor: q_len, _ = visual_probes.size() kv_len, _ = hidden_states.size() query_states = self.q_proj(visual_probes) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) # Flash attention requires the input to have the shape # batch_size x seq_length x head_dim x hidden_dim # therefore we just need to keep the original shape query_states = query_states.view(q_len, self.num_heads, self.head_dim) key_states = key_states.view(kv_len, self.num_heads, self.head_dim) value_states = value_states.view(kv_len, self.num_heads, self.head_dim) # query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0) # key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0) max_seqlen_q = (cu_seqlensq[1:] - cu_seqlensq[:-1]).max().item() max_seqlen_kv = (cu_seqlenskv[1:] - cu_seqlenskv[:-1]).max().item() attn_output = flash_attn_varlen_func(query_states, key_states, value_states, cu_seqlensq, cu_seqlenskv, max_seqlen_q, max_seqlen_kv).reshape( q_len, -1 ) attn_output = self.out_proj(attn_output) return attn_output class VisionSdpaAttention(VisionAttention): def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: torch.Tensor = None, ) -> torch.Tensor: seq_length = hidden_states.shape[0] query_states = self.q_proj(hidden_states) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) query_states = query_states.view(seq_length, self.num_heads, self.head_dim) key_states = key_states.view(seq_length, self.num_heads, self.head_dim) value_states = value_states.view(seq_length, self.num_heads, self.head_dim) query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0) key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0) attention_mask = torch.zeros([1, seq_length, seq_length], device=query_states.device, dtype=torch.bool) for i in range(1, len(cu_seqlens)): attention_mask[..., cu_seqlens[i - 1] : cu_seqlens[i], cu_seqlens[i - 1] : cu_seqlens[i]] = True query_states = query_states.transpose(0, 1) key_states = key_states.transpose(0, 1) value_states = value_states.transpose(0, 1) attn_output = F.scaled_dot_product_attention(query_states, key_states, value_states, attention_mask, dropout_p=0.0) attn_output = attn_output.transpose(0, 1) attn_output = attn_output.reshape(seq_length, -1) attn_output = self.out_proj(attn_output) return attn_output VISION_ATTENTION_CLASSES = { "eager": VisionAttention, "flash_attention_2": VisionFlashAttention2, "sdpa": VisionSdpaAttention, } class SigLIPVisionMLP(nn.Module): def __init__(self, config): super().__init__() self.config = config self.activation_fn = ACT2FN[config.hidden_act] self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size) self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size) def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: hidden_states = self.fc1(hidden_states) hidden_states = self.activation_fn(hidden_states) hidden_states = self.fc2(hidden_states) return hidden_states class SigLIPVisionEncoderLayer(nn.Module): def __init__(self, config: ViSCoP_VisionEncoderConfig): super().__init__() self.embed_dim = config.hidden_size self.self_attn = VISION_ATTENTION_CLASSES[config._attn_implementation](config=config) self.layer_norm1 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps) self.mlp = SigLIPVisionMLP(config) self.layer_norm2 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps) # Ignore copy def forward(self, hidden_states, cu_seqlens, rotary_pos_emb) -> torch.Tensor: # hidden_states = hidden_states + self.self_attn( # self.layer_norm1(hidden_states), cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb # ) hidden_states = hidden_states + self.self_attn( self.layer_norm1(hidden_states), cu_seqlens, rotary_pos_emb ) hidden_states = hidden_states + self.mlp(self.layer_norm2(hidden_states)) return hidden_states # Alternative where each interaction module is a copy of the entire vision transformer layer. # Empirically we found that simple cross-attention works better. class InteractionModule_CrossAttentionEncoderLayer(nn.Module): def __init__(self, config): super().__init__() embed_dim = config.hidden_size self.cross_attn = InteractionModule_CrossAttention_FA2(config) # Separate norms for queries and visual features self.q_ln1 = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps) self.kv_ln1 = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps) self.q_ln2 = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps) self.mlp = SigLIPVisionMLP(config) def forward(self, visual_probes, hidden_states, query_seqlens, ca_kv_seqlens, rotary_pos_emb): visual_probes = visual_probes + self.cross_attn( self.q_ln1(visual_probes), self.kv_ln1(hidden_states), query_seqlens, ca_kv_seqlens, rotary_pos_emb ) visual_probes = visual_probes + self.mlp(self.q_ln2(visual_probes)) return visual_probes INTERACTION_MODULES = { 'cross_attention': InteractionModule_CrossAttention_FA2, 'cross_attention_transformer': InteractionModule_CrossAttentionEncoderLayer } class ViSCoP_VisionTransformerEncoder(nn.Module): def __init__(self, config: ViSCoP_VisionEncoderConfig): super().__init__() self.config = config head_dim = config.hidden_size // config.num_attention_heads self.rotary_pos_emb = VisionRotaryEmbedding(head_dim // 2) self.layers = nn.ModuleList([SigLIPVisionEncoderLayer(config) for _ in range(config.num_hidden_layers)]) self.gradient_checkpointing = False assert config.interaction_module in INTERACTION_MODULES, f'interaction module must be in: {INTERACTION_MODULES.keys()}' num_interaction_modules = config.num_hidden_layers if config.interaction_module_layers is None else len(config.interaction_module_layers) self.interaction_modules = nn.ModuleList([INTERACTION_MODULES[config.interaction_module](config) for _ in range(num_interaction_modules)]) # > Create interaction modules self.interaction_module_layers = [_ for _ in range(config.num_hidden_layers)] if config.interaction_module_layers is None else config.interaction_module_layers self.interaction_module_mapping = {k: v for v, k in enumerate(self.interaction_module_layers)} self.num_visual_probes = config.num_visual_probes self.visual_probes = nn.Parameter(torch.zeros(1, self.num_visual_probes, self.config.hidden_size)) # > Create the visual probes def rot_pos_emb(self, grid_sizes, merge_sizes): pos_ids = [] for (t, h, w), merge_size in zip(grid_sizes, merge_sizes): hpos_ids = torch.arange(h).unsqueeze(1).expand(-1, w) hpos_ids = hpos_ids.reshape( h // merge_size, merge_size, w // merge_size, merge_size, ) hpos_ids = hpos_ids.permute(0, 2, 1, 3) hpos_ids = hpos_ids.flatten() wpos_ids = torch.arange(w).unsqueeze(0).expand(h, -1) wpos_ids = wpos_ids.reshape( h // merge_size, merge_size, w // merge_size, merge_size, ) wpos_ids = wpos_ids.permute(0, 2, 1, 3) wpos_ids = wpos_ids.flatten() pos_ids.append(torch.stack([hpos_ids, wpos_ids], dim=-1).repeat(t, 1)) pos_ids = torch.cat(pos_ids, dim=0) max_grid_size = grid_sizes[:, 1:].max() rotary_pos_emb_full = self.rotary_pos_emb(max_grid_size) rotary_pos_emb = rotary_pos_emb_full[pos_ids].flatten(1) return rotary_pos_emb def forward(self, hidden_states, grid_sizes, merge_sizes) -> torch.Tensor: rotary_pos_emb = self.rot_pos_emb(grid_sizes, merge_sizes) # mask for self-attention (per-grid attention) cu_seqlens = torch.repeat_interleave(grid_sizes[:, 1] * grid_sizes[:, 2], grid_sizes[:, 0]).cumsum(dim=0, dtype=torch.int32) cu_seqlens = F.pad(cu_seqlens, (1, 0), value=0) # > repeat probes batch_size = grid_sizes.shape[0] visual_probes = self.visual_probes.repeat(batch_size, 1, 1) visual_probes = visual_probes.view(batch_size*self.num_visual_probes, self.config.hidden_size) # > mask for interaction module (cross-attend probes to all grids) visual_probe_seqlens = torch.tensor([self.num_visual_probes]*batch_size).cumsum(dim=0, dtype=torch.int32) visual_probe_seqlens = F.pad(visual_probe_seqlens, (1, 0), value=0).to(cu_seqlens.device) ca_kv_seqlens = (grid_sizes[:, 0] * (grid_sizes[:, 1] * grid_sizes[:, 2])).cumsum(dim=0, dtype=torch.int32) ca_kv_seqlens = F.pad(ca_kv_seqlens, (1, 0), value=0) for i, blk in enumerate(self.layers): if self.gradient_checkpointing and self.training: hidden_states = self._gradient_checkpointing_func( blk.__call__, hidden_states, cu_seqlens, rotary_pos_emb ) if i in self.interaction_module_layers: visual_probes = self._gradient_checkpointing_func( self.interaction_modules[self.interaction_module_mapping[i]].__call__, visual_probes, hidden_states, visual_probe_seqlens, ca_kv_seqlens, rotary_pos_emb ) else: hidden_states = blk(hidden_states, cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb) if i in self.interaction_module_layers: visual_probes = self.interaction_modules[self.interaction_module_mapping[i]]( visual_probes, hidden_states, visual_probe_seqlens, ca_kv_seqlens, rotary_pos_emb ) return hidden_states, visual_probes class ViSCoP_VisionEncoderModel(PreTrainedModel): config_class = ViSCoP_VisionEncoderConfig base_model_prefix = "viscop" main_input_name = "pixel_values" supports_gradient_checkpointing = True _no_split_modules = [ "SigLIPVisionEncoderLayer", "VisionEmbeddings", ] _supports_flash_attn_2 = True _supports_sdpa = True def __init__(self, config: ViSCoP_VisionEncoderConfig, **kwargs): super().__init__(config=config) embed_dim = config.hidden_size config.num_visual_probes = kwargs['model_cfg'].num_visual_probes config.interaction_module_layers = kwargs['model_cfg'].interaction_module_layers config.interaction_module = kwargs['model_cfg'].interaction_module self.embeddings = VisionEmbeddings(config) self.encoder = ViSCoP_VisionTransformerEncoder(config) self.post_layernorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps) # if config.interaction_module != 'cross_attention': # self.post_layernorm_ca = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps) self.post_init() def forward(self, pixel_values, grid_sizes, merge_sizes=None) -> torch.Tensor: hidden_states = self.embeddings(pixel_values) hidden_states, visual_probes = self.encoder(hidden_states, grid_sizes, merge_sizes) hidden_states = self.post_layernorm(hidden_states) # if self.config.interaction_module != 'cross_attention': # visual_probes = self.post_layernorm_ca(visual_probes) hidden_states_chunks = hidden_states.split(grid_sizes.prod(dim=1).tolist(), dim=0) outputs = [] for hidden_states, grid_size, merge_size in zip(hidden_states_chunks, grid_sizes, merge_sizes): # NOTE: previous implementation, which supports downsampling with any factor c = hidden_states.shape[-1] hidden_states = hidden_states.view( grid_size[0], grid_size[1] // merge_size, grid_size[2] // merge_size, merge_size, merge_size, c ).permute(0, 1, 3, 2, 4, 5) hidden_states = hidden_states.reshape( grid_size[0], grid_size[1], grid_size[2], c ).permute(0, 3, 1, 2) hidden_states = torch.nn.functional.interpolate( hidden_states, size=(grid_size[1] // merge_size, grid_size[2] // merge_size), mode='bilinear' ) hidden_states = hidden_states.permute(0, 2, 3, 1).view(-1, c) # NOTE: simplified implementation, which only supports downsampling with integer factor # NOTE: this implementation is mathematically equivalent to the previous one when merge_size is 1 or 2 but may cause slightly different results # hidden_states = hidden_states.view(-1, merge_size * merge_size, hidden_states.size(-1)) # hidden_states = hidden_states.mean(dim=1) outputs.append(hidden_states) return torch.cat(outputs, dim=0), visual_probes def _init_weights(self, module): """Initialize the weights""" if isinstance(module, nn.Embedding): default_flax_embed_init(module.weight) elif isinstance(module, VisionAttention): if isinstance(module, InteractionModule_CrossAttention_FA2): # for deepspeed-zero3 return nn.init.xavier_uniform_(module.q_proj.weight) nn.init.xavier_uniform_(module.k_proj.weight) nn.init.xavier_uniform_(module.v_proj.weight) nn.init.xavier_uniform_(module.out_proj.weight) nn.init.zeros_(module.q_proj.bias) nn.init.zeros_(module.k_proj.bias) nn.init.zeros_(module.v_proj.bias) nn.init.zeros_(module.out_proj.bias) elif isinstance(module, SigLIPVisionMLP): nn.init.xavier_uniform_(module.fc1.weight) nn.init.xavier_uniform_(module.fc2.weight) nn.init.normal_(module.fc1.bias, std=1e-6) nn.init.normal_(module.fc2.bias, std=1e-6) elif isinstance(module, (nn.Linear, nn.Conv2d)): lecun_normal_(module.weight) if module.bias is not None: nn.init.zeros_(module.bias) elif isinstance(module, nn.LayerNorm): module.bias.data.zero_() module.weight.data.fill_(1.0) elif isinstance(module, ViSCoP_VisionTransformerEncoder): nn.init.normal_(module.visual_probes, std=.02)