Spaces:
Sleeping
Sleeping
| def _pos_expr(pos: str): | |
| pos = (pos or "top-right").lower() | |
| if pos == "top-left": | |
| return "10:10" | |
| if pos == "top-right": | |
| return "W-w-10:10" | |
| if pos == "bottom-left": | |
| return "10:H-h-10" | |
| if pos == "bottom-right": | |
| return "W-w-10:H-h-10" | |
| if pos == "bottom-center": | |
| return "(W-w)/2:H-h-20" | |
| return "(W-w)/2:(H-h)/2" | |
| def _drawtext(text: str, pos: str): | |
| # keep escaping minimal (CPU-friendly); avoid fancy fonts | |
| t = (text or "").replace(":", r"\:").replace("'", r"\'") | |
| p = (pos or "bottom-center").lower() | |
| if p == "top-left": | |
| x, y = "10", "10" | |
| elif p == "top-right": | |
| x, y = "w-tw-10", "10" | |
| elif p == "bottom-left": | |
| x, y = "10", "h-th-10" | |
| elif p == "bottom-right": | |
| x, y = "w-tw-10", "h-th-10" | |
| elif p == "center": | |
| x, y = "(w-tw)/2", "(h-th)/2" | |
| else: # bottom-center | |
| x, y = "(w-tw)/2", "h-th-20" | |
| return f"drawtext=text='{t}':x={x}:y={y}:fontsize=36:fontcolor=white:box=1:boxcolor=black@0.5" | |
def _output_codec_args(out_ext: str) -> list[str]:
    """Return the encoder flags used for the given (lower-case) output extension."""
    if out_ext == "mp3":
        return ["-vn", "-c:a", "libmp3lame", "-b:a", "192k"]
    if out_ext == "wav":
        return ["-vn", "-c:a", "pcm_s16le"]
    if out_ext == "gif":
        # GIF needs no codec flags; its fps/scale defaults live in the filter chain.
        return []
    return [
        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
        "-c:a", "aac", "-b:a", "192k",
    ]


