import gradio as gr
import tempfile
import os
import shutil
from moviepy.editor import VideoFileClip, AudioFileClip
from faster_whisper import WhisperModel
import torch
import torchaudio as ta
import torchaudio.transforms as transforms
from chatterbox.mtl_tts import ChatterboxMultilingualTTS
import logging
from typing import List, Dict
from deep_translator import GoogleTranslator
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Try to import spaces for ZeroGPU support (Hugging Face Spaces).
# Logging is configured above so this message is actually emitted.
try:
    import spaces
    SPACES_AVAILABLE = True
except ImportError:
    SPACES_AVAILABLE = False
    logger.info("spaces library not available - running without ZeroGPU support")
# Configuration - Auto-detect GPU
# Note: faster-whisper uses ctranslate2, which doesn't work well with ZeroGPU,
# so Whisper always runs on CPU. TTS will use the GPU when available.
if torch.cuda.is_available() and not SPACES_AVAILABLE:
    # Only use GPU for local CUDA setups, not ZeroGPU
    TTS_DEVICE = "cuda"
    logger.info(f"🚀 GPU detected! Using CUDA with {torch.cuda.get_device_name(0)} for TTS")
else:
    TTS_DEVICE = "cpu"
    if SPACES_AVAILABLE:
        logger.info("🚀 Running on ZeroGPU - TTS will use GPU inside the decorated function")
    else:
        logger.info("Running on CPU")

# Whisper always uses CPU (ctranslate2 compatibility)
WHISPER_DEVICE = "cpu"
WHISPER_COMPUTE_TYPE = "int8"
# Set temp directory to a writable location
os.environ['TMPDIR'] = '/tmp'
tempfile.tempdir = '/tmp'

# Patch torch.load to force CPU mapping, so checkpoints saved on GPU can be
# loaded on CPU-only hosts; models are moved to the right device afterwards.
torch_load_orig = torch.load
def torch_load_cpu(*args, **kwargs):
    kwargs["map_location"] = torch.device("cpu")
    return torch_load_orig(*args, **kwargs)
torch.load = torch_load_cpu

# Global models (loaded once)
whisper_model = None
tts_model = None
# ==================== Model Loading ====================
def load_models():
    """Load models (lazy loading for ZeroGPU compatibility)"""
    global whisper_model, tts_model
    if whisper_model is None:
        logger.info("Loading Whisper model...")
        whisper_model = WhisperModel(
            "small",
            device=WHISPER_DEVICE,
            compute_type=WHISPER_COMPUTE_TYPE,
            cpu_threads=4
        )
        logger.info("✅ Whisper model loaded!")
    if tts_model is None:
        logger.info("Loading TTS model...")
        # In ZeroGPU, determine the device at runtime
        tts_device = "cuda" if (SPACES_AVAILABLE and torch.cuda.is_available()) else TTS_DEVICE
        tts_model = ChatterboxMultilingualTTS.from_pretrained(device=tts_device)
        logger.info(f"✅ TTS model loaded on {tts_device}!")
    return whisper_model, tts_model
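
# Typical usage: call load_models() at the start of each request handler;
# repeated calls are cheap because the module-level globals cache the models.
#   whisper_mdl, tts_mdl = load_models()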
# ==================== TTS Processing ====================
def generate_translated_audio(
    reference_audio_path: str,
    segments: List[Dict],
    output_path: str,
    tts_model,
    progress=gr.Progress(),
    silence_duration: float = 0.5,
    target_language: str = "en"
) -> str:
    """Generate translated audio using Chatterbox TTS with progress updates"""
    try:
        progress(0, desc=f"Generating TTS for {len(segments)} segments...")
        all_wavs = []
        # Note: silence_duration is currently unused; inter-segment silence is
        # reconstructed from the segment timestamps below.
        total_segments = len(segments)
        for counter, segment in enumerate(segments):
            # Update progress
            prog = (counter + 1) / total_segments
            text_preview = segment['translated_text'][:50]
            progress(prog, desc=f"Processing segment {counter + 1}/{total_segments}: {text_preview}...")
            original_duration = segment['end'] - segment['start']
            logger.info(f"Generating audio for text: {segment['translated_text']}")
            # Send a heartbeat progress update before generation
            progress(prog, desc=f"🎙️ Generating audio for segment {counter + 1}/{total_segments}...")
            # Generate audio for this segment, cloning the voice from the reference audio
            wav = tts_model.generate(
                segment['translated_text'],
                language_id=target_language,
                audio_prompt_path=reference_audio_path,
                exaggeration=0.2,
                cfg_weight=0.8,
                temperature=0.4,
                repetition_penalty=1.2,
                min_p=0.05,
                top_p=0.9
            )
            generated_duration = wav.shape[-1] / tts_model.sr
            # Add leading silence for the first segment (from 0.0 to segment start)
            if counter == 0 and segment['start'] > 0:
                leading_silence_samples = int(segment['start'] * tts_model.sr)
                leading_silence = torch.zeros((wav.shape[0], leading_silence_samples), dtype=wav.dtype, device=wav.device)
                all_wavs.append(leading_silence)
            # Handle duration matching
            if generated_duration < original_duration:
                # Generated audio is shorter - add it as is
                all_wavs.append(wav)
                # Add trailing silence to match the original segment duration
                trailing_silence_duration = original_duration - generated_duration
                trailing_silence_samples = int(trailing_silence_duration * tts_model.sr)
                if trailing_silence_samples > 0:
                    trailing_silence = torch.zeros((wav.shape[0], trailing_silence_samples), dtype=wav.dtype, device=wav.device)
                    all_wavs.append(trailing_silence)
            elif generated_duration > original_duration:
                # Generated audio is longer - speed it up to fit.
                # transforms.Speed returns (waveform, lengths); only the waveform is needed.
                speed_factor = generated_duration / original_duration
                speed_transform = transforms.Speed(tts_model.sr, speed_factor)
                wav_adjusted, _ = speed_transform(wav)
                all_wavs.append(wav_adjusted)
            else:
                # Duration matches perfectly
                all_wavs.append(wav)
            # Insert silence between segments (not after the last one) to
            # preserve the original gaps between speech segments
            if counter < len(segments) - 1:
                next_segment = segments[counter + 1]
                gap_duration = next_segment['start'] - segment['end']
                if gap_duration > 0:
                    gap_samples = int(gap_duration * tts_model.sr)
                    gap_silence = torch.zeros((wav.shape[0], gap_samples), dtype=wav.dtype, device=wav.device)
                    all_wavs.append(gap_silence)
        # Save output
        progress(0.95, desc="Combining audio segments...")
        combined_wav = torch.cat(all_wavs, dim=-1)
        ta.save(output_path, combined_wav, tts_model.sr)
        total_duration = combined_wav.shape[-1] / tts_model.sr
        logger.info(f"TTS completed! Total duration: {total_duration:.2f}s")
        progress(1.0, desc="TTS generation completed!")
        return output_path
    except Exception:
        logger.exception("Error generating TTS audio")
        raise
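
# Illustrative call, for reference only - in this app the function is driven by
# process_video() below, and all values here are hypothetical:
#   generate_translated_audio(
#       reference_audio_path="/tmp/source.wav",
#       segments=[{"start": 0.0, "end": 2.5, "translated_text": "Hello there"}],
#       output_path="/tmp/translated.wav",
#       tts_model=tts_model,
#       target_language="en",
#   )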
# ==================== Helper Functions ====================
def audio_extractor(video_path):
    """Extract audio from video"""
    video_clip = VideoFileClip(video_path)
    audio_clip = video_clip.audio
    temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False, dir='/tmp')
    full_audio_path = temp_file.name
    temp_file.close()
    audio_clip.write_audiofile(full_audio_path, codec='pcm_s16le', logger=None)
    audio_clip.close()
    video_clip.close()
    return full_audio_path
def transcribe(full_audio_path, whisper_model, progress=None):
    """Transcribe audio using faster-whisper"""
    if progress:
        progress(0, desc="Transcribing audio...")
    # faster-whisper transcription
    segments_generator, info = whisper_model.transcribe(
        full_audio_path,
        beam_size=5,
        word_timestamps=True,
        vad_filter=False,
        # vad_parameters=dict(min_silence_duration_ms=500)
    )
    detected_language = info.language
    if progress:
        progress(0, desc=f"Detected language: {detected_language}")
    # Convert generator to list and format segments
    segments = []
    for segment in segments_generator:
        seg_dict = {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text.strip(),
            "words": []
        }
        # Add word-level timestamps if available
        if segment.words:
            for word in segment.words:
                seg_dict["words"].append({
                    "word": word.word,
                    "start": word.start,
                    "end": word.end
                })
        segments.append(seg_dict)
    result = {
        "segments": segments,
        "language": detected_language,
        "language_code": detected_language
    }
    if progress:
        progress(0, desc=f"Transcribed {len(segments)} segments")
    return result
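
# Shape of the returned dict (timestamps and text are illustrative):
#   {"segments": [{"start": 0.0, "end": 2.5, "text": "Hello there",
#                  "words": [{"word": "Hello", "start": 0.0, "end": 0.4}]}],
#    "language": "en", "language_code": "en"}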
def translate_segments(segments: List[Dict], target_lang: str) -> List[Dict]:
    """Translate segments to the target language using deep-translator"""
    results = []
    # Assumes the Chatterbox language code (e.g. 'en', 'fr') is also a valid
    # GoogleTranslator target code.
    translator = GoogleTranslator(source='auto', target=target_lang)
    for seg in segments:
        # Drop word-level timestamps; only segment-level timing is needed downstream
        clean_seg = {k: v for k, v in seg.items() if k != "words"}
        if not clean_seg["text"] or clean_seg["text"].isspace():
            translated_text = ""
        else:
            translated_text = translator.translate(clean_seg["text"])
        clean_seg["translated_text"] = translated_text
        results.append(clean_seg)
    return results
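
# Illustrative example (the translated text is hypothetical):
#   translate_segments([{"start": 0.0, "end": 1.2, "text": "Bonjour", "words": []}], "en")
#   -> [{"start": 0.0, "end": 1.2, "text": "Bonjour", "translated_text": "Hello"}]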
def replace_video_audio(video_path, new_audio_path, output_video_path):
    """Replace the video's audio with proper temp file handling"""
    # Point MoviePy at the system ffmpeg binary
    os.environ['FFMPEG_BINARY'] = 'ffmpeg'
    video_clip = VideoFileClip(video_path)
    new_audio_clip = AudioFileClip(new_audio_path)
    video_duration = video_clip.duration
    audio_duration = new_audio_clip.duration
    # Trim whichever track is longer so the durations match
    if audio_duration < video_duration:
        final_video = video_clip.subclip(0, audio_duration)
        final_audio = new_audio_clip
    elif audio_duration > video_duration:
        final_video = video_clip
        final_audio = new_audio_clip.subclip(0, video_duration)
    else:
        final_video = video_clip
        final_audio = new_audio_clip
    final_clip = final_video.set_audio(final_audio)
    # Write with an explicit temp audiofile location in a writable directory
    final_clip.write_videofile(
        output_video_path,
        codec='libx264',
        audio_codec='aac',
        temp_audiofile=f'/tmp/temp-audio-{os.getpid()}.m4a',
        remove_temp=True,
        logger=None
    )
    video_clip.close()
    new_audio_clip.close()
    final_audio.close()
    final_video.close()
    final_clip.close()
def format_transcription(transcription, translated_segments):
    """Format transcription for display"""
    output = ""
    for i, seg in enumerate(translated_segments):
        output += f"**Segment {i+1}** ({seg['start']:.2f}s - {seg['end']:.2f}s)\n"
        output += f"*Original:* {transcription['segments'][i]['text']}\n"
        output += f"*Translated:* {seg['translated_text']}\n"
        output += "---\n"
    return output
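
# Sample of the Markdown produced (values are hypothetical):
#   **Segment 1** (0.00s - 2.50s)
#   *Original:* Bonjour
#   *Translated:* Hello
#   ---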
# ==================== Main Processing Function ====================
def process_video(video_file, target_language, progress=gr.Progress()):
    """Main processing function for Gradio"""
    if video_file is None:
        return None, "Please upload a video file.", ""
    temp_dir = tempfile.mkdtemp(dir='/tmp')
    audio_path = None
    try:
        # Load models
        progress(0.05, desc="Loading models...")
        whisper_mdl, tts_mdl = load_models()
        # Copy uploaded video to temp directory
        input_video_path = os.path.join(temp_dir, "input_video.mp4")
        shutil.copy(video_file, input_video_path)
        # Extract audio
        progress(0.1, desc="Extracting audio from video...")
        audio_path = audio_extractor(input_video_path)
        # Transcribe
        progress(0.2, desc="Transcribing audio...")
        transcription = transcribe(audio_path, whisper_mdl, progress)
        status_msg = f"✅ Transcribed {len(transcription['segments'])} segments\n"
        # Translate
        progress(0.4, desc="Translating segments...")
        translated_segments = translate_segments(transcription['segments'], target_language)
        status_msg += f"✅ Translated {len(translated_segments)} segments\n"
        # Generate TTS
        progress(0.5, desc="Generating voice-cloned audio...")
        output_audio_path = os.path.join(temp_dir, "translated_audio.wav")
        generate_translated_audio(
            reference_audio_path=audio_path,
            segments=translated_segments,
            output_path=output_audio_path,
            tts_model=tts_mdl,
            progress=progress,
            silence_duration=0.5,
            target_language=target_language
        )
        status_msg += "✅ TTS audio generated successfully!\n"
        # Merge audio with video
        progress(0.9, desc="Merging audio with video...")
        output_video_path = os.path.join(temp_dir, "translated_video.mp4")
        replace_video_audio(input_video_path, output_audio_path, output_video_path)
        status_msg += "✅ Video translation completed successfully!"
        # Format transcription
        transcription_text = format_transcription(transcription, translated_segments)
        progress(1.0, desc="Complete!")
        return output_video_path, status_msg, transcription_text
    except Exception as e:
        logger.exception("Error in translation pipeline")
        return None, f"❌ Error: {str(e)}", ""
    finally:
        # Clean up the extracted audio file if it exists
        try:
            if audio_path and os.path.exists(audio_path):
                os.remove(audio_path)
        except OSError:
            pass

# Apply the ZeroGPU decorator on Hugging Face Spaces so the function is
# allocated a GPU while it runs
if SPACES_AVAILABLE:
    process_video = spaces.GPU(process_video)
# ==================== Gradio Interface ====================
def create_interface():
    """Create Gradio interface"""
    with gr.Blocks(title="Video Voice Translator", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # 🎬 Video Voice Translator
            Upload a video, and we'll translate it to your target language while preserving the voice!
            """
        )
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📤 Upload Video")
                video_input = gr.Video(label="Choose a video file", height=550)
                target_language = gr.Dropdown(
                    choices=[(name, code) for code, name in ChatterboxMultilingualTTS.get_supported_languages().items()],
                    value="en",
                    label="Target Language",
                    info="Select the target language for text-to-speech synthesis"
                )
                translate_btn = gr.Button("🚀 Start Translation", variant="primary", size="lg")
                gr.Markdown(
                    """
                    ### About
                    This app uses:
                    - **faster-whisper** for transcription
                    - **Google Translate** (via deep-translator) for translation
                    - **Chatterbox** for voice-cloning TTS
                    Transcription and TTS run inside this app; translation calls the Google Translate web API.
                    """
                )
            with gr.Column(scale=1):
                gr.Markdown("### 📥 Output")
                status_output = gr.Textbox(label="Status", lines=5, interactive=False)
                video_output = gr.Video(label="Translated Video", height=550)
        with gr.Accordion("📝 View Transcription & Translation", open=False):
            transcription_output = gr.Markdown()
        # Run the pipeline, then re-enable the button when it finishes
        translate_btn.click(
            fn=process_video,
            inputs=[video_input, target_language],
            outputs=[video_output, status_output, transcription_output]
        ).then(
            fn=lambda: gr.Button(interactive=True),
            outputs=[translate_btn]
        )
        # Disable the button immediately on click (queue=False so it runs right away)
        translate_btn.click(
            fn=lambda: gr.Button(interactive=False),
            outputs=[translate_btn],
            queue=False
        )
        gr.Markdown(
            """
            ---
            **Note:** Processing time depends on video length and the number of segments.
            Large videos may take several minutes to process.
            """
        )
    return demo
# ==================== Main ====================
if __name__ == "__main__":
    # Load models at startup (except in ZeroGPU, where the GPU isn't available yet)
    if not SPACES_AVAILABLE:
        logger.info("Initializing models...")
        load_models()
        logger.info("Models loaded successfully!")
    else:
        logger.info("Running in ZeroGPU mode - models will be loaded on first request")
    # Create and launch the interface.
    # .queue() is essential for long-running tasks like model generation.
    demo = create_interface()
    demo.queue(max_size=20, default_concurrency_limit=2).launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )