# ACAPPELLA / app.py
# mouadblrs's picture
# Update app.py
# 5581332 verified
import os
import shutil
import subprocess
import tempfile
import traceback
from pathlib import Path

import gradio as gr
import librosa
import numpy as np
import requests
import soundfile as sf
import torch
import yt_dlp

from model import UNet
# Configuration
# Run on the GPU when PyTorch reports CUDA as available, otherwise on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Sample rate (Hz) used when loading audio in separate_vocals() and when
# converting downloads with ffmpeg.
SR = 8192
# FFT size and hop length (in samples) passed to librosa.stft / librosa.istft.
N_FFT = 1024
HOP_LENGTH = 768
# Width, in STFT frames, of each spectrogram patch fed to the model.
FRAME_SIZE = 128
# Step, in STFT frames, between the starts of consecutive patches.
STRIDE_FRAMES = 64
# Model download URL and local checkpoint path
MODEL_URL = "https://www.dropbox.com/scl/fi/pnzxhaueynzljif7kh86i/unet_final.pth?rlkey=umz3jel4az9wf8j75d0hmx04z&st=2vihy6yj&dl=1"
MODEL_PATH = "unet_final.pth"
# =========================
# INVIDIOUS INSTANCES (YouTube proxy)
# =========================
# Tried in order by search_via_invidious(); the first one that answers wins.
INVIDIOUS_INSTANCES = [
    "https://inv.nadeko.net",
    "https://invidious.nerdvpn.de",
    "https://invidious.privacyredirect.com",
    "https://yewtu.be",
    "https://vid.puffyan.us",
    "https://invidious.snopyta.org",
    "https://invidious.kavin.rocks",
]
# =========================
# COBALT INSTANCES (open-source API)
# =========================
# Tried in order by download_via_cobalt().
COBALT_INSTANCES = [
    "https://api.cobalt.tools",
]
def download_model_if_needed():
    """Download the U-Net checkpoint if it is not cached, then load it.

    The checkpoint is fetched from ``MODEL_URL`` when ``MODEL_PATH`` is
    missing or smaller than 1 KiB. The download is written to a temporary
    ``.part`` file and only moved to ``MODEL_PATH`` once it has passed the
    HTML sanity checks, so an interrupted or rejected download never leaves
    a corrupt file that would be mistaken for a valid cache on the next run.

    Returns:
        The ``UNet`` instance on ``DEVICE`` with weights loaded, in eval mode.

    Raises:
        RuntimeError: if the server answered with an HTML page instead of
            the checkpoint.
        requests.HTTPError: if the download request returns an error status.
    """
    if not os.path.exists(MODEL_PATH) or os.path.getsize(MODEL_PATH) < 1024:
        print("📥 Téléchargement du modèle...")
        partial_path = MODEL_PATH + ".part"
        try:
            with requests.get(MODEL_URL, stream=True, allow_redirects=True, timeout=120) as r:
                r.raise_for_status()
                ct = (r.headers.get("Content-Type") or "").lower()
                if "text/html" in ct:
                    raise RuntimeError(
                        f"Dropbox a renvoyé du HTML (Content-Type={ct}). "
                        f"Assure-toi d'avoir dl=1 dans l'URL."
                    )
                with open(partial_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
            # Second line of defence: some servers send HTML with a generic
            # Content-Type, so look at the first bytes of the payload too.
            with open(partial_path, "rb") as f:
                head = f.read(32)
            if head.startswith(b"<"):
                raise RuntimeError("Le fichier téléchargé ressemble à une page HTML.")
            # Promote the verified download to the cache path in one step.
            os.replace(partial_path, MODEL_PATH)
        finally:
            # Still present only if something above failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)
        print("✅ Modèle téléchargé")

    model = UNet().to(DEVICE)
    try:
        state = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=True)
    except Exception:
        # SECURITY NOTE: weights_only=False unpickles arbitrary objects and
        # can execute code embedded in the file. Kept as a fallback for
        # checkpoints that the restricted loader rejects; only acceptable
        # because MODEL_URL is a source the maintainers control.
        state = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=False)
    # Accept both a bare state dict and a {"state_dict": ...} wrapper.
    if isinstance(state, dict) and "state_dict" in state:
        state = state["state_dict"]
    model.load_state_dict(state)
    model.eval()
    return model
