Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import asyncio | |
| from vosk import SetLogLevel, Model, KaldiRecognizer | |
| from multiprocessing import Process, Queue | |
| from queue import Empty | |
| SetLogLevel(-1) # mutes vosk verbosity | |
| class SpeechToTextVosk: | |
| def __init__(self, model='small', audio_bit_rate=16000) -> None: | |
| self.model = model | |
| self.audio_bit_rate = audio_bit_rate | |
| # Create a Queue for inter-process communication | |
| self.queue = Queue() | |
| self.result_queue = Queue() | |
| # Create and start a new Process with the worker function | |
| self.process = Process(target=self.worker) | |
| self.process.start() | |
| def worker(self): | |
| # load vosk model | |
| # get path of current file | |
| current_file_path = os.path.abspath(__file__) | |
| current_directory = os.path.dirname(current_file_path) | |
| _path = os.path.join(current_directory, 'models', 'vosk', self.model) | |
| model_voice = Model(_path) | |
| vosk = KaldiRecognizer(model_voice, self.audio_bit_rate) | |
| while True: | |
| try: | |
| # Get the next item from the queue. Blocks for 1s if necessary. | |
| data = self.queue.get(timeout=1) | |
| # Stop the worker if the sentinel None is received | |
| if data is None: | |
| break | |
| text, speaker_finished = self._process_speech(vosk, data) | |
| # put the result into result_queue | |
| self.result_queue.put((text, speaker_finished)) | |
| except Empty: | |
| pass | |
| def add_speech_bytes(self, data: bytearray): | |
| self.queue.put(data) | |
| def _process_speech(self, vosk: KaldiRecognizer, data: bytearray) -> tuple[str, bool]: | |
| text = '' | |
| speaker_finished = False | |
| if vosk.AcceptWaveform(data): | |
| result = vosk.Result() | |
| result_json = json.loads(result) | |
| text = result_json['text'] | |
| speaker_finished = True | |
| else: | |
| result = vosk.PartialResult() | |
| result_json = json.loads(result) | |
| text = result_json['partial'] | |
| return text, speaker_finished | |
| def get_text(self): | |
| text = '' | |
| speaker_finished = False | |
| while not self.result_queue.empty(): | |
| result, speaker_finished = self.result_queue.get() | |
| text += result | |
| if speaker_finished: | |
| break | |
| return (text, speaker_finished) | |
| def get_audio_bit_rate(self): | |
| return self.audio_bit_rate | |
| def shutdown(self): | |
| # Send sentinel value to stop the worker | |
| self.queue.put(None) | |
| # Wait for the worker process to finish | |
| self.process.join() | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_value, traceback): | |
| self.shutdown() | |
| def __del__(self): | |
| self.shutdown() | |