# Clipping / ffmpeg_utils.py
# aliSaac510's picture
# add sub endpoints
# c63c39e
import subprocess
import json
import os
import uuid
from typing import Optional, Tuple
import imageio_ffmpeg
from schemas import ShortsStyle, AspectRatio
def _parse_ffmpeg_stream_report(info_text: str) -> dict:
    """Extract duration and stream properties from ffmpeg's stderr report.

    Only ``Stream #`` lines are inspected for stream data, and only the first
    video stream contributes width/height/fps/bitrate. Fields that cannot be
    found keep their zero/False default.
    """
    import re  # local import: `re` is not among this module's top-level imports

    parsed = {
        'duration': 0,
        'width': 0,
        'height': 0,
        'fps': 0,
        'bitrate': 0,
        'has_audio': False,
    }

    # "Duration: HH:MM:SS.ms" -> seconds. "Duration: N/A" simply does not match.
    duration_match = re.search(r'Duration:\s*(\d+):(\d+):(\d+(?:\.\d+)?)', info_text)
    if duration_match:
        hours, minutes, seconds = duration_match.groups()
        parsed['duration'] = float(hours) * 3600 + float(minutes) * 60 + float(seconds)

    video_seen = False
    for line in info_text.splitlines():
        if 'Stream #' not in line:
            continue

        if 'Audio:' in line:
            parsed['has_audio'] = True
            continue

        if 'Video:' not in line or video_seen:
            continue
        video_seen = True
        details = line.split('Video:', 1)[1]

        # Dimensions are their own comma-separated field and may be followed
        # by a "[SAR ... DAR ...]" suffix. Requiring the leading comma keeps
        # hex codec tags such as "0x31637661" from being read as a size.
        dims_match = re.search(r',\s*(\d+)x(\d+)(?=[\s,]|$)', details)
        if dims_match:
            parsed['width'] = int(dims_match.group(1))
            parsed['height'] = int(dims_match.group(2))

        fps_match = re.search(r'(\d+(?:\.\d+)?)\s+fps\b', details)
        if fps_match:
            parsed['fps'] = float(fps_match.group(1))

        bitrate_match = re.search(r'(\d+)\s+kb/s\b', details)
        if bitrate_match:
            # ffmpeg reports kb/s; the returned value is in bits per second.
            parsed['bitrate'] = int(bitrate_match.group(1)) * 1000

    return parsed


def get_video_info_ffmpeg(video_path: str) -> Optional[dict]:
    """Get basic video information by parsing ffmpeg's stderr report.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        A dict with the keys ``duration`` (seconds), ``width``, ``height``,
        ``fps``, ``bitrate`` (bits per second), ``has_audio`` and ``size``
        (bytes on disk, 0 if the file does not exist). Values that could not
        be parsed are 0 / False. Returns ``None`` if anything raises while
        running or parsing ffmpeg.
    """
    try:
        # Get FFmpeg executable from imageio_ffmpeg (ffprobe is not bundled).
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

        # With no output file ffmpeg prints the input description to stderr
        # and exits non-zero without decoding the file, so the return code is
        # deliberately not checked.
        cmd = [ffmpeg_exe, '-hide_banner', '-i', video_path]
        result = subprocess.run(cmd, capture_output=True, text=True, errors='replace')

        info = _parse_ffmpeg_stream_report(result.stderr)
        info['size'] = os.path.getsize(video_path) if os.path.exists(video_path) else 0
        return info
    except Exception as e:
        print(f"Error getting video info with FFmpeg: {e}")
        return None
def escape_path_for_ass(path: str) -> str:
    """Return *path* rewritten for use inside the FFmpeg ``ass`` filter.

    Every backslash becomes a forward slash and every colon (such as the one
    after a Windows drive letter) is prefixed with a backslash.
    """
    # One pass over the string: each character is mapped independently, so
    # the backslash added in front of ':' is never itself turned into '/'.
    substitutions = str.maketrans({'\\': '/', ':': '\\:'})
    return path.translate(substitutions)
