| | import os |
| | import sys |
| | import importlib.util |
| | import site |
| | import json |
| | import torch |
| | import gradio as gr |
| | import torchaudio |
| | import numpy as np |
| | from huggingface_hub import snapshot_download, hf_hub_download |
| | import subprocess |
| | import re |
| | import spaces |
| | import uuid |
| | import soundfile as sf |
| |
|
| | |
# Per-process download bookkeeping: each key names one Vevo asset group and
# its value flips to True once that group has been fetched, so later setup
# calls can skip it.
downloaded_resources = dict.fromkeys(
    ("configs", "tokenizer_vq8192", "fmt_Vq8192ToMels", "vocoder"),
    False,
)
| |
|
def install_espeak():
    """Install the ``espeak-ng`` system package when its binary is not on PATH.

    This is best-effort: a missing ``apt-get``, insufficient privileges or a
    non-zero exit status is reported on stdout and otherwise ignored, so the
    caller never sees an exception from those failures.
    """
    # Function-scope import keeps the block self-contained; shutil is stdlib.
    import shutil

    # Look the binary up in-process rather than spawning `which`, which may
    # itself be absent on a minimal image.
    if shutil.which("espeak-ng") is not None:
        return

    print("Installing espeak-ng...")
    try:
        subprocess.run(["apt-get", "update"], check=True)
        subprocess.run(
            ["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"],
            check=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Error installing espeak-ng: {e}")


# Executed at import time, ahead of the remaining module-level setup.
install_espeak()
| |
|
def _strip_langfilter_imports(line):
    """Return *line* without ``setLangfilters``/``getLangfilters`` in its import list.

    Only a name together with one adjacent comma is removed; everything else
    on the line, including its line ending, is left untouched.
    """
    names = r"(?:setLangfilters|getLangfilters)\b"
    # Name preceded by another import: drop it with the comma in front of it.
    line = re.sub(r",[ \t]*" + names, "", line)
    # Name at the head of the list: drop it with the comma that follows it.
    return re.sub(r"\b" + names + r"[ \t]*,[ \t]*", "", line)


def patch_langsegment_init():
    """Remove the ``setLangfilters``/``getLangfilters`` imports from LangSegment.

    Locates the installed ``LangSegment`` package's ``__init__.py`` (falling
    back to the ``site-packages`` directories when the path derived from the
    module spec does not exist), rewrites every line that starts with
    ``from .LangSegment import`` so it no longer imports those two names, and
    reloads the package when the file was changed.

    The whole operation is best-effort: if the package cannot be found the
    function returns silently, and any error while patching or reloading is
    printed rather than raised.
    """
    target_line_prefix = "from .LangSegment import"

    try:
        spec = importlib.util.find_spec("LangSegment")
        if spec is None or spec.origin is None:
            return

        init_path = os.path.join(os.path.dirname(spec.origin), "__init__.py")
        if not os.path.exists(init_path):
            candidates = (
                os.path.join(site_pkg_path, "LangSegment", "__init__.py")
                for site_pkg_path in site.getsitepackages()
            )
            init_path = next((c for c in candidates if os.path.exists(c)), None)
            if init_path is None:
                return

        with open(init_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        new_lines = [
            _strip_langfilter_imports(line)
            if line.strip().startswith(target_line_prefix)
            else line
            for line in lines
        ]

        # Nothing to do (already patched, or the names were never imported).
        if new_lines == lines:
            return

        with open(init_path, "w", encoding="utf-8") as f:
            f.writelines(new_lines)
    except Exception as e:
        print(f"Could not patch LangSegment __init__.py: {e}")
        return

    # Reload so an already-imported package picks up the patched file.
    try:
        import LangSegment
        importlib.reload(LangSegment)
    except Exception as e:
        print(f"Could not reload LangSegment after patching: {e}")
| |
|
# Apply the LangSegment patch at import time.
patch_langsegment_init()

# Make the Amphion checkout the working directory, cloning it on first run.
# The relative paths used below ("wav", "ckpts/Vevo") are resolved against it.
if not os.path.exists("Amphion"):
    # check=True reports a failed clone here instead of letting it surface as
    # an unrelated FileNotFoundError from the chdir that follows.
    subprocess.run(
        ["git", "clone", "https://github.com/open-mmlab/Amphion.git"],
        check=True,
    )
    os.chdir("Amphion")
elif not os.getcwd().endswith("Amphion"):
    os.chdir("Amphion")

# "Amphion" is resolved against the current working directory, so this is the
# directory we are now in; presumably added so the `models` import below
# resolves - verify against the Amphion repository layout.
_amphion_import_root = os.path.dirname(os.path.abspath("Amphion"))
if _amphion_import_root not in sys.path:
    sys.path.append(_amphion_import_root)

os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)

# Must stay below the chdir / sys.path setup above.
from models.vc.vevo.vevo_utils import VevoInferencePipeline
| |
|
| | |
def my_save_audio(waveform, output_path, sample_rate=24000):
    """Write *waveform* to *output_path* through ``sf.write``.

    A ``torch.Tensor`` is detached, moved to the CPU and converted to a NumPy
    array first; a 2-D tensor whose first dimension has size 1 has that
    dimension squeezed away. Any other input is passed to ``sf.write`` as-is.

    Args:
        waveform: Audio samples, either a ``torch.Tensor`` or something
            ``sf.write`` accepts directly.
        output_path: Destination file path.
        sample_rate: Sample rate passed to ``sf.write``.

    Raises:
        Exception: Whatever the conversion or ``sf.write`` raised, re-raised
            after being printed.
    """
    try:
        samples = waveform
        if isinstance(samples, torch.Tensor):
            samples = samples.detach().cpu()
            has_single_leading_row = samples.dim() == 2 and samples.shape[0] == 1
            if has_single_leading_row:
                samples = samples.squeeze(0)
            samples = samples.numpy()
        sf.write(output_path, samples, sample_rate)
    except Exception as err:
        print(f"Save error: {err}")
        raise
| |
|
def setup_configs():
    """Ensure the Vevo JSON configs exist under ``models/vc/vevo/config``.

    Each missing file is downloaded from the ``amphion/Vevo`` model repo with
    ``hf_hub_download`` and copied into place. The function is a no-op once
    ``downloaded_resources["configs"]`` is True.

    Failures are printed rather than raised. The flag is set only when every
    config file was already present or fetched successfully, so a later call
    retries anything that failed.
    """
    # Function-scope import keeps the block self-contained; shutil is stdlib.
    import shutil

    if downloaded_resources["configs"]:
        return

    config_path = "models/vc/vevo/config"
    os.makedirs(config_path, exist_ok=True)

    all_present = True
    for name in ("Vq8192ToMels.json", "Vocoder.json"):
        file_path = f"{config_path}/{name}"
        if os.path.exists(file_path):
            continue
        try:
            file_data = hf_hub_download(
                repo_id="amphion/Vevo",
                filename=f"config/{name}",
                repo_type="model",
            )
            # In-process copy: a failure raises and is reported below, unlike
            # the unchecked external `cp` it replaces.
            shutil.copyfile(file_data, file_path)
        except Exception as e:
            print(f"Error downloading config {name}: {e}")
            all_present = False

    if all_present:
        downloaded_resources["configs"] = True
| |
|
# Fetch the model config files at import time.
setup_configs()

# Use CUDA when torch reports it as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Cache of constructed inference pipelines, keyed by name.
inference_pipelines = {}
| |
|
def preload_all_resources():
    """Download every Vevo asset group that has not been fetched yet.

    Runs ``setup_configs()`` and then, for each of the tokenizer, the
    Vq8192ToMels model and the vocoder, calls ``snapshot_download`` restricted
    to that group's files unless ``downloaded_resources`` already marks it as
    done. The path returned by each download is stored in the matching
    module-level ``downloaded_*_path`` variable.
    """
    global downloaded_content_style_tokenizer_path
    global downloaded_fmt_path
    global downloaded_vocoder_path

    def fetch(pattern):
        # One restricted snapshot of the amphion/Vevo repo into ./ckpts/Vevo.
        return snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=[pattern],
        )

    print("Preloading resources...")
    setup_configs()

    if not downloaded_resources["tokenizer_vq8192"]:
        downloaded_content_style_tokenizer_path = fetch("tokenizer/vq8192/*")
        downloaded_resources["tokenizer_vq8192"] = True

    if not downloaded_resources["fmt_Vq8192ToMels"]:
        downloaded_fmt_path = fetch("acoustic_modeling/Vq8192ToMels/*")
        downloaded_resources["fmt_Vq8192ToMels"] = True

    if not downloaded_resources["vocoder"]:
        downloaded_vocoder_path = fetch("acoustic_modeling/Vocoder/*")
        downloaded_resources["vocoder"] = True

    print("Resources ready.")
