# Emotibridge / app.py
"""
Video Language Translation Tool - Improved
==========================================
Fixes over original:
1. Whisper (via transformers pipeline) replaces Google SR:
- Handles unlimited audio length via 30s chunking
- Adds proper punctuation natively
- Works fully offline, supports 99 languages
2. Facebook NLLB-200 replaces the `translate` library:
- High-quality neural machine translation
- 200-language support
3. Segment-aware TTS generation:
   - Each Whisper segment -> translated -> TTS generated individually
   - TTS audio time-stretched with librosa to match original segment duration
   - Segments overlaid at their exact original timestamps -> accurate sync
4. Instrumental + translated TTS merged with correct volume balance
5. Final audio padded/trimmed to exactly match video duration
6. Wrong model class bug fixed (was AutoModelForCausalLM for a Seq2Seq model)
"""
import os
import gc
import subprocess
import sys
import shutil
import torch
import numpy as np
import librosa
import soundfile as sf
# ─── Torch 2.6 compatibility patch (MUST be before any TTS/model imports) ───
# Torch 2.6 changed torch.load default to weights_only=True.
# Coqui TTS checkpoints contain custom Python classes (XttsConfig, etc.)
# serialized via pickle, which weights_only=True blocks unconditionally.
# We monkey-patch torch.load to always pass weights_only=False so TTS loads
# correctly. This is safe because TTS weights come from the trusted Coqui Hub.
# We also register all known Coqui TTS globals with add_safe_globals as a
# belt-and-suspenders measure for any future torch version that ignores the patch.
_original_torch_load = torch.load
def _patched_torch_load(f, map_location=None, pickle_module=None, weights_only=False, **kwargs):
# Force weights_only=False for all torch.load calls in this process.
# Pickle-based TTS checkpoints cannot load under weights_only=True.
    kwargs.pop("weights_only", None)  # defensive: drop any duplicate flag arriving via **kwargs
if pickle_module is not None:
return _original_torch_load(f, map_location=map_location,
pickle_module=pickle_module,
weights_only=False, **kwargs)
return _original_torch_load(f, map_location=map_location,
weights_only=False, **kwargs)
torch.load = _patched_torch_load
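# Illustration (hypothetical call): after this patch, even an explicit
#   torch.load("model.pth", weights_only=True)
# still deserializes pickled objects, because the flag is overridden to False above.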
# Also allowlist known Coqui TTS globals for any code that uses safe_globals context
try:
import torch.serialization as _ts
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import XttsAudioConfig, XttsArgs
from TTS.config.shared_configs import BaseDatasetConfig
_ts.add_safe_globals([XttsConfig, XttsAudioConfig, XttsArgs, BaseDatasetConfig])
except Exception as _e:
print(f"Note: Could not pre-register TTS safe globals (non-fatal): {_e}")
# ─────────────────────────────────────────────────────────────────────────────
from TTS.api import TTS
from pydub import AudioSegment, effects as pydub_effects
from moviepy.editor import VideoFileClip
import gradio as gr
from transformers import pipeline as hf_pipeline
from huggingface_hub import InferenceClient
# ─── Environment ────────────────────────────────────────────────────────────
os.environ["COQUI_TOS_AGREED"] = "1"
ffmpeg_path = "ffmpeg"
# ─── NLLB language codes ────────────────────────────────────────────────────
NLLB_LANG = {
"en": "eng_Latn",
"es": "spa_Latn",
"fr": "fra_Latn",
"de": "deu_Latn",
"it": "ita_Latn",
"pt": "por_Latn",
"pl": "pol_Latn",
"tr": "tur_Latn",
"ru": "rus_Cyrl",
"nl": "nld_Latn",
"cs": "ces_Latn",
"ar": "arb_Arab",
"zh": "zho_Hans",
"hu": "hun_Latn",
"ko": "kor_Hang",
"ja": "jpn_Jpan",
"hi": "hin_Deva",
}
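# e.g. NLLB_LANG["hi"] -> "hin_Deva"; NLLB expects FLORES-200-style
# "<iso639-3>_<Script>" codes rather than the short ISO 639-1 codes used in the UI.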
# Whisper language names (ISO 639-1 short codes work directly)
WHISPER_LANG = {k: k for k in NLLB_LANG}  # Whisper accepts the same short codes (currently unused)
SUPPORTED_LANGS = list(NLLB_LANG.keys())
# ─── Lazy model cache ────────────────────────────────────────────────────────
_asr_pipeline = None
_tts_model = None
def get_asr_pipeline():
global _asr_pipeline
if _asr_pipeline is None:
print("Loading Whisper ASR pipeline …")
_asr_pipeline = hf_pipeline(
"automatic-speech-recognition",
model="openai/whisper-small", # upgrade to "medium" for better accuracy
chunk_length_s=30, # process 30-second windows β†’ no length limit
stride_length_s=5, # 5-second overlap for continuity
return_timestamps=True, # get word/chunk timestamps
device=0 if torch.cuda.is_available() else -1,
)
return _asr_pipeline
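# Sketch (hypothetical file name) of the pipeline's return shape with return_timestamps=True:
#   out = get_asr_pipeline()("clip.wav", generate_kwargs={"task": "transcribe"})
#   # out == {"text": "...", "chunks": [{"timestamp": (0.0, 4.2), "text": "..."}, ...]}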
def get_tts_model():
global _tts_model
if _tts_model is None:
print("Loading XTTS v2 …")
_tts_model = TTS(
model_name="tts_models/multilingual/multi-dataset/xtts_v2",
progress_bar=False,
gpu=torch.cuda.is_available(),
)
return _tts_model
# ─── Chat assistant (unchanged from original) ─────────────────────────────
_inference_client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
def respond(message, history, system_message, max_tokens=512, temperature=0.7, top_p=0.9):
messages = [{"role": "system", "content": system_message}]
for u, a in history:
if u:
messages.append({"role": "user", "content": u})
if a:
messages.append({"role": "assistant", "content": a})
messages.append({"role": "user", "content": message})
response = ""
for msg in _inference_client.chat_completion(
messages, max_tokens=max_tokens, stream=True,
temperature=temperature, top_p=top_p,
):
        token = msg.choices[0].delta.content or ""  # delta.content can be None on some stream chunks
        response += token
yield response
# ─── Demucs vocal separation (replaces Spleeter) ────────────────────────
# Demucs has NO httpx dependency, supports Python 3.11+, and produces
# higher-quality stems than Spleeter 2stems.
def separate_vocals(audio_file: str) -> tuple:
"""
Separates vocals and instrumental using Facebook Demucs (htdemucs model).
Returns (vocal_path, instrumental_path) as 16-kHz mono WAV files.
Robustness notes:
- Captures stderr so errors are visible in HuggingFace logs
- Falls back from htdemucs -> htdemucs_6s -> mdx_extra on failure
- Uses demucs Python API directly as last resort to avoid PATH issues
- Converts 44.1kHz stereo output to 16kHz mono for ASR/TTS
"""
output_dir = "demucs_output"
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
    # Try multiple model names in order of quality; stop at first success
models_to_try = ["htdemucs", "htdemucs_6s", "mdx_extra"]
last_error = ""
for model_name in models_to_try:
print(f"Trying Demucs model: {model_name} ...")
try:
result = subprocess.run(
[
"python", "-m", "demucs",
"--two-stems=vocals",
"-n", model_name,
"-o", output_dir,
audio_file,
],
capture_output=True,
text=True,
)
if result.returncode == 0:
print(f"Demucs succeeded with model: {model_name}")
break
else:
last_error = result.stderr.strip() or result.stdout.strip()
print(f" Model {model_name} failed (rc={result.returncode}):\n {last_error[:300]}")
# Clean output dir for next attempt
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
except FileNotFoundError:
            # `python -m demucs` not found; try the demucs binary directly
try:
result = subprocess.run(
[
"demucs",
"--two-stems=vocals",
"-n", model_name,
"-o", output_dir,
audio_file,
],
capture_output=True,
text=True,
)
if result.returncode == 0:
print(f"Demucs (direct binary) succeeded with model: {model_name}")
break
else:
last_error = result.stderr.strip()
except FileNotFoundError:
last_error = "Demucs binary not found in PATH"
else:
raise RuntimeError(
f"Demucs failed with all models ({models_to_try}).\n"
f"Last error:\n{last_error}"
)
    # Locate output files: Demucs writes <output_dir>/<model>/<track_basename>/{vocals,no_vocals}.wav
base = os.path.splitext(os.path.basename(audio_file))[0]
# Search for actual output directory (model name may differ from requested)
stem_dir = None
for candidate_model in models_to_try:
candidate = os.path.join(output_dir, candidate_model, base)
if os.path.isdir(candidate):
stem_dir = candidate
break
if stem_dir is None:
# Fallback: walk the output dir to find vocals.wav anywhere
for root, dirs, files in os.walk(output_dir):
if "vocals.wav" in files:
stem_dir = root
break
if stem_dir is None:
raise RuntimeError(
f"Demucs ran but output directory not found.\n"
f"Contents of {output_dir}:\n" +
str(list(os.walk(output_dir)))
)
vocal_raw = os.path.join(stem_dir, "vocals.wav")
instr_raw = os.path.join(stem_dir, "no_vocals.wav")
for path, label in [(vocal_raw, "vocals.wav"), (instr_raw, "no_vocals.wav")]:
if not os.path.exists(path):
raise RuntimeError(f"Expected {label} not found at {path}")
# Resample to 16 kHz mono for downstream ASR / TTS compatibility
vocal_out = "demucs_vocals.wav"
instr_out = "demucs_instrumental.wav"
for src, dst in [(vocal_raw, vocal_out), (instr_raw, instr_out)]:
result = subprocess.run(
[ffmpeg_path, "-y", "-i", src, "-ar", "16000", "-ac", "1", dst],
capture_output=True, text=True,
)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg resample failed for {src}:\n{result.stderr}")
print(f"Vocal separation complete -> {vocal_out}, {instr_out}")
return vocal_out, instr_out
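# Example layout (hypothetical input "clip.wav") after a successful htdemucs run:
#   demucs_output/htdemucs/clip/vocals.wav
#   demucs_output/htdemucs/clip/no_vocals.wav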
# ─── Step 1: Transcription with Whisper ──────────────────────────────────
def transcribe_audio(audio_path: str, source_language: str) -> tuple[list[dict], str]:
"""
Transcribes any-length audio using OpenAI Whisper (via transformers pipeline).
Returns:
segments – list of {"start": float, "end": float, "text": str}
full_text – concatenated transcript with punctuation
"""
asr = get_asr_pipeline()
lang = source_language if source_language != "auto" else None
generate_kwargs = {"language": lang, "task": "transcribe"} if lang else {"task": "transcribe"}
print(f"Transcribing {audio_path} (language={lang or 'auto-detect'}) ...")
result = asr(audio_path, generate_kwargs=generate_kwargs)
# The pipeline returns {"text": "...", "chunks": [{"timestamp": (start, end), "text": "..."}]}
raw_chunks = result.get("chunks", [])
segments = []
for chunk in raw_chunks:
ts = chunk.get("timestamp", (0, 0))
start = ts[0] if ts[0] is not None else 0.0
end = ts[1] if ts[1] is not None else start + 2.0
text = chunk["text"].strip()
if text:
segments.append({"start": start, "end": end, "text": text})
full_text = result.get("text", " ".join(s["text"] for s in segments)).strip()
print(f"Transcription done -- {len(segments)} segments, {len(full_text)} chars.")
return segments, full_text
# ─── Step 2: Translation ─────────────────────────────────────────────────
def translate_text_nllb(text: str, src_lang: str, tgt_lang: str) -> str:
"""Translate text using Facebook NLLB-200 (handles long texts via chunking)."""
if src_lang == tgt_lang or not text.strip():
return text
src_nllb = NLLB_LANG.get(src_lang, "eng_Latn")
tgt_nllb = NLLB_LANG.get(tgt_lang, "eng_Latn")
print(f"Translating {src_nllb} -> {tgt_nllb} ...")
translator = hf_pipeline(
"translation",
model="facebook/nllb-200-distilled-600M",
src_lang=src_nllb,
tgt_lang=tgt_nllb,
device=0 if torch.cuda.is_available() else -1,
max_length=512,
)
    # Split into sentence-like batches of at most ~380 chars
sentences = _split_into_sentences(text)
translated_parts = []
batch = ""
for sent in sentences:
if len(batch) + len(sent) < 380:
batch += " " + sent
else:
if batch.strip():
translated_parts.append(translator(batch.strip())[0]["translation_text"])
batch = sent
if batch.strip():
translated_parts.append(translator(batch.strip())[0]["translation_text"])
return " ".join(translated_parts)
def _split_into_sentences(text: str) -> list[str]:
"""Naive sentence splitter on punctuation."""
import re
parts = re.split(r"(?<=[.!?])\s+", text.strip())
return [p for p in parts if p]
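# e.g. _split_into_sentences("Hi there. How are you?") -> ["Hi there.", "How are you?"]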
def translate_segments(segments: list[dict], src_lang: str, tgt_lang: str) -> list[dict]:
"""Translate each segment independently to preserve timing mapping."""
if src_lang == tgt_lang:
return segments
src_nllb = NLLB_LANG.get(src_lang, "eng_Latn")
tgt_nllb = NLLB_LANG.get(tgt_lang, "eng_Latn")
translator = hf_pipeline(
"translation",
model="facebook/nllb-200-distilled-600M",
src_lang=src_nllb,
tgt_lang=tgt_nllb,
device=0 if torch.cuda.is_available() else -1,
max_length=512,
)
translated = []
for seg in segments:
txt = seg["text"].strip()
if not txt:
translated.append({**seg, "translated": txt})
continue
try:
result = translator(txt)[0]["translation_text"]
except Exception as e:
print(f"Warning: translation failed for segment '{txt}': {e}")
result = txt
translated.append({**seg, "translated": result})
# Free translator from memory
del translator
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
return translated
# ─── Step 3: TTS with time-stretching for sync ───────────────────────────
def stretch_audio_to_duration(audio_path: str, target_duration_s: float) -> AudioSegment:
"""
    Time-stretch (or compress) a WAV file so it fits `target_duration_s` seconds.
    Uses the librosa phase vocoder (pitch-preserving). The stretch ratio is
    clamped to [0.4, 3.5], so extreme mismatches fit only approximately.
"""
y, sr = librosa.load(audio_path, sr=None, mono=True)
current_duration = len(y) / sr
if current_duration <= 0 or target_duration_s <= 0:
return AudioSegment.from_wav(audio_path)
    rate = current_duration / target_duration_s  # >1 -> speed up (compress), <1 -> slow down
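    # e.g. 3.0 s of TTS into a 2.0 s window -> rate 1.5 (sped up);
    #      1.0 s into a 2.0 s window -> rate 0.5 (slowed down), then clamped below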
# Clamp: avoid extreme stretching that sounds bad
rate = max(0.4, min(rate, 3.5))
y_stretched = librosa.effects.time_stretch(y, rate=rate)
tmp = audio_path + "_stretched.wav"
sf.write(tmp, y_stretched, sr)
seg = AudioSegment.from_wav(tmp)
os.remove(tmp)
return seg
def generate_tts_segment(tts_model, text: str, speaker_wav: str, language: str, index: int) -> str:
"""Generate TTS for one text chunk. Returns path to WAV."""
out_path = f"tts_seg_{index}.wav"
tts_model.tts_to_file(
text=text,
speaker_wav=speaker_wav,
language=language,
file_path=out_path,
)
return out_path
def build_synchronized_tts_audio(
translated_segments: list[dict],
speaker_wav: str,
language: str,
video_duration_s: float,
) -> str:
"""
Core sync engine:
1. Generate TTS for each segment
2. Time-stretch to match original segment duration
3. Overlay at original timestamp position on a silent base track
4. Pad / trim to exactly match video_duration_s
Returns path to the final synchronized WAV.
"""
tts = get_tts_model()
total_ms = int(video_duration_s * 1000)
base_audio = AudioSegment.silent(duration=total_ms)
for i, seg in enumerate(translated_segments):
text = seg.get("translated", seg.get("text", "")).strip()
if not text:
continue
start_ms = int(seg["start"] * 1000)
end_ms = min(int(seg["end"] * 1000), total_ms)
target_ms = max(end_ms - start_ms, 200) # at least 200 ms
print(f" TTS segment {i+1}/{len(translated_segments)}: [{seg['start']:.1f}s-{seg['end']:.1f}s] [{text[:60]}]")
try:
wav_path = generate_tts_segment(tts, text, speaker_wav, language, i)
except Exception as e:
print(f" WARNING TTS failed for segment {i}: {e}")
continue
# Stretch to fit the original segment window
try:
tts_seg = stretch_audio_to_duration(wav_path, target_ms / 1000)
except Exception as e:
print(f" WARNING Stretch failed for segment {i}: {e}. Using raw TTS.")
tts_seg = AudioSegment.from_wav(wav_path)
# Trim if still too long after stretching
if len(tts_seg) > target_ms:
tts_seg = tts_seg[:target_ms]
# Normalise loudness of segment
tts_seg = pydub_effects.normalize(tts_seg)
base_audio = base_audio.overlay(tts_seg, position=start_ms)
os.remove(wav_path)
# Ensure exact video duration
if len(base_audio) < total_ms:
base_audio = base_audio + AudioSegment.silent(duration=total_ms - len(base_audio))
else:
base_audio = base_audio[:total_ms]
out_path = "synchronized_tts.wav"
base_audio.export(out_path, format="wav")
print(f"Synchronized TTS audio saved -> {out_path}")
return out_path
# ─── Audio mixing: TTS + instrumental ────────────────────────────────────
def mix_tts_with_instrumental(tts_path: str, instrumental_path: str, video_duration_s: float) -> str:
"""
Mix translated TTS (foreground) with the original instrumental (background).
Instrumental is ducked by 8 dB so speech is always intelligible.
Both tracks are padded/trimmed to exactly match video duration.
"""
total_ms = int(video_duration_s * 1000)
tts_audio = AudioSegment.from_wav(tts_path)
instr_audio = AudioSegment.from_wav(instrumental_path)
# Match length
def fit(seg, ms):
return (seg + AudioSegment.silent(duration=ms))[:ms] if len(seg) < ms else seg[:ms]
tts_audio = fit(tts_audio, total_ms)
instr_audio = fit(instr_audio, total_ms)
# Duck instrumental
    instr_audio = instr_audio - 8  # -8 dB duck
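    # (pydub subtraction is in dB: -8 dB is roughly a 0.4x cut in linear amplitude)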
mixed = instr_audio.overlay(tts_audio)
out_path = "mixed_audio.wav"
mixed.export(out_path, format="wav")
return out_path
# ─── Video helpers ────────────────────────────────────────────────────────
def extract_video_only(input_video: str, output_video: str) -> str:
"""Extract video stream (no audio). Try copy first, fallback re-encode."""
for cmd in [
[ffmpeg_path, "-y", "-i", input_video, "-an", "-c:v", "copy", output_video],
[ffmpeg_path, "-y", "-i", input_video, "-an", "-c:v", "libx264", "-preset", "veryfast", output_video],
]:
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return output_video
except subprocess.CalledProcessError:
continue
raise RuntimeError("Could not extract video stream from input.")
def merge_video_audio(video_path: str, audio_path: str, output_path: str) -> str:
"""Mux video + audio into final MP4."""
subprocess.run(
[
ffmpeg_path, "-y",
"-i", video_path,
"-i", audio_path,
"-c:v", "copy",
"-c:a", "aac", "-b:a", "192k",
"-map", "0:v:0", "-map", "1:a:0",
"-shortest",
output_path,
],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return output_path
def get_video_duration(video_path: str) -> float:
clip = VideoFileClip(video_path)
dur = clip.duration
clip.close()
return dur
# ─── Main pipeline ────────────────────────────────────────────────────────
def process_video(
input_video_path: str,
input_language: str = "en",
target_language: str = "en",
) -> tuple[str, str, str, str]:
"""
    Step 1 - returns:
video_only_path : video stream without audio
full_transcript : translated text with punctuation (editable)
instrumental_path: background music / FX track
vocal_path : original vocal track (used as speaker reference)
"""
# Clean up
for f in ["only_video.mp4", "only_audio.wav"]:
if os.path.exists(f):
os.remove(f)
# 1. Extract video stream
print("Extracting video stream …")
extract_video_only(input_video_path, "only_video.mp4")
# 2. Extract audio
print("Extracting audio …")
subprocess.run(
[ffmpeg_path, "-y", "-i", input_video_path,
"-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "only_audio.wav"],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
# 3. Separate vocals / instrumental with Demucs
print("Separating vocals …")
vocal_path, instrumental_path = separate_vocals("only_audio.wav")
# 4. Transcribe with Whisper (full audio, any length, with punctuation)
segments, full_text = transcribe_audio(vocal_path, input_language)
# 5. Translate
if input_language != target_language:
full_text_translated = translate_text_nllb(full_text, input_language, target_language)
else:
full_text_translated = full_text
return "only_video.mp4", full_text_translated, instrumental_path, vocal_path
def generate_final_output(
edited_text: str,
video_path: str,
instrumental_path: str,
accent: str,
speaker_reference: str,
input_language: str = "en",
) -> str:
"""
    Step 2 - generates the final dubbed video:
- Re-transcribes with timestamps to get segment boundaries
- Translates each segment
- Generates TTS per segment and time-stretches to original duration
- Mixes TTS with instrumental track
- Muxes into final MP4
Returns path to the final video.
"""
video_duration = get_video_duration(video_path)
print(f"Video duration: {video_duration:.2f}s")
# Get timestamp-aligned segments from the original vocal audio
print("Re-transcribing for timestamp-aligned segments …")
segments, _ = transcribe_audio(speaker_reference, input_language)
# Translate each segment individually to preserve time alignment
print("Translating segments …")
translated_segs = translate_segments(segments, input_language, accent)
# If user edited the full text, rebuild segments proportionally
# (use edited text if it differs significantly from auto-translation)
auto_full = " ".join(s.get("translated", s["text"]) for s in translated_segs)
if edited_text.strip() and _text_similarity(edited_text.strip(), auto_full) < 0.8:
print("Using user-edited text β€” distributing across segments …")
translated_segs = _redistribute_text(edited_text, segments)
# Build synchronized TTS audio
print("Generating synchronized TTS audio …")
tts_path = build_synchronized_tts_audio(translated_segs, speaker_reference, accent, video_duration)
# Mix TTS with instrumental
print("Mixing TTS + instrumental …")
mixed_path = mix_tts_with_instrumental(tts_path, instrumental_path, video_duration)
# Mux into final video
print("Merging video + audio …")
final_path = merge_video_audio(video_path, mixed_path, "Final_output.mp4")
# Cleanup temp files
for f in [tts_path, mixed_path]:
if os.path.exists(f):
os.remove(f)
print(f"Done! Final video -> {final_path}")
return final_path
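# Example end-to-end call (hypothetical file names), bypassing the Gradio UI:
#   video_only, text, instr, vocal = process_video("input.mp4", "en", "es")
#   final = generate_final_output(text, video_only, instr, "es", vocal, "en")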
# ─── Text utilities ──────────────────────────────────────────────────────
def _text_similarity(a: str, b: str) -> float:
"""Very fast bag-of-words similarity to detect if user edited the text."""
sa, sb = set(a.lower().split()), set(b.lower().split())
if not sa or not sb:
return 0.0
return len(sa & sb) / len(sa | sb)
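# e.g. _text_similarity("the red cat", "the blue cat") -> 2 shared / 4 total words = 0.5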
def _redistribute_text(full_text: str, segments: list[dict]) -> list[dict]:
"""
When user edits the translated text, distribute words proportionally
across the original timestamp segments.
"""
words = full_text.split()
total_words = len(words)
durations = [max(seg["end"] - seg["start"], 0.1) for seg in segments]
total_dur = sum(durations)
result, cursor = [], 0
for i, seg in enumerate(segments):
fraction = durations[i] / total_dur
count = max(1, round(fraction * total_words))
chunk = " ".join(words[cursor : cursor + count])
cursor += count
result.append({**seg, "translated": chunk})
# Append any remaining words to the last segment
if cursor < total_words and result:
result[-1]["translated"] += " " + " ".join(words[cursor:])
return result
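# e.g. 10 words over segments lasting 2 s and 3 s -> 4 words to the first, 6 to the second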
# ─── Gradio UI ────────────────────────────────────────────────────────────
LANG_CHOICES = SUPPORTED_LANGS
with gr.Blocks(title="🎬 Video Language Dubbing Tool") as demo:
gr.Markdown(
"# 🎬 Real-Time Video Language Dubbing\n"
"Upload a video β†’ extract vocals β†’ transcribe (Whisper) β†’ translate (NLLB-200) "
"β†’ generate voice-cloned TTS per segment β†’ synchronise frame-perfectly β†’ output dubbed video."
)
# ── State ────────────────────────────────────────────────────────────
    seg_state = gr.State([])  # reserved for translated segments in step 2 (not yet wired to the callbacks)
# ── Step 1 ───────────────────────────────────────────────────────────
gr.Markdown("## Step 1: Upload & Process")
with gr.Row():
video_input = gr.Video(label="Input Video")
input_lang = gr.Dropdown(LANG_CHOICES, label="Source Language", value="en")
target_lang = gr.Dropdown(LANG_CHOICES, label="Target Language", value="en")
accent = gr.Dropdown(LANG_CHOICES, label="TTS Language / Accent", value="en")
    process_btn = gr.Button("Process Video (Step 1)", variant="primary")
with gr.Row():
instrumental_audio = gr.Audio(label="Extracted Instrumental Track", type="filepath")
speaker_ref_audio = gr.Audio(label="Extracted Vocal Track (speaker reference)", type="filepath")
translated_text_box = gr.Textbox(
label="πŸ“ Translated Transcript (editable β€” fix any errors before Step 2)",
lines=12,
placeholder="Transcribed & translated text will appear here …",
)
# ── Step 2 ───────────────────────────────────────────────────────────
gr.Markdown(
"## Step 2: Generate Dubbed Video\n"
"Review / edit the transcript above, then click Generate."
)
    generate_btn = gr.Button("🎙 Generate Dubbed Video (Step 2)", variant="primary")
final_video_output = gr.Video(label="🎬 Final Dubbed Video")
status_text = gr.Textbox(label="Status", interactive=False)
# ── Callbacks ────────────────────────────────────────────────────────
def step1(video_file, in_lang, tgt_lang, acc):
try:
video_path, translated_text, instr_path, vocal_path = process_video(
video_file, in_lang, tgt_lang
)
            return translated_text, instr_path, vocal_path, "✅ Step 1 complete. Review transcript and click Step 2."
except Exception as e:
return f"[ERROR] {e}", None, None, f"ERROR Error: {e}"
def step2(edited_text, instr_path, acc, speaker_ref, in_lang):
try:
final_video = generate_final_output(
edited_text, "only_video.mp4", instr_path, acc, speaker_ref, in_lang
)
            return final_video, "✅ Done! Your dubbed video is ready."
except Exception as e:
            return None, f"Error: {e}"
process_btn.click(
fn=step1,
inputs=[video_input, input_lang, target_lang, accent],
outputs=[translated_text_box, instrumental_audio, speaker_ref_audio, status_text],
)
generate_btn.click(
fn=step2,
inputs=[translated_text_box, instrumental_audio, accent, speaker_ref_audio, input_lang],
outputs=[final_video_output, status_text],
)
# ── Optional: Chat tab ───────────────────────────────────────────────
    with gr.Tab("💬 AI Assistant"):
gr.ChatInterface(
respond,
additional_inputs=[
gr.Textbox(value="You are a helpful assistant.", label="System Message"),
gr.Slider(64, 2048, value=512, label="Max Tokens"),
gr.Slider(0.1, 2.0, value=0.7, label="Temperature"),
gr.Slider(0.1, 1.0, value=0.9, label="Top-p"),
],
)
demo.launch()