lipsync-docker / video_processing.py
naicoi's picture
full-lipsync-youtube (#9)
64a2ea3 verified
"""Video processing utilities for OutofLipSync"""
import json
import os
import subprocess
from fractions import Fraction
from ffmpy import FFmpeg, FFRuntimeError
from config import (
YOUTUBE_VIDEO_PRESET,
YOUTUBE_VIDEO_CRF,
YOUTUBE_VIDEO_PROFILE,
YOUTUBE_VIDEO_LEVEL,
YOUTUBE_VIDEO_PIX_FMT,
YOUTUBE_AUDIO_CODEC,
YOUTUBE_AUDIO_BITRATE,
YOUTUBE_AUDIO_SAMPLE_RATE,
)
# def crop_video_duration(video_path: str, duration: int, output_dir: str) -> str:
# """Crop video to specified duration using FFmpeg (DEPRECATED - merged into normalize_video_for_youtube)
#
# Args:
# video_path: Path to input video
# duration: Duration in seconds
# output_dir: Directory to save cropped video
#
# Returns:
# Path to cropped video
# """
# cropped_video_path = os.path.join(output_dir, "input_cropped.mp4")
# ffmpeg = FFmpeg(
# inputs={video_path: None},
# outputs={
# cropped_video_path: [
# "-t",
# f"{duration}",
# "-c",
# "copy",
# "-loglevel",
# "error",
# "-y",
# ]
# },
# )
# try:
# ffmpeg.run()
# except FFRuntimeError as e:
# raise Exception(f"FFmpeg failed: {e}")
# return cropped_video_path
def loop_video(video_path: str, output_path: str, loop_count: int) -> str:
"""Loop video bαΊ±ng stream_loop vα»›i copy codec
Args:
video_path: Path video gα»‘c
output_path: Path output video Δ‘Γ£ loop
loop_count: Sα»‘ lαΊ§n loop
Returns:
Path video Δ‘Γ£ loop
"""
ffmpeg = FFmpeg(
inputs={video_path: ["-stream_loop", f"{loop_count}"]},
outputs={
output_path: [
"-c",
"copy",
"-loglevel",
"error",
"-y",
]
},
)
try:
ffmpeg.run()
except FFRuntimeError as e:
raise Exception(f"FFmpeg failed to loop video: {e}")
return output_path
def encode_video_for_youtube(
video_path: str, output_path: str, duration: float | None = None
) -> str:
"""Encode video theo tiΓͺu chuαΊ©n YouTube
Args:
video_path: Path video input
output_path: Path output video
duration: Duration crop (None = khΓ΄ng crop)
Returns:
Path video Δ‘Γ£ encode
"""
ffmpeg_args = [
"-c:v",
"libx264",
"-preset",
YOUTUBE_VIDEO_PRESET,
"-crf",
str(YOUTUBE_VIDEO_CRF),
"-profile:v",
YOUTUBE_VIDEO_PROFILE,
"-level",
YOUTUBE_VIDEO_LEVEL,
"-pix_fmt",
YOUTUBE_VIDEO_PIX_FMT,
"-c:a",
"copy",
"-movflags",
"+faststart",
"-loglevel",
"error",
"-y",
]
if duration is not None:
ffmpeg_args = ["-t", f"{duration}"] + ffmpeg_args
ffmpeg = FFmpeg(
inputs={video_path: None},
outputs={output_path: ffmpeg_args},
)
try:
ffmpeg.run()
except FFRuntimeError as e:
raise Exception(f"FFmpeg failed to encode video: {e}")
return output_path
def normalize_video_for_youtube(
video_path: str, audio_duration: float, output_dir: str
) -> str:
"""ChuαΊ©n hΓ³a video theo tiΓͺu chuαΊ©n YouTube
- Loop nαΊΏu ngαΊ―n hΖ‘n audio, crop nαΊΏu dΓ i hΖ‘n
- Re-encode: libx264, preset slow, CRF 18, profile high, level 4.2, yuv420p
Args:
video_path: Path video gα»‘c
audio_duration: Duration audio (seconds)
output_dir: Output directory
Returns:
Path video Δ‘Γ£ chuαΊ©n hΓ³a
"""
video_info = get_video_info(video_path)
video_duration = video_info["duration"]
output_path = os.path.join(output_dir, "video_normalized.mp4")
if video_duration >= audio_duration:
encode_video_for_youtube(video_path, output_path, audio_duration)
else:
loop_count = int(audio_duration // video_duration) + 1
temp_looped = os.path.join(output_dir, "video_looped_temp.mp4")
loop_video(video_path, temp_looped, loop_count)
encode_video_for_youtube(temp_looped, output_path, audio_duration)
os.remove(temp_looped)
return output_path
def merge_audio_video(video_path: str, audio_path: str, output_dir: str) -> str:
"""Merge video with audio track using FFmpeg with YouTube-optimized encoding
Args:
video_path: Path to input video
audio_path: Path to audio track
output_dir: Directory to save merged video
Returns:
Path to merged video
"""
video_out = os.path.join(output_dir, "output_final.mp4")
ffmpeg = FFmpeg(
inputs={video_path: None, audio_path: None},
outputs={
video_out: [
"-c:v",
"libx264",
"-preset",
YOUTUBE_VIDEO_PRESET,
"-crf",
str(YOUTUBE_VIDEO_CRF),
"-profile:v",
YOUTUBE_VIDEO_PROFILE,
"-level",
YOUTUBE_VIDEO_LEVEL,
"-pix_fmt",
YOUTUBE_VIDEO_PIX_FMT,
"-map",
"0:v:0",
"-map",
"1:a:0",
"-c:a",
YOUTUBE_AUDIO_CODEC,
"-b:a",
YOUTUBE_AUDIO_BITRATE,
"-ar",
str(YOUTUBE_AUDIO_SAMPLE_RATE),
"-shortest",
"-movflags",
"+faststart",
"-loglevel",
"error",
"-y",
]
},
)
try:
ffmpeg.run()
except FFRuntimeError as e:
raise Exception(f"FFmpeg failed: {e}")
return video_out
def get_video_info(video_path: str) -> dict:
"""Get video information: resolution, duration, fps
Args:
video_path: Path to video
Returns:
Dict with keys: width, height, duration, fps
"""
cmd = [
"ffprobe",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height,r_frame_rate",
"-show_entries",
"format=duration",
"-of",
"json",
video_path,
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
data = json.loads(result.stdout)
width = data["streams"][0]["width"]
height = data["streams"][0]["height"]
fps = float(Fraction(data["streams"][0]["r_frame_rate"]))
duration = float(data["format"]["duration"])
return {"width": width, "height": height, "fps": fps, "duration": duration}
def loop_video_to_match_audio(
video_path: str, audio_duration: float, output_dir: str
) -> str:
"""Loop video to match audio target duration
Args:
video_path: Path to video source
audio_duration: Audio target duration (seconds)
output_dir: Output directory
Returns:
Path to looped/cropped video
"""
video_info = get_video_info(video_path)
video_duration = video_info["duration"]
output_path = os.path.join(output_dir, "video_looped.mp4")
if video_duration >= audio_duration:
ffmpeg = FFmpeg(
inputs={video_path: None},
outputs={
output_path: [
"-t",
f"{audio_duration}",
"-c",
"copy",
"-loglevel",
"error",
"-y",
]
},
)
else:
loop_count = int(audio_duration // video_duration) + 1
ffmpeg = FFmpeg(
inputs={video_path: ["-stream_loop", f"{loop_count}"]},
outputs={
output_path: [
"-t",
f"{audio_duration}",
"-c",
"copy",
"-loglevel",
"error",
"-y",
]
},
)
try:
ffmpeg.run()
except FFRuntimeError as e:
raise Exception(f"FFmpeg failed: {e}")
return output_path