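# Loop-Architect / app.py
# Residue of the Hugging Face file-viewer page this file was copied from, kept
# as a comment so the module parses: SaltProphet's picture | Update app.py |
# 1e72884 verified | raw | history blame | 17.3 kB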
import gradio as gr
import os
import shutil
import asyncio
import librosa
import soundfile as sf
import numpy as np
import zipfile
import tempfile
import matplotlib
matplotlib.use('Agg')
# ---------------------------
# UI helpers
# ---------------------------
def update_output_visibility(choice):
    """Build the visibility updates for the four stem audio players.

    The returned 4-tuple is ordered (vocals, drums, bass, other). When the
    selected label contains "2 Stems", the drums and bass players are hidden
    and the fourth player is relabelled as the instrumental; otherwise all
    four players are shown and the fourth one is labelled "Other".
    """
    two_stem_mode = "2 Stems" in choice
    # Drums and bass players are only shown outside 2-stem mode.
    show_rhythm_players = not two_stem_mode
    if two_stem_mode:
        fourth_label = "Instrumental (No Vocals)"
    else:
        fourth_label = "Other"
    vocals_update = gr.update(visible=True)
    drums_update = gr.update(visible=show_rhythm_players)
    bass_update = gr.update(visible=show_rhythm_players)
    other_update = gr.update(visible=True, label=fourth_label)
    return (vocals_update, drums_update, bass_update, other_update)
# ---------------------------
# Stem separation (Demucs)
# ---------------------------
async def separate_stems(audio_file_path, stem_choice, progress=gr.Progress(track_tqdm=True)):
    """Separate an uploaded track into stems by running Demucs as a subprocess.

    This is an async generator. Every yield is a 5-tuple matching the event
    outputs ``[vocals_out, drums_out, bass_out, other_out, status_log]``.
    Intermediate yields leave the audio players untouched and only extend the
    status log; the final yield carries the stem file paths (``None`` for any
    stem file that was not found on disk).

    Args:
        audio_file_path: Path of the uploaded audio file, or ``None``.
        stem_choice: Separation-type label. A label containing ``"2 Stems"``
            runs Demucs with ``--two-stems=vocals``; any other label runs it
            without that flag.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: If no audio file was provided. Failures after that point are
            caught and reported through the status log instead of being raised.
    """
    # Local imports: only this function needs them.
    import shlex
    import sys

    if audio_file_path is None:
        raise gr.Error("No audio file uploaded!")

    def _log_only(text):
        # Update the status log without touching the four audio players.
        return (gr.update(), gr.update(), gr.update(), gr.update(), text)

    log_history = "Starting separation...\n"
    yield _log_only(log_history)

    stable_input_path = None
    try:
        progress(0.05, desc="Preparing audio file...")
        log_history += "Preparing audio file...\n"
        yield _log_only(log_history)

        # Work on a copy with a predictable name. The real extension is kept
        # so the file name never claims a format the data does not have.
        original_name = os.path.basename(audio_file_path)
        original_filename_base = original_name.rsplit('.', 1)[0]
        extension = os.path.splitext(original_name)[1] or ".wav"
        stable_input_path = f"stable_input_{original_filename_base}{extension}"
        shutil.copy(audio_file_path, stable_input_path)

        output_dir = "separated"
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)

        # Argument list + exec (no shell): the uploaded file name is passed
        # verbatim and can never be interpreted as shell syntax.
        command = [sys.executable or "python", "-m", "demucs"]
        if "2 Stems" in stem_choice:
            command.append("--two-stems=vocals")
        command += ["-o", output_dir, stable_input_path]

        log_history += f"Running Demucs command: {shlex.join(command)}\n"
        log_history += "(This may take a minute or more depending on track length)\n"
        yield _log_only(log_history)

        progress(0.2, desc="Running Demucs...")
        process = await asyncio.create_subprocess_exec(
            *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        _stdout, stderr = await process.communicate()
        if process.returncode != 0:
            # Keep the tail of stderr: the failure reason is printed last.
            raise gr.Error(f"Demucs failed. Error: {stderr.decode(errors='ignore')[-800:]}")

        log_history += "Demucs finished. Locating stem files...\n"
        progress(0.8, desc="Locating stems...")
        yield _log_only(log_history)

        # Stems are looked up under <output_dir>/<model folder>/<input base name>/.
        stable_filename_base = os.path.basename(stable_input_path).rsplit('.', 1)[0]
        with os.scandir(output_dir) as entries:
            subfolders = [entry.name for entry in entries if entry.is_dir()]
        if not subfolders:
            raise gr.Error("Demucs output folder structure not found!")
        model_folder_name = subfolders[0]
        stems_path = os.path.join(output_dir, model_folder_name, stable_filename_base)
        if not os.path.exists(stems_path):
            raise gr.Error(f"Demucs output directory was not found! Looked for: {stems_path}")

        def _stem_or_none(filename):
            # Path of a stem file if it exists, else None.
            candidate = os.path.join(stems_path, filename)
            return candidate if os.path.exists(candidate) else None

        vocals_path = _stem_or_none("vocals.wav")
        drums_path = _stem_or_none("drums.wav")
        bass_path = _stem_or_none("bass.wav")
        other_name = "no_vocals.wav" if "2 Stems" in stem_choice else "other.wav"
        other_path = _stem_or_none(other_name)

        log_history += "✅ Stem separation complete!\n"
        yield (
            gr.update(value=vocals_path),
            gr.update(value=drums_path),
            gr.update(value=bass_path),
            gr.update(value=other_path),
            log_history
        )
    except Exception as e:
        # UI boundary: report the failure in the status log instead of crashing.
        err = f"❌ ERROR: {e}"
        print(f"Separation error: {e}")
        yield _log_only(log_history + err)
    finally:
        # Best-effort removal of the working copy on success and on failure.
        if stable_input_path is not None:
            try:
                os.remove(stable_input_path)
            except OSError:
                pass
# ---------------------------
# Slicing (BPM override + Quantized grid)
# ---------------------------
def _grid_beats_per_step(grid_label: str) -> float:
# "1/16 (Sixteenth)" -> "1/16"
tok = grid_label.split(" ")[0].strip()
mapping = {"1/1": 4.0, "1/2": 2.0, "1/4": 1.0, "1/8": 0.5, "1/16": 0.25}
return mapping.get(tok, 0.25)
def _load_audio_any(stem_input):
"""
Accepts a filepath string OR (sr, numpy_array) and returns (sr, mono_float_array).
We use filepath everywhere in this app to keep memory stable.
"""
if stem_input is None:
return None, None
if isinstance(stem_input, str):
y, sr = librosa.load(stem_input, sr=None, mono=False)
else:
sr, y = stem_input
y = librosa.util.buf_to_float(y)
y_mono = librosa.to_mono(y) if y.ndim > 1 else y
return sr, y_mono
def slice_stem_real(
    stem_input,
    loop_choice,
    sensitivity,
    stem_name,
    progress_fn=None,
    bpm_override_val: int = 0,
    quantize: bool = True,
    grid_label: str = "1/16 (Sixteenth)",
):
    """Cut one stem into one-shots or fixed-length bar loops and write WAV files.

    Args:
        stem_input: Stem audio in any form accepted by ``_load_audio_any``.
        loop_choice: Slice-type label. A label containing ``"One-Shots"``
            slices at detected onsets; any other label must start with the
            number of bars per loop (e.g. ``"4 Bar Loops"``).
        sensitivity: Passed to the onset detector as ``delta``.
        stem_name: Prefix used in the generated file names.
        progress_fn: Optional callable ``progress_fn(fraction, desc=...)``.
        bpm_override_val: Tempo to use; ``0`` (or less) means auto-detect.
        quantize: Snap one-shot onsets to the grid before slicing.
        grid_label: Grid-resolution label understood by ``_grid_beats_per_step``.

    Returns:
        ``(output_files, loops_dir)``: the written file paths and the temporary
        directory that holds them (the caller removes it), or ``(None, None)``
        when nothing could be sliced.
    """
    sr, y_mono = _load_audio_any(stem_input)
    if sr is None or y_mono is None:
        return None, None

    def _report(fraction, desc):
        if progress_fn:
            progress_fn(fraction, desc=desc)

    if bpm_override_val and bpm_override_val > 0:
        # An explicit tempo makes beat tracking unnecessary.
        bpm = int(bpm_override_val)
    else:
        _report(0.1, "Detecting BPM...")
        tempo, _ = librosa.beat.beat_track(y=y_mono, sr=sr)
        # The tempo may come back as a scalar or a one-element array.
        tempo_value = float(np.atleast_1d(tempo)[0])
        bpm = int(np.round(tempo_value)) if tempo_value > 0 else 120
    if bpm <= 0:
        bpm = 120

    output_files = []
    loops_dir = tempfile.mkdtemp()
    try:
        if "One-Shots" in loop_choice:
            _report(0.3, "Finding transients...")
            onset_frames = librosa.onset.onset_detect(
                y=y_mono, sr=sr, delta=sensitivity, wait=1, pre_avg=1, post_avg=1, post_max=1
            )
            onset_samples = librosa.frames_to_samples(onset_frames)

            # Quantize onsets to the musical grid if enabled.
            if quantize and len(onset_samples) > 0:
                beats_per_step = _grid_beats_per_step(grid_label)
                samples_per_beat = sr * (60.0 / bpm)
                step = max(1, int(round(samples_per_beat * beats_per_step)))
                snapped = np.clip(
                    np.round(onset_samples / step) * step, 0, len(y_mono) - 1
                ).astype(int)
                onset_samples = np.unique(snapped)  # dedupe after snapping

            _report(0.5, "Slicing one-shots...")
            num_onsets = len(onset_samples)
            min_len = max(1, int(0.02 * sr))  # 20ms min slice guard
            for i, start_sample in enumerate(onset_samples):
                # Each slice runs to the next onset; the last one to the end.
                end_sample = onset_samples[i + 1] if i + 1 < num_onsets else len(y_mono)
                if end_sample - start_sample < min_len:
                    continue
                filename = os.path.join(loops_dir, f"{stem_name}_one_shot_{i+1:03d}.wav")
                sf.write(filename, y_mono[start_sample:end_sample], sr, subtype='PCM_16')
                output_files.append(filename)
                if num_onsets > 1:
                    _report(0.5 + (i / (num_onsets - 1) * 0.5),
                            f"Exporting slice {i+1}/{num_onsets}...")
        else:
            # Grid-true by construction; honor BPM override.
            bars = int(loop_choice.split(" ")[0])
            seconds_per_bar = (60.0 / bpm) * 4  # four beats per bar
            loop_duration_samples = int(seconds_per_bar * bars * sr)
            _report(0.4, f"Slicing into {bars}-bar loops @ {bpm} BPM...")
            # A zero-length loop would divide by zero; treat it as "no loops".
            num_loops = len(y_mono) // loop_duration_samples if loop_duration_samples > 0 else 0
            for i in range(num_loops):
                start_sample = i * loop_duration_samples
                end_sample = start_sample + loop_duration_samples
                filename = os.path.join(
                    loops_dir, f"{stem_name}_{bars}bar_loop_{i+1:03d}_{bpm}bpm.wav"
                )
                sf.write(filename, y_mono[start_sample:end_sample], sr, subtype='PCM_16')
                output_files.append(filename)
                if num_loops > 1:
                    _report(0.4 + (i / (num_loops - 1) * 0.6),
                            f"Exporting loop {i+1}/{num_loops}...")
    except Exception:
        # The caller never learns about loops_dir on failure, so remove it here.
        shutil.rmtree(loops_dir, ignore_errors=True)
        raise

    if not output_files:
        # Nothing is returned for the caller to clean up, so do it now.
        shutil.rmtree(loops_dir, ignore_errors=True)
        return None, None
    return output_files, loops_dir
# ---------------------------
# Batch wrapper
# ---------------------------
async def slice_all_and_zip_real(vocals, drums, bass, other, loop_choice, sensitivity,
                                 bpm_over, quantize_on, grid_sel,
                                 progress=gr.Progress(track_tqdm=True)):
    """Slice every available stem and bundle the results into one zip archive.

    This is an async generator. Every yield is a 2-tuple matching the event
    outputs ``[status_log, download_zip_file]``; the download component stays
    hidden until the archive has been completely written.

    Args:
        vocals, drums, bass, other: Stem inputs; falsy entries are skipped.
        loop_choice, sensitivity: Forwarded to ``slice_stem_real``.
        bpm_over: BPM override (``0`` or ``None`` means auto-detect).
        quantize_on: Whether one-shot onsets are snapped to the grid.
        grid_sel: Grid-resolution label.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: If no stem is available. Later failures are caught and
            reported through the status log instead of being raised.
    """
    log_history = "Starting batch slice...\n"
    yield (log_history, gr.update(visible=False))
    await asyncio.sleep(0.05)

    stems_to_process = {"vocals": vocals, "drums": drums, "bass": bass, "other": other}
    present = {k: v for k, v in stems_to_process.items() if v}
    if not present:
        raise gr.Error("No stems to process! Please separate stems first.")

    zip_path = "Loop_Architect_Pack.zip"
    all_temp_dirs = []
    try:
        bpm_value = int(bpm_over or 0)  # tolerate a cleared (None) BPM field
        total = len(present)
        with zipfile.ZipFile(zip_path, 'w') as zf:
            for processed, (name, data) in enumerate(present.items()):
                log_history += f"--- Slicing {name} stem ---\n"
                yield (log_history, gr.update(visible=False))

                def update_main_progress(p, desc="", _done=processed, _name=name):
                    # Defaults pin this iteration's values (no late binding).
                    progress((_done + max(0.01, p)) / total, desc=f"{_name}: {desc}")

                sliced_files, temp_dir = slice_stem_real(
                    data, loop_choice, sensitivity, name,
                    progress_fn=update_main_progress,
                    bpm_override_val=bpm_value,
                    quantize=bool(quantize_on),
                    grid_label=grid_sel
                )
                if temp_dir:
                    all_temp_dirs.append(temp_dir)
                if sliced_files:
                    log_history += f"Generated {len(sliced_files)} slices for {name}.\n"
                    for loop_file in sliced_files:
                        # One folder per stem inside the archive.
                        zf.write(loop_file, os.path.join(name, os.path.basename(loop_file)))
                else:
                    log_history += f"No slices generated for {name}.\n"
                yield (log_history, gr.update(visible=False))

        # The archive is closed (fully written) before it is offered for download.
        log_history += "Packaging complete! ✅ Pack ready for download.\n"
        yield (log_history, gr.update(value=zip_path, visible=True))
    except Exception as e:
        # UI boundary: report the failure in the status log instead of crashing.
        err = f"❌ ERROR: {e}"
        print(f"Batch slice error: {e}")
        # Do not leave a half-written archive behind.
        try:
            os.remove(zip_path)
        except OSError:
            pass
        yield (log_history + err, gr.update(visible=False))
    finally:
        for d in all_temp_dirs:
            # Best effort: a cleanup failure must not replace the real outcome.
            if d:
                shutil.rmtree(d, ignore_errors=True)
# ---------------------------
# Gradio UI
# ---------------------------
# NOTE(review): the nesting of the layout below was reconstructed from the
# component order; confirm it against the rendered app.
with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="red")) as demo:
    gr.Markdown("# 🎵 Loop Architect — SALT PRØPHET Edition")
    with gr.Row():
        # Left column: upload, slicing options, pack creation, status log.
        with gr.Column(scale=1):
            gr.Markdown("### 1) Separate Stems")
            audio_input = gr.Audio(type="filepath", label="Upload a Track")
            stem_options = gr.Radio(
                ["4 Stems (Vocals, Drums, Bass, Other)", "2 Stems (Vocals + Instrumental)"],
                label="Separation Type",
                value="4 Stems (Vocals, Drums, Bass, Other)"
            )
            submit_button = gr.Button("Separate Stems", variant="secondary")
            with gr.Accordion("2) Slicing Options", open=True):
                loop_options_radio = gr.Radio(
                    ["One-Shots (All Transients)", "4 Bar Loops", "8 Bar Loops"],
                    label="Slice Type",
                    value="One-Shots (All Transients)"
                )
                sensitivity_slider = gr.Slider(
                    minimum=0.01, maximum=0.5, value=0.05, step=0.01,
                    label="One-Shot Sensitivity", info="Lower values = more slices"
                )
                bpm_override = gr.Slider(
                    minimum=0, maximum=240, value=0, step=1,
                    label="BPM Override (0 = auto-detect)"
                )
                quantize_toggle = gr.Checkbox(
                    label="Quantize One-Shots to Grid", value=True
                )
                grid_select = gr.Dropdown(
                    choices=["1/1 (Whole)", "1/2 (Half)", "1/4 (Quarter)", "1/8 (Eighth)", "1/16 (Sixteenth)"],
                    value="1/16 (Sixteenth)",
                    label="Grid Resolution"
                )
            gr.Markdown("### 3) Create Pack")
            slice_all_button = gr.Button("Slice All Stems & Create Pack", variant="primary")
            download_zip_file = gr.File(label="Download Your Loop Pack", visible=False)
            gr.Markdown("### Status")
            status_log = gr.Textbox(label="Status Log", lines=12, interactive=False)
        # Right column: one player + slice button per stem, then the slices.
        with gr.Column(scale=2):
            with gr.Accordion("Separated Stems", open=True):
                with gr.Row():
                    vocals_output = gr.Audio(type="filepath", label="Vocals", scale=4)
                    slice_vocals_btn = gr.Button("Slice Vocals", scale=1)
                with gr.Row():
                    drums_output = gr.Audio(type="filepath", label="Drums", scale=4)
                    slice_drums_btn = gr.Button("Slice Drums", scale=1)
                with gr.Row():
                    bass_output = gr.Audio(type="filepath", label="Bass", scale=4)
                    slice_bass_btn = gr.Button("Slice Bass", scale=1)
                with gr.Row():
                    other_output = gr.Audio(type="filepath", label="Other / Instrumental", scale=4)
                    slice_other_btn = gr.Button("Slice Other", scale=1)
            gr.Markdown("### Sliced Files")
            loops_files = gr.Files(label="Generated Loops / One-Shots")

    # wire events
    submit_button.click(
        fn=separate_stems,
        inputs=[audio_input, stem_options],
        outputs=[vocals_output, drums_output, bass_output, other_output, status_log]
    )
    stem_options.change(
        fn=update_output_visibility,
        inputs=stem_options,
        outputs=[vocals_output, drums_output, bass_output, other_output]
    )

    def slice_and_display(stem_path, loop_choice, sensitivity, stem_name, bpm_over, quantize_on, grid_sel,
                          progress=gr.Progress(track_tqdm=True)):
        """Slice a single stem and show the resulting files plus a status line.

        Returns a 2-tuple matching the outputs ``[loops_files, status_log]``.
        ``progress`` is declared as a default argument so Gradio can inject
        its tracker for this event.
        """
        log_history = f"Slicing {stem_name}...\n"

        def _p(p, desc=""):
            progress(p, desc=desc)

        files, _temp_dir = slice_stem_real(
            stem_path, loop_choice, sensitivity, stem_name,
            progress_fn=_p,
            bpm_override_val=int(bpm_over or 0),
            quantize=bool(quantize_on),
            grid_label=grid_sel
        )
        # The temporary directory is deliberately NOT removed here: the paths
        # returned below must still exist when Gradio reads them after this
        # function has returned.
        if files:
            log_history += f"✅ Sliced {stem_name} into {len(files)} pieces."
            return gr.update(value=files), log_history
        log_history += f"⚠️ No slices generated for {stem_name}."
        return gr.update(value=None), log_history

    # One click handler per stem; the hidden textbox carries the stem name.
    for stem_label, stem_player, stem_button in (
        ("vocals", vocals_output, slice_vocals_btn),
        ("drums", drums_output, slice_drums_btn),
        ("bass", bass_output, slice_bass_btn),
        ("other", other_output, slice_other_btn),
    ):
        stem_button.click(
            fn=slice_and_display,
            inputs=[stem_player, loop_options_radio, sensitivity_slider,
                    gr.Textbox(stem_label, visible=False),
                    bpm_override, quantize_toggle, grid_select],
            outputs=[loops_files, status_log]
        )

    slice_all_button.click(
        fn=slice_all_and_zip_real,
        inputs=[vocals_output, drums_output, bass_output, other_output,
                loop_options_radio, sensitivity_slider,
                bpm_override, quantize_toggle, grid_select],
        outputs=[status_log, download_zip_file]
    )

# Spaces calls launch() automatically
# NOTE(review): `concurrency_count` is accepted by Gradio 3.x queues; confirm
# it is still valid for the Gradio version this Space pins.
demo.queue(concurrency_count=1, max_size=8).launch()