| |
| |
| """ |
| ================================================ |
| @author: Jaron |
| @time: 2024/02/20 16:21:56 |
| @email: fjjth98@163.com |
| @description: QFormer projector, convert image and video into fixed-length tokens |
| ================================================ |
| """ |
|
|
| import math |
| import torch |
| import torch.nn as nn |
| from torch.nn.functional import interpolate |
| from transformers.models.blip_2.modeling_blip_2 import Blip2QFormerModel, Blip2QFormerEncoder |
|
|
| from .configuration_ccam_projector import CCAMConfig |
|
|
|
|
class SimpleQFormerOutput(nn.Module):
    """Output head made of a single linear projection.

    Maps features of width ``config.intermediate_size`` to
    ``config.output_size``.
    """

    def __init__(self, config):
        """Build the projection layer from ``config``.

        Args:
            config: Object exposing ``intermediate_size`` and ``output_size``.
        """
        super().__init__()
        self.dense = nn.Linear(
            in_features=config.intermediate_size,
            out_features=config.output_size,
        )

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor = None) -> torch.Tensor:
        """Project ``hidden_states`` through the linear layer.

        Args:
            hidden_states: Tensor whose last dimension is
                ``config.intermediate_size``.
            input_tensor: Accepted but never read by this module.

        Returns:
            The projected tensor, last dimension ``config.output_size``.
        """
        projected = self.dense(hidden_states)
        return projected
|
|
|
|
class SimpleQFormerIdentity(nn.Module):
    """Pass-through module that returns its first input unchanged.

    The input is handed back wrapped in a 1-tuple, so callers that index
    the result (``module(x)[0]``) keep working.
    """

    def forward(self, hidden_states: torch.Tensor, *args, **kwargs) -> tuple[torch.Tensor]:
        """Return ``hidden_states`` untouched, wrapped in a 1-tuple.

        Args:
            hidden_states: Tensor to pass through.
            *args: Ignored.
            **kwargs: Ignored.

        Returns:
            ``(hidden_states,)`` -- the very same tensor object, not a copy.
        """
        return (hidden_states,)
