import os
import shutil
import subprocess
import tempfile
import traceback
from pathlib import Path

import gradio as gr
import librosa
import numpy as np
import requests
import soundfile as sf
import torch
import yt_dlp

from model import UNet


# Inference device: the GPU when PyTorch can see one, the CPU otherwise.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Audio / STFT settings shared by the download and separation code.
SR = 8192  # sample rate (Hz) audio is resampled to before processing
N_FFT = 1024  # STFT window size, in samples
HOP_LENGTH = 768  # STFT hop, in samples
FRAME_SIZE = 128  # width of one spectrogram patch fed to the model, in STFT frames
STRIDE_FRAMES = 64  # step between consecutive patches (half of FRAME_SIZE)

# Where the trained weights are fetched from and cached locally.
MODEL_URL = (
    "https://www.dropbox.com/scl/fi/pnzxhaueynzljif7kh86i/unet_final.pth"
    "?rlkey=umz3jel4az9wf8j75d0hmx04z&st=2vihy6yj&dl=1"
)
MODEL_PATH = "unet_final.pth"

# Invidious front-ends, tried in this order for search and audio download.
INVIDIOUS_INSTANCES = [
    "https://inv.nadeko.net",
    "https://invidious.nerdvpn.de",
    "https://invidious.privacyredirect.com",
    "https://yewtu.be",
    "https://vid.puffyan.us",
    "https://invidious.snopyta.org",
    "https://invidious.kavin.rocks",
]

# Cobalt API endpoints, tried in this order as the last-resort downloader.
COBALT_INSTANCES = [
    "https://api.cobalt.tools",
]


def _fetch_model_file(url, dest_path):
    """Download ``url`` to ``dest_path`` atomically.

    The payload is streamed to ``dest_path + ".part"`` and only renamed onto
    ``dest_path`` once it has passed the HTML sanity checks, so an interrupted
    or rejected download never leaves behind a file that a later run would
    mistake for a valid checkpoint.

    Raises:
        RuntimeError: the server answered with an HTML page instead of a file.
        requests.RequestException: network failure or HTTP error status.
    """
    partial_path = dest_path + ".part"
    try:
        with requests.get(url, stream=True, allow_redirects=True, timeout=120) as r:
            r.raise_for_status()

            ct = (r.headers.get("Content-Type") or "").lower()
            if "text/html" in ct:
                raise RuntimeError(
                    f"Dropbox a renvoyé du HTML (Content-Type={ct}). "
                    f"Assure-toi d'avoir dl=1 dans l'URL."
                )

            with open(partial_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)

        # Second line of defence: an HTML page served with a wrong Content-Type.
        with open(partial_path, "rb") as f:
            head = f.read(32)
        if head.lstrip().startswith(b"<"):
            raise RuntimeError("Le fichier téléchargé ressemble à une page HTML.")

        os.replace(partial_path, dest_path)
    finally:
        # After a successful rename the partial file no longer exists; on any
        # failure this removes the incomplete download.
        if os.path.exists(partial_path):
            os.remove(partial_path)


def download_model_if_needed():
    """Download the checkpoint if it is missing, then load the U-Net.

    The download is skipped when ``MODEL_PATH`` already exists and is at least
    1 KiB. The weights are then loaded into a fresh ``UNet`` on ``DEVICE``.

    Returns:
        The ``UNet`` instance with weights loaded, switched to eval mode.

    Raises:
        RuntimeError: the download returned an HTML page.
        requests.RequestException: the download failed.
    """
    if not os.path.exists(MODEL_PATH) or os.path.getsize(MODEL_PATH) < 1024:
        print("📥 Téléchargement du modèle...")
        _fetch_model_file(MODEL_URL, MODEL_PATH)
        print("✅ Modèle téléchargé")

    model = UNet().to(DEVICE)

    try:
        state = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=True)
    except Exception as e:
        # SECURITY: weights_only=False uses full pickle deserialisation, which
        # can execute arbitrary code embedded in the file. Kept as a deliberate
        # fallback (e.g. checkpoints the restricted loader rejects); it is only
        # acceptable as long as MODEL_URL points to a trusted file.
        print(f"⚠️ Chargement weights_only=True impossible ({e}); repli sur weights_only=False")
        state = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=False)

    # Some checkpoints wrap the weights in a {"state_dict": ...} container.
    if isinstance(state, dict) and "state_dict" in state:
        state = state["state_dict"]

    model.load_state_dict(state)
    model.eval()
    return model


