"""Residual vector-quantization (RVQ) codec modules: an EMA vector quantizer,
residual VQ, and the convolutional encoder/decoder that make up VevoRepCodec."""

import torch
import torch.nn as nn
import torch.nn.functional as F


class VectorQuantize(nn.Module):
    """Vector quantization with exponential moving average (EMA) codebook updates."""

    def __init__(
        self,
        dim: int,
        codebook_size: int,
        decay=0.8,
        commitment=1.0,
        eps=1e-5,
        n_embed=None,
    ):
        super().__init__()
        n_embed = self.default(n_embed, codebook_size)

        self.dim = dim
        self.n_embed = n_embed
        self.decay = decay
        self.eps = eps
        self.commitment = commitment

        # The codebook is stored as (dim, n_embed) and updated via EMA rather
        # than gradient descent, so all three tensors are buffers.
        embed = torch.randn(dim, n_embed)
        self.register_buffer("embed", embed)
        self.register_buffer("cluster_size", torch.zeros(n_embed))
        self.register_buffer("embed_avg", embed.clone())

    @property
    def codebook(self):
        # (n_embed, dim) view of the codebook, as expected by F.embedding.
        return self.embed.transpose(0, 1)

    def exists(self, val):
        return val is not None

    def default(self, val, d):
        return val if self.exists(val) else d

    def ema_inplace(self, moving_avg, new, decay):
        moving_avg.data.mul_(decay).add_(new, alpha=(1 - decay))

    def laplace_smoothing(self, x, n_categories, eps=1e-5):
        # Avoids division by zero for codes with (near-)empty clusters.
        return (x + eps) / (x.sum() + n_categories * eps)

    def forward(self, input):
        dtype = input.dtype
        flatten = input.reshape(-1, self.dim)
        # Squared Euclidean distance to every code:
        # ||x||^2 - 2 x.e + ||e||^2, computed without materializing differences.
        dist = (
            flatten.pow(2).sum(1, keepdim=True)
            - 2 * flatten @ self.embed
            + self.embed.pow(2).sum(0, keepdim=True)
        )
        _, embed_ind = (-dist).max(1)
        embed_onehot = F.one_hot(embed_ind, self.n_embed).type(dtype)
        embed_ind = embed_ind.view(*input.shape[:-1])
        quantize = F.embedding(embed_ind, self.embed.transpose(0, 1))

        if self.training:
            # EMA update of cluster sizes and per-code feature sums, followed
            # by Laplace smoothing (the standard EMA-VQ codebook update).
            self.ema_inplace(self.cluster_size, embed_onehot.sum(0), self.decay)
            embed_sum = flatten.transpose(0, 1) @ embed_onehot
            self.ema_inplace(self.embed_avg, embed_sum, self.decay)
            cluster_size = (
                self.laplace_smoothing(self.cluster_size, self.n_embed, self.eps)
                * self.cluster_size.sum()
            )
            embed_normalized = self.embed_avg / cluster_size.unsqueeze(0)
            self.embed.data.copy_(embed_normalized)

        # Commitment loss pulls the encoder output toward its assigned code;
        # the straight-through estimator passes gradients through quantization.
        loss = F.mse_loss(quantize.detach(), input) * self.commitment
        quantize = input + (quantize - input).detach()

        # Perplexity measures effective codebook usage (maximum = n_embed).
        avg_probs = torch.mean(embed_onehot, dim=0)
        perplexity = torch.exp(-torch.sum(avg_probs * torch.log(avg_probs + 1e-10)))

        return quantize, loss, perplexity

    def forward_index(self, input):
        """Quantize and also return codebook indices (no EMA update, no loss)."""
        flatten = input.reshape(-1, self.dim)
        dist = (
            flatten.pow(2).sum(1, keepdim=True)
            - 2 * flatten @ self.embed
            + self.embed.pow(2).sum(0, keepdim=True)
        )
        _, embed_ind = (-dist).max(1)
        embed_ind = embed_ind.view(*input.shape[:-1])
        quantize = F.embedding(embed_ind, self.embed.transpose(0, 1))
        quantize = input + (quantize - input).detach()

        return quantize, embed_ind
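

# A minimal usage sketch (illustration only; this helper is not part of the
# original module): quantize a batch of 768-dim frames and inspect the outputs.
def _demo_vector_quantize():
    vq = VectorQuantize(dim=768, codebook_size=1024)
    x = torch.randn(2, 50, 768)  # (batch, frames, feature dim)
    quantized, commit_loss, perplexity = vq(x)
    assert quantized.shape == x.shape  # quantization preserves the shape
    return quantized, commit_loss, perplexity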


class ResidualVQ(nn.Module):
    """Residual VQ following Algorithm 1 in https://arxiv.org/pdf/2107.03312.pdf."""

    def __init__(self, *, num_quantizers, **kwargs):
        super().__init__()
        self.layers = nn.ModuleList(
            [VectorQuantize(**kwargs) for _ in range(num_quantizers)]
        )

    def forward(self, x):
        quantized_out = 0.0
        residual = x
        all_losses = []
        all_perplexities = []
        for layer in self.layers:
            quantized, loss, perplexity = layer(residual)
            # Each stage quantizes the residual left by the previous stages;
            # the final output is the sum of all stage outputs.
            residual = residual - quantized
            quantized_out = quantized_out + quantized
            all_losses.append(loss)
            all_perplexities.append(perplexity)
        all_losses, all_perplexities = map(torch.stack, (all_losses, all_perplexities))
        return quantized_out, all_losses, all_perplexities

    def forward_index(self, x, flatten_idx=False):
        """Quantize and return stage-wise indices.

        all_indices: [num_of_quantizers, B, T]
        """
        quantized_out = 0.0
        residual = x
        all_indices = []
        for i, layer in enumerate(self.layers):
            quantized, indices = layer.forward_index(residual)
            residual = residual - quantized
            quantized_out = quantized_out + quantized
            if flatten_idx:
                # Offset each stage's indices into the shared table built by
                # initial(), so one flat index space covers all quantizers.
                indices += self.codebook_size * i
            all_indices.append(indices)
        all_indices = torch.stack(all_indices)
        return quantized_out, all_indices

    def initial(self):
        """Cache all codebooks as one flat (num_quantizers * codebook_size, dim)
        table; must be called before forward_index(flatten_idx=True) or lookup()."""
        self.codebook = []
        for layer in self.layers:
            self.codebook.append(layer.codebook)
        self.codebook_size = self.codebook[0].size(0)
        self.codebook = torch.stack(self.codebook)
        self.codebook = self.codebook.reshape(-1, self.codebook.size(-1))

    def lookup(self, indices):
        """Decode flat indices of shape (num_quantizers, B, T) back to features
        by summing the per-stage code vectors."""
        quantized_out = F.embedding(indices, self.codebook)  # (num_q, B, T, dim)
        return torch.sum(quantized_out, dim=0, keepdim=True)
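

# A minimal round-trip sketch for ResidualVQ (illustration only; this helper
# is not part of the original code). Shapes assume (B, T, dim) inputs.
def _demo_residual_vq():
    rvq = ResidualVQ(num_quantizers=2, dim=768, codebook_size=1024)
    x = torch.randn(2, 50, 768)
    quantized, losses, perplexities = rvq(x)  # losses: (num_quantizers,)
    rvq.initial()  # build the flat codebook required below
    _, indices = rvq.forward_index(x, flatten_idx=True)  # (num_q, B, T)
    recon = rvq.lookup(indices)  # (1, B, T, dim), summed over quantizers
    return quantized, recon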


class Quantizer(nn.Module):
    """Residual-VQ wrapper operating on (B, C, T) feature maps."""

    def __init__(
        self,
        code_dim: int,
        codebook_num: int,
        codebook_size: int,
    ):
        super().__init__()
        self.codebook = ResidualVQ(
            dim=code_dim, num_quantizers=codebook_num, codebook_size=codebook_size
        )

    def initial(self):
        self.codebook.initial()

    def forward(self, z):
        # ResidualVQ expects (B, T, C); transpose in and out.
        zq, vqloss, perplexity = self.codebook(z.transpose(2, 1))
        zq = zq.transpose(2, 1)
        return zq, vqloss, perplexity

    def inference(self, z):
        zq, indices = self.codebook.forward_index(z.transpose(2, 1))
        zq = zq.transpose(2, 1)
        return zq, indices

    def encode(self, z):
        # Note: zq is returned in (B, T, C) layout here, without the transpose
        # back that forward() and inference() apply.
        zq, indices = self.codebook.forward_index(z.transpose(2, 1), flatten_idx=True)
        return zq, indices

    def decode(self, indices):
        z = self.codebook.lookup(indices)
        return z
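

# A minimal encode/decode sketch for Quantizer (illustration only; this helper
# is not part of the original code). initial() must run before encode()/decode().
def _demo_quantizer():
    q = Quantizer(code_dim=768, codebook_num=2, codebook_size=1024)
    q.initial()
    z = torch.randn(2, 768, 50)  # (batch, code_dim, frames)
    zq, indices = q.encode(z)  # zq: (B, T, C); indices: (num_q, B, T)
    z_hat = q.decode(indices)  # (1, B, T, C), code vectors summed over stages
    return zq, z_hat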


class Conv1d1x1(nn.Conv1d):
    """1x1 Conv1d."""

    def __init__(self, in_channels, out_channels, bias=True):
        super().__init__(in_channels, out_channels, kernel_size=1, bias=bias)


class Conv1d(nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int = 1,
        padding: int = -1,
        dilation: int = 1,
        groups: int = 1,
        bias: bool = True,
    ):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        if padding < 0:
            # Default to "same" padding (for odd kernels at stride 1).
            padding = (kernel_size - 1) // 2 * dilation
        self.dilation = dilation
        self.conv = nn.Conv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=groups,
            bias=bias,
        )

    def forward(self, x):
        """
        Args:
            x (Tensor): Float tensor variable with the shape (B, C, T).
        Returns:
            Tensor: Float tensor variable with the shape (B, C, T).
        """
        x = self.conv(x)
        return x


class ConvTranspose1d(nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int,
        padding=-1,
        output_padding=-1,
        groups=1,
        bias=True,
    ):
        super().__init__()
        # Defaults chosen so that, with kernel_size = 2 * stride (as used by
        # DecoderBlock), the output length is exactly stride * T.
        if padding < 0:
            padding = (stride + 1) // 2
        if output_padding < 0:
            output_padding = 1 if stride % 2 else 0
        self.deconv = nn.ConvTranspose1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            output_padding=output_padding,
            groups=groups,
            bias=bias,
        )

    def forward(self, x):
        """
        Args:
            x (Tensor): Float tensor variable with the shape (B, C, T).
        Returns:
            Tensor: Float tensor variable with the shape (B, C', T').
        """
        x = self.deconv(x)
        return x
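

# A small shape check (illustration only; not part of the original code):
# with the default padding/output_padding and kernel_size = 2 * stride,
# ConvTranspose1d upsamples by exactly the stride factor.
def _demo_conv_transpose_upsampling():
    up = ConvTranspose1d(in_channels=4, out_channels=4, kernel_size=6, stride=3)
    y = up(torch.randn(1, 4, 10))
    assert y.shape[-1] == 30  # (10 - 1) * 3 - 2 * 2 + 6 + 1 = 30
    return y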


class ResidualUnit(nn.Module):
    """Pre-activation residual unit: activation -> conv -> activation -> 1x1 conv."""

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size=3,
        dilation=1,
        bias=False,
        nonlinear_activation="ELU",
        nonlinear_activation_params={},
    ):
        super().__init__()
        self.activation = getattr(nn, nonlinear_activation)(
            **nonlinear_activation_params
        )
        self.conv1 = Conv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=1,
            dilation=dilation,
            bias=bias,
        )
        self.conv2 = Conv1d1x1(out_channels, out_channels, bias)

    def forward(self, x):
        # The skip connection requires in_channels == out_channels.
        y = self.conv1(self.activation(x))
        y = self.conv2(self.activation(y))
        return x + y


class Projector(nn.Module):
    """Projects encoder output channels to the code dimension."""

    def __init__(
        self, input_channels: int, code_dim: int, kernel_size=3, stride=1, bias=False
    ):
        super().__init__()
        self.project = Conv1d(
            input_channels, code_dim, kernel_size=kernel_size, stride=stride, bias=bias
        )

    def forward(self, x):
        return self.project(x)


class EncoderBlock(nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        stride: int,
        dilations=(1, 1),
        unit_kernel_size=3,
        bias=True,
    ):
        super().__init__()
        self.res_units = torch.nn.ModuleList()
        for dilation in dilations:
            self.res_units += [
                ResidualUnit(
                    in_channels,
                    in_channels,
                    kernel_size=unit_kernel_size,
                    dilation=dilation,
                )
            ]
        self.num_res = len(self.res_units)

        # Down-sampling conv; kernel 2 * stride keeps full coverage when striding.
        self.conv = Conv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=3 if stride == 1 else 2 * stride,
            stride=stride,
            bias=bias,
        )

    def forward(self, x):
        for idx in range(self.num_res):
            x = self.res_units[idx](x)
        x = self.conv(x)
        return x


class Encoder(nn.Module):
    def __init__(
        self,
        input_channels: int,
        encode_channels: int,
        channel_ratios=(1, 1),
        strides=(1, 1),
        kernel_size=3,
        bias=True,
        block_dilations=(1, 1),
        unit_kernel_size=3,
    ):
        super().__init__()
        assert len(channel_ratios) == len(strides)

        self.conv = Conv1d(
            in_channels=input_channels,
            out_channels=encode_channels,
            kernel_size=kernel_size,
            stride=1,
            bias=False,
        )
        self.conv_blocks = torch.nn.ModuleList()
        in_channels = encode_channels
        for idx, stride in enumerate(strides):
            out_channels = int(encode_channels * channel_ratios[idx])
            self.conv_blocks += [
                EncoderBlock(
                    in_channels,
                    out_channels,
                    stride,
                    dilations=block_dilations,
                    unit_kernel_size=unit_kernel_size,
                    bias=bias,
                )
            ]
            in_channels = out_channels
        self.num_blocks = len(self.conv_blocks)
        self.out_channels = out_channels

    def forward(self, x):
        x = self.conv(x)
        for i in range(self.num_blocks):
            x = self.conv_blocks[i](x)
        return x


class DecoderBlock(nn.Module):
    """Decoder block; up-samples by `stride` via transposed convolution
    (a plain convolution is used when stride == 1)."""

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        stride: int,
        dilations=(1, 1),
        unit_kernel_size=3,
        bias=True,
    ):
        super().__init__()

        if stride == 1:
            self.conv = Conv1d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=3,
                stride=stride,
                bias=bias,
            )
        else:
            self.conv = ConvTranspose1d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=2 * stride,
                stride=stride,
                bias=bias,
            )

        self.res_units = torch.nn.ModuleList()
        for dilation in dilations:
            self.res_units += [
                ResidualUnit(
                    out_channels,
                    out_channels,
                    kernel_size=unit_kernel_size,
                    dilation=dilation,
                )
            ]
        self.num_res = len(self.res_units)

    def forward(self, x):
        x = self.conv(x)
        for idx in range(self.num_res):
            x = self.res_units[idx](x)
        return x


class Decoder(nn.Module):
    def __init__(
        self,
        code_dim: int,
        output_channels: int,
        decode_channels: int,
        channel_ratios=(1, 1),
        strides=(1, 1),
        kernel_size=3,
        bias=True,
        block_dilations=(1, 1),
        unit_kernel_size=3,
    ):
        super().__init__()
        assert len(channel_ratios) == len(strides)

        self.conv1 = Conv1d(
            in_channels=code_dim,
            out_channels=int(decode_channels * channel_ratios[0]),
            kernel_size=kernel_size,
            stride=1,
            bias=False,
        )

        self.conv_blocks = torch.nn.ModuleList()
        for idx, stride in enumerate(strides):
            in_channels = int(decode_channels * channel_ratios[idx])
            if idx < (len(channel_ratios) - 1):
                out_channels = int(decode_channels * channel_ratios[idx + 1])
            else:
                out_channels = decode_channels
            self.conv_blocks += [
                DecoderBlock(
                    in_channels,
                    out_channels,
                    stride,
                    dilations=block_dilations,
                    unit_kernel_size=unit_kernel_size,
                    bias=bias,
                )
            ]
        self.num_blocks = len(self.conv_blocks)

        self.conv2 = Conv1d(out_channels, output_channels, kernel_size, 1, bias=False)

    def forward(self, z):
        x = self.conv1(z)
        for i in range(self.num_blocks):
            x = self.conv_blocks[i](x)
        x = self.conv2(x)
        return x


class VevoRepCodec(nn.Module):
    """Residual-VQ autoencoder over frame-level representations:
    Encoder -> Projector -> Quantizer -> Decoder, all on (B, C, T) tensors."""

    def __init__(
        self,
        input_channels=768,
        output_channels=768,
        encode_channels=768,
        decode_channels=768,
        code_dim=768,
        codebook_num=1,
        codebook_size=1024,
        bias=True,
        enc_ratios=(1, 1),
        dec_ratios=(1, 1),
        enc_strides=(1, 1),
        dec_strides=(1, 1),
        enc_kernel_size=3,
        dec_kernel_size=3,
        enc_block_dilations=(1, 1),
        enc_block_kernel_size=3,
        dec_block_dilations=(1, 1),
        dec_block_kernel_size=3,
    ):
        super().__init__()

        self.input_channels = input_channels

        self.encoder = Encoder(
            input_channels=input_channels,
            encode_channels=encode_channels,
            channel_ratios=enc_ratios,
            strides=enc_strides,
            kernel_size=enc_kernel_size,
            bias=bias,
            block_dilations=enc_block_dilations,
            unit_kernel_size=enc_block_kernel_size,
        )

        self.decoder = Decoder(
            code_dim=code_dim,
            output_channels=output_channels,
            decode_channels=decode_channels,
            channel_ratios=dec_ratios,
            strides=dec_strides,
            kernel_size=dec_kernel_size,
            bias=bias,
            block_dilations=dec_block_dilations,
            unit_kernel_size=dec_block_kernel_size,
        )

        self.projector = Projector(
            input_channels=self.encoder.out_channels,
            code_dim=code_dim,
            kernel_size=3,
            stride=1,
            bias=False,
        )

        self.quantizer = Quantizer(
            code_dim=code_dim, codebook_num=codebook_num, codebook_size=codebook_size
        )

    def forward(self, x):
        x = self.encoder(x)
        z = self.projector(x)
        zq, vqloss, perplexity = self.quantizer(z)
        y = self.decoder(zq)
        return y, zq, z, vqloss, perplexity
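

# A minimal smoke test (illustration only; not part of the original code):
# run a random (B, C, T) feature batch through the full codec and check shapes.
if __name__ == "__main__":
    codec = VevoRepCodec()  # defaults: 768-dim features, 1 codebook of 1024 codes
    x = torch.randn(2, 768, 50)
    y, zq, z, vqloss, perplexity = codec(x)
    assert y.shape == x.shape  # default strides of 1 preserve the frame rate
    print(y.shape, zq.shape, vqloss, perplexity)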