Spaces:
Build error
Build error
| import gradio as gr | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import uuid | |
| import shutil | |
| import zipfile | |
| import hashlib | |
| import subprocess | |
| from pathlib import Path | |
| import numpy as np | |
| import soundfile as sf | |
| import librosa | |
| import yt_dlp | |
| import pyloudnorm as pyln | |
# Optional: MIDI extraction
# basic-pitch is treated as an optional dependency: when the import fails the
# app still starts, and MIDI_AVAILABLE is consulted later (extract_midi and
# phase2_export) to skip MIDI work.
try:
    from basic_pitch.inference import predict_and_save
    MIDI_AVAILABLE = True
except ImportError:
    MIDI_AVAILABLE = False
    print("WARNING: 'basic-pitch' not installed. MIDI extraction will be disabled.")
| # ========================= | |
| # CONFIG | |
| # ========================= | |
# Each analysis job gets its own working directory under RUNS_DIR.
RUNS_DIR = Path("runs")
# Separated stems are cached under CACHE_DIR (see cache_paths_for_hash).
CACHE_DIR = Path("cache")
# Mirror of the most recent export; wiped and refilled by phase2_export.
OUTPUT_DIR = Path("nightpulse_output")
# Resolved ffmpeg path when it is on PATH, otherwise the bare command name.
FFMPEG_BIN = shutil.which("ffmpeg") or "ffmpeg"
# Create the working directories at import time so later code can assume them.
RUNS_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)
| # ========================= | |
| # UTIL | |
| # ========================= | |
def now_job_id() -> str:
    """Return a job id shaped like ``YYYYMMDD_HHMMSS_<8 hex chars>``.

    The timestamp is local time; the suffix is the first 8 hex digits of a
    random UUID4.
    """
    stamp = time.strftime("%Y%m%d_%H%M%S")
    suffix = uuid.uuid4().hex[:8]
    return "_".join((stamp, suffix))
def wipe_dir(p: Path):
    """Recursively delete ``p`` if it exists; never raises.

    This is a deliberate best-effort cleanup: removal errors are ignored by
    ``shutil.rmtree`` and any other exception is swallowed.
    """
    try:
        if not p.exists():
            return
        shutil.rmtree(p, ignore_errors=True)
    except Exception:
        # Best effort only: a failed cleanup must not abort the caller.
        return
def ensure_dir(p: Path):
    """Create directory ``p`` (and any missing parents) and return ``p``.

    An already existing directory is not an error.
    """
    p.mkdir(exist_ok=True, parents=True)
    return p
def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in ``chunk_size``-byte pieces so large audio files are
    never loaded into memory at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
def check_ffmpeg() -> bool:
    """Report whether ``FFMPEG_BIN -version`` runs and exits with status 0.

    Any failure to launch the process counts as "not available".
    """
    try:
        probe = subprocess.run([FFMPEG_BIN, "-version"], capture_output=True, text=True)
    except Exception:
        return False
    return probe.returncode == 0
def check_torch_cuda() -> bool:
    """Return True when torch imports and reports a usable CUDA device.

    Prints a one-line status either way. Any exception (including torch not
    being installed) is reported and treated as "no CUDA".
    """
    try:
        import torch

        available = torch.cuda.is_available()
        if not available:
            print(f"WARNING: CUDA NOT available to torch. torch={torch.__version__}. Demucs will run on CPU.")
        else:
            print(f"CUDA OK: {torch.cuda.get_device_name(0)} | torch {torch.__version__} | cuda {torch.version.cuda}")
        return available
    except Exception as e:
        print(f"WARNING: torch import failed: {e}. Demucs may run on CPU.")
        return False
# Probe the environment once at import time; the results are read by
# ensure_wav / render_video_ffmpeg (FFMPEG_OK) and run_demucs (CUDA_OK).
FFMPEG_OK = check_ffmpeg()
CUDA_OK = check_torch_cuda()
# Maximum number of characters of log text kept by log_append.
LOG_TAIL_MAX = 8000


def log_append(log_text: str, msg: str) -> str:
    """Append ``msg`` as one line to ``log_text`` and return the new log.

    ``msg`` is stringified and newline-terminated if needed. A falsy
    ``log_text`` is treated as empty. Only the last ``LOG_TAIL_MAX``
    characters of the combined text are kept.
    """
    line = str(msg)
    if not line.endswith("\n"):
        line = line + "\n"
    merged = (log_text or "") + line
    return merged[-LOG_TAIL_MAX:] if len(merged) > LOG_TAIL_MAX else merged
def safe_stem(name: str) -> str:
    """Return ``name`` with every character that is not alphanumeric or one
    of ``. _ -`` replaced by an underscore."""
    allowed = "._-"
    cleaned = []
    for ch in name:
        keep = ch.isalnum() or ch in allowed
        cleaned.append(ch if keep else "_")
    return "".join(cleaned)
def download_from_url(url: str, out_dir: Path) -> Path:
    """Download the audio of ``url`` with yt-dlp and return the WAV path.

    The best audio stream is fetched into ``out_dir`` (created if missing)
    and converted to WAV by yt-dlp's FFmpegExtractAudio post-processor.

    Raises:
        gr.Error: if the expected WAV file is not on disk afterwards (for
            example when the URL resolved to a playlist, whose metadata
            does not map to a single output file).
    """
    ensure_dir(out_dir)
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": str(out_dir / "%(title)s.%(ext)s"),
        "postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": "wav", "preferredquality": "192"}],
        "quiet": True,
        "no_warnings": True,
        # A watch URL that also carries a playlist id must yield one track;
        # otherwise prepare_filename() below describes the playlist, not a file.
        "noplaylist": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    # The post-processor replaces the downloaded container with a .wav file.
    final_path = Path(filename).with_suffix(".wav")
    if not final_path.exists():
        # Fail here with a clear message instead of a FileNotFoundError later.
        raise gr.Error(f"Download finished but no WAV was produced: {final_path.name}")
    return final_path
def ensure_wav(in_path: Path, out_path: Path) -> Path:
    """Return a WAV version of ``in_path``.

    A file whose suffix is already ``.wav`` is returned unchanged (its
    contents are not inspected). Anything else is transcoded with FFmpeg to
    16-bit PCM, 44.1 kHz, stereo at ``out_path``.

    Raises:
        gr.Error: if FFmpeg is unavailable or the conversion fails.
    """
    already_wav = in_path.suffix.lower() == ".wav"
    if already_wav:
        return in_path
    if not FFMPEG_OK:
        raise gr.Error("FFmpeg not found. Install FFmpeg or provide WAV input.")

    ensure_dir(out_path.parent)
    cmd = [FFMPEG_BIN, "-y"]
    cmd += ["-i", str(in_path)]
    # -vn drops any video stream; the rest fixes codec, sample rate and channels.
    cmd += ["-vn", "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2"]
    cmd.append(str(out_path))

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise gr.Error(f"FFmpeg convert error:\n{result.stderr[-2000:]}")
    return out_path
def detect_key(audio_path: Path) -> str:
    """Estimate the musical key of the first 60 s of ``audio_path``.

    The summed CQT chroma is correlated against a 12-value major and a
    12-value minor profile at each of the 12 rotations; the best finite
    correlation wins. Returns e.g. ``"C#min"``, or ``"Unknown"`` when no
    finite correlation is found or anything raises.
    """
    note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
    major = np.array([6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88])
    minor = np.array([6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17])
    try:
        y, sr = librosa.load(str(audio_path), sr=None, duration=60)
        chroma_energy = np.sum(librosa.feature.chroma_cqt(y=y, sr=sr), axis=1)

        winner, top = "Unknown", -1
        for shift, tonic in enumerate(note_names):
            # Major is tested before minor for each tonic; ties keep the earlier hit.
            for label, profile in (("maj", major), ("min", minor)):
                r = np.corrcoef(chroma_energy, np.roll(profile, shift))[0, 1]
                if np.isfinite(r) and r > top:
                    top = r
                    winner = f"{tonic}{label}"
        return winner
    except Exception:
        return "Unknown"
def run_demucs(input_wav: Path, model_name: str, out_dir: Path, two_stems_vocals: bool) -> Path:
    """Run Demucs in a subprocess and return the folder holding the stems.

    Uses CUDA when ``CUDA_OK`` is set, otherwise CPU. With
    ``two_stems_vocals`` the ``--two-stems vocals`` option is added. The
    returned folder is the most recently modified sub-directory of
    ``out_dir / model_name``.

    Raises:
        gr.Error: if Demucs exits non-zero or the expected folders are missing.
    """
    cmd = [sys.executable, "-m", "demucs"]
    cmd += ["--device", "cuda" if CUDA_OK else "cpu"]
    cmd += ["-n", model_name]
    cmd += ["--out", str(out_dir)]
    cmd.append(str(input_wav))
    if two_stems_vocals:
        cmd.extend(["--two-stems", "vocals"])

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise gr.Error(f"Demucs Error:\n{result.stderr[-2000:]}")

    model_dir = out_dir / model_name
    if not model_dir.exists():
        raise gr.Error(f"Demucs did not produce expected folder: {model_dir}")

    track_dirs = [entry for entry in model_dir.iterdir() if entry.is_dir()]
    if not track_dirs:
        raise gr.Error(f"Demucs produced no track folder in: {model_dir}")
    # Newest folder wins; max() keeps the first entry on equal mtimes.
    return max(track_dirs, key=lambda entry: entry.stat().st_mtime)
def build_instrumental(track_dir: Path) -> Path | None:
    """Return ``track_dir / "no_vocals.wav"``, creating it when missing.

    An existing file is returned as is. Otherwise every present non-vocal
    stem (drums, bass, other, piano, guitar) is summed into a stereo mix,
    scaled down only if its peak exceeds 1.0, and written out. Returns
    ``None`` when none of those stems exist.
    """
    out = track_dir / "no_vocals.wav"
    if out.exists():
        return out

    stem_names = ["drums.wav", "bass.wav", "other.wav", "piano.wav", "guitar.wav"]
    stem_files = [track_dir / name for name in stem_names if (track_dir / name).exists()]
    if not stem_files:
        return None

    sr_ref = None
    buffers = []
    for stem_file in stem_files:
        audio, sr = sf.read(str(stem_file), always_2d=True, dtype="float32")
        if sr_ref is None:
            # The first stem read fixes the sample rate of the mix.
            sr_ref = sr
        elif sr != sr_ref:
            # Mismatched rate: down-mix to mono, resample, then duplicate the
            # mono signal onto both channels.
            mono = np.mean(audio, axis=1)
            resampled = librosa.resample(mono, orig_sr=sr, target_sr=sr_ref)
            audio = np.stack([resampled, resampled], axis=1).astype(np.float32)
        buffers.append(audio)

    longest = max(buf.shape[0] for buf in buffers)
    mix = np.zeros((longest, 2), dtype=np.float32)
    for buf in buffers:
        mix[: buf.shape[0], :] += buf

    peak = np.max(np.abs(mix))
    if peak > 1.0:
        mix /= peak
    sf.write(str(out), mix, sr_ref)
    return out
