# sozo-api / styled_video_gen.py
# NOTE: Hugging Face page chrome from the original paste ("rairo's picture /
# Update styled_video_gen.py / 3d5c392 verified / raw / history blame / 30 kB")
# converted to comments — it was not Python and broke parsing.
import os
import time
import tempfile
import io
import numpy as np
import cv2 # Still used for reading logo/watermark if needed
import logging
import uuid
import subprocess
from PIL import Image, ImageFont, ImageDraw
# --- MoviePy Imports ---
from moviepy import *
#from moviepy.editor import *
# import moviepy.video.fx.all as vfx # Keep if you add more complex FX
#from moviepy.video.tools.transitions import crossfadein, slide_in # Import specific transitions
# Module-wide logging: timestamped INFO-level messages for pipeline tracing.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Constants ---
# Default output geometry/frame rate (720p @ 24fps) used when config omits them.
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720
DEFAULT_FPS = 24
DEFAULT_TRANSITION_DURATION = 0.75 # seconds
# Font file must exist on the server; create_subtitle_clip falls back to the
# MoviePy default font when this path is missing.
DEFAULT_FONT = "lazy_dog.ttf" # *** ADJUST THIS PATH *** Needs to be accessible on your server
DEFAULT_SUBTITLE_FONT_SIZE = 36
DEFAULT_SUBTITLE_COLOR = 'white'
DEFAULT_SUBTITLE_BG_COLOR = 'rgba(0,0,0,0.5)' # Semi-transparent black background
DEFAULT_SUBTITLE_POSITION = ('center', 'bottom')
# End-card logo used by create_styled_video when end_logo_options gives no path.
DEFAULT_LOGO_PATH = "sozo_logo2.png" # *** ADJUST THIS PATH ***
# --- Helper Functions ---
def get_styled_audio_duration(audio_path):
    """Return the duration (seconds) of *audio_path*.

    Tries MoviePy's AudioFileClip first, then an ffprobe subprocess, and
    finally falls back to a 5.0s default when the path is missing/invalid or
    both probes fail.
    """
    if not (audio_path and os.path.exists(audio_path)):
        logger.warning(f"Audio path invalid or file not found: {audio_path}. Defaulting duration to 5.0s.")
        return 5.0
    try:
        # MoviePy is the preferred probe: integrated and format-tolerant.
        with AudioFileClip(audio_path) as clip:
            measured = clip.duration
        # Very short or corrupt files can report None/0 — treat as unusable.
        if measured and measured > 0:
            return measured
        return 5.0
    except Exception as e:
        logger.warning(f"MoviePy failed to get duration for {audio_path}: {e}. Trying ffprobe.")
    # Fallback: ask ffprobe for the container-level duration.
    try:
        probe = subprocess.run(
            ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
             '-of', 'default=noprint_wrappers=1:nokey=1', audio_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,  # decode stdout to str
        )
        return float(probe.stdout.strip())
    except Exception as e_ffprobe:
        logger.error(f"FFprobe also failed for {audio_path}: {e_ffprobe}. Returning default 5.0s.")
        return 5.0
def resize_image_aspect_fill(img_pil, target_width, target_height):
    """Scale a PIL image to exactly (target_width, target_height).

    Uses "aspect fill": the image is scaled to cover the target box and the
    overflow is center-cropped, so the result never shows letterbox bars.
    On any error, falls back to a plain (possibly distorting) resize.
    """
    try:
        wanted_ratio = target_width / target_height
        # Normalize to RGB so downstream numpy conversion is consistent.
        img_pil = img_pil.convert("RGB")
        source_ratio = img_pil.width / img_pil.height
        # Ratios close enough: a direct resize causes no visible distortion.
        if abs(wanted_ratio - source_ratio) < 0.01:
            return img_pil.resize((target_width, target_height), Image.Resampling.LANCZOS)
        if wanted_ratio > source_ratio:
            # Target is wider than the source: fit width, trim equal bands
            # off the top and bottom.
            scaled_h = int(target_width / source_ratio)
            scaled = img_pil.resize((target_width, scaled_h), Image.Resampling.LANCZOS)
            y0 = (scaled_h - target_height) // 2
            return scaled.crop((0, y0, target_width, y0 + target_height))
        # Target is taller than the source: fit height, trim the sides.
        scaled_w = int(target_height * source_ratio)
        scaled = img_pil.resize((scaled_w, target_height), Image.Resampling.LANCZOS)
        x0 = (scaled_w - target_width) // 2
        return scaled.crop((x0, 0, x0 + target_width, target_height))
    except Exception as e:
        logger.error(f"Error resizing/cropping image: {e}. Returning original image resized without aspect correction.")
        # Fallback: simple resize, may introduce distortion or black bars later
        return img_pil.resize((target_width, target_height), Image.Resampling.LANCZOS)
