UVR5_UI / code.py
NeoPy's picture
Upload 5 files
497e986 verified
import os
import sys
import subprocess
import re
import platform
import torch
import logging
import json
import copy
import spaces
from audio_separator.separator import Separator
from variable import *
import yt_dlp
def download_audio(url, output_dir="ytdl"):
    """Download the audio of ``url`` with yt-dlp and convert it to WAV.

    Args:
        url: Address of the media to fetch.
        output_dir: Directory receiving the converted file; created if missing.

    Returns:
        Absolute path of the resulting ``.wav`` file.

    Raises:
        Exception: If yt-dlp fails or the converted file cannot be found.
    """
    os.makedirs(output_dir, exist_ok=True)
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '32',
        }],
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'postprocessor_args': [
            '-acodec', 'pcm_f32le'
        ],
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # One call both resolves the metadata and downloads the media.
            info = ydl.extract_info(url, download=True)
            # yt-dlp sanitizes the title when it expands the output template,
            # so ask it for the name it used instead of rebuilding the path
            # from the raw title (which breaks on characters such as "/").
            downloaded_path = ydl.prepare_filename(info)
        # The FFmpegExtractAudio post-processor swaps the extension for .wav.
        file_path = os.path.splitext(downloaded_path)[0] + ".wav"
        if os.path.exists(file_path):
            return os.path.abspath(file_path)
        raise Exception("Something went wrong")
    except Exception as e:
        raise Exception(f"Error extracting audio with yt-dlp: {str(e)}") from e
def leaderboard(list_filter):
    """Run the separator CLI model listing and render it as an HTML table.

    Args:
        list_filter: Value passed to the CLI as ``--list_filter``.

    Returns:
        An HTML ``<table>`` string (first row styled as a header), or an
        ``"Error: ..."`` string when the command fails or anything raises.
    """
    try:
        launcher = [python_location] if python_location else []
        command = launcher + [separator_location, "-l", f"--list_filter={list_filter}"]
        result = subprocess.run(command, capture_output=True, text=True)
        if result.returncode != 0:
            return f"Error: {result.stderr}"

        # Keep non-empty lines that do not start with a dash (rule lines).
        table_lines = re.findall(r"^(?!-+)(.+)$", result.stdout.strip(), re.MULTILINE)
        rows = []
        for index, line in enumerate(table_lines):
            row_style = 'font-weight: bold; font-size: 1.2em;' if index == 0 else ''
            # Columns are separated by runs of two or more whitespace characters.
            cells = "".join(f"<td>{cell}</td>" for cell in re.split(r"\s{2,}", line.strip()))
            rows.append(f"<tr style='{row_style}'>" + cells + "</tr>")
        return "<table border='1'>" + "".join(rows) + "</table>"
    except Exception as e:
        return f"Error: {e}"
def get_language_settings():
    """Return the language choice stored in the main config file.

    Returns:
        The ``selected_lang`` entry, or the fixed label
        ``"Language automatically detected by system"`` when the ``override``
        flag equals ``False``.
    """
    with open(config_file, "r", encoding="utf8") as handle:
        lang_section = json.load(handle)["lang"]
    # Equality (not truthiness) is kept on purpose to match stored values exactly.
    if lang_section["override"] != False:
        return lang_section["selected_lang"]
    return "Language automatically detected by system"
def save_lang_settings(selected_language):
    """Persist the chosen UI language in the main config file.

    Args:
        selected_language: Either a language identifier or the label
            ``"Language automatically detected by system"``, which turns the
            override off.
    """
    with open(config_file, "r", encoding="utf8") as file:
        config = json.load(file)
    if selected_language == "Language automatically detected by system":
        config["lang"]["override"] = False
    else:
        config["lang"]["override"] = True
        config["lang"]["selected_lang"] = selected_language
    with open(config_file, "w", encoding="utf8") as file:
        json.dump(config, file, indent=2)
    # Notify only after the file has been written, so a failed write is never
    # reported to the user as a successful save.
    gr.Info(i18n("Language have been saved. Restart UVR5 UI to apply the changes"))
