|
|
import os |
|
|
import gc |
|
|
import ast |
|
|
import requests |
|
|
import sys |
|
|
import shutil |
|
|
import zipfile |
|
|
import gradio as gr |
|
|
import urllib.request |
|
|
import gdown |
|
|
import tempfile |
|
|
from datetime import datetime |
|
|
|
|
|
current_dir = os.getcwd() |
|
|
dirs = [
    "voice_models",
    "vbach",
    os.path.join("vbach", "cli"),
    os.path.join("vbach", "infer"),
    os.path.join("vbach", "lib"),
    os.path.join("vbach", "lib", "algorithm"),
    os.path.join("vbach", "lib", "predictors"),
    os.path.join("vbach", "models"),
    os.path.join("vbach", "models", "predictors"),
    os.path.join("vbach", "models", "embedders"),
    os.path.join("vbach", "scripts"),
    os.path.join("vbach", "utils"),
]
|
|
|
|
|
RMVPE_PATH = os.path.join(dirs[8], "rmvpe.pt") |
|
|
FCPE_PATH = os.path.join(dirs[8], "fcpe.pt") |
|
|
RVC_MODELS_DIR = dirs[0] |
|
|
HUBERT_MODEL_PATH = os.path.join( |
|
|
dirs[9], "hubert_base.pt" |
|
|
) |
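
# rmvpe.pt and fcpe.pt are pretrained pitch (F0) predictors; hubert_base.pt is
# the HuBERT content encoder that RVC uses to extract speech features.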
|
|
CURRENT_LANG = "ru" |
|
|
OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"] |
|
|
TRANSLATIONS = { |
|
|
"ru": { |
|
|
"app_title": "VBach", |
|
|
"inference": "Инференс", |
|
|
"select_file": "Выберите файл", |
|
|
"audio_path": "Путь к файлу", |
|
|
"audio_path_info": "Здесь можно ввести путь к файлу/список путей к файлам , либо загрузить его/их выше и получить путь к нему/их список", |
|
|
"audio_processing": "Режим обработки аудио", |
|
|
"output_format": "Формат вывода", |
|
|
"name_format": "Шаблон", |
|
|
"name_format_info": """Доступные ключи для формата: |
|
|
NAME - Имя входного файла |
|
|
MODEL - Название модели |
|
|
PITCH - Высота тона |
|
|
F0_METHOD - Метод извлечения тона |
|
|
DATETIME - Время и дата создания результата |
|
|
|
|
|
Пример - NAME_MODEL_PITCH → name_your-model_12""", |
|
|
"convert_single": "Конвертировать один", |
|
|
"convert_batch": "Конвертировать несколько", |
|
|
"model_name": "Имя модели", |
|
|
"pitch_method": "Метод извлечения тона", |
|
|
"pitch": "Высота тона", |
|
|
"hop_length": "Длина шага", |
|
|
"bitrate": "Битрейт (Кбит/сек)", |
|
|
"f0_min": "Нижний лимит определения высоты тона", |
|
|
"f0_max": "Верхний лимит определения высоты тона", |
|
|
"advanced_settings": "Дополнительные настройки", |
|
|
"filter_radius": "Радиус фильтра", |
|
|
"index_rate": "Влияние индекса", |
|
|
"rms": "Огибающая громкости", |
|
|
"protect": "Защита согласных", |
|
|
"model_manager": "Менеджер моделей", |
|
|
"download_url": "Загрузить по ссылке", |
|
|
"download_zip": "Загрузить ZIP архивом", |
|
|
"download_files": "Загрузить файлами", |
|
|
"delete_model": "Удалить модель", |
|
|
"download_link": "Ссылка на загрузку модели", |
|
|
"unique_name": "Дайте вашей загружаемой модели уникальное имя, отличное от других голосовых моделей.", |
|
|
"download_button": "Загрузить модель", |
|
|
"supported_sites": "Поддерживаемые сайты", |
|
|
"output_message": "Сообщение вывода", |
|
|
"zip_file": "Zip-файл", |
|
|
"upload_steps": "<h3>1. Найдите и скачайте файлы: .pth и необязательный файл .index</h3><h3>2. Закиньте файл(-ы) в ZIP-архив и поместите его в область загрузки</h3><h3>3. Дождитесь полной загрузки ZIP-архива в интерфейс</h3>", |
|
|
"pth_file": "pth-файл", |
|
|
"index_file": "index-файл", |
|
|
"delete_info": "Выберите модель, которую надо удалить", |
|
|
"refresh_button": "Обновить список моделей", |
|
|
"delete_button": "Удалить модель", |
|
|
"batch_upload": "Пакетная загрузка", |
|
|
"single_upload": "Одиночная загрузка", |
|
|
"converted_voice": "Преобразованный вокал", |
|
|
"converted_voices": "Преобразованные вокалы", |
|
|
"update_button": "Обновить", |
|
|
"processing": "Сейчас обрабатывается - {namefile}", |
|
|
"files": "файлов", |
|
|
"error_no_audio": "Не удалось найти аудиофайл(ы). Убедитесь, что файл загрузился или проверьте правильность пути к нему.", |
|
|
"error_no_model": "Выберите модель голоса для преобразования голоса", |
|
|
"warning_file_not_found": "Файл {file} не найден.", |
|
|
"success_single": "Вокал успешно преобразован", |
|
|
"success_batch": "Вокалы успешно преобразованы", |
|
|
"language": "Язык", |
|
|
"stereo_modes": { |
|
|
"mono": "Моно", |
|
|
"left/right": "Левый/Правый", |
|
|
"sim/dif": "Сходство/Различия" |
|
|
}, |
|
|
|
|
|
'downloading_google': "[~] Загрузка модели с Google Drive...", |
|
|
'downloading_huggingface': "[~] Загрузка модели с HuggingFace...", |
|
|
'downloading_pixeldrain': "[~] Загрузка модели с Pixeldrain...", |
|
|
'downloading_yandex': "[~] Загрузка модели с Яндекс Диска...", |
|
|
'downloading_model': "[~] Загрузка голосовой модели {dir_name}...", |
|
|
'unpacking_zip': "[~] Распаковка zip-файла...", |
|
|
|
|
|
|
|
|
'unsupported_source': "Неподдерживаемый источник: {url}", |
|
|
'download_error': "Ошибка при скачивании: {error}", |
|
|
'yandex_api_error': "Ошибка при получении ссылки с Яндекс Диска: {status}", |
|
|
'pth_not_found': "Не найден файл модели .pth в распакованном zip-файле. Проверьте содержимое в {folder}.", |
|
|
'model_exists': "Директория голосовой модели {dir_name} уже существует! Выберите другое имя.", |
|
|
'model_load_error': "Ошибка при загрузке модели: {error}", |
|
|
'model_delete_error': "Ошибка при удалении модели: {error}", |
|
|
|
|
|
|
|
|
'mega_unsupported': "Mega не поддерживается!", |
|
|
'model_uploaded': "[+] Модель {dir_name} успешно загружена!", |
|
|
'model_deleted': "[-] Модель {dir_name} успешно удалена!", |
|
|
'model_not_found': "[-] Модели {dir_name} не существует", |
|
|
"error_strlist_is_not_list": "Эта строка не является списком файлов", |
|
|
"error_path_is_list": "Путь к файлу является списком" |
|
|
}, |
|
|
"en": { |
|
|
"app_title": "VBach", |
|
|
"inference": "Inference", |
|
|
"select_file": "Select File", |
|
|
"audio_path": "Audio path", |
|
|
"audio_path_info": "You can enter a file path or a list of file paths here, or upload the file(s) above to obtain their path(s)", |
|
|
"audio_processing": "Audio Processing Mode", |
|
|
"output_format": "Output Format", |
|
|
"name_format": "Template", |
|
|
"name_format_info": """Available format keys: |
|
|
NAME - Input file name |
|
|
MODEL - Model name |
|
|
PITCH - Pitch |
|
|
F0_METHOD - Pitch extraction method
|
|
DATETIME - Date & time create results |
|
|
|
|
|
Example - NAME_MODEL_PITCH → name_your-model_12""", |
|
|
"convert_single": "Convert Single", |
|
|
"convert_batch": "Convert Batch", |
|
|
"model_name": "Model Name", |
|
|
"pitch_method": "Pitch Extraction Method", |
|
|
"pitch": "Pitch", |
|
|
"hop_length": "Hop Length", |
|
|
"bitrate": "Bitrate (Kbit/sec)", |
|
|
"f0_min": "F0 Min", |
|
|
"f0_max": "F0 Max", |
|
|
"advanced_settings": "Advanced Settings", |
|
|
"filter_radius": "Filter Radius", |
|
|
"index_rate": "Index Rate", |
|
|
"rms": "RMS Envelope", |
|
|
"protect": "Consonant Protection", |
|
|
"model_manager": "Model Manager", |
|
|
"download_url": "Download by URL", |
|
|
"download_zip": "Upload ZIP Archive", |
|
|
"download_files": "Upload Files", |
|
|
"delete_model": "Delete Model", |
|
|
"download_link": "Model Download Link", |
|
|
"unique_name": "Give your model a unique name different from other voice models.", |
|
|
"download_button": "Download Model", |
|
|
"supported_sites": "Supported Sites", |
|
|
"output_message": "Output Message", |
|
|
"zip_file": "Zip File", |
|
|
"upload_steps": "<h3>1. Find and download files: .pth and optional .index</h3><h3>2. Put file(s) in a ZIP archive and upload it</h3><h3>3. Wait for the ZIP archive to be fully uploaded</h3>", |
|
|
"pth_file": "PTH File", |
|
|
"index_file": "Index File", |
|
|
"delete_info": "Select the model to delete", |
|
|
"refresh_button": "Refresh Model List", |
|
|
"delete_button": "Delete Model", |
|
|
"batch_upload": "Batch Upload", |
|
|
"single_upload": "Single Upload", |
|
|
"converted_voice": "Converted Voice", |
|
|
"converted_voices": "Converted Voices", |
|
|
"update_button": "Refresh", |
|
|
"processing": "Processing - {namefile}", |
|
|
"files": "files", |
|
|
"error_no_audio": "Could not find audio file(s). Make sure the file is uploaded or check the file path.", |
|
|
"error_no_model": "Select a voice model for voice conversion", |
|
|
"warning_file_not_found": "File {file} not found.", |
|
|
"success_single": "Voice successfully converted", |
|
|
"success_batch": "Voices successfully converted", |
|
|
"language": "Language", |
|
|
"stereo_modes": { |
|
|
"mono": "Mono", |
|
|
"left/right": "Left/Right", |
|
|
"sim/dif": "Similarity/Difference" |
|
|
}, |
|
|
'downloading_google': "[~] Downloading model from Google Drive...", |
|
|
'downloading_huggingface': "[~] Downloading model from HuggingFace...", |
|
|
'downloading_pixeldrain': "[~] Downloading model from Pixeldrain...", |
|
|
'downloading_yandex': "[~] Downloading model from Yandex Disk...", |
|
|
'downloading_model': "[~] Downloading voice model {dir_name}...", |
|
|
'unpacking_zip': "[~] Unpacking zip file...", |
|
|
|
|
|
|
|
|
'unsupported_source': "Unsupported source: {url}", |
|
|
'download_error': "Download error: {error}", |
|
|
'yandex_api_error': "Yandex Disk API error: {status}", |
|
|
'pth_not_found': "Model .pth file not found in unzipped archive. Check contents in {folder}.", |
|
|
'model_exists': "Voice model directory {dir_name} already exists! Choose another name.", |
|
|
'model_load_error': "Error loading model: {error}", |
|
|
'model_delete_error': "Error deleting model: {error}", |
|
|
|
|
|
|
|
|
'mega_unsupported': "Mega is not supported!", |
|
|
'model_uploaded': "[+] Model {dir_name} uploaded successfully!", |
|
|
'model_deleted': "[-] Model {dir_name} deleted successfully!", |
|
|
'model_not_found': "[-] Model {dir_name} does not exist", |
|
|
"error_strlist_is_not_list": "This string is not a file list", |
|
|
"error_path_is_list": "The file path is a list" |
|
|
} |
|
|
} |
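
# UI strings are meant to be looked up as TRANSLATIONS[lang][key]; nested dicts
# such as "stereo_modes" map internal mode ids to display labels.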
|
|
|
|
|
|
|
|
for d in dirs:
    os.makedirs(os.path.join(current_dir, d), exist_ok=True)
|
|
|
|
|
for url, file in [["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/rmvpe.pt", RMVPE_PATH], ["https://huggingface.co/Politrees/RVC_resources/resolve/main/predictors/fcpe.pt", FCPE_PATH], ["https://huggingface.co/Politrees/RVC_resources/resolve/main/embedders/hubert_base.pt", HUBERT_MODEL_PATH]]: |
|
|
if not os.path.exists(file): |
|
|
try: |
|
|
r = requests.get(url, stream=True) |
|
|
r.raise_for_status() |
|
|
with open(os.path.join(file), "wb") as f: |
|
|
for chunk in r.iter_content(chunk_size=8192): |
|
|
f.write(chunk) |
|
|
except requests.exceptions.RequestException as e: |
|
|
print(f"Произошла ошибка при загрузке модели: {e}") |
|
|
except Exception as e: |
|
|
print(f"Произошла непредвиденная ошибка: {e}") |
|
|
|
|
|
|
|
|
inference = ''' |
|
|
import torch |
|
|
import numpy as np |
|
|
import librosa |
|
|
from multiprocessing import cpu_count |
|
|
from fairseq import checkpoint_utils |
|
|
|
|
|
from vbach.lib.algorithm.synthesizers import Synthesizer |
|
|
from .pipeline import VC |
|
|
|
|
|
from separator.audio_writer import write_audio_file |
|
|
|
|
|
from vbach.utils.remove_center import remove_center |
|
|
|
|
|
def overlay_mono_on_stereo(mono_audio, stereo_audio, gain=0.5): |
|
|
if mono_audio is None or stereo_audio is None: |
|
|
raise ValueError("Input audio arrays cannot be None") |
|
|
|
|
|
# Ensure float32 for processing |
|
|
mono_audio = mono_audio.astype(np.float32) |
|
|
stereo_audio = stereo_audio.astype(np.float32) |
|
|
|
|
|
# Convert mono to stereo if needed |
|
|
if mono_audio.ndim == 1: |
|
|
mono_audio = np.vstack([mono_audio, mono_audio]) |
|
|
elif mono_audio.shape[0] == 1: |
|
|
mono_audio = np.vstack([mono_audio[0], mono_audio[0]]) |
|
|
|
|
|
if mono_audio.shape[0] != 2 or stereo_audio.shape[0] != 2: |
|
|
raise ValueError("Shapes must be (2, N)") |
|
|
|
|
|
min_len = min(mono_audio.shape[1], stereo_audio.shape[1]) |
|
|
if min_len == 0: |
|
|
raise ValueError("Audio arrays cannot be empty") |
|
|
|
|
|
mono_audio = mono_audio[:, :min_len] |
|
|
stereo_audio = stereo_audio[:, :min_len] |
|
|
|
|
|
result = stereo_audio + mono_audio * gain |
|
|
|
|
|
# Normalize to prevent clipping |
|
|
max_amp = np.max(np.abs(result)) |
|
|
if max_amp > 0: |
|
|
result /= max_amp |
|
|
|
|
|
# Convert back to int16 for output (if needed) |
|
|
result = (result * 32767).astype(np.int16) |
|
|
|
|
|
return result |
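
# Note: overlay_mono_on_stereo always rescales the mix to full scale (peak 1.0)
# before the int16 conversion, so quiet inputs are boosted as well as hot ones.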
|
|
|
|
|
def load_audio( |
|
|
file_path: str, |
|
|
target_sr: int, |
|
|
stereo_mode: str |
|
|
) -> np.ndarray: |
|
|
""" |
|
|
Загружает аудиофайл с помощью librosa, обрабатывает и возвращает аудиосигнал |
|
|
|
|
|
Параметры: |
|
|
file_path: Путь к аудиофайлу |
|
|
target_sr: Целевая частота дискретизации |
|
|
mono: Преобразовать в моно (по умолчанию True) |
|
|
normalize: Нормализовать аудио (по умолчанию False) |
|
|
duration: Загрузить только указанную длительность (в секундах) |
|
|
offset: Начальное смещение для загрузки (в секундах) |
|
|
|
|
|
Возвращает: |
|
|
Аудиоданные в виде numpy array (моно: (samples,), стерео: (channels, samples)) |
|
|
|
|
|
Исключения: |
|
|
RuntimeError: При ошибках загрузки или обработки аудио |
|
|
""" |
|
|
try: |
|
|
mid, left, right = None, None, None |
|
|
|
|
|
if stereo_mode == "mono": |
|
|
# Загрузка аудио с помощью librosa |
|
|
mid_audio, sr = librosa.load( |
|
|
file_path, |
|
|
sr=None, |
|
|
mono=True |
|
|
) |
|
|
mid_audio = librosa.resample( |
|
|
mid_audio, # Исправлено: было audio |
|
|
orig_sr=sr, |
|
|
target_sr=target_sr |
|
|
) |
|
|
mid = mid_audio.flatten() |
|
|
|
|
|
elif stereo_mode == "left/right" or stereo_mode == "sim/dif": |
|
|
# Загрузка аудио с помощью librosa |
|
|
stereo_audio, sr = librosa.load( |
|
|
file_path, |
|
|
sr=None, |
|
|
mono=False |
|
|
) |
|
|
|
|
|
if stereo_mode == "left/right": |
|
|
left_audio = stereo_audio[0] # Исправлено: было [:, 0] |
|
|
right_audio = stereo_audio[1] # Исправлено: было [:, 1] |
|
|
left_audio = librosa.resample( |
|
|
left_audio, |
|
|
orig_sr=sr, |
|
|
target_sr=target_sr |
|
|
) |
|
|
right_audio = librosa.resample( |
|
|
right_audio, |
|
|
orig_sr=sr, |
|
|
target_sr=target_sr |
|
|
) |
|
|
|
|
|
left = left_audio.flatten() |
|
|
right = right_audio.flatten() |
|
|
|
|
|
elif stereo_mode == "sim/dif": |
|
|
mid_left, mid_right, dif_left, dif_right = remove_center(input_array=stereo_audio, samplerate=sr) |
|
|
mid_audio = (mid_left + mid_right) * 0.5 |
|
|
|
|
|
mid_audio = librosa.resample( |
|
|
mid_audio, |
|
|
orig_sr=sr, |
|
|
target_sr=target_sr |
|
|
) |
|
|
dif_left = librosa.resample( |
|
|
dif_left, |
|
|
orig_sr=sr, |
|
|
target_sr=target_sr |
|
|
) |
|
|
dif_right = librosa.resample( |
|
|
dif_right, |
|
|
orig_sr=sr, |
|
|
target_sr=target_sr |
|
|
) |
|
|
|
|
|
mid = mid_audio.flatten() |
|
|
left = dif_left.flatten() # Исправлено: было left_audio |
|
|
right = dif_right.flatten() # Исправлено: было right_audio |
|
|
|
|
|
return mid, left, right |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Ошибка загрузки аудио '{file_path}': {str(e)}") |
|
|
|
|
|
class Config: |
|
|
def __init__(self): |
|
|
        self.device = self.get_device()
        # Half precision is only safe on CUDA; CPU and MPS run in float32
        self.is_half = self.device == "cuda"
|
|
self.n_cpu = cpu_count() |
|
|
self.gpu_name = None |
|
|
self.gpu_mem = None |
|
|
self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config() |
|
|
|
|
|
def get_device(self): |
|
|
if torch.cuda.is_available(): |
|
|
return "cuda" |
|
|
elif torch.backends.mps.is_available(): |
|
|
return "mps" |
|
|
else: |
|
|
return "cpu" |
|
|
|
|
|
def device_config(self): |
|
|
        if torch.cuda.is_available():
            print("Using CUDA device")
            self._configure_gpu()
        elif torch.backends.mps.is_available():
            print("Using MPS device")
            self.device = "mps"
            self.is_half = False
        else:
            print("Using CPU")
            self.device = "cpu"
            self.is_half = False
|
|
|
|
|
x_pad, x_query, x_center, x_max = ( |
|
|
(3, 10, 60, 65) if self.is_half else (1, 6, 38, 41) |
|
|
) |
|
|
if self.gpu_mem is not None and self.gpu_mem <= 4: |
|
|
x_pad, x_query, x_center, x_max = (1, 5, 30, 32) |
|
|
|
|
|
return x_pad, x_query, x_center, x_max |
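
    # x_pad / x_query / x_center / x_max are chunking parameters in seconds:
    # padding added around each chunk, the search window for quiet split points,
    # the nominal split spacing, and the maximum length before a chunk is split.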
|
|
|
|
|
def _configure_gpu(self): |
|
|
self.gpu_name = torch.cuda.get_device_name(self.device) |
|
|
low_end_gpus = ["16", "P40", "P10", "1060", "1070", "1080"] |
|
|
if ( |
|
|
any(gpu in self.gpu_name for gpu in low_end_gpus) |
|
|
and "V100" not in self.gpu_name.upper() |
|
|
): |
|
|
self.is_half = False |
|
|
self.gpu_mem = int( |
|
|
torch.cuda.get_device_properties(self.device).total_memory |
|
|
/ 1024 |
|
|
/ 1024 |
|
|
/ 1024 |
|
|
+ 0.4 |
|
|
) |
|
|
|
|
|
# Load the HuBERT content encoder
|
|
def load_hubert(device, is_half, model_path): |
|
|
models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task( |
|
|
[model_path], suffix="" |
|
|
) |
|
|
hubert = models[0].to(device) |
|
|
hubert = hubert.half() if is_half else hubert.float() |
|
|
hubert.eval() |
|
|
return hubert |
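
# Minimal usage sketch (the path matches this project's layout):
#   config = Config()
#   hubert = load_hubert(config.device, config.is_half,
#                        "vbach/models/embedders/hubert_base.pt")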
|
|
|
|
|
# Build the voice conversion components from an RVC checkpoint
|
|
def get_vc(device, is_half, config, model_path): |
|
|
cpt = torch.load(model_path, map_location="cpu", weights_only=False) |
|
|
if "config" not in cpt or "weight" not in cpt: |
|
|
raise ValueError( |
|
|
f"Некорректный формат для {model_path}. " |
|
|
"Используйте голосовую модель, обученную с использованием RVC v2." |
|
|
) |
|
|
|
|
|
tgt_sr = cpt["config"][-1] |
|
|
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] |
|
|
pitch_guidance = cpt.get("f0", 1) |
|
|
version = cpt.get("version", "v1") |
|
|
input_dim = 768 if version == "v2" else 256 |
|
|
|
|
|
net_g = Synthesizer( |
|
|
*cpt["config"], |
|
|
use_f0=pitch_guidance, |
|
|
input_dim=input_dim, |
|
|
is_half=is_half, |
|
|
) |
|
|
|
|
|
del net_g.enc_q |
|
|
print(net_g.load_state_dict(cpt["weight"], strict=False)) |
|
|
net_g.eval().to(device) |
|
|
net_g = net_g.half() if is_half else net_g.float() |
|
|
|
|
|
vc = VC(tgt_sr, config) |
|
|
return cpt, version, net_g, tgt_sr, vc |
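
# Minimal usage sketch, assuming an RVC v2 voice model downloaded into
# voice_models/ (the .pth filename here is hypothetical):
#   cpt, version, net_g, tgt_sr, vc = get_vc(
#       config.device, config.is_half, config, "voice_models/my_model/model.pth"
#   )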
|
|
|
|
|
def rvc_infer( |
|
|
index_path, |
|
|
index_rate, |
|
|
input_path, |
|
|
output_path, |
|
|
pitch, |
|
|
f0_method, |
|
|
cpt, |
|
|
version, |
|
|
net_g, |
|
|
filter_radius, |
|
|
tgt_sr, |
|
|
volume_envelope, |
|
|
protect, |
|
|
hop_length, |
|
|
vc, |
|
|
hubert_model, |
|
|
f0_min=50, |
|
|
f0_max=1100, |
|
|
format_output="wav", |
|
|
output_bitrate="320k", |
|
|
stereo_mode="mono" |
|
|
): |
|
|
|
|
|
    mid, left, right = load_audio(input_path, 16000, stereo_mode)
    pitch_guidance = cpt.get("f0", 1)

    def convert(channel_audio):
        # Run the RVC pipeline on a single mono channel
        return vc.pipeline(
            hubert_model,
            net_g,
            0,
            channel_audio,
            input_path,
            pitch,
            f0_method,
            index_path,
            index_rate,
            pitch_guidance,
            filter_radius,
            tgt_sr,
            0,
            volume_envelope,
            version,
            protect,
            hop_length,
            f0_file=None,
            f0_min=f0_min,
            f0_max=f0_max,
        )

    if stereo_mode == "mono":
        if mid is None:
            raise ValueError("Mono audio data is None")
        audio_opt = convert(mid)

    elif stereo_mode == "left/right":
        if left is None or right is None:
            raise ValueError("Left or right audio channel is None")

        left_audio_opt = convert(left)
        right_audio_opt = convert(right)

        # Ensure both channels have the same length
        min_len = min(len(left_audio_opt), len(right_audio_opt))
        if min_len == 0:
            raise ValueError("Processed audio is empty")

        audio_opt = np.stack(
            (left_audio_opt[:min_len], right_audio_opt[:min_len]), axis=0
        )

    elif stereo_mode == "sim/dif":
        if mid is None or left is None or right is None:
            raise ValueError("Mid, left or right audio channel is None")

        mid_audio_opt = convert(mid)
        left_audio_opt = convert(left)
        right_audio_opt = convert(right)

        # Ensure all channels have the same length
        min_len = min(len(mid_audio_opt), len(left_audio_opt), len(right_audio_opt))
        if min_len == 0:
            raise ValueError("Processed audio is empty")

        dif_audio_opt = np.stack(
            (left_audio_opt[:min_len], right_audio_opt[:min_len]), axis=0
        )

        # Overlay the converted center vocal back onto the converted sides
        audio_opt = overlay_mono_on_stereo(mid_audio_opt[:min_len], dif_audio_opt)

    write_audio_file(output_path, audio_opt, tgt_sr, format_output, output_bitrate)
    return output_path
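
# Minimal call sketch ("song.wav" and the index path are hypothetical; the other
# objects come from load_hubert/get_vc above):
#   rvc_infer(index_path="voice_models/my_model/added.index", index_rate=0.5,
#             input_path="song.wav", output_path="out.wav", pitch=0,
#             f0_method="rmvpe+", cpt=cpt, version=version, net_g=net_g,
#             filter_radius=3, tgt_sr=tgt_sr, volume_envelope=1, protect=0.33,
#             hop_length=128, vc=vc, hubert_model=hubert)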
|
|
''' |
|
|
|
|
|
pipeline = ''' |
|
|
import os |
|
|
import gc |
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
import torchcrepe |
|
|
import faiss |
|
|
import librosa |
|
|
import numpy as np |
|
|
from scipy import signal |
|
|
|
|
|
from vbach.lib.predictors.FCPE import FCPEF0Predictor |
|
|
from vbach.lib.predictors.RMVPE import RMVPE0Predictor |
|
|
|
|
|
PREDICTORS_DIR = os.path.join(os.getcwd(), "vbach", "models", "predictors") |
|
|
RMVPE_DIR = os.path.join(PREDICTORS_DIR, "rmvpe.pt") |
|
|
FCPE_DIR = os.path.join(PREDICTORS_DIR, "fcpe.pt") |
|
|
|
|
|
# High-pass Butterworth filter (removes DC offset and low-frequency rumble)
FILTER_ORDER = 5  # Filter order
CUTOFF_FREQUENCY = 48  # Cutoff frequency (Hz)
SAMPLE_RATE = 16000  # Sample rate (Hz)
bh, ah = signal.butter(N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)
|
|
|
|
|
|
|
|
input_audio_path2wav = {} |
|
|
|
|
|
|
|
|
# Audio processing helpers
|
|
class AudioProcessor: |
|
|
@staticmethod |
|
|
def change_rms(source_audio, source_rate, target_audio, target_rate, rate): |
|
|
""" |
|
|
Изменяет RMS (среднеквадратичное значение) аудио. |
|
|
""" |
|
|
rms1 = librosa.feature.rms( |
|
|
y=source_audio, |
|
|
frame_length=source_rate // 2 * 2, |
|
|
hop_length=source_rate // 2, |
|
|
) |
|
|
rms2 = librosa.feature.rms( |
|
|
y=target_audio, |
|
|
frame_length=target_rate // 2 * 2, |
|
|
hop_length=target_rate // 2, |
|
|
) |
|
|
|
|
|
rms1 = F.interpolate( |
|
|
torch.from_numpy(rms1).float().unsqueeze(0), |
|
|
size=target_audio.shape[0], |
|
|
mode="linear", |
|
|
).squeeze() |
|
|
rms2 = F.interpolate( |
|
|
torch.from_numpy(rms2).float().unsqueeze(0), |
|
|
size=target_audio.shape[0], |
|
|
mode="linear", |
|
|
).squeeze() |
|
|
rms2 = torch.maximum(rms2, torch.zeros_like(rms2) + 1e-6) |
|
|
|
|
|
adjusted_audio = ( |
|
|
target_audio * (torch.pow(rms1, 1 - rate) * torch.pow(rms2, rate - 1)).numpy() |
|
|
) |
|
|
return adjusted_audio |
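
    # With rate = 1 the target keeps its own loudness envelope (the factor is 1);
    # with rate = 0 the source envelope fully replaces it (factor rms1 / rms2).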
|
|
|
|
|
|
|
|
# Voice conversion class
|
|
class VC: |
|
|
def __init__(self, tgt_sr, config): |
|
|
""" |
|
|
Инициализация параметров для преобразования голоса. |
|
|
""" |
|
|
self.x_pad = config.x_pad |
|
|
self.x_query = config.x_query |
|
|
self.x_center = config.x_center |
|
|
self.x_max = config.x_max |
|
|
self.is_half = config.is_half |
|
|
self.sample_rate = 16000 |
|
|
self.window = 160 |
|
|
self.t_pad = self.sample_rate * self.x_pad |
|
|
self.t_pad_tgt = tgt_sr * self.x_pad |
|
|
self.t_pad2 = self.t_pad * 2 |
|
|
self.t_query = self.sample_rate * self.x_query |
|
|
self.t_center = self.sample_rate * self.x_center |
|
|
self.t_max = self.sample_rate * self.x_max |
|
|
self.time_step = self.window / self.sample_rate * 1000 |
|
|
self.device = config.device |
|
|
|
|
|
def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"): |
|
|
""" |
|
|
Получает F0 с использованием модели crepe. |
|
|
""" |
|
|
x = x.astype(np.float32) |
|
|
x /= np.quantile(np.abs(x), 0.999) |
|
|
audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(0) |
|
|
if audio.ndim == 2 and audio.shape[0] > 1: |
|
|
audio = torch.mean(audio, dim=0, keepdim=True) |
|
|
|
|
|
pitch = torchcrepe.predict( |
|
|
audio, |
|
|
self.sample_rate, |
|
|
hop_length, |
|
|
f0_min, |
|
|
f0_max, |
|
|
model, |
|
|
batch_size=hop_length * 2, |
|
|
device=self.device, |
|
|
pad=True, |
|
|
) |
|
|
|
|
|
p_len = p_len or x.shape[0] // hop_length |
|
|
source = np.array(pitch.squeeze(0).cpu().float().numpy()) |
|
|
source[source < 0.001] = np.nan |
|
|
target = np.interp( |
|
|
np.arange(0, len(source) * p_len, len(source)) / p_len, |
|
|
np.arange(0, len(source)), |
|
|
source, |
|
|
) |
|
|
f0 = np.nan_to_num(target) |
|
|
return f0 |
|
|
|
|
|
def get_f0_rmvpe(self, x, f0_min=1, f0_max=40000, *args, **kwargs): |
|
|
""" |
|
|
Получает F0 с использованием модели rmvpe. |
|
|
""" |
|
|
if not hasattr(self, "model_rmvpe"): |
|
|
self.model_rmvpe = RMVPE0Predictor( |
|
|
RMVPE_DIR, is_half=self.is_half, device=self.device |
|
|
) |
|
|
f0 = self.model_rmvpe.infer_from_audio_with_pitch( |
|
|
x, thred=0.03, f0_min=f0_min, f0_max=f0_max |
|
|
) |
|
|
return f0 |
|
|
|
|
|
def get_f0( |
|
|
self, |
|
|
input_audio_path, |
|
|
x, |
|
|
p_len, |
|
|
pitch, |
|
|
f0_method, |
|
|
filter_radius, |
|
|
hop_length, |
|
|
inp_f0=None, |
|
|
f0_min=50, |
|
|
f0_max=1100, |
|
|
): |
|
|
""" |
|
|
Получает F0 с использованием выбранного метода. |
|
|
""" |
|
|
global input_audio_path2wav |
|
|
f0_mel_min = 1127 * np.log(1 + f0_min / 700) |
|
|
f0_mel_max = 1127 * np.log(1 + f0_max / 700) |
|
|
|
|
|
if f0_method == "mangio-crepe": |
|
|
f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length)) |
|
|
|
|
|
elif f0_method == "rmvpe+": |
|
|
params = { |
|
|
"x": x, |
|
|
"p_len": p_len, |
|
|
"pitch": pitch, |
|
|
"f0_min": f0_min, |
|
|
"f0_max": f0_max, |
|
|
"time_step": self.time_step, |
|
|
"filter_radius": filter_radius, |
|
|
"crepe_hop_length": int(hop_length), |
|
|
"model": "full", |
|
|
} |
|
|
f0 = self.get_f0_rmvpe(**params) |
|
|
|
|
|
elif f0_method == "fcpe": |
|
|
self.model_fcpe = FCPEF0Predictor( |
|
|
FCPE_DIR, |
|
|
f0_min=int(f0_min), |
|
|
f0_max=int(f0_max), |
|
|
dtype=torch.float32, |
|
|
device=self.device, |
|
|
sample_rate=self.sample_rate, |
|
|
threshold=0.03, |
|
|
) |
|
|
f0 = self.model_fcpe.compute_f0(x, p_len=p_len) |
|
|
            del self.model_fcpe
            gc.collect()

        else:
            raise ValueError(f"Unknown F0 method: {f0_method}")

        f0 *= pow(2, pitch / 12)
|
|
tf0 = self.sample_rate // self.window |
|
|
if inp_f0 is not None: |
|
|
delta_t = np.round( |
|
|
(inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1 |
|
|
).astype("int16") |
|
|
replace_f0 = np.interp(list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1]) |
|
|
shape = f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)].shape[0] |
|
|
f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[:shape] |
|
|
|
|
|
f0bak = f0.copy() |
|
|
f0_mel = 1127 * np.log(1 + f0 / 700) |
|
|
f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / ( |
|
|
f0_mel_max - f0_mel_min |
|
|
) + 1 |
|
|
f0_mel[f0_mel <= 1] = 1 |
|
|
f0_mel[f0_mel > 255] = 255 |
|
|
f0_coarse = np.rint(f0_mel).astype(int) |
|
|
return f0_coarse, f0bak |
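
    # f0 is mapped to a mel-like scale, mel = 1127 * ln(1 + f0 / 700), then
    # quantized into the integer range 1..255 expected by the pitch embedding;
    # the unquantized curve (f0bak) is kept for the NSF source module.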
|
|
|
|
|
def vc( |
|
|
self, |
|
|
model, |
|
|
net_g, |
|
|
sid, |
|
|
audio0, |
|
|
pitch, |
|
|
pitchf, |
|
|
index, |
|
|
big_npy, |
|
|
index_rate, |
|
|
version, |
|
|
protect, |
|
|
): |
|
|
""" |
|
|
Преобразует аудио с использованием модели. |
|
|
""" |
|
|
feats = torch.from_numpy(audio0) |
|
|
feats = feats.half() if self.is_half else feats.float() |
|
|
if feats.dim() == 2: |
|
|
feats = feats.mean(-1) |
|
|
assert feats.dim() == 1, feats.dim() |
|
|
feats = feats.view(1, -1) |
|
|
padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False) |
|
|
|
|
|
inputs = { |
|
|
"source": feats.to(self.device), |
|
|
"padding_mask": padding_mask, |
|
|
"output_layer": 9 if version == "v1" else 12, |
|
|
} |
|
|
|
|
|
with torch.no_grad(): |
|
|
logits = model.extract_features(**inputs) |
|
|
feats = model.final_proj(logits[0]) if version == "v1" else logits[0] |
|
|
if protect < 0.5 and pitch is not None and pitchf is not None: |
|
|
feats0 = feats.clone() |
|
|
if index is not None and big_npy is not None and index_rate != 0: |
|
|
npy = feats[0].cpu().numpy() |
|
|
npy = npy.astype("float32") if self.is_half else npy |
|
|
score, ix = index.search(npy, k=8) |
|
|
weight = np.square(1 / score) |
|
|
weight /= weight.sum(axis=1, keepdims=True) |
|
|
npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1) |
|
|
npy = npy.astype("float16") if self.is_half else npy |
|
|
feats = ( |
|
|
torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate |
|
|
+ (1 - index_rate) * feats |
|
|
) |
|
|
|
|
|
feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1) |
|
|
if protect < 0.5 and pitch is not None and pitchf is not None: |
|
|
feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute( |
|
|
0, 2, 1 |
|
|
) |
|
|
p_len = audio0.shape[0] // self.window |
|
|
if feats.shape[1] < p_len: |
|
|
p_len = feats.shape[1] |
|
|
if pitch is not None and pitchf is not None: |
|
|
pitch = pitch[:, :p_len] |
|
|
pitchf = pitchf[:, :p_len] |
|
|
|
|
|
if protect < 0.5 and pitch is not None and pitchf is not None: |
|
|
pitchff = pitchf.clone() |
|
|
pitchff[pitchf > 0] = 1 |
|
|
pitchff[pitchf < 1] = protect |
|
|
pitchff = pitchff.unsqueeze(-1) |
|
|
feats = feats * pitchff + feats0 * (1 - pitchff) |
|
|
feats = feats.to(feats0.dtype) |
|
|
p_len = torch.tensor([p_len], device=self.device).long() |
|
|
with torch.no_grad(): |
|
|
if pitch is not None and pitchf is not None: |
|
|
audio1 = ( |
|
|
(net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0]) |
|
|
.data.cpu() |
|
|
.float() |
|
|
.numpy() |
|
|
) |
|
|
else: |
|
|
audio1 = ( |
|
|
(net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy() |
|
|
) |
|
|
del feats, p_len, padding_mask |
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.empty_cache() |
|
|
return audio1 |
|
|
|
|
|
def pipeline( |
|
|
self, |
|
|
model, |
|
|
net_g, |
|
|
sid, |
|
|
audio, |
|
|
input_audio_path, |
|
|
pitch, |
|
|
f0_method, |
|
|
file_index, |
|
|
index_rate, |
|
|
pitch_guidance, |
|
|
filter_radius, |
|
|
tgt_sr, |
|
|
resample_sr, |
|
|
volume_envelope, |
|
|
version, |
|
|
protect, |
|
|
hop_length, |
|
|
f0_file, |
|
|
f0_min=50, |
|
|
f0_max=1100, |
|
|
): |
|
|
""" |
|
|
Основной конвейер для преобразования аудио. |
|
|
""" |
|
|
if ( |
|
|
file_index is not None |
|
|
and file_index != "" |
|
|
and os.path.exists(file_index) |
|
|
and index_rate != 0 |
|
|
): |
|
|
try: |
|
|
index = faiss.read_index(file_index) |
|
|
big_npy = index.reconstruct_n(0, index.ntotal) |
|
|
except Exception as e: |
|
|
print(f"Произошла ошибка при чтении индекса FAISS: {e}") |
|
|
index = big_npy = None |
|
|
else: |
|
|
index = big_npy = None |
|
|
audio = signal.filtfilt(bh, ah, audio) |
|
|
audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect") |
|
|
opt_ts = [] |
|
|
if audio_pad.shape[0] > self.t_max: |
|
|
audio_sum = np.zeros_like(audio) |
|
|
for i in range(self.window): |
|
|
audio_sum += audio_pad[i : i - self.window] |
|
|
for t in range(self.t_center, audio.shape[0], self.t_center): |
|
|
opt_ts.append( |
|
|
t |
|
|
- self.t_query |
|
|
+ np.where( |
|
|
np.abs(audio_sum[t - self.t_query : t + self.t_query]) |
|
|
== np.abs(audio_sum[t - self.t_query : t + self.t_query]).min() |
|
|
)[0][0] |
|
|
) |
|
|
s = 0 |
|
|
audio_opt = [] |
|
|
t = None |
|
|
audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect") |
|
|
p_len = audio_pad.shape[0] // self.window |
|
|
inp_f0 = None |
|
|
if f0_file and hasattr(f0_file, "name"): |
|
|
try: |
|
|
with open(f0_file.name, "r") as f: |
|
|
lines = f.read().strip("\\n").split("\\n") |
|
|
inp_f0 = np.array( |
|
|
[[float(i) for i in line.split(",")] for line in lines], |
|
|
dtype="float32", |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"Произошла ошибка при чтении файла F0: {e}") |
|
|
sid = torch.tensor(sid, device=self.device).unsqueeze(0).long() |
|
|
if pitch_guidance: |
|
|
pitch, pitchf = self.get_f0( |
|
|
input_audio_path, |
|
|
audio_pad, |
|
|
p_len, |
|
|
pitch, |
|
|
f0_method, |
|
|
filter_radius, |
|
|
hop_length, |
|
|
inp_f0, |
|
|
f0_min, |
|
|
f0_max, |
|
|
) |
|
|
pitch = pitch[:p_len] |
|
|
pitchf = pitchf[:p_len] |
|
|
if self.device == "mps": |
|
|
pitchf = pitchf.astype(np.float32) |
|
|
pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long() |
|
|
pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float() |
|
|
for t in opt_ts: |
|
|
t = t // self.window * self.window |
|
|
if pitch_guidance: |
|
|
audio_opt.append( |
|
|
self.vc( |
|
|
model, |
|
|
net_g, |
|
|
sid, |
|
|
audio_pad[s : t + self.t_pad2 + self.window], |
|
|
pitch[:, s // self.window : (t + self.t_pad2) // self.window], |
|
|
pitchf[:, s // self.window : (t + self.t_pad2) // self.window], |
|
|
index, |
|
|
big_npy, |
|
|
index_rate, |
|
|
version, |
|
|
protect, |
|
|
)[self.t_pad_tgt : -self.t_pad_tgt] |
|
|
) |
|
|
else: |
|
|
audio_opt.append( |
|
|
self.vc( |
|
|
model, |
|
|
net_g, |
|
|
sid, |
|
|
audio_pad[s : t + self.t_pad2 + self.window], |
|
|
None, |
|
|
None, |
|
|
index, |
|
|
big_npy, |
|
|
index_rate, |
|
|
version, |
|
|
protect, |
|
|
)[self.t_pad_tgt : -self.t_pad_tgt] |
|
|
) |
|
|
s = t |
|
|
if pitch_guidance: |
|
|
audio_opt.append( |
|
|
self.vc( |
|
|
model, |
|
|
net_g, |
|
|
sid, |
|
|
audio_pad[t:], |
|
|
pitch[:, t // self.window :] if t is not None else pitch, |
|
|
pitchf[:, t // self.window :] if t is not None else pitchf, |
|
|
index, |
|
|
big_npy, |
|
|
index_rate, |
|
|
version, |
|
|
protect, |
|
|
)[self.t_pad_tgt : -self.t_pad_tgt] |
|
|
) |
|
|
else: |
|
|
audio_opt.append( |
|
|
self.vc( |
|
|
model, |
|
|
net_g, |
|
|
sid, |
|
|
audio_pad[t:], |
|
|
None, |
|
|
None, |
|
|
index, |
|
|
big_npy, |
|
|
index_rate, |
|
|
version, |
|
|
protect, |
|
|
)[self.t_pad_tgt : -self.t_pad_tgt] |
|
|
) |
|
|
|
|
|
audio_opt = np.concatenate(audio_opt) |
|
|
if volume_envelope != 1: |
|
|
audio_opt = AudioProcessor.change_rms( |
|
|
audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope |
|
|
) |
|
|
if resample_sr >= self.sample_rate and tgt_sr != resample_sr: |
|
|
audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr) |
|
|
|
|
|
audio_max = np.abs(audio_opt).max() / 0.99 |
|
|
        max_int16 = 32767
|
|
if audio_max > 1: |
|
|
max_int16 /= audio_max |
|
|
audio_opt = (audio_opt * max_int16).astype(np.int16) |
|
|
|
|
|
        if pitch_guidance:
            del pitch, pitchf
        del sid
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
return audio_opt |
|
|
''' |
|
|
|
|
|
for path, text in [
    [os.path.join(current_dir, dirs[3], "infer.py"), inference],
    [os.path.join(current_dir, dirs[3], "pipeline.py"), pipeline],
]:
|
|
with open(path, 'w') as f: |
|
|
f.write(text) |
|
|
|
|
|
remove_center = ''' |
|
|
import numpy as np |
|
|
from scipy import signal |
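

# Center-channel extraction via STFT: per time-frequency bin, the "common"
# (center) component is taken as the smaller of the two channel magnitudes,
# and subtracting it from each channel leaves the "sides" (difference) signal.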
|
|
|
|
|
def remove_center(input_array, samplerate, rdf=0.99999, window_size=2048, overlap=2, window_type="blackman", stereo_mode="stereo"): |
|
|
    # Validate input: expect a stereo array with shape (2, samples)
    if input_array.ndim != 2 or input_array.shape[0] != 2:
        raise ValueError("Input must be a stereo array with shape (2, samples)")

    left = input_array[0]
    right = input_array[1]
|
|
|
|
|
# Adjust window size if input is too short |
|
|
nperseg = min(window_size, len(left)) |
|
|
if nperseg < 16: # Minimum reasonable window size |
|
|
nperseg = 16 |
|
|
if len(left) < 16: |
|
|
# For very short inputs, just return the original with warning |
|
|
import warnings |
|
|
warnings.warn(f"Input too short ({len(left)} samples), returning original audio") |
|
|
return left, right, left, right |
|
|
|
|
|
noverlap = nperseg // overlap # Ensure noverlap < nperseg |
|
|
if noverlap >= nperseg: |
|
|
noverlap = nperseg - 1 # Ensure at least 1 sample difference |
|
|
|
|
|
# Compute STFT |
|
|
f, t, Z_left = signal.stft(left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type) |
|
|
f, t, Z_right = signal.stft(right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type) |
|
|
    if stereo_mode == "mono":
        # Use the phase of the mono mix for the extracted center component
        mono = 0.5 * (left + right)
        _, _, Z_mono = signal.stft(mono, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type)
        Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j * np.angle(Z_mono))
        Z_common_right = Z_common_left
    else:
        # Keep per-channel phase: each side uses the opposite channel's phase
        Z_common_left = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j * np.angle(Z_right))
        Z_common_right = np.minimum(np.abs(Z_left), np.abs(Z_right)) * np.exp(1j * np.angle(Z_left))
|
|
|
|
|
reduction_factor = rdf |
|
|
|
|
|
Z_new_left = Z_left - Z_common_left * reduction_factor |
|
|
Z_new_right = Z_right - Z_common_right * reduction_factor |
|
|
|
|
|
# Compute ISTFT |
|
|
_, new_left = signal.istft(Z_new_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type) |
|
|
_, new_right = signal.istft(Z_new_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type) |
|
|
_, common_signal_left = signal.istft(Z_common_left, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type) |
|
|
_, common_signal_right = signal.istft(Z_common_right, fs=samplerate, nperseg=nperseg, noverlap=noverlap, window=window_type) |
|
|
|
|
|
# Trim to original length |
|
|
new_left = new_left[:len(left)] |
|
|
new_right = new_right[:len(right)] |
|
|
common_signal_left = common_signal_left[:len(left)] |
|
|
    common_signal_right = common_signal_right[:len(right)]
|
|
|
|
|
# Normalize |
|
|
peak = np.max([np.abs(new_left).max(), np.abs(new_right).max()]) |
|
|
if peak > 1.0: |
|
|
new_left = new_left / peak |
|
|
new_right = new_right / peak |
|
|
|
|
|
|
|
|
|
|
return common_signal_left, common_signal_right, new_left, new_right |
|
|
''' |
|
|
|
|
|
for path, text in [[os.path.join(current_dir, dirs[11], "remove_center.py"), remove_center]]:
|
|
with open(path, 'w') as f: |
|
|
f.write(text) |
|
|
|
|
|
lib_algorithm = { |
|
|
"synthesizers" : ["synthesizers.py", ''' |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn.utils.weight_norm import remove_weight_norm |
|
|
from typing import Optional |
|
|
|
|
|
from .commons import slice_segments, rand_slice_segments |
|
|
from .encoders import TextEncoder, PosteriorEncoder |
|
|
from .generators import Generator |
|
|
from .nsf import GeneratorNSF |
|
|
from .residuals import ResidualCouplingBlock |
|
|
|
|
|
|
|
|
class Synthesizer(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
spec_channels, |
|
|
segment_size, |
|
|
inter_channels, |
|
|
hidden_channels, |
|
|
filter_channels, |
|
|
n_heads, |
|
|
n_layers, |
|
|
kernel_size, |
|
|
p_dropout, |
|
|
resblock, |
|
|
resblock_kernel_sizes, |
|
|
resblock_dilation_sizes, |
|
|
upsample_rates, |
|
|
upsample_initial_channel, |
|
|
upsample_kernel_sizes, |
|
|
spk_embed_dim, |
|
|
gin_channels, |
|
|
sr, |
|
|
use_f0, |
|
|
input_dim=768, |
|
|
**kwargs |
|
|
): |
|
|
super(Synthesizer, self).__init__() |
|
|
self.spec_channels = spec_channels |
|
|
self.inter_channels = inter_channels |
|
|
self.hidden_channels = hidden_channels |
|
|
self.filter_channels = filter_channels |
|
|
self.n_heads = n_heads |
|
|
self.n_layers = n_layers |
|
|
self.kernel_size = kernel_size |
|
|
self.p_dropout = float(p_dropout) |
|
|
self.resblock = resblock |
|
|
self.resblock_kernel_sizes = resblock_kernel_sizes |
|
|
self.resblock_dilation_sizes = resblock_dilation_sizes |
|
|
self.upsample_rates = upsample_rates |
|
|
self.upsample_initial_channel = upsample_initial_channel |
|
|
self.upsample_kernel_sizes = upsample_kernel_sizes |
|
|
self.segment_size = segment_size |
|
|
self.gin_channels = gin_channels |
|
|
self.spk_embed_dim = spk_embed_dim |
|
|
self.use_f0 = use_f0 |
|
|
|
|
|
self.enc_p = TextEncoder( |
|
|
inter_channels, |
|
|
hidden_channels, |
|
|
filter_channels, |
|
|
n_heads, |
|
|
n_layers, |
|
|
kernel_size, |
|
|
float(p_dropout), |
|
|
input_dim, |
|
|
f0=use_f0, |
|
|
) |
|
|
|
|
|
if use_f0: |
|
|
self.dec = GeneratorNSF( |
|
|
inter_channels, |
|
|
resblock, |
|
|
resblock_kernel_sizes, |
|
|
resblock_dilation_sizes, |
|
|
upsample_rates, |
|
|
upsample_initial_channel, |
|
|
upsample_kernel_sizes, |
|
|
gin_channels=gin_channels, |
|
|
sr=sr, |
|
|
is_half=kwargs["is_half"], |
|
|
) |
|
|
else: |
|
|
self.dec = Generator( |
|
|
inter_channels, |
|
|
resblock, |
|
|
resblock_kernel_sizes, |
|
|
resblock_dilation_sizes, |
|
|
upsample_rates, |
|
|
upsample_initial_channel, |
|
|
upsample_kernel_sizes, |
|
|
gin_channels=gin_channels, |
|
|
) |
|
|
|
|
|
self.enc_q = PosteriorEncoder( |
|
|
spec_channels, |
|
|
inter_channels, |
|
|
hidden_channels, |
|
|
5, |
|
|
1, |
|
|
16, |
|
|
gin_channels=gin_channels, |
|
|
) |
|
|
self.flow = ResidualCouplingBlock( |
|
|
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels |
|
|
) |
|
|
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels) |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
self.dec.remove_weight_norm() |
|
|
self.flow.remove_weight_norm() |
|
|
self.enc_q.remove_weight_norm() |
|
|
|
|
|
def __prepare_scriptable__(self): |
|
|
for hook in self.dec._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(self.dec) |
|
|
for hook in self.flow._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(self.flow) |
|
|
if hasattr(self, "enc_q"): |
|
|
for hook in self.enc_q._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(self.enc_q) |
|
|
return self |
|
|
|
|
|
@torch.jit.ignore |
|
|
def forward( |
|
|
self, |
|
|
phone: torch.Tensor, |
|
|
phone_lengths: torch.Tensor, |
|
|
pitch: Optional[torch.Tensor] = None, |
|
|
pitchf: Optional[torch.Tensor] = None, |
|
|
        y: Optional[torch.Tensor] = None,
        y_lengths: Optional[torch.Tensor] = None,
|
|
ds: Optional[torch.Tensor] = None, |
|
|
): |
|
|
g = self.emb_g(ds).unsqueeze(-1) |
|
|
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) |
|
|
if y is not None: |
|
|
z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g) |
|
|
z_p = self.flow(z, y_mask, g=g) |
|
|
z_slice, ids_slice = rand_slice_segments(z, y_lengths, self.segment_size) |
|
|
if self.use_f0: |
|
|
pitchf = slice_segments(pitchf, ids_slice, self.segment_size, 2) |
|
|
o = self.dec(z_slice, pitchf, g=g) |
|
|
else: |
|
|
o = self.dec(z_slice, g=g) |
|
|
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q) |
|
|
else: |
|
|
return None, None, x_mask, None, (None, None, m_p, logs_p, None, None) |
|
|
|
|
|
@torch.jit.export |
|
|
def infer( |
|
|
self, |
|
|
phone: torch.Tensor, |
|
|
phone_lengths: torch.Tensor, |
|
|
pitch: Optional[torch.Tensor] = None, |
|
|
nsff0: Optional[torch.Tensor] = None, |
|
|
        sid: Optional[torch.Tensor] = None,
|
|
rate: Optional[torch.Tensor] = None, |
|
|
): |
|
|
g = self.emb_g(sid).unsqueeze(-1) |
|
|
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) |
|
|
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask |
|
|
if rate is not None: |
|
|
assert isinstance(rate, torch.Tensor) |
|
|
head = int(z_p.shape[2] * (1.0 - rate.item())) |
|
|
z_p = z_p[:, :, head:] |
|
|
x_mask = x_mask[:, :, head:] |
|
|
if self.use_f0: |
|
|
nsff0 = nsff0[:, head:] |
|
|
if self.use_f0: |
|
|
z = self.flow(z_p, x_mask, g=g, reverse=True) |
|
|
o = self.dec(z * x_mask, nsff0, g=g) |
|
|
else: |
|
|
z = self.flow(z_p, x_mask, g=g, reverse=True) |
|
|
o = self.dec(z * x_mask, g=g) |
|
|
return o, x_mask, (z, z_p, m_p, logs_p) |
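
    # Inference samples z_p = (m_p + exp(logs_p) * noise * 0.66666) from the
    # prior, inverts the flow, and decodes with the (NSF) generator.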
|
|
|
|
|
'''], |
|
|
"residuals" : ["residuals.py", ''' |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn import functional as F |
|
|
from torch.nn.utils.weight_norm import remove_weight_norm |
|
|
from torch.nn.utils.parametrizations import weight_norm |
|
|
from typing import Optional |
|
|
|
|
|
from .commons import get_padding, init_weights |
|
|
from .modules import WaveNet |
|
|
|
|
|
|
|
|
LRELU_SLOPE = 0.1 |
|
|
|
|
|
|
|
|
def create_conv1d_layer(channels, kernel_size, dilation): |
|
|
return weight_norm( |
|
|
nn.Conv1d( |
|
|
channels, |
|
|
channels, |
|
|
kernel_size, |
|
|
1, |
|
|
dilation=dilation, |
|
|
padding=get_padding(kernel_size, dilation), |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
def apply_mask(tensor, mask): |
|
|
return tensor * mask if mask is not None else tensor |
|
|
|
|
|
|
|
|
class ResBlockBase(nn.Module): |
|
|
def __init__(self, channels, kernel_size, dilations): |
|
|
super(ResBlockBase, self).__init__() |
|
|
self.convs1 = nn.ModuleList( |
|
|
[create_conv1d_layer(channels, kernel_size, d) for d in dilations] |
|
|
) |
|
|
self.convs1.apply(init_weights) |
|
|
|
|
|
self.convs2 = nn.ModuleList( |
|
|
[create_conv1d_layer(channels, kernel_size, 1) for _ in dilations] |
|
|
) |
|
|
self.convs2.apply(init_weights) |
|
|
|
|
|
def forward(self, x, x_mask=None): |
|
|
for c1, c2 in zip(self.convs1, self.convs2): |
|
|
xt = F.leaky_relu(x, LRELU_SLOPE) |
|
|
xt = apply_mask(xt, x_mask) |
|
|
xt = F.leaky_relu(c1(xt), LRELU_SLOPE) |
|
|
xt = apply_mask(xt, x_mask) |
|
|
xt = c2(xt) |
|
|
x = xt + x |
|
|
return apply_mask(x, x_mask) |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
for conv in self.convs1 + self.convs2: |
|
|
remove_weight_norm(conv) |
|
|
|
|
|
|
|
|
class ResBlock1(ResBlockBase): |
|
|
def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)): |
|
|
super(ResBlock1, self).__init__(channels, kernel_size, dilation) |
|
|
|
|
|
|
|
|
class ResBlock2(ResBlockBase): |
|
|
def __init__(self, channels, kernel_size=3, dilation=(1, 3)): |
|
|
super(ResBlock2, self).__init__(channels, kernel_size, dilation) |
|
|
|
|
|
|
|
|
class Log(nn.Module): |
|
|
def forward(self, x, x_mask, reverse=False, **kwargs): |
|
|
if not reverse: |
|
|
y = torch.log(torch.clamp_min(x, 1e-5)) * x_mask |
|
|
logdet = torch.sum(-y, [1, 2]) |
|
|
return y, logdet |
|
|
else: |
|
|
x = torch.exp(x) * x_mask |
|
|
return x |
|
|
|
|
|
|
|
|
class Flip(nn.Module): |
|
|
def forward(self, x, *args, reverse=False, **kwargs): |
|
|
x = torch.flip(x, [1]) |
|
|
if not reverse: |
|
|
logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device) |
|
|
return x, logdet |
|
|
else: |
|
|
return x |
|
|
|
|
|
|
|
|
class ElementwiseAffine(nn.Module): |
|
|
def __init__(self, channels): |
|
|
super().__init__() |
|
|
self.channels = channels |
|
|
self.m = nn.Parameter(torch.zeros(channels, 1)) |
|
|
self.logs = nn.Parameter(torch.zeros(channels, 1)) |
|
|
|
|
|
def forward(self, x, x_mask, reverse=False, **kwargs): |
|
|
if not reverse: |
|
|
y = self.m + torch.exp(self.logs) * x |
|
|
y = y * x_mask |
|
|
logdet = torch.sum(self.logs * x_mask, [1, 2]) |
|
|
return y, logdet |
|
|
else: |
|
|
x = (x - self.m) * torch.exp(-self.logs) * x_mask |
|
|
return x |
|
|
|
|
|
|
|
|
class ResidualCouplingBlock(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
channels, |
|
|
hidden_channels, |
|
|
kernel_size, |
|
|
dilation_rate, |
|
|
n_layers, |
|
|
n_flows=4, |
|
|
gin_channels=0, |
|
|
): |
|
|
super(ResidualCouplingBlock, self).__init__() |
|
|
self.channels = channels |
|
|
self.hidden_channels = hidden_channels |
|
|
self.kernel_size = kernel_size |
|
|
self.dilation_rate = dilation_rate |
|
|
self.n_layers = n_layers |
|
|
self.n_flows = n_flows |
|
|
self.gin_channels = gin_channels |
|
|
|
|
|
self.flows = nn.ModuleList() |
|
|
for i in range(n_flows): |
|
|
self.flows.append( |
|
|
ResidualCouplingLayer( |
|
|
channels, |
|
|
hidden_channels, |
|
|
kernel_size, |
|
|
dilation_rate, |
|
|
n_layers, |
|
|
gin_channels=gin_channels, |
|
|
mean_only=True, |
|
|
) |
|
|
) |
|
|
self.flows.append(Flip()) |
|
|
|
|
|
def forward( |
|
|
self, |
|
|
x: torch.Tensor, |
|
|
x_mask: torch.Tensor, |
|
|
g: Optional[torch.Tensor] = None, |
|
|
reverse: bool = False, |
|
|
): |
|
|
if not reverse: |
|
|
for flow in self.flows: |
|
|
x, _ = flow(x, x_mask, g=g, reverse=reverse) |
|
|
else: |
|
|
for flow in reversed(self.flows): |
|
|
x = flow.forward(x, x_mask, g=g, reverse=reverse) |
|
|
return x |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
for i in range(self.n_flows): |
|
|
self.flows[i * 2].remove_weight_norm() |
|
|
|
|
|
def __prepare_scriptable__(self): |
|
|
for i in range(self.n_flows): |
|
|
for hook in self.flows[i * 2]._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(self.flows[i * 2]) |
|
|
|
|
|
return self |
|
|
|
|
|
|
|
|
class ResidualCouplingLayer(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
channels, |
|
|
hidden_channels, |
|
|
kernel_size, |
|
|
dilation_rate, |
|
|
n_layers, |
|
|
p_dropout=0, |
|
|
gin_channels=0, |
|
|
mean_only=False, |
|
|
): |
|
|
assert channels % 2 == 0, "channels should be divisible by 2" |
|
|
super().__init__() |
|
|
self.channels = channels |
|
|
self.hidden_channels = hidden_channels |
|
|
self.kernel_size = kernel_size |
|
|
self.dilation_rate = dilation_rate |
|
|
self.n_layers = n_layers |
|
|
self.half_channels = channels // 2 |
|
|
self.mean_only = mean_only |
|
|
|
|
|
self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1) |
|
|
self.enc = WaveNet( |
|
|
hidden_channels, |
|
|
kernel_size, |
|
|
dilation_rate, |
|
|
n_layers, |
|
|
p_dropout=p_dropout, |
|
|
gin_channels=gin_channels, |
|
|
) |
|
|
self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1) |
|
|
self.post.weight.data.zero_() |
|
|
self.post.bias.data.zero_() |
|
|
|
|
|
def forward(self, x, x_mask, g=None, reverse=False): |
|
|
x0, x1 = torch.split(x, [self.half_channels] * 2, 1) |
|
|
h = self.pre(x0) * x_mask |
|
|
h = self.enc(h, x_mask, g=g) |
|
|
stats = self.post(h) * x_mask |
|
|
if not self.mean_only: |
|
|
m, logs = torch.split(stats, [self.half_channels] * 2, 1) |
|
|
else: |
|
|
m = stats |
|
|
logs = torch.zeros_like(m) |
|
|
|
|
|
if not reverse: |
|
|
x1 = m + x1 * torch.exp(logs) * x_mask |
|
|
x = torch.cat([x0, x1], 1) |
|
|
logdet = torch.sum(logs, [1, 2]) |
|
|
return x, logdet |
|
|
else: |
|
|
x1 = (x1 - m) * torch.exp(-logs) * x_mask |
|
|
x = torch.cat([x0, x1], 1) |
|
|
return x |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
self.enc.remove_weight_norm() |
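

# Each coupling layer transforms one half of the channels conditioned on the
# other half, so the transform stays cheaply invertible in both directions.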
|
|
|
|
|
'''], |
|
|
"nsf" : ["nsf.py", ''' |
|
|
import math |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn import functional as F |
|
|
from torch.nn.utils.weight_norm import remove_weight_norm |
|
|
from torch.nn.utils.parametrizations import weight_norm |
|
|
from typing import Optional |
|
|
|
|
|
from .commons import init_weights |
|
|
from .generators import SineGen |
|
|
from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2 |
|
|
|
|
|
|
|
|
class SourceModuleHnNSF(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
sample_rate, |
|
|
harmonic_num=0, |
|
|
sine_amp=0.1, |
|
|
add_noise_std=0.003, |
|
|
voiced_threshod=0, |
|
|
is_half=True, |
|
|
): |
|
|
super(SourceModuleHnNSF, self).__init__() |
|
|
|
|
|
self.sine_amp = sine_amp |
|
|
self.noise_std = add_noise_std |
|
|
self.is_half = is_half |
|
|
|
|
|
self.l_sin_gen = SineGen( |
|
|
sample_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod |
|
|
) |
|
|
self.l_linear = nn.Linear(harmonic_num + 1, 1) |
|
|
self.l_tanh = nn.Tanh() |
|
|
|
|
|
def forward(self, x: torch.Tensor, upsample_factor: int = 1): |
|
|
sine_wavs, uv, _ = self.l_sin_gen(x, upsample_factor) |
|
|
sine_wavs = sine_wavs.to(dtype=self.l_linear.weight.dtype) |
|
|
sine_merge = self.l_tanh(self.l_linear(sine_wavs)) |
|
|
return sine_merge, None, None |
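
    # The sine generator produces an F0-locked (harmonic) excitation; the linear
    # layer plus tanh merges the sine channels into a single source waveform.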
|
|
|
|
|
|
|
|
class GeneratorNSF(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
initial_channel, |
|
|
resblock, |
|
|
resblock_kernel_sizes, |
|
|
resblock_dilation_sizes, |
|
|
upsample_rates, |
|
|
upsample_initial_channel, |
|
|
upsample_kernel_sizes, |
|
|
gin_channels, |
|
|
sr, |
|
|
is_half=False, |
|
|
): |
|
|
super(GeneratorNSF, self).__init__() |
|
|
|
|
|
self.num_kernels = len(resblock_kernel_sizes) |
|
|
self.num_upsamples = len(upsample_rates) |
|
|
self.f0_upsamp = nn.Upsample(scale_factor=math.prod(upsample_rates)) |
|
|
self.m_source = SourceModuleHnNSF(sample_rate=sr, harmonic_num=0, is_half=is_half) |
|
|
|
|
|
self.conv_pre = nn.Conv1d( |
|
|
initial_channel, upsample_initial_channel, 7, 1, padding=3 |
|
|
) |
|
|
resblock_cls = ResBlock1 if resblock == "1" else ResBlock2 |
|
|
|
|
|
self.ups = nn.ModuleList() |
|
|
self.noise_convs = nn.ModuleList() |
|
|
|
|
|
channels = [ |
|
|
upsample_initial_channel // (2 ** (i + 1)) for i in range(len(upsample_rates)) |
|
|
] |
|
|
stride_f0s = [ |
|
|
math.prod(upsample_rates[i + 1 :]) if i + 1 < len(upsample_rates) else 1 |
|
|
for i in range(len(upsample_rates)) |
|
|
] |
|
|
|
|
|
for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): |
|
|
self.ups.append( |
|
|
weight_norm( |
|
|
nn.ConvTranspose1d( |
|
|
upsample_initial_channel // (2**i), |
|
|
channels[i], |
|
|
k, |
|
|
u, |
|
|
padding=(k - u) // 2, |
|
|
) |
|
|
) |
|
|
) |
|
|
|
|
|
self.noise_convs.append( |
|
|
nn.Conv1d( |
|
|
1, |
|
|
channels[i], |
|
|
kernel_size=(stride_f0s[i] * 2 if stride_f0s[i] > 1 else 1), |
|
|
stride=stride_f0s[i], |
|
|
padding=(stride_f0s[i] // 2 if stride_f0s[i] > 1 else 0), |
|
|
) |
|
|
) |
|
|
|
|
|
self.resblocks = nn.ModuleList( |
|
|
[ |
|
|
resblock_cls(channels[i], k, d) |
|
|
for i in range(len(self.ups)) |
|
|
for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes) |
|
|
] |
|
|
) |
|
|
|
|
|
self.conv_post = nn.Conv1d(channels[-1], 1, 7, 1, padding=3, bias=False) |
|
|
self.ups.apply(init_weights) |
|
|
|
|
|
if gin_channels != 0: |
|
|
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) |
|
|
|
|
|
self.upp = math.prod(upsample_rates) |
|
|
self.lrelu_slope = LRELU_SLOPE |
|
|
|
|
|
def forward(self, x, f0, g: Optional[torch.Tensor] = None): |
|
|
har_source, _, _ = self.m_source(f0, self.upp) |
|
|
har_source = har_source.transpose(1, 2) |
|
|
x = self.conv_pre(x) |
|
|
|
|
|
if g is not None: |
|
|
x = x + self.cond(g) |
|
|
|
|
|
for i, (ups, noise_convs) in enumerate(zip(self.ups, self.noise_convs)): |
|
|
x = F.leaky_relu(x, self.lrelu_slope) |
|
|
x = ups(x) |
|
|
x = x + noise_convs(har_source) |
|
|
|
|
|
            # Average the parallel residual blocks attached to this upsampling stage
            xs = sum(
                self.resblocks[j](x)
                for j in range(i * self.num_kernels, (i + 1) * self.num_kernels)
            )
            x = xs / self.num_kernels
|
|
|
|
|
x = F.leaky_relu(x) |
|
|
x = torch.tanh(self.conv_post(x)) |
|
|
return x |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
for l in self.ups: |
|
|
remove_weight_norm(l) |
|
|
for l in self.resblocks: |
|
|
l.remove_weight_norm() |
|
|
|
|
|
def __prepare_scriptable__(self): |
|
|
for l in self.ups: |
|
|
for hook in l._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(l) |
|
|
for l in self.resblocks: |
|
|
for hook in l._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(l) |
|
|
return self |
|
|
|
|
|
'''], |
|
|
"normalization" : ["normalization.py", ''' |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn import functional as F |
|
|
|
|
|
|
|
|
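# Channel-first LayerNorm: transposes (B, C, T) so channels sit last, applies |
# F.layer_norm over the channel dimension, then transposes back. |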
class LayerNorm(nn.Module): |
|
|
def __init__(self, channels, eps=1e-5): |
|
|
super().__init__() |
|
|
self.eps = eps |
|
|
self.gamma = nn.Parameter(torch.ones(channels)) |
|
|
self.beta = nn.Parameter(torch.zeros(channels)) |
|
|
|
|
|
def forward(self, x): |
|
|
x = x.transpose(1, -1) |
|
|
x = F.layer_norm(x, (x.size(-1),), self.gamma, self.beta, self.eps) |
|
|
return x.transpose(1, -1) |
|
|
'''], |
|
|
"modules" : ["modules.py", ''' |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn.utils.weight_norm import remove_weight_norm |
|
|
from torch.nn.utils.parametrizations import weight_norm |
|
|
|
|
|
from .commons import fused_add_tanh_sigmoid_multiply |
|
|
|
|
|
|
|
|
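# Non-causal WaveNet stack with gated tanh/sigmoid activations and fused |
# residual/skip 1x1 convolutions; the optional cond_layer injects the global |
# conditioning g into every layer. Used by PosteriorEncoder in encoders.py. |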
class WaveNet(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
hidden_channels, |
|
|
kernel_size, |
|
|
dilation_rate, |
|
|
n_layers, |
|
|
gin_channels=0, |
|
|
p_dropout=0, |
|
|
): |
|
|
super(WaveNet, self).__init__() |
|
|
assert kernel_size % 2 == 1 |
|
|
self.hidden_channels = hidden_channels |
|
|
self.kernel_size = (kernel_size,) |
|
|
self.dilation_rate = dilation_rate |
|
|
self.n_layers = n_layers |
|
|
self.gin_channels = gin_channels |
|
|
self.p_dropout = p_dropout |
|
|
|
|
|
self.in_layers = nn.ModuleList() |
|
|
self.res_skip_layers = nn.ModuleList() |
|
|
self.drop = nn.Dropout(p_dropout) |
|
|
|
|
|
if gin_channels != 0: |
|
|
cond_layer = nn.Conv1d(gin_channels, 2 * hidden_channels * n_layers, 1) |
|
|
self.cond_layer = weight_norm(cond_layer, name="weight") |
|
|
|
|
|
dilations = [dilation_rate**i for i in range(n_layers)] |
|
|
paddings = [(kernel_size * d - d) // 2 for d in dilations] |
|
|
|
|
|
for i in range(n_layers): |
|
|
in_layer = nn.Conv1d( |
|
|
hidden_channels, |
|
|
2 * hidden_channels, |
|
|
kernel_size, |
|
|
dilation=dilations[i], |
|
|
padding=paddings[i], |
|
|
) |
|
|
in_layer = weight_norm(in_layer, name="weight") |
|
|
self.in_layers.append(in_layer) |
|
|
|
|
|
res_skip_channels = ( |
|
|
hidden_channels if i == n_layers - 1 else 2 * hidden_channels |
|
|
) |
|
|
|
|
|
res_skip_layer = nn.Conv1d(hidden_channels, res_skip_channels, 1) |
|
|
res_skip_layer = weight_norm(res_skip_layer, name="weight") |
|
|
self.res_skip_layers.append(res_skip_layer) |
|
|
|
|
|
def forward(self, x, x_mask, g=None, **kwargs): |
|
|
output = torch.zeros_like(x) |
|
|
n_channels_tensor = torch.IntTensor([self.hidden_channels]) |
|
|
|
|
|
if g is not None: |
|
|
g = self.cond_layer(g) |
|
|
|
|
|
for i in range(self.n_layers): |
|
|
x_in = self.in_layers[i](x) |
|
|
if g is not None: |
|
|
cond_offset = i * 2 * self.hidden_channels |
|
|
g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :] |
|
|
else: |
|
|
g_l = torch.zeros_like(x_in) |
|
|
|
|
|
acts = fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor) |
|
|
|
|
|
acts = self.drop(acts) |
|
|
|
|
|
res_skip_acts = self.res_skip_layers[i](acts) |
|
|
if i < self.n_layers - 1: |
|
|
res_acts = res_skip_acts[:, : self.hidden_channels, :] |
|
|
x = (x + res_acts) * x_mask |
|
|
output = output + res_skip_acts[:, self.hidden_channels :, :] |
|
|
else: |
|
|
output = output + res_skip_acts |
|
|
return output * x_mask |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
if self.gin_channels != 0: |
|
|
remove_weight_norm(self.cond_layer) |
|
|
for l in self.in_layers: |
|
|
remove_weight_norm(l) |
|
|
for l in self.res_skip_layers: |
|
|
remove_weight_norm(l) |
|
|
|
|
|
'''], |
|
|
"generators" : ["generators.py", ''' |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn import functional as F |
|
|
from torch.nn.utils.weight_norm import remove_weight_norm |
|
|
from torch.nn.utils.parametrizations import weight_norm |
|
|
from typing import Optional |
|
|
|
|
|
from .commons import init_weights |
|
|
from .residuals import LRELU_SLOPE, ResBlock1, ResBlock2 |
|
|
|
|
|
|
|
|
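# Plain HiFi-GAN generator (no F0 source): alternating transposed-conv |
# upsampling and multi-receptive-field ResBlocks, flattened into a single |
# ModuleList so the forward loop stays TorchScript-friendly. |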
class Generator(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
initial_channel, |
|
|
resblock, |
|
|
resblock_kernel_sizes, |
|
|
resblock_dilation_sizes, |
|
|
upsample_rates, |
|
|
upsample_initial_channel, |
|
|
upsample_kernel_sizes, |
|
|
gin_channels=0, |
|
|
): |
|
|
super(Generator, self).__init__() |
|
|
self.num_kernels = len(resblock_kernel_sizes) |
|
|
self.num_upsamples = len(upsample_rates) |
|
|
self.conv_pre = nn.Conv1d( |
|
|
initial_channel, upsample_initial_channel, 7, 1, padding=3 |
|
|
) |
|
|
resblock = ResBlock1 if resblock == "1" else ResBlock2 |
|
|
|
|
|
self.ups_and_resblocks = nn.ModuleList() |
|
|
for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): |
|
|
self.ups_and_resblocks.append( |
|
|
weight_norm( |
|
|
nn.ConvTranspose1d( |
|
|
upsample_initial_channel // (2**i), |
|
|
upsample_initial_channel // (2 ** (i + 1)), |
|
|
k, |
|
|
u, |
|
|
padding=(k - u) // 2, |
|
|
) |
|
|
) |
|
|
) |
|
|
ch = upsample_initial_channel // (2 ** (i + 1)) |
|
|
for j, (k, d) in enumerate( |
|
|
zip(resblock_kernel_sizes, resblock_dilation_sizes) |
|
|
): |
|
|
self.ups_and_resblocks.append(resblock(ch, k, d)) |
|
|
|
|
|
self.conv_post = nn.Conv1d(ch, 1, 7, 1, padding=3, bias=False) |
|
|
self.ups_and_resblocks.apply(init_weights) |
|
|
|
|
|
if gin_channels != 0: |
|
|
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) |
|
|
|
|
|
def forward(self, x: torch.Tensor, g: Optional[torch.Tensor] = None): |
|
|
x = self.conv_pre(x) |
|
|
if g is not None: |
|
|
x = x + self.cond(g) |
|
|
|
|
|
resblock_idx = 0 |
|
|
for _ in range(self.num_upsamples): |
|
|
x = F.leaky_relu(x, LRELU_SLOPE) |
|
|
x = self.ups_and_resblocks[resblock_idx](x) |
|
|
resblock_idx += 1 |
|
|
xs = 0 |
|
|
for _ in range(self.num_kernels): |
|
|
xs += self.ups_and_resblocks[resblock_idx](x) |
|
|
resblock_idx += 1 |
|
|
x = xs / self.num_kernels |
|
|
|
|
|
x = F.leaky_relu(x) |
|
|
x = self.conv_post(x) |
|
|
x = torch.tanh(x) |
|
|
|
|
|
return x |
|
|
|
|
|
def __prepare_scriptable__(self): |
|
|
for l in self.ups_and_resblocks: |
|
|
for hook in l._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(l) |
|
|
return self |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
for l in self.ups_and_resblocks: |
|
|
remove_weight_norm(l) |
|
|
|
|
|
|
|
|
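# Generates harmonic sine waves from an F0 contour: phase is accumulated per |
# harmonic at frame rate, upsampled by `upp` to sample rate, and unvoiced |
# regions (f0 below voiced_threshold) are replaced with low-level noise. |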
class SineGen(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
samp_rate, |
|
|
harmonic_num=0, |
|
|
sine_amp=0.1, |
|
|
noise_std=0.003, |
|
|
voiced_threshold=0, |
|
|
flag_for_pulse=False, |
|
|
): |
|
|
super(SineGen, self).__init__() |
|
|
self.sine_amp = sine_amp |
|
|
self.noise_std = noise_std |
|
|
self.harmonic_num = harmonic_num |
|
|
self.dim = self.harmonic_num + 1 |
|
|
self.sample_rate = samp_rate |
|
|
self.voiced_threshold = voiced_threshold |
|
|
|
|
|
def _f02uv(self, f0): |
|
|
uv = torch.ones_like(f0) |
|
|
uv = uv * (f0 > self.voiced_threshold) |
|
|
return uv |
|
|
|
|
|
def forward(self, f0: torch.Tensor, upp: int): |
|
|
with torch.no_grad(): |
|
|
f0 = f0[:, None].transpose(1, 2) |
|
|
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device) |
|
|
f0_buf[:, :, 0] = f0[:, :, 0] |
|
|
f0_buf[:, :, 1:] = ( |
|
|
f0_buf[:, :, 0:1] |
|
|
* torch.arange(2, self.harmonic_num + 2, device=f0.device)[None, None, :] |
|
|
) |
|
|
rad_values = (f0_buf / float(self.sample_rate)) % 1 |
|
|
rand_ini = torch.rand(f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device) |
|
|
rand_ini[:, 0] = 0 |
|
|
rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini |
|
|
tmp_over_one = torch.cumsum(rad_values, 1) |
|
|
tmp_over_one *= upp |
|
|
tmp_over_one = F.interpolate( |
|
|
tmp_over_one.transpose(2, 1), |
|
|
scale_factor=float(upp), |
|
|
mode="linear", |
|
|
align_corners=True, |
|
|
).transpose(2, 1) |
|
|
rad_values = F.interpolate( |
|
|
rad_values.transpose(2, 1), scale_factor=float(upp), mode="nearest" |
|
|
).transpose(2, 1) |
|
|
tmp_over_one %= 1 |
|
|
tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0 |
|
|
cumsum_shift = torch.zeros_like(rad_values) |
|
|
cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0 |
|
|
sine_waves = torch.sin( |
|
|
torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * torch.pi |
|
|
) |
|
|
sine_waves = sine_waves * self.sine_amp |
|
|
uv = self._f02uv(f0) |
|
|
uv = F.interpolate( |
|
|
uv.transpose(2, 1), scale_factor=float(upp), mode="nearest" |
|
|
).transpose(2, 1) |
|
|
noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3 |
|
|
noise = noise_amp * torch.randn_like(sine_waves) |
|
|
sine_waves = sine_waves * uv + noise |
|
|
return sine_waves, uv, noise |
|
|
|
|
|
'''], |
|
|
"encoders" : ["encoders.py", ''' |
|
|
import math |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn.utils.weight_norm import remove_weight_norm |
|
|
from typing import Optional |
|
|
|
|
|
from .attentions import FFN, MultiHeadAttention |
|
|
from .commons import sequence_mask |
|
|
from .modules import WaveNet |
|
|
from .normalization import LayerNorm |
|
|
|
|
|
|
|
|
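# Transformer encoder with windowed relative-position attention |
# (MultiHeadAttention) and convolutional FFN blocks; x_mask keeps padded |
# frames from attending or being attended to. |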
class Encoder(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
hidden_channels, |
|
|
filter_channels, |
|
|
n_heads, |
|
|
n_layers, |
|
|
kernel_size=1, |
|
|
p_dropout=0.0, |
|
|
window_size=10, |
|
|
**kwargs |
|
|
): |
|
|
super().__init__() |
|
|
self.hidden_channels = hidden_channels |
|
|
self.filter_channels = filter_channels |
|
|
self.n_heads = n_heads |
|
|
self.n_layers = n_layers |
|
|
self.kernel_size = kernel_size |
|
|
self.p_dropout = p_dropout |
|
|
self.window_size = window_size |
|
|
|
|
|
self.drop = nn.Dropout(p_dropout) |
|
|
self.attn_layers = nn.ModuleList() |
|
|
self.norm_layers_1 = nn.ModuleList() |
|
|
self.ffn_layers = nn.ModuleList() |
|
|
self.norm_layers_2 = nn.ModuleList() |
|
|
for i in range(self.n_layers): |
|
|
self.attn_layers.append( |
|
|
MultiHeadAttention( |
|
|
hidden_channels, |
|
|
hidden_channels, |
|
|
n_heads, |
|
|
p_dropout=p_dropout, |
|
|
window_size=window_size, |
|
|
) |
|
|
) |
|
|
self.norm_layers_1.append(LayerNorm(hidden_channels)) |
|
|
self.ffn_layers.append( |
|
|
FFN( |
|
|
hidden_channels, |
|
|
hidden_channels, |
|
|
filter_channels, |
|
|
kernel_size, |
|
|
p_dropout=p_dropout, |
|
|
) |
|
|
) |
|
|
self.norm_layers_2.append(LayerNorm(hidden_channels)) |
|
|
|
|
|
def forward(self, x, x_mask): |
|
|
attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1) |
|
|
x = x * x_mask |
|
|
for i in range(self.n_layers): |
|
|
y = self.attn_layers[i](x, x, attn_mask) |
|
|
y = self.drop(y) |
|
|
x = self.norm_layers_1[i](x + y) |
|
|
|
|
|
y = self.ffn_layers[i](x, x_mask) |
|
|
y = self.drop(y) |
|
|
x = self.norm_layers_2[i](x + y) |
|
|
x = x * x_mask |
|
|
return x |
|
|
|
|
|
|
|
|
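# Encodes phone (content) features, optionally adding a coarse-pitch |
# embedding, then projects the encoder output to the mean / log-variance |
# of the prior distribution. |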
class TextEncoder(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
out_channels, |
|
|
hidden_channels, |
|
|
filter_channels, |
|
|
n_heads, |
|
|
n_layers, |
|
|
kernel_size, |
|
|
p_dropout, |
|
|
embedding_dim, |
|
|
f0=True, |
|
|
): |
|
|
super(TextEncoder, self).__init__() |
|
|
self.out_channels = out_channels |
|
|
self.hidden_channels = hidden_channels |
|
|
self.filter_channels = filter_channels |
|
|
self.n_heads = n_heads |
|
|
self.n_layers = n_layers |
|
|
self.kernel_size = kernel_size |
|
|
self.p_dropout = float(p_dropout) |
|
|
self.emb_phone = nn.Linear(embedding_dim, hidden_channels) |
|
|
self.lrelu = nn.LeakyReLU(0.1, inplace=True) |
|
|
if f0: |
|
|
self.emb_pitch = nn.Embedding(256, hidden_channels) |
|
|
self.encoder = Encoder( |
|
|
hidden_channels, |
|
|
filter_channels, |
|
|
n_heads, |
|
|
n_layers, |
|
|
kernel_size, |
|
|
float(p_dropout), |
|
|
) |
|
|
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) |
|
|
|
|
|
def forward( |
|
|
self, phone: torch.Tensor, pitch: Optional[torch.Tensor], lengths: torch.Tensor |
|
|
): |
|
|
if pitch is None: |
|
|
x = self.emb_phone(phone) |
|
|
else: |
|
|
x = self.emb_phone(phone) + self.emb_pitch(pitch) |
|
|
x = x * math.sqrt(self.hidden_channels) |
|
|
x = self.lrelu(x) |
|
|
x = torch.transpose(x, 1, -1) |
|
|
x_mask = torch.unsqueeze(sequence_mask(lengths, x.size(2)), 1).to(x.dtype) |
|
|
x = self.encoder(x * x_mask, x_mask) |
|
|
stats = self.proj(x) * x_mask |
|
|
|
|
|
m, logs = torch.split(stats, self.out_channels, dim=1) |
|
|
return m, logs, x_mask |
|
|
|
|
|
|
|
|
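# VITS-style posterior encoder: a 1x1 pre-net, a WaveNet stack, and a |
# projection to (m, logs); returns a reparameterized sample z alongside them. |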
class PosteriorEncoder(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
in_channels, |
|
|
out_channels, |
|
|
hidden_channels, |
|
|
kernel_size, |
|
|
dilation_rate, |
|
|
n_layers, |
|
|
gin_channels=0, |
|
|
): |
|
|
super(PosteriorEncoder, self).__init__() |
|
|
self.in_channels = in_channels |
|
|
self.out_channels = out_channels |
|
|
self.hidden_channels = hidden_channels |
|
|
self.kernel_size = kernel_size |
|
|
self.dilation_rate = dilation_rate |
|
|
self.n_layers = n_layers |
|
|
self.gin_channels = gin_channels |
|
|
|
|
|
self.pre = nn.Conv1d(in_channels, hidden_channels, 1) |
|
|
self.enc = WaveNet( |
|
|
hidden_channels, |
|
|
kernel_size, |
|
|
dilation_rate, |
|
|
n_layers, |
|
|
gin_channels=gin_channels, |
|
|
) |
|
|
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) |
|
|
|
|
|
def forward( |
|
|
self, x: torch.Tensor, x_lengths: torch.Tensor, g: Optional[torch.Tensor] = None |
|
|
): |
|
|
x_mask = torch.unsqueeze(sequence_mask(x_lengths, x.size(2)), 1).to(x.dtype) |
|
|
x = self.pre(x) * x_mask |
|
|
x = self.enc(x, x_mask, g=g) |
|
|
stats = self.proj(x) * x_mask |
|
|
m, logs = torch.split(stats, self.out_channels, dim=1) |
|
|
z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask |
|
|
return z, m, logs, x_mask |
|
|
|
|
|
def remove_weight_norm(self): |
|
|
self.enc.remove_weight_norm() |
|
|
|
|
|
def __prepare_scriptable__(self): |
|
|
for hook in self.enc._forward_pre_hooks.values(): |
|
|
if ( |
|
|
hook.__module__ == "torch.nn.utils.parametrizations.weight_norm" |
|
|
and hook.__class__.__name__ == "_WeightNorm" |
|
|
): |
|
|
remove_weight_norm(self.enc) |
|
|
return self |
|
|
|
|
|
'''], |
|
|
"discriminators" : ["discriminators.py", ''' |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn import functional as F |
|
|
from torch.nn.utils.parametrizations import spectral_norm, weight_norm |
|
|
|
|
|
from .commons import get_padding |
|
|
from .residuals import LRELU_SLOPE |
|
|
|
|
|
|
|
|
PERIODS_V1 = [2, 3, 5, 7, 11, 17] |
|
|
PERIODS_V2 = [2, 3, 5, 7, 11, 17, 23, 37] |
|
|
IN_CHANNELS = [1, 32, 128, 512, 1024] |
|
|
OUT_CHANNELS = [32, 128, 512, 1024, 1024] |
|
|
|
|
|
|
|
|
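# Ensemble of one scale discriminator (DiscriminatorS) plus one period |
# discriminator per prime period; the V1 and V2 variants differ only in |
# their period lists. |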
class MultiPeriodDiscriminator(nn.Module): |
|
|
def __init__(self, use_spectral_norm=False): |
|
|
super(MultiPeriodDiscriminator, self).__init__() |
|
|
self.discriminators = nn.ModuleList( |
|
|
[DiscriminatorS(use_spectral_norm=use_spectral_norm)] |
|
|
+ [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V1] |
|
|
) |
|
|
|
|
|
def forward(self, y, y_hat): |
|
|
y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], [] |
|
|
for d in self.discriminators: |
|
|
y_d_r, fmap_r = d(y) |
|
|
y_d_g, fmap_g = d(y_hat) |
|
|
y_d_rs.append(y_d_r) |
|
|
y_d_gs.append(y_d_g) |
|
|
fmap_rs.append(fmap_r) |
|
|
fmap_gs.append(fmap_g) |
|
|
|
|
|
return y_d_rs, y_d_gs, fmap_rs, fmap_gs |
|
|
|
|
|
|
|
|
class MultiPeriodDiscriminatorV2(nn.Module): |
|
|
def __init__(self, use_spectral_norm=False): |
|
|
super(MultiPeriodDiscriminatorV2, self).__init__() |
|
|
self.discriminators = nn.ModuleList( |
|
|
[DiscriminatorS(use_spectral_norm=use_spectral_norm)] |
|
|
+ [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in PERIODS_V2] |
|
|
) |
|
|
|
|
|
def forward(self, y, y_hat): |
|
|
y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], [] |
|
|
for d in self.discriminators: |
|
|
y_d_r, fmap_r = d(y) |
|
|
y_d_g, fmap_g = d(y_hat) |
|
|
y_d_rs.append(y_d_r) |
|
|
y_d_gs.append(y_d_g) |
|
|
fmap_rs.append(fmap_r) |
|
|
fmap_gs.append(fmap_g) |
|
|
|
|
|
return y_d_rs, y_d_gs, fmap_rs, fmap_gs |
|
|
|
|
|
|
|
|
class DiscriminatorS(nn.Module): |
|
|
def __init__(self, use_spectral_norm=False): |
|
|
super(DiscriminatorS, self).__init__() |
|
|
norm_f = spectral_norm if use_spectral_norm else weight_norm |
|
|
self.convs = nn.ModuleList( |
|
|
[ |
|
|
norm_f(nn.Conv1d(1, 16, 15, 1, padding=7)), |
|
|
norm_f(nn.Conv1d(16, 64, 41, 4, groups=4, padding=20)), |
|
|
norm_f(nn.Conv1d(64, 256, 41, 4, groups=16, padding=20)), |
|
|
norm_f(nn.Conv1d(256, 1024, 41, 4, groups=64, padding=20)), |
|
|
norm_f(nn.Conv1d(1024, 1024, 41, 4, groups=256, padding=20)), |
|
|
norm_f(nn.Conv1d(1024, 1024, 5, 1, padding=2)), |
|
|
] |
|
|
) |
|
|
self.conv_post = norm_f(nn.Conv1d(1024, 1, 3, 1, padding=1)) |
|
|
self.lrelu = nn.LeakyReLU(LRELU_SLOPE) |
|
|
|
|
|
def forward(self, x): |
|
|
fmap = [] |
|
|
for conv in self.convs: |
|
|
x = self.lrelu(conv(x)) |
|
|
fmap.append(x) |
|
|
x = self.conv_post(x) |
|
|
fmap.append(x) |
|
|
x = torch.flatten(x, 1, -1) |
|
|
return x, fmap |
|
|
|
|
|
|
|
|
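# Period discriminator: pads the waveform to a multiple of `period`, folds it |
# into a 2D (time // period, period) grid and applies strided 2D convolutions, |
# returning logits plus per-layer feature maps for the feature-matching loss. |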
class DiscriminatorP(nn.Module): |
|
|
def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False): |
|
|
super(DiscriminatorP, self).__init__() |
|
|
self.period = period |
|
|
norm_f = spectral_norm if use_spectral_norm else weight_norm |
|
|
|
|
|
self.convs = nn.ModuleList( |
|
|
[ |
|
|
norm_f( |
|
|
nn.Conv2d( |
|
|
in_ch, |
|
|
out_ch, |
|
|
(kernel_size, 1), |
|
|
(stride, 1), |
|
|
padding=(get_padding(kernel_size, 1), 0), |
|
|
) |
|
|
) |
|
|
for in_ch, out_ch in zip(IN_CHANNELS, OUT_CHANNELS) |
|
|
] |
|
|
) |
|
|
|
|
|
self.conv_post = norm_f(nn.Conv2d(1024, 1, (3, 1), 1, padding=(1, 0))) |
|
|
self.lrelu = nn.LeakyReLU(LRELU_SLOPE) |
|
|
|
|
|
def forward(self, x): |
|
|
fmap = [] |
|
|
b, c, t = x.shape |
|
|
if t % self.period != 0: |
|
|
n_pad = self.period - (t % self.period) |
|
|
x = F.pad(x, (0, n_pad), "reflect") |
|
|
x = x.view(b, c, -1, self.period) |
|
|
|
|
|
for conv in self.convs: |
|
|
x = self.lrelu(conv(x)) |
|
|
fmap.append(x) |
|
|
|
|
|
x = self.conv_post(x) |
|
|
fmap.append(x) |
|
|
x = torch.flatten(x, 1, -1) |
|
|
return x, fmap |
|
|
|
|
|
'''], |
|
|
"commons" : ["commons.py", ''' |
|
|
import math |
|
|
import torch |
|
|
from torch.nn import functional as F |
|
|
from typing import List, Optional |
|
|
|
|
|
|
|
|
def init_weights(m, mean=0.0, std=0.01): |
|
|
classname = m.__class__.__name__ |
|
|
if classname.find("Conv") != -1: |
|
|
m.weight.data.normal_(mean, std) |
|
|
|
|
|
|
|
|
def get_padding(kernel_size, dilation=1): |
|
|
return int((kernel_size * dilation - dilation) / 2) |
|
|
|
|
|
|
|
|
def convert_pad_shape(pad_shape): |
|
|
l = pad_shape[::-1] |
|
|
pad_shape = [item for sublist in l for item in sublist] |
|
|
return pad_shape |
|
|
|
|
|
|
|
|
def kl_divergence(m_p, logs_p, m_q, logs_q): |
|
|
kl = (logs_q - logs_p) - 0.5 |
|
|
kl += 0.5 * (torch.exp(2.0 * logs_p) + ((m_p - m_q) ** 2)) * torch.exp(-2.0 * logs_q) |
|
|
return kl |
|
|
|
|
|
|
|
|
def slice_segments( |
|
|
x: torch.Tensor, ids_str: torch.Tensor, segment_size: int = 4, dim: int = 2 |
|
|
): |
|
|
if dim == 2: |
|
|
ret = torch.zeros_like(x[:, :segment_size]) |
|
|
elif dim == 3: |
|
|
ret = torch.zeros_like(x[:, :, :segment_size]) |
|
|
|
|
|
for i in range(x.size(0)): |
|
|
idx_str = ids_str[i].item() |
|
|
idx_end = idx_str + segment_size |
|
|
if dim == 2: |
|
|
ret[i] = x[i, idx_str:idx_end] |
|
|
else: |
|
|
ret[i] = x[i, :, idx_str:idx_end] |
|
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
def rand_slice_segments(x, x_lengths=None, segment_size=4): |
|
|
b, d, t = x.size() |
|
|
if x_lengths is None: |
|
|
x_lengths = t |
|
|
ids_str_max = x_lengths - segment_size + 1 |
|
|
ids_str = (torch.rand([b]).to(device=x.device) * ids_str_max).to(dtype=torch.long) |
|
|
ret = slice_segments(x, ids_str, segment_size, dim=3) |
|
|
return ret, ids_str |
|
|
|
|
|
|
|
|
def get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4): |
|
|
position = torch.arange(length, dtype=torch.float) |
|
|
num_timescales = channels // 2 |
|
|
log_timescale_increment = math.log(float(max_timescale) / float(min_timescale)) / ( |
|
|
num_timescales - 1 |
|
|
) |
|
|
inv_timescales = min_timescale * torch.exp( |
|
|
torch.arange(num_timescales, dtype=torch.float) * -log_timescale_increment |
|
|
) |
|
|
scaled_time = position.unsqueeze(0) * inv_timescales.unsqueeze(1) |
|
|
signal = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], 0) |
|
|
signal = F.pad(signal, [0, 0, 0, channels % 2]) |
|
|
signal = signal.view(1, channels, length) |
|
|
return signal |
|
|
|
|
|
|
|
|
def subsequent_mask(length): |
|
|
mask = torch.tril(torch.ones(length, length)).unsqueeze(0).unsqueeze(0) |
|
|
return mask |
|
|
|
|
|
|
|
|
@torch.jit.script |
|
|
def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels): |
|
|
n_channels_int = n_channels[0] |
|
|
in_act = input_a + input_b |
|
|
t_act = torch.tanh(in_act[:, :n_channels_int, :]) |
|
|
s_act = torch.sigmoid(in_act[:, n_channels_int:, :]) |
|
|
acts = t_act * s_act |
|
|
return acts |
|
|
|
|
|
|
|
|
def sequence_mask(length: torch.Tensor, max_length: Optional[int] = None): |
|
|
if max_length is None: |
|
|
max_length = length.max() |
|
|
x = torch.arange(max_length, dtype=length.dtype, device=length.device) |
|
|
return x.unsqueeze(0) < length.unsqueeze(1) |
|
|
|
|
|
|
|
|
def clip_grad_value(parameters, clip_value, norm_type=2): |
|
|
if isinstance(parameters, torch.Tensor): |
|
|
parameters = [parameters] |
|
|
    parameters = list(filter(lambda p: p.grad is not None, parameters))  # builtin list; typing.List is not callable |
|
|
norm_type = float(norm_type) |
|
|
if clip_value is not None: |
|
|
clip_value = float(clip_value) |
|
|
|
|
|
total_norm = 0 |
|
|
for p in parameters: |
|
|
param_norm = p.grad.data.norm(norm_type) |
|
|
total_norm += param_norm.item() ** norm_type |
|
|
if clip_value is not None: |
|
|
p.grad.data.clamp_(min=-clip_value, max=clip_value) |
|
|
total_norm = total_norm ** (1.0 / norm_type) |
|
|
return total_norm |
|
|
|
|
|
'''], |
|
|
"attentions" : ["attentions.py", ''' |
|
|
import math |
|
|
import torch |
|
|
from torch import nn |
|
|
from torch.nn import functional as F |
|
|
|
|
|
from .commons import convert_pad_shape |
|
|
|
|
|
|
|
|
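# Multi-head attention with optional relative-position embeddings inside a |
# window (emb_rel_k / emb_rel_v), optional proximal bias, and optional |
# block-local masking; all three extras require self-attention (t_s == t_t). |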
class MultiHeadAttention(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
channels, |
|
|
out_channels, |
|
|
n_heads, |
|
|
p_dropout=0.0, |
|
|
window_size=None, |
|
|
heads_share=True, |
|
|
block_length=None, |
|
|
proximal_bias=False, |
|
|
proximal_init=False, |
|
|
): |
|
|
super().__init__() |
|
|
assert channels % n_heads == 0 |
|
|
|
|
|
self.channels = channels |
|
|
self.out_channels = out_channels |
|
|
self.n_heads = n_heads |
|
|
self.p_dropout = p_dropout |
|
|
self.window_size = window_size |
|
|
self.heads_share = heads_share |
|
|
self.block_length = block_length |
|
|
self.proximal_bias = proximal_bias |
|
|
self.proximal_init = proximal_init |
|
|
self.attn = None |
|
|
|
|
|
self.k_channels = channels // n_heads |
|
|
self.conv_q = nn.Conv1d(channels, channels, 1) |
|
|
self.conv_k = nn.Conv1d(channels, channels, 1) |
|
|
self.conv_v = nn.Conv1d(channels, channels, 1) |
|
|
self.conv_o = nn.Conv1d(channels, out_channels, 1) |
|
|
self.drop = nn.Dropout(p_dropout) |
|
|
|
|
|
if window_size is not None: |
|
|
n_heads_rel = 1 if heads_share else n_heads |
|
|
rel_stddev = self.k_channels**-0.5 |
|
|
self.emb_rel_k = nn.Parameter( |
|
|
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels) |
|
|
* rel_stddev |
|
|
) |
|
|
self.emb_rel_v = nn.Parameter( |
|
|
torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels) |
|
|
* rel_stddev |
|
|
) |
|
|
|
|
|
nn.init.xavier_uniform_(self.conv_q.weight) |
|
|
nn.init.xavier_uniform_(self.conv_k.weight) |
|
|
nn.init.xavier_uniform_(self.conv_v.weight) |
|
|
if proximal_init: |
|
|
with torch.no_grad(): |
|
|
self.conv_k.weight.copy_(self.conv_q.weight) |
|
|
self.conv_k.bias.copy_(self.conv_q.bias) |
|
|
|
|
|
def forward(self, x, c, attn_mask=None): |
|
|
q = self.conv_q(x) |
|
|
k = self.conv_k(c) |
|
|
v = self.conv_v(c) |
|
|
|
|
|
x, self.attn = self.attention(q, k, v, mask=attn_mask) |
|
|
|
|
|
x = self.conv_o(x) |
|
|
return x |
|
|
|
|
|
def attention(self, query, key, value, mask=None): |
|
|
b, d, t_s, t_t = (*key.size(), query.size(2)) |
|
|
query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3) |
|
|
key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3) |
|
|
value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3) |
|
|
|
|
|
scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1)) |
|
|
if self.window_size is not None: |
|
|
assert t_s == t_t, "Relative attention is only available for self-attention." |
|
|
key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s) |
|
|
rel_logits = self._matmul_with_relative_keys( |
|
|
query / math.sqrt(self.k_channels), key_relative_embeddings |
|
|
) |
|
|
scores_local = self._relative_position_to_absolute_position(rel_logits) |
|
|
scores = scores + scores_local |
|
|
if self.proximal_bias: |
|
|
assert t_s == t_t, "Proximal bias is only available for self-attention." |
|
|
scores = scores + self._attention_bias_proximal(t_s).to( |
|
|
device=scores.device, dtype=scores.dtype |
|
|
) |
|
|
if mask is not None: |
|
|
scores = scores.masked_fill(mask == 0, -1e4) |
|
|
if self.block_length is not None: |
|
|
assert t_s == t_t, "Local attention is only available for self-attention." |
|
|
block_mask = ( |
|
|
torch.ones_like(scores) |
|
|
.triu(-self.block_length) |
|
|
.tril(self.block_length) |
|
|
) |
|
|
scores = scores.masked_fill(block_mask == 0, -1e4) |
|
|
p_attn = F.softmax(scores, dim=-1) |
|
|
p_attn = self.drop(p_attn) |
|
|
output = torch.matmul(p_attn, value) |
|
|
if self.window_size is not None: |
|
|
relative_weights = self._absolute_position_to_relative_position(p_attn) |
|
|
value_relative_embeddings = self._get_relative_embeddings(self.emb_rel_v, t_s) |
|
|
output = output + self._matmul_with_relative_values( |
|
|
relative_weights, value_relative_embeddings |
|
|
) |
|
|
output = output.transpose(2, 3).contiguous().view(b, d, t_t) |
|
|
return output, p_attn |
|
|
|
|
|
def _matmul_with_relative_values(self, x, y): |
|
|
ret = torch.matmul(x, y.unsqueeze(0)) |
|
|
return ret |
|
|
|
|
|
def _matmul_with_relative_keys(self, x, y): |
|
|
ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1)) |
|
|
return ret |
|
|
|
|
|
def _get_relative_embeddings(self, relative_embeddings, length): |
|
|
pad_length = max(length - (self.window_size + 1), 0) |
|
|
slice_start_position = max((self.window_size + 1) - length, 0) |
|
|
slice_end_position = slice_start_position + 2 * length - 1 |
|
|
if pad_length > 0: |
|
|
padded_relative_embeddings = F.pad( |
|
|
relative_embeddings, |
|
|
convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]), |
|
|
) |
|
|
else: |
|
|
padded_relative_embeddings = relative_embeddings |
|
|
used_relative_embeddings = padded_relative_embeddings[ |
|
|
:, slice_start_position:slice_end_position |
|
|
] |
|
|
return used_relative_embeddings |
|
|
|
|
|
def _relative_position_to_absolute_position(self, x): |
|
|
batch, heads, length, _ = x.size() |
|
|
|
|
|
x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]])) |
|
|
|
|
|
x_flat = x.view([batch, heads, length * 2 * length]) |
|
|
x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [0, length - 1]])) |
|
|
|
|
|
x_final = x_flat.view([batch, heads, length + 1, 2 * length - 1])[ |
|
|
:, :, :length, length - 1 : |
|
|
] |
|
|
return x_final |
|
|
|
|
|
def _absolute_position_to_relative_position(self, x): |
|
|
batch, heads, length, _ = x.size() |
|
|
x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, length - 1]])) |
|
|
x_flat = x.view([batch, heads, length**2 + length * (length - 1)]) |
|
|
x_flat = F.pad(x_flat, convert_pad_shape([[0, 0], [0, 0], [length, 0]])) |
|
|
x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:] |
|
|
return x_final |
|
|
|
|
|
def _attention_bias_proximal(self, length): |
|
|
r = torch.arange(length, dtype=torch.float32) |
|
|
diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1) |
|
|
return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0) |
|
|
|
|
|
|
|
|
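# Position-wise feed-forward block built from two 1D convolutions with same |
# or causal padding; "gelu" here is the fast sigmoid-based approximation. |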
class FFN(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
in_channels, |
|
|
out_channels, |
|
|
filter_channels, |
|
|
kernel_size, |
|
|
p_dropout=0.0, |
|
|
activation=None, |
|
|
causal=False, |
|
|
): |
|
|
super().__init__() |
|
|
self.in_channels = in_channels |
|
|
self.out_channels = out_channels |
|
|
self.filter_channels = filter_channels |
|
|
self.kernel_size = kernel_size |
|
|
self.p_dropout = p_dropout |
|
|
self.activation = activation |
|
|
self.causal = causal |
|
|
|
|
|
if causal: |
|
|
self.padding = self._causal_padding |
|
|
else: |
|
|
self.padding = self._same_padding |
|
|
|
|
|
self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size) |
|
|
self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size) |
|
|
self.drop = nn.Dropout(p_dropout) |
|
|
|
|
|
def forward(self, x, x_mask): |
|
|
x = self.conv_1(self.padding(x * x_mask)) |
|
|
if self.activation == "gelu": |
|
|
x = x * torch.sigmoid(1.702 * x) |
|
|
else: |
|
|
x = torch.relu(x) |
|
|
x = self.drop(x) |
|
|
x = self.conv_2(self.padding(x * x_mask)) |
|
|
return x * x_mask |
|
|
|
|
|
def _causal_padding(self, x): |
|
|
if self.kernel_size == 1: |
|
|
return x |
|
|
pad_l = self.kernel_size - 1 |
|
|
pad_r = 0 |
|
|
padding = [[0, 0], [0, 0], [pad_l, pad_r]] |
|
|
x = F.pad(x, convert_pad_shape(padding)) |
|
|
return x |
|
|
|
|
|
def _same_padding(self, x): |
|
|
if self.kernel_size == 1: |
|
|
return x |
|
|
pad_l = (self.kernel_size - 1) // 2 |
|
|
pad_r = self.kernel_size // 2 |
|
|
padding = [[0, 0], [0, 0], [pad_l, pad_r]] |
|
|
x = F.pad(x, convert_pad_shape(padding)) |
|
|
return x |
|
|
|
|
|
'''], |
|
|
"init" : ["__init__.py", ''' |
|
|
'''] |
|
|
} |
|
|
|
|
|
# Write every embedded module in lib_algorithm out to vbach/lib/algorithm. |
for filename, source in lib_algorithm.values(): |
    with open(os.sep.join([current_dir, dirs[5], filename]), 'w') as f: |
        f.write(source) |
|
|
|
|
|
RMVPE = ''' |
|
|
import torch |
|
|
import numpy as np |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
from librosa.filters import mel |
|
|
from scipy.signal import get_window |
|
|
from librosa.util import pad_center, tiny, normalize |
|
|
|
|
|
|
|
|
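# Sum of squared, hop-shifted analysis windows, used by STFT.inverse to |
# normalize the overlap-add (mirrors librosa's window_sumsquare). |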
def window_sumsquare( |
|
|
window, |
|
|
n_frames, |
|
|
hop_length=200, |
|
|
win_length=800, |
|
|
n_fft=800, |
|
|
dtype=np.float32, |
|
|
norm=None, |
|
|
): |
|
|
if win_length is None: |
|
|
win_length = n_fft |
|
|
|
|
|
n = n_fft + hop_length * (n_frames - 1) |
|
|
x = np.zeros(n, dtype=dtype) |
|
|
|
|
|
win_sq = get_window(window, win_length, fftbins=True) |
|
|
win_sq = normalize(win_sq, norm=norm) ** 2 |
|
|
    win_sq = pad_center(win_sq, size=n_fft)  # keyword form required by newer librosa, matching the call in STFT below |
|
|
|
|
|
for i in range(n_frames): |
|
|
sample = i * hop_length |
|
|
x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))] |
|
|
return x |
|
|
|
|
|
|
|
|
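# Conv1d-based STFT: the DFT basis is baked into fixed convolution weights, |
# so transform/inverse avoid torch.stft entirely (useful on backends where |
# torch.stft is unavailable). transform returns magnitude, optionally phase. |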
class STFT(nn.Module): |
|
|
def __init__( |
|
|
self, filter_length=1024, hop_length=512, win_length=None, window="hann" |
|
|
): |
|
|
super(STFT, self).__init__() |
|
|
self.filter_length = filter_length |
|
|
self.hop_length = hop_length |
|
|
self.win_length = win_length if win_length else filter_length |
|
|
self.window = window |
|
|
self.pad_amount = int(self.filter_length / 2) |
|
|
scale = self.filter_length / self.hop_length |
|
|
fourier_basis = np.fft.fft(np.eye(self.filter_length)) |
|
|
|
|
|
cutoff = int((self.filter_length / 2 + 1)) |
|
|
fourier_basis = np.vstack( |
|
|
[np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])] |
|
|
) |
|
|
forward_basis = torch.FloatTensor(fourier_basis[:, None, :]) |
|
|
inverse_basis = torch.FloatTensor( |
|
|
np.linalg.pinv(scale * fourier_basis).T[:, None, :] |
|
|
) |
|
|
|
|
|
assert filter_length >= self.win_length |
|
|
fft_window = get_window(window, self.win_length, fftbins=True) |
|
|
fft_window = pad_center(fft_window, size=filter_length) |
|
|
fft_window = torch.from_numpy(fft_window).float() |
|
|
|
|
|
forward_basis *= fft_window |
|
|
inverse_basis *= fft_window |
|
|
|
|
|
self.register_buffer("forward_basis", forward_basis.float()) |
|
|
self.register_buffer("inverse_basis", inverse_basis.float()) |
|
|
|
|
|
    def transform(self, input_data, return_phase=False): |
|
|
num_batches = input_data.shape[0] |
|
|
        num_samples = input_data.shape[-1] |
        self.num_samples = num_samples  # remembered so inverse() can trim its output |
|
|
|
|
|
input_data = input_data.view(num_batches, 1, num_samples) |
|
|
input_data = F.pad( |
|
|
input_data.unsqueeze(1), |
|
|
(self.pad_amount, self.pad_amount, 0, 0, 0, 0), |
|
|
mode="reflect", |
|
|
).squeeze(1) |
|
|
forward_transform = F.conv1d( |
|
|
input_data, self.forward_basis, stride=self.hop_length, padding=0 |
|
|
) |
|
|
|
|
|
cutoff = int((self.filter_length / 2) + 1) |
|
|
        real_part = forward_transform[:, :cutoff, :] |
        imag_part = forward_transform[:, cutoff:, :] |
        magnitude = torch.sqrt(real_part**2 + imag_part**2) |
        if return_phase: |
            phase = torch.atan2(imag_part.data, real_part.data) |
            return magnitude, phase |
        return magnitude |
|
|
|
|
|
def inverse(self, magnitude, phase): |
|
|
recombine_magnitude_phase = torch.cat( |
|
|
[magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1 |
|
|
) |
|
|
inverse_transform = F.conv_transpose1d( |
|
|
recombine_magnitude_phase, |
|
|
self.inverse_basis, |
|
|
stride=self.hop_length, |
|
|
padding=0, |
|
|
) |
|
|
|
|
|
if self.window is not None: |
|
|
window_sum = window_sumsquare( |
|
|
self.window, |
|
|
magnitude.size(-1), |
|
|
hop_length=self.hop_length, |
|
|
win_length=self.win_length, |
|
|
n_fft=self.filter_length, |
|
|
dtype=np.float32, |
|
|
) |
|
|
approx_nonzero_indices = torch.from_numpy( |
|
|
np.where(window_sum > tiny(window_sum))[0] |
|
|
) |
|
|
window_sum = torch.from_numpy(window_sum).to(inverse_transform.device) |
|
|
inverse_transform[:, :, approx_nonzero_indices] /= window_sum[ |
|
|
approx_nonzero_indices |
|
|
] |
|
|
inverse_transform *= float(self.filter_length) / self.hop_length |
|
|
|
|
|
inverse_transform = inverse_transform[..., self.pad_amount :] |
|
|
inverse_transform = inverse_transform[..., : self.num_samples] |
|
|
return inverse_transform.squeeze(1) |
|
|
|
|
|
def forward(self, input_data): |
|
|
        self.magnitude, self.phase = self.transform(input_data, return_phase=True) |
|
|
return self.inverse(self.magnitude, self.phase) |
|
|
|
|
|
|
|
|
class BiGRU(nn.Module): |
|
|
def __init__(self, input_features, hidden_features, num_layers): |
|
|
super(BiGRU, self).__init__() |
|
|
self.gru = nn.GRU( |
|
|
input_features, |
|
|
hidden_features, |
|
|
num_layers=num_layers, |
|
|
batch_first=True, |
|
|
bidirectional=True, |
|
|
) |
|
|
|
|
|
def forward(self, x): |
|
|
return self.gru(x)[0] |
|
|
|
|
|
|
|
|
class ConvBlockRes(nn.Module): |
|
|
def __init__(self, in_channels, out_channels, momentum=0.01): |
|
|
super(ConvBlockRes, self).__init__() |
|
|
self.conv = nn.Sequential( |
|
|
nn.Conv2d( |
|
|
in_channels=in_channels, |
|
|
out_channels=out_channels, |
|
|
kernel_size=(3, 3), |
|
|
stride=(1, 1), |
|
|
padding=(1, 1), |
|
|
bias=False, |
|
|
), |
|
|
nn.BatchNorm2d(out_channels, momentum=momentum), |
|
|
nn.ReLU(), |
|
|
nn.Conv2d( |
|
|
in_channels=out_channels, |
|
|
out_channels=out_channels, |
|
|
kernel_size=(3, 3), |
|
|
stride=(1, 1), |
|
|
padding=(1, 1), |
|
|
bias=False, |
|
|
), |
|
|
nn.BatchNorm2d(out_channels, momentum=momentum), |
|
|
nn.ReLU(), |
|
|
) |
|
|
self.shortcut = ( |
|
|
nn.Conv2d(in_channels, out_channels, (1, 1)) |
|
|
if in_channels != out_channels |
|
|
else None |
|
|
) |
|
|
|
|
|
def forward(self, x): |
|
|
out = self.conv(x) |
|
|
if self.shortcut is not None: |
|
|
x = self.shortcut(x) |
|
|
return out + x |
|
|
|
|
|
|
|
|
class ResEncoderBlock(nn.Module): |
|
|
def __init__(self, in_channels, out_channels, kernel_size, n_blocks=1, momentum=0.01): |
|
|
super(ResEncoderBlock, self).__init__() |
|
|
self.conv = nn.ModuleList( |
|
|
[ |
|
|
ConvBlockRes( |
|
|
in_channels if i == 0 else out_channels, out_channels, momentum |
|
|
) |
|
|
for i in range(n_blocks) |
|
|
] |
|
|
) |
|
|
self.pool = ( |
|
|
nn.AvgPool2d(kernel_size=kernel_size) if kernel_size is not None else None |
|
|
) |
|
|
|
|
|
def forward(self, x): |
|
|
for conv in self.conv: |
|
|
x = conv(x) |
|
|
pooled = self.pool(x) if self.pool is not None else x |
|
|
return pooled, x |
|
|
|
|
|
|
|
|
class Encoder(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
in_channels, |
|
|
in_size, |
|
|
n_encoders, |
|
|
kernel_size, |
|
|
n_blocks, |
|
|
out_channels=16, |
|
|
momentum=0.01, |
|
|
): |
|
|
super(Encoder, self).__init__() |
|
|
self.bn = nn.BatchNorm2d(in_channels, momentum=momentum) |
|
|
self.layers = nn.ModuleList() |
|
|
self.latent_channels = [] |
|
|
for _ in range(n_encoders): |
|
|
self.layers.append( |
|
|
ResEncoderBlock( |
|
|
in_channels, out_channels, kernel_size, n_blocks, momentum=momentum |
|
|
) |
|
|
) |
|
|
self.latent_channels.append([out_channels, in_size]) |
|
|
in_channels = out_channels |
|
|
out_channels *= 2 |
|
|
in_size //= 2 |
|
|
self.out_size = in_size |
|
|
self.out_channel = out_channels |
|
|
|
|
|
def forward(self, x): |
|
|
concat_tensors = [] |
|
|
x = self.bn(x) |
|
|
        for layer in self.layers: |
            x, skip = layer(x)  # each block returns (pooled output, pre-pool features) |
            concat_tensors.append(skip)  # pre-pool features feed the decoder skip connections |
|
|
return x, concat_tensors |
|
|
|
|
|
|
|
|
class Intermediate(nn.Module): |
|
|
def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01): |
|
|
super(Intermediate, self).__init__() |
|
|
self.layers = nn.ModuleList( |
|
|
[ |
|
|
ResEncoderBlock( |
|
|
in_channels if i == 0 else out_channels, |
|
|
out_channels, |
|
|
None, |
|
|
n_blocks, |
|
|
momentum, |
|
|
) |
|
|
for i in range(n_inters) |
|
|
] |
|
|
) |
|
|
|
|
|
def forward(self, x): |
|
|
for layer in self.layers: |
|
|
_, x = layer(x) |
|
|
return x |
|
|
|
|
|
|
|
|
class ResDecoderBlock(nn.Module): |
|
|
def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01): |
|
|
super(ResDecoderBlock, self).__init__() |
|
|
out_padding = (0, 1) if stride == (1, 2) else (1, 1) |
|
|
self.conv1 = nn.Sequential( |
|
|
nn.ConvTranspose2d( |
|
|
in_channels=in_channels, |
|
|
out_channels=out_channels, |
|
|
kernel_size=(3, 3), |
|
|
stride=stride, |
|
|
padding=(1, 1), |
|
|
output_padding=out_padding, |
|
|
bias=False, |
|
|
), |
|
|
nn.BatchNorm2d(out_channels, momentum=momentum), |
|
|
nn.ReLU(), |
|
|
) |
|
|
self.conv2 = nn.ModuleList( |
|
|
[ |
|
|
ConvBlockRes( |
|
|
out_channels * 2 if i == 0 else out_channels, out_channels, momentum |
|
|
) |
|
|
for i in range(n_blocks) |
|
|
] |
|
|
) |
|
|
|
|
|
def forward(self, x, concat_tensor): |
|
|
x = self.conv1(x) |
|
|
x = torch.cat((x, concat_tensor), dim=1) |
|
|
for conv in self.conv2: |
|
|
x = conv(x) |
|
|
return x |
|
|
|
|
|
|
|
|
class Decoder(nn.Module): |
|
|
def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01): |
|
|
super(Decoder, self).__init__() |
|
|
self.layers = nn.ModuleList() |
|
|
for _ in range(n_decoders): |
|
|
out_channels = in_channels // 2 |
|
|
self.layers.append( |
|
|
ResDecoderBlock(in_channels, out_channels, stride, n_blocks, momentum) |
|
|
) |
|
|
in_channels = out_channels |
|
|
|
|
|
def forward(self, x, concat_tensors): |
|
|
for layer, concat_tensor in zip(self.layers, reversed(concat_tensors)): |
|
|
x = layer(x, concat_tensor) |
|
|
return x |
|
|
|
|
|
|
|
|
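# U-Net over mel spectrograms: Encoder halves the frequency axis per stage, |
# Intermediate refines at the bottleneck, and Decoder upsamples while |
# concatenating the matching pre-pool encoder features. |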
class DeepUnet(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
kernel_size, |
|
|
n_blocks, |
|
|
en_de_layers=5, |
|
|
inter_layers=4, |
|
|
in_channels=1, |
|
|
en_out_channels=16, |
|
|
): |
|
|
super(DeepUnet, self).__init__() |
|
|
self.encoder = Encoder( |
|
|
in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels |
|
|
) |
|
|
self.intermediate = Intermediate( |
|
|
self.encoder.out_channel // 2, |
|
|
self.encoder.out_channel, |
|
|
inter_layers, |
|
|
n_blocks, |
|
|
) |
|
|
self.decoder = Decoder( |
|
|
self.encoder.out_channel, en_de_layers, kernel_size, n_blocks |
|
|
) |
|
|
|
|
|
def forward(self, x): |
|
|
x, concat_tensors = self.encoder(x) |
|
|
x = self.intermediate(x) |
|
|
return self.decoder(x, concat_tensors) |
|
|
|
|
|
|
|
|
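# Full RMVPE pitch model: DeepUnet features -> 3-channel conv -> BiGRU -> |
# a 360-way sigmoid salience map over 20-cent pitch bins. |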
class E2E(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
n_blocks, |
|
|
n_gru, |
|
|
kernel_size, |
|
|
en_de_layers=5, |
|
|
inter_layers=4, |
|
|
in_channels=1, |
|
|
en_out_channels=16, |
|
|
): |
|
|
super(E2E, self).__init__() |
|
|
self.unet = DeepUnet( |
|
|
kernel_size, |
|
|
n_blocks, |
|
|
en_de_layers, |
|
|
inter_layers, |
|
|
in_channels, |
|
|
en_out_channels, |
|
|
) |
|
|
self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1)) |
|
|
if n_gru: |
|
|
self.fc = nn.Sequential( |
|
|
BiGRU(3 * 128, 256, n_gru), |
|
|
nn.Linear(512, 360), |
|
|
nn.Dropout(0.25), |
|
|
nn.Sigmoid(), |
|
|
) |
|
|
else: |
|
|
self.fc = nn.Sequential( |
|
|
                nn.Linear(3 * 128, 360), nn.Dropout(0.25), nn.Sigmoid()  # torch.nn has no N_MELS/N_CLASS; 128 mel bins, 360 pitch classes |
|
|
) |
|
|
|
|
|
def forward(self, mel): |
|
|
mel = mel.transpose(-1, -2).unsqueeze(1) |
|
|
x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2) |
|
|
return self.fc(x) |
|
|
|
|
|
|
|
|
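# Log-mel front end built on the conv-based STFT above; keyshift rescales the |
# FFT size to emulate pitch shifting, and outputs are clamped before the log. |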
class MelSpectrogram(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
is_half, |
|
|
n_mel_channels, |
|
|
sample_rate, |
|
|
win_length, |
|
|
hop_length, |
|
|
n_fft=None, |
|
|
mel_fmin=0, |
|
|
mel_fmax=None, |
|
|
clamp=1e-5, |
|
|
): |
|
|
super(MelSpectrogram, self).__init__() |
|
|
n_fft = win_length if n_fft is None else n_fft |
|
|
self.hann_window = {} |
|
|
mel_basis = mel( |
|
|
sr=sample_rate, |
|
|
n_fft=n_fft, |
|
|
n_mels=n_mel_channels, |
|
|
fmin=mel_fmin, |
|
|
fmax=mel_fmax, |
|
|
htk=True, |
|
|
) |
|
|
self.register_buffer("mel_basis", torch.from_numpy(mel_basis).float()) |
|
|
self.n_fft = n_fft |
|
|
self.hop_length = hop_length |
|
|
self.win_length = win_length |
|
|
self.sample_rate = sample_rate |
|
|
self.n_mel_channels = n_mel_channels |
|
|
self.clamp = clamp |
|
|
self.is_half = is_half |
|
|
|
|
|
def forward(self, audio, keyshift=0, speed=1, center=True): |
|
|
factor = 2 ** (keyshift / 12) |
|
|
n_fft_new = int(np.round(self.n_fft * factor)) |
|
|
win_length_new = int(np.round(self.win_length * factor)) |
|
|
hop_length_new = int(np.round(self.hop_length * speed)) |
|
|
keyshift_key = f"{keyshift}_{audio.device}" |
|
|
if keyshift_key not in self.hann_window: |
|
|
self.hann_window[keyshift_key] = torch.hann_window(win_length_new).to( |
|
|
audio.device |
|
|
) |
|
|
if not hasattr(self, "stft"): |
|
|
self.stft = STFT( |
|
|
filter_length=n_fft_new, |
|
|
hop_length=hop_length_new, |
|
|
win_length=win_length_new, |
|
|
window="hann", |
|
|
).to(audio.device) |
|
|
magnitude = self.stft.transform(audio) |
|
|
if keyshift != 0: |
|
|
size = self.n_fft // 2 + 1 |
|
|
resize = magnitude.size(1) |
|
|
if resize < size: |
|
|
magnitude = F.pad(magnitude, (0, 0, 0, size - resize)) |
|
|
magnitude = magnitude[:, :size, :] * self.win_length / win_length_new |
|
|
mel_output = torch.matmul(self.mel_basis, magnitude) |
|
|
if self.is_half: |
|
|
mel_output = mel_output.half() |
|
|
return torch.log(torch.clamp(mel_output, min=self.clamp)) |
|
|
|
|
|
|
|
|
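# Inference wrapper: loads the RMVPE checkpoint, extracts 16 kHz log-mels, |
# pads frames to a multiple of 32 for the U-Net, and decodes the salience |
# map to F0 via a local weighted average around the argmax bin. |
# Typical use (a sketch; the path is illustrative): |
#     predictor = RMVPE0Predictor("vbach/models/predictors/rmvpe.pt", is_half=False) |
#     f0 = predictor.infer_from_audio(audio_16k, thred=0.03) |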
class RMVPE0Predictor: |
|
|
def __init__(self, model_path, is_half, device=None): |
|
|
self.resample_kernel = {} |
|
|
self.is_half = is_half |
|
|
if device is None: |
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
self.device = device |
|
|
self.mel_extractor = MelSpectrogram( |
|
|
is_half, 128, 16000, 1024, 160, None, 30, 8000 |
|
|
).to(device) |
|
|
model = E2E(4, 1, (2, 2)) |
|
|
ckpt = torch.load(model_path, map_location="cpu", weights_only=True) |
|
|
model.load_state_dict(ckpt) |
|
|
model.eval() |
|
|
if is_half: |
|
|
model = model.half() |
|
|
self.model = model.to(device) |
|
|
self.cents_mapping = np.pad(20 * np.arange(360) + 1997.3794084376191, (4, 4)) |
|
|
|
|
|
def mel2hidden(self, mel): |
|
|
with torch.no_grad(): |
|
|
n_frames = mel.shape[-1] |
|
|
mel = mel.float() |
|
|
padding = min(32 * ((n_frames - 1) // 32 + 1) - n_frames, n_frames) |
|
|
mel = F.pad(mel, (0, padding), mode="reflect") |
|
|
if self.is_half: |
|
|
mel = mel.half() |
|
|
hidden = self.model(mel) |
|
|
return hidden[:, :n_frames] |
|
|
|
|
|
def decode(self, hidden, thred=0.03): |
|
|
cents_pred = self.to_local_average_cents(hidden, thred=thred) |
|
|
f0 = 10 * (2 ** (cents_pred / 1200)) |
|
|
f0[f0 == 10] = 0 |
|
|
return f0 |
|
|
|
|
|
def infer_from_audio(self, audio, thred=0.03): |
|
|
audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0) |
|
|
mel = self.mel_extractor(audio, center=True) |
|
|
hidden = self.mel2hidden(mel) |
|
|
hidden = hidden.squeeze(0).cpu().numpy() |
|
|
if self.is_half: |
|
|
hidden = hidden.astype("float32") |
|
|
return self.decode(hidden, thred=thred) |
|
|
|
|
|
def infer_from_audio_with_pitch(self, audio, thred=0.03, f0_min=50, f0_max=1100): |
|
|
audio = torch.from_numpy(audio).float().to(self.device).unsqueeze(0) |
|
|
mel = self.mel_extractor(audio, center=True) |
|
|
hidden = self.mel2hidden(mel) |
|
|
hidden = hidden.squeeze(0).cpu().numpy() |
|
|
if self.is_half: |
|
|
hidden = hidden.astype("float32") |
|
|
f0 = self.decode(hidden, thred=thred) |
|
|
f0[(f0 < f0_min) | (f0 > f0_max)] = 0 |
|
|
return f0 |
|
|
|
|
|
def to_local_average_cents(self, salience, thred=0.05): |
|
|
center = np.argmax(salience, axis=1) |
|
|
salience = np.pad(salience, ((0, 0), (4, 4))) |
|
|
center += 4 |
|
|
todo_salience = [] |
|
|
todo_cents_mapping = [] |
|
|
starts = center - 4 |
|
|
ends = center + 5 |
|
|
for idx in range(salience.shape[0]): |
|
|
todo_salience.append(salience[:, starts[idx] : ends[idx]][idx]) |
|
|
todo_cents_mapping.append(self.cents_mapping[starts[idx] : ends[idx]]) |
|
|
todo_salience = np.array(todo_salience) |
|
|
todo_cents_mapping = np.array(todo_cents_mapping) |
|
|
product_sum = np.sum(todo_salience * todo_cents_mapping, 1) |
|
|
weight_sum = np.sum(todo_salience, 1) |
|
|
divided = product_sum / weight_sum |
|
|
maxx = np.max(salience, axis=1) |
|
|
divided[maxx <= thred] = 0 |
|
|
return divided |
|
|
|
|
|
''' |
|
|
with open(os.sep.join([current_dir, dirs[6], "RMVPE.py"]), 'w') as f: |
|
|
f.write(RMVPE) |
|
|
|
|
|
FCPE = ''' |
|
|
from typing import Union |
|
|
|
|
|
import torch.nn.functional as F |
|
|
import numpy as np |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
from torch.nn.utils.parametrizations import weight_norm |
|
|
from torchaudio.transforms import Resample |
|
|
import os |
|
|
import librosa |
|
|
import soundfile as sf |
|
|
import torch.utils.data |
|
|
from librosa.filters import mel as librosa_mel_fn |
|
|
import math |
|
|
from functools import partial |
|
|
|
|
|
from einops import rearrange, repeat |
|
|
from local_attention import LocalAttention |
|
|
|
|
|
os.environ["LRU_CACHE_CAPACITY"] = "3" |
|
|
|
|
|
|
|
|
def load_wav_to_torch(full_path, target_sr=None, return_empty_on_exception=False): |
|
|
try: |
|
|
data, sample_rate = sf.read(full_path, always_2d=True) |
|
|
except Exception as error: |
|
|
print(f"An error occurred loading {full_path}: {error}") |
|
|
if return_empty_on_exception: |
|
|
            return [], target_sr or 48000  # sample_rate is unbound when sf.read itself raised |
|
|
else: |
|
|
raise |
|
|
|
|
|
data = data[:, 0] if len(data.shape) > 1 else data |
|
|
assert len(data) > 2 |
|
|
|
|
|
max_mag = ( |
|
|
-np.iinfo(data.dtype).min |
|
|
if np.issubdtype(data.dtype, np.integer) |
|
|
else max(np.amax(data), -np.amin(data)) |
|
|
) |
|
|
max_mag = ( |
|
|
(2**31) + 1 if max_mag > (2**15) else ((2**15) + 1 if max_mag > 1.01 else 1.0) |
|
|
) |
|
|
data = torch.FloatTensor(data.astype(np.float32)) / max_mag |
|
|
|
|
|
if (torch.isinf(data) | torch.isnan(data)).any() and return_empty_on_exception: |
|
|
return [], sample_rate or target_sr or 48000 |
|
|
if target_sr is not None and sample_rate != target_sr: |
|
|
data = torch.from_numpy( |
|
|
librosa.core.resample(data.numpy(), orig_sr=sample_rate, target_sr=target_sr) |
|
|
) |
|
|
sample_rate = target_sr |
|
|
|
|
|
return data, sample_rate |
|
|
|
|
|
|
|
|
def dynamic_range_compression(x, C=1, clip_val=1e-5): |
|
|
return np.log(np.clip(x, a_min=clip_val, a_max=None) * C) |
|
|
|
|
|
|
|
|
def dynamic_range_decompression(x, C=1): |
|
|
return np.exp(x) / C |
|
|
|
|
|
|
|
|
def dynamic_range_compression_torch(x, C=1, clip_val=1e-5): |
|
|
return torch.log(torch.clamp(x, min=clip_val) * C) |
|
|
|
|
|
|
|
|
def dynamic_range_decompression_torch(x, C=1): |
|
|
return torch.exp(x) / C |
|
|
|
|
|
|
|
|
class STFT: |
|
|
def __init__( |
|
|
self, |
|
|
sr=22050, |
|
|
n_mels=80, |
|
|
n_fft=1024, |
|
|
win_size=1024, |
|
|
hop_length=256, |
|
|
fmin=20, |
|
|
fmax=11025, |
|
|
clip_val=1e-5, |
|
|
): |
|
|
self.target_sr = sr |
|
|
self.n_mels = n_mels |
|
|
self.n_fft = n_fft |
|
|
self.win_size = win_size |
|
|
self.hop_length = hop_length |
|
|
self.fmin = fmin |
|
|
self.fmax = fmax |
|
|
self.clip_val = clip_val |
|
|
self.mel_basis = {} |
|
|
self.hann_window = {} |
|
|
|
|
|
def get_mel(self, y, keyshift=0, speed=1, center=False, train=False): |
|
|
sample_rate = self.target_sr |
|
|
n_mels = self.n_mels |
|
|
n_fft = self.n_fft |
|
|
win_size = self.win_size |
|
|
hop_length = self.hop_length |
|
|
fmin = self.fmin |
|
|
fmax = self.fmax |
|
|
clip_val = self.clip_val |
|
|
|
|
|
factor = 2 ** (keyshift / 12) |
|
|
n_fft_new = int(np.round(n_fft * factor)) |
|
|
win_size_new = int(np.round(win_size * factor)) |
|
|
hop_length_new = int(np.round(hop_length * speed)) |
|
|
|
|
|
mel_basis = self.mel_basis if not train else {} |
|
|
hann_window = self.hann_window if not train else {} |
|
|
|
|
|
mel_basis_key = str(fmax) + "_" + str(y.device) |
|
|
if mel_basis_key not in mel_basis: |
|
|
mel = librosa_mel_fn( |
|
|
sr=sample_rate, n_fft=n_fft, n_mels=n_mels, fmin=fmin, fmax=fmax |
|
|
) |
|
|
mel_basis[mel_basis_key] = torch.from_numpy(mel).float().to(y.device) |
|
|
|
|
|
keyshift_key = str(keyshift) + "_" + str(y.device) |
|
|
if keyshift_key not in hann_window: |
|
|
hann_window[keyshift_key] = torch.hann_window(win_size_new).to(y.device) |
|
|
|
|
|
pad_left = (win_size_new - hop_length_new) // 2 |
|
|
pad_right = max( |
|
|
(win_size_new - hop_length_new + 1) // 2, |
|
|
win_size_new - y.size(-1) - pad_left, |
|
|
) |
|
|
mode = "reflect" if pad_right < y.size(-1) else "constant" |
|
|
y = torch.nn.functional.pad(y.unsqueeze(1), (pad_left, pad_right), mode=mode) |
|
|
y = y.squeeze(1) |
|
|
|
|
|
spec = torch.stft( |
|
|
y, |
|
|
n_fft_new, |
|
|
hop_length=hop_length_new, |
|
|
win_length=win_size_new, |
|
|
window=hann_window[keyshift_key], |
|
|
center=center, |
|
|
pad_mode="reflect", |
|
|
normalized=False, |
|
|
onesided=True, |
|
|
return_complex=True, |
|
|
) |
|
|
spec = torch.sqrt(spec.real.pow(2) + spec.imag.pow(2) + (1e-9)) |
|
|
|
|
|
if keyshift != 0: |
|
|
size = n_fft // 2 + 1 |
|
|
resize = spec.size(1) |
|
|
spec = ( |
|
|
F.pad(spec, (0, 0, 0, size - resize)) |
|
|
if resize < size |
|
|
else spec[:, :size, :] |
|
|
) |
|
|
spec = spec * win_size / win_size_new |
|
|
spec = torch.matmul(mel_basis[mel_basis_key], spec) |
|
|
spec = dynamic_range_compression_torch(spec, clip_val=clip_val) |
|
|
return spec |
|
|
|
|
|
def __call__(self, audiopath): |
|
|
audio, sr = load_wav_to_torch(audiopath, target_sr=self.target_sr) |
|
|
spect = self.get_mel(audio.unsqueeze(0)).squeeze(0) |
|
|
return spect |
|
|
|
|
|
|
|
|
stft = STFT() |
|
|
|
|
|
|
|
|
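# Performer-style FAVOR+ kernel: projects queries/keys through a random |
# orthogonal matrix to obtain positive features whose dot products |
# approximate softmax attention. |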
def softmax_kernel( |
|
|
data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device=None |
|
|
): |
|
|
b, h, *_ = data.shape |
|
|
|
|
|
data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.0 |
|
|
|
|
|
ratio = projection_matrix.shape[0] ** -0.5 |
|
|
projection = repeat(projection_matrix, "j d -> b h j d", b=b, h=h) |
|
|
projection = projection.type_as(data) |
|
|
data_dash = torch.einsum("...id,...jd->...ij", (data_normalizer * data), projection) |
|
|
|
|
|
diag_data = data**2 |
|
|
diag_data = torch.sum(diag_data, dim=-1) |
|
|
diag_data = (diag_data / 2.0) * (data_normalizer**2) |
|
|
diag_data = diag_data.unsqueeze(dim=-1) |
|
|
|
|
|
if is_query: |
|
|
data_dash = ratio * ( |
|
|
torch.exp( |
|
|
data_dash - diag_data - torch.max(data_dash, dim=-1, keepdim=True).values |
|
|
) |
|
|
+ eps |
|
|
) |
|
|
else: |
|
|
data_dash = ratio * (torch.exp(data_dash - diag_data + eps)) |
|
|
|
|
|
return data_dash.type_as(data) |
|
|
|
|
|
|
|
|
def orthogonal_matrix_chunk(cols, qr_uniform_q=False, device=None): |
|
|
unstructured_block = torch.randn((cols, cols), device=device) |
|
|
q, r = torch.linalg.qr(unstructured_block.cpu(), mode="reduced") |
|
|
q, r = map(lambda t: t.to(device), (q, r)) |
|
|
|
|
|
if qr_uniform_q: |
|
|
d = torch.diag(r, 0) |
|
|
q *= d.sign() |
|
|
return q.t() |
|
|
|
|
|
|
|
|
def exists(val): |
|
|
return val is not None |
|
|
|
|
|
|
|
|
def empty(tensor): |
|
|
return tensor.numel() == 0 |
|
|
|
|
|
|
|
|
def default(val, d): |
|
|
return val if exists(val) else d |
|
|
|
|
|
|
|
|
def cast_tuple(val): |
|
|
return (val,) if not isinstance(val, tuple) else val |
|
|
|
|
|
|
|
|
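# PCmer: a stack of conformer-style encoder layers (SelfAttention plus |
# ConformerConvModule) serving as the FCPE feature backbone. |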
class PCmer(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
num_layers, |
|
|
num_heads, |
|
|
dim_model, |
|
|
dim_keys, |
|
|
dim_values, |
|
|
residual_dropout, |
|
|
attention_dropout, |
|
|
): |
|
|
super().__init__() |
|
|
self.num_layers = num_layers |
|
|
self.num_heads = num_heads |
|
|
self.dim_model = dim_model |
|
|
self.dim_values = dim_values |
|
|
self.dim_keys = dim_keys |
|
|
self.residual_dropout = residual_dropout |
|
|
self.attention_dropout = attention_dropout |
|
|
|
|
|
self._layers = nn.ModuleList([_EncoderLayer(self) for _ in range(num_layers)]) |
|
|
|
|
|
def forward(self, phone, mask=None): |
|
|
for layer in self._layers: |
|
|
phone = layer(phone, mask) |
|
|
return phone |
|
|
|
|
|
|
|
|
class _EncoderLayer(nn.Module): |
|
|
def __init__(self, parent: PCmer): |
|
|
super().__init__() |
|
|
self.conformer = ConformerConvModule(parent.dim_model) |
|
|
self.norm = nn.LayerNorm(parent.dim_model) |
|
|
self.dropout = nn.Dropout(parent.residual_dropout) |
|
|
self.attn = SelfAttention( |
|
|
dim=parent.dim_model, heads=parent.num_heads, causal=False |
|
|
) |
|
|
|
|
|
def forward(self, phone, mask=None): |
|
|
phone = phone + (self.attn(self.norm(phone), mask=mask)) |
|
|
phone = phone + (self.conformer(phone)) |
|
|
return phone |
|
|
|
|
|
|
|
|
def calc_same_padding(kernel_size): |
|
|
pad = kernel_size // 2 |
|
|
return (pad, pad - (kernel_size + 1) % 2) |
|
|
|
|
|
|
|
|
class Swish(nn.Module): |
|
|
def forward(self, x): |
|
|
return x * x.sigmoid() |
|
|
|
|
|
|
|
|
class Transpose(nn.Module): |
|
|
def __init__(self, dims): |
|
|
super().__init__() |
|
|
assert len(dims) == 2, "dims must be a tuple of two dimensions" |
|
|
self.dims = dims |
|
|
|
|
|
def forward(self, x): |
|
|
return x.transpose(*self.dims) |
|
|
|
|
|
|
|
|
class GLU(nn.Module): |
|
|
def __init__(self, dim): |
|
|
super().__init__() |
|
|
self.dim = dim |
|
|
|
|
|
def forward(self, x): |
|
|
out, gate = x.chunk(2, dim=self.dim) |
|
|
return out * gate.sigmoid() |
|
|
|
|
|
|
|
|
class DepthWiseConv1d(nn.Module): |
|
|
def __init__(self, chan_in, chan_out, kernel_size, padding): |
|
|
super().__init__() |
|
|
self.padding = padding |
|
|
self.conv = nn.Conv1d(chan_in, chan_out, kernel_size, groups=chan_in) |
|
|
|
|
|
def forward(self, x): |
|
|
x = F.pad(x, self.padding) |
|
|
return self.conv(x) |
|
|
|
|
|
|
|
|
class ConformerConvModule(nn.Module): |
|
|
def __init__( |
|
|
self, dim, causal=False, expansion_factor=2, kernel_size=31, dropout=0.0 |
|
|
): |
|
|
super().__init__() |
|
|
|
|
|
inner_dim = dim * expansion_factor |
|
|
padding = calc_same_padding(kernel_size) if not causal else (kernel_size - 1, 0) |
|
|
|
|
|
self.net = nn.Sequential( |
|
|
nn.LayerNorm(dim), |
|
|
Transpose((1, 2)), |
|
|
nn.Conv1d(dim, inner_dim * 2, 1), |
|
|
GLU(dim=1), |
|
|
DepthWiseConv1d( |
|
|
inner_dim, inner_dim, kernel_size=kernel_size, padding=padding |
|
|
), |
|
|
Swish(), |
|
|
nn.Conv1d(inner_dim, dim, 1), |
|
|
Transpose((1, 2)), |
|
|
nn.Dropout(dropout), |
|
|
) |
|
|
|
|
|
def forward(self, x): |
|
|
return self.net(x) |
|
|
|
|
|
|
|
|
def linear_attention(q, k, v): |
|
|
if v is None: |
|
|
out = torch.einsum("...ed,...nd->...ne", k, q) |
|
|
return out |
|
|
else: |
|
|
k_cumsum = k.sum(dim=-2) |
|
|
D_inv = 1.0 / (torch.einsum("...nd,...d->...n", q, k_cumsum.type_as(q)) + 1e-8) |
|
|
context = torch.einsum("...nd,...ne->...de", k, v) |
|
|
out = torch.einsum("...de,...nd,...n->...ne", context, q, D_inv) |
|
|
return out |
|
|
|
|
|
|
|
|
def gaussian_orthogonal_random_matrix( |
|
|
nb_rows, nb_columns, scaling=0, qr_uniform_q=False, device=None |
|
|
): |
|
|
nb_full_blocks = int(nb_rows / nb_columns) |
|
|
block_list = [] |
|
|
|
|
|
for _ in range(nb_full_blocks): |
|
|
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device) |
|
|
block_list.append(q) |
|
|
|
|
|
remaining_rows = nb_rows - nb_full_blocks * nb_columns |
|
|
if remaining_rows > 0: |
|
|
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q, device=device) |
|
|
block_list.append(q[:remaining_rows]) |
|
|
|
|
|
final_matrix = torch.cat(block_list) |
|
|
|
|
|
if scaling == 0: |
|
|
multiplier = torch.randn((nb_rows, nb_columns), device=device).norm(dim=1) |
|
|
elif scaling == 1: |
|
|
multiplier = math.sqrt((float(nb_columns))) * torch.ones( |
|
|
(nb_rows,), device=device |
|
|
) |
|
|
else: |
|
|
raise ValueError(f"Invalid scaling {scaling}") |
|
|
|
|
|
return torch.diag(multiplier) @ final_matrix |
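

# Illustrative sanity check: rows within each d x d block are mutually
# orthogonal, e.g.
#   m = gaussian_orthogonal_random_matrix(8, 4)
#   m[:4] @ m[:4].t()  # approximately diagonal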
|
|
|
|
|
|
|
|
class FastAttention(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
dim_heads, |
|
|
nb_features=None, |
|
|
ortho_scaling=0, |
|
|
causal=False, |
|
|
generalized_attention=False, |
|
|
kernel_fn=nn.ReLU(), |
|
|
qr_uniform_q=False, |
|
|
no_projection=False, |
|
|
): |
|
|
super().__init__() |
|
|
nb_features = default(nb_features, int(dim_heads * math.log(dim_heads))) |
|
|
|
|
|
self.dim_heads = dim_heads |
|
|
self.nb_features = nb_features |
|
|
self.ortho_scaling = ortho_scaling |
|
|
|
|
|
self.create_projection = partial( |
|
|
gaussian_orthogonal_random_matrix, |
|
|
nb_rows=self.nb_features, |
|
|
nb_columns=dim_heads, |
|
|
scaling=ortho_scaling, |
|
|
qr_uniform_q=qr_uniform_q, |
|
|
) |
|
|
projection_matrix = self.create_projection() |
|
|
self.register_buffer("projection_matrix", projection_matrix) |
|
|
|
|
|
self.generalized_attention = generalized_attention |
|
|
self.kernel_fn = kernel_fn |
|
|
self.no_projection = no_projection |
|
|
self.causal = causal |
|
|
|
|
|
@torch.no_grad() |
|
|
def redraw_projection_matrix(self): |
|
|
projections = self.create_projection() |
|
|
self.projection_matrix.copy_(projections) |
|
|
del projections |
|
|
|
|
|
def forward(self, q, k, v): |
|
|
device = q.device |
|
|
|
|
|
if self.no_projection: |
|
|
q = q.softmax(dim=-1) |
|
|
k = torch.exp(k) if self.causal else k.softmax(dim=-2) |
|
|
else: |
|
|
create_kernel = partial( |
|
|
softmax_kernel, projection_matrix=self.projection_matrix, device=device |
|
|
) |
|
|
q = create_kernel(q, is_query=True) |
|
|
k = create_kernel(k, is_query=False) |
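
        # NOTE: the causal branch below references self.causal_linear_fn, which
        # is never defined in this file, so only causal=False works as written.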
|
|
|
|
|
attn_fn = linear_attention if not self.causal else self.causal_linear_fn |
|
|
|
|
|
        return attn_fn(q, k, v)
|
|
|
|
|
|
|
|
class SelfAttention(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
dim, |
|
|
causal=False, |
|
|
heads=8, |
|
|
dim_head=64, |
|
|
local_heads=0, |
|
|
local_window_size=256, |
|
|
nb_features=None, |
|
|
feature_redraw_interval=1000, |
|
|
generalized_attention=False, |
|
|
kernel_fn=nn.ReLU(), |
|
|
qr_uniform_q=False, |
|
|
dropout=0.0, |
|
|
no_projection=False, |
|
|
): |
|
|
super().__init__() |
|
|
assert dim % heads == 0, "dimension must be divisible by number of heads" |
|
|
dim_head = default(dim_head, dim // heads) |
|
|
inner_dim = dim_head * heads |
|
|
self.fast_attention = FastAttention( |
|
|
dim_head, |
|
|
nb_features, |
|
|
causal=causal, |
|
|
generalized_attention=generalized_attention, |
|
|
kernel_fn=kernel_fn, |
|
|
qr_uniform_q=qr_uniform_q, |
|
|
no_projection=no_projection, |
|
|
) |
|
|
|
|
|
self.heads = heads |
|
|
self.global_heads = heads - local_heads |
|
|
self.local_attn = ( |
|
|
LocalAttention( |
|
|
window_size=local_window_size, |
|
|
causal=causal, |
|
|
autopad=True, |
|
|
dropout=dropout, |
|
|
look_forward=int(not causal), |
|
|
rel_pos_emb_config=(dim_head, local_heads), |
|
|
) |
|
|
if local_heads > 0 |
|
|
else None |
|
|
) |
|
|
|
|
|
self.to_q = nn.Linear(dim, inner_dim) |
|
|
self.to_k = nn.Linear(dim, inner_dim) |
|
|
self.to_v = nn.Linear(dim, inner_dim) |
|
|
self.to_out = nn.Linear(inner_dim, dim) |
|
|
self.dropout = nn.Dropout(dropout) |
|
|
|
|
|
@torch.no_grad() |
|
|
def redraw_projection_matrix(self): |
|
|
self.fast_attention.redraw_projection_matrix() |
|
|
|
|
|
def forward( |
|
|
self, |
|
|
x, |
|
|
context=None, |
|
|
mask=None, |
|
|
context_mask=None, |
|
|
name=None, |
|
|
inference=False, |
|
|
**kwargs, |
|
|
): |
|
|
_, _, _, h, gh = *x.shape, self.heads, self.global_heads |
|
|
|
|
|
cross_attend = exists(context) |
|
|
context = default(context, x) |
|
|
context_mask = default(context_mask, mask) if not cross_attend else context_mask |
|
|
q, k, v = self.to_q(x), self.to_k(context), self.to_v(context) |
|
|
|
|
|
q, k, v = map(lambda t: rearrange(t, "b n (h d) -> b h n d", h=h), (q, k, v)) |
|
|
(q, lq), (k, lk), (v, lv) = map(lambda t: (t[:, :gh], t[:, gh:]), (q, k, v)) |
|
|
|
|
|
attn_outs = [] |
|
|
if not empty(q): |
|
|
if exists(context_mask): |
|
|
global_mask = context_mask[:, None, :, None] |
|
|
v.masked_fill_(~global_mask, 0.0) |
|
|
if cross_attend: |
|
|
pass |
|
|
else: |
|
|
out = self.fast_attention(q, k, v) |
|
|
attn_outs.append(out) |
|
|
|
|
|
if not empty(lq): |
|
|
assert ( |
|
|
not cross_attend |
|
|
), "local attention is not compatible with cross attention" |
|
|
out = self.local_attn(lq, lk, lv, input_mask=mask) |
|
|
attn_outs.append(out) |
|
|
|
|
|
out = torch.cat(attn_outs, dim=1) |
|
|
out = rearrange(out, "b h n d -> b n (h d)") |
|
|
out = self.to_out(out) |
|
|
return self.dropout(out) |
|
|
|
|
|
|
|
|
def l2_regularization(model, l2_alpha): |
|
|
l2_loss = [] |
|
|
for module in model.modules(): |
|
|
if type(module) is nn.Conv2d: |
|
|
l2_loss.append((module.weight**2).sum() / 2.0) |
|
|
return l2_alpha * sum(l2_loss) |
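

# Note: only nn.Conv2d weights are penalized, so for the Conv1d-based FCPE
# stack below this regularizer currently contributes sum([]) == 0.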
|
|
|
|
|
|
|
|
class FCPE(nn.Module): |
|
|
def __init__( |
|
|
self, |
|
|
input_channel=128, |
|
|
out_dims=360, |
|
|
n_layers=12, |
|
|
n_chans=512, |
|
|
use_siren=False, |
|
|
use_full=False, |
|
|
loss_mse_scale=10, |
|
|
loss_l2_regularization=False, |
|
|
loss_l2_regularization_scale=1, |
|
|
loss_grad1_mse=False, |
|
|
loss_grad1_mse_scale=1, |
|
|
f0_max=1975.5, |
|
|
f0_min=32.70, |
|
|
confidence=False, |
|
|
threshold=0.05, |
|
|
use_input_conv=True, |
|
|
): |
|
|
super().__init__() |
|
|
if use_siren is True: |
|
|
raise ValueError("Siren is not supported yet.") |
|
|
if use_full is True: |
|
|
raise ValueError("Full model is not supported yet.") |
|
|
|
|
|
self.loss_mse_scale = loss_mse_scale if (loss_mse_scale is not None) else 10 |
|
|
self.loss_l2_regularization = ( |
|
|
loss_l2_regularization if (loss_l2_regularization is not None) else False |
|
|
) |
|
|
self.loss_l2_regularization_scale = ( |
|
|
loss_l2_regularization_scale |
|
|
if (loss_l2_regularization_scale is not None) |
|
|
else 1 |
|
|
) |
|
|
self.loss_grad1_mse = loss_grad1_mse if (loss_grad1_mse is not None) else False |
|
|
self.loss_grad1_mse_scale = ( |
|
|
loss_grad1_mse_scale if (loss_grad1_mse_scale is not None) else 1 |
|
|
) |
|
|
self.f0_max = f0_max if (f0_max is not None) else 1975.5 |
|
|
self.f0_min = f0_min if (f0_min is not None) else 32.70 |
|
|
self.confidence = confidence if (confidence is not None) else False |
|
|
self.threshold = threshold if (threshold is not None) else 0.05 |
|
|
self.use_input_conv = use_input_conv if (use_input_conv is not None) else True |
|
|
|
|
|
self.cent_table_b = torch.Tensor( |
|
|
np.linspace( |
|
|
self.f0_to_cent(torch.Tensor([f0_min]))[0], |
|
|
self.f0_to_cent(torch.Tensor([f0_max]))[0], |
|
|
out_dims, |
|
|
) |
|
|
) |
|
|
self.register_buffer("cent_table", self.cent_table_b) |
|
|
|
|
|
_leaky = nn.LeakyReLU() |
|
|
self.stack = nn.Sequential( |
|
|
nn.Conv1d(input_channel, n_chans, 3, 1, 1), |
|
|
nn.GroupNorm(4, n_chans), |
|
|
_leaky, |
|
|
nn.Conv1d(n_chans, n_chans, 3, 1, 1), |
|
|
) |
|
|
|
|
|
self.decoder = PCmer( |
|
|
num_layers=n_layers, |
|
|
num_heads=8, |
|
|
dim_model=n_chans, |
|
|
dim_keys=n_chans, |
|
|
dim_values=n_chans, |
|
|
residual_dropout=0.1, |
|
|
attention_dropout=0.1, |
|
|
) |
|
|
self.norm = nn.LayerNorm(n_chans) |
|
|
|
|
|
self.n_out = out_dims |
|
|
self.dense_out = weight_norm(nn.Linear(n_chans, self.n_out)) |
|
|
|
|
|
def forward( |
|
|
self, mel, infer=True, gt_f0=None, return_hz_f0=False, cdecoder="local_argmax" |
|
|
): |
|
|
if cdecoder == "argmax": |
|
|
self.cdecoder = self.cents_decoder |
|
|
elif cdecoder == "local_argmax": |
|
|
self.cdecoder = self.cents_local_decoder |
|
|
|
|
|
x = ( |
|
|
self.stack(mel.transpose(1, 2)).transpose(1, 2) |
|
|
if self.use_input_conv |
|
|
else mel |
|
|
) |
|
|
x = self.decoder(x) |
|
|
x = self.norm(x) |
|
|
x = self.dense_out(x) |
|
|
x = torch.sigmoid(x) |
|
|
|
|
|
if not infer: |
|
|
gt_cent_f0 = self.f0_to_cent(gt_f0) |
|
|
gt_cent_f0 = self.gaussian_blurred_cent(gt_cent_f0) |
|
|
loss_all = self.loss_mse_scale * F.binary_cross_entropy(x, gt_cent_f0) |
|
|
if self.loss_l2_regularization: |
|
|
loss_all = loss_all + l2_regularization( |
|
|
model=self, l2_alpha=self.loss_l2_regularization_scale |
|
|
) |
|
|
x = loss_all |
|
|
if infer: |
|
|
x = self.cdecoder(x) |
|
|
x = self.cent_to_f0(x) |
|
|
x = (1 + x / 700).log() if not return_hz_f0 else x |
|
|
|
|
|
return x |
|
|
|
|
|
def cents_decoder(self, y, mask=True): |
|
|
B, N, _ = y.size() |
|
|
ci = self.cent_table[None, None, :].expand(B, N, -1) |
|
|
rtn = torch.sum(ci * y, dim=-1, keepdim=True) / torch.sum(y, dim=-1, keepdim=True) |
|
|
if mask: |
|
|
confident = torch.max(y, dim=-1, keepdim=True)[0] |
|
|
confident_mask = torch.ones_like(confident) |
|
|
confident_mask[confident <= self.threshold] = float("-INF") |
|
|
rtn = rtn * confident_mask |
|
|
return (rtn, confident) if self.confidence else rtn |
|
|
|
|
|
def cents_local_decoder(self, y, mask=True): |
|
|
B, N, _ = y.size() |
|
|
ci = self.cent_table[None, None, :].expand(B, N, -1) |
|
|
confident, max_index = torch.max(y, dim=-1, keepdim=True) |
|
|
local_argmax_index = torch.arange(0, 9).to(max_index.device) + (max_index - 4) |
|
|
local_argmax_index = torch.clamp(local_argmax_index, 0, self.n_out - 1) |
|
|
ci_l = torch.gather(ci, -1, local_argmax_index) |
|
|
y_l = torch.gather(y, -1, local_argmax_index) |
|
|
rtn = torch.sum(ci_l * y_l, dim=-1, keepdim=True) / torch.sum( |
|
|
y_l, dim=-1, keepdim=True |
|
|
) |
|
|
if mask: |
|
|
confident_mask = torch.ones_like(confident) |
|
|
confident_mask[confident <= self.threshold] = float("-INF") |
|
|
rtn = rtn * confident_mask |
|
|
return (rtn, confident) if self.confidence else rtn |
|
|
|
|
|
def cent_to_f0(self, cent): |
|
|
return 10.0 * 2 ** (cent / 1200.0) |
|
|
|
|
|
def f0_to_cent(self, f0): |
|
|
return 1200.0 * torch.log2(f0 / 10.0) |
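

# f0_to_cent and cent_to_f0 are inverses on a 10 Hz reference:
#   cent = 1200 * log2(f0 / 10),   f0 = 10 * 2 ** (cent / 1200)
# e.g. 440 Hz -> ~6551.3 cents -> 440 Hz.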
|
|
|
|
|
def gaussian_blurred_cent(self, cents): |
|
|
mask = (cents > 0.1) & (cents < (1200.0 * np.log2(self.f0_max / 10.0))) |
|
|
B, N, _ = cents.size() |
|
|
ci = self.cent_table[None, None, :].expand(B, N, -1) |
|
|
return torch.exp(-torch.square(ci - cents) / 1250) * mask.float() |
|
|
|
|
|
|
|
|
class FCPEInfer: |
|
|
def __init__(self, model_path, device=None, dtype=torch.float32): |
|
|
if device is None: |
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
self.device = device |
|
|
ckpt = torch.load(model_path, map_location=torch.device(self.device)) |
|
|
self.args = DotDict(ckpt["config"]) |
|
|
self.dtype = dtype |
|
|
model = FCPE( |
|
|
input_channel=self.args.model.input_channel, |
|
|
out_dims=self.args.model.out_dims, |
|
|
n_layers=self.args.model.n_layers, |
|
|
n_chans=self.args.model.n_chans, |
|
|
use_siren=self.args.model.use_siren, |
|
|
use_full=self.args.model.use_full, |
|
|
loss_mse_scale=self.args.loss.loss_mse_scale, |
|
|
loss_l2_regularization=self.args.loss.loss_l2_regularization, |
|
|
loss_l2_regularization_scale=self.args.loss.loss_l2_regularization_scale, |
|
|
loss_grad1_mse=self.args.loss.loss_grad1_mse, |
|
|
loss_grad1_mse_scale=self.args.loss.loss_grad1_mse_scale, |
|
|
f0_max=self.args.model.f0_max, |
|
|
f0_min=self.args.model.f0_min, |
|
|
confidence=self.args.model.confidence, |
|
|
) |
|
|
model.to(self.device).to(self.dtype) |
|
|
model.load_state_dict(ckpt["model"]) |
|
|
model.eval() |
|
|
self.model = model |
|
|
self.wav2mel = Wav2Mel(self.args, dtype=self.dtype, device=self.device) |
|
|
|
|
|
@torch.no_grad() |
|
|
def __call__(self, audio, sr, threshold=0.05): |
|
|
self.model.threshold = threshold |
|
|
audio = audio[None, :] |
|
|
mel = self.wav2mel(audio=audio, sample_rate=sr).to(self.dtype) |
|
|
f0 = self.model(mel=mel, infer=True, return_hz_f0=True) |
|
|
return f0 |
|
|
|
|
|
|
|
|
class Wav2Mel: |
|
|
def __init__(self, args, device=None, dtype=torch.float32): |
|
|
self.sample_rate = args.mel.sampling_rate |
|
|
self.hop_size = args.mel.hop_size |
|
|
if device is None: |
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
self.device = device |
|
|
self.dtype = dtype |
|
|
self.stft = STFT( |
|
|
args.mel.sampling_rate, |
|
|
args.mel.num_mels, |
|
|
args.mel.n_fft, |
|
|
args.mel.win_size, |
|
|
args.mel.hop_size, |
|
|
args.mel.fmin, |
|
|
args.mel.fmax, |
|
|
) |
|
|
self.resample_kernel = {} |
|
|
|
|
|
def extract_nvstft(self, audio, keyshift=0, train=False): |
|
|
mel = self.stft.get_mel(audio, keyshift=keyshift, train=train).transpose(1, 2) |
|
|
return mel |
|
|
|
|
|
def extract_mel(self, audio, sample_rate, keyshift=0, train=False): |
|
|
audio = audio.to(self.dtype).to(self.device) |
|
|
if sample_rate == self.sample_rate: |
|
|
audio_res = audio |
|
|
else: |
|
|
key_str = str(sample_rate) |
|
|
if key_str not in self.resample_kernel: |
|
|
self.resample_kernel[key_str] = Resample( |
|
|
sample_rate, self.sample_rate, lowpass_filter_width=128 |
|
|
) |
|
|
self.resample_kernel[key_str] = ( |
|
|
self.resample_kernel[key_str].to(self.dtype).to(self.device) |
|
|
) |
|
|
audio_res = self.resample_kernel[key_str](audio) |
|
|
|
|
|
mel = self.extract_nvstft(audio_res, keyshift=keyshift, train=train) |
|
|
n_frames = int(audio.shape[1] // self.hop_size) + 1 |
|
|
mel = torch.cat((mel, mel[:, -1:, :]), 1) if n_frames > int(mel.shape[1]) else mel |
|
|
mel = mel[:, :n_frames, :] if n_frames < int(mel.shape[1]) else mel |
|
|
return mel |
|
|
|
|
|
def __call__(self, audio, sample_rate, keyshift=0, train=False): |
|
|
return self.extract_mel(audio, sample_rate, keyshift=keyshift, train=train) |
|
|
|
|
|
|
|
|
class DotDict(dict): |
|
|
def __getattr__(*args): |
|
|
val = dict.get(*args) |
|
|
return DotDict(val) if type(val) is dict else val |
|
|
|
|
|
__setattr__ = dict.__setitem__ |
|
|
__delattr__ = dict.__delitem__ |
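

# DotDict gives attribute-style access to nested config dicts, e.g.
#   cfg = DotDict({"mel": {"sampling_rate": 16000}})
#   cfg.mel.sampling_rate  # -> 16000
# Missing keys resolve to None instead of raising AttributeError.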
|
|
|
|
|
|
|
|
class F0Predictor(object): |
|
|
def compute_f0(self, wav, p_len): |
|
|
pass |
|
|
|
|
|
def compute_f0_uv(self, wav, p_len): |
|
|
pass |
|
|
|
|
|
|
|
|
class FCPEF0Predictor(F0Predictor): |
|
|
def __init__( |
|
|
self, |
|
|
model_path, |
|
|
hop_length=512, |
|
|
f0_min=50, |
|
|
f0_max=1100, |
|
|
dtype=torch.float32, |
|
|
device=None, |
|
|
sample_rate=44100, |
|
|
threshold=0.05, |
|
|
): |
|
|
self.fcpe = FCPEInfer(model_path, device=device, dtype=dtype) |
|
|
self.hop_length = hop_length |
|
|
self.f0_min = f0_min |
|
|
self.f0_max = f0_max |
|
|
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu") |
|
|
self.threshold = threshold |
|
|
self.sample_rate = sample_rate |
|
|
self.dtype = dtype |
|
|
self.name = "fcpe" |
|
|
|
|
|
def repeat_expand( |
|
|
self, |
|
|
content: Union[torch.Tensor, np.ndarray], |
|
|
target_len: int, |
|
|
mode: str = "nearest", |
|
|
): |
|
|
ndim = content.ndim |
|
|
content = ( |
|
|
content[None, None] if ndim == 1 else content[None] if ndim == 2 else content |
|
|
) |
|
|
assert content.ndim == 3 |
|
|
is_np = isinstance(content, np.ndarray) |
|
|
content = torch.from_numpy(content) if is_np else content |
|
|
results = torch.nn.functional.interpolate(content, size=target_len, mode=mode) |
|
|
results = results.numpy() if is_np else results |
|
|
return results[0, 0] if ndim == 1 else results[0] if ndim == 2 else results |
|
|
|
|
|
def post_process(self, x, sample_rate, f0, pad_to): |
|
|
f0 = ( |
|
|
torch.from_numpy(f0).float().to(x.device) |
|
|
if isinstance(f0, np.ndarray) |
|
|
else f0 |
|
|
) |
|
|
f0 = self.repeat_expand(f0, pad_to) if pad_to is not None else f0 |
|
|
|
|
|
vuv_vector = torch.zeros_like(f0) |
|
|
vuv_vector[f0 > 0.0] = 1.0 |
|
|
vuv_vector[f0 <= 0.0] = 0.0 |
|
|
|
|
|
nzindex = torch.nonzero(f0).squeeze() |
|
|
f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy() |
|
|
time_org = self.hop_length / sample_rate * nzindex.cpu().numpy() |
|
|
time_frame = np.arange(pad_to) * self.hop_length / sample_rate |
|
|
|
|
|
vuv_vector = F.interpolate(vuv_vector[None, None, :], size=pad_to)[0][0] |
|
|
|
|
|
if f0.shape[0] <= 0: |
|
|
return np.zeros(pad_to), vuv_vector.cpu().numpy() |
|
|
if f0.shape[0] == 1: |
|
|
return np.ones(pad_to) * f0[0], vuv_vector.cpu().numpy() |
|
|
|
|
|
f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1]) |
|
|
return f0, vuv_vector.cpu().numpy() |
|
|
|
|
|
def compute_f0(self, wav, p_len=None): |
|
|
x = torch.FloatTensor(wav).to(self.dtype).to(self.device) |
|
|
p_len = x.shape[0] // self.hop_length if p_len is None else p_len |
|
|
f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0] |
|
|
        if torch.all(f0 == 0):
            # p_len is always set above, so return a silent (all-zero) contour
            return np.zeros(p_len)
        return self.post_process(x, self.sample_rate, f0, p_len)[0]
|
|
|
|
|
def compute_f0_uv(self, wav, p_len=None): |
|
|
x = torch.FloatTensor(wav).to(self.dtype).to(self.device) |
|
|
p_len = x.shape[0] // self.hop_length if p_len is None else p_len |
|
|
f0 = self.fcpe(x, sr=self.sample_rate, threshold=self.threshold)[0, :, 0] |
|
|
        if torch.all(f0 == 0):
            rtn = np.zeros(p_len)
            return rtn, rtn
        return self.post_process(x, self.sample_rate, f0, p_len)
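

# Minimal usage sketch (assumes an fcpe.pt checkpoint and a mono float
# waveform at `sample_rate`; the names here are illustrative):
#   predictor = FCPEF0Predictor("fcpe.pt", sample_rate=16000)
#   f0, uv = predictor.compute_f0_uv(wav)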
|
|
|
|
|
''' |
|
|
|
|
|
with open(os.sep.join([current_dir, dirs[6], "FCPE.py"]), 'w') as f: |
|
|
f.write(FCPE) |
|
|
|
|
|
|
|
|
VBACH_CLI = ''' |
|
|
import gc
import os
import argparse
from datetime import datetime

import torch

from vbach.infer.infer import Config, load_hubert, get_vc, rvc_infer
|
|
|
|
|
# Constants
|
|
|
|
|
RVC_MODELS_DIR = os.path.join(os.getcwd(), "voice_models") |
|
|
HUBERT_MODEL_PATH = os.path.join( |
|
|
os.getcwd(), "vbach", "models", "embedders", "hubert_base.pt" |
|
|
) |
|
|
OUTPUT_FORMAT = ["mp3", "wav", "flac", "aiff", "m4a", "aac", "ogg", "opus"] |
|
|
|
|
|
audio_extensions = {".mp3", ".wav", ".flac", ".aiff", ".m4a", ".aac", ".ogg", ".opus"} |
|
|
|
|
|
|
|
|
# Core helpers
|
|
|
|
|
def load_rvc_model(voice_model): |
|
|
model_dir = os.path.join(RVC_MODELS_DIR, voice_model) |
|
|
model_files = os.listdir(model_dir) |
|
|
rvc_model_path = next( |
|
|
(os.path.join(model_dir, f) for f in model_files if f.endswith(".pth")), None |
|
|
) |
|
|
rvc_index_path = next( |
|
|
(os.path.join(model_dir, f) for f in model_files if f.endswith(".index")), None |
|
|
) |
|
|
|
|
|
if not rvc_model_path: |
|
|
raise ValueError( |
|
|
f"\033[91mМодели {voice_model} не существует. " |
|
|
"Возможно, вы неправильно ввели имя.\033[0m" |
|
|
) |
|
|
|
|
|
return rvc_model_path, rvc_index_path |
|
|
|
|
|
def voice_conversion( |
|
|
voice_model, |
|
|
vocals_path, |
|
|
output_path, |
|
|
pitch, |
|
|
f0_method, |
|
|
index_rate, |
|
|
filter_radius, |
|
|
volume_envelope, |
|
|
protect, |
|
|
hop_length, |
|
|
f0_min, |
|
|
f0_max, |
|
|
format_output, |
|
|
output_bitrate, |
|
|
stereo_mode |
|
|
): |
|
|
rvc_model_path, rvc_index_path = load_rvc_model(voice_model) |
|
|
|
|
|
config = Config() |
|
|
hubert_model = load_hubert(config.device, config.is_half, HUBERT_MODEL_PATH) |
|
|
cpt, version, net_g, tgt_sr, vc = get_vc( |
|
|
config.device, config.is_half, config, rvc_model_path |
|
|
) |
|
|
|
|
|
output_audio = rvc_infer( |
|
|
rvc_index_path, |
|
|
index_rate, |
|
|
vocals_path, |
|
|
output_path, |
|
|
pitch, |
|
|
f0_method, |
|
|
cpt, |
|
|
version, |
|
|
net_g, |
|
|
filter_radius, |
|
|
tgt_sr, |
|
|
volume_envelope, |
|
|
protect, |
|
|
hop_length, |
|
|
vc, |
|
|
hubert_model, |
|
|
f0_min, |
|
|
f0_max, |
|
|
format_output, |
|
|
output_bitrate, |
|
|
stereo_mode |
|
|
) |
|
|
|
|
|
del hubert_model, cpt, net_g, vc |
|
|
gc.collect() |
|
|
torch.cuda.empty_cache() |
|
|
return output_audio |
|
|
|
|
|
def cli_conversion(
    input_audios,
    template="NAME_MODEL_F0METHOD_PITCH",
    output_dir="output",
    model_name="",
    index_rate=0,
    output_format="wav",
    stereo_mode="mono",
    method_pitch="rmvpe+",
    pitch=0,
    hop_length=128,
    filter_radius=3,
    rms=0.25,
    protect=0.33,
    f0_min=50,
    f0_max=1100,
):
|
|
if not input_audios: |
|
|
raise ValueError( |
|
|
"Не удалось найти аудиофайл(ы). " |
|
|
"Убедитесь, что файл загрузился или проверьте правильность пути к нему." |
|
|
) |
|
|
    if not model_name:
        raise ValueError("Выберите модель голоса для преобразования.")
    if not os.path.exists(input_audios):
        raise FileNotFoundError(f"Ошибка: '{input_audios}' не существует.")
|
|
|
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
    if os.path.isfile(input_audios):
        # Make sure the file is an audio file
        ext = os.path.splitext(input_audios)[1].lower()
        if ext not in audio_extensions:
            raise ValueError(f"Ошибка: '{input_audios}' не является аудиофайлом (допустимые расширения: {audio_extensions}).")
        print(f"Найден аудиофайл: {input_audios}")

        file_name = os.path.basename(input_audios)
        namefile = os.path.splitext(file_name)[0]
        time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Substitute the template keys to build the output file name
        output_name = (
            template
            .replace("DATETIME", time_create_file)
            .replace("NAME", namefile)
            .replace("MODEL", model_name)
            .replace("F0METHOD", method_pitch)
            .replace("PITCH", f"{pitch}")
        )
        output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
        voice_conversion(model_name, input_audios, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, f0_min, f0_max, output_format, "320k", stereo_mode)
        print("Вокал успешно преобразован")
|
|
|
|
|
    elif os.path.isdir(input_audios):
        # Collect audio files from the folder
        audio_files = []
        for file in os.listdir(input_audios):
            ext = os.path.splitext(file)[1].lower()
            if ext in audio_extensions:
                audio_files.append(os.path.join(input_audios, file))

        if not audio_files:
            raise FileNotFoundError(f"Ошибка: в папке '{input_audios}' нет аудиофайлов (допустимые расширения: {audio_extensions}).")

        print(f"Найдены аудиофайлы: {audio_files}")

        output_paths = []
        for file in audio_files:
            file_name = os.path.basename(file)
            namefile = os.path.splitext(file_name)[0]
            time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_name = (
                template
                .replace("DATETIME", time_create_file)
                .replace("NAME", namefile)
                .replace("MODEL", model_name)
                .replace("F0METHOD", method_pitch)
                .replace("PITCH", f"{pitch}")
            )
            output_path = os.path.join(output_dir, f"{output_name}.{output_format}")
            voice_conversion(model_name, file, output_path, pitch, method_pitch, index_rate, filter_radius, rms, protect, hop_length, f0_min, f0_max, output_format, "320k", stereo_mode)
            output_paths.append(output_path)
        print("Вокалы успешно преобразованы")
|
|
else: |
|
|
raise ValueError(f"Ошибка: '{input_audios}' не является ни файлом, ни папкой.") |
|
|
|
|
|
def setup_args(): |
|
|
parser = argparse.ArgumentParser(description='Vbach CLI') |
|
|
|
|
|
    # Required arguments
|
|
parser.add_argument( |
|
|
'input_audios', |
|
|
type=str, |
|
|
help='Путь к аудиофайлу или папке с аудиофайлами для обработки' |
|
|
) |
|
|
parser.add_argument( |
|
|
'output_dir', |
|
|
type=str, |
|
|
help='Папка для сохранения результатов конвертации' |
|
|
) |
|
|
parser.add_argument( |
|
|
'model_name', |
|
|
type=str, |
|
|
help='Название голосовой модели RVC для преобразования' |
|
|
) |
|
|
|
|
|
    # Optional arguments with default values
|
|
parser.add_argument( |
|
|
'--template', |
|
|
type=str, |
|
|
default="NAME_MODEL_F0METHOD_PITCH", |
|
|
help='Шаблон имени выходного файла (доступные замены: DATETIME, NAME, MODEL, F0METHOD, PITCH)' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--index_rate', |
|
|
type=float, |
|
|
default=0, |
|
|
help='Интенсивность использования индексного файла (от 0.0 до 1.0)', |
|
|
metavar='[0.0-1.0]' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--output_format', |
|
|
type=str, |
|
|
default="wav", |
|
|
choices=OUTPUT_FORMAT, |
|
|
help='Формат выходного аудиофайла' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--stereo_mode', |
|
|
type=str, |
|
|
default="mono", |
|
|
choices=["mono", "left/right", "sim/dif"], |
|
|
help='Режим каналов: моно или стерео' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--method_pitch', |
|
|
type=str, |
|
|
default="rmvpe+", |
|
|
help='Метод извлечения pitch (тона)' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--pitch', |
|
|
type=int, |
|
|
default=0, |
|
|
help='Корректировка тона в полутонах' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--hop_length', |
|
|
type=int, |
|
|
default=128, |
|
|
help='Длина hop (в семплах) для обработки' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--filter_radius', |
|
|
type=int, |
|
|
default=3, |
|
|
help='Радиус фильтра для сглаживания' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--rms', |
|
|
type=float, |
|
|
default=0.25, |
|
|
help='Масштабирование огибающей громкости (RMS)' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--protect', |
|
|
type=float, |
|
|
default=0.33, |
|
|
help='Защита для глухих согласных звуков' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--f0_min', |
|
|
type=int, |
|
|
default=50, |
|
|
help='Минимальная частота pitch (F0) в Hz' |
|
|
) |
|
|
parser.add_argument( |
|
|
'--f0_max', |
|
|
type=int, |
|
|
default=1100, |
|
|
help='Максимальная частота pitch (F0) в Hz' |
|
|
) |
|
|
|
|
|
return parser.parse_args() |
|
|
|
|
|
# Example usage:
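# A hypothetical invocation (the paths and model name are placeholders):
#   python vbach/cli/vbach.py song.wav output MyModel --pitch 12 --output_format mp3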
|
|
if __name__ == "__main__": |
|
|
args = setup_args() |
|
|
cli_conversion( |
|
|
input_audios=args.input_audios, |
|
|
output_dir=args.output_dir, |
|
|
model_name=args.model_name, |
|
|
template=args.template, |
|
|
index_rate=args.index_rate, |
|
|
output_format=args.output_format, |
|
|
stereo_mode=args.stereo_mode, |
|
|
method_pitch=args.method_pitch, |
|
|
pitch=args.pitch, |
|
|
hop_length=args.hop_length, |
|
|
filter_radius=args.filter_radius, |
|
|
rms=args.rms, |
|
|
protect=args.protect, |
|
|
f0_min=args.f0_min, |
|
|
f0_max=args.f0_max |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
''' |
|
|
|
|
|
with open(os.sep.join([current_dir, dirs[2], "vbach.py"]), 'w') as f: |
|
|
f.write(VBACH_CLI) |
|
|
|
|
|
def set_language(lang): |
|
|
global CURRENT_LANG |
|
|
CURRENT_LANG = lang |
|
|
|
|
|
|
|
|
def t(key, **kwargs): |
|
|
translation = TRANSLATIONS[CURRENT_LANG].get(key, key) |
|
|
if isinstance(translation, dict): |
|
|
return translation |
|
|
return translation.format(**kwargs) if kwargs else translation |
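

# Example: t("inference") returns "Инференс" while CURRENT_LANG == "ru"; unknown
# keys fall back to the key itself, so missing translations degrade gracefully.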
|
|
|
|
|
def download_file(url, zip_name, progress): |
|
|
try: |
|
|
if "drive.google.com" in url: |
|
|
progress(0.5, desc=t('downloading_google')) |
|
|
download_from_google_drive(url, zip_name, progress) |
|
|
elif "huggingface.co" in url: |
|
|
progress(0.5, desc=t('downloading_huggingface')) |
|
|
download_from_huggingface(url, zip_name, progress) |
|
|
elif "pixeldrain.com" in url: |
|
|
progress(0.5, desc=t('downloading_pixeldrain')) |
|
|
download_from_pixeldrain(url, zip_name, progress) |
|
|
elif "mega.nz" in url: |
|
|
print(t('mega_unsupported')) |
|
|
elif "disk.yandex.ru" in url or "yadi.sk" in url: |
|
|
progress(0.5, desc=t('downloading_yandex')) |
|
|
download_from_yandex(url, zip_name, progress) |
|
|
else: |
|
|
raise ValueError(t('unsupported_source', url=url)) |
|
|
except Exception as e: |
|
|
raise gr.Error(t('download_error', error=str(e))) |
|
|
|
|
|
def download_from_google_drive(url, zip_name, progress): |
|
|
file_id = ( |
|
|
url.split("file/d/")[1].split("/")[0] |
|
|
if "file/d/" in url |
|
|
else url.split("id=")[1].split("&")[0] |
|
|
) |
|
|
gdown.download(id=file_id, output=str(zip_name), quiet=False) |
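

# Both .../file/d/<id>/... and ...?id=<id> Google Drive URL forms are accepted.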
|
|
|
|
|
def download_from_huggingface(url, zip_name, progress): |
|
|
urllib.request.urlretrieve(url, zip_name) |
|
|
|
|
|
def download_from_pixeldrain(url, zip_name, progress):
    file_id = url.split("pixeldrain.com/u/")[1]
    response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
    response.raise_for_status()
    with open(zip_name, "wb") as f:
        f.write(response.content)
|
|
|
|
|
def download_from_yandex(url, zip_name, progress): |
|
|
yandex_public_key = f"download?public_key={url}" |
|
|
yandex_api_url = f"https://cloud-api.yandex.net/v1/disk/public/resources/{yandex_public_key}" |
|
|
response = requests.get(yandex_api_url) |
|
|
if response.status_code == 200: |
|
|
download_link = response.json().get("href") |
|
|
urllib.request.urlretrieve(download_link, zip_name) |
|
|
else: |
|
|
raise gr.Error(t('yandex_api_error', status=response.status_code)) |
|
|
|
|
|
def extract_zip(extraction_folder, zip_name): |
|
|
os.makedirs(extraction_folder, exist_ok=True) |
|
|
with zipfile.ZipFile(zip_name, "r") as zip_ref: |
|
|
zip_ref.extractall(extraction_folder) |
|
|
os.remove(zip_name) |
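
    # Heuristics: a usable .index file is assumed to be larger than 100 KB and a
    # usable .pth model larger than 40 MB; smaller matches are skipped as junk.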
|
|
|
|
|
index_filepath, model_filepath = None, None |
|
|
for root, _, files in os.walk(extraction_folder): |
|
|
for name in files: |
|
|
file_path = os.path.join(root, name) |
|
|
if name.endswith(".index") and os.stat(file_path).st_size > 1024 * 100: |
|
|
index_filepath = file_path |
|
|
if name.endswith(".pth") and os.stat(file_path).st_size > 1024 * 1024 * 40: |
|
|
model_filepath = file_path |
|
|
|
|
|
if not model_filepath: |
|
|
raise gr.Error(t('pth_not_found', folder=extraction_folder)) |
|
|
|
|
|
rename_and_cleanup(extraction_folder, model_filepath, index_filepath) |
|
|
|
|
|
def rename_and_cleanup(extraction_folder, model_filepath, index_filepath): |
|
|
os.rename( |
|
|
model_filepath, |
|
|
os.path.join(extraction_folder, os.path.basename(model_filepath)), |
|
|
) |
|
|
if index_filepath: |
|
|
os.rename( |
|
|
index_filepath, |
|
|
os.path.join(extraction_folder, os.path.basename(index_filepath)), |
|
|
) |
|
|
|
|
|
for filepath in os.listdir(extraction_folder): |
|
|
full_path = os.path.join(extraction_folder, filepath) |
|
|
if os.path.isdir(full_path): |
|
|
shutil.rmtree(full_path) |
|
|
|
|
|
def download_from_url(url, dir_name, progress=gr.Progress()): |
|
|
try: |
|
|
progress(0, desc=t('downloading_model', dir_name=dir_name)) |
|
|
zip_name = os.path.join(dirs[0], dir_name + ".zip") |
|
|
extraction_folder = os.path.join(current_dir, dirs[0], dir_name) |
|
|
|
|
|
if os.path.exists(extraction_folder): |
|
|
raise gr.Error(t('model_exists', dir_name=dir_name)) |
|
|
|
|
|
download_file(url, zip_name, progress) |
|
|
progress(0.8, desc=t('unpacking_zip')) |
|
|
extract_zip(extraction_folder, zip_name) |
|
|
return t('model_uploaded', dir_name=dir_name) |
|
|
except Exception as e: |
|
|
raise gr.Error(t('model_load_error', error=str(e))) |
|
|
|
|
|
def upload_zip_file(zip_path, dir_name, progress=gr.Progress()): |
|
|
try: |
|
|
extraction_folder = os.path.join(current_dir, dirs[0], dir_name) |
|
|
if os.path.exists(extraction_folder): |
|
|
raise gr.Error(t('model_exists', dir_name=dir_name)) |
|
|
|
|
|
zip_name = zip_path.name |
|
|
progress(0.8, desc=t('unpacking_zip')) |
|
|
extract_zip(extraction_folder, zip_name) |
|
|
return t('model_uploaded', dir_name=dir_name) |
|
|
except Exception as e: |
|
|
raise gr.Error(t('model_load_error', error=str(e))) |
|
|
|
|
|
def upload_separate_files(pth_file, index_file, dir_name, progress=gr.Progress()): |
|
|
try: |
|
|
extraction_folder = os.path.join(current_dir, dirs[0], dir_name) |
|
|
if os.path.exists(extraction_folder): |
|
|
raise gr.Error(t('model_exists', dir_name=dir_name)) |
|
|
|
|
|
os.makedirs(extraction_folder, exist_ok=True) |
|
|
|
|
|
if pth_file: |
|
|
pth_path = os.path.join(extraction_folder, os.path.basename(pth_file.name)) |
|
|
shutil.copyfile(pth_file.name, pth_path) |
|
|
|
|
|
if index_file: |
|
|
index_path = os.path.join(extraction_folder, os.path.basename(index_file.name)) |
|
|
shutil.copyfile(index_file.name, index_path) |
|
|
|
|
|
return t('model_uploaded', dir_name=dir_name) |
|
|
except Exception as e: |
|
|
raise gr.Error(t('model_load_error', error=str(e))) |
|
|
|
|
|
def delete_model_name(dir_name):
    model_dir = os.path.join(current_dir, dirs[0], dir_name)
    if not os.path.isdir(model_dir):
        return t('model_not_found', dir_name=dir_name)
    try:
        shutil.rmtree(model_dir)
        return t('model_deleted', dir_name=dir_name)
    except Exception as e:
        raise gr.Error(t('model_delete_error', error=str(e)))
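

# Imported from the CLI module generated above; this must run after the file
# has been written to disk.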
|
|
|
|
|
from vbach.cli.vbach import voice_conversion |
|
|
|
|
|
def process_audio( |
|
|
input_file: str = None, |
|
|
input_list: str = None, |
|
|
template: str = "NAME_MODEL_F0METHOD_PITCH", |
|
|
model_name: str = "", |
|
|
index_rate: float = 0, |
|
|
output_format: str = "wav", |
|
|
output_bitrate: int = 320, |
|
|
stereo_mode: str = "mono", |
|
|
method_pitch: str = "rmvpe+", |
|
|
pitch: float = 0, |
|
|
hop_length: int = 128, |
|
|
filter_radius: int = 3, |
|
|
rms: float = 0.25, |
|
|
protect: float = 0.33, |
|
|
f0_min: int = 50, |
|
|
f0_max: int = 1100 |
|
|
): |
|
|
|
|
|
keys = ["NAME", "PITCH", "F0_METHOD", "DATETIME", "MODEL"] |
|
|
|
|
|
if any(key in template for key in keys): |
|
|
pass |
|
|
else: |
|
|
template = "DATETIME_Vbach_F0METHOD_PITCH" |
|
|
|
|
|
if not isinstance(input_list, list) and not input_file: |
|
|
try: |
|
|
print(input_list) |
|
|
input_list = ast.literal_eval(input_list) |
|
|
except Exception as e: |
|
|
print(e) |
|
|
gr.Warning(t("error_strlist_is_not_list")) |
|
|
return None |
|
|
|
|
|
if input_file is not None: |
|
|
try: |
|
|
print(input_file) |
|
|
input_list = ast.literal_eval(input_file) |
|
|
gr.Warning(t("error_path_is_list")) |
|
|
return None |
|
|
except Exception as e: |
|
|
pass |
|
|
|
|
|
|
|
|
output_bitrate = f"{output_bitrate}k" |
|
|
if not input_file and not input_list: |
|
|
raise gr.Error(t("error_no_audio")) |
|
|
if not model_name: |
|
|
raise gr.Error(t("error_no_model")) |
|
|
    if input_file is not None and isinstance(input_file, str) and input_list is None:
|
|
if not os.path.exists(input_file): |
|
|
gr.Warning(t("warning_file_not_found", file=input_file)) |
|
|
return None |
|
|
|
|
|
file_name = os.path.basename(input_file) |
|
|
namefile = os.path.splitext(file_name)[0] |
|
|
        time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = tempfile.mkdtemp(prefix="converted_voice_")
        print(output_dir)
|
|
output_name = ( |
|
|
template |
|
|
.replace("DATETIME", time_create_file) |
|
|
.replace("NAME", namefile) |
|
|
.replace("MODEL", model_name) |
|
|
.replace("F0METHOD", method_pitch) |
|
|
.replace("PITCH", f"{pitch}") |
|
|
) |
|
|
output_path = os.path.join(output_dir, f"{output_name}.{output_format}") |
|
|
try: |
|
|
output_path = voice_conversion( |
|
|
model_name, |
|
|
input_file, |
|
|
output_path, |
|
|
pitch, |
|
|
method_pitch, |
|
|
index_rate, |
|
|
filter_radius, |
|
|
rms, |
|
|
protect, |
|
|
hop_length, |
|
|
f0_min, |
|
|
f0_max, |
|
|
output_format, |
|
|
output_bitrate, |
|
|
stereo_mode |
|
|
) |
|
|
        except Exception as e:
            print(e)
            return None
        print(t("success_single"))
        return output_path
|
|
|
|
|
if input_file is None and input_list is not None and isinstance(input_list, list): |
|
|
output_dir = tempfile.mkdtemp(prefix="converted_voice_") |
|
|
print(output_dir) |
|
|
output_paths = [] |
|
|
progress = gr.Progress() |
|
|
for i, file in enumerate(input_list): |
|
|
|
|
|
if not os.path.exists(file): |
|
|
gr.Warning(t("warning_file_not_found", file=file)) |
|
|
continue |
|
|
|
|
|
total_steps = len(input_list) |
|
|
file_name = os.path.basename(file) |
|
|
namefile = os.path.splitext(file_name)[0] |
|
|
time_create_file = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
progress( |
|
|
(i+1, total_steps), |
|
|
desc=t("processing", namefile=namefile), |
|
|
unit=t("files") |
|
|
) |
|
|
output_name = ( |
|
|
template |
|
|
.replace("DATETIME", time_create_file) |
|
|
.replace("NAME", namefile) |
|
|
.replace("MODEL", model_name) |
|
|
.replace("F0METHOD", method_pitch) |
|
|
.replace("PITCH", f"{pitch}") |
|
|
) |
|
|
output_path = os.path.join(output_dir, f"{output_name}.{output_format}") |
|
|
try: |
|
|
output_path = voice_conversion( |
|
|
model_name, |
|
|
file, |
|
|
output_path, |
|
|
pitch, |
|
|
method_pitch, |
|
|
index_rate, |
|
|
filter_radius, |
|
|
rms, |
|
|
protect, |
|
|
hop_length, |
|
|
f0_min, |
|
|
f0_max, |
|
|
output_format, |
|
|
output_bitrate, |
|
|
stereo_mode |
|
|
) |
|
|
            except Exception as e:
                print(e)
                continue
            output_paths.append(output_path)
        print(t("success_batch"))
        return output_paths
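

def get_voice_models():
    # Helper: list the subfolders of the voice-model directory for dropdown choices
    models_root = os.path.join(current_dir, dirs[0])
    return [d for d in os.listdir(models_root) if os.path.isdir(os.path.join(models_root, d))]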
|
|
|
|
|
def vbach_plugin_name(): |
|
|
return "VBach" |
|
|
|
|
|
def vbach_plugin(lang="ru"): |
|
|
set_language(lang) |
|
|
|
|
|
with gr.TabItem(t("inference")): |
|
|
with gr.Column(): |
|
|
with gr.Column(scale=3) as input_voice_group: |
|
|
with gr.Group() as single_voice_file: |
|
|
input_voice = gr.Audio(label=t("select_file"), interactive=True, type="filepath") |
|
|
batch_upload_btn = gr.Button(t("batch_upload")) |
|
|
with gr.Group(visible=False) as batch_voice_file: |
|
|
input_voices = gr.Files(type="filepath", interactive=True, show_label=False) |
|
|
single_upload_btn = gr.Button(t("single_upload")) |
|
|
input_voice_path = gr.Textbox(label=t("audio_path"), info=t("audio_path_info"), interactive=True) |
|
|
input_voice.upload(fn=(lambda x: gr.update(value=x)), inputs=input_voice, outputs=input_voice_path) |
|
|
input_voices.upload(fn=(lambda x: gr.update(value=str(x))), inputs=input_voices, outputs=input_voice_path) |
|
|
with gr.Column(): |
|
|
with gr.Row(equal_height=True): |
|
|
model_name = gr.Dropdown(label=t("model_name"), choices=[d for d in os.listdir(os.path.join(current_dir, dirs[0])) if os.path.isdir(os.path.join(os.path.join(current_dir, dirs[0]), d))], interactive=True, filterable=False, scale=6) |
|
|
model_update_btn = gr.Button(t("update_button"), variant="primary", scale=3, size="lg") |
|
|
                    model_update_btn.click(fn=(lambda: gr.update(choices=get_voice_models())), inputs=None, outputs=model_name)
|
|
with gr.Row(): |
|
|
method_pitch = gr.Dropdown(label=t("pitch_method"), choices=["mangio-crepe", "rmvpe+", "fcpe"], value="rmvpe+", interactive=True, filterable=False) |
|
|
hop_length = gr.Slider(minimum=2, maximum=512, step=1, value=128, label=t("hop_length"), interactive=True, visible=False) |
|
|
with gr.Row(): |
|
|
pitch = gr.Slider(minimum=-48, maximum=48, step=12, value=0, label=t("pitch"), interactive=True) |
|
|
with gr.Row(): |
|
|
f0_min = gr.Slider(minimum=50, maximum=3500, step=1, value=50, label=t("f0_min"), interactive=True) |
|
|
f0_max = gr.Slider(minimum=500, maximum=3500, step=1, value=1100, label=t("f0_max"), interactive=True) |
|
|
|
|
|
with gr.Column(variant="panel"): |
|
|
with gr.Group(): |
|
|
with gr.Row(equal_height=True): |
|
|
with gr.Column(scale=3): |
|
|
stereo_mode = gr.Dropdown( |
|
|
label=t("audio_processing"), |
|
|
choices=list(t("stereo_modes").keys()), |
|
|
value="mono", |
|
|
interactive=True, |
|
|
filterable=False |
|
|
) |
|
|
output_format = gr.Dropdown(label=t("output_format"), choices=OUTPUT_FORMAT) |
|
|
output_bitrate = gr.Slider(32, 320, step=1, label=t("bitrate"), value=320, interactive=True) |
|
|
with gr.Column(scale=6) as single_output_group: |
|
|
converted_voice = gr.Audio(label=t("converted_voice"), type="filepath", interactive=False, show_download_button=True, elem_classes="fixed-height") |
|
|
with gr.Column(scale=6, visible=False) as batch_output_group: |
|
|
converted_voices = gr.Files(label=t("converted_voices"), type="filepath", interactive=False, height="100%", elem_classes="fixed-height") |
|
|
convert_btn = gr.Button(t("convert_single"), variant="primary", scale=3) |
|
|
convert_batch_btn = gr.Button(t("convert_batch"), variant="primary", visible=False, scale=3) |
|
|
|
|
|
|
|
|
with gr.Column(): |
|
|
with gr.Tab(t("name_format")): |
|
|
template_info = gr.Markdown(t("name_format_info"), line_breaks=True) |
|
|
template = gr.Text(label=t("name_format"), value="NAME_MODEL_F0METHOD_PITCH", interactive=True) |
|
|
|
|
|
with gr.Tab(t("advanced_settings")): |
|
|
with gr.Row(): |
|
|
with gr.Column(scale=3): |
|
|
filter_radius = gr.Slider(minimum=0, maximum=7, step=1, value=3, label=t("filter_radius"), interactive=True) |
|
|
index_rate = gr.Slider(minimum=0, maximum=1, step=0.01, value=0, label=t("index_rate"), interactive=True) |
|
|
rms = gr.Slider(minimum=0, maximum=1, step=0.01, value=0.25, label=t("rms"), interactive=True) |
|
|
protect = gr.Slider(minimum=0, maximum=0.5, step=0.01, value=0.33, label=t("protect"), interactive=True) |
|
|
|
|
|
|
|
|
with gr.TabItem(t("model_manager")): |
|
|
with gr.TabItem(t("download_url")): |
|
|
with gr.Row(): |
|
|
with gr.Column(variant="panel"): |
|
|
gr.HTML(f"<center><h3>{t('download_link')}</h3></center>") |
|
|
model_zip_link = gr.Text(label=t("download_link")) |
|
|
with gr.Group(): |
|
|
zip_model_name = gr.Text( |
|
|
label=t("model_name"), |
|
|
info=t("unique_name"), |
|
|
) |
|
|
download_btn = gr.Button(t("download_button"), variant="primary") |
|
|
|
|
|
gr.HTML( |
|
|
f"<h3>{t('supported_sites')}: " |
|
|
"<a href='https://huggingface.co/' target='_blank'>HuggingFace</a>, " |
|
|
"<a href='https://pixeldrain.com/' target='_blank'>Pixeldrain</a>, " |
|
|
"<a href='https://drive.google.com/' target='_blank'>Google Drive</a>, " |
|
|
"<a href='https://disk.yandex.ru/' target='_blank'>Яндекс Диск</a>" |
|
|
"</h3>" |
|
|
) |
|
|
|
|
|
dl_output_message = gr.Text(label=t("output_message"), interactive=False) |
|
|
download_btn.click( |
|
|
download_from_url, |
|
|
inputs=[model_zip_link, zip_model_name], |
|
|
outputs=dl_output_message, |
|
|
) |
|
|
|
|
|
with gr.Tab(t("download_zip")): |
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
zip_file = gr.File( |
|
|
label=t("zip_file"), file_types=[".zip"], file_count="single" |
|
|
) |
|
|
with gr.Column(variant="panel"): |
|
|
gr.HTML(t("upload_steps")) |
|
|
with gr.Group(): |
|
|
local_model_name = gr.Text( |
|
|
label=t("model_name"), |
|
|
info=t("unique_name"), |
|
|
) |
|
|
model_upload_button = gr.Button(t("download_button"), variant="primary") |
|
|
|
|
|
local_upload_output_message = gr.Text(label=t("output_message"), interactive=False) |
|
|
model_upload_button.click( |
|
|
upload_zip_file, |
|
|
inputs=[zip_file, local_model_name], |
|
|
outputs=local_upload_output_message, |
|
|
) |
|
|
|
|
|
with gr.TabItem(t("download_files")): |
|
|
with gr.Group(): |
|
|
with gr.Row(): |
|
|
pth_file = gr.File( |
|
|
label=t("pth_file"), file_types=[".pth"], file_count="single" |
|
|
) |
|
|
index_file = gr.File( |
|
|
label=t("index_file"), file_types=[".index"], file_count="single" |
|
|
) |
|
|
with gr.Column(variant="panel"): |
|
|
with gr.Group(): |
|
|
separate_model_name = gr.Text( |
|
|
label=t("model_name"), |
|
|
info=t("unique_name"), |
|
|
) |
|
|
separate_upload_button = gr.Button(t("download_button"), variant="primary") |
|
|
|
|
|
separate_upload_output_message = gr.Text( |
|
|
label=t("output_message"), interactive=False |
|
|
) |
|
|
separate_upload_button.click( |
|
|
upload_separate_files, |
|
|
inputs=[pth_file, index_file, separate_model_name], |
|
|
outputs=separate_upload_output_message, |
|
|
) |
|
|
|
|
|
with gr.TabItem(t("delete_model")): |
|
|
with gr.Column(variant="panel"): |
|
|
with gr.Group(): |
|
|
delete_voicemodel_name = gr.Dropdown( |
|
|
label=t("model_name"), |
|
|
info=t("delete_info"), |
|
|
                    choices=get_voice_models(),
|
|
interactive=True, |
|
|
filterable=False |
|
|
) |
|
|
refresh_delete_btn = gr.Button(t("refresh_button")) |
|
|
                refresh_delete_btn.click(fn=(lambda: gr.update(choices=get_voice_models())), inputs=None, outputs=delete_voicemodel_name)
|
|
delete_model_output_message = gr.Text( |
|
|
label=t("output_message"), interactive=False |
|
|
) |
|
|
delete_model_btn = gr.Button(t("delete_button")) |
|
|
delete_model_btn.click( |
|
|
fn=delete_model_name, |
|
|
inputs=delete_voicemodel_name, |
|
|
outputs=delete_model_output_message |
|
|
) |
|
|
|
|
|
|
|
|
    method_pitch.change(fn=lambda x: gr.update(visible=(x == "mangio-crepe")), inputs=method_pitch, outputs=hop_length)
|
|
batch_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[single_voice_file, batch_voice_file, single_output_group, batch_output_group, convert_btn, convert_batch_btn]) |
|
|
single_upload_btn.click(fn=(lambda : (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=True))), inputs=None, outputs=[batch_voice_file, single_voice_file, batch_output_group, single_output_group, convert_batch_btn, convert_btn]) |
|
|
convert_btn.click(fn=process_audio, inputs=[input_voice_path, gr.State(None), template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voice) |
|
|
convert_batch_btn.click(fn=process_audio, inputs=[gr.State(None), input_voice_path, template, model_name, index_rate, output_format, output_bitrate, stereo_mode, method_pitch, pitch, hop_length, filter_radius, rms, protect, f0_min, f0_max], outputs=converted_voices) |