|
|
from typing import Any, Optional, Tuple |
|
|
|
|
|
import torch |
|
|
from torch import Tensor, nn |
|
|
from transformers import WhisperConfig |
|
|
from transformers.modeling_outputs import BaseModelOutputWithPastAndCrossAttentions |
|
|
from transformers.models.whisper.modeling_whisper import WhisperEncoder, WhisperEncoderLayer, WhisperFlashAttention2 |
|
|
from transformers.utils import logging |
|
|
from torch.nn.functional import scaled_dot_product_attention |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
|
|
|
|
|
class RotaryEmbedding:
    """Builds cos/sin lookup tables for rotary position embeddings (RoPE).

    Attributes:
        dim: channel count handed to ``forward_impl`` as ``n_elem`` by
            ``get_emb``.
        rope_ratio: multiplier applied to the frequency base.
        original_impl: stored on the instance; no method of this class
            reads it.
    """

    def __init__(self, dim, rope_ratio=1, original_impl=False):
        super().__init__()
        self.rope_ratio = rope_ratio
        self.original_impl = original_impl
        self.dim = dim

    def forward_impl(
        self, seq_len: int, n_elem: int, dtype: torch.dtype, device: torch.device, base: int = 10000
    ):
        """Enhanced Transformer with Rotary Position Embedding.

        Derived from: https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/labml_nn/
        transformers/rope/__init__.py. MIT License:
        https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/license.

        Args:
            seq_len: number of positions (rows) in the table.
            n_elem: channel count; one frequency is produced per channel pair.
            dtype: requested dtype. float16 and int8 yield a half-precision
                table, bfloat16 yields a bfloat16 table, and any other dtype
                leaves the table in float32.
            device: device the table is created on.
            base: frequency base, multiplied by ``self.rope_ratio`` before use.

        Returns:
            Tensor of shape ``(seq_len, n_elem // 2, 2)`` for even ``n_elem``;
            the last axis holds ``(cos, sin)`` of position * frequency.
        """
        scaled_base = base * self.rope_ratio

        # One inverse frequency per channel pair: scaled_base^(-2i / n_elem).
        exponents = torch.arange(0, n_elem, 2, dtype=torch.float, device=device) / n_elem
        inv_freq = 1.0 / (scaled_base ** exponents)

        # Angle for every (position, frequency) combination.
        positions = torch.arange(seq_len, dtype=torch.float, device=device)
        angles = torch.outer(positions, inv_freq).float()

        table = torch.stack([torch.cos(angles), torch.sin(angles)], dim=-1)

        # The table is always computed in float32 and only then down-cast.
        if dtype == torch.bfloat16:
            return table.bfloat16()
        if dtype in (torch.float16, torch.int8):
            return table.half()
        return table

    @torch.no_grad()
    def get_emb(self, max_seq_len, dtype, device):
        """Return the cos/sin table for ``max_seq_len`` positions and ``self.dim`` channels."""
        return self.forward_impl(max_seq_len, self.dim, dtype=dtype, device=device)
