# Video_Dub / app.py
# 7RiKuSama's picture
# Update app.py
# 6ee3af9 verified
"""
AutoDub — Video Dubbing & Subtitling (Direct Upload)
Gradio app for Hugging Face Spaces
Pipeline:
1. Upload video directly
2. Extract audio from video
3. Transcribe with faster-whisper (Whisper large-v3)
4. Translate with Qwen2.5-7B-Instruct (4-bit)
5. Synthesise dubbed audio with XTTS-v2
6. Merge audio/video with FFmpeg
7. Burn subtitles with FFmpeg
"""
from __future__ import annotations
# ── Standard library ─────────────────────────────────────────────────────────
import gc
import glob
import os
import re
import subprocess
import tempfile
import traceback
from difflib import SequenceMatcher
from pathlib import Path
import uuid
# ── Third-party ───────────────────────────────────────────────────────────────
import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import torch
from faster_whisper import WhisperModel
from transformers import BitsAndBytesConfig, pipeline
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.manage import ModelManager
# ── ZeroGPU (optional) ───────────────────────────────────────────────────────
try:
    import spaces
    HAS_ZEROGPU = True
except Exception:
    # Deliberately broad: any failure while importing the optional package
    # falls back to the local stub below.
    HAS_ZEROGPU = False

    class spaces:
        """No-op stand-in used when the optional ``spaces`` package is unavailable."""

        @staticmethod
        def GPU(fn=None, **_options):
            """Return the decorated function unchanged.

            Works both as a bare decorator (``@spaces.GPU``) and as a
            decorator factory (``@spaces.GPU(duration=...)``); any keyword
            options are accepted and ignored.
            """
            if callable(fn):
                return fn
            return lambda wrapped: wrapped
# ── Environment ───────────────────────────────────────────────────────────────
# Exported at import time, before any TTS model is loaded.
# NOTE(review): presumably pre-accepts the Coqui model licence prompt so the
# XTTS download does not block — confirm against the TTS documentation.
os.environ["COQUI_TOS_AGREED"] = "1"
# Use /data when it is writable; otherwise fall back to the system temp dir.
_DATA_ROOT = Path("/data") if os.access("/data", os.W_OK) else Path(tempfile.gettempdir())
# Per-session intermediate and final files are written here.
WORK_DIR = _DATA_ROOT / "autodub"
WORK_DIR.mkdir(parents=True, exist_ok=True)
# Directory handed to the XTTS model manager and exported as TTS_HOME.
XTTS_CACHE = _DATA_ROOT / "tts_cache"
XTTS_CACHE.mkdir(parents=True, exist_ok=True)
os.environ["TTS_HOME"] = str(XTTS_CACHE)
# Languages offered in the UI dropdown.
SUPPORTED_LANGUAGES = [
    "Arabic", "French", "Spanish", "German", "Italian",
    "Portuguese", "Russian", "Chinese", "Japanese", "Korean",
    "English", "Hindi", "Turkish", "Polish", "Dutch",
]
# Lower-cased language name -> language code passed to XTTS inference.
LANG_TO_XTTS = {
    "arabic": "ar", "french": "fr", "spanish": "es",
    "german": "de", "italian": "it", "portuguese": "pt",
    "russian": "ru", "chinese": "zh-cn", "japanese": "ja",
    "korean": "ko", "english": "en", "hindi": "hi",
    "turkish": "tr", "polish": "pl", "dutch": "nl",
}
# Lower-cased language name -> subtitle font substituted by burn_subtitles
# when the caller keeps the default "Arial".
_UNICODE_FONT_MAP = {
    "arabic": "Noto Naskh Arabic", "chinese": "Noto Sans CJK SC",
    "japanese": "Noto Sans CJK JP", "korean": "Noto Sans CJK KR",
    "hindi": "Noto Sans Devanagari", "russian": "Noto Sans",
}
# Target languages for which _translate_block strips Latin-alphabet words
# from the translation.
_NON_LATIN_LANGS = {
    "arabic", "chinese", "japanese", "korean", "russian", "hindi",
    "thai", "greek", "persian", "farsi", "georgian", "armenian",
    "hebrew", "urdu", "bengali",
}
# Regexes matched (case-insensitively) by _clean_translation to detect
# chat-template tokens or assistant preamble in a translation.
_LEAK_PATTERNS = [
    r"<\|im_start\|>", r"<\|im_end\|>",
    r"^(Sure|Of course|Here is|Certainly|I will|Translation:)",
]
# ── Global model handles ─────────────────────────────────────────────────────
# Lazily-populated singletons; each stays None until its _load_* function runs.
_whisper_model: WhisperModel | None = None
_translator = None
_xtts_model: Xtts | None = None
# ═══════════════════════════════════════════════════════════════════════════════
# MODEL LOADING
# ═══════════════════════════════════════════════════════════════════════════════
def _load_whisper():
    """Create the shared Whisper large-v3 model on first call; no-op afterwards.

    Uses CUDA with float16 when a GPU is available, otherwise CPU with int8.
    """
    global _whisper_model
    if _whisper_model is None:
        on_gpu = torch.cuda.is_available()
        device = "cuda" if on_gpu else "cpu"
        print(f"[Whisper] Loading large-v3 on {device}…")
        precision = "float16" if device == "cuda" else "int8"
        cache_dir = _DATA_ROOT / "whisper_cache"
        _whisper_model = WhisperModel(
            "large-v3",
            device=device,
            compute_type=precision,
            download_root=str(cache_dir),
        )
        print("[Whisper] Ready.")
