import tempfile
import os
import shutil
import librosa
import json
import logging
import subprocess
import gc
from googletrans import Translator
import asyncio
from flask import Flask, request, jsonify, send_from_directory
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature
from elevenlabs import set_api_key, generate, Voice, VoiceSettings
from torch.cuda.amp import autocast

# Initialize the Flask app
app = Flask(__name__)

logger = logging.getLogger(__name__)

TEMP_DIR = None
VIDEO_DIRECTORY = os.path.abspath("videos")
os.makedirs(VIDEO_DIRECTORY, exist_ok=True)


def clear_cuda_memory():
    """Release cached CUDA memory and trigger garbage collection."""
    torch.cuda.empty_cache()
    gc.collect()


def run_inference(video_path, audio_path, video_out_path, inference_ckpt_path,
                  unet_config_path="configs/unet/second_stage.yaml",
                  inference_steps=20, guidance_scale=1.0, seed=1247):
    clear_cuda_memory()

    # Load configuration
    config = OmegaConf.load(unet_config_path)

    # Determine the proper dtype based on GPU capability: use fp16 only on
    # GPUs with compute capability above 7
    is_fp16_supported = torch.cuda.is_available() and torch.cuda.get_device_capability()[0] > 7
    dtype = torch.float16 if is_fp16_supported else torch.float32

    # Set up the scheduler
    scheduler = DDIMScheduler.from_pretrained("configs")

    # Choose the Whisper model based on the config's cross-attention width
    if config.model.cross_attention_dim == 768:
        whisper_model_path = "checkpoints/whisper/small.pt"
    elif config.model.cross_attention_dim == 384:
        whisper_model_path = "checkpoints/whisper/tiny.pt"
    else:
        raise NotImplementedError("cross_attention_dim must be 768 or 384")

    # Initialize the audio encoder
    audio_encoder = Audio2Feature(model_path=whisper_model_path, device="cuda",
                                  num_frames=config.data.num_frames)

    # Load the VAE
    vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=dtype)
    vae.config.scaling_factor = 0.18215
    vae.config.shift_factor = 0

    # Load the UNet from the checkpoint
    unet, _ = UNet3DConditionModel.from_pretrained(
        OmegaConf.to_container(config.model),
        inference_ckpt_path,  # load checkpoint
        device="cpu",
    )
    unet = unet.to(dtype=dtype)

    # Optionally enable memory-efficient attention if available
    if is_xformers_available():
        unet.enable_xformers_memory_efficient_attention()

    # Initialize the pipeline and move it to the GPU
    pipeline = LipsyncPipeline(
        vae=vae,
        audio_encoder=audio_encoder,
        unet=unet,
        scheduler=scheduler,
    ).to("cuda")

    # Seed the generators (-1 requests a fresh non-deterministic seed)
    if seed != -1:
        set_seed(seed)
    else:
        torch.seed()

    with autocast():
        try:
            pipeline(
                video_path=video_path,
                audio_path=audio_path,
                video_out_path=video_out_path,
                video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
                num_frames=config.data.num_frames,
                num_inference_steps=inference_steps,
                guidance_scale=guidance_scale,
                weight_dtype=dtype,
                width=config.data.resolution,
                height=config.data.resolution,
            )
        finally:
            clear_cuda_memory()
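
# Minimal standalone sketch of calling run_inference directly, for local
# testing outside Flask. The input/output file names are illustrative
# assumptions; the checkpoint path is the same default used by the /run
# route below:
#
#   run_inference(
#       video_path="assets/demo_input.mp4",
#       audio_path="assets/demo_audio.wav",
#       video_out_path="demo_out.mp4",
#       inference_ckpt_path="checkpoints/latentsync_unet.pt",
#   )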


def create_temp_dir():
    return tempfile.TemporaryDirectory()


def generate_audio(voice_cloning, text_prompt):
    """Generate speech for text_prompt with ElevenLabs, returning the MP3 path."""
    if voice_cloning == 'yes':
        print('Entering custom audio creation using ElevenLabs')
        # NOTE: consider loading the API key from an environment variable
        set_api_key('92e149985ea2732b4359c74346c3daee')
        voice = Voice(
            voice_id="VJpttplXHolgV2leGe5V",
            name="Marc",
            settings=VoiceSettings(stability=0.71, similarity_boost=0.9,
                                   style=0.0, use_speaker_boost=True),
        )
        audio = generate(text=text_prompt, voice=voice,
                         model="eleven_multilingual_v2", stream=True, latency=4)
        with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="cloned_audio_",
                                         dir=TEMP_DIR.name, delete=False) as temp_file:
            for chunk in audio:
                temp_file.write(chunk)
            driven_audio_path = temp_file.name
        print('driven_audio_path', driven_audio_path)
        return driven_audio_path
    else:
        # Default (non-cloned) voice
        print('Entering default audio creation using ElevenLabs')
        set_api_key('92e149985ea2732b4359c74346c3daee')
        audio = generate(text=text_prompt, voice="Daniel",
                         model="eleven_multilingual_v2", stream=True, latency=4)
        with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="default_audio_",
                                         dir=TEMP_DIR.name, delete=False) as temp_file:
            for chunk in audio:
                temp_file.write(chunk)
            driven_audio_path = temp_file.name
        print('driven_audio_path', driven_audio_path)
        return driven_audio_path


def get_video_duration(video_path):
    """Extract the video duration dynamically using ffprobe."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "json",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    duration = json.loads(result.stdout)["format"]["duration"]
    return float(duration)


def extend_video_simple(video_path, audio_path, output_path):
    """Extend the video by appending a reversed copy if the audio is longer."""
    audio_duration = librosa.get_duration(path=audio_path)
    video_duration = get_video_duration(video_path)

    print(f"Video Duration: {video_duration:.2f} sec")
    print(f"Audio Duration: {audio_duration:.2f} sec")

    if audio_duration > video_duration:
        print("Extending video by adding a reversed version.")

        # Create a reversed version of the full video
        reversed_clip = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False,
                                                    suffix=".mp4").name
        subprocess.run(
            f'ffmpeg -y -i "{video_path}" -vf reverse -an "{reversed_clip}"',
            shell=True
        )

        # Merge original + reversed
        subprocess.run(
            f'ffmpeg -y -i "{video_path}" -i "{reversed_clip}" '
            f'-filter_complex "[0:v:0][1:v:0]concat=n=2:v=1[outv]" '
            f'-map "[outv]" -an "{output_path}"',
            shell=True
        )
    else:
        print("Audio is not longer than video. No extension needed.")
        shutil.copy(video_path, output_path)
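

# Illustrative sketch (defined for reference, not called anywhere):
# extend_video_loop below appends original+reversed pairs, each adding
# twice the video duration, so the number of pairs required works out to
# ceil(audio_duration / (2 * video_duration)). This hypothetical helper
# just makes that math explicit:
def _pairs_needed(video_duration, audio_duration):
    """Return how many original+reversed pairs cover the audio duration."""
    import math
    return max(1, math.ceil(audio_duration / (2 * video_duration)))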
No extension needed.") subprocess.run(f"cp {video_path} {output_path}", shell=True) def extend_video_loop(video_path, audio_path, output_path): """Extends video duration by repeating original and reversed video until it meets/exceeds audio duration.""" audio_duration = librosa.get_duration(path=audio_path) video_duration = get_video_duration(video_path) print(f"Video Duration: {video_duration:.2f} sec") print(f"Audio Duration: {audio_duration:.2f} sec") if audio_duration > video_duration: print("Extending video by repeating original and reversed versions.") # Create reversed video reversed_clip = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name subprocess.run( f"ffmpeg -y -i {video_path} -vf reverse -an {reversed_clip}", shell=True ) # Generate a list of clips to reach/exceed audio duration video_clips = [video_path, reversed_clip] total_duration = video_duration * 2 # Original + reversed while total_duration < audio_duration: video_clips.append(video_path) video_clips.append(reversed_clip) total_duration += video_duration * 2 print(f"Total Clips: {len(video_clips)}") # Use FFmpeg filter_complex concat for seamless merging concat_filter = "".join(f"[{i}:v:0]" for i in range(len(video_clips))) + f"concat=n={len(video_clips)}:v=1[outv]" input_files = " ".join(f"-i {clip}" for clip in video_clips) subprocess.run( f"ffmpeg -y {input_files} -filter_complex \"{concat_filter}\" -map \"[outv]\" -an {output_path}", shell=True ) print(f"Extended video saved to {output_path}") else: print("Audio is not longer than video. No extension needed.") subprocess.run(f"cp {video_path} {output_path}", shell=True) def translate_text(text, target_language): if not text or text.strip() == "": return "" LANGUAGE_CODES = {"english": "en", "hindi": "hi"} try: # Convert language name to code target_language_code = LANGUAGE_CODES.get(target_language.lower()) # Use Google Translate with proper coroutine handling async def perform_translation(): translator = Translator() result = await translator.translate(text, dest=target_language_code) return result.text if hasattr(result, 'text') else text # Run the async function in the event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) result = loop.run_until_complete(perform_translation()) loop.close() return result except Exception as e: logger.error(f"Error translating text: {e}") # Return original text if translation fails return text @app.route('/run', methods=['POST']) def generate_video(): global TEMP_DIR # global VIDEO_DIRECTORY TEMP_DIR = create_temp_dir() if 'video' not in request.files: return jsonify({'error': 'Video file is required.'}), 400 video_file = request.files['video'] text_prompt = request.form['text_prompt'] print('Input text prompt: ',text_prompt) text_prompt = text_prompt.strip() if not text_prompt: return jsonify({'error': 'Input text prompt cannot be blank'}), 400 voice_cloning = request.form.get('voice_cloning', 'no') target_language = request.form.get('target_language', 'original_text') if target_language != 'original_text': response = translate_text(text_prompt, target_language) text_prompt = response.strip() print('Translated input text prompt: ',text_prompt) temp_audio_path = generate_audio(voice_cloning, text_prompt) with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="input_",dir=TEMP_DIR.name, delete=False) as temp_file: temp_video_path = temp_file.name video_file.save(temp_video_path) print('temp_video_path',temp_video_path) # output_video = tempfile.NamedTemporaryFile(delete=False, 
suffix=".mp4").name # You can pass additional parameters via form data if needed (e.g., checkpoint path) inference_ckpt_path = request.form.get('inference_ckpt_path', 'checkpoints/latentsync_unet.pt') unet_config_path = request.form.get('unet_config_path', 'configs/unet/second_stage.yaml') output_video = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix=".mp4").name extend_video_loop(temp_video_path, temp_audio_path, output_video) final_output_video = tempfile.NamedTemporaryFile(dir=TEMP_DIR.name, delete=False, suffix="_final_extended.mp4").name try: run_inference( video_path=output_video, audio_path=temp_audio_path, video_out_path=final_output_video, inference_ckpt_path=inference_ckpt_path, unet_config_path=unet_config_path, inference_steps=int(request.form.get('inference_steps', 20)), guidance_scale=float(request.form.get('guidance_scale', 1.0)), seed=int(request.form.get('seed', 1247)) ) # Return the output video path or further process the file for download if final_output_video and final_output_video.endswith('.mp4'): filename = os.path.basename(final_output_video) # os.makedirs('videos', exist_ok=True) # VIDEO_DIRECTORY = os.path.abspath('videos') print("VIDEO_DIRECTORY: ",VIDEO_DIRECTORY) destination_path = os.path.join(VIDEO_DIRECTORY, filename) shutil.copy(final_output_video, destination_path) video_url = f"/videos/{filename}" return jsonify({"message": "Video processed and saved successfully.", "output_video": video_url, "status": "success"}), 200 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route("/videos/", methods=['GET']) def serve_video(filename): # global VIDEO_DIRECTORY return send_from_directory(VIDEO_DIRECTORY, filename, as_attachment=False) @app.route("/health", methods=["GET"]) def health_status(): response = {"online": "true"} return jsonify(response) if __name__ == '__main__': app.run(debug=True)