def _build_video_filter_chain(style, aspect_ratio, target_width, target_height) -> str:
    """Return the video filter chain for the layout, without a final output label.

    An empty string means "no video filtering" (original aspect ratio/style).
    """
    if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
        return ""

    if style == ShortsStyle.CINEMATIC:
        # Blurred, cropped background with the foreground scaled to full width.
        # Height -2 keeps the scaled height divisible by 2 for yuv420p.
        return (
            f"[0:v]split=2[bg_raw][fg_raw];"
            f"[bg_raw]scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height},boxblur=20:2[bg];"
            f"[fg_raw]scale={target_width}:-2,setsar=1[fg];"
            f"[bg][fg]overlay=(W-w)/2:(H-h)/2,setsar=1"
        )

    if style == ShortsStyle.SPLIT_SCREEN:
        # The same source is cropped into a top and a bottom half and stacked.
        half_h = target_height // 2
        return (
            f"[0:v]split=2[top_raw][bottom_raw];"
            f"[top_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[top];"
            f"[bottom_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[bottom];"
            f"[top][bottom]vstack=inputs=2"
        )

    if style == ShortsStyle.CROP_FILL:
        # Crop to fill the entire canvas.
        return f"scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height}"

    if style == ShortsStyle.FIT_BARS:
        # Fit inside the canvas and pad the remainder.
        return f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2"

    # Default for every other style with a non-original ratio: scale to the
    # target width (-2 keeps the height even) and pad to the target canvas.
    return f"scale={target_width}:-2,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2,setsar=1"


def _build_audio_filter(include_audio, use_music, has_orig_audio, video_volume, music_volume) -> str:
    """Return the audio filter chain ending in [aout], or '' when none is needed."""
    if not include_audio:
        return ""
    if use_music:
        if has_orig_audio:
            # Mix original audio with background music; output lasts as long
            # as the first (original) input.
            return f"[0:a]volume={video_volume}[a_orig];[1:a]volume={music_volume}[a_bg];[a_orig][a_bg]amix=inputs=2:duration=first[aout]"
        # Only background music (original has no audio).
        return f"[1:a]volume={music_volume}[aout]"
    if has_orig_audio and video_volume != 1.0:
        return f"[0:a]volume={video_volume}[aout]"
    return ""


