# Logo positions for the overlay filter as "x:y" expressions.
# In ffmpeg's overlay filter W/H refer to the main video and w/h to the logo.
_OVERLAY_POSITIONS = {
    "top-left": "10:10",
    "top-right": "W-w-10:10",
    "bottom-left": "10:H-h-10",
    "bottom-right": "W-w-10:H-h-10",
    "bottom-center": "(W-w)/2:H-h-20",
}
_OVERLAY_CENTER = "(W-w)/2:(H-h)/2"

# Text positions for the drawtext filter as (x, y) expressions.
# In drawtext w/h refer to the frame and tw/th to the rendered text.
_DRAWTEXT_POSITIONS = {
    "top-left": ("10", "10"),
    "top-right": ("w-tw-10", "10"),
    "bottom-left": ("10", "h-th-10"),
    "bottom-right": ("w-tw-10", "h-th-10"),
    "center": ("(w-tw)/2", "(h-th)/2"),
}
_DRAWTEXT_BOTTOM_CENTER = ("(w-tw)/2", "h-th-20")

# Fixed post-processing applied to every GIF output.
_GIF_FILTERS = "fps=12,scale=640:-1:flags=lanczos"
_AUDIO_ONLY_EXTS = ("mp3", "wav")


def _pos_expr(pos: str):
    """Return the overlay ``x:y`` expression for a named logo position.

    A falsy ``pos`` means "top-right"; any unrecognised name is centred.
    """
    return _OVERLAY_POSITIONS.get((pos or "top-right").lower(), _OVERLAY_CENTER)


def _drawtext(text: str, pos: str):
    """Return a ``drawtext=...`` filter string that renders ``text`` at ``pos``.

    A falsy or unrecognised ``pos`` falls back to bottom-center.
    """
    # keep escaping minimal (CPU-friendly); avoid fancy fonts
    # NOTE(review): only ':' and "'" are escaped. Backslashes and '%' pass
    # through untouched, and a backslash-escaped apostrophe inside a
    # single-quoted ffmpeg value may not parse as intended -- confirm against
    # how the server tokenizes the command before relying on such input.
    t = (text or "").replace(":", r"\:").replace("'", r"\'")
    x, y = _DRAWTEXT_POSITIONS.get(
        (pos or "bottom-center").lower(), _DRAWTEXT_BOTTOM_CENTER
    )
    return (
        f"drawtext=text='{t}':x={x}:y={y}"
        ":fontsize=36:fontcolor=white:box=1:boxcolor=black@0.5"
    )


def _codec_args(out_ext: str) -> list[str]:
    """Return the encoder options for the requested output extension."""
    if out_ext == "mp3":
        return ["-vn", "-c:a", "libmp3lame", "-b:a", "192k"]
    if out_ext == "wav":
        return ["-vn", "-c:a", "pcm_s16le"]
    if out_ext == "gif":
        # GIF is fully described by its filter chain; no codec flags are added.
        return []
    return [
        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
        "-c:a", "aac", "-b:a", "192k",
    ]


def _map_target(label: str, optional: bool = False) -> str:
    """Convert a filtergraph pad label into a value accepted by ``-map``.

    Filter output labels such as ``[v1]`` are returned unchanged. Raw input
    stream labels such as ``[0:v]`` are not filter outputs, so they are turned
    into plain stream specifiers (``0:v``); ``optional`` appends ``?`` so a
    missing stream does not abort the run.
    """
    inner = label[1:-1]
    if ":" not in inner:
        return label
    return f"{inner}?" if optional else inner