def _load_translator():
    """Create the shared Qwen2.5-7B-Instruct text-generation pipeline once.

    The model is loaded with 4-bit NF4 quantisation (double quantisation,
    float16 compute) and ``device_map="auto"``.
    """
    global _translator
    if _translator is None:
        print("[Translator] Loading Qwen2.5-7B-Instruct (4-bit)…")
        quantization = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
        )
        pipeline_options = {
            "model": "Qwen/Qwen2.5-7B-Instruct",
            "model_kwargs": {"quantization_config": quantization},
            "device_map": "auto",
            "max_new_tokens": 512,
        }
        _translator = pipeline("text-generation", **pipeline_options)
        print("[Translator] Ready.")
def _load_xtts():
    """Download (if needed) and load the shared XTTS-v2 model once.

    The checkpoint directory is derived from the model name under
    ``XTTS_CACHE``. The model is moved to the GPU when CUDA is available;
    a ``RuntimeError`` during that move leaves it on the CPU.
    """
    global _xtts_model
    if _xtts_model is not None:
        return

    print("[XTTS-v2] Loading model…")
    model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
    ModelManager(output_prefix=str(XTTS_CACHE)).download_model(model_name)

    checkpoint_dir = str(XTTS_CACHE / model_name.replace("/", "--"))
    config = XttsConfig()
    config.load_json(os.path.join(checkpoint_dir, "config.json"))

    _xtts_model = Xtts.init_from_config(config)
    _xtts_model.load_checkpoint(config, checkpoint_dir=checkpoint_dir)

    on_gpu = False
    if torch.cuda.is_available():
        try:
            _xtts_model.cuda()
            on_gpu = True
        except RuntimeError:
            on_gpu = False
    print("[XTTS-v2] Ready on GPU." if on_gpu else "[XTTS-v2] Ready on CPU.")
