Spaces:
Paused
Paused
| # app.py | |
| import os | |
| import sys | |
| import time | |
| import gradio as gr | |
| import spaces | |
| from huggingface_hub import snapshot_download | |
| from huggingface_hub.utils import GatedRepoError, RepositoryNotFoundError, RevisionNotFoundError | |
| from pathlib import Path | |
| import tempfile | |
| from pydub import AudioSegment | |
| import cv2 | |
| import numpy as np | |
| from scipy import interpolate | |
| # Add the src directory to the system path to allow for local imports | |
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'src'))) | |
| from models.inference.moda_test import LiveVASAPipeline, emo_map, set_seed | |
# --- Configuration ---
# Hugging Face repository the weights come from, and the local directories
# used for downloaded weights and generated output.
REPO_ID = "lixinyizju/moda"
WEIGHTS_DIR = "pretrain_weights"
OUTPUT_DIR = "gradio_output"

# Files handed to the pipeline: inference config, motion mean/std statistics
# and the silent audio clip.
DEFAULT_CFG_PATH = "configs/audio2motion/inference/inference.yaml"
DEFAULT_MOTION_MEAN_STD_PATH = "src/datasets/mean.pt"
DEFAULT_SILENT_AUDIO_PATH = "src/examples/silent-audio.wav"

# Seed once at import time for reproducibility.
set_seed(42)
| # --- Download Pre-trained Weights from Hugging Face Hub --- | |
def download_weights():
    """
    Fetch the pre-trained weights from the Hugging Face Hub unless they are
    already on disk.

    The motion model checkpoint under WEIGHTS_DIR is used as a cheap marker:
    if it exists, the download is assumed to be complete and is skipped.

    Raises:
        gr.Error: if the repository is gated, cannot be found, or the
            download fails for any other reason.
    """
    marker_checkpoint = os.path.join(WEIGHTS_DIR, "moda", "net-200.pth")
    if os.path.exists(marker_checkpoint):
        print(f"Found existing weights at '{WEIGHTS_DIR}'. Skipping download.")
        return

    print(f"Weights not found locally. Downloading from Hugging Face Hub repo '{REPO_ID}'...")
    print("This may take a while depending on your internet connection.")
    try:
        snapshot_download(
            repo_id=REPO_ID,
            local_dir=WEIGHTS_DIR,
            # Copy real files instead of symlinking; safer for Windows
            local_dir_use_symlinks=False,
            resume_download=True,
        )
    except GatedRepoError:
        raise gr.Error(f"Access to the repository '{REPO_ID}' is gated. Please visit https://huggingface.co/{REPO_ID} to request access.")
    except (RepositoryNotFoundError, RevisionNotFoundError):
        raise gr.Error(f"The repository '{REPO_ID}' was not found. Please check the repository ID.")
    except Exception as e:
        print(f"An error occurred during download: {e}")
        raise gr.Error(f"Failed to download models. Please check your internet connection and try again. Error: {e}")
    else:
        print("Weights downloaded successfully.")
| # --- Audio Conversion Function --- | |
def ensure_wav_format(audio_path):
    """
    Ensures the audio file is in WAV format. If not, converts it to WAV.

    Files whose suffix is already ``.wav`` (case-insensitive) are returned
    untouched. Anything else is loaded with pydub and exported to a new
    temporary 24 kHz mono WAV file, which the caller is responsible for
    deleting.

    Args:
        audio_path: Path to the input audio file, or None
    Returns:
        Path (str) to the WAV file (either original or converted), or None
        when ``audio_path`` is None.
    Raises:
        gr.Error: if the file cannot be loaded or converted.
    """
    if audio_path is None:
        return None

    audio_path = Path(audio_path)

    # Check if already WAV
    if audio_path.suffix.lower() == '.wav':
        print(f"Audio is already in WAV format: {audio_path}")
        return str(audio_path)

    # Convert to WAV
    print(f"Converting audio from {audio_path.suffix} to WAV format...")
    wav_path = None
    try:
        # Load the audio file
        audio = AudioSegment.from_file(audio_path)

        # Reserve a temporary file name; the handle is closed right away so
        # the exporter can reopen the path itself.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
            wav_path = tmp_file.name

        # export() hands back an open file handle; close it so the file is
        # not kept open (an open handle blocks deletion on Windows).
        exported = audio.export(
            wav_path,
            format='wav',
            parameters=["-ar", "24000", "-ac", "1"]  # 24kHz, mono
        )
        if exported is not None and hasattr(exported, "close"):
            exported.close()
    except Exception as e:
        print(f"Error converting audio: {e}")
        # Do not leave a half-written temporary file behind on failure.
        if wav_path is not None:
            try:
                os.remove(wav_path)
            except OSError:
                pass
        raise gr.Error(f"Failed to convert audio file to WAV format. Error: {e}") from e

    print(f"Audio converted successfully to: {wav_path}")
    return wav_path
