# video-dubbing / app.py
# XtewaldX's picture
# Update app.py
# ba6d1ef verified
# raw
# history blame
# 4.85 kB
import os
import uuid
import asyncio
import subprocess
import json
import gradio as gr
import ffmpeg
import cv2
import edge_tts
from deep_translator import GoogleTranslator
from huggingface_hub import HfApi
import moviepy.editor as mp
import spaces
# Upper bound on input length; process_video compares it against the probed
# duration and reports the limit in seconds.
MAX_VIDEO_DURATION = 60

# Repository id string (not referenced by the code visible in this file).
REPO_ID = "artificialguybr/video-dubbing"

# Token read from the HF_TOKEN environment variable; None when it is unset.
HF_TOKEN = os.getenv("HF_TOKEN")

# Hub API client constructed with that token.
api = HfApi(token=HF_TOKEN)
# One row per supported language: (display name, language code, voice name).
# process_video passes the code to the translator as the target language and
# the voice name to the text-to-speech step.
_LANGUAGE_ROWS = (
    ("English", "en", "en-US-EricNeural"),
    ("Spanish", "es", "es-ES-AlvaroNeural"),
    ("French", "fr", "fr-FR-HenriNeural"),
    ("German", "de", "de-DE-ConradNeural"),
    ("Italian", "it", "it-IT-DiegoNeural"),
    ("Portuguese", "pt", "pt-PT-DuarteNeural"),
    ("Polish", "pl", "pl-PL-MarekNeural"),
    ("Turkish", "tr", "tr-TR-AhmetNeural"),
    ("Russian", "ru", "ru-RU-DmitryNeural"),
    ("Dutch", "nl", "nl-NL-MaartenNeural"),
    ("Czech", "cs", "cs-CZ-AntoninNeural"),
    ("Arabic", "ar", "ar-SA-HamedNeural"),
    ("Chinese (Simplified)", "zh-CN", "zh-CN-YunxiNeural"),
    ("Japanese", "ja", "ja-JP-KeitaNeural"),
    ("Korean", "ko", "ko-KR-InJoonNeural"),
    ("Hindi", "hi", "hi-IN-MadhurNeural"),
)

# Display name -> (language code, voice name), in the order listed above.
language_mapping = {
    display_name: (lang_code, voice_name)
    for display_name, lang_code, voice_name in _LANGUAGE_ROWS
}
def generate_unique_filename(ext):
    """Return a fresh random UUID4 string with *ext* appended.

    *ext* is appended verbatim, so callers pass the leading dot themselves
    (for example ``".wav"``).
    """
    return "{}{}".format(uuid.uuid4(), ext)
def cleanup_files(*files):
    """Delete each given file, skipping empty entries and missing files.

    Args:
        *files: Paths to remove. Falsy entries (``None``, ``""``) are
            skipped so callers can pass temp-file variables that were
            never assigned a real path.

    Raises:
        OSError: If a path exists but cannot be removed (for example it is
            a directory, or permissions forbid it).
    """
    for path in files:
        if not path:
            continue
        try:
            # Remove directly rather than testing os.path.exists() first:
            # the file can disappear between the check and the delete.
            os.remove(path)
        except FileNotFoundError:
            # Already gone; nothing left to clean up for this entry.
            pass
@spaces.GPU(duration=90)
def transcribe_audio(file_path):
    """Transcribe a media file by running the ``insanely-fast-whisper`` CLI.

    If *file_path* has a video extension (.mp4, .avi, .mov, .flv), its audio
    track is first written to a temporary WAV file and that file is
    transcribed instead.

    Args:
        file_path: Path to an audio or video file.

    Returns:
        The transcript's ``"text"`` field when present; otherwise the
        ``"text"`` of every entry in ``"chunks"`` joined with spaces.

    Raises:
        ValueError: If a video input has no audio track.
        subprocess.CalledProcessError: If the whisper command exits non-zero.
    """
    temp_audio = None
    output_file = generate_unique_filename(".json")
    try:
        # Compare case-insensitively so names like "CLIP.MP4" are recognised.
        if file_path.lower().endswith((".mp4", ".avi", ".mov", ".flv")):
            video = mp.VideoFileClip(file_path)
            try:
                if video.audio is None:
                    raise ValueError("Video has no audio track to transcribe")
                temp_audio = generate_unique_filename(".wav")
                video.audio.write_audiofile(temp_audio)
            finally:
                # Release the clip's reader resources even when the export fails.
                video.close()
            file_path = temp_audio

        command = [
            "insanely-fast-whisper",
            "--file-name",
            file_path,
            "--device-id",
            "0",
            "--model-name",
            "openai/whisper-large-v3",
            "--task",
            "transcribe",
            "--timestamp",
            "chunk",
            "--transcript-path",
            output_file,
        ]
        subprocess.run(command, check=True)

        # Explicit encoding: the transcript can contain non-ASCII text and
        # must not depend on the platform's default encoding.
        with open(output_file, encoding="utf-8") as f:
            transcription = json.load(f)
    finally:
        # Runs on success and failure alike so temp files never accumulate.
        cleanup_files(output_file, temp_audio)

    if "text" in transcription:
        return transcription["text"]
    return " ".join(chunk["text"] for chunk in transcription.get("chunks", []))