# ═══════════════════════════════════════════════════════════════════════════════
# SRT HELPERS
# ═══════════════════════════════════════════════════════════════════════════════
def _is_near_duplicate(text: str, seen: list, threshold: float = 0.92) -> bool:
for prev in seen[-5:]:
if SequenceMatcher(None, text.lower(), prev.lower()).ratio() >= threshold:
return True
return False
def _is_word_loop(text: str) -> bool:
words = text.split()
if len(words) < 4:
return False
if len(set(w.lower() for w in words)) / len(words) < 0.35:
return True
return False
def _split_into_sentences(text: str, max_chars: int = 150) -> list[str]:
sentences = re.split(r"(?<=[.!?])\s+", text.strip())
result, current = [], ""
for s in sentences:
if len(current) + len(s) <= max_chars:
current += (" " + s) if current else s
else:
if current:
result.append(current.strip())
current = s
if current:
result.append(current.strip())
return result
def _format_time(seconds: float) -> str:
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
ms = min(int(round((seconds - int(seconds)) * 1000)), 999)
return f"{h:02}:{m:02}:{s:02},{ms:03}"
def chunks_to_srt(chunks: list[dict]) -> str:
    """Render transcription chunks as SRT text, dropping suspicious segments.

    A chunk is skipped when its text is shorter than three characters, is an
    exact or near duplicate of earlier text, looks like a word loop, or has
    more than 50 characters per second. Chunks whose text splits into several
    sentence groups become one cue per group, with the chunk duration shared
    in proportion to word counts and each cue lasting at least 0.5 s.
    """
    cues: list[str] = []
    exact_seen: set[str] = set()
    recent_texts: list[str] = []

    def emit(begin, finish, body):
        # Cue numbers are consecutive, so the next index is the count so far + 1.
        cues.append(f"{len(cues) + 1}\n{_format_time(begin)} --> {_format_time(finish)}\n{body}")

    for chunk in chunks:
        start = chunk.get("start", 0)
        end = chunk.get("end", start + 5)
        if end is None:
            end = start + 5

        text = chunk["text"].strip()
        if len(text) < 3:
            continue
        if text in exact_seen or _is_near_duplicate(text, recent_texts):
            continue
        if _is_word_loop(text):
            continue

        duration = max(end - start, 0.1)
        if len(text) / duration > 50:
            continue

        exact_seen.add(text)
        recent_texts.append(text)

        sentences = _split_into_sentences(text)
        if len(sentences) <= 1:
            emit(start, end, text)
            continue

        word_counts = [len(sentence.split()) for sentence in sentences]
        total_words = max(sum(word_counts), 1)
        cursor = start
        for sentence, n_words in zip(sentences, word_counts):
            share = n_words / total_words
            boundary = min(round(cursor + duration * share, 3), end)
            boundary = max(boundary, cursor + 0.5)
            emit(cursor, boundary, sentence)
            cursor = boundary

    return "\n\n".join(cues)
