"""ACAPPELLA: Gradio app that extracts vocals from a song with a U-Net.

A song is obtained either from an uploaded file or by searching YouTube
(yt-dlp first, then Invidious proxies, then the Cobalt API).  The audio is
resampled to ``SR``, turned into a magnitude spectrogram, masked patch by
patch with the U-Net, and resynthesised with the phase of the mixture.

NOTE(review): the copy of this file that was reviewed had lost a span of
text (end of ``download_model_if_needed``, the creation of the global
``model``, all of ``download_with_ytdlp`` and the header of
``search_via_invidious``).  Those parts are reconstructed below from how the
rest of the file uses them and are individually marked; confirm them against
the original source before relying on them.
"""

import os
import subprocess
import tempfile
import traceback
from pathlib import Path

import gradio as gr
import librosa
import numpy as np
import requests
import soundfile as sf
import torch
import yt_dlp

from model import UNet

# Configuration
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
SR = 8192
N_FFT = 1024
HOP_LENGTH = 768
FRAME_SIZE = 128      # patch width fed to the model, in STFT frames
STRIDE_FRAMES = 64    # hop between consecutive patches, in STFT frames

# Model location
MODEL_URL = "https://www.dropbox.com/scl/fi/pnzxhaueynzljif7kh86i/unet_final.pth?rlkey=umz3jel4az9wf8j75d0hmx04z&st=2vihy6yj&dl=1"
MODEL_PATH = "unet_final.pth"

# =========================
# INVIDIOUS INSTANCES (YouTube proxy)
# =========================
INVIDIOUS_INSTANCES = [
    "https://inv.nadeko.net",
    "https://invidious.nerdvpn.de",
    "https://invidious.privacyredirect.com",
    "https://yewtu.be",
    "https://vid.puffyan.us",
    "https://invidious.snopyta.org",
    "https://invidious.kavin.rocks",
]

# =========================
# COBALT INSTANCES (open source API)
# =========================
COBALT_INSTANCES = [
    "https://api.cobalt.tools",
]


def _remove_quietly(path):
    """Delete *path* if it exists, ignoring filesystem errors."""
    try:
        os.remove(path)
    except OSError:
        pass


