File size: 11,752 Bytes
7b64dcd 60f8238 511ff0c 92bb56d 7b64dcd 511ff0c 2ebe57f 851495c 7b64dcd d662661 7b64dcd 57b0084 4e2e3d8 57b0084 7b64dcd 4e2e3d8 7b64dcd 57b0084 4e2e3d8 7b64dcd 15891ec 7b64dcd 259c23b 7b64dcd 57b0084 2ebe57f 57b0084 4e2e3d8 7b64dcd 4e2e3d8 2ebe57f 4e2e3d8 2ebe57f 4e2e3d8 57b0084 7b64dcd 2ebe57f 7b64dcd 5284873 7b64dcd 5284873 7b64dcd 15891ec 7b64dcd 15891ec 7b64dcd 15891ec 7b64dcd 92bb56d 7b64dcd e80f558 7b64dcd f5226c0 7b64dcd 2c7e742 7b64dcd 851495c 7b64dcd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
"""
语音状态监控模块
该模块包含 SpeechStateMonitor 类,用于实时监控用户的语音状态,
包括语音活动检测、静音检测、语音任务管理等功能。
"""
import time
import uuid
from multiprocessing import Queue
from queue import Empty
import librosa
import numpy as np
from voice_dialogue.audio.vad import SileroVAD
from voice_dialogue.core.base import BaseThread
from voice_dialogue.core.constants import (
voice_state_manager, silence_over_threshold_event, user_still_speaking_event, session_manager
)
from voice_dialogue.core.enums import AudioState
from voice_dialogue.models.voice_task import VoiceTask
from voice_dialogue.services.utils import normalize_audio_frame, calculate_audio_duration
from voice_dialogue.utils.logger import logger
class SpeechMonitorConfig:
    """Threshold constants read by the speech state monitor.

    Every ``*_THRESHOLD`` value is a duration in milliseconds; ``QUEUE_TIMEOUT``
    is in seconds.
    """

    # Amplitude floor: voice-active frames at or below this level are not
    # counted as speech.
    MIN_AUDIO_AMPLITUDE = 0.01
    # Timeout (seconds) for a blocking read from the audio frame queue.
    QUEUE_TIMEOUT = 0.1

    # Duration thresholds (milliseconds).
    # Accumulated active-speech duration above which the current task is
    # marked as the interrupting task (a duration, not a frame count).
    ACTIVE_FRAME_THRESHOLD = 0.1 * 1000
    # Accumulated user silence at which the silence-over-threshold event is set.
    USER_SILENCE_THRESHOLD = 1 * 1000
    # Silence duration that marks the user as silent; also the length of the
    # rolling silence cache kept while no speech has been buffered.
    SILENCE_THRESHOLD = 0.3 * 1000
    # Buffered audio at or above this duration is flagged as over the limit.
    AUDIO_FRAMES_THRESHOLD = 5 * 1000