def cache_paths_for_hash(h: str) -> dict:
    """Return the cache locations for cache key ``h``.

    Keys: ``base`` (``CACHE_DIR / h``), ``meta``, ``stems_dir`` and
    ``input_wav``, all located under ``base``. Nothing is created on disk.
    """
    root = CACHE_DIR / h
    paths = {"base": root}
    paths["meta"] = root / "meta.json"
    paths["stems_dir"] = root / "stems"
    paths["input_wav"] = root / "input.wav"
    return paths
def copy_tree(src: Path, dst: Path):
    """Copy every file under ``src`` into ``dst``, keeping the folder layout.

    Destination folders are created as needed; files are copied with
    ``shutil.copy2`` and overwrite existing files of the same name.
    """
    ensure_dir(dst)
    for current, _subdirs, filenames in os.walk(src):
        here = Path(current)
        target = ensure_dir(dst / here.relative_to(src))
        for filename in filenames:
            shutil.copy2(here / filename, target / filename)
| # ========================= | |
| # AUDIO PROCESSING | |
| # ========================= | |
def peak_normalize(y: np.ndarray, peak_target: float = 0.98) -> np.ndarray:
    """Scale ``y`` so that its largest absolute sample equals ``peak_target``.

    The input is returned unchanged when it is empty, effectively silent
    (peak <= 1e-9), or its peak is not finite, so the function never raises
    on an empty array and never multiplies a signal by NaN/inf.
    """
    if y.size == 0:
        # np.max raises ValueError on an empty array.
        return y
    peak = np.max(np.abs(y))
    if not np.isfinite(peak) or peak <= 1e-9:
        return y
    return y * (peak_target / peak)
def apply_loudness_np(y: np.ndarray, sr: int, mode: str, target: float) -> np.ndarray:
    """Apply the loudness normalisation named by ``mode`` to ``y``.

    Modes (case-insensitive, surrounding whitespace ignored):
      * ``none`` or anything unrecognised: return ``y`` unchanged.
      * ``peak``: delegate to ``peak_normalize`` (``target`` is not used).
      * ``rms``: apply the gain that moves the RMS level to ``target`` dB.
      * ``lufs``: measure integrated loudness with pyloudnorm and apply the
        gain towards ``target``, limited to +/-20 dB. If the measurement is
        -inf or anything raises, ``y`` is returned unchanged.
    """
    kind = (mode or "none").lower().strip()

    if kind == "peak":
        return peak_normalize(y)

    if kind == "rms":
        level_db = 20.0 * np.log10(np.sqrt(np.mean(y ** 2)) + 1e-12)
        delta_db = float(target) - level_db
        return y * 10 ** (delta_db / 20.0)

    if kind == "lufs":
        try:
            measured = pyln.Meter(sr).integrated_loudness(y.astype(np.float64))
            if measured == -float("inf"):
                return y
            delta_db = float(target) - measured
            delta_db = max(min(delta_db, 20.0), -20.0)
            return y * 10 ** (delta_db / 20.0)
        except Exception:
            return y

    # "none" and unknown modes both fall through to a no-op.
    return y
def crossfade_loop_seam(seg: np.ndarray, seam_samps: int) -> np.ndarray:
    """Blend the first ``seam_samps`` samples of ``seg`` with its last ones.

    Over the seam the output ramps linearly from the original head
    (weight 1 at index 0) to the original tail (weight 1 at index seam-1).
    The rest of the segment, including the tail itself, is left untouched.
    ``seg`` is returned unchanged when the seam is <= 0 or would cover half
    the segment or more; otherwise a modified copy is returned.

    NOTE(review): with this weighting the sample at index ``seam - 1``
    equals the last sample of the segment while index ``seam`` is the
    unmodified original, which looks like it could introduce a jump inside
    the loop rather than smoothing the wrap-around point — confirm intent.
    """
    seam = int(seam_samps)
    total = seg.shape[0]
    if seam <= 0 or 2 * seam >= total:
        return seg

    ramp = np.linspace(0.0, 1.0, seam, dtype=np.float32)
    blended = seg.copy()
    blended[:seam] = seg[:seam] * (1.0 - ramp) + seg[-seam:] * ramp
    return blended
def fade_edges(seg: np.ndarray, fade_samps: int) -> np.ndarray:
    """Return a copy of ``seg`` with a linear fade-in and fade-out.

    Each fade lasts ``fade_samps`` samples. ``seg`` itself is returned
    unchanged when the fade length is <= 0 or the two fades would cover the
    whole segment.
    """
    width = int(fade_samps)
    if width <= 0 or 2 * width >= seg.shape[0]:
        return seg

    ramp = np.linspace(0.0, 1.0, width, dtype=np.float32)
    shaped = seg.copy()
    shaped[:width] *= ramp            # fade in
    shaped[-width:] *= np.flip(ramp)  # fade out
    return shaped
def compute_segment_features(y: np.ndarray, sr: int) -> dict:
    """Return the ranking features of an audio segment.

    Keys: ``rms`` (root-mean-square level plus a 1e-12 floor), ``onset``
    (mean onset strength) and ``centroid`` (mean spectral centroid). The two
    librosa-based features fall back to 0.0 when their computation raises or
    yields no frames.
    """
    energy = float(np.sqrt(np.mean(y ** 2)) + 1e-12)

    onset = 0.0
    try:
        envelope = librosa.onset.onset_strength(y=y, sr=sr)
        if envelope.size:
            onset = float(np.mean(envelope))
    except Exception:
        onset = 0.0

    centroid = 0.0
    try:
        spectrum_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
        if spectrum_centroid.size:
            centroid = float(np.mean(spectrum_centroid))
    except Exception:
        centroid = 0.0

    return {"rms": energy, "onset": onset, "centroid": centroid}
