import os
import sys
import faiss
import numpy as np
import torch
import torch.nn.utils.parametrize
import torch.nn.functional as F
import torchaudio.transforms as tat
from torch import Tensor
now_dir = os.getcwd()
sys.path.append(now_dir)
from rvc.realtime.utils.torch import circular_write
from rvc.configs.config import Config
from rvc.infer.pipeline import Autotune, AudioProcessor
from rvc.lib.algorithm.synthesizers import Synthesizer
from rvc.lib.predictors.f0 import FCPE, RMVPE, SWIFT
from rvc.lib.utils import load_embedding, HubertModelWithFinalProj
class RealtimeVoiceConverter:
"""
A class for performing realtime voice conversion using the Retrieval-Based Voice Conversion (RVC) method.
"""
def __init__(self, weight_root):
"""
Initializes the RealtimeVoiceConverter with default configuration, and sets up models and parameters.
"""
self.config = Config() # Load configuration
self.tgt_sr = None # Target sampling rate for the output audio
self.net_g = None # Generator network for voice conversion
self.cpt = None # Checkpoint for loading model weights
self.version = None # Model version
self.use_f0 = None # Whether the model uses F0
        # Load the weights and set up the synthesizer network.
self.load_model(weight_root)
self.setup_network()
def load_model(self, weight_root):
"""
Loads the model weights from the specified path.
Args:
weight_root (str): Path to the model weights.
"""
        # cpt stays None when the checkpoint file is missing; setup_network
        # then silently skips network construction.
        self.cpt = (
            torch.load(weight_root, map_location="cpu", weights_only=True)
            if os.path.isfile(weight_root)
            else None
        )
def setup_network(self):
"""
Sets up the network configuration based on the loaded checkpoint.
"""
if self.cpt is not None:
            self.tgt_sr = self.cpt["config"][-1]  # output sample rate is the last config entry
            # Patch the speaker count from the embedding table stored in the weights.
            self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0]
            self.use_f0 = self.cpt.get("f0", 1)
            self.version = self.cpt.get("version", "v1")
            self.text_enc_hidden_dim = 768 if self.version == "v2" else 256
            self.vocoder = self.cpt.get("vocoder", "HiFi-GAN")
self.net_g = Synthesizer(
*self.cpt["config"],
use_f0=self.use_f0,
text_enc_hidden_dim=self.text_enc_hidden_dim,
vocoder=self.vocoder,
)
self.net_g.load_state_dict(self.cpt["weight"], strict=False)
strip_parametrizations(self.net_g)
self.net_g = self.net_g.to(self.config.device).float()
self.net_g.eval()
def inference(
self,
feats: Tensor,
p_len: Tensor,
sid: Tensor,
pitch: Tensor,
pitchf: Tensor,
    ):
        """
        Runs the synthesizer on the extracted features and returns a mono
        waveform clipped in place to [-1.0, 1.0].
        """
        output = self.net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0]
return torch.clip(output, -1.0, 1.0, out=output)
class Realtime_Pipeline:
def __init__(
self,
vc: RealtimeVoiceConverter,
hubert_model: HubertModelWithFinalProj = None,
index=None,
big_npy=None,
f0_method: str = "rmvpe",
sid: int = 0,
):
self.vc = vc
self.hubert_model = hubert_model
self.index = index
self.big_npy = big_npy
self.use_f0 = vc.use_f0
self.version = vc.version
self.f0_method = f0_method
self.sample_rate = 16000
self.tgt_sr = vc.tgt_sr
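        # Features and F0 are extracted at a fixed 16 kHz: 160 samples is a
        # 10 ms hop, and model_window is the matching hop at the target rate.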
self.window = 160
self.model_window = self.tgt_sr // 100
self.f0_min = 50.0
self.f0_max = 1100.0
        # Mel-scale bounds used when quantizing F0 into coarse pitch buckets.
        self.f0_mel_min = 1127.0 * np.log(1.0 + self.f0_min / 700.0)
        self.f0_mel_max = 1127.0 * np.log(1.0 + self.f0_max / 700.0)
self.device = vc.config.device
self.sid = torch.tensor([sid], device=self.device, dtype=torch.int64)
self.autotune = Autotune()
self.resamplers = {}
self.f0_model = None
def get_f0(
self,
x: Tensor,
pitch: Tensor = None,
pitchf: Tensor = None,
f0_up_key: int = 0,
f0_autotune: bool = False,
f0_autotune_strength: float = 1.0,
proposed_pitch: bool = False,
proposed_pitch_threshold: float = 155.0,
):
"""
Estimates the fundamental frequency (F0) of a given audio signal using various methods.
"""
        if torch.is_tensor(x):
            # The F0 predictors operate on NumPy arrays, so move the signal
            # off-device first.
            x = x.cpu().numpy()
if self.f0_method == "rmvpe":
if self.f0_model is None:
self.f0_model = RMVPE(
device=self.device,
sample_rate=self.sample_rate,
hop_size=self.window,
)
f0 = self.f0_model.get_f0(x, filter_radius=0.03)
elif self.f0_method == "fcpe":
if self.f0_model is None:
self.f0_model = FCPE(
device=self.device,
sample_rate=self.sample_rate,
hop_size=self.window,
)
f0 = self.f0_model.get_f0(x, x.shape[0] // self.window, filter_radius=0.006)
elif self.f0_method == "swift":
if self.f0_model is None:
self.f0_model = SWIFT(
device=self.device,
sample_rate=self.sample_rate,
hop_size=self.window,
)
            f0 = self.f0_model.get_f0(
                x,
                self.f0_min,
                self.f0_max,
                x.shape[0] // self.window,
                confidence_threshold=0.887,
            )
        else:
            raise ValueError(f"Unknown f0_method: {self.f0_method}")
# f0 adjustments
        if f0_autotune:
            f0 = self.autotune.autotune_f0(f0, f0_autotune_strength)
        elif proposed_pitch:
limit = 12
# calculate median f0 of the audio
valid_f0 = np.where(f0 > 0)[0]
if len(valid_f0) < 2:
# no valid f0 detected
up_key = 0
else:
median_f0 = float(
np.median(np.interp(np.arange(len(f0)), valid_f0, f0[valid_f0]))
)
if median_f0 <= 0 or np.isnan(median_f0):
up_key = 0
else:
                    # calculate the proposed shift, clamped to one octave
                    up_key = int(
                        np.round(12 * np.log2(proposed_pitch_threshold / median_f0))
                    )
                    up_key = max(-limit, min(limit, up_key))
            print("calculated pitch offset:", up_key)  # may clutter terminal output in realtime use
f0 *= pow(2, (f0_up_key + up_key) / 12)
else:
f0 *= pow(2, f0_up_key / 12)
# Convert to Tensor for computational use
f0 = torch.from_numpy(f0).to(self.device).float()
        # Quantize F0 into coarse mel-scale buckets 1-255; unvoiced frames
        # (f0 = 0) clip down to bucket 1.
        f0_mel = 1127.0 * torch.log(1.0 + f0 / 700.0)
        f0_mel = torch.clip(
            (f0_mel - self.f0_mel_min) * 254 / (self.f0_mel_max - self.f0_mel_min)
            + 1,
            1,
            255,
            out=f0_mel,
        )
f0_coarse = torch.round(f0_mel, out=f0_mel).long()
if pitch is not None and pitchf is not None:
circular_write(f0_coarse, pitch)
circular_write(f0, pitchf)
else:
pitch = f0_coarse
pitchf = f0
return pitch.unsqueeze(0), pitchf.unsqueeze(0)
def voice_conversion(
self,
audio: Tensor,
pitch: Tensor = None,
pitchf: Tensor = None,
f0_up_key: int = 0,
index_rate: float = 0.5,
p_len: int = 0,
silence_front: int = 0,
skip_head: int = None,
return_length: int = None,
protect: float = 0.5,
volume_envelope: float = 1,
f0_autotune: bool = False,
f0_autotune_strength: float = 1,
proposed_pitch: bool = False,
proposed_pitch_threshold: float = 155.0,
):
"""
Performs realtime voice conversion on a given audio segment.
"""
assert audio.dim() == 1, audio.dim()
feats = audio.view(1, -1).to(self.device)
formant_length = int(np.ceil(return_length * 1.0))
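        # The 1.0 factor is a placeholder formant ratio, so formant_length
        # currently equals return_length (see the matching factor further down).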
pitch, pitchf = (
self.get_f0(
audio[silence_front:],
pitch,
pitchf,
f0_up_key,
f0_autotune,
f0_autotune_strength,
proposed_pitch,
proposed_pitch_threshold,
)
if self.use_f0
else (None, None)
)
# extract features
feats = self.hubert_model(feats)["last_hidden_state"]
feats = (
self.hubert_model.final_proj(feats[0]).unsqueeze(0)
if self.version == "v1"
else feats
)
        feats = torch.cat((feats, feats[:, -1:, :]), 1)  # pad by repeating the last frame
# make a copy for pitch guidance and protection
feats0 = feats.detach().clone() if self.use_f0 else None
        # Set by the parent function; only truthy when an index is available,
        # loaded, and the index rate is > 0.
        if self.index:
feats = self._retrieve_speaker_embeddings(
skip_head, feats, self.index, self.big_npy, index_rate
)
        # upsample features from 50 to 100 frames per second to match the F0 hop
feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)[
:, :p_len, :
]
if self.use_f0:
feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
0, 2, 1
)[:, :p_len, :]
pitch, pitchf = pitch[:, -p_len:], pitchf[:, -p_len:] * (
formant_length / return_length
)
# Pitch protection blending
if protect < 0.5:
pitchff = pitchf.detach().clone()
pitchff[pitchf > 0] = 1
pitchff[pitchf < 1] = protect
feats = feats * pitchff.unsqueeze(-1) + feats0 * (
1 - pitchff.unsqueeze(-1)
)
feats = feats.to(feats0.dtype)
        else:
            # Model without F0 guidance: no pitch tensors are passed.
            pitch, pitchf = None, None
p_len = torch.tensor([p_len], device=self.device, dtype=torch.int64)
out_audio = self.vc.inference(feats, p_len, self.sid, pitch, pitchf).float()
if volume_envelope != 1:
out_audio = AudioProcessor.change_rms(
audio, self.sample_rate, out_audio, self.tgt_sr, volume_envelope
)
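        # As with formant_length above, the 1.0 factor is a placeholder
        # formant ratio; while it stays at 1.0, scaled_window equals
        # model_window and this resampling branch is never taken.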
scaled_window = int(np.floor(1.0 * self.model_window))
if scaled_window != self.model_window:
if scaled_window not in self.resamplers:
self.resamplers[scaled_window] = tat.Resample(
orig_freq=scaled_window,
new_freq=self.model_window,
dtype=torch.float32,
).to(self.device)
out_audio = self.resamplers[scaled_window](
out_audio[: return_length * scaled_window]
)
return out_audio
def _retrieve_speaker_embeddings(
self, skip_head, feats, index, big_npy, index_rate
):
        """
        Blends the HuBERT features with their k nearest neighbors from the
        FAISS index, weighted by inverse squared distance.
        """
        skip_offset = skip_head // 2
        npy = feats[0][skip_offset:].cpu().numpy()
        # k-NN search, then inverse-square-distance weights normalized per row
        score, ix = index.search(npy, k=8)
        weight = np.square(1 / score)
        weight /= weight.sum(axis=1, keepdims=True)
        npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
feats[0][skip_offset:] = (
torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate
+ (1 - index_rate) * feats[0][skip_offset:]
)
return feats
def load_faiss_index(file_index):
    """
    Reads a FAISS index from disk and reconstructs the full matrix of stored
    vectors for feature blending. Returns (None, None) if the path is empty,
    missing, or unreadable.
    """
    if file_index and os.path.exists(file_index):
try:
index = faiss.read_index(file_index)
big_npy = index.reconstruct_n(0, index.ntotal)
except Exception as error:
print(f"An error occurred reading the FAISS index: {error}")
index = big_npy = None
else:
index = big_npy = None
return index, big_npy
def create_pipeline(
model_path: str = None,
index_path: str = None,
f0_method: str = "rmvpe",
embedder_model: str = None,
embedder_model_custom: str = None,
sid: int = 0,
):
"""
Initialize real-time voice conversion pipeline.
"""
vc = RealtimeVoiceConverter(model_path)
    # Normalize a possibly quoted or pasted index path and point "trained"
    # indexes at their "added" counterparts.
    index, big_npy = (
        load_faiss_index(index_path.strip(' \t\r\n"').replace("trained", "added"))
        if index_path
        else (None, None)
    )
hubert_model = load_embedding(embedder_model, embedder_model_custom)
hubert_model = hubert_model.to(vc.config.device).float()
hubert_model.eval()
pipeline = Realtime_Pipeline(
vc,
hubert_model,
index,
big_npy,
f0_method,
sid,
)
return pipeline
def strip_parametrizations(module: torch.nn.Module):
"""
Remove all parametrizations (e.g., weight norm) from a module and log each removal.
"""
for name, submodule in module.named_modules():
if hasattr(submodule, "parametrizations"):
for pname, plist in list(submodule.parametrizations.items()):
# print(f"Removing parametrizations from {name}.{pname}: {[p.__class__.__name__ for p in plist]}")
torch.nn.utils.parametrize.remove_parametrizations(
submodule, pname, leave_parametrized=True
)
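

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): the model/index paths and the
# "contentvec" embedder name below are placeholders, and the block sizes are
# untuned example values, not recommendations.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    pipeline = create_pipeline(
        model_path="model.pth",  # placeholder checkpoint path
        index_path="logs/model/added_model.index",  # placeholder index path
        f0_method="rmvpe",
        embedder_model="contentvec",  # assumed embedder name
    )
    # One second of dummy input at the pipeline's fixed 16 kHz input rate.
    audio = torch.zeros(pipeline.sample_rate, device=pipeline.device)
    p_len = audio.shape[0] // pipeline.window  # 100 feature frames per second
    out = pipeline.voice_conversion(
        audio,
        f0_up_key=0,
        p_len=p_len,
        skip_head=0,
        return_length=p_len,
    )
    print(out.shape, pipeline.tgt_sr)  # converted audio at the target rate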