import subprocess
import json
import os
import re
import uuid
from typing import Optional, Tuple

import imageio_ffmpeg

from schemas import ShortsStyle, AspectRatio


# Patterns for the stream summary FFmpeg prints on stderr, e.g.
#   Duration: 00:01:30.05, start: 0.000000, bitrate: 1234 kb/s
#   Stream #0:0(und): Video: h264 (High) (avc1 / 0x31637661), yuv420p,
#       1920x1080 [SAR 1:1 DAR 16:9], 2000 kb/s, 25 fps, 25 tbr, ...
_DURATION_RE = re.compile(r'Duration:\s*(\d+):(\d+):(\d+(?:\.\d+)?)')
# The look-arounds keep hex codec tags such as "0x31637661" from matching.
_DIMENSIONS_RE = re.compile(r'(?<![0-9A-Za-z])(\d{2,5})x(\d{2,5})(?![0-9A-Za-z])')
_FPS_RE = re.compile(r'(\d+(?:\.\d+)?) fps')
_BITRATE_RE = re.compile(r'(\d+) kb/s')


def _parse_ffmpeg_info(info_text: str) -> dict:
    """Parse FFmpeg's stderr stream summary into a metadata dict.

    Returns a dict with the keys ``duration`` (seconds), ``width``,
    ``height``, ``fps``, ``bitrate`` (bits per second, taken from the
    video stream line) and ``has_audio``. Any value that cannot be found
    in ``info_text`` is left at ``0`` / ``False``.
    """
    duration = 0
    duration_match = _DURATION_RE.search(info_text)
    if duration_match:
        hours, minutes, seconds = duration_match.groups()
        duration = float(hours) * 3600 + float(minutes) * 60 + float(seconds)

    width = height = fps = bitrate = 0
    has_audio = False
    video_parsed = False

    for line in info_text.splitlines():
        # Only stream descriptor lines are relevant; this skips metadata
        # values that merely happen to contain "Video:" or "Audio:".
        if 'Stream #' not in line:
            continue

        if 'Video:' in line and not video_parsed:
            # Describe the first video stream only.
            video_parsed = True

            dims_match = _DIMENSIONS_RE.search(line)
            if dims_match:
                width, height = int(dims_match.group(1)), int(dims_match.group(2))

            fps_match = _FPS_RE.search(line)
            if fps_match:
                fps = float(fps_match.group(1))

            bitrate_match = _BITRATE_RE.search(line)
            if bitrate_match:
                bitrate = int(bitrate_match.group(1)) * 1000

        if 'Audio:' in line:
            has_audio = True

    return {
        'duration': duration,
        'width': width,
        'height': height,
        'fps': fps,
        'bitrate': bitrate,
        'has_audio': has_audio,
    }


def get_video_info_ffmpeg(video_path: str) -> Optional[dict]:
    """Get video information using FFmpeg - much faster than MoviePy.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        A dict with the keys ``duration``, ``width``, ``height``, ``fps``,
        ``bitrate``, ``has_audio`` and ``size`` (file size in bytes, ``0``
        when the file does not exist), or ``None`` if probing raised an
        exception (the error is printed).
    """
    try:
        # ffprobe is not available in imageio_ffmpeg, so use ffmpeg itself.
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

        # With an input but no output, FFmpeg prints the input summary to
        # stderr and exits without decoding anything. The non-zero exit
        # status is expected, so it is deliberately not checked.
        cmd = [ffmpeg_exe, '-hide_banner', '-i', video_path]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors='replace',  # metadata may not be valid in the locale encoding
        )

        info = _parse_ffmpeg_info(result.stderr)
        info['size'] = os.path.getsize(video_path) if os.path.exists(video_path) else 0
        return info

    except Exception as e:
        print(f"Error getting video info with FFmpeg: {e}")
        return None


def escape_path_for_ass(path: str) -> str:
    """Escapes Windows paths for the FFmpeg ASS filter.

    Backslashes become forward slashes and every colon (such as the one
    after a drive letter) is prefixed with a backslash.
    """
    forward_slashed = path.replace('\\', '/')
    return forward_slashed.replace(':', '\\:')


