Loop-Architect / app.py
SaltProphet's picture
Rename Loopy.py to app.py
6a48bc0 verified
raw
history blame
17.6 kB
import gradio as gr
import os
import shutil
import asyncio
import librosa
import librosa.display
import soundfile as sf
import numpy as np
import time
import zipfile
import tempfile
import matplotlib.pyplot as plt
import matplotlib
# Use a non-interactive backend for Matplotlib
matplotlib.use('Agg')
def update_output_visibility(choice):
"""Updates the visibility of stem outputs based on user's choice."""
if "2 Stems" in choice:
return {
vocals_output: gr.update(visible=True),
drums_output: gr.update(visible=False),
bass_output: gr.update(visible=False),
other_output: gr.update(visible=True, label="Instrumental (No Vocals)")
}
elif "4 Stems" in choice:
return {
vocals_output: gr.update(visible=True),
drums_output: gr.update(visible=True),
bass_output: gr.update(visible=True),
other_output: gr.update(visible=True, label="Other")
}
async def separate_stems(audio_file_path, stem_choice):
"""
Separates audio into stems using Demucs.
This is an async generator function that yields updates to the UI.
"""
if audio_file_path is None:
raise gr.Error("No audio file uploaded!")
log_history = "Starting separation...\n"
yield {
status_log: log_history
}
try:
log_history += "Preparing audio file...\n"
yield {
status_log: log_history
}
original_filename_base = os.path.basename(audio_file_path).rsplit('.', 1)[0]
# Create a stable path to avoid issues with temp files
stable_input_path = f"stable_input_{original_filename_base}.wav"
shutil.copy(audio_file_path, stable_input_path)
model_arg = "--two-stems=vocals" if "2 Stems" in stem_choice else ""
output_dir = "separated"
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
command = f"python3 -m demucs {model_arg} -o \"{output_dir}\" \"{stable_input_path}\""
log_history += f"Running Demucs command: {command}\n(This may take a minute or more depending on track length)\n"
yield {
status_log: log_history
}
process = await asyncio.create_subprocess_shell(
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if process.returncode != 0:
print(f"Demucs Error: {stderr.decode()}")
# Enhanced error message to explicitly tell the user about the missing dependency
raise gr.Error(f"Demucs failed. You must install the 'demucs' package first! Error: {stderr.decode()[:500]}")
log_history += "Demucs finished. Locating stem files...\n"
yield {
status_log: log_history
}
stable_filename_base = os.path.basename(stable_input_path).rsplit('.', 1)[0]
subfolders = [f.name for f in os.scandir(output_dir) if f.is_dir()]
if not subfolders:
raise gr.Error("Demucs output folder structure not found!")
# Assume the first subfolder is the one Demucs created
model_folder_name = subfolders[0]
stems_path = os.path.join(output_dir, model_folder_name, stable_filename_base)
if not os.path.exists(stems_path):
# Fallback: sometimes demucs doesn't create the innermost folder
stems_path = os.path.join(output_dir, model_folder_name)
if not (os.path.exists(os.path.join(stems_path, "vocals.wav")) or os.path.exists(os.path.join(stems_path, "no_vocals.wav"))):
raise gr.Error(f"Demucs output directory was not found! Looked for: {stems_path}")
vocals_path = os.path.join(stems_path, "vocals.wav") if os.path.exists(os.path.join(stems_path, "vocals.wav")) else None
drums_path = os.path.join(stems_path, "drums.wav") if os.path.exists(os.path.join(stems_path, "drums.wav")) else None
bass_path = os.path.join(stems_path, "bass.wav") if os.path.exists(os.path.join(stems_path, "bass.wav")) else None
other_filename = "no_vocals.wav" if "2 Stems" in stem_choice else "other.wav"
other_path = os.path.join(stems_path, other_filename) if os.path.exists(os.path.join(stems_path, other_filename)) else None
# Clean up the copied input file
os.remove(stable_input_path)
log_history += "✅ Stem separation complete!\n"
yield {
status_log: log_history,
vocals_output: gr.update(value=vocals_path),
drums_output: gr.update(value=drums_path),
bass_output: gr.update(value=bass_path),
other_output: gr.update(value=other_path)
}
except Exception as e:
print(f"An error occurred during separation: {e}")
# Clean up input file on error too
if 'stable_input_path' in locals() and os.path.exists(stable_input_path):
os.remove(stable_input_path)
yield {
status_log: log_history + f"❌ ERROR: {e}"
}
def slice_stem_real(stem_audio_data, loop_choice, sensitivity, stem_name, progress_fn=None):
"""
The core logic for slicing a single stem.
Accepts an optional progress_fn callback for use in loops.
"""
if stem_audio_data is None:
return None, None
sample_rate, y_int = stem_audio_data
# Convert from int16 to float
y = librosa.util.buf_to_float(y_int, dtype=np.float32)
# Ensure y is 1D or 2D
if y.ndim == 0:
print(f"Warning: Empty audio data for {stem_name}.")
return None, None
y_mono = librosa.to_mono(y.T) if y.ndim > 1 else y
if progress_fn: progress_fn(0.1, desc="Detecting BPM...")
tempo, beats = librosa.beat.beat_track(y=y_mono, sr=sample_rate)
bpm = 120 if tempo is None or tempo == 0 else np.round(tempo)
bpm_int = int(bpm.item())
if bpm_int == 0:
bpm_int = 120 # Final fallback
print("BPM detection failed, defaulting to 120 BPM.")
print(f"Detected BPM for {stem_name}: {bpm_int}")
output_files = []
loops_dir = tempfile.mkdtemp()
if "One-Shots" in loop_choice:
if progress_fn: progress_fn(0.3, desc="Finding transients...")
# Adjust onset detection parameters
onset_frames = librosa.onset.onset_detect(
y=y_mono, sr=sample_rate, delta=sensitivity,
wait=1, pre_avg=1, post_avg=1, post_max=1, units='frames'
)
onset_samples = librosa.frames_to_samples(onset_frames)
if progress_fn: progress_fn(0.5, desc="Slicing one-shots...")
if len(onset_samples) > 0:
num_onsets = len(onset_samples)
for i, start_sample in enumerate(onset_samples):
end_sample = onset_samples[i+1] if i+1 < num_onsets else len(y)
# Use original stereo/mono data for the slice
slice_data = y[start_sample:end_sample]
filename = os.path.join(loops_dir, f"{stem_name}_one_shot_{i+1:03d}.wav")
sf.write(filename, slice_data, sample_rate, subtype='PCM_16')
output_files.append(filename)
if progress_fn and num_onsets > 1 and i > 0:
progress = 0.5 + (i / (num_onsets - 1) * 0.5)
progress_fn(progress, desc=f"Exporting slice {i+1}/{num_onsets}...")
else:
# Loop slicing logic
bars = int(loop_choice.split(" ")[0])
seconds_per_beat = 60.0 / bpm_int
seconds_per_bar = seconds_per_beat * 4 # Assuming 4/4 time
loop_duration_seconds = seconds_per_bar * bars
loop_duration_samples = int(loop_duration_seconds * sample_rate)
if loop_duration_samples == 0:
print(f"Loop duration is 0 for {stem_name}. BPM: {bpm_int}")
return None, None
if progress_fn: progress_fn(0.4, desc=f"Slicing into {bars}-bar loops...")
num_loops = len(y) // loop_duration_samples
if num_loops == 0:
print(f"Audio for {stem_name} is too short for {bars}-bar loops at {bpm_int} BPM.")
return None, None
for i in range(num_loops):
start_sample = i * loop_duration_samples
end_sample = start_sample + loop_duration_samples
# Use original stereo/mono data for the slice
slice_data = y[start_sample:end_sample]
filename = os.path.join(loops_dir, f"{stem_name}_{bars}bar_loop_{i+1:03d}_{bpm_int}bpm.wav")
sf.write(filename, slice_data, sample_rate, subtype='PCM_16')
output_files.append(filename)
if progress_fn and num_loops > 1 and i > 0:
progress = 0.4 + (i / (num_loops - 1) * 0.6)
progress_fn(progress, desc=f"Exporting loop {i+1}/{num_loops}...")
if not output_files:
return None, None
return output_files, loops_dir
async def slice_all_and_zip_real(vocals, drums, bass, other, loop_choice, sensitivity):
"""
Slices all available stems and packages them into a ZIP file.
This is an async generator function that yields updates to the UI.
"""
log_history = "Starting batch slice...\n"
yield {
status_log: log_history
}
await asyncio.sleep(0.1) # Give UI time to update
stems_to_process = {"vocals": vocals, "drums": drums, "bass": bass, "other": other}
zip_path = "Loop_Architect_Pack.zip"
num_stems = sum(1 for data in stems_to_process.values() if data is not None)
if num_stems == 0:
raise gr.Error("No stems to process! Please separate stems first.")
all_temp_dirs = []
try:
with zipfile.ZipFile(zip_path, 'w') as zf:
processed_count = 0
for name, data in stems_to_process.items():
if data is not None:
log_history += f"--- Slicing {name} stem ---\n"
yield { status_log: log_history }
# Simple progress function: cannot update Gradio UI inside slice_stem_real
def simple_progress_callback(p, desc=""):
# Log to console for debugging, but no UI update
pass
sliced_files, temp_dir = slice_stem_real(
(data[0], data[1]), loop_choice, sensitivity, name,
progress_fn=simple_progress_callback
)
if sliced_files:
log_history += f"Generated {len(sliced_files)} slices for {name}.\n"
all_temp_dirs.append(temp_dir)
for loop_file in sliced_files:
arcname = os.path.join(name, os.path.basename(loop_file))
zf.write(loop_file, arcname)
else:
log_history += f"No slices generated for {name}.\n"
processed_count += 1
yield {
status_log: log_history
}
log_history += "Packaging complete!\n"
yield {
status_log: log_history + "✅ Pack ready for download!",
download_zip_file: gr.update(value=zip_path, visible=True)
}
except Exception as e:
print(f"An error occurred during slice all: {e}")
yield {
status_log: log_history + f"❌ ERROR: {e}"
}
finally:
# Clean up all temporary directories
for d in all_temp_dirs:
if d and os.path.exists(d):
shutil.rmtree(d)
# --- Create the full Gradio Interface ---
with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="red")) as demo:
gr.Markdown("# 🎵 Loop Architect")
gr.Markdown("Upload any song to separate it into stems (vocals, drums, etc.) and then slice those stems into loops or one-shot samples.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 1. Separate Stems")
audio_input = gr.Audio(type="filepath", label="Upload a Track")
stem_options = gr.Radio(
["4 Stems (Vocals, Drums, Bass, Other)", "2 Stems (Vocals + Instrumental)"],
label="Separation Type",
value="4 Stems (Vocals, Drums, Bass, Other)"
)
submit_button = gr.Button("Separate Stems", variant="primary")
gr.Markdown("### 2. Slicing Options")
with gr.Group():
loop_options_radio = gr.Radio(
["One-Shots (All Transients)", "4 Bar Loops", "8 Bar Loops"],
label="Slice Type",
value="One-Shots (All Transients)"
)
sensitivity_slider = gr.Slider(
minimum=0.01, maximum=0.5, value=0.05, step=0.01,
label="One-Shot Sensitivity",
info="Lower values = more slices"
)
gr.Markdown("### 3. Create Pack")
slice_all_button = gr.Button("Slice All Stems & Create Pack")
download_zip_file = gr.File(label="Download Your Loop Pack", visible=False)
gr.Markdown("### Status")
status_log = gr.Textbox(label="Status Log", lines=10, interactive=False)
with gr.Column(scale=2):
with gr.Accordion("Separated Stems (Preview & Slice)", open=True):
with gr.Row():
vocals_output = gr.Audio(label="Vocals", scale=4)
slice_vocals_btn = gr.Button("Slice Vocals", scale=1)
with gr.Row():
drums_output = gr.Audio(label="Drums", scale=4)
slice_drums_btn = gr.Button("Slice Drums", scale=1)
with gr.Row():
bass_output = gr.Audio(label="Bass", scale=4)
slice_bass_btn = gr.Button("Slice Bass", scale=1)
with gr.Row():
other_output = gr.Audio(label="Other / Instrumental", scale=4)
slice_other_btn = gr.Button("Slice Other", scale=1)
gr.Markdown("### Sliced Loops / Samples (Preview)")
loop_gallery = gr.Gallery(
label="Generated Loops Preview",
columns=8, object_fit="contain", height="auto", preview=True
)
# --- Define Event Listeners ---
def slice_and_display(stem_data, loop_choice, sensitivity, stem_name):
"""
Generator function to slice a single stem and update UI.
"""
if stem_data is None:
yield {
status_log: f"No {stem_name} audio data to slice."
}
return
log_history = f"Slicing {stem_name}...\n"
yield {
status_log: log_history
}
# Define a callback for slice_stem_real to update this generator's progress
def update_single_progress(p, desc=""):
# This is a console log for debugging, as Gradio doesn't allow UI yield from inside here
print(f"Slice Progress {stem_name}: {p} - {desc}")
files, temp_dir = slice_stem_real(
stem_data, loop_choice, sensitivity, stem_name,
progress_fn=update_single_progress # Pass the callback
)
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir) # Clean up temp dir
yield {
loop_gallery: gr.update(value=files),
status_log: log_history + f"✅ Sliced {stem_name} into {len(files) if files else 0} pieces."
}
# --- MAIN EVENT LISTENERS ---
# This event separates the stems
submit_event = submit_button.click(
fn=separate_stems,
inputs=[audio_input, stem_options],
outputs=[
vocals_output, drums_output, bass_output, other_output,
status_log
]
)
# This event updates the UI based on the stem choice
stem_options.change(
fn=update_output_visibility,
inputs=stem_options,
outputs=[vocals_output, drums_output, bass_output, other_output]
)
# --- Single Slice Button Events ---
slice_vocals_btn.click(fn=slice_and_display, inputs=[vocals_output, loop_options_radio, sensitivity_slider, gr.Textbox("vocals", visible=False)], outputs=[loop_gallery, status_log])
slice_drums_btn.click(fn=slice_and_display, inputs=[drums_output, loop_options_radio, sensitivity_slider, gr.Textbox("drums", visible=False)], outputs=[loop_gallery, status_log])
slice_bass_btn.click(fn=slice_and_display, inputs=[bass_output, loop_options_radio, sensitivity_slider, gr.Textbox("bass", visible=False)], outputs=[loop_gallery, status_log])
slice_other_btn.click(fn=slice_and_display, inputs=[other_output, loop_options_radio, sensitivity_slider, gr.Textbox("other", visible=False)], outputs=[loop_gallery, status_log])
# This event slices all stems and zips them
slice_all_event = slice_all_button.click(
fn=slice_all_and_zip_real,
inputs=[vocals_output, drums_output, bass_output, other_output, loop_options_radio, sensitivity_slider],
outputs=[download_zip_file, status_log]
)
if __name__ == "__main__":
demo.launch(debug=True)