|
|
""" |
|
|
Production video rendering engine with improved captions and natural speed |
|
|
""" |
|
|
|
|
|
|
|
|
import PIL.Image |
|
|
|
|
|
# Compatibility shim: when the installed Pillow exposes no ``Image.ANTIALIAS``
# attribute, alias it to ``Image.LANCZOS`` so that code which still looks up
# ``PIL.Image.ANTIALIAS`` keeps working.
# NOTE(review): presumably required by the moviepy resize path - confirm.
if not hasattr(PIL.Image, "ANTIALIAS"):
    PIL.Image.ANTIALIAS = PIL.Image.LANCZOS
|
|
|
|
|
import os |
|
|
import tempfile |
|
|
import uuid |
|
|
from typing import List, Dict, Optional |
|
|
from pathlib import Path |
|
|
from moviepy.editor import ( |
|
|
VideoFileClip, |
|
|
AudioFileClip, |
|
|
CompositeVideoClip, |
|
|
concatenate_videoclips, |
|
|
TextClip, |
|
|
CompositeAudioClip, |
|
|
vfx |
|
|
) |
|
|
import textwrap |
|
|
from src.logger_config import logger |
|
|
from src.utils import reverse_clip as utils_reverse_clip, get_temp_dir |
|
|
import src.utils as utils |
|
|
import time |
|
|
from video_editor.text_clip import create as create_text_clip, group_words_by_time_and_width, caption_style_1, caption_style_2, caption_style_3, caption_style_4, caption_style_on_screen_text, caption_style_on_screen_text_top |
|
|
import subprocess |
|
|
import asyncio |
|
|
import random |
|
|
from moviepy.video.fx import crop |
|
|
from src.config import get_config_value, set_config_value |
|
|
import numpy as np |
|
|
|
|
|
# Upper bound applied to the background-music volume returned by
# VideoRenderer.calculate_bg_volume.
ALLOWED_BG_MUSIC_VOLUME = 0.08

# Largest shortfall (in the same unit as clip durations) that
# VideoRenderer._extend_or_trim_clip always covers with a reversed copy of the
# clip, even when an alternate clip is available.
REVERSE_THRESHOLD = 0.5

# Length of the start/end segments cut from the hook video. Reassigned at
# runtime by VideoRenderer._prepare_video_clips_natural_speed (1.5, or 2 when
# the "use_veo" config value is truthy).
HOOK_VIDEO_DURATION = 1.5

# Module-level mutable state: written by VideoRenderer._prepare_with_start_hook
# and read while preparing the second library clip.
HOOK_START_ORIGINAL_CLIP_USED = 0
|
|
|
|
|
class VideoRenderer: |
|
|
def __init__(self):
    """Pick the caption style and create a resolved temp directory for this renderer."""
    caption_method, words_per_group = self._get_caption_style()
    self.cap_method = caption_method
    self.max_words_per_group = words_per_group

    temp_root = get_temp_dir(prefix="video_renderer_")
    self.temp_dir = temp_root.resolve()
    logger.debug(f"Initialized VideoRenderer with temp dir: {self.temp_dir}")
|
|
|
|
|
async def render_video_without_audio(self, video_config: Optional[Dict] = None) -> tuple[str, float]:
    """
    Render video composition WITHOUT audio and WITHOUT slow-motion.

    Args:
        video_config: Accepted for interface compatibility; not read here.

    Returns:
        ``(output_path, video_duration)``: the value returned by
        ``_render_video_only`` and the duration of the concatenated video.

    Raises:
        ValueError: If ``_validate_assets_for_video_only`` rejects the assets.
        Exception: Any other failure is logged and re-raised.
    """
    video_clips = []
    final_video = None
    cleanup_started = False
    try:
        logger.debug("π¬ Starting video rendering (NO slow-motion)")
        if not self._validate_assets_for_video_only():
            raise ValueError("Invalid assets provided for video rendering")

        video_clips = await self._prepare_video_clips_natural_speed()

        final_video = concatenate_videoclips(video_clips, method="compose")
        final_video = final_video.without_audio()

        output_path = await self._render_video_only(final_video)
        video_duration = final_video.duration

        # Mark before calling so a failing cleanup is not retried below.
        cleanup_started = True
        self._cleanup_temp_files(video_clips + [final_video])

        logger.debug(f"β Video (no audio) rendered: {output_path}, duration: {video_duration:.2f}s")
        return output_path, video_duration

    except Exception as e:
        logger.error(f"β Video rendering (no audio) failed: {e}")
        if not cleanup_started:
            # Release whatever was opened before the failure; previously these
            # clips were only cleaned up on the success path.
            leftovers = list(video_clips)
            if final_video is not None:
                leftovers.append(final_video)
            if leftovers:
                try:
                    self._cleanup_temp_files(leftovers)
                except Exception as cleanup_error:
                    logger.warning(f"Cleanup after failed render also failed: {cleanup_error}")
        raise
