# Loop-Architect / app.py
# (Hugging Face Space by SaltProphet — revision 0bcb76d)
# Import libraries
import gradio as gr
import os
import shutil
import asyncio
import librosa
import librosa.display
import soundfile as sf
import numpy as np
import time
import zipfile
import tempfile
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg') # Use a non-interactive backend for plotting
# --- Helper/Processing Functions ---
def update_output_visibility(choice):
    """Sync stem-player visibility with the selected separation mode.

    2-stem mode hides drums/bass and relabels "other" as the instrumental;
    4-stem mode shows everything with the default "Other" label.
    """
    is_two_stem = "2 Stems" in choice
    if not is_two_stem and "4 Stems" not in choice:
        # Unknown choice: mirror the original's implicit fall-through (None).
        return None
    return {
        vocals_output: gr.update(visible=True),
        drums_output: gr.update(visible=not is_two_stem),
        bass_output: gr.update(visible=not is_two_stem),
        other_output: gr.update(
            visible=True,
            label="Instrumental (No Vocals)" if is_two_stem else "Other",
        ),
    }
async def separate_stems(audio_file_path, stem_choice, progress=gr.Progress(track_tqdm=True)):
    """Run Demucs on the uploaded track and stream status/stem updates to the UI.

    Args:
        audio_file_path: filepath from gr.Audio, or None if nothing uploaded.
        stem_choice: radio value; "2 Stems" selects Demucs' --two-stems=vocals.
        progress: Gradio-injected progress tracker.

    Yields dicts keyed by Gradio components (status_log, progress_bar, stem players).
    Raises gr.Error for user-visible failures (caught and logged below).
    """
    if audio_file_path is None:
        raise gr.Error("No audio file uploaded!")
    log_history = "Starting separation...\n"
    # Initial yield to update UI immediately
    yield { status_log: log_history, progress_bar: progress(0, desc="Starting...", visible=True) }
    stable_input_path = None
    try:
        progress(0.05, desc="Preparing audio file...")
        log_history += "Preparing audio file...\n"
        yield { status_log: log_history, progress_bar: progress(0.05, desc="Preparing...") }
        original_filename_base = os.path.basename(audio_file_path).rsplit('.', 1)[0]
        # Copy to a stable local name so Demucs isn't pointed at Gradio's temp file.
        stable_input_path = f"stable_input_{original_filename_base}.wav"
        shutil.copy(audio_file_path, stable_input_path)
        output_dir = "separated"
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)
        # Build the command as an argv list and run WITHOUT a shell: arbitrary
        # upload filenames can no longer break quoting or inject shell syntax
        # (the original interpolated paths into a shell string).
        cmd = ["python3", "-m", "demucs"]
        if "2 Stems" in stem_choice:
            cmd.append("--two-stems=vocals")
        cmd += ["-o", output_dir, stable_input_path]
        log_history += f"Running Demucs command: {' '.join(cmd)}\n(This may take a minute or more depending on track length)\n"
        yield { status_log: log_history, progress_bar: progress(0.2, desc="Running Demucs...") }
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            raise gr.Error(f"Demucs failed. Error: {stderr.decode()[:500]}")
        log_history += "Demucs finished. Locating stem files...\n"
        yield { status_log: log_history, progress_bar: progress(0.8, desc="Locating stems...") }
        stable_filename_base = os.path.basename(stable_input_path).rsplit('.', 1)[0]
        # Demucs writes under <output_dir>/<model_name>/<track_name>/ — find that folder.
        subfolders = [f.name for f in os.scandir(output_dir) if f.is_dir()]
        if not subfolders:
            raise gr.Error("Demucs output folder structure not found!")
        stems_path = os.path.join(output_dir, subfolders[0], stable_filename_base)
        if not os.path.exists(stems_path):
            raise gr.Error(f"Demucs output directory was not found! Looked for: {stems_path}")

        def _stem_path(fname):
            # Path to a produced stem, or None so the player stays empty.
            p = os.path.join(stems_path, fname)
            return p if os.path.exists(p) else None

        vocals_path = _stem_path("vocals.wav")
        drums_path = _stem_path("drums.wav")
        bass_path = _stem_path("bass.wav")
        # 2-stem mode writes the accompaniment as no_vocals.wav instead of other.wav.
        other_path = _stem_path("no_vocals.wav" if "2 Stems" in stem_choice else "other.wav")
        log_history += "✅ Stem separation complete!\n"
        yield {
            status_log: log_history,
            progress_bar: progress(1, desc="Complete!", visible=False),  # Hide progress bar when done
            vocals_output: gr.update(value=vocals_path),
            drums_output: gr.update(value=drums_path),
            bass_output: gr.update(value=bass_path),
            other_output: gr.update(value=other_path)
        }
    except Exception as e:
        print(f"An error occurred during separation: {e}")
        yield { status_log: log_history + f"❌ ERROR: {e}", progress_bar: gr.update(visible=False) }
    finally:
        # Always delete the temp copy — the original leaked it whenever Demucs failed.
        if stable_input_path and os.path.exists(stable_input_path):
            os.remove(stable_input_path)
