import argparse
import os
import re
import tempfile
import uuid

from pydub import AudioSegment
from moviepy.editor import VideoFileClip, AudioFileClip
from google.cloud import texttospeech
from google.cloud import translate_v2 as translate
from transformers import pipeline
import spacy
from spacy_syllables import SpacySyllables  # noqa: F401 — registers the "syllables" pipe factory
from tqdm import tqdm

# ---------------- Hugging Face Whisper config ----------------
HF_WHISPER_MODEL_ID = "openai/whisper-large-v3"  # change if you want smaller models

# -------------------------------------------------------------
# SpaCy models
# -------------------------------------------------------------
# Maps a lowercase language name to the small spaCy pipeline used for
# syllable counting in merge_audio_files().
spacy_models = {
    "english": "en_core_web_sm",
    "german": "de_core_news_sm",
    "french": "fr_core_news_sm",
    "italian": "it_core_news_sm",
    "catalan": "ca_core_news_sm",
    "chinese": "zh_core_web_sm",
    "croatian": "hr_core_news_sm",
    "danish": "da_core_news_sm",
    "dutch": "nl_core_news_sm",
    "finnish": "fi_core_news_sm",
    "greek": "el_core_news_sm",
    "japanese": "ja_core_web_sm",
    "korean": "ko_core_news_sm",
    "lithuanian": "lt_core_news_sm",
    "macedonian": "mk_core_news_sm",
    "polish": "pl_core_news_sm",
    "portuguese": "pt_core_news_sm",
    "romanian": "ro_core_news_sm",
    "russian": "ru_core_news_sm",
    "spanish": "es_core_news_sm",
    "swedish": "sv_core_news_sm",
    "ukrainian": "uk_core_news_sm"
}

# Spoken expansions for abbreviations so the TTS voice reads them naturally.
# Keys must match the stripped word text coming out of the transcriber.
ABBREVIATIONS = {
    "Mr.": "Mister",
    "Mrs.": "Misses",
    "No.": "Number",
    "Dr.": "Doctor",
    "Ms.": "Miss",
    "Ave.": "Avenue",
    "Blvd.": "Boulevard",
    "Ln.": "Lane",
    "Rd.": "Road",
    "a.m.": "before noon",
    "p.m.": "after noon",
    "ft.": "feet",
    "hr.": "hour",
    "min.": "minute",
    "sq.": "square",
    "St.": "street",
    "Asst.": "assistant",
    "Corp.": "corporation"
}

# True for any token containing at least one word character (filters out
# pure punctuation "words" emitted by the transcriber).
ISWORD = re.compile(r".*\w.*")


# -------------------------------------------------------------
# Audio / video helpers
# -------------------------------------------------------------
def extract_audio_from_video(video_file):
    """Extract the audio track of *video_file* to a sibling ``.wav`` file.

    Returns the path of the written wav file, or None on any failure
    (errors are printed, not raised, so the CLI can bail out cleanly).
    """
    try:
        print("Extracting audio track")
        video = VideoFileClip(video_file)
        audio = video.audio
        audio_file = os.path.splitext(video_file)[0] + ".wav"
        audio.write_audiofile(audio_file)
        return audio_file
    except Exception as e:
        print(f"Error extracting audio from video: {e}")
        return None