def _parse_srt_time(time_str: str) -> float:
h, m, s_ms = time_str.split(":")
s, ms = s_ms.split(",")
return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000
def parse_srt(srt_content: str) -> list[dict]:
parsed = []
for block in srt_content.strip().split("\n\n"):
lines = block.strip().split("\n")
if len(lines) < 3:
continue
times = lines[1].split(" --> ")
parsed.append({
"start": _parse_srt_time(times[0]),
"end": _parse_srt_time(times[1]),
"text": "\n".join(lines[2:]),
})
return parsed
# ═══════════════════════════════════════════════════════════════════════════════
# AUDIO/VIDEO EXTRACTION
# ═══════════════════════════════════════════════════════════════════════════════
def extract_audio_from_video(video_path: str, output_path: str, sample_rate: int = 16000) -> str:
    """Write the video's audio as mono 16-bit PCM WAV at *sample_rate* and return *output_path*.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    ffmpeg_args = ["ffmpeg", "-y"]
    ffmpeg_args += ["-i", video_path]
    ffmpeg_args += ["-ac", "1", "-ar", str(sample_rate)]
    ffmpeg_args += ["-c:a", "pcm_s16le", output_path]
    subprocess.run(ffmpeg_args, capture_output=True, check=True)
    return output_path
def extract_voice_sample_from_video(video_path: str, output_path: str, duration: float = 12.0) -> str:
    """Extract a voice sample for XTTS cloning and return *output_path*.

    The video's audio is decoded to a temporary 22.05 kHz mono WAV, of which
    at most the first 180 s are loaded. Windows of *duration* seconds are
    scanned starting at 3 s in steps of 15 s; the first window with
    RMS >= 0.01 is used, otherwise the loudest scanned window. If no window
    fits in the loaded audio, the first *duration* seconds are used.

    The temporary WAV is removed even when decoding, analysis or writing
    fails.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    temp_audio = str(WORK_DIR / f"temp_voice_{uuid.uuid4().hex[:6]}.wav")
    cmd = [
        "ffmpeg", "-y", "-i", video_path,
        "-ac", "1", "-ar", "22050",
        "-c:a", "pcm_s16le", temp_audio,
    ]
    try:
        subprocess.run(cmd, capture_output=True, check=True)
        audio, sr = librosa.load(temp_audio, sr=22050, duration=180.0)
        target_samples = int(duration * sr)
        loaded_seconds = len(audio) / sr

        best_audio, best_rms = None, -1.0
        offset = 3.0
        while offset + duration <= loaded_seconds:
            start = int(offset * sr)
            seg = audio[start:start + target_samples]
            rms = float(np.sqrt(np.mean(seg ** 2)))
            if rms > best_rms:
                best_rms, best_audio = rms, seg
            if rms >= 0.01:
                # Loud enough: stop at the first acceptable window.
                break
            offset += 15.0

        if best_audio is None:
            # Clip too short for any scan window: take it from the start.
            best_audio = audio[:target_samples]
        sf.write(output_path, best_audio, sr)
    finally:
        # ffmpeg may have failed before creating the file, hence the check.
        if os.path.exists(temp_audio):
            os.remove(temp_audio)
    return output_path
# ═══════════════════════════════════════════════════════════════════════════════
# TRANSCRIPTION
# ═══════════════════════════════════════════════════════════════════════════════
@spaces.GPU
def transcribe_audio(audio_path: str) -> list[dict]:
    """Transcribe *audio_path* with Whisper and return timed text chunks.

    Each chunk is ``{"start", "end", "text"}`` with times rounded to
    milliseconds and text stripped. Transcription uses beam size 5, word
    timestamps, and VAD filtering with a 500 ms minimum silence.
    """
    _load_whisper()
    print("[Whisper] Transcribing…")
    vad_options = {"min_silence_duration_ms": 500}
    segments, info = _whisper_model.transcribe(
        audio_path,
        beam_size=5,
        word_timestamps=True,
        vad_filter=True,
        vad_parameters=vad_options,
    )

    chunks = []
    for segment in segments:
        chunks.append({
            "start": round(segment.start, 3),
            "end": round(segment.end, 3),
            "text": segment.text.strip(),
        })

    # Release memory before the next pipeline stage loads its model.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    print(f"[Whisper] {len(chunks)} segments | language: {info.language}")
    return chunks
# ═══════════════════════════════════════════════════════════════════════════════
# TRANSLATION
# ═══════════════════════════════════════════════════════════════════════════════
def _clean_translation(raw: str, source_text: str) -> str:
lines = [l.strip() for l in raw.strip().splitlines() if l.strip()]
if not lines:
return source_text
result = re.sub(r"<\|[^|]+\|>", "", lines[0]).strip()
if result.lower() == source_text.lower():
result = lines[1] if len(lines) > 1 else source_text
for pat in _LEAK_PATTERNS:
if re.search(pat, result, re.IGNORECASE):
result = lines[1] if len(lines) > 1 else source_text
break
return result.strip()
def _strip_latin_tokens(text: str) -> str:
cleaned = re.sub(r"\b[a-zA-Z]+\b", "", text)
return re.sub(r"[ \t]{2,}", " ", cleaned).strip()
def _translate_block(text: str, target_lang: str, duration: float) -> str:
_load_translator()
messages = [
{
"role": "system",
"content": (
f"You are an expert dubbing translator. Translate into {target_lang}. "
f"TARGET DURATION: {duration:.2f}s. Condense if needed. "
"Output ONLY the translated text β€” no preamble, no quotes."
),
},
{
"role": "user",
"content": f'Original ({duration:.2f}s): "{text}"\nTranslate to {target_lang}:',
},
]
tokenizer = _translator.tokenizer
prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
result = _translator(
prompt,
do_sample=False,
max_new_tokens=150,
repetition_penalty=1.1,
return_full_text=False,
)
raw = result[0]["generated_text"].strip()
cleaned = _clean_translation(raw, text)
if target_lang.strip().lower() in _NON_LATIN_LANGS:
stripped = _strip_latin_tokens(cleaned)
return stripped if len(stripped) > 2 else cleaned
return cleaned
def _srt_clock_to_seconds(clock: str) -> float:
    """Convert ``HH:MM:SS,mmm`` (``.`` also accepted) to seconds; raises ValueError if malformed."""
    h, m, s = clock.replace(",", ".").split(":")
    return int(h) * 3600 + int(m) * 60 + float(s)


