| import os
|
| import gc
|
| from gradio_helper import GradioHelper, tz, dw_file, easy_check_is_colab, str2bool, all_ids, set_device, zerogpu_available, hf_spaces_gpu
|
| import torch
|
| import ast
|
| from torch import nn
|
| import torch.nn.functional as F
|
| import torchcrepe
|
| import faiss
|
| import librosa
|
| import math
|
| import numpy as np
|
| from scipy import signal
|
| import argparse
|
| from functools import lru_cache
|
| import pyworld
|
| import parselmouth
|
| import string
|
| from transformers import HubertModel
|
| from typing import Tuple, Any, Dict, List, Optional, Union, Callable
|
| import sys
|
| import json
|
| import yaml
|
| import shutil
|
| from tqdm import tqdm
|
| import urllib.request
|
| import gdown
|
| import requests
|
| import zipfile
|
| import tempfile
|
| import secrets
|
| import gradio as gr
|
| import subprocess
|
| from datetime import datetime, timezone, timedelta
|
| from functools import wraps
|
| from pathlib import Path
|
|
|
| from separator import get_files_from_list
|
| from audio import check, read, write, output_formats, split_mid_side, split_channels, easy_resampler, stereo_to_mono, mono_to_stereo, convert_to_dtype, gain, add_zero_to_end, multi_channel_array_from_arrays, trim, fit_arrays
|
| from namer import Namer
|
| from i18n import _i18n, CURRENT_LANGUAGE, set_language
|
|
|
# Absolute directory of this script; used to resolve bundled resource paths.
script_dir: str = os.path.dirname(os.path.abspath(__file__))

# High-pass Butterworth filter coefficients used to strip DC offset /
# low-frequency rumble from 16 kHz audio before F0 extraction.
FILTER_ORDER: int = 5
CUTOFF_FREQUENCY: int = 48  # Hz; high-pass cutoff
SAMPLE_RATE: int = 16000  # Hz; working sample rate of the VC pipeline
bh, ah = signal.butter(
    N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE
)
|
|
|
| from multiprocessing import cpu_count
|
| from vbach_lib.fairseq import load_model_ensemble_and_task, load_checkpoint_to_cpu
|
| from vbach_lib.algorithm.synthesizers import Synthesizer
|
| from vbach_lib.predictors.FCPE import FCPEF0Predictor
|
| from vbach_lib.predictors.RMVPE import RMVPE0Predictor
|
| from vbach_lib.predictors.HPA_RMVPE import HPA_RMVPE
|
|
|
# Base segment length (seconds) for the alternative chunked pipeline;
# overridable via the VBACH_ALTPL_BASE_SEG environment variable.
VBACH_ALT_PIPELINE_TIME_CHUNK: int = int(os.environ.get("VBACH_ALTPL_BASE_SEG", "10"))
|
|
|
|
|
def format_end_count_models(count: int) -> str:
    """
    Return the suffix for the word "model" appropriate for *count*.

    For Russian, applies the standard plural rules for "модель"
    ("модель"/"модели"/"моделей"); for every other language returns the
    English plural "s" (empty for exactly one).

    Args:
        count: Number of models.

    Returns:
        The word ending (suffix) to append.
    """
    if CURRENT_LANGUAGE != "ru":
        return "" if count == 1 else "s"

    tens, units = count % 100, count % 10
    if units == 1 and tens != 11:
        return "ь"
    if 2 <= units <= 4 and not 10 <= tens < 20:
        return "и"
    return "ей"
|
|
|
|
|
class UserDirectory:
    """Holds the current user data directory and (re)creates it on demand."""

    def __init__(self) -> None:
        # Empty until change_dir() is called for the first time.
        self.path: str = ""

    def change_dir(self, directory: str) -> None:
        """
        Switch the user directory to *directory*, creating it if missing.

        Args:
            directory: Filesystem path to use as the user directory.
        """
        self.path = directory
        os.makedirs(directory, exist_ok=True)
|
|
|
|
|
# Global user-directory singleton shared by the model manager below.
user_directory: UserDirectory = UserDirectory()
# True when running inside Google Colab (detected by gradio_helper).
IS_COLAB: bool = easy_check_is_colab()
|
|
|
# On Colab, look for a mounted Google Drive (FUSE mount) and, if found,
# relocate the user directory into it so downloaded models survive sessions.
if IS_COLAB:
    print(_i18n("msg_colab_detected"))
    # `mount` lists active mounts; Google Drive appears as type "fuse.drive".
    result = subprocess.run(['/bin/mount'], capture_output=True, text=True)

    for line in result.stdout.strip().split('\n'):
        if 'type fuse.drive' in line:
            # Mount lines look like: "<source> on <mount_point> type fuse.drive (...)".
            parts = line.split(' type ')
            if len(parts) >= 2:
                source_mount = parts[0]
                source, mount_point = source_mount.split(' on ')
                user_directory.change_dir(os.path.join(mount_point, "MyDrive", "mvsepless-data-gdrive"))
                # change_dir() already creates the directory; this is a safeguard.
                os.makedirs(user_directory.path, exist_ok=True)
                print(_i18n("msg_gdrive_mounted", path=mount_point))
                break  # use the first Drive mount found
|
|
|
|
|
def generate_secure_random(length: int = 10) -> str:
    """
    Build a cryptographically secure random alphanumeric string.

    Args:
        length: Desired length of the string.

    Returns:
        A random string drawn from ASCII letters and digits.
    """
    alphabet = string.ascii_letters + string.digits
    chosen = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(chosen)
|
|
|
|
|
class VbachModelManager:
    """Manages Vbach voice models: resource downloads, registry, install/remove."""

    def __init__(self, user_directory: UserDirectory) -> None:
        """
        Set up resource paths, fetch required predictors and load the
        voice-model registry.

        Args:
            user_directory: Directory object holding the user data path.
        """
        self.user_directory: UserDirectory = user_directory

        predictors_dir = os.path.join(script_dir, "vbach_lib", "predictors")
        self.rmvpe_path: str = os.path.join(predictors_dir, "rmvpe.pt")
        self.hpa_rmvpe_path: str = os.path.join(predictors_dir, "hpa_rmvpe.pt")
        self.fcpe_path: str = os.path.join(predictors_dir, "fcpe.pt")

        huberts_root = os.path.join(script_dir, "vbach_lib", "huberts")
        self.custom_fairseq_huberts_dir: str = os.path.join(huberts_root, "fairseq")
        self.custom_transformers_huberts_dir: str = os.path.join(
            huberts_root, "transformers"
        )

        resources_base = "https://huggingface.co/noblebarkrr/vbach_resources/resolve/main"

        # Fairseq-stack HuBERT checkpoints: a single .pt file per embedder.
        fairseq_names = (
            "hubert_base",
            "contentvec_base",
            "korean_hubert_base",
            "chinese_hubert_base",
            "portuguese_hubert_base",
            "japanese_hubert_base",
        )
        self.huberts_fairseq_dict: Dict[str, Dict[str, str]] = {
            name: {
                "url": f"{resources_base}/fairseq/{name}.pt?download=true",
                "local_path": os.path.join(
                    self.custom_fairseq_huberts_dir, f"{name}.pt"
                ),
            }
            for name in fairseq_names
        }

        # Transformers-stack HuBERT models: weights + config per embedder.
        # Maps the public embedder name to its on-disk folder name.
        transformers_folders = {
            "contentvec": "contentvec",
            "spin": "spin",
            "spin-v2": "spinv2",
            "chinese-hubert-base": "chinese_hubert_base",
            "japanese-hubert-base": "japanese_hubert_base",
            "korean-hubert-base": "korean_hubert_base",
        }
        self.huberts_transformers_dict: Dict[str, Dict[str, str]] = {}
        for name, folder in transformers_folders.items():
            base_dir = os.path.join(self.custom_transformers_huberts_dir, folder)
            self.huberts_transformers_dict[name] = {
                "base_dir": base_dir,
                "url_bin": f"{resources_base}/transformers/{folder}/pytorch_model.bin?download=true",
                "url_json": f"{resources_base}/transformers/{folder}/config.json?download=true",
                "local_bin": os.path.join(base_dir, "pytorch_model.bin"),
                "local_json": os.path.join(base_dir, "config.json"),
            }

        # Mandatory F0 predictor checkpoints: (url, destination) pairs.
        self.requirements: List[List[str]] = [
            [f"{resources_base}/predictors/rmvpe.pt?download=true", self.rmvpe_path],
            [f"{resources_base}/predictors/hpa_rmvpe.pt?download=true", self.hpa_rmvpe_path],
            [f"{resources_base}/predictors/fcpe.pt?download=true", self.fcpe_path],
        ]

        self.voicemodels_dir: str = os.path.join(user_directory.path, "vbach_models_cache")
        os.makedirs(self.voicemodels_dir, exist_ok=True)

        # JSON registry mapping model name -> {"pth": ..., "index": ...}.
        self.voicemodels_info: str = os.path.join(self.voicemodels_dir, "vbach_models.json")
        self.voicemodels: Dict[str, Dict[str, Optional[str]]] = {}

        self.download_requirements()
        self.check_hubert("hubert_base")
        self.check_and_load()
|
|
|
| def check_hubert(self, embedder_name: str) -> Optional[str]:
|
| """
|
| Проверить наличие Hubert модели и скачать при необходимости
|
|
|
| Args:
|
| embedder_name: Имя эмбеддера
|
|
|
| Returns:
|
| Путь к модели или None
|
| """
|
| if embedder_name in self.huberts_fairseq_dict:
|
| if not os.path.exists(
|
| self.huberts_fairseq_dict[embedder_name]["local_path"]
|
| ):
|
| dw_file(
|
| self.huberts_fairseq_dict[embedder_name]["url"],
|
| self.huberts_fairseq_dict[embedder_name]["local_path"],
|
| )
|
| return self.huberts_fairseq_dict[embedder_name]["local_path"]
|
| else:
|
| return None
|
|
|
| def check_hubert_transformers(self, embedder_name: str) -> Optional[str]:
|
| """
|
| Проверить наличие Hubert модели transformers и скачать при необходимости
|
|
|
| Args:
|
| embedder_name: Имя эмбеддера
|
|
|
| Returns:
|
| Путь к директории модели или None
|
| """
|
| if embedder_name in self.huberts_transformers_dict:
|
| os.makedirs(
|
| self.huberts_transformers_dict[embedder_name]["base_dir"], exist_ok=True
|
| )
|
| if not os.path.exists(
|
| self.huberts_transformers_dict[embedder_name]["local_bin"]
|
| ) and not os.path.exists(
|
| self.huberts_transformers_dict[embedder_name]["local_json"]
|
| ):
|
| dw_file(
|
| self.huberts_transformers_dict[embedder_name]["url_bin"],
|
| self.huberts_transformers_dict[embedder_name]["local_bin"],
|
| )
|
| dw_file(
|
| self.huberts_transformers_dict[embedder_name]["url_json"],
|
| self.huberts_transformers_dict[embedder_name]["local_json"],
|
| )
|
| return self.huberts_transformers_dict[embedder_name]["base_dir"]
|
| else:
|
| return None
|
|
|
| def write_voicemodels_info(self) -> None:
|
| """Записать информацию о голосовых моделях в файл"""
|
| with open(self.voicemodels_info, "w", encoding='utf-8') as f:
|
| json.dump(self.voicemodels, f, indent=4, ensure_ascii=False)
|
|
|
| def load_voicemodels_info(self) -> Dict[str, Dict[str, Optional[str]]]:
|
| """
|
| Загрузить информацию о голосовых моделях из файла
|
|
|
| Returns:
|
| Словарь с информацией о моделях
|
| """
|
| with open(self.voicemodels_info, "r", encoding='utf-8') as f:
|
| return json.load(f)
|
|
|
| def add_voice_model(
|
| self,
|
| name: str,
|
| pth_path: Optional[str],
|
| index_path: Optional[str],
|
| ) -> None:
|
| """
|
| Добавить голосовую модель
|
|
|
| Args:
|
| name: Имя модели
|
| pth_path: Путь к PTH файлу
|
| index_path: Путь к индексному файлу
|
| """
|
| self.voicemodels[name] = {"pth": pth_path, "index": index_path}
|
| self.write_voicemodels_info()
|
|
|
| def del_voice_model(self, name: str) -> str:
|
| """
|
| Удалить голосовую модель
|
|
|
| Args:
|
| name: Имя модели
|
|
|
| Returns:
|
| Сообщение о результате
|
| """
|
| if name in self.parse_voice_models():
|
| pth: Optional[str] = self.voicemodels[name].get("pth", None)
|
| index: Optional[str] = self.voicemodels[name].get("index", None)
|
|
|
| if index and os.path.exists(index):
|
| os.remove(index)
|
| if pth and os.path.exists(pth):
|
| os.remove(pth)
|
|
|
| del self.voicemodels[name]
|
| self.write_voicemodels_info()
|
| return _i18n("model_deleted", model=name)
|
| else:
|
| return _i18n("model_not_found", model=name)
|
|
|
| def parse_voice_models(self) -> List[str]:
|
| """
|
| Получить список голосовых моделей
|
|
|
| Returns:
|
| Список имен моделей
|
| """
|
| return list(self.voicemodels.keys())
|
|
|
| def parse_pth_and_index(self, name: str) -> Tuple[Optional[str], Optional[str]]:
|
| """
|
| Получить пути к PTH и индексному файлу модели
|
|
|
| Args:
|
| name: Имя модели
|
|
|
| Returns:
|
| Кортеж (путь к PTH, путь к индексу)
|
| """
|
| pth: Optional[str] = self.voicemodels[name].get("pth", None)
|
| index: Optional[str] = self.voicemodels[name].get("index", None)
|
| return pth, index
|
|
|
| def check_and_load(self) -> None:
|
| """Проверить и загрузить информацию о моделях"""
|
| if os.path.exists(self.voicemodels_info):
|
| self.voicemodels = self.load_voicemodels_info()
|
| else:
|
| self.write_voicemodels_info()
|
|
|
| def clear_voicemodels_info(self) -> None:
|
| """Очистить информацию о голосовых моделях"""
|
| self.voicemodels = {}
|
| self.write_voicemodels_info()
|
|
|
| def download_requirements(self) -> None:
|
| """Скачать необходимые компоненты"""
|
| for url, file in self.requirements:
|
| if not os.path.exists(file):
|
| dw_file(url, file)
|
|
|
| def download_voice_model_file(self, url: str, zip_name: str) -> None:
|
| """
|
| Скачать файл голосовой модели
|
|
|
| Args:
|
| url: URL для скачивания
|
| zip_name: Имя ZIP файла
|
| """
|
| try:
|
| if "drive.google.com" in url:
|
| self.download_from_google_drive(url, zip_name)
|
| elif "pixeldrain.com" in url:
|
| self.download_from_pixeldrain(url, zip_name)
|
| elif "disk.yandex.ru" in url or "yadi.sk" in url:
|
| self.download_from_yandex(url, zip_name)
|
| else:
|
| dw_file(url, zip_name)
|
| except Exception as e:
|
| print(f"{_i18n('download_error')}: {e}")
|
|
|
| def download_from_google_drive(self, url: str, zip_name: str) -> None:
|
| """
|
| Скачать с Google Drive
|
|
|
| Args:
|
| url: URL файла
|
| zip_name: Имя для сохранения
|
| """
|
| file_id: str = (
|
| url.split("file/d/")[1].split("/")[0]
|
| if "file/d/" in url
|
| else url.split("id=")[1].split("&")[0]
|
| )
|
| gdown.download(id=file_id, output=str(zip_name), quiet=False)
|
|
|
| def download_from_pixeldrain(self, url: str, zip_name: str) -> None:
|
| """
|
| Скачать с Pixeldrain
|
|
|
| Args:
|
| url: URL файла
|
| zip_name: Имя для сохранения
|
| """
|
| file_id: str = url.split("pixeldrain.com/u/")[1]
|
| response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
|
| with open(zip_name, "wb") as f:
|
| f.write(response.content)
|
|
|
| def download_from_yandex(self, url: str, zip_name: str) -> None:
|
| """
|
| Скачать с Yandex Disk
|
|
|
| Args:
|
| url: URL файла
|
| zip_name: Имя для сохранения
|
| """
|
| yandex_public_key: str = f"download?public_key={url}"
|
| yandex_api_url: str = (
|
| f"https://cloud-api.yandex.net/v1/disk/public/resources/{yandex_public_key}"
|
| )
|
| response = requests.get(yandex_api_url)
|
| if response.status_code == 200:
|
| download_link: str = response.json().get("href", "")
|
| urllib.request.urlretrieve(download_link, zip_name)
|
| else:
|
| print(f"{_i18n('yandex_error')}: {response.status_code}")
|
|
|
    def extract_zip(self, zip_name: str, model_name: str) -> str:
        """
        Extract a model ZIP archive and register the model(s) found inside.

        Args:
            zip_name: Path to the ZIP file (removed after extraction).
            model_name: Base name for the model(s).

        Returns:
            Localized message listing added models, or an error message.
        """
        # Unique directory per install so repeated installs never collide.
        model_dir: str = os.path.join(
            self.voicemodels_dir, f"{model_name}_{generate_secure_random(17)}"
        )
        os.makedirs(model_dir, exist_ok=True)

        try:
            with zipfile.ZipFile(zip_name, "r") as zip_ref:
                zip_ref.extractall(model_dir)
            os.remove(zip_name)

            added_voice_models: List[str] = []

            index_filepath: Optional[str] = None
            model_filepaths: List[str] = []

            # Scan the extracted tree: keep .index files over 100 KiB and
            # .pth files over 20 MiB (size thresholds filter out junk files).
            # Only the last qualifying .index found is kept.
            for root, _c, files in os.walk(model_dir):
                for name in files:
                    file_path: str = os.path.join(root, name)
                    if (
                        name.endswith(".index")
                        and os.stat(file_path).st_size > 1024 * 100
                    ):
                        index_filepath = file_path
                    if (
                        name.endswith(".pth")
                        and os.stat(file_path).st_size > 1024 * 1024 * 20
                    ):
                        model_filepaths.append(file_path)

            if len(model_filepaths) == 1:
                self.add_voice_model(model_name, model_filepaths[0], index_filepath)
                added_voice_models.append(model_name)
            else:
                # Several .pth files: register each under a numbered name,
                # all sharing the single index file found (if any).
                for i, pth in enumerate(model_filepaths):
                    self.add_voice_model(f"{model_name}_{i + 1}", pth, index_filepath)
                    added_voice_models.append(f"{model_name}_{i + 1}")

            list_models_str: str = "\n".join(added_voice_models)
            return f"{_i18n('models_added')}:\n{list_models_str}"

        except Exception as e:
            return f"{_i18n('model_load_error')}: {e}"
