"""Main processing workflows for OutofLipSync""" import gc import logging import os import traceback import spaces import gradio as gr import psutil import torch from audio_processing import ( get_audio_duration, prepare_audio_for_lipsync, prepare_audio_for_youtube_aac, prepare_audio_for_youtube, ) from config import PROCESSED_RESULTS_DIR from lipsync_processing import apply_lipsync_to_video, get_video_info from time_util import timer from utils import setup_output_dir from video_processing import ( normalize_video_for_youtube, merge_audio_video, ) logger = logging.getLogger(__name__) os.environ["PROCESSED_RESULTS"] = f"{os.getcwd()}/{PROCESSED_RESULTS_DIR}" def get_memory_usage(): """Get current RAM and GPU memory usage""" ram = psutil.virtual_memory() ram_used_gb = ram.used / (1024**3) ram_percent = ram.percent if torch.cuda.is_available(): gpu_used_gb = torch.cuda.memory_allocated(0) / (1024**3) gpu_total_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3) gpu_percent = (gpu_used_gb / gpu_total_gb) * 100 gpu_info = ( f" | GPU: {gpu_used_gb:.2f}GB / {gpu_total_gb:.2f}GB ({gpu_percent:.1f}%)" ) else: gpu_info = "" return f"RAM: {ram_used_gb:.2f}GB / {ram.total / (1024**3):.2f}GB ({ram_percent:.1f}%){gpu_info}" def validate_input(video_file, audio_file): """Validate input files Args: video_file: Video input audio_file: Audio input Returns: Tuple of (video_path, audio_path) """ if video_file is None: raise gr.Error("Please upload a video source.") if audio_file is None: raise gr.Error("Please upload a target audio.") if isinstance(video_file, dict): video_path = video_file.get("name") or video_file.get("path") else: video_path = video_file if isinstance(audio_file, dict): audio_path = audio_file.get("name") or audio_file.get("path") else: audio_path = audio_file if video_path is None or not os.path.exists(video_path): raise gr.Error("Could not read uploaded video file.") if audio_path is None or not os.path.exists(audio_path): raise gr.Error("Could not read uploaded audio file.") return video_path, audio_path # def process_lipsync_with_audio_target( # video_file, # audio_file, # session_id=None, # crop_size=256, # progress=gr.Progress(track_tqdm=True), # ): # """Lipsync video source với audio target (DEPRECATED - use process_lipsync_with_audio_target_new) # # Args: # video_file: Path to video source # audio_file: Path to audio target (English only) # session_id: Session identifier # crop_size: Size of crop region (256 or 512) # progress: Progress tracking object # # Returns: # Tuple of (final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full) # """ # video_looped = None # face_cropped = None # lipsynced_face = None # lipsynced_full = None # final_video = None # error_msg = None # # try: # video_path, audio_path = validate_input(video_file, audio_file) # # output_dir = setup_output_dir(session_id) # # logger.info(f"Memory at start: {get_memory_usage()}") # # audio_duration = get_audio_duration(audio_path) # # progress(0.1, desc="🎬 Đang chuẩn bị video...") # logger.info(f"Memory before video loop: {get_memory_usage()}") # # with timer("Looping/cropping video to match audio"): # try: # video_looped = loop_video_to_match_audio( # video_path, audio_duration, output_dir # ) # except Exception as e: # error_msg = f"Video loop failed: {str(e)}" # logger.error(error_msg) # traceback.print_exc() # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # gc.collect() # logger.info(f"Memory after video loop: {get_memory_usage()}") # # progress(0.2, desc="👤 Đang phát hiện khuôn mặt...") # with timer("Detecting face"): # try: # face_bbox = detect_face_region(video_looped, output_dir, crop_size) # except FaceDetectionError as e: # error_msg = str(e) # logger.error(f"Face detection failed: {e}") # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # gc.collect() # logger.info(f"Memory after face detection: {get_memory_usage()}") # # actual_crop_size = crop_size * 2 # progress( # 0.25, desc=f"👤 Đang tính lại crop {actual_crop_size}x{actual_crop_size}..." # ) # with timer( # f"Recalculating crop bbox for {actual_crop_size}x{actual_crop_size}" # ): # from video_processing import get_video_info # # try: # video_info = get_video_info(video_looped) # crop_bbox = calculate_safe_crop_size( # face_bbox["face_bbox"], # video_info["width"], # video_info["height"], # actual_crop_size, # ) # except Exception as e: # error_msg = f"Calculate crop bbox failed: {str(e)}" # logger.error(error_msg) # traceback.print_exc() # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # progress( # 0.3, desc=f"✂️ Đang crop video {actual_crop_size}x{actual_crop_size}..." # ) # with timer(f"Cropping video to {actual_crop_size}x{actual_crop_size}"): # try: # face_cropped = crop_video_to_size( # video_looped, crop_bbox, output_dir, actual_crop_size # ) # except Exception as e: # error_msg = f"Crop video failed: {str(e)}" # logger.error(error_msg) # traceback.print_exc() # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # gc.collect() # logger.info(f"Memory after crop: {get_memory_usage()}") # # progress(0.4, desc="🎵 Đang xử lý audio...") # logger.info(f"Memory before audio prep: {get_memory_usage()}") # # with timer("Preparing target audio"): # try: # audio_16k, audio_upsampled = prepare_target_audio( # audio_path, output_dir # ) # except Exception as e: # error_msg = f"Prepare audio failed: {str(e)}" # logger.error(error_msg) # traceback.print_exc() # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # gc.collect() # logger.info(f"Memory after audio prep: {get_memory_usage()}") # # progress(0.6, desc="👄 Đang lipsync...") # # video_info = get_video_info(face_cropped) # logger.info( # f"Starting lipsync: video={face_cropped}, audio_16k={audio_16k}, output={output_dir}" # ) # logger.info( # f"Video info: {video_info['width']}x{video_info['height']}, {video_info['fps']:.1f}fps, {video_info['duration']:.1f}s" # ) # logger.info(f"Memory before lipsync: {get_memory_usage()}") # # with timer("Applying lipsync"): # try: # lipsynced_face, lipsynced_info = apply_lipsync_to_video( # face_cropped, audio_16k, output_dir, crop_size # ) # logger.info( # f"Lipsynced video size: {lipsynced_info['width']}x{lipsynced_info['height']}" # ) # except Exception as e: # error_msg = f"Lipsync failed: {str(e)}" # logger.error(f"Lipsync failed with error: {type(e).__name__}: {e}") # logger.error(f"Memory after crash: {get_memory_usage()}") # traceback.print_exc() # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # gc.collect() # logger.info(f"Memory after lipsync: {get_memory_usage()}") # # progress(0.8, desc="🔀 Đang ghép video...") # with timer("Blending face into original"): # try: # lipsynced_full = blend_face_into_original( # video_looped, lipsynced_face, crop_bbox, output_dir, lipsynced_info # ) # except Exception as e: # error_msg = f"Blend video failed: {str(e)}" # logger.error(error_msg) # traceback.print_exc() # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # gc.collect() # logger.info(f"Memory after blend: {get_memory_usage()}") # # progress(0.9, desc="🔗 Đang ghép audio...") # try: # audio_final = prepare_audio_for_youtube(audio_upsampled, output_dir) # final_video = merge_audio_video(lipsynced_full, audio_final, output_dir) # except Exception as e: # error_msg = f"Merge audio failed: {str(e)}" # logger.error(error_msg) # traceback.print_exc() # return ( # final_video, # video_looped, # face_cropped, # lipsynced_face, # lipsynced_full, # ) # # progress(1.0, desc="✅ Hoàn tất!") # logger.info(f"Memory at end: {get_memory_usage()}") # # return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full # # except Exception as e: # print(f"ERROR in process_lipsync_with_audio_target: {e}") # traceback.print_exc() # return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full def process_lipsync_with_audio_target_new( video_file, audio_file, session_id=None, model_type="latentsync", quality_level="Normal", progress=gr.Progress(track_tqdm=True), ): """Workflow mới: Chuẩn hóa YouTube rồi lipsync Steps: 1. Validate inputs 2. Chuẩn hóa video YouTube (loop/crop + re-encode) 3. Chuẩn hóa audio YouTube (AAC 320k) 4. Chuẩn bị audio 16k cho lipsync 5. Lipsync pipeline (tự detect/crop/lipsync/restore) 6. Merge audio YouTube + video lipsynced Args: video_file: Path to video source audio_file: Path to audio target (English only) session_id: Session identifier model_type: Model type for lipsync ("latentsync" or "musetalk") quality_level: Quality level ("Fast", "Normal", "Medium", "Best", "Super Best") progress: Progress tracking object Returns: Tuple of (final_video, video_normalized, lipsynced_video) """ video_normalized = None lipsynced_video = None final_video = None try: video_path, audio_path = validate_input(video_file, audio_file) output_dir = setup_output_dir(session_id) # Mapping model_type to crop_size if model_type == "LatentSync v1.6": logger.info("Using LatentSync v1.6 with crop_size=512") elif model_type == "MuseTalk v1.5": logger.info("Using MuseTalk v1.5 with crop_size=256") else: raise ValueError(f"Unknown model_type: {model_type}") logger.info(f"Memory at start: {get_memory_usage()}") audio_duration = get_audio_duration(audio_path) logger.info(f"Audio duration: {audio_duration:.2f}s") progress(0.15, desc="🎬 Đang chuẩn hóa video YouTube...") logger.info(f"Memory before video normalization: {get_memory_usage()}") with timer("Normalizing video for YouTube"): video_normalized = normalize_video_for_youtube( video_path, audio_duration, output_dir ) video_info = get_video_info(video_normalized) logger.info( f"Normalized video: {video_info['width']}x{video_info['height']}, " f"{video_info['fps']:.1f}fps, {video_info['duration']:.1f}s" ) gc.collect() logger.info(f"Memory after video normalization: {get_memory_usage()}") progress(0.25, desc="🎵 Đang chuẩn hóa audio YouTube...") logger.info(f"Memory before audio normalization: {get_memory_usage()}") with timer("Normalizing audio for YouTube"): audio_youtube = prepare_audio_for_youtube_aac(audio_path, output_dir) logger.info(f"Audio YouTube: {audio_youtube}") gc.collect() logger.info(f"Memory after audio normalization: {get_memory_usage()}") progress(0.35, desc="🔊 Đang chuẩn bị audio cho lipsync...") with timer("Preparing audio for lipsync"): audio_16k = prepare_audio_for_lipsync(audio_path, output_dir) logger.info(f"Audio 16k for lipsync: {audio_16k}") gc.collect() logger.info(f"Memory after audio preparation: {get_memory_usage()}") progress(0.55, desc="👄 Đang lipsync...") logger.info( f"Starting lipsync: video={video_normalized}, audio_16k={audio_16k}, output={output_dir}" ) logger.info(f"Memory before lipsync: {get_memory_usage()}") with timer("Applying lipsync"): lipsynced_video, lipsynced_info = apply_lipsync_to_video( video_normalized, audio_16k, output_dir, model_type, quality_level ) logger.info( f"Lipsynced video: {lipsynced_video}, size: {lipsynced_info['width']}x{lipsynced_info['height']}" ) gc.collect() logger.info(f"Memory after lipsync: {get_memory_usage()}") progress(0.85, desc="🔗 Đang ghép audio YouTube...") logger.info(f"Memory before merge: {get_memory_usage()}") with timer("Merging audio and video"): final_video = merge_audio_video(lipsynced_video, audio_youtube, output_dir) logger.info(f"Final video: {final_video}") progress(1.0, desc="✅ Hoàn tất!") logger.info(f"Memory at end: {get_memory_usage()}") return final_video except Exception as e: logger.error(f"ERROR in process_lipsync_with_audio_target_new: {e}") traceback.print_exc() raise gr.Error(f"Lỗi xử lý: {str(e)}") def lipsync_with_audio_target( video_file, audio_file, session_id=None, quality_level="Normal", model_type="LatentSync v1.6", progress=gr.Progress(track_tqdm=True), ): """Wrapper for Gradio: Lipsync video source with audio target (English only) Returns: Tuple of (final_video, video_normalized, lipsynced_video) """ if video_file is None: raise gr.Error("Please upload a video source.") if audio_file is None: raise gr.Error("Please upload a target audio.") return process_lipsync_with_audio_target_new( video_file, audio_file, session_id, model_type, quality_level, progress )