File size: 7,326 Bytes
bcae25e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193

import os
import shutil
import subprocess
import sys

import gradio as gr
import huggingface_hub
import torch
import torchaudio
from speechbrain.inference.separation import SepformerSeparation
from speechbrain.inference.speaker import SpeakerRecognition

# ==========================================
# 0. Initialization & configuration
# ==========================================
# Prefer GPU when available; all models are loaded onto this device.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 运行硬件: {device}")

# Best-effort Hugging Face login so gated model downloads work.
# Narrowed from a bare `except:` (which would also swallow
# SystemExit/KeyboardInterrupt) and the failure is now logged
# instead of silently discarded.
try:
    token = os.environ.get("HF_TOKEN")
    if token:
        huggingface_hub.login(token=token)
except Exception as login_err:
    print(f"⚠️ Hugging Face login failed (continuing anyway): {login_err}")

# Global model cache: lazy-loaded singletons filled in by the loaders below.
sep_model = None
spk_model = None

def load_sep_model():
    """Return the global Sepformer separation model, loading it on first use."""
    global sep_model
    # Guard clause: reuse the cached instance when it already exists.
    if sep_model is not None:
        return sep_model
    print("⏳ 正在加载 SpeechBrain 分离模型...")
    sep_model = SepformerSeparation.from_hparams(
        source="speechbrain/sepformer-whamr16k",
        savedir="pretrained_models/sepformer-whamr16k",
        run_opts={"device": device},
    )
    return sep_model

def load_spk_model():
    """Return the global ECAPA speaker-verification model, loading it on first use."""
    global spk_model
    # Guard clause: reuse the cached instance when it already exists.
    if spk_model is not None:
        return spk_model
    print("⏳ 正在加载声纹识别模型 (用于匹配样本)...")
    spk_model = SpeakerRecognition.from_hparams(
        source="speechbrain/spkrec-ecapa-voxceleb",
        savedir="pretrained_models/spkrec-ecapa-voxceleb",
        run_opts={"device": device},
    )
    return spk_model