|
|
|
| def install_model_zip(self, zip_source: str, model_name: str, mode: str = "url") -> str:
|
| """
|
| Установить модель из ZIP архива
|
|
|
| Args:
|
| zip_source: Путь к ZIP или URL
|
| model_name: Имя модели
|
| mode: Режим ("url" или "local")
|
|
|
| Returns:
|
| Сообщение о результате
|
| """
|
| if model_name in self.parse_voice_models():
|
| print(_i18n("model_overwrite_warning"))
|
|
|
| if mode == "url":
|
| with tempfile.TemporaryDirectory(
|
| prefix="vbach_temp_model", ignore_cleanup_errors=True
|
| ) as tmp:
|
| zip_path: str = os.path.join(tmp, "model.zip")
|
| self.download_voice_model_file(zip_source, zip_path)
|
| status: str = self.extract_zip(zip_path, model_name)
|
| elif mode == "local":
|
| status = self.extract_zip(zip_source, model_name)
|
| else:
|
| status = _i18n("invalid_mode")
|
|
|
| return status
|
|
|
| def install_model_files(
|
| self,
|
| index: Optional[str],
|
| pth: Optional[str],
|
| model_name: str,
|
| mode: str = "url"
|
| ) -> str:
|
| """
|
| Установить модель из отдельных файлов
|
|
|
| Args:
|
| index: Путь к индексному файлу или URL
|
| pth: Путь к PTH файлу или URL
|
| model_name: Имя модели
|
| mode: Режим ("url" или "local")
|
|
|
| Returns:
|
| Сообщение о результате
|
| """
|
| if model_name in self.parse_voice_models():
|
| print(_i18n("model_overwrite_warning"))
|
|
|
| model_dir: str = os.path.join(
|
| self.voicemodels_dir, f"{model_name}_{generate_secure_random(17)}"
|
| )
|
| os.makedirs(model_dir, exist_ok=True)
|
|
|
| local_index_path: Optional[str] = None
|
| local_pth_path: Optional[str] = None
|
|
|
| try:
|
| if mode == "url":
|
| if index:
|
| local_index_path = os.path.join(model_dir, "model.index")
|
| self.download_voice_model_file(index, local_index_path)
|
| if pth:
|
| local_pth_path = os.path.join(model_dir, "model.pth")
|
| self.download_voice_model_file(pth, local_pth_path)
|
|
|
| elif mode == "local":
|
| if index and os.path.exists(index):
|
| local_index_path = os.path.join(
|
| model_dir, os.path.basename(index)
|
| )
|
| shutil.copy(index, local_index_path)
|
| if pth and os.path.exists(pth):
|
| local_pth_path = os.path.join(model_dir, os.path.basename(pth))
|
| shutil.copy(pth, local_pth_path)
|
| else:
|
| return _i18n("invalid_mode")
|
|
|
| self.add_voice_model(model_name, local_pth_path, local_index_path)
|
| return _i18n("model_added", model=model_name)
|
|
|
| except Exception as e:
|
| return f"{_i18n('model_load_error')}: {e}"
|
|
|
    def get_list_installed_models(self) -> None:
        """
        Print an ASCII table of installed models, separator.py-style.

        Shows each model's .pth and .index file sizes ("None" when a file is
        absent), followed by a localized total count.
        """
        models: List[str] = self.parse_voice_models()

        if not models:
            print(_i18n("no_models_installed"))
            return

        f_key: str = _i18n("model_name")
        s_key: str = _i18n("model_files")

        # Name column adapts to the longest model name; files column is fixed.
        name_width = max(len(f_key), max(len(model) for model in models)) + 2
        files_width = 60

        print("|-", "-" * name_width, "-+-", "-" * files_width, "-|", sep="")
        print(f"| {f_key:<{name_width}} | {s_key:<{files_width}} |")
        print("|-", "-" * name_width, "-+-", "-" * files_width, "-|", sep="")

        for model in models:
            pth, index = self.parse_pth_and_index(model)

            files_info = []
            if pth:
                # A registered but missing .pth is reported as 0.0 MB.
                pth_size = os.path.getsize(pth) if os.path.exists(pth) else 0
                pth_size_mb = pth_size / (1024 * 1024)
                files_info.append(f"PTH: {pth_size_mb:.1f} MB")
            else:
                files_info.append("PTH: None")

            if index and os.path.exists(index):
                idx_size = os.path.getsize(index)
                idx_size_mb = idx_size / (1024 * 1024)
                files_info.append(f"INDEX: {idx_size_mb:.1f} MB")
            else:
                files_info.append("INDEX: None")

            files_str = " | ".join(files_info)

            # Truncate with an ellipsis so long info never breaks the table.
            if len(files_str) > files_width:
                files_str = files_str[:files_width-3] + "..."

            print(f"| {model:<{name_width}} | {files_str:<{files_width}} |")
        print("|-", "-" * name_width, "-+-", "-" * files_width, "-|", sep="")

        print(_i18n("installed_models_count", count=len(models), end=format_end_count_models(len(models))))
|
|
|
|
|
# Singletons: the model manager (its constructor downloads required
# predictor checkpoints) and the output-file namer.
model_manager: VbachModelManager = VbachModelManager(user_directory)
namer: Namer = Namer()

# F0 extraction methods exposed to the caller, in display order.
f0_methods: Tuple[str, ...] = (
    "rmvpe+",
    "hpa-rmvpe",
    "fcpe",
    "mangio-crepe",
    "mangio-crepe-tiny",
    "harvest",
    "pm",
    "pyin",
)

# Checkpoint paths for the pitch predictors (resolved by the manager).
HPA_RMVPE_DIR: str = model_manager.hpa_rmvpe_path
RMVPE_DIR: str = model_manager.rmvpe_path
FCPE_DIR: str = model_manager.fcpe_path

# Scratch map: audio path -> waveform, consumed by the cached Harvest F0
# extractor (get_harvest_f0) below.
input_audio_path2wav: Dict[str, np.ndarray] = {}
|
|
|
|
|
class HubertModelWithFinalProj(HubertModel):
    """HuBERT model extended with a final linear projection layer.

    NOTE(review): presumably matches checkpoints whose state dict contains a
    ``final_proj`` weight on top of the base encoder — verify against the
    weights actually loaded into this class.
    """

    def __init__(self, config):
        super().__init__(config)
        # Projects hidden states to the classifier/embedding size.
        self.final_proj = nn.Linear(config.hidden_size, config.classifier_proj_size)
|
|
|
|
|
@lru_cache(maxsize=128)
def _cached_harvest_f0(
    input_audio_path: str,
    fs: int,
    f0max: float,
    f0min: float,
    frame_period: float
) -> np.ndarray:
    """Cached Harvest+StoneMask F0 computation (see get_harvest_f0).

    NOTE: the cache key is (path, parameters) only; the waveform itself comes
    from the module-level ``input_audio_path2wav`` map, so callers must keep
    that map consistent with the path they pass.
    """
    audio: np.ndarray = input_audio_path2wav[input_audio_path]
    f0, t = pyworld.harvest(
        audio,
        fs=fs,
        f0_ceil=f0max,
        f0_floor=f0min,
        frame_period=frame_period,
    )
    # StoneMask refines the coarse Harvest estimate.
    return pyworld.stonemask(audio, f0, t, fs)


def get_harvest_f0(
    input_audio_path: str,
    fs: int,
    f0max: float,
    f0min: float,
    frame_period: float
) -> np.ndarray:
    """
    Compute F0 with WORLD's Harvest algorithm (StoneMask-refined), cached
    per (audio path, parameters).

    Args:
        input_audio_path: Key into ``input_audio_path2wav`` for the waveform.
        fs: Sample rate.
        f0max: Maximum F0 (Hz).
        f0min: Minimum F0 (Hz).
        frame_period: Frame period in milliseconds.

    Returns:
        F0 contour as a fresh array.  A copy is returned because callers
        shift the pitch in place (``f0 *= ...``), which would otherwise
        mutate the cached array and corrupt every later cache hit.
    """
    return _cached_harvest_f0(input_audio_path, fs, f0max, f0min, frame_period).copy()
|
|
|
|
|
class AudioProcessor:
    """Audio post-processing utilities."""

    @staticmethod
    def change_rms(
        sourceaudio: np.ndarray,
        source_rate: int,
        targetaudio: np.ndarray,
        target_rate: int,
        rate: float
    ) -> np.ndarray:
        """
        Blend the target audio's loudness envelope toward the source's.

        With rate=1 the scaling factor is identically 1 (target keeps its own
        RMS); with rate=0 the target is fully rescaled by source/target RMS.

        Args:
            sourceaudio: Reference audio.
            source_rate: Sample rate of the reference audio.
            targetaudio: Audio whose loudness is adjusted.
            target_rate: Sample rate of the target audio.
            rate: Blend factor.

        Returns:
            Loudness-adjusted copy of the target audio.
        """
        def _envelope(wave: np.ndarray, sr: int) -> torch.Tensor:
            # Half-second RMS frames, upsampled to one value per target sample.
            env = librosa.feature.rms(
                y=wave, frame_length=sr // 2 * 2, hop_length=sr // 2
            )
            return F.interpolate(
                torch.from_numpy(env).float().unsqueeze(0),
                size=targetaudio.shape[0],
                mode="linear",
            ).squeeze()

        rms1 = _envelope(sourceaudio, source_rate)
        rms2 = _envelope(targetaudio, target_rate)
        # Floor the target envelope so the negative power below cannot blow up.
        rms2 = torch.maximum(rms2, torch.zeros_like(rms2) + 1e-6)

        scale = (torch.pow(rms1, 1 - rate) * torch.pow(rms2, rate - 1)).numpy()
        adjustedaudio: np.ndarray = targetaudio * scale
        return adjustedaudio
|
|
|
|
|
class VC:
    """Voice-conversion pipeline."""

    def __init__(self, tgt_sr: int, config: Any, stack: str = "fairseq") -> None:
        """
        Initialize pipeline timing/padding parameters.

        Args:
            tgt_sr: Target output sample rate.
            config: Runtime config providing x_pad/x_query/x_center/x_max,
                is_half and device.
            stack: Embedder stack, "fairseq" or "transformers"; selects the
                matching conversion routine.
        """
        sr = 16000   # fixed model-side sample rate
        hop = 160    # feature hop size in samples (10 ms at 16 kHz)

        self.x_pad: int = config.x_pad
        self.x_query: int = config.x_query
        self.x_center: int = config.x_center
        self.x_max: int = config.x_max
        self.is_half: bool = config.is_half
        self.sample_rate: int = sr
        self.window: int = hop
        # Padding/segmentation sizes expressed in samples.
        self.t_pad: int = sr * self.x_pad
        self.t_pad_tgt: int = tgt_sr * self.x_pad
        self.t_pad2: int = self.t_pad * 2
        self.t_query: int = sr * self.x_query
        self.t_center: int = sr * self.x_center
        self.t_max: int = sr * self.x_max
        self.time_step: float = hop / sr * 1000  # ms per feature frame
        self.device: torch.device = config.device
        # Pick the conversion routine matching the embedder stack.
        self.vc: Callable = self._vc_transformers if stack == "transformers" else self._vc
|
|
|
    def get_f0_mangio_crepe(
        self,
        x: np.ndarray,
        f0_min: int,
        f0_max: int,
        p_len: int,
        hop_length: int,
        model: str = "full"
    ) -> np.ndarray:
        """
        Extract F0 with torchcrepe (Mangio-Crepe variant).

        Args:
            x: Audio samples at self.sample_rate.
            f0_min: Minimum F0 (Hz).
            f0_max: Maximum F0 (Hz).
            p_len: Desired number of output frames (falsy -> derived from x).
            hop_length: Analysis hop length in samples.
            model: Crepe model size, "full" or "tiny".

        Returns:
            F0 contour resampled to p_len frames (unvoiced frames are 0).
        """
        x = x.astype(np.float32)
        # Normalize by the 99.9th percentile to tame outliers/clipping.
        x /= np.quantile(np.abs(x), 0.999)
        audio = torch.from_numpy(x).to(self.device, copy=True).unsqueeze(0)
        if audio.ndim == 2 and audio.shape[0] > 1:
            # Downmix multi-channel input to mono.
            audio = torch.mean(audio, dim=0, keepdim=True)

        pitch = torchcrepe.predict(
            audio,
            self.sample_rate,
            hop_length,
            f0_min,
            f0_max,
            model,
            batch_size=hop_length * 2,
            device=self.device,
            pad=True,
        )

        p_len = p_len or x.shape[0] // hop_length
        source = np.array(pitch.squeeze(0).cpu().float().numpy())
        # Mark near-zero (unvoiced) frames as NaN before resampling to p_len;
        # nan_to_num afterwards turns them back into 0.
        source[source < 0.001] = np.nan
        target = np.interp(
            np.arange(0, len(source) * p_len, len(source)) / p_len,
            np.arange(0, len(source)),
            source,
        )
        f0 = np.nan_to_num(target)
        return f0
