import gradio as gr
import tempfile
import os
import shutil
from moviepy.editor import VideoFileClip, AudioFileClip
from faster_whisper import WhisperModel
import torch
import torchaudio as ta
import torchaudio.transforms as transforms
from chatterbox.mtl_tts import ChatterboxMultilingualTTS, SUPPORTED_LANGUAGES
import logging
from typing import List, Dict
from deep_translator import GoogleTranslator

# Configure logging (before first use, so early messages are not dropped)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Try to import spaces for ZeroGPU support (Hugging Face Spaces)
try:
    import spaces
    SPACES_AVAILABLE = True
except ImportError:
    SPACES_AVAILABLE = False
    logger.info("spaces library not available - running without ZeroGPU support")

# Configuration - Auto-detect GPU
# Note: faster-whisper uses ctranslate2, which doesn't work well with ZeroGPU,
# so Whisper always runs on CPU. TTS uses the GPU when available.
if torch.cuda.is_available() and not SPACES_AVAILABLE:
    # Only use GPU for local CUDA setups, not ZeroGPU
    TTS_DEVICE = "cuda"
    logger.info(f"🚀 GPU detected! Using CUDA with {torch.cuda.get_device_name(0)} for TTS")
else:
    TTS_DEVICE = "cpu"
    if SPACES_AVAILABLE:
        logger.info("🚀 Running on ZeroGPU - TTS will use GPU inside decorated function")
    else:
        logger.info("Running on CPU")

# Whisper always uses CPU (ctranslate2 compatibility)
WHISPER_DEVICE = "cpu"
WHISPER_COMPUTE_TYPE = "int8"

# Set temp directory to a writable location
os.environ['TMPDIR'] = '/tmp'
tempfile.tempdir = '/tmp'

# Patch torch.load to force CPU mapping, so checkpoints saved from CUDA
# can be loaded on CPU-only hosts
torch_load_orig = torch.load

def torch_load_cpu(*args, **kwargs):
    kwargs["map_location"] = torch.device("cpu")
    return torch_load_orig(*args, **kwargs)

torch.load = torch_load_cpu

# Global models (loaded once)
whisper_model = None
tts_model = None


# ==================== Model Loading ====================
def load_models():
    """Load models (lazy loading for ZeroGPU compatibility)."""
    global whisper_model, tts_model

    if whisper_model is None:
        logger.info("Loading Whisper model...")
        whisper_model = WhisperModel(
            "small",
            device=WHISPER_DEVICE,
            compute_type=WHISPER_COMPUTE_TYPE,
            cpu_threads=4
        )
        logger.info("✅ Whisper model loaded!")

    if tts_model is None:
        logger.info("Loading TTS model...")
        # In ZeroGPU, determine the device at runtime
        tts_device = "cuda" if (SPACES_AVAILABLE and torch.cuda.is_available()) else TTS_DEVICE
        tts_model = ChatterboxMultilingualTTS.from_pretrained(device=tts_device)
        logger.info(f"✅ TTS model loaded on {tts_device}!")

    return whisper_model, tts_model
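
# Quick sanity check outside Gradio (a sketch; the WAV path is illustrative,
# not part of the app):
#
#   whisper_mdl, _ = load_models()
#   segs, info = whisper_mdl.transcribe("/tmp/sample.wav", beam_size=5)
#   print(info.language, [s.text for s in segs])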

# ==================== TTS Processing ====================
def generate_translated_audio(
    reference_audio_path: str,
    segments: List[Dict],
    output_path: str,
    tts_model,
    progress=gr.Progress(),
    silence_duration: float = 0.5,  # retained for API compatibility; gaps are derived from segment timings
    target_language: str = "en"
) -> str:
    """Generate translated audio using Chatterbox TTS with progress updates."""
    try:
        progress(0, desc=f"Generating TTS for {len(segments)} segments...")

        all_wavs = []
        total_segments = len(segments)

        for counter, segment in enumerate(segments):
            # Update progress
            prog = (counter + 1) / total_segments
            text_preview = segment['translated_text'][:50]
            progress(prog, desc=f"Processing segment {counter + 1}/{total_segments}: {text_preview}...")

            original_duration = segment['end'] - segment['start']
            logger.info(f"Generating audio for text: {segment['translated_text']}")

            # Send a heartbeat progress update before generation
            progress(prog, desc=f"🎙️ Generating audio for segment {counter + 1}/{total_segments}...")

            # Generate audio for this segment
            wav = tts_model.generate(
                segment['translated_text'],
                language_id=target_language,
                audio_prompt_path=reference_audio_path,
                exaggeration=0.2,
                cfg_weight=0.8,
                temperature=0.4,
                repetition_penalty=1.2,
                min_p=0.05,
                top_p=0.9
            )
            generated_duration = wav.shape[-1] / tts_model.sr

            # Add leading silence for the first segment (from 0.0 to segment start)
            if counter == 0 and segment['start'] > 0:
                leading_silence_samples = int(segment['start'] * tts_model.sr)
                leading_silence = torch.zeros((wav.shape[0], leading_silence_samples),
                                              dtype=wav.dtype, device=wav.device)
                all_wavs.append(leading_silence)

            # Handle duration matching
            if generated_duration < original_duration:
                # Generated audio is shorter - add it as is
                all_wavs.append(wav)
                # Add trailing silence to match the original segment duration
                trailing_silence_samples = int((original_duration - generated_duration) * tts_model.sr)
                if trailing_silence_samples > 0:
                    trailing_silence = torch.zeros((wav.shape[0], trailing_silence_samples),
                                                   dtype=wav.dtype, device=wav.device)
                    all_wavs.append(trailing_silence)
            elif generated_duration > original_duration:
                # Generated audio is longer - speed it up to fit
                speed_factor = generated_duration / original_duration
                speed_transform = transforms.Speed(tts_model.sr, speed_factor)
                wav_adjusted, _ = speed_transform(wav)
                all_wavs.append(wav_adjusted)
            else:
                # Duration matches perfectly
                all_wavs.append(wav)

            # Add silence between segments (not after the last segment)
            if counter < len(segments) - 1:
                gap_duration = segments[counter + 1]['start'] - segment['end']
                if gap_duration > 0:
                    gap_samples = int(gap_duration * tts_model.sr)
                    gap_silence = torch.zeros((wav.shape[0], gap_samples),
                                              dtype=wav.dtype, device=wav.device)
                    all_wavs.append(gap_silence)

        # Save output
        progress(0.95, desc="Combining audio segments...")
        combined_wav = torch.cat(all_wavs, dim=-1)
        ta.save(output_path, combined_wav, tts_model.sr)

        total_duration = combined_wav.shape[-1] / tts_model.sr
        logger.info(f"TTS completed! Total duration: {total_duration:.2f}s")
        progress(1.0, desc="TTS generation completed!")

        return output_path

    except Exception:
        logger.exception("Error generating TTS audio")
        raise
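
# Timing note (illustrative numbers, not from the source): a 3.0 s source
# segment whose TTS output runs 3.6 s gets Speed(sr, 3.6 / 3.0), i.e. a 1.2x
# speed-up that brings it back to ~3.0 s, so the gap silences computed from
# the original timestamps keep the dub aligned with the video.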

# ==================== Helper Functions ====================
def audio_extractor(video_path):
    """Extract audio from video."""
    video_clip = VideoFileClip(video_path)
    audio_clip = video_clip.audio

    temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False, dir='/tmp')
    full_audio_path = temp_file.name
    temp_file.close()

    audio_clip.write_audiofile(full_audio_path, codec='pcm_s16le', logger=None)
    audio_clip.close()
    video_clip.close()
    return full_audio_path


def transcribe(full_audio_path, whisper_model, progress=None):
    """Transcribe audio using faster-whisper."""
    if progress:
        progress(0, desc="Transcribing audio...")

    # faster-whisper transcription
    segments_generator, info = whisper_model.transcribe(
        full_audio_path,
        beam_size=5,
        word_timestamps=True,
        vad_filter=False,
        # vad_parameters=dict(min_silence_duration_ms=500)
    )

    detected_language = info.language
    if progress:
        progress(0, desc=f"Detected language: {detected_language}")

    # Convert the generator to a list of formatted segment dicts
    segments = []
    for segment in segments_generator:
        seg_dict = {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text.strip(),
            "words": []
        }
        # Add word-level timestamps if available
        if segment.words:
            for word in segment.words:
                seg_dict["words"].append({
                    "word": word.word,
                    "start": word.start,
                    "end": word.end
                })
        segments.append(seg_dict)

    result = {
        "segments": segments,
        "language": detected_language,
        "language_code": detected_language
    }

    if progress:
        progress(0, desc=f"Transcribed {len(segments)} segments")
    return result


def translate_segments(segments: List[Dict], target_lang: str) -> List[Dict]:
    """Translate segments to the target language using deep-translator."""
    results = []
    translator = GoogleTranslator(source='auto', target=target_lang)
    for seg in segments:
        clean_seg = {k: v for k, v in seg.items() if k != "words"}
        if not clean_seg["text"] or clean_seg["text"].isspace():
            translated_text = ""
        else:
            translated_text = translator.translate(clean_seg["text"])
        clean_seg["translated_text"] = translated_text
        results.append(clean_seg)
    return results
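
# Shape of a translated segment (a representative, made-up entry):
#
#   {"start": 0.0, "end": 2.4, "text": "Hola, ¿qué tal?",
#    "translated_text": "Hello, how are you?"}
#
# Word-level timestamps are dropped here; only segment boundaries are needed
# for the TTS alignment above.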

def replace_video_audio(video_path, new_audio_path, output_video_path):
    """Replace the video's audio track, trimming video or audio to the shorter duration."""
    os.environ['FFMPEG_BINARY'] = 'ffmpeg'

    video_clip = VideoFileClip(video_path)
    new_audio_clip = AudioFileClip(new_audio_path)

    video_duration = video_clip.duration
    audio_duration = new_audio_clip.duration

    if audio_duration < video_duration:
        final_video = video_clip.subclip(0, audio_duration)
        final_audio = new_audio_clip
    elif audio_duration > video_duration:
        final_video = video_clip
        final_audio = new_audio_clip.subclip(0, video_duration)
    else:
        final_video = video_clip
        final_audio = new_audio_clip

    final_clip = final_video.set_audio(final_audio)

    # Write with an explicit temp audiofile location (must be writable)
    final_clip.write_videofile(
        output_video_path,
        codec='libx264',
        audio_codec='aac',
        temp_audiofile=f'/tmp/temp-audio-{os.getpid()}.m4a',
        remove_temp=True,
        logger=None
    )

    video_clip.close()
    new_audio_clip.close()
    final_audio.close()
    final_video.close()
    final_clip.close()


def format_transcription(transcription, translated_segments):
    """Format the transcription and translation for display."""
    output = ""
    for i, seg in enumerate(translated_segments):
        output += f"**Segment {i+1}** ({seg['start']:.2f}s - {seg['end']:.2f}s)\n"
        output += f"*Original:* {transcription['segments'][i]['text']}\n"
        output += f"*Translated:* {seg['translated_text']}\n"
        output += "\n---\n"
    return output
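
# Rendered markdown for one segment (illustrative):
#
#   **Segment 1** (0.00s - 2.40s)
#   *Original:* Hola, ¿qué tal?
#   *Translated:* Hello, how are you?
#
#   ---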

# ==================== Main Processing Function ====================
def process_video(video_file, target_language, progress=gr.Progress()):
    """Main processing function for Gradio."""
    if video_file is None:
        return None, "Please upload a video file.", ""

    temp_dir = tempfile.mkdtemp(dir='/tmp')
    audio_path = None

    try:
        # Load models
        progress(0.05, desc="Loading models...")
        whisper_mdl, tts_mdl = load_models()

        # Copy the uploaded video to the temp directory
        input_video_path = os.path.join(temp_dir, "input_video.mp4")
        shutil.copy(video_file, input_video_path)

        # Extract audio
        progress(0.1, desc="Extracting audio from video...")
        audio_path = audio_extractor(input_video_path)

        # Transcribe
        progress(0.2, desc="Transcribing audio...")
        transcription = transcribe(audio_path, whisper_mdl, progress)
        status_msg = f"✅ Transcribed {len(transcription['segments'])} segments\n"

        # Translate
        progress(0.4, desc="Translating segments...")
        translated_segments = translate_segments(transcription['segments'], target_language)
        status_msg += f"✅ Translated {len(translated_segments)} segments\n"

        # Generate TTS
        progress(0.5, desc="Generating voice-cloned audio...")
        output_audio_path = os.path.join(temp_dir, "translated_audio.wav")
        generate_translated_audio(
            reference_audio_path=audio_path,
            segments=translated_segments,
            output_path=output_audio_path,
            tts_model=tts_mdl,
            progress=progress,
            silence_duration=0.5,
            target_language=target_language
        )
        status_msg += "✅ TTS audio generated successfully!\n"

        # Merge audio with video
        progress(0.9, desc="Merging audio with video...")
        output_video_path = os.path.join(temp_dir, "translated_video.mp4")
        replace_video_audio(input_video_path, output_audio_path, output_video_path)
        status_msg += "✅ Video translation completed successfully!"

        # Format transcription
        transcription_text = format_transcription(transcription, translated_segments)

        progress(1.0, desc="Complete!")
        return output_video_path, status_msg, transcription_text

    except Exception as e:
        logger.exception("Error in translation pipeline")
        return None, f"❌ Error: {str(e)}", ""

    finally:
        # Clean up the extracted reference audio if it exists
        try:
            if audio_path and os.path.exists(audio_path):
                os.remove(audio_path)
        except OSError:
            pass


# Apply the ZeroGPU decorator when running on Hugging Face Spaces; the
# function body is identical either way, so it is defined once and wrapped.
if SPACES_AVAILABLE:
    process_video = spaces.GPU(process_video)
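
# Headless smoke-test sketch (illustrative; assumes a short clip exists at
# /tmp/clip.mp4 and that the default gr.Progress() tolerates running outside
# a Gradio event - if not, pass a no-op callable as progress):
#
#   video_out, status, transcript_md = process_video("/tmp/clip.mp4", "en")
#   print(status)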
""" ) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📤 Upload Video") video_input = gr.Video(label="Choose a video file", height=550) target_language = gr.Dropdown( choices=[(name, code) for code, name in ChatterboxMultilingualTTS.get_supported_languages().items()], value="en", label="Target Language", info="Select the target language for text-to-speech synthesis" ) # gr.Markdown("### ⚙️ Configuration") # target_language = gr.Dropdown( # choices=[ # ("English", "en"), # ("Hindi", "hi"), # ("Spanish", "es"), # ("French", "fr"), # ("German", "de"), # ("Italian", "it"), # ("Portuguese", "pt"), # ("Russian", "ru"), # ("Japanese", "ja"), # ("Korean", "ko"), # ("Chinese (Simplified)", "zh-cn"), # ], # value="en", # label="Target Language", # type="value" # ) translate_btn = gr.Button("🚀 Start Translation", variant="primary", size="lg") gr.Markdown( """ ### About This app uses: - **faster-whisper** for transcription - **Google Translate** for translation - **Chatterbox** for voice cloning TTS All processing runs locally in this app. """ ) with gr.Column(scale=1): gr.Markdown("### 📥 Output") status_output = gr.Textbox(label="Status", lines=5, interactive=False) video_output = gr.Video(label="Translated Video", height=550) with gr.Accordion("📝 View Transcription & Translation", open=False): transcription_output = gr.Markdown() # Connect the button to the processing function translate_btn.click( fn=process_video, inputs=[video_input, target_language], outputs=[video_output, status_output, transcription_output] ).then( fn=lambda: gr.Button(interactive=True), outputs=[translate_btn] ) # Disable button when clicked translate_btn.click( fn=lambda: gr.Button(interactive=False), outputs=[translate_btn], queue=False ) gr.Markdown( """ --- **Note:** Processing time depends on video length and number of segments. Large videos may take several minutes to process. """ ) return demo # ==================== Main ==================== if __name__ == "__main__": # Load models at startup (except in ZeroGPU where GPU isn't available yet) if not SPACES_AVAILABLE: logger.info("Initializing models...") load_models() logger.info("Models loaded successfully!") else: logger.info("Running in ZeroGPU mode - models will be loaded on first request") # Create and launch interface # .queue() is essential for long-running tasks like model generation demo = create_interface() demo.queue(max_size=20, default_concurrency_limit=2).launch( server_name="0.0.0.0", server_port=7860, share=False )