| """ | |
| InfiniteTalk - Talking Video Generator | |
| Gradio Space for HuggingFace | |
| """ | |
| import os | |
| import sys | |
| # CRITICAL: Set environment variables BEFORE any torch/torchvision imports | |
| # This prevents torchvision from registering CUDA ops that don't exist at import time | |
| os.environ["TORCHVISION_DISABLE_META_REGISTRATIONS"] = "1" | |
| os.environ["TORCH_LOGS"] = "-all" # Reduce torch logging noise | |
| import random | |
| import logging | |
| import warnings | |
| from pathlib import Path | |
| import gradio as gr | |
| import torch | |
| import numpy as np | |
| # Suppress warnings | |
| warnings.filterwarnings('ignore') | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Add current directory to path | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
# Import utilities
from utils.model_loader import ModelManager
from utils.gpu_manager import gpu_manager

# Import InfiniteTalk modules
import wan
from wan.configs import SIZE_CONFIGS, WAN_CONFIGS
from wan.utils.utils import is_video
from wan.utils.multitalk_utils import save_video_ffmpeg

# Audio processing
import librosa
import soundfile as sf
import pyloudnorm as pyln
from transformers import Wav2Vec2FeatureExtractor
from src.audio_analysis.wav2vec2 import Wav2Vec2Model

# Image/Video processing
from PIL import Image
from einops import rearrange

# Global variables
model_manager = None
models_loaded = False
def initialize_models(progress=gr.Progress()):
    """Initialize models on first use"""
    global model_manager, models_loaded

    if models_loaded:
        return

    try:
        progress(0.1, desc="Initializing model manager...")
        model_manager = ModelManager()

        progress(0.3, desc="Downloading models (first time only - may take 2-3 minutes)...")
        # Download models (lazy loading - they'll be loaded on first inference)
        model_manager.get_wan_model_path()
        model_manager.get_infinitetalk_weights_path()
        model_manager.get_wav2vec_model_path()

        models_loaded = True
        progress(1.0, desc="Models ready!")
        logger.info("Models initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing models: {e}")
        raise gr.Error(f"Failed to initialize models: {str(e)}")
def loudness_norm(audio_array, sr=16000, lufs=-20.0):
    """Normalize audio loudness to a target LUFS using pyloudnorm"""
    try:
        meter = pyln.Meter(sr)
        loudness = meter.integrated_loudness(audio_array)
        if abs(loudness) > 100:  # Skip if measurement failed (e.g. -inf for near-silence)
            return audio_array
        normalized_audio = pyln.normalize.loudness(audio_array, loudness, lufs)
        return normalized_audio
    except Exception as e:
        logger.warning(f"Loudness normalization failed: {e}, returning original audio")
        return audio_array
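
# Illustrative usage of loudness_norm (comments only, not executed; the test
# tone below is hypothetical and serves only to show the expected call shape):
#   sr = 16000
#   tone = 0.1 * np.sin(2 * np.pi * 220.0 * np.arange(sr) / sr)  # 1 s, 220 Hz
#   normalized = loudness_norm(tone, sr=sr, lufs=-20.0)
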
def process_audio(audio_path, target_sr=16000):
    """
    Process an audio file for InfiniteTalk (matches audio_prepare_single from the reference implementation).

    Args:
        audio_path: Path to audio file
        target_sr: Target sample rate

    Returns:
        Processed audio array and sample rate
    """
    try:
        # Load audio with librosa (resampled to target_sr, mono by default)
        audio, sr = librosa.load(audio_path, sr=target_sr)

        # Normalize loudness
        audio = loudness_norm(audio, sr)

        # Ensure mono (defensive; librosa.load already downmixes by default)
        if len(audio.shape) > 1:
            audio = np.mean(audio, axis=1)

        return audio, sr
    except Exception as e:
        logger.error(f"Error processing audio: {e}")
        raise gr.Error(f"Audio processing failed: {str(e)}")
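
# Illustrative usage (the file name is hypothetical):
#   audio, sr = process_audio("speech.wav")    # mono float array at 16 kHz
#   duration = len(audio) / sr                 # drives the video length downstream
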
def validate_inputs(image_or_video, audio, resolution, steps):
    """Validate user inputs"""
    errors = []
    if image_or_video is None:
        errors.append("Please upload an image or video")
    if audio is None:
        errors.append("Please upload an audio file")
    if resolution not in ["480p", "720p"]:
        errors.append("Invalid resolution selected")
    if not (20 <= steps <= 50):
        errors.append("Steps must be between 20 and 50")
    if errors:
        raise gr.Error(" | ".join(errors))
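
# The pipeline call in generate_video passes frame_num=81, following the
# 4n + 1 frame-count convention used by the Wan video models. The helper below
# is a minimal sketch (an assumption about that convention, not part of the
# original flow) of how a valid frame count could be derived from audio length
# at 25 FPS; generate_video keeps the fixed default of 81 instead.
def _nearest_4n_plus_1(duration_s: float, fps: int = 25) -> int:
    """Largest 4n + 1 frame count not exceeding duration_s * fps (minimum 5)."""
    frames = max(int(duration_s * fps), 1)   # total frames at the target FPS
    n = max((frames - 1) // 4, 1)            # largest n with 4n + 1 <= frames
    return 4 * n + 1
# Example: _nearest_4n_plus_1(3.24) == 81, since 3.24 s * 25 FPS = 81 frames.
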
def generate_video(
    image_or_video,
    audio_file,
    resolution="480p",
    steps=40,
    audio_guide_scale=3.0,
    seed=-1,
    progress=gr.Progress()
):
    """
    Generate a talking video from an image, or dub an existing video.

    Args:
        image_or_video: Input image or video file
        audio_file: Audio file for lip-sync
        resolution: Output resolution (480p or 720p)
        steps: Number of diffusion steps
        audio_guide_scale: Audio conditioning strength
        seed: Random seed for reproducibility
        progress: Gradio progress tracker

    Returns:
        Path to generated video
    """
    try:
        # Check if GPU is available
        if not torch.cuda.is_available():
            raise gr.Error(
                "⚠️ GPU not available. This Space requires GPU hardware to generate videos. "
                "Please apply for a Community GPU Grant in the Space settings, or run this app locally with a GPU."
            )

        # Initialize models if needed
        if not models_loaded:
            initialize_models(progress)

        # Validate inputs
        validate_inputs(image_or_video, audio_file, resolution, steps)

        # GPU memory check
        gpu_manager.print_memory_usage("Initial - ")

        progress(0.1, desc="Processing audio...")
        # Process audio
        audio, sr = process_audio(audio_file)
        audio_duration = len(audio) / sr
        logger.info(f"Audio duration: {audio_duration:.2f}s")

        # Estimate the ZeroGPU time budget for this request
        zerogpu_duration = gpu_manager.calculate_duration_for_zerogpu(
            audio_duration, resolution
        )
        logger.info(f"Estimated ZeroGPU duration: {zerogpu_duration}s")
        progress(0.2, desc="Loading models...")
        # Load models
        size = f"infinitetalk-{resolution.replace('p', '')}"

        # Load InfiniteTalk pipeline
        wan_pipeline = model_manager.load_wan_model(size=size, device="cuda")

        # Load audio encoder
        audio_encoder, feature_extractor = model_manager.load_audio_encoder(device="cuda")
        gpu_manager.print_memory_usage("After model loading - ")

        progress(0.3, desc="Processing input...")
        # Determine whether the input is an image or a video. The pipeline reads
        # the raw file path from the input dict below, so no frames are decoded here.
        if is_video(image_or_video):
            logger.info("Processing video dubbing...")
        else:
            logger.info("Processing image-to-video...")
            # Sanity-check that the image decodes before committing GPU time
            Image.open(image_or_video).convert("RGB")
        progress(0.4, desc="Extracting audio features...")
        # Extract audio features (matches get_embedding from the reference implementation)
        video_length = audio_duration * 25  # Assume 25 FPS

        # Extract features with wav2vec
        audio_feature = np.squeeze(
            feature_extractor(audio, sampling_rate=sr).input_values
        )
        audio_feature = torch.from_numpy(audio_feature).float().to(device="cuda")
        audio_feature = audio_feature.unsqueeze(0)

        # Get embeddings from the audio encoder
        with torch.no_grad():
            embeddings = audio_encoder(audio_feature, seq_len=int(video_length), output_hidden_states=True)

        if not hasattr(embeddings, 'hidden_states') or not embeddings.hidden_states:
            raise gr.Error("Failed to extract audio embeddings")

        # Stack hidden states (matches the reference implementation)
        audio_embeddings = torch.stack(embeddings.hidden_states[1:], dim=1).squeeze(0)
        audio_embeddings = rearrange(audio_embeddings, "b s d -> s b d")
        audio_embeddings = audio_embeddings.cpu().detach()
        logger.info(f"Audio embeddings shape: {audio_embeddings.shape}")
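        # Shape walkthrough (assumes batch size 1; L is the number of transformer
        # layers in the wav2vec checkpoint, s the sequence length, d the hidden size):
        #   hidden_states[1:]            -> L tensors of shape (1, s, d)
        #   stack(..., dim=1).squeeze(0) -> (L, s, d)
        #   rearrange("b s d -> s b d")  -> (s, L, d), one (L, d) feature per frame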
        gpu_manager.print_memory_usage("After audio processing - ")

        progress(0.5, desc="Generating video (this may take a minute)...")

        # Set random seed
        if seed == -1:
            seed = random.randint(0, 99999999)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)

        # Generate video with InfiniteTalk
        output_path = f"/tmp/output_{seed}.mp4"

        # Prepare input for the pipeline (following the generate_infinitetalk.py structure)
        with torch.no_grad():
            logger.info(f"Generating {resolution} video with {steps} steps...")

            # Save audio embeddings to a temporary file (the pipeline expects a file path)
            os.makedirs("/tmp/audio_embeddings", exist_ok=True)
            emb_path = "/tmp/audio_embeddings/1.pt"
            audio_wav_path = "/tmp/audio_embeddings/sum.wav"
            torch.save(audio_embeddings, emb_path)
            sf.write(audio_wav_path, audio, sr)

            # Prepare input dictionary (matches the generate_infinitetalk.py format)
            input_clip = {
                "prompt": "",  # Empty prompt for talking head
                "cond_video": image_or_video,
                "cond_audio": {
                    "person1": emb_path
                },
                "video_audio": audio_wav_path
            }

            # Calculate sample_shift based on resolution
            sample_shift = 7 if resolution == "480p" else 11
            # Call the InfiniteTalk pipeline
            video_tensor = wan_pipeline.generate_infinitetalk(
                input_clip,
                size_buckget=size,
                motion_frame=9,    # Default motion frame
                frame_num=81,      # Default frame count (4n + 1 convention; see the _nearest_4n_plus_1 sketch above)
                shift=sample_shift,
                sampling_steps=steps,
                text_guide_scale=5.0,  # Default text guidance
                audio_guide_scale=audio_guide_scale,
                seed=seed,
                offload_model=True,
                max_frames_num=81,  # For clip mode
                color_correction_strength=1.0,
                extra_args=None
            )

            # Save the video muxed with the driving audio
            save_video_ffmpeg(
                video_tensor,
                output_path.replace(".mp4", ""),  # The function adds the .mp4 extension
                [audio_wav_path],
                high_quality_save=False
            )
        progress(0.9, desc="Finalizing...")
        # Cleanup
        gpu_manager.cleanup()

        progress(1.0, desc="Complete!")
        logger.info(f"Video generated successfully: {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"Error generating video: {e}")
        gpu_manager.cleanup()
        raise gr.Error(f"Generation failed: {str(e)}")
def create_interface():
    """Create Gradio interface"""
    with gr.Blocks(title="InfiniteTalk - Talking Video Generator", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🎬 InfiniteTalk - Talking Video Generator

        Generate realistic talking-head videos with accurate lip-sync from images, or dub existing videos with new audio!

        **Note**: The first generation may take 2-3 minutes while models download. Subsequent generations are much faster (~40s for a 10s video).
        """)

        with gr.Tabs():
            # Tab 1: Image-to-Video
            with gr.Tab("📸 Image-to-Video"):
                gr.Markdown("Transform a static portrait into a talking video")

                with gr.Row():
                    with gr.Column():
                        image_input = gr.Image(
                            type="filepath",
                            label="Upload Portrait Image (clear face visibility recommended)"
                        )
                        audio_input_i2v = gr.Audio(
                            type="filepath",
                            label="Upload Audio (MP3, WAV, or FLAC)"
                        )

                        with gr.Accordion("Advanced Settings", open=False):
                            resolution_i2v = gr.Radio(
                                choices=["480p", "720p"],
                                value="480p",
                                label="Resolution (480p faster, 720p higher quality)"
                            )
                            steps_i2v = gr.Slider(
                                minimum=20,
                                maximum=50,
                                value=40,
                                step=1,
                                label="Diffusion Steps (more = higher quality but slower)"
                            )
                            audio_scale_i2v = gr.Slider(
                                minimum=1.0,
                                maximum=5.0,
                                value=3.0,
                                step=0.5,
                                label="Audio Guide Scale (2-4 recommended)"
                            )
                            seed_i2v = gr.Number(
                                value=-1,
                                label="Seed (-1 for random)"
                            )

                        generate_btn_i2v = gr.Button("🎬 Generate Video", variant="primary", size="lg")

                    with gr.Column():
                        output_video_i2v = gr.Video(label="Generated Video")
                        gr.Markdown("**💡 Tip**: Use high-quality portrait images with clear facial features for best results")

                generate_btn_i2v.click(
                    fn=generate_video,
                    inputs=[image_input, audio_input_i2v, resolution_i2v, steps_i2v, audio_scale_i2v, seed_i2v],
                    outputs=output_video_i2v
                )
            # Tab 2: Video Dubbing
            with gr.Tab("🎥 Video Dubbing"):
                gr.Markdown("Dub an existing video with new audio while maintaining natural movements")

                with gr.Row():
                    with gr.Column():
                        video_input = gr.Video(
                            label="Upload Video (with visible face)"
                        )
                        audio_input_v2v = gr.Audio(
                            type="filepath",
                            label="Upload New Audio (MP3, WAV, or FLAC)"
                        )

                        with gr.Accordion("Advanced Settings", open=False):
                            resolution_v2v = gr.Radio(
                                choices=["480p", "720p"],
                                value="480p",
                                label="Resolution"
                            )
                            steps_v2v = gr.Slider(
                                minimum=20,
                                maximum=50,
                                value=40,
                                step=1,
                                label="Diffusion Steps"
                            )
                            audio_scale_v2v = gr.Slider(
                                minimum=1.0,
                                maximum=5.0,
                                value=3.0,
                                step=0.5,
                                label="Audio Guide Scale"
                            )
                            seed_v2v = gr.Number(
                                value=-1,
                                label="Seed"
                            )

                        generate_btn_v2v = gr.Button("🎬 Generate Dubbed Video", variant="primary", size="lg")

                    with gr.Column():
                        output_video_v2v = gr.Video(label="Dubbed Video")
                        gr.Markdown("**💡 Tip**: For best results, use videos with consistent face visibility throughout")

                generate_btn_v2v.click(
                    fn=generate_video,
                    inputs=[video_input, audio_input_v2v, resolution_v2v, steps_v2v, audio_scale_v2v, seed_v2v],
                    outputs=output_video_v2v
                )
        # Footer
        gr.Markdown("""
        ---
        ### About
        Powered by [InfiniteTalk](https://github.com/MeiGen-AI/InfiniteTalk) - Apache 2.0 License

        ⚠️ **Note**: This Space requires GPU hardware to generate videos. Apply for a Community GPU Grant in Settings.

        💡 **Tips**:
        - The first generation downloads models (~15GB) and may take 2-3 minutes
        - Use 480p for faster generation (~40s for a 10s video)
        - Use 720p for higher quality (slower but better results)
        - Clear, well-lit images produce the best results
        """)

    return demo

if __name__ == "__main__":
    demo = create_interface()
    demo.queue(max_size=10)
    demo.launch()