class SpeechStateMonitor(BaseThread):
    """
    Speech state monitor.

    Reads audio frames from a queue and tracks the user's speaking state:
    - voice activity detection
    - silence detection and handling
    - creation and dispatch of voice tasks
    - buffering of audio frames
    """

    def __init__(
            self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None,
            audio_frame_queue: Queue,
            user_voice_queue: Queue,
            enable_vad: bool = False,
    ):
        """
        Initialise the speech state monitor.

        Args:
            group, target, name, args, kwargs, daemon: forwarded unchanged to
                the ``BaseThread`` constructor.
            audio_frame_queue: queue audio frames are read from. Items are raw
                frame data when ``enable_vad`` is true, otherwise
                ``(data, is_voice_active)`` pairs.
            user_voice_queue: queue that finished voice tasks are put on.
            enable_vad: whether to run voice activity detection in this thread.
        """
        super().__init__(group, target, name, args, kwargs, daemon=daemon)

        self.audio_frame_queue = audio_frame_queue
        self.user_voice_queue = user_voice_queue
        self.sample_rate = 16000

        self._enable_vad = enable_vad
        self._vad_instance = None
        if self._enable_vad:
            # The detector is only constructed when this thread does the VAD itself.
            self._vad_instance = SileroVAD()

        # Threshold configuration.
        self.config = SpeechMonitorConfig()

        # Start from a clean monitoring state.
        self._reset_monitoring_state()

    def _reset_monitoring_state(self):
        """Reset the per-instance counters and the cached task id."""
        self.silence_audio_frame_count = 0
        self.active_audio_frame_duration = 0
        self.user_silence_duration = 0
        self.task_id = None

    def _initialize_new_task(self):
        """
        Start a new voice task.

        Creates a task id on the shared state manager when none exists, caches
        it on the instance and clears both speaking/silence events.

        Returns:
            tuple: ``(audio_frames, is_audio_sent_for_processing,
            is_audio_frames_empty)`` initial values for the run loop.
        """
        if not voice_state_manager.task_id:
            voice_state_manager.create_task_id()
            voice_state_manager.reset_interrupt_task_id()

        self.task_id = voice_state_manager.task_id
        silence_over_threshold_event.clear()
        user_still_speaking_event.clear()
        # NOTE(review): the duration counters on ``self`` are not reset here, so
        # a stale ``user_silence_duration`` carries over into the new task —
        # confirm this is intended.

        # Initial loop state: empty buffer, nothing sent, buffer marked empty.
        return np.array([]), False, True

    def _handle_task_cleanup(self):
        """
        Clean up the current task if it has been marked as dropped.

        Returns:
            bool: True when the task state was ``AudioState.DROP`` and has been
            cleaned up, False otherwise.
        """
        if voice_state_manager.get_audio_task_state(self.task_id) == AudioState.DROP:
            voice_state_manager.cleanup_task_state(self.task_id)
            return True
        return False

    def _check_silence_threshold(self):
        """Set the silence-over-threshold event once user silence is long enough."""
        if self.user_silence_duration >= self.config.USER_SILENCE_THRESHOLD:
            silence_over_threshold_event.set()

    def _normalize_audio_frame(self, data: bytes) -> np.ndarray:
        """Convert int16 audio bytes into a numpy float array in [-1.0, 1.0].

        Thin wrapper around the ``normalize_audio_frame`` helper.
        """
        return normalize_audio_frame(data)

    def _detect_speech(self, audio_frame: np.ndarray) -> bool:
        """Ask the VAD instance whether the frame contains voice activity.

        Only valid when the monitor was created with ``enable_vad=True``;
        otherwise no VAD instance exists.
        """
        return self._vad_instance.is_voice_active(audio_frame, self.sample_rate)

    def _get_audio_frame_from_queue(self):
        """
        Fetch the next audio frame from the queue.

        Returns:
            tuple: ``(audio_frame, is_voice_active)``, or ``(None, None)`` when
            nothing arrived within the configured timeout.
        """
        try:
            if self._enable_vad:
                # Queue carries raw frame data; activity is detected here.
                data = self.audio_frame_queue.get(block=True, timeout=self.config.QUEUE_TIMEOUT)
                audio_frame = self._normalize_audio_frame(data)
                is_voice_active = self._detect_speech(audio_frame)
            else:
                # Queue carries (data, is_voice_active) pairs produced upstream.
                data, is_voice_active = self.audio_frame_queue.get(block=True, timeout=self.config.QUEUE_TIMEOUT)
                audio_frame = self._normalize_audio_frame(data)
            return audio_frame, is_voice_active
        except Empty:
            return None, None

    def _calculate_frame_duration_ms(self, audio_frame):
        """Return the duration of ``audio_frame`` in milliseconds."""
        return calculate_audio_duration(audio_data=audio_frame, sample_rate=self.sample_rate) * 1000

    def _process_active_voice_frame(self, audio_frame: np.ndarray):
        """
        Handle a frame that was flagged as voice-active.

        Args:
            audio_frame: the audio frame samples.

        Returns:
            bool: True when the frame is loud enough to count as speech,
            False when it is empty or its peak is at or below the amplitude
            floor.
        """
        # Loudness is the largest absolute sample: a frame whose strongest
        # samples are negative is just as audible as a positive one. An empty
        # frame carries no speech (and np.max would raise on it).
        if np.size(audio_frame) == 0 or np.max(np.abs(audio_frame)) <= self.config.MIN_AUDIO_AMPLITUDE:
            return False

        # Real speech: restart the silence timer.
        self.user_silence_duration = 0

        duration = self._calculate_frame_duration_ms(audio_frame)
        self.active_audio_frame_duration += duration

        # Enough continuous speech: mark this task as the interrupting task.
        if self.active_audio_frame_duration > self.config.ACTIVE_FRAME_THRESHOLD:
            voice_state_manager.interrupt_task_id = self.task_id

        return True

    def _process_silence_frame(self, audio_frame, audio_frames, is_audio_frames_empty, is_audio_sent_for_processing):
        """
        Handle a frame that was flagged as silence.

        Args:
            audio_frame: the audio frame samples.
            audio_frames: the current audio buffer.
            is_audio_frames_empty: whether the buffer holds no speech yet.
            is_audio_sent_for_processing: whether audio was already sent for processing.

        Returns:
            tuple: ``(audio_frames, should_continue)`` — the updated buffer and
            whether the caller should skip the rest of the loop iteration.
        """
        # Any silence frame ends the current run of active speech.
        self.active_audio_frame_duration = 0
        duration = self._calculate_frame_duration_ms(audio_frame)

        if is_audio_frames_empty:
            # No speech buffered yet: keep a rolling cache of recent silence.
            audio_frames = np.append(audio_frames, audio_frame)

            # Trim the cache to the last SILENCE_THRESHOLD milliseconds.
            silence_duration = librosa.get_duration(y=audio_frames, sr=self.sample_rate) * 1000
            if silence_duration >= self.config.SILENCE_THRESHOLD:
                cached_slice = len(audio_frames) - int(self.config.SILENCE_THRESHOLD * (self.sample_rate / 1000))
                audio_frames = audio_frames[cached_slice:]

            user_still_speaking_event.clear()

            # Silence only accumulates here after audio has been sent.
            if is_audio_sent_for_processing:
                self.user_silence_duration += duration

            return audio_frames, True  # caller skips the rest of the iteration

        # Speech is buffered: count the silence and let the caller append the frame.
        self.user_silence_duration += duration
        return audio_frames, False

    def _update_speaking_state(self, is_voice_active, is_audio_sent_for_processing):
        """Flag that the user is still speaking after audio was already sent."""
        if is_voice_active and is_audio_sent_for_processing:
            user_still_speaking_event.set()

    def _create_voice_task(self, audio_frames):
        """
        Build a voice task from the buffered audio.

        Args:
            audio_frames: the buffered audio samples.

        Returns:
            VoiceTask: the new task, carrying a copy of the audio, a fresh
            answer id and the send time. ``is_over_audio_frames_threshold`` is
            set to True when the audio reaches the duration threshold.
        """
        voice_task = VoiceTask(id=self.task_id, session_id=session_manager.current_id)
        voice_task.answer_id = f'{uuid.uuid4()}'
        voice_task.user_voice = audio_frames.copy()
        voice_task.send_time = time.time()

        # Flag audio whose duration reaches the configured limit.
        audio_duration = librosa.get_duration(y=audio_frames, sr=self.sample_rate) * 1000
        if audio_duration >= self.config.AUDIO_FRAMES_THRESHOLD:
            voice_task.is_over_audio_frames_threshold = True

        return voice_task

    def _should_send_voice_task(self, is_audio_sent_for_processing):
        """Return True when the user is silent and the audio was not sent yet."""
        return self.is_user_in_silence() and not is_audio_sent_for_processing

    def is_user_in_silence(self):
        """Return True when accumulated user silence reaches SILENCE_THRESHOLD."""
        return self.user_silence_duration >= self.config.SILENCE_THRESHOLD

    def run(self):
        """
        Main loop: monitor the speech state and process audio frames.

        Runs until ``self.is_exited`` is true. Each iteration syncs the task
        id, handles dropped tasks, reads one frame, updates the buffer and
        counters, and puts a voice task on ``user_voice_queue`` once the user
        has gone silent. Exceptions are logged and the loop keeps running.
        """
        self.is_ready = True

        # Loop state.
        audio_frames = np.array([])
        is_audio_sent_for_processing = False
        is_audio_frames_empty = True

        while not self.is_exited:
            try:
                # 1. Task lifecycle: start a new task when none is active.
                self.task_id = voice_state_manager.task_id
                if not self.task_id:
                    audio_frames, is_audio_sent_for_processing, is_audio_frames_empty = self._initialize_new_task()

                # 2. Dropped task: clean up and start over.
                if self._handle_task_cleanup():
                    is_audio_sent_for_processing = False
                    continue

                # 3. Signal prolonged user silence.
                self._check_silence_threshold()

                # 4. Fetch the next frame; (None, None) means the queue timed out.
                audio_frame, is_voice_active = self._get_audio_frame_from_queue()
                if audio_frame is None and is_voice_active is None:
                    continue

                # 5. A missing frame with a known activity flag.
                if audio_frame is None:
                    if is_audio_sent_for_processing:
                        self.silence_audio_frame_count += 1
                    continue

                # 6. Update the buffer according to the frame content.
                if is_voice_active:
                    # Active frame: buffered only when it is loud enough.
                    if self._process_active_voice_frame(audio_frame):
                        is_audio_frames_empty = False
                        audio_frames = np.append(audio_frames, audio_frame)
                else:
                    # Silence frame.
                    audio_frames, should_continue = self._process_silence_frame(
                        audio_frame, audio_frames, is_audio_frames_empty, is_audio_sent_for_processing
                    )
                    if should_continue:
                        continue

                    is_audio_frames_empty = False
                    audio_frames = np.append(audio_frames, audio_frame)

                # 7. Update the still-speaking flag.
                self._update_speaking_state(is_voice_active, is_audio_sent_for_processing)

                # 8. Send the buffered audio once the user has gone silent.
                if self._should_send_voice_task(is_audio_sent_for_processing):
                    voice_task = self._create_voice_task(audio_frames)
                    self.user_voice_queue.put(voice_task.model_copy(deep=True))

                    is_audio_sent_for_processing = True
                    user_still_speaking_event.clear()

                    # Over-long audio: start again from an empty buffer.
                    if getattr(voice_task, 'is_over_audio_frames_threshold', False):
                        audio_frames = np.array([])
                        is_audio_frames_empty = True

            except Exception as e:
                # Keep the thread alive; back off briefly to avoid a tight error loop.
                logger.error(f"SpeechStateMonitor 处理错误: {e}")
                time.sleep(0.1)
                continue
|