| | import time |
| | from multiprocessing import Queue |
| | from queue import Empty |
| | from typing import Optional |
| |
|
| | from voice_dialogue.audio.player import play_audio |
| | from voice_dialogue.core.base import BaseThread |
| | from voice_dialogue.core.constants import voice_state_manager, silence_over_threshold_event |
| | from voice_dialogue.models.voice_task import VoiceTask, AnswerDisplayMessage |
| | from voice_dialogue.services.mixins import TaskStatusMixin, HistoryMixin, PerformanceLogMixin |
| | from voice_dialogue.utils.logger import logger |
| |
|
| |
|
| | class AudioPlayerService(BaseThread, TaskStatusMixin, HistoryMixin, PerformanceLogMixin): |
| | """音频流播放器 - 负责播放生成的音频并管理播放状态""" |
| |
|
| | def __init__( |
| | self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None, |
| | audio_playing_queue: Queue, |
| | websocket_message_queue: Queue = None, |
| | ): |
| | super().__init__(group, target, name, args, kwargs, daemon=daemon) |
| | self.audio_playing_queue: Queue = audio_playing_queue |
| | self.websocket_message_queue: Queue = websocket_message_queue |
| |
|
| | def _get_task_from_queue(self) -> Optional[VoiceTask]: |
| | """从音频播放队列中获取任务。""" |
| | |
| | return self.audio_playing_queue.get(block=True, timeout=1) |
| |
|
| | def _process_task(self, voice_task: VoiceTask): |
| | """处理单个音频播放任务。""" |
| | |
| | while not self.is_exited: |
| | if self.handle_user_speaking_interruption(voice_task): |
| | return |
| |
|
| | if not self.is_task_valid(voice_task): |
| | logger.info(f"音频播放: 任务<{voice_task.id}> 无效") |
| | return |
| |
|
| | |
| | if not silence_over_threshold_event.is_set(): |
| | time.sleep(0.05) |
| | continue |
| |
|
| | |
| | if self.websocket_message_queue: |
| | self.websocket_message_queue.put_nowait( |
| | AnswerDisplayMessage( |
| | session_id=voice_task.session_id, |
| | task_id=voice_task.id, |
| | answer_index=voice_task.answer_index, |
| | answer=voice_task.answer_sentence, |
| | ) |
| | ) |
| |
|
| | self.log_task_performance(voice_task, "音频播放") |
| |
|
| | self.update_chat_history(voice_task) |
| |
|
| | voice_state_manager.set_audio_playing(voice_task.id) |
| | voice_state_manager.reset_task_id() |
| |
|
| | if not self.is_stopped: |
| | audio_data, sample_rate = voice_task.tts_generated_sentence_audio |
| | play_audio(audio_data, sample_rate) |
| |
|
| | |
| | break |
| |
|
| | def run(self): |
| | """ |
| | 主运行循环。 |
| | 不断从队列获取任务,并调用_process_task进行处理。 |
| | """ |
| | if not hasattr(self, 'is_ready'): |
| | logger.warning(f"{self.__class__.__name__} 中缺少 'is_ready' 属性。") |
| |
|
| | self.is_ready = True |
| |
|
| | while not self.is_exited: |
| | try: |
| | task = self._get_task_from_queue() |
| | if task: |
| | self._process_task(task) |
| |
|
| | except Empty: |
| | |
| | continue |
| | except Exception as e: |
| | logger.error(f"在 AudioStreamPlayer 环节发生错误: {e}") |
| | time.sleep(0.1) |
| |
|