"""Gradio demo for MoDA talking-head generation.

On import this module patches ``torch.xpu`` when it is missing, patches the
LivePortrait config, downloads the pre-trained weights if needed and builds
the inference pipeline once. The Gradio UI then calls ``generate_motion`` for
every request.
"""

import os
import subprocess
import sys
import tempfile
import time
import traceback
from pathlib import Path

import torch

# --- Patch torch.xpu to prevent diffusers and PyTorch random seed functions from crashing ---
# NOTE: this must run before the imports below, which may query torch.xpu.
if not hasattr(torch, "xpu"):

    class DummyXPU:
        """Inert stand-in for ``torch.xpu`` on torch builds that lack it."""

        @staticmethod
        def empty_cache(*args, **kwargs):
            pass

        @staticmethod
        def is_available(*args, **kwargs):
            return False

        @staticmethod
        def device_count(*args, **kwargs):
            return 0

        @staticmethod
        def manual_seed(*args, **kwargs):
            pass

        @staticmethod
        def _is_in_bad_fork(*args, **kwargs):
            return False

        def __getattr__(self, name):
            # Dynamically handles any unexpected attributes/methods queried by PyTorch or diffusers
            return lambda *args, **kwargs: None

    torch.xpu = DummyXPU()

import cv2
import gradio as gr
import spaces
from huggingface_hub import snapshot_download
from huggingface_hub.utils import GatedRepoError, RepositoryNotFoundError, RevisionNotFoundError
from pydub import AudioSegment

# Add the src directory to the system path to allow for local imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'src')))
from models.inference.moda_test import LiveVASAPipeline, emo_map, set_seed

# --- Configuration ---
# Set seed for reproducibility
set_seed(42)

# Paths and constants for the Gradio demo
DEFAULT_CFG_PATH = "configs/audio2motion/inference/inference.yaml"
DEFAULT_MOTION_MEAN_STD_PATH = "src/datasets/mean.pt"
DEFAULT_SILENT_AUDIO_PATH = "src/examples/silent-audio.wav"
OUTPUT_DIR = "gradio_output"
WEIGHTS_DIR = "pretrain_weights"
REPO_ID = "lixinyizju/moda"


def _remove_quietly(path, description):
    """Delete *path* if it exists, printing a warning instead of raising.

    Args:
        path: File to delete.
        description: Short label used in the warning message.

    Returns:
        True if the file existed and was deleted, False otherwise.
    """
    if not os.path.exists(path):
        return False
    try:
        os.remove(path)
    except OSError as e:
        print(f"Warning: Could not delete {description} {path}: {e}")
        return False
    return True


# --- Automatic Config Patching for Aspect Ratio Preservation ---
def patch_liveportrait_config():
    """
    Patches the liveportrait_config.yaml to enable 'flag_pasteback'.
    This stitches the animated face back onto the original high-resolution
    full-sized source image, maintaining the exact original image size and aspect ratio.

    The file is only rewritten when the flag is not already True. Any failure
    (missing PyYAML, unreadable file, ...) is reported as a warning and ignored.
    """
    config_path = "configs/audio2motion/model/liveportrait_config.yaml"
    if not os.path.exists(config_path):
        return

    try:
        import yaml

        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = yaml.safe_load(f) or {}

        # Enable pasteback to restore original image dimensions in the output video
        if config_data.get('flag_pasteback') is not True:
            config_data['flag_pasteback'] = True
            with open(config_path, 'w', encoding='utf-8') as f:
                yaml.dump(config_data, f)
            print("Successfully patched liveportrait_config.yaml: flag_pasteback set to True")
    except Exception as e:
        # Best effort: the demo still runs with the unpatched config.
        print(f"Warning: Failed to automatically patch configuration file: {e}")