def normalize01(x: np.ndarray) -> np.ndarray:
    """Min-max scale ``x`` into the range [0, 1].

    An empty array is returned as is; an array whose values are all
    (almost) equal maps to zeros.
    """
    if x.size == 0:
        return x
    lo = float(np.min(x))
    hi = float(np.max(x))
    if hi - lo < 1e-12:
        return np.zeros_like(x)
    return (x - lo) / (hi - lo)
def build_bar_grid_samples(grid_src_wav: Path, bpm: int, sr_target: int = 44100, duration_sec: int = 240) -> tuple[list[int], int]:
    """Build a list of bar start positions (in samples) for ``grid_src_wav``.

    Up to ``duration_sec`` seconds are loaded as mono at ``sr_target``.
    Three strategies are tried in order:

      1. Beat tracking: every 4th tracked beat is taken as a bar start
         (4/4 is assumed), provided at least 8 beats were found.
      2. Onset fallback: if at least 8 onsets are detected, a fixed-length
         grid derived from ``bpm`` is phase-aligned to the first onset.
      3. Pure math: a fixed-length grid derived from ``bpm`` starting at 0.

    Returns:
        ``(bar_starts, sample_rate)``. For audio shorter than one second the
        grid is ``[0]``.
    """
    y, sr = librosa.load(str(grid_src_wav), sr=sr_target, mono=True, duration=duration_sec)
    if y.size < sr:
        return [0], sr

    # Fixed bar length from BPM: 4 beats per bar -> 240000 ms / BPM.
    ms_per_bar = 240000.0 / max(1, bpm)
    samps_per_bar = max(1, int(sr * (ms_per_bar / 1000.0)))
    total = y.size

    # 1) Beat track
    try:
        _, beats = librosa.beat.beat_track(y=y, sr=sr)
        beat_times = librosa.frames_to_time(beats, sr=sr)
        if beat_times.size >= 8:
            bar_times = beat_times[::4]  # assume 4/4
            bar_samps = sorted({int(t * sr) for t in bar_times if int(t * sr) >= 0})
            if len(bar_samps) >= 2:
                return bar_samps, sr
    except Exception:
        pass

    # 2) Onset fallback
    try:
        oenv = librosa.onset.onset_strength(y=y, sr=sr)
        onsets = librosa.onset.onset_detect(onset_envelope=oenv, sr=sr, backtrack=True, units="time")
        on_samps = np.array([int(t * sr) for t in onsets], dtype=np.int64)
        on_samps = on_samps[(on_samps >= 0) & (on_samps < total)]
        if on_samps.size >= 8:
            # Previously the detected onsets were discarded and this tier
            # returned exactly the tier-3 grid. Use the first onset to set the
            # phase of the grid while still covering the file from its start.
            offset = int(on_samps.min()) % samps_per_bar
            bar_samps = list(range(offset, total, samps_per_bar))
            if len(bar_samps) >= 2:
                return bar_samps, sr
    except Exception:
        pass

    # 3) Pure math (range() is never empty here because total >= sr > 0)
    return list(range(0, total, samps_per_bar)), sr
def make_ranked_loops_numpy(
    stem_wav: Path, stem_name: str, bpm: int, key: str,
    bar_starts: list[int], sr_grid: int, bar_lengths: list[int],
    hop_bars: int, loops_per: int, top_k: int, fade_ms: int,
    seamless: bool, seam_ms: int, min_bar_gap: int,
    loud_mode: str, loud_target: float, out_dir: Path,
):
    """Cut, rank and export the best loops of one stem.

    Candidate loops start on every ``hop_bars``-th entry of ``bar_starts``
    and last ``bar_lengths`` bars each (bar length derived from ``bpm``).
    Each candidate is scored from its min-max normalised RMS, onset strength
    and spectral centroid (weights 0.40 / 0.40 / 0.20). The ``top_k`` best
    are kept (all of them when ``top_k <= 0``), then up to ``loops_per`` are
    selected so that their starting bars are at least ``min_bar_gap`` grid
    positions apart.

    Each selected loop gets either the seam crossfade (``seamless`` with a
    positive seam) or plain edge fades, then loudness processing, clipping
    to [-1, 1], and is written to ``out_dir``.

    Returns:
        List of written file paths; empty when the stem is shorter than one
        second or no candidate fits.
    """
    y, sr = librosa.load(str(stem_wav), sr=sr_grid, mono=True)
    if y.size < sr:
        return []

    ms_per_bar = 240000.0 / max(1, bpm)
    samps_per_bar = int(sr * (ms_per_bar / 1000.0))

    # Keep only grid points that lie inside this stem; fall back to sample 0.
    bar_starts = [b for b in bar_starts if 0 <= b < y.size] or [0]
    grid = bar_starts[:: max(1, int(hop_bars))]

    candidates = []
    for bl in bar_lengths:
        n_bars = int(bl)
        dur = int(samps_per_bar * n_bars)
        for start in grid:
            if start + dur > y.size:
                continue
            feats = compute_segment_features(y[start:start + dur].astype(np.float32), sr)
            candidates.append({
                "start": int(start), "bl": n_bars, "dur": dur,
                "rms": feats["rms"], "onset": feats["onset"], "centroid": feats["centroid"],
            })
    if not candidates:
        return []

    normed = {
        name: normalize01(np.array([c[name] for c in candidates]))
        for name in ("rms", "onset", "centroid")
    }
    for idx, cand in enumerate(candidates):
        # Weighted score: heavily favor Rhythm (Onset) and Energy (RMS)
        cand["score"] = float(
            0.40 * normed["rms"][idx] + 0.40 * normed["onset"][idx] + 0.20 * normed["centroid"][idx]
        )

    ranked = sorted(candidates, key=lambda d: d["score"], reverse=True)
    if top_k > 0:
        ranked = ranked[: int(top_k)]

    gap = int(min_bar_gap)
    limit = int(loops_per)
    taken_bars = []
    selected = []
    for cand in ranked:
        # Index of the grid point closest to this candidate's start.
        bar_idx = int(np.argmin([abs(cand["start"] - b) for b in bar_starts]))
        too_close = any(abs(bar_idx - used) < gap for used in taken_bars)
        if too_close:
            continue
        selected.append(cand)
        taken_bars.append(bar_idx)
        if len(selected) >= limit:
            break

    ensure_dir(out_dir)
    fade_samps = int((int(fade_ms) / 1000.0) * sr)
    seam_samps = int((int(seam_ms) / 1000.0) * sr)
    use_seam = seamless and seam_samps > 0

    exported = []
    for number, cand in enumerate(selected, 1):
        start, dur, bl = cand["start"], cand["dur"], cand["bl"]
        seg = y[start:start + dur].astype(np.float32)
        seg = crossfade_loop_seam(seg, seam_samps) if use_seam else fade_edges(seg, fade_samps)
        seg = apply_loudness_np(seg, sr, loud_mode, loud_target)
        seg = np.clip(seg, -1.0, 1.0).astype(np.float32)
        out_path = out_dir / f"{bpm}BPM_{key}_{stem_name}_L{bl}bars_{number:02d}.wav"
        sf.write(str(out_path), seg, sr)
        exported.append(out_path)
    return exported