def _srt_cue_duration(timestamp: str, default: float = 3.0) -> float:
    """Return the length in seconds of a ``start --> end`` line, or *default* if unusable."""
    try:
        start_str, end_str = timestamp.split(" --> ")
        duration = _srt_clock_to_seconds(end_str) - _srt_clock_to_seconds(start_str)
    except ValueError:
        return default
    # A zero or negative length would be a meaningless target for the prompt.
    return duration if duration > 0 else default


def translate_srt(srt: str, target_lang: str) -> str:
    """Translate the text of every cue in *srt* and return the new SRT text.

    Cue index and timestamp lines are kept verbatim. Each cue's duration is
    passed to ``_translate_block``; when the timestamp line cannot be parsed
    or yields a non-positive length, 3.0 s is assumed. Blocks with fewer than
    three lines are passed through untouched.
    """
    translated_blocks = []
    blocks = srt.strip().split("\n\n")
    for i, block in enumerate(blocks):
        lines = block.split("\n")
        if len(lines) < 3:
            translated_blocks.append(block)
            continue
        index_line, timestamp = lines[0], lines[1]
        text = "\n".join(lines[2:])
        duration = _srt_cue_duration(timestamp)
        translated_text = _translate_block(text, target_lang, duration)
        translated_blocks.append(f"{index_line}\n{timestamp}\n{translated_text}")
        if (i + 1) % 10 == 0:
            print(f"  Translated {i + 1}/{len(blocks)} blocks…")
    print(f"[Translator] Done — {len(blocks)} blocks.")
    return "\n\n".join(translated_blocks)
