Spaces:
Build error
Build error
| import gradio as gr | |
| import os | |
| import shutil | |
| import zipfile | |
| import librosa | |
| import numpy as np | |
| from pydub import AudioSegment | |
| from pydub.silence import split_on_silence | |
| from moviepy.editor import AudioFileClip, ImageClip, CompositeVideoClip | |
| # Keep import available but unused (Full feature set available) | |
| from moviepy.video.fx.all import blackwhite, lum_contrast | |
| import subprocess | |
| from pathlib import Path | |
| import sys | |
| import yt_dlp | |
| import json | |
| from datetime import datetime | |
# --- PATCH FOR PILLOW 10.0+ vs MOVIEPY 1.0.3 COMPATIBILITY ---
import PIL.Image
if not hasattr(PIL.Image, 'ANTIALIAS'):
    # Pillow 10 removed Image.ANTIALIAS; moviepy 1.0.3 still references it,
    # so re-point the old name at the equivalent LANCZOS filter.
    PIL.Image.ANTIALIAS = PIL.Image.LANCZOS
# -------------------------------------------------------------
# --- Configuration ---
OUTPUT_DIR = Path("nightpulse_output")  # final pack contents (zipped at export)
TEMP_DIR = Path("temp_processing")  # scratch space; wiped between runs
| # ----------------------------- | |
| # Startup Checks | |
| # ----------------------------- | |
def check_ffmpeg():
    """Report whether FFmpeg is reachable on PATH, warning loudly if not."""
    found = shutil.which("ffmpeg") is not None
    if not found:
        print("CRITICAL WARNING: FFmpeg not found in system PATH.")
        print("Audio processing (pydub/demucs) will fail.")
    return found
check_ffmpeg()
| # ----------------------------- | |
| # Cloud Import | |
| # ----------------------------- | |
def download_from_url(url):
    """Download audio from a YouTube/SoundCloud/direct URL into TEMP_DIR as WAV.

    Returns the local .wav path as a string, or None when `url` is falsy.
    Raises whatever yt_dlp raises on a failed fetch (caller wraps in gr.Error).
    """
    if not url:
        return None
    print(f"Fetching URL: {url}")
    # Clean temp before new download to avoid collisions
    if TEMP_DIR.exists():
        shutil.rmtree(TEMP_DIR, ignore_errors=True)
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": str(TEMP_DIR / "%(title)s.%(ext)s"),
        # Post-process to WAV so downstream pydub/demucs steps get a known format.
        "postprocessors": [
            {"key": "FFmpegExtractAudio", "preferredcodec": "wav", "preferredquality": "192"}
        ],
        "quiet": True,
        "no_warnings": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    final_path = Path(filename).with_suffix(".wav")
    # prepare_filename() reflects the pre-postprocessing name; if swapping the
    # suffix did not land on the file FFmpegExtractAudio actually wrote
    # (unusual titles, multi-dot names), fall back to the newest WAV produced.
    if not final_path.exists():
        produced = sorted(TEMP_DIR.glob("*.wav"),
                          key=lambda p: p.stat().st_mtime, reverse=True)
        if produced:
            final_path = produced[0]
    return str(final_path)
| # ----------------------------- | |
| # File Handling (Safer) | |
| # ----------------------------- | |
def safe_copy_to_temp(audio_file: str) -> str:
    """
    Copy the source file into TEMP_DIR under a sanitized filename
    (avoids path/space/unicode surprises downstream).

    On any copy failure, fall back to returning the original path.
    """
    src = Path(audio_file)
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    sanitized = "".join(ch if (ch.isalnum() or ch in "._-") else "_" for ch in src.stem)
    dst = TEMP_DIR / (sanitized + src.suffix.lower())
    try:
        shutil.copy(src, dst)
        return str(dst)
    except Exception:
        # Best-effort: keep processing with the original path.
        return str(src)
def ensure_wav(input_path: str) -> str:
    """
    Return a WAV path for the given audio file, converting via pydub when
    needed (Demucs is most reliable on WAV input).
    """
    source = Path(input_path)
    if source.suffix.lower() == ".wav":
        # Already WAV; nothing to do.
        return str(source)
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    target = TEMP_DIR / (source.stem + ".wav")
    AudioSegment.from_file(str(source)).export(str(target), format="wav")
    return str(target)
