Dixtral / modeling_dixtral.py
Lakoc's picture
Upload DixtralForConditionalGeneration
cc34926 verified
Raw
History Blame Contribute Delete
38.7 kB
# coding=utf-8
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import math
from typing import Callable, Optional, Union, Any, Dict
import wandb
import torch
from torch import nn
from transformers.activations import ACT2FN
from transformers.cache_utils import Cache
from transformers.generation import GenerationMixin
from transformers.modeling_layers import GradientCheckpointingLayer
from transformers.modeling_outputs import BaseModelOutput, BaseModelOutputWithPast, CausalLMOutputWithPast
from transformers.modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
from transformers.processing_utils import Unpack
from transformers.utils import TransformersKwargs, auto_docstring, can_return_tuple, logging
from transformers.utils.generic import check_model_inputs
from transformers.models.auto import AutoModel, AutoModelForCausalLM
from .configuration_dixtral import DixtralConfig, DixtralEncoderConfig
from transformers.models.voxtral import VoxtralConfig
from transformers.generation.utils import GenerationConfig, LogitsProcessorList
from src.models.dicow.FDDT import FDDT
from src.models.dicow.layers import CustomLinear, CustomDiagonalLinear
from src.models.dixtral.decoding import CTCRescorerLogitsProcessorWithPruning
logger = logging.get_logger(__name__)
def eager_attention_forward(
module: nn.Module,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
attention_mask: Optional[torch.Tensor],
scaling: Optional[float] = None,
dropout: float = 0.0,
head_mask: Optional[torch.Tensor] = None,
**kwargs,
):
if scaling is None:
scaling = query.size(-1) ** -0.5
attn_weights = torch.matmul(query, key.transpose(2, 3)) * scaling
if attention_mask is not None and attention_mask.ndim == 4:
attn_weights = attn_weights + attention_mask[:, :, :, : key.shape[-2]]
attn_weights = nn.functional.softmax(attn_weights, dim=-1)
if head_mask is not None:
attn_weights = attn_weights * head_mask.view(1, -1, 1, 1)
attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training)
attn_output = torch.matmul(attn_weights, value)
attn_output = attn_output.transpose(1, 2).contiguous()
return attn_output, attn_weights
class CTCProcessorDummy:
def __init__(self):
super().__init__()
self.func = None
def set_func(self,func):
self.func = func
def __call__(self, input_ids_orig: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
return self.func(input_ids_orig, scores)
class VoxtralAttention(nn.Module):
"""Multi-headed attention from 'Attention Is All You Need' paper"""
def __init__(
self,
embed_dim: int,
num_heads: int,
dropout: float = 0.0,
is_decoder: bool = False,
bias: bool = True,
is_causal: bool = False,
layer_idx: Optional[int] = None,
config: Optional[VoxtralConfig] = None,
):
super().__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
self.dropout = dropout
self.head_dim = embed_dim // num_heads
self.config = config
if (self.head_dim * num_heads) != self.embed_dim:
raise ValueError(
f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim}"
f" and `num_heads`: {num_heads})."
)
self.scaling = self.head_dim**-0.5
self.is_decoder = is_decoder
self.is_causal = is_causal
if layer_idx is None and is_decoder:
logger.warning_once(
f"Instantiating a decoder {self.__class__.__name__} without passing `layer_idx` is not recommended and "
"will to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` "
"when creating this class."
)
self.layer_idx = layer_idx
self.k_proj = nn.Linear(embed_dim, embed_dim, bias=False)
self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous()
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
layer_head_mask: Optional[torch.Tensor] = None,
output_attentions: bool = False,
**kwargs,
) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[tuple[torch.Tensor]]]:
"""Input shape: Batch x Time x Channel"""
bsz, tgt_len, _ = hidden_states.size()
# Scaling is susceptible to floating point arithmetics' inprecisions
# which can lead to different results (this is dependent from model
# to model, e.g. whisper is one such case). We therefore keep the
# original order of scaling to follow the original implementation
# and enforce no scaling (1.0) in the attention call below.
query_states = self._shape(self.q_proj(hidden_states) * self.scaling, tgt_len, bsz)
key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
value_states = self._shape(self.v_proj(hidden_states), -1, bsz)
attention_interface: Callable = eager_attention_forward
if self.config._attn_implementation != "eager":
attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]
attn_output, attn_weights = attention_interface(
self,
query_states,
key_states,
value_states,
attention_mask,
dropout=0.0 if not self.training else self.dropout,
scaling=1.0,
output_attentions=output_attentions,
head_mask=layer_head_mask,
**kwargs,
)
attn_output = attn_output.reshape(bsz, tgt_len, -1).contiguous()
attn_output = self.out_proj(attn_output)
return attn_output, attn_weights
class VoxtralEncoderLayer(GradientCheckpointingLayer):
def __init__(self, config: VoxtralConfig):
super().__init__()
self.embed_dim = config.d_model
self.self_attn = VoxtralAttention(
embed_dim=self.embed_dim,
num_heads=config.encoder_attention_heads,
dropout=config.attention_dropout,
config=config,
)
self.self_attn_layer_norm = nn.LayerNorm(self.embed_dim)
self.dropout = config.dropout
self.activation_fn = ACT2FN[config.activation_function]
self.activation_dropout = config.activation_dropout
self.fc1 = nn.Linear(self.embed_dim, config.encoder_ffn_dim)
self.fc2 = nn.Linear(config.encoder_ffn_dim, self.embed_dim)
self.final_layer_norm = nn.LayerNorm(self.embed_dim)
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: torch.Tensor,
layer_head_mask: torch.Tensor,
output_attentions: bool = False,
) -> torch.Tensor:
"""
Args:
hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
attention_mask (`torch.FloatTensor`): attention mask of size
`(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
layer_head_mask (`torch.FloatTensor`): mask for attention heads in a given layer of size
`(encoder_attention_heads,)`.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under
returned tensors for more detail.
"""
residual = hidden_states
hidden_states = self.self_attn_layer_norm(hidden_states)
hidden_states, attn_weights = self.self_attn(
hidden_states=hidden_states,
attention_mask=attention_mask,
layer_head_mask=layer_head_mask,
output_attentions=output_attentions,
)
hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
hidden_states = residual + hidden_states
residual = hidden_states
hidden_states = self.final_layer_norm(hidden_states)
hidden_states = self.activation_fn(self.fc1(hidden_states))
hidden_states = nn.functional.dropout(hidden_states, p=self.activation_dropout, training=self.training)
hidden_states = self.fc2(hidden_states)
hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
hidden_states = residual + hidden_states
if hidden_states.dtype == torch.float16:
clamp_value = torch.finfo(hidden_states.dtype).max - 1000
hidden_states = torch.clamp(hidden_states, min=-clamp_value, max=clamp_value)
return hidden_states, attn_weights
@auto_docstring
class DixtralPreTrainedModel(PreTrainedModel):
config: DixtralConfig
base_model_prefix = "model"
supports_gradient_checkpointing = True
_no_split_modules = None
_skip_keys_device_placement = "past_key_values"
_supports_flash_attn = True
_supports_sdpa = True
_supports_flex_attn = True
_supports_cache_class = True
_supports_attention_backend = True
_can_compile_fullgraph = True
def _init_weights(self, module):
# important: this ported version of Voxtral isn't meant for training from scratch - only
# inference and fine-tuning - so the proper init weights code has been removed
std = (
self.config.initializer_range
if hasattr(self.config, "initializer_range")
else self.config.audio_config.initializer_range
)
if isinstance(module, (nn.Linear, nn.Conv1d)):
module.weight.data.normal_(mean=0.0, std=std)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.weight.data.fill_(1.0)
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=std)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
elif isinstance(module, (CustomLinear, CustomDiagonalLinear)):
module.reset_parameters()
@auto_docstring(
custom_intro="""
The Voxtral encoder, which is a Whisper encoder.
"""
)
class DixtralEncoder(DixtralPreTrainedModel):
"""
Transformer encoder consisting of *config.encoder_layers* self attention layers. Each layer is a
[`VoxtralEncoderLayer`].
Args:
config: VoxtralEncoderConfig
"""
# Ignore copy
config: DixtralEncoderConfig
main_input_name = "input_features"
_no_split_modules = ["VoxtralEncoderLayer"]
_can_record_outputs = {
"attentions": VoxtralAttention,
"hidden_states": VoxtralEncoderLayer,
}
def __init__(self, config: DixtralEncoderConfig):
super().__init__(config)
self.dropout = config.dropout
self.layerdrop = config.encoder_layerdrop
embed_dim = config.d_model
self.num_mel_bins = config.num_mel_bins
self.padding_idx = config.pad_token_id
self.max_source_positions = config.max_source_positions
self.embed_scale = math.sqrt(embed_dim) if config.scale_embedding else 1.0
self.conv1 = nn.Conv1d(self.num_mel_bins, embed_dim, kernel_size=3, padding=1)
self.conv2 = nn.Conv1d(embed_dim, embed_dim, kernel_size=3, stride=2, padding=1)
self.embed_positions = nn.Embedding(self.max_source_positions, embed_dim)
self.embed_positions.requires_grad_(False)
self.layers = nn.ModuleList([VoxtralEncoderLayer(config) for _ in range(config.encoder_layers)])
self.layer_norm = nn.LayerNorm(config.d_model)
# Ignore copy
self.avg_pooler = nn.AvgPool1d(2, stride=2)
self._init_dicow_components(config)
self.gradient_checkpointing = False
# Initialize weights and apply final processing
self.post_init()
def _init_dicow_components(self, config):
"""Initialize DiCoW-specific components"""
if not config.use_dicow_encoder:
return
# FDDT components
if config.use_fddt:
num_fddts = (config.apply_fddt_to_n_layers
if config.apply_fddt_to_n_layers != -1
else len(self.layers))
self.fddts = nn.ModuleList([
FDDT(
d_model=config.d_model,
non_target_rate=1.0,
fddt_init=config.fddt_init,
is_diagonal=config.fddt_is_diagonal,
bias_only=config.fddt_bias_only,
use_silence=config.fddt_use_silence,
use_target=config.fddt_use_target,
use_overlap=config.fddt_use_overlap,
use_non_target=config.fddt_use_non_target,
)
for _ in range(num_fddts)
])
if config.use_pre_pos_fddt:
self.initial_fddt = FDDT(
d_model=config.d_model,
non_target_rate=config.non_target_fddt_value,
fddt_init=config.fddt_init,
is_diagonal=config.fddt_is_diagonal,
bias_only=config.fddt_bias_only,
use_silence=config.fddt_use_silence,
use_target=config.fddt_use_target,
use_overlap=config.fddt_use_overlap,
use_non_target=config.fddt_use_non_target,
)
# For CTC label processing
self.first_task_token = config.vocab_size - 30 * 50 - 1 - 6
def _freeze_parameters(self):
for param in self.parameters():
param.requires_grad = False
self._requires_grad = False
def get_input_embeddings(self) -> nn.Module:
return self.conv1
def set_input_embeddings(self, value: nn.Module):
self.conv1 = value
@check_model_inputs
def forward(
self,
input_features,
attention_mask=None,
stno_mask=None,
**kwargs: Unpack[TransformersKwargs],
):
r"""
Args:
input_features (`torch.LongTensor` of shape `(batch_size, feature_size, sequence_length)`):
Float values of mel features extracted from the raw speech waveform. Raw speech waveform can be
obtained by loading a `.flac` or `.wav` audio file into an array of type `list[float]` or a
`numpy.ndarray`, *e.g.* via the soundfile library (`pip install soundfile`). To prepare the array into
`input_features`, the [`AutoFeatureExtractor`] should be used for extracting the mel features, padding
and conversion into a tensor of type `torch.FloatTensor`. See [`~WhisperFeatureExtractor.__call__`]
attention_mask (`torch.Tensor`)`, *optional*):
Voxtral does not support masking of the `input_features`, this argument is preserved for compatibility,
but it is not used. By default the silence in the input log mel spectrogram are ignored.
"""
expected_seq_length = self.config.max_source_positions * self.conv1.stride[0] * self.conv2.stride[0]
if input_features.shape[-1] != expected_seq_length:
raise ValueError(
f"Qwen2Audio expects the mel input features to be of length {expected_seq_length}, but found {input_features.shape[-1]}. Make sure to pad the input mel features to {expected_seq_length}."
)
input_features = input_features.to(dtype=self.conv1.weight.dtype, device=self.conv1.weight.device)
inputs_embeds = nn.functional.gelu(self.conv1(input_features))
inputs_embeds = nn.functional.gelu(self.conv2(inputs_embeds))
inputs_embeds = inputs_embeds.permute(0, 2, 1)
# Apply initial FDDT if configured
if (self.config.use_dicow_encoder and
self.config.use_fddt and
self.config.use_pre_pos_fddt and
hasattr(self, 'initial_fddt')):
inputs_embeds = self.initial_fddt(inputs_embeds, stno_mask)
embed_pos = self.embed_positions.weight
hidden_states = (inputs_embeds + embed_pos).to(inputs_embeds.dtype)
hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
for idx, encoder_layer in enumerate(self.layers):
if (self.config.use_dicow_encoder and
self.config.use_fddt and
hasattr(self, 'fddts') and
idx < len(self.fddts)):
hidden_states = self.fddts[idx](hidden_states, stno_mask)
layer_outputs = encoder_layer(
hidden_states,
attention_mask=attention_mask,
layer_head_mask=None,
)
hidden_states = layer_outputs[0]
hidden_states = self.layer_norm(hidden_states)
return BaseModelOutput(
last_hidden_state=hidden_states,
)
# Ignore copy
def _get_feat_extract_output_lengths(self, input_lengths: torch.LongTensor):
"""
Computes the output length of the convolutional layers and the output length of the audio encoder
"""
input_lengths = (input_lengths - 1) // 2 + 1
output_lengths = (input_lengths - 2) // 2 + 1
return input_lengths, output_lengths
class VoxtralMultiModalProjector(nn.Module):
def __init__(self, config: VoxtralConfig):
super().__init__()
self.linear_1 = nn.Linear(config.audio_config.intermediate_size, config.text_config.hidden_size, bias=False)
self.act = ACT2FN[config.projector_hidden_act]
self.linear_2 = nn.Linear(config.text_config.hidden_size, config.text_config.hidden_size, bias=False)
def forward(self, audio_features):
hidden_states = self.linear_1(audio_features)
hidden_states = self.act(hidden_states)
hidden_states = self.linear_2(hidden_states)
return hidden_states
@auto_docstring(
custom_intro="""
The Voxtral model, which consists of Whisper encoder, a multi-modal projector and a LLama language model.
"""
)
class DixtralForConditionalGeneration(DixtralPreTrainedModel, GenerationMixin):
_tied_weights_keys = ["lm_head.weight"]
_tp_plan = {"lm_head": "colwise_rep"}
_pp_plan = {"lm_head": (["hidden_states"], ["logits"])}
_keep_in_fp32_modules_strict = ["embed_positions"]
def __init__(self, config):
super().__init__(config)
self.vocab_size = config.text_config.vocab_size
self.audio_tower = DixtralEncoder(config.audio_config)
self.language_model = AutoModelForCausalLM.from_config(config.text_config)
self.multi_modal_projector = VoxtralMultiModalProjector(config)
self.num_soft_prompts = config.num_soft_prompts
if self.num_soft_prompts > 0:
self.soft_prompt_token_id = getattr(config, "soft_prompt_token_id", 23)
self.soft_prompt = nn.Parameter(
torch.randn(1, self.num_soft_prompts, config.text_config.hidden_size)
)
self._init_dicow_components(config)
# Initialize weights and apply final processing
self.post_init()
def _init_dicow_components(self, config):
self.ctc_weight = config.audio_config.ctc_weight
# Additional layers for CTC
if config.audio_config.additional_layer and self.ctc_weight > 0.0:
custom_conf = copy.deepcopy(config.audio_config)
custom_conf.d_model = config.text_config.hidden_size
custom_conf.encoder_attention_heads = config.text_config.num_attention_heads
custom_conf.encoder_ffn_dim = custom_conf.d_model * 2
self.additional_layer = VoxtralEncoderLayer(custom_conf)
if config.audio_config.additional_self_attention_layer and self.ctc_weight > 0.0:
self.additional_self_attention_layer = VoxtralAttention(
embed_dim=config.text_config.hidden_size,
num_heads=config.text_config.num_attention_heads,
dropout=config.text_config.attention_dropout,
config=config.audio_config, # Fixed: pass audio_config which is VoxtralConfig
)
# CTC head
if self.ctc_weight > 0.0:
self.ctc_lm_head = nn.Linear(config.text_config.hidden_size, config.text_config.vocab_size, bias=False)
self.ctc_lm_head.weight = self.language_model.get_input_embeddings().weight
def get_input_embeddings(self):
return self.language_model.get_input_embeddings()
def set_input_embeddings(self, value):
self.language_model.set_input_embeddings(value)
def get_output_embeddings(self):
return self.language_model.get_output_embeddings()
def set_output_embeddings(self, new_embeddings):
self.language_model.set_output_embeddings(new_embeddings)
def set_decoder(self, decoder):
self.language_model.set_decoder(decoder)
def get_decoder(self):
return self.language_model.get_decoder()
def get_audio_embeds(self, input_features: torch.FloatTensor, stno_mask: torch.FloatTensor):
"""
This method is used to get the audio embeddings from input features (a log mel spectrogram), meaning inferring the audio encoder and the multi-modal projector.
Args:
input_features (`torch.FloatTensor`):
Float values of mel features extracted from the raw speech waveform. Raw speech waveform can be
obtained by loading a `.flac` or `.wav` audio file into an array of type `list[float]` or a
`numpy.ndarray`, *e.g.* via the soundfile library (`pip install soundfile`). To prepare the array into
`input_features`, the [`AutoFeatureExtractor`] should be used for extracting the mel features, padding
and conversion into a tensor of type `torch.FloatTensor`. See [`~WhisperFeatureExtractor.__call__`]
Returns:
`torch.FloatTensor`:
The audio embeddings.
"""
audio_outputs = self.audio_tower(input_features, stno_mask=stno_mask)
audio_hidden_states = audio_outputs.last_hidden_state
audio_hidden_states = audio_hidden_states.reshape(-1, self.config.audio_config.intermediate_size)
audio_embeds = self.multi_modal_projector(audio_hidden_states)
return audio_embeds
def set_tokenizer(self, tokenizer):
self.tokenizer = tokenizer
def possibly_update_last_hidden_states(self, hidden_states):
"""DiCoW post-processing for CTC"""
if not self.config.audio_config.use_dicow_encoder:
return hidden_states
if hasattr(self, "additional_layer"):
hidden_states, _ = self.additional_layer(
hidden_states,
attention_mask=None,
layer_head_mask=None,
output_attentions=False,
)
elif hasattr(self, "additional_self_attention_layer"):
hidden_states, _ = self.additional_self_attention_layer(
hidden_states,
attention_mask=None,
layer_head_mask=None,
output_attentions=False,
)
return hidden_states
def get_enc_logits(self, hidden_states):
"""
Get CTC logits from encoder hidden states.
Applies optional additional processing layer and projects to vocabulary.
Args:
hidden_states: Encoder output hidden states
Returns:
logits: CTC logits of shape (batch_size, seq_len, vocab_size + 1)
"""
hidden_states = self.possibly_update_last_hidden_states(hidden_states)
logits = self.ctc_lm_head(hidden_states)
return logits
def right_pad_labels(self, labels, pad_value=-100):
"""
labels: (B, L) tensor possibly left/right padded
returns: right-padded labels only
"""
B, L = labels.shape
new_labels = torch.full_like(labels, pad_value)
max_len = 1
for b in range(B):
valid = labels[b][labels[b] != pad_value]
max_len = max(max_len, len(valid))
new_labels[b, :valid.numel()] = valid
new_labels = new_labels[:, :max_len]
return new_labels
def get_ctc_loss(self, logits, labels, input_lengths):
"""Compute CTC loss for DiCoW"""
if labels.max() >= self.config.text_config.vocab_size:
raise ValueError(f"Label values must be <= vocab_size: {self.config.text_config.vocab_size}")
# Assuming that padded tokens are filled with -100
labels_mask = labels >= 0
target_lengths = labels_mask.sum(-1)
# CTC loss doesn't support fp16
log_probs = nn.functional.log_softmax(logits, dim=-1, dtype=torch.float32).transpose(0, 1)
with torch.backends.cudnn.flags(enabled=True):
ctc_loss = nn.functional.ctc_loss(
log_probs,
labels,
input_lengths,
target_lengths,
blank=logits.shape[-1] - 1,
reduction=self.config.audio_config.ctc_loss_reduction,
zero_infinity=True,
)
return ctc_loss
@can_return_tuple
@auto_docstring
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
input_features: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: Union[int, torch.Tensor] = 0,
stno_mask=None,
**kwargs: Unpack[TransformersKwargs],
) -> CausalLMOutputWithPast:
r"""
Example:
```python
>>> from transformers import VoxtralForConditionalGeneration, AutoProcessor
>>> import torch
>>> device = "cuda" if torch.cuda.is_available() else "cpu"
>>> repo_id = "mistralai/Voxtral-Mini-3B-2507"
>>> processor = AutoProcessor.from_pretrained(repo_id)
>>> model = VoxtralForConditionalGeneration.from_pretrained(repo_id, torch_dtype=torch.bfloat16, device_map=device)
>>> conversation = [
{
"role": "user",
"content": [
{
"type": "audio",
"url": "https://huggingface.co/datasets/hf-internal-testing/dummy-audio-samples/resolve/main/dude_where_is_my_car.wav",
},
{"type": "text", "text": "What can you tell me about this audio?"},
],
}
]
>>> inputs = processor.apply_chat_template(conversation)
>>> inputs = inputs.to(device, dtype=torch.bfloat16)
>>> outputs = model.generate(**inputs, max_new_tokens=30)
>>> processor.batch_decode(outputs[:, inputs.input_ids.shape[1]:], skip_special_tokens=True)
["This audio is a humorous conversation between two friends, likely in English, where one of them is trying to figure out what the other's tattoo says."]
```"""
if inputs_embeds is None:
inputs_embeds = self.get_input_embeddings()(input_ids)
ctc_loss = None
if input_features is not None:
# Get audio encoder outputs
audio_outputs = self.audio_tower(input_features, stno_mask=stno_mask)
audio_hidden_states = audio_outputs.last_hidden_state
# Project audio features for language model
audio_hidden_states_flat = audio_hidden_states.reshape(-1, self.config.audio_config.intermediate_size)
audio_embeds_flat = self.multi_modal_projector(audio_hidden_states_flat)
# Replace text-audio token placeholders with audio embeddings
audio_token_mask = input_ids == self.config.audio_token_id
inputs_embeds[audio_token_mask] = audio_embeds_flat
if self.num_soft_prompts > 0:
prompt_mask = (input_ids == self.soft_prompt_token_id)
if prompt_mask.any():
batch_size = inputs_embeds.shape[0]
# Expand the learned soft prompts to [Batch_Size, Num_Soft_Tokens, Hidden_Size]
# Then flatten to [Batch_Size * Num_Soft_Tokens, Hidden_Size] to match the mask
prompts_expanded = self.soft_prompt.expand(batch_size, -1, -1).reshape(-1,
self.config.text_config.hidden_size)
# Replace embeddings
inputs_embeds[prompt_mask] = prompts_expanded
# Compute CTC loss on projected embeddings if configured
if (self.config.audio_config.use_dicow_encoder and
self.config.audio_config.ctc_weight > 0.0 and
labels is not None and
self.training and
audio_token_mask is not None) or hasattr(self, "ctc_rescorer"):
# Create tensor with shape of input_ids filled with zeros
batch_size, seq_len = input_ids.shape
hidden_dim = audio_embeds_flat.shape[-1]
ctc_embeds = torch.empty(
batch_size, seq_len, hidden_dim,
device=audio_embeds_flat.device,
dtype=audio_embeds_flat.dtype
)
# Fill with audio_embeds at audio_token positions
ctc_embeds[audio_token_mask] = audio_embeds_flat
ctc_embeds_detached = ctc_embeds.detach()
# 2. Force it to require gradients so the additional_layer
# builds a backward graph for its own weights
ctc_embeds_detached.requires_grad_(True)
# Remove values outside maximum valid range using audio_mask
enc_output_lens = audio_token_mask.sum(dim=1)
max_valid_len = enc_output_lens.max().item()
first_audio_token = audio_token_mask.int().argmax(dim=1).min().item() # First True position per batch
ctc_embeds = ctc_embeds[:, first_audio_token:first_audio_token+max_valid_len, :]
# Get encoder logits for CTC
enc_logits = self.get_enc_logits(ctc_embeds)
if hasattr(self, "ctc_rescorer"):
rescorer = CTCRescorerLogitsProcessorWithPruning(
enc_logits,
torch.full((enc_logits.shape[0],), fill_value=enc_logits.shape[1],
device=enc_logits.device),
enc_logits.shape[-1] - 1,
self.generation_config.pad_token_id,
self.generation_config.eos_token_id,
self.generation_config.bos_token_id,
self.tokenizer,
0,
self.generation_config.ctc_weight,
self.generation_config.num_beams,
False,
)
self.ctc_rescorer.set_func(func=rescorer)
if labels is not None:
# Prepare encoder labels
enc_labels = labels.clone()
# Replace EOS tokens with ignore index
enc_labels[enc_labels == self.config.text_config.eos_token_id] = -100
enc_labels = self.right_pad_labels(enc_labels)
# Compute CTC loss
ctc_loss = self.get_ctc_loss(enc_logits, enc_labels, enc_output_lens)
outputs: BaseModelOutputWithPast = self.language_model(
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
labels=labels,
use_cache=use_cache,
cache_position=cache_position,
logits_to_keep=logits_to_keep,
**kwargs,
)
if ctc_loss is not None and outputs.loss is not None:
if wandb.run is not None:
wandb.log({"dec_loss": outputs.loss, "ctc_loss": ctc_loss})
total_loss = outputs.loss + self.config.audio_config.ctc_weight * ctc_loss
outputs.loss = total_loss
elif ctc_loss is not None:
outputs.loss = self.config.audio_config.ctc_weight * ctc_loss
return outputs
def prepare_inputs_for_generation(self, *args, **kwargs):
# Overwritten -- we should not pass input_features/stno_mask when in cached decoding stage
input_features = kwargs.pop("input_features", None)
stno_mask = kwargs.pop("stno_mask", None)
cache_position = kwargs.get("cache_position")
model_inputs = super().prepare_inputs_for_generation(*args, **kwargs)
if cache_position is not None and cache_position[0] == 0:
# Only pass audio inputs on the first (prefill) step
model_inputs["input_features"] = input_features
model_inputs["stno_mask"] = stno_mask
return model_inputs
def _get_logits_processor(
self,
generation_config: GenerationConfig,
input_ids_seq_length: Optional[int] = None,
encoder_input_ids: torch.LongTensor = None,
prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], list[int]]] = None,
logits_processor: Optional[LogitsProcessorList] = None,
device: Optional[str] = None,
model_kwargs: Optional[dict[str, Any]] = None,
negative_prompt_ids: Optional[torch.Tensor] = None,
negative_prompt_attention_mask: Optional[torch.Tensor] = None,
) -> LogitsProcessorList:
# pylint: disable=no-member
gen_config_copy = copy.deepcopy(generation_config)
processors = super()._get_logits_processor(
gen_config_copy,
input_ids_seq_length,
encoder_input_ids,
prefix_allowed_tokens_fn,
logits_processor,
device,
model_kwargs,
negative_prompt_ids,
negative_prompt_attention_mask,
)
if hasattr(generation_config, "ctc_weight") and generation_config.ctc_weight > 0:
self.ctc_rescorer = CTCProcessorDummy
processors.append(self.ctc_rescorer)
return processors
@torch.no_grad()
def decode_ctc(
self,
input_ids: torch.LongTensor,
input_features: torch.FloatTensor,
stno_mask: Optional[torch.Tensor] = None,
) -> tuple[None, torch.LongTensor]:
"""
Performs greedy CTC decoding on the audio input.
"""
audio_outputs = self.audio_tower(input_features, stno_mask=stno_mask)
audio_hidden_states = audio_outputs.last_hidden_state
# Project audio features for language model
audio_hidden_states_flat = audio_hidden_states.reshape(-1, self.config.audio_config.intermediate_size)
audio_embeds_flat = self.multi_modal_projector(audio_hidden_states_flat)
# Replace text-audio token placeholders with audio embeddings
audio_token_mask = input_ids == self.config.audio_token_id
# Create tensor with shape of input_ids filled with zeros
batch_size, seq_len = input_ids.shape
hidden_dim = audio_embeds_flat.shape[-1]
ctc_embeds = torch.empty(
batch_size, seq_len, hidden_dim,
device=audio_embeds_flat.device,
dtype=audio_embeds_flat.dtype
)
# Fill with audio_embeds at audio_token positions
ctc_embeds[audio_token_mask] = audio_embeds_flat
# Remove values outside maximum valid range using audio_mask
enc_output_lens = audio_token_mask.sum(dim=1)
max_valid_len = enc_output_lens.max().item()
first_audio_token = audio_token_mask.int().argmax(dim=1).min().item() # First True position per batch
ctc_embeds = ctc_embeds[:, first_audio_token:first_audio_token + max_valid_len, :]
# Get encoder logits for CTC
logits = self.get_enc_logits(ctc_embeds)
# 4. Greedy Decoding
predicted_ids = torch.argmax(logits, dim=-1)
# Blank token is the last index in the vocabulary (vocab_size - 1)
# Based on: blank=logits.shape[-1] - 1 in get_ctc_loss
blank_id = self.config.text_config.vocab_size - 1
sequences = []
for batch_idx in range(batch_size):
ids = predicted_ids[batch_idx].cpu().tolist()
# CTC Collapse:
# 1. Merge adjacent duplicates
# 2. Remove blank tokens
collapsed_ids = []
prev_id = -1
for token_id in ids:
if token_id != prev_id:
if token_id != blank_id:
collapsed_ids.append(token_id)
prev_id = token_id
sequences.append(torch.tensor(collapsed_ids, dtype=torch.long))
return None, torch.nn.utils.rnn.pad_sequence(sequences, batch_first=True, padding_value=-100).to(input_ids.device)
__all__ = ["DixtralPreTrainedModel", "DixtralEncoder", "DixtralForConditionalGeneration"]