# VisCoP-VE / modeling_viscop_vision_encoder.py
# Adopted from https://github.com/DAMO-NLP-SG/VideoLLaMA3/blob/main/videollama3/model/videollama3_encoder/modeling_videollama3_encoder.py
# Adopted from https://github.com/huggingface/transformers/blob/main/src/transformers/models/qwen2_vl/modeling_qwen2_vl.py.
# Below is the original copyright:
# Copyright 2024 The Qwen team, Alibaba Group and the HuggingFace Inc. team. All rights reserved.
#
# This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX
# and OPT implementations in this library. It has been modified from its
# original forms to accommodate minor architectural differences compared
# to GPT-NeoX and OPT used by the Meta AI team that trained the model.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""PyTorch ViSCoP vision encoder model. This file contains the implementation of visual probes and interaction modules"""
from typing import Tuple
import math
import warnings
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.checkpoint
from torch.nn.init import _calculate_fan_in_and_fan_out
from transformers.activations import ACT2FN
from transformers.modeling_utils import PreTrainedModel
from transformers.utils import is_flash_attn_2_available
if is_flash_attn_2_available():
from flash_attn import flash_attn_varlen_func
else:
flash_attn_varlen_func = None
from .configuration_viscop_vision_encoder import ViSCoP_VisionEncoderConfig
def _trunc_normal_(tensor, mean, std, a, b):
# Cut & paste from PyTorch official master until it's in a few official releases - RW
# Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
def norm_cdf(x):
# Computes standard normal cumulative distribution function
return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0
if (mean < a - 2 * std) or (mean > b + 2 * std):
warnings.warn(
"mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
"The distribution of values may be incorrect.",
stacklevel=2,
)
# Values are generated by using a truncated uniform distribution and
# then using the inverse CDF for the normal distribution.
# Get upper and lower cdf values
l = norm_cdf((a - mean) / std)
u = norm_cdf((b - mean) / std)
# Uniformly fill tensor with values from [l, u], then translate to
# [2l-1, 2u-1].
tensor.uniform_(2 * l - 1, 2 * u - 1)
# Use inverse cdf transform for normal distribution to get truncated
# standard normal
tensor.erfinv_()
# Transform to proper mean, std
tensor.mul_(std * math.sqrt(2.0))
tensor.add_(mean)
# Clamp to ensure it's in the proper range
tensor.clamp_(min=a, max=b)
def trunc_normal_tf_(
tensor: torch.Tensor, mean: float = 0.0, std: float = 1.0, a: float = -2.0, b: float = 2.0
) -> torch.Tensor:
"""Fills the input Tensor with values drawn from a truncated
normal distribution. The values are effectively drawn from the
normal distribution :math:`\\mathcal{N}(\text{mean}, \text{std}^2)`
with values outside :math:`[a, b]` redrawn until they are within
the bounds. The method used for generating the random values works
best when :math:`a \\leq \text{mean} \\leq b`.
NOTE: this 'tf' variant behaves closer to Tensorflow / JAX impl where the
bounds [a, b] are applied when sampling the normal distribution with mean=0, std=1.0
and the result is subsequently scaled and shifted by the mean and std args.
Args:
tensor: an n-dimensional `torch.Tensor`
mean: the mean of the normal distribution
std: the standard deviation of the normal distribution
a: the minimum cutoff value
b: the maximum cutoff value
"""
with torch.no_grad():
_trunc_normal_(tensor, 0, 1.0, a, b)
tensor.mul_(std).add_(mean)
def variance_scaling_(tensor, scale=1.0, mode="fan_in", distribution="normal"):
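    """Initializes `tensor` from a fan-scaled distribution (truncated normal, normal, or uniform), as in flax/JAX initializers."""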
fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor)
if mode == "fan_in":
denom = fan_in
elif mode == "fan_out":
denom = fan_out
elif mode == "fan_avg":
denom = (fan_in + fan_out) / 2
variance = scale / denom
if distribution == "truncated_normal":
# constant is stddev of standard normal truncated to (-2, 2)
trunc_normal_tf_(tensor, std=math.sqrt(variance) / 0.87962566103423978)
elif distribution == "normal":
with torch.no_grad():
tensor.normal_(std=math.sqrt(variance))
elif distribution == "uniform":
bound = math.sqrt(3 * variance)
with torch.no_grad():
tensor.uniform_(-bound, bound)
else:
raise ValueError(f"invalid distribution {distribution}")
def lecun_normal_(tensor):
variance_scaling_(tensor, mode="fan_in", distribution="truncated_normal")
def default_flax_embed_init(tensor):
variance_scaling_(tensor, mode="fan_in", distribution="normal")
# Copied from transformers.models.llama.modeling_llama.rotate_half
def rotate_half(x):
"""Rotates half the hidden dims of the input."""
x1 = x[..., : x.shape[-1] // 2]
x2 = x[..., x.shape[-1] // 2 :]
return torch.cat((-x2, x1), dim=-1)
def apply_rotary_pos_emb_vision(tensor: torch.Tensor, freqs: torch.Tensor) -> torch.Tensor:
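    """Applies 2D rotary position embeddings to packed vision tokens.
    `freqs` holds one rotary angle per (token, head_dim // 2) pair; the cos/sin terms are
    repeated across both halves of the head dimension before the rotate-half formula.
    """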
orig_dtype = tensor.dtype
tensor = tensor.float()
cos = freqs.cos()
sin = freqs.sin()
cos = cos.unsqueeze(1).repeat(1, 1, 2).unsqueeze(0).float()
sin = sin.unsqueeze(1).repeat(1, 1, 2).unsqueeze(0).float()
output = (tensor * cos) + (rotate_half(tensor) * sin)
output = output.to(orig_dtype)
return output
class VisionRotaryEmbedding(nn.Module):
def __init__(self, dim: int, theta: float = 10000.0) -> None:
super().__init__()
inv_freq = 1.0 / (theta ** (torch.arange(0, dim, 2, dtype=torch.float) / dim))
self.register_buffer("inv_freq", inv_freq, persistent=False)
def forward(self, seqlen: int) -> torch.Tensor:
seq = torch.arange(seqlen, device=self.inv_freq.device, dtype=self.inv_freq.dtype)
freqs = torch.outer(seq, self.inv_freq)
return freqs
class VisionEmbeddings(nn.Module):
def __init__(self, config: ViSCoP_VisionEncoderConfig):
super().__init__()
self.config = config
self.embed_dim = config.hidden_size
self.patch_size = config.patch_size
self.patch_embedding = nn.Conv2d(
in_channels=config.num_channels,
out_channels=self.embed_dim,
kernel_size=self.patch_size,
stride=self.patch_size,
padding="valid",
)
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
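        """Projects packed patch pixels to embeddings.
        `hidden_states` is a packed tensor of flattened patches that is reshaped to
        (num_patches, num_channels, patch_size, patch_size); the patch-sized convolution
        maps each patch to a single `embed_dim` vector.
        """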
hidden_states = hidden_states.view(
-1, self.config.num_channels, self.patch_size, self.patch_size
)
        patch_embeds = self.patch_embedding(hidden_states)  # shape = [num_patches, embed_dim, 1, 1]
# embeddings = patch_embeds.flatten(2).transpose(1, 2)
embeddings = patch_embeds.view(-1, self.embed_dim)
return embeddings
class VisionAttention(nn.Module):
"""Multi-headed attention from 'Attention Is All You Need' paper"""
# Copied from transformers.models.clip.modeling_clip.CLIPAttention.__init__
def __init__(self, config):
super().__init__()
self.config = config
self.embed_dim = config.hidden_size
self.num_heads = config.num_attention_heads
self.head_dim = self.embed_dim // self.num_heads
if self.head_dim * self.num_heads != self.embed_dim:
raise ValueError(
f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:"
f" {self.num_heads})."
)
self.scale = self.head_dim**-0.5
self.dropout = config.attention_dropout
self.k_proj = nn.Linear(self.embed_dim, self.embed_dim)
self.v_proj = nn.Linear(self.embed_dim, self.embed_dim)
self.q_proj = nn.Linear(self.embed_dim, self.embed_dim)
self.out_proj = nn.Linear(self.embed_dim, self.embed_dim)
def forward(
self,
hidden_states: torch.Tensor,
cu_seqlens: torch.Tensor,
rotary_pos_emb: torch.Tensor = None,
) -> torch.Tensor:
"""Input shape: Time x Channel"""
q_len, _ = hidden_states.size()
query_states = self.q_proj(hidden_states)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
query_states = query_states.view(q_len, self.num_heads, self.head_dim)
key_states = key_states.view(q_len, self.num_heads, self.head_dim)
value_states = value_states.view(q_len, self.num_heads, self.head_dim)
query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
        # Block-diagonal additive mask: tokens attend only within their own sequence.
        attention_mask = torch.full(
            [1, q_len, q_len],
            torch.finfo(query_states.dtype).min,
            device=query_states.device,
            dtype=query_states.dtype,
        )
        for i in range(1, len(cu_seqlens)):
            attention_mask[..., cu_seqlens[i - 1] : cu_seqlens[i], cu_seqlens[i - 1] : cu_seqlens[i]] = 0
query_states = query_states.transpose(0, 1)
key_states = key_states.transpose(0, 1)
value_states = value_states.transpose(0, 1)
attn_weights = torch.matmul(query_states, key_states.transpose(1, 2)) / math.sqrt(self.head_dim)
attn_weights = attn_weights + attention_mask
# upcast attention to fp32
attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
attn_weights = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training)
attn_output = torch.matmul(attn_weights, value_states)
attn_output = attn_output.transpose(0, 1)
attn_output = attn_output.reshape(q_len, -1)
attn_output = self.out_proj(attn_output)
return attn_output
class VisionFlashAttention2(VisionAttention):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Adapted from transformers.models.llama.modeling_llama.LlamaFlashAttention2.forward
def forward(
self,
hidden_states: torch.Tensor,
cu_seqlens: torch.Tensor,
rotary_pos_emb: torch.Tensor = None,
) -> torch.Tensor:
q_len, _ = hidden_states.size()
query_states = self.q_proj(hidden_states)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
        # flash_attn_varlen_func expects packed (unpadded) inputs of shape
        # (total_seq_len, num_heads, head_dim), so no batch dimension is added here.
query_states = query_states.view(q_len, self.num_heads, self.head_dim)
key_states = key_states.view(q_len, self.num_heads, self.head_dim)
value_states = value_states.view(q_len, self.num_heads, self.head_dim)
query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
max_seqlen = (cu_seqlens[1:] - cu_seqlens[:-1]).max().item()
attn_output = flash_attn_varlen_func(query_states, key_states, value_states, cu_seqlens, cu_seqlens, max_seqlen, max_seqlen).reshape(
q_len, -1
)
attn_output = self.out_proj(attn_output)
return attn_output
class InteractionModule_CrossAttention_FA2(VisionAttention):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Adapted from transformers.models.llama.modeling_llama.LlamaFlashAttention2.forward
def forward(
self,
visual_probes: torch.Tensor,
hidden_states: torch.Tensor,
cu_seqlensq: torch.Tensor,
cu_seqlenskv: torch.Tensor,
rotary_pos_emb: torch.Tensor = None,
) -> torch.Tensor:
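        """Cross-attention in which the visual probes act as queries over the packed patch
        features (keys/values); `cu_seqlensq` / `cu_seqlenskv` delimit the per-sample probe
        and patch sequences for flash_attn_varlen_func.
        """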
q_len, _ = visual_probes.size()
kv_len, _ = hidden_states.size()
query_states = self.q_proj(visual_probes)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
        # flash_attn_varlen_func expects packed (unpadded) inputs of shape
        # (total_seq_len, num_heads, head_dim), so no batch dimension is added here.
query_states = query_states.view(q_len, self.num_heads, self.head_dim)
key_states = key_states.view(kv_len, self.num_heads, self.head_dim)
value_states = value_states.view(kv_len, self.num_heads, self.head_dim)
# query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
# key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
max_seqlen_q = (cu_seqlensq[1:] - cu_seqlensq[:-1]).max().item()
max_seqlen_kv = (cu_seqlenskv[1:] - cu_seqlenskv[:-1]).max().item()
attn_output = flash_attn_varlen_func(query_states, key_states, value_states, cu_seqlensq, cu_seqlenskv, max_seqlen_q, max_seqlen_kv).reshape(
q_len, -1
)
attn_output = self.out_proj(attn_output)
return attn_output
class VisionSdpaAttention(VisionAttention):
def forward(
self,
hidden_states: torch.Tensor,
cu_seqlens: torch.Tensor,
rotary_pos_emb: torch.Tensor = None,
) -> torch.Tensor:
seq_length = hidden_states.shape[0]
query_states = self.q_proj(hidden_states)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
query_states = query_states.view(seq_length, self.num_heads, self.head_dim)
key_states = key_states.view(seq_length, self.num_heads, self.head_dim)
value_states = value_states.view(seq_length, self.num_heads, self.head_dim)
query_states = apply_rotary_pos_emb_vision(query_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
key_states = apply_rotary_pos_emb_vision(key_states.unsqueeze(0), rotary_pos_emb).squeeze(0)
attention_mask = torch.zeros([1, seq_length, seq_length], device=query_states.device, dtype=torch.bool)
for i in range(1, len(cu_seqlens)):
attention_mask[..., cu_seqlens[i - 1] : cu_seqlens[i], cu_seqlens[i - 1] : cu_seqlens[i]] = True
query_states = query_states.transpose(0, 1)
key_states = key_states.transpose(0, 1)
value_states = value_states.transpose(0, 1)
attn_output = F.scaled_dot_product_attention(query_states, key_states, value_states, attention_mask, dropout_p=0.0)
attn_output = attn_output.transpose(0, 1)
attn_output = attn_output.reshape(seq_length, -1)
attn_output = self.out_proj(attn_output)
return attn_output
VISION_ATTENTION_CLASSES = {
"eager": VisionAttention,
"flash_attention_2": VisionFlashAttention2,
"sdpa": VisionSdpaAttention,
}
class SigLIPVisionMLP(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.activation_fn = ACT2FN[config.hidden_act]
self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size)
self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size)
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
hidden_states = self.fc1(hidden_states)
hidden_states = self.activation_fn(hidden_states)
hidden_states = self.fc2(hidden_states)
return hidden_states
class SigLIPVisionEncoderLayer(nn.Module):
def __init__(self, config: ViSCoP_VisionEncoderConfig):
super().__init__()
self.embed_dim = config.hidden_size
self.self_attn = VISION_ATTENTION_CLASSES[config._attn_implementation](config=config)
self.layer_norm1 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
self.mlp = SigLIPVisionMLP(config)
self.layer_norm2 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
# Ignore copy
def forward(self, hidden_states, cu_seqlens, rotary_pos_emb) -> torch.Tensor:
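        """Pre-norm self-attention and MLP, each with a residual connection."""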
hidden_states = hidden_states + self.self_attn(
self.layer_norm1(hidden_states), cu_seqlens, rotary_pos_emb
)
hidden_states = hidden_states + self.mlp(self.layer_norm2(hidden_states))
return hidden_states
# Alternative where each interaction module is a copy of the entire vision transformer layer.
# Empirically we found that simple cross-attention works better.
class InteractionModule_CrossAttentionEncoderLayer(nn.Module):
def __init__(self, config):
super().__init__()
embed_dim = config.hidden_size
self.cross_attn = InteractionModule_CrossAttention_FA2(config)
# Separate norms for queries and visual features
self.q_ln1 = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
self.kv_ln1 = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
self.q_ln2 = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
self.mlp = SigLIPVisionMLP(config)
def forward(self, visual_probes, hidden_states, query_seqlens, ca_kv_seqlens, rotary_pos_emb):
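        """Transformer-style interaction block: pre-norm cross-attention from the probes to the
        patch features, followed by a pre-norm MLP, both with residual connections."""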
visual_probes = visual_probes + self.cross_attn(
self.q_ln1(visual_probes), self.kv_ln1(hidden_states), query_seqlens, ca_kv_seqlens, rotary_pos_emb
)
visual_probes = visual_probes + self.mlp(self.q_ln2(visual_probes))
return visual_probes
INTERACTION_MODULES = {
'cross_attention': InteractionModule_CrossAttention_FA2,
'cross_attention_transformer': InteractionModule_CrossAttentionEncoderLayer
}
class ViSCoP_VisionTransformerEncoder(nn.Module):
def __init__(self, config: ViSCoP_VisionEncoderConfig):
super().__init__()
self.config = config
head_dim = config.hidden_size // config.num_attention_heads
self.rotary_pos_emb = VisionRotaryEmbedding(head_dim // 2)
self.layers = nn.ModuleList([SigLIPVisionEncoderLayer(config) for _ in range(config.num_hidden_layers)])
self.gradient_checkpointing = False
assert config.interaction_module in INTERACTION_MODULES, f'interaction module must be in: {INTERACTION_MODULES.keys()}'
num_interaction_modules = config.num_hidden_layers if config.interaction_module_layers is None else len(config.interaction_module_layers)
self.interaction_modules = nn.ModuleList([INTERACTION_MODULES[config.interaction_module](config) for _ in range(num_interaction_modules)]) # > Create interaction modules
        self.interaction_module_layers = list(range(config.num_hidden_layers)) if config.interaction_module_layers is None else config.interaction_module_layers
self.interaction_module_mapping = {k: v for v, k in enumerate(self.interaction_module_layers)}
self.num_visual_probes = config.num_visual_probes
self.visual_probes = nn.Parameter(torch.zeros(1, self.num_visual_probes, self.config.hidden_size)) # > Create the visual probes
def rot_pos_emb(self, grid_sizes, merge_sizes):
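        """Builds per-token (h, w) rotary position ids for every (t, h, w) grid, ordered so that
        patches within the same merge window are contiguous, then looks up the rotary frequencies."""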
pos_ids = []
for (t, h, w), merge_size in zip(grid_sizes, merge_sizes):
hpos_ids = torch.arange(h).unsqueeze(1).expand(-1, w)
hpos_ids = hpos_ids.reshape(
h // merge_size,
merge_size,
w // merge_size,
merge_size,
)
hpos_ids = hpos_ids.permute(0, 2, 1, 3)
hpos_ids = hpos_ids.flatten()
wpos_ids = torch.arange(w).unsqueeze(0).expand(h, -1)
wpos_ids = wpos_ids.reshape(
h // merge_size,
merge_size,
w // merge_size,
merge_size,
)
wpos_ids = wpos_ids.permute(0, 2, 1, 3)
wpos_ids = wpos_ids.flatten()
pos_ids.append(torch.stack([hpos_ids, wpos_ids], dim=-1).repeat(t, 1))
pos_ids = torch.cat(pos_ids, dim=0)
max_grid_size = grid_sizes[:, 1:].max()
rotary_pos_emb_full = self.rotary_pos_emb(max_grid_size)
rotary_pos_emb = rotary_pos_emb_full[pos_ids].flatten(1)
return rotary_pos_emb
    def forward(self, hidden_states, grid_sizes, merge_sizes) -> Tuple[torch.Tensor, torch.Tensor]:
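        """Runs the packed patch tokens through the SigLIP encoder layers while, at the configured
        interaction layers, letting the visual probes cross-attend to the current patch features.
        Returns the final patch features and the updated visual probes."""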
rotary_pos_emb = self.rot_pos_emb(grid_sizes, merge_sizes)
        # boundaries for self-attention: each frame of each grid attends only within itself
cu_seqlens = torch.repeat_interleave(grid_sizes[:, 1] * grid_sizes[:, 2], grid_sizes[:, 0]).cumsum(dim=0, dtype=torch.int32)
cu_seqlens = F.pad(cu_seqlens, (1, 0), value=0)
# > repeat probes
batch_size = grid_sizes.shape[0]
visual_probes = self.visual_probes.repeat(batch_size, 1, 1)
visual_probes = visual_probes.view(batch_size*self.num_visual_probes, self.config.hidden_size)
# > mask for interaction module (cross-attend probes to all grids)
visual_probe_seqlens = torch.tensor([self.num_visual_probes]*batch_size).cumsum(dim=0, dtype=torch.int32)
visual_probe_seqlens = F.pad(visual_probe_seqlens, (1, 0), value=0).to(cu_seqlens.device)
ca_kv_seqlens = (grid_sizes[:, 0] * (grid_sizes[:, 1] * grid_sizes[:, 2])).cumsum(dim=0, dtype=torch.int32)
ca_kv_seqlens = F.pad(ca_kv_seqlens, (1, 0), value=0)
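        # Illustrative example (assumed shapes): for grid_sizes = [[2, 4, 4]] and 8 probes,
        # cu_seqlens = [0, 16, 32] (self-attention per frame), visual_probe_seqlens = [0, 8],
        # and ca_kv_seqlens = [0, 32] (each sample's probes cross-attend over all of its frames).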
for i, blk in enumerate(self.layers):
if self.gradient_checkpointing and self.training:
hidden_states = self._gradient_checkpointing_func(
blk.__call__,
hidden_states,
cu_seqlens,
rotary_pos_emb
)
if i in self.interaction_module_layers:
visual_probes = self._gradient_checkpointing_func(
self.interaction_modules[self.interaction_module_mapping[i]].__call__,
visual_probes,
hidden_states,
visual_probe_seqlens,
ca_kv_seqlens,
rotary_pos_emb
)
else:
hidden_states = blk(hidden_states, cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb)
if i in self.interaction_module_layers:
visual_probes = self.interaction_modules[self.interaction_module_mapping[i]](
visual_probes,
hidden_states,
visual_probe_seqlens,
ca_kv_seqlens,
rotary_pos_emb
)
return hidden_states, visual_probes
class ViSCoP_VisionEncoderModel(PreTrainedModel):
config_class = ViSCoP_VisionEncoderConfig
base_model_prefix = "viscop"
main_input_name = "pixel_values"
supports_gradient_checkpointing = True
_no_split_modules = [
"SigLIPVisionEncoderLayer",
"VisionEmbeddings",
]
_supports_flash_attn_2 = True
_supports_sdpa = True
def __init__(self, config: ViSCoP_VisionEncoderConfig, **kwargs):
super().__init__(config=config)
embed_dim = config.hidden_size
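        # Probe and interaction-module settings are injected from the caller-supplied `model_cfg`.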
config.num_visual_probes = kwargs['model_cfg'].num_visual_probes
config.interaction_module_layers = kwargs['model_cfg'].interaction_module_layers
config.interaction_module = kwargs['model_cfg'].interaction_module
self.embeddings = VisionEmbeddings(config)
self.encoder = ViSCoP_VisionTransformerEncoder(config)
self.post_layernorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
# if config.interaction_module != 'cross_attention':
# self.post_layernorm_ca = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
self.post_init()
    def forward(self, pixel_values, grid_sizes, merge_sizes=None) -> Tuple[torch.Tensor, torch.Tensor]:
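        """Encodes packed patches and merges them spatially by `merge_sizes`.
        Args (shapes inferred from usage in this file):
            pixel_values: packed patch pixels, reshaped internally to
                (num_patches, num_channels, patch_size, patch_size).
            grid_sizes: (num_samples, 3) tensor of (t, h, w) patch-grid sizes.
            merge_sizes: per-sample spatial downsampling factor applied after encoding.
        Returns:
            A tuple of (merged patch features, visual probe features).
        """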
hidden_states = self.embeddings(pixel_values)
hidden_states, visual_probes = self.encoder(hidden_states, grid_sizes, merge_sizes)
hidden_states = self.post_layernorm(hidden_states)
# if self.config.interaction_module != 'cross_attention':
# visual_probes = self.post_layernorm_ca(visual_probes)
hidden_states_chunks = hidden_states.split(grid_sizes.prod(dim=1).tolist(), dim=0)
outputs = []
for hidden_states, grid_size, merge_size in zip(hidden_states_chunks, grid_sizes, merge_sizes):
# NOTE: previous implementation, which supports downsampling with any factor
c = hidden_states.shape[-1]
hidden_states = hidden_states.view(
grid_size[0], grid_size[1] // merge_size, grid_size[2] // merge_size, merge_size, merge_size, c
).permute(0, 1, 3, 2, 4, 5)
hidden_states = hidden_states.reshape(
grid_size[0], grid_size[1], grid_size[2], c
).permute(0, 3, 1, 2)
hidden_states = torch.nn.functional.interpolate(
hidden_states,
size=(grid_size[1] // merge_size, grid_size[2] // merge_size),
mode='bilinear'
)
hidden_states = hidden_states.permute(0, 2, 3, 1).view(-1, c)
# NOTE: simplified implementation, which only supports downsampling with integer factor
# NOTE: this implementation is mathematically equivalent to the previous one when merge_size is 1 or 2 but may cause slightly different results
# hidden_states = hidden_states.view(-1, merge_size * merge_size, hidden_states.size(-1))
# hidden_states = hidden_states.mean(dim=1)
outputs.append(hidden_states)
return torch.cat(outputs, dim=0), visual_probes
def _init_weights(self, module):
"""Initialize the weights"""
if isinstance(module, nn.Embedding):
default_flax_embed_init(module.weight)
elif isinstance(module, VisionAttention):
if isinstance(module, InteractionModule_CrossAttention_FA2): # for deepspeed-zero3
return
nn.init.xavier_uniform_(module.q_proj.weight)
nn.init.xavier_uniform_(module.k_proj.weight)
nn.init.xavier_uniform_(module.v_proj.weight)
nn.init.xavier_uniform_(module.out_proj.weight)
nn.init.zeros_(module.q_proj.bias)
nn.init.zeros_(module.k_proj.bias)
nn.init.zeros_(module.v_proj.bias)
nn.init.zeros_(module.out_proj.bias)
elif isinstance(module, SigLIPVisionMLP):
nn.init.xavier_uniform_(module.fc1.weight)
nn.init.xavier_uniform_(module.fc2.weight)
nn.init.normal_(module.fc1.bias, std=1e-6)
nn.init.normal_(module.fc2.bias, std=1e-6)
elif isinstance(module, (nn.Linear, nn.Conv2d)):
lecun_normal_(module.weight)
if module.bias is not None:
nn.init.zeros_(module.bias)
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
elif isinstance(module, ViSCoP_VisionTransformerEncoder):
nn.init.normal_(module.visual_probes, std=.02)
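# Minimal usage sketch (illustrative only): the `model_cfg` fields mirror the attributes read in
# ViSCoP_VisionEncoderModel.__init__ above, and the input shapes are assumptions.
#
#   from types import SimpleNamespace
#   config = ViSCoP_VisionEncoderConfig()
#   model_cfg = SimpleNamespace(num_visual_probes=32, interaction_module="cross_attention",
#                               interaction_module_layers=None)
#   model = ViSCoP_VisionEncoderModel(config, model_cfg=model_cfg)
#   patch_features, visual_probes = model(pixel_values, grid_sizes, merge_sizes)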