File size: 3,061 Bytes
b3c65ae
 
 
cae75e9
be4847d
8532e19
b3c65ae
 
be4847d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b3c65ae
2295db3
 
 
 
d72ce9c
be4847d
b3c65ae
 
 
 
 
 
2295db3
d72ce9c
b3c65ae
 
be4847d
b3c65ae
cae75e9
 
2295db3
 
 
 
d72ce9c
 
 
 
2295db3
40163ad
2295db3
be4847d
 
 
 
 
 
cae75e9
 
 
 
 
 
be4847d
 
2295db3
be4847d
cae75e9
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""Lipsync processing wrapper for OutofLipSync"""

import os
import traceback
from ffmpy import FFmpeg
from lipsync import apply_lipsync


def get_video_info(video_path: str) -> dict:
    """Probe a video file with ffprobe and return its basic properties.

    Runs ``ffprobe`` against the first video stream (``v:0``) and reads that
    stream's width, height and ``r_frame_rate`` together with the
    container-level duration.

    Args:
        video_path: Path to the video file to inspect.

    Returns:
        Dict with keys: width, height, fps, duration. ``fps`` is the
        ``r_frame_rate`` fraction converted to float (0.0 when ffprobe
        reports an undefined rate such as ``0/0``); ``duration`` is the
        ``format=duration`` value converted to float.

    Raises:
        FileNotFoundError: If the ``ffprobe`` executable cannot be found.
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe reports no video stream for ``video_path``.
    """
    import json
    import subprocess
    from fractions import Fraction

    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=width,height,r_frame_rate",
        "-show_entries",
        "format=duration",
        "-of",
        "json",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    data = json.loads(result.stdout)

    # ffprobe exits successfully with an empty "streams" list when the file
    # has no video stream; fail with a readable error instead of IndexError.
    streams = data.get("streams") or []
    if not streams:
        raise ValueError(f"No video stream found in: {video_path}")
    stream = streams[0]

    width = stream["width"]
    height = stream["height"]
    try:
        fps = float(Fraction(stream["r_frame_rate"]))
    except ZeroDivisionError:
        # An undefined frame rate is reported as "0/0", which Fraction rejects.
        fps = 0.0
    duration = float(data["format"]["duration"])

    return {"width": width, "height": height, "fps": fps, "duration": duration}


def apply_lipsync_to_video(
    video_path: str,
    audio_16k_path: str,
    output_dir: str,
    model_type: str = "LatentSync v1.6",
    quality_level: str = "Normal",
) -> tuple:
    """Apply lipsync to a video using clean 16k audio and probe the result.

    The lipsynced video is written to ``output_with_lipsync.mp4`` inside
    ``output_dir`` and then inspected with ``get_video_info``.

    Args:
        video_path: Path to input video
        audio_16k_path: Path to 16kHz audio
        output_dir: Directory to save output
        model_type: Model type for lipsync. Only "LatentSync v1.6" is
            implemented here; any other value raises ValueError.
        quality_level: Quality level forwarded unchanged to ``apply_lipsync``
            ("Fast", "Normal", "Medium", "Best", "Super Best")

    Returns:
        Tuple of (lipsynced_video_path, video_info)

    Raises:
        ValueError: If ``model_type`` is not a supported model.
        RuntimeError: If lipsync processing fails. Out-of-memory and
            face-detection failures are re-raised with a user-facing message
            (the original error is attached as ``__cause__``); any other
            RuntimeError is logged and re-raised unchanged.
    """
    # Reject unsupported models before doing any work.
    if model_type != "LatentSync v1.6":
        raise ValueError(f"Unknown model_type: {model_type}")

    lipsynced_video = os.path.join(output_dir, "output_with_lipsync.mp4")
    crop_size = 512

    try:
        print(
            f"Using LatentSync: video={video_path}, audio={audio_16k_path}, crop_size={crop_size}, quality={quality_level}"
        )
        apply_lipsync(
            video_path, audio_16k_path, lipsynced_video, crop_size, quality_level
        )

        video_info = get_video_info(lipsynced_video)
        print(
            f"Lipsynced video: {lipsynced_video}, size: {video_info['width']}x{video_info['height']}"
        )
        return lipsynced_video, video_info
    except RuntimeError as e:
        message = str(e).lower()
        if "out of memory" in message:
            print("GPU OOM Error in lipsync processing!")
            # Chain explicitly so the original error stays visible as the cause.
            raise RuntimeError(
                "GPU out of memory during lipsync. Try a shorter video or lower resolution."
            ) from e
        if "face not detected" in message:
            raise RuntimeError(
                "Face detection failed in lipsync pipeline. Please upload a video with a clear, visible face."
            ) from e
        print(f"Runtime Error in lipsync processing: {e}")
        traceback.print_exc()
        raise