# Hugging Face Space page header (status when captured: Sleeping)
import asyncio
import os
import shutil
import sys
import tempfile
import time
import zipfile

import gradio as gr
import librosa
import librosa.display
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf

# Use a non-interactive backend for Matplotlib
matplotlib.use('Agg')
def update_output_visibility(choice):
    """Build visibility updates for the four stem players.

    A choice containing "2 Stems" keeps only the vocals player and the
    fourth player (relabelled as the instrumental) visible; a choice
    containing "4 Stems" shows all four players. Any other choice yields
    ``None``.
    """
    if "2 Stems" in choice:
        show_rhythm_section = False
        fourth_label = "Instrumental (No Vocals)"
    elif "4 Stems" in choice:
        show_rhythm_section = True
        fourth_label = "Other"
    else:
        return None

    updates = {}
    updates[vocals_output] = gr.update(visible=True)
    updates[drums_output] = gr.update(visible=show_rhythm_section)
    updates[bass_output] = gr.update(visible=show_rhythm_section)
    updates[other_output] = gr.update(visible=True, label=fourth_label)
    return updates
async def separate_stems(audio_file_path, stem_choice, progress=gr.Progress()):
    """Separate an uploaded track into stems by running Demucs.

    Async generator used as a Gradio event handler. Each yielded dict maps
    output components to their new values, so the status log is refreshed
    between the slow steps.

    Args:
        audio_file_path: Path of the uploaded audio file, or None.
        stem_choice: Selected separation mode; a value containing "2 Stems"
            requests vocals + instrumental, anything else the default model.
        progress: Gradio progress tracker. Gradio injects a live tracker
            because the default value is a ``gr.Progress`` instance; it is
            a helper to be called, not an output component.

    Raises:
        gr.Error: If no audio file was provided.
    """
    if audio_file_path is None:
        raise gr.Error("No audio file uploaded!")

    log_history = "Starting separation...\n"
    progress(0, desc="Starting...")
    yield {status_log: log_history}

    stable_input_path = None
    try:
        log_history += "Preparing audio file...\n"
        progress(0.05, desc="Preparing...")
        yield {status_log: log_history}

        original_filename_base = os.path.basename(audio_file_path).rsplit('.', 1)[0]
        # Create a stable path to avoid issues with temp files
        stable_input_path = f"stable_input_{original_filename_base}.wav"
        shutil.copy(audio_file_path, stable_input_path)

        # NOTE(review): this directory is shared by every request, so two
        # simultaneous separations would overwrite each other - confirm the
        # app is only used by one user at a time.
        output_dir = "separated"
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)

        # Pass an argument list instead of a shell string: the file name is
        # user-controlled and must never be interpreted by a shell. The
        # fixed "stable_input_" prefix also keeps it from looking like an
        # option.
        demucs_args = [sys.executable or "python3", "-m", "demucs"]
        if "2 Stems" in stem_choice:
            demucs_args.append("--two-stems=vocals")
        demucs_args += ["-o", output_dir, stable_input_path]

        command = " ".join(demucs_args)
        log_history += f"Running Demucs command: {command}\n(This may take a minute or more depending on track length)\n"
        progress(0.2, desc="Running Demucs...")
        yield {status_log: log_history}

        process = await asyncio.create_subprocess_exec(
            *demucs_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await process.communicate()
        if process.returncode != 0:
            error_text = stderr.decode(errors="replace")
            print(f"Demucs Error: {error_text}")
            raise gr.Error(f"Demucs failed. Is it installed? Error: {error_text[:500]}")

        log_history += "Demucs finished. Locating stem files...\n"
        progress(0.8, desc="Locating stems...")
        yield {status_log: log_history}

        stable_filename_base = os.path.basename(stable_input_path).rsplit('.', 1)[0]
        with os.scandir(output_dir) as entries:
            subfolders = sorted(entry.name for entry in entries if entry.is_dir())
        if not subfolders:
            raise gr.Error("Demucs output folder structure not found!")

        # The output directory was emptied above, so the first subfolder is
        # assumed to be the one Demucs just created.
        model_folder_name = subfolders[0]
        stems_path = os.path.join(output_dir, model_folder_name, stable_filename_base)
        if not os.path.exists(stems_path):
            # Fallback: sometimes demucs doesn't create the innermost folder
            stems_path = os.path.join(output_dir, model_folder_name)
            has_vocals = os.path.exists(os.path.join(stems_path, "vocals.wav"))
            has_no_vocals = os.path.exists(os.path.join(stems_path, "no_vocals.wav"))
            if not (has_vocals or has_no_vocals):
                raise gr.Error(f"Demucs output directory was not found! Looked for: {stems_path}")

        def stem_or_none(filename):
            """Return the stem's path if Demucs produced it, else None."""
            candidate = os.path.join(stems_path, filename)
            return candidate if os.path.exists(candidate) else None

        vocals_path = stem_or_none("vocals.wav")
        drums_path = stem_or_none("drums.wav")
        bass_path = stem_or_none("bass.wav")
        other_filename = "no_vocals.wav" if "2 Stems" in stem_choice else "other.wav"
        other_path = stem_or_none(other_filename)

        log_history += "✅ Stem separation complete!\n"
        progress(1.0, desc="Complete!")
        yield {
            status_log: log_history,
            vocals_output: gr.update(value=vocals_path),
            drums_output: gr.update(value=drums_path),
            bass_output: gr.update(value=bass_path),
            other_output: gr.update(value=other_path),
        }
    except Exception as e:
        # Report the failure in the status log rather than crashing the UI.
        print(f"An error occurred during separation: {e}")
        yield {status_log: log_history + f"❌ ERROR: {e}"}
    finally:
        # Remove the copied input on success and on failure alike.
        if stable_input_path and os.path.exists(stable_input_path):
            os.remove(stable_input_path)
def _pcm_to_float32(samples):
    """Convert raw samples to float32 while keeping the array's shape."""
    data = np.asarray(samples)
    if np.issubdtype(data.dtype, np.integer):
        # Scale by the integer type's full range (32768 for int16).
        full_scale = float(np.iinfo(data.dtype).max) + 1.0
        return (data / full_scale).astype(np.float32)
    return data.astype(np.float32)


def _tempo_to_bpm(tempo):
    """Return the detected tempo as a positive int, or None if unusable."""
    if tempo is None:
        return None
    # The tempo may arrive as a scalar or as an array; read the first value.
    values = np.asarray(tempo, dtype=float).ravel()
    if values.size == 0 or not np.isfinite(values[0]):
        return None
    bpm = int(np.round(values[0]))
    return bpm if bpm > 0 else None


def slice_stem_real(stem_audio_data, loop_choice, sensitivity, stem_name, progress_fn=None):
    """Slice one stem into one-shots or fixed-length loops written as WAV files.

    Args:
        stem_audio_data: ``(sample_rate, samples)`` tuple, or None. ``samples``
            is indexed along its first axis as time, so it may be 1-D (mono)
            or 2-D ``(samples, channels)``.
        loop_choice: A value containing "One-Shots" slices at detected
            onsets; otherwise the text before the first space is parsed as
            the number of bars per loop (4/4 time is assumed).
        sensitivity: ``delta`` threshold passed to librosa's onset detector.
        stem_name: Used in the output file names and console messages.
        progress_fn: Optional callback invoked as
            ``progress_fn(fraction, desc=...)``.

    Returns:
        ``(output_files, loops_dir)``: the written file paths and the
        temporary directory holding them (the caller deletes it), or
        ``(None, None)`` when there is nothing to slice.
    """
    if stem_audio_data is None:
        return None, None

    sample_rate, raw_samples = stem_audio_data
    # Convert with NumPy instead of librosa.util.buf_to_float: that helper
    # reads the input as a flat byte buffer, which flattens 2-D stereo data
    # into a single interleaved channel.
    y = _pcm_to_float32(raw_samples)
    if y.ndim == 0 or y.size == 0:
        print(f"Warning: Empty audio data for {stem_name}.")
        return None, None

    def report(fraction, desc):
        if progress_fn:
            progress_fn(fraction, desc=desc)

    # Analysis runs on a mono mix; the exported slices keep all channels.
    y_mono = librosa.to_mono(y.T) if y.ndim > 1 else y

    report(0.1, "Detecting BPM...")
    tempo, _beats = librosa.beat.beat_track(y=y_mono, sr=sample_rate)
    bpm_int = _tempo_to_bpm(tempo)
    if bpm_int is None:
        bpm_int = 120  # Final fallback
        print("BPM detection failed, defaulting to 120 BPM.")
    print(f"Detected BPM for {stem_name}: {bpm_int}")

    total_samples = len(y)

    if "One-Shots" in loop_choice:
        report(0.3, "Finding transients...")
        onset_frames = librosa.onset.onset_detect(
            y=y_mono, sr=sample_rate, delta=sensitivity,
            wait=1, pre_avg=1, post_avg=1, post_max=1, units='frames'
        )
        onset_samples = librosa.frames_to_samples(onset_frames)
        report(0.5, "Slicing one-shots...")
        num_slices = len(onset_samples)
        if num_slices == 0:
            return None, None
        # Each one-shot runs from its onset to the next onset (or the end).
        starts = [int(s) for s in onset_samples]
        ends = starts[1:] + [total_samples]
        names = [f"{stem_name}_one_shot_{i + 1:03d}.wav" for i in range(num_slices)]
        base_progress, span = 0.5, 0.5
        progress_label = "Exporting slice"
    else:
        bars = int(loop_choice.split(" ")[0])
        seconds_per_beat = 60.0 / bpm_int
        seconds_per_bar = seconds_per_beat * 4  # Assuming 4/4 time
        loop_duration_samples = int(seconds_per_bar * bars * sample_rate)
        if loop_duration_samples == 0:
            print(f"Loop duration is 0 for {stem_name}. BPM: {bpm_int}")
            return None, None
        report(0.4, f"Slicing into {bars}-bar loops...")
        num_slices = total_samples // loop_duration_samples
        if num_slices == 0:
            print(f"Audio for {stem_name} is too short for {bars}-bar loops at {bpm_int} BPM.")
            return None, None
        starts = [i * loop_duration_samples for i in range(num_slices)]
        ends = [start + loop_duration_samples for start in starts]
        names = [
            f"{stem_name}_{bars}bar_loop_{i + 1:03d}_{bpm_int}bpm.wav"
            for i in range(num_slices)
        ]
        base_progress, span = 0.4, 0.6
        progress_label = "Exporting loop"

    # Create the directory only once there is something to write, so the
    # early returns above cannot leak an empty temp directory.
    loops_dir = tempfile.mkdtemp()
    output_files = []
    try:
        for i, (start, end, name) in enumerate(zip(starts, ends, names)):
            filename = os.path.join(loops_dir, name)
            sf.write(filename, y[start:end], sample_rate, subtype='PCM_16')
            output_files.append(filename)
            if num_slices > 1 and i > 0:
                fraction = base_progress + (i / (num_slices - 1) * span)
                report(fraction, f"{progress_label} {i + 1}/{num_slices}...")
    except Exception:
        # Do not leave partial output behind if a write fails.
        shutil.rmtree(loops_dir, ignore_errors=True)
        raise

    return output_files, loops_dir
async def slice_all_and_zip_real(vocals, drums, bass, other, loop_choice, sensitivity, progress=gr.Progress()):
    """Slice every available stem and package the slices into one ZIP file.

    Async generator used as a Gradio event handler. Each yielded dict maps
    output components to their new values.

    Args:
        vocals, drums, bass, other: Stem audio values, or None when a stem
            is absent. Elements 0 and 1 of each value are forwarded to
            ``slice_stem_real`` as ``(sample_rate, samples)``.
        loop_choice: Slice type, forwarded to ``slice_stem_real``.
        sensitivity: Onset sensitivity, forwarded to ``slice_stem_real``.
        progress: Gradio progress tracker. Gradio injects a live tracker
            because the default value is a ``gr.Progress`` instance; it is
            a helper to be called, not an output component.

    Raises:
        gr.Error: If none of the stems holds audio.
    """
    log_history = "Starting batch slice...\n"
    progress(0, desc="Starting...")
    yield {status_log: log_history}
    await asyncio.sleep(0.1)  # Give UI time to update

    candidates = {"vocals": vocals, "drums": drums, "bass": bass, "other": other}
    stems_to_process = {name: data for name, data in candidates.items() if data is not None}
    # NOTE(review): fixed path in the working directory - simultaneous
    # requests would overwrite each other's archive; confirm single-user use.
    zip_path = "Loop_Architect_Pack.zip"
    num_stems = len(stems_to_process)
    if num_stems == 0:
        raise gr.Error("No stems to process! Please separate stems first.")

    all_temp_dirs = []
    try:
        with zipfile.ZipFile(zip_path, 'w') as zf:
            for processed_count, (name, data) in enumerate(stems_to_process.items()):
                log_history += f"--- Slicing {name} stem ---\n"
                progress(processed_count / num_stems, desc=f"Slicing {name}...")
                yield {status_log: log_history}

                # Map one stem's 0..1 progress onto its share of the overall
                # bar. Defaults bind the current loop values to the callback.
                def report_slice_progress(p, desc="", _done=processed_count, _name=name):
                    print(f"Slice Progress {_name}: {p} - {desc}")  # Log to console
                    progress((_done + p) / num_stems, desc=f"Slicing {_name}: {desc}")

                sliced_files, temp_dir = slice_stem_real(
                    (data[0], data[1]), loop_choice, sensitivity, name,
                    progress_fn=report_slice_progress
                )
                if temp_dir:
                    all_temp_dirs.append(temp_dir)

                if sliced_files:
                    log_history += f"Generated {len(sliced_files)} slices for {name}.\n"
                    for loop_file in sliced_files:
                        arcname = os.path.join(name, os.path.basename(loop_file))
                        zf.write(loop_file, arcname)
                else:
                    log_history += f"No slices generated for {name}.\n"

                progress((processed_count + 1) / num_stems, desc=f"Finished {name}")
                yield {status_log: log_history}

        log_history += "Packaging complete!\n"
        progress(1.0, desc="Pack Ready!")
        yield {
            status_log: log_history + "✅ Pack ready for download!",
            download_zip_file: gr.update(value=zip_path, visible=True),
        }
    except Exception as e:
        # Report the failure in the status log rather than crashing the UI.
        print(f"An error occurred during slice all: {e}")
        yield {status_log: log_history + f"❌ ERROR: {e}"}
    finally:
        # Clean up all temporary directories
        for d in all_temp_dirs:
            if d and os.path.exists(d):
                shutil.rmtree(d)
# --- Create the full Gradio Interface ---
with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="red")) as demo:
    gr.Markdown("# 🎵 Loop Architect")
    gr.Markdown("Upload any song to separate it into stems (vocals, drums, etc.) and then slice those stems into loops or one-shot samples.")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1. Separate Stems")
            audio_input = gr.Audio(type="filepath", label="Upload a Track")
            stem_options = gr.Radio(
                ["4 Stems (Vocals, Drums, Bass, Other)", "2 Stems (Vocals + Instrumental)"],
                label="Separation Type",
                value="4 Stems (Vocals, Drums, Bass, Other)"
            )
            submit_button = gr.Button("Separate Stems", variant="primary")
            gr.Markdown("### 2. Slicing Options")
            with gr.Group():
                loop_options_radio = gr.Radio(
                    ["One-Shots (All Transients)", "4 Bar Loops", "8 Bar Loops"],
                    label="Slice Type",
                    value="One-Shots (All Transients)"
                )
                sensitivity_slider = gr.Slider(
                    minimum=0.01, maximum=0.5, value=0.05, step=0.01,
                    label="One-Shot Sensitivity",
                    info="Lower values = more slices"
                )
            gr.Markdown("### 3. Create Pack")
            slice_all_button = gr.Button("Slice All Stems & Create Pack")
            download_zip_file = gr.File(label="Download Your Loop Pack", visible=False)
            gr.Markdown("### Status")
            # gr.Progress is a progress-tracking helper, not a layout
            # component, so it must not appear in any event's outputs. The
            # name is kept only so existing module-level references to
            # `progress_bar` keep resolving.
            progress_bar = gr.Progress()
            status_log = gr.Textbox(label="Status Log", lines=10, interactive=False)
        with gr.Column(scale=2):
            with gr.Accordion("Separated Stems (Preview & Slice)", open=True):
                with gr.Row():
                    vocals_output = gr.Audio(label="Vocals", scale=4)
                    slice_vocals_btn = gr.Button("Slice Vocals", scale=1)
                with gr.Row():
                    drums_output = gr.Audio(label="Drums", scale=4)
                    slice_drums_btn = gr.Button("Slice Drums", scale=1)
                with gr.Row():
                    bass_output = gr.Audio(label="Bass", scale=4)
                    slice_bass_btn = gr.Button("Slice Bass", scale=1)
                with gr.Row():
                    other_output = gr.Audio(label="Other / Instrumental", scale=4)
                    slice_other_btn = gr.Button("Slice Other", scale=1)
            gr.Markdown("### Sliced Loops / Samples (Preview)")
            # NOTE(review): gr.Gallery is documented for images and videos,
            # while the handler below sends WAV file paths - confirm they
            # preview as intended, or switch to a file-list component.
            loop_gallery = gr.Gallery(
                label="Generated Loops Preview",
                columns=8, object_fit="contain", height="auto", preview=True
            )

    # --- Define Event Listeners ---
    def slice_and_display(stem_data, loop_choice, sensitivity, stem_name, progress=gr.Progress()):
        """Slice a single stem and show the result in the gallery.

        Generator event handler. ``progress`` is injected by Gradio because
        its default value is a ``gr.Progress`` instance; progress is
        reported by calling it rather than by yielding to a component.
        """
        if stem_data is None:
            yield {status_log: f"No {stem_name} audio data to slice."}
            return

        log_history = f"Slicing {stem_name}...\n"
        progress(0, desc=f"Slicing {stem_name}...")
        yield {status_log: log_history}

        def update_single_progress(p, desc=""):
            print(f"Slice Progress {stem_name}: {p} - {desc}")  # Log to console
            progress(p, desc=desc)

        files, temp_dir = slice_stem_real(
            stem_data, loop_choice, sensitivity, stem_name,
            progress_fn=update_single_progress
        )
        try:
            yield {
                loop_gallery: gr.update(value=files),
                status_log: log_history + f"✅ Sliced {stem_name} into {len(files) if files else 0} pieces.",
            }
        finally:
            # Delete the slices only after the paths have been handed to
            # Gradio; removing them before the yield would leave the gallery
            # pointing at files that no longer exist.
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)

    # --- MAIN EVENT LISTENERS ---
    # This event separates the stems
    submit_event = submit_button.click(
        fn=separate_stems,
        inputs=[audio_input, stem_options],
        outputs=[
            vocals_output, drums_output, bass_output, other_output,
            status_log
        ]
    )

    # This event updates the UI based on the stem choice
    stem_options.change(
        fn=update_output_visibility,
        inputs=stem_options,
        outputs=[vocals_output, drums_output, bass_output, other_output]
    )

    # --- Single Slice Button Events ---
    # The stem name reaches the handler through a hidden Textbox input.
    single_slice_bindings = (
        (slice_vocals_btn, vocals_output, "vocals"),
        (slice_drums_btn, drums_output, "drums"),
        (slice_bass_btn, bass_output, "bass"),
        (slice_other_btn, other_output, "other"),
    )
    for slice_button, stem_component, stem_label in single_slice_bindings:
        slice_button.click(
            fn=slice_and_display,
            inputs=[
                stem_component, loop_options_radio, sensitivity_slider,
                gr.Textbox(stem_label, visible=False)
            ],
            outputs=[loop_gallery, status_log]
        )

    # This event slices all stems and zips them
    slice_all_event = slice_all_button.click(
        fn=slice_all_and_zip_real,
        inputs=[vocals_output, drums_output, bass_output, other_output, loop_options_radio, sensitivity_slider],
        outputs=[download_zip_file, status_log]
    )

if __name__ == "__main__":
    demo.launch(debug=True)