| | import audioread |
| | import librosa |
| |
|
| | import os |
| | import sys |
| | import json |
| | import time |
| | from tqdm import tqdm |
| | import pickle |
| | import hashlib |
| | import logging |
| | import traceback |
| | import shutil |
| | import soundfile as sf |
| |
|
| | import torch |
| |
|
| | from gui_data.constants import * |
| | from gui_data.old_data_check import file_check, remove_unneeded_yamls, remove_temps |
| | from lib_v5.vr_network.model_param_init import ModelParameters |
| | from lib_v5 import spec_utils |
| | from pathlib import Path |
| | from separate import SeperateAttributes, SeperateDemucs, SeperateMDX, SeperateVR, save_format |
| | from typing import List |
| |
|
| |
|
# Configure the root logger once, at import time, and record that start-up began.
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
logging.info('UVR BEGIN')

# Identifier of an earlier Windows patch release.
# NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
PREVIOUS_PATCH_WIN = 'UVR_Patch_1_12_23_14_54'

# Module-level flags; nothing in the visible part of this file reads them.
# NOTE(review): presumably left over from the GUI (drag-and-drop support / banner layout) — confirm.
is_dnd_compatible = True
banner_placement = -2
| |
|
def save_data(data):
    """
    Persist the given settings to ``data.pkl`` in the current working directory.

    Parameters:
        data(dict):
            Dictionary containing all the necessary data to save
    """

    # The file is opened in binary write mode, so any previous content is replaced.
    with open('data.pkl', 'wb') as pkl_file:
        pickle.Pickler(pkl_file).dump(data)
| |
|
def load_data() -> dict:
    """
    Loads saved pkl file and returns the stored data

    When ``data.pkl`` is missing, empty, truncated or otherwise not a valid
    pickle, the file is re-created from ``DEFAULT_DATA`` and loaded again.

    Returns(dict):
        Dictionary containing all the saved data
    """
    # NOTE(review): unpickling can execute arbitrary code. data.pkl must only
    # ever be a file written by save_data(); never load one from an untrusted source.
    try:
        with open('data.pkl', 'rb') as data_file:
            return pickle.load(data_file)
    except (ValueError, FileNotFoundError, EOFError, pickle.UnpicklingError):
        # EOFError covers an empty/truncated file and UnpicklingError a corrupted
        # one; previously both escaped and aborted the import of this module.
        save_data(data=DEFAULT_DATA)

        # Re-read from disk so the caller gets its own copy, not DEFAULT_DATA itself.
        return load_data()
| |
|
def load_model_hash_data(dictionary):
    """Read the JSON file at path ``dictionary`` and return the parsed content."""

    with open(dictionary) as json_file:
        return json.load(json_file)
| |
|
| | |
| | |
# Resolve the application base directory.
# When the interpreter reports itself as "frozen" (sys.frozen is set, as bundlers
# do), the bundle's extraction directory (sys._MEIPASS) is used; otherwise the
# directory containing this source file is used.
if getattr(sys, 'frozen', False):
    BASE_PATH = sys._MEIPASS
else:
    BASE_PATH = os.path.dirname(os.path.abspath(__file__))

# Make BASE_PATH the working directory so relative paths used in this module
# (for example 'data.pkl') resolve inside it.
os.chdir(BASE_PATH)
| |
|
# Module-level list; nothing in the visible part of this file appends to it.
debugger = []

# --- Model directories (all rooted at BASE_PATH/models) ---
MODELS_DIR = os.path.join(BASE_PATH, 'models')
VR_MODELS_DIR = os.path.join(MODELS_DIR, 'VR_Models')
MDX_MODELS_DIR = os.path.join(MODELS_DIR, 'MDX_Net_Models')
DEMUCS_MODELS_DIR = os.path.join(MODELS_DIR, 'Demucs_Models')
DEMUCS_NEWER_REPO_DIR = os.path.join(DEMUCS_MODELS_DIR, 'v3_v4_repo')
MDX_MIXER_PATH = os.path.join(BASE_PATH, 'lib_v5', 'mixer.ckpt')

# --- Model metadata, saved presets and temporary working folders ---
# The *_HASH_DIR folders hold per-model "<md5>.json" parameter files and the
# *_HASH_JSON files hold the bundled hash -> parameters tables
# (see ModelData.get_model_data).
VR_HASH_DIR = os.path.join(VR_MODELS_DIR, 'model_data')
VR_HASH_JSON = os.path.join(VR_MODELS_DIR, 'model_data', 'model_data.json')
MDX_HASH_DIR = os.path.join(MDX_MODELS_DIR, 'model_data')
MDX_HASH_JSON = os.path.join(MDX_MODELS_DIR, 'model_data', 'model_data.json')
# File-name -> display-name tables used to resolve a chosen model name to a file.
DEMUCS_MODEL_NAME_SELECT = os.path.join(DEMUCS_MODELS_DIR, 'model_data', 'model_name_mapper.json')
MDX_MODEL_NAME_SELECT = os.path.join(MDX_MODELS_DIR, 'model_data', 'model_name_mapper.json')
ENSEMBLE_CACHE_DIR = os.path.join(BASE_PATH, 'gui_data', 'saved_ensembles')
SETTINGS_CACHE_DIR = os.path.join(BASE_PATH, 'gui_data', 'saved_settings')
VR_PARAM_DIR = os.path.join(BASE_PATH, 'lib_v5', 'vr_network', 'modelparams')
# Scratch folders; both are cleaned and re-created at import time further down.
SAMPLE_CLIP_PATH = os.path.join(BASE_PATH, 'temp_sample_clips')
ENSEMBLE_TEMP_PATH = os.path.join(BASE_PATH, 'ensemble_temps')

# --- GUI image and font assets ---
ICON_IMG_PATH = os.path.join(BASE_PATH, 'gui_data', 'img', 'GUI-Icon.ico')
FONT_PATH = os.path.join(BASE_PATH, 'gui_data', 'fonts', 'centurygothic', 'GOTHIC.TTF')

# --- Miscellaneous data files ---
COMPLETE_CHIME = os.path.join(BASE_PATH, 'gui_data', 'complete_chime.wav')
FAIL_CHIME = os.path.join(BASE_PATH, 'gui_data', 'fail_chime.wav')
CHANGE_LOG = os.path.join(BASE_PATH, 'gui_data', 'change_log.txt')
SPLASH_DOC = os.path.join(BASE_PATH, 'tmp', 'splash.txt')
| |
|
# Import-time housekeeping, delegated to helpers from gui_data.old_data_check.
# NOTE(review): presumably file_check() moves content from a legacy folder (first
# argument) into its current location (second argument) — confirm in that module.
file_check(os.path.join(MODELS_DIR, 'Main_Models'), VR_MODELS_DIR)
file_check(os.path.join(DEMUCS_MODELS_DIR, 'v3_repo'), DEMUCS_NEWER_REPO_DIR)
remove_unneeded_yamls(DEMUCS_MODELS_DIR)

# NOTE(review): presumably deletes leftovers from a previous run — confirm in gui_data.old_data_check.
remove_temps(ENSEMBLE_TEMP_PATH)
remove_temps(SAMPLE_CLIP_PATH)
remove_temps(os.path.join(BASE_PATH, 'img'))

# Make sure the two scratch folders exist before anything writes to them.
if not os.path.isdir(ENSEMBLE_TEMP_PATH):
    os.mkdir(ENSEMBLE_TEMP_PATH)

if not os.path.isdir(SAMPLE_CLIP_PATH):
    os.mkdir(SAMPLE_CLIP_PATH)

# Cache of model file path -> MD5 hash, filled by ModelData.get_model_hash().
model_hash_table = {}
# Saved application settings; load_data() falls back to DEFAULT_DATA when no file exists.
data = load_data()
| | |
class ModelData():
    """Resolved settings for one separation model.

    Construction reads the current option values from the module-level ``root``
    object (defined outside this class), locates the model file for the selected
    architecture, identifies it by MD5 hash and loads the matching parameter set.

    ``model_status`` ends up ``False`` when the model name is a placeholder
    selection, the model file is missing, no parameters are known for the
    model's hash, or the process method is not one of the supported
    architectures.
    """

    def __init__(self, model_name: str,
                 selected_process_method=ENSEMBLE_MODE,
                 is_secondary_model=False,
                 primary_model_primary_stem=None,
                 is_primary_model_primary_stem_only=False,
                 is_primary_model_secondary_stem_only=False,
                 is_pre_proc_model=False,
                 is_dry_check=False):
        """Build the model description.

        Parameters:
            model_name(str):
                Name of the model. In ensemble mode the name carries the
                architecture in front of ``ENSEMBLE_PARTITION``.
            selected_process_method:
                Architecture constant, or ``ENSEMBLE_MODE`` to take the
                architecture from ``model_name``.
            is_secondary_model(bool):
                True when this instance describes a secondary model; secondary
                and pre-process models are then not resolved for it.
            primary_model_primary_stem:
                Stored unchanged on the instance.
            is_primary_model_primary_stem_only(bool):
                Stored unchanged on the instance.
            is_primary_model_secondary_stem_only(bool):
                Stored unchanged on the instance.
            is_pre_proc_model(bool):
                True when this instance describes a pre-process model.
            is_dry_check(bool):
                Stored unchanged on the instance.
        """

        # Options shared by every architecture.
        self.is_gpu_conversion = 0 if root.is_gpu_conversion_var.get() else -1
        self.is_normalization = root.is_normalization_var.get()
        self.is_primary_stem_only = root.is_primary_stem_only_var.get()
        self.is_secondary_stem_only = root.is_secondary_stem_only_var.get()
        self.is_denoise = root.is_denoise_var.get()
        self.mdx_batch_size = 1 if root.mdx_batch_size_var.get() == DEF_OPT else int(root.mdx_batch_size_var.get())
        self.is_mdx_ckpt = False
        self.wav_type_set = root.wav_type_set
        self.mp3_bit_set = root.mp3_bit_set_var.get()
        self.save_format = root.save_format_var.get()
        self.is_invert_spec = root.is_invert_spec_var.get()
        self.is_mixer_mode = root.is_mixer_mode_var.get()
        self.demucs_stems = root.demucs_stems_var.get()
        self.demucs_source_list = []
        self.demucs_stem_count = 0
        self.mixer_path = MDX_MIXER_PATH
        self.model_name = model_name
        self.process_method = selected_process_method
        self.model_status = False if self.model_name == CHOOSE_MODEL or self.model_name == NO_MODEL else True
        self.primary_stem = None
        self.secondary_stem = None
        self.is_ensemble_mode = False
        self.ensemble_primary_stem = None
        self.ensemble_secondary_stem = None
        self.primary_model_primary_stem = primary_model_primary_stem
        self.is_secondary_model = is_secondary_model
        self.secondary_model = None
        self.secondary_model_scale = None
        self.demucs_4_stem_added_count = 0
        self.is_demucs_4_stem_secondaries = False
        self.is_4_stem_ensemble = False
        self.pre_proc_model = None
        self.pre_proc_model_activated = False
        self.is_pre_proc_model = is_pre_proc_model
        self.is_dry_check = is_dry_check
        self.model_samplerate = 44100
        self.model_capacity = 32, 128
        self.is_vr_51_model = False
        self.is_demucs_pre_proc_model_inst_mix = False
        self.manual_download_Button = None
        self.secondary_model_4_stem = []
        self.secondary_model_4_stem_scale = []
        self.secondary_model_4_stem_names = []
        self.secondary_model_4_stem_model_names_list = []
        self.all_models = []
        self.secondary_model_other = None
        self.secondary_model_scale_other = None
        self.secondary_model_bass = None
        self.secondary_model_scale_bass = None
        self.secondary_model_drums = None
        self.secondary_model_scale_drums = None

        # Defaults for values that only the architecture branches below assign.
        # Without them an unrecognised process method ended in an AttributeError.
        self.model_path = None
        self.is_secondary_model_activated = False

        if selected_process_method == ENSEMBLE_MODE:
            # The ensemble entry is "<architecture><ENSEMBLE_PARTITION><model name>".
            partitioned_name = model_name.partition(ENSEMBLE_PARTITION)
            self.process_method = partitioned_name[0]
            self.model_name = partitioned_name[2]
            self.model_and_process_tag = model_name
            self.ensemble_primary_stem, self.ensemble_secondary_stem = root.return_ensemble_stems()
            self.is_ensemble_mode = True if not is_secondary_model and not is_pre_proc_model else False
            self.is_4_stem_ensemble = True if root.ensemble_main_stem_var.get() == FOUR_STEM_ENSEMBLE and self.is_ensemble_mode else False
            self.pre_proc_model_activated = root.is_demucs_pre_proc_model_activate_var.get() if not self.ensemble_primary_stem == VOCAL_STEM else False

        if self.process_method == VR_ARCH_TYPE:
            self.is_secondary_model_activated = root.vr_is_secondary_model_activate_var.get() if not self.is_secondary_model else False
            self.aggression_setting = float(int(root.aggression_setting_var.get())/100)
            self.is_tta = root.is_tta_var.get()
            self.is_post_process = root.is_post_process_var.get()
            self.window_size = int(root.window_size_var.get())
            self.batch_size = 1 if root.batch_size_var.get() == DEF_OPT else int(root.batch_size_var.get())
            self.crop_size = int(root.crop_size_var.get())
            self.is_high_end_process = 'mirroring' if root.is_high_end_process_var.get() else 'None'
            self.post_process_threshold = float(root.post_process_threshold_var.get())
            self.model_capacity = 32, 128
            self.model_path = os.path.join(VR_MODELS_DIR, f"{self.model_name}.pth")
            self.get_model_hash()
            if self.model_hash:
                self.model_data = self.get_model_data(VR_HASH_DIR, root.vr_hash_MAPPER) if not self.model_hash == WOOD_INST_MODEL_HASH else WOOD_INST_PARAMS
                if self.model_data:
                    vr_model_param = os.path.join(VR_PARAM_DIR, "{}.json".format(self.model_data["vr_model_param"]))
                    self.primary_stem = self.model_data["primary_stem"]
                    self.secondary_stem = STEM_PAIR_MAPPER[self.primary_stem]
                    self.vr_model_param = ModelParameters(vr_model_param)
                    self.model_samplerate = self.vr_model_param.param['sr']
                    if "nout" in self.model_data.keys() and "nout_lstm" in self.model_data.keys():
                        self.model_capacity = self.model_data["nout"], self.model_data["nout_lstm"]
                        self.is_vr_51_model = True
                else:
                    # No parameters are known for this file, so it cannot be used.
                    self.model_status = False

        if self.process_method == MDX_ARCH_TYPE:
            self.is_secondary_model_activated = root.mdx_is_secondary_model_activate_var.get() if not is_secondary_model else False
            self.margin = int(root.margin_var.get())
            self.chunks = root.determine_auto_chunks(root.chunks_var.get(), self.is_gpu_conversion) if root.is_chunk_mdxnet_var.get() else 0
            self.get_mdx_model_path()
            self.get_model_hash()
            if self.model_hash:
                self.model_data = self.get_model_data(MDX_HASH_DIR, root.mdx_hash_MAPPER)
                if self.model_data:
                    self.compensate = self.model_data["compensate"] if root.compensate_var.get() == AUTO_SELECT else float(root.compensate_var.get())
                    self.mdx_dim_f_set = self.model_data["mdx_dim_f_set"]
                    self.mdx_dim_t_set = self.model_data["mdx_dim_t_set"]
                    self.mdx_n_fft_scale_set = self.model_data["mdx_n_fft_scale_set"]
                    self.primary_stem = self.model_data["primary_stem"]
                    self.secondary_stem = STEM_PAIR_MAPPER[self.primary_stem]
                else:
                    self.model_status = False

        if self.process_method == DEMUCS_ARCH_TYPE:
            self.is_secondary_model_activated = root.demucs_is_secondary_model_activate_var.get() if not is_secondary_model else False
            if not self.is_ensemble_mode:
                self.pre_proc_model_activated = root.is_demucs_pre_proc_model_activate_var.get() if not root.demucs_stems_var.get() in [VOCAL_STEM, INST_STEM] else False
            self.overlap = float(root.overlap_var.get())
            self.margin_demucs = int(root.margin_demucs_var.get())
            self.chunks_demucs = root.determine_auto_chunks(root.chunks_demucs_var.get(), self.is_gpu_conversion)
            self.shifts = int(root.shifts_var.get())
            self.is_split_mode = root.is_split_mode_var.get()
            self.segment = root.segment_var.get()
            self.is_chunk_demucs = root.is_chunk_demucs_var.get()
            self.is_demucs_combine_stems = root.is_demucs_combine_stems_var.get()
            self.is_primary_stem_only = root.is_primary_stem_only_var.get() if self.is_ensemble_mode else root.is_primary_stem_only_Demucs_var.get()
            self.is_secondary_stem_only = root.is_secondary_stem_only_var.get() if self.is_ensemble_mode else root.is_secondary_stem_only_Demucs_var.get()
            self.get_demucs_model_path()
            self.get_demucs_model_data()

        if self.model_path is None:
            # None of the architecture branches ran, so there is no model to use.
            self.model_status = False

        self.model_basename = os.path.splitext(os.path.basename(self.model_path))[0] if self.model_status else None
        # A secondary model never runs a pre-process model of its own.
        self.pre_proc_model_activated = self.pre_proc_model_activated if not self.is_secondary_model else False

        self.is_primary_model_primary_stem_only = is_primary_model_primary_stem_only
        self.is_primary_model_secondary_stem_only = is_primary_model_secondary_stem_only

        if self.is_secondary_model_activated and self.model_status:
            if (not self.is_ensemble_mode and root.demucs_stems_var.get() == ALL_STEMS and self.process_method == DEMUCS_ARCH_TYPE) or self.is_4_stem_ensemble:
                # Four-stem case: resolve one secondary model per Demucs source.
                for key in DEMUCS_4_SOURCE_LIST:
                    self.secondary_model_data(key)
                    self.secondary_model_4_stem.append(self.secondary_model)
                    self.secondary_model_4_stem_scale.append(self.secondary_model_scale)
                    self.secondary_model_4_stem_names.append(key)
                self.demucs_4_stem_added_count = sum(i is not None for i in self.secondary_model_4_stem)
                self.is_secondary_model_activated = False if all(i is None for i in self.secondary_model_4_stem) else True
                self.demucs_4_stem_added_count = self.demucs_4_stem_added_count - 1 if self.is_secondary_model_activated else self.demucs_4_stem_added_count
                if self.is_secondary_model_activated:
                    self.secondary_model_4_stem_model_names_list = [None if i is None else i.model_basename for i in self.secondary_model_4_stem]
                    self.is_demucs_4_stem_secondaries = True
            else:
                primary_stem = self.ensemble_primary_stem if self.is_ensemble_mode and self.process_method == DEMUCS_ARCH_TYPE else self.primary_stem
                self.secondary_model_data(primary_stem)

        if self.process_method == DEMUCS_ARCH_TYPE and not is_secondary_model:
            if self.demucs_stem_count >= 3 and self.pre_proc_model_activated:
                self.pre_proc_model_activated = True
                self.pre_proc_model = root.process_determine_demucs_pre_proc_model(self.primary_stem)
                self.is_demucs_pre_proc_model_inst_mix = root.is_demucs_pre_proc_model_inst_mix_var.get() if self.pre_proc_model else False

    def secondary_model_data(self, primary_stem):
        """Resolve the secondary model (and its scale) for ``primary_stem``.

        The secondary model is deactivated when none is returned or when it is
        the same model file as this one.
        """
        secondary_model_data = root.process_determine_secondary_model(self.process_method, primary_stem, self.is_primary_stem_only, self.is_secondary_stem_only)
        self.secondary_model = secondary_model_data[0]
        self.secondary_model_scale = secondary_model_data[1]
        self.is_secondary_model_activated = False if not self.secondary_model else True
        if self.secondary_model:
            self.is_secondary_model_activated = False if self.secondary_model.model_basename == self.model_basename else True

    def get_mdx_model_path(self):
        """Set ``model_path`` (and ``mixer_path``) for an MDX-Net model."""

        if self.model_name.endswith(CKPT):
            self.is_mdx_ckpt = True

        # Names ending in CKPT are used as-is; every other name gets the ONNX suffix.
        ext = '' if self.is_mdx_ckpt else ONNX

        for file_name, chosen_mdx_model in root.mdx_name_select_MAPPER.items():
            if self.model_name in chosen_mdx_model:
                self.model_path = os.path.join(MDX_MODELS_DIR, f"{file_name}{ext}")
                break
        else:
            # Not listed in the name mapper: the name is taken to be the file name.
            self.model_path = os.path.join(MDX_MODELS_DIR, f"{self.model_name}{ext}")

        self.mixer_path = os.path.join(MDX_MODELS_DIR, "mixer_val.ckpt")

    def get_demucs_model_path(self):
        """Set ``model_path`` for a Demucs model."""

        # Models carrying one of the newer tags live in the v3/v4 repository folder.
        demucs_newer = any(tag in self.model_name for tag in DEMUCS_NEWER_TAGS)
        demucs_model_dir = DEMUCS_NEWER_REPO_DIR if demucs_newer else DEMUCS_MODELS_DIR

        for file_name, chosen_model in root.demucs_name_select_MAPPER.items():
            if self.model_name in chosen_model:
                self.model_path = os.path.join(demucs_model_dir, file_name)
                break
        else:
            self.model_path = os.path.join(DEMUCS_NEWER_REPO_DIR, f'{self.model_name}.yaml')

    def get_demucs_model_data(self):
        """Derive the Demucs version, source layout and stems from the model name."""

        self.demucs_version = DEMUCS_V4

        # When several version tags occur in the name, the last one in the mapper wins.
        for key, value in DEMUCS_VERSION_MAPPER.items():
            if value in self.model_name:
                self.demucs_version = key

        self.demucs_source_list = DEMUCS_2_SOURCE if DEMUCS_UVR_MODEL in self.model_name else DEMUCS_4_SOURCE
        self.demucs_source_map = DEMUCS_2_SOURCE_MAPPER if DEMUCS_UVR_MODEL in self.model_name else DEMUCS_4_SOURCE_MAPPER
        self.demucs_stem_count = 2 if DEMUCS_UVR_MODEL in self.model_name else 4

        if not self.is_ensemble_mode:
            self.primary_stem = PRIMARY_STEM if self.demucs_stems == ALL_STEMS else self.demucs_stems
            self.secondary_stem = STEM_PAIR_MAPPER[self.primary_stem]

    def get_model_data(self, model_hash_dir, hash_mapper):
        """Return the parameter dictionary for the current ``model_hash``.

        A ``<hash>.json`` file in ``model_hash_dir`` takes precedence; otherwise
        the first ``hash_mapper`` entry whose key contains the hash is used, and
        as a last resort ``get_model_data_from_popup()``.
        """
        model_settings_json = os.path.join(model_hash_dir, "{}.json".format(self.model_hash))

        if os.path.isfile(model_settings_json):
            # Close the handle explicitly; it was previously left to the garbage collector.
            with open(model_settings_json) as settings_file:
                return json.load(settings_file)

        for known_hash, settings in hash_mapper.items():
            if self.model_hash in known_hash:
                return settings

        return self.get_model_data_from_popup()

    def get_model_data_from_popup(self):
        """Placeholder for asking the user for parameters; always returns None here."""
        return None

    def get_model_hash(self):
        """Set ``model_hash`` to the MD5 of the model file, or None if it is missing.

        Only the last 10000 KiB of the file are hashed; the whole file is hashed
        when seeking that far back fails (the file is smaller than that).
        Results are cached per path in the module-level ``model_hash_table``.
        """
        self.model_hash = None

        if not os.path.isfile(self.model_path):
            self.model_status = False
            return

        self.model_hash = model_hash_table.get(self.model_path)

        if not self.model_hash:
            try:
                with open(self.model_path, 'rb') as f:
                    f.seek(- 10000 * 1024, 2)
                    self.model_hash = hashlib.md5(f.read()).hexdigest()
            except (OSError, ValueError):
                # Seeking before the start of a small file fails; hash all of it instead.
                with open(self.model_path, 'rb') as f:
                    self.model_hash = hashlib.md5(f.read()).hexdigest()

            model_hash_table[self.model_path] = self.model_hash