def create_subtitle_clip(text, duration, width, height, options):
    """Creates a MoviePy TextClip for subtitles.

    Args:
        text (str): Subtitle text; empty/blank/non-string input yields None.
        duration (float): How long the subtitle stays on screen (seconds).
        width (int): Video frame width in pixels (drives the wrapping width).
        height (int): Video frame height in pixels (drives positioning).
        options (dict | None): Styling overrides: font, fontsize, color,
            position, bg_color, stroke_color, stroke_width, method, align,
            margin, fade_duration.

    Returns:
        A positioned, fading subtitle clip, or None when the text is empty
        or clip creation fails.
    """
    if not text or not isinstance(text, str) or not text.strip():
        return None
    # Ensure defaults are handled robustly
    subtitle_opts = options or {}
    font_path = subtitle_opts.get("font", DEFAULT_FONT)
    fontsize = int(subtitle_opts.get("fontsize", DEFAULT_SUBTITLE_FONT_SIZE))
    color = subtitle_opts.get("color", DEFAULT_SUBTITLE_COLOR)
    position = subtitle_opts.get("position", DEFAULT_SUBTITLE_POSITION)
    bg_color = subtitle_opts.get("bg_color", DEFAULT_SUBTITLE_BG_COLOR)  # Background for readability
    stroke_color = subtitle_opts.get("stroke_color")  # Optional outline
    stroke_width = float(subtitle_opts.get("stroke_width", 0))  # Default 0 if not specified
    method = subtitle_opts.get("method", "caption")  # 'caption' automatically wraps text
    align = subtitle_opts.get("align", "center")
    margin = int(subtitle_opts.get("margin", 10))  # Margin from edge for ('center', 'bottom') style positions
    try:
        # Check font existence early
        if not os.path.exists(font_path):
            logger.error(f"❌ Subtitle font not found at '{font_path}'. Using MoviePy default.")
            font_path = None  # Let MoviePy choose a default
        # Use method='caption' for automatic wrapping based on size.
        # Limit width to 90% of video width for wrapping.
        text_width_limit = width * 0.9
        subtitle = TextClip(
            txt=text.strip(),  # Ensure no leading/trailing whitespace
            fontsize=fontsize,
            color=color,
            font=font_path,
            size=(text_width_limit, None),  # Width limit, height auto
            method=method,
            align=align,
            bg_color=bg_color,
            stroke_color=stroke_color if stroke_width > 0 else None,  # Only apply if stroke_width > 0
            stroke_width=stroke_width,
            print_cmd=False  # Suppress verbose ffmpeg command print
        ).set_duration(duration)
        # BUGFIX: previously 'bottom' was resolved to y = height - margin
        # BEFORE the clip existed. MoviePy anchors a pixel position at the
        # clip's top-left corner, so the text's TOP edge sat `margin` px above
        # the frame bottom and the text rendered off-screen. Now that the clip
        # is rendered we know its height and can anchor its bottom edge.
        if isinstance(position, (list, tuple)) and len(position) == 2:
            x_pos, y_pos = position
            if y_pos == 'bottom':
                final_pos = (x_pos, height - subtitle.h - margin)  # bottom edge `margin` px above frame bottom
            elif y_pos == 'top':
                final_pos = (x_pos, margin)  # top edge `margin` px below frame top
            else:
                final_pos = position
        else:
            final_pos = position  # Use as is if not tuple or custom values
        subtitle = subtitle.set_position(final_pos, relative=False)  # pixel coordinates
        # Optional fade in/out for subtitles
        fade_duration = float(subtitle_opts.get("fade_duration", 0.3))
        # Ensure fade doesn't exceed half the clip duration (non-overlapping fades)
        fade_duration = min(fade_duration, duration / 2.1)
        if fade_duration > 0:
            subtitle = subtitle.crossfadein(fade_duration).crossfadeout(fade_duration)
        return subtitle
    except Exception as e:
        logger.error(f"❌ ERROR creating subtitle clip for text '{text[:30]}...': {e}")
        # Optionally try PIL fallback here if needed, but TextClip is generally more robust
        return None
