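"""Cascaded voice-extraction demo (Gradio app).

Stage 1: Demucs strips background music/noise and keeps the vocal stem.
Stage 2: A SpeechBrain SepFormer model splits the mixed vocals into two speaker tracks.
Stage 3: ECAPA speaker verification compares both tracks against an optional
reference sample and returns the matching track as the target speaker.
"""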
import os
import gradio as gr
import subprocess
import torch
import torchaudio
import shutil
import huggingface_hub
from speechbrain.inference.separation import SepformerSeparation
from speechbrain.inference.speaker import SpeakerRecognition
# ==========================================
# 0. Initialization and configuration
# ==========================================
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 运行硬件: {device}")
# Log in to the Hugging Face Hub if a token is provided (ensures models can be downloaded)
try:
    token = os.environ.get("HF_TOKEN")
    if token:
        huggingface_hub.login(token=token)
except Exception:
    pass
# Global model cache
sep_model = None
spk_model = None
def load_sep_model():
    global sep_model
    if sep_model is None:
        print("⏳ Loading the SpeechBrain separation model...")
        sep_model = SepformerSeparation.from_hparams(
            source="speechbrain/sepformer-whamr16k",
            savedir="pretrained_models/sepformer-whamr16k",
            run_opts={"device": device}
        )
    return sep_model
def load_spk_model():
    global spk_model
    if spk_model is None:
        print("⏳ Loading the speaker-recognition model (used to match the reference sample)...")
        spk_model = SpeakerRecognition.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir="pretrained_models/spkrec-ecapa-voxceleb",
            run_opts={"device": device}
        )
    return spk_model
# ==========================================
# 1. Stage 1: Demucs denoising (strip background audio)
# ==========================================
def stage1_denoise(audio_path):
    print("--- [Stage 1] Demucs denoising started ---")
    output_dir = "separated_stage1"
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    # Call Demucs to extract the vocal stem
    command = [
        "python3", "-m", "demucs.separate",
        "-n", "htdemucs",
        "--two-stems=vocals",  # keep only the vocals stem
        audio_path,
        "-o", output_dir
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Demucs failed: {result.stderr[-500:]}")
    # Locate the output file.
    # The path is usually separated_stage1/htdemucs/<name>/vocals.wav,
    # but we search recursively in case Demucs alters the file name.
    vocals_path = None
    for root, dirs, files in os.walk(output_dir):
        if "vocals.wav" in files:
            vocals_path = os.path.join(root, "vocals.wav")
            break
    if not vocals_path:
        raise RuntimeError("Demucs did not produce a vocals file")
    print("✅ [Stage 1] Done: background noise removed; clean mixed vocals obtained.")
    return vocals_path
# ==========================================
# 2. Stage 2: SpeechBrain two-speaker separation
# ==========================================
def stage2_separate(clean_audio_path):
    print("--- [Stage 2] SpeechBrain two-speaker separation started ---")
    model = load_sep_model()
    # Run the separation; the output tensor has shape [batch, time, n_sources]
    est_sources = model.separate_file(path=clean_audio_path)
    # Peak-normalize
    est_sources = est_sources / est_sources.abs().max()
    # Extract the two sources
    source1 = est_sources[:, :, 0].detach().cpu()
    source2 = est_sources[:, :, 1].detach().cpu()
    # Save to disk
    path1 = "temp_source1.wav"
    path2 = "temp_source2.wav"
    # Note: sepformer-whamr16k works at 16 kHz, so save both tracks at that rate
    torchaudio.save(path1, source1, 16000)
    torchaudio.save(path2, source2, 16000)
    print("✅ [Stage 2] Done: the mixed vocals were split into two separate tracks.")
    return path1, path2
# ==========================================
# 3. Stage 3: Speaker verification (pick out the target speaker)
# ==========================================
def stage3_identify(track1, track2, reference_path):
    if not reference_path:
        return track1, track2, "⚠️ No reference sample provided, so the target speaker cannot be identified automatically. Showing the raw separation result."
    print("--- [Stage 3] Speaker verification started ---")
    model = load_spk_model()
    # Helper: similarity score between a separated track and the reference sample
    def get_score(test_file, ref_file):
        score, prediction = model.verify_files(test_file, ref_file)
        return score.item()
    try:
        score1 = get_score(track1, reference_path)
        score2 = get_score(track2, reference_path)
        print(f"🔍 Similarity scores - Track 1: {score1:.4f} | Track 2: {score2:.4f}")
        # Decision rule: the higher-scoring track is the target speaker
        if score1 > score2:
            return track1, track2, f"✅ Match found!\nTrack 1 is the target speaker (score {score1:.2f})\nTrack 2 is the interfering voice (score {score2:.2f})"
        else:
            # If track 2 scores higher, swap them so the target speaker is always returned first
            return track2, track1, f"✅ Match found!\nTrack 1 is the target speaker (score {score2:.2f})\nTrack 2 is the interfering voice (score {score1:.2f})"
    except Exception as e:
        print(f"❌ Speaker verification error: {e}")
        return track1, track2, "⚠️ Speaker verification failed; showing the raw separation result."
# ==========================================
# Main pipeline
# ==========================================
def pipeline(input_audio, ref_audio):
    if not input_audio:
        return None, None, "Please upload the main audio file to process"
    try:
        # 1. Denoise
        clean_vocals = stage1_denoise(input_audio)
        # 2. Separate
        track1, track2 = stage2_separate(clean_vocals)
        # 3. Identify (only if the user uploaded a reference sample)
        final_target, final_noise, msg = stage3_identify(track1, track2, ref_audio)
        return final_target, final_noise, msg
    except Exception as e:
        return None, None, f"❌ Processing chain aborted: {str(e)}"
# ==========================================
# UI
# ==========================================
with gr.Blocks(title="V3: Cascaded Smart Voice Extraction") as demo:
    gr.Markdown("## 🕵️‍♂️ V3 cascade architecture: Demucs denoising + two-speaker separation + voiceprint lock-on")
    gr.Markdown("This is the most advanced processing chain:\n1. **Demucs** first removes the background music.\n2. **SpeechBrain** then splits the two overlapping voices apart.\n3. **Speaker recognition** uses the sample you upload to automatically pick out the person you want.")
    with gr.Row():
        with gr.Column(scale=1):
            input_main = gr.Audio(label="1. Upload the complex mixed audio (required)", type="filepath")
            input_ref = gr.Audio(label="2. Upload a voice sample of the target speaker (optional, used for automatic selection)", type="filepath")
            btn = gr.Button("🚀 Launch the three-stage separation", variant="primary")
        with gr.Column(scale=1):
            log_box = gr.Textbox(label="Analysis report", value="Waiting for a task...")
            out_target = gr.Audio(label="🎉 Result: target speaker (Target)", type="filepath")
            out_noise = gr.Audio(label="🗑️ Result: removed interfering voice / noise", type="filepath")
    btn.click(fn=pipeline, inputs=[input_main, input_ref], outputs=[out_target, out_noise, log_box])
if __name__ == "__main__":
    demo.launch()
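# Note: this app assumes (not pinned here) that the environment provides gradio,
# torch, torchaudio, speechbrain, huggingface_hub and the demucs CLI (invoked above
# via "python3 -m demucs.separate"), e.g. through the Space's requirements.txt,
# plus an optional HF_TOKEN secret for model downloads.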