# Load the model once at import time; separate_vocals() and the startup
# self-test in the __main__ section use this module-level instance.
model = download_model_if_needed()
# =========================
# MÉTHODE 1: yt-dlp direct
# =========================
def download_with_ytdlp(query):
    """Search YouTube with yt-dlp and download the first hit as WAV.

    The audio is downloaded into a fresh temporary directory and converted
    to WAV by yt-dlp's FFmpegExtractAudio post-processor. Any failure is
    reported on stdout and turned into a ``(None, None)`` result so the
    caller can fall back to another method; in that case the temporary
    directory is removed instead of being left behind.

    Args:
        query: free-text search string.

    Returns:
        ``(wav_path, title)`` on success, ``(None, None)`` otherwise.
    """
    temp_dir = tempfile.mkdtemp()
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': os.path.join(temp_dir, 'song.%(ext)s'),
        'quiet': True,
        'no_warnings': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }],
        'default_search': 'ytsearch1',
        'noplaylist': True,
        'socket_timeout': 30,
        'retries': 3,
        'extractor_args': {
            'youtube': {
                'player_client': ['android', 'tv_embedded', 'web'],
                'skip': ['dash', 'hls']
            }
        },
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
        },
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            print(f"🔍 [yt-dlp] Recherche: {query}")
            info = ydl.extract_info(f"ytsearch:{query}", download=True)
        wav_name = next(
            (name for name in os.listdir(temp_dir) if name.endswith('.wav')),
            None,
        )
        if wav_name is not None:
            print(f"✅ [yt-dlp] Téléchargé: {wav_name}")
            # Search results arrive wrapped in an 'entries' list; a direct
            # hit carries the title at the top level.
            title = None
            if info and info.get('entries'):
                title = (info['entries'][0] or {}).get('title')
            elif info:
                title = info.get('title')
            return os.path.join(temp_dir, wav_name), title or 'Chanson'
    except Exception as e:
        # Best-effort: the caller has other download methods to try.
        print(f"❌ [yt-dlp] Échec: {e}")
    # Nothing usable was produced: do not leak the temporary directory.
    shutil.rmtree(temp_dir, ignore_errors=True)
    return None, None
# =========================
# MÉTHODE 2: Invidious (Proxy YouTube)
# =========================
def search_via_invidious(query):
    """Look up a video through the Invidious instances, in order.

    Each instance in ``INVIDIOUS_INSTANCES`` is queried through its
    ``/api/v1/search`` endpoint until one returns a usable result. Results
    without a ``videoId`` are skipped so the returned dict never carries
    ``id=None`` (which would otherwise be interpolated into download URLs).

    Args:
        query: free-text search string.

    Returns:
        ``{'id', 'title', 'instance'}`` for the first usable result, or
        ``None`` when no instance produced one.
    """
    for instance in INVIDIOUS_INSTANCES:
        try:
            resp = requests.get(
                f"{instance}/api/v1/search",
                params={'q': query, 'type': 'video'},
                timeout=10,
            )
            if resp.status_code != 200:
                continue
            results = resp.json()
            if not isinstance(results, list):
                # Error payloads are JSON objects, not result lists.
                continue
            for video in results:
                video_id = video.get('videoId') if isinstance(video, dict) else None
                if not video_id:
                    continue
                title = video.get('title') or 'Chanson'
                print(f"✅ [Invidious] Trouvé: {title} ({instance})")
                return {
                    'id': video_id,
                    'title': title,
                    'instance': instance
                }
        except Exception as e:
            # Public instances are frequently down; move on to the next one.
            print(f"⚠️ [Invidious] {instance} échoué: {e}")
            continue
    return None
