Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import os | |
| import subprocess | |
| import spaces | |
| import torch | |
| from typing import Tuple, List, Dict | |
| from pydub import AudioSegment | |
| from rich.console import Console | |
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn | |
| from rich.text import Text | |
| import time | |
| import shutil | |
| # Get the terminal width, or use a default if not available | |
| terminal_width = shutil.get_terminal_size((80, 20)).columns | |
| # Create a console with a specific width | |
| console = Console(width=min(terminal_width, 100)) # Limit to 100 columns max | |
| def fade_text(text, duration=0.5): | |
| for i in range(10): | |
| opacity = i / 10 | |
| yield f"<div style='opacity: {opacity}; transition: opacity 0.05s;'>{text}</div>" | |
| time.sleep(duration / 10) | |
| def inference(audio_file: str, model_name: str, vocals: bool, drums: bool, bass: bool, other: bool, mp3: bool, mp3_bitrate: int) -> Tuple[str, List[str], gr.HTML]: | |
| log_messages = [] | |
| def stream_log(message, style=""): | |
| formatted_message = f"[{model_name}] {message}" | |
| log_messages.append(formatted_message) | |
| for frame in fade_text(f"<pre style='margin-bottom: 0;{style}'>{formatted_message}</pre>"): | |
| yield None, None, gr.HTML(frame) | |
| yield from stream_log("Initializing Demucs...", "color: #4CAF50; font-weight: bold;") | |
| time.sleep(1) # Simulate initialization time | |
| yield from stream_log("Loading audio file...", "color: #2196F3;") | |
| time.sleep(0.5) # Simulate loading time | |
| if audio_file is None: | |
| yield from stream_log("Error: No audio file provided", "color: #F44336;") | |
| raise gr.Error("Please upload an audio file") | |
| # Use absolute paths | |
| base_output_dir = os.path.abspath("separated") | |
| output_dir = os.path.join(base_output_dir, model_name, os.path.splitext(os.path.basename(audio_file))[0]) | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Check if CUDA is available | |
| cuda_available = torch.cuda.is_available() | |
| device = "cuda" if cuda_available else "cpu" | |
| yield from stream_log(f"Using device: {device}", "color: #4CAF50; font-weight: bold;") | |
| # Construct the Demucs command with full paths and GPU flag | |
| cmd = [ | |
| "python", "-m", "demucs", | |
| "--out", base_output_dir, | |
| "-n", model_name, | |
| "--device", device, | |
| audio_file | |
| ] | |
| yield from stream_log("Preparing separation process...", "color: #FF9800;") | |
| time.sleep(0.5) # Simulate preparation time | |
| try: | |
| # Set CUDA_VISIBLE_DEVICES environment variable | |
| env = os.environ.copy() | |
| if cuda_available: | |
| env["CUDA_VISIBLE_DEVICES"] = "0" # Use the first GPU | |
| # Run the Demucs command | |
| process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, env=env) | |
| # Simulate a loading animation with adjusted width | |
| progress_width = min(terminal_width - 20, 60) # Adjust the width of the progress bar | |
| with Progress( | |
| SpinnerColumn(), | |
| TextColumn("[progress.description]{task.description}"), | |
| BarColumn(bar_width=progress_width), | |
| TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), | |
| TimeRemainingColumn(), | |
| console=console | |
| ) as progress: | |
| task = progress.add_task("[cyan]Separating stems...", total=100) | |
| while process.poll() is None: | |
| progress.update(task, advance=1) | |
| time.sleep(0.1) | |
| progress.update(task, completed=100) | |
| if process.returncode != 0: | |
| error_output = process.stderr.read() | |
| yield from stream_log(f"Error: Separation failed", "color: #F44336;") | |
| raise gr.Error(f"Demucs separation failed. Check the logs for details.") | |
| except Exception as e: | |
| yield from stream_log(f"Unexpected error: {str(e)}", "color: #F44336;") | |
| raise gr.Error(f"An unexpected error occurred: {str(e)}") | |
| yield from stream_log("Separation completed successfully!", "color: #4CAF50; font-weight: bold;") | |
| time.sleep(0.5) # Pause for effect | |
| yield from stream_log("Processing stems...", "color: #9C27B0;") | |
| time.sleep(0.5) # Simulate processing time | |
| # Change the stem search directory using full path | |
| stem_search_dir = os.path.join(base_output_dir, model_name, os.path.splitext(os.path.basename(audio_file))[0]) | |
| yield from stream_log(f"Searching for stems in: {stem_search_dir}") | |
| stems: Dict[str, str] = {} | |
| for stem in ["vocals", "drums", "bass", "other"]: | |
| stem_path = os.path.join(stem_search_dir, f"{stem}.wav") | |
| yield from stream_log(f"Checking for {stem} stem at: {stem_path}") | |
| if os.path.exists(stem_path): | |
| stems[stem] = stem_path | |
| yield from stream_log(f"Found {stem} stem") | |
| else: | |
| yield from stream_log(f"Warning: {stem} stem not found") | |
| if not stems: | |
| yield from stream_log("Error: No stems found. Checking alternative directory...") | |
| stem_search_dir = os.path.join(base_output_dir, model_name) | |
| for stem in ["vocals", "drums", "bass", "other"]: | |
| stem_path = os.path.join(stem_search_dir, f"{stem}.wav") | |
| yield from stream_log(f"Checking for {stem} stem at: {stem_path}") | |
| if os.path.exists(stem_path): | |
| stems[stem] = stem_path | |
| yield from stream_log(f"Found {stem} stem") | |
| else: | |
| yield from stream_log(f"Warning: {stem} stem not found") | |
| yield from stream_log(f"All found stems: {list(stems.keys())}") | |
| selected_stems: List[str] = [] | |
| for stem, selected in zip(["vocals", "drums", "bass", "other"], [vocals, drums, bass, other]): | |
| if selected: | |
| yield from stream_log(f"{stem} is selected by user") | |
| if stem in stems: | |
| selected_stems.append(stems[stem]) | |
| yield from stream_log(f"Selected {stem} stem for mixing") | |
| else: | |
| yield from stream_log(f"Warning: {stem} was selected but not found") | |
| yield from stream_log(f"Final selected stems: {selected_stems}") | |
| if not selected_stems: | |
| yield from stream_log("Error: No stems selected for mixing", "color: #F44336;") | |
| raise gr.Error("Please select at least one stem to mix and ensure it was successfully separated.") | |
| output_file: str = os.path.join(output_dir, "mixed.wav") | |
| yield from stream_log("Mixing selected stems...", "color: #FF5722;") | |
| time.sleep(0.5) # Simulate mixing time | |
| if selected_stems: | |
| # Load the first stem as the base | |
| mixed_audio: AudioSegment = AudioSegment.from_wav(selected_stems[0]) | |
| # Overlay the remaining stems | |
| for stem_path in selected_stems[1:]: | |
| overlay_audio = AudioSegment.from_wav(stem_path) | |
| # Ensure both segments have the same duration | |
| max_length = max(len(mixed_audio), len(overlay_audio)) | |
| if len(mixed_audio) < max_length: | |
| mixed_audio += AudioSegment.silent(duration=max_length - len(mixed_audio)) | |
| if len(overlay_audio) < max_length: | |
| overlay_audio += AudioSegment.silent(duration=max_length - len(overlay_audio)) | |
| # Overlay the audio | |
| mixed_audio = mixed_audio.overlay(overlay_audio) | |
| # Export the mixed audio | |
| mixed_audio.export(output_file, format="wav") | |
| else: | |
| yield from stream_log("Error: No stems to mix", "color: #F44336;") | |
| raise gr.Error("No stems were selected or found for mixing.") | |
| if mp3: | |
| yield from stream_log(f"Converting to MP3...", "color: #795548;") | |
| time.sleep(0.5) # Simulate conversion time | |
| mp3_output_file: str = os.path.splitext(output_file)[0] + ".mp3" | |
| mixed_audio.export(mp3_output_file, format="mp3", bitrate=str(mp3_bitrate) + "k") | |
| output_file = mp3_output_file | |
| yield from stream_log("Process completed successfully!", "color: #4CAF50; font-weight: bold;") | |
| yield output_file, list(stems.values()), gr.HTML("<pre style='color: green; font-weight: bold;'>Separation and mixing completed successfully!</pre>") | |
| # Define the Gradio interface | |
| with gr.Blocks() as iface: | |
| gr.Markdown("# Demucs Music Source Separation and Mixing") | |
| gr.Markdown("Separate vocals, drums, bass, and other instruments from your music using Demucs and mix the selected stems.") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| audio_input = gr.Audio(type="filepath", label="Upload Audio File") | |
| model_dropdown = gr.Dropdown( | |
| ["htdemucs", "htdemucs_ft", "htdemucs_6s", "hdemucs_mmi", "mdx", "mdx_extra", "mdx_q", "mdx_extra_q"], | |
| label="Model Name", | |
| value="htdemucs_ft" | |
| ) | |
| with gr.Row(): | |
| vocals_checkbox = gr.Checkbox(label="Vocals", value=True) | |
| drums_checkbox = gr.Checkbox(label="Drums", value=True) | |
| with gr.Row(): | |
| bass_checkbox = gr.Checkbox(label="Bass", value=True) | |
| other_checkbox = gr.Checkbox(label="Other", value=True) | |
| mp3_checkbox = gr.Checkbox(label="Save as MP3", value=False) | |
| mp3_bitrate = gr.Slider(128, 320, step=32, label="MP3 Bitrate", visible=False) | |
| submit_btn = gr.Button("Process", variant="primary") | |
| with gr.Column(scale=1): | |
| output_audio = gr.Audio(type="filepath", label="Processed Audio (Mixed)") | |
| stems_output = gr.File(label="Individual Stems", file_count="multiple") | |
| separation_log = gr.HTML() | |
| submit_btn.click( | |
| fn=inference, | |
| inputs=[audio_input, model_dropdown, vocals_checkbox, drums_checkbox, bass_checkbox, other_checkbox, mp3_checkbox, mp3_bitrate], | |
| outputs=[output_audio, stems_output, separation_log] | |
| ) | |
| mp3_checkbox.change( | |
| fn=lambda mp3: gr.update(visible=mp3), | |
| inputs=mp3_checkbox, | |
| outputs=mp3_bitrate | |
| ) | |
| # Launch the Gradio interface | |
| iface.launch() | |