| """ | |
| Video Language Translation Tool β Improved | |
| ========================================== | |
| Fixes over original: | |
| 1. Whisper (via transformers pipeline) replaces Google SR: | |
| - Handles unlimited audio length via 30s chunking | |
| - Adds proper punctuation natively | |
| - Works fully offline, supports 99 languages | |
| 2. Facebook NLLB-200 replaces the `translate` library: | |
| - High-quality neural machine translation | |
| - 200-language support | |
| 3. Segment-aware TTS generation: | |
| - Each Whisper segment β translated β TTS generated individually | |
| - TTS audio time-stretched with librosa to match original segment duration | |
| - Segments overlaid at their exact original timestamps β perfect sync | |
| 4. Instrumental + translated TTS merged with correct volume balance | |
| 5. Final audio padded/trimmed to exactly match video duration | |
| 6. Wrong model class bug fixed (was AutoModelForCausalLM for a Seq2Seq model) | |
| """ | |
import os
import gc
import subprocess
import shutil
import tempfile
import torch
import numpy as np
import librosa
import soundfile as sf

# ─── Torch 2.6 compatibility patch (MUST be before any TTS/model imports) ────
# Torch 2.6 changed torch.load default to weights_only=True.
# Coqui TTS checkpoints contain custom Python classes (XttsConfig, etc.)
# serialized via pickle, which weights_only=True blocks unconditionally.
# We monkey-patch torch.load to always pass weights_only=False so TTS loads
# correctly. This is safe because TTS weights come from the trusted Coqui Hub.
# We also register all known Coqui TTS globals with add_safe_globals as a
# belt-and-suspenders measure for any future torch version that ignores the patch.
_original_torch_load = torch.load


def _patched_torch_load(f, map_location=None, pickle_module=None, weights_only=False, **kwargs):
    # Force weights_only=False for all torch.load calls in this process.
    # Pickle-based TTS checkpoints cannot load under weights_only=True.
    kwargs.pop("weights_only", None)
    if pickle_module is not None:
        return _original_torch_load(f, map_location=map_location,
                                    pickle_module=pickle_module,
                                    weights_only=False, **kwargs)
    return _original_torch_load(f, map_location=map_location,
                                weights_only=False, **kwargs)


torch.load = _patched_torch_load

# Also allowlist known Coqui TTS globals for any code that uses safe_globals context
try:
    import torch.serialization as _ts
    from TTS.tts.configs.xtts_config import XttsConfig
    from TTS.tts.models.xtts import XttsAudioConfig, XttsArgs
    from TTS.config.shared_configs import BaseDatasetConfig

    _ts.add_safe_globals([XttsConfig, XttsAudioConfig, XttsArgs, BaseDatasetConfig])
except Exception as _e:
    print(f"Note: Could not pre-register TTS safe globals (non-fatal): {_e}")
# ──────────────────────────────────────────────────────────────────────────────
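# Untested alternative (assumption about newer torch versions): instead of
# patching torch.load globally, the allowlist can be scoped to a single load
# with a context manager, e.g.:
#
#     with torch.serialization.safe_globals([XttsConfig, XttsAudioConfig,
#                                            XttsArgs, BaseDatasetConfig]):
#         tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2")
#
# The global patch above is what this script actually uses, because Coqui TTS
# calls torch.load internally where we cannot wrap it.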
from TTS.api import TTS
from pydub import AudioSegment, effects as pydub_effects
from moviepy.editor import VideoFileClip
import gradio as gr
from transformers import pipeline as hf_pipeline
from huggingface_hub import InferenceClient

# ─── Environment ─────────────────────────────────────────────────────────────
os.environ["COQUI_TOS_AGREED"] = "1"
ffmpeg_path = "ffmpeg"

# ─── NLLB language codes ─────────────────────────────────────────────────────
NLLB_LANG = {
    "en": "eng_Latn",
    "es": "spa_Latn",
    "fr": "fra_Latn",
    "de": "deu_Latn",
    "it": "ita_Latn",
    "pt": "por_Latn",
    "pl": "pol_Latn",
    "tr": "tur_Latn",
    "ru": "rus_Cyrl",
    "nl": "nld_Latn",
    "cs": "ces_Latn",
    "ar": "arb_Arab",
    "zh": "zho_Hans",
    "hu": "hun_Latn",
    "ko": "kor_Hang",
    "ja": "jpn_Jpan",
    "hi": "hin_Deva",
}

# Whisper language names (ISO 639-1 short codes work directly)
WHISPER_LANG = {k: k for k in NLLB_LANG}  # Whisper accepts the same short codes
SUPPORTED_LANGS = list(NLLB_LANG.keys())

# ─── Lazy model cache ────────────────────────────────────────────────────────
_asr_pipeline = None
_tts_model = None
def get_asr_pipeline():
    global _asr_pipeline
    if _asr_pipeline is None:
        print("Loading Whisper ASR pipeline …")
        _asr_pipeline = hf_pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small",   # upgrade to "medium" for better accuracy
            chunk_length_s=30,              # process 30-second windows → no length limit
            stride_length_s=5,              # 5-second overlap for continuity
            return_timestamps=True,         # get chunk-level timestamps
            device=0 if torch.cuda.is_available() else -1,
        )
    return _asr_pipeline


def get_tts_model():
    global _tts_model
    if _tts_model is None:
        print("Loading XTTS v2 …")
        _tts_model = TTS(
            model_name="tts_models/multilingual/multi-dataset/xtts_v2",
            progress_bar=False,
            gpu=torch.cuda.is_available(),
        )
    return _tts_model
# ─── Chat assistant (unchanged from original) ────────────────────────────────
_inference_client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")


def respond(message, history, system_message, max_tokens=512, temperature=0.7, top_p=0.9):
    """Stream a chat completion from the Zephyr-7B Inference API."""
    messages = [{"role": "system", "content": system_message}]
    for u, a in history:
        if u:
            messages.append({"role": "user", "content": u})
        if a:
            messages.append({"role": "assistant", "content": a})
    messages.append({"role": "user", "content": message})
    response = ""
    for msg in _inference_client.chat_completion(
        messages, max_tokens=max_tokens, stream=True,
        temperature=temperature, top_p=top_p,
    ):
        token = msg.choices[0].delta.content or ""  # delta can be empty on the final chunk
        response += token
        yield response
# ─── Demucs vocal separation (replaces Spleeter) ─────────────────────────────
# Demucs has NO httpx dependency, supports Python 3.11+, and produces
# higher-quality stems than Spleeter 2stems.
def separate_vocals(audio_file: str) -> tuple:
    """
    Separates vocals and instrumental using Facebook Demucs (htdemucs model).
    Returns (vocal_path, instrumental_path) as 16-kHz mono WAV files.
    Robustness notes:
    - Captures stderr so errors are visible in Hugging Face logs
    - Falls back from htdemucs -> htdemucs_6s -> mdx_extra on failure
    - Falls back from `python -m demucs` to the `demucs` console script to avoid PATH issues
    - Converts the 44.1 kHz stereo output to 16 kHz mono for ASR/TTS
    """
    output_dir = "demucs_output"
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    # Try multiple model names in order of quality – stop at first success
    models_to_try = ["htdemucs", "htdemucs_6s", "mdx_extra"]
    last_error = ""
    for model_name in models_to_try:
        print(f"Trying Demucs model: {model_name} ...")
        try:
            result = subprocess.run(
                [
                    "python", "-m", "demucs",
                    "--two-stems=vocals",
                    "-n", model_name,
                    "-o", output_dir,
                    audio_file,
                ],
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                print(f"Demucs succeeded with model: {model_name}")
                break
            else:
                last_error = result.stderr.strip() or result.stdout.strip()
                print(f"  Model {model_name} failed (rc={result.returncode}):\n  {last_error[:300]}")
                # Clean output dir for next attempt
                if os.path.exists(output_dir):
                    shutil.rmtree(output_dir)
                os.makedirs(output_dir, exist_ok=True)
        except FileNotFoundError:
            # python -m demucs not found – try the demucs console script directly
            try:
                result = subprocess.run(
                    [
                        "demucs",
                        "--two-stems=vocals",
                        "-n", model_name,
                        "-o", output_dir,
                        audio_file,
                    ],
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0:
                    print(f"Demucs (direct binary) succeeded with model: {model_name}")
                    break
                else:
                    last_error = result.stderr.strip()
            except FileNotFoundError:
                last_error = "Demucs binary not found in PATH"
    else:
        raise RuntimeError(
            f"Demucs failed with all models ({models_to_try}).\n"
            f"Last error:\n{last_error}"
        )

    # Locate output files – Demucs writes to <output_dir>/<model>/<track>/{vocals,no_vocals}.wav
    base = os.path.splitext(os.path.basename(audio_file))[0]
    # Search for actual output directory (model name may differ from requested)
    stem_dir = None
    for candidate_model in models_to_try:
        candidate = os.path.join(output_dir, candidate_model, base)
        if os.path.isdir(candidate):
            stem_dir = candidate
            break
    if stem_dir is None:
        # Fallback: walk the output dir to find vocals.wav anywhere
        for root, dirs, files in os.walk(output_dir):
            if "vocals.wav" in files:
                stem_dir = root
                break
    if stem_dir is None:
        raise RuntimeError(
            f"Demucs ran but output directory not found.\n"
            f"Contents of {output_dir}:\n" +
            str(list(os.walk(output_dir)))
        )

    vocal_raw = os.path.join(stem_dir, "vocals.wav")
    instr_raw = os.path.join(stem_dir, "no_vocals.wav")
    for path, label in [(vocal_raw, "vocals.wav"), (instr_raw, "no_vocals.wav")]:
        if not os.path.exists(path):
            raise RuntimeError(f"Expected {label} not found at {path}")

    # Resample to 16 kHz mono for downstream ASR / TTS compatibility
    vocal_out = "demucs_vocals.wav"
    instr_out = "demucs_instrumental.wav"
    for src, dst in [(vocal_raw, vocal_out), (instr_raw, instr_out)]:
        result = subprocess.run(
            [ffmpeg_path, "-y", "-i", src, "-ar", "16000", "-ac", "1", dst],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            raise RuntimeError(f"ffmpeg resample failed for {src}:\n{result.stderr}")

    print(f"Vocal separation complete -> {vocal_out}, {instr_out}")
    return vocal_out, instr_out
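# Untested alternative (assumption about recent Demucs releases): the package
# also exposes a Python entry point that accepts the same CLI-style arguments,
# which would avoid spawning a subprocess:
#
#     from demucs import separate
#     separate.main(["--two-stems", "vocals", "-n", "htdemucs",
#                    "-o", "demucs_output", "only_audio.wav"])
#
# The subprocess route above is kept because it isolates Demucs crashes and
# memory spikes from this process.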
# ─── Step 1: Transcription with Whisper ──────────────────────────────────────
def transcribe_audio(audio_path: str, source_language: str) -> tuple[list[dict], str]:
    """
    Transcribes any-length audio using OpenAI Whisper (via transformers pipeline).
    Returns:
        segments  – list of {"start": float, "end": float, "text": str}
        full_text – concatenated transcript with punctuation
    """
    asr = get_asr_pipeline()
    lang = source_language if source_language != "auto" else None
    generate_kwargs = {"language": lang, "task": "transcribe"} if lang else {"task": "transcribe"}
    print(f"Transcribing {audio_path} (language={lang or 'auto-detect'}) ...")
    result = asr(audio_path, generate_kwargs=generate_kwargs)
    # The pipeline returns {"text": "...", "chunks": [{"timestamp": (start, end), "text": "..."}]}
    raw_chunks = result.get("chunks", [])
    segments = []
    for chunk in raw_chunks:
        ts = chunk.get("timestamp", (0, 0))
        start = ts[0] if ts[0] is not None else 0.0
        end = ts[1] if ts[1] is not None else start + 2.0
        text = chunk["text"].strip()
        if text:
            segments.append({"start": start, "end": end, "text": text})
    full_text = result.get("text", " ".join(s["text"] for s in segments)).strip()
    print(f"Transcription done -- {len(segments)} segments, {len(full_text)} chars.")
    return segments, full_text
# ─── Step 2: Translation ─────────────────────────────────────────────────────
def translate_text_nllb(text: str, src_lang: str, tgt_lang: str) -> str:
    """Translate text using Facebook NLLB-200 (handles long texts via chunking)."""
    if src_lang == tgt_lang or not text.strip():
        return text
    src_nllb = NLLB_LANG.get(src_lang, "eng_Latn")
    tgt_nllb = NLLB_LANG.get(tgt_lang, "eng_Latn")
    print(f"Translating {src_nllb} -> {tgt_nllb} ...")
    translator = hf_pipeline(
        "translation",
        model="facebook/nllb-200-distilled-600M",
        src_lang=src_nllb,
        tgt_lang=tgt_nllb,
        device=0 if torch.cuda.is_available() else -1,
        max_length=512,
    )
    # Split into sentence-like chunks of ≤ 400 characters
    sentences = _split_into_sentences(text)
    translated_parts = []
    batch = ""
    for sent in sentences:
        if len(batch) + len(sent) < 380:
            batch += " " + sent
        else:
            if batch.strip():
                translated_parts.append(translator(batch.strip())[0]["translation_text"])
            batch = sent
    if batch.strip():
        translated_parts.append(translator(batch.strip())[0]["translation_text"])
    return " ".join(translated_parts)
def _split_into_sentences(text: str) -> list[str]:
    """Naive sentence splitter on terminal punctuation."""
    import re
    parts = re.split(r"(?<=[.!?])\s+", text.strip())
    return [p for p in parts if p]
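# Behaviour of the regex above, for reference:
#
#     _split_into_sentences("Hello there. How are you? Fine!")
#     # -> ["Hello there.", "How are you?", "Fine!"]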
def translate_segments(segments: list[dict], src_lang: str, tgt_lang: str) -> list[dict]:
    """Translate each segment independently to preserve timing mapping."""
    if src_lang == tgt_lang:
        return segments
    src_nllb = NLLB_LANG.get(src_lang, "eng_Latn")
    tgt_nllb = NLLB_LANG.get(tgt_lang, "eng_Latn")
    translator = hf_pipeline(
        "translation",
        model="facebook/nllb-200-distilled-600M",
        src_lang=src_nllb,
        tgt_lang=tgt_nllb,
        device=0 if torch.cuda.is_available() else -1,
        max_length=512,
    )
    translated = []
    for seg in segments:
        txt = seg["text"].strip()
        if not txt:
            translated.append({**seg, "translated": txt})
            continue
        try:
            result = translator(txt)[0]["translation_text"]
        except Exception as e:
            print(f"Warning: translation failed for segment '{txt}': {e}")
            result = txt
        translated.append({**seg, "translated": result})
    # Free translator from memory
    del translator
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return translated
# ─── Step 3: TTS with time-stretching for sync ───────────────────────────────
def stretch_audio_to_duration(audio_path: str, target_duration_s: float) -> AudioSegment:
    """
    Time-stretch (or compress) a WAV file so it fits exactly `target_duration_s` seconds.
    Uses the librosa phase vocoder – pitch-preserving, high quality.
    """
    y, sr = librosa.load(audio_path, sr=None, mono=True)
    current_duration = len(y) / sr
    if current_duration <= 0 or target_duration_s <= 0:
        return AudioSegment.from_wav(audio_path)
    rate = current_duration / target_duration_s  # >1 → compress, <1 → slow down
    # Clamp: avoid extreme stretching that sounds bad
    rate = max(0.4, min(rate, 3.5))
    y_stretched = librosa.effects.time_stretch(y, rate=rate)
    tmp = audio_path + "_stretched.wav"
    sf.write(tmp, y_stretched, sr)
    seg = AudioSegment.from_wav(tmp)
    os.remove(tmp)
    return seg
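# Worked example of the rate above: a 3.0 s TTS clip that must fit a 2.0 s
# window gives rate = 3.0 / 2.0 = 1.5, i.e. the clip is sped up 1.5x; a 1.0 s
# clip targeted at a 2.0 s window gives rate = 0.5 and is slowed down. The
# clamp to [0.4, 3.5] means segments that differ more than that are trimmed
# later instead of being stretched further.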
def generate_tts_segment(tts_model, text: str, speaker_wav: str, language: str, index: int) -> str:
    """Generate TTS for one text chunk. Returns path to WAV."""
    out_path = f"tts_seg_{index}.wav"
    tts_model.tts_to_file(
        text=text,
        speaker_wav=speaker_wav,
        language=language,
        file_path=out_path,
    )
    return out_path


def build_synchronized_tts_audio(
    translated_segments: list[dict],
    speaker_wav: str,
    language: str,
    video_duration_s: float,
) -> str:
    """
    Core sync engine:
    1. Generate TTS for each segment
    2. Time-stretch to match original segment duration
    3. Overlay at original timestamp position on a silent base track
    4. Pad / trim to exactly match video_duration_s
    Returns path to the final synchronized WAV.
    """
    tts = get_tts_model()
    total_ms = int(video_duration_s * 1000)
    base_audio = AudioSegment.silent(duration=total_ms)
    for i, seg in enumerate(translated_segments):
        text = seg.get("translated", seg.get("text", "")).strip()
        if not text:
            continue
        start_ms = int(seg["start"] * 1000)
        end_ms = min(int(seg["end"] * 1000), total_ms)
        target_ms = max(end_ms - start_ms, 200)  # at least 200 ms
        print(f"  TTS segment {i+1}/{len(translated_segments)}: "
              f"[{seg['start']:.1f}s-{seg['end']:.1f}s] [{text[:60]}]")
        try:
            wav_path = generate_tts_segment(tts, text, speaker_wav, language, i)
        except Exception as e:
            print(f"  WARNING: TTS failed for segment {i}: {e}")
            continue
        # Stretch to fit the original segment window
        try:
            tts_seg = stretch_audio_to_duration(wav_path, target_ms / 1000)
        except Exception as e:
            print(f"  WARNING: stretch failed for segment {i}: {e}. Using raw TTS.")
            tts_seg = AudioSegment.from_wav(wav_path)
        # Trim if still too long after stretching
        if len(tts_seg) > target_ms:
            tts_seg = tts_seg[:target_ms]
        # Normalise loudness of segment
        tts_seg = pydub_effects.normalize(tts_seg)
        base_audio = base_audio.overlay(tts_seg, position=start_ms)
        os.remove(wav_path)
    # Ensure exact video duration
    if len(base_audio) < total_ms:
        base_audio = base_audio + AudioSegment.silent(duration=total_ms - len(base_audio))
    else:
        base_audio = base_audio[:total_ms]
    out_path = "synchronized_tts.wav"
    base_audio.export(out_path, format="wav")
    print(f"Synchronized TTS audio saved -> {out_path}")
    return out_path
# ─── Audio mixing: TTS + instrumental ────────────────────────────────────────
def mix_tts_with_instrumental(tts_path: str, instrumental_path: str, video_duration_s: float) -> str:
    """
    Mix translated TTS (foreground) with the original instrumental (background).
    The instrumental is ducked by 8 dB so speech is always intelligible.
    Both tracks are padded/trimmed to exactly match the video duration.
    """
    total_ms = int(video_duration_s * 1000)
    tts_audio = AudioSegment.from_wav(tts_path)
    instr_audio = AudioSegment.from_wav(instrumental_path)

    # Match length
    def fit(seg, ms):
        return (seg + AudioSegment.silent(duration=ms))[:ms] if len(seg) < ms else seg[:ms]

    tts_audio = fit(tts_audio, total_ms)
    instr_audio = fit(instr_audio, total_ms)
    # Duck instrumental
    instr_audio = instr_audio - 8  # −8 dB
    mixed = instr_audio.overlay(tts_audio)
    out_path = "mixed_audio.wav"
    mixed.export(out_path, format="wav")
    return out_path
# ─── Video helpers ───────────────────────────────────────────────────────────
def extract_video_only(input_video: str, output_video: str) -> str:
    """Extract the video stream (no audio). Try stream copy first, then fall back to re-encoding."""
    for cmd in [
        [ffmpeg_path, "-y", "-i", input_video, "-an", "-c:v", "copy", output_video],
        [ffmpeg_path, "-y", "-i", input_video, "-an", "-c:v", "libx264", "-preset", "veryfast", output_video],
    ]:
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            return output_video
        except subprocess.CalledProcessError:
            continue
    raise RuntimeError("Could not extract video stream from input.")


def merge_video_audio(video_path: str, audio_path: str, output_path: str) -> str:
    """Mux video + audio into the final MP4."""
    subprocess.run(
        [
            ffmpeg_path, "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac", "-b:a", "192k",
            "-map", "0:v:0", "-map", "1:a:0",
            "-shortest",
            output_path,
        ],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return output_path


def get_video_duration(video_path: str) -> float:
    clip = VideoFileClip(video_path)
    dur = clip.duration
    clip.close()
    return dur
# ─── Main pipeline ───────────────────────────────────────────────────────────
def process_video(
    input_video_path: str,
    input_language: str = "en",
    target_language: str = "en",
) -> tuple[str, str, str, str]:
    """
    Step 1 – returns:
        video_only_path  : video stream without audio
        full_transcript  : translated text with punctuation (editable)
        instrumental_path: background music / FX track
        vocal_path       : original vocal track (used as speaker reference)
    """
    # Clean up leftovers from previous runs
    for f in ["only_video.mp4", "only_audio.wav"]:
        if os.path.exists(f):
            os.remove(f)

    # 1. Extract video stream
    print("Extracting video stream …")
    extract_video_only(input_video_path, "only_video.mp4")

    # 2. Extract audio
    print("Extracting audio …")
    subprocess.run(
        [ffmpeg_path, "-y", "-i", input_video_path,
         "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "only_audio.wav"],
        check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )

    # 3. Separate vocals / instrumental with Demucs
    print("Separating vocals …")
    vocal_path, instrumental_path = separate_vocals("only_audio.wav")

    # 4. Transcribe with Whisper (full audio, any length, with punctuation)
    segments, full_text = transcribe_audio(vocal_path, input_language)

    # 5. Translate
    if input_language != target_language:
        full_text_translated = translate_text_nllb(full_text, input_language, target_language)
    else:
        full_text_translated = full_text

    return "only_video.mp4", full_text_translated, instrumental_path, vocal_path
def generate_final_output(
    edited_text: str,
    video_path: str,
    instrumental_path: str,
    accent: str,
    speaker_reference: str,
    input_language: str = "en",
) -> str:
    """
    Step 2 – generates the final dubbed video:
    - Re-transcribes with timestamps to get segment boundaries
    - Translates each segment
    - Generates TTS per segment and time-stretches to the original duration
    - Mixes TTS with the instrumental track
    - Muxes into the final MP4
    Returns path to the final video.
    """
    video_duration = get_video_duration(video_path)
    print(f"Video duration: {video_duration:.2f}s")

    # Get timestamp-aligned segments from the original vocal audio
    print("Re-transcribing for timestamp-aligned segments …")
    segments, _ = transcribe_audio(speaker_reference, input_language)

    # Translate each segment individually to preserve time alignment
    print("Translating segments …")
    translated_segs = translate_segments(segments, input_language, accent)

    # If the user edited the full text, rebuild segments proportionally
    # (use edited text if it differs significantly from the auto-translation)
    auto_full = " ".join(s.get("translated", s["text"]) for s in translated_segs)
    if edited_text.strip() and _text_similarity(edited_text.strip(), auto_full) < 0.8:
        print("Using user-edited text – distributing across segments …")
        translated_segs = _redistribute_text(edited_text, segments)

    # Build synchronized TTS audio
    print("Generating synchronized TTS audio …")
    tts_path = build_synchronized_tts_audio(translated_segs, speaker_reference, accent, video_duration)

    # Mix TTS with instrumental
    print("Mixing TTS + instrumental …")
    mixed_path = mix_tts_with_instrumental(tts_path, instrumental_path, video_duration)

    # Mux into final video
    print("Merging video + audio …")
    final_path = merge_video_audio(video_path, mixed_path, "Final_output.mp4")

    # Clean up temp files
    for f in [tts_path, mixed_path]:
        if os.path.exists(f):
            os.remove(f)

    print(f"Done! Final video -> {final_path}")
    return final_path
# ─── Text utilities ──────────────────────────────────────────────────────────
def _text_similarity(a: str, b: str) -> float:
    """Fast bag-of-words (Jaccard) similarity, used to detect whether the user edited the text."""
    sa, sb = set(a.lower().split()), set(b.lower().split())
    if not sa or not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)
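# Worked example of the Jaccard score above:
#
#     _text_similarity("hello brave new world", "hello new world")
#     # shared words {hello, new, world} = 3, union = 4  ->  0.75
#
# so a lightly edited transcript still scores close to 1.0 and keeps the
# automatic segment translations.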
def _redistribute_text(full_text: str, segments: list[dict]) -> list[dict]:
    """
    When the user edits the translated text, distribute words proportionally
    across the original timestamp segments.
    """
    words = full_text.split()
    total_words = len(words)
    durations = [max(seg["end"] - seg["start"], 0.1) for seg in segments]
    total_dur = sum(durations)
    result, cursor = [], 0
    for i, seg in enumerate(segments):
        fraction = durations[i] / total_dur
        count = max(1, round(fraction * total_words))
        chunk = " ".join(words[cursor : cursor + count])
        cursor += count
        result.append({**seg, "translated": chunk})
    # Append any remaining words to the last segment
    if cursor < total_words and result:
        result[-1]["translated"] += " " + " ".join(words[cursor:])
    return result
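# Example of the proportional split (illustrative numbers): with two segments of
# 4 s and 2 s and a 12-word edited transcript, the first segment receives
# round(4/6 * 12) = 8 words and the second round(2/6 * 12) = 4 words, so the
# original pacing is preserved even though the wording changed.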
# ─── Gradio UI ───────────────────────────────────────────────────────────────
LANG_CHOICES = SUPPORTED_LANGS

with gr.Blocks(title="Video Language Dubbing Tool") as demo:
    gr.Markdown(
        "# Real-Time Video Language Dubbing\n"
        "Upload a video → extract vocals → transcribe (Whisper) → translate (NLLB-200) "
        "→ generate voice-cloned TTS per segment → synchronise to the original timing → output dubbed video."
    )

    # ── State ─────────────────────────────────────────────────────────────
    seg_state = gr.State([])  # stores translated segments for step 2

    # ── Step 1 ────────────────────────────────────────────────────────────
    gr.Markdown("## Step 1: Upload & Process")
    with gr.Row():
        video_input = gr.Video(label="Input Video")
        input_lang = gr.Dropdown(LANG_CHOICES, label="Source Language", value="en")
        target_lang = gr.Dropdown(LANG_CHOICES, label="Target Language", value="en")
        accent = gr.Dropdown(LANG_CHOICES, label="TTS Language / Accent", value="en")
    process_btn = gr.Button("Process Video (Step 1)", variant="primary")
    with gr.Row():
        instrumental_audio = gr.Audio(label="Extracted Instrumental Track", type="filepath")
        speaker_ref_audio = gr.Audio(label="Extracted Vocal Track (speaker reference)", type="filepath")
    translated_text_box = gr.Textbox(
        label="Translated Transcript (editable – fix any errors before Step 2)",
        lines=12,
        placeholder="Transcribed & translated text will appear here …",
    )

    # ── Step 2 ────────────────────────────────────────────────────────────
    gr.Markdown(
        "## Step 2: Generate Dubbed Video\n"
        "Review / edit the transcript above, then click Generate."
    )
    generate_btn = gr.Button("Generate Dubbed Video (Step 2)", variant="primary")
    final_video_output = gr.Video(label="Final Dubbed Video")
    status_text = gr.Textbox(label="Status", interactive=False)
    # ── Callbacks ─────────────────────────────────────────────────────────
    def step1(video_file, in_lang, tgt_lang, acc):
        try:
            video_path, translated_text, instr_path, vocal_path = process_video(
                video_file, in_lang, tgt_lang
            )
            return translated_text, instr_path, vocal_path, "Step 1 complete. Review the transcript and click Step 2."
        except Exception as e:
            return f"[ERROR] {e}", None, None, f"Error: {e}"

    def step2(edited_text, instr_path, acc, speaker_ref, in_lang):
        try:
            final_video = generate_final_output(
                edited_text, "only_video.mp4", instr_path, acc, speaker_ref, in_lang
            )
            return final_video, "Done! Your dubbed video is ready."
        except Exception as e:
            return None, f"Error: {e}"
    process_btn.click(
        fn=step1,
        inputs=[video_input, input_lang, target_lang, accent],
        outputs=[translated_text_box, instrumental_audio, speaker_ref_audio, status_text],
    )
    generate_btn.click(
        fn=step2,
        inputs=[translated_text_box, instrumental_audio, accent, speaker_ref_audio, input_lang],
        outputs=[final_video_output, status_text],
    )

    # ── Optional: Chat tab ────────────────────────────────────────────────
    with gr.Tab("AI Assistant"):
        gr.ChatInterface(
            respond,
            additional_inputs=[
                gr.Textbox(value="You are a helpful assistant.", label="System Message"),
                gr.Slider(64, 2048, value=512, label="Max Tokens"),
                gr.Slider(0.1, 2.0, value=0.7, label="Temperature"),
                gr.Slider(0.1, 1.0, value=0.9, label="Top-p"),
            ],
        )

demo.launch()