def download_from_invidious(video_info):
    """Download a video's audio stream from an Invidious instance.

    Several audio itags are tried in turn. A successful download is
    converted to a mono WAV at ``SR`` Hz with ffmpeg; if the conversion
    fails, the original downloaded file is returned instead.

    Args:
        video_info: dict with ``'instance'``, ``'id'`` and ``'title'`` keys,
            as returned by ``search_via_invidious``.

    Returns:
        ``(audio_path, title)`` on success, ``(None, None)`` if every itag
        failed.
    """
    instance = video_info['instance']
    video_id = video_info['id']
    # Try several audio itags (m4a and opus).
    audio_itags = [140, 251, 250, 249, 139]
    for itag in audio_itags:
        src_path = None
        try:
            audio_url = f"{instance}/latest_version?id={video_id}&itag={itag}"
            print(f"🔄 [Invidious] Téléchargement itag={itag}...")
            # Context manager so the streamed connection is always released.
            with requests.get(audio_url, timeout=60, stream=True) as resp:
                if resp.status_code != 200:
                    continue
                ext = '.m4a' if itag in (140, 139) else '.opus'
                with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
                    src_path = temp_file.name
                    for chunk in resp.iter_content(chunk_size=8192):
                        temp_file.write(chunk)
            if os.path.getsize(src_path) == 0:
                raise RuntimeError("fichier audio vide")

            # Swap only the suffix; str.replace would also rewrite any
            # earlier occurrence of the extension inside the path.
            wav_path = os.path.splitext(src_path)[0] + '.wav'
            try:
                subprocess.run([
                    'ffmpeg', '-i', src_path,
                    '-ar', str(SR), '-ac', '1',
                    '-y', wav_path
                ], capture_output=True, timeout=60)
                if os.path.exists(wav_path) and os.path.getsize(wav_path) > 1000:
                    os.remove(src_path)
                    print(f"✅ [Invidious] Audio converti: {os.path.getsize(wav_path)/1e6:.1f} MB")
                    return wav_path, video_info['title']
            except Exception as e:
                print(f"⚠️ [Invidious] Conversion ffmpeg échouée: {e}")
            # Conversion did not yield a usable WAV: drop any stub it left
            # and hand back the original download.
            if os.path.exists(wav_path):
                os.remove(wav_path)
            return src_path, video_info['title']
        except Exception as e:
            print(f"⚠️ [Invidious] itag={itag} échoué: {e}")
            # Do not leave a partial download behind before the next itag.
            if src_path and os.path.exists(src_path):
                os.remove(src_path)
            continue
    return None, None
# =========================
# MÉTHODE 3: Cobalt API
# =========================
def download_via_cobalt(query):
    """Download a song's audio as MP3 through a Cobalt API instance.

    The YouTube video id is first resolved with ``search_via_invidious``;
    each instance in ``COBALT_INSTANCES`` is then asked for an audio-only
    stream of that video, which is saved to a temporary ``.mp3`` file.

    Args:
        query: free-text search string.

    Returns:
        ``(mp3_path, title)`` on success, ``(None, None)`` otherwise.
    """
    # Resolve the YouTube id through Invidious first.
    video_info = search_via_invidious(query)
    if not video_info:
        return None, None
    video_url = f"https://youtube.com/watch?v={video_info['id']}"
    for instance in COBALT_INSTANCES:
        mp3_path = None
        try:
            print(f"🔄 [Cobalt] Tentative via {instance}...")
            resp = requests.post(
                f"{instance}/api/json",
                json={
                    "url": video_url,
                    "vCodec": "h264",
                    "aFormat": "mp3",
                    "isAudioOnly": True,
                    "filenamePattern": "basic"
                },
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json"
                },
                timeout=30
            )
            if resp.status_code != 200:
                continue
            data = resp.json()
            if data.get('status') != 'stream' or not data.get('url'):
                continue
            # Fetch the stream; the context manager releases the connection.
            with requests.get(data['url'], timeout=60, stream=True) as audio_resp:
                if audio_resp.status_code != 200:
                    continue
                with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
                    mp3_path = temp_file.name
                    for chunk in audio_resp.iter_content(chunk_size=8192):
                        temp_file.write(chunk)
            if os.path.getsize(mp3_path) == 0:
                raise RuntimeError("fichier audio vide")
            print(f"✅ [Cobalt] Téléchargé: {os.path.getsize(mp3_path)/1e6:.1f} MB")
            return mp3_path, video_info['title']
        except Exception as e:
            print(f"⚠️ [Cobalt] {instance} échoué: {e}")
            # Remove a partial or empty download before trying the next one.
            if mp3_path and os.path.exists(mp3_path):
                os.remove(mp3_path)
            continue
    return None, None
