# Hugging Face Space: cascaded vocal extraction demo (Demucs + SpeechBrain)
import os
import shutil
import subprocess
import sys

import gradio as gr
import huggingface_hub
import torch
import torchaudio
from speechbrain.inference.separation import SepformerSeparation
from speechbrain.inference.speaker import SpeakerRecognition
# ==========================================
# 0. Initialization & configuration
# ==========================================
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 运行硬件: {device}")

# Log in to Hugging Face so model downloads succeed. This is best-effort:
# a missing/invalid token must never prevent the app from starting, but we
# log the failure instead of swallowing it with a bare `except:`.
try:
    token = os.environ.get("HF_TOKEN")
    if token:
        huggingface_hub.login(token=token)
except Exception as e:
    print(f"⚠️ Hugging Face login failed: {e}")

# Global model cache: models are heavy, so they are lazily loaded once and
# reused across requests (see load_sep_model / load_spk_model).
sep_model = None
spk_model = None
def load_sep_model():
    """Return the SepFormer separation model, loading it on first use.

    The instance is cached in the module-level ``sep_model`` so repeated
    requests do not reload the checkpoint.
    """
    global sep_model
    if sep_model is not None:
        return sep_model
    print("⏳ 正在加载 SpeechBrain 分离模型...")
    sep_model = SepformerSeparation.from_hparams(
        source="speechbrain/sepformer-whamr16k",
        savedir="pretrained_models/sepformer-whamr16k",
        run_opts={"device": device},
    )
    return sep_model
def load_spk_model():
    """Return the ECAPA speaker-verification model, loading it on first use.

    Cached in the module-level ``spk_model`` so the checkpoint is only
    downloaded/loaded once per process.
    """
    global spk_model
    if spk_model is not None:
        return spk_model
    print("⏳ 正在加载声纹识别模型 (用于匹配样本)...")
    spk_model = SpeakerRecognition.from_hparams(
        source="speechbrain/spkrec-ecapa-voxceleb",
        savedir="pretrained_models/spkrec-ecapa-voxceleb",
        run_opts={"device": device},
    )
    return spk_model
# ==========================================
# 1. Stage 1: Demucs denoising (strip background audio)
# ==========================================
def stage1_denoise(audio_path):
    """Extract the vocal stem from *audio_path* using Demucs.

    Runs ``demucs.separate`` in a subprocess with the ``htdemucs`` model and
    the two-stem (vocals) preset, then locates the generated ``vocals.wav``.

    Args:
        audio_path: Path to the input mixture audio file.

    Returns:
        Path to the extracted ``vocals.wav``.

    Raises:
        RuntimeError: If Demucs exits non-zero or no vocals file is produced.
    """
    print("--- [阶段1] Demucs 降噪启动 ---")
    output_dir = "separated_stage1"
    # Start from a clean slate so stale results from a previous run are
    # never picked up by the directory walk below.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # Use the current interpreter rather than a bare "python3" from PATH:
    # it is the environment demucs is actually installed into.
    command = [
        sys.executable, "-m", "demucs.separate",
        "-n", "htdemucs",
        "--two-stems=vocals",  # only the vocal stem is needed
        audio_path,
        "-o", output_dir,
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    # Fail loudly with Demucs' own stderr instead of silently walking an
    # empty output tree and reporting a misleading "missing file" error.
    if result.returncode != 0:
        raise RuntimeError(f"Demucs 运行失败: {result.stderr[-500:]}")

    # Output is typically <output_dir>/htdemucs/<name>/vocals.wav, but the
    # stem directory can differ from the input basename, so search for it.
    for root, _dirs, files in os.walk(output_dir):
        if "vocals.wav" in files:
            print("✅ [阶段1] 完成:背景杂音已去除,获得纯净混合人声。")
            return os.path.join(root, "vocals.wav")
    raise RuntimeError("Demucs 未能生成人声文件")
# ==========================================
# 2. Stage 2: SpeechBrain two-source separation
# ==========================================
def stage2_separate(clean_audio_path):
    """Split a cleaned two-speaker mixture into two mono WAV tracks.

    Args:
        clean_audio_path: Path to the denoised vocal mixture.

    Returns:
        Tuple of the two output file paths (``temp_source1.wav``,
        ``temp_source2.wav``).
    """
    print("--- [阶段2] SpeechBrain 双源分离启动 ---")
    model = load_sep_model()
    estimates = model.separate_file(path=clean_audio_path)
    # Joint peak normalization keeps the relative level between sources.
    estimates = estimates / estimates.abs().max()
    out_paths = ("temp_source1.wav", "temp_source2.wav")
    for src_idx, out_path in enumerate(out_paths):
        track = estimates[:, :, src_idx].detach().cpu()
        # NOTE(review): saved at 16 kHz to match the whamr16k model family;
        # confirm against the checkpoint's actual output rate.
        torchaudio.save(out_path, track, 16000)
    print("✅ [阶段2] 完成:混合人声已拆分为两个独立音轨。")
    return out_paths[0], out_paths[1]
# ==========================================
# 3. Stage 3: speaker verification (pick the target speaker)
# ==========================================
def stage3_identify(track1, track2, reference_path):
    """Order the two separated tracks so the reference speaker comes first.

    Args:
        track1: Path of the first separated track.
        track2: Path of the second separated track.
        reference_path: Optional path to a voice sample of the target
            speaker; when falsy the tracks are returned unchanged.

    Returns:
        ``(target_track, other_track, status_message)``.
    """
    if not reference_path:
        return track1, track2, "⚠️ 未提供参考样本,无法自动识别主角。展示原始分离结果。"
    print("--- [阶段3] 声纹比对启动 ---")
    model = load_spk_model()

    def similarity(candidate, reference):
        # verify_files yields (score tensor, boolean prediction); only the
        # raw score is needed for ranking the two tracks.
        score, _prediction = model.verify_files(candidate, reference)
        return score.item()

    try:
        score1 = similarity(track1, reference_path)
        score2 = similarity(track2, reference_path)
        print(f"🔍 相似度得分 - 音轨1: {score1:.4f} | 音轨2: {score2:.4f}")
        if score1 > score2:
            return track1, track2, f"✅ 匹配成功!\n音轨1 是目标人物 (得分 {score1:.2f})\n音轨2 是干扰人声 (得分 {score2:.2f})"
        # Track 2 matched better: swap so the target always comes first.
        return track2, track1, f"✅ 匹配成功!\n音轨1 是目标人物 (得分 {score2:.2f})\n音轨2 是干扰人声 (得分 {score1:.2f})"
    except Exception as e:
        print(f"❌ 声纹识别出错: {e}")
        return track1, track2, "⚠️ 声纹识别运行失败,展示原始分离结果。"
# ==========================================
# Main pipeline
# ==========================================
def pipeline(input_audio, ref_audio):
    """Run the full cascade: denoise, separate, then pick the target speaker.

    Args:
        input_audio: Path of the uploaded mixture (required).
        ref_audio: Optional path of the target speaker's sample.

    Returns:
        ``(target_path, other_path, status_message)``; the paths are
        ``None`` when input is missing or any stage fails.
    """
    if not input_audio:
        return None, None, "请上传需要处理的主音频"
    try:
        clean_vocals = stage1_denoise(input_audio)            # 1. clean
        track1, track2 = stage2_separate(clean_vocals)        # 2. split
        return stage3_identify(track1, track2, ref_audio)     # 3. identify
    except Exception as e:
        return None, None, f"❌ 处理链中断: {str(e)}"
# ==========================================
# UI
# ==========================================
with gr.Blocks(title="V3: 级联智能人声提取") as demo:
    gr.Markdown("## 🕵️♂️ V3 级联架构:Demucs 降噪 + 双源分离 + 声纹锁定")
    gr.Markdown("这是最高级的处理流程:\n1. **Demucs** 先把背景音乐杀掉。\n2. **SpeechBrain** 再把混在一起的两个人拆开。\n3. **声纹识别** 根据你上传的样本,自动把你想要的人挑出来。")
    with gr.Row():
        # Left column: inputs and the launch button.
        with gr.Column(scale=1):
            main_audio_in = gr.Audio(label="1. 上传复杂的混合音频 (必须)", type="filepath")
            ref_audio_in = gr.Audio(label="2. 上传主角的声音样本 (可选,用于自动挑选)", type="filepath")
            run_btn = gr.Button("🚀 启动三级火箭分离", variant="primary")
        # Right column: report text plus the two separated tracks.
        with gr.Column(scale=1):
            report_box = gr.Textbox(label="分析报告", value="等待任务...")
            target_out = gr.Audio(label="🎉 结果:目标主角 (Target)", type="filepath")
            rejected_out = gr.Audio(label="🗑️ 结果:被剔除的干扰人声/杂音", type="filepath")
    run_btn.click(
        fn=pipeline,
        inputs=[main_audio_in, ref_audio_in],
        outputs=[target_out, rejected_out, report_box],
    )

if __name__ == "__main__":
    demo.launch()