liumaolin
refactor(core): Architecturally decouple Audio, ASR, and TTS modules
60f8238
import time
import typing
from queue import Queue, Empty
import numpy as np
from voice_dialogue.core.base import BaseThread
from voice_dialogue.core.constants import user_still_speaking_event, voice_state_manager, dropped_audio_cache
from voice_dialogue.models.voice_task import VoiceTask
from voice_dialogue.services.mixins import PerformanceLogMixin
from voice_dialogue.utils.cache import LRUCacheDict
from voice_dialogue.asr import asr_manager
class ASRService(BaseThread, PerformanceLogMixin):
    """Service thread that turns queued user audio into transcribed text.

    Voice tasks are read from ``user_voice_queue``, transcribed with the ASR
    client created for ``language``, and a copy of each task carrying the
    resulting text is put on ``transcribed_text_queue``.
    """

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None,
                 user_voice_queue: Queue,
                 transcribed_text_queue: Queue,
                 language: typing.Literal["auto", "zh", "en"]):
        """Store the queues and language; the ASR client is created in ``run``.

        Args:
            group, target, name, args, kwargs, daemon: Forwarded unchanged to
                the ``BaseThread`` constructor.
            user_voice_queue: Queue from which ``VoiceTask`` items are read.
            transcribed_text_queue: Queue that receives transcribed task copies.
            language: Language selector handed to ``asr_manager.create_asr``
                and written onto every processed task.
        """
        super().__init__(group, target, name, args, kwargs, daemon=daemon)

        self.language = language
        self.user_voice_queue = user_voice_queue
        self.transcribed_text_queue = transcribed_text_queue

        # Per-task-id list of transcribed segments, bounded to 10 entries.
        self.cached_user_questions = LRUCacheDict(maxsize=10)

    def run(self):
        """Create and warm up the ASR client, then process tasks until exit.

        For each task the loop transcribes the audio, discards empty results,
        accumulates segments of long utterances per task id, drops the task
        when the user is still speaking (or its answer was already dropped),
        and otherwise publishes a copy of the task with ``transcribed_text``
        set.
        """
        self.client = asr_manager.create_asr(self.language)
        self.client.setup()
        self.client.warmup()

        self.is_ready = True

        while not self.is_exited:
            try:
                # Bounded wait so the loop condition is re-evaluated regularly.
                voice_task: VoiceTask = self.user_voice_queue.get(block=True, timeout=1)
            except Empty:
                continue

            voice_task.language = self.language
            voice_task.whisper_start_time = time.time()
            user_voice: np.ndarray = voice_task.user_voice
            transcribed_text = self.client.transcribe(user_voice)
            # Sample the clock right after transcription so the recorded
            # latency does not include the logging call below.
            whisper_end_time = time.time()

            # Treat a missing result the same as an empty/whitespace-only one.
            if not transcribed_text or not transcribed_text.strip():
                voice_state_manager.reset_task_id()
                continue

            # Populate the end time before logging so the log helper sees a
            # task with both timestamps set.
            voice_task.whisper_end_time = whisper_end_time
            # NOTE(review): voice_task.transcribed_text is not assigned yet at
            # this point - confirm the log helper does not rely on it.
            self.log_task_user_question(voice_task)

            task_id = voice_task.id
            cached_user_question = self.cached_user_questions.get(task_id, [])
            if voice_task.is_over_audio_frames_threshold:
                # Accumulate this segment under the task id so that later
                # segments of the same task are joined with it.
                cached_user_question.append(transcribed_text)
                self.cached_user_questions[task_id] = cached_user_question

            answer_id = voice_task.answer_id
            if user_still_speaking_event.is_set():
                # The user kept talking: drop this task, remember its answer
                # id as dropped, and reset the event for the next round.
                voice_state_manager.drop_audio_task(task_id)
                dropped_audio_cache[answer_id] = answer_id
                user_still_speaking_event.clear()
                continue

            if answer_id in dropped_audio_cache:
                continue

            voice_task.transcribed_text = (
                ' '.join(cached_user_question) if cached_user_question else transcribed_text
            )
            # Clear the audio payload before copying the task for downstream.
            voice_task.user_voice = []
            self.transcribed_text_queue.put(voice_task.model_copy())