# =========================
# PIPELINE HYBRIDE
# =========================
def download_youtube_audio(query):
    """Fetch a song's audio, trying each download method in turn.

    Order: yt-dlp, then Invidious (search + download), then Cobalt. The
    first method that yields a path wins.

    Args:
        query: free-text search string.

    Returns:
        ``(audio_path, title)`` from the first successful method, or
        ``(None, None)`` when all of them fail.
    """
    rule = '=' * 50
    print(f"\n{rule}")
    print(f"🎵 Recherche: {query}")
    print(f"{rule}\n")

    def via_invidious(search_terms):
        # Invidious is a two-step method: find the video, then download it.
        found = search_via_invidious(search_terms)
        if not found:
            return None, None
        return download_from_invidious(found)

    attempts = (
        ("📡 Tentative 1/3: yt-dlp direct...", download_with_ytdlp),
        ("\n📡 Tentative 2/3: Invidious (proxy YouTube)...", via_invidious),
        ("\n📡 Tentative 3/3: Cobalt API...", download_via_cobalt),
    )
    for announcement, fetch in attempts:
        print(announcement)
        path, song_title = fetch(query)
        if path:
            return path, song_title

    print("\n❌ Toutes les méthodes ont échoué")
    return None, None
# =========================
# SÉPARATION VOCALE U-Net
# =========================
def separate_vocals(audio_path):
    """Extract the vocal track from an audio file with the U-Net model.

    The file is loaded as mono at ``SR`` Hz and transformed with an STFT.
    The magnitude spectrogram is normalised by its maximum, the last
    frequency bin is dropped (513 -> 512 rows), and the model is run on
    overlapping patches of ``FRAME_SIZE`` frames taken every
    ``STRIDE_FRAMES`` frames. The model output is used as a mask on the
    mixture magnitude; overlapping predictions are averaged. The masked
    magnitude is recombined with the mixture phase and inverted to audio.

    The time axis is zero-padded so that the patches cover every frame.
    Without this, audio shorter than one patch produced pure silence and
    the frames after the last full patch were never processed.

    Args:
        audio_path: path to any audio file ``librosa.load`` can read.

    Returns:
        Path of a temporary WAV file containing the extracted vocals.

    Raises:
        ValueError: if the file contains no audio samples.
    """
    print(f"\n🧠 Extraction vocale U-Net...")
    # Load audio
    y, _ = librosa.load(audio_path, sr=SR, mono=True)
    if y.size == 0:
        raise ValueError("Le fichier audio est vide.")
    print(f" Audio chargé: {len(y)/SR:.1f}s @ {SR}Hz")

    # STFT
    stft_mix = librosa.stft(y, n_fft=N_FFT, hop_length=HOP_LENGTH)
    mix_mag_513 = np.abs(stft_mix).astype(np.float32)
    mix_phase_513 = np.angle(stft_mix).astype(np.float32)

    # Normalisation (epsilon avoids dividing by zero on digital silence)
    mix_max = float(mix_mag_513.max() + 1e-8)
    mix_mag_513_norm = (mix_mag_513 / mix_max).astype(np.float32)

    # Drop the Nyquist bin (513 -> 512)
    mix_mag_512 = mix_mag_513_norm[:-1, :]
    F, T = mix_mag_512.shape
    print(f" Spectrogramme: {F}x{T}")

    # Pad the time axis so a whole number of strides covers all T frames.
    if T <= FRAME_SIZE:
        padded_T = FRAME_SIZE
    else:
        extra_strides = (T - FRAME_SIZE + STRIDE_FRAMES - 1) // STRIDE_FRAMES
        padded_T = FRAME_SIZE + extra_strides * STRIDE_FRAMES
    mix_padded = np.pad(mix_mag_512, ((0, 0), (0, padded_T - T)))

    # Patch-by-patch inference with overlap averaging
    voc_acc = np.zeros((F, padded_T), dtype=np.float32)
    weight = np.zeros((F, padded_T), dtype=np.float32)
    patch_starts = range(0, padded_T - FRAME_SIZE + 1, STRIDE_FRAMES)
    print(f" Traitement de {len(patch_starts)} patches...")
    with torch.no_grad():
        for t0 in patch_starts:
            mix_patch = mix_padded[:, t0:t0 + FRAME_SIZE]
            mix_t = torch.from_numpy(mix_patch).unsqueeze(0).unsqueeze(0).to(DEVICE)
            mask_patch = model(mix_t).squeeze().cpu().numpy().astype(np.float32)
            voc_acc[:, t0:t0 + FRAME_SIZE] += mask_patch * mix_patch
            weight[:, t0:t0 + FRAME_SIZE] += 1.0
    # Average the overlaps, then discard the padding frames.
    voc_norm_512 = (voc_acc / np.maximum(weight, 1.0))[:, :T]

    # Reconstruction: undo the normalisation, restore a zero Nyquist row,
    # and reuse the mixture phase.
    voc_mag_512 = voc_norm_512 * mix_max
    nyquist_row = np.zeros((1, T), dtype=np.float32)
    voc_mag_513 = np.vstack([voc_mag_512, nyquist_row])
    voc_stft_513 = voc_mag_513 * np.exp(1j * mix_phase_513)
    voc_audio = librosa.istft(
        voc_stft_513,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        win_length=N_FFT,
        length=len(y)
    )

    # Save. NamedTemporaryFile creates the file atomically, unlike the
    # deprecated tempfile.mktemp which only returns a guessable name.
    with tempfile.NamedTemporaryFile(suffix='_vocals.wav', delete=False) as tmp:
        output_path = tmp.name
    sf.write(output_path, voc_audio, SR)
    print(f"✅ Voix extraites: {output_path}")
    return output_path