# --- Download Pre-trained Weights from Hugging Face Hub ---
def download_weights():
    """
    Downloads pre-trained weights from Hugging Face Hub if they don't exist locally.

    Raises:
        gr.Error: If the repository is gated, cannot be found, or the download fails.
    """
    # A simple check for a key file to see if the download is likely complete
    motion_model_file = os.path.join(WEIGHTS_DIR, "moda", "net-200.pth")
    if os.path.exists(motion_model_file):
        print(f"Found existing weights at '{WEIGHTS_DIR}'. Skipping download.")
        return

    print(f"Weights not found locally. Downloading from Hugging Face Hub repo '{REPO_ID}'...")
    print("This may take a while depending on your internet connection.")
    try:
        snapshot_download(
            repo_id=REPO_ID,
            local_dir=WEIGHTS_DIR,
            local_dir_use_symlinks=False,  # Use False to copy files directly; safer for Windows
            resume_download=True,
        )
        print("Weights downloaded successfully.")
    except GatedRepoError as e:
        raise gr.Error(f"Access to the repository '{REPO_ID}' is gated. Please visit https://huggingface.co/{REPO_ID} to request access.") from e
    except (RepositoryNotFoundError, RevisionNotFoundError) as e:
        raise gr.Error(f"The repository '{REPO_ID}' was not found. Please check the repository ID.") from e
    except Exception as e:
        print(f"An error occurred during download: {e}")
        raise gr.Error(f"Failed to download models. Please check your internet connection and try again. Error: {e}") from e


# --- Audio Conversion and Trimming Function ---
def ensure_wav_format(audio_path, max_duration_ms=300000):  # Increased to 5 minutes to support longer generations
    """
    Ensures the audio file is in WAV format and limits its duration to max_duration_ms.

    Args:
        audio_path: Path to the input audio file, or None.
        max_duration_ms: Maximum duration to keep, in milliseconds.

    Returns:
        The path to a newly created temporary 16 kHz mono WAV file (the caller
        is responsible for deleting it), or None when audio_path is None.

    Raises:
        gr.Error: If the audio cannot be loaded or exported.
    """
    if audio_path is None:
        return None

    audio_path = Path(audio_path)
    wav_path = None

    try:
        print(f"Loading and processing audio from: {audio_path}")
        # Load the audio file (supports wav, mp3, m4a, etc.)
        audio = AudioSegment.from_file(audio_path)

        # Trim if the audio exceeds the limit
        if len(audio) > max_duration_ms:
            print(f"Audio is {len(audio)/1000:.2f}s, trimming to {max_duration_ms/1000:.2f}s")
            audio = audio[:max_duration_ms]
        else:
            print(f"Audio is {len(audio)/1000:.2f}s (under the {max_duration_ms/1000:.2f}s limit)")

        # Create a temporary WAV file; only the name is needed, so the handle
        # is closed before exporting to it.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
            wav_path = tmp_file.name

        # Export as WAV with standard settings
        audio.export(
            wav_path,
            format='wav',
            parameters=["-ar", "16000", "-ac", "1"],  # 16kHz, mono
        )

        print(f"Audio processed successfully: {wav_path}")
        return wav_path

    except Exception as e:
        print(f"Error processing audio: {e}")
        # Do not leave a half-written temporary file behind on failure.
        if wav_path is not None:
            _remove_quietly(wav_path, "temporary WAV file")
        raise gr.Error(f"Failed to process or convert audio file. Error: {e}") from e


# --- Helper Functions for Segmented Generation ---
def extract_last_frame(video_path, output_image_path):
    """
    Extracts the absolute last frame of the given video and saves it to output_image_path.

    The video is decoded sequentially to the end and the last successfully
    read frame is kept.

    Returns:
        True on success.

    Raises:
        Exception: If the video cannot be opened, contains no readable frame,
            or the image cannot be written.
    """
    cap = cv2.VideoCapture(str(video_path))
    frame = None
    try:
        if not cap.isOpened():
            raise Exception(f"Could not open video: {video_path}")

        while True:
            ret, current = cap.read()
            if not ret:
                break
            frame = current
    finally:
        # Release the capture even when opening or reading fails.
        cap.release()

    if frame is None:
        raise Exception("Failed to read frames from video.")

    # cv2.imwrite signals failure through its return value, not an exception.
    if not cv2.imwrite(str(output_image_path), frame):
        raise Exception(f"Could not write last frame to: {output_image_path}")
    return True


