"""
Enhanced Speech-to-Speech Translation Pipeline with Advanced Gradio Interface

This script implements a complete pipeline for speech-to-speech translation
with dynamic model selection and advanced configuration options.

Features:
- Dynamic Whisper model switching (tiny, base, small, medium)
- NLLB model selection (600M, 1.3B)
- Advanced translation parameters (beam size, temperature, etc.)
- Real-time processing with detailed model information
- Comprehensive model descriptions and performance metrics

Requirements:
- faster-whisper
- ctranslate2
- transformers (version 4.33.0+)
- torch
- numpy
- scipy
- requests (for fallback tokenizer)
- gradio
"""

import os
import time
import torch
import numpy as np
import ctranslate2
import scipy.io.wavfile
from faster_whisper import WhisperModel
import gradio as gr
import re
from pathlib import Path
from typing import Dict, Optional, Tuple, Generator

# Fix for numpy binary incompatibility
os.environ["PYTHONWARNINGS"] = "ignore::RuntimeWarning"


class EnhancedS2SPipeline:
    """
    Enhanced Speech-to-Speech Translation Pipeline with dynamic model loading.

    The pipeline chains three stages:

    1. Whisper (via faster-whisper) transcribes the input audio.
    2. NLLB (via CTranslate2) translates the transcript sentence by sentence.
    3. MMS-TTS (VITS) synthesizes the translation to a WAV file.

    Whisper and NLLB models are loaded lazily and cached per model size;
    the TTS models and the NLLB tokenizer are loaded in ``__init__``.
    """

    # One "sentence" = a run of non-terminator characters plus its terminator.
    _SENTENCE_RE = re.compile(r'[^.!?]+[.!?]')

    # UI language name -> language code passed to Whisper.
    _WHISPER_LANG_CODES = {"English": "en", "French": "fr"}

    # Upper bound (in characters) for a text chunk handed to the TTS model.
    _TTS_MAX_CHUNK_CHARS = 200

    def __init__(self, device="cuda"):
        """
        Initialize the pipeline with dynamic model loading capability.

        Args:
            device: Device to run inference on ('cuda' or 'cpu'). Whatever is
                requested, 'cpu' is used when CUDA is not available.
        """
        self.device = device if torch.cuda.is_available() else "cpu"
        self.compute_type = "float16" if self.device == "cuda" else "int8"

        # Model caches
        self.whisper_models: Dict[str, WhisperModel] = {}
        self.nllb_models: Dict[str, ctranslate2.Translator] = {}
        self.nllb_tokenizer = None
        self.tts_models = {}
        self.tts_tokenizers = {}

        # Model configurations - Updated for HuggingFace Spaces
        self.model_configs = {
            "whisper": {
                "tiny": {"size": "39 MB", "speed": "Very Fast", "accuracy": "Good", "multilingual": True},
                "base": {"size": "74 MB", "speed": "Fast", "accuracy": "Better", "multilingual": True},
                "small": {"size": "244 MB", "speed": "Medium", "accuracy": "Good", "multilingual": True},
                "medium": {"size": "769 MB", "speed": "Slow", "accuracy": "Very Good", "multilingual": True},
            },
            "nllb": {
                "600M": {
                    "path": "./models/nllb-200-distilled-600M-ct2-int8",
                    "size": "600M parameters",
                    "speed": "Fast",
                    "accuracy": "Good",
                    "languages": "200+ languages",
                },
                "1.3B": {
                    "path": "./models/nllb-200-distilled-1.3B-ct2-int8",
                    "size": "1.3B parameters",
                    "speed": "Medium",
                    "accuracy": "Better",
                    "languages": "200+ languages",
                },
            },
        }

        # Language code mappings for NLLB
        self.lang_codes = {
            "English": "eng_Latn",  # English
            "French": "fra_Latn",  # French
        }

        # TTS language mapping (suffix of the facebook/mms-tts-* checkpoints)
        self.tts_lang_codes = {
            "English": "eng",
            "French": "fra",
        }

        print(f"Enhanced Speech-to-Speech pipeline initialized on {self.device}")

        # Initialize TTS models (these are relatively small, so we can load them upfront)
        self._initialize_tts_models()

        # Initialize tokenizer
        self._initialize_nllb_tokenizer()

    @classmethod
    def _split_sentences(cls, text):
        """Split *text* on '.', '!' and '?', keeping each terminator.

        A '.' is appended first when the text does not already end with a
        terminator so the trailing fragment is not lost. Returns the raw
        regex matches (not stripped); the list may be empty.
        """
        if not text.endswith(('.', '!', '?')):
            text = text + '.'
        return cls._SENTENCE_RE.findall(text)

    def _initialize_tts_models(self):
        """Initialize TTS models for all supported languages.

        Loads one ``facebook/mms-tts-<code>`` checkpoint per entry of
        ``self.tts_lang_codes``. Any failure is reported and swallowed so the
        rest of the pipeline can still be constructed; languages loaded
        before the failure stay available.
        """
        print("Loading MMS-TTS models for English and French...")
        try:
            from transformers.models.vits.modeling_vits import VitsModel
            from transformers.models.vits.tokenization_vits import VitsTokenizer

            dtype = torch.float16 if self.device == "cuda" else torch.float32

            for language, code in self.tts_lang_codes.items():
                repo_id = f"facebook/mms-tts-{code}"
                print(f"Loading {language} TTS model...")
                model = VitsModel.from_pretrained(repo_id, torch_dtype=dtype).to(self.device)
                tokenizer = VitsTokenizer.from_pretrained(repo_id)
                # Register both together so a language never ends up with a
                # model but no tokenizer (synthesize() looks up both).
                self.tts_models[language] = model
                self.tts_tokenizers[language] = tokenizer

            print("TTS models loaded successfully.")
        except Exception as e:
            print(f"Error loading TTS models: {e}")
            print("TTS functionality may be limited.")

    def _initialize_nllb_tokenizer(self):
        """Initialize NLLB tokenizer with fallback.

        Tries the ``transformers`` NLLB tokenizer first; on any error a
        simplified tokenizer from ``_create_fallback_tokenizer`` is used.
        """
        try:
            print("Loading NLLB tokenizer...")
            from transformers.models.nllb.tokenization_nllb import NllbTokenizer

            self.nllb_tokenizer = NllbTokenizer.from_pretrained("facebook/nllb-200-distilled-600M")
            print("NLLB tokenizer loaded successfully.")
        except Exception as e:
            print(f"Error loading NLLB tokenizer: {e}")
            print("Implementing simplified fallback tokenizer...")
            self.nllb_tokenizer = self._create_fallback_tokenizer()

    def _create_fallback_tokenizer(self):
        """Create a simplified fallback tokenizer for NLLB.

        The returned object mimics the small part of the tokenizer interface
        that ``translate_realtime`` uses: ``src_lang``, ``__call__``,
        ``convert_ids_to_tokens``, ``convert_tokens_to_ids`` and ``decode``.
        It splits text on word/punctuation boundaries and looks tokens up in
        a JSON vocabulary cached under ``~/.cache/simplified_nllb_tokenizer``.
        """
        import json
        import requests

        class SimplifiedNllbTokenizer:
            def __init__(self):
                self.src_lang = "eng_Latn"
                cache_dir = Path.home() / ".cache" / "simplified_nllb_tokenizer"
                cache_dir.mkdir(parents=True, exist_ok=True)
                vocab_file = cache_dir / "vocab.json"

                if not vocab_file.exists():
                    print("Downloading NLLB vocabulary for fallback tokenizer...")
                    url = "https://huggingface.co/facebook/nllb-200-distilled-600M/resolve/main/vocab.json"
                    try:
                        # A timeout keeps start-up from hanging forever on a
                        # stalled connection; timeouts are RequestExceptions
                        # and fall through to the stub vocabulary below.
                        response = requests.get(url, timeout=30)
                        response.raise_for_status()
                        with open(vocab_file, 'wb') as f:
                            f.write(response.content)
                        print("Vocabulary downloaded successfully.")
                    except requests.exceptions.RequestException as req_e:
                        print(f"Failed to download vocabulary: {req_e}")
                        # Minimal stub so the tokenizer is still constructible.
                        with open(vocab_file, 'w', encoding='utf-8') as f:
                            json.dump({"[PAD]": 0, "[UNK]": 1}, f)

                with open(vocab_file, 'r', encoding='utf-8') as f:
                    self.vocab = json.load(f)
                self.id_to_token = {v: k for k, v in self.vocab.items()}

            def tokenize(self, text):
                """Lower-case *text* and split it into words and punctuation marks."""
                return re.findall(r'\w+|[^\w\s]', text.lower())

            def convert_tokens_to_ids(self, tokens):
                """Map tokens to ids; unknown tokens map to the [UNK] id (1 if absent)."""
                unk_id = self.vocab.get("[UNK]", 1)
                return [self.vocab.get(token, unk_id) for token in tokens]

            def convert_ids_to_tokens(self, ids):
                """Map ids to tokens; unknown ids map to "[UNK]"."""
                return [self.id_to_token.get(token_id, "[UNK]") for token_id in ids]

            def decode(self, token_ids, skip_special_tokens=True):
                """Join the tokens for *token_ids* with single spaces.

                With ``skip_special_tokens`` only tokens wrapped in square
                brackets on both sides (e.g. "[PAD]") are dropped; a lone
                "[" or "]" punctuation token is kept.
                """
                tokens = self.convert_ids_to_tokens(token_ids)
                if skip_special_tokens:
                    tokens = [
                        t for t in tokens
                        if not (t.startswith("[") and t.endswith("]"))
                    ]
                return " ".join(tokens)

            def __call__(self, text, return_tensors=None, padding=False):
                """Tokenize *text* into a batch of one; ``padding`` is accepted but unused."""
                input_ids = self.convert_tokens_to_ids(self.tokenize(text))
                if return_tensors == "pt":
                    return {"input_ids": torch.tensor([input_ids])}
                return {"input_ids": [input_ids]}

        return SimplifiedNllbTokenizer()

    def get_whisper_model(self, model_size: str) -> WhisperModel:
        """Get or load Whisper model.

        Models are cached by *model_size*. A local file under
        ``./models/whisper/`` is preferred; otherwise the size name is handed
        to ``WhisperModel`` so it is resolved from the HuggingFace Hub.
        """
        if model_size not in self.whisper_models:
            print(f"Loading Whisper model '{model_size}'...")

            # Try to load from local models directory first
            # NOTE(review): faster-whisper documents a converted model
            # *directory* here, not a ".pt" checkpoint - confirm this layout.
            model_path = f"./models/whisper/{model_size}.pt"
            if os.path.exists(model_path):
                print(f"Loading Whisper model from local path: {model_path}")
                model_source = model_path
            else:
                # Fallback to HuggingFace Hub
                print(f"Loading Whisper model from HuggingFace Hub: {model_size}")
                model_source = model_size

            self.whisper_models[model_size] = WhisperModel(
                model_source,
                device=self.device,
                compute_type=self.compute_type,
            )
            print(f"Whisper '{model_size}' loaded successfully.")

        return self.whisper_models[model_size]

    def get_nllb_model(self, model_size: str) -> ctranslate2.Translator:
        """Get or load NLLB model.

        Raises:
            KeyError: *model_size* is not a key of ``model_configs["nllb"]``.
            RuntimeError: CTranslate2 could not load the model directory.
        """
        if model_size not in self.nllb_models:
            model_path = self.model_configs["nllb"][model_size]["path"]
            print(f"Loading NLLB model '{model_size}' from {model_path}...")
            try:
                self.nllb_models[model_size] = ctranslate2.Translator(
                    model_path,
                    device=self.device,
                    compute_type=self.compute_type,
                )
                print(f"NLLB '{model_size}' loaded successfully.")
            except RuntimeError:
                print(f"ERROR: Failed to load NLLB model from '{model_path}'.")
                print("Please ensure the path is correct and contains model files.")
                raise

        return self.nllb_models[model_size]

    def transcribe_realtime(
        self,
        audio_file,
        source_lang=None,
        whisper_model="tiny",
        vad_filter=False,
        beam_size=5,
        temperature=0.0,
    ) -> Generator[Tuple[str, Optional[str]], None, None]:
        """Enhanced transcription with configurable parameters.

        Yields ``(transcript_so_far, detected_language)`` tuples: first an
        empty transcript, then one update per decoded segment, and finally
        the complete transcript once more.

        Args:
            audio_file: Audio input passed straight to ``WhisperModel.transcribe``.
            source_lang: "English" or "French"; anything else (or ``None``)
                lets Whisper detect the language.
            whisper_model: Whisper model size to use.
            vad_filter: Forwarded to Whisper as ``vad_filter``.
            beam_size: Forwarded to Whisper as ``beam_size``.
            temperature: Only forwarded when greater than 0, so Whisper's own
                default is used otherwise.
        """
        print(f"\n1. Transcribing with Whisper-{whisper_model}...")
        start_time = time.time()

        # Get Whisper model
        whisper = self.get_whisper_model(whisper_model)

        # Determine language code for Whisper
        whisper_lang = self._WHISPER_LANG_CODES.get(source_lang) if source_lang else None

        # Configure transcription parameters
        transcribe_params = {
            "language": whisper_lang,
            "beam_size": beam_size,
            "vad_filter": vad_filter,
            "word_timestamps": False,
        }
        if temperature > 0:
            transcribe_params["temperature"] = temperature

        segments_generator, info = whisper.transcribe(audio_file, **transcribe_params)
        detected_language = info.language if info else None

        full_transcript = ""
        yield "", detected_language

        # Segments are produced lazily, so each iteration is real decoding work.
        for segment in segments_generator:
            full_transcript += segment.text + " "
            yield full_transcript.strip(), detected_language

        elapsed_time = time.time() - start_time
        print(f"Transcription completed in {elapsed_time:.2f}s with {whisper_model}")
        if info:
            print(f"Detected language: {info.language} (confidence: {info.language_probability:.4f})")

        yield full_transcript.strip(), detected_language

    def translate_realtime(
        self,
        text_to_translate,
        source_lang,
        target_lang,
        nllb_model="600M",
        beam_size=4,
        length_penalty=1.0,
        repetition_penalty=1.0,
    ) -> Generator[str, None, None]:
        """Enhanced translation with configurable parameters.

        Translates sentence by sentence and yields the accumulated
        translation after each one, then once more at the end. A sentence
        that fails to translate contributes a
        ``[Translation error for segment N]`` marker instead of aborting.

        Raises:
            ValueError: *source_lang* or *target_lang* is not in ``lang_codes``.
        """
        print(f"\n2. Translating with NLLB-{nllb_model}...")
        start_time = time.time()

        # Get NLLB model
        translator = self.get_nllb_model(nllb_model)

        src_lang_nllb = self.lang_codes.get(source_lang)
        tgt_lang_nllb = self.lang_codes.get(target_lang)

        if not src_lang_nllb or not tgt_lang_nllb:
            raise ValueError(f"Unsupported language pair: {source_lang} -> {target_lang}")

        self.nllb_tokenizer.src_lang = src_lang_nllb

        # Split into sentences
        sentences = self._split_sentences(text_to_translate)
        if not sentences:
            sentences = [text_to_translate]

        full_translation = ""

        for i, sentence in enumerate(sentences):
            if not sentence.strip():
                continue

            try:
                tokenizer_output = self.nllb_tokenizer(sentence, return_tensors="pt", padding=True)
                source_tokens = tokenizer_output["input_ids"].tolist()[0]
                # CTranslate2 consumes token strings, not ids.
                source_tokens_as_str = self.nllb_tokenizer.convert_ids_to_tokens(source_tokens)

                target_prefix = [tgt_lang_nllb]

                # Use configured parameters
                result = translator.translate_batch(
                    [source_tokens_as_str],
                    target_prefix=[target_prefix],
                    beam_size=beam_size,
                    length_penalty=length_penalty,
                    repetition_penalty=repetition_penalty,
                    max_batch_size=32,
                )[0]

                # Drop the leading target-language token echoed from the prefix.
                hypothesis = result.hypotheses[0]
                tgt_tokens = hypothesis[1:] if len(hypothesis) > 1 else hypothesis

                chunk_translation = self.nllb_tokenizer.decode(
                    self.nllb_tokenizer.convert_tokens_to_ids(tgt_tokens),
                    skip_special_tokens=True,
                )

                full_translation += chunk_translation + " "
                yield full_translation.strip()

            except Exception as e:
                print(f"Error translating sentence {i+1}: {e}")
                error_msg = f"[Translation error for segment {i+1}] "
                full_translation += error_msg
                yield full_translation.strip()

        elapsed_time = time.time() - start_time
        print(f"Translation completed in {elapsed_time:.2f}s with NLLB-{nllb_model}")

        yield full_translation.strip()

    def _chunk_text_for_tts(self, text):
        """Group the sentences of *text* into chunks of limited length.

        Sentences are packed greedily into chunks of at most
        ``_TTS_MAX_CHUNK_CHARS`` characters; a single longer sentence becomes
        its own chunk. Falls back to ``[text]`` when no sentence is found.
        """
        max_length = self._TTS_MAX_CHUNK_CHARS
        sentences = [s.strip() for s in self._split_sentences(text) if s.strip()]

        text_chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 1 <= max_length:
                current_chunk += (" " if current_chunk else "") + sentence
            else:
                if current_chunk:
                    text_chunks.append(current_chunk)
                current_chunk = sentence

        if current_chunk:
            text_chunks.append(current_chunk)

        return text_chunks if text_chunks else [text]

    def synthesize(self, text, target_lang, output_file="output.wav", speaking_rate=1.0):
        """Enhanced synthesis with speaking rate control.

        Synthesizes *text* chunk by chunk, concatenates the audio, peak
        normalizes it and writes it to *output_file* as 16-bit PCM WAV.
        Chunks that fail to synthesize are reported and skipped; if nothing
        is produced, 16000 zero samples are written instead.

        Args:
            text: Text to speak.
            target_lang: Key into the loaded TTS models ("English"/"French").
            output_file: Destination WAV path.
            speaking_rate: Values other than 1.0 resample each chunk to
                ``len / speaking_rate`` samples (this also shifts pitch).

        Returns:
            ``(output_file, audio_duration_seconds)``.

        Raises:
            ValueError: No TTS model is loaded for *target_lang*.
        """
        from scipy.signal import resample

        print(f"\n3. Synthesizing speech in {target_lang}...")
        start_time = time.time()

        if target_lang not in self.tts_models:
            raise ValueError(f"TTS for language {target_lang} not supported")

        model = self.tts_models[target_lang]
        tokenizer = self.tts_tokenizers[target_lang]

        # Process text in chunks
        text_chunks = self._chunk_text_for_tts(text)
        print(f"Text split into {len(text_chunks)} chunks for TTS")

        all_audio = []

        for i, chunk in enumerate(text_chunks):
            try:
                inputs = tokenizer(text=chunk, return_tensors="pt")
                inputs = {k: v.to(self.device) for k, v in inputs.items()}

                # Fixed per-chunk seed keeps the sampled output repeatable.
                torch.manual_seed(555 + i)
                with torch.no_grad():
                    output = model(**inputs).waveform

                # atleast_1d: squeeze() of a one-sample waveform would be 0-d,
                # which breaks both np.concatenate and len() further down.
                chunk_audio = np.atleast_1d(output.squeeze().cpu().float().numpy())

                # Apply speaking rate adjustment
                if speaking_rate != 1.0:
                    new_length = int(len(chunk_audio) / speaking_rate)
                    chunk_audio = resample(chunk_audio, new_length)

                all_audio.append(chunk_audio)

            except Exception as e:
                print(f"Error generating speech for chunk {i+1}: {e}")

        # Combine audio chunks
        if all_audio:
            try:
                audio_data = np.concatenate(all_audio)
            except Exception as e:
                print(f"Error concatenating audio: {e}")
                audio_data = all_audio[0]
        else:
            audio_data = np.zeros(16000, dtype=np.float32)

        # Ensure float32 format
        if audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)

        # Normalize and convert (np.max raises on an empty array, so guard it)
        peak = np.max(np.abs(audio_data)) if audio_data.size else 0.0
        if peak > 0:
            audio_data = audio_data / peak

        audio_data_int16 = (audio_data * 32767).astype(np.int16)

        # Save to file
        sampling_rate = model.config.sampling_rate
        scipy.io.wavfile.write(output_file, rate=sampling_rate, data=audio_data_int16)

        elapsed_time = time.time() - start_time
        audio_duration = len(audio_data) / sampling_rate
        # Real-time factor; avoid dividing by zero when no audio was produced.
        rtf = elapsed_time / audio_duration if audio_duration > 0 else float("inf")

        print(f"Speech synthesis completed in {elapsed_time:.2f}s")
        print(f"Generated {audio_duration:.2f}s of audio (RTF: {rtf:.2f}x)")

        return output_file, audio_duration

    def process_speech_to_speech_realtime(
        self,
        audio_file,
        source_lang,
        target_lang,
        whisper_model="tiny",
        nllb_model="600M",
        whisper_beam_size=5,
        whisper_temperature=0.0,
        vad_filter=False,
        nllb_beam_size=4,
        length_penalty=1.0,
        repetition_penalty=1.0,
        speaking_rate=1.0,
        output_file=None,
    ):
        """Complete pipeline with all configurable parameters.

        Runs transcription, translation and synthesis in order and yields
        ``(status, transcript, translation, audio_path)`` tuples as progress
        is made. ``audio_path`` is ``None`` until the final tuple, and stays
        ``None`` when the run fails.

        On failure the exception is logged, not raised. Results of stages
        that already completed are kept; only the failed stage and the ones
        after it are replaced by a failure message.

        Args:
            output_file: Destination WAV path. Defaults to
                ``output_<source>_to_<target>_<unix time>.wav`` in the
                current working directory.
        """
        if output_file is None:
            output_file = f"output_{source_lang}_to_{target_lang}_{int(time.time())}.wav"

        print("\n===== ENHANCED SPEECH-TO-SPEECH TRANSLATION =====")
        print(f"Models: Whisper-{whisper_model}, NLLB-{nllb_model}")
        print(f"Languages: {source_lang} -> {target_lang}")

        total_start_time = time.time()

        current_transcript = ""
        current_translation = ""
        detected_lang = None
        output_path = None
        audio_duration = 0
        success = False
        # Tracks how far the pipeline got so a failure only blanks what is missing.
        stage = "transcribe"

        try:
            # Step 1: Transcribe
            yield "🎤 Transcribing audio...", "", "", None

            for partial_transcript, lang in self.transcribe_realtime(
                audio_file, source_lang, whisper_model, vad_filter,
                whisper_beam_size, whisper_temperature
            ):
                current_transcript = partial_transcript
                detected_lang = lang
                yield "🎤 Transcribing audio...", current_transcript, current_translation, None

            # Step 2: Translate
            stage = "translate"
            yield "🔄 Translating text...", current_transcript, current_translation, None

            for partial_translation in self.translate_realtime(
                current_transcript, source_lang, target_lang, nllb_model,
                nllb_beam_size, length_penalty, repetition_penalty
            ):
                current_translation = partial_translation
                yield "🔄 Translating text...", current_transcript, current_translation, None

            # Step 3: Synthesize
            stage = "synthesize"
            yield "🔊 Synthesizing speech...", current_transcript, current_translation, None

            output_path, audio_duration = self.synthesize(
                current_translation, target_lang, output_file, speaking_rate
            )

            success = True

        except Exception as e:
            print(f"ERROR in pipeline: {e}")
            import traceback
            traceback.print_exc()
            success = False
            output_path = None
            if stage == "transcribe":
                current_transcript = "❌ Transcription failed"
            if stage in ("transcribe", "translate"):
                current_translation = "❌ Translation failed"

        total_elapsed_time = time.time() - total_start_time

        if success:
            status = (f"✅ Success! Total time: {total_elapsed_time:.2f}s, "
                      f"Audio: {audio_duration:.2f}s")
        else:
            status = "❌ Processing failed"

        print(f"\n===== TRANSLATION {'COMPLETED' if success else 'FAILED'} =====")

        yield status, current_transcript, current_translation, output_path


