| """ PantagruelUni model.""" |
| import math |
| import warnings |
| from typing import Optional, Tuple, Dict, List, Callable, Any, Union |
| from functools import partial |
| from dataclasses import dataclass |
|
|
| import numpy as np |
|
|
| import torch |
| import torch.nn.functional as F |
| from torch import nn |
| from torch import Tensor |
| from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss |
|
|
| from transformers import PreTrainedModel |
| from transformers.utils import ( |
| ModelOutput, TransformersKwargs, auto_docstring |
| ) |
| from transformers.activations import ACT2FN, gelu |
| from transformers.modeling_attn_mask_utils import ( |
| _prepare_4d_attention_mask, _prepare_4d_attention_mask_for_sdpa |
| ) |
| from transformers.utils.generic import can_return_tuple |
| from transformers.processing_utils import Unpack |
| from transformers.modeling_outputs import ( |
| MaskedLMOutput, |
| MultipleChoiceModelOutput, |
| QuestionAnsweringModelOutput, |
| SequenceClassifierOutput, |
| TokenClassifierOutput, |
| CausalLMOutput, |
| XVectorOutput, |
| ) |
from transformers.utils import is_peft_available, logging
| from .configuration_pantagruel_uni import ( |
| PantagruelUniConfig, |
| PantagruelModalityConfig, |
| PantagruelAudioConfig, |
| PantagruelTextConfig, |
| ) |
|
|
| from .utils_pantagruel_uni import ( |
| _learned_alibi_bias, |
| gather_unmasked, |
| gather_unmasked_mask, |
| masked_alibi, |
| random_masking, |
| get_alibi_bias, |
| compute_mask_indices, |
| index_put, |
| MaskInfo, MaskSeed, |
| make_positions, |
| ) |
|
|
logger = logging.get_logger(__name__)
|
| @dataclass |
| class PantagruelUniBaseModelOutput(ModelOutput): |
| last_hidden_state: Optional[torch.FloatTensor] = None |
| pooler_output: Optional[torch.FloatTensor] = None |
| local_features: Optional[torch.FloatTensor] = None |
| hidden_states: Optional[tuple[torch.FloatTensor, ...]] = None |
| attentions: Optional[tuple[torch.FloatTensor, ...]] = None |
|
|
|
|
| |
| class GradMultiply(torch.autograd.Function): |
| @staticmethod |
| def forward(ctx, x, scale): |
| ctx.scale = scale |
| res = x.new(x) |
| return res |
|
|
| @staticmethod |
| def backward(ctx, grad): |
| return grad * ctx.scale, None |
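
# Illustrative note: GradMultiply leaves the forward value untouched and only rescales
# the gradient on the way back, e.g.
#
#     feats = GradMultiply.apply(feats, 0.5)   # gradients into `feats` are halved
#
# This is how `local_grad_mult` damps feature-extractor gradients further below.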
|
|
|
|
| |
| class TransposeLast(nn.Module): |
    def __init__(self, deconstruct_idx=None, transpose_dim=-2):
        super().__init__()
        self.deconstruct_idx = deconstruct_idx
        self.transpose_dim = transpose_dim

    def forward(self, x):
        if self.deconstruct_idx is not None:
            x = x[self.deconstruct_idx]
        return x.transpose(self.transpose_dim, -1)
| |
|
|
| |
| class Fp32LayerNorm(nn.LayerNorm): |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
|
|
| def forward(self, input): |
| output = F.layer_norm( |
| input.float(), |
| self.normalized_shape, |
| self.weight.float() if self.weight is not None else None, |
| self.bias.float() if self.bias is not None else None, |
| self.eps, |
| ) |
| return output.type_as(input) |
|
|
|
|
| def LayerNorm(normalized_shape, eps=1e-5, elementwise_affine=True): |
| return torch.nn.LayerNorm(normalized_shape, eps, elementwise_affine) |
|
|
|
|
| |
| class Fp32GroupNorm(nn.GroupNorm): |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
|
|
| def forward(self, input): |
| output = F.group_norm( |
| input.float(), |
| self.num_groups, |
| self.weight.float() if self.weight is not None else None, |
| self.bias.float() if self.bias is not None else None, |
| self.eps, |
| ) |
| return output.type_as(input) |
|
|
|
|
| |
| class SamePad(nn.Module): |
| def __init__(self, kernel_size, causal=False): |
| super().__init__() |
| if causal: |
| self.remove = kernel_size - 1 |
| else: |
| self.remove = 1 if kernel_size % 2 == 0 else 0 |
|
|
| def forward(self, x): |
| if self.remove > 0: |
| x = x[:, :, : -self.remove] |
| return x |
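
# With a convolution padded by kernel_size // 2 (as in the positional encoder below),
# an even kernel produces one extra output frame, which SamePad trims so the output
# length matches the input; for odd kernels nothing is removed. In the causal case the
# last kernel_size - 1 frames are dropped instead.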
|
|
|
|
| |
| class ConvFeatureExtractionModel(nn.Module): |
| def __init__( |
| self, |
| conv_layers: List[Tuple[int, int, int]], |
| dropout: float = 0.0, |
| mode: str = "default", |
| conv_bias: bool = False, |
| ): |
| super().__init__() |
|
|
| assert mode in {"default", "layer_norm"} |
|
|
| def block( |
| n_in, |
| n_out, |
| k, |
| stride, |
| is_layer_norm=False, |
| is_group_norm=False, |
| conv_bias=False, |
| ): |
| def make_conv(): |
| conv = nn.Conv1d(n_in, n_out, k, stride=stride, bias=conv_bias) |
| nn.init.kaiming_normal_(conv.weight) |
| return conv |
|
|
| assert ( |
| is_layer_norm and is_group_norm |
| ) == False, "layer norm and group norm are exclusive" |
|
|
| if is_layer_norm: |
| return nn.Sequential( |
| make_conv(), |
| nn.Dropout(p=dropout), |
| nn.Sequential( |
| TransposeLast(), |
| Fp32LayerNorm(dim, elementwise_affine=True), |
| TransposeLast(), |
| ), |
| nn.GELU(), |
| ) |
| elif is_group_norm: |
| return nn.Sequential( |
| make_conv(), |
| nn.Dropout(p=dropout), |
| Fp32GroupNorm(dim, dim, affine=True), |
| nn.GELU(), |
| ) |
| else: |
| return nn.Sequential(make_conv(), nn.Dropout(p=dropout), nn.GELU()) |
|
|
| in_d = 1 |
| self.conv_layers = nn.ModuleList() |
| for i, cl in enumerate(conv_layers): |
| assert len(cl) == 3, "invalid conv definition: " + str(cl) |
| (dim, k, stride) = cl |
|
|
| self.conv_layers.append( |
| block( |
| in_d, |
| dim, |
| k, |
| stride, |
| is_layer_norm=mode == "layer_norm", |
| is_group_norm=mode == "default" and i == 0, |
| conv_bias=conv_bias, |
| ) |
| ) |
| in_d = dim |
|
|
| def forward(self, x): |
|
|
| |
| x = x.unsqueeze(1) |
|
|
| for conv in self.conv_layers: |
| x = conv(x) |
|
|
| return x |
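
# Hedged sketch (the layer spec below is the common wav2vec2-style default and only an
# assumption here): each (dim, kernel, stride) block shortens the time axis to
# floor((T - kernel) / stride) + 1 frames, so the overall downsampling can be computed
# without running the convolutions:
#
#     _EXAMPLE_SPEC = [(512, 10, 5)] + [(512, 3, 2)] * 4 + [(512, 2, 2)] * 2
#
#     def _num_frames(num_samples, spec=_EXAMPLE_SPEC):
#         t = num_samples
#         for _, k, s in spec:
#             t = (t - k) // s + 1
#         return t
#
#     # _num_frames(16000) == 49 frames for one second of 16 kHz audio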
| |
|
|
| |
| class AltAttention(nn.Module): |
| def __init__( |
| self, |
| dim, |
| num_heads=8, |
| qkv_bias=False, |
| qk_scale=None, |
| attn_drop=0.0, |
| proj_drop=0.0, |
| cosine_attention=False, |
| ): |
| super().__init__() |
| self.num_heads = num_heads |
| head_dim = dim // num_heads |
| self.scale = qk_scale or head_dim ** -0.5 |
|
|
| self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias) |
| |
| self.attn_drop = attn_drop |
| self.proj = nn.Linear(dim, dim) |
| |
| self.proj_drop = proj_drop |
|
|
| self.cosine_attention = cosine_attention |
|
|
| if cosine_attention: |
| self.logit_scale = nn.Parameter( |
| torch.log(10 * torch.ones((num_heads, 1, 1))), requires_grad=True |
| ) |
|
|
| def forward(self, x, padding_mask=None, alibi_bias=None, fast=True): |
| B, N, C = x.shape |
| qkv = ( |
| self.qkv(x) |
| .reshape(B, N, 3, self.num_heads, C // self.num_heads) |
| .permute(2, 0, 3, 1, 4) |
| ) |
| q, k, v = ( |
| qkv[0], |
| qkv[1], |
| qkv[2], |
| ) |
|
|
| dtype = q.dtype |
|
|
| attn = None |
| if not fast: |
| if self.cosine_attention: |
| |
| attn = F.normalize(q, dim=-1) @ F.normalize(k, dim=-1).transpose(-2, -1) |
| logit_scale = torch.clamp( |
| self.logit_scale, max=torch.log(torch.tensor(1.0 / 0.01)) |
| ).exp() |
| attn = attn * logit_scale |
| else: |
| q = q * self.scale |
| attn = q @ k.transpose(-2, -1) |
|
|
| if alibi_bias is not None: |
| attn = attn.type_as(alibi_bias) |
| attn[:, : alibi_bias.size(1)] += alibi_bias |
|
|
| if padding_mask is not None and padding_mask.any(): |
| attn = attn.masked_fill( |
| padding_mask.unsqueeze(1).unsqueeze(2).to(torch.bool), |
| float("-inf"), |
| ) |
|
|
| attn = attn.softmax(dim=-1, dtype=torch.float32).to(dtype=dtype) |
| |
| attn = F.dropout(attn, p=self.attn_drop if self.training else 0.0) |
| x = (attn @ v).transpose(1, 2) |
| else: |
| |
            assert not self.cosine_attention, "cosine attention is not supported in the fast path yet"
| |
| if padding_mask is not None and padding_mask.any(): |
| if alibi_bias is not None: |
| padding_mask = alibi_bias.masked_fill( |
| padding_mask.unsqueeze(1).unsqueeze(2).to(torch.bool), |
| float("-inf"), |
| ).to(dtype=dtype) |
                else:
                    # scaled_dot_product_attention adds float masks to the attention
                    # scores, so build an additive mask that is 0 for valid positions
                    # and -inf for padded ones.
                    padding_mask = torch.zeros(
                        B, 1, 1, N, dtype=dtype, device=x.device
                    ).masked_fill(
                        padding_mask.unsqueeze(1).unsqueeze(2).to(torch.bool),
                        float("-inf"),
                    )
| else: |
| if alibi_bias is not None: |
| padding_mask = alibi_bias.to(dtype=dtype) |
| else: |
| padding_mask = None |
|
|
| x = F.scaled_dot_product_attention(q, k, v, |
| attn_mask=padding_mask, |
| dropout_p=self.attn_drop if self.training else 0.0, |
| scale=self.scale).transpose(1, 2) |
|
|
| x = x.reshape(B, N, C) |
| x = self.proj(x) |
| x = F.dropout(x, p=self.proj_drop if self.training else 0.0) |
|
|
| return x, attn |
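
# Minimal illustration of the masking convention used in the fast path above: float
# attn_mask values are added to the attention scores by F.scaled_dot_product_attention,
# so padded positions get -inf and valid ones 0 (shapes below are arbitrary):
#
#     q = k = v = torch.randn(2, 4, 7, 16)                 # (B, heads, T, head_dim)
#     pad = torch.zeros(2, 7, dtype=torch.bool)
#     pad[:, -2:] = True                                    # last two steps are padding
#     bias = torch.zeros(2, 1, 1, 7).masked_fill(pad[:, None, None, :], float("-inf"))
#     out = F.scaled_dot_product_attention(q, k, v, attn_mask=bias)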
| |
|
|
| |
| class AltBlock(nn.Module): |
| def __init__( |
| self, |
| dim, |
| num_heads, |
| mlp_ratio=4.0, |
| qkv_bias=False, |
| qk_scale=None, |
| drop=0.0, |
| attn_drop=0.0, |
| mlp_drop=0.0, |
| post_mlp_drop=0.0, |
| drop_path=0.0, |
| act_layer=nn.GELU, |
| norm_layer=nn.LayerNorm, |
| layer_norm_first=True, |
| ffn_targets=False, |
| cosine_attention=False, |
| ): |
| super().__init__() |
|
|
| self.layer_norm_first = layer_norm_first |
| self.ffn_targets = ffn_targets |
|
|
| from timm.models.vision_transformer import DropPath, Mlp |
|
|
| self.norm1 = norm_layer(dim) |
| self.attn = AltAttention( |
| dim, |
| num_heads=num_heads, |
| qkv_bias=qkv_bias, |
| qk_scale=qk_scale, |
| attn_drop=attn_drop, |
| proj_drop=drop, |
| cosine_attention=cosine_attention, |
| ) |
|
|
| self.drop_path = DropPath(drop_path) if drop_path > 0.0 else nn.Identity() |
| self.norm2 = norm_layer(dim) |
| mlp_hidden_dim = int(dim * mlp_ratio) |
| self.mlp = Mlp( |
| in_features=dim, |
| hidden_features=mlp_hidden_dim, |
| act_layer=act_layer, |
| drop=mlp_drop, |
| ) |
| self.post_mlp_dropout = nn.Dropout(post_mlp_drop, inplace=False) |
|
|
| def forward(self, x, padding_mask=None, alibi_bias=None, fast=True): |
| if self.layer_norm_first: |
| _x, _attn = self.attn(self.norm1(x), padding_mask, alibi_bias, fast=fast) |
| x = x + self.drop_path(_x) |
| r = x = self.mlp(self.norm2(x)) |
| t = x |
| x = r + self.drop_path(self.post_mlp_dropout(x)) |
| if not self.ffn_targets: |
| t = x |
| else: |
| _x, _attn = self.attn(x, padding_mask, alibi_bias, fast=fast) |
| x = x + self.drop_path(_x) |
| r = x = self.norm1(x) |
| x = self.mlp(x) |
| t = x |
| x = self.norm2(r + self.drop_path(self.post_mlp_dropout(x))) |
| if not self.ffn_targets: |
| t = x |
|
|
| return x, t, _attn |
|
|
|
|
| |
| class BlockEncoder(nn.Module): |
| def __init__(self, blocks, norm_layer, layer_norm_first, layerdrop, dropout): |
| super().__init__() |
| self.blocks = blocks |
| self.norm = norm_layer |
| self.layer_norm_first = layer_norm_first |
| self.layerdrop = layerdrop |
| self.dropout = nn.Dropout(dropout, inplace=True) |
|
|
| def forward(self, x, padding_mask, alibi_bias, alibi_scale): |
| if self.norm is not None and not self.layer_norm_first: |
| x = self.norm(x) |
|
|
| x = self.dropout(x) |
|
|
| for i, blk in enumerate(self.blocks): |
| if ( |
| not self.training |
| or self.layerdrop == 0 |
| or (np.random.random() > self.layerdrop) |
| ): |
| ab = alibi_bias |
| if ab is not None and alibi_scale is not None: |
| scale = ( |
| alibi_scale[i] |
| if alibi_scale.size(0) > 1 |
| else alibi_scale.squeeze(0) |
| ) |
| ab = ab * scale.type_as(ab) |
| x, _, _ = blk(x, padding_mask, ab) |
|
|
| if self.norm is not None and self.layer_norm_first: |
| x = self.norm(x) |
|
|
| return x |
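
# Note: with layerdrop p, each block above is skipped independently with probability
# roughly p during training only; at evaluation time every block always runs.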
| |
|
|
| |
| class ModalitySpecificEncoder(nn.Module): |
| def __init__( |
| self, |
| modality_cfg: PantagruelModalityConfig, |
| embed_dim: int, |
| local_encoder: nn.Module, |
| project_features: nn.Module, |
| fixed_positional_encoder: Optional[nn.Module], |
| relative_positional_encoder: Optional[nn.Module], |
| context_encoder: nn.Module, |
| decoder: nn.Module, |
| get_alibi_bias: Optional[Callable[[int, int, str, str], torch.Tensor]], |
| ): |
| super().__init__() |
|
|
| self.modality_cfg = modality_cfg |
| self.local_encoder = local_encoder |
| self.project_features = project_features |
| self.fixed_positional_encoder = fixed_positional_encoder |
| self.relative_positional_encoder = relative_positional_encoder |
| self.context_encoder = context_encoder |
|
|
| self.decoder = None |
| self.get_alibi_bias = get_alibi_bias if modality_cfg.use_alibi_encoder else None |
|
|
| self.local_grad_mult = self.modality_cfg.local_grad_mult |
|
|
| self.extra_tokens = None |
| if modality_cfg.num_extra_tokens > 0: |
| self.extra_tokens = nn.Parameter( |
| torch.zeros(1, modality_cfg.num_extra_tokens, embed_dim) |
| ) |
| if not modality_cfg.init_extra_token_zero: |
| nn.init.normal_(self.extra_tokens) |
| elif self.extra_tokens.size(1) > 1: |
| nn.init.normal_(self.extra_tokens[:, 1:]) |
|
|
| self.alibi_scale = None |
| if self.get_alibi_bias is not None: |
| self.alibi_scale = nn.Parameter( |
| torch.full( |
| ( |
| (modality_cfg.prenet_depth + modality_cfg.model_depth) |
| if modality_cfg.learned_alibi_scale_per_layer |
| else 1, |
| 1, |
| self.modality_cfg.num_alibi_heads |
| if modality_cfg.learned_alibi_scale_per_head |
| else 1, |
| 1, |
| 1, |
| ), |
| modality_cfg.alibi_scale, |
| dtype=torch.float, |
| ), |
| requires_grad=modality_cfg.learned_alibi_scale, |
| ) |
|
|
| if modality_cfg.learned_alibi and self.get_alibi_bias is not None: |
| assert modality_cfg.alibi_max_pos is not None |
| alibi_bias = self.get_alibi_bias( |
| batch_size=1, |
| time_steps=modality_cfg.alibi_max_pos, |
| heads=modality_cfg.num_alibi_heads, |
| scale=1.0, |
| dtype=torch.float, |
| device="cpu", |
| ) |
| self.alibi_bias = nn.Parameter(alibi_bias) |
| self.get_alibi_bias = partial( |
| _learned_alibi_bias, alibi_bias=self.alibi_bias |
| ) |
|
|
| |
| def _freeze_parameters(self): |
| for param in self.parameters(): |
| param.requires_grad = False |
| self._requires_grad = False |
|
|
| def convert_padding_mask(self, x, padding_mask): |
| return padding_mask |
|
|
| def local_features(self, features): |
| if self.local_grad_mult > 0: |
| if self.local_grad_mult == 1.0: |
| x = self.local_encoder(features) |
| else: |
| x = GradMultiply.apply( |
| self.local_encoder(features), self.local_grad_mult |
| ) |
| else: |
| with torch.no_grad(): |
| x = self.local_encoder(features) |
|
|
| x = self.project_features(x) |
| return x |
| |
| def contextualized_features( |
| self, |
| x, |
| padding_mask, |
| mask, |
| remove_masked, |
| clone_batch: int = 1, |
| mask_seeds: Optional[torch.Tensor] = None, |
| precomputed_mask=None, |
| ): |
|
|
| if padding_mask is not None: |
| padding_mask = self.convert_padding_mask(x, padding_mask) |
|
|
| local_features = x |
| if mask and clone_batch == 1: |
| local_features = local_features.clone() |
|
|
| orig_B, orig_T, _ = x.shape |
| pre_mask_B = orig_B |
| mask_info = None |
|
|
| x_pos = None |
| if self.fixed_positional_encoder is not None: |
| x = x + self.fixed_positional_encoder(x, padding_mask) |
|
|
| if mask: |
| if clone_batch > 1: |
| x = x.repeat_interleave(clone_batch, 0) |
| if mask_seeds is not None: |
| clone_hash = [ |
| int(hash((mask_seeds.seed, ind)) % 1e10) |
| for ind in range(clone_batch - 1) |
| ] |
| clone_hash = torch.tensor([0] + clone_hash).long().view(1, -1) |
|
|
| id = mask_seeds.ids |
| id = id.repeat_interleave(clone_batch, 0) |
| id = id.view(-1, clone_batch) + clone_hash.to(id) |
| id = id.view(-1) |
| mask_seeds = MaskSeed( |
| seed=mask_seeds.seed, update=mask_seeds.update, ids=id |
| ) |
| if padding_mask is not None: |
| padding_mask = padding_mask.repeat_interleave(clone_batch, 0) |
|
|
| x, mask_info = self.compute_mask( |
| x, |
| padding_mask, |
| mask_seed=mask_seeds, |
| apply=self.relative_positional_encoder is not None or not remove_masked, |
| precomputed_mask=precomputed_mask, |
| ) |
|
|
| if self.relative_positional_encoder is not None: |
| x_pos = self.relative_positional_encoder(x) |
|
|
| masked_padding_mask = padding_mask |
| if mask and remove_masked: |
| x = mask_info.x_unmasked |
| if x_pos is not None: |
| x = x + gather_unmasked(x_pos, mask_info) |
|
|
| if padding_mask is not None and padding_mask.any(): |
| masked_padding_mask = gather_unmasked_mask(padding_mask, mask_info) |
| if not masked_padding_mask.any(): |
| masked_padding_mask = None |
| else: |
| masked_padding_mask = None |
|
|
| elif x_pos is not None: |
| x = x + x_pos |
|
|
| alibi_bias = None |
| alibi_scale = self.alibi_scale |
|
|
| if self.get_alibi_bias is not None: |
| alibi_bias = self.get_alibi_bias( |
| batch_size=pre_mask_B, |
| time_steps=orig_T, |
| heads=self.modality_cfg.num_alibi_heads, |
| dtype=torch.float32, |
| device=x.device, |
| ) |
|
|
| if alibi_scale is not None: |
| alibi_scale = alibi_scale.clamp_min(0) |
| if alibi_scale.size(0) == 1: |
| alibi_bias = alibi_bias * alibi_scale.squeeze(0).type_as(alibi_bias) |
| alibi_scale = None |
|
|
| if clone_batch > 1: |
| alibi_bias = alibi_bias.repeat_interleave(clone_batch, 0) |
|
|
| if mask_info is not None and remove_masked: |
| alibi_bias = masked_alibi(alibi_bias, mask_info) |
|
|
| if self.extra_tokens is not None: |
| num = self.extra_tokens.size(1) |
| x = torch.cat([self.extra_tokens.expand(x.size(0), -1, -1), x], dim=1) |
| if masked_padding_mask is not None: |
| |
| masked_padding_mask = F.pad(masked_padding_mask, (num, 0)) |
| if alibi_bias is not None: |
| |
| alibi_bias = F.pad(alibi_bias, (num, 0, num, 0)) |
|
|
| x = self.context_encoder( |
| x, |
| masked_padding_mask, |
| alibi_bias, |
| alibi_scale[: self.modality_cfg.prenet_depth] |
| if alibi_scale is not None |
| else None, |
| ) |
|
|
| return { |
| "x": x, |
| "local_features": local_features, |
| "padding_mask": masked_padding_mask, |
| "alibi_bias": alibi_bias, |
| "alibi_scale": alibi_scale[self.modality_cfg.prenet_depth :] |
| if alibi_scale is not None and alibi_scale.size(0) > 1 |
| else alibi_scale, |
| "encoder_mask": mask_info, |
| } |
|
|
| def forward( |
| self, |
| features, |
| padding_mask, |
| mask: bool, |
| remove_masked: bool, |
| clone_batch: int = 1, |
| mask_seeds: Optional[torch.Tensor] = None, |
| precomputed_mask=None, |
| ): |
| x = self.local_features(features) |
| return self.contextualized_features( |
| x, |
| padding_mask, |
| mask, |
| remove_masked, |
| clone_batch, |
| mask_seeds, |
| precomputed_mask, |
| ) |
| |
| def compute_mask( |
| self, |
| x, |
| padding_mask, |
| mask_seed: Optional[MaskSeed], |
| apply, |
| precomputed_mask, |
| ): |
| if precomputed_mask is not None: |
| mask = precomputed_mask |
| mask_info = self.make_maskinfo(x, mask) |
| else: |
| B, T, C = x.shape |
| cfg = self.modality_cfg |
|
|
| mask_prob = cfg.mask_prob |
|
|
| if ( |
| cfg.mask_prob_min is not None |
| and cfg.mask_prob_min >= 0 |
| and cfg.mask_prob_min < mask_prob |
| ): |
| mask_prob = np.random.uniform(cfg.mask_prob_min, mask_prob) |
|
|
| if mask_prob > 0: |
| if cfg.mask_length == 1: |
| mask_info = random_masking(x, mask_prob, mask_seed) |
| else: |
| if self.modality_cfg.inverse_mask: |
| mask_prob = 1 - mask_prob |
|
|
| mask = compute_mask_indices( |
| (B, T), |
| padding_mask, |
| mask_prob, |
| cfg.mask_length, |
| min_masks=1, |
| require_same_masks=True, |
| mask_dropout=cfg.mask_dropout, |
| add_masks=cfg.add_masks, |
| seed=mask_seed.seed if mask_seed is not None else None, |
| epoch=mask_seed.update if mask_seed is not None else None, |
| indices=mask_seed.ids if mask_seed is not None else None, |
| ) |
|
|
| mask = torch.from_numpy(mask).to(device=x.device) |
| if self.modality_cfg.inverse_mask: |
| mask = 1 - mask |
| mask_info = self.make_maskinfo(x, mask) |
| else: |
| mask_info = None |
|
|
| if apply: |
| x = self.apply_mask(x, mask_info) |
|
|
| return x, mask_info |
|
|
| def make_maskinfo(self, x, mask, shape=None): |
| if shape is None: |
| B, T, D = x.shape |
| else: |
| B, T, D = shape |
|
|
| mask = mask.to(torch.uint8) |
| ids_shuffle = mask.argsort(dim=1) |
| ids_restore = ids_shuffle.argsort(dim=1).unsqueeze(-1).expand(-1, -1, D) |
|
|
| len_keep = T - mask[0].sum() |
| if self.modality_cfg.keep_masked_pct > 0: |
| len_keep += round((T - int(len_keep)) * self.modality_cfg.keep_masked_pct) |
|
|
| ids_keep = ids_shuffle[:, :len_keep] |
|
|
| if shape is not None: |
| x_unmasked = None |
| else: |
| ids_keep = ids_keep.unsqueeze(-1).expand(-1, -1, D) |
| x_unmasked = torch.gather(x, dim=1, index=ids_keep) |
|
|
| mask_info = MaskInfo( |
| x_unmasked=x_unmasked, |
| mask=mask, |
| ids_restore=ids_restore, |
| ids_keep=ids_keep, |
| ) |
| return mask_info |
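
    # Worked example of make_maskinfo: for mask = [0, 1, 0, 1] (1 = masked), argsort puts
    # the kept indices first, e.g. ids_shuffle = [0, 2, 1, 3]; with len_keep = 2,
    # ids_keep = [0, 2] gathers the unmasked timesteps and ids_restore maps them back to
    # their original positions.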
|
|
| def apply_mask(self, x, mask_info): |
| cfg = self.modality_cfg |
| B, T, C = x.shape |
|
|
| if mask_info is not None: |
| mask = mask_info.mask |
| if cfg.encoder_zero_mask: |
| x = x * (1 - mask.type_as(x).unsqueeze(-1)) |
| else: |
| num_masks = mask.sum().item() |
| masks = x.new_empty(num_masks, x.size(-1)).normal_( |
| 0, cfg.mask_noise_std |
| ) |
| x = index_put(x, mask, masks) |
| if cfg.mask_channel_prob > 0: |
| mask_channel = compute_mask_indices( |
| (B, C), |
| None, |
| cfg.mask_channel_prob, |
| cfg.mask_channel_length, |
| ) |
| mask_channel = ( |
| torch.from_numpy(mask_channel) |
| .to(x.device) |
| .unsqueeze(1) |
| .expand(-1, T, -1) |
| ) |
| x = index_put(x, mask_channel, 0) |
| return x |
| |
|
|
| |
| class AudioEncoder(ModalitySpecificEncoder): |
|
|
| modality_cfg: PantagruelAudioConfig |
|
|
| def __init__( |
| self, |
| modality_cfg: PantagruelAudioConfig, |
| embed_dim: int, |
| make_block: Callable[[float], nn.ModuleList], |
| norm_layer: Callable[[int], nn.LayerNorm], |
| layer_norm_first: bool, |
| alibi_biases: Dict, |
| ): |
|
|
| self.feature_enc_layers = eval(modality_cfg.feature_encoder_spec) |
| feature_embed_dim = self.feature_enc_layers[-1][0] |
|
|
| local_encoder = ConvFeatureExtractionModel( |
| conv_layers=self.feature_enc_layers, |
| dropout=0.0, |
| mode=modality_cfg.extractor_mode, |
| conv_bias=False, |
| ) |
|
|
| project_features = nn.Sequential( |
| TransposeLast(), |
| nn.LayerNorm(feature_embed_dim), |
| nn.Linear(feature_embed_dim, embed_dim), |
| ) |
|
|
| num_pos_layers = modality_cfg.conv_pos_depth |
| k = max(3, modality_cfg.conv_pos_width // num_pos_layers) |
|
|
| positional_encoder = nn.Sequential( |
| TransposeLast(), |
| *[ |
| nn.Sequential( |
| nn.Conv1d( |
| embed_dim, |
| embed_dim, |
| kernel_size=k, |
| padding=k // 2, |
| groups=modality_cfg.conv_pos_groups, |
| ), |
| SamePad(k), |
| TransposeLast(), |
| LayerNorm(embed_dim, elementwise_affine=False), |
| TransposeLast(), |
| nn.GELU(), |
| ) |
| for _ in range(num_pos_layers) |
| ], |
| TransposeLast(), |
| ) |
|
|
| if modality_cfg.conv_pos_pre_ln: |
| positional_encoder = nn.Sequential(LayerNorm(embed_dim), positional_encoder) |
|
|
| dpr = np.linspace( |
| modality_cfg.start_drop_path_rate, |
| modality_cfg.end_drop_path_rate, |
| modality_cfg.prenet_depth, |
| ) |
| context_encoder = BlockEncoder( |
| nn.ModuleList(make_block(dpr[i]) for i in range(modality_cfg.prenet_depth)), |
| norm_layer(embed_dim) if not layer_norm_first else None, |
| layer_norm_first, |
| modality_cfg.prenet_layerdrop, |
| modality_cfg.prenet_dropout, |
| ) |
|
|
| decoder = None |
|
|
| alibi_bias_fn = partial(get_alibi_bias, alibi_biases=alibi_biases) |
|
|
| super().__init__( |
| modality_cfg=modality_cfg, |
| embed_dim=embed_dim, |
| local_encoder=local_encoder, |
| project_features=project_features, |
| fixed_positional_encoder=None, |
| relative_positional_encoder=positional_encoder, |
| context_encoder=context_encoder, |
| decoder=decoder, |
| get_alibi_bias=alibi_bias_fn, |
| ) |
|
|
| def convert_padding_mask(self, x, padding_mask): |
| def get_feat_extract_output_lengths(input_lengths: torch.LongTensor): |
| """ |
| Computes the output length of the convolutional layers |
| """ |
|
|
| def _conv_out_length(input_length, kernel_size, stride): |
| return torch.floor((input_length - kernel_size) / stride + 1) |
|
|
| for i in range(len(self.feature_enc_layers)): |
| input_lengths = _conv_out_length( |
| input_lengths, |
| self.feature_enc_layers[i][1], |
| self.feature_enc_layers[i][2], |
| ) |
|
|
| return input_lengths.to(torch.long) |
|
|
| if padding_mask is not None: |
| input_lengths = (1 - padding_mask.long()).sum(-1) |
| |
| output_lengths = get_feat_extract_output_lengths(input_lengths) |
|
|
| if padding_mask.any(): |
| padding_mask = torch.zeros(x.shape[:2], dtype=x.dtype, device=x.device) |
|
|
| |
| |
| padding_mask[ |
| ( |
| torch.arange(padding_mask.shape[0], device=padding_mask.device), |
| output_lengths - 1, |
| ) |
| ] = 1 |
| padding_mask = ( |
| 1 - padding_mask.flip([-1]).cumsum(-1).flip([-1]) |
| ).bool() |
| else: |
| padding_mask = torch.zeros( |
| x.shape[:2], dtype=torch.bool, device=x.device |
| ) |
|
|
| return padding_mask |
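
# Tiny worked example of the flip/cumsum/flip trick above: with 6 output frames and a
# valid length of 4, marking index 3 and accumulating from the right flags frames 0..3
# as valid and frames 4..5 as padding:
#
#     mark = torch.zeros(1, 6)
#     mark[(torch.arange(1), torch.tensor([4]) - 1)] = 1
#     padded = (1 - mark.flip([-1]).cumsum(-1).flip([-1])).bool()
#     # padded -> tensor([[False, False, False, False,  True,  True]])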
| |
|
|
| |
| class LearnedPositionalEmbedding(nn.Embedding): |
| """ |
| This module learns positional embeddings up to a fixed maximum size. |
| Padding ids are ignored by either offsetting based on padding_idx |
| or by setting padding_idx to None and ensuring that the appropriate |
| position ids are passed to the forward function. |
| """ |
|
|
| def __init__(self, num_embeddings: int, embedding_dim: int, padding_idx: int): |
| super().__init__(num_embeddings, embedding_dim, padding_idx) |
| self.onnx_trace = False |
| if self.padding_idx is not None: |
| self.max_positions = self.num_embeddings - self.padding_idx - 1 |
| else: |
| self.max_positions = self.num_embeddings |
|
|
| def forward( |
| self, |
| input: Tensor, |
| incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]] = None, |
| positions: Optional[Tensor] = None, |
| ): |
| """Input is expected to be of size [bsz x seqlen].""" |
| assert (positions is None) or ( |
| self.padding_idx is None |
| ), "If positions is pre-computed then padding_idx should not be set." |
|
|
| if positions is None: |
| if incremental_state is not None: |
| |
| |
| positions = torch.zeros( |
| (1, 1), device=input.device, dtype=input.dtype |
| ).fill_(int(self.padding_idx + input.size(1))) |
| else: |
| positions = make_positions( |
| input, self.padding_idx, onnx_trace=self.onnx_trace |
| ) |
| return F.embedding( |
| positions, |
| self.weight, |
| self.padding_idx, |
| self.max_norm, |
| self.norm_type, |
| self.scale_grad_by_freq, |
| self.sparse, |
| ) |
|
|
|
|
| |
| class SinusoidalPositionalEmbedding(nn.Module): |
| """This module produces sinusoidal positional embeddings of any length. |
| |
| Padding symbols are ignored. |
| """ |
|
|
| def __init__(self, embedding_dim, padding_idx, init_size=1024): |
| super().__init__() |
| self.embedding_dim = embedding_dim |
| self.padding_idx = padding_idx if padding_idx is not None else 0 |
| self.register_buffer("weights", SinusoidalPositionalEmbedding.get_embedding( |
| init_size, embedding_dim, padding_idx |
| ), persistent=False) |
| self.max_positions = int(1e5) |
| self.onnx_trace = False |
|
|
| def prepare_for_onnx_export_(self): |
| self.onnx_trace = True |
|
|
| def _load_from_state_dict(self, state_dict, prefix, *args, **kwargs): |
| |
| deprecated_keys = ["weights", "_float_tensor"] |
| for key in deprecated_keys: |
| if prefix + key in state_dict: |
| del state_dict[prefix + key] |
| super()._load_from_state_dict(state_dict, prefix, *args, **kwargs) |
|
|
| @staticmethod |
| def get_embedding( |
| num_embeddings: int, embedding_dim: int, padding_idx: Optional[int] = None |
| ): |
| """Build sinusoidal embeddings. |
| |
| This matches the implementation in tensor2tensor, but differs slightly |
| from the description in Section 3.5 of "Attention Is All You Need". |
| """ |
| half_dim = embedding_dim // 2 |
| emb = math.log(10000) / (half_dim - 1) |
| emb = torch.exp(torch.arange(half_dim, dtype=torch.float) * -emb) |
| emb = torch.arange(num_embeddings, dtype=torch.float).unsqueeze( |
| 1 |
| ) * emb.unsqueeze(0) |
| emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1).view( |
| num_embeddings, -1 |
| ) |
| if embedding_dim % 2 == 1: |
| |
| emb = torch.cat([emb, torch.zeros(num_embeddings, 1)], dim=1) |
| if padding_idx is not None: |
| emb[padding_idx, :] = 0 |
| return emb |
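
    # Shape note: get_embedding(n, d) returns an (n, d) table whose first d // 2 columns
    # are sines and the remaining columns cosines of geometrically spaced frequencies
    # (odd d gets one extra zero column), with the row at padding_idx zeroed out.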
|
|
| def forward( |
| self, |
| input, |
| incremental_state: Optional[Any] = None, |
| timestep: Optional[Tensor] = None, |
| positions: Optional[Any] = None, |
| ): |
| """Input is expected to be of size [bsz x seqlen].""" |
| bspair = torch.onnx.operators.shape_as_tensor(input) |
| bsz, seq_len = bspair[0], bspair[1] |
| max_pos = self.padding_idx + 1 + seq_len |
| if max_pos > self.weights.size(0): |
| |
| self.weights = SinusoidalPositionalEmbedding.get_embedding( |
| max_pos, self.embedding_dim, self.padding_idx |
| ).to(self.weights) |
|
|
| if incremental_state is not None: |
| |
| pos = timestep.view(-1)[0] + 1 if timestep is not None else seq_len |
| if self.onnx_trace: |
| return ( |
| self.weights.index_select(index=self.padding_idx + pos, dim=0) |
| .unsqueeze(1) |
| .repeat(bsz, 1, 1) |
| ) |
| return self.weights[self.padding_idx + pos, :].expand(bsz, 1, -1) |
|
|
| positions = make_positions( |
| input, self.padding_idx, onnx_trace=self.onnx_trace |
| ) |
| if self.onnx_trace: |
| flat_embeddings = self.weights.detach().index_select(0, positions.view(-1)) |
| embedding_shape = torch.cat( |
| (bsz.view(1), seq_len.view(1), torch.tensor([-1], dtype=torch.long)) |
| ) |
| embeddings = torch.onnx.operators.reshape_from_tensor_shape( |
| flat_embeddings, embedding_shape |
| ) |
| return embeddings |
| return ( |
| self.weights.index_select(0, positions.view(-1)) |
| .view(bsz, seq_len, -1) |
| .detach() |
| ) |
|
|
|
|
| |
| def PositionalEmbedding( |
| num_embeddings: int, |
| embedding_dim: int, |
| padding_idx: int, |
| learned: bool = False, |
| ): |
| if learned: |
| |
| |
| |
| |
| if padding_idx is not None: |
| num_embeddings = num_embeddings + padding_idx + 1 |
| m = LearnedPositionalEmbedding(num_embeddings, embedding_dim, padding_idx) |
| nn.init.normal_(m.weight, mean=0, std=embedding_dim**-0.5) |
| if padding_idx is not None: |
| nn.init.constant_(m.weight[padding_idx], 0) |
| else: |
| m = SinusoidalPositionalEmbedding( |
| embedding_dim, |
| padding_idx, |
| init_size=num_embeddings + padding_idx + 1, |
| ) |
| return m |
|
|
|
|
| |
| class TextLocalEncoder(nn.Module): |
| def __init__( |
| self, |
| vocab_size, |
| embed_dim, |
| max_source_positions, |
| pad_idx, |
| no_scale_embedding, |
| layernorm_embedding, |
| dropout, |
| no_token_positional_embeddings, |
| learned_pos, |
| ): |
| super().__init__() |
| self.pad_idx = pad_idx |
| self.dropout_module = nn.Dropout(dropout) |
|
|
| self.embed_tokens = nn.Embedding(vocab_size, embed_dim, pad_idx) |
| self.embed_scale = 1.0 if no_scale_embedding else math.sqrt(embed_dim) |
| self.embed_positions = ( |
| PositionalEmbedding( |
| max_source_positions, |
| embed_dim, |
| pad_idx, |
| learned=learned_pos, |
| ) |
| if not no_token_positional_embeddings |
| else None |
| ) |
|
|
| self.layernorm_embedding = None |
| if layernorm_embedding: |
| self.layernorm_embedding = LayerNorm(embed_dim) |
|
|
| def forward(self, src_tokens): |
| x = self.embed_scale * self.embed_tokens(src_tokens) |
| if self.embed_positions is not None: |
| x = x + self.embed_positions(src_tokens) |
|
|
| if self.layernorm_embedding is not None: |
| x = self.layernorm_embedding(x) |
| x = self.dropout_module(x) |
| return x |
| |
|
|
| class TextEncoder(ModalitySpecificEncoder): |
|
|
| modality_cfg: PantagruelTextConfig |
|
|
| def __init__( |
| self, |
| modality_cfg: PantagruelTextConfig, |
| embed_dim: int, |
| make_block: Callable[[float], nn.ModuleList], |
| norm_layer: Callable[[int], nn.LayerNorm], |
| layer_norm_first: bool, |
| alibi_biases: Dict, |
| ): |
| self.pad_idx = modality_cfg.pad_token_id |
| self.vocab_size = modality_cfg.vocab_size |
|
|
| local_encoder = TextLocalEncoder( |
| vocab_size=self.vocab_size, |
| embed_dim=embed_dim, |
| max_source_positions=modality_cfg.max_source_positions, |
| pad_idx=self.pad_idx, |
| no_scale_embedding=modality_cfg.no_scale_embedding, |
| layernorm_embedding=modality_cfg.layernorm_embedding, |
| dropout=modality_cfg.dropout, |
| no_token_positional_embeddings=modality_cfg.no_token_positional_embeddings, |
| learned_pos=modality_cfg.learned_pos, |
| ) |
| dpr = np.linspace( |
| modality_cfg.start_drop_path_rate, |
| modality_cfg.end_drop_path_rate, |
| modality_cfg.prenet_depth, |
| ) |
| context_encoder = BlockEncoder( |
| nn.ModuleList(make_block(dpr[i]) for i in range(modality_cfg.prenet_depth)), |
| norm_layer(embed_dim) |
| if not layer_norm_first and modality_cfg.prenet_depth > 0 |
| else None, |
| layer_norm_first, |
| modality_cfg.prenet_layerdrop, |
| modality_cfg.prenet_dropout if modality_cfg.prenet_depth > 0 else 0.0, |
| ) |
| decoder = None |
|
|
| alibi_bias_fn = partial(get_alibi_bias, alibi_biases=alibi_biases) |
|
|
| super().__init__( |
| modality_cfg=modality_cfg, |
| embed_dim=embed_dim, |
| local_encoder=local_encoder, |
| project_features=nn.Identity(), |
| fixed_positional_encoder=None, |
| relative_positional_encoder=None, |
| context_encoder=context_encoder, |
| decoder=decoder, |
| get_alibi_bias=alibi_bias_fn, |
| ) |
|
|
| def convert_padding_mask(self, x, padding_mask): |
| if padding_mask is None or padding_mask.size(1) == x.size(1): |
| return padding_mask |
|
|
| diff = self.downsample - padding_mask.size(1) % self.downsample |
| if 0 < diff < self.downsample: |
| padding_mask = F.pad(padding_mask, (0, diff), value=True) |
|
|
| padding_mask = padding_mask.view(padding_mask.size(0), -1, self.downsample) |
| padding_mask = padding_mask.all(-1) |
| if padding_mask.size(1) > x.size(1): |
| padding_mask = padding_mask[:, : x.size(1)] |
|
|
| assert x.size(1) == padding_mask.size( |
| 1 |
| ), f"{x.size(1), padding_mask.size(1), diff, self.downsample}" |
|
|
| return padding_mask |
|
|
|
|
| |
| class PantagruelUniTextPooler(nn.Module): |
| def __init__(self, config): |
| super().__init__() |
| self.dense = nn.Linear(config.embed_dim, config.embed_dim) |
| self.activation = nn.Tanh() |
|
|
| def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: |
| |
| |
| first_token_tensor = hidden_states[:, 0] |
| pooled_output = self.dense(first_token_tensor) |
| pooled_output = self.activation(pooled_output) |
| return pooled_output |
|
|
|
|
| |
| class AMSoftmaxLoss(nn.Module): |
| def __init__(self, input_dim, num_labels, scale=30.0, margin=0.4): |
| super().__init__() |
| self.scale = scale |
| self.margin = margin |
| self.num_labels = num_labels |
| self.weight = nn.Parameter(torch.randn(input_dim, num_labels), requires_grad=True) |
| self.loss = nn.CrossEntropyLoss() |
|
|
| def forward(self, hidden_states, labels): |
| labels = labels.flatten() |
| weight = nn.functional.normalize(self.weight, dim=0) |
| hidden_states = nn.functional.normalize(hidden_states, dim=1) |
| cos_theta = torch.mm(hidden_states, weight) |
| psi = cos_theta - self.margin |
|
|
| onehot = nn.functional.one_hot(labels, self.num_labels) |
| logits = self.scale * torch.where(onehot.bool(), psi, cos_theta) |
| loss = self.loss(logits, labels) |
|
|
| return loss |
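
# Illustrative usage sketch (shapes and numbers are arbitrary): the margin is subtracted
# from the target-class cosine only, then everything is scaled before cross-entropy.
#
#     criterion = AMSoftmaxLoss(input_dim=256, num_labels=10)
#     embeddings = torch.randn(8, 256)           # pooled utterance embeddings
#     labels = torch.randint(0, 10, (8,))
#     loss = criterion(embeddings, labels)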
|
|
|
|
| |
| class TDNNLayer(nn.Module): |
| def __init__(self, config, layer_id=0): |
| super().__init__() |
| self.in_conv_dim = config.tdnn_dim[layer_id - 1] if layer_id > 0 else config.tdnn_dim[layer_id] |
| self.out_conv_dim = config.tdnn_dim[layer_id] |
| self.kernel_size = config.tdnn_kernel[layer_id] |
| self.dilation = config.tdnn_dilation[layer_id] |
|
|
| self.kernel = nn.Linear(self.in_conv_dim * self.kernel_size, self.out_conv_dim) |
| self.activation = nn.ReLU() |
|
|
| def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: |
| if is_peft_available(): |
| from peft.tuners.lora import LoraLayer |
|
|
| if is_peft_available(): |
| if isinstance(self.kernel, LoraLayer): |
| warnings.warn( |
| "Detected LoRA on TDNNLayer. LoRA weights won't be applied due to optimization. " |
| "You should exclude TDNNLayer from LoRA's target modules.", |
| ) |
|
|
| |
| hidden_states = hidden_states.transpose(1, 2) |
| weight = self.kernel.weight.view(self.out_conv_dim, self.kernel_size, self.in_conv_dim).transpose(1, 2) |
| hidden_states = nn.functional.conv1d(hidden_states, weight, self.kernel.bias, dilation=self.dilation) |
| hidden_states = hidden_states.transpose(1, 2) |
|
|
| hidden_states = self.activation(hidden_states) |
| return hidden_states |
|
|
|
|
| @auto_docstring |
| class PantagruelUniPreTrainedModel(PreTrainedModel): |
| config_class = PantagruelUniConfig |
| base_model_prefix = "pantagruel_uni" |
|
|
| |
| |
| def _init_weights(self, module): |
| """Initialize the weights""" |
|
|
| def normal_(data): |
| |
| |
| if not data.is_meta: |
| data.copy_(data.cpu().normal_(mean=0.0, std=0.02).to(data.device)) |
| return data |
|
|
| def _init(module): |
| if isinstance(module, nn.Linear): |
| normal_(module.weight.data) |
| if module.bias is not None: |
| module.bias.data.zero_() |
| if isinstance(module, nn.Embedding): |
| normal_(module.weight.data) |
| if module.padding_idx is not None: |
| module.weight.data[module.padding_idx].zero_() |
| if isinstance(module, AltBlock): |
| normal_(module.attn.proj.weight.data) |
| |
| if isinstance(module, (nn.LayerNorm, nn.GroupNorm)): |
| if module.bias is not None: |
| module.bias.data.zero_() |
| if module.weight is not None: |
| module.weight.data.fill_(1.0) |
| if isinstance(module, nn.Conv1d): |
| nn.init.kaiming_normal_(module.weight) |
| if module.bias is not None: |
| k = math.sqrt(module.groups / (module.in_channels * module.kernel_size[0])) |
| nn.init.uniform_(module.bias, a=-k, b=k) |
|
|
| if isinstance(module, nn.ModuleList): |
| for _, mod in enumerate(module): |
| _init(mod) |
| else: |
| _init(module) |
|
|
| def _get_feat_extract_output_lengths( |
| self, input_lengths: Union[torch.LongTensor, int], add_adapter: Optional[bool] = None |
| ): |
| """ |
| Computes the output length of the convolutional layers |
| """ |
|
|
| add_adapter = self.config.modalities.audio.add_adapter if add_adapter is None else add_adapter |
|
|
| def _conv_out_length(input_length, kernel_size, stride): |
| |
| |
| return torch.div(input_length - kernel_size, stride, rounding_mode="floor") + 1 |
|
|
| for kernel_size, stride in zip(self.config.conv_kernel, self.config.conv_stride): |
| input_lengths = _conv_out_length(input_lengths, kernel_size, stride) |
|
|
| if add_adapter: |
| for _ in range(self.config.num_adapter_layers): |
| input_lengths = _conv_out_length(input_lengths, 1, self.config.adapter_stride) |
|
|
| return input_lengths |
|
|
| def _get_feature_vector_attention_mask( |
| self, feature_vector_length: int, attention_mask: torch.LongTensor, add_adapter=None |
| ): |
| |
| |
| non_padded_lengths = attention_mask.cumsum(dim=-1)[:, -1] |
|
|
| output_lengths = self._get_feat_extract_output_lengths(non_padded_lengths, add_adapter=add_adapter) |
| output_lengths = output_lengths.to(torch.long) |
|
|
| batch_size = attention_mask.shape[0] |
|
|
| attention_mask = torch.zeros( |
| (batch_size, feature_vector_length), dtype=attention_mask.dtype, device=attention_mask.device |
| ) |
| |
| attention_mask[(torch.arange(attention_mask.shape[0], device=attention_mask.device), output_lengths - 1)] = 1 |
| attention_mask = attention_mask.flip([-1]).cumsum(-1).flip([-1]).bool() |
| return attention_mask |
| |
|
|
| @auto_docstring |
| class PantagruelUniModel(PantagruelUniPreTrainedModel): |
|
|
| def __init__( |
| self, config: PantagruelUniConfig, add_pooling_layer: bool = True |
| ): |
| r""" |
| add_pooling_layer (bool, *optional*, defaults to `True`): |
| Whether to add a pooling layer |
| """ |
| super().__init__(config) |
| self.config = config |
| modalities_cfg = config.modalities |
| self.modalities = [config.supported_modality] |
|
|
| make_layer_norm = partial( |
| nn.LayerNorm, eps=config.norm_eps, elementwise_affine=config.norm_affine |
| ) |
|
|
| def make_block(drop_path, dim=None, heads=None): |
| return AltBlock( |
| config.embed_dim if dim is None else dim, |
| config.num_heads if heads is None else heads, |
| config.mlp_ratio, |
| qkv_bias=True, |
| drop=config.encoder_dropout, |
| attn_drop=config.attention_dropout, |
| mlp_drop=config.activation_dropout, |
| post_mlp_drop=config.post_mlp_drop, |
| drop_path=drop_path, |
| norm_layer=make_layer_norm, |
| layer_norm_first=config.layer_norm_first, |
| ffn_targets=not config.end_of_block_targets, |
| ) |
| |
| self.alibi_biases = {} |
| self.modality_encoders = nn.ModuleDict() |
| for mod in self.modalities: |
| mod_cfg = getattr(modalities_cfg, mod.lower()) |
| enc = self.make_modality_encoder( |
| mod_cfg, |
| config.embed_dim, |
| make_block, |
| make_layer_norm, |
| config.layer_norm_first, |
| self.alibi_biases, |
| ) |
| self.modality_encoders[mod] = enc |
|
|
| self.dropout_input = nn.Dropout(config.dropout_input) |
|
|
| dpr = np.linspace(config.start_drop_path_rate, config.end_drop_path_rate, config.depth) |
|
|
| self.blocks = nn.ModuleList([make_block(dpr[i]) for i in range(config.depth)]) |
|
|
| self.text_pooler = None |
| if add_pooling_layer and config.supported_modality == "TEXT": |
| self.text_pooler = PantagruelUniTextPooler(config) |
|
|
| self.norm = None |
| if config.layer_norm_first: |
| self.norm = make_layer_norm(config.embed_dim) |
|
|
| self.num_updates = 0 |
|
|
| |
| self.post_init() |
|
|
| def get_input_embeddings(self): |
| if "TEXT" in self.modality_encoders: |
| return self.modality_encoders["TEXT"].local_encoder.embed_tokens |
|
|
| def set_input_embeddings(self, value): |
| if "TEXT" in self.modality_encoders: |
| self.modality_encoders["TEXT"].local_encoder.embed_tokens = value |
|
|
| def freeze_feature_extractor(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameters will |
| not be updated during training. |
| """ |
| warnings.warn( |
| "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. " |
| "Please use the equivalent `freeze_feature_encoder` method instead.", |
| FutureWarning, |
| ) |
| self.freeze_feature_encoder() |
|
|
| def freeze_feature_encoder(self): |
| """ |
        Calling this function will disable the gradient computation for the feature encoder so that its parameters will
| not be updated during training. |
| """ |
| for mod in self.modalities: |
| self.modality_encoders[mod]._freeze_parameters() |
|
|
| def freeze_base_model(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameter will |
| not be updated during training. |
| """ |
| for mod in self.modalities: |
| self.modality_encoders[mod]._freeze_parameters() |
| for block in self.blocks: |
| for p in block.parameters(): |
| p.requires_grad = False |
|
|
| def make_modality_encoder( |
| self, |
| cfg: PantagruelModalityConfig, |
| embed_dim: int, |
| make_block: Callable[[float], nn.ModuleList], |
| norm_layer: Callable[[int], nn.LayerNorm], |
| layer_norm_first: bool, |
| alibi_biases, |
| ) -> ModalitySpecificEncoder: |
| if cfg.type == "AUDIO": |
| enc_cls = AudioEncoder |
| elif cfg.type == "TEXT": |
| enc_cls = TextEncoder |
| else: |
| raise Exception(f"unsupported modality {cfg.type}") |
|
|
| return enc_cls( |
| cfg, |
| embed_dim, |
| make_block, |
| norm_layer, |
| layer_norm_first, |
| alibi_biases, |
| ) |
| |
| def forward( |
| self, |
| input_values=None, |
| input_ids=None, |
| attention_mask=None, |
| padding_mask=None, |
| mask=False, |
| mode=None, |
| output_hidden_states=True, |
| output_attn_weights=False, |
| return_dict=True, |
| ) -> Union[Tuple, PantagruelUniBaseModelOutput]: |
| r""" |
| Performs a forward pass of the model for either audio or text inputs. |
| |
| The modality is automatically inferred if `mode` is not provided: |
| `"TEXT"` is used when `input_ids` is specified, otherwise `"AUDIO"`. |
| |
| Args: |
| input_values (`torch.FloatTensor`, *optional*): |
| Audio input values of shape `(batch_size, sequence_length)` |
                containing *normalized* audio samples.
| Required when operating in `"AUDIO"` mode. |
| |
| input_ids (`torch.LongTensor`, *optional*): |
| Tokenized text input IDs of shape `(batch_size, sequence_length)`. |
| Required when operating in `"TEXT"` mode. |
| |
| attention_mask (`torch.LongTensor`, *optional*): |
| Attention mask for text inputs, with values in `{0, 1}`: |
| - `1` for tokens that should be attended to, |
| - `0` for tokens that should be masked. |
| If provided and `padding_mask` is `None`, it will be converted internally |
| to a padding mask. |
| |
| padding_mask (`torch.BoolTensor` or `torch.LongTensor`, *optional*): |
| Padding mask indicating which positions are padded: |
| - `1` (or `True`) for padded positions (not attended to), |
| - `0` (or `False`) for non-padded positions. |
| If not provided and `attention_mask` is given, this is inferred as |
| the logical negation of `attention_mask`. |
| |
| mask (`bool`, *optional*, defaults to `False`): |
| Whether to apply input masking. |
| |
| mode (`str`, *optional*): |
| Explicitly specifies the input modality. Supported values are |
| `"TEXT"` and `"AUDIO"`. If `None`, the mode is inferred from the |
| provided inputs. |
| |
| output_hidden_states (`bool`, *optional*, defaults to `True`): |
| Whether to return the hidden states of all layers. |
| |
| output_attn_weights (`bool`, *optional*, defaults to `False`): |
| Whether to return attention weights. |
| |
| return_dict (`bool`, *optional*, defaults to `True`): |
| Whether to return a [`ModelOutput`] instead of a plain tuple. |
| |
| Returns: |
| [`ModelOutput`] or `tuple`: |
| The model outputs. If `return_dict=True`, a [`ModelOutput`] is returned |
| containing (depending on configuration) the final hidden states, |
| optional hidden states from all layers, and optional attention weights. |
| If `return_dict=False`, a tuple is returned with the same contents in |
| a fixed order. |
| """ |
|
|
| if mode is None: |
| mode = "TEXT" if input_ids is not None else "AUDIO" |
|
|
| if padding_mask is None and attention_mask is not None: |
| padding_mask = ~attention_mask.bool() |
|
|
| feature_extractor = self.modality_encoders[mode] |
| extractor_out = feature_extractor( |
| input_ids if input_ids is not None else input_values, |
| padding_mask, |
| mask, |
| remove_masked=False, |
| clone_batch=1, |
| mask_seeds=None, |
| precomputed_mask=None, |
| ) |
| x = extractor_out["x"] |
| local_features = x |
|
|
| |
| masked_padding_mask = extractor_out["padding_mask"] |
| masked_alibi_bias = extractor_out.get("alibi_bias", None) |
| alibi_scale = extractor_out.get("alibi_scale", None) |
|
|
| if self.dropout_input is not None: |
| x = self.dropout_input(x) |
|
|
| layer_results = [] |
| attn_weights = [] |
| for i, blk in enumerate(self.blocks): |
| if ( |
| not self.training |
| or self.config.layerdrop == 0 |
| or (np.random.random() > self.config.layerdrop) |
| ): |
| ab = masked_alibi_bias |
| if ab is not None and alibi_scale is not None: |
| scale = ( |
| alibi_scale[i] |
| if alibi_scale.size(0) > 1 |
| else alibi_scale.squeeze(0) |
| ) |
| ab = ab * scale.type_as(ab) |
|
|
| x, lr, _attn = blk( |
| x, |
| padding_mask=masked_padding_mask, |
| alibi_bias=ab, |
| fast=not output_attn_weights, |
| ) |
| layer_results.append(lr) |
| attn_weights.append(_attn) |
|
|
| if self.norm is not None: |
| x = self.norm(x) |
|
|
| x = x[:, feature_extractor.modality_cfg.num_extra_tokens :] |
| if masked_padding_mask is not None: |
| masked_padding_mask = masked_padding_mask[ |
| :, feature_extractor.modality_cfg.num_extra_tokens : |
| ] |
|
|
| txt_pooled_output = ( |
| self.text_pooler(x) if self.text_pooler is not None else None |
| ) |
|
|
| if not return_dict: |
| return tuple( |
| v |
| for v in [ |
| x, |
| txt_pooled_output, |
| local_features, |
| layer_results, |
| attn_weights, |
| ] |
| if v is not None |
| ) |
| |
| return PantagruelUniBaseModelOutput( |
| last_hidden_state=x, |
| pooler_output=txt_pooled_output, |
| local_features=local_features, |
| hidden_states=layer_results if output_hidden_states else None, |
| attentions=attn_weights if output_attn_weights else None, |
| ) |
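
# Hedged usage sketch for the base model; the checkpoint path below is a placeholder and
# `trust_remote_code=True` is assumed because this is a custom architecture:
#
#     from transformers import AutoModel, AutoTokenizer
#
#     tokenizer = AutoTokenizer.from_pretrained("path/to/pantagruel-uni-text")
#     model = AutoModel.from_pretrained("path/to/pantagruel-uni-text", trust_remote_code=True)
#     enc = tokenizer("Bonjour le monde", return_tensors="pt")
#     out = model(input_ids=enc["input_ids"], attention_mask=enc["attention_mask"], mode="TEXT")
#     out.last_hidden_state.shape                  # (batch, seq_len, embed_dim)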
|
|
|
|
| class PantagruelTextLMHead(nn.Module): |
| """PantagruelText Head for masked language modeling.""" |
|
|
| def __init__(self, config): |
| super().__init__() |
| self.dense = nn.Linear(config.embed_dim, config.embed_dim) |
| self.layer_norm = nn.LayerNorm(config.embed_dim, eps=config.norm_eps) |
|
|
| self.decoder = nn.Linear(config.embed_dim, config.modalities.text.vocab_size) |
| self.bias = nn.Parameter(torch.zeros(config.modalities.text.vocab_size)) |
| self.decoder.bias = self.bias |
|
|
| def forward(self, features, **kwargs): |
| x = self.dense(features) |
| x = gelu(x) |
| x = self.layer_norm(x) |
|
|
| |
| x = self.decoder(x) |
|
|
| return x |
|
|
| def _tie_weights(self): |
| |
| |
| if self.decoder.bias.device.type == "meta": |
| self.decoder.bias = self.bias |
| else: |
| self.bias = self.decoder.bias |
|
|
|
|
| class PantagruelTextClassificationHead(nn.Module): |
| """Head for sentence-level classification tasks.""" |
|
|
| def __init__(self, config): |
| super().__init__() |
| self.dense = nn.Linear(config.embed_dim, config.embed_dim) |
| classifier_dropout = ( |
| config.classifier_dropout if config.classifier_dropout is not None else config.encoder_dropout |
| ) |
| self.dropout = nn.Dropout(classifier_dropout) |
| self.out_proj = nn.Linear(config.embed_dim, config.num_labels) |
|
|
| def forward(self, features, **kwargs): |
| x = features[:, 0, :] |
| x = self.dropout(x) |
| x = self.dense(x) |
| x = torch.tanh(x) |
| x = self.dropout(x) |
| x = self.out_proj(x) |
| return x |
|
|
|
|
| @auto_docstring |
| class PantagruelUniForMaskedLM(PantagruelUniPreTrainedModel): |
| |
|
|
| def __init__(self, config): |
| super().__init__(config) |
|
|
| if config.is_decoder: |
| logger.warning( |
| "If you want to use `PantagruelTextForMaskedLM` make sure `config.is_decoder=False` for " |
| "bi-directional self-attention." |
| ) |
|
|
| self.pantagruel_uni = PantagruelUniModel(config, add_pooling_layer=False) |
| self.lm_head = PantagruelTextLMHead(config) |
|
|
| |
| self.post_init() |
|
|
| def get_output_embeddings(self): |
| return self.lm_head.decoder |
|
|
| def set_output_embeddings(self, new_embeddings): |
| self.lm_head.decoder = new_embeddings |
|
|
| @can_return_tuple |
| @auto_docstring |
| def forward( |
| self, |
| input_ids: Optional[torch.LongTensor] = None, |
| attention_mask: Optional[torch.FloatTensor] = None, |
| padding_mask: Optional[torch.FloatTensor] = None, |
| labels: Optional[torch.LongTensor] = None, |
| **kwargs: Unpack[TransformersKwargs], |
| ) -> Union[tuple, MaskedLMOutput]: |
| r""" |
| labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): |
| Labels for computing the masked language modeling loss. Indices should be in `[-100, 0, ..., |
| config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are ignored (masked), the |
| loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]` |
| """ |
| outputs = self.pantagruel_uni( |
| input_ids=input_ids, |
| attention_mask=attention_mask, |
| padding_mask=padding_mask, |
| mask=False, |
| mode="TEXT", |
| return_dict=True, |
| ) |
        sequence_output = outputs.last_hidden_state
| prediction_scores = self.lm_head(sequence_output) |
|
|
| masked_lm_loss = None |
| if labels is not None: |
| loss_fct = CrossEntropyLoss() |
|
|
| labels = labels.to(prediction_scores.device) |
            masked_lm_loss = loss_fct(
                prediction_scores.view(-1, prediction_scores.size(-1)), labels.view(-1)
            )
|
|
| return MaskedLMOutput( |
| loss=masked_lm_loss, |
| logits=prediction_scores, |
            hidden_states=outputs.hidden_states,
| attentions=outputs.attentions, |
| ) |
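
# Continuing the hedged sketch above for the MLM head (the checkpoint path and mask token
# string are placeholders that depend on the released tokenizer):
#
#     mlm = PantagruelUniForMaskedLM.from_pretrained("path/to/pantagruel-uni-text")
#     enc = tokenizer("Le chat <mask> sur le tapis.", return_tensors="pt")
#     logits = mlm(input_ids=enc["input_ids"], attention_mask=enc["attention_mask"]).logits
#     mask_pos = (enc["input_ids"] == tokenizer.mask_token_id).nonzero(as_tuple=True)
#     top5 = logits[mask_pos].topk(5, dim=-1).indices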
|
|
|
|
| _HIDDEN_STATES_START_POSITION = 2 |
|
|
|
|
| @auto_docstring( |
| custom_intro=""" |
| PantagruelUniModel with a sequence classification or regression head on top (a linear layer applied to a pooled representation of the sequence). |
| This model supports text and audio modalities. The classification head and internal processing are selected automatically based on the configuration. |
| """ |
| ) |
| class PantagruelUniForSequenceClassification(PantagruelUniPreTrainedModel): |
| def __init__(self, config): |
| super().__init__(config) |
| self.num_labels = config.num_labels |
| self.config = config |
| self.pantagruel_uni = PantagruelUniModel(config, add_pooling_layer=False) |
|
|
| if config.supported_modality == "TEXT": |
| logger.info("Initializing PantagruelUniForSequenceClassification for TEXT") |
| self.classifier = PantagruelTextClassificationHead(config) |
| elif config.supported_modality == "AUDIO": |
| logger.info("Initializing PantagruelUniForSequenceClassification for AUDIO") |
| num_layers = config.num_hidden_layers + 1 |
| if config.modalities.audio.use_weighted_layer_sum: |
| self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers) |
| self.projector = nn.Linear(config.hidden_size, config.modalities.audio.classifier_proj_size) |
| self.classifier = nn.Linear(config.modalities.audio.classifier_proj_size, config.num_labels) |
|
|
| |
| self.post_init() |
|
|
| def freeze_feature_extractor(self): |
| """ |
        Calling this function will disable the gradient computation for the feature encoder so that its parameters will
| not be updated during training. |
| """ |
| warnings.warn( |
| "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. " |
| "Please use the equivalent `freeze_feature_encoder` method instead.", |
| FutureWarning, |
| ) |
| self.freeze_feature_encoder() |
|
|
| def freeze_feature_encoder(self): |
| """ |
        Calling this function will disable the gradient computation for the feature encoder so that its parameters will
| not be updated during training. |
| """ |
| self.pantagruel_uni.freeze_feature_encoder() |
|
|
| def freeze_base_model(self): |
| """ |
| Calling this function will disable the gradient computation for the base model so that its parameters will not |
| be updated during training. Only the classification head will be updated. |
| """ |
| for param in self.pantagruel_uni.parameters(): |
| param.requires_grad = False |
|
|
| @can_return_tuple |
| @auto_docstring |
| def forward( |
| self, |
| input_values: Optional[torch.FloatTensor] = None, |
| input_ids: Optional[torch.LongTensor] = None, |
| attention_mask: Optional[torch.FloatTensor] = None, |
| padding_mask: Optional[torch.FloatTensor] = None, |
| output_attentions: Optional[bool] = None, |
| output_hidden_states: Optional[bool] = None, |
| return_dict: Optional[bool] = None, |
| labels: Optional[torch.LongTensor] = None, |
| **kwargs: Unpack[TransformersKwargs], |
| ) -> Union[tuple, SequenceClassifierOutput]: |
| r""" |
| Performs a forward pass for sequence classification or regression. |
| |
| This method supports both **text** and **audio** inputs. The modality is inferred |
| from the provided inputs and the model configuration. |
| |
| Args: |
| input_values (`torch.FloatTensor`, *optional*): |
| Audio input values of shape `(batch_size, sequence_length)` |
| containing *normalized* audio samples. |
| input_ids (`torch.LongTensor`, *optional*): |
| Tokenized text input IDs of shape `(batch_size, sequence_length)`. |
| Used when the model is configured for `"TEXT"` modality. |
| |
| labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*): |
| Labels for computing the sequence classification/regression loss. Indices should be in `[0, ..., config.num_labels - 1]`. |
| If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), |
| If `config.num_labels > 1` a classification loss is computed (Cross-Entropy). |
| """ |
| if self.config.supported_modality == "TEXT": |
| outputs = self.pantagruel_uni( |
| input_ids=input_ids, |
| attention_mask=attention_mask, |
| padding_mask=padding_mask, |
| mask=False, |
| mode="TEXT", |
| return_dict=True, |
| ) |
|
|
| sequence_output = outputs.last_hidden_state |
| logits = self.classifier(sequence_output) |
|
|
| loss = None |
| if labels is not None: |
| labels = labels.to(logits.device) |
|
|
| if self.config.problem_type is None: |
| if self.num_labels == 1: |
| self.config.problem_type = "regression" |
| elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): |
| self.config.problem_type = "single_label_classification" |
| else: |
| self.config.problem_type = "multi_label_classification" |
|
|
| if self.config.problem_type == "regression": |
| loss_fct = MSELoss() |
| if self.num_labels == 1: |
| loss = loss_fct(logits.squeeze(), labels.squeeze()) |
| else: |
| loss = loss_fct(logits, labels) |
| elif self.config.problem_type == "single_label_classification": |
| loss_fct = CrossEntropyLoss() |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) |
| elif self.config.problem_type == "multi_label_classification": |
| loss_fct = BCEWithLogitsLoss() |
| loss = loss_fct(logits, labels) |
|
|
| else: |
| outputs = self.pantagruel_uni( |
| input_values=input_values, |
| attention_mask=attention_mask, |
| mask=False, |
| mode="AUDIO", |
| output_hidden_states=output_hidden_states, |
| output_attn_weights=output_attentions, |
| return_dict=return_dict, |
| ) |
| if self.config.modalities.audio.use_weighted_layer_sum: |
| hidden_states = outputs[_HIDDEN_STATES_START_POSITION] |
| hidden_states = torch.stack(hidden_states, dim=1) |
| norm_weights = nn.functional.softmax(self.layer_weights, dim=-1) |
| hidden_states = (hidden_states * norm_weights.view(-1, 1, 1)).sum(dim=1) |
| else: |
| hidden_states = outputs[0] |
|
|
| hidden_states = self.projector(hidden_states) |
| if attention_mask is None: |
| pooled_output = hidden_states.mean(dim=1) |
| else: |
| padding_mask = self._get_feature_vector_attention_mask(hidden_states.shape[1], attention_mask) |
| expand_padding_mask = padding_mask.unsqueeze(-1).repeat(1, 1, hidden_states.shape[2]) |
| hidden_states[~expand_padding_mask] = 0.0 |
| pooled_output = hidden_states.sum(dim=1) / padding_mask.sum(dim=1).view(-1, 1) |
|
|
| logits = self.classifier(pooled_output) |
|
|
| loss = None |
| if labels is not None: |
| loss_fct = CrossEntropyLoss() |
| loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1)) |
|
|
| if not return_dict: |
| output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:] |
| return ((loss,) + output) if loss is not None else output |
|
|
| return SequenceClassifierOutput( |
| loss=loss, |
| logits=logits, |
| hidden_states=outputs.hidden_states, |
| attentions=outputs.attentions, |
| ) |
|
|
|
|
| @auto_docstring |
| class PantagruelUniForMultipleChoice(PantagruelUniPreTrainedModel): |
| def __init__(self, config): |
| super().__init__(config) |
|
|
| self.pantagruel_uni = PantagruelUniModel(config) |
| self.dropout = nn.Dropout(config.encoder_dropout) |
| self.classifier = nn.Linear(config.embed_dim, 1) |
|
|
| |
| self.post_init() |
|
|
| @can_return_tuple |
| @auto_docstring |
| def forward( |
| self, |
| input_ids: Optional[torch.LongTensor] = None, |
| token_type_ids: Optional[torch.LongTensor] = None, |
| attention_mask: Optional[torch.FloatTensor] = None, |
| padding_mask: Optional[torch.FloatTensor] = None, |
| labels: Optional[torch.LongTensor] = None, |
| position_ids: Optional[torch.LongTensor] = None, |
| inputs_embeds: Optional[torch.FloatTensor] = None, |
| **kwargs: Unpack[TransformersKwargs], |
| ) -> Union[tuple, MultipleChoiceModelOutput]: |
| r""" |
| input_ids (`torch.LongTensor` of shape `(batch_size, num_choices, sequence_length)`): |
| Indices of input sequence tokens in the vocabulary. |
| |
| Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and |
| [`PreTrainedTokenizer.__call__`] for details. |
| |
| [What are input IDs?](../glossary#input-ids) |
| token_type_ids (`torch.LongTensor` of shape `(batch_size, num_choices, sequence_length)`, *optional*): |
| Segment token indices to indicate first and second portions of the inputs. Indices are selected in `[0, |
| 1]`: |
| |
| - 0 corresponds to a *sentence A* token, |
| - 1 corresponds to a *sentence B* token. |
| |
| [What are token type IDs?](../glossary#token-type-ids) |
| labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*): |
| Labels for computing the multiple choice classification loss. Indices should be in `[0, ..., |
| num_choices-1]` where `num_choices` is the size of the second dimension of the input tensors. (See |
| `input_ids` above) |
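| |
| Example — a minimal sketch (placeholder checkpoint identifier); each example carries `num_choices` candidate |
| continuations stacked along the second dimension, as shown below: |
| |
| ```python |
| >>> from transformers import AutoTokenizer |
| >>> tokenizer = AutoTokenizer.from_pretrained("path/to/pantagruel-uni-checkpoint") |
| >>> model = PantagruelUniForMultipleChoice.from_pretrained("path/to/pantagruel-uni-checkpoint") |
| >>> prompt = "Le chat dort" |
| >>> choices = ["sur le canapé.", "la planète Mars."] |
| >>> encoding = tokenizer([prompt, prompt], choices, return_tensors="pt", padding=True) |
| >>> input_ids = encoding["input_ids"].unsqueeze(0)  # (batch_size=1, num_choices, sequence_length) |
| >>> attention_mask = encoding["attention_mask"].unsqueeze(0) |
| >>> outputs = model(input_ids=input_ids, attention_mask=attention_mask) |
| >>> predicted_choice = int(outputs.logits.argmax(dim=-1)) |
| ``` |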
| """ |
| num_choices = input_ids.shape[1] if input_ids is not None else inputs_embeds.shape[1] |
|
|
| flat_input_ids = input_ids.view(-1, input_ids.size(-1)) if input_ids is not None else None |
| flat_position_ids = position_ids.view(-1, position_ids.size(-1)) if position_ids is not None else None |
| flat_token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1)) if token_type_ids is not None else None |
| flat_attention_mask = attention_mask.view(-1, attention_mask.size(-1)) if attention_mask is not None else None |
| flat_inputs_embeds = ( |
| inputs_embeds.view(-1, inputs_embeds.size(-2), inputs_embeds.size(-1)) |
| if inputs_embeds is not None |
| else None |
| ) |
|
|
| outputs = self.pantagruel_uni( |
| input_ids=flat_input_ids, |
| attention_mask=flat_attention_mask, |
| padding_mask=flat_attention_mask, |
| mask=False, |
| mode="TEXT", |
| return_dict=True, |
| ) |
| pooled_output = outputs.pooler_output |
|
|
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| reshaped_logits = logits.view(-1, num_choices) |
|
|
| loss = None |
| if labels is not None: |
| loss_fct = CrossEntropyLoss() |
|
|
| labels = labels.to(reshaped_logits.device) |
| loss = loss_fct(reshaped_logits, labels) |
|
|
| return MultipleChoiceModelOutput( |
| loss=loss, |
| logits=reshaped_logits, |
| hidden_states=outputs.hidden_states, |
| attentions=outputs.attentions, |
| ) |
|
|
|
|
| @auto_docstring |
| class PantagruelUniForTokenClassification(PantagruelUniPreTrainedModel): |
| def __init__(self, config): |
| super().__init__(config) |
| self.num_labels = config.num_labels |
|
|
| self.pantagruel_uni = PantagruelUniModel(config, add_pooling_layer=False) |
| classifier_dropout = ( |
| config.classifier_dropout if config.classifier_dropout is not None else config.encoder_dropout |
| ) |
| self.dropout = nn.Dropout(classifier_dropout) |
| self.classifier = nn.Linear(config.embed_dim, config.num_labels) |
|
|
| |
| self.post_init() |
|
|
| @can_return_tuple |
| @auto_docstring |
| def forward( |
| self, |
| input_ids: Optional[torch.LongTensor] = None, |
| attention_mask: Optional[torch.FloatTensor] = None, |
| padding_mask: Optional[torch.FloatTensor] = None, |
| labels: Optional[torch.LongTensor] = None, |
| **kwargs: Unpack[TransformersKwargs], |
| ) -> Union[tuple, TokenClassifierOutput]: |
| r""" |
| labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): |
| Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`. |
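| |
| Example — a minimal sketch (placeholder checkpoint identifier, hypothetical label count): |
| |
| ```python |
| >>> from transformers import AutoTokenizer |
| >>> tokenizer = AutoTokenizer.from_pretrained("path/to/pantagruel-uni-checkpoint") |
| >>> model = PantagruelUniForTokenClassification.from_pretrained("path/to/pantagruel-uni-checkpoint", num_labels=9) |
| >>> inputs = tokenizer("Gargantua visite Paris.", return_tensors="pt") |
| >>> logits = model(input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"]).logits |
| >>> predicted_token_classes = logits.argmax(dim=-1)  # (batch_size, sequence_length) |
| ``` |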
| """ |
| outputs = self.pantagruel_uni( |
| input_ids=input_ids, |
| attention_mask=attention_mask, |
| padding_mask=padding_mask, |
| mask=False, |
| mode="TEXT", |
| return_dict=True, |
| ) |
|
|
| sequence_output = outputs.last_hidden_state |
|
|
| sequence_output = self.dropout(sequence_output) |
| logits = self.classifier(sequence_output) |
|
|
| loss = None |
| if labels is not None: |
| loss_fct = CrossEntropyLoss() |
|
|
| labels = labels.to(logits.device) |
| loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) |
|
|
| return TokenClassifierOutput( |
| loss=loss, |
| logits=logits, |
| hidden_states=outputs.hidden_states, |
| attentions=outputs.attentions, |
| ) |
|
|
|
|
| @auto_docstring |
| class PantagruelUniForQuestionAnswering(PantagruelUniPreTrainedModel): |
| def __init__(self, config): |
| super().__init__(config) |
| self.num_labels = config.num_labels |
|
|
| self.pantagruel_uni = PantagruelUniModel(config, add_pooling_layer=False) |
| self.qa_outputs = nn.Linear(config.embed_dim, config.num_labels) |
|
|
| |
| self.post_init() |
|
|
| @can_return_tuple |
| @auto_docstring |
| def forward( |
| self, |
| input_ids: Optional[torch.LongTensor] = None, |
| attention_mask: Optional[torch.FloatTensor] = None, |
| padding_mask: Optional[torch.FloatTensor] = None, |
| start_positions: Optional[torch.LongTensor] = None, |
| end_positions: Optional[torch.LongTensor] = None, |
| **kwargs: Unpack[TransformersKwargs], |
| ) -> Union[tuple, QuestionAnsweringModelOutput]: |
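| r""" |
| start_positions (`torch.LongTensor` of shape `(batch_size,)`, *optional*): |
| Labels for the position (index) of the start of the labelled span, used to compute the token classification |
| loss. Positions are clamped to the length of the sequence; positions outside the sequence are ignored. |
| end_positions (`torch.LongTensor` of shape `(batch_size,)`, *optional*): |
| Labels for the position (index) of the end of the labelled span, used to compute the token classification |
| loss. Positions are clamped to the length of the sequence; positions outside the sequence are ignored. |
| """ |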
|
|
| outputs = self.pantagruel_uni( |
| input_ids=input_ids, |
| attention_mask=attention_mask, |
| padding_mask=padding_mask, |
| mask=False, |
| mode="TEXT", |
| return_dict=True, |
| ) |
|
|
| sequence_output = outputs.last_hidden_state |
|
|
| logits = self.qa_outputs(sequence_output) |
| start_logits, end_logits = logits.split(1, dim=-1) |
| start_logits = start_logits.squeeze(-1).contiguous() |
| end_logits = end_logits.squeeze(-1).contiguous() |
|
|
| total_loss = None |
| if start_positions is not None and end_positions is not None: |
| |
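| # squeeze a possible extra trailing dimension so start/end positions are 1-D of shape (batch_size,) |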
| if len(start_positions.size()) > 1: |
| start_positions = start_positions.squeeze(-1) |
| if len(end_positions.size()) > 1: |
| end_positions = end_positions.squeeze(-1) |
| |
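| # clamp out-of-range positions to `ignored_index` so they are excluded from the loss via `ignore_index` |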
| ignored_index = start_logits.size(1) |
| start_positions = start_positions.clamp(0, ignored_index) |
| end_positions = end_positions.clamp(0, ignored_index) |
|
|
| loss_fct = CrossEntropyLoss(ignore_index=ignored_index) |
| start_loss = loss_fct(start_logits, start_positions) |
| end_loss = loss_fct(end_logits, end_positions) |
| total_loss = (start_loss + end_loss) / 2 |
|
|
| return QuestionAnsweringModelOutput( |
| loss=total_loss, |
| start_logits=start_logits, |
| end_logits=end_logits, |
| hidden_states=outputs.hidden_states, |
| attentions=outputs.attentions, |
| ) |
|
|
| class PantagruelUniForAudioFrameClassification(PantagruelUniPreTrainedModel): |
| def __init__(self, config): |
| super().__init__(config) |
|
|
| self.config = config |
|
|
| if hasattr(config.modalities.audio, "add_adapter") and config.modalities.audio.add_adapter: |
| raise ValueError( |
| "Audio frame classification does not support the use of Data2VecAudio adapters (config.add_adapter=True)" |
| ) |
|
|
| self.pantagruel_uni = PantagruelUniModel(config, add_pooling_layer=False) |
| num_layers = config.num_hidden_layers + 1 |
| if config.modalities.audio.use_weighted_layer_sum: |
| self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers) |
| self.classifier = nn.Linear(config.hidden_size, config.num_labels) |
| self.num_labels = config.num_labels |
|
|
| self.init_weights() |
|
|
| def freeze_feature_extractor(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameters will |
| not be updated during training. |
| """ |
| warnings.warn( |
| "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. " |
| "Please use the equivalent `freeze_feature_encoder` method instead.", |
| FutureWarning, |
| ) |
| self.freeze_feature_encoder() |
|
|
| def freeze_feature_encoder(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameters will |
| not be updated during training. |
| """ |
| self.pantagruel_uni.freeze_feature_encoder() |
|
|
| def freeze_base_model(self): |
| """ |
| Calling this function will disable the gradient computation for the base model so that its parameters will not |
| be updated during training. Only the classification head will be updated. |
| """ |
| for param in self.pantagruel_uni.parameters(): |
| param.requires_grad = False |
|
|
| @auto_docstring |
| def forward( |
| self, |
| input_values: Optional[torch.Tensor], |
| attention_mask: Optional[torch.Tensor] = None, |
| labels: Optional[torch.Tensor] = None, |
| output_attentions: Optional[bool] = None, |
| output_hidden_states: Optional[bool] = None, |
| return_dict: Optional[bool] = None, |
| ) -> Union[tuple, TokenClassifierOutput]: |
| r""" |
| input_values (`torch.FloatTensor` of shape `(batch_size, sequence_length)`): |
| Float values of input raw speech waveform. Values can be obtained by loading a `.flac` or `.wav` audio file |
| into an array of type `list[float]`, a `numpy.ndarray` or a `torch.Tensor`, *e.g.* via the torchcodec library |
| (`pip install torchcodec`) or the soundfile library (`pip install soundfile`). |
| To prepare the array into `input_values`, the [`AutoProcessor`] should be used for padding and conversion |
| into a tensor of type `torch.FloatTensor`. See [`AutoProcessor.__call__`] for details. |
| labels (`torch.LongTensor` of shape `(batch_size, sequence_length, config.num_labels)`, *optional*): |
| Frame-level targets for computing the classification loss (Cross-Entropy). For each output frame the loss |
| takes the `argmax` over the last dimension of `labels`, so one-hot (or soft) per-frame label vectors are expected. |
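| |
| Example — a minimal frame-classification sketch; the checkpoint identifier is a placeholder and a compatible |
| feature extractor for it is an assumption: |
| |
| ```python |
| >>> import numpy as np |
| >>> from transformers import AutoFeatureExtractor |
| >>> feature_extractor = AutoFeatureExtractor.from_pretrained("path/to/pantagruel-uni-frame-checkpoint") |
| >>> model = PantagruelUniForAudioFrameClassification.from_pretrained("path/to/pantagruel-uni-frame-checkpoint") |
| >>> raw_audio = np.zeros(16_000, dtype=np.float32)  # one second of silence as a stand-in for real speech |
| >>> inputs = feature_extractor(raw_audio, sampling_rate=16_000, return_tensors="pt") |
| >>> logits = model(input_values=inputs.input_values).logits |
| >>> frame_labels = logits.argmax(dim=-1)  # one predicted class per output frame |
| ``` |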
| """ |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict |
| output_hidden_states = ( |
| True if self.config.modalities.audio.use_weighted_layer_sum |
| else output_hidden_states |
| ) |
|
|
| outputs = self.pantagruel_uni( |
| input_values=input_values, |
| attention_mask=attention_mask, |
| mask=False, |
| mode="AUDIO", |
| output_hidden_states=output_hidden_states, |
| output_attn_weights=output_attentions, |
| return_dict=return_dict, |
| ) |
|
|
| if self.config.modalities.audio.use_weighted_layer_sum: |
| hidden_states = outputs[_HIDDEN_STATES_START_POSITION] |
| hidden_states = torch.stack(hidden_states, dim=1) |
| norm_weights = nn.functional.softmax(self.layer_weights, dim=-1) |
| hidden_states = (hidden_states * norm_weights.view(-1, 1, 1)).sum(dim=1) |
| else: |
| hidden_states = outputs[0] |
|
|
| logits = self.classifier(hidden_states) |
|
|
| loss = None |
| if labels is not None: |
| loss_fct = CrossEntropyLoss() |
| loss = loss_fct(logits.view(-1, self.num_labels), torch.argmax(labels.view(-1, self.num_labels), axis=1)) |
|
|
| if not return_dict: |
| output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:] |
| return output |
|
|
| return TokenClassifierOutput( |
| loss=loss, |
| logits=logits, |
| hidden_states=outputs.hidden_states, |
| attentions=outputs.attentions, |
| ) |
|
|
|
|
| @auto_docstring( |
| custom_intro=""" |
| PantagruelUniForCTC Model with a `language modeling` head on top for Connectionist Temporal Classification (CTC). |
| """ |
| ) |
| class PantagruelUniForCTC(PantagruelUniPreTrainedModel): |
| def __init__(self, config): |
| r""" |
| target_lang (`str`, *optional*): |
| Language id of adapter weights. Adapter weights are stored in the format adapter.<lang>.safetensors or |
| adapter.<lang>.bin. Only relevant when using an instance of [`PantagruelUniForCTC`] with adapters. Uses 'eng' by |
| default. |
| """ |
| super().__init__(config) |
|
|
| self.pantagruel_uni = PantagruelUniModel(config, add_pooling_layer=False) |
| self.dropout = nn.Dropout(config.final_dropout) |
|
|
| if config.modalities.audio.vocab_size is None: |
| raise ValueError( |
| f"You are trying to instantiate {self.__class__} with a configuration that " |
| "does not define the vocabulary size of the language model head. Please " |
| "instantiate the model as follows: `Data2VecAudioForCTC.from_pretrained(..., vocab_size=vocab_size)`. " |
| "or define `vocab_size` of your model's configuration." |
| ) |
| output_hidden_size = ( |
| config.modalities.audio.output_hidden_size if hasattr(config.modalities.audio, "add_adapter") and config.modalities.audio.add_adapter else config.hidden_size |
| ) |
| self.lm_head = nn.Linear(output_hidden_size, config.modalities.audio.vocab_size) |
|
|
| |
| self.post_init() |
| |
| def freeze_feature_extractor(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameters will |
| not be updated during training. |
| """ |
| warnings.warn( |
| "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. " |
| "Please use the equivalent `freeze_feature_encoder` method instead.", |
| FutureWarning, |
| ) |
| self.freeze_feature_encoder() |
|
|
| def freeze_feature_encoder(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameters will |
| not be updated during training. |
| """ |
| self.pantagruel_uni.freeze_feature_encoder() |
|
|
| def freeze_base_model(self): |
| """ |
| Calling this function will disable the gradient computation for the base model so that its parameters will not |
| be updated during training. Only the classification head will be updated. |
| """ |
| for param in self.pantagruel_uni.parameters(): |
| param.requires_grad = False |
|
|
| @auto_docstring |
| def forward( |
| self, |
| input_values: Optional[torch.Tensor], |
| attention_mask: Optional[torch.Tensor] = None, |
| output_attentions: Optional[bool] = None, |
| output_hidden_states: Optional[bool] = None, |
| return_dict: Optional[bool] = None, |
| labels: Optional[torch.Tensor] = None, |
| ) -> Union[tuple, CausalLMOutput]: |
| r""" |
| labels (`torch.LongTensor` of shape `(batch_size, target_length)`, *optional*): |
| Labels for connectionist temporal classification. Note that `target_length` has to be smaller than or equal to |
| the sequence length of the output logits. Indices are selected in `[-100, 0, ..., config.modalities.audio.vocab_size - 1]`. |
| All labels set to `-100` are ignored (masked); the loss is only computed for labels in |
| `[0, ..., config.modalities.audio.vocab_size - 1]`. |
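| |
| Example — a minimal transcription sketch; the checkpoint identifier is a placeholder and a Wav2Vec2-style |
| processor (feature extractor + CTC tokenizer) is assumed to be available for it: |
| |
| ```python |
| >>> import numpy as np |
| >>> import torch |
| >>> from transformers import AutoProcessor |
| >>> processor = AutoProcessor.from_pretrained("path/to/pantagruel-uni-ctc-checkpoint") |
| >>> model = PantagruelUniForCTC.from_pretrained("path/to/pantagruel-uni-ctc-checkpoint") |
| >>> raw_audio = np.zeros(16_000, dtype=np.float32)  # one second of silence as a stand-in for real speech |
| >>> inputs = processor(raw_audio, sampling_rate=16_000, return_tensors="pt") |
| >>> with torch.no_grad(): |
| ...     logits = model(input_values=inputs.input_values).logits |
| >>> predicted_ids = torch.argmax(logits, dim=-1) |
| >>> transcription = processor.batch_decode(predicted_ids) |
| ``` |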
| """ |
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict |
|
|
| if labels is not None and labels.max() >= self.config.modalities.audio.vocab_size: |
| raise ValueError(f"Label values must be <= vocab_size: {self.config.modalities.audio.vocab_size}") |
|
|
| outputs = self.pantagruel_uni( |
| input_values=input_values, |
| attention_mask=attention_mask, |
| mask=False, |
| mode="AUDIO", |
| output_hidden_states=output_hidden_states, |
| output_attn_weights=output_attentions, |
| return_dict=return_dict, |
| ) |
|
|
| hidden_states = outputs[0] |
| hidden_states = self.dropout(hidden_states) |
|
|
| logits = self.lm_head(hidden_states) |
|
|
| loss = None |
| if labels is not None: |
| |
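| # derive the CTC input_lengths (model output lengths) from the attention_mask |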
| attention_mask = ( |
| attention_mask if attention_mask is not None else torch.ones_like(input_values, dtype=torch.long) |
| ) |
| input_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1)).to(torch.long) |
|
|
| |
| |
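| # padded label positions are assumed to be filled with -100 so they can be masked out of the CTC targets |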
| labels_mask = labels >= 0 |
| target_lengths = labels_mask.sum(-1) |
| flattened_targets = labels.masked_select(labels_mask) |
|
|
| |
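| # ctc_loss does not support fp16, so compute log-probabilities in float32 |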
| log_probs = nn.functional.log_softmax(logits, dim=-1, dtype=torch.float32).transpose(0, 1) |
|
|
| with torch.backends.cudnn.flags(enabled=False): |
| loss = nn.functional.ctc_loss( |
| log_probs, |
| flattened_targets, |
| input_lengths, |
| target_lengths, |
| blank=self.config.pad_token_id, |
| reduction=self.config.ctc_loss_reduction, |
| zero_infinity=self.config.ctc_zero_infinity, |
| ) |
|
|
| if not return_dict: |
| output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:] |
| return ((loss,) + output) if loss is not None else output |
|
|
| return CausalLMOutput( |
| loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions |
| ) |
|
|
|
|
| class PantagruelUniForXVector(PantagruelUniPreTrainedModel): |
| def __init__(self, config): |
| super().__init__(config) |
|
|
| self.config = config |
|
|
| self.pantagruel_uni = PantagruelUniModel(config, add_pooling_layer=False) |
| num_layers = config.num_hidden_layers + 1 |
| if config.modalities.audio.use_weighted_layer_sum: |
| self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers) |
| self.projector = nn.Linear(config.hidden_size, config.modalities.audio.tdnn_dim[0]) |
|
|
| tdnn_layers = [ |
| TDNNLayer(config.modalities.audio, i) for i in range(len(config.modalities.audio.tdnn_dim)) |
| ] |
| self.tdnn = nn.ModuleList(tdnn_layers) |
|
|
| self.feature_extractor = nn.Linear( |
| config.modalities.audio.tdnn_dim[-1] * 2, config.modalities.audio.xvector_output_dim |
| ) |
| self.classifier = nn.Linear( |
| config.modalities.audio.xvector_output_dim, config.modalities.audio.xvector_output_dim |
| ) |
|
|
| self.objective = AMSoftmaxLoss( |
| config.modalities.audio.xvector_output_dim, config.num_labels |
| ) |
|
|
| self.init_weights() |
|
|
| def freeze_feature_extractor(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameters will |
| not be updated during training. |
| """ |
| warnings.warn( |
| "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. " |
| "Please use the equivalent `freeze_feature_encoder` method instead.", |
| FutureWarning, |
| ) |
| self.freeze_feature_encoder() |
|
|
| def freeze_feature_encoder(self): |
| """ |
| Calling this function will disable the gradient computation for the feature encoder so that its parameters will |
| not be updated during training. |
| """ |
| self.pantagruel_uni.freeze_feature_encoder() |
|
|
| def freeze_base_model(self): |
| """ |
| Calling this function will disable the gradient computation for the base model so that its parameters will not |
| be updated during training. Only the classification head will be updated. |
| """ |
| for param in self.pantagruel_uni.parameters(): |
| param.requires_grad = False |
| |
| def _get_tdnn_output_lengths(self, input_lengths: Union[torch.LongTensor, int]): |
| """ |
| Computes the output length of the TDNN layers |
| """ |
|
|
| def _conv_out_length(input_length, kernel_size, stride): |
| |
| |
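| # standard 1D convolution output-length formula (see torch.nn.Conv1d), with dilation=1 and no padding |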
| return (input_length - kernel_size) // stride + 1 |
|
|
| for kernel_size in self.config.modalities.audio.tdnn_kernel: |
| input_lengths = _conv_out_length(input_lengths, kernel_size, 1) |
|
|
| return input_lengths |
|
|
| @auto_docstring |
| def forward( |
| self, |
| input_values: Optional[torch.Tensor], |
| attention_mask: Optional[torch.Tensor] = None, |
| output_attentions: Optional[bool] = None, |
| output_hidden_states: Optional[bool] = None, |
| return_dict: Optional[bool] = None, |
| labels: Optional[torch.Tensor] = None, |
| ) -> Union[tuple, XVectorOutput]: |
| r""" |
| input_values (`torch.FloatTensor` of shape `(batch_size, sequence_length)`): |
| Float values of input raw speech waveform. Values can be obtained by loading a `.flac` or `.wav` audio file |
| into an array of type `list[float]`, a `numpy.ndarray` or a `torch.Tensor`, *e.g.* via the torchcodec library |
| (`pip install torchcodec`) or the soundfile library (`pip install soundfile`). |
| To prepare the array into `input_values`, the [`AutoProcessor`] should be used for padding and conversion |
| into a tensor of type `torch.FloatTensor`. See [`AutoProcessor.__call__`] for details. |
| labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*): |
| Labels for computing the speaker classification loss (AMSoftmax over the x-vector logits). Indices should |
| be in `[0, ..., config.num_labels - 1]`. |
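| |
| Example — a minimal speaker-embedding sketch; the checkpoint identifier is a placeholder and a compatible |
| feature extractor for it is an assumption: |
| |
| ```python |
| >>> import numpy as np |
| >>> from transformers import AutoFeatureExtractor |
| >>> feature_extractor = AutoFeatureExtractor.from_pretrained("path/to/pantagruel-uni-xvector-checkpoint") |
| >>> model = PantagruelUniForXVector.from_pretrained("path/to/pantagruel-uni-xvector-checkpoint") |
| >>> raw_audio = np.zeros(16_000, dtype=np.float32)  # one second of silence as a stand-in for real speech |
| >>> inputs = feature_extractor(raw_audio, sampling_rate=16_000, return_tensors="pt") |
| >>> embeddings = model(input_values=inputs.input_values).embeddings  # (batch_size, xvector_output_dim) |
| ``` |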
| """ |
|
|
| return_dict = return_dict if return_dict is not None else self.config.use_return_dict |
| output_hidden_states = True if self.config.modalities.audio.use_weighted_layer_sum else output_hidden_states |
|
|
| outputs = self.pantagruel_uni( |
| input_values=input_values, |
| attention_mask=attention_mask, |
| mask=False, |
| mode="AUDIO", |
| output_hidden_states=output_hidden_states, |
| output_attn_weights=output_attentions, |
| return_dict=return_dict, |
| ) |
|
|
| if self.config.modalities.audio.use_weighted_layer_sum: |
| hidden_states = outputs[_HIDDEN_STATES_START_POSITION] |
| hidden_states = torch.stack(hidden_states, dim=1) |
| norm_weights = nn.functional.softmax(self.layer_weights, dim=-1) |
| hidden_states = (hidden_states * norm_weights.view(-1, 1, 1)).sum(dim=1) |
| else: |
| hidden_states = outputs[0] |
|
|
| hidden_states = self.projector(hidden_states) |
|
|
| for tdnn_layer in self.tdnn: |
| hidden_states = tdnn_layer(hidden_states) |
|
|
| |
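| # Statistic pooling: concatenate the mean and standard deviation of the TDNN features over the time axis |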
| if attention_mask is None: |
| mean_features = hidden_states.mean(dim=1) |
| std_features = hidden_states.std(dim=1) |
| else: |
| feat_extract_output_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(dim=1)) |
| tdnn_output_lengths = self._get_tdnn_output_lengths(feat_extract_output_lengths) |
| mean_features = [] |
| std_features = [] |
| for i, length in enumerate(tdnn_output_lengths): |
| mean_features.append(hidden_states[i, :length].mean(dim=0)) |
| std_features.append(hidden_states[i, :length].std(dim=0)) |
| mean_features = torch.stack(mean_features) |
| std_features = torch.stack(std_features) |
| statistic_pooling = torch.cat([mean_features, std_features], dim=-1) |
|
|
| output_embeddings = self.feature_extractor(statistic_pooling) |
| logits = self.classifier(output_embeddings) |
|
|
| loss = None |
| if labels is not None: |
| loss = self.objective(logits, labels) |
|
|
| if not return_dict: |
| output = (logits, output_embeddings) + outputs[_HIDDEN_STATES_START_POSITION:] |
| return ((loss,) + output) if loss is not None else output |
|
|
| return XVectorOutput( |
| loss=loss, |
| logits=logits, |
| embeddings=output_embeddings, |
| hidden_states=outputs.hidden_states, |
| attentions=outputs.attentions, |
| ) |
|
|
|
|
| __all__ = [ |
| "PantagruelUniForMaskedLM", |
| "PantagruelUniForMultipleChoice", |
| "PantagruelUniForQuestionAnswering", |
| "PantagruelUniForSequenceClassification", |
| "PantagruelUniForTokenClassification", |
| "PantagruelUniModel", |
| "PantagruelUniPreTrainedModel", |
| "PantagruelUniForAudioFrameClassification", |
| ] |