def _ensure_video_label(filter_complex: str) -> str:
    """Append the ``[v]`` output label if the filter graph does not have it."""
    if "[v]" not in filter_complex:
        return filter_complex + "[v]"
    return filter_complex


def _build_video_filter(
    style: Optional[ShortsStyle],
    aspect_ratio: Optional[AspectRatio],
    target_width: int,
    target_height: int,
) -> str:
    """Return the video filter graph for a style, or ``""`` for passthrough."""
    if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
        # No filters, just copy/passthrough
        return ""

    if style == ShortsStyle.CINEMATIC:
        # Cinematic Blur: BG (Blurred & Cropped) + FG (Scaled to fit width)
        # Use -2 to ensure height is divisible by 2 for yuv420p
        return (
            f"[0:v]split=2[bg_raw][fg_raw];"
            f"[bg_raw]scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height},boxblur=20:2[bg];"
            f"[fg_raw]scale={target_width}:-2,setsar=1[fg];"
            f"[bg][fg]overlay=(W-w)/2:(H-h)/2,setsar=1[v]"
        )

    if style == ShortsStyle.SPLIT_SCREEN:
        # Split Screen: Top half and Bottom half
        half_h = target_height // 2
        return (
            f"[0:v]split=2[top_raw][bottom_raw];"
            f"[top_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[top];"
            f"[bottom_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[bottom];"
            f"[top][bottom]vstack=inputs=2[v]"
        )

    if style == ShortsStyle.CROP_FILL:
        # Crop to fill entire canvas
        return f"scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height}"

    if style == ShortsStyle.FIT_BARS:
        # Fit with black bars
        return f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2"

    # Default for non-original ratios: Scale to fit width
    # Use -2 to ensure dimensions are divisible by 2 for yuv420p
    return f"scale={target_width}:-2,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2,setsar=1[v]"


def _add_subtitle_filter(filter_complex: str, subtitle_path: str) -> str:
    """Chain an ``ass`` subtitle filter onto the video filter graph."""
    escaped_sub_path = escape_path_for_ass(subtitle_path)
    if not filter_complex:
        return f"[0:v]ass='{escaped_sub_path}'[v]"
    return _ensure_video_label(filter_complex) + f";[v]ass='{escaped_sub_path}'[v]"


def _build_audio_filter(
    has_music: bool,
    has_orig_audio: bool,
    video_volume: float,
    music_volume: float,
) -> str:
    """Return the audio filter graph ending in ``[aout]``, or ``""`` if none is needed."""
    if has_music:
        if has_orig_audio:
            # Mix original audio with background music
            # Use unique names [a_orig] [a_bg] to avoid collisions with [v]
            return (
                f"[0:a]volume={video_volume}[a_orig];"
                f"[1:a]volume={music_volume}[a_bg];"
                f"[a_orig][a_bg]amix=inputs=2:duration=first[aout]"
            )
        # Only background music (original has no audio)
        return f"[1:a]volume={music_volume}[aout]"

    if has_orig_audio and video_volume != 1.0:
        return f"[0:a]volume={video_volume}[aout]"

    return ""