def _remove_partial_download(path):
    """Best-effort removal of an incomplete download at ``path``."""
    try:
        os.remove(path)
    except OSError:
        # Nothing to clean up (or not removable); the download error is
        # already reported in the log, so this is deliberately ignored.
        pass


def alternative_model_downloader(method, key, output_dir="models", progress=gr.Progress()):
    """Download every file of model ``key`` with an external ``wget``/``curl``.

    Args:
        method: ``'wget'`` or ``'curl'``; selects the command line tool.
        key: Entry of the models JSON file whose URL list is downloaded.
        output_dir: Destination directory; created if missing.
        progress: Gradio progress tracker updated while downloading.

    Returns:
        The accumulated log lines joined by newlines, or a short message when
        the method is unsupported or the model key is unknown.
    """
    # Local import: urllib is not among this file's visible top-level imports.
    import urllib.parse

    logs.clear()
    if method not in ('wget', 'curl'):
        # Previously this fell through and failed on an undefined command.
        return f"Unsupported download method '{method}'."
    with open(models_file, 'r', encoding='utf-8') as file:
        model_data = json.load(file)
    if key not in model_data:
        return f"Model '{key}' cannot be found."
    os.makedirs(output_dir, exist_ok=True)
    urls = model_data[key]
    total_files = len(urls)
    progress(0, desc="Starting downloads...")
    for i, url in enumerate(urls):
        filename = os.path.basename(urllib.parse.urlparse(url).path)
        full_name = os.path.join(output_dir, filename)
        if os.path.exists(full_name):
            logs.append(f"{filename} already exists.")
            continue
        progress((i + 0.1) / total_files, desc=f"Starting download of {filename} ({i+1}/{total_files})")
        if method == 'wget':
            cmd = ['wget', '--progress=bar:force', '-O', full_name, url]
        else:
            cmd = ['curl', '-L', '-#', '-o', full_name, url]
        try:
            # stdout is discarded: only stderr carries the progress output, and
            # an unread stdout pipe could block the child once it fills up.
            with subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                bufsize=1
            ) as process:
                for line in process.stderr:
                    if method == 'wget' and '%' in line:
                        try:
                            percent = int(line.strip().split('%')[0].split()[-1])
                        except (ValueError, IndexError):
                            continue
                        total_progress = (i + percent / 100.0) / total_files
                        progress(total_progress, desc=f"File {i+1}/{total_files}: {filename} ({percent}%)")
                    elif method == 'curl' and '##' in line:
                        # The count of '#' marks is scaled so 50 equals 100%.
                        file_progress = min(line.count('#') / 50.0, 1.0)
                        total_progress = (i + file_progress) / total_files
                        percent = int(file_progress * 100)
                        progress(total_progress, desc=f"File {i+1}/{total_files}: {filename} ({percent}%)")
                process.wait()
            if process.returncode != 0:
                logs.append(f"Error downloading {filename}")
                # Otherwise the leftover file is reported as "already exists"
                # on the next attempt and never downloaded again.
                _remove_partial_download(full_name)
            else:
                logs.append(f"{filename} downloaded successfully!")
            progress((i + 1) / total_files, desc=f"File {i+1}/{total_files} completed")
        except Exception as e:
            logs.append(f"Error running download command: {str(e)}")
            _remove_partial_download(full_name)
    progress(1.0, desc="Download process completed")
    return "\n".join(logs)
def read_main_config():
    """Load the main configuration JSON file.

    Returns:
        The parsed JSON content, or ``None`` when reading or parsing fails
        (the error is printed and a UI warning is raised).
    """
    try:
        with open(config_file, "r", encoding="utf8") as handle:
            config_data = json.load(handle)
    except Exception as error:
        print(f"Error reading main config file '{config_file}': {error}")
        gr.Warning(i18n("Error reading main config file"))
        return None
    return config_data
def write_main_config(data):
    """Write ``data`` to the main configuration file as indented JSON.

    Failures are not propagated: the error is printed and a UI warning shown.
    """
    try:
        with open(config_file, "w", encoding="utf8") as handle:
            json.dump(data, handle, indent=2)
    except Exception as error:
        print(f"Error writing to main config file '{config_file}': {error}")
        gr.Warning(i18n("Error writing to main config file"))
    else:
        return None
