|
|
import tempfile |
|
|
import os |
|
|
import shutil |
|
|
import librosa |
|
|
import json |
|
|
import subprocess |
|
|
import gc |
|
|
from googletrans import Translator |
|
|
import asyncio |
|
|
from flask import Flask, request, jsonify, send_from_directory |
|
|
from omegaconf import OmegaConf |
|
|
import torch |
|
|
from diffusers import AutoencoderKL, DDIMScheduler |
|
|
from latentsync.models.unet import UNet3DConditionModel |
|
|
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline |
|
|
from diffusers.utils.import_utils import is_xformers_available |
|
|
from accelerate.utils import set_seed |
|
|
from latentsync.whisper.audio2feature import Audio2Feature |
|
|
from openai import OpenAI |
|
|
from elevenlabs import set_api_key, generate, play, clone, Voice, VoiceSettings |
|
|
from torch.cuda.amp import autocast |
|
|
|
|
|
|
|
|
# Flask application object that the route decorators in this module register on.
app = Flask(__name__)

# Scratch directory handle shared through a module global; it starts unset and
# is assigned a tempfile.TemporaryDirectory by the /run request handler.
TEMP_DIR = None

# Absolute path of the folder finished videos are copied into and served from.
# It is created at import time so later copies do not fail on a missing folder.
VIDEO_DIRECTORY = os.path.abspath("videos")
os.makedirs(VIDEO_DIRECTORY, exist_ok=True)
|
|
|
|
|
def clear_cuda_memory():
    """Run the Python garbage collector, then release PyTorch's cached CUDA memory.

    The collector runs first on purpose: tensors that are only kept alive by
    unreachable reference cycles still occupy their CUDA blocks until they are
    collected, so emptying the cache before collecting cannot hand those
    blocks back.
    """
    gc.collect()
    torch.cuda.empty_cache()
|
|
|
|
|
def run_inference(video_path, audio_path, video_out_path,
                  inference_ckpt_path, unet_config_path="configs/unet/second_stage.yaml",
                  inference_steps=20, guidance_scale=1.0, seed=1247):
    """Build a ``LipsyncPipeline`` and run it once on the given video and audio.

    Args:
        video_path: Forwarded to the pipeline as ``video_path``.
        audio_path: Forwarded to the pipeline as ``audio_path``.
        video_out_path: Forwarded to the pipeline as ``video_out_path``; the
            mask path is derived from it by replacing ".mp4" with "_mask.mp4".
        inference_ckpt_path: Checkpoint handed to
            ``UNet3DConditionModel.from_pretrained``.
        unet_config_path: OmegaConf file that supplies ``model`` and ``data``
            settings.
        inference_steps: Forwarded as ``num_inference_steps``.
        guidance_scale: Forwarded as ``guidance_scale``.
        seed: Passed to ``set_seed`` unless it is -1, in which case
            ``torch.seed()`` is called instead.

    Raises:
        NotImplementedError: If ``config.model.cross_attention_dim`` is neither
            768 nor 384.
    """
    clear_cuda_memory()

    config = OmegaConf.load(unet_config_path)

    # Half precision is only selected when CUDA is present and the device's
    # major compute capability is above 7; otherwise everything runs in float32.
    is_fp16_supported = torch.cuda.is_available() and torch.cuda.get_device_capability()[0] > 7
    dtype = torch.float16 if is_fp16_supported else torch.float32

    scheduler = DDIMScheduler.from_pretrained("configs")

    # The Whisper checkpoint is chosen from the UNet's cross-attention width.
    if config.model.cross_attention_dim == 768:
        whisper_model_path = "checkpoints/whisper/small.pt"
    elif config.model.cross_attention_dim == 384:
        whisper_model_path = "checkpoints/whisper/tiny.pt"
    else:
        raise NotImplementedError("cross_attention_dim must be 768 or 384")

    # Pre-bind every model handle so the finally-block can drop them no matter
    # how far construction got before a failure.
    audio_encoder = vae = unet = pipeline = None
    try:
        audio_encoder = Audio2Feature(model_path=whisper_model_path,
                                      device="cuda", num_frames=config.data.num_frames)

        vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=dtype)
        vae.config.scaling_factor = 0.18215
        vae.config.shift_factor = 0

        unet, _ = UNet3DConditionModel.from_pretrained(
            OmegaConf.to_container(config.model),
            inference_ckpt_path,
            device="cpu",
        )
        unet = unet.to(dtype=dtype)

        if is_xformers_available():
            unet.enable_xformers_memory_efficient_attention()

        pipeline = LipsyncPipeline(
            vae=vae,
            audio_encoder=audio_encoder,
            unet=unet,
            scheduler=scheduler,
        ).to("cuda")

        if seed != -1:
            set_seed(seed)
        else:
            torch.seed()

        with autocast():
            pipeline(
                video_path=video_path,
                audio_path=audio_path,
                video_out_path=video_out_path,
                video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
                num_frames=config.data.num_frames,
                num_inference_steps=inference_steps,
                guidance_scale=guidance_scale,
                weight_dtype=dtype,
                width=config.data.resolution,
                height=config.data.resolution,
            )
    finally:
        # Release the references before clearing: memory that is still owned by
        # live model objects cannot be returned by the cache cleanup.
        audio_encoder = vae = unet = pipeline = None
        clear_cuda_memory()
