Spaces:
Running
Running
| """ | |
| Caption Greenscreen Service — FastAPI + Async Jobs | |
| V6: 8 styles + dynamic colors + 6 animations (none, pop, bounce, slam, underline, typewriter, slide_in) + sliding_toggle + sliding_toggle_light | |
| """ | |
| import os, uuid, time, math, shutil, tempfile, subprocess | |
| from typing import Dict, List, Optional, Tuple | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from PIL import Image, ImageDraw, ImageFont, ImageFilter | |
| import cloudinary, cloudinary.uploader | |
| # ── CONFIG ────────────────────────────────────────────── | |
def _fetch_cloud_name():
    """Fetch the Cloudinary cloud name from the media proxy's ``/config`` endpoint.

    Makes up to three HTTPS GET attempts (10 s timeout each) and returns the
    first non-empty ``cloud_name`` found in the JSON response.

    Returns:
        The cloud name string.

    Raises:
        RuntimeError: if no attempt yields a non-empty name. The last
            underlying error is attached as ``__cause__``.
    """
    import json as _j
    import ssl as _ssl
    import urllib.request as _ur

    ctx = _ssl.create_default_context()
    req = _ur.Request("https://media.toolxp.org/config", headers={"User-Agent": "Mozilla/5.0"})
    last_err = None
    for _i in range(3):
        try:
            with _ur.urlopen(req, timeout=10, context=ctx) as r:
                name = _j.loads(r.read().decode())["cloud_name"]
            if name:
                print(f"[config] cloud_name={name}")
                return name
            # An empty value is a failed attempt too; record it so it gets logged.
            last_err = ValueError("empty cloud_name in config response")
        except Exception as _e:
            # Startup boundary: any network/parse/schema error just triggers a retry.
            last_err = _e
        print(f"[config] attempt {_i+1} failed: {last_err}")
    raise RuntimeError("[config] FATAL: could not fetch cloud_name after 3 attempts") from last_err
# Resolved once at import time via a network call; _fetch_cloud_name raises
# RuntimeError if the config endpoint cannot be read, which aborts the import.
CLOUD_NAME = _fetch_cloud_name()
# Cloudinary upload preset; overridable through the environment, defaults to "testing".
UPLOAD_PRESET = os.environ.get("CLOUDINARY_UPLOAD_PRESET", "testing")
# Base URL substituted for the Cloudinary delivery host by proxy_url().
MEDIA_PROXY = "https://media.toolxp.org"
def proxy_url(url: str) -> str:
    """Rewrite a Cloudinary delivery URL so it points at the media proxy.

    Every occurrence of ``https://res.cloudinary.com/<CLOUD_NAME>`` in *url*
    is replaced with ``MEDIA_PROXY``; URLs without that prefix are returned
    unchanged.
    """
    cloudinary_prefix = f"https://res.cloudinary.com/{CLOUD_NAME}"
    return url.replace(cloudinary_prefix, MEDIA_PROXY)
# Canvas size in pixels used for every rendered frame; FPS is presumably the
# output frame rate (its consumer is not visible in this section).
WIDTH, HEIGHT, FPS = 1280, 200, 12
# Fully transparent RGBA colour used as the canvas background.
TRANSPARENT = (0, 0, 0, 0)
# In-memory job registry keyed by a string id; the shape of each entry is
# defined by code outside this section.
JOBS: Dict[str, dict] = {}
# Lazily filled cache for blank_bytes().
_BLANK = None
def blank_bytes():
    """Return the raw RGBA bytes of a fully transparent WIDTH x HEIGHT frame.

    The buffer is built on first use and cached in the module-level ``_BLANK``
    so later calls return the same bytes object.
    """
    global _BLANK
    if _BLANK is not None:
        return _BLANK
    empty_frame = Image.new('RGBA', (WIDTH, HEIGHT), TRANSPARENT)
    _BLANK = empty_frame.tobytes()
    return _BLANK
| # ── APP ───────────────────────────────────────────────── | |
# FastAPI application object.
# NOTE(review): the title says "V5" while the module docstring describes V6 —
# confirm which version label is current.
app = FastAPI(title="Caption Greenscreen V5")
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): the FastAPI CORS documentation advises against combining
# wildcard origins with allow_credentials=True — confirm whether credentialed
# requests are actually needed here.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True,
                   allow_methods=["*"], allow_headers=["*"])
| # ── MODELS ────────────────────────────────────────────── | |
class TranscriptWord(BaseModel):
    # One transcript word plus its timing window.
    # The renderer reads the word through its 'text' key.
    text: str
    # start / end are presumably timestamps in seconds — TODO confirm against the caller.
    start: float
    end: float
class CaptionColors(BaseModel):
    # Colour scheme for one caption style. Colour fields are hex strings that
    # render_frame converts with hex_rgb(); stroke widths are passed to Pillow
    # as stroke_width.
    # Fill / outline for the currently spoken (active) word.
    active_fill: str = "#FFD700"
    active_stroke: str = "#000000"
    active_stroke_width: int = 7
    # Fill / outline for every other word on the line.
    inactive_fill: str = "#FFFFFF"
    inactive_stroke: str = "#000000"
    inactive_stroke_width: int = 5
    # Optional background-box colours; None means no box is drawn for that state.
    active_bg: Optional[str] = None
    inactive_bg: Optional[str] = None
class CaptionRequest(BaseModel):
    # Request payload for a caption render job.
    # Words to caption, each with its timing.
    transcript: List[TranscriptWord]
    # Style name; the built-in names are the keys of DEFAULT_COLORS.
    style: Optional[str] = "hormozi"
    # Optional explicit duration (how it is used is not visible in this section).
    duration: Optional[float] = None
    # Optional colour override; presumably falls back to DEFAULT_COLORS[style] — verify in the handler.
    colors: Optional[CaptionColors] = None
    animation: Optional[str] = "pop"  # none, pop, bounce, slam, slam_shake, underline, minimalist_sweep, typewriter, slide_in, karaoke_wipe
| # ── HELPERS ───────────────────────────────────────────── | |
def hex_rgb(h: str) -> Tuple[int, int, int]:
    """Convert a hex colour string to an ``(r, g, b)`` tuple of ints in 0-255.

    Accepts ``RRGGBB`` with or without a leading ``#``, the CSS shorthand
    forms ``RGB`` / ``RGBA`` (each digit is doubled), and tolerates
    surrounding whitespace. Any digits after the first six (for example an
    alpha component) are ignored.

    Raises:
        ValueError: if the string is too short or contains non-hex characters.
    """
    digits = h.strip().lstrip('#')
    if len(digits) in (3, 4):
        # CSS shorthand: "#abc" means "#aabbcc".
        digits = ''.join(ch * 2 for ch in digits)
    rgb_part = digits[:6]
    if len(rgb_part) < 6 or any(ch not in "0123456789abcdefABCDEF" for ch in rgb_part):
        raise ValueError(f"invalid hex colour: {h!r}")
    return tuple(int(rgb_part[i:i + 2], 16) for i in (0, 2, 4))
| # ── FONTS ─────────────────────────────────────────────── | |
# Per-size font caches (size -> loaded font object), filled lazily by
# get_font() and get_devanagari_font().
_FC = {}  # Latin font cache
_FC_DEVA = {}  # Devanagari font cache
| def _is_devanagari(text: str) -> bool: | |
| """Return True if text contains any Devanagari Unicode character (U+0900βU+097F).""" | |
| return any('\u0900' <= ch <= '\u097f' for ch in text) | |
def get_font(size=72):
    """Return a Latin/universal bold font at the given size.

    Lookup order: the per-size cache, a list of known font files, whatever
    ``fc-match sans:bold`` reports, and finally Pillow's built-in default
    font. The result is cached in ``_FC`` under *size*.
    """
    cached = _FC.get(size)
    if cached is not None:
        return cached

    for p in ["/app/fonts/Inter-Black.ttf", "/app/fonts/Inter-Bold.ttf",
              "/app/fonts/DejaVuSans-Bold.ttf",
              "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
              "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
              "/usr/share/fonts/truetype/freefont/FreeSansBold.ttf"]:
        if not os.path.exists(p):
            continue
        try:
            f = ImageFont.truetype(p, size)
        except (OSError, ValueError):
            # Unreadable/corrupt file or invalid size: try the next candidate.
            continue
        _FC[size] = f
        return f

    try:
        # Bounded so a wedged fontconfig cannot stall rendering.
        r = subprocess.run(["fc-match", "--format=%{file}", "sans:bold"],
                           capture_output=True, text=True, timeout=5)
        if r.returncode == 0 and r.stdout.strip():
            f = ImageFont.truetype(r.stdout.strip(), size)
            _FC[size] = f
            return f
    except (OSError, ValueError, subprocess.SubprocessError):
        # fc-match missing, timed out, or reported an unusable file.
        pass

    try:
        # Pillow >= 10.1 can scale its built-in font to the requested size.
        f = ImageFont.load_default(size)
    except (TypeError, OSError, ImportError, ValueError):
        # Older Pillow (no size argument) or no FreeType: fixed-size bitmap font.
        f = ImageFont.load_default()
    _FC[size] = f
    return f
def get_devanagari_font(size=72):
    """Return a Devanagari-capable font (Noto Sans Devanagari) at the given size.

    Lookup order: the per-size cache, a list of known Noto font files,
    whatever ``fc-match :script=deva:bold`` reports, and finally the Latin
    font from ``get_font``. The result, including the Latin fallback, is
    cached in ``_FC_DEVA`` so file probes and the ``fc-match`` subprocess
    run at most once per size.
    """
    cached = _FC_DEVA.get(size)
    if cached is not None:
        return cached

    for p in ["/app/fonts/NotoSansDevanagari-Bold.ttf",
              "/app/fonts/NotoSansDevanagari-Regular.ttf",
              "/app/fonts/NotoSans-Bold.ttf",
              "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Bold.ttf",
              "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Regular.ttf",
              "/usr/share/fonts/noto/NotoSansDevanagari-Bold.ttf",
              "/usr/share/fonts/noto/NotoSansDevanagari-Regular.ttf"]:
        if not os.path.exists(p):
            continue
        try:
            f = ImageFont.truetype(p, size)
        except (OSError, ValueError):
            # Unreadable/corrupt file or invalid size: try the next candidate.
            continue
        _FC_DEVA[size] = f
        return f

    # Fallback: try fc-match for devanagari script
    try:
        r = subprocess.run(["fc-match", "--format=%{file}", ":script=deva:bold"],
                           capture_output=True, text=True, timeout=5)
        if r.returncode == 0 and r.stdout.strip():
            f = ImageFont.truetype(r.stdout.strip(), size)
            _FC_DEVA[size] = f
            return f
    except (OSError, ValueError, subprocess.SubprocessError):
        # fc-match missing, timed out, or reported an unusable file.
        pass

    # Last resort: fall back to the Latin font (still better than load_default).
    # Cached so this whole search is not repeated for every word of every frame.
    f = get_font(size)
    _FC_DEVA[size] = f
    return f
def get_font_for_text(text: str, size=72):
    """Pick the font matching the script of *text*.

    Text containing any Devanagari character gets the Devanagari font;
    everything else gets the Latin font.
    """
    loader = get_devanagari_font if _is_devanagari(text) else get_font
    return loader(size)
