"""Loop Architect: separate a track into stems with Demucs, then slice each
stem into transient one-shots or bar-length loops and package them as a zip.

Gradio app; queue()/launch() at the bottom (intended for Hugging Face Spaces).
"""

import gradio as gr
import os
import shutil
import sys
import asyncio
import librosa
import soundfile as sf
import numpy as np
import zipfile
import tempfile
import matplotlib

# Headless rendering backend — no display server in the hosting environment.
matplotlib.use('Agg')


# ---------------------------
# UI helpers
# ---------------------------
def update_output_visibility(choice):
    """Show/hide the stem audio players to match the chosen separation mode.

    Returns a tuple of gr.update() for (vocals, drums, bass, other).
    In 2-stem mode only vocals + the relabeled instrumental slot are shown.
    """
    if "2 Stems" in choice:
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=True, label="Instrumental (No Vocals)")
        )
    else:  # 4 stems
        return (
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(visible=True, label="Other")
        )


# ---------------------------
# Stem separation (Demucs)
# ---------------------------
async def separate_stems(audio_file_path, stem_choice, progress=gr.Progress(track_tqdm=True)):
    # outputs: [vocals_out, drums_out, bass_out, other_out, status_log]
    """Run Demucs on the uploaded track and yield stem paths plus a status log.

    Async generator: yields intermediate (vocals, drums, bass, other, log)
    updates so the status textbox streams while Demucs runs.

    Raises gr.Error (surfaced in the UI) when no file is uploaded, Demucs
    exits non-zero, or the expected output folder layout is missing.
    """
    if audio_file_path is None:
        raise gr.Error("No audio file uploaded!")

    log_history = "Starting separation...\n"
    yield (gr.update(), gr.update(), gr.update(), gr.update(), log_history)

    try:
        progress(0.05, desc="Preparing audio file...")
        log_history += "Preparing audio file...\n"
        yield (gr.update(), gr.update(), gr.update(), gr.update(), log_history)

        original_filename_base = os.path.basename(audio_file_path).rsplit('.', 1)[0]
        stable_input_path = f"stable_input_{original_filename_base}.wav"
        # Ensure a stable local path for Demucs (demucs can read many formats,
        # but a predictable filepath keeps the output folder lookup simple).
        shutil.copy(audio_file_path, stable_input_path)

        output_dir = "separated"
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)

        # Build an argv list and run with shell=False semantics: the input
        # path is derived from an uploaded filename, so interpolating it into
        # a shell string would be a quoting / injection hazard.
        cmd = [sys.executable, "-m", "demucs"]
        if "2 Stems" in stem_choice:
            cmd.append("--two-stems=vocals")
        cmd += ["-o", output_dir, stable_input_path]

        log_history += f"Running Demucs command: {' '.join(cmd)}\n"
        log_history += "(This may take a minute or more depending on track length)\n"
        yield (gr.update(), gr.update(), gr.update(), gr.update(), log_history)

        progress(0.2, desc="Running Demucs...")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            raise gr.Error(f"Demucs failed. Error: {stderr.decode(errors='ignore')[:800]}")

        log_history += "Demucs finished. Locating stem files...\n"
        progress(0.8, desc="Locating stems...")
        yield (gr.update(), gr.update(), gr.update(), gr.update(), log_history)

        stable_filename_base = os.path.basename(stable_input_path).rsplit('.', 1)[0]
        # Demucs writes <output_dir>/<model_name>/<track_name>/<stem>.wav;
        # the model folder name depends on the default model, so discover it.
        subfolders = [f.name for f in os.scandir(output_dir) if f.is_dir()]
        if not subfolders:
            raise gr.Error("Demucs output folder structure not found!")
        model_folder_name = subfolders[0]
        stems_path = os.path.join(output_dir, model_folder_name, stable_filename_base)
        if not os.path.exists(stems_path):
            raise gr.Error(f"Demucs output directory was not found! Looked for: {stems_path}")

        def _stem(fname):
            # Return the stem file's path if Demucs produced it, else None.
            p = os.path.join(stems_path, fname)
            return p if os.path.exists(p) else None

        vocals_path = _stem("vocals.wav")
        drums_path = _stem("drums.wav")
        bass_path = _stem("bass.wav")
        # Two-stem mode names the accompaniment "no_vocals.wav".
        other_name = "no_vocals.wav" if "2 Stems" in stem_choice else "other.wav"
        other_path = _stem(other_name)

        try:
            os.remove(stable_input_path)
        except Exception:
            pass  # best-effort cleanup of the stable input copy

        log_history += "✅ Stem separation complete!\n"
        yield (
            gr.update(value=vocals_path),
            gr.update(value=drums_path),
            gr.update(value=bass_path),
            gr.update(value=other_path),
            log_history
        )
    except Exception as e:
        err = f"❌ ERROR: {e}"
        print(f"Separation error: {e}")
        yield (gr.update(), gr.update(), gr.update(), gr.update(), log_history + err)