| # ----------------------------- | |
| # Demucs Runner | |
| # ----------------------------- | |
def run_demucs(cmd):
    """
    Execute a demucs command line and return its captured stdout.
    On a non-zero exit, raise gr.Error carrying the tails of both streams.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout or ""
    stdout_tail = (proc.stdout or "")[-4000:]
    stderr_tail = (proc.stderr or "")[-4000:]
    raise gr.Error(
        "Demucs failed.\n\n"
        f"Command:\n{cmd}\n\n"
        f"STDOUT (tail):\n{stdout_tail}\n\n"
        f"STDERR (tail):\n{stderr_tail}"
    )
| # ----------------------------- | |
| # BPM + Grid | |
| # ----------------------------- | |
def detect_bpm_multiwindow(audio_path, windows=((0, 60), (60, 60), (120, 60))):
    """
    Estimate BPM by beat-tracking several (offset, duration) slices and
    taking the median of the plausible estimates.

    Returns an int BPM, or None when no window produced a tempo in [40, 220].
    """
    estimates = []
    for offset, dur in windows:
        try:
            y, sr = librosa.load(audio_path, offset=float(offset), duration=float(dur), mono=True)
            if len(y) < sr * 10:
                # Window ran past the end of the track; too short to trust.
                continue
            tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
            # librosa may return a scalar or a 1-element array depending on version.
            value = float(tempo[0] if np.ndim(tempo) > 0 else tempo)
            if 40 <= value <= 220:
                estimates.append(value)
        except Exception:
            # A bad window (decode error, silence) just contributes nothing.
            pass
    return int(round(float(np.median(estimates)))) if estimates else None
def detect_bar_grid(audio_path, bpm, sr=22050, max_seconds=240):
    """
    Estimate bar (4-beat) start positions over the first max_seconds of audio.

    Returns a list of bar start times in milliseconds. Falls back to an even
    grid synthesized from `bpm` when beat tracking fails or finds < 8 beats.
    """
    y, sr = librosa.load(audio_path, sr=sr, mono=True, duration=max_seconds)
    try:
        tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units="frames")
        beat_times = librosa.frames_to_time(beat_frames, sr=sr)
    except Exception:
        beat_times = None
    if beat_times is None or len(beat_times) < 8:
        # Fallback: fixed grid from the supplied BPM, 4/4 assumed.
        ms_per_beat = (60.0 / max(1, int(bpm))) * 1000.0
        total_ms = (len(y) / sr) * 1000.0
        bar_ms = ms_per_beat * 4.0
        return [int(i * bar_ms) for i in range(int(total_ms // bar_ms) + 1)]
    # Every 4th detected beat is treated as a bar start (assumes 4/4 and that
    # the first detected beat is a downbeat -- TODO confirm for swung material).
    bar_starts = beat_times[::4]
    return [int(t * 1000.0) for t in bar_starts]
| # ----------------------------- | |
| # Loudness + Processing | |
| # ----------------------------- | |
| def rms_dbfs(seg: AudioSegment) -> float: | |
| if seg.rms <= 0: | |
| return -120.0 | |
| return 20.0 * float(np.log10(seg.rms / 32768.0)) | |
| def apply_loudness(seg: AudioSegment, mode: str, target_dbfs: float = -14.0) -> AudioSegment: | |
| mode = (mode or "none").lower().strip() | |
| if mode == "none": | |
| return seg | |
| if mode == "peak": | |
| return seg.normalize() | |
| if mode == "rms": | |
| current = rms_dbfs(seg) | |
| gain = float(target_dbfs) - float(current) | |
| gain = max(min(gain, 12.0), -12.0) | |
| return seg.apply_gain(gain) | |
| return seg | |
| def trim_tiny(seg: AudioSegment, window_ms: int = 8) -> AudioSegment: | |
| """Shave window_ms off BOTH ends.""" | |
| if len(seg) <= window_ms * 2: | |
| return seg | |
| return seg[window_ms:-window_ms] | |
| def loop_seam_crossfade(seg: AudioSegment, seam_ms=20) -> AudioSegment: | |
| """ | |
| Takes the tail (seam_ms) and crossfades it into the head. | |
| This reduces total length by seam_ms. | |
| """ | |
| seam_ms = int(seam_ms) | |
| if seam_ms <= 0 or len(seg) <= seam_ms * 2: | |
| return seg | |
| head = seg[:seam_ms] | |
| tail = seg[-seam_ms:] | |
| body = seg[seam_ms:-seam_ms] | |
| # Append head to tail (blending) | |
| blended = tail.append(head, crossfade=seam_ms) | |
| # Reattach to body | |
| return body.append(blended, crossfade=seam_ms) | |
| # ----------------------------- | |
| # De-dup | |
| # ----------------------------- | |
def dedupe_by_bar_spacing(candidates, bar_starts_ms, min_bar_gap=4):
    """
    Keep only candidates whose nearest bar index is at least min_bar_gap bars
    away from every already-kept candidate (greedy, in input order).
    Candidates are (score, start_ms, bar_len) tuples.
    """
    if not bar_starts_ms:
        return candidates
    gap = int(min_bar_gap)
    kept = []
    taken_bars = []
    for cand in candidates:
        _, start_ms, _ = cand
        nearest = int(np.argmin([abs(start_ms - b) for b in bar_starts_ms]))
        if all(abs(nearest - used) >= gap for used in taken_bars):
            kept.append(cand)
            taken_bars.append(nearest)
    return kept
| # ----------------------------- | |
| # Loop Engine (FIXED DRIFT) | |
| # ----------------------------- | |
def make_quantized_loops(
    stem_path: Path,
    stem_name: str,
    bpm: int,
    bar_starts_ms: list,
    bar_lengths: list,
    hop_bars: int,
    loops_per_stem: int,
    top_k: int,
    fade_ms: int,
    loop_seam: bool,
    seam_ms: int,
    min_bar_gap: int,
    loudness_mode: str,
    target_dbfs: float,
    out_dir: Path
):
    """Cut bar-quantized loops from one stem and export them as WAVs.

    Candidate loop starts come from the detected bar grid (every hop_bars-th
    bar); candidates are scored by RMS, the loudest top_k kept, de-duplicated
    by bar spacing, and the best loops_per_stem exported. Each loop is
    hard-trimmed to exactly bar_len bars for DAW sync.

    Returns the list of exported Path objects (empty if the stem is missing).
    """
    if not stem_path.exists():
        return []
    audio = AudioSegment.from_wav(str(stem_path))
    ms_per_beat = (60.0 / max(1, int(bpm))) * 1000.0
    ms_per_bar = int(ms_per_beat * 4.0)  # 4/4 assumed throughout
    hop_bars = max(1, int(hop_bars))
    loops_per_stem = max(1, int(loops_per_stem))
    fade_ms = int(fade_ms)
    seam_ms = int(seam_ms)
    min_bar_gap = int(min_bar_gap)
    # Calculate extra audio needed to compensate for trim and seam:
    # trim_tiny removes 2x window (8 ms start, 8 ms end).
    trim_window = 8
    needed_extra = 0
    if loop_seam:
        needed_extra += seam_ms
    needed_extra += (trim_window * 2)
    # Candidate start positions: every hop_bars-th detected bar start.
    grid = bar_starts_ms[::hop_bars] if bar_starts_ms else []
    candidates = []
    for bar_len in bar_lengths:
        target_dur_ms = ms_per_bar * int(bar_len)
        extract_dur_ms = target_dur_ms + needed_extra
        for start_ms in grid:
            if start_ms + extract_dur_ms > len(audio):
                continue
            # Extract WITH the buffer
            seg = audio[start_ms : start_ms + extract_dur_ms]
            if len(seg) < extract_dur_ms:
                continue
            # Score based on RMS (louder sections are assumed more interesting)
            candidates.append((rms_dbfs(seg), int(start_ms), int(bar_len)))
    candidates.sort(key=lambda x: x[0], reverse=True)
    if int(top_k) > 0:
        candidates = candidates[:int(top_k)]
    candidates = dedupe_by_bar_spacing(candidates, bar_starts_ms, min_bar_gap=min_bar_gap)
    exported = []
    for rank, (score, start_ms, bar_len) in enumerate(candidates[:loops_per_stem], start=1):
        target_dur_ms = ms_per_bar * int(bar_len)
        extract_dur_ms = target_dur_ms + needed_extra
        loop = audio[start_ms : start_ms + extract_dur_ms]
        # 1. Trim Tiny (removes trim_window from start and end)
        loop = trim_tiny(loop, window_ms=trim_window)
        # 2. Seam or Fade (fade is the click-safety path when no seam is used)
        if loop_seam:
            loop = loop_seam_crossfade(loop, seam_ms=seam_ms)
        else:
            # If no seam, we just have extra audio hanging off the end. Trim it.
            loop = loop[:target_dur_ms]
            if fade_ms > 0:
                loop = loop.fade_in(fade_ms).fade_out(fade_ms)
        # 3. Final Hard Quantize (Critical for DAW sync)
        # Force length to be exactly the grid length
        loop = loop[:target_dur_ms]
        loop = apply_loudness(loop, mode=loudness_mode, target_dbfs=float(target_dbfs))
        # Label the loop by its nearest bar index for traceability in filenames.
        if bar_starts_ms:
            bar_index = int(np.argmin([abs(start_ms - b) for b in bar_starts_ms]))
        else:
            bar_index = int(start_ms // max(1, ms_per_bar))
        out_name = f"{int(bpm)}BPM_{stem_name}_B{bar_index:03d}_L{int(bar_len)}bars_R{rank:02d}.wav"
        out_path = out_dir / out_name
        loop.export(out_path, format="wav")
        exported.append(out_path)
    return exported
| # ----------------------------- | |
| # Vocal Chop Engines | |
| # ----------------------------- | |
def vocal_chops_silence(
    vocals_path: Path,
    bpm: int,
    out_dir: Path,
    max_chops: int = 48,
    min_len_ms: int = 120,
    max_len_ms: int = 1500,
    silence_thresh_db: int = -35,
    min_silence_len_ms: int = 140,
    keep_silence_ms: int = 20,
    fade_ms: int = 8,
    loudness_mode: str = "none",
    target_dbfs: float = -14.0
):
    """Chop the vocal stem at silence gaps and export the loudest pieces.

    Uses pydub's split_on_silence; chunks shorter than min_len_ms are dropped
    and longer ones truncated to max_len_ms. Returns exported Path objects
    (empty when the vocal stem is missing).
    """
    if not vocals_path.exists():
        return []
    audio = AudioSegment.from_wav(str(vocals_path))
    chunks = split_on_silence(
        audio,
        min_silence_len=int(min_silence_len_ms),
        silence_thresh=int(silence_thresh_db),
        keep_silence=int(keep_silence_ms),
    )
    kept = []
    for c in chunks:
        if len(c) < int(min_len_ms):
            continue
        if len(c) > int(max_len_ms):
            c = c[:int(max_len_ms)]
        kept.append(c)
    # Rank chunks by loudness; export only the top max_chops.
    scored = [(rms_dbfs(c), c) for c in kept]
    scored.sort(key=lambda x: x[0], reverse=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    exported = []
    for i, (score, c) in enumerate(scored[:int(max_chops)], start=1):
        c = trim_tiny(c, window_ms=8)
        if int(fade_ms) > 0:
            c = c.fade_in(int(fade_ms)).fade_out(int(fade_ms))
        c = apply_loudness(c, mode=loudness_mode, target_dbfs=float(target_dbfs))
        out_name = f"{int(bpm)}BPM_Vocals_CHOP_SIL_R{i:02d}.wav"
        out_path = out_dir / out_name
        c.export(out_path, format="wav")
        exported.append(out_path)
    return exported
def vocal_chops_onset(
    vocals_path: Path,
    bpm: int,
    out_dir: Path,
    max_chops: int = 48,
    min_len_ms: int = 90,
    max_len_ms: int = 900,
    sr: int = 22050,
    backtrack: bool = True,
    fade_ms: int = 8,
    loudness_mode: str = "none",
    target_dbfs: float = -14.0
):
    """Chop the vocal stem between detected onsets; export the loudest pieces.

    Falls back to vocal_chops_silence when fewer than 3 onsets are detected.
    Returns exported Path objects (empty when the vocal stem is missing).
    """
    if not vocals_path.exists():
        return []
    y, sr = librosa.load(str(vocals_path), sr=sr, mono=True)
    onset_frames = librosa.onset.onset_detect(y=y, sr=sr, backtrack=bool(backtrack))
    onset_times = librosa.frames_to_time(onset_frames, sr=sr)
    onset_ms = [int(t * 1000.0) for t in onset_times]
    if len(onset_ms) < 3:
        # Too few onsets to segment usefully; silence-based chopping instead.
        return vocal_chops_silence(
            vocals_path=vocals_path,
            bpm=bpm,
            out_dir=out_dir,
            max_chops=max_chops,
            min_len_ms=min_len_ms,
            max_len_ms=max_len_ms,
            fade_ms=fade_ms,
            loudness_mode=loudness_mode,
            target_dbfs=target_dbfs
        )
    audio = AudioSegment.from_wav(str(vocals_path))
    segments = []
    # Each chop spans onset i -> onset i+1, length-clamped to [min, max].
    for i in range(len(onset_ms) - 1):
        s = onset_ms[i]
        e = onset_ms[i + 1]
        if e <= s:
            continue
        seg = audio[s:e]
        if len(seg) < int(min_len_ms):
            continue
        if len(seg) > int(max_len_ms):
            seg = seg[:int(max_len_ms)]
        segments.append(seg)
    # Include the tail after the last onset, if long enough.
    tail_start = onset_ms[-1]
    if tail_start < len(audio):
        tail = audio[tail_start: min(len(audio), tail_start + int(max_len_ms))]
        if len(tail) >= int(min_len_ms):
            segments.append(tail)
    # Rank by loudness; export only the top max_chops.
    scored = [(rms_dbfs(s), s) for s in segments]
    scored.sort(key=lambda x: x[0], reverse=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    exported = []
    for i, (score, seg) in enumerate(scored[:int(max_chops)], start=1):
        seg = trim_tiny(seg, window_ms=8)
        if int(fade_ms) > 0:
            seg = seg.fade_in(int(fade_ms)).fade_out(int(fade_ms))
        seg = apply_loudness(seg, mode=loudness_mode, target_dbfs=float(target_dbfs))
        out_name = f"{int(bpm)}BPM_Vocals_CHOP_ONS_R{i:02d}.wav"
        out_path = out_dir / out_name
        seg.export(out_path, format="wav")
        exported.append(out_path)
    return exported
def vocal_chops_grid(
    vocals_path: Path,
    bpm: int,
    out_dir: Path,
    grid_size: str = "1beat",
    max_chops: int = 64,
    fade_ms: int = 6,
    loudness_mode: str = "none",
    target_dbfs: float = -14.0,
    rms_gate: int = 200
):
    """
    Slice the vocal stem on a fixed BPM grid, gate out near-silent slices,
    and export the loudest max_chops slices as WAVs.
    """
    if not vocals_path.exists():
        return []
    audio = AudioSegment.from_wav(str(vocals_path))
    beat_ms = (60.0 / max(1, int(bpm))) * 1000.0
    step_lookup = {
        "half": beat_ms * 0.5,
        "1beat": beat_ms,
        "2beat": beat_ms * 2,
        "1bar": beat_ms * 4,
    }
    step = int(step_lookup.get((grid_size or "1beat").strip(), beat_ms))
    gate = int(rms_gate)
    # Walk the grid, keeping only slices loud enough to be real vocal content.
    candidates = []
    pos = 0
    while pos < len(audio) - step:
        piece = audio[pos:pos + step]
        if piece.rms >= gate:
            candidates.append((rms_dbfs(piece), piece))
        pos += step
    candidates.sort(key=lambda x: x[0], reverse=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    exported = []
    for i, (score, piece) in enumerate(candidates[:int(max_chops)], start=1):
        piece = trim_tiny(piece, 6)
        if int(fade_ms) > 0:
            piece = piece.fade_in(int(fade_ms)).fade_out(int(fade_ms))
        piece = apply_loudness(piece, mode=loudness_mode, target_dbfs=float(target_dbfs))
        out_name = f"{int(bpm)}BPM_Vocals_CHOP_GRID_{grid_size}_R{i:02d}.wav"
        out_path = out_dir / out_name
        piece.export(out_path, format="wav")
        exported.append(out_path)
    return exported
| # ----------------------------- | |
| # Demucs Modes + Stem Mapping | |
| # ----------------------------- | |
def demucs_command(model_mode: str, audio_file: str):
    """
    Build the demucs CLI invocation for the requested stem mode.

    Returns (argv_list, model_output_folder_name); unknown modes default
    to the 6-stem htdemucs_6s model.
    """
    normalized = (model_mode or "6stem").lower().strip()
    prefix = [sys.executable, "-m", "demucs", "-n"]
    if normalized == "2stem":
        return prefix + ["htdemucs", "--two-stems", "vocals", "--out", str(TEMP_DIR), audio_file], "htdemucs"
    if normalized == "4stem":
        return prefix + ["htdemucs", "--out", str(TEMP_DIR), audio_file], "htdemucs"
    return prefix + ["htdemucs_6s", "--out", str(TEMP_DIR), audio_file], "htdemucs_6s"
def map_stems(track_folder: Path, mode: str):
    """
    Map human-readable stem names to the WAV files demucs writes for `mode`.
    Unknown/empty modes default to the 6-stem layout.
    """
    mode = (mode or "6stem").lower().strip()
    if mode == "2stem":
        layout = [("Vocals", "vocals.wav"),
                  ("Instrumental", "no_vocals.wav")]
    elif mode == "4stem":
        layout = [("Drums", "drums.wav"),
                  ("Bass", "bass.wav"),
                  ("Synths", "other.wav"),
                  ("Vocals", "vocals.wav")]
    else:
        layout = [("Drums", "drums.wav"),
                  ("Bass", "bass.wav"),
                  ("Guitar", "guitar.wav"),
                  ("Piano", "piano.wav"),
                  ("Synths", "other.wav"),
                  ("Vocals", "vocals.wav")]
    return {name: track_folder / filename for name, filename in layout}
| # ----------------------------- | |
| # Phase 1: Analyze + Separate | |
| # ----------------------------- | |
def analyze_and_separate(file_input, url_input, stem_mode, manual_bpm):
    """Phase 1: resolve the audio source, detect BPM, and run demucs separation.

    Returns a tuple for the Gradio outputs: six stem preview paths (None when a
    stem is absent), the BPM, the demucs track folder, the (possibly
    downgraded) stem mode, and two CheckboxGroup updates reflecting the stems
    actually produced. Raises gr.Error on any failure.
    """
    # --- CRITICAL FIX: CLEAN UP OLD RUNS TO PREVENT GHOST STEMS ---
    if TEMP_DIR.exists():
        try:
            shutil.rmtree(TEMP_DIR, ignore_errors=True)
        except Exception:
            pass
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    # -------------------------------------------------------------
    # URL takes priority over the uploaded file when both are supplied.
    audio_file = None
    if url_input and len(url_input) > 5:
        print("Using Cloud Import...")
        try:
            audio_file = download_from_url(url_input)
        except Exception as e:
            raise gr.Error(f"Link Download Failed: {str(e)}")
    elif file_input:
        print("Using File Upload...")
        audio_file = file_input
    if not audio_file:
        raise gr.Error("No audio source found. Paste a link or upload a file.")
    try:
        # Fresh output tree for this run.
        if OUTPUT_DIR.exists():
            shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        (OUTPUT_DIR / "Stems").mkdir(parents=True, exist_ok=True)
        (OUTPUT_DIR / "Loops").mkdir(parents=True, exist_ok=True)
        TEMP_DIR.mkdir(parents=True, exist_ok=True)
        # Sanitize the path and guarantee WAV input for demucs.
        audio_file = safe_copy_to_temp(audio_file)
        audio_file = ensure_wav(audio_file)
        # BPM: manual override wins; otherwise multi-window detection.
        if manual_bpm and int(manual_bpm) > 0:
            bpm = int(manual_bpm)
        else:
            bpm = detect_bpm_multiwindow(audio_file)
            if bpm is None:
                raise gr.Error("BPM detection failed. Enter BPM manually.")
        bpm = max(40, min(220, int(bpm)))
        print(f"Using BPM: {bpm}")
        cmd, demucs_model_folder = demucs_command(stem_mode, audio_file)
        print(f"Separating stems (mode={stem_mode})...")
        try:
            run_demucs(cmd)
        except gr.Error as e:
            # The 6-stem model is the heaviest; retry once with 4-stem.
            if (stem_mode or "").lower().strip() == "6stem":
                print("6-stem failed; falling back to 4-stem htdemucs...")
                stem_mode = "4stem"
                cmd, demucs_model_folder = demucs_command(stem_mode, audio_file)
                run_demucs(cmd)
            else:
                raise
        # Demucs writes TEMP_DIR/<model>/<track>/; grab the single track folder.
        demucs_out = TEMP_DIR / demucs_model_folder
        track_folder = next(demucs_out.iterdir(), None)
        if not track_folder:
            raise FileNotFoundError("Demucs separation failed (no output folder found).")
        stems = map_stems(track_folder, stem_mode)
        # --- UPDATE UI CHECKBOXES DYNAMICALLY ---
        available_stems = list(stems.keys())
        # Default checked = all stems for export
        new_export_stems = gr.CheckboxGroup(choices=available_stems, value=available_stems)
        # Default checked = all except vocals for loops
        loop_defaults = [s for s in available_stems if s != "Vocals"]
        new_loop_stems = gr.CheckboxGroup(choices=available_stems, value=loop_defaults)
        # ----------------------------------------
        # Preview paths: None when the stem file was not produced.
        p_drums = str(stems["Drums"]) if "Drums" in stems and stems["Drums"].exists() else None
        p_bass = str(stems["Bass"]) if "Bass" in stems and stems["Bass"].exists() else None
        p_guitar = str(stems["Guitar"]) if "Guitar" in stems and stems["Guitar"].exists() else None
        p_piano = str(stems["Piano"]) if "Piano" in stems and stems["Piano"].exists() else None
        # The "other" preview slot shows Synths (4/6-stem) or Instrumental (2-stem).
        if "Synths" in stems and stems["Synths"].exists():
            p_other = str(stems["Synths"])
        elif "Instrumental" in stems and stems["Instrumental"].exists():
            p_other = str(stems["Instrumental"])
        else:
            p_other = None
        p_vocals = str(stems["Vocals"]) if "Vocals" in stems and stems["Vocals"].exists() else None
        return (
            p_drums, p_bass, p_guitar, p_piano, p_other, p_vocals,
            bpm, str(track_folder), stem_mode,
            new_export_stems, new_loop_stems  # Return the dynamic updates
        )
    except Exception as e:
        # NOTE(review): this also re-wraps gr.Error instances raised above.
        raise gr.Error(f"Process Failed: {str(e)}")
| # ----------------------------- | |
| # Phase 2: Package + Export | |
| # ----------------------------- | |
def package_and_export(
    track_folder_str,
    bpm,
    stem_mode,
    cover_art,
    export_stems,
    loop_stems,
    enable_vocal_chops,
    loops_per_stem,
    bar_lengths,
    hop_bars,
    top_k,
    fade_ms,
    loop_seam,
    seam_ms,
    min_bar_gap,
    loudness_mode,
    target_dbfs,
    vocal_chop_mode,
    vocal_grid_size,
    vocal_max_chops,
    vocal_min_ms,
    vocal_max_ms,
    vocal_silence_thresh_db,
    vocal_min_silence_len_ms
):
    """Phase 2: build the sample pack from Phase 1's separated stems.

    Copies selected full stems, cuts quantized loops, generates vocal chops
    (or vocal loops), optionally renders a 9:16 promo video from the cover
    art, writes a manifest.json of all settings, and zips everything.

    Returns (zip_path, video_path_or_None). Raises gr.Error on any failure
    or when Phase 1 has not been run.
    """
    # --- FIX: Check if Phase 1 was run ---
    if not track_folder_str:
        raise gr.Error("Phase 1 incomplete! Please run 'Separate Stems' first, or wait for it to finish.")
    # -------------------------------------
    try:
        track_folder = Path(track_folder_str)
        bpm = int(bpm)
        stems = map_stems(track_folder, stem_mode)
        # Fresh output tree for this export.
        if OUTPUT_DIR.exists():
            shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        (OUTPUT_DIR / "Stems").mkdir(parents=True, exist_ok=True)
        (OUTPUT_DIR / "Loops").mkdir(parents=True, exist_ok=True)
        (OUTPUT_DIR / "Vocal_Chops").mkdir(parents=True, exist_ok=True)
        export_stems = set(export_stems or [])
        loop_stems = set(loop_stems or [])
        # Copy the full-length stems the user selected for export.
        for name, path in stems.items():
            if name in export_stems and path.exists():
                shutil.copy(path, OUTPUT_DIR / "Stems" / f"{bpm}BPM_Full_{name}.wav")
        # Pick one stem (preferring rhythmic content) to derive the bar grid.
        grid_source = None
        for k in ("Drums", "Synths", "Instrumental", "Vocals", "Bass"):
            if k in stems and stems[k].exists():
                grid_source = stems[k]
                break
        if grid_source is None:
            raise FileNotFoundError("No stems found to build bar grid.")
        bar_starts_ms = detect_bar_grid(str(grid_source), bpm=bpm, max_seconds=240)
        # Normalize the bar-length selection (strings from the UI) to ints.
        if not bar_lengths:
            bar_lengths = ["4", "8"]
        bar_lengths_int = sorted(list({int(x) for x in bar_lengths if str(x).strip().isdigit()}))
        if not bar_lengths_int:
            bar_lengths_int = [4, 8]
        loops_dir = OUTPUT_DIR / "Loops"
        all_loops = {}
        # Quantized loops for every selected non-vocal stem.
        for stem_name, stem_path in stems.items():
            if stem_name == "Vocals":
                continue
            if stem_name not in loop_stems:
                continue
            if not stem_path.exists():
                continue
            exported = make_quantized_loops(
                stem_path=stem_path,
                stem_name=stem_name,
                bpm=bpm,
                bar_starts_ms=bar_starts_ms,
                bar_lengths=bar_lengths_int,
                hop_bars=int(hop_bars),
                loops_per_stem=int(loops_per_stem),
                top_k=int(top_k),
                fade_ms=int(fade_ms),
                loop_seam=bool(loop_seam),
                seam_ms=int(seam_ms),
                min_bar_gap=int(min_bar_gap),
                loudness_mode=str(loudness_mode),
                target_dbfs=float(target_dbfs),
                out_dir=loops_dir
            )
            all_loops[stem_name] = exported
        # Vocals: either chops (silence/onset/grid) or regular loops.
        vocal_exports = []
        if "Vocals" in stems and stems["Vocals"].exists():
            if enable_vocal_chops:
                mode = (vocal_chop_mode or "grid").lower().strip()
                if mode == "silence":
                    vocal_exports = vocal_chops_silence(
                        vocals_path=stems["Vocals"],
                        bpm=bpm,
                        out_dir=OUTPUT_DIR / "Vocal_Chops",
                        max_chops=int(vocal_max_chops),
                        min_len_ms=int(vocal_min_ms),
                        max_len_ms=int(vocal_max_ms),
                        silence_thresh_db=int(vocal_silence_thresh_db),
                        min_silence_len_ms=int(vocal_min_silence_len_ms),
                        fade_ms=int(fade_ms),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                    )
                elif mode == "onset":
                    vocal_exports = vocal_chops_onset(
                        vocals_path=stems["Vocals"],
                        bpm=bpm,
                        out_dir=OUTPUT_DIR / "Vocal_Chops",
                        max_chops=int(vocal_max_chops),
                        min_len_ms=int(vocal_min_ms),
                        max_len_ms=int(vocal_max_ms),
                        fade_ms=int(fade_ms),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                    )
                elif mode == "grid":
                    vocal_exports = vocal_chops_grid(
                        vocals_path=stems["Vocals"],
                        bpm=bpm,
                        out_dir=OUTPUT_DIR / "Vocal_Chops",
                        grid_size=str(vocal_grid_size),
                        max_chops=int(vocal_max_chops),
                        # Shorter fade for tight grid slices.
                        fade_ms=max(1, int(fade_ms // 2)),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                    )
                else:
                    vocal_exports = []
            else:
                if "Vocals" in loop_stems:
                    vocal_exports = make_quantized_loops(
                        stem_path=stems["Vocals"],
                        stem_name="Vocals",
                        bpm=bpm,
                        bar_starts_ms=bar_starts_ms,
                        bar_lengths=bar_lengths_int,
                        hop_bars=int(hop_bars),
                        loops_per_stem=int(loops_per_stem),
                        top_k=int(top_k),
                        fade_ms=int(fade_ms),
                        loop_seam=bool(loop_seam),
                        seam_ms=int(seam_ms),
                        min_bar_gap=int(min_bar_gap),
                        loudness_mode=str(loudness_mode),
                        target_dbfs=float(target_dbfs),
                        out_dir=loops_dir
                    )
            all_loops["Vocals"] = vocal_exports
        # Pick a melodic loop (preferred order) as the promo-video soundtrack.
        video_loop = None
        for key in ("Synths", "Piano", "Guitar", "Instrumental"):
            if all_loops.get(key):
                video_loop = all_loops[key][0]
                break
        video_path = None
        if cover_art and video_loop:
            print("Rendering Video...")
            vid_out = OUTPUT_DIR / "Promo_Video.mp4"
            audio_clip = AudioFileClip(str(video_loop))
            duration = audio_clip.duration
            # 9:16 vertical video: cover art with a slow 2%/s zoom.
            img = ImageClip(cover_art).resize(width=1080)
            img = img.resize(lambda t: 1 + 0.02 * t)
            img = img.set_position(("center", "center"))
            img = img.set_duration(duration)
            img = img.set_audio(audio_clip)
            final_clip = CompositeVideoClip([img], size=(1080, 1920))
            final_clip.duration = duration
            final_clip.audio = audio_clip
            final_clip.fps = 24
            final_clip.write_videofile(str(vid_out), codec="libx264", audio_codec="aac", logger=None)
            video_path = str(vid_out)
        # Record every setting used so packs are reproducible.
        manifest = {
            "created_at": datetime.utcnow().isoformat() + "Z",
            "bpm": bpm,
            "stem_mode": stem_mode,
            "export_stems": sorted(list(export_stems)),
            "loop_stems": sorted(list(loop_stems)),
            "enable_vocal_chops": bool(enable_vocal_chops),
            "bar_lengths": bar_lengths_int,
            "hop_bars": int(hop_bars),
            "loops_per_stem": int(loops_per_stem),
            "top_k": int(top_k),
            "fade_ms": int(fade_ms),
            "loop_seam": bool(loop_seam),
            "seam_ms": int(seam_ms),
            "min_bar_gap": int(min_bar_gap),
            "loudness_mode": str(loudness_mode),
            "target_dbfs": float(target_dbfs),
            "vocal_chop_mode": str(vocal_chop_mode),
            "vocal_grid_size": str(vocal_grid_size),
            "vocal_max_chops": int(vocal_max_chops),
            "vocal_min_ms": int(vocal_min_ms),
            "vocal_max_ms": int(vocal_max_ms),
            "vocal_silence_thresh_db": int(vocal_silence_thresh_db),
            "vocal_min_silence_len_ms": int(vocal_min_silence_len_ms),
        }
        (OUTPUT_DIR / "manifest.json").write_text(json.dumps(manifest, indent=2), encoding="utf-8")
        # Zip the entire output tree (paths stored relative to OUTPUT_DIR).
        # NOTE(review): the zip is written to the current working directory.
        zip_file = "NightPulse_Pack.zip"
        with zipfile.ZipFile(zip_file, "w") as zf:
            for root, dirs, files in os.walk(OUTPUT_DIR):
                for file in files:
                    file_path = Path(root) / file
                    arcname = file_path.relative_to(OUTPUT_DIR)
                    zf.write(file_path, arcname)
        return zip_file, video_path
    except Exception as e:
        # NOTE(review): this also re-wraps gr.Error instances raised above.
        raise gr.Error(f"Packaging Failed: {str(e)}")
| # ----------------------------- | |
| # UI | |
| # ----------------------------- | |
# Gradio UI: two-phase workflow. Phase 1 separates stems and fills previews;
# Phase 2 packages loops/chops/video into a downloadable ZIP.
with gr.Blocks(title="Night Pulse | Studio Pro") as app:
    gr.Markdown("# 🎛️ Night Pulse | Studio Command Center")
    gr.Markdown("Selectable stems + loop engine + real vocal chops + loop-safe seams + variety dedupe.")
    # Hidden state carried from Phase 1 to Phase 2.
    stored_folder = gr.State()
    stored_bpm = gr.State()
    stored_mode = gr.State()
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1. Audio Source")
            stem_mode = gr.Dropdown(
                choices=[
                    ("2 stems (Vocals + Instrumental)", "2stem"),
                    ("4 stems (Drums/Bass/Other/Vocals)", "4stem"),
                    ("6 stems (Drums/Bass/Guitar/Piano/Other/Vocals)", "6stem"),
                ],
                value="6stem",
                label="Stem Mode"
            )
            manual_bpm = gr.Number(label="Manual BPM Override (optional)", precision=0, value=None)
            # Two mutually-exclusive input routes: URL import or file upload.
            with gr.Tabs():
                with gr.TabItem("☁️ Import Link (Mobile Safe)"):
                    input_url = gr.Textbox(
                        label="Paste URL Here",
                        placeholder="https://youtube.com/watch?v=...",
                        show_label=False,
                    )
                with gr.TabItem("📂 Upload File (Desktop)"):
                    input_file = gr.Audio(type="filepath", label="Upload Master Track")
            input_art = gr.Image(type="filepath", label="Cover Art (9:16)")
            btn_analyze = gr.Button("🔍 Phase 1: Separate Stems", variant="primary")
        with gr.Column(scale=1):
            gr.Markdown("### 2. Stem Preview (missing stems will be blank)")
            with gr.Row():
                p_drums = gr.Audio(label="Drums")
                p_bass = gr.Audio(label="Bass")
            with gr.Row():
                p_guitar = gr.Audio(label="Guitar")
                p_piano = gr.Audio(label="Piano")
            with gr.Row():
                p_other = gr.Audio(label="Other / Synths / Instrumental")
                p_vocals = gr.Audio(label="Vocals")
    gr.Markdown("---")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 3. Stem Selection")
            # Choices are replaced dynamically after Phase 1 with the stems
            # that were actually produced.
            export_stems = gr.CheckboxGroup(
                ["Drums", "Bass", "Guitar", "Piano", "Synths", "Vocals", "Instrumental"],
                value=["Drums", "Bass", "Synths", "Vocals"],
                label="Export Full Stems"
            )
            loop_stems = gr.CheckboxGroup(
                ["Drums", "Bass", "Guitar", "Piano", "Synths", "Instrumental", "Vocals"],
                value=["Drums", "Bass", "Synths"],
                label="Generate Loops For"
            )
            enable_vocal_chops = gr.Checkbox(value=True, label="Generate Vocal Chops (vocals only)")
            gr.Markdown("### 4. Loop Engine Settings")
            loops_per_stem = gr.Slider(1, 40, value=12, step=1, label="Loops per Stem (selected loop stems)")
            bar_lengths = gr.CheckboxGroup(
                choices=["1", "2", "4", "8"],
                value=["4", "8"],
                label="Bar Lengths (4/4)"
            )
            hop_bars = gr.Slider(1, 8, value=1, step=1, label="Hop (bars between starts)")
            top_k = gr.Slider(0, 200, value=30, step=1, label="Top-K candidates per stem (0 = no filter)")
            min_bar_gap = gr.Slider(0, 16, value=4, step=1, label="Min bar gap (de-dup spacing)")
            fade_ms = gr.Slider(0, 50, value=12, step=1, label="Click-safety fade (ms)")
            loop_seam = gr.Checkbox(value=True, label="Loop-safe seam (crossfade ends)")
            seam_ms = gr.Slider(0, 80, value=20, step=1, label="Loop seam crossfade (ms)")
            loudness_mode = gr.Dropdown(
                choices=["none", "peak", "rms"],
                value="none",
                label="Loudness mode"
            )
            target_dbfs = gr.Slider(-24, -8, value=-14, step=1, label="Target RMS dBFS (only for rms mode)")
            gr.Markdown("### 5. Vocals: Real Chop Mode")
            vocal_chop_mode = gr.Dropdown(
                choices=[("Silence chops", "silence"),
                         ("Onset chops", "onset"),
                         ("Grid chops (BPM)", "grid")],
                value="grid",
                label="Vocal Chop Mode"
            )
            vocal_grid_size = gr.Dropdown(
                choices=[("Half beat", "half"),
                         ("1 beat", "1beat"),
                         ("2 beats", "2beat"),
                         ("1 bar", "1bar")],
                value="1beat",
                label="Grid Chop Size"
            )
            vocal_max_chops = gr.Slider(4, 160, value=64, step=1, label="Max vocal chops to export")
            vocal_min_ms = gr.Slider(40, 500, value=120, step=10, label="Min chop length (ms)")
            vocal_max_ms = gr.Slider(200, 4000, value=1500, step=50, label="Max chop length (ms)")
            vocal_silence_thresh_db = gr.Slider(-60, -10, value=-35, step=1, label="Silence threshold (dBFS, silence mode)")
            vocal_min_silence_len_ms = gr.Slider(60, 800, value=140, step=10, label="Min silence length (ms, silence mode)")
            btn_package = gr.Button("📦 Phase 2: Package & Export", variant="primary")
        with gr.Column(scale=1):
            gr.Markdown("### 6. Final Output")
            out_zip = gr.File(label="Download Pack (ZIP)")
            out_video = gr.Video(label="Promo Video")
    # Events
    btn_analyze.click(
        fn=analyze_and_separate,
        inputs=[input_file, input_url, stem_mode, manual_bpm],
        outputs=[
            p_drums, p_bass, p_guitar, p_piano, p_other, p_vocals,
            stored_bpm, stored_folder, stored_mode,
            export_stems, loop_stems  # Targeted updates for checkboxes
        ],
    )
    btn_package.click(
        fn=package_and_export,
        inputs=[
            stored_folder,
            stored_bpm,
            stored_mode,
            input_art,
            export_stems,
            loop_stems,
            enable_vocal_chops,
            loops_per_stem,
            bar_lengths,
            hop_bars,
            top_k,
            fade_ms,
            loop_seam,
            seam_ms,
            min_bar_gap,
            loudness_mode,
            target_dbfs,
            vocal_chop_mode,
            vocal_grid_size,
            vocal_max_chops,
            vocal_min_ms,
            vocal_max_ms,
            vocal_silence_thresh_db,
            vocal_min_silence_len_ms,
        ],
        outputs=[out_zip, out_video],
    )
if __name__ == "__main__":
    app.launch()