def extract_clip_ffmpeg(
    video_path: str,
    start_time: float,
    end_time: float,
    output_path: str,
    target_width: Optional[int] = None,
    target_height: Optional[int] = None,
    include_audio: bool = True,
    style: Optional[ShortsStyle] = None,
    aspect_ratio: Optional[AspectRatio] = None,
    bg_music_path: Optional[str] = None,
    video_volume: float = 1.0,
    music_volume: float = 0.2,
    loop_music: bool = True,
    subtitle_path: Optional[str] = None
) -> str:
    """Extract a clip with FFmpeg, applying layout, subtitle and audio filters.

    Args:
        video_path: Source video file.
        start_time: Clip start in seconds.
        end_time: Clip end in seconds; the clip length is ``end_time - start_time``.
        output_path: Destination file (overwritten, ``-y``).
        target_width: Canvas width; defaults to 1080 when either dimension is missing.
        target_height: Canvas height; defaults to 1920 when either dimension is missing.
        include_audio: When False the output is written without audio (``-an``).
        style: Layout selecting the video filter chain.
        aspect_ratio: ``AspectRatio.ORIGINAL`` disables the layout filters.
        bg_music_path: Optional music file; ignored if it does not exist.
        video_volume: Volume factor applied to the original audio.
        music_volume: Volume factor applied to the background music.
        loop_music: Loop the music input indefinitely (``-stream_loop -1``).
        subtitle_path: Optional subtitle file burned in with the ``ass`` filter;
            ignored if it does not exist.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        Exception: any other failure is printed and re-raised unchanged.
    """
    try:
        duration = end_time - start_time
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

        # Get video info to check for an audio stream.
        video_info = get_video_info_ffmpeg(video_path)
        has_orig_audio = video_info.get('has_audio', False) if video_info else False

        # Checked once so the input list and the audio filter always agree.
        use_music = bool(bg_music_path and os.path.exists(bg_music_path))

        cmd = [ffmpeg_exe, '-i', video_path]
        if use_music:
            if loop_music:
                cmd.extend(['-stream_loop', '-1'])
            cmd.extend(['-i', bg_music_path])
        # NOTE(review): -ss/-t come after the inputs, so they act as output
        # options. Moving -ss in front of -i would presumably seek faster but
        # may change the timestamps the ass filter sees - confirm before changing.
        cmd.extend(['-ss', str(start_time), '-t', str(duration)])

        # Resolution defaults for Shorts if not provided.
        if not target_width or not target_height:
            target_width, target_height = 1080, 1920

        video_chain = _build_video_filter_chain(style, aspect_ratio, target_width, target_height)

        # Subtitles are applied as a final stage. The layout output gets its
        # own label so that no label is used for two different links.
        if subtitle_path and os.path.exists(subtitle_path):
            escaped_sub_path = escape_path_for_ass(subtitle_path)
            if video_chain:
                filter_complex = f"{video_chain}[v_base];[v_base]ass='{escaped_sub_path}'[v]"
            else:
                filter_complex = f"[0:v]ass='{escaped_sub_path}'[v]"
        elif video_chain:
            filter_complex = f"{video_chain}[v]"
        else:
            filter_complex = ""

        audio_filter = _build_audio_filter(
            include_audio, use_music, has_orig_audio, video_volume, music_volume
        )

        if filter_complex:
            if audio_filter:
                combined_filter = f"{filter_complex};{audio_filter}"
                cmd.extend(['-filter_complex', combined_filter, '-map', '[v]', '-map', '[aout]'])
            else:
                cmd.extend(['-filter_complex', filter_complex, '-map', '[v]'])
                if include_audio:
                    # Trailing '?' makes the mapping optional for silent sources.
                    cmd.extend(['-map', '0:a?'])
        elif audio_filter:
            # Only an audio filter: take the first video stream unfiltered.
            cmd.extend(['-filter_complex', audio_filter, '-map', '0:v:0', '-map', '[aout]'])

        if not include_audio:
            cmd.extend(['-an'])

        # Encoding settings.
        cmd.extend([
            '-c:v', 'libx264',
            '-preset', 'superfast',
            '-crf', '23',
            '-pix_fmt', 'yuv420p',  # Ensure compatibility
            '-c:a', 'aac',  # Encode audio to AAC
            '-b:a', '128k',  # Standard bitrate
            '-ac', '2',  # Stereo
            '-y',
            '-loglevel', 'error',
            output_path
        ])

        subprocess.run(cmd, check=True, capture_output=True)
        return output_path
    except subprocess.CalledProcessError as e:
        print(f"❌ FFmpeg advanced extraction failed: {e}")
        # The exception text only carries the exit status; ffmpeg's own
        # message is in the captured stderr.
        if e.stderr:
            print(f"FFmpeg stderr: {e.stderr.decode('utf-8', errors='replace').strip()}")
        raise
    except Exception as e:
        print(f"❌ FFmpeg advanced extraction failed: {e}")
        raise
def should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music) -> bool:
    """Decide whether the FFmpeg path can be used instead of MoviePy.

    FFmpeg is chosen only when all of the following hold:
      1. ``bg_music`` is falsy.
      2. ``custom_dims`` carries no truthy ``audio_path`` attribute.
      3. No two timestamp ranges overlap.

    Args:
        timestamps: Sized collection of objects exposing either
            ``start_time``/``end_time`` or ``start``/``end``.
        custom_dims: Optional object; only its ``audio_path`` attribute is read.
        export_audio: Accepted for interface compatibility; not consulted.
        bg_music: Background music reference; any truthy value selects MoviePy.

    Returns:
        True when FFmpeg can handle the request, False otherwise.
    """
    def _bounds(ts):
        # Accept the same two attribute spellings as process_with_ffmpeg.
        start = ts.start_time if hasattr(ts, 'start_time') else ts.start
        end = ts.end_time if hasattr(ts, 'end_time') else ts.end
        return start, end

    if bg_music:
        return False  # Need MoviePy for audio mixing
    if custom_dims and hasattr(custom_dims, 'audio_path') and custom_dims.audio_path:
        return False  # Background music requires MoviePy

    if len(timestamps) > 1:
        # After sorting by start, any overlap shows up between neighbours.
        ordered = sorted((_bounds(ts) for ts in timestamps), key=lambda bounds: bounds[0])
        for (_, prev_end), (next_start, _) in zip(ordered, ordered[1:]):
            if next_start < prev_end:
                return False  # Overlapping timestamps need MoviePy
    return True
