import os
import uuid
from concurrent.futures import ThreadPoolExecutor

from moviepy import VideoFileClip, CompositeVideoClip, ColorClip
import numpy as np
from scipy.ndimage import gaussian_filter

from schemas import ShortsStyle, Dimensions, LayoutType, AspectRatio, SubtitlePreset
from hybrid_processor import process_video_hybrid
from routers.subtitle_generator import generate_pro_ass


def get_canvas_dimensions(ratio: AspectRatio) -> tuple:
    """Return (width, height) in pixels for a given aspect ratio.

    Returns (None, None) for unknown ratios / AspectRatio.ORIGINAL,
    which callers treat as "do not resize".
    """
    if ratio == AspectRatio.RATIO_9_16:
        return 1080, 1920
    elif ratio == AspectRatio.RATIO_1_1:
        return 1080, 1080
    elif ratio == AspectRatio.RATIO_16_9:
        return 1920, 1080
    elif ratio == AspectRatio.RATIO_4_5:
        return 1080, 1350
    return None, None  # For ORIGINAL


def process_video_clips(video_path: str, timestamps,
                        aspect_ratio: AspectRatio = AspectRatio.RATIO_9_16,
                        style: ShortsStyle = ShortsStyle.ORIGINAL,
                        custom_dims: Dimensions = None,
                        export_audio: bool = False,
                        use_parallel: bool = True,
                        use_ffmpeg_optimization: bool = True,
                        video_volume: float = 1.0,
                        music_volume: float = 0.2,
                        loop_music: bool = True,
                        transcription: list = None,
                        subtitle_preset: SubtitlePreset = None):
    """
    Processes a video file into multiple clips based on timestamps and style.
    If export_audio is True, also saves the original audio track of each clip.

    use_parallel: Enable parallel processing for better performance (default: True)
    use_ffmpeg_optimization: Use FFmpeg for simple operations (much faster) (default: True)
    Supports hard-burned subtitles if transcription is provided.

    Returns:
        (clip_paths, audio_paths) — lists of output file paths; audio_paths is
        only populated when export_audio is True.
    """
    clip_paths = []
    audio_paths = []

    # Extract background music path if available
    bg_music_path = None
    if custom_dims and hasattr(custom_dims, 'audio_path') and custom_dims.audio_path:
        if os.path.exists(custom_dims.audio_path):
            bg_music_path = custom_dims.audio_path

    # Ensure custom_dims carries the volume info for hybrid (FFmpeg) processing
    if custom_dims:
        custom_dims.video_volume = video_volume
        custom_dims.music_volume = music_volume
        custom_dims.loop_music = loop_music

    # Handle subtitles if a transcription is provided: render a temporary
    # .ass file next to the source video for hard-burning.
    subtitle_path = None
    if transcription:
        if not subtitle_preset:
            subtitle_preset = SubtitlePreset(name="Default")
        temp_dir = os.path.dirname(video_path)
        subtitle_path = os.path.join(temp_dir, f"sub_{uuid.uuid4().hex[:8]}.ass")
        generate_pro_ass(transcription, subtitle_preset, subtitle_path)
        print(f"📝 Subtitles generated: {subtitle_path}")

    # Try FFmpeg optimization first for simple cases — much faster than MoviePy.
    if use_ffmpeg_optimization:
        try:
            print(f"🚀 Trying FFmpeg optimization for style: {style}...")
            clip_paths, audio_paths = process_video_hybrid(
                video_path=video_path,
                timestamps=timestamps,
                output_format=style,
                custom_dims=custom_dims,
                export_audio=export_audio,
                aspect_ratio=aspect_ratio,
                bg_music=bg_music_path,
                subtitle_paths=[subtitle_path] * len(timestamps) if subtitle_path else None
            )
            if clip_paths:  # If FFmpeg worked, return results
                print(f"✅ FFmpeg optimization successful! Processed {len(clip_paths)} clips")
                return clip_paths, audio_paths
        except Exception as e:
            print(f"⚠️ FFmpeg optimization failed: {e}")
            print("🎬 Falling back to MoviePy...")

    bg_music = None  # hoisted so the finally-block can always close it
    try:
        # Load background music if provided for the MoviePy fallback
        if bg_music_path:
            from moviepy import AudioFileClip, CompositeAudioClip
            import moviepy.audio.fx as afx
            bg_music = AudioFileClip(bg_music_path)

        if use_parallel and len(timestamps) > 1:
            # Process clips in parallel for better performance
            max_workers = min(3, len(timestamps))  # Limit parallel workers
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all clip processing tasks
                futures = []
                for i, ts in enumerate(timestamps):
                    clip_id = uuid.uuid4().hex[:8]
                    future = executor.submit(
                        process_single_clip,
                        ts, video_path, aspect_ratio, style, custom_dims,
                        export_audio, bg_music, clip_id,
                        video_volume=video_volume,
                        music_volume=music_volume,
                        loop_music=loop_music
                    )
                    futures.append((future, clip_id))

                # Collect results in submission order
                for future, clip_id in futures:
                    try:
                        output_path, audio_output_path = future.result()
                        if output_path:
                            clip_paths.append(output_path)
                            if export_audio:
                                audio_paths.append(audio_output_path)
                    except Exception as e:
                        print(f"Error processing clip {clip_id}: {str(e)}")
                        if export_audio:
                            audio_paths.append(None)
        else:
            # Sequential processing — more memory efficient for single clips
            # or when parallel is disabled.
            from moviepy import VideoFileClip, CompositeAudioClip
            import moviepy.audio.fx as afx

            # Define folders
            from routers.video import PROCESSED_DIR, TEMP_DIR

            with VideoFileClip(video_path) as video:
                print(f"DEBUG: Video loaded. Duration: {video.duration}")
                for ts in timestamps:
                    print(f"DEBUG: Processing clip. Request: Start={ts.start_time}, End={ts.end_time}")

                    # Basic validation: skip clips that start past the end of the video
                    if ts.start_time >= video.duration:
                        print(f"DEBUG: Skipping clip. Start time {ts.start_time} is beyond video duration {video.duration}.")
                        continue
                    end = min(ts.end_time, video.duration)
                    print(f"DEBUG: Extracting subclip from {ts.start_time} to {end}")

                    # Generate unique ID for this clip operation
                    clip_id = uuid.uuid4().hex[:8]
                    output_filename = f"clip_{clip_id}.mp4"
                    output_path = os.path.join(PROCESSED_DIR, output_filename)
                    audio_output_path = None

                    # Extract subclip and process it
                    with video.subclipped(ts.start_time, end) as subclip:
                        # Apply background music if available
                        if bg_music:
                            bg_music_clip = bg_music.with_duration(subclip.duration)
                            if loop_music:
                                # FIX: use the MoviePy 2.x effects API; the v1
                                # `.fx(afx.AudioLoop, ...)` call does not exist in v2.
                                bg_music_clip = bg_music_clip.with_effects(
                                    [afx.AudioLoop(duration=subclip.duration)]
                                )
                            if subclip.audio:
                                # Mix scaled original audio with scaled music
                                original_audio = subclip.audio.with_volume_scaled(video_volume)
                                bg_music_clip = bg_music_clip.with_volume_scaled(music_volume)
                                subclip.audio = CompositeAudioClip([original_audio, bg_music_clip])
                            else:
                                # Source has no audio track: music only
                                subclip.audio = bg_music_clip.with_volume_scaled(music_volume)

                        # Apply formatting (skip entirely for ORIGINAL)
                        if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
                            pass  # Skip resizing, keep original dimensions
                        else:
                            # Map style to layout
                            layout_map = {
                                ShortsStyle.CINEMATIC: LayoutType.CINEMATIC_BLUR,
                                ShortsStyle.CROP_FILL: LayoutType.CROP_CENTER,
                                ShortsStyle.FIT_BARS: LayoutType.FIT_CENTER,
                                ShortsStyle.SPLIT_SCREEN: LayoutType.SPLIT_SCREEN
                            }
                            layout = layout_map.get(style, LayoutType.CROP_CENTER)
                            target_w, target_h = get_canvas_dimensions(aspect_ratio)
                            if target_w and target_h:
                                subclip = apply_layout_factory(subclip, layout, target_w, target_h, custom_dims)

                        # Write file — settings chosen for compatibility and speed
                        temp_audio = os.path.join(TEMP_DIR, f"temp-audio-{clip_id}.m4a")
                        subclip.write_videofile(
                            output_path,
                            codec="libx264",
                            audio_codec="aac",
                            temp_audiofile=temp_audio,
                            remove_temp=True,
                            fps=24,
                            threads=4,
                            preset="superfast",
                            logger=None
                        )

                    if output_path:
                        clip_paths.append(output_path)
                        if export_audio:
                            audio_paths.append(audio_output_path)

        return clip_paths, audio_paths
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise e
    finally:
        # FIX: close the music clip even when an exception escapes
        # (previously it leaked on the error path).
        if bg_music:
            bg_music.close()


