import os
import time
import tempfile
import io
import numpy as np
import cv2  # Still used for reading logo/watermark if needed
import logging
import uuid
import subprocess
from PIL import Image, ImageFont, ImageDraw

# --- MoviePy Imports ---
from moviepy import *
# from moviepy.editor import *
# import moviepy.video.fx.all as vfx  # Keep if you add more complex FX
# NOTE(review): the old helper import
#   from moviepy.video.tools.transitions import crossfadein, slide_in
# was commented out while the code below still called crossfadein()/slide_in(),
# which raised NameError at runtime. Crossfades are now built from the
# clip.crossfadein() *method* (see _crossfade_concatenate), and slide_in is
# imported lazily where used, with a fade fallback.

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Constants ---
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720
DEFAULT_FPS = 24
DEFAULT_TRANSITION_DURATION = 0.75  # seconds
DEFAULT_FONT = "lazy_dog.ttf"  # *** ADJUST THIS PATH *** Needs to be accessible on your server
DEFAULT_SUBTITLE_FONT_SIZE = 36
DEFAULT_SUBTITLE_COLOR = 'white'
DEFAULT_SUBTITLE_BG_COLOR = 'rgba(0,0,0,0.5)'  # Semi-transparent black background
DEFAULT_SUBTITLE_POSITION = ('center', 'bottom')
DEFAULT_LOGO_PATH = "sozo_logo2.png"  # *** ADJUST THIS PATH ***


# --- Helper Functions ---

def get_styled_audio_duration(audio_path):
    """
    Get duration of an audio file using MoviePy (preferred) or FFprobe fallback.

    Args:
        audio_path (str|None): Path to the audio file.

    Returns:
        float: Duration in seconds; 5.0 when the path is missing/invalid or
               both probes fail.
    """
    if not audio_path or not os.path.exists(audio_path):
        logger.warning(f"Audio path invalid or file not found: {audio_path}. Defaulting duration to 5.0s.")
        return 5.0  # Default duration
    try:
        # Use MoviePy - more integrated & handles more formats potentially
        with AudioFileClip(audio_path) as clip:
            duration = clip.duration
            # Sometimes duration might be None or 0 for very short/corrupt files
            return duration if duration and duration > 0 else 5.0
    except Exception as e:
        logger.warning(f"MoviePy failed to get duration for {audio_path}: {e}. Trying ffprobe.")
        try:
            result = subprocess.run(
                ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
                 '-of', 'default=noprint_wrappers=1:nokey=1', audio_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
                text=True  # Get stdout as string
            )
            return float(result.stdout.strip())
        except Exception as e_ffprobe:
            logger.error(f"FFprobe also failed for {audio_path}: {e_ffprobe}. Returning default 5.0s.")
            return 5.0


def resize_image_aspect_fill(img_pil, target_width, target_height):
    """
    Resizes PIL image to target dimensions, cropping if necessary to fill.

    Args:
        img_pil (PIL.Image.Image): Source image (any mode; converted to RGB).
        target_width (int): Output width in pixels.
        target_height (int): Output height in pixels.

    Returns:
        PIL.Image.Image: RGB image of exactly (target_width, target_height).
    """
    try:
        target_ratio = target_width / target_height
        # Ensure image is in RGB for consistent processing
        img_pil = img_pil.convert("RGB")
        img_ratio = img_pil.width / img_pil.height

        if abs(target_ratio - img_ratio) < 0.01:
            # If aspect ratios are very close, just resize
            img_resized = img_pil.resize((target_width, target_height), Image.Resampling.LANCZOS)
            return img_resized

        if target_ratio > img_ratio:
            # Target is wider than image -> crop top/bottom
            new_width = target_width
            new_height = int(new_width / img_ratio)
            img_resized = img_pil.resize((new_width, new_height), Image.Resampling.LANCZOS)
            top = (new_height - target_height) // 2
            bottom = top + target_height
            img_cropped = img_resized.crop((0, top, target_width, bottom))
        else:
            # Target is taller than image (or same ratio) -> crop sides
            new_height = target_height
            new_width = int(new_height * img_ratio)
            img_resized = img_pil.resize((new_width, new_height), Image.Resampling.LANCZOS)
            left = (new_width - target_width) // 2
            right = left + target_width
            img_cropped = img_resized.crop((left, 0, right, target_height))
        return img_cropped
    except Exception as e:
        logger.error(f"Error resizing/cropping image: {e}. Returning original image resized without aspect correction.")
        # Fallback: simple resize, may introduce distortion or black bars later
        return img_pil.resize((target_width, target_height), Image.Resampling.LANCZOS)


