Spaces:
Runtime error
Runtime error
| """Main processing workflows for OutofLipSync""" | |
| import gc | |
| import logging | |
| import os | |
| import traceback | |
| import spaces | |
| import gradio as gr | |
| import psutil | |
| import torch | |
| from audio_processing import ( | |
| get_audio_duration, | |
| prepare_audio_for_lipsync, | |
| prepare_audio_for_youtube_aac, | |
| prepare_audio_for_youtube, | |
| ) | |
| from config import PROCESSED_RESULTS_DIR | |
| from lipsync_processing import apply_lipsync_to_video, get_video_info | |
| from time_util import timer | |
| from utils import setup_output_dir | |
| from video_processing import ( | |
| normalize_video_for_youtube, | |
| merge_audio_video, | |
| ) | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Publish the processed-results location through the environment. The path is
# built at import time from the current working directory and the configured
# PROCESSED_RESULTS_DIR, joined with a forward slash.
os.environ["PROCESSED_RESULTS"] = "{}/{}".format(os.getcwd(), PROCESSED_RESULTS_DIR)
def get_memory_usage():
    """Build a one-line, human-readable summary of RAM and GPU memory usage.

    Returns:
        A string of the form ``"RAM: <used>GB / <total>GB (<percent>%)"``.
        When CUDA is available, a `` | GPU: <used>GB / <total>GB (<percent>%)``
        suffix is appended for device 0, where "used" is the value reported
        by ``torch.cuda.memory_allocated`` and "total" is the device's
        ``total_memory``.
    """
    bytes_per_gb = 1024**3

    # System RAM figures come from psutil's virtual-memory snapshot.
    vm = psutil.virtual_memory()
    ram_summary = (
        f"RAM: {vm.used / bytes_per_gb:.2f}GB / "
        f"{vm.total / bytes_per_gb:.2f}GB ({vm.percent:.1f}%)"
    )

    # Without CUDA there is nothing to add to the RAM summary.
    if not torch.cuda.is_available():
        return ram_summary

    # Only device 0 is inspected.
    allocated_gb = torch.cuda.memory_allocated(0) / bytes_per_gb
    capacity_gb = torch.cuda.get_device_properties(0).total_memory / bytes_per_gb
    allocated_pct = (allocated_gb / capacity_gb) * 100
    gpu_summary = (
        f" | GPU: {allocated_gb:.2f}GB / {capacity_gb:.2f}GB ({allocated_pct:.1f}%)"
    )
    return f"{ram_summary}{gpu_summary}"
def validate_input(video_file, audio_file):
    """Check the uploaded video/audio inputs and resolve them to file paths.

    Each input may be either a path or a dict; for a dict the path is taken
    from its ``"name"`` entry, falling back to ``"path"`` when ``"name"`` is
    missing or falsy.

    Args:
        video_file: Video input (path or dict).
        audio_file: Audio input (path or dict).

    Returns:
        Tuple of (video_path, audio_path).

    Raises:
        gr.Error: If either input is None, or if a resolved path is None or
            does not exist on disk.
    """
    # Missing uploads are rejected first, video before audio.
    required_uploads = (
        (video_file, "Please upload a video source."),
        (audio_file, "Please upload a target audio."),
    )
    for upload, missing_message in required_uploads:
        if upload is None:
            raise gr.Error(missing_message)

    def _as_path(upload):
        # Dict payloads carry the path under "name" or "path"; anything else
        # is used as the path itself.
        if not isinstance(upload, dict):
            return upload
        return upload.get("name") or upload.get("path")

    video_path = _as_path(video_file)
    audio_path = _as_path(audio_file)

    # Both paths must point at something that exists, video checked first.
    if not (video_path is not None and os.path.exists(video_path)):
        raise gr.Error("Could not read uploaded video file.")
    if not (audio_path is not None and os.path.exists(audio_path)):
        raise gr.Error("Could not read uploaded audio file.")

    return video_path, audio_path
