Sada / app.py
Opera8's picture
Update app.py
ef837dd verified
raw
history blame
16.5 kB
import os
import sys
import importlib.util
import site
import json
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import re
import spaces
import uuid
import soundfile as sf
# --- بخش نصب و راه‌اندازی اولیه ---
downloaded_resources = {
"configs": False,
"tokenizer_vq8192": False,
"fmt_Vq8192ToMels": False,
"vocoder": False
}
def install_espeak():
try:
result = subprocess.run(["which", "espeak-ng"], capture_output=True, text=True)
if result.returncode != 0:
print("Installing espeak-ng...")
subprocess.run(["apt-get", "update"], check=True)
subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True)
except Exception as e:
print(f"Error installing espeak-ng: {e}")
install_espeak()
def patch_langsegment_init():
try:
spec = importlib.util.find_spec("LangSegment")
if spec is None or spec.origin is None: return
init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
if not os.path.exists(init_path):
for site_pkg_path in site.getsitepackages():
potential_path = os.path.join(site_pkg_path, 'LangSegment', '__init__.py')
if os.path.exists(potential_path):
init_path = potential_path
break
else: return
with open(init_path, 'r') as f: lines = f.readlines()
modified = False
new_lines = []
target_line_prefix = "from .LangSegment import"
for line in lines:
if line.strip().startswith(target_line_prefix) and ('setLangfilters' in line or 'getLangfilters' in line):
mod_line = line.replace(',setLangfilters', '').replace(',getLangfilters', '')
mod_line = mod_line.replace('setLangfilters,', '').replace('getLangfilters,', '').rstrip(',')
new_lines.append(mod_line + '\n')
modified = True
else:
new_lines.append(line)
if modified:
with open(init_path, 'w') as f: f.writelines(new_lines)
try:
import LangSegment
importlib.reload(LangSegment)
except: pass
except: pass
patch_langsegment_init()
if not os.path.exists("Amphion"):
subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"])
os.chdir("Amphion")
else:
if not os.getcwd().endswith("Amphion"):
os.chdir("Amphion")
if os.path.dirname(os.path.abspath("Amphion")) not in sys.path:
sys.path.append(os.path.dirname(os.path.abspath("Amphion")))
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)
from models.vc.vevo.vevo_utils import VevoInferencePipeline
def save_audio_pcm16(waveform, output_path, sample_rate=24000):
try:
if isinstance(waveform, torch.Tensor):
waveform = waveform.detach().cpu()
if waveform.dim() == 2 and waveform.shape[0] == 1:
waveform = waveform.squeeze(0)
waveform = waveform.numpy()
sf.write(output_path, waveform, sample_rate, subtype='PCM_16')
except Exception as e:
print(f"Save error: {e}")
raise e
def setup_configs():
if downloaded_resources["configs"]: return
config_path = "models/vc/vevo/config"
os.makedirs(config_path, exist_ok=True)
config_files = ["Vq8192ToMels.json", "Vocoder.json"]
for file in config_files:
file_path = f"{config_path}/{file}"
if not os.path.exists(file_path):
try:
file_data = hf_hub_download(repo_id="amphion/Vevo", filename=f"config/{file}", repo_type="model")
subprocess.run(["cp", file_data, file_path])
except Exception as e: print(f"Error downloading config {file}: {e}")
downloaded_resources["configs"] = True
setup_configs()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")
inference_pipelines = {}
def preload_all_resources():
print("Preloading resources...")
setup_configs()
global downloaded_content_style_tokenizer_path, downloaded_fmt_path, downloaded_vocoder_path
if not downloaded_resources["tokenizer_vq8192"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["tokenizer/vq8192/*"])
downloaded_content_style_tokenizer_path = local_dir
downloaded_resources["tokenizer_vq8192"] = True
if not downloaded_resources["fmt_Vq8192ToMels"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vq8192ToMels/*"])
downloaded_fmt_path = local_dir
downloaded_resources["fmt_Vq8192ToMels"] = True
if not downloaded_resources["vocoder"]:
local_dir = snapshot_download(repo_id="amphion/Vevo", repo_type="model", cache_dir="./ckpts/Vevo", allow_patterns=["acoustic_modeling/Vocoder/*"])
downloaded_vocoder_path = local_dir
downloaded_resources["vocoder"] = True
print("Resources ready.")
downloaded_content_style_tokenizer_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None
preload_all_resources()
def get_pipeline():
if "timbre" in inference_pipelines:
return inference_pipelines["timbre"]
pipeline = VevoInferencePipeline(
content_style_tokenizer_ckpt_path=os.path.join(downloaded_content_style_tokenizer_path, "tokenizer/vq8192"),
fmt_cfg_path="./models/vc/vevo/config/Vq8192ToMels.json",
fmt_ckpt_path=os.path.join(downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"),
vocoder_cfg_path="./models/vc/vevo/config/Vocoder.json",
vocoder_ckpt_path=os.path.join(downloaded_vocoder_path, "acoustic_modeling/Vocoder"),
device=device,
)
inference_pipelines["timbre"] = pipeline
return pipeline
@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
session_id = str(uuid.uuid4())[:8]
temp_content_path = f"wav/c_{session_id}.wav"
temp_reference_path = f"wav/r_{session_id}.wav"
output_path = f"wav/out_{session_id}.wav"
if content_wav is None or reference_wav is None:
raise ValueError("Please upload audio files")
try:
# --- 1. پردازش ورودی (Content) ---
if isinstance(content_wav, tuple):
content_sr, content_data = content_wav if isinstance(content_wav[0], int) else (content_wav[1], content_wav[0])
else:
content_sr, content_data = content_wav
# تبدیل استریو به مونو
if len(content_data.shape) > 1 and content_data.shape[1] > 1:
content_data = np.mean(content_data, axis=1)
# نرمال‌سازی و ریسمپل
content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
if content_sr != 24000:
content_tensor = torchaudio.functional.resample(content_tensor, content_sr, 24000)
content_sr = 24000
content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
content_audio_np = content_tensor.squeeze().numpy()
# --- 2. پردازش رفرنس (Reference) ---
if isinstance(reference_wav, tuple):
ref_sr, ref_data = reference_wav if isinstance(reference_wav[0], int) else (reference_wav[1], reference_wav[0])
else:
ref_sr, ref_data = reference_wav
if len(ref_data.shape) > 1 and ref_data.shape[1] > 1:
ref_data = np.mean(ref_data, axis=1)
ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
if ref_sr != 24000:
ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, 24000)
ref_sr = 24000
ref_max = torch.max(torch.abs(ref_tensor)) + 1e-6
ref_tensor = ref_tensor / ref_max * 0.95
# محدود کردن طول رفرنس برای جلوگیری از خطای حافظه
if ref_tensor.shape[1] > 24000 * 20:
ref_tensor = ref_tensor[:, :24000 * 20]
save_audio_pcm16(ref_tensor, temp_reference_path, ref_sr)
# --- 3. منطق Overlap-Add برای اتصال بدون لرزش ---
pipeline = get_pipeline()
SR = 24000
# تنظیمات تکه‌بندی
PROCESS_CHUNK_SEC = 12 # طول هر تکه برای پردازش (۱۲ ثانیه)
OVERLAP_SEC = 2 # میزان هم‌پوشانی برای میکس (۲ ثانیه)
# میزان پیشروی موثر در هر گام (۱۰ ثانیه)
HOP_SEC = PROCESS_CHUNK_SEC - OVERLAP_SEC
hop_samples = HOP_SEC * SR
chunk_samples = PROCESS_CHUNK_SEC * SR
overlap_samples = OVERLAP_SEC * SR
total_length = len(content_audio_np)
final_output_audio = np.array([], dtype=np.float32)
print(f"[{session_id}] Starting seamless processing. Total length: {total_length/SR:.2f}s")
# حلقه روی فایل با گام ۱۰ ثانیه
for start_idx in range(0, total_length, hop_samples):
end_idx = min(start_idx + chunk_samples, total_length)
# استخراج تکه فعلی
current_chunk_np = content_audio_np[start_idx:end_idx]
# اگر تکه خیلی کوتاه باشد (کمتر از نیم ثانیه)، صرف نظر کن
if len(current_chunk_np) < SR * 0.5:
continue
# ذخیره موقت برای مدل
save_audio_pcm16(torch.FloatTensor(current_chunk_np).unsqueeze(0), temp_content_path, SR)
try:
# اجرای مدل
gen = pipeline.inference_fm(
src_wav_path=temp_content_path,
timbre_ref_wav_path=temp_reference_path,
flow_matching_steps=64,
)
# تبدیل خروجی به numpy
if torch.isnan(gen).any(): gen = torch.nan_to_num(gen, nan=0.0)
if gen.dim() == 1: gen = gen.unsqueeze(0)
gen_np = gen.cpu().squeeze(0).numpy()
# --- میکس (Cross-Fade) ---
if len(final_output_audio) == 0:
# اولین تکه: مستقیماً اضافه کن
final_output_audio = gen_np
else:
# تکه‌های بعدی
# ۱. محاسبه طول هم‌پوشانی واقعی
# انتظار داریم خروجی مدل تقریباً هم‌اندازه ورودی باشد
# انتهای صدای فعلی در final_output_audio کجاست؟
# ما در هر دور hop_samples جلو می‌رویم.
# روش ایمن:
# دو ثانیه آخر صدای قبلی را با دو ثانیه اول صدای جدید ترکیب می‌کنیم
prev_audio_len = len(final_output_audio)
new_audio_len = len(gen_np)
# اگر خروجی مدل خیلی کوتاه بود، فقط بچسبان
if new_audio_len < overlap_samples:
final_output_audio = np.concatenate([final_output_audio, gen_np])
continue
# برش بخش هم‌پوشانی
# ما قبلاً در دور قبل، تمام خروجی (۱۲ ثانیه) را ذخیره کردیم.
# اما الان ۱۰ ثانیه جلو آمدیم. پس ۲ ثانیه آخر دور قبلی، با ۲ ثانیه اول دور جدید هم‌پوشانی دارد.
# (توجه: این منطق فرض می‌کند خروجی مدل دقیقا هم‌طول ورودی است که در VC تقریبا درست است)
# حذف ۲ ثانیه آخر از بافر نهایی (برای آماده‌سازی میکس)
# اما صبر کن، ما در دور قبل کامل اضافه کردیم. پس الان بافر نهایی شامل Overlap هم هست.
# بخش غیر اورلپ قبلی: تا سرِ overlap
# بخش اورلپ قبلی: از سرِ overlap تا آخر
# منطق ساده‌تر Overlap-Add:
# همیشه فقط بخش "جدید" (۱۰ ثانیه) را نگه نداریم، بلکه همیشه میکس کنیم.
# پیاده‌سازی دقیق Cross-Fade:
fade_out = np.linspace(1, 0, overlap_samples)
fade_in = np.linspace(0, 1, overlap_samples)
# ۲ ثانیه آخر بافر فعلی
prev_tail = final_output_audio[-overlap_samples:]
# ۲ ثانیه اول صدای جدید
curr_head = gen_np[:overlap_samples]
# اطمینان از هم‌اندازه بودن (ممکن است مدل خروجی کمی متفاوت بدهد)
min_len = min(len(prev_tail), len(curr_head))
if min_len < overlap_samples:
# اگر به هر دلیلی سایزها نخواند (نادر)، برش بزن
prev_tail = prev_tail[:min_len]
curr_head = curr_head[:min_len]
fade_out = fade_out[:min_len]
fade_in = fade_in[:min_len]
# اصلاح بافر اصلی
final_output_audio = final_output_audio[:len(final_output_audio)-(overlap_samples-min_len)]
# انجام میکس
blended_overlap = (prev_tail * fade_out) + (curr_head * fade_in)
# بروزرسانی بافر نهایی:
# ۱. حذف بخش Overlap خام قبلی
final_output_audio = final_output_audio[:-len(blended_overlap)]
# ۲. اضافه کردن بخش میکس شده
final_output_audio = np.concatenate([final_output_audio, blended_overlap])
# ۳. اضافه کردن باقی‌مانده صدای جدید (بعد از بخش Overlap)
remaining_new = gen_np[len(blended_overlap):]
final_output_audio = np.concatenate([final_output_audio, remaining_new])
except Exception as e:
print(f"Error generating chunk at {start_idx}: {e}")
# در صورت خطا، سکوت اضافه کن تا تایمینگ به هم نریزد
silence_len = chunk_samples if start_idx + chunk_samples < total_length else total_length - start_idx
final_output_audio = np.concatenate([final_output_audio, np.zeros(silence_len)])
# ذخیره نهایی
save_audio_pcm16(final_output_audio, output_path, SR)
return output_path
finally:
if os.path.exists(temp_content_path): os.remove(temp_content_path)
if os.path.exists(temp_reference_path): os.remove(temp_reference_path)
with gr.Blocks(title="Vevo-Timbre (Seamless Overlap)") as demo:
gr.Markdown("## Vevo-Timbre: Zero-Shot Voice Conversion")
gr.Markdown("Fixed Seamless Version: Uses 2s Cross-Fade Overlap to eliminate glitches.")
with gr.Row():
with gr.Column():
timbre_content = gr.Audio(label="Source Audio", type="numpy")
timbre_reference = gr.Audio(label="Target Timbre", type="numpy")
timbre_button = gr.Button("Generate", variant="primary")
with gr.Column():
timbre_output = gr.Audio(label="Result")
timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)
demo.launch()