""" Video Processing Engine — FFmpeg-based pipeline for Shorts editing. Uses direct subprocess calls for maximum control and error visibility. Every FFmpeg operation captures stderr, checks return codes, and reports meaningful errors to the caller. """ import json import logging import os import re import shutil import subprocess import tempfile import unicodedata from pathlib import Path from PIL import Image, ImageDraw, ImageFont logger = logging.getLogger("ShortsEditor.Processor") TARGET_WIDTH = 1080 TARGET_HEIGHT = 1920 PROJECT_ROOT = Path(__file__).resolve().parent.parent BUNDLED_EMOJI_FONT = PROJECT_ROOT / "assets" / "fonts" / "NotoColorEmoji_WindowsCompatible.ttf" LOOK_PRESETS = { "warm_cinematic": { "eq": {"contrast": 1.12, "saturation": 1.16, "brightness": 0.015, "gamma": 1.03}, "pulse": {"contrast": 0.03, "saturation": 0.08, "gamma": 0.025}, "colorbalance": {"rs": 0.10, "gs": 0.02, "bs": -0.07, "rm": 0.05, "bm": -0.02}, "vignette": 0.22, "sharpen": 0.75, }, "cool_teal": { "eq": {"contrast": 1.10, "saturation": 1.10, "brightness": 0.008, "gamma": 1.01}, "pulse": {"contrast": 0.025, "saturation": 0.06, "gamma": 0.018}, "colorbalance": {"rs": -0.04, "gs": 0.03, "bs": 0.10, "gm": 0.02, "bm": 0.04}, "vignette": 0.18, "sharpen": 0.65, }, "muted_drama": { "eq": {"contrast": 1.15, "saturation": 0.88, "brightness": -0.005, "gamma": 1.04}, "pulse": {"contrast": 0.02, "saturation": 0.04, "gamma": 0.02}, "colorbalance": {"rs": 0.04, "gs": 0.02, "bs": -0.05, "rm": 0.03, "bm": -0.03}, "vignette": 0.26, "sharpen": 0.7, }, "black_white": { "eq": {"contrast": 1.18, "saturation": 0.0, "brightness": 0.01, "gamma": 1.05}, "pulse": {"contrast": 0.025, "saturation": 0.0, "gamma": 0.015}, "colorbalance": {}, "vignette": 0.28, "sharpen": 0.8, }, } DEFAULT_DRAW_FONT_FAMILIES = "DejaVu Sans,Noto Sans,Arial,Helvetica" EXTENDED_DRAW_FONT_FAMILIES = "Noto Sans,DejaVu Sans,Noto Emoji,Noto Color Emoji,Segoe UI Emoji,Apple Color Emoji,Symbola" # 
--------------------------------------------------------------------------- # Exceptions # --------------------------------------------------------------------------- class ProcessingError(Exception): """Raised when any step in the video pipeline fails.""" pass class FFmpegNotFoundError(ProcessingError): """Raised when FFmpeg/FFprobe is not available on the system.""" pass # --------------------------------------------------------------------------- # Utility helpers # --------------------------------------------------------------------------- def _find_ffmpeg(): """Return the path to ffmpeg, or raise if not found.""" path = shutil.which("ffmpeg") if path is None: raise FFmpegNotFoundError( "FFmpeg is not installed or not in PATH.\n" "Download from https://ffmpeg.org/download.html and add to PATH." ) return path def _find_ffprobe(): """Return the path to ffprobe, or raise if not found.""" path = shutil.which("ffprobe") if path is None: raise FFmpegNotFoundError( "FFprobe is not installed or not in PATH.\n" "It usually comes bundled with FFmpeg." ) return path def _run_ffmpeg(args: list, description: str, duration: float = None, progress_callback=None, progress_range: tuple = None): """ Run an FFmpeg command with full error capture. Parameters ---------- args : list Full command list (including 'ffmpeg' as first element). description : str Human-readable name of this step (for error messages). duration : float, optional Total duration in seconds (for progress calculation). progress_callback : callable, optional Function(percent: float, status: str) to report progress. progress_range : tuple, optional (start_pct, end_pct) — the portion of overall progress this step covers. Raises ------ ProcessingError If FFmpeg returns a non-zero exit code. 
""" logger.info(f"[{description}] Running: {' '.join(args)}") process = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, errors="replace", creationflags=subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0, ) stderr_lines = [] start_pct = progress_range[0] if progress_range else 0 end_pct = progress_range[1] if progress_range else 100 # Read stderr line-by-line for progress parsing for line in process.stderr: stderr_lines.append(line) # Parse progress from FFmpeg output: "time=00:01:23.45" if duration and progress_callback and "time=" in line: match = re.search(r"time=(\d+):(\d+):(\d+\.\d+)", line) if match: h, m, s = float(match.group(1)), float(match.group(2)), float(match.group(3)) current_time = h * 3600 + m * 60 + s step_progress = min(current_time / duration, 1.0) overall_pct = start_pct + step_progress * (end_pct - start_pct) progress_callback(overall_pct, description) process.wait() if process.returncode != 0: stderr_text = "".join(stderr_lines[-30:]) # Last 30 lines for context logger.error(f"[{description}] FFmpeg failed (code {process.returncode}):\n{stderr_text}") raise ProcessingError( f"{description} failed.\n\n" f"FFmpeg exit code: {process.returncode}\n" f"Error output:\n{stderr_text}" ) logger.info(f"[{description}] Completed successfully.") def probe_video(input_path: str) -> dict: """ Use ffprobe to extract video metadata. 
def process_video(
    input_path: str,
    output_path: str,
    options: dict,
    progress_callback=None,
    temp_dir: str = None,
):
    """
    Main processing pipeline. Orchestrates all editing steps.

    Validates the input file, probes it, runs the FFmpeg filter pipeline and
    finally verifies that a non-empty output file was produced.

    Parameters
    ----------
    input_path : str
        Path to the source video file.
    output_path : str
        Path for the final exported MP4. Its parent directory is created
        when it does not exist yet.
    options : dict
        {
            "crop": bool,               # Crop to 9:16
            "crop_position": float,     # 0.0 (top) to 1.0 (bottom), default 0.5 (center)
            "source_rotation": str,     # none|cw|ccw|180
            "source_fit_mode": str,     # cover|contain
            "source_pan_x": float,      # -1.0 to 1.0 manual horizontal framing
            "source_pan_y": float,      # -1.0 to 1.0 manual vertical framing
            "source_zoom": float,       # 0.6 to 2.5 manual zoom
            "look_preset": str,         # Cinematic grading preset
            "look_strength": float,     # 0.0 to 1.0
            "look_motion": float,       # 0.0 to 1.0 subtle animated mood shift
            "text_mode": str,           # none|center_title|premium_subtitle|top_commentary
            "text_primary": str,        # Main template text
            "text_secondary": str,      # Optional second line
            "text_accent_color": str,   # Hex color for highlighted text
            "text_scale": float,        # 0.7 to 1.4
            "text_box": dict,           # Normalized x/y/w/h placement box
            "captions": bool,           # Burn subtitles
            "caption_path": str or None,  # Path to .ass or .srt subtitles
            "caption_format": str,      # "ass" or "srt"
            "music": bool,              # Add background music
            "music_path": str or None,  # Path to music file
            "music_volume": float,      # 0.0 to 1.0, default 0.2
            "tint": bool,               # Apply color tint
            "tint_color": str,          # Hex color e.g. "#FF0000"
            "tint_opacity": float,      # 0.0 to 1.0, default 0.2
            "watermark": bool,          # Add channel name
            "channel_name": str,        # Text to display
            "export_quality": str,      # "high", "balanced", or "fast"
        }
        The pipeline additionally reads these optional keys:
        "source_prepared", "input_start", "input_end", "text_highlight",
        "highlight_color", "text_bold", "top_text_scale", "channel_position",
        "srt_path" (used when "caption_path" is missing), "duck_music",
        "ducking_strength" and "audio_boost".
    progress_callback : callable, optional
        Function(percent: float, status: str).
    temp_dir : str, optional
        Directory for temp files. Created if needed. Only a directory that
        this function created itself (``temp_dir=None``) is removed afterwards;
        a caller-supplied directory is left in place.

    Raises
    ------
    FFmpegNotFoundError
        If ffmpeg/ffprobe cannot be located.
    ProcessingError
        If the input is missing or empty, the output directory cannot be
        created, any pipeline step fails, or no usable output was written.
    """
    ffmpeg = _find_ffmpeg()

    # --- Validate input ---
    if not os.path.isfile(input_path):
        raise ProcessingError(f"Input file not found: {input_path}")
    if os.path.getsize(input_path) == 0:
        raise ProcessingError("Input file is empty (0 bytes).")

    if progress_callback:
        progress_callback(1, "Analyzing video...")

    info = probe_video(input_path)
    logger.info(f"Video info: {info}")

    # --- Make sure the destination folder exists ---
    # FFmpeg does not create missing directories and would otherwise fail
    # with an opaque "No such file or directory" deep inside the pipeline.
    output_dir = os.path.dirname(os.path.abspath(output_path))
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError as exc:
        raise ProcessingError(f"Cannot create output directory: {output_dir}") from exc

    # --- Setup temp dir ---
    own_temp = temp_dir is None
    if own_temp:
        temp_dir = tempfile.mkdtemp(prefix="shorts_editor_")
    else:
        os.makedirs(temp_dir, exist_ok=True)

    try:
        _run_pipeline(
            ffmpeg, input_path, output_path, options, info, temp_dir, progress_callback
        )
    finally:
        # Always clean temp files we created ourselves. rmtree(ignore_errors=True)
        # never raises, so check the result instead of catching exceptions.
        if own_temp:
            shutil.rmtree(temp_dir, ignore_errors=True)
            if os.path.isdir(temp_dir):
                logger.warning(f"Failed to clean temp dir: {temp_dir}")
            else:
                logger.info(f"Cleaned temp directory: {temp_dir}")

    # --- Verify output ---
    if not os.path.isfile(output_path):
        raise ProcessingError("Processing completed but output file was not created.")

    out_size = os.path.getsize(output_path)
    if out_size == 0:
        os.remove(output_path)
        raise ProcessingError("Processing completed but output file is empty.")

    if progress_callback:
        progress_callback(100, "Done!")

    logger.info(f"Processing complete. Output: {output_path} ({out_size / 1024 / 1024:.1f} MB)")
