Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import shutil | |
| import zipfile | |
| import subprocess | |
| import tempfile | |
| import logging | |
| from pathlib import Path | |
| import gradio as gr | |
| import numpy as np | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Logging | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(message)s", | |
| ) | |
| log = logging.getLogger("vocalclean-gradio") | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Directories | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| BASE_DIR = Path(__file__).parent | |
| OUTPUTS_DIR = BASE_DIR / "outputs" | |
| ASSETS_DIR = BASE_DIR / "assets" | |
| OUTPUTS_DIR.mkdir(exist_ok=True) | |
| ASSETS_DIR.mkdir(exist_ok=True) | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Constants | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| MAX_FILE_SIZE_MB = 100 | |
| MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024 | |
| ALLOWED_EXTENSIONS = {".mp3", ".wav", ".m4a", ".flac", ".ogg"} | |
| DEMUCS_MODEL = "htdemucs" | |
| STEM_META = { | |
| "vocals": {"label": "Vocals", "color": "#4F46E5", "icon": "π€"}, | |
| "drums": {"label": "Drums", "color": "#EF4444", "icon": "π₯"}, | |
| "bass": {"label": "Bass", "color": "#8B5CF6", "icon": "πΈ"}, | |
| "other": {"label": "Other / Melody", "color": "#F59E0B", "icon": "πΉ"}, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # GPU Detection | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def detect_device() -> str: | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| name = torch.cuda.get_device_name(0) | |
| log.info(f"GPU detected: {name}") | |
| return "cuda" | |
| except Exception: | |
| pass | |
| log.info("No GPU β running on CPU") | |
| return "cpu" | |
| DEVICE = detect_device() | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # FFmpeg Preprocessing | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def preprocess_audio(input_path: Path, output_path: Path) -> Path: | |
| """Normalise to WAV, stereo, 44.1 kHz before Demucs.""" | |
| cmd = [ | |
| "ffmpeg", "-y", | |
| "-i", str(input_path), | |
| "-ac", "2", | |
| "-ar", "44100", | |
| "-sample_fmt", "s16", | |
| "-f", "wav", | |
| str(output_path), | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"FFmpeg failed: {result.stderr[-400:]}") | |
| return output_path | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Demucs Separation | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def run_demucs(input_path: Path, output_dir: Path, progress_cb=None) -> dict[str, Path]: | |
| """Run Demucs htdemucs and return a dict of stem_name β wav path.""" | |
| if progress_cb: | |
| progress_cb(0.1, "Preprocessing audio...") | |
| preprocessed = input_path.parent / f"pre_{input_path.stem}.wav" | |
| try: | |
| preprocess_audio(input_path, preprocessed) | |
| demucs_input = preprocessed | |
| except Exception as e: | |
| log.warning(f"FFmpeg preprocessing skipped: {e}") | |
| demucs_input = input_path | |
| if progress_cb: | |
| progress_cb(0.2, f"Running Hybrid Demucs on {DEVICE.upper()}...") | |
| cmd = [ | |
| "python3", "-m", "demucs", | |
| "--device", DEVICE, | |
| "-n", DEMUCS_MODEL, | |
| "-o", str(output_dir), | |
| str(demucs_input), | |
| ] | |
| log.info(f"Demucs command: {' '.join(cmd)}") | |
| proc = subprocess.run(cmd, capture_output=True, text=True, timeout=600) | |
| if proc.returncode != 0: | |
| error_msg = (proc.stderr or proc.stdout or "Unknown error")[-600:] | |
| log.error(f"Demucs failed: {error_msg}") | |
| raise RuntimeError(f"Demucs separation failed:\n{error_msg}") | |
| if progress_cb: | |
| progress_cb(0.85, "Collecting output stems...") | |
| stems: dict[str, Path] = {} | |
| for wav in output_dir.rglob("*.wav"): | |
| stems[wav.stem] = wav | |
| if not stems: | |
| raise RuntimeError("No output files were generated by Demucs.") | |
| # Clean up preprocessed file | |
| try: | |
| preprocessed.unlink(missing_ok=True) | |
| except Exception: | |
| pass | |
| log.info(f"Stems found: {list(stems.keys())}") | |
| return stems | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # ZIP Builder | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_zip(stems: dict[str, Path], job_dir: Path) -> Path: | |
| zip_path = job_dir / "stems.zip" | |
| with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: | |
| for name, path in stems.items(): | |
| zf.write(path, f"{name}.wav") | |
| return zip_path | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Main Processing Function | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def separate_audio(audio_file, progress=gr.Progress(track_tqdm=True)): | |
| if audio_file is None: | |
| return ( | |
| "β No file uploaded.", | |
| None, None, None, None, None, | |
| ) | |
| input_path = Path(audio_file) | |
| ext = input_path.suffix.lower() | |
| if ext not in ALLOWED_EXTENSIONS: | |
| return ( | |
| f"β Unsupported format '{ext}'. Please upload MP3, WAV, M4A, FLAC, or OGG.", | |
| None, None, None, None, None, | |
| ) | |
| file_size = input_path.stat().st_size | |
| if file_size > MAX_FILE_SIZE_BYTES: | |
| size_mb = file_size / (1024 * 1024) | |
| return ( | |
| f"β File too large ({size_mb:.1f} MB). Maximum allowed size is {MAX_FILE_SIZE_MB} MB.", | |
| None, None, None, None, None, | |
| ) | |
| job_id = str(uuid.uuid4())[:8] | |
| job_dir = OUTPUTS_DIR / job_id | |
| job_dir.mkdir(parents=True, exist_ok=True) | |
| log.info(f"Job {job_id}: processing '{input_path.name}' ({file_size / 1024:.0f} KB)") | |
| try: | |
| def update_progress(frac: float, msg: str): | |
| progress(frac, desc=msg) | |
| log.info(f"Job {job_id}: [{int(frac * 100)}%] {msg}") | |
| update_progress(0.05, "Starting AI separation β this may take 1β3 minutes on free servers...") | |
| stems = run_demucs(input_path, job_dir, progress_cb=update_progress) | |
| update_progress(0.92, "Building download archive...") | |
| zip_path = build_zip(stems, job_dir) | |
| update_progress(1.0, "β Done!") | |
| log.info(f"Job {job_id}: complete β {list(stems.keys())}") | |
| def stem_path(name: str): | |
| return str(stems[name]) if name in stems else None | |
| status = f"β Separation complete! Stems: {', '.join(stems.keys())}" | |
| return ( | |
| status, | |
| stem_path("vocals"), | |
| stem_path("drums"), | |
| stem_path("bass"), | |
| stem_path("other"), | |
| str(zip_path), | |
| ) | |
| except Exception as exc: | |
| log.exception(f"Job {job_id}: error") | |
| try: | |
| shutil.rmtree(job_dir, ignore_errors=True) | |
| except Exception: | |
| pass | |
| return ( | |
| f"β Processing failed: {exc}", | |
| None, None, None, None, None, | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Gradio Interface | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| css = """ | |
| #title { text-align: center; margin-bottom: 8px; } | |
| #subtitle { text-align: center; color: #6B7280; margin-bottom: 24px; } | |
| #status-box { border-radius: 10px; } | |
| .stem-row { gap: 16px; } | |
| footer { display: none !important; } | |
| """ | |
| with gr.Blocks( | |
| title="VocalClean AI β Music Stem Separator", | |
| theme=gr.themes.Soft( | |
| primary_hue="indigo", | |
| secondary_hue="sky", | |
| font=gr.themes.GoogleFont("Inter"), | |
| ), | |
| css=css, | |
| ) as demo: | |
| gr.HTML(""" | |
| <h1 id="title" style="font-size:2rem;font-weight:700;"> | |
| π΅ VocalClean AI | |
| </h1> | |
| <p id="subtitle"> | |
| Separate music into individual stems using Hybrid Demucs AI | |
| | Vocals Β· Drums Β· Bass Β· Other | |
| </p> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### π€ Upload Audio") | |
| audio_input = gr.Audio( | |
| label="Drop your audio file here", | |
| type="filepath", | |
| sources=["upload"], | |
| ) | |
| gr.Markdown( | |
| "_Supported: MP3, WAV, M4A, FLAC, OGG β up to 100 MB_", | |
| elem_classes=["upload-hint"], | |
| ) | |
| run_btn = gr.Button( | |
| "π Separate Stems", | |
| variant="primary", | |
| size="lg", | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### π Processing Status") | |
| status_out = gr.Textbox( | |
| label="Status", | |
| interactive=False, | |
| placeholder="Upload a file and click 'Separate Stems' to begin...", | |
| lines=3, | |
| elem_id="status-box", | |
| ) | |
| gr.Markdown( | |
| "β±οΈ _Processing may take **1β3 minutes** on free CPU servers. " | |
| "GPU environments run significantly faster._" | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown("### π§ Stem Results") | |
| with gr.Row(elem_classes=["stem-row"]): | |
| with gr.Column(): | |
| gr.Markdown("#### π€ Vocals") | |
| vocals_out = gr.Audio(label="Vocals", type="filepath", interactive=False) | |
| with gr.Column(): | |
| gr.Markdown("#### π₯ Drums") | |
| drums_out = gr.Audio(label="Drums", type="filepath", interactive=False) | |
| with gr.Row(elem_classes=["stem-row"]): | |
| with gr.Column(): | |
| gr.Markdown("#### πΈ Bass") | |
| bass_out = gr.Audio(label="Bass", type="filepath", interactive=False) | |
| with gr.Column(): | |
| gr.Markdown("#### πΉ Other / Melody") | |
| other_out = gr.Audio(label="Other", type="filepath", interactive=False) | |
| gr.Markdown("---") | |
| gr.Markdown("### π¦ Download") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| zip_out = gr.File( | |
| label="Download All Stems (ZIP)", | |
| interactive=False, | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown( | |
| "Each stem is exported as a high-quality **WAV** file. " | |
| "The ZIP archive contains all separated tracks." | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown( | |
| "<center><small>Powered by " | |
| "[Hybrid Demucs](https://github.com/facebookresearch/demucs) " | |
| "by Meta Research Β· " | |
| "Built with [Gradio](https://gradio.app)</small></center>" | |
| ) | |
| run_btn.click( | |
| fn=separate_audio, | |
| inputs=[audio_input], | |
| outputs=[status_out, vocals_out, drums_out, bass_out, other_out, zip_out], | |
| show_progress="full", | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Launch | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=int(os.environ.get("PORT", 7860)), | |
| share=False, | |
| show_error=True, | |
| ) | |