| # --- Frame Interpolation Function --- | |
def interpolate_frames(video_path, target_fps=30):
    """
    Re-times a video to a higher frame rate by blending neighbouring frames.

    Each output frame is placed on the source timeline (output index divided
    by the FPS ratio) and built as a weighted blend of the two source frames
    around that position. The number of output frames scales with the FPS
    ratio, so the clip keeps its duration for integer and fractional ratios
    alike.

    Args:
        video_path: Path to the input video
        target_fps: Target frames per second
    Returns:
        Path to the interpolated video, or the original path when the target
        FPS is not higher than the source, there are fewer than two frames,
        no video writer can be opened, or any error occurs.
    """
    video_path = str(video_path)
    try:
        cap = cv2.VideoCapture(video_path)
        try:
            # Get original video properties
            original_fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Fall back to a fixed rate when the container reports no FPS
            if not original_fps or original_fps <= 0:
                print("Warning: Could not detect original FPS. Assuming 25 FPS.")
                original_fps = 25.0
            print(f"Original FPS: {original_fps}, Target FPS: {target_fps}")

            # If target FPS is not higher, return original
            if original_fps >= target_fps:
                print("Target FPS is not higher than original. Skipping interpolation.")
                return video_path

            # Read all frames
            frames = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
        finally:
            # Release the capture on every path, including early returns
            cap.release()

        if len(frames) < 2:
            print("Not enough frames for interpolation.")
            return video_path

        # Always > 1 here because original_fps < target_fps (can be fractional)
        interpolation_factor = target_fps / original_fps
        schedule = _blend_schedule(len(frames), interpolation_factor)
        print(f"Interpolating with factor: {interpolation_factor:.2f}")
        print(f"Total frames to process: {len(frames)}")
        print(f"Total interpolated frames: {len(schedule)}")

        # Build the output name from the stem so it can never equal the input
        # path, whatever the input extension is.
        root, _ = os.path.splitext(video_path)
        output_path = f"{root}_interpolated.mp4"

        out = _open_video_writer(output_path, target_fps, (width, height))
        if out is None:
            print("Could not open a video writer. Skipping interpolation.")
            return video_path
        try:
            for left, alpha in schedule:
                if alpha <= 0:
                    # Output frame coincides with a source frame
                    out.write(frames[left])
                else:
                    out.write(cv2.addWeighted(
                        frames[left], 1 - alpha,
                        frames[left + 1], alpha,
                        0
                    ))
        finally:
            out.release()

        print(f"Interpolated video saved to: {output_path}")
        return output_path
    except Exception as e:
        print(f"Error during frame interpolation: {e}")
        import traceback
        traceback.print_exc()
        return video_path  # Return original if interpolation fails


def _blend_schedule(frame_count, factor):
    """Return one (left_index, alpha) pair per output frame, where alpha is the weight of frame left_index + 1."""
    last = frame_count - 1
    # Small epsilon guards against float error dropping the final frame.
    total = int(last * factor + 1e-9) + 1
    schedule = []
    for k in range(total):
        position = k / factor
        left = int(position)
        alpha = position - left
        if left >= last or alpha < 1e-6:
            # On (or past) a source frame: emit that frame unblended.
            left = min(left, last)
            alpha = 0.0
        schedule.append((left, alpha))
    return schedule


