import os
import gc
import ast
import requests
import sys
import shutil
import zipfile
import gradio as gr
import urllib.request
import gdown
import tempfile
from datetime import datetime
current_dir = os.getcwd()
dirs = [
    "voice_models",
    "vbach",
    os.path.join("vbach", "cli"),
    os.path.join("vbach", "infer"),
    os.path.join("vbach", "lib"),
    os.path.join("vbach", "lib", "algorithm"),
    os.path.join("vbach", "lib", "predictors"),
    os.path.join("vbach", "models"),
    os.path.join("vbach", "models", "predictors"),
    os.path.join("vbach", "models", "embedders"),
    os.path.join("vbach", "scripts"),
    os.path.join("vbach", "utils")
]
RMVPE_PATH = os.path.join(dirs[8], "rmvpe.pt")
FCPE_PATH = os.path.join(dirs[8], "fcpe.pt")
RVC_MODELS_DIR = dirs[0]
HUBERT_MODEL_PATH = os.path.join(
dirs[9], "hubert_base.pt"
)
CURRENT_LANG = "ru"
OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"]
TRANSLATIONS = {
"ru": {
"app_title": "VBach",
"inference": "Инференс",
"select_file": "Выберите файл",
"audio_path": "Путь к файлу",
"audio_path_info": "Здесь можно ввести путь к файлу/список путей к файлам , либо загрузить его/их выше и получить путь к нему/их список",
"audio_processing": "Режим обработки аудио",
"output_format": "Формат вывода",
"name_format": "Шаблон",
"name_format_info": """Доступные ключи для формата:
NAME - Имя входного файла
MODEL - Название модели
PITCH - Высота тона
F0_METHOD - Метод извлечения тона
DATETIME - Время и дата создания результата
Пример - NAME_MODEL_PITCH → name_your-model_12""",
"convert_single": "Конвертировать один",
"convert_batch": "Конвертировать несколько",
"model_name": "Имя модели",
"pitch_method": "Метод извлечения тона",
"pitch": "Высота тона",
"hop_length": "Длина шага",
"bitrate": "Битрейт (Кбит/сек)",
"f0_min": "Нижний лимит определения высоты тона",
"f0_max": "Верхний лимит определения высоты тона",
"advanced_settings": "Дополнительные настройки",
"filter_radius": "Радиус фильтра",
"index_rate": "Влияние индекса",
"rms": "Огибающая громкости",
"protect": "Защита согласных",
"model_manager": "Менеджер моделей",
"download_url": "Загрузить по ссылке",
"download_zip": "Загрузить ZIP архивом",
"download_files": "Загрузить файлами",
"delete_model": "Удалить модель",
"download_link": "Ссылка на загрузку модели",
"unique_name": "Дайте вашей загружаемой модели уникальное имя, отличное от других голосовых моделей.",
"download_button": "Загрузить модель",
"supported_sites": "Поддерживаемые сайты",
"output_message": "Сообщение вывода",
"zip_file": "Zip-файл",
"upload_steps": "
1. Найдите и скачайте файлы: .pth и необязательный файл .index
2. Закиньте файл(-ы) в ZIP-архив и поместите его в область загрузки
3. Дождитесь полной загрузки ZIP-архива в интерфейс
",
"pth_file": "pth-файл",
"index_file": "index-файл",
"delete_info": "Выберите модель, которую надо удалить",
"refresh_button": "Обновить список моделей",
"delete_button": "Удалить модель",
"batch_upload": "Пакетная загрузка",
"single_upload": "Одиночная загрузка",
"converted_voice": "Преобразованный вокал",
"converted_voices": "Преобразованные вокалы",
"update_button": "Обновить",
"processing": "Сейчас обрабатывается - {namefile}",
"files": "файлов",
"error_no_audio": "Не удалось найти аудиофайл(ы). Убедитесь, что файл загрузился или проверьте правильность пути к нему.",
"error_no_model": "Выберите модель голоса для преобразования голоса",
"warning_file_not_found": "Файл {file} не найден.",
"success_single": "Вокал успешно преобразован",
"success_batch": "Вокалы успешно преобразованы",
"language": "Язык",
"stereo_modes": {
"mono": "Моно",
"left/right": "Левый/Правый",
"sim/dif": "Сходство/Различия"
},
        # Progress bars
'downloading_google': "[~] Загрузка модели с Google Drive...",
'downloading_huggingface': "[~] Загрузка модели с HuggingFace...",
'downloading_pixeldrain': "[~] Загрузка модели с Pixeldrain...",
'downloading_yandex': "[~] Загрузка модели с Яндекс Диска...",
'downloading_model': "[~] Загрузка голосовой модели {dir_name}...",
'unpacking_zip': "[~] Распаковка zip-файла...",
        # Error notifications
'unsupported_source': "Неподдерживаемый источник: {url}",
'download_error': "Ошибка при скачивании: {error}",
'yandex_api_error': "Ошибка при получении ссылки с Яндекс Диска: {status}",
'pth_not_found': "Не найден файл модели .pth в распакованном zip-файле. Проверьте содержимое в {folder}.",
'model_exists': "Директория голосовой модели {dir_name} уже существует! Выберите другое имя.",
'model_load_error': "Ошибка при загрузке модели: {error}",
'model_delete_error': "Ошибка при удалении модели: {error}",
        # Operation status
'mega_unsupported': "Mega не поддерживается!",
'model_uploaded': "[+] Модель {dir_name} успешно загружена!",
'model_deleted': "[-] Модель {dir_name} успешно удалена!",
'model_not_found': "[-] Модели {dir_name} не существует",
"error_strlist_is_not_list": "Эта строка не является списком файлов",
"error_path_is_list": "Путь к файлу является списком"
},
"en": {
"app_title": "VBach",
"inference": "Inference",
"select_file": "Select File",
"audio_path": "Audio path",
"audio_path_info": "You can enter a file path or a list of file paths here, or upload the file(s) above to obtain their path(s)",
"audio_processing": "Audio Processing Mode",
"output_format": "Output Format",
"name_format": "Template",
"name_format_info": """Available format keys:
NAME - Input file name
MODEL - Model name
PITCH - Pitch
F0_METHOD - Pitch extraction method
DATETIME - Date and time the result was created
Example - NAME_MODEL_PITCH → name_your-model_12""",
"convert_single": "Convert Single",
"convert_batch": "Convert Batch",
"model_name": "Model Name",
"pitch_method": "Pitch Extraction Method",
"pitch": "Pitch",
"hop_length": "Hop Length",
"bitrate": "Bitrate (Kbit/sec)",
"f0_min": "F0 Min",
"f0_max": "F0 Max",
"advanced_settings": "Advanced Settings",
"filter_radius": "Filter Radius",
"index_rate": "Index Rate",
"rms": "RMS Envelope",
"protect": "Consonant Protection",
"model_manager": "Model Manager",
"download_url": "Download by URL",
"download_zip": "Upload ZIP Archive",
"download_files": "Upload Files",
"delete_model": "Delete Model",
"download_link": "Model Download Link",
"unique_name": "Give your model a unique name different from other voice models.",
"download_button": "Download Model",
"supported_sites": "Supported Sites",
"output_message": "Output Message",
"zip_file": "Zip File",
"upload_steps": "1. Find and download files: .pth and optional .index
2. Put file(s) in a ZIP archive and upload it
3. Wait for the ZIP archive to be fully uploaded
",
"pth_file": "PTH File",
"index_file": "Index File",
"delete_info": "Select the model to delete",
"refresh_button": "Refresh Model List",
"delete_button": "Delete Model",
"batch_upload": "Batch Upload",
"single_upload": "Single Upload",
"converted_voice": "Converted Voice",
"converted_voices": "Converted Voices",
"update_button": "Refresh",
"processing": "Processing - {namefile}",
"files": "files",
"error_no_audio": "Could not find audio file(s). Make sure the file is uploaded or check the file path.",
"error_no_model": "Select a voice model for voice conversion",
"warning_file_not_found": "File {file} not found.",
"success_single": "Voice successfully converted",
"success_batch": "Voices successfully converted",
"language": "Language",
"stereo_modes": {
"mono": "Mono",
"left/right": "Left/Right",
"sim/dif": "Similarity/Difference"
},
'downloading_google': "[~] Downloading model from Google Drive...",
'downloading_huggingface': "[~] Downloading model from HuggingFace...",
'downloading_pixeldrain': "[~] Downloading model from Pixeldrain...",
'downloading_yandex': "[~] Downloading model from Yandex Disk...",
'downloading_model': "[~] Downloading voice model {dir_name}...",
'unpacking_zip': "[~] Unpacking zip file...",
# Error messages
'unsupported_source': "Unsupported source: {url}",
'download_error': "Download error: {error}",
'yandex_api_error': "Yandex Disk API error: {status}",
'pth_not_found': "Model .pth file not found in unzipped archive. Check contents in {folder}.",
'model_exists': "Voice model directory {dir_name} already exists! Choose another name.",
'model_load_error': "Error loading model: {error}",
'model_delete_error': "Error deleting model: {error}",
# Operation status
'mega_unsupported': "Mega is not supported!",
'model_uploaded': "[+] Model {dir_name} uploaded successfully!",
'model_deleted': "[-] Model {dir_name} deleted successfully!",
'model_not_found': "[-] Model {dir_name} does not exist",
"error_strlist_is_not_list": "This string is not a file list",
"error_path_is_list": "The file path is a list"
}
}
for d in dirs:
    os.makedirs(os.path.join(current_dir, d), exist_ok=True)
for url, file in [
    ["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/rmvpe.pt", RMVPE_PATH],
    ["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/fcpe.pt", FCPE_PATH],
    ["https://huggingface.co/Politrees/RVC_resources/resolve/main/embedders/hubert_base.pt", HUBERT_MODEL_PATH],
]:
if not os.path.exists(file):
try:
r = requests.get(url, stream=True)
r.raise_for_status()
            with open(file, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
        except requests.exceptions.RequestException as e:
            print(f"Error downloading the model: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
inference = '''
import torch
import numpy as np
import librosa
from multiprocessing import cpu_count
from fairseq import checkpoint_utils
from vbach.lib.algorithm.synthesizers import Synthesizer
from .pipeline import VC
from separator.audio_writer import write_audio_file
from vbach.utils.remove_center import remove_center
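# Mixes a mono signal onto a stereo bed at the given gain and returns the result as int16.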
def overlay_mono_on_stereo(mono_audio, stereo_audio, gain=0.5):
if mono_audio is None or stereo_audio is None:
raise ValueError("Input audio arrays cannot be None")
# Ensure float32 for processing
mono_audio = mono_audio.astype(np.float32)
stereo_audio = stereo_audio.astype(np.float32)
# Convert mono to stereo if needed
if mono_audio.ndim == 1:
mono_audio = np.vstack([mono_audio, mono_audio])
elif mono_audio.shape[0] == 1:
mono_audio = np.vstack([mono_audio[0], mono_audio[0]])
if mono_audio.shape[0] != 2 or stereo_audio.shape[0] != 2:
raise ValueError("Shapes must be (2, N)")
min_len = min(mono_audio.shape[1], stereo_audio.shape[1])
if min_len == 0:
raise ValueError("Audio arrays cannot be empty")
mono_audio = mono_audio[:, :min_len]
stereo_audio = stereo_audio[:, :min_len]
result = stereo_audio + mono_audio * gain
    # Normalize only if the mix would clip
    max_amp = np.max(np.abs(result))
    if max_amp > 1.0:
        result /= max_amp
# Convert back to int16 for output (if needed)
result = (result * 32767).astype(np.int16)
return result
def load_audio(
    file_path: str,
    target_sr: int,
    stereo_mode: str
) -> tuple:
    """
    Loads an audio file with librosa, processes it and returns the signal(s).
    Parameters:
        file_path: Path to the audio file
        target_sr: Target sample rate
        stereo_mode: Processing mode ("mono", "left/right" or "sim/dif")
    Returns:
        A (mid, left, right) tuple of numpy arrays; entries not produced by
        the selected stereo_mode are None
    Raises:
        RuntimeError: On audio loading or processing errors
    """
    try:
        mid, left, right = None, None, None
        if stereo_mode == "mono":
            # Load audio with librosa
            mid_audio, sr = librosa.load(
                file_path,
                sr=None,
                mono=True
            )
            mid_audio = librosa.resample(
                mid_audio,
                orig_sr=sr,
                target_sr=target_sr
            )
            mid = mid_audio.flatten()
        elif stereo_mode in ("left/right", "sim/dif"):
            # Load audio with librosa
            stereo_audio, sr = librosa.load(
                file_path,
                sr=None,
                mono=False
            )
            if stereo_mode == "left/right":
                left_audio = stereo_audio[0]
                right_audio = stereo_audio[1]
left_audio = librosa.resample(
left_audio,
orig_sr=sr,
target_sr=target_sr
)
right_audio = librosa.resample(
right_audio,
orig_sr=sr,
target_sr=target_sr
)
left = left_audio.flatten()
right = right_audio.flatten()
elif stereo_mode == "sim/dif":
mid_left, mid_right, dif_left, dif_right = remove_center(input_array=stereo_audio, samplerate=sr)
mid_audio = (mid_left + mid_right) * 0.5
mid_audio = librosa.resample(
mid_audio,
orig_sr=sr,
target_sr=target_sr
)
dif_left = librosa.resample(
dif_left,
orig_sr=sr,
target_sr=target_sr
)
dif_right = librosa.resample(
dif_right,
orig_sr=sr,
target_sr=target_sr
)
mid = mid_audio.flatten()
                left = dif_left.flatten()
                right = dif_right.flatten()
return mid, left, right
except Exception as e:
        raise RuntimeError(f"Error loading audio '{file_path}': {str(e)}")
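# Runtime device and precision configuration for inference.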
class Config:
def __init__(self):
self.device = self.get_device()
        self.is_half = self.device == "cuda"  # fp16 is only used on CUDA devices
self.n_cpu = cpu_count()
self.gpu_name = None
self.gpu_mem = None
self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()
def get_device(self):
if torch.cuda.is_available():
return "cuda"
elif torch.backends.mps.is_available():
return "mps"
else:
return "cpu"
    def device_config(self):
        if torch.cuda.is_available():
            print("Using CUDA device")
            self._configure_gpu()
        elif torch.backends.mps.is_available():
            print("Using MPS device")
            self.device = "mps"
        else:
            print("Using CPU")
            self.device = "cpu"
            self.is_half = False  # fp16 is not reliably supported on CPU
x_pad, x_query, x_center, x_max = (
(3, 10, 60, 65) if self.is_half else (1, 6, 38, 41)
)
if self.gpu_mem is not None and self.gpu_mem <= 4:
x_pad, x_query, x_center, x_max = (1, 5, 30, 32)
return x_pad, x_query, x_center, x_max
def _configure_gpu(self):
self.gpu_name = torch.cuda.get_device_name(self.device)
low_end_gpus = ["16", "P40", "P10", "1060", "1070", "1080"]
if (
any(gpu in self.gpu_name for gpu in low_end_gpus)
and "V100" not in self.gpu_name.upper()
):
self.is_half = False
self.gpu_mem = int(
torch.cuda.get_device_properties(self.device).total_memory
/ 1024
/ 1024
/ 1024
+ 0.4
)
# Load the Hubert model
def load_hubert(device, is_half, model_path):
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
[model_path], suffix=""
)
hubert = models[0].to(device)
hubert = hubert.half() if is_half else hubert.float()
hubert.eval()
return hubert
# Build the voice converter
def get_vc(device, is_half, config, model_path):
cpt = torch.load(model_path, map_location="cpu", weights_only=False)
if "config" not in cpt or "weight" not in cpt:
        raise ValueError(
            f"Invalid format for {model_path}. "
            "Use a voice model trained with RVC v2."
        )
tgt_sr = cpt["config"][-1]
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
pitch_guidance = cpt.get("f0", 1)
version = cpt.get("version", "v1")
input_dim = 768 if version == "v2" else 256
net_g = Synthesizer(
*cpt["config"],
use_f0=pitch_guidance,
input_dim=input_dim,
is_half=is_half,
)
del net_g.enc_q
print(net_g.load_state_dict(cpt["weight"], strict=False))
net_g.eval().to(device)
net_g = net_g.half() if is_half else net_g.float()
vc = VC(tgt_sr, config)
return cpt, version, net_g, tgt_sr, vc
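# Full inference entry point: loads audio, converts each channel through the RVC pipeline and writes the output file.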
def rvc_infer(
index_path,
index_rate,
input_path,
output_path,
pitch,
f0_method,
cpt,
version,
net_g,
filter_radius,
tgt_sr,
volume_envelope,
protect,
hop_length,
vc,
hubert_model,
f0_min=50,
f0_max=1100,
format_output="wav",
output_bitrate="320k",
stereo_mode="mono"
):
mid, left, right = load_audio(input_path, 16000, stereo_mode)
pitch_guidance = cpt.get("f0", 1)
    def run_pipeline(channel_audio):
        # All stereo modes share the same conversion call; only the input channel differs
        return vc.pipeline(
            hubert_model,
            net_g,
            0,
            channel_audio,
            input_path,
            pitch,
            f0_method,
            index_path,
            index_rate,
            pitch_guidance,
            filter_radius,
            tgt_sr,
            0,
            volume_envelope,
            version,
            protect,
            hop_length,
            f0_file=None,
            f0_min=f0_min,
            f0_max=f0_max,
        )
    if stereo_mode == "mono":
        if mid is None:
            raise ValueError("Mono audio data is None")
        audio_opt = run_pipeline(mid)
    elif stereo_mode == "left/right":
        if left is None or right is None:
            raise ValueError("Left or right audio channel is None")
        left_audio_opt = run_pipeline(left)
        right_audio_opt = run_pipeline(right)
        # Ensure both channels have the same length
        min_len = min(len(left_audio_opt), len(right_audio_opt))
        if min_len == 0:
            raise ValueError("Processed audio is empty")
        audio_opt = np.stack(
            (left_audio_opt[:min_len], right_audio_opt[:min_len]), axis=0
        )
    elif stereo_mode == "sim/dif":
        if mid is None or left is None or right is None:
            raise ValueError("Mid, left or right audio channel is None")
        mid_audio_opt = run_pipeline(mid)
        left_audio_opt = run_pipeline(left)
        right_audio_opt = run_pipeline(right)
        # Ensure all channels have the same length
        min_len = min(len(mid_audio_opt), len(left_audio_opt), len(right_audio_opt))
        if min_len == 0:
            raise ValueError("Processed audio is empty")
        dif_audio_opt = np.stack(
            (left_audio_opt[:min_len], right_audio_opt[:min_len]), axis=0
        )
        audio_opt = overlay_mono_on_stereo(mid_audio_opt[:min_len], dif_audio_opt)
write_audio_file(output_path, audio_opt, tgt_sr, format_output, output_bitrate)
return output_path
'''
pipeline = '''
import os
import gc
import torch
import torch.nn.functional as F
import torchcrepe
import faiss
import librosa
import numpy as np
from scipy import signal
from vbach.lib.predictors.FCPE import FCPEF0Predictor
from vbach.lib.predictors.RMVPE import RMVPE0Predictor
PREDICTORS_DIR = os.path.join(os.getcwd(), "vbach", "models", "predictors")
RMVPE_PATH = os.path.join(PREDICTORS_DIR, "rmvpe.pt")
FCPE_PATH = os.path.join(PREDICTORS_DIR, "fcpe.pt")
# Butterworth high-pass filter
FILTER_ORDER = 5  # Filter order
CUTOFF_FREQUENCY = 48  # Cutoff frequency (Hz)
SAMPLE_RATE = 16000  # Sample rate (Hz)
bh, ah = signal.butter(N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)
# Audio processing helpers
class AudioProcessor:
@staticmethod
def change_rms(source_audio, source_rate, target_audio, target_rate, rate):
"""
Изменяет RMS (среднеквадратичное значение) аудио.
"""
rms1 = librosa.feature.rms(
y=source_audio,
frame_length=source_rate // 2 * 2,
hop_length=source_rate // 2,
)
rms2 = librosa.feature.rms(
y=target_audio,
frame_length=target_rate // 2 * 2,
hop_length=target_rate // 2,
)
rms1 = F.interpolate(
torch.from_numpy(rms1).float().unsqueeze(0),
size=target_audio.shape[0],
mode="linear",
).squeeze()
rms2 = F.interpolate(
torch.from_numpy(rms2).float().unsqueeze(0),
size=target_audio.shape[0],
mode="linear",
).squeeze()
rms2 = torch.maximum(rms2, torch.zeros_like(rms2) + 1e-6)
adjusted_audio = (
target_audio * (torch.pow(rms1, 1 - rate) * torch.pow(rms2, rate - 1)).numpy()
)
return adjusted_audio
# Voice conversion class
class VC:
def __init__(self, tgt_sr, config):
"""
Инициализация параметров для преобразования голоса.
"""
self.x_pad = config.x_pad
self.x_query = config.x_query
self.x_center = config.x_center
self.x_max = config.x_max
self.is_half = config.is_half
self.sample_rate = 16000
self.window = 160
self.t_pad = self.sample_rate * self.x_pad
self.t_pad_tgt = tgt_sr * self.x_pad
self.t_pad2 = self.t_pad * 2
self.t_query = self.sample_rate * self.x_query
self.t_center = self.sample_rate * self.x_center
self.t_max = self.sample_rate * self.x_max
self.time_step = self.window / self.sample_rate * 1000
self.device = config.device
def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"):
"""
Получает F0 с использованием модели crepe.
"""
x = x.astype(np.float32)
x /= np.quantile(np.abs(x), 0.999)
audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(0)
if audio.ndim == 2 and audio.shape[0] > 1:
audio = torch.mean(audio, dim=0, keepdim=True)
pitch = torchcrepe.predict(
audio,
self.sample_rate,
hop_length,
f0_min,
f0_max,
model,
batch_size=hop_length * 2,
device=self.device,
pad=True,
)
p_len = p_len or x.shape[0] // hop_length
source = np.array(pitch.squeeze(0).cpu().float().numpy())
source[source < 0.001] = np.nan
target = np.interp(
np.arange(0, len(source) * p_len, len(source)) / p_len,
np.arange(0, len(source)),
source,
)
f0 = np.nan_to_num(target)
return f0
def get_f0_rmvpe(self, x, f0_min=1, f0_max=40000, *args, **kwargs):
"""
Получает F0 с использованием модели rmvpe.
"""
if not hasattr(self, "model_rmvpe"):
self.model_rmvpe = RMVPE0Predictor(
                RMVPE_PATH, is_half=self.is_half, device=self.device
)
f0 = self.model_rmvpe.infer_from_audio_with_pitch(
x, thred=0.03, f0_min=f0_min, f0_max=f0_max
)
return f0
def get_f0(
self,
input_audio_path,
x,
p_len,
pitch,
f0_method,
filter_radius,
hop_length,
inp_f0=None,
f0_min=50,
f0_max=1100,
):
"""
Получает F0 с использованием выбранного метода.
"""
global input_audio_path2wav
f0_mel_min = 1127 * np.log(1 + f0_min / 700)
f0_mel_max = 1127 * np.log(1 + f0_max / 700)
if f0_method == "mangio-crepe":
f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length))
elif f0_method == "rmvpe+":
params = {
"x": x,
"p_len": p_len,
"pitch": pitch,
"f0_min": f0_min,
"f0_max": f0_max,
"time_step": self.time_step,
"filter_radius": filter_radius,
"crepe_hop_length": int(hop_length),
"model": "full",
}
f0 = self.get_f0_rmvpe(**params)
elif f0_method == "fcpe":
self.model_fcpe = FCPEF0Predictor(
                FCPE_PATH,
f0_min=int(f0_min),
f0_max=int(f0_max),
dtype=torch.float32,
device=self.device,
sample_rate=self.sample_rate,
threshold=0.03,
)
f0 = self.model_fcpe.compute_f0(x, p_len=p_len)
del self.model_fcpe
gc.collect()
f0 *= pow(2, pitch / 12)
tf0 = self.sample_rate // self.window
if inp_f0 is not None:
delta_t = np.round(
(inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
).astype("int16")
replace_f0 = np.interp(list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1])
shape = f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)].shape[0]
f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[:shape]
f0bak = f0.copy()
f0_mel = 1127 * np.log(1 + f0 / 700)
f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (
f0_mel_max - f0_mel_min
) + 1
f0_mel[f0_mel <= 1] = 1
f0_mel[f0_mel > 255] = 255
f0_coarse = np.rint(f0_mel).astype(int)
return f0_coarse, f0bak
def vc(
self,
model,
net_g,
sid,
audio0,
pitch,
pitchf,
index,
big_npy,
index_rate,
version,
protect,
):
"""
Преобразует аудио с использованием модели.
"""
feats = torch.from_numpy(audio0)
feats = feats.half() if self.is_half else feats.float()
if feats.dim() == 2:
feats = feats.mean(-1)
assert feats.dim() == 1, feats.dim()
feats = feats.view(1, -1)
padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)
inputs = {
"source": feats.to(self.device),
"padding_mask": padding_mask,
"output_layer": 9 if version == "v1" else 12,
}
with torch.no_grad():
logits = model.extract_features(**inputs)
feats = model.final_proj(logits[0]) if version == "v1" else logits[0]
if protect < 0.5 and pitch is not None and pitchf is not None:
feats0 = feats.clone()
if index is not None and big_npy is not None and index_rate != 0:
npy = feats[0].cpu().numpy()
npy = npy.astype("float32") if self.is_half else npy
score, ix = index.search(npy, k=8)
weight = np.square(1 / score)
weight /= weight.sum(axis=1, keepdims=True)
npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
npy = npy.astype("float16") if self.is_half else npy
feats = (
torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate
+ (1 - index_rate) * feats
)
feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
if protect < 0.5 and pitch is not None and pitchf is not None:
feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
0, 2, 1
)
p_len = audio0.shape[0] // self.window
if feats.shape[1] < p_len:
p_len = feats.shape[1]
if pitch is not None and pitchf is not None:
pitch = pitch[:, :p_len]
pitchf = pitchf[:, :p_len]
if protect < 0.5 and pitch is not None and pitchf is not None:
pitchff = pitchf.clone()
pitchff[pitchf > 0] = 1
pitchff[pitchf < 1] = protect
pitchff = pitchff.unsqueeze(-1)
feats = feats * pitchff + feats0 * (1 - pitchff)
feats = feats.to(feats0.dtype)
p_len = torch.tensor([p_len], device=self.device).long()
with torch.no_grad():
if pitch is not None and pitchf is not None:
audio1 = (
(net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0])
.data.cpu()
.float()
.numpy()
)
else:
audio1 = (
(net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy()
)
del feats, p_len, padding_mask
if torch.cuda.is_available():
torch.cuda.empty_cache()
return audio1
def pipeline(
self,
model,
net_g,
sid,
audio,
input_audio_path,
pitch,
f0_method,
file_index,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
resample_sr,
volume_envelope,
version,
protect,
hop_length,
f0_file,
f0_min=50,
f0_max=1100,
):
"""
Основной конвейер для преобразования аудио.
"""
if (
file_index is not None
and file_index != ""
and os.path.exists(file_index)
and index_rate != 0
):
try:
index = faiss.read_index(file_index)
big_npy = index.reconstruct_n(0, index.ntotal)
except Exception as e:
print(f"Произошла ошибка при чтении индекса FAISS: {e}")
index = big_npy = None
else:
index = big_npy = None
audio = signal.filtfilt(bh, ah, audio)
audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
opt_ts = []
if audio_pad.shape[0] > self.t_max:
audio_sum = np.zeros_like(audio)
for i in range(self.window):
audio_sum += audio_pad[i : i - self.window]
for t in range(self.t_center, audio.shape[0], self.t_center):
opt_ts.append(
t
- self.t_query
+ np.where(
np.abs(audio_sum[t - self.t_query : t + self.t_query])
== np.abs(audio_sum[t - self.t_query : t + self.t_query]).min()
)[0][0]
)
s = 0
audio_opt = []
t = None
audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
p_len = audio_pad.shape[0] // self.window
inp_f0 = None
if f0_file and hasattr(f0_file, "name"):
try:
with open(f0_file.name, "r") as f:
lines = f.read().strip("\\n").split("\\n")
inp_f0 = np.array(
[[float(i) for i in line.split(",")] for line in lines],
dtype="float32",
)
except Exception as e:
print(f"Произошла ошибка при чтении файла F0: {e}")
sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()
if pitch_guidance:
pitch, pitchf = self.get_f0(
input_audio_path,
audio_pad,
p_len,
pitch,
f0_method,
filter_radius,
hop_length,
inp_f0,
f0_min,
f0_max,
)
pitch = pitch[:p_len]
pitchf = pitchf[:p_len]
if self.device == "mps":
pitchf = pitchf.astype(np.float32)
pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long()
pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()
for t in opt_ts:
t = t // self.window * self.window
if pitch_guidance:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[s : t + self.t_pad2 + self.window],
pitch[:, s // self.window : (t + self.t_pad2) // self.window],
pitchf[:, s // self.window : (t + self.t_pad2) // self.window],
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
else:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[s : t + self.t_pad2 + self.window],
None,
None,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
s = t
if pitch_guidance:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[t:],
pitch[:, t // self.window :] if t is not None else pitch,
pitchf[:, t // self.window :] if t is not None else pitchf,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
else:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[t:],
None,
None,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
audio_opt = np.concatenate(audio_opt)
if volume_envelope != 1:
audio_opt = AudioProcessor.change_rms(
audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope
)
if resample_sr >= self.sample_rate and tgt_sr != resample_sr:
audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr)
audio_max = np.abs(audio_opt).max() / 0.99
max_int16 = 32768
if audio_max > 1:
max_int16 /= audio_max
audio_opt = (audio_opt * max_int16).astype(np.int16)
        if pitch_guidance:
            del pitch, pitchf
        del sid
if torch.cuda.is_available():
torch.cuda.empty_cache()
return audio_opt
'''
for path, text in [
    [os.path.join(current_dir, dirs[3], "infer.py"), inference],
    [os.path.join(current_dir, dirs[3], "pipeline.py"), pipeline],
]:
with open(path, 'w') as f:
f.write(text)
remove_center = '''
import numpy as np
from scipy import signal
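# Splits a stereo signal into common (center) and side components by comparing STFT magnitudes per bin.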
def remove_center(input_array, samplerate, rdf=0.99999, window_size=2048, overlap=2, window_type="blackman", stereo_mode="stereo"):
    # Validate input: expect a stereo array shaped (2, samples)
    if input_array.ndim != 2 or input_array.shape[0] != 2:
        raise ValueError("Input must be a stereo array with shape (2, samples)")
    left = input_array[0]
    right = input_array[1]
# Adjust window size if input is too short
nperseg = min(window_size, len(left))
if nperseg < 16: # Minimum reasonable window size
nperseg = 16
if len(left) < 16:
# For very short inputs, just return the original with warning
import warnings
warnings.warn(f"Input too short ({len(left)} samples), returning original audio")
return left, right, left, right
noverlap = nperseg // overlap # Ensure noverlap < nperseg
if noverlap >= nperseg:
noverlap = nperseg - 1 # Ensure at least 1 sample difference
# Compute STFT
f, t, Z_left = signal.stft(left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
f, t, Z_right = signal.stft(right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
    if stereo_mode == "mono":
        # Use the phase of the mono mid signal for the common component
        mono = 0.5 * (left + right)
        _, _, Z_mono = signal.stft(mono, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
        Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_mono))
        Z_common_right = Z_common_left
    else:
        Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_right))
        Z_common_right = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_left))
    Z_new_left = Z_left - Z_common_left * rdf
    Z_new_right = Z_right - Z_common_right * rdf
# Compute ISTFT
_, new_left = signal.istft(Z_new_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
_, new_right = signal.istft(Z_new_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
_, common_signal_left = signal.istft(Z_common_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
_, common_signal_right = signal.istft(Z_common_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
# Trim to original length
new_left = new_left[:len(left)]
new_right = new_right[:len(right)]
    common_signal_left = common_signal_left[:len(left)]
    common_signal_right = common_signal_right[:len(right)]
# Normalize
peak = np.max([np.abs(new_left).max(), np.abs(new_right).max()])
if peak > 1.0:
new_left = new_left / peak
new_right = new_right / peak
return common_signal_left, common_signal_right, new_left, new_right
'''
for path, text in [[os.path.join(current_dir, dirs[11], "remove_center.py"), remove_center]]:
with open(path, 'w') as f:
f.write(text)
lib_algorithm = {
"synthesizers" : ["synthesizers.py", '''
import torch
from torch import nn
from torch.nn.utils.weight_norm import remove_weight_norm
from typing import Optional
from .commons import slice_segments, rand_slice_segments
from .encoders import TextEncoder, PosteriorEncoder
from .generators import Generator
from .nsf import GeneratorNSF
from .residuals import ResidualCouplingBlock
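# RVC synthesizer: phone/pitch encoder (enc_p), normalizing flow and HiFi-GAN / NSF decoder.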
class Synthesizer(nn.Module):
def __init__(
self,
spec_channels,
segment_size,
inter_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
spk_embed_dim,
gin_channels,
sr,
use_f0,
input_dim=768,
**kwargs
):
super(Synthesizer, self).__init__()
self.spec_channels = spec_channels
self.inter_channels = inter_channels
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = float(p_dropout)
self.resblock = resblock
self.resblock_kernel_sizes = resblock_kernel_sizes
self.resblock_dilation_sizes = resblock_dilation_sizes
self.upsample_rates = upsample_rates
self.upsample_initial_channel = upsample_initial_channel
self.upsample_kernel_sizes = upsample_kernel_sizes
self.segment_size = segment_size
self.gin_channels = gin_channels
self.spk_embed_dim = spk_embed_dim
self.use_f0 = use_f0
self.enc_p = TextEncoder(
inter_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
float(p_dropout),
input_dim,
f0=use_f0,
)
if use_f0:
self.dec = GeneratorNSF(
inter_channels,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels,
sr=sr,
is_half=kwargs["is_half"],
)
else:
self.dec = Generator(
inter_channels,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels,
)
self.enc_q = PosteriorEncoder(
spec_channels,
inter_channels,
hidden_channels,
5,
1,
16,
gin_channels=gin_channels,
)
self.flow = ResidualCouplingBlock(
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
)
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
def remove_weight_norm(self):
self.dec.remove_weight_norm()
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.dec._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.dec)
for hook in self.flow._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.flow)
if hasattr(self, "enc_q"):
for hook in self.enc_q._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.enc_q)
return self
@torch.jit.ignore
def forward(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
pitch: Optional[torch.Tensor] = None,
pitchf: Optional[torch.Tensor] = None,
        y: Optional[torch.Tensor] = None,
        y_lengths: Optional[torch.Tensor] = None,
ds: Optional[torch.Tensor] = None,
):
g = self.emb_g(ds).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
if y is not None:
z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g)
z_p = self.flow(z, y_mask, g=g)
z_slice, ids_slice = rand_slice_segments(z, y_lengths, self.segment_size)
if self.use_f0:
pitchf = slice_segments(pitchf, ids_slice, self.segment_size, 2)
o = self.dec(z_slice, pitchf, g=g)
else:
o = self.dec(z_slice, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
else:
return None, None, x_mask, None, (None, None, m_p, logs_p, None, None)
@torch.jit.export
def infer(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
pitch: Optional[torch.Tensor] = None,
nsff0: Optional[torch.Tensor] = None,
        sid: Optional[torch.Tensor] = None,
rate: Optional[torch.Tensor] = None,
):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
if rate is not None:
assert isinstance(rate, torch.Tensor)
head = int(z_p.shape[2] * (1.0 - rate.item()))
z_p = z_p[:, :, head:]
x_mask = x_mask[:, :, head:]
if self.use_f0:
nsff0 = nsff0[:, head:]
if self.use_f0:
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, nsff0, g=g)
else:
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, g=g)
return o, x_mask, (z, z_p, m_p, logs_p)
'''],
"residuals" : ["residuals.py", '''
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from typing import Optional
from .commons import get_padding, init_weights
from .modules import WaveNet
LRELU_SLOPE = 0.1
def create_conv1d_layer(channels, kernel_size, dilation):
return weight_norm(
nn.Conv1d(
channels,
channels,
kernel_size,
1,
dilation=dilation,
padding=get_padding(kernel_size, dilation),
)
)
def apply_mask(tensor, mask):
return tensor * mask if mask is not None else tensor
class ResBlockBase(nn.Module):
def __init__(self, channels, kernel_size, dilations):
super(ResBlockBase, self).__init__()
self.convs1 = nn.ModuleList(
[create_conv1d_layer(channels, kernel_size, d) for d in dilations]
)
self.convs1.apply(init_weights)
self.convs2 = nn.ModuleList(
[create_conv1d_layer(channels, kernel_size, 1) for _ in dilations]
)
self.convs2.apply(init_weights)
def forward(self, x, x_mask=None):
for c1, c2 in zip(self.convs1, self.convs2):
xt = F.leaky_relu(x, LRELU_SLOPE)
xt = apply_mask(xt, x_mask)
xt = F.leaky_relu(c1(xt), LRELU_SLOPE)
xt = apply_mask(xt, x_mask)
xt = c2(xt)
x = xt + x
return apply_mask(x, x_mask)
def remove_weight_norm(self):
for conv in self.convs1 + self.convs2:
remove_weight_norm(conv)
class ResBlock1(ResBlockBase):
def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
super(ResBlock1, self).__init__(channels, kernel_size, dilation)
class ResBlock2(ResBlockBase):
def __init__(self, channels, kernel_size=3, dilation=(1, 3)):
super(ResBlock2, self).__init__(channels, kernel_size, dilation)
class Log(nn.Module):
def forward(self, x, x_mask, reverse=False, **kwargs):
if not reverse:
y = torch.log(torch.clamp_min(x, 1e-5)) * x_mask
logdet = torch.sum(-y, [1, 2])
return y, logdet
else:
x = torch.exp(x) * x_mask
return x
class Flip(nn.Module):
def forward(self, x, *args, reverse=False, **kwargs):
x = torch.flip(x, [1])
if not reverse:
logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device)
return x, logdet
else:
return x
class ElementwiseAffine(nn.Module):
def __init__(self, channels):
super().__init__()
self.channels = channels
self.m = nn.Parameter(torch.zeros(channels, 1))
self.logs = nn.Parameter(torch.zeros(channels, 1))
def forward(self, x, x_mask, reverse=False, **kwargs):
if not reverse:
y = self.m + torch.exp(self.logs) * x
y = y * x_mask
logdet = torch.sum(self.logs * x_mask, [1, 2])
return y, logdet
else:
x = (x - self.m) * torch.exp(-self.logs) * x_mask
return x
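# Stack of affine coupling layers interleaved with channel flips; invertible for inference.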
class ResidualCouplingBlock(nn.Module):
def __init__(
self,
channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
n_flows=4,
gin_channels=0,
):
super(ResidualCouplingBlock, self).__init__()
self.channels = channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.n_flows = n_flows
self.gin_channels = gin_channels
self.flows = nn.ModuleList()
for i in range(n_flows):
self.flows.append(
ResidualCouplingLayer(
channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=gin_channels,
mean_only=True,
)
)
self.flows.append(Flip())
def forward(
self,
x: torch.Tensor,
x_mask: torch.Tensor,
g: Optional[torch.Tensor] = None,
reverse: bool = False,
):
if not reverse:
for flow in self.flows:
x, _ = flow(x, x_mask, g=g, reverse=reverse)
else:
for flow in reversed(self.flows):
x = flow.forward(x, x_mask, g=g, reverse=reverse)
return x
def remove_weight_norm(self):
for i in range(self.n_flows):
self.flows[i * 2].remove_weight_norm()
def __prepare_scriptable__(self):
for i in range(self.n_flows):
for hook in self.flows[i * 2]._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.flows[i * 2])
return self
class ResidualCouplingLayer(nn.Module):
def __init__(
self,
channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
p_dropout=0,
gin_channels=0,
mean_only=False,
):
assert channels % 2 == 0, "channels should be divisible by 2"
super().__init__()
self.channels = channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.half_channels = channels // 2
self.mean_only = mean_only
self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)
self.enc = WaveNet(
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
p_dropout=p_dropout,
gin_channels=gin_channels,
)
self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1)
self.post.weight.data.zero_()
self.post.bias.data.zero_()
def forward(self, x, x_mask, g=None, reverse=False):
x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
h = self.pre(x0) * x_mask
h = self.enc(h, x_mask, g=g)
stats = self.post(h) * x_mask
if not self.mean_only:
m, logs = torch.split(stats, [self.half_channels] * 2, 1)
else:
m = stats
logs = torch.zeros_like(m)
if not reverse:
x1 = m + x1 * torch.exp(logs) * x_mask
x = torch.cat([x0, x1], 1)
logdet = torch.sum(logs, [1, 2])
return x, logdet
else:
x1 = (x1 - m) * torch.exp(-logs) * x_mask
x = torch.cat([x0, x1], 1)
return x
def remove_weight_norm(self):
self.enc.remove_weight_norm()
'''],
"nsf" : ["nsf.py", '''
import math
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from typing import Optional
from .commons import init_weights
from .generators import SineGen
from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2
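# Harmonic source module: builds sine harmonics from F0 and merges them into one excitation signal.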
class SourceModuleHnNSF(nn.Module):
def __init__(
self,
sample_rate,
harmonic_num=0,
sine_amp=0.1,
add_noise_std=0.003,
voiced_threshod=0,
is_half=True,
):
super(SourceModuleHnNSF, self).__init__()
self.sine_amp = sine_amp
self.noise_std = add_noise_std
self.is_half = is_half
self.l_sin_gen = SineGen(
sample_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod
)
self.l_linear = nn.Linear(harmonic_num + 1, 1)
self.l_tanh = nn.Tanh()
def forward(self, x: torch.Tensor, upsample_factor: int = 1):
sine_wavs, uv, _ = self.l_sin_gen(x, upsample_factor)
sine_wavs = sine_wavs.to(dtype=self.l_linear.weight.dtype)
sine_merge = self.l_tanh(self.l_linear(sine_wavs))
return sine_merge, None, None
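# HiFi-GAN style decoder with a sine-based source-filter excitation driven by F0.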
class GeneratorNSF(nn.Module):
def __init__(
self,
initial_channel,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels,
sr,
is_half=False,
):
super(GeneratorNSF, self).__init__()
self.num_kernels = len(resblock_kernel_sizes)
self.num_upsamples = len(upsample_rates)
self.f0_upsamp = nn.Upsample(scale_factor=math.prod(upsample_rates))
self.m_source = SourceModuleHnNSF(sample_rate=sr, harmonic_num=0, is_half=is_half)
self.conv_pre = nn.Conv1d(
initial_channel, upsample_initial_channel, 7, 1, padding=3
)
resblock_cls = ResBlock1 if resblock == "1" else ResBlock2
self.ups = nn.ModuleList()
self.noise_convs = nn.ModuleList()
channels = [
upsample_initial_channel // (2 ** (i + 1)) for i in range(len(upsample_rates))
]
stride_f0s = [
math.prod(upsample_rates[i + 1 :]) if i + 1 < len(upsample_rates) else 1
for i in range(len(upsample_rates))
]
for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
self.ups.append(
weight_norm(
nn.ConvTranspose1d(
upsample_initial_channel // (2**i),
channels[i],
k,
u,
padding=(k - u) // 2,
)
)
)
self.noise_convs.append(
nn.Conv1d(
1,
channels[i],
kernel_size=(stride_f0s[i] * 2 if stride_f0s[i] > 1 else 1),
stride=stride_f0s[i],
padding=(stride_f0s[i] // 2 if stride_f0s[i] > 1 else 0),
)
)
self.resblocks = nn.ModuleList(
[
resblock_cls(channels[i], k, d)
for i in range(len(self.ups))
for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes)
]
)
self.conv_post = nn.Conv1d(channels[-1], 1, 7, 1, padding=3, bias=False)
self.ups.apply(init_weights)
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
self.upp = math.prod(upsample_rates)
self.lrelu_slope = LRELU_SLOPE
def forward(self, x, f0, g: Optional[torch.Tensor] = None):
har_source, _, _ = self.m_source(f0, self.upp)
har_source = har_source.transpose(1, 2)
x = self.conv_pre(x)
if g is not None:
x = x + self.cond(g)
for i, (ups, noise_convs) in enumerate(zip(self.ups, self.noise_convs)):
x = F.leaky_relu(x, self.lrelu_slope)
x = ups(x)
x = x + noise_convs(har_source)
xs = sum(
[
resblock(x)
for j, resblock in enumerate(self.resblocks)
if j in range(i * self.num_kernels, (i + 1) * self.num_kernels)
]
)
x = xs / self.num_kernels
x = F.leaky_relu(x)
x = torch.tanh(self.conv_post(x))
return x
def remove_weight_norm(self):
for l in self.ups:
remove_weight_norm(l)
for l in self.resblocks:
l.remove_weight_norm()
def __prepare_scriptable__(self):
for l in self.ups:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(l)
for l in self.resblocks:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(l)
return self
'''],
"normalization" : ["normalization.py", '''
import torch
from torch import nn
from torch.nn import functional as F
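# LayerNorm over the channel dimension of (batch, channels, time) tensors.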
class LayerNorm(nn.Module):
def __init__(self, channels, eps=1e-5):
super().__init__()
self.eps = eps
self.gamma = nn.Parameter(torch.ones(channels))
self.beta = nn.Parameter(torch.zeros(channels))
def forward(self, x):
x = x.transpose(1, -1)
x = F.layer_norm(x, (x.size(-1),), self.gamma, self.beta, self.eps)
return x.transpose(1, -1)
'''],
"modules" : ["modules.py", '''
import torch
from torch import nn
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from .commons import fused_add_tanh_sigmoid_multiply
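# Non-causal WaveNet with gated activations and optional global conditioning.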
class WaveNet(nn.Module):
def __init__(
self,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=0,
p_dropout=0,
):
super(WaveNet, self).__init__()
assert kernel_size % 2 == 1
self.hidden_channels = hidden_channels
self.kernel_size = (kernel_size,)
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.gin_channels = gin_channels
self.p_dropout = p_dropout
self.in_layers = nn.ModuleList()
self.res_skip_layers = nn.ModuleList()
self.drop = nn.Dropout(p_dropout)
if gin_channels != 0:
cond_layer = nn.Conv1d(gin_channels, 2 * hidden_channels * n_layers, 1)
self.cond_layer = weight_norm(cond_layer, name="weight")
dilations = [dilation_rate**i for i in range(n_layers)]
paddings = [(kernel_size * d - d) // 2 for d in dilations]
for i in range(n_layers):
in_layer = nn.Conv1d(
hidden_channels,
2 * hidden_channels,
kernel_size,
dilation=dilations[i],
padding=paddings[i],
)
in_layer = weight_norm(in_layer, name="weight")
self.in_layers.append(in_layer)
res_skip_channels = (
hidden_channels if i == n_layers - 1 else 2 * hidden_channels
)
res_skip_layer = nn.Conv1d(hidden_channels, res_skip_channels, 1)
res_skip_layer = weight_norm(res_skip_layer, name="weight")
self.res_skip_layers.append(res_skip_layer)
def forward(self, x, x_mask, g=None, **kwargs):
output = torch.zeros_like(x)
n_channels_tensor = torch.IntTensor([self.hidden_channels])
if g is not None:
g = self.cond_layer(g)
for i in range(self.n_layers):
x_in = self.in_layers[i](x)
if g is not None:
cond_offset = i * 2 * self.hidden_channels
g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :]
else:
g_l = torch.zeros_like(x_in)
acts = fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor)
acts = self.drop(acts)
res_skip_acts = self.res_skip_layers[i](acts)
if i < self.n_layers - 1:
res_acts = res_skip_acts[:, : self.hidden_channels, :]
x = (x + res_acts) * x_mask
output = output + res_skip_acts[:, self.hidden_channels :, :]
else:
output = output + res_skip_acts
return output * x_mask
def remove_weight_norm(self):
if self.gin_channels != 0:
remove_weight_norm(self.cond_layer)
for l in self.in_layers:
remove_weight_norm(l)
for l in self.res_skip_layers:
remove_weight_norm(l)
'''],
"generators" : ["generators.py", '''
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from typing import Optional
from .commons import init_weights
from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2
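# HiFi-GAN style decoder without an F0 source, used by models trained without pitch guidance.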
class Generator(nn.Module):
def __init__(
self,
initial_channel,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=0,
):
super(Generator, self).__init__()
self.num_kernels = len(resblock_kernel_sizes)
self.num_upsamples = len(upsample_rates)
self.conv_pre = nn.Conv1d(
initial_channel, upsample_initial_channel, 7, 1, padding=3
)
resblock = ResBlock1 if resblock == "1" else ResBlock2
self.ups_and_resblocks = nn.ModuleList()
for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
self.ups_and_resblocks.append(
weight_norm(
nn.ConvTranspose1d(
upsample_initial_channel // (2**i),
upsample_initial_channel // (2 ** (i + 1)),
k,
u,
padding=(k - u) // 2,
)
)
)
ch = upsample_initial_channel // (2 ** (i + 1))
for j, (k, d) in enumerate(
zip(resblock_kernel_sizes, resblock_dilation_sizes)
):
self.ups_and_resblocks.append(resblock(ch, k, d))
self.conv_post = nn.Conv1d(ch, 1, 7, 1, padding=3, bias=False)
self.ups_and_resblocks.apply(init_weights)
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
def forward(self, x: torch.Tensor, g: Optional[torch.Tensor] = None):
x = self.conv_pre(x)
if g is not None:
x = x + self.cond(g)
resblock_idx = 0
for _ in range(self.num_upsamples):
x = F.leaky_relu(x, LRELU_SLOPE)
x = self.ups_and_resblocks[resblock_idx](x)
resblock_idx += 1
xs = 0
for _ in range(self.num_kernels):
xs += self.ups_and_resblocks[resblock_idx](x)
resblock_idx += 1
x = xs / self.num_kernels
x = F.leaky_relu(x)
x = self.conv_post(x)
x = torch.tanh(x)
return x
def __prepare_scriptable__(self):
for l in self.ups_and_resblocks:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(l)
return self
def remove_weight_norm(self):
for l in self.ups_and_resblocks:
remove_weight_norm(l)
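# Sine excitation generator: harmonic sine waves for voiced frames, noise for unvoiced ones.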
class SineGen(nn.Module):
def __init__(
self,
samp_rate,
harmonic_num=0,
sine_amp=0.1,
noise_std=0.003,
voiced_threshold=0,
flag_for_pulse=False,
):
super(SineGen, self).__init__()
self.sine_amp = sine_amp
self.noise_std = noise_std
self.harmonic_num = harmonic_num
self.dim = self.harmonic_num + 1
self.sample_rate = samp_rate
self.voiced_threshold = voiced_threshold
def _f02uv(self, f0):
uv = torch.ones_like(f0)
uv = uv * (f0 > self.voiced_threshold)
return uv
def forward(self, f0: torch.Tensor, upp: int):
with torch.no_grad():
f0 = f0[:, None].transpose(1, 2)
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device)
f0_buf[:, :, 0] = f0[:, :, 0]
f0_buf[:, :, 1:] = (
f0_buf[:, :, 0:1]
* torch.arange(2, self.harmonic_num + 2, device=f0.device)[None, None, :]
)
rad_values = (f0_buf / float(self.sample_rate)) % 1
rand_ini = torch.rand(f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device)
rand_ini[:, 0] = 0
rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini
tmp_over_one = torch.cumsum(rad_values, 1)
tmp_over_one *= upp
tmp_over_one = F.interpolate(
tmp_over_one.transpose(2, 1),
scale_factor=float(upp),
mode="linear",
align_corners=True,
).transpose(2, 1)
rad_values = F.interpolate(
rad_values.transpose(2, 1), scale_factor=float(upp), mode="nearest"
).transpose(2, 1)
tmp_over_one %= 1
tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0
cumsum_shift = torch.zeros_like(rad_values)
cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0
sine_waves = torch.sin(
torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * torch.pi
)
sine_waves = sine_waves * self.sine_amp
uv = self._f02uv(f0)
uv = F.interpolate(
uv.transpose(2, 1), scale_factor=float(upp), mode="nearest"
).transpose(2, 1)
noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3
noise = noise_amp * torch.randn_like(sine_waves)
sine_waves = sine_waves * uv + noise
return sine_waves, uv, noise
'''],
"encoders" : ["encoders.py", '''
import math
import torch
from torch import nn
from torch.nn.utils.weight_norm import remove_weight_norm
from typing import Optional
from .attentions import FFN, MultiHeadAttention
from .commons import sequence_mask
from .modules import WaveNet
from .normalization import LayerNorm
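# Transformer-style encoder (self-attention + FFN blocks) used inside TextEncoder.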
class Encoder(nn.Module):
def __init__(
self,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size=1,
p_dropout=0.0,
window_size=10,
**kwargs
):
super().__init__()
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.window_size = window_size
self.drop = nn.Dropout(p_dropout)
self.attn_layers = nn.ModuleList()
self.norm_layers_1 = nn.ModuleList()
self.ffn_layers = nn.ModuleList()
self.norm_layers_2 = nn.ModuleList()
for i in range(self.n_layers):
self.attn_layers.append(
MultiHeadAttention(
hidden_channels,
hidden_channels,
n_heads,
p_dropout=p_dropout,
window_size=window_size,
)
)
self.norm_layers_1.append(LayerNorm(hidden_channels))
self.ffn_layers.append(
FFN(
hidden_channels,
hidden_channels,
filter_channels,
kernel_size,
p_dropout=p_dropout,
)
)
self.norm_layers_2.append(LayerNorm(hidden_channels))
def forward(self, x, x_mask):
attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1)
x = x * x_mask
for i in range(self.n_layers):
y = self.attn_layers[i](x, x, attn_mask)
y = self.drop(y)
x = self.norm_layers_1[i](x + y)
y = self.ffn_layers[i](x, x_mask)
y = self.drop(y)
x = self.norm_layers_2[i](x + y)
x = x * x_mask
return x
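# Encodes content features (plus optional coarse pitch) into prior statistics m, logs.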
class TextEncoder(nn.Module):
def __init__(
self,
out_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
embedding_dim,
f0=True,
):
super(TextEncoder, self).__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = float(p_dropout)
self.emb_phone = nn.Linear(embedding_dim, hidden_channels)
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0:
self.emb_pitch = nn.Embedding(256, hidden_channels)
self.encoder = Encoder(
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
float(p_dropout),
)
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(
self, phone: torch.Tensor, pitch: Optional[torch.Tensor], lengths: torch.Tensor
):
if pitch is None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
x = x * math.sqrt(self.hidden_channels)
x = self.lrelu(x)
x = torch.transpose(x, 1, -1)
x_mask = torch.unsqueeze(sequence_mask(lengths, x.size(2)), 1).to(x.dtype)
x = self.encoder(x * x_mask, x_mask)
stats = self.proj(x) * x_mask
m, logs = torch.split(stats, self.out_channels, dim=1)
return m, logs, x_mask
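# WaveNet-based posterior encoder over spectrograms; training-only (enc_q is deleted for inference).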
class PosteriorEncoder(nn.Module):
def __init__(
self,
in_channels,
out_channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=0,
):
super(PosteriorEncoder, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.gin_channels = gin_channels
self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
self.enc = WaveNet(
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=gin_channels,
)
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(
self, x: torch.Tensor, x_lengths: torch.Tensor, g: Optional[torch.Tensor] = None
):
x_mask = torch.unsqueeze(sequence_mask(x_lengths, x.size(2)), 1).to(x.dtype)
x = self.pre(x) * x_mask
x = self.enc(x, x_mask, g=g)
stats = self.proj(x) * x_mask
m, logs = torch.split(stats, self.out_channels, dim=1)
z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask
return z, m, logs, x_mask
def remove_weight_norm(self):
self.enc.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.enc._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.enc)
return self
'''],
"discriminators" : ["discriminators.py", '''
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.parametrizations import spectral_norm, weight_norm
from .commons import get_padding
from .residuals import LRELU_SLOPE
PERIODS_V1 = [2, 3, 5, 7, 11, 17]
PERIODS_V2 = [2, 3, 5, 7, 11, 17, 23, 37]
IN_CHANNELS = [1, 32, 128, 512, 1024]
OUT_CHANNELS = [32, 128, 512, 1024, 1024]
class MultiPeriodDiscriminator(nn.Module):
def __init__(self, use_spectral_norm=False):
super(MultiPeriodDiscriminator, self).__init__()
self.discriminators = nn.ModuleList(
[DiscriminatorS(use_spectral_norm=use_spectral_norm)]
+ [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V1]
)
def forward(self, y, y_hat):
y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []
for d in self.discriminators:
y_d_r, fmap_r = d(y)
y_d_g, fmap_g = d(y_hat)
y_d_rs.append(y_d_r)
y_d_gs.append(y_d_g)
fmap_rs.append(fmap_r)
fmap_gs.append(fmap_g)
return y_d_rs, y_d_gs, fmap_rs, fmap_gs
class MultiPeriodDiscriminatorV2(nn.Module):
def __init__(self, use_spectral_norm=False):
super(MultiPeriodDiscriminatorV2, self).__init__()
self.discriminators = nn.ModuleList(
[DiscriminatorS(use_spectral_norm=use_spectral_norm)]
+ [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V2]
)
def forward(self, y, y_hat):
y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []
for d in self.discriminators:
y_d_r, fmap_r = d(y)
y_d_g, fmap_g = d(y_hat)
y_d_rs.append(y_d_r)
y_d_gs.append(y_d_g)
fmap_rs.append(fmap_r)
fmap_gs.append(fmap_g)
return y_d_rs, y_d_gs, fmap_rs, fmap_gs
class DiscriminatorS(nn.Module):
def __init__(self, use_spectral_norm=False):
super(DiscriminatorS, self).__init__()
norm_f = spectral_norm if use_spectral_norm else weight_norm
self.convs = nn.ModuleList(
[
norm_f(nn.Conv1d(1, 16, 15, 1, padding=7)),
norm_f(nn.Conv1d(16, 64, 41, 4, groups=4, padding=20)),
norm_f(nn.Conv1d(64, 256, 41, 4, groups=16, padding=20)),
norm_f(nn.Conv1d(256, 1024, 41, 4, groups=64, padding=20)),
norm_f(nn.Conv1d(1024, 1024, 41, 4, groups=256, padding=20)),
norm_f(nn.Conv1d(1024, 1024, 5, 1, padding=2)),
]
)
self.conv_post = norm_f(nn.Conv1d(1024, 1, 3, 1, padding=1))
self.lrelu = nn.LeakyReLU(LRELU_SLOPE)
def forward(self, x):
fmap = []
for conv in self.convs:
x = self.lrelu(conv(x))
fmap.append(x)
x = self.conv_post(x)
fmap.append(x)
x = torch.flatten(x, 1, -1)
return x, fmap
class DiscriminatorP(nn.Module):
def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False):
super(DiscriminatorP, self).__init__()
self.period = period
norm_f = spectral_norm if use_spectral_norm else weight_norm
self.convs = nn.ModuleList(
[
norm_f(
nn.Conv2d(
in_ch,
out_ch,
(kernel_size, 1),
(stride, 1),
padding=(get_padding(kernel_size, 1), 0),
)
)
for in_ch, out_ch in zip(IN_CHANNELS, OUT_CHANNELS)
]
)
self.conv_post = norm_f(nn.Conv2d(1024, 1, (3, 1), 1, padding=(1, 0)))
self.lrelu = nn.LeakyReLU(LRELU_SLOPE)
def forward(self, x):
fmap = []
b, c, t = x.shape
if t % self.period != 0:
n_pad = self.period - (t % self.period)
x = F.pad(x, (0, n_pad), "reflect")
x = x.view(b, c, -1, self.period)
for conv in self.convs:
x = self.lrelu(conv(x))
fmap.append(x)
x = self.conv_post(x)
fmap.append(x)
x = torch.flatten(x, 1, -1)
return x, fmap
'''],
"commons" : ["commons.py", '''
import math
import torch
from torch.nn import functional as F
from typing import Optional
def init_weights(m, mean=0.0, std=0.01):
classname = m.__class__.__name__
if classname.find("Conv") != -1:
m.weight.data.normal_(mean, std)
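# "Same" padding for a stride-1 conv: e.g. get_padding(5) == 2 and
# get_padding(3, dilation=2) == 2, keeping output length equal to input length.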
def get_padding(kernel_size, dilation=1):
return int((kernel_size * dilation - dilation) / 2)
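# Flattens a per-dimension pad spec into the order F.pad expects (last dim
# first): convert_pad_shape([[0, 0], [0, 0], [1, 2]]) == [1, 2, 0, 0, 0, 0].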
def convert_pad_shape(pad_shape):
l = pad_shape[::-1]
pad_shape = [item for sublist in l for item in sublist]
return pad_shape
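# Element-wise KL(P || Q) between diagonal Gaussians given means m_* and
# log-standard-deviations logs_*.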
def kl_divergence(m_p, logs_p, m_q, logs_q):
kl = (logs_q - logs_p) - 0.5
kl += 0.5 * (torch.exp(2.0 * logs_p) + ((m_p - m_q) ** 2)) * torch.exp(-2.0 * logs_q)
return kl
def slice_segments(
x: torch.Tensor, ids_str: torch.Tensor, segment_size: int = 4, dim: int = 2
):
if dim == 2:
ret = torch.zeros_like(x[:, :segment_size])
elif dim == 3:
ret = torch.zeros_like(x[:, :, :segment_size])
for i in range(x.size(0)):
idx_str = ids_str[i].item()
idx_end = idx_str + segment_size
if dim == 2:
ret[i] = x[i, idx_str:idx_end]
else:
ret[i] = x[i, :, idx_str:idx_end]
return ret
def rand_slice_segments(x, x_lengths=None, segment_size=4):
b, d, t = x.size()
if x_lengths is None:
x_lengths = t
ids_str_max = x_lengths - segment_size + 1
ids_str = (torch.rand([b]).to(device=x.device) * ids_str_max).to(dtype=torch.long)
ret = slice_segments(x, ids_str, segment_size, dim=3)
return ret, ids_str
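# Sinusoidal positional encoding: half the channels get sines and half cosines
# over geometrically spaced timescales, as in the original Transformer.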
def get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4):
position = torch.arange(length, dtype=torch.float)
num_timescales = channels // 2
log_timescale_increment = math.log(float(max_timescale) / float(min_timescale)) / (
num_timescales - 1
)
inv_timescales = min_timescale * torch.exp(
torch.arange(num_timescales, dtype=torch.float) * -log_timescale_increment
)
scaled_time = position.unsqueeze(0) * inv_timescales.unsqueeze(1)
signal = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], 0)
signal = F.pad(signal, [0, 0, 0, channels % 2])
signal = signal.view(1, channels, length)
return signal
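# Lower-triangular causal mask of shape (1, 1, length, length): position i may
# attend only to positions <= i.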
def subsequent_mask(length):
mask = torch.tril(torch.ones(length, length)).unsqueeze(0).unsqueeze(0)
return mask
@torch.jit.script
def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):
n_channels_int = n_channels[0]
in_act = input_a + input_b
t_act = torch.tanh(in_act[:, :n_channels_int, :])
s_act = torch.sigmoid(in_act[:, n_channels_int:, :])
acts = t_act * s_act
return acts
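# Boolean mask over padded frames, e.g. sequence_mask(torch.tensor([2, 3])) ==
# [[True, True, False], [True, True, True]].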
def sequence_mask(length: torch.Tensor, max_length: Optional[int] = None):
if max_length is None:
max_length = length.max()
x = torch.arange(max_length, dtype=length.dtype, device=length.device)
return x.unsqueeze(0) < length.unsqueeze(1)
def clip_grad_value(parameters, clip_value, norm_type=2):
if isinstance(parameters, torch.Tensor):
parameters = [parameters]
    parameters = list(filter(lambda p: p.grad is not None, parameters))
norm_type = float(norm_type)
if clip_value is not None:
clip_value = float(clip_value)
total_norm = 0
for p in parameters:
param_norm = p.grad.data.norm(norm_type)
total_norm += param_norm.item() ** norm_type
if clip_value is not None:
p.grad.data.clamp_(min=-clip_value, max=clip_value)
total_norm = total_norm ** (1.0 / norm_type)
return total_norm
'''],
"attentions" : ["attentions.py", '''
import math
import torch
from torch import nn
from torch.nn import functional as F
from .commons import convert_pad_shape
class MultiHeadAttention(nn.Module):
def __init__(
self,
channels,
out_channels,
n_heads,
p_dropout=0.0,
window_size=None,
heads_share=True,
block_length=None,
proximal_bias=False,
proximal_init=False,
):
super().__init__()
assert channels % n_heads == 0
self.channels = channels
self.out_channels = out_channels
self.n_heads = n_heads
self.p_dropout = p_dropout
self.window_size = window_size
self.heads_share = heads_share
self.block_length = block_length
self.proximal_bias = proximal_bias
self.proximal_init = proximal_init
self.attn = None
self.k_channels = channels // n_heads
self.conv_q = nn.Conv1d(channels, channels, 1)
self.conv_k = nn.Conv1d(channels, channels, 1)
self.conv_v = nn.Conv1d(channels, channels, 1)
self.conv_o = nn.Conv1d(channels, out_channels, 1)
self.drop = nn.Dropout(p_dropout)
if window_size is not None:
n_heads_rel = 1 if heads_share else n_heads
rel_stddev = self.k_channels**-0.5
self.emb_rel_k = nn.Parameter(
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
* rel_stddev
)
self.emb_rel_v = nn.Parameter(
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
* rel_stddev
)
nn.init.xavier_uniform_(self.conv_q.weight)
nn.init.xavier_uniform_(self.conv_k.weight)
nn.init.xavier_uniform_(self.conv_v.weight)
if proximal_init:
with torch.no_grad():
self.conv_k.weight.copy_(self.conv_q.weight)
self.conv_k.bias.copy_(self.conv_q.bias)
def forward(self, x, c, attn_mask=None):
q = self.conv_q(x)
k = self.conv_k(c)
v = self.conv_v(c)
x, self.attn = self.attention(q, k, v, mask=attn_mask)
x = self.conv_o(x)
return x
def attention(self, query, key, value, mask=None):
b, d, t_s, t_t = (*key.size(), query.size(2))
query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3)
key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1))
if self.window_size is not None:
assert t_s == t_t, "Relative attention is only available for self-attention."
key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s)
rel_logits = self._matmul_with_relative_keys(
query / math.sqrt(self.k_channels), key_relative_embeddings
)
scores_local = self._relative_position_to_absolute_position(rel_logits)
scores = scores + scores_local
if self.proximal_bias:
assert t_s == t_t, "Proximal bias is only available for self-attention."
scores = scores + self._attention_bias_proximal(t_s).to(
device=scores.device, dtype=scores.dtype
)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e4)
if self.block_length is not None:
assert t_s == t_t, "Local attention is only available for self-attention."
block_mask = (
torch.ones_like(scores)
.triu(-self.block_length)
.tril(self.block_length)
)
scores = scores.masked_fill(block_mask == 0, -1e4)
p_attn = F.softmax(scores, dim=-1)
p_attn = self.drop(p_attn)
output = torch.matmul(p_attn, value)
if self.window_size is not None:
relative_weights = self._absolute_position_to_relative_position(p_attn)
value_relative_embeddings = self._get_relative_embeddings(self.emb_rel_v, t_s)
output = output + self._matmul_with_relative_values(
relative_weights, value_relative_embeddings
)
output = output.transpose(2, 3).contiguous().view(b, d, t_t)
return output, p_attn
def _matmul_with_relative_values(self, x, y):
ret = torch.matmul(x, y.unsqueeze(0))
return ret
def _matmul_with_relative_keys(self, x, y):
ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1))
return ret
def _get_relative_embeddings(self, relative_embeddings, length):
pad_length = max(length - (self.window_size + 1), 0)
slice_start_position = max((self.window_size + 1) - length, 0)
slice_end_position = slice_start_position + 2 * length - 1
if pad_length > 0:
padded_relative_embeddings = F.pad(
relative_embeddings,
convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]),
)
else:
padded_relative_embeddings = relative_embeddings
used_relative_embeddings = padded_relative_embeddings[
:, slice_start_position:slice_end_position
]
return used_relative_embeddings
def _relative_position_to_absolute_position(self, x):
batch, heads, length, _ = x.size()
x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]]))
x_flat = x.view([batch, heads, length * 2 * length])
x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [0, length - 1]]))
x_final = x_flat.view([batch, heads, length + 1, 2 * length - 1])[
:, :, :length, length - 1 :
]
return x_final
def _absolute_position_to_relative_position(self, x):
batch, heads, length, _ = x.size()
x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, length - 1]]))
x_flat = x.view([batch, heads, length**2 + length * (length - 1)])
x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [length, 0]]))
x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:]
return x_final
def _attention_bias_proximal(self, length):
r = torch.arange(length, dtype=torch.float32)
diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1)
return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0)
class FFN(nn.Module):
def __init__(
self,
in_channels,
out_channels,
filter_channels,
kernel_size,
p_dropout=0.0,
activation=None,
causal=False,
):
super().__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.filter_channels = filter_channels
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.activation = activation
self.causal = causal
if causal:
self.padding = self._causal_padding
else:
self.padding = self._same_padding
self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size)
self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size)
self.drop = nn.Dropout(p_dropout)
def forward(self, x, x_mask):
x = self.conv_1(self.padding(x * x_mask))
if self.activation == "gelu":
x = x * torch.sigmoid(1.702 * x)
else:
x = torch.relu(x)
x = self.drop(x)
x = self.conv_2(self.padding(x * x_mask))
return x * x_mask
def _causal_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = self.kernel_size - 1
pad_r = 0
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, convert_pad_shape(padding))
return x
def _same_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = (self.kernel_size - 1) // 2
pad_r = self.kernel_size // 2
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, convert_pad_shape(padding))
return x
'''],
"init" : ["__init__.py", '''
''']
}
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["synthesizers"][0]]), 'w') as f:
f.write(lib_algorithm["synthesizers"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["residuals"][0]]), 'w') as f:
f.write(lib_algorithm["residuals"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["nsf"][0]]), 'w') as f:
f.write(lib_algorithm["nsf"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["normalization"][0]]), 'w') as f:
f.write(lib_algorithm["normalization"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["modules"][0]]), 'w') as f:
f.write(lib_algorithm["modules"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["generators"][0]]), 'w') as f:
f.write(lib_algorithm["generators"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["encoders"][0]]), 'w') as f:
f.write(lib_algorithm["encoders"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["discriminators"][0]]), 'w') as f:
f.write(lib_algorithm["discriminators"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["commons"][0]]), 'w') as f:
f.write(lib_algorithm["commons"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["attentions"][0]]), 'w') as f:
f.write(lib_algorithm["attentions"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["init"][0]]), 'w') as f:
f.write(lib_algorithm["init"][1])
RMVPE = '''
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from librosa.filters import mel
from scipy.signal import get_window
from librosa.util import pad_center, tiny, normalize
def window_sumsquare(
window,
n_frames,
hop_length=200,
win_length=800,
n_fft=800,
dtype=np.float32,
norm=None,
):
if win_length is None:
win_length = n_fft
n = n_fft + hop_length * (n_frames - 1)
x = np.zeros(n, dtype=dtype)
win_sq = get_window(window, win_length, fftbins=True)
win_sq = normalize(win_sq, norm=norm) ** 2
    win_sq = pad_center(win_sq, size=n_fft)  # librosa >= 0.10 requires the size keyword
for i in range(n_frames):
sample = i * hop_length
x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))]
return x
class STFT(nn.Module):
def __init__(
self, filter_length=1024, hop_length=512, win_length=None, window="hann"
):
super(STFT, self).__init__()
self.filter_length = filter_length
self.hop_length = hop_length
self.win_length = win_length if win_length else filter_length
self.window = window
self.pad_amount = int(self.filter_length / 2)
scale = self.filter_length / self.hop_length
fourier_basis = np.fft.fft(np.eye(self.filter_length))
cutoff = int((self.filter_length / 2 + 1))
fourier_basis = np.vstack(
[np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])]
)
forward_basis = torch.FloatTensor(fourier_basis[:, None, :])
inverse_basis = torch.FloatTensor(
np.linalg.pinv(scale * fourier_basis).T[:, None, :]
)
assert filter_length >= self.win_length
fft_window = get_window(window, self.win_length, fftbins=True)
fft_window = pad_center(fft_window, size=filter_length)
fft_window = torch.from_numpy(fft_window).float()
forward_basis *= fft_window
inverse_basis *= fft_window
self.register_buffer("forward_basis", forward_basis.float())
self.register_buffer("inverse_basis", inverse_basis.float())
def transform(self, input_data):
num_batches = input_data.shape[0]
        num_samples = input_data.shape[-1]
        self.num_samples = num_samples  # remembered so inverse() can trim its output
        input_data = input_data.view(num_batches, 1, num_samples)
input_data = F.pad(
input_data.unsqueeze(1),
(self.pad_amount, self.pad_amount, 0, 0, 0, 0),
mode="reflect",
).squeeze(1)
forward_transform = F.conv1d(
input_data, self.forward_basis, stride=self.hop_length, padding=0
)
cutoff = int((self.filter_length / 2) + 1)
real_part = forward_transform[:, :cutoff, :]
imag_part = forward_transform[:, cutoff:, :]
return torch.sqrt(real_part**2 + imag_part**2)
def inverse(self, magnitude, phase):
recombine_magnitude_phase = torch.cat(
[magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1
)
inverse_transform = F.conv_transpose1d(
recombine_magnitude_phase,
self.inverse_basis,
stride=self.hop_length,
padding=0,
)
if self.window is not None:
window_sum = window_sumsquare(
self.window,
magnitude.size(-1),
hop_length=self.hop_length,
win_length=self.win_length,
n_fft=self.filter_length,
dtype=np.float32,
)
approx_nonzero_indices = torch.from_numpy(
np.where(window_sum > tiny(window_sum))[0]
)
window_sum = torch.from_numpy(window_sum).to(inverse_transform.device)
inverse_transform[:, :, approx_nonzero_indices] /= window_sum[
approx_nonzero_indices
]
inverse_transform *= float(self.filter_length) / self.hop_length
inverse_transform = inverse_transform[..., self.pad_amount :]
inverse_transform = inverse_transform[..., : self.num_samples]
return inverse_transform.squeeze(1)
def forward(self, input_data):
self.magnitude, self.phase = self.transform(input_data)
return self.inverse(self.magnitude, self.phase)
class BiGRU(nn.Module):
def __init__(self, input_features, hidden_features, num_layers):
super(BiGRU, self).__init__()
self.gru = nn.GRU(
input_features,
hidden_features,
num_layers=num_layers,
batch_first=True,
bidirectional=True,
)
def forward(self, x):
return self.gru(x)[0]
class ConvBlockRes(nn.Module):
def __init__(self, in_channels, out_channels, momentum=0.01):
super(ConvBlockRes, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=(1, 1),
padding=(1, 1),
bias=False,
),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
nn.Conv2d(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=(1, 1),
padding=(1, 1),
bias=False,
),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
)
self.shortcut = (
nn.Conv2d(in_channels, out_channels, (1, 1))
if in_channels != out_channels
else None
)
def forward(self, x):
out = self.conv(x)
if self.shortcut is not None:
x = self.shortcut(x)
return out + x
class ResEncoderBlock(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, n_blocks=1, momentum=0.01):
super(ResEncoderBlock, self).__init__()
self.conv = nn.ModuleList(
[
ConvBlockRes(
in_channels if i == 0 else out_channels, out_channels, momentum
)
for i in range(n_blocks)
]
)
self.pool = (
nn.AvgPool2d(kernel_size=kernel_size) if kernel_size is not None else None
)
def forward(self, x):
for conv in self.conv:
x = conv(x)
pooled = self.pool(x) if self.pool is not None else x
return pooled, x
class Encoder(nn.Module):
def __init__(
self,
in_channels,
in_size,
n_encoders,
kernel_size,
n_blocks,
out_channels=16,
momentum=0.01,
):
super(Encoder, self).__init__()
self.bn = nn.BatchNorm2d(in_channels, momentum=momentum)
self.layers = nn.ModuleList()
self.latent_channels = []
for _ in range(n_encoders):
self.layers.append(
ResEncoderBlock(
in_channels, out_channels, kernel_size, n_blocks, momentum=momentum
)
)
self.latent_channels.append([out_channels, in_size])
in_channels = out_channels
out_channels *= 2
in_size //= 2
self.out_size = in_size
self.out_channel = out_channels
def forward(self, x):
concat_tensors = []
x = self.bn(x)
        for layer in self.layers:
            x, skip = layer(x)  # (pooled, pre-pool): carry the pooled map, keep the pre-pool map for decoder skips
            concat_tensors.append(skip)
return x, concat_tensors
class Intermediate(nn.Module):
def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01):
super(Intermediate, self).__init__()
self.layers = nn.ModuleList(
[
ResEncoderBlock(
in_channels if i == 0 else out_channels,
out_channels,
None,
n_blocks,
momentum,
)
for i in range(n_inters)
]
)
def forward(self, x):
for layer in self.layers:
_, x = layer(x)
return x
class ResDecoderBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01):
super(ResDecoderBlock, self).__init__()
out_padding = (0, 1) if stride == (1, 2) else (1, 1)
self.conv1 = nn.Sequential(
nn.ConvTranspose2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=stride,
padding=(1, 1),
output_padding=out_padding,
bias=False,
),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
)
self.conv2 = nn.ModuleList(
[
ConvBlockRes(
out_channels * 2 if i == 0 else out_channels, out_channels, momentum
)
for i in range(n_blocks)
]
)
def forward(self, x, concat_tensor):
x = self.conv1(x)
x = torch.cat((x, concat_tensor), dim=1)
for conv in self.conv2:
x = conv(x)
return x
class Decoder(nn.Module):
def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01):
super(Decoder, self).__init__()
self.layers = nn.ModuleList()
for _ in range(n_decoders):
out_channels = in_channels // 2
self.layers.append(
ResDecoderBlock(in_channels, out_channels, stride, n_blocks, momentum)
)
in_channels = out_channels
def forward(self, x, concat_tensors):
for layer, concat_tensor in zip(self.layers, reversed(concat_tensors)):
x = layer(x, concat_tensor)
return x
class DeepUnet(nn.Module):
def __init__(
self,
kernel_size,
n_blocks,
en_de_layers=5,
inter_layers=4,
in_channels=1,
en_out_channels=16,
):
super(DeepUnet, self).__init__()
self.encoder = Encoder(
in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels
)
self.intermediate = Intermediate(
self.encoder.out_channel // 2,
self.encoder.out_channel,
inter_layers,
n_blocks,
)
self.decoder = Decoder(
self.encoder.out_channel, en_de_layers, kernel_size, n_blocks
)
def forward(self, x):
x, concat_tensors = self.encoder(x)
x = self.intermediate(x)
return self.decoder(x, concat_tensors)
class E2E(nn.Module):
def __init__(
self,
n_blocks,
n_gru,
kernel_size,
en_de_layers=5,
inter_layers=4,
in_channels=1,
en_out_channels=16,
):
super(E2E, self).__init__()
self.unet = DeepUnet(
kernel_size,
n_blocks,
en_de_layers,
inter_layers,
in_channels,
en_out_channels,
)
self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1))
if n_gru:
self.fc = nn.Sequential(
BiGRU(3 * 128, 256, n_gru),
nn.Linear(512, 360),
nn.Dropout(0.25),
nn.Sigmoid(),
)
else:
self.fc = nn.Sequential(
                nn.Linear(3 * 128, 360), nn.Dropout(0.25), nn.Sigmoid()  # mirror the GRU branch: 128 mels -> 360 pitch classes
)
def forward(self, mel):
mel = mel.transpose(-1, -2).unsqueeze(1)
x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2)
return self.fc(x)
class MelSpectrogram(nn.Module):
def __init__(
self,
is_half,
n_mel_channels,
sample_rate,
win_length,
hop_length,
n_fft=None,
mel_fmin=0,
mel_fmax=None,
clamp=1e-5,
):
super(MelSpectrogram, self).__init__()
n_fft = win_length if n_fft is None else n_fft
self.hann_window = {}
mel_basis = mel(
sr=sample_rate,
n_fft=n_fft,
n_mels=n_mel_channels,
fmin=mel_fmin,
fmax=mel_fmax,
htk=True,
)
self.register_buffer("mel_basis", torch.from_numpy(mel_basis).float())
self.n_fft = n_fft
self.hop_length = hop_length
self.win_length = win_length
self.sample_rate = sample_rate
self.n_mel_channels = n_mel_channels
self.clamp = clamp
self.is_half = is_half
def forward(self, audio, keyshift=0, speed=1, center=True):
factor = 2 ** (keyshift / 12)
n_fft_new = int(np.round(self.n_fft * factor))
win_length_new = int(np.round(self.win_length * factor))
hop_length_new = int(np.round(self.hop_length * speed))
keyshift_key = f"{keyshift}_{audio.device}"
if keyshift_key not in self.hann_window:
self.hann_window[keyshift_key] = torch.hann_window(win_length_new).to(
audio.device
)
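        # NOTE: the STFT instance is cached on first use, so filters built for the
        # first keyshift/speed are reused on later calls; RMVPE0Predictor below
        # always calls with keyshift=0 and speed=1, where this is safe.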
if not hasattr(self, "stft"):
self.stft = STFT(
filter_length=n_fft_new,
hop_length=hop_length_new,
win_length=win_length_new,
window="hann",
).to(audio.device)
magnitude = self.stft.transform(audio)
if keyshift != 0:
size = self.n_fft // 2 + 1
resize = magnitude.size(1)
if resize < size:
magnitude = F.pad(magnitude, (0, 0, 0, size - resize))
magnitude = magnitude[:, :size, :] * self.win_length / win_length_new
mel_output = torch.matmul(self.mel_basis, magnitude)
if self.is_half:
mel_output = mel_output.half()
return torch.log(torch.clamp(mel_output, min=self.clamp))
class RMVPE0Predictor:
def __init__(self, model_path, is_half, device=None):
self.resample_kernel = {}
self.is_half = is_half
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.mel_extractor = MelSpectrogram(
is_half, 128, 16000, 1024, 160, None, 30, 8000
).to(device)
model = E2E(4, 1, (2, 2))
ckpt = torch.load(model_path, map_location="cpu", weights_only=True)
model.load_state_dict(ckpt)
model.eval()
if is_half:
model = model.half()
self.model = model.to(device)
self.cents_mapping = np.pad(20 * np.arange(360) + 1997.3794084376191, (4, 4))
def mel2hidden(self, mel):
with torch.no_grad():
n_frames = mel.shape[-1]
mel = mel.float()
padding = min(32 * ((n_frames - 1) // 32 + 1) - n_frames, n_frames)
mel = F.pad(mel, (0, padding), mode="reflect")
if self.is_half:
mel = mel.half()
hidden = self.model(mel)
return hidden[:, :n_frames]
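    # Salience bins sit 20 cents apart above a 10 Hz reference (see cents_mapping),
    # so decode() maps cents back via f0 = 10 * 2**(cents / 1200); exactly 10 Hz
    # corresponds to 0 cents, i.e. unvoiced, and is zeroed out.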
def decode(self, hidden, thred=0.03):
cents_pred = self.to_local_average_cents(hidden, thred=thred)
f0 = 10 * (2 ** (cents_pred / 1200))
f0[f0 == 10] = 0
return f0
def infer_from_audio(self, audio, thred=0.03):
audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)
mel = self.mel_extractor(audio, center=True)
hidden = self.mel2hidden(mel)
hidden = hidden.squeeze(0).cpu().numpy()
if self.is_half:
hidden = hidden.astype("float32")
return self.decode(hidden, thred=thred)
def infer_from_audio_with_pitch(self, audio, thred=0.03, f0_min=50, f0_max=1100):
audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)
mel = self.mel_extractor(audio, center=True)
hidden = self.mel2hidden(mel)
hidden = hidden.squeeze(0).cpu().numpy()
if self.is_half:
hidden = hidden.astype("float32")
f0 = self.decode(hidden, thred=thred)
f0[(f0 < f0_min) | (f0 > f0_max)] = 0
return f0
def to_local_average_cents(self, salience, thred=0.05):
center = np.argmax(salience, axis=1)
salience = np.pad(salience, ((0, 0), (4, 4)))
center += 4
todo_salience = []
todo_cents_mapping = []
starts = center - 4
ends = center + 5
for idx in range(salience.shape[0]):
todo_salience.append(salience[:, starts[idx] : ends[idx]][idx])
todo_cents_mapping.append(self.cents_mapping[starts[idx] : ends[idx]])
todo_salience = np.array(todo_salience)
todo_cents_mapping = np.array(todo_cents_mapping)
product_sum = np.sum(todo_salience * todo_cents_mapping, 1)
weight_sum = np.sum(todo_salience, 1)
divided = product_sum / weight_sum
maxx = np.max(salience, axis=1)
divided[maxx <= thred] = 0
return divided
'''
with open(os.sep.join([current_dir, dirs[6], "RMVPE.py"]), 'w') as f:
f.write(RMVPE)
FCPE = '''
from typing import Union
import torch.nn.functional as F
import numpy as np
import torch
import torch.nn as nn
from torch.nn.utils.parametrizations import weight_norm
from torchaudio.transforms import Resample
import os
import librosa
import soundfile as sf
import torch.utils.data
from librosa.filters import mel as librosa_mel_fn
import math
from functools import partial
from einops import rearrange, repeat
from local_attention import LocalAttention
os.environ["LRU_CACHE_CAPACITY"] = "3"
def load_wav_to_torch(full_path, target_sr=None, return_empty_on_exception=False):
try:
data, sample_rate = sf.read(full_path, always_2d=True)
except Exception as error:
print(f"An error occurred loading {full_path}: {error}")
if return_empty_on_exception:
            return [], target_sr or 48000  # sample_rate is unbound if sf.read itself failed
else:
raise
data = data[:, 0] if len(data.shape) > 1 else data
assert len(data) > 2
max_mag = (
-np.iinfo(data.dtype).min
if np.issubdtype(data.dtype, np.integer)
else max(np.amax(data), -np.amin(data))
)
max_mag = (
(2**31) + 1 if max_mag > (2**15) else ((2**15) + 1 if max_mag > 1.01 else 1.0)
)
data = torch.FloatTensor(data.astype(np.float32)) / max_mag
if (torch.isinf(data) | torch.isnan(data)).any() and return_empty_on_exception:
return [], sample_rate or target_sr or 48000
if target_sr is not None and sample_rate != target_sr:
data = torch.from_numpy(
librosa.core.resample(data.numpy(), orig_sr=sample_rate, target_sr=target_sr)
)
sample_rate = target_sr
return data, sample_rate
def dynamic_range_compression(x, C=1, clip_val=1e-5):
return np.log(np.clip(x, a_min=clip_val, a_max=None) * C)
def dynamic_range_decompression(x, C=1):
return np.exp(x) / C
def dynamic_range_compression_torch(x, C=1, clip_val=1e-5):
return torch.log(torch.clamp(x, min=clip_val) * C)
def dynamic_range_decompression_torch(x, C=1):
return torch.exp(x) / C
class STFT:
def __init__(
self,
sr=22050,
n_mels=80,
n_fft=1024,
win_size=1024,
hop_length=256,
fmin=20,
fmax=11025,
clip_val=1e-5,
):
self.target_sr = sr
self.n_mels = n_mels
self.n_fft = n_fft
self.win_size = win_size
self.hop_length = hop_length
self.fmin = fmin
self.fmax = fmax
self.clip_val = clip_val
self.mel_basis = {}
self.hann_window = {}
def get_mel(self, y, keyshift=0, speed=1, center=False, train=False):
sample_rate = self.target_sr
n_mels = self.n_mels
n_fft = self.n_fft
win_size = self.win_size
hop_length = self.hop_length
fmin = self.fmin
fmax = self.fmax
clip_val = self.clip_val
factor = 2 ** (keyshift / 12)
n_fft_new = int(np.round(n_fft * factor))
win_size_new = int(np.round(win_size * factor))
hop_length_new = int(np.round(hop_length * speed))
mel_basis = self.mel_basis if not train else {}
hann_window = self.hann_window if not train else {}
mel_basis_key = str(fmax) + "_" + str(y.device)
if mel_basis_key not in mel_basis:
mel = librosa_mel_fn(
sr=sample_rate, n_fft=n_fft, n_mels=n_mels, fmin=fmin, fmax=fmax
)
mel_basis[mel_basis_key] = torch.from_numpy(mel).float().to(y.device)
keyshift_key = str(keyshift) + "_" + str(y.device)
if keyshift_key not in hann_window:
hann_window[keyshift_key] = torch.hann_window(win_size_new).to(y.device)
pad_left = (win_size_new - hop_length_new) // 2
pad_right = max(
(win_size_new - hop_length_new + 1) // 2,
win_size_new - y.size(-1) - pad_left,
)
mode = "reflect" if pad_right < y.size(-1) else "constant"
y = torch.nn.functional.pad(y.unsqueeze(1), (pad_left, pad_right), mode=mode)
y = y.squeeze(1)
spec = torch.stft(
y,
n_fft_new,
hop_length=hop_length_new,
win_length=win_size_new,
window=hann_window[keyshift_key],
center=center,
pad_mode="reflect",
normalized=False,
onesided=True,
return_complex=True,
)
spec = torch.sqrt(spec.real.pow(2) + spec.imag.pow(2) + (1e-9))
if keyshift != 0:
size = n_fft // 2 + 1
resize = spec.size(1)
spec = (
F.pad(spec, (0, 0, 0, size - resize))
if resize < size
else spec[:, :size, :]
)
spec = spec * win_size / win_size_new
spec = torch.matmul(mel_basis[mel_basis_key], spec)
spec = dynamic_range_compression_torch(spec, clip_val=clip_val)
return spec
def __call__(self, audiopath):
audio, sr = load_wav_to_torch(audiopath, target_sr=self.target_sr)
spect = self.get_mel(audio.unsqueeze(0)).squeeze(0)
return spect
stft = STFT()
def softmax_kernel(
data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device=None
):
b, h, *_ = data.shape
data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.0
ratio = projection_matrix.shape[0] ** -0.5
projection = repeat(projection_matrix, "j d -> b h j d", b=b, h=h)
projection = projection.type_as(data)
data_dash = torch.einsum("...id,...jd->...ij", (data_normalizer * data), projection)
diag_data = data**2
diag_data = torch.sum(diag_data, dim=-1)
diag_data = (diag_data / 2.0) * (data_normalizer**2)
diag_data = diag_data.unsqueeze(dim=-1)
if is_query:
data_dash = ratio * (
torch.exp(
data_dash - diag_data - torch.max(data_dash, dim=-1, keepdim=True).values
)
+ eps
)
else:
data_dash = ratio * (torch.exp(data_dash - diag_data + eps))
return data_dash.type_as(data)
def orthogonal_matrix_chunk(cols, qr_uniform_q=False, device=None):
unstructured_block = torch.randn((cols, cols), device=device)
q, r = torch.linalg.qr(unstructured_block.cpu(), mode="reduced")
q, r = map(lambda t: t.to(device), (q, r))
if qr_uniform_q:
d = torch.diag(r, 0)
q *= d.sign()
return q.t()
def exists(val):
return val is not None
def empty(tensor):
return tensor.numel() == 0
def default(val, d):
return val if exists(val) else d
def cast_tuple(val):
return (val,) if not isinstance(val, tuple) else val
class PCmer(nn.Module):
def __init__(
self,
num_layers,
num_heads,
dim_model,
dim_keys,
dim_values,
residual_dropout,
attention_dropout,
):
super().__init__()
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_values = dim_values
self.dim_keys = dim_keys
self.residual_dropout = residual_dropout
self.attention_dropout = attention_dropout
self._layers = nn.ModuleList([_EncoderLayer(self) for _ in range(num_layers)])
def forward(self, phone, mask=None):
for layer in self._layers:
phone = layer(phone, mask)
return phone
class _EncoderLayer(nn.Module):
def __init__(self, parent: PCmer):
super().__init__()
self.conformer = ConformerConvModule(parent.dim_model)
self.norm = nn.LayerNorm(parent.dim_model)
self.dropout = nn.Dropout(parent.residual_dropout)
self.attn = SelfAttention(
dim=parent.dim_model, heads=parent.num_heads, causal=False
)
def forward(self, phone, mask=None):
phone = phone + (self.attn(self.norm(phone), mask=mask))
phone = phone + (self.conformer(phone))
return phone
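# "Same" padding split into (left, right), e.g. calc_same_padding(31) == (15, 15);
# the right pad shrinks by one for even kernel sizes.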
def calc_same_padding(kernel_size):
pad = kernel_size // 2
return (pad, pad - (kernel_size + 1) % 2)
class Swish(nn.Module):
def forward(self, x):
return x * x.sigmoid()
class Transpose(nn.Module):
def __init__(self, dims):
super().__init__()
assert len(dims) == 2, "dims must be a tuple of two dimensions"
self.dims = dims
def forward(self, x):
return x.transpose(*self.dims)
class GLU(nn.Module):
def __init__(self, dim):
super().__init__()
self.dim = dim
def forward(self, x):
out, gate = x.chunk(2, dim=self.dim)
return out * gate.sigmoid()
class DepthWiseConv1d(nn.Module):
def __init__(self, chan_in, chan_out, kernel_size, padding):
super().__init__()
self.padding = padding
self.conv = nn.Conv1d(chan_in, chan_out, kernel_size, groups=chan_in)
def forward(self, x):
x = F.pad(x, self.padding)
return self.conv(x)
class ConformerConvModule(nn.Module):
def __init__(
self, dim, causal=False, expansion_factor=2, kernel_size=31, dropout=0.0
):
super().__init__()
inner_dim = dim * expansion_factor
padding = calc_same_padding(kernel_size) if not causal else (kernel_size - 1, 0)
self.net = nn.Sequential(
nn.LayerNorm(dim),
Transpose((1, 2)),
nn.Conv1d(dim, inner_dim * 2, 1),
GLU(dim=1),
DepthWiseConv1d(
inner_dim, inner_dim, kernel_size=kernel_size, padding=padding
),
Swish(),
nn.Conv1d(inner_dim, dim, 1),
Transpose((1, 2)),
nn.Dropout(dropout),
)
def forward(self, x):
return self.net(x)
def linear_attention(q, k, v):
if v is None:
out = torch.einsum("...ed,...nd->...ne", k, q)
return out
else:
k_cumsum = k.sum(dim=-2)
D_inv = 1.0 / (torch.einsum("...nd,...d->...n", q, k_cumsum.type_as(q)) + 1e-8)
context = torch.einsum("...nd,...ne->...de", k, v)
out = torch.einsum("...de,...nd,...n->...ne", context, q, D_inv)
return out
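# Performer-style orthogonal random features: rows are drawn in orthogonal
# blocks so the softmax-kernel estimate used by FastAttention has lower
# variance than i.i.d. Gaussian projections.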
def gaussian_orthogonal_random_matrix(
nb_rows, nb_columns, scaling=0, qr_uniform_q=False, device=None
):
nb_full_blocks = int(nb_rows / nb_columns)
block_list = []
for _ in range(nb_full_blocks):
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device)
block_list.append(q)
remaining_rows = nb_rows - nb_full_blocks * nb_columns
if remaining_rows > 0:
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device)
block_list.append(q[:remaining_rows])
final_matrix = torch.cat(block_list)
if scaling == 0:
multiplier = torch.randn((nb_rows, nb_columns), device=device).norm(dim=1)
elif scaling == 1:
multiplier = math.sqrt((float(nb_columns))) * torch.ones(
(nb_rows,), device=device
)
else:
raise ValueError(f"Invalid scaling {scaling}")
return torch.diag(multiplier) @ final_matrix
class FastAttention(nn.Module):
def __init__(
self,
dim_heads,
nb_features=None,
ortho_scaling=0,
causal=False,
generalized_attention=False,
kernel_fn=nn.ReLU(),
qr_uniform_q=False,
no_projection=False,
):
super().__init__()
nb_features = default(nb_features, int(dim_heads * math.log(dim_heads)))
self.dim_heads = dim_heads
self.nb_features = nb_features
self.ortho_scaling = ortho_scaling
self.create_projection = partial(
gaussian_orthogonal_random_matrix,
nb_rows=self.nb_features,
nb_columns=dim_heads,
scaling=ortho_scaling,
qr_uniform_q=qr_uniform_q,
)
projection_matrix = self.create_projection()
self.register_buffer("projection_matrix", projection_matrix)
self.generalized_attention = generalized_attention
self.kernel_fn = kernel_fn
self.no_projection = no_projection
self.causal = causal
@torch.no_grad()
def redraw_projection_matrix(self):
projections = self.create_projection()
self.projection_matrix.copy_(projections)
del projections
def forward(self, q, k, v):
device = q.device
if self.no_projection:
q = q.softmax(dim=-1)
k = torch.exp(k) if self.causal else k.softmax(dim=-2)
else:
create_kernel = partial(
softmax_kernel, projection_matrix=self.projection_matrix, device=device
)
q = create_kernel(q, is_query=True)
k = create_kernel(k, is_query=False)
        if self.causal:
            raise NotImplementedError("causal linear attention is not included in this build")
        attn_fn = linear_attention
if v is None:
out = attn_fn(q, k, None)
return out
else:
out = attn_fn(q, k, v)
return out
class SelfAttention(nn.Module):
def __init__(
self,
dim,
causal=False,
heads=8,
dim_head=64,
local_heads=0,
local_window_size=256,
nb_features=None,
feature_redraw_interval=1000,
generalized_attention=False,
kernel_fn=nn.ReLU(),
qr_uniform_q=False,
dropout=0.0,
no_projection=False,
):
super().__init__()
assert dim % heads == 0, "dimension must be divisible by number of heads"
dim_head = default(dim_head, dim // heads)
inner_dim = dim_head * heads
self.fast_attention = FastAttention(
dim_head,
nb_features,
causal=causal,
generalized_attention=generalized_attention,
kernel_fn=kernel_fn,
qr_uniform_q=qr_uniform_q,
no_projection=no_projection,
)
self.heads = heads
self.global_heads = heads - local_heads
self.local_attn = (
LocalAttention(
window_size=local_window_size,
causal=causal,
autopad=True,
dropout=dropout,
look_forward=int(not causal),
rel_pos_emb_config=(dim_head, local_heads),
)
if local_heads > 0
else None
)
self.to_q = nn.Linear(dim, inner_dim)
self.to_k = nn.Linear(dim, inner_dim)
self.to_v = nn.Linear(dim, inner_dim)
self.to_out = nn.Linear(inner_dim, dim)
self.dropout = nn.Dropout(dropout)
@torch.no_grad()
def redraw_projection_matrix(self):
self.fast_attention.redraw_projection_matrix()
def forward(
self,
x,
context=None,
mask=None,
context_mask=None,
name=None,
inference=False,
**kwargs,
):
_, _, _, h, gh = *x.shape, self.heads, self.global_heads
cross_attend = exists(context)
context = default(context, x)
context_mask = default(context_mask, mask) if not cross_attend else context_mask
q, k, v = self.to_q(x), self.to_k(context), self.to_v(context)
q, k, v = map(lambda t: rearrange(t, "b n (h d) -> b h n d", h=h), (q, k, v))
(q, lq), (k, lk), (v, lv) = map(lambda t: (t[:, :gh], t[:, gh:]), (q, k, v))
attn_outs = []
if not empty(q):
if exists(context_mask):
global_mask = context_mask[:, None, :, None]
v.masked_fill_(~global_mask, 0.0)
if cross_attend:
pass
else:
out = self.fast_attention(q, k, v)
attn_outs.append(out)
if not empty(lq):
assert (
not cross_attend
), "local attention is not compatible with cross attention"
out = self.local_attn(lq, lk, lv, input_mask=mask)
attn_outs.append(out)
out = torch.cat(attn_outs, dim=1)
out = rearrange(out, "b h n d -> b n (h d)")
out = self.to_out(out)
return self.dropout(out)
def l2_regularization(model, l2_alpha):
l2_loss = []
for module in model.modules():
if type(module) is nn.Conv2d:
l2_loss.append((module.weight**2).sum() / 2.0)
return l2_alpha * sum(l2_loss)
class FCPE(nn.Module):
def __init__(
self,
input_channel=128,
out_dims=360,
n_layers=12,
n_chans=512,
use_siren=False,
use_full=False,
loss_mse_scale=10,
loss_l2_regularization=False,
loss_l2_regularization_scale=1,
loss_grad1_mse=False,
loss_grad1_mse_scale=1,
f0_max=1975.5,
f0_min=32.70,
confidence=False,
threshold=0.05,
use_input_conv=True,
):
super().__init__()
if use_siren is True:
raise ValueError("Siren is not supported yet.")
if use_full is True:
raise ValueError("Full model is not supported yet.")
self.loss_mse_scale = loss_mse_scale if (loss_mse_scale is not None) else 10
self.loss_l2_regularization = (
loss_l2_regularization if (loss_l2_regularization is not None) else False
)
self.loss_l2_regularization_scale = (
loss_l2_regularization_scale
if (loss_l2_regularization_scale is not None)
else 1
)
self.loss_grad1_mse = loss_grad1_mse if (loss_grad1_mse is not None) else False
self.loss_grad1_mse_scale = (
loss_grad1_mse_scale if (loss_grad1_mse_scale is not None) else 1
)
self.f0_max = f0_max if (f0_max is not None) else 1975.5
self.f0_min = f0_min if (f0_min is not None) else 32.70
self.confidence = confidence if (confidence is not None) else False
self.threshold = threshold if (threshold is not None) else 0.05
self.use_input_conv = use_input_conv if (use_input_conv is not None) else True
self.cent_table_b = torch.Tensor(
np.linspace(
self.f0_to_cent(torch.Tensor([f0_min]))[0],
self.f0_to_cent(torch.Tensor([f0_max]))[0],
out_dims,
)
)
self.register_buffer("cent_table", self.cent_table_b)
_leaky = nn.LeakyReLU()
self.stack = nn.Sequential(
nn.Conv1d(input_channel, n_chans, 3, 1, 1),
nn.GroupNorm(4, n_chans),
_leaky,
nn.Conv1d(n_chans, n_chans, 3, 1, 1),
)
self.decoder = PCmer(
num_layers=n_layers,
num_heads=8,
dim_model=n_chans,
dim_keys=n_chans,
dim_values=n_chans,
residual_dropout=0.1,
attention_dropout=0.1,
)
self.norm = nn.LayerNorm(n_chans)
self.n_out = out_dims
self.dense_out = weight_norm(nn.Linear(n_chans, self.n_out))
def forward(
self, mel, infer=True, gt_f0=None, return_hz_f0=False, cdecoder="local_argmax"
):
if cdecoder == "argmax":
self.cdecoder = self.cents_decoder
elif cdecoder == "local_argmax":
self.cdecoder = self.cents_local_decoder
x = (
self.stack(mel.transpose(1, 2)).transpose(1, 2)
if self.use_input_conv
else mel
)
x = self.decoder(x)
x = self.norm(x)
x = self.dense_out(x)
x = torch.sigmoid(x)
if not infer:
gt_cent_f0 = self.f0_to_cent(gt_f0)
gt_cent_f0 = self.gaussian_blurred_cent(gt_cent_f0)
loss_all = self.loss_mse_scale * F.binary_cross_entropy(x, gt_cent_f0)
if self.loss_l2_regularization:
loss_all = loss_all + l2_regularization(
model=self, l2_alpha=self.loss_l2_regularization_scale
)
x = loss_all
if infer:
x = self.cdecoder(x)
x = self.cent_to_f0(x)
x = (1 + x / 700).log() if not return_hz_f0 else x
return x
def cents_decoder(self, y, mask=True):
B, N, _ = y.size()
ci = self.cent_table[None, None, :].expand(B, N, -1)
rtn = torch.sum(ci * y, dim=-1, keepdim=True) / torch.sum(y, dim=-1, keepdim=True)
if mask:
confident = torch.max(y, dim=-1, keepdim=True)[0]
confident_mask = torch.ones_like(confident)
confident_mask[confident <= self.threshold] = float("-INF")
rtn = rtn * confident_mask
return (rtn, confident) if self.confidence else rtn
def cents_local_decoder(self, y, mask=True):
B, N, _ = y.size()
ci = self.cent_table[None, None, :].expand(B, N, -1)
confident, max_index = torch.max(y, dim=-1, keepdim=True)
local_argmax_index = torch.arange(0, 9).to(max_index.device) + (max_index - 4)
local_argmax_index = torch.clamp(local_argmax_index, 0, self.n_out - 1)
ci_l = torch.gather(ci, -1, local_argmax_index)
y_l = torch.gather(y, -1, local_argmax_index)
rtn = torch.sum(ci_l * y_l, dim=-1, keepdim=True) / torch.sum(
y_l, dim=-1, keepdim=True
)
if mask:
confident_mask = torch.ones_like(confident)
confident_mask[confident <= self.threshold] = float("-INF")
rtn = rtn * confident_mask
return (rtn, confident) if self.confidence else rtn
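    # Cent/Hz conversions share the 10 Hz reference used to build cent_table:
    # cent = 1200 * log2(f0 / 10) and f0 = 10 * 2**(cent / 1200).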
def cent_to_f0(self, cent):
return 10.0 * 2 ** (cent / 1200.0)
def f0_to_cent(self, f0):
return 1200.0 * torch.log2(f0 / 10.0)
def gaussian_blurred_cent(self, cents):
mask = (cents > 0.1) & (cents < (1200.0 * np.log2(self.f0_max / 10.0)))
B, N, _ = cents.size()
ci = self.cent_table[None, None, :].expand(B, N, -1)
return torch.exp(-torch.square(ci - cents) / 1250) * mask.float()
class FCPEInfer:
def __init__(self, model_path, device=None, dtype=torch.float32):
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
ckpt = torch.load(model_path, map_location=torch.device(self.device))
self.args = DotDict(ckpt["config"])
self.dtype = dtype
model = FCPE(
input_channel=self.args.model.input_channel,
out_dims=self.args.model.out_dims,
n_layers=self.args.model.n_layers,
n_chans=self.args.model.n_chans,
use_siren=self.args.model.use_siren,
use_full=self.args.model.use_full,
loss_mse_scale=self.args.loss.loss_mse_scale,
loss_l2_regularization=self.args.loss.loss_l2_regularization,
loss_l2_regularization_scale=self.args.loss.loss_l2_regularization_scale,
loss_grad1_mse=self.args.loss.loss_grad1_mse,
loss_grad1_mse_scale=self.args.loss.loss_grad1_mse_scale,
f0_max=self.args.model.f0_max,
f0_min=self.args.model.f0_min,
confidence=self.args.model.confidence,
)
model.to(self.device).to(self.dtype)
model.load_state_dict(ckpt["model"])
model.eval()
self.model = model
self.wav2mel = Wav2Mel(self.args, dtype=self.dtype, device=self.device)
@torch.no_grad()
def __call__(self, audio, sr, threshold=0.05):
self.model.threshold = threshold
audio = audio[None, :]
mel = self.wav2mel(audio=audio, sample_rate=sr).to(self.dtype)
f0 = self.model(mel=mel, infer=True, return_hz_f0=True)
return f0
class Wav2Mel:
def __init__(self, args, device=None, dtype=torch.float32):
self.sample_rate = args.mel.sampling_rate
self.hop_size = args.mel.hop_size
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.dtype = dtype
self.stft = STFT(
args.mel.sampling_rate,
args.mel.num_mels,
args.mel.n_fft,
args.mel.win_size,
args.mel.hop_size,
args.mel.fmin,
args.mel.fmax,
)
self.resample_kernel = {}
def extract_nvstft(self, audio, keyshift=0, train=False):
mel = self.stft.get_mel(audio, keyshift=keyshift, train=train).transpose(1, 2)
return mel
def extract_mel(self, audio, sample_rate, keyshift=0, train=False):
audio = audio.to(self.dtype).to(self.device)
if sample_rate == self.sample_rate:
audio_res = audio
else:
key_str = str(sample_rate)
if key_str not in self.resample_kernel:
self.resample_kernel[key_str] = Resample(
sample_rate, self.sample_rate, lowpass_filter_width=128
)
self.resample_kernel[key_str] = (
self.resample_kernel[key_str].to(self.dtype).to(self.device)
)
audio_res = self.resample_kernel[key_str](audio)
mel = self.extract_nvstft(audio_res, keyshift=keyshift, train=train)
n_frames = int(audio.shape[1] // self.hop_size) + 1
mel = torch.cat((mel, mel[:, -1:, :]), 1) if n_frames > int(mel.shape[1]) else mel
mel = mel[:, :n_frames, :] if n_frames < int(mel.shape[1]) else mel
return mel
def __call__(self, audio, sample_rate, keyshift=0, train=False):
return self.extract_mel(audio, sample_rate, keyshift=keyshift, train=train)
class DotDict(dict):
def __getattr__(*args):
val = dict.get(*args)
return DotDict(val) if type(val) is dict else val
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
class F0Predictor(object):
def compute_f0(self, wav, p_len):
pass
def compute_f0_uv(self, wav, p_len):
pass
class FCPEF0Predictor(F0Predictor):
def __init__(
self,
model_path,
hop_length=512,
f0_min=50,
f0_max=1100,
dtype=torch.float32,
device=None,
sample_rate=44100,
threshold=0.05,
):
self.fcpe = FCPEInfer(model_path, device=device, dtype=dtype)
self.hop_length = hop_length
self.f0_min = f0_min
self.f0_max = f0_max
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.threshold = threshold
self.sample_rate = sample_rate
self.dtype = dtype
self.name = "fcpe"
def repeat_expand(
self,
content: Union[torch.Tensor, np.ndarray],
target_len: int,
mode: str = "nearest",
):
ndim = content.ndim
content = (
content[None, None] if ndim == 1 else content[None] if ndim == 2 else content
)
assert content.ndim == 3
is_np = isinstance(content, np.ndarray)
content = torch.from_numpy(content) if is_np else content
results = torch.nn.functional.interpolate(content, size=target_len, mode=mode)
results = results.numpy() if is_np else results
return results[0, 0] if ndim == 1 else results[0] if ndim == 2 else results
def post_process(self, x, sample_rate, f0, pad_to):
f0 = (
torch.from_numpy(f0).float().to(x.device)
if isinstance(f0, np.ndarray)
else f0
)
f0 = self.repeat_expand(f0, pad_to) if pad_to is not None else f0
vuv_vector = torch.zeros_like(f0)
vuv_vector[f0 > 0.0] = 1.0
vuv_vector[f0 <= 0.0] = 0.0
nzindex = torch.nonzero(f0).squeeze()
f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy()
time_org = self.hop_length / sample_rate * nzindex.cpu().numpy()
time_frame = np.arange(pad_to) * self.hop_length / sample_rate
vuv_vector = F.interpolate(vuv_vector[None, None, :], size=pad_to)[0][0]
if f0.shape[0] <= 0:
return np.zeros(pad_to), vuv_vector.cpu().numpy()
if f0.shape[0] == 1:
return np.ones(pad_to) * f0[0], vuv_vector.cpu().numpy()
f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])
return f0, vuv_vector.cpu().numpy()
def compute_f0(self, wav, p_len=None):
x = torch.FloatTensor(wav).to(self.dtype).to(self.device)
p_len = x.shape[0] // self.hop_length if p_len is None else p_len
f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0]
        if torch.all(f0 == 0):
            # return a single f0 array, matching the non-silent branch below
            return f0.cpu().numpy() if p_len is None else np.zeros(p_len)
        return self.post_process(x, self.sample_rate, f0, p_len)[0]
def compute_f0_uv(self, wav, p_len=None):
x = torch.FloatTensor(wav).to(self.dtype).to(self.device)
p_len = x.shape[0] // self.hop_length if p_len is None else p_len
f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0]
if torch.all(f0 == 0):
return f0.cpu().numpy() if p_len is None else np.zeros(p_len), (
f0.cpu().numpy() if p_len is None else np.zeros(p_len)
)
return self.post_process(x, self.sample_rate, f0, p_len)
'''
with open(os.sep.join([current_dir, dirs[6], "FCPE.py"]), 'w') as f:
f.write(FCPE)
VBACH_CLI = '''
import gc
import os
import datetime
import gradio as gr
import torch
import librosa
import tempfile
from datetime import datetime
import argparse
from vbach.infer.infer import Config, load_hubert, get_vc, rvc_infer
# Constants
RVC_MODELS_DIR = os.path.join(os.getcwd(), "voice_models")
HUBERT_MODEL_PATH = os.path.join(
os.getcwd(), "vbach", "models", "embedders", "hubert_base.pt"
)
OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"]
audio_extensions = {".mp3", ".wav", ".flac", ".aiff", ".m4a", ".aac", ".ogg", ".opus"}
# Core functions
def load_rvc_model(voice_model):
model_dir = os.path.join(RVC_MODELS_DIR, voice_model)
model_files = os.listdir(model_dir)
rvc_model_path = next(
(os.path.join(model_dir, f) for f in model_files if f.endswith(".pth")), None
)
rvc_index_path = next(
(os.path.join(model_dir, f) for f in model_files if f.endswith(".index")), None
)
if not rvc_model_path:
raise ValueError(
f"\033[91mМодели {voice_model} не существует. "
"Возможно, вы неправильно ввели имя.\033[0m"
)
return rvc_model_path, rvc_index_path
def voice_conversion(
voice_model,
vocals_path,
output_path,
pitch,
f0_method,
index_rate,
filter_radius,
volume_envelope,
protect,
hop_length,
f0_min,
f0_max,
format_output,
output_bitrate,
stereo_mode
):
rvc_model_path, rvc_index_path = load_rvc_model(voice_model)
config = Config()
hubert_model = load_hubert(config.device, config.is_half, HUBERT_MODEL_PATH)
cpt, version, net_g, tgt_sr, vc = get_vc(
config.device, config.is_half, config, rvc_model_path
)
output_audio = rvc_infer(
rvc_index_path,
index_rate,
vocals_path,
output_path,
pitch,
f0_method,
cpt,
version,
net_g,
filter_radius,
tgt_sr,
volume_envelope,
protect,
hop_length,
vc,
hubert_model,
f0_min,
f0_max,
format_output,
output_bitrate,
stereo_mode
)
del hubert_model, cpt, net_g, vc
gc.collect()
torch.cuda.empty_cache()
return output_audio
def cli_conversion(
    input_audios,
    template="NAME_MODEL_F0METHOD_PITCH",
    output_dir="output",
    model_name="",
    index_rate=0,
    output_format="wav",
    stereo_mode="mono",
    method_pitch="rmvpe+",
    pitch=0,
    hop_length=128,
    filter_radius=3,
    rms=0.25,
    protect=0.33,
    f0_min=50,
    f0_max=1100,
):
if not input_audios:
raise ValueError(
"Не удалось найти аудиофайл(ы). "
"Убедитесь, что файл загрузился или проверьте правильность пути к нему."
)
if not model_name:
raise ValueError("Выберите модель голоса для преобразования.")
    if not os.path.exists(input_audios):
        raise FileNotFoundError(f"Ошибка: '{input_audios}' не существует.")
os.makedirs(output_dir, exist_ok=True)
    if os.path.isfile(input_audios):
        # Check that the file is an audio file
        ext = os.path.splitext(input_audios)[1].lower()
        if ext not in audio_extensions:
            raise ValueError(f"Ошибка: '{input_audios}' не является аудиофайлом (допустимые расширения: {audio_extensions}).")
        print(f"Найден аудиофайл: {input_audios}")
        file_name = os.path.basename(input_audios)
        namefile = os.path.splitext(file_name)[0]
        time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_name = (
            template
            .replace("DATETIME", time_create_file)
            .replace("NAME", namefile)
            .replace("MODEL", model_name)
            .replace("F0METHOD", method_pitch)
            .replace("PITCH", f"{pitch}")
        )
        output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
        voice_conversion(model_name, input_audios, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, f0_min, f0_max, output_format, "320k", stereo_mode)
        print("Вокал успешно преобразован")
    elif os.path.isdir(input_audios):
        # Collect audio files from the folder
        audio_files = []
        for file in os.listdir(input_audios):
            ext = os.path.splitext(file)[1].lower()
            if ext in audio_extensions:
                audio_files.append(os.path.join(input_audios, file))
        if not audio_files:
            raise FileNotFoundError(f"Ошибка: в папке '{input_audios}' нет аудиофайлов (допустимые расширения: {audio_extensions}).")
        print(f"Найдены аудиофайлы: {audio_files}")
        output_paths = []
        for file in audio_files:
            file_name = os.path.basename(file)
            namefile = os.path.splitext(file_name)[0]
            time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_name = (
                template
                .replace("DATETIME", time_create_file)
                .replace("NAME", namefile)
                .replace("MODEL", model_name)
                .replace("F0METHOD", method_pitch)
                .replace("PITCH", f"{pitch}")
            )
            output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
            voice_conversion(model_name, file, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, f0_min, f0_max, output_format, "320k", stereo_mode)
            output_paths.append(output_path)
        print("Вокалы успешно преобразованы")
else:
raise ValueError(f"Ошибка: '{input_audios}' не является ни файлом, ни папкой.")
def setup_args():
parser = argparse.ArgumentParser(description='Vbach CLI')
    # Required arguments
parser.add_argument(
'input_audios',
type=str,
help='Путь к аудиофайлу или папке с аудиофайлами для обработки'
)
parser.add_argument(
'output_dir',
type=str,
help='Папка для сохранения результатов конвертации'
)
parser.add_argument(
'model_name',
type=str,
help='Название голосовой модели RVC для преобразования'
)
    # Optional arguments with default values
parser.add_argument(
'--template',
type=str,
default="NAME_MODEL_F0METHOD_PITCH",
help='Шаблон имени выходного файла (доступные замены: DATETIME, NAME, MODEL, F0METHOD, PITCH)'
)
parser.add_argument(
'--index_rate',
type=float,
default=0,
help='Интенсивность использования индексного файла (от 0.0 до 1.0)',
metavar='[0.0-1.0]'
)
parser.add_argument(
'--output_format',
type=str,
default="wav",
choices=OUTPUT_FORMAT,
help='Формат выходного аудиофайла'
)
parser.add_argument(
'--stereo_mode',
type=str,
default="mono",
choices=["mono", "left/right", "sim/dif"],
help='Режим каналов: моно или стерео'
)
parser.add_argument(
'--method_pitch',
type=str,
default="rmvpe+",
help='Метод извлечения pitch (тона)'
)
parser.add_argument(
'--pitch',
type=int,
default=0,
help='Корректировка тона в полутонах'
)
parser.add_argument(
'--hop_length',
type=int,
default=128,
help='Длина hop (в семплах) для обработки'
)
parser.add_argument(
'--filter_radius',
type=int,
default=3,
help='Радиус фильтра для сглаживания'
)
parser.add_argument(
'--rms',
type=float,
default=0.25,
help='Масштабирование огибающей громкости (RMS)'
)
parser.add_argument(
'--protect',
type=float,
default=0.33,
help='Защита для глухих согласных звуков'
)
parser.add_argument(
'--f0_min',
type=int,
default=50,
help='Минимальная частота pitch (F0) в Hz'
)
parser.add_argument(
'--f0_max',
type=int,
default=1100,
help='Максимальная частота pitch (F0) в Hz'
)
return parser.parse_args()
# Example usage:
if __name__ == "__main__":
args = setup_args()
cli_conversion(
input_audios=args.input_audios,
output_dir=args.output_dir,
model_name=args.model_name,
template=args.template,
index_rate=args.index_rate,
output_format=args.output_format,
stereo_mode=args.stereo_mode,
method_pitch=args.method_pitch,
pitch=args.pitch,
hop_length=args.hop_length,
filter_radius=args.filter_radius,
rms=args.rms,
protect=args.protect,
f0_min=args.f0_min,
f0_max=args.f0_max
)
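# e.g. (hypothetical paths and model name):
#   python vbach/cli/vbach.py song.wav output MyVoice --pitch 12 --output_format mp3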
'''
with open(os.sep.join([current_dir, dirs[2], "vbach.py"]), 'w') as f:
f.write(VBACH_CLI)
def set_language(lang):
global CURRENT_LANG
CURRENT_LANG = lang
def t(key, **kwargs):
translation = TRANSLATIONS[CURRENT_LANG].get(key, key)
if isinstance(translation, dict):
return translation
return translation.format(**kwargs) if kwargs else translation
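# Example: t("model_exists", dir_name="MyVoice") looks the key up in the current
# language and substitutes {dir_name}; unknown keys fall back to the key itself.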
def download_file(url, zip_name, progress):
try:
if "drive.google.com" in url:
progress(0.5, desc=t('downloading_google'))
download_from_google_drive(url, zip_name, progress)
elif "huggingface.co" in url:
progress(0.5, desc=t('downloading_huggingface'))
download_from_huggingface(url, zip_name, progress)
elif "pixeldrain.com" in url:
progress(0.5, desc=t('downloading_pixeldrain'))
download_from_pixeldrain(url, zip_name, progress)
elif "mega.nz" in url:
print(t('mega_unsupported'))
elif "disk.yandex.ru" in url or "yadi.sk" in url:
progress(0.5, desc=t('downloading_yandex'))
download_from_yandex(url, zip_name, progress)
else:
raise ValueError(t('unsupported_source', url=url))
except Exception as e:
raise gr.Error(t('download_error', error=str(e)))
def download_from_google_drive(url, zip_name, progress):
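# Handles both ".../file/d/<id>/..." and "...?id=<id>&..." share-link forms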
file_id = (
url.split("file/d/")[1].split("/")[0]
if "file/d/" in url
else url.split("id=")[1].split("&")[0]
)
gdown.download(id=file_id, output=str(zip_name), quiet=False)
def download_from_huggingface(url, zip_name, progress):
urllib.request.urlretrieve(url, zip_name)
def download_from_pixeldrain(url, zip_name, progress):
file_id = url.split("pixeldrain.com/u/")[1]
response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
response.raise_for_status()
with open(zip_name, "wb") as f:
f.write(response.content)
def download_from_yandex(url, zip_name, progress):
yandex_public_key = f"download?public_key={url}"
yandex_api_url = f"https://cloud-api.yandex.net/v1/disk/public/resources/{yandex_public_key}"
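# The public-resources endpoint returns JSON whose "href" field is the direct download link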
response = requests.get(yandex_api_url)
if response.status_code == 200:
download_link = response.json().get("href")
urllib.request.urlretrieve(download_link, zip_name)
else:
raise gr.Error(t('yandex_api_error', status=response.status_code))
def extract_zip(extraction_folder, zip_name):
os.makedirs(extraction_folder, exist_ok=True)
with zipfile.ZipFile(zip_name, "r") as zip_ref:
zip_ref.extractall(extraction_folder)
os.remove(zip_name)
index_filepath, model_filepath = None, None
for root, _, files in os.walk(extraction_folder):
for name in files:
file_path = os.path.join(root, name)
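# Size heuristics: a usable .index file is >100 KB, an RVC .pth checkpoint >40 MB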
if name.endswith(".index") and os.stat(file_path).st_size > 1024 * 100:
index_filepath = file_path
if name.endswith(".pth") and os.stat(file_path).st_size > 1024 * 1024 * 40:
model_filepath = file_path
if not model_filepath:
raise gr.Error(t('pth_not_found', folder=extraction_folder))
rename_and_cleanup(extraction_folder, model_filepath, index_filepath)
def rename_and_cleanup(extraction_folder, model_filepath, index_filepath):
os.rename(
model_filepath,
os.path.join(extraction_folder, os.path.basename(model_filepath)),
)
if index_filepath:
os.rename(
index_filepath,
os.path.join(extraction_folder, os.path.basename(index_filepath)),
)
for filepath in os.listdir(extraction_folder):
full_path = os.path.join(extraction_folder, filepath)
if os.path.isdir(full_path):
shutil.rmtree(full_path)
def download_from_url(url, dir_name, progress=gr.Progress()):
try:
progress(0, desc=t('downloading_model', dir_name=dir_name))
zip_name = os.path.join(dirs[0], dir_name + ".zip")
extraction_folder = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(t('model_exists', dir_name=dir_name))
download_file(url, zip_name, progress)
progress(0.8, desc=t('unpacking_zip'))
extract_zip(extraction_folder, zip_name)
return t('model_uploaded', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_load_error', error=str(e)))
def upload_zip_file(zip_path, dir_name, progress=gr.Progress()):
try:
extraction_folder = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(t('model_exists', dir_name=dir_name))
zip_name = zip_path.name
progress(0.8, desc=t('unpacking_zip'))
extract_zip(extraction_folder, zip_name)
return t('model_uploaded', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_load_error', error=str(e)))
def upload_separate_files(pth_file, index_file, dir_name, progress=gr.Progress()):
try:
extraction_folder = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(t('model_exists', dir_name=dir_name))
os.makedirs(extraction_folder, exist_ok=True)
if pth_file:
pth_path = os.path.join(extraction_folder, os.path.basename(pth_file.name))
shutil.copyfile(pth_file.name, pth_path)
if index_file:
index_path = os.path.join(extraction_folder, os.path.basename(index_file.name))
shutil.copyfile(index_file.name, index_path)
return t('model_uploaded', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_load_error', error=str(e)))
def delete_model_name(dir_name):
model_dir = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(model_dir):
try:
if os.path.isdir(model_dir):
shutil.rmtree(model_dir)
return t('model_deleted', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_delete_error', error=str(e)))
else:
return t('model_not_found', dir_name=dir_name)
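# voice_conversion is imported from the CLI module written to vbach/cli/vbach.py above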
from vbach.cli.vbach import voice_conversion
def process_audio(
input_file: str = None,
input_list: str = None,
template: str = "NAME_MODEL_F0METHOD_PITCH",
model_name: str = "",
index_rate: float = 0,
output_format: str = "wav",
output_bitrate: int = 320,
stereo_mode: str = "mono",
method_pitch: str = "rmvpe+",
pitch: float = 0,
hop_length: int = 128,
filter_radius: int = 3,
rms: float = 0.25,
protect: float = 0.33,
f0_min: int = 50,
f0_max: int = 1100
):
# Fall back to a safe default if the template contains no known placeholder
keys = ["NAME", "PITCH", "F0METHOD", "DATETIME", "MODEL"]
if not any(key in template for key in keys):
template = "DATETIME_Vbach_F0METHOD_PITCH"
# The path textbox may hold either a single path or a stringified list of paths
if not isinstance(input_list, list) and not input_file:
try:
print(input_list)
input_list = ast.literal_eval(input_list)
except Exception as e:
print(e)
gr.Warning(t("error_strlist_is_not_list"))
return None
if input_file is not None:
# Reject a list pasted into the single-file field
try:
print(input_file)
ast.literal_eval(input_file)
gr.Warning(t("error_path_is_list"))
return None
except Exception:
pass
output_bitrate = f"{output_bitrate}k"
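# The conversion backend expects the bitrate as a string, e.g. 320 -> "320k"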
if not input_file and not input_list:
raise gr.Error(t("error_no_audio"))
if not model_name:
raise gr.Error(t("error_no_model"))
if input_file is not None and isinstance(input_file, str) and input_list is None:
if not os.path.exists(input_file):
gr.Warning(t("warning_file_not_found", file=input_file))
return None
file_name = os.path.basename(input_file)
namefile = os.path.splitext(file_name)[0]
time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = tempfile.mkdtemp(prefix="converted_voice_")
print(output_dir)
output_name = (
template
.replace("DATETIME", time_create_file)
.replace("NAME", namefile)
.replace("MODEL", model_name)
.replace("F0METHOD", method_pitch)
.replace("PITCH", f"{pitch}")
)
output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
try:
output_path = voice_conversion(
model_name,
input_file,
output_path,
pitch,
method_pitch,
index_rate,
filter_radius,
rms,
protect,
hop_length,
f0_min,
f0_max,
output_format,
output_bitrate,
stereo_mode
)
except Exception as e:
# Surface the failure to the UI instead of reporting success
print(e)
raise gr.Error(str(e))
print(t("success_single"))
return output_path
if input_file is None and input_list is not None and isinstance(input_list, list):
output_dir = tempfile.mkdtemp(prefix="converted_voice_")
print(output_dir)
output_paths = []
progress = gr.Progress()
for i, file in enumerate(input_list):
if not os.path.exists(file):
gr.Warning(t("warning_file_not_found", file=file))
continue
total_steps = len(input_list)
file_name = os.path.basename(file)
namefile = os.path.splitext(file_name)[0]
time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
progress(
(i+1, total_steps),
desc=t("processing", namefile=namefile),
unit=t("files")
)
output_name = (
template
.replace("DATETIME", time_create_file)
.replace("NAME", namefile)
.replace("MODEL", model_name)
.replace("F0METHOD", method_pitch)
.replace("PITCH", f"{pitch}")
)
output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
try:
output_path = voice_conversion(
model_name,
file,
output_path,
pitch,
method_pitch,
index_rate,
filter_radius,
rms,
protect,
hop_length,
f0_min,
f0_max,
output_format,
output_bitrate,
stereo_mode
)
output_paths.append(output_path)
except Exception as e:
# Skip files that failed to convert and continue with the rest
print(e)
print(t("success_batch"))
return output_paths
def vbach_plugin_name():
return "VBach"
def vbach_plugin(lang="ru"):
set_language(lang)
with gr.TabItem(t("inference")):
with gr.Column():
with gr.Column(scale=3) as input_voice_group:
with gr.Group() as single_voice_file:
input_voice = gr.Audio(label=t("select_file"), interactive=True, type="filepath")
batch_upload_btn = gr.Button(t("batch_upload"))
with gr.Group(visible=False) as batch_voice_file:
input_voices = gr.Files(type="filepath", interactive=True, show_label=False)
single_upload_btn = gr.Button(t("single_upload"))
input_voice_path = gr.Textbox(label=t("audio_path"), info=t("audio_path_info"), interactive=True)
input_voice.upload(fn=(lambda x: gr.update(value=x)), inputs=input_voice, outputs=input_voice_path)
input_voices.upload(fn=(lambda x: gr.update(value=str(x))), inputs=input_voices, outputs=input_voice_path)
with gr.Column():
with gr.Row(equal_height=True):
model_name = gr.Dropdown(label=t("model_name"), choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(current_dir, dirs[0], d))], interactive=True, filterable=False, scale=6)
model_update_btn = gr.Button(t("update_button"), variant="primary", scale=3, size="lg")
model_update_btn.click(fn=(lambda: gr.update(choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(current_dir, dirs[0], d))])), inputs=None, outputs=model_name)
with gr.Row():
method_pitch = gr.Dropdown(label=t("pitch_method"), choices=["mangio-crepe", "rmvpe+", "fcpe"], value="rmvpe+", interactive=True, filterable=False)
hop_length = gr.Slider(minimum=2, maximum=512, step=1, value=128, label=t("hop_length"), interactive=True, visible=False)
with gr.Row():
pitch = gr.Slider(minimum=-48, maximum=48, step=12, value=0, label=t("pitch"), interactive=True)
with gr.Row():
f0_min = gr.Slider(minimum=50, maximum=3500, step=1, value=50, label=t("f0_min"), interactive=True)
f0_max = gr.Slider(minimum=500, maximum=3500, step=1, value=1100, label=t("f0_max"), interactive=True)
with gr.Column(variant="panel"):
with gr.Group():
with gr.Row(equal_height=True):
with gr.Column(scale=3):
stereo_mode = gr.Dropdown(
label=t("audio_processing"),
choices=list(t("stereo_modes").keys()),
value="mono",
interactive=True,
filterable=False
)
output_format = gr.Dropdown(label=t("output_format"), choices=OUTPUT_FORMAT)
output_bitrate = gr.Slider(32, 320, step=1, label=t("bitrate"), value=320, interactive=True)
with gr.Column(scale=6) as single_output_group:
converted_voice = gr.Audio(label=t("converted_voice"), type="filepath", interactive=False, show_download_button=True, elem_classes="fixed-height")
with gr.Column(scale=6, visible=False) as batch_output_group:
converted_voices = gr.Files(label=t("converted_voices"), type="filepath", interactive=False, height="100%", elem_classes="fixed-height")
convert_btn = gr.Button(t("convert_single"), variant="primary", scale=3)
convert_batch_btn = gr.Button(t("convert_batch"), variant="primary", visible=False, scale=3)
with gr.Column():
with gr.Tab(t("name_format")):
template_info = gr.Markdown(t("name_format_info"), line_breaks=True)
template = gr.Text(label=t("name_format"), value="NAME_MODEL_F0METHOD_PITCH", interactive=True)
with gr.Tab(t("advanced_settings")):
with gr.Row():
with gr.Column(scale=3):
filter_radius = gr.Slider(minimum=0, maximum=7, step=1, value=3, label=t("filter_radius"), interactive=True)
index_rate = gr.Slider(minimum=0, maximum=1, step=0.01, value=0, label=t("index_rate"), interactive=True)
rms = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.25, label=t("rms"), interactive=True)
protect = gr.Slider(minimum=0, maximum=0.5, step=0.01, value=0.33, label=t("protect"), interactive=True)
with gr.TabItem(t("model_manager")):
with gr.TabItem(t("download_url")):
with gr.Row():
with gr.Column(variant="panel"):
gr.HTML(f"{t('download_link')}
")
model_zip_link = gr.Text(label=t("download_link"))
with gr.Group():
zip_model_name = gr.Text(
label=t("model_name"),
info=t("unique_name"),
)
download_btn = gr.Button(t("download_button"), variant="primary")
dl_output_message = gr.Text(label=t("output_message"), interactive=False)
download_btn.click(
download_from_url,
inputs=[model_zip_link, zip_model_name],
outputs=dl_output_message,
)
with gr.Tab(t("download_zip")):
with gr.Row():
with gr.Column():
zip_file = gr.File(
label=t("zip_file"), file_types=[".zip"], file_count="single"
)
with gr.Column(variant="panel"):
gr.HTML(t("upload_steps"))
with gr.Group():
local_model_name = gr.Text(
label=t("model_name"),
info=t("unique_name"),
)
model_upload_button = gr.Button(t("download_button"), variant="primary")
local_upload_output_message = gr.Text(label=t("output_message"), interactive=False)
model_upload_button.click(
upload_zip_file,
inputs=[zip_file, local_model_name],
outputs=local_upload_output_message,
)
with gr.TabItem(t("download_files")):
with gr.Group():
with gr.Row():
pth_file = gr.File(
label=t("pth_file"), file_types=[".pth"], file_count="single"
)
index_file = gr.File(
label=t("index_file"), file_types=[".index"], file_count="single"
)
with gr.Column(variant="panel"):
with gr.Group():
separate_model_name = gr.Text(
label=t("model_name"),
info=t("unique_name"),
)
separate_upload_button = gr.Button(t("download_button"), variant="primary")
separate_upload_output_message = gr.Text(
label=t("output_message"), interactive=False
)
separate_upload_button.click(
upload_separate_files,
inputs=[pth_file, index_file, separate_model_name],
outputs=separate_upload_output_message,
)
with gr.TabItem(t("delete_model")):
with gr.Column(variant="panel"):
with gr.Group():
delete_voicemodel_name = gr.Dropdown(
label=t("model_name"),
info=t("delete_info"),
choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(current_dir, dirs[0], d))],
interactive=True,
filterable=False
)
refresh_delete_btn = gr.Button(t("refresh_button"))
refresh_delete_btn.click(fn=(lambda: gr.update(choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(current_dir, dirs[0], d))])), inputs=None, outputs=delete_voicemodel_name)
delete_model_output_message = gr.Text(
label=t("output_message"), interactive=False
)
delete_model_btn = gr.Button(t("delete_button"))
delete_model_btn.click(
fn=delete_model_name,
inputs=delete_voicemodel_name,
outputs=delete_model_output_message
)
method_pitch.change(fn=lambda x: gr.update(visible=(x == "mangio-crepe")), inputs=method_pitch, outputs=hop_length)
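# Toggle between single-file and batch modes: swap the input group, output group and convert button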
batch_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[single_voice_file, batch_voice_file, single_output_group, batch_output_group, convert_btn, convert_batch_btn])
single_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[batch_voice_file, single_voice_file, batch_output_group, single_output_group, convert_batch_btn, convert_btn])
convert_btn.click(fn=process_audio, inputs=[input_voice_path, gr.State(None), template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voice)
convert_batch_btn.click(fn=process_audio, inputs=[gr.State(None), input_voice_path, template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voices)