Loop-Architect / app.py
SaltProphet's picture
Update app.py
7272862 verified
raw
history blame
19.8 kB
import gradio as gr
import os
import shutil
import asyncio
import librosa
import librosa.display
import soundfile as sf
import numpy as np
import time
import zipfile
import tempfile
import matplotlib.pyplot as plt
import matplotlib
# Use a non-interactive backend for Matplotlib
matplotlib.use('Agg')
def update_output_visibility(choice):
"""Updates the visibility of stem outputs based on user's choice."""
if "2 Stems" in choice:
return {
vocals_output: gr.update(visible=True),
drums_output: gr.update(visible=False),
bass_output: gr.update(visible=False),
other_output: gr.update(visible=True, label="Instrumental (No Vocals)")
}
elif "4 Stems" in choice:
return {
vocals_output: gr.update(visible=True),
drums_output: gr.update(visible=True),
bass_output: gr.update(visible=True),
other_output: gr.update(visible=True, label="Other")
}
async def separate_stems(audio_file_path, stem_choice):
"""
Separates audio into stems using Demucs.
This is an async generator function that yields updates to the UI.
"""
if audio_file_path is None:
raise gr.Error("No audio file uploaded!")
log_history = "Starting separation...\n"
yield {
status_log: log_history,
progress_bar: gr.Progress(0, desc="Starting...", visible=True)
}
try:
log_history += "Preparing audio file...\n"
yield {
status_log: log_history,
progress_bar: gr.Progress(0.05, desc="Preparing...")
}
original_filename_base = os.path.basename(audio_file_path).rsplit('.', 1)[0]
# Create a stable path to avoid issues with temp files
stable_input_path = f"stable_input_{original_filename_base}.wav"
shutil.copy(audio_file_path, stable_input_path)
model_arg = "--two-stems=vocals" if "2 Stems" in stem_choice else ""
output_dir = "separated"
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
command = f"python3 -m demucs {model_arg} -o \"{output_dir}\" \"{stable_input_path}\""
log_history += f"Running Demucs command: {command}\n(This may take a minute or more depending on track length)\n"
yield {
status_log: log_history,
progress_bar: gr.Progress(0.2, desc="Running Demucs...")
}
process = await asyncio.create_subprocess_shell(
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if process.returncode != 0:
print(f"Demucs Error: {stderr.decode()}")
raise gr.Error(f"Demucs failed. Is it installed? Error: {stderr.decode()[:500]}")
log_history += "Demucs finished. Locating stem files...\n"
yield {
status_log: log_history,
progress_bar: gr.Progress(0.8, desc="Locating stems...")
}
stable_filename_base = os.path.basename(stable_input_path).rsplit('.', 1)[0]
subfolders = [f.name for f in os.scandir(output_dir) if f.is_dir()]
if not subfolders:
raise gr.Error("Demucs output folder structure not found!")
# Assume the first subfolder is the one Demucs created
model_folder_name = subfolders[0]
stems_path = os.path.join(output_dir, model_folder_name, stable_filename_base)
if not os.path.exists(stems_path):
# Fallback: sometimes demucs doesn't create the innermost folder
stems_path = os.path.join(output_dir, model_folder_name)
if not (os.path.exists(os.path.join(stems_path, "vocals.wav")) or os.path.exists(os.path.join(stems_path, "no_vocals.wav"))):
raise gr.Error(f"Demucs output directory was not found! Looked for: {stems_path}")
vocals_path = os.path.join(stems_path, "vocals.wav") if os.path.exists(os.path.join(stems_path, "vocals.wav")) else None
drums_path = os.path.join(stems_path, "drums.wav") if os.path.exists(os.path.join(stems_path, "drums.wav")) else None
bass_path = os.path.join(stems_path, "bass.wav") if os.path.exists(os.path.join(stems_path, "bass.wav")) else None
other_filename = "no_vocals.wav" if "2 Stems" in stem_choice else "other.wav"
other_path = os.path.join(stems_path, other_filename) if os.path.exists(os.path.join(stems_path, other_filename)) else None
# Clean up the copied input file
os.remove(stable_input_path)
log_history += "✅ Stem separation complete!\n"
yield {
status_log: log_history,
progress_bar: gr.Progress(1, desc="Complete!", visible=False), # Hide progress bar when done
vocals_output: gr.update(value=vocals_path),
drums_output: gr.update(value=drums_path),
bass_output: gr.update(value=bass_path),
other_output: gr.update(value=other_path)
}
except Exception as e:
print(f"An error occurred during separation: {e}")
# Clean up input file on error too
if 'stable_input_path' in locals() and os.path.exists(stable_input_path):
os.remove(stable_input_path)
yield {
status_log: log_history + f"❌ ERROR: {e}",
progress_bar: gr.update(visible=False) # Hide progress bar on error
}
def slice_stem_real(stem_audio_data, loop_choice, sensitivity, stem_name, progress_fn=None):
"""
The core logic for slicing a single stem.
Accepts an optional progress_fn callback for use in loops.
"""
if stem_audio_data is None:
return None, None
sample_rate, y_int = stem_audio_data
# Convert from int16 to float
y = librosa.util.buf_to_float(y_int, dtype=np.float32)
# Ensure y is 1D or 2D
if y.ndim == 0:
print(f"Warning: Empty audio data for {stem_name}.")
return None, None
y_mono = librosa.to_mono(y.T) if y.ndim > 1 else y
if progress_fn: progress_fn(0.1, desc="Detecting BPM...")
tempo, beats = librosa.beat.beat_track(y=y_mono, sr=sample_rate)
bpm = 120 if tempo is None or tempo == 0 else np.round(tempo)
bpm_int = int(bpm.item())
if bpm_int == 0:
bpm_int = 120 # Final fallback
print("BPM detection failed, defaulting to 120 BPM.")
print(f"Detected BPM for {stem_name}: {bpm_int}")
output_files = []
loops_dir = tempfile.mkdtemp()
if "One-Shots" in loop_choice:
if progress_fn: progress_fn(0.3, desc="Finding transients...")
# Adjust onset detection parameters
onset_frames = librosa.onset.onset_detect(
y=y_mono, sr=sample_rate, delta=sensitivity,
wait=1, pre_avg=1, post_avg=1, post_max=1, units='frames'
)
onset_samples = librosa.frames_to_samples(onset_frames)
if progress_fn: progress_fn(0.5, desc="Slicing one-shots...")
if len(onset_samples) > 0:
num_onsets = len(onset_samples)
for i, start_sample in enumerate(onset_samples):
end_sample = onset_samples[i+1] if i+1 < num_onsets else len(y)
# Use original stereo/mono data for the slice
slice_data = y[start_sample:end_sample]
filename = os.path.join(loops_dir, f"{stem_name}_one_shot_{i+1:03d}.wav")
sf.write(filename, slice_data, sample_rate, subtype='PCM_16')
output_files.append(filename)
if progress_fn and num_onsets > 1 and i > 0:
progress = 0.5 + (i / (num_onsets - 1) * 0.5)
progress_fn(progress, desc=f"Exporting slice {i+1}/{num_onsets}...")
else:
# Loop slicing logic
bars = int(loop_choice.split(" ")[0])
seconds_per_beat = 60.0 / bpm_int
seconds_per_bar = seconds_per_beat * 4 # Assuming 4/4 time
loop_duration_seconds = seconds_per_bar * bars
loop_duration_samples = int(loop_duration_seconds * sample_rate)
if loop_duration_samples == 0:
print(f"Loop duration is 0 for {stem_name}. BPM: {bpm_int}")
return None, None
if progress_fn: progress_fn(0.4, desc=f"Slicing into {bars}-bar loops...")
num_loops = len(y) // loop_duration_samples
if num_loops == 0:
print(f"Audio for {stem_name} is too short for {bars}-bar loops at {bpm_int} BPM.")
return None, None
for i in range(num_loops):
start_sample = i * loop_duration_samples
end_sample = start_sample + loop_duration_samples
# Use original stereo/mono data for the slice
slice_data = y[start_sample:end_sample]
filename = os.path.join(loops_dir, f"{stem_name}_{bars}bar_loop_{i+1:03d}_{bpm_int}bpm.wav")
sf.write(filename, slice_data, sample_rate, subtype='PCM_16')
output_files.append(filename)
if progress_fn and num_loops > 1 and i > 0:
progress = 0.4 + (i / (num_loops - 1) * 0.6)
progress_fn(progress, desc=f"Exporting loop {i+1}/{num_loops}...")
if not output_files:
return None, None
return output_files, loops_dir
async def slice_all_and_zip_real(vocals, drums, bass, other, loop_choice, sensitivity):
"""
Slices all available stems and packages them into a ZIP file.
This is an async generator function that yields updates to the UI.
"""
log_history = "Starting batch slice...\n"
yield {
status_log: log_history,
progress_bar: gr.Progress(0, desc="Starting...", visible=True)
}
await asyncio.sleep(0.1) # Give UI time to update
stems_to_process = {"vocals": vocals, "drums": drums, "bass": bass, "other": other}
zip_path = "Loop_Architect_Pack.zip"
num_stems = sum(1 for data in stems_to_process.values() if data is not None)
if num_stems == 0:
raise gr.Error("No stems to process! Please separate stems first.")
all_temp_dirs = []
try:
with zipfile.ZipFile(zip_path, 'w') as zf:
processed_count = 0
for name, data in stems_to_process.items():
if data is not None:
log_history += f"--- Slicing {name} stem ---\n"
yield { status_log: log_history }
# This progress callback updates the main progress bar
def update_main_progress(p, desc=""):
overall_progress = (processed_count + p) / num_stems
# We yield from the outer function's scope
yield {
progress_bar: gr.Progress(overall_progress, desc=f"Slicing {name}: {desc}", visible=True)
}
# We need a way to pass a generator-based progress fn
# to a normal function. We can't.
# Instead, we'll just update the progress bar *within* this loop
# and pass a simple lambda that does nothing to slice_stem_real.
# A more complex refactor would make slice_stem_real a generator.
yield { progress_bar: gr.Progress(processed_count / num_stems, desc=f"Slicing {name}...", visible=True) }
# This is a local callback just for slice_stem_real
def simple_progress_callback(p, desc=""):
# This doesn't update the UI, it's just a placeholder
# We update the UI *around* this call.
print(f"Slice Progress {name}: {p} - {desc}") # Log to console
sliced_files, temp_dir = slice_stem_real(
(data[0], data[1]), loop_choice, sensitivity, name,
progress_fn=simple_progress_callback
)
if sliced_files:
log_history += f"Generated {len(sliced_files)} slices for {name}.\n"
all_temp_dirs.append(temp_dir)
for loop_file in sliced_files:
arcname = os.path.join(name, os.path.basename(loop_file))
zf.write(loop_file, arcname)
else:
log_history += f"No slices generated for {name}.\n"
processed_count += 1
yield {
status_log: log_history,
progress_bar: gr.Progress(processed_count / num_stems, desc=f"Finished {name}", visible=True)
}
log_history += "Packaging complete!\n"
yield {
status_log: log_history + "✅ Pack ready for download!",
progress_bar: gr.Progress(1, desc="Pack Ready!", visible=False),
download_zip_file: gr.update(value=zip_path, visible=True)
}
except Exception as e:
print(f"An error occurred during slice all: {e}")
yield {
status_log: log_history + f"❌ ERROR: {e}",
progress_bar: gr.update(visible=False)
}
finally:
# Clean up all temporary directories
for d in all_temp_dirs:
if d and os.path.exists(d):
shutil.rmtree(d)
# --- Create the full Gradio Interface ---
with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="red")) as demo:
gr.Markdown("# 🎵 Loop Architect")
gr.Markdown("Upload any song to separate it into stems (vocals, drums, etc.) and then slice those stems into loops or one-shot samples.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 1. Separate Stems")
audio_input = gr.Audio(type="filepath", label="Upload a Track")
stem_options = gr.Radio(
["4 Stems (Vocals, Drums, Bass, Other)", "2 Stems (Vocals + Instrumental)"],
label="Separation Type",
value="4 Stems (Vocals, Drums, Bass, Other)"
)
submit_button = gr.Button("Separate Stems", variant="primary")
gr.Markdown("### 2. Slicing Options")
with gr.Group():
loop_options_radio = gr.Radio(
["One-Shots (All Transients)", "4 Bar Loops", "8 Bar Loops"],
label="Slice Type",
value="One-Shots (All Transients)"
)
sensitivity_slider = gr.Slider(
minimum=0.01, maximum=0.5, value=0.05, step=0.01,
label="One-Shot Sensitivity",
info="Lower values = more slices"
)
gr.Markdown("### 3. Create Pack")
slice_all_button = gr.Button("Slice All Stems & Create Pack")
download_zip_file = gr.File(label="Download Your Loop Pack", visible=False)
gr.Markdown("### Status")
# Initialize without any arguments
progress_bar = gr.Progress()
status_log = gr.Textbox(label="Status Log", lines=10, interactive=False)
with gr.Column(scale=2):
with gr.Accordion("Separated Stems (Preview & Slice)", open=True):
with gr.Row():
vocals_output = gr.Audio(label="Vocals", scale=4)
slice_vocals_btn = gr.Button("Slice Vocals", scale=1)
with gr.Row():
drums_output = gr.Audio(label="Drums", scale=4)
slice_drums_btn = gr.Button("Slice Drums", scale=1)
with gr.Row():
bass_output = gr.Audio(label="Bass", scale=4)
slice_bass_btn = gr.Button("Slice Bass", scale=1)
with gr.Row():
other_output = gr.Audio(label="Other / Instrumental", scale=4)
slice_other_btn = gr.Button("Slice Other", scale=1)
gr.Markdown("### Sliced Loops / Samples (Preview)")
loop_gallery = gr.Gallery(
label="Generated Loops Preview",
columns=8, object_fit="contain", height="auto", preview=True
)
# --- Define Event Listeners ---
def slice_and_display(stem_data, loop_choice, sensitivity, stem_name):
"""
Generator function to slice a single stem and update UI.
"""
if stem_data is None:
yield {
status_log: f"No {stem_name} audio data to slice.",
progress_bar: gr.update(visible=False)
}
return
log_history = f"Slicing {stem_name}...\n"
# Make progress bar visible when starting
yield {
status_log: log_history,
progress_bar: gr.Progress(0, visible=True, desc=f"Slicing {stem_name}...")
}
# Define a callback for slice_stem_real to update this generator's progress
def update_single_progress(p, desc=""):
# This is a bit of a hack. We can't yield from here.
# We will just rely on the yield before/after this call.
# A full fix would make slice_stem_real a generator.
print(f"Slice Progress {stem_name}: {p} - {desc}") # Log to console
files, temp_dir = slice_stem_real(
stem_data, loop_choice, sensitivity, stem_name,
progress_fn=update_single_progress # Pass the callback
)
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir) # Clean up temp dir
yield {
loop_gallery: gr.update(value=files),
status_log: log_history + f"✅ Sliced {stem_name} into {len(files) if files else 0} pieces.",
progress_bar: gr.update(visible=False) # Hide progress bar when done
}
# --- MAIN EVENT LISTENERS (FIXED) ---
# This event separates the stems
submit_event = submit_button.click(
fn=separate_stems,
inputs=[audio_input, stem_options],
# Add progress_bar to the outputs list
outputs=[
vocals_output, drums_output, bass_output, other_output,
status_log # Removed progress_bar from outputs
]
)
# This event updates the UI based on the stem choice
stem_options.change(
fn=update_output_visibility,
inputs=stem_options,
outputs=[vocals_output, drums_output, bass_output, other_output]
)
# --- Single Slice Button Events ---
slice_vocals_btn.click(fn=slice_and_display, inputs=[vocals_output, loop_options_radio, sensitivity_slider, gr.Textbox("vocals", visible=False)], outputs=[loop_gallery, status_log]) # Removed progress_bar
slice_drums_btn.click(fn=slice_and_display, inputs=[drums_output, loop_options_radio, sensitivity_slider, gr.Textbox("drums", visible=False)], outputs=[loop_gallery, status_log]) # Removed progress_bar
slice_bass_btn.click(fn=slice_and_display, inputs=[bass_output, loop_options_radio, sensitivity_slider, gr.Textbox("bass", visible=False)], outputs=[loop_gallery, status_log]) # Removed progress_bar
slice_other_btn.click(fn=slice_and_display, inputs=[other_output, loop_options_radio, sensitivity_slider, gr.Textbox("other", visible=False)], outputs=[loop_gallery, status_log]) # Removed progress_bar
# This event slices all stems and zips them
slice_all_event = slice_all_button.click(
fn=slice_all_and_zip_real,
inputs=[vocals_output, drums_output, bass_output, other_output, loop_options_radio, sensitivity_slider],
# Add progress_bar and fix the duplicate output
outputs=[download_zip_file, status_log] # Removed progress_bar
)
if __name__ == "__main__":
demo.launch(debug=True)