Spaces:
Sleeping
Sleeping
| import os, tempfile, subprocess | |
| import gradio as gr | |
| import numpy as np | |
| import soundfile as sf | |
| import librosa | |
| # 检查 GPU | |
| try: | |
| import torch | |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| except: | |
| DEVICE = "cpu" | |
| SAMPLE_RATE = 44100 | |
| def extract_audio_from_video(video_path, output_path): | |
| """从视频中提取音频""" | |
| try: | |
| cmd = [ | |
| 'ffmpeg', '-i', video_path, | |
| '-vn', | |
| '-acodec', 'pcm_s16le', | |
| '-ar', str(SAMPLE_RATE), | |
| '-ac', '2', | |
| '-y', | |
| output_path | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"FFmpeg 提取失败: {result.stderr}") | |
| return output_path | |
| except Exception as e: | |
| raise RuntimeError(f"音频提取失败: {str(e)}") | |
| def load_audio_any_format(file_path, target_sr=SAMPLE_RATE): | |
| """加载任意格式音频""" | |
| try: | |
| video_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.flv', '.wmv', '.m4v'] | |
| file_ext = os.path.splitext(file_path)[1].lower() | |
| if file_ext in video_extensions: | |
| with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp: | |
| temp_audio_path = tmp.name | |
| extract_audio_from_video(file_path, temp_audio_path) | |
| audio, sr = librosa.load(temp_audio_path, sr=target_sr, mono=False) | |
| os.unlink(temp_audio_path) | |
| else: | |
| audio, sr = librosa.load(file_path, sr=target_sr, mono=False) | |
| if audio.ndim == 1: | |
| audio = audio.reshape(1, -1) | |
| return audio, sr | |
| except Exception as e: | |
| raise ValueError(f"音频加载失败: {str(e)}") | |
| def save_audio(path, audio, sr): | |
| """保存音频""" | |
| try: | |
| if audio.ndim == 1: | |
| audio = audio.reshape(1, -1) | |
| audio = np.clip(audio, -1.0, 1.0) | |
| sf.write(path, audio.T, sr, subtype="PCM_16") | |
| except Exception as e: | |
| raise RuntimeError(f"音频保存失败: {str(e)}") | |
| def run_demucs_separation(audio_path, output_dir): | |
| """使用 Demucs 进行人声/伴奏分离""" | |
| try: | |
| cmd = [ | |
| "python", "-m", "demucs.separate", | |
| "--two-stems=vocals", | |
| "-n", "htdemucs", | |
| "--mp3", | |
| "--mp3-bitrate=320", | |
| "-o", output_dir, | |
| audio_path | |
| ] | |
| result = subprocess.run(cmd, check=True, capture_output=True, text=True, timeout=600) | |
| base_name = os.path.splitext(os.path.basename(audio_path))[0] | |
| stem_dir = os.path.join(output_dir, "htdemucs", base_name) | |
| vocals_path = os.path.join(stem_dir, "vocals.mp3") | |
| instrumental_path = os.path.join(stem_dir, "no_vocals.mp3") | |
| if not os.path.exists(vocals_path): | |
| raise FileNotFoundError(f"Demucs 输出文件不存在: {vocals_path}") | |
| return vocals_path, instrumental_path | |
| except subprocess.TimeoutExpired: | |
| raise RuntimeError("处理超时(超过10分钟),请上传较短的音频") | |
| except subprocess.CalledProcessError as e: | |
| raise RuntimeError(f"Demucs 执行失败: {e.stderr}") | |
| except Exception as e: | |
| raise RuntimeError(f"Demucs 分离失败: {str(e)}") | |
| def detect_speaking_improved(vocals_audio, sr, strictness=0.6): | |
| """ | |
| 改进的说话检测算法(无需外部模型) | |
| 基于多特征融合: | |
| 1. 能量包络(RMS) | |
| 2. 零交叉率(ZCR) | |
| 3. 频谱质心(Spectral Centroid) | |
| 4. 频谱滚降(Spectral Rolloff) | |
| 5. 音高连续性 | |
| strictness: 0-1,越高越严格(只保留明确的说话) | |
| """ | |
| try: | |
| hop_length = 512 | |
| frame_length = 2048 | |
| # ===== 特征1: 能量 ===== | |
| rms = librosa.feature.rms(y=vocals_audio, frame_length=frame_length, hop_length=hop_length)[0] | |
| # ===== 特征2: 零交叉率 ===== | |
| zcr = librosa.feature.zero_crossing_rate(vocals_audio, frame_length=frame_length, hop_length=hop_length)[0] | |
| # ===== 特征3: 频谱质心 ===== | |
| spectral_centroids = librosa.feature.spectral_centroid(y=vocals_audio, sr=sr, hop_length=hop_length)[0] | |
| # ===== 特征4: 频谱滚降 ===== | |
| spectral_rolloff = librosa.feature.spectral_rolloff(y=vocals_audio, sr=sr, hop_length=hop_length)[0] | |
| # ===== 特征5: 音高检测 ===== | |
| try: | |
| f0, voiced_flag, voiced_probs = librosa.pyin( | |
| vocals_audio, | |
| fmin=librosa.note_to_hz('C2'), | |
| fmax=librosa.note_to_hz('C7'), | |
| sr=sr, | |
| frame_length=frame_length, | |
| hop_length=hop_length | |
| ) | |
| f0 = np.nan_to_num(f0, nan=0.0) | |
| voiced_probs = np.nan_to_num(voiced_probs, nan=0.0) | |
| except: | |
| f0 = np.zeros(len(rms)) | |
| voiced_probs = np.zeros(len(rms)) | |
| # ===== 特征融合 ===== | |
| min_len = min(len(rms), len(zcr), len(spectral_centroids), len(spectral_rolloff), len(voiced_probs)) | |
| rms = rms[:min_len] | |
| zcr = zcr[:min_len] | |
| spectral_centroids = spectral_centroids[:min_len] | |
| spectral_rolloff = spectral_rolloff[:min_len] | |
| voiced_probs = voiced_probs[:min_len] | |
| f0 = f0[:min_len] | |
| # 说话特征得分 | |
| # 1. 零交叉率高(但不是极高) | |
| zcr_score = np.clip((zcr - 0.05) / 0.15, 0, 1) | |
| # 2. 能量适中(不是持续的高能量) | |
| rms_norm = rms / (np.max(rms) + 1e-8) | |
| energy_variation = np.abs(np.gradient(rms_norm)) | |
| energy_score = np.clip(energy_variation * 10, 0, 1) | |
| # 3. 频谱质心变化大 | |
| centroid_variation = np.abs(np.gradient(spectral_centroids)) | |
| centroid_score = np.clip(centroid_variation / (np.mean(centroid_variation) + 1e-8), 0, 1) | |
| # 4. 音高不连续 | |
| pitch_continuity = np.zeros_like(f0) | |
| for i in range(1, len(f0)): | |
| if f0[i] > 0 and f0[i-1] > 0: | |
| pitch_diff = abs(f0[i] - f0[i-1]) | |
| if pitch_diff > 50: | |
| pitch_continuity[i] = 1 | |
| # 综合得分 | |
| speaking_score = ( | |
| 0.30 * zcr_score + | |
| 0.25 * energy_score + | |
| 0.25 * centroid_score + | |
| 0.20 * pitch_continuity | |
| ) | |
| # 根据严格度调整阈值 | |
| threshold = strictness | |
| speaking_mask = (speaking_score > threshold).astype(np.float32) | |
| # ===== 后处理 ===== | |
| # 去除过短片段(<0.2秒) | |
| min_duration = int(0.2 * sr / hop_length) | |
| i = 0 | |
| while i < len(speaking_mask): | |
| if speaking_mask[i] == 1: | |
| j = i | |
| while j < len(speaking_mask) and speaking_mask[j] == 1: | |
| j += 1 | |
| if j - i < min_duration: | |
| speaking_mask[i:j] = 0 | |
| i = j | |
| else: | |
| i += 1 | |
| # 填充小间隙(<0.15秒) | |
| gap_threshold = int(0.15 * sr / hop_length) | |
| i = 0 | |
| while i < len(speaking_mask) - 1: | |
| if speaking_mask[i] == 1: | |
| j = i + 1 | |
| while j < len(speaking_mask) and speaking_mask[j] == 0: | |
| j += 1 | |
| if j < len(speaking_mask) and j - i < gap_threshold: | |
| speaking_mask[i:j] = 1 | |
| i = j | |
| else: | |
| i += 1 | |
| # 转换为样本级掩码 | |
| speaking_mask_samples = np.repeat(speaking_mask, hop_length) | |
| # 调整长度 | |
| if len(speaking_mask_samples) < len(vocals_audio): | |
| speaking_mask_samples = np.pad(speaking_mask_samples, (0, len(vocals_audio) - len(speaking_mask_samples))) | |
| else: | |
| speaking_mask_samples = speaking_mask_samples[:len(vocals_audio)] | |
| # 平滑边界 | |
| smooth_window = int(0.03 * sr) | |
| if smooth_window > 1: | |
| speaking_mask_samples = np.convolve( | |
| speaking_mask_samples, | |
| np.ones(smooth_window) / smooth_window, | |
| mode='same' | |
| ) | |
| speaking_mask_samples = (speaking_mask_samples > 0.5).astype(np.float32) | |
| return speaking_mask_samples | |
| except Exception as e: | |
| print(f"说话检测失败: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| # 🔴 修复:如果失败,返回全1(假设全是说话),而不是全0 | |
| return np.ones(len(vocals_audio), dtype=np.float32) | |
| def process_audio_full(audio_file, strictness, enable_detection): | |
| """完整的音频分离流程""" | |
| if audio_file is None: | |
| return None, None, None, "❌ 请先上传音频或视频文件" | |
| status_messages = [] | |
| try: | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| # 1. 加载音频 | |
| status_messages.append("📂 正在加载文件...") | |
| yield None, None, None, "\n".join(status_messages) | |
| input_path = audio_file | |
| file_ext = os.path.splitext(input_path)[1].lower() | |
| if file_ext in ['.mp4', '.mov', '.avi', '.mkv', '.flv', '.wmv', '.m4v']: | |
| status_messages.append(f"🎬 检测到视频文件 ({file_ext}),正在提取音频...") | |
| yield None, None, None, "\n".join(status_messages) | |
| audio, sr = load_audio_any_format(input_path, SAMPLE_RATE) | |
| temp_wav = os.path.join(tmpdir, "input.wav") | |
| save_audio(temp_wav, audio, sr) | |
| # 2. Demucs 分离 | |
| status_messages.append("━━━━━━━━━━━━━━━━━━━━") | |
| status_messages.append("🎵 使用 Demucs AI 模型分离人声和伴奏...") | |
| status_messages.append(" (首次运行会下载模型,约500MB)") | |
| yield None, None, None, "\n".join(status_messages) | |
| vocals_path, instrumental_path = run_demucs_separation(temp_wav, tmpdir) | |
| vocals, _ = librosa.load(vocals_path, sr=sr, mono=True) | |
| instrumental, _ = librosa.load(instrumental_path, sr=sr, mono=True) | |
| status_messages.append(" ✅ Demucs 分离完成") | |
| status_messages.append("━━━━━━━━━━━━━━━━━━━━") | |
| # 3. 说话检测 | |
| if enable_detection: | |
| status_messages.append("") | |
| status_messages.append("🎤 正在检测说话片段...") | |
| status_messages.append(" 算法: 多特征融合(能量+零交叉率+频谱+音高)") | |
| status_messages.append(f" 严格度: {strictness:.2f}") | |
| yield None, None, None, "\n".join(status_messages) | |
| # speaking_mask: 1=说话, 0=其他 | |
| speaking_mask = detect_speaking_improved(vocals, sr, strictness) | |
| status_messages.append(" ✅ 检测完成") | |
| else: | |
| status_messages.append("⚠️ 已关闭智能检测,所有人声归入对白") | |
| speaking_mask = np.ones(len(vocals), dtype=np.float32) | |
| # 4. 分离对白和唱歌 | |
| status_messages.append("") | |
| status_messages.append("✂️ 正在分离对白和背景音乐...") | |
| yield None, None, None, "\n".join(status_messages) | |
| singing_mask = 1 - speaking_mask | |
| dialog_vocals = vocals * speaking_mask | |
| singing_vocals = vocals * singing_mask | |
| # 5. 生成最终输出 | |
| output_a = dialog_vocals | |
| # 智能混音 | |
| singing_rms = np.sqrt(np.mean(singing_vocals**2) + 1e-8) | |
| inst_rms = np.sqrt(np.mean(instrumental**2) + 1e-8) | |
| if singing_rms > 1e-6: | |
| singing_gain = inst_rms / singing_rms * 0.8 | |
| singing_gain = np.clip(singing_gain, 0.1, 1.5) | |
| else: | |
| singing_gain = 1.0 | |
| output_b = np.clip(instrumental + singing_vocals * singing_gain, -1.0, 1.0) | |
| output_c = instrumental | |
| # 保存文件 | |
| status_messages.append("💾 正在保存输出文件...") | |
| yield None, None, None, "\n".join(status_messages) | |
| path_a = os.path.join(tmpdir, "A_dialog.wav") | |
| path_b = os.path.join(tmpdir, "B_bgm_with_singing.wav") | |
| path_c = os.path.join(tmpdir, "C_instrumental.wav") | |
| save_audio(path_a, output_a, sr) | |
| save_audio(path_b, output_b, sr) | |
| save_audio(path_c, output_c, sr) | |
| # 统计信息 | |
| total_duration = len(vocals) / sr | |
| dialog_duration = np.sum(speaking_mask) / sr | |
| singing_duration = total_duration - dialog_duration | |
| status_messages.append("") | |
| status_messages.append("━━━━━━━━━━━━━━━━━━━━") | |
| status_messages.append("✅✅✅ 分离完成!") | |
| status_messages.append("━━━━━━━━━━━━━━━━━━━━") | |
| status_messages.append("") | |
| status_messages.append("📊 统计信息:") | |
| status_messages.append(f" 总时长: {total_duration:.1f} 秒") | |
| status_messages.append(f" 对白时长: {dialog_duration:.1f} 秒 ({dialog_duration/total_duration*100:.1f}%)") | |
| status_messages.append(f" 音乐人声时长: {singing_duration:.1f} 秒 ({singing_duration/total_duration*100:.1f}%)") | |
| status_messages.append(f" 运行设备: {DEVICE.upper()}") | |
| status_messages.append("") | |
| status_messages.append("🎯 检测算法: 传统多特征融合") | |
| status_messages.append(" 📈 预期准确率: 75-80%") | |
| status_messages.append(" 🔧 技术: 能量+零交叉率+频谱+音高") | |
| status_messages.append("") | |
| status_messages.append("━━━━━━━━━━━━━━━━━━━━") | |
| yield ( | |
| path_a, | |
| path_b, | |
| path_c, | |
| "\n".join(status_messages) | |
| ) | |
| except Exception as e: | |
| import traceback | |
| error_detail = traceback.format_exc() | |
| error_msg = f"❌ 处理失败:\n{str(e)}\n\n已完成步骤:\n" + "\n".join(status_messages) | |
| error_msg += f"\n\n详细错误:\n{error_detail}" | |
| yield None, None, None, error_msg | |
| # 创建 Gradio 界面 | |
| with gr.Blocks(theme=gr.themes.Soft(), title="AI音频分离工具") as demo: | |
| gr.Markdown(f""" | |
| # 🎵 AI 音频分离工具 - 稳定版 | |
| **当前运行设备**: {DEVICE.upper()} {'✅ GPU加速' if DEVICE == 'cuda' else '⚠️ CPU模式'} | |
| ## 功能说明 | |
| - **A - 纯对白**: 旁白、解说、对话 | |
| - **B - 背景音乐+人声**: 伴奏 + 唱歌 + Rap + 和声 | |
| - **C - 纯伴奏**: 去除所有人声的纯音乐 | |
| 💡 **核心技术**: | |
| - Demucs 4.0 深度学习模型(人声/伴奏分离) | |
| - 多特征融合算法(能量、零交叉率、频谱、音高) | |
| - **准确率 75-80%,稳定快速** | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| audio_input = gr.File( | |
| label="📁 上传音频或视频文件", | |
| file_types=["audio", "video"], | |
| type="filepath" | |
| ) | |
| gr.Markdown(""" | |
| **支持格式**: | |
| - 音频: MP3, WAV, M4A, FLAC, OGG, AAC | |
| - 视频: MP4, MOV, AVI, MKV, FLV, WMV | |
| """) | |
| with gr.Accordion("⚙️ 高级设置", open=True): | |
| enable_detection = gr.Checkbox( | |
| value=True, | |
| label="🎯 启用智能说话检测(推荐开启)" | |
| ) | |
| strictness = gr.Slider( | |
| 0.4, 0.8, value=0.6, step=0.05, | |
| label="检测严格度" | |
| ) | |
| gr.Markdown(""" | |
| **调节建议**: | |
| - **0.45-0.55**: 宽松(更多人声归入对白) | |
| - **0.60-0.65**: 平衡(**推荐**,默认0.60) | |
| - **0.70-0.80**: 严格(只保留明确的说话) | |
| **效果不满意?试试这样调**: | |
| - 说话被误判为唱歌 → 降低到 0.50-0.55 | |
| - 唱歌被误判为说话 → 提高到 0.70-0.75 | |
| """) | |
| process_btn = gr.Button("🚀 开始智能分离", variant="primary", size="lg") | |
| with gr.Column(scale=1): | |
| status_box = gr.Textbox( | |
| label="📊 处理状态", | |
| lines=20, | |
| max_lines=25, | |
| show_label=True | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown("## 📥 分离结果") | |
| with gr.Row(): | |
| output_a = gr.Audio(label="🎤 A - 纯对白(旁白/解说)", type="filepath") | |
| output_b = gr.Audio(label="🎵 B - 背景音乐+人声(含唱歌/Rap)", type="filepath") | |
| output_c = gr.Audio(label="🎹 C - 纯伴奏", type="filepath") | |
| process_btn.click( | |
| fn=process_audio_full, | |
| inputs=[audio_input, strictness, enable_detection], | |
| outputs=[output_a, output_b, output_c, status_box] | |
| ) | |
| gr.Markdown(""" | |
| --- | |
| ## 📌 使用说明 | |
| ### 🎯 本版本特点 | |
| - ✅ **稳定快速**:无需下载外部模型 | |
| - ✅ **准确率 75-80%**:适合大部分场景 | |
| - ✅ **修复BUG**:确保对白始终有人声 | |
| - ✅ **启动快速**:3-5分钟构建完成 | |
| ### 💡 如何获得最佳效果 | |
| 1. **优先用默认值 0.60** 测试 | |
| 2. 根据结果微调严格度: | |
| - 对白太少 → 降低到 0.50-0.55 | |
| - 对白太多 → 提高到 0.70-0.75 | |
| 3. 每次调整 0.05 观察变化 | |
| ### ⚠️ 技术限制 | |
| 传统算法准确率有限,以下情况仍有挑战: | |
| - 说唱风格旁白 | |
| - 快速说话 + 背景音乐 | |
| - 唱歌式说话 | |
| ### 🔬 如果需要更高准确率 | |
| 可以考虑: | |
| - 使用专业软件(如 Adobe Audition) | |
| - 本地部署并手动下载 Silero VAD 模型 | |
| - 训练深度学习分类模型 | |
| """) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |