# NOTE(review): the following lines are Hugging Face web-page chrome that was
# accidentally captured along with the file and are not valid Python; they
# should be removed from app.py entirely:
#   SaltProphet's picture / Update app.py / 00e1986 verified / raw /
#   history blame / 36.9 kB
import gradio as gr
import os
import shutil
import zipfile
import librosa
import numpy as np
from pydub import AudioSegment
from pydub.silence import split_on_silence
from moviepy.editor import AudioFileClip, ImageClip, CompositeVideoClip
# Keep import available but unused (Full feature set available)
from moviepy.video.fx.all import blackwhite, lum_contrast
import subprocess
from pathlib import Path
import sys
import yt_dlp
import json
from datetime import datetime
# --- PATCH FOR PILLOW 10.0+ vs MOVIEPY 1.0.3 COMPATIBILITY ---
# Pillow 10 removed the deprecated Image.ANTIALIAS constant, but moviepy
# 1.0.3 still references it at runtime; alias it to LANCZOS (its modern
# replacement) so video rendering does not crash.
import PIL.Image
if not hasattr(PIL.Image, 'ANTIALIAS'):
    PIL.Image.ANTIALIAS = PIL.Image.LANCZOS
# -------------------------------------------------------------
# --- Configuration ---
OUTPUT_DIR = Path("nightpulse_output")
TEMP_DIR = Path("temp_processing")
# -----------------------------
# Startup Checks
# -----------------------------
def check_ffmpeg():
    """Verify that the FFmpeg binary is reachable on the system PATH.

    Returns:
        bool: True when ffmpeg is found; False otherwise. On failure a
        warning is printed — pydub and demucs both shell out to ffmpeg,
        so audio processing cannot work without it.
    """
    ffmpeg_location = shutil.which("ffmpeg")
    if ffmpeg_location is not None:
        return True
    print("CRITICAL WARNING: FFmpeg not found in system PATH.")
    print("Audio processing (pydub/demucs) will fail.")
    return False
check_ffmpeg()
# -----------------------------
# Cloud Import
# -----------------------------
def download_from_url(url):
    """Download audio from a YouTube/SoundCloud/direct link into TEMP_DIR.

    Exists so mobile users can paste a link instead of using the file
    picker. Returns the path (str) of the extracted WAV, or None when no
    URL was supplied.
    """
    if not url:
        return None
    print(f"Fetching URL: {url}")
    # Start from an empty temp dir so stale downloads cannot collide.
    if TEMP_DIR.exists():
        shutil.rmtree(TEMP_DIR, ignore_errors=True)
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    downloader_options = {
        "format": "bestaudio/best",
        "outtmpl": str(TEMP_DIR / "%(title)s.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
                "preferredquality": "192",
            }
        ],
        "quiet": True,
        "no_warnings": True,
    }
    with yt_dlp.YoutubeDL(downloader_options) as downloader:
        info = downloader.extract_info(url, download=True)
        raw_name = downloader.prepare_filename(info)
        # The FFmpegExtractAudio post-processor swaps the extension to .wav.
        return str(Path(raw_name).with_suffix(".wav"))
# -----------------------------
# File Handling (Safer)
# -----------------------------
def safe_copy_to_temp(audio_file: str) -> str:
    """Copy the source file into TEMP_DIR under a sanitized filename.

    Any character in the stem other than alphanumerics and ``._-`` is
    replaced with an underscore, avoiding path/space/unicode surprises in
    downstream tools. If the copy fails for any reason, the original path
    is returned as a best-effort fallback.
    """
    src = Path(audio_file)
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    sanitized = "".join(
        ch if (ch.isalnum() or ch in "._-") else "_" for ch in src.stem
    )
    dst = TEMP_DIR / f"{sanitized}{src.suffix.lower()}"
    try:
        shutil.copy(src, dst)
        return str(dst)
    except Exception:
        # Best effort: fall back to the original location.
        return str(src)
def ensure_wav(input_path: str) -> str:
    """Return a path to a WAV version of *input_path*.

    Demucs is most reliable on WAV input, so anything else is transcoded
    into TEMP_DIR via pydub (which shells out to FFmpeg). Files already
    carrying a .wav suffix are returned untouched.
    """
    source = Path(input_path)
    if source.suffix.lower() == ".wav":
        return str(source)
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    target = TEMP_DIR / f"{source.stem}.wav"
    AudioSegment.from_file(str(source)).export(str(target), format="wav")
    return str(target)
