| from typing import Optional, Tuple, Union |
|
|
| import torch |
| import torch.nn as nn |
| from torch.nn import CrossEntropyLoss |
| import torch.utils.checkpoint |
| import torch.utils.checkpoint |
| from transformers.modeling_outputs import Seq2SeqLMOutput |
| from transformers.models.speech_encoder_decoder.modeling_speech_encoder_decoder import ( |
| shift_tokens_right, |
| ) |
| from transformers.models.whisper.modeling_whisper import ( |
| WhisperEncoder, |
| ) |
| from transformers.models.whisper.modeling_whisper import ( |
| WhisperForConditionalGeneration, |
| shift_tokens_right, |
| WhisperModel, |
| ) |
| from transformers.models.whisper.modeling_whisper import sinusoids |
| from transformers.utils import logging |
|
|
| from .config import Seq2SeqLMOutputLosses, Seq2SeqModelOutputLogit, DiCoWConfig |
| from .encoder import CustomLinear, CustomDiagonalLinear, FDDT, DiCoWEncoder |
| from .generation import DiCoWGenerationMixin |
|
|
# Module-level logger; it is bound to the "transformers" logger name rather than
# to this module's own name.
logger = logging.get_logger("transformers")
# NOTE(review): this raises the verbosity of the transformers logging utilities to
# DEBUG as a side effect of importing this module - confirm this is intended
# outside of development runs.
logging.set_verbosity_debug()
|
|
|
|
class DiCoW(WhisperModel):
    """Whisper encoder-decoder backbone that uses `DiCoWEncoder` as its encoder.

    Everything created by `WhisperModel.__init__` is kept, except `self.encoder`,
    which is replaced by a `DiCoWEncoder` built from the same config.
    """

    def __init__(self, config: DiCoWConfig):
        super().__init__(config)
        # Swap the stock encoder created by the parent constructor.
        self.encoder = DiCoWEncoder(config)

    def forward(
        self,
        input_features: Optional[torch.FloatTensor] = None,
        attention_mask: Optional[torch.LongTensor] = None,
        decoder_input_ids: Optional[torch.LongTensor] = None,
        decoder_attention_mask: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        decoder_head_mask: Optional[torch.Tensor] = None,
        cross_attn_head_mask: Optional[torch.Tensor] = None,
        encoder_outputs: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        decoder_inputs_embeds: Optional[Tuple[torch.FloatTensor]] = None,
        decoder_position_ids: Optional[Tuple[torch.LongTensor]] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        stno_mask: Optional[torch.FloatTensor] = None,
        per_group_sizes: Optional[torch.LongTensor] = None,
    ) -> Union[Tuple[torch.Tensor], Seq2SeqModelOutputLogit]:
        r"""Run the encoder (unless its outputs are supplied) and then the decoder.

        Args:
            input_features: Encoder input. Only used when `encoder_outputs` is
                `None`; it is first passed through the inherited
                `_mask_input_features` together with `attention_mask`.
            attention_mask: Only forwarded to `_mask_input_features`.
            decoder_input_ids, decoder_attention_mask, decoder_head_mask,
            cross_attn_head_mask, past_key_values, decoder_inputs_embeds,
            decoder_position_ids, use_cache: Forwarded unchanged to the decoder.
            head_mask: Forwarded to the encoder.
            encoder_outputs: Pre-computed encoder result. It must expose
                `hidden_states`, `attentions` and `logits` attributes, because
                those are read below; a plain tuple is not supported.
            output_attentions, output_hidden_states, return_dict: Default to the
                corresponding config values when `None`. The encoder is always
                asked for its hidden states regardless of `output_hidden_states`.
            stno_mask, per_group_sizes: Forwarded unchanged to `DiCoWEncoder`
                (their layout is defined there, not in this class).

        Returns:
            A `Seq2SeqModelOutputLogit`, or its tuple form when `return_dict` is
            false.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if encoder_outputs is None:
            input_features = self._mask_input_features(input_features, attention_mask=attention_mask)

            # Attribute access is needed on the result below, so a structured
            # output is always requested here; `return_dict` is only honoured
            # when building this method's own return value.
            encoder_outputs = self.encoder(
                input_features,
                output_attentions=output_attentions,
                # The decoder cross-attends to `hidden_states[-1]`, hence always True.
                output_hidden_states=True,
                head_mask=head_mask,
                return_dict=True,
                stno_mask=stno_mask,
                per_group_sizes=per_group_sizes,
            )

        decoder_outputs = self.decoder(
            input_ids=decoder_input_ids,
            attention_mask=decoder_attention_mask,
            encoder_hidden_states=encoder_outputs.hidden_states[-1],
            head_mask=decoder_head_mask,
            cross_attn_head_mask=cross_attn_head_mask,
            past_key_values=past_key_values,
            inputs_embeds=decoder_inputs_embeds,
            position_ids=decoder_position_ids,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=True,
        )

        outputs = Seq2SeqModelOutputLogit(
            last_hidden_state=decoder_outputs.last_hidden_state,
            past_key_values=decoder_outputs.past_key_values,
            decoder_hidden_states=decoder_outputs.hidden_states,
            decoder_attentions=decoder_outputs.attentions,
            cross_attentions=decoder_outputs.cross_attentions,
            encoder_last_hidden_state=encoder_outputs.hidden_states[-1],
            encoder_hidden_states=encoder_outputs.hidden_states,
            encoder_attentions=encoder_outputs.attentions,
            encoder_logits=encoder_outputs.logits,
        )

        if not return_dict:
            # NOTE(review): assumes `Seq2SeqModelOutputLogit` is a transformers
            # `ModelOutput` (provides `to_tuple()`) - confirm in `.config`.
            return outputs.to_tuple()

        return outputs
|
|
|
|
class DiCoWForConditionalGeneration(DiCoWGenerationMixin, WhisperForConditionalGeneration):
    """`WhisperForConditionalGeneration` variant built around the `DiCoW` backbone.

    On top of the parent class it adds an optional encoder-side loss (weighted by
    `config.ctc_weight`), a decoder loss that can take a second label variant
    (`upp_labels`), and a few helpers for freezing / re-scaling parameters.
    """

    config_class = DiCoWConfig

    def __init__(self, config: DiCoWConfig):
        super().__init__(config)
        # Replace the backbone created by the parent constructor.
        self.model = DiCoW(config)
        self.encoder_logits = None
        # Must be provided through `set_tokenizer` before the encoder loss is used.
        self.tokenizer = None
        self.vad_seek_callback = None
        self.stno_mask = None
        self.stno_mask_seek = None

    def set_vad_seek_callback(self, vad_seek_callback):
        """Store `vad_seek_callback` on the model (it is not used in this class body)."""
        self.vad_seek_callback = vad_seek_callback

    def set_tokenizer(self, tokenizer):
        """Store the tokenizer whose `prefix_tokens` are stripped for the encoder loss."""
        self.tokenizer = tokenizer

    @staticmethod
    def _init_fddt_linear(module, std, fddt_init):
        """Initialise a `CustomLinear` / `CustomDiagonalLinear` according to `fddt_init`.

        * ``'random'``: weight and bias drawn from N(0, std).
        * ``'non-disturbing'``: weight set to an identity pattern (`eye` for
          `CustomLinear`, all ones otherwise), bias zeroed.
        * ``'disparagement'``: same pattern scaled by `module.init_eye_val`,
          bias zeroed.

        Any other value leaves the module untouched.
        """
        with torch.no_grad():
            weight = module.weight.data
            if fddt_init == "random":
                weight.normal_(mean=0.0, std=std)
                if module.bias is not None:
                    module.bias.data.normal_(mean=0.0, std=std)
                return
            if fddt_init not in ("non-disturbing", "disparagement"):
                return

            if isinstance(module, CustomLinear):
                pattern = torch.eye(*weight.shape, device=weight.device)
            else:
                pattern = torch.ones_like(weight)
            if fddt_init == "disparagement":
                pattern = pattern * module.init_eye_val
            # Copy in place so the parameter keeps its own device and dtype
            # instead of being rebound to a freshly created tensor.
            weight.copy_(pattern)
            if module.bias is not None:
                module.bias.data.zero_()

    def _init_weights(self, module):
        """Initialise `module` in place; dispatches on the module type.

        The custom FDDT layers are checked before the generic `nn.Linear` /
        `nn.Conv1d` branch so that they get their dedicated initialisation.
        """
        std = self.config.init_std
        fddt_init = self.config.fddt_init
        if isinstance(module, (CustomLinear, CustomDiagonalLinear)):
            self._init_fddt_linear(module, std, fddt_init)
        elif isinstance(module, FDDT):
            if module.bias_only:
                # Only handled in bias-only mode, where these attributes are
                # accessed through `.data`.
                bias_terms = (
                    module.target_linear,
                    module.non_target_linear,
                    module.overlap_linear,
                    module.silence_linear,
                )
                for term in bias_terms:
                    if fddt_init == "random":
                        term.data.normal_(mean=0.0, std=std)
                    else:
                        term.data.zero_()
        elif isinstance(module, (nn.Linear, nn.Conv1d)):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, WhisperEncoder):
            with torch.no_grad():
                embed_positions = module.embed_positions.weight
                embed_positions.copy_(sinusoids(*embed_positions.shape))
        elif isinstance(module, nn.LayerNorm):
            module.reset_parameters()
        elif isinstance(module, nn.MultiheadAttention):
            module._reset_parameters()
        elif isinstance(module, nn.ConvTranspose1d):
            module.reset_parameters()

    def forward(
        self,
        input_features: Optional[torch.FloatTensor] = None,
        stno_mask: Optional[torch.FloatTensor] = None,
        per_group_sizes: Optional[torch.LongTensor] = None,
        attention_mask_enc: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.LongTensor] = None,
        decoder_input_ids: Optional[torch.LongTensor] = None,
        decoder_attention_mask: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        decoder_head_mask: Optional[torch.Tensor] = None,
        cross_attn_head_mask: Optional[torch.Tensor] = None,
        encoder_outputs: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        decoder_inputs_embeds: Optional[Tuple[torch.FloatTensor]] = None,
        decoder_position_ids: Optional[Tuple[torch.LongTensor]] = None,
        labels: Optional[torch.LongTensor] = None,
        upp_labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        is_valid: Optional[torch.Tensor] = None,
    ) -> Union[Tuple[torch.Tensor], Seq2SeqLMOutputLosses]:
        r"""Run the backbone, project to vocabulary logits and optionally compute the loss.

        Args:
            input_features, attention_mask, head_mask, encoder_outputs, stno_mask,
            per_group_sizes and all ``decoder_*`` / cache / output flags:
                Forwarded unchanged to `self.model`.
            attention_mask_enc: Accepted for interface compatibility; not used
                in this method.
            labels (`torch.LongTensor`, *optional*): Targets for the decoder
                loss. When given and neither `decoder_input_ids` nor
                `decoder_inputs_embeds` is passed, the decoder inputs are
                derived from them with `shift_tokens_right`.
            upp_labels (`torch.LongTensor`, *optional*): Alternative targets of
                the same shape as `labels`. When given, the per-position loss is
                the minimum of the losses against `labels` and `upp_labels`;
                when omitted, only `labels` is used.
            is_valid (*optional*): Index applied to the first dimension of the
                logits and labels before the loss is computed (presumably a
                boolean mask over the batch - confirm against the data collator).

        Returns:
            `Seq2SeqLMOutputLosses`, or a tuple `(loss?, logits, ...)` when
            `return_dict` is false.

        Raises:
            ValueError: if the encoder loss is requested (`labels` given and
                `config.ctc_weight > 0`) but no tokenizer was set.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if labels is not None and decoder_input_ids is None and decoder_inputs_embeds is None:
            decoder_input_ids = shift_tokens_right(
                labels, self.config.pad_token_id, self.config.decoder_start_token_id
            )

        # A structured output is always requested from the backbone because its
        # fields are read by name below; the tuple form is produced at the end.
        outputs = self.model(
            input_features,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            encoder_outputs=encoder_outputs,
            decoder_attention_mask=decoder_attention_mask,
            head_mask=head_mask,
            decoder_head_mask=decoder_head_mask,
            cross_attn_head_mask=cross_attn_head_mask,
            past_key_values=past_key_values,
            decoder_inputs_embeds=decoder_inputs_embeds,
            decoder_position_ids=decoder_position_ids,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=True,
            stno_mask=stno_mask,
            per_group_sizes=per_group_sizes,
        )

        dec_lm_logits = self.proj_out(outputs.last_hidden_state)
        enc_lm_logits = outputs.encoder_logits

        # Drop the entries not selected by `is_valid` from logits and targets.
        if is_valid is not None:
            if self.config.ctc_weight > 0.0:
                enc_lm_logits = enc_lm_logits[is_valid]
            dec_lm_logits = dec_lm_logits[is_valid]
            if labels is not None:
                labels = labels[is_valid]
            if upp_labels is not None:
                upp_labels = upp_labels[is_valid]

        loss = None
        if labels is not None:
            ctc_weight = self.config.ctc_weight
            ctc_loss = 0

            if ctc_weight > 0.0:
                if self.tokenizer is None:
                    raise ValueError(
                        "A tokenizer is required to compute the encoder loss; "
                        "call `set_tokenizer` before passing `labels` with `ctc_weight > 0`."
                    )
                # Work on a copy so the decoder targets are left untouched.
                enc_labels = labels.clone()
                # Strip each prefix token that every sequence starts with.
                for token in self.tokenizer.prefix_tokens:
                    if (enc_labels[:, 0] == token).all():
                        enc_labels = enc_labels[:, 1:]
                enc_labels[enc_labels == self.config.eos_token_id] = -100
                ctc_loss = self.get_encoder().get_loss(enc_lm_logits, enc_labels)

            loss_fct = CrossEntropyLoss(reduction="none")
            flat_logits = dec_lm_logits.reshape(-1, self.config.vocab_size)
            # Move the targets to the logits' device before computing the loss.
            labels = labels.to(dec_lm_logits.device)
            token_loss = loss_fct(flat_logits, labels.reshape(-1))
            if upp_labels is not None:
                upp_labels = upp_labels.to(dec_lm_logits.device)
                upp_token_loss = loss_fct(flat_logits, upp_labels.reshape(-1))
                # Per position, keep whichever target variant is cheaper.
                token_loss = (
                    torch.hstack((token_loss[..., None], upp_token_loss[..., None])).min(dim=-1).values
                )
            # The mean is taken over every position, as in the original code.
            dec_loss = token_loss.mean()
            loss = (1 - ctc_weight) * dec_loss + ctc_weight * ctc_loss

        if not return_dict:
            # NOTE(review): slicing assumes the backbone output behaves like a
            # transformers `ModelOutput` (slice -> tuple of non-None fields).
            output = (dec_lm_logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return Seq2SeqLMOutputLosses(
            loss=loss,
            logits=dec_lm_logits,
            past_key_values=outputs.past_key_values,
            decoder_hidden_states=outputs.decoder_hidden_states,
            decoder_attentions=outputs.decoder_attentions,
            cross_attentions=outputs.cross_attentions,
            encoder_last_hidden_state=outputs.encoder_last_hidden_state,
            encoder_hidden_states=outputs.encoder_hidden_states,
            encoder_attentions=outputs.encoder_attentions,
            encoder_logits=enc_lm_logits,
        )

    def _get_feat_extract_output_lengths(self, attention_mask: torch.Tensor) -> torch.Tensor:
        """Return the encoder's own output lengths divided by 4 and rounded up.

        The division is a true division followed by `ceil`, so the result is
        not cast back to an integer dtype here.
        """
        encoder_lengths = self.model.encoder._get_feat_extract_output_lengths(attention_mask)
        return (encoder_lengths / 4).ceil()

    def freeze_except(self, prefixes_to_preheat):
        """Freeze every parameter whose name does not start with one of the prefixes.

        Args:
            prefixes_to_preheat: Iterable of parameter-name prefixes that stay
                trainable; all other parameters get `requires_grad = False`.
        """
        # Materialise once so one-shot iterables are not exhausted after the
        # first parameter; `str.startswith` accepts a tuple of candidates.
        trainable_prefixes = tuple(prefixes_to_preheat)
        for name, param in self.named_parameters():
            param.requires_grad = name.startswith(trainable_prefixes)

    def suppress_interactions(self):
        """Suppress the final projection in CoAttention blocks to let the original information flow through.

        Parameters whose name contains both "interaction" and "cat_proj" are
        modified in place: biases are zeroed, all others are scaled by 0.001.
        """
        for name, param in self.named_parameters():
            if "interaction" not in name or "cat_proj" not in name:
                continue
            with torch.no_grad():
                if "bias" in name:
                    param.zero_()
                else:
                    param.mul_(0.001)
|
|