options.get("watermark", False) channel_name = options.get("channel_name", "") channel_position = str(options.get("channel_position", "lower_left_overlay") or "lower_left_overlay").strip().lower() captions_enabled = options.get("captions", False) caption_path = options.get("caption_path") or options.get("srt_path") caption_format = options.get("caption_format", "") music_enabled = options.get("music", False) music_path = options.get("music_path", None) music_volume = options.get("music_volume", 0.2) duck_music = bool(options.get("duck_music", True)) ducking_strength = _clamp_float(options.get("ducking_strength", 0.7), 0.0, 1.0) audio_boost = _clamp_float(options.get("audio_boost", 1.0), 1.0, 2.5) export_quality = str(options.get("export_quality", "high")).strip().lower() if input_start is not None: input_start = _clamp_float(input_start, 0.0, duration) if input_end is not None: input_end = _clamp_float(input_end, 0.0, duration) if input_start is not None or input_end is not None: trim_start = input_start or 0.0 trim_end = input_end if input_end is not None else duration if trim_end <= trim_start: raise ProcessingError("Selected trim range is invalid.") duration = trim_end - trim_start # ---- Build video filter chain ---- vfilters = [] if not source_prepared: normalization_mode = source_fit_mode if crop_enabled else "contain" vfilters.extend( _build_source_normalization_filters( src_w, src_h, crop_position=crop_pos, fit_mode=normalization_mode, rotation=source_rotation, pan_x=source_pan_x, pan_y=source_pan_y, zoom=source_zoom, final_scale=False, ) ) else: vfilters.append("setsar=1") # Step 1: Crop to 9:16 if False and crop_enabled: target_ratio = 9 / 16 src_ratio = src_w / src_h if src_ratio > target_ratio: # Video is wider than 9:16 — crop horizontally crop_h = src_h crop_w = int(src_h * target_ratio) # Center horizontally (crop_position not relevant for horizontal crop) x_offset = (src_w - crop_w) // 2 y_offset = 0 
vfilters.append(f"crop={crop_w}:{crop_h}:{x_offset}:{y_offset}") elif src_ratio < target_ratio: # Video is taller than 9:16 — crop vertically crop_w = src_w crop_h = int(src_w / target_ratio) # Use crop_position to determine vertical offset max_offset = src_h - crop_h y_offset = int(max_offset * crop_pos) x_offset = 0 vfilters.append(f"crop={crop_w}:{crop_h}:{x_offset}:{y_offset}") # else: already 9:16, no crop needed # Step 2: Resize to 1080x1920 vfilters.append(f"scale={TARGET_WIDTH}:{TARGET_HEIGHT}:flags=lanczos") # Ensure even dimensions (required by most codecs) vfilters.append("setsar=1") # Step 3: Cinematic grading and subtle color mood motion vfilters.extend(_build_look_filters(look_preset, look_strength, look_motion)) # Step 4: Optional extra tint overlay if tint_enabled and tint_color: hex_clean = tint_color.lstrip("#") try: int(hex_clean[0:2], 16) int(hex_clean[2:4], 16) int(hex_clean[4:6], 16) except (ValueError, IndexError): logger.warning(f"Invalid tint color '{tint_color}', skipping tint.") tint_enabled = False if tint_enabled: opacity = max(0.0, min(1.0, tint_opacity)) vfilters.append( f"drawbox=x=0:y=0:w=iw:h=ih:color=0x{hex_clean}@{opacity}:t=fill" ) # Step 5: Template-driven permanent text text_overlay = _build_text_overlay( text_mode=text_mode, primary=text_primary, secondary=text_secondary, highlight_text=text_highlight, accent_color=text_accent_color, highlight_color=highlight_color, text_bold=text_bold, text_scale=text_scale, top_text_scale=top_text_scale, text_box=text_box, temp_dir=temp_dir, ) # Step 6: Watermark (channel name) if watermark_enabled and channel_name.strip(): vfilters.append(_build_watermark_filter(channel_name.strip(), channel_position)) # Step 7: Captions (subtitles) if captions_enabled and caption_path and os.path.isfile(caption_path): caption_format = (caption_format or Path(caption_path).suffix.lstrip(".")).lower() escaped_caption_path = _escape_filter_path(caption_path) if caption_format == "ass": 
vfilters.append(f"ass='{escaped_caption_path}'") else: vfilters.append(f"subtitles='{escaped_caption_path}'") elif captions_enabled and (caption_path is None or not os.path.isfile(caption_path or "")): logger.warning("Captions enabled but no caption file found. Skipping captions.") # ---- Build audio filter chain ---- # We need to handle: original audio + optional background music audio_inputs = [] audio_filters = [] input_args = [] if input_start is not None: input_args.extend(["-ss", f"{input_start:.3f}"]) if input_start is not None or input_end is not None: input_args.extend(["-t", f"{duration:.3f}"]) input_args.extend(["-i", input_path]) input_count = 1 if music_enabled and music_path and os.path.isfile(music_path): # Add music as second input, loop it input_args.extend(["-stream_loop", "-1", "-i", music_path]) music_idx = input_count input_count += 1 vol = max(0.0, min(1.0, music_volume)) duck_threshold = 0.08 - (ducking_strength * 0.06) duck_ratio = 1.5 + (ducking_strength * 10.5) duck_attack = 12 + int((1.0 - ducking_strength) * 40) duck_release = 220 + int((1.0 - ducking_strength) * 240) duck_makeup = 1.0 + ducking_strength * 0.3 if has_audio: # Mix original audio + music if duck_music: audio_filters.append( f"[0:a]volume={audio_boost:.2f},aformat=sample_fmts=fltp:sample_rates=48000:channel_layouts=stereo[dry];" f"[{music_idx}:a]volume={vol},atrim=0:{duration},asetpts=PTS-STARTPTS," "aformat=sample_fmts=fltp:sample_rates=48000:channel_layouts=stereo[bgm];" f"[bgm][dry]sidechaincompress=threshold={duck_threshold:.3f}:ratio={duck_ratio:.2f}:" f"attack={duck_attack}:release={duck_release}:makeup={duck_makeup:.2f}[ducked];" "[dry][ducked]amix=inputs=2:duration=first:dropout_transition=2," "alimiter=limit=0.95[aout]" ) else: audio_filters.append( f"[0:a]volume={audio_boost:.2f},aformat=sample_fmts=fltp:sample_rates=48000:channel_layouts=stereo[dry];" f"[{music_idx}:a]volume={vol},atrim=0:{duration},asetpts=PTS-STARTPTS," 
"aformat=sample_fmts=fltp:sample_rates=48000:channel_layouts=stereo[bgm];" "[dry][bgm]amix=inputs=2:duration=first:dropout_transition=2," "alimiter=limit=0.95[aout]" ) else: # Only music (no original audio) audio_filters.append( f"[{music_idx}:a]volume={vol},atrim=0:{duration},asetpts=PTS-STARTPTS[aout]" ) elif has_audio: # Just pass through original audio audio_filters.append(f"[0:a]volume={audio_boost:.2f},alimiter=limit=0.95[aout]") # else: no audio at all # ---- Combine into final FFmpeg command ---- vfilter_str = ",".join(vfilters) if vfilters else "null" # Build complex filter graph filter_parts = [] filter_parts.append(f"[0:v]{vfilter_str}[vbase]") if text_overlay: overlay_path = _escape_filter_path(text_overlay["path"]) filter_parts.append(f"movie='{overlay_path}',format=rgba[text_ov]") filter_parts.append( f"[vbase][text_ov]overlay=" f"x={text_overlay['x']}:y={text_overlay['y']}:format=auto[vout]" ) else: filter_parts.append("[vbase]null[vout]") if audio_filters: filter_parts.extend(audio_filters) filter_graph = ";".join(filter_parts) cmd = [ffmpeg, "-y"] # Overwrite output cmd.extend(input_args) cmd.extend(["-filter_complex", filter_graph]) cmd.extend(["-map", "[vout]"]) if audio_filters: cmd.extend(["-map", "[aout]"]) # Output settings quality_map = { "high": {"preset": "slow", "crf": "18"}, "balanced": {"preset": "medium", "crf": "20"}, "fast": {"preset": "veryfast", "crf": "23"}, } quality_settings = quality_map.get(export_quality, quality_map["high"]) cmd.extend([ "-c:v", "libx264", "-preset", quality_settings["preset"], "-crf", quality_settings["crf"], "-profile:v", "high", "-level", "4.1", "-r", "30", "-pix_fmt", "yuv420p", ]) if audio_filters: cmd.extend(["-c:a", "aac", "-b:a", "192k"]) cmd.extend([ "-movflags", "+faststart", "-t", str(duration), # Ensure output matches source duration output_path, ]) if progress_callback: progress_callback(5, "Processing video...") _run_ffmpeg( cmd, description="Video processing", duration=duration, 
def extract_audio(input_path: str, output_wav_path: str):
    """
    Write the audio track of a video to a WAV file for transcription.

    The output is 16-bit PCM, 16 kHz, mono.

    Parameters
    ----------
    input_path : str
        Path to the video file.
    output_wav_path : str
        Path where the WAV file will be saved (overwritten if present).
    """
    wav_encoding = [
        "-vn",                   # No video
        "-acodec", "pcm_s16le",  # WAV format
        "-ar", "16000",          # 16kHz (Whisper optimal)
        "-ac", "1",              # Mono
    ]
    command = [_find_ffmpeg(), "-y", "-i", input_path, *wav_encoding, output_wav_path]
    _run_ffmpeg(command, description="Extracting audio")