|
|
|
|
|
def create_temp_dir():
    """Create a fresh temporary directory and return its ``TemporaryDirectory`` handle.

    The caller reads the on-disk location from the handle's ``name`` attribute.
    """
    scratch_dir = tempfile.TemporaryDirectory()
    return scratch_dir
|
|
|
|
|
def generate_audio(voice_cloning, text_prompt):
    """Synthesize ``text_prompt`` with ElevenLabs and save it as an MP3 in ``TEMP_DIR``.

    Args:
        voice_cloning: 'yes' to use the configured custom ``Voice`` ("Marc"),
            'no' to use the stock "Daniel" voice.
        text_prompt: Text handed to ``generate`` as the speech content.

    Returns:
        Path of the MP3 file written inside ``TEMP_DIR``.

    Raises:
        ValueError: If ``voice_cloning`` is neither 'yes' nor 'no'. Previously
            such a value fell through both branches and returned ``None``.
    """
    if voice_cloning not in ('yes', 'no'):
        raise ValueError(f"voice_cloning must be 'yes' or 'no', got {voice_cloning!r}")

    if voice_cloning == 'yes':
        print('Entering Custom Audio creation using elevenlabs')
        voice = Voice(
            voice_id="VJpttplXHolgV2leGe5V",
            name="Marc",
            settings=VoiceSettings(
                stability=0.71, similarity_boost=0.9, style=0.0, use_speaker_boost=True),
        )
        file_prefix = "cloned_audio_"
    else:
        print('Entering Default Audio creation using elevenlabs')
        voice = "Daniel"
        file_prefix = "default_audio_"

    # SECURITY: an API key is committed in this file. ELEVENLABS_API_KEY now
    # takes precedence; the literal is kept only so existing deployments keep
    # working. TODO: rotate the key and delete the fallback.
    set_api_key(os.environ.get('ELEVENLABS_API_KEY', '92e149985ea2732b4359c74346c3daee'))

    audio = generate(text=text_prompt, voice=voice, model="eleven_multilingual_v2",
                     stream=True, latency=4)

    # delete=False keeps the file on disk after the handle closes so later
    # pipeline stages can read it by path.
    with tempfile.NamedTemporaryFile(suffix=".mp3", prefix=file_prefix,
                                     dir=TEMP_DIR.name, delete=False) as temp_file:
        for chunk in audio:
            temp_file.write(chunk)
        driven_audio_path = temp_file.name

    print('driven_audio_path', driven_audio_path)
    return driven_audio_path
|
|
|
|
|
|
|
|
|
|
|
def get_video_duration(video_path):
    """Return the duration of ``video_path`` in seconds, as reported by ffprobe.

    Args:
        video_path: Path of the media file to probe.

    Returns:
        The ``format.duration`` value from ffprobe's JSON output, as a float.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status, or its output
            does not contain a parseable ``format.duration``.
    """
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "json", video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Report ffprobe's own error text instead of failing later on empty output.
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed for {video_path!r} (exit code {result.returncode}): "
            f"{result.stderr.strip()}"
        )

    try:
        return float(json.loads(result.stdout)["format"]["duration"])
    except (ValueError, KeyError, TypeError) as exc:
        raise RuntimeError(
            f"ffprobe returned no usable duration for {video_path!r}"
        ) from exc
|
|
|
|
|
|
|
|
def extend_video_simple(video_path, audio_path, output_path):
    """Write ``video_path`` to ``output_path``, appending one reversed copy if the audio is longer.

    When the audio outlasts the video, the output is the original followed by
    its reversed version (video stream only, no audio). Otherwise the video is
    copied unchanged.

    Args:
        video_path: Source video.
        audio_path: Audio file whose duration is compared against the video's.
        output_path: Destination file.

    Raises:
        subprocess.CalledProcessError: If an ffmpeg invocation fails.
    """
    audio_duration = librosa.get_duration(path=audio_path)
    video_duration = get_video_duration(video_path)

    print(f"Video Duration: {video_duration:.2f} sec")
    print(f"Audio Duration: {audio_duration:.2f} sec")

    if audio_duration > video_duration:
        print("Extending video by adding reversed version.")

        # Reserve a path in the scratch directory; ffmpeg overwrites it (-y).
        with tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4") as handle:
            reversed_clip = handle.name

        # Argument lists (no shell) keep paths containing spaces or shell
        # metacharacters intact; check=True surfaces ffmpeg failures here.
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-vf", "reverse", "-an", reversed_clip],
            check=True,
        )
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", video_path, "-i", reversed_clip,
                "-filter_complex", "[0:v:0][1:v:0]concat=n=2:v=1[outv]",
                "-map", "[outv]", "-an", output_path,
            ],
            check=True,
        )
    else:
        print("Audio is not longer than video. No extension needed.")
        shutil.copy(video_path, output_path)
