clipon / services /video_editor.py
yonagush
Launch Clipon — AI-powered viral clip generator
edb7d3e
"""
Video editing using ffmpeg - clips, crops, reframes for social media
"""
import asyncio
import logging
import subprocess
from pathlib import Path
from typing import Dict, List, Tuple
import ffmpeg
from config import DEFAULT_ASPECT_RATIOS, settings
from models.schemas import BrandKit, Clip, ProcessRequest
logger = logging.getLogger(__name__)
class VideoEditor:
"""Edit and reframe videos for social media"""
def __init__(self):
"""Initialize video editor"""
self.logger = logger
async def process_clips(
self,
video_path: Path,
clips: List[Clip],
request: ProcessRequest
) -> List[Clip]:
"""
Process clips: cut, reframe, and add watermark
Args:
video_path: Path to source video
clips: List of Clip objects with timing
request: Processing request with formatting options
Returns:
Updated clips with output paths
Raises:
Exception: If processing fails
"""
try:
# Get video info
video_info = await self._get_video_info(video_path)
self.logger.info(f"Video info: {video_info}")
loop = asyncio.get_event_loop()
for i, clip in enumerate(clips):
try:
# Create output directory for this clip
clip_output_dir = settings.output_dir / clip.job_id / str(i)
clip_output_dir.mkdir(parents=True, exist_ok=True)
# Step 1: Cut the clip
cut_path = clip_output_dir / "01_cut.mp4"
await self._cut_clip(str(video_path), clip.start_time, clip.end_time, str(cut_path))
# Step 2: Reframe to target aspect ratio
reframed_path = clip_output_dir / "02_reframed.mp4"
await self._reframe(str(cut_path), str(reframed_path), request.aspect_ratio, video_info)
# Step 3: Add watermark/brand kit
final_path = clip_output_dir / "03_final.mp4"
if request.brand_kit.logo_url or request.brand_kit.watermark_opacity > 0:
await self._add_watermark(str(reframed_path), str(final_path), request.brand_kit)
else:
final_path = reframed_path
# Step 4: Generate thumbnail
thumbnail_path = clip_output_dir / "thumbnail.jpg"
await self._generate_thumbnail(str(final_path), str(thumbnail_path))
# Update clip with output paths
clip.output_path = str(final_path)
clip.thumbnail_path = str(thumbnail_path)
self.logger.info(f"Processed clip {i + 1}/{len(clips)}")
except Exception as e:
self.logger.error(f"Failed to process clip {i}: {str(e)}")
clip.output_path = None
clip.thumbnail_path = None
return clips
except Exception as e:
self.logger.error(f"Video processing failed: {str(e)}")
raise Exception(f"Video processing failed: {str(e)}")
async def _cut_clip(
self,
video_path: str,
start_time: float,
end_time: float,
output_path: str
) -> None:
"""
Cut a clip from video using ffmpeg
Args:
video_path: Path to source video
start_time: Start time in seconds
end_time: End time in seconds
output_path: Path to output clip
"""
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._cut_clip_sync,
video_path,
start_time,
end_time,
output_path
)
except Exception as e:
self.logger.error(f"Clip cutting failed: {str(e)}")
raise
def _cut_clip_sync(
self,
video_path: str,
start_time: float,
end_time: float,
output_path: str
) -> None:
"""Synchronous clip cutting"""
try:
cmd = [
"ffmpeg",
"-i", video_path,
"-ss", str(start_time),
"-to", str(end_time),
"-c", "copy",
"-y",
output_path
]
subprocess.run(cmd, check=True, capture_output=True, timeout=300)
except subprocess.CalledProcessError as e:
raise Exception(f"FFmpeg cut failed: {e.stderr.decode() if e.stderr else str(e)}")
async def _reframe(
self,
input_path: str,
output_path: str,
aspect_ratio: str,
video_info: Dict
) -> None:
"""
Reframe video to target aspect ratio with smart cropping
Args:
input_path: Path to input video
output_path: Path to output video
aspect_ratio: Target aspect ratio (e.g., "9:16")
video_info: Video metadata (width, height, etc.)
"""
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._reframe_sync,
input_path,
output_path,
aspect_ratio,
video_info
)
except Exception as e:
self.logger.error(f"Reframing failed: {str(e)}")
raise
def _reframe_sync(
self,
input_path: str,
output_path: str,
aspect_ratio: str,
video_info: Dict
) -> None:
"""Synchronous reframing"""
try:
target_width, target_height = DEFAULT_ASPECT_RATIOS.get(aspect_ratio, (1080, 1920))
current_width = video_info["width"]
current_height = video_info["height"]
# Calculate scaling and cropping
if aspect_ratio == "9:16":
# Vertical format: crop width to match aspect ratio
if current_width / current_height > 9 / 16:
# Video is too wide
crop_width = int(current_height * 9 / 16)
crop_height = current_height
else:
# Video is too tall, add letterboxing
crop_width = current_width
crop_height = int(current_width * 16 / 9)
elif aspect_ratio == "16:9":
# Horizontal format
if current_width / current_height < 16 / 9:
# Video is too tall, crop height
crop_width = current_width
crop_height = int(current_width * 9 / 16)
else:
# Video is too wide, crop width
crop_width = int(current_height * 16 / 9)
crop_height = current_height
else:
# Other formats: center crop
target_ratio = 1.0 # 1:1 or calculate from aspect ratio
if current_width / current_height > target_ratio:
crop_width = int(current_height * target_ratio)
crop_height = current_height
else:
crop_width = current_width
crop_height = int(current_width / target_ratio)
# Center crop
x = (current_width - crop_width) // 2
y = (current_height - crop_height) // 2
# Build ffmpeg filter
filter_str = f"crop={crop_width}:{crop_height}:{x}:{y},scale={target_width}:{target_height}"
cmd = [
"ffmpeg",
"-i", input_path,
"-vf", filter_str,
"-c:a", "aac",
"-y",
output_path
]
subprocess.run(cmd, check=True, capture_output=True, timeout=300)
except subprocess.CalledProcessError as e:
raise Exception(f"FFmpeg reframe failed: {e.stderr.decode() if e.stderr else str(e)}")
async def _add_watermark(
self,
input_path: str,
output_path: str,
brand_kit: BrandKit
) -> None:
"""
Add watermark and branding to video
Args:
input_path: Path to input video
output_path: Path to output video
brand_kit: Brand kit configuration
"""
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._add_watermark_sync,
input_path,
output_path,
brand_kit
)
except Exception as e:
self.logger.error(f"Watermark addition failed: {str(e)}")
# Continue without watermark on failure
if input_path != output_path:
subprocess.run(["cp", input_path, output_path], check=True)
def _add_watermark_sync(
self,
input_path: str,
output_path: str,
brand_kit: BrandKit
) -> None:
"""Synchronous watermark addition"""
try:
# Simple text watermark if no logo
if not brand_kit.logo_url:
# Add text overlay (brand_kit colors as background hint)
cmd = [
"ffmpeg",
"-i", input_path,
"-c:v", "libx264",
"-c:a", "aac",
"-y",
output_path
]
subprocess.run(cmd, check=True, capture_output=True, timeout=300)
else:
# Logo watermarking would go here
# For now, just copy the file
subprocess.run(["cp", input_path, output_path], check=True)
except subprocess.CalledProcessError as e:
raise Exception(f"FFmpeg watermark failed: {e.stderr.decode() if e.stderr else str(e)}")
async def _generate_thumbnail(self, video_path: str, output_path: str) -> None:
"""
Generate thumbnail from video at 2 seconds
Args:
video_path: Path to video
output_path: Path to output thumbnail
"""
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._generate_thumbnail_sync,
video_path,
output_path
)
except Exception as e:
self.logger.error(f"Thumbnail generation failed: {str(e)}")
def _generate_thumbnail_sync(self, video_path: str, output_path: str) -> None:
"""Synchronous thumbnail generation"""
try:
cmd = [
"ffmpeg",
"-i", video_path,
"-ss", "2",
"-vframes", "1",
"-y",
output_path
]
subprocess.run(cmd, check=True, capture_output=True, timeout=60)
except subprocess.CalledProcessError as e:
self.logger.warning(f"Thumbnail generation failed: {e}")
async def _get_video_info(self, video_path: Path) -> Dict:
"""
Get video information (width, height, duration, fps)
Args:
video_path: Path to video
Returns:
Dictionary with video metadata
"""
try:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self._get_video_info_sync,
str(video_path)
)
except Exception as e:
self.logger.error(f"Video info extraction failed: {str(e)}")
# Return defaults
return {
"width": 1080,
"height": 1920,
"duration": 0,
"fps": 30
}
def _get_video_info_sync(self, video_path: str) -> Dict:
"""Synchronous video info extraction"""
try:
probe = ffmpeg.probe(video_path)
video_stream = next(
(stream for stream in probe["streams"] if stream["codec_type"] == "video"),
None
)
if not video_stream:
raise Exception("No video stream found")
return {
"width": int(video_stream.get("width", 1080)),
"height": int(video_stream.get("height", 1920)),
"duration": float(probe.get("format", {}).get("duration", 0)),
"fps": eval(video_stream.get("r_frame_rate", "30/1"))
}
except Exception as e:
self.logger.error(f"FFprobe failed: {str(e)}")
return {
"width": 1080,
"height": 1920,
"duration": 0,
"fps": 30
}