# ==========================================
# 1. 第一级:Demucs 降噪 (清洗背景音)
# ==========================================
def stage1_denoise(audio_path):
    """Stage 1: remove background music/noise with Demucs, keeping vocals only.

    Args:
        audio_path: Path to the input mixture audio file.

    Returns:
        Path to the ``vocals.wav`` file produced by Demucs.

    Raises:
        RuntimeError: If the Demucs subprocess fails or produces no vocals file.
    """
    print("--- [阶段1] Demucs 降噪启动 ---")
    output_dir = "separated_stage1"
    # Start from a clean slate so a stale vocals.wav from a previous run
    # cannot be picked up by the recursive search below.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # sys.executable guarantees the same interpreter/venv running this app;
    # a hard-coded "python3" could resolve to an environment without demucs.
    command = [
        sys.executable, "-m", "demucs.separate",
        "-n", "htdemucs",
        "--two-stems=vocals",  # vocals + accompaniment only
        audio_path,
        "-o", output_dir,
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # Previously the return code was ignored, so a Demucs crash surfaced
        # later as a confusing "missing file" error. Report its stderr instead.
        raise RuntimeError(f"Demucs 未能生成人声文件: {result.stderr[-500:]}")

    # Demucs writes to <output_dir>/htdemucs/<track name>/vocals.wav, but the
    # track name may be normalized, so search recursively instead of guessing.
    vocals_path = None
    for root, _dirs, files in os.walk(output_dir):
        if "vocals.wav" in files:
            vocals_path = os.path.join(root, "vocals.wav")
            break

    if not vocals_path:
        raise RuntimeError("Demucs 未能生成人声文件")

    print("✅ [阶段1] 完成:背景杂音已去除,获得纯净混合人声。")
    return vocals_path

# ==========================================
# 2. 第二级:SpeechBrain 双源分离
# ==========================================
def stage2_separate(clean_audio_path):
    """Stage 2: split the denoised vocal track into two speaker sources.

    Returns the paths of the two saved mono tracks, in model output order.
    """
    print("--- [阶段2] SpeechBrain 双源分离启动 ---")
    separator = load_sep_model()

    # Run Sepformer source separation on the cleaned vocals.
    estimates = separator.separate_file(path=clean_audio_path)

    # Peak-normalize jointly so the two sources keep their relative levels.
    estimates = estimates / estimates.abs().max()

    out_paths = ("temp_source1.wav", "temp_source2.wav")
    # NOTE: this SpeechBrain model family outputs 8 kHz or 16 kHz audio;
    # the whamr16k variant is saved uniformly at 16 kHz here.
    for idx, path in enumerate(out_paths):
        track = estimates[:, :, idx].detach().cpu()
        torchaudio.save(path, track, 16000)

    print("✅ [阶段2] 完成:混合人声已拆分为两个独立音轨。")
    return out_paths[0], out_paths[1]

# ==========================================
# 3. 第三级:声纹比对 (找出主角)
# ==========================================
def stage3_identify(track1, track2, reference_path):
    """Stage 3: decide which separated track matches the reference speaker.

    Returns (target_track, other_track, status_message); the matched track is
    always first so the UI consistently plays the target in the first slot.
    Falls back to the original ordering when no reference is given or when
    verification fails.
    """
    # Without a reference sample there is nothing to compare against.
    if not reference_path:
        return track1, track2, "⚠️ 未提供参考样本,无法自动识别主角。展示原始分离结果。"

    print("--- [阶段3] 声纹比对启动 ---")
    verifier = load_spk_model()

    def similarity(candidate):
        # verify_files returns (score tensor, bool prediction); only the raw
        # score is needed for ranking the two tracks.
        score, _prediction = verifier.verify_files(candidate, reference_path)
        return score.item()

    try:
        score1, score2 = similarity(track1), similarity(track2)
        print(f"🔍 相似度得分 - 音轨1: {score1:.4f} | 音轨2: {score2:.4f}")

        # Higher score wins; swap so the target speaker always comes first.
        if score1 > score2:
            return track1, track2, f"✅ 匹配成功!\n音轨1 是目标人物 (得分 {score1:.2f})\n音轨2 是干扰人声 (得分 {score2:.2f})"
        return track2, track1, f"✅ 匹配成功!\n音轨1 是目标人物 (得分 {score2:.2f})\n音轨2 是干扰人声 (得分 {score1:.2f})"

    except Exception as e:
        print(f"❌ 声纹识别出错: {e}")
        return track1, track2, "⚠️ 声纹识别运行失败,展示原始分离结果。"

# ==========================================
# 主流程
# ==========================================
def pipeline(input_audio, ref_audio):
    """Full cascade: Demucs denoise → two-source separation → speaker match.

    Args:
        input_audio: Filepath of the mixed recording (required).
        ref_audio: Optional filepath of a reference sample of the target voice.

    Returns:
        (target_path, other_path, status_message); the first two are None
        when processing cannot start or a stage fails.
    """
    # A main recording is mandatory; the reference sample is optional.
    if not input_audio:
        return None, None, "请上传需要处理的主音频"

    try:
        # 1. Strip background music/noise.
        clean_vocals = stage1_denoise(input_audio)

        # 2. Split the remaining mixed vocals into two tracks.
        track1, track2 = stage2_separate(clean_vocals)

        # 3. Pick the target speaker if a reference sample was supplied.
        final_target, final_noise, msg = stage3_identify(track1, track2, ref_audio)
        return final_target, final_noise, msg

    except Exception as e:
        # Any stage can fail (subprocess, model download, CUDA); report the
        # error to the UI instead of crashing the app. (Removed the unused
        # `status_log` local from the original.)
        return None, None, f"❌ 处理链中断: {str(e)}"

# ==========================================
# 界面
# ==========================================
# Gradio UI: two inputs on the left (mixture + optional reference voice),
# status report plus the two separated result tracks on the right.
with gr.Blocks(title="V3: 级联智能人声提取") as demo:
    gr.Markdown("## 🕵️‍♂️ V3 级联架构:Demucs 降噪 + 双源分离 + 声纹锁定")
    gr.Markdown("这是最高级的处理流程:\n1. **Demucs** 先把背景音乐杀掉。\n2. **SpeechBrain** 再把混在一起的两个人拆开。\n3. **声纹识别** 根据你上传的样本,自动把你想要的人挑出来。")
    
    with gr.Row():
        with gr.Column(scale=1):
            # type="filepath" so the pipeline receives on-disk paths it can
            # hand to Demucs/torchaudio directly.
            input_main = gr.Audio(label="1. 上传复杂的混合音频 (必须)", type="filepath")
            input_ref = gr.Audio(label="2. 上传主角的声音样本 (可选,用于自动挑选)", type="filepath")
            btn = gr.Button("🚀 启动三级火箭分离", variant="primary")
            
        with gr.Column(scale=1):
            log_box = gr.Textbox(label="分析报告", value="等待任务...")
            out_target = gr.Audio(label="🎉 结果:目标主角 (Target)", type="filepath")
            out_noise = gr.Audio(label="🗑️ 结果:被剔除的干扰人声/杂音", type="filepath")

    # Output order must match pipeline's return tuple: (target, noise, message).
    btn.click(fn=pipeline, inputs=[input_main, input_ref], outputs=[out_target, out_noise, log_box])

if __name__ == "__main__":
    demo.launch()