Spaces:
Build error
Build error
| import os | |
| import gradio as gr | |
| import spaces | |
| from infer_rvc_python import BaseLoader | |
| import random | |
| import logging | |
| import time | |
| import soundfile as sf | |
| from infer_rvc_python.main import download_manager | |
| import zipfile | |
| import edge_tts | |
| import asyncio | |
| import librosa | |
| import traceback | |
| import soundfile as sf | |
| from pedalboard import Pedalboard, Reverb, Compressor, HighpassFilter | |
| from pedalboard.io import AudioFile | |
| from pydub import AudioSegment | |
| import noisereduce as nr | |
| import numpy as np | |
| import urllib.request | |
| import shutil | |
| import threading | |
| logging.getLogger("infer_rvc_python").setLevel(logging.ERROR) | |
| # Ensure the correct path to the models directory | |
| model_dir = os.path.join(os.path.dirname(__file__), "models") | |
| converter = BaseLoader(only_cpu=False, hubert_path=None, rmvpe_path=None) | |
| title = "<center><strong><font size='7'>Vodex AI</font></strong></center>" | |
| theme = "aliabid94/new-theme" | |
| def find_files(directory): | |
| file_paths = [] | |
| for filename in os.listdir(directory): | |
| if filename.endswith('.pth') or filename.endswith('.zip') or filename.endswith('.index'): | |
| file_paths.append(os.path.join(directory, filename)) | |
| return file_paths | |
| def unzip_in_folder(my_zip, my_dir): | |
| with zipfile.ZipFile(my_zip) as zip: | |
| for zip_info in zip.infolist(): | |
| if zip_info.is_dir(): | |
| continue | |
| zip_info.filename = os.path.basename(zip_info.filename) | |
| zip.extract(zip_info, my_dir) | |
| def find_my_model(a_, b_): | |
| if a_ is None or a_.endswith(".pth"): | |
| return a_, b_ | |
| txt_files = [] | |
| for base_file in [a_, b_]: | |
| if base_file is not None and base_file.endswith(".txt"): | |
| txt_files.append(base_file) | |
| directory = os.path.dirname(a_) | |
| for txt in txt_files: | |
| with open(txt, 'r') as file: | |
| first_line = file.readline() | |
| download_manager( | |
| url=first_line.strip(), | |
| path=directory, | |
| extension="", | |
| ) | |
| for f in find_files(directory): | |
| if f.endswith(".zip"): | |
| unzip_in_folder(f, directory) | |
| model = None | |
| index = None | |
| end_files = find_files(directory) | |
| for ff in end_files: | |
| if ff.endswith(".pth"): | |
| model = os.path.join(directory, ff) | |
| gr.Info(f"Model found: {ff}") | |
| if ff.endswith(".index"): | |
| index = os.path.join(directory, ff) | |
| gr.Info(f"Index found: {ff}") | |
| if not model: | |
| gr.Error(f"Model not found in: {end_files}") | |
| if not index: | |
| gr.Warning("Index not found") | |
| return model, index | |
| def get_file_size(url): | |
| if "huggingface" not in url: | |
| raise ValueError("Only downloads from Hugging Face are allowed") | |
| try: | |
| with urllib.request.urlopen(url) as response: | |
| info = response.info() | |
| content_length = info.get("Content-Length") | |
| file_size = int(content_length) | |
| if file_size > 500000000: | |
| raise ValueError("The file is too large. You can only download files up to 500 MB in size.") | |
| except Exception as e: | |
| raise e | |
| def clear_files(directory): | |
| time.sleep(15) | |
| print(f"Clearing files: {directory}.") | |
| shutil.rmtree(directory) | |
| def get_my_model(url_data): | |
| if not url_data: | |
| return None, None | |
| if "," in url_data: | |
| a_, b_ = url_data.split() | |
| a_, b_ = a_.strip().replace("/blob/", "/resolve/"), b_.strip().replace("/blob/", "/resolve/") | |
| else: | |
| a_, b_ = url_data.strip().replace("/blob/", "/resolve/"), None | |
| out_dir = "downloads" | |
| folder_download = str(random.randint(1000, 9999)) | |
| directory = os.path.join(out_dir, folder_download) | |
| os.makedirs(directory, exist_ok=True) | |
| try: | |
| get_file_size(a_) | |
| if b_: | |
| get_file_size(b_) | |
| valid_url = [a_] if not b_ else [a_, b_] | |
| for link in valid_url: | |
| download_manager( | |
| url=link, | |
| path=directory, | |
| extension="", | |
| ) | |
| for f in find_files(directory): | |
| if f.endswith(".zip"): | |
| unzip_in_folder(f, directory) | |
| model = None | |
| index = None | |
| end_files = find_files(directory) | |
| for ff in end_files: | |
| if ff.endswith(".pth"): | |
| model = ff | |
| gr.Info(f"Model found: {ff}") | |
| if ff.endswith(".index"): | |
| index = ff | |
| gr.Info(f"Index found: {ff}") | |
| if not model: | |
| raise ValueError(f"Model not found in: {end_files}") | |
| if not index: | |
| gr.Warning("Index not found") | |
| else: | |
| index = os.path.abspath(index) | |
| return os.path.abspath(model), index | |
| except Exception as e: | |
| raise e | |
| finally: | |
| t = threading.Thread(target=clear_files, args=(directory,)) | |
| t.start() | |
| def convert_now(audio_files, random_tag, converter): | |
| return converter( | |
| audio_files, | |
| random_tag, | |
| overwrite=False, | |
| parallel_workers=8 | |
| ) | |
| def apply_noisereduce(audio_list): | |
| print("Applying noise reduction") | |
| result = [] | |
| for audio_path in audio_list: | |
| out_path = f'{os.path.splitext(audio_path)[0]}_noisereduce.wav' | |
| try: | |
| # Load audio file | |
| audio = AudioSegment.from_file(audio_path) | |
| # Convert audio to numpy array | |
| samples = np.array(audio.get_array_of_samples()) | |
| reduced_noise = nr.reduce_noise(samples, sr=audio.frame_rate, prop_decrease=0.6) | |
| reduced_audio = AudioSegment( | |
| reduced_noise.tobytes(), | |
| frame_rate=audio.frame_rate, | |
| sample_width=audio.sample_width, | |
| channels=audio.channels | |
| ) | |
| reduced_audio.export(out_path, format="wav") | |
| result.append(out_path) | |
| except Exception as e: | |
| traceback.print_exc() | |
| print(f"Error in noise reduction: {str(e)}") | |
| result.append(audio_path) | |
| return result | |
| def run(audio_files, file_m, file_index): | |
| if not audio_files: | |
| raise ValueError("Please provide an audio file.") | |
| if isinstance(audio_files, str): | |
| audio_files = [audio_files] | |
| try: | |
| duration_base = librosa.get_duration(filename=audio_files[0]) | |
| print("Duration:", duration_base) | |
| except Exception as e: | |
| print(e) | |
| file_m = os.path.join(model_dir, file_m) | |
| file_index = os.path.join(model_dir, file_index) if file_index else None | |
| random_tag = "USER_" + str(random.randint(10000000, 99999999)) | |
| converter.apply_conf( | |
| tag=random_tag, | |
| file_model=file_m, | |
| pitch_algo="rmvpe+", | |
| pitch_lvl=0, | |
| file_index=file_index, | |
| index_influence=0.75, | |
| respiration_median_filtering=3, | |
| envelope_ratio=0.25, | |
| consonant_breath_protection=0.5, | |
| resample_sr=44100 if audio_files[0].endswith('.mp3') else 0, | |
| ) | |
| time.sleep(0.1) | |
| result = convert_now(audio_files, random_tag, converter) | |
| result = apply_noisereduce(result) | |
| return result, result[0] # Assuming result is a list of file paths | |
| def process_audio(audio_file, uploaded_files, file_m, file_index): | |
| if audio_file is not None: | |
| result, _ = run([audio_file], file_m, file_index) | |
| elif uploaded_files is not None: | |
| result, _ = run(uploaded_files, file_m, file_index) | |
| # Create a mapping from filenames to full paths | |
| file_mapping = {os.path.basename(path): path for path in result} | |
| filenames = list(file_mapping.keys()) | |
| # Return the file_mapping, updated dropdown, initial playback file, and list of all files for download | |
| return file_mapping, gr.update(choices=filenames, value=filenames[0], visible=True), file_mapping[filenames[0]], result | |
| def update_audio_selection(selected_filename, file_mapping): | |
| if file_mapping is None: | |
| raise ValueError("File mapping is not available.") | |
| if selected_filename not in file_mapping: | |
| raise ValueError(f"Selected filename {selected_filename} not found in file mapping.") | |
| # Use the selected filename to find the full path from the mapping | |
| full_path = file_mapping[selected_filename] | |
| return gr.update(value=full_path, visible=True) | |
| def switch_input(input_type): | |
| if input_type == "Record Audio": | |
| return gr.update(visible=True), gr.update(visible=False) | |
| else: | |
| return gr.update(visible=False), gr.update(visible=True) | |
| def model_conf(): | |
| model_files = [f for f in os.listdir(model_dir) if f.endswith(".pth")] | |
| return gr.Dropdown( | |
| label="Select Model File", | |
| choices=model_files, | |
| value=model_files[0] if model_files else None, | |
| interactive=True, | |
| ) | |
| def index_conf(): | |
| index_files = [f for f in os.listdir(model_dir) if f.endswith(".index")] | |
| return gr.Dropdown( | |
| label="Select Index File", | |
| choices=index_files, | |
| value=index_files[0] if index_files else None, | |
| interactive=True, | |
| ) | |
| def audio_conf(): | |
| return gr.Audio( | |
| label="Upload or Record Audio", | |
| sources=["upload", "microphone"], # Allows recording via microphone | |
| type="filepath", | |
| ) | |
| def button_conf(): | |
| return gr.Button( | |
| "Inference", | |
| variant="primary", | |
| ) | |
| def output_conf(): | |
| return gr.File(label="Result", file_count="multiple", interactive=False), gr.Audio(label="Play Result",visible=False,show_share_button=False) | |
| def get_gui(theme): | |
| with gr.Blocks(theme=theme, delete_cache=(3200, 3200)) as app: | |
| gr.Markdown(title) | |
| input_type = gr.Radio(["Record Audio", "Upload Files"], label="Select Input Method", value="Record Audio") | |
| audio = gr.Audio(label="Record Audio", sources="microphone", type="filepath", visible=True) | |
| files = gr.File(label="Upload Audio Files", type="filepath", file_count="multiple", visible=False) | |
| input_type.change(switch_input, inputs=[input_type], outputs=[audio, files]) | |
| model = model_conf() | |
| indx = index_conf() | |
| button_base = button_conf() | |
| dropdown = gr.Dropdown(choices=[], label="Select Processed Audio", visible=False) | |
| output_audio = gr.Audio(label="Play Selected Audio", visible=False,show_share_button=False) | |
| output_files = gr.File(label="Download Processed Audio", file_count="multiple", interactive=False) | |
| # output_file, output_audio = output_conf() | |
| file_mapping_state = gr.State() | |
| button_base.click( | |
| process_audio, | |
| inputs=[audio, files, model, indx], | |
| outputs=[file_mapping_state, dropdown, output_audio, output_files], # Store file mapping in state | |
| ) | |
| dropdown.change( | |
| update_audio_selection, | |
| inputs=[dropdown, file_mapping_state], # Pass the state (file_mapping) and dropdown selection | |
| outputs=output_audio, # Play the selected audio file using the full path | |
| ) | |
| # gr.Examples( | |
| # examples=[ | |
| # ["./test.ogg", "./model.pth", "./model.index"], | |
| # ["./example2/test2.ogg", "./example2/model.pth", "./example2/model.index"], | |
| # ], | |
| # fn=process_audio, | |
| # inputs=[audio, files, model, indx], | |
| # outputs=[output_file, output_audio], | |
| # cache_examples=False, | |
| # ) | |
| return app | |
| if __name__ == "__main__": | |
| app = get_gui(theme) | |
| app.queue(default_concurrency_limit=40) | |
| app.launch( | |
| max_threads=40, | |
| share=False, | |
| show_error=True, | |
| quiet=False, | |
| debug=False, | |
| allowed_paths=["./downloads/"], | |
| ) | |