Spaces:
Sleeping
Sleeping
| # yt_separator.py | |
| # pip install yt-dlp demucs pydub (ffmpeg required) | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| from typing import Any, cast | |
| import filetype # type: ignore | |
| import yt_dlp | |
| from pydub import AudioSegment | |
| def _emit_progress(progress_callback, message): | |
| if progress_callback is not None: | |
| progress_callback(message) | |
| def sanitize_job_id(name): | |
| return re.sub(r"[^A-Za-z0-9_-]+", "_", name).strip("_") or "uploaded_audio" | |
| def detect_audio_format(filepath): | |
| """Detect the true audio format of *filepath* via magic bytes. | |
| Returns one of ``'mp3'``, ``'m4a'``, ``'wav'``, ``'ogg'``, ``'flac'``, | |
| or ``None`` when the file cannot be identified. | |
| """ | |
| kind = filetype.detect(filepath) # type: ignore[no-untyped-call] | |
| if kind is None: | |
| return None | |
| mime = kind.mime # e.g. 'audio/mpeg', 'audio/mp4', 'audio/wav' | |
| if 'mpeg' in mime: | |
| return 'mp3' | |
| if 'mp4' in mime or 'm4a' in mime: | |
| return 'm4a' | |
| if 'wav' in mime: | |
| return 'wav' | |
| if 'ogg' in mime: | |
| return 'ogg' | |
| if 'flac' in mime: | |
| return 'flac' | |
| return None | |
| def get_title(url_or_id): | |
| with yt_dlp.YoutubeDL({"quiet": True, "no_warnings": True}) as ydl: | |
| try: | |
| info = ydl.extract_info(url_or_id, download=False) or {} | |
| except Exception: | |
| return "" | |
| return info.get("title") or info.get("id") or "" | |
| cookie_path = os.path.join(os.path.dirname(__file__), 'cookies.txt') | |
| def download_audio(url, job_id, progress_callback=None): | |
| temp_dir = 'separated' | |
| os.makedirs(temp_dir, exist_ok=True) | |
| _emit_progress(progress_callback, 'Downloading audio from YouTube...') | |
| compat_opts = ['no-youtube-unavailable-videoplayback'] | |
| if shutil.which('deno') is None: | |
| print("⚠️ Deno not found.") | |
| compat_opts.append('no-youtube-js') | |
| is_hf = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE") or os.path.exists("/.dockerenv")) | |
| ydl_opts = { | |
| 'format': 'bestaudio/best', | |
| 'outtmpl': os.path.join(temp_dir, f'{job_id}.%(ext)s'), | |
| 'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav'}], | |
| 'keepvideo': True, | |
| 'quiet': False, | |
| 'no_warnings': False, | |
| 'nocheckcertificate': True, | |
| 'extractor_args': {'youtube': {'player_client':['web', 'android', 'ios']}}, | |
| 'http_headers': {'Referer': 'https://www.youtube.com/'}, | |
| 'socket_timeout': 60, | |
| 'retries': 10, | |
| 'compat_opts': compat_opts, | |
| } | |
| if is_hf: | |
| ydl_opts['impersonate'] = 'chrome' | |
| if is_hf or os.path.exists(cookie_path): | |
| ydl_opts['cookiefile'] = cookie_path | |
| ydl_opts['nocheckcertificate'] = True | |
| else: | |
| ydl_opts['cookiesfrombrowser'] = ('chrome', None, None, None) | |
| # with yt_dlp.YoutubeDL({**ydl_opts, 'quiet': True}) as ydl: | |
| # info = ydl.extract_info(url, download=False) | |
| # print(ydl.list_formats(info)) | |
| # audio = [f for f in info.get('formats', []) | |
| # if f.get('ext') in ('webm','m4a','mp4','opus') and f.get('acodec') != 'none'] | |
| # if not audio: | |
| # _emit_progress(progress_callback, "No webm/mp4 audio available") | |
| # return None | |
| # _emit_progress(progress_callback, f"Found {len(audio)} audio formats") | |
| with yt_dlp.YoutubeDL(cast(Any, ydl_opts)) as ydl: | |
| ydl.download([url]) | |
| _emit_progress(progress_callback, 'Converting downloaded audio to WAV...') | |
| return os.path.join(temp_dir, f'{job_id}.wav') | |
| def separate_tracks_old(input_wav, job_id, progress_callback=None): | |
| if not os.path.exists(input_wav): | |
| raise FileNotFoundError(f"{input_wav} does not exist") | |
| output_dir = 'separated' | |
| _emit_progress(progress_callback, 'Validating input audio...') | |
| _emit_progress(progress_callback, 'Separating tracks with Demucs...') | |
| subprocess.run(['demucs', '-n', 'htdemucs_6s', '--mp3', '--out', output_dir, input_wav], check=True) | |
| _emit_progress(progress_callback, 'Demucs separation complete. Loading output stems...') | |
| base = os.path.join('.', output_dir, 'htdemucs_6s', job_id) | |
| drums = f'{base}/drums.mp3' | |
| vocals = f'{base}/vocals.mp3' | |
| bass = f'{base}/bass.mp3' | |
| guitar = f'{base}/guitar.mp3' | |
| piano = f'{base}/piano.mp3' | |
| other = f'{base}/other.mp3' | |
| _emit_progress(progress_callback, 'Creating combined music stem from bass and other...') | |
| music = AudioSegment.from_mp3(bass).overlay(AudioSegment.from_mp3(other)) | |
| music_path = os.path.join(base, 'music.mp3') | |
| music.export(music_path, format="mp3") | |
| _emit_progress(progress_callback, f'Combined music stem saved to {music_path}.') | |
| full_path = os.path.join(base, 'full.mp3') | |
| # Detect true file type by magic bytes and convert using pydub/ffmpeg | |
| try: | |
| _emit_progress(progress_callback, 'Exporting full mix to full.mp3...') | |
| kind = None | |
| try: | |
| kind = filetype.guess(input_wav) | |
| except Exception: | |
| kind = None | |
| if kind and getattr(kind, 'extension', None): | |
| src = AudioSegment.from_file(input_wav, format=kind.extension) | |
| else: | |
| src = AudioSegment.from_file(input_wav) | |
| src.export(full_path, format="mp3") | |
| _emit_progress(progress_callback, f'Full mix saved to {full_path}.') | |
| except Exception as exc: | |
| # Last-resort: try ffmpeg autodetect | |
| _emit_progress(progress_callback, f"Warning: conversion via detected format failed: {exc}. Trying autodetect.") | |
| src = AudioSegment.from_file(input_wav) | |
| src.export(full_path, format="mp3") | |
| _emit_progress(progress_callback, f'Full mix saved to {full_path}.') | |
| _emit_progress(progress_callback, 'Cleaning up temporary input audio...') | |
| os.remove(input_wav) | |
| _emit_progress(progress_callback, 'Separation complete.') | |
| # Include the full mix exported as full.mp3 in the returned outputs | |
| return drums, vocals, guitar, bass, other, piano, music_path, full_path | |
| def separate_tracks(input_wav, job_id, progress_callback=None): | |
| if not os.path.exists(input_wav): | |
| raise FileNotFoundError(f"{input_wav} does not exist") | |
| def _emit_progress_event(value, desc): | |
| if progress_callback is not None: | |
| progress_callback(desc) | |
| return ("progress", {"value": value, "desc": desc}) | |
| yield _emit_progress_event(0.0, "Validating input audio...") | |
| yield _emit_progress_event(0.05, "Starting Demucs separation (htdemucs_6s)...") | |
| output_dir = 'separated' | |
| # Run Demucs with live output capture | |
| proc = subprocess.Popen([ | |
| 'demucs', '-n', 'htdemucs_6s', '--mp3', '--out', output_dir, input_wav | |
| ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) | |
| progress_pattern = re.compile(r'(\d+)%\|') | |
| last_percent = None | |
| # Read progress in real time | |
| for line in proc.stdout or []: | |
| line = line.strip() | |
| if line: | |
| match = progress_pattern.search(line) | |
| if match: | |
| percent = int(match.group(1)) | |
| if percent != last_percent: | |
| last_percent = percent | |
| yield _emit_progress_event( | |
| 0.1 + (percent / 100.0) * 0.7, | |
| f"Demucs progress: {percent}%", | |
| ) | |
| elif "Separating track" in line: | |
| yield _emit_progress_event(0.1, "Demucs: Starting separation...") | |
| proc.wait() | |
| if proc.returncode != 0: | |
| raise RuntimeError("Demucs failed") | |
| yield _emit_progress_event(0.82, "Demucs separation complete. Loading stems...") | |
| base = os.path.join(output_dir, 'htdemucs_6s', job_id) | |
| drums = f'{base}/drums.mp3' | |
| vocals = f'{base}/vocals.mp3' | |
| bass = f'{base}/bass.mp3' | |
| guitar = f'{base}/guitar.mp3' | |
| piano = f'{base}/piano.mp3' | |
| other = f'{base}/other.mp3' | |
| yield _emit_progress_event(0.88, "Creating music stem (bass + other)...") | |
| music = AudioSegment.from_mp3(bass).overlay(AudioSegment.from_mp3(other)) | |
| music_path = os.path.join(base, 'music.mp3') | |
| music.export(music_path, format="mp3") | |
| # Full mix export (rest of your code) | |
| full_path = os.path.join(base, 'full.mp3') | |
| yield _emit_progress_event(0.94, "Exporting full mix...") | |
| src = AudioSegment.from_file(input_wav) | |
| src.export(full_path, format="mp3") | |
| yield _emit_progress_event(0.98, "Cleaning up...") | |
| os.remove(input_wav) | |
| yield _emit_progress_event(1.0, "Separation complete.") | |
| yield ("result", (drums, vocals, guitar, bass, other, piano, music_path, full_path)) | |
| def separate_tracks_sync(input_wav, job_id, progress_callback=None): | |
| result = None | |
| for event_type, payload in separate_tracks(input_wav, job_id, progress_callback=progress_callback): | |
| if event_type == "result": | |
| result = payload | |
| if result is None: | |
| raise RuntimeError("separate_tracks did not produce a result") | |
| return result | |
| def main(): | |
| video_id = input("enter youtube video id: ") | |
| url = f"https://www.youtube.com/watch?v={video_id}" | |
| try: | |
| title = get_title(url) | |
| job_id = sanitize_job_id(title or video_id) | |
| wav = download_audio(url, job_id) | |
| d, v, g, b, o, p, m, full = separate_tracks_sync(wav, job_id) | |
| print(d, v, g, b, o, p, m, full) | |
| except Exception as exc: | |
| print(exc) | |
| if __name__ == "__main__": | |
| main() | |