# ═══════════════════════════════════════════════════════════════════════════════
# TTS
# ═══════════════════════════════════════════════════════════════════════════════
@spaces.GPU
def generate_tts_audio(srt_content: str, target_lang: str, speaker_wav: str, output_path: str) -> str:
    """Synthesise the cues of *srt_content* with XTTS into one WAV and return *output_path*.

    Cues are skipped when their text repeats an earlier cue, is shorter than
    10 characters, or lasts less than 0.5 s. Each remaining cue is synthesised
    with the voice of *speaker_wav*, trimmed, sped up by at most 1.4x when it
    overruns its slot, and written at its start time or directly after the
    previous clip, whichever is later. With no usable cues, 24000 zero
    samples are written at 24 kHz.
    """
    _load_xtts()
    lang_code = LANG_TO_XTTS.get(target_lang.lower(), "en")

    # Filter out duplicate, too-short and too-brief cues.
    spoken: set[str] = set()
    valid_chunks = []
    for cue in parse_srt(srt_content):
        body = cue["text"].strip()
        if body in spoken or (cue["end"] - cue["start"]) < 0.5 or len(body) < 10:
            continue
        spoken.add(body)
        valid_chunks.append(cue)

    if not valid_chunks:
        sf.write(output_path, np.zeros(24000, dtype=np.float32), 24000)
        return output_path

    sample_rate = 24000
    gpt_cond_latent, speaker_embedding = _xtts_model.get_conditioning_latents(audio_path=[speaker_wav])

    total_duration = valid_chunks[-1]["end"] + 5.0
    output_buffer = np.zeros(int(total_duration * sample_rate), dtype=np.float32)
    write_cursor = 0
    total = len(valid_chunks)

    sampling_options = {
        "temperature": 0.75,
        "length_penalty": 1.0,
        "repetition_penalty": 5.0,
        "top_k": 50,
        "top_p": 0.85,
    }

    for position, chunk in enumerate(valid_chunks, start=1):
        print(f"[XTTS] Generating [{position}/{total}]: \"{chunk['text'][:50]}…\"")
        out = _xtts_model.inference(
            text=chunk["text"],
            language=lang_code,
            gpt_cond_latent=gpt_cond_latent,
            speaker_embedding=speaker_embedding,
            **sampling_options,
        )
        wav = np.array(out["wav"], dtype=np.float32)
        wav, _ = librosa.effects.trim(wav, top_db=20)

        slot_seconds = chunk["end"] - chunk["start"]
        clip_seconds = len(wav) / sample_rate
        if clip_seconds > slot_seconds:
            speed_factor = min(clip_seconds / slot_seconds, 1.4)
            # Speed-ups of 5% or less are not applied.
            if speed_factor > 1.05:
                wav = librosa.effects.time_stretch(wav, rate=speed_factor)

        # Never start before the previous clip has finished.
        start_pos = max(int(chunk["start"] * sample_rate), write_cursor)
        end_pos = start_pos + len(wav)
        overflow = end_pos - len(output_buffer)
        if overflow > 0:
            output_buffer = np.pad(output_buffer, (0, overflow))
        output_buffer[start_pos:end_pos] = wav
        write_cursor = end_pos

    sf.write(output_path, output_buffer, sample_rate)
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return output_path
# ═══════════════════════════════════════════════════════════════════════════════
# FFMPEG HELPERS
# ═══════════════════════════════════════════════════════════════════════════════
def _has_nvenc() -> bool:
result = subprocess.run(["ffmpeg", "-hide_banner", "-encoders"], capture_output=True, text=True)
return "h264_nvenc" in result.stdout
def merge_audio_video(video_path: str, dubbed_audio_path: str, output_path: str) -> str:
    """Mix the dubbed audio over the video's original audio and return *output_path*.

    The original track is attenuated to 15% volume and mixed with the dub via
    ``amix`` (length of the first input). The video stream is copied without
    re-encoding; audio is encoded as AAC at 192 kbit/s.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-i", dubbed_audio_path,
        "-filter_complex", "[0:a]volume=0.15[orig];[orig][1:a]amix=inputs=2:duration=first[aout]",
        "-map", "0:v",
        "-map", "[aout]",
        "-c:v", "copy",
        "-c:a", "aac",
        "-b:a", "192k",
        "-shortest",
        output_path,
    ]
    subprocess.run(cmd, capture_output=True, check=True)
    print(f"[FFmpeg] Merge done → {output_path}")
    return output_path
def _color_to_ass(name: str) -> str:
return {
"white": "&H00FFFFFF", "black": "&H00000000",
"red": "&H000000FF", "blue": "&H00FF0000",
"yellow": "&H0000FFFF", "green": "&H0000FF00",
}.get(name.lower(), "&H00FFFFFF")
def burn_subtitles(
    video_path: str,
    srt_path: str,
    output_path: str,
    target_lang: str | None = None,
    font_name: str = "Arial",
    font_size: int = 24,
) -> str:
    """Render the subtitles of *srt_path* into the video and return *output_path*.

    When *target_lang* is given and *font_name* is left at ``"Arial"``, the
    font is taken from ``_UNICODE_FONT_MAP`` (falling back to ``"Noto Sans"``).
    Subtitles are white with a black outline. Video is re-encoded with
    ``h264_nvenc`` when ffmpeg offers it, otherwise ``libx264``; audio is
    copied.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    effective_font = font_name
    if target_lang and font_name == "Arial":
        effective_font = _UNICODE_FONT_MAP.get(target_lang.lower(), "Noto Sans")
        print(f"[FFmpeg] Subtitle font → \"{effective_font}\"")

    fc = _color_to_ass("white")
    oc = _color_to_ass("black")
    style = (
        f"FontName={effective_font},FontSize={font_size},"
        f"PrimaryColour={fc},OutlineColour={oc},"
        f"Outline=2,Alignment=2,Charset=1"
    )
    # Escape quotes and colons so the path survives the filter-graph syntax.
    escaped = srt_path.replace("'", "'\\''").replace(":", "\\:")

    # Probe once: each probe spawns an ffmpeg process.
    use_nvenc = _has_nvenc()
    if use_nvenc:
        encoder_name = "h264_nvenc"
        video_codec = ["-c:v", "h264_nvenc", "-preset", "p2", "-cq", "23"]
    else:
        encoder_name = "libx264"
        video_codec = ["-c:v", "libx264", "-preset", "fast", "-crf", "23"]
    print(f"[FFmpeg] Burning subtitles using {encoder_name}")

    cmd = [
        "ffmpeg", "-y",
        "-sub_charenc", "UTF-8",
        "-i", video_path,
        "-vf", f"subtitles='{escaped}':force_style='{style}'",
        *video_codec,
        "-c:a", "copy",
        output_path,
    ]
    subprocess.run(cmd, capture_output=True, check=True)
    print(f"[FFmpeg] Done → {output_path}")
    return output_path
