# Uploaded by acd23 -- "Upload src/utils/ffmpeg.py" (commit ca102b9, verified)
"""
FFmpeg Command Builder -- AI Reel Creator Platform
==================================================
Type-safe programmatic FFmpeg filter_complex builder for:
* Video trimming & concatenation
* Transitions (cut, cross-dissolve, flash-white, zoom-in, slide)
* Visual filter presets (cinematic, sporty, elegant, technical)
* Text overlay / drawtext with brand font/colour/position
* Ken Burns zoompan
* Audio mixing & ducking
"""
import shlex
import subprocess
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
@dataclass
class TransitionSpec:
    """Describes one transition between two clips.

    The module docstring lists cut, cross-dissolve, flash-white, zoom-in and
    slide as the supported transitions; the exact strings accepted for
    ``type`` are decided by whatever consumes this spec, which is not
    visible in this file.
    """

    # Transition kind identifier (required, no default).
    type: str
    # Length of the transition, in seconds.
    duration_seconds: float = 0.5
@dataclass
class FilterPresetSpec:
    """Colour-grading parameters for a named visual preset.

    The values are consumed by ``FFmpegBuilder.eq_filter``, which forwards
    ``saturation``, ``contrast`` and ``brightness`` verbatim to FFmpeg's
    ``eq`` filter. Every default is chosen so that a spec built from the
    name alone leaves the picture unchanged.
    """

    # Preset identifier (required, no default).
    name: str
    # eq saturation; 1.0 leaves colours unchanged.
    saturation: float = 1.0
    # eq contrast; 1.0 leaves contrast unchanged.
    contrast: float = 1.0
    # eq brightness is an additive offset in [-1.0, 1.0] whose neutral value
    # is 0.0. The previous default of 1.0 pushed every pixel to white for any
    # preset that did not set brightness explicitly.
    brightness: float = 0.0
    # When True, eq_filter appends a "vignette=PI/4" stage.
    vignette: bool = False
    # > 0 adds a positive colorbalance "rs" shift of this amount.
    warm_grade: float = 0.0
    # > 0 adds a negative colorbalance "rs" shift of this amount.
    cool_tone: float = 0.0
    # > 0 adds an unsharp stage with this amount (rounded to 2 decimals).
    sharpness: float = 0.0
@dataclass
class DrawTextSpec:
    """Settings for a single text overlay rendered with FFmpeg ``drawtext``.

    ``FFmpegBuilder.drawtext_filter`` reads ``text``, ``fontfile``,
    ``fontcolor``, ``fontsize``, ``x_expr`` and ``y_expr``. The three fade
    fields are not read by any code visible in this file.
    """

    # Caption to render (required, no default).
    text: str
    # Emitted as drawtext's "fontfile=" option.
    # NOTE(review): the default "Arial" is a family name, not a file path --
    # confirm callers always override it with a real font file.
    fontfile: str = "Arial"
    # Hex colour; a leading "#" is stripped by the builder before use.
    fontcolor: str = "#FFFFFF"
    # Font size passed to drawtext's "fontsize=" option.
    fontsize: int = 48
    # drawtext x expression; the default centres the text horizontally.
    x_expr: str = "(w-text_w)/2"
    # drawtext y expression; the default sits at 85% of the frame height.
    y_expr: str = "h*0.85"
    # Fade timings -- presumably seconds; TODO confirm with the consumer.
    fade_in: float = 0.3
    fade_out: float = 0.3
    fade_out_before_end: float = 0.3