# =========================
# FONCTION PRINCIPALE
# =========================
def process_song(query, progress=gr.Progress()):
    """Run the full pipeline: search -> download -> vocal extraction.

    Args:
        query: song title typed by the user; ``None`` or blank input is
            rejected with a message instead of raising.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        ``(original_audio_path, vocals_path, status_markdown)``. Both paths
        are ``None`` when the input is empty, the download fails, or an
        error occurs; the status text then describes the problem.
    """
    if not query or not query.strip():
        return None, None, "❌ Veuillez entrer un titre de chanson"
    try:
        # Step 1: download
        progress(0.1, desc="🔍 Recherche YouTube...")
        audio_path, title = download_youtube_audio(query.strip())
        if not audio_path:
            return None, None, (
                "❌ **Impossible de télécharger la chanson**\n"
                "Les serveurs YouTube semblent bloqués. Essayez:\n"
                "- Un titre différent\n"
                "- D'uploader directement un fichier MP3"
            )
        # A source may report no title; keep the slice below safe.
        title = title or "Chanson"
        progress(0.5, desc=f"✅ Trouvé: {title[:30]}...")

        # Step 2: vocal extraction
        progress(0.6, desc="🧠 Extraction vocale U-Net...")
        vocals_path = separate_vocals(audio_path)
        progress(1.0, desc="✅ Terminé!")
        return audio_path, vocals_path, (
            "✅ **Traitement réussi!**\n"
            f"🎵 **{title}**\n"
            f"📊 Modèle: U-Net ({DEVICE.upper()})\n"
            f"🎚️ Sample rate: {SR} Hz"
        )
    except Exception as e:
        # UI boundary: log the traceback and show the message to the user.
        traceback.print_exc()
        return None, None, f"❌ Erreur: {str(e)}"
def process_uploaded_file(audio_file, progress=gr.Progress()):
    """Extract vocals from a user-uploaded audio file.

    Args:
        audio_file: path of the uploaded file, or ``None`` if nothing was
            uploaded.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        ``(vocals_path, status_markdown)``; ``vocals_path`` is ``None`` when
        no file was given or an error occurred.
    """
    if audio_file is None:
        return None, "❌ Veuillez uploader un fichier audio"

    try:
        progress(0.3, desc="🧠 Extraction vocale...")
        extracted = separate_vocals(audio_file)
        progress(1.0, desc="✅ Terminé!")
        summary = "\n".join((
            "✅ **Extraction terminée!**",
            f"📊 Modèle: U-Net ({DEVICE.upper()})",
            f"🎚️ Sample rate: {SR} Hz",
        ))
        return extracted, summary
    except Exception as err:
        # UI boundary: log the traceback and surface the message.
        traceback.print_exc()
        return None, f"❌ Erreur: {err!s}"