| |
|
| |
|
class Ensembler():
    """Combines the outputs of several models into ensembled stems.

    All option values are read from the module-level ``root`` object when the
    instance is created.
    """

    def __init__(self, is_manual_ensemble=False):
        """Capture the ensemble options and, unless ``is_manual_ensemble`` is set,
        create the folder that receives the individual model outputs."""
        save_all_outputs = root.is_save_all_outputs_ensemble_var.get()
        self.is_save_all_outputs_ensemble = save_all_outputs

        # Name of the saved ensemble with spaces replaced, or a generic name when none is chosen.
        selected_ensemble = root.chosen_ensemble_var.get()
        if selected_ensemble == CHOOSE_ENSEMBLE_OPTION:
            chosen_ensemble_name = 'Ensembled'
        else:
            chosen_ensemble_name = selected_ensemble.replace(" ", "_")

        # Both settings have the form "<primary>/<secondary>".
        primary_algorithm, _, secondary_algorithm = root.ensemble_type_var.get().partition("/")
        primary_stem, _, secondary_stem_name = root.ensemble_main_stem_var.get().partition("/")
        time_stamp = round(time.time())

        self.audio_tool = MANUAL_ENSEMBLE
        self.main_export_path = Path(root.export_path_var.get())
        self.chosen_ensemble = f"_{chosen_ensemble_name}" if root.is_append_ensemble_name_var.get() else ''

        # Individual outputs go next to the final result only when they are to be kept.
        outputs_parent = self.main_export_path if save_all_outputs else ENSEMBLE_TEMP_PATH
        self.ensemble_folder_name = os.path.join(outputs_parent, f'{chosen_ensemble_name}_Outputs_{time_stamp}')
        self.is_testing_audio = f"{time_stamp}_" if root.is_testing_audio_var.get() else ''

        self.primary_algorithm = primary_algorithm
        self.secondary_algorithm = secondary_algorithm
        self.ensemble_primary_stem = primary_stem
        self.ensemble_secondary_stem = secondary_stem_name
        self.is_normalization = root.is_normalization_var.get()
        self.wav_type_set = root.wav_type_set
        self.mp3_bit_set = root.mp3_bit_set_var.get()
        self.save_format = root.save_format_var.get()

        if not is_manual_ensemble:
            os.mkdir(self.ensemble_folder_name)

    def ensemble_outputs(self, audio_file_base, export_path, stem, is_4_stem=False, is_inst_mix=False):
        """Ensemble every model output found in ``export_path`` for one stem."""

        # Pick the algorithm and the stem tag used in the file names.
        if is_4_stem:
            algorithm, stem_tag = root.ensemble_type_var.get(), stem
        elif is_inst_mix:
            algorithm = self.secondary_algorithm
            stem_tag = f"{self.ensemble_secondary_stem} {INST_STEM}"
        elif stem == PRIMARY_STEM:
            algorithm, stem_tag = self.primary_algorithm, self.ensemble_primary_stem
        else:
            algorithm, stem_tag = self.secondary_algorithm, self.ensemble_secondary_stem

        stem_outputs = self.get_files_to_ensemble(folder=export_path, prefix=audio_file_base, suffix=f"_({stem_tag}).wav")
        audio_file_output = f"{self.is_testing_audio}{audio_file_base}{self.chosen_ensemble}_({stem_tag})"
        stem_save_path = os.path.join(f'{self.main_export_path}', f'{audio_file_output}.wav')

        if not stem_outputs:
            # Nothing was produced for this stem.
            return

        spec_utils.ensemble_inputs(stem_outputs, algorithm, self.is_normalization, self.wav_type_set, stem_save_path)
        save_format(stem_save_path, self.save_format, self.mp3_bit_set)

        if self.is_save_all_outputs_ensemble:
            # Keep the individual outputs, converted to the chosen format.
            for output_path in stem_outputs:
                save_format(output_path, self.save_format, self.mp3_bit_set)
            return

        # Otherwise the individual outputs are temporary; removal is best-effort.
        for output_path in stem_outputs:
            try:
                os.remove(output_path)
            except Exception as e:
                print(e)

    def ensemble_manual(self, audio_inputs, audio_file_base, is_bulk=False):
        """Ensemble user-supplied files, optionally grouped by file-name prefix."""

        is_mv_sep = True

        if not is_bulk:
            self.ensemble_manual_process(audio_inputs, audio_file_base, is_bulk)
            return

        # Bulk mode: files sharing the text before the first "_" form one group.
        group_prefixes = set(os.path.basename(path).split("_")[0] for path in audio_inputs)
        for prefix in group_prefixes:
            group_files = [path for path in audio_inputs if os.path.basename(path).startswith(prefix)]
            group_base = os.path.basename(group_files[0]).split('.wav')[0]
            stem_testing = "instrum" if "Instrumental" in group_base else "vocals"
            if is_mv_sep:
                name_parts = group_base.split("_")
                group_base = f"{name_parts[1]}_{name_parts[2]}_{stem_testing}"
            self.ensemble_manual_process(group_files, group_base, is_bulk)

    def ensemble_manual_process(self, audio_inputs, audio_file_base, is_bulk):
        """Ensemble ``audio_inputs`` with the algorithm chosen in ``root`` and save the result."""

        algorithm = root.choose_algorithm_var.get()
        if is_bulk:
            algorithm_text = ""
        else:
            algorithm_text = f"_({root.choose_algorithm_var.get()})"
        output_name = f'{self.is_testing_audio}{audio_file_base}{algorithm_text}.wav'
        stem_save_path = os.path.join(f'{self.main_export_path}', output_name)
        spec_utils.ensemble_inputs(audio_inputs, algorithm, self.is_normalization, self.wav_type_set, stem_save_path)
        save_format(stem_save_path, self.save_format, self.mp3_bit_set)

    def get_files_to_ensemble(self, folder="", prefix="", suffix=""):
        """Return the paths in ``folder`` whose names start with ``prefix`` and end with ``suffix``."""

        matches = []
        for entry in os.listdir(folder):
            if entry.startswith(prefix) and entry.endswith(suffix):
                matches.append(os.path.join(folder, entry))
        return matches