def export_vocal_chops(
    vocals_wav: Path, bpm: int, key: str, chop_mode: str,
    loud_mode: str, loud_target: float, out_dir: Path
):
    """Slice the vocal stem into short chops and write them to ``out_dir``.

    The stem is loaded as mono at 44.1 kHz. A 500 ms slice is cut at each
    detected onset (slices that would reach the end of the file are
    dropped); the first 32 slices are faded at the edges (200 samples),
    loudness-processed, clipped to [-1, 1] and written.

    ``chop_mode`` is accepted for interface compatibility but is not used by
    the current onset-based slicer.

    Returns:
        List of written file paths; empty when the stem is shorter than one
        second.
    """
    y, sr = librosa.load(str(vocals_wav), sr=44100, mono=True)
    if y.size < sr:
        return []

    # Using Librosa Onset as default high quality slicer
    oenv = librosa.onset.onset_strength(y=y, sr=sr)
    onsets = librosa.onset.onset_detect(onset_envelope=oenv, sr=sr, backtrack=True, units="time")

    slice_len = int(0.5 * sr)  # Default 500ms slice
    chops = []
    for t in onsets:
        s = int(t * sr)
        e = s + slice_len
        if e < y.size:
            chops.append((s, e))

    ensure_dir(out_dir)
    exported = []
    for i, (s, e) in enumerate(chops[:32], 1):
        seg = y[s:e].astype(np.float32)
        seg = fade_edges(seg, 200)
        seg = apply_loudness_np(seg, sr, loud_mode, loud_target)
        # Loudness gain can push samples past full scale; clip explicitly,
        # exactly as make_ranked_loops_numpy does before writing.
        seg = np.clip(seg, -1.0, 1.0).astype(np.float32)
        out_path = out_dir / f"{bpm}BPM_{key}_VoxChop_{i:02d}.wav"
        sf.write(str(out_path), seg, sr)
        exported.append(out_path)
    return exported
def extract_midi(audio_path: Path, out_path: Path):
    """Transcribe ``audio_path`` to MIDI with basic-pitch and save it at ``out_path``.

    Does nothing when basic-pitch is not installed. basic-pitch writes
    ``<audio stem>_basic_pitch.mid`` into the output directory; that file is
    then moved to ``out_path``. This is best effort: any failure is printed
    and swallowed so the rest of the export can continue.
    """
    if not MIDI_AVAILABLE:
        return
    ensure_dir(out_path.parent)
    try:
        import inspect

        options = {
            "save_midi": True,
            "sonify_midi": False,
            "save_model_outputs": False,
            "save_notes": False,
        }
        # Newer basic-pitch releases require the model to be passed
        # explicitly; without it the call fails with a TypeError and no MIDI
        # is produced. Older releases do not have this parameter.
        if "model_or_model_path" in inspect.signature(predict_and_save).parameters:
            from basic_pitch import ICASSP_2022_MODEL_PATH

            options["model_or_model_path"] = ICASSP_2022_MODEL_PATH

        predict_and_save(
            [str(audio_path)], output_directory=str(out_path.parent), **options
        )
        # Handle the name Basic Pitch assigns
        # It usually appends _basic_pitch.mid
        gen = out_path.parent / f"{audio_path.stem}_basic_pitch.mid"
        if gen.exists():
            shutil.move(str(gen), str(out_path))
    except Exception as e:
        print(f"MIDI Error: {e}")