def load_settings_from_file(filepath):
    """Parse the JSON settings file at ``filepath``.

    Returns:
        The decoded JSON content, or ``None`` if the file cannot be read or
        parsed (the error is printed and a UI warning is shown).
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
    except Exception as error:
        print(f"Error reading settings file '{filepath}': {error}")
        gr.Warning(i18n("Error reading settings file"))
        return None
    return loaded
def get_initial_settings():
    """Build the settings used to initialise the UI.

    Default settings are always loaded. When the main config enables
    ``load_custom_settings`` and the custom settings file is usable, its
    values are layered over a copy of the defaults.

    Returns:
        A dict of section name -> settings dict. It is empty when the
        default settings file could not be read.
    """
    # Both readers return None on failure; fall back to empty dicts so a
    # broken file degrades to defaults instead of raising on None.
    main_config = read_main_config() or {}
    load_custom = main_config.get('load_custom_settings', False)
    default_settings = load_settings_from_file(default_settings_file) or {}
    if not load_custom:
        print("Loading default settings...")
        return default_settings

    print("Attempting to load custom settings...")
    custom_settings = load_settings_from_file(custom_settings_file)
    if not custom_settings:
        print("Custom settings file not found or invalid. Falling back to default settings.")
        return default_settings

    # Copy first so the merge never mutates the loaded defaults.
    settings_to_load = copy.deepcopy(default_settings)
    for section, params in custom_settings.items():
        if section in settings_to_load:
            # Custom values override defaults key by key.
            settings_to_load[section].update(params)
        else:
            settings_to_load[section] = params
    print("Custom settings loaded successfully.")
    return settings_to_load
# Settings resolved once at import time by get_initial_settings().
initial_settings = get_initial_settings()
def get_all_components(components_dict):
    """Flatten a ``{section: {name: component}}`` mapping into one list.

    Components are returned in section order, then in each section's own
    insertion order.
    """
    return [
        component
        for section in components_dict.values()
        for component in section.values()
    ]
def save_current_settings(*values):
    """Save the current component values as the custom settings file.

    Args:
        *values: Component values, in the same order in which the module-level
            ``components`` mapping iterates its sections and entries.

    On success the main config is flagged to load custom settings and an info
    toast is shown; any failure is printed and surfaced as a UI warning.
    """
    try:
        snapshot = {}
        position = 0
        for section_name, section_comps in components.items():
            section_values = {}
            snapshot[section_name] = section_values
            for comp_name in section_comps:
                section_values[comp_name] = values[position]
                position += 1

        with open(custom_settings_file, 'w', encoding='utf-8') as handle:
            json.dump(snapshot, handle, indent=4)

        main_config = read_main_config()
        main_config['load_custom_settings'] = True
        write_main_config(main_config)
        gr.Info(i18n("Current settings saved successfully! They will be loaded next time"))
    except Exception as error:
        print(f"Error saving settings: {error}")
        gr.Warning(i18n("Error saving settings"))
def reset_settings_to_default():
    """Reset every registered component to its default value.

    Also clears ``load_custom_settings`` in the main config.

    Returns:
        One ``gr.update`` per component, in ``components`` iteration order.
        On any error a list of no-op updates of the same length is returned
        and a UI warning is shown.
    """
    updates = []
    all_comps_flat = get_all_components(components)
    try:
        default_settings = load_settings_from_file(default_settings_file)
        for section_name, section_comps in components.items():
            section_defaults = default_settings.get(section_name, {})
            for comp_name, comp_instance in section_comps.items():
                default_value = section_defaults.get(comp_name, None)
                if isinstance(comp_instance, gr.Dropdown) and hasattr(comp_instance, 'choices') and default_value is not None:
                    # Choices may be plain values or (label, value) pairs
                    # depending on the Gradio version, so compare against the
                    # value part as well as the raw entry.
                    raw_choices = comp_instance.choices or []
                    valid_values = [
                        choice[1] if isinstance(choice, (tuple, list)) and len(choice) == 2 else choice
                        for choice in raw_choices
                    ]
                    if default_value not in valid_values and default_value not in raw_choices:
                        print(f"Warning: Default value '{default_value}' for '{comp_name}' not in choices {comp_instance.choices}. Setting to None.")
                        default_value = None
                updates.append(gr.update(value=default_value))
        main_config = read_main_config()
        main_config['load_custom_settings'] = False
        write_main_config(main_config)
        gr.Info(i18n("Settings reset to default. Default settings will be loaded next time"))
        return updates
    except Exception as e:
        print(f"Error resetting settings: {e}")
        gr.Warning(i18n("Error resetting settings"))
        # Keep the output count stable so the UI callback still matches.
        return [gr.update() for _ in all_comps_flat]
# Registry of UI components, one dict per separation architecture. Each
# section maps a setting name to its component; the settings save/reset
# helpers iterate it in order. The sections start empty here and are
# presumably filled where the UI is built (not visible in this part of the file).
components = {
"Roformer": {}, "MDX23C": {}, "MDX-NET": {}, "VR Arch": {}, "Demucs": {}
}
@track_presence("Performing BS/Mel Roformer Separation")
@spaces.GPU(duration=60)
def roformer_separator(audio, model_key, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with a BS/Mel Roformer model.

    Args:
        audio: Path of the input audio handed to ``Separator.separate``.
        model_key: Key into ``roformer_models`` selecting the model file.
        out_format: Output format passed to ``Separator``.
        segment_size, override_seg_size, overlap, batch_size: Forwarded as
            ``mdxc_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Stem name to output alone; blank means both stems.
        progress: Gradio progress tracker.

    Returns:
        ``(first_stem_path, second_stem_path)``; the second item is ``None``
        when ``single_stem`` is not blank.

    Raises:
        RuntimeError: If any step of the separation fails.
    """
    roformer_model = roformer_models[model_key]
    model_path = os.path.join(models_dir, roformer_model)
    try:
        if not os.path.exists(model_path):
            gr.Info(f"This is the first time the {model_key} model is being used. The separation will take a little longer because the model needs to be downloaded.")
        architecture_params = {
            "segment_size": segment_size,
            "override_model_segment_size": override_seg_size,
            "batch_size": batch_size,
            "overlap": overlap,
        }
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=out_dir,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            mdxc_params=architecture_params,
        )
        progress(0.2, desc="Loading model...")
        separator.load_model(model_filename=roformer_model)
        progress(0.7, desc="Separating audio...")
        output_names = separator.separate(audio)
        stems = [os.path.join(out_dir, name) for name in output_names]
        secondary = None if single_stem.strip() else stems[1]
        return stems[0], secondary
    except Exception as e:
        raise RuntimeError(f"Roformer separation failed: {e}") from e
