Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import tempfile | |
| import io | |
| import numpy as np | |
| import cv2 # Still used for reading logo/watermark if needed | |
| import logging | |
| import uuid | |
| import subprocess | |
| from PIL import Image, ImageFont, ImageDraw | |
| # --- MoviePy Imports --- | |
| from moviepy import * | |
| #from moviepy.editor import * | |
| # import moviepy.video.fx.all as vfx # Keep if you add more complex FX | |
| #from moviepy.video.tools.transitions import crossfadein, slide_in # Import specific transitions | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # --- Constants --- | |
| DEFAULT_WIDTH = 1280 | |
| DEFAULT_HEIGHT = 720 | |
| DEFAULT_FPS = 24 | |
| DEFAULT_TRANSITION_DURATION = 0.75 # seconds | |
| DEFAULT_FONT = "lazy_dog.ttf" # *** ADJUST THIS PATH *** Needs to be accessible on your server | |
| DEFAULT_SUBTITLE_FONT_SIZE = 36 | |
| DEFAULT_SUBTITLE_COLOR = 'white' | |
| DEFAULT_SUBTITLE_BG_COLOR = 'rgba(0,0,0,0.5)' # Semi-transparent black background | |
| DEFAULT_SUBTITLE_POSITION = ('center', 'bottom') | |
| DEFAULT_LOGO_PATH = "sozo_logo2.png" # *** ADJUST THIS PATH *** | |
| # --- Helper Functions --- | |
| def get_styled_audio_duration(audio_path): | |
| """ Get duration of an audio file using MoviePy (preferred) or FFprobe fallback. """ | |
| if not audio_path or not os.path.exists(audio_path): | |
| logger.warning(f"Audio path invalid or file not found: {audio_path}. Defaulting duration to 5.0s.") | |
| return 5.0 # Default duration | |
| try: | |
| # Use MoviePy - more integrated & handles more formats potentially | |
| with AudioFileClip(audio_path) as clip: | |
| duration = clip.duration | |
| # Sometimes duration might be None or 0 for very short/corrupt files | |
| return duration if duration and duration > 0 else 5.0 | |
| except Exception as e: | |
| logger.warning(f"MoviePy failed to get duration for {audio_path}: {e}. Trying ffprobe.") | |
| try: | |
| result = subprocess.run( | |
| ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', audio_path], | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| check=True, | |
| text=True # Get stdout as string | |
| ) | |
| return float(result.stdout.strip()) | |
| except Exception as e_ffprobe: | |
| logger.error(f"FFprobe also failed for {audio_path}: {e_ffprobe}. Returning default 5.0s.") | |
| return 5.0 | |
| def resize_image_aspect_fill(img_pil, target_width, target_height): | |
| """ Resizes PIL image to target dimensions, cropping if necessary to fill.""" | |
| try: | |
| target_ratio = target_width / target_height | |
| # Ensure image is in RGB for consistent processing | |
| img_pil = img_pil.convert("RGB") | |
| img_ratio = img_pil.width / img_pil.height | |
| if abs(target_ratio - img_ratio) < 0.01: # If aspect ratios are very close, just resize | |
| img_resized = img_pil.resize((target_width, target_height), Image.Resampling.LANCZOS) | |
| return img_resized | |
| if target_ratio > img_ratio: # Target is wider than image -> crop top/bottom | |
| new_width = target_width | |
| new_height = int(new_width / img_ratio) | |
| img_resized = img_pil.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| top = (new_height - target_height) // 2 | |
| bottom = top + target_height | |
| img_cropped = img_resized.crop((0, top, target_width, bottom)) | |
| else: # Target is taller than image (or same ratio) -> crop sides | |
| new_height = target_height | |
| new_width = int(new_height * img_ratio) | |
| img_resized = img_pil.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| left = (new_width - target_width) // 2 | |
| right = left + target_width | |
| img_cropped = img_resized.crop((left, 0, right, target_height)) | |
| return img_cropped | |
| except Exception as e: | |
| logger.error(f"Error resizing/cropping image: {e}. Returning original image resized without aspect correction.") | |
| # Fallback: simple resize, may introduce distortion or black bars later | |
| return img_pil.resize((target_width, target_height), Image.Resampling.LANCZOS) | |
| def create_subtitle_clip(text, duration, width, height, options): | |
| """Creates a MoviePy TextClip for subtitles.""" | |
| if not text or not isinstance(text, str) or not text.strip(): | |
| # logger.debug("Subtitle text is empty or invalid, skipping clip creation.") | |
| return None | |
| # Ensure defaults are handled robustly | |
| subtitle_opts = options or {} | |
| font_path = subtitle_opts.get("font", DEFAULT_FONT) | |
| fontsize = int(subtitle_opts.get("fontsize", DEFAULT_SUBTITLE_FONT_SIZE)) | |
| color = subtitle_opts.get("color", DEFAULT_SUBTITLE_COLOR) | |
| position = subtitle_opts.get("position", DEFAULT_SUBTITLE_POSITION) | |
| bg_color = subtitle_opts.get("bg_color", DEFAULT_SUBTITLE_BG_COLOR) # Background for readability | |
| stroke_color = subtitle_opts.get("stroke_color") # Optional outline | |
| stroke_width = float(subtitle_opts.get("stroke_width", 0)) # Default 0 if not specified | |
| method = subtitle_opts.get("method", "caption") # 'caption' automatically wraps text | |
| align = subtitle_opts.get("align", "center") | |
| margin = int(subtitle_opts.get("margin", 10)) # Margin from edge for position like ('center', 'bottom') | |
| # Adjust position based on margin for common cases | |
| if isinstance(position, (list, tuple)) and len(position) == 2: | |
| x_pos, y_pos = position | |
| if y_pos == 'bottom': | |
| final_pos = (x_pos, height - margin) # Anchor point is bottom of text | |
| elif y_pos == 'top': | |
| final_pos = (x_pos, margin) # Anchor point is top of text | |
| else: | |
| final_pos = position | |
| else: | |
| final_pos = position # Use as is if not tuple or custom values | |
| try: | |
| # Check font existence early | |
| if not os.path.exists(font_path): | |
| logger.error(f"β Subtitle font not found at '{font_path}'. Using MoviePy default.") | |
| font_path = None # Let MoviePy choose a default | |
| # Use method='caption' for automatic wrapping based on size | |
| # Limit width to 90% of video width for wrapping | |
| text_width_limit = width * 0.9 | |
| subtitle = TextClip( | |
| txt=text.strip(), # Ensure no leading/trailing whitespace | |
| fontsize=fontsize, | |
| color=color, | |
| font=font_path, | |
| size=(text_width_limit, None), # Width limit, height auto | |
| method=method, | |
| align=align, | |
| bg_color=bg_color, | |
| stroke_color=stroke_color if stroke_width > 0 else None, # Only apply if stroke_width > 0 | |
| stroke_width=stroke_width, | |
| print_cmd=False # Suppress verbose ffmpeg command print | |
| ).set_position(final_pos, relative=False).set_duration(duration) # relative=False if using pixel coords | |
| # Optional fade in/out for subtitles | |
| fade_duration = float(subtitle_opts.get("fade_duration", 0.3)) | |
| # Ensure fade doesn't exceed half the clip duration | |
| fade_duration = min(fade_duration, duration / 2.1) # Ensure non-overlapping fades | |
| if fade_duration > 0: | |
| subtitle = subtitle.crossfadein(fade_duration).crossfadeout(fade_duration) | |
| # logger.debug(f"β Created subtitle clip for text: '{text[:30]}...'") | |
| return subtitle | |
| except Exception as e: | |
| logger.error(f"β ERROR creating subtitle clip for text '{text[:30]}...': {e}") | |
| # Optionally try PIL fallback here if needed, but TextClip is generally more robust | |
| return None | |
| def create_particle_overlay_clip(particle_type, duration, width, height, options): | |
| """ | |
| Creates a particle overlay clip. | |
| Placeholder: Loads a pre-existing video file based on type. | |
| *** Requires actual particle video files on server *** | |
| """ | |
| if not particle_type or particle_type == 'none': | |
| return None | |
| particle_opts = options or {} | |
| # *** Define paths to your actual particle overlay videos here *** | |
| base_particle_path = "assets/particles" # Example base directory | |
| particle_files = { | |
| "snow": os.path.join(base_particle_path, "snow_overlay.mp4"), | |
| "sparkles": os.path.join(base_particle_path, "sparkles_overlay.mp4"), # Often .mov for alpha | |
| "rain": os.path.join(base_particle_path, "rain_overlay.mp4"), | |
| "confetti": os.path.join(base_particle_path, "confetti_overlay.mp4"), | |
| # Add more mappings for types you support | |
| } | |
| particle_path = particle_files.get(str(particle_type).lower()) | |
| if not particle_path: | |
| logger.warning(f"β οΈ Particle type '{particle_type}' not recognized or mapped.") | |
| return None | |
| if not os.path.exists(particle_path): | |
| logger.warning(f"β οΈ Particle overlay video not found for type '{particle_type}' at path: {particle_path}") | |
| return None | |
| try: | |
| logger.info(f"Creating particle overlay: {particle_type} from {particle_path}") | |
| # Load the overlay video. Check for alpha channel (.mov often has it) | |
| has_mask = particle_path.lower().endswith('.mov') | |
| overlay = VideoFileClip(particle_path, has_mask=has_mask, target_resolution=(height, width)) | |
| # Loop or trim the overlay to match the required duration | |
| if overlay.duration < duration: | |
| # Loop requires careful handling if audio present in overlay, disable audio | |
| overlay = overlay.loop(duration=duration).without_audio() | |
| else: | |
| overlay = overlay.subclip(0, duration).without_audio() | |
| # Resize MUST happen after loop/subclip if original is smaller | |
| overlay = overlay.resize(newsize=(width, height)) | |
| # Set opacity | |
| opacity = float(particle_opts.get("opacity", 0.6)) # Default 60% opacity | |
| if 0.0 <= opacity < 1.0: | |
| overlay = overlay.set_opacity(opacity) | |
| elif opacity < 0: | |
| overlay = overlay.set_opacity(0) # Clamp at 0 | |
| # Set position (usually centered) | |
| position = particle_opts.get("position", "center") | |
| return overlay.set_position(position) | |
| except Exception as e: | |
| # Log the full traceback for debugging particle issues | |
| import traceback | |
| logger.error(f"β ERROR creating particle overlay for {particle_type}: {e}\n{traceback.format_exc()}") | |
| return None | |
| # --- Main Video Creation Function using MoviePy --- | |
| def create_styled_video(images, audio_files, section_texts, output_path, config=None): | |
| """ | |
| Creates a video from images, audio, and text using MoviePy. | |
| Args: | |
| images (list): List of PIL Image objects (MUST NOT contain None here). | |
| audio_files (list): List of paths to audio files (can have None). | |
| section_texts (list): List of text strings for subtitles (can have None). | |
| output_path (str): Path to save the final video. | |
| config (dict): Dictionary containing configuration options. | |
| Returns: | |
| str: Path to the generated video file, or None on failure. | |
| """ | |
| # Ensure inputs are lists | |
| images = images or [] | |
| audio_files = audio_files or [] | |
| section_texts = section_texts or [] | |
| if not images: | |
| logger.error("β No images provided for video creation.") | |
| return None | |
| num_sections = len(images) | |
| # Pad other lists to match image count if necessary (should be handled by caller ideally) | |
| if len(audio_files) < num_sections: | |
| audio_files.extend([None] * (num_sections - len(audio_files))) | |
| if len(section_texts) < num_sections: | |
| section_texts.extend([None] * (num_sections - len(section_texts))) | |
| if config is None: | |
| config = {} | |
| logger.info(f"Starting video creation with config: {config}") | |
| # --- Get Configuration --- | |
| width = int(config.get("width", DEFAULT_WIDTH)) | |
| height = int(config.get("height", DEFAULT_HEIGHT)) | |
| fps = int(config.get("fps", DEFAULT_FPS)) | |
| transition_type = config.get("transition", "fade").lower() | |
| transition_duration = float(config.get("transition_duration", DEFAULT_TRANSITION_DURATION)) | |
| # Ensure transition duration isn't negative | |
| transition_duration = max(0, transition_duration) | |
| font_path = config.get("font_path", DEFAULT_FONT) # Primary font path | |
| subtitle_opts = config.get("subtitle_options", {}) | |
| subtitles_enabled = subtitle_opts.get("enabled", True) | |
| if "font" not in subtitle_opts: subtitle_opts["font"] = font_path # Inherit default font | |
| particle_opts = config.get("particle_options", {}) | |
| particles_enabled = particle_opts.get("enabled", False) | |
| # Ensure particle types list exists and matches section count | |
| particle_types = particle_opts.get("types_per_section", []) | |
| if len(particle_types) != num_sections: | |
| logger.warning(f"Particle types list length mismatch ({len(particle_types)} vs {num_sections} sections). Disabling particles for safety.") | |
| particle_types = [None] * num_sections # Reset to None | |
| particle_opts['types_per_section'] = particle_types # Store potentially corrected list | |
| watermark_opts = config.get("watermark_options", {}) | |
| watermark_enabled = watermark_opts.get("enabled", False) and watermark_opts.get("path") | |
| watermark_path = watermark_opts.get("path") if watermark_enabled else None | |
| end_logo_opts = config.get("end_logo_options", {}) | |
| end_logo_enabled = end_logo_opts.get("enabled", True) | |
| # Use default Sozo logo path if enabled but no path given | |
| end_logo_path = end_logo_opts.get("path", DEFAULT_LOGO_PATH) if end_logo_enabled else None | |
| end_logo_duration = float(end_logo_opts.get("duration", 3.0)) | |
| # --- Resource Management --- | |
| clips_to_close = [] # Keep track of clips needing .close() | |
| # --- Prepare Clips --- | |
| section_video_clips = [] # Holds the final composited clip for each section | |
| section_audio_clips = [] # Holds the audio clip for each section | |
| final_durations = [] # Holds the calculated duration for each section | |
| logger.info(f"Processing {num_sections} sections for video...") | |
| total_duration_est = 0 | |
| for i in range(num_sections): | |
| img_pil = images[i] | |
| audio_path = audio_files[i] | |
| text = section_texts[i] | |
| particle_type = particle_types[i] if particles_enabled else None | |
| logger.info(f"--- Section {i+1}/{num_sections} ---") | |
| # Determine duration (based on audio or default) | |
| duration = get_styled_audio_duration(audio_path) | |
| final_durations.append(duration) | |
| logger.info(f" Duration: {duration:.2f}s (Audio: {os.path.basename(str(audio_path)) if audio_path else 'No'})") | |
| logger.info(f" Text: '{str(text)[:40]}...'") | |
| logger.info(f" Particle: {particle_type if particle_type else 'None'}") | |
| # --- Create Image Clip --- | |
| img_clip = None | |
| try: | |
| img_resized_pil = resize_image_aspect_fill(img_pil, width, height) | |
| img_np = np.array(img_resized_pil) | |
| img_clip = ImageClip(img_np).set_duration(duration).set_fps(fps) | |
| clips_to_close.append(img_clip) # Add base image clip for potential closing | |
| except Exception as e: | |
| logger.error(f"β Failed to process image {i+1}: {e}. Creating black frame.") | |
| # Use a black frame as placeholder to avoid crashing | |
| img_clip = ColorClip(size=(width, height), color=(0,0,0), duration=duration) | |
| clips_to_close.append(img_clip) | |
| # --- Create Audio Clip --- | |
| section_audio_clip = None | |
| if audio_path and os.path.exists(audio_path): | |
| try: | |
| # Load audio and ensure duration matches video segment exactly | |
| # Use try-except for AudioFileClip as it can fail on corrupted files | |
| temp_audio = AudioFileClip(audio_path) | |
| clips_to_close.append(temp_audio) # Add for closing | |
| # Trim or pad audio if necessary (MoviePy often handles slight discrepancies) | |
| section_audio_clip = temp_audio.subclip(0, min(temp_audio.duration, duration)) | |
| # If audio is shorter than video, MoviePy usually pads with silence automatically when concatenating. | |
| # If you need explicit looping/padding logic, add it here. | |
| if section_audio_clip.duration < duration - 0.01: # Allow small tolerance | |
| logger.warning(f"Audio duration ({section_audio_clip.duration:.2f}s) shorter than video ({duration:.2f}s). Silence will be added.") | |
| section_audio_clips.append(section_audio_clip) | |
| except Exception as e: | |
| logger.error(f"β Failed to load audio '{os.path.basename(audio_path)}': {e}. Adding silence.") | |
| # Add silence to keep timing consistent | |
| silence = AudioClip(lambda t: 0, duration=duration, fps=44100) # Standard audio fps | |
| section_audio_clips.append(silence) | |
| clips_to_close.append(silence) | |
| else: | |
| # Add silence if no audio for this section | |
| logger.info(" No audio path or file not found, adding silence.") | |
| silence = AudioClip(lambda t: 0, duration=duration, fps=44100) | |
| section_audio_clips.append(silence) | |
| clips_to_close.append(silence) | |
| # --- Create Subtitle Clip --- | |
| subtitle_clip = None | |
| if subtitles_enabled and text: | |
| logger.info(" Creating subtitle clip...") | |
| subtitle_clip = create_subtitle_clip(text, duration, width, height, subtitle_opts) | |
| if subtitle_clip: | |
| clips_to_close.append(subtitle_clip) | |
| # --- Create Particle Overlay Clip --- | |
| particle_clip = None | |
| if particles_enabled and particle_type: | |
| logger.info(f" Creating particle overlay: {particle_type}...") | |
| particle_clip = create_particle_overlay_clip(particle_type, duration, width, height, particle_opts) | |
| if particle_clip: | |
| clips_to_close.append(particle_clip) | |
| # --- Composite Section --- | |
| # Layer order: Image -> Particles -> Subtitles | |
| composited_layers = [img_clip] # Base image | |
| if particle_clip: | |
| composited_layers.append(particle_clip) | |
| if subtitle_clip: | |
| composited_layers.append(subtitle_clip) | |
| # Only composite if more than one layer exists | |
| if len(composited_layers) > 1: | |
| final_section_clip = CompositeVideoClip(composited_layers, size=(width, height)).set_duration(duration).set_fps(fps) | |
| clips_to_close.append(final_section_clip) # Add composite for closing | |
| else: | |
| final_section_clip = img_clip # Just the image clip if no overlays | |
| section_video_clips.append(final_section_clip) | |
| total_duration_est += duration | |
| if not section_video_clips: | |
| logger.error("β No valid video clips were created for any section.") | |
| # Cleanup clips created so far | |
| for clip in clips_to_close: | |
| try: clip.close() | |
| except: pass | |
| return None | |
| logger.info(f"Total estimated video duration (before transitions/end logo): {total_duration_est:.2f}s") | |
| # --- Concatenate Video Clips with Transitions --- | |
| final_video = None | |
| if len(section_video_clips) > 1 and transition_type != 'none' and transition_duration > 0: | |
| # Ensure transition isn't longer than the shortest clip involved | |
| min_clip_dur = min(c.duration for c in section_video_clips) | |
| safe_transition_duration = min(transition_duration, min_clip_dur / 2.01) # Ensure overlap is possible | |
| if safe_transition_duration < transition_duration: | |
| logger.warning(f"Requested transition duration ({transition_duration}s) too long for shortest clip ({min_clip_dur:.2f}s). Clamping to {safe_transition_duration:.2f}s.") | |
| transition_duration = safe_transition_duration | |
| logger.info(f"Applying '{transition_type}' transitions with duration {transition_duration:.2f}s...") | |
| if transition_type == 'fade': | |
| # Crossfade is best handled by concatenate_videoclips's transition argument | |
| final_video = concatenate_videoclips( | |
| section_video_clips, | |
| method="compose", | |
| transition=crossfadein(transition_duration) | |
| ) | |
| elif transition_type.startswith('slide_'): | |
| # Manual slide transitions require more complex composition | |
| direction = transition_type.split('_')[1] # 'left', 'right', 'up', 'down' | |
| processed_clips = [] | |
| current_time = 0 | |
| for i, clip in enumerate(section_video_clips): | |
| clip = clip.set_start(current_time) | |
| if i > 0: # Apply slide-in from second clip onwards | |
| clip = slide_in(clip, duration=transition_duration, side=direction) | |
| processed_clips.append(clip) | |
| # Move next clip's start time back by transition duration for overlap | |
| current_time += clip.duration - (transition_duration if i < len(section_video_clips) - 1 else 0) | |
| final_video = CompositeVideoClip(processed_clips, size=(width, height)) | |
| else: | |
| logger.warning(f"Unsupported transition type '{transition_type}', falling back to 'fade'.") | |
| final_video = concatenate_videoclips( | |
| section_video_clips, | |
| method="compose", | |
| transition=crossfadein(transition_duration) | |
| ) | |
| else: | |
| logger.info("Concatenating clips without transitions...") | |
| final_video = concatenate_videoclips(section_video_clips, method="compose") | |
| if not final_video: | |
| logger.error("β Failed to concatenate video clips.") | |
| # Cleanup clips | |
| for clip in clips_to_close: | |
| try: clip.close() | |
| except: pass | |
| return None | |
| clips_to_close.append(final_video) # Add the main concatenated video for closing | |
| # --- Concatenate Audio --- | |
| final_audio = None | |
| if section_audio_clips: | |
| logger.info("Concatenating audio clips...") | |
| try: | |
| final_audio = concatenate_audioclips(section_audio_clips) | |
| # Set the final video's audio | |
| final_video = final_video.set_audio(final_audio) | |
| logger.info(f"Combined audio duration: {final_audio.duration:.2f}s. Video duration: {final_video.duration:.2f}s") | |
| # Moviepy usually handles slight mismatches, but log if significant | |
| if abs(final_audio.duration - final_video.duration) > 0.1: | |
| logger.warning("Significant mismatch between final video and audio durations detected.") | |
| clips_to_close.append(final_audio) | |
| except Exception as e: | |
| logger.error(f"β Failed to concatenate or set audio: {e}. Video will be silent.") | |
| final_video = final_video.set_audio(None) # Ensure no audio track if failed | |
| else: | |
| logger.warning("No audio clips found or generated. Video will be silent.") | |
| final_video = final_video.set_audio(None) | |
| # --- Add Watermark --- | |
| watermark_clip_instance = None # Keep track for closing | |
| if watermark_enabled and watermark_path and os.path.exists(watermark_path): | |
| try: | |
| logger.info(f"Adding watermark from: {watermark_path}") | |
| # Use ismask=True if your watermark PNG has transparency | |
| is_mask = watermark_path.lower().endswith(".png") | |
| wm_img = ImageClip(watermark_path, ismask=is_mask, transparent=True) | |
| clips_to_close.append(wm_img) | |
| # Size | |
| wm_size_param = watermark_opts.get("size", 0.15) # Default: 15% of video width | |
| target_wm_width = None | |
| target_wm_height = None | |
| if isinstance(wm_size_param, float): # Relative size based on video width | |
| target_wm_width = int(width * wm_size_param) | |
| elif isinstance(wm_size_param, (tuple, list)) and len(wm_size_param) == 2: # Absolute size (w, h) | |
| target_wm_width = int(wm_size_param[0]) | |
| target_wm_height = int(wm_size_param[1]) | |
| # Resize watermark maintaining aspect ratio | |
| if target_wm_width and target_wm_height: | |
| wm_img = wm_img.resize(newsize=(target_wm_width, target_wm_height)) | |
| elif target_wm_width: | |
| wm_img = wm_img.resize(width=target_wm_width) | |
| elif target_wm_height: | |
| wm_img = wm_img.resize(height=target_wm_height) | |
| # Else use original size if no specific size given | |
| # Position with margin | |
| wm_pos = watermark_opts.get("position", ("right", "bottom")) # Default bottom right | |
| margin = int(watermark_opts.get("margin", 15)) # Pixels margin | |
| # Convert position keywords to coordinates respecting margin | |
| def get_coord(dim, size, pos_keyword, margin): | |
| if pos_keyword == 'left': return margin | |
| if pos_keyword == 'center': return (dim / 2) - (size / 2) | |
| if pos_keyword == 'right': return dim - size - margin | |
| # Allow numerical values (absolute or fractional) | |
| if isinstance(pos_keyword, (int, float)): return pos_keyword | |
| return margin # Default fallback | |
| final_wm_pos = ( | |
| get_coord(width, wm_img.w, wm_pos[0], margin), | |
| get_coord(height, wm_img.h, wm_pos[1], margin) | |
| ) | |
| wm_img = wm_img.set_position(final_wm_pos) | |
| # Opacity | |
| wm_opacity = float(watermark_opts.get("opacity", 0.7)) | |
| if 0 <= wm_opacity < 1.0: | |
| wm_img = wm_img.set_opacity(wm_opacity) | |
| # Set duration to match video and composite | |
| watermark_clip_instance = wm_img.set_duration(final_video.duration).set_start(0) | |
| # Composite watermark on top | |
| final_video = CompositeVideoClip([final_video, watermark_clip_instance], size=(width, height), use_bgclip=True) | |
| clips_to_close.append(final_video) # The new composite needs closing too | |
| logger.info("β Watermark added.") | |
| except Exception as e: | |
| logger.error(f"β Failed to add watermark: {e}") | |
| # Don't add watermark_clip_instance to clips_to_close if it failed | |
| # --- Add End Logo Screen --- | |
| end_logo_clip_instance = None # Keep track for closing | |
| if end_logo_enabled and end_logo_path and os.path.exists(end_logo_path) and end_logo_duration > 0: | |
| try: | |
| logger.info(f"Adding end logo screen: {end_logo_path}") | |
| end_logo_clip_instance = ImageClip(end_logo_path).set_duration(end_logo_duration).resize(newsize=(width, height)) | |
| clips_to_close.append(end_logo_clip_instance) | |
| # Simple fade transition to end logo - Use concatenate again | |
| final_video = concatenate_videoclips([final_video, end_logo_clip_instance], method="compose", transition=crossfadein(0.5)) | |
| clips_to_close.append(final_video) # The *new* final video needs closing | |
| logger.info("β End logo screen added.") | |
| except Exception as e: | |
| logger.error(f"β Failed to add end logo screen: {e}") | |
| # --- Write Final Video --- | |
| final_output_path = None | |
| temp_audio_file_path = None # Keep track of temp audio file | |
| try: | |
| logger.info(f"Writing final video to: {output_path}...") | |
| # Generate a unique temp audio filename | |
| temp_audio_file_path = os.path.join(tempfile.gettempdir(), f"temp-audio-{uuid.uuid4().hex}.m4a") | |
| final_video.write_videofile( | |
| output_path, | |
| codec='libx264', # Good compatibility | |
| audio_codec='aac', # Standard audio codec | |
| temp_audiofile=temp_audio_file_path, # Use explicit temp file path | |
| remove_temp=True, # Let MoviePy handle removal on success | |
| fps=fps, | |
| preset='medium', # Balance quality and speed ('ultrafast'...'veryslow') | |
| threads=4, # Use multiple threads if available | |
| ffmpeg_params=[ # Ensure web compatibility | |
| '-pix_fmt', 'yuv420p', | |
| '-movflags', '+faststart' # Important for web streaming | |
| ], | |
| logger='bar' # Progress bar | |
| ) | |
| logger.info(f"β Final video saved successfully: {output_path}") | |
| final_output_path = output_path # Set success path | |
| except Exception as e: | |
| # Log full traceback for write errors | |
| import traceback | |
| logger.error(f"β ERROR writing final video file: {e}\n{traceback.format_exc()}") | |
| # Attempt to remove the potentially partially written output file | |
| if os.path.exists(output_path): | |
| try: os.remove(output_path) | |
| except OSError: logger.error(f"Could not remove partially written file: {output_path}") | |
| # Explicitly try removing temp audio if write failed and remove_temp=True might not have run | |
| if temp_audio_file_path and os.path.exists(temp_audio_file_path): | |
| try: | |
| os.remove(temp_audio_file_path) | |
| logger.info(f"Cleaned up temp audio file: {temp_audio_file_path}") | |
| except OSError: | |
| logger.error(f"Could not remove temp audio file: {temp_audio_file_path}") | |
| finally: | |
| # --- Close all opened clips --- | |
| logger.debug(f"Closing {len(clips_to_close)} MoviePy clips...") | |
| for clip in reversed(clips_to_close): # Close in reverse order (composites first) | |
| try: | |
| clip.close() | |
| except Exception as e: | |
| # Log closing errors but continue | |
| logger.warning(f"Error closing a clip: {e}") | |
| logger.debug("Finished closing clips.") | |
| return final_output_path # Return path only on success |