def hybrid_process_clips(video_path, timestamps, output_format, custom_dims=None, export_audio=True, bg_music=None, subtitle_path=None):
    """Process clips with FFmpeg when possible, otherwise with MoviePy.

    ``should_use_ffmpeg`` picks the path. If that check or the FFmpeg
    processing raises, the error is printed and the MoviePy implementation is
    used instead. A failure of the MoviePy implementation itself propagates to
    the caller; it is not retried.

    Args:
        video_path: Source video file.
        timestamps: Clip ranges to extract.
        output_format: Forwarded to the selected implementation.
        custom_dims: Optional dimensions object, forwarded unchanged.
        export_audio: Forwarded unchanged.
        bg_music: Only used to choose the path; it is not passed to either
            implementation.
        subtitle_path: Passed to the FFmpeg path only.

    Returns:
        Whatever the selected implementation returns.
    """
    try:
        if should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music):
            print("Using FFmpeg for fast processing...")
            return process_with_ffmpeg(video_path, timestamps, output_format, custom_dims, export_audio, subtitle_path=subtitle_path)
        print("Using MoviePy for complex processing...")
    except Exception as e:
        # Only the FFmpeg attempt is guarded, so the fallback below runs once.
        print(f"Hybrid processing failed: {e}")
        print("Falling back to MoviePy...")

    # Imported lazily, as in the rest of this module's MoviePy usage.
    from video_processor import process_video_clips
    return process_video_clips(video_path, timestamps, output_format, custom_dims, export_audio)
def process_with_ffmpeg(video_path, timestamps, output_format, custom_dims=None, export_audio=True, subtitle_path=None):
    """Cut each timestamp range out of the video with FFmpeg.

    Args:
        video_path: Source video file.
        timestamps: Iterable of objects exposing ``start_time``/``end_time``
            or ``start``/``end``.
        output_format: Object whose ``value`` may select an ``avi`` or ``mov``
            extension (``mp4`` otherwise); passed on as ``style`` when it is a
            ``ShortsStyle``.
        custom_dims: Optional object with ``width`` and ``height``; used only
            when both are truthy.
        export_audio: Forwarded as ``include_audio``.
        subtitle_path: Forwarded to ``extract_clip_ffmpeg``.

    Returns:
        A ``(clip_paths, [])`` tuple; the second list is always empty on this path.

    Raises:
        Exception: if video info cannot be read, or re-raised from a failed
            clip extraction.
    """
    produced = []

    # The info itself is not used further; this only verifies it can be read.
    if not get_video_info_ffmpeg(video_path):
        raise Exception("Could not get video info")

    # Custom dimensions apply only when both attributes exist and are truthy.
    target_width = target_height = None
    if custom_dims and hasattr(custom_dims, 'width') and hasattr(custom_dims, 'height'):
        if custom_dims.width and custom_dims.height:
            target_width, target_height = custom_dims.width, custom_dims.height

    for clip_number, ts in enumerate(timestamps, start=1):
        clip_id = uuid.uuid4().hex[:8]

        # Container extension: avi/mov when requested, mp4 otherwise.
        ext = 'mp4'
        if output_format and hasattr(output_format, 'value'):
            requested = output_format.value
            if requested in ('avi', 'mov'):
                ext = requested
        output_filename = f"{clip_id}_clip_{clip_number}.{ext}"

        # Output goes to the router's PROCESSED_DIR.
        from routers.video import PROCESSED_DIR
        output_path = os.path.join(PROCESSED_DIR, output_filename)

        try:
            if hasattr(ts, 'start_time'):
                clip_start = ts.start_time
            else:
                clip_start = ts.start
            if hasattr(ts, 'end_time'):
                clip_end = ts.end_time
            else:
                clip_end = ts.end

            if isinstance(output_format, ShortsStyle):
                clip_style = output_format
            else:
                clip_style = None

            produced.append(
                extract_clip_ffmpeg(
                    video_path,
                    clip_start,
                    clip_end,
                    output_path,
                    target_width,
                    target_height,
                    include_audio=export_audio,
                    style=clip_style,
                    subtitle_path=subtitle_path,
                )
            )
        except Exception as e:
            print(f"FFmpeg extraction failed for clip {clip_number}: {e}")
            raise

    return produced, []  # No audio paths for FFmpeg method