| # ========================= | |
| # VIDEO | |
| # ========================= | |
def render_video_ffmpeg(art_path: Path, audio_path: Path, out_path: Path, fmt: str) -> Path:
    """Render a still-image video of ``art_path`` with ``audio_path`` as sound.

    ``fmt`` selects the output resolution; unknown values fall back to
    1080x1920. The image is looped, scaled and cropped to fill the frame,
    passed through zoompan and drawbox filters, and encoded as H.264 + AAC
    at 24 fps, ending with the shorter input.

    Raises:
        gr.Error: if FFmpeg is unavailable or exits non-zero.
    """
    if not FFMPEG_OK:
        raise gr.Error("FFmpeg not found.")

    presets = {
        "9:16 (TikTok/Reels)": (1080, 1920),
        "16:9 (YouTube)": (1920, 1080),
        "1:1 (Square)": (1080, 1080),
    }
    w, h = presets.get(fmt, (1080, 1920))

    try:
        meta = sf.info(str(audio_path))
        dur = meta.frames / meta.samplerate
    except Exception:
        # Duration only feeds the drawbox expression; use a nominal value.
        dur = 30.0

    zoom_expr = "min(zoom+0.00035,1.08)"
    # Safe drawbox that doesn't rely on system fonts
    # NOTE(review): the width expression uses `t`; confirm that drawbox
    # evaluates it as elapsed time rather than its own thickness option.
    drawbox = f"drawbox=x=0:y={h}-40:w='(t/{max(1.0, dur)})*{w}':h=20:color=white@0.8:t=fill"
    vf = ",".join([
        f"scale={w}:{h}:force_original_aspect_ratio=increase",
        f"crop={w}:{h}",
        f"zoompan=z='{zoom_expr}':d=1:s={w}x{h}:fps=24",
        drawbox,
        "format=yuv420p",
    ])

    cmd = [FFMPEG_BIN, "-y"]
    cmd += ["-loop", "1", "-i", str(art_path)]
    cmd += ["-i", str(audio_path)]
    cmd += ["-shortest", "-r", "24", "-vf", vf]
    cmd += ["-c:v", "libx264", "-pix_fmt", "yuv420p"]
    cmd += ["-c:a", "aac", "-b:a", "192k"]
    cmd.append(str(out_path))

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise gr.Error(f"Video Error: {result.stderr[-2000:]}")
    return out_path
| # ========================= | |
| # PHASE 1 | |
| # ========================= | |
def phase1_analyze(file_in, url_in, mode, manual_bpm, rerun):
    """Phase 1: fetch the audio, detect BPM/key and separate it into stems.

    Args:
        file_in: path of an uploaded audio file, or a falsy value.
        url_in: URL to download from; takes precedence over ``file_in``.
        mode: ``"6stem"`` (htdemucs_6s), ``"4stem"`` (htdemucs) or
            ``"2stem"`` (htdemucs with ``--two-stems vocals``).
        manual_bpm: positive number to skip BPM detection; otherwise ignored.
        rerun: when true, ignore cached stems and run Demucs again.

    Returns:
        A 9-tuple matching the UI outputs: drums, bass and vocals preview
        paths (``None`` when missing), status markdown, BPM, key, job
        directory, and two ``gr.update`` objects for the stem selectors.

    Raises:
        gr.Error: when no source is given or a processing step fails.
    """
    job_id = now_job_id()
    job_dir = ensure_dir(RUNS_DIR / job_id)
    in_dir = ensure_dir(job_dir / "input")

    # Input handling
    if url_in and str(url_in).strip():
        in_path = download_from_url(str(url_in).strip(), in_dir)
    elif file_in:
        in_path = Path(file_in)
        local_path = in_dir / in_path.name
        shutil.copy2(in_path, local_path)
        in_path = local_path
    else:
        raise gr.Error("No audio source.")
    wav_path = ensure_wav(in_path, in_dir / f"{in_path.stem}.wav")

    # Separation settings. "2stem" previously ran a normal 4-stem split
    # because the two-stems flag was never passed to Demucs.
    two_stems = mode == "2stem"
    model_name = "htdemucs_6s" if mode == "6stem" else "htdemucs"

    # Cache key = audio hash + separation variant, so switching the mode for
    # the same file no longer returns stems produced by a different mode.
    variant = f"{model_name}_2s" if two_stems else model_name
    cache = cache_paths_for_hash(f"{sha256_file(wav_path)}_{variant}")

    # BPM / Key
    if manual_bpm and float(manual_bpm) > 0:
        bpm = int(manual_bpm)
    else:
        y60, sr60 = librosa.load(str(wav_path), sr=22050, duration=60)
        tempo, _ = librosa.beat.beat_track(y=y60, sr=sr60)
        # beat_track may return the tempo as a scalar or as a 1-element array.
        bpm = int(tempo[0] if np.ndim(tempo) > 0 else tempo)
    key = detect_key(wav_path)

    stems_dir = ensure_dir(job_dir / "stems")
    cache_hit = (
        not rerun
        and cache["stems_dir"].exists()
        and any(cache["stems_dir"].glob("*.wav"))
    )
    if cache_hit:
        copy_tree(cache["stems_dir"], stems_dir)
        source_msg = "Fetched from Cache"
    else:
        # Run Demucs
        track_dir = run_demucs(wav_path, model_name, job_dir / "demucs_tmp", two_stems)
        build_instrumental(track_dir)
        for wav in track_dir.glob("*.wav"):
            shutil.copy2(wav, stems_dir / wav.name)
        # Save to Cache
        wipe_dir(cache["stems_dir"])
        ensure_dir(cache["stems_dir"])
        for wav in stems_dir.glob("*.wav"):
            shutil.copy2(wav, cache["stems_dir"] / wav.name)
        source_msg = "Ran Demucs (Saved to Cache)"

    # Sorted so the checkbox order is stable between runs.
    valid_stems = sorted(f.stem.capitalize() for f in stems_dir.glob("*.wav"))

    def preview(name):
        """Return the stem path for a preview player, or None if absent."""
        path = stems_dir / name
        return path if path.exists() else None

    return (
        preview("drums.wav"),
        preview("bass.wav"),
        preview("vocals.wav"),
        f"✅ **Ready**\n- ID: `{job_id}`\n- Source: {source_msg}",
        bpm, key, str(job_dir),
        gr.update(choices=valid_stems, value=valid_stems),
        gr.update(choices=valid_stems, value=[s for s in valid_stems if s != "Vocals"])
    )