| |
|
| |
|
def secondary_stem(stem):
    """Determines secondary stem

    Looks up the counterpart of ``stem`` in ``STEM_PAIR_MAPPER``. A key matches
    when ``stem`` is contained in it; if several keys match, the last one wins.

    Raises:
        ValueError: if no key of ``STEM_PAIR_MAPPER`` contains ``stem``
            (this case previously failed with an UnboundLocalError).
    """

    # The result lives in its own local so it no longer shadows this function's name.
    matches = [value for key, value in STEM_PAIR_MAPPER.items() if stem in key]

    if not matches:
        raise ValueError(f'No secondary stem is defined for stem: {stem!r}')

    return matches[-1]
| |
|
| |
|
class UVRInterface:
    """Programmatic entry point for running a separation without the GUI.

    Several attributes used by the methods are not created by this class in
    the visible source and must be supplied by the caller or a subclass:
    ``model_data_table``, ``ensemble_model_list``,
    ``ensemble_listbox_get_all_selected_models`` and (optionally)
    ``error_log_var``.
    """

    def __init__(self) -> None:
        # Create the source caches up front so the cache callbacks handed to the
        # separators work even before the first cached_sources_clear() call.
        self.cached_sources_clear()

    def assemble_model_data(self, model=None, arch_type=ENSEMBLE_MODE, is_dry_check=False) -> List[ModelData]:
        """Build the list of ``ModelData`` objects for the requested mode.

        Parameters:
            model: Model name, used for the single-model modes.
            arch_type: Architecture or ensemble-mode constant selecting the mode.
            is_dry_check(bool): Forwarded to ``ModelData`` for ``ENSEMBLE_STEM_CHECK``.

        Raises:
            ValueError: if ``arch_type`` is not a recognised mode (previously an
                UnboundLocalError).
        """
        model_data = None

        if arch_type == ENSEMBLE_STEM_CHECK:
            model_data = self.model_data_table
            has_missing_models = any(not entry.model_status for entry in model_data)

            # Rebuild the cached table when it is empty or contains unusable models.
            if has_missing_models or not model_data:
                model_data: List[ModelData] = [ModelData(model_name, is_dry_check=is_dry_check) for model_name in self.ensemble_model_list]
                self.model_data_table = model_data

        if arch_type == ENSEMBLE_MODE:
            model_data: List[ModelData] = [ModelData(model_name) for model_name in self.ensemble_listbox_get_all_selected_models()]
        if arch_type == ENSEMBLE_CHECK:
            model_data: List[ModelData] = [ModelData(model)]
        if arch_type == VR_ARCH_TYPE or arch_type == VR_ARCH_PM:
            model_data: List[ModelData] = [ModelData(model, VR_ARCH_TYPE)]
        if arch_type == MDX_ARCH_TYPE:
            model_data: List[ModelData] = [ModelData(model, MDX_ARCH_TYPE)]
        if arch_type == DEMUCS_ARCH_TYPE:
            model_data: List[ModelData] = [ModelData(model, DEMUCS_ARCH_TYPE)]

        if model_data is None:
            raise ValueError(f'Unsupported arch_type: {arch_type!r}')

        return model_data

    def create_sample(self, audio_file, sample_path=SAMPLE_CLIP_PATH):
        """Write a short clip of ``audio_file`` into ``sample_path`` and return its path.

        The clip length comes from ``root.model_sample_mode_duration_var``. The
        clip starts one third into the track when it fits there, otherwise at
        the beginning; tracks shorter than the clip length are used whole.
        """
        try:
            with audioread.audio_open(audio_file) as f:
                track_length = int(f.duration)
        except Exception:
            print('Audioread failed to get duration. Trying Librosa...')
            y, sr = librosa.load(audio_file, mono=False, sr=44100)
            track_length = int(librosa.get_duration(y=y, sr=sr))

        clip_duration = int(root.model_sample_mode_duration_var.get())

        if track_length >= clip_duration:
            offset_cut = track_length//3
            # The previous check (offset + track length >= clip length) was always
            # true here, so a clip starting too late silently came out shorter
            # than the duration its file name advertises.
            if offset_cut + clip_duration > track_length:
                offset_cut = 0
            name_apped = f'{clip_duration}_second_'
        else:
            offset_cut, clip_duration = 0, track_length
            name_apped = ''

        sample = librosa.load(audio_file, offset=offset_cut, duration=clip_duration, mono=False, sr=44100)[0].T
        audio_sample = os.path.join(sample_path, f'{os.path.splitext(os.path.basename(audio_file))[0]}_{name_apped}sample.wav')
        sf.write(audio_sample, sample, 44100)

        return audio_sample

    def verify_audio(self, audio_file, is_process=True, sample_path=None):
        """Check that ``audio_file`` exists and can be decoded.

        When ``sample_path`` is a string, a sample clip is created there as the
        decoding test; otherwise the first three seconds are loaded.

        Returns:
            ``is_good`` when ``is_process`` is true, otherwise the tuple
            ``(is_good, error_data)``.
        """
        is_good = False
        error_data = ''

        if os.path.isfile(audio_file):
            try:
                if isinstance(sample_path, str):
                    self.create_sample(audio_file, sample_path)
                else:
                    librosa.load(audio_file, duration=3, mono=False, sr=44100)
                is_good = True
            except Exception as e:
                error_name = f'{type(e).__name__}'
                traceback_text = ''.join(traceback.format_tb(e.__traceback__))
                message = f'{error_name}: "{e}"\n{traceback_text}"'
                if is_process:
                    audio_base_name = os.path.basename(audio_file)
                    report = f'Error Loading the Following File:\n\n\"{audio_base_name}\"\n\nRaw Error Details:\n\n{message}'
                    # error_log_var is not created by this class; without this guard
                    # a missing attribute replaced the real loading error with an
                    # AttributeError.
                    error_log_var = getattr(self, 'error_log_var', None)
                    if error_log_var is not None:
                        error_log_var.set(report)
                    else:
                        logging.error(report)
                else:
                    error_data = AUDIO_VERIFICATION_CHECK(audio_file, message)

        if is_process:
            return is_good
        else:
            return is_good, error_data

    def cached_sources_clear(self):
        """Reset the per-architecture caches of separated sources."""
        self.vr_cache_source_mapper = {}
        self.mdx_cache_source_mapper = {}
        self.demucs_cache_source_mapper = {}

    def cached_model_source_holder(self, process_method, sources, model_name=None):
        """Remember ``sources`` under ``model_name`` in the cache of ``process_method``."""
        if process_method == VR_ARCH_TYPE:
            self.vr_cache_source_mapper = {**self.vr_cache_source_mapper, **{model_name: sources}}
        if process_method == MDX_ARCH_TYPE:
            self.mdx_cache_source_mapper = {**self.mdx_cache_source_mapper, **{model_name: sources}}
        if process_method == DEMUCS_ARCH_TYPE:
            self.demucs_cache_source_mapper = {**self.demucs_cache_source_mapper, **{model_name: sources}}

    def cached_source_callback(self, process_method, model_name=None):
        """Return ``(model, sources)`` cached for ``model_name``, or ``(None, None)``.

        A cache key matches when ``model_name`` is contained in it; the last
        matching entry wins.
        """
        model, sources = None, None

        # An unknown process method simply has nothing cached (it used to leave
        # ``mapper`` unbound).
        mapper = {}

        if process_method == VR_ARCH_TYPE:
            mapper = self.vr_cache_source_mapper
        if process_method == MDX_ARCH_TYPE:
            mapper = self.mdx_cache_source_mapper
        if process_method == DEMUCS_ARCH_TYPE:
            mapper = self.demucs_cache_source_mapper

        for key, value in mapper.items():
            if model_name in key:
                model = key
                sources = value

        return model, sources

    def cached_source_model_list_check(self, model_list: List[ModelData]):
        """Collect the base names of every model involved in ``model_list``.

        Each ``*_names`` list has one entry per model, ``None`` where the model
        does not apply. The concatenation is stored in ``self.all_models``.
        """
        model: ModelData
        primary_model_names = lambda process_method:[model.model_basename if model.process_method == process_method else None for model in model_list]
        # The ``secondary_model is not None`` guard matters for four-stem setups,
        # where the secondary flag can be set while ``secondary_model`` (the last
        # stem looked up) is None; only the Demucs list had this guard before.
        secondary_model_names = lambda process_method:[model.secondary_model.model_basename if model.is_secondary_model_activated and model.process_method == process_method and model.secondary_model is not None else None for model in model_list]

        self.vr_primary_model_names = primary_model_names(VR_ARCH_TYPE)
        self.mdx_primary_model_names = primary_model_names(MDX_ARCH_TYPE)
        self.demucs_primary_model_names = primary_model_names(DEMUCS_ARCH_TYPE)
        self.vr_secondary_model_names = secondary_model_names(VR_ARCH_TYPE)
        self.mdx_secondary_model_names = secondary_model_names(MDX_ARCH_TYPE)
        self.demucs_secondary_model_names = secondary_model_names(DEMUCS_ARCH_TYPE)
        self.demucs_pre_proc_model_name = [model.pre_proc_model.model_basename if model.pre_proc_model else None for model in model_list]

        for model in model_list:
            if model.process_method == DEMUCS_ARCH_TYPE and model.is_demucs_4_stem_secondaries:
                if not model.is_4_stem_ensemble:
                    # A single four-stem model: its per-stem list replaces the list entirely.
                    self.demucs_secondary_model_names = model.secondary_model_4_stem_model_names_list
                    break
                else:
                    for i in model.secondary_model_4_stem_model_names_list:
                        self.demucs_secondary_model_names.append(i)

        self.all_models = self.vr_primary_model_names + self.mdx_primary_model_names + self.demucs_primary_model_names + self.vr_secondary_model_names + self.mdx_secondary_model_names + self.demucs_secondary_model_names + self.demucs_pre_proc_model_name

    def process(self, model_name, arch_type, audio_file, export_path, is_model_sample_mode=False, is_4_stem_ensemble=False, set_progress_func=None, console_write=print) -> SeperateAttributes:
        """Run one separation and return the separator object that performed it.

        Parameters:
            model_name: Model to use (ignored in ensemble mode).
            arch_type: Architecture constant, or ``ENSEMBLE_MODE``.
            audio_file: Path of the input audio file.
            export_path: Output folder. Outside ensemble mode the results go into
                ``<export_path>/<model>/<audio name>``.
            is_model_sample_mode(bool): Process a short sample clip instead of the whole file.
            is_4_stem_ensemble(bool): In ensemble mode, ensemble each Demucs stem.
            set_progress_func: Optional ``(step, inference_iterations=0)`` callback;
                a tqdm bar is used when omitted.
            console_write: Callable receiving status text.

        Exits the interpreter (``SystemExit``) when the audio file is missing or unreadable.
        """
        stime = time.perf_counter()
        time_elapsed = lambda:f'Time Elapsed: {time.strftime("%H:%M:%S", time.gmtime(int(time.perf_counter() - stime)))}'

        if arch_type==ENSEMBLE_MODE:
            model_list, ensemble = self.assemble_model_data(), Ensembler()
            export_path = ensemble.ensemble_folder_name
            is_ensemble = True
        else:
            model_list = self.assemble_model_data(model_name, arch_type)
            is_ensemble = False
        self.cached_source_model_list_check(model_list)
        # NOTE(review): only the first model is processed, even in ensemble mode
        # where model_list may hold several models — confirm this is intended.
        model = model_list[0]

        if self.verify_audio(audio_file):
            audio_file = self.create_sample(audio_file) if is_model_sample_mode else audio_file
        else:
            print(f'"{os.path.basename(audio_file)}\" is missing or currupted.\n')
            # sys.exit() instead of exit(): the latter is only injected by the
            # ``site`` module and may be absent (e.g. in frozen builds).
            sys.exit()

        song_base = os.path.splitext(os.path.basename(audio_file))[0]
        if is_ensemble:
            audio_file_base = f"{song_base}_{model.model_basename}"
        else:
            audio_file_base = f"{round(time.time())}_{song_base}_{model.model_basename}"
            export_path = os.path.join(Path(export_path), model.model_basename, song_base)
        os.makedirs(export_path, exist_ok=True)

        if set_progress_func is None:
            # No callback supplied: show progress on a tqdm bar (total of 1 = 100 %).
            pbar = tqdm(total=1)
            self._progress = 0
            def set_progress_func(step, inference_iterations=0):
                progress_curr = step + inference_iterations
                pbar.update(progress_curr-self._progress)
                self._progress = progress_curr

            def postprocess():
                pbar.close()
        else:
            def postprocess():
                pass

        process_data = {
            'model_data': model,
            'export_path': export_path,
            'audio_file_base': audio_file_base,
            'audio_file': audio_file,
            'set_progress_bar': set_progress_func,
            'write_to_console': lambda progress_text, base_text='': console_write(base_text + progress_text),
            'process_iteration': lambda:None,
            'cached_source_callback': self.cached_source_callback,
            'cached_model_source_holder': self.cached_model_source_holder,
            'list_all_models': self.all_models,
            'is_ensemble_master': is_ensemble,
            'is_4_stem_ensemble': is_ensemble and is_4_stem_ensemble
        }

        seperator = None
        try:
            if model.process_method == VR_ARCH_TYPE:
                seperator = SeperateVR(model, process_data)
            if model.process_method == MDX_ARCH_TYPE:
                seperator = SeperateMDX(model, process_data)
            if model.process_method == DEMUCS_ARCH_TYPE:
                seperator = SeperateDemucs(model, process_data)

            if seperator is None:
                raise ValueError(f'Unsupported process method: {model.process_method!r}')

            seperator.seperate()
        finally:
            # Close the progress bar even when the separation fails.
            postprocess()

        if is_ensemble:
            # The ensembled files are named after the song only, without the model suffix.
            audio_file_base = song_base
            console_write(ENSEMBLING_OUTPUTS)

            if is_4_stem_ensemble:
                for output_stem in DEMUCS_4_SOURCE_LIST:
                    ensemble.ensemble_outputs(audio_file_base, export_path, output_stem, is_4_stem=True)
            else:
                if not root.is_secondary_stem_only_var.get():
                    ensemble.ensemble_outputs(audio_file_base, export_path, PRIMARY_STEM)
                if not root.is_primary_stem_only_var.get():
                    ensemble.ensemble_outputs(audio_file_base, export_path, SECONDARY_STEM)
                    ensemble.ensemble_outputs(audio_file_base, export_path, SECONDARY_STEM, is_inst_mix=True)

        console_write(DONE)

        # The sample clip was a temporary file created by create_sample().
        if is_model_sample_mode:
            if os.path.isfile(audio_file):
                os.remove(audio_file)

        torch.cuda.empty_cache()

        # Remove the ensemble outputs folder when nothing was left in it.
        if is_ensemble and len(os.listdir(export_path)) == 0:
            shutil.rmtree(export_path)
        console_write(f'Process Complete, using time: {time_elapsed()}\nOutput path: {export_path}')
        self.cached_sources_clear()
        return seperator