|
|
|
|
|
|
|
|
def extend_video_loop(video_path, audio_path, output_path):
    """Write ``video_path`` to ``output_path``, looped forward/backward to cover the audio.

    When the audio outlasts the video, the output concatenates
    original + reversed pairs until the combined length is at least the audio
    duration (video stream only, no audio). Otherwise the video is copied
    unchanged.

    Args:
        video_path: Source video.
        audio_path: Audio file whose duration sets the target length.
        output_path: Destination file.

    Raises:
        ValueError: If the video must be extended but its duration is not
            positive, which would otherwise loop forever.
        subprocess.CalledProcessError: If an ffmpeg invocation fails.
    """
    audio_duration = librosa.get_duration(path=audio_path)
    video_duration = get_video_duration(video_path)

    print(f"Video Duration: {video_duration:.2f} sec")
    print(f"Audio Duration: {audio_duration:.2f} sec")

    if audio_duration > video_duration:
        # A zero-length clip never advances total_duration in the loop below.
        if video_duration <= 0:
            raise ValueError(
                f"Cannot extend {video_path!r}: video duration is {video_duration}"
            )

        print("Extending video by repeating original and reversed versions.")

        # Reserve a path in the scratch directory; ffmpeg overwrites it (-y).
        with tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4") as handle:
            reversed_clip = handle.name

        # Argument lists (no shell) keep paths containing spaces or shell
        # metacharacters intact; check=True surfaces ffmpeg failures here.
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-vf", "reverse", "-an", reversed_clip],
            check=True,
        )

        video_clips = [video_path, reversed_clip]
        total_duration = video_duration * 2

        while total_duration < audio_duration:
            video_clips.append(video_path)
            video_clips.append(reversed_clip)
            total_duration += video_duration * 2

        print(f"Total Clips: {len(video_clips)}")

        # One "[i:v:0]" label per input, followed by the concat filter itself.
        concat_filter = (
            "".join(f"[{i}:v:0]" for i in range(len(video_clips)))
            + f"concat=n={len(video_clips)}:v=1[outv]"
        )
        input_args = [arg for clip in video_clips for arg in ("-i", clip)]

        subprocess.run(
            ["ffmpeg", "-y", *input_args,
             "-filter_complex", concat_filter,
             "-map", "[outv]", "-an", output_path],
            check=True,
        )

        print(f"Extended video saved to {output_path}")
    else:
        print("Audio is not longer than video. No extension needed.")
        shutil.copy(video_path, output_path)
|
|
|
|
|
|
|
|
def translate_text(text, target_language):
    """Translate ``text`` into ``target_language`` with googletrans, best effort.

    Args:
        text: Text to translate. Empty or whitespace-only input yields "".
        target_language: Language name, matched case-insensitively against
            "english" and "hindi".

    Returns:
        The translated text, or ``text`` unchanged when the language is not
        supported or the translation fails for any reason.
    """
    # Imported here because this module has no top-level logging import; the
    # original handler referenced an undefined ``logger`` and raised NameError.
    import logging
    log = logging.getLogger(__name__)

    if not text or text.strip() == "":
        return ""

    LANGUAGE_CODES = {"english": "en", "hindi": "hi"}
    try:
        target_language_code = LANGUAGE_CODES.get(target_language.lower())
        if target_language_code is None:
            # Do not call the translator with dest=None; keep the input as is.
            log.warning("Unsupported target language %r; text left untranslated",
                        target_language)
            return text

        async def perform_translation():
            translator = Translator()
            result = await translator.translate(text, dest=target_language_code)
            return result.text if hasattr(result, 'text') else text

        # asyncio.run creates a private event loop and always closes it, even
        # when the coroutine raises.
        return asyncio.run(perform_translation())
    except Exception as e:
        log.error("Error translating text: %s", e)
        return text