| |
|
# Download locations, unset until preload_all_resources() assigns them.
downloaded_content_style_tokenizer_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None

# Fetch everything at import time so the first request does not have to.
preload_all_resources()
| |
|
def get_pipeline():
    """Return the shared ``VevoInferencePipeline``, building it on first use.

    The instance is cached in ``inference_pipelines`` under the key
    ``"timbre"``; later calls return that same object.
    """
    try:
        return inference_pipelines["timbre"]
    except KeyError:
        pass

    tokenizer_ckpt = os.path.join(
        downloaded_content_style_tokenizer_path, "tokenizer/vq8192"
    )
    fmt_ckpt = os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels")
    vocoder_ckpt = os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder")

    built = VevoInferencePipeline(
        content_style_tokenizer_ckpt_path=tokenizer_ckpt,
        fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
        fmt_ckpt_path=fmt_ckpt,
        vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
        vocoder_ckpt_path=vocoder_ckpt,
        device=device,
    )
    inference_pipelines["timbre"] = built
    return built
| |
|
def _to_mono_tensor(wav, target_sr=24000):
    """Turn one audio input value into a peak-normalised mono tensor.

    Args:
        wav: A ``(sample_rate, samples)`` pair. A tuple whose first item is
            not an integer is treated as ``(samples, sample_rate)``.
        target_sr: Sample rate the audio is resampled to when it differs.

    Returns:
        A float32 tensor of shape ``(1, num_samples)`` scaled so that its
        largest absolute value is about 0.95.

    Raises:
        ValueError: If the clip contains no samples.
    """
    first, second = wav
    if isinstance(wav, tuple) and not isinstance(first, (int, np.integer)):
        first, second = second, first
    sr, data = int(first), second

    samples = np.asarray(data, dtype=np.float32)
    if samples.ndim > 1:
        # Assumes 2-D input is (samples, channels), as the axis=1 averaging
        # always has. Averaging unconditionally also flattens an (N, 1) mono
        # array, which would otherwise be resampled along the wrong axis.
        samples = samples.mean(axis=1)
    if samples.size == 0:
        raise ValueError("Uploaded audio is empty")

    tensor = torch.from_numpy(np.ascontiguousarray(samples)).unsqueeze(0)
    if sr != target_sr:
        tensor = torchaudio.functional.resample(tensor, sr, target_sr)

    # The small epsilon keeps an all-zero clip from dividing by zero.
    return tensor / (torch.max(torch.abs(tensor)) + 1e-6) * 0.95


