"""
HuggingFace Model for Distilled Speech Encoder.

A Data2Vec-style bidirectional speech encoder distilled from AuriStream.
Returns hidden states from all layers for downstream probing/finetuning.
"""

from dataclasses import dataclass
from typing import Optional, Tuple, Union

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel
from transformers.modeling_outputs import BaseModelOutput

try:
    from configuration_distilled_speech import DistilledSpeechConfig
except ImportError:
    from .configuration_distilled_speech import DistilledSpeechConfig


@dataclass
class DistilledSpeechOutput(BaseModelOutput):
    """
    Output type for DistilledSpeechModel.

    Args:
        last_hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
            Sequence of hidden-states at the output of the last layer of the model.
        hidden_states (`tuple(torch.FloatTensor)`, *optional*):
            Tuple of `torch.FloatTensor` (one for the output of the embeddings + one for each layer)
            of shape `(batch_size, sequence_length, hidden_size)`.
        extract_features (`torch.FloatTensor` of shape `(batch_size, sequence_length, conv_dim[-1])`):
            Output of the convolutional feature encoder (before projection).
    """

    last_hidden_state: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
    extract_features: Optional[torch.FloatTensor] = None


class GroupNorm1D(nn.Module):
    """Group normalization for 1D convolutions (B, C, T) -> (B, C, T)."""

    def __init__(self, num_groups: int, num_channels: int, eps: float = 1e-5):
        super().__init__()
        self.norm = nn.GroupNorm(num_groups, num_channels, eps=eps)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.norm(x)


class ConvLayer(nn.Module):
    """Single convolutional layer with normalization and activation."""

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int,
        bias: bool = False,
        norm: str = "group",
        activation: str = "gelu",
    ):
        super().__init__()
        self.conv = nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            bias=bias,
        )

        if norm == "group":
            self.norm = GroupNorm1D(num_groups=out_channels, num_channels=out_channels)
        elif norm == "layer":
            self.norm = nn.LayerNorm(out_channels)
        else:
            self.norm = None

        if activation == "gelu":
            self.activation = nn.GELU()
        elif activation == "relu":
            self.activation = nn.ReLU()
        else:
            self.activation = None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.conv(x)
        if self.norm is not None:
            if isinstance(self.norm, nn.LayerNorm):
                # LayerNorm normalizes the channel dim, which must be last:
                # (B, C, T) -> (B, T, C) -> normalize -> (B, C, T)
                x = x.transpose(1, 2)
                x = self.norm(x)
                x = x.transpose(1, 2)
            else:
                x = self.norm(x)
        if self.activation is not None:
            x = self.activation(x)
        return x


class ConvFeatureEncoder(nn.Module):
    """
    7-layer convolutional feature encoder.

    Transforms raw 16kHz audio into 50Hz feature representations.
    Total stride: 5 * 2 * 2 * 2 * 2 * 2 * 2 = 320 (16kHz / 320 = 50Hz)
    """

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()

        conv_layers = []
        in_channels = 1

        for i, (out_channels, kernel, stride) in enumerate(
            zip(config.conv_dim, config.conv_kernel, config.conv_stride)
        ):
            # The first layer uses the configured norm; all later layers use group norm.
            norm = "group" if i > 0 else config.feat_extract_norm
            conv_layers.append(
                ConvLayer(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    kernel_size=kernel,
                    stride=stride,
                    bias=config.conv_bias,
                    norm=norm,
                    activation=config.feat_extract_activation,
                )
            )
            in_channels = out_channels

        self.conv_layers = nn.ModuleList(conv_layers)
        self.output_dim = config.conv_dim[-1]

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Raw audio waveform (B, T) or (B, 1, T)

        Returns:
            Features (B, T', C) where T' ≈ T / 320 (the exact length follows
            standard conv output arithmetic, so it can be slightly less)
        """
        if x.dim() == 2:
            x = x.unsqueeze(1)

        for conv_layer in self.conv_layers:
            x = conv_layer(x)

        x = x.transpose(1, 2)
        return x
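
# A minimal sketch (illustrative, not used by the model) of how the frame count
# falls out of the conv stack. The strides match the docstring above; the kernel
# sizes are assumed Wav2Vec2-style defaults and may differ in a given config.
def _example_conv_output_length(
    num_samples: int,
    kernels: Tuple[int, ...] = (10, 3, 3, 3, 3, 2, 2),
    strides: Tuple[int, ...] = (5, 2, 2, 2, 2, 2, 2),
) -> int:
    """Illustrative only: e.g. 16000 samples (1 s at 16 kHz) -> 49 frames (~50 Hz)."""
    length = num_samples
    for kernel, stride in zip(kernels, strides):
        # standard conv output length with no padding
        length = (length - kernel) // stride + 1
    return length
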

class FeatureProjection(nn.Module):
    """Projects conv features to transformer hidden size."""

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        self.layer_norm = nn.LayerNorm(config.conv_dim[-1], eps=config.layer_norm_eps)
        self.projection = nn.Linear(config.conv_dim[-1], config.hidden_size)
        self.dropout = nn.Dropout(config.feat_proj_dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.layer_norm(x)
        x = self.projection(x)
        x = self.dropout(x)
        return x


class RotaryEmbedding(nn.Module):
    """Rotary Position Embedding (RoPE)."""

    def __init__(self, dim: int, theta: float = 10000.0, max_seq_len: int = 8192):
        super().__init__()
        self.dim = dim
        self.theta = theta
        self.max_seq_len = max_seq_len

        inv_freq = 1.0 / (theta ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq, persistent=False)

        self._cos_cached = None
        self._sin_cached = None
        self._seq_len_cached = 0

    def _update_cache(self, seq_len: int, device: torch.device, dtype: torch.dtype):
        # Rebuild the cache when it is missing, too short, or on the wrong device
        # (e.g. after the module has been moved with .to()).
        if (
            self._cos_cached is None
            or seq_len > self._seq_len_cached
            or self._cos_cached.device != device
        ):
            self._seq_len_cached = max(seq_len, self.max_seq_len)
            t = torch.arange(self._seq_len_cached, device=device, dtype=dtype)
            freqs = torch.outer(t, self.inv_freq.to(device))
            emb = torch.cat((freqs, freqs), dim=-1)
            self._cos_cached = emb.cos()
            self._sin_cached = emb.sin()

    def forward(self, x: torch.Tensor, seq_len: int) -> Tuple[torch.Tensor, torch.Tensor]:
        self._update_cache(seq_len, x.device, x.dtype)
        return (
            self._cos_cached[:seq_len].to(x.dtype),
            self._sin_cached[:seq_len].to(x.dtype),
        )


def rotate_half(x: torch.Tensor) -> torch.Tensor:
    """Rotate half the hidden dims of the input."""
    x1 = x[..., : x.shape[-1] // 2]
    x2 = x[..., x.shape[-1] // 2 :]
    return torch.cat((-x2, x1), dim=-1)


def apply_rotary_pos_emb(
    q: torch.Tensor, k: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Apply rotary position embedding to query and key tensors."""
    # (seq_len, dim) -> (1, 1, seq_len, dim) to broadcast over batch and heads
    cos = cos.unsqueeze(0).unsqueeze(0)
    sin = sin.unsqueeze(0).unsqueeze(0)
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)
    return q_embed, k_embed
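
# Usage sketch (illustrative): q and k are (batch, heads, seq_len, head_dim),
# as produced by MultiHeadAttention below, and cos/sin come from RotaryEmbedding
# constructed with dim == head_dim.
#
#     rope = RotaryEmbedding(dim=64)
#     q = torch.randn(2, 8, 100, 64)
#     k = torch.randn(2, 8, 100, 64)
#     cos, sin = rope(q, seq_len=100)
#     q, k = apply_rotary_pos_emb(q, k, cos, sin)
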

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention with RoPE support."""

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = config.hidden_size // config.num_attention_heads

        assert self.head_dim * self.num_heads == self.hidden_size, (
            "hidden_size must be divisible by num_attention_heads"
        )

        self.q_proj = nn.Linear(config.hidden_size, config.hidden_size)
        self.k_proj = nn.Linear(config.hidden_size, config.hidden_size)
        self.v_proj = nn.Linear(config.hidden_size, config.hidden_size)
        self.out_proj = nn.Linear(config.hidden_size, config.hidden_size)

        self.dropout = nn.Dropout(config.attention_dropout)
        self.use_rope = config.use_rope

    def forward(
        self,
        x: torch.Tensor,
        cos: Optional[torch.Tensor] = None,
        sin: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        B, T, _ = x.shape

        # (B, T, D) -> (B, num_heads, T, head_dim)
        q = self.q_proj(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.k_proj(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)

        if self.use_rope and cos is not None and sin is not None:
            q, k = apply_rotary_pos_emb(q, k, cos, sin)

        # attention_mask must be a bool/additive mask broadcastable to
        # (B, num_heads, T, T); the model converts padding masks before this point.
        attn_output = F.scaled_dot_product_attention(
            q, k, v,
            attn_mask=attention_mask,
            dropout_p=self.dropout.p if self.training else 0.0,
        )

        attn_output = attn_output.transpose(1, 2).contiguous().view(B, T, self.hidden_size)
        attn_output = self.out_proj(attn_output)

        return attn_output


class FeedForward(nn.Module):
    """Feed-forward network with GELU activation."""

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.activation = nn.GELU()
        self.dropout = nn.Dropout(config.activation_dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.fc1(x)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x


class TransformerLayer(nn.Module):
    """Single transformer encoder layer with pre-norm."""

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)
        self.attention_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.ffn_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout)

    def forward(
        self,
        x: torch.Tensor,
        cos: Optional[torch.Tensor] = None,
        sin: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        # Pre-norm attention block with residual connection
        residual = x
        x = self.attention_norm(x)
        x = self.attention(x, cos, sin, attention_mask)
        x = self.dropout(x)
        x = residual + x

        # Pre-norm feed-forward block with residual connection
        residual = x
        x = self.ffn_norm(x)
        x = self.feed_forward(x)
        x = self.dropout(x)
        x = residual + x

        return x


class TransformerEncoder(nn.Module):
    """Stack of transformer encoder layers with hidden state collection."""

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        self.config = config
        self.layers = nn.ModuleList([
            TransformerLayer(config) for _ in range(config.num_hidden_layers)
        ])

        if config.use_rope:
            self.rotary_emb = RotaryEmbedding(
                dim=config.hidden_size // config.num_attention_heads,
                theta=config.rope_theta,
            )
        else:
            self.rotary_emb = None

    def forward(
        self,
        x: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        output_hidden_states: bool = False,
    ) -> Tuple[torch.Tensor, Optional[Tuple[torch.Tensor, ...]]]:
        """
        Args:
            x: Input tensor (B, T, D)
            attention_mask: Optional attention mask
            output_hidden_states: Whether to return all hidden states

        Returns:
            Tuple of (last_hidden_state, all_hidden_states)
            all_hidden_states: tuple of (num_layers + 1) tensors if output_hidden_states=True
            - hidden_states[0]: input to first transformer layer
            - hidden_states[i]: output of transformer layer i-1 (for i > 0)
        """
        T = x.shape[1]

        cos, sin = None, None
        if self.rotary_emb is not None:
            cos, sin = self.rotary_emb(x, T)

        all_hidden_states = () if output_hidden_states else None

        if output_hidden_states:
            all_hidden_states = all_hidden_states + (x,)

        for layer in self.layers:
            x = layer(x, cos, sin, attention_mask)

            if output_hidden_states:
                all_hidden_states = all_hidden_states + (x,)

        return x, all_hidden_states


class DistilledSpeechModel(PreTrainedModel):
    """
    Distilled Speech Encoder Model.

    A Data2Vec-style bidirectional transformer encoder for speech,
    trained via distillation from AuriStream models.

    This model takes raw audio waveforms as input and outputs contextualized
    representations at 50Hz (20ms stride). It returns hidden states from all
    transformer layers, making it suitable for downstream probing and finetuning.

    IMPORTANT: Call model.eval() before inference to disable dropout and ensure
    correct behavior of normalization layers.

    Hidden states structure (for a 12-layer model, output_hidden_states=True):
        - hidden_states[0]: Feature projection output (input to transformer)
        - hidden_states[1]: Output of transformer layer 0
        - hidden_states[2]: Output of transformer layer 1
        - ...
        - hidden_states[12]: Output of transformer layer 11
        Total: 13 hidden states (1 embedding + 12 layers)

    Example usage:
        >>> import torch
        >>> from transformers import AutoModel, Wav2Vec2FeatureExtractor
        >>> model = AutoModel.from_pretrained("your-model-name", trust_remote_code=True)
        >>> model.eval()  # Important for inference!
        >>> feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("your-model-name")
        >>> audio = torch.randn(16000).numpy()  # 1 second of audio at 16kHz
        >>> inputs = feature_extractor(audio, return_tensors="pt", sampling_rate=16000)
        >>> with torch.no_grad():
        ...     outputs = model(inputs.input_values, output_hidden_states=True)
        >>> last_hidden = outputs.last_hidden_state  # (1, T', 768), T' ≈ 50 for 1 s
        >>> all_hidden = outputs.hidden_states  # Tuple of 13 tensors
        >>> # Or use dict-style access:
        >>> all_hidden = outputs["hidden_states"]
    """

    config_class = DistilledSpeechConfig
    base_model_prefix = "distilled_speech"
    main_input_name = "input_values"
    supports_gradient_checkpointing = True

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__(config)
        self.config = config

        self.conv_encoder = ConvFeatureEncoder(config)
        self.feature_projection = FeatureProjection(config)

        self.encoder = TransformerEncoder(config)
        self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

        self.post_init()

    def _init_weights(self, module):
        """Initialize the weights."""
        if isinstance(module, nn.Linear):
            nn.init.trunc_normal_(module.weight, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.LayerNorm):
            nn.init.ones_(module.weight)
            nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Conv1d):
            nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu")
            if module.bias is not None:
                nn.init.zeros_(module.bias)

    def forward(
        self,
        input_values: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, DistilledSpeechOutput]:
        """
        Forward pass through the model.

        Args:
            input_values (`torch.Tensor` of shape `(batch_size, sequence_length)`):
                Raw audio waveform, normalized to zero mean and unit variance.
                Expected sample rate: 16kHz.
            attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Mask to avoid performing attention on padding tokens.
            output_hidden_states (`bool`, *optional*):
                Whether to return hidden states from all layers.
            return_dict (`bool`, *optional*):
                Whether to return a ModelOutput instead of a plain tuple.

        Returns:
            `DistilledSpeechOutput` or `tuple`:
            - last_hidden_state: (B, T', hidden_size) where T' ≈ T / 320
            - hidden_states: Tuple of (B, T', hidden_size) for each layer if output_hidden_states=True
            - extract_features: (B, T', conv_dim[-1]) raw conv features
        """
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None
            else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if input_values.dtype in (torch.long, torch.int, torch.int32, torch.int64):
            input_values = input_values.float()

        if input_values.dim() == 1:
            input_values = input_values.unsqueeze(0)

        extract_features = self.conv_encoder(input_values)

        # Downsample a sample-level padding mask to the frame rate (approximating
        # each example's valid-frame count proportionally) and convert it to the
        # boolean key-padding mask shape (B, 1, 1, T') expected by SDPA.
        if attention_mask is not None and attention_mask.dim() == 2:
            num_frames = extract_features.shape[1]
            feat_lengths = (
                attention_mask.sum(-1).float() / attention_mask.shape[-1] * num_frames
            ).long()
            attention_mask = (
                torch.arange(num_frames, device=attention_mask.device)[None, :]
                < feat_lengths[:, None]
            )[:, None, None, :]

        hidden_states = self.feature_projection(extract_features)

        encoder_output, all_hidden_states = self.encoder(
            hidden_states,
            attention_mask=attention_mask,
            output_hidden_states=output_hidden_states,
        )

        last_hidden_state = self.final_layer_norm(encoder_output)

        if not return_dict:
            outputs = (last_hidden_state,)
            if output_hidden_states:
                outputs = outputs + (all_hidden_states,)
            outputs = outputs + (extract_features,)
            return outputs

        return DistilledSpeechOutput(
            last_hidden_state=last_hidden_state,
            hidden_states=all_hidden_states,
            extract_features=extract_features,
        )
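
# Probing sketch (illustrative, assuming a checkpoint at "your-model-name" as in
# the class docstring): pooling one transformer layer's hidden states into an
# utterance-level vector for a downstream linear probe.
#
#     model = DistilledSpeechModel.from_pretrained("your-model-name").eval()
#     wav = torch.randn(1, 16000)  # (B, T) raw 16 kHz audio
#     with torch.no_grad():
#         out = model(wav, output_hidden_states=True)
#     layer_feats = out.hidden_states[9]       # (B, T', hidden_size)
#     utterance_vec = layer_feats.mean(dim=1)  # (B, hidden_size)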