# klemenk's picture
# Upload modeling_distilled_speech.py with huggingface_hub
# 7fb5ee7 verified
"""
HuggingFace Model for Distilled Speech Encoder.
A Data2Vec-style bidirectional speech encoder distilled from AuriStream.
Returns hidden states from all layers for downstream probing/finetuning.
"""
import math
from dataclasses import dataclass
from typing import Optional, Tuple, Union
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel
from transformers.modeling_outputs import BaseModelOutput
try:
# When used as a HuggingFace model (trust_remote_code=True)
from configuration_distilled_speech import DistilledSpeechConfig
except ImportError:
# When used as part of a package
from .configuration_distilled_speech import DistilledSpeechConfig
@dataclass
class DistilledSpeechOutput(BaseModelOutput):
    """
    Output type for DistilledSpeechModel.

    Extends `BaseModelOutput` with the raw convolutional features. Field order
    matters: it is the order used when the output is converted to a tuple.

    Args:
        last_hidden_state (`torch.FloatTensor` of shape `(batch_size, num_frames, hidden_size)`):
            Output of the last transformer layer after the model's final layer norm.
        hidden_states (`tuple(torch.FloatTensor)`, *optional*):
            Tuple of `torch.FloatTensor` (one for the input to the first transformer layer
            + one for the output of each layer), each of shape
            `(batch_size, num_frames, hidden_size)`. These are collected inside the
            encoder, i.e. *before* the final layer norm, so `hidden_states[-1]` is not
            identical to `last_hidden_state`.
        extract_features (`torch.FloatTensor` of shape `(batch_size, num_frames, conv_dim[-1])`):
            Output of the convolutional feature encoder (before projection).
    """

    # Every field defaults to None, so each one is annotated as Optional.
    last_hidden_state: Optional[torch.FloatTensor] = None
    hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
    extract_features: Optional[torch.FloatTensor] = None
# ==============================================================================
# Convolutional Feature Encoder
# ==============================================================================
class GroupNorm1D(nn.Module):
    """Thin wrapper that applies `nn.GroupNorm` to channel-first 1D feature maps.

    Input and output both have shape (B, C, T). The wrapped module lives under
    the attribute ``norm``, so its parameters are named ``norm.weight`` and
    ``norm.bias`` in the state dict.
    """

    def __init__(self, num_groups: int, num_channels: int, eps: float = 1e-5):
        super().__init__()
        group_norm = nn.GroupNorm(num_groups, num_channels, eps=eps)
        self.norm = group_norm

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalize ``x`` of shape (B, C, T) and return a tensor of the same shape."""
        normalized = self.norm(x)
        return normalized
class ConvLayer(nn.Module):
    """One block of the convolutional front-end: Conv1d -> optional norm -> optional activation.

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels of the convolution.
        kernel_size: Convolution kernel width.
        stride: Convolution stride.
        bias: Whether the convolution has a bias term.
        norm: ``"group"`` selects `GroupNorm1D` with one group per channel,
            ``"layer"`` selects `nn.LayerNorm` over the channel dimension;
            any other value disables normalization.
        activation: ``"gelu"`` or ``"relu"``; any other value disables the
            activation.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int,
        bias: bool = False,
        norm: str = "group",
        activation: str = "gelu",
    ):
        super().__init__()

        # Unpadded convolution: the output is shorter than input_length / stride.
        self.conv = nn.Conv1d(
            in_channels, out_channels, kernel_size=kernel_size, stride=stride, bias=bias
        )

        norm_module = None
        if norm == "layer":
            norm_module = nn.LayerNorm(out_channels)
        elif norm == "group":
            norm_module = GroupNorm1D(num_groups=out_channels, num_channels=out_channels)
        self.norm = norm_module

        act_module = None
        if activation == "relu":
            act_module = nn.ReLU()
        elif activation == "gelu":
            act_module = nn.GELU()
        self.activation = act_module

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the block on ``x`` of shape (B, C_in, T) and return (B, C_out, T')."""
        out = self.conv(x)

        norm = self.norm
        if isinstance(norm, nn.LayerNorm):
            # LayerNorm normalizes the last dimension, so move channels there and back.
            out = norm(out.transpose(1, 2)).transpose(1, 2)
        elif norm is not None:
            out = norm(out)

        if self.activation is None:
            return out
        return self.activation(out)
class ConvFeatureEncoder(nn.Module):
    """
    Convolutional feature encoder that turns raw audio into frame-level features.

    One `ConvLayer` is built per entry of ``config.conv_dim`` /
    ``config.conv_kernel`` / ``config.conv_stride``. The intended setup is a
    7-layer stack with strides 5, 2, 2, 2, 2, 2, 2 (total stride 320), which
    maps 16kHz audio to roughly 50Hz features.

    The first layer uses ``config.feat_extract_norm`` as its normalization;
    every later layer uses group normalization.
    """

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()

        layer_specs = zip(config.conv_dim, config.conv_kernel, config.conv_stride)

        stack = nn.ModuleList()
        prev_channels = 1  # raw waveform is a single channel
        for index, (channels, kernel, stride) in enumerate(layer_specs):
            layer_norm_kind = config.feat_extract_norm if index == 0 else "group"
            stack.append(
                ConvLayer(
                    in_channels=prev_channels,
                    out_channels=channels,
                    kernel_size=kernel,
                    stride=stride,
                    bias=config.conv_bias,
                    norm=layer_norm_kind,
                    activation=config.feat_extract_activation,
                )
            )
            prev_channels = channels

        self.conv_layers = stack
        self.output_dim = config.conv_dim[-1]

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Raw audio waveform (B, T) or (B, 1, T)
        Returns:
            Features (B, T', C). With total stride 320, T' is approximately
            T // 320; the convolutions are unpadded, so the exact value is
            slightly smaller and depends on the kernel sizes.
        """
        features = x.unsqueeze(1) if x.dim() == 2 else x
        for conv_layer in self.conv_layers:
            features = conv_layer(features)
        # (B, C, T') -> (B, T', C)
        return features.transpose(1, 2)
class FeatureProjection(nn.Module):
    """Layer-normalizes conv features, projects them to the transformer width, then applies dropout.

    Maps (B, T', conv_dim[-1]) to (B, T', hidden_size).
    """

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        conv_out_dim = config.conv_dim[-1]
        self.layer_norm = nn.LayerNorm(conv_out_dim, eps=config.layer_norm_eps)
        self.projection = nn.Linear(conv_out_dim, config.hidden_size)
        self.dropout = nn.Dropout(config.feat_proj_dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply layer norm -> linear projection -> dropout to ``x``."""
        return self.dropout(self.projection(self.layer_norm(x)))
# ==============================================================================
# Rotary Position Embeddings
# ==============================================================================
class RotaryEmbedding(nn.Module):
    """Rotary Position Embedding (RoPE).

    Produces the cos/sin tables consumed by `apply_rotary_pos_emb`. Tables are
    cached and rebuilt only when a longer sequence is requested or the input
    lives on a different device than the cache.

    The tables are always computed in float32 and only cast to the input dtype
    on return. Building them directly in fp16/bf16 rounds the integer position
    indices (e.g. 4095 is not representable in fp16), which yields wrong
    rotation angles for long sequences.

    Args:
        dim: Per-head dimension the rotation is applied to.
        theta: Base of the geometric frequency progression.
        max_seq_len: Minimum number of positions to precompute.
    """

    def __init__(self, dim: int, theta: float = 10000.0, max_seq_len: int = 8192):
        super().__init__()
        self.dim = dim
        self.theta = theta
        self.max_seq_len = max_seq_len

        # Kept for backward compatibility (attribute access); the cache below
        # re-derives the frequencies so it does not depend on this buffer's
        # dtype, which `module.half()` / `module.to(dtype)` would change.
        inv_freq = 1.0 / (theta ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq, persistent=False)

        self._cos_cached = None
        self._sin_cached = None
        self._seq_len_cached = 0

    def _update_cache(self, seq_len: int, device: torch.device, dtype: torch.dtype):
        """Rebuild the cos/sin tables if they are missing, too short, or on another device.

        ``dtype`` is accepted for signature compatibility only; the cache is
        always stored in float32.
        """
        cache_is_valid = (
            self._cos_cached is not None
            and seq_len <= self._seq_len_cached
            and self._cos_cached.device == device
        )
        if cache_is_valid:
            return

        self._seq_len_cached = max(seq_len, self.max_seq_len)

        # Same formula as in __init__, evaluated in float32 on CPU and then
        # moved, so the values match the freshly initialized buffer exactly.
        inv_freq = 1.0 / (
            self.theta ** (torch.arange(0, self.dim, 2, device="cpu").float() / self.dim)
        )
        inv_freq = inv_freq.to(device)

        positions = torch.arange(self._seq_len_cached, device=device, dtype=torch.float32)
        angles = torch.outer(positions, inv_freq)
        emb = torch.cat((angles, angles), dim=-1)
        self._cos_cached = emb.cos()
        self._sin_cached = emb.sin()

    def forward(self, x: torch.Tensor, seq_len: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(cos, sin)`` tables of shape (seq_len, dim) in the dtype and device of ``x``."""
        self._update_cache(seq_len, x.device, x.dtype)
        return (
            self._cos_cached[:seq_len].to(x.dtype),
            self._sin_cached[:seq_len].to(x.dtype),
        )
def rotate_half(x: torch.Tensor) -> torch.Tensor:
    """Split the last dimension into a front and back part and return ``[-back, front]``.

    The split point is ``x.shape[-1] // 2``.
    """
    split_at = x.shape[-1] // 2
    front = x[..., :split_at]
    back = x[..., split_at:]
    return torch.cat((back.neg(), front), dim=-1)
def apply_rotary_pos_emb(
    q: torch.Tensor, k: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Rotate query and key tensors with the given cos/sin tables.

    ``cos`` and ``sin`` get two leading singleton dimensions so that they
    broadcast against ``q`` and ``k``. Returns the rotated ``(q, k)`` pair.
    """
    cos_b = cos[None, None]
    sin_b = sin[None, None]

    def _rotate(t: torch.Tensor) -> torch.Tensor:
        return (t * cos_b) + (rotate_half(t) * sin_b)

    return _rotate(q), _rotate(k)
# ==============================================================================
# Transformer Layers
# ==============================================================================
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention with optional rotary position embeddings.

    Queries, keys and values are separate linear projections of the input;
    attention itself is computed with `F.scaled_dot_product_attention`.
    """

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        width = config.hidden_size
        heads = config.num_attention_heads

        self.hidden_size = width
        self.num_heads = heads
        self.head_dim = width // heads
        assert self.head_dim * self.num_heads == self.hidden_size

        self.q_proj = nn.Linear(width, width)
        self.k_proj = nn.Linear(width, width)
        self.v_proj = nn.Linear(width, width)
        self.out_proj = nn.Linear(width, width)
        # Only the probability ``p`` of this module is used (passed to SDPA).
        self.dropout = nn.Dropout(config.attention_dropout)
        self.use_rope = config.use_rope

    def _split_heads(self, projected: torch.Tensor) -> torch.Tensor:
        """Reshape (B, T, hidden_size) into (B, num_heads, T, head_dim)."""
        batch, frames, _ = projected.shape
        return projected.view(batch, frames, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(
        self,
        x: torch.Tensor,
        cos: Optional[torch.Tensor] = None,
        sin: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        """Self-attend over ``x`` of shape (B, T, hidden_size) and return the same shape.

        ``cos``/``sin`` are applied to queries and keys only when RoPE is
        enabled and both tables are given. ``attention_mask`` is handed to
        `F.scaled_dot_product_attention` unchanged as ``attn_mask``.
        """
        batch, frames, _ = x.shape

        q = self._split_heads(self.q_proj(x))
        k = self._split_heads(self.k_proj(x))
        v = self._split_heads(self.v_proj(x))

        rope_active = self.use_rope and cos is not None and sin is not None
        if rope_active:
            q, k = apply_rotary_pos_emb(q, k, cos, sin)

        # Attention dropout is active in training mode only.
        drop_p = self.dropout.p if self.training else 0.0
        context = F.scaled_dot_product_attention(
            q, k, v, attn_mask=attention_mask, dropout_p=drop_p
        )

        # (B, H, T, head_dim) -> (B, T, hidden_size)
        merged = context.transpose(1, 2).contiguous().view(batch, frames, self.hidden_size)
        return self.out_proj(merged)
class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> GELU -> dropout -> Linear.

    Expands from ``hidden_size`` to ``intermediate_size`` and back.
    """

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        width, inner = config.hidden_size, config.intermediate_size
        self.fc1 = nn.Linear(width, inner)
        self.fc2 = nn.Linear(inner, width)
        self.activation = nn.GELU()
        self.dropout = nn.Dropout(config.activation_dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the MLP to the last dimension of ``x``."""
        expanded = self.dropout(self.activation(self.fc1(x)))
        return self.fc2(expanded)
class TransformerLayer(nn.Module):
    """Pre-norm transformer encoder layer.

    Each sub-block (self-attention, feed-forward) is computed on a
    layer-normalized copy of its input, passed through dropout and added back
    to the un-normalized input.
    """

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)
        self.attention_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.ffn_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        # A single dropout module is shared by both residual branches.
        self.dropout = nn.Dropout(config.hidden_dropout)

    def forward(
        self,
        x: torch.Tensor,
        cos: Optional[torch.Tensor] = None,
        sin: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        """Run attention and feed-forward sub-blocks on ``x`` and return the result."""
        # Attention branch
        attn_out = self.attention(self.attention_norm(x), cos, sin, attention_mask)
        x = x + self.dropout(attn_out)

        # Feed-forward branch
        ffn_out = self.feed_forward(self.ffn_norm(x))
        return x + self.dropout(ffn_out)
class TransformerEncoder(nn.Module):
    """Stack of transformer encoder layers with hidden state collection.

    Supports gradient checkpointing through the Hugging Face convention: the
    module exposes a boolean ``gradient_checkpointing`` attribute which
    `PreTrainedModel.gradient_checkpointing_enable()` switches on, at the same
    time installing ``_gradient_checkpointing_func`` on the module.
    """

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__()
        self.config = config
        self.layers = nn.ModuleList([
            TransformerLayer(config) for _ in range(config.num_hidden_layers)
        ])
        if config.use_rope:
            self.rotary_emb = RotaryEmbedding(
                dim=config.hidden_size // config.num_attention_heads,
                theta=config.rope_theta,
            )
        else:
            self.rotary_emb = None
        # The enclosing model advertises supports_gradient_checkpointing = True;
        # without this attribute nothing in the model would actually checkpoint.
        self.gradient_checkpointing = False

    def _run_layer(
        self,
        layer: nn.Module,
        x: torch.Tensor,
        cos: Optional[torch.Tensor],
        sin: Optional[torch.Tensor],
        attention_mask: Optional[torch.Tensor],
    ) -> torch.Tensor:
        """Call one layer, recomputing its activations in backward when checkpointing is on."""
        if not (self.gradient_checkpointing and self.training):
            return layer(x, cos, sin, attention_mask)

        checkpoint_fn = getattr(self, "_gradient_checkpointing_func", None)
        if checkpoint_fn is not None:
            return checkpoint_fn(layer.__call__, x, cos, sin, attention_mask)

        # Flag was set by hand rather than through gradient_checkpointing_enable().
        from torch.utils.checkpoint import checkpoint

        return checkpoint(layer, x, cos, sin, attention_mask, use_reentrant=False)

    def forward(
        self,
        x: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        output_hidden_states: bool = False,
    ) -> Tuple[torch.Tensor, Optional[Tuple[torch.Tensor, ...]]]:
        """
        Args:
            x: Input tensor (B, T, D)
            attention_mask: Optional attention mask, forwarded unchanged to every layer
            output_hidden_states: Whether to return all hidden states
        Returns:
            Tuple of (last_hidden_state, all_hidden_states)
            all_hidden_states: tuple of (num_layers + 1) tensors if output_hidden_states=True,
            otherwise None
            - hidden_states[0]: input to first transformer layer
            - hidden_states[i]: output of transformer layer i-1 (for i > 0)
        """
        _, seq_len, _ = x.shape

        cos, sin = None, None
        if self.rotary_emb is not None:
            cos, sin = self.rotary_emb(x, seq_len)

        # Start with the encoder input (embedding output) when collecting.
        collected = [x] if output_hidden_states else None

        for layer in self.layers:
            x = self._run_layer(layer, x, cos, sin, attention_mask)
            if collected is not None:
                collected.append(x)

        all_hidden_states = tuple(collected) if collected is not None else None
        return x, all_hidden_states
# ==============================================================================
# Main Model
# ==============================================================================
class DistilledSpeechModel(PreTrainedModel):
    """
    Distilled Speech Encoder Model.

    A Data2Vec-style bidirectional transformer encoder for speech,
    trained via distillation from AuriStream models.

    This model takes raw audio waveforms as input and outputs contextualized
    representations at 50Hz (20ms stride). It returns hidden states from all
    transformer layers, making it suitable for downstream probing and finetuning.

    IMPORTANT: Call model.eval() before inference to disable dropout.

    Hidden states structure (for 12-layer model, output_hidden_states=True):
        - hidden_states[0]: Feature projection output (input to transformer)
        - hidden_states[1]: Output of transformer layer 0
        - hidden_states[2]: Output of transformer layer 1
        - ...
        - hidden_states[12]: Output of transformer layer 11
        Total: 13 hidden states (1 embedding + 12 layers)

    `last_hidden_state` is `hidden_states[-1]` passed through the final layer norm.

    Example usage:
        >>> from transformers import AutoModel, Wav2Vec2FeatureExtractor
        >>> model = AutoModel.from_pretrained("your-model-name", trust_remote_code=True)
        >>> model.eval()  # Important for inference!
        >>> feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("your-model-name")
        >>> audio = torch.randn(16000).numpy()  # 1 second of audio at 16kHz
        >>> inputs = feature_extractor(audio, return_tensors="pt", sampling_rate=16000)
        >>> with torch.no_grad():
        ...     outputs = model(inputs.input_values, output_hidden_states=True)
        >>> last_hidden = outputs.last_hidden_state  # (1, T', hidden_size), T' is about 50 here
        >>> all_hidden = outputs.hidden_states  # Tuple of 13 tensors
        >>> # Or use dict-style access:
        >>> all_hidden = outputs["hidden_states"]
    """

    config_class = DistilledSpeechConfig
    base_model_prefix = "distilled_speech"
    main_input_name = "input_values"
    supports_gradient_checkpointing = True

    def __init__(self, config: DistilledSpeechConfig):
        super().__init__(config)
        self.config = config

        # Feature extraction
        self.conv_encoder = ConvFeatureEncoder(config)
        self.feature_projection = FeatureProjection(config)

        # Transformer encoder
        self.encoder = TransformerEncoder(config)
        self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

        # Initialize weights
        self.post_init()

    def _init_weights(self, module):
        """Initialize the weights of a single submodule.

        Linear: truncated normal (std 0.02) weights, zero bias.
        LayerNorm: unit weight, zero bias.
        Conv1d: Kaiming-normal weights (fan_out, relu), zero bias.
        """
        if isinstance(module, nn.Linear):
            nn.init.trunc_normal_(module.weight, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.LayerNorm):
            nn.init.ones_(module.weight)
            nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Conv1d):
            nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu")
            if module.bias is not None:
                nn.init.zeros_(module.bias)

    def _get_feat_extract_output_lengths(self, input_lengths: torch.Tensor) -> torch.Tensor:
        """Map per-example sample counts to the number of frames the conv stack emits.

        Every conv layer is unpadded, so each one maps a length ``L`` to
        ``floor((L - kernel) / stride) + 1``.
        """
        for kernel, stride in zip(self.config.conv_kernel, self.config.conv_stride):
            input_lengths = torch.div(input_lengths - kernel, stride, rounding_mode="floor") + 1
        return input_lengths

    def _frame_level_attention_mask(
        self, attention_mask: torch.Tensor, num_frames: int
    ) -> torch.Tensor:
        """Convert a sample-level padding mask into a boolean key-padding mask over frames.

        Args:
            attention_mask: (B, T_samples) mask, non-zero for real samples.
                Right padding is assumed: only the count of valid samples per
                row is used.
            num_frames: Number of frames T' produced by the conv encoder.
        Returns:
            Boolean tensor of shape (B, 1, 1, T') where True marks frames that
            may be attended to. It broadcasts over heads and query positions.
        """
        valid_samples = attention_mask.to(torch.long).sum(dim=-1)
        valid_frames = self._get_feat_extract_output_lengths(valid_samples)
        # Keep at least one attendable frame per row: a fully masked row would
        # make the attention softmax produce NaNs.
        valid_frames = valid_frames.clamp(min=1)
        frame_index = torch.arange(num_frames, device=valid_frames.device)
        keep = frame_index.unsqueeze(0) < valid_frames.unsqueeze(1)
        return keep[:, None, None, :]

    def forward(
        self,
        input_values: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, DistilledSpeechOutput]:
        """
        Forward pass through the model.

        Args:
            input_values (`torch.Tensor` of shape `(batch_size, sequence_length)`):
                Raw audio waveform, normalized to zero mean and unit variance.
                Expected sample rate: 16kHz. A 1D tensor is treated as a batch of one.
                Non-floating-point tensors are cast to float32 (values are not rescaled).
            attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Sample-level mask (1 for real samples, 0 for right padding) as produced
                by the feature extractor. It is converted to a frame-level mask so that
                padded frames are not attended to. A mask of any other shape is
                forwarded unchanged to the attention layers, which pass it to
                `scaled_dot_product_attention` as `attn_mask`.
            output_hidden_states (`bool`, *optional*):
                Whether to return hidden states from all layers.
            return_dict (`bool`, *optional*):
                Whether to return a ModelOutput instead of a plain tuple.
        Returns:
            `DistilledSpeechOutput` or `tuple`:
            - last_hidden_state: (B, T', hidden_size) where T' is approximately T // 320
            - hidden_states: Tuple of (B, T', hidden_size) for each layer if output_hidden_states=True
            - extract_features: (B, T', conv_dim[-1]) raw conv features
            In tuple form the order is (last_hidden_state, [hidden_states,] extract_features).
        """
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None
            else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Audio must be floating point for the conv stack; this also covers
        # int16 / uint8 PCM, not only int32 / int64.
        if not torch.is_floating_point(input_values):
            input_values = input_values.float()

        # Ensure 2D input (batch_size, sequence_length)
        if input_values.dim() == 1:
            input_values = input_values.unsqueeze(0)
            if attention_mask is not None and attention_mask.dim() == 1:
                attention_mask = attention_mask.unsqueeze(0)

        # Conv encoder: (B, T) -> (B, T', conv_dim)
        extract_features = self.conv_encoder(input_values)

        # A (B, T_samples) mask cannot be broadcast against the (B, H, T', T')
        # attention scores, so reduce it to frame resolution first.
        is_sample_level_mask = (
            attention_mask is not None
            and attention_mask.dim() == 2
            and attention_mask.shape[0] == input_values.shape[0]
            and attention_mask.shape[-1] == input_values.shape[-1]
        )
        if is_sample_level_mask:
            attention_mask = self._frame_level_attention_mask(
                attention_mask.to(extract_features.device), extract_features.shape[1]
            )

        # Feature projection: (B, T', conv_dim) -> (B, T', hidden_size)
        hidden_states = self.feature_projection(extract_features)

        # Transformer encoder
        encoder_output, all_hidden_states = self.encoder(
            hidden_states,
            attention_mask=attention_mask,
            output_hidden_states=output_hidden_states,
        )

        # Final layer norm
        last_hidden_state = self.final_layer_norm(encoder_output)

        if not return_dict:
            outputs = (last_hidden_state,)
            if output_hidden_states:
                outputs = outputs + (all_hidden_states,)
            outputs = outputs + (extract_features,)
            return outputs

        return DistilledSpeechOutput(
            last_hidden_state=last_hidden_state,
            hidden_states=all_hidden_states,
            extract_features=extract_features,
        )