|
|
|
|
|
async def _prepare_video_clips_natural_speed(self) -> List[VideoFileClip]:
    """Load and prepare all video clips - NO speed adjustments.

    Reads the "visual_assets" config entry, optionally cuts a start and an end
    segment out of the hook video, asks ``utils.calculate_video_durations`` to
    process the selected videos (presumably filling each entry's "duration" -
    confirm), and sizes every library clip through ``_prepare_clip``. The
    "video_usage_count" config entry is incremented for each video used.

    Returns:
        The prepared clips, audio removed, in playback order.

    Raises:
        Exception: Any failure is logged, the clips prepared so far are
            closed on a best-effort basis, and the error is re-raised.
    """
    global HOOK_START_ORIGINAL_CLIP_USED
    global HOOK_VIDEO_DURATION

    clips = []

    try:
        assets = get_config_value("visual_assets")

        runway_as_second_ai_video = None
        hook_start = None
        hook_end = None

        if assets.get("hook_video") and assets["hook_video"].get("local_path"):
            HOOK_VIDEO_DURATION = 1.5
            if get_config_value("use_veo", False):
                HOOK_VIDEO_DURATION = 2
                hook_clip = VideoFileClip(assets["hook_video"]["veo_video_data"]["local_path"])
                runway_as_second_ai_video = VideoFileClip(assets["hook_video"]["local_path"]).subclip(0, 2)
            else:
                hook_clip = VideoFileClip(assets["hook_video"]["local_path"])

            hook_duration = hook_clip.duration
            hook_clip = hook_clip.without_audio()

            logger.debug(f"π Creating seamless loop from {hook_duration:.2f}s hook video (NATURAL SPEED)")

            # The video OPENS with the tail of the hook clip ...
            start_segment_begin = max(0, hook_duration - HOOK_VIDEO_DURATION)
            hook_start = hook_clip.subclip(start_segment_begin, hook_duration)
            logger.debug(f"β Hook start: {hook_start.duration:.2f}s")

            # ... and CLOSES with the head of the hook clip.
            hook_end_duration = min(HOOK_VIDEO_DURATION, hook_duration)
            hook_end = hook_clip.subclip(0, hook_end_duration)
            logger.debug(f"β Hook end: {hook_end.duration:.2f}s")

            hook_clip.close()
        else:
            logger.debug("πΉ No hook video - concatenating segments directly")

        selected_videos = assets["selected_videos"]
        all_tts_script_segment = " ".join(
            v.get("tts_script_segment", "").strip()
            for v in selected_videos
            if v.get("tts_script_segment")
        )

        # Only the duration is needed, so the audio file is released right away.
        with AudioFileClip(assets["tts_audio_data"]["local_path"]) as audio_clip:
            original_duration = audio_clip.duration
        utils.calculate_video_durations(selected_videos, all_tts_script_segment, assets["timed_transcript"], original_duration)

        extra_secs = 0.0
        HOOK_START_ORIGINAL_CLIP_USED = 0

        for i, lib_video in enumerate(selected_videos):
            if lib_video.get("local_path"):
                try:
                    lib_clip = VideoFileClip(lib_video["local_path"])
                    original_clip = lib_clip
                    lib_hook_start = None
                    lib_hook_end = None
                    prev_clip = None
                    prev_clip_file = None

                    # First segment: prepend the hook start.
                    if hook_start and i == 0:
                        lib_hook_start = hook_start
                        original_clip = runway_as_second_ai_video if runway_as_second_ai_video else original_clip
                    # Second segment (veo only): continue the second AI video
                    # from where the first segment stopped using it.
                    if i == 1 and get_config_value("use_veo", False) and runway_as_second_ai_video:
                        if HOOK_START_ORIGINAL_CLIP_USED < runway_as_second_ai_video.duration-0.5:
                            original_clip = runway_as_second_ai_video.subclip(HOOK_START_ORIGINAL_CLIP_USED, runway_as_second_ai_video.duration)
                    # Last segment: append the hook end.
                    if hook_end and i+1 == len(selected_videos):
                        lib_hook_end = hook_end
                    if len(clips) > 0:
                        prev_clip = clips[-1][1]
                        # NOTE(review): always the second-to-last selected video,
                        # which is "the previous one" only for the final segment.
                        prev_clip_file = selected_videos[-2]["local_path"] if len(selected_videos) > 1 else None

                    prev_clip, lib_clip, extra_secs = await self._prepare_clip(
                        lib_video=lib_video,
                        original_clip_path=lib_video["local_path"],
                        alternate_url_local_path=lib_video.get("alternate_url_local_path"),
                        original_clip=original_clip,
                        lib_hook_start=lib_hook_start,
                        lib_hook_end=lib_hook_end,
                        target_duration=lib_video["duration"],
                        extra_secs=extra_secs,
                        prev_clip=prev_clip,
                        prev_clip_file=prev_clip_file
                    )
                    if extra_secs > 0:
                        logger.debug(f"β±οΈ Added {extra_secs:.2f}s extra to match target duration ({lib_video['duration']:.2f}s)")
                    # _prepare_clip may hand back an adjusted previous clip.
                    if prev_clip and len(clips) > 0:
                        clip_name, _ = clips[-1]
                        clips[-1] = (clip_name, prev_clip)

                    lib_clip = lib_clip.without_audio()
                    clips.append((f"library_{i}", lib_clip))
                    video_usage = get_config_value("video_usage_count", {})
                    video_usage[lib_video['url']] = video_usage.get(lib_video['url'], 0) + 1
                    set_config_value("video_usage_count", video_usage)
                    logger.debug(f"β Loaded library video {i}: {lib_clip.duration:.2f}s (NATURAL SPEED)")
                except Exception as e:
                    import traceback
                    traceback.print_exc()
                    logger.error(f"β Failed to load library video {i}: {e}")
                    raise
            else:
                logger.warning(f"β οΈ Library video {i} missing local_path")

        return [clip for _, clip in clips]

    except Exception as e:
        logger.error(f"β Failed to prepare video clips: {e}")
        for _, clip in clips:
            # Best-effort release; a failing close must not hide the original
            # error. Narrowed from a bare ``except`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            try:
                clip.close()
            except Exception:
                pass
        raise
|
|
|
|
|
async def _prepare_clip(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, lib_hook_end, target_duration: float, extra_secs, prev_clip, prev_clip_file): |
|
|
|
|
|
if target_duration <= 0: |
|
|
raise ValueError(f"Invalid target_duration: {target_duration}") |
|
|
if not original_clip_path or not original_clip: |
|
|
raise ValueError("Missing required clip parameters") |
|
|
|
|
|
logger.debug(f"Preparing clip for target duration {target_duration:.2f}s") |
|
|
|
|
|
|
|
|
if lib_hook_start: |
|
|
return self._prepare_with_start_hook( |
|
|
lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, |
|
|
target_duration, prev_clip |
|
|
) |
|
|
|
|
|
|
|
|
elif lib_hook_end: |
|
|
return self._prepare_with_end_hook( |
|
|
lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_end, |
|
|
target_duration, extra_secs, prev_clip, prev_clip_file |
|
|
) |
|
|
|
|
|
|
|
|
else: |
|
|
logger.debug("No hooks detected, adjusting original clip duration only") |
|
|
result, extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration) |
|
|
return prev_clip, result, extra_secs |
|
|
|
|
|
def _prepare_with_start_hook(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, target_duration, prev_clip):
    """Handle clip preparation when a start hook is present.

    Builds ``lib_hook_start`` followed by the clip and trims the result to
    ``target_duration``. When hook + clip are too short, the clip is first
    extended via ``_extend_or_trim_clip``.

    Side effect: stores in the module-level ``HOOK_START_ORIGINAL_CLIP_USED``
    how much of the target lies beyond the hook segment.

    Returns:
        ``(prev_clip, prepared_clip, extra_secs)``; ``prev_clip`` is passed
        through unchanged and ``extra_secs`` is 0.0 unless the clip was extended.
    """
    global HOOK_START_ORIGINAL_CLIP_USED
    hook_duration = lib_hook_start.duration
    logger.debug(f"Start hook detected with duration {hook_duration:.2f}s")
    total_duration = hook_duration + original_clip.duration

    if target_duration <= total_duration:
        logger.debug("Target duration fits start hook + original clip, concatenating and trimming")
        result = concatenate_videoclips([lib_hook_start, original_clip], method="compose").subclip(0, target_duration)
        logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
        HOOK_START_ORIGINAL_CLIP_USED = max(0, target_duration - hook_duration)
        return prev_clip, result, 0.0

    # Size the body against the ACTUAL hook length. The hook segment is shorter
    # than the nominal HOOK_VIDEO_DURATION whenever the hook video itself is
    # shorter; using the nominal value then left hook + body short of the target.
    modified_clip, extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration - hook_duration)

    result = concatenate_videoclips([lib_hook_start, modified_clip], method="compose").subclip(0, target_duration)
    HOOK_START_ORIGINAL_CLIP_USED = max(0, target_duration - hook_duration)
    logger.debug(HOOK_START_ORIGINAL_CLIP_USED)
    logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
    return prev_clip, result, extra_secs