async def text_to_speech(text, voice, output_file):
    """Synthesize *text* with the edge-tts *voice* and save it to *output_file*."""
    await edge_tts.Communicate(text, voice).save(output_file)
def _probe_duration(path):
    """Return the duration of the media file at *path* in seconds."""
    info = ffmpeg.probe(path)
    # Try the container-level duration first, then each stream's own value:
    # the first stream is not guaranteed to carry a "duration" field.
    candidates = [info.get("format", {}).get("duration")]
    candidates.extend(stream.get("duration") for stream in info.get("streams", []))
    for value in candidates:
        try:
            return float(value)
        except (TypeError, ValueError):
            continue
    raise ValueError("Could not determine video duration")


@spaces.GPU
def process_video(video, target_language, use_wav2lip):
    """Dub *video* into *target_language* and return the new file.

    Steps: reject over-long inputs, rescale to 720p height, extract and
    band-pass filter the audio, transcribe it, translate the transcript,
    synthesize speech in the target voice, and mux that speech onto the
    rescaled video.

    Args:
        video: Path of the uploaded video file.
        target_language: A key of ``language_mapping``.
        use_wav2lip: Accepted from the UI checkbox; not used by this function.

    Returns:
        ``(output_path, "")`` on success, or ``(None, error_message)`` on
        failure. Exceptions are reported through the message, not raised.
    """
    # Derive every working filename up front so cleanup can always run,
    # no matter which step fails.
    run_uuid = uuid.uuid4().hex[:6]
    resized_video = f"{run_uuid}_resized.mp4"
    audio_file = f"{run_uuid}_audio.wav"
    filtered_audio = f"{run_uuid}_filtered.wav"
    synth_audio = f"{run_uuid}_tts.wav"
    output_video = f"{run_uuid}_dubbed.mp4"
    try:
        if not video:
            raise ValueError("No video provided")
        if target_language not in language_mapping:
            raise ValueError(f"Unsupported target language: {target_language}")
        target_lang_code, voice = language_mapping[target_language]

        # Check the length before any re-encoding so over-long uploads are
        # rejected without spending time on the resize.
        if _probe_duration(video) > MAX_VIDEO_DURATION:
            raise ValueError(f"Video longer than {MAX_VIDEO_DURATION} seconds")

        ffmpeg.input(video).output(resized_video, vf="scale=-2:720").run()
        ffmpeg.input(resized_video).output(
            audio_file, acodec="pcm_s24le", ar=48000, map="a"
        ).run()

        # Argument lists (no shell) keep paths from being re-parsed by a shell.
        subprocess.run(
            [
                "ffmpeg",
                "-y",
                "-i",
                audio_file,
                "-af",
                "lowpass=3000,highpass=100",
                filtered_audio,
            ],
            check=True,
        )

        whisper_text = transcribe_audio(filtered_audio)
        if not whisper_text or not whisper_text.strip():
            raise ValueError("No speech detected in the video")

        translated_text = GoogleTranslator(
            source="auto", target=target_lang_code
        ).translate(whisper_text)
        asyncio.run(text_to_speech(translated_text, voice, synth_audio))

        # Keep the rescaled video stream as-is and replace the audio track.
        subprocess.run(
            [
                "ffmpeg",
                "-i",
                resized_video,
                "-i",
                synth_audio,
                "-c:v",
                "copy",
                "-c:a",
                "aac",
                "-map",
                "0:v:0",
                "-map",
                "1:a:0",
                output_video,
            ],
            check=True,
        )
        return output_video, ""
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising,
        # and drop any partially written result.
        cleanup_files(output_video)
        return None, str(e)
    finally:
        cleanup_files(resized_video, audio_file, filtered_audio, synth_audio)
# Gradio front end: inputs in the first column, result and status in the second.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# AI Video Dubbing")

    with gr.Row():
        # Input side: source video, target language, options, run button.
        with gr.Column():
            video_input = gr.Video(label="Upload Video")
            target_language = gr.Dropdown(
                choices=list(language_mapping),
                value="Spanish",
            )
            use_wav2lip = gr.Checkbox(value=False, label="Use Wav2Lip")
            submit = gr.Button("Process")

        # Output side: dubbed video plus the status/error text.
        with gr.Column():
            output_video = gr.Video()
            error = gr.Textbox(label="Status")

    # process_video returns (video_path, message), matching the two outputs.
    submit.click(
        fn=process_video,
        inputs=[video_input, target_language, use_wav2lip],
        outputs=[output_video, error],
    )

demo.queue()
demo.launch()