|
|
|
| def get_f0_rmvpe(
|
| self,
|
| x: np.ndarray,
|
| f0_min: int = 1,
|
| f0_max: int = 40000,
|
| *args,
|
| **kwargs
|
| ) -> np.ndarray:
|
| """
|
| Получить F0 с помощью RMVPE
|
|
|
| Args:
|
| x: Аудиоданные
|
| f0_min: Минимальная частота F0
|
| f0_max: Максимальная частота F0
|
|
|
| Returns:
|
| Массив F0
|
| """
|
| if not hasattr(self, "model_rmvpe"):
|
| self.model_rmvpe = RMVPE0Predictor(
|
| RMVPE_DIR, is_half=self.is_half, device=self.device
|
| )
|
| f0 = self.model_rmvpe.infer_from_audio_with_pitch(
|
| x, thred=0.03, f0_min=f0_min, f0_max=f0_max
|
| )
|
| return f0
|
|
|
| def get_f0_hpa_rmvpe(
|
| self,
|
| x: np.ndarray,
|
| f0_min: int = 1,
|
| f0_max: int = 40000,
|
| *args,
|
| **kwargs
|
| ) -> np.ndarray:
|
| """
|
| Получить F0 с помощью HPA-RMVPE
|
|
|
| Args:
|
| x: Аудиоданные
|
| f0_min: Минимальная частота F0
|
| f0_max: Максимальная частота F0
|
|
|
| Returns:
|
| Массив F0
|
| """
|
| if not hasattr(self, "model_hpa_rmvpe"):
|
| self.model_hpa_rmvpe = HPA_RMVPE(
|
| HPA_RMVPE_DIR, device=self.device, hpa=True
|
| )
|
| f0 = self.model_hpa_rmvpe.infer_from_audio_with_pitch(
|
| x, thred=0.03, f0_min=f0_min, f0_max=f0_max
|
| )
|
| return f0
|
|
|
| def get_f0_fcpe(
|
| self,
|
| x: np.ndarray,
|
| f0_min: int = 50,
|
| f0_max: int = 1100,
|
| p_len: Optional[int] = None
|
| ) -> np.ndarray:
|
| """
|
| Получить F0 с помощью FCPE
|
|
|
| Args:
|
| x: Аудиоданные
|
| f0_min: Минимальная частота F0
|
| f0_max: Максимальная частота F0
|
| p_len: Длина
|
|
|
| Returns:
|
| Массив F0
|
| """
|
| self.model_fcpe = FCPEF0Predictor(
|
| FCPE_DIR,
|
| f0_min=int(f0_min),
|
| f0_max=int(f0_max),
|
| dtype=torch.float32,
|
| device=self.device,
|
| sample_rate=self.sample_rate,
|
| threshold=0.03,
|
| )
|
| f0 = self.model_fcpe.compute_f0(x, p_len=p_len or len(x) // self.window)
|
| del self.model_fcpe
|
| gc.collect()
|
| return f0
|
|
|
| def get_f0_librosa(
|
| self,
|
| x: np.ndarray,
|
| p_len: int,
|
| f0_min: int = 50,
|
| f0_max: int = 1100,
|
| hop_length: int = 160
|
| ) -> np.ndarray:
|
| """
|
| Получить F0 с помощью Librosa
|
|
|
| Args:
|
| x: Аудиоданные
|
| p_len: Длина
|
| f0_min: Минимальная частота F0
|
| f0_max: Максимальная частота F0
|
| hop_length: Длина шага
|
|
|
| Returns:
|
| Массив F0
|
| """
|
| f0, *_ = librosa.pyin(
|
| x.astype(np.float32),
|
| sr=self.sample_rate,
|
| fmin=f0_min,
|
| fmax=f0_max,
|
| hop_length=hop_length,
|
| )
|
| return self._resize_f0(f0, p_len)
|
|
|
| def _resize_f0(self, x: np.ndarray, target_len: int) -> np.ndarray:
|
| """
|
| Изменить размер массива F0
|
|
|
| Args:
|
| x: Исходный массив F0
|
| target_len: Целевая длина
|
|
|
| Returns:
|
| Измененный массив F0
|
| """
|
| source = np.array(x)
|
| source[source < 0.001] = np.nan
|
|
|
| output_f0 = np.nan_to_num(
|
| np.interp(
|
| np.arange(0, len(source) * target_len, len(source)) / target_len,
|
| np.arange(0, len(source)),
|
| source,
|
| )
|
| )
|
| return output_f0.astype(np.float32)
|
|
|
    def get_f0(
        self,
        inputaudio_path: str,
        x: np.ndarray,
        p_len: int,
        pitch: float,
        f0_method: str,
        filter_radius: int,
        hop_length: int,
        inp_f0: Optional[np.ndarray] = None,
        f0_min: int = 50,
        f0_max: int = 1100,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Extract F0 with the selected method.

        Args:
            inputaudio_path: Path to the audio file (used by "harvest" as a cache key)
            x: Audio samples
            p_len: Target number of F0 frames
            pitch: Pitch shift in semitones (applied as 2 ** (pitch / 12))
            f0_method: F0 extraction method name
            filter_radius: Median-filter radius (only used by "harvest")
            hop_length: Hop length for hop-based extractors
            inp_f0: Optional user-supplied F0 curve overriding a time region
            f0_min: Minimum F0 frequency, Hz
            f0_max: Maximum F0 frequency, Hz

        Returns:
            Tuple (f0_coarse, f0bak): coarse F0 quantized to integer mel
            bins in 1..255, and the unquantized Hz curve

        Raises:
            ValueError: If f0_method is not recognized
        """
        global input_audio_path2wav
        time_step: float = self.window / self.sample_rate * 1000
        # Mel-scale bounds used below to quantize F0 into 1..255 bins.
        f0_mel_min: float = 1127 * np.log(1 + f0_min / 700)
        f0_mel_max: float = 1127 * np.log(1 + f0_max / 700)

        if f0_method in ["mangio-crepe", "mangio-crepe-tiny"]:
            f0 = self.get_f0_mangio_crepe(
                x,
                f0_min,
                f0_max,
                p_len,
                int(hop_length),
                "tiny" if f0_method == "mangio-crepe-tiny" else "full",
            )
        elif f0_method == "pyin":
            f0 = self.get_f0_librosa(x, p_len, f0_min, f0_max, hop_length)
        elif f0_method == "fcpe":
            f0 = self.get_f0_fcpe(x, f0_min, f0_max, p_len)
        elif f0_method == "harvest":
            # get_harvest_f0 reads the audio from this module-level cache,
            # keyed by the input path.
            input_audio_path2wav = {}
            input_audio_path2wav[inputaudio_path] = x.astype(np.double)
            f0 = get_harvest_f0(inputaudio_path, self.sample_rate, f0_max, f0_min, 10)
            if filter_radius > 2:
                f0 = signal.medfilt(f0, 3)
        elif f0_method == "pm":
            f0 = (
                parselmouth.Sound(x, self.sample_rate)
                .to_pitch_ac(
                    time_step=time_step / 1000,
                    voicing_threshold=0.6,
                    pitch_floor=f0_min,
                    pitch_ceiling=f0_max,
                )
                .selected_array["frequency"]
            )
            # Praat may return fewer frames than p_len; pad symmetrically.
            pad_size: int = (p_len - len(f0) + 1) // 2
            if pad_size > 0 or p_len - len(f0) - pad_size > 0:
                f0 = np.pad(
                    f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant"
                )
        elif f0_method == "rmvpe+":
            f0 = self.get_f0_rmvpe(x=x, f0_min=f0_min, f0_max=f0_max)
        elif f0_method == "hpa-rmvpe":
            f0 = self.get_f0_hpa_rmvpe(x=x, f0_min=f0_min, f0_max=f0_max)
        else:
            raise ValueError(_i18n("unknown_f0_method", method=f0_method))

        # Shift by the requested number of semitones.
        f0 *= pow(2, pitch / 12)
        tf0: int = self.sample_rate // self.window  # F0 frames per second

        # Splice in a user-provided F0 curve over the time range it covers.
        if inp_f0 is not None:
            delta_t: int = np.round(
                (inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
            ).astype("int16")
            replace_f0 = np.interp(
                list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1]
            )
            shape: int = f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)].shape[0]
            f0[self.x_pad * tf0 : self.x_pad * tf0 + len(replace_f0)] = replace_f0[
                :shape
            ]

        f0bak: np.ndarray = f0.copy()
        # Quantize to mel-scaled integer bins in [1, 255]; 1 means unvoiced.
        f0_mel: np.ndarray = 1127 * np.log(1 + f0 / 700)
        f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (
            f0_mel_max - f0_mel_min
        ) + 1
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        f0_coarse: np.ndarray = np.rint(f0_mel).astype(int)

        return f0_coarse, f0bak
|
|
|
    def _vc(
        self,
        model: nn.Module,
        net_g: nn.Module,
        sid: torch.Tensor,
        audio0: np.ndarray,
        pitch: Optional[torch.Tensor],
        pitchf: Optional[torch.Tensor],
        index: Optional[faiss.Index],
        big_npy: Optional[np.ndarray],
        index_rate: float,
        version: str,
        protect: float,
    ) -> np.ndarray:
        """
        Core voice-conversion step (fairseq Hubert backend).

        Args:
            model: Hubert content encoder
            net_g: Generator (synthesizer)
            sid: Speaker id tensor
            audio0: Input audio samples
            pitch: Coarse pitch tensor (None disables F0 guidance)
            pitchf: Fine pitch tensor (None disables F0 guidance)
            index: FAISS index with speaker embeddings (or None)
            big_npy: Embedding matrix backing the index (or None)
            index_rate: Blend factor for retrieved embeddings
            version: Model version ("v1" or "v2")
            protect: Consonant-protection strength (< 0.5 enables blending)

        Returns:
            Converted audio as a float32 numpy array
        """
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        feats = torch.from_numpy(audio0)
        feats = feats.half() if self.is_half else feats.float()

        # Collapse stereo to mono by averaging channels.
        if feats.dim() == 2:
            feats = feats.mean(-1)

        assert feats.dim() == 1, feats.dim()
        feats = feats.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)

        inputs: Dict[str, Any] = {
            "source": feats.to(self.device),
            "padding_mask": padding_mask,
            # v1 models consume layer-9 features; v2 consume layer-12.
            "output_layer": 9 if version == "v1" else 12,
        }

        with torch.no_grad(), torch.cuda.amp.autocast(enabled=self.is_half):
            logits = model.extract_features(**inputs)
            feats = model.final_proj(logits[0]) if version == "v1" else logits[0]

        # Keep a pristine copy for the consonant-protection blend below.
        if protect < 0.5 and pitch is not None and pitchf is not None:
            feats0 = feats.clone()

        # Blend in nearest-neighbour speaker embeddings from the FAISS index.
        if index is not None and big_npy is not None and index_rate != 0:
            npy = feats[0].cpu().numpy()
            npy = npy.astype("float32") if self.is_half else npy
            score, ix = index.search(npy, k=8)
            # Inverse-square-distance weights, normalized per frame.
            weight = np.square(1 / score)
            weight /= weight.sum(axis=1, keepdims=True)
            npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
            npy = npy.astype("float16") if self.is_half else npy
            feats = (
                torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate
                + (1 - index_rate) * feats
            )

        # Double the temporal resolution to match the synthesizer frame rate.
        feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(
            0, 2, 1
        )
        if protect < 0.5 and pitch is not None and pitchf is not None:
            feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
                0, 2, 1
            )

        p_len: int = audio0.shape[0] // self.window
        if feats.shape[1] < p_len:
            p_len = feats.shape[1]
            if pitch is not None and pitchf is not None:
                pitch = pitch[:, :p_len]
                pitchf = pitchf[:, :p_len]

        # Consonant protection: where pitchf < 1 (unvoiced frames) keep a
        # `protect`-weighted share of the original features.
        if protect < 0.5 and pitch is not None and pitchf is not None:
            pitchff = pitchf.clone()
            pitchff[pitchf > 0] = 1
            pitchff[pitchf < 1] = protect
            pitchff = pitchff.unsqueeze(-1)
            feats = feats * pitchff + feats0 * (1 - pitchff)
            feats = feats.to(feats0.dtype)

        p_len_tensor = torch.tensor([p_len], device=self.device).long()

        if pitch is not None and pitchf is not None:
            audio1 = (
                (net_g.infer(feats, p_len_tensor, pitch, pitchf, sid)[0][0, 0])
                .data.cpu()
                .float()
                .numpy()
            )
        else:
            audio1 = (
                (net_g.infer(feats, p_len_tensor, sid)[0][0, 0])
                .data.cpu()
                .float()
                .numpy()
            )

        del feats, p_len_tensor, padding_mask
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return audio1
|
|
|
    def _vc_transformers(
        self,
        model: nn.Module,
        net_g: nn.Module,
        sid: torch.Tensor,
        audio0: np.ndarray,
        pitch: Optional[torch.Tensor],
        pitchf: Optional[torch.Tensor],
        index: Optional[faiss.Index],
        big_npy: Optional[np.ndarray],
        index_rate: float,
        version: str,
        protect: float,
    ) -> np.ndarray:
        """
        Core voice-conversion step (transformers HubertModel backend).

        Args:
            model: Hubert content encoder (transformers)
            net_g: Generator (synthesizer)
            sid: Speaker id tensor
            audio0: Input audio samples
            pitch: Coarse pitch tensor (None disables F0 guidance)
            pitchf: Fine pitch tensor (None disables F0 guidance)
            index: FAISS index with speaker embeddings (or None)
            big_npy: Embedding matrix backing the index (or None)
            index_rate: Blend factor for retrieved embeddings
            version: Model version ("v1" or "v2")
            protect: Consonant-protection strength (< 0.5 enables blending)

        Returns:
            Converted audio as a float32 numpy array
        """
        with torch.no_grad():
            pitch_guidance: bool = pitch is not None and pitchf is not None
            feats = torch.from_numpy(audio0).float()
            # Collapse stereo to mono by averaging channels.
            feats = feats.mean(-1) if feats.dim() == 2 else feats
            assert feats.dim() == 1, feats.dim()
            feats = feats.view(1, -1).to(self.device)
            feats = model(feats)["last_hidden_state"]
            # v1 models need the final projection applied explicitly.
            feats = (
                model.final_proj(feats[0]).unsqueeze(0) if version == "v1" else feats
            )
            # Pristine copy for the consonant-protection blend below.
            feats0 = feats.clone() if pitch_guidance else None

            if index is not None and big_npy is not None and index_rate != 0:
                feats = self._retrieve_speaker_embeddings(feats, index, big_npy, index_rate)

            # Double the temporal resolution to match the synthesizer frame rate.
            feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(
                0, 2, 1
            )
            p_len: int = min(audio0.shape[0] // self.window, feats.shape[1])

            if pitch_guidance:
                feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
                    0, 2, 1
                )
                if pitch is not None and pitchf is not None:
                    pitch = pitch[:, :p_len]
                    pitchf = pitchf[:, :p_len]

                # Consonant protection: keep `protect`-weighted original
                # features on unvoiced frames (pitchf < 1).
                if protect < 0.5:
                    pitchff = pitchf.clone()
                    pitchff[pitchf > 0] = 1
                    pitchff[pitchf < 1] = protect
                    feats = feats * pitchff.unsqueeze(-1) + feats0 * (
                        1 - pitchff.unsqueeze(-1)
                    )
                    feats = feats.to(feats0.dtype)
            else:
                pitch, pitchf = None, None

            p_len_tensor = torch.tensor([p_len], device=self.device).long()
            audio1 = (
                (net_g.infer(feats.float(), p_len_tensor, pitch, pitchf.float() if pitchf is not None else None, sid)[0][0, 0])
                .data.cpu()
                .float()
                .numpy()
            )

            del feats, feats0, p_len_tensor
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        return audio1
|
|
|
    def pipeline(
        self,
        model: nn.Module,
        net_g: nn.Module,
        sid: int,
        audio: np.ndarray,
        inputaudio_path: str,
        pitch: float,
        f0_method: str,
        file_index: Optional[str],
        index_rate: float,
        pitch_guidance: bool,
        filter_radius: int,
        tgt_sr: int,
        resample_sr: int,
        volume_envelope: float,
        version: str,
        protect: float,
        hop_length: int,
        f0_file: Optional[Any],
        f0_min: int = 50,
        f0_max: int = 1100,
        add_text: str = ""
    ) -> np.ndarray:
        """
        Main processing pipeline (original): split long audio at low-energy
        points, convert each segment, and concatenate the results.

        Args:
            model: Hubert model
            net_g: Generator (synthesizer)
            sid: Speaker id
            audio: Audio samples
            inputaudio_path: Path to the audio file
            pitch: Pitch shift in semitones
            f0_method: F0 extraction method
            file_index: Path to the FAISS index file
            index_rate: Index blend factor
            pitch_guidance: Whether to use F0 guidance
            filter_radius: Median-filter radius for "harvest"
            tgt_sr: Target sample rate
            resample_sr: Resample rate (applied when >= model sample rate)
            volume_envelope: RMS envelope mixing factor
            version: Model version
            protect: Consonant protection strength
            hop_length: Hop length
            f0_file: Optional file with an F0 curve ("time,freq" lines)
            f0_min: Minimum F0 frequency
            f0_max: Maximum F0 frequency
            add_text: Extra text appended to progress messages

        Returns:
            Converted audio as an int16 numpy array
        """
        # Load the FAISS retrieval index, if requested and present.
        if (
            file_index is not None
            and file_index != ""
            and os.path.exists(file_index)
            and index_rate != 0
        ):
            try:
                index = faiss.read_index(file_index)
                big_npy = index.reconstruct_n(0, index.ntotal)
            except Exception as e:
                print(f"{_i18n('faiss_error')}: {e}")
                index = big_npy = None
        else:
            index = big_npy = None

        # High-pass filter (module-level 48 Hz Butterworth).
        audio = signal.filtfilt(bh, ah, audio)
        audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
        opt_ts: List[int] = []

        # For long inputs, choose cut points near local energy minima so
        # segment seams fall into quiet regions.
        if audio_pad.shape[0] > self.t_max:
            audio_sum = np.zeros_like(audio)
            for i in range(self.window):
                audio_sum += audio_pad[i : i - self.window]
            for t in range(self.t_center, audio.shape[0], self.t_center):
                opt_ts.append(
                    t
                    - self.t_query
                    + np.where(
                        np.abs(audio_sum[t - self.t_query : t + self.t_query])
                        == np.abs(audio_sum[t - self.t_query : t + self.t_query]).min()
                    )[0][0]
                )

        s: int = 0
        audio_opt: List[np.ndarray] = []
        t: Optional[int] = None
        audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
        p_len: int = audio_pad.shape[0] // self.window
        inp_f0: Optional[np.ndarray] = None

        # Optional user-supplied F0 curve: one "time,frequency" pair per line.
        if f0_file and hasattr(f0_file, "name"):
            try:
                with open(f0_file.name, "r") as f:
                    lines = f.read().strip("\n").split("\n")
                inp_f0 = np.array(
                    [[float(i) for i in line.split(",")] for line in lines],
                    dtype="float32",
                )
            except Exception as e:
                print(f"{_i18n('f0_file_error')}: {e}")

        sid_tensor = torch.tensor(sid, device=self.device).unsqueeze(0).long()

        progress = gr.Progress()
        progress((2, 4), desc=f"{_i18n('calculating_f0')} {add_text}")

        if pitch_guidance:
            pitch_coarse, pitchf = self.get_f0(
                inputaudio_path,
                audio_pad,
                p_len,
                pitch,
                f0_method,
                filter_radius,
                hop_length,
                inp_f0,
                f0_min,
                f0_max,
            )
            pitch_coarse = pitch_coarse[:p_len]
            pitchf = pitchf[:p_len]
            # MPS does not support float64 tensors.
            if self.device.type == "mps":
                pitchf = pitchf.astype(np.float32)
            pitch_tensor = torch.tensor(pitch_coarse, device=self.device).unsqueeze(0).long()
            pitchf_tensor = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()
        else:
            pitch_tensor = pitchf_tensor = None

        total_ts: int = len(opt_ts)

        # Convert each segment up to its cut point. self.vc is the
        # backend-specific converter (presumably chosen from `stack`
        # in __init__ — confirm there).
        for i, t in enumerate(opt_ts, start=1):
            progress((i, total_ts), desc=f"{_i18n('voice_synthesis')} {add_text}", unit=_i18n("chunks"))
            print(f"\r{_i18n('voice_synthesis')} {int((i / total_ts) * 100)}% {add_text}", end="")
            # Align the cut point to a window boundary.
            t = t // self.window * self.window

            if pitch_guidance:
                audio_opt.append(
                    self.vc(
                        model,
                        net_g,
                        sid_tensor,
                        audio_pad[s : t + self.t_pad2 + self.window],
                        pitch_tensor[:, s // self.window : (t + self.t_pad2) // self.window] if pitch_tensor is not None else None,
                        pitchf_tensor[:, s // self.window : (t + self.t_pad2) // self.window] if pitchf_tensor is not None else None,
                        index,
                        big_npy,
                        index_rate,
                        version,
                        protect,
                    )[self.t_pad_tgt : -self.t_pad_tgt]
                )
            else:
                audio_opt.append(
                    self.vc(
                        model,
                        net_g,
                        sid_tensor,
                        audio_pad[s : t + self.t_pad2 + self.window],
                        None,
                        None,
                        index,
                        big_npy,
                        index_rate,
                        version,
                        protect,
                    )[self.t_pad_tgt : -self.t_pad_tgt]
                )
            s = t

        # Final (or only) segment: everything after the last cut point.
        if pitch_guidance:
            progress(1, desc=f"{_i18n('voice_synthesis_final')} {add_text}")
            print(f"\r{_i18n('voice_synthesis')} 100% {add_text}", end="")
            audio_opt.append(
                self.vc(
                    model,
                    net_g,
                    sid_tensor,
                    audio_pad[t:] if t is not None else audio_pad,
                    pitch_tensor[:, t // self.window :] if (pitch_tensor is not None and t is not None) else pitch_tensor,
                    pitchf_tensor[:, t // self.window :] if (pitchf_tensor is not None and t is not None) else pitchf_tensor,
                    index,
                    big_npy,
                    index_rate,
                    version,
                    protect,
                )[self.t_pad_tgt : -self.t_pad_tgt]
            )
        else:
            progress(1, desc=f"{_i18n('voice_synthesis_final')} {add_text}")
            print(f"\r{_i18n('voice_synthesis')} 100% {add_text}", end="")
            audio_opt.append(
                self.vc(
                    model,
                    net_g,
                    sid_tensor,
                    audio_pad[t:] if t is not None else audio_pad,
                    None,
                    None,
                    index,
                    big_npy,
                    index_rate,
                    version,
                    protect,
                )[self.t_pad_tgt : -self.t_pad_tgt]
            )

        print("")
        audio_opt_array = np.concatenate(audio_opt)

        # Optionally mix the output RMS envelope with the input's.
        if volume_envelope != 1:
            audio_opt_array = AudioProcessor.change_rms(
                audio, self.sample_rate, audio_opt_array, tgt_sr, volume_envelope
            )

        if resample_sr >= self.sample_rate and tgt_sr != resample_sr:
            audio_opt_array = librosa.resample(
                audio_opt_array, orig_sr=tgt_sr, target_sr=resample_sr
            )

        # Normalize (only if clipping) and convert to int16.
        audio_max = np.abs(audio_opt_array).max() / 0.99
        max_int16 = 32768
        if audio_max > 1:
            max_int16 /= audio_max
        audio_opt_array = (audio_opt_array * max_int16).astype(np.int16)

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return audio_opt_array
|
|
|
    def pipeline2(
        self,
        model: nn.Module,
        net_g: nn.Module,
        sid: int,
        audio: np.ndarray,
        inputaudio_path: str,
        pitch: float,
        f0_method: str,
        file_index: Optional[str],
        index_rate: float,
        pitch_guidance: bool,
        filter_radius: int,
        tgt_sr: int,
        resample_sr: int,
        volume_envelope: float,
        version: str,
        protect: float,
        hop_length: int,
        f0_file: Optional[Any],
        f0_min: int = 50,
        f0_max: int = 1100,
        add_text: str = ""
    ) -> np.ndarray:
        """
        Alternative processing pipeline: fixed-size memory-bounded chunks
        with overlapping linear crossfades instead of energy-minimum cuts.

        Args:
            model: Hubert model
            net_g: Generator (synthesizer)
            sid: Speaker id
            audio: Audio samples
            inputaudio_path: Path to the audio file
            pitch: Pitch shift in semitones
            f0_method: F0 extraction method
            file_index: Path to the FAISS index file
            index_rate: Index blend factor
            pitch_guidance: Whether to use F0 guidance
            filter_radius: Median-filter radius for "harvest"
            tgt_sr: Target sample rate
            resample_sr: Resample rate (applied when >= model sample rate)
            volume_envelope: RMS envelope mixing factor
            version: Model version
            protect: Consonant protection strength
            hop_length: Hop length
            f0_file: Optional file with an F0 curve ("time,freq" lines)
            f0_min: Minimum F0 frequency
            f0_max: Maximum F0 frequency
            add_text: Extra text appended to progress messages

        Returns:
            Converted audio as an int16 numpy array

        Raises:
            ValueError: If the computed chunk size is not positive
            RuntimeError: If no chunks were produced
        """
        device = self.device
        # High-pass filter (module-level 48 Hz Butterworth).
        audio = signal.filtfilt(bh, ah, audio)
        audio_len = len(audio)

        # Load the FAISS retrieval index, if requested and present.
        if (
            file_index
            and file_index != ""
            and os.path.exists(file_index)
            and index_rate != 0
        ):
            try:
                index = faiss.read_index(file_index)
                big_npy = index.reconstruct_n(0, index.ntotal)
            except Exception as e:
                print(f"{_i18n('faiss_error')}: {e}")
                index = big_npy = None
        else:
            index = big_npy = None

        inp_f0 = None
        # Optional user-supplied F0 curve: one "time,frequency" pair per line.
        if f0_file and hasattr(f0_file, "name"):
            try:
                with open(f0_file.name, "r") as f:
                    lines = f.read().strip("\n").split("\n")
                inp_f0 = np.array(
                    [[float(i) for i in line.split(",")] for line in lines],
                    dtype="float32",
                )
            except Exception as e:
                print(f"{_i18n('f0_file_error')}: {e}")

        sid_tensor = torch.tensor(sid, device=device).unsqueeze(0).long()

        raw_chunk_size = self.get_max_memory_chunk(audio_len, model, net_g, version)
        # 1/12.5 s = 80 ms of overlap for the crossfade.
        # NOTE(review): computed from tgt_sr but also used to pad/slice the
        # model-rate input below — confirm the unit mismatch is intended.
        offset = int(tgt_sr // 12.5)
        real_chunk_size = raw_chunk_size
        if real_chunk_size <= 0:
            raise ValueError(_i18n("chunk_size_error"))

        print(f"{_i18n('chunk_size')}: {real_chunk_size} | {int(real_chunk_size / self.sample_rate)} {_i18n('seconds')}")

        audio_pad = np.pad(audio, (offset, offset), mode="reflect")

        progress = gr.Progress()
        progress((2, 4), desc=f"{_i18n('calculating_f0')} {add_text}")

        pitch_tensor: Optional[torch.Tensor] = None
        pitchf_tensor: Optional[torch.Tensor] = None

        if pitch_guidance:
            p_len = len(audio_pad) // self.window
            pitch_coarse, pitchf = self.get_f0(
                inputaudio_path,
                audio_pad,
                p_len,
                pitch,
                f0_method,
                filter_radius,
                hop_length,
                inp_f0,
                f0_min,
                f0_max,
            )
            pitch_coarse = pitch_coarse[:p_len]
            pitchf = pitchf[:p_len]
            # MPS does not support float64 tensors.
            if device.type == "mps":
                pitchf = pitchf.astype(np.float32)
            pitch_tensor = torch.tensor(pitch_coarse, device=device).unsqueeze(0).long()
            pitchf_tensor = torch.tensor(pitchf, device=device).unsqueeze(0).float()

        processed_chunks: List[Tuple[int, int, np.ndarray, int, int]] = []
        start = 0

        # Pre-count the chunks so progress can be reported as i/total.
        chunk_count: int = 0
        temp_start = 0
        while temp_start < audio_len:
            temp_end = min(temp_start + real_chunk_size, audio_len)
            chunk_count += 1
            temp_start = temp_end

        current_chunk = 0

        while start < audio_len:
            current_chunk += 1
            progress(
                (current_chunk, chunk_count),
                desc=f"{_i18n('voice_synthesis_alt')} {add_text}", unit=_i18n("chunks")
            )
            print(f"\r{_i18n('voice_synthesis_alt')} {int((current_chunk / chunk_count) * 100)}% {add_text}", end="")

            end = min(start + real_chunk_size, audio_len)

            # Extend interior chunk edges by `offset` samples of context so
            # neighbouring chunks overlap and can be crossfaded.
            need_left = start > 0
            need_right = end < audio_len
            pad_left = offset if need_left else 0
            pad_right = offset if need_right else 0

            chunk_start_in_pad = start - pad_left
            chunk_end_in_pad = end + pad_right

            # +offset compensates for the reflective padding of audio_pad.
            chunk_audio = audio_pad[
                chunk_start_in_pad + offset : chunk_end_in_pad + offset
            ]

            f0_start = (chunk_start_in_pad + offset) // self.window
            f0_end = (chunk_end_in_pad + offset) // self.window

            if pitch_guidance and pitch_tensor is not None and pitchf_tensor is not None:
                out = self.vc(
                    model,
                    net_g,
                    sid_tensor,
                    chunk_audio,
                    pitch_tensor[:, f0_start:f0_end],
                    pitchf_tensor[:, f0_start:f0_end],
                    index,
                    big_npy,
                    index_rate,
                    version,
                    protect,
                )
            else:
                out = self.vc(
                    model,
                    net_g,
                    sid_tensor,
                    chunk_audio,
                    None,
                    None,
                    index,
                    big_npy,
                    index_rate,
                    version,
                    protect,
                )

            # Map the chunk's input position into output-sample coordinates
            # (assumes self.vc emits audio at tgt_sr — TODO confirm).
            output_start = int(round((chunk_start_in_pad) / self.sample_rate * tgt_sr))
            output_end = output_start + len(out)

            processed_chunks.append(
                (output_start, output_end, out, pad_left, pad_right)
            )

            start = end

        if not processed_chunks:
            raise RuntimeError(_i18n("no_chunks_error"))

        # Only the second tuple element (end) is used; the repeated "_c"
        # targets are legal Python and deliberately discarded.
        max_output_end = max(end for _c, end, _c, _c, _c in processed_chunks)
        output = np.zeros(max_output_end, dtype=np.float32)
        weight = np.zeros(max_output_end, dtype=np.float32)

        # Overlap-add with linear crossfades over the padded regions.
        for start_idx, end_idx, chunk, pad_left, pad_right in processed_chunks:
            chunk_len = len(chunk)
            if chunk_len != (end_idx - start_idx):
                end_idx = start_idx + chunk_len

            w = np.ones(chunk_len, dtype=np.float32)
            fade_len = int(round(offset / self.sample_rate * tgt_sr))

            if pad_left > 0 and fade_len > 0:
                actual_fade = min(fade_len, chunk_len)
                w[:actual_fade] = np.linspace(0, 1, actual_fade)
            if pad_right > 0 and fade_len > 0:
                actual_fade = min(fade_len, chunk_len)
                w[-actual_fade:] = np.linspace(1, 0, actual_fade)

            output_end = min(end_idx, len(output))
            chunk = chunk[: output_end - start_idx]
            w = w[: output_end - start_idx]

            output[start_idx:output_end] += chunk * w
            weight[start_idx:output_end] += w

        # Normalize overlapped regions by their accumulated fade weight.
        mask = weight > 1e-8
        output[mask] /= weight[mask]

        expected_final_len = int(round(audio_len / self.sample_rate * tgt_sr))
        print("")
        audio_opt = output[:expected_final_len]

        # Optionally mix the output RMS envelope with the input's.
        if volume_envelope != 1:
            audio_opt = AudioProcessor.change_rms(
                audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope
            )
        if resample_sr >= self.sample_rate and tgt_sr != resample_sr:
            audio_opt = librosa.resample(
                audio_opt, orig_sr=tgt_sr, target_sr=resample_sr
            )

        # Normalize (only if clipping) and convert to int16.
        audio_max = np.abs(audio_opt).max() / 0.99
        max_int16 = 32768
        if audio_max > 1:
            max_int16 /= audio_max
        audio_opt = (audio_opt * max_int16).astype(np.int16)

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return audio_opt
|
|
|
| def get_max_memory_chunk(
|
| self, audio_length: int, model: nn.Module, net_g: nn.Module, version: str
|
| ) -> int:
|
| """
|
| Рассчитывает оптимальный размер чанка на основе доступной памяти
|
|
|
| Args:
|
| audio_length: Длина аудио
|
| model: Модель Hubert
|
| net_g: Генератор
|
| version: Версия модели
|
|
|
| Returns:
|
| Оптимальный размер чанка
|
| """
|
| base_chunk_size = min(
|
| self.sample_rate * VBACH_ALT_PIPELINE_TIME_CHUNK,
|
| audio_length
|
| )
|
|
|
| if self.device.type == "cuda" and torch.cuda.is_available() and not str2bool(os.environ.get("VBACH_ALTPL_PREF_BASE_SEG", "False")):
|
| try:
|
| torch.cuda.synchronize()
|
| total_memory = torch.cuda.get_device_properties(0).total_memory
|
| allocated = torch.cuda.memory_allocated(0)
|
| free_memory = total_memory - allocated
|
|
|
| usable_memory = free_memory * 0.2
|
|
|
| print(
|
| f"{_i18n('vram_available')}: {free_memory/1024**3:.2f} GB, "
|
| f"{_i18n('using')}: {usable_memory/1024**3:.2f} GB"
|
| )
|
|
|
| memory_per_second = 100 * 1024 * 1024
|
|
|
| max_seconds = usable_memory / memory_per_second
|
| max_seconds = int(max_seconds)
|
| chunk_seconds = max(10.0, max_seconds)
|
| chunk_size = int(chunk_seconds * self.sample_rate)
|
|
|
| chunk_size = max(self.window, (chunk_size // self.window) * self.window)
|
|
|
| min_chunk_size = self.sample_rate * 2
|
| chunk_size = max(chunk_size, min_chunk_size)
|
|
|
| chunk_size = min(chunk_size, audio_length)
|
|
|
| return chunk_size
|
|
|
| except Exception as e:
|
| print(f"{_i18n('chunk_calc_error')}: {e}")
|
|
|
| return min(base_chunk_size, audio_length)
|
|
|
| def _retrieve_speaker_embeddings(
|
| self,
|
| feats: torch.Tensor,
|
| index: faiss.Index,
|
| big_npy: np.ndarray,
|
| index_rate: float
|
| ) -> torch.Tensor:
|
| """
|
| Получить эмбеддинги спикера из индекса
|
|
|
| Args:
|
| feats: Эмбеддинги
|
| index: Индекс FAISS
|
| big_npy: Массив эмбеддингов
|
| index_rate: Коэффициент влияния индекса
|
|
|
| Returns:
|
| Обновленные эмбеддинги
|
| """
|
| npy = feats[0].cpu().numpy()
|
| score, ix = index.search(npy, k=8)
|
| weight = np.square(1 / score)
|
| weight /= weight.sum(axis=1, keepdims=True)
|
| npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
|
| feats = (
|
| torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate
|
| + (1 - index_rate) * feats
|
| )
|
| return feats
|
|
|
|
|
def loadaudio(
    file_path: str,
    target_sr: int,
    stereo_mode: str
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]]:
    """
    Load an audio file according to the requested stereo mode.

    Args:
        file_path: Path to the audio file
        target_sr: Target sample rate
        stereo_mode: "mono", "left/right" or "sim/dif"

    Returns:
        Tuple (mid, left, right); entries not produced by the chosen mode
        are None

    Raises:
        RuntimeError: If the file cannot be read or decoded
    """
    mid: Optional[np.ndarray] = None
    left: Optional[np.ndarray] = None
    right: Optional[np.ndarray] = None

    try:
        if stereo_mode == "mono":
            mid, sr = read(path=file_path, sr=target_sr, mono=True, flatten=True)
        else:
            stereoaudio, sr = read(path=file_path, sr=target_sr, mono=False)
            if stereo_mode == "left/right":
                # Plain channel split.
                left, right = split_channels(stereoaudio)
            elif stereo_mode == "sim/dif":
                # Mid/side decomposition: mono center plus stereo residue.
                center, stereo_base = split_mid_side(stereoaudio, var=3, sr=target_sr)
                mid = stereo_to_mono(center, to_flatten=True)
                left, right = split_channels(stereo_base)
    except Exception as e:
        raise RuntimeError(f"{_i18n('audio_load_error', file=file_path)}: {str(e)}")

    return mid, left, right
|
|
|
|
|
class Config:
    """Runtime configuration for voice conversion (device, precision, windows)."""

    def __init__(self, device_str: str) -> None:
        """
        Initialize the configuration.

        Args:
            device_str: Device specification, e.g. "cpu", "mps", "cuda", "cuda:0"
        """
        self.device_str: str = device_str
        self.device_ids: Optional[List[int]] = None
        self.set_device(self.device_str)
        self.is_half: bool = False
        self.n_cpu: int = cpu_count()
        self.gpu_name: Optional[str] = None
        self.gpu_mem: Optional[int] = None
        self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()

    def set_device(self, device_str: str) -> None:
        """
        Resolve and store the torch device for the given specification.

        Args:
            device_str: Device specification string
        """
        if "cuda" in device_str.lower():
            if ":" in device_str:
                device_spec = device_str.split(":")[1]
                # Renamed loop variable: "id" shadowed the builtin.
                self.device_ids = [
                    int(dev_id) for dev_id in device_spec.split(",") if dev_id.isdigit()
                ]
            else:
                self.device_ids = list(range(torch.cuda.device_count()))
            self.device = torch.device(
                "cuda" if not self.device_ids else f"cuda:{self.device_ids[0]}"
            )
        elif "mps" in device_str.lower():
            self.device_ids = None
            self.device = torch.device("mps")
        else:
            self.device_ids = None
            self.device = torch.device("cpu")

    def device_config(self) -> Tuple[int, int, int, int]:
        """
        Derive padding/query/window parameters for the selected device.

        Returns:
            Tuple (x_pad, x_query, x_center, x_max)
        """
        if self.device.type == "cuda":
            print(_i18n("using_cuda"))
            if self.device_ids:
                self.gpu_mem = self._configure_gpu(self.device_ids[0])
        elif self.device.type == "mps":
            print(_i18n("using_mps"))
        else:
            print(_i18n("using_cpu"))

        x_pad, x_query, x_center, x_max = (
            (3, 10, 60, 65) if self.is_half else (1, 6, 38, 41)
        )
        # Shrink the working windows on GPUs with 4 GB of VRAM or less.
        if self.gpu_mem is not None and self.gpu_mem <= 4:
            x_pad, x_query, x_center, x_max = (1, 5, 30, 32)

        return x_pad, x_query, x_center, x_max

    def _configure_gpu(self, device_id: int) -> int:
        """
        Inspect a GPU and disable half precision on known low-end models.

        Args:
            device_id: CUDA device index

        Returns:
            Total GPU memory in GB (rounded up by 0.4)
        """
        self.gpu_name = torch.cuda.get_device_name(f"cuda:{device_id}")
        # 16-series and Pascal cards have poor or missing fp16 support.
        low_end_gpus = ["16", "P40", "P10", "1060", "1070", "1080"]
        if (
            any(gpu in self.gpu_name for gpu in low_end_gpus)
            and "V100" not in self.gpu_name.upper()
        ):
            self.is_half = False
        # Fix: query the GPU identified by device_id (the method's contract)
        # rather than self.device, so name and memory refer to the same card.
        return int(
            torch.cuda.get_device_properties(f"cuda:{device_id}").total_memory
            / 1024
            / 1024
            / 1024
            + 0.4
        )
|
|
|
|
|
def load_hubert(
    device: torch.device,
    is_half: bool,
    model_path: str
) -> nn.Module:
    """
    Load a Hubert content-encoder checkpoint via fairseq.

    Args:
        device: Target device
        is_half: Whether to cast the model to half precision
        model_path: Path to the checkpoint file

    Returns:
        The Hubert model, on the requested device and in eval mode
    """
    models, _cfg, _task = load_model_ensemble_and_task([model_path], suffix="")
    hubert = models[0].to(device)
    if is_half:
        hubert = hubert.half()
    else:
        hubert = hubert.float()
    hubert.eval()
    return hubert
|
|
|
|
|
def get_vc(
    device: torch.device,
    is_half: bool,
    config: Any,
    model_path: str,
    stack: str
) -> Tuple[Dict[str, Any], str, nn.Module, int, VC, int]:
    """
    Load an RVC voice-conversion checkpoint and build its inference objects.

    Args:
        device: Target device
        is_half: Whether to use half precision
        config: Configuration object passed through to VC
        model_path: Path to the model checkpoint
        stack: Backend stack identifier passed through to VC

    Returns:
        Tuple (cpt, version, net_g, tgt_sr, vc, use_f0)

    Raises:
        FileNotFoundError: If the checkpoint file does not exist
        ValueError: If the checkpoint is corrupted or not in RVC format
        RuntimeError: For any other load failure
    """
    if not os.path.isfile(model_path):
        raise FileNotFoundError(f"{_i18n('model_not_found')}: {model_path}")

    try:
        # weights_only=True refuses arbitrary pickled objects in the file.
        cpt = torch.load(model_path, map_location="cpu", weights_only=True)

        required_keys = ["config", "weight"]
        missing_keys = [key for key in required_keys if key not in cpt]

        if missing_keys:
            raise ValueError(
                f"{_i18n('invalid_model_format', model=model_path)}. "
                f"{_i18n('missing_keys')}: {missing_keys}. "
                f"{_i18n('use_rvc_format')}"
            )

        tgt_sr = cpt["config"][-1]

        # The embedding table's row count is the true speaker count; the
        # stored config may disagree, so overwrite config slot -3 with it.
        emb_weight_shape = cpt["weight"]["emb_g.weight"].shape
        cpt["config"][-3] = emb_weight_shape[0]

        use_f0 = cpt.get("f0", 1)
        version = cpt.get("version", "v1")
        vocoder = cpt.get("vocoder", "HiFi-GAN")

        # v2 models use 768-dim Hubert features, v1 use 256.
        text_enc_hidden_dim = 768 if version == "v2" else 256

        print(f"{_i18n('loading_model')}: {os.path.basename(model_path)}")
        print(f"{_i18n('version')}: {version}, F0: {use_f0}, {_i18n('sample_rate')}: {tgt_sr}Hz")
        print(f"{_i18n('speaker_count')}: {emb_weight_shape[0]}")

        net_g = Synthesizer(
            *cpt["config"],
            use_f0=use_f0,
            text_enc_hidden_dim=text_enc_hidden_dim,
            vocoder=vocoder,
        )

        # enc_q is only needed for training; drop it to save memory.
        if hasattr(net_g, "enc_q"):
            del net_g.enc_q
        else:
            print(f"{_i18n('enc_q_warning')}")

        missing_keys, unexpected_keys = net_g.load_state_dict(
            cpt["weight"], strict=False
        )

        if missing_keys:
            print(f"{_i18n('missing_keys_warning')}: {missing_keys}")

        if unexpected_keys:
            print(f"{_i18n('unexpected_keys_warning')}: {unexpected_keys}")

        net_g.eval()

        net_g = net_g.to(device)
        if is_half:
            net_g = net_g.half()
            print(f"{_i18n('half_precision')}")
        else:
            net_g = net_g.float()
            print(f"{_i18n('full_precision')}")

        vc = VC(tgt_sr, config, stack)

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        print(f"{_i18n('model_loaded', device=str(device))}")

        return cpt, version, net_g, tgt_sr, vc, use_f0

    except torch.serialization.pickle.UnpicklingError as e:
        raise ValueError(
            f"{_i18n('corrupted_model')}: {model_path}"
        ) from e
    except Exception as e:
        raise RuntimeError(f"{_i18n('model_load_error')}: {str(e)}") from e
|
|
|
|
|
def rvc_infer(
    index_path: Optional[str],
    index_rate: float,
    input_path: str,
    output_path: str,
    pitch: float,
    f0_method: str,
    cpt: Dict[str, Any],
    version: str,
    net_g: nn.Module,
    filter_radius: int,
    tgt_sr: int,
    volume_envelope: float,
    protect: float,
    hop_length: int,
    vc: VC,
    hubert_model: nn.Module,
    pitch_guidance: bool,
    f0_min: int = 50,
    f0_max: int = 1100,
    format_output: str = "wav",
    output_bitrate: str = "320k",
    stereo_mode: str = "mono",
    pipeline_mode: str = "orig",
    add_text: str = ""
) -> str:
    """
    Run RVC inference on an audio file and write the converted result.

    The input is loaded at 16 kHz and split according to ``stereo_mode``:
    ``"mono"`` converts a single (mid) channel, ``"left/right"`` converts
    each stereo channel independently, and ``"sim/dif"`` converts the
    mid (similar) signal and both stereo-base channels, then recombines
    mid + difference.

    Args:
        index_path: Path to the FAISS index file (may be None/empty)
        index_rate: Index blend ratio
        input_path: Path to the input audio file
        output_path: Desired output file path (passed through ``namer.iter``)
        pitch: Pitch shift
        f0_method: F0 extraction method
        cpt: Model checkpoint dict (currently unused here; kept for API
            compatibility with callers)
        version: Model version
        net_g: Generator network
        filter_radius: Median filter radius
        tgt_sr: Target sample rate
        volume_envelope: Volume envelope mix ratio
        protect: Consonant protection
        hop_length: Hop length
        vc: VC object providing ``pipeline`` / ``pipeline2``
        hubert_model: Hubert feature extractor
        pitch_guidance: Whether the model uses F0 guidance
        f0_min: Minimum F0 frequency
        f0_max: Maximum F0 frequency
        format_output: Output format (currently unused — the format is
            presumably derived from ``output_path``'s extension by
            ``write``; TODO confirm)
        output_bitrate: Output bitrate
        stereo_mode: Stereo processing mode
        pipeline_mode: Pipeline selection ("alt" -> vc.pipeline2)
        add_text: Extra text appended to progress labels

    Returns:
        Path to the written output file.

    Raises:
        ValueError: if a required channel is missing or ``stereo_mode``
            is unknown, or the processed audio is empty.
    """
    pipeline = vc.pipeline2 if pipeline_mode == "alt" else vc.pipeline

    def _convert(channel: Any, channel_text: str) -> Any:
        # Single place for the 21-argument pipeline call; the four call
        # sites previously duplicated this verbatim and differed only in
        # the audio channel and the progress label.
        return pipeline(
            hubert_model,
            net_g,
            0,
            channel,
            input_path,
            pitch,
            f0_method,
            index_path,
            index_rate,
            pitch_guidance,
            filter_radius,
            tgt_sr,
            0,
            volume_envelope,
            version,
            protect,
            hop_length,
            f0_file=None,
            f0_min=f0_min,
            f0_max=f0_max,
            add_text=channel_text
        )

    mid, left, right = loadaudio(input_path, 16000, stereo_mode)

    if stereo_mode == "mono":
        if mid is None:
            raise ValueError(_i18n("mono_audio_none"))

        audio_opt = _convert(mid, add_text)

    elif stereo_mode == "left/right":
        if left is None or right is None:
            raise ValueError(_i18n("stereo_channels_none"))

        leftaudio_opt = _convert(left, f"{add_text} (L)")
        rightaudio_opt = _convert(right, f"{add_text} (R)")

        min_len = min(len(leftaudio_opt), len(rightaudio_opt))
        if min_len == 0:
            raise ValueError(_i18n("processed_audio_empty"))

        # dtype is captured before trimming so the stereo interleave keeps
        # the pipeline's native sample type.
        output_dtype = leftaudio_opt.dtype

        leftaudio_opt = trim(leftaudio_opt, 0, min_len)
        rightaudio_opt = trim(rightaudio_opt, 0, min_len)

        audio_opt = multi_channel_array_from_arrays(
            leftaudio_opt,
            rightaudio_opt,
            index=1,
            dtype=output_dtype
        )

    elif stereo_mode == "sim/dif":
        if mid is None or left is None or right is None:
            raise ValueError(_i18n("mid_side_channels_none"))

        midaudio_opt = _convert(mid, f"{add_text} {_i18n('center')}")
        leftaudio_opt = _convert(left, f"{add_text} {_i18n('stereo_base')} L")
        rightaudio_opt = _convert(right, f"{add_text} {_i18n('stereo_base')} R")

        # All three converted signals must share a common length before
        # the mid + difference reconstruction.
        min_len = min(len(midaudio_opt), len(leftaudio_opt), len(rightaudio_opt))
        if min_len == 0:
            raise ValueError(_i18n("processed_audio_empty"))

        output_dtype = leftaudio_opt.dtype
        midaudio_opt = trim(midaudio_opt, 0, min_len)
        leftaudio_opt = trim(leftaudio_opt, 0, min_len)
        rightaudio_opt = trim(rightaudio_opt, 0, min_len)
        difaudio_opt = multi_channel_array_from_arrays(
            leftaudio_opt,
            rightaudio_opt,
            index=1,
            dtype=output_dtype
        )
        audio_opt = convert_to_dtype(
            (mono_to_stereo(midaudio_opt, index=1) + difaudio_opt),
            output_dtype
        )
    else:
        raise ValueError(_i18n("unknown_stereo_mode"))

    output_path = write(
        namer.iter(output_path), audio_opt, tgt_sr, output_bitrate
    )
    return output_path
|
|
|
|
|
def load_rvc_model(voice_model: str) -> Tuple[str, Optional[str]]:
    """
    Resolve an RVC voice model name to its weight and index file paths.

    Args:
        voice_model: Name of the voice model

    Returns:
        Tuple of (path to the .pth weights, path to the index file or None)

    Raises:
        ValueError: if the model is unknown or its weight file is missing.
    """
    # Guard clause: unknown model name.
    if voice_model not in model_manager.parse_voice_models():
        raise ValueError(
            _i18n("model_not_found", model=voice_model)
        )

    model_path, index_file = model_manager.parse_pth_and_index(voice_model)

    # The model directory exists but contains no .pth file.
    if not model_path:
        raise ValueError(
            _i18n("model_file_missing", model=voice_model)
        )
    return model_path, index_file
|
|
|
|
|
def voice_conversion(
    voice_model: str,
    vocals_path: str,
    output_path: str,
    pitch: float,
    f0_method: str,
    index_rate: float,
    filter_radius: int,
    volume_envelope: float,
    protect: float,
    hop_length: int,
    f0_min: int,
    f0_max: int,
    format_output: str,
    output_bitrate: str,
    stereo_mode: str,
    embedder_name: str = "hubert_base",
    pipeline_mode: str = "orig",
    device: str = "cpu",
    add_text_progress: str = ""
) -> str:
    """
    Perform voice conversion using the fairseq embedder stack.

    Loads the RVC model and a fairseq Hubert embedder, runs
    :func:`rvc_infer` and releases model memory afterwards.

    Args:
        voice_model: Name of the voice model
        vocals_path: Path to the input vocals
        output_path: Path for the converted output
        pitch: Pitch shift
        f0_method: F0 extraction method
        index_rate: Index blend ratio
        filter_radius: Median filter radius
        volume_envelope: Volume envelope mix ratio
        protect: Consonant protection
        hop_length: Hop length
        f0_min: Minimum F0 frequency
        f0_max: Maximum F0 frequency
        format_output: Output format
        output_bitrate: Output bitrate
        stereo_mode: Stereo processing mode
        embedder_name: Embedder name
        pipeline_mode: Pipeline selection
        device: Compute device
        add_text_progress: Extra progress label text

    Returns:
        Path to the converted audio file.
    """
    add_text: str = f"| {add_text_progress}" if add_text_progress else ""

    rvc_model_path, rvc_index_path = load_rvc_model(voice_model)

    progress = gr.Progress()
    progress((0, 4), desc=f"{_i18n('loading_rvc_model')} {add_text}")

    config = Config(device)
    progress((1, 4), desc=f"{_i18n('loading_hubert_model')} {add_text}")

    # Resolve the fairseq embedder weights; abort early when absent.
    embedder_path = model_manager.check_hubert(embedder_name)
    if not embedder_path:
        raise ValueError(
            _i18n("embedder_not_found", embedder=embedder_name)
        )

    hubert_model = load_hubert(config.device, config.is_half, embedder_path)
    cpt, version, net_g, tgt_sr, vc, use_f0 = get_vc(
        config.device, config.is_half, config, rvc_model_path, "fairseq"
    )

    converted_path = rvc_infer(
        index_path=rvc_index_path,
        index_rate=index_rate,
        input_path=vocals_path,
        output_path=output_path,
        pitch=pitch,
        f0_method=f0_method,
        cpt=cpt,
        version=version,
        net_g=net_g,
        filter_radius=filter_radius,
        tgt_sr=tgt_sr,
        volume_envelope=volume_envelope,
        protect=protect,
        hop_length=hop_length,
        vc=vc,
        hubert_model=hubert_model,
        pitch_guidance=use_f0,
        f0_min=f0_min,
        f0_max=f0_max,
        format_output=format_output,
        output_bitrate=output_bitrate,
        stereo_mode=stereo_mode,
        pipeline_mode=pipeline_mode,
        add_text=add_text
    )

    # Drop model references and reclaim GPU memory before returning.
    del hubert_model, cpt, net_g, vc
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return converted_path
|
|
|
|
|
def voice_conversion_transformers(
    voice_model: str,
    vocals_path: str,
    output_path: str,
    pitch: float,
    f0_method: str,
    index_rate: float,
    filter_radius: int,
    volume_envelope: float,
    protect: float,
    hop_length: int,
    f0_min: int,
    f0_max: int,
    format_output: str,
    output_bitrate: str,
    stereo_mode: str,
    embedder_name: str = "contentvec",
    pipeline_mode: str = "orig",
    device: str = "cpu",
    add_text_progress: str = ""
) -> str:
    """
    Perform voice conversion using the transformers embedder stack.

    Same flow as :func:`voice_conversion`, but the Hubert embedder is
    loaded via ``HubertModelWithFinalProj.from_pretrained``.

    Args:
        voice_model: Name of the voice model
        vocals_path: Path to the input vocals
        output_path: Path for the converted output
        pitch: Pitch shift
        f0_method: F0 extraction method
        index_rate: Index blend ratio
        filter_radius: Median filter radius
        volume_envelope: Volume envelope mix ratio
        protect: Consonant protection
        hop_length: Hop length
        f0_min: Minimum F0 frequency
        f0_max: Maximum F0 frequency
        format_output: Output format
        output_bitrate: Output bitrate
        stereo_mode: Stereo processing mode
        embedder_name: Embedder name
        pipeline_mode: Pipeline selection
        device: Compute device
        add_text_progress: Extra progress label text

    Returns:
        Path to the converted audio file.
    """
    add_text: str = f"| {add_text_progress}" if add_text_progress else ""

    progress = gr.Progress()
    progress((0, 4), desc=f"{_i18n('loading_rvc_model')} {add_text}")

    rvc_model_path, rvc_index_path = load_rvc_model(voice_model)

    config = Config(device)
    progress((1, 4), desc=f"{_i18n('loading_hubert_model')} {add_text}")

    # Resolve the transformers embedder checkpoint; abort early when absent.
    embedder_path = model_manager.check_hubert_transformers(embedder_name)
    if not embedder_path:
        raise ValueError(
            _i18n("embedder_not_found", embedder=embedder_name)
        )

    hubert_model = HubertModelWithFinalProj.from_pretrained(embedder_path)
    hubert_model = hubert_model.to(config.device)
    cpt, version, net_g, tgt_sr, vc, use_f0 = get_vc(
        config.device, config.is_half, config, rvc_model_path, "transformers"
    )

    converted_path = rvc_infer(
        index_path=rvc_index_path,
        index_rate=index_rate,
        input_path=vocals_path,
        output_path=output_path,
        pitch=pitch,
        f0_method=f0_method,
        cpt=cpt,
        version=version,
        net_g=net_g,
        filter_radius=filter_radius,
        tgt_sr=tgt_sr,
        volume_envelope=volume_envelope,
        protect=protect,
        hop_length=hop_length,
        vc=vc,
        hubert_model=hubert_model,
        pitch_guidance=use_f0,
        f0_min=f0_min,
        f0_max=f0_max,
        format_output=format_output,
        output_bitrate=output_bitrate,
        stereo_mode=stereo_mode,
        pipeline_mode=pipeline_mode,
        add_text=add_text
    )

    # Drop model references and reclaim GPU memory before returning.
    del hubert_model, cpt, net_g, vc
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return converted_path
|
|
|
|
|
def vbach_inference(
    input_file: str,
    model_name: str,
    output_dir: str,
    output_name: str,
    output_format: str,
    output_bitrate: Union[str, int],
    pitch: int,
    method_pitch: str,
    format_name: bool = False,
    pipeline_mode: str = "orig",
    embedder_name: Optional[str] = "hubert_base",
    stack: str = "fairseq",
    add_params: Optional[Dict[str, Any]] = None,
    add_text_progress: str = "",
    device: str = "cpu"
) -> str:
    """
    Main Vbach inference entry point.

    Validates the input, optionally formats the output file name from a
    template, and dispatches to the fairseq or transformers conversion
    path.

    Args:
        input_file: Path to the input file
        model_name: Voice model name
        output_dir: Output directory
        output_name: Output file name (or name template)
        output_format: Output format
        output_bitrate: Output bitrate
        pitch: Pitch shift
        method_pitch: F0 extraction method
        format_name: Treat ``output_name`` as a template with NAME /
            MODEL / F0METHOD / PITCH placeholders
        pipeline_mode: Pipeline selection
        embedder_name: Embedder name
        stack: Embedder stack ("fairseq" or "transformers")
        add_params: Optional overrides: index_rate, filter_radius,
            protect, rms, mangio_crepe_hop_length, f0_min, f0_max,
            stereo_mode. Missing keys fall back to the same defaults the
            old signature embedded.
        add_text_progress: Extra progress label text
        device: Compute device

    Returns:
        Path to the converted output file.

    Raises:
        ValueError: for an unknown stack, missing input, or non-audio input.
    """
    if stack == "fairseq":
        vbach_convert = voice_conversion
    elif stack == "transformers":
        vbach_convert = voice_conversion_transformers
    else:
        raise ValueError(_i18n("unknown_stack", stack=stack))

    # BUGFIX: the old signature used a mutable dict as the default value
    # for add_params (shared across calls). A None sentinel with the same
    # per-key defaults is behaviorally identical and safe.
    if add_params is None:
        add_params = {}
    stereo_mode = add_params.get("stereo_mode", "mono")
    index_rate = add_params.get("index_rate", 0)
    filter_radius = add_params.get("filter_radius", 3)
    protect = add_params.get("protect", 0.33)
    rms = add_params.get("rms", 0.25)
    mangio_crepe_hop_length = add_params.get("mangio_crepe_hop_length", 128)
    f0_min = add_params.get("f0_min", 50)
    f0_max = add_params.get("f0_max", 1100)

    # Input validation guard clauses.
    if not input_file:
        raise ValueError(_i18n("no_input_error"))
    if not os.path.exists(input_file):
        raise ValueError(_i18n("file_not_exists"))
    if not check(input_file):
        raise ValueError(_i18n("file_no_audio"))

    basename = os.path.splitext(os.path.basename(input_file))[0]

    print(_i18n("inference_started"))

    if format_name:
        # Expand the name template twice: first to shorten the raw input
        # name, then to produce the final file name from the short form.
        cleaned_output_name_template = namer.sanitize(
            namer.dedup_template(
                output_name, keys=["NAME", "MODEL", "F0METHOD", "PITCH"]
            )
        )
        short_basename = namer.short_input_name_template(
            cleaned_output_name_template,
            MODEL=model_name,
            F0METHOD=method_pitch,
            PITCH=pitch,
            NAME=basename,
        )
        final_output_name = namer.template(
            cleaned_output_name_template,
            MODEL=model_name,
            F0METHOD=method_pitch,
            PITCH=pitch,
            NAME=short_basename,
        )
    else:
        final_output_name = output_name

    print(f"{_i18n('embedder')}: {embedder_name}")
    print(f"{_i18n('stack')}: {stack}")

    final_output_path = os.path.join(output_dir, f"{final_output_name}.{output_format}")

    output_converted_voice = vbach_convert(
        voice_model=model_name,
        vocals_path=input_file,
        output_path=final_output_path,
        pitch=pitch,
        f0_method=method_pitch,
        index_rate=index_rate,
        filter_radius=filter_radius,
        volume_envelope=rms,
        protect=protect,
        hop_length=mangio_crepe_hop_length,
        f0_min=f0_min,
        f0_max=f0_max,
        format_output=output_format,
        output_bitrate=str(output_bitrate),
        stereo_mode=stereo_mode,
        pipeline_mode=pipeline_mode,
        embedder_name=embedder_name,
        device=device,
        add_text_progress=add_text_progress
    )

    print(f"{_i18n('inference_complete')}\n{_i18n('output_path')}: \"{output_converted_voice}\"")
    return output_converted_voice
|
|
|
|
|
class History:
    """Manages the conversion history, persisted as a JSON file per user."""

    def __init__(self, user_directory: "UserDirectory") -> None:
        """
        Initialize the history store and load any previously saved state.

        Args:
            user_directory: User directory hosting ``history/vbach.json``
        """
        # Maps "timestamp / model / f0_method / pitch" keys to result lists.
        self.info: Dict[str, List] = {}
        self.user_directory: "UserDirectory" = user_directory
        self.path: str = os.path.join(self.user_directory.path, "history", "vbach.json")
        os.makedirs(os.path.join(self.user_directory.path, "history"), exist_ok=True)
        self.load_from_file()

    def _save_to_file(func):
        """Decorator: persist the state to disk after the wrapped method runs."""
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            result = func(self, *args, **kwargs)
            self._write_file()
            return result
        return wrapper

    def _write_file(self) -> None:
        """Write the current state to the JSON file (best-effort)."""
        try:
            dir_path = os.path.dirname(self.path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            with open(self.path, 'w', encoding='utf-8') as f:
                json.dump(self.info, f, indent=4, ensure_ascii=False)
        except Exception as e:
            print(f"{_i18n('error_writing_file')}: {e}")

    @_save_to_file
    def add(
        self,
        state: List,
        model_name: str,
        timestamp: str,
        f0_method: str,
        pitch: int
    ) -> None:
        """
        Add an entry to the history.

        Args:
            state: Result state (list of output paths)
            model_name: Voice model name
            timestamp: Timestamp string
            f0_method: F0 extraction method
            pitch: Pitch shift
        """
        self.info[f"{timestamp} / {model_name} / {f0_method} / {pitch}"] = state

    @_save_to_file
    def clear(self) -> None:
        """Clear the history."""
        self.info = {}

    def get_list(self) -> List[str]:
        """
        Get the list of history keys.

        Returns:
            Keys sorted in reverse order (newest timestamp first, since
            keys start with the timestamp).
        """
        return sorted(self.info, reverse=True)

    def get(self, key: str) -> List:
        """
        Get a history entry by key.

        Args:
            key: Entry key

        Returns:
            The stored entry, or [] when the key is unknown.
        """
        return self.info.get(key, [])

    def load_from_file(self) -> None:
        """Load the history from file; a corrupted file resets to empty."""
        if os.path.exists(self.path):
            # BUGFIX: a truncated/corrupted JSON file used to crash the
            # constructor; now it is reported and treated as empty.
            try:
                with open(self.path, 'r', encoding='utf-8') as f:
                    self.info = json.load(f)
            except (OSError, json.JSONDecodeError) as e:
                print(f"History load error: {e}")
                self.info = {}
|
|
|
|
|
| class Vbach(GradioHelper):
|
| """Класс для Gradio интерфейса Vbach"""
|
|
|
    def __init__(self, user_directory: UserDirectory, device: str) -> None:
        """
        Initialize the Vbach Gradio interface state.

        Args:
            user_directory: User directory (input/output/history roots)
            device: Compute device string (e.g. "cpu", "cuda:0")
        """
        super().__init__()
        self.device: str = device
        self.pitch_methods: Tuple[str, ...] = f0_methods
        # (min, max) ranges used to configure the UI sliders below.
        self.hop_length_values: Tuple[int, int] = (8, 512)
        self.index_rates_values: Tuple[int, int] = (0, 1)
        self.filter_radius_values: Tuple[int, int] = (0, 7)
        self.protect_values: Tuple[float, float] = (0, 0.5)
        self.rms_values: Tuple[int, int] = (0, 1)
        self.f0_min_values: Tuple[int, int] = (50, 3000)
        self.f0_max_values: Tuple[int, int] = (300, 6000)
        # Available embedder names per stack, read from the model manager.
        self.fairseq_embedders: List[str] = list(
            model_manager.huberts_fairseq_dict.keys()
        )
        self.transformers_embedders: List[str] = list(
            model_manager.huberts_transformers_dict.keys()
        )
        self.last_converted_state: List = []
        self.input_files: List[str] = []
        self.user_directory: UserDirectory = user_directory

        # NOTE(review): re-initializes the module-level model_manager
        # singleton in place so it points at this user's directory —
        # presumably intentional, but it mutates shared global state.
        model_manager.__init__(self.user_directory)
        self.input_base_dir: str = os.path.join(user_directory.path, "input")
        self.inputs_json_path: str = os.path.join(self.input_base_dir, "inputs.json")
        self.output_base_dir: str = os.path.join(user_directory.path, "output", "vbach")
        self.history: History = History(self.user_directory)
        # Restore the persisted input-file list, if any.
        self.load_from_file()
|
|
|
| def _write_file(self) -> None:
|
| """Записывает текущее состояние в файл"""
|
| try:
|
| with open(self.inputs_json_path, 'w', encoding='utf-8') as f:
|
| json.dump(self.input_files, f, indent=4, ensure_ascii=False)
|
| except Exception as e:
|
| print(f"{_i18n('error_writing_file')}: {e}")
|
|
|
| def _save_to_file(func):
|
| """Декоратор для автоматического сохранения после вызова метода"""
|
| @wraps(func)
|
| def wrapper(self, *args, **kwargs):
|
| result = func(self, *args, **kwargs)
|
| self._write_file()
|
| return result
|
| return wrapper
|
|
|
| def load_from_file(self) -> None:
|
| """Загрузить историю из файла"""
|
| if os.path.exists(self.inputs_json_path):
|
| with open(self.inputs_json_path, 'r', encoding='utf-8') as f:
|
| self.input_files = json.load(f)
|
|
|
| @_save_to_file
|
| def clean(self) -> None:
|
| """Очистить список входных файлов"""
|
| self.input_files = []
|
|
|
| @_save_to_file
|
| def upload_files(self, input_files: List[str], copy: bool = False) -> List[str]:
|
| """
|
| Загрузить файлы в пользовательскую директорию
|
|
|
| Args:
|
| input_files: Список путей к файлам
|
| copy: Копировать вместо перемещения
|
|
|
| Returns:
|
| Список путей к загруженным файлам
|
| """
|
| if input_files:
|
| input_dir: str = os.path.join(
|
| self.input_base_dir,
|
| datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
|
| )
|
| os.makedirs(input_dir, exist_ok=True)
|
|
|
| valid_files: List[str] = [file for file in input_files if check(file)]
|
| valid_files_moved: List[str] = []
|
|
|
| if valid_files:
|
| for file in valid_files:
|
| basename: str = os.path.basename(file)
|
| output_path: str = os.path.join(input_dir, basename)
|
| if copy:
|
| shutil.copy(file, output_path)
|
| else:
|
| shutil.move(file, output_path)
|
| valid_files_moved.append(output_path)
|
| self.input_files.append(output_path)
|
| return valid_files_moved
|
| else:
|
| return []
|
|
|
    def vbach_convert_batch(
        self,
        input_files: List[str],
        model_name: str,
        pitch_method: str,
        pitch: float,
        hop_length: int,
        index_rate: float,
        filter_radius: int,
        rms: float,
        protect: float,
        f0_min: int,
        f0_max: int,
        output_name: str,
        format_name: bool,
        output_format: str,
        stereo_mode: str,
        alt_pipeline: bool,
        embedder_name: str,
        transformers_mode: bool,
    ) -> Tuple[gr.update, gr.update]:
        """
        Convert a batch of input files with one model (Gradio callback).

        Runs ``vbach_inference`` on each file, collecting successes and
        printing (not raising) per-file failures, then records the batch
        in the history.

        Returns:
            Tuple of gr.update objects: the stringified list of output
            paths, and a visibility update for a UI element.
        """
        output_converted_files: List[str] = []
        progress = gr.Progress(track_tqdm=True)
        progress(progress=0, desc=_i18n("starting_conversion"))

        # One timestamp for the whole batch: shared output dir + history key.
        timestamp = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")

        if input_files:
            total_files = len(input_files)
            for i, file in enumerate(input_files, start=1):
                try:
                    print(f"{_i18n('processing_file', current=i, total=total_files, file=file)}")
                    progress(
                        progress=(i / total_files),
                        desc=_i18n("processing_file_title", current=i, total=total_files)
                    )
                    gr.Warning(
                        title=_i18n("processing_file_title", current=i, total=total_files),
                        message=file
                    )

                    out_conv = vbach_inference(
                        input_file=file,
                        model_name=model_name,
                        output_dir=os.path.join(self.output_base_dir, timestamp),
                        output_name=output_name,
                        # Force template naming for multi-file batches so
                        # outputs don't collide on a fixed name.
                        format_name=format_name if total_files == 1 else True,
                        output_format=output_format,
                        pitch=pitch,
                        method_pitch=pitch_method,
                        output_bitrate=320,
                        add_params={
                            "index_rate": index_rate,
                            "filter_radius": filter_radius,
                            "protect": protect,
                            "rms": rms,
                            "mangio_crepe_hop_length": hop_length,
                            "f0_min": f0_min,
                            "f0_max": f0_max,
                            "stereo_mode": stereo_mode,
                        },
                        pipeline_mode="alt" if alt_pipeline else "orig",
                        embedder_name=embedder_name,
                        stack="transformers" if transformers_mode else "fairseq",
                        add_text_progress=f"{i}/{total_files}",
                        device=self.device
                    )
                    output_converted_files.append(out_conv)
                except Exception as e:
                    # Deliberate best-effort: one bad file must not abort the batch.
                    print(f"{_i18n('error')}: {e}")

        if output_converted_files:
            self.history.add(output_converted_files, model_name, timestamp, pitch_method, pitch)

        return gr.update(value=str(output_converted_files)), gr.update(visible=False)
|
|
|
    @hf_spaces_gpu(duration=70)
    def vbach_convert_batch_zero_gpu(
        self,
        input_files: List[str],
        model_name: str,
        pitch_method: str,
        pitch: float,
        hop_length: int,
        index_rate: float,
        filter_radius: int,
        rms: float,
        protect: float,
        f0_min: int,
        f0_max: int,
        output_name: str,
        format_name: bool,
        output_format: str,
        stereo_mode: str,
        alt_pipeline: bool,
        embedder_name: str,
        transformers_mode: bool,
    ) -> Tuple[gr.update, gr.update]:
        """
        ZeroGPU variant of ``vbach_convert_batch``.

        Identical logic, but decorated with ``hf_spaces_gpu`` (HF Spaces
        GPU allocation, 70 s budget) and pinned to device "cuda:0" instead
        of ``self.device``. Kept as a separate method because the decorator
        must wrap the whole call.
        """
        output_converted_files: List[str] = []
        progress = gr.Progress(track_tqdm=True)
        progress(progress=0, desc=_i18n("starting_conversion"))

        # One timestamp for the whole batch: shared output dir + history key.
        timestamp = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")

        if input_files:
            total_files = len(input_files)
            for i, file in enumerate(input_files, start=1):
                try:
                    print(f"{_i18n('processing_file', current=i, total=total_files, file=file)}")
                    progress(
                        progress=(i / total_files),
                        desc=_i18n("processing_file_title", current=i, total=total_files)
                    )
                    gr.Warning(
                        title=_i18n("processing_file_title", current=i, total=total_files),
                        message=file
                    )

                    out_conv = vbach_inference(
                        input_file=file,
                        model_name=model_name,
                        output_dir=os.path.join(self.output_base_dir, timestamp),
                        output_name=output_name,
                        # Force template naming for multi-file batches so
                        # outputs don't collide on a fixed name.
                        format_name=format_name if total_files == 1 else True,
                        output_format=output_format,
                        pitch=pitch,
                        method_pitch=pitch_method,
                        output_bitrate=320,
                        add_params={
                            "index_rate": index_rate,
                            "filter_radius": filter_radius,
                            "protect": protect,
                            "rms": rms,
                            "mangio_crepe_hop_length": hop_length,
                            "f0_min": f0_min,
                            "f0_max": f0_max,
                            "stereo_mode": stereo_mode,
                        },
                        pipeline_mode="alt" if alt_pipeline else "orig",
                        embedder_name=embedder_name,
                        stack="transformers" if transformers_mode else "fairseq",
                        add_text_progress=f"{i}/{total_files}",
                        # ZeroGPU always runs on the allocated CUDA device.
                        device="cuda:0"
                    )
                    output_converted_files.append(out_conv)
                except Exception as e:
                    # Deliberate best-effort: one bad file must not abort the batch.
                    print(f"{_i18n('error')}: {e}")

        if output_converted_files:
            self.history.add(output_converted_files, model_name, timestamp, pitch_method, pitch)

        return gr.update(value=str(output_converted_files)), gr.update(visible=False)
|
|
|
    def vbach_convert_duet(
        self,
        input_file: Optional[str],
        model_name1: str,
        model_name2: str,
        pitch_method1: str,
        pitch_method2: str,
        pitch1: float,
        pitch2: float,
        hop_length1: int,
        hop_length2: int,
        index_rate1: float,
        index_rate2: float,
        filter_radius1: int,
        filter_radius2: int,
        rms1: float,
        rms2: float,
        protect1: float,
        protect2: float,
        f0_min1: int,
        f0_min2: int,
        f0_max1: int,
        f0_max2: int,
        output_format: str,
        stereo_mode: str,
        alt_pipeline: bool,
        embedder_name1: str,
        embedder_name2: str,
        transformers_mode1: bool,
        transformers_mode2: bool,
        mix_duet: bool,
        mix_duet_ratio: float
    ) -> Tuple[Optional[Dict], Optional[Dict], gr.update]:
        """
        Convert one file with two voice models (duet, Gradio callback).

        Runs ``vbach_inference`` twice (suffix-1 vs suffix-2 parameters),
        then either mixes the two results into one file (``mix_duet``) or
        returns both separately.

        NOTE(review): the return annotation declares a 3-tuple but every
        branch returns a 2-tuple of Gradio updates — the annotation looks
        stale; confirm against the UI wiring before changing it.
        """
        output_1: Optional[str] = None
        output_2: Optional[str] = None

        progress = gr.Progress(track_tqdm=True)
        progress(progress=0, desc=_i18n("starting_conversion"))

        # Shared timestamp: both outputs land in one folder / history entry.
        timestamp = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
        output_dir = os.path.join(self.output_base_dir, timestamp)

        if input_file:
            try:
                gr.Warning(title=_i18n("model_1"), message="")
                output_1 = vbach_inference(
                    input_file=input_file,
                    model_name=model_name1,
                    output_dir=output_dir,
                    output_name="NAME - MODEL 1 - F0METHOD - PITCH",
                    format_name=True,
                    output_format=output_format,
                    pitch=pitch1,
                    method_pitch=pitch_method1,
                    output_bitrate=320,
                    add_params={
                        "index_rate": index_rate1,
                        "filter_radius": filter_radius1,
                        "protect": protect1,
                        "rms": rms1,
                        "mangio_crepe_hop_length": hop_length1,
                        "f0_min": f0_min1,
                        "f0_max": f0_max1,
                        "stereo_mode": stereo_mode,
                    },
                    pipeline_mode="alt" if alt_pipeline else "orig",
                    embedder_name=embedder_name1,
                    stack="transformers" if transformers_mode1 else "fairseq",
                    add_text_progress=_i18n("model_1"),
                    device=self.device
                )

                gr.Warning(title=_i18n("model_2"), message="")
                output_2 = vbach_inference(
                    input_file=input_file,
                    model_name=model_name2,
                    output_dir=output_dir,
                    output_name="NAME - MODEL 2 - F0METHOD - PITCH",
                    format_name=True,
                    output_format=output_format,
                    pitch=pitch2,
                    method_pitch=pitch_method2,
                    output_bitrate=320,
                    add_params={
                        "index_rate": index_rate2,
                        "filter_radius": filter_radius2,
                        "protect": protect2,
                        "rms": rms2,
                        "mangio_crepe_hop_length": hop_length2,
                        "f0_min": f0_min2,
                        "f0_max": f0_max2,
                        "stereo_mode": stereo_mode,
                    },
                    pipeline_mode="alt" if alt_pipeline else "orig",
                    embedder_name=embedder_name2,
                    stack="transformers" if transformers_mode2 else "fairseq",
                    add_text_progress=_i18n("model_2"),
                    device=self.device
                )

            except Exception as e:
                # Either conversion failing clears both result slots.
                print(f"{_i18n('error')}: {e}")
                return (
                    gr.update(value=None),
                    gr.update(value=None)
                )

        if mix_duet and output_1 and output_2:
            input_file_basename = os.path.splitext(os.path.basename(input_file))[0] if input_file else "duet"
            mix1, sr1 = read(output_1)
            mix2, sr2 = read(output_2)
            # Resample both results to the higher sample rate before mixing.
            max_sr = max(sr1, sr2)
            fitted_arrays = fit_arrays([mix1, mix2], [sr1, sr2], min_sr=max_sr)
            # Crossfade gains: ratio 0 -> equal halves; presumably ratio is
            # in [-1, 1] so the pair always sums to 1 — TODO confirm slider range.
            g1 = (1 - mix_duet_ratio) / 2
            g2 = (1 + mix_duet_ratio) / 2
            mixed_duet = gain(fitted_arrays[0], g1) + gain(fitted_arrays[1], g2)
            shorted_name = namer.short(input_file_basename, length=50)
            sanitized_name = namer.sanitize(f"{model_name1}, {model_name2} - {shorted_name}")
            output_mixed = write(
                os.path.join(output_dir, f"{sanitized_name}.{output_format}"),
                mixed_duet,
                max_sr
            )
            self.history.add(
                [output_mixed],
                f"{model_name1}|{model_name2}",
                timestamp,
                f"{pitch_method1}|{pitch_method2}",
                f"{pitch1}|{pitch2}"
            )
            return (
                self.return_audio_with_size(label=_i18n("mixed_result"), value=output_mixed),
                gr.update(label=_i18n("model_2_result"), value=None),
            )
        elif output_1 and output_2:
            self.history.add(
                [output_1, output_2],
                f"{model_name1}|{model_name2}",
                timestamp,
                f"{pitch_method1}|{pitch_method2}",
                f"{pitch1}|{pitch2}"
            )
            return (
                self.return_audio_with_size(label=_i18n("model_1_result"), value=output_1),
                self.return_audio_with_size(label=_i18n("model_2_result"), value=output_2),
            )
        else:
            # No input, or at least one conversion produced nothing.
            return (
                gr.update(value=None),
                gr.update(value=None)
            )
|
|
|
    @hf_spaces_gpu(duration=70)
    def vbach_convert_duet_zero_gpu(
        self,
        input_file: Optional[str],
        model_name1: str,
        model_name2: str,
        pitch_method1: str,
        pitch_method2: str,
        pitch1: float,
        pitch2: float,
        hop_length1: int,
        hop_length2: int,
        index_rate1: float,
        index_rate2: float,
        filter_radius1: int,
        filter_radius2: int,
        rms1: float,
        rms2: float,
        protect1: float,
        protect2: float,
        f0_min1: int,
        f0_min2: int,
        f0_max1: int,
        f0_max2: int,
        output_format: str,
        stereo_mode: str,
        alt_pipeline: bool,
        embedder_name1: str,
        embedder_name2: str,
        transformers_mode1: bool,
        transformers_mode2: bool,
        mix_duet: bool,
        mix_duet_ratio: float
    ) -> Tuple[Optional[Dict], Optional[Dict], gr.update]:
        """
        ZeroGPU variant of ``vbach_convert_duet``.

        Identical logic, but decorated with ``hf_spaces_gpu`` (HF Spaces
        GPU allocation, 70 s budget) and pinned to device "cuda:0" instead
        of ``self.device``.

        NOTE(review): as in the non-ZeroGPU variant, the return annotation
        declares a 3-tuple but every branch returns a 2-tuple.
        """
        output_1: Optional[str] = None
        output_2: Optional[str] = None

        progress = gr.Progress(track_tqdm=True)
        progress(progress=0, desc=_i18n("starting_conversion"))

        # Shared timestamp: both outputs land in one folder / history entry.
        timestamp = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
        output_dir = os.path.join(self.output_base_dir, timestamp)

        if input_file:
            try:
                gr.Warning(title=_i18n("model_1"), message="")
                output_1 = vbach_inference(
                    input_file=input_file,
                    model_name=model_name1,
                    output_dir=output_dir,
                    output_name="NAME - MODEL 1 - F0METHOD - PITCH",
                    format_name=True,
                    output_format=output_format,
                    pitch=pitch1,
                    method_pitch=pitch_method1,
                    output_bitrate=320,
                    add_params={
                        "index_rate": index_rate1,
                        "filter_radius": filter_radius1,
                        "protect": protect1,
                        "rms": rms1,
                        "mangio_crepe_hop_length": hop_length1,
                        "f0_min": f0_min1,
                        "f0_max": f0_max1,
                        "stereo_mode": stereo_mode,
                    },
                    pipeline_mode="alt" if alt_pipeline else "orig",
                    embedder_name=embedder_name1,
                    stack="transformers" if transformers_mode1 else "fairseq",
                    add_text_progress=_i18n("model_1"),
                    # ZeroGPU always runs on the allocated CUDA device.
                    device="cuda:0"
                )

                gr.Warning(title=_i18n("model_2"), message="")
                output_2 = vbach_inference(
                    input_file=input_file,
                    model_name=model_name2,
                    output_dir=output_dir,
                    output_name="NAME - MODEL 2 - F0METHOD - PITCH",
                    format_name=True,
                    output_format=output_format,
                    pitch=pitch2,
                    method_pitch=pitch_method2,
                    output_bitrate=320,
                    add_params={
                        "index_rate": index_rate2,
                        "filter_radius": filter_radius2,
                        "protect": protect2,
                        "rms": rms2,
                        "mangio_crepe_hop_length": hop_length2,
                        "f0_min": f0_min2,
                        "f0_max": f0_max2,
                        "stereo_mode": stereo_mode,
                    },
                    pipeline_mode="alt" if alt_pipeline else "orig",
                    embedder_name=embedder_name2,
                    stack="transformers" if transformers_mode2 else "fairseq",
                    add_text_progress=_i18n("model_2"),
                    # ZeroGPU always runs on the allocated CUDA device.
                    device="cuda:0"
                )

            except Exception as e:
                # Either conversion failing clears both result slots.
                print(f"{_i18n('error')}: {e}")
                return (
                    gr.update(value=None),
                    gr.update(value=None)
                )

        if mix_duet and output_1 and output_2:
            input_file_basename = os.path.splitext(os.path.basename(input_file))[0] if input_file else "duet"
            mix1, sr1 = read(output_1)
            mix2, sr2 = read(output_2)
            # Resample both results to the higher sample rate before mixing.
            max_sr = max(sr1, sr2)
            fitted_arrays = fit_arrays([mix1, mix2], [sr1, sr2], min_sr=max_sr)
            # Crossfade gains: ratio 0 -> equal halves; presumably ratio is
            # in [-1, 1] so the pair always sums to 1 — TODO confirm slider range.
            g1 = (1 - mix_duet_ratio) / 2
            g2 = (1 + mix_duet_ratio) / 2
            mixed_duet = gain(fitted_arrays[0], g1) + gain(fitted_arrays[1], g2)
            shorted_name = namer.short(input_file_basename, length=50)
            sanitized_name = namer.sanitize(f"{model_name1}, {model_name2} - {shorted_name}")
            output_mixed = write(
                os.path.join(output_dir, f"{sanitized_name}.{output_format}"),
                mixed_duet,
                max_sr
            )
            self.history.add(
                [output_mixed],
                f"{model_name1}|{model_name2}",
                timestamp,
                f"{pitch_method1}|{pitch_method2}",
                f"{pitch1}|{pitch2}"
            )
            return (
                self.return_audio_with_size(label=_i18n("mixed_result"), value=output_mixed),
                gr.update(label=_i18n("model_2_result"), value=None),
            )
        elif output_1 and output_2:
            self.history.add(
                [output_1, output_2],
                f"{model_name1}|{model_name2}",
                timestamp,
                f"{pitch_method1}|{pitch_method2}",
                f"{pitch1}|{pitch2}"
            )
            return (
                self.return_audio_with_size(label=_i18n("model_1_result"), value=output_1),
                self.return_audio_with_size(label=_i18n("model_2_result"), value=output_2),
            )
        else:
            # No input, or at least one conversion produced nothing.
            return (
                gr.update(value=None),
                gr.update(value=None)
            )
|
|
|
| def UI(self) -> gr.Blocks:
|
| """
|
| Создать пользовательский интерфейс
|
|
|
| Returns:
|
| Блоки интерфейса Gradio
|
| """
|
| with gr.Blocks() as vbach_app:
|
| with gr.Tab(_i18n("tab_inference")):
|
| with gr.Row():
|
| with gr.Column():
|
| with gr.Group():
|
| upload = gr.Files(
|
| show_label=False,
|
| type="filepath",
|
| interactive=True
|
| )
|
| refresh_input_btn = gr.Button(
|
| _i18n("refresh"),
|
| variant="primary",
|
| interactive=True
|
| )
|
| list_input_files = gr.Dropdown(
|
| label=_i18n("select_input_files"),
|
| choices=reversed(self.input_files) if self.input_files else [],
|
| value=[],
|
| multiselect=True,
|
| interactive=True,
|
| filterable=False,
|
| scale=15
|
| )
|
|
|
| gr.on(
|
| fn=lambda: gr.update(choices=reversed(self.input_files) if self.input_files else [], value=[]),
|
| outputs=list_input_files,
|
| trigger_mode="once"
|
| )
|
|
|
| refresh_input_btn.click(
|
| lambda: gr.update(choices=reversed(self.input_files) if self.input_files else [], value=[]),
|
| outputs=list_input_files
|
| )
|
|
|
| @upload.upload(inputs=[upload], outputs=[list_input_files, upload])
|
| def upload_files(input_files: List[str]) -> Tuple[gr.update, gr.update]:
|
| files = self.upload_files(input_files)
|
| return (
|
| gr.update(choices=reversed(self.input_files) if self.input_files else [], value=files),
|
| gr.update(value=[])
|
| )
|
|
|
| converted_state = gr.Textbox(
|
| label=_i18n("conversion_status"),
|
| interactive=False,
|
| value="",
|
| visible=False,
|
| )
|
|
|
| with gr.Column():
|
| with gr.Group():
|
| with gr.Group():
|
| model_name = gr.Dropdown(
|
| label=_i18n("model_name"),
|
| interactive=True
|
| )
|
| model_list_refresh_btn = gr.Button(
|
| _i18n("refresh"),
|
| variant="secondary",
|
| interactive=True
|
| )
|
|
|
| @model_list_refresh_btn.click(outputs=[model_name])
|
| def refresh_list_voice_models() -> gr.update:
|
| models = model_manager.parse_voice_models()
|
| first_model = models[0] if models else None
|
| return gr.update(choices=models, value=first_model)
|
|
|
| with gr.Group():
|
| pitch_method = gr.Dropdown(
|
| label=_i18n("f0_method"),
|
| choices=self.pitch_methods,
|
| value=self.pitch_methods[0] if self.pitch_methods else "rmvpe+",
|
| interactive=True,
|
| filterable=False
|
| )
|
| pitch = gr.Slider(
|
| label=_i18n("pitch"),
|
| minimum=-48,
|
| maximum=48,
|
| step=0.5,
|
| value=0,
|
| interactive=True,
|
| )
|
| hop_length = gr.Slider(
|
| label=_i18n("hop_length"),
|
| info=_i18n("hop_length_info"),
|
| minimum=self.hop_length_values[0],
|
| maximum=self.hop_length_values[1],
|
| step=8,
|
| value=128,
|
| interactive=True,
|
| visible=False,
|
| )
|
|
|
| @pitch_method.change(
|
| inputs=[pitch_method], outputs=[hop_length]
|
| )
|
| def show_mangio_crepe_hop_length(pitch_method: str) -> gr.update:
|
| return gr.update(
|
| visible=(
|
| pitch_method
|
| in ["mangio-crepe", "mangio-crepe-tiny", "pyin"]
|
| )
|
| )
|
|
|
| with gr.Accordion(label=_i18n("additional_settings"), open=False):
|
| with gr.Group():
|
| with gr.Accordion(label=_i18n("audio_processing"), open=False):
|
| with gr.Group():
|
| stereo_mode = gr.Radio(
|
| choices=["mono", "left/right", "sim/dif"],
|
| label=_i18n("stereo_mode"),
|
| info=_i18n("stereo_mode_info"),
|
| value="mono",
|
| interactive=True,
|
| )
|
| alt_pl = gr.Checkbox(
|
| label=_i18n("alt_pipeline"),
|
| info=_i18n("alt_pipeline_info"),
|
| value=False,
|
| interactive=True,
|
| )
|
| with gr.Accordion(label=_i18n("inference"), open=False):
|
| with gr.Group():
|
| with gr.Row():
|
| index_rate = gr.Slider(
|
| label=_i18n("index_rate"),
|
| info=_i18n("index_rate_info"),
|
| minimum=self.index_rates_values[0],
|
| maximum=self.index_rates_values[1],
|
| step=0.05,
|
| value=0,
|
| interactive=True,
|
| )
|
| filter_radius = gr.Slider(
|
| label=_i18n("filter_radius"),
|
| info=_i18n("filter_radius_info"),
|
| minimum=self.filter_radius_values[0],
|
| maximum=self.filter_radius_values[1],
|
| step=1,
|
| value=3,
|
| interactive=True,
|
| )
|
| with gr.Row():
|
| rms = gr.Slider(
|
| label=_i18n("rms_envelope"),
|
| info=_i18n("rms_info"),
|
| minimum=self.rms_values[0],
|
| maximum=self.rms_values[1],
|
| step=0.05,
|
| value=0.25,
|
| interactive=True,
|
| )
|
| protect = gr.Slider(
|
| label=_i18n("protect"),
|
| info=_i18n("protect_info"),
|
| minimum=self.protect_values[0],
|
| maximum=self.protect_values[1],
|
| step=0.05,
|
| value=0.35,
|
| interactive=True,
|
| )
|
| with gr.Accordion(label=_i18n("f0_range"), open=False):
|
| with gr.Group():
|
| with gr.Row():
|
| f0_min = gr.Slider(
|
| label=_i18n("f0_min"),
|
| minimum=self.f0_min_values[0],
|
| maximum=self.f0_min_values[1],
|
| step=10,
|
| value=50,
|
| interactive=True,
|
| )
|
| f0_max = gr.Slider(
|
| label=_i18n("f0_max"),
|
| minimum=self.f0_max_values[0],
|
| maximum=self.f0_max_values[1],
|
| step=10,
|
| value=1100,
|
| interactive=True,
|
| )
|
| with gr.Accordion(label=_i18n("embedder"), open=False):
|
| with gr.Group():
|
| embedder_name = gr.Radio(
|
| label=_i18n("hubert_model"),
|
| choices=self.fairseq_embedders,
|
| value=self.fairseq_embedders[0] if self.fairseq_embedders else "hubert_base",
|
| )
|
| transformers_mode = gr.Checkbox(
|
| label=_i18n("use_transformers"),
|
| value=False,
|
| interactive=True,
|
| )
|
|
|
| @transformers_mode.change(
|
| inputs=[transformers_mode], outputs=[embedder_name]
|
| )
|
| def change_embedders(tr_m: bool) -> gr.update:
|
| if tr_m:
|
| return gr.update(
|
| value=self.transformers_embedders[0] if self.transformers_embedders else None,
|
| choices=self.transformers_embedders,
|
| )
|
| else:
|
| return gr.update(
|
| choices=self.fairseq_embedders,
|
| value=self.fairseq_embedders[0] if self.fairseq_embedders else None,
|
| )
|
|
|
| with gr.Accordion(label=_i18n("output_filename"), open=False):
|
| with gr.Group():
|
| output_name = gr.Textbox(
|
| label=_i18n("output_filename"),
|
| interactive=True,
|
| value="NAME - MODEL - F0METHOD - PITCH",
|
| )
|
| format_output_name_check = gr.Checkbox(
|
| label=_i18n("format_name"),
|
| info=_i18n("format_name_info"),
|
| value=True,
|
| interactive=True,
|
| )
|
|
|
| with gr.Group():
|
| output_format = gr.Dropdown(
|
| label=_i18n("output_format"),
|
| interactive=True,
|
| choices=output_formats,
|
| value=output_formats[0] if output_formats else "wav",
|
| filterable=False,
|
| )
|
| status = gr.Textbox(
|
| container=False,
|
| lines=4,
|
| interactive=False,
|
| max_lines=4,
|
| visible=False
|
| )
|
| convert_btn = gr.Button(
|
| _i18n("convert_btn"),
|
| variant="primary",
|
| interactive=True
|
| ).click(
|
| lambda: gr.update(visible=True),
|
| outputs=[status]
|
| )
|
|
|
| @convert_btn.then(
|
| inputs=[
|
| list_input_files,
|
| model_name,
|
| pitch_method,
|
| pitch,
|
| hop_length,
|
| index_rate,
|
| filter_radius,
|
| rms,
|
| protect,
|
| f0_min,
|
| f0_max,
|
| output_name,
|
| format_output_name_check,
|
| output_format,
|
| stereo_mode,
|
| alt_pl,
|
| embedder_name,
|
| transformers_mode,
|
| ],
|
| outputs=[converted_state, status],
|
| queue=True
|
| )
|
| def vbach_convert_batch_fn(
|
| input_files: List[str],
|
| model_name: str,
|
| pitch_method: str,
|
| pitch: float,
|
| hop_length: int,
|
| index_rate: float,
|
| filter_radius: int,
|
| rms: float,
|
| protect: float,
|
| f0_min: int,
|
| f0_max: int,
|
| output_name: str,
|
| format_name: bool,
|
| output_format: str,
|
| stereo_mode: str,
|
| alt_pipeline: bool,
|
| embedder_name: str,
|
| transformers_mode: bool,
|
| ) -> Tuple[gr.update, gr.update]:
|
|
|
| vbach_batch = self.vbach_convert_batch_zero_gpu if zerogpu_available else self.vbach_convert_batch
|
| return vbach_batch(
|
| input_files=input_files,
|
| model_name=model_name,
|
| pitch_method=pitch_method,
|
| pitch=pitch,
|
| hop_length=hop_length,
|
| index_rate=index_rate,
|
| filter_radius=filter_radius,
|
| rms=rms,
|
| protect=protect,
|
| f0_min=f0_min,
|
| f0_max=f0_max,
|
| output_name=output_name,
|
| format_name=format_name,
|
| output_format=output_format,
|
| stereo_mode=stereo_mode,
|
| alt_pipeline=alt_pipeline,
|
| embedder_name=embedder_name,
|
| transformers_mode=transformers_mode
|
| )
|
|
|
|
|
|
|
| with gr.Column(variant="panel"):
|
| gr.Markdown(f"<center><h3>{_i18n('results')}</h3></center>")
|
|
|
| with gr.Group():
|
| with gr.Row(equal_height=True):
|
| list_conversions = gr.Dropdown(
|
| label=_i18n("select_conversion_results"),
|
| choices=[],
|
| value=None,
|
| interactive=True,
|
| scale=14
|
| )
|
|
|
| list_conversions.change(
|
| lambda x: gr.update(value=str(self.history.get(x))),
|
| inputs=[list_conversions],
|
| outputs=[converted_state]
|
| )
|
|
|
| refresh_conversions_btn = gr.Button(
|
| _i18n("refresh"),
|
| scale=2,
|
| interactive=True
|
| )
|
| refresh_conversions_btn.click(
|
| lambda: gr.update(choices=self.history.get_list(), value=None),
|
| outputs=[list_conversions]
|
| )
|
|
|
| gr.on(
|
| fn=lambda: gr.update(choices=self.history.get_list(), value=None),
|
| outputs=[list_conversions]
|
| )
|
|
|
| @gr.render(inputs=[converted_state])
|
| def show_players_converted(state: str) -> None:
|
| if state:
|
| try:
|
| output_converted_files = ast.literal_eval(state)
|
| if output_converted_files:
|
| with gr.Group():
|
| for conv_file in output_converted_files:
|
| basename = os.path.splitext(
|
| os.path.basename(conv_file)
|
| )[0]
|
| self.define_audio_with_size(
|
| label=basename,
|
| value=conv_file,
|
| type="filepath",
|
| interactive=False,
|
| show_download_button=True,
|
| )
|
| except:
|
| pass
|
|
|
| with gr.TabItem(_i18n("tab_duet")):
|
| with gr.Column():
|
| with gr.Group():
|
| upload_duet = gr.File(
|
| show_label=False,
|
| type="filepath",
|
| interactive=True
|
| )
|
| refresh_input_btn_duet = gr.Button(
|
| _i18n("refresh"),
|
| variant="primary",
|
| interactive=True
|
| )
|
| list_input_files_duet = gr.Dropdown(
|
| label=_i18n("select_input_files"),
|
| choices=self.input_files,
|
| value=None,
|
| multiselect=False,
|
| interactive=True,
|
| filterable=False,
|
| scale=15
|
| )
|
|
|
| gr.on(
|
| fn=lambda: gr.update(choices=reversed(self.input_files) if self.input_files else [], value=None),
|
| outputs=list_input_files_duet,
|
| trigger_mode="once"
|
| )
|
|
|
| refresh_input_btn_duet.click(
|
| lambda: gr.update(choices=reversed(self.input_files) if self.input_files else [], value=None),
|
| outputs=list_input_files_duet
|
| )
|
|
|
| @upload_duet.upload(
|
| inputs=[upload_duet],
|
| outputs=[list_input_files_duet, upload_duet]
|
| )
|
| def upload_files(input_file: str) -> Tuple[gr.update, gr.update]:
|
| files = self.upload_files([input_file])
|
| return (
|
| gr.update(choices=reversed(self.input_files) if self.input_files else [], value=files[0] if files else None),
|
| gr.update(value=None)
|
| )
|
|
|
| with gr.Row():
|
| with gr.Column():
|
| gr.Markdown(f"<h3><center>{_i18n('model')} 1</center></h3>")
|
| with gr.Group():
|
| model_name1 = gr.Dropdown(
|
| label=_i18n("model_name"),
|
| interactive=True
|
| )
|
|
|
| pitch_method1 = gr.Dropdown(
|
| label=_i18n("f0_method"),
|
| choices=self.pitch_methods,
|
| value=self.pitch_methods[0] if self.pitch_methods else "rmvpe+",
|
| interactive=True,
|
| filterable=False
|
| )
|
| pitch1 = gr.Slider(
|
| label=_i18n("pitch"),
|
| minimum=-48,
|
| maximum=48,
|
| step=0.5,
|
| value=0,
|
| interactive=True,
|
| )
|
| hop_length1 = gr.Slider(
|
| label=_i18n("hop_length"),
|
| info=_i18n("hop_length_info"),
|
| minimum=self.hop_length_values[0],
|
| maximum=self.hop_length_values[1],
|
| step=8,
|
| value=128,
|
| interactive=True,
|
| visible=False,
|
| )
|
|
|
| @pitch_method1.change(
|
| inputs=[pitch_method1], outputs=[hop_length1]
|
| )
|
| def show_mangio_crepe_hop_length(pitch_method: str) -> gr.update:
|
| return gr.update(
|
| visible=(
|
| pitch_method
|
| in ["mangio-crepe", "mangio-crepe-tiny", "pyin"]
|
| )
|
| )
|
|
|
| with gr.Accordion(label=_i18n("additional_settings"), open=False):
|
| with gr.Group():
|
| with gr.Accordion(label=_i18n("inference"), open=False):
|
| with gr.Group():
|
| with gr.Row():
|
| index_rate1 = gr.Slider(
|
| label=_i18n("index_rate"),
|
| info=_i18n("index_rate_info"),
|
| minimum=self.index_rates_values[0],
|
| maximum=self.index_rates_values[1],
|
| step=0.05,
|
| value=0,
|
| interactive=True,
|
| )
|
| filter_radius1 = gr.Slider(
|
| label=_i18n("filter_radius"),
|
| info=_i18n("filter_radius_info"),
|
| minimum=self.filter_radius_values[0],
|
| maximum=self.filter_radius_values[1],
|
| step=1,
|
| value=3,
|
| interactive=True,
|
| )
|
| with gr.Row():
|
| rms1 = gr.Slider(
|
| label=_i18n("rms_envelope"),
|
| info=_i18n("rms_info"),
|
| minimum=self.rms_values[0],
|
| maximum=self.rms_values[1],
|
| step=0.05,
|
| value=0.25,
|
| interactive=True,
|
| )
|
| protect1 = gr.Slider(
|
| label=_i18n("protect"),
|
| info=_i18n("protect_info"),
|
| minimum=self.protect_values[0],
|
| maximum=self.protect_values[1],
|
| step=0.05,
|
| value=0.35,
|
| interactive=True,
|
| )
|
| with gr.Accordion(label=_i18n("f0_range"), open=False):
|
| with gr.Group():
|
| with gr.Row():
|
| f0_min1 = gr.Slider(
|
| label=_i18n("f0_min"),
|
| minimum=self.f0_min_values[0],
|
| maximum=self.f0_min_values[1],
|
| step=10,
|
| value=50,
|
| interactive=True,
|
| )
|
| f0_max1 = gr.Slider(
|
| label=_i18n("f0_max"),
|
| minimum=self.f0_max_values[0],
|
| maximum=self.f0_max_values[1],
|
| step=10,
|
| value=1100,
|
| interactive=True,
|
| )
|
| with gr.Accordion(label=_i18n("embedder"), open=False):
|
| with gr.Group():
|
| embedder_name1 = gr.Radio(
|
| label=_i18n("hubert_model"),
|
| choices=self.fairseq_embedders,
|
| value=self.fairseq_embedders[0] if self.fairseq_embedders else "hubert_base",
|
| )
|
| transformers_mode1 = gr.Checkbox(
|
| label=_i18n("use_transformers"),
|
| value=False,
|
| interactive=True,
|
| )
|
|
|
| @transformers_mode1.change(
|
| inputs=[transformers_mode1],
|
| outputs=[embedder_name1]
|
| )
|
| def change_embedders(tr_m: bool) -> gr.update:
|
| if tr_m:
|
| return gr.update(
|
| value=self.transformers_embedders[0] if self.transformers_embedders else None,
|
| choices=self.transformers_embedders,
|
| )
|
| else:
|
| return gr.update(
|
| choices=self.fairseq_embedders,
|
| value=self.fairseq_embedders[0] if self.fairseq_embedders else None,
|
| )
|
|
|
| with gr.Column():
|
| gr.Markdown(f"<h3><center>{_i18n('model')} 2</center></h3>")
|
| with gr.Group():
|
| model_name2 = gr.Dropdown(
|
| label=_i18n("model_name"),
|
| interactive=True
|
| )
|
|
|
| pitch_method2 = gr.Dropdown(
|
| label=_i18n("f0_method"),
|
| choices=self.pitch_methods,
|
| value=self.pitch_methods[0] if self.pitch_methods else "rmvpe+",
|
| interactive=True,
|
| filterable=False
|
| )
|
| pitch2 = gr.Slider(
|
| label=_i18n("pitch"),
|
| minimum=-48,
|
| maximum=48,
|
| step=0.5,
|
| value=0,
|
| interactive=True,
|
| )
|
| hop_length2 = gr.Slider(
|
| label=_i18n("hop_length"),
|
| info=_i18n("hop_length_info"),
|
| minimum=self.hop_length_values[0],
|
| maximum=self.hop_length_values[1],
|
| step=8,
|
| value=128,
|
| interactive=True,
|
| visible=False,
|
| )
|
|
|
| @pitch_method2.change(
|
| inputs=[pitch_method2], outputs=[hop_length2]
|
| )
|
| def show_mangio_crepe_hop_length(pitch_method: str) -> gr.update:
|
| return gr.update(
|
| visible=(
|
| pitch_method
|
| in ["mangio-crepe", "mangio-crepe-tiny", "pyin"]
|
| )
|
| )
|
|
|
| with gr.Accordion(label=_i18n("additional_settings"), open=False):
|
| with gr.Group():
|
| with gr.Accordion(label=_i18n("inference"), open=False):
|
| with gr.Group():
|
| with gr.Row():
|
| index_rate2 = gr.Slider(
|
| label=_i18n("index_rate"),
|
| info=_i18n("index_rate_info"),
|
| minimum=self.index_rates_values[0],
|
| maximum=self.index_rates_values[1],
|
| step=0.05,
|
| value=0,
|
| interactive=True,
|
| )
|
| filter_radius2 = gr.Slider(
|
| label=_i18n("filter_radius"),
|
| info=_i18n("filter_radius_info"),
|
| minimum=self.filter_radius_values[0],
|
| maximum=self.filter_radius_values[1],
|
| step=1,
|
| value=3,
|
| interactive=True,
|
| )
|
| with gr.Row():
|
| rms2 = gr.Slider(
|
| label=_i18n("rms_envelope"),
|
| info=_i18n("rms_info"),
|
| minimum=self.rms_values[0],
|
| maximum=self.rms_values[1],
|
| step=0.05,
|
| value=0.25,
|
| interactive=True,
|
| )
|
| protect2 = gr.Slider(
|
| label=_i18n("protect"),
|
| info=_i18n("protect_info"),
|
| minimum=self.protect_values[0],
|
| maximum=self.protect_values[1],
|
| step=0.05,
|
| value=0.35,
|
| interactive=True,
|
| )
|
| with gr.Accordion(label=_i18n("f0_range"), open=False):
|
| with gr.Group():
|
| with gr.Row():
|
| f0_min2 = gr.Slider(
|
| label=_i18n("f0_min"),
|
| minimum=self.f0_min_values[0],
|
| maximum=self.f0_min_values[1],
|
| step=10,
|
| value=50,
|
| interactive=True,
|
| )
|
| f0_max2 = gr.Slider(
|
| label=_i18n("f0_max"),
|
| minimum=self.f0_max_values[0],
|
| maximum=self.f0_max_values[1],
|
| step=10,
|
| value=1100,
|
| interactive=True,
|
| )
|
| with gr.Accordion(label=_i18n("embedder"), open=False):
|
| with gr.Group():
|
| embedder_name2 = gr.Radio(
|
| label=_i18n("hubert_model"),
|
| choices=self.fairseq_embedders,
|
| value=self.fairseq_embedders[0] if self.fairseq_embedders else "hubert_base",
|
| )
|
| transformers_mode2 = gr.Checkbox(
|
| label=_i18n("use_transformers"),
|
| value=False,
|
| interactive=True,
|
| )
|
|
|
| @transformers_mode2.change(
|
| inputs=[transformers_mode2],
|
| outputs=[embedder_name2]
|
| )
|
| def change_embedders(tr_m: bool) -> gr.update:
|
| if tr_m:
|
| return gr.update(
|
| value=self.transformers_embedders[0] if self.transformers_embedders else None,
|
| choices=self.transformers_embedders,
|
| )
|
| else:
|
| return gr.update(
|
| choices=self.fairseq_embedders,
|
| value=self.fairseq_embedders[0] if self.fairseq_embedders else None,
|
| )
|
|
|
| with gr.Group():
|
| model_list_refresh_btn = gr.Button(
|
| _i18n("refresh_models"),
|
| variant="secondary",
|
| interactive=True
|
| )
|
|
|
| @model_list_refresh_btn.click(outputs=[model_name1, model_name2])
|
| def refresh_list_voice_models() -> Tuple[gr.update, gr.update]:
|
| models = model_manager.parse_voice_models()
|
| first_model = models[0] if models else None
|
| return (
|
| gr.update(choices=models, value=first_model),
|
| gr.update(choices=models, value=first_model)
|
| )
|
|
|
| stereo_mode_duet = gr.Radio(
|
| choices=["mono", "left/right", "sim/dif"],
|
| label=_i18n("stereo_mode"),
|
| info=_i18n("stereo_mode_info"),
|
| value="mono",
|
| interactive=True,
|
| )
|
| alt_pl_duet = gr.Checkbox(
|
| label=_i18n("alt_pipeline"),
|
| info=_i18n("alt_pipeline_info"),
|
| value=False,
|
| interactive=True,
|
| )
|
| mix_duet = gr.Checkbox(
|
| label=_i18n("mix_voices"),
|
| value=False,
|
| interactive=True,
|
| )
|
| mix_duet_ratio = gr.Slider(
|
| label=_i18n("voice_balance"),
|
| info=_i18n("voice_balance_info"),
|
| minimum=-1,
|
| maximum=1,
|
| step=0.05,
|
| value=0,
|
| interactive=True,
|
| visible=False
|
| )
|
|
|
| output_format_duet = gr.Dropdown(
|
| label=_i18n("output_format"),
|
| interactive=True,
|
| choices=output_formats,
|
| value=output_formats[0] if output_formats else "wav",
|
| filterable=False,
|
| )
|
| convert_btn_duet = gr.Button(
|
| _i18n("convert_btn"),
|
| variant="primary",
|
| interactive=True
|
| )
|
|
|
| with gr.Row(equal_height=True):
|
| output_duet_audio_1 = gr.Audio(
|
| label=_i18n("model_1_result"),
|
| type="filepath",
|
| interactive=False,
|
| show_download_button=True,
|
| )
|
| output_duet_audio_2 = gr.Audio(
|
| label=_i18n("model_2_result"),
|
| type="filepath",
|
| interactive=False,
|
| show_download_button=True,
|
| )
|
|
|
| @mix_duet.change(
|
| inputs=mix_duet,
|
| outputs=[mix_duet_ratio, output_duet_audio_1, output_duet_audio_2]
|
| )
|
| def mix_duet_change_fn(x: bool) -> Tuple[gr.update, gr.update, gr.update]:
|
| if x:
|
| return (
|
| gr.update(visible=x),
|
| gr.update(label=_i18n("mixed_result"), value=None),
|
| gr.update(visible=False, value=None)
|
| )
|
| else:
|
| return (
|
| gr.update(visible=x),
|
| gr.update(label=_i18n("model_1_result"), value=None),
|
| gr.update(visible=True, value=None)
|
| )
|
|
|
| @convert_btn_duet.click(
|
| inputs=[
|
| list_input_files_duet,
|
| model_name1, model_name2,
|
| pitch_method1, pitch_method2,
|
| pitch1, pitch2,
|
| hop_length1, hop_length2,
|
| index_rate1, index_rate2,
|
| filter_radius1, filter_radius2,
|
| rms1, rms2,
|
| protect1, protect2,
|
| f0_min1, f0_min2,
|
| f0_max1, f0_max2,
|
| output_format_duet,
|
| stereo_mode_duet,
|
| alt_pl_duet,
|
| embedder_name1, embedder_name2,
|
| transformers_mode1, transformers_mode2,
|
| mix_duet, mix_duet_ratio
|
| ],
|
| outputs=[output_duet_audio_1, output_duet_audio_2],
|
| queue=True
|
| )
|
| def vbach_convert_duet_fn(
|
| input_file: Optional[str],
|
| model_name1: str,
|
| model_name2: str,
|
| pitch_method1: str,
|
| pitch_method2: str,
|
| pitch1: float,
|
| pitch2: float,
|
| hop_length1: int,
|
| hop_length2: int,
|
| index_rate1: float,
|
| index_rate2: float,
|
| filter_radius1: int,
|
| filter_radius2: int,
|
| rms1: float,
|
| rms2: float,
|
| protect1: float,
|
| protect2: float,
|
| f0_min1: int,
|
| f0_min2: int,
|
| f0_max1: int,
|
| f0_max2: int,
|
| output_format: str,
|
| stereo_mode: str,
|
| alt_pipeline: bool,
|
| embedder_name1: str,
|
| embedder_name2: str,
|
| transformers_mode1: bool,
|
| transformers_mode2: bool,
|
| mix_duet: bool,
|
| mix_duet_ratio: float
|
| ) -> Tuple[Optional[Dict], Optional[Dict], gr.update]:
|
| vbach_duet_ = self.vbach_convert_duet_zero_gpu if zerogpu_available else self.vbach_convert_duet_zero_gpu
|
| return vbach_duet_(
|
| input_file=input_file,
|
| model_name1=model_name1,
|
| model_name2=model_name2,
|
| pitch_method1=pitch_method1,
|
| pitch_method2=pitch_method2,
|
| pitch1=pitch1,
|
| pitch2=pitch2,
|
| hop_length1=hop_length1,
|
| hop_length2=hop_length2,
|
| index_rate1=index_rate1,
|
| index_rate2=index_rate2,
|
| filter_radius1=filter_radius1,
|
| filter_radius2=filter_radius2,
|
| rms1=rms1,
|
| rms2=rms2,
|
| protect1=protect1,
|
| protect2=protect2,
|
| f0_min1=f0_min1,
|
| f0_min2=f0_min2,
|
| f0_max1=f0_max1,
|
| f0_max2=f0_max2,
|
| output_format=output_format,
|
| stereo_mode=stereo_mode,
|
| alt_pipeline=alt_pipeline,
|
| embedder_name1=embedder_name1,
|
| embedder_name2=embedder_name2,
|
| transformers_mode1=transformers_mode1,
|
| transformers_mode2=transformers_mode2,
|
| mix_duet=mix_duet,
|
| mix_duet_ratio=mix_duet_ratio
|
| )
|
|
|
| with gr.TabItem(_i18n("tab_manager")):
|
| with gr.TabItem(_i18n("tab_download_url")):
|
| with gr.TabItem(_i18n("tab_zip")):
|
| with gr.Group():
|
| url_zip = gr.Textbox(
|
| label=_i18n("zip_url"),
|
| interactive=True
|
| )
|
| url_zip_model_name = gr.Textbox(
|
| label=_i18n("model_name"),
|
| interactive=True
|
| )
|
| url_zip_download_btn = gr.Button(
|
| _i18n("download_btn"),
|
| variant="primary",
|
| interactive=True
|
| )
|
| url_zip_output = gr.Textbox(
|
| label=_i18n("status"),
|
| interactive=False,
|
| lines=5
|
| )
|
|
|
| url_zip_download_btn.click(
|
| lambda x, y: model_manager.install_model_zip(
|
| x,
|
| namer.short(
|
| namer.sanitize(y), length=40
|
| ),
|
| "url",
|
| ),
|
| inputs=[url_zip, url_zip_model_name],
|
| outputs=url_zip_output,
|
| )
|
|
|
| with gr.TabItem(_i18n("tab_files")):
|
| with gr.Group():
|
| url_pth = gr.Textbox(
|
| label=_i18n("pth_url"),
|
| interactive=True
|
| )
|
| url_index = gr.Textbox(
|
| label=_i18n("index_url_optional"),
|
| interactive=True
|
| )
|
| url_file_model_name = gr.Textbox(
|
| label=_i18n("model_name"),
|
| interactive=True
|
| )
|
| url_file_download_btn = gr.Button(
|
| _i18n("download_btn"),
|
| variant="primary",
|
| interactive=True
|
| )
|
| url_file_output = gr.Textbox(
|
| label=_i18n("status"),
|
| interactive=False,
|
| lines=5
|
| )
|
|
|
| url_file_download_btn.click(
|
| lambda x, y, z: model_manager.install_model_files(
|
| x,
|
| y,
|
| namer.short(
|
| namer.sanitize(z), length=40
|
| ),
|
| "url",
|
| ),
|
| inputs=[url_index, url_pth, url_file_model_name],
|
| outputs=url_file_output,
|
| )
|
|
|
| with gr.Tab(_i18n("tab_upload_local")):
|
| with gr.TabItem(_i18n("tab_zip")):
|
| with gr.Group():
|
| local_zip = gr.File(
|
| label=_i18n("zip_file"),
|
| file_types=[".zip"],
|
| file_count="single",
|
| interactive=True
|
| )
|
| local_zip_model_name = gr.Textbox(
|
| label=_i18n("model_name"),
|
| interactive=True
|
| )
|
| local_zip_upload_btn = gr.Button(
|
| _i18n("upload_btn"),
|
| variant="primary",
|
| interactive=True
|
| )
|
| local_zip_output = gr.Textbox(
|
| label=_i18n("status"),
|
| interactive=False,
|
| lines=5
|
| )
|
|
|
| local_zip_upload_btn.click(
|
| lambda x, y: model_manager.install_model_zip(
|
| x,
|
| namer.short(
|
| namer.sanitize(y), length=40
|
| ),
|
| "local",
|
| ),
|
| inputs=[local_zip, local_zip_model_name],
|
| outputs=local_zip_output,
|
| )
|
|
|
| with gr.TabItem(_i18n("tab_files")):
|
| with gr.Group():
|
| with gr.Row():
|
| local_pth = gr.File(
|
| label=_i18n("pth_file"),
|
| file_types=[".pth"],
|
| file_count="single",
|
| interactive=True
|
| )
|
| local_index = gr.File(
|
| label=_i18n("index_file_optional"),
|
| file_types=[".index"],
|
| file_count="single",
|
| interactive=True
|
| )
|
| local_file_model_name = gr.Textbox(
|
| label=_i18n("model_name"),
|
| interactive=True
|
| )
|
| local_file_upload_btn = gr.Button(
|
| _i18n("upload_btn"),
|
| variant="primary",
|
| interactive=True
|
| )
|
| local_file_output = gr.Textbox(
|
| label=_i18n("status"),
|
| interactive=False,
|
| lines=5
|
| )
|
|
|
| local_file_upload_btn.click(
|
| lambda x, y, z: model_manager.install_model_files(
|
| x,
|
| y,
|
| namer.short(
|
| namer.sanitize(z), length=40
|
| ),
|
| "local",
|
| ),
|
| inputs=[local_index, local_pth, local_file_model_name],
|
| outputs=local_file_output,
|
| )
|
|
|
| with gr.TabItem(_i18n("tab_delete_model")):
|
| with gr.Group():
|
| delete_model_name = gr.Dropdown(
|
| label=_i18n("model_name"),
|
| choices=model_manager.parse_voice_models(),
|
| interactive=True,
|
| filterable=False,
|
| )
|
| delete_refresh_btn = gr.Button(
|
| _i18n("refresh"),
|
| interactive=True
|
| )
|
| delete_btn = gr.Button(
|
| _i18n("delete"),
|
| variant="stop",
|
| interactive=True
|
| )
|
|
|
| @delete_refresh_btn.click(
|
| inputs=None, outputs=delete_model_name
|
| )
|
| def refresh_list_voice_models() -> gr.update:
|
| models = model_manager.parse_voice_models()
|
| first_model = models[0] if models else None
|
| return gr.update(choices=models, value=first_model)
|
|
|
| delete_output = gr.Textbox(
|
| label=_i18n("status"),
|
| interactive=False,
|
| lines=5
|
| )
|
|
|
| delete_btn.click(
|
| fn=model_manager.del_voice_model,
|
| inputs=delete_model_name,
|
| outputs=delete_output,
|
| )
|
|
|
| @gr.on(
|
| inputs=None,
|
| outputs=[delete_model_name, model_name, model_name1, model_name2]
|
| )
|
| def refresh_all_models() -> Tuple[gr.update, gr.update, gr.update, gr.update]:
|
| models = model_manager.parse_voice_models()
|
| first_model = models[0] if models else None
|
| return (
|
| gr.update(choices=models, value=first_model),
|
| gr.update(choices=models, value=first_model),
|
| gr.update(choices=models, value=first_model),
|
| gr.update(choices=models, value=first_model)
|
| )
|
|
|
| return vbach_app
|
|
|
|
|
| if __name__ == "__main__":
|
| parser = argparse.ArgumentParser(description="Vbach - RVC форк")
|
|
|
|
|
| subparsers = parser.add_subparsers(dest="mode", help=_i18n("mode"), required=True)
|
|
|
|
|
| cli_parser = subparsers.add_parser("cli", help=_i18n("cli_mode"))
|
| cli_parser.add_argument("--input", nargs="*", help=_i18n("input_path_help"))
|
| cli_parser.add_argument(
|
| "--output_dir", type=str, required=True, help=_i18n("output_dir_help")
|
| )
|
| cli_parser.add_argument(
|
| "--output_format",
|
| type=str,
|
| default="wav",
|
| choices=output_formats,
|
| help=_i18n("output_format_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--output_bitrate", type=str, default="320k", help=_i18n("output_bitrate_help")
|
| )
|
| cli_parser.add_argument(
|
| "--format_name",
|
| action="store_true",
|
| help=_i18n("format_name_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--output_name",
|
| type=str,
|
| default="NAME_STEM",
|
| help=_i18n("output_name_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--model_name",
|
| type=str,
|
| default="model",
|
| help=_i18n("model_name_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--index_rate",
|
| type=float,
|
| default=0,
|
| help=_i18n("index_rate_help"),
|
| metavar="[0.0-1.0]",
|
| )
|
| cli_parser.add_argument(
|
| "--stereo_mode",
|
| type=str,
|
| default="mono",
|
| choices=["mono", "left/right", "sim/dif"],
|
| help=_i18n("stereo_mode_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--method_pitch",
|
| type=str,
|
| default="rmvpe+",
|
| help=_i18n("f0_method_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--pitch", type=int, default=0, help=_i18n("pitch_help")
|
| )
|
| cli_parser.add_argument(
|
| "--hop_length",
|
| type=int,
|
| default=128,
|
| help=_i18n("hop_length_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--filter_radius", type=int, default=3, help=_i18n("filter_radius_help")
|
| )
|
| cli_parser.add_argument(
|
| "--rms",
|
| type=float,
|
| default=0.25,
|
| help=_i18n("rms_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--protect", type=float, default=0.33, help=_i18n("protect_help")
|
| )
|
| cli_parser.add_argument(
|
| "--f0_min", type=int, default=50, help=_i18n("f0_min_help")
|
| )
|
| cli_parser.add_argument(
|
| "--f0_max", type=int, default=1100, help=_i18n("f0_max_help")
|
| )
|
| cli_parser.add_argument(
|
| "--alt_pipeline",
|
| action="store_true",
|
| help=_i18n("alt_pipeline_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--use_transformers",
|
| action="store_true",
|
| help=_i18n("use_transformers_help"),
|
| )
|
| cli_parser.add_argument(
|
| "--embedder_name",
|
| type=str,
|
| default="hubert_base",
|
| help=_i18n("embedder_name_help"),
|
| )
|
|
|
|
|
| app_parser = subparsers.add_parser("app", help=_i18n("app_mode"))
|
| app_parser.add_argument(
|
| "--port",
|
| type=int,
|
| default=7860,
|
| help=_i18n("port_help")
|
| )
|
| app_parser.add_argument(
|
| "--share",
|
| action="store_true",
|
| help=_i18n("share_help"),
|
| )
|
| app_parser.add_argument(
|
| "--debug",
|
| action="store_true",
|
| help=_i18n("debug_help"),
|
| )
|
|
|
# "model_manager" subcommand tree: install / list / remove voice models.
model_manager_parser = subparsers.add_parser(
    "model_manager", help=_i18n("model_manager_help")
)
vbach_model_manager_parser = model_manager_parser.add_subparsers(
    title="vbach_commands", dest="vbach_command", required=True
)

# install_local: model files already present on disk.
install_local_parser = vbach_model_manager_parser.add_parser(
    "install_local", help=_i18n("install_local_help")
)
install_local_parser.add_argument("--model_name", required=True, help=_i18n("model_name_help"))
install_local_parser.add_argument("--pth", required=True, help=_i18n("pth_path_help"))
install_local_parser.add_argument("--index", required=False, help=_i18n("index_path_help"))

# install_url_zip: a single remote ZIP archive containing the model.
install_url_zip_parser = vbach_model_manager_parser.add_parser(
    "install_url_zip", help=_i18n("install_url_zip_help")
)
install_url_zip_parser.add_argument("--model_name", required=True, help=_i18n("model_name_help"))
install_url_zip_parser.add_argument("--url", required=True, help=_i18n("zip_url_help"))

# install_url_files: individual remote URLs for the .pth and optional index.
install_url_files_parser = vbach_model_manager_parser.add_parser(
    "install_url_files", help=_i18n("install_url_files_help")
)
install_url_files_parser.add_argument("--model_name", required=True, help=_i18n("model_name_help"))
install_url_files_parser.add_argument("--pth_url", required=True, help=_i18n("pth_url_help"))
install_url_files_parser.add_argument("--index_url", required=False, help=_i18n("index_url_help"))

# list: enumerate installed models (takes no options).
list_parser = vbach_model_manager_parser.add_parser("list", help=_i18n("list_models_help"))

# remove: delete an installed model by name.
remove_voice_model = vbach_model_manager_parser.add_parser(
    "remove", help=_i18n("remove_model_help")
)
remove_voice_model.add_argument("--model_name", required=True, help=_i18n("model_name_help"))
|
|
|
# Parse the selected subcommand and dispatch on args.mode.
args = parser.parse_args()

if args.mode == "cli":
    # Batch inference: convert every valid audio file resolved from --input.
    if not args.input:
        cli_parser.error(_i18n("input_required"))

    list_valid_files = get_files_from_list(args.input)
    if list_valid_files:
        for i, vocals_file in enumerate(list_valid_files, start=1):
            # Progress line: "processing file i/total: <path>".
            print(_i18n('processing_file', current=i, total=len(list_valid_files), file=vocals_file))
            vbach_inference(
                input_file=vocals_file,
                model_name=args.model_name,
                output_dir=args.output_dir,
                output_name=args.output_name,
                output_bitrate=args.output_bitrate,
                output_format=args.output_format,
                pitch=args.pitch,
                method_pitch=args.method_pitch,
                # With multiple inputs, auto-naming is forced on so outputs
                # don't collide; a single file honors the user's --format_name.
                format_name=(True if len(list_valid_files) > 1 else args.format_name),
                # Tuning knobs forwarded as a single dict of extra parameters.
                add_params={
                    "index_rate": args.index_rate,
                    "filter_radius": args.filter_radius,
                    "protect": args.protect,
                    "rms": args.rms,
                    "mangio_crepe_hop_length": args.hop_length,
                    "f0_min": args.f0_min,
                    "f0_max": args.f0_max,
                    "stereo_mode": args.stereo_mode,
                },
                pipeline_mode="alt" if args.alt_pipeline else "orig",
                embedder_name=args.embedder_name,
                # Embedding stack: HF transformers vs. bundled fairseq loader.
                stack="transformers" if args.use_transformers else "fairseq",
                device=set_device()
            )
    else:
        # No usable input files resolved — exit non-zero.
        sys.exit(1)

elif args.mode == "app":
    # Launch the Gradio UI bound to all interfaces.
    # NOTE(review): allowed_paths=["/"] exposes the entire filesystem to the
    # web UI, which is risky when combined with --share — confirm intended.
    Vbach(user_directory, set_device(0)).UI().launch(
        server_name="0.0.0.0",
        server_port=args.port,
        share=args.share,
        allowed_paths=["/"],
        debug=args.debug,
        inbrowser=True
    )

elif args.mode == "model_manager":
    # Dispatch on the nested model-manager subcommand.
    if args.vbach_command == "install_local":
        status = model_manager.install_model_files(
            args.index, args.pth, args.model_name, mode="local"
        )
        print(status)

    elif args.vbach_command == "install_url_zip":
        status = model_manager.install_model_zip(
            args.url, args.model_name, mode="url"
        )
        print(status)

    elif args.vbach_command == "install_url_files":
        status = model_manager.install_model_files(
            args.index_url, args.pth_url, args.model_name, mode="url"
        )
        print(status)

    elif args.vbach_command == "list":
        # Presumably prints the model list itself; its return value is unused.
        model_manager.get_list_installed_models()

    elif args.vbach_command == "remove":
        status = model_manager.del_voice_model(args.model_name)
        # NOTE(review): source formatting made this print's indentation
        # ambiguous; placed inside the "remove" branch, since placing it after
        # the if/elif chain would raise NameError on the "list" path — confirm.
        print(status)
|
|
|
|
|