# app.py — PRODUCTION: Indic Video Translator + Dub (MMS-TTS) + Optional Wav2Lip # ✅ HF Spaces friendly (Docker or Python) with clear dependency checks # ✅ Whisper (faster-whisper preferred, fallback to openai-whisper) # ✅ NLLB translation with safe chunking # ✅ MMS-TTS per-language models for correct pronunciation # ✅ Optional Wav2Lip lip-sync (auto-detect). Falls back to audio-only replace # ✅ Voice sample is OPTIONAL in this build (kept for future voice-cloning) import os import gc import re import shutil import logging import tempfile import subprocess from pathlib import Path from typing import Optional, Tuple, List, Dict import gradio as gr import torch import librosa import numpy as np import scipy.io.wavfile as wavfile from transformers import VitsModel, AutoTokenizer, AutoModelForSeq2SeqLM from pathlib import Path import os import logging WAV2LIP_DIR = Path("Wav2Lip").resolve() # ----------------------------------------------------------------------------- # LOGGING # ----------------------------------------------------------------------------- logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger("app") ckpt_dir = WAV2LIP_DIR / "checkpoints" logger.info(f"WAV2LIP_DIR: {WAV2LIP_DIR}") logger.info(f"Exists WAV2LIP_DIR? {WAV2LIP_DIR.exists()}") logger.info(f"Checkpoint dir: {ckpt_dir}") logger.info(f"Exists checkpoints? {ckpt_dir.exists()}") if ckpt_dir.exists(): logger.info("checkpoints/ contents:") for p in sorted(ckpt_dir.iterdir()): size = p.stat().st_size if p.exists() else -1 logger.info(f" - {p.name} ({size} bytes)") else: logger.info("No checkpoints/ directory found inside Wav2Lip") expected = ckpt_dir / "wav2lip_gan.pth" logger.info(f"Expected checkpoint path: {expected}") logger.info(f"Exists expected checkpoint? {expected.exists()}") if expected.exists(): logger.info(f"Expected checkpoint size: {expected.stat().st_size} bytes") # If it's an LFS pointer file, it will be tiny and start with "version https://git-lfs..." try: head = expected.open("r", encoding="utf-8", errors="ignore").read(200) logger.info(f"Checkpoint head(200): {head.replace(chr(10),' ')}") except Exception as e: logger.info(f"Could not read checkpoint as text (good if binary). err={e}") # ----------------------------------------------------------------------------- # GLOBALS (lazy-loaded) # ----------------------------------------------------------------------------- whisper_model = None whisper_kind = None # "faster" | "openai" nllb_model = None nllb_tokenizer = None mms_models: Dict[str, tuple] = {} # model_id -> (model, tokenizer) # ----------------------------------------------------------------------------- # LANGUAGE MAP (NLLB + MMS) # ----------------------------------------------------------------------------- LANGUAGE_MAP = { "Hindi": {"nllb": "hin_Deva", "mms": "facebook/mms-tts-hin"}, "Tamil": {"nllb": "tam_Taml", "mms": "facebook/mms-tts-tam"}, "Telugu": {"nllb": "tel_Telu", "mms": "facebook/mms-tts-tel"}, "Malayalam": {"nllb": "mal_Mlym", "mms": "facebook/mms-tts-mal"}, "Kannada": {"nllb": "kan_Knda", "mms": "facebook/mms-tts-kan"}, "Marathi": {"nllb": "mar_Deva", "mms": "facebook/mms-tts-mar"}, "Bengali": {"nllb": "ben_Beng", "mms": "facebook/mms-tts-ben"}, "Gujarati": {"nllb": "guj_Gujr", "mms": "facebook/mms-tts-guj"}, "Punjabi": {"nllb": "pan_Guru", "mms": "facebook/mms-tts-pan"}, } # ----------------------------------------------------------------------------- # CONFIG # ----------------------------------------------------------------------------- WHISPER_SIZE = os.getenv("WHISPER_SIZE", "base") # tiny/base/small/medium/large-v3 (if available) NLLB_MODEL_ID = os.getenv("NLLB_MODEL_ID", "facebook/nllb-200-distilled-600M") MAX_QUEUE = int(os.getenv("GRADIO_MAX_QUEUE", "5")) # Chunking limits (tune for memory) NLLB_MAX_TOKENS = 450 NLLB_MAX_NEW_TOKENS = 512 MMS_TTS_MAX_CHARS = 500 # ----------------------------------------------------------------------------- # UTILITIES # ----------------------------------------------------------------------------- def _which(bin_name: str) -> Optional[str]: return shutil.which(bin_name) def check_system_deps() -> None: """Fail fast with clear message if ffmpeg/ffprobe missing.""" missing = [b for b in ("ffmpeg", "ffprobe") if _which(b) is None] if missing: raise RuntimeError( f"Missing system binaries: {', '.join(missing)}. " f"Install ffmpeg in your Space (recommended: Docker + apt-get install ffmpeg)." ) def run_cmd(cmd: List[str], cwd: Optional[str] = None, err_prefix: str = "Command failed"): r = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True) if r.returncode != 0: msg = (r.stderr or r.stdout or "").strip() raise RuntimeError(f"{err_prefix}: {msg}") return r def safe_unlink(path: Path) -> None: try: if path.exists(): path.unlink() except Exception: pass def is_wav2lip_available() -> Tuple[bool, str]: wav2lip_dir = WAV2LIP_DIR ckpt = wav2lip_dir / "checkpoints" / "wav2lip_gan.pth" if wav2lip_dir.exists() and ckpt.exists(): return True, f"Wav2Lip detected at {wav2lip_dir} (lip-sync enabled)" return False, f"Wav2Lip missing. Checked: {wav2lip_dir} and {ckpt}" # ----------------------------------------------------------------------------- # MODEL LOADERS # ----------------------------------------------------------------------------- def load_whisper(): """Prefer faster-whisper, fallback to openai-whisper.""" global whisper_model, whisper_kind if whisper_model is not None: return whisper_model # Try Faster-Whisper try: from faster_whisper import WhisperModel device = "cuda" if torch.cuda.is_available() else "cpu" compute_type = "float16" if torch.cuda.is_available() else "int8" logger.info(f"Loading Faster-Whisper '{WHISPER_SIZE}' on {device} (compute={compute_type})...") whisper_model = WhisperModel(WHISPER_SIZE, device=device, compute_type=compute_type) whisper_kind = "faster" logger.info("✅ Faster-Whisper loaded") return whisper_model except Exception as e: logger.warning(f"Faster-Whisper unavailable, falling back to openai-whisper. Reason: {e}") # Fallback OpenAI Whisper try: import whisper logger.info(f"Loading OpenAI Whisper '{WHISPER_SIZE}'...") whisper_model = whisper.load_model(WHISPER_SIZE) whisper_kind = "openai" logger.info("✅ OpenAI Whisper loaded") return whisper_model except Exception as e: raise RuntimeError( f"No Whisper backend available. Install faster-whisper or openai-whisper. Error: {e}" ) def load_nllb(): global nllb_model, nllb_tokenizer if nllb_model is not None and nllb_tokenizer is not None: return nllb_model, nllb_tokenizer logger.info(f"Loading NLLB: {NLLB_MODEL_ID} ...") nllb_tokenizer = AutoTokenizer.from_pretrained(NLLB_MODEL_ID) nllb_model = AutoModelForSeq2SeqLM.from_pretrained(NLLB_MODEL_ID) if torch.cuda.is_available(): nllb_model = nllb_model.to("cuda") logger.info("✅ NLLB loaded") return nllb_model, nllb_tokenizer def load_mms_tts(lang_name: str): model_id = LANGUAGE_MAP[lang_name]["mms"] if model_id in mms_models: return mms_models[model_id] logger.info(f"Loading MMS-TTS for {lang_name}: {model_id} ...") model = VitsModel.from_pretrained(model_id) tokenizer = AutoTokenizer.from_pretrained(model_id) if torch.cuda.is_available(): model = model.to("cuda") mms_models[model_id] = (model, tokenizer) logger.info(f"✅ MMS-TTS loaded for {lang_name}") return model, tokenizer # ----------------------------------------------------------------------------- # MEDIA HELPERS # ----------------------------------------------------------------------------- def extract_audio_16k_mono(video_path: str, out_wav: str) -> str: """Extract audio for Whisper.""" cmd = [ "ffmpeg", "-y", "-i", video_path, "-vn", "-ac", "1", "-ar", "16000", out_wav, "-loglevel", "error" ] run_cmd(cmd, err_prefix="FFmpeg audio extraction failed") return out_wav def get_video_duration(video_path: str) -> float: cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_path ] r = run_cmd(cmd, err_prefix="FFprobe duration failed") dur = float(r.stdout.strip()) if dur <= 0.05: raise RuntimeError("Video duration too short/invalid") return dur def align_audio_duration(audio_path: str, target_duration: float, out_wav: str) -> str: """Time-stretch + final trim/pad to match target_duration.""" cur = librosa.get_duration(filename=audio_path) if cur <= 0.05: raise RuntimeError("Generated audio duration too short (TTS likely failed).") ratio = target_duration / cur logger.info(f"Audio align: {cur:.2f}s -> {target_duration:.2f}s (ratio={ratio:.3f})") # Build atempo chain (each atempo must be 0.5..2.0) filters = [] r = ratio while r > 2.0: filters.append("atempo=2.0") r /= 2.0 while r < 0.5: filters.append("atempo=0.5") r /= 0.5 filters.append(f"atempo={r}") cmd = [ "ffmpeg", "-y", "-i", audio_path, "-filter:a", ",".join(filters), out_wav, "-loglevel", "error" ] run_cmd(cmd, err_prefix="Audio time-stretch failed") # Final trim/pad for safety new_dur = librosa.get_duration(filename=out_wav) if abs(new_dur - target_duration) > 0.2: tmp = out_wav + ".tmp.wav" shutil.move(out_wav, tmp) if new_dur < target_duration: pad_dur = target_duration - new_dur cmd2 = ["ffmpeg", "-y", "-i", tmp, "-af", f"apad=pad_dur={pad_dur}", out_wav, "-loglevel", "error"] else: cmd2 = ["ffmpeg", "-y", "-i", tmp, "-t", str(target_duration), out_wav, "-loglevel", "error"] run_cmd(cmd2, err_prefix="Audio final trim/pad failed") safe_unlink(Path(tmp)) return out_wav def replace_audio_only(video_path: str, audio_path: str, out_mp4: str) -> str: """Mux new audio into video (no lip-sync).""" cmd = [ "ffmpeg", "-y", "-i", video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-map", "0:v:0", "-map", "1:a:0", "-shortest", out_mp4, "-loglevel", "error" ] run_cmd(cmd, err_prefix="Audio replacement failed") return out_mp4 def run_wav2lip(video_path: str, audio_path: str, out_mp4: str) -> str: wav2lip_dir = WAV2LIP_DIR ckpt = wav2lip_dir / "checkpoints" / "wav2lip_gan.pth" if not wav2lip_dir.exists(): raise RuntimeError("Wav2Lip directory not found") if not ckpt.exists(): raise RuntimeError("Wav2Lip checkpoint not found") cmd = [ "python", "inference.py", "--checkpoint_path", str(ckpt), "--face", video_path, "--audio", audio_path, "--outfile", out_mp4, "--nosmooth" ] r = subprocess.run(cmd, cwd=str(wav2lip_dir), capture_output=True, text=True) if r.returncode != 0: msg = (r.stderr or r.stdout or "").strip() raise RuntimeError(f"Wav2Lip failed: {msg}") if not os.path.exists(out_mp4): raise RuntimeError("Wav2Lip did not produce output") return out_mp4 # ----------------------------------------------------------------------------- # WHISPER TRANSCRIPTION # ----------------------------------------------------------------------------- def transcribe_audio_en(audio_path: str) -> str: model = load_whisper() if whisper_kind == "faster": segments, _info = model.transcribe(audio_path, language="en", beam_size=5) parts = [] for seg in segments: t = getattr(seg, "text", "").strip() if t: parts.append(t) text = " ".join(parts).strip() logger.info("✅ Transcribed with Faster-Whisper") else: result = model.transcribe(audio_path, language="en") text = (result.get("text") or "").strip() logger.info("✅ Transcribed with OpenAI Whisper") if not text: raise RuntimeError("Transcription returned empty text") logger.info(f"Transcription preview: {text[:120]}...") return text # ----------------------------------------------------------------------------- # NLLB TRANSLATION # ----------------------------------------------------------------------------- def chunk_for_nllb(text: str, tokenizer, max_tokens: int = NLLB_MAX_TOKENS) -> List[str]: """Token-aware chunking: avoids truncation.""" text = " ".join(text.split()).strip() if not text: return [] parts = re.split(r"(?<=[.!?])\s+", text) chunks, cur, cur_tokens = [], [], 0 for p in parts: p = p.strip() if not p: continue tok_len = len(tokenizer.encode(p, add_special_tokens=False)) if cur and (cur_tokens + tok_len) > max_tokens: chunks.append(" ".join(cur)) cur, cur_tokens = [p], tok_len else: cur.append(p) cur_tokens += tok_len if cur: chunks.append(" ".join(cur)) return chunks def translate_text_nllb(text: str, target_lang: str) -> str: model, tokenizer = load_nllb() lang_code = LANGUAGE_MAP[target_lang]["nllb"] chunks = chunk_for_nllb(text, tokenizer, max_tokens=NLLB_MAX_TOKENS) if not chunks: raise RuntimeError("Nothing to translate after chunking") translated_chunks = [] for i, chunk in enumerate(chunks, 1): inputs = tokenizer(chunk, return_tensors="pt", padding=True) if torch.cuda.is_available(): inputs = {k: v.to("cuda") for k, v in inputs.items()} out = model.generate( **inputs, forced_bos_token_id=tokenizer.lang_code_to_id[lang_code], max_length=NLLB_MAX_NEW_TOKENS ) translated = tokenizer.batch_decode(out, skip_special_tokens=True)[0].strip() translated_chunks.append(translated) if i % 5 == 0: logger.info(f"Translated {i}/{len(chunks)} chunks...") result = " ".join(translated_chunks).strip() if not result: raise RuntimeError("Translation returned empty") logger.info(f"Translation preview: {result[:120]}...") return result # ----------------------------------------------------------------------------- # MMS-TTS # ----------------------------------------------------------------------------- def chunk_text_for_tts(text: str, max_chars: int = MMS_TTS_MAX_CHARS) -> List[str]: text = " ".join(text.split()).strip() if not text: return [] if len(text) <= max_chars: return [text] parts = re.split(r"(?<=[.!?])\s+", text) chunks, buf = [], "" for p in parts: p = p.strip() if not p: continue if len(buf) + len(p) + 1 <= max_chars: buf = (buf + " " + p).strip() else: if buf: chunks.append(buf) buf = p if buf: chunks.append(buf) return chunks def generate_mms_speech(text: str, lang_name: str, out_wav: str) -> str: model, tokenizer = load_mms_tts(lang_name) chunks = chunk_text_for_tts(text, max_chars=MMS_TTS_MAX_CHARS) if not chunks: raise RuntimeError("Nothing to synthesize (empty translated text?)") tmp_dir = Path(tempfile.mkdtemp(prefix="mms_")) chunk_files: List[Path] = [] logger.info(f"🎤 MMS-TTS: generating {lang_name} audio in {len(chunks)} chunks...") for i, chunk in enumerate(chunks): inputs = tokenizer(chunk, return_tensors="pt") if torch.cuda.is_available(): inputs = {k: v.to("cuda") for k, v in inputs.items()} with torch.no_grad(): out = model(**inputs) waveform = out.waveform.squeeze(0).float().cpu().numpy() waveform = np.nan_to_num(waveform, nan=0.0, posinf=0.0, neginf=0.0) # Convert to int16 PCM audio_i16 = np.int16(np.clip(waveform, -1.0, 1.0) * 32767) cf = tmp_dir / f"chunk_{i:03d}.wav" wavfile.write(str(cf), rate=int(model.config.sampling_rate), data=audio_i16) if not cf.exists() or cf.stat().st_size < 100: raise RuntimeError(f"MMS-TTS produced invalid chunk {i}") chunk_files.append(cf) if (i + 1) % 3 == 0: logger.info(f" generated {i+1}/{len(chunks)} chunks") if len(chunk_files) == 1: shutil.copy(str(chunk_files[0]), out_wav) else: list_file = tmp_dir / "concat.txt" with open(list_file, "w", encoding="utf-8") as f: for cf in chunk_files: f.write(f"file '{cf.as_posix()}'\n") cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(list_file), "-c", "copy", out_wav, "-loglevel", "error" ] run_cmd(cmd, err_prefix="MMS-TTS audio concatenation failed") if not os.path.exists(out_wav) or os.path.getsize(out_wav) < 1000: raise RuntimeError("Final MMS-TTS audio missing/too small") logger.info(f"✅ MMS-TTS complete: {out_wav}") return out_wav # ----------------------------------------------------------------------------- # GRADIO INPUT PATH UTIL # ----------------------------------------------------------------------------- def _path_from_gradio(obj) -> str: """Gradio may pass a path (str) or a file object with .name.""" if obj is None: return "" if isinstance(obj, str): return obj return getattr(obj, "name", "") or "" # ----------------------------------------------------------------------------- # MAIN PIPELINE # ----------------------------------------------------------------------------- def process_video( video_input, voice_sample, # optional in this build target_language: str, use_lipsync: bool, progress=gr.Progress() ) -> Tuple[Optional[str], str, str, str]: """ Pipeline: 1) Extract + transcribe English audio 2) Translate to target language (NLLB) 3) Generate target-language speech (MMS-TTS) 4) Align duration to video 5) Lip-sync with Wav2Lip (if available & enabled), else audio-only replace """ check_system_deps() temp_dir = Path(tempfile.mkdtemp(prefix="indic_app_")) try: progress(0.02, desc="📥 Validating inputs...") video_path = _path_from_gradio(video_input) _voice_path = _path_from_gradio(voice_sample) # kept for future voice cloning; unused now if not video_path or not os.path.exists(video_path): return None, "", "", "❌ Error: Video file not found" if target_language not in LANGUAGE_MAP: return None, "", "", "❌ Error: Unsupported language selection" wav2lip_ok, wav2lip_msg = is_wav2lip_available() logger.info(wav2lip_msg) progress(0.08, desc="🎵 Extracting video audio...") extracted = temp_dir / "extracted.wav" extract_audio_16k_mono(video_path, str(extracted)) progress(0.12, desc="⏱️ Reading video duration...") vid_dur = get_video_duration(video_path) logger.info(f"Video duration: {vid_dur:.2f}s") progress(0.22, desc="🎤 Transcribing (Whisper)...") original_text = transcribe_audio_en(str(extracted)) progress(0.40, desc=f"🌍 Translating to {target_language} (NLLB)...") translated_text = translate_text_nllb(original_text, target_language) progress(0.60, desc=f"🎙️ Generating speech ({target_language}, MMS-TTS)...") tts_out = temp_dir / "tts.wav" generate_mms_speech(translated_text, target_language, str(tts_out)) progress(0.75, desc="🧭 Aligning audio to video duration...") aligned = temp_dir / "aligned.wav" align_audio_duration(str(tts_out), vid_dur, str(aligned)) progress(0.88, desc="🎬 Creating output video...") out_video = temp_dir / "output.mp4" method = "Audio-only" if use_lipsync and wav2lip_ok: try: run_wav2lip(video_path, str(aligned), str(out_video)) method = "Wav2Lip (lip-synced)" except Exception as e: logger.warning(f"Wav2Lip failed, falling back to audio-only. Reason: {e}") replace_audio_only(video_path, str(aligned), str(out_video)) method = "Audio-only (Wav2Lip failed)" else: replace_audio_only(video_path, str(aligned), str(out_video)) if use_lipsync and not wav2lip_ok: method = "Audio-only (Wav2Lip missing)" if not out_video.exists() or out_video.stat().st_size < 2000: return None, "", "", "❌ Error: Output video not created" progress(1.0, desc="✅ Done!") # Cleanup GPU memory if torch.cuda.is_available(): torch.cuda.empty_cache() gc.collect() status = f"✅ Success — {target_language} dub created. Method: **{method}**" if not wav2lip_ok and use_lipsync: status += "\n\n⚠️ Wav2Lip not found in this Space, so lip-sync was skipped." return ( str(out_video), f"**Original (English):**\n{original_text}", f"**Translated ({target_language}):**\n{translated_text}", status, ) except Exception as e: logger.error(f"Pipeline error: {e}", exc_info=True) return None, "", "", f"❌ Error: {str(e)}" finally: # Keep temp_dir so Gradio can serve the output path. pass # ----------------------------------------------------------------------------- # GRADIO UI # ----------------------------------------------------------------------------- def create_interface(): wav2lip_ok, wav2lip_msg = is_wav2lip_available() with gr.Blocks(title="Indic Video Translator + Dub", theme=gr.themes.Soft()) as demo: gr.Markdown( f""" # 🎬 Indic Video Translator + Dub (MMS-TTS) **English video → Transcribe → Translate → Dub in target Indian language** ### Supported languages {", ".join(LANGUAGE_MAP.keys())} ### Lip-sync status - {wav2lip_msg} > **Note:** This build focuses on **correct pronunciation** (MMS-TTS). > True “same-voice cloning” needs an extra **voice conversion** stage (OpenVoice/RVC). """ ) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📹 Step 1: Video Input (English)") with gr.Tabs(): with gr.Tab("Upload Video"): video_upload = gr.Video(label="Upload Video", sources=["upload"]) with gr.Tab("Record Video"): video_record = gr.Video(label="Record Video", sources=["webcam"]) gr.Markdown("### 🎤 Optional: Voice Sample (not used in this build)") gr.Markdown("*Kept for future voice cloning. You can ignore this for now.*") with gr.Tabs(): with gr.Tab("Upload Audio"): audio_upload = gr.Audio(label="Upload Voice Sample", type="filepath", sources=["upload"]) with gr.Tab("Record Audio"): audio_record = gr.Audio(label="Record Voice Sample", type="filepath", sources=["microphone"]) gr.Markdown("### 🌍 Step 2: Target Language") language_dropdown = gr.Dropdown( choices=list(LANGUAGE_MAP.keys()), value="Tamil", label="Select Target Language", ) use_lipsync = gr.Checkbox( value=True, label="Try lip-sync (Wav2Lip if available)", ) process_btn = gr.Button("🚀 Start", variant="primary", size="lg") with gr.Column(scale=1): gr.Markdown("### 📊 Output") status_box = gr.Markdown("ℹ️ Ready.") output_video = gr.Video(label="Processed Video") with gr.Accordion("📝 Transcript + Translation", open=False): transcription_box = gr.Markdown() translation_box = gr.Markdown() def pick_video(vu, vr): return vu if vu is not None else vr def pick_audio(au, ar): return au if au is not None else ar process_btn.click( fn=lambda vu, vr, au, ar, lang, lipsync: process_video( pick_video(vu, vr), pick_audio(au, ar), lang, lipsync, ), inputs=[video_upload, video_record, audio_upload, audio_record, language_dropdown, use_lipsync], outputs=[output_video, transcription_box, translation_box, status_box], ) gr.Markdown( """ --- ### Troubleshooting (HF Spaces) - If you see **ffmpeg not found**, use a **Docker Space** and install ffmpeg. - If you want **lip-sync**, include `Wav2Lip/` + `checkpoints/wav2lip_gan.pth` in the repo. - For real **voice cloning** (same voice as your sample), add a voice-conversion stage (OpenVoice/RVC). """ ) return demo if __name__ == "__main__": demo = create_interface() demo.queue(max_size=MAX_QUEUE) demo.launch(server_name="0.0.0.0", server_port=7860, share=False)