video-dubbing / app.py
Peeble's picture
Update app.py
57524e6 verified
import argparse
import os
import uuid
import tempfile
import re
from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip
from google.cloud import texttospeech
from google.cloud import translate_v2 as translate
from transformers import pipeline
import spacy
from spacy_syllables import SpacySyllables
from tqdm import tqdm
# ---------------- Hugging Face Whisper config ----------------
# Checkpoint id handed to the transcription pipeline; point it at a smaller
# Whisper checkpoint if large-v3 is too heavy for the machine.
HF_WHISPER_MODEL_ID = "openai/whisper-large-v3"

# -------------------------------------------------------------
# SpaCy models
# -------------------------------------------------------------
# Lower-case source-language name -> spaCy model package name.
# Insertion order is kept because the CLI help text lists the keys.
spacy_models = {
    "english": "en_core_web_sm",
    "german": "de_core_news_sm",
    "french": "fr_core_news_sm",
    "italian": "it_core_news_sm",
    "catalan": "ca_core_news_sm",
    "chinese": "zh_core_web_sm",
    "croatian": "hr_core_news_sm",
    "danish": "da_core_news_sm",
    "dutch": "nl_core_news_sm",
    "finnish": "fi_core_news_sm",
    "greek": "el_core_news_sm",
    "japanese": "ja_core_web_sm",
    "korean": "ko_core_news_sm",
    "lithuanian": "lt_core_news_sm",
    "macedonian": "mk_core_news_sm",
    "polish": "pl_core_news_sm",
    "portuguese": "pt_core_news_sm",
    "romanian": "ro_core_news_sm",
    "russian": "ru_core_news_sm",
    "spanish": "es_core_news_sm",
    "swedish": "sv_core_news_sm",
    "ukrainian": "uk_core_news_sm",
}

# Abbreviation -> spoken expansion. Looked up with the stripped text of each
# transcribed word while sentences are being composed.
ABBREVIATIONS = {
    "Mr.": "Mister",
    "Mrs.": "Misses",
    "No.": "Number",
    "Dr.": "Doctor",
    "Ms.": "Miss",
    "Ave.": "Avenue",
    "Blvd.": "Boulevard",
    "Ln.": "Lane",
    "Rd.": "Road",
    "a.m.": "before noon",
    "p.m.": "after noon",
    "ft.": "feet",
    "hr.": "hour",
    "min.": "minute",
    "sq.": "square",
    "St.": "street",
    "Asst.": "assistant",
    "Corp.": "corporation",
}

# Finds strings that contain at least one word character; strings made only
# of punctuation or whitespace do not match.
ISWORD = re.compile(r".*\w.*")
# -------------------------------------------------------------
# Audio / video helpers
# -------------------------------------------------------------
def extract_audio_from_video(video_file):
    """Write the audio track of a video to a WAV file next to the video.

    Args:
        video_file: Path to the source video.

    Returns:
        The path of the WAV file (the video path with its extension replaced
        by ``.wav``), or ``None`` if extraction failed for any reason,
        including a video that carries no audio track. Failures are reported
        on stdout rather than raised.
    """
    video = None
    try:
        print("Extracting audio track")
        video = VideoFileClip(video_file)
        audio = video.audio
        if audio is None:
            # Without this guard the failure would surface as an opaque
            # "'NoneType' object has no attribute 'write_audiofile'" message.
            print("Error extracting audio from video: the video has no audio track")
            return None
        audio_file = os.path.splitext(video_file)[0] + ".wav"
        audio.write_audiofile(audio_file)
        return audio_file
    except Exception as e:
        print(f"Error extracting audio from video: {e}")
        return None
    finally:
        # Release the reader resources held by the clip on every exit path.
        if video is not None:
            video.close()
# -------------------------------------------------------------
# Hugging Face Whisper transcription
# -------------------------------------------------------------
def _timestamp_bounds(timestamp, fallback_start):
    """Return ``(start, end)`` as floats from a chunk timestamp, tolerating gaps.

    A missing timestamp, or a ``None`` start, falls back to *fallback_start*;
    a ``None`` end falls back to the resolved start.
    """
    start, end = timestamp if timestamp else (None, None)
    start = fallback_start if start is None else float(start)
    end = start if end is None else float(end)
    return start, end


def _single_word_segment(text, start, end):
    """Build a segment dict whose only "word" is the whole chunk text."""
    return {
        "start": start,
        "end": end,
        "text": text,
        # The pipeline call below yields no per-word timing, so the chunk is
        # exposed as one word spanning the whole segment.
        "words": [{"word": text.strip(), "start": start, "end": end}],
    }


def transcribe_audio_hf(audio_file, source_language: str):
    """Transcribe an audio file with the Hugging Face Whisper pipeline.

    The pipeline is built for ``HF_WHISPER_MODEL_ID`` on CPU with
    ``return_timestamps=True``. Each returned chunk becomes one segment of the
    form ``{"start", "end", "text", "words": [{"word", "start", "end"}]}``
    (times in seconds), which is the shape the sentence-building code consumes.

    Args:
        audio_file: Path to the audio file to transcribe.
        source_language: Language name forwarded to Whisper through
            ``generate_kwargs``.

    Returns:
        ``{"segments": [...]}``, or ``None`` if loading or transcription
        failed (the error is printed, not raised).
    """
    try:
        print("Loading HF Whisper pipeline")
        asr = pipeline(
            task="automatic-speech-recognition",
            model=HF_WHISPER_MODEL_ID,
            device=-1,  # CPU; change to 0 if you have CUDA
            return_timestamps=True,
        )
        print("Transcribing audio via Hugging Face Whisper")
        result = asr(audio_file, generate_kwargs={"language": source_language})

        # Expected result shape:
        # {"text": "...", "chunks": [{"text": "...", "timestamp": (start, end)}, ...]}
        segments = []
        if "chunks" in result:
            previous_end = 0.0
            for chunk in result["chunks"]:
                text = chunk.get("text", "")
                if not text:
                    continue
                # A chunk may carry None for a timestamp (typically the end of
                # the last chunk when the audio stops mid-utterance); calling
                # float(None) would otherwise abort the whole transcription.
                start, end = _timestamp_bounds(chunk.get("timestamp"), previous_end)
                previous_end = end
                segments.append(_single_word_segment(text, start, end))
        else:
            # Fallback: one segment holding the full text, without timing.
            segments.append(_single_word_segment(result.get("text", ""), 0.0, 0.0))
        return {"segments": segments}
    except Exception as e:
        print(f"Error transcribing audio with HF Whisper: {e}")
        return None
# -------------------------------------------------------------
# Translation + TTS
# -------------------------------------------------------------
def translate_text(texts, target_language):
    """Translate a batch of strings with the Google Cloud Translation v2 client.

    Args:
        texts: List of strings to translate.
        target_language: Language code passed to the API as ``target_language``.

    Returns:
        The translated strings in input order, ``[]`` for empty input (no API
        call is made), or ``None`` if the request failed (the error is
        printed, not raised).
    """
    if not texts:
        return []
    try:
        translate_client = translate.Client()
        # Ask for plain-text handling: with the default HTML format the
        # returned strings are entity-escaped (e.g. "&#39;"), and those
        # escapes would be passed on verbatim to speech synthesis.
        results = translate_client.translate(
            texts, target_language=target_language, format_="text"
        )
        return [result["translatedText"] for result in results]
    except Exception as e:
        print(f"Error translating texts: {e}")
        return None
def create_audio_from_text(text, target_language, target_voice, speaking_rate=1.1):
    """Synthesize *text* with Google Cloud Text-to-Speech into a WAV file.

    The file is created in the current working directory under a unique
    ``translated_<uuid>.wav`` name; the caller is responsible for deleting it.

    Args:
        text: Text to speak.
        target_language: Language code for the voice selection.
        target_voice: Voice name for the voice selection.
        speaking_rate: Speaking rate forwarded to the audio config. Defaults
            to 1.1, the value that used to be hard-coded.

    Returns:
        Path of the written WAV file.

    Raises:
        Exception: If synthesis or writing fails. A partially written file is
            removed first, and the original error is attached as the cause.
    """
    audio_file = "translated_" + str(uuid.uuid4()) + ".wav"
    try:
        client = texttospeech.TextToSpeechClient()
        input_text = texttospeech.SynthesisInput(text=text)
        voice = texttospeech.VoiceSelectionParams(
            language_code=target_language,
            name=target_voice,
        )
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            speaking_rate=speaking_rate,
        )
        response = client.synthesize_speech(
            request={"input": input_text, "voice": voice, "audio_config": audio_config}
        )
        with open(audio_file, "wb") as out:
            out.write(response.audio_content)
        return audio_file
    except Exception as e:
        # Do not leave a half-written file behind.
        if os.path.isfile(audio_file):
            os.remove(audio_file)
        raise Exception(f"Error creating audio from text: {e}") from e
