Spaces:
Running
on
Zero
Running
on
Zero
| import os, random, time | |
| import spaces | |
| import uuid | |
| import tempfile, shutil | |
| from pydub import AudioSegment | |
| import gradio as gr | |
| from huggingface_hub import snapshot_download | |
| # Download models | |
| os.makedirs("checkpoints", exist_ok=True) | |
| # List of subdirectories to create inside "checkpoints" | |
| subfolders = [ | |
| "vae", | |
| "wav2vec2", | |
| "emotion2vec_plus_large" | |
| ] | |
| # Create each subdirectory | |
| for subfolder in subfolders: | |
| os.makedirs(os.path.join("checkpoints", subfolder), exist_ok=True) | |
| snapshot_download( | |
| repo_id = "memoavatar/memo", | |
| local_dir = "./checkpoints" | |
| ) | |
| snapshot_download( | |
| repo_id = "stabilityai/sd-vae-ft-mse", | |
| local_dir = "./checkpoints/vae" | |
| ) | |
| snapshot_download( | |
| repo_id = "facebook/wav2vec2-base-960h", | |
| local_dir = "./checkpoints/wav2vec2" | |
| ) | |
| snapshot_download( | |
| repo_id = "emotion2vec/emotion2vec_plus_large", | |
| local_dir = "./checkpoints/emotion2vec_plus_large" | |
| ) | |
| import torch | |
| # CUDA version | |
| print("CUDA Version (from PyTorch):", torch.version.cuda) | |
| # cuDNN version | |
| print("cuDNN Version (from PyTorch):", torch.backends.cudnn.version()) | |
| # Is CUDA available | |
| print("Is CUDA available:", torch.cuda.is_available()) | |
| from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler | |
| from tqdm import tqdm | |
| from memo.models.audio_proj import AudioProjModel | |
| from memo.models.image_proj import ImageProjModel | |
| from memo.models.unet_2d_condition import UNet2DConditionModel | |
| from memo.models.unet_3d import UNet3DConditionModel | |
| from memo.pipelines.video_pipeline import VideoPipeline | |
| from memo.utils.audio_utils import extract_audio_emotion_labels, preprocess_audio, resample_audio | |
| from memo.utils.vision_utils import preprocess_image, tensor_to_video | |
| device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") | |
| weight_dtype = torch.bfloat16 | |
| vae = AutoencoderKL.from_pretrained("./checkpoints/vae").to(device=device, dtype=weight_dtype) | |
| reference_net = UNet2DConditionModel.from_pretrained("./checkpoints", subfolder="reference_net", use_safetensors=True) | |
| diffusion_net = UNet3DConditionModel.from_pretrained("./checkpoints", subfolder="diffusion_net", use_safetensors=True) | |
| image_proj = ImageProjModel.from_pretrained("./checkpoints", subfolder="image_proj", use_safetensors=True) | |
| audio_proj = AudioProjModel.from_pretrained("./checkpoints", subfolder="audio_proj", use_safetensors=True) | |
| vae.requires_grad_(False).eval() | |
| reference_net.requires_grad_(False).eval() | |
| diffusion_net.requires_grad_(False).eval() | |
| image_proj.requires_grad_(False).eval() | |
| audio_proj.requires_grad_(False).eval() | |
| noise_scheduler = FlowMatchEulerDiscreteScheduler() | |
| pipeline = VideoPipeline(vae=vae, reference_net=reference_net, diffusion_net=diffusion_net, scheduler=noise_scheduler, image_proj=image_proj) | |
| pipeline.to(device=device, dtype=weight_dtype) | |
| def process_audio(file_path, temp_dir): | |
| # Load the audio file | |
| audio = AudioSegment.from_file(file_path) | |
| # Check and cut the audio if longer than 9 seconds | |
| max_duration = 9 * 1000 # 9 seconds in milliseconds | |
| if len(audio) > max_duration: | |
| audio = audio[:max_duration] | |
| # Save the processed audio in the temporary directory | |
| output_path = os.path.join(temp_dir, "trimmed_audio.wav") | |
| audio.export(output_path, format="wav") | |
| # Return the path to the trimmed file | |
| print(f"Processed audio saved at: {output_path}") | |
| return output_path | |
| def generate(input_video, input_audio, seed, progress=gr.Progress(track_tqdm=True)): | |
| """ | |
| Generates a talking-head video synchronized with the input audio using the MEMO pipeline. | |
| This function combines an input face image and an audio clip to create a temporally coherent | |
| and emotionally expressive talking video. It leverages a memory-guided diffusion model | |
| conditioned on audio features, emotional cues, and visual context. | |
| Args: | |
| input_video (str): Path to the input image file (used as the reference face). | |
| input_audio (str): Path to the input audio file (speech or dialogue). | |
| seed (int): Random seed for deterministic results. Use 0 for a randomly generated seed. | |
| progress (gr.Progress, optional): Gradio progress tracker (automatically passed by Gradio). | |
| Returns: | |
| str: File path to the generated output video (MP4 format). | |
| """ | |
| gr.Info("580 seconds will be allocated from your daily ZeroGPU time credits.") | |
| pipeline.reference_net.enable_xformers_memory_efficient_attention() | |
| pipeline.diffusion_net.enable_xformers_memory_efficient_attention() | |
| is_shared_ui = True if "fffiloni/MEMO" in os.environ['SPACE_ID'] else False | |
| temp_dir = None | |
| if is_shared_ui: | |
| temp_dir = tempfile.mkdtemp() | |
| input_audio = process_audio(input_audio, temp_dir) | |
| print(f"Processed file was stored temporarily at: {input_audio}") | |
| resolution = 512 | |
| num_generated_frames_per_clip = 16 | |
| fps = 30 | |
| num_init_past_frames = 2 | |
| num_past_frames = 16 | |
| inference_steps = 20 | |
| cfg_scale = 3.5 | |
| if seed == 0: | |
| random.seed(int(time.time())) | |
| seed = random.randint(0, 18446744073709551615) | |
| generator = torch.manual_seed(seed) | |
| img_size = (resolution, resolution) | |
| pixel_values, face_emb = preprocess_image(face_analysis_model="./checkpoints/misc/face_analysis", image_path=input_video, image_size=resolution) | |
| output_dir = "./outputs" | |
| os.makedirs(output_dir, exist_ok=True) | |
| cache_dir = os.path.join(output_dir, "audio_preprocess") | |
| os.makedirs(cache_dir, exist_ok=True) | |
| input_audio = resample_audio(input_audio, os.path.join(cache_dir, f"{os.path.basename(input_audio).split('.')[0]}-16k.wav")) | |
| if is_shared_ui: | |
| # Clean up the temporary directory | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir) | |
| print(f"Temporary directory {temp_dir} deleted.") | |
| audio_emb, audio_length = preprocess_audio( | |
| wav_path=input_audio, | |
| num_generated_frames_per_clip=num_generated_frames_per_clip, | |
| fps=fps, | |
| wav2vec_model="./checkpoints/wav2vec2", | |
| vocal_separator_model="./checkpoints/misc/vocal_separator/Kim_Vocal_2.onnx", | |
| cache_dir=cache_dir, | |
| device=device, | |
| ) | |
| audio_emotion, num_emotion_classes = extract_audio_emotion_labels( | |
| model="./checkpoints", | |
| wav_path=input_audio, | |
| emotion2vec_model="./checkpoints/emotion2vec_plus_large", | |
| audio_length=audio_length, | |
| device=device, | |
| ) | |
| video_frames = [] | |
| num_clips = audio_emb.shape[0] // num_generated_frames_per_clip | |
| for t in tqdm(range(num_clips), desc="Generating video clips"): | |
| if len(video_frames) == 0: | |
| past_frames = pixel_values.repeat(num_init_past_frames, 1, 1, 1) | |
| past_frames = past_frames.to(dtype=pixel_values.dtype, device=pixel_values.device) | |
| pixel_values_ref_img = torch.cat([pixel_values, past_frames], dim=0) | |
| else: | |
| past_frames = video_frames[-1][0] | |
| past_frames = past_frames.permute(1, 0, 2, 3) | |
| past_frames = past_frames[0 - num_past_frames :] | |
| past_frames = past_frames * 2.0 - 1.0 | |
| past_frames = past_frames.to(dtype=pixel_values.dtype, device=pixel_values.device) | |
| pixel_values_ref_img = torch.cat([pixel_values, past_frames], dim=0) | |
| pixel_values_ref_img = pixel_values_ref_img.unsqueeze(0) | |
| audio_tensor = (audio_emb[t * num_generated_frames_per_clip : min((t + 1) * num_generated_frames_per_clip, audio_emb.shape[0])].unsqueeze(0).to(device=audio_proj.device, dtype=audio_proj.dtype)) | |
| audio_tensor = audio_proj(audio_tensor) | |
| audio_emotion_tensor = audio_emotion[t * num_generated_frames_per_clip : min((t + 1) * num_generated_frames_per_clip, audio_emb.shape[0])] | |
| pipeline_output = pipeline( | |
| ref_image=pixel_values_ref_img, | |
| audio_tensor=audio_tensor, | |
| audio_emotion=audio_emotion_tensor, | |
| emotion_class_num=num_emotion_classes, | |
| face_emb=face_emb, | |
| width=img_size[0], | |
| height=img_size[1], | |
| video_length=num_generated_frames_per_clip, | |
| num_inference_steps=inference_steps, | |
| guidance_scale=cfg_scale, | |
| generator=generator, | |
| ) | |
| video_frames.append(pipeline_output.videos) | |
| pipeline.reference_net.disable_xformers_memory_efficient_attention() | |
| pipeline.diffusion_net.disable_xformers_memory_efficient_attention() | |
| video_frames = torch.cat(video_frames, dim=2) | |
| video_frames = video_frames.squeeze(0) | |
| video_frames = video_frames[:, :audio_length] | |
| # Save the output video | |
| unique_id = str(uuid.uuid4()) | |
| video_path = os.path.join(output_dir, f"memo-{seed}_{unique_id}.mp4") | |
| tensor_to_video(video_frames, video_path, input_audio, fps=fps) | |
| return video_path | |
| with gr.Blocks(analytics_enabled=False) as demo: | |
| with gr.Column(): | |
| gr.Markdown("# MEMO: Memory-Guided Diffusion for Expressive Talking Video Generation") | |
| gr.Markdown("Note: Audio length is trimmed to max 9 seconds, using 580 GPU credits.") | |
| gr.Markdown("Consider adding word at beginning of clip that is later trimmable since lipsync does not always start immediately.") | |
| gr.HTML(""" | |
| <div style="display:flex;column-gap:4px;"> | |
| <a href="https://github.com/memoavatar/memo"> | |
| <img src='https://img.shields.io/badge/GitHub-Repo-blue'> | |
| </a> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_video = gr.Image(label="Upload Input Image", type="filepath") | |
| input_audio = gr.Audio(label="Upload Input Audio", type="filepath") | |
| seed = gr.Number(label="Seed (0 for Random)", value=0, precision=0) | |
| with gr.Column(): | |
| video_output = gr.Video(label="Generated Video") | |
| generate_button = gr.Button("Generate") | |
| generate_button.click( | |
| fn=generate, | |
| inputs=[input_video, input_audio, seed], | |
| outputs=[video_output], | |
| ) | |
| demo.queue().launch(share=False, show_api=True, show_error=True, mcp_server=True) | |