def extract_clip_ffmpeg(
    video_path: str,
    start_time: float,
    end_time: float,
    output_path: str,
    target_width: Optional[int] = None,
    target_height: Optional[int] = None,
    include_audio: bool = True,
    style: Optional[ShortsStyle] = None,
    aspect_ratio: Optional[AspectRatio] = None,
    bg_music_path: Optional[str] = None,
    video_volume: float = 1.0,
    music_volume: float = 0.2,
    loop_music: bool = True,
    subtitle_path: Optional[str] = None
) -> str:
    """
    Extract video clip using FFmpeg - high speed with advanced filters and optional subtitles

    Args:
        video_path: Source video file.
        start_time: Clip start, in seconds.
        end_time: Clip end, in seconds.
        output_path: Destination file; overwritten if it exists (``-y``).
        target_width: Output width; defaults to 1080 when either dimension is missing.
        target_height: Output height; defaults to 1920 when either dimension is missing.
        include_audio: When False the output is written without audio (``-an``).
        style: Layout style selecting the video filter graph.
        aspect_ratio: ``AspectRatio.ORIGINAL`` disables the layout filters.
        bg_music_path: Optional background music file; ignored if it does not exist.
        video_volume: Volume factor applied to the original audio.
        music_volume: Volume factor applied to the background music.
        loop_music: Loop the background music input indefinitely.
        subtitle_path: Optional ASS subtitle file to burn in; ignored if it does not exist.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
        Exception: Any other error is printed and re-raised unchanged.
    """
    try:
        duration = end_time - start_time
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

        # Get video info to check for audio stream
        video_info = get_video_info_ffmpeg(video_path)
        has_orig_audio = video_info.get('has_audio', False) if video_info else False

        has_music = bool(bg_music_path and os.path.exists(bg_music_path))

        # Build FFmpeg command
        cmd = [ffmpeg_exe, '-i', video_path]
        if has_music:
            if loop_music:
                cmd.extend(['-stream_loop', '-1'])
            cmd.extend(['-i', bg_music_path])
        # -ss/-t come after the inputs, so they apply to the output.
        cmd.extend(['-ss', str(start_time), '-t', str(duration)])

        # Resolution defaults for Shorts if not provided
        if not target_width or not target_height:
            target_width, target_height = 1080, 1920

        filter_complex = _build_video_filter(style, aspect_ratio, target_width, target_height)

        # Subtitle logic
        if subtitle_path and os.path.exists(subtitle_path):
            filter_complex = _add_subtitle_filter(filter_complex, subtitle_path)

        # Audio Filter logic
        audio_filter = ""
        if include_audio:
            audio_filter = _build_audio_filter(
                has_music, has_orig_audio, video_volume, music_volume
            )

        if filter_complex:
            # If [v] is not the final tag, ensure it is
            filter_complex = _ensure_video_label(filter_complex)

            if audio_filter:
                combined_filter = f"{filter_complex};{audio_filter}"
                cmd.extend(['-filter_complex', combined_filter, '-map', '[v]', '-map', '[aout]'])
            else:
                cmd.extend(['-filter_complex', filter_complex, '-map', '[v]'])
                if include_audio:
                    # Trailing "?" makes the audio mapping optional.
                    cmd.extend(['-map', '0:a?'])
        elif audio_filter:
            # Only audio filter
            cmd.extend(['-filter_complex', audio_filter, '-map', '0:v:0', '-map', '[aout]'])

        if not include_audio:
            cmd.extend(['-an'])

        # Optimization settings
        cmd.extend([
            '-c:v', 'libx264',
            '-preset', 'superfast',
            '-crf', '23',
            '-pix_fmt', 'yuv420p',  # Ensure compatibility
            '-c:a', 'aac',          # Encode audio to AAC
            '-b:a', '128k',         # Standard bitrate
            '-ac', '2',             # Stereo
            '-y',
            '-loglevel', 'error',
            output_path
        ])

        subprocess.run(cmd, check=True, capture_output=True)
        return output_path

    except Exception as e:
        print(f"❌ FFmpeg advanced extraction failed: {e}")
        # CalledProcessError's message omits FFmpeg's own diagnostics, so
        # surface the captured stderr when there is one.
        stderr = getattr(e, 'stderr', None)
        if stderr:
            if isinstance(stderr, bytes):
                stderr = stderr.decode('utf-8', errors='replace')
            print(f"FFmpeg stderr: {stderr.strip()}")
        raise


def _timestamp_bounds(ts) -> Tuple[float, float]:
    """Return ``(start, end)`` for a timestamp object.

    Reads ``start_time`` / ``end_time`` when present, otherwise ``start`` /
    ``end``.
    """
    start = ts.start_time if hasattr(ts, 'start_time') else ts.start
    end = ts.end_time if hasattr(ts, 'end_time') else ts.end
    return start, end


def should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music) -> bool:
    """
    Determine if FFmpeg can be used instead of MoviePy
    FFmpeg is faster for simple operations, MoviePy for complex ones

    Returns False when background music is requested (either ``bg_music``
    or a truthy ``custom_dims.audio_path``) or when any two timestamps
    overlap; returns True otherwise. ``export_audio`` is accepted but does
    not affect the decision.
    """
    if bg_music:
        return False  # Need MoviePy for audio mixing

    if custom_dims and getattr(custom_dims, 'audio_path', None):
        return False  # Background music requires MoviePy

    # Check if timestamps are simple (no overlapping)
    if len(timestamps) > 1:
        bounds = sorted(
            (_timestamp_bounds(ts) for ts in timestamps),
            key=lambda pair: pair[0],
        )
        for (_, prev_end), (next_start, _) in zip(bounds, bounds[1:]):
            if next_start < prev_end:
                return False  # Overlapping timestamps need MoviePy

    return True