| |
|
| |
|
class RootWrapper:
    """Mutable single-value holder with ``get()``/``set()`` accessors.

    Instances are what ``FakeRoot`` hands out for its ``*_var`` attributes,
    so code written against ``some_var.get()`` / ``some_var.set(value)``
    keeps working on them.
    """

    def __init__(self, var) -> None:
        # The wrapped value lives directly on the public ``var`` attribute.
        self.var = var

    def get(self):
        """Return the value currently held."""
        return self.var

    def set(self, val):
        """Replace the held value with ``val``."""
        self.var = val
| |
|
class FakeRoot:
    """Settings container whose unknown attributes spring into existence on first access.

    ``__init__`` loads the model hash / name-select mappings from their JSON
    files. Any attribute that does not exist yet is created on first access as
    a ``RootWrapper(None)``, which is what lets ``load_saved_settings`` call
    ``self.<setting>_var.set(...)`` without declaring each variable first.
    """

    def __init__(self) -> None:
        self.wav_type_set = 'PCM_16'
        self.vr_hash_MAPPER = load_model_hash_data(VR_HASH_JSON)
        self.mdx_hash_MAPPER = load_model_hash_data(MDX_HASH_JSON)
        self.mdx_name_select_MAPPER = load_model_hash_data(MDX_MODEL_NAME_SELECT)
        self.demucs_name_select_MAPPER = load_model_hash_data(DEMUCS_MODEL_NAME_SELECT)

    def __getattribute__(self, name: str):
        """Return attribute ``name``, creating a ``RootWrapper(None)`` for it if missing.

        The freshly created wrapper is stored on the instance, so later
        lookups of the same name return the same object. A side effect is
        that attribute access never raises ``AttributeError`` and
        ``hasattr`` is always true for instances of this class.
        """
        try:
            return super().__getattribute__(name)
        except AttributeError:
            wrapped = RootWrapper(None)
            super().__setattr__(name, wrapped)
            return wrapped

    def load_saved_settings(self, loaded_setting: dict, process_method=None):
        """Loads user saved application settings or resets to default

        Parameters:
            loaded_setting(dict):
                Settings to apply. Keys missing from it are filled in from
                DEFAULT_DATA on a copy; the caller's dict is never modified.
            process_method:
                When falsy every group of settings is applied. Otherwise only
                the group matching the given process method is applied
                (ENSEMBLE_MODE applies all groups), plus the global settings
                at the end, which are always applied.

        Raises:
            KeyError: if a required key is absent from both ``loaded_setting``
                and DEFAULT_DATA.
        """
        # NOTE(review): block nesting in this method was reconstructed from a
        # source whose indentation had been stripped - confirm against the
        # original file, in particular that the batch_size reset belongs to
        # the "a key was missing" branch.
        missing = {key: value for key, value in DEFAULT_DATA.items() if key not in loaded_setting}
        if missing:
            # Incomplete settings: fill the gaps from the defaults and reset
            # the batch size to the default option.
            loaded_setting = {**loaded_setting, **missing}
            loaded_setting['batch_size'] = DEF_OPT

        is_ensemble = process_method == ENSEMBLE_MODE

        if not process_method or process_method == VR_ARCH_PM or is_ensemble:
            self.vr_model_var.set(loaded_setting['vr_model'])
            self.aggression_setting_var.set(loaded_setting['aggression_setting'])
            self.window_size_var.set(loaded_setting['window_size'])
            self.batch_size_var.set(loaded_setting['batch_size'])
            self.crop_size_var.set(loaded_setting['crop_size'])
            self.is_tta_var.set(loaded_setting['is_tta'])
            self.is_output_image_var.set(loaded_setting['is_output_image'])
            self.is_post_process_var.set(loaded_setting['is_post_process'])
            self.is_high_end_process_var.set(loaded_setting['is_high_end_process'])
            self.post_process_threshold_var.set(loaded_setting['post_process_threshold'])
            self.vr_voc_inst_secondary_model_var.set(loaded_setting['vr_voc_inst_secondary_model'])
            self.vr_other_secondary_model_var.set(loaded_setting['vr_other_secondary_model'])
            self.vr_bass_secondary_model_var.set(loaded_setting['vr_bass_secondary_model'])
            self.vr_drums_secondary_model_var.set(loaded_setting['vr_drums_secondary_model'])
            self.vr_is_secondary_model_activate_var.set(loaded_setting['vr_is_secondary_model_activate'])
            self.vr_voc_inst_secondary_model_scale_var.set(loaded_setting['vr_voc_inst_secondary_model_scale'])
            self.vr_other_secondary_model_scale_var.set(loaded_setting['vr_other_secondary_model_scale'])
            self.vr_bass_secondary_model_scale_var.set(loaded_setting['vr_bass_secondary_model_scale'])
            self.vr_drums_secondary_model_scale_var.set(loaded_setting['vr_drums_secondary_model_scale'])

        if not process_method or process_method == DEMUCS_ARCH_TYPE or is_ensemble:
            self.demucs_model_var.set(loaded_setting['demucs_model'])
            self.segment_var.set(loaded_setting['segment'])
            self.overlap_var.set(loaded_setting['overlap'])
            self.shifts_var.set(loaded_setting['shifts'])
            self.chunks_demucs_var.set(loaded_setting['chunks_demucs'])
            self.margin_demucs_var.set(loaded_setting['margin_demucs'])
            self.is_chunk_demucs_var.set(loaded_setting['is_chunk_demucs'])
            self.is_chunk_mdxnet_var.set(loaded_setting['is_chunk_mdxnet'])
            self.is_primary_stem_only_Demucs_var.set(loaded_setting['is_primary_stem_only_Demucs'])
            self.is_secondary_stem_only_Demucs_var.set(loaded_setting['is_secondary_stem_only_Demucs'])
            self.is_split_mode_var.set(loaded_setting['is_split_mode'])
            self.is_demucs_combine_stems_var.set(loaded_setting['is_demucs_combine_stems'])
            self.demucs_voc_inst_secondary_model_var.set(loaded_setting['demucs_voc_inst_secondary_model'])
            self.demucs_other_secondary_model_var.set(loaded_setting['demucs_other_secondary_model'])
            self.demucs_bass_secondary_model_var.set(loaded_setting['demucs_bass_secondary_model'])
            self.demucs_drums_secondary_model_var.set(loaded_setting['demucs_drums_secondary_model'])
            self.demucs_is_secondary_model_activate_var.set(loaded_setting['demucs_is_secondary_model_activate'])
            self.demucs_voc_inst_secondary_model_scale_var.set(loaded_setting['demucs_voc_inst_secondary_model_scale'])
            self.demucs_other_secondary_model_scale_var.set(loaded_setting['demucs_other_secondary_model_scale'])
            self.demucs_bass_secondary_model_scale_var.set(loaded_setting['demucs_bass_secondary_model_scale'])
            self.demucs_drums_secondary_model_scale_var.set(loaded_setting['demucs_drums_secondary_model_scale'])
            self.demucs_stems_var.set(loaded_setting['demucs_stems'])

            # These three previously read from an undefined/unrelated `data`
            # name; they must come from the settings being loaded.
            self.demucs_pre_proc_model_var.set(loaded_setting['demucs_pre_proc_model'])
            self.is_demucs_pre_proc_model_activate_var.set(loaded_setting['is_demucs_pre_proc_model_activate'])
            self.is_demucs_pre_proc_model_inst_mix_var.set(loaded_setting['is_demucs_pre_proc_model_inst_mix'])

        if not process_method or process_method == MDX_ARCH_TYPE or is_ensemble:
            self.mdx_net_model_var.set(loaded_setting['mdx_net_model'])
            self.chunks_var.set(loaded_setting['chunks'])
            self.margin_var.set(loaded_setting['margin'])
            self.compensate_var.set(loaded_setting['compensate'])
            self.is_denoise_var.set(loaded_setting['is_denoise'])
            self.is_invert_spec_var.set(loaded_setting['is_invert_spec'])
            self.is_mixer_mode_var.set(loaded_setting['is_mixer_mode'])
            self.mdx_batch_size_var.set(loaded_setting['mdx_batch_size'])
            self.mdx_voc_inst_secondary_model_var.set(loaded_setting['mdx_voc_inst_secondary_model'])
            self.mdx_other_secondary_model_var.set(loaded_setting['mdx_other_secondary_model'])
            self.mdx_bass_secondary_model_var.set(loaded_setting['mdx_bass_secondary_model'])
            self.mdx_drums_secondary_model_var.set(loaded_setting['mdx_drums_secondary_model'])
            self.mdx_is_secondary_model_activate_var.set(loaded_setting['mdx_is_secondary_model_activate'])
            self.mdx_voc_inst_secondary_model_scale_var.set(loaded_setting['mdx_voc_inst_secondary_model_scale'])
            self.mdx_other_secondary_model_scale_var.set(loaded_setting['mdx_other_secondary_model_scale'])
            self.mdx_bass_secondary_model_scale_var.set(loaded_setting['mdx_bass_secondary_model_scale'])
            self.mdx_drums_secondary_model_scale_var.set(loaded_setting['mdx_drums_secondary_model_scale'])

        if not process_method or is_ensemble:
            self.is_save_all_outputs_ensemble_var.set(loaded_setting['is_save_all_outputs_ensemble'])
            self.is_append_ensemble_name_var.set(loaded_setting['is_append_ensemble_name'])
            self.chosen_audio_tool_var.set(loaded_setting['chosen_audio_tool'])
            self.choose_algorithm_var.set(loaded_setting['choose_algorithm'])
            self.time_stretch_rate_var.set(loaded_setting['time_stretch_rate'])
            self.pitch_rate_var.set(loaded_setting['pitch_rate'])
            self.is_primary_stem_only_var.set(loaded_setting['is_primary_stem_only'])
            self.is_secondary_stem_only_var.set(loaded_setting['is_secondary_stem_only'])
            self.is_testing_audio_var.set(loaded_setting['is_testing_audio'])
            self.is_add_model_name_var.set(loaded_setting['is_add_model_name'])
            self.is_accept_any_input_var.set(loaded_setting["is_accept_any_input"])
            self.is_task_complete_var.set(loaded_setting['is_task_complete'])
            self.is_create_model_folder_var.set(loaded_setting['is_create_model_folder'])
            self.mp3_bit_set_var.set(loaded_setting['mp3_bit_set'])
            self.save_format_var.set(loaded_setting['save_format'])
            self.wav_type_set_var.set(loaded_setting['wav_type_set'])
            self.user_code_var.set(loaded_setting['user_code'])

        # Global settings: applied regardless of process_method.
        self.is_gpu_conversion_var.set(loaded_setting['is_gpu_conversion'])
        self.is_normalization_var.set(loaded_setting['is_normalization'])
        # The settings key itself ends in "_var", so the attribute name matches it exactly.
        self.help_hints_var.set(loaded_setting['help_hints_var'])

        self.model_sample_mode_var.set(loaded_setting['model_sample_mode'])
        self.model_sample_mode_duration_var.set(loaded_setting['model_sample_mode_duration'])
| |
|
| |
|
# Module-level settings object, read elsewhere in this file as
# ``root.<setting>_var.get()``; it starts out holding the default settings.
root = FakeRoot()
root.load_saved_settings(loaded_setting=DEFAULT_DATA)