# -------------------------------------------------------------
# Merge translated audio with original using ducking
# -------------------------------------------------------------
def _count_syllables(nlp, text):
    """Sum the non-zero ``syllables_count`` values over the tokens of *text*."""
    return sum(
        token._.syllables_count for token in nlp(text) if token._.syllables_count
    )


def _compose_sentences(segments, nlp):
    """Group transcribed words into sentences with start and end times.

    A sentence ends at a word that ends with "." or at the last word of a
    segment whose speaking rate is low (word below 1 syllable/s or segment
    below 2 syllables/s). Words left over after the last segment are emitted
    as a final sentence instead of being dropped.

    The input dictionaries are not modified.

    Returns:
        ``(sentences, sentence_starts, sentence_ends)``: three parallel lists,
        with times in seconds.
    """
    sentences = []
    sentence_starts = []
    sentence_ends = []
    sentence = ""
    sent_start = 0  # 0 doubles as "no start recorded yet"
    last_word_end = 0.0
    for segment in tqdm(segments):
        # All-caps segments are skipped (presumably non-speech captions such
        # as "MUSIC"; TODO confirm).
        if segment["text"].isupper():
            continue
        words = segment["words"]
        for i, word in enumerate(words):
            if not ISWORD.search(word["word"]):
                continue
            text = ABBREVIATIONS.get(word["word"].strip(), word["word"])
            if text.startswith("-"):
                # Glue a hyphen-led fragment onto the previous word by
                # dropping the separating space.
                sentence = sentence[:-1] + text + " "
            else:
                sentence += text + " "
            last_word_end = word["end"]

            word_syllables = _count_syllables(nlp, text)
            duration = max(word["end"] - word["start"], 1e-6)
            word_speed = word_syllables / duration if word_syllables else 1.0

            # NOTE(review): because of `i == 0`, the start is re-estimated at
            # the first word of every segment, even when a sentence carried
            # over from the previous segment is still open. Confirm intended.
            if i == 0 or sent_start == 0:
                if word_speed < 3:
                    # A slow "word" probably includes leading silence; place
                    # the start as if it were spoken at 3 syllables/s.
                    sent_start = (
                        word["end"] - word_syllables / 3
                        if word_syllables
                        else word["start"]
                    )
                else:
                    sent_start = word["start"]

            ends_sentence = text.endswith(".")
            if i == len(words) - 1:
                # Computed once per segment, only where it is needed.
                segment_syllables = _count_syllables(nlp, segment["text"])
                seg_duration = max(segment["end"] - segment["start"], 1e-6)
                segment_speed = (
                    segment_syllables / seg_duration if segment_syllables else 2.0
                )
                if word_speed < 1.0 or segment_speed < 2.0:
                    ends_sentence = True

            if ends_sentence:
                sentences.append(sentence)
                sentence_starts.append(sent_start)
                sentence_ends.append(word["end"])
                sent_start = 0
                sentence = ""

    # Flush words that never reached a sentence boundary so the tail of the
    # transcript is still translated.
    if sentence.strip():
        sentences.append(sentence)
        sentence_starts.append(sent_start)
        sentence_ends.append(last_word_end)
    return sentences, sentence_starts, sentence_ends


