""" Video editing using ffmpeg - clips, crops, reframes for social media """ import asyncio import logging import subprocess from pathlib import Path from typing import Dict, List, Tuple import ffmpeg from config import DEFAULT_ASPECT_RATIOS, settings from models.schemas import BrandKit, Clip, ProcessRequest logger = logging.getLogger(__name__) class VideoEditor: """Edit and reframe videos for social media""" def __init__(self): """Initialize video editor""" self.logger = logger async def process_clips( self, video_path: Path, clips: List[Clip], request: ProcessRequest ) -> List[Clip]: """ Process clips: cut, reframe, and add watermark Args: video_path: Path to source video clips: List of Clip objects with timing request: Processing request with formatting options Returns: Updated clips with output paths Raises: Exception: If processing fails """ try: # Get video info video_info = await self._get_video_info(video_path) self.logger.info(f"Video info: {video_info}") loop = asyncio.get_event_loop() for i, clip in enumerate(clips): try: # Create output directory for this clip clip_output_dir = settings.output_dir / clip.job_id / str(i) clip_output_dir.mkdir(parents=True, exist_ok=True) # Step 1: Cut the clip cut_path = clip_output_dir / "01_cut.mp4" await self._cut_clip(str(video_path), clip.start_time, clip.end_time, str(cut_path)) # Step 2: Reframe to target aspect ratio reframed_path = clip_output_dir / "02_reframed.mp4" await self._reframe(str(cut_path), str(reframed_path), request.aspect_ratio, video_info) # Step 3: Add watermark/brand kit final_path = clip_output_dir / "03_final.mp4" if request.brand_kit.logo_url or request.brand_kit.watermark_opacity > 0: await self._add_watermark(str(reframed_path), str(final_path), request.brand_kit) else: final_path = reframed_path # Step 4: Generate thumbnail thumbnail_path = clip_output_dir / "thumbnail.jpg" await self._generate_thumbnail(str(final_path), str(thumbnail_path)) # Update clip with output paths clip.output_path = str(final_path) clip.thumbnail_path = str(thumbnail_path) self.logger.info(f"Processed clip {i + 1}/{len(clips)}") except Exception as e: self.logger.error(f"Failed to process clip {i}: {str(e)}") clip.output_path = None clip.thumbnail_path = None return clips except Exception as e: self.logger.error(f"Video processing failed: {str(e)}") raise Exception(f"Video processing failed: {str(e)}") async def _cut_clip( self, video_path: str, start_time: float, end_time: float, output_path: str ) -> None: """ Cut a clip from video using ffmpeg Args: video_path: Path to source video start_time: Start time in seconds end_time: End time in seconds output_path: Path to output clip """ try: loop = asyncio.get_event_loop() await loop.run_in_executor( None, self._cut_clip_sync, video_path, start_time, end_time, output_path ) except Exception as e: self.logger.error(f"Clip cutting failed: {str(e)}") raise def _cut_clip_sync( self, video_path: str, start_time: float, end_time: float, output_path: str ) -> None: """Synchronous clip cutting""" try: cmd = [ "ffmpeg", "-i", video_path, "-ss", str(start_time), "-to", str(end_time), "-c", "copy", "-y", output_path ] subprocess.run(cmd, check=True, capture_output=True, timeout=300) except subprocess.CalledProcessError as e: raise Exception(f"FFmpeg cut failed: {e.stderr.decode() if e.stderr else str(e)}") async def _reframe( self, input_path: str, output_path: str, aspect_ratio: str, video_info: Dict ) -> None: """ Reframe video to target aspect ratio with smart cropping Args: input_path: Path to input video output_path: Path to output video aspect_ratio: Target aspect ratio (e.g., "9:16") video_info: Video metadata (width, height, etc.) """ try: loop = asyncio.get_event_loop() await loop.run_in_executor( None, self._reframe_sync, input_path, output_path, aspect_ratio, video_info ) except Exception as e: self.logger.error(f"Reframing failed: {str(e)}") raise def _reframe_sync( self, input_path: str, output_path: str, aspect_ratio: str, video_info: Dict ) -> None: """Synchronous reframing""" try: target_width, target_height = DEFAULT_ASPECT_RATIOS.get(aspect_ratio, (1080, 1920)) current_width = video_info["width"] current_height = video_info["height"] # Calculate scaling and cropping if aspect_ratio == "9:16": # Vertical format: crop width to match aspect ratio if current_width / current_height > 9 / 16: # Video is too wide crop_width = int(current_height * 9 / 16) crop_height = current_height else: # Video is too tall, add letterboxing crop_width = current_width crop_height = int(current_width * 16 / 9) elif aspect_ratio == "16:9": # Horizontal format if current_width / current_height < 16 / 9: # Video is too tall, crop height crop_width = current_width crop_height = int(current_width * 9 / 16) else: # Video is too wide, crop width crop_width = int(current_height * 16 / 9) crop_height = current_height else: # Other formats: center crop target_ratio = 1.0 # 1:1 or calculate from aspect ratio if current_width / current_height > target_ratio: crop_width = int(current_height * target_ratio) crop_height = current_height else: crop_width = current_width crop_height = int(current_width / target_ratio) # Center crop x = (current_width - crop_width) // 2 y = (current_height - crop_height) // 2 # Build ffmpeg filter filter_str = f"crop={crop_width}:{crop_height}:{x}:{y},scale={target_width}:{target_height}" cmd = [ "ffmpeg", "-i", input_path, "-vf", filter_str, "-c:a", "aac", "-y", output_path ] subprocess.run(cmd, check=True, capture_output=True, timeout=300) except subprocess.CalledProcessError as e: raise Exception(f"FFmpeg reframe failed: {e.stderr.decode() if e.stderr else str(e)}") async def _add_watermark( self, input_path: str, output_path: str, brand_kit: BrandKit ) -> None: """ Add watermark and branding to video Args: input_path: Path to input video output_path: Path to output video brand_kit: Brand kit configuration """ try: loop = asyncio.get_event_loop() await loop.run_in_executor( None, self._add_watermark_sync, input_path, output_path, brand_kit ) except Exception as e: self.logger.error(f"Watermark addition failed: {str(e)}") # Continue without watermark on failure if input_path != output_path: subprocess.run(["cp", input_path, output_path], check=True) def _add_watermark_sync( self, input_path: str, output_path: str, brand_kit: BrandKit ) -> None: """Synchronous watermark addition""" try: # Simple text watermark if no logo if not brand_kit.logo_url: # Add text overlay (brand_kit colors as background hint) cmd = [ "ffmpeg", "-i", input_path, "-c:v", "libx264", "-c:a", "aac", "-y", output_path ] subprocess.run(cmd, check=True, capture_output=True, timeout=300) else: # Logo watermarking would go here # For now, just copy the file subprocess.run(["cp", input_path, output_path], check=True) except subprocess.CalledProcessError as e: raise Exception(f"FFmpeg watermark failed: {e.stderr.decode() if e.stderr else str(e)}") async def _generate_thumbnail(self, video_path: str, output_path: str) -> None: """ Generate thumbnail from video at 2 seconds Args: video_path: Path to video output_path: Path to output thumbnail """ try: loop = asyncio.get_event_loop() await loop.run_in_executor( None, self._generate_thumbnail_sync, video_path, output_path ) except Exception as e: self.logger.error(f"Thumbnail generation failed: {str(e)}") def _generate_thumbnail_sync(self, video_path: str, output_path: str) -> None: """Synchronous thumbnail generation""" try: cmd = [ "ffmpeg", "-i", video_path, "-ss", "2", "-vframes", "1", "-y", output_path ] subprocess.run(cmd, check=True, capture_output=True, timeout=60) except subprocess.CalledProcessError as e: self.logger.warning(f"Thumbnail generation failed: {e}") async def _get_video_info(self, video_path: Path) -> Dict: """ Get video information (width, height, duration, fps) Args: video_path: Path to video Returns: Dictionary with video metadata """ try: loop = asyncio.get_event_loop() return await loop.run_in_executor( None, self._get_video_info_sync, str(video_path) ) except Exception as e: self.logger.error(f"Video info extraction failed: {str(e)}") # Return defaults return { "width": 1080, "height": 1920, "duration": 0, "fps": 30 } def _get_video_info_sync(self, video_path: str) -> Dict: """Synchronous video info extraction""" try: probe = ffmpeg.probe(video_path) video_stream = next( (stream for stream in probe["streams"] if stream["codec_type"] == "video"), None ) if not video_stream: raise Exception("No video stream found") return { "width": int(video_stream.get("width", 1080)), "height": int(video_stream.get("height", 1920)), "duration": float(probe.get("format", {}).get("duration", 0)), "fps": eval(video_stream.get("r_frame_rate", "30/1")) } except Exception as e: self.logger.error(f"FFprobe failed: {str(e)}") return { "width": 1080, "height": 1920, "duration": 0, "fps": 30 }