@track_presence("Performing MDXC Separation")
@spaces.GPU(duration=60)
def mdxc_separator(audio, model, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with an MDX23C model.

    Args:
        audio: Path of the input audio handed to ``Separator.separate``.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        segment_size, override_seg_size, overlap, batch_size: Forwarded as
            ``mdxc_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Stem name to output alone; blank means both stems.
        progress: Gradio progress tracker.

    Returns:
        ``(first_stem_path, second_stem_path)``; the second item is ``None``
        when ``single_stem`` is not blank.

    Raises:
        RuntimeError: If any step of the separation fails.
    """
    model_path = os.path.join(models_dir, model)
    try:
        if not os.path.exists(model_path):
            gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=out_dir,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            mdxc_params={
                "segment_size": segment_size,
                "override_model_segment_size": override_seg_size,
                "batch_size": batch_size,
                "overlap": overlap,
            }
        )
        progress(0.2, desc="Loading model...")
        separator.load_model(model_filename=model)
        progress(0.7, desc="Separating audio...")
        separation = separator.separate(audio)
        stems = [os.path.join(out_dir, file_name) for file_name in separation]
        if single_stem.strip():
            return stems[0], None
        return stems[0], stems[1]
    except Exception as e:
        raise RuntimeError(f"MDX23C separation failed: {e}") from e
@track_presence("Performing MDX-NET Separation")
@spaces.GPU(duration=60)
def mdxnet_separator(audio, model, out_format, hop_length, segment_size, denoise, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with an MDX-NET model.

    Args:
        audio: Path of the input audio handed to ``Separator.separate``.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        hop_length, segment_size, denoise, overlap, batch_size: Forwarded as
            ``mdx_params`` (``denoise`` becomes ``enable_denoise``).
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Stem name to output alone; blank means both stems.
        progress: Gradio progress tracker.

    Returns:
        ``(first_stem_path, second_stem_path)``; the second item is ``None``
        when ``single_stem`` is not blank.

    Raises:
        RuntimeError: If any step of the separation fails.
    """
    model_path = os.path.join(models_dir, model)
    try:
        if not os.path.exists(model_path):
            gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
        architecture_params = {
            "hop_length": hop_length,
            "segment_size": segment_size,
            "overlap": overlap,
            "batch_size": batch_size,
            "enable_denoise": denoise,
        }
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=out_dir,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            mdx_params=architecture_params,
        )
        progress(0.2, desc="Loading model...")
        separator.load_model(model_filename=model)
        progress(0.7, desc="Separating audio...")
        output_names = separator.separate(audio)
        stems = [os.path.join(out_dir, name) for name in output_names]
        secondary = None if single_stem.strip() else stems[1]
        return stems[0], secondary
    except Exception as e:
        raise RuntimeError(f"MDX-NET separation failed: {e}") from e