| # ========================= | |
| # PHASE 2 | |
| # ========================= | |
def phase2_export(
    job_dir_in, bpm, key, art, ex_stems, loop_stems,
    do_midi, do_oneshots, do_vocal_chops,
    loops_per, bars, loud_target, make_video, log_hist
):
    """Phase 2: build the sample pack for an analysed job and zip it.

    Writes stems, loops, drum one-shots, vocal chops, MIDI and an optional
    promo video under ``<job_dir>/export`` and mirrors every artifact into
    ``OUTPUT_DIR``. Both locations are wiped first.

    Args:
        job_dir_in: job directory returned by phase 1.
        bpm, key: tempo and key used for the grid and for file names.
        art: cover image path for the video, or a falsy value.
        ex_stems, loop_stems: stem names to export whole / to loop.
        do_midi, do_oneshots, do_vocal_chops, make_video: feature toggles.
        loops_per: maximum loops per stem.
        bars: loop lengths in bars (strings or ints).
        loud_target: LUFS target for loops.
        log_hist: previous log text to append to.

    Returns:
        ``(zip_path, video_path_or_None, log_text)``.

    Raises:
        gr.Error: when no job is loaded, BPM is missing, or no stems exist.
    """
    log = log_hist or ""
    if not job_dir_in:
        raise gr.Error("No job loaded.")
    if bpm is None:
        raise gr.Error("No BPM set.")

    # One integer BPM for everything: the UI may deliver a float, which made
    # stem/zip names ("120.0BPM") disagree with loop names ("120BPM").
    bpm = int(bpm)
    # Unselected checkbox groups are treated as empty selections.
    ex_stems = ex_stems or []
    loop_stems = loop_stems or []
    bars = bars or []

    job_dir = Path(job_dir_in)
    stems_dir = job_dir / "stems"
    export_dir = ensure_dir(job_dir / "export")
    wipe_dir(export_dir)
    wipe_dir(OUTPUT_DIR)

    # Folders
    for d in ["Stems", "Loops", "MIDI", "OneShots", "Vocal_Chops", "Video"]:
        ensure_dir(export_dir / d)
        ensure_dir(OUTPUT_DIR / d)

    log = log_append(log, f"Starting Export: {bpm} BPM | {key}")

    # 1. Stems
    for stem_name in ex_stems:
        src = stems_dir / f"{stem_name.lower()}.wav"
        if src.exists():
            dst = export_dir / "Stems" / f"{bpm}BPM_{key}_{stem_name}.wav"
            shutil.copy2(src, dst)
            shutil.copy2(dst, OUTPUT_DIR / "Stems" / dst.name)

    # 2. Loops
    # The bar grid comes from the drums when available, else from any stem.
    drums_wav = stems_dir / "drums.wav"
    grid_src = drums_wav if drums_wav.exists() else next(iter(sorted(stems_dir.glob("*.wav"))), None)
    if grid_src is None:
        raise gr.Error("No stems found for this job. Run Analyze & Split first.")
    bar_samps, sr_grid = build_bar_grid_samples(grid_src, bpm)

    for stem_name in loop_stems:
        src = stems_dir / f"{stem_name.lower()}.wav"
        if src.exists():
            log = log_append(log, f"Looping {stem_name}...")
            loops = make_ranked_loops_numpy(
                src, stem_name, bpm, key, bar_samps, sr_grid,
                [int(b) for b in bars], 1, loops_per, 50,
                10, True, 25, 4, "lufs", float(loud_target), export_dir / "Loops"
            )
            for loop_path in loops:
                shutil.copy2(loop_path, OUTPUT_DIR / "Loops" / loop_path.name)

    # 3. One Shots (Improved Transient Preservation)
    if do_oneshots and drums_wav.exists():
        log = log_append(log, "Slicing Drums...")
        y, sr = librosa.load(str(drums_wav), sr=44100, mono=True)

        # Use simple energy based onset
        onset_frames = librosa.onset.onset_detect(y=y, sr=sr, backtrack=False)
        onset_times = librosa.frames_to_time(onset_frames, sr=sr)

        shots = []
        for t in onset_times:
            # PRE-ROLL: Start 15ms before detected onset to catch the 'click'
            s = max(0, int((t - 0.015) * sr))
            e = min(y.size, s + int(0.4 * sr))
            seg = y[s:e]
            # Filter silence
            if np.sqrt(np.mean(seg**2)) > 0.02:
                shots.append(seg)

        # Top 32 loudest
        shots = sorted(shots, key=lambda x: np.max(np.abs(x)), reverse=True)[:32]

        for i, shot in enumerate(shots, 1):
            shot = fade_edges(shot, 100)  # short fade at both edges
            shot = apply_loudness_np(shot, sr, "peak", -1.0)  # Normalize hard
            dst = export_dir / "OneShots" / f"DrumShot_{i:02d}.wav"
            sf.write(str(dst), shot, sr)
            shutil.copy2(dst, OUTPUT_DIR / "OneShots" / dst.name)

    # 4. Vocal Chops
    if do_vocal_chops and (stems_dir / "vocals.wav").exists():
        log = log_append(log, "Chopping Vocals...")
        export_vocal_chops(
            stems_dir / "vocals.wav", bpm, key, "hybrid", "lufs", -14.0,
            export_dir / "Vocal_Chops"
        )
        for f in (export_dir / "Vocal_Chops").glob("*.wav"):
            shutil.copy2(f, OUTPUT_DIR / "Vocal_Chops" / f.name)

    # 5. MIDI
    if do_midi and MIDI_AVAILABLE:
        log = log_append(log, "Extracting MIDI...")
        for s in ["bass", "piano", "other"]:
            src = stems_dir / f"{s}.wav"
            if src.exists():
                midi_dst = export_dir / "MIDI" / f"{bpm}BPM_{key}_{s.capitalize()}.mid"
                extract_midi(src, midi_dst)
                # extract_midi is best effort; mirror only what was produced,
                # like every other artifact type.
                if midi_dst.exists():
                    shutil.copy2(midi_dst, OUTPUT_DIR / "MIDI" / midi_dst.name)

    # 6. Video
    vid_path = None
    if make_video and art:
        log = log_append(log, "Rendering Video...")
        # Find audio for video: first loop by name, else the instrumental.
        audio_src = next(iter(sorted((export_dir / "Loops").glob("*.wav"))), None)
        if not audio_src and (stems_dir / "no_vocals.wav").exists():
            audio_src = stems_dir / "no_vocals.wav"

        if audio_src:
            out_vid = export_dir / "Video" / "Promo.mp4"
            render_video_ffmpeg(Path(art), audio_src, out_vid, "9:16 (TikTok/Reels)")
            vid_path = str(out_vid)
            shutil.copy2(out_vid, OUTPUT_DIR / "Video" / out_vid.name)

    # Zip
    zip_path = export_dir / f"NightPulse_{bpm}_{key}.zip"
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(export_dir):
            for f in files:
                full = Path(root) / f
                # The archive lives inside export_dir; do not zip it into itself.
                if full != zip_path:
                    zf.write(full, full.relative_to(export_dir))

    log = log_append(log, "✅ Done.")
    return str(zip_path), vid_path, log