# -------------------------------------------------------------
# Hugging Face Whisper transcription
# -------------------------------------------------------------
def transcribe_audio_hf(audio_file, source_language: str):
    """
    Use Hugging Face Transformers Whisper pipeline to transcribe with timestamps.

    Returns a structure similar enough to the original Whisper output to reuse
    the sentence-building logic in merge_audio_files():
    ``{"segments": [{"start", "end", "text", "words": [...]}, ...]}``,
    or None on failure.

    We rely on HF's `automatic-speech-recognition` pipeline with
    `return_timestamps=True` to get chunk timing.
    """
    try:
        print("Loading HF Whisper pipeline")
        # device=-1 means CPU; for GPU use device=0
        asr = pipeline(
            task="automatic-speech-recognition",
            model=HF_WHISPER_MODEL_ID,
            device=-1,  # change to 0 if you have CUDA
            return_timestamps=True
        )
        print("Transcribing audio via Hugging Face Whisper")
        result = asr(
            audio_file,
            generate_kwargs={"language": source_language}
        )

        # HF Whisper pipeline with return_timestamps usually returns:
        # {"text": "...", "chunks": [{"text": "...", "timestamp": (start, end)}, ...]}
        # We convert it to a shape compatible with the downstream merge logic.
        segments = []
        if "chunks" in result:
            for ch in result["chunks"]:
                start, end = ch.get("timestamp", (0.0, 0.0))
                # Fix: the pipeline can report an open-ended final chunk as
                # (start, None); float(None) would raise, so clamp instead.
                if start is None:
                    start = 0.0
                if end is None:
                    end = start
                text = ch.get("text", "")
                if not text:
                    continue
                segments.append(
                    {
                        "start": float(start),
                        "end": float(end),
                        "text": text,
                        # No per-word timing from HF pipeline, but we emulate a
                        # single-word segment so the merge logic still works.
                        "words": [
                            {
                                "word": text.strip(),
                                "start": float(start),
                                "end": float(end)
                            }
                        ]
                    }
                )
        else:
            # Fallback: single segment, no timestamps
            segments.append(
                {
                    "start": 0.0,
                    "end": 0.0,
                    "text": result.get("text", ""),
                    "words": [
                        {
                            "word": result.get("text", "").strip(),
                            "start": 0.0,
                            "end": 0.0
                        }
                    ]
                }
            )
        return {"segments": segments}
    except Exception as e:
        print(f"Error transcribing audio with HF Whisper: {e}")
        return None


# -------------------------------------------------------------
# Translation + TTS
# -------------------------------------------------------------
def translate_text(texts, target_language):
    """Translate a list of strings with Google Cloud Translate.

    Returns the translated strings in order, or None on failure.
    """
    try:
        translate_client = translate.Client()
        results = translate_client.translate(texts, target_language=target_language)
        return [result["translatedText"] for result in results]
    except Exception as e:
        print(f"Error translating texts: {e}")
        return None


def create_audio_from_text(text, target_language, target_voice):
    """Synthesize *text* with Google Cloud TTS into a uniquely named wav file.

    Returns the wav filename; the caller owns (and must delete) the file.
    Raises on any synthesis failure, after removing the partial file.
    """
    audio_file = "translated_" + str(uuid.uuid4()) + ".wav"
    try:
        client = texttospeech.TextToSpeechClient()
        input_text = texttospeech.SynthesisInput(text=text)
        voice = texttospeech.VoiceSelectionParams(
            language_code=target_language,
            name=target_voice
        )
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            # Slightly faster than neutral so dubbed sentences fit their slots.
            speaking_rate=1.1
        )
        response = client.synthesize_speech(
            request={"input": input_text, "voice": voice, "audio_config": audio_config}
        )
        with open(audio_file, "wb") as out:
            out.write(response.audio_content)
        return audio_file
    except Exception as e:
        # Don't leave a partial wav behind; chain the cause for debuggability.
        if os.path.isfile(audio_file):
            os.remove(audio_file)
        raise Exception(f"Error creating audio from text: {e}") from e