def create_subtitle_clip(text, duration, width, height, options):
    """
    Creates a MoviePy TextClip for subtitles.

    Args:
        text (str): Subtitle text; blank/None yields no clip.
        duration (float): Clip duration in seconds.
        width (int): Video width (used for wrapping limit and positioning).
        height (int): Video height (used for positioning).
        options (dict|None): Styling options (font, fontsize, color, position,
            bg_color, stroke_color, stroke_width, method, align, margin,
            fade_duration).

    Returns:
        TextClip|None: Positioned, timed subtitle clip, or None on failure/empty text.
    """
    if not text or not isinstance(text, str) or not text.strip():
        # logger.debug("Subtitle text is empty or invalid, skipping clip creation.")
        return None

    # Ensure defaults are handled robustly
    subtitle_opts = options or {}
    font_path = subtitle_opts.get("font", DEFAULT_FONT)
    fontsize = int(subtitle_opts.get("fontsize", DEFAULT_SUBTITLE_FONT_SIZE))
    color = subtitle_opts.get("color", DEFAULT_SUBTITLE_COLOR)
    position = subtitle_opts.get("position", DEFAULT_SUBTITLE_POSITION)
    bg_color = subtitle_opts.get("bg_color", DEFAULT_SUBTITLE_BG_COLOR)  # Background for readability
    stroke_color = subtitle_opts.get("stroke_color")  # Optional outline
    stroke_width = float(subtitle_opts.get("stroke_width", 0))  # Default 0 if not specified
    method = subtitle_opts.get("method", "caption")  # 'caption' automatically wraps text
    align = subtitle_opts.get("align", "center")
    margin = int(subtitle_opts.get("margin", 10))  # Margin from edge for position like ('center', 'bottom')

    # Adjust position based on margin for common cases
    if isinstance(position, (list, tuple)) and len(position) == 2:
        x_pos, y_pos = position
        if y_pos == 'bottom':
            final_pos = (x_pos, height - margin)  # Anchor point is bottom of text
        elif y_pos == 'top':
            final_pos = (x_pos, margin)  # Anchor point is top of text
        else:
            final_pos = position
    else:
        final_pos = position  # Use as is if not tuple or custom values

    try:
        # Check font existence early
        if not os.path.exists(font_path):
            logger.error(f"❌ Subtitle font not found at '{font_path}'. Using MoviePy default.")
            font_path = None  # Let MoviePy choose a default

        # Use method='caption' for automatic wrapping based on size.
        # Limit width to 90% of video width for wrapping.
        # NOTE: must be an int — a float here breaks the ImageMagick -size argument.
        text_width_limit = int(width * 0.9)

        subtitle = TextClip(
            txt=text.strip(),  # Ensure no leading/trailing whitespace
            fontsize=fontsize,
            color=color,
            font=font_path,
            size=(text_width_limit, None),  # Width limit, height auto
            method=method,
            align=align,
            bg_color=bg_color,
            stroke_color=stroke_color if stroke_width > 0 else None,  # Only apply if stroke_width > 0
            stroke_width=stroke_width,
            print_cmd=False  # Suppress verbose ffmpeg command print
        ).set_position(final_pos, relative=False).set_duration(duration)  # relative=False if using pixel coords

        # Optional fade in/out for subtitles
        fade_duration = float(subtitle_opts.get("fade_duration", 0.3))
        # Ensure fade doesn't exceed half the clip duration (non-overlapping fades)
        fade_duration = min(fade_duration, duration / 2.1)
        if fade_duration > 0:
            subtitle = subtitle.crossfadein(fade_duration).crossfadeout(fade_duration)

        # logger.debug(f"✅ Created subtitle clip for text: '{text[:30]}...'")
        return subtitle
    except Exception as e:
        logger.error(f"❌ ERROR creating subtitle clip for text '{text[:30]}...': {e}")
        # Optionally try PIL fallback here if needed, but TextClip is generally more robust
        return None