| # ========================= | |
| # UI | |
| # ========================= | |
# Gradio UI: left/right columns for phase 1 (source + verification), then a
# second row for phase 2 (pack settings + downloads), then the event wiring.
with gr.Blocks(title="NightPulse Ultimate", theme=gr.themes.Base()) as app:
    gr.Markdown("## 🎹 Night Pulse | Ultimate v2")

    # State: job directory produced by phase 1 and consumed by phase 2.
    job_state = gr.State()

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 1. Source & Separate")
            with gr.Tabs():
                with gr.Tab("Link"):
                    url = gr.Textbox(label="URL", placeholder="YouTube/SoundCloud...")
                with gr.Tab("File"):
                    file = gr.Audio(type="filepath", label="Upload")

            with gr.Row():
                mode = gr.Dropdown(["6stem", "4stem", "2stem"], value="6stem", label="Quality")
                mbpm = gr.Number(label="Manual BPM Override", value=0)

            rerun = gr.Checkbox(label="Force Re-Process (Ignore Cache)", value=False)
            btn1 = gr.Button("🚀 Analyze & Split", variant="primary")

        with gr.Column():
            gr.Markdown("### 2. Verify")
            status = gr.Markdown("Waiting for input...")
            with gr.Row():
                bpm_box = gr.Number(label="Detected BPM")
                key_box = gr.Textbox(label="Detected Key")

            with gr.Row():
                btn_half = gr.Button("½ Halve BPM")
                btn_double = gr.Button("2x Double BPM")

            # The BPM box is empty (None) until phase 1 has run; leave it
            # untouched in that case instead of raising a TypeError.
            def halve_bpm(x):
                return x if x is None else int(x / 2)

            def double_bpm(x):
                return x if x is None else int(x * 2)

            btn_half.click(halve_bpm, bpm_box, bpm_box)
            btn_double.click(double_bpm, bpm_box, bpm_box)

            with gr.Row():
                p1 = gr.Audio(label="Drums", interactive=False, height=60)
                p2 = gr.Audio(label="Bass", interactive=False, height=60)
                p3 = gr.Audio(label="Vocals", interactive=False, height=60)

    gr.Markdown("---")

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 3. Pack Generator")
            # Choices for both groups are filled in by phase1_analyze.
            ex_stems = gr.CheckboxGroup(label="Export Full Stems")
            lp_stems = gr.CheckboxGroup(label="Generate Loops From")

            with gr.Accordion("Pack Settings", open=True):
                with gr.Row():
                    loops_per = gr.Slider(1, 20, 8, 1, label="Loops per Stem")
                    bars = gr.CheckboxGroup(["4", "8"], value=["4", "8"], label="Lengths")
                with gr.Row():
                    do_midi = gr.Checkbox(label="Extract MIDI", value=True)
                    do_oneshots = gr.Checkbox(label="Drum One-Shots", value=True)
                    do_vocal_chops = gr.Checkbox(label="Vocal Chops", value=True)
                loud_target = gr.Slider(-20, -6, -12, 1, label="Loudness Target (LUFS)")

            with gr.Accordion("Video Promo", open=False):
                art = gr.Image(type="filepath", label="Cover Art", height=200)
                make_video = gr.Checkbox(label="Render 9:16 Video", value=False)

            btn2 = gr.Button("⚡ Export Pack", variant="primary")

        with gr.Column():
            gr.Markdown("### 4. Download")
            z_out = gr.File(label="Sample Pack Zip")
            v_out = gr.Video(label="Promo Video")
            log_out = gr.Textbox(label="Process Log", lines=10)

    # Wiring: input/output lists must match the parameter order and return
    # tuple order of phase1_analyze and phase2_export.
    btn1.click(
        phase1_analyze,
        [file, url, mode, mbpm, rerun],
        [p1, p2, p3, status, bpm_box, key_box, job_state, ex_stems, lp_stems]
    )

    btn2.click(
        phase2_export,
        [job_state, bpm_box, key_box, art, ex_stems, lp_stems, do_midi, do_oneshots, do_vocal_chops, loops_per, bars, loud_target, make_video, log_out],
        [z_out, v_out, log_out]
    )

if __name__ == "__main__":
    app.launch()