# -------------------------------------------------------------
# Merge translated audio with original using ducking
# -------------------------------------------------------------
def merge_audio_files(transcription, source_language, target_language, target_voice, audio_file):
    """Build the dubbed tracks from a transcription.

    Composes sentences from the transcription segments (using syllable-rate
    heuristics to place sentence boundaries), translates them, synthesizes
    TTS audio, and produces two tracks:

    Returns (merged_audio, ducked_audio) — the translation-only track and the
    original track with translations overlaid (original ducked by -10 dB with
    fades) — or (None, None) on failure. Temporary TTS wavs are always removed.
    """
    temp_files = []
    try:
        ducked_audio = AudioSegment.from_wav(audio_file)
        # Download the spaCy model on first use.
        if spacy_models[source_language] not in spacy.util.get_installed_models():
            import spacy.cli
            spacy.cli.download(spacy_models[source_language])
        nlp = spacy.load(spacy_models[source_language])
        nlp.add_pipe("syllables", after="tagger")

        merged_audio = AudioSegment.silent(duration=0)
        sentences = []
        sentence_starts = []
        sentence_ends = []
        sentence = ""
        sent_start = 0
        last_word_end = 0

        print("Composing sentences from segments")
        for segment in tqdm(transcription["segments"]):
            # All-caps segments are treated as non-speech captions; skip them.
            if segment["text"].isupper():
                continue
            # Hoisted out of the word loop: this is invariant per segment and
            # the original recomputed a full nlp() parse for every word.
            segment_syllables = sum(
                token._.syllables_count
                for token in nlp(segment["text"])
                if token._.syllables_count
            )
            for i, word in enumerate(segment["words"]):
                if not ISWORD.search(word["word"]):
                    continue
                word["word"] = ABBREVIATIONS.get(word["word"].strip(), word["word"])
                if word["word"].startswith("-"):
                    # Hyphenated continuation: glue onto the previous word.
                    sentence = sentence[:-1] + word["word"] + " "
                else:
                    sentence += word["word"] + " "
                word_syllables = sum(
                    token._.syllables_count
                    for token in nlp(word["word"])
                    if token._.syllables_count
                )
                if i == 0 or sent_start == 0:
                    # Starting a new sentence: estimate its true start time.
                    duration = max(word["end"] - word["start"], 1e-6)
                    word_speed = word_syllables / duration if word_syllables else 1.0
                    if word_speed < 3:
                        # Slow first word usually includes leading silence;
                        # back-date the start from the end at ~3 syllables/sec.
                        sent_start = word["end"] - word_syllables / 3 if word_syllables else word["start"]
                    else:
                        sent_start = word["start"]
                if i == len(segment["words"]) - 1:
                    # Last word of the segment: if the pace is slow, assume the
                    # sentence ended here and force a terminating period.
                    duration = max(word["end"] - word["start"], 1e-6)
                    word_speed = word_syllables / duration if word_syllables else 1.0
                    seg_duration = max(segment["end"] - segment["start"], 1e-6)
                    segment_speed = segment_syllables / seg_duration if segment_syllables else 2.0
                    if word_speed < 1.0 or segment_speed < 2.0:
                        word["word"] += "."
                if word["word"].endswith("."):
                    sentences.append(sentence)
                    sentence_starts.append(sent_start)
                    sentence_ends.append(word["end"])
                    sent_start = 0
                    sentence = ""
                last_word_end = word["end"]
        # Fix: a trailing sentence with no terminating "." was silently
        # dropped; flush it so the final words still get dubbed.
        if sentence.strip():
            sentences.append(sentence)
            sentence_starts.append(sent_start)
            sentence_ends.append(last_word_end)

        print("Translating sentences")
        translated_texts = []
        # Translate in chunks of 128 to stay under API batch limits.
        for i in tqdm(range(0, len(sentences), 128)):
            chunk = sentences[i:i + 128]
            translated_chunk = translate_text(chunk, target_language)
            if translated_chunk is None:
                raise Exception("Translation failed")
            translated_texts.extend(translated_chunk)

        print("Creating translated audio track and ducking original")
        prev_end_time = 0
        for i, translated_text in enumerate(tqdm(translated_texts)):
            translated_audio_file = create_audio_from_text(
                translated_text, target_language, target_voice
            )
            if translated_audio_file is None:
                raise Exception("Audio creation failed")
            temp_files.append(translated_audio_file)
            translated_audio = AudioSegment.from_wav(translated_audio_file)

            # All positions below are in milliseconds (pydub's unit).
            start_time = int(sentence_starts[i] * 1000)
            end_time = start_time + len(translated_audio)
            next_start_time = (
                int(sentence_starts[i + 1] * 1000)
                if i < len(translated_texts) - 1
                else len(ducked_audio)
            )

            # Duck the original under the dubbed sentence, with fades sized to
            # the gaps before/after (capped at 500 ms, at least 1 ms).
            ducked_segment = ducked_audio[start_time:end_time].apply_gain(-10)
            fade_out_duration = min(500, max(1, start_time - prev_end_time))
            fade_in_duration = min(500, max(1, next_start_time - end_time))
            prev_end_time = end_time
            if start_time == 0:
                ducked_audio = ducked_segment + ducked_audio[end_time:].fade_in(fade_in_duration)
            elif end_time == len(ducked_audio):
                ducked_audio = ducked_audio[:start_time].fade_out(fade_out_duration) + ducked_segment
            else:
                ducked_audio = (
                    ducked_audio[:start_time].fade_out(fade_out_duration)
                    + ducked_segment
                    + ducked_audio[end_time:].fade_in(fade_in_duration)
                )
            ducked_audio = ducked_audio.overlay(translated_audio, position=start_time)

            # Translation-only track: pad with silence so each dubbed sentence
            # lands near where the original sentence ended.
            original_duration = int(sentence_ends[i] * 1000)
            new_duration = len(translated_audio) + len(merged_audio)
            padding_duration = max(0, original_duration - new_duration)
            padding = AudioSegment.silent(duration=padding_duration)
            merged_audio += padding + translated_audio

        return merged_audio, ducked_audio
    except Exception as e:
        print(f"Error merging audio files: {e}")
        return None, None
    finally:
        for file in temp_files:
            try:
                os.remove(file)
            except Exception as e:
                print(f"Error removing temporary file {file}: {e}")


