"""Backend helpers for the separation UI.

Contains audio download, model-list rendering, settings persistence and the
single-file / batch wrappers around ``audio_separator.Separator``.

Most configuration names used here (``models_dir``, ``out_dir``, ``logs``,
``found_files``, ``extensions``, ``gr``, ``i18n``, ``track_presence``, ...) are
not defined in this file; they are expected to come from ``variable`` through
the star import below.
"""

import copy
import json
import logging
import os
import platform
import re
import subprocess
import sys
import urllib.parse

import spaces
import torch
import yt_dlp
from audio_separator.separator import Separator

from variable import *

# Sentinel meaning "do not pass output_single_stem to Separator at all"
# (the Demucs wrappers never passed that argument).
_UNSET = object()

# Demucs model whose separation yields six stems instead of four.
_DEMUCS_SIX_STEM_MODEL = "htdemucs_6s.yaml"

# Value shown in the language selector when no override is configured.
_AUTO_LANGUAGE = "Language automatically detected by system"

# Command prefixes for the external downloaders; the destination path and the
# URL are appended in that order.
_DOWNLOAD_COMMANDS = {
    'wget': ['wget', '--progress=bar:force', '-O'],
    'curl': ['curl', '-L', '-#', '-o'],
}


def download_audio(url, output_dir="ytdl"):
    """Download the audio track of ``url`` as a WAV file using yt-dlp.

    Args:
        url: Address of the media to download.
        output_dir: Directory that receives the file (created if missing).

    Returns:
        Absolute path of the extracted ``.wav`` file.

    Raises:
        Exception: If yt-dlp fails or the expected file is not found.
    """
    os.makedirs(output_dir, exist_ok=True)
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '32',
        }],
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'postprocessor_args': [
            '-acodec', 'pcm_f32le'
        ],
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            # Ask yt-dlp for the name it actually wrote: titles are sanitised
            # for the filesystem, so rebuilding the path from the raw title
            # misses files whose title contains characters such as "/" or ":".
            downloaded_path = ydl.prepare_filename(info)

        candidates = [os.path.splitext(downloaded_path)[0] + ".wav"]
        title = info.get('title')
        if title:
            # Previous behaviour, kept as a fallback.
            candidates.append(os.path.join(output_dir, f"{title}.wav"))

        for file_path in candidates:
            if os.path.exists(file_path):
                return os.path.abspath(file_path)
        raise Exception("Something went wrong")
    except Exception as e:
        raise Exception(f"Error extracting audio with yt-dlp: {str(e)}") from e


def leaderboard(list_filter):
    """Run the separator CLI model listing and render its output as HTML.

    Args:
        list_filter: Value forwarded as ``--list_filter=<value>``.

    Returns:
        An HTML table string, or an ``"Error: ..."`` string on failure.
    """
    try:
        command = [separator_location, "-l", f"--list_filter={list_filter}"]
        if python_location:
            command.insert(0, python_location)

        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return f"Error: {result.stderr}"

        # Every line that does not start with dashes is a table row; columns
        # are separated by runs of two or more whitespace characters.
        rows = re.findall(r"^(?!-+)(.+)$", result.stdout.strip(), re.MULTILINE)
        # NOTE(review): the HTML tags of this function were stripped from the
        # source under review; this markup (bold first row, one <td> per cell)
        # is a reconstruction and should be confirmed against the UI.
        return "<table border='1'>" + "".join(
            f"<tr style='{'font-weight: bold; font-size: 1.2em;' if i == 0 else ''}'>"
            + "".join(f"<td>{cell}</td>" for cell in re.split(r"\s{2,}", line.strip()))
            + "</tr>"
            for i, line in enumerate(rows)
        ) + "</table>"
    except Exception as e:
        return f"Error: {e}"


def get_language_settings():
    """Return the configured UI language, or the auto-detect label."""
    with open(config_file, "r", encoding="utf8") as file:
        config = json.load(file)

    if not config["lang"]["override"]:
        return _AUTO_LANGUAGE
    return config["lang"]["selected_lang"]


def save_lang_settings(selected_language):
    """Persist the language choice in the main config file.

    Args:
        selected_language: A language name, or the auto-detect label to
            disable the override.
    """
    with open(config_file, "r", encoding="utf8") as file:
        config = json.load(file)

    if selected_language == _AUTO_LANGUAGE:
        config["lang"]["override"] = False
    else:
        config["lang"]["override"] = True
        config["lang"]["selected_lang"] = selected_language

    with open(config_file, "w", encoding="utf8") as file:
        json.dump(config, file, indent=2)

    # Notify only once the file has really been written. The message is passed
    # to i18n() as a lookup key, so its wording is left untouched.
    gr.Info(i18n("Language have been saved. Restart UVR5 UI to apply the changes"))


def _parse_download_progress(method, line):
    """Return ``(fraction, percent)`` parsed from a downloader line, else None."""
    if method == 'wget' and '%' in line:
        try:
            percent = int(line.strip().split('%')[0].split()[-1])
        except (ValueError, IndexError):
            return None
        return percent / 100.0, percent
    if method == 'curl' and '##' in line:
        # The bar is treated as 50 '#' characters wide.
        fraction = min(line.count('#') / 50.0, 1.0)
        return fraction, int(fraction * 100)
    return None


def alternative_model_downloader(method, key, output_dir="models", progress=gr.Progress()):
    """Download every file of a model with an external ``wget``/``curl``.

    Args:
        method: ``'wget'`` or ``'curl'``.
        key: Model name looked up in the JSON file at ``models_file``.
        output_dir: Destination directory (created if missing).
        progress: Gradio progress callback.

    Returns:
        A newline-joined log of what happened, or a one-line message when the
        model key or the download method is unknown.
    """
    logs.clear()
    with open(models_file, 'r', encoding='utf-8') as file:
        model_data = json.load(file)

    if key not in model_data:
        return f"Model '{key}' cannot be found."
    if method not in _DOWNLOAD_COMMANDS:
        # Previously an unknown method left the command undefined and surfaced
        # as a confusing NameError-style message for each file.
        return f"Unsupported download method '{method}'."

    os.makedirs(output_dir, exist_ok=True)
    urls = model_data[key]
    total_files = len(urls)
    progress(0, desc="Starting downloads...")

    for i, url in enumerate(urls):
        filename = os.path.basename(urllib.parse.urlparse(url).path)
        full_name = os.path.join(output_dir, filename)

        if os.path.exists(full_name):
            logs.append(f"{filename} already exists.")
            continue

        progress((i + 0.1) / total_files, desc=f"Starting download of {filename} ({i+1}/{total_files})")
        cmd = [*_DOWNLOAD_COMMANDS[method], full_name, url]

        try:
            # Only stderr is read below, so stdout is discarded rather than
            # piped into a buffer nobody drains.
            with subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                bufsize=1
            ) as process:
                for line in process.stderr:
                    parsed = _parse_download_progress(method, line)
                    if parsed is None:
                        continue
                    file_progress, percent = parsed
                    progress(
                        (i + file_progress) / total_files,
                        desc=f"File {i+1}/{total_files}: {filename} ({percent}%)"
                    )
                process.wait()

            if process.returncode != 0:
                logs.append(f"Error downloading {filename}")
                # Drop the partial file so the next run does not report it as
                # "already exists".
                try:
                    os.remove(full_name)
                except OSError:
                    pass
            else:
                logs.append(f"{filename} downloaded successfully!")

            progress((i + 1) / total_files, desc=f"File {i+1}/{total_files} completed")
        except Exception as e:
            logs.append(f"Error running download command: {str(e)}")

    progress(1.0, desc="Download process completed")
    return "\n".join(logs)


def read_main_config():
    """Load the main config file.

    Returns:
        The parsed JSON content, or ``None`` if reading or parsing failed (a
        warning is shown in that case).
    """
    try:
        with open(config_file, "r", encoding="utf8") as f:
            return json.load(f)
    except Exception as e:
        print(f"Error reading main config file '{config_file}': {e}")
        gr.Warning(i18n("Error reading main config file"))
        return None


def write_main_config(data):
    """Write ``data`` as JSON to the main config file; warn on failure."""
    try:
        with open(config_file, "w", encoding="utf8") as f:
            json.dump(data, f, indent=2)
    except Exception as e:
        print(f"Error writing to main config file '{config_file}': {e}")
        gr.Warning(i18n("Error writing to main config file"))