def create_enhanced_gradio_interface():
    """Create enhanced Gradio interface with model selection and advanced options.

    Builds one ``EnhancedS2SPipeline`` (which loads the TTS models and the
    NLLB tokenizer immediately) and wires it to a ``gr.Blocks`` UI.

    Returns:
        The un-launched ``gr.Blocks`` app.
    """
    # Initialize pipeline
    pipeline = EnhancedS2SPipeline()

    def get_model_info(model_type, model_name):
        """Get model information for display"""
        config = pipeline.model_configs[model_type][model_name]
        if model_type == "whisper":
            return (f"**{model_name.upper()}** - Size: {config['size']}, "
                    f"Speed: {config['speed']}, Accuracy: {config['accuracy']}")
        return (f"**{model_name}** - {config['size']}, "
                f"Speed: {config['speed']}, Accuracy: {config['accuracy']}")

    def process_audio_enhanced(audio_file, source_lang_str, target_lang_str,
                               whisper_model, nllb_model, whisper_beam_size,
                               whisper_temperature, vad_filter, nllb_beam_size,
                               length_penalty, repetition_penalty, speaking_rate):
        """Enhanced processing function with all parameters"""
        if audio_file is None:
            yield "❌ No audio provided", "No transcript available", "No translation available", None
            return

        yield from pipeline.process_speech_to_speech_realtime(
            audio_file=audio_file,
            source_lang=source_lang_str,
            target_lang=target_lang_str,
            whisper_model=whisper_model,
            nllb_model=nllb_model,
            whisper_beam_size=whisper_beam_size,
            whisper_temperature=whisper_temperature,
            vad_filter=vad_filter,
            nllb_beam_size=nllb_beam_size,
            length_penalty=length_penalty,
            repetition_penalty=repetition_penalty,
            speaking_rate=speaking_rate,
        )

    # Only offer examples whose audio file is actually present, so a partial
    # ./examples directory does not leave dangling entries in the UI.
    example_rows = [
        ["./examples/input_audio/eng1.wav", "English", "French", "tiny", "600M"],
        ["./examples/input_audio/fr1.wav", "French", "English", "tiny", "600M"],
        ["./examples/input_audio/eng2.wav", "English", "French", "base", "600M"],
    ]
    available_examples = [row for row in example_rows if os.path.exists(row[0])]

    # Create the interface
    with gr.Blocks(title="Enhanced Speech-to-Speech Translation", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎙️ Enhanced Speech-to-Speech Translation")
        gr.Markdown("Advanced AI-powered speech translation with configurable models and parameters.")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📥 Input Configuration")

                audio_input = gr.Audio(
                    sources=["microphone", "upload"],
                    type="filepath",
                    label="🎵 Upload or Record Audio"
                )

                with gr.Row():
                    source_lang = gr.Radio(
                        choices=["English", "French"],
                        value="English",
                        label="📢 Source Language"
                    )
                    target_lang = gr.Radio(
                        choices=["English", "French"],
                        value="French",
                        label="🎯 Target Language"
                    )

                gr.Markdown("### 🧠 Model Selection")

                with gr.Accordion("🎤 Whisper ASR Model", open=True):
                    whisper_model = gr.Radio(
                        choices=["tiny", "base", "small", "medium"],
                        value="tiny",
                        label="Model Size"
                    )
                    whisper_info = gr.Markdown(get_model_info("whisper", "tiny"))

                with gr.Accordion("🔄 NLLB Translation Model", open=True):
                    nllb_model = gr.Radio(
                        choices=["600M", "1.3B"],
                        value="600M",
                        label="Model Size"
                    )
                    nllb_info = gr.Markdown(get_model_info("nllb", "600M"))

                with gr.Accordion("⚙️ Advanced Settings", open=False):
                    gr.Markdown("**Whisper Parameters**")
                    whisper_beam_size = gr.Slider(1, 10, value=5, step=1, label="Beam Size")
                    whisper_temperature = gr.Slider(0.0, 1.0, value=0.0, step=0.1, label="Temperature")
                    vad_filter = gr.Checkbox(label="Voice Activity Detection", value=False)

                    gr.Markdown("**Translation Parameters**")
                    nllb_beam_size = gr.Slider(1, 8, value=4, step=1, label="Beam Size")
                    length_penalty = gr.Slider(0.5, 2.0, value=1.0, step=0.1, label="Length Penalty")
                    repetition_penalty = gr.Slider(0.5, 2.0, value=1.0, step=0.1, label="Repetition Penalty")

                    gr.Markdown("**Speech Synthesis**")
                    speaking_rate = gr.Slider(0.5, 2.0, value=1.0, step=0.1, label="Speaking Rate")

                process_btn = gr.Button("🚀 Translate", variant="primary", size="lg")

            with gr.Column(scale=1):
                gr.Markdown("### 📤 Results")

                status_output = gr.Textbox(label="📊 Status", interactive=False)

                with gr.Tabs():
                    with gr.TabItem("📝 Text Results"):
                        transcript_output = gr.Textbox(
                            label="🎤 Original Transcript",
                            lines=6,
                            interactive=False
                        )
                        translation_output = gr.Textbox(
                            label="🔄 Translation",
                            lines=6,
                            interactive=False
                        )

                    with gr.TabItem("🔊 Audio Output"):
                        audio_output = gr.Audio(
                            type="filepath",
                            label="🔊 Translated Speech"
                        )

        # Example section
        with gr.Row():
            gr.Markdown("### 🎵 Try Our Examples")

        with gr.Row():
            gr.Examples(
                examples=available_examples,
                inputs=[audio_input, source_lang, target_lang, whisper_model, nllb_model],
                label="Sample Audio Files"
            )

        # Model info update functions
        def update_whisper_info(model):
            return get_model_info("whisper", model)

        def update_nllb_info(model):
            return get_model_info("nllb", model)

        # Connect update functions
        whisper_model.change(update_whisper_info, whisper_model, whisper_info)
        nllb_model.change(update_nllb_info, nllb_model, nllb_info)

        # Main processing function
        process_btn.click(
            fn=process_audio_enhanced,
            inputs=[
                audio_input, source_lang, target_lang,
                whisper_model, nllb_model,
                whisper_beam_size, whisper_temperature, vad_filter,
                nllb_beam_size, length_penalty, repetition_penalty,
                speaking_rate
            ],
            outputs=[status_output, transcript_output, translation_output, audio_output]
        )

        # Information sections
        with gr.Accordion("📚 Model Information", open=False):
            gr.Markdown("""
            ### 🎤 Whisper Models (OpenAI)
            - **Tiny**: Fastest, smallest model. Good for quick transcription.
            - **Base**: Balanced speed and accuracy. Recommended for most use cases.
            - **Small**: Better accuracy, moderate speed. Good for important content.
            - **Medium**: High accuracy, slower processing. Professional applications.

            ### 🔄 NLLB Models (Meta)
            - **600M**: Faster translation with good quality. Supports 200+ languages.
            - **1.3B**: Better translation quality with more parameters. Higher accuracy.

            ### 🔊 MMS-TTS (Meta)
            - High-quality multilingual text-to-speech synthesis
            - Supports natural-sounding voice generation
            - Optimized for English and French
            """)

        with gr.Accordion("⚙️ Parameter Guide", open=False):
            gr.Markdown("""
            ### Whisper Parameters
            - **Beam Size**: Higher values = better accuracy, slower processing (1-10)
            - **Temperature**: Higher values = more diverse outputs (0.0-1.0)
            - **VAD Filter**: Removes silence automatically (may require additional dependencies)

            ### Translation Parameters
            - **Beam Size**: Search breadth for translation (1-8)
            - **Length Penalty**: Controls output length preference (0.5-2.0)
            - **Repetition Penalty**: Reduces repetitive translations (0.5-2.0)

            ### Speech Synthesis
            - **Speaking Rate**: Playback speed multiplier (0.5-2.0)
            """)

        with gr.Accordion("🔧 Usage Instructions", open=False):
            gr.Markdown("""
            1. **Upload/Record**: Add your audio file or record directly
            2. **Select Languages**: Choose source and target languages
            3. **Choose Models**: Select model sizes based on your speed/quality needs
            4. **Adjust Settings**: Fine-tune advanced parameters if needed
            5. **Translate**: Click the translate button and watch real-time progress
            6. **Download**: Save the translated audio file

            **Tips:**
            - Use smaller models for faster processing
            - Use larger models for better quality
            - Adjust beam sizes for quality vs speed trade-off
            - Speaking rate can make output faster or slower
            """)

    return demo


# Launch the application
if __name__ == "__main__":
    demo = create_enhanced_gradio_interface()
    demo.launch()