def merge_audio_files(transcription, source_language, target_language, target_voice, audio_file):
    """Build the dubbed audio tracks for a transcription.

    Sentences are composed from the transcription, translated in batches,
    synthesized one by one, and then used to build two tracks:

    * a translation-only track, padded with silence so that each sentence
      ends no earlier than the original sentence did;
    * the original audio, lowered by 10 dB under each translated sentence
      with short fades around it, and the translated speech overlaid.

    Args:
        transcription: ``{"segments": [...]}`` where every segment has
            ``text``, ``start``, ``end`` and ``words`` (each word having
            ``word``, ``start``, ``end``; times in seconds).
        source_language: Key into ``spacy_models``.
        target_language: Language code used for translation and synthesis.
        target_voice: Voice name used for synthesis.
        audio_file: Path to the original audio as WAV.

    Returns:
        ``(merged_audio, ducked_audio)``, or ``(None, None)`` on any failure
        (the error is printed, not raised). Temporary synthesis files are
        removed in either case.
    """
    temp_files = []
    try:
        ducked_audio = AudioSegment.from_wav(audio_file)

        model_name = spacy_models[source_language]
        if model_name not in spacy.util.get_installed_models():
            # Import the function rather than writing "import spacy.cli"
            # here: that statement would bind "spacy" as a local name for the
            # whole function and make the "spacy.util" lookup above raise
            # UnboundLocalError on every call.
            from spacy.cli import download as download_spacy_model
            download_spacy_model(model_name)
        nlp = spacy.load(model_name)
        # Not every model ships a "tagger" component; add_pipe rejects an
        # "after" anchor that is not in the pipeline.
        if "tagger" in nlp.pipe_names:
            nlp.add_pipe("syllables", after="tagger")
        else:
            nlp.add_pipe("syllables")

        print("Composing sentences from segments")
        sentences, sentence_starts, sentence_ends = _compose_sentences(
            transcription["segments"], nlp
        )

        print("Translating sentences")
        batch_size = 128  # sentences sent per translation request
        translated_texts = []
        for i in tqdm(range(0, len(sentences), batch_size)):
            chunk = sentences[i:i + batch_size]
            translated_chunk = translate_text(chunk, target_language)
            if translated_chunk is None:
                raise Exception("Translation failed")
            translated_texts.extend(translated_chunk)

        print("Creating translated audio track and ducking original")
        merged_audio = AudioSegment.silent(duration=0)
        prev_end_time = 0
        for i, translated_text in enumerate(tqdm(translated_texts)):
            translated_audio_file = create_audio_from_text(
                translated_text, target_language, target_voice
            )
            if translated_audio_file is None:
                raise Exception("Audio creation failed")
            temp_files.append(translated_audio_file)
            translated_audio = AudioSegment.from_wav(translated_audio_file)

            # All positions below are in milliseconds.
            start_time = int(sentence_starts[i] * 1000)
            end_time = start_time + len(translated_audio)
            next_start_time = (
                int(sentence_starts[i + 1] * 1000)
                if i < len(translated_texts) - 1
                else len(ducked_audio)
            )

            # Lower the original under the translated sentence.
            ducked_segment = ducked_audio[start_time:end_time].apply_gain(-10)
            # Fades are capped at 500 ms and at the gap to the neighbouring
            # sentence, with a 1 ms minimum.
            fade_out_duration = min(500, max(1, start_time - prev_end_time))
            fade_in_duration = min(500, max(1, next_start_time - end_time))
            prev_end_time = end_time

            if start_time == 0:
                ducked_audio = ducked_segment + ducked_audio[end_time:].fade_in(fade_in_duration)
            elif end_time >= len(ducked_audio):
                # Nothing remains after the sentence, so there is no tail to
                # fade back in.
                ducked_audio = ducked_audio[:start_time].fade_out(fade_out_duration) + ducked_segment
            else:
                ducked_audio = (
                    ducked_audio[:start_time].fade_out(fade_out_duration)
                    + ducked_segment
                    + ducked_audio[end_time:].fade_in(fade_in_duration)
                )
            ducked_audio = ducked_audio.overlay(translated_audio, position=start_time)

            # Translation-only track: pad so this sentence ends no earlier
            # than the original one did.
            original_duration = int(sentence_ends[i] * 1000)
            new_duration = len(translated_audio) + len(merged_audio)
            padding_duration = max(0, original_duration - new_duration)
            padding = AudioSegment.silent(duration=padding_duration)
            merged_audio += padding + translated_audio
        return merged_audio, ducked_audio
    except Exception as e:
        print(f"Error merging audio files: {e}")
        return None, None
    finally:
        for file in temp_files:
            try:
                os.remove(file)
            except Exception as e:
                print(f"Error removing temporary file {file}: {e}")
# -------------------------------------------------------------
# Save audio / replace in video
# -------------------------------------------------------------
def save_audio_to_file(audio, filename):
    """Export *audio* to *filename* in WAV format.

    The outcome is reported on stdout; a failed export is printed rather
    than raised, and the function returns ``None`` either way.
    """
    try:
        audio.export(filename, format="wav")
    except Exception as err:
        print(f"Error saving audio to file: {err}")
    else:
        print(f"Audio track with translation only saved to {filename}")
