""" Production video rendering engine with improved captions and natural speed """ # FIX FOR PIL ANTIALIAS ISSUE import PIL.Image if not hasattr(PIL.Image, "ANTIALIAS"): PIL.Image.ANTIALIAS = PIL.Image.LANCZOS import os import tempfile import uuid from typing import List, Dict, Optional from pathlib import Path from moviepy.editor import ( VideoFileClip, AudioFileClip, CompositeVideoClip, concatenate_videoclips, TextClip, CompositeAudioClip, vfx ) import textwrap from src.logger_config import logger from src.utils import reverse_clip as utils_reverse_clip, get_temp_dir import src.utils as utils import time from video_editor.text_clip import create as create_text_clip, group_words_by_time_and_width, caption_style_1, caption_style_2, caption_style_3, caption_style_4, caption_style_on_screen_text, caption_style_on_screen_text_top import subprocess import asyncio import random from moviepy.video.fx import crop from src.config import get_config_value, set_config_value import numpy as np ALLOWED_BG_MUSIC_VOLUME = 0.08 REVERSE_THRESHOLD = 0.5 HOOK_VIDEO_DURATION = 1.5 HOOK_START_ORIGINAL_CLIP_USED = 0 class VideoRenderer: def __init__(self): self.cap_method, self.max_words_per_group = self._get_caption_style() self.temp_dir = get_temp_dir(prefix="video_renderer_").resolve() logger.debug(f"Initialized VideoRenderer with temp dir: {self.temp_dir}") async def render_video_without_audio(self, video_config: Optional[Dict] = None) -> tuple[str, float]: """ Render video composition WITHOUT audio and WITHOUT slow-motion """ try: assets = get_config_value("visual_assets") logger.debug("🎬 Starting video rendering (NO slow-motion)") if not self._validate_assets_for_video_only(): raise ValueError("Invalid assets provided for video rendering") # Prepare video clips - NO speed adjustments for natural speed video_clips = await self._prepare_video_clips_natural_speed() # Create video sequence with natural speed # final_video = await self._create_video_sequence_natural_speed(video_clips, video_config) final_video = concatenate_videoclips(video_clips, method="compose") final_video = final_video.without_audio() # Render video WITHOUT audio output_path = await self._render_video_only(final_video) video_duration = final_video.duration # Cleanup self._cleanup_temp_files(video_clips + [final_video]) logger.debug(f"✅ Video (no audio) rendered: {output_path}, duration: {video_duration:.2f}s") return output_path, video_duration except Exception as e: logger.error(f"❌ Video rendering (no audio) failed: {e}") raise async def _prepare_video_clips_natural_speed(self) -> List[VideoFileClip]: """Load and prepare all video clips - NO speed adjustments""" clips = [] global HOOK_START_ORIGINAL_CLIP_USED global HOOK_VIDEO_DURATION try: assets = get_config_value("visual_assets") # Load hook video for seamless looping (OPTIONAL) runway_as_second_ai_video = None hook_start = None hook_end = None if assets.get("hook_video") and assets["hook_video"].get("local_path"): HOOK_VIDEO_DURATION = 1.5 if get_config_value("use_veo", False): HOOK_VIDEO_DURATION = 2 hook_clip = VideoFileClip(assets["hook_video"]["veo_video_data"]["local_path"]) runway_as_second_ai_video = VideoFileClip(assets["hook_video"]["local_path"]).subclip(0, 2) else: hook_clip = VideoFileClip(assets["hook_video"]["local_path"]) hook_duration = hook_clip.duration hook_clip = hook_clip.without_audio() logger.debug(f"🔄 Creating seamless loop from {hook_duration:.2f}s hook video (NATURAL SPEED)") # Last 1.5 seconds for start start_segment_begin = max(0, hook_duration - HOOK_VIDEO_DURATION) hook_start = hook_clip.subclip(start_segment_begin, hook_duration) logger.debug(f"✓ Hook start: {hook_start.duration:.2f}s") # First 1.5 seconds for end hook_end_duration = min(HOOK_VIDEO_DURATION, hook_duration) hook_end = hook_clip.subclip(0, hook_end_duration) logger.debug(f"✓ Hook end: {hook_end.duration:.2f}s") hook_clip.close() else: # No hook - just concatenate videos directly logger.debug("📹 No hook video - concatenating segments directly") # Combine all tts_script_segment texts into one string selected_videos = assets["selected_videos"] all_tts_script_segment = " ".join( v.get("tts_script_segment", "").strip() for v in selected_videos if v.get("tts_script_segment") ) with AudioFileClip(assets["tts_audio_data"]["local_path"]) as audio_clip: original_duration = audio_clip.duration utils.calculate_video_durations(selected_videos, all_tts_script_segment, assets["timed_transcript"], original_duration) target_size = (1080, 1920) extra_secs = 0.0 HOOK_START_ORIGINAL_CLIP_USED = 0 # Load library videos - NO speed adjustments for i, lib_video in enumerate(selected_videos): if lib_video.get("local_path"): try: lib_clip = VideoFileClip(lib_video["local_path"]) original_clip = lib_clip lib_hook_start = None lib_hook_end = None prev_clip = None prev_clip_file = None # Only apply hook logic if hook is available if hook_start and i == 0: lib_hook_start = hook_start original_clip = runway_as_second_ai_video if runway_as_second_ai_video else original_clip if i == 1 and get_config_value("use_veo", False) and runway_as_second_ai_video: if HOOK_START_ORIGINAL_CLIP_USED < runway_as_second_ai_video.duration-0.5: original_clip = runway_as_second_ai_video.subclip(HOOK_START_ORIGINAL_CLIP_USED, runway_as_second_ai_video.duration) if hook_end and i+1 == len(assets.get("selected_videos", [])): lib_hook_end = hook_end if len(clips) > 0: prev_clip = clips[-1][1] prev_clip_file = selected_videos[-2]["local_path"] if len(selected_videos) > 1 else None prev_clip, lib_clip, extra_secs = await self._prepare_clip( lib_video=lib_video, original_clip_path=lib_video["local_path"], alternate_url_local_path=lib_video.get("alternate_url_local_path"), original_clip=original_clip, lib_hook_start=lib_hook_start, lib_hook_end=lib_hook_end, target_duration=lib_video["duration"], extra_secs=extra_secs, prev_clip=prev_clip, prev_clip_file=prev_clip_file ) if extra_secs > 0: # ignore tiny floating-point diffs logger.debug(f"⏱️ Added {extra_secs:.2f}s extra to match target duration ({lib_video['duration']:.2f}s)") if prev_clip and len(clips) > 0: clip_name, _ = clips[-1] clips[-1] = (clip_name, prev_clip) lib_clip = lib_clip.without_audio() clips.append((f"library_{i}", lib_clip)) video_usage = get_config_value("video_usage_count", {}) video_usage[lib_video['url']] = video_usage.get(lib_video['url'], 0) + 1 set_config_value("video_usage_count", video_usage) logger.debug(f"✓ Loaded library video {i}: {lib_clip.duration:.2f}s (NATURAL SPEED)") except Exception as e: import traceback traceback.print_exc() logger.error(f"❌ Failed to load library video {i}: {e}") raise else: logger.warning(f"⚠️ Library video {i} missing local_path") return [clip for _, clip in clips] except Exception as e: logger.error(f"❌ Failed to prepare video clips: {e}") for name, clip in clips: try: clip.close() except: pass raise async def _prepare_clip(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, lib_hook_end, target_duration: float, extra_secs, prev_clip, prev_clip_file): # Validate inputs if target_duration <= 0: raise ValueError(f"Invalid target_duration: {target_duration}") if not original_clip_path or not original_clip: raise ValueError("Missing required clip parameters") logger.debug(f"Preparing clip for target duration {target_duration:.2f}s") # Handle start hook case if lib_hook_start: return self._prepare_with_start_hook( lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, target_duration, prev_clip ) # Handle end hook case elif lib_hook_end: return self._prepare_with_end_hook( lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_end, target_duration, extra_secs, prev_clip, prev_clip_file ) # No hooks - just extend/trim the original clip else: logger.debug("No hooks detected, adjusting original clip duration only") result, extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration) return prev_clip, result, extra_secs def _prepare_with_start_hook(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_start, target_duration, prev_clip): """Handle clip preparation when a start hook is present.""" global HOOK_START_ORIGINAL_CLIP_USED logger.debug(f"Start hook detected with duration {lib_hook_start.duration:.2f}s") total_duration = lib_hook_start.duration + original_clip.duration # Case 1: Target fits within start hook + original clip if target_duration <= total_duration: logger.debug("Target duration fits start hook + original clip, concatenating and trimming") result = concatenate_videoclips([lib_hook_start, original_clip], method="compose").subclip(0, target_duration) logger.debug(f"Prepared clip duration: {result.duration:.2f}s") HOOK_START_ORIGINAL_CLIP_USED = max(0, target_duration - lib_hook_start.duration) return prev_clip, result, 0.0 # Case 2: Need to extend beyond original clip modified_clip, extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration-HOOK_VIDEO_DURATION) result = concatenate_videoclips([lib_hook_start, modified_clip], method="compose").subclip(0, target_duration) HOOK_START_ORIGINAL_CLIP_USED = max(0, target_duration - lib_hook_start.duration) logger.debug(HOOK_START_ORIGINAL_CLIP_USED) logger.debug(f"Prepared clip duration: {result.duration:.2f}s") return prev_clip, result, extra_secs def _prepare_with_end_hook(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, lib_hook_end, target_duration, extra_secs, prev_clip, prev_clip_file): """Handle clip preparation when an end hook is present.""" temp_original_clip = original_clip logger.debug(f"End hook detected with duration {lib_hook_end.duration:.2f}s") total_duration = original_clip.duration + lib_hook_end.duration logger.debug(f"Combined original + end hook duration: {total_duration:.2f}s") cur_extra_secs = 0.0 # Case 1: Combined duration exceeds target - need to trim if target_duration <= total_duration: trim_duration = target_duration - lib_hook_end.duration if trim_duration > 0: logger.debug(f"Trimming original clip from {original_clip.duration:.2f}s to {trim_duration:.2f}s to fit end hook") original_clip = original_clip.subclip(0, trim_duration) cur_extra_secs = 0.0 else: # Target shorter than hook → take last part of hook start_trim = max(0, lib_hook_end.duration - target_duration) result = lib_hook_end.subclip(start_trim, lib_hook_end.duration) logger.debug(f"Prepared end-only clip: {result.duration:.2f}s") return prev_clip, result, 0.0 # Case 2: Combined duration is less than target - need to extend original elif target_duration > total_duration: remaining = target_duration - lib_hook_end.duration logger.debug(f"Original + end hook too short, need to extend original by {remaining:.2f}s") original_clip, cur_extra_secs = self._extend_or_trim_clip(lib_video, original_clip_path, alternate_url_local_path, original_clip, remaining) # Case 3: Exact match or after trimming/extending logger.debug("Concatenating original clip and end hook") # Handle very short original clips if original_clip.duration < 1: if original_clip.duration + extra_secs > 1: # Determine how much of extra_secs is actually used to extend this clip possible_new_duration = original_clip.duration + extra_secs new_duration = min(possible_new_duration, temp_original_clip.duration) used_extra = max(0.0, new_duration - original_clip.duration) logger.debug( f"Extending original clip from {original_clip.duration:.2f}s → {new_duration:.2f}s " f"(used_extra={used_extra:.2f}s, available_extra={extra_secs:.2f}s)" ) # Apply the extension original_clip = temp_original_clip.subclip(0, new_duration) # Now, trim the previous clip by exactly how much we actually used new_prev_duration = prev_clip.duration - used_extra logger.debug( f"✂️ Trimming previous clip by {used_extra:.2f}s → new duration {new_prev_duration:.2f}s" ) prev_clip = prev_clip.subclip(0, new_prev_duration) result = concatenate_videoclips([original_clip, lib_hook_end], method="compose").subclip(0, target_duration) cur_extra_secs = 0.0 else: if prev_clip: logger.debug("⚠️ Original clip too short, extending previous clip instead") prev_clip, extra_secs = self._extend_or_trim_clip(lib_video, None, alternate_url_local_path, prev_clip, prev_clip.duration + original_clip.duration) result = lib_hook_end.subclip(max(0, lib_hook_end.duration - target_duration), lib_hook_end.duration) else: result = concatenate_videoclips([original_clip, lib_hook_end], method="compose").subclip(0, target_duration) logger.debug(f"Prepared clip duration: {result.duration:.2f}s") return prev_clip, result, cur_extra_secs def _extend_or_trim_clip(self, lib_video, original_clip_path, alternate_url_local_path, original_clip, target_duration): """ Extend or trim a clip to match target duration. Returns: VideoFileClip: The adjusted clip """ total_duration = original_clip.duration # Case 0: Equal if abs(target_duration - total_duration) < 0.01: # 10ms tolerance return original_clip, 0.0 # Case 1: Target is less than or equal to clip duration if target_duration <= total_duration: logger.debug("Target duration fits original clip, trimming") return original_clip.subclip(0, target_duration), 0.0 # Case 2: Target is greater than clip duration elif target_duration > total_duration: if alternate_url_local_path is None or (target_duration - total_duration <= REVERSE_THRESHOLD): # Small tolerance for floating point logger.debug("⚠️ Reversing clip.") # if original_clip_path: # reversed_clip = self.reverse_clip(original_clip_path) # else: reversed_clip = self.reverse_clip(original_clip) loop_clip = concatenate_videoclips([original_clip, reversed_clip, original_clip, reversed_clip], method="compose") return loop_clip.subclip(0, target_duration), target_duration - original_clip.duration else: logger.debug("⚠️ Using extra clip.") video_usage = get_config_value("video_usage_count", {}) video_usage[lib_video['alternate_url']] = video_usage.get(lib_video['alternate_url'], 0) + 1 set_config_value("video_usage_count", video_usage) alternate_clip = VideoFileClip(alternate_url_local_path) reverse_alternate_clip = self.reverse_clip(alternate_url_local_path) combined = concatenate_videoclips([original_clip, alternate_clip, reverse_alternate_clip, original_clip], method="compose") result = combined.subclip(0, target_duration) extra_secs = max(0.0, target_duration - original_clip.duration - alternate_clip.duration) return result, extra_secs def _extend_clip_to_duration(self, original_clip_path, original_clip, target_duration): """ Extend a clip to target duration using interpolation, looping, or ping-pong. Returns: VideoFileClip: The extended clip """ # Try interpolation first interpolated = None try: interpolated_file = utils.interpolate_video(original_clip_path) if interpolated_file: interpolated = VideoFileClip(interpolated_file) if interpolated.duration >= target_duration: logger.debug("Using interpolated clip for extension") result = interpolated.subclip(0, target_duration) logger.debug(f"Prepared clip duration: {result.duration:.2f}s") return result # Interpolation wasn't long enough logger.debug(f"Interpolated clip ({interpolated.duration:.2f}s) still too short") interpolated.close() except Exception as e: logger.warning(f"Interpolation failed: {e}") if interpolated: interpolated.close() # Try looping if the video is loopable if utils.is_video_loopable(original_clip_path) or utils.is_loopable_phash(original_clip_path): logger.debug("Original clip is loopable, creating loop extension") loop_clip = self.loop_clip(original_clip, target_duration) result = loop_clip.subclip(0, target_duration) logger.debug(f"Prepared clip duration: {result.duration:.2f}s") return result elif utils.is_video_zoomable_tail(original_clip): loop_clip = self.zoom_clip(original_clip, target_duration) # Fallback to ping-pong reverse looping logger.debug("Using ping-pong reverse looping as fallback for extension") reversed_clip = self.reverse_clip(original_clip_path) loop_clip = concatenate_videoclips([original_clip, reversed_clip, original_clip, reversed_clip], method="compose") result = loop_clip.subclip(0, target_duration) logger.debug(f"Prepared clip duration: {result.duration:.2f}s") return result def reverse_clip(self, clip_path): reversed_clip = VideoFileClip(utils_reverse_clip(clip_path)) return reversed_clip def loop_clip(self, clip, target_duration): loop_count = int(target_duration // clip.duration) + 1 # how many loops needed looped = [clip] * loop_count combined = concatenate_videoclips(looped, method="compose") final_clip = combined.subclip(0, target_duration) # trim to exact duration logger.debug(f"♻️ Looping clip {loop_count}x to reach {target_duration:.2f}s") return final_clip def zoom_clip(self, clip, target_duration, zoom_strength): # Calculate freeze duration freeze_duration = target_duration - clip.duration # Freeze the last frame frozen_frame = clip.to_ImageClip(t=clip.duration - 0.01).set_duration(freeze_duration) # Concatenate extended_clip = concatenate_videoclips([clip, frozen_frame]) # Apply zoom using fl_image (frame-by-frame transformation) def apply_zoom(get_frame, t): frame = get_frame(t) zoom_factor = 1 + (zoom_strength - 1) * (t / target_duration) h, w = frame.shape[:2] new_h, new_w = int(h * zoom_factor), int(w * zoom_factor) # Resize frame from scipy.ndimage import zoom as scipy_zoom zoomed_frame = scipy_zoom(frame, (zoom_factor, zoom_factor, 1), order=1) # Crop to center y_start = (zoomed_frame.shape[0] - h) // 2 x_start = (zoomed_frame.shape[1] - w) // 2 cropped = zoomed_frame[y_start:y_start+h, x_start:x_start+w] return cropped zoomed = extended_clip.fl(apply_zoom) return zoomed async def _create_video_sequence_natural_speed( self, video_clips: List[VideoFileClip], video_config: Optional[Dict] ) -> VideoFileClip: """Create video sequence from clips - NO SLOW MOTION, allow longer duration""" try: if not video_clips: raise ValueError("No video clips available for sequence") # MORE FLEXIBLE duration: 11-15 seconds to accommodate natural speed TARGET_MIN_DURATION = 11.0 TARGET_MAX_DURATION = 15.0 # Identify clips by structure hook_start = None hook_end = None library_clips = [] for clip in video_clips: library_clips.append(clip) # logger.debug( # f"✓ Identified: hook_start={hook_start.duration if hook_start else 0:.2f}s, " # f"hook_end={hook_end.duration if hook_end else 0:.2f}s, " # f"library_clips={len(library_clips)}" # ) # Calculate current library duration library_duration = sum(clip.duration for clip in library_clips) # hook_total = (hook_start.duration if hook_start else 0) + (hook_end.duration if hook_end else 0) hook_total = 0 # logger.debug(f"📊 Hook total: {hook_total:.2f}s, Library total: {library_duration:.2f}s") # NEW: Add more library videos if duration is too short if (hook_total + library_duration) < TARGET_MIN_DURATION: logger.debug(f"📥 Duration too short ({hook_total + library_duration:.2f}s), adding more videos...") # We'll handle this by duplicating some clips to reach target additional_clips_needed = [] current_duration = hook_total + library_duration while current_duration < TARGET_MIN_DURATION and library_clips: # Add copies of existing clips to reach target for clip in library_clips: if current_duration < TARGET_MIN_DURATION: additional_clips_needed.append(clip) current_duration += clip.duration else: break library_clips.extend(additional_clips_needed) logger.debug( f"✓ Added {len(additional_clips_needed)} additional clips, new duration: {current_duration:.2f}s" ) # NEW: Only speed up if absolutely necessary, never slow down total_current_duration = hook_total + sum(clip.duration for clip in library_clips) if total_current_duration > TARGET_MAX_DURATION: # Only speed up if significantly over target speed_factor = total_current_duration / TARGET_MAX_DURATION if speed_factor > 1.1: # Only speed up if more than 10% over logger.debug(f"⚡ Slightly speeding up clips by {speed_factor:.2f}x to fit target") adjusted_library = [] for clip in library_clips: sped_clip = clip.fx(lambda c: c.speedx(speed_factor)) adjusted_library.append(sped_clip) library_clips = adjusted_library else: logger.debug("✓ Duration within acceptable range, keeping natural speed") else: logger.debug("✅ Keeping all videos at natural speed") # Build sequence sequence_clips = [] if hook_start: sequence_clips.append(hook_start) logger.debug(f" Added hook_start: {hook_start.duration:.2f}s") for i, clip in enumerate(library_clips): sequence_clips.append(clip) logger.debug(f" Added library_{i}: {clip.duration:.2f}s") if hook_end: sequence_clips.append(hook_end) logger.debug(f" Added hook_end: {hook_end.duration:.2f}s") # Calculate total duration total_duration = sum(clip.duration for clip in sequence_clips) logger.debug( f"📊 Total video sequence duration: {total_duration:.2f}s (target: {TARGET_MIN_DURATION}-{TARGET_MAX_DURATION}s)" ) # FIXED: Remove transition_duration parameter final_sequence = concatenate_videoclips(sequence_clips, method="compose") logger.debug(f"✅ Created video sequence with NATURAL SPEED: {final_sequence.duration:.2f}s") return final_sequence except Exception as e: logger.error(f"❌ Failed to create video sequence: {e}") raise async def _add_timed_subtitles(self, video_path: str, group_all: bool = False, position: int = None) -> CompositeVideoClip: """Add subtitles using timed transcript from Speech-to-Text""" try: assets = get_config_value("visual_assets") timed_words = assets.get("timed_transcript", []) if not timed_words: logger.warning("⚠️ No timed transcript available, using fallback subtitles") raise ValueError("No timed transcript available for subtitles") # return await self._add_fallback_subtitles(video_clip, assets["tts_script"]) logger.debug(f"📝 Creating timed subtitles from {len(timed_words)} words") video_clip = VideoFileClip(video_path) text_clips = [] total_duration = video_clip.duration target_width, target_height = video_clip.size # Group words into phrases based on timing and punctuation phrases = group_words_by_time_and_width(timed_words, target_width, target_height, max_words_per_group = 2, group_all=group_all) logger.debug(f"📝 Created {len(phrases)} timed phrases") for i, phrase in enumerate(phrases): word_timings = phrase["word_timings"] start_time = phrase["start_time"] end_time = phrase["end_time"] if i+1 < len(phrases): phrase_duration = phrases[i+1]["start_time"] - start_time else: phrase_duration = total_duration - start_time # phrase_duration = end_time - start_time text = phrase["text"] if phrase_duration <= 0: continue # IMPROVED: Better styling as requested text_clip = self._create_styled_text_clip( text=text, duration=phrase_duration, target_width=target_width, target_height=target_height, start_time=start_time, word_timings=word_timings ) if text_clip: text_clips.append(text_clip) logger.debug(f"📊 Created {len(text_clips)} timed subtitle clips") if text_clips: final_video = CompositeVideoClip([video_clip] + text_clips) logger.debug(f"✅ Added {len(text_clips)} timed subtitle segments") return await self._render_video_only(final_video.subclip(0, total_duration)) else: return await self._render_video_only(video_clip) except Exception as e: logger.error(f"❌ Failed to add timed subtitles: {e}") raise # return await self._add_fallback_subtitles(video_clip, assets["tts_script"]) def _create_styled_text_clip( self, text: str, duration: float, target_width: int, target_height: int, start_time: float, word_timings=None ) -> Optional[TextClip]: """Create a styled text clip with IMPROVED appearance""" try: # IMPROVED: Slightly smaller font sizes as requested max_chars_per_line = 22 wrapped_text = "\n".join(textwrap.wrap(text, width=max_chars_per_line)) line_count = len(wrapped_text.split("\n")) # Adjusted font sizes (smaller to avoid hitting top safe zone) if line_count > 2: fontsize = 55 # Reduced from 75 to keep 3 lines lower elif line_count > 1: fontsize = 75 # Moderate size for 2 lines else: fontsize = 85 # Large size for single line text_clip = create_text_clip( text, duration, target_width, target_height, start_time, fontsize, word_timings, **self.cap_method() ) return text_clip # IMPROVED: Increased font weight with thicker stroke text_clip = TextClip( txt=wrapped_text, fontsize=fontsize, color="white", font="Fonts/Bungee-Regular.ttf", # Bold font for better weight stroke_color="black", stroke_width=5, # Thicker stroke for better readability method="caption", size=(int(target_width * 0.90), None), # Slightly narrower align="center", ) # IMPROVED: Raised position (higher on screen) vertical_position = int(target_height * 0.65) # Raised from 0.72 to 0.65 text_clip = text_clip.set_position(("center", vertical_position)) text_clip = text_clip.set_start(start_time) text_clip = text_clip.set_duration(duration) # Shorter fade for quicker transitions fade_duration = min(0.15, duration / 6) # Shorter fades text_clip = text_clip.crossfadein(fade_duration).crossfadeout(fade_duration) return text_clip except Exception as e: logger.error(f"❌ Failed to create styled text clip: {e}") raise def _group_words_into_timed_phrases(self, words: List[Dict]) -> List[Dict]: """Group timed words into readable phrases""" phrases = [] current_phrase = [] current_start = None for word in words: word_text = word["word"] start_time = word["start_time"] end_time = word["end_time"] if not current_phrase: current_start = start_time current_phrase.append(word_text) # End phrase on punctuation or after 3-4 words has_punctuation = any(p in word_text for p in [".", ",", "!", "?", ";", ":"]) too_long = len(current_phrase) >= 4 if has_punctuation or too_long: phrases.append({"text": " ".join(current_phrase), "start_time": current_start, "end_time": end_time}) current_phrase = [] current_start = None # Add remaining words if current_phrase and current_start is not None: phrases.append( { "text": " ".join(current_phrase), "start_time": current_start, "end_time": words[-1]["end_time"] if words else current_start + 2.0, } ) return phrases async def _add_fallback_subtitles(self, video_clip: VideoFileClip, script: str) -> CompositeVideoClip: """Fallback subtitles when timed transcript is unavailable""" try: words = self._split_script_into_words(script) text_clips = [] total_duration = video_clip.duration target_width, target_height = video_clip.size logger.debug(f"📝 Using fallback subtitles: {len(words)} words") # phrases = self._group_words_into_phrases(words, max_words=3) phrases = group_words_by_time_and_width(script, target_width, target_height, max_words_per_group=self.max_words_per_group) logger.debug(f"📝 Grouped into {len(phrases)} phrases") duration_per_phrase = total_duration / len(phrases) for i, phrase in enumerate(phrases): start_time = i * duration_per_phrase phrase_duration = duration_per_phrase text_clip = self._create_styled_text_clip( text=phrase, duration=phrase_duration, target_width=target_width, target_height=target_height, start_time=start_time, ) if text_clip: text_clips.append(text_clip) logger.debug(f"📊 Created {len(text_clips)} fallback subtitle clips") if text_clips: final_video = CompositeVideoClip([video_clip] + text_clips) logger.debug(f"✅ Added {len(text_clips)} fallback subtitle segments") return final_video else: return video_clip except Exception as e: logger.error(f"❌ Fallback subtitles failed: {e}") raise def _split_script_into_words(self, script: str) -> List[str]: """Split script into individual words""" import re script = re.sub(r"\s+", " ", script).strip() return script.split() def _group_words_into_phrases(self, words: List[str], max_words: int = 3) -> List[str]: """Group words into small readable phrases""" phrases = [] current_phrase = [] for word in words: current_phrase.append(word) has_punctuation = any(p in word for p in [".", ",", "!", "?", ";"]) if len(current_phrase) >= max_words or has_punctuation: phrases.append(" ".join(current_phrase)) current_phrase = [] if current_phrase: phrases.append(" ".join(current_phrase)) return phrases async def add_audio_to_video(self, video_path: str) -> str: """ Add audio track to pre-rendered video (NO speedup - video is already correct duration) """ try: assets = get_config_value("visual_assets") logger.debug("🔊 Adding audio to rendered video") # Load the video video_clip = VideoFileClip(video_path) # Prepare audio clips audio_clips = await self._prepare_audio_clips(assets, video_clip.duration) # Add audio track video_with_audio = await self._add_audio_track(video_clip, audio_clips) output_path = await self.render_video_final(video_with_audio) # Cleanup video_clip.close() if video_with_audio != video_clip: video_with_audio.close() logger.debug(f"✅ Final video with audio: {output_path}") return output_path except Exception as e: logger.error(f"❌ Failed to add audio to video: {e}") raise def get_audio_rms(self, audio_clip, sample_duration=1.0): """Calculate RMS of an audio clip by sampling the first few seconds.""" try: duration = min(sample_duration, audio_clip.duration) audio_segment = audio_clip.subclip(0, duration) audio_array = audio_segment.to_soundarray(fps=44100) return np.sqrt(np.mean(audio_array ** 2)) except Exception as e: logger.warning(f"⚠️ Failed to compute RMS: {e}") return 0.0 def calculate_bg_volume(self, main_rms, bg_rms): """ Dynamically calculate background music volume based on TTS and background RMS. Tuned for boosted TTS (+5 dB from Google TTS). If main_rms is 0 (no TTS), treat background as main audio with high volume. """ # If no main audio (TTS), treat background as primary audio if main_rms == 0 or main_rms < 0.001: logger.debug("🔊 No main audio detected, treating background as primary audio with high volume") return 0.85 # High volume for background as main audio # Base volume curve (a bit higher overall) if main_rms > 0.04: # Very strong TTS base_volume = 0.35 elif main_rms > 0.02: # Normal TTS base_volume = 0.45 else: # Soft TTS base_volume = 0.55 # Adjust further based on background loudness if bg_rms > 0.15: # Very loud music file bg_volume = base_volume * 0.25 elif bg_rms > 0.08: bg_volume = base_volume * 0.45 elif bg_rms > 0.03: bg_volume = base_volume * 0.7 elif bg_rms > 0.01: bg_volume = base_volume * 0.9 else: bg_volume = base_volume * 1.1 # return max(ALLOWED_BG_MUSIC_VOLUME, min(1.0, bg_volume)) return max(0.0, min(bg_volume, ALLOWED_BG_MUSIC_VOLUME)) async def _prepare_audio_clips(self, assets: Dict, target_duration: float) -> List[AudioFileClip]: """ Load TTS and background music, trim to match video, and dynamically adjust background volume. """ clips = [] try: # --- Load TTS Audio (Main Voice) --- tts_clip = None if assets.get("tts_audio_data") and assets["tts_audio_data"].get("local_path"): try: tts_clip = AudioFileClip(assets["tts_audio_data"]["local_path"]) if tts_clip.duration > 0: if tts_clip.duration > target_duration: logger.debug(f"⚠️ Trimming TTS: {tts_clip.duration:.2f}s → {target_duration:.2f}s") tts_clip = tts_clip.subclip(0, target_duration) elif tts_clip.duration < target_duration: logger.debug(f"⚠️ TTS shorter: {tts_clip.duration:.2f}s < {target_duration:.2f}s") clips.append(("tts", tts_clip)) logger.debug(f"✓ Loaded TTS at full volume ({tts_clip.duration:.2f}s)") else: logger.warning("⚠️ TTS audio has zero duration") tts_clip.close() except Exception as e: logger.error(f"❌ Failed to load TTS audio: {e}") raise # --- Load Background Music (Dynamic Volume) --- if assets.get("background_music_local"): try: bg_clip = AudioFileClip(assets["background_music_local"]) if bg_clip.duration > 0: if bg_clip.duration > target_duration: bg_clip = bg_clip.subclip(0, target_duration) logger.debug(f"✓ Trimmed background to {target_duration:.2f}s") # Compute RMS levels tts_rms = self.get_audio_rms(tts_clip) if tts_clip else 0.0 bg_rms = self.get_audio_rms(bg_clip) # Dynamic volume adjustment bg_volume = self.calculate_bg_volume(tts_rms, bg_rms) bg_clip = bg_clip.volumex(bg_volume) clips.append(("background", bg_clip)) logger.debug( f"✓ Loaded background (RMS: {bg_rms:.4f}) at dynamic volume {bg_volume:.2f} ({bg_clip.duration:.2f}s)" ) else: logger.warning("⚠️ Background music has zero duration") bg_clip.close() except Exception as e: logger.error(f"❌ Failed to load background music: {e}") raise return [clip for _, clip in clips] except Exception as e: logger.error(f"❌ Failed to prepare audio clips: {e}") for name, clip in clips: try: clip.close() except: pass raise async def _add_audio_track(self, video_clip: VideoFileClip, audio_clips: List[AudioFileClip]) -> VideoFileClip: """Add full audio track""" if not audio_clips: return video_clip try: valid_audio_clips = [clip for clip in audio_clips if clip.duration > 0] if not valid_audio_clips: return video_clip mixed_audio = CompositeAudioClip(valid_audio_clips) mixed_audio = mixed_audio.subclip(0, min(video_clip.duration, mixed_audio.duration)) video_with_audio = video_clip.set_audio(mixed_audio) logger.debug(f"✅ Added audio track") return video_with_audio except Exception as e: logger.error(f"❌ Failed to add audio track: {e}") raise async def render_video_final(self, video_clip) -> str: """Render final video clip to file""" try: output_path = self.temp_dir / f"{uuid.uuid4().hex}.mp4" video_clip.write_videofile( str(output_path), codec="libx264", audio_codec="aac", fps=25, verbose=False, logger=None, ffmpeg_params=["-pix_fmt", "yuv420p"] ) video_clip.close() return str(output_path) except Exception as e: logger.error(f"Final video render failed: {e}") if "video_clip" in locals(): video_clip.close() raise async def _render_video_only(self, video_clip: VideoFileClip) -> str: """Render video WITHOUT audio""" unique_id = uuid.uuid4().hex[:8] filename = f"video_no_audio_{unique_id}.mp4" output_path = self.temp_dir / filename try: logger.debug(f"📹 Rendering video (no audio): {filename}") video_clip.write_videofile( str(output_path), codec="libx264", fps=25, verbose=False, logger=None, ffmpeg_params=["-pix_fmt", "yuv420p"] ) return str(output_path) except Exception as e: logger.error(f"❌ Video rendering failed: {e}") raise finally: video_clip.close() def compress(self, input_path: str): """ Compress video without losing visible quality. Keeps full HD resolution. """ try: stem = Path(input_path).stem output_path = f"/tmp/{stem}_compressed.mp4" # Use CRF-based quality control # - CRF 23–28: lower is higher quality (and larger size) # - Preset 'slow' gives better compression ratio cmd = [ "ffmpeg", "-y", "-i", input_path, "-vf", "scale=1080:1920", # keep full HD "-c:v", "libx264", "-preset", "slow", "-crf", "26", # try 26; adjust 23–28 if too large/small "-c:a", "aac", "-b:a", "128k", # clean, compact audio "-movflags", "+faststart", # better for web playback output_path ] logger.debug(f"🎞️ Compressing {input_path} → {output_path}") subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) size_mb = os.path.getsize(output_path) / (1024 * 1024) logger.debug(f"✅ Compressed to {size_mb:.2f} MB at Full HD") return output_path except Exception as e: logger.error(f"❌ Compression failed: {str(e)}") return input_path def _get_caption_style(self): if not get_config_value("current_caption_style"): caption_style = get_config_value("caption_style", "random") styles = { "caption_1": (caption_style_1, 3), "caption_style_1": (caption_style_1, 3), "caption_2": (caption_style_2, 3), "caption_style_2": (caption_style_2, 3), "caption_3": (caption_style_3, 3), "caption_style_3": (caption_style_3, 3), "caption_4": (caption_style_4, 3), "caption_style_4": (caption_style_4, 3), "caption_style_on_screen_text": (caption_style_on_screen_text, 3), "caption_style_on_screen_text_top": (caption_style_on_screen_text_top, 3), } if caption_style == "random": set_config_value("current_caption_style", random.choice(list(styles.values()))) set_config_value("current_caption_style", styles.get(caption_style, random.choice(list(styles.values())))) logger.debug(f'🎨 Selected caption style: {get_config_value("current_caption_style")[0].__name__}') return get_config_value("current_caption_style") async def render_video_cuts( self, music_duration: float, beat_times: list = None, interval: float = None, min_clip_duration: float = 0.0, loop_short_videos: bool = True, ) -> str: """ Unified memory-efficient video renderer: - Supports both beat-synced (beat_times) and interval-based (interval) modes - Save each clip to temp file immediately - Use FFmpeg concat demuxer to merge (constant memory) Args: music_duration: Total duration of the output video beat_times: Array of beat timestamps (for beat-synced mode) interval: Fixed interval between cuts (for hard-cut mode) min_clip_duration: Minimum duration for a clip (skips shorter beats) loop_short_videos: If True, loop videos shorter than 4s using reverse Either beat_times OR interval must be provided, not both. """ import subprocess # Validate inputs if beat_times is None and interval is None: raise ValueError("Either beat_times or interval must be provided") # If interval is provided, generate synthetic beat times if interval is not None: beat_times = [] t = 0.0 while t <= music_duration: beat_times.append(t) t += interval if beat_times[-1] < music_duration: beat_times.append(music_duration) beat_times = np.array(beat_times) logger.debug(f"Generated {len(beat_times)} beats at {interval}s intervals") else: # Ensure first beat is at 0 if len(beat_times) > 0 and beat_times[0] > 0.0001: beat_times = np.insert(beat_times, 0, 0.0) logger.debug(f"⚡ Inserted virtual beat at 0.0s for intro") if len(beat_times) < 2: raise ValueError("Need at least 2 beat times") temp_clips = [] video_idx = 0 accumulated_deficit = 0.0 videos = get_config_value("visual_assets")["selected_videos"] logger.debug(f"Creating video synced to {len(beat_times)} beats") logger.debug(f"Music duration: {music_duration:.2f}s") try: i = 0 while i < len(beat_times) - 1: if video_idx >= len(videos) or beat_times[i] > music_duration + 2: break required_duration = beat_times[i + 1] - beat_times[i] target_duration = required_duration + accumulated_deficit if target_duration < min_clip_duration and min_clip_duration > 0: accumulated_deficit = target_duration i += 1 continue video_path = videos[video_idx % len(videos)] video_filename = os.path.basename(video_path) try: # Get video duration probe_cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_path] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10) video_duration_src = target_duration + 1 if probe_result.returncode == 0 and probe_result.stdout.strip(): try: video_duration_src = float(probe_result.stdout.strip()) except ValueError: pass temp_clip_path = os.path.abspath(str(self.temp_dir / f"clip_{video_idx+1:03d}.mp4")) # Determine filter # IMPORTANT: All clips must have identical properties to avoid black frames during concat # - setsar=1:1 ensures consistent sample aspect ratio # - format=yuv420p ensures consistent pixel format # - fps=25 ensures consistent frame rate if loop_short_videos and video_duration_src < 4: # Ping-pong loop (Forward-Reverse-Forward-Reverse) -> 4x duration vf_filter = ( "[0:v]split=2[a][b];[b]reverse[br];[a][br]concat=n=2:v=1:a=0[loop1];" "[loop1]split=2[c][d];[d]reverse[dr];[c][dr]concat=n=2:v=1:a=0[looped];" "[looped]setpts=PTS-STARTPTS," "scale=1080:1920:force_original_aspect_ratio=increase," "crop=1080:1920,setsar=1:1,format=yuv420p[out]" ) use_filter_complex = True # Allow utilizing the full 4x duration if needed max_possible = video_duration_src * 4 trim_duration = min(target_duration, max_possible) elif video_duration_src < target_duration: loop_count = int(target_duration / video_duration_src) + 1 vf_filter = f"loop={loop_count}:size=999:start=0,setpts=PTS-STARTPTS,scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920,setsar=1:1,format=yuv420p" use_filter_complex = False trim_duration = target_duration else: vf_filter = "setpts=PTS-STARTPTS,scale=1080:1920:force_original_aspect_ratio=increase,crop=1080:1920,setsar=1:1,format=yuv420p" use_filter_complex = False trim_duration = min(target_duration, video_duration_src) if use_filter_complex: cmd = ["ffmpeg", "-y", "-i", video_path, "-filter_complex", vf_filter, "-map", "[out]", "-t", str(trim_duration), "-c:v", "libx264", "-preset", "ultrafast", "-r", "25", "-pix_fmt", "yuv420p", "-video_track_timescale", "12800", "-an", temp_clip_path] else: cmd = ["ffmpeg", "-y", "-i", video_path, "-t", str(trim_duration), "-vf", vf_filter, "-c:v", "libx264", "-preset", "ultrafast", "-r", "25", "-pix_fmt", "yuv420p", "-video_track_timescale", "12800", "-an", temp_clip_path] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode != 0: logger.error(f"FFmpeg error for clip {video_idx+1}: {result.stderr}") video_idx += 1 i += 1 continue temp_clips.append(temp_clip_path) accumulated_deficit = max(0, target_duration - trim_duration) total_time = sum(beat_times[j+1] - beat_times[j] for j in range(i+1)) if i < len(beat_times) - 1 else beat_times[i] logger.debug(f"✂️ CUT {video_idx+1}: Used {trim_duration:.2f}s from {video_filename} | Total time: {total_time:.2f}s") video_idx += 1 i += 1 except subprocess.TimeoutExpired: logger.error(f"Timeout processing {video_path}, skipping...") video_idx += 1 i += 1 except Exception as e: logger.error(f"Error processing video {video_idx}: {e}") video_idx += 1 i += 1 if not temp_clips: raise ValueError("No clips created") concat_file_path = os.path.abspath(str(self.temp_dir / "concat_list.txt")) with open(concat_file_path, "w") as f: for clip_path in temp_clips: f.write(f"file '{clip_path}'\n") output_path = os.path.abspath(str(self.temp_dir / f"merged_{uuid.uuid4().hex[:8]}.mp4")) # Use stream copy since all clips are already encoded with identical properties # This avoids re-encoding artifacts and timing issues that cause black frames concat_cmd = [ "ffmpeg", "-y", "-fflags", "+genpts", # Generate fresh PTS for clean concatenation "-f", "concat", "-safe", "0", "-i", concat_file_path, "-c", "copy", # Stream copy - no re-encoding "-avoid_negative_ts", "make_zero", # Fix timestamp issues at clip boundaries "-t", str(music_duration), "-an", output_path ] logger.debug(f"🎬 Merging {len(temp_clips)} clips...") result = subprocess.run(concat_cmd, capture_output=True, text=True, timeout=120) if result.returncode != 0: logger.error(f"FFmpeg concat error: {result.stderr}") raise ValueError(f"Failed to merge clips: {result.stderr}") logger.debug(f"✅ Merged video saved: {output_path}") return output_path finally: # Clean up temp clips for clip_path in temp_clips: try: if os.path.exists(clip_path): os.remove(clip_path) except: pass try: if 'concat_file_path' in locals() and os.path.exists(concat_file_path): os.remove(concat_file_path) except: pass def _validate_assets_for_video_only(self) -> bool: """Validate assets for video-only rendering""" assets = get_config_value("visual_assets") if not assets.get("selected_videos"): logger.error("No selected videos provided") return False # Hook video is OPTIONAL for VoiceOver pipeline if assets.get("hook_video") and not assets["hook_video"].get("local_path"): logger.error("Hook video provided but missing local_path") return False # Check that at least some library videos have local_path valid_library_videos = [v for v in assets.get("selected_videos", []) if v.get("local_path")] if not valid_library_videos: logger.error("No library videos with local_path") return False return True def _cleanup_temp_files(self, clips: List): """Clean up temporary video/audio clips""" for clip in clips: try: if hasattr(clip, "close"): clip.close() except Exception as e: # Silently ignore cleanup errors pass def __del__(self): """Cleanup on destruction""" try: import shutil if hasattr(self, "temp_dir") and self.temp_dir.exists() and not get_config_value("test_automation"): shutil.rmtree(self.temp_dir, ignore_errors=True) except Exception: # Silently ignore cleanup errors pass