def create_particle_overlay_clip(particle_type, duration, width, height, options):
    """
    Creates a particle overlay clip.
    Placeholder: Loads a pre-existing video file based on type.
    *** Requires actual particle video files on server ***

    Args:
        particle_type (str|None): One of the mapped types ('snow', 'rain', ...).
        duration (float): Required overlay duration in seconds.
        width (int): Video width.
        height (int): Video height.
        options (dict|None): 'opacity' (float, default 0.6), 'position'.

    Returns:
        VideoClip|None: Sized, timed, positioned overlay; None if unavailable.
    """
    if not particle_type or particle_type == 'none':
        return None

    particle_opts = options or {}
    # *** Define paths to your actual particle overlay videos here ***
    base_particle_path = "assets/particles"  # Example base directory
    particle_files = {
        "snow": os.path.join(base_particle_path, "snow_overlay.mp4"),
        "sparkles": os.path.join(base_particle_path, "sparkles_overlay.mp4"),  # Often .mov for alpha
        "rain": os.path.join(base_particle_path, "rain_overlay.mp4"),
        "confetti": os.path.join(base_particle_path, "confetti_overlay.mp4"),
        # Add more mappings for types you support
    }

    particle_path = particle_files.get(str(particle_type).lower())
    if not particle_path:
        logger.warning(f"⚠️ Particle type '{particle_type}' not recognized or mapped.")
        return None
    if not os.path.exists(particle_path):
        logger.warning(f"⚠️ Particle overlay video not found for type '{particle_type}' at path: {particle_path}")
        return None

    try:
        logger.info(f"Creating particle overlay: {particle_type} from {particle_path}")
        # Load the overlay video. Check for alpha channel (.mov often has it)
        has_mask = particle_path.lower().endswith('.mov')
        overlay = VideoFileClip(particle_path, has_mask=has_mask, target_resolution=(height, width))

        # Loop or trim the overlay to match the required duration
        if overlay.duration < duration:
            # Loop requires careful handling if audio present in overlay, disable audio
            overlay = overlay.loop(duration=duration).without_audio()
        else:
            overlay = overlay.subclip(0, duration).without_audio()

        # Resize MUST happen after loop/subclip if original is smaller
        overlay = overlay.resize(newsize=(width, height))

        # Set opacity
        opacity = float(particle_opts.get("opacity", 0.6))  # Default 60% opacity
        if 0.0 <= opacity < 1.0:
            overlay = overlay.set_opacity(opacity)
        elif opacity < 0:
            overlay = overlay.set_opacity(0)  # Clamp at 0

        # Set position (usually centered)
        position = particle_opts.get("position", "center")
        return overlay.set_position(position)
    except Exception as e:
        # Log the full traceback for debugging particle issues
        import traceback
        logger.error(f"❌ ERROR creating particle overlay for {particle_type}: {e}\n{traceback.format_exc()}")
        return None


def _crossfade_concatenate(clips, fade_duration, width, height):
    """
    Concatenate clips with true overlapping crossfades.

    concatenate_videoclips()'s `transition` argument expects a *clip* to insert
    between segments, so it cannot express a crossfade; instead we overlap each
    consecutive clip by `fade_duration` and fade the incoming clip in.

    Args:
        clips (list): Video clips to join (at least one).
        fade_duration (float): Overlap/fade length in seconds (<= half the
            shortest clip; caller is expected to clamp).
        width (int): Output width.
        height (int): Output height.

    Returns:
        VideoClip: The combined clip.
    """
    if len(clips) == 1:
        return clips[0]
    if fade_duration <= 0:
        return concatenate_videoclips(clips, method="compose")

    timed_clips = []
    start = 0.0
    for i, clip in enumerate(clips):
        if i > 0:
            clip = clip.crossfadein(fade_duration)  # clip *method*, always available
        timed_clips.append(clip.set_start(start))
        # Next clip starts fade_duration before this one ends (the overlap)
        start += clip.duration - fade_duration
    total_duration = start + fade_duration
    return CompositeVideoClip(timed_clips, size=(width, height)).set_duration(total_duration)


# --- Main Video Creation Function using MoviePy ---

