| | |
| | |
| | """ |
| | ================================================ |
| | @author: Jaron |
| | @time: 2024/02/20 16:21:56 |
| | @email: fjjth98@163.com |
| | @description: QFormer projector, convert image and video into fixed-length tokens |
| | ================================================ |
| | """ |
| |
|
| | import math |
| | import torch |
| | import torch.nn as nn |
| | from torch.nn.functional import interpolate |
| | from transformers.models.blip_2.modeling_blip_2 import Blip2QFormerModel, Blip2QFormerEncoder |
| |
|
| | from .configuration_ccam_projector import CCAMConfig |
| |
|
| |
|
| | class SimpleQFormerOutput(nn.Module): |
| | |
| | def __init__(self, config): |
| | super().__init__() |
| | self.dense = nn.Linear(config.intermediate_size, config.output_size) |
| |
|
| | def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor = None) -> torch.Tensor: |
| | return self.dense(hidden_states) |
| |
|
| |
|
| | class SimpleQFormerIdentity(nn.Module): |
| | |
| | |
| | def forward(self, hidden_states: torch.Tensor, *args, **kwargs) -> torch.Tensor: |
| | return hidden_states, |
| |
|
| |
|
| | class CCAMModel(Blip2QFormerModel): |
| | _auto_class = 'AutoModel' |
| | config_class = CCAMConfig |
| | base_model_prefix = 'model' |
| | supports_gradient_checkpointing = True |
| | |
| | def __init__(self, config: CCAMConfig): |
| | super(Blip2QFormerModel, self).__init__(config) |
| | self.gradient_checkpointing = False |
| | self.config = config |
| | self.num_query_tokens = config.num_query_tokens |
| | self.visual_attn_mask_type = config.visual_attn_mask_type |
| |
|
| | self.layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) |
| | self.dropout = nn.Dropout(config.hidden_dropout_prob) |
| | self.encoder = Blip2QFormerEncoder(config) |
| | self.encoder.layer[0].attention = SimpleQFormerIdentity() |
| | self.encoder.layer[-1].output_query = SimpleQFormerOutput(config) |
| |
|
| | |
| | self.query_tokens = nn.Parameter(torch.zeros(1, config.num_query_tokens, config.hidden_size)) |
| |
|
| | |
| | self.spatial_pos_embed = self._create_pos_embed(*config.spatial_resolution, type=config.spatial_pos_embed_type) |
| | self.temporal_pos_embed = self._create_pos_embed(config.temporal_resolution, type=config.temporal_pos_embed_type) |
| |
|
| | |
| | if config.query_attn_mask_type == 'full': |
| | self.query_attn_mask = None |
| | elif config.query_attn_mask_type == 'causal': |
| | query_attn_mask = torch.ones(self.num_query_tokens, self.num_query_tokens) |
| | q = torch.arange(self.num_query_tokens) |
| | query_attn_mask.masked_fill_(q > q[:, None], 0) |
| | self.query_attn_mask = query_attn_mask[None] |
| | else: |
| | raise NotImplementedError(f'Do not support {self.query_attn_mask} query_attn_mask') |
| |
|
| | self.post_init() |
| |
|
| | def _create_pos_embed(self, *size: int, type: str = 'none') -> torch.Tensor: |
| | C = self.config.encoder_hidden_size |
| | if type == 'none': |
| | pos_embed = None |
| | elif type == 'learnable': |
| | pos_embed = nn.Parameter(.02 * torch.randn(*size, C)) |
| | elif type == 'cosine': |
| | total_len = 1 |
| | for i in size: |
| | total_len *= i |
| | raw = torch.outer(torch.arange(total_len), torch.exp(torch.arange(0, C, 2) * (-math.log(10000.) / C))) |
| | pos_embed = nn.Parameter(torch.stack((raw.sin(), raw.cos()), dim=-1).view(*size, C), requires_grad=False) |
| | else: |
| | raise NotImplementedError(f'Do not support {type} position embeddings') |
| | return pos_embed |
| |
|
| | def get_attn_mask(self, embeddings: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: |
| | """Get visual_attn_mask and query_attn_mask if needed |
| | embeddings (torch.Tensor): (B, T, L, C) |
| | """ |
| | B, T, L, _ = embeddings.size() |
| | device = embeddings.device |
| |
|
| | |
| | if T > 1: |
| | if self.visual_attn_mask_type == 'ccam': |
| | base_attn_mask = torch.ones(T, T, device=device) |
| | t = torch.arange(T, device=device) |
| | base_attn_mask.masked_fill_(t > t[:, None], 0) |
| | visual_attn_mask = torch.cat(( |
| | torch.kron( |
| | base_attn_mask, |
| | torch.ones(self.num_query_tokens // T, L, device=device) |
| | ), |
| | torch.ones(self.num_query_tokens % T, T * L, device=device) |
| | ), dim=0)[None].expand(B, -1, -1) |
| | elif self.attn_mask_type == 'full': |
| | visual_attn_mask = None |
| | else: |
| | raise NotImplementedError(f'Do not support {self.visual_attn_mask_type} attn_mask') |
| | else: |
| | visual_attn_mask = None |
| |
|
| | if self.query_attn_mask is None: |
| | query_attn_mask = None |
| | else: |
| | query_attn_mask = self.query_attn_mask.expand(B, -1, -1) |
| |
|
| | return visual_attn_mask, query_attn_mask |
| |
|
| | def batch_forward_no_spatial(self, visual_embeds: torch.Tensor) -> torch.Tensor: |
| | """Batch forward without spatial mask position embeddings |
| | |
| | Args: |
| | visual_embeds (torch.Tensor): (B, T, L, C) |
| | |
| | Returns: |
| | torch.Tensor: (B, Q, C) |
| | """ |
| | B, T, _, C = visual_embeds.size() |
| | query_embeds = self.query_tokens.expand(B, -1, -1) |
| | visual_attn_mask, query_attn_mask = self.get_attn_mask(visual_embeds) |
| |
|
| | |
| | if self.temporal_pos_embed is not None: |
| | if T == self.temporal_pos_embed.size(0): |
| | pos_embed = self.temporal_pos_embed |
| | elif T == 1: |
| | pos_embed = 0. * self.temporal_pos_embed[:1] |
| | else: |
| | pos_embed = interpolate( |
| | self.temporal_pos_embed.T[None], |
| | size=(T,), |
| | mode='linear', |
| | align_corners=False |
| | )[0].T |
| | visual_embeds = visual_embeds + pos_embed.view(1, T, 1, C) |
| | visual_embeds = visual_embeds.flatten(1, 2) |
| |
|
| | return super().forward( |
| | query_embeds=query_embeds, |
| | attention_mask=query_attn_mask, |
| | encoder_hidden_states=visual_embeds, |
| | encoder_attention_mask=visual_attn_mask |
| | )[0] |
| |
|
| | def forward(self, visual_embeds: torch.Tensor, split_sizes: list[int], unmasked_ids: torch.LongTensor = None): |
| | """ |
| | visual_embeds (torch.Tensor): (T, L, C) |
| | split_sizes (list[int]): [t0, t1, ...] sum_i ti=T |
| | unmasked_ids (torch.LongTensor): If provided, should be in the shape of (T, L) whose value v 0<=v<=HW-1 |
| | output_attentions (_type_, optional): _description_. Defaults to None. |
| | output_hidden_states (_type_, optional): _description_. Defaults to None. |
| | return_dict (_type_, optional): _description_. Defaults to None. |
| | """ |
| | _, L, C = visual_embeds.size() |
| |
|
| | |
| | if self.spatial_pos_embed is not None: |
| | pos_embed = self.spatial_pos_embed.view(-1, C) |
| | if unmasked_ids is None: |
| | pos_embed = pos_embed.view(1, L, C) |
| | else: |
| | pos_embed = pos_embed[unmasked_ids] |
| | visual_embeds = visual_embeds + pos_embed |
| |
|
| | |
| | if len(set(split_sizes)) == 1: |
| | visual_embeds = visual_embeds.view(len(split_sizes), split_sizes[0], L, C) |
| | output = self.batch_forward_no_spatial(visual_embeds) |
| | else: |
| | visual_embeds = visual_embeds.split(split_sizes, dim=0) |
| | |
| | output, group_visual_embeds = [None] * len(split_sizes), {} |
| | for idx, (embed, t) in enumerate(zip(visual_embeds, split_sizes)): |
| | if t in group_visual_embeds: |
| | group_visual_embeds[t][0].append(idx) |
| | group_visual_embeds[t][1].append(embed) |
| | else: |
| | group_visual_embeds[t] = [[idx], [embed]] |
| | for idx, embeds in group_visual_embeds.values(): |
| | cur_output = self.batch_forward_no_spatial(torch.stack(embeds, dim=0)) |
| | for i, j in enumerate(idx): |
| | output[j] = cur_output[i] |
| | output = torch.stack(output, dim=0) |
| |
|
| | return output |
| |
|