# Font sizes used by render_frame: normal words vs. the enlarged active word.
FONT_N = 72  # normal
FONT_A = 86  # active (pop)
PAD = 45  # word spacing
BOX_PX, BOX_PY, BOX_R = 16, 8, 12  # box padding & radius
# Anim timing
WORD_ANIM_DUR = 0.4  # 400ms — matches CSS transition: left/width 400ms cubic-bezier(1,0,0.4,1)
LINE_ANIM_DUR = 0.25  # seconds for slide-in
| # ── DEFAULT STYLE COLORS ──────────────────────────────── | |
# Built-in colour schemes keyed by style name.
# Styles with active_bg / inactive_bg get background boxes in render_frame
# ('karaoke' and 'boxing' for every word, 'mrbeast' for the active word only).
DEFAULT_COLORS = {
    "hormozi": CaptionColors(active_fill="#FFD700",active_stroke="#000000",active_stroke_width=7,inactive_fill="#FFFFFF",inactive_stroke="#000000",inactive_stroke_width=5),
    "netflix": CaptionColors(active_fill="#E50914",active_stroke="#000000",active_stroke_width=7,inactive_fill="#FFFFFF",inactive_stroke="#000000",inactive_stroke_width=5),
    "karaoke": CaptionColors(active_fill="#00FF00",active_stroke="#000000",active_stroke_width=0,inactive_fill="#FFFFFF",inactive_stroke="#000000",inactive_stroke_width=0,active_bg="#1A1A1A",inactive_bg="#333333"),
    "mrbeast": CaptionColors(active_fill="#000000",active_stroke="#000000",active_stroke_width=0,inactive_fill="#FFFFFF",inactive_stroke="#000000",inactive_stroke_width=5,active_bg="#FFE100"),
    "minimal": CaptionColors(active_fill="#FFFFFF",active_stroke="#000000",active_stroke_width=0,inactive_fill="#999999",inactive_stroke="#000000",inactive_stroke_width=0),
    "gradient_pop": CaptionColors(active_fill="#FF00FF",active_stroke="#000000",active_stroke_width=7,inactive_fill="#FFFFFF",inactive_stroke="#000000",inactive_stroke_width=5),
    "boxing": CaptionColors(active_fill="#FFFFFF",active_stroke="#000000",active_stroke_width=0,inactive_fill="#FFFFFF",inactive_stroke="#000000",inactive_stroke_width=0,active_bg="#7C3AED",inactive_bg="#333333"),
    # Sliding toggle dark: dark pill slides behind active word
    # NOTE(review): _render_sliding_toggle never reads its `colors` argument,
    # so these values appear to be unused by that renderer — confirm.
    "sliding_toggle": CaptionColors(active_fill="#FFFFFF",active_stroke="#000000",active_stroke_width=0,inactive_fill="#CCCCCC",inactive_stroke="#000000",inactive_stroke_width=0,active_bg="#3A3A3C",inactive_bg=None),
    # Sliding toggle light: frosted white container, white pill, dark text
    "sliding_toggle_light": CaptionColors(active_fill="#141428",active_stroke="#000000",active_stroke_width=0,inactive_fill="#505070",inactive_stroke="#000000",inactive_stroke_width=0,active_bg="#FFFFFF",inactive_bg=None),
}
| # ── CORE RENDERER ─────────────────────────────────────── | |
def render_frame(words_in_line, active_word_idx, style, colors: CaptionColors,
                 animation="pop",
                 word_anim_t=1.0,   # 0→1 progress of bounce/slam (1=settled)
                 line_anim_t=1.0,   # 0→1 progress of slide_in (1=settled)
                 word_time_pct=0.0):  # 0→1 how far through the active word's time
    """
    Render one caption frame with style, colors, and animation state.

    Args:
        words_in_line: sequence of word dicts; only the ``'text'`` key is read.
        active_word_idx: index of the highlighted word, or a value matching no
            index (e.g. -1) when no word is active.
        style: style name. ``'sliding_toggle'`` / ``'sliding_toggle_light'``
            are delegated to their dedicated renderers.
        colors: colour scheme for fills, strokes and background boxes.
        animation: animation name (pop, bounce, slam, slam_shake, underline,
            minimalist_sweep, typewriter, slide_in, karaoke_wipe, ...).
        word_anim_t: progress of the per-word animation.
        line_anim_t: progress of the line slide-in.
        word_time_pct: progress through the active word's time window.

    Returns:
        Raw RGBA bytes (``Image.tobytes()``) of a WIDTH x HEIGHT frame.
    """
    img = Image.new('RGBA', (WIDTH, HEIGHT), TRANSPARENT)
    if not words_in_line:
        return img.tobytes()
    draw = ImageDraw.Draw(img)

    # ── SLIDING TOGGLE: special self-contained renderers ──
    if style == 'sliding_toggle':
        return _render_sliding_toggle(img, draw, words_in_line, active_word_idx, colors, slide_t=word_anim_t)
    if style == 'sliding_toggle_light':
        return _render_sliding_toggle_light(img, draw, words_in_line, active_word_idx, colors, slide_t=word_anim_t)

    # Style behavior flags
    use_pop = style in ('hormozi', 'netflix', 'gradient_pop', 'mrbeast')
    draw_all_boxes = style in ('karaoke', 'boxing')
    draw_active_box = style == 'mrbeast'
    # Minimal sweep should not have stroke to keep it clean
    use_stroke = style not in ('minimal',) and animation != 'minimalist_sweep'

    # Colors → RGB
    a_fill = hex_rgb(colors.active_fill)
    i_fill = hex_rgb(colors.inactive_fill)
    a_stroke = hex_rgb(colors.active_stroke)
    i_stroke = hex_rgb(colors.inactive_stroke)
    a_bg = hex_rgb(colors.active_bg) if colors.active_bg else None
    i_bg = hex_rgb(colors.inactive_bg) if colors.inactive_bg else None

    # ── SLAM & SLAM_SHAKE: active word font size varies with progress.
    # Only the size is computed here; the face is chosen per word below so a
    # Devanagari word is not rendered with the Latin font while slamming.
    if animation in ('slam', 'slam_shake') and active_word_idx >= 0 and word_anim_t < 1.0:
        ease = 1.0 - (1.0 - word_anim_t) ** 2  # ease-out
        slam_font_size = int(FONT_A * (1.5 - 0.5 * ease))  # 1.5x easing down to 1.0x
    else:
        slam_font_size = None

    # ── Measure all words ──
    # Font selection is per-word to support mixed scripts (e.g., Hindi + Latin).
    word_data = []
    for idx, w in enumerate(words_in_line):
        is_active = (idx == active_word_idx)
        raw_text = w['text']
        # Only uppercase Latin; Devanagari has no case concept
        text = raw_text.upper() if not _is_devanagari(raw_text) else raw_text

        # Typewriter: show partial text for the active word
        if animation == 'typewriter' and is_active and word_time_pct < 1.0:
            chars = max(1, int(math.ceil(len(text) * word_time_pct)))
            text = text[:chars]

        # Choose the size, then get the right font for this word's script
        if is_active and slam_font_size is not None:
            font = get_font_for_text(text, slam_font_size)
        elif is_active and use_pop:
            font = get_font_for_text(text, FONT_A)
        else:
            font = get_font_for_text(text, FONT_N)

        bbox = draw.textbbox((0, 0), text, font=font)
        word_data.append({
            'text': text, 'font': font,
            'width': bbox[2] - bbox[0],
            'height': bbox[3] - bbox[1],
            'is_active': is_active,
            'full_width': None,  # for underline (full word width)
        })

        # For underline and minimalist_sweep, also measure the full word width
        if animation in ('underline', 'minimalist_sweep') and is_active:
            full_raw = w['text']
            full_text = full_raw.upper() if not _is_devanagari(full_raw) else full_raw
            fbbox = draw.textbbox((0, 0), full_text, font=font)
            word_data[-1]['full_width'] = fbbox[2] - fbbox[0]

    has_boxes = draw_all_boxes or draw_active_box
    total_w = sum(d['width'] for d in word_data) + PAD * (len(word_data) - 1)
    if has_boxes:
        total_w += BOX_PX * 2 * len(word_data)
    base_x = (WIDTH - total_w) // 2

    # ── SLIDE_IN animation: offset entire line horizontally
    if animation == 'slide_in' and line_anim_t < 1.0:
        ease = 1.0 - (1.0 - line_anim_t) ** 3  # ease-out cubic
        x_offset = int(-400 * (1.0 - ease))
    else:
        x_offset = 0

    cur_x = base_x + x_offset  # layout cursor; never includes per-word jitter

    # ── Draw each word ──
    # NOTE(review): text_y is the Pillow draw origin, not the ink top; the
    # sliding-toggle renderers subtract the bbox top offset for true vertical
    # centering, this renderer does not — confirm whether that is intended.
    for wd in word_data:
        text = wd['text']
        font = wd['font']
        is_active = wd['is_active']
        text_x = cur_x
        text_y = (HEIGHT - wd['height']) // 2

        # ── BOUNCE animation: active word shifts up
        if animation == 'bounce' and is_active and word_anim_t < 1.0:
            text_y += int(-22 * math.sin(word_anim_t * math.pi))

        # ── SLAM_SHAKE animation: active word shakes after slamming.
        # The jitter goes to text_x/text_y only, so following words keep
        # their layout position.
        if animation == 'slam_shake' and is_active and 0.2 < word_anim_t < 0.9:
            shake_intensity = (1.0 - word_anim_t) * 15  # damps out over time
            text_x += int(math.sin(word_anim_t * 50) * shake_intensity)
            text_y += int(math.cos(word_anim_t * 55) * shake_intensity)

        # Draw bg box
        if draw_all_boxes:
            bg_c = a_bg if is_active else i_bg
            if bg_c:
                bx1, by1 = text_x - BOX_PX, text_y - BOX_PY
                bx2, by2 = text_x + wd['width'] + BOX_PX, text_y + wd['height'] + BOX_PY
                # ── BOX GROW for box styles: active bg grows from its center
                if animation in ('slam', 'slam_shake') and is_active and word_anim_t < 1.0:
                    ease = 1.0 - (1.0 - word_anim_t) ** 2
                    cx = (bx1 + bx2) // 2
                    cy = (by1 + by2) // 2
                    hw = int((bx2 - bx1) * 0.5 * ease)
                    hh = int((by2 - by1) * 0.5 * ease)
                    bx1, by1, bx2, by2 = cx - hw, cy - hh, cx + hw, cy + hh
                draw.rounded_rectangle([bx1, by1, bx2, by2], radius=BOX_R, fill=bg_c)
        elif draw_active_box and is_active and a_bg:
            bx1, by1 = text_x - BOX_PX, text_y - BOX_PY
            bx2, by2 = text_x + wd['width'] + BOX_PX, text_y + wd['height'] + BOX_PY
            draw.rounded_rectangle([bx1, by1, bx2, by2], radius=BOX_R, fill=a_bg)

        # Text fill
        fill_c = a_fill if is_active else i_fill
        # ── MINIMALIST_SWEEP animation: dim inactive words to 40% opacity
        if animation == 'minimalist_sweep' and not is_active:
            r, g, b = i_fill
            fill_c = (r, g, b, 102)  # 40% of 255 ≈ 102

        # Stroke arguments (none at all when the style/animation disables stroke)
        if use_stroke:
            sw = colors.active_stroke_width if is_active else colors.inactive_stroke_width
            stroke_kw = {'stroke_width': sw,
                         'stroke_fill': a_stroke if is_active else i_stroke}
            mask_pad = sw
        else:
            stroke_kw = {}
            mask_pad = 0

        if animation == 'karaoke_wipe' and is_active:
            # KARAOKE_WIPE: build the word on its own layer (inactive colour,
            # with the swept part replaced by the active colour) and blend
            # that layer in, so boxes and neighbouring words underneath
            # survive.
            word_layer = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
            ImageDraw.Draw(word_layer).text((text_x, text_y), text, font=font,
                                            fill=i_fill, **stroke_kw)
            full_w = wd.get('full_width') or wd['width']
            sweep_w = int(full_w * word_time_pct)
            if sweep_w > 0:
                lit_layer = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
                ImageDraw.Draw(lit_layer).text((text_x, text_y), text, font=font,
                                               fill=a_fill, **stroke_kw)
                # Full-height mask: only this word lives on the layers, and
                # glyphs must not be clipped at the bottom.
                mask = Image.new('L', (WIDTH, HEIGHT), 0)
                ImageDraw.Draw(mask).rectangle(
                    [text_x - mask_pad, 0, text_x + sweep_w, HEIGHT], fill=255)
                word_layer = Image.composite(lit_layer, word_layer, mask)
            img = Image.alpha_composite(img, word_layer)
            draw = ImageDraw.Draw(img)
        else:
            draw.text((text_x, text_y), text, font=font, fill=fill_c, **stroke_kw)

        # ── UNDERLINE & MINIMALIST_SWEEP: draw a line under the active word
        if animation in ('underline', 'minimalist_sweep') and is_active:
            full_w = wd.get('full_width') or wd['width']
            line_w = int(full_w * word_time_pct)
            if line_w > 0:
                ul_y = text_y + wd['height'] + 4
                if animation == 'minimalist_sweep':
                    # Glowing underline on its own layer: faint wide halo
                    # first, solid core last, then blended in (drawing
                    # primitives directly on an RGBA image overwrites pixels
                    # instead of blending).
                    r, g, b = a_fill
                    glow = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
                    gd = ImageDraw.Draw(glow)
                    gd.rectangle([text_x, ul_y - 4, text_x + line_w, ul_y + 8], fill=(r, g, b, 40))
                    gd.rectangle([text_x, ul_y - 2, text_x + line_w, ul_y + 6], fill=(r, g, b, 100))
                    gd.rectangle([text_x, ul_y, text_x + line_w, ul_y + 4], fill=a_fill)
                    img = Image.alpha_composite(img, glow)
                    draw = ImageDraw.Draw(img)
                else:
                    # Standard underline
                    draw.rectangle([text_x, ul_y, text_x + line_w, ul_y + 5], fill=a_fill)

        cur_x += wd['width'] + PAD
        if has_boxes:
            cur_x += BOX_PX * 2

    return img.tobytes()