def slice_stem_real(stem_audio_data, loop_choice, sensitivity, stem_name, progress_fn=None):
    """Slice one stem into bar-length loops or transient one-shots.

    Args:
        stem_audio_data: (sample_rate, samples) tuple as delivered by gr.Audio,
            or None when the stem is absent.
        loop_choice: "One-Shots (All Transients)", "4 Bar Loops" or "8 Bar Loops".
        sensitivity: onset-detection delta — lower values yield more slices.
        stem_name: label embedded in the output filenames.
        progress_fn: optional callable(fraction, desc=...) for UI progress.

    Returns:
        (list_of_wav_paths, temp_dir) on success, else (None, None).
        The caller owns temp_dir and must delete it when finished.
    """
    if stem_audio_data is None:
        return None, None
    sample_rate, y_int = stem_audio_data
    y = librosa.util.buf_to_float(y_int)
    # librosa expects (channels, samples) for to_mono, hence the transpose.
    y_mono = librosa.to_mono(y.T) if y.ndim > 1 else y
    if progress_fn:
        progress_fn(0.1, desc="Detecting BPM...")
    tempo, _beats = librosa.beat.beat_track(y=y_mono, sr=sample_rate)
    # Normalise tempo to a plain int: depending on librosa version it may be a
    # Python float, a 0-d/1-element ndarray, or None. (The original called
    # `.item()` on a plain int when tempo was None -> AttributeError.)
    try:
        bpm_int = int(round(float(np.atleast_1d(tempo)[0]))) if tempo is not None else 120
    except (TypeError, ValueError):
        bpm_int = 120
    if bpm_int == 0:
        bpm_int = 120
        print("BPM detection failed, defaulting to 120 BPM.")
    print(f"Detected BPM for {stem_name}: {bpm_int}")
    output_files = []
    loops_dir = tempfile.mkdtemp()
    if "One-Shots" in loop_choice:
        if progress_fn:
            progress_fn(0.3, desc="Finding transients...")
        onset_frames = librosa.onset.onset_detect(y=y_mono, sr=sample_rate, delta=sensitivity, wait=1, pre_avg=1, post_avg=1, post_max=1)
        onset_samples = librosa.frames_to_samples(onset_frames)
        if progress_fn:
            progress_fn(0.5, desc="Slicing one-shots...")
        if len(onset_samples) > 0:
            num_onsets = len(onset_samples)
            for i, start_sample in enumerate(onset_samples):
                # Each slice runs to the next onset; the last runs to end-of-audio.
                end_sample = onset_samples[i+1] if i+1 < num_onsets else len(y)
                slice_data = y[start_sample:end_sample]
                filename = os.path.join(loops_dir, f"{stem_name}_one_shot_{i+1:03d}.wav")
                sf.write(filename, slice_data, sample_rate, subtype='PCM_16')
                output_files.append(filename)
                if progress_fn and num_onsets > 1:
                    progress_fn(0.5 + (i / (num_onsets - 1) * 0.5), desc=f"Exporting slice {i+1}/{num_onsets}...")
    else:  # Handle bar loops
        bars = int(loop_choice.split(" ")[0])
        # Assumes 4/4 time: one bar = four beats.
        seconds_per_beat = 60.0 / bpm_int
        loop_duration_seconds = seconds_per_beat * 4 * bars
        loop_duration_samples = int(loop_duration_seconds * sample_rate)
        if progress_fn:
            progress_fn(0.4, desc=f"Slicing into {bars}-bar loops...")
        num_loops = len(y) // loop_duration_samples
        if num_loops == 0:
            print(f"Audio for {stem_name} is too short for {bars}-bar loops at {bpm_int} BPM.")
            # Don't leak the just-created temp dir (the original returned without cleanup).
            shutil.rmtree(loops_dir, ignore_errors=True)
            return None, None
        for i in range(num_loops):
            start_sample = i * loop_duration_samples
            end_sample = start_sample + loop_duration_samples
            slice_data = y[start_sample:end_sample]
            filename = os.path.join(loops_dir, f"{stem_name}_{bars}bar_loop_{i+1:03d}_{bpm_int}bpm.wav")
            sf.write(filename, slice_data, sample_rate, subtype='PCM_16')
            output_files.append(filename)
            if progress_fn and num_loops > 1:
                progress_fn(0.4 + (i / (num_loops - 1) * 0.6), desc=f"Exporting loop {i+1}/{num_loops}...")
    if not output_files:
        # Same leak fix on the empty-result path.
        shutil.rmtree(loops_dir, ignore_errors=True)
        return None, None
    return output_files, loops_dir