# ═══════════════════════════════════════════════════════════════════════════════
# CLEANUP
# ═══════════════════════════════════════════════════════════════════════════════
def _cleanup(session_id: str, keep_final: bool = False) -> None:
    """Delete a session's intermediate files from ``WORK_DIR``.

    The final video is deleted as well unless *keep_final* is true. Files
    that cannot be removed are silently left in place (best effort).
    """
    suffixes = ["audio.wav", "voice.wav", "dubbed.wav", "merged.mp4", "translated.srt"]
    if not keep_final:
        suffixes.append("final.mp4")

    for suffix in suffixes:
        pattern = str(WORK_DIR / f"{session_id}_{suffix}")
        for leftover in glob.glob(pattern):
            try:
                os.remove(leftover)
            except OSError:
                pass
# ═══════════════════════════════════════════════════════════════════════════════
# GRADIO PIPELINE
# ═══════════════════════════════════════════════════════════════════════════════
def run_pipeline(video_file, dub_language: str, progress=gr.Progress()):
    """Run the full dubbing pipeline for one uploaded video.

    Steps: extract audio and a voice sample, transcribe, translate, synthesise
    the dub, mix it into the video, and burn in the translated subtitles.

    Args:
        video_file: path string, or an object whose ``name`` is the path.
        dub_language: target language name.
        progress: Gradio progress tracker.

    Returns:
        ``(status_markdown, final_video_path)``; the path is ``None`` on
        failure. Errors are reported in the status text rather than raised,
        and the session's files are cleaned up.
    """
    session_id = None
    try:
        if video_file is None:
            return "❌ Please upload a video file.", None

        # Handle both string path and file object
        if isinstance(video_file, str):
            input_video = video_file
        else:
            input_video = video_file.name
        if not os.path.exists(input_video):
            return f"❌ Video file not found: {input_video}", None

        session_id = str(uuid.uuid4())[:8]

        progress(0.05, desc="Extracting audio from video…")
        audio_path = str(WORK_DIR / f"{session_id}_audio.wav")
        extract_audio_from_video(input_video, audio_path)

        progress(0.10, desc="Extracting voice sample…")
        voice_path = str(WORK_DIR / f"{session_id}_voice.wav")
        extract_voice_sample_from_video(input_video, voice_path)

        progress(0.20, desc="Transcribing with Whisper…")
        chunks = transcribe_audio(audio_path)
        srt_content = chunks_to_srt(chunks)

        progress(0.40, desc=f"Translating to {dub_language}…")
        translated_srt = translate_srt(srt_content, target_lang=dub_language)
        srt_path = str(WORK_DIR / f"{session_id}_translated.srt")
        Path(srt_path).write_text(translated_srt, encoding="utf-8")

        progress(0.60, desc="Synthesising dubbed audio…")
        dubbed_audio_path = str(WORK_DIR / f"{session_id}_dubbed.wav")
        generate_tts_audio(translated_srt, dub_language, voice_path, dubbed_audio_path)

        progress(0.80, desc="Merging audio + video…")
        merged_path = str(WORK_DIR / f"{session_id}_merged.mp4")
        merge_audio_video(input_video, dubbed_audio_path, merged_path)

        progress(0.90, desc="Burning subtitles…")
        final_path = str(WORK_DIR / f"{session_id}_final.mp4")
        burn_subtitles(merged_path, srt_path, final_path, target_lang=dub_language)

        progress(1.0, desc="Done!")
        # Clean up intermediate files but keep final video
        _cleanup(session_id, keep_final=True)
        return f"✅ Done! Video dubbed to **{dub_language}** successfully.", final_path
    except subprocess.CalledProcessError as e:
        traceback.print_exc()
        if session_id:
            _cleanup(session_id)
        if isinstance(e.stderr, bytes):
            # ffmpeg output is not guaranteed to be valid UTF-8.
            details = e.stderr.decode("utf-8", errors="replace")
        else:
            details = e.stderr
        if not details:
            details = str(e)
        # The actual failure is at the end of ffmpeg's output; the start is
        # banner and stream info, so show only the tail in the UI.
        return f"❌ FFmpeg error: {details[-2000:]}", None
    except Exception as exc:
        traceback.print_exc()
        if session_id:
            _cleanup(session_id)
        return f"❌ Error: {exc}", None
# ═══════════════════════════════════════════════════════════════════════════════
# GRADIO UI
# ═══════════════════════════════════════════════════════════════════════════════
# UI layout: upload + language + button on the left, status + result on the right.
with gr.Blocks(title="AutoDub - Video Dubbing") as demo:
    gr.Markdown(
        "\n"
        "# 🎬 AutoDub — Video Dubbing & Subtitling\n"
        "Upload a video, choose a target language, and get back a fully dubbed & subtitled MP4.\n"
        "**Powered by:** Whisper large-v3 + Qwen2.5-7B + XTTS-v2\n"
    )
    with gr.Row():
        with gr.Column(scale=2):
            video_input = gr.Video(
                label="Upload Video",
                sources=["upload"],
            )
            lang_input = gr.Dropdown(
                choices=SUPPORTED_LANGUAGES,
                value="French",
                label="Target Dub Language",
            )
            submit_btn = gr.Button("▶ Start Dubbing", variant="primary", size="lg")
        with gr.Column(scale=3):
            status_output = gr.Markdown("_Upload a video and click Start Dubbing…_")
            video_output = gr.Video(label="Dubbed Video", interactive=False)
    submit_btn.click(
        fn=run_pipeline,
        inputs=[video_input, lang_input],
        outputs=[status_output, video_output],
    )
    # The language list is derived from SUPPORTED_LANGUAGES so the tips can
    # never drift from the dropdown choices.
    gr.Markdown(
        "---\n"
        "**Tips:**\n"
        "- Processing takes 3–10 minutes depending on video length\n"
        "- Best results with videos under 5 minutes\n"
        "- Supported languages: " + ", ".join(SUPPORTED_LANGUAGES)
    )
if __name__ == "__main__":
    # Limit the request queue to five entries, then serve on all interfaces.
    demo.queue(max_size=5)
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
    }
    demo.launch(**launch_options)