Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import soundfile as sf | |
| import subprocess | |
| import tempfile | |
| import os | |
| import gradio as gr | |
| from scipy import signal | |
| # ========== Processing Functions ========== | |
def convert_to_wav_float(input_file):
    """
    Convert any input audio to a 32-bit float WAV to preserve full dynamic range.

    Args:
        input_file: Path of the audio file to decode; handed to ffmpeg as-is.

    Returns:
        str: Path of a newly created temporary ``.wav`` file. The caller owns
        the file and is responsible for deleting it.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    # Only a unique path is needed here: ffmpeg overwrites the file itself
    # ("-y"), so the Python handle is closed straight away.
    temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    temp_wav.close()
    try:
        # pcm_f32le stores 32-bit little-endian *float* samples, so values
        # outside [-1, 1] survive the conversion without clipping.
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", input_file,
                "-c:a", "pcm_f32le", "-f", "wav", temp_wav.name,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except Exception:
        # The caller never learns the path when decoding fails, so remove the
        # reserved file here instead of leaking it.
        if os.path.exists(temp_wav.name):
            os.unlink(temp_wav.name)
        raise
    return temp_wav.name
def apply_reverb_wet_only(audio, samplerate, reverb_args):
    """
    Apply wet-only reverb using SoX to a single channel with custom reverb args.

    Args:
        audio: Samples to process; written to disk with ``sf.write``.
        samplerate: Sample rate used when writing the temporary input file.
        reverb_args: List of string arguments appended after ``reverb -w``
            on the SoX command line.

    Returns:
        The processed audio as read back from SoX's output file (float32).

    Raises:
        subprocess.CalledProcessError: If SoX exits with a non-zero status.
    """
    scratch_paths = []
    try:
        # Reserve two unique paths (input, output) and close the handles so
        # soundfile and SoX can open the files on their own.
        for _ in range(2):
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
                scratch_paths.append(handle.name)
        src_path, dst_path = scratch_paths

        sf.write(src_path, audio, samplerate, subtype='FLOAT')
        subprocess.run(
            ["sox", src_path, dst_path, "reverb", "-w"] + reverb_args,
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
        )
        wet, _ = sf.read(dst_path, dtype='float32')
    finally:
        # Clean up on success *and* failure so a broken SoX run does not
        # leave temp files behind.
        for path in scratch_paths:
            if os.path.exists(path):
                os.unlink(path)
    return wet
def sox_filter(audio, samplerate, filter_type, cutoff):
    """
    Apply highpass or lowpass filter via SoX.

    Args:
        audio: Samples to filter; written to disk with ``sf.write``.
        samplerate: Sample rate used when writing the temporary input file.
        filter_type: 'highpass' or 'lowpass'; passed to SoX as the effect name.
        cutoff: Cutoff in Hz; converted with ``str()`` for the command line.

    Returns:
        The filtered audio as read back from SoX's output file (float32).

    Raises:
        subprocess.CalledProcessError: If SoX exits with a non-zero status.
    """
    scratch_paths = []
    try:
        # Reserve two unique paths (input, output) and close the handles so
        # soundfile and SoX can open the files on their own.
        for _ in range(2):
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
                scratch_paths.append(handle.name)
        src_path, dst_path = scratch_paths

        sf.write(src_path, audio, samplerate, subtype='FLOAT')
        subprocess.run(
            ["sox", src_path, dst_path, filter_type, str(cutoff)],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
        )
        out, _ = sf.read(dst_path, dtype='float32')
    finally:
        # Clean up on success *and* failure so a broken SoX run does not
        # leave temp files behind.
        for path in scratch_paths:
            if os.path.exists(path):
                os.unlink(path)
    return out
def extract_phantom_center(input_file, rdf=0.99999):
    """
    Returns FL (front left without centre), FR, and FC (phantom centre).

    The file is decoded to float WAV, both channels and their mid signal are
    transformed with an STFT, and the centre is estimated per bin as the
    smaller of the two channel magnitudes combined with the phase of the mid
    signal. That estimate, scaled by ``rdf``, is subtracted from each side.

    Args:
        input_file: Path of a stereo audio file.
        rdf: Scale applied to the centre estimate before it is subtracted
            from the left and right spectra.

    Returns:
        tuple: ``(fs, FL, FR, FC)`` where ``fs`` is the sample rate and the
        three arrays are trimmed to the length of the input channels.

    Raises:
        gr.Error: If the input is not 2-channel stereo or holds fewer than
            two samples.
    """
    wav = convert_to_wav_float(input_file)
    try:
        data, fs = sf.read(wav, dtype='float32')
    finally:
        # Remove the decoded temp file even when reading it fails.
        os.unlink(wav)

    if data.ndim != 2 or data.shape[1] != 2:
        raise gr.Error("Input must be stereo 2-channel")
    n_samples = data.shape[0]
    if n_samples < 2:
        raise gr.Error("Input audio is too short to process")

    L, R = data[:, 0], data[:, 1]
    M = (L + R) / 2

    # One-second analysis window, clamped to the signal length: with a window
    # longer than the signal, stft() shrinks its segment size (or rejects the
    # overlap) and the matching istft() call no longer lines up.
    nperseg = min(fs, n_samples)
    noverlap = nperseg // 2
    stft_opts = dict(fs=fs, nperseg=nperseg, noverlap=noverlap)

    _, _, ZL = signal.stft(L, **stft_opts)
    _, _, ZR = signal.stft(R, **stft_opts)
    _, _, ZM = signal.stft(M, **stft_opts)

    # Centre estimate: per-bin minimum magnitude of L/R, phase of the mid.
    Zc = np.minimum(np.abs(ZL), np.abs(ZR)) * np.exp(1j * np.angle(ZM))
    Zl_res = ZL - Zc * rdf
    Zr_res = ZR - Zc * rdf

    _, FL = signal.istft(Zl_res, **stft_opts)
    _, FR = signal.istft(Zr_res, **stft_opts)
    _, FC = signal.istft(Zc, **stft_opts)

    # Trim any extra samples so the outputs are no longer than the input.
    return fs, FL[:n_samples], FR[:n_samples], FC[:n_samples]
def create_5_1_surround(input_file, preset="music"):
    """
    Upmix a stereo file to a 6-channel (5.1) Ogg Vorbis file.

    Front left/right and centre come from ``extract_phantom_center``, the
    surrounds are wet-only reverb of the original left/right channels, and
    the LFE is a lowpassed average of both channels. Every channel except the
    LFE is highpassed.

    Args:
        input_file: Path of a stereo audio file.
        preset: One of "music", "speech" or "open"; selects the reverb
            settings used for the surround channels.

    Returns:
        str: Path of a temporary ``.ogg`` file with the 5.1 mix. The file is
        not deleted by this function.

    Raises:
        gr.Error: If ``preset`` is not a known preset name.
        subprocess.CalledProcessError: If ffmpeg or SoX fails.
    """
    print("Starting Normal Processing")
    # NOTE(review): Gradio documents progress tracking through a
    # `progress=gr.Progress()` default argument; an instance created inside
    # the body may not be wired to the UI -- confirm.
    p = gr.Progress()

    # Preset-based parameters
    # Reverberance (50%) HF-damping (50%) room-scale (100%) stereo-depth (100%) pre-delay (0ms) wet-gain (0dB)
    reverb_presets = {
        "music": ['70', '40', '100', '95', '10', '-2'],
        "speech": ['50', '99', '50', '70', '0', '0'],
        "open": ['20', '50', '100', '100', '100', '0'],
    }
    if preset not in reverb_presets:
        raise gr.Error(f"Unknown preset: {preset}")
    reverb_args = reverb_presets[preset]
    # All presets share the same crossover frequencies (Hz).
    hp_cutoff = 120
    lfe_cutoff = 120

    p((1,7),"Extracting Centre")# 1. Extract FL/FR/phantom centre
    fs, FL, FR, FC = extract_phantom_center(input_file)

    p((2,7),"Getting File")# 2. Get stereo original for reverb
    wav = convert_to_wav_float(input_file)
    try:
        stereo, _ = sf.read(wav, dtype='float32')
    finally:
        # Remove the decoded temp file even when reading it fails.
        os.unlink(wav)
    L_orig, R_orig = stereo[:, 0], stereo[:, 1]

    p((3,7),"Reverb For Rear")# 3. Wet-only reverb with chosen settings
    SL = apply_reverb_wet_only(L_orig, fs, reverb_args)
    SR = apply_reverb_wet_only(R_orig, fs, reverb_args)

    p((4,7),"Highpassing")# 4. Highpass filter everything except LFE
    FL_hp, FR_hp, FC_hp, SL_hp, SR_hp = (
        sox_filter(channel, fs, 'highpass', hp_cutoff)
        for channel in (FL, FR, FC, SL, SR)
    )

    p((5,7),"Extracting LFE")# 5. Lowpass for LFE
    bass_sum = .5 * (L_orig + R_orig)
    LFE = sox_filter(bass_sum, fs, 'lowpass', lfe_cutoff)

    p((6,7),"Stacking")# 6. Stack and pad
    channels = [FL_hp, FR_hp, FC_hp, LFE, SL_hp, SR_hp]
    length = max(len(ch) for ch in channels)

    def pad(x):
        # Zero-pad at the end so every channel reaches the longest length.
        return np.pad(x, (0, length - len(x)))

    multich = np.column_stack([pad(ch) for ch in channels])

    p((7,7),"Encoding")# 7. Write WAV and encode to OGG
    # Reserve both paths up front and close the handles; soundfile and ffmpeg
    # open the files themselves.
    out_wav = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    out_wav.close()
    out_ogg = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
    out_ogg.close()
    try:
        sf.write(out_wav.name, multich, fs, subtype='FLOAT')
        subprocess.run([
            "ffmpeg", "-y", "-i", out_wav.name,
            "-c:a", "libvorbis", "-ac", "6", "-channel_layout", "5.1", out_ogg.name
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    except Exception:
        # No result to hand back: drop the unusable output file as well.
        if os.path.exists(out_ogg.name):
            os.unlink(out_ogg.name)
        raise
    finally:
        # The intermediate WAV is never needed after this point.
        if os.path.exists(out_wav.name):
            os.unlink(out_wav.name)
    return out_ogg.name
| import mimetypes | |
| import requests | |
| import time | |
def send_mvsep_audio_job(
    api_token: str,
    audio_bytes: bytes,
    filename: str,
    sep_type: int = 34,
    output_format: int = 2,
    addopt1: str = None,
    addopt2: str = None,
    poll_interval_sec: int = 5
):
    """
    Send audio to MVSep for source separation and wait for the result.

    Args:
        api_token (str): Your API token.
        audio_bytes (bytes): Audio data (any format).
        filename (str): Original filename, used for extension/MIME type.
        sep_type (int): Separation type (e.g., 34 for karaoke).
        output_format (int): Output format (e.g., 2 for FLAC).
        addopt1: Optional extra parameter 1. Any value other than ``None``
            is sent (converted with ``str()``), including ``0``.
        addopt2: Optional extra parameter 2; same rule as ``addopt1``.
        poll_interval_sec (int): How often to check job status.

    Returns:
        dict: Completed result data from mvsep.com (including file URLs).

    Raises:
        gr.Error: If the API rejects the job or reports it as failed or
            not found.
        requests.RequestException: On HTTP errors or timeouts.
    """
    # Bound every HTTP call so a stalled connection cannot hang the worker.
    upload_timeout_sec = 300
    poll_timeout_sec = 60

    # Step 1: Determine MIME type
    mime_type, _ = mimetypes.guess_type(filename)
    if not mime_type:
        mime_type = "application/octet-stream"  # fallback

    # Step 2: Prepare request
    url = "https://mvsep.com/api/separation/create"
    files = {
        'audiofile': (filename, audio_bytes, mime_type)
    }
    data = {
        'api_token': api_token,
        'sep_type': str(sep_type),
        'output_format': str(output_format)
    }
    # Compare against None rather than truthiness: callers pass 0 as a real
    # option value, and a plain `if addopt1:` would silently drop it.
    if addopt1 is not None:
        data['add_opt1'] = str(addopt1)
    if addopt2 is not None:
        data['add_opt2'] = str(addopt2)

    # Step 3: Send creation request
    response = requests.post(url, files=files, data=data, timeout=upload_timeout_sec)
    response.raise_for_status()
    json_resp = response.json()
    if not json_resp.get('success'):
        error_msg = json_resp.get('data', {}).get('message', 'Unknown error')
        print(json_resp)
        raise gr.Error(f"API error: {error_msg}")
    job_hash = json_resp['data']['hash']
    print(f"Job submitted successfully. Hash: {job_hash}")

    # Step 4: Poll until job is done
    status_url = "https://mvsep.com/api/separation/get"
    while True:
        poll_resp = requests.get(
            status_url, params={'hash': job_hash}, timeout=poll_timeout_sec
        )
        poll_resp.raise_for_status()
        poll_data = poll_resp.json()
        status = poll_data.get('status')
        print(f"Job status: {status}")
        if status == 'done':
            return poll_data.get('data', {})
        if status in ('failed', 'not_found'):
            raise gr.Error(f"Job failed or not found: {poll_data.get('data', {}).get('message', '')}")
        # Any other status means the job is still queued or running.
        time.sleep(poll_interval_sec)
# Download WAV and preserve sample rate, with optional resampling to target_fs
def download_wav(url, target_fs=None):
    """
    Download an audio file and decode it to a float32 array.

    Args:
        url: Address of the audio file to fetch.
        target_fs: Optional sample rate. When given and different from the
            file's own rate, the audio is resampled to it.

    Returns:
        tuple: ``(audio, sr)`` -- the decoded samples and their sample rate
        (equal to ``target_fs`` when resampling took place).

    Raises:
        requests.RequestException: On HTTP errors or timeouts.
    """
    # Bounded so a stalled download cannot hang the worker indefinitely.
    r = requests.get(url, timeout=60)
    r.raise_for_status()

    temp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    try:
        temp.write(r.content)
        temp.close()
        audio, sr = sf.read(temp.name, dtype='float32')
    finally:
        # Close (a no-op when already closed) and remove the scratch file
        # even if the payload cannot be decoded.
        temp.close()
        os.unlink(temp.name)

    if target_fs and sr != target_fs:
        # resample if needed
        num_samples = int(len(audio) * target_fs / sr)
        audio = signal.resample(audio, num_samples)
        sr = target_fs
    return audio, sr
# Smart mode workflow
def _separate_with_mvsep(api_key, audio, fs, **job_options):
    """Upload *audio* to MVSep as 16-bit FLAC and return the finished job data."""
    flac_buf = tempfile.NamedTemporaryFile(suffix=".flac", delete=False)
    flac_buf.close()
    try:
        sf.write(flac_buf.name, audio, fs, format='FLAC', subtype='PCM_16')
        with open(flac_buf.name, 'rb') as flac_file:
            payload = flac_file.read()
    finally:
        # The bytes are in memory now; the file is no longer needed.
        os.unlink(flac_buf.name)
    return send_mvsep_audio_job(
        api_key, payload, os.path.basename(flac_buf.name), **job_options
    )


def _encode_5_1_ogg(multich, fs):
    """Write a 6-column sample array to a temp WAV and encode it as 5.1 Vorbis."""
    out_wav = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    out_wav.close()
    out_ogg = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
    # Close our handle before ffmpeg writes to the path.
    out_ogg.close()
    try:
        sf.write(out_wav.name, multich, fs, subtype='FLOAT')
        subprocess.run([
            "ffmpeg", "-y", "-i", out_wav.name,
            "-c:a", "libvorbis", "-ac", "6", "-channel_layout", "5.1", out_ogg.name
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    except Exception:
        # No result to hand back: drop the unusable output file as well.
        if os.path.exists(out_ogg.name):
            os.unlink(out_ogg.name)
        raise
    finally:
        if os.path.exists(out_wav.name):
            os.unlink(out_wav.name)
    return out_ogg.name


def smart_mode_process(input_file, api_key, multi_singer=False):
    """
    Build a 5.1 mix from a stereo file using MVSep stem separation.

    The input is split through three MVSep jobs (crowd, then
    speech/music/SFX, then vocals/instruments) and the stems are routed as:
    front L/R = phantom-centre residual of the vocals + SFX + instruments;
    centre = phantom centre of the vocals + dialog; LFE = lowpassed mono sum;
    surround L/R = reverb of the music stem + crowd (+ backing vocals when
    ``multi_singer`` is false).

    Args:
        input_file: Path of a stereo audio file.
        api_key: MVSep API key; required.
        multi_singer: When true, the full vocal stem is used for the
            front/centre channels instead of the lead stem, and backing
            vocals are not added to the surrounds.

    Returns:
        str: Path of a temporary ``.ogg`` file with the 5.1 mix.

    Raises:
        gr.Error: If no API key is given, the input is not stereo, or an
            MVSep job fails.
    """
    print("Starting Smartmode")
    # NOTE(review): Gradio documents progress tracking through a
    # `progress=gr.Progress()` default argument; an instance created inside
    # the body may not be wired to the UI -- confirm.
    p = gr.Progress()
    if not api_key:
        raise gr.Error("An MVSep API Key Is Required For This. Get your key <a href=\"https://mvsep.com/user-api\">Here</a>. it's Free!")

    # Load original
    wav = convert_to_wav_float(input_file)
    try:
        data, fs = sf.read(wav, dtype='float32')
    finally:
        os.unlink(wav)
    p((0, 8), "Loading File")
    if data.ndim != 2 or data.shape[1] != 2:
        raise gr.Error("Expected stereo input (2 channels), got something else.")
    L, R = data[:, 0], data[:, 1]

    # Step 1: LFE from lowpass
    p((1, 8), "Processing LFE")
    bass = sox_filter(0.5 * (L + R), fs, 'lowpass', 120)

    # Step 2: Highpass for crowd extraction
    p((2, 8), "Extracting Crowd")
    hp_left = sox_filter(L, fs, 'highpass', 120)
    hp_right = sox_filter(R, fs, 'highpass', 120)
    hp_stereo = np.column_stack([hp_left, hp_right])
    crowd_resp = _separate_with_mvsep(
        api_key, hp_stereo, fs, sep_type=34, output_format=2, addopt1=0
    )
    # The file indices below follow the order used by the original code;
    # presumably they match MVSep's stem ordering -- verify against the API.
    crowd, _ = download_wav(crowd_resp['files'][0]['url'], target_fs=fs)
    other_after_crowd, _ = download_wav(crowd_resp['files'][1]['url'], target_fs=fs)

    # Step 3: Speech, music, SFX separation from 'other_after_crowd'
    p((3, 8), "Separating Speech, Music, and SFX")
    demucs_resp = _separate_with_mvsep(
        api_key, other_after_crowd, fs, sep_type=24, output_format=2
    )
    dialog, _ = download_wav(demucs_resp['files'][0]['url'], target_fs=fs)
    sfx, _ = download_wav(demucs_resp['files'][2]['url'], target_fs=fs)
    music, _ = download_wav(demucs_resp['files'][1]['url'], target_fs=fs)

    # Step 4: Apply Reverb to the 'music' stem
    p((4, 8), "Applying Reverb")
    reverb_args = ['20', '50', '100', '100', '100', '0']  # open preset
    reverb_L = apply_reverb_wet_only(music[:, 0], fs, reverb_args)
    reverb_R = apply_reverb_wet_only(music[:, 1], fs, reverb_args)
    reverb = np.column_stack([reverb_L, reverb_R])

    # Step 5: Vocal Extraction from music
    p((5, 8), "Extracting Vocals")
    karaoke_resp = _separate_with_mvsep(
        api_key, music, fs, sep_type=49, output_format=2, addopt1=3, addopt2=1
    )
    vocals_full, _ = download_wav(karaoke_resp['files'][0]['url'], target_fs=fs)
    vocals_lead, _ = download_wav(karaoke_resp['files'][1]['url'], target_fs=fs)
    vocals_back, _ = download_wav(karaoke_resp['files'][2]['url'], target_fs=fs)
    instr, _ = download_wav(karaoke_resp['files'][3]['url'], target_fs=fs)

    # Step 6: Phantom center on vocals (lead or full)
    p((6, 8), "Phantom Center for Lead Vocals")
    vl_buf = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    vl_buf.close()
    try:
        sf.write(vl_buf.name, vocals_full if multi_singer else vocals_lead, fs, subtype='FLOAT')
        _, FL_vl, FR_vl, FC_vl = extract_phantom_center(vl_buf.name)
    finally:
        os.unlink(vl_buf.name)

    # Dialog goes to the centre channel; for a 2-D stem only its first
    # column is used.
    dialog_mono = dialog[:, 0] if dialog.ndim == 2 else dialog

    # Step 7: Mapping and stacking
    p((7, 8), "Mapping Channels and Encoding")

    def match_len(x, length):
        # Zero-pad at the end up to the common length.
        return np.pad(x, (0, length - len(x)))

    lens = [
        len(FL_vl), len(FR_vl), len(FC_vl), len(dialog_mono), len(bass),
        len(sfx), crowd.shape[0], vocals_back.shape[0], instr.shape[0],
        len(reverb),
    ]
    length = max(lens)

    # FL and FR: Lead vocals + SFX + instruments
    out_L = match_len(FL_vl, length) + match_len(sfx[:, 0], length) + match_len(instr[:, 0], length)
    out_R = match_len(FR_vl, length) + match_len(sfx[:, 1], length) + match_len(instr[:, 1], length)
    # Pad both parts before summing: the stems come from different jobs and
    # an in-place add would raise if their lengths differ.
    out_C = match_len(FC_vl, length) + match_len(dialog_mono, length)
    out_LFE = match_len(bass, length)

    # SL/SR: Use reverb output
    SL = match_len(reverb[:, 0], length)
    SR = match_len(reverb[:, 1], length)
    if not multi_singer:
        SL += match_len(vocals_back[:, 0], length)
        SR += match_len(vocals_back[:, 1], length)
    SL += match_len(crowd[:, 0], length)
    SR += match_len(crowd[:, 1], length)

    multich = np.column_stack([out_L, out_R, out_C, out_LFE, SL, SR])
    return _encode_5_1_ogg(multich, fs)
# ========== Gradio UI ==========
with gr.Blocks(title="Stereo to 5.1 Surround") as demo:
    gr.Markdown("# 🎧 Stereo to 5.1 Converter")
    gr.Markdown("Convert A Stereo File Into Surround")

    inp = gr.Audio(type="filepath", label="Upload stereo audio")
    smart_mode = gr.Checkbox(value=False, label="Enable Smart Mode")

    # Widgets shown in normal mode
    preset = gr.Dropdown(
        choices=["music", "speech", "open"],
        value="music",
        label="Select Preset",
    )
    btn = gr.Button("Convert to 5.1 OGG")
    out = gr.File(label="Download 5.1 OGG")

    # Widgets shown in smart mode (column starts hidden)
    with gr.Column(visible=False) as smart_section:
        api_key = gr.Textbox(type="password", label="MVSep API Key")
        multi_singer = gr.Checkbox(value=False, label="Multi Singer Mode")
        smart_btn = gr.Button("Convert")
        smart_out = gr.File(label="Output")

    def toggle_mode(enabled):
        """Return visibility updates for (preset, btn, out, smart_section)."""
        # The three normal-mode widgets get the inverse of the checkbox;
        # the smart-mode column follows the checkbox value.
        normal_updates = tuple(
            gr.update(visible=not enabled) for _ in range(3)
        )
        return normal_updates + (gr.update(visible=enabled),)

    # Flip between the two layouts whenever the checkbox changes
    smart_mode.change(
        fn=toggle_mode,
        outputs=[preset, btn, out, smart_section],
        inputs=[smart_mode],
    )

    # Conversion buttons
    btn.click(
        fn=create_5_1_surround,
        inputs=[inp, preset],
        outputs=[out],
        concurrency_limit=10,
    )
    smart_btn.click(
        fn=smart_mode_process,
        inputs=[inp, api_key, multi_singer],
        outputs=[smart_out],
        concurrency_limit=20,
    )

if __name__ == "__main__":
    demo.launch(show_error=True)