|
|
|
|
|
|
|
|
@app.route('/run', methods=['POST'])
def generate_video():
    """Handle POST /run: synthesize speech, lip-sync the uploaded video, publish the result.

    Request (multipart form):
        video: Uploaded video file (required).
        text_prompt: Text to speak (required, non-blank).
        voice_cloning: 'yes' or 'no' (default 'no').
        target_language: Language to translate the prompt into, or
            'original_text' (default) to skip translation.
        inference_ckpt_path, unet_config_path, inference_steps,
        guidance_scale, seed: Optional overrides forwarded to ``run_inference``.

    Returns:
        A JSON response with HTTP 200 and the ``/videos/<name>`` URL on
        success, 400 for invalid input, or 500 with the error text when
        processing fails.
    """
    global TEMP_DIR

    # Validate everything before allocating any resources.
    if 'video' not in request.files:
        return jsonify({'error': 'Video file is required.'}), 400

    video_file = request.files['video']
    text_prompt = request.form.get('text_prompt', '')
    print('Input text prompt: ', text_prompt)
    text_prompt = text_prompt.strip()
    if not text_prompt:
        return jsonify({'error': 'Input text prompt cannot be blank'}), 400

    voice_cloning = request.form.get('voice_cloning', 'no')
    if voice_cloning not in ('yes', 'no'):
        return jsonify({'error': "voice_cloning must be 'yes' or 'no'"}), 400
    target_language = request.form.get('target_language', 'original_text')

    # Malformed numbers are a client error, not a server failure.
    try:
        inference_steps = int(request.form.get('inference_steps', 20))
        guidance_scale = float(request.form.get('guidance_scale', 1.0))
        seed = int(request.form.get('seed', 1247))
    except ValueError:
        return jsonify({'error': 'inference_steps and seed must be integers and '
                                 'guidance_scale must be a number.'}), 400

    # NOTE(security): these paths come straight from the request and are
    # loaded as model checkpoint/config files. Restrict them to an allow-list
    # before exposing this endpoint to untrusted clients.
    inference_ckpt_path = request.form.get('inference_ckpt_path', 'checkpoints/latentsync_unet.pt')
    unet_config_path = request.form.get('unet_config_path', 'configs/unet/second_stage.yaml')

    # The helpers read the scratch directory through the TEMP_DIR global, so
    # it is still published there; the local handle guarantees this request
    # cleans up the directory it created.
    # NOTE: the shared global means concurrent requests are not isolated.
    temp_dir = create_temp_dir()
    TEMP_DIR = temp_dir

    try:
        if target_language != 'original_text':
            response = translate_text(text_prompt, target_language)
            text_prompt = response.strip()
            print('Translated input text prompt: ', text_prompt)

        temp_audio_path = generate_audio(voice_cloning, text_prompt)

        # Reserve the path first and save after the handle is closed, so the
        # file is not written through two open handles at once.
        with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="input_",
                                         dir=temp_dir.name, delete=False) as temp_file:
            temp_video_path = temp_file.name
        video_file.save(temp_video_path)
        print('temp_video_path', temp_video_path)

        with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False,
                                         suffix=".mp4") as temp_file:
            output_video = temp_file.name
        extend_video_loop(temp_video_path, temp_audio_path, output_video)

        with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False,
                                         suffix="_final_extended.mp4") as temp_file:
            final_output_video = temp_file.name

        run_inference(
            video_path=output_video,
            audio_path=temp_audio_path,
            video_out_path=final_output_video,
            inference_ckpt_path=inference_ckpt_path,
            unet_config_path=unet_config_path,
            inference_steps=inference_steps,
            guidance_scale=guidance_scale,
            seed=seed,
        )

        # Copy the result out of the scratch directory before it is removed.
        filename = os.path.basename(final_output_video)
        print("VIDEO_DIRECTORY: ", VIDEO_DIRECTORY)
        destination_path = os.path.join(VIDEO_DIRECTORY, filename)
        shutil.copy(final_output_video, destination_path)
        video_url = f"/videos/{filename}"

        return jsonify({"message": "Video processed and saved successfully.",
                        "output_video": video_url,
                        "status": "success"}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Intermediate audio/video files are no longer needed once the final
        # video has been copied (or the request has failed).
        temp_dir.cleanup()
|
|
|
|
|
@app.route("/videos/<string:filename>", methods=['GET'])
def serve_video(filename):
    """Handle GET /videos/<filename> by sending that file from ``VIDEO_DIRECTORY``.

    The file is sent with ``as_attachment=False``.
    """
    video_response = send_from_directory(VIDEO_DIRECTORY, filename, as_attachment=False)
    return video_response
|
|
|
|
|
@app.route("/health", methods=["GET"])
def health_status():
    """Handle GET /health with a fixed JSON body of ``{"online": "true"}``."""
    return jsonify({"online": "true"})
|
|
|
|
|
if __name__ == '__main__':
    # Flask's debug mode enables the interactive Werkzeug debugger, which lets
    # anyone who can reach the server execute code. It is therefore opt-in via
    # FLASK_DEBUG (1/true/yes) instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(debug=debug_enabled)