max(len(normalized_segments), 1) for index, segment in enumerate(normalized_segments): source = source_videos[segment["video_index"]] info = probe_video(source["path"]) segment_output = os.path.join(temp_dir, f"segment_{index:03d}.mp4") segment_filters = ",".join( _build_source_normalization_filters( info["width"], info["height"], crop_position=crop_position, fit_mode=source_fit_mode, rotation=source_rotation, pan_x=source_pan_x, pan_y=source_pan_y, zoom=source_zoom, ) ) cmd = [ ffmpeg, "-y", "-ss", f"{segment['start']:.3f}", "-to", f"{segment['end']:.3f}", "-i", source["path"], ] if not info["has_audio"]: cmd.extend( [ "-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=48000", ] ) cmd.extend( [ "-vf", segment_filters, "-r", "30", "-c:v", "libx264", "-preset", "superfast", "-crf", "20", "-pix_fmt", "yuv420p", ] ) if info["has_audio"]: cmd.extend(["-c:a", "aac", "-b:a", "192k", "-ar", "48000", "-ac", "2"]) else: cmd.extend( [ "-map", "0:v:0", "-map", "1:a:0", "-shortest", "-c:a", "aac", "-b:a", "96k", "-ar", "48000", "-ac", "2", ] ) cmd.extend(["-movflags", "+faststart", segment_output]) _run_ffmpeg( cmd, description=f"Preparing source segment {index + 1}/{len(normalized_segments)}", duration=segment["end"] - segment["start"], progress_callback=progress_callback, progress_range=( start_pct + per_segment_span * index, start_pct + per_segment_span * (index + 1), ), ) segment_files.append(segment_output) concat_file = os.path.join(temp_dir, "concat.txt") with open(concat_file, "w", encoding="utf-8") as handle: for segment_file in segment_files: escaped = segment_file.replace("'", "'\\''") handle.write(f"file '{escaped}'\n") merge_cmd = [ ffmpeg, "-y", "-f", "concat", "-safe", "0", "-i", concat_file, "-c", "copy", "-movflags", "+faststart", output_path, ] _run_ffmpeg( merge_cmd, description="Merging raw clip", progress_callback=progress_callback, progress_range=(merge_start, end_pct), ) finally: shutil.rmtree(temp_dir, ignore_errors=True) def 
_normalize_segments(source_videos: list, segments: list) -> list: """Validate and normalize timeline segment input.""" if not segments: normalized = [] for idx, source in enumerate(source_videos): info = probe_video(source["path"]) normalized.append( { "video_index": idx, "start": 0.0, "end": info["duration"], } ) return normalized normalized = [] for raw in segments: try: video_index = int(raw.get("video_index", 0)) start = float(raw.get("start", 0.0)) end = float(raw.get("end", 0.0)) except (TypeError, ValueError): raise ProcessingError("One or more source segments are invalid.") if video_index < 0 or video_index >= len(source_videos): raise ProcessingError("A source segment references a missing video.") info = probe_video(source_videos[video_index]["path"]) start = max(0.0, min(start, info["duration"])) end = max(0.0, min(end, info["duration"])) if end <= start: raise ProcessingError("Each source segment must have an end time after its start time.") normalized.append( { "video_index": video_index, "start": start, "end": end, } ) return normalized def _expand_segments_by_silence( ffmpeg: str, source_videos: list, segments: list, silence_threshold_db: float, min_silence_duration: float, silence_padding: float, ) -> list: """Split segments around detected dead-silent intervals.""" expanded = [] silence_cache = {} for segment in segments: video_index = int(segment["video_index"]) source = source_videos[video_index] info = probe_video(source["path"]) if not info["has_audio"]: expanded.append(segment) continue if video_index not in silence_cache: silence_cache[video_index] = _detect_silence_intervals( ffmpeg=ffmpeg, input_path=source["path"], threshold_db=silence_threshold_db, min_duration=min_silence_duration, max_duration=info["duration"], ) kept_ranges = _subtract_silence_from_range( start=float(segment["start"]), end=float(segment["end"]), silences=silence_cache[video_index], padding=silence_padding, ) if kept_ranges: for keep_start, keep_end in kept_ranges: 
expanded.append( { "video_index": video_index, "start": keep_start, "end": keep_end, } ) else: expanded.append(segment) return expanded def _detect_silence_intervals( ffmpeg: str, input_path: str, threshold_db: float, min_duration: float, max_duration: float, ) -> list[tuple[float, float]]: """Use ffmpeg silencedetect to find dead-silent intervals.""" attempts = [ (threshold_db, min_duration), (max(threshold_db + 8.0, -35.0), max(0.2, min_duration * 0.85)), ] best_intervals = [] for noise_db, duration in attempts: cmd = [ ffmpeg, "-hide_banner", "-i", input_path, "-af", f"silencedetect=noise={noise_db:.1f}dB:d={duration:.2f}", "-f", "null", "-", ] proc = subprocess.run(cmd, capture_output=True, text=True) stderr = proc.stderr or "" silence_start_pattern = re.compile(r"silence_start:\s*([0-9.]+)") silence_end_pattern = re.compile(r"silence_end:\s*([0-9.]+)") intervals = [] current_start = None for line in stderr.splitlines(): match_start = silence_start_pattern.search(line) if match_start: current_start = float(match_start.group(1)) continue match_end = silence_end_pattern.search(line) if match_end and current_start is not None: end_time = float(match_end.group(1)) if end_time > current_start: intervals.append((current_start, end_time)) current_start = None if current_start is not None and max_duration > current_start: intervals.append((current_start, max_duration)) if intervals: return intervals best_intervals = intervals return best_intervals def _subtract_silence_from_range( start: float, end: float, silences: list[tuple[float, float]], padding: float, ) -> list[tuple[float, float]]: """Keep only the non-silent subranges inside a segment.""" if end <= start: return [] padded_silences = [] for silence_start, silence_end in silences: trimmed_start = max(start, silence_start + padding) trimmed_end = min(end, silence_end - padding) if trimmed_end - trimmed_start > 0.05: padded_silences.append((trimmed_start, trimmed_end)) if not padded_silences: return [(start, end)] 
keep_ranges = [] cursor = start for silence_start, silence_end in padded_silences: if silence_start > cursor: keep_ranges.append((cursor, silence_start)) cursor = max(cursor, silence_end) if cursor < end: keep_ranges.append((cursor, end)) return [(seg_start, seg_end) for seg_start, seg_end in keep_ranges if seg_end - seg_start > 0.08] def _build_source_normalization_filters( src_w: int, src_h: int, crop_position: float, fit_mode: str = "cover", rotation: str = "none", pan_x: float = 0.0, pan_y: float = 0.0, zoom: float = 1.0, final_scale: bool = True, ) -> list: """Normalize a source clip into the vertical shorts frame.""" filters = [] fit_mode = str(fit_mode or "cover").strip().lower() rotation = str(rotation or "none").strip().lower() pan_x = _clamp_float(pan_x, -1.0, 1.0) pan_y = _clamp_float(pan_y, -1.0, 1.0) zoom = _clamp_float(zoom, 0.6, 2.5) eff_w, eff_h = src_w, src_h if rotation == "cw": filters.append("transpose=1") eff_w, eff_h = src_h, src_w elif rotation == "ccw": filters.append("transpose=2") eff_w, eff_h = src_h, src_w elif rotation == "180": filters.append("rotate=PI") if fit_mode == "contain": base_scale = min(TARGET_WIDTH / eff_w, TARGET_HEIGHT / eff_h) else: base_scale = max(TARGET_WIDTH / eff_w, TARGET_HEIGHT / eff_h) scale_factor = base_scale * zoom scaled_w = max(2, int(round(eff_w * scale_factor / 2)) * 2) scaled_h = max(2, int(round(eff_h * scale_factor / 2)) * 2) filters.append(f"scale={scaled_w}:{scaled_h}:flags=lanczos") current_w = scaled_w current_h = scaled_h if current_w > TARGET_WIDTH: x_offset = int(round((current_w - TARGET_WIDTH) * ((pan_x + 1.0) / 2.0))) x_offset = max(0, min(current_w - TARGET_WIDTH, x_offset)) filters.append(f"crop={TARGET_WIDTH}:{current_h}:{x_offset}:0") current_w = TARGET_WIDTH if current_h > TARGET_HEIGHT: if current_h == scaled_h and current_w == TARGET_WIDTH: y_offset = int(round((current_h - TARGET_HEIGHT) * ((pan_y + 1.0) / 2.0))) else: fallback = _clamp_float(crop_position, 0.0, 1.0) y_offset = 
def _build_look_filters(preset_name: str, strength: float, motion: float) -> list:
    """
    Return FFmpeg filters for cinematic grading and subtle mood motion.

    Parameters
    ----------
    preset_name : str
        Key into LOOK_PRESETS; unknown names fall back to "warm_cinematic".
    strength : float
        Scales how far the grade moves away from neutral (1.0 = full preset).
    motion : float
        Scales the amplitude of the slow sinusoidal pulse applied to
        contrast, saturation and gamma.

    Returns
    -------
    list of str
        Filter strings in application order: pixel format, eq, optional
        colorbalance, vignette, unsharp.
    """
    preset = LOOK_PRESETS.get(preset_name, LOOK_PRESETS["warm_cinematic"])
    eq = preset["eq"]
    pulse = preset["pulse"]

    contrast = eq["contrast"] + (eq["contrast"] - 1.0) * (strength - 1.0)
    if eq["saturation"] == 0.0:
        # Black & white presets stay fully desaturated at any strength.
        saturation = 0.0
    else:
        saturation = 1.0 + (eq["saturation"] - 1.0) * strength
    brightness = eq["brightness"] * strength
    gamma = 1.0 + (eq["gamma"] - 1.0) * strength

    animated = False

    def _term(base: float, amplitude: float, rate: str) -> str:
        # Emit a time-varying expression only when the pulse is visible at
        # the printed precision; otherwise a constant is enough.
        nonlocal animated
        amplitude_text = f"{amplitude:.3f}"
        if float(amplitude_text) == 0.0:
            return f"'{base:.3f}'"
        animated = True
        return f"'{base:.3f}+{amplitude_text}*sin(t*{rate})'"

    eq_filter = (
        "eq="
        f"contrast={_term(contrast, pulse['contrast'] * motion, '0.55')}"
        f":saturation={_term(saturation, pulse['saturation'] * motion, '0.72')}"
        f":brightness='{brightness:.3f}'"
        f":gamma={_term(gamma, pulse['gamma'] * motion, '0.31')}"
    )
    if animated:
        # eq evaluates its expressions only once by default (eval=init), which
        # freezes the sin(t*...) pulse; per-frame evaluation makes it animate.
        eq_filter += ":eval=frame"

    filters = ["format=yuv420p", eq_filter]

    color_values = [
        f"{key}={value * strength:.3f}"
        for key, value in (preset.get("colorbalance") or {}).items()
    ]
    if color_values:
        filters.append("colorbalance=" + ":".join(color_values))

    vignette_strength = 0.08 + preset["vignette"] * strength
    sharpen_strength = max(0.15, preset["sharpen"] * (0.55 + 0.45 * strength))
    filters.append(f"vignette=angle=PI/{max(3.1, 5.2 - vignette_strength * 5.0):.3f}")
    filters.append(f"unsharp=5:5:{sharpen_strength:.3f}:5:5:0.0")
    return filters