# Loaded once at import time; the separation code reads this module-level model.
model = download_model_if_needed()


def download_with_ytdlp(query):
    """Search YouTube with yt-dlp and download the first hit as a WAV file.

    Args:
        query: free-text search string.

    Returns:
        ``(wav_path, title)`` on success, ``(None, None)`` on any failure.
        The temporary download directory is removed when nothing usable was
        produced; on success it is left in place because it holds the file.
    """
    temp_dir = tempfile.mkdtemp()

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': os.path.join(temp_dir, 'song.%(ext)s'),
        'quiet': True,
        'no_warnings': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }],
        'default_search': 'ytsearch1',
        'noplaylist': True,
        'socket_timeout': 30,
        'retries': 3,
        'extractor_args': {
            'youtube': {
                'player_client': ['android', 'tv_embedded', 'web'],
                'skip': ['dash', 'hls'],
            },
        },
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
        },
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            print(f"🔍 [yt-dlp] Recherche: {query}")
            info = ydl.extract_info(f"ytsearch:{query}", download=True)

        wav_name = next(
            (name for name in os.listdir(temp_dir) if name.endswith('.wav')),
            None,
        )
        if wav_name is not None:
            print(f"✅ [yt-dlp] Téléchargé: {wav_name}")

            # Search results arrive wrapped in 'entries'; fall back to the
            # top-level info dict, then to a generic title. `or` also covers
            # a 'title' key that is present but None.
            title = None
            if info:
                entries = info.get('entries')
                first = entries[0] if entries else None
                title = (first or info).get('title')
            return os.path.join(temp_dir, wav_name), title or 'Chanson'

        print("❌ [yt-dlp] Aucun fichier WAV produit")
    except Exception as e:
        # Best-effort downloader: any failure just hands over to the next method.
        print(f"❌ [yt-dlp] Échec: {e}")

    # Nothing usable was produced: do not leave the temp directory behind.
    shutil.rmtree(temp_dir, ignore_errors=True)
    return None, None


def search_via_invidious(query):
    """Look up a video through the configured Invidious instances.

    Instances are tried in ``INVIDIOUS_INSTANCES`` order; the first result
    that actually carries a ``videoId`` wins.

    Args:
        query: free-text search string.

    Returns:
        ``{'id', 'title', 'instance'}`` for the first usable hit, or ``None``
        when no instance returned one.
    """
    for instance in INVIDIOUS_INSTANCES:
        try:
            resp = requests.get(
                f"{instance}/api/v1/search",
                params={'q': query, 'type': 'video'},
                timeout=10,
            )
            if resp.status_code != 200:
                continue
            results = resp.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"⚠️ [Invidious] {instance} échoué: {e}")
            continue

        # An error payload may be a JSON object rather than a result list.
        if not isinstance(results, list):
            continue

        for video in results:
            # Skip entries without an id: they cannot be downloaded.
            if not isinstance(video, dict) or not video.get('videoId'):
                continue

            title = video.get('title') or 'Chanson'
            print(f"✅ [Invidious] Trouvé: {title} ({instance})")
            return {
                'id': video['videoId'],
                'title': title,
                'instance': instance,
            }

    return None


def _remove_quietly(path):
    """Delete ``path`` if possible; ``None`` and missing files are ignored."""
    if path:
        try:
            os.remove(path)
        except OSError:
            # Best-effort cleanup of a temp file; nothing useful to do on failure.
            pass


def download_from_invidious(video_info):
    """Download a video's audio stream from an Invidious instance.

    Each audio itag is tried in turn. The downloaded stream is converted with
    ffmpeg to a mono WAV at ``SR`` Hz; if the conversion does not produce a
    usable file, the raw download is returned instead.

    Args:
        video_info: dict with ``'instance'``, ``'id'`` and ``'title'`` keys,
            as returned by ``search_via_invidious``.

    Returns:
        ``(audio_path, title)`` on success, ``(None, None)`` if every itag failed.
    """
    instance = video_info['instance']
    video_id = video_info['id']

    audio_itags = [140, 251, 250, 249, 139]

    for itag in audio_itags:
        ext = '.m4a' if itag in (140, 139) else '.opus'
        raw_path = None
        try:
            audio_url = f"{instance}/latest_version?id={video_id}&itag={itag}"
            print(f"🔄 [Invidious] Téléchargement itag={itag}...")

            # `with` releases the streamed connection even on early exit.
            with requests.get(audio_url, timeout=60, stream=True) as resp:
                if resp.status_code != 200:
                    continue
                with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
                    raw_path = temp_file.name
                    for chunk in resp.iter_content(chunk_size=8192):
                        temp_file.write(chunk)
        except Exception as e:
            print(f"⚠️ [Invidious] itag={itag} échoué: {e}")
            _remove_quietly(raw_path)
            continue

        # A 200 response with an empty body is not audio; try the next itag.
        if os.path.getsize(raw_path) == 0:
            _remove_quietly(raw_path)
            continue

        # Swap only the suffix; str.replace would hit every occurrence in the path.
        wav_path = os.path.splitext(raw_path)[0] + '.wav'
        try:
            subprocess.run([
                'ffmpeg', '-i', raw_path,
                '-ar', str(SR), '-ac', '1',
                '-y', wav_path
            ], capture_output=True, timeout=60)

            if os.path.exists(wav_path) and os.path.getsize(wav_path) > 1000:
                os.remove(raw_path)
                print(f"✅ [Invidious] Audio converti: {os.path.getsize(wav_path)/1e6:.1f} MB")
                return wav_path, video_info['title']
        except Exception as e:
            print(f"⚠️ [Invidious] Conversion ffmpeg échouée: {e}")

        # Conversion failed or produced a stub: drop it and hand back the raw
        # download so the caller can still try to decode it.
        _remove_quietly(wav_path)
        return raw_path, video_info['title']

    return None, None


def download_via_cobalt(query):
    """Download a song's audio as MP3 through a Cobalt API instance.

    The video is first located with ``search_via_invidious``; each instance in
    ``COBALT_INSTANCES`` is then asked for an audio-only stream of it.

    Args:
        query: free-text search string.

    Returns:
        ``(mp3_path, title)`` on success, ``(None, None)`` otherwise.
    """
    video_info = search_via_invidious(query)
    if not video_info:
        return None, None

    video_url = f"https://youtube.com/watch?v={video_info['id']}"

    # NOTE(review): this is the `/api/json` request shape; confirm that the
    # configured instances still serve it and still answer with status
    # 'stream'.
    payload = {
        "url": video_url,
        "vCodec": "h264",
        "aFormat": "mp3",
        "isAudioOnly": True,
        "filenamePattern": "basic",
    }
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }

    for instance in COBALT_INSTANCES:
        mp3_path = None
        try:
            print(f"🔄 [Cobalt] Tentative via {instance}...")

            resp = requests.post(
                f"{instance}/api/json",
                json=payload,
                headers=headers,
                timeout=30,
            )
            if resp.status_code != 200:
                continue

            data = resp.json()
            if data.get('status') != 'stream' or not data.get('url'):
                continue

            # `with` releases the streamed connection even on early exit.
            with requests.get(data['url'], timeout=60, stream=True) as audio_resp:
                if audio_resp.status_code != 200:
                    continue
                with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
                    mp3_path = temp_file.name
                    for chunk in audio_resp.iter_content(chunk_size=8192):
                        temp_file.write(chunk)

            # An empty body is not a usable download.
            if os.path.getsize(mp3_path) > 0:
                print(f"✅ [Cobalt] Téléchargé: {os.path.getsize(mp3_path)/1e6:.1f} MB")
                return mp3_path, video_info['title']

            print(f"⚠️ [Cobalt] {instance} échoué: fichier vide")
        except Exception as e:
            print(f"⚠️ [Cobalt] {instance} échoué: {e}")

        # Reached only on failure: remove whatever partial file was created.
        if mp3_path:
            try:
                os.remove(mp3_path)
            except OSError:
                # Best-effort cleanup of a temp file.
                pass

    return None, None


