Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import subprocess | |
| import torch | |
| from TTS.api import TTS | |
| from deep_translator import GoogleTranslator | |
| import pysrt | |
| import whisper | |
| import webvtt | |
| import shutil | |
| import time | |
| from tqdm import tqdm | |
| from typing import Dict, List, Optional | |
| import logging | |
# Set up logging: one process-wide configuration, then this module's logger.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
# Each row is (display name, language code, TTS speaker choices). Every
# language listed here uses the same code for translation and for Whisper.
_LANGUAGE_TABLE = (
    ("English", "en", ["default"]),
    ("Spanish", "es", ["default"]),
    ("French", "fr", ["default"]),
    ("German", "de", ["thorsten", "eva_k"]),
    ("Japanese", "ja", ["default"]),
    ("Hindi", "hi", ["default"]),
)
LANGUAGES = {
    name: {"code": code, "speakers": speakers, "whisper": code}
    for name, code, speakers in _LANGUAGE_TABLE
}

# CSS declarations applied to subtitle cues, keyed by the label shown in the UI.
SUBTITLE_STYLES = {
    "Default": "",
    "White Text": "color: white;",
    "Yellow Text": "color: yellow;",
    "Large Text": "font-size: 24px;",
    "Bold Text": "font-weight: bold;",
    "Black Background": "background-color: black; padding: 5px;",
}

# Create output directory (relative path for Spaces)
OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# TTS state: loaded models are kept in ``tts_models`` keyed by language code,
# and run on the GPU whenever PyTorch reports one.
tts_models = {}
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
def load_tts_model(model_name: str, lang_code: str):
    """Load a TTS model onto the configured device.

    Args:
        model_name: Model identifier passed to ``TTS``.
        lang_code: Language code of the model (currently unused here).

    Returns:
        The loaded model, or ``None`` if anything went wrong while loading
        (the error is logged instead of being raised).
    """
    try:
        model = TTS(model_name).to(device)
        synthesizer = model.synthesizer
        # Try to use gruut phonemizer if espeak fails
        if hasattr(synthesizer, 'tts_config'):
            synthesizer.tts_config.phonemizer = "gruut"
    except Exception as exc:
        logger.error(f"Failed to load {model_name}: {str(exc)}")
        return None
    return model
# Initialize models only when needed
_TTS_MODEL_NAMES = {
    "en": "tts_models/en/ljspeech/tacotron2-DDC",
    "es": "tts_models/es/css10/vits",
    "fr": "tts_models/fr/css10/vits",
    "de": "tts_models/de/thorsten/vits",  # Using VITS instead of tacotron2
    "ja": "tts_models/ja/kokoro/tacotron2-DDC",
    "hi": "tts_models/hi/kb/tacotron2-DDC",
}


def get_tts_model(lang_code: str):
    """Return the TTS model for a language code, loading it on first use.

    Successfully loaded models are cached in ``tts_models``. A failed load is
    NOT cached, so a later call can retry (for example after a transient
    download error).

    Args:
        lang_code: Language code such as ``"en"`` or ``"de"``.

    Returns:
        The loaded model, or ``None`` when the language has no configured
        model or the model could not be loaded.
    """
    cached = tts_models.get(lang_code)
    if cached is not None:
        return cached

    model_name = _TTS_MODEL_NAMES.get(lang_code)
    if model_name is None:
        # Report an unsupported code the same way as a failed load, so
        # callers only need to handle a ``None`` result.
        logger.error(f"No TTS model configured for language code {lang_code!r}")
        return None

    model = load_tts_model(model_name, lang_code)
    if model is not None:
        tts_models[lang_code] = model
    return model
# Whisper is loaded on first use and then shared through this module global.
whisper_model = None


def get_whisper_model():
    """Return the shared Whisper ``"small"`` model, loading it on first call."""
    global whisper_model
    if whisper_model is not None:
        return whisper_model
    whisper_model = whisper.load_model("small")
    return whisper_model
def extract_audio(video_path: str) -> str:
    """Extract a video's audio track to a WAV file with ffmpeg.

    The result is written to ``audio.wav`` inside ``OUTPUT_DIR`` as 16-bit
    PCM, 16 kHz, mono; an existing file is overwritten (``-y``).

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    target = os.path.join(OUTPUT_DIR, "audio.wav")
    ffmpeg_args = ['ffmpeg', '-i', video_path, '-vn']
    ffmpeg_args += ['-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1']
    ffmpeg_args += ['-y', target]
    # capture_output=True pipes both stdout and stderr, keeping ffmpeg quiet.
    subprocess.run(ffmpeg_args, check=True, capture_output=True)
    return target
def transcribe_with_whisper(audio_path: str, language: Optional[str] = None) -> Dict:
    """Transcribe audio using Whisper.

    Args:
        audio_path: Path of the audio file to transcribe.
        language: Whisper language code, or ``None`` to leave the choice of
            language to Whisper.

    Returns:
        The result object returned by ``model.transcribe``. Callers in this
        file read its ``"segments"`` and ``"language"`` entries, so it is a
        mapping rather than a plain string.
    """
    model = get_whisper_model()
    return model.transcribe(audio_path, language=language, word_timestamps=True)
def generate_srt_from_whisper(audio_path: str, language: str) -> str:
    """Transcribe audio with Whisper and save the segments as an SRT file.

    Each transcription segment becomes one subtitle entry, numbered from 1,
    using the segment's start/end seconds and text.

    Returns:
        Path of the written file, ``subtitles.srt`` inside ``OUTPUT_DIR``.
    """
    transcription = transcribe_with_whisper(audio_path, language)
    srt_file = pysrt.SubRipFile()
    for number, segment in enumerate(transcription["segments"], start=1):
        begin = pysrt.SubRipTime(seconds=segment["start"])
        finish = pysrt.SubRipTime(seconds=segment["end"])
        entry = pysrt.SubRipItem(
            index=number,
            start=begin,
            end=finish,
            text=segment["text"],
        )
        srt_file.append(entry)
    destination = os.path.join(OUTPUT_DIR, "subtitles.srt")
    srt_file.save(destination, encoding='utf-8')
    return destination
def detect_language(audio_path: str) -> str:
    """Detect the spoken language of an audio file using Whisper.

    Only Whisper's language-identification pass is run, on the first 30
    seconds of audio. This avoids a complete word-timestamped transcription
    whose text would be thrown away.

    Args:
        audio_path: Path of the audio file to inspect.

    Returns:
        The display name from ``LANGUAGES`` whose Whisper code matches the
        detected language, or ``"English"`` when the detected language is
        not one of the supported ones.
    """
    model = get_whisper_model()
    # pad_or_trim fits the waveform to the 30-second window Whisper expects.
    audio = whisper.pad_or_trim(whisper.load_audio(audio_path))
    # NOTE(review): relies on log_mel_spectrogram's default mel-bin count
    # matching the loaded model — confirm if the model size is ever changed.
    mel = whisper.log_mel_spectrogram(audio).to(model.device)
    _, probabilities = model.detect_language(mel)
    detected_code = max(probabilities, key=probabilities.get)

    for name, data in LANGUAGES.items():
        if data["whisper"] == detected_code:
            return name
    logger.warning(f"Detected unsupported language {detected_code!r}; falling back to English")
    return "English"
def translate_subtitles(srt_path: str, target_langs: List[str]) -> Dict[str, str]:
    """Translate subtitles to multiple languages.

    Every target language is translated from the ORIGINAL subtitle text. A
    fresh subtitle file with fresh items is built per language, so one
    language's translation can never become the input of the next one and
    the source subtitles are left unmodified.

    Args:
        srt_path: Path of the source SRT file.
        target_langs: Display names (keys of ``LANGUAGES``) to translate to.

    Returns:
        Mapping of language code to the path of the translated SRT file,
        ``subtitles_<code>.srt`` inside ``OUTPUT_DIR``.

    Raises:
        KeyError: if a name in ``target_langs`` is not in ``LANGUAGES``.
    """
    source_subs = pysrt.open(srt_path)
    results = {}
    for lang_name in target_langs:
        lang_code = LANGUAGES[lang_name]["code"]
        translator = GoogleTranslator(source='auto', target=lang_code)
        translated_subs = pysrt.SubRipFile()
        for sub in source_subs:
            text = sub.text
            try:
                # Keep the source text if the translator returns nothing.
                text = translator.translate(sub.text) or sub.text
            except Exception as e:
                # Best effort: keep the untranslated line rather than
                # dropping the cue or aborting the whole language.
                logger.warning(f"Translation failed: {str(e)}")
            translated_subs.append(pysrt.SubRipItem(
                index=sub.index,
                start=sub.start,
                end=sub.end,
                text=text,
                position=sub.position,
            ))
        output_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.srt")
        translated_subs.save(output_path, encoding='utf-8')
        results[lang_code] = output_path
    return results
def generate_webvtt_subtitles(srt_path: str, style: str = "") -> str:
    """Convert an SRT file into a WebVTT file, optionally with cue styling.

    The language code is taken from the SRT file name: the part after the
    last underscore, without the ``.srt`` suffix.

    Args:
        srt_path: Path of the SRT file to convert.
        style: CSS declarations written into a ``STYLE``/``::cue`` block;
            no block is written when empty.

    Returns:
        Path of the written file, ``subtitles_<code>.vtt`` in ``OUTPUT_DIR``.
    """
    def _vtt_timestamp(stamp):
        # %f yields microseconds; dropping three digits leaves milliseconds.
        return stamp.to_time().strftime('%H:%M:%S.%f')[:-3]

    cues = pysrt.open(srt_path)
    file_name = os.path.basename(srt_path)
    lang_code = file_name.split('_')[-1].replace('.srt', '')
    vtt_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.vtt")

    with open(vtt_path, 'w', encoding='utf-8') as out:
        out.write("WEBVTT\n\n")
        if style:
            out.write(f"STYLE\n::cue {{\n{style}\n}}\n\n")
        for cue in cues:
            begin = _vtt_timestamp(cue.start)
            finish = _vtt_timestamp(cue.end)
            out.write(f"{begin} --> {finish}\n")
            out.write(f"{cue.text}\n\n")
    return vtt_path
def generate_translated_audio(
    srt_path: str,
    target_lang: str,
    speaker: str = "default"
) -> str:
    """Generate a dubbed audio track from translated subtitles using TTS.

    Every non-empty subtitle is synthesized to its own WAV chunk. The chunks
    are then delayed to their subtitle start time and mixed over a silent
    track as long as the base video.

    Args:
        srt_path: Path of the translated SRT file.
        target_lang: Language code used to select the TTS model and to name
            the output file.
        speaker: Speaker name; only forwarded to the model when it is not
            ``"default"`` and the model reports being multi-speaker.

    Returns:
        Path of ``translated_audio_<target_lang>.wav`` inside ``OUTPUT_DIR``.

    Raises:
        Exception: if no TTS model is available or no chunk was synthesized.
        subprocess.CalledProcessError: if an ffmpeg invocation fails.
    """
    subs = pysrt.open(srt_path)
    tts = get_tts_model(target_lang)
    if tts is None:
        raise Exception(f"TTS model for {target_lang} not available")

    temp_dir = os.path.join(OUTPUT_DIR, f"temp_audio_{target_lang}")
    output_path = os.path.join(OUTPUT_DIR, f"translated_audio_{target_lang}.wav")
    os.makedirs(temp_dir, exist_ok=True)
    try:
        # Single-speaker models must not receive a speaker argument.
        use_speaker = speaker != "default" and getattr(tts, "is_multi_speaker", False)
        tts_kwargs = {"speaker": speaker} if use_speaker else {}

        chunks = []  # (delay in milliseconds, chunk path), in input order
        for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} audio")):
            text = sub.text.strip()
            if not text:
                continue
            audio_file = os.path.join(temp_dir, f"chunk_{i:04d}.wav")
            try:
                tts.tts_to_file(text=text, file_path=audio_file, **tts_kwargs)
            except Exception as e:
                # Best effort: one failed line should not abort the dub.
                logger.warning(f"TTS failed: {str(e)}")
                continue
            chunks.append((int(sub.start.ordinal), audio_file))
        if not chunks:
            raise Exception("No audio generated")

        # Create silent audio spanning the whole video; it is input 0 of the
        # mix and fixes the length of the result. The base video must
        # already have been copied into OUTPUT_DIR by the caller.
        video_duration = get_video_duration(os.path.join(OUTPUT_DIR, "base_video.mp4"))
        silence_file = os.path.join(temp_dir, "silence.wav")
        subprocess.run([
            'ffmpeg', '-f', 'lavfi', '-i', 'anullsrc=r=44100:cl=stereo',
            '-t', str(video_duration), '-y', silence_file
        ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Mix audio: delay chunk k (ffmpeg input k+1) to its start time,
        # then mix all delayed chunks together with the silent bed.
        delay_steps = [
            f"[{k + 1}:a]adelay={delay_ms}|{delay_ms}[a{k}]"
            for k, (delay_ms, _) in enumerate(chunks)
        ]
        mix_inputs = "[0:a]" + "".join(f"[a{k}]" for k in range(len(chunks)))
        # NOTE(review): amix rescales its inputs by default, which can make
        # the result quiet with many chunks; newer FFmpeg releases offer
        # ``normalize=0`` — confirm the deployed FFmpeg version before use.
        mix_step = f"{mix_inputs}amix=inputs={len(chunks) + 1}:duration=first[aout]"
        filter_complex = ";".join(delay_steps + [mix_step])

        cmd = ['ffmpeg', '-y', '-i', silence_file]
        for _, chunk_path in chunks:
            cmd += ['-i', chunk_path]
        cmd += ['-filter_complex', filter_complex, '-map', '[aout]', output_path]
        # Argument list without a shell: the filter graph contains '|', ';'
        # and '[' which a shell would interpret.
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    finally:
        # Remove the chunk directory whether or not mixing succeeded.
        shutil.rmtree(temp_dir, ignore_errors=True)
    return output_path
def get_video_duration(video_path: str, default: float = 180.0) -> float:
    """Get video duration in seconds using ffprobe.

    Args:
        video_path: Path of the media file to probe.
        default: Value returned when the duration cannot be determined.

    Returns:
        The duration reported by ffprobe, or ``default`` when ffprobe cannot
        be started or prints nothing / something that is not a number.
    """
    try:
        result = subprocess.run([
            'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1', video_path
        ], capture_output=True, text=True)
        return float(result.stdout.strip())
    except (OSError, ValueError) as exc:
        # OSError: ffprobe could not be started. ValueError: empty or
        # non-numeric output (e.g. the file could not be probed).
        logging.getLogger(__name__).warning(
            f"Could not determine duration of {video_path}: {exc}; using {default}s"
        )
        return float(default)
def create_html_player(
    video_path: str,
    subtitle_paths: Dict[str, str],
    style: str = ""
) -> str:
    """Write an HTML page that plays the video with selectable subtitles.

    The page references the video and the subtitle files by base name only.
    It contains one ``<track>`` per subtitle file (the ``"en"`` track is
    marked ``default``) and one download link per subtitle file.

    Args:
        video_path: Path of the video; only its base name is used.
        subtitle_paths: Mapping of language code to subtitle file path.
        style: CSS declarations for ``video::cue``; omitted when empty.

    Returns:
        Path of the written file, ``player.html`` inside ``OUTPUT_DIR``.
    """
    html_path = os.path.join(OUTPUT_DIR, "player.html")
    video_name = os.path.basename(video_path)

    track_tags = []
    link_tags = []
    for lang, path in subtitle_paths.items():
        file_name = os.path.basename(path)
        default_attr = "default" if lang == "en" else ""
        track_tags.append(
            f'<track kind="subtitles" src="{file_name}" '
            f'srclang="{lang}" label="{lang.capitalize()}" '
            f'{default_attr}>'
        )
        link_tags.append(
            f'<a href="{file_name}" download>'
            f'{lang.upper()} Subtitles (.vtt)</a><br>'
        )
    subtitle_tracks = "\n".join(track_tags)
    download_links = "".join(link_tags)

    style_block = ""
    if style:
        style_block = f"video::cue {{ {style} }}"

    html_content = f"""<!DOCTYPE html>
<html>
<head>
<title>Video Player</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.container {{ max-width: 800px; margin: 0 auto; }}
video {{ width: 100%; background: #000; }}
.downloads {{ margin-top: 20px; }}
{style_block}
</style>
</head>
<body>
<div class="container">
<h2>Video Player with Subtitles</h2>
<video controls>
<source src="{video_name}" type="video/mp4">
{subtitle_tracks}
</video>
<div class="downloads">
<h3>Download Subtitles:</h3>
{download_links}
</div>
</div>
</body>
</html>"""
    with open(html_path, 'w', encoding='utf-8') as page:
        page.write(html_content)
    return html_path
def process_video(
    video_file: str,
    source_lang: str,
    target_langs: List[str],
    subtitle_style: str,
    speaker_settings: Dict[str, str],
    progress: gr.Progress = gr.Progress()
) -> tuple:
    """Run the complete video translation pipeline.

    Steps: extract audio, detect the source language if requested,
    transcribe to SRT, translate the subtitles, then for each target
    language synthesize dubbed audio, write WebVTT subtitles and mux a
    dubbed video. Finally an HTML player is written.

    Args:
        video_file: Path of the uploaded video.
        source_lang: A key of ``LANGUAGES`` or ``"Auto-detect"``.
        target_langs: Keys of ``LANGUAGES`` to translate into.
        subtitle_style: A key of ``SUBTITLE_STYLES``; unknown keys mean no
            styling.
        speaker_settings: Mapping of language code to speaker name; missing
            codes use ``"default"``. ``None`` is treated as empty.
        progress: Gradio progress tracker.

    Returns:
        A ``(files, message)`` tuple. On success ``files`` is the list of
        output paths (HTML player, base video, VTT files, dubbed videos).
        On any failure the error is logged and ``files`` is ``None`` with
        the error text in ``message``.
    """
    try:
        progress(0.05, "Initializing...")
        if not video_file:
            raise ValueError("No video file provided")
        if not target_langs:
            raise ValueError("No target languages selected")
        speaker_settings = speaker_settings or {}
        style_css = SUBTITLE_STYLES.get(subtitle_style, "")

        # 1. Extract audio
        progress(0.1, "Extracting audio...")
        audio_path = extract_audio(video_file)

        # 2. Detect language if needed
        if source_lang == "Auto-detect":
            source_lang = detect_language(audio_path)
            progress(0.15, f"Detected language: {source_lang}")

        # 3. Generate subtitles
        progress(0.2, "Generating subtitles...")
        srt_path = generate_srt_from_whisper(
            audio_path,
            LANGUAGES[source_lang]["whisper"]
        )

        # 4. Translate subtitles
        progress(0.3, "Translating subtitles...")
        translated_subs = translate_subtitles(srt_path, target_langs)

        # 5. Save original video. This must happen before any dubbing:
        # generate_translated_audio reads base_video.mp4 for its duration.
        base_video = os.path.join(OUTPUT_DIR, "base_video.mp4")
        shutil.copy(video_file, base_video)

        # 6. Process each target language
        translated_vtts = {}
        output_videos = []
        for i, lang_name in enumerate(target_langs):
            lang_code = LANGUAGES[lang_name]["code"]
            # Report the fraction finished BEFORE this language starts, so
            # the bar moves from 0.4 up to (but not reaching) 0.9 here.
            progress(0.4 + (i * 0.5 / len(target_langs)), f"Processing {lang_name}...")

            # Generate audio
            translated_audio = generate_translated_audio(
                translated_subs[lang_code],
                lang_code,
                speaker_settings.get(lang_code, "default")
            )

            # Generate subtitles
            translated_vtts[lang_code] = generate_webvtt_subtitles(
                translated_subs[lang_code],
                style_css
            )

            # Create translated video version: original video stream copied
            # as-is, audio replaced by the dubbed track.
            output_video = os.path.join(OUTPUT_DIR, f"output_{lang_code}.mp4")
            subprocess.run([
                'ffmpeg', '-i', base_video, '-i', translated_audio,
                '-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'aac',
                '-y', output_video
            ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output_videos.append(output_video)

        # 7. Create HTML player
        progress(0.9, "Creating HTML player...")
        html_path = create_html_player(base_video, translated_vtts, style_css)

        # Prepare all output files
        output_files = [html_path, base_video] + \
            list(translated_vtts.values()) + \
            output_videos
        progress(1.0, "Done!")
        return output_files, "Processing completed successfully!"
    except Exception as e:
        # Top-level boundary of the UI callback: log with traceback and
        # surface the message in the status box instead of crashing.
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        return None, f"Error: {str(e)}"
def get_speaker_settings(*args) -> Dict[str, str]:
    """Build the language-code -> speaker mapping from positional values.

    Values are matched to the entries of ``LANGUAGES`` by position. Empty
    (falsy) values and languages without a corresponding value are left out.
    """
    return {
        LANGUAGES[lang_name]["code"]: chosen
        for lang_name, chosen in zip(LANGUAGES, args)
        if chosen
    }
def create_interface():
    """Create the Gradio interface.

    Builds the upload/settings column, the results column, and wires two
    events: changing the target languages toggles visibility of the speaker
    dropdowns, and the submit button collects the speaker choices and then
    runs ``process_video``.

    Returns:
        The ``gr.Blocks`` application.
    """
    with gr.Blocks(title="Video Translator") as demo:
        gr.Markdown("# Free Video Translation System")
        gr.Markdown("Translate videos with subtitles and audio dubbing using free/open-source tools")
        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.Video(label="Upload Video")
                with gr.Accordion("Source Settings", open=True):
                    source_lang = gr.Dropdown(
                        label="Source Language",
                        choices=["Auto-detect"] + list(LANGUAGES.keys()),
                        value="Auto-detect"
                    )
                with gr.Accordion("Target Languages", open=True):
                    target_langs = gr.CheckboxGroup(
                        label="Select target languages",
                        choices=list(LANGUAGES.keys()),
                        value=["English", "Spanish"]
                    )
                with gr.Accordion("Subtitle Styling", open=False):
                    subtitle_style = gr.Dropdown(
                        label="Subtitle Appearance",
                        choices=list(SUBTITLE_STYLES.keys()),
                        value="Default"
                    )
                with gr.Accordion("Voice Settings", open=False):
                    # One input per language, in LANGUAGES order, so that
                    # get_speaker_settings can match values by position.
                    # Languages with a single speaker get a hidden, empty
                    # textbox as a placeholder.
                    speaker_inputs = []
                    for lang_name, lang_data in LANGUAGES.items():
                        speakers = lang_data["speakers"]
                        if len(speakers) > 1:
                            speaker_inputs.append(
                                gr.Dropdown(
                                    label=f"{lang_name} Speaker",
                                    choices=speakers,
                                    value=speakers[0],
                                    visible=False
                                )
                            )
                        else:
                            speaker_inputs.append(gr.Textbox(visible=False))
                submit_btn = gr.Button("Translate Video", variant="primary")
            with gr.Column(scale=2):
                output_files = gr.Files(label="Download Files")
                status = gr.Textbox(label="Status")
                gr.Markdown("""
**Instructions:**
1. Upload a video file
2. Select source and target languages
3. Customize subtitles and voices
4. Click Translate
5. Download the HTML player and open in browser
""")

        # Holds the {language code: speaker} mapping for the current run.
        speaker_state = gr.State({})

        def update_speaker_ui(selected_langs):
            """Show a speaker input only for selected multi-speaker languages."""
            selected = selected_langs or []
            # gr.update is component-agnostic, which matters because the
            # outputs are a mix of Dropdown and Textbox components.
            return [
                gr.update(
                    visible=lang_name in selected and len(lang_data["speakers"]) > 1
                )
                for lang_name, lang_data in LANGUAGES.items()
            ]

        target_langs.change(
            update_speaker_ui,
            inputs=target_langs,
            outputs=speaker_inputs
        )

        # The speaker components are passed as event inputs so the handler
        # receives their current VALUES. The resulting mapping is stored in
        # speaker_state and handed to process_video by the chained event.
        submit_btn.click(
            get_speaker_settings,
            inputs=speaker_inputs,
            outputs=speaker_state
        ).then(
            process_video,
            inputs=[
                video_input,
                source_lang,
                target_langs,
                subtitle_style,
                speaker_state
            ],
            outputs=[output_files, status]
        )
    return demo
if __name__ == "__main__":
    # Clear output directory on startup: remove whatever is in OUTPUT_DIR
    # and recreate it empty.
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    demo = create_interface()
    # NOTE(review): the original comment stated share=True is required for
    # Hugging Face Spaces — confirm; this could not be verified from this file.
    demo.launch(share=True)