@track_presence("Performing VR Arch Separation")
@spaces.GPU(duration=60)
def vrarch_separator(audio, model, out_format, window_size, aggression, tta, post_process, post_process_threshold, high_end_process, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with a VR Arch model.

    Args:
        audio: Path of the input audio handed to ``Separator.separate``.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        window_size, aggression, tta, post_process, post_process_threshold,
            high_end_process, batch_size: Forwarded as ``vr_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Stem name to output alone; blank means both stems.
        progress: Gradio progress tracker.

    Returns:
        ``(first_stem_path, second_stem_path)``; the second item is ``None``
        when ``single_stem`` is not blank.

    Raises:
        RuntimeError: If any step of the separation fails.
    """
    model_path = os.path.join(models_dir, model)
    try:
        if not os.path.exists(model_path):
            gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
        architecture_params = {
            "batch_size": batch_size,
            "window_size": window_size,
            "aggression": aggression,
            "enable_tta": tta,
            "enable_post_process": post_process,
            "post_process_threshold": post_process_threshold,
            "high_end_process": high_end_process,
        }
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=out_dir,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            vr_params=architecture_params,
        )
        progress(0.2, desc="Loading model...")
        separator.load_model(model_filename=model)
        progress(0.7, desc="Separating audio...")
        output_names = separator.separate(audio)
        stems = [os.path.join(out_dir, name) for name in output_names]
        secondary = None if single_stem.strip() else stems[1]
        return stems[0], secondary
    except Exception as e:
        raise RuntimeError(f"VR ARCH separation failed: {e}") from e
@track_presence("Performing Demucs Separation")
@spaces.GPU(duration=60)
def demucs_separator(audio, model, out_format, shifts, segment_size, segments_enabled, overlap, batch_size, norm_thresh, amp_thresh, progress=gr.Progress(track_tqdm=True)):
    """Separate ``audio`` with a Demucs model.

    Args:
        audio: Path of the input audio handed to ``Separator.separate``.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        shifts, segment_size, segments_enabled, overlap, batch_size:
            Forwarded as ``demucs_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        progress: Gradio progress tracker.

    Returns:
        A 6-tuple of stem paths. For ``"htdemucs_6s.yaml"`` all six entries
        are paths; for any other model the last two are ``None``.

    Raises:
        RuntimeError: If any step of the separation fails.
    """
    model_path = os.path.join(models_dir, model)
    try:
        if not os.path.exists(model_path):
            gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
        architecture_params = {
            "batch_size": batch_size,
            "segment_size": segment_size,
            "shifts": shifts,
            "overlap": overlap,
            "segments_enabled": segments_enabled,
        }
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=out_dir,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            demucs_params=architecture_params,
        )
        progress(0.2, desc="Loading model...")
        separator.load_model(model_filename=model)
        progress(0.7, desc="Separating audio...")
        output_names = separator.separate(audio)
        stems = [os.path.join(out_dir, name) for name in output_names]
        first_four = (stems[0], stems[1], stems[2], stems[3])
        # Only the six-stem model yields a fifth and sixth output.
        extra_two = (stems[4], stems[5]) if model == "htdemucs_6s.yaml" else (None, None)
        return first_four + extra_two
    except Exception as e:
        raise RuntimeError(f"Demucs separation failed: {e}") from e
def update_stems(model):
    """Return a Gradio update that is visible only for ``"htdemucs_6s.yaml"``."""
    is_six_stem_model = model == "htdemucs_6s.yaml"
    return gr.update(visible=is_six_stem_model)
@track_presence("Performing BS/Mel Roformer Batch Separation")
@spaces.GPU(duration=60)
def roformer_batch(path_input, path_output, model_key, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Separate every audio file in ``path_input`` with a BS/Mel Roformer model.

    Args:
        path_input: Directory scanned for files ending in ``extensions``.
        path_output: Directory passed to ``Separator`` as output directory.
        model_key: Key into ``roformer_models`` selecting the model file.
        out_format: Output format passed to ``Separator``.
        segment_size, override_seg_size, overlap, batch_size: Forwarded as
            ``mdxc_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Passed to ``Separator`` as ``output_single_stem``.
        progress: Gradio progress tracker.

    Returns:
        The log lines joined by newlines.

    Raises:
        RuntimeError: If model loading or any file's separation fails.
    """
    found_files.clear()
    logs.clear()
    roformer_model = roformer_models[model_key]
    model_path = os.path.join(models_dir, roformer_model)
    if not os.path.exists(model_path):
        gr.Info(f"This is the first time the {model_key} model is being used. The separation will take a little longer because the model needs to be downloaded.")
    found_files.extend(name for name in os.listdir(path_input) if name.endswith(extensions))
    total_files = len(found_files)
    if total_files == 0:
        logs.append("No valid audio files.")
        return "\n".join(logs)
    logs.append(f"{total_files} audio files found")
    found_files.sort()
    progress(0, desc="Starting processing...")
    try:
        # The separator and its model do not depend on the file, so build and
        # load them once instead of reloading the model for every file.
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=path_output,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            mdxc_params={
                "segment_size": segment_size,
                "override_model_segment_size": override_seg_size,
                "batch_size": batch_size,
                "overlap": overlap,
            }
        )
        logs.append("Loading model...")
        separator.load_model(model_filename=roformer_model)
        for i, audio_file in enumerate(found_files):
            progress((i / total_files), desc=f"Processing file {i+1}/{total_files}")
            file_path = os.path.join(path_input, audio_file)
            logs.append(f"Separating file: {audio_file}")
            separator.separate(file_path)
            logs.append(f"File: {audio_file} separated!")
    except Exception as e:
        raise RuntimeError(f"BS/Mel Roformer batch separation failed: {e}") from e
    progress(1.0, desc="Processing complete")
    return "\n".join(logs)
@track_presence("Performing MDXC Batch Separation")
@spaces.GPU(duration=60)
def mdx23c_batch(path_input, path_output, model, out_format, segment_size, override_seg_size, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Separate every audio file in ``path_input`` with an MDX23C model.

    Args:
        path_input: Directory scanned for files ending in ``extensions``.
        path_output: Directory passed to ``Separator`` as output directory.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        segment_size, override_seg_size, overlap, batch_size: Forwarded as
            ``mdxc_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Passed to ``Separator`` as ``output_single_stem``.
        progress: Gradio progress tracker.

    Returns:
        The log lines joined by newlines.

    Raises:
        RuntimeError: If model loading or any file's separation fails.
    """
    found_files.clear()
    logs.clear()
    model_path = os.path.join(models_dir, model)
    if not os.path.exists(model_path):
        gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
    found_files.extend(name for name in os.listdir(path_input) if name.endswith(extensions))
    total_files = len(found_files)
    if total_files == 0:
        logs.append("No valid audio files.")
        return "\n".join(logs)
    logs.append(f"{total_files} audio files found")
    found_files.sort()
    progress(0, desc="Starting processing...")
    try:
        # The separator and its model do not depend on the file, so build and
        # load them once instead of reloading the model for every file.
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=path_output,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            mdxc_params={
                "segment_size": segment_size,
                "override_model_segment_size": override_seg_size,
                "batch_size": batch_size,
                "overlap": overlap,
            }
        )
        logs.append("Loading model...")
        separator.load_model(model_filename=model)
        for i, audio_file in enumerate(found_files):
            progress((i / total_files), desc=f"Processing file {i+1}/{total_files}")
            file_path = os.path.join(path_input, audio_file)
            logs.append(f"Separating file: {audio_file}")
            separator.separate(file_path)
            logs.append(f"File: {audio_file} separated!")
    except Exception as e:
        raise RuntimeError(f"MDXC batch separation failed: {e}") from e
    progress(1.0, desc="Processing complete")
    return "\n".join(logs)
@track_presence("Performing MDX-NET Batch Separation")
@spaces.GPU(duration=60)
def mdxnet_batch(path_input, path_output, model, out_format, hop_length, segment_size, denoise, overlap, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Separate every audio file in ``path_input`` with an MDX-NET model.

    Args:
        path_input: Directory scanned for files ending in ``extensions``.
        path_output: Directory passed to ``Separator`` as output directory.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        hop_length, segment_size, denoise, overlap, batch_size: Forwarded as
            ``mdx_params`` (``denoise`` becomes ``enable_denoise``).
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Passed to ``Separator`` as ``output_single_stem``.
        progress: Gradio progress tracker.

    Returns:
        The log lines joined by newlines.

    Raises:
        RuntimeError: If model loading or any file's separation fails.
    """
    found_files.clear()
    logs.clear()
    model_path = os.path.join(models_dir, model)
    if not os.path.exists(model_path):
        gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
    found_files.extend(name for name in os.listdir(path_input) if name.endswith(extensions))
    total_files = len(found_files)
    if total_files == 0:
        logs.append("No valid audio files.")
        return "\n".join(logs)
    logs.append(f"{total_files} audio files found")
    found_files.sort()
    progress(0, desc="Starting processing...")
    try:
        # The separator and its model do not depend on the file, so build and
        # load them once instead of reloading the model for every file.
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=path_output,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            mdx_params={
                "hop_length": hop_length,
                "segment_size": segment_size,
                "overlap": overlap,
                "batch_size": batch_size,
                "enable_denoise": denoise,
            }
        )
        logs.append("Loading model...")
        separator.load_model(model_filename=model)
        for i, audio_file in enumerate(found_files):
            progress((i / total_files), desc=f"Processing file {i+1}/{total_files}")
            file_path = os.path.join(path_input, audio_file)
            logs.append(f"Separating file: {audio_file}")
            separator.separate(file_path)
            logs.append(f"File: {audio_file} separated!")
    except Exception as e:
        raise RuntimeError(f"MDX-NET batch separation failed: {e}") from e
    progress(1.0, desc="Processing complete")
    return "\n".join(logs)