@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
    """Convert the timbre of *content_wav* to that of *reference_wav*.

    Both inputs are down-mixed to mono, resampled to 24 kHz, peak-normalised
    and written to temporary files under ``wav/``; those files are passed to
    ``pipeline.inference_fm`` and removed again whether or not it succeeds.
    Non-finite samples in the generated audio are replaced (NaN with 0,
    +/-inf with +/-0.95) before the result is saved.

    Args:
        content_wav: Source audio as a ``(sample_rate, samples)`` pair.
        reference_wav: Reference audio in the same form.

    Returns:
        Path of the generated ``wav/out_<id>.wav`` file.

    Raises:
        ValueError: If either input is ``None`` or contains no samples.
    """
    if content_wav is None or reference_wav is None:
        raise ValueError("Please upload audio files")

    target_sr = 24000
    session_id = str(uuid.uuid4())[:8]
    temp_content_path = f"wav/c_{session_id}.wav"
    temp_reference_path = f"wav/r_{session_id}.wav"
    output_path = f"wav/out_{session_id}.wav"

    try:
        content_tensor = _to_mono_tensor(content_wav, target_sr)
        ref_tensor = _to_mono_tensor(reference_wav, target_sr)

        # squeeze(0) drops only the leading dimension, so a one-sample clip
        # stays 1-D instead of collapsing to a scalar.
        sf.write(temp_content_path, content_tensor.squeeze(0).cpu().numpy(), target_sr)
        sf.write(temp_reference_path, ref_tensor.squeeze(0).cpu().numpy(), target_sr)

        print(f"[{session_id}] Processing Audio ({content_tensor.shape[1]/24000:.2f}s)...")

        pipeline = get_pipeline()
        gen_audio = pipeline.inference_fm(
            src_wav_path=temp_content_path,
            timbre_ref_wav_path=temp_reference_path,
            flow_matching_steps=32,
        )

        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: NaN fixed")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)

        my_save_audio(gen_audio, output_path=output_path)
        return output_path
    finally:
        # The temporary inputs are only needed during inference.
        for temp_path in (temp_content_path, temp_reference_path):
            if os.path.exists(temp_path):
                os.remove(temp_path)
| |
|
# Gradio front end: two audio inputs and a button on the left, the converted
# audio on the right.
with gr.Blocks(title="Vevo-Timbre (Secure)") as demo:
    gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion")

    with gr.Row():
        with gr.Column():
            timbre_content = gr.Audio(label="Source Audio", type="numpy")
            timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
            timbre_button = gr.Button("Generate", variant="primary")
        with gr.Column():
            timbre_output = gr.Audio(label="Result")

    # Clicking the button runs vevo_timbre on both inputs and shows its result.
    timbre_button.click(
        fn=vevo_timbre,
        inputs=[timbre_content, timbre_reference],
        outputs=timbre_output,
    )

demo.launch()