| """ |
| FFmpeg Command Builder -- AI Reel Creator Platform |
| ================================================== |
| |
| Type-safe programmatic FFmpeg filter_complex builder for: |
| * Video trimming & concatenation |
| * Transitions (cut, cross-dissolve, flash-white, zoom-in, slide) |
| * Visual filter presets (cinematic, sporty, elegant, technical) |
| * Text overlay / drawtext with brand font/colour/position |
| * Ken Burns zoompan |
| * Audio mixing & ducking |
| """ |
|
|
| import subprocess |
| from typing import List, Dict, Optional, Any |
| from dataclasses import dataclass |
|
|
|
|
| @dataclass |
| class TransitionSpec: |
| type: str |
| duration_seconds: float = 0.5 |
|
|
|
|
| @dataclass |
| class FilterPresetSpec: |
| name: str |
| saturation: float = 1.0 |
| contrast: float = 1.0 |
| brightness: float = 1.0 |
| vignette: bool = False |
| warm_grade: float = 0.0 |
| cool_tone: float = 0.0 |
| sharpness: float = 0.0 |
|
|
|
|
| @dataclass |
| class DrawTextSpec: |
| text: str |
| fontfile: str = "Arial" |
| fontcolor: str = "#FFFFFF" |
| fontsize: int = 48 |
| x_expr: str = "(w-text_w)/2" |
| y_expr: str = "h*0.85" |
| fade_in: float = 0.3 |
| fade_out: float = 0.3 |
| fade_out_before_end: float = 0.3 |
|
|
|
|
| class FFmpegBuilder: |
| """Builds FFmpeg commands safely via an internal list of flags.""" |
|
|
| def __init__(self, ffmpeg_bin: str = "ffmpeg"): |
| self.ffmpeg = ffmpeg_bin |
| self.inputs: List[List[str]] = [] |
| self.outputs: List[Dict[str, Any]] = [] |
| self.filter_complex: List[str] = [] |
| self.global_options: List[str] = [] |
|
|
| def add_input(self, path: str, seek: Optional[float] = None, duration: Optional[float] = None, |
| loop: bool = False, stream_loop: Optional[int] = None) -> None: |
| flags = [] |
| if loop: |
| flags.extend(["-loop", "1"]) |
| if stream_loop is not None: |
| flags.extend(["-stream_loop", str(stream_loop)]) |
| if seek is not None: |
| flags.extend(["-ss", str(seek)]) |
| if duration is not None: |
| flags.extend(["-t", str(duration)]) |
| flags.extend(["-i", path]) |
| self.inputs.append(flags) |
|
|
| def add_filter_complex(self, filter_string: str) -> None: |
| self.filter_complex.append(filter_string) |
|
|
| def concat_filter(self, segment_count: int) -> str: |
| return f"concat=n={segment_count}:v=1:a=1[outv][outa]" |
|
|
| def crossfade_filter(self, stream_a: str, stream_b: str, duration: float, offset: float) -> str: |
| return f"[{stream_a}][{stream_b}]xfade=transition=fade:duration={duration}:offset={offset}[xf{stream_a}{stream_b}]" |
|
|
| def drawtext_filter(self, spec: DrawTextSpec, start: float, end: float) -> str: |
| hex_col = spec.fontcolor.lstrip("#") |
| ffmpeg_col = f"0x{hex_col}@0xFF" |
| filter_str = ( |
| f"drawtext=fontfile={spec.fontfile}:text='{spec.text}':" |
| f"fontcolor={ffmpeg_col}:fontsize={spec.fontsize}:" |
| f"x={spec.x_expr}:y={spec.y_expr}:enable='between(t\\,{start}\\,{end})'" |
| ) |
| return filter_str |
|
|
| def eq_filter(self, spec: FilterPresetSpec) -> str: |
| parts = [f"eq=saturation={spec.saturation}:contrast={spec.contrast}:brightness={spec.brightness}"] |
| if spec.sharpness > 0: |
| parts.append(f"unsharp=3:3:{round(spec.sharpness, 2)}") |
| if spec.vignette: |
| parts.append("vignette=PI/4") |
| if spec.warm_grade > 0: |
| parts.append(f"colorbalance=rs={spec.warm_grade}") |
| if spec.cool_tone > 0: |
| parts.append(f"colorbalance=rs=-{spec.cool_tone}") |
| return ",".join(parts) |
|
|
| def ken_burns_filter(self, duration_s: float, zoom_start: float = 1.0, zoom_end: float = 1.15) -> str: |
| frames = int(duration_s * 30) |
| return (f"zoompan=z='if(lte(on\\,1)\\,{zoom_start}\\,{zoom_end})':" |
| f"d={frames}:s=1080x1920:fps=30,format=yuv420p") |
|
|
| def add_output(self, path: str, video_codec: str = "libx264", audio_codec: Optional[str] = "aac", |
| video_bitrate: Optional[str] = None, audio_bitrate: str = "192k", |
| crf: Optional[int] = None, preset: str = "slow", pix_fmt: str = "yuv420p", |
| copy_streams: bool = False) -> None: |
| out: Dict[str, Any] = {"path": path} |
| if copy_streams: |
| out["copy"] = True |
| else: |
| out["vcodec"] = video_codec |
| out["acodec"] = audio_codec |
| out["preset"] = preset |
| out["pix_fmt"] = pix_fmt |
| if crf is not None: |
| out["crf"] = crf |
| if video_bitrate: |
| out["vb"] = video_bitrate |
| if audio_bitrate: |
| out["ab"] = audio_bitrate |
| self.outputs.append(out) |
|
|
| def build(self) -> str: |
| parts = [self.ffmpeg] |
| for inp in self.inputs: |
| parts.extend(inp) |
| if self.filter_complex: |
| parts.extend(["-filter_complex", ";".join(self.filter_complex)]) |
| parts.extend(self.global_options) |
| for out in self.outputs: |
| if out.get("copy"): |
| parts.extend(["-c", "copy"]) |
| else: |
| parts.extend(["-c:v", out["vcodec"]]) |
| if out.get("acodec"): |
| parts.extend(["-c:a", out["acodec"], "-b:a", out["ab"]]) |
| else: |
| parts.extend(["-an"]) |
| parts.extend(["-preset", out["preset"], "-pix_fmt", out["pix_fmt"]]) |
| if "crf" in out: |
| parts.extend(["-crf", str(out["crf"])]) |
| if "vb" in out: |
| parts.extend(["-b:v", out["vb"]]) |
| parts.append(out["path"]) |
| parts.append("-y") |
| return " ".join(parts) |
|
|
| def run(self) -> subprocess.CompletedProcess: |
| cmd = self.build() |
| result = subprocess.run(cmd, shell=True, capture_output=True, text=True) |
| if result.returncode != 0: |
| raise RuntimeError(f"FFmpeg failed:\nSTDERR: {result.stderr}\nCMD: {cmd}") |
| return result |
|
|