# src/video_renderer.py — snapshot at commit f20025d
# (refactor: Centralize logger import to src.logger_config across various modules.)
"""
Production video rendering engine with improved captions and natural speed
"""
# FIX FOR PIL ANTIALIAS ISSUE
# Pillow >= 10 removed the deprecated Image.ANTIALIAS constant, but older
# moviepy releases still reference it; alias it to LANCZOS (its replacement)
# so resizing keeps working on new Pillow versions.
import PIL.Image
if not hasattr(PIL.Image, "ANTIALIAS"):
    PIL.Image.ANTIALIAS = PIL.Image.LANCZOS
import os
import tempfile
import uuid
from typing import List, Dict, Optional
from pathlib import Path
from moviepy.editor import (
VideoFileClip,
AudioFileClip,
CompositeVideoClip,
concatenate_videoclips,
TextClip,
CompositeAudioClip,
vfx
)
import textwrap
from src.logger_config import logger
from src.utils import reverse_clip as utils_reverse_clip, get_temp_dir
import src.utils as utils
import time
from video_editor.text_clip import create as create_text_clip, group_words_by_time_and_width, caption_style_1, caption_style_2, caption_style_3, caption_style_4, caption_style_on_screen_text, caption_style_on_screen_text_top
import subprocess
import asyncio
import random
from moviepy.video.fx import crop
from src.config import get_config_value, set_config_value
import numpy as np
# Ceiling for the background-music gain computed in calculate_bg_volume().
ALLOWED_BG_MUSIC_VOLUME = 0.08
# Max shortfall (seconds) bridged by ping-pong reversing instead of pulling in
# an alternate clip (see _extend_or_trim_clip).
REVERSE_THRESHOLD = 0.5
# Seconds of the hook video used for the opening/closing segments
# (overwritten to 2 when "use_veo" is enabled).
HOOK_VIDEO_DURATION = 1.5
# Module-level bookkeeping: seconds of the hook's source clip already consumed
# while building the opening segment (mutated during rendering).
HOOK_START_ORIGINAL_CLIP_USED = 0
class VideoRenderer:
def __init__(self):
self.cap_method, self.max_words_per_group = self._get_caption_style()
self.temp_dir = get_temp_dir(prefix="video_renderer_").resolve()
logger.debug(f"Initialized VideoRenderer with temp dir: {self.temp_dir}")
async def render_video_without_audio(self, video_config: Optional[Dict] = None) -> tuple[str, float]:
"""
Render video composition WITHOUT audio and WITHOUT slow-motion
"""
try:
assets = get_config_value("visual_assets")
logger.debug("🎬 Starting video rendering (NO slow-motion)")
if not self._validate_assets_for_video_only():
raise ValueError("Invalid assets provided for video rendering")
# Prepare video clips - NO speed adjustments for natural speed
video_clips = await self._prepare_video_clips_natural_speed()
# Create video sequence with natural speed
# final_video = await self._create_video_sequence_natural_speed(video_clips, video_config)
final_video = concatenate_videoclips(video_clips, method="compose")
final_video = final_video.without_audio()
# Render video WITHOUT audio
output_path = await self._render_video_only(final_video)
video_duration = final_video.duration
# Cleanup
self._cleanup_temp_files(video_clips + [final_video])
logger.debug(f"βœ… Video (no audio) rendered: {output_path}, duration: {video_duration:.2f}s")
return output_path, video_duration
except Exception as e:
logger.error(f"❌ Video rendering (no audio) failed: {e}")
raise
    async def _prepare_video_clips_natural_speed(self) -> List[VideoFileClip]:
        """Load and prepare all video clips - NO speed adjustments.

        Builds the ordered list of library clips, optionally wrapping the
        sequence with a "hook" video (its tail first, its head last) so the
        final video loops seamlessly.  Mutates the module globals
        HOOK_VIDEO_DURATION and HOOK_START_ORIGINAL_CLIP_USED as a side effect.

        Returns:
            List of muted VideoFileClip objects in playback order.
        """
        clips = []
        global HOOK_START_ORIGINAL_CLIP_USED
        global HOOK_VIDEO_DURATION
        try:
            assets = get_config_value("visual_assets")
            # Load hook video for seamless looping (OPTIONAL)
            runway_as_second_ai_video = None
            hook_start = None
            hook_end = None
            if assets.get("hook_video") and assets["hook_video"].get("local_path"):
                HOOK_VIDEO_DURATION = 1.5
                if get_config_value("use_veo", False):
                    # Veo mode: the veo render becomes the hook; the first two
                    # seconds of the runway render are reused as an AI clip.
                    HOOK_VIDEO_DURATION = 2
                    hook_clip = VideoFileClip(assets["hook_video"]["veo_video_data"]["local_path"])
                    runway_as_second_ai_video = VideoFileClip(assets["hook_video"]["local_path"]).subclip(0, 2)
                else:
                    hook_clip = VideoFileClip(assets["hook_video"]["local_path"])
                hook_duration = hook_clip.duration
                hook_clip = hook_clip.without_audio()
                logger.debug(f"πŸ”„ Creating seamless loop from {hook_duration:.2f}s hook video (NATURAL SPEED)")
                # Last 1.5 seconds for start
                start_segment_begin = max(0, hook_duration - HOOK_VIDEO_DURATION)
                hook_start = hook_clip.subclip(start_segment_begin, hook_duration)
                logger.debug(f"βœ“ Hook start: {hook_start.duration:.2f}s")
                # First 1.5 seconds for end
                hook_end_duration = min(HOOK_VIDEO_DURATION, hook_duration)
                hook_end = hook_clip.subclip(0, hook_end_duration)
                logger.debug(f"βœ“ Hook end: {hook_end.duration:.2f}s")
                hook_clip.close()
            else:
                # No hook - just concatenate videos directly
                logger.debug("πŸ“Ή No hook video - concatenating segments directly")
            # Combine all tts_script_segment texts into one string
            selected_videos = assets["selected_videos"]
            all_tts_script_segment = " ".join(
                v.get("tts_script_segment", "").strip()
                for v in selected_videos
                if v.get("tts_script_segment")
            )
            # Assign per-clip durations so the video lines up with the TTS audio.
            with AudioFileClip(assets["tts_audio_data"]["local_path"]) as audio_clip:
                original_duration = audio_clip.duration
                utils.calculate_video_durations(selected_videos, all_tts_script_segment, assets["timed_transcript"], original_duration)
            target_size = (1080, 1920)  # NOTE(review): unused in this method
            extra_secs = 0.0
            HOOK_START_ORIGINAL_CLIP_USED = 0
            # Load library videos - NO speed adjustments
            for i, lib_video in enumerate(selected_videos):
                if lib_video.get("local_path"):
                    try:
                        lib_clip = VideoFileClip(lib_video["local_path"])
                        original_clip = lib_clip
                        lib_hook_start = None
                        lib_hook_end = None
                        prev_clip = None
                        prev_clip_file = None
                        # Only apply hook logic if hook is available
                        if hook_start and i == 0:
                            lib_hook_start = hook_start
                            # In veo mode, the runway clip substitutes for the first library clip.
                            original_clip = runway_as_second_ai_video if runway_as_second_ai_video else original_clip
                        if i == 1 and get_config_value("use_veo", False) and runway_as_second_ai_video:
                            # Continue the runway clip from where the opening segment stopped using it.
                            if HOOK_START_ORIGINAL_CLIP_USED < runway_as_second_ai_video.duration-0.5:
                                original_clip = runway_as_second_ai_video.subclip(HOOK_START_ORIGINAL_CLIP_USED, runway_as_second_ai_video.duration)
                        if hook_end and i+1 == len(assets.get("selected_videos", [])):
                            # Last clip: append the hook's head so the video loops.
                            lib_hook_end = hook_end
                            if len(clips) > 0:
                                prev_clip = clips[-1][1]
                                # NOTE(review): indexes [-2] unconditionally — presumably the
                                # second-to-last selected video; confirm it matches clips[-1].
                                prev_clip_file = selected_videos[-2]["local_path"] if len(selected_videos) > 1 else None
                        prev_clip, lib_clip, extra_secs = await self._prepare_clip(
                            lib_video=lib_video,
                            original_clip_path=lib_video["local_path"],
                            alternate_url_local_path=lib_video.get("alternate_url_local_path"),
                            original_clip=original_clip,
                            lib_hook_start=lib_hook_start,
                            lib_hook_end=lib_hook_end,
                            target_duration=lib_video["duration"],
                            extra_secs=extra_secs,
                            prev_clip=prev_clip,
                            prev_clip_file=prev_clip_file
                        )
                        if extra_secs > 0: # ignore tiny floating-point diffs
                            logger.debug(f"⏱️ Added {extra_secs:.2f}s extra to match target duration ({lib_video['duration']:.2f}s)")
                        # _prepare_clip may have trimmed the previous clip; write it back.
                        if prev_clip and len(clips) > 0:
                            clip_name, _ = clips[-1]
                            clips[-1] = (clip_name, prev_clip)
                        lib_clip = lib_clip.without_audio()
                        clips.append((f"library_{i}", lib_clip))
                        # Track per-URL usage so the selector can avoid overused clips.
                        video_usage = get_config_value("video_usage_count", {})
                        video_usage[lib_video['url']] = video_usage.get(lib_video['url'], 0) + 1
                        set_config_value("video_usage_count", video_usage)
                        logger.debug(f"βœ“ Loaded library video {i}: {lib_clip.duration:.2f}s (NATURAL SPEED)")
                    except Exception as e:
                        import traceback
                        traceback.print_exc()
                        logger.error(f"❌ Failed to load library video {i}: {e}")
                        raise
                else:
                    logger.warning(f"⚠️ Library video {i} missing local_path")
            return [clip for _, clip in clips]
        except Exception as e:
            logger.error(f"❌ Failed to prepare video clips: {e}")
            # Best-effort cleanup of everything opened so far.
            for name, clip in clips:
                try:
                    clip.close()
                except:
                    pass
            raise