def apply_layout_factory(clip, layout_type, target_w, target_h, config=None):
    """
    Factory to apply different video layouts.
    """
    if layout_type == LayoutType.CINEMATIC_BLUR:
        blur = config.blur_intensity if config and hasattr(config, 'blur_intensity') else 20
        return apply_cinematic_blur(clip, target_w, target_h, blur)
    elif layout_type == LayoutType.FIT_CENTER:
        return apply_fit_layout(clip, target_w, target_h)
    elif layout_type == LayoutType.CROP_CENTER:
        # Default to Crop/Fill
        target_ratio = target_w / target_h
        formatted = format_clip(clip, target_ratio)
        return formatted.resized(width=target_w, height=target_h)
    elif layout_type == LayoutType.SPLIT_SCREEN:
        return apply_split_screen(clip, target_w, target_h)
    else:
        # Fallback: only downscale, never upscale
        return clip.resized(width=target_w) if clip.w > target_w else clip


def apply_split_screen(clip, target_w, target_h):
    """
    Splits the screen into two halves (top and bottom) with the same video.
    """
    half_h = target_h // 2
    # Top half: resize to fill
    top = format_clip(clip, target_w / half_h).resized(width=target_w, height=half_h)
    # Bottom half: same video
    bottom = format_clip(clip, target_w / half_h).resized(width=target_w, height=half_h)
    return CompositeVideoClip([
        top.with_position(("center", "top")),
        bottom.with_position(("center", "bottom"))
    ], size=(target_w, target_h))


def apply_cinematic_blur(clip, target_w, target_h, blur_intensity=20):
    """
    Creates a cinematic blurred background with the original video on top.
    Uses a custom per-frame filter for maximum compatibility.
    """
    def blur_filter(image):
        # Gaussian-blur the spatial axes only; sigma 0 on the last axis
        # keeps the colour channels from bleeding into each other.
        return gaussian_filter(image, sigma=(blur_intensity, blur_intensity, 0))

    # 1. Background: Scale to fill and blur
    bg = format_clip(clip, target_w / target_h)
    bg = bg.resized(width=target_w, height=target_h)
    bg = bg.image_transform(blur_filter)

    # 2. Foreground: Resize original to fit width while keeping aspect ratio
    fg = clip.resized(width=target_w)

    # Center the foreground on the background
    final_clip = CompositeVideoClip([
        bg,
        fg.with_position("center")
    ], size=(target_w, target_h))
    return final_clip


def apply_fit_layout(clip, target_w, target_h):
    """
    Fits the video inside the target dimensions with black bars (Letterboxing).
    """
    # Resize to fit within target dims: constrain whichever axis is proportionally larger
    fg = clip.resized(width=target_w) if (clip.w / clip.h) > (target_w / target_h) else clip.resized(height=target_h)
    # Create black background
    bg = ColorClip(size=(target_w, target_h), color=(0, 0, 0), duration=clip.duration)
    return CompositeVideoClip([bg, fg.with_position("center")], size=(target_w, target_h))


def format_clip(clip, target_ratio):
    """
    Crops and resizes a clip to a target aspect ratio.
    """
    w, h = clip.size
    current_ratio = w / h
    if current_ratio > target_ratio:
        # Source is wider than target, crop sides
        new_w = h * target_ratio
        subclip = clip.cropped(x_center=w / 2, width=new_w)
    else:
        # Source is taller than target, crop top/bottom
        new_h = w / target_ratio
        subclip = clip.cropped(y_center=h / 2, height=new_h)

    # Standardize resolutions for known formats (upscale only).
    # NOTE: float equality is safe here because callers compute the ratio
    # from the same integer pairs (e.g. 1080/1920 == 9/16 exactly).
    if target_ratio == 9 / 16:  # Shorts
        return subclip.resized(height=1920) if subclip.h < 1920 else subclip
    elif target_ratio == 16 / 9:  # Standard
        return subclip.resized(width=1920) if subclip.w < 1920 else subclip
    elif target_ratio == 1 / 1:  # Square
        return subclip.resized(width=1080) if subclip.w < 1080 else subclip
    return subclip


def safe_remove(path: str, max_retries: int = 3):
    """Attempt to remove a file with retries for Windows file locking.

    Returns True on success, False on failure. NOTE: returns None (falsy)
    when the file does not exist — preserved for caller compatibility.
    """
    import time
    import os
    if not os.path.exists(path):
        print(f"[SAFE_REMOVE] File does not exist: {path}")
        return
    for i in range(max_retries):
        try:
            os.remove(path)
            print(f"[SAFE_REMOVE] Successfully deleted: {path}")
            return True
        except PermissionError as e:
            print(f"[SAFE_REMOVE] Permission error on attempt {i+1}: {e}")
            if i < max_retries - 1:
                time.sleep(1)  # Wait for handles to clear
            else:
                print(f"[SAFE_REMOVE] Warning: Could not delete {path} after {max_retries} attempts.")
                return False
        except Exception as e:
            print(f"[SAFE_REMOVE] Error deleting {path}: {e}")
            return False
    return False


def create_zip_archive(file_paths: list, output_filename: str):
    """
    Creates a ZIP archive containing the specified files.

    Missing/None entries are skipped; returns the archive path, or None
    when there is nothing to archive.
    """
    import zipfile
    # Filter out None values and files that no longer exist
    file_paths = [f for f in file_paths if f and os.path.exists(f)]
    if not file_paths:
        return None
    zip_path = os.path.join(os.path.dirname(file_paths[0]), output_filename)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file in file_paths:
            zipf.write(file, os.path.basename(file))
    return zip_path


def process_single_clip(ts, video_path, aspect_ratio, style, custom_dims,
                        export_audio, bg_music, clip_id,
                        video_volume=1.0, music_volume=0.2, loop_music=True):
    """
    Process a single clip — worker for parallel processing.

    Returns (output_path, audio_output_path); (None, None) on skip or error.
    """
    try:
        # Ensure custom_dims carries the volume info
        if not custom_dims:
            from schemas import Dimensions
            custom_dims = Dimensions()
        custom_dims.video_volume = video_volume
        custom_dims.music_volume = music_volume
        custom_dims.loop_music = loop_music

        from moviepy import VideoFileClip, CompositeAudioClip
        import moviepy.audio.fx as afx
        from schemas import AspectRatio, ShortsStyle, LayoutType

        # Open video for this clip (parallel processing)
        with VideoFileClip(video_path) as video:
            print(f"DEBUG: Processing clip {clip_id}. Request: Start={ts.start_time}, End={ts.end_time}")

            # Basic validation
            if ts.start_time >= video.duration:
                print(f"DEBUG: Skipping clip {clip_id}. Start time {ts.start_time} is beyond video duration {video.duration}.")
                return None, None
            end = min(ts.end_time, video.duration)
            print(f"DEBUG: Extracting subclip {clip_id} from {ts.start_time} to {end}")

            # Extract subclip
            subclip = video.subclipped(ts.start_time, end)
            try:
                output_filename = f"clip_{clip_id}.mp4"
                from routers.video import PROCESSED_DIR, TEMP_DIR
                output_path = os.path.join(PROCESSED_DIR, output_filename)

                # (Removed automatic audio extraction to mp3)
                audio_output_path = None

                # Apply background music if available
                if bg_music:
                    from moviepy import CompositeAudioClip
                    import moviepy.audio.fx as afx

                    bg_music_clip = bg_music.with_duration(subclip.duration)
                    if loop_music:
                        # FIX: MoviePy 2.x effects API (v1 `.fx` no longer exists)
                        bg_music_clip = bg_music_clip.with_effects(
                            [afx.AudioLoop(duration=subclip.duration)]
                        )
                    # FIX: guard against a missing audio track (the sequential
                    # path already does this; previously this crashed on silent
                    # source videos).
                    if subclip.audio:
                        original_audio = subclip.audio.with_volume_scaled(video_volume)
                        bg_music_clip = bg_music_clip.with_volume_scaled(music_volume)
                        subclip.audio = CompositeAudioClip([original_audio, bg_music_clip])
                    else:
                        subclip.audio = bg_music_clip.with_volume_scaled(music_volume)

                # Apply formatting
                if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
                    pass  # Skip resizing, keep original dimensions
                else:
                    # Map style to layout
                    layout_map = {
                        ShortsStyle.CINEMATIC: LayoutType.CINEMATIC_BLUR,
                        ShortsStyle.CROP_FILL: LayoutType.CROP_CENTER,
                        ShortsStyle.FIT_BARS: LayoutType.FIT_CENTER,
                        ShortsStyle.SPLIT_SCREEN: LayoutType.SPLIT_SCREEN
                    }
                    layout = layout_map.get(style, LayoutType.CROP_CENTER)
                    target_w, target_h = get_canvas_dimensions(aspect_ratio)
                    if target_w and target_h:
                        subclip = apply_layout_factory(subclip, layout, target_w, target_h, custom_dims)

                # Write file — optimized settings for speed
                temp_audio = os.path.join(TEMP_DIR, f"temp-audio-{clip_id}.m4a")
                subclip.write_videofile(
                    output_path,
                    codec="libx264",
                    audio_codec="aac",
                    temp_audiofile=temp_audio,
                    remove_temp=True,
                    fps=24,
                    threads=4,
                    preset="superfast",
                    logger=None
                )
            finally:
                # FIX: close the subclip even if write_videofile raises
                # (previously leaked on the error path).
                if subclip.audio:
                    subclip.audio.close()
                subclip.close()

            return output_path, audio_output_path
    except Exception as e:
        print(f"Error processing clip {clip_id}: {str(e)}")
        return None, None


def extract_audio_from_video(video_path: str, output_format: str = "mp3"):
    """
    Extract audio from a video file and save it as an audio file.

    Tries a direct FFmpeg stream extraction first (fast); falls back to
    MoviePy re-encoding when FFmpeg is unavailable or fails.
    """
    try:
        from routers.video import AUDIO_DIR

        # Generate output filename
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        audio_filename = f"{base_name}_audio.{output_format}"
        output_path = os.path.join(AUDIO_DIR, audio_filename)

        # Try FFmpeg first (fastest method)
        try:
            import subprocess
            import imageio_ffmpeg

            # Get FFmpeg executable path from imageio_ffmpeg
            ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

            # Determine codec based on output format
            if output_format.lower() == 'mp3':
                codec = 'libmp3lame'
            elif output_format.lower() == 'wav':
                codec = 'pcm_s16le'
            elif output_format.lower() in ['m4a', 'aac']:
                codec = 'aac'
            else:
                codec = 'copy'  # Try to copy without re-encoding

            # FFmpeg command to extract audio stream only (much faster)
            cmd = [
                ffmpeg_exe,
                '-i', video_path,
                '-vn',                  # no video processing
                '-acodec', codec,
                '-y',                   # overwrite output
                '-loglevel', 'error',   # suppress output
                output_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0 and os.path.exists(output_path):
                print(f"✓ Audio extracted using FFmpeg (fast method)")
                return output_path
            else:
                print(f"FFmpeg failed: {result.stderr}")
                raise Exception("FFmpeg extraction failed")
        except Exception as ffmpeg_error:
            print(f"FFmpeg not available or failed: {ffmpeg_error}")
            print("Falling back to MoviePy method...")

            # Fallback to MoviePy method
            from moviepy import VideoFileClip
            with VideoFileClip(video_path) as video:
                if video.audio is None:
                    raise ValueError("Video has no audio track")
                video.audio.write_audiofile(
                    output_path,
                    logger=None
                )
            print(f"✓ Audio extracted using MoviePy (fallback method)")
            return output_path
    except Exception as e:
        print(f"Error extracting audio: {str(e)}")
        raise e