Opera8's picture
Update app.py
ae61cc9 verified
Raw
History Blame Contribute Delete
21 kB
import torch
# --- Patch torch.xpu to prevent diffusers and PyTorch random seed functions from crashing ---
if not hasattr(torch, "xpu"):
class DummyXPU:
@staticmethod
def empty_cache(*args, **kwargs):
pass
@staticmethod
def is_available(*args, **kwargs):
return False
@staticmethod
def device_count(*args, **kwargs):
return 0
@staticmethod
def manual_seed(*args, **kwargs):
pass
@staticmethod
def _is_in_bad_fork(*args, **kwargs):
return False
def __getattr__(self, name):
# Dynamically handles any unexpected attributes/methods queried by PyTorch or diffusers
return lambda *args, **kwargs: None
torch.xpu = DummyXPU()
import os
import sys
import time
import subprocess
import cv2
import gradio as gr
import spaces
from huggingface_hub import snapshot_download
from huggingface_hub.utils import GatedRepoError, RepositoryNotFoundError, RevisionNotFoundError
from pathlib import Path
import tempfile
from pydub import AudioSegment
# Add the src directory to the system path to allow for local imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'src')))
from models.inference.moda_test import LiveVASAPipeline, emo_map, set_seed
# --- Configuration ---
# Set seed for reproducibility
set_seed(42)
# Paths and constants for the Gradio demo
DEFAULT_CFG_PATH = "configs/audio2motion/inference/inference.yaml"
DEFAULT_MOTION_MEAN_STD_PATH = "src/datasets/mean.pt"
DEFAULT_SILENT_AUDIO_PATH = "src/examples/silent-audio.wav"
OUTPUT_DIR = "gradio_output"
WEIGHTS_DIR = "pretrain_weights"
REPO_ID = "lixinyizju/moda"
# --- Automatic Config Patching for Aspect Ratio Preservation ---
def patch_liveportrait_config():
"""
Patches the liveportrait_config.yaml to enable 'flag_pasteback'.
This stitches the animated face back onto the original high-resolution full-sized
source image, maintaining the exact original image size and aspect ratio.
"""
config_path = "configs/audio2motion/model/liveportrait_config.yaml"
if os.path.exists(config_path):
try:
import yaml
with open(config_path, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f) or {}
# Enable pasteback to restore original image dimensions in the output video
if config_data.get('flag_pasteback') is not True:
config_data['flag_pasteback'] = True
with open(config_path, 'w', encoding='utf-8') as f:
yaml.dump(config_data, f)
print("Successfully patched liveportrait_config.yaml: flag_pasteback set to True")
except Exception as e:
print(f"Warning: Failed to automatically patch configuration file: {e}")
# --- Download Pre-trained Weights from Hugging Face Hub ---
def download_weights():
"""
Downloads pre-trained weights from Hugging Face Hub if they don't exist locally.
"""
# A simple check for a key file to see if the download is likely complete
motion_model_file = os.path.join(WEIGHTS_DIR, "moda", "net-200.pth")
if not os.path.exists(motion_model_file):
print(f"Weights not found locally. Downloading from Hugging Face Hub repo '{REPO_ID}'...")
print(f"This may take a while depending on your internet connection.")
try:
snapshot_download(
repo_id=REPO_ID,
local_dir=WEIGHTS_DIR,
local_dir_use_symlinks=False, # Use False to copy files directly; safer for Windows
resume_download=True,
)
print("Weights downloaded successfully.")
except GatedRepoError:
raise gr.Error(f"Access to the repository '{REPO_ID}' is gated. Please visit https://huggingface.co/{REPO_ID} to request access.")
except (RepositoryNotFoundError, RevisionNotFoundError):
raise gr.Error(f"The repository '{REPO_ID}' was not found. Please check the repository ID.")
except Exception as e:
print(f"An error occurred during download: {e}")
raise gr.Error(f"Failed to download models. Please check your internet connection and try again. Error: {e}")
else:
print(f"Found existing weights at '{WEIGHTS_DIR}'. Skipping download.")
# --- Audio Conversion and Trimming Function ---
def ensure_wav_format(audio_path, max_duration_ms=300000): # Increased to 5 minutes to support longer generations
"""
Ensures the audio file is in WAV format and limits its duration to max_duration_ms.
Always returns the path to the processed temporary WAV file.
"""
if audio_path is None:
return None
audio_path = Path(audio_path)
try:
print(f"Loading and processing audio from: {audio_path}")
# Load the audio file (supports wav, mp3, m4a, etc.)
audio = AudioSegment.from_file(audio_path)
# Trim if the audio exceeds the limit
if len(audio) > max_duration_ms:
print(f"Audio is {len(audio)/1000:.2f}s, trimming to {max_duration_ms/1000:.2f}s")
audio = audio[:max_duration_ms]
else:
print(f"Audio is {len(audio)/1000:.2f}s (under the {max_duration_ms/1000:.2f}s limit)")
# Create a temporary WAV file
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
wav_path = tmp_file.name
# Export as WAV with standard settings
audio.export(
wav_path,
format='wav',
parameters=["-ar", "16000", "-ac", "1"] # 16kHz, mono
)
print(f"Audio processed successfully: {wav_path}")
return wav_path
except Exception as e:
print(f"Error processing audio: {e}")
raise gr.Error(f"Failed to process or convert audio file. Error: {e}")
# --- Helper Functions for Segmented Generation ---
def extract_last_frame(video_path, output_image_path):
"""
Extracts the absolute last frame of the given video and saves it to output_image_path.
"""
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
raise Exception(f"Could not open video: {video_path}")
frame = None
success = False
while True:
ret, f = cap.read()
if not ret:
break
frame = f
success = True
cap.release()
if success and frame is not None:
cv2.imwrite(output_image_path, frame)
return True
else:
raise Exception("Failed to read frames from video.")
def concatenate_videos_ffmpeg(video_paths, full_audio_path, output_path):
"""
Stitches multiple video files together (video track only) and overlays the original full audio track.
This bypasses any pops or sync problems at the chunk boundaries.
"""
# Create the text file listing all videos
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') as f:
for p in video_paths:
# Format absolute path with forward slashes and escape single quotes
abs_path = os.path.abspath(p).replace('\\', '/')
safe_path = abs_path.replace("'", "'\\''")
f.write(f"file '{safe_path}'\n")
list_file_path = f.name
temp_video_only = os.path.join(os.path.dirname(output_path), "temp_video_only.mp4")
try:
# Step 1: Concatenate video tracks only (discard original segment audios to avoid jumps)
cmd_copy = [
'ffmpeg', '-y',
'-f', 'concat',
'-safe', '0',
'-i', list_file_path,
'-an', # Discard segment audios completely
'-c:v', 'copy', # Direct stream copy (instant & lossless)
temp_video_only
]
print(f"Executing lossless FFMPEG video concat (no-audio): {' '.join(cmd_copy)}")
result = subprocess.run(cmd_copy, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
print("Lossless video-only concatenation failed. Retrying with full video re-encoding...")
# Fallback path: video re-encoding (robust)
cmd_reencode = [
'ffmpeg', '-y',
'-f', 'concat',
'-safe', '0',
'-i', list_file_path,
'-an',
'-c:v', 'libx264',
'-pix_fmt', 'yuv420p',
temp_video_only
]
print(f"Executing FFMPEG video-only re-encode: {' '.join(cmd_reencode)}")
result_re = subprocess.run(cmd_reencode, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result_re.returncode != 0:
raise Exception(f"FFMPEG video concatenation failed: {result_re.stderr.decode('utf-8')}")
# Step 2: Merge the concatenated video track with the complete original audio file
cmd_merge = [
'ffmpeg', '-y',
'-i', temp_video_only,
'-i', str(full_audio_path),
'-map', '0:v:0', # Use video from concatenated file
'-map', '1:a:0', # Use audio from original full wav file
'-c:v', 'copy', # Keep video intact
'-c:a', 'aac', # Encode original audio to standard clean AAC
'-shortest', # Sync to the shortest stream duration
str(output_path)
]
print(f"Applying final original audio track: {' '.join(cmd_merge)}")
result_merge = subprocess.run(cmd_merge, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result_merge.returncode != 0:
raise Exception(f"FFMPEG audio replacement failed: {result_merge.stderr.decode('utf-8')}")
finally:
# Clean up temporary files
if os.path.exists(list_file_path):
os.remove(list_file_path)
if os.path.exists(temp_video_only):
try:
os.remove(temp_video_only)
except Exception as e:
print(f"Warning: Could not remove temp file {temp_video_only}: {e}")
# --- Initialization ---
# Create output directory if it doesn't exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Patch the LivePortrait config to enable full-frame pasteback
patch_liveportrait_config()
# Download weights before initializing the pipeline
download_weights()
# Instantiate the pipeline once to avoid reloading models on every request
print("Initializing MoDA pipeline...")
try:
pipeline = LiveVASAPipeline(
cfg_path=DEFAULT_CFG_PATH,
motion_mean_std_path=DEFAULT_MOTION_MEAN_STD_PATH
)
print("MoDA pipeline initialized successfully.")
except Exception as e:
print(f"Error initializing pipeline: {e}")
pipeline = None
# Invert the emo_map for easy lookup from the dropdown value
emo_name_to_id = {v: k for k, v in emo_map.items()}
# --- Core Generation Function ---
@spaces.GPU(duration=120)
def generate_motion(source_image_path, driving_audio_path, emotion_name, cfg_scale, progress=gr.Progress(track_tqdm=True)):
"""
Generates talking head video. For inputs longer than 7 seconds, it automatically
chunks the audio into 7-second blocks, processes them sequentially using the
last frame of the previous block as the next source image, and then merges them.
"""
if pipeline is None:
raise gr.Error("Pipeline failed to initialize. Check the console logs for details.")
if source_image_path is None:
raise gr.Error("Please upload a source image.")
if driving_audio_path is None:
raise gr.Error("Please upload a driving audio file.")
start_time = time.time()
# Ensure audio is in WAV format (allowing up to 5 minutes)
wav_audio_path = ensure_wav_format(driving_audio_path)
temp_wav_created = wav_audio_path != driving_audio_path
# Create a unique subdirectory for this run
timestamp = time.strftime("%Y%m%d-%H%M%S")
run_output_dir = os.path.join(OUTPUT_DIR, timestamp)
os.makedirs(run_output_dir, exist_ok=True)
# Get emotion ID from its name
emotion_id = emo_name_to_id.get(emotion_name, 8) # Default to 'None' (ID 8) if not found
print(f"Starting generation with the following parameters:")
print(f" Source Image: {source_image_path}")
print(f" Driving Audio (original): {driving_audio_path}")
print(f" Driving Audio (WAV): {wav_audio_path}")
print(f" Emotion: {emotion_name} (ID: {emotion_id})")
print(f" CFG Scale: {cfg_scale}")
# Chunk settings
CHUNK_SIZE_MS = 7000 # 7 seconds
temp_wav_files = []
temp_frame_files = []
video_chunks_paths = []
final_path = None
try:
# Load the converted WAV file
audio = AudioSegment.from_file(wav_audio_path)
audio_duration_ms = len(audio)
# Slice audio into segments of max 7 seconds
audio_chunks = []
for i in range(0, audio_duration_ms, CHUNK_SIZE_MS):
audio_chunks.append(audio[i : i + CHUNK_SIZE_MS])
num_chunks = len(audio_chunks)
print(f"Audio split into {num_chunks} segment(s) for safety (max {CHUNK_SIZE_MS/1000}s per segment).")
current_image_path = source_image_path
for idx, chunk in enumerate(audio_chunks):
chunk_num = idx + 1
print(f"\n--- Processing Segment {chunk_num}/{num_chunks} ---")
progress(idx / num_chunks, desc=f"Processing segment {chunk_num}/{num_chunks}...")
# 1. Export current segment to a temporary WAV file
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_chunk_wav:
chunk_wav_path = tmp_chunk_wav.name
chunk.export(
chunk_wav_path,
format='wav',
parameters=["-ar", "16000", "-ac", "1"] # 16kHz, mono
)
temp_wav_files.append(chunk_wav_path)
# 2. Run pipeline inference on this chunk
print(f"Running inference for segment {chunk_num} using image {current_image_path}...")
returned_chunk_path = pipeline.driven_sample(
image_path=str(current_image_path),
audio_path=str(chunk_wav_path),
cfg_scale=float(cfg_scale),
emo=emotion_id,
save_dir=run_output_dir,
smooth=False, # Disable smoothing for a faster demo
silent_audio_path=DEFAULT_SILENT_AUDIO_PATH,
)
# Locate the actual generated video (MoDA prefixes with 'final_')
returned_path_obj = Path(returned_chunk_path)
actual_generated_path = returned_path_obj.with_name(f"final_{returned_path_obj.name}")
# Rename/move to a unique segment path to prevent file overwrite conflicts
unique_chunk_path = os.path.join(run_output_dir, f"segment_{idx}.mp4")
if os.path.exists(actual_generated_path):
os.rename(actual_generated_path, unique_chunk_path)
chunk_video_path = unique_chunk_path
print(f"Renamed generated file to unique path: {chunk_video_path}")
else:
print(f"Warning: Could not find actual generated file at {actual_generated_path}. Using fallback.")
chunk_video_path = str(actual_generated_path)
video_chunks_paths.append(chunk_video_path)
# 3. Extract last frame if we have another chunk to process
if chunk_num < num_chunks:
last_frame_path = os.path.join(run_output_dir, f"last_frame_chunk_{idx}.jpg")
print(f"Extracting last frame of segment {chunk_num} for next segment...")
try:
extract_last_frame(chunk_video_path, last_frame_path)
current_image_path = last_frame_path
temp_frame_files.append(last_frame_path)
except Exception as e:
print(f"Warning: Failed to extract last frame from segment {chunk_num}: {e}")
print("Falling back to the original source image for the next segment.")
current_image_path = source_image_path
# 4. Merge all generated chunks and lay the original full audio track on top
progress(0.95, desc="Merging segments and applying original audio...")
final_video_name = f"final_{timestamp}.mp4"
final_path = Path(os.path.join(run_output_dir, final_video_name))
# We always run this function (even for 1 chunk) to ensure the original, clean, high-quality WAV audio is perfectly mapped on the video
concatenate_videos_ffmpeg(video_chunks_paths, wav_audio_path, final_path)
except Exception as e:
print(f"An error occurred during video generation: {e}")
import traceback
traceback.print_exc()
raise gr.Error(f"An unexpected error occurred: {str(e)}. Please check the console for details.")
finally:
# Clean up temporary audio files generated for the chunks
for f_wav in temp_wav_files:
if os.path.exists(f_wav):
try:
os.remove(f_wav)
except Exception as e:
print(f"Warning: Could not delete chunk file {f_wav}: {e}")
# Clean up temporary last frame images
for f_img in temp_frame_files:
if os.path.exists(f_img):
try:
os.remove(f_img)
except Exception as e:
print(f"Warning: Could not delete image file {f_img}: {e}")
# Clean up the primary temporary WAV file
if temp_wav_created and os.path.exists(wav_audio_path):
try:
os.remove(wav_audio_path)
print(f"Cleaned up primary temporary WAV file: {wav_audio_path}")
except Exception as e:
print(f"Warning: Could not delete primary temporary file {wav_audio_path}: {e}")
end_time = time.time()
processing_time = end_time - start_time
print(f"Video process completed successfully at: {final_path}")
print(f"Processing time: {processing_time:.2f} seconds.")
return final_path
# --- Gradio UI Definition ---
with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important; margin: 0 auto !important}") as demo:
gr.HTML(
"""
<div align='center'>
<h1>MoDA: Multi-modal Diffusion Architecture for Talking Head Generation</h1>
<p style="display:flex">
<a href='https://lixinyyang.github.io/MoDA.github.io/'><img src='https://img.shields.io/badge/Project-Page-blue'></a>
<a href='https://arxiv.org/abs/2507.03256'><img src='https://img.shields.io/badge/Paper-Arxiv-red'></a>
<a href='https://github.com/lixinyyang/MoDA/'><img src='https://img.shields.io/badge/Code-Github-green'></a>
</p>
</div>
"""
)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
with gr.Row():
source_image = gr.Image(label="Source Image", type="filepath", value="src/examples/reference_images/7.jpg")
with gr.Row():
driving_audio = gr.Audio(
label="Driving Audio",
type="filepath",
value="src/examples/driving_audios/5.wav"
)
with gr.Row():
emotion_dropdown = gr.Dropdown(
label="Emotion",
choices=list(emo_map.values()),
value="None"
)
with gr.Row():
cfg_slider = gr.Slider(
label="CFG Scale",
minimum=1.0,
maximum=3.0,
step=0.05,
value=1.2
)
submit_button = gr.Button("Generate Video", variant="primary")
with gr.Column(scale=1):
output_video = gr.Video(label="Generated Video")
gr.Markdown(
"""
---
### **Disclaimer**
This project is intended for academic research, and we explicitly disclaim any responsibility for user-generated content. Users are solely liable for their actions while using this generative model.
"""
)
submit_button.click(
fn=generate_motion,
inputs=[source_image, driving_audio, emotion_dropdown, cfg_slider],
outputs=output_video
)
if __name__ == "__main__":
demo.launch(share=True)