| import json |
| import logging |
| import os |
| from pathlib import Path |
from typing import Any, Mapping, Optional, Union

import librosa
| import numpy as np |
| import numpy.typing as npt |
| import torch |
| from coqpit import Coqpit |
| from torch import nn |
| from torch.nn import functional as F |
from trainer.io import load_fsspec

from TTS.tts.layers.vits.networks import PosteriorEncoder
| from TTS.tts.utils.speakers import SpeakerManager |
| from TTS.utils.audio.torch_transforms import wav_to_spec |
| from TTS.vc.configs.openvoice_config import OpenVoiceConfig |
| from TTS.vc.models.base_vc import BaseVC |
from TTS.vc.models.freevc import Generator, ResidualCouplingBlock

logger = logging.getLogger(__name__)


class ReferenceEncoder(nn.Module):
| """NN module creating a fixed size prosody embedding from a spectrogram. |
| |
| inputs: mel spectrograms [batch_size, num_spec_frames, num_mel] |
| outputs: [batch_size, embedding_dim] |
| """ |
|
|
| def __init__(self, spec_channels: int, embedding_dim: int = 0, layernorm: bool = True) -> None: |
| super().__init__() |
| self.spec_channels = spec_channels |
| ref_enc_filters = [32, 32, 64, 64, 128, 128] |
| K = len(ref_enc_filters) |
| filters = [1] + ref_enc_filters |
| convs = [ |
| torch.nn.utils.parametrizations.weight_norm( |
| nn.Conv2d( |
| in_channels=filters[i], |
| out_channels=filters[i + 1], |
| kernel_size=(3, 3), |
| stride=(2, 2), |
| padding=(1, 1), |
| ) |
| ) |
| for i in range(K) |
| ] |
| self.convs = nn.ModuleList(convs) |
|
|
| out_channels = self.calculate_channels(spec_channels, 3, 2, 1, K) |
| self.gru = nn.GRU( |
| input_size=ref_enc_filters[-1] * out_channels, |
| hidden_size=256 // 2, |
| batch_first=True, |
| ) |
| self.proj = nn.Linear(128, embedding_dim) |
| self.layernorm = nn.LayerNorm(self.spec_channels) if layernorm else None |
|
|
    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        N = inputs.size(0)

        # [N, num_frames, spec_channels] -> [N, 1, num_frames, spec_channels]
        out = inputs.view(N, 1, -1, self.spec_channels)
        if self.layernorm is not None:
            out = self.layernorm(out)

        for conv in self.convs:
            out = conv(out)
            out = F.relu(out)

        # Merge the channel and frequency axes into one feature axis for the GRU:
        # [N, C, T', F'] -> [N, T', C * F']
        out = out.transpose(1, 2)
        T = out.size(1)
        N = out.size(0)
        out = out.contiguous().view(N, T, -1)

        self.gru.flatten_parameters()
        # Keep only the final GRU hidden state: [1, N, 128] -> [N, 128].
        _memory, out = self.gru(out)

        return self.proj(out.squeeze(0))
|
|
    def calculate_channels(self, L: int, kernel_size: int, stride: int, pad: int, n_convs: int) -> int:
        """Return the size of one axis after `n_convs` strided convolutions."""
        for _ in range(n_convs):
            L = (L - kernel_size + 2 * pad) // stride + 1
| return L |
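
# Shape sketch for ReferenceEncoder (illustrative sizes only; 513 spectrogram
# channels corresponds to a 1024-point FFT):
#
#     enc = ReferenceEncoder(spec_channels=513, embedding_dim=256)
#     spec = torch.randn(2, 120, 513)  # [batch, num_spec_frames, spec_channels]
#     emb = enc(spec)                  # [batch, 256]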


| class OpenVoice(BaseVC): |
| """ |
| OpenVoice voice conversion model (inference only). |
| |
| Source: https://github.com/myshell-ai/OpenVoice |
| Paper: https://arxiv.org/abs/2312.01479 |
| |
| Paper abstract: |
| We introduce OpenVoice, a versatile voice cloning approach that requires |
| only a short audio clip from the reference speaker to replicate their voice and |
| generate speech in multiple languages. OpenVoice represents a significant |
| advancement in addressing the following open challenges in the field: 1) |
| Flexible Voice Style Control. OpenVoice enables granular control over voice |
| styles, including emotion, accent, rhythm, pauses, and intonation, in addition |
| to replicating the tone color of the reference speaker. The voice styles are not |
| directly copied from and constrained by the style of the reference speaker. |
| Previous approaches lacked the ability to flexibly manipulate voice styles after |
| cloning. 2) Zero-Shot Cross-Lingual Voice Cloning. OpenVoice achieves zero-shot |
| cross-lingual voice cloning for languages not included in the massive-speaker |
| training set. Unlike previous approaches, which typically require extensive |
| massive-speaker multi-lingual (MSML) dataset for all languages, OpenVoice can |
| clone voices into a new language without any massive-speaker training data for |
| that language. OpenVoice is also computationally efficient, costing tens of |
| times less than commercially available APIs that offer even inferior |
| performance. To foster further research in the field, we have made the source |
| code and trained model publicly accessible. We also provide qualitative results |
| in our demo website. Prior to its public release, our internal version of |
| OpenVoice was used tens of millions of times by users worldwide between May and |
| October 2023, serving as the backend of MyShell. |
| """ |
|
|
| def __init__(self, config: Coqpit, speaker_manager: Optional[SpeakerManager] = None) -> None: |
| super().__init__(config, None, speaker_manager, None) |
|
|
| self.init_multispeaker(config) |
|
|
| self.zero_g = self.args.zero_g |
| self.inter_channels = self.args.inter_channels |
| self.hidden_channels = self.args.hidden_channels |
| self.filter_channels = self.args.filter_channels |
| self.n_heads = self.args.n_heads |
| self.n_layers = self.args.n_layers |
| self.kernel_size = self.args.kernel_size |
| self.p_dropout = self.args.p_dropout |
| self.resblock = self.args.resblock |
| self.resblock_kernel_sizes = self.args.resblock_kernel_sizes |
| self.resblock_dilation_sizes = self.args.resblock_dilation_sizes |
| self.upsample_rates = self.args.upsample_rates |
| self.upsample_initial_channel = self.args.upsample_initial_channel |
| self.upsample_kernel_sizes = self.args.upsample_kernel_sizes |
| self.n_layers_q = self.args.n_layers_q |
| self.use_spectral_norm = self.args.use_spectral_norm |
| self.gin_channels = self.args.gin_channels |
| self.tau = self.args.tau |
|
|
| self.spec_channels = config.audio.fft_size // 2 + 1 |
|
|
| self.dec = Generator( |
| self.inter_channels, |
| self.resblock, |
| self.resblock_kernel_sizes, |
| self.resblock_dilation_sizes, |
| self.upsample_rates, |
| self.upsample_initial_channel, |
| self.upsample_kernel_sizes, |
| gin_channels=self.gin_channels, |
| ) |
| self.enc_q = PosteriorEncoder( |
| self.spec_channels, |
| self.inter_channels, |
| self.hidden_channels, |
| kernel_size=5, |
| dilation_rate=1, |
| num_layers=16, |
| cond_channels=self.gin_channels, |
| ) |
|
|
| self.flow = ResidualCouplingBlock( |
| self.inter_channels, |
| self.hidden_channels, |
| kernel_size=5, |
| dilation_rate=1, |
| n_layers=4, |
| gin_channels=self.gin_channels, |
| ) |
|
|
| self.ref_enc = ReferenceEncoder(self.spec_channels, self.gin_channels) |
|
|
| @property |
| def device(self) -> torch.device: |
| return next(self.parameters()).device |
|
|
| @staticmethod |
| def init_from_config(config: OpenVoiceConfig) -> "OpenVoice": |
| return OpenVoice(config) |
|
|
    def init_multispeaker(self, config: Coqpit, data: Optional[list[Any]] = None) -> None:
        """Set the number of speakers for the model.

        The count is taken from the `speaker_manager` passed at initialization, if
        any, and falls back to `config.num_speakers` otherwise.

        Args:
            config (Coqpit): Model configuration.
            data (list, optional): Dataset items to infer number of speakers. Defaults to None.
        """
        self.num_spks = config.num_speakers
        if self.speaker_manager:
            self.num_spks = self.speaker_manager.num_speakers
|
|
| def load_checkpoint( |
| self, |
| config: OpenVoiceConfig, |
| checkpoint_path: Union[str, os.PathLike[Any]], |
| eval: bool = False, |
| strict: bool = True, |
| cache: bool = False, |
| ) -> None: |
| """Map from OpenVoice's config structure.""" |
| config_path = Path(checkpoint_path).parent / "config.json" |
| with open(config_path, encoding="utf-8") as f: |
| config_org = json.load(f) |
| self.config.audio.input_sample_rate = config_org["data"]["sampling_rate"] |
| self.config.audio.output_sample_rate = config_org["data"]["sampling_rate"] |
| self.config.audio.fft_size = config_org["data"]["filter_length"] |
| self.config.audio.hop_length = config_org["data"]["hop_length"] |
| self.config.audio.win_length = config_org["data"]["win_length"] |
| state = load_fsspec(str(checkpoint_path), map_location=torch.device("cpu"), cache=cache) |
| self.load_state_dict(state["model"], strict=strict) |
| if eval: |
| self.eval() |
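
    # Usage sketch (path is hypothetical; the matching `config.json` must sit
    # next to the checkpoint, as in the upstream OpenVoice releases):
    #
    #     model = OpenVoice.init_from_config(OpenVoiceConfig())
    #     model.load_checkpoint(model.config, "/path/to/checkpoint.pth", eval=True)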
|
|
    # OpenVoice is inference-only in this codebase, so the training entry points
    # required by the base class are deliberate no-ops.
    def forward(self) -> None: ...
    def train_step(self) -> None: ...
    def eval_step(self) -> None: ...
|
|
    @staticmethod
    def _set_x_lengths(x: torch.Tensor, aux_input: Mapping[str, Optional[torch.Tensor]]) -> torch.Tensor:
        if "x_lengths" in aux_input and aux_input["x_lengths"] is not None:
            return aux_input["x_lengths"]
        # Default to full-length inputs: one entry per batch item equal to the time
        # dimension of `x`, which has shape (batch, spec_channels, time).
        return torch.full((x.shape[0],), x.shape[-1], dtype=torch.long, device=x.device)
|
|
| @torch.inference_mode() |
| def inference( |
| self, |
| x: torch.Tensor, |
| aux_input: Mapping[str, Optional[torch.Tensor]] = {"x_lengths": None, "g_src": None, "g_tgt": None}, |
| ) -> dict[str, torch.Tensor]: |
| """ |
| Inference pass of the model |
| |
| Args: |
| x (torch.Tensor): Input tensor. Shape: (batch_size, c_seq_len). |
| x_lengths (torch.Tensor): Lengths of the input tensor. Shape: (batch_size,). |
| g_src (torch.Tensor): Source speaker embedding tensor. Shape: (batch_size, spk_emb_dim). |
| g_tgt (torch.Tensor): Target speaker embedding tensor. Shape: (batch_size, spk_emb_dim). |
| |
| Returns: |
| o_hat: Output spectrogram tensor. Shape: (batch_size, spec_seq_len, spec_dim). |
| x_mask: Spectrogram mask. Shape: (batch_size, spec_seq_len). |
| (z, z_p, z_hat): A tuple of latent variables. |
| """ |
| x_lengths = self._set_x_lengths(x, aux_input) |
| if "g_src" in aux_input and aux_input["g_src"] is not None: |
| g_src = aux_input["g_src"] |
| else: |
| raise ValueError("aux_input must define g_src") |
| if "g_tgt" in aux_input and aux_input["g_tgt"] is not None: |
| g_tgt = aux_input["g_tgt"] |
| else: |
| raise ValueError("aux_input must define g_tgt") |
        # Encode the source spectrogram into the latent z, conditioned on the source speaker.
        z, _m_q, _logs_q, y_mask = self.enc_q(
            x, x_lengths, g=g_src if not self.zero_g else torch.zeros_like(g_src), tau=self.tau
        )
        # Map z to the speaker-independent prior space, then invert the flow with the
        # target speaker embedding to re-voice the latent.
        z_p = self.flow(z, y_mask, g=g_src)
        z_hat = self.flow(z_p, y_mask, g=g_tgt, reverse=True)
        # Decode the converted latent to a waveform.
        o_hat = self.dec(z_hat * y_mask, g=g_tgt if not self.zero_g else torch.zeros_like(g_tgt))
| return { |
| "model_outputs": o_hat, |
| "y_mask": y_mask, |
| "z": z, |
| "z_p": z_p, |
| "z_hat": z_hat, |
| } |
|
|
    def load_audio(self, wav: Union[str, npt.NDArray[np.float32], torch.Tensor, list[float]]) -> torch.Tensor:
        """Read and format the input audio.

        File paths are loaded and resampled to `input_sample_rate`; arrays, lists,
        and tensors are assumed to already be at that sample rate.
        """
        if isinstance(wav, str):
            out = torch.from_numpy(librosa.load(wav, sr=self.config.audio.input_sample_rate)[0])
        elif isinstance(wav, np.ndarray):
            out = torch.from_numpy(wav)
        elif isinstance(wav, list):
            out = torch.from_numpy(np.array(wav))
        else:
            out = wav
        return out.to(self.device).float()
|
|
    def extract_se(self, audio: Union[str, torch.Tensor]) -> tuple[torch.Tensor, torch.Tensor]:
        """Extract the speaker embedding of a reference utterance.

        Returns:
            Tuple of the speaker embedding, shape (1, gin_channels, 1), and the linear
            spectrogram it was computed from, shape (1, spec_channels, seq_len).
        """
        y = self.load_audio(audio)
        y = y.to(self.device)
        y = y.unsqueeze(0)
        spec = wav_to_spec(
            y,
            n_fft=self.config.audio.fft_size,
            hop_length=self.config.audio.hop_length,
            win_length=self.config.audio.win_length,
            center=False,
        ).to(self.device)
        with torch.no_grad():
            g = self.ref_enc(spec.transpose(1, 2)).unsqueeze(-1)
        return g, spec
|
|
    @torch.inference_mode()
    def voice_conversion(self, src: Union[str, torch.Tensor], tgt: Union[str, torch.Tensor]) -> npt.NDArray[np.float32]:
        """
        Voice conversion pass of the model.

        Args:
            src (str or torch.Tensor): Source utterance (file path or waveform) providing the content.
            tgt (str or torch.Tensor): Reference utterance (file path or waveform) providing the target voice.

        Returns:
            Converted waveform as a float32 numpy array.
        """
        src_se, src_spec = self.extract_se(src)
        tgt_se, _ = self.extract_se(tgt)

        aux_input = {"g_src": src_se, "g_tgt": tgt_se}
        audio = self.inference(src_spec, aux_input)
| return audio["model_outputs"][0, 0].data.cpu().float().numpy() |
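

# End-to-end usage sketch (file names are hypothetical; the checkpoint and its
# config.json follow the upstream OpenVoice converter layout):
#
#     config = OpenVoiceConfig()
#     model = OpenVoice.init_from_config(config)
#     model.load_checkpoint(config, "checkpoint.pth", eval=True)
#     wav = model.voice_conversion("source_speech.wav", "target_voice.wav")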
|
|