|
|
|
|
|
|
|
|
def _prepare_with_end_hook(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_end,
                           target_duration, extra_secs, prev_clip, prev_clip_file):
    """Handle clip preparation when an end hook is present.

    Produces a clip of ``target_duration`` that ends with ``lib_hook_end``.
    The clip in front of the hook is trimmed or extended as needed; when less
    than one unit of it would remain, time is either taken back from the
    previous clip (using ``extra_secs``) or the previous clip is extended and
    only the hook tail is used.

    Args:
        extra_secs: Extra time reported for the preceding clip; the amount that
            may be moved from ``prev_clip`` to this clip.
        prev_clip: The clip placed before this one, or ``None``.
        prev_clip_file: Not used by this method.

    Returns:
        ``(prev_clip, prepared_clip, extra_secs)`` where ``prev_clip`` may be a
        trimmed/extended replacement of the one passed in.
    """
    temp_original_clip = original_clip
    logger.debug(f"End hook detected with duration {lib_hook_end.duration:.2f}s")
    total_duration = original_clip.duration + lib_hook_end.duration
    logger.debug(f"Combined original + end hook duration: {total_duration:.2f}s")

    cur_extra_secs = 0.0

    if target_duration <= total_duration:
        trim_duration = target_duration - lib_hook_end.duration

        if trim_duration > 0:
            logger.debug(f"Trimming original clip from {original_clip.duration:.2f}s to {trim_duration:.2f}s to fit end hook")
            original_clip = original_clip.subclip(0, trim_duration)
            cur_extra_secs = 0.0
        else:
            # The hook alone already covers the target: use only its tail.
            start_trim = max(0, lib_hook_end.duration - target_duration)
            result = lib_hook_end.subclip(start_trim, lib_hook_end.duration)
            logger.debug(f"Prepared end-only clip: {result.duration:.2f}s")
            return prev_clip, result, 0.0
    else:
        remaining = target_duration - lib_hook_end.duration
        logger.debug(f"Original + end hook too short, need to extend original by {remaining:.2f}s")

        original_clip, cur_extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, remaining)

    logger.debug("Concatenating original clip and end hook")

    if original_clip.duration < 1:
        if original_clip.duration + extra_secs > 1:
            # Lengthen this clip with time the previous clip gained, never
            # beyond the untrimmed source clip.
            possible_new_duration = original_clip.duration + extra_secs
            new_duration = min(possible_new_duration, temp_original_clip.duration)
            used_extra = max(0.0, new_duration - original_clip.duration)

            logger.debug(
                f"Extending original clip from {original_clip.duration:.2f}s β {new_duration:.2f}s "
                f"(used_extra={used_extra:.2f}s, available_extra={extra_secs:.2f}s)"
            )

            original_clip = temp_original_clip.subclip(0, new_duration)

            # prev_clip is None when there is no preceding clip; it was
            # previously dereferenced unconditionally (AttributeError).
            if prev_clip is not None:
                new_prev_duration = prev_clip.duration - used_extra
                logger.debug(
                    f"βοΈ Trimming previous clip by {used_extra:.2f}s β new duration {new_prev_duration:.2f}s"
                )
                prev_clip = prev_clip.subclip(0, new_prev_duration)

            result = concatenate_videoclips([original_clip, lib_hook_end], method="compose").subclip(0, target_duration)
            cur_extra_secs = 0.0

        else:
            if prev_clip:
                logger.debug("β οΈ Original clip too short, extending previous clip instead")
                # NOTE(review): the extra time reported here lands in the local
                # ``extra_secs`` and is not part of the return value - confirm intent.
                prev_clip, extra_secs = self._extend_or_trim_clip(lib_video, None, alternate_url_local_path, prev_clip, prev_clip.duration + original_clip.duration)
            result = lib_hook_end.subclip(max(0, lib_hook_end.duration - target_duration), lib_hook_end.duration)

    else:
        result = concatenate_videoclips([original_clip, lib_hook_end], method="compose").subclip(0, target_duration)

    logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
    return prev_clip, result, cur_extra_secs