|
|
|
|
class CCAMModel(Blip2QFormerModel):
    """Q-Former projector turning image/video features into a fixed number of tokens.

    A set of ``config.num_query_tokens`` learnable queries cross-attends to the
    (optionally position-embedded) visual features. Videos with several frames
    can use the ``'ccam'`` cross-attention mask built in :meth:`get_attn_mask`.
    """

    _auto_class = 'AutoModel'
    config_class = CCAMConfig
    base_model_prefix = 'model'
    supports_gradient_checkpointing = True

    def __init__(self, config: CCAMConfig):
        """Build the projector from ``config``.

        Args:
            config: Projector configuration. Fields read here are
                ``num_query_tokens``, ``visual_attn_mask_type``,
                ``hidden_size``, ``layer_norm_eps``, ``hidden_dropout_prob``,
                ``spatial_resolution``, ``spatial_pos_embed_type``,
                ``temporal_resolution``, ``temporal_pos_embed_type`` and
                ``query_attn_mask_type``.

        Raises:
            NotImplementedError: If ``config.query_attn_mask_type`` is neither
                ``'full'`` nor ``'causal'``, or a position-embedding type is
                not supported.
        """
        # Start the initializer chain *after* Blip2QFormerModel in the MRO:
        # Blip2QFormerModel.__init__ is skipped on purpose and every
        # sub-module is created explicitly below.
        super(Blip2QFormerModel, self).__init__(config)
        self.gradient_checkpointing = False
        self.config = config
        self.num_query_tokens = config.num_query_tokens
        self.visual_attn_mask_type = config.visual_attn_mask_type

        self.layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.encoder = Blip2QFormerEncoder(config)
        # The first layer's `attention` sub-module becomes a pass-through and
        # the last layer's `output_query` becomes a single Linear projection.
        self.encoder.layer[0].attention = SimpleQFormerIdentity()
        self.encoder.layer[-1].output_query = SimpleQFormerOutput(config)

        # Learnable queries, shared by every sample in a batch.
        self.query_tokens = nn.Parameter(torch.zeros(1, config.num_query_tokens, config.hidden_size))

        # Position embeddings added to the visual features (None when disabled).
        self.spatial_pos_embed = self._create_pos_embed(*config.spatial_resolution, type=config.spatial_pos_embed_type)
        self.temporal_pos_embed = self._create_pos_embed(config.temporal_resolution, type=config.temporal_pos_embed_type)

        # Query self-attention mask: None means full attention.
        if config.query_attn_mask_type == 'full':
            self.query_attn_mask = None
        elif config.query_attn_mask_type == 'causal':
            # Lower-triangular (1, Q, Q): query i sees queries 0..i.
            # NOTE: this is a plain tensor attribute, so `.to()` on the model
            # does not move it; get_attn_mask moves it to the input's device.
            causal = torch.tril(torch.ones(self.num_query_tokens, self.num_query_tokens))
            self.query_attn_mask = causal[None]
        else:
            # Report the offending config value; `self.query_attn_mask` is not
            # assigned yet on this path.
            raise NotImplementedError(f'Do not support {config.query_attn_mask_type} query_attn_mask')

        self.post_init()

    def _create_pos_embed(self, *size: int, type: str = 'none') -> torch.Tensor:
        """Create a position embedding of shape ``(*size, encoder_hidden_size)``.

        Args:
            *size: Leading dimensions of the embedding grid.
            type: ``'none'``, ``'learnable'`` or ``'cosine'``.

        Returns:
            ``None`` for ``'none'``; a trainable ``nn.Parameter`` drawn from
            ``0.02 * randn`` for ``'learnable'``; a frozen ``nn.Parameter``
            holding interleaved sin/cos values for ``'cosine'``.

        Raises:
            NotImplementedError: For any other ``type``.
        """
        C = self.config.encoder_hidden_size
        if type == 'none':
            return None
        if type == 'learnable':
            return nn.Parameter(.02 * torch.randn(*size, C))
        if type == 'cosine':
            # All grid cells are numbered 0..total_len-1 in flattened order.
            total_len = math.prod(size)
            freqs = torch.exp(torch.arange(0, C, 2) * (-math.log(10000.) / C))
            # Explicit float dtype so torch.outer gets matching operand dtypes.
            positions = torch.arange(total_len, dtype=freqs.dtype)
            raw = torch.outer(positions, freqs)
            # Stacking on the last dim interleaves the values as
            # sin, cos, sin, cos, ... along the channel axis.
            table = torch.stack((raw.sin(), raw.cos()), dim=-1).view(*size, C)
            return nn.Parameter(table, requires_grad=False)
        raise NotImplementedError(f'Do not support {type} position embeddings')

    def get_attn_mask(self, embeddings: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Build the cross-attention and query self-attention masks.

        Args:
            embeddings: Visual features of shape (B, T, L, C).

        Returns:
            ``(visual_attn_mask, query_attn_mask)``; either entry may be
            ``None`` (no masking). ``visual_attn_mask`` is (B, Q, T * L) and
            ``query_attn_mask`` is (B, Q, Q), both on ``embeddings.device``.

        Raises:
            NotImplementedError: If ``T > 1`` and the configured
                ``visual_attn_mask_type`` is neither ``'ccam'`` nor ``'full'``.
        """
        B, T, L, _ = embeddings.size()
        device = embeddings.device
        Q = self.num_query_tokens

        visual_attn_mask = None
        if T > 1:
            if self.visual_attn_mask_type == 'ccam':
                # (T, T) lower-triangular matrix over frames.
                frame_mask = torch.tril(torch.ones(T, T, device=device))
                # Expand every entry to a (Q // T, L) block: the queries are
                # cut into T groups and group i sees the tokens of frames
                # 0..i only.
                grouped_rows = torch.kron(frame_mask, torch.ones(Q // T, L, device=device))
                # The Q % T left-over queries see every frame.
                leftover_rows = torch.ones(Q % T, T * L, device=device)
                visual_attn_mask = torch.cat((grouped_rows, leftover_rows), dim=0)[None].expand(B, -1, -1)
            elif self.visual_attn_mask_type != 'full':
                raise NotImplementedError(f'Do not support {self.visual_attn_mask_type} attn_mask')

        if self.query_attn_mask is None:
            query_attn_mask = None
        else:
            # The stored mask is not a registered buffer, so it may still live
            # on the device it was created on; move it next to the inputs.
            query_attn_mask = self.query_attn_mask.to(device).expand(B, -1, -1)

        return visual_attn_mask, query_attn_mask

    def batch_forward_no_spatial(self, visual_embeds: torch.Tensor) -> torch.Tensor:
        """Run the Q-Former on a batch whose samples share the same frame count.

        Spatial position embeddings are expected to have been added already;
        only the temporal embedding is applied here.

        Args:
            visual_embeds (torch.Tensor): (B, T, L, C)

        Returns:
            torch.Tensor: (B, Q, D), first element of the parent ``forward``
            output. D is presumably ``config.output_size`` because the last
            ``output_query`` is replaced by ``SimpleQFormerOutput`` -- verify
            against the encoder implementation.
        """
        B, T, _, C = visual_embeds.size()
        query_embeds = self.query_tokens.expand(B, -1, -1)
        visual_attn_mask, query_attn_mask = self.get_attn_mask(visual_embeds)

        if self.temporal_pos_embed is not None:
            if T == self.temporal_pos_embed.size(0):
                pos_embed = self.temporal_pos_embed
            elif T == 1:
                # Single frame: add an all-zero embedding that is still
                # computed from the parameter.
                pos_embed = 0. * self.temporal_pos_embed[:1]
            else:
                # Resample the (T0, C) table to T frames along the time axis:
                # (T0, C) -> (1, C, T0) -> interpolate -> (C, T) -> (T, C).
                pos_embed = interpolate(
                    self.temporal_pos_embed.T[None],
                    size=(T,),
                    mode='linear',
                    align_corners=False
                )[0].T
            visual_embeds = visual_embeds + pos_embed.view(1, T, 1, C)
        # Merge the frame and token axes: (B, T, L, C) -> (B, T * L, C).
        visual_embeds = visual_embeds.flatten(1, 2)

        return super().forward(
            query_embeds=query_embeds,
            attention_mask=query_attn_mask,
            encoder_hidden_states=visual_embeds,
            encoder_attention_mask=visual_attn_mask
        )[0]

    def forward(self, visual_embeds: torch.Tensor, split_sizes: list[int], unmasked_ids: torch.LongTensor = None):
        """Project the frames of several images/videos into query tokens.

        Args:
            visual_embeds (torch.Tensor): (T, L, C), frames of all samples
                concatenated along the first dimension.
            split_sizes (list[int]): [t0, t1, ...] frames per sample, with
                sum_i ti = T.
            unmasked_ids (torch.LongTensor): If provided, should be in the
                shape of (T, L) whose value v satisfies 0 <= v <= HW - 1; it
                selects which spatial position embedding each token receives.

        Returns:
            torch.Tensor: One row of query tokens per sample, stacked along
            dim 0 in the order of ``split_sizes``.
        """
        _, L, C = visual_embeds.size()

        # Spatial position embedding.
        if self.spatial_pos_embed is not None:
            pos_embed = self.spatial_pos_embed.view(-1, C)
            if unmasked_ids is None:
                # No masking: tokens are taken to cover the grid in order.
                pos_embed = pos_embed.view(1, L, C)
            else:
                # Look up the embedding of each kept position: (T, L, C).
                pos_embed = pos_embed[unmasked_ids]
            visual_embeds = visual_embeds + pos_embed

        if len(set(split_sizes)) == 1:
            # Every sample has the same frame count: one batched pass.
            visual_embeds = visual_embeds.view(len(split_sizes), split_sizes[0], L, C)
            return self.batch_forward_no_spatial(visual_embeds)

        # Mixed frame counts: batch together the samples that share a frame
        # count, then put the results back in the original sample order.
        chunks = visual_embeds.split(split_sizes, dim=0)
        groups = {}
        for sample_idx, (chunk, t) in enumerate(zip(chunks, split_sizes)):
            indices, members = groups.setdefault(t, ([], []))
            indices.append(sample_idx)
            members.append(chunk)

        output = [None] * len(split_sizes)
        for indices, members in groups.values():
            group_output = self.batch_forward_no_spatial(torch.stack(members, dim=0))
            for row, sample_idx in enumerate(indices):
                output[sample_idx] = group_output[row]
        return torch.stack(output, dim=0)
|
|