def create_particle_overlay_clip(particle_type, duration, width, height, options):
    """Build a full-frame particle overlay clip for the requested effect.

    Placeholder implementation: maps *particle_type* to a pre-rendered overlay
    video on disk and loads it, looped or trimmed to *duration*.
    *** Requires actual particle video files on server ***

    Returns None when the type is unknown/disabled or the asset is missing.
    """
    if not particle_type or particle_type == 'none':
        return None
    opts = options or {}
    # *** Define paths to your actual particle overlay videos here ***
    base_particle_path = "assets/particles"  # Example base directory
    particle_files = {
        "snow": os.path.join(base_particle_path, "snow_overlay.mp4"),
        "sparkles": os.path.join(base_particle_path, "sparkles_overlay.mp4"),  # Often .mov for alpha
        "rain": os.path.join(base_particle_path, "rain_overlay.mp4"),
        "confetti": os.path.join(base_particle_path, "confetti_overlay.mp4"),
        # Add more mappings for types you support
    }
    src_path = particle_files.get(str(particle_type).lower())
    if not src_path:
        logger.warning(f"⚠️ Particle type '{particle_type}' not recognized or mapped.")
        return None
    if not os.path.exists(src_path):
        logger.warning(f"⚠️ Particle overlay video not found for type '{particle_type}' at path: {src_path}")
        return None
    try:
        logger.info(f"Creating particle overlay: {particle_type} from {src_path}")
        # .mov sources are assumed to carry an alpha channel.
        use_mask = src_path.lower().endswith('.mov')
        clip = VideoFileClip(src_path, has_mask=use_mask, target_resolution=(height, width))
        # Match the overlay to the section length: loop short clips, trim long
        # ones. Either way, drop any audio the overlay may carry.
        if clip.duration < duration:
            clip = clip.loop(duration=duration).without_audio()
        else:
            clip = clip.subclip(0, duration).without_audio()
        # Resize after loop/subclip so the final frame size is exact.
        clip = clip.resize(newsize=(width, height))
        # Apply opacity, clamping negative values to fully transparent.
        alpha = float(opts.get("opacity", 0.6))  # Default 60% opacity
        if 0.0 <= alpha < 1.0:
            clip = clip.set_opacity(alpha)
        elif alpha < 0:
            clip = clip.set_opacity(0)
        # Place the overlay (usually centered over the frame).
        placement = opts.get("position", "center")
        return clip.set_position(placement)
    except Exception as e:
        # Log the full traceback for debugging particle issues.
        import traceback
        logger.error(f"❌ ERROR creating particle overlay for {particle_type}: {e}\n{traceback.format_exc()}")
        return None