| # def process_lipsync_with_audio_target( | |
| # video_file, | |
| # audio_file, | |
| # session_id=None, | |
| # crop_size=256, | |
| # progress=gr.Progress(track_tqdm=True), | |
| # ): | |
| # """Lipsync a video source with an audio target (DEPRECATED - use process_lipsync_with_audio_target_new) | |
| # | |
| # Args: | |
| # video_file: Path to video source | |
| # audio_file: Path to audio target (English only) | |
| # session_id: Session identifier | |
| # crop_size: Size of crop region (256 or 512) | |
| # progress: Progress tracking object | |
| # | |
| # Returns: | |
| # Tuple of (final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full) | |
| # """ | |
| # video_looped = None | |
| # face_cropped = None | |
| # lipsynced_face = None | |
| # lipsynced_full = None | |
| # final_video = None | |
| # error_msg = None | |
| # | |
| # try: | |
| # video_path, audio_path = validate_input(video_file, audio_file) | |
| # | |
| # output_dir = setup_output_dir(session_id) | |
| # | |
| # logger.info(f"Memory at start: {get_memory_usage()}") | |
| # | |
| # audio_duration = get_audio_duration(audio_path) | |
| # | |
| # progress(0.1, desc="🎬 Đang chuẩn bị video...") | |
| # logger.info(f"Memory before video loop: {get_memory_usage()}") | |
| # | |
| # with timer("Looping/cropping video to match audio"): | |
| # try: | |
| # video_looped = loop_video_to_match_audio( | |
| # video_path, audio_duration, output_dir | |
| # ) | |
| # except Exception as e: | |
| # error_msg = f"Video loop failed: {str(e)}" | |
| # logger.error(error_msg) | |
| # traceback.print_exc() | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # gc.collect() | |
| # logger.info(f"Memory after video loop: {get_memory_usage()}") | |
| # | |
| # progress(0.2, desc="👤 Đang phát hiện khuôn mặt...") | |
| # with timer("Detecting face"): | |
| # try: | |
| # face_bbox = detect_face_region(video_looped, output_dir, crop_size) | |
| # except FaceDetectionError as e: | |
| # error_msg = str(e) | |
| # logger.error(f"Face detection failed: {e}") | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # gc.collect() | |
| # logger.info(f"Memory after face detection: {get_memory_usage()}") | |
| # | |
| # actual_crop_size = crop_size * 2 | |
| # progress( | |
| # 0.25, desc=f"👤 Đang tính lại crop {actual_crop_size}x{actual_crop_size}..." | |
| # ) | |
| # with timer( | |
| # f"Recalculating crop bbox for {actual_crop_size}x{actual_crop_size}" | |
| # ): | |
| # from video_processing import get_video_info | |
| # | |
| # try: | |
| # video_info = get_video_info(video_looped) | |
| # crop_bbox = calculate_safe_crop_size( | |
| # face_bbox["face_bbox"], | |
| # video_info["width"], | |
| # video_info["height"], | |
| # actual_crop_size, | |
| # ) | |
| # except Exception as e: | |
| # error_msg = f"Calculate crop bbox failed: {str(e)}" | |
| # logger.error(error_msg) | |
| # traceback.print_exc() | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # progress( | |
| # 0.3, desc=f"✂️ Đang crop video {actual_crop_size}x{actual_crop_size}..." | |
| # ) | |
| # with timer(f"Cropping video to {actual_crop_size}x{actual_crop_size}"): | |
| # try: | |
| # face_cropped = crop_video_to_size( | |
| # video_looped, crop_bbox, output_dir, actual_crop_size | |
| # ) | |
| # except Exception as e: | |
| # error_msg = f"Crop video failed: {str(e)}" | |
| # logger.error(error_msg) | |
| # traceback.print_exc() | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # gc.collect() | |
| # logger.info(f"Memory after crop: {get_memory_usage()}") | |
| # | |
| # progress(0.4, desc="🎵 Đang xử lý audio...") | |
| # logger.info(f"Memory before audio prep: {get_memory_usage()}") | |
| # | |
| # with timer("Preparing target audio"): | |
| # try: | |
| # audio_16k, audio_upsampled = prepare_target_audio( | |
| # audio_path, output_dir | |
| # ) | |
| # except Exception as e: | |
| # error_msg = f"Prepare audio failed: {str(e)}" | |
| # logger.error(error_msg) | |
| # traceback.print_exc() | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # gc.collect() | |
| # logger.info(f"Memory after audio prep: {get_memory_usage()}") | |
| # | |
| # progress(0.6, desc="👄 Đang lipsync...") | |
| # | |
| # video_info = get_video_info(face_cropped) | |
| # logger.info( | |
| # f"Starting lipsync: video={face_cropped}, audio_16k={audio_16k}, output={output_dir}" | |
| # ) | |
| # logger.info( | |
| # f"Video info: {video_info['width']}x{video_info['height']}, {video_info['fps']:.1f}fps, {video_info['duration']:.1f}s" | |
| # ) | |
| # logger.info(f"Memory before lipsync: {get_memory_usage()}") | |
| # | |
| # with timer("Applying lipsync"): | |
| # try: | |
| # lipsynced_face, lipsynced_info = apply_lipsync_to_video( | |
| # face_cropped, audio_16k, output_dir, crop_size | |
| # ) | |
| # logger.info( | |
| # f"Lipsynced video size: {lipsynced_info['width']}x{lipsynced_info['height']}" | |
| # ) | |
| # except Exception as e: | |
| # error_msg = f"Lipsync failed: {str(e)}" | |
| # logger.error(f"Lipsync failed with error: {type(e).__name__}: {e}") | |
| # logger.error(f"Memory after crash: {get_memory_usage()}") | |
| # traceback.print_exc() | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # gc.collect() | |
| # logger.info(f"Memory after lipsync: {get_memory_usage()}") | |
| # | |
| # progress(0.8, desc="🔀 Đang ghép video...") | |
| # with timer("Blending face into original"): | |
| # try: | |
| # lipsynced_full = blend_face_into_original( | |
| # video_looped, lipsynced_face, crop_bbox, output_dir, lipsynced_info | |
| # ) | |
| # except Exception as e: | |
| # error_msg = f"Blend video failed: {str(e)}" | |
| # logger.error(error_msg) | |
| # traceback.print_exc() | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # gc.collect() | |
| # logger.info(f"Memory after blend: {get_memory_usage()}") | |
| # | |
| # progress(0.9, desc="🔗 Đang ghép audio...") | |
| # try: | |
| # audio_final = prepare_audio_for_youtube(audio_upsampled, output_dir) | |
| # final_video = merge_audio_video(lipsynced_full, audio_final, output_dir) | |
| # except Exception as e: | |
| # error_msg = f"Merge audio failed: {str(e)}" | |
| # logger.error(error_msg) | |
| # traceback.print_exc() | |
| # return ( | |
| # final_video, | |
| # video_looped, | |
| # face_cropped, | |
| # lipsynced_face, | |
| # lipsynced_full, | |
| # ) | |
| # | |
| # progress(1.0, desc="✅ Hoàn tất!") | |
| # logger.info(f"Memory at end: {get_memory_usage()}") | |
| # | |
| # return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full | |
| # | |
| # except Exception as e: | |
| # print(f"ERROR in process_lipsync_with_audio_target: {e}") | |
| # traceback.print_exc() | |
| # return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full | |
def process_lipsync_with_audio_target_new(
    video_file,
    audio_file,
    session_id=None,
    model_type="latentsync",
    quality_level="Normal",
    progress=gr.Progress(track_tqdm=True),
):
    """Normalize the inputs for YouTube, then lipsync the video to the audio.

    Steps:
        1. Validate inputs and set up the output directory.
        2. Normalize the video for YouTube, passing the audio duration
           (``normalize_video_for_youtube``).
        3. Normalize the audio for YouTube (``prepare_audio_for_youtube_aac``).
        4. Prepare the audio used for lipsync (``prepare_audio_for_lipsync``).
        5. Run the lipsync pipeline on the normalized video
           (``apply_lipsync_to_video``).
        6. Merge the YouTube audio with the lipsynced video
           (``merge_audio_video``).

    Args:
        video_file: Video source (path or upload dict, see ``validate_input``).
        audio_file: Audio target (path or upload dict). The original notes
            say English only; nothing in this function enforces that.
        session_id: Session identifier forwarded to ``setup_output_dir``.
        model_type: Lipsync model. Accepts the display names
            ``"LatentSync v1.6"`` / ``"MuseTalk v1.5"`` as well as the short
            names ``"latentsync"`` / ``"musetalk"``, which are mapped to the
            display names before use.
        quality_level: Quality level forwarded to ``apply_lipsync_to_video``
            (documented values: "Fast", "Normal", "Medium", "Best",
            "Super Best").
        progress: Gradio progress tracker.

    Returns:
        The final video as returned by ``merge_audio_video`` (presumably a
        file path -- confirm against that helper).

    Raises:
        gr.Error: For invalid inputs (raised unchanged from
            ``validate_input``), or wrapping any other failure in the
            pipeline with the original exception attached as the cause.
    """
    # Short names (including the historical default) resolve to the display
    # names that the model check below accepts.
    model_aliases = {
        "latentsync": "LatentSync v1.6",
        "musetalk": "MuseTalk v1.5",
    }
    # Crop size reported in the log for each supported model.
    model_crop_sizes = {
        "LatentSync v1.6": 512,
        "MuseTalk v1.5": 256,
    }

    try:
        video_path, audio_path = validate_input(video_file, audio_file)
        output_dir = setup_output_dir(session_id)

        resolved_model = model_aliases.get(model_type, model_type)
        if resolved_model not in model_crop_sizes:
            raise ValueError(f"Unknown model_type: {model_type}")
        model_type = resolved_model
        logger.info(
            f"Using {model_type} with crop_size={model_crop_sizes[model_type]}"
        )

        logger.info(f"Memory at start: {get_memory_usage()}")
        audio_duration = get_audio_duration(audio_path)
        logger.info(f"Audio duration: {audio_duration:.2f}s")

        progress(0.15, desc="🎬 Đang chuẩn hóa video YouTube...")
        logger.info(f"Memory before video normalization: {get_memory_usage()}")
        with timer("Normalizing video for YouTube"):
            video_normalized = normalize_video_for_youtube(
                video_path, audio_duration, output_dir
            )
        video_info = get_video_info(video_normalized)
        logger.info(
            f"Normalized video: {video_info['width']}x{video_info['height']}, "
            f"{video_info['fps']:.1f}fps, {video_info['duration']:.1f}s"
        )
        gc.collect()
        logger.info(f"Memory after video normalization: {get_memory_usage()}")

        progress(0.25, desc="🎵 Đang chuẩn hóa audio YouTube...")
        logger.info(f"Memory before audio normalization: {get_memory_usage()}")
        with timer("Normalizing audio for YouTube"):
            audio_youtube = prepare_audio_for_youtube_aac(audio_path, output_dir)
        logger.info(f"Audio YouTube: {audio_youtube}")
        gc.collect()
        logger.info(f"Memory after audio normalization: {get_memory_usage()}")

        progress(0.35, desc="🔊 Đang chuẩn bị audio cho lipsync...")
        with timer("Preparing audio for lipsync"):
            audio_16k = prepare_audio_for_lipsync(audio_path, output_dir)
        logger.info(f"Audio 16k for lipsync: {audio_16k}")
        gc.collect()
        logger.info(f"Memory after audio preparation: {get_memory_usage()}")

        progress(0.55, desc="👄 Đang lipsync...")
        logger.info(
            f"Starting lipsync: video={video_normalized}, audio_16k={audio_16k}, output={output_dir}"
        )
        logger.info(f"Memory before lipsync: {get_memory_usage()}")
        with timer("Applying lipsync"):
            lipsynced_video, lipsynced_info = apply_lipsync_to_video(
                video_normalized, audio_16k, output_dir, model_type, quality_level
            )
        logger.info(
            f"Lipsynced video: {lipsynced_video}, size: {lipsynced_info['width']}x{lipsynced_info['height']}"
        )
        gc.collect()
        logger.info(f"Memory after lipsync: {get_memory_usage()}")

        progress(0.85, desc="🔗 Đang ghép audio YouTube...")
        logger.info(f"Memory before merge: {get_memory_usage()}")
        with timer("Merging audio and video"):
            final_video = merge_audio_video(lipsynced_video, audio_youtube, output_dir)
        logger.info(f"Final video: {final_video}")

        progress(1.0, desc="✅ Hoàn tất!")
        logger.info(f"Memory at end: {get_memory_usage()}")
        return final_video
    except gr.Error as e:
        # Already a user-facing message (e.g. from validate_input); wrapping
        # it again would only prepend a second prefix to the same text.
        logger.error(f"ERROR in process_lipsync_with_audio_target_new: {e}")
        raise
    except Exception as e:
        logger.error(f"ERROR in process_lipsync_with_audio_target_new: {e}")
        traceback.print_exc()
        # Chain the original exception so its traceback is preserved.
        raise gr.Error(f"Lỗi xử lý: {str(e)}") from e
def lipsync_with_audio_target(
    video_file,
    audio_file,
    session_id=None,
    quality_level="Normal",
    model_type="LatentSync v1.6",
    progress=gr.Progress(track_tqdm=True),
):
    """Gradio entry point: lipsync a video source to a target audio track.

    Rejects missing uploads, then delegates to
    ``process_lipsync_with_audio_target_new``. Note that this wrapper takes
    ``quality_level`` before ``model_type``, the reverse of the delegate's
    parameter order.

    Args:
        video_file: Video source upload.
        audio_file: Target audio upload.
        session_id: Session identifier.
        quality_level: Quality level forwarded to the workflow.
        model_type: Lipsync model name forwarded to the workflow.
        progress: Gradio progress tracker.

    Returns:
        Whatever ``process_lipsync_with_audio_target_new`` returns.

    Raises:
        gr.Error: If the video or audio upload is None.
    """
    # Fail fast on missing uploads, video checked before audio.
    required_uploads = (
        (video_file, "Please upload a video source."),
        (audio_file, "Please upload a target audio."),
    )
    for upload, missing_message in required_uploads:
        if upload is None:
            raise gr.Error(missing_message)

    # Forward by keyword so the differing parameter order cannot be mixed up.
    return process_lipsync_with_audio_target_new(
        video_file=video_file,
        audio_file=audio_file,
        session_id=session_id,
        model_type=model_type,
        quality_level=quality_level,
        progress=progress,
    )