# =========================
# INTERFACE GRADIO
# =========================
# Build the Gradio UI: a header, two tabs (YouTube search and manual upload)
# and an informational footer. The resulting app object is bound to `demo`.
with gr.Blocks(title="🎵 ACAPPELLA - Extracteur Vocal", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# 🎵 ACAPPELLA - Extracteur Vocal U-Net
### *Extrayez les voix des chansons avec l'IA*
---
""")
    with gr.Tabs():
        # Tab 1: YouTube search
        with gr.TabItem("🔍 Recherche YouTube"):
            with gr.Row():
                # Left column: instructions, query box and trigger button.
                with gr.Column(scale=1):
                    gr.Markdown("""
**Comment utiliser:**
1. Entrez un titre de chanson
2. Cliquez sur 'Extraire'
3. Écoutez et téléchargez
⚠️ *Si YouTube est bloqué, utilisez l'onglet "Upload"*
""")
                    query_input = gr.Textbox(
                        label="🎤 Titre de la chanson",
                        placeholder="Ex: The Weeknd - Blinding Lights",
                        lines=2
                    )
                    extract_btn = gr.Button("🚀 Extraire les voix", variant="primary", size="lg")
                # Right column: status text plus original / extracted players.
                with gr.Column(scale=2):
                    status_yt = gr.Markdown("**Status:** En attente...")
                    with gr.Row():
                        original_audio = gr.Audio(label="🎧 Original", type="filepath")
                        vocals_audio = gr.Audio(label="🎤 Voix Extraites", type="filepath")
            # Example queries that fill the query textbox.
            gr.Examples(
                examples=[
                    ["Adele - Hello"],
                    ["Michael Jackson - Billie Jean"],
                    ["Queen - Bohemian Rhapsody"],
                    ["Ed Sheeran - Shape of You"],
                    ["Daft Punk - Get Lucky"]
                ],
                inputs=[query_input],
                label="🎵 Exemples rapides"
            )
            # process_song returns (original path, vocals path, status text),
            # matching the three outputs in order.
            extract_btn.click(
                fn=process_song,
                inputs=[query_input],
                outputs=[original_audio, vocals_audio, status_yt]
            )
        # Tab 2: manual upload
        with gr.TabItem("📤 Upload Manuel"):
            gr.Markdown("""
### Upload direct
Si la recherche YouTube ne fonctionne pas, uploadez votre fichier audio ici.
**Formats supportés:** MP3, WAV, M4A, OGG, FLAC
""")
            with gr.Row():
                with gr.Column():
                    audio_upload = gr.Audio(
                        label="📁 Fichier audio",
                        type="filepath",
                        sources=["upload"]
                    )
                    upload_btn = gr.Button("🚀 Extraire les voix", variant="primary")
                with gr.Column():
                    status_upload = gr.Markdown("**Status:** En attente d'un fichier...")
                    vocals_upload = gr.Audio(label="🎤 Voix Extraites", type="filepath")
            # process_uploaded_file returns (vocals path, status text).
            upload_btn.click(
                fn=process_uploaded_file,
                inputs=[audio_upload],
                outputs=[vocals_upload, status_upload]
            )
    # Footer; the device name is interpolated once, when the UI is built.
    gr.Markdown("""
---
### ℹ️ Informations
- **Modèle:** U-Net entraîné pour la séparation vocale
- **Device:** """ + DEVICE.upper() + """
- **Usage:** Recherche uniquement
*Les méthodes de téléchargement utilisent des proxies (Invidious) si YouTube direct échoue.*
""")
# =========================
# LANCEMENT
# =========================
if __name__ == "__main__":
    # Startup banner
    for banner_line in (
        "🚀 Démarrage ACAPPELLA...",
        f"🔧 Device: {DEVICE}",
        f"🎯 Sample rate: {SR} Hz",
        f"📊 Modèle chargé: {MODEL_PATH}",
    ):
        print(banner_line)

    # Smoke-test the model on one random patch before serving requests.
    probe = torch.randn(1, 1, 512, 128).to(DEVICE)
    with torch.no_grad():
        probe_out = model(probe)
    print(f"✅ Test U-Net: input {probe.shape} → output {probe_out.shape}")

    # Listen on all interfaces on port 7860, without a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    demo.launch(**launch_options)