Spaces:
Sleeping
Sleeping
File size: 7,326 Bytes
bcae25e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
import os
import shutil
import subprocess
import sys

import gradio as gr
import huggingface_hub
import torch
import torchaudio
from speechbrain.inference.separation import SepformerSeparation
from speechbrain.inference.speaker import SpeakerRecognition
# ==========================================
# 0. Initialization & configuration
# ==========================================
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 运行硬件: {device}")

# Best-effort Hugging Face login so gated models can be downloaded.
# A missing or invalid token must not abort startup, but we no longer
# use a bare `except:` — that would also swallow KeyboardInterrupt and
# SystemExit and hide real configuration errors silently.
try:
    token = os.environ.get("HF_TOKEN")
    if token:
        huggingface_hub.login(token=token)
except Exception as e:
    print(f"HF login skipped: {e}")

# Lazily-loaded global model singletons (populated on first use by the
# load_*_model helpers below).
sep_model = None
spk_model = None
def load_sep_model():
    """Return the cached Sepformer separation model, loading it on first call."""
    global sep_model
    if sep_model is not None:
        return sep_model
    print("⏳ 正在加载 SpeechBrain 分离模型...")
    sep_model = SepformerSeparation.from_hparams(
        source="speechbrain/sepformer-whamr16k",
        savedir="pretrained_models/sepformer-whamr16k",
        run_opts={"device": device},
    )
    return sep_model
def load_spk_model():
    """Return the cached speaker-recognition model, loading it on first call."""
    global spk_model
    if spk_model is not None:
        return spk_model
    print("⏳ 正在加载声纹识别模型 (用于匹配样本)...")
    spk_model = SpeakerRecognition.from_hparams(
        source="speechbrain/spkrec-ecapa-voxceleb",
        savedir="pretrained_models/spkrec-ecapa-voxceleb",
        run_opts={"device": device},
    )
    return spk_model
# ==========================================
# 1. Stage 1: Demucs denoising (strip background audio)
# ==========================================
def stage1_denoise(audio_path):
    """Stage 1: extract the vocal stem from a mixed recording with Demucs.

    Runs ``demucs.separate`` in a subprocess using the ``htdemucs`` model
    with the two-stem (vocals) preset, then locates the produced
    ``vocals.wav`` under the output directory.

    Args:
        audio_path: path of the input audio file to clean.

    Returns:
        Path of the extracted vocals WAV file.

    Raises:
        RuntimeError: if the Demucs subprocess fails or no vocals file
            is produced.
    """
    print("--- [阶段1] Demucs 降噪启动 ---")
    output_dir = "separated_stage1"
    # Start from a clean slate so stale results from a previous run are
    # never picked up by the recursive search below.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # Use the current interpreter rather than a hard-coded "python3":
    # this works inside virtualenvs and on systems where "python3" is
    # not on PATH.
    command = [
        sys.executable, "-m", "demucs.separate",
        "-n", "htdemucs",
        "--two-stems=vocals",  # keep only the vocal stem
        audio_path,
        "-o", output_dir,
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # Surface the actual Demucs error immediately instead of failing
        # later with a cryptic "no vocals file" message.
        raise RuntimeError(f"Demucs 运行失败: {result.stderr[-500:]}")

    # Output is normally <output_dir>/htdemucs/<track name>/vocals.wav,
    # but Demucs may normalize the track name — search recursively
    # rather than reconstructing the path.
    vocals_path = None
    for root, _dirs, files in os.walk(output_dir):
        if "vocals.wav" in files:
            vocals_path = os.path.join(root, "vocals.wav")
            break
    if not vocals_path:
        raise RuntimeError("Demucs 未能生成人声文件")
    print("✅ [阶段1] 完成:背景杂音已去除,获得纯净混合人声。")
    return vocals_path
# ==========================================
# 2. Stage 2: SpeechBrain two-source separation
# ==========================================
def stage2_separate(clean_audio_path):
    """Stage 2: split the cleaned vocal mix into two speaker tracks.

    Feeds the denoised audio through the Sepformer model, peak-normalizes
    the estimates, and writes each source to its own temporary WAV file.
    Returns the two file paths.
    """
    print("--- [阶段2] SpeechBrain 双源分离启动 ---")
    separator = load_sep_model()

    estimates = separator.separate_file(path=clean_audio_path)
    # Peak-normalize so both tracks fit in [-1, 1] before saving.
    estimates = estimates / estimates.abs().max()

    out_paths = ("temp_source1.wav", "temp_source2.wav")
    # NOTE: this Sepformer variant operates at 16 kHz, so both tracks
    # are saved with a 16 kHz sample rate.
    for idx, path in enumerate(out_paths):
        track = estimates[:, :, idx].detach().cpu()
        torchaudio.save(path, track, 16000)
    print("✅ [阶段2] 完成:混合人声已拆分为两个独立音轨。")
    return out_paths[0], out_paths[1]
# ==========================================
# 3. Stage 3: voiceprint matching (find the target speaker)
# ==========================================
def stage3_identify(track1, track2, reference_path):
    """Stage 3: pick the target speaker via voiceprint similarity.

    Scores both separated tracks against the user-supplied reference
    sample and returns ``(target_track, other_track, status_message)``
    with the best match first. Without a reference sample (or if
    verification fails), the tracks are returned in their original order.
    """
    if not reference_path:
        return track1, track2, "⚠️ 未提供参考样本,无法自动识别主角。展示原始分离结果。"
    print("--- [阶段3] 声纹比对启动 ---")
    verifier = load_spk_model()

    def similarity(candidate, reference):
        # verify_files returns (score_tensor, boolean_prediction);
        # only the raw score is needed here.
        score, _ = verifier.verify_files(candidate, reference)
        return score.item()

    try:
        score1 = similarity(track1, reference_path)
        score2 = similarity(track2, reference_path)
        print(f"🔍 相似度得分 - 音轨1: {score1:.4f} | 音轨2: {score2:.4f}")
        # The higher-scoring track is the target; it is always returned
        # first so the user hears the target in the first output slot.
        if score1 > score2:
            winner, loser, hi, lo = track1, track2, score1, score2
        else:
            winner, loser, hi, lo = track2, track1, score2, score1
        msg = (
            f"✅ 匹配成功!\n音轨1 是目标人物 (得分 {hi:.2f})\n"
            f"音轨2 是干扰人声 (得分 {lo:.2f})"
        )
        return winner, loser, msg
    except Exception as e:
        print(f"❌ 声纹识别出错: {e}")
        return track1, track2, "⚠️ 声纹识别运行失败,展示原始分离结果。"
# ==========================================
# Main pipeline
# ==========================================
def pipeline(input_audio, ref_audio):
    """Full cascade: denoise -> two-source separation -> speaker match.

    Args:
        input_audio: path of the mixed recording to process (required).
        ref_audio: optional path of a voice sample of the target speaker,
            used to pick the right track automatically.

    Returns:
        ``(target_path, other_path, status_message)``; the two paths are
        ``None`` when the input is missing or any stage fails.
    """
    if not input_audio:
        return None, None, "请上传需要处理的主音频"
    try:
        # 1. Strip background music/noise.
        clean_vocals = stage1_denoise(input_audio)
        # 2. Split the remaining vocal mix into two tracks.
        track1, track2 = stage2_separate(clean_vocals)
        # 3. Pick the target speaker if a reference sample was given.
        return stage3_identify(track1, track2, ref_audio)
    except Exception as e:
        return None, None, f"❌ 处理链中断: {str(e)}"
# ==========================================
# UI
# ==========================================
# Gradio front-end: two audio inputs (the mixed recording plus an
# optional reference sample), one button wired to pipeline(), and three
# outputs (target track, rejected track, text report).
with gr.Blocks(title="V3: 级联智能人声提取") as demo:
    gr.Markdown("## 🕵️♂️ V3 级联架构:Demucs 降噪 + 双源分离 + 声纹锁定")
    gr.Markdown("这是最高级的处理流程:\n1. **Demucs** 先把背景音乐杀掉。\n2. **SpeechBrain** 再把混在一起的两个人拆开。\n3. **声纹识别** 根据你上传的样本,自动把你想要的人挑出来。")
    with gr.Row():
        # Left column: inputs and the launch button.
        with gr.Column(scale=1):
            input_main = gr.Audio(label="1. 上传复杂的混合音频 (必须)", type="filepath")
            input_ref = gr.Audio(label="2. 上传主角的声音样本 (可选,用于自动挑选)", type="filepath")
            btn = gr.Button("🚀 启动三级火箭分离", variant="primary")
        # Right column: status report and the two resulting tracks.
        with gr.Column(scale=1):
            log_box = gr.Textbox(label="分析报告", value="等待任务...")
            out_target = gr.Audio(label="🎉 结果:目标主角 (Target)", type="filepath")
            out_noise = gr.Audio(label="🗑️ 结果:被剔除的干扰人声/杂音", type="filepath")
    # Output order matches pipeline's return: (target, noise, message).
    btn.click(fn=pipeline, inputs=[input_main, input_ref], outputs=[out_target, out_noise, log_box])

if __name__ == "__main__":
    demo.launch()
|