| # ── SLIDING TOGGLE — PREMIUM DARK GLASS ─────────────────────────────────────── | |
| # 11-layer composited render — maximum quality within Pillow: | |
| # L1+L2. Dual drop shadow (ambient wide + tight contact) | |
| # L3. Container flat dark fill | |
| # L4. Container top-gradient overlay (masked) | |
| # L5. Container border + edge highlights | |
| # L6. Pill tight shadow | |
| # L7. Pill gradient fill top→bottom (amber-gold → burnished gold, masked) | |
| # L8. Pill specular gloss ellipse (blurred) | |
| # L9. Pill border + edge highlights | |
| # L10. Active word text glow (blurred) | |
| # L11. All words text sharp | |
# Geometry shared by the sliding-toggle renderers (pixels).
ST_FONT_SIZE = 72
ST_WORD_GAP = 16  # CSS gap:8px × 2.25 scale → 18 → rounded to 16
ST_WORD_PAD_X = 28  # horizontal padding between a word and its pill edge
ST_CONT_PAD_X = 44  # container padding around the row of words
ST_CONT_PAD_Y = 32
ST_CONT_R = 200  # container corner radius
ST_INNER_R = 100  # pill corner radius
ST_TRACKING = -1  # CSS letter-spacing:-0.01em at 72px ≈ -0.7px → -1px (tight, not spread)
# Dark-variant text colours (RGBA).
_ST_ACTIVE_TXT = (252, 253, 255, 255)  # near-white, very slight cool shimmer
_ST_INACT_TXT = (190, 190, 192, 255)  # neutral mid-grey — no color tint (matches CSS #bbbbbc)
# Edge highlight/shadow colours. NOTE(review): _draw_glass_highlights is a
# no-op, so these three appear unused in this section — confirm.
_ST_TOP_HILIGHT = (255, 255, 255, 110)
_ST_SIDE_HILIGHT = (255, 255, 255, 45)
_ST_BOT_SHADOW = (0, 0, 0, 60)
def _gradient_layer(x1, y1, x2, y2, radius, rgba_top, rgba_bottom, bands=80):
    """Build a full-canvas RGBA layer holding a vertical gradient inside a rounded rectangle.

    The gradient runs from *rgba_top* at y1 to *rgba_bottom* at y2 in *bands*
    horizontal strips; everything outside the rounded rectangle
    ``[x1, y1, x2, y2]`` stays fully transparent.
    """
    canvas = (WIDTH, HEIGHT)
    clear = (0, 0, 0, 0)

    # Paint the strips; each overlaps the next by one pixel so no gaps appear.
    strips = Image.new('RGBA', canvas, clear)
    painter = ImageDraw.Draw(strips)
    span = max(y2 - y1, 1)
    last_band = max(bands - 1, 1)
    for band in range(bands):
        strip_top = y1 + int(band / bands * span)
        strip_bottom = y1 + int((band + 1) / bands * span) + 1
        frac = band / last_band
        colour = tuple(
            int(rgba_top[ch] + (rgba_bottom[ch] - rgba_top[ch]) * frac)
            for ch in range(4)
        )
        painter.rectangle([x1, strip_top, x2, strip_bottom], fill=colour)

    # Clip the strips to the rounded rectangle.
    clip = Image.new('L', canvas, 0)
    ImageDraw.Draw(clip).rounded_rectangle([x1, y1, x2, y2], radius=radius, fill=255)
    return Image.composite(strips, Image.new('RGBA', canvas, clear), clip)
| def _draw_glass_highlights(draw, x1, y1, x2, y2, r, *, scale=1): | |
| """No-op: flat lines look artificial in Pillow. Border handled by rounded_rectangle outline.""" | |
| pass | |
def _pill_bounds(positions, idx, cont_y1, pi):
    """
    Return (px1, px2), the horizontal extent of the pill for word `idx`.

    `positions` holds (x, y, word_dict) tuples. `idx` is clamped into range,
    so idx=-1 ("no previous word") resolves to the first word. `cont_y1` and
    `pi` are accepted but not used.
    """
    last = len(positions) - 1
    clamped = idx if idx < last else last
    if clamped < 0:
        clamped = 0
    word_x, _word_y, word = positions[clamped]
    left = word_x - ST_WORD_PAD_X
    right = word_x + word['w'] + ST_WORD_PAD_X
    return left, right
| def _ease_out_cubic(t): | |
| """easeOutBack β iOS spring constant (c=1.70158): slight overshoot then settle.""" | |
| t = max(0.0, min(1.0, t)) | |
| c = 1.70158 | |
| return 1.0 + (c + 1) * (t - 1) ** 3 + c * (t - 1) ** 2 | |
def _measure_tracked(draw, text, font, tracking=ST_TRACKING):
    """Total pixel width of text with per-character tracking gap.

    Latin text is measured one character at a time, with *tracking* added
    between adjacent characters. Devanagari text is measured as one run
    without tracking: its conjuncts and combining vowel signs only get
    correct widths when the string is laid out whole.
    """
    if not text:
        return 0
    if _is_devanagari(text):
        box = draw.textbbox((0, 0), text, font=font)
        return box[2] - box[0]
    total = 0
    for ch in text:
        box = draw.textbbox((0, 0), ch, font=font)
        total += box[2] - box[0]
    # One tracking gap between each pair of neighbouring characters.
    return total + tracking * (len(text) - 1)
def _draw_tracked(draw_obj, x, y, text, font, fill, ink_top=0, tracking=ST_TRACKING):
    """Draw text char-by-char with tracking; y is corrected for ink_top offset.

    Devanagari text is drawn as a single run without tracking, because
    splitting it into code points breaks conjuncts and the placement of
    combining vowel signs.
    """
    if _is_devanagari(text):
        draw_obj.text((x, y - ink_top), text, font=font, fill=fill)
        return
    cx = x
    for ch in text:
        draw_obj.text((cx, y - ink_top), ch, font=font, fill=fill)
        b = draw_obj.textbbox((0, 0), ch, font=font)
        cx += (b[2] - b[0]) + tracking
