| import os |
| import uuid |
| from concurrent.futures import ThreadPoolExecutor |
| from moviepy import VideoFileClip, CompositeVideoClip, ColorClip |
| import numpy as np |
| from scipy.ndimage import gaussian_filter |
| from schemas import ShortsStyle, Dimensions, LayoutType, AspectRatio, SubtitlePreset |
| from hybrid_processor import process_video_hybrid |
| from routers.subtitle_generator import generate_pro_ass |
|
|
def get_canvas_dimensions(ratio: AspectRatio) -> tuple:
    """Returns (width, height) for a given aspect ratio.

    Unknown/unsupported ratios (including ORIGINAL) yield (None, None),
    which callers treat as "leave the frame untouched".
    """
    canvas_sizes = {
        AspectRatio.RATIO_9_16: (1080, 1920),
        AspectRatio.RATIO_1_1: (1080, 1080),
        AspectRatio.RATIO_16_9: (1920, 1080),
        AspectRatio.RATIO_4_5: (1080, 1350),
    }
    return canvas_sizes.get(ratio, (None, None))
|
|
def process_video_clips(video_path: str, timestamps, aspect_ratio: AspectRatio = AspectRatio.RATIO_9_16, style: ShortsStyle = ShortsStyle.ORIGINAL, custom_dims: Dimensions = None, export_audio: bool = False, use_parallel: bool = True, use_ffmpeg_optimization: bool = True, video_volume: float = 1.0, music_volume: float = 0.2, loop_music: bool = True, transcription: list = None, subtitle_preset: SubtitlePreset = None):
    """
    Processes a video file into multiple clips based on timestamps and style.

    Pipeline: (1) optional .ass subtitle generation, (2) a fast FFmpeg-based
    hybrid path, and (3) a MoviePy fallback (parallel or sequential).

    Args:
        video_path: Source video on disk.
        timestamps: Objects with start_time / end_time attributes, one per clip.
        aspect_ratio: Target canvas ratio; ORIGINAL leaves the frame untouched.
        style: Layout style; ORIGINAL skips reframing entirely.
        custom_dims: Optional settings object; may carry audio_path (bg music)
            and receives the volume/loop settings below as attributes.
        export_audio: If True, also collect per-clip audio paths.
        use_parallel: Use a thread pool when more than one timestamp is given.
        use_ffmpeg_optimization: Try process_video_hybrid first (much faster).
        video_volume / music_volume / loop_music: Audio-mix settings.
        transcription: Optional transcript; triggers subtitle generation.
        subtitle_preset: Styling for subtitles; defaults to "Default".

    Returns:
        (clip_paths, audio_paths) lists of output file paths.

    Raises:
        Exception: re-raises any failure of the MoviePy fallback path.
    """
    clip_paths = []
    audio_paths = []

    # Background music arrives piggybacked on custom_dims.audio_path; only
    # accept it if the file actually exists on disk.
    bg_music_path = None
    if custom_dims and hasattr(custom_dims, 'audio_path') and custom_dims.audio_path:
        if os.path.exists(custom_dims.audio_path):
            bg_music_path = custom_dims.audio_path

    # Stash the audio-mix settings on custom_dims so the hybrid/FFmpeg path
    # can read everything from one object.
    if custom_dims:
        custom_dims.video_volume = video_volume
        custom_dims.music_volume = music_volume
        custom_dims.loop_music = loop_music

    # Pre-render an .ass subtitle file next to the source video when a
    # transcription is supplied. NOTE(review): only the hybrid path below
    # consumes subtitle_path — the MoviePy fallback does not burn subtitles;
    # confirm whether that is intentional.
    subtitle_path = None
    if transcription:
        if not subtitle_preset:
            subtitle_preset = SubtitlePreset(name="Default")

        temp_dir = os.path.dirname(video_path)
        subtitle_path = os.path.join(temp_dir, f"sub_{uuid.uuid4().hex[:8]}.ass")
        generate_pro_ass(transcription, subtitle_preset, subtitle_path)
        print(f"π Subtitles generated: {subtitle_path}")

    # Fast path: delegate everything to the FFmpeg-based hybrid processor.
    # Any failure (or empty result) falls through to MoviePy below.
    if use_ffmpeg_optimization:
        try:
            print(f"π Trying FFmpeg optimization for style: {style}...")
            clip_paths, audio_paths = process_video_hybrid(
                video_path=video_path,
                timestamps=timestamps,
                output_format=style,
                custom_dims=custom_dims,
                export_audio=export_audio,
                aspect_ratio=aspect_ratio,
                bg_music=bg_music_path,
                subtitle_paths=[subtitle_path] * len(timestamps) if subtitle_path else None
            )
            if clip_paths:
                print(f"β FFmpeg optimization successful! Processed {len(clip_paths)} clips")
                return clip_paths, audio_paths
        except Exception as e:
            print(f"β οΈ FFmpeg optimization failed: {e}")
            print("π¬ Falling back to MoviePy...")

    try:
        # Load background music once; it is shared by every clip and closed
        # at the end of this function.
        bg_music = None
        if bg_music_path:
            from moviepy import AudioFileClip, CompositeAudioClip
            import moviepy.audio.fx as afx
            bg_music = AudioFileClip(bg_music_path)

        if use_parallel and len(timestamps) > 1:
            # Cap workers at 3 — each MoviePy render is itself multi-threaded.
            max_workers = min(3, len(timestamps))

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                for i, ts in enumerate(timestamps):
                    clip_id = uuid.uuid4().hex[:8]
                    future = executor.submit(
                        process_single_clip,
                        ts, video_path, aspect_ratio, style, custom_dims,
                        export_audio, bg_music, clip_id,
                        video_volume=video_volume,
                        music_volume=music_volume,
                        loop_music=loop_music
                    )
                    futures.append((future, clip_id))

                # Collect in submission order so output order stays stable.
                for future, clip_id in futures:
                    try:
                        output_path, audio_output_path = future.result()
                        if output_path:
                            clip_paths.append(output_path)
                            if export_audio:
                                audio_paths.append(audio_output_path)
                    except Exception as e:
                        print(f"Error processing clip {clip_id}: {str(e)}")
                        if export_audio:
                            audio_paths.append(None)
        else:
            # Sequential path: open the source once and subclip per timestamp.
            from moviepy import VideoFileClip, CompositeAudioClip
            import moviepy.audio.fx as afx

            from routers.video import PROCESSED_DIR, TEMP_DIR

            with VideoFileClip(video_path) as video:
                print(f"DEBUG: Video loaded. Duration: {video.duration}")

                for ts in timestamps:
                    print(f"DEBUG: Processing clip. Request: Start={ts.start_time}, End={ts.end_time}")

                    # Skip out-of-range requests; clamp the end to the video length.
                    if ts.start_time >= video.duration:
                        print(f"DEBUG: Skipping clip. Start time {ts.start_time} is beyond video duration {video.duration}.")
                        continue
                    end = min(ts.end_time, video.duration)
                    print(f"DEBUG: Extracting subclip from {ts.start_time} to {end}")

                    clip_id = uuid.uuid4().hex[:8]
                    output_filename = f"clip_{clip_id}.mp4"
                    output_path = os.path.join(PROCESSED_DIR, output_filename)
                    # NOTE(review): never populated on this path — when
                    # export_audio is True, None is appended; confirm intent.
                    audio_output_path = None

                    with video.subclipped(ts.start_time, end) as subclip:
                        if bg_music:
                            bg_music_clip = bg_music.with_duration(subclip.duration)
                            if loop_music:
                                # NOTE(review): .fx() is MoviePy 1.x API; the rest of
                                # this file uses 2.x-style calls (subclipped, resized).
                                # Confirm the installed MoviePy version supports .fx.
                                bg_music_clip = bg_music_clip.fx(afx.AudioLoop, duration=subclip.duration)

                            # Mix the original track (if any) with the music bed.
                            if subclip.audio:
                                original_audio = subclip.audio.with_volume_scaled(video_volume)
                                bg_music_clip = bg_music_clip.with_volume_scaled(music_volume)

                                subclip.audio = CompositeAudioClip([original_audio, bg_music_clip])
                            else:
                                subclip.audio = bg_music_clip.with_volume_scaled(music_volume)

                        # ORIGINAL style/ratio means: keep the frame untouched.
                        if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
                            pass
                        else:
                            layout_map = {
                                ShortsStyle.CINEMATIC: LayoutType.CINEMATIC_BLUR,
                                ShortsStyle.CROP_FILL: LayoutType.CROP_CENTER,
                                ShortsStyle.FIT_BARS: LayoutType.FIT_CENTER,
                                ShortsStyle.SPLIT_SCREEN: LayoutType.SPLIT_SCREEN
                            }
                            layout = layout_map.get(style, LayoutType.CROP_CENTER)

                            target_w, target_h = get_canvas_dimensions(aspect_ratio)

                            if target_w and target_h:
                                subclip = apply_layout_factory(subclip, layout, target_w, target_h, custom_dims)

                        # Unique temp audio file avoids clashes between renders.
                        temp_audio = os.path.join(TEMP_DIR, f"temp-audio-{clip_id}.m4a")
                        subclip.write_videofile(
                            output_path,
                            codec="libx264",
                            audio_codec="aac",
                            temp_audiofile=temp_audio,
                            remove_temp=True,
                            fps=24,
                            threads=4,
                            preset="superfast",
                            logger=None
                        )

                    if output_path:
                        clip_paths.append(output_path)
                        if export_audio:
                            audio_paths.append(audio_output_path)

        if bg_music: bg_music.close()
        return clip_paths, audio_paths


    except Exception as e:
        print(f"Error processing video: {str(e)}")
        raise e
|
|
def apply_layout_factory(clip, layout_type, target_w, target_h, config=None):
    """
    Factory to apply different video layouts.

    Dispatches on *layout_type* and returns a clip sized for the
    (target_w, target_h) canvas. *config* may optionally carry a
    ``blur_intensity`` attribute used by the cinematic layout.
    """
    if layout_type == LayoutType.CINEMATIC_BLUR:
        intensity = getattr(config, 'blur_intensity', 20) if config else 20
        return apply_cinematic_blur(clip, target_w, target_h, intensity)

    if layout_type == LayoutType.FIT_CENTER:
        return apply_fit_layout(clip, target_w, target_h)

    if layout_type == LayoutType.CROP_CENTER:
        # Crop to the target ratio, then scale to the exact canvas size.
        cropped = format_clip(clip, target_w / target_h)
        return cropped.resized(width=target_w, height=target_h)

    if layout_type == LayoutType.SPLIT_SCREEN:
        return apply_split_screen(clip, target_w, target_h)

    # Unknown layout: shrink to fit the width if needed, else pass through.
    return clip.resized(width=target_w) if clip.w > target_w else clip
|
|
def apply_split_screen(clip, target_w, target_h):
    """
    Splits the screen into two halves (top and bottom) with the same video.

    Returns a CompositeVideoClip of size (target_w, target_h) with the
    formatted clip stacked in both halves.
    """
    half_h = target_h // 2

    # Format and resize once instead of twice: .with_position() returns an
    # independent copy, so the same formatted clip can back both halves.
    half = format_clip(clip, target_w / half_h).resized(width=target_w, height=half_h)

    return CompositeVideoClip([
        half.with_position(("center", "top")),
        half.with_position(("center", "bottom"))
    ], size=(target_w, target_h))
|
|
def apply_cinematic_blur(clip, target_w, target_h, blur_intensity=20):
    """
    Creates a cinematic blurred background with the original video on top.
    Uses custom fl_image filter for maximum compatibility.
    """
    def _blur(frame):
        # Blur only the spatial axes; sigma 0 on the channel axis keeps
        # colors from bleeding into each other.
        return gaussian_filter(frame, sigma=(blur_intensity, blur_intensity, 0))

    # Background: crop to the canvas ratio, stretch to fill it, then blur.
    background = format_clip(clip, target_w / target_h)
    background = background.resized(width=target_w, height=target_h)
    background = background.image_transform(_blur)

    # Foreground: the untouched clip scaled to the canvas width.
    foreground = clip.resized(width=target_w)

    return CompositeVideoClip([
        background,
        foreground.with_position("center")
    ], size=(target_w, target_h))
|
|
def apply_fit_layout(clip, target_w, target_h):
    """
    Fits the video inside the target dimensions with black bars (Letterboxing).
    """
    # Scale along the dominant axis so the whole frame stays visible:
    # wider-than-target clips are bounded by width, others by height.
    if (clip.w / clip.h) > (target_w / target_h):
        scaled = clip.resized(width=target_w)
    else:
        scaled = clip.resized(height=target_h)

    # Solid black canvas behind the letterboxed video.
    backdrop = ColorClip(size=(target_w, target_h), color=(0,0,0), duration=clip.duration)

    return CompositeVideoClip([backdrop, scaled.with_position("center")], size=(target_w, target_h))
|
|
def format_clip(clip, target_ratio):
    """
    Crops and resizes a clip to a target aspect ratio.

    Center-crops along whichever axis overflows the target ratio, then
    upscales to a standard canvas size for the three well-known ratios.
    """
    width, height = clip.size
    source_ratio = width / height

    if source_ratio > target_ratio:
        # Too wide: trim the sides around the horizontal center.
        cropped = clip.cropped(x_center=width/2, width=height * target_ratio)
    else:
        # Too tall (or exact match): trim top and bottom.
        cropped = clip.cropped(y_center=height/2, height=width / target_ratio)

    # Upscale undersized results to the standard canvas for known ratios.
    if target_ratio == 9/16:
        return cropped.resized(height=1920) if cropped.h < 1920 else cropped
    if target_ratio == 16/9:
        return cropped.resized(width=1920) if cropped.w < 1920 else cropped
    if target_ratio == 1/1:
        return cropped.resized(width=1080) if cropped.w < 1080 else cropped

    return cropped
|
|
def safe_remove(path: str, max_retries: int = 3):
    """Attempt to remove a file with retries for Windows file locking.

    Returns True on successful deletion, False when every attempt failed,
    and None when the file did not exist in the first place.
    """
    import time
    import os

    if not os.path.exists(path):
        print(f"[SAFE_REMOVE] File does not exist: {path}")
        return

    for i in range(max_retries):
        try:
            os.remove(path)
        except PermissionError as e:
            # Typical on Windows while another handle is still open.
            print(f"[SAFE_REMOVE] Permission error on attempt {i+1}: {e}")
            if i < max_retries - 1:
                time.sleep(1)
            else:
                print(f"[SAFE_REMOVE] Warning: Could not delete {path} after {max_retries} attempts.")
                return False
        except Exception as e:
            # Anything other than a lock is not worth retrying.
            print(f"[SAFE_REMOVE] Error deleting {path}: {e}")
            return False
        else:
            print(f"[SAFE_REMOVE] Successfully deleted: {path}")
            return True

    return False
|
|
def create_zip_archive(file_paths: list, output_filename: str):
    """
    Creates a ZIP archive containing the specified files.

    None entries and missing paths are skipped; returns the archive path,
    or None when no valid input files remain.
    """
    import zipfile

    # Keep only real files; callers may pass None placeholders.
    existing = [p for p in file_paths if p and os.path.exists(p)]

    if not existing:
        return None

    # Place the archive next to the first input file.
    zip_path = os.path.join(os.path.dirname(existing[0]), output_filename)

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for path in existing:
            # Store by basename so the archive has a flat layout.
            archive.write(path, os.path.basename(path))

    return zip_path
|
|
def process_single_clip(ts, video_path, aspect_ratio, style, custom_dims, export_audio, bg_music, clip_id, video_volume=1.0, music_volume=0.2, loop_music=True):
    """
    Process a single clip - for parallel processing.

    Extracts the [ts.start_time, ts.end_time] window from video_path, mixes
    in optional background music, applies the requested layout/aspect ratio,
    and renders the result into PROCESSED_DIR.

    Returns:
        (output_path, audio_output_path) on success; (None, None) on failure
        or when the requested start time lies beyond the video's duration.
        NOTE(review): audio_output_path is currently always None — audio
        export is not implemented in this worker; confirm against callers.
    """
    try:
        # Mirror process_video_clips' convention of stashing the audio-mix
        # settings on custom_dims for downstream consumers.
        if not custom_dims:
            from schemas import Dimensions
            custom_dims = Dimensions()

        custom_dims.video_volume = video_volume
        custom_dims.music_volume = music_volume
        custom_dims.loop_music = loop_music

        from moviepy import VideoFileClip, CompositeAudioClip
        import moviepy.audio.fx as afx
        from schemas import AspectRatio, ShortsStyle, LayoutType

        with VideoFileClip(video_path) as video:
            print(f"DEBUG: Processing clip {clip_id}. Request: Start={ts.start_time}, End={ts.end_time}")

            # Skip requests that start past the end of the source; clamp
            # the end to the actual duration.
            if ts.start_time >= video.duration:
                print(f"DEBUG: Skipping clip {clip_id}. Start time {ts.start_time} is beyond video duration {video.duration}.")
                return None, None

            end = min(ts.end_time, video.duration)
            print(f"DEBUG: Extracting subclip {clip_id} from {ts.start_time} to {end}")

            subclip = video.subclipped(ts.start_time, end)

            output_filename = f"clip_{clip_id}.mp4"
            from routers.video import PROCESSED_DIR, TEMP_DIR
            output_path = os.path.join(PROCESSED_DIR, output_filename)

            audio_output_path = None

            if bg_music:
                from moviepy import CompositeAudioClip
                import moviepy.audio.fx as afx

                bg_music_clip = bg_music.with_duration(subclip.duration)
                if loop_music:
                    # NOTE(review): .fx() is MoviePy 1.x API while the rest of this
                    # function uses 2.x-style calls — confirm the installed version.
                    bg_music_clip = bg_music_clip.fx(afx.AudioLoop, duration=subclip.duration)

                bg_music_clip = bg_music_clip.with_volume_scaled(music_volume)

                # Bug fix: the sequential path in process_video_clips guards
                # against sources with no audio track; previously this worker
                # raised AttributeError on silent videos. Mix when audio
                # exists, otherwise use the music bed alone.
                if subclip.audio:
                    original_audio = subclip.audio.with_volume_scaled(video_volume)
                    subclip.audio = CompositeAudioClip([original_audio, bg_music_clip])
                else:
                    subclip.audio = bg_music_clip

            # ORIGINAL style/ratio means: keep the frame untouched.
            if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
                pass
            else:
                layout_map = {
                    ShortsStyle.CINEMATIC: LayoutType.CINEMATIC_BLUR,
                    ShortsStyle.CROP_FILL: LayoutType.CROP_CENTER,
                    ShortsStyle.FIT_BARS: LayoutType.FIT_CENTER,
                    ShortsStyle.SPLIT_SCREEN: LayoutType.SPLIT_SCREEN
                }
                layout = layout_map.get(style, LayoutType.CROP_CENTER)

                target_w, target_h = get_canvas_dimensions(aspect_ratio)

                if target_w and target_h:
                    subclip = apply_layout_factory(subclip, layout, target_w, target_h, custom_dims)

            # Unique temp audio file avoids clashes between parallel renders.
            temp_audio = os.path.join(TEMP_DIR, f"temp-audio-{clip_id}.m4a")
            subclip.write_videofile(
                output_path,
                codec="libx264",
                audio_codec="aac",
                temp_audiofile=temp_audio,
                remove_temp=True,
                fps=24,
                threads=4,
                preset="superfast",
                logger=None
            )

            # Release clip resources promptly; the shared bg_music clip is
            # owned and closed by the caller, not here.
            if subclip.audio: subclip.audio.close()
            subclip.close()

            return output_path, audio_output_path

    except Exception as e:
        print(f"Error processing clip {clip_id}: {str(e)}")
        return None, None
|
|
def extract_audio_from_video(video_path: str, output_format: str = "mp3"):
    """
    Extract audio from a video file and save it as an audio file.

    Tries a direct FFmpeg invocation first (fast, no Python-side decoding)
    and falls back to MoviePy when FFmpeg is unavailable or fails.
    """
    try:
        from routers.video import AUDIO_DIR

        base_name = os.path.splitext(os.path.basename(video_path))[0]
        audio_filename = f"{base_name}_audio.{output_format}"
        output_path = os.path.join(AUDIO_DIR, audio_filename)

        # Fast path: drive FFmpeg directly via imageio-ffmpeg's bundled binary.
        try:
            import subprocess
            import imageio_ffmpeg

            ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

            # Map the requested container to an encoder; unknown formats
            # fall back to stream-copying the source audio unchanged.
            codec_by_format = {
                'mp3': 'libmp3lame',
                'wav': 'pcm_s16le',
                'm4a': 'aac',
                'aac': 'aac',
            }
            codec = codec_by_format.get(output_format.lower(), 'copy')

            cmd = [
                ffmpeg_exe, '-i', video_path,
                '-vn',
                '-acodec', codec,
                '-y',
                '-loglevel', 'error',
                output_path
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0 and os.path.exists(output_path):
                print(f"β Audio extracted using FFmpeg (fast method)")
                return output_path

            print(f"FFmpeg failed: {result.stderr}")
            raise Exception("FFmpeg extraction failed")

        except Exception as ffmpeg_error:
            print(f"FFmpeg not available or failed: {ffmpeg_error}")
            print("Falling back to MoviePy method...")

        # Slow but dependable fallback: decode and re-encode via MoviePy.
        from moviepy import VideoFileClip

        with VideoFileClip(video_path) as video:
            if video.audio is None:
                raise ValueError("Video has no audio track")

            video.audio.write_audiofile(
                output_path,
                logger=None
            )

        print(f"β Audio extracted using MoviePy (fallback method)")
        return output_path

    except Exception as e:
        print(f"Error extracting audio: {str(e)}")
        raise e
|
|