"""
RVC v2 synthesizer model definitions.
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
import numpy as np


class LayerNorm(nn.Module):
    """Layer normalization for channels-first tensors"""

    def __init__(self, channels: int, eps: float = 1e-5):
        super().__init__()
        self.channels = channels
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(channels))
        self.beta = nn.Parameter(torch.zeros(channels))

    def forward(self, x):
        x = x.transpose(1, -1)
        x = F.layer_norm(x, (self.channels,), self.gamma, self.beta, self.eps)
        return x.transpose(1, -1)


class MultiHeadAttention(nn.Module):
    """Multi-head attention module"""

    def __init__(self, channels: int, out_channels: int, n_heads: int,
                 p_dropout: float = 0.0, window_size: Optional[int] = None,
                 heads_share: bool = True, block_length: Optional[int] = None,
                 proximal_bias: bool = False, proximal_init: bool = False):
        super().__init__()
        assert channels % n_heads == 0

        self.channels = channels
        self.out_channels = out_channels
        self.n_heads = n_heads
        self.p_dropout = p_dropout
        self.window_size = window_size
        self.heads_share = heads_share
        self.block_length = block_length
        self.proximal_bias = proximal_bias
        self.proximal_init = proximal_init
        self.attn = None

        self.k_channels = channels // n_heads
        self.conv_q = nn.Conv1d(channels, channels, 1)
        self.conv_k = nn.Conv1d(channels, channels, 1)
        self.conv_v = nn.Conv1d(channels, channels, 1)
        self.conv_o = nn.Conv1d(channels, out_channels, 1)
        self.drop = nn.Dropout(p_dropout)

        if window_size is not None:
            n_heads_rel = 1 if heads_share else n_heads
            rel_stddev = self.k_channels ** -0.5
            self.emb_rel_k = nn.Parameter(
                torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels) * rel_stddev
            )
            self.emb_rel_v = nn.Parameter(
                torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels) * rel_stddev
            )

        nn.init.xavier_uniform_(self.conv_q.weight)
        nn.init.xavier_uniform_(self.conv_k.weight)
        nn.init.xavier_uniform_(self.conv_v.weight)
        if proximal_init:
            with torch.no_grad():
                self.conv_k.weight.copy_(self.conv_q.weight)
                self.conv_k.bias.copy_(self.conv_q.bias)

    def forward(self, x, c, attn_mask=None):
        q = self.conv_q(x)
        k = self.conv_k(c)
        v = self.conv_v(c)

        x, self.attn = self.attention(q, k, v, mask=attn_mask)

        x = self.conv_o(x)
        return x

    def attention(self, query, key, value, mask=None):
        b, d, t_s = key.size()
        t_t = query.size(2)

        query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3)
        key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
        value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)

        scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1))

        if self.window_size is not None:
            assert t_s == t_t, "Relative attention only for self-attention"
            key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s)
            rel_logits = self._matmul_with_relative_keys(query / math.sqrt(self.k_channels), key_relative_embeddings)
            scores_local = self._relative_position_to_absolute_position(rel_logits)
            scores = scores + scores_local

        if self.proximal_bias:
            assert t_s == t_t, "Proximal bias only for self-attention"
            scores = scores + self._attention_bias_proximal(t_s).to(device=scores.device, dtype=scores.dtype)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e4)
            if self.block_length is not None:
                assert t_s == t_t, "Block length only for self-attention"
                block_mask = torch.ones_like(scores).triu(-self.block_length).tril(self.block_length)
                scores = scores.masked_fill(block_mask == 0, -1e4)

        p_attn = F.softmax(scores, dim=-1)
        p_attn = self.drop(p_attn)
        output = torch.matmul(p_attn, value)

        if self.window_size is not None:
            relative_weights = self._absolute_position_to_relative_position(p_attn)
            value_relative_embeddings = self._get_relative_embeddings(self.emb_rel_v, t_s)
            output = output + self._matmul_with_relative_values(relative_weights, value_relative_embeddings)

        output = output.transpose(2, 3).contiguous().view(b, d, t_t)
        return output, p_attn

    def _matmul_with_relative_values(self, x, y):
        ret = torch.matmul(x, y.unsqueeze(0))
        return ret

    def _matmul_with_relative_keys(self, x, y):
        ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1))
        return ret

    def _get_relative_embeddings(self, relative_embeddings, length):
        max_relative_position = 2 * self.window_size + 1
        pad_length = max(length - (self.window_size + 1), 0)
        slice_start_position = max((self.window_size + 1) - length, 0)
        slice_end_position = slice_start_position + 2 * length - 1
        if pad_length > 0:
            padded_relative_embeddings = F.pad(
                relative_embeddings,
                (0, 0, pad_length, pad_length, 0, 0)
            )
        else:
            padded_relative_embeddings = relative_embeddings
        used_relative_embeddings = padded_relative_embeddings[:, slice_start_position:slice_end_position]
        return used_relative_embeddings

    def _relative_position_to_absolute_position(self, x):
        batch, heads, length, _ = x.size()
        x = F.pad(x, (0, 1, 0, 0, 0, 0, 0, 0))
        x_flat = x.view(batch, heads, length * 2 * length)
        x_flat = F.pad(x_flat, (0, length - 1, 0, 0, 0, 0))
        x_final = x_flat.view(batch, heads, length + 1, 2 * length - 1)[:, :, :length, length - 1:]
        return x_final

    def _absolute_position_to_relative_position(self, x):
        batch, heads, length, _ = x.size()
        x = F.pad(x, (0, length - 1, 0, 0, 0, 0, 0, 0))
        x_flat = x.view(batch, heads, length ** 2 + length * (length - 1))
        x_flat = F.pad(x_flat, (length, 0, 0, 0, 0, 0))
        x_final = x_flat.view(batch, heads, length, 2 * length)[:, :, :, 1:]
        return x_final

    def _attention_bias_proximal(self, length):
        r = torch.arange(length, dtype=torch.float32)
        diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1)
        return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0)


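# A brief shape sketch for the relative-position self-attention above
# (illustrative sizes only, not values tied to any specific checkpoint):
# with channels=192, n_heads=2 and window_size=10, queries/keys/values are
# reshaped to [B, 2, T, 96], and emb_rel_k / emb_rel_v hold 2*10+1 = 21
# relative-position vectors of size 96 that _get_relative_embeddings pads and
# slices down to the 2*T-1 relative offsets needed for a length-T sequence.
#
#     attn = MultiHeadAttention(192, 192, n_heads=2, window_size=10)
#     x = torch.randn(1, 192, 50)
#     y = attn(x, x)            # self-attention output: [1, 192, 50]

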
class FFN(nn.Module):
    """Feed-forward network with optional causal convolution"""

    def __init__(self, in_channels: int, out_channels: int, filter_channels: int,
                 kernel_size: int, p_dropout: float = 0.0, activation: Optional[str] = None,
                 causal: bool = False):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.filter_channels = filter_channels
        self.kernel_size = kernel_size
        self.p_dropout = p_dropout
        self.activation = activation
        self.causal = causal

        if causal:
            self.padding = self._causal_padding
        else:
            self.padding = self._same_padding

        self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size)
        self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size)
        self.drop = nn.Dropout(p_dropout)

    def forward(self, x, x_mask):
        x = self.conv_1(self.padding(x))
        if self.activation == "gelu":
            x = x * torch.sigmoid(1.702 * x)
        else:
            x = torch.relu(x)
        x = self.drop(x)
        x = self.conv_2(self.padding(x))
        return x * x_mask

    def _causal_padding(self, x):
        if self.kernel_size == 1:
            return x
        pad_l = self.kernel_size - 1
        pad_r = 0
        return F.pad(x, (pad_l, pad_r, 0, 0, 0, 0))

    def _same_padding(self, x):
        if self.kernel_size == 1:
            return x
        pad_l = (self.kernel_size - 1) // 2
        pad_r = self.kernel_size // 2
        return F.pad(x, (pad_l, pad_r, 0, 0, 0, 0))


class Encoder(nn.Module):
    """Transformer encoder with multi-head attention"""

    def __init__(self, hidden_channels: int, filter_channels: int, n_heads: int,
                 n_layers: int, kernel_size: int = 1, p_dropout: float = 0.0,
                 window_size: int = 10):
        super().__init__()
        self.hidden_channels = hidden_channels
        self.filter_channels = filter_channels
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.kernel_size = kernel_size
        self.p_dropout = p_dropout
        self.window_size = window_size

        self.drop = nn.Dropout(p_dropout)
        self.attn_layers = nn.ModuleList()
        self.norm_layers_1 = nn.ModuleList()
        self.ffn_layers = nn.ModuleList()
        self.norm_layers_2 = nn.ModuleList()

        for _ in range(n_layers):
            self.attn_layers.append(
                MultiHeadAttention(
                    hidden_channels, hidden_channels, n_heads,
                    p_dropout=p_dropout, window_size=window_size
                )
            )
            self.norm_layers_1.append(LayerNorm(hidden_channels))
            self.ffn_layers.append(
                FFN(hidden_channels, hidden_channels, filter_channels,
                    kernel_size, p_dropout=p_dropout)
            )
            self.norm_layers_2.append(LayerNorm(hidden_channels))

    def forward(self, x, x_mask):
        attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1)
        x = x * x_mask
        for i in range(self.n_layers):
            y = self.attn_layers[i](x, x, attn_mask)
            y = self.drop(y)
            x = self.norm_layers_1[i](x + y)

            y = self.ffn_layers[i](x, x_mask)
            y = self.drop(y)
            x = self.norm_layers_2[i](x + y)
        x = x * x_mask
        return x


class TextEncoder(nn.Module):
    """Text encoder for RVC - encodes phone and pitch embeddings"""

    def __init__(self, out_channels: int, hidden_channels: int, filter_channels: int,
                 n_heads: int, n_layers: int, kernel_size: int, p_dropout: float,
                 f0: bool = True):
        super().__init__()
        self.out_channels = out_channels
        self.hidden_channels = hidden_channels
        self.filter_channels = filter_channels
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.kernel_size = kernel_size
        self.p_dropout = p_dropout
        self.f0 = f0

        # Phone embedding: projects 768-dim HuBERT features to hidden_channels.
        self.emb_phone = nn.Linear(768, hidden_channels)

        # Pitch embedding: 256 coarse pitch bins.
        if f0:
            self.emb_pitch = nn.Embedding(256, hidden_channels)

        # Transformer encoder.
        self.encoder = Encoder(
            hidden_channels, filter_channels, n_heads, n_layers,
            kernel_size, p_dropout
        )

        # Projection to mean and log-variance.
        self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)

    def forward(self, phone, pitch, lengths):
        """
        Args:
            phone: [B, 768, T] phone features from HuBERT (channels first)
            pitch: [B, T] pitch indices (0-255)
            lengths: [B] sequence lengths

        Returns:
            m: [B, out_channels, T] mean
            logs: [B, out_channels, T] log-variance
            x_mask: [B, 1, T] mask
        """
        import logging
        log = logging.getLogger(__name__)

        log.debug(f"[TextEncoder] input phone: shape={phone.shape}")
        log.debug(f"[TextEncoder] input pitch: shape={pitch.shape}, max={pitch.max().item()}, min={pitch.min().item()}")
        log.debug(f"[TextEncoder] input lengths: {lengths}")

        # [B, 768, T] -> [B, T, 768] for the linear phone embedding.
        phone = phone.transpose(1, 2)
        log.debug(f"[TextEncoder] transposed phone: shape={phone.shape}")

        # Sequence mask: [B, 1, T].
        x_mask = torch.unsqueeze(
            self._sequence_mask(lengths, phone.size(1)), 1
        ).to(phone.dtype)
        log.debug(f"[TextEncoder] x_mask: shape={x_mask.shape}, sum={x_mask.sum().item()}")

        # Phone embedding.
        x = self.emb_phone(phone)
        log.debug(f"[TextEncoder] emb_phone output: shape={x.shape}, max={x.abs().max().item():.4f}, mean={x.abs().mean().item():.4f}")

        # Add pitch embedding.
        if self.f0 and pitch is not None:
            # Clamp pitch indices to the embedding range.
            pitch_clamped = torch.clamp(pitch, 0, 255)
            pitch_emb = self.emb_pitch(pitch_clamped)
            log.debug(f"[TextEncoder] emb_pitch output: shape={pitch_emb.shape}, max={pitch_emb.abs().max().item():.4f}")
            x = x + pitch_emb

        # [B, T, H] -> [B, H, T] for the convolutional encoder.
        x = x.transpose(1, 2)
        log.debug(f"[TextEncoder] transposed x: shape={x.shape}")

        # Apply mask.
        x = x * x_mask

        # Transformer encoder.
        x = self.encoder(x, x_mask)
        log.debug(f"[TextEncoder] Transformer output: shape={x.shape}, max={x.abs().max().item():.4f}, mean={x.abs().mean().item():.4f}")

        # Project to mean and log-variance.
        stats = self.proj(x) * x_mask
        m, logs = torch.split(stats, self.out_channels, dim=1)
        log.debug(f"[TextEncoder] final m: shape={m.shape}, max={m.abs().max().item():.4f}")
        log.debug(f"[TextEncoder] final logs: shape={logs.shape}, max={logs.max().item():.4f}, min={logs.min().item():.4f}")

        return m, logs, x_mask

    def _sequence_mask(self, length, max_length=None):
        if max_length is None:
            max_length = length.max()
        x = torch.arange(max_length, dtype=length.dtype, device=length.device)
        return x.unsqueeze(0) < length.unsqueeze(1)


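# A minimal shape-check sketch for TextEncoder. The hyperparameters are
# assumptions chosen for illustration, not values read from any particular
# RVC checkpoint config:
#
#     enc = TextEncoder(out_channels=192, hidden_channels=192, filter_channels=768,
#                       n_heads=2, n_layers=6, kernel_size=3, p_dropout=0.0, f0=True)
#     phone = torch.randn(1, 768, 100)            # HuBERT features, channels first
#     pitch = torch.randint(0, 256, (1, 100))     # coarse pitch bins
#     lengths = torch.LongTensor([100])
#     m, logs, x_mask = enc(phone, pitch, lengths)
#     # m: [1, 192, 100], logs: [1, 192, 100], x_mask: [1, 1, 100]

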
class ResidualCouplingBlock(nn.Module):
    """Residual coupling block (a stack of coupling layers and flips)"""

    def __init__(self, channels: int, hidden_channels: int, kernel_size: int,
                 dilation_rate: int, n_layers: int, n_flows: int = 4,
                 gin_channels: int = 0):
        super().__init__()
        self.flows = nn.ModuleList()

        for _ in range(n_flows):
            self.flows.append(
                ResidualCouplingLayer(
                    channels, hidden_channels, kernel_size,
                    dilation_rate, n_layers, gin_channels=gin_channels
                )
            )
            self.flows.append(Flip())

    def forward(self, x, x_mask, g=None, reverse=False):
        if not reverse:
            for flow in self.flows:
                x, _ = flow(x, x_mask, g=g, reverse=reverse)
        else:
            for flow in reversed(self.flows):
                x = flow(x, x_mask, g=g, reverse=reverse)
        return x


class ResidualCouplingLayer(nn.Module):
    """Residual coupling layer (mean-only affine coupling)"""

    def __init__(self, channels: int, hidden_channels: int, kernel_size: int,
                 dilation_rate: int, n_layers: int, mean_only: bool = True,
                 gin_channels: int = 0):
        super().__init__()
        self.half_channels = channels // 2
        self.mean_only = mean_only

        self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)
        self.enc = WN(hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels)
        self.post = nn.Conv1d(hidden_channels, self.half_channels, 1)
        self.post.weight.data.zero_()
        self.post.bias.data.zero_()

    def forward(self, x, x_mask, g=None, reverse=False):
        x0, x1 = torch.split(x, [self.half_channels] * 2, dim=1)
        h = self.pre(x0) * x_mask
        h = self.enc(h, x_mask, g=g)
        stats = self.post(h) * x_mask
        m = stats

        if not reverse:
            x1 = m + x1 * x_mask
            x = torch.cat([x0, x1], dim=1)
            return x, None
        else:
            x1 = (x1 - m) * x_mask
            x = torch.cat([x0, x1], dim=1)
            return x


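# A quick invertibility sketch for the coupling layer above (illustrative sizes;
# gin_channels=0 so no speaker conditioning is involved): only the second half
# of the channels is shifted by a mean predicted from the first half, so the
# reverse pass undoes the forward pass exactly.
#
#     layer = ResidualCouplingLayer(channels=192, hidden_channels=192,
#                                   kernel_size=5, dilation_rate=1, n_layers=3)
#     x = torch.randn(1, 192, 40)
#     mask = torch.ones(1, 1, 40)
#     y, _ = layer(x, mask)                   # forward direction
#     x_rec = layer(y, mask, reverse=True)    # inverse direction
#     # torch.allclose(x, x_rec, atol=1e-5) is expected to hold.

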
class Flip(nn.Module):
    """Channel-flip flow layer"""

    def forward(self, x, *args, reverse=False, **kwargs):
        x = torch.flip(x, [1])
        if not reverse:
            # The forward (non-reverse) path must return (x, logdet), since
            # ResidualCouplingBlock unpacks two values in that direction.
            logdet = torch.zeros(x.size(0), dtype=x.dtype, device=x.device)
            return x, logdet
        return x


class WN(nn.Module):
    """WaveNet-style network (with weight normalization)"""

    def __init__(self, hidden_channels: int, kernel_size: int,
                 dilation_rate: int, n_layers: int, gin_channels: int = 0,
                 p_dropout: float = 0):
        super().__init__()
        self.n_layers = n_layers
        self.hidden_channels = hidden_channels
        self.gin_channels = gin_channels

        self.in_layers = nn.ModuleList()
        self.res_skip_layers = nn.ModuleList()
        self.drop = nn.Dropout(p_dropout)

        if gin_channels > 0:
            self.cond_layer = nn.utils.weight_norm(
                nn.Conv1d(gin_channels, 2 * hidden_channels * n_layers, 1)
            )

        for i in range(n_layers):
            dilation = dilation_rate ** i
            padding = (kernel_size * dilation - dilation) // 2
            self.in_layers.append(
                nn.utils.weight_norm(
                    nn.Conv1d(hidden_channels, 2 * hidden_channels, kernel_size,
                              dilation=dilation, padding=padding)
                )
            )
            # The last layer only needs the skip branch.
            if i < n_layers - 1:
                res_skip_channels = 2 * hidden_channels
            else:
                res_skip_channels = hidden_channels
            self.res_skip_layers.append(
                nn.utils.weight_norm(
                    nn.Conv1d(hidden_channels, res_skip_channels, 1)
                )
            )

    def forward(self, x, x_mask, g=None):
        output = torch.zeros_like(x)

        if g is not None and self.gin_channels > 0:
            g = self.cond_layer(g)

        for i in range(self.n_layers):
            x_in = self.in_layers[i](x)
            if g is not None:
                cond_offset = i * 2 * self.hidden_channels
                g_l = g[:, cond_offset:cond_offset + 2 * self.hidden_channels, :]
                x_in = x_in + g_l

            # Gated activation unit.
            acts = torch.tanh(x_in[:, :self.hidden_channels]) * torch.sigmoid(x_in[:, self.hidden_channels:])
            acts = self.drop(acts)
            res_skip = self.res_skip_layers[i](acts)

            if i < self.n_layers - 1:
                # Residual branch feeds the next layer; skip branch accumulates the output.
                x = (x + res_skip[:, :self.hidden_channels]) * x_mask
                output = output + res_skip[:, self.hidden_channels:]
            else:
                # Last layer: all channels go to the skip path.
                x = (x + res_skip) * x_mask
                output = output + res_skip

        return output * x_mask


class PosteriorEncoder(nn.Module):
    """Posterior encoder"""

    def __init__(self, in_channels: int, out_channels: int, hidden_channels: int,
                 kernel_size: int, dilation_rate: int, n_layers: int,
                 gin_channels: int = 0):
        super().__init__()
        self.out_channels = out_channels

        self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
        self.enc = WN(hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels)
        self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)

    def forward(self, x, x_lengths, g=None):
        x_mask = torch.unsqueeze(
            self._sequence_mask(x_lengths, x.size(2)), 1
        ).to(x.dtype)

        x = self.pre(x) * x_mask
        x = self.enc(x, x_mask, g=g)
        stats = self.proj(x) * x_mask
        m, logs = torch.split(stats, self.out_channels, dim=1)
        z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask
        return z, m, logs, x_mask

    def _sequence_mask(self, length, max_length=None):
        if max_length is None:
            max_length = length.max()
        x = torch.arange(max_length, dtype=length.dtype, device=length.device)
        return x.unsqueeze(0) < length.unsqueeze(1)


class Generator(nn.Module):
    """NSF-HiFi-GAN generator (with weight normalization)"""

    def __init__(self, initial_channel: int, resblock_kernel_sizes: list,
                 resblock_dilation_sizes: list, upsample_rates: list,
                 upsample_initial_channel: int, upsample_kernel_sizes: list,
                 gin_channels: int = 0, sr: int = 40000, is_half: bool = False):
        super().__init__()
        self.num_kernels = len(resblock_kernel_sizes)
        self.num_upsamples = len(upsample_rates)
        self.sr = sr
        self.is_half = is_half

        # Total upsampling factor (frames -> samples).
        self.upp = int(np.prod(upsample_rates))

        self.conv_pre = nn.Conv1d(initial_channel, upsample_initial_channel, 7, 1, 3)

        # NSF harmonic source module.
        self.m_source = SourceModuleHnNSF(sample_rate=sr, harmonic_num=0)

        # Convolutions that inject the harmonic source at each upsampling stage.
        self.noise_convs = nn.ModuleList()

        self.ups = nn.ModuleList()
        for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
            c_cur = upsample_initial_channel // (2 ** (i + 1))
            self.ups.append(
                nn.utils.weight_norm(
                    nn.ConvTranspose1d(
                        upsample_initial_channel // (2 ** i),
                        c_cur,
                        k, u, (k - u) // 2
                    )
                )
            )
            # Downsample the sample-rate harmonic source to this stage's resolution.
            if i + 1 < len(upsample_rates):
                stride_f0 = int(np.prod(upsample_rates[i + 1:]))
                self.noise_convs.append(
                    nn.Conv1d(1, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=stride_f0 // 2)
                )
            else:
                self.noise_convs.append(nn.Conv1d(1, c_cur, kernel_size=1))

        self.resblocks = nn.ModuleList()
        for i in range(len(self.ups)):
            ch = upsample_initial_channel // (2 ** (i + 1))
            for j, (k, d) in enumerate(zip(resblock_kernel_sizes, resblock_dilation_sizes)):
                self.resblocks.append(ResBlock(ch, k, d))

        self.conv_post = nn.Conv1d(ch, 1, 7, 1, 3, bias=False)

        if gin_channels > 0:
            self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)

    def forward(self, x, f0, g=None):
        import logging
        log = logging.getLogger(__name__)

        log.debug(f"[Generator] input x: shape={x.shape}, max={x.abs().max().item():.4f}, mean={x.abs().mean().item():.4f}")
        log.debug(f"[Generator] input f0: shape={f0.shape}, max={f0.max().item():.1f}, min={f0.min().item():.1f}")
        if g is not None:
            log.debug(f"[Generator] input g: shape={g.shape}, max={g.abs().max().item():.4f}")

        # NSF harmonic excitation at the output sample rate.
        har_source, _, _ = self.m_source(f0, self.upp)
        har_source = har_source.transpose(1, 2)
        log.debug(f"[Generator] NSF har_source: shape={har_source.shape}, max={har_source.abs().max().item():.4f}")

        x = self.conv_pre(x)
        log.debug(f"[Generator] conv_pre output: shape={x.shape}, max={x.abs().max().item():.4f}")

        if g is not None:
            x = x + self.cond(g)
            log.debug(f"[Generator] after adding speaker condition: max={x.abs().max().item():.4f}")

        for i in range(self.num_upsamples):
            x = F.leaky_relu(x, 0.1)
            x = self.ups[i](x)

            # Inject the harmonic source at this resolution.
            x_source = self.noise_convs[i](har_source)
            x = x + x_source

            xs = None
            for j in range(self.num_kernels):
                if xs is None:
                    xs = self.resblocks[i * self.num_kernels + j](x)
                else:
                    xs += self.resblocks[i * self.num_kernels + j](x)
            x = xs / self.num_kernels
            log.debug(f"[Generator] upsample stage {i}: shape={x.shape}, max={x.abs().max().item():.4f}")

        x = F.leaky_relu(x)
        x = self.conv_post(x)
        log.debug(f"[Generator] conv_post output: shape={x.shape}, max={x.abs().max().item():.4f}")
        x = torch.tanh(x)
        log.debug(f"[Generator] tanh output: shape={x.shape}, max={x.abs().max().item():.4f}")

        return x

    def remove_weight_norm(self):
        for l in self.ups:
            nn.utils.remove_weight_norm(l)
        for l in self.resblocks:
            l.remove_weight_norm()


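# A minimal sketch of how the generator consumes latents and F0. The
# hyperparameters are assumptions modeled on a typical 40 kHz configuration;
# the actual lists depend on the checkpoint's config:
#
#     dec = Generator(initial_channel=192,
#                     resblock_kernel_sizes=[3, 7, 11],
#                     resblock_dilation_sizes=[[1, 3, 5], [1, 3, 5], [1, 3, 5]],
#                     upsample_rates=[10, 10, 2, 2],
#                     upsample_initial_channel=512,
#                     upsample_kernel_sizes=[16, 16, 4, 4],
#                     gin_channels=256, sr=40000)
#     z = torch.randn(1, 192, 100)        # 100 latent frames
#     f0 = torch.full((1, 100), 220.0)    # frame-level F0 in Hz
#     g = torch.randn(1, 256, 1)          # speaker embedding
#     wav = dec(z, f0, g=g)               # [1, 1, 100 * prod(upsample_rates)] = [1, 1, 40000]

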
class ResBlock(nn.Module):
    """Residual block (with weight normalization)"""

    def __init__(self, channels: int, kernel_size: int = 3, dilation: tuple = (1, 3, 5)):
        super().__init__()
        self.convs1 = nn.ModuleList([
            nn.utils.weight_norm(
                nn.Conv1d(channels, channels, kernel_size, 1,
                          (kernel_size * d - d) // 2, dilation=d)
            )
            for d in dilation
        ])
        self.convs2 = nn.ModuleList([
            nn.utils.weight_norm(
                nn.Conv1d(channels, channels, kernel_size, 1,
                          (kernel_size - 1) // 2)
            )
            for _ in dilation
        ])

    def forward(self, x):
        for c1, c2 in zip(self.convs1, self.convs2):
            xt = F.leaky_relu(x, 0.1)
            xt = c1(xt)
            xt = F.leaky_relu(xt, 0.1)
            xt = c2(xt)
            x = xt + x
        return x

    def remove_weight_norm(self):
        for l in self.convs1:
            nn.utils.remove_weight_norm(l)
        for l in self.convs2:
            nn.utils.remove_weight_norm(l)


class SineGenerator(nn.Module):
    """Sine wave generator - the core component of NSF"""

    def __init__(self, sample_rate: int, harmonic_num: int = 0,
                 sine_amp: float = 0.1, noise_std: float = 0.003,
                 voiced_threshold: float = 10):
        super().__init__()
        self.sample_rate = sample_rate
        self.harmonic_num = harmonic_num
        self.sine_amp = sine_amp
        self.noise_std = noise_std
        self.voiced_threshold = voiced_threshold
        self.dim = harmonic_num + 1

    def forward(self, f0: torch.Tensor, upp: int):
        """
        Generate the sine excitation signal.

        Args:
            f0: fundamental frequency tensor [B, T]
            upp: upsampling factor

        Returns:
            sine wave signal [B, T*upp, 1]
        """
        with torch.no_grad():
            # Upsample frame-level F0 to sample rate.
            f0 = f0.unsqueeze(1)
            f0_up = F.interpolate(f0, scale_factor=upp, mode='nearest')
            f0_up = f0_up.transpose(1, 2)

            # Accumulate phase (in revolutions) and take the sine.
            rad = f0_up / self.sample_rate
            rad_acc = torch.cumsum(rad, dim=1) % 1
            sine_wave = torch.sin(2 * np.pi * rad_acc) * self.sine_amp

            # Voiced samples keep the sine; unvoiced samples get noise only.
            voiced_mask = (f0_up > self.voiced_threshold).float()
            noise = torch.randn_like(sine_wave) * self.noise_std
            sine_wave = sine_wave * voiced_mask + noise * (1 - voiced_mask)

        return sine_wave


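# A small numerical sketch of the excitation (values are illustrative): at
# sample_rate=40000 and a constant F0 of 400 Hz, the accumulated phase advances
# by 400 / 40000 = 0.01 revolutions per sample, so the sine completes one
# period every 100 samples.
#
#     sine_gen = SineGenerator(sample_rate=40000)
#     f0 = torch.full((1, 10), 400.0)      # 10 frames of voiced F0
#     excitation = sine_gen(f0, upp=400)   # [1, 4000, 1] at sample rate

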
class SourceModuleHnNSF(nn.Module):
    """Harmonic-plus-noise source module"""

    def __init__(self, sample_rate: int, harmonic_num: int = 0,
                 sine_amp: float = 0.1, noise_std: float = 0.003,
                 add_noise_std: float = 0.003):
        super().__init__()
        self.add_noise_std = add_noise_std
        self.sine_generator = SineGenerator(
            sample_rate, harmonic_num, sine_amp, noise_std
        )
        self.l_linear = nn.Linear(harmonic_num + 1, 1)
        self.l_tanh = nn.Tanh()

    def forward(self, f0: torch.Tensor, upp: int):
        sine = self.sine_generator(f0, upp)
        sine = self.l_tanh(self.l_linear(sine))
        noise = torch.randn_like(sine) * self.add_noise_std
        return sine, noise, None


class SynthesizerTrnMs768NSFsid(nn.Module):
    """RVC v2 synthesizer (768-dim HuBERT + NSF + speaker ID)"""

    def __init__(self, spec_channels: int, segment_size: int,
                 inter_channels: int, hidden_channels: int, filter_channels: int,
                 n_heads: int, n_layers: int, kernel_size: int, p_dropout: float,
                 resblock: str, resblock_kernel_sizes: list,
                 resblock_dilation_sizes: list, upsample_rates: list,
                 upsample_initial_channel: int, upsample_kernel_sizes: list,
                 spk_embed_dim: int, gin_channels: int, sr: int):
        super().__init__()

        self.spec_channels = spec_channels
        self.inter_channels = inter_channels
        self.hidden_channels = hidden_channels
        self.filter_channels = filter_channels
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.kernel_size = kernel_size
        self.p_dropout = p_dropout
        self.resblock = resblock
        self.resblock_kernel_sizes = resblock_kernel_sizes
        self.resblock_dilation_sizes = resblock_dilation_sizes
        self.upsample_rates = upsample_rates
        self.upsample_initial_channel = upsample_initial_channel
        self.upsample_kernel_sizes = upsample_kernel_sizes
        self.segment_size = segment_size
        self.gin_channels = gin_channels
        self.spk_embed_dim = spk_embed_dim
        self.sr = sr

        # Prior (text) encoder.
        self.enc_p = TextEncoder(
            inter_channels, hidden_channels, filter_channels,
            n_heads, n_layers, kernel_size, p_dropout, f0=True
        )

        # NSF-HiFi-GAN decoder.
        self.dec = Generator(
            inter_channels, resblock_kernel_sizes, resblock_dilation_sizes,
            upsample_rates, upsample_initial_channel, upsample_kernel_sizes,
            gin_channels, sr=sr
        )

        # Normalizing flow.
        self.flow = ResidualCouplingBlock(
            inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
        )

        # Speaker embedding.
        self.emb_g = nn.Embedding(spk_embed_dim, gin_channels)

    def forward(self, phone, phone_lengths, pitch, nsff0, sid, skip_head=0, return_length=0):
        """Forward pass."""
        g = self.emb_g(sid).unsqueeze(-1)

        # Prior encoder.
        m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)

        # Sample the latent from the prior.
        z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask

        # Flow.
        z = self.flow(z_p, x_mask, g=g)

        # Decode to waveform.
        o = self.dec(z, nsff0, g=g)

        return o

    def infer(self, phone, phone_lengths, pitch, nsff0, sid, rate=1.0):
        """Inference."""
        import logging
        log = logging.getLogger(__name__)

        log.debug(f"[infer] input phone: shape={phone.shape}, dtype={phone.dtype}")
        log.debug(f"[infer] input phone stats: max={phone.abs().max().item():.4f}, mean={phone.abs().mean().item():.4f}")
        log.debug(f"[infer] input phone_lengths: {phone_lengths}")
        log.debug(f"[infer] input pitch: shape={pitch.shape}, max={pitch.max().item()}, min={pitch.min().item()}")
        log.debug(f"[infer] input nsff0: shape={nsff0.shape}, max={nsff0.max().item():.1f}, min={nsff0.min().item():.1f}")
        log.debug(f"[infer] input sid: {sid}")

        g = self.emb_g(sid).unsqueeze(-1)
        log.debug(f"[infer] speaker embedding g: shape={g.shape}, max={g.abs().max().item():.4f}")

        # Prior encoder.
        m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
        log.debug("[infer] TextEncoder outputs:")
        log.debug(f"[infer]   m_p: shape={m_p.shape}, max={m_p.abs().max().item():.4f}, mean={m_p.abs().mean().item():.4f}")
        log.debug(f"[infer]   logs_p: shape={logs_p.shape}, max={logs_p.max().item():.4f}, min={logs_p.min().item():.4f}")
        log.debug(f"[infer]   x_mask: shape={x_mask.shape}, sum={x_mask.sum().item()}")

        # Sample the latent from the prior.
        z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
        log.debug(f"[infer] sampled z_p: shape={z_p.shape}, max={z_p.abs().max().item():.4f}, mean={z_p.abs().mean().item():.4f}")

        # Inverse flow.
        z = self.flow(z_p, x_mask, g=g, reverse=True)
        log.debug(f"[infer] flow output z: shape={z.shape}, max={z.abs().max().item():.4f}, mean={z.abs().mean().item():.4f}")

        # Decode to waveform.
        o = self.dec(z * x_mask, nsff0, g=g)
        log.debug(f"[infer] Generator output o: shape={o.shape}, max={o.abs().max().item():.4f}, mean={o.abs().mean().item():.4f}")

        return o, x_mask


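# A minimal end-to-end smoke-test sketch. The hyperparameters below are
# assumptions modeled on a typical RVC v2 40 kHz configuration, not values read
# from any specific checkpoint config; adjust them to match your model.
if __name__ == "__main__":
    model = SynthesizerTrnMs768NSFsid(
        spec_channels=1025, segment_size=32,
        inter_channels=192, hidden_channels=192, filter_channels=768,
        n_heads=2, n_layers=6, kernel_size=3, p_dropout=0.0,
        resblock="1", resblock_kernel_sizes=[3, 7, 11],
        resblock_dilation_sizes=[[1, 3, 5], [1, 3, 5], [1, 3, 5]],
        upsample_rates=[10, 10, 2, 2], upsample_initial_channel=512,
        upsample_kernel_sizes=[16, 16, 4, 4],
        spk_embed_dim=109, gin_channels=256, sr=40000,
    )
    model.eval()

    t = 100  # number of HuBERT frames
    phone = torch.randn(1, 768, t)            # HuBERT features (channels first)
    phone_lengths = torch.LongTensor([t])
    pitch = torch.randint(1, 256, (1, t))     # coarse pitch bins
    nsff0 = torch.full((1, t), 220.0)         # frame-level F0 in Hz
    sid = torch.LongTensor([0])               # speaker ID

    with torch.no_grad():
        audio, _ = model.infer(phone, phone_lengths, pitch, nsff0, sid)
    print(audio.shape)  # expected: [1, 1, t * prod(upsample_rates)] = [1, 1, 40000]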