VoiceDialogue / src /voice_dialogue /services /audio_player_service.py
liumaolin
refactor(core): Architecturally decouple Audio, ASR, and TTS modules
60f8238
import time
from multiprocessing import Queue
from queue import Empty
from typing import Optional
from voice_dialogue.audio.player import play_audio
from voice_dialogue.core.base import BaseThread
from voice_dialogue.core.constants import voice_state_manager, silence_over_threshold_event
from voice_dialogue.models.voice_task import VoiceTask, AnswerDisplayMessage
from voice_dialogue.services.mixins import TaskStatusMixin, HistoryMixin, PerformanceLogMixin
from voice_dialogue.utils.logger import logger
class AudioPlayerService(BaseThread, TaskStatusMixin, HistoryMixin, PerformanceLogMixin):
"""音频流播放器 - 负责播放生成的音频并管理播放状态"""
def __init__(
self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None,
audio_playing_queue: Queue,
websocket_message_queue: Queue = None,
):
super().__init__(group, target, name, args, kwargs, daemon=daemon)
self.audio_playing_queue: Queue = audio_playing_queue
self.websocket_message_queue: Queue = websocket_message_queue
def _get_task_from_queue(self) -> Optional[VoiceTask]:
"""从音频播放队列中获取任务。"""
# 使用阻塞式获取,当队列为空时,run循环中的Empty异常会处理它
return self.audio_playing_queue.get(block=True, timeout=1)
def _process_task(self, voice_task: VoiceTask):
"""处理单个音频播放任务。"""
# 这个内部循环用于等待一个外部事件(用户静音),同时检查任务是否被中断
while not self.is_exited:
if self.handle_user_speaking_interruption(voice_task):
return # 任务被中断,结束处理
if not self.is_task_valid(voice_task):
logger.info(f"音频播放: 任务<{voice_task.id}> 无效")
return # 任务无效,结束处理
# 等待用户彻底静音的信号
if not silence_over_threshold_event.is_set():
time.sleep(0.05) # 短暂等待,避免CPU空转
continue
# --- 开始播放逻辑 ---
if self.websocket_message_queue:
self.websocket_message_queue.put_nowait(
AnswerDisplayMessage(
session_id=voice_task.session_id,
task_id=voice_task.id,
answer_index=voice_task.answer_index,
answer=voice_task.answer_sentence,
)
)
self.log_task_performance(voice_task, "音频播放")
self.update_chat_history(voice_task)
voice_state_manager.set_audio_playing(voice_task.id)
voice_state_manager.reset_task_id()
if not self.is_stopped:
audio_data, sample_rate = voice_task.tts_generated_sentence_audio
play_audio(audio_data, sample_rate)
# 任务处理完毕,跳出内部循环
break
def run(self):
"""
主运行循环。
不断从队列获取任务,并调用_process_task进行处理。
"""
if not hasattr(self, 'is_ready'):
logger.warning(f"{self.__class__.__name__} 中缺少 'is_ready' 属性。")
self.is_ready = True
while not self.is_exited:
try:
task = self._get_task_from_queue()
if task:
self._process_task(task)
except Empty:
# 队列在1秒内没有新项目,这是正常现象,继续循环
continue
except Exception as e:
logger.error(f"在 AudioStreamPlayer 环节发生错误: {e}")
time.sleep(0.1) # 发生未知错误时短暂休眠