# Clipping / video_processor.py
# (commit c63c39e by aliSaac510: "add sub endpoints")
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from moviepy import VideoFileClip, CompositeVideoClip, ColorClip
import numpy as np
from scipy.ndimage import gaussian_filter
from schemas import ShortsStyle, Dimensions, LayoutType, AspectRatio, SubtitlePreset
from hybrid_processor import process_video_hybrid
from routers.subtitle_generator import generate_pro_ass
def get_canvas_dimensions(ratio: AspectRatio) -> tuple:
    """Map an aspect-ratio choice to a (width, height) canvas size in pixels.

    Returns (None, None) for AspectRatio.ORIGINAL (or any unmapped value),
    signalling that the source's own dimensions should be kept.
    """
    canvas_sizes = {
        AspectRatio.RATIO_9_16: (1080, 1920),
        AspectRatio.RATIO_1_1: (1080, 1080),
        AspectRatio.RATIO_16_9: (1920, 1080),
        AspectRatio.RATIO_4_5: (1080, 1350),
    }
    return canvas_sizes.get(ratio, (None, None))
def process_video_clips(video_path: str, timestamps, aspect_ratio: AspectRatio = AspectRatio.RATIO_9_16, style: ShortsStyle = ShortsStyle.ORIGINAL, custom_dims: Dimensions = None, export_audio: bool = False, use_parallel: bool = True, use_ffmpeg_optimization: bool = True, video_volume: float = 1.0, music_volume: float = 0.2, loop_music: bool = True, transcription: list = None, subtitle_preset: SubtitlePreset = None):
    """
    Processes a video file into multiple clips based on timestamps and style.

    Strategy: try the fast FFmpeg-based hybrid pipeline first; if it raises or
    returns no clips, fall back to MoviePy — either one thread per clip
    (capped at 3 workers) or a sequential loop sharing one video reader.

    If export_audio is True, also saves the original audio track of each clip.
    use_parallel: Enable parallel processing for better performance (default: True)
    use_ffmpeg_optimization: Use FFmpeg for simple operations (much faster) (default: True)
    Supports hard-burned subtitles if transcription is provided.

    Returns:
        (clip_paths, audio_paths) — lists of output file paths.
    """
    clip_paths = []
    audio_paths = []
    # Extract background music path if available
    bg_music_path = None
    if custom_dims and hasattr(custom_dims, 'audio_path') and custom_dims.audio_path:
        if os.path.exists(custom_dims.audio_path):
            bg_music_path = custom_dims.audio_path
    # Ensure custom_dims has the volume info for hybrid processing
    if custom_dims:
        custom_dims.video_volume = video_volume
        custom_dims.music_volume = music_volume
        custom_dims.loop_music = loop_music
    # Handle Subtitles if transcription is provided
    subtitle_path = None
    if transcription:
        if not subtitle_preset:
            subtitle_preset = SubtitlePreset(name="Default")
        # Generate temporary ASS file next to the source video
        temp_dir = os.path.dirname(video_path)
        subtitle_path = os.path.join(temp_dir, f"sub_{uuid.uuid4().hex[:8]}.ass")
        generate_pro_ass(transcription, subtitle_preset, subtitle_path)
        print(f"πŸ“ Subtitles generated: {subtitle_path}")
    # Try FFmpeg optimization first for simple cases
    if use_ffmpeg_optimization:
        try:
            print(f"πŸš€ Trying FFmpeg optimization for style: {style}...")
            clip_paths, audio_paths = process_video_hybrid(
                video_path=video_path,
                timestamps=timestamps,
                output_format=style,
                custom_dims=custom_dims,
                export_audio=export_audio,
                aspect_ratio=aspect_ratio,
                # The same subtitle file is burned into every clip
                subtitle_paths=[subtitle_path] * len(timestamps) if subtitle_path else None
            )
            if clip_paths:  # If FFmpeg worked, return results
                print(f"βœ… FFmpeg optimization successful! Processed {len(clip_paths)} clips")
                return clip_paths, audio_paths
        except Exception as e:
            print(f"⚠️ FFmpeg optimization failed: {e}")
            print("🎬 Falling back to MoviePy...")
    # NOTE(review): the MoviePy fallback paths below never use subtitle_path —
    # only the hybrid pipeline receives it. Confirm subtitles are intentionally
    # skipped in the fallback.
    try:
        # Load background music if provided for MoviePy fallback
        bg_music = None
        if bg_music_path:
            from moviepy import AudioFileClip, CompositeAudioClip
            import moviepy.audio.fx as afx
            bg_music = AudioFileClip(bg_music_path)
        if use_parallel and len(timestamps) > 1:
            # Process clips in parallel for better performance
            max_workers = min(3, len(timestamps))  # Limit parallel workers
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all clip processing tasks
                futures = []
                for i, ts in enumerate(timestamps):
                    clip_id = uuid.uuid4().hex[:8]
                    future = executor.submit(
                        process_single_clip,
                        ts, video_path, aspect_ratio, style, custom_dims,
                        export_audio, bg_music, clip_id,
                        video_volume=video_volume,
                        music_volume=music_volume,
                        loop_music=loop_music
                    )
                    futures.append((future, clip_id))
                # Collect results (in submission order)
                for future, clip_id in futures:
                    try:
                        output_path, audio_output_path = future.result()
                        if output_path:
                            clip_paths.append(output_path)
                            if export_audio:
                                audio_paths.append(audio_output_path)
                    except Exception as e:
                        print(f"Error processing clip {clip_id}: {str(e)}")
                        if export_audio:
                            # Placeholder keeps audio_paths aligned with requests
                            audio_paths.append(None)
        else:
            # Sequential processing - more memory efficient for single clips or when parallel is disabled
            from moviepy import VideoFileClip, CompositeAudioClip
            import moviepy.audio.fx as afx
            # Define folders
            from routers.video import PROCESSED_DIR, TEMP_DIR
            with VideoFileClip(video_path) as video:
                print(f"DEBUG: Video loaded. Duration: {video.duration}")
                for ts in timestamps:
                    print(f"DEBUG: Processing clip. Request: Start={ts.start_time}, End={ts.end_time}")
                    # Basic validation
                    if ts.start_time >= video.duration:
                        print(f"DEBUG: Skipping clip. Start time {ts.start_time} is beyond video duration {video.duration}.")
                        continue
                    end = min(ts.end_time, video.duration)
                    print(f"DEBUG: Extracting subclip from {ts.start_time} to {end}")
                    # Generate unique ID for this clip operation
                    clip_id = uuid.uuid4().hex[:8]
                    output_filename = f"clip_{clip_id}.mp4"
                    output_path = os.path.join(PROCESSED_DIR, output_filename)
                    # NOTE(review): audio_output_path is never reassigned in this
                    # path, so exported audio entries are always None — confirm
                    # whether per-clip audio export here is intentionally disabled.
                    audio_output_path = None
                    # Extract subclip and process it
                    with video.subclipped(ts.start_time, end) as subclip:
                        # Apply background music if available
                        if bg_music:
                            # Setup background music
                            bg_music_clip = bg_music.with_duration(subclip.duration)
                            if loop_music:
                                # NOTE(review): `.fx(...)` is MoviePy 1.x API while the
                                # surrounding code uses 2.x calls (subclipped,
                                # with_volume_scaled) — verify this branch against the
                                # installed MoviePy version.
                                bg_music_clip = bg_music_clip.fx(afx.AudioLoop, duration=subclip.duration)
                            # Apply volumes
                            if subclip.audio:
                                original_audio = subclip.audio.with_volume_scaled(video_volume)
                                bg_music_clip = bg_music_clip.with_volume_scaled(music_volume)
                                # Combine audio
                                subclip.audio = CompositeAudioClip([original_audio, bg_music_clip])
                            else:
                                # Silent source: background music becomes the only track
                                subclip.audio = bg_music_clip.with_volume_scaled(music_volume)
                        # Apply formatting
                        if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
                            pass  # Skip resizing, keep original dimensions
                        else:
                            # Map style to layout
                            layout_map = {
                                ShortsStyle.CINEMATIC: LayoutType.CINEMATIC_BLUR,
                                ShortsStyle.CROP_FILL: LayoutType.CROP_CENTER,
                                ShortsStyle.FIT_BARS: LayoutType.FIT_CENTER,
                                ShortsStyle.SPLIT_SCREEN: LayoutType.SPLIT_SCREEN
                            }
                            layout = layout_map.get(style, LayoutType.CROP_CENTER)
                            # Get target dimensions
                            target_w, target_h = get_canvas_dimensions(aspect_ratio)
                            if target_w and target_h:
                                subclip = apply_layout_factory(subclip, layout, target_w, target_h, custom_dims)
                        # Write file - optimized settings for compatibility and speed
                        temp_audio = os.path.join(TEMP_DIR, f"temp-audio-{clip_id}.m4a")
                        subclip.write_videofile(
                            output_path,
                            codec="libx264",
                            audio_codec="aac",
                            temp_audiofile=temp_audio,
                            remove_temp=True,
                            fps=24,
                            threads=4,
                            preset="superfast",
                            logger=None
                        )
                    # output_path is always set here, so each written clip is recorded
                    if output_path:
                        clip_paths.append(output_path)
                        if export_audio:
                            audio_paths.append(audio_output_path)
        if bg_music: bg_music.close()
        return clip_paths, audio_paths
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise e
def apply_layout_factory(clip, layout_type, target_w, target_h, config=None):
    """
    Factory to apply different video layouts.

    Dispatches on layout_type; any unrecognized layout falls back to a plain
    width-capped resize.
    """
    if layout_type == LayoutType.CINEMATIC_BLUR:
        # Blur strength may come from the config object; default to 20.
        blur = config.blur_intensity if config and hasattr(config, 'blur_intensity') else 20
        return apply_cinematic_blur(clip, target_w, target_h, blur)
    if layout_type == LayoutType.FIT_CENTER:
        return apply_fit_layout(clip, target_w, target_h)
    if layout_type == LayoutType.SPLIT_SCREEN:
        return apply_split_screen(clip, target_w, target_h)
    if layout_type == LayoutType.CROP_CENTER:
        # Default to Crop/Fill
        cropped = format_clip(clip, target_w / target_h)
        return cropped.resized(width=target_w, height=target_h)
    # Fallback
    return clip.resized(width=target_w) if clip.w > target_w else clip
def apply_split_screen(clip, target_w, target_h):
    """
    Stacks two copies of the same video in the top and bottom halves of the
    target canvas, each cropped/scaled to fill its half.
    """
    half_h = target_h // 2
    half_ratio = target_w / half_h

    def make_panel():
        # Crop to the half-panel aspect ratio, then scale to exactly fill it.
        return format_clip(clip, half_ratio).resized(width=target_w, height=half_h)

    return CompositeVideoClip(
        [
            make_panel().with_position(("center", "top")),
            make_panel().with_position(("center", "bottom")),
        ],
        size=(target_w, target_h),
    )
def apply_cinematic_blur(clip, target_w, target_h, blur_intensity=20):
    """
    Creates a cinematic layout: a blurred, canvas-filling copy of the video in
    the background with the original video centered on top.

    Uses a custom image_transform filter for maximum compatibility.

    Args:
        clip: source video clip.
        target_w, target_h: output canvas size in pixels.
        blur_intensity: gaussian sigma for the background blur (default 20).
    """
    def blur_filter(image):
        # Blur only the spatial axes; sigma 0 on the last axis leaves the
        # color channels unmixed.
        return gaussian_filter(image, sigma=(blur_intensity, blur_intensity, 0))

    # 1. Background: crop/scale to fill the whole canvas, then blur.
    bg = format_clip(clip, target_w / target_h)
    bg = bg.resized(width=target_w, height=target_h)
    bg = bg.image_transform(blur_filter)

    # 2. Foreground: fit the original inside the canvas without cropping.
    # Bug fix: the previous width-only resize could overflow the canvas
    # vertically when the source was taller than the target ratio; pick the
    # constraining dimension instead.
    if (clip.w / clip.h) >= (target_w / target_h):
        fg = clip.resized(width=target_w)
    else:
        fg = clip.resized(height=target_h)

    # Center the foreground on the blurred background.
    return CompositeVideoClip([bg, fg.with_position("center")], size=(target_w, target_h))
def apply_fit_layout(clip, target_w, target_h):
    """
    Fits the video inside the target dimensions with black bars (Letterboxing).
    """
    source_ratio = clip.w / clip.h
    canvas_ratio = target_w / target_h
    if source_ratio > canvas_ratio:
        # Wider than the canvas: pin the width, bars appear above/below.
        fg = clip.resized(width=target_w)
    else:
        # Taller (or equal): pin the height, bars appear left/right.
        fg = clip.resized(height=target_h)
    # Solid black canvas behind the letterboxed video.
    bg = ColorClip(size=(target_w, target_h), color=(0, 0, 0), duration=clip.duration)
    return CompositeVideoClip([bg, fg.with_position("center")], size=(target_w, target_h))
def format_clip(clip, target_ratio):
    """
    Crops a clip (center-anchored) to a target aspect ratio, then upscales
    well-known formats (9:16, 16:9, 1:1) to a standard minimum resolution.

    Args:
        clip: source clip exposing .size, .cropped(), .resized().
        target_ratio: desired width/height ratio.

    Returns:
        The cropped (and possibly upscaled) clip.
    """
    w, h = clip.size
    current_ratio = w / h
    if current_ratio > target_ratio:
        # Source is wider than target, crop sides
        new_w = h * target_ratio
        subclip = clip.cropped(x_center=w / 2, width=new_w)
    else:
        # Source is taller than target, crop top/bottom
        new_h = w / target_ratio
        subclip = clip.cropped(y_center=h / 2, height=new_h)

    def _ratio_is(ratio):
        # Bug fix: target_ratio is usually computed as target_w / target_h, so
        # exact float equality against 9/16 etc. is fragile; compare with a
        # small tolerance instead.
        return abs(target_ratio - ratio) < 1e-6

    # Standardize resolutions for known formats
    if _ratio_is(9 / 16):  # Shorts
        return subclip.resized(height=1920) if subclip.h < 1920 else subclip
    elif _ratio_is(16 / 9):  # Standard
        return subclip.resized(width=1920) if subclip.w < 1920 else subclip
    elif _ratio_is(1 / 1):  # Square
        return subclip.resized(width=1080) if subclip.w < 1080 else subclip
    return subclip
def safe_remove(path: str, max_retries: int = 3) -> bool:
    """Attempt to remove a file, retrying to ride out Windows file locking.

    Args:
        path: file to delete.
        max_retries: number of deletion attempts; waits 1s between retries.

    Returns:
        True once the file is gone (including when it never existed — a
        bug fix: this case previously returned None instead of a bool),
        False if deletion still fails after max_retries attempts.
    """
    import time
    if not os.path.exists(path):
        print(f"[SAFE_REMOVE] File does not exist: {path}")
        # Nothing to delete counts as success.
        return True
    for attempt in range(max_retries):
        try:
            os.remove(path)
            print(f"[SAFE_REMOVE] Successfully deleted: {path}")
            return True
        except PermissionError as e:
            # Typical on Windows while another handle is still open.
            print(f"[SAFE_REMOVE] Permission error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                time.sleep(1)  # Wait for handles to clear
            else:
                print(f"[SAFE_REMOVE] Warning: Could not delete {path} after {max_retries} attempts.")
                return False
        except Exception as e:
            # Any other failure is not retried.
            print(f"[SAFE_REMOVE] Error deleting {path}: {e}")
            return False
    return False
def create_zip_archive(file_paths: list, output_filename: str):
    """
    Creates a ZIP archive containing the specified files.

    Entries that are None or point at missing files are skipped. The archive
    is written next to the first valid file. Returns the archive path, or
    None when there is nothing to archive.
    """
    import zipfile
    existing = [entry for entry in file_paths if entry and os.path.exists(entry)]
    if not existing:
        return None
    zip_path = os.path.join(os.path.dirname(existing[0]), output_filename)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for entry in existing:
            # Store by basename so the archive has a flat layout.
            archive.write(entry, os.path.basename(entry))
    return zip_path
def process_single_clip(ts, video_path, aspect_ratio, style, custom_dims, export_audio, bg_music, clip_id, video_volume=1.0, music_volume=0.2, loop_music=True):
    """
    Process a single clip - designed for use as a parallel worker.

    Opens its own VideoFileClip reader (so concurrent workers don't share
    file handles), extracts [ts.start_time, ts.end_time], optionally mixes in
    background music, applies the requested layout/aspect formatting, and
    writes the result to PROCESSED_DIR.

    Returns:
        (output_path, audio_output_path) on success — audio_output_path is
        currently always None (automatic audio extraction was removed);
        (None, None) when the start time is past the video end or on error.
    """
    try:
        # Ensure custom_dims carries the audio-mix settings for downstream layout code
        if not custom_dims:
            from schemas import Dimensions
            custom_dims = Dimensions()
        custom_dims.video_volume = video_volume
        custom_dims.music_volume = music_volume
        custom_dims.loop_music = loop_music
        from moviepy import VideoFileClip, CompositeAudioClip
        import moviepy.audio.fx as afx
        from schemas import AspectRatio, ShortsStyle, LayoutType
        # Open video for this clip (parallel processing)
        with VideoFileClip(video_path) as video:
            print(f"DEBUG: Processing clip {clip_id}. Request: Start={ts.start_time}, End={ts.end_time}")
            # Basic validation
            if ts.start_time >= video.duration:
                print(f"DEBUG: Skipping clip {clip_id}. Start time {ts.start_time} is beyond video duration {video.duration}.")
                return None, None
            end = min(ts.end_time, video.duration)  # clamp to the video's end
            print(f"DEBUG: Extracting subclip {clip_id} from {ts.start_time} to {end}")
            # Extract subclip
            subclip = video.subclipped(ts.start_time, end)
            output_filename = f"clip_{clip_id}.mp4"
            from routers.video import PROCESSED_DIR, TEMP_DIR
            output_path = os.path.join(PROCESSED_DIR, output_filename)
            # (Removed automatic audio extraction to mp3)
            audio_output_path = None
            # Apply background music if available
            if bg_music:
                bg_music_clip = bg_music.with_duration(subclip.duration)
                if loop_music:
                    # Bug fix: use the MoviePy 2.x effects API; the 1.x
                    # `.fx(afx.AudioLoop, ...)` helper no longer exists on
                    # 2.x clips (the rest of this file uses 2.x calls).
                    bg_music_clip = bg_music_clip.with_effects([afx.AudioLoop(duration=subclip.duration)])
                bg_music_clip = bg_music_clip.with_volume_scaled(music_volume)
                if subclip.audio:
                    # Mix the (volume-scaled) original track with the music bed
                    original_audio = subclip.audio.with_volume_scaled(video_volume)
                    subclip.audio = CompositeAudioClip([original_audio, bg_music_clip])
                else:
                    # Bug fix: a silent source clip previously crashed with
                    # AttributeError on subclip.audio.with_volume_scaled;
                    # mirror the sequential path and use music as the only track.
                    subclip.audio = bg_music_clip
            # Apply formatting
            if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
                pass  # Skip resizing, keep original dimensions
            else:
                # Map style to layout
                layout_map = {
                    ShortsStyle.CINEMATIC: LayoutType.CINEMATIC_BLUR,
                    ShortsStyle.CROP_FILL: LayoutType.CROP_CENTER,
                    ShortsStyle.FIT_BARS: LayoutType.FIT_CENTER,
                    ShortsStyle.SPLIT_SCREEN: LayoutType.SPLIT_SCREEN
                }
                layout = layout_map.get(style, LayoutType.CROP_CENTER)
                # Get target dimensions
                target_w, target_h = get_canvas_dimensions(aspect_ratio)
                if target_w and target_h:
                    subclip = apply_layout_factory(subclip, layout, target_w, target_h, custom_dims)
            # Write file - optimized settings for speed
            temp_audio = os.path.join(TEMP_DIR, f"temp-audio-{clip_id}.m4a")
            subclip.write_videofile(
                output_path,
                codec="libx264",
                audio_codec="aac",
                temp_audiofile=temp_audio,
                remove_temp=True,
                fps=24,
                threads=4,
                preset="superfast",
                logger=None
            )
            # Explicitly close everything
            if subclip.audio: subclip.audio.close()
            subclip.close()
            return output_path, audio_output_path
    except Exception as e:
        print(f"Error processing clip {clip_id}: {str(e)}")
        return None, None
def extract_audio_from_video(video_path: str, output_format: str = "mp3"):
    """
    Extract audio from a video file and save it as an audio file.

    Tries a direct FFmpeg stream extraction first (fast, no video decode);
    falls back to MoviePy when FFmpeg is unavailable or fails. Returns the
    path of the written audio file; raises on total failure.
    """
    try:
        from routers.video import AUDIO_DIR
        # Output name derives from the source video's basename.
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        audio_filename = f"{base_name}_audio.{output_format}"
        output_path = os.path.join(AUDIO_DIR, audio_filename)
        # --- Fast path: FFmpeg stream extraction ---
        try:
            import subprocess
            import imageio_ffmpeg
            ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
            # Pick an encoder for the requested container; unknown formats
            # attempt a no-re-encode stream copy.
            codec_table = {
                'mp3': 'libmp3lame',
                'wav': 'pcm_s16le',
                'm4a': 'aac',
                'aac': 'aac',
            }
            codec = codec_table.get(output_format.lower(), 'copy')
            cmd = [
                ffmpeg_exe, '-i', video_path,
                '-vn',                   # drop the video stream entirely
                '-acodec', codec,
                '-y',                    # overwrite existing output
                '-loglevel', 'error',    # keep FFmpeg quiet unless it errors
                output_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0 and os.path.exists(output_path):
                print(f"βœ“ Audio extracted using FFmpeg (fast method)")
                return output_path
            print(f"FFmpeg failed: {result.stderr}")
            raise Exception("FFmpeg extraction failed")
        except Exception as ffmpeg_error:
            print(f"FFmpeg not available or failed: {ffmpeg_error}")
            print("Falling back to MoviePy method...")
        # --- Fallback path: decode with MoviePy ---
        from moviepy import VideoFileClip
        with VideoFileClip(video_path) as video:
            if video.audio is None:
                raise ValueError("Video has no audio track")
            video.audio.write_audiofile(
                output_path,
                logger=None
            )
            print(f"βœ“ Audio extracted using MoviePy (fallback method)")
            return output_path
    except Exception as e:
        print(f"Error extracting audio: {str(e)}")
        raise e