def build_ffmpeg_cmd(local_files: list[str], plan: dict) -> str:
    """Build a one-line ffmpeg command (without the output path) from an edit plan.

    The output path is appended by the server. ``local_files[0]`` is the
    main input; an empty ``local_files`` raises ``IndexError``.

    Plan keys read here (all optional):
        logo_overlay: dict with ``file``, ``scale``, ``opacity``, ``pos``.
        audio_mix: list of dicts with ``file``, ``volume`` (default 0.2) and
            ``loop``. Entries without ``file`` are skipped; ``loop`` is
            accepted but currently has no effect.
        trim: dict with ``start`` / ``end``, emitted as ``-ss`` / ``-to``.
        effects: dict with ``video_filters`` / ``audio_filters`` lists.
        resize, crop, fps: values for the ``scale``, ``crop``, ``fps`` filters.
        subtitles: dict with ``file``.
        text_overlays: list of dicts with ``text`` and ``pos``.
        output_ext: ``mp4`` (default), ``mp3``, ``wav``, ``gif``; any other
            value gets the same H.264/AAC flags as ``mp4``.

    Returns:
        The command tokens joined by single spaces.
    """
    main = local_files[0]
    out_ext = (plan.get("output_ext") or "mp4").lower()
    want_video = out_ext not in ("mp3", "wav")
    want_audio = out_ext != "gif"

    logo = plan.get("logo_overlay") or {}
    logo_file = logo.get("file")
    tracks = [t for t in (plan.get("audio_mix") or []) if t.get("file")]

    # Register every distinct file once and remember its ffmpeg input index.
    # Filter pads are looked up here instead of assuming "logo is 1, audio
    # follows", which breaks when a file repeats or equals the main input.
    input_index = {main: 0}
    cmd = ["ffmpeg", "-i", main]
    extra_files = ([logo_file] if logo_file else []) + [t["file"] for t in tracks]
    for path in extra_files:
        if path not in input_index:
            input_index[path] = len(input_index)
            cmd += ["-i", path]

    # Trim flags come after every -i, so ffmpeg treats them as output
    # options: they trim the final (filtered / mixed) result.
    trim = plan.get("trim") or {}
    if trim.get("start") is not None:
        cmd += ["-ss", str(trim["start"])]
    if trim.get("end") is not None:
        cmd += ["-to", str(trim["end"])]

    # Base filter chains applied to the main input.
    effects = plan.get("effects") or {}
    vf = list(effects.get("video_filters") or [])
    af = list(effects.get("audio_filters") or [])
    if plan.get("resize"):
        vf.append(f"scale={plan['resize']}")
    if plan.get("crop"):
        vf.append(f"crop={plan['crop']}")
    if plan.get("fps"):
        vf.append(f"fps={int(plan['fps'])}")
    subs = plan.get("subtitles") or {}
    if subs.get("file"):
        vf.append(f"subtitles={subs['file']}")
    for overlay in plan.get("text_overlays") or []:
        vf.append(_drawtext(overlay.get("text", ""), overlay.get("pos", "bottom-center")))
    if out_ext == "gif":
        # GIF defaults join the single video chain (a second -vf would replace
        # the first, and -vf cannot be combined with -filter_complex). An
        # explicit fps / resize in the plan takes precedence over the default.
        if not plan.get("fps"):
            vf.append("fps=12")
        if not plan.get("resize"):
            vf.append("scale=640:-1:flags=lanczos")

    use_logo = want_video and bool(logo_file)
    use_mix = want_audio and bool(tracks)

    if not (use_logo or use_mix):
        # Simple case: one input feeds each output stream, so -vf / -af suffice.
        if want_video and vf:
            cmd += ["-vf", ",".join(vf)]
        if want_audio and af:
            cmd += ["-af", ",".join(af)]
        cmd += _output_codec_args(out_ext)
        # NOTE(review): tokens are joined without shell quoting; a token that
        # contains spaces or shell metacharacters depends on how the consumer
        # splits this string. Confirm against the server-side runner.
        return " ".join(cmd)

    # filter_complex path
    graph = []
    maps = []

    if want_video:
        video = "[0:v]"
        if vf:
            graph.append(f"{video}{','.join(vf)}[v0]")
            video = "[v0]"
        if use_logo:
            logo_pad = f"[{input_index[logo_file]}:v]"
            scale = (logo.get("scale") or "").strip()
            if scale:
                graph.append(f"{logo_pad}scale={scale}[lg]")
                logo_pad = "[lg]"
            opacity = logo.get("opacity")
            if opacity is not None:
                # colorchannelmixer's aa option scales the alpha channel.
                graph.append(f"{logo_pad}colorchannelmixer=aa={float(opacity)}[lga]")
                logo_pad = "[lga]"
            xy = _pos_expr(logo.get("pos", "top-right"))
            graph.append(f"{video}{logo_pad}overlay={xy}[v1]")
            video = "[v1]"
        # A bracketed -map value names a filtergraph output label. When no
        # filter touched the video, map the input stream itself ("?" makes it
        # optional so an input without video does not abort the run).
        maps += ["-map", "0:v?" if video == "[0:v]" else video]

    if want_audio:
        audio = "[0:a]"
        if af:
            graph.append(f"{audio}{','.join(af)}[a0]")
            audio = "[a0]"
        mix_pads = [audio]
        for i, track in enumerate(tracks):
            src = f"[{input_index[track['file']]}:a]"
            graph.append(f"{src}volume={float(track.get('volume', 0.2))}[am{i}]")
            mix_pads.append(f"[am{i}]")
        if len(mix_pads) > 1:
            graph.append(
                f"{''.join(mix_pads)}amix=inputs={len(mix_pads)}"
                ":duration=longest:dropout_transition=2[aout]"
            )
            audio = "[aout]"
        maps += ["-map", "0:a?" if audio == "[0:a]" else audio]

    cmd += ["-filter_complex", ";".join(graph)]
    cmd += maps
    cmd += _output_codec_args(out_ext)
    # NOTE(review): same quoting caveat as the simple path above.
    return " ".join(cmd)