# -*- coding: utf-8 -*-
"""Audio processing module.

Provides Silero VAD based slicing and Whisper based transcription.
"""
import os
import logging
from pathlib import Path
from typing import Optional, Callable, List, Tuple

logger = logging.getLogger(__name__)


class AudioProcessor:
    """Audio processor combining VAD slicing and Whisper transcription.

    Models are loaded lazily on first use and cached on the instance.
    """

    def __init__(
        self,
        models_dir: str,
        progress_callback: Optional[Callable[[str], None]] = None
    ):
        """Initialize the audio processor.

        Args:
            models_dir: Directory used to store/download model files.
            progress_callback: Optional callable invoked with each progress
                message (in addition to the module logger).
        """
        self.models_dir = models_dir
        self.progress_callback = progress_callback
        self.vad_model = None
        self.whisper_pipe = None
        # Tracks which Whisper model is currently loaded so that a request
        # for a different model triggers a reload instead of being silently
        # ignored (the original cached unconditionally).
        self._whisper_model_name: Optional[str] = None

    def _log(self, msg: str):
        """Log *msg* via the module logger and the progress callback."""
        logger.info(msg)
        if self.progress_callback:
            self.progress_callback(msg)

    def load_vad_model(self):
        """Load the Silero VAD model (no-op if already loaded)."""
        if self.vad_model is not None:
            return
        self._log("正在加载 Silero VAD 模型...")
        from src.silero_vad_downloader import ensure_vad_model
        import torch

        # Ensure the model files are present locally (download side effect).
        # NOTE(review): the returned local path is currently not used — the
        # model below is loaded from the online GitHub repo instead. Confirm
        # whether torch.hub.load should use source='local' with this path.
        ensure_vad_model(self.models_dir, self.progress_callback)

        # Load the ONNX variant of Silero VAD from torch.hub.
        self.vad_model, utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False,
            onnx=True
        )
        # utils is a tuple; index 0 is get_speech_timestamps (see vad_split).
        self.vad_utils = utils
        self._log("Silero VAD 模型加载完成")

    def load_whisper_model(self, model_name: str = "openai/whisper-small"):
        """Load a Whisper ASR pipeline.

        Reloads when *model_name* differs from the currently loaded model;
        no-op when the same model is already loaded.

        Args:
            model_name: Hugging Face model identifier.
        """
        if self.whisper_pipe is not None and self._whisper_model_name == model_name:
            return
        self._log(f"正在加载 Whisper 模型: {model_name}...")
        from transformers import pipeline
        import torch

        cache_dir = os.path.join(self.models_dir, "whisper")
        os.makedirs(cache_dir, exist_ok=True)
        # Redirect HF caches into our models dir. TRANSFORMERS_CACHE is
        # deprecated in newer transformers releases but kept for
        # compatibility; cache_dir in model_kwargs is the effective setting.
        os.environ["HF_HOME"] = cache_dir
        os.environ["TRANSFORMERS_CACHE"] = cache_dir

        self.whisper_pipe = pipeline(
            "automatic-speech-recognition",
            model=model_name,
            # fp16 only when CUDA is available; CPU inference stays fp32.
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto",
            model_kwargs={"cache_dir": cache_dir}
        )
        self._whisper_model_name = model_name
        self._log("Whisper 模型加载完成")

    def _load_audio_ffmpeg(self, audio_path: str) -> tuple:
        """Decode an audio file to mono float32 PCM using ffmpeg.

        Args:
            audio_path: Path to the input audio file.

        Returns:
            Tuple ``(audio, sr)`` — a mono float32 numpy array in [-1, 1)
            and the sample rate.

        Raises:
            RuntimeError: If ffmpeg fails to decode the file.
        """
        import subprocess
        import numpy as np
        import json

        # Probe the native sample rate with ffprobe; fall back to 44100
        # if probing fails for any reason.
        probe_cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_streams', audio_path
        ]
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
        sr = 44100  # default sample rate
        if probe_result.returncode == 0:
            try:
                info = json.loads(probe_result.stdout)
                for stream in info.get('streams', []):
                    if stream.get('codec_type') == 'audio':
                        sr = int(stream.get('sample_rate', 44100))
                        break
            except (json.JSONDecodeError, KeyError):
                pass

        # Decode to raw signed 16-bit little-endian mono PCM on stdout.
        cmd = [
            'ffmpeg', '-i', audio_path,
            '-f', 's16le', '-acodec', 'pcm_s16le',
            '-ac', '1', '-ar', str(sr),
            '-v', 'quiet', '-'
        ]
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(f"ffmpeg 读取音频失败: {audio_path}")

        # int16 → float32 normalized by 2^15.
        audio = np.frombuffer(result.stdout, dtype=np.int16).astype(np.float32) / 32768.0
        return audio, sr

    def vad_split(
        self,
        audio_path: str,
        output_dir: str,
        min_speech_duration_ms: int = 250,
        min_silence_duration_ms: int = 100,
        threshold: float = 0.5
    ) -> List[str]:
        """Slice an audio file into speech segments using Silero VAD.

        Args:
            audio_path: Input audio file path.
            output_dir: Directory where slices are written.
            min_speech_duration_ms: Minimum speech duration (ms).
            min_silence_duration_ms: Minimum silence duration (ms).
            threshold: VAD speech probability threshold.

        Returns:
            List of written slice file paths (16-bit PCM WAV at the
            original sample rate).
        """
        import torch
        import torchaudio
        import soundfile as sf

        self.load_vad_model()

        basename = Path(audio_path).stem
        os.makedirs(output_dir, exist_ok=True)

        self._log(f"正在处理: {audio_path}")

        # Decode with ffmpeg (keeps the native sample rate for output).
        wav, sr = self._load_audio_ffmpeg(audio_path)
        wav = torch.from_numpy(wav).float()

        # Silero VAD requires 16 kHz input; resample a copy for detection
        # only, while slicing is done on the original-rate waveform.
        if sr != 16000:
            resampler = torchaudio.transforms.Resample(sr, 16000)
            wav_16k = resampler(wav)
            sr_vad = 16000
        else:
            wav_16k = wav
            sr_vad = sr

        # utils[0] is get_speech_timestamps in the silero-vad utils tuple.
        get_speech_timestamps = self.vad_utils[0]
        speech_timestamps = get_speech_timestamps(
            wav_16k,
            self.vad_model,
            threshold=threshold,
            min_speech_duration_ms=min_speech_duration_ms,
            min_silence_duration_ms=min_silence_duration_ms,
            sampling_rate=sr_vad
        )

        self._log(f"检测到 {len(speech_timestamps)} 个语音片段")

        # Slice the original-rate waveform and write each segment.
        output_files = []
        for i, ts in enumerate(speech_timestamps):
            # Rescale 16 kHz sample indices back to the original rate.
            start = int(ts['start'] * sr / sr_vad)
            end = int(ts['end'] * sr / sr_vad)
            # Guard against degenerate segments after index rescaling.
            if end <= start:
                continue
            segment = wav[start:end].numpy()
            output_path = os.path.join(output_dir, f"{basename}_{i:04d}.wav")
            sf.write(output_path, segment, sr, subtype='PCM_16')
            output_files.append(output_path)

        self._log(f"切片完成,共 {len(output_files)} 个文件")
        return output_files

    def transcribe(self, audio_path: str, language: str = "chinese") -> str:
        """Transcribe an audio file with the loaded Whisper pipeline.

        Args:
            audio_path: Audio file path.
            language: Language hint passed to Whisper generation.

        Returns:
            The transcribed text, stripped of surrounding whitespace.

        Raises:
            RuntimeError: If the Whisper model has not been loaded.
        """
        if self.whisper_pipe is None:
            raise RuntimeError("Whisper 模型未加载")
        result = self.whisper_pipe(
            audio_path,
            generate_kwargs={"language": language}
        )
        return result["text"].strip()

    def generate_lab(self, audio_path: str, text: str) -> str:
        """Write *text* to a ``.lab`` file next to *audio_path*.

        Args:
            audio_path: Audio file path (its extension is replaced).
            text: Transcription text to write.

        Returns:
            Path of the written ``.lab`` file.
        """
        lab_path = os.path.splitext(audio_path)[0] + ".lab"
        with open(lab_path, "w", encoding="utf-8") as f:
            f.write(text)
        return lab_path

    def process_full_pipeline(
        self,
        input_path: str,
        output_dir: str,
        language: str = "chinese",
        whisper_model: str = "openai/whisper-small"
    ) -> Tuple[bool, str, List[str]]:
        """Full pipeline: VAD slicing → Whisper transcription → ``.lab``.

        Args:
            input_path: An audio file or a directory of audio files.
            output_dir: Output directory (slices go to ``output_dir/slices``).
            language: Transcription language hint.
            whisper_model: Whisper model identifier.

        Returns:
            Tuple ``(success, message, output_files)`` where output_files
            lists slice paths that received a non-empty transcription.
        """
        try:
            # Load both models up front so failures surface early.
            self.load_vad_model()
            self.load_whisper_model(whisper_model)

            # Collect input files (sorted for deterministic processing order).
            input_files = []
            if os.path.isfile(input_path):
                input_files = [input_path]
            elif os.path.isdir(input_path):
                for f in sorted(os.listdir(input_path)):
                    if f.lower().endswith(('.wav', '.mp3', '.flac', '.ogg', '.m4a')):
                        input_files.append(os.path.join(input_path, f))

            if not input_files:
                return False, "未找到音频文件", []

            self._log(f"找到 {len(input_files)} 个音频文件")

            slices_dir = os.path.join(output_dir, "slices")
            os.makedirs(slices_dir, exist_ok=True)

            all_output_files = []
            for idx, audio_file in enumerate(input_files):
                self._log(f"处理 [{idx+1}/{len(input_files)}]: {os.path.basename(audio_file)}")

                # Stage 1: VAD slicing.
                slice_files = self.vad_split(audio_file, slices_dir)

                # Stage 2: transcribe each slice; skip empty results.
                for slice_file in slice_files:
                    self._log(f"转录: {os.path.basename(slice_file)}")
                    text = self.transcribe(slice_file, language)
                    if text:
                        lab_path = self.generate_lab(slice_file, text)
                        self._log(f"生成: {os.path.basename(lab_path)} -> {text[:30]}...")
                        all_output_files.append(slice_file)
                    else:
                        self._log(f"跳过空转录: {os.path.basename(slice_file)}")

            return True, f"处理完成,共 {len(all_output_files)} 个切片", all_output_files

        except Exception as e:
            # Top-level boundary: log with traceback and report failure to
            # the caller instead of propagating.
            logger.error(f"处理失败: {e}", exc_info=True)
            return False, str(e), []


def process_audio_pipeline(
    input_path: str,
    output_dir: str,
    models_dir: str,
    language: str = "chinese",
    whisper_model: str = "openai/whisper-small",
    progress_callback: Optional[Callable[[str], None]] = None
) -> Tuple[bool, str, List[str]]:
    """Convenience wrapper: run the full audio processing pipeline.

    Args:
        input_path: Input audio file or directory.
        output_dir: Output directory.
        models_dir: Model storage directory.
        language: Transcription language hint.
        whisper_model: Whisper model identifier.
        progress_callback: Optional progress callback.

    Returns:
        Tuple ``(success, message, output_files)``.
    """
    processor = AudioProcessor(models_dir, progress_callback)
    return processor.process_full_pipeline(input_path, output_dir, language, whisper_model)