lipsync-docker / processing.py
naicoi's picture
update
ab7144c
"""Main processing workflows for OutofLipSync"""
import gc
import logging
import os
import traceback
import spaces
import gradio as gr
import psutil
import torch
from audio_processing import (
get_audio_duration,
prepare_audio_for_lipsync,
prepare_audio_for_youtube_aac,
prepare_audio_for_youtube,
)
from config import PROCESSED_RESULTS_DIR
from lipsync_processing import apply_lipsync_to_video, get_video_info
from time_util import timer
from utils import setup_output_dir
from video_processing import (
normalize_video_for_youtube,
merge_audio_video,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# At import time, export the results location through the process
# environment: the current working directory followed by "/" and
# PROCESSED_RESULTS_DIR (imported from config).
os.environ["PROCESSED_RESULTS"] = f"{os.getcwd()}/{PROCESSED_RESULTS_DIR}"
def get_memory_usage():
    """Summarize current host RAM use and, when CUDA is available, GPU use.

    Returns:
        A one-line string ``RAM: <used>GB / <total>GB (<pct>%)``. When
        ``torch.cuda.is_available()`` is true, `` | GPU: <allocated>GB /
        <total>GB (<pct>%)`` is appended, using the memory allocated on
        CUDA device 0 and that device's total memory.
    """
    bytes_per_gb = 1024**3

    vm = psutil.virtual_memory()
    sections = [
        f"RAM: {vm.used / bytes_per_gb:.2f}GB / "
        f"{vm.total / bytes_per_gb:.2f}GB ({vm.percent:.1f}%)"
    ]

    if torch.cuda.is_available():
        # Only device 0 is inspected.
        allocated_gb = torch.cuda.memory_allocated(0) / bytes_per_gb
        capacity_gb = torch.cuda.get_device_properties(0).total_memory / bytes_per_gb
        share = (allocated_gb / capacity_gb) * 100
        sections.append(
            f"GPU: {allocated_gb:.2f}GB / {capacity_gb:.2f}GB ({share:.1f}%)"
        )

    return " | ".join(sections)
def validate_input(video_file, audio_file):
    """Check both uploads and resolve each one to a filesystem path.

    Each input may be either a path-like value, which is used as is, or a
    dict, in which case its ``"name"`` entry is used, falling back to
    ``"path"`` when ``"name"`` is missing or falsy.

    Args:
        video_file: Video upload (path or dict).
        audio_file: Audio upload (path or dict).

    Returns:
        Tuple of (video_path, audio_path).

    Raises:
        gr.Error: If either input is None, or a resolved path is None or
            does not exist on disk.
    """
    if video_file is None:
        raise gr.Error("Please upload a video source.")
    if audio_file is None:
        raise gr.Error("Please upload a target audio.")

    def _as_path(upload):
        # Dict payloads carry the location under "name" or "path".
        if not isinstance(upload, dict):
            return upload
        return upload.get("name") or upload.get("path")

    video_path = _as_path(video_file)
    audio_path = _as_path(audio_file)

    # Video is checked before audio so the first failing input is reported.
    checks = (
        (video_path, "Could not read uploaded video file."),
        (audio_path, "Could not read uploaded audio file."),
    )
    for resolved, message in checks:
        if resolved is None or not os.path.exists(resolved):
            raise gr.Error(message)

    return video_path, audio_path
# def process_lipsync_with_audio_target(
# video_file,
# audio_file,
# session_id=None,
# crop_size=256,
# progress=gr.Progress(track_tqdm=True),
# ):
# """Lipsync a video source with an audio target (DEPRECATED - use process_lipsync_with_audio_target_new)
#
# Args:
# video_file: Path to video source
# audio_file: Path to audio target (English only)
# session_id: Session identifier
# crop_size: Size of crop region (256 or 512)
# progress: Progress tracking object
#
# Returns:
# Tuple of (final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full)
# """
# video_looped = None
# face_cropped = None
# lipsynced_face = None
# lipsynced_full = None
# final_video = None
# error_msg = None
#
# try:
# video_path, audio_path = validate_input(video_file, audio_file)
#
# output_dir = setup_output_dir(session_id)
#
# logger.info(f"Memory at start: {get_memory_usage()}")
#
# audio_duration = get_audio_duration(audio_path)
#
# progress(0.1, desc="🎬 Đang chuẩn bị video...")
# logger.info(f"Memory before video loop: {get_memory_usage()}")
#
# with timer("Looping/cropping video to match audio"):
# try:
# video_looped = loop_video_to_match_audio(
# video_path, audio_duration, output_dir
# )
# except Exception as e:
# error_msg = f"Video loop failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after video loop: {get_memory_usage()}")
#
# progress(0.2, desc="👤 Đang phát hiện khuôn mặt...")
# with timer("Detecting face"):
# try:
# face_bbox = detect_face_region(video_looped, output_dir, crop_size)
# except FaceDetectionError as e:
# error_msg = str(e)
# logger.error(f"Face detection failed: {e}")
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after face detection: {get_memory_usage()}")
#
# actual_crop_size = crop_size * 2
# progress(
# 0.25, desc=f"👤 Đang tính lại crop {actual_crop_size}x{actual_crop_size}..."
# )
# with timer(
# f"Recalculating crop bbox for {actual_crop_size}x{actual_crop_size}"
# ):
# from video_processing import get_video_info
#
# try:
# video_info = get_video_info(video_looped)
# crop_bbox = calculate_safe_crop_size(
# face_bbox["face_bbox"],
# video_info["width"],
# video_info["height"],
# actual_crop_size,
# )
# except Exception as e:
# error_msg = f"Calculate crop bbox failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# progress(
# 0.3, desc=f"✂️ Đang crop video {actual_crop_size}x{actual_crop_size}..."
# )
# with timer(f"Cropping video to {actual_crop_size}x{actual_crop_size}"):
# try:
# face_cropped = crop_video_to_size(
# video_looped, crop_bbox, output_dir, actual_crop_size
# )
# except Exception as e:
# error_msg = f"Crop video failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after crop: {get_memory_usage()}")
#
# progress(0.4, desc="🎵 Đang xử lý audio...")
# logger.info(f"Memory before audio prep: {get_memory_usage()}")
#
# with timer("Preparing target audio"):
# try:
# audio_16k, audio_upsampled = prepare_target_audio(
# audio_path, output_dir
# )
# except Exception as e:
# error_msg = f"Prepare audio failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after audio prep: {get_memory_usage()}")
#
# progress(0.6, desc="👄 Đang lipsync...")
#
# video_info = get_video_info(face_cropped)
# logger.info(
# f"Starting lipsync: video={face_cropped}, audio_16k={audio_16k}, output={output_dir}"
# )
# logger.info(
# f"Video info: {video_info['width']}x{video_info['height']}, {video_info['fps']:.1f}fps, {video_info['duration']:.1f}s"
# )
# logger.info(f"Memory before lipsync: {get_memory_usage()}")
#
# with timer("Applying lipsync"):
# try:
# lipsynced_face, lipsynced_info = apply_lipsync_to_video(
# face_cropped, audio_16k, output_dir, crop_size
# )
# logger.info(
# f"Lipsynced video size: {lipsynced_info['width']}x{lipsynced_info['height']}"
# )
# except Exception as e:
# error_msg = f"Lipsync failed: {str(e)}"
# logger.error(f"Lipsync failed with error: {type(e).__name__}: {e}")
# logger.error(f"Memory after crash: {get_memory_usage()}")
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after lipsync: {get_memory_usage()}")
#
# progress(0.8, desc="🔀 Đang ghép video...")
# with timer("Blending face into original"):
# try:
# lipsynced_full = blend_face_into_original(
# video_looped, lipsynced_face, crop_bbox, output_dir, lipsynced_info
# )
# except Exception as e:
# error_msg = f"Blend video failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after blend: {get_memory_usage()}")
#
# progress(0.9, desc="🔗 Đang ghép audio...")
# try:
# audio_final = prepare_audio_for_youtube(audio_upsampled, output_dir)
# final_video = merge_audio_video(lipsynced_full, audio_final, output_dir)
# except Exception as e:
# error_msg = f"Merge audio failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# progress(1.0, desc="✅ Hoàn tất!")
# logger.info(f"Memory at end: {get_memory_usage()}")
#
# return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full
#
# except Exception as e:
# print(f"ERROR in process_lipsync_with_audio_target: {e}")
# traceback.print_exc()
# return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full
# Canonical model labels accepted by the pipeline, mapped to the crop size
# reported in the log line for each of them.
_MODEL_CROP_SIZES = {
    "LatentSync v1.6": 512,
    "MuseTalk v1.5": 256,
}

# Lower-cased spellings mapped to the canonical label. The short names are
# the ones used by this function's own default ("latentsync") and docstring,
# which previously matched nothing and always failed.
_MODEL_ALIASES = {
    "latentsync": "LatentSync v1.6",
    "latentsync v1.6": "LatentSync v1.6",
    "musetalk": "MuseTalk v1.5",
    "musetalk v1.5": "MuseTalk v1.5",
}


def _resolve_model_type(model_type):
    """Return the canonical model label for ``model_type`` or raise ValueError."""
    key = str(model_type).strip().lower()
    if key not in _MODEL_ALIASES:
        raise ValueError(f"Unknown model_type: {model_type}")
    return _MODEL_ALIASES[key]


def process_lipsync_with_audio_target_new(
    video_file,
    audio_file,
    session_id=None,
    model_type="latentsync",
    quality_level="Normal",
    progress=gr.Progress(track_tqdm=True),
):
    """Run the normalize-then-lipsync workflow and return the merged video.

    Steps, in execution order:
        1. Validate the uploads (``validate_input``) and create the output
           directory (``setup_output_dir``).
        2. Resolve ``model_type`` to a canonical label.
        3. Normalize the video against the audio duration with
           ``normalize_video_for_youtube`` (described by the original author
           as loop/crop + re-encode).
        4. Produce the delivery audio with ``prepare_audio_for_youtube_aac``.
        5. Produce the lipsync input audio with ``prepare_audio_for_lipsync``
           (held in ``audio_16k``; presumably 16 kHz - confirm in that helper).
        6. Run ``apply_lipsync_to_video`` on the normalized video.
        7. Merge the lipsynced video with the delivery audio using
           ``merge_audio_video``.

    Args:
        video_file: Video upload (path or dict, see ``validate_input``).
        audio_file: Audio upload (path or dict, see ``validate_input``).
        session_id: Session identifier forwarded to ``setup_output_dir``.
        model_type: "LatentSync v1.6" or "MuseTalk v1.5"; the short names
            "latentsync" and "musetalk" are accepted as aliases
            (case-insensitive). The canonical label is what gets passed to
            ``apply_lipsync_to_video``.
        quality_level: Forwarded unchanged to ``apply_lipsync_to_video``
            (the original docstring lists "Fast", "Normal", "Medium",
            "Best", "Super Best").
        progress: Gradio progress callback.

    Returns:
        The value returned by ``merge_audio_video`` (logged as the final
        video). This is a single value, not a tuple.

    Raises:
        gr.Error: For invalid uploads (message passed through unchanged) or
            for any other failure in the pipeline (message prefixed with
            "Lỗi xử lý: ").
    """
    try:
        video_path, audio_path = validate_input(video_file, audio_file)
        output_dir = setup_output_dir(session_id)

        # Normalize aliases so downstream code always sees a canonical label.
        model_type = _resolve_model_type(model_type)
        logger.info(
            f"Using {model_type} with crop_size={_MODEL_CROP_SIZES[model_type]}"
        )

        logger.info(f"Memory at start: {get_memory_usage()}")
        audio_duration = get_audio_duration(audio_path)
        logger.info(f"Audio duration: {audio_duration:.2f}s")

        progress(0.15, desc="🎬 Đang chuẩn hóa video YouTube...")
        logger.info(f"Memory before video normalization: {get_memory_usage()}")
        with timer("Normalizing video for YouTube"):
            video_normalized = normalize_video_for_youtube(
                video_path, audio_duration, output_dir
            )
            video_info = get_video_info(video_normalized)
            logger.info(
                f"Normalized video: {video_info['width']}x{video_info['height']}, "
                f"{video_info['fps']:.1f}fps, {video_info['duration']:.1f}s"
            )
        gc.collect()
        logger.info(f"Memory after video normalization: {get_memory_usage()}")

        progress(0.25, desc="🎵 Đang chuẩn hóa audio YouTube...")
        logger.info(f"Memory before audio normalization: {get_memory_usage()}")
        with timer("Normalizing audio for YouTube"):
            audio_youtube = prepare_audio_for_youtube_aac(audio_path, output_dir)
            logger.info(f"Audio YouTube: {audio_youtube}")
        gc.collect()
        logger.info(f"Memory after audio normalization: {get_memory_usage()}")

        progress(0.35, desc="🔊 Đang chuẩn bị audio cho lipsync...")
        with timer("Preparing audio for lipsync"):
            audio_16k = prepare_audio_for_lipsync(audio_path, output_dir)
            logger.info(f"Audio 16k for lipsync: {audio_16k}")
        gc.collect()
        logger.info(f"Memory after audio preparation: {get_memory_usage()}")

        progress(0.55, desc="👄 Đang lipsync...")
        logger.info(
            f"Starting lipsync: video={video_normalized}, audio_16k={audio_16k}, output={output_dir}"
        )
        logger.info(f"Memory before lipsync: {get_memory_usage()}")
        with timer("Applying lipsync"):
            lipsynced_video, lipsynced_info = apply_lipsync_to_video(
                video_normalized, audio_16k, output_dir, model_type, quality_level
            )
            logger.info(
                f"Lipsynced video: {lipsynced_video}, size: {lipsynced_info['width']}x{lipsynced_info['height']}"
            )
        gc.collect()
        logger.info(f"Memory after lipsync: {get_memory_usage()}")

        progress(0.85, desc="🔗 Đang ghép audio YouTube...")
        logger.info(f"Memory before merge: {get_memory_usage()}")
        with timer("Merging audio and video"):
            final_video = merge_audio_video(lipsynced_video, audio_youtube, output_dir)
            logger.info(f"Final video: {final_video}")

        progress(1.0, desc="✅ Hoàn tất!")
        logger.info(f"Memory at end: {get_memory_usage()}")
        return final_video
    except gr.Error:
        # Already a user-facing message (e.g. from validate_input); re-raise
        # as is instead of wrapping it in a second gr.Error.
        raise
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"ERROR in process_lipsync_with_audio_target_new: {e}")
        raise gr.Error(f"Lỗi xử lý: {str(e)}") from e
def lipsync_with_audio_target(
    video_file,
    audio_file,
    session_id=None,
    quality_level="Normal",
    model_type="LatentSync v1.6",
    progress=gr.Progress(track_tqdm=True),
):
    """Gradio entry point: lipsync a video source to a target audio track.

    Rejects missing uploads up front, then delegates to
    ``process_lipsync_with_audio_target_new``.

    Returns:
        Whatever ``process_lipsync_with_audio_target_new`` returns.

    Raises:
        gr.Error: If ``video_file`` or ``audio_file`` is None.
    """
    required_uploads = (
        (video_file, "Please upload a video source."),
        (audio_file, "Please upload a target audio."),
    )
    for upload, message in required_uploads:
        if upload is None:
            raise gr.Error(message)

    # The worker takes model_type before quality_level, the reverse of this
    # wrapper's own parameter order.
    result = process_lipsync_with_audio_target_new(
        video_file,
        audio_file,
        session_id,
        model_type,
        quality_level,
        progress,
    )
    return result