@track_presence("Performing VR Arch Batch Separation")
@spaces.GPU(duration=60)
def vrarch_batch(path_input, path_output, model, out_format, window_size, aggression, tta, post_process, post_process_threshold, high_end_process, batch_size, norm_thresh, amp_thresh, single_stem, progress=gr.Progress()):
    """Separate every audio file in ``path_input`` with a VR Arch model.

    Args:
        path_input: Directory scanned for files ending in ``extensions``.
        path_output: Directory passed to ``Separator`` as output directory.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        window_size, aggression, tta, post_process, post_process_threshold,
            high_end_process, batch_size: Forwarded as ``vr_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        single_stem: Passed to ``Separator`` as ``output_single_stem``.
        progress: Gradio progress tracker.

    Returns:
        The log lines joined by newlines.

    Raises:
        RuntimeError: If model loading or any file's separation fails.
    """
    found_files.clear()
    logs.clear()
    model_path = os.path.join(models_dir, model)
    if not os.path.exists(model_path):
        gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
    found_files.extend(name for name in os.listdir(path_input) if name.endswith(extensions))
    total_files = len(found_files)
    if total_files == 0:
        logs.append("No valid audio files.")
        return "\n".join(logs)
    logs.append(f"{total_files} audio files found")
    found_files.sort()
    progress(0, desc="Starting processing...")
    try:
        # The separator and its model do not depend on the file, so build and
        # load them once instead of reloading the model for every file.
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=path_output,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            output_single_stem=single_stem,
            vr_params={
                "batch_size": batch_size,
                "window_size": window_size,
                "aggression": aggression,
                "enable_tta": tta,
                "enable_post_process": post_process,
                "post_process_threshold": post_process_threshold,
                "high_end_process": high_end_process,
            }
        )
        logs.append("Loading model...")
        separator.load_model(model_filename=model)
        for i, audio_file in enumerate(found_files):
            progress((i / total_files), desc=f"Processing file {i+1}/{total_files}")
            file_path = os.path.join(path_input, audio_file)
            logs.append(f"Separating file: {audio_file}")
            separator.separate(file_path)
            logs.append(f"File: {audio_file} separated!")
    except Exception as e:
        raise RuntimeError(f"VR Arch batch separation failed: {e}") from e
    progress(1.0, desc="Processing complete")
    return "\n".join(logs)
