""" FFmpeg Command Builder -- AI Reel Creator Platform ================================================== Type-safe programmatic FFmpeg filter_complex builder for: * Video trimming & concatenation * Transitions (cut, cross-dissolve, flash-white, zoom-in, slide) * Visual filter presets (cinematic, sporty, elegant, technical) * Text overlay / drawtext with brand font/colour/position * Ken Burns zoompan * Audio mixing & ducking """ import subprocess from typing import List, Dict, Optional, Any from dataclasses import dataclass @dataclass class TransitionSpec: type: str duration_seconds: float = 0.5 @dataclass class FilterPresetSpec: name: str saturation: float = 1.0 contrast: float = 1.0 brightness: float = 1.0 vignette: bool = False warm_grade: float = 0.0 cool_tone: float = 0.0 sharpness: float = 0.0 @dataclass class DrawTextSpec: text: str fontfile: str = "Arial" fontcolor: str = "#FFFFFF" fontsize: int = 48 x_expr: str = "(w-text_w)/2" y_expr: str = "h*0.85" fade_in: float = 0.3 fade_out: float = 0.3 fade_out_before_end: float = 0.3 class FFmpegBuilder: """Builds FFmpeg commands safely via an internal list of flags.""" def __init__(self, ffmpeg_bin: str = "ffmpeg"): self.ffmpeg = ffmpeg_bin self.inputs: List[List[str]] = [] self.outputs: List[Dict[str, Any]] = [] self.filter_complex: List[str] = [] self.global_options: List[str] = [] def add_input(self, path: str, seek: Optional[float] = None, duration: Optional[float] = None, loop: bool = False, stream_loop: Optional[int] = None) -> None: flags = [] if loop: flags.extend(["-loop", "1"]) if stream_loop is not None: flags.extend(["-stream_loop", str(stream_loop)]) if seek is not None: flags.extend(["-ss", str(seek)]) if duration is not None: flags.extend(["-t", str(duration)]) flags.extend(["-i", path]) self.inputs.append(flags) def add_filter_complex(self, filter_string: str) -> None: self.filter_complex.append(filter_string) def concat_filter(self, segment_count: int) -> str: return f"concat=n={segment_count}:v=1:a=1[outv][outa]" def crossfade_filter(self, stream_a: str, stream_b: str, duration: float, offset: float) -> str: return f"[{stream_a}][{stream_b}]xfade=transition=fade:duration={duration}:offset={offset}[xf{stream_a}{stream_b}]" def drawtext_filter(self, spec: DrawTextSpec, start: float, end: float) -> str: hex_col = spec.fontcolor.lstrip("#") ffmpeg_col = f"0x{hex_col}@0xFF" filter_str = ( f"drawtext=fontfile={spec.fontfile}:text='{spec.text}':" f"fontcolor={ffmpeg_col}:fontsize={spec.fontsize}:" f"x={spec.x_expr}:y={spec.y_expr}:enable='between(t\\,{start}\\,{end})'" ) return filter_str def eq_filter(self, spec: FilterPresetSpec) -> str: parts = [f"eq=saturation={spec.saturation}:contrast={spec.contrast}:brightness={spec.brightness}"] if spec.sharpness > 0: parts.append(f"unsharp=3:3:{round(spec.sharpness, 2)}") if spec.vignette: parts.append("vignette=PI/4") if spec.warm_grade > 0: parts.append(f"colorbalance=rs={spec.warm_grade}") if spec.cool_tone > 0: parts.append(f"colorbalance=rs=-{spec.cool_tone}") return ",".join(parts) def ken_burns_filter(self, duration_s: float, zoom_start: float = 1.0, zoom_end: float = 1.15) -> str: frames = int(duration_s * 30) return (f"zoompan=z='if(lte(on\\,1)\\,{zoom_start}\\,{zoom_end})':" f"d={frames}:s=1080x1920:fps=30,format=yuv420p") def add_output(self, path: str, video_codec: str = "libx264", audio_codec: Optional[str] = "aac", video_bitrate: Optional[str] = None, audio_bitrate: str = "192k", crf: Optional[int] = None, preset: str = "slow", pix_fmt: str = "yuv420p", copy_streams: bool = False) -> None: out: Dict[str, Any] = {"path": path} if copy_streams: out["copy"] = True else: out["vcodec"] = video_codec out["acodec"] = audio_codec out["preset"] = preset out["pix_fmt"] = pix_fmt if crf is not None: out["crf"] = crf if video_bitrate: out["vb"] = video_bitrate if audio_bitrate: out["ab"] = audio_bitrate self.outputs.append(out) def build(self) -> str: parts = [self.ffmpeg] for inp in self.inputs: parts.extend(inp) if self.filter_complex: parts.extend(["-filter_complex", ";".join(self.filter_complex)]) parts.extend(self.global_options) for out in self.outputs: if out.get("copy"): parts.extend(["-c", "copy"]) else: parts.extend(["-c:v", out["vcodec"]]) if out.get("acodec"): parts.extend(["-c:a", out["acodec"], "-b:a", out["ab"]]) else: parts.extend(["-an"]) parts.extend(["-preset", out["preset"], "-pix_fmt", out["pix_fmt"]]) if "crf" in out: parts.extend(["-crf", str(out["crf"])]) if "vb" in out: parts.extend(["-b:v", out["vb"]]) parts.append(out["path"]) parts.append("-y") return " ".join(parts) def run(self) -> subprocess.CompletedProcess: cmd = self.build() result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"FFmpeg failed:\nSTDERR: {result.stderr}\nCMD: {cmd}") return result