| """ | |
| VoiceFixer + FreeVC: Clean & Convert Voice | |
| All code consolidated into single file for cleaner deployment. | |
| """ | |
| import os | |
| import math | |
| import json | |
| import struct | |
| import tempfile | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Union, List | |
| from time import perf_counter as timer | |
| import gradio as gr | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from torch.nn import Conv1d, ConvTranspose1d | |
| from torch.nn.utils import weight_norm, remove_weight_norm | |
| import numpy as np | |
| import librosa | |
| import webrtcvad | |
| from scipy.io.wavfile import write as wav_write | |
| from scipy.ndimage import binary_dilation | |
| from transformers import WavLMModel | |
| from huggingface_hub import hf_hub_download | |
# Force CPU
os.environ["CUDA_VISIBLE_DEVICES"] = ""
device = torch.device("cpu")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ============================================================
# CONFIG (instead of a JSON file)
# ============================================================
FREEVC_CONFIG = {
    "data": {
        "sampling_rate": 16000,
        "filter_length": 1280,
        "hop_length": 320,
    },
    "train": {
        "segment_size": 8640,
    },
    "model": {
        "inter_channels": 192,
        "hidden_channels": 192,
        "filter_channels": 768,
        "n_heads": 2,
        "n_layers": 6,
        "kernel_size": 3,
        "p_dropout": 0.1,
        "resblock": "1",
        "resblock_kernel_sizes": [3, 7, 11],
        "resblock_dilation_sizes": [[1, 3, 5], [1, 3, 5], [1, 3, 5]],
        "upsample_rates": [10, 6, 4, 2],
        "upsample_initial_channel": 512,
        "upsample_kernel_sizes": [16, 16, 4, 4],
        "gin_channels": 256,
        "ssl_dim": 1024,
        "use_spk": True,
    },
}
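
# Sanity check on the numbers above: hop_length=320 at 16 kHz means one content
# frame every 20 ms, while the decoder's upsample_rates multiply to
# 10 * 6 * 4 * 2 = 480 output samples per frame. 480 / 320 * 16000 = 24000,
# which is why the converted audio is written out at 24 kHz further below.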
# Speaker encoder params
SE_SAMPLING_RATE = 16000
SE_MEL_WINDOW_LENGTH = 25       # ms
SE_MEL_WINDOW_STEP = 10         # ms
SE_MEL_N_CHANNELS = 40
SE_PARTIALS_N_FRAMES = 160      # mel frames per partial utterance (1.6 s)
SE_VAD_WINDOW_LENGTH = 30       # ms
SE_VAD_MOVING_AVG_WIDTH = 8     # VAD windows
SE_VAD_MAX_SILENCE_LENGTH = 6   # VAD windows
SE_AUDIO_NORM_TARGET_DBFS = -30
SE_MODEL_HIDDEN_SIZE = 256
SE_MODEL_EMBEDDING_SIZE = 256
SE_MODEL_NUM_LAYERS = 3

# ============================================================
# UTILS
# ============================================================
class HParams:
    """Hyperparameters container."""
    def __init__(self, **kwargs):
        for k, v in kwargs.items():
            if isinstance(v, dict):
                v = HParams(**v)
            setattr(self, k, v)

    def __getitem__(self, key):
        return getattr(self, key)

def get_hparams():
    """Get hyperparameters from the config dict."""
    return HParams(**FREEVC_CONFIG)
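
# Illustrative usage of the helpers above (not executed here):
#   hps = get_hparams()
#   hps.data.sampling_rate  # -> 16000, nested dicts become attributes
#   hps["model"].ssl_dim    # -> 1024, __getitem__ mirrors getattr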
def load_checkpoint(checkpoint_path, model):
    """Load model checkpoint."""
    checkpoint_dict = torch.load(checkpoint_path, map_location='cpu')
    saved_state_dict = checkpoint_dict['model']
    state_dict = model.state_dict()
    new_state_dict = {}
    for k, v in state_dict.items():
        if k in saved_state_dict:
            new_state_dict[k] = saved_state_dict[k]
        else:
            new_state_dict[k] = v
    model.load_state_dict(new_state_dict)
    return model
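
# Note: loading is deliberately lenient. Keys absent from the checkpoint keep
# the model's freshly initialized weights rather than failing the load, so
# minor drift between this file and the published checkpoint is tolerated.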
# ============================================================
# COMMONS (helper functions)
# ============================================================
def init_weights(m, mean=0.0, std=0.01):
    if m.__class__.__name__.find("Conv") != -1:
        m.weight.data.normal_(mean, std)

def get_padding(kernel_size, dilation=1):
    return int((kernel_size * dilation - dilation) / 2)
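
# get_padding computes "same" padding for an odd kernel: a kernel of size k
# with dilation d spans (k - 1) * d + 1 samples, so (k * d - d) // 2 per side
# preserves the sequence length, e.g. get_padding(3, dilation=5) == 5.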
def sequence_mask(length, max_length=None):
    if max_length is None:
        max_length = length.max()
    x = torch.arange(max_length, dtype=length.dtype, device=length.device)
    return x.unsqueeze(0) < length.unsqueeze(1)
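
# Example: sequence_mask(torch.tensor([2, 3]), 4) returns
#   [[True, True, False, False],
#    [True, True, True,  False]]
# i.e. one boolean row per sequence, True up to each sequence's length.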
def rand_slice_segments(x, x_lengths=None, segment_size=4):
    b, d, t = x.size()
    if x_lengths is None:
        x_lengths = t
    ids_str_max = x_lengths - segment_size + 1
    ids_str = (torch.rand([b]).to(device=x.device) * ids_str_max).to(dtype=torch.long)
    ret = slice_segments(x, ids_str, segment_size)
    return ret, ids_str

def slice_segments(x, ids_str, segment_size=4):
    ret = torch.zeros_like(x[:, :, :segment_size])
    for i in range(x.size(0)):
        idx_str = ids_str[i]
        idx_end = idx_str + segment_size
        ret[i] = x[i, :, idx_str:idx_end]
    return ret

def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):
    n_channels_int = n_channels[0]
    in_act = input_a + input_b
    t_act = torch.tanh(in_act[:, :n_channels_int, :])
    s_act = torch.sigmoid(in_act[:, n_channels_int:, :])
    return t_act * s_act
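
# This is the WaveNet-style gated activation: the summed pre-activations are
# split channel-wise into two halves, and tanh(first half) * sigmoid(second
# half) lets the sigmoid gate how much of each tanh channel passes through.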
# ============================================================
# MODULES (neural network building blocks)
# ============================================================
LRELU_SLOPE = 0.1

class LayerNorm(nn.Module):
    def __init__(self, channels, eps=1e-5):
        super().__init__()
        self.channels = channels
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(channels))
        self.beta = nn.Parameter(torch.zeros(channels))

    def forward(self, x):
        x = x.transpose(1, -1)
        x = F.layer_norm(x, (self.channels,), self.gamma, self.beta, self.eps)
        return x.transpose(1, -1)

class WN(nn.Module):
    def __init__(self, hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=0, p_dropout=0):
        super().__init__()
        self.hidden_channels = hidden_channels
        self.n_layers = n_layers
        self.gin_channels = gin_channels
        self.in_layers = nn.ModuleList()
        self.res_skip_layers = nn.ModuleList()
        self.drop = nn.Dropout(p_dropout)
        if gin_channels != 0:
            cond_layer = nn.Conv1d(gin_channels, 2 * hidden_channels * n_layers, 1)
            self.cond_layer = weight_norm(cond_layer, name='weight')
        for i in range(n_layers):
            dilation = dilation_rate ** i
            padding = int((kernel_size * dilation - dilation) / 2)
            in_layer = weight_norm(nn.Conv1d(hidden_channels, 2 * hidden_channels, kernel_size,
                                             dilation=dilation, padding=padding), name='weight')
            self.in_layers.append(in_layer)
            res_skip_channels = 2 * hidden_channels if i < n_layers - 1 else hidden_channels
            res_skip_layer = weight_norm(nn.Conv1d(hidden_channels, res_skip_channels, 1), name='weight')
            self.res_skip_layers.append(res_skip_layer)

    def forward(self, x, x_mask, g=None, **kwargs):
        output = torch.zeros_like(x)
        n_channels_tensor = torch.IntTensor([self.hidden_channels])
        if g is not None:
            g = self.cond_layer(g)
        for i in range(self.n_layers):
            x_in = self.in_layers[i](x)
            g_l = g[:, i * 2 * self.hidden_channels:(i + 1) * 2 * self.hidden_channels, :] if g is not None else torch.zeros_like(x_in)
            acts = fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor)
            acts = self.drop(acts)
            res_skip_acts = self.res_skip_layers[i](acts)
            if i < self.n_layers - 1:
                x = (x + res_skip_acts[:, :self.hidden_channels, :]) * x_mask
                output = output + res_skip_acts[:, self.hidden_channels:, :]
            else:
                output = output + res_skip_acts
        return output * x_mask

class ResBlock1(nn.Module):
    def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
        super().__init__()
        self.convs1 = nn.ModuleList([
            weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=d, padding=get_padding(kernel_size, d)))
            for d in dilation
        ])
        self.convs1.apply(init_weights)
        self.convs2 = nn.ModuleList([
            weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=1, padding=get_padding(kernel_size, 1)))
            for _ in dilation
        ])
        self.convs2.apply(init_weights)

    def forward(self, x, x_mask=None):
        for c1, c2 in zip(self.convs1, self.convs2):
            xt = F.leaky_relu(x, LRELU_SLOPE)
            if x_mask is not None:
                xt = xt * x_mask
            xt = c1(xt)
            xt = F.leaky_relu(xt, LRELU_SLOPE)
            if x_mask is not None:
                xt = xt * x_mask
            xt = c2(xt)
            x = xt + x
        return x * x_mask if x_mask is not None else x

class ResBlock2(nn.Module):
    def __init__(self, channels, kernel_size=3, dilation=(1, 3)):
        super().__init__()
        self.convs = nn.ModuleList([
            weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=d, padding=get_padding(kernel_size, d)))
            for d in dilation
        ])
        self.convs.apply(init_weights)

    def forward(self, x, x_mask=None):
        for c in self.convs:
            xt = F.leaky_relu(x, LRELU_SLOPE)
            if x_mask is not None:
                xt = xt * x_mask
            xt = c(xt)
            x = xt + x
        return x * x_mask if x_mask is not None else x

class Flip(nn.Module):
    def forward(self, x, *args, reverse=False, **kwargs):
        x = torch.flip(x, [1])
        return x if reverse else (x, torch.zeros(x.size(0)).to(x))

class ResidualCouplingLayer(nn.Module):
    def __init__(self, channels, hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=0, mean_only=False):
        super().__init__()
        self.half_channels = channels // 2
        self.mean_only = mean_only
        self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)
        self.enc = WN(hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=gin_channels)
        self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1)
        self.post.weight.data.zero_()
        self.post.bias.data.zero_()

    def forward(self, x, x_mask, g=None, reverse=False):
        x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
        h = self.pre(x0) * x_mask
        h = self.enc(h, x_mask, g=g)
        stats = self.post(h) * x_mask
        m = stats if self.mean_only else stats[:, :self.half_channels]
        logs = torch.zeros_like(m) if self.mean_only else stats[:, self.half_channels:]
        if not reverse:
            x1 = m + x1 * torch.exp(logs) * x_mask
            return torch.cat([x0, x1], 1), torch.sum(logs, [1, 2])
        else:
            x1 = (x1 - m) * torch.exp(-logs) * x_mask
            return torch.cat([x0, x1], 1)
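
# The affine coupling above is invertible by construction: the forward pass
# maps x1 -> m + x1 * exp(logs) while passing x0 through unchanged, and the
# reverse pass recovers x1 = (x1' - m) * exp(-logs) from the same (m, logs),
# which are recomputed deterministically from x0.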
# ============================================================
# MODELS (FreeVC architecture)
# ============================================================
class ResidualCouplingBlock(nn.Module):
    def __init__(self, channels, hidden_channels, kernel_size, dilation_rate, n_layers, n_flows=4, gin_channels=0):
        super().__init__()
        self.flows = nn.ModuleList()
        for _ in range(n_flows):
            self.flows.append(ResidualCouplingLayer(channels, hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=gin_channels, mean_only=True))
            self.flows.append(Flip())

    def forward(self, x, x_mask, g=None, reverse=False):
        if not reverse:
            for flow in self.flows:
                x, _ = flow(x, x_mask, g=g, reverse=reverse)
        else:
            for flow in reversed(self.flows):
                x = flow(x, x_mask, g=g, reverse=reverse)
        return x

class Encoder(nn.Module):
    def __init__(self, in_channels, out_channels, hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=0):
        super().__init__()
        self.out_channels = out_channels
        self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
        self.enc = WN(hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=gin_channels)
        self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)

    def forward(self, x, x_lengths, g=None):
        x_mask = torch.unsqueeze(sequence_mask(x_lengths, x.size(2)), 1).to(x.dtype)
        x = self.pre(x) * x_mask
        x = self.enc(x, x_mask, g=g)
        stats = self.proj(x) * x_mask
        m, logs = torch.split(stats, self.out_channels, dim=1)
        z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask
        return z, m, logs, x_mask

class Generator(nn.Module):
    def __init__(self, initial_channel, resblock, resblock_kernel_sizes, resblock_dilation_sizes,
                 upsample_rates, upsample_initial_channel, upsample_kernel_sizes, gin_channels=0):
        super().__init__()
        self.num_kernels = len(resblock_kernel_sizes)
        self.num_upsamples = len(upsample_rates)
        self.conv_pre = Conv1d(initial_channel, upsample_initial_channel, 7, 1, padding=3)
        resblock_cls = ResBlock1 if resblock == '1' else ResBlock2
        self.ups = nn.ModuleList()
        for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
            self.ups.append(weight_norm(ConvTranspose1d(
                upsample_initial_channel // (2 ** i), upsample_initial_channel // (2 ** (i + 1)),
                k, u, padding=(k - u) // 2)))
        self.resblocks = nn.ModuleList()
        for i in range(len(self.ups)):
            ch = upsample_initial_channel // (2 ** (i + 1))
            for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes):
                self.resblocks.append(resblock_cls(ch, k, d))
        self.conv_post = Conv1d(ch, 1, 7, 1, padding=3, bias=False)
        self.ups.apply(init_weights)
        if gin_channels != 0:
            self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)

    def forward(self, x, g=None):
        x = self.conv_pre(x)
        if g is not None:
            x = x + self.cond(g)
        for i in range(self.num_upsamples):
            x = F.leaky_relu(x, LRELU_SLOPE)
            x = self.ups[i](x)
            xs = sum(self.resblocks[i * self.num_kernels + j](x) for j in range(self.num_kernels))
            x = xs / self.num_kernels
        x = F.leaky_relu(x)
        x = self.conv_post(x)
        return torch.tanh(x)

class SynthesizerTrn(nn.Module):
    """Main FreeVC voice conversion model."""
    def __init__(self, spec_channels, segment_size, inter_channels, hidden_channels, filter_channels,
                 n_heads, n_layers, kernel_size, p_dropout, resblock, resblock_kernel_sizes,
                 resblock_dilation_sizes, upsample_rates, upsample_initial_channel,
                 upsample_kernel_sizes, gin_channels, ssl_dim, use_spk, **kwargs):
        super().__init__()
        self.segment_size = segment_size
        self.use_spk = use_spk
        self.enc_p = Encoder(ssl_dim, inter_channels, hidden_channels, 5, 1, 16)
        self.dec = Generator(inter_channels, resblock, resblock_kernel_sizes, resblock_dilation_sizes,
                             upsample_rates, upsample_initial_channel, upsample_kernel_sizes, gin_channels=gin_channels)
        self.enc_q = Encoder(spec_channels, inter_channels, hidden_channels, 5, 1, 16, gin_channels=gin_channels)
        self.flow = ResidualCouplingBlock(inter_channels, hidden_channels, 5, 1, 4, gin_channels=gin_channels)

    def infer(self, c, g=None, c_lengths=None):
        if c_lengths is None:
            c_lengths = (torch.ones(c.size(0)) * c.size(-1)).to(c.device)
        g = g.unsqueeze(-1)
        z_p, m_p, logs_p, c_mask = self.enc_p(c, c_lengths)
        z = self.flow(z_p, c_mask, g=g, reverse=True)
        return self.dec(z * c_mask, g=g)
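
# Inference data flow, assuming this file's config: c is a WavLM feature map
# of shape [B, ssl_dim=1024, T] and g a speaker embedding of shape [B, 256]
# (unsqueezed to [B, 256, 1] for conditioning). enc_p encodes the content into
# a latent prior, the flow runs in reverse conditioned on the speaker, and the
# HiFi-GAN-style Generator decodes the waveform.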
# ============================================================
# SPEAKER ENCODER
# ============================================================
def se_normalize_volume(wav, target_dBFS, increase_only=False):
    rms = np.sqrt(np.mean(wav ** 2))
    if rms < 1e-10:
        return wav
    dBFS_change = target_dBFS - 20 * np.log10(rms)
    if dBFS_change < 0 and increase_only:
        return wav
    return wav * (10 ** (dBFS_change / 20))

def se_trim_long_silences(wav):
    samples_per_window = (SE_VAD_WINDOW_LENGTH * SE_SAMPLING_RATE) // 1000
    wav = wav[:len(wav) - (len(wav) % samples_per_window)]
    int16_max = (2 ** 15) - 1
    pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16))
    voice_flags = []
    vad = webrtcvad.Vad(mode=3)
    for window_start in range(0, len(wav), samples_per_window):
        window_end = window_start + samples_per_window
        voice_flags.append(vad.is_speech(pcm_wave[window_start * 2:window_end * 2], sample_rate=SE_SAMPLING_RATE))
    voice_flags = np.array(voice_flags)

    def moving_average(array, width):
        array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
        ret = np.cumsum(array_padded, dtype=float)
        ret[width:] = ret[width:] - ret[:-width]
        return ret[width - 1:] / width

    audio_mask = np.round(moving_average(voice_flags, SE_VAD_MOVING_AVG_WIDTH)).astype(bool)
    audio_mask = binary_dilation(audio_mask, np.ones(SE_VAD_MAX_SILENCE_LENGTH + 1))
    audio_mask = np.repeat(audio_mask, samples_per_window)
    return wav[audio_mask] if len(wav[audio_mask]) > 0 else wav
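
# VAD bookkeeping in se_trim_long_silences: webrtcvad classifies 30 ms windows
# of 16-bit PCM (hence the * 2 byte offsets into pcm_wave), the boolean flags
# are smoothed with an 8-window moving average, and binary_dilation re-admits
# up to 6 windows of silence around speech so word edges are not clipped.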
def se_wav_to_mel(wav):
    frames = librosa.feature.melspectrogram(
        y=wav, sr=SE_SAMPLING_RATE,
        n_fft=int(SE_SAMPLING_RATE * SE_MEL_WINDOW_LENGTH / 1000),
        hop_length=int(SE_SAMPLING_RATE * SE_MEL_WINDOW_STEP / 1000),
        n_mels=SE_MEL_N_CHANNELS
    )
    return frames.astype(np.float32).T

class SpeakerEncoder(nn.Module):
    """Speaker encoder for extracting voice embeddings."""
    def __init__(self, weights_fpath, device=None):
        super().__init__()
        self.lstm = nn.LSTM(SE_MEL_N_CHANNELS, SE_MODEL_HIDDEN_SIZE, SE_MODEL_NUM_LAYERS, batch_first=True)
        self.linear = nn.Linear(SE_MODEL_HIDDEN_SIZE, SE_MODEL_EMBEDDING_SIZE)
        self.relu = nn.ReLU()
        self.device = device or torch.device("cpu")
        checkpoint = torch.load(weights_fpath, map_location="cpu")
        self.load_state_dict(checkpoint["model_state"], strict=False)
        self.to(self.device)

    def forward(self, mels):
        _, (hidden, _) = self.lstm(mels)
        embeds_raw = self.relu(self.linear(hidden[-1]))
        return embeds_raw / torch.norm(embeds_raw, dim=1, keepdim=True)

    def embed_utterance(self, wav, rate=1.3, min_coverage=0.75):
        # Preprocess: normalize volume, then remove long stretches of silence.
        if len(wav) == 0:
            return np.zeros(SE_MODEL_EMBEDDING_SIZE)
        wav = se_normalize_volume(wav, SE_AUDIO_NORM_TARGET_DBFS, increase_only=True)
        wav = se_trim_long_silences(wav)
        if len(wav) == 0:
            return np.zeros(SE_MODEL_EMBEDDING_SIZE)
        # Compute partial slices: overlapping 160-frame windows, ~`rate` windows per second.
        samples_per_frame = int((SE_SAMPLING_RATE * SE_MEL_WINDOW_STEP / 1000))
        n_frames = int(np.ceil((len(wav) + 1) / samples_per_frame))
        frame_step = int(np.round((SE_SAMPLING_RATE / rate) / samples_per_frame))
        wav_slices, mel_slices = [], []
        for i in range(0, max(1, n_frames - SE_PARTIALS_N_FRAMES + frame_step + 1), frame_step):
            mel_range = np.array([i, i + SE_PARTIALS_N_FRAMES])
            wav_range = mel_range * samples_per_frame
            mel_slices.append(slice(*mel_range))
            wav_slices.append(slice(*wav_range))
        if len(wav_slices) == 0:
            mel_slices = [slice(0, SE_PARTIALS_N_FRAMES)]
            wav_slices = [slice(0, SE_PARTIALS_N_FRAMES * samples_per_frame)]
        max_wave_length = wav_slices[-1].stop
        if max_wave_length >= len(wav):
            wav = np.pad(wav, (0, max_wave_length - len(wav) + 1), "constant")
        mel = se_wav_to_mel(wav)
        mels = np.array([mel[s] if s.stop <= len(mel) else np.pad(mel, ((0, s.stop - len(mel)), (0, 0)))[s] for s in mel_slices])
        # Embed each partial, then average and re-normalize to unit length.
        with torch.no_grad():
            mels = torch.from_numpy(mels).to(self.device)
            partial_embeds = self(mels).cpu().numpy()
        raw_embed = np.mean(partial_embeds, axis=0)
        return raw_embed / np.linalg.norm(raw_embed, 2)
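
# Usage sketch (illustrative path; assumes a mono float waveform in [-1, 1] at 16 kHz):
#   wav, _ = librosa.load("target_voice.wav", sr=SE_SAMPLING_RATE)
#   embed = speaker_encoder.embed_utterance(wav)  # (256,) unit-norm np array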
# ============================================================
# MODEL LOADING
# ============================================================
_voicefixer_model = None
_freevc_models = {}

def load_voicefixer():
    global _voicefixer_model
    if _voicefixer_model is None:
        print("Loading VoiceFixer...")
        from voicefixer import VoiceFixer
        _voicefixer_model = VoiceFixer()
        _voicefixer_model.eval()
        print("VoiceFixer loaded!")
    return _voicefixer_model

def load_freevc():
    global _freevc_models
    if not _freevc_models:
        print("Loading FreeVC...")
        os.makedirs("checkpoints", exist_ok=True)
        os.makedirs("speaker_encoder/ckpt", exist_ok=True)
        if not os.path.exists("checkpoints/freevc-24.pth"):
            hf_hub_download(repo_id="jn-jairo/freevc", filename="freevc_24.pth",
                            local_dir="checkpoints", local_dir_use_symlinks=False)
            os.rename("checkpoints/freevc_24.pth", "checkpoints/freevc-24.pth")
        if not os.path.exists("speaker_encoder/ckpt/pretrained_bak_5805000.pt"):
            hf_hub_download(repo_id="jn-jairo/freevc", filename="speaker_encoder.pt",
                            local_dir="speaker_encoder/ckpt", local_dir_use_symlinks=False)
            os.rename("speaker_encoder/ckpt/speaker_encoder.pt", "speaker_encoder/ckpt/pretrained_bak_5805000.pt")
        hps = get_hparams()
        freevc = SynthesizerTrn(
            hps.data.filter_length // 2 + 1,
            hps.train.segment_size // hps.data.hop_length,
            **FREEVC_CONFIG["model"]
        ).to(device)
        freevc.eval()
        load_checkpoint("checkpoints/freevc-24.pth", freevc)
        smodel = SpeakerEncoder("speaker_encoder/ckpt/pretrained_bak_5805000.pt", device=device)
        cmodel = WavLMModel.from_pretrained("microsoft/wavlm-large").to(device)
        cmodel.eval()
        _freevc_models = {'freevc': freevc, 'speaker_encoder': smodel, 'content_encoder': cmodel, 'hps': hps}
        print("FreeVC loaded!")
    return _freevc_models
# ============================================================
# PROCESSING FUNCTIONS
# ============================================================
def run_voicefixer(audio_path, mode=0):
    model = load_voicefixer()
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        output_path = tmp.name
    with torch.no_grad():
        model.restore(input=audio_path, output=output_path, cuda=False, mode=mode)
    return output_path

def run_freevc(source_path, target_path):
    models = load_freevc()
    hps = models['hps']
    with torch.no_grad():
        wav_tgt, _ = librosa.load(target_path, sr=hps.data.sampling_rate)
        wav_tgt, _ = librosa.effects.trim(wav_tgt, top_db=20)
        g_tgt = models['speaker_encoder'].embed_utterance(wav_tgt)
        g_tgt = torch.from_numpy(g_tgt).unsqueeze(0).to(device)
        wav_src, _ = librosa.load(source_path, sr=hps.data.sampling_rate)
        wav_src = torch.from_numpy(wav_src).unsqueeze(0).to(device)
        c = models['content_encoder'](wav_src).last_hidden_state.transpose(1, 2).to(device)
        audio = models['freevc'].infer(c, g=g_tgt)
        audio = audio[0][0].data.cpu().float().numpy()
    # NamedTemporaryFile instead of the deprecated, race-prone tempfile.mktemp.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        output_path = tmp.name
    wav_write(output_path, 24000, audio)
    return output_path
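
# Pipeline summary: the target clip contributes only a 256-d speaker
# embedding, and the source clip only WavLM content features; the synthesizer
# recombines them and, per the upsample arithmetic noted near the config,
# emits 24 kHz audio even though analysis runs at 16 kHz.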
def process_audio(source_audio, target_audio, mode, voicefixer_mode="General", progress=gr.Progress()):
    """
    Process audio with VoiceFixer and/or FreeVC.

    Args:
        source_audio: Path to the source audio (the speech to process).
        target_audio: Path to the target audio (the voice to mimic).
        mode: "VoiceFixer + FreeVC", "VoiceFixer only", or "FreeVC only".
        voicefixer_mode: "General", "Speech 44.1kHz", or "Speech 48kHz".

    Returns:
        Tuple of (output_path, status_message).
    """
    if source_audio is None:
        return None, "Please upload source audio"
    if mode in ["VoiceFixer + FreeVC", "FreeVC only"] and target_audio is None:
        return None, "Please upload target voice audio"
    vf_mode_map = {"General": 0, "Speech 44.1kHz": 1, "Speech 48kHz": 2}
    vf_mode = vf_mode_map.get(voicefixer_mode, 0)
    try:
        if mode == "VoiceFixer only":
            progress(0.2, desc="Running VoiceFixer...")
            return run_voicefixer(source_audio, vf_mode), "Audio restored!"
        elif mode == "FreeVC only":
            progress(0.2, desc="Running FreeVC...")
            return run_freevc(source_audio, target_audio), "Voice converted!"
        else:
            progress(0.2, desc="Step 1/2: VoiceFixer...")
            cleaned = run_voicefixer(source_audio, vf_mode)
            progress(0.6, desc="Step 2/2: FreeVC...")
            output = run_freevc(cleaned, target_audio)
            os.unlink(cleaned)
            return output, "Cleaned and converted!"
    except Exception as e:
        return None, f"Error: {str(e)}"

# ============================================================
# GRADIO UI
# ============================================================
with gr.Blocks(title="VoiceFixer + FreeVC") as demo:
    gr.Markdown("# 🎤 VoiceFixer + FreeVC\n**Clean & Convert Voice**")
    with gr.Row():
        with gr.Column():
            mode = gr.Dropdown(["VoiceFixer + FreeVC", "VoiceFixer only", "FreeVC only"],
                               value="VoiceFixer + FreeVC", label="Mode")
            source_audio = gr.Audio(label="Source Audio", type="filepath", sources=["upload", "microphone"])
            target_audio = gr.Audio(label="Target Voice", type="filepath", sources=["upload", "microphone"])
            with gr.Accordion("Settings", open=False):
                voicefixer_mode = gr.Dropdown(["General", "Speech 44.1kHz", "Speech 48kHz"],
                                              value="General", label="VoiceFixer Mode")
            process_btn = gr.Button("Process", variant="primary", size="lg")
        with gr.Column():
            output_audio = gr.Audio(label="Output", type="filepath")
            status = gr.Textbox(label="Status")
    mode.change(lambda m: gr.update(visible=m != "VoiceFixer only"), [mode], [target_audio])
    process_btn.click(process_audio, [source_audio, target_audio, mode, voicefixer_mode],
                      [output_audio, status], api_name="process")
    gr.Examples(
        examples=[["examples/source_example.wav", "examples/target_example.mp3", "VoiceFixer + FreeVC", "General"]],
        inputs=[source_audio, target_audio, mode, voicefixer_mode],
        outputs=[output_audio, status],
        fn=process_audio,
        cache_examples=True,
        cache_mode="lazy",
        label="Examples"
    )

if __name__ == "__main__":
    demo.launch(mcp_server=True, show_error=True)