def build_ffmpeg_cmd(local_files: list[str], plan: dict) -> str:
    """Build a one-line ffmpeg command WITHOUT the output path.

    The output path is forced by the server, which appends it.
    Convention: ``local_files[0]`` is the main input.

    Keys read from ``plan`` (all optional):
        logo_overlay: dict with ``file`` and optional ``scale``, ``opacity``, ``pos``.
        audio_mix: list of dicts with ``file`` and optional ``volume`` (default 0.2).
            Tracks without a ``file`` are ignored. A ``loop`` key is accepted
            but has no effect; provide music at least as long as the video.
        trim: dict with optional ``start`` / ``end``.
        effects: dict with optional ``video_filters`` / ``audio_filters`` lists.
        resize, crop, fps: appended as ``scale=``, ``crop=``, ``fps=`` filters.
        subtitles: dict with ``file``.
        text_overlays: list of dicts with ``text`` and optional ``pos``.
        output_ext: "mp4" (default), "mp3", "wav", "gif", ...

    Returns:
        The command tokens joined by single spaces.

    Raises:
        IndexError: if ``local_files`` is empty.

    NOTE(review): tokens are joined with plain spaces and are not shell-quoted,
    so paths or overlay text containing whitespace will not survive naive
    splitting -- confirm how the server tokenizes this string.
    """
    main = local_files[0]

    # Register every distinct input exactly once and remember its ffmpeg input
    # index, so filter labels always point at the right "-i" even when a file
    # repeats, equals the main input, or a track has no file at all.
    inputs = [main]
    index_of = {main: 0}

    def input_index(path: str) -> int:
        if path not in index_of:
            index_of[path] = len(inputs)
            inputs.append(path)
        return index_of[path]

    logo = plan.get("logo_overlay") or {}
    logo_file = logo.get("file")
    logo_idx = input_index(logo_file) if logo_file else None

    tracks = [
        (input_index(track["file"]), track)
        for track in (plan.get("audio_mix") or [])
        if track.get("file")
    ]

    cmd = ["ffmpeg"]
    for path in inputs:
        cmd += ["-i", path]

    # Simple trim controls. They follow all inputs, so they apply to the
    # output that the server appends rather than to an individual input.
    trim = plan.get("trim") or {}
    if trim.get("start") is not None:
        cmd += ["-ss", str(trim["start"])]
    if trim.get("end") is not None:
        cmd += ["-to", str(trim["end"])]

    # Base video/audio filter chains.
    effects = plan.get("effects") or {}
    vf = list(effects.get("video_filters") or [])
    af = list(effects.get("audio_filters") or [])

    if plan.get("resize"):
        vf.append(f"scale={plan['resize']}")
    if plan.get("crop"):
        vf.append(f"crop={plan['crop']}")
    if plan.get("fps"):
        vf.append(f"fps={int(plan['fps'])}")

    subs = plan.get("subtitles")
    if subs and subs.get("file"):
        vf.append(f"subtitles={subs['file']}")

    for overlay in plan.get("text_overlays") or []:
        vf.append(_drawtext(overlay.get("text", ""), overlay.get("pos", "bottom-center")))

    out_ext = (plan.get("output_ext") or "mp4").lower()

    # Simple case: a single input, so plain -vf/-af is enough.
    if logo_idx is None and not tracks:
        if out_ext == "gif":
            # Merge into the one chain: a second -vf would replace the first.
            vf.append(_GIF_FILTERS)
        if vf:
            cmd += ["-vf", ",".join(vf)]
        if af:
            cmd += ["-af", ",".join(af)]
        cmd += _codec_args(out_ext)
        return " ".join(cmd)

    # filter_complex path: a logo overlay and/or extra audio tracks are present.
    fc = []
    maps = []

    # Video graph; skipped for audio-only outputs so no filter output is
    # left dangling or mapped into an audio container.
    if out_ext not in _AUDIO_ONLY_EXTS:
        video = "[0:v]"
        if vf:
            fc.append(f"{video}{','.join(vf)}[v0]")
            video = "[v0]"

        if logo_idx is not None:
            ov = f"[{logo_idx}:v]"
            scale = str(logo.get("scale") or "").strip()
            if scale:
                fc.append(f"{ov}scale={scale}[lg]")
                ov = "[lg]"
            opacity = logo.get("opacity")
            if opacity is not None:
                # apply alpha via colorchannelmixer if image supports alpha;
                # still okay for most PNGs
                fc.append(f"{ov}colorchannelmixer=aa={float(opacity)}[lga]")
                ov = "[lga]"
            xy = _pos_expr(logo.get("pos", "top-right"))
            fc.append(f"{video}{ov}overlay={xy}[v1]")
            video = "[v1]"

        if out_ext == "gif":
            # -vf cannot be combined with -filter_complex on the same stream,
            # so the GIF post-processing is chained inside the graph.
            fc.append(f"{video}{_GIF_FILTERS}[vgif]")
            video = "[vgif]"

        maps += ["-map", _map_target(video)]

    # Audio graph: base audio plus N mixed tracks; skipped for GIF (no audio).
    if out_ext != "gif":
        audio = "[0:a]"
        if af:
            fc.append(f"{audio}{','.join(af)}[a0]")
            audio = "[a0]"

        mix_inputs = [audio]
        for i, (idx, track) in enumerate(tracks):
            vol = track.get("volume")
            if vol is None:
                vol = 0.2
            fc.append(f"[{idx}:a]volume={float(vol)}[am{i}]")
            mix_inputs.append(f"[am{i}]")

        if len(mix_inputs) > 1:
            fc.append(
                f"{''.join(mix_inputs)}amix=inputs={len(mix_inputs)}"
                ":duration=longest:dropout_transition=2[aout]"
            )
            audio = "[aout]"

        maps += ["-map", _map_target(audio, optional=True)]

    if fc:
        cmd += ["-filter_complex", ";".join(fc)]
    cmd += maps
    cmd += _codec_args(out_ext)
    return " ".join(cmd)