Mike W committed on
Commit
046ebfc
·
1 Parent(s): 2e0855f

High-risk async refactor

Browse files
Files changed (2) hide show
  1. server.py +2 -2
  2. translator.py +54 -132
server.py CHANGED
@@ -86,8 +86,8 @@ async def websocket_endpoint(websocket: WebSocket):
86
 
87
  try:
88
  # Start translation and audio processing tasks
89
- print("[SERVER] Calling translator.start_translation()...")
90
- translator.start_translation()
91
  print("[SERVER] translator.start_translation() returned. Creating tasks...")
92
  output_sender_task = asyncio.create_task(
93
  audio_output_sender(websocket, translator.output_queue)
 
86
 
87
  try:
88
  # Start translation and audio processing tasks
89
+ print("[SERVER] Awaiting translator.start_translation()...")
90
+ await translator.start_translation()
91
  print("[SERVER] translator.start_translation() returned. Creating tasks...")
92
  output_sender_task = asyncio.create_task(
93
  audio_output_sender(websocket, translator.output_queue)
translator.py CHANGED
@@ -1,11 +1,8 @@
1
  import asyncio
2
  import base64
3
  import json
4
- import queue
5
- import threading
6
- import time
7
  from collections import deque
8
- from typing import Dict, Optional
9
 
10
  import deepl
11
  import websockets
@@ -22,18 +19,11 @@ class VoiceTranslator:
22
  self.audio_rate = 16000
23
  self.audio_chunk = 1024
24
 
25
- self.input_queue = asyncio.Queue()
26
- self.output_queue = asyncio.Queue()
27
-
28
- self.lang_queues: Dict[str, queue.Queue] = {
29
- "en-US": queue.Queue(),
30
- "fr-FR": queue.Queue(),
31
- }
32
  self.prebuffer = deque(maxlen=12)
33
 
34
- self.is_recording = False
35
- self.is_speaking = False
36
- self.speaking_event = threading.Event()
37
 
38
  self.last_processed_transcript = ""
39
  self.last_tts_text_en = ""
@@ -41,48 +31,19 @@ class VoiceTranslator:
41
  self.min_confidence_threshold = 0.5
42
 
43
  self.async_loop = asyncio.new_event_loop()
44
- self.async_thread = threading.Thread(target=self._run_async_loop, daemon=True)
45
- self.async_thread.start()
46
 
47
  self._tts_queue: "asyncio.Queue[Optional[dict]]" = asyncio.Queue()
48
  self._tts_consumer_task: Optional[asyncio.Task] = None
49
  self._process_audio_task: Optional[asyncio.Task] = None
50
-
51
- def _start_consumer():
52
- self._tts_consumer_task = asyncio.create_task(self._tts_consumer())
53
-
54
- self.async_loop.call_soon_threadsafe(_start_consumer)
55
-
56
- self.stt_threads: Dict[str, threading.Thread] = {}
57
- self.restart_events: Dict[str, threading.Event] = {
58
- "en-US": threading.Event(),
59
- "fr-FR": threading.Event(),
60
- }
61
- self.stream_cancel_events: Dict[str, threading.Event] = {
62
- "en-US": threading.Event(),
63
- "fr-FR": threading.Event(),
64
- }
65
- self._stream_started = {"en-US": False, "fr-FR": False}
66
  self._tts_job_counter = 0
67
 
68
- def _run_async_loop(self):
69
- asyncio.set_event_loop(self.async_loop)
70
- try:
71
- self.async_loop.run_forever()
72
- except Exception as e:
73
- print(f"[async_loop] stopped with error: {e}")
74
-
75
  async def _process_input_audio(self):
76
  print("🎤 Audio processing task started...")
77
- while self.is_recording:
78
  try:
79
  data = await self.input_queue.get()
80
- if data is None:
81
- break
82
- if not self.speaking_event.is_set():
83
- self.prebuffer.append(data)
84
- self.lang_queues["en-US"].put(data)
85
- self.lang_queues["fr-FR"].put(data)
86
  except asyncio.CancelledError:
87
  break
88
  except Exception as e:
@@ -95,12 +56,7 @@ class VoiceTranslator:
95
  f"/stream-input?model_id=eleven_flash_v2_5&output_format=pcm_16000"
96
  )
97
  try:
98
- self.is_speaking = True
99
- self.speaking_event.set()
100
  self.prebuffer.clear()
101
- for q in self.lang_queues.values():
102
- with q.mutex:
103
- q.queue.clear()
104
 
105
  async with websockets.connect(uri) as websocket:
106
  await websocket.send(json.dumps({
@@ -183,94 +139,60 @@ class VoiceTranslator:
183
  except Exception as e:
184
  print(f"Translation error: {e}")
185
 
186
- def _run_stt_stream(self, language: str):
187
- print(f"[stt:{language}] Thread starting...")
188
- while self.is_recording:
189
- try:
190
- self.restart_events[language].wait()
191
- self.restart_events[language].clear()
192
-
193
- config = speech.RecognitionConfig(
194
- encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
195
- sample_rate_hertz=self.audio_rate,
196
- language_code=language,
197
- enable_automatic_punctuation=True,
198
- model="latest_short",
199
- )
200
- streaming_config = speech.StreamingRecognitionConfig(
201
- config=config, interim_results=True, single_utterance=False
202
- )
203
-
204
- def request_generator():
205
- while self.is_recording:
206
- if self.speaking_event.is_set():
207
- time.sleep(0.01)
208
- continue
209
- if self.stream_cancel_events[language].is_set():
210
- self.stream_cancel_events[language].clear()
211
- break
212
- try:
213
- chunk = self.lang_queues[language].get(timeout=0.1)
214
- yield speech.StreamingRecognizeRequest(audio_content=chunk)
215
- except queue.Empty:
216
- continue
217
-
218
- responses = self.stt_client.streaming_recognize(streaming_config, request_generator())
219
 
220
- for response in responses:
221
- if not self.is_recording:
222
- break
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
223
  for result in response.results:
224
  if result.is_final and result.alternatives:
225
  alt = result.alternatives[0]
226
- transcript = alt.transcript.strip()
227
- conf = getattr(alt, "confidence", 0.0)
228
- if transcript and conf >= self.min_confidence_threshold:
229
- asyncio.run_coroutine_threadsafe(
230
- self._process_result(transcript, conf, language), self.async_loop
231
- )
232
  except Exception as e:
233
- print(f"[stt:{language}] Error: {e}")
234
- time.sleep(1)
235
- print(f"[stt:{language}] Thread exiting")
236
 
237
- def start_translation(self):
238
- if self.is_recording:
239
  return
240
- self.is_recording = True
241
-
242
- for ev in self.restart_events.values():
243
- ev.clear()
244
- self.speaking_event.clear()
245
-
246
- def _start_tasks():
247
- self._process_audio_task = asyncio.create_task(self._process_input_audio())
248
-
249
- self.async_loop.call_soon_threadsafe(_start_tasks)
250
-
251
- for lang in ("en-US", "fr-FR"):
252
- thread = threading.Thread(target=self._run_stt_stream, args=(lang,), daemon=True)
253
- self.stt_threads[lang] = thread
254
- thread.start()
255
- self.restart_events[lang].set()
256
 
257
  def stop_translation(self):
258
- if not self.is_recording:
259
  return
260
- self.is_recording = False
261
- for ev in self.restart_events.values():
262
- ev.set()
263
-
264
- def _cancel_tasks():
265
- if self._process_audio_task:
266
- self._process_audio_task.cancel()
267
- if self._tts_queue:
268
- self._tts_queue.put_nowait(None)
269
-
270
- self.async_loop.call_soon_threadsafe(_cancel_tasks)
271
-
272
- def cleanup(self):
273
- self.stop_translation()
274
- time.sleep(0.5)
275
- if self.async_loop.is_running():
276
- self.async_loop.call_soon_threadsafe(self.async_loop.stop)
 
1
  import asyncio
2
  import base64
3
  import json
 
 
 
4
  from collections import deque
5
+ from typing import Optional
6
 
7
  import deepl
8
  import websockets
 
19
  self.audio_rate = 16000
20
  self.audio_chunk = 1024
21
 
22
+ self.input_queue = asyncio.Queue() # Audio from browser
23
+ self.output_queue = asyncio.Queue() # Audio to browser
 
 
 
 
 
24
  self.prebuffer = deque(maxlen=12)
25
 
26
+ self.is_running = False
 
 
27
 
28
  self.last_processed_transcript = ""
29
  self.last_tts_text_en = ""
 
31
  self.min_confidence_threshold = 0.5
32
 
33
  self.async_loop = asyncio.new_event_loop()
 
 
34
 
35
  self._tts_queue: "asyncio.Queue[Optional[dict]]" = asyncio.Queue()
36
  self._tts_consumer_task: Optional[asyncio.Task] = None
37
  self._process_audio_task: Optional[asyncio.Task] = None
38
+ self.stt_tasks: list[asyncio.Task] = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  self._tts_job_counter = 0
40
 
 
 
 
 
 
 
 
41
  async def _process_input_audio(self):
42
  print("🎤 Audio processing task started...")
43
+ while self.is_running:
44
  try:
45
  data = await self.input_queue.get()
46
+ self.prebuffer.append(data)
 
 
 
 
 
47
  except asyncio.CancelledError:
48
  break
49
  except Exception as e:
 
56
  f"/stream-input?model_id=eleven_flash_v2_5&output_format=pcm_16000"
57
  )
58
  try:
 
 
59
  self.prebuffer.clear()
 
 
 
60
 
61
  async with websockets.connect(uri) as websocket:
62
  await websocket.send(json.dumps({
 
139
  except Exception as e:
140
  print(f"Translation error: {e}")
141
 
142
async def _run_stt_stream(self, language: str):
    """Run the streaming speech-to-text loop for one language.

    Builds a recognition config for ``language``, then repeatedly opens a
    streaming-recognize call fed by ``request_generator`` until
    ``self.is_running`` becomes false. Final results that pass the
    transcript/confidence filter are forwarded to ``self._process_result``.

    Args:
        language: Language code passed as ``language_code`` to the
            recognizer and on to ``_process_result``.
    """
    print(f"[stt:{language}] Task starting...")
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=self.audio_rate,
        language_code=language,
        enable_automatic_punctuation=True,
        model="latest_short",
    )
    streaming_config = speech.StreamingRecognitionConfig(
        config=config, interim_results=True
    )

    # Time against the loop this task actually runs on rather than
    # ``self.async_loop``, which nothing in this method starts.
    loop = asyncio.get_running_loop()

    async def request_generator():
        """Yield audio requests from the prebuffer, with idle keep-alives."""
        # NOTE(review): every language task pops from the same
        # ``self.prebuffer``, so each chunk reaches only one recognizer.
        # Confirm whether per-language buffers are intended.
        last_chunk_time = loop.time()
        while self.is_running:
            if self.prebuffer:
                chunk = self.prebuffer.popleft()
                last_chunk_time = loop.time()
                yield speech.StreamingRecognizeRequest(audio_content=chunk)
            else:
                # If no audio for a while, send empty to keep stream alive
                if loop.time() - last_chunk_time > 0.5:
                    yield speech.StreamingRecognizeRequest()
                await asyncio.sleep(0.1)

    while self.is_running:
        try:
            # Opened inside the try so a failure while creating the stream
            # goes through the same log-and-restart path as a mid-stream
            # error instead of ending the task.
            # NOTE(review): if ``stt_client`` is a SpeechAsyncClient this
            # call likely needs ``await`` and a ``requests=`` keyword --
            # confirm against the client in use.
            responses = self.stt_client.streaming_recognize(
                streaming_config, request_generator()
            )
            async for response in responses:
                if not self.is_running:
                    break
                for result in response.results:
                    if not (result.is_final and result.alternatives):
                        continue
                    alt = result.alternatives[0]
                    transcript = alt.transcript.strip()
                    conf = getattr(alt, "confidence", 0.0)
                    # Skip empty or low-confidence finals, using the
                    # threshold configured on the instance.
                    if transcript and conf >= self.min_confidence_threshold:
                        await self._process_result(transcript, conf, language)
        except Exception as e:
            print(f"[stt:{language}] Error: {e}. Restarting stream...")
            await asyncio.sleep(1)
    print(f"[stt:{language}] Task exiting.")
181
 
182
async def start_translation(self):
    """Start the background tasks of the translation pipeline.

    Does nothing if the translator is already running. Otherwise sets
    ``is_running`` and schedules, on the currently running event loop:
    the input-audio task, the TTS consumer task, and one speech-to-text
    task each for "en-US" and "fr-FR". Must be called from a running
    event loop because it uses ``asyncio.create_task``.
    """
    if self.is_running:
        return
    self.is_running = True
    # Without this task nothing drains ``input_queue`` into the prebuffer
    # that the STT request generators read from.
    self._process_audio_task = asyncio.create_task(self._process_input_audio())
    self._tts_consumer_task = asyncio.create_task(self._tts_consumer())
    for lang in ("en-US", "fr-FR"):
        self.stt_tasks.append(asyncio.create_task(self._run_stt_stream(lang)))
 
 
 
 
 
 
 
 
 
 
 
 
189
 
190
def stop_translation(self):
    """Stop the translation pipeline and cancel its background tasks.

    Does nothing if the translator is not running. Otherwise clears
    ``is_running``, requests cancellation of every STT task, the TTS
    consumer task and the input-audio task (when set), and resets
    ``stt_tasks`` to an empty list. Cancellation is only requested here;
    this method does not wait for the tasks to finish.
    """
    if not self.is_running:
        return
    self.is_running = False
    for task in self.stt_tasks:
        if task:
            task.cancel()
    if self._tts_consumer_task:
        self._tts_consumer_task.cancel()
    # The input-audio task can be blocked on ``input_queue.get()``, so the
    # ``is_running`` flag alone would not end it; cancel it explicitly.
    if self._process_audio_task:
        self._process_audio_task.cancel()
    self.stt_tasks = []