async def _prepare_clip(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, lib_hook_end, target_duration: float, extra_secs, prev_clip, prev_clip_file):
# Validate inputs
if target_duration <= 0:
raise ValueError(f"Invalid target_duration: {target_duration}")
if not original_clip_path or not original_clip:
raise ValueError("Missing required clip parameters")
logger.debug(f"Preparing clip for target duration {target_duration:.2f}s")
# Handle start hook case
if lib_hook_start:
return self._prepare_with_start_hook(
lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start,
target_duration, prev_clip
)
# Handle end hook case
elif lib_hook_end:
return self._prepare_with_end_hook(
lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_end,
target_duration, extra_secs, prev_clip, prev_clip_file
)
# No hooks - just extend/trim the original clip
else:
logger.debug("No hooks detected, adjusting original clip duration only")
result, extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration)
return prev_clip, result, extra_secs
    def _prepare_with_start_hook(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, target_duration, prev_clip):
        """Handle clip preparation when a start hook is present.

        Prepends ``lib_hook_start`` to ``original_clip`` and trims/extends the
        result to ``target_duration``.  Records in the module global
        HOOK_START_ORIGINAL_CLIP_USED how many seconds of the non-hook clip
        were consumed (read later when slicing the runway/veo clip).

        Returns:
            Tuple of (prev_clip unchanged, prepared clip, extra seconds added).
        """
        global HOOK_START_ORIGINAL_CLIP_USED
        logger.debug(f"Start hook detected with duration {lib_hook_start.duration:.2f}s")
        total_duration = lib_hook_start.duration + original_clip.duration
        # Case 1: Target fits within start hook + original clip
        if target_duration <= total_duration:
            logger.debug("Target duration fits start hook + original clip, concatenating and trimming")
            result = concatenate_videoclips([lib_hook_start, original_clip], method="compose").subclip(0, target_duration)
            logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
            HOOK_START_ORIGINAL_CLIP_USED = max(0, target_duration - lib_hook_start.duration)
            return prev_clip, result, 0.0
        # Case 2: Need to extend beyond original clip
        # NOTE(review): the extension target subtracts the nominal
        # HOOK_VIDEO_DURATION rather than lib_hook_start.duration — these can
        # differ when the hook video is shorter than HOOK_VIDEO_DURATION.
        modified_clip, extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration-HOOK_VIDEO_DURATION)
        result = concatenate_videoclips([lib_hook_start, modified_clip], method="compose").subclip(0, target_duration)
        HOOK_START_ORIGINAL_CLIP_USED = max(0, target_duration - lib_hook_start.duration)
        logger.debug(HOOK_START_ORIGINAL_CLIP_USED)
        logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
        return prev_clip, result, extra_secs
    def _prepare_with_end_hook(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_end,
                               target_duration, extra_secs, prev_clip, prev_clip_file):
        """Handle clip preparation when an end hook is present.

        Appends ``lib_hook_end`` after ``original_clip`` (trimming or extending
        the original first) so the whole segment lasts ``target_duration``.
        ``extra_secs`` is slack carried over from the previous segment and may
        be spent to lengthen a very short original clip; in that case the
        previous clip is trimmed by the amount actually used.

        Returns:
            Tuple of (possibly-trimmed prev_clip, prepared clip, leftover extra seconds).
        """
        temp_original_clip = original_clip  # untrimmed reference, reused when re-extending below
        logger.debug(f"End hook detected with duration {lib_hook_end.duration:.2f}s")
        total_duration = original_clip.duration + lib_hook_end.duration
        logger.debug(f"Combined original + end hook duration: {total_duration:.2f}s")
        cur_extra_secs = 0.0
        # Case 1: Combined duration exceeds target - need to trim
        if target_duration <= total_duration:
            trim_duration = target_duration - lib_hook_end.duration
            if trim_duration > 0:
                logger.debug(f"Trimming original clip from {original_clip.duration:.2f}s to {trim_duration:.2f}s to fit end hook")
                original_clip = original_clip.subclip(0, trim_duration)
                cur_extra_secs = 0.0
            else:
                # Target shorter than hook β†’ take last part of hook
                start_trim = max(0, lib_hook_end.duration - target_duration)
                result = lib_hook_end.subclip(start_trim, lib_hook_end.duration)
                logger.debug(f"Prepared end-only clip: {result.duration:.2f}s")
                return prev_clip, result, 0.0
        # Case 2: Combined duration is less than target - need to extend original
        elif target_duration > total_duration:
            remaining = target_duration - lib_hook_end.duration
            logger.debug(f"Original + end hook too short, need to extend original by {remaining:.2f}s")
            original_clip, cur_extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, remaining)
        # Case 3: Exact match or after trimming/extending
        logger.debug("Concatenating original clip and end hook")
        # Handle very short original clips
        if original_clip.duration < 1:
            if original_clip.duration + extra_secs > 1:
                # Determine how much of extra_secs is actually used to extend this clip
                possible_new_duration = original_clip.duration + extra_secs
                new_duration = min(possible_new_duration, temp_original_clip.duration)
                used_extra = max(0.0, new_duration - original_clip.duration)
                logger.debug(
                    f"Extending original clip from {original_clip.duration:.2f}s β†’ {new_duration:.2f}s "
                    f"(used_extra={used_extra:.2f}s, available_extra={extra_secs:.2f}s)"
                )
                # Apply the extension
                original_clip = temp_original_clip.subclip(0, new_duration)
                # Now, trim the previous clip by exactly how much we actually used
                # NOTE(review): assumes prev_clip is not None on this path — confirm.
                new_prev_duration = prev_clip.duration - used_extra
                logger.debug(
                    f"βœ‚οΈ Trimming previous clip by {used_extra:.2f}s β†’ new duration {new_prev_duration:.2f}s"
                )
                prev_clip = prev_clip.subclip(0, new_prev_duration)
                result = concatenate_videoclips([original_clip, lib_hook_end], method="compose").subclip(0, target_duration)
                cur_extra_secs = 0.0
            else:
                if prev_clip:
                    logger.debug("⚠️ Original clip too short, extending previous clip instead")
                    prev_clip, extra_secs = self._extend_or_trim_clip(lib_video, None, alternate_url_local_path, prev_clip, prev_clip.duration + original_clip.duration)
                    result = lib_hook_end.subclip(max(0, lib_hook_end.duration - target_duration), lib_hook_end.duration)
                # NOTE(review): if prev_clip is None here, `result` is never
                # assigned and the debug line below raises UnboundLocalError.
        else:
            result = concatenate_videoclips([original_clip, lib_hook_end], method="compose").subclip(0, target_duration)
        logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
        return prev_clip, result, cur_extra_secs
    def _extend_or_trim_clip(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration):
        """
        Extend or trim a clip to match target duration.

        Shortfalls up to REVERSE_THRESHOLD (or when no alternate clip exists)
        are bridged by ping-pong looping (clip + reversed copies); larger gaps
        splice in the alternate clip plus its reverse.

        Returns:
            tuple: (adjusted VideoFileClip, extra seconds not covered by
            primary footage).
        """
        total_duration = original_clip.duration
        # Case 0: Equal
        if abs(target_duration - total_duration) < 0.01: # 10ms tolerance
            return original_clip, 0.0
        # Case 1: Target is less than or equal to clip duration
        if target_duration <= total_duration:
            logger.debug("Target duration fits original clip, trimming")
            return original_clip.subclip(0, target_duration), 0.0
        # Case 2: Target is greater than clip duration
        elif target_duration > total_duration:
            if alternate_url_local_path is None or (target_duration - total_duration <= REVERSE_THRESHOLD): # Small tolerance for floating point
                logger.debug("⚠️ Reversing clip.")
                # NOTE(review): reverse_clip() forwards its argument to
                # utils.reverse_clip, which is given a file *path* elsewhere;
                # here it receives a clip object — confirm utils handles both.
                reversed_clip = self.reverse_clip(original_clip)
                loop_clip = concatenate_videoclips([original_clip, reversed_clip, original_clip, reversed_clip], method="compose")
                return loop_clip.subclip(0, target_duration), target_duration - original_clip.duration
            else:
                logger.debug("⚠️ Using extra clip.")
                # Count usage of the alternate source as well.
                video_usage = get_config_value("video_usage_count", {})
                video_usage[lib_video['alternate_url']] = video_usage.get(lib_video['alternate_url'], 0) + 1
                set_config_value("video_usage_count", video_usage)
                alternate_clip = VideoFileClip(alternate_url_local_path)
                reverse_alternate_clip = self.reverse_clip(alternate_url_local_path)
                combined = concatenate_videoclips([original_clip, alternate_clip, reverse_alternate_clip, original_clip], method="compose")
                result = combined.subclip(0, target_duration)
                # NOTE(review): the reversed alternate's duration is not counted
                # toward coverage here — confirm that is intended.
                extra_secs = max(0.0, target_duration - original_clip.duration - alternate_clip.duration)
                return result, extra_secs
    def _extend_clip_to_duration(self, original_clip_path, original_clip, target_duration):
        """
        Extend a clip to target duration using interpolation, looping, or ping-pong.

        Strategies in order: frame interpolation (utils.interpolate_video),
        plain looping when the clip is detected as loopable, then a ping-pong
        (forward/reverse) loop as the final fallback.

        Returns:
            VideoFileClip: The extended clip
        """
        # Try interpolation first
        interpolated = None
        try:
            interpolated_file = utils.interpolate_video(original_clip_path)
            if interpolated_file:
                interpolated = VideoFileClip(interpolated_file)
                if interpolated.duration >= target_duration:
                    logger.debug("Using interpolated clip for extension")
                    result = interpolated.subclip(0, target_duration)
                    logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
                    return result
                # Interpolation wasn't long enough
                logger.debug(f"Interpolated clip ({interpolated.duration:.2f}s) still too short")
                interpolated.close()
        except Exception as e:
            logger.warning(f"Interpolation failed: {e}")
            if interpolated:
                interpolated.close()
        # Try looping if the video is loopable
        if utils.is_video_loopable(original_clip_path) or utils.is_loopable_phash(original_clip_path):
            logger.debug("Original clip is loopable, creating loop extension")
            loop_clip = self.loop_clip(original_clip, target_duration)
            result = loop_clip.subclip(0, target_duration)
            logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
            return result
        elif utils.is_video_zoomable_tail(original_clip):
            # NOTE(review): this call omits zoom_clip's zoom_strength argument,
            # and its result is immediately overwritten by the ping-pong
            # fallback below — confirm the intended behavior.
            loop_clip = self.zoom_clip(original_clip, target_duration)
        # Fallback to ping-pong reverse looping
        logger.debug("Using ping-pong reverse looping as fallback for extension")
        reversed_clip = self.reverse_clip(original_clip_path)
        loop_clip = concatenate_videoclips([original_clip, reversed_clip, original_clip, reversed_clip], method="compose")
        result = loop_clip.subclip(0, target_duration)
        logger.debug(f"Prepared clip duration: {result.duration:.2f}s")
        return result