def concatenate_videos_ffmpeg(video_paths, full_audio_path, output_path):
    """
    Stitches multiple video files together (video track only) and overlays the original full audio track.
    This bypasses any pops or sync problems at the chunk boundaries.

    Args:
        video_paths: Ordered list of segment video files.
        full_audio_path: Audio file to use as the only audio track.
        output_path: Destination of the merged video.

    Raises:
        ValueError: If video_paths is empty.
        Exception: If an ffmpeg invocation fails.
    """
    if not video_paths:
        raise ValueError("No video segments to concatenate.")

    list_file_path = None
    temp_video_only = os.path.join(os.path.dirname(output_path), "temp_video_only.mp4")

    try:
        # Create the text file listing all videos
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') as f:
            list_file_path = f.name
            for p in video_paths:
                # Format absolute path with forward slashes and escape single quotes
                abs_path = os.path.abspath(p).replace('\\', '/')
                safe_path = abs_path.replace("'", "'\\''")
                f.write(f"file '{safe_path}'\n")

        # Step 1: Concatenate video tracks only (discard original segment audios to avoid jumps)
        cmd_copy = [
            'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path,
            '-an',           # Discard segment audios completely
            '-c:v', 'copy',  # Direct stream copy (instant & lossless)
            temp_video_only,
        ]
        print(f"Executing lossless FFMPEG video concat (no-audio): {' '.join(cmd_copy)}")
        result = subprocess.run(cmd_copy, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if result.returncode != 0:
            print("Lossless video-only concatenation failed. Retrying with full video re-encoding...")
            # Fallback path: video re-encoding (robust)
            cmd_reencode = [
                'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path,
                '-an',
                '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
                temp_video_only,
            ]
            print(f"Executing FFMPEG video-only re-encode: {' '.join(cmd_reencode)}")
            result_re = subprocess.run(cmd_reencode, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result_re.returncode != 0:
                # errors='replace' keeps the real ffmpeg failure visible even
                # if its output is not valid UTF-8.
                stderr_text = result_re.stderr.decode('utf-8', errors='replace')
                raise Exception(f"FFMPEG video concatenation failed: {stderr_text}")

        # Step 2: Merge the concatenated video track with the complete original audio file
        cmd_merge = [
            'ffmpeg', '-y',
            '-i', temp_video_only,
            '-i', str(full_audio_path),
            '-map', '0:v:0',  # Use video from concatenated file
            '-map', '1:a:0',  # Use audio from original full wav file
            '-c:v', 'copy',   # Keep video intact
            '-c:a', 'aac',    # Encode original audio to standard clean AAC
            '-shortest',      # Sync to the shortest stream duration
            str(output_path),
        ]
        print(f"Applying final original audio track: {' '.join(cmd_merge)}")
        result_merge = subprocess.run(cmd_merge, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result_merge.returncode != 0:
            stderr_text = result_merge.stderr.decode('utf-8', errors='replace')
            raise Exception(f"FFMPEG audio replacement failed: {stderr_text}")

    finally:
        # Clean up temporary files
        if list_file_path is not None and os.path.exists(list_file_path):
            os.remove(list_file_path)
        if os.path.exists(temp_video_only):
            try:
                os.remove(temp_video_only)
            except Exception as e:
                print(f"Warning: Could not remove temp file {temp_video_only}: {e}")


# --- Initialization ---
# Create output directory if it doesn't exist
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Patch the LivePortrait config to enable full-frame pasteback
patch_liveportrait_config()

# Download weights before initializing the pipeline
download_weights()

# Instantiate the pipeline once to avoid reloading models on every request
print("Initializing MoDA pipeline...")
try:
    pipeline = LiveVASAPipeline(
        cfg_path=DEFAULT_CFG_PATH,
        motion_mean_std_path=DEFAULT_MOTION_MEAN_STD_PATH,
    )
    print("MoDA pipeline initialized successfully.")
except Exception as e:
    # Keep the UI importable; generate_motion reports the failure per request.
    print(f"Error initializing pipeline: {e}")
    pipeline = None

# Invert the emo_map for easy lookup from the dropdown value
emo_name_to_id = {v: k for k, v in emo_map.items()}


# --- Core Generation Function ---
@spaces.GPU(duration=120)
def generate_motion(source_image_path, driving_audio_path, emotion_name, cfg_scale, progress=gr.Progress(track_tqdm=True)):
    """
    Generates talking head video.
    For inputs longer than 7 seconds, it automatically chunks the audio into 7-second blocks,
    processes them sequentially using the last frame of the previous block as the next source image,
    and then merges them.

    Args:
        source_image_path: Path to the source portrait image.
        driving_audio_path: Path to the driving audio file.
        emotion_name: Emotion label from the dropdown; unknown names map to ID 8.
        cfg_scale: Guidance scale passed to the pipeline as a float.
        progress: Gradio progress tracker.

    Returns:
        Path of the final merged video.

    Raises:
        gr.Error: If inputs are missing, the pipeline is unavailable, the audio
            is empty, or generation fails.
    """
    if pipeline is None:
        raise gr.Error("Pipeline failed to initialize. Check the console logs for details.")

    if source_image_path is None:
        raise gr.Error("Please upload a source image.")
    if driving_audio_path is None:
        raise gr.Error("Please upload a driving audio file.")

    start_time = time.time()

    # Ensure audio is in WAV format (allowing up to 5 minutes)
    wav_audio_path = ensure_wav_format(driving_audio_path)
    temp_wav_created = wav_audio_path != driving_audio_path

    # Create a unique subdirectory for this run. mkdtemp guarantees uniqueness
    # even when two requests start within the same second, so their segment
    # files can never overwrite each other.
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    run_output_dir = tempfile.mkdtemp(prefix=f"{timestamp}-", dir=OUTPUT_DIR)

    # Get emotion ID from its name
    emotion_id = emo_name_to_id.get(emotion_name, 8)  # Default to 'None' (ID 8) if not found

    print("Starting generation with the following parameters:")
    print(f"  Source Image: {source_image_path}")
    print(f"  Driving Audio (original): {driving_audio_path}")
    print(f"  Driving Audio (WAV): {wav_audio_path}")
    print(f"  Emotion: {emotion_name} (ID: {emotion_id})")
    print(f"  CFG Scale: {cfg_scale}")

    # Chunk settings
    CHUNK_SIZE_MS = 7000  # 7 seconds

    temp_wav_files = []
    temp_frame_files = []
    video_chunks_paths = []
    final_path = None

    try:
        # Load the converted WAV file
        audio = AudioSegment.from_file(wav_audio_path)
        audio_duration_ms = len(audio)

        # Slice audio into segments of max 7 seconds
        audio_chunks = [
            audio[start:start + CHUNK_SIZE_MS]
            for start in range(0, audio_duration_ms, CHUNK_SIZE_MS)
        ]
        if not audio_chunks:
            # Fail early with a clear message instead of an obscure ffmpeg error.
            raise gr.Error("The driving audio is empty. Please upload an audio file with content.")

        num_chunks = len(audio_chunks)
        print(f"Audio split into {num_chunks} segment(s) for safety (max {CHUNK_SIZE_MS/1000}s per segment).")

        current_image_path = source_image_path

        for idx, chunk in enumerate(audio_chunks):
            chunk_num = idx + 1
            print(f"\n--- Processing Segment {chunk_num}/{num_chunks} ---")
            progress(idx / num_chunks, desc=f"Processing segment {chunk_num}/{num_chunks}...")

            # 1. Export current segment to a temporary WAV file
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_chunk_wav:
                chunk_wav_path = tmp_chunk_wav.name
            # Register before exporting so the file is cleaned up even if export fails.
            temp_wav_files.append(chunk_wav_path)
            chunk.export(
                chunk_wav_path,
                format='wav',
                parameters=["-ar", "16000", "-ac", "1"],  # 16kHz, mono
            )

            # 2. Run pipeline inference on this chunk
            print(f"Running inference for segment {chunk_num} using image {current_image_path}...")
            returned_chunk_path = pipeline.driven_sample(
                image_path=str(current_image_path),
                audio_path=str(chunk_wav_path),
                cfg_scale=float(cfg_scale),
                emo=emotion_id,
                save_dir=run_output_dir,
                smooth=False,  # Disable smoothing for a faster demo
                silent_audio_path=DEFAULT_SILENT_AUDIO_PATH,
            )

            # Locate the actual generated video (MoDA prefixes with 'final_')
            returned_path_obj = Path(returned_chunk_path)
            actual_generated_path = returned_path_obj.with_name(f"final_{returned_path_obj.name}")

            if actual_generated_path.exists():
                generated_path = actual_generated_path
            elif returned_path_obj.exists():
                # Real fallback: use the file the pipeline actually reported.
                print(f"Warning: Could not find actual generated file at {actual_generated_path}. Using fallback.")
                generated_path = returned_path_obj
            else:
                raise FileNotFoundError(
                    f"No generated video found for segment {chunk_num} "
                    f"(looked for {actual_generated_path} and {returned_path_obj})."
                )

            # Rename/move to a unique segment path to prevent file overwrite conflicts
            unique_chunk_path = os.path.join(run_output_dir, f"segment_{idx}.mp4")
            os.replace(generated_path, unique_chunk_path)
            chunk_video_path = unique_chunk_path
            print(f"Renamed generated file to unique path: {chunk_video_path}")

            video_chunks_paths.append(chunk_video_path)

            # 3. Extract last frame if we have another chunk to process
            if chunk_num < num_chunks:
                last_frame_path = os.path.join(run_output_dir, f"last_frame_chunk_{idx}.jpg")
                print(f"Extracting last frame of segment {chunk_num} for next segment...")
                try:
                    extract_last_frame(chunk_video_path, last_frame_path)
                    current_image_path = last_frame_path
                    temp_frame_files.append(last_frame_path)
                except Exception as e:
                    print(f"Warning: Failed to extract last frame from segment {chunk_num}: {e}")
                    print("Falling back to the original source image for the next segment.")
                    current_image_path = source_image_path

        # 4. Merge all generated chunks and lay the original full audio track on top
        progress(0.95, desc="Merging segments and applying original audio...")
        final_video_name = f"final_{timestamp}.mp4"
        final_path = Path(os.path.join(run_output_dir, final_video_name))

        # We always run this function (even for 1 chunk) to ensure the original, clean,
        # high-quality WAV audio is perfectly mapped on the video
        concatenate_videos_ffmpeg(video_chunks_paths, wav_audio_path, final_path)

    except gr.Error:
        # Already a user-facing error; do not wrap it a second time.
        raise
    except Exception as e:
        print(f"An error occurred during video generation: {e}")
        traceback.print_exc()
        raise gr.Error(f"An unexpected error occurred: {str(e)}. Please check the console for details.") from e

    finally:
        # Clean up temporary audio files generated for the chunks
        for f_wav in temp_wav_files:
            _remove_quietly(f_wav, "chunk file")

        # Clean up temporary last frame images
        for f_img in temp_frame_files:
            _remove_quietly(f_img, "image file")

        # Clean up the primary temporary WAV file
        if temp_wav_created and _remove_quietly(wav_audio_path, "primary temporary file"):
            print(f"Cleaned up primary temporary WAV file: {wav_audio_path}")

    end_time = time.time()
    processing_time = end_time - start_time

    print(f"Video process completed successfully at: {final_path}")
    print(f"Processing time: {processing_time:.2f} seconds.")

    return final_path


# --- Gradio UI Definition ---
with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important; margin: 0 auto !important}") as demo:
    gr.HTML(
        "\n\nMoDA: Multi-modal Diffusion Architecture for Talking Head Generation\n\n"
    )

    with gr.Row(variant="panel"):
        with gr.Column(scale=1):
            with gr.Row():
                source_image = gr.Image(label="Source Image", type="filepath", value="src/examples/reference_images/7.jpg")

            with gr.Row():
                driving_audio = gr.Audio(
                    label="Driving Audio",
                    type="filepath",
                    value="src/examples/driving_audios/5.wav"
                )

            with gr.Row():
                emotion_dropdown = gr.Dropdown(
                    label="Emotion",
                    choices=list(emo_map.values()),
                    value="None"
                )

            with gr.Row():
                cfg_slider = gr.Slider(
                    label="CFG Scale",
                    minimum=1.0,
                    maximum=3.0,
                    step=0.05,
                    value=1.2
                )

            submit_button = gr.Button("Generate Video", variant="primary")

        with gr.Column(scale=1):
            output_video = gr.Video(label="Generated Video")

    # Built from explicit lines so the Markdown structure does not depend on
    # source indentation.
    gr.Markdown(
        "\n"
        "---\n"
        "### **Disclaimer**\n"
        "This project is intended for academic research, and we explicitly disclaim any responsibility for user-generated content. Users are solely liable for their actions while using this generative model.\n"
    )

    submit_button.click(
        fn=generate_motion,
        inputs=[source_image, driving_audio, emotion_dropdown, cfg_slider],
        outputs=output_video
    )

if __name__ == "__main__":
    demo.launch(share=True)