# --- Main Video Creation Function using MoviePy ---
def create_styled_video(images, audio_files, section_texts, output_path, config=None):
    """
    Creates a video from images, audio, and text using MoviePy.

    Pipeline: per-section compositing (image -> particles -> subtitles),
    per-section audio (real clip or generated silence so timing stays exact),
    concatenation with optional transitions, optional watermark and end-logo
    overlays, then one ffmpeg encode via write_videofile. All created clips
    are tracked in clips_to_close and closed in a finally block.

    Args:
        images (list): List of PIL Image objects (MUST NOT contain None here).
        audio_files (list): List of paths to audio files (can have None).
        section_texts (list): List of text strings for subtitles (can have None).
        output_path (str): Path to save the final video.
        config (dict): Dictionary containing configuration options (width,
            height, fps, transition, transition_duration, font_path, plus
            subtitle_options / particle_options / watermark_options /
            end_logo_options sub-dicts).
    Returns:
        str: Path to the generated video file, or None on failure.
    """
    # Ensure inputs are lists
    images = images or []
    audio_files = audio_files or []
    section_texts = section_texts or []
    if not images:
        logger.error("❌ No images provided for video creation.")
        return None
    num_sections = len(images)
    # Pad other lists to match image count if necessary (should be handled by caller ideally)
    if len(audio_files) < num_sections:
        audio_files.extend([None] * (num_sections - len(audio_files)))
    if len(section_texts) < num_sections:
        section_texts.extend([None] * (num_sections - len(section_texts)))
    if config is None:
        config = {}
    logger.info(f"Starting video creation with config: {config}")
    # --- Get Configuration ---
    width = int(config.get("width", DEFAULT_WIDTH))
    height = int(config.get("height", DEFAULT_HEIGHT))
    fps = int(config.get("fps", DEFAULT_FPS))
    transition_type = config.get("transition", "fade").lower()
    transition_duration = float(config.get("transition_duration", DEFAULT_TRANSITION_DURATION))
    # Ensure transition duration isn't negative
    transition_duration = max(0, transition_duration)
    font_path = config.get("font_path", DEFAULT_FONT) # Primary font path
    subtitle_opts = config.get("subtitle_options", {})
    subtitles_enabled = subtitle_opts.get("enabled", True)
    if "font" not in subtitle_opts: subtitle_opts["font"] = font_path # Inherit default font
    particle_opts = config.get("particle_options", {})
    particles_enabled = particle_opts.get("enabled", False)
    # Ensure particle types list exists and matches section count
    particle_types = particle_opts.get("types_per_section", [])
    if len(particle_types) != num_sections:
        logger.warning(f"Particle types list length mismatch ({len(particle_types)} vs {num_sections} sections). Disabling particles for safety.")
        particle_types = [None] * num_sections # Reset to None
    particle_opts['types_per_section'] = particle_types # Store potentially corrected list
    # NOTE: this mutates the caller-supplied particle_options dict in place.
    watermark_opts = config.get("watermark_options", {})
    watermark_enabled = watermark_opts.get("enabled", False) and watermark_opts.get("path")
    watermark_path = watermark_opts.get("path") if watermark_enabled else None
    end_logo_opts = config.get("end_logo_options", {})
    end_logo_enabled = end_logo_opts.get("enabled", True)
    # Use default Sozo logo path if enabled but no path given
    end_logo_path = end_logo_opts.get("path", DEFAULT_LOGO_PATH) if end_logo_enabled else None
    end_logo_duration = float(end_logo_opts.get("duration", 3.0))
    # --- Resource Management ---
    clips_to_close = [] # Keep track of clips needing .close()
    # --- Prepare Clips ---
    section_video_clips = [] # Holds the final composited clip for each section
    section_audio_clips = [] # Holds the audio clip for each section
    final_durations = [] # Holds the calculated duration for each section
    logger.info(f"Processing {num_sections} sections for video...")
    total_duration_est = 0
    for i in range(num_sections):
        img_pil = images[i]
        audio_path = audio_files[i]
        text = section_texts[i]
        particle_type = particle_types[i] if particles_enabled else None
        logger.info(f"--- Section {i+1}/{num_sections} ---")
        # Determine duration (based on audio or default)
        duration = get_styled_audio_duration(audio_path)
        final_durations.append(duration)
        logger.info(f" Duration: {duration:.2f}s (Audio: {os.path.basename(str(audio_path)) if audio_path else 'No'})")
        logger.info(f" Text: '{str(text)[:40]}...'")
        logger.info(f" Particle: {particle_type if particle_type else 'None'}")
        # --- Create Image Clip ---
        img_clip = None
        try:
            img_resized_pil = resize_image_aspect_fill(img_pil, width, height)
            img_np = np.array(img_resized_pil)
            img_clip = ImageClip(img_np).set_duration(duration).set_fps(fps)
            clips_to_close.append(img_clip) # Add base image clip for potential closing
        except Exception as e:
            logger.error(f"❌ Failed to process image {i+1}: {e}. Creating black frame.")
            # Use a black frame as placeholder to avoid crashing
            img_clip = ColorClip(size=(width, height), color=(0,0,0), duration=duration)
            clips_to_close.append(img_clip)
        # --- Create Audio Clip ---
        section_audio_clip = None
        if audio_path and os.path.exists(audio_path):
            try:
                # Load audio and ensure duration matches video segment exactly
                # Use try-except for AudioFileClip as it can fail on corrupted files
                temp_audio = AudioFileClip(audio_path)
                clips_to_close.append(temp_audio) # Add for closing
                # Trim or pad audio if necessary (MoviePy often handles slight discrepancies)
                section_audio_clip = temp_audio.subclip(0, min(temp_audio.duration, duration))
                # If audio is shorter than video, MoviePy usually pads with silence automatically when concatenating.
                # If you need explicit looping/padding logic, add it here.
                if section_audio_clip.duration < duration - 0.01: # Allow small tolerance
                    logger.warning(f"Audio duration ({section_audio_clip.duration:.2f}s) shorter than video ({duration:.2f}s). Silence will be added.")
                section_audio_clips.append(section_audio_clip)
            except Exception as e:
                logger.error(f"❌ Failed to load audio '{os.path.basename(audio_path)}': {e}. Adding silence.")
                # Add silence to keep timing consistent
                silence = AudioClip(lambda t: 0, duration=duration, fps=44100) # Standard audio fps
                section_audio_clips.append(silence)
                clips_to_close.append(silence)
        else:
            # Add silence if no audio for this section
            logger.info(" No audio path or file not found, adding silence.")
            silence = AudioClip(lambda t: 0, duration=duration, fps=44100)
            section_audio_clips.append(silence)
            clips_to_close.append(silence)
        # --- Create Subtitle Clip ---
        subtitle_clip = None
        if subtitles_enabled and text:
            logger.info(" Creating subtitle clip...")
            subtitle_clip = create_subtitle_clip(text, duration, width, height, subtitle_opts)
            if subtitle_clip:
                clips_to_close.append(subtitle_clip)
        # --- Create Particle Overlay Clip ---
        particle_clip = None
        if particles_enabled and particle_type:
            logger.info(f" Creating particle overlay: {particle_type}...")
            particle_clip = create_particle_overlay_clip(particle_type, duration, width, height, particle_opts)
            if particle_clip:
                clips_to_close.append(particle_clip)
        # --- Composite Section ---
        # Layer order: Image -> Particles -> Subtitles
        composited_layers = [img_clip] # Base image
        if particle_clip:
            composited_layers.append(particle_clip)
        if subtitle_clip:
            composited_layers.append(subtitle_clip)
        # Only composite if more than one layer exists
        if len(composited_layers) > 1:
            final_section_clip = CompositeVideoClip(composited_layers, size=(width, height)).set_duration(duration).set_fps(fps)
            clips_to_close.append(final_section_clip) # Add composite for closing
        else:
            final_section_clip = img_clip # Just the image clip if no overlays
        section_video_clips.append(final_section_clip)
        total_duration_est += duration
    if not section_video_clips:
        logger.error("❌ No valid video clips were created for any section.")
        # Cleanup clips created so far
        for clip in clips_to_close:
            try: clip.close()
            except: pass
        return None
    logger.info(f"Total estimated video duration (before transitions/end logo): {total_duration_est:.2f}s")
    # --- Concatenate Video Clips with Transitions ---
    final_video = None
    if len(section_video_clips) > 1 and transition_type != 'none' and transition_duration > 0:
        # Ensure transition isn't longer than the shortest clip involved
        min_clip_dur = min(c.duration for c in section_video_clips)
        safe_transition_duration = min(transition_duration, min_clip_dur / 2.01) # Ensure overlap is possible
        if safe_transition_duration < transition_duration:
            logger.warning(f"Requested transition duration ({transition_duration}s) too long for shortest clip ({min_clip_dur:.2f}s). Clamping to {safe_transition_duration:.2f}s.")
            transition_duration = safe_transition_duration
        logger.info(f"Applying '{transition_type}' transitions with duration {transition_duration:.2f}s...")
        if transition_type == 'fade':
            # Crossfade is best handled by concatenate_videoclips's transition argument
            # NOTE(review): `crossfadein` comes from moviepy.video.tools.transitions,
            # whose import is commented out at the top of this file — confirm it is
            # actually in scope via `from moviepy import *` on the installed version,
            # and that concatenate_videoclips accepts a `transition` argument there.
            final_video = concatenate_videoclips(
                section_video_clips,
                method="compose",
                transition=crossfadein(transition_duration)
            )
        elif transition_type.startswith('slide_'):
            # Manual slide transitions require more complex composition
            direction = transition_type.split('_')[1] # 'left', 'right', 'up', 'down'
            # NOTE(review): `slide_in` has the same commented-out-import concern
            # as `crossfadein` above — verify it resolves at runtime.
            processed_clips = []
            current_time = 0
            for i, clip in enumerate(section_video_clips):
                clip = clip.set_start(current_time)
                if i > 0: # Apply slide-in from second clip onwards
                    clip = slide_in(clip, duration=transition_duration, side=direction)
                processed_clips.append(clip)
                # Move next clip's start time back by transition duration for overlap
                current_time += clip.duration - (transition_duration if i < len(section_video_clips) - 1 else 0)
            final_video = CompositeVideoClip(processed_clips, size=(width, height))
        else:
            logger.warning(f"Unsupported transition type '{transition_type}', falling back to 'fade'.")
            final_video = concatenate_videoclips(
                section_video_clips,
                method="compose",
                transition=crossfadein(transition_duration)
            )
    else:
        logger.info("Concatenating clips without transitions...")
        final_video = concatenate_videoclips(section_video_clips, method="compose")
    if not final_video:
        logger.error("❌ Failed to concatenate video clips.")
        # Cleanup clips
        for clip in clips_to_close:
            try: clip.close()
            except: pass
        return None
    clips_to_close.append(final_video) # Add the main concatenated video for closing
    # --- Concatenate Audio ---
    final_audio = None
    if section_audio_clips:
        logger.info("Concatenating audio clips...")
        try:
            final_audio = concatenate_audioclips(section_audio_clips)
            # Set the final video's audio
            final_video = final_video.set_audio(final_audio)
            logger.info(f"Combined audio duration: {final_audio.duration:.2f}s. Video duration: {final_video.duration:.2f}s")
            # Moviepy usually handles slight mismatches, but log if significant
            if abs(final_audio.duration - final_video.duration) > 0.1:
                logger.warning("Significant mismatch between final video and audio durations detected.")
            clips_to_close.append(final_audio)
        except Exception as e:
            logger.error(f"❌ Failed to concatenate or set audio: {e}. Video will be silent.")
            final_video = final_video.set_audio(None) # Ensure no audio track if failed
    else:
        logger.warning("No audio clips found or generated. Video will be silent.")
        final_video = final_video.set_audio(None)
    # --- Add Watermark ---
    watermark_clip_instance = None # Keep track for closing
    if watermark_enabled and watermark_path and os.path.exists(watermark_path):
        try:
            logger.info(f"Adding watermark from: {watermark_path}")
            # Use ismask=True if your watermark PNG has transparency
            # NOTE(review): in MoviePy, `ismask=True` declares the clip IS a
            # single-channel mask, which is not the same as "has an alpha
            # channel" — verify PNG watermarks render as intended here.
            is_mask = watermark_path.lower().endswith(".png")
            wm_img = ImageClip(watermark_path, ismask=is_mask, transparent=True)
            clips_to_close.append(wm_img)
            # Size
            wm_size_param = watermark_opts.get("size", 0.15) # Default: 15% of video width
            target_wm_width = None
            target_wm_height = None
            if isinstance(wm_size_param, float): # Relative size based on video width
                target_wm_width = int(width * wm_size_param)
            elif isinstance(wm_size_param, (tuple, list)) and len(wm_size_param) == 2: # Absolute size (w, h)
                target_wm_width = int(wm_size_param[0])
                target_wm_height = int(wm_size_param[1])
            # Resize watermark maintaining aspect ratio
            if target_wm_width and target_wm_height:
                wm_img = wm_img.resize(newsize=(target_wm_width, target_wm_height))
            elif target_wm_width:
                wm_img = wm_img.resize(width=target_wm_width)
            elif target_wm_height:
                wm_img = wm_img.resize(height=target_wm_height)
            # Else use original size if no specific size given
            # Position with margin
            wm_pos = watermark_opts.get("position", ("right", "bottom")) # Default bottom right
            margin = int(watermark_opts.get("margin", 15)) # Pixels margin
            # Convert position keywords to coordinates respecting margin
            def get_coord(dim, size, pos_keyword, margin):
                # Maps 'left'/'center'/'right' (also used for vertical axis) to a
                # pixel coordinate; numbers pass through; anything else -> margin.
                if pos_keyword == 'left': return margin
                if pos_keyword == 'center': return (dim / 2) - (size / 2)
                if pos_keyword == 'right': return dim - size - margin
                # Allow numerical values (absolute or fractional)
                if isinstance(pos_keyword, (int, float)): return pos_keyword
                return margin # Default fallback
            final_wm_pos = (
                get_coord(width, wm_img.w, wm_pos[0], margin),
                get_coord(height, wm_img.h, wm_pos[1], margin)
            )
            wm_img = wm_img.set_position(final_wm_pos)
            # Opacity
            wm_opacity = float(watermark_opts.get("opacity", 0.7))
            if 0 <= wm_opacity < 1.0:
                wm_img = wm_img.set_opacity(wm_opacity)
            # Set duration to match video and composite
            watermark_clip_instance = wm_img.set_duration(final_video.duration).set_start(0)
            # Composite watermark on top
            final_video = CompositeVideoClip([final_video, watermark_clip_instance], size=(width, height), use_bgclip=True)
            clips_to_close.append(final_video) # The new composite needs closing too
            logger.info("βœ… Watermark added.")
        except Exception as e:
            logger.error(f"❌ Failed to add watermark: {e}")
            # Don't add watermark_clip_instance to clips_to_close if it failed
    # --- Add End Logo Screen ---
    end_logo_clip_instance = None # Keep track for closing
    if end_logo_enabled and end_logo_path and os.path.exists(end_logo_path) and end_logo_duration > 0:
        try:
            logger.info(f"Adding end logo screen: {end_logo_path}")
            end_logo_clip_instance = ImageClip(end_logo_path).set_duration(end_logo_duration).resize(newsize=(width, height))
            clips_to_close.append(end_logo_clip_instance)
            # Simple fade transition to end logo - Use concatenate again
            final_video = concatenate_videoclips([final_video, end_logo_clip_instance], method="compose", transition=crossfadein(0.5))
            clips_to_close.append(final_video) # The *new* final video needs closing
            logger.info("βœ… End logo screen added.")
        except Exception as e:
            logger.error(f"❌ Failed to add end logo screen: {e}")
    # --- Write Final Video ---
    final_output_path = None
    temp_audio_file_path = None # Keep track of temp audio file
    try:
        logger.info(f"Writing final video to: {output_path}...")
        # Generate a unique temp audio filename (avoids collisions between
        # concurrent renders sharing the same temp directory).
        temp_audio_file_path = os.path.join(tempfile.gettempdir(), f"temp-audio-{uuid.uuid4().hex}.m4a")
        final_video.write_videofile(
            output_path,
            codec='libx264', # Good compatibility
            audio_codec='aac', # Standard audio codec
            temp_audiofile=temp_audio_file_path, # Use explicit temp file path
            remove_temp=True, # Let MoviePy handle removal on success
            fps=fps,
            preset='medium', # Balance quality and speed ('ultrafast'...'veryslow')
            threads=4, # Use multiple threads if available
            ffmpeg_params=[ # Ensure web compatibility
                '-pix_fmt', 'yuv420p',
                '-movflags', '+faststart' # Important for web streaming
            ],
            logger='bar' # Progress bar
        )
        logger.info(f"βœ… Final video saved successfully: {output_path}")
        final_output_path = output_path # Set success path
    except Exception as e:
        # Log full traceback for write errors
        import traceback
        logger.error(f"❌ ERROR writing final video file: {e}\n{traceback.format_exc()}")
        # Attempt to remove the potentially partially written output file
        if os.path.exists(output_path):
            try: os.remove(output_path)
            except OSError: logger.error(f"Could not remove partially written file: {output_path}")
        # Explicitly try removing temp audio if write failed and remove_temp=True might not have run
        if temp_audio_file_path and os.path.exists(temp_audio_file_path):
            try:
                os.remove(temp_audio_file_path)
                logger.info(f"Cleaned up temp audio file: {temp_audio_file_path}")
            except OSError:
                logger.error(f"Could not remove temp audio file: {temp_audio_file_path}")
    finally:
        # --- Close all opened clips ---
        logger.debug(f"Closing {len(clips_to_close)} MoviePy clips...")
        for clip in reversed(clips_to_close): # Close in reverse order (composites first)
            try:
                clip.close()
            except Exception as e:
                # Log closing errors but continue
                logger.warning(f"Error closing a clip: {e}")
        logger.debug("Finished closing clips.")
    return final_output_path # Return path only on success