class FFmpegBuilder:
    """Builds FFmpeg commands safely via an internal list of flags.

    Inputs, ``-filter_complex`` fragments, extra options and outputs are
    accumulated on the instance and turned into a command by
    ``build_args`` (argv list), ``build`` (shell-quoted string) or executed
    directly by ``run``.

    Attributes:
        ffmpeg: Executable name or path used as ``argv[0]``.
        inputs: One flag list per input, each ending in ``-i <path>``.
        outputs: One settings dict per output, as stored by ``add_output``.
        filter_complex: Filter fragments, joined with ``;`` at build time.
        global_options: Extra arguments emitted after ``-filter_complex``
            and before the first output's options.
    """

    def __init__(self, ffmpeg_bin: str = "ffmpeg"):
        self.ffmpeg = ffmpeg_bin
        self.inputs: List[List[str]] = []
        self.outputs: List[Dict[str, Any]] = []
        self.filter_complex: List[str] = []
        self.global_options: List[str] = []

    def add_input(self, path: str, seek: Optional[float] = None, duration: Optional[float] = None,
                  loop: bool = False, stream_loop: Optional[int] = None) -> None:
        """Register an input file together with its per-input options.

        Args:
            path: Input file path or URL.
            seek: If given, emitted as ``-ss <seek>`` before ``-i``.
            duration: If given, emitted as ``-t <duration>`` before ``-i``.
            loop: If True, emits ``-loop 1``.
            stream_loop: If given, emitted as ``-stream_loop <n>``.
        """
        flags: List[str] = []
        if loop:
            flags.extend(["-loop", "1"])
        if stream_loop is not None:
            flags.extend(["-stream_loop", str(stream_loop)])
        if seek is not None:
            flags.extend(["-ss", str(seek)])
        if duration is not None:
            flags.extend(["-t", str(duration)])
        # str() lets callers hand in pathlib.Path objects as well.
        flags.extend(["-i", str(path)])
        self.inputs.append(flags)

    def add_filter_complex(self, filter_string: str) -> None:
        """Append one fragment to the ``-filter_complex`` graph."""
        self.filter_complex.append(filter_string)

    def concat_filter(self, segment_count: int) -> str:
        """Return a concat filter for ``segment_count`` segments.

        The filter carries one video and one audio stream and labels its
        outputs ``[outv]`` and ``[outa]``.
        """
        return f"concat=n={segment_count}:v=1:a=1[outv][outa]"

    def crossfade_filter(self, stream_a: str, stream_b: str, duration: float, offset: float) -> str:
        """Return an ``xfade`` (fade) filter joining two labelled streams.

        The output pad is labelled ``xf<stream_a><stream_b>``.
        """
        return f"[{stream_a}][{stream_b}]xfade=transition=fade:duration={duration}:offset={offset}[xf{stream_a}{stream_b}]"

    @staticmethod
    def _escape_drawtext_text(text: str) -> str:
        """Escape ``text`` for use inside ``text='...'`` in a filter graph.

        Bare ``:`` (option separator) and ``'`` (quote delimiter) are escaped.
        Backslash sequences already written by the caller, such as ``\\:``,
        are passed through untouched so previously working input keeps
        producing the same filter string.
        """
        pieces: List[str] = []
        after_backslash = False
        for ch in text:
            if ch == "'":
                # A quote cannot live inside a quoted section: emit a
                # backslash for the option parser (unless the caller already
                # wrote one), close the quote, add an escaped quote, reopen.
                pieces.append("'\\''" if after_backslash else "\\'\\''")
                after_backslash = False
            elif after_backslash:
                pieces.append(ch)
                after_backslash = False
            elif ch == "\\":
                pieces.append(ch)
                after_backslash = True
            elif ch == ":":
                pieces.append("\\:")
            else:
                pieces.append(ch)
        if after_backslash:
            # A lone trailing backslash would escape the ':' that follows the
            # text option; double it so it stays a literal backslash.
            pieces.append("\\")
        return "".join(pieces)

    def drawtext_filter(self, spec: "DrawTextSpec", start: float, end: float) -> str:
        """Return a ``drawtext`` filter showing ``spec.text`` from start to end.

        Args:
            spec: Overlay settings; ``text``, ``fontfile``, ``fontcolor``,
                ``fontsize``, ``x_expr`` and ``y_expr`` are read.
            start: Time (``t``) at which the overlay is enabled.
            end: Time (``t``) at which the overlay is disabled.

        Quotes and colons in the text are escaped. drawtext's own ``%{...}``
        expansion is left untouched, so a literal ``%`` presumably still has
        to be escaped by the caller -- TODO confirm against callers.
        """
        # NOTE(review): the value is always emitted as "fontfile=", so a
        # family name such as "Arial" will not resolve unless a file of that
        # name exists -- confirm what callers pass.
        hex_col = spec.fontcolor.lstrip("#")
        ffmpeg_col = f"0x{hex_col}@0xFF"
        safe_text = self._escape_drawtext_text(spec.text)
        return (
            f"drawtext=fontfile={spec.fontfile}:text='{safe_text}':"
            f"fontcolor={ffmpeg_col}:fontsize={spec.fontsize}:"
            f"x={spec.x_expr}:y={spec.y_expr}:enable='between(t\\,{start}\\,{end})'"
        )

    def eq_filter(self, spec: "FilterPresetSpec") -> str:
        """Return the comma-joined grading chain described by ``spec``.

        Always starts with ``eq``; ``unsharp``, ``vignette`` and
        ``colorbalance`` stages are appended only when the matching field is
        set. Values are forwarded verbatim: FFmpeg's ``eq`` treats brightness
        as an offset whose neutral value is 0.
        """
        parts = [f"eq=saturation={spec.saturation}:contrast={spec.contrast}:brightness={spec.brightness}"]
        if spec.sharpness > 0:
            parts.append(f"unsharp=3:3:{round(spec.sharpness, 2)}")
        if spec.vignette:
            parts.append("vignette=PI/4")
        if spec.warm_grade > 0:
            parts.append(f"colorbalance=rs={spec.warm_grade}")
        if spec.cool_tone > 0:
            parts.append(f"colorbalance=rs=-{spec.cool_tone}")
        return ",".join(parts)

    def ken_burns_filter(self, duration_s: float, zoom_start: float = 1.0, zoom_end: float = 1.15) -> str:
        """Return a ``zoompan`` filter that ramps the zoom over ``duration_s``.

        The zoom moves linearly from ``zoom_start`` to ``zoom_end`` across the
        output frames (30 fps, 1080x1920) and then holds. The earlier
        expression switched to ``zoom_end`` right after the first frame, so
        no motion was visible.
        """
        # zoompan needs at least one output frame; this also keeps the
        # divisor in the expression non-zero.
        frames = max(1, int(duration_s * 30))
        return (f"zoompan=z='{zoom_start}+({zoom_end}-{zoom_start})*min(on\\,{frames})/{frames}':"
                f"d={frames}:s=1080x1920:fps=30,format=yuv420p")

    def add_output(self, path: str, video_codec: str = "libx264", audio_codec: Optional[str] = "aac",
                   video_bitrate: Optional[str] = None, audio_bitrate: str = "192k",
                   crf: Optional[int] = None, preset: str = "slow", pix_fmt: str = "yuv420p",
                   copy_streams: bool = False) -> None:
        """Register an output file and its encoding settings.

        With ``copy_streams=True`` only ``-c copy`` is emitted and the codec,
        preset and pixel-format arguments are ignored. A falsy
        ``audio_codec`` produces ``-an``.
        """
        out: Dict[str, Any] = {"path": path}
        if copy_streams:
            out["copy"] = True
        else:
            out["vcodec"] = video_codec
            out["acodec"] = audio_codec
            out["preset"] = preset
            out["pix_fmt"] = pix_fmt
        if crf is not None:
            out["crf"] = crf
        if video_bitrate:
            out["vb"] = video_bitrate
        if audio_bitrate:
            out["ab"] = audio_bitrate
        self.outputs.append(out)

    def build_args(self) -> List[str]:
        """Return the command as an argv list, one element per argument.

        Order: executable, inputs, ``-filter_complex``, ``global_options``,
        each output's options followed by its path, then a trailing ``-y``.
        """
        argv: List[str] = [self.ffmpeg]
        for inp in self.inputs:
            argv.extend(inp)
        if self.filter_complex:
            argv.extend(["-filter_complex", ";".join(self.filter_complex)])
        argv.extend(self.global_options)
        for out in self.outputs:
            if out.get("copy"):
                argv.extend(["-c", "copy"])
            else:
                argv.extend(["-c:v", out["vcodec"]])
                if out.get("acodec"):
                    argv.extend(["-c:a", out["acodec"]])
                    # "ab" is only stored for a truthy bitrate; indexing it
                    # unconditionally used to raise KeyError.
                    if "ab" in out:
                        argv.extend(["-b:a", out["ab"]])
                else:
                    argv.append("-an")
                argv.extend(["-preset", out["preset"], "-pix_fmt", out["pix_fmt"]])
            if "crf" in out:
                argv.extend(["-crf", str(out["crf"])])
            if "vb" in out:
                argv.extend(["-b:v", out["vb"]])
            argv.append(str(out["path"]))
        argv.append("-y")
        return argv

    def build(self) -> str:
        """Return the command as a single POSIX-shell-quoted string.

        Arguments without shell-special characters are rendered unchanged;
        everything else is quoted so the string can be logged or pasted into
        a shell without being re-split or interpreted.
        """
        return " ".join(shlex.quote(arg) for arg in self.build_args())

    def run(self) -> subprocess.CompletedProcess:
        """Execute the command and return the completed process.

        The argv list is passed to the OS directly (no shell), so filter
        graphs containing ``;``, ``[``, ``(`` or quotes and paths containing
        spaces reach FFmpeg intact.

        Raises:
            RuntimeError: If the executable cannot be started or exits with
                a non-zero status.
        """
        argv = self.build_args()
        try:
            result = subprocess.run(argv, capture_output=True, text=True)
        except OSError as exc:
            # Through a shell a missing binary surfaced as a non-zero exit
            # and therefore RuntimeError; keep that contract.
            raise RuntimeError(f"FFmpeg failed:\nSTDERR: {exc}\nCMD: {self.build()}") from exc
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg failed:\nSTDERR: {result.stderr}\nCMD: {self.build()}")
        return result