async def slice_all_and_zip_real(vocals, drums, bass, other, loop_choice, sensitivity, progress=gr.Progress(track_tqdm=True)):
    """Slice every available stem and bundle the results into one zip pack.

    Args:
        vocals/drums/bass/other: (sample_rate, samples) tuples from the stem
            players, or None for stems that were not produced.
        loop_choice, sensitivity: forwarded to slice_stem_real.
        progress: Gradio-injected progress tracker.

    Yields status/progress updates, then the finished zip for download.
    Raises gr.Error when no stems have been separated yet.
    """
    log_history = "Starting batch slice...\n"
    yield { status_log: log_history, progress_bar: progress(0, desc="Starting...", visible=True) }
    await asyncio.sleep(0.1)
    stems_to_process = {"vocals": vocals, "drums": drums, "bass": bass, "other": other}
    zip_path = "Loop_Architect_Pack.zip"
    num_stems = sum(1 for data in stems_to_process.values() if data is not None)
    if num_stems == 0:
        raise gr.Error("No stems to process! Please separate stems first.")
    all_temp_dirs = []
    try:
        with zipfile.ZipFile(zip_path, 'w') as zf:
            processed_count = 0
            for name, data in stems_to_process.items():
                if data is None:
                    continue
                log_history += f"--- Slicing {name} stem ---\n"
                yield { status_log: log_history }

                def update_main_progress(p, desc="", _name=name, _done=processed_count):
                    # Map this stem's 0..1 progress into the overall range.
                    # _name/_done bound as defaults so the closure can't pick up
                    # later loop values if called late.
                    progress((_done + p) / num_stems, desc=f"Slicing {_name}: {desc}")

                sliced_files, temp_dir = slice_stem_real((data[0], data[1]), loop_choice, sensitivity, name, progress_fn=update_main_progress)
                if sliced_files:
                    log_history += f"Generated {len(sliced_files)} slices for {name}.\n"
                    yield { status_log: log_history }
                    all_temp_dirs.append(temp_dir)
                    for loop_file in sliced_files:
                        # Group slices by stem name inside the archive.
                        arcname = os.path.join(name, os.path.basename(loop_file))
                        zf.write(loop_file, arcname)
                else:
                    log_history += f"No slices generated for {name}.\n"
                    yield { status_log: log_history }
                processed_count += 1
                yield { status_log: log_history, progress_bar: progress(processed_count / num_stems, desc=f"Finished {name}") }
    finally:
        # Remove per-stem temp dirs even when zipping fails mid-way — the
        # original only cleaned up on the success path.
        for d in all_temp_dirs:
            if d and os.path.exists(d):
                shutil.rmtree(d)
    log_history += "Packaging complete!\n"
    yield { status_log: log_history, progress_bar: progress(1, desc="Pack Ready!", visible=False) }
    yield { download_zip_file: gr.update(value=zip_path, visible=True), status_log: log_history + "✅ Pack ready for download!"}