# -------------------------------------------------------------
# Save audio / replace in video
# -------------------------------------------------------------
def save_audio_to_file(audio, filename):
    """Export *audio* (an AudioSegment) to *filename* as wav."""
    try:
        audio.export(filename, format="wav")
        # Fix: the message printed the literal "(unknown)" instead of the
        # actual output path.
        print(f"Audio track with translation only saved to {filename}")
    except Exception as e:
        print(f"Error saving audio to file: {e}")


def replace_audio_in_video(video_file, new_audio):
    """Write a copy of *video_file* with its audio replaced by *new_audio*.

    The result is saved next to the input as ``<base>_translated.mp4``.
    *new_audio* is trimmed if longer than the video; a shorter track only
    triggers a warning. Errors are printed, not raised.
    """
    temp_audio_file = None
    try:
        video = VideoFileClip(video_file)
        # delete=False so the file survives the `with`; it is removed in
        # `finally` once moviepy is done reading it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio_file:
            new_audio.export(temp_audio_file.name, format="wav")
        # (A stray debug export to a hardcoded "duckled.wav" was removed here.)
        try:
            new_audio_clip = AudioFileClip(temp_audio_file.name)
        except Exception as e:
            print(f"Error loading new audio into an AudioFileClip: {e}")
            return
        if new_audio_clip.duration < video.duration:
            print("Warning: new audio is shorter than video.")
        elif new_audio_clip.duration > video.duration:
            print("Warning: new audio is longer than video, trimming.")
            new_audio_clip = new_audio_clip.subclip(0, video.duration)
        video = video.set_audio(new_audio_clip)
        output_filename = os.path.splitext(video_file)[0] + "_translated.mp4"
        try:
            video.write_videofile(output_filename, audio_codec="aac")
        except Exception as e:
            print(f"Error writing new video file: {e}")
            return
        print(f"Translated video saved as {output_filename}")
    except Exception as e:
        print(f"Error replacing audio in video: {e}")
    finally:
        if temp_audio_file and os.path.isfile(temp_audio_file.name):
            os.remove(temp_audio_file.name)


# -------------------------------------------------------------
# CLI
# -------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, required=True, help="Path to source video file")
    parser.add_argument(
        "--voice",
        type=str,
        default="es-US-Neural2-B",
        help="Target dubbing voice name from Google TTS voices"
    )
    parser.add_argument(
        "--credentials",
        type=str,
        required=True,
        help="Path to Google Cloud credentials JSON file"
    )
    parser.add_argument(
        "--source_language",
        type=str,
        default="english",
        help=f"Source language, e.g. english. Supported: {list(spacy_models.keys())}"
    )
    args = parser.parse_args()
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = args.credentials

    audio_file = extract_audio_from_video(args.input)
    if audio_file is None:
        return
    transcription = transcribe_audio_hf(audio_file, args.source_language.lower())
    if transcription is None:
        return

    # "es-US"-style language_code for Google TTS, derived from the voice name.
    # Fix: args.voice[:5] broke for three-letter codes such as "cmn-CN-...".
    language_code = "-".join(args.voice.split("-")[:2])
    merged_audio, ducked_audio = merge_audio_files(
        transcription,
        args.source_language.lower(),
        language_code,
        args.voice,
        audio_file
    )
    if merged_audio is None or ducked_audio is None:
        return

    replace_audio_in_video(args.input, ducked_audio)
    # Fix: the original wrote "<base>.wav", which clobbered the audio file
    # extracted from the source video earlier in this run.
    output_filename = os.path.splitext(args.input)[0] + "_translated.wav"
    save_audio_to_file(merged_audio, output_filename)


if __name__ == "__main__":
    main()