SeparateTracks / modules /yt_audio_get_tracks.py
Surn's picture
Improve progress reporting for audio separation
1e4f3ac
# yt_separator.py
# pip install yt-dlp demucs pydub (ffmpeg required)
import os
import re
import shutil
import subprocess
import sys
from typing import Any, cast
import filetype # type: ignore
import yt_dlp
from pydub import AudioSegment
def _emit_progress(progress_callback, message):
if progress_callback is not None:
progress_callback(message)
def sanitize_job_id(name):
return re.sub(r"[^A-Za-z0-9_-]+", "_", name).strip("_") or "uploaded_audio"
def detect_audio_format(filepath):
"""Detect the true audio format of *filepath* via magic bytes.
Returns one of ``'mp3'``, ``'m4a'``, ``'wav'``, ``'ogg'``, ``'flac'``,
or ``None`` when the file cannot be identified.
"""
kind = filetype.detect(filepath) # type: ignore[no-untyped-call]
if kind is None:
return None
mime = kind.mime # e.g. 'audio/mpeg', 'audio/mp4', 'audio/wav'
if 'mpeg' in mime:
return 'mp3'
if 'mp4' in mime or 'm4a' in mime:
return 'm4a'
if 'wav' in mime:
return 'wav'
if 'ogg' in mime:
return 'ogg'
if 'flac' in mime:
return 'flac'
return None
def get_title(url_or_id):
with yt_dlp.YoutubeDL({"quiet": True, "no_warnings": True}) as ydl:
try:
info = ydl.extract_info(url_or_id, download=False) or {}
except Exception:
return ""
return info.get("title") or info.get("id") or ""
cookie_path = os.path.join(os.path.dirname(__file__), 'cookies.txt')
def download_audio(url, job_id, progress_callback=None):
temp_dir = 'separated'
os.makedirs(temp_dir, exist_ok=True)
_emit_progress(progress_callback, 'Downloading audio from YouTube...')
compat_opts = ['no-youtube-unavailable-videoplayback']
if shutil.which('deno') is None:
print("⚠️ Deno not found.")
compat_opts.append('no-youtube-js')
is_hf = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE") or os.path.exists("/.dockerenv"))
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': os.path.join(temp_dir, f'{job_id}.%(ext)s'),
'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav'}],
'keepvideo': True,
'quiet': False,
'no_warnings': False,
'nocheckcertificate': True,
'extractor_args': {'youtube': {'player_client':['web', 'android', 'ios']}},
'http_headers': {'Referer': 'https://www.youtube.com/'},
'socket_timeout': 60,
'retries': 10,
'compat_opts': compat_opts,
}
if is_hf:
ydl_opts['impersonate'] = 'chrome'
if is_hf or os.path.exists(cookie_path):
ydl_opts['cookiefile'] = cookie_path
ydl_opts['nocheckcertificate'] = True
else:
ydl_opts['cookiesfrombrowser'] = ('chrome', None, None, None)
# with yt_dlp.YoutubeDL({**ydl_opts, 'quiet': True}) as ydl:
# info = ydl.extract_info(url, download=False)
# print(ydl.list_formats(info))
# audio = [f for f in info.get('formats', [])
# if f.get('ext') in ('webm','m4a','mp4','opus') and f.get('acodec') != 'none']
# if not audio:
# _emit_progress(progress_callback, "No webm/mp4 audio available")
# return None
# _emit_progress(progress_callback, f"Found {len(audio)} audio formats")
with yt_dlp.YoutubeDL(cast(Any, ydl_opts)) as ydl:
ydl.download([url])
_emit_progress(progress_callback, 'Converting downloaded audio to WAV...')
return os.path.join(temp_dir, f'{job_id}.wav')
def separate_tracks_old(input_wav, job_id, progress_callback=None):
if not os.path.exists(input_wav):
raise FileNotFoundError(f"{input_wav} does not exist")
output_dir = 'separated'
_emit_progress(progress_callback, 'Validating input audio...')
_emit_progress(progress_callback, 'Separating tracks with Demucs...')
subprocess.run(['demucs', '-n', 'htdemucs_6s', '--mp3', '--out', output_dir, input_wav], check=True)
_emit_progress(progress_callback, 'Demucs separation complete. Loading output stems...')
base = os.path.join('.', output_dir, 'htdemucs_6s', job_id)
drums = f'{base}/drums.mp3'
vocals = f'{base}/vocals.mp3'
bass = f'{base}/bass.mp3'
guitar = f'{base}/guitar.mp3'
piano = f'{base}/piano.mp3'
other = f'{base}/other.mp3'
_emit_progress(progress_callback, 'Creating combined music stem from bass and other...')
music = AudioSegment.from_mp3(bass).overlay(AudioSegment.from_mp3(other))
music_path = os.path.join(base, 'music.mp3')
music.export(music_path, format="mp3")
_emit_progress(progress_callback, f'Combined music stem saved to {music_path}.')
full_path = os.path.join(base, 'full.mp3')
# Detect true file type by magic bytes and convert using pydub/ffmpeg
try:
_emit_progress(progress_callback, 'Exporting full mix to full.mp3...')
kind = None
try:
kind = filetype.guess(input_wav)
except Exception:
kind = None
if kind and getattr(kind, 'extension', None):
src = AudioSegment.from_file(input_wav, format=kind.extension)
else:
src = AudioSegment.from_file(input_wav)
src.export(full_path, format="mp3")
_emit_progress(progress_callback, f'Full mix saved to {full_path}.')
except Exception as exc:
# Last-resort: try ffmpeg autodetect
_emit_progress(progress_callback, f"Warning: conversion via detected format failed: {exc}. Trying autodetect.")
src = AudioSegment.from_file(input_wav)
src.export(full_path, format="mp3")
_emit_progress(progress_callback, f'Full mix saved to {full_path}.')
_emit_progress(progress_callback, 'Cleaning up temporary input audio...')
os.remove(input_wav)
_emit_progress(progress_callback, 'Separation complete.')
# Include the full mix exported as full.mp3 in the returned outputs
return drums, vocals, guitar, bass, other, piano, music_path, full_path
def separate_tracks(input_wav, job_id, progress_callback=None):
if not os.path.exists(input_wav):
raise FileNotFoundError(f"{input_wav} does not exist")
def _emit_progress_event(value, desc):
if progress_callback is not None:
progress_callback(desc)
return ("progress", {"value": value, "desc": desc})
yield _emit_progress_event(0.0, "Validating input audio...")
yield _emit_progress_event(0.05, "Starting Demucs separation (htdemucs_6s)...")
output_dir = 'separated'
# Run Demucs with live output capture
proc = subprocess.Popen([
'demucs', '-n', 'htdemucs_6s', '--mp3', '--out', output_dir, input_wav
], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)
progress_pattern = re.compile(r'(\d+)%\|')
last_percent = None
# Read progress in real time
for line in proc.stdout or []:
line = line.strip()
if line:
match = progress_pattern.search(line)
if match:
percent = int(match.group(1))
if percent != last_percent:
last_percent = percent
yield _emit_progress_event(
0.1 + (percent / 100.0) * 0.7,
f"Demucs progress: {percent}%",
)
elif "Separating track" in line:
yield _emit_progress_event(0.1, "Demucs: Starting separation...")
proc.wait()
if proc.returncode != 0:
raise RuntimeError("Demucs failed")
yield _emit_progress_event(0.82, "Demucs separation complete. Loading stems...")
base = os.path.join(output_dir, 'htdemucs_6s', job_id)
drums = f'{base}/drums.mp3'
vocals = f'{base}/vocals.mp3'
bass = f'{base}/bass.mp3'
guitar = f'{base}/guitar.mp3'
piano = f'{base}/piano.mp3'
other = f'{base}/other.mp3'
yield _emit_progress_event(0.88, "Creating music stem (bass + other)...")
music = AudioSegment.from_mp3(bass).overlay(AudioSegment.from_mp3(other))
music_path = os.path.join(base, 'music.mp3')
music.export(music_path, format="mp3")
# Full mix export (rest of your code)
full_path = os.path.join(base, 'full.mp3')
yield _emit_progress_event(0.94, "Exporting full mix...")
src = AudioSegment.from_file(input_wav)
src.export(full_path, format="mp3")
yield _emit_progress_event(0.98, "Cleaning up...")
os.remove(input_wav)
yield _emit_progress_event(1.0, "Separation complete.")
yield ("result", (drums, vocals, guitar, bass, other, piano, music_path, full_path))
def separate_tracks_sync(input_wav, job_id, progress_callback=None):
result = None
for event_type, payload in separate_tracks(input_wav, job_id, progress_callback=progress_callback):
if event_type == "result":
result = payload
if result is None:
raise RuntimeError("separate_tracks did not produce a result")
return result
def main():
video_id = input("enter youtube video id: ")
url = f"https://www.youtube.com/watch?v={video_id}"
try:
title = get_title(url)
job_id = sanitize_job_id(title or video_id)
wav = download_audio(url, job_id)
d, v, g, b, o, p, m, full = separate_tracks_sync(wav, job_id)
print(d, v, g, b, o, p, m, full)
except Exception as exc:
print(exc)
if __name__ == "__main__":
main()