@track_presence("Performing Demucs Batch Separation")
@spaces.GPU(duration=60)
def demucs_batch(path_input, path_output, model, out_format, shifts, segment_size, segments_enabled, overlap, batch_size, norm_thresh, amp_thresh, progress=gr.Progress()):
    """Separate every audio file in ``path_input`` with a Demucs model.

    Args:
        path_input: Directory scanned for files ending in ``extensions``.
        path_output: Directory passed to ``Separator`` as output directory.
        model: Model file name, looked up inside ``models_dir``.
        out_format: Output format passed to ``Separator``.
        shifts, segment_size, segments_enabled, overlap, batch_size:
            Forwarded as ``demucs_params``.
        norm_thresh, amp_thresh: Normalization / amplification thresholds.
        progress: Gradio progress tracker.

    Returns:
        The log lines joined by newlines.

    Raises:
        RuntimeError: If model loading or any file's separation fails.
    """
    found_files.clear()
    logs.clear()
    model_path = os.path.join(models_dir, model)
    if not os.path.exists(model_path):
        gr.Info(f"This is the first time the {model} model is being used. The separation will take a little longer because the model needs to be downloaded.")
    found_files.extend(name for name in os.listdir(path_input) if name.endswith(extensions))
    total_files = len(found_files)
    if total_files == 0:
        logs.append("No valid audio files.")
        return "\n".join(logs)
    logs.append(f"{total_files} audio files found")
    found_files.sort()
    progress(0, desc="Starting processing...")
    try:
        # The separator and its model do not depend on the file, so build and
        # load them once instead of reloading the model for every file.
        separator = Separator(
            log_level=logging.WARNING,
            model_file_dir=models_dir,
            output_dir=path_output,
            output_format=out_format,
            use_autocast=use_autocast,
            normalization_threshold=norm_thresh,
            amplification_threshold=amp_thresh,
            demucs_params={
                "batch_size": batch_size,
                "segment_size": segment_size,
                "shifts": shifts,
                "overlap": overlap,
                "segments_enabled": segments_enabled,
            }
        )
        logs.append("Loading model...")
        separator.load_model(model_filename=model)
        for i, audio_file in enumerate(found_files):
            progress((i / total_files), desc=f"Processing file {i+1}/{total_files}")
            file_path = os.path.join(path_input, audio_file)
            logs.append(f"Separating file: {audio_file}")
            separator.separate(file_path)
            logs.append(f"File: {audio_file} separated!")
    except Exception as e:
        raise RuntimeError(f"Demucs batch separation failed: {e}") from e
    progress(1.0, desc="Processing complete")
    return "\n".join(logs)