# RealtimeTranslator / translator.py
# Mike W
# High risk async refactor
# 046ebfc
import asyncio
import base64
import json
from collections import deque
from typing import Optional
import deepl
import websockets
from google.cloud import speech
class VoiceTranslator:
    """Translate live speech between English and French and speak the result.

    Audio chunks placed on ``input_queue`` are copied into one rolling buffer
    per recognition language, streamed to Google Cloud Speech-to-Text,
    translated with DeepL, synthesised through the ElevenLabs streaming
    text-to-speech websocket, and the decoded audio chunks are placed on
    ``output_queue``.
    """

    # Languages recognised in parallel; each gets its own STT stream and buffer.
    _STT_LANGUAGES = ("en-US", "fr-FR")
    # Maximum number of audio chunks kept in each rolling buffer.
    _PREBUFFER_CHUNKS = 12

    def __init__(self, deepl_api_key: str, elevenlabs_api_key: str, elevenlabs_voice_id: str):
        """Create the API clients, queues and bookkeeping state.

        Args:
            deepl_api_key: Key passed to ``deepl.Translator``.
            elevenlabs_api_key: Key sent as ``xi_api_key`` when a
                text-to-speech stream is opened.
            elevenlabs_voice_id: Voice identifier embedded in the
                text-to-speech websocket URL.
        """
        self.deepl_client = deepl.Translator(deepl_api_key)
        self.elevenlabs_api_key = elevenlabs_api_key
        self.voice_id = elevenlabs_voice_id
        # Blocking client, kept as a public attribute in case callers rely on
        # it. Streaming recognition uses the asyncio client that is created
        # lazily in _run_stt_stream, because the blocking client can neither
        # consume an async request generator nor be iterated with ``async for``.
        self.stt_client = speech.SpeechClient()
        self._stt_async_client = None
        self.audio_rate = 16000
        self.audio_chunk = 1024
        self.input_queue = asyncio.Queue()  # Audio from browser
        self.output_queue = asyncio.Queue()  # Audio to browser
        # Rolling window of the most recent input chunks (kept for callers that
        # inspect it); the STT streams read from the per-language buffers below.
        self.prebuffer = deque(maxlen=self._PREBUFFER_CHUNKS)
        # One buffer per language so that every recogniser sees every chunk
        # instead of the streams stealing chunks from each other.
        self._stt_buffers: dict[str, deque] = {
            language: deque(maxlen=self._PREBUFFER_CHUNKS)
            for language in self._STT_LANGUAGES
        }
        self.is_running = False
        self.last_processed_transcript = ""
        self.last_tts_text_en = ""
        self.last_tts_text_fr = ""
        # Not applied anywhere in this class at the moment.
        self.min_confidence_threshold = 0.5
        # Kept as a public attribute; the coroutines below read the clock of
        # whichever loop is actually running them.
        self.async_loop = asyncio.new_event_loop()
        self._tts_queue: "asyncio.Queue[Optional[dict]]" = asyncio.Queue()
        self._tts_consumer_task: Optional[asyncio.Task] = None
        self._process_audio_task: Optional[asyncio.Task] = None
        self.stt_tasks: list[asyncio.Task] = []
        self._tts_job_counter = 0

    def _clear_audio_buffers(self):
        """Drop all buffered input audio (rolling window and per-language buffers)."""
        self.prebuffer.clear()
        for buffer in self._stt_buffers.values():
            buffer.clear()

    async def _process_input_audio(self):
        """Copy chunks from ``input_queue`` into every audio buffer.

        Runs until ``is_running`` becomes false or the task is cancelled.
        """
        print("🎤 Audio processing task started...")
        while self.is_running:
            try:
                data = await self.input_queue.get()
                self.prebuffer.append(data)
                # Fan out: each language stream consumes its own copy.
                for buffer in self._stt_buffers.values():
                    buffer.append(data)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"[audio_processor] error: {e}")
        print("🎤 Audio processing task stopped.")

    async def _stream_tts(self, text: str):
        """Synthesise ``text`` with ElevenLabs and queue the audio for output.

        Decoded audio chunks are put on ``output_queue`` as they arrive.
        Connection and protocol errors are printed, not raised. Blank text is
        skipped without opening a connection.

        Args:
            text: The text to speak.
        """
        if not text or not text.strip():
            # An empty "text" message is the end-of-stream marker, so there is
            # nothing to synthesise.
            return
        uri = (
            f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}"
            f"/stream-input?model_id=eleven_flash_v2_5&output_format=pcm_16000"
        )
        try:
            # Discard audio captured before speaking starts.
            self._clear_audio_buffers()
            async with websockets.connect(uri) as websocket:
                await websocket.send(json.dumps({
                    "text": " ",
                    "voice_settings": {"stability": 0.5, "similarity_boost": 0.8},
                    "xi_api_key": self.elevenlabs_api_key,
                }))
                await websocket.send(json.dumps({"text": text, "try_trigger_generation": True}))
                await websocket.send(json.dumps({"text": ""}))
                while True:
                    try:
                        message = await websocket.recv()
                    except websockets.exceptions.ConnectionClosed:
                        break
                    try:
                        data = json.loads(message)
                        encoded_audio = data.get("audio")
                        audio_chunk = base64.b64decode(encoded_audio) if encoded_audio else None
                        is_final = bool(data.get("isFinal"))
                    except (ValueError, TypeError, AttributeError) as e:
                        # Skip a malformed frame but keep reading; any other
                        # failure propagates to the outer handler instead of
                        # spinning in this loop.
                        print(f"TTS frame skipped: {e}")
                        continue
                    if audio_chunk:
                        await self.output_queue.put(audio_chunk)
                    if is_final:
                        break
        except Exception as e:
            print(f"TTS streaming error: {e}")
        finally:
            # Drop whatever was captured while the speech was being generated,
            # then pause briefly before the next job.
            self._clear_audio_buffers()
            await asyncio.sleep(0.1)

    async def _tts_consumer(self):
        """Speak queued translations one at a time.

        Takes items from ``_tts_queue`` and passes their ``"text"`` to
        ``_stream_tts``. A ``None`` item or task cancellation ends the loop;
        other errors are printed and the loop continues.
        """
        print("[tts_consumer] started")
        while True:
            try:
                item = await self._tts_queue.get()
                if item is None:
                    break
                text = item.get("text", "")
                self._tts_job_counter += 1
                job_id = self._tts_job_counter
                print(f"[tts_consumer] job #{job_id} dequeued (len={len(text)})")
                await self._stream_tts(text)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"[tts_consumer] error: {e}")
        print("[tts_consumer] exiting")

    async def _process_result(self, transcript: str, confidence: float, language: str):
        """Translate a final transcript and queue the translation for speech.

        A transcript equal (ignoring case and surrounding whitespace) to the
        text last queued for speech in the same language is treated as an echo
        of our own output and dropped. Translation errors are printed, not
        raised.

        Args:
            transcript: Recognised text.
            confidence: Recogniser confidence; only logged.
            language: ``"fr-FR"`` is translated to English; anything else is
                translated to French.
        """
        is_french = language == "fr-FR"
        lang_flag = "🇫🇷" if is_french else "🇬🇧"
        print(f"{lang_flag} Heard ({language}, conf {confidence:.2f}): {transcript}")
        heard = transcript.strip().lower()
        if not heard:
            # Nothing to translate.
            return
        if is_french and heard == self.last_tts_text_fr.strip().lower():
            print(" (echo suppressed)")
            return
        if language == "en-US" and heard == self.last_tts_text_en.strip().lower():
            print(" (echo suppressed)")
            return
        target_lang = "EN-US" if is_french else "FR"
        try:
            # The DeepL client is blocking; run it in a worker thread so the
            # audio, STT and TTS tasks keep running during the request.
            result = await asyncio.to_thread(
                self.deepl_client.translate_text, transcript, target_lang=target_lang
            )
            translated = result.text
            if is_french:
                print(f"🌐 FR → EN: {translated}")
            else:
                print(f"🌐 EN → FR: {translated}")
            await self._tts_queue.put({"text": translated, "source_lang": language})
            # Remember what will be spoken so the other recogniser can drop it.
            if is_french:
                self.last_tts_text_en = translated
            else:
                self.last_tts_text_fr = translated
            print("🔊 Queued for speaking...")
        except Exception as e:
            print(f"Translation error: {e}")

    async def _run_stt_stream(self, language: str):
        """Run streaming recognition for one language until stopped.

        Final results are passed to ``_process_result``. When a stream ends it
        is reopened; when it fails the error is printed and it is reopened
        after one second.

        Args:
            language: Language code given to the recogniser, e.g. ``"en-US"``.
        """
        print(f"[stt:{language}] Task starting...")
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=self.audio_rate,
            language_code=language,
            enable_automatic_punctuation=True,
            model="latest_short",
        )
        streaming_config = speech.StreamingRecognitionConfig(
            config=config, interim_results=True
        )
        buffer = self._stt_buffers.setdefault(
            language, deque(maxlen=self._PREBUFFER_CHUNKS)
        )
        clock = asyncio.get_running_loop().time

        async def request_generator():
            # The asyncio client takes only the request stream, so the
            # configuration has to travel in the first request.
            yield speech.StreamingRecognizeRequest(streaming_config=streaming_config)
            last_chunk_time = clock()
            while self.is_running:
                if buffer:
                    chunk = buffer.popleft()
                    last_chunk_time = clock()
                    yield speech.StreamingRecognizeRequest(audio_content=chunk)
                else:
                    # If no audio for a while, send empty to keep stream alive
                    if clock() - last_chunk_time > 0.5:
                        yield speech.StreamingRecognizeRequest()
                    await asyncio.sleep(0.1)

        while self.is_running:
            try:
                if self._stt_async_client is None:
                    # Created here, inside the running loop, and shared by all
                    # language streams.
                    self._stt_async_client = speech.SpeechAsyncClient()
                responses = await self._stt_async_client.streaming_recognize(
                    requests=request_generator()
                )
                async for response in responses:
                    if not self.is_running:
                        break
                    for result in response.results:
                        if result.is_final and result.alternatives:
                            alt = result.alternatives[0]
                            await self._process_result(alt.transcript, alt.confidence, language)
            except Exception as e:
                print(f"[stt:{language}] Error: {e}. Restarting stream...")
                await asyncio.sleep(1)
        print(f"[stt:{language}] Task exiting.")

    async def start_translation(self):
        """Start the audio, text-to-speech and recognition tasks.

        Does nothing if translation is already running. Must be called from
        within a running event loop.
        """
        if self.is_running:
            return
        self.is_running = True
        # Without this task nothing moves audio from input_queue to the buffers.
        self._process_audio_task = asyncio.create_task(self._process_input_audio())
        self._tts_consumer_task = asyncio.create_task(self._tts_consumer())
        for language in self._STT_LANGUAGES:
            self.stt_tasks.append(asyncio.create_task(self._run_stt_stream(language)))

    def stop_translation(self):
        """Cancel every task started by ``start_translation``.

        Does nothing if translation is not running.
        """
        if not self.is_running:
            return
        self.is_running = False
        for task in (*self.stt_tasks, self._tts_consumer_task, self._process_audio_task):
            if task is not None:
                task.cancel()
        self.stt_tasks = []
        self._tts_consumer_task = None
        self._process_audio_task = None