Spaces:
Running
on
Zero
Running
on
Zero
| import sys | |
| sys.path.append('/aifs4su/data/zheny/bigcodec_final/BigCodec_conv_transformer_vocos') | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from xcodec2.vq.residual_vq import ResidualVQ | |
| from xcodec2.vq.module import WNConv1d, DecoderBlock, ResLSTM | |
| from xcodec2.vq.alias_free_torch import * | |
| from xcodec2.vq import activations | |
| from typing import Optional | |
| from xcodec2.vq.module import ConvNeXtBlock, AdaLayerNorm | |
| from xcodec2.vq.bs_roformer5 import TransformerBlock | |
| # from rotary_embedding_torch import RotaryEmbedding | |
| from torchtune.modules import RotaryPositionalEmbeddings | |
| from vector_quantize_pytorch import ResidualFSQ | |
| from torch.nn import Module, ModuleList | |
| class ISTFT(nn.Module): | |
| """ | |
| Custom implementation of ISTFT since torch.istft doesn't allow custom padding (other than `center=True`) with | |
| windowing. This is because the NOLA (Nonzero Overlap Add) check fails at the edges. | |
| See issue: https://github.com/pytorch/pytorch/issues/62323 | |
| Specifically, in the context of neural vocoding we are interested in "same" padding analogous to CNNs. | |
| The NOLA constraint is met as we trim padded samples anyway. | |
| Args: | |
| n_fft (int): Size of Fourier transform. | |
| hop_length (int): The distance between neighboring sliding window frames. | |
| win_length (int): The size of window frame and STFT filter. | |
| padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same". | |
| """ | |
| def __init__(self, n_fft: int, hop_length: int, win_length: int, padding: str = "same"): | |
| super().__init__() | |
| if padding not in ["center", "same"]: | |
| raise ValueError("Padding must be 'center' or 'same'.") | |
| self.padding = padding | |
| self.n_fft = n_fft | |
| self.hop_length = hop_length | |
| self.win_length = win_length | |
| window = torch.hann_window(win_length) | |
| self.register_buffer("window", window) | |
| def forward(self, spec: torch.Tensor) -> torch.Tensor: | |
| """ | |
| Compute the Inverse Short Time Fourier Transform (ISTFT) of a complex spectrogram. | |
| Args: | |
| spec (Tensor): Input complex spectrogram of shape (B, N, T), where B is the batch size, | |
| N is the number of frequency bins, and T is the number of time frames. | |
| Returns: | |
| Tensor: Reconstructed time-domain signal of shape (B, L), where L is the length of the output signal. | |
| """ | |
| if self.padding == "center": | |
| # Fallback to pytorch native implementation | |
| return torch.istft(spec, self.n_fft, self.hop_length, self.win_length, self.window, center=True) | |
| elif self.padding == "same": | |
| pad = (self.win_length - self.hop_length) // 2 | |
| else: | |
| raise ValueError("Padding must be 'center' or 'same'.") | |
| assert spec.dim() == 3, "Expected a 3D tensor as input" | |
| B, N, T = spec.shape | |
| # Inverse FFT | |
| ifft = torch.fft.irfft(spec, self.n_fft, dim=1, norm="backward") | |
| ifft = ifft * self.window[None, :, None] | |
| # Overlap and Add | |
| output_size = (T - 1) * self.hop_length + self.win_length | |
| y = torch.nn.functional.fold( | |
| ifft, output_size=(1, output_size), kernel_size=(1, self.win_length), stride=(1, self.hop_length), | |
| )[:, 0, 0, pad:-pad] | |
| # Window envelope | |
| window_sq = self.window.square().expand(1, T, -1).transpose(1, 2) | |
| window_envelope = torch.nn.functional.fold( | |
| window_sq, output_size=(1, output_size), kernel_size=(1, self.win_length), stride=(1, self.hop_length), | |
| ).squeeze()[pad:-pad] | |
| # Normalize | |
| assert (window_envelope > 1e-11).all() | |
| y = y / window_envelope | |
| return y | |
| class FourierHead(nn.Module): | |
| """Base class for inverse fourier modules.""" | |
| def forward(self, x: torch.Tensor) -> torch.Tensor: | |
| """ | |
| Args: | |
| x (Tensor): Input tensor of shape (B, L, H), where B is the batch size, | |
| L is the sequence length, and H denotes the model dimension. | |
| Returns: | |
| Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal. | |
| """ | |
| raise NotImplementedError("Subclasses must implement the forward method.") | |
| class ISTFTHead(FourierHead): | |
| """ | |
| ISTFT Head module for predicting STFT complex coefficients. | |
| Args: | |
| dim (int): Hidden dimension of the model. | |
| n_fft (int): Size of Fourier transform. | |
| hop_length (int): The distance between neighboring sliding window frames, which should align with | |
| the resolution of the input features. | |
| padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same". | |
| """ | |
| def __init__(self, dim: int, n_fft: int, hop_length: int, padding: str = "same"): | |
| super().__init__() | |
| out_dim = n_fft + 2 | |
| self.out = torch.nn.Linear(dim, out_dim) | |
| self.istft = ISTFT(n_fft=n_fft, hop_length=hop_length, win_length=n_fft, padding=padding) | |
| def forward(self, x: torch.Tensor) -> torch.Tensor: | |
| """ | |
| Forward pass of the ISTFTHead module. | |
| Args: | |
| x (Tensor): Input tensor of shape (B, L, H), where B is the batch size, | |
| L is the sequence length, and H denotes the model dimension. | |
| Returns: | |
| Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal. | |
| """ | |
| x_pred = self.out(x ) | |
| # x_pred = x | |
| x_pred = x_pred.transpose(1, 2) | |
| mag, p = x_pred.chunk(2, dim=1) | |
| mag = torch.exp(mag) | |
| mag = torch.clip(mag, max=1e2) # safeguard to prevent excessively large magnitudes | |
| # wrapping happens here. These two lines produce real and imaginary value | |
| x = torch.cos(p) | |
| y = torch.sin(p) | |
| # recalculating phase here does not produce anything new | |
| # only costs time | |
| # phase = torch.atan2(y, x) | |
| # S = mag * torch.exp(phase * 1j) | |
| # better directly produce the complex value | |
| S = mag * (x + 1j * y) | |
| audio = self.istft(S) | |
| return audio.unsqueeze(1),x_pred | |
| def nonlinearity(x): | |
| # swish | |
| return x * torch.sigmoid(x) | |
| def Normalize(in_channels, num_groups=32): | |
| return torch.nn.GroupNorm(num_groups=num_groups, num_channels=in_channels, eps=1e-6, affine=True) | |
| class ResnetBlock(nn.Module): | |
| def __init__(self, *, in_channels, out_channels=None, conv_shortcut=False, | |
| dropout, temb_channels=512): | |
| super().__init__() | |
| self.in_channels = in_channels | |
| out_channels = in_channels if out_channels is None else out_channels | |
| self.out_channels = out_channels | |
| self.use_conv_shortcut = conv_shortcut | |
| self.norm1 = Normalize(in_channels) | |
| self.conv1 = torch.nn.Conv1d(in_channels, | |
| out_channels, | |
| kernel_size=3, | |
| stride=1, | |
| padding=1) | |
| if temb_channels > 0: | |
| self.temb_proj = torch.nn.Linear(temb_channels, | |
| out_channels) | |
| self.norm2 = Normalize(out_channels) | |
| self.dropout = torch.nn.Dropout(dropout) | |
| self.conv2 = torch.nn.Conv1d(out_channels, | |
| out_channels, | |
| kernel_size=3, | |
| stride=1, | |
| padding=1) | |
| if self.in_channels != self.out_channels: | |
| if self.use_conv_shortcut: | |
| self.conv_shortcut = torch.nn.Conv1d(in_channels, | |
| out_channels, | |
| kernel_size=3, | |
| stride=1, | |
| padding=1) | |
| else: | |
| self.nin_shortcut = torch.nn.Conv1d(in_channels, | |
| out_channels, | |
| kernel_size=1, | |
| stride=1, | |
| padding=0) | |
| def forward(self, x, temb=None): | |
| h = x | |
| h = self.norm1(h) | |
| h = nonlinearity(h) | |
| h = self.conv1(h) | |
| if temb is not None: | |
| h = h + self.temb_proj(nonlinearity(temb))[:, :, None, None] | |
| h = self.norm2(h) | |
| h = nonlinearity(h) | |
| h = self.dropout(h) | |
| h = self.conv2(h) | |
| if self.in_channels != self.out_channels: | |
| if self.use_conv_shortcut: | |
| x = self.conv_shortcut(x) | |
| else: | |
| x = self.nin_shortcut(x) | |
| return x + h | |
| class AttnBlock(nn.Module): | |
| def __init__(self, in_channels): | |
| super().__init__() | |
| self.in_channels = in_channels | |
| self.norm = Normalize(in_channels) | |
| self.q = torch.nn.Conv1d(in_channels, | |
| in_channels, | |
| kernel_size=1, | |
| stride=1, | |
| padding=0) | |
| self.k = torch.nn.Conv1d(in_channels, | |
| in_channels, | |
| kernel_size=1, | |
| stride=1, | |
| padding=0) | |
| self.v = torch.nn.Conv1d(in_channels, | |
| in_channels, | |
| kernel_size=1, | |
| stride=1, | |
| padding=0) | |
| self.proj_out = torch.nn.Conv1d(in_channels, | |
| in_channels, | |
| kernel_size=1, | |
| stride=1, | |
| padding=0) | |
| def forward(self, x): | |
| h_ = x | |
| h_ = self.norm(h_) | |
| q = self.q(h_) | |
| k = self.k(h_) | |
| v = self.v(h_) | |
| # compute attention | |
| b, c, h = q.shape | |
| q = q.permute(0, 2, 1) # b,hw,c | |
| w_ = torch.bmm(q, k) # b,hw,hw w[b,i,j]=sum_c q[b,i,c]k[b,c,j] | |
| w_ = w_ * (int(c) ** (-0.5)) | |
| w_ = torch.nn.functional.softmax(w_, dim=2) | |
| # attend to values | |
| w_ = w_.permute(0, 2, 1) # b,hw,hw (first hw of k, second of q) | |
| h_ = torch.bmm(v, w_) # b, c,hw (hw of q) h_[b,c,j] = sum_i v[b,c,i] w_[b,i,j] | |
| h_ = self.proj_out(h_) | |
| return x + h_ | |
| def make_attn(in_channels, attn_type="vanilla"): | |
| assert attn_type in ["vanilla", "linear", "none"], f'attn_type {attn_type} unknown' | |
| print(f"making attention of type '{attn_type}' with {in_channels} in_channels") | |
| if attn_type == "vanilla": | |
| return AttnBlock(in_channels) | |
| class Backbone(nn.Module): | |
| """Base class for the generator's backbone. It preserves the same temporal resolution across all layers.""" | |
| def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor: | |
| """ | |
| Args: | |
| x (Tensor): Input tensor of shape (B, C, L), where B is the batch size, | |
| C denotes output features, and L is the sequence length. | |
| Returns: | |
| Tensor: Output of shape (B, L, H), where B is the batch size, L is the sequence length, | |
| and H denotes the model dimension. | |
| """ | |
| raise NotImplementedError("Subclasses must implement the forward method.") | |
| class VocosBackbone(Backbone): | |
| """ | |
| Vocos backbone module built with ConvNeXt blocks. Supports additional conditioning with Adaptive Layer Normalization | |
| Args: | |
| input_channels (int): Number of input features channels. | |
| dim (int): Hidden dimension of the model. | |
| intermediate_dim (int): Intermediate dimension used in ConvNeXtBlock. | |
| num_layers (int): Number of ConvNeXtBlock layers. | |
| layer_scale_init_value (float, optional): Initial value for layer scaling. Defaults to `1 / num_layers`. | |
| adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm. | |
| None means non-conditional model. Defaults to None. | |
| """ | |
| def __init__( | |
| self, hidden_dim=1024,depth=12,heads=16,pos_meb_dim=64): | |
| super().__init__() | |
| self.embed = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=7, padding=3) | |
| self.temb_ch = 0 | |
| block_in = hidden_dim | |
| dropout = 0.1 | |
| prior_net : tp.List[nn.Module] = [ | |
| ResnetBlock(in_channels=block_in,out_channels=block_in, | |
| temb_channels=self.temb_ch,dropout=dropout), | |
| ResnetBlock(in_channels=block_in,out_channels=block_in, | |
| temb_channels=self.temb_ch,dropout=dropout), | |
| ] | |
| self.prior_net = nn.Sequential(*prior_net) | |
| depth = depth | |
| time_rotary_embed = RotaryPositionalEmbeddings(dim=pos_meb_dim) | |
| transformer_blocks = [ | |
| TransformerBlock(dim=hidden_dim, n_heads=heads, rotary_embed=time_rotary_embed) | |
| for _ in range(depth) | |
| ] | |
| self.transformers = nn.Sequential(*transformer_blocks) | |
| self.final_layer_norm = nn.LayerNorm(hidden_dim, eps=1e-6) | |
| post_net : tp.List[nn.Module] = [ | |
| ResnetBlock(in_channels=block_in,out_channels=block_in, | |
| temb_channels=self.temb_ch,dropout=dropout), | |
| ResnetBlock(in_channels=block_in,out_channels=block_in, | |
| temb_channels=self.temb_ch,dropout=dropout), | |
| ] | |
| self.post_net = nn.Sequential(*post_net) | |
| def forward(self, x: torch.Tensor ) -> torch.Tensor: | |
| x = x.transpose(1, 2) | |
| x = self.embed(x) | |
| x = self.prior_net(x) | |
| x = x.transpose(1, 2) | |
| x= self.transformers(x) | |
| x = x.transpose(1, 2) | |
| x = self.post_net(x) | |
| x = x.transpose(1, 2) | |
| x = self.final_layer_norm(x) | |
| return x | |
| def init_weights(m): | |
| if isinstance(m, nn.Conv1d): | |
| nn.init.trunc_normal_(m.weight, std=0.02) | |
| nn.init.constant_(m.bias, 0) | |
| class CodecDecoderVocos(nn.Module): | |
| def __init__(self, | |
| hidden_dim=1024, | |
| depth=12, | |
| heads=16, | |
| pos_meb_dim=64, | |
| hop_length=320, | |
| vq_num_quantizers=1, | |
| vq_dim=2048, #1024 2048 | |
| vq_commit_weight=0.25, | |
| vq_weight_init=False, | |
| vq_full_commit_loss=False, | |
| codebook_size=16384, | |
| codebook_dim=16, | |
| ): | |
| super().__init__() | |
| self.hop_length = hop_length | |
| self.quantizer = ResidualFSQ( | |
| dim = vq_dim, | |
| levels = [4, 4, 4, 4, 4,4,4,4], | |
| num_quantizers = 1 | |
| ) | |
| # self.quantizer = ResidualVQ( | |
| # num_quantizers=vq_num_quantizers, | |
| # dim=vq_dim, | |
| # codebook_size=codebook_size, | |
| # codebook_dim=codebook_dim, | |
| # threshold_ema_dead_code=2, | |
| # commitment=vq_commit_weight, | |
| # weight_init=vq_weight_init, | |
| # full_commit_loss=vq_full_commit_loss, | |
| # ) | |
| self.backbone = VocosBackbone( hidden_dim=hidden_dim,depth=depth,heads=heads,pos_meb_dim=pos_meb_dim) | |
| self.head = ISTFTHead(dim=hidden_dim, n_fft=self.hop_length*4, hop_length=self.hop_length, padding="same") | |
| self.reset_parameters() | |
| def forward(self, x, vq=True): | |
| if vq is True: | |
| # x, q, commit_loss = self.quantizer(x) | |
| x = x.permute(0, 2, 1) | |
| x, q = self.quantizer(x) | |
| x = x.permute(0, 2, 1) | |
| q = q.permute(0, 2, 1) | |
| return x, q, None | |
| x = self.backbone(x) | |
| x,_ = self.head(x) | |
| return x ,_ | |
| def vq2emb(self, vq): | |
| self.quantizer = self.quantizer.eval() | |
| x = self.quantizer.vq2emb(vq) | |
| return x | |
| def get_emb(self): | |
| self.quantizer = self.quantizer.eval() | |
| embs = self.quantizer.get_emb() | |
| return embs | |
| def inference_vq(self, vq): | |
| x = vq[None,:,:] | |
| x = self.model(x) | |
| return x | |
| def inference_0(self, x): | |
| x, q, loss, perp = self.quantizer(x) | |
| x = self.model(x) | |
| return x, None | |
| def inference(self, x): | |
| x = self.model(x) | |
| return x, None | |
| def remove_weight_norm(self): | |
| """Remove weight normalization module from all of the layers.""" | |
| def _remove_weight_norm(m): | |
| try: | |
| torch.nn.utils.remove_weight_norm(m) | |
| except ValueError: # this module didn't have weight norm | |
| return | |
| self.apply(_remove_weight_norm) | |
| def apply_weight_norm(self): | |
| """Apply weight normalization module from all of the layers.""" | |
| def _apply_weight_norm(m): | |
| if isinstance(m, nn.Conv1d) or isinstance(m, nn.ConvTranspose1d): | |
| torch.nn.utils.weight_norm(m) | |
| self.apply(_apply_weight_norm) | |
| def reset_parameters(self): | |
| self.apply(init_weights) | |
| class CodecDecoderVocos_transpose(nn.Module): | |
| def __init__(self, | |
| hidden_dim=1024, | |
| depth=12, | |
| heads=16, | |
| pos_meb_dim=64, | |
| hop_length=320, | |
| vq_num_quantizers=1, | |
| vq_dim=1024, #1024 2048 | |
| vq_commit_weight=0.25, | |
| vq_weight_init=False, | |
| vq_full_commit_loss=False, | |
| codebook_size=16384, | |
| codebook_dim=16, | |
| ): | |
| super().__init__() | |
| self.hop_length = hop_length | |
| self.quantizer = ResidualVQ( | |
| num_quantizers=vq_num_quantizers, | |
| dim=vq_dim, | |
| codebook_size=codebook_size, | |
| codebook_dim=codebook_dim, | |
| threshold_ema_dead_code=2, | |
| commitment=vq_commit_weight, | |
| weight_init=vq_weight_init, | |
| full_commit_loss=vq_full_commit_loss, | |
| ) | |
| self.backbone = VocosBackbone( hidden_dim=hidden_dim,depth=depth,heads=heads,pos_meb_dim=pos_meb_dim) | |
| self.inverse_mel_conv = nn.Sequential( | |
| nn.GELU(), | |
| nn.ConvTranspose1d( | |
| in_channels=hidden_dim, | |
| out_channels=hidden_dim, | |
| kernel_size=3, | |
| stride=2, | |
| padding=1, | |
| output_padding=1 # 确保输出长度与编码前匹配 | |
| ), | |
| nn.GELU(), | |
| nn.ConvTranspose1d( | |
| in_channels=hidden_dim, | |
| out_channels=hidden_dim, | |
| kernel_size=3, | |
| padding=1 | |
| ) | |
| ) | |
| self.head = ISTFTHead(dim=hidden_dim, n_fft=self.hop_length*4, hop_length=self.hop_length, padding="same") | |
| self.reset_parameters() | |
| def forward(self, x, vq=True): | |
| if vq is True: | |
| x, q, commit_loss = self.quantizer(x) | |
| return x, q, commit_loss | |
| x = self.backbone(x) | |
| x,_ = self.head(x) | |
| return x ,_ | |
| def vq2emb(self, vq): | |
| self.quantizer = self.quantizer.eval() | |
| x = self.quantizer.vq2emb(vq) | |
| return x | |
| def get_emb(self): | |
| self.quantizer = self.quantizer.eval() | |
| embs = self.quantizer.get_emb() | |
| return embs | |
| def inference_vq(self, vq): | |
| x = vq[None,:,:] | |
| x = self.model(x) | |
| return x | |
| def inference_0(self, x): | |
| x, q, loss, perp = self.quantizer(x) | |
| x = self.model(x) | |
| return x, None | |
| def inference(self, x): | |
| x = self.model(x) | |
| return x, None | |
| def remove_weight_norm(self): | |
| """Remove weight normalization module from all of the layers.""" | |
| def _remove_weight_norm(m): | |
| try: | |
| torch.nn.utils.remove_weight_norm(m) | |
| except ValueError: # this module didn't have weight norm | |
| return | |
| self.apply(_remove_weight_norm) | |
| def apply_weight_norm(self): | |
| """Apply weight normalization module from all of the layers.""" | |
| def _apply_weight_norm(m): | |
| if isinstance(m, nn.Conv1d) or isinstance(m, nn.ConvTranspose1d): | |
| torch.nn.utils.weight_norm(m) | |
| self.apply(_apply_weight_norm) | |
| def reset_parameters(self): | |
| self.apply(init_weights) | |
| def main(): | |
| # 设置设备 | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f"Using device: {device}") | |
| # 初始化模型 | |
| model = CodecDecoderVocos_transpose().to(device) | |
| print("Model initialized.") | |
| # 创建测试输入: batch_size x in_channels x sequence_length | |
| batch_size = 2 | |
| in_channels = 1024 | |
| sequence_length = 50 # 示例长度,可以根据需要调整 | |
| dummy_input = torch.randn(batch_size, in_channels, sequence_length).to(device) | |
| print(f"Dummy input shape: {dummy_input.shape}") | |
| # 将模型设为评估模式 | |
| model.eval() | |
| # 前向传播(使用 VQ) | |
| # with torch.no_grad(): | |
| # try: | |
| # output, q, commit_loss = model(dummy_input, vq=True) | |
| # print("Forward pass with VQ:") | |
| # print(f"Output shape: {output.shape}") | |
| # print(f"Quantized codes shape: {q.shape}") | |
| # print(f"Commitment loss: {commit_loss}") | |
| # except Exception as e: | |
| # print(f"Error during forward pass with VQ: {e}") | |
| # 前向传播(不使用 VQ) | |
| with torch.no_grad(): | |
| # try: | |
| output_no_vq = model(dummy_input, vq=False) | |
| print("\nForward pass without VQ:") | |
| print(f"Output shape: {output_no_vq.shape}") | |
| c=1 | |
| # except Exception as e: | |
| # print(f"Error during forward pass without VQ: {e}") | |
| # model_size_bytes = sum(p.numel() * p.element_size() for p in model.parameters()) | |
| # model_size_mb = model_size_bytes / (1024 ** 2) | |
| # print(f"Model size: {model_size_bytes} bytes ({model_size_mb:.2f} MB)") | |
| if __name__ == "__main__": | |
| main() |