def hybrid_process_clips(video_path, timestamps, output_format, custom_dims=None, export_audio=True, bg_music=None, subtitle_path=None):
    """
    Hybrid approach: Use FFmpeg for simple operations, MoviePy for complex ones

    The FFmpeg path is tried when ``should_use_ffmpeg`` allows it. If the
    decision or the FFmpeg run raises, the error is printed and the clips
    are processed with MoviePy instead. Errors raised by MoviePy propagate
    to the caller.

    Returns:
        Whatever ``process_with_ffmpeg`` or
        ``video_processor.process_video_clips`` returns.
    """
    try:
        # Try FFmpeg first if conditions are met
        if should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music):
            print("Using FFmpeg for fast processing...")
            return process_with_ffmpeg(video_path, timestamps, output_format, custom_dims, export_audio, subtitle_path=subtitle_path)
    except Exception as e:
        print(f"Hybrid processing failed: {e}")
        print("Falling back to MoviePy...")
    else:
        print("Using MoviePy for complex processing...")

    # Imported lazily, as in the original implementation. MoviePy runs
    # outside the try block so its own failure is not retried.
    from video_processor import process_video_clips
    return process_video_clips(video_path, timestamps, output_format, custom_dims, export_audio)


def process_with_ffmpeg(video_path, timestamps, output_format, custom_dims=None, export_audio=True, subtitle_path=None):
    """Process clips using FFmpeg for maximum speed

    Args:
        video_path: Source video file.
        timestamps: Iterable of objects exposing ``start_time``/``end_time``
            or ``start``/``end``.
        output_format: Object whose ``value`` of ``'avi'`` or ``'mov'``
            selects that file extension (otherwise ``mp4``); it is also
            passed on as the clip style when it is a ``ShortsStyle``.
        custom_dims: Optional object with truthy ``width`` and ``height``.
        export_audio: Whether the clips keep their audio.
        subtitle_path: Optional ASS subtitle file to burn in.

    Returns:
        ``(clip_paths, [])`` - the second element is always empty because
        the FFmpeg method produces no separate audio files.

    Raises:
        RuntimeError: If the video could not be probed.
        Exception: Any extraction error is printed and re-raised.
    """
    clip_paths = []

    # Get video info
    if not get_video_info_ffmpeg(video_path):
        raise RuntimeError("Could not get video info")

    # Determine target dimensions
    target_width, target_height = None, None
    if custom_dims:
        custom_width = getattr(custom_dims, 'width', None)
        custom_height = getattr(custom_dims, 'height', None)
        if custom_width and custom_height:
            target_width, target_height = custom_width, custom_height

    # Output extension: only 'avi' and 'mov' override the mp4 default.
    format_value = getattr(output_format, 'value', None) if output_format else None
    ext = format_value if format_value in ('avi', 'mov') else 'mp4'

    style = output_format if isinstance(output_format, ShortsStyle) else None

    # Use PROCESSED_DIR for output (imported lazily, as in the original).
    from routers.video import PROCESSED_DIR

    # Process each timestamp
    for clip_number, ts in enumerate(timestamps, start=1):
        clip_id = uuid.uuid4().hex[:8]
        output_filename = f"{clip_id}_clip_{clip_number}.{ext}"
        output_path = os.path.join(PROCESSED_DIR, output_filename)

        start, end = _timestamp_bounds(ts)

        # Extract clip using FFmpeg
        try:
            clip_path = extract_clip_ffmpeg(
                video_path,
                start,
                end,
                output_path,
                target_width,
                target_height,
                include_audio=export_audio,
                style=style,
                subtitle_path=subtitle_path
            )
        except Exception as e:
            print(f"FFmpeg extraction failed for clip {clip_number}: {e}")
            raise
        clip_paths.append(clip_path)

    return clip_paths, []  # No audio paths for FFmpeg method