def _save_stream(response, suffix):
    """Write a streamed ``requests`` response to a new temporary file.

    Returns the path of the file.  If writing fails the partial file is
    removed and the exception is re-raised, so no temp file is leaked.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        with tmp:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    tmp.write(chunk)
    except Exception:
        _remove_quietly(tmp.name)
        raise
    return tmp.name


def download_model_if_needed():
    """Download the U-Net checkpoint if it is missing, then load it.

    Returns:
        The model, moved to ``DEVICE`` and switched to eval mode.

    Raises:
        RuntimeError: if the download returns an HTML page instead of the
            checkpoint.
        requests.HTTPError: if the download request fails.
    """
    if not os.path.exists(MODEL_PATH) or os.path.getsize(MODEL_PATH) < 1024:
        print("📥 Téléchargement du modèle...")
        # Download next to the final path and only move it into place once
        # validated, so an interrupted or bogus download is never mistaken
        # for a usable checkpoint on the next start.
        partial_path = MODEL_PATH + ".part"
        try:
            with requests.get(MODEL_URL, stream=True, allow_redirects=True, timeout=120) as r:
                r.raise_for_status()
                ct = (r.headers.get("Content-Type") or "").lower()
                if "text/html" in ct:
                    raise RuntimeError(
                        f"Dropbox a renvoyé du HTML (Content-Type={ct}). "
                        f"Assure-toi d'avoir dl=1 dans l'URL."
                    )
                with open(partial_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)

            with open(partial_path, "rb") as f:
                head = f.read(32)
            # NOTE(review): the exact prefix tested and the error text were
            # lost in the reviewed copy; "starts with '<'" is a reconstruction.
            if head.lstrip().startswith(b"<"):
                raise RuntimeError(
                    "Le fichier téléchargé ressemble à du HTML, pas à un checkpoint PyTorch."
                )
            os.replace(partial_path, MODEL_PATH)
        finally:
            _remove_quietly(partial_path)

    # NOTE(review): everything below in this function is reconstructed; the
    # UNet constructor arguments and checkpoint layout must be confirmed
    # against model.py and the training script.
    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # from a trusted source.
    net = UNet()
    state = torch.load(MODEL_PATH, map_location=DEVICE)
    if isinstance(state, dict) and "state_dict" in state:
        state = state["state_dict"]
    net.load_state_dict(state)
    net.to(DEVICE)
    net.eval()
    return net


# Global model used by separate_vocals() and the start-up self test.
# NOTE(review): reconstructed; the original assignment was lost.
model = download_model_if_needed()


# =========================
# METHOD 1: yt-dlp direct
# =========================
def download_with_ytdlp(query):
    """Search YouTube with yt-dlp and download the first hit's audio.

    NOTE(review): this whole function is reconstructed; only its name and
    its ``(audio_path, title)`` / ``(None, None)`` contract are known from
    the caller.

    Returns:
        ``(path, title)`` on success, ``(None, None)`` on any failure.
    """
    out_dir = tempfile.mkdtemp(prefix="acappella_")
    options = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(out_dir, "%(id)s.%(ext)s"),
        "noplaylist": True,
        "quiet": True,
        "no_warnings": True,
    }
    try:
        with yt_dlp.YoutubeDL(options) as ydl:
            info = ydl.extract_info(f"ytsearch1:{query}", download=True)
            entries = (info or {}).get("entries")
            if entries:
                info = entries[0]
            if not info:
                return None, None
            path = ydl.prepare_filename(info)
        if os.path.exists(path) and os.path.getsize(path) > 0:
            return path, info.get("title", "Chanson")
    except Exception as e:
        print(f"⚠️ [yt-dlp] échoué: {e}")
    return None, None


# =========================
# METHOD 2: Invidious
# =========================
def search_via_invidious(query):
    """Look up *query* on the configured Invidious instances.

    Instances are tried in order; the first one that returns a video wins.

    Returns:
        ``{'id', 'title', 'instance'}`` for the first video found, or
        ``None`` if every instance failed or returned nothing.
    """
    for instance in INVIDIOUS_INSTANCES:
        try:
            # NOTE(review): the request itself was lost in the reviewed copy;
            # this uses the public Invidious search endpoint.
            resp = requests.get(
                f"{instance}/api/v1/search",
                params={"q": query, "type": "video"},
                timeout=10,
            )
            if resp.status_code != 200:
                continue
            results = resp.json()
            # Skip entries without a videoId (channels, playlists) instead of
            # blindly taking results[0] and returning an id of None.
            video = next(
                (r for r in results or [] if isinstance(r, dict) and r.get('videoId')),
                None,
            )
            if video is not None:
                video_id = video.get('videoId')
                title = video.get('title', 'Chanson')
                print(f"✅ [Invidious] Trouvé: {title} ({instance})")
                return {
                    'id': video_id,
                    'title': title,
                    'instance': instance
                }
        except Exception as e:
            print(f"⚠️ [Invidious] {instance} échoué: {e}")
            continue
    return None


def download_from_invidious(video_info):
    """Download a video's audio track through its Invidious instance.

    Args:
        video_info: dict with ``'instance'``, ``'id'`` and ``'title'`` as
            returned by :func:`search_via_invidious`.

    Returns:
        ``(path, title)``.  The path is a mono WAV at ``SR`` when ffmpeg
        succeeded, otherwise the raw downloaded file.  ``(None, None)`` if
        no itag could be downloaded.
    """
    instance = video_info['instance']
    video_id = video_info['id']

    # Try several audio itags (m4a and opus)
    audio_itags = [140, 251, 250, 249, 139]

    for itag in audio_itags:
        try:
            audio_url = f"{instance}/latest_version?id={video_id}&itag={itag}"
            print(f"🔄 [Invidious] Téléchargement itag={itag}...")

            with requests.get(audio_url, timeout=60, stream=True) as resp:
                if resp.status_code != 200:
                    continue
                ext = '.m4a' if itag in (140, 139) else '.opus'
                raw_path = _save_stream(resp, ext)

            if os.path.getsize(raw_path) == 0:
                # An empty body is not a usable download; try the next itag.
                _remove_quietly(raw_path)
                continue

            # Convert to WAV with ffmpeg.  with_suffix only touches the
            # extension, unlike str.replace which could hit the directory.
            wav_path = str(Path(raw_path).with_suffix('.wav'))
            try:
                subprocess.run([
                    'ffmpeg', '-i', raw_path,
                    '-ar', str(SR), '-ac', '1',
                    '-y', wav_path
                ], capture_output=True, timeout=60)

                if os.path.exists(wav_path) and os.path.getsize(wav_path) > 1000:
                    os.remove(raw_path)
                    print(f"✅ [Invidious] Audio converti: {os.path.getsize(wav_path)/1e6:.1f} MB")
                    return wav_path, video_info['title']
            except Exception as e:
                print(f"⚠️ [Invidious] Conversion ffmpeg échouée: {e}")

            # ffmpeg missing or failed: fall back to the raw file, which
            # librosa may still be able to decode.
            _remove_quietly(wav_path)
            return raw_path, video_info['title']

        except Exception as e:
            print(f"⚠️ [Invidious] itag={itag} échoué: {e}")
            continue

    return None, None


# =========================
# METHOD 3: Cobalt API
# =========================
def download_via_cobalt(query):
    """Download audio through the Cobalt API (open source).

    The YouTube id is first resolved through Invidious, then each Cobalt
    instance is asked for an audio-only MP3 stream.

    Returns:
        ``(path, title)`` on success, ``(None, None)`` otherwise.
    """
    # First resolve the YouTube id through Invidious
    video_info = search_via_invidious(query)
    if not video_info:
        return None, None

    video_url = f"https://youtube.com/watch?v={video_info['id']}"

    for instance in COBALT_INSTANCES:
        try:
            print(f"🔄 [Cobalt] Tentative via {instance}...")

            resp = requests.post(
                f"{instance}/api/json",
                json={
                    "url": video_url,
                    "vCodec": "h264",
                    "aFormat": "mp3",
                    "isAudioOnly": True,
                    "filenamePattern": "basic"
                },
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json"
                },
                timeout=30
            )
            if resp.status_code != 200:
                continue

            data = resp.json()
            if not (data.get('status') == 'stream' and data.get('url')):
                continue

            # Download the stream
            with requests.get(data['url'], timeout=60, stream=True) as audio_resp:
                if audio_resp.status_code != 200:
                    continue
                mp3_path = _save_stream(audio_resp, '.mp3')

            if os.path.getsize(mp3_path) == 0:
                _remove_quietly(mp3_path)
                continue

            print(f"✅ [Cobalt] Téléchargé: {os.path.getsize(mp3_path)/1e6:.1f} MB")
            return mp3_path, video_info['title']

        except Exception as e:
            print(f"⚠️ [Cobalt] {instance} échoué: {e}")
            continue

    return None, None


# =========================
# HYBRID PIPELINE
# =========================
def download_youtube_audio(query):
    """Fetch audio for *query*, trying each download method in turn.

    Order: yt-dlp, Invidious, Cobalt.

    Returns:
        ``(path, title)`` from the first method that succeeds, or
        ``(None, None)`` if all of them fail.
    """
    print(f"\n{'='*50}")
    print(f"🎵 Recherche: {query}")
    print(f"{'='*50}\n")

    # Method 1: yt-dlp direct
    print("📡 Tentative 1/3: yt-dlp direct...")
    audio_path, title = download_with_ytdlp(query)
    if audio_path:
        return audio_path, title

    # Method 2: Invidious
    print("\n📡 Tentative 2/3: Invidious (proxy YouTube)...")
    video_info = search_via_invidious(query)
    if video_info:
        audio_path, title = download_from_invidious(video_info)
        if audio_path:
            return audio_path, title

    # Method 3: Cobalt
    print("\n📡 Tentative 3/3: Cobalt API...")
    audio_path, title = download_via_cobalt(query)
    if audio_path:
        return audio_path, title

    print("\n❌ Toutes les méthodes ont échoué")
    return None, None


# =========================
# U-Net VOCAL SEPARATION
# =========================
def separate_vocals(audio_path):
    """Extract the vocal track of an audio file with the U-Net.

    The file is loaded as mono at ``SR``; its normalised magnitude
    spectrogram (Nyquist bin dropped) is processed in overlapping patches of
    ``FRAME_SIZE`` frames, the model output is applied as a mask, overlapping
    results are averaged, and the audio is rebuilt with the mixture's phase.

    Args:
        audio_path: path of any audio file librosa can decode.

    Returns:
        Path of a temporary WAV file (sample rate ``SR``) with the vocals.

    Raises:
        ValueError: if the file contains no audio samples.
    """
    print(f"\n🧠 Extraction vocale U-Net...")

    # Load audio
    y, _ = librosa.load(audio_path, sr=SR, mono=True)
    if y.size == 0:
        raise ValueError("Le fichier audio est vide")
    print(f" Audio chargé: {len(y)/SR:.1f}s @ {SR}Hz")

    # STFT
    stft_mix = librosa.stft(y, n_fft=N_FFT, hop_length=HOP_LENGTH)
    mix_mag = np.abs(stft_mix).astype(np.float32)
    mix_phase = np.angle(stft_mix).astype(np.float32)

    # Normalise to [0, 1]; the epsilon keeps pure silence from dividing by 0
    mix_max = float(mix_mag.max() + 1e-8)

    # Drop the Nyquist bin (N_FFT/2 + 1 -> N_FFT/2 rows)
    mix_norm = (mix_mag[:-1, :] / mix_max).astype(np.float32)
    n_bins, n_frames = mix_norm.shape
    print(f" Spectrogramme: {n_bins}x{n_frames}")

    # Clips shorter than one patch are zero-padded so the model still runs;
    # previously they produced no patch at all and the output was silence.
    padded_frames = max(n_frames, FRAME_SIZE)
    if padded_frames > n_frames:
        mix_norm = np.pad(mix_norm, ((0, 0), (0, padded_frames - n_frames)))

    # Regular strided patches, plus one right-aligned patch when the stride
    # does not land on the end, so the tail of the song is not left at zero.
    starts = list(range(0, padded_frames - FRAME_SIZE + 1, STRIDE_FRAMES))
    if starts[-1] + FRAME_SIZE < padded_frames:
        starts.append(padded_frames - FRAME_SIZE)
    print(f" Traitement de {len(starts)} patches...")

    # Patch-wise inference with overlap averaging
    voc_sum = np.zeros((n_bins, padded_frames), dtype=np.float32)
    weight = np.zeros((n_bins, padded_frames), dtype=np.float32)

    with torch.no_grad():
        for t0 in starts:
            t1 = t0 + FRAME_SIZE
            mix_patch = mix_norm[:, t0:t1]
            mix_t = torch.from_numpy(mix_patch).unsqueeze(0).unsqueeze(0).to(DEVICE)
            mask_patch = model(mix_t).squeeze().cpu().numpy().astype(np.float32)

            voc_sum[:, t0:t1] += mask_patch * mix_patch
            weight[:, t0:t1] += 1.0

    voc_norm = (voc_sum / np.maximum(weight, 1.0))[:, :n_frames]

    # Reconstruction: undo normalisation, restore a zero Nyquist row and
    # reuse the phase of the mixture
    voc_mag = np.vstack([
        voc_norm * mix_max,
        np.zeros((1, n_frames), dtype=np.float32),
    ])
    voc_stft = voc_mag * np.exp(1j * mix_phase)

    voc_audio = librosa.istft(
        voc_stft,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        win_length=N_FFT,
        length=len(y)
    )

    # Save.  NamedTemporaryFile reserves the name atomically; tempfile.mktemp
    # is deprecated because another process can claim the name first.
    with tempfile.NamedTemporaryFile(delete=False, suffix='_vocals.wav') as tmp:
        output_path = tmp.name
    sf.write(output_path, voc_audio, SR)

    print(f"✅ Voix extraites: {output_path}")
    return output_path


# =========================
# MAIN CALLBACKS
# =========================
def process_song(query, progress=gr.Progress()):
    """Full pipeline for the search tab: search, download, extract.

    Args:
        query: song title typed by the user.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        ``(original_path, vocals_path, status_markdown)``; both paths are
        ``None`` when something failed.
    """
    if not query or not query.strip():
        return None, None, "❌ Veuillez entrer un titre de chanson"

    try:
        # Step 1: download
        progress(0.1, desc="🔍 Recherche YouTube...")
        audio_path, title = download_youtube_audio(query)

        if not audio_path:
            return None, None, (
                "❌ **Impossible de télécharger la chanson**\n\n"
                "Les serveurs YouTube semblent bloqués. Essayez:\n"
                "- Un titre différent\n"
                "- D'uploader directement un fichier MP3"
            )

        # A downloader may report no title; keep the slicing below safe.
        title = title or "Chanson"
        progress(0.5, desc=f"✅ Trouvé: {title[:30]}...")

        # Step 2: vocal extraction
        progress(0.6, desc="🧠 Extraction vocale U-Net...")
        vocals_path = separate_vocals(audio_path)

        progress(1.0, desc="✅ Terminé!")

        return audio_path, vocals_path, (
            "✅ **Traitement réussi!**\n\n"
            f"🎵 **{title}**\n\n"
            f"📊 Modèle: U-Net ({DEVICE.upper()})\n\n"
            f"🎚️ Sample rate: {SR} Hz"
        )

    except Exception as e:
        # UI boundary: log the traceback and show the message to the user.
        traceback.print_exc()
        return None, None, f"❌ Erreur: {str(e)}"


def process_uploaded_file(audio_file, progress=gr.Progress()):
    """Extract vocals from a file uploaded in the upload tab.

    Args:
        audio_file: path of the uploaded file, or ``None`` if nothing was
            uploaded.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        ``(vocals_path, status_markdown)``; the path is ``None`` on failure.
    """
    if audio_file is None:
        return None, "❌ Veuillez uploader un fichier audio"

    try:
        progress(0.3, desc="🧠 Extraction vocale...")
        vocals_path = separate_vocals(audio_file)
        progress(1.0, desc="✅ Terminé!")

        return vocals_path, (
            "✅ **Extraction terminée!**\n\n"
            f"📊 Modèle: U-Net ({DEVICE.upper()})\n\n"
            f"🎚️ Sample rate: {SR} Hz"
        )

    except Exception as e:
        traceback.print_exc()
        return None, f"❌ Erreur: {str(e)}"


# =========================
# GRADIO INTERFACE
# =========================
with gr.Blocks(title="🎵 ACAPPELLA - Extracteur Vocal", theme=gr.themes.Soft()) as demo:

    gr.Markdown("""
    # 🎵 ACAPPELLA - Extracteur Vocal U-Net
    ### *Extrayez les voix des chansons avec l'IA*

    ---
    """)

    with gr.Tabs():
        # Tab 1: YouTube search
        with gr.TabItem("🔍 Recherche YouTube"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("""
                    **Comment utiliser:**
                    1. Entrez un titre de chanson
                    2. Cliquez sur 'Extraire'
                    3. Écoutez et téléchargez

                    ⚠️ *Si YouTube est bloqué, utilisez l'onglet "Upload"*
                    """)

                    query_input = gr.Textbox(
                        label="🎤 Titre de la chanson",
                        placeholder="Ex: The Weeknd - Blinding Lights",
                        lines=2
                    )

                    extract_btn = gr.Button("🚀 Extraire les voix", variant="primary", size="lg")

                with gr.Column(scale=2):
                    status_yt = gr.Markdown("**Status:** En attente...")

                    with gr.Row():
                        original_audio = gr.Audio(label="🎧 Original", type="filepath")
                        vocals_audio = gr.Audio(label="🎤 Voix Extraites", type="filepath")

            gr.Examples(
                examples=[
                    ["Adele - Hello"],
                    ["Michael Jackson - Billie Jean"],
                    ["Queen - Bohemian Rhapsody"],
                    ["Ed Sheeran - Shape of You"],
                    ["Daft Punk - Get Lucky"]
                ],
                inputs=[query_input],
                label="🎵 Exemples rapides"
            )

            extract_btn.click(
                fn=process_song,
                inputs=[query_input],
                outputs=[original_audio, vocals_audio, status_yt]
            )

        # Tab 2: manual upload
        with gr.TabItem("📤 Upload Manuel"):
            gr.Markdown("""
            ### Upload direct
            Si la recherche YouTube ne fonctionne pas, uploadez votre fichier audio ici.

            **Formats supportés:** MP3, WAV, M4A, OGG, FLAC
            """)

            with gr.Row():
                with gr.Column():
                    audio_upload = gr.Audio(
                        label="📁 Fichier audio",
                        type="filepath",
                        sources=["upload"]
                    )
                    upload_btn = gr.Button("🚀 Extraire les voix", variant="primary")

                with gr.Column():
                    status_upload = gr.Markdown("**Status:** En attente d'un fichier...")
                    vocals_upload = gr.Audio(label="🎤 Voix Extraites", type="filepath")

            upload_btn.click(
                fn=process_uploaded_file,
                inputs=[audio_upload],
                outputs=[vocals_upload, status_upload]
            )

    gr.Markdown("""
    ---
    ### ℹ️ Informations
    - **Modèle:** U-Net entraîné pour la séparation vocale
    - **Device:** """ + DEVICE.upper() + """
    - **Usage:** Recherche uniquement

    *Les méthodes de téléchargement utilisent des proxies (Invidious) si YouTube direct échoue.*
    """)


# =========================
# LAUNCH
# =========================
if __name__ == "__main__":
    print("🚀 Démarrage ACAPPELLA...")
    print(f"🔧 Device: {DEVICE}")
    print(f"🎯 Sample rate: {SR} Hz")
    print(f"📊 Modèle chargé: {MODEL_PATH}")

    # Smoke-test the model on one random patch of the shape used at inference
    test_input = torch.randn(1, 1, N_FFT // 2, FRAME_SIZE).to(DEVICE)
    with torch.no_grad():
        test_output = model(test_input)
    print(f"✅ Test U-Net: input {test_input.shape} → output {test_output.shape}")

    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )