File size: 15,272 Bytes
519351f b7c85d5 519351f c63c39e 519351f b7c85d5 73b4be4 c63c39e 519351f c63c39e 519351f 73b4be4 519351f 73b4be4 519351f b7c85d5 519351f 73b4be4 b7c85d5 73b4be4 b7c85d5 73b4be4 b7c85d5 73b4be4 b7c85d5 73b4be4 c63c39e 73b4be4 b7c85d5 73b4be4 b7c85d5 73b4be4 519351f b7c85d5 519351f b7c85d5 519351f b7c85d5 519351f b7c85d5 519351f c63c39e 519351f c63c39e 519351f c63c39e 519351f b7c85d5 519351f b7c85d5 519351f b7c85d5 519351f b7c85d5 c63c39e 519351f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 | import subprocess
import json
import os
import uuid
from typing import Optional, Tuple
import imageio_ffmpeg
from schemas import ShortsStyle, AspectRatio
def get_video_info_ffmpeg(video_path: str) -> dict:
"""Get video information using FFmpeg - much faster than MoviePy"""
try:
# Get FFmpeg executable from imageio_ffmpeg
ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
# Use ffmpeg to get basic info (ffprobe not available in imageio_ffmpeg)
cmd = [
ffmpeg_exe, '-i', video_path,
'-f', 'null', '-',
'-hide_banner'
]
result = subprocess.run(cmd, capture_output=True, text=True)
# Parse info from stderr (ffmpeg outputs info to stderr)
info_text = result.stderr
# Extract duration
duration = 0
if "Duration:" in info_text:
duration_line = [line for line in info_text.split('\n') if 'Duration:' in line][0]
duration_str = duration_line.split('Duration:')[1].split(',')[0].strip()
# Convert HH:MM:SS.ms to seconds
parts = duration_str.split(':')
if len(parts) == 3:
duration = float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
# Extract video stream info
video_stream = None
audio_stream = None
width = height = fps = bitrate = 0
has_audio = False
for line in info_text.split('\n'):
if 'Video:' in line:
video_stream = line
# Extract dimensions
if ', ' in line:
parts = line.split(', ')
for part in parts:
if 'x' in part and part.replace('x', '').replace(' ', '').isdigit():
# Find dimension part
dim_part = part.strip()
if ' ' in dim_part:
dim_part = dim_part.split()[0]
if 'x' in dim_part and len(dim_part.split('x')) == 2:
try:
width, height = map(int, dim_part.split('x'))
break
except:
pass
# Extract FPS
if ' fps' in part:
try:
fps = float(part.replace(' fps', ''))
except:
pass
# Extract bitrate
if ' kb/s' in part:
try:
bitrate = int(part.replace(' kb/s', '')) * 1000
except:
pass
if 'Audio:' in line:
audio_stream = line
has_audio = True
# Build info dict
info = {
'duration': duration,
'width': width,
'height': height,
'fps': fps,
'bitrate': bitrate,
'has_audio': has_audio,
'size': os.path.getsize(video_path) if os.path.exists(video_path) else 0
}
return info
# Extract useful info
video_stream = None
audio_stream = None
for stream in data.get('streams', []):
if stream.get('codec_type') == 'video':
video_stream = stream
elif stream.get('codec_type') == 'audio':
audio_stream = stream
info = {
'duration': float(data.get('format', {}).get('duration', 0)),
'size': int(data.get('format', {}).get('size', 0)),
'has_audio': audio_stream is not None,
'width': int(video_stream.get('width', 0)) if video_stream else 0,
'height': int(video_stream.get('height', 0)) if video_stream else 0,
'fps': eval(video_stream.get('r_frame_rate', '0')) if video_stream else 0,
'bitrate': int(data.get('format', {}).get('bit_rate', 0)),
}
return info
except Exception as e:
print(f"Error getting video info with FFmpeg: {e}")
return None
def escape_path_for_ass(path: str) -> str:
"""Escapes Windows paths for the FFmpeg ASS filter."""
# Replace backslashes with forward slashes
path = path.replace('\\', '/')
# Escape the colon after the drive letter
path = path.replace(':', '\\:')
return path
def extract_clip_ffmpeg(
video_path: str,
start_time: float,
end_time: float,
output_path: str,
target_width: Optional[int] = None,
target_height: Optional[int] = None,
include_audio: bool = True,
style: Optional[ShortsStyle] = None,
aspect_ratio: Optional[AspectRatio] = None,
bg_music_path: Optional[str] = None,
video_volume: float = 1.0,
music_volume: float = 0.2,
loop_music: bool = True,
subtitle_path: Optional[str] = None
) -> str:
"""
Extract video clip using FFmpeg - high speed with advanced filters and optional subtitles
"""
try:
duration = end_time - start_time
ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
# Get video info to check for audio stream
video_info = get_video_info_ffmpeg(video_path)
has_orig_audio = video_info.get('has_audio', False) if video_info else False
# Build FFmpeg command
inputs = ['-i', video_path]
if bg_music_path and os.path.exists(bg_music_path):
if loop_music:
inputs.extend(['-stream_loop', '-1', '-i', bg_music_path])
else:
inputs.extend(['-i', bg_music_path])
cmd = [ffmpeg_exe] + inputs + ['-ss', str(start_time), '-t', str(duration)]
# Resolution defaults for Shorts if not provided
if not target_width or not target_height:
target_width, target_height = 1080, 1920
filter_complex = ""
if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
# No filters, just copy/passthrough
filter_complex = ""
elif style == ShortsStyle.CINEMATIC:
# Cinematic Blur: BG (Blurred & Cropped) + FG (Scaled to fit width)
# Use -2 to ensure height is divisible by 2 for yuv420p
filter_complex = (
f"[0:v]split=2[bg_raw][fg_raw];"
f"[bg_raw]scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height},boxblur=20:2[bg];"
f"[fg_raw]scale={target_width}:-2,setsar=1[fg];"
f"[bg][fg]overlay=(W-w)/2:(H-h)/2,setsar=1[v]"
)
elif style == ShortsStyle.SPLIT_SCREEN:
# Split Screen: Top half and Bottom half
half_h = target_height // 2
filter_complex = (
f"[0:v]split=2[top_raw][bottom_raw];"
f"[top_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[top];"
f"[bottom_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[bottom];"
f"[top][bottom]vstack=inputs=2[v]"
)
elif style == ShortsStyle.CROP_FILL:
# Crop to fill entire canvas
filter_complex = f"scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height}"
elif style == ShortsStyle.FIT_BARS:
# Fit with black bars
filter_complex = f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2"
elif aspect_ratio != AspectRatio.ORIGINAL:
# Default for non-original ratios: Scale to fit width
# Use -2 to ensure dimensions are divisible by 2 for yuv420p
filter_complex = f"scale={target_width}:-2,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2,setsar=1[v]"
else:
filter_complex = ""
# Subtitle logic
if subtitle_path and os.path.exists(subtitle_path):
escaped_sub_path = escape_path_for_ass(subtitle_path)
if filter_complex:
if "[v]" not in filter_complex:
filter_complex += "[v]"
filter_complex += f";[v]ass='{escaped_sub_path}'[v]"
else:
filter_complex = f"[0:v]ass='{escaped_sub_path}'[v]"
# Audio Filter logic
audio_filter = ""
if include_audio:
if bg_music_path and os.path.exists(bg_music_path):
if has_orig_audio:
# Mix original audio with background music
# Use unique names [a_orig] [a_bg] to avoid collisions with [v]
audio_filter = f"[0:a]volume={video_volume}[a_orig];[1:a]volume={music_volume}[a_bg];[a_orig][a_bg]amix=inputs=2:duration=first[aout]"
else:
# Only background music (original has no audio)
audio_filter = f"[1:a]volume={music_volume}[aout]"
elif has_orig_audio and video_volume != 1.0:
audio_filter = f"[0:a]volume={video_volume}[aout]"
if filter_complex:
# If [v] is not the final tag, ensure it is
if "[v]" not in filter_complex:
filter_complex += "[v]"
if audio_filter:
combined_filter = f"{filter_complex};{audio_filter}"
cmd.extend(['-filter_complex', combined_filter, '-map', '[v]', '-map', '[aout]'])
else:
cmd.extend(['-filter_complex', filter_complex, '-map', '[v]'])
if include_audio:
cmd.extend(['-map', '0:a?'])
elif audio_filter:
# Only audio filter
cmd.extend(['-filter_complex', audio_filter, '-map', '0:v:0', '-map', '[aout]'])
if not include_audio:
cmd.extend(['-an'])
# Optimization settings
cmd.extend([
'-c:v', 'libx264',
'-preset', 'superfast',
'-crf', '23',
'-pix_fmt', 'yuv420p', # Ensure compatibility
'-c:a', 'aac', # Encode audio to AAC
'-b:a', '128k', # Standard bitrate
'-ac', '2', # Stereo
'-y',
'-loglevel', 'error',
output_path
])
subprocess.run(cmd, check=True, capture_output=True)
return output_path
except Exception as e:
print(f"❌ FFmpeg advanced extraction failed: {e}")
raise e
def should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music) -> bool:
"""
Determine if FFmpeg can be used instead of MoviePy
FFmpeg is faster for simple operations, MoviePy for complex ones
"""
# Use FFmpeg if:
# 1. No background music
# 2. No complex custom dimensions (just simple resize)
# 3. Simple timestamps (no overlapping or complex cuts)
# 4. Basic audio export (no mixing)
if bg_music:
return False # Need MoviePy for audio mixing
if custom_dims and hasattr(custom_dims, 'audio_path') and custom_dims.audio_path:
return False # Background music requires MoviePy
# Check if timestamps are simple (no overlapping)
if len(timestamps) > 1:
# Sort timestamps and check for overlaps
sorted_ts = sorted(timestamps, key=lambda x: x.start)
for i in range(1, len(sorted_ts)):
if sorted_ts[i].start < sorted_ts[i-1].end:
return False # Overlapping timestamps need MoviePy
return True
def hybrid_process_clips(video_path, timestamps, output_format, custom_dims=None, export_audio=True, bg_music=None, subtitle_path=None):
"""
Hybrid approach: Use FFmpeg for simple operations, MoviePy for complex ones
"""
try:
# Try FFmpeg first if conditions are met
if should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music):
print("Using FFmpeg for fast processing...")
return process_with_ffmpeg(video_path, timestamps, output_format, custom_dims, export_audio, subtitle_path=subtitle_path)
else:
print("Using MoviePy for complex processing...")
# Fall back to existing MoviePy implementation
from video_processor import process_video_clips
return process_video_clips(video_path, timestamps, output_format, custom_dims, export_audio)
except Exception as e:
print(f"Hybrid processing failed: {e}")
print("Falling back to MoviePy...")
from video_processor import process_video_clips
return process_video_clips(video_path, timestamps, output_format, custom_dims, export_audio)
def process_with_ffmpeg(video_path, timestamps, output_format, custom_dims=None, export_audio=True, subtitle_path=None):
"""Process clips using FFmpeg for maximum speed"""
clip_paths = []
# Get video info
video_info = get_video_info_ffmpeg(video_path)
if not video_info:
raise Exception("Could not get video info")
# Determine target dimensions
target_width, target_height = None, None
if custom_dims and hasattr(custom_dims, 'width') and hasattr(custom_dims, 'height'):
if custom_dims.width and custom_dims.height:
target_width = custom_dims.width
target_height = custom_dims.height
# Process each timestamp
for i, ts in enumerate(timestamps):
clip_id = uuid.uuid4().hex[:8]
# Generate output filename
ext = 'mp4' # Default
if output_format and hasattr(output_format, 'value'):
if output_format.value == 'avi':
ext = 'avi'
elif output_format.value == 'mov':
ext = 'mov'
output_filename = f"{clip_id}_clip_{i+1}.{ext}"
# Use PROCESSED_DIR for output
from routers.video import PROCESSED_DIR
output_path = os.path.join(PROCESSED_DIR, output_filename)
# Extract clip using FFmpeg
try:
clip_path = extract_clip_ffmpeg(
video_path,
ts.start_time if hasattr(ts, 'start_time') else ts.start,
ts.end_time if hasattr(ts, 'end_time') else ts.end,
output_path,
target_width,
target_height,
include_audio=export_audio,
style=output_format if isinstance(output_format, ShortsStyle) else None,
subtitle_path=subtitle_path
)
clip_paths.append(clip_path)
except Exception as e:
print(f"FFmpeg extraction failed for clip {i+1}: {e}")
raise e
return clip_paths, [] # No audio paths for FFmpeg method |