# -----------------------------
# Demucs Runner
# -----------------------------
def run_demucs(cmd):
    """Execute a demucs command line and return its stdout.

    On a non-zero exit code, raises gr.Error carrying the command plus the
    last 4000 characters of stdout/stderr so the failure is visible in the
    Gradio UI rather than lost in server logs.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout or ""
    stdout_tail = (proc.stdout or "")[-4000:]
    stderr_tail = (proc.stderr or "")[-4000:]
    raise gr.Error(
        "Demucs failed.\n\n"
        f"Command:\n{cmd}\n\n"
        f"STDOUT (tail):\n{stdout_tail}\n\n"
        f"STDERR (tail):\n{stderr_tail}"
    )
# -----------------------------
# BPM + Grid
# -----------------------------
def detect_bpm_multiwindow(audio_path, windows=((0, 60), (60, 60), (120, 60))):
    """Estimate tempo from several slices of the track and take the median.

    Each (offset_s, duration_s) window is analysed independently with
    librosa beat tracking; windows shorter than ~10 seconds and implausible
    tempi (outside 40-220 BPM) are discarded. Returns a rounded int BPM,
    or None when no window produced a usable estimate.
    """
    estimates = []
    for offset, duration in windows:
        try:
            y, sr = librosa.load(
                audio_path, offset=float(offset), duration=float(duration), mono=True
            )
            # Require at least ~10 seconds of material for a stable estimate.
            if len(y) < sr * 10:
                continue
            tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
            # librosa may hand back a scalar or a 1-element array here.
            value = float(tempo[0] if np.ndim(tempo) > 0 else tempo)
            if 40 <= value <= 220:
                estimates.append(value)
        except Exception:
            # Best effort: a bad window (e.g. offset past end of file) is skipped.
            pass
    if not estimates:
        return None
    return int(round(float(np.median(estimates))))
def detect_bar_grid(audio_path, bpm, sr=22050, max_seconds=240):
    """Return estimated bar-start times in milliseconds.

    Runs librosa beat tracking on up to *max_seconds* of audio and treats
    every 4th detected beat as a bar start (4/4 assumed). If tracking
    fails or yields fewer than 8 beats, falls back to a uniform grid
    synthesized from the supplied BPM.
    """
    y, sr = librosa.load(audio_path, sr=sr, mono=True, duration=max_seconds)
    try:
        _, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units="frames")
        beat_times = librosa.frames_to_time(beat_frames, sr=sr)
    except Exception:
        beat_times = None
    if beat_times is None or len(beat_times) < 8:
        # Fallback: evenly spaced bars straight from the BPM.
        ms_per_beat = (60.0 / max(1, int(bpm))) * 1000.0
        bar_ms = ms_per_beat * 4.0
        total_ms = (len(y) / sr) * 1000.0
        return [int(i * bar_ms) for i in range(int(total_ms // bar_ms) + 1)]
    # Every 4th detected beat is taken as a downbeat (4/4 assumed).
    return [int(t * 1000.0) for t in beat_times[::4]]
# -----------------------------
# Loudness + Processing
# -----------------------------
def rms_dbfs(seg: AudioSegment) -> float:
    """Return the segment's RMS level in dBFS, flooring silence at -120.

    NOTE(review): divides by 32768, i.e. assumes 16-bit full scale — this
    matches the WAVs the pipeline produces, but confirm if sample widths
    ever change.
    """
    raw_rms = seg.rms
    if raw_rms <= 0:
        return -120.0
    return 20.0 * float(np.log10(raw_rms / 32768.0))
def apply_loudness(seg: AudioSegment, mode: str, target_dbfs: float = -14.0) -> AudioSegment:
    """Apply the selected loudness treatment and return the processed segment.

    Modes: "none" passes through, "peak" uses pydub's normalize(), and
    "rms" applies gain toward *target_dbfs*, clamped to +/-12 dB so one
    pathological segment cannot be boosted or cut wildly. Unrecognized
    modes pass through unchanged.
    """
    normalized_mode = (mode or "none").lower().strip()
    if normalized_mode == "peak":
        return seg.normalize()
    if normalized_mode == "rms":
        delta = float(target_dbfs) - rms_dbfs(seg)
        delta = max(-12.0, min(12.0, delta))
        return seg.apply_gain(delta)
    # "none" and anything unrecognized: leave untouched.
    return seg
def trim_tiny(seg: AudioSegment, window_ms: int = 8) -> AudioSegment:
    """Shave *window_ms* off BOTH ends of the segment (de-click helper).

    Segments too short to survive the trim are returned unchanged.
    """
    total = len(seg)
    if total <= window_ms * 2:
        return seg
    return seg[window_ms:total - window_ms]
def loop_seam_crossfade(seg: AudioSegment, seam_ms=20) -> AudioSegment:
    """Crossfade the loop's tail into its head for a click-free seam.

    The result is shorter than the input by roughly seam_ms because the
    overlapping regions are blended together. Segments too short for the
    seam (or seam_ms <= 0) pass through unchanged.
    """
    seam_ms = int(seam_ms)
    if seam_ms <= 0 or len(seg) <= seam_ms * 2:
        return seg
    head = seg[:seam_ms]
    tail = seg[-seam_ms:]
    body = seg[seam_ms:-seam_ms]
    # Blend the tail into the head, then splice that seam back onto the body.
    seam = tail.append(head, crossfade=seam_ms)
    return body.append(seam, crossfade=seam_ms)
# -----------------------------
# De-dup
# -----------------------------
def dedupe_by_bar_spacing(candidates, bar_starts_ms, min_bar_gap=4):
    """Drop candidates whose nearest bar lies within min_bar_gap of one kept.

    *candidates* is an iterable of (score, start_ms, bar_len) tuples,
    assumed pre-sorted by preference so earlier entries win. With no bar
    grid available the candidates are returned unchanged.
    """
    if not bar_starts_ms:
        return candidates
    gap = int(min_bar_gap)
    kept = []
    taken_bars = []
    for score, start_ms, bar_len in candidates:
        # Index of the bar closest to this candidate's start (ties -> first).
        nearest = min(
            range(len(bar_starts_ms)),
            key=lambda i: abs(start_ms - bar_starts_ms[i]),
        )
        if any(abs(nearest - used) < gap for used in taken_bars):
            continue
        kept.append((score, start_ms, bar_len))
        taken_bars.append(nearest)
    return kept
# -----------------------------
# Loop Engine (FIXED DRIFT)
# -----------------------------
def make_quantized_loops(
    stem_path: Path,
    stem_name: str,
    bpm: int,
    bar_starts_ms: list,
    bar_lengths: list,
    hop_bars: int,
    loops_per_stem: int,
    top_k: int,
    fade_ms: int,
    loop_seam: bool,
    seam_ms: int,
    min_bar_gap: int,
    loudness_mode: str,
    target_dbfs: float,
    out_dir: Path
):
    """Cut bar-aligned loops from one stem and export them as WAV files.

    Candidate loops start on the detected bar grid (every *hop_bars* bars),
    span each length in *bar_lengths*, and are scored by RMS loudness.
    Candidates are optionally filtered to the top_k loudest, deduped by bar
    spacing, then trimmed/seamed and hard-clipped to an exact bar-multiple
    length so they stay in sync in a DAW (the "FIXED DRIFT" behavior: extra
    audio is extracted up front to compensate for trim/seam losses).

    Args:
        stem_path: WAV file of the stem; a missing file yields [].
        stem_name: Label embedded in the exported filenames.
        bpm: Tempo; bar length is derived assuming 4/4.
        bar_starts_ms: Bar-start grid in ms (empty -> no candidates).
        bar_lengths: Loop lengths to try, in bars.
        hop_bars: Stride in bars between candidate start points (min 1).
        loops_per_stem: Maximum number of loops to export (min 1).
        top_k: Keep only the K loudest candidates before dedup (0 = all).
        fade_ms: Fade-in/out applied only on the non-seam path.
        loop_seam: When True, crossfade the tail into the head for a
            seamless loop instead of fading.
        seam_ms: Crossfade length for the seam.
        min_bar_gap: Minimum bar spacing between exported loops.
        loudness_mode: Passed to apply_loudness() ("none"/"peak"/"rms").
        target_dbfs: RMS target used by "rms" loudness mode.
        out_dir: Destination directory for the WAV files.

    Returns:
        list[Path]: Exported loop files, best-ranked first.
    """
    if not stem_path.exists():
        return []
    audio = AudioSegment.from_wav(str(stem_path))
    ms_per_beat = (60.0 / max(1, int(bpm))) * 1000.0
    ms_per_bar = int(ms_per_beat * 4.0)  # 4/4 assumed throughout
    hop_bars = max(1, int(hop_bars))
    loops_per_stem = max(1, int(loops_per_stem))
    fade_ms = int(fade_ms)
    seam_ms = int(seam_ms)
    min_bar_gap = int(min_bar_gap)
    # Calculate extra audio needed to compensate for trim and seam
    # trim_tiny removes 2x window (8ms start, 8ms end)
    trim_window = 8
    needed_extra = 0
    if loop_seam:
        needed_extra += seam_ms
    needed_extra += (trim_window * 2)
    grid = bar_starts_ms[::hop_bars] if bar_starts_ms else []
    candidates = []
    for bar_len in bar_lengths:
        target_dur_ms = ms_per_bar * int(bar_len)
        extract_dur_ms = target_dur_ms + needed_extra
        for start_ms in grid:
            # Skip starts whose (buffered) window runs past the end of the stem.
            if start_ms + extract_dur_ms > len(audio):
                continue
            # Extract WITH the buffer
            seg = audio[start_ms : start_ms + extract_dur_ms]
            if len(seg) < extract_dur_ms:
                continue
            # Score based on RMS
            candidates.append((rms_dbfs(seg), int(start_ms), int(bar_len)))
    candidates.sort(key=lambda x: x[0], reverse=True)
    if int(top_k) > 0:
        candidates = candidates[:int(top_k)]
    candidates = dedupe_by_bar_spacing(candidates, bar_starts_ms, min_bar_gap=min_bar_gap)
    exported = []
    for rank, (score, start_ms, bar_len) in enumerate(candidates[:loops_per_stem], start=1):
        target_dur_ms = ms_per_bar * int(bar_len)
        extract_dur_ms = target_dur_ms + needed_extra
        loop = audio[start_ms : start_ms + extract_dur_ms]
        # 1. Trim Tiny (removes trim_window from start and end)
        loop = trim_tiny(loop, window_ms=trim_window)
        # 2. Seam or Fade
        if loop_seam:
            loop = loop_seam_crossfade(loop, seam_ms=seam_ms)
        else:
            # If no seam, we just have extra audio hanging off the end. Trim it.
            loop = loop[:target_dur_ms]
            # NOTE(review): fades are applied only on the non-seam path here,
            # so a seamed loop's edges stay intact — confirm this matches intent.
            if fade_ms > 0:
                loop = loop.fade_in(fade_ms).fade_out(fade_ms)
        # 3. Final Hard Quantize (Critical for DAW sync)
        # Force length to be exactly the grid length
        loop = loop[:target_dur_ms]
        loop = apply_loudness(loop, mode=loudness_mode, target_dbfs=float(target_dbfs))
        # Name the loop after the nearest bar index for traceability.
        if bar_starts_ms:
            bar_index = int(np.argmin([abs(start_ms - b) for b in bar_starts_ms]))
        else:
            bar_index = int(start_ms // max(1, ms_per_bar))
        out_name = f"{int(bpm)}BPM_{stem_name}_B{bar_index:03d}_L{int(bar_len)}bars_R{rank:02d}.wav"
        out_path = out_dir / out_name
        loop.export(out_path, format="wav")
        exported.append(out_path)
    return exported
# -----------------------------
# Vocal Chop Engines
# -----------------------------
def vocal_chops_silence(
    vocals_path: Path,
    bpm: int,
    out_dir: Path,
    max_chops: int = 48,
    min_len_ms: int = 120,
    max_len_ms: int = 1500,
    silence_thresh_db: int = -35,
    min_silence_len_ms: int = 140,
    keep_silence_ms: int = 20,
    fade_ms: int = 8,
    loudness_mode: str = "none",
    target_dbfs: float = -14.0
):
    """Chop the vocal stem at silent gaps and export the loudest pieces.

    Uses pydub's split_on_silence, drops chunks shorter than min_len_ms,
    clips chunks longer than max_len_ms, then exports up to max_chops of
    the loudest (by RMS) after de-click trimming, fades, and loudness
    processing. *bpm* is only used in the output filenames. Returns a
    list of exported Paths (empty when the stem file is missing).
    """
    if not vocals_path.exists():
        return []
    audio = AudioSegment.from_wav(str(vocals_path))
    raw_chunks = split_on_silence(
        audio,
        min_silence_len=int(min_silence_len_ms),
        silence_thresh=int(silence_thresh_db),
        keep_silence=int(keep_silence_ms),
    )
    # Enforce length limits: discard too-short chunks, clip too-long ones.
    usable = []
    for chunk in raw_chunks:
        if len(chunk) < int(min_len_ms):
            continue
        if len(chunk) > int(max_len_ms):
            chunk = chunk[:int(max_len_ms)]
        usable.append(chunk)
    # Loudest chunks first (stable sort on score only).
    ranked = [(rms_dbfs(chunk), chunk) for chunk in usable]
    ranked.sort(key=lambda item: item[0], reverse=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    exported = []
    for rank, (_, chunk) in enumerate(ranked[:int(max_chops)], start=1):
        chunk = trim_tiny(chunk, window_ms=8)
        if int(fade_ms) > 0:
            chunk = chunk.fade_in(int(fade_ms)).fade_out(int(fade_ms))
        chunk = apply_loudness(chunk, mode=loudness_mode, target_dbfs=float(target_dbfs))
        out_path = out_dir / f"{int(bpm)}BPM_Vocals_CHOP_SIL_R{rank:02d}.wav"
        chunk.export(out_path, format="wav")
        exported.append(out_path)
    return exported
def vocal_chops_onset(
    vocals_path: Path,
    bpm: int,
    out_dir: Path,
    max_chops: int = 48,
    min_len_ms: int = 90,
    max_len_ms: int = 900,
    sr: int = 22050,
    backtrack: bool = True,
    fade_ms: int = 8,
    loudness_mode: str = "none",
    target_dbfs: float = -14.0
):
    """Chop the vocal stem at detected note onsets and export the loudest chops.

    Onsets are detected with librosa; each chop spans one onset to the
    next (clipped to *max_len_ms*), plus one final tail chop after the
    last onset. With fewer than 3 onsets this falls back to silence-based
    chopping. *bpm* is only used in the output filenames.

    Returns:
        list[Path]: Exported chop files (empty when the stem is missing).
    """
    if not vocals_path.exists():
        return []
    y, sr = librosa.load(str(vocals_path), sr=sr, mono=True)
    onset_frames = librosa.onset.onset_detect(y=y, sr=sr, backtrack=bool(backtrack))
    onset_times = librosa.frames_to_time(onset_frames, sr=sr)
    onset_ms = [int(t * 1000.0) for t in onset_times]
    if len(onset_ms) < 3:
        # Too few onsets to slice meaningfully -> silence-based fallback.
        return vocal_chops_silence(
            vocals_path=vocals_path,
            bpm=bpm,
            out_dir=out_dir,
            max_chops=max_chops,
            min_len_ms=min_len_ms,
            max_len_ms=max_len_ms,
            fade_ms=fade_ms,
            loudness_mode=loudness_mode,
            target_dbfs=target_dbfs
        )
    audio = AudioSegment.from_wav(str(vocals_path))
    segments = []
    # Slice between consecutive onsets, enforcing the min/max length bounds.
    for i in range(len(onset_ms) - 1):
        s = onset_ms[i]
        e = onset_ms[i + 1]
        if e <= s:
            continue
        seg = audio[s:e]
        if len(seg) < int(min_len_ms):
            continue
        if len(seg) > int(max_len_ms):
            seg = seg[:int(max_len_ms)]
        segments.append(seg)
    # Keep the audio after the final onset as one last chop.
    tail_start = onset_ms[-1]
    if tail_start < len(audio):
        tail = audio[tail_start: min(len(audio), tail_start + int(max_len_ms))]
        if len(tail) >= int(min_len_ms):
            segments.append(tail)
    # Rank by RMS loudness, loudest first.
    scored = [(rms_dbfs(s), s) for s in segments]
    scored.sort(key=lambda x: x[0], reverse=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    exported = []
    for i, (score, seg) in enumerate(scored[:int(max_chops)], start=1):
        seg = trim_tiny(seg, window_ms=8)
        if int(fade_ms) > 0:
            seg = seg.fade_in(int(fade_ms)).fade_out(int(fade_ms))
        seg = apply_loudness(seg, mode=loudness_mode, target_dbfs=float(target_dbfs))
        out_name = f"{int(bpm)}BPM_Vocals_CHOP_ONS_R{i:02d}.wav"
        out_path = out_dir / out_name
        seg.export(out_path, format="wav")
        exported.append(out_path)
    return exported
def vocal_chops_grid(
    vocals_path: Path,
    bpm: int,
    out_dir: Path,
    grid_size: str = "1beat",
    max_chops: int = 64,
    fade_ms: int = 6,
    loudness_mode: str = "none",
    target_dbfs: float = -14.0,
    rms_gate: int = 200
):
    """Slice the vocal stem on a fixed BPM grid and export the loudest slices.

    Args:
        vocals_path: WAV file of the isolated vocal stem (missing -> []).
        bpm: Tempo used to derive the grid step (and the filenames).
        out_dir: Destination directory (created if needed).
        grid_size: "half", "1beat", "2beat" or "1bar"; unknown values
            fall back to one beat.
        max_chops: Cap on exported slices.
        fade_ms: Fade-in/out applied to each slice (0 disables).
        loudness_mode / target_dbfs: Passed to apply_loudness().
        rms_gate: Raw pydub RMS threshold below which a grid cell is
            treated as silence and skipped.

    Returns:
        list[Path]: Exported chop files, loudest first.
    """
    if not vocals_path.exists():
        return []
    audio = AudioSegment.from_wav(str(vocals_path))
    ms_per_beat = (60.0 / max(1, int(bpm))) * 1000.0
    grid_map = {
        "half": ms_per_beat * 0.5,
        "1beat": ms_per_beat,
        "2beat": ms_per_beat * 2,
        "1bar": ms_per_beat * 4,
    }
    step = int(grid_map.get((grid_size or "1beat").strip(), ms_per_beat))
    chops = []
    # FIX: "+ 1" so the final full window is included when the audio length
    # is an exact multiple of the step (the original range dropped it).
    for start in range(0, len(audio) - step + 1, step):
        seg = audio[start:start + step]
        if seg.rms < int(rms_gate):
            # Gate out near-silent grid cells.
            continue
        chops.append((rms_dbfs(seg), seg))
    chops.sort(key=lambda x: x[0], reverse=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    exported = []
    for i, (score, seg) in enumerate(chops[:int(max_chops)], start=1):
        seg = trim_tiny(seg, 6)
        if int(fade_ms) > 0:
            seg = seg.fade_in(int(fade_ms)).fade_out(int(fade_ms))
        seg = apply_loudness(seg, mode=loudness_mode, target_dbfs=float(target_dbfs))
        out_name = f"{int(bpm)}BPM_Vocals_CHOP_GRID_{grid_size}_R{i:02d}.wav"
        out_path = out_dir / out_name
        seg.export(out_path, format="wav")
        exported.append(out_path)
    return exported
# -----------------------------
# Demucs Modes + Stem Mapping
# -----------------------------
def demucs_command(model_mode: str, audio_file: str):
    """Build the demucs CLI invocation for the requested stem mode.

    Returns:
        (argv, model_folder): the subprocess argument list and the name of
        the model subdirectory demucs will create under TEMP_DIR. Unknown
        modes default to the 6-stem model.
    """
    normalized = (model_mode or "6stem").lower().strip()
    prefix = [sys.executable, "-m", "demucs", "-n"]
    if normalized == "2stem":
        args = prefix + ["htdemucs", "--two-stems", "vocals", "--out", str(TEMP_DIR), audio_file]
        return args, "htdemucs"
    if normalized == "4stem":
        args = prefix + ["htdemucs", "--out", str(TEMP_DIR), audio_file]
        return args, "htdemucs"
    args = prefix + ["htdemucs_6s", "--out", str(TEMP_DIR), audio_file]
    return args, "htdemucs_6s"
def map_stems(track_folder: Path, mode: str):
    """Map friendly stem names to the WAV files demucs writes for *mode*.

    Paths are returned whether or not the files exist on disk — callers
    check .exists() themselves. Unknown modes fall through to the 6-stem
    layout. Insertion order matches the original stem ordering.
    """
    normalized = (mode or "6stem").lower().strip()
    if normalized == "2stem":
        layout = {
            "Vocals": "vocals.wav",
            "Instrumental": "no_vocals.wav",
        }
    elif normalized == "4stem":
        layout = {
            "Drums": "drums.wav",
            "Bass": "bass.wav",
            "Synths": "other.wav",
            "Vocals": "vocals.wav",
        }
    else:
        layout = {
            "Drums": "drums.wav",
            "Bass": "bass.wav",
            "Guitar": "guitar.wav",
            "Piano": "piano.wav",
            "Synths": "other.wav",
            "Vocals": "vocals.wav",
        }
    return {name: track_folder / filename for name, filename in layout.items()}
# -----------------------------
# Phase 1: Analyze + Separate
# -----------------------------
def analyze_and_separate(file_input, url_input, stem_mode, manual_bpm):
    """Phase 1: fetch the audio, detect BPM, and split it into stems via Demucs.

    Args:
        file_input: Local file path from the upload widget (or None).
        url_input: URL from the link tab; takes priority over file_input.
        stem_mode: "2stem" | "4stem" | "6stem" demucs configuration.
        manual_bpm: Optional BPM override; falsy/zero means auto-detect.

    Returns:
        Tuple feeding the UI: six stem preview paths (None for missing
        stems), the clamped BPM, the demucs track folder, the (possibly
        downgraded) stem mode, and two refreshed CheckboxGroup components.

    Raises:
        gr.Error: On download, BPM-detection, or separation failure.
    """
    # --- CRITICAL FIX: CLEAN UP OLD RUNS TO PREVENT GHOST STEMS ---
    if TEMP_DIR.exists():
        try:
            shutil.rmtree(TEMP_DIR, ignore_errors=True)
        except Exception:
            pass
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    # -------------------------------------------------------------
    audio_file = None
    # URL wins over an uploaded file; ">5" weeds out junk like "http".
    if url_input and len(url_input) > 5:
        print("Using Cloud Import...")
        try:
            audio_file = download_from_url(url_input)
        except Exception as e:
            raise gr.Error(f"Link Download Failed: {str(e)}")
    elif file_input:
        print("Using File Upload...")
        audio_file = file_input
    if not audio_file:
        raise gr.Error("No audio source found. Paste a link or upload a file.")
    try:
        # Fresh output tree for this run.
        if OUTPUT_DIR.exists():
            shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        (OUTPUT_DIR / "Stems").mkdir(parents=True, exist_ok=True)
        (OUTPUT_DIR / "Loops").mkdir(parents=True, exist_ok=True)
        TEMP_DIR.mkdir(parents=True, exist_ok=True)
        # Sanitize the path and guarantee WAV input for demucs.
        audio_file = safe_copy_to_temp(audio_file)
        audio_file = ensure_wav(audio_file)
        # BPM
        if manual_bpm and int(manual_bpm) > 0:
            bpm = int(manual_bpm)
        else:
            bpm = detect_bpm_multiwindow(audio_file)
            if bpm is None:
                raise gr.Error("BPM detection failed. Enter BPM manually.")
        # Clamp to the same plausible-tempo range the detector uses.
        bpm = max(40, min(220, int(bpm)))
        print(f"Using BPM: {bpm}")
        cmd, demucs_model_folder = demucs_command(stem_mode, audio_file)
        print(f"Separating stems (mode={stem_mode})...")
        try:
            run_demucs(cmd)
        except gr.Error as e:
            # 6-stem model is heavier; retry once with the 4-stem model.
            if (stem_mode or "").lower().strip() == "6stem":
                print("6-stem failed; falling back to 4-stem htdemucs...")
                stem_mode = "4stem"
                cmd, demucs_model_folder = demucs_command(stem_mode, audio_file)
                run_demucs(cmd)
            else:
                raise
        # Demucs writes TEMP_DIR/<model>/<track_name>/ — take the first entry.
        demucs_out = TEMP_DIR / demucs_model_folder
        track_folder = next(demucs_out.iterdir(), None)
        if not track_folder:
            raise FileNotFoundError("Demucs separation failed (no output folder found).")
        stems = map_stems(track_folder, stem_mode)
        # --- UPDATE UI CHECKBOXES DYNAMICALLY ---
        available_stems = list(stems.keys())
        # Default checked = all stems for export
        new_export_stems = gr.CheckboxGroup(choices=available_stems, value=available_stems)
        # Default checked = all except vocals for loops
        loop_defaults = [s for s in available_stems if s != "Vocals"]
        new_loop_stems = gr.CheckboxGroup(choices=available_stems, value=loop_defaults)
        # ----------------------------------------
        # Preview paths: None keeps the corresponding gr.Audio widget blank.
        p_drums = str(stems["Drums"]) if "Drums" in stems and stems["Drums"].exists() else None
        p_bass = str(stems["Bass"]) if "Bass" in stems and stems["Bass"].exists() else None
        p_guitar = str(stems["Guitar"]) if "Guitar" in stems and stems["Guitar"].exists() else None
        p_piano = str(stems["Piano"]) if "Piano" in stems and stems["Piano"].exists() else None
        if "Synths" in stems and stems["Synths"].exists():
            p_other = str(stems["Synths"])
        elif "Instrumental" in stems and stems["Instrumental"].exists():
            p_other = str(stems["Instrumental"])
        else:
            p_other = None
        p_vocals = str(stems["Vocals"]) if "Vocals" in stems and stems["Vocals"].exists() else None
        return (
            p_drums, p_bass, p_guitar, p_piano, p_other, p_vocals,
            bpm, str(track_folder), stem_mode,
            new_export_stems, new_loop_stems  # Return the dynamic updates
        )
    except Exception as e:
        # NOTE(review): gr.Error raised above also lands here and gets
        # re-wrapped with a "Process Failed:" prefix — confirm intended.
        raise gr.Error(f"Process Failed: {str(e)}")
# -----------------------------
# Phase 2: Package + Export
# -----------------------------
def package_and_export(
    track_folder_str,
    bpm,
    stem_mode,
    cover_art,
    export_stems,
    loop_stems,
    enable_vocal_chops,
    loops_per_stem,
    bar_lengths,
    hop_bars,
    top_k,
    fade_ms,
    loop_seam,
    seam_ms,
    min_bar_gap,
    loudness_mode,
    target_dbfs,
    vocal_chop_mode,
    vocal_grid_size,
    vocal_max_chops,
    vocal_min_ms,
    vocal_max_ms,
    vocal_silence_thresh_db,
    vocal_min_silence_len_ms
):
    """Phase 2: build the sample pack from the Phase-1 stems and zip it.

    Copies the selected full stems, generates bar-quantized loops per stem,
    chops or loops the vocal stem, optionally renders a 9:16 promo video
    from the cover art plus the best melodic loop, writes a manifest.json
    of all settings, and zips OUTPUT_DIR into NightPulse_Pack.zip.

    Returns:
        (zip_path, video_path): video_path is None when no cover art was
        supplied or no melodic loop was produced.

    Raises:
        gr.Error: When Phase 1 has not run, or anything in the pipeline fails.
    """
    # --- FIX: Check if Phase 1 was run ---
    if not track_folder_str:
        raise gr.Error("Phase 1 incomplete! Please run 'Separate Stems' first, or wait for it to finish.")
    # -------------------------------------
    try:
        track_folder = Path(track_folder_str)
        bpm = int(bpm)
        stems = map_stems(track_folder, stem_mode)
        # Rebuild the output tree from scratch for this export.
        if OUTPUT_DIR.exists():
            shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        (OUTPUT_DIR / "Stems").mkdir(parents=True, exist_ok=True)
        (OUTPUT_DIR / "Loops").mkdir(parents=True, exist_ok=True)
        (OUTPUT_DIR / "Vocal_Chops").mkdir(parents=True, exist_ok=True)
        export_stems = set(export_stems or [])
        loop_stems = set(loop_stems or [])
        # Full-length stem exports (only those the user ticked).
        for name, path in stems.items():
            if name in export_stems and path.exists():
                shutil.copy(path, OUTPUT_DIR / "Stems" / f"{bpm}BPM_Full_{name}.wav")
        # Pick the first available stem (preference order below) to build
        # the shared bar grid from.
        grid_source = None
        for k in ("Drums", "Synths", "Instrumental", "Vocals", "Bass"):
            if k in stems and stems[k].exists():
                grid_source = stems[k]
                break
        if grid_source is None:
            raise FileNotFoundError("No stems found to build bar grid.")
        bar_starts_ms = detect_bar_grid(str(grid_source), bpm=bpm, max_seconds=240)
        # Normalize the bar-length selection into sorted unique ints.
        if not bar_lengths:
            bar_lengths = ["4", "8"]
        bar_lengths_int = sorted(list({int(x) for x in bar_lengths if str(x).strip().isdigit()}))
        if not bar_lengths_int:
            bar_lengths_int = [4, 8]
        loops_dir = OUTPUT_DIR / "Loops"
        all_loops = {}
        # Loop generation for every selected non-vocal stem.
        for stem_name, stem_path in stems.items():
            if stem_name == "Vocals":
                continue
            if stem_name not in loop_stems:
                continue
            if not stem_path.exists():
                continue
            exported = make_quantized_loops(
                stem_path=stem_path,
                stem_name=stem_name,
                bpm=bpm,
                bar_starts_ms=bar_starts_ms,
                bar_lengths=bar_lengths_int,
                hop_bars=int(hop_bars),
                loops_per_stem=int(loops_per_stem),
                top_k=int(top_k),
                fade_ms=int(fade_ms),
                loop_seam=bool(loop_seam),
                seam_ms=int(seam_ms),
                min_bar_gap=int(min_bar_gap),
                loudness_mode=str(loudness_mode),
                target_dbfs=float(target_dbfs),
                out_dir=loops_dir
            )
            all_loops[stem_name] = exported
        # Vocals are either chopped (three modes) or looped like other stems.
        vocal_exports = []
        if "Vocals" in stems and stems["Vocals"].exists():
            if enable_vocal_chops:
                mode = (vocal_chop_mode or "grid").lower().strip()
                if mode == "silence":
                    vocal_exports = vocal_chops_silence(
                        vocals_path=stems["Vocals"],
                        bpm=bpm,
                        out_dir=OUTPUT_DIR / "Vocal_Chops",
                        max_chops=int(vocal_max_chops),
                        min_len_ms=int(vocal_min_ms),
                        max_len_ms=int(vocal_max_ms),
                        silence_thresh_db=int(vocal_silence_thresh_db),
                        min_silence_len_ms=int(vocal_min_silence_len_ms),
                        fade_ms=int(fade_ms),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                    )
                elif mode == "onset":
                    vocal_exports = vocal_chops_onset(
                        vocals_path=stems["Vocals"],
                        bpm=bpm,
                        out_dir=OUTPUT_DIR / "Vocal_Chops",
                        max_chops=int(vocal_max_chops),
                        min_len_ms=int(vocal_min_ms),
                        max_len_ms=int(vocal_max_ms),
                        fade_ms=int(fade_ms),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                    )
                elif mode == "grid":
                    vocal_exports = vocal_chops_grid(
                        vocals_path=stems["Vocals"],
                        bpm=bpm,
                        out_dir=OUTPUT_DIR / "Vocal_Chops",
                        grid_size=str(vocal_grid_size),
                        max_chops=int(vocal_max_chops),
                        # Grid chops are short, so halve the fade (min 1 ms).
                        fade_ms=max(1, int(fade_ms // 2)),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                    )
                else:
                    vocal_exports = []
            else:
                # Chops disabled: treat Vocals like any other loop stem if selected.
                if "Vocals" in loop_stems:
                    vocal_exports = make_quantized_loops(
                        stem_path=stems["Vocals"],
                        stem_name="Vocals",
                        bpm=bpm,
                        bar_starts_ms=bar_starts_ms,
                        bar_lengths=bar_lengths_int,
                        hop_bars=int(hop_bars),
                        loops_per_stem=int(loops_per_stem),
                        top_k=int(top_k),
                        fade_ms=int(fade_ms),
                        loop_seam=bool(loop_seam),
                        seam_ms=int(seam_ms),
                        min_bar_gap=int(min_bar_gap),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                        out_dir=loops_dir
                    )
            all_loops["Vocals"] = vocal_exports
        # Pick the best melodic loop (preference order) as the video soundtrack.
        video_loop = None
        for key in ("Synths", "Piano", "Guitar", "Instrumental"):
            if all_loops.get(key):
                video_loop = all_loops[key][0]
                break
        video_path = None
        if cover_art and video_loop:
            print("Rendering Video...")
            vid_out = OUTPUT_DIR / "Promo_Video.mp4"
            audio_clip = AudioFileClip(str(video_loop))
            duration = audio_clip.duration
            # 9:16 vertical video: cover art with a slow 2%/s zoom.
            img = ImageClip(cover_art).resize(width=1080)
            img = img.resize(lambda t: 1 + 0.02 * t)
            img = img.set_position(("center", "center"))
            img = img.set_duration(duration)
            img = img.set_audio(audio_clip)
            final_clip = CompositeVideoClip([img], size=(1080, 1920))
            final_clip.duration = duration
            final_clip.audio = audio_clip
            final_clip.fps = 24
            final_clip.write_videofile(str(vid_out), codec="libx264", audio_codec="aac", logger=None)
            video_path = str(vid_out)
        # Record every setting used for this pack alongside the audio.
        manifest = {
            "created_at": datetime.utcnow().isoformat() + "Z",
            "bpm": bpm,
            "stem_mode": stem_mode,
            "export_stems": sorted(list(export_stems)),
            "loop_stems": sorted(list(loop_stems)),
            "enable_vocal_chops": bool(enable_vocal_chops),
            "bar_lengths": bar_lengths_int,
            "hop_bars": int(hop_bars),
            "loops_per_stem": int(loops_per_stem),
            "top_k": int(top_k),
            "fade_ms": int(fade_ms),
            "loop_seam": bool(loop_seam),
            "seam_ms": int(seam_ms),
            "min_bar_gap": int(min_bar_gap),
            "loudness_mode": str(loudness_mode),
            "target_dbfs": float(target_dbfs),
            "vocal_chop_mode": str(vocal_chop_mode),
            "vocal_grid_size": str(vocal_grid_size),
            "vocal_max_chops": int(vocal_max_chops),
            "vocal_min_ms": int(vocal_min_ms),
            "vocal_max_ms": int(vocal_max_ms),
            "vocal_silence_thresh_db": int(vocal_silence_thresh_db),
            "vocal_min_silence_len_ms": int(vocal_min_silence_len_ms),
        }
        (OUTPUT_DIR / "manifest.json").write_text(json.dumps(manifest, indent=2), encoding="utf-8")
        # Zip the whole output tree with paths relative to OUTPUT_DIR.
        zip_file = "NightPulse_Pack.zip"
        with zipfile.ZipFile(zip_file, "w") as zf:
            for root, dirs, files in os.walk(OUTPUT_DIR):
                for file in files:
                    file_path = Path(root) / file
                    arcname = file_path.relative_to(OUTPUT_DIR)
                    zf.write(file_path, arcname)
        return zip_file, video_path
    except Exception as e:
        # NOTE(review): gr.Error raised above is also re-wrapped here with a
        # "Packaging Failed:" prefix — confirm intended.
        raise gr.Error(f"Packaging Failed: {str(e)}")
# -----------------------------
# UI
# -----------------------------
# Gradio layout: two phases (separate, then package) sharing state via gr.State.
with gr.Blocks(title="Night Pulse | Studio Pro") as app:
    gr.Markdown("# 🎛️ Night Pulse | Studio Command Center")
    gr.Markdown("Selectable stems + loop engine + real vocal chops + loop-safe seams + variety dedupe.")
    # Cross-phase state: demucs track folder, detected BPM, and stem mode.
    stored_folder = gr.State()
    stored_bpm = gr.State()
    stored_mode = gr.State()
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1. Audio Source")
            stem_mode = gr.Dropdown(
                choices=[
                    ("2 stems (Vocals + Instrumental)", "2stem"),
                    ("4 stems (Drums/Bass/Other/Vocals)", "4stem"),
                    ("6 stems (Drums/Bass/Guitar/Piano/Other/Vocals)", "6stem"),
                ],
                value="6stem",
                label="Stem Mode"
            )
            manual_bpm = gr.Number(label="Manual BPM Override (optional)", precision=0, value=None)
            # Either a URL (mobile-friendly) or a local upload; URL wins.
            with gr.Tabs():
                with gr.TabItem("☁️ Import Link (Mobile Safe)"):
                    input_url = gr.Textbox(
                        label="Paste URL Here",
                        placeholder="https://youtube.com/watch?v=...",
                        show_label=False,
                    )
                with gr.TabItem("📂 Upload File (Desktop)"):
                    input_file = gr.Audio(type="filepath", label="Upload Master Track")
            input_art = gr.Image(type="filepath", label="Cover Art (9:16)")
            btn_analyze = gr.Button("🔍 Phase 1: Separate Stems", variant="primary")
        with gr.Column(scale=1):
            gr.Markdown("### 2. Stem Preview (missing stems will be blank)")
            with gr.Row():
                p_drums = gr.Audio(label="Drums")
                p_bass = gr.Audio(label="Bass")
            with gr.Row():
                p_guitar = gr.Audio(label="Guitar")
                p_piano = gr.Audio(label="Piano")
            with gr.Row():
                p_other = gr.Audio(label="Other / Synths / Instrumental")
                p_vocals = gr.Audio(label="Vocals")
    gr.Markdown("---")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 3. Stem Selection")
            # Choices are replaced dynamically by analyze_and_separate().
            export_stems = gr.CheckboxGroup(
                ["Drums", "Bass", "Guitar", "Piano", "Synths", "Vocals", "Instrumental"],
                value=["Drums", "Bass", "Synths", "Vocals"],
                label="Export Full Stems"
            )
            loop_stems = gr.CheckboxGroup(
                ["Drums", "Bass", "Guitar", "Piano", "Synths", "Instrumental", "Vocals"],
                value=["Drums", "Bass", "Synths"],
                label="Generate Loops For"
            )
            enable_vocal_chops = gr.Checkbox(value=True, label="Generate Vocal Chops (vocals only)")
            gr.Markdown("### 4. Loop Engine Settings")
            loops_per_stem = gr.Slider(1, 40, value=12, step=1, label="Loops per Stem (selected loop stems)")
            bar_lengths = gr.CheckboxGroup(
                choices=["1", "2", "4", "8"],
                value=["4", "8"],
                label="Bar Lengths (4/4)"
            )
            hop_bars = gr.Slider(1, 8, value=1, step=1, label="Hop (bars between starts)")
            top_k = gr.Slider(0, 200, value=30, step=1, label="Top-K candidates per stem (0 = no filter)")
            min_bar_gap = gr.Slider(0, 16, value=4, step=1, label="Min bar gap (de-dup spacing)")
            fade_ms = gr.Slider(0, 50, value=12, step=1, label="Click-safety fade (ms)")
            loop_seam = gr.Checkbox(value=True, label="Loop-safe seam (crossfade ends)")
            seam_ms = gr.Slider(0, 80, value=20, step=1, label="Loop seam crossfade (ms)")
            loudness_mode = gr.Dropdown(
                choices=["none", "peak", "rms"],
                value="none",
                label="Loudness mode"
            )
            target_dbfs = gr.Slider(-24, -8, value=-14, step=1, label="Target RMS dBFS (only for rms mode)")
            gr.Markdown("### 5. Vocals: Real Chop Mode")
            vocal_chop_mode = gr.Dropdown(
                choices=[("Silence chops", "silence"),
                         ("Onset chops", "onset"),
                         ("Grid chops (BPM)", "grid")],
                value="grid",
                label="Vocal Chop Mode"
            )
            vocal_grid_size = gr.Dropdown(
                choices=[("Half beat", "half"),
                         ("1 beat", "1beat"),
                         ("2 beats", "2beat"),
                         ("1 bar", "1bar")],
                value="1beat",
                label="Grid Chop Size"
            )
            vocal_max_chops = gr.Slider(4, 160, value=64, step=1, label="Max vocal chops to export")
            vocal_min_ms = gr.Slider(40, 500, value=120, step=10, label="Min chop length (ms)")
            vocal_max_ms = gr.Slider(200, 4000, value=1500, step=50, label="Max chop length (ms)")
            vocal_silence_thresh_db = gr.Slider(-60, -10, value=-35, step=1, label="Silence threshold (dBFS, silence mode)")
            vocal_min_silence_len_ms = gr.Slider(60, 800, value=140, step=10, label="Min silence length (ms, silence mode)")
            btn_package = gr.Button("📦 Phase 2: Package & Export", variant="primary")
        with gr.Column(scale=1):
            gr.Markdown("### 6. Final Output")
            out_zip = gr.File(label="Download Pack (ZIP)")
            out_video = gr.Video(label="Promo Video")
    # Events
    btn_analyze.click(
        fn=analyze_and_separate,
        inputs=[input_file, input_url, stem_mode, manual_bpm],
        outputs=[
            p_drums, p_bass, p_guitar, p_piano, p_other, p_vocals,
            stored_bpm, stored_folder, stored_mode,
            export_stems, loop_stems  # Targeted updates for checkboxes
        ],
    )
    btn_package.click(
        fn=package_and_export,
        inputs=[
            stored_folder,
            stored_bpm,
            stored_mode,
            input_art,
            export_stems,
            loop_stems,
            enable_vocal_chops,
            loops_per_stem,
            bar_lengths,
            hop_bars,
            top_k,
            fade_ms,
            loop_seam,
            seam_ms,
            min_bar_gap,
            loudness_mode,
            target_dbfs,
            vocal_chop_mode,
            vocal_grid_size,
            vocal_max_chops,
            vocal_min_ms,
            vocal_max_ms,
            vocal_silence_thresh_db,
            vocal_min_silence_len_ms,
        ],
        outputs=[out_zip, out_video],
    )
if __name__ == "__main__":
    app.launch()