bordercolor="black@0.55", shadowx=0, shadowy=0, shadowcolor="black@0.7", line_spacing=8, prefer_extended_glyphs=_contains_extended_glyphs(primary_text), ) ) if secondary_text: filters.append( _drawtext_filter( text=_normalize_text_line(secondary_text, uppercase=True), fontsize=int(40 * text_scale), fontcolor="white", x=f"{center_x}-text_w/2", y=f"{top + int(height * 0.5)}", borderw=3, bordercolor="black@0.65", shadowx=0, shadowy=0, shadowcolor="black@0.72", line_spacing=8, prefer_extended_glyphs=_contains_extended_glyphs(secondary_text), ) ) return filters def _drawtext_filter( text: str, fontsize: int, fontcolor: str, x: str, y: str, borderw: int, bordercolor: str, shadowx: int, shadowy: int, shadowcolor: str, line_spacing: int, prefer_extended_glyphs: bool = False, ) -> str: """Build a drawtext filter string.""" font_arg = _build_drawtext_font_arg(prefer_extended_glyphs) return ( f"drawtext={font_arg}" f":text='{_escape_drawtext_text(text)}'" f":fontsize={max(16, fontsize)}" f":fontcolor={fontcolor}" f":x={x}" f":y={y}" f":borderw={borderw}" f":bordercolor={bordercolor}" f":shadowx={shadowx}" f":shadowy={shadowy}" f":shadowcolor={shadowcolor}" f":line_spacing={line_spacing}" ":text_shaping=1" ) def _normalize_text_line(text: str, uppercase: bool) -> str: cleaned = re.sub(r"\s+", " ", text.strip()) return cleaned.upper() if uppercase else cleaned def _normalize_text_block(text: str, uppercase: bool) -> str: lines = [re.sub(r"\s+", " ", part.strip()) for part in text.replace("\r", "").split("\n")] lines = [line for line in lines if line] cleaned = r"\n".join(lines[:4]) return cleaned.upper() if uppercase else cleaned def _build_text_overlay( text_mode: str, primary: str, secondary: str, highlight_text: str, accent_color: str, highlight_color: str, text_bold: bool, text_scale: float, top_text_scale: float, text_box: dict, temp_dir: str, ): """Render permanent template text into a transparent PNG for reliable emoji support.""" if text_mode == "none": return None 
primary_text = _normalize_text_block(primary, uppercase=False) if not primary_text: return None os.makedirs(temp_dir, exist_ok=True) box = _normalize_text_box(text_box) width = max(240, int(box["w"] * TARGET_WIDTH)) height = max(96, int(box["h"] * TARGET_HEIGHT)) left = int(box["x"] * TARGET_WIDTH) top = int(box["y"] * TARGET_HEIGHT) if text_mode == "center_title": primary_size = max(28, int(74 * text_scale)) secondary_size = max(18, int(36 * text_scale)) elif text_mode == "premium_subtitle": primary_size = max(24, int(48 * text_scale)) secondary_size = max(20, int(40 * text_scale)) else: primary_size = max(22, int(34 * _clamp_float(text_scale * top_text_scale, 0.7, 2.4))) secondary_size = 0 accent = _normalize_hex_color(highlight_color, "#FF7B47") accent_text = _normalize_hex_color(accent_color, "#18D7FF") highlight_tokens = _build_highlight_token_set(highlight_text) image_path = os.path.join(temp_dir, "text_overlay.png") secondary_text = secondary.strip() try: _render_text_overlay_png( text_mode=text_mode, primary_text=primary_text, secondary_text=secondary_text, output_path=image_path, width=width, height=height, primary_font_size=primary_size, secondary_font_size=secondary_size, accent_color=accent_text, highlight_color=accent, highlight_tokens=highlight_tokens, bold=text_bold, ) except Exception as exc: logger.warning("Template text overlay render failed, falling back to no permanent text: %s", exc) return None return {"path": image_path, "x": left, "y": top} def _render_text_overlay_png( text_mode: str, primary_text: str, secondary_text: str, output_path: str, width: int, height: int, primary_font_size: int, secondary_font_size: int, accent_color: str, highlight_color: str, highlight_tokens: set[str], bold: bool, ): image = Image.new("RGBA", (width, height), (0, 0, 0, 0)) draw = ImageDraw.Draw(image) primary_font = _load_overlay_font(_find_bold_font_file() if bold else _find_regular_font_file(), primary_font_size) secondary_font = 
_load_overlay_font(_find_bold_font_file() if bold else _find_regular_font_file(), secondary_font_size or max(18, primary_font_size // 2)) emoji_font = _load_emoji_overlay_font(_find_overlay_emoji_font_file()) if text_mode == "top_commentary": _render_wrapped_line_block( image, draw, primary_text, 0, 0, width, height, primary_font, emoji_font, highlight_tokens=highlight_tokens, highlight_color=highlight_color, normal_color="white", line_gap=max(8, int(primary_font_size * 0.18)) ) else: top_padding = max(8, int(height * 0.12)) primary_height = max(24, int(height * (0.34 if text_mode == "center_title" else 0.28))) secondary_height = max(20, int(height * 0.2)) _render_wrapped_line_block( image, draw, _normalize_text_line(primary_text, uppercase=text_mode != "top_commentary"), 0, top_padding, width, primary_height, primary_font, emoji_font, highlight_tokens=highlight_tokens, highlight_color=highlight_color, normal_color=accent_color, line_gap=max(6, int(primary_font_size * 0.14)) ) if secondary_text: _render_wrapped_line_block( image, draw, _normalize_text_line(secondary_text, uppercase=text_mode == "premium_subtitle"), 0, top_padding + int(height * (0.44 if text_mode == "center_title" else 0.46)), width, secondary_height, secondary_font, emoji_font, highlight_tokens=set(), highlight_color=highlight_color, normal_color="white", line_gap=max(6, int(secondary_font_size * 0.14)) ) image.save(output_path) def _render_wrapped_line_block( image, draw, text: str, left: int, top: int, width: int, height: int, regular_font, emoji_font, highlight_tokens: set[str], highlight_color: str, normal_color: str, line_gap: int, ): max_text_width = max(120, width - 28) lines = _wrap_mixed_text(text, draw, regular_font, emoji_font, max_text_width) if not lines: lines = [text] measured_lines = [_measure_mixed_text(line, draw, regular_font, emoji_font, highlight_tokens) for line in lines] total_height = sum(item["height"] for item in measured_lines) + line_gap * max(0, len(measured_lines) - 
1) start_y = top + max(0, (height - total_height) // 2) current_y = start_y for line, metrics in zip(lines, measured_lines): current_x = left + max(0, (width - metrics["width"]) // 2) for run in _split_mixed_runs(line, highlight_tokens): if run["emoji"] and emoji_font: emoji_image = _render_emoji_run_image(run["text"], emoji_font, regular_font.size) run_width, run_height = emoji_image.size run_y = current_y + max(0, (metrics["height"] - run_height) // 2) image.alpha_composite(emoji_image, (current_x, run_y)) else: run_font = regular_font bbox = draw.textbbox((0, 0), run["text"], font=run_font) run_width = max(0, bbox[2] - bbox[0]) run_height = max(0, bbox[3] - bbox[1]) run_y = current_y + max(0, (metrics["height"] - run_height) // 2) draw.text((current_x, run_y + 2), run["text"], font=run_font, fill=(0, 0, 0, 180)) draw.text( (current_x, run_y), run["text"], font=run_font, fill=highlight_color if run.get("highlight") else normal_color, stroke_width=max(1, regular_font.size // 16), stroke_fill=(0, 0, 0, 170), ) current_x += run_width current_y += metrics["height"] + line_gap def _wrap_mixed_text(text: str, draw, regular_font, emoji_font, max_width: int) -> list[str]: lines = [] for raw_line in text.split(r"\n"): words = raw_line.split() if not words: continue current = words[0] for word in words[1:]: candidate = f"{current} {word}" if _measure_mixed_text(candidate, draw, regular_font, emoji_font)["width"] <= max_width: current = candidate else: lines.append(current) current = word lines.append(current) return lines[:4] def _measure_mixed_text(text: str, draw, regular_font, emoji_font, highlight_tokens: set[str] | None = None) -> dict: width = 0 height = 0 for run in _split_mixed_runs(text, highlight_tokens or set()): if run["emoji"] and emoji_font: emoji_image = _render_emoji_run_image(run["text"], emoji_font, regular_font.size) width += emoji_image.size[0] height = max(height, emoji_image.size[1]) else: bbox = draw.textbbox((0, 0), run["text"], font=regular_font) 
width += max(0, bbox[2] - bbox[0]) height = max(height, max(0, bbox[3] - bbox[1])) return {"width": width, "height": height or regular_font.size} def _split_mixed_runs(text: str, highlight_tokens: set[str]) -> list[dict]: runs = [] parts = re.split(r"(\s+)", text) for part in parts: if not part: continue if part.isspace(): runs.append({"text": part, "emoji": False, "highlight": False}) continue normalized = re.sub(r"[^\w]+", "", part, flags=re.UNICODE).lower() token_highlight = bool(normalized and normalized in highlight_tokens) current = [] current_is_emoji = None for char in part: char_is_emoji = _is_emoji_like_char(char) if current_is_emoji is None or char_is_emoji == current_is_emoji: current.append(char) current_is_emoji = char_is_emoji else: runs.append({"text": "".join(current), "emoji": current_is_emoji, "highlight": token_highlight and not current_is_emoji}) current = [char] current_is_emoji = char_is_emoji if current: runs.append({"text": "".join(current), "emoji": current_is_emoji, "highlight": token_highlight and not current_is_emoji}) return runs def _is_emoji_like_char(char: str) -> bool: if not char: return False codepoint = ord(char) if char in {"\u200d", "\ufe0f"}: return True return ( 0x1F300 <= codepoint <= 0x1FAFF or 0x2600 <= codepoint <= 0x27BF or unicodedata.category(char) == "So" ) def _load_overlay_font(font_path: str | None, font_size: int): if font_path and os.path.isfile(font_path): return ImageFont.truetype(font_path, font_size) return ImageFont.load_default() def _load_emoji_overlay_font(font_path: str | None): if font_path and os.path.isfile(font_path): for supported_size in (109,): try: return ImageFont.truetype(font_path, supported_size) except OSError: continue return None def _render_emoji_run_image(text: str, emoji_font, target_height: int) -> Image.Image: dummy = Image.new("RGBA", (1, 1), (0, 0, 0, 0)) probe = ImageDraw.Draw(dummy) bbox = probe.textbbox((0, 0), text, font=emoji_font, embedded_color=True) width = max(1, bbox[2] - 
bbox[0] + 8) height = max(1, bbox[3] - bbox[1] + 8) image = Image.new("RGBA", (width, height), (0, 0, 0, 0)) draw = ImageDraw.Draw(image) draw.text((4 - bbox[0], 4 - bbox[1]), text, font=emoji_font, embedded_color=True) cropped = image.getbbox() if cropped: image = image.crop(cropped) if image.height <= 0: return image scale = max(0.1, target_height / image.height) resized = image.resize( (max(1, int(round(image.width * scale))), max(1, int(round(image.height * scale)))), Image.LANCZOS, ) return resized def _find_regular_font_file() -> str | None: for font_path in [ r"C:\Windows\Fonts\segoeui.ttf", r"C:\Windows\Fonts\arial.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf", "/usr/share/fonts/truetype/liberation2/LiberationSans-Regular.ttf", ]: if os.path.isfile(font_path): return font_path return None def _find_bold_font_file() -> str | None: for font_path in [ r"C:\Windows\Fonts\arialbd.ttf", r"C:\Windows\Fonts\seguisb.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf", ]: if os.path.isfile(font_path): return font_path return _find_regular_font_file() def _find_overlay_emoji_font_file() -> str | None: candidates = [ str(BUNDLED_EMOJI_FONT), r"C:\Windows\Fonts\seguiemj.ttf", "/usr/share/fonts/truetype/noto/NotoColorEmoji.ttf", "/usr/share/fonts/truetype/noto/NotoEmoji-Regular.ttf", ] for font_path in candidates: if os.path.isfile(font_path): return font_path return None def _build_highlight_token_set(text: str) -> set[str]: tokens = set() for token in re.split(r"[\s,]+", str(text or "")): normalized = re.sub(r"[^\w]+", "", token, flags=re.UNICODE).lower() if normalized: tokens.add(normalized) return tokens def _build_watermark_filter(channel_name: str, position: str) -> str: safe_name = _escape_drawtext_text(channel_name.strip().replace("@", "", 1 if channel_name.startswith("@") else 0)) safe_name = "@" + safe_name.lstrip("@") 
position = str(position or "lower_left_overlay").strip().lower() font_path = _find_bold_font_file() font_clause = f":fontfile='{_escape_filter_path(font_path)}'" if font_path else "" if position == "bottom_center": x_expr = "(w-text_w)/2" y_expr = "h-th-60" fontsize = 28 opacity = "white@0.6" elif position == "center_overlay": x_expr = "(w-text_w)/2" y_expr = "h*0.54" fontsize = 34 opacity = "white@0.92" else: x_expr = "w*0.12" y_expr = "h*0.73" fontsize = 32 opacity = "white@0.95" return ( f"drawtext=text='{safe_name}'" f"{font_clause}" f":fontsize={fontsize}" f":fontcolor={opacity}" f":x={x_expr}" f":y={y_expr}" f":borderw=2" f":bordercolor=black@0.55" f":shadowx=0" f":shadowy=2" f":shadowcolor=black@0.72" ) def _contains_extended_glyphs(text: str) -> bool: for char in str(text or ""): if ord(char) > 0x7F: return True if unicodedata.category(char) in {"So", "Sk"}: return True return False def _build_drawtext_font_arg(prefer_extended_glyphs: bool) -> str: """Return a font clause that gives drawtext a better Unicode fallback path.""" font_file = _find_font_file(prefer_extended_glyphs) if font_file: return f"fontfile='{_escape_filter_path(font_file)}'" family = EXTENDED_DRAW_FONT_FAMILIES if prefer_extended_glyphs else DEFAULT_DRAW_FONT_FAMILIES return f"font='{_escape_drawtext_value(family)}'" def _find_font_file(prefer_extended_glyphs: bool) -> str | None: if prefer_extended_glyphs: return None common_fonts = [ r"C:\Windows\Fonts\segoeui.ttf", r"C:\Windows\Fonts\arial.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf", "/usr/share/fonts/truetype/liberation2/LiberationSans-Regular.ttf", ] for font_path in common_fonts: if os.path.isfile(font_path): return font_path return None def _escape_drawtext_text(text: str) -> str: escaped = text.replace("\\", r"\\") escaped = escaped.replace(":", r"\:") escaped = escaped.replace("'", r"\'") escaped = escaped.replace(",", r"\,") escaped = escaped.replace("%", r"\%") 
escaped = escaped.replace("[", r"\[") escaped = escaped.replace("]", r"\]") return escaped def _escape_drawtext_value(text: str) -> str: escaped = str(text or "").replace("\\", r"\\") escaped = escaped.replace(":", r"\:") escaped = escaped.replace("'", r"\'") escaped = escaped.replace(",", r"\,") return escaped def _escape_filter_path(path: str) -> str: """Escape a filesystem path for use in FFmpeg filter arguments.""" escaped = path.replace("\\", "/") escaped = escaped.replace(":", r"\:") escaped = escaped.replace("'", r"\'") escaped = escaped.replace("[", r"\[") escaped = escaped.replace("]", r"\]") escaped = escaped.replace(",", r"\,") return escaped def _normalize_hex_color(value: str, fallback: str) -> str: text = str(value or "").strip() if re.fullmatch(r"#[0-9a-fA-F]{6}", text): return text return fallback def _normalize_text_box(value) -> dict: fallback = {"x": 0.14, "y": 0.38, "w": 0.72, "h": 0.2} if not isinstance(value, dict): return fallback x = _clamp_float(value.get("x", fallback["x"]), 0.0, 0.88) y = _clamp_float(value.get("y", fallback["y"]), 0.0, 0.94) w = _clamp_float(value.get("w", fallback["w"]), 0.12, 1.0 - x) h = _clamp_float(value.get("h", fallback["h"]), 0.08, 1.0 - y) return {"x": x, "y": y, "w": w, "h": h} def _clamp_float(value, low: float, high: float) -> float: try: numeric = float(value) except (TypeError, ValueError): numeric = low return max(low, min(high, numeric))