def load_settings_from_file(filepath):
    """Load a JSON settings file.

    Returns:
        The parsed content, or ``None`` if the file could not be read.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error reading settings file '{filepath}': {e}")
        gr.Warning(i18n("Error reading settings file"))
        return None


def get_initial_settings():
    """Return the settings the UI should start with.

    Default settings are used unless the main config has
    ``load_custom_settings`` set and the custom settings file loads; in that
    case custom values are layered over a copy of the defaults.

    Returns:
        A dict of settings; empty if no settings file could be read.
    """
    # read_main_config() returns None on failure; this runs at import time, so
    # fall back to an empty config instead of raising.
    main_config = read_main_config() or {}
    default_settings = load_settings_from_file(default_settings_file)

    if not main_config.get('load_custom_settings', False):
        print("Loading default settings...")
        return default_settings or {}

    print("Attempting to load custom settings...")
    custom_settings = load_settings_from_file(custom_settings_file)
    if not custom_settings:
        print("Custom settings file not found or invalid. Falling back to default settings.")
        return default_settings or {}

    # Start from the defaults (if they loaded) and override key by key.
    merged = copy.deepcopy(default_settings) if default_settings else {}
    for section, params in custom_settings.items():
        if section in merged:
            for key, value in params.items():
                merged[section][key] = value
        else:
            merged[section] = params
    print("Custom settings loaded successfully.")
    return merged


initial_settings = get_initial_settings()


def get_all_components(components_dict):
    """Flatten ``{section: {name: component}}`` into a list of components."""
    return [comp for section in components_dict.values() for comp in section.values()]


def save_current_settings(*values):
    """Save the current component values as the custom settings.

    Args:
        *values: One value per registered component, in the iteration order
            of ``components`` (section by section).
    """
    try:
        remaining = iter(values)
        current_config_data = {
            section_name: {comp_name: next(remaining) for comp_name in section_comps}
            for section_name, section_comps in components.items()
        }

        with open(custom_settings_file, 'w', encoding='utf-8') as f:
            json.dump(current_config_data, f, indent=4)

        main_config = read_main_config()
        if main_config is None:
            raise RuntimeError("main config file could not be read")
        main_config['load_custom_settings'] = True
        write_main_config(main_config)

        gr.Info(i18n("Current settings saved successfully! They will be loaded next time"))
    except Exception as e:
        print(f"Error saving settings: {e}")
        gr.Warning(i18n("Error saving settings"))


def reset_settings_to_default():
    """Reset every registered component to its default value.

    Returns:
        One ``gr.update`` per component, in the order of ``components``. On
        failure, no-op updates are returned so the UI stays unchanged.
    """
    all_comps_flat = get_all_components(components)
    try:
        default_settings = load_settings_from_file(default_settings_file)
        if default_settings is None:
            raise RuntimeError("default settings file could not be read")

        updates = []
        for section_name, section_comps in components.items():
            section_defaults = default_settings.get(section_name, {})
            for comp_name, comp_instance in section_comps.items():
                default_value = section_defaults.get(comp_name, None)
                is_dropdown = isinstance(comp_instance, gr.Dropdown) and hasattr(comp_instance, 'choices')
                # A dropdown is cleared when its default is not among its choices.
                if is_dropdown and default_value is not None and default_value not in comp_instance.choices:
                    print(f"Warning: Default value '{default_value}' for '{comp_name}' not in choices {comp_instance.choices}. Setting to None.")
                    default_value = None
                updates.append(gr.update(value=default_value))

        main_config = read_main_config()
        if main_config is None:
            raise RuntimeError("main config file could not be read")
        main_config['load_custom_settings'] = False
        write_main_config(main_config)

        gr.Info(i18n("Settings reset to default. Default settings will be loaded next time"))
        return updates
    except Exception as e:
        print(f"Error resetting settings: {e}")
        gr.Warning(i18n("Error resetting settings"))
        return [gr.update() for _ in all_comps_flat]


components = {
    "Roformer": {},
    "MDX23C": {},
    "MDX-NET": {},
    "VR Arch": {},
    "Demucs": {}
}


def _mdxc_params(segment_size, override_seg_size, batch_size, overlap):
    """Build the ``mdxc_params`` keyword for ``Separator``."""
    return {"mdxc_params": {
        "segment_size": segment_size,
        "override_model_segment_size": override_seg_size,
        "batch_size": batch_size,
        "overlap": overlap,
    }}


def _mdx_params(hop_length, segment_size, overlap, batch_size, denoise):
    """Build the ``mdx_params`` keyword for ``Separator``."""
    return {"mdx_params": {
        "hop_length": hop_length,
        "segment_size": segment_size,
        "overlap": overlap,
        "batch_size": batch_size,
        "enable_denoise": denoise,
    }}


def _vr_params(batch_size, window_size, aggression, tta, post_process,
               post_process_threshold, high_end_process):
    """Build the ``vr_params`` keyword for ``Separator``."""
    return {"vr_params": {
        "batch_size": batch_size,
        "window_size": window_size,
        "aggression": aggression,
        "enable_tta": tta,
        "enable_post_process": post_process,
        "post_process_threshold": post_process_threshold,
        "high_end_process": high_end_process,
    }}


def _demucs_params(batch_size, segment_size, shifts, overlap, segments_enabled):
    """Build the ``demucs_params`` keyword for ``Separator``."""
    return {"demucs_params": {
        "batch_size": batch_size,
        "segment_size": segment_size,
        "shifts": shifts,
        "overlap": overlap,
        "segments_enabled": segments_enabled,
    }}


def _notify_first_use(model_filename, display_name):
    """Tell the user a download is coming if the model file is not in ``models_dir``."""
    if not os.path.exists(os.path.join(models_dir, model_filename)):
        gr.Info(f"This is the first time the {display_name} model is being used. The separation will take a little longer because the model needs to be downloaded.")


def _build_separator(output_dir, out_format, norm_thresh, amp_thresh, arch_params, single_stem=_UNSET):
    """Create a ``Separator`` with the options shared by every architecture."""
    kwargs = {
        "log_level": logging.WARNING,
        "model_file_dir": models_dir,
        "output_dir": output_dir,
        "output_format": out_format,
        "use_autocast": use_autocast,
        "normalization_threshold": norm_thresh,
        "amplification_threshold": amp_thresh,
    }
    if single_stem is not _UNSET:
        kwargs["output_single_stem"] = single_stem
    kwargs.update(arch_params)
    return Separator(**kwargs)


def _separate_single(audio, model_filename, display_name, label, out_format,
                     norm_thresh, amp_thresh, arch_params, progress, single_stem=_UNSET):
    """Separate one file and return the output paths joined onto ``out_dir``.

    Raises:
        RuntimeError: ``"<label> separation failed: ..."`` on any error,
            including a separation that produced no output files.
    """
    try:
        _notify_first_use(model_filename, display_name)
        separator = _build_separator(out_dir, out_format, norm_thresh, amp_thresh, arch_params, single_stem)

        progress(0.2, desc="Loading model...")
        separator.load_model(model_filename=model_filename)

        progress(0.7, desc="Separating audio...")
        separation = separator.separate(audio)

        stems = [os.path.join(out_dir, file_name) for file_name in separation]
        if not stems:
            raise RuntimeError("no output files were produced")
        return stems
    except Exception as e:
        raise RuntimeError(f"{label} separation failed: {e}") from e


def _two_stem_result(stems, single_stem):
    """Return ``(first, second)``; ``second`` is None for single-stem output."""
    wants_single = bool(single_stem and single_stem.strip())
    if wants_single or len(stems) < 2:
        return stems[0], None
    return stems[0], stems[1]


def _separate_batch(path_input, path_output, model_filename, display_name, label,
                    out_format, norm_thresh, amp_thresh, arch_params, progress, single_stem=_UNSET):
    """Separate every matching audio file in ``path_input`` into ``path_output``.

    Files are selected with ``str.endswith(extensions)`` and processed in
    sorted order. The shared ``found_files`` and ``logs`` lists are reset and
    refilled.

    Returns:
        The newline-joined log.

    Raises:
        RuntimeError: ``"<label> batch separation failed: ..."`` as soon as
            loading the model or separating a file fails.
    """
    found_files.clear()
    logs.clear()
    _notify_first_use(model_filename, display_name)

    found_files.extend(sorted(
        name for name in os.listdir(path_input) if name.endswith(extensions)
    ))
    total_files = len(found_files)
    if total_files == 0:
        logs.append("No valid audio files.")
        return "\n".join(logs)

    logs.append(f"{total_files} audio files found")
    progress(0, desc="Starting processing...")

    try:
        # Build the separator and load the model once for the whole batch
        # instead of repeating both for every file.
        separator = _build_separator(path_output, out_format, norm_thresh, amp_thresh, arch_params, single_stem)
        logs.append("Loading model...")
        separator.load_model(model_filename=model_filename)

        for i, audio_file in enumerate(found_files):
            progress((i / total_files), desc=f"Processing file {i+1}/{total_files}")
            logs.append(f"Separating file: {audio_file}")
            separator.separate(os.path.join(path_input, audio_file))
            logs.append(f"File: {audio_file} separated!")
    except Exception as e:
        raise RuntimeError(f"{label} batch separation failed: {e}") from e

    progress(1.0, desc="Processing complete")
    return "\n".join(logs)


@track_presence("Performing BS/Mel Roformer Separation")
@spaces.GPU(duration=60)
def roformer_separator(audio, model_key, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with the Roformer model registered under ``model_key``.

    Returns:
        ``(stem_1, stem_2)`` output paths; ``stem_2`` is None when a single
        stem was requested.

    Raises:
        KeyError: If ``model_key`` is not in ``roformer_models``.
        RuntimeError: If the separation fails.
    """
    roformer_model = roformer_models[model_key]
    stems = _separate_single(
        audio, roformer_model, model_key, "Roformer", out_format, norm_thresh, amp_thresh,
        _mdxc_params(segment_size, override_seg_size, batch_size, overlap),
        progress, single_stem,
    )
    return _two_stem_result(stems, single_stem)


@track_presence("Performing MDXC Separation")
@spaces.GPU(duration=60)
def mdxc_separator(audio, model, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with an MDX23C model file.

    Returns:
        ``(stem_1, stem_2)`` output paths; ``stem_2`` is None when a single
        stem was requested.

    Raises:
        RuntimeError: If the separation fails.
    """
    stems = _separate_single(
        audio, model, model, "MDX23C", out_format, norm_thresh, amp_thresh,
        _mdxc_params(segment_size, override_seg_size, batch_size, overlap),
        progress, single_stem,
    )
    return _two_stem_result(stems, single_stem)


@track_presence("Performing MDX-NET Separation")
@spaces.GPU(duration=60)
def mdxnet_separator(audio, model, out_format, hop_length, segment_size, denoise, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with an MDX-NET model file.

    Returns:
        ``(stem_1, stem_2)`` output paths; ``stem_2`` is None when a single
        stem was requested.

    Raises:
        RuntimeError: If the separation fails.
    """
    stems = _separate_single(
        audio, model, model, "MDX-NET", out_format, norm_thresh, amp_thresh,
        _mdx_params(hop_length, segment_size, overlap, batch_size, denoise),
        progress, single_stem,
    )
    return _two_stem_result(stems, single_stem)


@track_presence("Performing VR Arch Separation")
@spaces.GPU(duration=60)
def vrarch_separator(audio, model, out_format, window_size, aggression, tta, post_process, post_process_threshold, high_end_process, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with a VR Arch model file.

    Returns:
        ``(stem_1, stem_2)`` output paths; ``stem_2`` is None when a single
        stem was requested.

    Raises:
        RuntimeError: If the separation fails.
    """
    stems = _separate_single(
        audio, model, model, "VR ARCH", out_format, norm_thresh, amp_thresh,
        _vr_params(batch_size, window_size, aggression, tta, post_process,
                   post_process_threshold, high_end_process),
        progress, single_stem,
    )
    return _two_stem_result(stems, single_stem)


@track_presence("Performing Demucs Separation")
@spaces.GPU(duration=60)
def demucs_separator(audio, model, out_format, shifts, segment_size, segments_enabled, overlap, batch_size, norm_thresh, amp_thresh, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with a Demucs model file.

    Returns:
        A 6-tuple of output paths. Only ``htdemucs_6s.yaml`` fills all six
        slots; other models fill the first four and leave the rest None.

    Raises:
        RuntimeError: If the separation fails.
    """
    stems = _separate_single(
        audio, model, model, "Demucs", out_format, norm_thresh, amp_thresh,
        _demucs_params(batch_size, segment_size, shifts, overlap, segments_enabled),
        progress,
    )
    wanted = 6 if model == _DEMUCS_SIX_STEM_MODEL else 4
    picked = stems[:wanted]
    # Pad to six so the return shape never depends on the model.
    picked.extend([None] * (6 - len(picked)))
    return tuple(picked)


def update_stems(model):
    """Return a visibility update that is shown only for the six-stem Demucs model."""
    return gr.update(visible=(model == _DEMUCS_SIX_STEM_MODEL))


@track_presence("Performing BS/Mel Roformer Batch Separation")
@spaces.GPU(duration=60)
def roformer_batch(path_input, path_output, model_key, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Batch-separate a folder with the Roformer model under ``model_key``.

    Returns:
        The newline-joined processing log.

    Raises:
        KeyError: If ``model_key`` is not in ``roformer_models``.
        RuntimeError: If loading the model or separating a file fails.
    """
    roformer_model = roformer_models[model_key]
    return _separate_batch(
        path_input, path_output, roformer_model, model_key, "BS/Mel Roformer",
        out_format, norm_thresh, amp_thresh,
        _mdxc_params(segment_size, override_seg_size, batch_size, overlap),
        progress, single_stem,
    )


@track_presence("Performing MDXC Batch Separation")
@spaces.GPU(duration=60)
def mdx23c_batch(path_input, path_output, model, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Batch-separate a folder with an MDX23C model file.

    Returns:
        The newline-joined processing log.

    Raises:
        RuntimeError: If loading the model or separating a file fails.
    """
    return _separate_batch(
        path_input, path_output, model, model, "MDXC",
        out_format, norm_thresh, amp_thresh,
        _mdxc_params(segment_size, override_seg_size, batch_size, overlap),
        progress, single_stem,
    )


@track_presence("Performing MDX-NET Batch Separation")
@spaces.GPU(duration=60)
def mdxnet_batch(path_input, path_output, model, out_format, hop_length, segment_size, denoise, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Batch-separate a folder with an MDX-NET model file.

    Returns:
        The newline-joined processing log.

    Raises:
        RuntimeError: If loading the model or separating a file fails.
    """
    return _separate_batch(
        path_input, path_output, model, model, "MDX-NET",
        out_format, norm_thresh, amp_thresh,
        _mdx_params(hop_length, segment_size, overlap, batch_size, denoise),
        progress, single_stem,
    )


@track_presence("Performing VR Arch Batch Separation")
@spaces.GPU(duration=60)
def vrarch_batch(path_input, path_output, model, out_format, window_size, aggression, tta, post_process, post_process_threshold, high_end_process, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Batch-separate a folder with a VR Arch model file.

    Returns:
        The newline-joined processing log.

    Raises:
        RuntimeError: If loading the model or separating a file fails.
    """
    return _separate_batch(
        path_input, path_output, model, model, "VR Arch",
        out_format, norm_thresh, amp_thresh,
        _vr_params(batch_size, window_size, aggression, tta, post_process,
                   post_process_threshold, high_end_process),
        progress, single_stem,
    )


@track_presence("Performing Demucs Batch Separation")
@spaces.GPU(duration=60)
def demucs_batch(path_input, path_output, model, out_format, shifts, segment_size, segments_enabled, overlap, batch_size, norm_thresh, amp_thresh, progress=gr.Progress()):
    """Batch-separate a folder with a Demucs model file.

    Returns:
        The newline-joined processing log.

    Raises:
        RuntimeError: If loading the model or separating a file fails.
    """
    return _separate_batch(
        path_input, path_output, model, model, "Demucs",
        out_format, norm_thresh, amp_thresh,
        _demucs_params(batch_size, segment_size, shifts, overlap, segments_enabled),
        progress,
    )