|
|
|
|
|
|
|
|
def _extend_or_trim_clip(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration):
    """
    Extend or trim a clip to match target duration.

    * Within 0.01 of the target: the clip is returned untouched.
    * Longer than the target: the clip is trimmed.
    * Shorter: without an alternate clip, or when the shortfall is at most
      ``REVERSE_THRESHOLD``, the clip is ping-ponged with its reversed copy;
      otherwise the alternate clip (and its reverse) is appended and the
      "video_usage_count" config entry is incremented for
      ``lib_video['alternate_url']``.

    Returns:
        tuple: ``(adjusted_clip, extra_secs)`` where ``extra_secs`` is 0.0
        unless the clip had to be extended.
    """
    total_duration = original_clip.duration

    # Close enough - avoid a pointless sub-frame trim/extension.
    if abs(target_duration - total_duration) < 0.01:
        return original_clip, 0.0

    if target_duration <= total_duration:
        logger.debug("Target duration fits original clip, trimming")
        return original_clip.subclip(0, target_duration), 0.0

    if alternate_url_local_path is None or (target_duration - total_duration <= REVERSE_THRESHOLD):
        logger.debug("β οΈ Reversing clip.")

        # NOTE(review): a clip object is passed here while other call sites
        # pass a file path to reverse_clip - confirm utils.reverse_clip
        # accepts both.
        reversed_clip = self.reverse_clip(original_clip)

        # At least the original two forward/backward pairs, more when the
        # target is longer than that (subclip beyond the end was possible).
        pair_count = 2
        cycle_duration = total_duration + reversed_clip.duration
        if cycle_duration > 0:
            pair_count = max(pair_count, int(target_duration // cycle_duration) + 1)
        loop_clip = concatenate_videoclips([original_clip, reversed_clip] * pair_count, method="compose")
        return loop_clip.subclip(0, target_duration), target_duration - original_clip.duration

    logger.debug("β οΈ Using extra clip.")
    video_usage = get_config_value("video_usage_count", {})
    video_usage[lib_video['alternate_url']] = video_usage.get(lib_video['alternate_url'], 0) + 1
    set_config_value("video_usage_count", video_usage)
    alternate_clip = VideoFileClip(alternate_url_local_path)
    reverse_alternate_clip = self.reverse_clip(alternate_url_local_path)

    combined = concatenate_videoclips([original_clip, alternate_clip, reverse_alternate_clip, original_clip], method="compose")
    result = combined.subclip(0, target_duration)
    extra_secs = max(0.0, target_duration - original_clip.duration - alternate_clip.duration)
    return result, extra_secs
|
|
|
|
|
def _extend_clip_to_duration(self, original_clip_path, original_clip, target_duration):
    """
    Extend a clip to target duration using interpolation, looping, zooming or ping-pong.

    Strategies are tried in this order:
      1. ``utils.interpolate_video`` - used when its output is long enough.
      2. Plain looping, when ``utils.is_video_loopable`` or
         ``utils.is_loopable_phash`` accepts the file.
      3. Freeze-and-zoom, when ``utils.is_video_zoomable_tail`` accepts the clip.
      4. Ping-pong (forward/reversed) looping as the fallback.

    Returns:
        VideoFileClip: The extended clip
    """
    interpolated = None
    try:
        interpolated_file = utils.interpolate_video(original_clip_path)
        if interpolated_file:
            interpolated = VideoFileClip(interpolated_file)

            if interpolated.duration >= target_duration:
                logger.debug("Using interpolated clip for extension")
                result = interpolated.subclip(0, target_duration)
                logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
                return result

            logger.debug(f"Interpolated clip ({interpolated.duration:.2f}s) still too short")
            interpolated.close()

    except Exception as e:
        # Interpolation is optional; fall through to the other strategies.
        logger.warning(f"Interpolation failed: {e}")
        if interpolated:
            interpolated.close()

    if utils.is_video_loopable(original_clip_path) or utils.is_loopable_phash(original_clip_path):
        logger.debug("Original clip is loopable, creating loop extension")
        loop_clip = self.loop_clip(original_clip, target_duration)
        result = loop_clip.subclip(0, target_duration)
        logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
        return result
    elif utils.is_video_zoomable_tail(original_clip):
        # This branch used to call zoom_clip without its zoom_strength
        # argument (TypeError) and then fell through, discarding the result.
        # NOTE(review): 1.1 is an assumed strength - confirm the intended value.
        result = self.zoom_clip(original_clip, target_duration, 1.1)
        logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
        return result

    logger.debug("Using ping-pong reverse looping as fallback for extension")
    reversed_clip = self.reverse_clip(original_clip_path)
    loop_clip = concatenate_videoclips([original_clip, reversed_clip, original_clip, reversed_clip], method="compose")
    result = loop_clip.subclip(0, target_duration)
    logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
    return result
|
|
|
|
|
def reverse_clip(self, clip_path):
    """Open as a clip whatever ``src.utils.reverse_clip`` returns for ``clip_path``."""
    return VideoFileClip(utils_reverse_clip(clip_path))
|
|
|
|
|
def loop_clip(self, clip, target_duration):
    """Repeat ``clip`` back to back and cut the result to ``target_duration``."""
    # One repetition more than fits completely, so the cut never runs short.
    loop_count = 1 + int(target_duration // clip.duration)
    repeated = [clip for _ in range(loop_count)]
    final_clip = concatenate_videoclips(repeated, method="compose").subclip(0, target_duration)
    logger.debug(f"β»οΈ Looping clip {loop_count}x to reach {target_duration:.2f}s")
    return final_clip
|
|
|
|
|
def zoom_clip(self, clip, target_duration, zoom_strength=1.1):
    """Extend ``clip`` to ``target_duration`` by freezing its last frame under a slow zoom.

    Args:
        clip: Source clip; frames are assumed to be H x W x C arrays
            (the zoom is applied to the first two axes only) - confirm.
        target_duration: Duration of the returned clip.
        zoom_strength: Zoom factor reached at ``target_duration`` (1 = no
            zoom). Defaults to 1.1 so existing two-argument callers work.

    Returns:
        A clip of ``target_duration`` whose frames are progressively zoomed
        and centre-cropped back to the original frame size.
    """
    # Imported once here rather than on every frame.
    from scipy.ndimage import zoom as scipy_zoom

    freeze_duration = target_duration - clip.duration

    if freeze_duration > 0:
        # Hold the (almost) last frame for the missing time.
        frozen_frame = clip.to_ImageClip(t=clip.duration - 0.01).set_duration(freeze_duration)
        extended_clip = concatenate_videoclips([clip, frozen_frame])
    else:
        # Nothing to pad: a non-positive freeze duration is not a valid clip
        # length, so just cut the clip to the target instead.
        extended_clip = clip.subclip(0, target_duration)

    def apply_zoom(get_frame, t):
        frame = get_frame(t)
        # Linear ramp from 1 at t=0 to zoom_strength at t=target_duration.
        zoom_factor = 1 + (zoom_strength - 1) * (t / target_duration)

        h, w = frame.shape[:2]
        zoomed_frame = scipy_zoom(frame, (zoom_factor, zoom_factor, 1), order=1)

        # Centre-crop back to the original frame size.
        y_start = (zoomed_frame.shape[0] - h) // 2
        x_start = (zoomed_frame.shape[1] - w) // 2
        return zoomed_frame[y_start:y_start + h, x_start:x_start + w]

    return extended_clip.fl(apply_zoom)
|
|
|
|
|
async def _create_video_sequence_natural_speed(
    self, video_clips: List[VideoFileClip], video_config: Optional[Dict]
) -> VideoFileClip:
    """Create video sequence from clips - NO SLOW MOTION, allow longer duration.

    Clips are repeated (in order) until the sequence reaches the minimum
    target duration. When the sequence exceeds the maximum target by more than
    10%, every clip is sped up so that the total matches the maximum.

    Args:
        video_clips: Clips in playback order.
        video_config: Not read by this method.

    Returns:
        The concatenated sequence.

    Raises:
        ValueError: If ``video_clips`` is empty.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        if not video_clips:
            raise ValueError("No video clips available for sequence")

        TARGET_MIN_DURATION = 11.0
        TARGET_MAX_DURATION = 15.0

        library_clips = list(video_clips)
        library_duration = sum(clip.duration for clip in library_clips)

        if library_duration < TARGET_MIN_DURATION:
            logger.debug(f"π₯ Duration too short ({library_duration:.2f}s), adding more videos...")

            # Cycle over a snapshot of the input so additions are never
            # re-added, and stop when the clips carry no duration at all
            # (the loop could otherwise never reach the target).
            source_clips = list(library_clips)
            additional_clips_needed = []
            current_duration = library_duration

            while current_duration < TARGET_MIN_DURATION and library_duration > 0:
                for clip in source_clips:
                    if current_duration >= TARGET_MIN_DURATION:
                        break
                    additional_clips_needed.append(clip)
                    current_duration += clip.duration

            library_clips.extend(additional_clips_needed)
            logger.debug(
                f"β Added {len(additional_clips_needed)} additional clips, new duration: {current_duration:.2f}s"
            )

        total_current_duration = sum(clip.duration for clip in library_clips)

        if total_current_duration > TARGET_MAX_DURATION:
            speed_factor = total_current_duration / TARGET_MAX_DURATION
            if speed_factor > 1.1:
                logger.debug(f"β‘ Slightly speeding up clips by {speed_factor:.2f}x to fit target")
                library_clips = [clip.fx(lambda c: c.speedx(speed_factor)) for clip in library_clips]
            else:
                logger.debug("β Duration within acceptable range, keeping natural speed")
        else:
            logger.debug("β Keeping all videos at natural speed")

        sequence_clips = []
        for i, clip in enumerate(library_clips):
            sequence_clips.append(clip)
            logger.debug(f" Added library_{i}: {clip.duration:.2f}s")

        total_duration = sum(clip.duration for clip in sequence_clips)
        logger.debug(
            f"π Total video sequence duration: {total_duration:.2f}s (target: {TARGET_MIN_DURATION}-{TARGET_MAX_DURATION}s)"
        )

        final_sequence = concatenate_videoclips(sequence_clips, method="compose")
        logger.debug(f"β Created video sequence with NATURAL SPEED: {final_sequence.duration:.2f}s")

        return final_sequence

    except Exception as e:
        logger.error(f"β Failed to create video sequence: {e}")
        raise
|
|
|
|
|
async def _add_timed_subtitles(self, video_path: str, group_all: bool = False, position: int = None) -> CompositeVideoClip:
    """Add subtitles using timed transcript from Speech-to-Text.

    Each phrase is shown from its own start time until the next phrase starts
    (the last one until the end of the video). The composed video is passed
    to ``_render_video_only``.

    Args:
        video_path: Video file to caption.
        group_all: Forwarded to ``group_words_by_time_and_width``.
        position: Not read by this method.

    Returns:
        Whatever ``_render_video_only`` returns for the captioned video.
        NOTE(review): other callers treat that value as an output path, not a
        clip - the annotation may be stale.

    Raises:
        ValueError: If the assets carry no timed transcript.
        Exception: Any other failure is logged and re-raised.
    """
    video_clip = None
    try:
        assets = get_config_value("visual_assets")
        timed_words = assets.get("timed_transcript", [])

        if not timed_words:
            logger.warning("β οΈ No timed transcript available, using fallback subtitles")
            raise ValueError("No timed transcript available for subtitles")

        logger.debug(f"π Creating timed subtitles from {len(timed_words)} words")
        video_clip = VideoFileClip(video_path)
        text_clips = []
        total_duration = video_clip.duration
        target_width, target_height = video_clip.size

        phrases = group_words_by_time_and_width(timed_words, target_width, target_height, max_words_per_group = 2, group_all=group_all)
        logger.debug(f"π Created {len(phrases)} timed phrases")

        for i, phrase in enumerate(phrases):
            word_timings = phrase["word_timings"]
            start_time = phrase["start_time"]
            # Stay on screen until the next phrase starts, so captions never gap.
            if i+1 < len(phrases):
                phrase_duration = phrases[i+1]["start_time"] - start_time
            else:
                phrase_duration = total_duration - start_time

            text = phrase["text"]

            if phrase_duration <= 0:
                continue

            text_clip = self._create_styled_text_clip(
                text=text,
                duration=phrase_duration,
                target_width=target_width,
                target_height=target_height,
                start_time=start_time,
                word_timings=word_timings
            )
            if text_clip:
                text_clips.append(text_clip)

        logger.debug(f"π Created {len(text_clips)} timed subtitle clips")

        if text_clips:
            final_video = CompositeVideoClip([video_clip] + text_clips)
            logger.debug(f"β Added {len(text_clips)} timed subtitle segments")
            return await self._render_video_only(final_video.subclip(0, total_duration))
        else:
            return await self._render_video_only(video_clip)

    except Exception as e:
        logger.error(f"β Failed to add timed subtitles: {e}")
        raise
    finally:
        # The source reader was previously left open on every path.
        if video_clip is not None:
            video_clip.close()
|
|
|
|
|
|
|
|
def _create_styled_text_clip(
    self, text: str, duration: float, target_width: int, target_height: int, start_time: float, word_timings=None
) -> Optional[TextClip]:
    """Create a styled text clip with IMPROVED appearance.

    The font size shrinks with the number of lines the text needs at 22
    characters per line (85 / 75 / 55 for one / two / more lines). The clip
    itself is built by ``video_editor.text_clip.create`` using the keyword
    arguments returned by ``self.cap_method()``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        max_chars_per_line = 22
        line_count = len(textwrap.wrap(text, width=max_chars_per_line))

        if line_count > 2:
            fontsize = 55
        elif line_count > 1:
            fontsize = 75
        else:
            fontsize = 85

        # The hand-built TextClip that used to follow this return was
        # unreachable and has been removed.
        return create_text_clip(
            text,
            duration,
            target_width,
            target_height,
            start_time,
            fontsize,
            word_timings,
            **self.cap_method()
        )

    except Exception as e:
        logger.error(f"β Failed to create styled text clip: {e}")
        raise
|
|
|
|
|
def _group_words_into_timed_phrases(self, words: List[Dict]) -> List[Dict]: |
|
|
"""Group timed words into readable phrases""" |
|
|
phrases = [] |
|
|
current_phrase = [] |
|
|
current_start = None |
|
|
|
|
|
for word in words: |
|
|
word_text = word["word"] |
|
|
start_time = word["start_time"] |
|
|
end_time = word["end_time"] |
|
|
|
|
|
if not current_phrase: |
|
|
current_start = start_time |
|
|
|
|
|
current_phrase.append(word_text) |
|
|
|
|
|
|
|
|
has_punctuation = any(p in word_text for p in [".", ",", "!", "?", ";", ":"]) |
|
|
too_long = len(current_phrase) >= 4 |
|
|
|
|
|
if has_punctuation or too_long: |
|
|
phrases.append({"text": " ".join(current_phrase), "start_time": current_start, "end_time": end_time}) |
|
|
current_phrase = [] |
|
|
current_start = None |
|
|
|
|
|
|
|
|
if current_phrase and current_start is not None: |
|
|
phrases.append( |
|
|
{ |
|
|
"text": " ".join(current_phrase), |
|
|
"start_time": current_start, |
|
|
"end_time": words[-1]["end_time"] if words else current_start + 2.0, |
|
|
} |
|
|
) |
|
|
|
|
|
return phrases |
|
|
|
|
|
async def _add_fallback_subtitles(self, video_clip: VideoFileClip, script: str) -> CompositeVideoClip:
    """Fallback subtitles when timed transcript is unavailable.

    The video duration is split evenly between the phrases derived from
    ``script``; each phrase is shown for one slot.

    Returns:
        The video with subtitle clips composited on top, or ``video_clip``
        itself when no phrase or no subtitle clip could be produced.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        words = self._split_script_into_words(script)
        text_clips = []
        total_duration = video_clip.duration
        target_width, target_height = video_clip.size

        logger.debug(f"π Using fallback subtitles: {len(words)} words")

        # NOTE(review): the raw script string is passed here and every phrase
        # is later used directly as caption text - confirm this matches what
        # group_words_by_time_and_width expects and returns.
        phrases = group_words_by_time_and_width(script, target_width, target_height, max_words_per_group=self.max_words_per_group)
        logger.debug(f"π Grouped into {len(phrases)} phrases")

        # Nothing to show: avoid dividing by zero below.
        if not phrases:
            return video_clip

        duration_per_phrase = total_duration / len(phrases)

        for i, phrase in enumerate(phrases):
            start_time = i * duration_per_phrase
            phrase_duration = duration_per_phrase

            text_clip = self._create_styled_text_clip(
                text=phrase,
                duration=phrase_duration,
                target_width=target_width,
                target_height=target_height,
                start_time=start_time,
            )

            if text_clip:
                text_clips.append(text_clip)

        logger.debug(f"π Created {len(text_clips)} fallback subtitle clips")

        if text_clips:
            final_video = CompositeVideoClip([video_clip] + text_clips)
            logger.debug(f"β Added {len(text_clips)} fallback subtitle segments")
            return final_video
        else:
            return video_clip

    except Exception as e:
        logger.error(f"β Fallback subtitles failed: {e}")
        raise
|
|
|
|
|
def _split_script_into_words(self, script: str) -> List[str]: |
|
|
"""Split script into individual words""" |
|
|
import re |
|
|
|
|
|
script = re.sub(r"\s+", " ", script).strip() |
|
|
return script.split() |
|
|
|
|
|
def _group_words_into_phrases(self, words: List[str], max_words: int = 3) -> List[str]: |
|
|
"""Group words into small readable phrases""" |
|
|
phrases = [] |
|
|
current_phrase = [] |
|
|
|
|
|
for word in words: |
|
|
current_phrase.append(word) |
|
|
has_punctuation = any(p in word for p in [".", ",", "!", "?", ";"]) |
|
|
|
|
|
if len(current_phrase) >= max_words or has_punctuation: |
|
|
phrases.append(" ".join(current_phrase)) |
|
|
current_phrase = [] |
|
|
|
|
|
if current_phrase: |
|
|
phrases.append(" ".join(current_phrase)) |
|
|
|
|
|
return phrases |
|
|
|
|
|
async def add_audio_to_video(self, video_path: str) -> str:
    """
    Add audio track to pre-rendered video (NO speedup - video is already correct duration).

    Args:
        video_path: The rendered, silent video file.

    Returns:
        The value returned by ``render_video_final`` for the video with audio.

    Raises:
        Exception: Any failure is logged and re-raised; opened clips are
            closed in every case.
    """
    video_clip = None
    video_with_audio = None
    try:
        assets = get_config_value("visual_assets")
        logger.debug("π Adding audio to rendered video")

        video_clip = VideoFileClip(video_path)

        audio_clips = await self._prepare_audio_clips(assets, video_clip.duration)

        video_with_audio = await self._add_audio_track(video_clip, audio_clips)

        output_path = await self.render_video_final(video_with_audio)

        logger.debug(f"β Final video with audio: {output_path}")
        return output_path

    except Exception as e:
        logger.error(f"β Failed to add audio to video: {e}")
        raise
    finally:
        # Close on success AND failure (clips used to stay open on errors).
        if video_clip is not None:
            video_clip.close()
        # Identity check: only close the result separately when it is a
        # different object than the source clip.
        if video_with_audio is not None and video_with_audio is not video_clip:
            video_with_audio.close()
|
|
|
|
|
def get_audio_rms(self, audio_clip, sample_duration=1.0):
    """Return the RMS of the first ``sample_duration`` of ``audio_clip`` (capped at the clip's duration).

    The audio is sampled at 44100 fps. Any failure is logged and reported
    as 0.0.
    """
    try:
        window = min(sample_duration, audio_clip.duration)
        samples = audio_clip.subclip(0, window).to_soundarray(fps=44100)
        mean_square = np.mean(samples ** 2)
        return np.sqrt(mean_square)
    except Exception as e:
        logger.warning(f"β οΈ Failed to compute RMS: {e}")
        return 0.0
|
|
|
|
|
|
|
|
def calculate_bg_volume(self, main_rms, bg_rms):
    """
    Derive the background-music volume from the TTS and background RMS levels.

    With (practically) no main audio the background is treated as the primary
    track and 0.85 is returned. Otherwise a base volume chosen from
    ``main_rms`` is scaled down the louder the background is, and the result
    is clamped to the range 0.0 .. ``ALLOWED_BG_MUSIC_VOLUME``.
    """
    # Covers both "exactly zero" and "negligible" main audio.
    if main_rms < 0.001:
        logger.debug("π No main audio detected, treating background as primary audio with high volume")
        return 0.85

    # Louder speech -> lower base volume for the music.
    if main_rms > 0.04:
        base_volume = 0.35
    elif main_rms > 0.02:
        base_volume = 0.45
    else:
        base_volume = 0.55

    # (threshold, factor) pairs, loudest background first.
    attenuation_steps = ((0.15, 0.25), (0.08, 0.45), (0.03, 0.7), (0.01, 0.9))
    for threshold, factor in attenuation_steps:
        if bg_rms > threshold:
            scale = factor
            break
    else:
        scale = 1.1

    bg_volume = base_volume * scale
    capped = min(bg_volume, ALLOWED_BG_MUSIC_VOLUME)
    return max(0.0, capped)
|
|
|
|
|
|
|
|
async def _prepare_audio_clips(self, assets: Dict, target_duration: float) -> List[AudioFileClip]:
    """
    Load the TTS and background-music files and fit them to the video length.

    Args:
        assets: Asset mapping. The TTS file is read from
            ``assets["tts_audio_data"]["local_path"]`` and the music file from
            ``assets["background_music_local"]``; either may be absent.
        target_duration: Video duration in seconds. Longer clips are trimmed
            to it; shorter clips are left as they are.

    Returns:
        The usable clips, TTS first (at full volume) and then the background
        music with the gain from ``calculate_bg_volume`` applied. Clips with
        zero duration are closed and left out.

    Raises:
        Exception: Whatever loading or processing raised, re-raised after
            every clip opened by this call has been closed.
    """
    prepared = []  # clips handed back to the caller, in mix order
    opened = []    # file-backed clips opened here; closed again on failure

    try:
        tts_clip = None
        tts_path = (assets.get("tts_audio_data") or {}).get("local_path")
        if tts_path:
            try:
                tts_clip = AudioFileClip(tts_path)
                opened.append(tts_clip)
                if tts_clip.duration > 0:
                    if tts_clip.duration > target_duration:
                        logger.debug(f"⚠️ Trimming TTS: {tts_clip.duration:.2f}s → {target_duration:.2f}s")
                        tts_clip = tts_clip.subclip(0, target_duration)
                    elif tts_clip.duration < target_duration:
                        # Only reported: a short TTS is neither padded nor looped.
                        logger.debug(f"⚠️ TTS shorter: {tts_clip.duration:.2f}s < {target_duration:.2f}s")

                    prepared.append(tts_clip)
                    logger.debug(f"✓ Loaded TTS at full volume ({tts_clip.duration:.2f}s)")
                else:
                    logger.warning("⚠️ TTS audio has zero duration")
                    tts_clip.close()
                    # Forget the closed clip so it is not sampled for RMS below.
                    tts_clip = None
            except Exception as e:
                logger.error(f"❌ Failed to load TTS audio: {e}")
                raise

        music_path = assets.get("background_music_local")
        if music_path:
            try:
                bg_clip = AudioFileClip(music_path)
                opened.append(bg_clip)
                if bg_clip.duration > 0:
                    if bg_clip.duration > target_duration:
                        bg_clip = bg_clip.subclip(0, target_duration)
                        logger.debug(f"✓ Trimmed background to {target_duration:.2f}s")

                    # Measure both tracks before gain so the music sits under the voice.
                    tts_rms = self.get_audio_rms(tts_clip) if tts_clip else 0.0
                    bg_rms = self.get_audio_rms(bg_clip)

                    bg_volume = self.calculate_bg_volume(tts_rms, bg_rms)
                    bg_clip = bg_clip.volumex(bg_volume)

                    prepared.append(bg_clip)
                    logger.debug(
                        f"✓ Loaded background (RMS: {bg_rms:.4f}) at dynamic volume {bg_volume:.2f} ({bg_clip.duration:.2f}s)"
                    )
                else:
                    logger.warning("⚠️ Background music has zero duration")
                    bg_clip.close()
            except Exception as e:
                logger.error(f"❌ Failed to load background music: {e}")
                raise

        return prepared

    except Exception as e:
        logger.error(f"❌ Failed to prepare audio clips: {e}")
        # Close the clips opened from disk as well as the trimmed/scaled clips
        # derived from them, so no reader outlives a failed call.
        # NOTE(review): derived clips presumably share the reader of the clip
        # they were cut from; closing both is harmless either way.
        for clip in opened + prepared:
            try:
                clip.close()
            except Exception:
                pass
        raise
|
|
|
|
|
async def _add_audio_track(self, video_clip: VideoFileClip, audio_clips: List[AudioFileClip]) -> VideoFileClip:
    """
    Mix the given audio clips and attach the mix to the video.

    Args:
        video_clip: Video that receives the audio.
        audio_clips: Clips to mix; clips whose duration is not positive are
            ignored.

    Returns:
        ``video_clip`` itself (the same object) when there is nothing to mix,
        otherwise the clip returned by ``video_clip.set_audio`` carrying the
        mix, cut to the shorter of the video and the mix.

    Raises:
        Exception: Any error raised while mixing, re-raised after logging.
    """
    if not audio_clips:
        return video_clip

    try:
        playable = [clip for clip in audio_clips if clip.duration > 0]
        if not playable:
            return video_clip

        mixed_audio = CompositeAudioClip(playable)
        # Never let the soundtrack run past the end of the picture.
        mixed_audio = mixed_audio.subclip(0, min(video_clip.duration, mixed_audio.duration))
        video_with_audio = video_clip.set_audio(mixed_audio)

        logger.debug("✅ Added audio track")
        return video_with_audio

    except Exception as e:
        logger.error(f"❌ Failed to add audio track: {e}")
        raise
|
|
|
|
|
async def render_video_final(self, video_clip) -> str:
    """
    Encode the finished clip to an MP4 file in the renderer's temp directory.

    The file is written as H.264 video with AAC audio at 25 fps in the
    ``yuv420p`` pixel format, under a random hex file name.

    Args:
        video_clip: Clip to encode. It is closed before this method returns
            or raises, so the caller must not use it afterwards.

    Returns:
        Path of the written file as a string.

    Raises:
        Exception: Any error raised while encoding, re-raised after logging.
    """
    try:
        output_path = self.temp_dir / f"{uuid.uuid4().hex}.mp4"

        video_clip.write_videofile(
            str(output_path),
            codec="libx264",
            audio_codec="aac",
            fps=25,
            verbose=False,
            logger=None,
            ffmpeg_params=["-pix_fmt", "yuv420p"],
        )
        return str(output_path)

    except Exception as e:
        logger.error(f"Final video render failed: {e}")
        raise

    finally:
        # Single release point for both the success and the failure path.
        video_clip.close()
|
|
|
|
|
async def _render_video_only(self, video_clip: VideoFileClip) -> str:
    """
    Encode the clip to a silent MP4 file in the renderer's temp directory.

    The file is written as H.264 at 25 fps in the ``yuv420p`` pixel format and
    named ``video_no_audio_<8 hex chars>.mp4``.

    Args:
        video_clip: Clip to encode. It is always closed before this method
            returns or raises.

    Returns:
        Path of the written file as a string.

    Raises:
        Exception: Any error raised while encoding, re-raised after logging.
    """
    filename = f"video_no_audio_{uuid.uuid4().hex[:8]}.mp4"
    output_path = self.temp_dir / filename

    try:
        logger.debug(f"📹 Rendering video (no audio): {filename}")

        video_clip.write_videofile(
            str(output_path),
            codec="libx264",
            # Without this an audio track carried by the clip would still be
            # written, contradicting the "no audio" contract.
            audio=False,
            fps=25,
            verbose=False,
            logger=None,
            ffmpeg_params=["-pix_fmt", "yuv420p"],
        )
        return str(output_path)

    except Exception as e:
        logger.error(f"❌ Video rendering failed: {e}")
        raise

    finally:
        video_clip.close()
|
|
|
|
|
def compress(self, input_path: str) -> str:
    """
    Re-encode a video with FFmpeg to reduce its file size.

    The input is scaled to 1080x1920 and encoded as H.264 (CRF 26, preset
    "slow") with 128k AAC audio and ``+faststart``. The result is written to
    ``<system temp dir>/<input stem>_compressed.mp4``, overwriting any file of
    that name.

    Args:
        input_path: Path of the video to compress.

    Returns:
        Path of the compressed file, or ``input_path`` unchanged when
        compression fails (the failure is logged and never raised).
    """
    output_path = None
    try:
        output_path = str(Path(tempfile.gettempdir()) / f"{Path(input_path).stem}_compressed.mp4")

        cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-vf", "scale=1080:1920",
            "-c:v", "libx264",
            "-preset", "slow",
            "-crf", "26",
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            output_path,
        ]

        logger.debug(f"🗜️ Compressing {input_path} → {output_path}")
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        logger.debug(f"✅ Compressed to {size_mb:.2f} MB at Full HD")
        return output_path

    except subprocess.CalledProcessError as e:
        # The exception text only carries the exit status; the reason is in stderr.
        stderr_tail = (e.stderr or b"").decode("utf-8", errors="replace")[-2000:]
        logger.error(f"❌ Compression failed: {e}\n{stderr_tail}")
        # FFmpeg may have left a truncated file behind; do not keep it around.
        try:
            if os.path.exists(output_path):
                os.remove(output_path)
        except OSError:
            pass
        return input_path

    except Exception as e:
        logger.error(f"❌ Compression failed: {str(e)}")
        return input_path
|
|
|
|
|
def _get_caption_style(self):
    """
    Return the caption style for this run, choosing it on first use.

    The choice is stored under the ``current_caption_style`` config key, so
    later calls return the same style. The ``caption_style`` config value
    selects it by name (both ``caption_N`` and ``caption_style_N`` are
    accepted); ``"random"`` (the default) or an unknown name picks one of the
    known styles at random.

    Returns:
        The stored ``(style, max_words_per_group)`` pair, where ``style`` is
        one of the imported ``caption_style_*`` objects.
    """
    if not get_config_value("current_caption_style"):
        caption_style = get_config_value("caption_style", "random")

        styles = {
            "caption_1": (caption_style_1, 3),
            "caption_style_1": (caption_style_1, 3),
            "caption_2": (caption_style_2, 3),
            "caption_style_2": (caption_style_2, 3),
            "caption_3": (caption_style_3, 3),
            "caption_style_3": (caption_style_3, 3),
            "caption_4": (caption_style_4, 3),
            "caption_style_4": (caption_style_4, 3),
            "caption_style_on_screen_text": (caption_style_on_screen_text, 3),
            "caption_style_on_screen_text_top": (caption_style_on_screen_text_top, 3),
        }

        chosen = styles.get(caption_style)
        if chosen is None:
            # Covers "random" as well as any name that is not a known style.
            chosen = random.choice(list(styles.values()))

        set_config_value("current_caption_style", chosen)
        logger.debug(f'🎨 Selected caption style: {chosen[0].__name__}')

    return get_config_value("current_caption_style")
|
|
|
|
|
async def render_video_cuts(
    self,
    music_duration: float,
    beat_times: list = None,
    interval: float = None,
    min_clip_duration: float = 0.0,
    loop_short_videos: bool = True,
) -> str:
    """
    Cut the selected videos onto a beat grid and merge them into one silent MP4.

    Each source video is re-encoded by FFmpeg into its own temp file
    (1080x1920, 25 fps, H.264, no audio). The files are then joined with the
    FFmpeg concat demuxer using stream copy, so clips are never held in
    memory. One source video is used per cut; cutting stops when the videos
    run out.

    Args:
        music_duration: Total duration of the output in seconds; the merged
            file is truncated to this length.
        beat_times: Beat timestamps in seconds (beat-synced mode). A beat at
            0.0 is inserted when the first beat is later than 0.0001s.
        interval: Fixed spacing between cuts in seconds (hard-cut mode). When
            given it takes precedence over ``beat_times``.
        min_clip_duration: Beat gaps shorter than this are not cut; their
            time is carried over to the next clip. 0 disables the check.
        loop_short_videos: If True, sources shorter than 4s are extended by
            playing them forwards and backwards (up to 4x their length).

    Returns:
        Absolute path of the merged video inside ``self.temp_dir``.

    Raises:
        ValueError: If neither ``beat_times`` nor ``interval`` is given, if
            ``interval`` is not greater than 0, if fewer than two beats are
            available, if no clip could be produced, or if merging fails.
    """
    if beat_times is None and interval is None:
        raise ValueError("Either beat_times or interval must be provided")

    if interval is not None:
        # Reject zero/negative (and NaN) steps: with a non-positive step `t`
        # never exceeds music_duration and the loop below would never end.
        if not interval > 0:
            raise ValueError("interval must be greater than 0")

        grid = []
        t = 0.0
        while t <= music_duration:
            grid.append(t)
            t += interval
        # Close the grid on the exact end of the music.
        if not grid or grid[-1] < music_duration:
            grid.append(music_duration)
        beat_times = np.array(grid)
        logger.debug(f"Generated {len(beat_times)} beats at {interval}s intervals")
    elif len(beat_times) > 0 and beat_times[0] > 0.0001:
        # Without a beat at 0 the intro before the first beat would get no clip.
        beat_times = np.insert(beat_times, 0, 0.0)
        logger.debug("⚡ Inserted virtual beat at 0.0s for intro")

    if len(beat_times) < 2:
        raise ValueError("Need at least 2 beat times")

    temp_clips = []      # clips that go into the concat list, in order
    scratch_paths = []   # every file this call may have written; removed in `finally`
    concat_file_path = None
    video_idx = 0
    accumulated_deficit = 0.0
    videos = get_config_value("visual_assets")["selected_videos"]

    # Shared tail of every filter chain: reset timestamps, fill 1080x1920, crop.
    normalize = (
        "setpts=PTS-STARTPTS,scale=1080:1920:force_original_aspect_ratio=increase,"
        "crop=1080:1920,setsar=1:1,format=yuv420p"
    )

    logger.debug(f"Creating video synced to {len(beat_times)} beats")
    logger.debug(f"Music duration: {music_duration:.2f}s")

    # NOTE(review): subprocess.run blocks the event loop while FFmpeg runs.
    try:
        i = 0
        while i < len(beat_times) - 1:
            if video_idx >= len(videos) or beat_times[i] > music_duration + 2:
                break

            # Time this clip must cover: its own beat gap plus whatever earlier
            # clips failed to deliver.
            target_duration = float(beat_times[i + 1] - beat_times[i]) + accumulated_deficit

            if target_duration < min_clip_duration and min_clip_duration > 0:
                accumulated_deficit = target_duration
                i += 1
                continue

            entry = videos[video_idx]
            video_path = entry

            try:
                # Entries are used as plain paths here, while the asset
                # validator reads them as dicts with "local_path"; accept both.
                if isinstance(entry, dict):
                    video_path = entry.get("local_path")
                video_filename = os.path.basename(video_path)

                probe_cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                             "-of", "default=noprint_wrappers=1:nokey=1", video_path]
                probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10)

                # If probing fails, assume the source is long enough for this cut.
                video_duration_src = target_duration + 1
                if probe_result.returncode == 0 and probe_result.stdout.strip():
                    try:
                        probed = float(probe_result.stdout.strip())
                        if probed > 0:
                            video_duration_src = probed
                    except ValueError:
                        pass

                temp_clip_path = os.path.abspath(str(self.temp_dir / f"clip_{video_idx+1:03d}.mp4"))
                # Registered before FFmpeg runs so a partial file from a failed
                # encode is cleaned up too.
                scratch_paths.append(temp_clip_path)

                if loop_short_videos and video_duration_src < 4:
                    # Forward+reverse, applied twice: four times the source length.
                    vf_filter = (
                        "[0:v]split=2[a][b];[b]reverse[br];[a][br]concat=n=2:v=1:a=0[loop1];"
                        "[loop1]split=2[c][d];[d]reverse[dr];[c][dr]concat=n=2:v=1:a=0[looped];"
                        "[looped]" + normalize + "[out]"
                    )
                    use_filter_complex = True
                    trim_duration = min(target_duration, video_duration_src * 4)
                elif video_duration_src < target_duration:
                    loop_count = int(target_duration / video_duration_src) + 1
                    vf_filter = f"loop={loop_count}:size=999:start=0," + normalize
                    use_filter_complex = False
                    trim_duration = target_duration
                else:
                    vf_filter = normalize
                    use_filter_complex = False
                    trim_duration = min(target_duration, video_duration_src)

                encode_args = ["-c:v", "libx264", "-preset", "ultrafast", "-r", "25",
                               "-pix_fmt", "yuv420p", "-video_track_timescale", "12800",
                               "-an", temp_clip_path]
                if use_filter_complex:
                    cmd = ["ffmpeg", "-y", "-i", video_path, "-filter_complex", vf_filter,
                           "-map", "[out]", "-t", str(trim_duration)] + encode_args
                else:
                    cmd = ["ffmpeg", "-y", "-i", video_path, "-t", str(trim_duration),
                           "-vf", vf_filter] + encode_args

                result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)

                if result.returncode != 0:
                    logger.error(f"FFmpeg error for clip {video_idx+1}: {result.stderr}")
                    # Hand the uncovered time to the next clip so later cuts
                    # stay on their beats.
                    accumulated_deficit = target_duration
                else:
                    temp_clips.append(temp_clip_path)
                    accumulated_deficit = max(0, target_duration - trim_duration)

                    total_time = beat_times[i + 1] - beat_times[0]
                    logger.debug(f"✂️ CUT {video_idx+1}: Used {trim_duration:.2f}s from {video_filename} | Total time: {total_time:.2f}s")

            except subprocess.TimeoutExpired:
                logger.error(f"Timeout processing {video_path}, skipping...")
                accumulated_deficit = target_duration
            except Exception as e:
                logger.error(f"Error processing video {video_idx}: {e}")
                accumulated_deficit = target_duration

            # One source video and one beat gap are consumed per attempt,
            # whether or not the clip was produced.
            video_idx += 1
            i += 1

        if not temp_clips:
            raise ValueError("No clips created")

        concat_file_path = os.path.abspath(str(self.temp_dir / "concat_list.txt"))
        with open(concat_file_path, "w", encoding="utf-8") as f:
            for clip_path in temp_clips:
                # A single quote cannot appear inside FFmpeg's single-quoted
                # form; close the quote, emit \' and reopen it.
                escaped = clip_path.replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        output_path = os.path.abspath(str(self.temp_dir / f"merged_{uuid.uuid4().hex[:8]}.mp4"))

        concat_cmd = [
            "ffmpeg", "-y",
            "-fflags", "+genpts",
            "-f", "concat", "-safe", "0", "-i", concat_file_path,
            "-c", "copy",
            "-avoid_negative_ts", "make_zero",
            "-t", str(music_duration),
            "-an",
            output_path,
        ]

        logger.debug(f"🎬 Merging {len(temp_clips)} clips...")
        result = subprocess.run(concat_cmd, capture_output=True, text=True, timeout=120)

        if result.returncode != 0:
            logger.error(f"FFmpeg concat error: {result.stderr}")
            raise ValueError(f"Failed to merge clips: {result.stderr}")

        logger.debug(f"✅ Merged video saved: {output_path}")
        return output_path

    finally:
        # Intermediate files are never needed once merging succeeded or failed.
        leftovers = list(scratch_paths)
        if concat_file_path:
            leftovers.append(concat_file_path)
        for path in leftovers:
            try:
                if os.path.exists(path):
                    os.remove(path)
            except OSError:
                pass
|
|
|
|
|
def _validate_assets_for_video_only(self) -> bool:
    """
    Check that the configured assets are sufficient for a video-only render.

    Reads the ``visual_assets`` config value and requires:
    - a non-empty ``selected_videos`` list,
    - a ``local_path`` on ``hook_video`` when a hook video is present,
    - at least one selected video with a ``local_path``.

    Returns:
        True when all checks pass; False otherwise, after logging the first
        failed check as an error.
    """
    assets = get_config_value("visual_assets")

    # A missing or empty asset mapping is treated like "no videos" instead of
    # failing with an AttributeError on `.get`.
    if not assets or not assets.get("selected_videos"):
        logger.error("No selected videos provided")
        return False

    hook_video = assets.get("hook_video")
    if hook_video and not hook_video.get("local_path"):
        logger.error("Hook video provided but missing local_path")
        return False

    if not any(v.get("local_path") for v in assets["selected_videos"]):
        logger.error("No library videos with local_path")
        return False

    return True
|
|
|
|
|
def _cleanup_temp_files(self, clips: List):
    """
    Close every object in ``clips`` that has a ``close`` attribute.

    Cleanup is best effort: an object that fails to close is skipped and the
    remaining ones are still processed. Nothing is returned or raised.
    """
    for item in clips:
        try:
            if not hasattr(item, "close"):
                continue
            item.close()
        except Exception:
            # A clip that cannot be closed must not stop the others.
            pass
|
|
|
|
|
def __del__(self):
    """
    Remove the renderer's temp directory when the object is destroyed.

    Nothing is removed when the instance has no ``temp_dir``, when the
    directory does not exist, or when the ``test_automation`` config value is
    truthy. Every error is swallowed.
    """
    try:
        import shutil

        if not hasattr(self, "temp_dir"):
            return
        if not self.temp_dir.exists():
            return
        if get_config_value("test_automation"):
            return

        shutil.rmtree(self.temp_dir, ignore_errors=True)
    except Exception:
        # Swallow everything: a failed cleanup must not raise out of __del__.
        pass
|
|
|