Spaces:
Runtime error
Runtime error
| """Video processing utilities for OutofLipSync""" | |
| import json | |
| import os | |
| import subprocess | |
| from fractions import Fraction | |
| from ffmpy import FFmpeg, FFRuntimeError | |
| from config import ( | |
| YOUTUBE_VIDEO_PRESET, | |
| YOUTUBE_VIDEO_CRF, | |
| YOUTUBE_VIDEO_PROFILE, | |
| YOUTUBE_VIDEO_LEVEL, | |
| YOUTUBE_VIDEO_PIX_FMT, | |
| YOUTUBE_AUDIO_CODEC, | |
| YOUTUBE_AUDIO_BITRATE, | |
| YOUTUBE_AUDIO_SAMPLE_RATE, | |
| ) | |
| # def crop_video_duration(video_path: str, duration: int, output_dir: str) -> str: | |
| # """Crop video to specified duration using FFmpeg (DEPRECATED - merged into normalize_video_for_youtube) | |
| # | |
| # Args: | |
| # video_path: Path to input video | |
| # duration: Duration in seconds | |
| # output_dir: Directory to save cropped video | |
| # | |
| # Returns: | |
| # Path to cropped video | |
| # """ | |
| # cropped_video_path = os.path.join(output_dir, "input_cropped.mp4") | |
| # ffmpeg = FFmpeg( | |
| # inputs={video_path: None}, | |
| # outputs={ | |
| # cropped_video_path: [ | |
| # "-t", | |
| # f"{duration}", | |
| # "-c", | |
| # "copy", | |
| # "-loglevel", | |
| # "error", | |
| # "-y", | |
| # ] | |
| # }, | |
| # ) | |
| # try: | |
| # ffmpeg.run() | |
| # except FFRuntimeError as e: | |
| # raise Exception(f"FFmpeg failed: {e}") | |
| # return cropped_video_path | |
| def loop_video(video_path: str, output_path: str, loop_count: int) -> str: | |
| """Loop video bαΊ±ng stream_loop vα»i copy codec | |
| Args: | |
| video_path: Path video gα»c | |
| output_path: Path output video ΔΓ£ loop | |
| loop_count: Sα» lαΊ§n loop | |
| Returns: | |
| Path video ΔΓ£ loop | |
| """ | |
| ffmpeg = FFmpeg( | |
| inputs={video_path: ["-stream_loop", f"{loop_count}"]}, | |
| outputs={ | |
| output_path: [ | |
| "-c", | |
| "copy", | |
| "-loglevel", | |
| "error", | |
| "-y", | |
| ] | |
| }, | |
| ) | |
| try: | |
| ffmpeg.run() | |
| except FFRuntimeError as e: | |
| raise Exception(f"FFmpeg failed to loop video: {e}") | |
| return output_path | |
| def encode_video_for_youtube( | |
| video_path: str, output_path: str, duration: float | None = None | |
| ) -> str: | |
| """Encode video theo tiΓͺu chuαΊ©n YouTube | |
| Args: | |
| video_path: Path video input | |
| output_path: Path output video | |
| duration: Duration crop (None = khΓ΄ng crop) | |
| Returns: | |
| Path video ΔΓ£ encode | |
| """ | |
| ffmpeg_args = [ | |
| "-c:v", | |
| "libx264", | |
| "-preset", | |
| YOUTUBE_VIDEO_PRESET, | |
| "-crf", | |
| str(YOUTUBE_VIDEO_CRF), | |
| "-profile:v", | |
| YOUTUBE_VIDEO_PROFILE, | |
| "-level", | |
| YOUTUBE_VIDEO_LEVEL, | |
| "-pix_fmt", | |
| YOUTUBE_VIDEO_PIX_FMT, | |
| "-c:a", | |
| "copy", | |
| "-movflags", | |
| "+faststart", | |
| "-loglevel", | |
| "error", | |
| "-y", | |
| ] | |
| if duration is not None: | |
| ffmpeg_args = ["-t", f"{duration}"] + ffmpeg_args | |
| ffmpeg = FFmpeg( | |
| inputs={video_path: None}, | |
| outputs={output_path: ffmpeg_args}, | |
| ) | |
| try: | |
| ffmpeg.run() | |
| except FFRuntimeError as e: | |
| raise Exception(f"FFmpeg failed to encode video: {e}") | |
| return output_path | |
| def normalize_video_for_youtube( | |
| video_path: str, audio_duration: float, output_dir: str | |
| ) -> str: | |
| """ChuαΊ©n hΓ³a video theo tiΓͺu chuαΊ©n YouTube | |
| - Loop nαΊΏu ngαΊ―n hΖ‘n audio, crop nαΊΏu dΓ i hΖ‘n | |
| - Re-encode: libx264, preset slow, CRF 18, profile high, level 4.2, yuv420p | |
| Args: | |
| video_path: Path video gα»c | |
| audio_duration: Duration audio (seconds) | |
| output_dir: Output directory | |
| Returns: | |
| Path video ΔΓ£ chuαΊ©n hΓ³a | |
| """ | |
| video_info = get_video_info(video_path) | |
| video_duration = video_info["duration"] | |
| output_path = os.path.join(output_dir, "video_normalized.mp4") | |
| if video_duration >= audio_duration: | |
| encode_video_for_youtube(video_path, output_path, audio_duration) | |
| else: | |
| loop_count = int(audio_duration // video_duration) + 1 | |
| temp_looped = os.path.join(output_dir, "video_looped_temp.mp4") | |
| loop_video(video_path, temp_looped, loop_count) | |
| encode_video_for_youtube(temp_looped, output_path, audio_duration) | |
| os.remove(temp_looped) | |
| return output_path | |
| def merge_audio_video(video_path: str, audio_path: str, output_dir: str) -> str: | |
| """Merge video with audio track using FFmpeg with YouTube-optimized encoding | |
| Args: | |
| video_path: Path to input video | |
| audio_path: Path to audio track | |
| output_dir: Directory to save merged video | |
| Returns: | |
| Path to merged video | |
| """ | |
| video_out = os.path.join(output_dir, "output_final.mp4") | |
| ffmpeg = FFmpeg( | |
| inputs={video_path: None, audio_path: None}, | |
| outputs={ | |
| video_out: [ | |
| "-c:v", | |
| "libx264", | |
| "-preset", | |
| YOUTUBE_VIDEO_PRESET, | |
| "-crf", | |
| str(YOUTUBE_VIDEO_CRF), | |
| "-profile:v", | |
| YOUTUBE_VIDEO_PROFILE, | |
| "-level", | |
| YOUTUBE_VIDEO_LEVEL, | |
| "-pix_fmt", | |
| YOUTUBE_VIDEO_PIX_FMT, | |
| "-map", | |
| "0:v:0", | |
| "-map", | |
| "1:a:0", | |
| "-c:a", | |
| YOUTUBE_AUDIO_CODEC, | |
| "-b:a", | |
| YOUTUBE_AUDIO_BITRATE, | |
| "-ar", | |
| str(YOUTUBE_AUDIO_SAMPLE_RATE), | |
| "-shortest", | |
| "-movflags", | |
| "+faststart", | |
| "-loglevel", | |
| "error", | |
| "-y", | |
| ] | |
| }, | |
| ) | |
| try: | |
| ffmpeg.run() | |
| except FFRuntimeError as e: | |
| raise Exception(f"FFmpeg failed: {e}") | |
| return video_out | |
| def get_video_info(video_path: str) -> dict: | |
| """Get video information: resolution, duration, fps | |
| Args: | |
| video_path: Path to video | |
| Returns: | |
| Dict with keys: width, height, duration, fps | |
| """ | |
| cmd = [ | |
| "ffprobe", | |
| "-v", | |
| "error", | |
| "-select_streams", | |
| "v:0", | |
| "-show_entries", | |
| "stream=width,height,r_frame_rate", | |
| "-show_entries", | |
| "format=duration", | |
| "-of", | |
| "json", | |
| video_path, | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True, check=True) | |
| data = json.loads(result.stdout) | |
| width = data["streams"][0]["width"] | |
| height = data["streams"][0]["height"] | |
| fps = float(Fraction(data["streams"][0]["r_frame_rate"])) | |
| duration = float(data["format"]["duration"]) | |
| return {"width": width, "height": height, "fps": fps, "duration": duration} | |
| def loop_video_to_match_audio( | |
| video_path: str, audio_duration: float, output_dir: str | |
| ) -> str: | |
| """Loop video to match audio target duration | |
| Args: | |
| video_path: Path to video source | |
| audio_duration: Audio target duration (seconds) | |
| output_dir: Output directory | |
| Returns: | |
| Path to looped/cropped video | |
| """ | |
| video_info = get_video_info(video_path) | |
| video_duration = video_info["duration"] | |
| output_path = os.path.join(output_dir, "video_looped.mp4") | |
| if video_duration >= audio_duration: | |
| ffmpeg = FFmpeg( | |
| inputs={video_path: None}, | |
| outputs={ | |
| output_path: [ | |
| "-t", | |
| f"{audio_duration}", | |
| "-c", | |
| "copy", | |
| "-loglevel", | |
| "error", | |
| "-y", | |
| ] | |
| }, | |
| ) | |
| else: | |
| loop_count = int(audio_duration // video_duration) + 1 | |
| ffmpeg = FFmpeg( | |
| inputs={video_path: ["-stream_loop", f"{loop_count}"]}, | |
| outputs={ | |
| output_path: [ | |
| "-t", | |
| f"{audio_duration}", | |
| "-c", | |
| "copy", | |
| "-loglevel", | |
| "error", | |
| "-y", | |
| ] | |
| }, | |
| ) | |
| try: | |
| ffmpeg.run() | |
| except FFRuntimeError as e: | |
| raise Exception(f"FFmpeg failed: {e}") | |
| return output_path | |