def _open_video_writer(output_path, fps, frame_size):
    """Open a cv2.VideoWriter, trying H264 first and then mp4v; return None if neither opens."""
    for codec in ('H264', 'mp4v'):
        writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*codec), fps, frame_size)
        if writer.isOpened():
            return writer
        writer.release()
    return None
# --- Initialization ---
# Create output directory if it doesn't exist
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Download weights before initializing the pipeline
download_weights()

# Instantiate the pipeline once to avoid reloading models on every request.
# A failure is tolerated here (pipeline is set to None) so the UI can still
# start; generate_motion reports the problem to the user on each request.
print("Initializing MoDA pipeline...")
try:
    pipeline = LiveVASAPipeline(
        cfg_path=DEFAULT_CFG_PATH,
        motion_mean_std_path=DEFAULT_MOTION_MEAN_STD_PATH
    )
except Exception as e:
    import traceback
    print(f"Error initializing pipeline: {e}")
    # The UI error tells users to check the console, so log the full trace.
    traceback.print_exc()
    pipeline = None
else:
    print("MoDA pipeline initialized successfully.")

# Invert the emo_map for easy lookup from the dropdown value
emo_name_to_id = {name: emo_id for emo_id, name in emo_map.items()}
| # --- Audio Length Check Function --- | |
def check_audio_length(audio_path):
    """
    Measure the duration of an audio file.

    Args:
        audio_path: Path to the audio file
    Returns:
        Duration in seconds, or None if the file could not be read
    """
    try:
        # len() of the loaded segment is divided by 1000 to get seconds
        length = len(AudioSegment.from_file(audio_path))
    except Exception as e:
        print(f"Error checking audio length: {e}")
        return None
    return length / 1000.0