|
|
|
|
|
|
|
|
def apply_rotary_pos_emb(x: torch.Tensor, rope_cache: torch.Tensor) -> torch.Tensor:
    """Rotate the leading channels of ``x`` using a cos/sin table.

    Args:
        x: 4-D tensor; axis 2 is the sequence axis and axis 3 the channel
            axis. The caller in this file passes
            ``(batch, heads, seq, head_dim)``.
        rope_cache: table whose last axis is ``(cos, sin)``, whose
            second-to-last axis has one entry per rotated channel pair, and
            whose axis 1 is the position axis.

    Returns:
        Tensor shaped like ``x``. The first ``2 * rope_cache.shape[-2]``
        channels are rotated pairwise; the remaining channels are copied
        through unchanged.
    """
    batch, heads, seq_len, _head_dim = (x.size(axis) for axis in range(4))
    rot_dim = rope_cache.shape[-2] * 2

    to_rotate = x[..., :rot_dim]
    passthrough = x[..., rot_dim:]

    # Drop table rows beyond the current sequence length.
    table = rope_cache[:, :seq_len]
    pairs = to_rotate.reshape(batch, heads, seq_len, rot_dim // 2, 2)
    # Add a singleton head axis so the table broadcasts over all heads.
    table = table.view(-1, 1, seq_len, pairs.size(3), 2)

    cos, sin = table[..., 0], table[..., 1]
    even, odd = pairs[..., 0], pairs[..., 1]

    # Standard 2-D rotation of every (even, odd) channel pair.
    rotated = torch.stack([even * cos - odd * sin, odd * cos + even * sin], -1)
    rotated = rotated.flatten(3)
    return torch.cat((rotated, passthrough), dim=-1)
|
|
|
|
|
|
|
|
class WhisperRoPEFlashAttn(WhisperFlashAttention2):
    """Whisper self-attention with optional rotary position embeddings.

    Attention itself is computed with
    ``torch.nn.functional.scaled_dot_product_attention``. Projection layers
    and attributes such as ``num_heads``, ``head_dim``, ``scaling``,
    ``dropout``, ``is_causal``, ``is_decoder`` and ``config`` are expected to
    be provided by the parent class.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _reshape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
        """View ``tensor`` as ``(bsz, seq_len, num_heads, head_dim)``."""
        return tensor.view(bsz, seq_len, self.num_heads, self.head_dim)

    def forward(
        self,
        hidden_states: torch.Tensor,
        key_value_states: Optional[torch.Tensor] = None,
        past_key_value: Optional[Tuple[torch.Tensor]] = None,
        attention_mask: Optional[torch.Tensor] = None,
        layer_head_mask: Optional[torch.Tensor] = None,
        output_attentions: bool = False,
        rotary_pos_emb: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
        """Run self-attention over ``hidden_states``.

        Args:
            hidden_states: input of shape ``(batch, seq_len, embed_dim)``.
            key_value_states: must be ``None``; cross-attention is rejected.
            past_key_value: optional cached ``(key, value)`` pair laid out as
                ``(batch, heads, seq, head_dim)``; it is prepended to the new
                keys and values.
            attention_mask: accepted for interface compatibility; not used.
            layer_head_mask: accepted for interface compatibility; not used.
            output_attentions: when true, attention weights are recomputed
                manually and returned.
            rotary_pos_emb: optional cos/sin table applied to queries and keys.
            position_ids: accepted for interface compatibility; not used here.

        Returns:
            ``(attn_output, attn_weights, past_key_value)``. ``attn_weights``
            is ``None`` unless ``output_attentions`` is set.

        Raises:
            AssertionError: if ``key_value_states`` is given.
        """
        if output_attentions:
            logger.warning_once("WhisperFlashAttention2 attention does not support output_attentions, "
                                "manually calculating attention weights.")

        is_cross_attention = key_value_states is not None
        bsz, q_len, _ = hidden_states.size()

        # Raised explicitly (same exception type as the former `assert`) so the
        # guard is still enforced when Python runs with -O.
        if is_cross_attention:
            raise AssertionError("Cross-attention not supported")

        key_states = self._reshape(self.k_proj(hidden_states), -1, bsz)
        query_states = self._reshape(self.q_proj(hidden_states), -1, bsz)
        if rotary_pos_emb is not None:
            # apply_rotary_pos_emb expects (batch, heads, seq, head_dim).
            query_states, key_states = [
                apply_rotary_pos_emb(i.transpose(1, 2), rotary_pos_emb).transpose(1, 2)
                for i in (query_states, key_states)
            ]

        value_states = self._reshape(self.v_proj(hidden_states), -1, bsz)
        if past_key_value is not None:
            # Cached tensors are stored heads-first; bring them to seq-first
            # before concatenating along the sequence axis.
            key_states = torch.cat([past_key_value[0].transpose(1, 2), key_states], dim=1)
            value_states = torch.cat([past_key_value[1].transpose(1, 2), value_states], dim=1)

        if self.is_decoder:
            past_key_value = (key_states.transpose(1, 2), value_states.transpose(1, 2))

        # Float32 activations are cast down to the autocast / pre-quantization /
        # weight dtype before the attention kernel is called.
        input_dtype = query_states.dtype
        if input_dtype == torch.float32:
            if torch.is_autocast_enabled():
                target_dtype = torch.get_autocast_gpu_dtype()
            elif hasattr(self.config, "_pre_quantization_dtype"):
                target_dtype = self.config._pre_quantization_dtype
            else:
                target_dtype = self.q_proj.weight.dtype

            query_states = query_states.to(target_dtype)
            key_states = key_states.to(target_dtype)
            value_states = value_states.to(target_dtype)

        attn_output = scaled_dot_product_attention(
            query_states.transpose(1, 2),
            key_states.transpose(1, 2),
            value_states.transpose(1, 2),
            attn_mask=None,
            dropout_p=self.dropout if self.training else 0.0,
            is_causal=self.is_causal,
        ).transpose(1, 2)

        attn_output = attn_output.reshape(bsz, q_len, -1)
        attn_output = self.out_proj(attn_output)

        if not output_attentions:
            attn_weights = None
        else:
            attn_weights = (query_states.transpose(1, 2) * self.scaling) @ key_states.permute(0, 2, 3, 1)
            if self.is_causal:
                # The weights are (q_len, kv_len); kv_len exceeds q_len once
                # cached keys are prepended, so the mask must be sized to match.
                kv_len = key_states.size(1)
                causal_mask = torch.triu(
                    torch.ones(q_len, kv_len, device=attn_weights.device), diagonal=1,
                ).unsqueeze(0).unsqueeze(0) * -1e9
                attn_weights = attn_weights + causal_mask
            attn_weights = nn.functional.softmax(attn_weights, dim=-1)

        return attn_output, attn_weights, past_key_value
|
|
|
|
|
|
|
|
class WhisperSpecialEncoderLayer(WhisperEncoderLayer):
    """Whisper encoder layer whose self-attention is a ``WhisperRoPEFlashAttn``."""

    def __init__(self, config: WhisperConfig):
        super().__init__(config)
        # Swap the parent's attention module for the rotary-aware variant.
        attn_kwargs = {
            "embed_dim": self.embed_dim,
            "num_heads": config.encoder_attention_heads,
            "dropout": config.attention_dropout,
            "config": config,
        }
        self.self_attn = WhisperRoPEFlashAttn(**attn_kwargs)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        layer_head_mask: torch.Tensor,
        output_attentions: bool = False,
        rotary_pos_emb: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
    ) -> tuple[Tensor, Any]:
        """
        Apply pre-norm self-attention and a pre-norm feed-forward block, each
        followed by dropout and a residual connection.

        Args:
            hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
            attention_mask (`torch.FloatTensor`): attention mask of size
                `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
            layer_head_mask (`torch.FloatTensor`): mask for attention heads in a given layer of size
                `(encoder_attention_heads,)`.
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
            rotary_pos_emb (`torch.Tensor`, *optional*): forwarded unchanged to the self-attention module.
            position_ids (`torch.Tensor`, *optional*): forwarded unchanged to the self-attention module.

        Returns:
            `(hidden_states, kv_cache)`, with the attention weights appended as a third element when
            `output_attentions` is true.
        """
        # --- self-attention sub-block ---
        attn_input = self.self_attn_layer_norm(hidden_states)
        attn_out, attn_weights, kv_cache = self.self_attn(
            hidden_states=attn_input,
            attention_mask=attention_mask,
            layer_head_mask=layer_head_mask,
            output_attentions=output_attentions,
            rotary_pos_emb=rotary_pos_emb,
            position_ids=position_ids,
        )
        attn_out = nn.functional.dropout(attn_out, p=self.dropout, training=self.training)
        hidden_states = hidden_states + attn_out

        # --- feed-forward sub-block ---
        ffn_out = self.final_layer_norm(hidden_states)
        ffn_out = self.activation_fn(self.fc1(ffn_out))
        ffn_out = nn.functional.dropout(ffn_out, p=self.activation_dropout, training=self.training)
        ffn_out = self.fc2(ffn_out)
        ffn_out = nn.functional.dropout(ffn_out, p=self.dropout, training=self.training)
        hidden_states = hidden_states + ffn_out

        # In half precision, pull infinities back into the representable range.
        if hidden_states.dtype == torch.float16:
            non_finite = torch.isinf(hidden_states).any() or torch.isnan(hidden_states).any()
            if non_finite:
                limit = torch.finfo(hidden_states.dtype).max - 1000
                hidden_states = torch.clamp(hidden_states, min=-limit, max=limit)

        if output_attentions:
            return (hidden_states, kv_cache, attn_weights)
        return (hidden_states, kv_cache)
|
|
|
|
|
class WhisperSpecialEncoder(WhisperEncoder):
    """Whisper encoder built from ``WhisperSpecialEncoderLayer`` blocks.

    Positions are injected either through rotary embeddings (``use_rope=True``)
    or by adding rows of the ``embed_positions`` table (``use_rope=False``).
    """

    def __init__(
        self,
        config: WhisperConfig,
        use_rope=False,
        rope_ratio=1,
    ):
        """
        Args:
            config: Whisper configuration used to build the parent encoder and every layer.
            use_rope: when true, a ``RotaryEmbedding`` is created and used in ``forward``.
            rope_ratio: passed to ``RotaryEmbedding`` as the frequency-base multiplier.
        """
        super().__init__(config)
        self.use_rope = use_rope
        self.layers = nn.ModuleList(
            [WhisperSpecialEncoderLayer(config) for _ in range(config.encoder_layers)]
        )
        if use_rope:
            # The table is built for half of the per-head dimension.
            self.rotary_embedding = RotaryEmbedding(
                config.hidden_size // config.encoder_attention_heads // 2,
                rope_ratio,
            )

    def forward(
        self,
        input_features,
        attention_mask=None,
        head_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        position_ids=None,
    ):
        r"""
        Args:
            input_features (`torch.FloatTensor` of shape `(batch_size, feature_size, sequence_length)`):
                Float values of mel features extracted from the raw speech waveform. Raw speech waveform can be
                obtained by loading a `.flac` or `.wav` audio file into an array of type `List[float]` or a
                `numpy.ndarray`, *e.g.* via the soundfile library (`pip install soundfile`). To prepare the array into
                `input_features`, the [`AutoFeatureExtractor`] should be used for extracting the mel features, padding
                and conversion into a tensor of type `torch.FloatTensor`. See [`~WhisperFeatureExtractor.__call__`]
            attention_mask (`torch.Tensor`, *optional*):
                Whisper does not support masking of the `input_features`, this argument is preserved for compatibility,
                but it is not used. By default the silence in the input log mel spectrogram is ignored.
            head_mask (`torch.Tensor` of shape `(encoder_layers, encoder_attention_heads)`, *optional*):
                Mask to nullify selected heads of the attention modules. Mask values selected in `[0, 1]`:

                - 1 indicates the head is **not masked**,
                - 0 indicates the head is **masked**.
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
            output_hidden_states (`bool`, *optional*):
                Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors
                for more detail.
            return_dict (`bool`, *optional*):
                Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
            position_ids (`torch.Tensor`, *optional*):
                Indices used to select rows of the rotary table (rope) or of `embed_positions` (no rope). Without
                rope, indices beyond the table are wrapped modulo the table length.
        """
        output_attentions = (
            output_attentions
            if output_attentions is not None
            else self.config.output_attentions
        )
        output_hidden_states = (
            output_hidden_states
            if output_hidden_states is not None
            else self.config.output_hidden_states
        )
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        inputs_embeds = nn.functional.gelu(self.conv1(input_features))
        inputs_embeds = nn.functional.gelu(self.conv2(inputs_embeds))

        # (batch, channels, frames) -> (batch, frames, channels)
        inputs_embeds = inputs_embeds.permute(0, 2, 1)
        if self.use_rope:
            rotary_embs = self.rotary_embedding.get_emb(
                inputs_embeds.shape[1],
                inputs_embeds.dtype,
                inputs_embeds.device,
            )
            if position_ids is not None:
                rotary_embs = rotary_embs[position_ids]
            else:
                # Add a leading batch axis so the table broadcasts.
                rotary_embs = rotary_embs[None]
            hidden_states = inputs_embeds
        else:
            rotary_embs = None
            if position_ids is not None:
                max_l = self.embed_positions.weight.shape[0]
                if position_ids.max() >= max_l:
                    logger.warning("Pos id max %s wrapping", position_ids.max())
                embed_pos = self.embed_positions.weight[position_ids % max_l]
            else:
                embed_pos = self.embed_positions.weight[:inputs_embeds.shape[1]]
            hidden_states = inputs_embeds + embed_pos
        # NOTE(review): the original indentation was lost; dropout is applied
        # after both branches here -- confirm against the upstream file.
        hidden_states = nn.functional.dropout(
            hidden_states, p=self.dropout, training=self.training
        )

        encoder_states = () if output_hidden_states else None
        all_attentions = () if output_attentions else None

        # Raised explicitly (same exception type as the former `assert`) so the
        # check is still enforced when Python runs with -O.
        if head_mask is not None and head_mask.size()[0] != len(self.layers):
            raise AssertionError(
                f"The head_mask should be specified for {len(self.layers)} layers, but it is for {head_mask.size()[0]}."
            )

        for idx, encoder_layer in enumerate(self.layers):
            if output_hidden_states:
                encoder_states = encoder_states + (hidden_states,)

            # LayerDrop: randomly skip whole layers while training.
            to_drop = False
            if self.training:
                dropout_probability = torch.rand([])
                if dropout_probability < self.layerdrop:
                    to_drop = True

            if to_drop:
                # Three slots, mirroring (hidden_states, kv_cache, attn_weights)
                # from a real layer, so the attentions lookup below stays valid.
                layer_outputs = (None, None, None)
            else:
                if self.gradient_checkpointing and self.training:
                    layer_outputs = self._gradient_checkpointing_func(
                        encoder_layer.__call__,
                        hidden_states,
                        None,
                        (head_mask[idx] if head_mask is not None else None),
                        output_attentions,
                        rotary_embs,
                        position_ids,
                    )
                else:
                    layer_outputs = encoder_layer(
                        hidden_states,
                        None,
                        layer_head_mask=(
                            head_mask[idx] if head_mask is not None else None
                        ),
                        output_attentions=output_attentions,
                        rotary_pos_emb=rotary_embs,
                        position_ids=position_ids,
                    )

                hidden_states = layer_outputs[0]

            if output_attentions:
                all_attentions = all_attentions + (layer_outputs[2],)

        hidden_states = self.layer_norm(hidden_states)
        if output_hidden_states:
            encoder_states = encoder_states + (hidden_states,)

        if not return_dict:
            return tuple(
                v
                for v in [hidden_states, encoder_states, all_attentions]
                if v is not None
            )
        return BaseModelOutputWithPastAndCrossAttentions(
            last_hidden_state=hidden_states,
            hidden_states=encoder_states,
            attentions=all_attentions,
        )
|
|
|