# --- Create the full Gradio Interface ---
with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="red")) as demo:
    gr.Markdown("# 🎵 Loop Architect")
    with gr.Row():
        # Left column: upload, separation/slicing controls, and the status readout.
        with gr.Column(scale=1):
            gr.Markdown("### 1. Separate Stems")
            audio_input = gr.Audio(type="filepath", label="Upload a Track")
            stem_options = gr.Radio(["4 Stems (Vocals, Drums, Bass, Other)", "2 Stems (Vocals + Instrumental)"], label="Separation Type", value="4 Stems (Vocals, Drums, Bass, Other)")
            submit_button = gr.Button("Separate Stems")
            with gr.Accordion("Slicing Options", open=True):
                loop_options_radio = gr.Radio(["One-Shots (All Transients)", "4 Bar Loops", "8 Bar Loops"], label="Slice Type", value="One-Shots (All Transients)")
                sensitivity_slider = gr.Slider(minimum=0.01, maximum=0.5, value=0.05, step=0.01, label="One-Shot Sensitivity", info="Lower values = more slices")
            gr.Markdown("### 3. Create Pack")
            slice_all_button = gr.Button("Slice All Stems & Create Pack", variant="primary")
            download_zip_file = gr.File(label="Download Your Loop Pack", visible=False)
            gr.Markdown("### Status")
            # Removed label from gr.Progress
            # NOTE(review): gr.Progress is a progress *tracker*, not a renderable
            # component — confirm it accepts `visible=` and can be instantiated in
            # the layout like this.
            progress_bar = gr.Progress(visible=False)
            status_log = gr.Textbox(label="Status Log", lines=10, interactive=False)
        # Right column: separated-stem players, per-stem slice buttons, preview gallery.
        with gr.Column(scale=2):
            with gr.Accordion("Separated Stems", open=True):
                with gr.Row():
                    vocals_output = gr.Audio(label="Vocals", scale=4)
                    slice_vocals_btn = gr.Button("Slice Vocals", scale=1)
                with gr.Row():
                    drums_output = gr.Audio(label="Drums", scale=4)
                    slice_drums_btn = gr.Button("Slice Drums", scale=1)
                with gr.Row():
                    bass_output = gr.Audio(label="Bass", scale=4)
                    slice_bass_btn = gr.Button("Slice Bass", scale=1)
                with gr.Row():
                    other_output = gr.Audio(label="Other / Instrumental", scale=4)
                    slice_other_btn = gr.Button("Slice Other", scale=1)
            gr.Markdown("### Sliced Loops / Samples (Preview)")
            # NOTE(review): gr.Gallery is image/video oriented — confirm it renders
            # .wav files; gr.File may be the safer preview component here.
            loop_gallery = gr.Gallery(label="Generated Loops Preview", columns=8, object_fit="contain", height="auto", preview=True)
# --- Define Event Listeners ---
def slice_and_display(stem_data, loop_choice, sensitivity, stem_name):
# Wrapper to handle progress display for single slice buttons
log_history = f"Slicing {stem_name}...\n"
# Use a dictionary to update multiple components
yield {
status_log: log_history,
progress_bar: gr.update(value=0, visible=True, label=f"Slicing {stem_name}...")
}
# Define how slice_stem_real updates the progress bar
def update_single_progress(p, desc=""):
progress_bar.update(value=p, label=desc, visible=True)
files, temp_dir = slice_stem_real(stem_data, loop_choice, sensitivity, stem_name, progress_fn=update_single_progress)
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
yield {
loop_gallery: gr.update(value=files),
status_log: log_history + f"✅ Sliced {stem_name} into {len(files) if files else 0} pieces.",
progress_bar: gr.update(visible=False) # Hide progress bar when done
}
submit_event = submit_button.click(
fn=separate_stems,
inputs=[audio_input, stem_options],
# Corrected outputs: progress_bar removed from here
outputs=[vocals_output, drums_output, bass_output, other_output, status_log]
)
stem_options.change(fn=update_output_visibility, inputs=stem_options, outputs=[vocals_output, drums_output, bass_output, other_output])
# Use the wrapper function for individual slice buttons
slice_vocals_btn.click(fn=slice_and_display, inputs=[vocals_output, loop_options_radio, sensitivity_slider, gr.Textbox("vocals", visible=False)], outputs=[loop_gallery, status_log, progress_bar])
slice_drums_btn.click(fn=slice_and_display, inputs=[drums_output, loop_options_radio, sensitivity_slider, gr.Textbox("drums", visible=False)], outputs=[loop_gallery, status_log, progress_bar])
slice_bass_btn.click(fn=slice_and_display, inputs=[bass_output, loop_options_radio, sensitivity_slider, gr.Textbox("bass", visible=False)], outputs=[loop_gallery, status_log, progress_bar])
slice_other_btn.click(fn=slice_and_display, inputs=[other_output, loop_options_radio, sensitivity_slider, gr.Textbox("other", visible=False)], outputs=[loop_gallery, status_log, progress_bar])
slice_all_event = slice_all_button.click(
fn=slice_all_and_zip_real,
inputs=[vocals_output, drums_output, bass_output, other_output, loop_options_radio, sensitivity_slider],
# Corrected outputs: progress_bar removed from here
outputs=[download_zip_file, download_zip_file, status_log]
)
# --- Launch the UI ---
demo.launch()