| # --- Core Generation Function --- | |
| # Increased duration for smoothing and interpolation | |
def generate_motion(source_image_path, driving_audio_path, emotion_name,
                    cfg_scale, smooth_enabled, target_fps,
                    progress=gr.Progress(track_tqdm=True)):
    """
    The main function that takes Gradio inputs and generates the talking head video.

    Args:
        source_image_path: Path to the source image
        driving_audio_path: Path to the driving audio
        emotion_name: Selected emotion
        cfg_scale: CFG scale for generation
        smooth_enabled: Whether to enable smoothing
        target_fps: Target frames per second for interpolation
        progress: Gradio progress tracker (not used directly in this function)
    Returns:
        Path to the generated video.
    Raises:
        gr.Error: if the pipeline is unavailable, an input is missing, the
            audio is longer than 180 seconds, or generation fails.
    """
    if pipeline is None:
        raise gr.Error("Pipeline failed to initialize. Check the console logs for details.")
    if source_image_path is None:
        raise gr.Error("Please upload a source image.")
    if driving_audio_path is None:
        raise gr.Error("Please upload a driving audio file.")

    # Check audio length (skipped when the duration could not be determined)
    audio_duration = check_audio_length(driving_audio_path)
    if audio_duration:
        print(f"Audio duration: {audio_duration:.1f} seconds")
        if audio_duration > 60:
            gr.Warning(f"⚠️ Audio is {audio_duration:.1f} seconds long. MoDA works best with audio under 60 seconds. Processing may be slow and quality may degrade.")
        if audio_duration > 180:
            raise gr.Error("Audio is too long. Please use audio files under 3 minutes (180 seconds) for best results.")

    start_time = time.time()

    # Ensure audio is in WAV format
    wav_audio_path = ensure_wav_format(driving_audio_path)
    # Compare as paths, not strings: ensure_wav_format round-trips WAV inputs
    # through Path, which can re-spell the string. A plain string comparison
    # could then mark the caller's own file as temporary and delete it below.
    temp_wav_created = Path(wav_audio_path) != Path(driving_audio_path)

    # Create a unique subdirectory for this run
    # NOTE(review): this directory is created but the pipeline is called with
    # save_dir="." below — confirm which location is intended.
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    run_output_dir = os.path.join(OUTPUT_DIR, timestamp)
    os.makedirs(run_output_dir, exist_ok=True)

    # Get emotion ID from its name; falls back to 8
    # (presumably the ID of 'None' in emo_map — confirm)
    emotion_id = emo_name_to_id.get(emotion_name, 8)

    print(f"Starting generation with the following parameters:")
    print(f" Source Image: {source_image_path}")
    print(f" Driving Audio (original): {driving_audio_path}")
    print(f" Driving Audio (WAV): {wav_audio_path}")
    print(f" Emotion: {emotion_name} (ID: {emotion_id})")
    print(f" CFG Scale: {cfg_scale}")
    print(f" Smoothing: {smooth_enabled}")
    print(f" Target FPS: {target_fps}")

    def run_pipeline(smooth):
        # Single place for the pipeline call so the retry cannot drift from
        # the first attempt.
        return pipeline.driven_sample(
            image_path=source_image_path,
            audio_path=wav_audio_path,
            cfg_scale=float(cfg_scale),
            emo=emotion_id,
            save_dir=".",
            smooth=smooth,
            silent_audio_path=DEFAULT_SILENT_AUDIO_PATH,
        )

    try:
        try:
            # Try with the requested smoothing setting first
            result_video_path = run_pipeline(smooth_enabled)
        except TypeError as tensor_error:
            # Only the CUDA-tensor conversion error raised while smoothing is
            # retried; every other TypeError propagates unchanged.
            if not (smooth_enabled and "can't convert cuda" in str(tensor_error)):
                raise
            print("Warning: Smoothing caused CUDA tensor error. Retrying without smoothing...")
            result_video_path = run_pipeline(False)
            print("Generated video without smoothing due to technical limitations.")

        # Apply frame interpolation if requested
        if target_fps > 24:  # Assuming default is around 24 FPS
            print(f"Applying frame interpolation to achieve {target_fps} FPS...")
            result_video_path = interpolate_frames(result_video_path, target_fps=target_fps)
    except Exception as e:
        print(f"An error occurred during video generation: {e}")
        import traceback
        traceback.print_exc()
        raise gr.Error(f"An unexpected error occurred: {str(e)}. Please check the console for details.") from e
    finally:
        # Clean up temporary WAV file if created
        if temp_wav_created and os.path.exists(wav_audio_path):
            try:
                os.remove(wav_audio_path)
                print(f"Cleaned up temporary WAV file: {wav_audio_path}")
            except Exception as e:
                print(f"Warning: Could not delete temporary file {wav_audio_path}: {e}")

    processing_time = time.time() - start_time

    result_video_path = Path(result_video_path)
    # NOTE(review): the "final_" file is presumably written by the pipeline
    # next to the path it returns — confirm. When interpolate_frames has
    # produced a differently named file, no such sibling may exist.
    final_path = result_video_path.with_name(f"final_{result_video_path.stem}{result_video_path.suffix}")
    if not final_path.exists() and result_video_path.exists():
        # Return a file that exists rather than a path Gradio cannot load.
        print(f"Warning: expected output '{final_path}' was not found. Returning '{result_video_path}' instead.")
        final_path = result_video_path

    print(f"Video generated successfully at: {final_path}")
    print(f"Processing time: {processing_time:.2f} seconds.")
    return final_path
| # --- Gradio UI Definition --- | |
with gr.Blocks(
    theme=gr.themes.Soft(),
    css=".gradio-container {max-width: 960px !important; margin: 0 auto !important}",
) as demo:
    # NOTE(review): the nesting below was reconstructed from a copy of this
    # file that had lost its indentation — confirm the layout.

    # Page header with badges linking to the project page, paper and code.
    gr.HTML(
        """
<div align='center'>
<h1>MoDA: MODA PLUS: Talking Head Generation</h1>
<h2 style="color: #4A90E2;">Enhanced Version with Smooth Motion</h2>
<p style="display:flex; justify-content: center; gap: 10px;">
<a href='https://lixinyyang.github.io/MoDA.github.io/'><img src='https://img.shields.io/badge/Project-Page-blue'></a>
<a href='https://arxiv.org/abs/2507.03256'><img src='https://img.shields.io/badge/Paper-Arxiv-red'></a>
<a href='https://github.com/lixinyyang/MoDA/'><img src='https://img.shields.io/badge/Code-Github-green'></a>
</p>
</div>
"""
    )

    # Bundled sample assets, used as initial values and in the example rows.
    _example_image = "src/examples/reference_images/7.jpg"
    _example_audio = "src/examples/driving_audios/5.wav"

    with gr.Row(variant="panel"):
        # Left column: inputs and generation controls.
        with gr.Column(scale=1):
            gr.Markdown("### 📥 Input Settings")
            with gr.Row():
                source_image = gr.Image(
                    value=_example_image,
                    type="filepath",
                    label="Source Image",
                )
            with gr.Row():
                driving_audio = gr.Audio(
                    value=_example_audio,
                    type="filepath",
                    label="Driving Audio (Recommended: < 60 seconds)",
                )

            gr.Markdown("### ⚙️ Generation Settings")
            with gr.Row():
                emotion_dropdown = gr.Dropdown(
                    choices=list(emo_map.values()),
                    value="None",
                    label="Emotion",
                    info="Select an emotion for more natural facial expressions",
                )
            with gr.Row():
                cfg_slider = gr.Slider(
                    minimum=0.5,
                    maximum=5.0,
                    step=0.1,
                    value=0.5,
                    label="CFG Scale (Lower = Smoother motion)",
                    info="Lower values produce smoother but less controlled motion",
                )

            gr.Markdown("### 🎬 Motion Enhancement")
            with gr.Row():
                smooth_checkbox = gr.Checkbox(
                    value=False,  # Off by default because of CUDA issues
                    label="Enable Smoothing (Experimental)",
                    info="May cause errors on some systems. If errors occur, disable this option.",
                )
            with gr.Row():
                fps_slider = gr.Slider(
                    minimum=24,
                    maximum=50,
                    step=1,
                    value=50,
                    label="Target FPS",
                    info="Higher FPS for smoother motion. 30 FPS recommended, 50 FPS maximum",
                )

            submit_button = gr.Button("🎥 Generate Video", variant="primary", size="lg")

        # Right column: the result and usage tips.
        with gr.Column(scale=1):
            gr.Markdown("### 📺 Output")
            output_video = gr.Video(label="Generated Video")
            with gr.Row():
                gr.Markdown(
                    """
<div style="background-color: #f0f8ff; padding: 10px; border-radius: 5px; margin-top: 10px;">
<p style="margin: 0; font-size: 0.9em;">
<b>Tips for best results:</b><br>
• Use high-quality front-facing images<br>
• Clear audio without background noise<br>
• <b>Keep audio under 60 seconds</b><br>
• Adjust CFG scale if motion seems stiff<br>
• For longer audio, split into segments
</p>
</div>
"""
                )

    # The same ordered inputs feed both the examples and the submit button;
    # the order matches generate_motion's positional parameters.
    _generation_inputs = [
        source_image,
        driving_audio,
        emotion_dropdown,
        cfg_slider,
        smooth_checkbox,
        fps_slider,
    ]

    # Example rows: (emotion, CFG scale, smoothing, target FPS) on the sample assets.
    _example_settings = [
        ("None", 1.0, False, 30),
        ("Happy", 0.8, False, 30),
        ("Sad", 1.2, False, 24),
    ]
    gr.Examples(
        examples=[[_example_image, _example_audio, *settings] for settings in _example_settings],
        inputs=_generation_inputs,
        outputs=output_video,
        fn=generate_motion,
        cache_examples=False,
        label="Example Configurations",
    )

    submit_button.click(
        fn=generate_motion,
        inputs=_generation_inputs,
        outputs=output_video,
    )

if __name__ == "__main__":
    demo.launch(share=True)