def download_youtube_audio(query):
    """Fetch audio for ``query``, trying each download method in turn.

    Order: yt-dlp, then Invidious (search + download), then Cobalt. The first
    method that yields a file path wins.

    Returns:
        ``(audio_path, title)`` from the first successful method, or
        ``(None, None)`` when all three fail.
    """
    banner = '=' * 50
    print(f"\n{banner}")
    print(f"🎵 Recherche: {query}")
    print(f"{banner}\n")

    def _via_invidious(q):
        # Search first; only attempt the download when a video was found.
        found = search_via_invidious(q)
        if not found:
            return None, None
        return download_from_invidious(found)

    attempts = (
        ("📡 Tentative 1/3: yt-dlp direct...", download_with_ytdlp),
        ("\n📡 Tentative 2/3: Invidious (proxy YouTube)...", _via_invidious),
        ("\n📡 Tentative 3/3: Cobalt API...", download_via_cobalt),
    )

    for announcement, fetch in attempts:
        print(announcement)
        path, name = fetch(query)
        if path:
            return path, name

    print("\n❌ Toutes les méthodes ont échoué")
    return None, None


def separate_vocals(audio_path):
    """Extract the vocal track from an audio file with the U-Net.

    The file is loaded as mono at ``SR`` Hz and its magnitude spectrogram is
    cut into overlapping patches of ``FRAME_SIZE`` frames. The model output
    for each patch is used as a mask on that patch, overlapping results are
    averaged, and the audio is rebuilt using the phase of the mix.

    The spectrogram is zero-padded in time so that every frame is covered by
    at least one patch; this keeps short clips and the tail of a track from
    coming out silent.

    Args:
        audio_path: path of an audio file readable by ``librosa.load``.

    Returns:
        Path of a newly created temporary WAV file holding the vocals.

    Raises:
        ValueError: the file decoded to zero samples.
    """
    print("\n🧠 Extraction vocale U-Net...")

    y, _ = librosa.load(audio_path, sr=SR, mono=True)
    if len(y) == 0:
        raise ValueError("Le fichier audio est vide.")
    print(f" Audio chargé: {len(y)/SR:.1f}s @ {SR}Hz")

    stft_mix = librosa.stft(y, n_fft=N_FFT, hop_length=HOP_LENGTH)
    mix_mag_513 = np.abs(stft_mix).astype(np.float32)
    mix_phase_513 = np.angle(stft_mix).astype(np.float32)

    # Scale magnitudes by the global peak; the epsilon guards digital silence.
    mix_max = float(mix_mag_513.max() + 1e-8)
    mix_mag_513_norm = (mix_mag_513 / mix_max).astype(np.float32)

    # Drop the last (Nyquist) bin; it is re-added as zeros before the inverse
    # STFT.
    mix_mag_512 = mix_mag_513_norm[:-1, :]
    F, T = mix_mag_512.shape
    print(f" Spectrogramme: {F}x{T}")

    # Number of patches needed to cover all T frames (ceil division), and the
    # padded length at which the last patch ends exactly on the final frame.
    if T <= FRAME_SIZE:
        n_patches = 1
    else:
        n_patches = -(-(T - FRAME_SIZE) // STRIDE_FRAMES) + 1
    T_pad = FRAME_SIZE + (n_patches - 1) * STRIDE_FRAMES
    mix_padded = np.pad(mix_mag_512, ((0, 0), (0, T_pad - T)))
    print(f" Traitement de {n_patches} patches...")

    voc_acc = np.zeros((F, T_pad), dtype=np.float32)
    weight = np.zeros((F, T_pad), dtype=np.float32)

    with torch.no_grad():
        for t0 in range(0, T_pad - FRAME_SIZE + 1, STRIDE_FRAMES):
            mix_patch = mix_padded[:, t0:t0 + FRAME_SIZE]
            mix_t = torch.from_numpy(mix_patch).unsqueeze(0).unsqueeze(0).to(DEVICE)

            # assumes the model returns a mask shaped like its input, so that
            # squeeze() leaves an (F, FRAME_SIZE) array — TODO confirm
            mask_patch = model(mix_t).squeeze().cpu().numpy().astype(np.float32)

            voc_acc[:, t0:t0 + FRAME_SIZE] += mask_patch * mix_patch
            weight[:, t0:t0 + FRAME_SIZE] += 1.0

    # Average the overlapping patches, then crop the padding away.
    voc_norm_512 = (voc_acc / np.maximum(weight, 1.0))[:, :T]

    # Undo the normalisation, restore the Nyquist row and reuse the mix phase.
    voc_mag_512 = voc_norm_512 * mix_max
    nyquist_row = np.zeros((1, T), dtype=np.float32)
    voc_mag_513 = np.vstack([voc_mag_512, nyquist_row])
    voc_stft_513 = voc_mag_513 * np.exp(1j * mix_phase_513)

    voc_audio = librosa.istft(
        voc_stft_513,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        win_length=N_FFT,
        length=len(y),
    )

    # NamedTemporaryFile creates the file atomically; tempfile.mktemp only
    # returns a name and is deprecated because of the resulting race.
    with tempfile.NamedTemporaryFile(suffix='_vocals.wav', delete=False) as out_file:
        output_path = out_file.name
    sf.write(output_path, voc_audio, SR)

    print(f"✅ Voix extraites: {output_path}")
    return output_path


def process_song(query, progress=gr.Progress()):
    """Run the full pipeline for a song title: search, download, extract.

    Args:
        query: song title typed by the user.
        progress: Gradio progress tracker; the ``gr.Progress()`` default is
            how Gradio injects it into event handlers.

    Returns:
        ``(original_audio_path, vocals_path, status_markdown)``. Both paths
        are ``None`` whenever the pipeline did not complete.
    """
    # Also covers a None query, which has no .strip().
    if not query or not query.strip():
        return None, None, "❌ Veuillez entrer un titre de chanson"
    query = query.strip()

    try:
        progress(0.1, desc="🔍 Recherche YouTube...")
        audio_path, title = download_youtube_audio(query)

        if not audio_path:
            return None, None, (
                "❌ **Impossible de télécharger la chanson**\n"
                "\n"
                "Les serveurs YouTube semblent bloqués. Essayez:\n"
                "- Un titre différent\n"
                "- D'uploader directement un fichier MP3"
            )

        # A downloader may report no title; do not let that fail a successful
        # download when the title is sliced below.
        title = title or "Chanson"

        progress(0.5, desc=f"✅ Trouvé: {title[:30]}...")

        progress(0.6, desc="🧠 Extraction vocale U-Net...")
        vocals_path = separate_vocals(audio_path)

        progress(1.0, desc="✅ Terminé!")

        return audio_path, vocals_path, (
            "✅ **Traitement réussi!**\n"
            "\n"
            f"🎵 **{title}**\n"
            f"📊 Modèle: U-Net ({DEVICE.upper()})\n"
            f"🎚️ Sample rate: {SR} Hz"
        )

    except Exception as e:
        # UI boundary: log the traceback and show the error instead of crashing.
        traceback.print_exc()
        return None, None, f"❌ Erreur: {str(e)}"


def process_uploaded_file(audio_file, progress=gr.Progress()):
    """Extract the vocals from a user-uploaded audio file.

    Args:
        audio_file: path of the uploaded file, or ``None`` if nothing was
            uploaded.
        progress: Gradio progress tracker.

    Returns:
        ``(vocals_path, status_markdown)``; the path is ``None`` on failure.
    """
    if audio_file is None:
        return None, "❌ Veuillez uploader un fichier audio"

    try:
        progress(0.3, desc="🧠 Extraction vocale...")
        extracted = separate_vocals(audio_file)
        progress(1.0, desc="✅ Terminé!")
    except Exception as err:
        # UI boundary: log the traceback and report the error in the status box.
        traceback.print_exc()
        return None, f"❌ Erreur: {str(err)}"
    else:
        return extracted, f"""✅ **Extraction terminée!**

📊 Modèle: U-Net ({DEVICE.upper()})
🎚️ Sample rate: {SR} Hz"""


# Gradio interface: one tab that searches YouTube by title, one tab for
# direct file upload, and an information footer.
# NOTE(review): the nesting of the layout blocks below was reconstructed from
# a source whose indentation was lost — confirm it matches the intended layout.
with gr.Blocks(title="🎵 ACAPPELLA - Extracteur Vocal", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# 🎵 ACAPPELLA - Extracteur Vocal U-Net
### *Extrayez les voix des chansons avec l'IA*
---
""")

    with gr.Tabs():

        # Tab 1: search by title, download, then extract.
        with gr.TabItem("🔍 Recherche YouTube"):
            with gr.Row():
                # Left column: instructions, query box and trigger button.
                with gr.Column(scale=1):
                    gr.Markdown("""
**Comment utiliser:**
1. Entrez un titre de chanson
2. Cliquez sur 'Extraire'
3. Écoutez et téléchargez

⚠️ *Si YouTube est bloqué, utilisez l'onglet "Upload"*
""")

                    query_input = gr.Textbox(
                        label="🎤 Titre de la chanson",
                        placeholder="Ex: The Weeknd - Blinding Lights",
                        lines=2
                    )

                    extract_btn = gr.Button("🚀 Extraire les voix", variant="primary", size="lg")

                # Right column: status message and the two audio players.
                with gr.Column(scale=2):
                    status_yt = gr.Markdown("**Status:** En attente...")

                    with gr.Row():
                        original_audio = gr.Audio(label="🎧 Original", type="filepath")
                        vocals_audio = gr.Audio(label="🎤 Voix Extraites", type="filepath")

            # Example queries offered for the query box.
            gr.Examples(
                examples=[
                    ["Adele - Hello"],
                    ["Michael Jackson - Billie Jean"],
                    ["Queen - Bohemian Rhapsody"],
                    ["Ed Sheeran - Shape of You"],
                    ["Daft Punk - Get Lucky"]
                ],
                inputs=[query_input],
                label="🎵 Exemples rapides"
            )

            # process_song returns (original path, vocals path, status text),
            # in the same order as `outputs`.
            extract_btn.click(
                fn=process_song,
                inputs=[query_input],
                outputs=[original_audio, vocals_audio, status_yt]
            )

        # Tab 2: extract vocals from a file the user uploads.
        with gr.TabItem("📤 Upload Manuel"):
            gr.Markdown("""
### Upload direct
Si la recherche YouTube ne fonctionne pas, uploadez votre fichier audio ici.

**Formats supportés:** MP3, WAV, M4A, OGG, FLAC
""")

            with gr.Row():
                with gr.Column():
                    audio_upload = gr.Audio(
                        label="📁 Fichier audio",
                        type="filepath",
                        sources=["upload"]
                    )
                    upload_btn = gr.Button("🚀 Extraire les voix", variant="primary")

                with gr.Column():
                    status_upload = gr.Markdown("**Status:** En attente d'un fichier...")
                    vocals_upload = gr.Audio(label="🎤 Voix Extraites", type="filepath")

            # process_uploaded_file returns (vocals path, status text), in the
            # same order as `outputs`.
            upload_btn.click(
                fn=process_uploaded_file,
                inputs=[audio_upload],
                outputs=[vocals_upload, status_upload]
            )

    # Footer; the device name is spliced in when the UI is built.
    gr.Markdown("""
---
### ℹ️ Informations
- **Modèle:** U-Net entraîné pour la séparation vocale
- **Device:** """ + DEVICE.upper() + """
- **Usage:** Recherche uniquement

*Les méthodes de téléchargement utilisent des proxies (Invidious) si YouTube direct échoue.*
""")


if __name__ == "__main__":
    # Startup banner summarising the runtime configuration.
    startup_lines = (
        "🚀 Démarrage ACAPPELLA...",
        f"🔧 Device: {DEVICE}",
        f"🎯 Sample rate: {SR} Hz",
        f"📊 Modèle chargé: {MODEL_PATH}",
    )
    for line in startup_lines:
        print(line)

    # Smoke test: push one random 512x128 patch through the model before
    # starting the server.
    probe = torch.randn(1, 1, 512, 128).to(DEVICE)
    with torch.no_grad():
        probe_out = model(probe)
    print(f"✅ Test U-Net: input {probe.shape} → output {probe_out.shape}")

    # Listen on all interfaces, port 7860, without a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    demo.launch(**launch_options)