# ---------------------------
# Slicing (BPM override + Quantized grid)
# ---------------------------
def _grid_beats_per_step(grid_label: str) -> float:
    # "1/16 (Sixteenth)" -> "1/16"
    """Map a grid-resolution dropdown label to its length in beats
    (quarter note = 1 beat). Unknown labels fall back to a sixteenth."""
    tok = grid_label.split(" ")[0].strip()
    mapping = {"1/1": 4.0, "1/2": 2.0, "1/4": 1.0, "1/8": 0.5, "1/16": 0.25}
    return mapping.get(tok, 0.25)


def _load_audio_any(stem_input):
    """
    Accepts a filepath string OR (sr, numpy_array) and returns (sr, mono_float_array).
    We use filepath everywhere in this app to keep memory stable.
    Returns (None, None) when no input is given.
    """
    if stem_input is None:
        return None, None
    if isinstance(stem_input, str):
        y, sr = librosa.load(stem_input, sr=None, mono=False)
    else:
        sr, y = stem_input
        y = librosa.util.buf_to_float(y)
    y_mono = librosa.to_mono(y) if y.ndim > 1 else y
    return sr, y_mono


def slice_stem_real(
    stem_input,
    loop_choice,
    sensitivity,
    stem_name,
    progress_fn=None,
    bpm_override_val: int = 0,
    quantize: bool = True,
    grid_label: str = "1/16 (Sixteenth)",
):
    """Slice one stem into transient one-shots or fixed-length bar loops.

    Parameters:
        stem_input: filepath string, (sr, samples) tuple, or None.
        loop_choice: "One-Shots (All Transients)", "4 Bar Loops" or "8 Bar Loops".
        sensitivity: onset-detection delta; lower values yield more slices.
        stem_name: label used in the output filenames.
        progress_fn: optional progress(p, desc=...) callback.
        bpm_override_val: >0 forces that BPM; 0 means use the detected tempo.
        quantize: snap one-shot onsets to the musical grid.
        grid_label: grid resolution label, e.g. "1/16 (Sixteenth)".

    Returns:
        (list_of_wav_paths, temp_dir) on success, or (None, None) when there
        is no input or nothing could be sliced.
    """
    sr, y_mono = _load_audio_any(stem_input)
    if sr is None or y_mono is None:
        return None, None

    if progress_fn:
        progress_fn(0.1, desc="Detecting BPM...")
    tempo, _ = librosa.beat.beat_track(y=y_mono, sr=sr)
    # Newer librosa versions may return tempo as a 1-element ndarray; coerce
    # to a plain float before truthiness checks / rounding.
    tempo = float(np.atleast_1d(tempo)[0]) if tempo is not None else 0.0
    det_bpm = int(np.round(tempo)) if tempo > 0 else 120
    bpm = int(bpm_override_val) if bpm_override_val and bpm_override_val > 0 else det_bpm
    if bpm <= 0:
        bpm = 120

    output_files = []
    loops_dir = tempfile.mkdtemp()

    if "One-Shots" in loop_choice:
        if progress_fn:
            progress_fn(0.3, desc="Finding transients...")
        onset_frames = librosa.onset.onset_detect(
            y=y_mono, sr=sr, delta=sensitivity, wait=1, pre_avg=1, post_avg=1, post_max=1
        )
        onset_samples = librosa.frames_to_samples(onset_frames)

        # Quantize onsets to musical grid if enabled
        if quantize and len(onset_samples) > 0:
            beats_per_step = _grid_beats_per_step(grid_label)
            samples_per_beat = sr * (60.0 / bpm)
            step = max(1, int(round(samples_per_beat * beats_per_step)))
            q = np.clip(np.round(onset_samples / step) * step, 0, len(y_mono) - 1).astype(int)
            q = np.unique(q)  # dedupe after snapping
            onset_samples = q

        if progress_fn:
            progress_fn(0.5, desc="Slicing one-shots...")
        if len(onset_samples) > 0:
            num_onsets = len(onset_samples)
            min_len = max(1, int(0.02 * sr))  # 20ms min slice guard
            for i, start_sample in enumerate(onset_samples):
                # Each slice runs to the next onset (or end of audio).
                end_sample = onset_samples[i + 1] if i + 1 < num_onsets else len(y_mono)
                if end_sample - start_sample < min_len:
                    continue
                slice_data = y_mono[start_sample:end_sample]
                filename = os.path.join(loops_dir, f"{stem_name}_one_shot_{i+1:03d}.wav")
                sf.write(filename, slice_data, sr, subtype='PCM_16')
                output_files.append(filename)
                if progress_fn and num_onsets > 1:
                    progress_fn(0.5 + (i / (num_onsets - 1) * 0.5), desc=f"Exporting slice {i+1}/{num_onsets}...")
    else:
        # Grid-true by construction; honor BPM override
        bars = int(loop_choice.split(" ")[0])
        seconds_per_beat = 60.0 / bpm
        seconds_per_bar = seconds_per_beat * 4  # assumes 4/4 time — TODO confirm
        loop_duration_seconds = seconds_per_bar * bars
        loop_duration_samples = int(loop_duration_seconds * sr)
        if progress_fn:
            progress_fn(0.4, desc=f"Slicing into {bars}-bar loops @ {bpm} BPM...")
        num_loops = len(y_mono) // loop_duration_samples
        if num_loops == 0:
            # Track shorter than one loop: nothing to keep, don't leak the dir.
            shutil.rmtree(loops_dir, ignore_errors=True)
            return None, None
        for i in range(num_loops):
            start_sample = i * loop_duration_samples
            end_sample = start_sample + loop_duration_samples
            slice_data = y_mono[start_sample:end_sample]
            filename = os.path.join(loops_dir, f"{stem_name}_{bars}bar_loop_{i+1:03d}_{bpm}bpm.wav")
            sf.write(filename, slice_data, sr, subtype='PCM_16')
            output_files.append(filename)
            if progress_fn and num_loops > 1:
                progress_fn(0.4 + (i / (num_loops - 1) * 0.6), desc=f"Exporting loop {i+1}/{num_loops}...")

    if not output_files:
        # No slices survived the filters; clean up the empty temp dir.
        shutil.rmtree(loops_dir, ignore_errors=True)
        return None, None
    return output_files, loops_dir


# ---------------------------
# Batch wrapper
# ---------------------------
async def slice_all_and_zip_real(vocals, drums, bass, other, loop_choice, sensitivity,
                                 bpm_over, quantize_on, grid_sel,
                                 progress=gr.Progress(track_tqdm=True)):
    # outputs: [status_log, download_zip_file]
    """Slice every available stem and package the results into one zip.

    Async generator: yields (log_text, download_file_update) pairs so the
    status textbox streams; the zip download only becomes visible at the end.
    Temp slice directories are always removed in the finally block — safe
    here because their contents are already copied into the zip.
    """
    log_history = "Starting batch slice...\n"
    yield (log_history, gr.update(visible=False))
    await asyncio.sleep(0.05)

    stems_to_process = {"vocals": vocals, "drums": drums, "bass": bass, "other": other}
    present = {k: v for k, v in stems_to_process.items() if v}
    if not present:
        raise gr.Error("No stems to process! Please separate stems first.")

    zip_path = "Loop_Architect_Pack.zip"
    all_temp_dirs = []
    try:
        with zipfile.ZipFile(zip_path, 'w') as zf:
            processed, total = 0, len(present)
            for name, data in present.items():
                log_history += f"--- Slicing {name} stem ---\n"
                yield (log_history, gr.update(visible=False))

                # Bind loop variables as defaults (late-binding guard); the
                # callback is invoked synchronously inside slice_stem_real,
                # so behavior is identical either way.
                def update_main_progress(p, desc="", _name=name, _done=processed):
                    progress((_done + max(0.01, p)) / total, desc=f"{_name}: {desc}")

                sliced_files, temp_dir = slice_stem_real(
                    data, loop_choice, sensitivity, name,
                    progress_fn=update_main_progress,
                    bpm_override_val=int(bpm_over),
                    quantize=bool(quantize_on),
                    grid_label=grid_sel
                )
                if temp_dir:
                    all_temp_dirs.append(temp_dir)
                if sliced_files:
                    log_history += f"Generated {len(sliced_files)} slices for {name}.\n"
                    for loop_file in sliced_files:
                        # Store each slice under a per-stem folder in the zip.
                        zf.write(loop_file, os.path.join(name, os.path.basename(loop_file)))
                else:
                    log_history += f"No slices generated for {name}.\n"
                processed += 1
                yield (log_history, gr.update(visible=False))

        log_history += "Packaging complete! ✅ Pack ready for download.\n"
        yield (log_history, gr.update(value=zip_path, visible=True))
    except Exception as e:
        err = f"❌ ERROR: {e}"
        print(f"Batch slice error: {e}")
        yield (log_history + err, gr.update(visible=False))
    finally:
        for d in all_temp_dirs:
            if d and os.path.exists(d):
                shutil.rmtree(d)


# ---------------------------
# Gradio UI
# ---------------------------
with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="red")) as demo:
    gr.Markdown("# 🎵 Loop Architect — SALT PRØPHET Edition")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1) Separate Stems")
            audio_input = gr.Audio(type="filepath", label="Upload a Track")
            stem_options = gr.Radio(
                ["4 Stems (Vocals, Drums, Bass, Other)", "2 Stems (Vocals + Instrumental)"],
                label="Separation Type",
                value="4 Stems (Vocals, Drums, Bass, Other)"
            )
            submit_button = gr.Button("Separate Stems", variant="secondary")

            with gr.Accordion("2) Slicing Options", open=True):
                loop_options_radio = gr.Radio(
                    ["One-Shots (All Transients)", "4 Bar Loops", "8 Bar Loops"],
                    label="Slice Type",
                    value="One-Shots (All Transients)"
                )
                sensitivity_slider = gr.Slider(
                    minimum=0.01, maximum=0.5, value=0.05, step=0.01,
                    label="One-Shot Sensitivity",
                    info="Lower values = more slices"
                )
                bpm_override = gr.Slider(
                    minimum=0, maximum=240, value=0, step=1,
                    label="BPM Override (0 = auto-detect)"
                )
                quantize_toggle = gr.Checkbox(
                    label="Quantize One-Shots to Grid", value=True
                )
                grid_select = gr.Dropdown(
                    choices=["1/1 (Whole)", "1/2 (Half)", "1/4 (Quarter)", "1/8 (Eighth)", "1/16 (Sixteenth)"],
                    value="1/16 (Sixteenth)",
                    label="Grid Resolution"
                )

            gr.Markdown("### 3) Create Pack")
            slice_all_button = gr.Button("Slice All Stems & Create Pack", variant="primary")
            download_zip_file = gr.File(label="Download Your Loop Pack", visible=False)

            gr.Markdown("### Status")
            status_log = gr.Textbox(label="Status Log", lines=12, interactive=False)

        with gr.Column(scale=2):
            with gr.Accordion("Separated Stems", open=True):
                with gr.Row():
                    vocals_output = gr.Audio(type="filepath", label="Vocals", scale=4)
                    slice_vocals_btn = gr.Button("Slice Vocals", scale=1)
                with gr.Row():
                    drums_output = gr.Audio(type="filepath", label="Drums", scale=4)
                    slice_drums_btn = gr.Button("Slice Drums", scale=1)
                with gr.Row():
                    bass_output = gr.Audio(type="filepath", label="Bass", scale=4)
                    slice_bass_btn = gr.Button("Slice Bass", scale=1)
                with gr.Row():
                    other_output = gr.Audio(type="filepath", label="Other / Instrumental", scale=4)
                    slice_other_btn = gr.Button("Slice Other", scale=1)

            gr.Markdown("### Sliced Files")
            loops_files = gr.Files(label="Generated Loops / One-Shots")

    # wire events
    submit_button.click(
        fn=separate_stems,
        inputs=[audio_input, stem_options],
        outputs=[vocals_output, drums_output, bass_output, other_output, status_log]
    )
    stem_options.change(
        fn=update_output_visibility,
        inputs=stem_options,
        outputs=[vocals_output, drums_output, bass_output, other_output]
    )

    def slice_and_display(stem_path, loop_choice, sensitivity, stem_name, bpm_over, quantize_on, grid_sel):
        """Slice a single stem and surface the files plus a short status line."""
        log_history = f"Slicing {stem_name}...\n"

        def _p(p, desc=""):
            gr.Progress(track_tqdm=True)(p, desc=desc)

        files, temp_dir = slice_stem_real(
            stem_path, loop_choice, sensitivity, stem_name,
            progress_fn=_p,
            bpm_override_val=int(bpm_over),
            quantize=bool(quantize_on),
            grid_label=grid_sel
        )
        if files:
            # Do NOT delete temp_dir on this path: Gradio needs the files to
            # still exist on disk when it copies them into its cache to serve
            # the Files component (deleting first would break the downloads).
            log_history += f"✅ Sliced {stem_name} into {len(files)} pieces."
            return gr.update(value=files), log_history
        else:
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)  # nothing to show; safe to clean up now
            log_history += f"⚠️ No slices generated for {stem_name}."
            return gr.update(value=None), log_history

    slice_vocals_btn.click(
        fn=slice_and_display,
        inputs=[vocals_output, loop_options_radio, sensitivity_slider,
                gr.Textbox("vocals", visible=False), bpm_override, quantize_toggle, grid_select],
        outputs=[loops_files, status_log]
    )
    slice_drums_btn.click(
        fn=slice_and_display,
        inputs=[drums_output, loop_options_radio, sensitivity_slider,
                gr.Textbox("drums", visible=False), bpm_override, quantize_toggle, grid_select],
        outputs=[loops_files, status_log]
    )
    slice_bass_btn.click(
        fn=slice_and_display,
        inputs=[bass_output, loop_options_radio, sensitivity_slider,
                gr.Textbox("bass", visible=False), bpm_override, quantize_toggle, grid_select],
        outputs=[loops_files, status_log]
    )
    slice_other_btn.click(
        fn=slice_and_display,
        inputs=[other_output, loop_options_radio, sensitivity_slider,
                gr.Textbox("other", visible=False), bpm_override, quantize_toggle, grid_select],
        outputs=[loops_files, status_log]
    )

    slice_all_button.click(
        fn=slice_all_and_zip_real,
        inputs=[vocals_output, drums_output, bass_output, other_output,
                loop_options_radio, sensitivity_slider, bpm_override, quantize_toggle, grid_select],
        outputs=[status_log, download_zip_file]
    )

# Spaces calls launch() automatically
demo.queue(concurrency_count=1, max_size=8).launch()