"""
Video editing with ffmpeg: cut clips, reframe (crop and scale) them for
social media aspect ratios, and generate thumbnails.
"""
|
|
import asyncio
import logging
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List, Tuple

import ffmpeg

from config import DEFAULT_ASPECT_RATIOS, settings
from models.schemas import BrandKit, Clip, ProcessRequest
|
|
# Module-level logger named after this module; VideoEditor stores a reference
# to it as ``self.logger`` in its constructor.
logger = logging.getLogger(__name__)
|
|
|
|
class VideoEditor:
    """Edit and reframe videos for social media.

    Each clip goes through a fixed pipeline of ffmpeg invocations: cut the
    time range out of the source, crop/scale it to the requested aspect
    ratio, optionally run the branding step, then grab a thumbnail.  The
    heavy blocking work (ffmpeg subprocesses, file copies, probing) is run
    on the event loop's default executor.
    """

    # Metadata reported when the source video cannot be probed.
    _FALLBACK_VIDEO_INFO = {"width": 1080, "height": 1920, "duration": 0, "fps": 30}
    # Output size used when the requested aspect ratio is not configured.
    _DEFAULT_TARGET_SIZE = (1080, 1920)
    # Subprocess timeouts, in seconds.
    _FFMPEG_TIMEOUT = 300
    _THUMBNAIL_TIMEOUT = 60

    def __init__(self):
        """Initialize video editor"""
        self.logger = logger

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    async def _run_blocking(self, func, *args):
        """Run a blocking callable on the default executor and return its result."""
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine; it never creates a loop implicitly.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    @staticmethod
    def _run_ffmpeg(cmd: List[str], action: str, timeout: int) -> None:
        """
        Run an ffmpeg command line and surface its stderr on failure

        Args:
            cmd: Full argument vector (no shell is involved)
            action: Short label used in the error message (e.g. "cut")
            timeout: Maximum run time in seconds

        Raises:
            Exception: If ffmpeg exits with a non-zero status
            subprocess.TimeoutExpired: If ffmpeg runs longer than `timeout`
        """
        try:
            subprocess.run(cmd, check=True, capture_output=True, timeout=timeout)
        except subprocess.CalledProcessError as e:
            # ffmpeg may echo non-UTF-8 bytes (e.g. file names); never let
            # the decode itself mask the real failure.
            detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
            raise Exception(f"FFmpeg {action} failed: {detail}") from e

    @staticmethod
    def _parse_frame_rate(rate, default: float = 30.0) -> float:
        """
        Convert an ffprobe frame rate such as "30000/1001" or "25" to a float

        The value comes from media metadata, so it is parsed as plain
        numbers and never evaluated as code.

        Args:
            rate: Frame rate as "numerator/denominator" or a single number
            default: Value returned for malformed, zero or non-finite rates

        Returns:
            Frames per second
        """
        numerator, _, denominator = str(rate).partition("/")
        try:
            value = float(numerator) / float(denominator or 1)
        except (ValueError, ZeroDivisionError):
            return default
        # Rejects 0, negatives, NaN and infinity in one comparison.
        if 0 < value < float("inf"):
            return value
        return default

    @staticmethod
    def _compute_crop(
        current_width: int,
        current_height: int,
        target_width: int,
        target_height: int
    ) -> Tuple[int, int, int, int]:
        """
        Compute the largest centered crop matching the target aspect ratio

        Args:
            current_width: Source width in pixels
            current_height: Source height in pixels
            target_width: Output width in pixels
            target_height: Output height in pixels

        Returns:
            Tuple of (crop_width, crop_height, x, y)

        Raises:
            ValueError: If any dimension is not positive
        """
        current_width, current_height = int(current_width), int(current_height)
        target_width, target_height = int(target_width), int(target_height)
        if min(current_width, current_height, target_width, target_height) <= 0:
            raise ValueError(
                f"Invalid dimensions: source {current_width}x{current_height}, "
                f"target {target_width}x{target_height}"
            )

        # Compare the two ratios by cross-multiplication so everything stays
        # in exact integer arithmetic.
        if current_width * target_height > current_height * target_width:
            # Source is wider than the target: keep full height, trim the sides.
            crop_height = current_height
            crop_width = max(1, current_height * target_width // target_height)
        else:
            # Source is taller (or equal): keep full width, trim top/bottom.
            crop_width = current_width
            crop_height = max(1, current_width * target_height // target_width)

        x = (current_width - crop_width) // 2
        y = (current_height - crop_height) // 2
        return crop_width, crop_height, x, y

    @classmethod
    def _extract_video_info(cls, probe: Dict) -> Dict:
        """
        Pick width, height, duration and fps out of an ffprobe result

        Args:
            probe: Parsed ffprobe output with "streams" and "format" entries

        Returns:
            Dictionary with "width", "height", "duration" and "fps"

        Raises:
            ValueError: If the probe result contains no video stream
        """
        video_stream = next(
            (
                stream
                for stream in probe.get("streams", [])
                if stream.get("codec_type") == "video"
            ),
            None
        )
        if not video_stream:
            raise ValueError("No video stream found")

        return {
            "width": int(video_stream.get("width", 1080)),
            "height": int(video_stream.get("height", 1920)),
            "duration": float(probe.get("format", {}).get("duration", 0)),
            "fps": cls._parse_frame_rate(video_stream.get("r_frame_rate", "30/1"))
        }

    # ------------------------------------------------------------------
    # Pipeline
    # ------------------------------------------------------------------

    async def process_clips(
        self,
        video_path: Path,
        clips: "List[Clip]",
        request: "ProcessRequest"
    ) -> "List[Clip]":
        """
        Process clips: cut, reframe, and add watermark

        A failure in one clip does not abort the others: the failing clip
        gets `output_path` and `thumbnail_path` set to None and processing
        continues with the next one.

        Args:
            video_path: Path to source video
            clips: List of Clip objects with timing
            request: Processing request with formatting options

        Returns:
            Updated clips with output paths

        Raises:
            Exception: If processing fails
        """
        try:
            video_info = await self._get_video_info(video_path)
            self.logger.info(f"Video info: {video_info}")

            total = len(clips)
            for i, clip in enumerate(clips):
                try:
                    await self._process_single_clip(video_path, clip, i, request, video_info)
                    self.logger.info(f"Processed clip {i + 1}/{total}")
                except Exception as e:
                    self.logger.error(f"Failed to process clip {i}: {str(e)}")
                    clip.output_path = None
                    clip.thumbnail_path = None

            return clips

        except Exception as e:
            self.logger.error(f"Video processing failed: {str(e)}")
            raise Exception(f"Video processing failed: {str(e)}") from e

    async def _process_single_clip(
        self,
        video_path: Path,
        clip: "Clip",
        index: int,
        request: "ProcessRequest",
        video_info: Dict
    ) -> None:
        """
        Run the cut / reframe / watermark / thumbnail steps for one clip

        Intermediate and final files are written below
        `settings.output_dir / clip.job_id / <index>`.

        Args:
            video_path: Path to source video
            clip: Clip to process; its output fields are updated in place
            index: Position of the clip in the batch (names the output dir)
            request: Processing request with formatting options
            video_info: Metadata of the source video
        """
        clip_output_dir = settings.output_dir / clip.job_id / str(index)
        clip_output_dir.mkdir(parents=True, exist_ok=True)

        # Step 1: cut the time range out of the source.
        cut_path = clip_output_dir / "01_cut.mp4"
        await self._cut_clip(str(video_path), clip.start_time, clip.end_time, str(cut_path))

        # Step 2: crop and scale to the requested aspect ratio.
        reframed_path = clip_output_dir / "02_reframed.mp4"
        await self._reframe(str(cut_path), str(reframed_path), request.aspect_ratio, video_info)

        # Step 3: branding, only when the brand kit asks for it.
        brand_kit = request.brand_kit
        if brand_kit is not None and (brand_kit.logo_url or brand_kit.watermark_opacity > 0):
            final_path = clip_output_dir / "03_final.mp4"
            await self._add_watermark(str(reframed_path), str(final_path), brand_kit)
        else:
            final_path = reframed_path

        # Step 4: thumbnail (best-effort, failures are only logged).
        thumbnail_path = clip_output_dir / "thumbnail.jpg"
        await self._generate_thumbnail(str(final_path), str(thumbnail_path))

        clip.output_path = str(final_path)
        # Thumbnail generation swallows its own errors, so only advertise
        # the path when a file is actually there.
        clip.thumbnail_path = str(thumbnail_path) if thumbnail_path.exists() else None

    async def _cut_clip(
        self,
        video_path: str,
        start_time: float,
        end_time: float,
        output_path: str
    ) -> None:
        """
        Cut a clip from video using ffmpeg

        Args:
            video_path: Path to source video
            start_time: Start time in seconds
            end_time: End time in seconds
            output_path: Path to output clip

        Raises:
            Exception: If the cut fails (logged, then re-raised)
        """
        try:
            await self._run_blocking(
                self._cut_clip_sync,
                video_path,
                start_time,
                end_time,
                output_path
            )
        except Exception as e:
            self.logger.error(f"Clip cutting failed: {str(e)}")
            raise

    def _cut_clip_sync(
        self,
        video_path: str,
        start_time: float,
        end_time: float,
        output_path: str
    ) -> None:
        """Synchronous clip cutting (stream copy, no re-encode)"""
        cmd = [
            "ffmpeg",
            "-i", video_path,
            "-ss", str(start_time),
            "-to", str(end_time),
            "-c", "copy",
            "-y",
            output_path
        ]
        self._run_ffmpeg(cmd, "cut", self._FFMPEG_TIMEOUT)

    async def _reframe(
        self,
        input_path: str,
        output_path: str,
        aspect_ratio: str,
        video_info: Dict
    ) -> None:
        """
        Reframe video to target aspect ratio with a centered crop

        Args:
            input_path: Path to input video
            output_path: Path to output video
            aspect_ratio: Target aspect ratio (e.g., "9:16")
            video_info: Video metadata (width, height, etc.)

        Raises:
            Exception: If reframing fails (logged, then re-raised)
        """
        try:
            await self._run_blocking(
                self._reframe_sync,
                input_path,
                output_path,
                aspect_ratio,
                video_info
            )
        except Exception as e:
            self.logger.error(f"Reframing failed: {str(e)}")
            raise

    def _reframe_sync(
        self,
        input_path: str,
        output_path: str,
        aspect_ratio: str,
        video_info: Dict
    ) -> None:
        """
        Synchronous reframing

        The crop shape is derived from the configured output size, so the
        subsequent scale never stretches the picture, whatever ratio is
        requested.  Unknown ratios fall back to 1080x1920.
        """
        target_width, target_height = DEFAULT_ASPECT_RATIOS.get(
            aspect_ratio, self._DEFAULT_TARGET_SIZE
        )
        crop_width, crop_height, x, y = self._compute_crop(
            video_info["width"],
            video_info["height"],
            target_width,
            target_height
        )

        filter_str = f"crop={crop_width}:{crop_height}:{x}:{y},scale={target_width}:{target_height}"

        cmd = [
            "ffmpeg",
            "-i", input_path,
            "-vf", filter_str,
            "-c:a", "aac",
            "-y",
            output_path
        ]
        self._run_ffmpeg(cmd, "reframe", self._FFMPEG_TIMEOUT)

    async def _add_watermark(
        self,
        input_path: str,
        output_path: str,
        brand_kit: "BrandKit"
    ) -> None:
        """
        Add watermark and branding to video

        Best-effort: if the branding step fails, the error is logged and the
        input is copied to `output_path` unchanged so the pipeline can go on.

        Args:
            input_path: Path to input video
            output_path: Path to output video
            brand_kit: Brand kit configuration
        """
        try:
            await self._run_blocking(
                self._add_watermark_sync,
                input_path,
                output_path,
                brand_kit
            )
        except Exception as e:
            self.logger.error(f"Watermark addition failed: {str(e)}")

            if input_path != output_path:
                # Copy on the executor: a large file copy would otherwise
                # stall the event loop.
                await self._run_blocking(shutil.copy, input_path, output_path)

    def _add_watermark_sync(
        self,
        input_path: str,
        output_path: str,
        brand_kit: "BrandKit"
    ) -> None:
        """
        Synchronous watermark addition

        NOTE: no overlay is drawn yet.  Without a logo the video is simply
        re-encoded (libx264 / aac); with a logo URL the input is copied
        through unchanged.
        """
        if not brand_kit.logo_url:
            cmd = [
                "ffmpeg",
                "-i", input_path,
                "-c:v", "libx264",
                "-c:a", "aac",
                "-y",
                output_path
            ]
            self._run_ffmpeg(cmd, "watermark", self._FFMPEG_TIMEOUT)
        else:
            # TODO: overlay the logo; until then pass the video through.
            shutil.copy(input_path, output_path)

    async def _generate_thumbnail(self, video_path: str, output_path: str) -> None:
        """
        Generate thumbnail from video at 2 seconds

        Best-effort: failures are logged and never raised.

        Args:
            video_path: Path to video
            output_path: Path to output thumbnail
        """
        try:
            await self._run_blocking(
                self._generate_thumbnail_sync,
                video_path,
                output_path
            )
        except Exception as e:
            self.logger.error(f"Thumbnail generation failed: {str(e)}")

    def _generate_thumbnail_sync(self, video_path: str, output_path: str) -> None:
        """Synchronous thumbnail generation (a non-zero ffmpeg exit is only logged)"""
        cmd = [
            "ffmpeg",
            "-i", video_path,
            "-ss", "2",
            "-vframes", "1",
            "-y",
            output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True, timeout=self._THUMBNAIL_TIMEOUT)
        except subprocess.CalledProcessError as e:
            self.logger.warning(f"Thumbnail generation failed: {e}")

    async def _get_video_info(self, video_path: Path) -> Dict:
        """
        Get video information (width, height, duration, fps)

        Never raises: when probing fails the fallback metadata
        (1080x1920, duration 0, 30 fps) is returned instead.

        Args:
            video_path: Path to video

        Returns:
            Dictionary with video metadata
        """
        try:
            return await self._run_blocking(self._get_video_info_sync, str(video_path))
        except Exception as e:
            self.logger.error(f"Video info extraction failed: {str(e)}")
            # Hand out a copy so callers cannot mutate the shared default.
            return dict(self._FALLBACK_VIDEO_INFO)

    def _get_video_info_sync(self, video_path: str) -> Dict:
        """Synchronous video info extraction (falls back to defaults on any error)"""
        try:
            return self._extract_video_info(ffmpeg.probe(video_path))
        except Exception as e:
            self.logger.error(f"FFprobe failed: {str(e)}")
            return dict(self._FALLBACK_VIDEO_INFO)
|
|