def reverse_clip(self, clip_path):
reversed_clip = VideoFileClip(utils_reverse_clip(clip_path))
return reversed_clip
def loop_clip(self, clip, target_duration):
loop_count = int(target_duration // clip.duration) + 1 # how many loops needed
looped = [clip] * loop_count
combined = concatenate_videoclips(looped, method="compose")
final_clip = combined.subclip(0, target_duration) # trim to exact duration
logger.debug(f"♻️ Looping clip {loop_count}x to reach {target_duration:.2f}s")
return final_clip
def zoom_clip(self, clip, target_duration, zoom_strength):
# Calculate freeze duration
freeze_duration = target_duration - clip.duration
# Freeze the last frame
frozen_frame = clip.to_ImageClip(t=clip.duration - 0.01).set_duration(freeze_duration)
# Concatenate
extended_clip = concatenate_videoclips([clip, frozen_frame])
# Apply zoom using fl_image (frame-by-frame transformation)
def apply_zoom(get_frame, t):
frame = get_frame(t)
zoom_factor = 1 + (zoom_strength - 1) * (t / target_duration)
h, w = frame.shape[:2]
new_h, new_w = int(h * zoom_factor), int(w * zoom_factor)
# Resize frame
from scipy.ndimage import zoom as scipy_zoom
zoomed_frame = scipy_zoom(frame, (zoom_factor, zoom_factor, 1), order=1)
# Crop to center
y_start = (zoomed_frame.shape[0] - h) // 2
x_start = (zoomed_frame.shape[1] - w) // 2
cropped = zoomed_frame[y_start:y_start+h, x_start:x_start+w]
return cropped
zoomed = extended_clip.fl(apply_zoom)
return zoomed
    async def _create_video_sequence_natural_speed(
        self, video_clips: List[VideoFileClip], video_config: Optional[Dict]
    ) -> VideoFileClip:
        """Create video sequence from clips - NO SLOW MOTION, allow longer duration.

        NOTE(review): the only call site in render_video_without_audio is
        commented out, so this appears unused.  hook_start/hook_end are always
        None here, so every input clip is treated as a library clip.
        """
        try:
            if not video_clips:
                raise ValueError("No video clips available for sequence")
            # MORE FLEXIBLE duration: 11-15 seconds to accommodate natural speed
            TARGET_MIN_DURATION = 11.0
            TARGET_MAX_DURATION = 15.0
            # Identify clips by structure (hooks are currently never set).
            hook_start = None
            hook_end = None
            library_clips = []
            for clip in video_clips:
                library_clips.append(clip)
            # Calculate current library duration
            library_duration = sum(clip.duration for clip in library_clips)
            hook_total = 0
            # NEW: Add more library videos if duration is too short
            if (hook_total + library_duration) < TARGET_MIN_DURATION:
                logger.debug(f"πŸ“₯ Duration too short ({hook_total + library_duration:.2f}s), adding more videos...")
                # We'll handle this by duplicating some clips to reach target
                additional_clips_needed = []
                current_duration = hook_total + library_duration
                while current_duration < TARGET_MIN_DURATION and library_clips:
                    # Add copies of existing clips to reach target
                    for clip in library_clips:
                        if current_duration < TARGET_MIN_DURATION:
                            additional_clips_needed.append(clip)
                            current_duration += clip.duration
                        else:
                            break
                library_clips.extend(additional_clips_needed)
                logger.debug(
                    f"βœ“ Added {len(additional_clips_needed)} additional clips, new duration: {current_duration:.2f}s"
                )
            # NEW: Only speed up if absolutely necessary, never slow down
            total_current_duration = hook_total + sum(clip.duration for clip in library_clips)
            if total_current_duration > TARGET_MAX_DURATION:
                # Only speed up if significantly over target
                speed_factor = total_current_duration / TARGET_MAX_DURATION
                if speed_factor > 1.1: # Only speed up if more than 10% over
                    logger.debug(f"⚑ Slightly speeding up clips by {speed_factor:.2f}x to fit target")
                    adjusted_library = []
                    for clip in library_clips:
                        sped_clip = clip.fx(lambda c: c.speedx(speed_factor))
                        adjusted_library.append(sped_clip)
                    library_clips = adjusted_library
                else:
                    logger.debug("βœ“ Duration within acceptable range, keeping natural speed")
            else:
                logger.debug("βœ… Keeping all videos at natural speed")
            # Build sequence: optional start hook, library clips, optional end hook.
            sequence_clips = []
            if hook_start:
                sequence_clips.append(hook_start)
                logger.debug(f" Added hook_start: {hook_start.duration:.2f}s")
            for i, clip in enumerate(library_clips):
                sequence_clips.append(clip)
                logger.debug(f" Added library_{i}: {clip.duration:.2f}s")
            if hook_end:
                sequence_clips.append(hook_end)
                logger.debug(f" Added hook_end: {hook_end.duration:.2f}s")
            # Calculate total duration
            total_duration = sum(clip.duration for clip in sequence_clips)
            logger.debug(
                f"πŸ“Š Total video sequence duration: {total_duration:.2f}s (target: {TARGET_MIN_DURATION}-{TARGET_MAX_DURATION}s)"
            )
            # FIXED: Remove transition_duration parameter
            final_sequence = concatenate_videoclips(sequence_clips, method="compose")
            logger.debug(f"βœ… Created video sequence with NATURAL SPEED: {final_sequence.duration:.2f}s")
            return final_sequence
        except Exception as e:
            logger.error(f"❌ Failed to create video sequence: {e}")
            raise
    async def _add_timed_subtitles(self, video_path: str, group_all: bool = False, position: int = None) -> str:
        """Add subtitles using timed transcript from Speech-to-Text.

        Overlays word-timed caption clips on the rendered video and writes the
        composited result to a new file.

        Args:
            video_path: Path of the already-rendered (silent) video.
            group_all: Forwarded to the word-grouping helper.
            position: Currently unused in this method.

        Returns:
            Path of the re-rendered, subtitled video file (via _render_video_only).

        Raises:
            ValueError: If no timed transcript is available.
        """
        try:
            assets = get_config_value("visual_assets")
            timed_words = assets.get("timed_transcript", [])
            if not timed_words:
                logger.warning("⚠️ No timed transcript available, using fallback subtitles")
                raise ValueError("No timed transcript available for subtitles")
            logger.debug(f"πŸ“ Creating timed subtitles from {len(timed_words)} words")
            video_clip = VideoFileClip(video_path)
            text_clips = []
            total_duration = video_clip.duration
            target_width, target_height = video_clip.size
            # Group words into phrases based on timing and punctuation
            phrases = group_words_by_time_and_width(timed_words, target_width, target_height, max_words_per_group = 2, group_all=group_all)
            logger.debug(f"πŸ“ Created {len(phrases)} timed phrases")
            for i, phrase in enumerate(phrases):
                word_timings = phrase["word_timings"]
                start_time = phrase["start_time"]
                end_time = phrase["end_time"]
                # Hold each caption until the next phrase starts (or video end)
                # instead of its own end_time, avoiding gaps between captions.
                if i+1 < len(phrases):
                    phrase_duration = phrases[i+1]["start_time"] - start_time
                else:
                    phrase_duration = total_duration - start_time
                text = phrase["text"]
                if phrase_duration <= 0:
                    continue
                text_clip = self._create_styled_text_clip(
                    text=text,
                    duration=phrase_duration,
                    target_width=target_width,
                    target_height=target_height,
                    start_time=start_time,
                    word_timings=word_timings
                )
                if text_clip:
                    text_clips.append(text_clip)
            logger.debug(f"πŸ“Š Created {len(text_clips)} timed subtitle clips")
            if text_clips:
                final_video = CompositeVideoClip([video_clip] + text_clips)
                logger.debug(f"βœ… Added {len(text_clips)} timed subtitle segments")
                return await self._render_video_only(final_video.subclip(0, total_duration))
            else:
                return await self._render_video_only(video_clip)
        except Exception as e:
            logger.error(f"❌ Failed to add timed subtitles: {e}")
            raise
def _create_styled_text_clip(
self, text: str, duration: float, target_width: int, target_height: int, start_time: float, word_timings=None
) -> Optional[TextClip]:
"""Create a styled text clip with IMPROVED appearance"""
try:
# IMPROVED: Slightly smaller font sizes as requested
max_chars_per_line = 22
wrapped_text = "\n".join(textwrap.wrap(text, width=max_chars_per_line))
line_count = len(wrapped_text.split("\n"))
# Adjusted font sizes (smaller to avoid hitting top safe zone)
if line_count > 2:
fontsize = 55 # Reduced from 75 to keep 3 lines lower
elif line_count > 1:
fontsize = 75 # Moderate size for 2 lines
else:
fontsize = 85 # Large size for single line
text_clip = create_text_clip(
text,
duration,
target_width,
target_height,
start_time,
fontsize,
word_timings,
**self.cap_method()
)
return text_clip
# IMPROVED: Increased font weight with thicker stroke
text_clip = TextClip(
txt=wrapped_text,
fontsize=fontsize,
color="white",
font="Fonts/Bungee-Regular.ttf", # Bold font for better weight
stroke_color="black",
stroke_width=5, # Thicker stroke for better readability
method="caption",
size=(int(target_width * 0.90), None), # Slightly narrower
align="center",
)
# IMPROVED: Raised position (higher on screen)
vertical_position = int(target_height * 0.65) # Raised from 0.72 to 0.65
text_clip = text_clip.set_position(("center", vertical_position))
text_clip = text_clip.set_start(start_time)
text_clip = text_clip.set_duration(duration)
# Shorter fade for quicker transitions
fade_duration = min(0.15, duration / 6) # Shorter fades
text_clip = text_clip.crossfadein(fade_duration).crossfadeout(fade_duration)
return text_clip
except Exception as e:
logger.error(f"❌ Failed to create styled text clip: {e}")
raise
def _group_words_into_timed_phrases(self, words: List[Dict]) -> List[Dict]:
"""Group timed words into readable phrases"""
phrases = []
current_phrase = []
current_start = None
for word in words:
word_text = word["word"]
start_time = word["start_time"]
end_time = word["end_time"]
if not current_phrase:
current_start = start_time
current_phrase.append(word_text)
# End phrase on punctuation or after 3-4 words
has_punctuation = any(p in word_text for p in [".", ",", "!", "?", ";", ":"])
too_long = len(current_phrase) >= 4
if has_punctuation or too_long:
phrases.append({"text": " ".join(current_phrase), "start_time": current_start, "end_time": end_time})
current_phrase = []
current_start = None
# Add remaining words
if current_phrase and current_start is not None:
phrases.append(
{
"text": " ".join(current_phrase),
"start_time": current_start,
"end_time": words[-1]["end_time"] if words else current_start + 2.0,
}
)
return phrases
    async def _add_fallback_subtitles(self, video_clip: VideoFileClip, script: str) -> CompositeVideoClip:
        """Fallback subtitles when timed transcript is unavailable.

        Splits the raw script into phrases spread evenly across the full video
        duration and overlays them as styled text clips.
        """
        try:
            words = self._split_script_into_words(script)
            text_clips = []
            total_duration = video_clip.duration
            target_width, target_height = video_clip.size
            logger.debug(f"πŸ“ Using fallback subtitles: {len(words)} words")
            # NOTE(review): the helper receives the raw script string here,
            # while the timed path passes a list of word dicts — confirm
            # group_words_by_time_and_width supports both input shapes.
            phrases = group_words_by_time_and_width(script, target_width, target_height, max_words_per_group=self.max_words_per_group)
            logger.debug(f"πŸ“ Grouped into {len(phrases)} phrases")
            # Distribute phrases uniformly over the video duration.
            duration_per_phrase = total_duration / len(phrases)
            for i, phrase in enumerate(phrases):
                start_time = i * duration_per_phrase
                phrase_duration = duration_per_phrase
                text_clip = self._create_styled_text_clip(
                    text=phrase,
                    duration=phrase_duration,
                    target_width=target_width,
                    target_height=target_height,
                    start_time=start_time,
                )
                if text_clip:
                    text_clips.append(text_clip)
            logger.debug(f"πŸ“Š Created {len(text_clips)} fallback subtitle clips")
            if text_clips:
                final_video = CompositeVideoClip([video_clip] + text_clips)
                logger.debug(f"βœ… Added {len(text_clips)} fallback subtitle segments")
                return final_video
            else:
                return video_clip
        except Exception as e:
            logger.error(f"❌ Fallback subtitles failed: {e}")
            raise
def _split_script_into_words(self, script: str) -> List[str]:
"""Split script into individual words"""
import re
script = re.sub(r"\s+", " ", script).strip()
return script.split()
def _group_words_into_phrases(self, words: List[str], max_words: int = 3) -> List[str]:
"""Group words into small readable phrases"""
phrases = []
current_phrase = []
for word in words:
current_phrase.append(word)
has_punctuation = any(p in word for p in [".", ",", "!", "?", ";"])
if len(current_phrase) >= max_words or has_punctuation:
phrases.append(" ".join(current_phrase))
current_phrase = []
if current_phrase:
phrases.append(" ".join(current_phrase))
return phrases
    async def add_audio_to_video(self, video_path: str) -> str:
        """
        Add audio track to pre-rendered video (NO speedup - video is already correct duration)

        Args:
            video_path: Path to the already-rendered, silent video file.

        Returns:
            Path to the final rendered video with audio.
        """
        try:
            assets = get_config_value("visual_assets")
            logger.debug("πŸ”Š Adding audio to rendered video")
            # Load the video
            video_clip = VideoFileClip(video_path)
            # Prepare audio clips (TTS + background music), trimmed to the video length.
            audio_clips = await self._prepare_audio_clips(assets, video_clip.duration)
            # Add audio track
            video_with_audio = await self._add_audio_track(video_clip, audio_clips)
            # render_video_final() also closes the clip it writes.
            output_path = await self.render_video_final(video_with_audio)
            # Cleanup (skip the double-close when no audio was attached and the
            # same clip object was returned).
            video_clip.close()
            if video_with_audio != video_clip:
                video_with_audio.close()
            logger.debug(f"βœ… Final video with audio: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"❌ Failed to add audio to video: {e}")
            raise
def get_audio_rms(self, audio_clip, sample_duration=1.0):
"""Calculate RMS of an audio clip by sampling the first few seconds."""
try:
duration = min(sample_duration, audio_clip.duration)
audio_segment = audio_clip.subclip(0, duration)
audio_array = audio_segment.to_soundarray(fps=44100)
return np.sqrt(np.mean(audio_array ** 2))
except Exception as e:
logger.warning(f"⚠️ Failed to compute RMS: {e}")
return 0.0
def calculate_bg_volume(self, main_rms, bg_rms):
"""
Dynamically calculate background music volume based on TTS and background RMS.
Tuned for boosted TTS (+5 dB from Google TTS).
If main_rms is 0 (no TTS), treat background as main audio with high volume.
"""
# If no main audio (TTS), treat background as primary audio
if main_rms == 0 or main_rms < 0.001:
logger.debug("πŸ”Š No main audio detected, treating background as primary audio with high volume")
return 0.85 # High volume for background as main audio
# Base volume curve (a bit higher overall)
if main_rms > 0.04: # Very strong TTS
base_volume = 0.35
elif main_rms > 0.02: # Normal TTS
base_volume = 0.45
else: # Soft TTS
base_volume = 0.55
# Adjust further based on background loudness
if bg_rms > 0.15: # Very loud music file
bg_volume = base_volume * 0.25
elif bg_rms > 0.08:
bg_volume = base_volume * 0.45
elif bg_rms > 0.03:
bg_volume = base_volume * 0.7
elif bg_rms > 0.01:
bg_volume = base_volume * 0.9
else:
bg_volume = base_volume * 1.1
# return max(ALLOWED_BG_MUSIC_VOLUME, min(1.0, bg_volume))
return max(0.0, min(bg_volume, ALLOWED_BG_MUSIC_VOLUME))
    async def _prepare_audio_clips(self, assets: Dict, target_duration: float) -> List[AudioFileClip]:
        """
        Load TTS and background music, trim to match video, and dynamically adjust background volume.

        Args:
            assets: Visual-assets config dict ("tts_audio_data", "background_music_local").
            target_duration: Video duration in seconds; audio is trimmed to it.

        Returns:
            List of ready-to-mix AudioFileClip objects (0, 1 or 2 entries).
        """
        clips = []
        try:
            # --- Load TTS Audio (Main Voice) ---
            tts_clip = None
            if assets.get("tts_audio_data") and assets["tts_audio_data"].get("local_path"):
                try:
                    tts_clip = AudioFileClip(assets["tts_audio_data"]["local_path"])
                    if tts_clip.duration > 0:
                        if tts_clip.duration > target_duration:
                            logger.debug(f"⚠️ Trimming TTS: {tts_clip.duration:.2f}s β†’ {target_duration:.2f}s")
                            tts_clip = tts_clip.subclip(0, target_duration)
                        elif tts_clip.duration < target_duration:
                            # Shorter TTS is left as-is; the video simply runs longer.
                            logger.debug(f"⚠️ TTS shorter: {tts_clip.duration:.2f}s < {target_duration:.2f}s")
                        clips.append(("tts", tts_clip))
                        logger.debug(f"βœ“ Loaded TTS at full volume ({tts_clip.duration:.2f}s)")
                    else:
                        logger.warning("⚠️ TTS audio has zero duration")
                        tts_clip.close()
                except Exception as e:
                    logger.error(f"❌ Failed to load TTS audio: {e}")
                    raise
            # --- Load Background Music (Dynamic Volume) ---
            if assets.get("background_music_local"):
                try:
                    bg_clip = AudioFileClip(assets["background_music_local"])
                    if bg_clip.duration > 0:
                        if bg_clip.duration > target_duration:
                            bg_clip = bg_clip.subclip(0, target_duration)
                            logger.debug(f"βœ“ Trimmed background to {target_duration:.2f}s")
                        # Compute RMS levels (TTS RMS is 0.0 when no voice track).
                        tts_rms = self.get_audio_rms(tts_clip) if tts_clip else 0.0
                        bg_rms = self.get_audio_rms(bg_clip)
                        # Dynamic volume adjustment
                        bg_volume = self.calculate_bg_volume(tts_rms, bg_rms)
                        bg_clip = bg_clip.volumex(bg_volume)
                        clips.append(("background", bg_clip))
                        logger.debug(
                            f"βœ“ Loaded background (RMS: {bg_rms:.4f}) at dynamic volume {bg_volume:.2f} ({bg_clip.duration:.2f}s)"
                        )
                    else:
                        logger.warning("⚠️ Background music has zero duration")
                        bg_clip.close()
                except Exception as e:
                    logger.error(f"❌ Failed to load background music: {e}")
                    raise
            return [clip for _, clip in clips]
        except Exception as e:
            logger.error(f"❌ Failed to prepare audio clips: {e}")
            # Close anything opened before re-raising.
            for name, clip in clips:
                try:
                    clip.close()
                except:
                    pass
            raise
async def _add_audio_track(self, video_clip: VideoFileClip, audio_clips: List[AudioFileClip]) -> VideoFileClip:
    """Mix the given audio clips and attach them to the video, trimmed to the shorter duration."""
    if not audio_clips:
        return video_clip
    try:
        usable = [c for c in audio_clips if c.duration > 0]
        if not usable:
            return video_clip
        mix = CompositeAudioClip(usable)
        cutoff = min(video_clip.duration, mix.duration)
        mix = mix.subclip(0, cutoff)
        result = video_clip.set_audio(mix)
        logger.debug(f"βœ… Added audio track")
        return result
    except Exception as e:
        logger.error(f"❌ Failed to add audio track: {e}")
        raise
async def render_video_final(self, video_clip) -> str:
    """Encode the final composited clip (with audio) to an mp4 in the temp dir.

    Args:
        video_clip: MoviePy clip ready for export; it is always closed,
            even when encoding fails.

    Returns:
        Path (str) of the rendered mp4 file.

    Raises:
        Exception: Propagates any encoding failure after logging it.
    """
    output_path = self.temp_dir / f"{uuid.uuid4().hex}.mp4"
    try:
        video_clip.write_videofile(
            str(output_path),
            codec="libx264",
            audio_codec="aac",
            fps=25,
            verbose=False,
            logger=None,
            ffmpeg_params=["-pix_fmt", "yuv420p"]
        )
        return str(output_path)
    except Exception as e:
        logger.error(f"Final video render failed: {e}")
        raise
    finally:
        # `video_clip` is a parameter, so the previous `"video_clip" in locals()`
        # guard was always true; close unconditionally on every exit path.
        video_clip.close()
async def _render_video_only(self, video_clip: VideoFileClip) -> str:
    """Render the clip to an mp4 WITHOUT an audio stream.

    Args:
        video_clip: MoviePy clip to export; always closed afterwards.

    Returns:
        Path (str) of the written file.
    """
    unique_id = uuid.uuid4().hex[:8]
    filename = f"video_no_audio_{unique_id}.mp4"
    output_path = self.temp_dir / filename
    try:
        # Bug fix: the message previously hard-coded the literal "(unknown)";
        # log the actual target path instead.
        logger.debug(f"πŸ“Ή Rendering video (no audio): {output_path}")
        video_clip.write_videofile(
            str(output_path),
            codec="libx264",
            fps=25,
            verbose=False,
            logger=None,
            ffmpeg_params=["-pix_fmt", "yuv420p"]
        )
        return str(output_path)
    except Exception as e:
        logger.error(f"❌ Video rendering failed: {e}")
        raise
    finally:
        video_clip.close()
def compress(self, input_path: str):
    """
    Compress video without losing visible quality. Keeps full HD resolution.

    Args:
        input_path: Path of the mp4 to compress.

    Returns:
        Path of the compressed file, or the original ``input_path`` when
        compression fails (best-effort contract preserved for callers).
    """
    try:
        stem = Path(input_path).stem
        output_path = f"/tmp/{stem}_compressed.mp4"
        # Use CRF-based quality control
        # - CRF 23–28: lower is higher quality (and larger size)
        # - Preset 'slow' gives better compression ratio
        cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-vf", "scale=1080:1920",      # keep full HD
            "-c:v", "libx264",
            "-preset", "slow",
            "-crf", "26",                  # try 26; adjust 23–28 if too large/small
            "-c:a", "aac",
            "-b:a", "128k",                # clean, compact audio
            "-movflags", "+faststart",     # better for web playback
            output_path
        ]
        logger.debug(f"🎞️ Compressing {input_path} β†’ {output_path}")
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        logger.debug(f"βœ… Compressed to {size_mb:.2f} MB at Full HD")
        return output_path
    except subprocess.CalledProcessError as e:
        # stderr was captured via PIPE but never surfaced before; include
        # ffmpeg's own diagnostics so failures are actually debuggable.
        detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
        logger.error(f"❌ Compression failed: {detail}")
        return input_path
    except Exception as e:
        logger.error(f"❌ Compression failed: {str(e)}")
        return input_path
def _get_caption_style(self):
    """Resolve, cache, and return the (caption_renderer, max_words_per_group) pair.

    The chosen style is stored under config key ``current_caption_style`` so
    repeated calls (and other components) see the same selection.
    """
    if not get_config_value("current_caption_style"):
        caption_style = get_config_value("caption_style", "random")
        styles = {
            "caption_1": (caption_style_1, 3),
            "caption_style_1": (caption_style_1, 3),
            "caption_2": (caption_style_2, 3),
            "caption_style_2": (caption_style_2, 3),
            "caption_3": (caption_style_3, 3),
            "caption_style_3": (caption_style_3, 3),
            "caption_4": (caption_style_4, 3),
            "caption_style_4": (caption_style_4, 3),
            "caption_style_on_screen_text": (caption_style_on_screen_text, 3),
            "caption_style_on_screen_text_top": (caption_style_on_screen_text_top, 3),
        }
        # Bug fix: previously the "random" branch's set_config_value was
        # immediately overwritten by an unconditional second call. Pick the
        # style exactly once, then store it.
        if caption_style == "random":
            chosen = random.choice(list(styles.values()))
        else:
            # Unknown style names also fall back to a random choice.
            chosen = styles.get(caption_style, random.choice(list(styles.values())))
        set_config_value("current_caption_style", chosen)
    logger.debug(f'🎨 Selected caption style: {get_config_value("current_caption_style")[0].__name__}')
    return get_config_value("current_caption_style")
async def render_video_cuts(
    self,
    music_duration: float,
    beat_times: list = None,
    interval: float = None,
    min_clip_duration: float = 0.0,
    loop_short_videos: bool = True,
) -> str:
    """
    Unified memory-efficient video renderer:
    - Supports both beat-synced (beat_times) and interval-based (interval) modes
    - Save each clip to temp file immediately
    - Use FFmpeg concat demuxer to merge (constant memory)
    Args:
        music_duration: Total duration of the output video
        beat_times: Array of beat timestamps (for beat-synced mode)
        interval: Fixed interval between cuts (for hard-cut mode)
        min_clip_duration: Minimum duration for a clip (skips shorter beats)
        loop_short_videos: If True, loop videos shorter than 4s using reverse
    Either beat_times OR interval must be provided, not both.

    Returns:
        Path (str) of the merged, audio-less mp4 in the renderer temp dir.
    """
    import subprocess
    # Validate inputs
    if beat_times is None and interval is None:
        raise ValueError("Either beat_times or interval must be provided")
    # If interval is provided, generate synthetic beat times
    if interval is not None:
        beat_times = []
        t = 0.0
        while t <= music_duration:
            beat_times.append(t)
            t += interval
        # Guarantee the last segment reaches the end of the music.
        if beat_times[-1] < music_duration:
            beat_times.append(music_duration)
        beat_times = np.array(beat_times)
        logger.debug(f"Generated {len(beat_times)} beats at {interval}s intervals")
    else:
        # Ensure first beat is at 0
        if len(beat_times) > 0 and beat_times[0] > 0.0001:
            beat_times = np.insert(beat_times, 0, 0.0)
            logger.debug(f"⚑ Inserted virtual beat at 0.0s for intro")
    if len(beat_times) < 2:
        raise ValueError("Need at least 2 beat times")
    temp_clips = []
    video_idx = 0
    # Time "owed" to the timeline: grows when a beat interval is skipped
    # (shorter than min_clip_duration) or when a source video cannot fill
    # its full slot; the next clip absorbs it.
    accumulated_deficit = 0.0
    videos = get_config_value("visual_assets")["selected_videos"]
    logger.debug(f"Creating video synced to {len(beat_times)} beats")
    logger.debug(f"Music duration: {music_duration:.2f}s")
    try:
        i = 0
        while i < len(beat_times) - 1:
            # Stop when we run out of source videos or pass the music end
            # (+2s grace so the final segment is not cut early).
            if video_idx >= len(videos) or beat_times[i] > music_duration + 2:
                break
            required_duration = beat_times[i + 1] - beat_times[i]
            target_duration = required_duration + accumulated_deficit
            if target_duration < min_clip_duration and min_clip_duration > 0:
                # Too short to use on its own; roll it into the next beat.
                accumulated_deficit = target_duration
                i += 1
                continue
            video_path = videos[video_idx % len(videos)]
            video_filename = os.path.basename(video_path)
            try:
                # Get video duration
                probe_cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                             "-of", "default=noprint_wrappers=1:nokey=1", video_path]
                probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10)
                # Fallback if the probe fails: assume the source is long enough.
                video_duration_src = target_duration + 1
                if probe_result.returncode == 0 and probe_result.stdout.strip():
                    try:
                        video_duration_src = float(probe_result.stdout.strip())
                    except ValueError:
                        pass
                temp_clip_path = os.path.abspath(str(self.temp_dir / f"clip_{video_idx+1:03d}.mp4"))
                # Determine filter
                # IMPORTANT: All clips must have identical properties to avoid black frames during concat
                # - setsar=1:1 ensures consistent sample aspect ratio
                # - format=yuv420p ensures consistent pixel format
                # - fps=25 ensures consistent frame rate
                if loop_short_videos and video_duration_src < 4:
                    # Ping-pong loop (Forward-Reverse-Forward-Reverse) -> 4x duration
                    vf_filter = (
                        "[0:v]split=2[a][b];[b]reverse[br];[a][br]concat=n=2:v=1:a=0[loop1];"
                        "[loop1]split=2[c][d];[d]reverse[dr];[c][dr]concat=n=2:v=1:a=0[looped];"
                        "[looped]setpts=PTS-STARTPTS,"
                        "scale=1080:1920:force_original_aspect_ratio=increase,"
                        "crop=1080:1920,setsar=1:1,format=yuv420p[out]"
                    )
                    use_filter_complex = True
                    # Allow utilizing the full 4x duration if needed
                    max_possible = video_duration_src * 4
                    trim_duration = min(target_duration, max_possible)
                elif video_duration_src < target_duration:
                    # Source shorter than the slot: repeat it enough times to cover.
                    loop_count = int(target_duration / video_duration_src) + 1
                    vf_filter = f"loop={loop_count}:size=999:start=0,setpts=PTS-STARTPTS,scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920,setsar=1:1,format=yuv420p"
                    use_filter_complex = False
                    trim_duration = target_duration
                else:
                    vf_filter = "setpts=PTS-STARTPTS,scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920,setsar=1:1,format=yuv420p"
                    use_filter_complex = False
                    trim_duration = min(target_duration, video_duration_src)
                if use_filter_complex:
                    cmd = ["ffmpeg", "-y", "-i", video_path, "-filter_complex", vf_filter,
                           "-map", "[out]", "-t", str(trim_duration), "-c:v", "libx264",
                           "-preset", "ultrafast", "-r", "25", "-pix_fmt", "yuv420p",
                           "-video_track_timescale", "12800", "-an", temp_clip_path]
                else:
                    cmd = ["ffmpeg", "-y", "-i", video_path, "-t", str(trim_duration),
                           "-vf", vf_filter, "-c:v", "libx264", "-preset", "ultrafast",
                           "-r", "25", "-pix_fmt", "yuv420p",
                           "-video_track_timescale", "12800", "-an", temp_clip_path]
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
                if result.returncode != 0:
                    logger.error(f"FFmpeg error for clip {video_idx+1}: {result.stderr}")
                    video_idx += 1
                    i += 1
                    continue
                temp_clips.append(temp_clip_path)
                # Anything this clip could not cover is owed to the next one.
                accumulated_deficit = max(0, target_duration - trim_duration)
                total_time = sum(beat_times[j+1] - beat_times[j] for j in range(i+1)) if i < len(beat_times) - 1 else beat_times[i]
                logger.debug(f"βœ‚οΈ CUT {video_idx+1}: Used {trim_duration:.2f}s from {video_filename} | Total time: {total_time:.2f}s")
                video_idx += 1
                i += 1
            except subprocess.TimeoutExpired:
                logger.error(f"Timeout processing {video_path}, skipping...")
                video_idx += 1
                i += 1
            except Exception as e:
                logger.error(f"Error processing video {video_idx}: {e}")
                video_idx += 1
                i += 1
        if not temp_clips:
            raise ValueError("No clips created")
        # Build the concat-demuxer list file referencing every encoded clip.
        concat_file_path = os.path.abspath(str(self.temp_dir / "concat_list.txt"))
        with open(concat_file_path, "w") as f:
            for clip_path in temp_clips:
                f.write(f"file '{clip_path}'\n")
        output_path = os.path.abspath(str(self.temp_dir / f"merged_{uuid.uuid4().hex[:8]}.mp4"))
        # Use stream copy since all clips are already encoded with identical properties
        # This avoids re-encoding artifacts and timing issues that cause black frames
        concat_cmd = [
            "ffmpeg", "-y",
            "-fflags", "+genpts",  # Generate fresh PTS for clean concatenation
            "-f", "concat", "-safe", "0", "-i", concat_file_path,
            "-c", "copy",  # Stream copy - no re-encoding
            "-avoid_negative_ts", "make_zero",  # Fix timestamp issues at clip boundaries
            "-t", str(music_duration),
            "-an",
            output_path
        ]
        logger.debug(f"🎬 Merging {len(temp_clips)} clips...")
        result = subprocess.run(concat_cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            logger.error(f"FFmpeg concat error: {result.stderr}")
            raise ValueError(f"Failed to merge clips: {result.stderr}")
        logger.debug(f"βœ… Merged video saved: {output_path}")
        return output_path
    finally:
        # Clean up temp clips
        for clip_path in temp_clips:
            try:
                if os.path.exists(clip_path):
                    os.remove(clip_path)
            except:
                pass
        try:
            if 'concat_file_path' in locals() and os.path.exists(concat_file_path):
                os.remove(concat_file_path)
        except:
            pass
def _validate_assets_for_video_only(self) -> bool:
    """Return True when the configured assets are sufficient for a video-only render."""
    assets = get_config_value("visual_assets")
    if not assets.get("selected_videos"):
        logger.error("No selected videos provided")
        return False
    # Hook video is OPTIONAL for VoiceOver pipeline
    hook = assets.get("hook_video")
    if hook and not hook.get("local_path"):
        logger.error("Hook video provided but missing local_path")
        return False
    # Check that at least some library videos have local_path
    has_local = any(v.get("local_path") for v in assets.get("selected_videos", []))
    if not has_local:
        logger.error("No library videos with local_path")
        return False
    return True
def _cleanup_temp_files(self, clips: List):
"""Clean up temporary video/audio clips"""
for clip in clips:
try:
if hasattr(clip, "close"):
clip.close()
except Exception as e:
# Silently ignore cleanup errors
pass
def __del__(self):
    """Remove the renderer's temp directory unless test automation is active."""
    try:
        import shutil
        tmp = getattr(self, "temp_dir", None)
        if tmp is not None and tmp.exists() and not get_config_value("test_automation"):
            shutil.rmtree(tmp, ignore_errors=True)
    except Exception:
        # A destructor must never raise.
        pass