def _render_sliding_toggle(img, draw, words_in_line, active_word_idx, colors: CaptionColors, slide_t=1.0):
    """Premium 11-layer glass-pill caption renderer (dark variant).

    Args:
        img: transparent RGBA canvas (WIDTH x HEIGHT) to composite onto.
        draw: ``ImageDraw`` bound to *img*; used for text measurement.
        words_in_line: sequence of word dicts; only ``'text'`` is read.
        active_word_idx: index of the word under the pill; a value matching
            no index means no pill is drawn.
        colors: accepted for interface parity; not read by this renderer,
            whose palette is hard-coded.
        slide_t: 0→1 progress of the pill sliding from the previous word to
            the active one (1 = settled).

    Returns:
        Raw RGBA bytes (``Image.tobytes()``) of the composited frame.
    """
    # measure
    word_data, max_h = [], 0
    for idx, w in enumerate(words_in_line):
        raw = w['text']
        text = raw.upper() if not _is_devanagari(raw) else raw
        font = get_font_for_text(text, ST_FONT_SIZE)
        bbox = draw.textbbox((0, 0), text, font=font)
        tw = _measure_tracked(draw, text, font)  # tracked width
        th = bbox[3] - bbox[1]
        max_h = max(max_h, th)
        word_data.append({'text': text, 'font': font, 'w': tw, 'h': th,
                          'ink_top': bbox[1],  # Pillow top-offset; subtract at draw for true centering
                          'is_active': (idx == active_word_idx)})
    if not word_data:
        return img.tobytes()

    # geometry
    inner_w = sum(d['w'] + ST_WORD_PAD_X * 2 for d in word_data) + ST_WORD_GAP * (len(word_data) - 1)
    cont_w = inner_w + ST_CONT_PAD_X * 2
    cont_h = max_h + ST_CONT_PAD_Y * 2
    cx1 = (WIDTH - cont_w) // 2
    cy1 = (HEIGHT - cont_h) // 2
    cx2 = cx1 + cont_w
    cy2 = cy1 + cont_h
    pi = 12  # pill inset from container edges

    # L1: wide ambient shadow — neutral dark
    amb = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ImageDraw.Draw(amb).rounded_rectangle(
        [cx1 - 12, cy1 + 18, cx2 + 12, cy2 + 18], radius=ST_CONT_R, fill=(0, 0, 0, 55))
    img = Image.alpha_composite(img, amb.filter(ImageFilter.GaussianBlur(radius=30)))

    # L2: tight contact shadow — neutral dark
    ctc = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ImageDraw.Draw(ctc).rounded_rectangle(
        [cx1 - 2, cy1 + 6, cx2 + 2, cy2 + 6], radius=ST_CONT_R, fill=(0, 0, 0, 120))
    img = Image.alpha_composite(img, ctc.filter(ImageFilter.GaussianBlur(radius=8)))

    # L3: container flat dark fill
    c_flat = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ImageDraw.Draw(c_flat).rounded_rectangle(
        [cx1, cy1, cx2, cy2], radius=ST_CONT_R, fill=(18, 18, 20, 30))  # neutral dark grey — no blue tint
    img = Image.alpha_composite(img, c_flat)

    # L4: container top-gradient overlay (subtle top lighting)
    img = Image.alpha_composite(img, _gradient_layer(
        cx1, cy1, cx2, cy2, ST_CONT_R,
        rgba_top=(255, 255, 255, 18),
        rgba_bottom=(0, 0, 0, 0), bands=50))

    # L5: container border + edge highlights
    c_edge = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ced = ImageDraw.Draw(c_edge)
    ced.rounded_rectangle([cx1, cy1, cx2, cy2], radius=ST_CONT_R,
                          outline=(255, 255, 255, 26), width=1)  # CSS: inset 0 0 0 1px rgba(white,10%)
    _draw_glass_highlights(ced, cx1, cy1, cx2, cy2, ST_CONT_R, scale=1.0)
    img = Image.alpha_composite(img, c_edge)
    draw = ImageDraw.Draw(img)

    # word positions
    cur_x = cx1 + ST_CONT_PAD_X
    text_y = cy1 + ST_CONT_PAD_Y
    positions = []
    for wd in word_data:
        positions.append((cur_x, text_y, wd))
        cur_x += wd['w'] + ST_WORD_PAD_X * 2 + ST_WORD_GAP

    # per-pill layers — pill position is lerped for smooth sliding
    py1_p, py2_p = cy1 + pi, cy2 - pi

    # Pill bounds, interpolated from the previous word with the spring easing
    if active_word_idx >= 0 and len(positions) > 0:
        curr_px1, curr_px2 = _pill_bounds(positions, active_word_idx, cy1, pi)
        prev_px1, prev_px2 = _pill_bounds(positions, active_word_idx - 1, cy1, pi)
        e = _ease_out_cubic(min(1.0, slide_t))  # spring easeOutBack
        pill_x1 = int(prev_px1 + (curr_px1 - prev_px1) * e)
        pill_x2 = int(prev_px2 + (curr_px2 - prev_px2) * e)
    else:
        # Placeholder only: without an active word no pill is drawn below.
        pill_x1 = pill_x2 = 0

    # Squish/stretch — horizontal stretch + vertical compress simultaneously.
    # Mirrors CSS scaleToggle2: the pill elongates horizontally and flattens
    # very slightly vertically while in motion.
    if active_word_idx > 0 and slide_t < 1.0:
        s = math.sin(math.pi * min(1.0, slide_t))  # 0→1→0 arc
        squish_x = 1.0 + 0.12 * s
        squish_y = 1.0 - 0.04 * s  # subtle vertical compress
        p_cx = (pill_x1 + pill_x2) / 2
        p_hw = (pill_x2 - pill_x1) / 2
        pill_x1 = int(p_cx - p_hw * squish_x)
        pill_x2 = int(p_cx + p_hw * squish_x)
        p_cy = (py1_p + py2_p) / 2
        p_hh = (py2_p - py1_p) / 2
        py1_p = int(p_cy - p_hh * squish_y)
        py2_p = int(p_cy + p_hh * squish_y)

    for (px, py, wd) in positions:
        if not wd['is_active']:
            continue
        px1, px2 = pill_x1, pill_x2
        py1, py2 = py1_p, py2_p

        # L6: pill tight shadow — neutral dark
        pshadow = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
        ImageDraw.Draw(pshadow).rounded_rectangle(
            [px1, py1 + 4, px2, py2 + 4], radius=ST_INNER_R, fill=(0, 0, 0, 130))
        img = Image.alpha_composite(img, pshadow.filter(ImageFilter.GaussianBlur(radius=6)))

        # L7: pill gradient fill — translucent gold
        img = Image.alpha_composite(img, _gradient_layer(
            px1, py1, px2, py2, ST_INNER_R,
            rgba_top=(215, 175, 55, 190),     # warm amber-gold top
            rgba_bottom=(160, 118, 18, 170),  # deep burnished gold bottom
            bands=80))

        # L8: specular gloss — upper-left highlight
        spec_cx = px1 + int((px2 - px1) * 0.30)  # 30% from left
        spec_cy = py1 + int((py2 - py1) * 0.20)
        spec_rx = (px2 - px1) // 4
        spec_ry = max(10, (py2 - py1) // 5)
        spec = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
        ImageDraw.Draw(spec).ellipse(
            [spec_cx - spec_rx, spec_cy - spec_ry,
             spec_cx + spec_rx, spec_cy + spec_ry],
            fill=(255, 255, 255, 70))
        img = Image.alpha_composite(img, spec.filter(ImageFilter.GaussianBlur(radius=max(5, spec_ry // 2))))

        # L9: pill border + edge highlights
        p_edge = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
        ped = ImageDraw.Draw(p_edge)
        ped.rounded_rectangle([px1, py1, px2, py2], radius=ST_INNER_R,
                              outline=(255, 255, 255, 26), width=1)  # CSS: inset 0 0 0 1px rgba(white,10%)
        _draw_glass_highlights(ped, px1, py1, px2, py2, ST_INNER_R, scale=0.6)
        img = Image.alpha_composite(img, p_edge)
        draw = ImageDraw.Draw(img)

    # Active-word fade factor: 0 until the slide is half done, 1 when settled.
    t_active_fade = max(0.0, (min(1.0, slide_t) - 0.5) / 0.5) if slide_t < 1.0 else 1.0

    # L10: active word text glow (fades in with the slide)
    glow_a = int(120 * t_active_fade)
    if glow_a > 0:  # a fully transparent glow layer would be a no-op; skip the blur
        glow = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
        gd = ImageDraw.Draw(glow)
        for (px, py, wd) in positions:
            if wd['is_active']:
                _draw_tracked(gd, px, py, wd['text'], wd['font'],
                              (255, 255, 255, glow_a), ink_top=wd['ink_top'])
        img = Image.alpha_composite(img, glow.filter(ImageFilter.GaussianBlur(radius=6)))
        draw = ImageDraw.Draw(img)

    # L11: all words sharp text (tracked, with colour crossfade)
    r_i, g_i, b_i, _ = _ST_INACT_TXT
    r_a, g_a, b_a, _ = _ST_ACTIVE_TXT
    for i, (px, py, wd) in enumerate(positions):
        if wd['is_active']:
            # Active: fade from inactive colour (slide_t=0.5) to active (slide_t=1.0)
            ta = t_active_fade
            fill = (int(r_i + (r_a - r_i) * ta), int(g_i + (g_a - g_i) * ta),
                    int(b_i + (b_a - b_i) * ta), 255)
        elif i == active_word_idx - 1 and slide_t < 1.0:
            # Previous word: fade from active to inactive over the first 40%
            # of the slide. Clamped to [0, 1] so the colour stops at the
            # inactive grey instead of extrapolating darker for slide_t > 0.4.
            td = min(1.0, max(0.0, slide_t) / 0.4)
            fill = (int(r_a + (r_i - r_a) * td), int(g_a + (g_i - g_a) * td),
                    int(b_a + (b_i - b_a) * td), 255)
        else:
            fill = _ST_INACT_TXT
        _draw_tracked(draw, px, py, wd['text'], wd['font'], fill, ink_top=wd['ink_top'])

    return img.tobytes()
| # ── SLIDING TOGGLE LIGHT ─────────────────────────────────────────────────────── | |
| # Same 11-layer approach as dark, but frosted white container + white pill + | |
| # dark navy text — premium macOS/iOS frosted glass look. | |
# Light-variant palette (RGBA tuples). The three highlight/shadow entries are
# read by _draw_glass_highlights_l; the two text colours are presumably used
# by _render_sliding_toggle_light (not fully visible in this section).
_STL_ACTIVE_TXT = (15, 15, 40, 255)  # deep navy on white
_STL_INACT_TXT = (80, 85, 115, 220)  # muted slate-blue on frosted container
_STL_TOP_HILIGHT = (255, 255, 255, 200)  # strong top reflex (white surface)
_STL_SIDE_HILIGHT = (255, 255, 255, 100)
_STL_BOT_SHADOW = (0, 0, 0, 25)  # very soft bottom shadow on light
def _draw_glass_highlights_l(draw, x1, y1, x2, y2, r, *, scale=1):
    """Draw light-mode edge highlights just inside the box ``[x1, y1, x2, y2]``.

    Three 2px lines are drawn, each inset from the corners by
    ``min(r // 2, 40)``: a top highlight, a left-side highlight and a bottom
    shadow. Each line's alpha is its base alpha times *scale*, capped at 255.
    A line is skipped when the inset leaves no room for it.
    """
    inset = min(r // 2, 40)

    def _tint(rgba):
        # Same RGB, alpha scaled and capped.
        return (*rgba[:3], min(255, int(rgba[3] * scale)))

    left, right = x1 + inset, x2 - inset
    top, bottom = y1 + inset, y2 - inset
    has_horizontal_room = right > left

    if has_horizontal_room:
        draw.line([(left, y1 + 2), (right, y1 + 2)],
                  fill=_tint(_STL_TOP_HILIGHT), width=2)
    if bottom > top:
        side_x = x1 + 2
        draw.line([(side_x, top), (side_x, bottom)],
                  fill=_tint(_STL_SIDE_HILIGHT), width=2)
    if has_horizontal_room:
        draw.line([(left, y2 - 3), (right, y2 - 3)],
                  fill=_tint(_STL_BOT_SHADOW), width=2)
def _render_sliding_toggle_light(img, draw, words_in_line, active_word_idx, colors: CaptionColors, slide_t=1.0):
    """Light-mode sliding toggle: frosted white container, white gradient pill, dark text.

    Args:
        img: Base image the layers are composited onto. It is combined with
            RGBA layers of size WIDTH x HEIGHT via ``Image.alpha_composite``,
            so it is expected to be an RGBA image of that size.
        draw: ``ImageDraw`` used to measure the words.
        words_in_line: Sequence of dicts; only the ``'text'`` key is read.
        active_word_idx: Index of the highlighted word. A negative value
            means no word is active and no pill is drawn.
        colors: Accepted for signature parity with the other renderers but
            not read here; text colours come from the ``_STL_*`` constants.
        slide_t: Progress of the pill slide from the previous word to the
            active one. Values >= 1.0 mean the pill has settled.

    Returns:
        The raw bytes of the composited image (``img.tobytes()``).
    """
    # --- measure every word -------------------------------------------------
    word_data, max_h = [], 0
    for idx, w in enumerate(words_in_line):
        raw = w['text']
        text = raw.upper() if not _is_devanagari(raw) else raw
        font = get_font_for_text(text, ST_FONT_SIZE)
        bbox = draw.textbbox((0, 0), text, font=font)
        tw = _measure_tracked(draw, text, font)  # width including letter tracking
        th = bbox[3] - bbox[1]
        max_h = max(max_h, th)
        word_data.append({'text': text, 'font': font, 'w': tw, 'h': th,
                          'ink_top': bbox[1],  # Pillow top offset; subtracted at draw time
                          'is_active': (idx == active_word_idx)})
    if not word_data:
        return img.tobytes()

    # --- container geometry (centred on the canvas) -------------------------
    inner_w = sum(d['w'] + ST_WORD_PAD_X * 2 for d in word_data) + ST_WORD_GAP * (len(word_data) - 1)
    cont_w = inner_w + ST_CONT_PAD_X * 2
    cont_h = max_h + ST_CONT_PAD_Y * 2
    cx1 = (WIDTH - cont_w) // 2
    cy1 = (HEIGHT - cont_h) // 2
    cx2 = cx1 + cont_w
    cy2 = cy1 + cont_h
    pi = 12  # vertical inset of the pill inside the container

    # L1: soft blue-grey ambient shadow
    amb = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ImageDraw.Draw(amb).rounded_rectangle(
        [cx1 - 10, cy1 + 16, cx2 + 10, cy2 + 16], radius=ST_CONT_R, fill=(80, 90, 140, 40))
    img = Image.alpha_composite(img, amb.filter(ImageFilter.GaussianBlur(radius=28)))

    # L2: tight contact shadow
    ctc = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ImageDraw.Draw(ctc).rounded_rectangle(
        [cx1 - 2, cy1 + 5, cx2 + 2, cy2 + 5], radius=ST_CONT_R, fill=(60, 70, 110, 80))
    img = Image.alpha_composite(img, ctc.filter(ImageFilter.GaussianBlur(radius=7)))

    # L3: container - frosted white fill
    c_flat = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ImageDraw.Draw(c_flat).rounded_rectangle(
        [cx1, cy1, cx2, cy2], radius=ST_CONT_R, fill=(245, 246, 252, 185))
    img = Image.alpha_composite(img, c_flat)

    # L4: container top-gradient overlay (brighter at top)
    img = Image.alpha_composite(img, _gradient_layer(
        cx1, cy1, cx2, cy2, ST_CONT_R,
        rgba_top=(255, 255, 255, 60),
        rgba_bottom=(200, 202, 220, 0), bands=50))

    # L5: container border (subtle dark outline on light surface) + highlights
    c_edge = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    ced = ImageDraw.Draw(c_edge)
    ced.rounded_rectangle([cx1, cy1, cx2, cy2], radius=ST_CONT_R,
                          outline=(120, 125, 160, 45), width=1)
    _draw_glass_highlights_l(ced, cx1, cy1, cx2, cy2, ST_CONT_R, scale=1.0)
    img = Image.alpha_composite(img, c_edge)
    draw = ImageDraw.Draw(img)

    # --- word positions, laid out left to right -----------------------------
    cur_x = cx1 + ST_CONT_PAD_X
    text_y = cy1 + ST_CONT_PAD_Y
    positions = []
    for wd in word_data:
        positions.append((cur_x, text_y, wd))
        cur_x += wd['w'] + ST_WORD_PAD_X * 2 + ST_WORD_GAP

    # --- pill bounds: interpolated between previous and active word ---------
    py1_p, py2_p = cy1 + pi, cy2 - pi
    if active_word_idx >= 0 and len(positions) > 0:
        curr_px1, curr_px2 = _pill_bounds(positions, active_word_idx, cy1, pi)
        prev_px1, prev_px2 = _pill_bounds(positions, active_word_idx - 1, cy1, pi)
        e = _ease_out_cubic(min(1.0, slide_t))  # eased slide progress
        pill_x1 = int(prev_px1 + (curr_px1 - prev_px1) * e)
        pill_x2 = int(prev_px2 + (curr_px2 - prev_px2) * e)
    else:
        # No active word: the pill is never drawn, keep a collapsed placeholder.
        pill_x1, pill_x2 = cx1, cx1

    # Squish/stretch: the pill elongates by up to 7% at mid-flight.
    if active_word_idx > 0 and slide_t < 1.0:
        squish = 1.0 + 0.07 * math.sin(math.pi * min(1.0, slide_t))
        p_cx = (pill_x1 + pill_x2) / 2
        p_hw = (pill_x2 - pill_x1) / 2
        pill_x1 = int(p_cx - p_hw * squish)
        pill_x2 = int(p_cx + p_hw * squish)

    # --- pill layers, drawn only for the active word ------------------------
    for (px, py, wd) in positions:
        if not wd['is_active']:
            continue
        px1, px2 = pill_x1, pill_x2
        py1, py2 = py1_p, py2_p

        # L6: pill shadow (soft blue-grey)
        pshadow = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
        ImageDraw.Draw(pshadow).rounded_rectangle(
            [px1, py1 + 3, px2, py2 + 3], radius=ST_INNER_R, fill=(60, 70, 110, 70))
        img = Image.alpha_composite(img, pshadow.filter(ImageFilter.GaussianBlur(radius=5)))

        # L7: pill gradient - pure white top to soft blue-white bottom
        img = Image.alpha_composite(img, _gradient_layer(
            px1, py1, px2, py2, ST_INNER_R,
            rgba_top=(255, 255, 255, 255),
            rgba_bottom=(218, 220, 240, 255), bands=80))

        # L8: specular gloss, offset towards the top-left of the pill
        spec_cx = px1 + int((px2 - px1) * 0.30)  # 30% from left
        spec_cy = py1 + int((py2 - py1) * 0.20)
        spec_rx = (px2 - px1) // 4
        spec_ry = max(8, (py2 - py1) // 6)
        spec = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
        ImageDraw.Draw(spec).ellipse(
            [spec_cx - spec_rx, spec_cy - spec_ry,
             spec_cx + spec_rx, spec_cy + spec_ry],
            fill=(255, 255, 255, 120))
        img = Image.alpha_composite(img, spec.filter(ImageFilter.GaussianBlur(radius=max(4, spec_ry // 2))))

        # L9: pill border (subtle grey) + highlights
        p_edge = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
        ped = ImageDraw.Draw(p_edge)
        ped.rounded_rectangle([px1, py1, px2, py2], radius=ST_INNER_R,
                              outline=(160, 165, 200, 60), width=1)
        _draw_glass_highlights_l(ped, px1, py1, px2, py2, ST_INNER_R, scale=0.8)
        img = Image.alpha_composite(img, p_edge)
        draw = ImageDraw.Draw(img)

    # Fade-in factor of the active word: 0 during the first half of the slide,
    # ramping to 1 over the second half; 1 once the slide has finished.
    t_active_fade = max(0.0, (min(1.0, slide_t) - 0.5) / 0.5) if slide_t < 1.0 else 1.0

    # L10: blurred dark glow behind the active word, faded with the slide
    glow = Image.new('RGBA', (WIDTH, HEIGHT), (0, 0, 0, 0))
    gd = ImageDraw.Draw(glow)
    for (px, py, wd) in positions:
        if wd['is_active']:
            glow_a = int(60 * t_active_fade)
            _draw_tracked(gd, px, py, wd['text'], wd['font'],
                          (15, 15, 40, glow_a), ink_top=wd['ink_top'])
    img = Image.alpha_composite(img, glow.filter(ImageFilter.GaussianBlur(radius=4)))
    draw = ImageDraw.Draw(img)

    # L11: sharp text for all words, cross-fading active/inactive colours
    r_i, g_i, b_i, _ = _STL_INACT_TXT
    r_a, g_a, b_a, _ = _STL_ACTIVE_TXT
    for i, (px, py, wd) in enumerate(positions):
        if wd['is_active']:
            ta = t_active_fade
            fill = (int(r_i + (r_a - r_i) * ta), int(g_i + (g_a - g_i) * ta),
                    int(b_i + (b_a - b_i) * ta), 255)
        elif i == active_word_idx - 1 and slide_t < 1.0:
            # Previous word fades back to the inactive colour over the first
            # 40% of the slide. Clamp to 1.0 so the colour does not overshoot
            # past the inactive colour for the remainder of the slide.
            td = min(1.0, max(0.0, slide_t) / 0.4)
            fill = (int(r_a + (r_i - r_a) * td), int(g_a + (g_i - g_a) * td),
                    int(b_a + (b_i - b_a) * td), 255)
        else:
            fill = _STL_INACT_TXT
        _draw_tracked(draw, px, py, wd['text'], wd['font'], fill, ink_top=wd['ink_top'])
    return img.tobytes()
| # ── BACKGROUND WORKER ───────────────────────────────── | |
def _locate_active_word(lines, ct, last_li):
    """Find the line and word active at time ``ct``.

    Scans ``lines`` forward from ``last_li``. Returns ``(ali, awi, last_li)``
    where ``ali``/``awi`` are the active line/word indices (``-1`` when none)
    and ``last_li`` is the index the next scan should start from.
    """
    ali, awi = -1, -1
    for i in range(last_li, len(lines)):
        line = lines[i]
        if line['start'] <= ct <= line['end']:
            ali = last_li = i
            for wi, w in enumerate(line['words']):
                if w['start'] <= ct < w['end']:
                    awi = wi
                    break
            break
        if line['start'] > ct:
            break
    if lines and ct < lines[0]['start']:
        last_li = 0
    return ali, awi, last_li


def process_caption_job(job_id: str, req: CaptionRequest):
    """Render the caption video for one job, encode it and upload it.

    Frames are produced as raw RGBA bytes and piped into an ``ffmpeg``
    subprocess that writes a VP9 WebM file into a temporary directory; the
    file is then uploaded through ``cloudinary.uploader.unsigned_upload``.
    Status, progress, result and error are written to ``JOBS[job_id]``.

    Exceptions raised inside the pipeline are caught, logged and recorded as
    ``status="failed"``; the temporary directory is always removed.

    Args:
        job_id: Key of an existing entry in ``JOBS``.
        req: The caption request (style, colors, animation, transcript,
            optional duration).
    """
    style = req.style or "hormozi"
    colors = req.colors or DEFAULT_COLORS.get(style, DEFAULT_COLORS["hormozi"])
    animation = req.animation or "pop"
    print(f"[{job_id}] style={style} anim={animation} words={len(req.transcript)}")
    JOBS[job_id]["status"] = "processing"
    work_dir = tempfile.mkdtemp(prefix=f"cap_{job_id[:8]}_")
    try:
        JOBS[job_id]["progress"] = "Preparing..."
        transcript = [{"text": w.text, "start": w.start, "end": w.end} for w in req.transcript]
        if req.duration and req.duration > 0:
            total_dur = req.duration
        elif transcript:
            total_dur = max(w['end'] for w in transcript) + 0.5
        else:
            # Without words or an explicit duration there is nothing to size the video by.
            raise ValueError("transcript is empty and no duration was given")
        total_frames = int(total_dur * FPS)

        # Group words (3 per line)
        lines = []
        for i in range(0, len(transcript), 3):
            g = transcript[i:i + 3]
            lines.append({'start': g[0]['start'], 'end': g[-1]['end'], 'words': g})

        # Decide rendering strategy: these animations are looked up from a
        # per-(line, word) cache instead of being rendered per frame.
        use_cache = animation in ('none', 'pop')
        cache: Dict[tuple, bytes] = {}
        if use_cache:
            JOBS[job_id]["progress"] = "Pre-rendering frames..."
            cache[(-1, -1)] = blank_bytes()
            for li, line in enumerate(lines):
                cache[(li, -1)] = render_frame(line['words'], -1, style, colors, animation)
                for wi in range(len(line['words'])):
                    cache[(li, wi)] = render_frame(line['words'], wi, style, colors, animation)
            print(f"[{job_id}] Cached {len(cache)} states")

        # FFmpeg pipe
        JOBS[job_id]["progress"] = f"Encoding {total_frames} frames..."
        out = os.path.join(work_dir, "output.webm")
        log_path = os.path.join(work_dir, "ff.log")
        pipe_broken = False
        with open(log_path, "w") as log_fh:
            ffproc = subprocess.Popen(
                ["ffmpeg", "-y", "-f", "rawvideo", "-pix_fmt", "rgba",
                 "-s", f"{WIDTH}x{HEIGHT}", "-r", str(FPS), "-i", "pipe:0",
                 "-c:v", "libvpx-vp9", "-b:v", "2M", "-pix_fmt", "yuva420p",
                 "-auto-alt-ref", "0", "-deadline", "realtime", "-cpu-used", "8", out],
                stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=log_fh)
            t0 = time.time()
            try:
                try:
                    last_li = 0
                    progress_step = max(1, total_frames // 4)
                    for fi in range(total_frames):
                        ct = fi / FPS
                        ali, awi, last_li = _locate_active_word(lines, ct, last_li)
                        if use_cache:
                            # Static: look up cached frame
                            frame_bytes = cache.get((ali, awi), cache[(-1, -1)])
                        elif ali >= 0:
                            # Animated: render per frame
                            word_anim_t = 1.0
                            word_time_pct = 0.0
                            if awi >= 0:
                                word = lines[ali]['words'][awi]
                                ws, we = word['start'], word['end']
                                word_anim_t = min(1.0, (ct - ws) / WORD_ANIM_DUR)
                                word_time_pct = min(1.0, (ct - ws) / max(0.01, we - ws))
                            # Line-entry progress is derived from the line's start
                            # on every frame so the animation runs for its whole
                            # duration rather than only on the first frame after
                            # a line change.
                            line_anim_t = min(1.0, (ct - lines[ali]['start']) / LINE_ANIM_DUR)
                            frame_bytes = render_frame(
                                lines[ali]['words'], awi, style, colors, animation,
                                word_anim_t, line_anim_t, word_time_pct)
                        else:
                            frame_bytes = blank_bytes()
                        ffproc.stdin.write(frame_bytes)
                        # Progress
                        if fi > 0 and fi % progress_step == 0:
                            JOBS[job_id]["progress"] = f"Encoding ({int(fi/total_frames*100)}%, {time.time()-t0:.1f}s)..."
                    ffproc.stdin.close()
                except BrokenPipeError:
                    # ffmpeg exited early; its log holds the real reason.
                    pipe_broken = True
                ffproc.wait()
            finally:
                # Do not leave an encoder running if rendering or writing failed.
                if ffproc.poll() is None:
                    ffproc.kill()
                    ffproc.wait()
        if ffproc.returncode != 0 or pipe_broken:
            with open(log_path) as f:
                raise Exception(f"FFmpeg: {f.read()[-500:]}")
        if not os.path.exists(out) or os.path.getsize(out) < 500:
            raise Exception("FFmpeg empty output")
        print(f"[{job_id}] Encoded in {time.time()-t0:.1f}s β {os.path.getsize(out)} bytes")

        # Upload
        JOBS[job_id]["progress"] = "Uploading..."
        res = cloudinary.uploader.unsigned_upload(out, UPLOAD_PRESET, cloud_name=CLOUD_NAME, resource_type="video")
        JOBS[job_id].update(status="completed", progress="Done",
                            result={"public_id": res.get("public_id", ""), "secure_url": proxy_url(res.get("secure_url", ""))})
        print(f"[{job_id}] Done: {res.get('public_id')}")
    except Exception as e:
        import traceback
        print(f"[{job_id}] FAIL: {traceback.format_exc()}")
        JOBS[job_id].update(status="failed", error=str(e))
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
| # ── ENDPOINTS ────────────────────────────────────────── | |
def submit_job(req: CaptionRequest, bg: BackgroundTasks):
    """Register a new queued job, schedule its processing and return its id."""
    job_id = str(uuid.uuid4())
    record = {
        "status": "queued",
        "progress": "Waiting...",
        "result": None,
        "created_at": time.time(),
    }
    JOBS[job_id] = record
    # The heavy work runs in the background via process_caption_job.
    bg.add_task(process_caption_job, job_id, req)
    return {"job_id": job_id, "status": "queued"}
def get_job(job_id: str):
    """Return the stored record for ``job_id``; raise HTTP 404 if there is none."""
    record = JOBS.get(job_id)
    if record:
        return record
    raise HTTPException(404, "Job not found")
def home():
    """Return a summary of the service: name, status, style names and animations."""
    animations = ["none", "pop", "bounce", "slam", "underline", "typewriter", "slide_in"]
    style_names = list(DEFAULT_COLORS)
    return {
        "service": "Caption Greenscreen V6",
        "status": "running",
        "styles": style_names,
        "animations": animations,
    }
def list_styles():
    """Return every entry of DEFAULT_COLORS as ``{style name: palette dict}``."""
    palettes = {}
    for style_name, palette in DEFAULT_COLORS.items():
        palettes[style_name] = palette.dict()
    return palettes
def debug_fonts():
    """Report the loaded font, canvas size, FPS and the first 20 sorted font families."""
    font = get_font(FONT_N)
    # Constant command string; output is the fontconfig family list.
    listing = subprocess.run(
        "fc-list : family | sort | head -20",
        shell=True,
        capture_output=True,
        text=True,
    )
    families = listing.stdout.strip().split("\n")
    return {
        "font": str(font),
        "canvas": f"{WIDTH}x{HEIGHT}",
        "fps": FPS,
        "fonts": families,
    }