def create_styled_video(images, audio_files, section_texts, output_path, config=None):
    """
    Creates a video from images, audio, and text using MoviePy.

    Args:
        images (list): List of PIL Image objects (MUST NOT contain None here).
        audio_files (list): List of paths to audio files (can have None).
        section_texts (list): List of text strings for subtitles (can have None).
        output_path (str): Path to save the final video.
        config (dict): Dictionary containing configuration options.

    Returns:
        str: Path to the generated video file, or None on failure.
    """
    # Ensure inputs are lists; copy so padding below never mutates the caller's lists
    images = list(images or [])
    audio_files = list(audio_files or [])
    section_texts = list(section_texts or [])

    if not images:
        logger.error("❌ No images provided for video creation.")
        return None

    num_sections = len(images)
    # Pad other lists to match image count if necessary (should be handled by caller ideally)
    if len(audio_files) < num_sections:
        audio_files.extend([None] * (num_sections - len(audio_files)))
    if len(section_texts) < num_sections:
        section_texts.extend([None] * (num_sections - len(section_texts)))

    if config is None:
        config = {}
    logger.info(f"Starting video creation with config: {config}")

    # --- Get Configuration ---
    width = int(config.get("width", DEFAULT_WIDTH))
    height = int(config.get("height", DEFAULT_HEIGHT))
    fps = int(config.get("fps", DEFAULT_FPS))

    transition_type = config.get("transition", "fade").lower()
    transition_duration = float(config.get("transition_duration", DEFAULT_TRANSITION_DURATION))
    # Ensure transition duration isn't negative
    transition_duration = max(0, transition_duration)

    font_path = config.get("font_path", DEFAULT_FONT)  # Primary font path

    # Copy option dicts so we never mutate the caller's config in place
    subtitle_opts = dict(config.get("subtitle_options", {}) or {})
    subtitles_enabled = subtitle_opts.get("enabled", True)
    if "font" not in subtitle_opts:
        subtitle_opts["font"] = font_path  # Inherit default font

    particle_opts = dict(config.get("particle_options", {}) or {})
    particles_enabled = particle_opts.get("enabled", False)
    # Ensure particle types list exists and matches section count
    particle_types = particle_opts.get("types_per_section", [])
    if len(particle_types) != num_sections:
        logger.warning(f"Particle types list length mismatch ({len(particle_types)} vs {num_sections} sections). Disabling particles for safety.")
        particle_types = [None] * num_sections  # Reset to None
        particle_opts['types_per_section'] = particle_types  # Store potentially corrected list

    watermark_opts = config.get("watermark_options", {})
    watermark_enabled = watermark_opts.get("enabled", False) and watermark_opts.get("path")
    watermark_path = watermark_opts.get("path") if watermark_enabled else None

    end_logo_opts = config.get("end_logo_options", {})
    end_logo_enabled = end_logo_opts.get("enabled", True)
    # Use default Sozo logo path if enabled but no path given
    end_logo_path = end_logo_opts.get("path", DEFAULT_LOGO_PATH) if end_logo_enabled else None
    end_logo_duration = float(end_logo_opts.get("duration", 3.0))

    # --- Resource Management ---
    clips_to_close = []  # Keep track of clips needing .close()

    # --- Prepare Clips ---
    section_video_clips = []  # Holds the final composited clip for each section
    section_audio_clips = []  # Holds the audio clip for each section
    final_durations = []      # Holds the calculated duration for each section

    logger.info(f"Processing {num_sections} sections for video...")
    total_duration_est = 0
    for i in range(num_sections):
        img_pil = images[i]
        audio_path = audio_files[i]
        text = section_texts[i]
        particle_type = particle_types[i] if particles_enabled else None

        logger.info(f"--- Section {i+1}/{num_sections} ---")

        # Determine duration (based on audio or default)
        duration = get_styled_audio_duration(audio_path)
        final_durations.append(duration)
        logger.info(f" Duration: {duration:.2f}s (Audio: {os.path.basename(str(audio_path)) if audio_path else 'No'})")
        logger.info(f" Text: '{str(text)[:40]}...'")
        logger.info(f" Particle: {particle_type if particle_type else 'None'}")

        # --- Create Image Clip ---
        img_clip = None
        try:
            img_resized_pil = resize_image_aspect_fill(img_pil, width, height)
            img_np = np.array(img_resized_pil)
            img_clip = ImageClip(img_np).set_duration(duration).set_fps(fps)
            clips_to_close.append(img_clip)  # Add base image clip for potential closing
        except Exception as e:
            logger.error(f"❌ Failed to process image {i+1}: {e}. Creating black frame.")
            # Use a black frame as placeholder to avoid crashing
            img_clip = ColorClip(size=(width, height), color=(0, 0, 0), duration=duration)
            clips_to_close.append(img_clip)

        # --- Create Audio Clip ---
        section_audio_clip = None
        if audio_path and os.path.exists(audio_path):
            try:
                # Load audio and ensure duration matches video segment exactly.
                # Use try-except for AudioFileClip as it can fail on corrupted files.
                temp_audio = AudioFileClip(audio_path)
                clips_to_close.append(temp_audio)  # Add for closing
                # Trim or pad audio if necessary (MoviePy often handles slight discrepancies)
                section_audio_clip = temp_audio.subclip(0, min(temp_audio.duration, duration))
                # If audio is shorter than video, MoviePy usually pads with silence automatically when concatenating.
                # If you need explicit looping/padding logic, add it here.
                if section_audio_clip.duration < duration - 0.01:  # Allow small tolerance
                    logger.warning(f"Audio duration ({section_audio_clip.duration:.2f}s) shorter than video ({duration:.2f}s). Silence will be added.")
                section_audio_clips.append(section_audio_clip)
            except Exception as e:
                logger.error(f"❌ Failed to load audio '{os.path.basename(audio_path)}': {e}. Adding silence.")
                # Add silence to keep timing consistent
                silence = AudioClip(lambda t: 0, duration=duration, fps=44100)  # Standard audio fps
                section_audio_clips.append(silence)
                clips_to_close.append(silence)
        else:
            # Add silence if no audio for this section
            logger.info(" No audio path or file not found, adding silence.")
            silence = AudioClip(lambda t: 0, duration=duration, fps=44100)
            section_audio_clips.append(silence)
            clips_to_close.append(silence)

        # --- Create Subtitle Clip ---
        subtitle_clip = None
        if subtitles_enabled and text:
            logger.info(" Creating subtitle clip...")
            subtitle_clip = create_subtitle_clip(text, duration, width, height, subtitle_opts)
            if subtitle_clip:
                clips_to_close.append(subtitle_clip)

        # --- Create Particle Overlay Clip ---
        particle_clip = None
        if particles_enabled and particle_type:
            logger.info(f" Creating particle overlay: {particle_type}...")
            particle_clip = create_particle_overlay_clip(particle_type, duration, width, height, particle_opts)
            if particle_clip:
                clips_to_close.append(particle_clip)

        # --- Composite Section ---
        # Layer order: Image -> Particles -> Subtitles
        composited_layers = [img_clip]  # Base image
        if particle_clip:
            composited_layers.append(particle_clip)
        if subtitle_clip:
            composited_layers.append(subtitle_clip)

        # Only composite if more than one layer exists
        if len(composited_layers) > 1:
            final_section_clip = CompositeVideoClip(composited_layers, size=(width, height)).set_duration(duration).set_fps(fps)
            clips_to_close.append(final_section_clip)  # Add composite for closing
        else:
            final_section_clip = img_clip  # Just the image clip if no overlays

        section_video_clips.append(final_section_clip)
        total_duration_est += duration

    if not section_video_clips:
        logger.error("❌ No valid video clips were created for any section.")
        # Cleanup clips created so far
        for clip in clips_to_close:
            try:
                clip.close()
            except Exception:
                pass
        return None

    logger.info(f"Total estimated video duration (before transitions/end logo): {total_duration_est:.2f}s")

    # --- Concatenate Video Clips with Transitions ---
    final_video = None
    if len(section_video_clips) > 1 and transition_type != 'none' and transition_duration > 0:
        # Ensure transition isn't longer than the shortest clip involved
        min_clip_dur = min(c.duration for c in section_video_clips)
        safe_transition_duration = min(transition_duration, min_clip_dur / 2.01)  # Ensure overlap is possible
        if safe_transition_duration < transition_duration:
            logger.warning(f"Requested transition duration ({transition_duration}s) too long for shortest clip ({min_clip_dur:.2f}s). Clamping to {safe_transition_duration:.2f}s.")
        transition_duration = safe_transition_duration

        logger.info(f"Applying '{transition_type}' transitions with duration {transition_duration:.2f}s...")

        if transition_type.startswith('slide_'):
            # Manual slide transitions require more complex composition
            direction = transition_type.split('_')[1]  # 'left', 'right', 'up', 'down'
            try:
                # Lazy import: path varies across MoviePy versions; fall back to fade if absent
                from moviepy.video.compositing.transitions import slide_in
                processed_clips = []
                current_time = 0
                for i, clip in enumerate(section_video_clips):
                    clip = clip.set_start(current_time)
                    if i > 0:  # Apply slide-in from second clip onwards
                        clip = slide_in(clip, duration=transition_duration, side=direction)
                    processed_clips.append(clip)
                    # Move next clip's start time back by transition duration for overlap
                    current_time += clip.duration - (transition_duration if i < len(section_video_clips) - 1 else 0)
                final_video = CompositeVideoClip(processed_clips, size=(width, height))
            except ImportError:
                logger.warning(f"slide_in transition unavailable in this MoviePy version, falling back to 'fade'.")
                final_video = _crossfade_concatenate(section_video_clips, transition_duration, width, height)
        else:
            if transition_type != 'fade':
                logger.warning(f"Unsupported transition type '{transition_type}', falling back to 'fade'.")
            final_video = _crossfade_concatenate(section_video_clips, transition_duration, width, height)
    else:
        logger.info("Concatenating clips without transitions...")
        final_video = concatenate_videoclips(section_video_clips, method="compose")

    if not final_video:
        logger.error("❌ Failed to concatenate video clips.")
        # Cleanup clips
        for clip in clips_to_close:
            try:
                clip.close()
            except Exception:
                pass
        return None
    clips_to_close.append(final_video)  # Add the main concatenated video for closing

    # --- Concatenate Audio ---
    final_audio = None
    if section_audio_clips:
        logger.info("Concatenating audio clips...")
        try:
            final_audio = concatenate_audioclips(section_audio_clips)
            # Set the final video's audio
            final_video = final_video.set_audio(final_audio)
            logger.info(f"Combined audio duration: {final_audio.duration:.2f}s. Video duration: {final_video.duration:.2f}s")
            # Moviepy usually handles slight mismatches, but log if significant
            if abs(final_audio.duration - final_video.duration) > 0.1:
                logger.warning("Significant mismatch between final video and audio durations detected.")
            clips_to_close.append(final_audio)
        except Exception as e:
            logger.error(f"❌ Failed to concatenate or set audio: {e}. Video will be silent.")
            final_video = final_video.set_audio(None)  # Ensure no audio track if failed
    else:
        logger.warning("No audio clips found or generated. Video will be silent.")
        final_video = final_video.set_audio(None)

    # --- Add Watermark ---
    watermark_clip_instance = None  # Keep track for closing
    if watermark_enabled and watermark_path and os.path.exists(watermark_path):
        try:
            logger.info(f"Adding watermark from: {watermark_path}")
            # transparent=True preserves the alpha channel of PNG watermarks.
            # (Do NOT pass ismask=True here: that flag declares the clip to BE a
            # grayscale mask, which rendered the watermark invisible/incorrect.)
            wm_img = ImageClip(watermark_path, transparent=True)
            clips_to_close.append(wm_img)

            # Size
            wm_size_param = watermark_opts.get("size", 0.15)  # Default: 15% of video width
            target_wm_width = None
            target_wm_height = None
            if isinstance(wm_size_param, float):
                # Relative size based on video width
                target_wm_width = int(width * wm_size_param)
            elif isinstance(wm_size_param, (tuple, list)) and len(wm_size_param) == 2:
                # Absolute size (w, h)
                target_wm_width = int(wm_size_param[0])
                target_wm_height = int(wm_size_param[1])

            # Resize watermark maintaining aspect ratio
            if target_wm_width and target_wm_height:
                wm_img = wm_img.resize(newsize=(target_wm_width, target_wm_height))
            elif target_wm_width:
                wm_img = wm_img.resize(width=target_wm_width)
            elif target_wm_height:
                wm_img = wm_img.resize(height=target_wm_height)
            # Else use original size if no specific size given

            # Position with margin
            wm_pos = watermark_opts.get("position", ("right", "bottom"))  # Default bottom right
            margin = int(watermark_opts.get("margin", 15))  # Pixels margin

            # Convert position keywords to coordinates respecting margin
            def get_coord(dim, size, pos_keyword, margin):
                if pos_keyword == 'left':
                    return margin
                if pos_keyword == 'center':
                    return (dim / 2) - (size / 2)
                if pos_keyword == 'right':
                    return dim - size - margin
                # Allow numerical values (absolute or fractional)
                if isinstance(pos_keyword, (int, float)):
                    return pos_keyword
                return margin  # Default fallback

            final_wm_pos = (
                get_coord(width, wm_img.w, wm_pos[0], margin),
                get_coord(height, wm_img.h, wm_pos[1], margin)
            )
            wm_img = wm_img.set_position(final_wm_pos)

            # Opacity
            wm_opacity = float(watermark_opts.get("opacity", 0.7))
            if 0 <= wm_opacity < 1.0:
                wm_img = wm_img.set_opacity(wm_opacity)

            # Set duration to match video and composite
            watermark_clip_instance = wm_img.set_duration(final_video.duration).set_start(0)
            # Composite watermark on top
            final_video = CompositeVideoClip([final_video, watermark_clip_instance], size=(width, height), use_bgclip=True)
            clips_to_close.append(final_video)  # The new composite needs closing too
            logger.info("✅ Watermark added.")
        except Exception as e:
            logger.error(f"❌ Failed to add watermark: {e}")
            # Don't add watermark_clip_instance to clips_to_close if it failed

    # --- Add End Logo Screen ---
    end_logo_clip_instance = None  # Keep track for closing
    if end_logo_enabled and end_logo_path and os.path.exists(end_logo_path) and end_logo_duration > 0:
        try:
            logger.info(f"Adding end logo screen: {end_logo_path}")
            end_logo_clip_instance = ImageClip(end_logo_path).set_duration(end_logo_duration).resize(newsize=(width, height))
            clips_to_close.append(end_logo_clip_instance)
            # Simple crossfade into the end logo
            final_video = _crossfade_concatenate([final_video, end_logo_clip_instance], 0.5, width, height)
            clips_to_close.append(final_video)  # The *new* final video needs closing
            logger.info("✅ End logo screen added.")
        except Exception as e:
            logger.error(f"❌ Failed to add end logo screen: {e}")

    # --- Write Final Video ---
    final_output_path = None
    temp_audio_file_path = None  # Keep track of temp audio file
    try:
        logger.info(f"Writing final video to: {output_path}...")
        # Generate a unique temp audio filename
        temp_audio_file_path = os.path.join(tempfile.gettempdir(), f"temp-audio-{uuid.uuid4().hex}.m4a")

        final_video.write_videofile(
            output_path,
            codec='libx264',      # Good compatibility
            audio_codec='aac',    # Standard audio codec
            temp_audiofile=temp_audio_file_path,  # Use explicit temp file path
            remove_temp=True,     # Let MoviePy handle removal on success
            fps=fps,
            preset='medium',      # Balance quality and speed ('ultrafast'...'veryslow')
            threads=4,            # Use multiple threads if available
            ffmpeg_params=[
                # Ensure web compatibility
                '-pix_fmt', 'yuv420p',
                '-movflags', '+faststart'  # Important for web streaming
            ],
            logger='bar'  # Progress bar
        )
        logger.info(f"✅ Final video saved successfully: {output_path}")
        final_output_path = output_path  # Set success path
    except Exception as e:
        # Log full traceback for write errors
        import traceback
        logger.error(f"❌ ERROR writing final video file: {e}\n{traceback.format_exc()}")
        # Attempt to remove the potentially partially written output file
        if os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                logger.error(f"Could not remove partially written file: {output_path}")
        # Explicitly try removing temp audio if write failed and remove_temp=True might not have run
        if temp_audio_file_path and os.path.exists(temp_audio_file_path):
            try:
                os.remove(temp_audio_file_path)
                logger.info(f"Cleaned up temp audio file: {temp_audio_file_path}")
            except OSError:
                logger.error(f"Could not remove temp audio file: {temp_audio_file_path}")
    finally:
        # --- Close all opened clips ---
        logger.debug(f"Closing {len(clips_to_close)} MoviePy clips...")
        for clip in reversed(clips_to_close):  # Close in reverse order (composites first)
            try:
                clip.close()
            except Exception as e:
                # Log closing errors but continue
                logger.warning(f"Error closing a clip: {e}")
        logger.debug("Finished closing clips.")

    return final_output_path  # Return path only on success