import asyncio
import base64
import json
import time
from collections import deque
from typing import Optional

import deepl
import websockets
from google.cloud import speech


class VoiceTranslator:
    """Bidirectional EN<->FR live voice translator.

    Pipeline: browser audio -> ``input_queue`` -> per-language STT buffers ->
    Google streaming speech-to-text -> DeepL translation -> ElevenLabs
    websocket TTS -> ``output_queue`` -> browser.
    """

    def __init__(self, deepl_api_key: str, elevenlabs_api_key: str, elevenlabs_voice_id: str):
        self.deepl_client = deepl.Translator(deepl_api_key)
        self.elevenlabs_api_key = elevenlabs_api_key
        self.voice_id = elevenlabs_voice_id
        self.stt_client = speech.SpeechClient()

        self.audio_rate = 16000
        self.audio_chunk = 1024

        self.input_queue: asyncio.Queue = asyncio.Queue()   # Audio from browser
        self.output_queue: asyncio.Queue = asyncio.Queue()  # Audio to browser

        # BUGFIX: a single shared deque made the two STT streams compete for
        # chunks, so each audio chunk was heard by only ONE language's
        # recognizer. Every language now gets its own buffer; `prebuffer` is
        # kept as an alias of the en-US buffer for backward compatibility.
        self.lang_buffers: dict[str, deque] = {
            "en-US": deque(maxlen=12),
            "fr-FR": deque(maxlen=12),
        }
        self.prebuffer = self.lang_buffers["en-US"]

        self.is_running = False
        self.last_processed_transcript = ""
        self.last_tts_text_en = ""   # last English text we synthesized (echo suppression)
        self.last_tts_text_fr = ""   # last French text we synthesized (echo suppression)
        self.min_confidence_threshold = 0.5

        # NOTE: the original created an extra asyncio.new_event_loop() here
        # that was never run or closed (resource leak); the running loop is
        # now obtained where needed instead.
        self._tts_queue: "asyncio.Queue[Optional[dict]]" = asyncio.Queue()
        self._tts_consumer_task: Optional[asyncio.Task] = None
        self._process_audio_task: Optional[asyncio.Task] = None
        self.stt_tasks: list[asyncio.Task] = []
        self._tts_job_counter = 0

    def _clear_stt_buffers(self) -> None:
        """Drop any buffered (possibly echo-contaminated) mic audio."""
        for buf in self.lang_buffers.values():
            buf.clear()

    async def _process_input_audio(self):
        """Fan incoming browser audio out to every language's STT buffer."""
        print("🎤 Audio processing task started...")
        while self.is_running:
            try:
                data = await self.input_queue.get()
                for buf in self.lang_buffers.values():
                    buf.append(data)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"[audio_processor] error: {e}")
        print("🎤 Audio processing task stopped.")

    async def _stream_tts(self, text: str):
        """Stream `text` through the ElevenLabs websocket TTS into output_queue.

        Emits 16 kHz PCM chunks; clears the STT buffers before and after
        speaking so we do not transcribe our own synthesized audio.
        """
        uri = (
            f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}"
            f"/stream-input?model_id=eleven_flash_v2_5&output_format=pcm_16000"
        )
        try:
            self._clear_stt_buffers()  # drop mic audio captured just before speaking
            async with websockets.connect(uri) as websocket:
                # Initial frame: a space primes the stream and carries auth/settings.
                await websocket.send(json.dumps({
                    "text": " ",
                    "voice_settings": {"stability": 0.5, "similarity_boost": 0.8},
                    "xi_api_key": self.elevenlabs_api_key,
                }))
                await websocket.send(json.dumps({"text": text, "try_trigger_generation": True}))
                await websocket.send(json.dumps({"text": ""}))  # empty text = end-of-input
                while True:
                    try:
                        message = await websocket.recv()
                        data = json.loads(message)
                        if data.get("audio"):
                            audio_chunk = base64.b64decode(data["audio"])
                            await self.output_queue.put(audio_chunk)
                        elif data.get("isFinal"):
                            break
                    except websockets.exceptions.ConnectionClosed:
                        break
                    except Exception:
                        continue  # skip malformed frames, keep the stream alive
        except Exception as e:
            print(f"TTS streaming error: {e}")
        finally:
            # BUGFIX: the original cleanup referenced attributes that were
            # never defined anywhere in the class (stream_cancel_events,
            # lang_queues, is_speaking, speaking_event, restart_events) and
            # raised AttributeError on every call. The intended effect —
            # discarding mic audio captured while the speaker was playing —
            # is what we actually do here.
            self._clear_stt_buffers()
            await asyncio.sleep(0.1)

    async def _tts_consumer(self):
        """Serialize TTS jobs so translations are spoken one at a time.

        A ``None`` item is the shutdown sentinel.
        """
        print("[tts_consumer] started")
        while True:
            try:
                item = await self._tts_queue.get()
                if item is None:
                    break
                text = item.get("text", "")
                self._tts_job_counter += 1
                job_id = self._tts_job_counter
                print(f"[tts_consumer] job #{job_id} dequeued (len={len(text)})")
                await self._stream_tts(text)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"[tts_consumer] error: {e}")
        print("[tts_consumer] exiting")

    async def _process_result(self, transcript: str, confidence: float, language: str):
        """Translate a final STT transcript and queue it for speech synthesis.

        Applies the confidence threshold and suppresses transcripts that
        match our own last TTS output (acoustic echo).
        """
        lang_flag = "🇫🇷" if language == "fr-FR" else "🇬🇧"
        print(f"{lang_flag} Heard ({language}, conf {confidence:.2f}): {transcript}")

        # BUGFIX: min_confidence_threshold was configured but never applied.
        # (confidence can be 0.0 when the API omits it — don't drop those.)
        if confidence and confidence < self.min_confidence_threshold:
            print(" (low confidence, skipped)")
            return

        spoken = transcript.strip().lower()
        if language == "fr-FR" and spoken == self.last_tts_text_fr.strip().lower():
            print(" (echo suppressed)")
            return
        if language == "en-US" and spoken == self.last_tts_text_en.strip().lower():
            print(" (echo suppressed)")
            return

        try:
            if language == "fr-FR":
                translated = self.deepl_client.translate_text(transcript, target_lang="EN-US").text
                print(f"🌐 FR → EN: {translated}")
                await self._tts_queue.put({"text": translated, "source_lang": language})
                self.last_tts_text_en = translated
            else:
                translated = self.deepl_client.translate_text(transcript, target_lang="FR").text
                print(f"🌐 EN → FR: {translated}")
                await self._tts_queue.put({"text": translated, "source_lang": language})
                self.last_tts_text_fr = translated
            print("🔊 Queued for speaking...")
        except Exception as e:
            print(f"Translation error: {e}")

    async def _run_stt_stream(self, language: str):
        """Run one Google streaming-STT session for `language`, restarting on error.

        BUGFIX: the original passed an *async* generator to the synchronous
        ``SpeechClient.streaming_recognize`` and then used ``async for`` over
        its synchronous response iterator — both raise TypeError at runtime.
        It also yielded empty ``StreamingRecognizeRequest()`` frames as
        "keepalives", which the API rejects. The blocking API is now driven
        in a worker thread, and final results are posted back onto the event
        loop.
        """
        print(f"[stt:{language}] Task starting...")
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=self.audio_rate,
            language_code=language,
            enable_automatic_punctuation=True,
            model="latest_short",
        )
        streaming_config = speech.StreamingRecognitionConfig(
            config=config, interim_results=True
        )
        buffer = self.lang_buffers[language]
        loop = asyncio.get_running_loop()

        def request_generator():
            # Synchronous generator, executed inside the worker thread:
            # feed buffered chunks, idle-poll briefly when none are pending.
            while self.is_running:
                if buffer:
                    chunk = buffer.popleft()
                    yield speech.StreamingRecognizeRequest(audio_content=chunk)
                else:
                    time.sleep(0.05)

        def drive_stream():
            # Blocking gRPC call + iteration; runs in a thread via to_thread.
            responses = self.stt_client.streaming_recognize(streaming_config, request_generator())
            for response in responses:
                if not self.is_running:
                    break
                for result in response.results:
                    if result.is_final and result.alternatives:
                        alt = result.alternatives[0]
                        fut = asyncio.run_coroutine_threadsafe(
                            self._process_result(alt.transcript, alt.confidence, language),
                            loop,
                        )
                        fut.result()  # propagate processing errors into this stream

        while self.is_running:
            try:
                await asyncio.to_thread(drive_stream)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"[stt:{language}] Error: {e}. Restarting stream...")
                await asyncio.sleep(1)
        print(f"[stt:{language}] Task exiting.")

    async def start_translation(self):
        """Start the audio fan-out, TTS consumer, and both STT streams."""
        if self.is_running:
            return
        self.is_running = True
        # BUGFIX: the audio fan-out task was declared in __init__ but never
        # started, so the STT buffers were never fed.
        self._process_audio_task = asyncio.create_task(self._process_input_audio())
        self._tts_consumer_task = asyncio.create_task(self._tts_consumer())
        self.stt_tasks.append(asyncio.create_task(self._run_stt_stream("en-US")))
        self.stt_tasks.append(asyncio.create_task(self._run_stt_stream("fr-FR")))

    def stop_translation(self):
        """Stop all pipeline tasks. Safe to call when not running."""
        if not self.is_running:
            return
        self.is_running = False
        for task in self.stt_tasks:
            if task:
                task.cancel()
        self.stt_tasks = []
        # BUGFIX: the audio fan-out task was never cancelled on shutdown.
        if self._process_audio_task:
            self._process_audio_task.cancel()
            self._process_audio_task = None
        if self._tts_consumer_task:
            # Sentinel lets the consumer drain gracefully if it is waiting on
            # the queue; cancel covers the case where it is mid-TTS-stream.
            self._tts_queue.put_nowait(None)
            self._tts_consumer_task.cancel()
            self._tts_consumer_task = None