# NOTE: HuggingFace Spaces page header ("Spaces: Sleeping") — extraction
# artifact, not part of the module source.
| import asyncio | |
| import base64 | |
| import json | |
| from collections import deque | |
| from typing import Optional | |
| import deepl | |
| import websockets | |
| from google.cloud import speech | |
class VoiceTranslator:
    """Bidirectional FR<->EN voice translation pipeline.

    Wires together three services — Google Cloud Speech-to-Text
    (recognition), DeepL (translation) and ElevenLabs (synthesis) —
    exchanging raw PCM audio with a browser through asyncio queues.
    """

    def __init__(self, deepl_api_key: str, elevenlabs_api_key: str, elevenlabs_voice_id: str):
        """Create the API clients and all pipeline state.

        Args:
            deepl_api_key: DeepL API authentication key.
            elevenlabs_api_key: ElevenLabs API authentication key.
            elevenlabs_voice_id: ElevenLabs voice used for synthesis.
        """
        self.deepl_client = deepl.Translator(deepl_api_key)
        self.elevenlabs_api_key = elevenlabs_api_key
        self.voice_id = elevenlabs_voice_id
        # NOTE(review): this is the *synchronous* STT client, but
        # _run_stt_stream consumes it with `async for` — confirm whether
        # speech.SpeechAsyncClient was intended.
        self.stt_client = speech.SpeechClient()
        self.audio_rate = 16000  # PCM sample rate (Hz) used for both STT and TTS
        self.audio_chunk = 1024  # nominal chunk size; not referenced in the visible code
        self.input_queue = asyncio.Queue()  # Audio from browser
        self.output_queue = asyncio.Queue()  # Audio to browser
        # Short rolling buffer of mic chunks consumed by the STT request generators.
        self.prebuffer = deque(maxlen=12)
        self.is_running = False
        self.last_processed_transcript = ""  # appears unused in the visible code
        # Last synthesized texts per language; used for echo suppression.
        self.last_tts_text_en = ""
        self.last_tts_text_fr = ""
        self.min_confidence_threshold = 0.5  # minimum STT confidence worth acting on
        # Used only for .time() timestamps in _run_stt_stream; never run as a loop.
        self.async_loop = asyncio.new_event_loop()
        # TTS work items; a None item is the shutdown sentinel.
        self._tts_queue: "asyncio.Queue[Optional[dict]]" = asyncio.Queue()
        self._tts_consumer_task: Optional[asyncio.Task] = None
        self._process_audio_task: Optional[asyncio.Task] = None
        self.stt_tasks: list[asyncio.Task] = []
        self._tts_job_counter = 0  # monotonically increasing TTS job id for logs
