Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Real-Time French/English Voice Translator — patched single-file v2 | |
| Changes from previous: | |
| - Adds per-language stream_cancel_events that force the STT request_generator | |
| to exit, allowing streaming_recognize to terminate and be restarted cleanly. | |
| - _stream_tts sets the cancel events immediately after playback finishes (before | |
| prebuffer re-injection and restart events). | |
| - Request generator checks cancel event frequently and breaks to end the stream. | |
| Keep your env vars: | |
| - GOOGLE_APPLICATION_CREDENTIALS, DEEPL_API_KEY, ELEVENLABS_API_KEY, ELEVENLABS_VOICE_ID | |
| """ | |
| import asyncio | |
| import json | |
| import queue | |
| import threading | |
| import time | |
| import os | |
| import base64 | |
| from collections import deque | |
| from typing import Dict, Optional | |
| import pyaudio | |
| import websockets | |
| from google.cloud import speech | |
| import deepl | |
| from dotenv import load_dotenv | |
| # ----------------------------------------------------------------------------- | |
| # VoiceTranslator | |
| # ----------------------------------------------------------------------------- | |
class VoiceTranslator:
    """Bidirectional FR<->EN real-time voice translator.

    Pipeline: microphone -> Google Cloud STT (two parallel streams, one per
    language) -> DeepL translation -> ElevenLabs streaming TTS playback.
    Coordination between the capture thread, the two STT threads and the
    asyncio TTS consumer is done with threading.Events and queues.
    """

    def __init__(self, deepl_api_key: str, elevenlabs_api_key: str, elevenlabs_voice_id: str):
        """Create clients, queues and coordination events, and start the
        asyncio loop on its own daemon thread.

        Args:
            deepl_api_key: DeepL REST API key.
            elevenlabs_api_key: ElevenLabs API key (sent on the TTS websocket).
            elevenlabs_voice_id: ElevenLabs voice used for synthesis.
        """
        # External clients
        self.deepl_client = deepl.Translator(deepl_api_key)
        self.elevenlabs_api_key = elevenlabs_api_key
        self.voice_id = elevenlabs_voice_id
        self.stt_client = speech.SpeechClient()
        # Audio params: 16 kHz mono PCM, 1024 samples per read
        self.audio_rate = 16000
        self.audio_chunk = 1024
        # Per-language audio queues (raw mic frames); the recorder fans each
        # frame out to BOTH queues so both recognizers hear the same audio.
        self.lang_queues: Dict[str, queue.Queue] = {
            "en-US": queue.Queue(),
            "fr-FR": queue.Queue(),
        }
        # Small rolling prebuffer to avoid missing the first bits after a restart
        self.prebuffer = deque(maxlen=12)
        # State flags
        self.is_recording = False
        self.is_speaking = False
        # Set while TTS audio is playing; recorder and STT pause on it.
        self.speaking_event = threading.Event()
        # Deduplication of repeated finals / echo of our own TTS output
        self.last_processed_transcript = ""
        self.last_tts_text_en = ""
        self.last_tts_text_fr = ""
        # Minimum STT confidence for a final transcript to be translated
        self.min_confidence_threshold = 0.5
        # PyAudio
        self.pyaudio_instance = pyaudio.PyAudio()
        self.audio_stream = None
        # Threads + async
        self.recording_thread: Optional[threading.Thread] = None
        self.async_loop = asyncio.new_event_loop()
        # TTS queue + consumer task (the consumer serializes playback jobs)
        self._tts_queue: "asyncio.Queue[Optional[dict]]" = asyncio.Queue()
        self._tts_consumer_task: Optional[asyncio.Task] = None
        # Start async loop in separate thread
        self.async_thread = threading.Thread(target=self._run_async_loop, daemon=True)
        self.async_thread.start()
        # Schedule tts consumer creation inside the async loop:
        # create_task must be called from the loop's own thread.
        def _start_consumer():
            self._tts_consumer_task = asyncio.create_task(self._tts_consumer())
        self.async_loop.call_soon_threadsafe(_start_consumer)
        self.stt_threads: Dict[str, threading.Thread] = {}
        # Per-language restart events (used to tell threads when to start new streams)
        self.restart_events: Dict[str, threading.Event] = {
            "en-US": threading.Event(),
            "fr-FR": threading.Event(),
        }
        # Per-language stream started flag
        self._stream_started = {"en-US": False, "fr-FR": False}
        # **NEW**: per-language cancel events to force request_generator to stop
        self.stream_cancel_events: Dict[str, threading.Event] = {
            "en-US": threading.Event(),
            "fr-FR": threading.Event(),
        }
        # Diagnostics: monotonically increasing TTS job id
        self._tts_job_counter = 0
| def _run_async_loop(self): | |
| asyncio.set_event_loop(self.async_loop) | |
| try: | |
| self.async_loop.run_forever() | |
| except Exception as e: | |
| print("[async_loop] stopped with error:", e) | |
| # --------------------------- | |
| # Audio capture | |
| # --------------------------- | |
| def _record_audio(self): | |
| try: | |
| stream = self.pyaudio_instance.open( | |
| format=pyaudio.paInt16, | |
| channels=1, | |
| rate=self.audio_rate, | |
| input=True, | |
| frames_per_buffer=self.audio_chunk, | |
| ) | |
| print("🎤 Recording started...") | |
| while self.is_recording: | |
| if self.speaking_event.is_set(): | |
| time.sleep(0.01) | |
| continue | |
| try: | |
| data = stream.read(self.audio_chunk, exception_on_overflow=False) | |
| except Exception as e: | |
| print(f"[recorder] read error: {e}") | |
| continue | |
| if not data: | |
| continue | |
| self.prebuffer.append(data) | |
| self.lang_queues["en-US"].put(data) | |
| self.lang_queues["fr-FR"].put(data) | |
| try: | |
| stream.stop_stream() | |
| stream.close() | |
| except Exception: | |
| pass | |
| print("🎤 Recording stopped.") | |
| except Exception as e: | |
| print(f"[recorder] fatal: {e}") | |
| # --------------------------- | |
| # TTS streaming (ElevenLabs) - async | |
| # --------------------------- | |
| async def _stream_tts(self, text: str): | |
| uri = ( | |
| f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}" | |
| f"/stream-input?model_id=eleven_flash_v2_5&output_format=pcm_16000" | |
| ) | |
| tts_audio_stream = None | |
| websocket = None | |
| try: | |
| # Mark speaking and set event so recorder & STT pause | |
| self.is_speaking = True | |
| self.speaking_event.set() | |
| # print(f"[{time.strftime('%H:%M:%S')}] [tts] speaking -> True") | |
| # Clear prebuffer to avoid re-injecting TTS audio later | |
| self.prebuffer.clear() | |
| # Clear queued frames to avoid replay; we'll re-inject prebuffer after we cancel streams | |
| for q in self.lang_queues.values(): | |
| with q.mutex: | |
| q.queue.clear() | |
| # Brief pause to ensure recorder sees speaking_event before we start TTS | |
| await asyncio.sleep(0.1) | |
| websocket = await websockets.connect(uri) | |
| await websocket.send(json.dumps({ | |
| "text": " ", | |
| "voice_settings": {"stability": 0.5, "similarity_boost": 0.8}, | |
| "xi_api_key": self.elevenlabs_api_key, | |
| })) | |
| await websocket.send(json.dumps({"text": text, "try_trigger_generation": True})) | |
| await websocket.send(json.dumps({"text": ""})) | |
| tts_audio_stream = self.pyaudio_instance.open( | |
| format=pyaudio.paInt16, | |
| channels=1, | |
| rate=16000, | |
| output=True, | |
| frames_per_buffer=1024, | |
| ) | |
| prebuffer = bytearray() | |
| playback_started = False | |
| try: | |
| while True: | |
| try: | |
| message = await asyncio.wait_for(websocket.recv(), timeout=8.0) | |
| except asyncio.TimeoutError: | |
| if playback_started: | |
| break | |
| else: | |
| continue | |
| if isinstance(message, bytes): | |
| prebuffer.extend(message) | |
| if not playback_started and len(prebuffer) >= 8000: | |
| tts_audio_stream.write(bytes(prebuffer)) | |
| prebuffer.clear() | |
| playback_started = True | |
| elif playback_started: | |
| tts_audio_stream.write(message) | |
| continue | |
| try: | |
| data = json.loads(message) | |
| except Exception: | |
| continue | |
| if data.get("audio"): | |
| audio_bytes = base64.b64decode(data["audio"]) | |
| if not playback_started: | |
| prebuffer.extend(audio_bytes) | |
| if len(prebuffer) >= 16000: | |
| print(f"[tts] Starting playback, prebuffer size: {len(prebuffer)}") | |
| tts_audio_stream.write(bytes(prebuffer)) | |
| prebuffer.clear() | |
| playback_started = True | |
| else: | |
| tts_audio_stream.write(audio_bytes) | |
| elif data.get("isFinal"): | |
| print(f"[tts] Received isFinal, prebuffer remaining: {len(prebuffer)}") | |
| break | |
| elif data.get("error"): | |
| print("TTS error:", data["error"]) | |
| break | |
| # Prebuffer should be empty after playback starts, but just in case | |
| if prebuffer and not playback_started: | |
| print(f"[tts] Writing final prebuffer: {len(prebuffer)} bytes (playback never started)") | |
| tts_audio_stream.write(bytes(prebuffer)) | |
| elif prebuffer: | |
| print(f"[tts] WARNING: prebuffer has {len(prebuffer)} bytes after playback - this is a bug!") | |
| finally: | |
| try: | |
| await websocket.close() | |
| except Exception: | |
| pass | |
| except Exception as e: | |
| # print(f"[tts] error: {e}") | |
| pass | |
| finally: | |
| if tts_audio_stream: | |
| try: | |
| tts_audio_stream.stop_stream() | |
| tts_audio_stream.close() | |
| except Exception: | |
| pass | |
| # **NEW**: force the STT request generators to exit by setting cancel events. | |
| # This makes streaming_recognize finish; threads will then wait for restart_events | |
| # and start fresh streams. | |
| for lang, ev in self.stream_cancel_events.items(): | |
| ev.set() | |
| # print(f"[{time.strftime('%H:%M:%S')}] [cancel] set -> {lang}") | |
| # Don't re-inject prebuffer - it may contain TTS echo | |
| # Just clear the queues and let fresh audio come in | |
| for q in self.lang_queues.values(): | |
| with q.mutex: | |
| q.queue.clear() | |
| # Wait for TTS audio to clear from environment (acoustic decay) | |
| await asyncio.sleep(0.1) | |
| # Clear speaking state and signal STT threads to restart (robustly) | |
| self.is_speaking = False | |
| self.speaking_event.clear() | |
| # print(f"[{time.strftime('%H:%M:%S')}] [tts] speaking -> False") | |
| # Primary restart: set both events | |
| for lang, ev in self.restart_events.items(): | |
| ev.set() | |
| # print(f"[{time.strftime('%H:%M:%S')}] [restart] set -> {lang}") | |
| await asyncio.sleep(0.25) | |
| for lang, ev in self.restart_events.items(): | |
| ev.set() | |
| await asyncio.sleep(0.25) | |
| # --------------------------- | |
| # TTS consumer (serializes TTS) | |
| # --------------------------- | |
| async def _tts_consumer(self): | |
| print("[tts_consumer] started") | |
| while True: | |
| item = await self._tts_queue.get() | |
| if item is None: | |
| print("[tts_consumer] shutdown sentinel received") | |
| break | |
| text = item.get("text", "") | |
| self._tts_job_counter += 1 | |
| job_id = self._tts_job_counter | |
| print(f"[tts_consumer] job #{job_id} dequeued: '{text}'") | |
| try: | |
| await asyncio.wait_for(self._stream_tts(text), timeout=35.0) | |
| except asyncio.TimeoutError: | |
| print(f"[tts_consumer] job #{job_id} _stream_tts timed out; proceeding.") | |
| except Exception as e: | |
| print(f"[tts_consumer] job #{job_id} error during _stream_tts: {e}") | |
| finally: | |
| await asyncio.sleep(0.05) | |
| print("[tts_consumer] exiting") | |
| # --------------------------- | |
| # Translation & TTS trigger | |
| # --------------------------- | |
| async def _process_result(self, transcript: str, confidence: float, language: str): | |
| lang_flag = "🇫🇷" if language == "fr-FR" else "🇬🇧" | |
| print(f"{lang_flag} Heard ({language}, conf {confidence:.2f}): {transcript}") | |
| # echo suppression vs last TTS in same language | |
| if language == "fr-FR": | |
| if transcript.strip().lower() == self.last_tts_text_fr.strip().lower(): | |
| print(" (echo suppressed)") | |
| return | |
| else: | |
| if transcript.strip().lower() == self.last_tts_text_en.strip().lower(): | |
| print(" (echo suppressed)") | |
| return | |
| try: | |
| if language == "fr-FR": | |
| translated = self.deepl_client.translate_text(transcript, target_lang="EN-US").text | |
| print(f"🌐 FR → EN: {translated}") | |
| await self._tts_queue.put({"text": translated, "source_lang": language}) | |
| self.last_tts_text_en = translated | |
| else: | |
| translated = self.deepl_client.translate_text(transcript, target_lang="FR").text | |
| print(f"🌐 EN → FR: {translated}") | |
| await self._tts_queue.put({"text": translated, "source_lang": language}) | |
| self.last_tts_text_fr = translated | |
| print("🔊 Queued for speaking...") | |
| except Exception as e: | |
| print(f"Translation error: {e}") | |
| # --------------------------- | |
| # STT streaming (run per language) | |
| # --------------------------- | |
    def _run_stt_stream(self, language: str):
        """Thread body: run Google Cloud streaming recognition for one language.

        Lifecycle per iteration of the outer loop:
          * After the first stream, block on ``restart_events[language]``
            (set by ``_stream_tts`` once playback ends) with a 30 s safety
            timeout, so a missed signal cannot deadlock the thread.
          * Open a new streaming_recognize call fed by ``request_generator``,
            which pulls mic frames from this language's queue and exits when
            ``stream_cancel_events[language]`` is set.
          * Stop the stream on the first accepted final transcript and hand it
            to ``_process_result`` on the asyncio loop.

        Args:
            language: BCP-47 code, "en-US" or "fr-FR" (also the dict key for
                all per-language state).
        """
        print(f"[stt:{language}] Thread starting, thread_id={threading.get_ident()}")
        self._stream_started[language] = False
        # NOTE(review): this is never reassigned after a final is accepted, so
        # the "duplicate final in same stream" check below only ever compares
        # against "" — confirm whether an assignment was intended.
        last_transcript_in_stream = ""
        while self.is_recording:
            try:
                if self._stream_started[language]:
                    # Not the first stream: wait for the TTS side to signal a
                    # restart (bounded, so we recover from a lost signal).
                    print(f"[{time.strftime('%H:%M:%S')}] [stt:{language}] Waiting for restart signal...")
                    signaled = self.restart_events[language].wait(timeout=30)
                    if not signaled and self.is_recording:
                        print(f"[{time.strftime('%H:%M:%S')}] [stt:{language}] Timeout waiting for restart, restarting anyway")
                    if not self.is_recording:
                        break
                    try:
                        self.restart_events[language].clear()
                    except Exception:
                        pass
                    time.sleep(0.01)
                self._stream_started[language] = True
                print(f"[{time.strftime('%H:%M:%S')}] [stt:{language}] Starting new stream...")
                config = speech.RecognitionConfig(
                    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
                    sample_rate_hertz=self.audio_rate,
                    language_code=language,
                    enable_automatic_punctuation=True,
                    model="latest_short",
                )
                streaming_config = speech.StreamingRecognitionConfig(
                    config=config,
                    interim_results=True,
                    single_utterance=False,
                )
                # Request generator yields StreamingRecognizeRequest messages.
                # Closing over `language` / `self` is what lets the TTS side
                # terminate this stream via the cancel event.
                def request_generator():
                    while self.is_recording:
                        # If TTS is playing, skip sending mic frames to STT
                        if self.speaking_event.is_set():
                            time.sleep(0.01)
                            continue
                        # If cancel event set, clear it and break: ending the
                        # generator is what makes streaming_recognize finish.
                        if self.stream_cancel_events[language].is_set():
                            # print(f"[{time.strftime('%H:%M:%S')}] [stt:{language}] request_generator observed cancel -> exiting generator")
                            try:
                                self.stream_cancel_events[language].clear()
                            except Exception:
                                pass
                            break
                        try:
                            # Short timeout keeps the flag checks responsive.
                            chunk = self.lang_queues[language].get(timeout=1.0)
                        except queue.Empty:
                            continue
                        yield speech.StreamingRecognizeRequest(audio_content=chunk)
                responses = self.stt_client.streaming_recognize(streaming_config, request_generator())
                response_count = 0
                final_received = False
                for response in responses:
                    if not self.is_recording:
                        print(f"[stt:{language}] Stopped by user")
                        break
                    if not response.results:
                        continue
                    response_count += 1
                    for result in response.results:
                        if not result.alternatives:
                            continue
                        alt = result.alternatives[0]
                        transcript = alt.transcript.strip()
                        conf = getattr(alt, "confidence", 0.0)
                        is_final = bool(result.is_final)
                        if is_final:
                            now = time.strftime("%H:%M:%S")
                            print(f"[{now}] [stt:{language}] → '{transcript}' (final={is_final}, conf={conf:.2f})")
                            # Filter empty transcripts - don't break stream
                            if not transcript or len(transcript.strip()) == 0:
                                print(f"[{now}] [stt:{language}] Empty transcript -> ignoring, continuing stream")
                                continue
                            # Deduplicate within same stream
                            if transcript.strip().lower() == last_transcript_in_stream.strip().lower():
                                print(f"[{now}] [stt:{language}] Duplicate final in same stream -> suppressed")
                                continue
                            if conf < self.min_confidence_threshold:
                                print(f"[{now}] [stt:{language}] Final received but confidence {conf:.2f} < threshold -> suppressed")
                                continue
                            # Echo suppression against our own last TTS output.
                            if language == "fr-FR" and transcript.strip().lower() == self.last_tts_text_fr.strip().lower():
                                print(f"[{now}] [stt:{language}] (echo suppressed - matches last_tts_text_fr)")
                                continue
                            if language == "en-US" and transcript.strip().lower() == self.last_tts_text_en.strip().lower():
                                print(f"[{now}] [stt:{language}] (echo suppressed - matches last_tts_text_en)")
                                continue
                            # Hand off to the asyncio loop; this thread must
                            # not run coroutines directly.
                            asyncio.run_coroutine_threadsafe(
                                self._process_result(transcript, conf, language),
                                self.async_loop
                            )
                            final_received = True
                            break
                    if final_received:
                        break
                print(f"[stt:{language}] Stream ended after {response_count} responses")
                if self.is_recording and final_received:
                    print(f"[{time.strftime('%H:%M:%S')}] [stt:{language}] Final result processed. Waiting for TTS to complete and signal restart.")
                elif self.is_recording and not final_received:
                    # Stream died without a final (network, server-side limit):
                    # back off briefly and reconnect.
                    print(f"[stt:{language}] Stream ended unexpectedly, reconnecting...")
                    time.sleep(0.5)
                else:
                    break
            except Exception as e:
                if self.is_recording:
                    import traceback
                    print(f"[stt:{language}] Error: {e}")
                    print(traceback.format_exc())
                    time.sleep(1.0)
                else:
                    break
        print(f"[stt:{language}] Thread exiting")
| # --------------------------- | |
| # Control | |
| # --------------------------- | |
| def start_translation(self): | |
| if self.is_recording: | |
| print("Already recording!") | |
| return | |
| self.is_recording = True | |
| self.last_processed_transcript = "" | |
| for ev in self.restart_events.values(): | |
| try: | |
| ev.clear() | |
| except Exception: | |
| pass | |
| self.speaking_event.clear() | |
| for q in self.lang_queues.values(): | |
| with q.mutex: | |
| q.queue.clear() | |
| self.recording_thread = threading.Thread(target=self._record_audio, daemon=True) | |
| self.recording_thread.start() | |
| for lang in ("en-US", "fr-FR"): | |
| t = threading.Thread(target=self._run_stt_stream, args=(lang,), daemon=True) | |
| self.stt_threads[lang] = t | |
| t.start() | |
| print(f"[main] STT thread {lang} started: {t.is_alive()} at {time.strftime('%H:%M:%S')}") | |
| for ev in self.restart_events.values(): | |
| ev.set() | |
| def stop_translation(self): | |
| print("\n⏹️ Stopping translation...") | |
| self.is_recording = False | |
| for ev in self.restart_events.values(): | |
| ev.set() | |
| self.speaking_event.clear() | |
| if self._tts_consumer_task and not (self._tts_consumer_task.done() if hasattr(self._tts_consumer_task, 'done') else False): | |
| try: | |
| def _put_sentinel(): | |
| try: | |
| self._tts_queue.put_nowait(None) | |
| except Exception: | |
| asyncio.create_task(self._tts_queue.put(None)) | |
| self.async_loop.call_soon_threadsafe(_put_sentinel) | |
| except Exception: | |
| pass | |
| time.sleep(0.2) | |
| def cleanup(self): | |
| self.stop_translation() | |
| try: | |
| if self.async_loop.is_running(): | |
| def _stop_loop(): | |
| if self._tts_consumer_task and not self._tts_consumer_task.done(): | |
| try: | |
| self._tts_queue.put_nowait(None) | |
| except Exception: | |
| pass | |
| self.async_loop.stop() | |
| self.async_loop.call_soon_threadsafe(_stop_loop) | |
| except Exception: | |
| pass | |
| try: | |
| self.pyaudio_instance.terminate() | |
| except Exception: | |
| pass | |
| # ----------------------------------------------------------------------------- | |
| # Main entry | |
| # ----------------------------------------------------------------------------- | |
def main():
    """CLI entry point: load env config, then toggle translation on ENTER.

    Requires GOOGLE_APPLICATION_CREDENTIALS, DEEPL_API_KEY,
    ELEVENLABS_API_KEY and ELEVENLABS_VOICE_ID (via environment or .env).
    """
    load_dotenv()
    required = {
        "GOOGLE_APPLICATION_CREDENTIALS": os.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
        "DEEPL_API_KEY": os.getenv("DEEPL_API_KEY"),
        "ELEVENLABS_API_KEY": os.getenv("ELEVENLABS_API_KEY"),
        "ELEVENLABS_VOICE_ID": os.getenv("ELEVENLABS_VOICE_ID"),
    }
    if not all(required.values()):
        print("Missing API keys or credentials.")
        return
    translator = VoiceTranslator(
        required["DEEPL_API_KEY"],
        required["ELEVENLABS_API_KEY"],
        required["ELEVENLABS_VOICE_ID"],
    )
    print("Ready! Press ENTER to start, ENTER again to stop, Ctrl+C to quit.\n")
    try:
        while True:
            input("Press ENTER to start speaking...")
            translator.start_translation()
            input("Press ENTER to stop...\n")
            translator.stop_translation()
    except KeyboardInterrupt:
        print("\nKeyboardInterrupt received — cleaning up.")
        translator.cleanup()


if __name__ == "__main__":
    main()