def replace_audio_in_video(video_file, new_audio):
    """Write a copy of the video whose audio track is *new_audio*.

    The result is saved next to the source as ``<name>_translated.mp4`` with
    AAC audio. Audio longer than the video is trimmed to the video duration;
    shorter audio only produces a warning. Every failure is reported on
    stdout instead of being raised.

    Args:
        video_file: Path to the source video.
        new_audio: Audio object exposing ``export(path, format=...)``.

    Returns:
        None.
    """
    temp_path = None
    source_clip = None
    audio_clip = None
    try:
        source_clip = VideoFileClip(video_file)

        # Reserve a temp file name and close the handle before exporting to
        # it by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio_file:
            temp_path = temp_audio_file.name
        new_audio.export(temp_path, format="wav")
        # NOTE(review): this also writes "duckled.wav" into the current
        # working directory on every call and never removes it. It looks like
        # a debugging leftover; kept because the intent cannot be confirmed.
        new_audio.export("duckled.wav", format="wav")

        try:
            audio_clip = AudioFileClip(temp_path)
        except Exception as e:
            print(f"Error loading new audio into an AudioFileClip: {e}")
            return

        dub_track = audio_clip
        if dub_track.duration < source_clip.duration:
            print("Warning: new audio is shorter than video.")
        elif dub_track.duration > source_clip.duration:
            print("Warning: new audio is longer than video, trimming.")
            dub_track = dub_track.subclip(0, source_clip.duration)

        dubbed_video = source_clip.set_audio(dub_track)
        output_filename = os.path.splitext(video_file)[0] + "_translated.mp4"
        try:
            dubbed_video.write_videofile(output_filename, audio_codec="aac")
        except Exception as e:
            print(f"Error writing new video file: {e}")
            return
        print(f"Translated video saved as {output_filename}")
    except Exception as e:
        print(f"Error replacing audio in video: {e}")
    finally:
        # Close the clips first so their readers let go of the files, then
        # delete the temp file.
        for clip in (audio_clip, source_clip):
            if clip is not None:
                try:
                    clip.close()
                except Exception as e:
                    print(f"Error closing clip: {e}")
        if temp_path and os.path.isfile(temp_path):
            os.remove(temp_path)
# -------------------------------------------------------------
# CLI
# -------------------------------------------------------------
def main():
    """Command-line entry point: dub a video with a Google TTS voice.

    Steps: extract the audio track, transcribe it, build the translated and
    ducked tracks, write ``<input>_translated.mp4`` with the ducked track,
    and save the translation-only track as ``<input>.wav``. The run stops
    quietly after the first step that reports a failure.

    An unsupported ``--source_language`` is rejected before any processing
    starts (argparse usage error, exit status 2).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, required=True, help="Path to source video file")
    parser.add_argument(
        "--voice",
        type=str,
        default="es-US-Neural2-B",
        help="Target dubbing voice name from Google TTS voices",
    )
    parser.add_argument(
        "--credentials",
        type=str,
        required=True,
        help="Path to Google Cloud credentials JSON file",
    )
    parser.add_argument(
        "--source_language",
        type=str,
        default="english",
        help=f"Source language, e.g. english. Supported: {list(spacy_models.keys())}",
    )
    args = parser.parse_args()

    # Fail fast: otherwise an unknown language is only noticed after audio
    # extraction and the full transcription have already run.
    source_language = args.source_language.lower()
    if source_language not in spacy_models:
        parser.error(
            f"Unsupported source language {args.source_language!r}. "
            f"Supported: {list(spacy_models.keys())}"
        )

    # The language code is the first two hyphen-separated parts of the voice
    # name ("es-US-Neural2-B" -> "es-US"). A fixed five-character slice
    # truncates three-letter language tags ("cmn-CN-..." -> "cmn-C").
    language_code = "-".join(args.voice.split("-")[:2])

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = args.credentials

    audio_file = extract_audio_from_video(args.input)
    if audio_file is None:
        return

    transcription = transcribe_audio_hf(audio_file, source_language)
    if transcription is None:
        return

    merged_audio, ducked_audio = merge_audio_files(
        transcription,
        source_language,
        language_code,
        args.voice,
        audio_file,
    )
    if merged_audio is None or ducked_audio is None:
        return

    replace_audio_in_video(args.input, ducked_audio)

    # NOTE(review): this is the same path the extracted original audio was
    # written to, so the translation-only track overwrites it.
    output_filename = os.path.splitext(args.input)[0] + ".wav"
    save_audio_to_file(merged_audio, output_filename)
if __name__ == "__main__":
main()