| async def _process_input_audio(self): | |
| print("π€ Audio processing task started...") | |
| while self.is_running: | |
| try: | |
| data = await self.input_queue.get() | |
| self.prebuffer.append(data) | |
| except asyncio.CancelledError: | |
| break | |
| except Exception as e: | |
| print(f"[audio_processor] error: {e}") | |
| print("π€ Audio processing task stopped.") | |
| async def _stream_tts(self, text: str): | |
| uri = ( | |
| f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}" | |
| f"/stream-input?model_id=eleven_flash_v2_5&output_format=pcm_16000" | |
| ) | |
| try: | |
| self.prebuffer.clear() | |
| async with websockets.connect(uri) as websocket: | |
| await websocket.send(json.dumps({ | |
| "text": " ", | |
| "voice_settings": {"stability": 0.5, "similarity_boost": 0.8}, | |
| "xi_api_key": self.elevenlabs_api_key, | |
| })) | |
| await websocket.send(json.dumps({"text": text, "try_trigger_generation": True})) | |
| await websocket.send(json.dumps({"text": ""})) | |
| while True: | |
| try: | |
| message = await websocket.recv() | |
| data = json.loads(message) | |
| if data.get("audio"): | |
| audio_chunk = base64.b64decode(data["audio"]) | |
| await self.output_queue.put(audio_chunk) | |
| elif data.get("isFinal"): | |
| break | |
| except websockets.exceptions.ConnectionClosed: | |
| break | |
| except Exception: | |
| continue | |
| except Exception as e: | |
| print(f"TTS streaming error: {e}") | |
| finally: | |
| for lang, ev in self.stream_cancel_events.items(): | |
| ev.set() | |
| for q in self.lang_queues.values(): | |
| with q.mutex: | |
| q.queue.clear() | |
| self.is_speaking = False | |
| self.speaking_event.clear() | |
| for lang, ev in self.restart_events.items(): | |
| ev.set() | |
| await asyncio.sleep(0.1) | |
| async def _tts_consumer(self): | |
| print("[tts_consumer] started") | |
| while True: | |
| try: | |
| item = await self._tts_queue.get() | |
| if item is None: | |
| break | |
| text = item.get("text", "") | |
| self._tts_job_counter += 1 | |
| job_id = self._tts_job_counter | |
| print(f"[tts_consumer] job #{job_id} dequeued (len={len(text)})") | |
| await self._stream_tts(text) | |
| except asyncio.CancelledError: | |
| break | |
| except Exception as e: | |
| print(f"[tts_consumer] error: {e}") | |
| print("[tts_consumer] exiting") | |
| async def _process_result(self, transcript: str, confidence: float, language: str): | |
| lang_flag = "π«π·" if language == "fr-FR" else "π¬π§" | |
| print(f"{lang_flag} Heard ({language}, conf {confidence:.2f}): {transcript}") | |
| if language == "fr-FR" and transcript.strip().lower() == self.last_tts_text_fr.strip().lower(): | |
| print(" (echo suppressed)") | |
| return | |
| if language == "en-US" and transcript.strip().lower() == self.last_tts_text_en.strip().lower(): | |
| print(" (echo suppressed)") | |
| return | |
| try: | |
| if language == "fr-FR": | |
| translated = self.deepl_client.translate_text(transcript, target_lang="EN-US").text | |
| print(f"π FR β EN: {translated}") | |
| await self._tts_queue.put({"text": translated, "source_lang": language}) | |
| self.last_tts_text_en = translated | |
| else: | |
| translated = self.deepl_client.translate_text(transcript, target_lang="FR").text | |
| print(f"π EN β FR: {translated}") | |
| await self._tts_queue.put({"text": translated, "source_lang": language}) | |
| self.last_tts_text_fr = translated | |
| print("π Queued for speaking...") | |
| except Exception as e: | |
| print(f"Translation error: {e}") | |
    async def _run_stt_stream(self, language: str):
        """Run one continuous Google STT recognition stream for `language`.

        Feeds audio chunks from ``self.prebuffer`` to the recognizer and
        forwards final results to ``self._process_result``. Restarts the
        stream on error while ``self.is_running`` holds.

        NOTE(review): ``self.stt_client`` is created as the synchronous
        ``speech.SpeechClient`` in __init__, yet the response is consumed
        with ``async for`` and the requests come from an *async* generator —
        this combination matches ``SpeechAsyncClient``; confirm which client
        is intended.

        NOTE(review): both language tasks popleft from the single shared
        ``self.prebuffer``, so each audio chunk reaches only ONE of the two
        recognizers — verify whether per-language buffers were intended.
        """
        print(f"[stt:{language}] Task starting...")
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=self.audio_rate,
            language_code=language,
            enable_automatic_punctuation=True,
            model="latest_short",
        )
        streaming_config = speech.StreamingRecognitionConfig(
            config=config, interim_results=True
        )
        async def request_generator():
            # Drains the shared prebuffer; idles in 0.1 s steps when empty.
            last_chunk_time = self.async_loop.time()
            while self.is_running:
                if self.prebuffer:
                    chunk = self.prebuffer.popleft()
                    last_chunk_time = self.async_loop.time()
                    yield speech.StreamingRecognizeRequest(audio_content=chunk)
                else:
                    # If no audio for a while, send empty to keep stream alive
                    # NOTE(review): an empty StreamingRecognizeRequest carries
                    # neither config nor audio — confirm the API accepts it as
                    # a keep-alive.
                    if self.async_loop.time() - last_chunk_time > 0.5:
                        yield speech.StreamingRecognizeRequest()
                    await asyncio.sleep(0.1)
        # Outer loop restarts the stream after any error (backoff: 1 s).
        while self.is_running:
            responses = self.stt_client.streaming_recognize(streaming_config, request_generator())
            try:
                async for response in responses:
                    if not self.is_running: break
                    for result in response.results:
                        # Only act on final hypotheses that carry alternatives.
                        if result.is_final and result.alternatives:
                            alt = result.alternatives[0]
                            await self._process_result(alt.transcript, alt.confidence, language)
            except Exception as e:
                print(f"[stt:{language}] Error: {e}. Restarting stream...")
                await asyncio.sleep(1)
        print(f"[stt:{language}] Task exiting.")
| async def start_translation(self): | |
| if self.is_running: | |
| return | |
| self.is_running = True | |
| self._tts_consumer_task = asyncio.create_task(self._tts_consumer()) | |
| self.stt_tasks.append(asyncio.create_task(self._run_stt_stream("en-US"))) | |
| self.stt_tasks.append(asyncio.create_task(self._run_stt_stream("fr-FR"))) | |
| def stop_translation(self): | |
| if not self.is_running: | |
| return | |
| self.is_running = False | |
| for task in self.stt_tasks: | |
| if task: task.cancel() | |
| if self._tts_consumer_task: | |
| self._tts_consumer_task.cancel() | |
| self.stt_tasks = [] | |