""" AutoDub — Video Dubbing & Subtitling (Direct Upload) Gradio app for Hugging Face Spaces Pipeline: 1. Upload video directly 2. Extract audio from video 3. Transcribe with faster-whisper (Whisper large-v3) 4. Translate with Qwen2.5-7B-Instruct (4-bit) 5. Synthesise dubbed audio with XTTS-v2 6. Merge audio/video with FFmpeg 7. Burn subtitles with FFmpeg """ from __future__ import annotations # ── Standard library ───────────────────────────────────────────────────────── import gc import glob import os import re import subprocess import tempfile import traceback from difflib import SequenceMatcher from pathlib import Path import uuid # ── Third-party ─────────────────────────────────────────────────────────────── import gradio as gr import librosa import numpy as np import soundfile as sf import torch from faster_whisper import WhisperModel from transformers import BitsAndBytesConfig, pipeline from TTS.tts.configs.xtts_config import XttsConfig from TTS.tts.models.xtts import Xtts from TTS.utils.manage import ModelManager # ── ZeroGPU (optional) ─────────────────────────────────────────────────────── try: import spaces HAS_ZEROGPU = True except Exception: HAS_ZEROGPU = False class spaces: @staticmethod def GPU(fn): return fn # ── Environment ─────────────────────────────────────────────────────────────── os.environ["COQUI_TOS_AGREED"] = "1" _DATA_ROOT = Path("/data") if os.access("/data", os.W_OK) else Path(tempfile.gettempdir()) WORK_DIR = _DATA_ROOT / "autodub" WORK_DIR.mkdir(parents=True, exist_ok=True) XTTS_CACHE = _DATA_ROOT / "tts_cache" XTTS_CACHE.mkdir(parents=True, exist_ok=True) os.environ["TTS_HOME"] = str(XTTS_CACHE) SUPPORTED_LANGUAGES = [ "Arabic", "French", "Spanish", "German", "Italian", "Portuguese", "Russian", "Chinese", "Japanese", "Korean", "English", "Hindi", "Turkish", "Polish", "Dutch", ] LANG_TO_XTTS = { "arabic": "ar", "french": "fr", "spanish": "es", "german": "de", "italian": "it", "portuguese": "pt", "russian": "ru", "chinese": "zh-cn", 
"japanese": "ja", "korean": "ko", "english": "en", "hindi": "hi", "turkish": "tr", "polish": "pl", "dutch": "nl", } _UNICODE_FONT_MAP = { "arabic": "Noto Naskh Arabic", "chinese": "Noto Sans CJK SC", "japanese": "Noto Sans CJK JP", "korean": "Noto Sans CJK KR", "hindi": "Noto Sans Devanagari", "russian": "Noto Sans", } _NON_LATIN_LANGS = { "arabic", "chinese", "japanese", "korean", "russian", "hindi", "thai", "greek", "persian", "farsi", "georgian", "armenian", "hebrew", "urdu", "bengali", } _LEAK_PATTERNS = [ r"<\|im_start\|>", r"<\|im_end\|>", r"^(Sure|Of course|Here is|Certainly|I will|Translation:)", ] # ── Global model handles ───────────────────────────────────────────────────── _whisper_model: WhisperModel | None = None _translator = None _xtts_model: Xtts | None = None # ═══════════════════════════════════════════════════════════════════════════════ # MODEL LOADING # ═══════════════════════════════════════════════════════════════════════════════ def _load_whisper(): global _whisper_model if _whisper_model is not None: return device = "cuda" if torch.cuda.is_available() else "cpu" print(f"[Whisper] Loading large-v3 on {device}…") _whisper_model = WhisperModel( "large-v3", device=device, compute_type="float16" if device == "cuda" else "int8", download_root=str(_DATA_ROOT / "whisper_cache"), ) print("[Whisper] Ready.") def _load_translator(): global _translator if _translator is not None: return print("[Translator] Loading Qwen2.5-7B-Instruct (4-bit)…") qconfig = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", ) _translator = pipeline( "text-generation", model="Qwen/Qwen2.5-7B-Instruct", model_kwargs={"quantization_config": qconfig}, device_map="auto", max_new_tokens=512, ) print("[Translator] Ready.") def _load_xtts(): global _xtts_model if _xtts_model is not None: return print("[XTTS-v2] Loading model…") manager = ModelManager(output_prefix=str(XTTS_CACHE)) model_name = 
"tts_models/multilingual/multi-dataset/xtts_v2" manager.download_model(model_name) model_path = str(XTTS_CACHE / model_name.replace("/", "--")) config_path = os.path.join(model_path, "config.json") config = XttsConfig() config.load_json(config_path) _xtts_model = Xtts.init_from_config(config) _xtts_model.load_checkpoint(config, checkpoint_dir=model_path) if torch.cuda.is_available(): try: _xtts_model.cuda() print("[XTTS-v2] Ready on GPU.") except RuntimeError: print("[XTTS-v2] Ready on CPU.") else: print("[XTTS-v2] Ready on CPU.") # ═══════════════════════════════════════════════════════════════════════════════ # SRT HELPERS # ═══════════════════════════════════════════════════════════════════════════════ def _is_near_duplicate(text: str, seen: list, threshold: float = 0.92) -> bool: for prev in seen[-5:]: if SequenceMatcher(None, text.lower(), prev.lower()).ratio() >= threshold: return True return False def _is_word_loop(text: str) -> bool: words = text.split() if len(words) < 4: return False if len(set(w.lower() for w in words)) / len(words) < 0.35: return True return False def _split_into_sentences(text: str, max_chars: int = 150) -> list[str]: sentences = re.split(r"(?<=[.!?])\s+", text.strip()) result, current = [], "" for s in sentences: if len(current) + len(s) <= max_chars: current += (" " + s) if current else s else: if current: result.append(current.strip()) current = s if current: result.append(current.strip()) return result def _format_time(seconds: float) -> str: h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = int(seconds % 60) ms = min(int(round((seconds - int(seconds)) * 1000)), 999) return f"{h:02}:{m:02}:{s:02},{ms:03}" def chunks_to_srt(chunks: list[dict]) -> str: srt_blocks = [] index = 1 seen_exact: set[str] = set() seen_recent: list[str] = [] for chunk in chunks: start = chunk.get("start", 0) end = chunk.get("end", start + 5) if end is None: end = start + 5 text = chunk["text"].strip() if not text or len(text) < 3: continue if text 
in seen_exact or _is_near_duplicate(text, seen_recent): continue if _is_word_loop(text): continue duration = max(end - start, 0.1) if len(text) / duration > 50: continue seen_exact.add(text) seen_recent.append(text) sentences = _split_into_sentences(text) if len(sentences) <= 1: srt_blocks.append(f"{index}\n{_format_time(start)} --> {_format_time(end)}\n{text}") index += 1 else: word_counts = [len(s.split()) for s in sentences] total_words = max(sum(word_counts), 1) t = start for sentence in sentences: frac = len(sentence.split()) / total_words seg_end = min(round(t + duration * frac, 3), end) seg_end = max(seg_end, t + 0.5) srt_blocks.append(f"{index}\n{_format_time(t)} --> {_format_time(seg_end)}\n{sentence}") index += 1 t = seg_end return "\n\n".join(srt_blocks) def _parse_srt_time(time_str: str) -> float: h, m, s_ms = time_str.split(":") s, ms = s_ms.split(",") return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000 def parse_srt(srt_content: str) -> list[dict]: parsed = [] for block in srt_content.strip().split("\n\n"): lines = block.strip().split("\n") if len(lines) < 3: continue times = lines[1].split(" --> ") parsed.append({ "start": _parse_srt_time(times[0]), "end": _parse_srt_time(times[1]), "text": "\n".join(lines[2:]), }) return parsed # ═══════════════════════════════════════════════════════════════════════════════ # AUDIO/VIDEO EXTRACTION # ═══════════════════════════════════════════════════════════════════════════════ def extract_audio_from_video(video_path: str, output_path: str, sample_rate: int = 16000) -> str: """Extract mono audio from video at specified sample rate.""" cmd = [ "ffmpeg", "-y", "-i", video_path, "-ac", "1", "-ar", str(sample_rate), "-c:a", "pcm_s16le", output_path ] subprocess.run(cmd, capture_output=True, check=True) return output_path def extract_voice_sample_from_video(video_path: str, output_path: str, duration: float = 12.0) -> str: """Extract a voice sample for XTTS cloning.""" temp_audio = str(WORK_DIR / 
f"temp_voice_{uuid.uuid4().hex[:6]}.wav") cmd = [ "ffmpeg", "-y", "-i", video_path, "-ac", "1", "-ar", "22050", "-c:a", "pcm_s16le", temp_audio ] subprocess.run(cmd, capture_output=True, check=True) audio, sr = librosa.load(temp_audio, sr=22050, duration=180.0) target_samples = int(duration * sr) best_audio, best_rms = None, -1.0 offset = 3.0 while offset + duration <= len(audio) / sr: start = int(offset * sr) seg = audio[start:start + target_samples] rms = float(np.sqrt(np.mean(seg ** 2))) if rms > best_rms: best_rms, best_audio = rms, seg if rms >= 0.01: break offset += 15.0 if best_audio is None: best_audio = audio[:target_samples] sf.write(output_path, best_audio, sr) os.remove(temp_audio) return output_path # ═══════════════════════════════════════════════════════════════════════════════ # TRANSCRIPTION # ═══════════════════════════════════════════════════════════════════════════════ @spaces.GPU def transcribe_audio(audio_path: str) -> list[dict]: _load_whisper() print("[Whisper] Transcribing…") segments, info = _whisper_model.transcribe( audio_path, beam_size=5, word_timestamps=True, vad_filter=True, vad_parameters=dict(min_silence_duration_ms=500), ) chunks = [ {"start": round(s.start, 3), "end": round(s.end, 3), "text": s.text.strip()} for s in segments ] gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() print(f"[Whisper] {len(chunks)} segments | language: {info.language}") return chunks # ═══════════════════════════════════════════════════════════════════════════════ # TRANSLATION # ═══════════════════════════════════════════════════════════════════════════════ def _clean_translation(raw: str, source_text: str) -> str: lines = [l.strip() for l in raw.strip().splitlines() if l.strip()] if not lines: return source_text result = re.sub(r"<\|[^|]+\|>", "", lines[0]).strip() if result.lower() == source_text.lower(): result = lines[1] if len(lines) > 1 else source_text for pat in _LEAK_PATTERNS: if re.search(pat, result, re.IGNORECASE): result 
= lines[1] if len(lines) > 1 else source_text break return result.strip() def _strip_latin_tokens(text: str) -> str: cleaned = re.sub(r"\b[a-zA-Z]+\b", "", text) return re.sub(r"[ \t]{2,}", " ", cleaned).strip() def _translate_block(text: str, target_lang: str, duration: float) -> str: _load_translator() messages = [ { "role": "system", "content": ( f"You are an expert dubbing translator. Translate into {target_lang}. " f"TARGET DURATION: {duration:.2f}s. Condense if needed. " "Output ONLY the translated text — no preamble, no quotes." ), }, { "role": "user", "content": f'Original ({duration:.2f}s): "{text}"\nTranslate to {target_lang}:', }, ] tokenizer = _translator.tokenizer prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) result = _translator( prompt, do_sample=False, max_new_tokens=150, repetition_penalty=1.1, return_full_text=False, ) raw = result[0]["generated_text"].strip() cleaned = _clean_translation(raw, text) if target_lang.strip().lower() in _NON_LATIN_LANGS: stripped = _strip_latin_tokens(cleaned) return stripped if len(stripped) > 2 else cleaned return cleaned def translate_srt(srt: str, target_lang: str) -> str: translated_blocks = [] blocks = srt.strip().split("\n\n") for i, block in enumerate(blocks): lines = block.split("\n") if len(lines) < 3: translated_blocks.append(block) continue index_line, timestamp, text = lines[0], lines[1], "\n".join(lines[2:]) try: t0_str, t1_str = timestamp.split(" --> ") def _to_sec(t): ts = t.replace(",", ".") h, m, s = ts.split(":") return int(h) * 3600 + int(m) * 60 + float(s) duration = _to_sec(t1_str) - _to_sec(t0_str) except Exception: duration = 3.0 translated_text = _translate_block(text, target_lang, duration) translated_blocks.append(f"{index_line}\n{timestamp}\n{translated_text}") if (i + 1) % 10 == 0: print(f" Translated {i + 1}/{len(blocks)} blocks…") print(f"[Translator] Done — {len(blocks)} blocks.") return "\n\n".join(translated_blocks) # 
# ═══════════════════════════════════════════════════════════════════════════════
# TTS
# ═══════════════════════════════════════════════════════════════════════════════

@spaces.GPU
def generate_tts_audio(srt_content: str, target_lang: str, speaker_wav: str, output_path: str) -> str:
    """Synthesise a dubbed track from SRT cues with XTTS-v2 and write it to *output_path*.

    Cues are skipped when their text repeats an earlier cue, is shorter than
    10 characters, or lasts less than 0.5 s. Each synthesised clip is trimmed,
    sped up (at most 1.4×) if it overruns its cue, and placed at the cue start
    or right after the previous clip, whichever is later. When no cue
    qualifies, one second of silence is written.

    Unknown languages fall back to the ``"en"`` XTTS code.
    """
    _load_xtts()
    sample_rate = 24000
    lang_code = LANG_TO_XTTS.get(target_lang.strip().lower(), "en")
    chunks = parse_srt(srt_content)

    seen_texts: set[str] = set()
    valid_chunks = []
    for chunk in chunks:
        text = chunk["text"].strip()
        if text in seen_texts or (chunk["end"] - chunk["start"]) < 0.5 or len(text) < 10:
            continue
        seen_texts.add(text)
        valid_chunks.append(chunk)

    if not valid_chunks:
        sf.write(output_path, np.zeros(sample_rate, dtype=np.float32), sample_rate)
        return output_path

    gpt_cond_latent, speaker_embedding = _xtts_model.get_conditioning_latents(
        audio_path=[speaker_wav]
    )
    total_duration = valid_chunks[-1]["end"] + 5.0
    output_buffer = np.zeros(int(total_duration * sample_rate), dtype=np.float32)
    write_cursor = 0  # first sample index not yet written; prevents overlap

    for i, chunk in enumerate(valid_chunks):
        print(f"[XTTS] Generating [{i + 1}/{len(valid_chunks)}]: \"{chunk['text'][:50]}…\"")
        out = _xtts_model.inference(
            text=chunk["text"],
            language=lang_code,
            gpt_cond_latent=gpt_cond_latent,
            speaker_embedding=speaker_embedding,
            temperature=0.75,
            length_penalty=1.0,
            repetition_penalty=5.0,
            top_k=50,
            top_p=0.85,
        )
        wav = np.array(out["wav"], dtype=np.float32)
        wav, _ = librosa.effects.trim(wav, top_db=20)

        target_duration = chunk["end"] - chunk["start"]
        actual_duration = len(wav) / sample_rate
        if actual_duration > target_duration:
            speed_factor = min(actual_duration / target_duration, 1.4)
            # Overruns of 5% or less are left untouched.
            if speed_factor > 1.05:
                wav = librosa.effects.time_stretch(wav, rate=speed_factor)

        start_pos = max(int(chunk["start"] * sample_rate), write_cursor)
        end_pos = start_pos + len(wav)
        if end_pos > len(output_buffer):
            output_buffer = np.pad(output_buffer, (0, end_pos - len(output_buffer)))
        output_buffer[start_pos:end_pos] = wav
        write_cursor = end_pos

    sf.write(output_path, output_buffer, sample_rate)
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return output_path


# ═══════════════════════════════════════════════════════════════════════════════
# FFMPEG HELPERS
# ═══════════════════════════════════════════════════════════════════════════════

def _has_nvenc() -> bool:
    """Return True if ``ffmpeg -encoders`` lists ``h264_nvenc``.

    Returns False when ffmpeg cannot be launched. A listed encoder is not
    proof that it can actually encode on this machine.
    """
    try:
        result = subprocess.run(
            ["ffmpeg", "-hide_banner", "-encoders"], capture_output=True, text=True
        )
    except OSError:
        return False
    return "h264_nvenc" in result.stdout


def merge_audio_video(video_path: str, dubbed_audio_path: str, output_path: str) -> str:
    """Mix the dubbed track over the original audio (attenuated to 0.15) and mux with the video.

    The video stream is copied; the mixed audio is encoded as AAC at 192k.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-i", dubbed_audio_path,
        "-filter_complex",
        "[0:a]volume=0.15[orig];[orig][1:a]amix=inputs=2:duration=first[aout]",
        "-map", "0:v",
        "-map", "[aout]",
        "-c:v", "copy",
        "-c:a", "aac",
        "-b:a", "192k",
        "-shortest",
        output_path,
    ]
    subprocess.run(cmd, capture_output=True, check=True)
    print(f"[FFmpeg] Merge done → {output_path}")
    return output_path


def _color_to_ass(name: str) -> str:
    """Map a colour name to an ASS ``&HAABBGGRR`` colour string; unknown names give white."""
    return {
        "white": "&H00FFFFFF",
        "black": "&H00000000",
        "red": "&H000000FF",
        "blue": "&H00FF0000",
        "yellow": "&H0000FFFF",
        "green": "&H0000FF00",
    }.get(name.lower(), "&H00FFFFFF")


def burn_subtitles(
    video_path: str,
    srt_path: str,
    output_path: str,
    target_lang: str | None = None,
    font_name: str = "Arial",
    font_size: int = 24,
) -> str:
    """Burn *srt_path* into *video_path* and write the result to *output_path*.

    When *target_lang* is given and *font_name* is left at ``"Arial"``, the
    font is taken from `_UNICODE_FONT_MAP` (default ``"Noto Sans"``).
    ``h264_nvenc`` is used when ffmpeg lists it; if that encode fails, the
    burn is retried once with ``libx264``.

    Raises:
        subprocess.CalledProcessError: if the final ffmpeg attempt fails.
    """
    effective_font = font_name
    if target_lang and font_name == "Arial":
        effective_font = _UNICODE_FONT_MAP.get(target_lang.lower(), "Noto Sans")
        print(f"[FFmpeg] Subtitle font → \"{effective_font}\"")

    fc = _color_to_ass("white")
    oc = _color_to_ass("black")
    style = (
        f"FontName={effective_font},FontSize={font_size},"
        f"PrimaryColour={fc},OutlineColour={oc},"
        f"Outline=2,Alignment=2,Charset=1"
    )
    escaped = srt_path.replace("'", "'\\''").replace(":", "\\:")

    nvenc_codec = ["-c:v", "h264_nvenc", "-preset", "p2", "-cq", "23"]
    x264_codec = ["-c:v", "libx264", "-preset", "fast", "-crf", "23"]

    def _encode(video_codec: list[str]) -> None:
        cmd = [
            "ffmpeg", "-y",
            "-sub_charenc", "UTF-8",
            "-i", video_path,
            "-vf", f"subtitles='{escaped}':force_style='{style}'",
            *video_codec,
            "-c:a", "copy",
            output_path,
        ]
        subprocess.run(cmd, capture_output=True, check=True)

    # Probe once: each probe spawns an ffmpeg process.
    use_nvenc = _has_nvenc()
    encoder_name = "h264_nvenc" if use_nvenc else "libx264"
    print(f"[FFmpeg] Burning subtitles using {encoder_name}")
    try:
        _encode(nvenc_codec if use_nvenc else x264_codec)
    except subprocess.CalledProcessError:
        if not use_nvenc:
            raise
        # The encoder can be compiled into ffmpeg without a usable GPU.
        print("[FFmpeg] h264_nvenc failed; retrying with libx264")
        _encode(x264_codec)

    print(f"[FFmpeg] Done → {output_path}")
    return output_path


# ═══════════════════════════════════════════════════════════════════════════════
# CLEANUP
# ═══════════════════════════════════════════════════════════════════════════════

def _cleanup(session_id: str, keep_final: bool = False) -> None:
    """Remove intermediate files, optionally keeping the final output."""
    patterns = [
        f"{session_id}_audio.wav",
        f"{session_id}_voice.wav",
        f"{session_id}_dubbed.wav",
        f"{session_id}_merged.mp4",
        f"{session_id}_translated.srt",
    ]
    if not keep_final:
        patterns.append(f"{session_id}_final.mp4")
    for pat in patterns:
        for f in glob.glob(str(WORK_DIR / pat)):
            try:
                os.remove(f)
            except OSError:
                # Best-effort: a file that cannot be removed is left behind.
                pass


# ═══════════════════════════════════════════════════════════════════════════════
# GRADIO PIPELINE
# ═══════════════════════════════════════════════════════════════════════════════

def run_pipeline(video_file, dub_language: str, progress=gr.Progress()):
    """Run the full dubbing pipeline for one uploaded video.

    Args:
        video_file: a path string, or an object whose ``name`` is the path.
        dub_language: target language name.
        progress: Gradio progress callback.

    Returns:
        ``(status_markdown, final_video_path)``; the path is ``None`` on failure.
        Errors are reported in the status text rather than raised.
    """
    session_id = None
    try:
        if video_file is None:
            return "❌ Please upload a video file.", None

        # Handle both string path and file object
        input_video = video_file if isinstance(video_file, str) else video_file.name
        if not os.path.exists(input_video):
            return f"❌ Video file not found: {input_video}", None

        session_id = str(uuid.uuid4())[:8]

        progress(0.05, desc="Extracting audio from video…")
        audio_path = str(WORK_DIR / f"{session_id}_audio.wav")
        extract_audio_from_video(input_video, audio_path)

        progress(0.10, desc="Extracting voice sample…")
        voice_path = str(WORK_DIR / f"{session_id}_voice.wav")
        extract_voice_sample_from_video(input_video, voice_path)

        progress(0.20, desc="Transcribing with Whisper…")
        chunks = transcribe_audio(audio_path)
        srt_content = chunks_to_srt(chunks)

        progress(0.40, desc=f"Translating to {dub_language}…")
        translated_srt = translate_srt(srt_content, target_lang=dub_language)
        srt_path = str(WORK_DIR / f"{session_id}_translated.srt")
        Path(srt_path).write_text(translated_srt, encoding="utf-8")

        progress(0.60, desc="Synthesising dubbed audio…")
        dubbed_audio_path = str(WORK_DIR / f"{session_id}_dubbed.wav")
        generate_tts_audio(translated_srt, dub_language, voice_path, dubbed_audio_path)

        progress(0.80, desc="Merging audio + video…")
        merged_path = str(WORK_DIR / f"{session_id}_merged.mp4")
        merge_audio_video(input_video, dubbed_audio_path, merged_path)

        progress(0.90, desc="Burning subtitles…")
        final_path = str(WORK_DIR / f"{session_id}_final.mp4")
        burn_subtitles(merged_path, srt_path, final_path, target_lang=dub_language)

        progress(1.0, desc="Done!")
        # Clean up intermediate files but keep final video
        _cleanup(session_id, keep_final=True)
        return f"✅ Done! Video dubbed to **{dub_language}** successfully.", final_path

    except subprocess.CalledProcessError as e:
        # Decode leniently: a strict decode of non-UTF-8 stderr would raise
        # from inside this handler and skip the cleanup below.
        if isinstance(e.stderr, bytes):
            stderr_text = e.stderr.decode("utf-8", errors="replace")
        else:
            stderr_text = e.stderr
        error_msg = f"FFmpeg error: {stderr_text if stderr_text else str(e)}"
        traceback.print_exc()
        if session_id:
            _cleanup(session_id)
        return f"❌ {error_msg}", None
    except Exception as exc:
        traceback.print_exc()
        if session_id:
            _cleanup(session_id)
        return f"❌ Error: {exc}", None


# ═══════════════════════════════════════════════════════════════════════════════
# GRADIO UI
# ═══════════════════════════════════════════════════════════════════════════════

with gr.Blocks(title="AutoDub - Video Dubbing") as demo:
    gr.Markdown(
        """
        # 🎬 AutoDub — Video Dubbing & Subtitling
        Upload a video, choose a target language, and get back a fully dubbed & subtitled MP4.

        **Powered by:** Whisper large-v3 + Qwen2.5-7B + XTTS-v2
        """
    )

    with gr.Row():
        with gr.Column(scale=2):
            video_input = gr.Video(
                label="Upload Video",
                sources=["upload"],
            )
            lang_input = gr.Dropdown(
                choices=SUPPORTED_LANGUAGES,
                value="French",
                label="Target Dub Language",
            )
            submit_btn = gr.Button("▶ Start Dubbing", variant="primary", size="lg")

        with gr.Column(scale=3):
            status_output = gr.Markdown("_Upload a video and click Start Dubbing…_")
            video_output = gr.Video(label="Dubbed Video", interactive=False)

    submit_btn.click(
        fn=run_pipeline,
        inputs=[video_input, lang_input],
        outputs=[status_output, video_output],
    )

    gr.Markdown(
        "---\n"
        "**Tips:**\n"
        "- Processing takes 3–10 minutes depending on video length\n"
        "- Best results with videos under 5 minutes\n"
        "- Supported languages: Arabic, Chinese, French, German, Italian, Japanese, Korean, "
        "Portuguese, Russian, Spanish, Turkish, Hindi, Polish, Dutch, English"
    )


if __name__ == "__main__":
    demo.queue(max_size=5)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )