| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import os |
| | import gc |
| | import re |
| | import shutil |
| | import logging |
| | import tempfile |
| | import subprocess |
| | from pathlib import Path |
| | from typing import Optional, Tuple, List, Dict |
| |
|
| | import gradio as gr |
| | import torch |
| | import librosa |
| | import numpy as np |
| | import scipy.io.wavfile as wavfile |
| | from transformers import VitsModel, AutoTokenizer, AutoModelForSeq2SeqLM |
| | from pathlib import Path |
| | import os |
| | import logging |
| |
|
# Absolute path of the Wav2Lip checkout; "Wav2Lip" is resolved against the
# process working directory at import time.
WAV2LIP_DIR = Path("Wav2Lip").resolve()

# Single root logging configuration (timestamp + level) used by the whole app.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("app")
| |
|
# Startup diagnostics (run once at import): log where the Wav2Lip checkpoint is
# expected and what is actually on disk, to make deployment problems visible
# in the application log.
ckpt_dir = WAV2LIP_DIR / "checkpoints"
logger.info(f"WAV2LIP_DIR: {WAV2LIP_DIR}")
logger.info(f"Exists WAV2LIP_DIR? {WAV2LIP_DIR.exists()}")

logger.info(f"Checkpoint dir: {ckpt_dir}")
logger.info(f"Exists checkpoints? {ckpt_dir.exists()}")

# is_dir() rather than exists(): iterdir() raises if the path is a plain file.
if ckpt_dir.is_dir():
    logger.info("checkpoints/ contents:")
    for p in sorted(ckpt_dir.iterdir()):
        size = p.stat().st_size if p.exists() else -1
        logger.info(f" - {p.name} ({size} bytes)")
else:
    logger.info("No checkpoints/ directory found inside Wav2Lip")

expected = ckpt_dir / "wav2lip_gan.pth"
logger.info(f"Expected checkpoint path: {expected}")
logger.info(f"Exists expected checkpoint? {expected.exists()}")
if expected.exists():
    logger.info(f"Expected checkpoint size: {expected.stat().st_size} bytes")
    # Peek at the first 200 characters decoded as text and log them.
    # The file is opened in a `with` block so the handle is always closed.
    try:
        with expected.open("r", encoding="utf-8", errors="ignore") as fh:
            head = fh.read(200)
        logger.info(f"Checkpoint head(200): {head.replace(chr(10),' ')}")
    except Exception as e:
        logger.info(f"Could not read checkpoint as text (good if binary). err={e}")
| |
|
| | |
| | |
| | |
# Lazily-populated model caches. Everything starts empty and is filled in by
# the load_* functions the first time a model is needed.

# Whisper backend instance and which implementation it is ("faster"/"openai").
whisper_model = whisper_kind = None

# NLLB translation model and its tokenizer.
nllb_model = nllb_tokenizer = None

# MMS-TTS cache: model id -> (model, tokenizer).
mms_models: Dict[str, tuple] = dict()
| |
|
| | |
| | |
| | |
# Supported target languages: display name -> NLLB language code and the
# MMS-TTS model id used for speech synthesis. Insertion order is the order
# shown in the UI.
LANGUAGE_MAP = {
    display_name: {"nllb": nllb_code, "mms": mms_model_id}
    for display_name, nllb_code, mms_model_id in (
        ("Hindi", "hin_Deva", "facebook/mms-tts-hin"),
        ("Tamil", "tam_Taml", "facebook/mms-tts-tam"),
        ("Telugu", "tel_Telu", "facebook/mms-tts-tel"),
        ("Malayalam", "mal_Mlym", "facebook/mms-tts-mal"),
        ("Kannada", "kan_Knda", "facebook/mms-tts-kan"),
        ("Marathi", "mar_Deva", "facebook/mms-tts-mar"),
        ("Bengali", "ben_Beng", "facebook/mms-tts-ben"),
        ("Gujarati", "guj_Gujr", "facebook/mms-tts-guj"),
        ("Punjabi", "pan_Guru", "facebook/mms-tts-pan"),
    )
}
| |
|
| | |
| | |
| | |
# Runtime configuration, overridable through environment variables.
WHISPER_SIZE = os.environ.get("WHISPER_SIZE", "base")
NLLB_MODEL_ID = os.environ.get("NLLB_MODEL_ID", "facebook/nllb-200-distilled-600M")
MAX_QUEUE = int(os.environ.get("GRADIO_MAX_QUEUE", "5"))

# Fixed chunking / generation limits used by the translation and TTS stages.
NLLB_MAX_TOKENS = 450        # token budget per translation chunk
NLLB_MAX_NEW_TOKENS = 512    # length limit passed to model.generate
MMS_TTS_MAX_CHARS = 500      # character budget per TTS chunk
| |
|
| | |
| | |
| | |
| | def _which(bin_name: str) -> Optional[str]: |
| | return shutil.which(bin_name) |
| |
|
| |
|
def check_system_deps() -> None:
    """Raise ``RuntimeError`` naming whichever of ffmpeg/ffprobe is not on PATH."""
    missing = []
    for tool in ("ffmpeg", "ffprobe"):
        if _which(tool) is None:
            missing.append(tool)

    if not missing:
        return

    raise RuntimeError(
        f"Missing system binaries: {', '.join(missing)}. "
        f"Install ffmpeg in your Space (recommended: Docker + apt-get install ffmpeg)."
    )
| |
|
| |
|
def run_cmd(cmd: List[str], cwd: Optional[str] = None, err_prefix: str = "Command failed"):
    """Run ``cmd`` and return its ``CompletedProcess``.

    Output is captured as text. On a non-zero exit status a ``RuntimeError``
    is raised whose message is ``err_prefix`` followed by the stripped stderr
    (or stdout when stderr is empty).
    """
    completed = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    if completed.returncode == 0:
        return completed

    detail = (completed.stderr or completed.stdout or "").strip()
    raise RuntimeError(f"{err_prefix}: {detail}")
| |
|
| |
|
def safe_unlink(path: Path) -> None:
    """Delete ``path`` if it exists; never raises (best-effort cleanup)."""
    try:
        if not path.exists():
            return
        path.unlink()
    except Exception:
        # Cleanup must not mask the caller's real result or error.
        return
| |
|
| |
|
def is_wav2lip_available() -> Tuple[bool, str]:
    """Report whether lip-sync can run.

    Returns ``(available, message)``. Available means both the Wav2Lip
    directory and ``checkpoints/wav2lip_gan.pth`` inside it exist.
    """
    root = WAV2LIP_DIR
    weights = root / "checkpoints" / "wav2lip_gan.pth"

    if not (root.exists() and weights.exists()):
        return False, f"Wav2Lip missing. Checked: {root} and {weights}"
    return True, f"Wav2Lip detected at {root} (lip-sync enabled)"
| |
|
| |
|
| | |
| | |
| | |
def load_whisper():
    """Return the cached Whisper model, loading it on first use.

    Tries faster-whisper first and falls back to openai-whisper. The chosen
    backend is recorded in the module-level ``whisper_kind`` ("faster" or
    "openai") so callers know which transcribe API to use.

    Raises:
        RuntimeError: if neither backend can be imported/loaded.
    """
    global whisper_model, whisper_kind

    if whisper_model is not None:
        return whisper_model

    # First choice: faster-whisper.
    try:
        from faster_whisper import WhisperModel

        on_gpu = torch.cuda.is_available()
        device = "cuda" if on_gpu else "cpu"
        compute_type = "float16" if on_gpu else "int8"
        logger.info(f"Loading Faster-Whisper '{WHISPER_SIZE}' on {device} (compute={compute_type})...")
        whisper_model = WhisperModel(WHISPER_SIZE, device=device, compute_type=compute_type)
        whisper_kind = "faster"
        logger.info("✅ Faster-Whisper loaded")
        return whisper_model
    except Exception as e:
        logger.warning(f"Faster-Whisper unavailable, falling back to openai-whisper. Reason: {e}")

    # Fallback: openai-whisper.
    try:
        import whisper

        logger.info(f"Loading OpenAI Whisper '{WHISPER_SIZE}'...")
        whisper_model = whisper.load_model(WHISPER_SIZE)
        whisper_kind = "openai"
        logger.info("✅ OpenAI Whisper loaded")
        return whisper_model
    except Exception as e:
        # Chain the cause so the original import/load traceback is preserved.
        raise RuntimeError(
            f"No Whisper backend available. Install faster-whisper or openai-whisper. Error: {e}"
        ) from e
| |
|
| |
|
def load_nllb():
    """Return the cached ``(model, tokenizer)`` pair for NLLB, loading on first use.

    The model is moved to CUDA when available. The module-level cache is only
    updated after both objects have loaded, so a failed load leaves it empty.
    """
    global nllb_model, nllb_tokenizer

    if nllb_model is not None and nllb_tokenizer is not None:
        return nllb_model, nllb_tokenizer

    logger.info(f"Loading NLLB: {NLLB_MODEL_ID} ...")
    tokenizer = AutoTokenizer.from_pretrained(NLLB_MODEL_ID)
    model = AutoModelForSeq2SeqLM.from_pretrained(NLLB_MODEL_ID)
    if torch.cuda.is_available():
        model = model.to("cuda")

    # Publish both halves together.
    nllb_model, nllb_tokenizer = model, tokenizer
    logger.info("✅ NLLB loaded")
    return nllb_model, nllb_tokenizer
| |
|
| |
|
def load_mms_tts(lang_name: str):
    """Return the cached ``(model, tokenizer)`` for ``lang_name``'s MMS-TTS voice.

    ``lang_name`` must be a key of ``LANGUAGE_MAP`` (a ``KeyError`` is raised
    otherwise). Models are cached per model id and moved to CUDA when available.
    """
    model_id = LANGUAGE_MAP[lang_name]["mms"]

    cached = mms_models.get(model_id)
    if cached is not None:
        return cached

    logger.info(f"Loading MMS-TTS for {lang_name}: {model_id} ...")
    model = VitsModel.from_pretrained(model_id)
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    if torch.cuda.is_available():
        model = model.to("cuda")

    mms_models[model_id] = (model, tokenizer)
    logger.info(f"✅ MMS-TTS loaded for {lang_name}")
    return model, tokenizer
| |
|
| | |
| | |
| | |
def extract_audio_16k_mono(video_path: str, out_wav: str) -> str:
    """Write the audio track of ``video_path`` to ``out_wav`` as 16 kHz mono.

    Returns ``out_wav``. Raises ``RuntimeError`` (via ``run_cmd``) if ffmpeg fails.
    """
    ffmpeg_args = ["ffmpeg", "-y", "-i", video_path]
    ffmpeg_args += ["-vn", "-ac", "1", "-ar", "16000"]  # no video, 1 channel, 16 kHz
    ffmpeg_args += [out_wav, "-loglevel", "error"]

    run_cmd(ffmpeg_args, err_prefix="FFmpeg audio extraction failed")
    return out_wav
| |
|
| |
|
def get_video_duration(video_path: str) -> float:
    """Return the container duration of ``video_path`` in seconds (via ffprobe).

    Raises:
        RuntimeError: if ffprobe fails, prints something that is not a number
            (for example ``N/A``), or reports a duration that is not a finite
            value above 0.05 s.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    r = run_cmd(cmd, err_prefix="FFprobe duration failed")

    raw = r.stdout.strip()
    try:
        dur = float(raw)
    except ValueError as exc:
        # Surface a pipeline-style error instead of a bare float() failure.
        raise RuntimeError(f"FFprobe returned a non-numeric duration: {raw!r}") from exc

    # The chained comparison also rejects nan and inf, which float() accepts.
    if not (0.05 < dur < float("inf")):
        raise RuntimeError("Video duration too short/invalid")
    return dur
| |
|
| |
|
def align_audio_duration(audio_path: str, target_duration: float, out_wav: str) -> str:
    """Time-stretch ``audio_path`` to ``target_duration`` seconds and write ``out_wav``.

    The audio is first tempo-adjusted with ffmpeg's ``atempo`` filter; if the
    result is still more than 0.2 s off, it is padded with silence or trimmed
    to the target length.

    Args:
        audio_path: Input audio file.
        target_duration: Desired length in seconds; must be positive.
        out_wav: Output path (overwritten).

    Returns:
        ``out_wav``.

    Raises:
        RuntimeError: on a non-positive target, a near-empty input, or any
            ffmpeg failure.
    """
    if target_duration <= 0:
        raise RuntimeError("Target audio duration must be positive")

    cur = librosa.get_duration(filename=audio_path)
    if cur <= 0.05:
        raise RuntimeError("Generated audio duration too short (TTS likely failed).")

    ratio = target_duration / cur
    logger.info(f"Audio align: {cur:.2f}s -> {target_duration:.2f}s (ratio={ratio:.3f})")

    # atempo is a *speed* factor: output_duration = input_duration / tempo.
    # To reach target_duration the factor is cur / target, i.e. the inverse of
    # the duration ratio logged above.
    tempo = cur / target_duration

    # Each atempo stage is kept within [0.5, 2.0]; larger changes are chained.
    filters = []
    while tempo > 2.0:
        filters.append("atempo=2.0")
        tempo /= 2.0
    while tempo < 0.5:
        filters.append("atempo=0.5")
        tempo /= 0.5
    filters.append(f"atempo={tempo:.6f}")

    cmd = [
        "ffmpeg", "-y",
        "-i", audio_path,
        "-filter:a", ",".join(filters),
        out_wav,
        "-loglevel", "error",
    ]
    run_cmd(cmd, err_prefix="Audio time-stretch failed")

    # Final correction: pad with silence or trim if the stretch left a gap.
    new_dur = librosa.get_duration(filename=out_wav)
    if abs(new_dur - target_duration) > 0.2:
        tmp = out_wav + ".tmp.wav"
        shutil.move(out_wav, tmp)
        try:
            if new_dur < target_duration:
                pad_dur = target_duration - new_dur
                cmd2 = ["ffmpeg", "-y", "-i", tmp, "-af", f"apad=pad_dur={pad_dur}", out_wav, "-loglevel", "error"]
            else:
                cmd2 = ["ffmpeg", "-y", "-i", tmp, "-t", str(target_duration), out_wav, "-loglevel", "error"]
            run_cmd(cmd2, err_prefix="Audio final trim/pad failed")
        finally:
            # Remove the intermediate file even when ffmpeg fails.
            safe_unlink(Path(tmp))

    return out_wav
| |
|
| |
|
def replace_audio_only(video_path: str, audio_path: str, out_mp4: str) -> str:
    """Mux ``audio_path`` onto the video stream of ``video_path`` (no lip-sync).

    The video stream is copied unchanged, the audio is encoded as AAC at
    192 kbit/s, and the output stops at the shorter of the two inputs.
    Returns ``out_mp4``.
    """
    inputs = ["-i", video_path, "-i", audio_path]
    codecs = ["-c:v", "copy", "-c:a", "aac", "-b:a", "192k"]
    mapping = ["-map", "0:v:0", "-map", "1:a:0"]  # video from input 0, audio from input 1

    ffmpeg_args = (
        ["ffmpeg", "-y"]
        + inputs
        + codecs
        + mapping
        + ["-shortest", out_mp4, "-loglevel", "error"]
    )
    run_cmd(ffmpeg_args, err_prefix="Audio replacement failed")
    return out_mp4
| |
|
| |
|
def run_wav2lip(video_path: str, audio_path: str, out_mp4: str) -> str:
    """Run Wav2Lip's ``inference.py`` to lip-sync ``video_path`` to ``audio_path``.

    Returns ``out_mp4`` on success.

    Raises:
        RuntimeError: if the Wav2Lip directory or checkpoint is missing, the
            inference script exits non-zero, or no output file is produced.
    """
    import sys  # local import: only needed to locate the current interpreter

    wav2lip_dir = WAV2LIP_DIR
    ckpt = wav2lip_dir / "checkpoints" / "wav2lip_gan.pth"
    if not wav2lip_dir.exists():
        raise RuntimeError("Wav2Lip directory not found")
    if not ckpt.exists():
        raise RuntimeError("Wav2Lip checkpoint not found")

    cmd = [
        # Use the interpreter running this app so the child sees the same
        # installed packages; a bare "python" may resolve to a different one.
        sys.executable or "python", "inference.py",
        "--checkpoint_path", str(ckpt),
        # The child runs with cwd=wav2lip_dir, so relative paths must be made
        # absolute against the *current* working directory first.
        "--face", os.path.abspath(video_path),
        "--audio", os.path.abspath(audio_path),
        "--outfile", os.path.abspath(out_mp4),
        "--nosmooth",
    ]
    run_cmd(cmd, cwd=str(wav2lip_dir), err_prefix="Wav2Lip failed")

    if not os.path.exists(out_mp4):
        raise RuntimeError("Wav2Lip did not produce output")
    return out_mp4
| |
|
| | |
| | |
| | |
def transcribe_audio_en(audio_path: str) -> str:
    """Transcribe ``audio_path`` as English with whichever Whisper backend is loaded.

    Returns the transcript as a single stripped string.

    Raises:
        RuntimeError: if the transcript is empty.
    """
    model = load_whisper()

    if whisper_kind == "faster":
        # faster-whisper yields segment objects; collect their non-empty text.
        segments, _info = model.transcribe(audio_path, language="en", beam_size=5)
        parts = []
        for seg in segments:
            piece = (getattr(seg, "text", "") or "").strip()
            if piece:
                parts.append(piece)
        text = " ".join(parts).strip()
        logger.info("✅ Transcribed with Faster-Whisper")
    else:
        # openai-whisper returns a dict with the full text under "text".
        result = model.transcribe(audio_path, language="en")
        text = (result.get("text") or "").strip()
        logger.info("✅ Transcribed with OpenAI Whisper")

    if not text:
        raise RuntimeError("Transcription returned empty text")
    logger.info(f"Transcription preview: {text[:120]}...")
    return text
| |
|
| | |
| | |
| | |
def chunk_for_nllb(text: str, tokenizer, max_tokens: int = NLLB_MAX_TOKENS) -> List[str]:
    """Split ``text`` into chunks of at most ``max_tokens`` tokens each.

    Whitespace is normalised, the text is split after ``.``, ``!`` or ``?``,
    and sentences are greedily packed into chunks. A sentence that alone
    exceeds the budget (e.g. an unpunctuated transcript) is further split on
    word boundaries instead of being emitted as one oversized chunk.

    Token counts are measured per piece with
    ``tokenizer.encode(piece, add_special_tokens=False)`` and summed, so the
    count of a joined chunk is an estimate. A single word longer than the
    budget is kept whole.

    Returns an empty list for empty/whitespace-only input.
    """
    normalized = " ".join(text.split())
    if not normalized:
        return []

    def count_tokens(piece: str) -> int:
        return len(tokenizer.encode(piece, add_special_tokens=False))

    def split_on_words(sentence: str) -> List[str]:
        # Break one over-budget sentence into word groups that fit the budget.
        groups, words, used = [], [], 0
        for word in sentence.split(" "):
            cost = count_tokens(word)
            if words and used + cost > max_tokens:
                groups.append(" ".join(words))
                words, used = [], 0
            words.append(word)
            used += cost
        if words:
            groups.append(" ".join(words))
        return groups

    # Stage 1: sentence pieces, each paired with its token count.
    units = []
    for sentence in re.split(r"(?<=[.!?])\s+", normalized):
        sentence = sentence.strip()
        if not sentence:
            continue
        length = count_tokens(sentence)
        if length > max_tokens:
            units.extend((group, count_tokens(group)) for group in split_on_words(sentence))
        else:
            units.append((sentence, length))

    # Stage 2: greedy packing of pieces into chunks.
    chunks, current, current_tokens = [], [], 0
    for piece, length in units:
        if current and current_tokens + length > max_tokens:
            chunks.append(" ".join(current))
            current, current_tokens = [], 0
        current.append(piece)
        current_tokens += length
    if current:
        chunks.append(" ".join(current))
    return chunks
| |
|
| |
|
def translate_text_nllb(text: str, target_lang: str) -> str:
    """Translate ``text`` into ``target_lang`` with NLLB, chunk by chunk.

    Args:
        text: Source text.
        target_lang: A key of ``LANGUAGE_MAP``.

    Returns:
        The translated chunks joined with single spaces.

    Raises:
        RuntimeError: if there is nothing to translate, the target language
            code is unknown to the tokenizer, or the result is empty.
    """
    model, tokenizer = load_nllb()
    lang_code = LANGUAGE_MAP[target_lang]["nllb"]

    chunks = chunk_for_nllb(text, tokenizer, max_tokens=NLLB_MAX_TOKENS)
    if not chunks:
        raise RuntimeError("Nothing to translate after chunking")

    # `tokenizer.lang_code_to_id` has been removed from recent transformers
    # releases; looking the language code up as a token works on old and new.
    target_token_id = tokenizer.convert_tokens_to_ids(lang_code)
    unk_id = getattr(tokenizer, "unk_token_id", None)
    if target_token_id is None or target_token_id == unk_id:
        raise RuntimeError(f"NLLB tokenizer does not know language code '{lang_code}'")

    use_cuda = torch.cuda.is_available()
    translated_chunks = []
    for i, chunk in enumerate(chunks, 1):
        inputs = tokenizer(chunk, return_tensors="pt", padding=True)
        if use_cuda:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}

        out = model.generate(
            **inputs,
            forced_bos_token_id=target_token_id,  # makes the decoder start in the target language
            max_length=NLLB_MAX_NEW_TOKENS,
        )
        translated = tokenizer.batch_decode(out, skip_special_tokens=True)[0].strip()
        translated_chunks.append(translated)
        if i % 5 == 0:
            logger.info(f"Translated {i}/{len(chunks)} chunks...")

    result = " ".join(translated_chunks).strip()
    if not result:
        raise RuntimeError("Translation returned empty")
    logger.info(f"Translation preview: {result[:120]}...")
    return result
| |
|
| | |
| | |
| | |
def chunk_text_for_tts(text: str, max_chars: int = MMS_TTS_MAX_CHARS) -> List[str]:
    """Split ``text`` into chunks of at most ``max_chars`` characters for TTS.

    Whitespace is normalised first. Text that already fits is returned as a
    single chunk. Otherwise it is split after sentence-ending punctuation —
    ``.``, ``!``, ``?`` and the danda/double danda (``।``/``॥``) used by several
    of the supported Indic scripts — and sentences are greedily packed into
    chunks. A sentence longer than ``max_chars`` is split on word boundaries;
    a single word longer than the limit is kept whole.

    Returns an empty list for empty/whitespace-only input.
    """
    normalized = " ".join(text.split())
    if not normalized:
        return []
    if len(normalized) <= max_chars:
        return [normalized]

    # Stage 1: sentence pieces, none longer than max_chars where avoidable.
    pieces = []
    for sentence in re.split(r"(?<=[.!?।॥])\s+", normalized):
        sentence = sentence.strip()
        if not sentence:
            continue
        if len(sentence) <= max_chars:
            pieces.append(sentence)
            continue
        # Over-long sentence: fall back to word boundaries.
        line = ""
        for word in sentence.split(" "):
            candidate = f"{line} {word}" if line else word
            if line and len(candidate) > max_chars:
                pieces.append(line)
                line = word
            else:
                line = candidate
        if line:
            pieces.append(line)

    # Stage 2: greedy packing (the +1 accounts for the joining space).
    chunks, buf = [], ""
    for piece in pieces:
        if not buf:
            buf = piece
        elif len(buf) + 1 + len(piece) <= max_chars:
            buf = f"{buf} {piece}"
        else:
            chunks.append(buf)
            buf = piece
    if buf:
        chunks.append(buf)
    return chunks
| |
|
| |
|
def generate_mms_speech(text: str, lang_name: str, out_wav: str) -> str:
    """Synthesize ``text`` with the MMS-TTS voice for ``lang_name`` into ``out_wav``.

    The text is chunked, each chunk is synthesized to a 16-bit PCM WAV in a
    scratch directory, and the chunks are concatenated with ffmpeg (or copied
    directly when there is only one). The scratch directory is always removed.

    Returns ``out_wav``.

    Raises:
        RuntimeError: if there is nothing to synthesize, a chunk file is
            missing/too small, concatenation fails, or the final file is
            missing or under 1000 bytes.
    """
    model, tokenizer = load_mms_tts(lang_name)

    chunks = chunk_text_for_tts(text, max_chars=MMS_TTS_MAX_CHARS)
    if not chunks:
        raise RuntimeError("Nothing to synthesize (empty translated text?)")

    tmp_dir = Path(tempfile.mkdtemp(prefix="mms_"))
    try:
        chunk_files: List[Path] = []
        use_cuda = torch.cuda.is_available()
        sample_rate = int(model.config.sampling_rate)

        logger.info(f"🎤 MMS-TTS: generating {lang_name} audio in {len(chunks)} chunks...")
        for i, chunk in enumerate(chunks):
            inputs = tokenizer(chunk, return_tensors="pt")
            if use_cuda:
                inputs = {k: v.to("cuda") for k, v in inputs.items()}

            with torch.no_grad():
                out = model(**inputs)

            waveform = out.waveform.squeeze(0).float().cpu().numpy()
            # Replace NaN/inf samples with silence before quantising.
            waveform = np.nan_to_num(waveform, nan=0.0, posinf=0.0, neginf=0.0)

            # Clip to [-1, 1] and convert to 16-bit PCM.
            audio_i16 = np.int16(np.clip(waveform, -1.0, 1.0) * 32767)

            cf = tmp_dir / f"chunk_{i:03d}.wav"
            wavfile.write(str(cf), rate=sample_rate, data=audio_i16)

            if not cf.exists() or cf.stat().st_size < 100:
                raise RuntimeError(f"MMS-TTS produced invalid chunk {i}")
            chunk_files.append(cf)

            if (i + 1) % 3 == 0:
                logger.info(f" generated {i+1}/{len(chunks)} chunks")

        if len(chunk_files) == 1:
            shutil.copy(str(chunk_files[0]), out_wav)
        else:
            # ffmpeg concat demuxer: one "file '<path>'" line per chunk.
            list_file = tmp_dir / "concat.txt"
            with open(list_file, "w", encoding="utf-8") as f:
                for cf in chunk_files:
                    f.write(f"file '{cf.as_posix()}'\n")

            cmd = [
                "ffmpeg", "-y",
                "-f", "concat", "-safe", "0",
                "-i", str(list_file),
                "-c", "copy",
                out_wav,
                "-loglevel", "error",
            ]
            run_cmd(cmd, err_prefix="MMS-TTS audio concatenation failed")
    finally:
        # The per-chunk WAVs are only intermediates; never leave them behind.
        shutil.rmtree(tmp_dir, ignore_errors=True)

    if not os.path.exists(out_wav) or os.path.getsize(out_wav) < 1000:
        raise RuntimeError("Final MMS-TTS audio missing/too small")

    logger.info(f"✅ MMS-TTS complete: {out_wav}")
    return out_wav
| |
|
| | |
| | |
| | |
| | def _path_from_gradio(obj) -> str: |
| | """Gradio may pass a path (str) or a file object with .name.""" |
| | if obj is None: |
| | return "" |
| | if isinstance(obj, str): |
| | return obj |
| | return getattr(obj, "name", "") or "" |
| |
|
| | |
| | |
| | |
def process_video(
    video_input,
    voice_sample,
    target_language: str,
    use_lipsync: bool,
    progress=gr.Progress()
) -> Tuple[Optional[str], str, str, str]:
    """Run the full dubbing pipeline on one video.

    Pipeline:
      1) Extract + transcribe English audio
      2) Translate to target language (NLLB)
      3) Generate target-language speech (MMS-TTS)
      4) Align duration to video
      5) Lip-sync with Wav2Lip (if available & enabled), else audio-only replace

    Args:
        video_input: Path string or file-like object with ``.name``.
        voice_sample: Accepted for interface compatibility; not used here.
        target_language: A key of ``LANGUAGE_MAP``.
        use_lipsync: Attempt Wav2Lip when it is available.
        progress: Gradio progress reporter.

    Returns:
        ``(output_video_path_or_None, original_md, translated_md, status_md)``.
        Errors are never raised to the caller; they are reported through the
        status element with ``None`` as the video path.
    """
    temp_dir = Path(tempfile.mkdtemp(prefix="indic_app_"))
    extracted = temp_dir / "extracted.wav"
    tts_out = temp_dir / "tts.wav"
    aligned = temp_dir / "aligned.wav"
    out_video = temp_dir / "output.mp4"
    succeeded = False

    try:
        # Inside the try so a missing ffmpeg is reported like any other error.
        check_system_deps()

        progress(0.02, desc="📥 Validating inputs...")
        video_path = _path_from_gradio(video_input)

        if not video_path or not os.path.exists(video_path):
            return None, "", "", "❌ Error: Video file not found"

        if target_language not in LANGUAGE_MAP:
            return None, "", "", "❌ Error: Unsupported language selection"

        wav2lip_ok, wav2lip_msg = is_wav2lip_available()
        logger.info(wav2lip_msg)

        progress(0.08, desc="🎵 Extracting video audio...")
        extract_audio_16k_mono(video_path, str(extracted))

        progress(0.12, desc="⏱️ Reading video duration...")
        vid_dur = get_video_duration(video_path)
        logger.info(f"Video duration: {vid_dur:.2f}s")

        progress(0.22, desc="🎤 Transcribing (Whisper)...")
        original_text = transcribe_audio_en(str(extracted))

        progress(0.40, desc=f"🌐 Translating to {target_language} (NLLB)...")
        translated_text = translate_text_nllb(original_text, target_language)

        progress(0.60, desc=f"🎙️ Generating speech ({target_language}, MMS-TTS)...")
        generate_mms_speech(translated_text, target_language, str(tts_out))

        progress(0.75, desc="🔧 Aligning audio to video duration...")
        align_audio_duration(str(tts_out), vid_dur, str(aligned))

        progress(0.88, desc="🎬 Creating output video...")
        method = "Audio-only"

        if use_lipsync and wav2lip_ok:
            try:
                run_wav2lip(video_path, str(aligned), str(out_video))
                method = "Wav2Lip (lip-synced)"
            except Exception as e:
                # Lip-sync is optional: degrade to a plain audio swap.
                logger.warning(f"Wav2Lip failed, falling back to audio-only. Reason: {e}")
                replace_audio_only(video_path, str(aligned), str(out_video))
                method = "Audio-only (Wav2Lip failed)"
        else:
            replace_audio_only(video_path, str(aligned), str(out_video))
            if use_lipsync and not wav2lip_ok:
                method = "Audio-only (Wav2Lip missing)"

        if not out_video.exists() or out_video.stat().st_size < 2000:
            return None, "", "", "❌ Error: Output video not created"

        progress(1.0, desc="✅ Done!")

        # Release memory between requests.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        status = f"✅ Success — {target_language} dub created. Method: **{method}**"
        if not wav2lip_ok and use_lipsync:
            status += "\n\n⚠️ Wav2Lip not found in this Space, so lip-sync was skipped."

        succeeded = True
        return (
            str(out_video),
            f"**Original (English):**\n{original_text}",
            f"**Translated ({target_language}):**\n{translated_text}",
            status,
        )

    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)
        return None, "", "", f"❌ Error: {str(e)}"

    finally:
        if succeeded:
            # Keep output.mp4 (its path was returned); drop the intermediates.
            for leftover in (extracted, tts_out, aligned):
                safe_unlink(leftover)
        else:
            # Nothing in the directory is referenced by the return value.
            shutil.rmtree(temp_dir, ignore_errors=True)
| |
|
| | |
| | |
| | |
def create_interface():
    """Build and return the Gradio ``Blocks`` UI for the dubbing pipeline.

    Layout: a left column with video input (upload or webcam), an optional
    voice sample, the target-language dropdown, the lip-sync checkbox and the
    start button; a right column with status, the output video and a collapsed
    transcript/translation panel.
    """
    wav2lip_ok, wav2lip_msg = is_wav2lip_available()

    # Markdown is assembled from explicit lines so source indentation can
    # never leak into the rendered text.
    header_md = "\n".join([
        "# 🎬 Indic Video Translator + Dub (MMS-TTS)",
        "",
        "**English video → Transcribe → Translate → Dub in target Indian language**",
        "",
        "### Supported languages",
        ", ".join(LANGUAGE_MAP.keys()),
        "",
        "### Lip-sync status",
        f"- {wav2lip_msg}",
        "",
        "> **Note:** This build focuses on **correct pronunciation** (MMS-TTS).",
        "> True “same-voice cloning” needs an extra **voice conversion** stage (OpenVoice/RVC).",
    ])

    troubleshooting_md = "\n".join([
        "---",
        "### Troubleshooting (HF Spaces)",
        "- If you see **ffmpeg not found**, use a **Docker Space** and install ffmpeg.",
        "- If you want **lip-sync**, include `Wav2Lip/` + `checkpoints/wav2lip_gan.pth` in the repo.",
        "- For real **voice cloning** (same voice as your sample), add a voice-conversion stage (OpenVoice/RVC).",
    ])

    with gr.Blocks(title="Indic Video Translator + Dub", theme=gr.themes.Soft()) as demo:
        gr.Markdown(header_md)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📹 Step 1: Video Input (English)")
                with gr.Tabs():
                    with gr.Tab("Upload Video"):
                        video_upload = gr.Video(label="Upload Video", sources=["upload"])
                    with gr.Tab("Record Video"):
                        video_record = gr.Video(label="Record Video", sources=["webcam"])

                gr.Markdown("### 🎤 Optional: Voice Sample (not used in this build)")
                gr.Markdown("*Kept for future voice cloning. You can ignore this for now.*")
                with gr.Tabs():
                    with gr.Tab("Upload Audio"):
                        audio_upload = gr.Audio(label="Upload Voice Sample", type="filepath", sources=["upload"])
                    with gr.Tab("Record Audio"):
                        audio_record = gr.Audio(label="Record Voice Sample", type="filepath", sources=["microphone"])

                gr.Markdown("### 🌐 Step 2: Target Language")
                language_dropdown = gr.Dropdown(
                    choices=list(LANGUAGE_MAP.keys()),
                    value="Tamil",
                    label="Select Target Language",
                )

                use_lipsync = gr.Checkbox(
                    value=True,
                    label="Try lip-sync (Wav2Lip if available)",
                )

                process_btn = gr.Button("🚀 Start", variant="primary", size="lg")

            with gr.Column(scale=1):
                gr.Markdown("### 📊 Output")
                status_box = gr.Markdown("ℹ️ Ready.")
                output_video = gr.Video(label="Processed Video")

                with gr.Accordion("📝 Transcript + Translation", open=False):
                    transcription_box = gr.Markdown()
                    translation_box = gr.Markdown()

        def pick_video(vu, vr):
            # Prefer the uploaded video; fall back to the webcam recording.
            return vu if vu is not None else vr

        def pick_audio(au, ar):
            # Prefer the uploaded sample; fall back to the microphone recording.
            return au if au is not None else ar

        def run_pipeline(vu, vr, au, ar, lang, lipsync, progress=gr.Progress()):
            # A named handler with a gr.Progress default (rather than a lambda)
            # so Gradio can inject its progress tracker, which is then
            # forwarded to the pipeline.
            return process_video(
                pick_video(vu, vr),
                pick_audio(au, ar),
                lang,
                lipsync,
                progress=progress,
            )

        process_btn.click(
            fn=run_pipeline,
            inputs=[video_upload, video_record, audio_upload, audio_record, language_dropdown, use_lipsync],
            outputs=[output_video, transcription_box, translation_box, status_box],
        )

        gr.Markdown(troubleshooting_md)

    return demo
| |
|
| |
|
if __name__ == "__main__":
    # Build the UI, enable the bounded request queue, and serve on all
    # interfaces at port 7860 without a public share link.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860, "share": False}

    demo = create_interface()
    demo.queue(max_size=MAX_QUEUE)
    demo.launch(**launch_options)
| |
|