lipsync-docker / processing.py
naicoi's picture
update
ab7144c
raw
history blame
16.4 kB
"""Main processing workflows for OutofLipSync"""
import gc
import logging
import os
import traceback
import spaces
import gradio as gr
import psutil
import torch
from audio_processing import (
get_audio_duration,
prepare_audio_for_lipsync,
prepare_audio_for_youtube_aac,
prepare_audio_for_youtube,
)
from config import PROCESSED_RESULTS_DIR
from lipsync_processing import apply_lipsync_to_video, get_video_info
from time_util import timer
from utils import setup_output_dir
from video_processing import (
normalize_video_for_youtube,
merge_audio_video,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# At import time, publish "<current working directory>/<PROCESSED_RESULTS_DIR>"
# through the PROCESSED_RESULTS environment variable.
# NOTE(review): the consumer of this variable is not visible in this file —
# confirm who reads it before changing the format.
os.environ["PROCESSED_RESULTS"] = f"{os.getcwd()}/{PROCESSED_RESULTS_DIR}"
def get_memory_usage():
    """Return a one-line, human-readable summary of memory usage.

    The summary always reports system RAM (used / total / percent, from
    psutil). When CUDA is available it also reports the memory PyTorch has
    allocated on CUDA device 0 against that device's total memory.

    Returns:
        A string such as "RAM: 1.00GB / 8.00GB (12.5%)", optionally followed
        by " | GPU: ...".
    """
    bytes_per_gb = 1024**3

    ram = psutil.virtual_memory()
    ram_used_gb, ram_percent = ram.used / bytes_per_gb, ram.percent

    # Start with no GPU section; it is only filled in when CUDA is usable.
    gpu_info = ""
    if torch.cuda.is_available():
        gpu_used_gb = torch.cuda.memory_allocated(0) / bytes_per_gb
        gpu_total_gb = torch.cuda.get_device_properties(0).total_memory / bytes_per_gb
        gpu_percent = (gpu_used_gb / gpu_total_gb) * 100
        gpu_info = f" | GPU: {gpu_used_gb:.2f}GB / {gpu_total_gb:.2f}GB ({gpu_percent:.1f}%)"

    return f"RAM: {ram_used_gb:.2f}GB / {ram.total / (1024**3):.2f}GB ({ram_percent:.1f}%){gpu_info}"
def validate_input(video_file, audio_file):
    """Check both uploads and resolve them to filesystem paths.

    Args:
        video_file: Video upload; either a path value or a dict carrying the
            path under "name" or "path".
        audio_file: Audio upload, in the same forms as video_file.

    Returns:
        Tuple of (video_path, audio_path).

    Raises:
        gr.Error: If an upload is None, or if its resolved path is None or
            does not exist on disk.
    """

    def _extract_path(upload):
        # Dict uploads carry the path under "name", falling back to "path".
        if not isinstance(upload, dict):
            return upload
        return upload.get("name") or upload.get("path")

    # Reject missing uploads first (video before audio).
    for upload, message in (
        (video_file, "Please upload a video source."),
        (audio_file, "Please upload a target audio."),
    ):
        if upload is None:
            raise gr.Error(message)

    video_path = _extract_path(video_file)
    audio_path = _extract_path(audio_file)

    # Both resolved paths must point at something that exists on disk.
    if not (video_path is not None and os.path.exists(video_path)):
        raise gr.Error("Could not read uploaded video file.")
    if not (audio_path is not None and os.path.exists(audio_path)):
        raise gr.Error("Could not read uploaded audio file.")

    return video_path, audio_path
# def process_lipsync_with_audio_target(
# video_file,
# audio_file,
# session_id=None,
# crop_size=256,
# progress=gr.Progress(track_tqdm=True),
# ):
# """Lipsync a source video with a target audio track (DEPRECATED - use process_lipsync_with_audio_target_new)
#
# Args:
# video_file: Path to video source
# audio_file: Path to audio target (English only)
# session_id: Session identifier
# crop_size: Size of crop region (256 or 512)
# progress: Progress tracking object
#
# Returns:
# Tuple of (final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full)
# """
# video_looped = None
# face_cropped = None
# lipsynced_face = None
# lipsynced_full = None
# final_video = None
# error_msg = None
#
# try:
# video_path, audio_path = validate_input(video_file, audio_file)
#
# output_dir = setup_output_dir(session_id)
#
# logger.info(f"Memory at start: {get_memory_usage()}")
#
# audio_duration = get_audio_duration(audio_path)
#
# progress(0.1, desc="🎬 Đang chuẩn bị video...")
# logger.info(f"Memory before video loop: {get_memory_usage()}")
#
# with timer("Looping/cropping video to match audio"):
# try:
# video_looped = loop_video_to_match_audio(
# video_path, audio_duration, output_dir
# )
# except Exception as e:
# error_msg = f"Video loop failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after video loop: {get_memory_usage()}")
#
# progress(0.2, desc="👤 Đang phát hiện khuôn mặt...")
# with timer("Detecting face"):
# try:
# face_bbox = detect_face_region(video_looped, output_dir, crop_size)
# except FaceDetectionError as e:
# error_msg = str(e)
# logger.error(f"Face detection failed: {e}")
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after face detection: {get_memory_usage()}")
#
# actual_crop_size = crop_size * 2
# progress(
# 0.25, desc=f"👤 Đang tính lại crop {actual_crop_size}x{actual_crop_size}..."
# )
# with timer(
# f"Recalculating crop bbox for {actual_crop_size}x{actual_crop_size}"
# ):
# from video_processing import get_video_info
#
# try:
# video_info = get_video_info(video_looped)
# crop_bbox = calculate_safe_crop_size(
# face_bbox["face_bbox"],
# video_info["width"],
# video_info["height"],
# actual_crop_size,
# )
# except Exception as e:
# error_msg = f"Calculate crop bbox failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# progress(
# 0.3, desc=f"✂️ Đang crop video {actual_crop_size}x{actual_crop_size}..."
# )
# with timer(f"Cropping video to {actual_crop_size}x{actual_crop_size}"):
# try:
# face_cropped = crop_video_to_size(
# video_looped, crop_bbox, output_dir, actual_crop_size
# )
# except Exception as e:
# error_msg = f"Crop video failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after crop: {get_memory_usage()}")
#
# progress(0.4, desc="🎵 Đang xử lý audio...")
# logger.info(f"Memory before audio prep: {get_memory_usage()}")
#
# with timer("Preparing target audio"):
# try:
# audio_16k, audio_upsampled = prepare_target_audio(
# audio_path, output_dir
# )
# except Exception as e:
# error_msg = f"Prepare audio failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after audio prep: {get_memory_usage()}")
#
# progress(0.6, desc="👄 Đang lipsync...")
#
# video_info = get_video_info(face_cropped)
# logger.info(
# f"Starting lipsync: video={face_cropped}, audio_16k={audio_16k}, output={output_dir}"
# )
# logger.info(
# f"Video info: {video_info['width']}x{video_info['height']}, {video_info['fps']:.1f}fps, {video_info['duration']:.1f}s"
# )
# logger.info(f"Memory before lipsync: {get_memory_usage()}")
#
# with timer("Applying lipsync"):
# try:
# lipsynced_face, lipsynced_info = apply_lipsync_to_video(
# face_cropped, audio_16k, output_dir, crop_size
# )
# logger.info(
# f"Lipsynced video size: {lipsynced_info['width']}x{lipsynced_info['height']}"
# )
# except Exception as e:
# error_msg = f"Lipsync failed: {str(e)}"
# logger.error(f"Lipsync failed with error: {type(e).__name__}: {e}")
# logger.error(f"Memory after crash: {get_memory_usage()}")
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after lipsync: {get_memory_usage()}")
#
# progress(0.8, desc="🔀 Đang ghép video...")
# with timer("Blending face into original"):
# try:
# lipsynced_full = blend_face_into_original(
# video_looped, lipsynced_face, crop_bbox, output_dir, lipsynced_info
# )
# except Exception as e:
# error_msg = f"Blend video failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# gc.collect()
# logger.info(f"Memory after blend: {get_memory_usage()}")
#
# progress(0.9, desc="🔗 Đang ghép audio...")
# try:
# audio_final = prepare_audio_for_youtube(audio_upsampled, output_dir)
# final_video = merge_audio_video(lipsynced_full, audio_final, output_dir)
# except Exception as e:
# error_msg = f"Merge audio failed: {str(e)}"
# logger.error(error_msg)
# traceback.print_exc()
# return (
# final_video,
# video_looped,
# face_cropped,
# lipsynced_face,
# lipsynced_full,
# )
#
# progress(1.0, desc="✅ Hoàn tất!")
# logger.info(f"Memory at end: {get_memory_usage()}")
#
# return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full
#
# except Exception as e:
# print(f"ERROR in process_lipsync_with_audio_target: {e}")
# traceback.print_exc()
# return final_video, video_looped, face_cropped, lipsynced_face, lipsynced_full
# Canonical model names (the values validated before lipsync) mapped to the
# crop size reported in the log line for that model.
_MODEL_CROP_SIZES = {"LatentSync v1.6": 512, "MuseTalk v1.5": 256}
# Short aliases accepted for model_type, mapped to their canonical names.
_MODEL_ALIASES = {"latentsync": "LatentSync v1.6", "musetalk": "MuseTalk v1.5"}


def _resolve_model_type(model_type):
    """Return the canonical model name for model_type, or raise ValueError."""
    canonical = model_type
    if isinstance(model_type, str):
        canonical = _MODEL_ALIASES.get(model_type.lower(), model_type)
    if not isinstance(canonical, str) or canonical not in _MODEL_CROP_SIZES:
        raise ValueError(f"Unknown model_type: {model_type}")
    return canonical


def process_lipsync_with_audio_target_new(
    video_file,
    audio_file,
    session_id=None,
    model_type="latentsync",
    quality_level="Normal",
    progress=gr.Progress(track_tqdm=True),
):
    """Normalize the inputs for YouTube, then lipsync the video to the audio.

    Steps:
        1. Validate the inputs and the model type.
        2. Normalize the video for YouTube against the audio duration.
        3. Normalize the audio for YouTube (AAC).
        4. Prepare the audio for lipsync (named audio_16k here).
        5. Run the lipsync pipeline on the normalized video.
        6. Merge the YouTube audio with the lipsynced video.

    Args:
        video_file: Video source (path, or upload dict with "name"/"path").
        audio_file: Audio target (English only, per the original notes).
        session_id: Session identifier passed to setup_output_dir.
        model_type: "LatentSync v1.6" or "MuseTalk v1.5"; the short aliases
            "latentsync" and "musetalk" (case-insensitive) are also accepted
            and mapped to those canonical names.
        quality_level: Quality level forwarded to the lipsync step
            ("Fast", "Normal", "Medium", "Best", "Super Best" per the
            original notes).
        progress: Gradio progress tracker.

    Returns:
        The final video as returned by merge_audio_video (presumably a file
        path). Only this single value is returned, not a tuple.

    Raises:
        gr.Error: On invalid input (message from validate_input, unchanged)
            or on any processing failure (message prefixed "Lỗi xử lý: ").
    """
    try:
        video_path, audio_path = validate_input(video_file, audio_file)

        # Fail fast on a bad model before creating any output directory.
        model_type = _resolve_model_type(model_type)
        logger.info(
            f"Using {model_type} with crop_size={_MODEL_CROP_SIZES[model_type]}"
        )

        output_dir = setup_output_dir(session_id)

        logger.info(f"Memory at start: {get_memory_usage()}")
        audio_duration = get_audio_duration(audio_path)
        logger.info(f"Audio duration: {audio_duration:.2f}s")

        progress(0.15, desc="🎬 Đang chuẩn hóa video YouTube...")
        logger.info(f"Memory before video normalization: {get_memory_usage()}")
        with timer("Normalizing video for YouTube"):
            video_normalized = normalize_video_for_youtube(
                video_path, audio_duration, output_dir
            )
        video_info = get_video_info(video_normalized)
        logger.info(
            f"Normalized video: {video_info['width']}x{video_info['height']}, "
            f"{video_info['fps']:.1f}fps, {video_info['duration']:.1f}s"
        )
        gc.collect()
        logger.info(f"Memory after video normalization: {get_memory_usage()}")

        progress(0.25, desc="🎵 Đang chuẩn hóa audio YouTube...")
        logger.info(f"Memory before audio normalization: {get_memory_usage()}")
        with timer("Normalizing audio for YouTube"):
            audio_youtube = prepare_audio_for_youtube_aac(audio_path, output_dir)
        logger.info(f"Audio YouTube: {audio_youtube}")
        gc.collect()
        logger.info(f"Memory after audio normalization: {get_memory_usage()}")

        progress(0.35, desc="🔊 Đang chuẩn bị audio cho lipsync...")
        with timer("Preparing audio for lipsync"):
            audio_16k = prepare_audio_for_lipsync(audio_path, output_dir)
        logger.info(f"Audio 16k for lipsync: {audio_16k}")
        gc.collect()
        logger.info(f"Memory after audio preparation: {get_memory_usage()}")

        progress(0.55, desc="👄 Đang lipsync...")
        logger.info(
            f"Starting lipsync: video={video_normalized}, audio_16k={audio_16k}, output={output_dir}"
        )
        logger.info(f"Memory before lipsync: {get_memory_usage()}")
        with timer("Applying lipsync"):
            lipsynced_video, lipsynced_info = apply_lipsync_to_video(
                video_normalized, audio_16k, output_dir, model_type, quality_level
            )
        logger.info(
            f"Lipsynced video: {lipsynced_video}, size: {lipsynced_info['width']}x{lipsynced_info['height']}"
        )
        gc.collect()
        logger.info(f"Memory after lipsync: {get_memory_usage()}")

        progress(0.85, desc="🔗 Đang ghép audio YouTube...")
        logger.info(f"Memory before merge: {get_memory_usage()}")
        with timer("Merging audio and video"):
            final_video = merge_audio_video(lipsynced_video, audio_youtube, output_dir)
        logger.info(f"Final video: {final_video}")

        progress(1.0, desc="✅ Hoàn tất!")
        logger.info(f"Memory at end: {get_memory_usage()}")
        return final_video
    except gr.Error:
        # Already a user-facing error (e.g. from validate_input): do not re-wrap.
        raise
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"ERROR in process_lipsync_with_audio_target_new: {e}")
        raise gr.Error(f"Lỗi xử lý: {str(e)}") from e
def lipsync_with_audio_target(
    video_file,
    audio_file,
    session_id=None,
    quality_level="Normal",
    model_type="LatentSync v1.6",
    progress=gr.Progress(track_tqdm=True),
):
    """Gradio entry point: lipsync a source video to a target audio track.

    Rejects missing uploads up front, then delegates to
    process_lipsync_with_audio_target_new. This wrapper takes quality_level
    before model_type while the delegate takes them in the opposite order, so
    the call below passes every argument by keyword.

    Returns:
        Whatever process_lipsync_with_audio_target_new returns.

    Raises:
        gr.Error: If either upload is None, or if the delegate raises it.
    """
    # Video is checked before audio, so a missing video is reported first.
    for upload, message in (
        (video_file, "Please upload a video source."),
        (audio_file, "Please upload a target audio."),
    ):
        if upload is None:
            raise gr.Error(message)

    return process_lipsync_with_audio_target_new(
        video_file=video_file,
        audio_file=audio_file,
        session_id=session_id,
        model_type=model_type,
        quality_level=quality_level,
        progress=progress,
    )