mvsepless_plugins / vbach_portable_v2.py
noblebarkrr's picture
Update vbach_portable_v2.py
1953b4c verified
import os
import gc
import ast
import requests
import sys
import shutil
import zipfile
import gradio as gr
import urllib.request
import gdown
import tempfile
from datetime import datetime
current_dir = os.getcwd()
dirs = [
"voice_models",
"vbach",
os.path.join("vbach", "cli"),
os.path.join("vbach", "infer"),
os.path.join("vbach", "lib"),
os.sep.join(["vbach", "lib", "algorithm"]),
os.sep.join(["vbach", "lib", "predictors"]),
os.path.join("vbach", "models"),
os.sep.join(["vbach", "models", "predictors"]),
os.sep.join(["vbach", "models", "embedders"]),
os.path.join("vbach", "scripts"),
os.path.join("vbach", "utils")
]
RMVPE_PATH = os.path.join(dirs[8], "rmvpe.pt")
FCPE_PATH = os.path.join(dirs[8], "fcpe.pt")
RVC_MODELS_DIR = dirs[0]
HUBERT_MODEL_PATH = os.path.join(
dirs[9], "hubert_base.pt"
)
CURRENT_LANG = "ru"
OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"]
TRANSLATIONS = {
"ru": {
"app_title": "VBach",
"inference": "Инференс",
"select_file": "Выберите файл",
"audio_path": "Путь к файлу",
"audio_path_info": "Здесь можно ввести путь к файлу/список путей к файлам , либо загрузить его/их выше и получить путь к нему/их список",
"audio_processing": "Режим обработки аудио",
"output_format": "Формат вывода",
"name_format": "Шаблон",
"name_format_info": """Доступные ключи для формата:
NAME - Имя входного файла
MODEL - Название модели
PITCH - Высота тона
F0_METHOD - Метод извлечения тона
DATETIME - Время и дата создания результата
Пример - NAME_MODEL_PITCH → name_your-model_12""",
"convert_single": "Конвертировать один",
"convert_batch": "Конвертировать несколько",
"model_name": "Имя модели",
"pitch_method": "Метод извлечения тона",
"pitch": "Высота тона",
"hop_length": "Длина шага",
"bitrate": "Битрейт (Кбит/сек)",
"f0_min": "Нижний лимит определения высоты тона",
"f0_max": "Верхний лимит определения высоты тона",
"advanced_settings": "Дополнительные настройки",
"filter_radius": "Радиус фильтра",
"index_rate": "Влияние индекса",
"rms": "Огибающая громкости",
"protect": "Защита согласных",
"model_manager": "Менеджер моделей",
"download_url": "Загрузить по ссылке",
"download_zip": "Загрузить ZIP архивом",
"download_files": "Загрузить файлами",
"delete_model": "Удалить модель",
"download_link": "Ссылка на загрузку модели",
"unique_name": "Дайте вашей загружаемой модели уникальное имя, отличное от других голосовых моделей.",
"download_button": "Загрузить модель",
"supported_sites": "Поддерживаемые сайты",
"output_message": "Сообщение вывода",
"zip_file": "Zip-файл",
"upload_steps": "<h3>1. Найдите и скачайте файлы: .pth и необязательный файл .index</h3><h3>2. Закиньте файл(-ы) в ZIP-архив и поместите его в область загрузки</h3><h3>3. Дождитесь полной загрузки ZIP-архива в интерфейс</h3>",
"pth_file": "pth-файл",
"index_file": "index-файл",
"delete_info": "Выберите модель, которую надо удалить",
"refresh_button": "Обновить список моделей",
"delete_button": "Удалить модель",
"batch_upload": "Пакетная загрузка",
"single_upload": "Одиночная загрузка",
"converted_voice": "Преобразованный вокал",
"converted_voices": "Преобразованные вокалы",
"update_button": "Обновить",
"processing": "Сейчас обрабатывается - {namefile}",
"files": "файлов",
"error_no_audio": "Не удалось найти аудиофайл(ы). Убедитесь, что файл загрузился или проверьте правильность пути к нему.",
"error_no_model": "Выберите модель голоса для преобразования голоса",
"warning_file_not_found": "Файл {file} не найден.",
"success_single": "Вокал успешно преобразован",
"success_batch": "Вокалы успешно преобразованы",
"language": "Язык",
"stereo_modes": {
"mono": "Моно",
"left/right": "Левый/Правый",
"sim/dif": "Сходство/Различия"
},
# Прогресс-бары
'downloading_google': "[~] Загрузка модели с Google Drive...",
'downloading_huggingface': "[~] Загрузка модели с HuggingFace...",
'downloading_pixeldrain': "[~] Загрузка модели с Pixeldrain...",
'downloading_yandex': "[~] Загрузка модели с Яндекс Диска...",
'downloading_model': "[~] Загрузка голосовой модели {dir_name}...",
'unpacking_zip': "[~] Распаковка zip-файла...",
# Уведомления об ошибках
'unsupported_source': "Неподдерживаемый источник: {url}",
'download_error': "Ошибка при скачивании: {error}",
'yandex_api_error': "Ошибка при получении ссылки с Яндекс Диска: {status}",
'pth_not_found': "Не найден файл модели .pth в распакованном zip-файле. Проверьте содержимое в {folder}.",
'model_exists': "Директория голосовой модели {dir_name} уже существует! Выберите другое имя.",
'model_load_error': "Ошибка при загрузке модели: {error}",
'model_delete_error': "Ошибка при удалении модели: {error}",
# Статус операции
'mega_unsupported': "Mega не поддерживается!",
'model_uploaded': "[+] Модель {dir_name} успешно загружена!",
'model_deleted': "[-] Модель {dir_name} успешно удалена!",
'model_not_found': "[-] Модели {dir_name} не существует",
"error_strlist_is_not_list": "Эта строка не является списком файлов",
"error_path_is_list": "Путь к файлу является списком"
},
"en": {
"app_title": "VBach",
"inference": "Inference",
"select_file": "Select File",
"audio_path": "Audio path",
"audio_path_info": "You can enter a file path or a list of file paths here, or upload the file(s) above to obtain their path(s)",
"audio_processing": "Audio Processing Mode",
"output_format": "Output Format",
"name_format": "Template",
"name_format_info": """Available format keys:
NAME - Input file name
MODEL - Model name
PITCH - Pitch
F0_METHOD - Method extraction pitch
DATETIME - Date & time create results
Example - NAME_MODEL_PITCH → name_your-model_12""",
"convert_single": "Convert Single",
"convert_batch": "Convert Batch",
"model_name": "Model Name",
"pitch_method": "Pitch Extraction Method",
"pitch": "Pitch",
"hop_length": "Hop Length",
"bitrate": "Bitrate (Kbit/sec)",
"f0_min": "F0 Min",
"f0_max": "F0 Max",
"advanced_settings": "Advanced Settings",
"filter_radius": "Filter Radius",
"index_rate": "Index Rate",
"rms": "RMS Envelope",
"protect": "Consonant Protection",
"model_manager": "Model Manager",
"download_url": "Download by URL",
"download_zip": "Upload ZIP Archive",
"download_files": "Upload Files",
"delete_model": "Delete Model",
"download_link": "Model Download Link",
"unique_name": "Give your model a unique name different from other voice models.",
"download_button": "Download Model",
"supported_sites": "Supported Sites",
"output_message": "Output Message",
"zip_file": "Zip File",
"upload_steps": "<h3>1. Find and download files: .pth and optional .index</h3><h3>2. Put file(s) in a ZIP archive and upload it</h3><h3>3. Wait for the ZIP archive to be fully uploaded</h3>",
"pth_file": "PTH File",
"index_file": "Index File",
"delete_info": "Select the model to delete",
"refresh_button": "Refresh Model List",
"delete_button": "Delete Model",
"batch_upload": "Batch Upload",
"single_upload": "Single Upload",
"converted_voice": "Converted Voice",
"converted_voices": "Converted Voices",
"update_button": "Refresh",
"processing": "Processing - {namefile}",
"files": "files",
"error_no_audio": "Could not find audio file(s). Make sure the file is uploaded or check the file path.",
"error_no_model": "Select a voice model for voice conversion",
"warning_file_not_found": "File {file} not found.",
"success_single": "Voice successfully converted",
"success_batch": "Voices successfully converted",
"language": "Language",
"stereo_modes": {
"mono": "Mono",
"left/right": "Left/Right",
"sim/dif": "Similarity/Difference"
},
'downloading_google': "[~] Downloading model from Google Drive...",
'downloading_huggingface': "[~] Downloading model from HuggingFace...",
'downloading_pixeldrain': "[~] Downloading model from Pixeldrain...",
'downloading_yandex': "[~] Downloading model from Yandex Disk...",
'downloading_model': "[~] Downloading voice model {dir_name}...",
'unpacking_zip': "[~] Unpacking zip file...",
# Error messages
'unsupported_source': "Unsupported source: {url}",
'download_error': "Download error: {error}",
'yandex_api_error': "Yandex Disk API error: {status}",
'pth_not_found': "Model .pth file not found in unzipped archive. Check contents in {folder}.",
'model_exists': "Voice model directory {dir_name} already exists! Choose another name.",
'model_load_error': "Error loading model: {error}",
'model_delete_error': "Error deleting model: {error}",
# Operation status
'mega_unsupported': "Mega is not supported!",
'model_uploaded': "[+] Model {dir_name} uploaded successfully!",
'model_deleted': "[-] Model {dir_name} deleted successfully!",
'model_not_found': "[-] Model {dir_name} does not exist",
"error_strlist_is_not_list": "This string is not a file list",
"error_path_is_list": "The file path is a list"
}
}
for dir in dirs:
os.makedirs(os.path.join(current_dir, dir), exist_ok=True)
for url, file in [["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/rmvpe.pt", RMVPE_PATH], ["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/fcpe.pt", FCPE_PATH], ["https://huggingface.co/Politrees/RVC_resources/resolve/main/embedders/hubert_base.pt", HUBERT_MODEL_PATH]]:
if not os.path.exists(file):
try:
r = requests.get(url, stream=True)
r.raise_for_status()
with open(os.path.join(file), "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except requests.exceptions.RequestException as e:
print(f"Произошла ошибка при загрузке модели: {e}")
except Exception as e:
print(f"Произошла непредвиденная ошибка: {e}")
inference = '''
import torch
import numpy as np
import librosa
from multiprocessing import cpu_count
from fairseq import checkpoint_utils
from vbach.lib.algorithm.synthesizers import Synthesizer
from .pipeline import VC
from separator.audio_writer import write_audio_file
from vbach.utils.remove_center import remove_center
def overlay_mono_on_stereo(mono_audio, stereo_audio, gain=0.5):
if mono_audio is None or stereo_audio is None:
raise ValueError("Input audio arrays cannot be None")
# Ensure float32 for processing
mono_audio = mono_audio.astype(np.float32)
stereo_audio = stereo_audio.astype(np.float32)
# Convert mono to stereo if needed
if mono_audio.ndim == 1:
mono_audio = np.vstack([mono_audio, mono_audio])
elif mono_audio.shape[0] == 1:
mono_audio = np.vstack([mono_audio[0], mono_audio[0]])
if mono_audio.shape[0] != 2 or stereo_audio.shape[0] != 2:
raise ValueError("Shapes must be (2, N)")
min_len = min(mono_audio.shape[1], stereo_audio.shape[1])
if min_len == 0:
raise ValueError("Audio arrays cannot be empty")
mono_audio = mono_audio[:, :min_len]
stereo_audio = stereo_audio[:, :min_len]
result = stereo_audio + mono_audio * gain
# Normalize to prevent clipping
max_amp = np.max(np.abs(result))
if max_amp > 0:
result /= max_amp
# Convert back to int16 for output (if needed)
result = (result * 32767).astype(np.int16)
return result
def load_audio(
file_path: str,
target_sr: int,
stereo_mode: str
) -> np.ndarray:
"""
Загружает аудиофайл с помощью librosa, обрабатывает и возвращает аудиосигнал
Параметры:
file_path: Путь к аудиофайлу
target_sr: Целевая частота дискретизации
mono: Преобразовать в моно (по умолчанию True)
normalize: Нормализовать аудио (по умолчанию False)
duration: Загрузить только указанную длительность (в секундах)
offset: Начальное смещение для загрузки (в секундах)
Возвращает:
Аудиоданные в виде numpy array (моно: (samples,), стерео: (channels, samples))
Исключения:
RuntimeError: При ошибках загрузки или обработки аудио
"""
try:
mid, left, right = None, None, None
if stereo_mode == "mono":
# Загрузка аудио с помощью librosa
mid_audio, sr = librosa.load(
file_path,
sr=None,
mono=True
)
mid_audio = librosa.resample(
mid_audio, # Исправлено: было audio
orig_sr=sr,
target_sr=target_sr
)
mid = mid_audio.flatten()
elif stereo_mode == "left/right" or stereo_mode == "sim/dif":
# Загрузка аудио с помощью librosa
stereo_audio, sr = librosa.load(
file_path,
sr=None,
mono=False
)
if stereo_mode == "left/right":
left_audio = stereo_audio[0] # Исправлено: было [:, 0]
right_audio = stereo_audio[1] # Исправлено: было [:, 1]
left_audio = librosa.resample(
left_audio,
orig_sr=sr,
target_sr=target_sr
)
right_audio = librosa.resample(
right_audio,
orig_sr=sr,
target_sr=target_sr
)
left = left_audio.flatten()
right = right_audio.flatten()
elif stereo_mode == "sim/dif":
mid_left, mid_right, dif_left, dif_right = remove_center(input_array=stereo_audio, samplerate=sr)
mid_audio = (mid_left + mid_right) * 0.5
mid_audio = librosa.resample(
mid_audio,
orig_sr=sr,
target_sr=target_sr
)
dif_left = librosa.resample(
dif_left,
orig_sr=sr,
target_sr=target_sr
)
dif_right = librosa.resample(
dif_right,
orig_sr=sr,
target_sr=target_sr
)
mid = mid_audio.flatten()
left = dif_left.flatten() # Исправлено: было left_audio
right = dif_right.flatten() # Исправлено: было right_audio
return mid, left, right
except Exception as e:
raise RuntimeError(f"Ошибка загрузки аудио '{file_path}': {str(e)}")
class Config:
def __init__(self):
self.device = self.get_device()
self.is_half = self.device == "cpu"
self.n_cpu = cpu_count()
self.gpu_name = None
self.gpu_mem = None
self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()
def get_device(self):
if torch.cuda.is_available():
return "cuda"
elif torch.backends.mps.is_available():
return "mps"
else:
return "cpu"
def device_config(self):
if torch.cuda.is_available():
print("Используется устройство CUDA")
self._configure_gpu()
elif torch.backends.mps.is_available():
print("Используется устройство MPS")
self.device = "mps"
else:
print("Используется CPU")
self.device = "cpu"
self.is_half = True
x_pad, x_query, x_center, x_max = (
(3, 10, 60, 65) if self.is_half else (1, 6, 38, 41)
)
if self.gpu_mem is not None and self.gpu_mem <= 4:
x_pad, x_query, x_center, x_max = (1, 5, 30, 32)
return x_pad, x_query, x_center, x_max
def _configure_gpu(self):
self.gpu_name = torch.cuda.get_device_name(self.device)
low_end_gpus = ["16", "P40", "P10", "1060", "1070", "1080"]
if (
any(gpu in self.gpu_name for gpu in low_end_gpus)
and "V100" not in self.gpu_name.upper()
):
self.is_half = False
self.gpu_mem = int(
torch.cuda.get_device_properties(self.device).total_memory
/ 1024
/ 1024
/ 1024
+ 0.4
)
# Загрузка модели Hubert
def load_hubert(device, is_half, model_path):
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
[model_path], suffix=""
)
hubert = models[0].to(device)
hubert = hubert.half() if is_half else hubert.float()
hubert.eval()
return hubert
# Получение голосового преобразователя
def get_vc(device, is_half, config, model_path):
cpt = torch.load(model_path, map_location="cpu", weights_only=False)
if "config" not in cpt or "weight" not in cpt:
raise ValueError(
f"Некорректный формат для {model_path}. "
"Используйте голосовую модель, обученную с использованием RVC v2."
)
tgt_sr = cpt["config"][-1]
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
pitch_guidance = cpt.get("f0", 1)
version = cpt.get("version", "v1")
input_dim = 768 if version == "v2" else 256
net_g = Synthesizer(
*cpt["config"],
use_f0=pitch_guidance,
input_dim=input_dim,
is_half=is_half,
)
del net_g.enc_q
print(net_g.load_state_dict(cpt["weight"], strict=False))
net_g.eval().to(device)
net_g = net_g.half() if is_half else net_g.float()
vc = VC(tgt_sr, config)
return cpt, version, net_g, tgt_sr, vc
def rvc_infer(
index_path,
index_rate,
input_path,
output_path,
pitch,
f0_method,
cpt,
version,
net_g,
filter_radius,
tgt_sr,
volume_envelope,
protect,
hop_length,
vc,
hubert_model,
f0_min=50,
f0_max=1100,
format_output="wav",
output_bitrate="320k",
stereo_mode="mono"
):
mid, left, right = load_audio(input_path, 16000, stereo_mode)
pitch_guidance = cpt.get("f0", 1)
if stereo_mode == "mono":
if mid is None:
raise ValueError("Mono audio data is None")
audio_opt = vc.pipeline(
hubert_model,
net_g,
0,
mid,
input_path,
pitch,
f0_method,
index_path,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
0,
volume_envelope,
version,
protect,
hop_length,
f0_file=None,
f0_min=f0_min,
f0_max=f0_max,
)
elif stereo_mode == "left/right":
if left is None or right is None:
raise ValueError("Left or right audio channel is None")
left_audio_opt = vc.pipeline(
hubert_model,
net_g,
0,
left,
input_path,
pitch,
f0_method,
index_path,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
0,
volume_envelope,
version,
protect,
hop_length,
f0_file=None,
f0_min=f0_min,
f0_max=f0_max,
)
right_audio_opt = vc.pipeline(
hubert_model,
net_g,
0,
right,
input_path,
pitch,
f0_method,
index_path,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
0,
volume_envelope,
version,
protect,
hop_length,
f0_file=None,
f0_min=f0_min,
f0_max=f0_max,
)
# Ensure both channels have the same length
min_len = min(len(left_audio_opt), len(right_audio_opt))
if min_len == 0:
raise ValueError("Processed audio is empty")
left_audio_opt = left_audio_opt[:min_len]
right_audio_opt = right_audio_opt[:min_len]
audio_opt = np.stack((left_audio_opt, right_audio_opt), axis=0)
elif stereo_mode == "sim/dif":
if mid is None or left is None or right is None:
raise ValueError("Mid, left or right audio channel is None")
mid_audio_opt = vc.pipeline(
hubert_model,
net_g,
0,
mid,
input_path,
pitch,
f0_method,
index_path,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
0,
volume_envelope,
version,
protect,
hop_length,
f0_file=None,
f0_min=f0_min,
f0_max=f0_max,
)
left_audio_opt = vc.pipeline(
hubert_model,
net_g,
0,
left,
input_path,
pitch,
f0_method,
index_path,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
0,
volume_envelope,
version,
protect,
hop_length,
f0_file=None,
f0_min=f0_min,
f0_max=f0_max,
)
right_audio_opt = vc.pipeline(
hubert_model,
net_g,
0,
right,
input_path,
pitch,
f0_method,
index_path,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
0,
volume_envelope,
version,
protect,
hop_length,
f0_file=None,
f0_min=f0_min,
f0_max=f0_max,
)
# Ensure all channels have the same length
min_len = min(len(mid_audio_opt), len(left_audio_opt), len(right_audio_opt))
if min_len == 0:
raise ValueError("Processed audio is empty")
mid_audio_opt = mid_audio_opt[:min_len]
left_audio_opt = left_audio_opt[:min_len]
right_audio_opt = right_audio_opt[:min_len]
dif_audio_opt = np.stack((left_audio_opt, right_audio_opt), axis=0)
audio_opt = overlay_mono_on_stereo(mid_audio_opt, dif_audio_opt)
write_audio_file(output_path, audio_opt, tgt_sr, format_output, output_bitrate)
return output_path
'''
pipeline = '''
import os
import gc
import torch
import torch.nn.functional as F
import torchcrepe
import faiss
import librosa
import numpy as np
from scipy import signal
from vbach.lib.predictors.FCPE import FCPEF0Predictor
from vbach.lib.predictors.RMVPE import RMVPE0Predictor
PREDICTORS_DIR = os.path.join(os.getcwd(), "vbach", "models", "predictors")
RMVPE_DIR = os.path.join(PREDICTORS_DIR, "rmvpe.pt")
FCPE_DIR = os.path.join(PREDICTORS_DIR, "fcpe.pt")
# Фильтр Баттерворта для высоких частот
FILTER_ORDER = 5 # Порядок фильтра
CUTOFF_FREQUENCY = 48 # Частота среза (в Гц)
SAMPLE_RATE = 16000 # Частота дискретизации (в Гц)
bh, ah = signal.butter(N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)
input_audio_path2wav = {}
# Класс для обработки аудио
class AudioProcessor:
@staticmethod
def change_rms(source_audio, source_rate, target_audio, target_rate, rate):
"""
Изменяет RMS (среднеквадратичное значение) аудио.
"""
rms1 = librosa.feature.rms(
y=source_audio,
frame_length=source_rate // 2 * 2,
hop_length=source_rate // 2,
)
rms2 = librosa.feature.rms(
y=target_audio,
frame_length=target_rate // 2 * 2,
hop_length=target_rate // 2,
)
rms1 = F.interpolate(
torch.from_numpy(rms1).float().unsqueeze(0),
size=target_audio.shape[0],
mode="linear",
).squeeze()
rms2 = F.interpolate(
torch.from_numpy(rms2).float().unsqueeze(0),
size=target_audio.shape[0],
mode="linear",
).squeeze()
rms2 = torch.maximum(rms2, torch.zeros_like(rms2) + 1e-6)
adjusted_audio = (
target_audio * (torch.pow(rms1, 1 - rate) * torch.pow(rms2, rate - 1)).numpy()
)
return adjusted_audio
# Класс для преобразования голоса
class VC:
def __init__(self, tgt_sr, config):
"""
Инициализация параметров для преобразования голоса.
"""
self.x_pad = config.x_pad
self.x_query = config.x_query
self.x_center = config.x_center
self.x_max = config.x_max
self.is_half = config.is_half
self.sample_rate = 16000
self.window = 160
self.t_pad = self.sample_rate * self.x_pad
self.t_pad_tgt = tgt_sr * self.x_pad
self.t_pad2 = self.t_pad * 2
self.t_query = self.sample_rate * self.x_query
self.t_center = self.sample_rate * self.x_center
self.t_max = self.sample_rate * self.x_max
self.time_step = self.window / self.sample_rate * 1000
self.device = config.device
def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"):
"""
Получает F0 с использованием модели crepe.
"""
x = x.astype(np.float32)
x /= np.quantile(np.abs(x), 0.999)
audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(0)
if audio.ndim == 2 and audio.shape[0] > 1:
audio = torch.mean(audio, dim=0, keepdim=True)
pitch = torchcrepe.predict(
audio,
self.sample_rate,
hop_length,
f0_min,
f0_max,
model,
batch_size=hop_length * 2,
device=self.device,
pad=True,
)
p_len = p_len or x.shape[0] // hop_length
source = np.array(pitch.squeeze(0).cpu().float().numpy())
source[source < 0.001] = np.nan
target = np.interp(
np.arange(0, len(source) * p_len, len(source)) / p_len,
np.arange(0, len(source)),
source,
)
f0 = np.nan_to_num(target)
return f0
def get_f0_rmvpe(self, x, f0_min=1, f0_max=40000, *args, **kwargs):
"""
Получает F0 с использованием модели rmvpe.
"""
if not hasattr(self, "model_rmvpe"):
self.model_rmvpe = RMVPE0Predictor(
RMVPE_DIR, is_half=self.is_half, device=self.device
)
f0 = self.model_rmvpe.infer_from_audio_with_pitch(
x, thred=0.03, f0_min=f0_min, f0_max=f0_max
)
return f0
def get_f0(
self,
input_audio_path,
x,
p_len,
pitch,
f0_method,
filter_radius,
hop_length,
inp_f0=None,
f0_min=50,
f0_max=1100,
):
"""
Получает F0 с использованием выбранного метода.
"""
global input_audio_path2wav
f0_mel_min = 1127 * np.log(1 + f0_min / 700)
f0_mel_max = 1127 * np.log(1 + f0_max / 700)
if f0_method == "mangio-crepe":
f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length))
elif f0_method == "rmvpe+":
params = {
"x": x,
"p_len": p_len,
"pitch": pitch,
"f0_min": f0_min,
"f0_max": f0_max,
"time_step": self.time_step,
"filter_radius": filter_radius,
"crepe_hop_length": int(hop_length),
"model": "full",
}
f0 = self.get_f0_rmvpe(**params)
elif f0_method == "fcpe":
self.model_fcpe = FCPEF0Predictor(
FCPE_DIR,
f0_min=int(f0_min),
f0_max=int(f0_max),
dtype=torch.float32,
device=self.device,
sample_rate=self.sample_rate,
threshold=0.03,
)
f0 = self.model_fcpe.compute_f0(x, p_len=p_len)
del self.model_fcpe
gc.collect()
f0 *= pow(2, pitch / 12)
tf0 = self.sample_rate // self.window
if inp_f0 is not None:
delta_t = np.round(
(inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
).astype("int16")
replace_f0 = np.interp(list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1])
shape = f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)].shape[0]
f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[:shape]
f0bak = f0.copy()
f0_mel = 1127 * np.log(1 + f0 / 700)
f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (
f0_mel_max - f0_mel_min
) + 1
f0_mel[f0_mel <= 1] = 1
f0_mel[f0_mel > 255] = 255
f0_coarse = np.rint(f0_mel).astype(int)
return f0_coarse, f0bak
def vc(
self,
model,
net_g,
sid,
audio0,
pitch,
pitchf,
index,
big_npy,
index_rate,
version,
protect,
):
"""
Преобразует аудио с использованием модели.
"""
feats = torch.from_numpy(audio0)
feats = feats.half() if self.is_half else feats.float()
if feats.dim() == 2:
feats = feats.mean(-1)
assert feats.dim() == 1, feats.dim()
feats = feats.view(1, -1)
padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)
inputs = {
"source": feats.to(self.device),
"padding_mask": padding_mask,
"output_layer": 9 if version == "v1" else 12,
}
with torch.no_grad():
logits = model.extract_features(**inputs)
feats = model.final_proj(logits[0]) if version == "v1" else logits[0]
if protect < 0.5 and pitch is not None and pitchf is not None:
feats0 = feats.clone()
if index is not None and big_npy is not None and index_rate != 0:
npy = feats[0].cpu().numpy()
npy = npy.astype("float32") if self.is_half else npy
score, ix = index.search(npy, k=8)
weight = np.square(1 / score)
weight /= weight.sum(axis=1, keepdims=True)
npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
npy = npy.astype("float16") if self.is_half else npy
feats = (
torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate
+ (1 - index_rate) * feats
)
feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
if protect < 0.5 and pitch is not None and pitchf is not None:
feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
0, 2, 1
)
p_len = audio0.shape[0] // self.window
if feats.shape[1] < p_len:
p_len = feats.shape[1]
if pitch is not None and pitchf is not None:
pitch = pitch[:, :p_len]
pitchf = pitchf[:, :p_len]
if protect < 0.5 and pitch is not None and pitchf is not None:
pitchff = pitchf.clone()
pitchff[pitchf > 0] = 1
pitchff[pitchf < 1] = protect
pitchff = pitchff.unsqueeze(-1)
feats = feats * pitchff + feats0 * (1 - pitchff)
feats = feats.to(feats0.dtype)
p_len = torch.tensor([p_len], device=self.device).long()
with torch.no_grad():
if pitch is not None and pitchf is not None:
audio1 = (
(net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0])
.data.cpu()
.float()
.numpy()
)
else:
audio1 = (
(net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy()
)
del feats, p_len, padding_mask
if torch.cuda.is_available():
torch.cuda.empty_cache()
return audio1
def pipeline(
self,
model,
net_g,
sid,
audio,
input_audio_path,
pitch,
f0_method,
file_index,
index_rate,
pitch_guidance,
filter_radius,
tgt_sr,
resample_sr,
volume_envelope,
version,
protect,
hop_length,
f0_file,
f0_min=50,
f0_max=1100,
):
"""
Основной конвейер для преобразования аудио.
"""
if (
file_index is not None
and file_index != ""
and os.path.exists(file_index)
and index_rate != 0
):
try:
index = faiss.read_index(file_index)
big_npy = index.reconstruct_n(0, index.ntotal)
except Exception as e:
print(f"Произошла ошибка при чтении индекса FAISS: {e}")
index = big_npy = None
else:
index = big_npy = None
audio = signal.filtfilt(bh, ah, audio)
audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
opt_ts = []
if audio_pad.shape[0] > self.t_max:
audio_sum = np.zeros_like(audio)
for i in range(self.window):
audio_sum += audio_pad[i : i - self.window]
for t in range(self.t_center, audio.shape[0], self.t_center):
opt_ts.append(
t
- self.t_query
+ np.where(
np.abs(audio_sum[t - self.t_query : t + self.t_query])
== np.abs(audio_sum[t - self.t_query : t + self.t_query]).min()
)[0][0]
)
s = 0
audio_opt = []
t = None
audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
p_len = audio_pad.shape[0] // self.window
inp_f0 = None
if f0_file and hasattr(f0_file, "name"):
try:
with open(f0_file.name, "r") as f:
lines = f.read().strip("\\n").split("\\n")
inp_f0 = np.array(
[[float(i) for i in line.split(",")] for line in lines],
dtype="float32",
)
except Exception as e:
print(f"Произошла ошибка при чтении файла F0: {e}")
sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()
if pitch_guidance:
pitch, pitchf = self.get_f0(
input_audio_path,
audio_pad,
p_len,
pitch,
f0_method,
filter_radius,
hop_length,
inp_f0,
f0_min,
f0_max,
)
pitch = pitch[:p_len]
pitchf = pitchf[:p_len]
if self.device == "mps":
pitchf = pitchf.astype(np.float32)
pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long()
pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()
for t in opt_ts:
t = t // self.window * self.window
if pitch_guidance:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[s : t + self.t_pad2 + self.window],
pitch[:, s // self.window : (t + self.t_pad2) // self.window],
pitchf[:, s // self.window : (t + self.t_pad2) // self.window],
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
else:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[s : t + self.t_pad2 + self.window],
None,
None,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
s = t
if pitch_guidance:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[t:],
pitch[:, t // self.window :] if t is not None else pitch,
pitchf[:, t // self.window :] if t is not None else pitchf,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
else:
audio_opt.append(
self.vc(
model,
net_g,
sid,
audio_pad[t:],
None,
None,
index,
big_npy,
index_rate,
version,
protect,
)[self.t_pad_tgt : -self.t_pad_tgt]
)
audio_opt = np.concatenate(audio_opt)
if volume_envelope != 1:
audio_opt = AudioProcessor.change_rms(
audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope
)
if resample_sr >= self.sample_rate and tgt_sr != resample_sr:
audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr)
audio_max = np.abs(audio_opt).max() / 0.99
max_int16 = 32768
if audio_max > 1:
max_int16 /= audio_max
audio_opt = (audio_opt * max_int16).astype(np.int16)
del pitch, pitchf, sid
if torch.cuda.is_available():
torch.cuda.empty_cache()
return audio_opt
'''
for path, text in [[os.sep.join([current_dir, dirs[3], "infer.py"]), inference], [os.sep.join([current_dir, dirs[3], "pipeline.py"]), pipeline]]:
with open(path, 'w') as f:
f.write(text)
remove_center = '''
import numpy as np
from scipy import signal
def remove_center(input_array, samplerate, rdf=0.99999, window_size=2048, overlap=2, window_type="blackman", stereo_mode="stereo"):
# Validate input
# if input_array.ndim != 2 or input_array.shape[1] != 2:
# raise ValueError("Input must be a stereo array with shape (samples, 2)")
left = input_array[0]
right = input_array[1]
# mono = np.mean(input_array, axis=1)
# Adjust window size if input is too short
nperseg = min(window_size, len(left))
if nperseg < 16: # Minimum reasonable window size
nperseg = 16
if len(left) < 16:
# For very short inputs, just return the original with warning
import warnings
warnings.warn(f"Input too short ({len(left)} samples), returning original audio")
return left, right, left, right
noverlap = nperseg // overlap # Ensure noverlap < nperseg
if noverlap >= nperseg:
noverlap = nperseg - 1 # Ensure at least 1 sample difference
# Compute STFT
f, t, Z_left = signal.stft(left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
f, t, Z_right = signal.stft(right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
# f, t, Z_mono = signal.stft(mono, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
if stereo_mode == "mono":
Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_mono))
Z_common_right = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_mono))
else:
Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_right))
Z_common_right = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j*np.angle(Z_left))
reduction_factor = rdf
Z_new_left = Z_left - Z_common_left * reduction_factor
Z_new_right = Z_right - Z_common_right * reduction_factor
# Compute ISTFT
_, new_left = signal.istft(Z_new_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
_, new_right = signal.istft(Z_new_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
_, common_signal_left = signal.istft(Z_common_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
_, common_signal_right = signal.istft(Z_common_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
# Trim to original length
new_left = new_left[:len(left)]
new_right = new_right[:len(right)]
common_signal_left = common_signal_left[:len(left)]
common_signal_right = common_signal_right[:len(left)]
# Normalize
peak = np.max([np.abs(new_left).max(), np.abs(new_right).max()])
if peak > 1.0:
new_left = new_left / peak
new_right = new_right / peak
inverted_center_left = -common_signal_left
inverted_center_right = -common_signal_right
mixed_left = left + inverted_center_left
mixed_right = right + inverted_center_right
peak_mixed = np.max([np.abs(mixed_left).max(), np.abs(mixed_right).max()])
if peak_mixed > 1.0:
mixed_left = mixed_left / peak_mixed
mixed_right = mixed_right / peak_mixed
return common_signal_left, common_signal_right, new_left, new_right
'''
for path, text in [[os.sep.join([current_dir, dirs[11], "remove_center.py"]), remove_center]]:
with open(path, 'w') as f:
f.write(text)
lib_algorithm = {
"synthesizers" : ["synthesizers.py", '''
import torch
from torch import nn
from torch.nn.utils.weight_norm import remove_weight_norm
from typing import Optional
from .commons import slice_segments, rand_slice_segments
from .encoders import TextEncoder, PosteriorEncoder
from .generators import Generator
from .nsf import GeneratorNSF
from .residuals import ResidualCouplingBlock
class Synthesizer(nn.Module):
def __init__(
self,
spec_channels,
segment_size,
inter_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
spk_embed_dim,
gin_channels,
sr,
use_f0,
input_dim=768,
**kwargs
):
super(Synthesizer, self).__init__()
self.spec_channels = spec_channels
self.inter_channels = inter_channels
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = float(p_dropout)
self.resblock = resblock
self.resblock_kernel_sizes = resblock_kernel_sizes
self.resblock_dilation_sizes = resblock_dilation_sizes
self.upsample_rates = upsample_rates
self.upsample_initial_channel = upsample_initial_channel
self.upsample_kernel_sizes = upsample_kernel_sizes
self.segment_size = segment_size
self.gin_channels = gin_channels
self.spk_embed_dim = spk_embed_dim
self.use_f0 = use_f0
self.enc_p = TextEncoder(
inter_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
float(p_dropout),
input_dim,
f0=use_f0,
)
if use_f0:
self.dec = GeneratorNSF(
inter_channels,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels,
sr=sr,
is_half=kwargs["is_half"],
)
else:
self.dec = Generator(
inter_channels,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels,
)
self.enc_q = PosteriorEncoder(
spec_channels,
inter_channels,
hidden_channels,
5,
1,
16,
gin_channels=gin_channels,
)
self.flow = ResidualCouplingBlock(
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
)
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
def remove_weight_norm(self):
self.dec.remove_weight_norm()
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.dec._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.dec)
for hook in self.flow._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.flow)
if hasattr(self, "enc_q"):
for hook in self.enc_q._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.enc_q)
return self
@torch.jit.ignore
def forward(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
pitch: Optional[torch.Tensor] = None,
pitchf: Optional[torch.Tensor] = None,
y: torch.Tensor = None,
y_lengths: torch.Tensor = None,
ds: Optional[torch.Tensor] = None,
):
g = self.emb_g(ds).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
if y is not None:
z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g)
z_p = self.flow(z, y_mask, g=g)
z_slice, ids_slice = rand_slice_segments(z, y_lengths, self.segment_size)
if self.use_f0:
pitchf = slice_segments(pitchf, ids_slice, self.segment_size, 2)
o = self.dec(z_slice, pitchf, g=g)
else:
o = self.dec(z_slice, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
else:
return None, None, x_mask, None, (None, None, m_p, logs_p, None, None)
@torch.jit.export
def infer(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
pitch: Optional[torch.Tensor] = None,
nsff0: Optional[torch.Tensor] = None,
sid: torch.Tensor = None,
rate: Optional[torch.Tensor] = None,
):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
if rate is not None:
assert isinstance(rate, torch.Tensor)
head = int(z_p.shape[2] * (1.0 - rate.item()))
z_p = z_p[:, :, head:]
x_mask = x_mask[:, :, head:]
if self.use_f0:
nsff0 = nsff0[:, head:]
if self.use_f0:
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, nsff0, g=g)
else:
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, g=g)
return o, x_mask, (z, z_p, m_p, logs_p)
'''],
"residuals" : ["residuals.py", '''
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from typing import Optional
from .commons import get_padding, init_weights
from .modules import WaveNet
LRELU_SLOPE = 0.1
def create_conv1d_layer(channels, kernel_size, dilation):
return weight_norm(
nn.Conv1d(
channels,
channels,
kernel_size,
1,
dilation=dilation,
padding=get_padding(kernel_size, dilation),
)
)
def apply_mask(tensor, mask):
return tensor * mask if mask is not None else tensor
class ResBlockBase(nn.Module):
def __init__(self, channels, kernel_size, dilations):
super(ResBlockBase, self).__init__()
self.convs1 = nn.ModuleList(
[create_conv1d_layer(channels, kernel_size, d) for d in dilations]
)
self.convs1.apply(init_weights)
self.convs2 = nn.ModuleList(
[create_conv1d_layer(channels, kernel_size, 1) for _ in dilations]
)
self.convs2.apply(init_weights)
def forward(self, x, x_mask=None):
for c1, c2 in zip(self.convs1, self.convs2):
xt = F.leaky_relu(x, LRELU_SLOPE)
xt = apply_mask(xt, x_mask)
xt = F.leaky_relu(c1(xt), LRELU_SLOPE)
xt = apply_mask(xt, x_mask)
xt = c2(xt)
x = xt + x
return apply_mask(x, x_mask)
def remove_weight_norm(self):
for conv in self.convs1 + self.convs2:
remove_weight_norm(conv)
class ResBlock1(ResBlockBase):
def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
super(ResBlock1, self).__init__(channels, kernel_size, dilation)
class ResBlock2(ResBlockBase):
def __init__(self, channels, kernel_size=3, dilation=(1, 3)):
super(ResBlock2, self).__init__(channels, kernel_size, dilation)
class Log(nn.Module):
def forward(self, x, x_mask, reverse=False, **kwargs):
if not reverse:
y = torch.log(torch.clamp_min(x, 1e-5)) * x_mask
logdet = torch.sum(-y, [1, 2])
return y, logdet
else:
x = torch.exp(x) * x_mask
return x
class Flip(nn.Module):
def forward(self, x, *args, reverse=False, **kwargs):
x = torch.flip(x, [1])
if not reverse:
logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device)
return x, logdet
else:
return x
class ElementwiseAffine(nn.Module):
def __init__(self, channels):
super().__init__()
self.channels = channels
self.m = nn.Parameter(torch.zeros(channels, 1))
self.logs = nn.Parameter(torch.zeros(channels, 1))
def forward(self, x, x_mask, reverse=False, **kwargs):
if not reverse:
y = self.m + torch.exp(self.logs) * x
y = y * x_mask
logdet = torch.sum(self.logs * x_mask, [1, 2])
return y, logdet
else:
x = (x - self.m) * torch.exp(-self.logs) * x_mask
return x
class ResidualCouplingBlock(nn.Module):
def __init__(
self,
channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
n_flows=4,
gin_channels=0,
):
super(ResidualCouplingBlock, self).__init__()
self.channels = channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.n_flows = n_flows
self.gin_channels = gin_channels
self.flows = nn.ModuleList()
for i in range(n_flows):
self.flows.append(
ResidualCouplingLayer(
channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=gin_channels,
mean_only=True,
)
)
self.flows.append(Flip())
def forward(
self,
x: torch.Tensor,
x_mask: torch.Tensor,
g: Optional[torch.Tensor] = None,
reverse: bool = False,
):
if not reverse:
for flow in self.flows:
x, _ = flow(x, x_mask, g=g, reverse=reverse)
else:
for flow in reversed(self.flows):
x = flow.forward(x, x_mask, g=g, reverse=reverse)
return x
def remove_weight_norm(self):
for i in range(self.n_flows):
self.flows[i * 2].remove_weight_norm()
def __prepare_scriptable__(self):
for i in range(self.n_flows):
for hook in self.flows[i * 2]._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.flows[i * 2])
return self
class ResidualCouplingLayer(nn.Module):
def __init__(
self,
channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
p_dropout=0,
gin_channels=0,
mean_only=False,
):
assert channels % 2 == 0, "channels should be divisible by 2"
super().__init__()
self.channels = channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.half_channels = channels // 2
self.mean_only = mean_only
self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)
self.enc = WaveNet(
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
p_dropout=p_dropout,
gin_channels=gin_channels,
)
self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1)
self.post.weight.data.zero_()
self.post.bias.data.zero_()
def forward(self, x, x_mask, g=None, reverse=False):
x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
h = self.pre(x0) * x_mask
h = self.enc(h, x_mask, g=g)
stats = self.post(h) * x_mask
if not self.mean_only:
m, logs = torch.split(stats, [self.half_channels] * 2, 1)
else:
m = stats
logs = torch.zeros_like(m)
if not reverse:
x1 = m + x1 * torch.exp(logs) * x_mask
x = torch.cat([x0, x1], 1)
logdet = torch.sum(logs, [1, 2])
return x, logdet
else:
x1 = (x1 - m) * torch.exp(-logs) * x_mask
x = torch.cat([x0, x1], 1)
return x
def remove_weight_norm(self):
self.enc.remove_weight_norm()
'''],
"nsf" : ["nsf.py", '''
import math
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from typing import Optional
from .commons import init_weights
from .generators import SineGen
from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2
class SourceModuleHnNSF(nn.Module):
def __init__(
self,
sample_rate,
harmonic_num=0,
sine_amp=0.1,
add_noise_std=0.003,
voiced_threshod=0,
is_half=True,
):
super(SourceModuleHnNSF, self).__init__()
self.sine_amp = sine_amp
self.noise_std = add_noise_std
self.is_half = is_half
self.l_sin_gen = SineGen(
sample_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod
)
self.l_linear = nn.Linear(harmonic_num + 1, 1)
self.l_tanh = nn.Tanh()
def forward(self, x: torch.Tensor, upsample_factor: int = 1):
sine_wavs, uv, _ = self.l_sin_gen(x, upsample_factor)
sine_wavs = sine_wavs.to(dtype=self.l_linear.weight.dtype)
sine_merge = self.l_tanh(self.l_linear(sine_wavs))
return sine_merge, None, None
class GeneratorNSF(nn.Module):
def __init__(
self,
initial_channel,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels,
sr,
is_half=False,
):
super(GeneratorNSF, self).__init__()
self.num_kernels = len(resblock_kernel_sizes)
self.num_upsamples = len(upsample_rates)
self.f0_upsamp = nn.Upsample(scale_factor=math.prod(upsample_rates))
self.m_source = SourceModuleHnNSF(sample_rate=sr, harmonic_num=0, is_half=is_half)
self.conv_pre = nn.Conv1d(
initial_channel, upsample_initial_channel, 7, 1, padding=3
)
resblock_cls = ResBlock1 if resblock == "1" else ResBlock2
self.ups = nn.ModuleList()
self.noise_convs = nn.ModuleList()
channels = [
upsample_initial_channel // (2 ** (i + 1)) for i in range(len(upsample_rates))
]
stride_f0s = [
math.prod(upsample_rates[i + 1 :]) if i + 1 < len(upsample_rates) else 1
for i in range(len(upsample_rates))
]
for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
self.ups.append(
weight_norm(
nn.ConvTranspose1d(
upsample_initial_channel // (2**i),
channels[i],
k,
u,
padding=(k - u) // 2,
)
)
)
self.noise_convs.append(
nn.Conv1d(
1,
channels[i],
kernel_size=(stride_f0s[i] * 2 if stride_f0s[i] > 1 else 1),
stride=stride_f0s[i],
padding=(stride_f0s[i] // 2 if stride_f0s[i] > 1 else 0),
)
)
self.resblocks = nn.ModuleList(
[
resblock_cls(channels[i], k, d)
for i in range(len(self.ups))
for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes)
]
)
self.conv_post = nn.Conv1d(channels[-1], 1, 7, 1, padding=3, bias=False)
self.ups.apply(init_weights)
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
self.upp = math.prod(upsample_rates)
self.lrelu_slope = LRELU_SLOPE
def forward(self, x, f0, g: Optional[torch.Tensor] = None):
har_source, _, _ = self.m_source(f0, self.upp)
har_source = har_source.transpose(1, 2)
x = self.conv_pre(x)
if g is not None:
x = x + self.cond(g)
for i, (ups, noise_convs) in enumerate(zip(self.ups, self.noise_convs)):
x = F.leaky_relu(x, self.lrelu_slope)
x = ups(x)
x = x + noise_convs(har_source)
xs = sum(
[
resblock(x)
for j, resblock in enumerate(self.resblocks)
if j in range(i * self.num_kernels, (i + 1) * self.num_kernels)
]
)
x = xs / self.num_kernels
x = F.leaky_relu(x)
x = torch.tanh(self.conv_post(x))
return x
def remove_weight_norm(self):
for l in self.ups:
remove_weight_norm(l)
for l in self.resblocks:
l.remove_weight_norm()
def __prepare_scriptable__(self):
for l in self.ups:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(l)
for l in self.resblocks:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(l)
return self
'''],
"normalization" : ["normalization.py", '''
import torch
from torch import nn
from torch.nn import functional as F
class LayerNorm(nn.Module):
def __init__(self, channels, eps=1e-5):
super().__init__()
self.eps = eps
self.gamma = nn.Parameter(torch.ones(channels))
self.beta = nn.Parameter(torch.zeros(channels))
def forward(self, x):
x = x.transpose(1, -1)
x = F.layer_norm(x, (x.size(-1),), self.gamma, self.beta, self.eps)
return x.transpose(1, -1)
'''],
"modules" : ["modules.py", '''
import torch
from torch import nn
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from .commons import fused_add_tanh_sigmoid_multiply
class WaveNet(nn.Module):
def __init__(
self,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=0,
p_dropout=0,
):
super(WaveNet, self).__init__()
assert kernel_size % 2 == 1
self.hidden_channels = hidden_channels
self.kernel_size = (kernel_size,)
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.gin_channels = gin_channels
self.p_dropout = p_dropout
self.in_layers = nn.ModuleList()
self.res_skip_layers = nn.ModuleList()
self.drop = nn.Dropout(p_dropout)
if gin_channels != 0:
cond_layer = nn.Conv1d(gin_channels, 2 * hidden_channels * n_layers, 1)
self.cond_layer = weight_norm(cond_layer, name="weight")
dilations = [dilation_rate**i for i in range(n_layers)]
paddings = [(kernel_size * d - d) // 2 for d in dilations]
for i in range(n_layers):
in_layer = nn.Conv1d(
hidden_channels,
2 * hidden_channels,
kernel_size,
dilation=dilations[i],
padding=paddings[i],
)
in_layer = weight_norm(in_layer, name="weight")
self.in_layers.append(in_layer)
res_skip_channels = (
hidden_channels if i == n_layers - 1 else 2 * hidden_channels
)
res_skip_layer = nn.Conv1d(hidden_channels, res_skip_channels, 1)
res_skip_layer = weight_norm(res_skip_layer, name="weight")
self.res_skip_layers.append(res_skip_layer)
def forward(self, x, x_mask, g=None, **kwargs):
output = torch.zeros_like(x)
n_channels_tensor = torch.IntTensor([self.hidden_channels])
if g is not None:
g = self.cond_layer(g)
for i in range(self.n_layers):
x_in = self.in_layers[i](x)
if g is not None:
cond_offset = i * 2 * self.hidden_channels
g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :]
else:
g_l = torch.zeros_like(x_in)
acts = fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor)
acts = self.drop(acts)
res_skip_acts = self.res_skip_layers[i](acts)
if i < self.n_layers - 1:
res_acts = res_skip_acts[:, : self.hidden_channels, :]
x = (x + res_acts) * x_mask
output = output + res_skip_acts[:, self.hidden_channels :, :]
else:
output = output + res_skip_acts
return output * x_mask
def remove_weight_norm(self):
if self.gin_channels != 0:
remove_weight_norm(self.cond_layer)
for l in self.in_layers:
remove_weight_norm(l)
for l in self.res_skip_layers:
remove_weight_norm(l)
'''],
"generators" : ["generators.py", '''
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.weight_norm import remove_weight_norm
from torch.nn.utils.parametrizations import weight_norm
from typing import Optional
from .commons import init_weights
from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2
class Generator(nn.Module):
def __init__(
self,
initial_channel,
resblock,
resblock_kernel_sizes,
resblock_dilation_sizes,
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=0,
):
super(Generator, self).__init__()
self.num_kernels = len(resblock_kernel_sizes)
self.num_upsamples = len(upsample_rates)
self.conv_pre = nn.Conv1d(
initial_channel, upsample_initial_channel, 7, 1, padding=3
)
resblock = ResBlock1 if resblock == "1" else ResBlock2
self.ups_and_resblocks = nn.ModuleList()
for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
self.ups_and_resblocks.append(
weight_norm(
nn.ConvTranspose1d(
upsample_initial_channel // (2**i),
upsample_initial_channel // (2 ** (i + 1)),
k,
u,
padding=(k - u) // 2,
)
)
)
ch = upsample_initial_channel // (2 ** (i + 1))
for j, (k, d) in enumerate(
zip(resblock_kernel_sizes, resblock_dilation_sizes)
):
self.ups_and_resblocks.append(resblock(ch, k, d))
self.conv_post = nn.Conv1d(ch, 1, 7, 1, padding=3, bias=False)
self.ups_and_resblocks.apply(init_weights)
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
def forward(self, x: torch.Tensor, g: Optional[torch.Tensor] = None):
x = self.conv_pre(x)
if g is not None:
x = x + self.cond(g)
resblock_idx = 0
for _ in range(self.num_upsamples):
x = F.leaky_relu(x, LRELU_SLOPE)
x = self.ups_and_resblocks[resblock_idx](x)
resblock_idx += 1
xs = 0
for _ in range(self.num_kernels):
xs += self.ups_and_resblocks[resblock_idx](x)
resblock_idx += 1
x = xs / self.num_kernels
x = F.leaky_relu(x)
x = self.conv_post(x)
x = torch.tanh(x)
return x
def __prepare_scriptable__(self):
for l in self.ups_and_resblocks:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(l)
return self
def remove_weight_norm(self):
for l in self.ups_and_resblocks:
remove_weight_norm(l)
class SineGen(nn.Module):
def __init__(
self,
samp_rate,
harmonic_num=0,
sine_amp=0.1,
noise_std=0.003,
voiced_threshold=0,
flag_for_pulse=False,
):
super(SineGen, self).__init__()
self.sine_amp = sine_amp
self.noise_std = noise_std
self.harmonic_num = harmonic_num
self.dim = self.harmonic_num + 1
self.sample_rate = samp_rate
self.voiced_threshold = voiced_threshold
def _f02uv(self, f0):
uv = torch.ones_like(f0)
uv = uv * (f0 > self.voiced_threshold)
return uv
def forward(self, f0: torch.Tensor, upp: int):
with torch.no_grad():
f0 = f0[:, None].transpose(1, 2)
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device)
f0_buf[:, :, 0] = f0[:, :, 0]
f0_buf[:, :, 1:] = (
f0_buf[:, :, 0:1]
* torch.arange(2, self.harmonic_num + 2, device=f0.device)[None, None, :]
)
rad_values = (f0_buf / float(self.sample_rate)) % 1
rand_ini = torch.rand(f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device)
rand_ini[:, 0] = 0
rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini
tmp_over_one = torch.cumsum(rad_values, 1)
tmp_over_one *= upp
tmp_over_one = F.interpolate(
tmp_over_one.transpose(2, 1),
scale_factor=float(upp),
mode="linear",
align_corners=True,
).transpose(2, 1)
rad_values = F.interpolate(
rad_values.transpose(2, 1), scale_factor=float(upp), mode="nearest"
).transpose(2, 1)
tmp_over_one %= 1
tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0
cumsum_shift = torch.zeros_like(rad_values)
cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0
sine_waves = torch.sin(
torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * torch.pi
)
sine_waves = sine_waves * self.sine_amp
uv = self._f02uv(f0)
uv = F.interpolate(
uv.transpose(2, 1), scale_factor=float(upp), mode="nearest"
).transpose(2, 1)
noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3
noise = noise_amp * torch.randn_like(sine_waves)
sine_waves = sine_waves * uv + noise
return sine_waves, uv, noise
'''],
"encoders" : ["encoders.py", '''
import math
import torch
from torch import nn
from torch.nn.utils.weight_norm import remove_weight_norm
from typing import Optional
from .attentions import FFN, MultiHeadAttention
from .commons import sequence_mask
from .modules import WaveNet
from .normalization import LayerNorm
class Encoder(nn.Module):
def __init__(
self,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size=1,
p_dropout=0.0,
window_size=10,
**kwargs
):
super().__init__()
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.window_size = window_size
self.drop = nn.Dropout(p_dropout)
self.attn_layers = nn.ModuleList()
self.norm_layers_1 = nn.ModuleList()
self.ffn_layers = nn.ModuleList()
self.norm_layers_2 = nn.ModuleList()
for i in range(self.n_layers):
self.attn_layers.append(
MultiHeadAttention(
hidden_channels,
hidden_channels,
n_heads,
p_dropout=p_dropout,
window_size=window_size,
)
)
self.norm_layers_1.append(LayerNorm(hidden_channels))
self.ffn_layers.append(
FFN(
hidden_channels,
hidden_channels,
filter_channels,
kernel_size,
p_dropout=p_dropout,
)
)
self.norm_layers_2.append(LayerNorm(hidden_channels))
def forward(self, x, x_mask):
attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1)
x = x * x_mask
for i in range(self.n_layers):
y = self.attn_layers[i](x, x, attn_mask)
y = self.drop(y)
x = self.norm_layers_1[i](x + y)
y = self.ffn_layers[i](x, x_mask)
y = self.drop(y)
x = self.norm_layers_2[i](x + y)
x = x * x_mask
return x
class TextEncoder(nn.Module):
def __init__(
self,
out_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
embedding_dim,
f0=True,
):
super(TextEncoder, self).__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = float(p_dropout)
self.emb_phone = nn.Linear(embedding_dim, hidden_channels)
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0:
self.emb_pitch = nn.Embedding(256, hidden_channels)
self.encoder = Encoder(
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
float(p_dropout),
)
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(
self, phone: torch.Tensor, pitch: Optional[torch.Tensor], lengths: torch.Tensor
):
if pitch is None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
x = x * math.sqrt(self.hidden_channels)
x = self.lrelu(x)
x = torch.transpose(x, 1, -1)
x_mask = torch.unsqueeze(sequence_mask(lengths, x.size(2)), 1).to(x.dtype)
x = self.encoder(x * x_mask, x_mask)
stats = self.proj(x) * x_mask
m, logs = torch.split(stats, self.out_channels, dim=1)
return m, logs, x_mask
class PosteriorEncoder(nn.Module):
def __init__(
self,
in_channels,
out_channels,
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=0,
):
super(PosteriorEncoder, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.gin_channels = gin_channels
self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
self.enc = WaveNet(
hidden_channels,
kernel_size,
dilation_rate,
n_layers,
gin_channels=gin_channels,
)
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(
self, x: torch.Tensor, x_lengths: torch.Tensor, g: Optional[torch.Tensor] = None
):
x_mask = torch.unsqueeze(sequence_mask(x_lengths, x.size(2)), 1).to(x.dtype)
x = self.pre(x) * x_mask
x = self.enc(x, x_mask, g=g)
stats = self.proj(x) * x_mask
m, logs = torch.split(stats, self.out_channels, dim=1)
z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask
return z, m, logs, x_mask
def remove_weight_norm(self):
self.enc.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.enc._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm"
and hook.__class__.__name__ == "_WeightNorm"
):
remove_weight_norm(self.enc)
return self
'''],
"discriminators" : ["discriminators.py", '''
import torch
from torch import nn
from torch.nn import functional as F
from torch.nn.utils.parametrizations import spectral_norm, weight_norm
from .commons import get_padding
from .residuals import LRELU_SLOPE
PERIODS_V1 = [2, 3, 5, 7, 11, 17]
PERIODS_V2 = [2, 3, 5, 7, 11, 17, 23, 37]
IN_CHANNELS = [1, 32, 128, 512, 1024]
OUT_CHANNELS = [32, 128, 512, 1024, 1024]
class MultiPeriodDiscriminator(nn.Module):
def __init__(self, use_spectral_norm=False):
super(MultiPeriodDiscriminator, self).__init__()
self.discriminators = nn.ModuleList(
[DiscriminatorS(use_spectral_norm=use_spectral_norm)]
+ [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V1]
)
def forward(self, y, y_hat):
y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []
for d in self.discriminators:
y_d_r, fmap_r = d(y)
y_d_g, fmap_g = d(y_hat)
y_d_rs.append(y_d_r)
y_d_gs.append(y_d_g)
fmap_rs.append(fmap_r)
fmap_gs.append(fmap_g)
return y_d_rs, y_d_gs, fmap_rs, fmap_gs
class MultiPeriodDiscriminatorV2(nn.Module):
def __init__(self, use_spectral_norm=False):
super(MultiPeriodDiscriminatorV2, self).__init__()
self.discriminators = nn.ModuleList(
[DiscriminatorS(use_spectral_norm=use_spectral_norm)]
+ [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V2]
)
def forward(self, y, y_hat):
y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []
for d in self.discriminators:
y_d_r, fmap_r = d(y)
y_d_g, fmap_g = d(y_hat)
y_d_rs.append(y_d_r)
y_d_gs.append(y_d_g)
fmap_rs.append(fmap_r)
fmap_gs.append(fmap_g)
return y_d_rs, y_d_gs, fmap_rs, fmap_gs
class DiscriminatorS(nn.Module):
def __init__(self, use_spectral_norm=False):
super(DiscriminatorS, self).__init__()
norm_f = spectral_norm if use_spectral_norm else weight_norm
self.convs = nn.ModuleList(
[
norm_f(nn.Conv1d(1, 16, 15, 1, padding=7)),
norm_f(nn.Conv1d(16, 64, 41, 4, groups=4, padding=20)),
norm_f(nn.Conv1d(64, 256, 41, 4, groups=16, padding=20)),
norm_f(nn.Conv1d(256, 1024, 41, 4, groups=64, padding=20)),
norm_f(nn.Conv1d(1024, 1024, 41, 4, groups=256, padding=20)),
norm_f(nn.Conv1d(1024, 1024, 5, 1, padding=2)),
]
)
self.conv_post = norm_f(nn.Conv1d(1024, 1, 3, 1, padding=1))
self.lrelu = nn.LeakyReLU(LRELU_SLOPE)
def forward(self, x):
fmap = []
for conv in self.convs:
x = self.lrelu(conv(x))
fmap.append(x)
x = self.conv_post(x)
fmap.append(x)
x = torch.flatten(x, 1, -1)
return x, fmap
class DiscriminatorP(nn.Module):
def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False):
super(DiscriminatorP, self).__init__()
self.period = period
norm_f = spectral_norm if use_spectral_norm else weight_norm
self.convs = nn.ModuleList(
[
norm_f(
nn.Conv2d(
in_ch,
out_ch,
(kernel_size, 1),
(stride, 1),
padding=(get_padding(kernel_size, 1), 0),
)
)
for in_ch, out_ch in zip(IN_CHANNELS, OUT_CHANNELS)
]
)
self.conv_post = norm_f(nn.Conv2d(1024, 1, (3, 1), 1, padding=(1, 0)))
self.lrelu = nn.LeakyReLU(LRELU_SLOPE)
def forward(self, x):
fmap = []
b, c, t = x.shape
if t % self.period != 0:
n_pad = self.period - (t % self.period)
x = F.pad(x, (0, n_pad), "reflect")
x = x.view(b, c, -1, self.period)
for conv in self.convs:
x = self.lrelu(conv(x))
fmap.append(x)
x = self.conv_post(x)
fmap.append(x)
x = torch.flatten(x, 1, -1)
return x, fmap
'''],
"commons" : ["commons.py", '''
import math
import torch
from torch.nn import functional as F
from typing import List, Optional
def init_weights(m, mean=0.0, std=0.01):
classname = m.__class__.__name__
if classname.find("Conv") != -1:
m.weight.data.normal_(mean, std)
def get_padding(kernel_size, dilation=1):
return int((kernel_size * dilation - dilation) / 2)
def convert_pad_shape(pad_shape):
l = pad_shape[::-1]
pad_shape = [item for sublist in l for item in sublist]
return pad_shape
def kl_divergence(m_p, logs_p, m_q, logs_q):
kl = (logs_q - logs_p) - 0.5
kl += 0.5 * (torch.exp(2.0 * logs_p) + ((m_p - m_q) ** 2)) * torch.exp(-2.0 * logs_q)
return kl
def slice_segments(
x: torch.Tensor, ids_str: torch.Tensor, segment_size: int = 4, dim: int = 2
):
if dim == 2:
ret = torch.zeros_like(x[:, :segment_size])
elif dim == 3:
ret = torch.zeros_like(x[:, :, :segment_size])
for i in range(x.size(0)):
idx_str = ids_str[i].item()
idx_end = idx_str + segment_size
if dim == 2:
ret[i] = x[i, idx_str:idx_end]
else:
ret[i] = x[i, :, idx_str:idx_end]
return ret
def rand_slice_segments(x, x_lengths=None, segment_size=4):
b, d, t = x.size()
if x_lengths is None:
x_lengths = t
ids_str_max = x_lengths - segment_size + 1
ids_str = (torch.rand([b]).to(device=x.device) * ids_str_max).to(dtype=torch.long)
ret = slice_segments(x, ids_str, segment_size, dim=3)
return ret, ids_str
def get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4):
position = torch.arange(length, dtype=torch.float)
num_timescales = channels // 2
log_timescale_increment = math.log(float(max_timescale) / float(min_timescale)) / (
num_timescales - 1
)
inv_timescales = min_timescale * torch.exp(
torch.arange(num_timescales, dtype=torch.float) * -log_timescale_increment
)
scaled_time = position.unsqueeze(0) * inv_timescales.unsqueeze(1)
signal = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], 0)
signal = F.pad(signal, [0, 0, 0, channels % 2])
signal = signal.view(1, channels, length)
return signal
def subsequent_mask(length):
mask = torch.tril(torch.ones(length, length)).unsqueeze(0).unsqueeze(0)
return mask
@torch.jit.script
def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):
n_channels_int = n_channels[0]
in_act = input_a + input_b
t_act = torch.tanh(in_act[:, :n_channels_int, :])
s_act = torch.sigmoid(in_act[:, n_channels_int:, :])
acts = t_act * s_act
return acts
def sequence_mask(length: torch.Tensor, max_length: Optional[int] = None):
if max_length is None:
max_length = length.max()
x = torch.arange(max_length, dtype=length.dtype, device=length.device)
return x.unsqueeze(0) < length.unsqueeze(1)
def clip_grad_value(parameters, clip_value, norm_type=2):
if isinstance(parameters, torch.Tensor):
parameters = [parameters]
parameters = List(filter(lambda p: p.grad is not None, parameters))
norm_type = float(norm_type)
if clip_value is not None:
clip_value = float(clip_value)
total_norm = 0
for p in parameters:
param_norm = p.grad.data.norm(norm_type)
total_norm += param_norm.item() ** norm_type
if clip_value is not None:
p.grad.data.clamp_(min=-clip_value, max=clip_value)
total_norm = total_norm ** (1.0 / norm_type)
return total_norm
'''],
"attentions" : ["attentions.py", '''
import math
import torch
from torch import nn
from torch.nn import functional as F
from .commons import convert_pad_shape
class MultiHeadAttention(nn.Module):
def __init__(
self,
channels,
out_channels,
n_heads,
p_dropout=0.0,
window_size=None,
heads_share=True,
block_length=None,
proximal_bias=False,
proximal_init=False,
):
super().__init__()
assert channels % n_heads == 0
self.channels = channels
self.out_channels = out_channels
self.n_heads = n_heads
self.p_dropout = p_dropout
self.window_size = window_size
self.heads_share = heads_share
self.block_length = block_length
self.proximal_bias = proximal_bias
self.proximal_init = proximal_init
self.attn = None
self.k_channels = channels // n_heads
self.conv_q = nn.Conv1d(channels, channels, 1)
self.conv_k = nn.Conv1d(channels, channels, 1)
self.conv_v = nn.Conv1d(channels, channels, 1)
self.conv_o = nn.Conv1d(channels, out_channels, 1)
self.drop = nn.Dropout(p_dropout)
if window_size is not None:
n_heads_rel = 1 if heads_share else n_heads
rel_stddev = self.k_channels**-0.5
self.emb_rel_k = nn.Parameter(
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
* rel_stddev
)
self.emb_rel_v = nn.Parameter(
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels)
* rel_stddev
)
nn.init.xavier_uniform_(self.conv_q.weight)
nn.init.xavier_uniform_(self.conv_k.weight)
nn.init.xavier_uniform_(self.conv_v.weight)
if proximal_init:
with torch.no_grad():
self.conv_k.weight.copy_(self.conv_q.weight)
self.conv_k.bias.copy_(self.conv_q.bias)
def forward(self, x, c, attn_mask=None):
q = self.conv_q(x)
k = self.conv_k(c)
v = self.conv_v(c)
x, self.attn = self.attention(q, k, v, mask=attn_mask)
x = self.conv_o(x)
return x
def attention(self, query, key, value, mask=None):
b, d, t_s, t_t = (*key.size(), query.size(2))
query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3)
key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1))
if self.window_size is not None:
assert t_s == t_t, "Relative attention is only available for self-attention."
key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s)
rel_logits = self._matmul_with_relative_keys(
query / math.sqrt(self.k_channels), key_relative_embeddings
)
scores_local = self._relative_position_to_absolute_position(rel_logits)
scores = scores + scores_local
if self.proximal_bias:
assert t_s == t_t, "Proximal bias is only available for self-attention."
scores = scores + self._attention_bias_proximal(t_s).to(
device=scores.device, dtype=scores.dtype
)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e4)
if self.block_length is not None:
assert t_s == t_t, "Local attention is only available for self-attention."
block_mask = (
torch.ones_like(scores)
.triu(-self.block_length)
.tril(self.block_length)
)
scores = scores.masked_fill(block_mask == 0, -1e4)
p_attn = F.softmax(scores, dim=-1)
p_attn = self.drop(p_attn)
output = torch.matmul(p_attn, value)
if self.window_size is not None:
relative_weights = self._absolute_position_to_relative_position(p_attn)
value_relative_embeddings = self._get_relative_embeddings(self.emb_rel_v, t_s)
output = output + self._matmul_with_relative_values(
relative_weights, value_relative_embeddings
)
output = output.transpose(2, 3).contiguous().view(b, d, t_t)
return output, p_attn
def _matmul_with_relative_values(self, x, y):
ret = torch.matmul(x, y.unsqueeze(0))
return ret
def _matmul_with_relative_keys(self, x, y):
ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1))
return ret
def _get_relative_embeddings(self, relative_embeddings, length):
pad_length = max(length - (self.window_size + 1), 0)
slice_start_position = max((self.window_size + 1) - length, 0)
slice_end_position = slice_start_position + 2 * length - 1
if pad_length > 0:
padded_relative_embeddings = F.pad(
relative_embeddings,
convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]),
)
else:
padded_relative_embeddings = relative_embeddings
used_relative_embeddings = padded_relative_embeddings[
:, slice_start_position:slice_end_position
]
return used_relative_embeddings
def _relative_position_to_absolute_position(self, x):
batch, heads, length, _ = x.size()
x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]]))
x_flat = x.view([batch, heads, length * 2 * length])
x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [0, length - 1]]))
x_final = x_flat.view([batch, heads, length + 1, 2 * length - 1])[
:, :, :length, length - 1 :
]
return x_final
def _absolute_position_to_relative_position(self, x):
batch, heads, length, _ = x.size()
x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, length - 1]]))
x_flat = x.view([batch, heads, length**2 + length * (length - 1)])
x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [length, 0]]))
x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:]
return x_final
def _attention_bias_proximal(self, length):
r = torch.arange(length, dtype=torch.float32)
diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1)
return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0)
class FFN(nn.Module):
def __init__(
self,
in_channels,
out_channels,
filter_channels,
kernel_size,
p_dropout=0.0,
activation=None,
causal=False,
):
super().__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.filter_channels = filter_channels
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.activation = activation
self.causal = causal
if causal:
self.padding = self._causal_padding
else:
self.padding = self._same_padding
self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size)
self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size)
self.drop = nn.Dropout(p_dropout)
def forward(self, x, x_mask):
x = self.conv_1(self.padding(x * x_mask))
if self.activation == "gelu":
x = x * torch.sigmoid(1.702 * x)
else:
x = torch.relu(x)
x = self.drop(x)
x = self.conv_2(self.padding(x * x_mask))
return x * x_mask
def _causal_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = self.kernel_size - 1
pad_r = 0
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, convert_pad_shape(padding))
return x
def _same_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = (self.kernel_size - 1) // 2
pad_r = self.kernel_size // 2
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, convert_pad_shape(padding))
return x
'''],
"init" : ["__init__.py", '''
''']
}
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["synthesizers"][0]]), 'w') as f:
f.write(lib_algorithm["synthesizers"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["residuals"][0]]), 'w') as f:
f.write(lib_algorithm["residuals"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["nsf"][0]]), 'w') as f:
f.write(lib_algorithm["nsf"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["normalization"][0]]), 'w') as f:
f.write(lib_algorithm["normalization"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["modules"][0]]), 'w') as f:
f.write(lib_algorithm["modules"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["generators"][0]]), 'w') as f:
f.write(lib_algorithm["generators"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["encoders"][0]]), 'w') as f:
f.write(lib_algorithm["encoders"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["discriminators"][0]]), 'w') as f:
f.write(lib_algorithm["discriminators"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["commons"][0]]), 'w') as f:
f.write(lib_algorithm["commons"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["attentions"][0]]), 'w') as f:
f.write(lib_algorithm["attentions"][1])
with open(os.sep.join([current_dir, dirs[5], lib_algorithm["init"][0]]), 'w') as f:
f.write(lib_algorithm["init"][1])
RMVPE = '''
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from librosa.filters import mel
from scipy.signal import get_window
from librosa.util import pad_center, tiny, normalize
def window_sumsquare(
window,
n_frames,
hop_length=200,
win_length=800,
n_fft=800,
dtype=np.float32,
norm=None,
):
if win_length is None:
win_length = n_fft
n = n_fft + hop_length * (n_frames - 1)
x = np.zeros(n, dtype=dtype)
win_sq = get_window(window, win_length, fftbins=True)
win_sq = normalize(win_sq, norm=norm) ** 2
win_sq = pad_center(win_sq, n_fft)
for i in range(n_frames):
sample = i * hop_length
x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))]
return x
class STFT(nn.Module):
def __init__(
self, filter_length=1024, hop_length=512, win_length=None, window="hann"
):
super(STFT, self).__init__()
self.filter_length = filter_length
self.hop_length = hop_length
self.win_length = win_length if win_length else filter_length
self.window = window
self.pad_amount = int(self.filter_length / 2)
scale = self.filter_length / self.hop_length
fourier_basis = np.fft.fft(np.eye(self.filter_length))
cutoff = int((self.filter_length / 2 + 1))
fourier_basis = np.vstack(
[np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])]
)
forward_basis = torch.FloatTensor(fourier_basis[:, None, :])
inverse_basis = torch.FloatTensor(
np.linalg.pinv(scale * fourier_basis).T[:, None, :]
)
assert filter_length >= self.win_length
fft_window = get_window(window, self.win_length, fftbins=True)
fft_window = pad_center(fft_window, size=filter_length)
fft_window = torch.from_numpy(fft_window).float()
forward_basis *= fft_window
inverse_basis *= fft_window
self.register_buffer("forward_basis", forward_basis.float())
self.register_buffer("inverse_basis", inverse_basis.float())
def transform(self, input_data):
num_batches = input_data.shape[0]
num_samples = input_data.shape[-1]
input_data = input_data.view(num_batches, 1, num_samples)
input_data = F.pad(
input_data.unsqueeze(1),
(self.pad_amount, self.pad_amount, 0, 0, 0, 0),
mode="reflect",
).squeeze(1)
forward_transform = F.conv1d(
input_data, self.forward_basis, stride=self.hop_length, padding=0
)
cutoff = int((self.filter_length / 2) + 1)
real_part = forward_transform[:, :cutoff, :]
imag_part = forward_transform[:, cutoff:, :]
return torch.sqrt(real_part**2 + imag_part**2)
def inverse(self, magnitude, phase):
recombine_magnitude_phase = torch.cat(
[magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1
)
inverse_transform = F.conv_transpose1d(
recombine_magnitude_phase,
self.inverse_basis,
stride=self.hop_length,
padding=0,
)
if self.window is not None:
window_sum = window_sumsquare(
self.window,
magnitude.size(-1),
hop_length=self.hop_length,
win_length=self.win_length,
n_fft=self.filter_length,
dtype=np.float32,
)
approx_nonzero_indices = torch.from_numpy(
np.where(window_sum > tiny(window_sum))[0]
)
window_sum = torch.from_numpy(window_sum).to(inverse_transform.device)
inverse_transform[:, :, approx_nonzero_indices] /= window_sum[
approx_nonzero_indices
]
inverse_transform *= float(self.filter_length) / self.hop_length
inverse_transform = inverse_transform[..., self.pad_amount :]
inverse_transform = inverse_transform[..., : self.num_samples]
return inverse_transform.squeeze(1)
def forward(self, input_data):
self.magnitude, self.phase = self.transform(input_data)
return self.inverse(self.magnitude, self.phase)
class BiGRU(nn.Module):
def __init__(self, input_features, hidden_features, num_layers):
super(BiGRU, self).__init__()
self.gru = nn.GRU(
input_features,
hidden_features,
num_layers=num_layers,
batch_first=True,
bidirectional=True,
)
def forward(self, x):
return self.gru(x)[0]
class ConvBlockRes(nn.Module):
def __init__(self, in_channels, out_channels, momentum=0.01):
super(ConvBlockRes, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=(1, 1),
padding=(1, 1),
bias=False,
),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
nn.Conv2d(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=(1, 1),
padding=(1, 1),
bias=False,
),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
)
self.shortcut = (
nn.Conv2d(in_channels, out_channels, (1, 1))
if in_channels != out_channels
else None
)
def forward(self, x):
out = self.conv(x)
if self.shortcut is not None:
x = self.shortcut(x)
return out + x
class ResEncoderBlock(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, n_blocks=1, momentum=0.01):
super(ResEncoderBlock, self).__init__()
self.conv = nn.ModuleList(
[
ConvBlockRes(
in_channels if i == 0 else out_channels, out_channels, momentum
)
for i in range(n_blocks)
]
)
self.pool = (
nn.AvgPool2d(kernel_size=kernel_size) if kernel_size is not None else None
)
def forward(self, x):
for conv in self.conv:
x = conv(x)
pooled = self.pool(x) if self.pool is not None else x
return pooled, x
class Encoder(nn.Module):
def __init__(
self,
in_channels,
in_size,
n_encoders,
kernel_size,
n_blocks,
out_channels=16,
momentum=0.01,
):
super(Encoder, self).__init__()
self.bn = nn.BatchNorm2d(in_channels, momentum=momentum)
self.layers = nn.ModuleList()
self.latent_channels = []
for _ in range(n_encoders):
self.layers.append(
ResEncoderBlock(
in_channels, out_channels, kernel_size, n_blocks, momentum=momentum
)
)
self.latent_channels.append([out_channels, in_size])
in_channels = out_channels
out_channels *= 2
in_size //= 2
self.out_size = in_size
self.out_channel = out_channels
def forward(self, x):
concat_tensors = []
x = self.bn(x)
for layer in self.layers:
x, pooled = layer(x)
concat_tensors.append(pooled)
return x, concat_tensors
class Intermediate(nn.Module):
def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01):
super(Intermediate, self).__init__()
self.layers = nn.ModuleList(
[
ResEncoderBlock(
in_channels if i == 0 else out_channels,
out_channels,
None,
n_blocks,
momentum,
)
for i in range(n_inters)
]
)
def forward(self, x):
for layer in self.layers:
_, x = layer(x)
return x
class ResDecoderBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01):
super(ResDecoderBlock, self).__init__()
out_padding = (0, 1) if stride == (1, 2) else (1, 1)
self.conv1 = nn.Sequential(
nn.ConvTranspose2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=(3, 3),
stride=stride,
padding=(1, 1),
output_padding=out_padding,
bias=False,
),
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
)
self.conv2 = nn.ModuleList(
[
ConvBlockRes(
out_channels * 2 if i == 0 else out_channels, out_channels, momentum
)
for i in range(n_blocks)
]
)
def forward(self, x, concat_tensor):
x = self.conv1(x)
x = torch.cat((x, concat_tensor), dim=1)
for conv in self.conv2:
x = conv(x)
return x
class Decoder(nn.Module):
def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01):
super(Decoder, self).__init__()
self.layers = nn.ModuleList()
for _ in range(n_decoders):
out_channels = in_channels // 2
self.layers.append(
ResDecoderBlock(in_channels, out_channels, stride, n_blocks, momentum)
)
in_channels = out_channels
def forward(self, x, concat_tensors):
for layer, concat_tensor in zip(self.layers, reversed(concat_tensors)):
x = layer(x, concat_tensor)
return x
class DeepUnet(nn.Module):
def __init__(
self,
kernel_size,
n_blocks,
en_de_layers=5,
inter_layers=4,
in_channels=1,
en_out_channels=16,
):
super(DeepUnet, self).__init__()
self.encoder = Encoder(
in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels
)
self.intermediate = Intermediate(
self.encoder.out_channel // 2,
self.encoder.out_channel,
inter_layers,
n_blocks,
)
self.decoder = Decoder(
self.encoder.out_channel, en_de_layers, kernel_size, n_blocks
)
def forward(self, x):
x, concat_tensors = self.encoder(x)
x = self.intermediate(x)
return self.decoder(x, concat_tensors)
class E2E(nn.Module):
def __init__(
self,
n_blocks,
n_gru,
kernel_size,
en_de_layers=5,
inter_layers=4,
in_channels=1,
en_out_channels=16,
):
super(E2E, self).__init__()
self.unet = DeepUnet(
kernel_size,
n_blocks,
en_de_layers,
inter_layers,
in_channels,
en_out_channels,
)
self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1))
if n_gru:
self.fc = nn.Sequential(
BiGRU(3 * 128, 256, n_gru),
nn.Linear(512, 360),
nn.Dropout(0.25),
nn.Sigmoid(),
)
else:
self.fc = nn.Sequential(
nn.Linear(3 * nn.N_MELS, nn.N_CLASS), nn.Dropout(0.25), nn.Sigmoid()
)
def forward(self, mel):
mel = mel.transpose(-1, -2).unsqueeze(1)
x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2)
return self.fc(x)
class MelSpectrogram(nn.Module):
def __init__(
self,
is_half,
n_mel_channels,
sample_rate,
win_length,
hop_length,
n_fft=None,
mel_fmin=0,
mel_fmax=None,
clamp=1e-5,
):
super(MelSpectrogram, self).__init__()
n_fft = win_length if n_fft is None else n_fft
self.hann_window = {}
mel_basis = mel(
sr=sample_rate,
n_fft=n_fft,
n_mels=n_mel_channels,
fmin=mel_fmin,
fmax=mel_fmax,
htk=True,
)
self.register_buffer("mel_basis", torch.from_numpy(mel_basis).float())
self.n_fft = n_fft
self.hop_length = hop_length
self.win_length = win_length
self.sample_rate = sample_rate
self.n_mel_channels = n_mel_channels
self.clamp = clamp
self.is_half = is_half
def forward(self, audio, keyshift=0, speed=1, center=True):
factor = 2 ** (keyshift / 12)
n_fft_new = int(np.round(self.n_fft * factor))
win_length_new = int(np.round(self.win_length * factor))
hop_length_new = int(np.round(self.hop_length * speed))
keyshift_key = f"{keyshift}_{audio.device}"
if keyshift_key not in self.hann_window:
self.hann_window[keyshift_key] = torch.hann_window(win_length_new).to(
audio.device
)
if not hasattr(self, "stft"):
self.stft = STFT(
filter_length=n_fft_new,
hop_length=hop_length_new,
win_length=win_length_new,
window="hann",
).to(audio.device)
magnitude = self.stft.transform(audio)
if keyshift != 0:
size = self.n_fft // 2 + 1
resize = magnitude.size(1)
if resize < size:
magnitude = F.pad(magnitude, (0, 0, 0, size - resize))
magnitude = magnitude[:, :size, :] * self.win_length / win_length_new
mel_output = torch.matmul(self.mel_basis, magnitude)
if self.is_half:
mel_output = mel_output.half()
return torch.log(torch.clamp(mel_output, min=self.clamp))
class RMVPE0Predictor:
def __init__(self, model_path, is_half, device=None):
self.resample_kernel = {}
self.is_half = is_half
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.mel_extractor = MelSpectrogram(
is_half, 128, 16000, 1024, 160, None, 30, 8000
).to(device)
model = E2E(4, 1, (2, 2))
ckpt = torch.load(model_path, map_location="cpu", weights_only=True)
model.load_state_dict(ckpt)
model.eval()
if is_half:
model = model.half()
self.model = model.to(device)
self.cents_mapping = np.pad(20 * np.arange(360) + 1997.3794084376191, (4, 4))
def mel2hidden(self, mel):
with torch.no_grad():
n_frames = mel.shape[-1]
mel = mel.float()
padding = min(32 * ((n_frames - 1) // 32 + 1) - n_frames, n_frames)
mel = F.pad(mel, (0, padding), mode="reflect")
if self.is_half:
mel = mel.half()
hidden = self.model(mel)
return hidden[:, :n_frames]
def decode(self, hidden, thred=0.03):
cents_pred = self.to_local_average_cents(hidden, thred=thred)
f0 = 10 * (2 ** (cents_pred / 1200))
f0[f0 == 10] = 0
return f0
def infer_from_audio(self, audio, thred=0.03):
audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)
mel = self.mel_extractor(audio, center=True)
hidden = self.mel2hidden(mel)
hidden = hidden.squeeze(0).cpu().numpy()
if self.is_half:
hidden = hidden.astype("float32")
return self.decode(hidden, thred=thred)
def infer_from_audio_with_pitch(self, audio, thred=0.03, f0_min=50, f0_max=1100):
audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0)
mel = self.mel_extractor(audio, center=True)
hidden = self.mel2hidden(mel)
hidden = hidden.squeeze(0).cpu().numpy()
if self.is_half:
hidden = hidden.astype("float32")
f0 = self.decode(hidden, thred=thred)
f0[(f0 < f0_min) | (f0 > f0_max)] = 0
return f0
def to_local_average_cents(self, salience, thred=0.05):
center = np.argmax(salience, axis=1)
salience = np.pad(salience, ((0, 0), (4, 4)))
center += 4
todo_salience = []
todo_cents_mapping = []
starts = center - 4
ends = center + 5
for idx in range(salience.shape[0]):
todo_salience.append(salience[:, starts[idx] : ends[idx]][idx])
todo_cents_mapping.append(self.cents_mapping[starts[idx] : ends[idx]])
todo_salience = np.array(todo_salience)
todo_cents_mapping = np.array(todo_cents_mapping)
product_sum = np.sum(todo_salience * todo_cents_mapping, 1)
weight_sum = np.sum(todo_salience, 1)
divided = product_sum / weight_sum
maxx = np.max(salience, axis=1)
divided[maxx <= thred] = 0
return divided
'''
with open(os.sep.join([current_dir, dirs[6], "RMVPE.py"]), 'w') as f:
f.write(RMVPE)
FCPE = '''
from typing import Union
import torch.nn.functional as F
import numpy as np
import torch
import torch.nn as nn
from torch.nn.utils.parametrizations import weight_norm
from torchaudio.transforms import Resample
import os
import librosa
import soundfile as sf
import torch.utils.data
from librosa.filters import mel as librosa_mel_fn
import math
from functools import partial
from einops import rearrange, repeat
from local_attention import LocalAttention
os.environ["LRU_CACHE_CAPACITY"] = "3"
def load_wav_to_torch(full_path, target_sr=None, return_empty_on_exception=False):
try:
data, sample_rate = sf.read(full_path, always_2d=True)
except Exception as error:
print(f"An error occurred loading {full_path}: {error}")
if return_empty_on_exception:
return [], sample_rate or target_sr or 48000
else:
raise
data = data[:, 0] if len(data.shape) > 1 else data
assert len(data) > 2
max_mag = (
-np.iinfo(data.dtype).min
if np.issubdtype(data.dtype, np.integer)
else max(np.amax(data), -np.amin(data))
)
max_mag = (
(2**31) + 1 if max_mag > (2**15) else ((2**15) + 1 if max_mag > 1.01 else 1.0)
)
data = torch.FloatTensor(data.astype(np.float32)) / max_mag
if (torch.isinf(data) | torch.isnan(data)).any() and return_empty_on_exception:
return [], sample_rate or target_sr or 48000
if target_sr is not None and sample_rate != target_sr:
data = torch.from_numpy(
librosa.core.resample(data.numpy(), orig_sr=sample_rate, target_sr=target_sr)
)
sample_rate = target_sr
return data, sample_rate
def dynamic_range_compression(x, C=1, clip_val=1e-5):
return np.log(np.clip(x, a_min=clip_val, a_max=None) * C)
def dynamic_range_decompression(x, C=1):
return np.exp(x) / C
def dynamic_range_compression_torch(x, C=1, clip_val=1e-5):
return torch.log(torch.clamp(x, min=clip_val) * C)
def dynamic_range_decompression_torch(x, C=1):
return torch.exp(x) / C
class STFT:
def __init__(
self,
sr=22050,
n_mels=80,
n_fft=1024,
win_size=1024,
hop_length=256,
fmin=20,
fmax=11025,
clip_val=1e-5,
):
self.target_sr = sr
self.n_mels = n_mels
self.n_fft = n_fft
self.win_size = win_size
self.hop_length = hop_length
self.fmin = fmin
self.fmax = fmax
self.clip_val = clip_val
self.mel_basis = {}
self.hann_window = {}
def get_mel(self, y, keyshift=0, speed=1, center=False, train=False):
sample_rate = self.target_sr
n_mels = self.n_mels
n_fft = self.n_fft
win_size = self.win_size
hop_length = self.hop_length
fmin = self.fmin
fmax = self.fmax
clip_val = self.clip_val
factor = 2 ** (keyshift / 12)
n_fft_new = int(np.round(n_fft * factor))
win_size_new = int(np.round(win_size * factor))
hop_length_new = int(np.round(hop_length * speed))
mel_basis = self.mel_basis if not train else {}
hann_window = self.hann_window if not train else {}
mel_basis_key = str(fmax) + "_" + str(y.device)
if mel_basis_key not in mel_basis:
mel = librosa_mel_fn(
sr=sample_rate, n_fft=n_fft, n_mels=n_mels, fmin=fmin, fmax=fmax
)
mel_basis[mel_basis_key] = torch.from_numpy(mel).float().to(y.device)
keyshift_key = str(keyshift) + "_" + str(y.device)
if keyshift_key not in hann_window:
hann_window[keyshift_key] = torch.hann_window(win_size_new).to(y.device)
pad_left = (win_size_new - hop_length_new) // 2
pad_right = max(
(win_size_new - hop_length_new + 1) // 2,
win_size_new - y.size(-1) - pad_left,
)
mode = "reflect" if pad_right < y.size(-1) else "constant"
y = torch.nn.functional.pad(y.unsqueeze(1), (pad_left, pad_right), mode=mode)
y = y.squeeze(1)
spec = torch.stft(
y,
n_fft_new,
hop_length=hop_length_new,
win_length=win_size_new,
window=hann_window[keyshift_key],
center=center,
pad_mode="reflect",
normalized=False,
onesided=True,
return_complex=True,
)
spec = torch.sqrt(spec.real.pow(2) + spec.imag.pow(2) + (1e-9))
if keyshift != 0:
size = n_fft // 2 + 1
resize = spec.size(1)
spec = (
F.pad(spec, (0, 0, 0, size - resize))
if resize < size
else spec[:, :size, :]
)
spec = spec * win_size / win_size_new
spec = torch.matmul(mel_basis[mel_basis_key], spec)
spec = dynamic_range_compression_torch(spec, clip_val=clip_val)
return spec
def __call__(self, audiopath):
audio, sr = load_wav_to_torch(audiopath, target_sr=self.target_sr)
spect = self.get_mel(audio.unsqueeze(0)).squeeze(0)
return spect
stft = STFT()
def softmax_kernel(
data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device=None
):
b, h, *_ = data.shape
data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.0
ratio = projection_matrix.shape[0] ** -0.5
projection = repeat(projection_matrix, "j d -> b h j d", b=b, h=h)
projection = projection.type_as(data)
data_dash = torch.einsum("...id,...jd->...ij", (data_normalizer * data), projection)
diag_data = data**2
diag_data = torch.sum(diag_data, dim=-1)
diag_data = (diag_data / 2.0) * (data_normalizer**2)
diag_data = diag_data.unsqueeze(dim=-1)
if is_query:
data_dash = ratio * (
torch.exp(
data_dash - diag_data - torch.max(data_dash, dim=-1, keepdim=True).values
)
+ eps
)
else:
data_dash = ratio * (torch.exp(data_dash - diag_data + eps))
return data_dash.type_as(data)
def orthogonal_matrix_chunk(cols, qr_uniform_q=False, device=None):
unstructured_block = torch.randn((cols, cols), device=device)
q, r = torch.linalg.qr(unstructured_block.cpu(), mode="reduced")
q, r = map(lambda t: t.to(device), (q, r))
if qr_uniform_q:
d = torch.diag(r, 0)
q *= d.sign()
return q.t()
def exists(val):
return val is not None
def empty(tensor):
return tensor.numel() == 0
def default(val, d):
return val if exists(val) else d
def cast_tuple(val):
return (val,) if not isinstance(val, tuple) else val
class PCmer(nn.Module):
def __init__(
self,
num_layers,
num_heads,
dim_model,
dim_keys,
dim_values,
residual_dropout,
attention_dropout,
):
super().__init__()
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_values = dim_values
self.dim_keys = dim_keys
self.residual_dropout = residual_dropout
self.attention_dropout = attention_dropout
self._layers = nn.ModuleList([_EncoderLayer(self) for _ in range(num_layers)])
def forward(self, phone, mask=None):
for layer in self._layers:
phone = layer(phone, mask)
return phone
class _EncoderLayer(nn.Module):
def __init__(self, parent: PCmer):
super().__init__()
self.conformer = ConformerConvModule(parent.dim_model)
self.norm = nn.LayerNorm(parent.dim_model)
self.dropout = nn.Dropout(parent.residual_dropout)
self.attn = SelfAttention(
dim=parent.dim_model, heads=parent.num_heads, causal=False
)
def forward(self, phone, mask=None):
phone = phone + (self.attn(self.norm(phone), mask=mask))
phone = phone + (self.conformer(phone))
return phone
def calc_same_padding(kernel_size):
pad = kernel_size // 2
return (pad, pad - (kernel_size + 1) % 2)
class Swish(nn.Module):
def forward(self, x):
return x * x.sigmoid()
class Transpose(nn.Module):
def __init__(self, dims):
super().__init__()
assert len(dims) == 2, "dims must be a tuple of two dimensions"
self.dims = dims
def forward(self, x):
return x.transpose(*self.dims)
class GLU(nn.Module):
def __init__(self, dim):
super().__init__()
self.dim = dim
def forward(self, x):
out, gate = x.chunk(2, dim=self.dim)
return out * gate.sigmoid()
class DepthWiseConv1d(nn.Module):
def __init__(self, chan_in, chan_out, kernel_size, padding):
super().__init__()
self.padding = padding
self.conv = nn.Conv1d(chan_in, chan_out, kernel_size, groups=chan_in)
def forward(self, x):
x = F.pad(x, self.padding)
return self.conv(x)
class ConformerConvModule(nn.Module):
def __init__(
self, dim, causal=False, expansion_factor=2, kernel_size=31, dropout=0.0
):
super().__init__()
inner_dim = dim * expansion_factor
padding = calc_same_padding(kernel_size) if not causal else (kernel_size - 1, 0)
self.net = nn.Sequential(
nn.LayerNorm(dim),
Transpose((1, 2)),
nn.Conv1d(dim, inner_dim * 2, 1),
GLU(dim=1),
DepthWiseConv1d(
inner_dim, inner_dim, kernel_size=kernel_size, padding=padding
),
Swish(),
nn.Conv1d(inner_dim, dim, 1),
Transpose((1, 2)),
nn.Dropout(dropout),
)
def forward(self, x):
return self.net(x)
def linear_attention(q, k, v):
if v is None:
out = torch.einsum("...ed,...nd->...ne", k, q)
return out
else:
k_cumsum = k.sum(dim=-2)
D_inv = 1.0 / (torch.einsum("...nd,...d->...n", q, k_cumsum.type_as(q)) + 1e-8)
context = torch.einsum("...nd,...ne->...de", k, v)
out = torch.einsum("...de,...nd,...n->...ne", context, q, D_inv)
return out
def gaussian_orthogonal_random_matrix(
nb_rows, nb_columns, scaling=0, qr_uniform_q=False, device=None
):
nb_full_blocks = int(nb_rows / nb_columns)
block_list = []
for _ in range(nb_full_blocks):
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device)
block_list.append(q)
remaining_rows = nb_rows - nb_full_blocks * nb_columns
if remaining_rows > 0:
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device)
block_list.append(q[:remaining_rows])
final_matrix = torch.cat(block_list)
if scaling == 0:
multiplier = torch.randn((nb_rows, nb_columns), device=device).norm(dim=1)
elif scaling == 1:
multiplier = math.sqrt((float(nb_columns))) * torch.ones(
(nb_rows,), device=device
)
else:
raise ValueError(f"Invalid scaling {scaling}")
return torch.diag(multiplier) @ final_matrix
class FastAttention(nn.Module):
def __init__(
self,
dim_heads,
nb_features=None,
ortho_scaling=0,
causal=False,
generalized_attention=False,
kernel_fn=nn.ReLU(),
qr_uniform_q=False,
no_projection=False,
):
super().__init__()
nb_features = default(nb_features, int(dim_heads * math.log(dim_heads)))
self.dim_heads = dim_heads
self.nb_features = nb_features
self.ortho_scaling = ortho_scaling
self.create_projection = partial(
gaussian_orthogonal_random_matrix,
nb_rows=self.nb_features,
nb_columns=dim_heads,
scaling=ortho_scaling,
qr_uniform_q=qr_uniform_q,
)
projection_matrix = self.create_projection()
self.register_buffer("projection_matrix", projection_matrix)
self.generalized_attention = generalized_attention
self.kernel_fn = kernel_fn
self.no_projection = no_projection
self.causal = causal
@torch.no_grad()
def redraw_projection_matrix(self):
projections = self.create_projection()
self.projection_matrix.copy_(projections)
del projections
def forward(self, q, k, v):
device = q.device
if self.no_projection:
q = q.softmax(dim=-1)
k = torch.exp(k) if self.causal else k.softmax(dim=-2)
else:
create_kernel = partial(
softmax_kernel, projection_matrix=self.projection_matrix, device=device
)
q = create_kernel(q, is_query=True)
k = create_kernel(k, is_query=False)
attn_fn = linear_attention if not self.causal else self.causal_linear_fn
if v is None:
out = attn_fn(q, k, None)
return out
else:
out = attn_fn(q, k, v)
return out
class SelfAttention(nn.Module):
def __init__(
self,
dim,
causal=False,
heads=8,
dim_head=64,
local_heads=0,
local_window_size=256,
nb_features=None,
feature_redraw_interval=1000,
generalized_attention=False,
kernel_fn=nn.ReLU(),
qr_uniform_q=False,
dropout=0.0,
no_projection=False,
):
super().__init__()
assert dim % heads == 0, "dimension must be divisible by number of heads"
dim_head = default(dim_head, dim // heads)
inner_dim = dim_head * heads
self.fast_attention = FastAttention(
dim_head,
nb_features,
causal=causal,
generalized_attention=generalized_attention,
kernel_fn=kernel_fn,
qr_uniform_q=qr_uniform_q,
no_projection=no_projection,
)
self.heads = heads
self.global_heads = heads - local_heads
self.local_attn = (
LocalAttention(
window_size=local_window_size,
causal=causal,
autopad=True,
dropout=dropout,
look_forward=int(not causal),
rel_pos_emb_config=(dim_head, local_heads),
)
if local_heads > 0
else None
)
self.to_q = nn.Linear(dim, inner_dim)
self.to_k = nn.Linear(dim, inner_dim)
self.to_v = nn.Linear(dim, inner_dim)
self.to_out = nn.Linear(inner_dim, dim)
self.dropout = nn.Dropout(dropout)
@torch.no_grad()
def redraw_projection_matrix(self):
self.fast_attention.redraw_projection_matrix()
def forward(
self,
x,
context=None,
mask=None,
context_mask=None,
name=None,
inference=False,
**kwargs,
):
_, _, _, h, gh = *x.shape, self.heads, self.global_heads
cross_attend = exists(context)
context = default(context, x)
context_mask = default(context_mask, mask) if not cross_attend else context_mask
q, k, v = self.to_q(x), self.to_k(context), self.to_v(context)
q, k, v = map(lambda t: rearrange(t, "b n (h d) -> b h n d", h=h), (q, k, v))
(q, lq), (k, lk), (v, lv) = map(lambda t: (t[:, :gh], t[:, gh:]), (q, k, v))
attn_outs = []
if not empty(q):
if exists(context_mask):
global_mask = context_mask[:, None, :, None]
v.masked_fill_(~global_mask, 0.0)
if cross_attend:
pass
else:
out = self.fast_attention(q, k, v)
attn_outs.append(out)
if not empty(lq):
assert (
not cross_attend
), "local attention is not compatible with cross attention"
out = self.local_attn(lq, lk, lv, input_mask=mask)
attn_outs.append(out)
out = torch.cat(attn_outs, dim=1)
out = rearrange(out, "b h n d -> b n (h d)")
out = self.to_out(out)
return self.dropout(out)
def l2_regularization(model, l2_alpha):
l2_loss = []
for module in model.modules():
if type(module) is nn.Conv2d:
l2_loss.append((module.weight**2).sum() / 2.0)
return l2_alpha * sum(l2_loss)
class FCPE(nn.Module):
def __init__(
self,
input_channel=128,
out_dims=360,
n_layers=12,
n_chans=512,
use_siren=False,
use_full=False,
loss_mse_scale=10,
loss_l2_regularization=False,
loss_l2_regularization_scale=1,
loss_grad1_mse=False,
loss_grad1_mse_scale=1,
f0_max=1975.5,
f0_min=32.70,
confidence=False,
threshold=0.05,
use_input_conv=True,
):
super().__init__()
if use_siren is True:
raise ValueError("Siren is not supported yet.")
if use_full is True:
raise ValueError("Full model is not supported yet.")
self.loss_mse_scale = loss_mse_scale if (loss_mse_scale is not None) else 10
self.loss_l2_regularization = (
loss_l2_regularization if (loss_l2_regularization is not None) else False
)
self.loss_l2_regularization_scale = (
loss_l2_regularization_scale
if (loss_l2_regularization_scale is not None)
else 1
)
self.loss_grad1_mse = loss_grad1_mse if (loss_grad1_mse is not None) else False
self.loss_grad1_mse_scale = (
loss_grad1_mse_scale if (loss_grad1_mse_scale is not None) else 1
)
self.f0_max = f0_max if (f0_max is not None) else 1975.5
self.f0_min = f0_min if (f0_min is not None) else 32.70
self.confidence = confidence if (confidence is not None) else False
self.threshold = threshold if (threshold is not None) else 0.05
self.use_input_conv = use_input_conv if (use_input_conv is not None) else True
self.cent_table_b = torch.Tensor(
np.linspace(
self.f0_to_cent(torch.Tensor([f0_min]))[0],
self.f0_to_cent(torch.Tensor([f0_max]))[0],
out_dims,
)
)
self.register_buffer("cent_table", self.cent_table_b)
_leaky = nn.LeakyReLU()
self.stack = nn.Sequential(
nn.Conv1d(input_channel, n_chans, 3, 1, 1),
nn.GroupNorm(4, n_chans),
_leaky,
nn.Conv1d(n_chans, n_chans, 3, 1, 1),
)
self.decoder = PCmer(
num_layers=n_layers,
num_heads=8,
dim_model=n_chans,
dim_keys=n_chans,
dim_values=n_chans,
residual_dropout=0.1,
attention_dropout=0.1,
)
self.norm = nn.LayerNorm(n_chans)
self.n_out = out_dims
self.dense_out = weight_norm(nn.Linear(n_chans, self.n_out))
def forward(
self, mel, infer=True, gt_f0=None, return_hz_f0=False, cdecoder="local_argmax"
):
if cdecoder == "argmax":
self.cdecoder = self.cents_decoder
elif cdecoder == "local_argmax":
self.cdecoder = self.cents_local_decoder
x = (
self.stack(mel.transpose(1, 2)).transpose(1, 2)
if self.use_input_conv
else mel
)
x = self.decoder(x)
x = self.norm(x)
x = self.dense_out(x)
x = torch.sigmoid(x)
if not infer:
gt_cent_f0 = self.f0_to_cent(gt_f0)
gt_cent_f0 = self.gaussian_blurred_cent(gt_cent_f0)
loss_all = self.loss_mse_scale * F.binary_cross_entropy(x, gt_cent_f0)
if self.loss_l2_regularization:
loss_all = loss_all + l2_regularization(
model=self, l2_alpha=self.loss_l2_regularization_scale
)
x = loss_all
if infer:
x = self.cdecoder(x)
x = self.cent_to_f0(x)
x = (1 + x / 700).log() if not return_hz_f0 else x
return x
def cents_decoder(self, y, mask=True):
B, N, _ = y.size()
ci = self.cent_table[None, None, :].expand(B, N, -1)
rtn = torch.sum(ci * y, dim=-1, keepdim=True) / torch.sum(y, dim=-1, keepdim=True)
if mask:
confident = torch.max(y, dim=-1, keepdim=True)[0]
confident_mask = torch.ones_like(confident)
confident_mask[confident <= self.threshold] = float("-INF")
rtn = rtn * confident_mask
return (rtn, confident) if self.confidence else rtn
def cents_local_decoder(self, y, mask=True):
B, N, _ = y.size()
ci = self.cent_table[None, None, :].expand(B, N, -1)
confident, max_index = torch.max(y, dim=-1, keepdim=True)
local_argmax_index = torch.arange(0, 9).to(max_index.device) + (max_index - 4)
local_argmax_index = torch.clamp(local_argmax_index, 0, self.n_out - 1)
ci_l = torch.gather(ci, -1, local_argmax_index)
y_l = torch.gather(y, -1, local_argmax_index)
rtn = torch.sum(ci_l * y_l, dim=-1, keepdim=True) / torch.sum(
y_l, dim=-1, keepdim=True
)
if mask:
confident_mask = torch.ones_like(confident)
confident_mask[confident <= self.threshold] = float("-INF")
rtn = rtn * confident_mask
return (rtn, confident) if self.confidence else rtn
def cent_to_f0(self, cent):
return 10.0 * 2 ** (cent / 1200.0)
def f0_to_cent(self, f0):
return 1200.0 * torch.log2(f0 / 10.0)
def gaussian_blurred_cent(self, cents):
mask = (cents > 0.1) & (cents < (1200.0 * np.log2(self.f0_max / 10.0)))
B, N, _ = cents.size()
ci = self.cent_table[None, None, :].expand(B, N, -1)
return torch.exp(-torch.square(ci - cents) / 1250) * mask.float()
class FCPEInfer:
def __init__(self, model_path, device=None, dtype=torch.float32):
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
ckpt = torch.load(model_path, map_location=torch.device(self.device))
self.args = DotDict(ckpt["config"])
self.dtype = dtype
model = FCPE(
input_channel=self.args.model.input_channel,
out_dims=self.args.model.out_dims,
n_layers=self.args.model.n_layers,
n_chans=self.args.model.n_chans,
use_siren=self.args.model.use_siren,
use_full=self.args.model.use_full,
loss_mse_scale=self.args.loss.loss_mse_scale,
loss_l2_regularization=self.args.loss.loss_l2_regularization,
loss_l2_regularization_scale=self.args.loss.loss_l2_regularization_scale,
loss_grad1_mse=self.args.loss.loss_grad1_mse,
loss_grad1_mse_scale=self.args.loss.loss_grad1_mse_scale,
f0_max=self.args.model.f0_max,
f0_min=self.args.model.f0_min,
confidence=self.args.model.confidence,
)
model.to(self.device).to(self.dtype)
model.load_state_dict(ckpt["model"])
model.eval()
self.model = model
self.wav2mel = Wav2Mel(self.args, dtype=self.dtype, device=self.device)
@torch.no_grad()
def __call__(self, audio, sr, threshold=0.05):
self.model.threshold = threshold
audio = audio[None, :]
mel = self.wav2mel(audio=audio, sample_rate=sr).to(self.dtype)
f0 = self.model(mel=mel, infer=True, return_hz_f0=True)
return f0
class Wav2Mel:
def __init__(self, args, device=None, dtype=torch.float32):
self.sample_rate = args.mel.sampling_rate
self.hop_size = args.mel.hop_size
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.dtype = dtype
self.stft = STFT(
args.mel.sampling_rate,
args.mel.num_mels,
args.mel.n_fft,
args.mel.win_size,
args.mel.hop_size,
args.mel.fmin,
args.mel.fmax,
)
self.resample_kernel = {}
def extract_nvstft(self, audio, keyshift=0, train=False):
mel = self.stft.get_mel(audio, keyshift=keyshift, train=train).transpose(1, 2)
return mel
def extract_mel(self, audio, sample_rate, keyshift=0, train=False):
audio = audio.to(self.dtype).to(self.device)
if sample_rate == self.sample_rate:
audio_res = audio
else:
key_str = str(sample_rate)
if key_str not in self.resample_kernel:
self.resample_kernel[key_str] = Resample(
sample_rate, self.sample_rate, lowpass_filter_width=128
)
self.resample_kernel[key_str] = (
self.resample_kernel[key_str].to(self.dtype).to(self.device)
)
audio_res = self.resample_kernel[key_str](audio)
mel = self.extract_nvstft(audio_res, keyshift=keyshift, train=train)
n_frames = int(audio.shape[1] // self.hop_size) + 1
mel = torch.cat((mel, mel[:, -1:, :]), 1) if n_frames > int(mel.shape[1]) else mel
mel = mel[:, :n_frames, :] if n_frames < int(mel.shape[1]) else mel
return mel
def __call__(self, audio, sample_rate, keyshift=0, train=False):
return self.extract_mel(audio, sample_rate, keyshift=keyshift, train=train)
class DotDict(dict):
def __getattr__(*args):
val = dict.get(*args)
return DotDict(val) if type(val) is dict else val
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
class F0Predictor(object):
def compute_f0(self, wav, p_len):
pass
def compute_f0_uv(self, wav, p_len):
pass
class FCPEF0Predictor(F0Predictor):
def __init__(
self,
model_path,
hop_length=512,
f0_min=50,
f0_max=1100,
dtype=torch.float32,
device=None,
sample_rate=44100,
threshold=0.05,
):
self.fcpe = FCPEInfer(model_path, device=device, dtype=dtype)
self.hop_length = hop_length
self.f0_min = f0_min
self.f0_max = f0_max
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.threshold = threshold
self.sample_rate = sample_rate
self.dtype = dtype
self.name = "fcpe"
def repeat_expand(
self,
content: Union[torch.Tensor, np.ndarray],
target_len: int,
mode: str = "nearest",
):
ndim = content.ndim
content = (
content[None, None] if ndim == 1 else content[None] if ndim == 2 else content
)
assert content.ndim == 3
is_np = isinstance(content, np.ndarray)
content = torch.from_numpy(content) if is_np else content
results = torch.nn.functional.interpolate(content, size=target_len, mode=mode)
results = results.numpy() if is_np else results
return results[0, 0] if ndim == 1 else results[0] if ndim == 2 else results
def post_process(self, x, sample_rate, f0, pad_to):
f0 = (
torch.from_numpy(f0).float().to(x.device)
if isinstance(f0, np.ndarray)
else f0
)
f0 = self.repeat_expand(f0, pad_to) if pad_to is not None else f0
vuv_vector = torch.zeros_like(f0)
vuv_vector[f0 > 0.0] = 1.0
vuv_vector[f0 <= 0.0] = 0.0
nzindex = torch.nonzero(f0).squeeze()
f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy()
time_org = self.hop_length / sample_rate * nzindex.cpu().numpy()
time_frame = np.arange(pad_to) * self.hop_length / sample_rate
vuv_vector = F.interpolate(vuv_vector[None, None, :], size=pad_to)[0][0]
if f0.shape[0] <= 0:
return np.zeros(pad_to), vuv_vector.cpu().numpy()
if f0.shape[0] == 1:
return np.ones(pad_to) * f0[0], vuv_vector.cpu().numpy()
f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])
return f0, vuv_vector.cpu().numpy()
def compute_f0(self, wav, p_len=None):
x = torch.FloatTensor(wav).to(self.dtype).to(self.device)
p_len = x.shape[0] // self.hop_length if p_len is None else p_len
f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0]
if torch.all(f0 == 0):
return f0.cpu().numpy() if p_len is None else np.zeros(p_len), (
f0.cpu().numpy() if p_len is None else np.zeros(p_len)
)
return self.post_process(x, self.sample_rate, f0, p_len)[0]
def compute_f0_uv(self, wav, p_len=None):
x = torch.FloatTensor(wav).to(self.dtype).to(self.device)
p_len = x.shape[0] // self.hop_length if p_len is None else p_len
f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0]
if torch.all(f0 == 0):
return f0.cpu().numpy() if p_len is None else np.zeros(p_len), (
f0.cpu().numpy() if p_len is None else np.zeros(p_len)
)
return self.post_process(x, self.sample_rate, f0, p_len)
'''
with open(os.sep.join([current_dir, dirs[6], "FCPE.py"]), 'w') as f:
f.write(FCPE)
VBACH_CLI = '''
import gc
import os
import datetime
import gradio as gr
import torch
import librosa
import tempfile
from datetime import datetime
import argparse
from vbach.infer.infer import Config, load_hubert, get_vc, rvc_infer
# Константы
RVC_MODELS_DIR = os.path.join(os.getcwd(), "voice_models")
HUBERT_MODEL_PATH = os.path.join(
os.getcwd(), "vbach", "models", "embedders", "hubert_base.pt"
)
OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"]
audio_extensions = {".mp3", ".wav", ".flac", ".aiff", ".m4a", ".aac", ".ogg", ".opus"}
# Важные функции
def load_rvc_model(voice_model):
model_dir = os.path.join(RVC_MODELS_DIR, voice_model)
model_files = os.listdir(model_dir)
rvc_model_path = next(
(os.path.join(model_dir, f) for f in model_files if f.endswith(".pth")), None
)
rvc_index_path = next(
(os.path.join(model_dir, f) for f in model_files if f.endswith(".index")), None
)
if not rvc_model_path:
raise ValueError(
f"\033[91mМодели {voice_model} не существует. "
"Возможно, вы неправильно ввели имя.\033[0m"
)
return rvc_model_path, rvc_index_path
def voice_conversion(
voice_model,
vocals_path,
output_path,
pitch,
f0_method,
index_rate,
filter_radius,
volume_envelope,
protect,
hop_length,
f0_min,
f0_max,
format_output,
output_bitrate,
stereo_mode
):
rvc_model_path, rvc_index_path = load_rvc_model(voice_model)
config = Config()
hubert_model = load_hubert(config.device, config.is_half, HUBERT_MODEL_PATH)
cpt, version, net_g, tgt_sr, vc = get_vc(
config.device, config.is_half, config, rvc_model_path
)
output_audio = rvc_infer(
rvc_index_path,
index_rate,
vocals_path,
output_path,
pitch,
f0_method,
cpt,
version,
net_g,
filter_radius,
tgt_sr,
volume_envelope,
protect,
hop_length,
vc,
hubert_model,
f0_min,
f0_max,
format_output,
output_bitrate,
stereo_mode
)
del hubert_model, cpt, net_g, vc
gc.collect()
torch.cuda.empty_cache()
return output_audio
def cli_conversion(input_audios, template="NAME_MODEL_F0METHOD_PITCH", output_dir="output", model_name="", index_rate=0, output_format="wav", stereo_mode="mono", method_pitch="rmvpe+", pitch=0, hop_length=128, filter_radius=3, rms=0.25, protect=0.33, f0_min=50, f0_max=1100):
if not input_audios:
raise ValueError(
"Не удалось найти аудиофайл(ы). "
"Убедитесь, что файл загрузился или проверьте правильность пути к нему."
)
if not model_name:
raise ValueError("Выберите модель голоса для преобразования.")
if not os.path.exists(input_audios):
raise ValueError(f"Файл {input_audios} не найден.")
if not os.path.exists(input_audios):
raise FileNotFoundError(f"Ошибка: '{input_audios}' не существует.")
os.makedirs(output_dir, exist_ok=True)
if os.path.isfile(input_audios):
# Проверяем, является ли файл аудио
ext = os.path.splitext(input_audios)[1].lower()
if ext not in audio_extensions:
raise ValueError(f"Ошибка: '{input_audios}' не является аудиофайлом (допустимые расширения: {audio_extensions}).")
print(f"Найден аудиофайл: {input_audios}")
try:
file_name = os.path.basename(input_audios)
namefile = os.path.splitext(file_name)[0]
time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
output_name = template
output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
voice_conversion(model_name, input_audios, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, f0_min, f0_max, output_format, "320k", stereo_mode)
finally:
print("Вокал успешно преобразован")
elif os.path.isdir(input_audios):
# Ищем аудиофайлы в папке
audio_files = []
for file in os.listdir(input_audios):
ext = os.path.splitext(file)[1].lower()
if ext in audio_extensions:
audio_files.append(os.path.join(input_audios, file))
if not audio_files:
raise FileNotFoundError(f"Ошибка: в папке '{input_audios}' нет аудиофайлов (допустимые расширения: {audio_extensions}).")
print(f"Найдены аудиофайлы: {audio_files}")
try:
output_paths = []
for file in audio_files:
file_name = os.path.basename(file)
namefile = os.path.splitext(file_name)[0]
time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
output_name = (
template
.replace("DATETIME", time_create_file)
.replace("NAME", namefile)
.replace("MODEL", model_name)
.replace("F0METHOD", method_pitch)
.replace("PITCH", f"{pitch}")
)
output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
voice_conversion(model_name, file, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, 50, 1100, output_format, "320k", stereo_mode)
output_paths.append(output_path)
finally:
print("Вокалы успешно преобразованы")
else:
raise ValueError(f"Ошибка: '{input_audios}' не является ни файлом, ни папкой.")
def setup_args():
parser = argparse.ArgumentParser(description='Vbach CLI')
# Обязательные аргументы
parser.add_argument(
'input_audios',
type=str,
help='Путь к аудиофайлу или папке с аудиофайлами для обработки'
)
parser.add_argument(
'output_dir',
type=str,
help='Папка для сохранения результатов конвертации'
)
parser.add_argument(
'model_name',
type=str,
help='Название голосовой модели RVC для преобразования'
)
# Необязательные аргументы с значениями по умолчанию
parser.add_argument(
'--template',
type=str,
default="NAME_MODEL_F0METHOD_PITCH",
help='Шаблон имени выходного файла (доступные замены: DATETIME, NAME, MODEL, F0METHOD, PITCH)'
)
parser.add_argument(
'--index_rate',
type=float,
default=0,
help='Интенсивность использования индексного файла (от 0.0 до 1.0)',
metavar='[0.0-1.0]'
)
parser.add_argument(
'--output_format',
type=str,
default="wav",
choices=OUTPUT_FORMAT,
help='Формат выходного аудиофайла'
)
parser.add_argument(
'--stereo_mode',
type=str,
default="mono",
choices=["mono", "left/right", "sim/dif"],
help='Режим каналов: моно или стерео'
)
parser.add_argument(
'--method_pitch',
type=str,
default="rmvpe+",
help='Метод извлечения pitch (тона)'
)
parser.add_argument(
'--pitch',
type=int,
default=0,
help='Корректировка тона в полутонах'
)
parser.add_argument(
'--hop_length',
type=int,
default=128,
help='Длина hop (в семплах) для обработки'
)
parser.add_argument(
'--filter_radius',
type=int,
default=3,
help='Радиус фильтра для сглаживания'
)
parser.add_argument(
'--rms',
type=float,
default=0.25,
help='Масштабирование огибающей громкости (RMS)'
)
parser.add_argument(
'--protect',
type=float,
default=0.33,
help='Защита для глухих согласных звуков'
)
parser.add_argument(
'--f0_min',
type=int,
default=50,
help='Минимальная частота pitch (F0) в Hz'
)
parser.add_argument(
'--f0_max',
type=int,
default=1100,
help='Максимальная частота pitch (F0) в Hz'
)
return parser.parse_args()
# Пример использования:
if __name__ == "__main__":
args = setup_args()
cli_conversion(
input_audios=args.input_audios,
output_dir=args.output_dir,
model_name=args.model_name,
template=args.template,
index_rate=args.index_rate,
output_format=args.output_format,
stereo_mode=args.stereo_mode,
method_pitch=args.method_pitch,
pitch=args.pitch,
hop_length=args.hop_length,
filter_radius=args.filter_radius,
rms=args.rms,
protect=args.protect,
f0_min=args.f0_min,
f0_max=args.f0_max
)
'''
with open(os.sep.join([current_dir, dirs[2], "vbach.py"]), 'w') as f:
f.write(VBACH_CLI)
def set_language(lang):
global CURRENT_LANG
CURRENT_LANG = lang
def t(key, **kwargs):
translation = TRANSLATIONS[CURRENT_LANG].get(key, key)
if isinstance(translation, dict):
return translation
return translation.format(**kwargs) if kwargs else translation
def download_file(url, zip_name, progress):
try:
if "drive.google.com" in url:
progress(0.5, desc=t('downloading_google'))
download_from_google_drive(url, zip_name, progress)
elif "huggingface.co" in url:
progress(0.5, desc=t('downloading_huggingface'))
download_from_huggingface(url, zip_name, progress)
elif "pixeldrain.com" in url:
progress(0.5, desc=t('downloading_pixeldrain'))
download_from_pixeldrain(url, zip_name, progress)
elif "mega.nz" in url:
print(t('mega_unsupported'))
elif "disk.yandex.ru" in url or "yadi.sk" in url:
progress(0.5, desc=t('downloading_yandex'))
download_from_yandex(url, zip_name, progress)
else:
raise ValueError(t('unsupported_source', url=url))
except Exception as e:
raise gr.Error(t('download_error', error=str(e)))
def download_from_google_drive(url, zip_name, progress):
file_id = (
url.split("file/d/")[1].split("/")[0]
if "file/d/" in url
else url.split("id=")[1].split("&")[0]
)
gdown.download(id=file_id, output=str(zip_name), quiet=False)
def download_from_huggingface(url, zip_name, progress):
urllib.request.urlretrieve(url, zip_name)
def download_from_pixeldrain(url, zip_name, progress):
file_id = url.split("pixeldrain.com/u/")[1]
response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
with open(zip_name, "wb") as f:
f.write(response.content)
def download_from_yandex(url, zip_name, progress):
yandex_public_key = f"download?public_key={url}"
yandex_api_url = f"https://cloud-api.yandex.net/v1/disk/public/resources/{yandex_public_key}"
response = requests.get(yandex_api_url)
if response.status_code == 200:
download_link = response.json().get("href")
urllib.request.urlretrieve(download_link, zip_name)
else:
raise gr.Error(t('yandex_api_error', status=response.status_code))
def extract_zip(extraction_folder, zip_name):
os.makedirs(extraction_folder, exist_ok=True)
with zipfile.ZipFile(zip_name, "r") as zip_ref:
zip_ref.extractall(extraction_folder)
os.remove(zip_name)
index_filepath, model_filepath = None, None
for root, _, files in os.walk(extraction_folder):
for name in files:
file_path = os.path.join(root, name)
if name.endswith(".index") and os.stat(file_path).st_size > 1024 * 100:
index_filepath = file_path
if name.endswith(".pth") and os.stat(file_path).st_size > 1024 * 1024 * 40:
model_filepath = file_path
if not model_filepath:
raise gr.Error(t('pth_not_found', folder=extraction_folder))
rename_and_cleanup(extraction_folder, model_filepath, index_filepath)
def rename_and_cleanup(extraction_folder, model_filepath, index_filepath):
os.rename(
model_filepath,
os.path.join(extraction_folder, os.path.basename(model_filepath)),
)
if index_filepath:
os.rename(
index_filepath,
os.path.join(extraction_folder, os.path.basename(index_filepath)),
)
for filepath in os.listdir(extraction_folder):
full_path = os.path.join(extraction_folder, filepath)
if os.path.isdir(full_path):
shutil.rmtree(full_path)
def download_from_url(url, dir_name, progress=gr.Progress()):
try:
progress(0, desc=t('downloading_model', dir_name=dir_name))
zip_name = os.path.join(dirs[0], dir_name + ".zip")
extraction_folder = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(t('model_exists', dir_name=dir_name))
download_file(url, zip_name, progress)
progress(0.8, desc=t('unpacking_zip'))
extract_zip(extraction_folder, zip_name)
return t('model_uploaded', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_load_error', error=str(e)))
def upload_zip_file(zip_path, dir_name, progress=gr.Progress()):
try:
extraction_folder = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(t('model_exists', dir_name=dir_name))
zip_name = zip_path.name
progress(0.8, desc=t('unpacking_zip'))
extract_zip(extraction_folder, zip_name)
return t('model_uploaded', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_load_error', error=str(e)))
def upload_separate_files(pth_file, index_file, dir_name, progress=gr.Progress()):
try:
extraction_folder = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(extraction_folder):
raise gr.Error(t('model_exists', dir_name=dir_name))
os.makedirs(extraction_folder, exist_ok=True)
if pth_file:
pth_path = os.path.join(extraction_folder, os.path.basename(pth_file.name))
shutil.copyfile(pth_file.name, pth_path)
if index_file:
index_path = os.path.join(extraction_folder, os.path.basename(index_file.name))
shutil.copyfile(index_file.name, index_path)
return t('model_uploaded', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_load_error', error=str(e)))
def delete_model_name(dir_name):
model_dir = os.path.join(current_dir, dirs[0], dir_name)
if os.path.exists(model_dir):
try:
if os.path.isdir(model_dir):
shutil.rmtree(model_dir)
return t('model_deleted', dir_name=dir_name)
except Exception as e:
raise gr.Error(t('model_delete_error', error=str(e)))
else:
return t('model_not_found', dir_name=dir_name)
from vbach.cli.vbach import voice_conversion
def process_audio(
input_file: str = None,
input_list: str = None,
template: str = "NAME_MODEL_F0METHOD_PITCH",
model_name: str = "",
index_rate: float = 0,
output_format: str = "wav",
output_bitrate: int = 320,
stereo_mode: str = "mono",
method_pitch: str = "rmvpe+",
pitch: float = 0,
hop_length: int = 128,
filter_radius: int = 3,
rms: float = 0.25,
protect: float = 0.33,
f0_min: int = 50,
f0_max: int = 1100
):
keys = ["NAME", "PITCH", "F0_METHOD", "DATETIME", "MODEL"]
if any(key in template for key in keys):
pass
else:
template = "DATETIME_Vbach_F0METHOD_PITCH"
if not isinstance(input_list, list) and not input_file:
try:
print(input_list)
input_list = ast.literal_eval(input_list)
except Exception as e:
print(e)
gr.Warning(t("error_strlist_is_not_list"))
return None
if input_file is not None:
try:
print(input_file)
input_list = ast.literal_eval(input_file)
gr.Warning(t("error_path_is_list"))
return None
except Exception as e:
pass
output_bitrate = f"{output_bitrate}k"
if not input_file and not input_list:
raise gr.Error(t("error_no_audio"))
if not model_name:
raise gr.Error(t("error_no_model"))
if input_file is not None and isinstance(input_file, str) and input_list == None:
if not os.path.exists(input_file):
gr.Warning(t("warning_file_not_found", file=input_file))
return None
file_name = os.path.basename(input_file)
namefile = os.path.splitext(file_name)[0]
time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
output_name = template
output_dir = tempfile.mkdtemp(prefix="converted_voice_")
print(output_dir)
output_name = (
template
.replace("DATETIME", time_create_file)
.replace("NAME", namefile)
.replace("MODEL", model_name)
.replace("F0METHOD", method_pitch)
.replace("PITCH", f"{pitch}")
)
output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
try:
output_path = voice_conversion(
model_name,
input_file,
output_path,
pitch,
method_pitch,
index_rate,
filter_radius,
rms,
protect,
hop_length,
f0_min,
f0_max,
output_format,
output_bitrate,
stereo_mode
)
except Exception as e:
print(e)
finally:
print(t("success_single"))
return output_path
if input_file is None and input_list is not None and isinstance(input_list, list):
output_dir = tempfile.mkdtemp(prefix="converted_voice_")
print(output_dir)
output_paths = []
progress = gr.Progress()
for i, file in enumerate(input_list):
if not os.path.exists(file):
gr.Warning(t("warning_file_not_found", file=file))
continue
total_steps = len(input_list)
file_name = os.path.basename(file)
namefile = os.path.splitext(file_name)[0]
time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
progress(
(i+1, total_steps),
desc=t("processing", namefile=namefile),
unit=t("files")
)
output_name = (
template
.replace("DATETIME", time_create_file)
.replace("NAME", namefile)
.replace("MODEL", model_name)
.replace("F0METHOD", method_pitch)
.replace("PITCH", f"{pitch}")
)
output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
try:
output_path = voice_conversion(
model_name,
file,
output_path,
pitch,
method_pitch,
index_rate,
filter_radius,
rms,
protect,
hop_length,
f0_min,
f0_max,
output_format,
output_bitrate,
stereo_mode
)
except Exception as e:
print(e)
finally:
output_paths.append(output_path)
print(t("success_batch"))
return output_paths
def vbach_plugin_name():
return "VBach"
def vbach_plugin(lang="ru"):
set_language(lang)
with gr.TabItem(t("inference")):
with gr.Column():
with gr.Column(scale=3) as input_voice_group:
with gr.Group() as single_voice_file:
input_voice = gr.Audio(label=t("select_file"), interactive=True, type="filepath")
batch_upload_btn = gr.Button(t("batch_upload"))
with gr.Group(visible=False) as batch_voice_file:
input_voices = gr.Files(type="filepath", interactive=True, show_label=False)
single_upload_btn = gr.Button(t("single_upload"))
input_voice_path = gr.Textbox(label=t("audio_path"), info=t("audio_path_info"), interactive=True)
input_voice.upload(fn=(lambda x: gr.update(value=x)), inputs=input_voice, outputs=input_voice_path)
input_voices.upload(fn=(lambda x: gr.update(value=str(x))), inputs=input_voices, outputs=input_voice_path)
with gr.Column():
with gr.Row(equal_height=True):
model_name = gr.Dropdown(label=t("model_name"), choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))], interactive=True, filterable=False, scale=6)
model_update_btn = gr.Button(t("update_button"), variant="primary", scale=3, size="lg")
model_update_btn.click(fn=(lambda : gr.update(choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))])), inputs=None, outputs=model_name)
with gr.Row():
method_pitch = gr.Dropdown(label=t("pitch_method"), choices=["mangio-crepe", "rmvpe+", "fcpe"], value="rmvpe+", interactive=True, filterable=False)
hop_length = gr.Slider(minimum=2, maximum=512, step=1, value=128, label=t("hop_length"), interactive=True, visible=False)
with gr.Row():
pitch = gr.Slider(minimum=-48, maximum=48, step=12, value=0, label=t("pitch"), interactive=True)
with gr.Row():
f0_min = gr.Slider(minimum=50, maximum=3500, step=1, value=50, label=t("f0_min"), interactive=True)
f0_max = gr.Slider(minimum=500, maximum=3500, step=1, value=1100, label=t("f0_max"), interactive=True)
with gr.Column(variant="panel"):
with gr.Group():
with gr.Row(equal_height=True):
with gr.Column(scale=3):
stereo_mode = gr.Dropdown(
label=t("audio_processing"),
choices=list(t("stereo_modes").keys()),
value="mono",
interactive=True,
filterable=False
)
output_format = gr.Dropdown(label=t("output_format"), choices=OUTPUT_FORMAT)
output_bitrate = gr.Slider(32, 320, step=1, label=t("bitrate"), value=320, interactive=True)
with gr.Column(scale=6) as single_output_group:
converted_voice = gr.Audio(label=t("converted_voice"), type="filepath", interactive=False, show_download_button=True, elem_classes="fixed-height")
with gr.Column(scale=6, visible=False) as batch_output_group:
converted_voices = gr.Files(label=t("converted_voices"), type="filepath", interactive=False, height="100%", elem_classes="fixed-height")
convert_btn = gr.Button(t("convert_single"), variant="primary", scale=3)
convert_batch_btn = gr.Button(t("convert_batch"), variant="primary", visible=False, scale=3)
with gr.Column():
with gr.Tab(t("name_format")):
template_info = gr.Markdown(t("name_format_info"), line_breaks=True)
template = gr.Text(label=t("name_format"), value="NAME_MODEL_F0METHOD_PITCH", interactive=True)
with gr.Tab(t("advanced_settings")):
with gr.Row():
with gr.Column(scale=3):
filter_radius = gr.Slider(minimum=0, maximum=7, step=1, value=3, label=t("filter_radius"), interactive=True)
index_rate = gr.Slider(minimum=0, maximum=1, step=0.01, value=0, label=t("index_rate"), interactive=True)
rms = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.25, label=t("rms"), interactive=True)
protect = gr.Slider(minimum=0, maximum=0.5, step=0.01, value=0.33, label=t("protect"), interactive=True)
with gr.TabItem(t("model_manager")):
with gr.TabItem(t("download_url")):
with gr.Row():
with gr.Column(variant="panel"):
gr.HTML(f"<center><h3>{t('download_link')}</h3></center>")
model_zip_link = gr.Text(label=t("download_link"))
with gr.Group():
zip_model_name = gr.Text(
label=t("model_name"),
info=t("unique_name"),
)
download_btn = gr.Button(t("download_button"), variant="primary")
gr.HTML(
f"<h3>{t('supported_sites')}: "
"<a href='https://huggingface.co/' target='_blank'>HuggingFace</a>, "
"<a href='https://pixeldrain.com/' target='_blank'>Pixeldrain</a>, "
"<a href='https://drive.google.com/' target='_blank'>Google Drive</a>, "
"<a href='https://disk.yandex.ru/' target='_blank'>Яндекс Диск</a>"
"</h3>"
)
dl_output_message = gr.Text(label=t("output_message"), interactive=False)
download_btn.click(
download_from_url,
inputs=[model_zip_link, zip_model_name],
outputs=dl_output_message,
)
with gr.Tab(t("download_zip")):
with gr.Row():
with gr.Column():
zip_file = gr.File(
label=t("zip_file"), file_types=[".zip"], file_count="single"
)
with gr.Column(variant="panel"):
gr.HTML(t("upload_steps"))
with gr.Group():
local_model_name = gr.Text(
label=t("model_name"),
info=t("unique_name"),
)
model_upload_button = gr.Button(t("download_button"), variant="primary")
local_upload_output_message = gr.Text(label=t("output_message"), interactive=False)
model_upload_button.click(
upload_zip_file,
inputs=[zip_file, local_model_name],
outputs=local_upload_output_message,
)
with gr.TabItem(t("download_files")):
with gr.Group():
with gr.Row():
pth_file = gr.File(
label=t("pth_file"), file_types=[".pth"], file_count="single"
)
index_file = gr.File(
label=t("index_file"), file_types=[".index"], file_count="single"
)
with gr.Column(variant="panel"):
with gr.Group():
separate_model_name = gr.Text(
label=t("model_name"),
info=t("unique_name"),
)
separate_upload_button = gr.Button(t("download_button"), variant="primary")
separate_upload_output_message = gr.Text(
label=t("output_message"), interactive=False
)
separate_upload_button.click(
upload_separate_files,
inputs=[pth_file, index_file, separate_model_name],
outputs=separate_upload_output_message,
)
with gr.TabItem(t("delete_model")):
with gr.Column(variant="panel"):
with gr.Group():
delete_voicemodel_name = gr.Dropdown(
label=t("model_name"),
info=t("delete_info"),
choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))],
interactive=True,
filterable=False
)
refresh_delete_btn = gr.Button(t("refresh_button"))
refresh_delete_btn.click(fn=(lambda : gr.update(choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))])), inputs=None, outputs=delete_voicemodel_name)
delete_model_output_message = gr.Text(
label=t("output_message"), interactive=False
)
delete_model_btn = gr.Button(t("delete_button"))
delete_model_btn.click(
fn=delete_model_name,
inputs=delete_voicemodel_name,
outputs=delete_model_output_message
)
method_pitch.change(fn=lambda x: gr.update(visible=True if x == "mangio-crepe" else False), inputs=method_pitch, outputs=hop_length)
batch_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[single_voice_file, batch_voice_file, single_output_group, batch_output_group, convert_btn, convert_batch_btn])
single_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[batch_voice_file, single_voice_file, batch_output_group, single_output_group, convert_batch_btn, convert_btn])
convert_btn.click(fn=process_audio, inputs=[input_voice_path, gr.State(None), template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voice)
convert_batch_btn.click(fn=process_audio, inputs=[gr.State(None), input_voice_path, template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voices)