Spaces:
Runtime error
Runtime error
| """High-Retention Editing pipeline — per-segment AI decisions. | |
| Each 3-5s segment gets its own zoom direction, subtitle position, | |
| subtitle mode, and caption color driven by Qwen2.5-VL analyzing a few | |
| sampled frames plus the local transcript for that segment. | |
| Pipeline per clip: | |
| 1. Segment clip at speech pauses (3-5s chunks) | |
| 2. Extract three representative frames (25%, 50%, 75%) from each segment | |
| 3. Qwen2.5-VL analyzes each segment's frames → zoom + subtitle decisions | |
| 4. ffmpeg filter_complex: per-segment zoompan + concat | |
| 5. ASS subtitles with per-segment alignment/color/mode override tags | |
| """ | |
| import json | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from loguru import logger | |
| # ─── Video metadata ──────────────────────────────────────────────────────────── | |
| def _probe_dimensions(video_path: Path) -> tuple[int, int]: | |
| probe = subprocess.run( | |
| ["ffprobe", "-v", "error", "-select_streams", "v:0", | |
| "-show_entries", "stream=width,height", "-of", "csv=p=0", | |
| str(video_path)], | |
| capture_output=True, text=True, | |
| ) | |
| try: | |
| w, h = map(int, probe.stdout.strip().split(",")) | |
| return w, h | |
| except Exception: | |
| return 1080, 1920 | |
| def _probe_duration(video_path: Path) -> float: | |
| probe = subprocess.run( | |
| ["ffprobe", "-v", "error", "-show_entries", "format=duration", | |
| "-of", "csv=p=0", str(video_path)], | |
| capture_output=True, text=True, | |
| ) | |
| try: | |
| return float(probe.stdout.strip()) | |
| except Exception: | |
| return 0.0 | |
| def _has_audio_stream(video_path: Path) -> bool: | |
| probe = subprocess.run( | |
| ["ffprobe", "-v", "error", "-select_streams", "a", | |
| "-show_entries", "stream=codec_type", "-of", "csv=p=0", | |
| str(video_path)], | |
| capture_output=True, text=True, | |
| ) | |
| return bool(probe.stdout.strip()) | |
| # ─── Segmentation ───────────────────────────────────────────────────────────── | |
| def _segment_clip( | |
| duration: float, | |
| transcript: dict, | |
| clip_start: float, | |
| max_seg: float = 4.5, | |
| ) -> list[dict]: | |
| """Divide clip into segments at speech pauses, max_seg seconds each.""" | |
| words: list[dict] = [] | |
| for seg in transcript.get("segments", []): | |
| words.extend(seg.get("words", [])) | |
| if clip_start > 0: | |
| words = [ | |
| {**w, "start": max(0.0, w["start"] - clip_start), | |
| "end": max(0.0, w["end"] - clip_start)} | |
| for w in words | |
| ] | |
| words = [w for w in words if w["end"] > 0 and w["start"] < duration] | |
| # Collect pause midpoints as candidate cut times | |
| cuts = [0.0] | |
| for i in range(len(words) - 1): | |
| gap = words[i + 1]["start"] - words[i]["end"] | |
| if gap > 0.2: | |
| cuts.append((words[i]["end"] + words[i + 1]["start"]) / 2.0) | |
| cuts.append(duration) | |
| cuts = sorted(set(cuts)) | |
| # Merge short intervals, split long ones | |
| segs: list[dict] = [] | |
| start = 0.0 | |
| for cut in cuts[1:]: | |
| seg_len = cut - start | |
| if seg_len < 1.5 and cut < duration: | |
| continue # too short — extend to next cut | |
| if seg_len > max_seg: | |
| t = start | |
| while t + max_seg < cut: | |
| segs.append({"start": t, "end": t + max_seg}) | |
| t += max_seg | |
| if cut - t > 0.5: | |
| segs.append({"start": t, "end": cut}) | |
| start = cut | |
| else: | |
| segs.append({"start": start, "end": cut}) | |
| start = cut | |
| # Fallback: split evenly if not enough segments | |
| if len(segs) < 2: | |
| n = max(2, round(duration / 4.0)) | |
| d = duration / n | |
| segs = [{"start": i * d, "end": min((i + 1) * d, duration)} for i in range(n)] | |
| return segs | |
| # ─── Frame extraction ───────────────────────────────────────────────────────── | |
| def _extract_frame(video_path: Path, t: float, out_path: Path) -> bool: | |
| cmd = [ | |
| "ffmpeg", "-y", "-ss", f"{t:.3f}", "-i", str(video_path), | |
| "-vframes", "1", "-q:v", "3", str(out_path), | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, timeout=30) | |
| return result.returncode == 0 and out_path.exists() | |
def _extract_segment_frames(video_path: Path, seg: dict, seg_idx: int, tmp_dir: Path) -> list[Path]:
    """Save up to three stills from a segment (at 25%, 50% and 75%).

    Files are named ``seg_<idx>_<n>.jpg`` inside ``tmp_dir``. Only the paths
    of frames that were actually extracted are returned.
    """
    seg_start = float(seg["start"])
    seg_end = float(seg["end"])
    span = max(0.1, seg_end - seg_start)
    # Never seek past (end - 0.05s), but never before the segment start either.
    latest = max(seg_start, seg_end - 0.05)

    saved: list[Path] = []
    for n, fraction in enumerate((0.25, 0.50, 0.75)):
        target = tmp_dir / f"seg_{seg_idx:03d}_{n}.jpg"
        stamp = min(max(seg_start, seg_start + span * fraction), latest)
        if _extract_frame(video_path, stamp, target):
            saved.append(target)
    return saved
| def _detect_face_bbox_in_image(image_path: Path) -> list[float] | None: | |
| """Detect a human face in one frame and return a normalized padded bbox.""" | |
| try: | |
| import cv2 | |
| except Exception: | |
| return None | |
| image = cv2.imread(str(image_path)) | |
| if image is None: | |
| return None | |
| fh, fw = image.shape[:2] | |
| if fw <= 0 or fh <= 0: | |
| return None | |
| cascade_paths = [ | |
| Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml", | |
| Path(cv2.data.haarcascades) / "haarcascade_profileface.xml", | |
| ] | |
| cascades = [cv2.CascadeClassifier(str(p)) for p in cascade_paths if p.exists()] | |
| cascades = [c for c in cascades if not c.empty()] | |
| if not cascades: | |
| return None | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| gray = cv2.equalizeHist(gray) | |
| candidates: list[tuple[int, int, int, int]] = [] | |
| min_size = (max(34, fw // 46), max(34, fh // 46)) | |
| for cascade in cascades: | |
| faces = cascade.detectMultiScale( | |
| gray, | |
| scaleFactor=1.08, | |
| minNeighbors=4, | |
| minSize=min_size, | |
| ) | |
| candidates.extend(tuple(map(int, face)) for face in faces) | |
| flipped = cv2.flip(gray, 1) | |
| flipped_faces = cascade.detectMultiScale( | |
| flipped, | |
| scaleFactor=1.08, | |
| minNeighbors=4, | |
| minSize=min_size, | |
| ) | |
| for x, y, w, h in flipped_faces: | |
| candidates.append((fw - int(x) - int(w), int(y), int(w), int(h))) | |
| best: tuple[int, int, int, int] | None = None | |
| best_score = 0.0 | |
| for x, y, w, h in candidates: | |
| area = w * h | |
| if area <= 0: | |
| continue | |
| face_cy = (y + h / 2) / fh | |
| centrality = 1.0 - min(0.55, abs(face_cy - 0.38)) | |
| score = area * centrality | |
| if score > best_score: | |
| best = (x, y, w, h) | |
| best_score = score | |
| if not best: | |
| return None | |
| x, y, w, h = best | |
| pad_x = w * 0.34 | |
| pad_y_top = h * 0.46 | |
| pad_y_bottom = h * 0.70 | |
| return [ | |
| max(0.0, (x - pad_x) / fw), | |
| max(0.0, (y - pad_y_top) / fh), | |
| min(1.0, (x + w + pad_x) / fw), | |
| min(1.0, (y + h + pad_y_bottom) / fh), | |
| ] | |
| def _detect_segment_face_bbox(frame_paths: list[Path]) -> list[float] | None: | |
| """Pick the strongest face box across the sampled frames for a segment.""" | |
| best_bbox: list[float] | None = None | |
| best_area = 0.0 | |
| for frame_path in frame_paths: | |
| bbox = _detect_face_bbox_in_image(frame_path) | |
| if not bbox: | |
| continue | |
| area = max(0.0, bbox[2] - bbox[0]) * max(0.0, bbox[3] - bbox[1]) | |
| if area > best_area: | |
| best_bbox = bbox | |
| best_area = area | |
| if best_bbox: | |
| logger.info( | |
| "HRE face zoom target: " | |
| f"x={((best_bbox[0] + best_bbox[2]) / 2):.2f} " | |
| f"y={((best_bbox[1] + best_bbox[3]) / 2):.2f}" | |
| ) | |
| return best_bbox | |
| def _apply_detected_face_override(analysis: dict, face_bbox: list[float] | None) -> dict: | |
| if not face_bbox: | |
| return analysis | |
| x1, y1, x2, y2 = face_bbox | |
| face_cx = (x1 + x2) / 2.0 | |
| face_cy = (y1 + y2) / 2.0 | |
| return { | |
| **analysis, | |
| "face_detected": True, | |
| "subject_bbox": face_bbox, | |
| "face_cx": face_cx, | |
| "face_cy": face_cy, | |
| "zoom_anchor_x": face_cx, | |
| "zoom_anchor_y": face_cy, | |
| } | |
| # ─── Per-segment AI analysis ────────────────────────────────────────────────── | |
def _analyze_segment(
    video_path: Path,
    seg: dict,
    seg_idx: int,
    n_total: int,
    transcript: dict,
    clip_start: float,
    tmp_dir: Path,
) -> dict:
    """Run the vision model on one segment and overlay any detected face.

    Sampled frames plus the words spoken during the segment are passed to
    ``analyze_frames_for_hre``. If no frame could be extracted, the default
    analysis from ``_default_hre_analysis`` is returned instead.
    """
    from src.analysis.vision import analyze_frames_for_hre, _default_hre_analysis

    frame_paths = _extract_segment_frames(video_path, seg, seg_idx, tmp_dir)
    if not frame_paths:
        return _default_hre_analysis(seg_idx, n_total)

    # The overlap test below compares word times against the segment window
    # shifted by clip_start.
    window_start = seg["start"] + clip_start
    window_end = seg["end"] + clip_start
    spoken: list[str] = []
    for chunk in transcript.get("segments", []):
        for w in chunk.get("words", []):
            if w.get("start", 0) < window_end and w.get("end", 0) > window_start:
                spoken.append(w.get("word", w.get("text", "")))
    context = " ".join(spoken).strip()

    analysis = analyze_frames_for_hre(frame_paths, context, seg_idx, n_total)
    face_bbox = _detect_segment_face_bbox(frame_paths)
    return _apply_detected_face_override(analysis, face_bbox)
| # ─── Zoom expression builders ───────────────────────────────────────────────── | |
def _build_zoom_exprs(
    analysis: dict,
    w: int,
    h: int,
) -> tuple[str, str, str]:
    """Return (z_expr, x_expr, y_expr) for ffmpeg zoompan from HRE analysis.

    Reads ``zoom_direction`` (default "in") and ``zoom_speed`` (default
    "slow") from ``analysis``. The zoom anchor comes from ``zoom_anchor_x`` /
    ``zoom_anchor_y``, falling back to ``face_cx`` / ``face_cy`` and then to
    (0.5, 0.38). ``w`` and ``h`` are accepted but not read in the body.

    Note: \\, escapes comma inside ffmpeg filter expressions.
    """
    direction = analysis.get("zoom_direction", "in")
    speed = analysis.get("zoom_speed", "slow")
    zoom_anchor_x = _clamp_float(analysis.get("zoom_anchor_x"), _clamp_float(analysis.get("face_cx"), 0.5))
    zoom_anchor_y = _clamp_float(analysis.get("zoom_anchor_y"), _clamp_float(analysis.get("face_cy"), 0.38))
    # Zoom factor as a function of the frame counter ``on``: ramps up for
    # "in", ramps down for "out", and stays constant otherwise.
    if direction == "in":
        if speed == "fast":
            z_expr, max_zoom = "min(1.0+on*0.0100\\,1.45)", 1.45
        else:
            z_expr, max_zoom = "min(1.0+on*0.0035\\,1.28)", 1.28
    elif direction == "out":
        if speed == "fast":
            z_expr, max_zoom = "max(1.45-on*0.0100\\,1.0)", 1.45
        else:
            z_expr, max_zoom = "max(1.28-on*0.0040\\,1.0)", 1.28
    else:  # hold
        z_expr, max_zoom = "1.08", 1.08
    # Zoom-in centres the crop window on the anchor, clamped to the frame.
    # NOTE(review): every "in" branch above sets max_zoom to 1.28 or 1.45, so
    # this test reduces to direction == "in" and the nested "in" branch below
    # looks unreachable. The nesting here was reconstructed from a flattened
    # listing — confirm against the real file.
    if direction == "in" and max_zoom > 1.05:
        x_expr = f"max(0\\,min(iw-iw/zoom\\,iw*{zoom_anchor_x:.3f}-iw/zoom/2))"
        y_expr = f"max(0\\,min(ih-ih/zoom\\,ih*{zoom_anchor_y:.3f}-ih/zoom/2))"
    else:
        # Out/hold keep the crop window centred horizontally.
        x_expr = "iw/2-(iw/zoom/2)"
        if direction == "in":
            y_bias = min(zoom_anchor_y, 0.5) if zoom_anchor_y < 0.55 else 0.38
            y_expr = f"max(0\\,min(ih-ih/zoom\\,ih*{y_bias:.2f}-(ih/zoom/2)))"
        else:
            y_expr = "ih/2-(ih/zoom/2)"
    return z_expr, x_expr, y_expr
| # ─── Per-segment zoom via filter_complex ────────────────────────────────────── | |
| def _apply_per_segment_zoom( | |
| input_path: Path, | |
| segments: list[dict], | |
| analyses: list[dict], | |
| w: int, | |
| h: int, | |
| output_path: Path, | |
| has_audio: bool = True, | |
| ) -> Path: | |
| """Apply different zoompan to each segment, concat into single stream.""" | |
| filter_parts: list[str] = [] | |
| v_labels: list[str] = [] | |
| a_labels: list[str] = [] | |
| for i, (seg, analysis) in enumerate(zip(segments, analyses)): | |
| s = f"{seg['start']:.3f}" | |
| e = f"{seg['end']:.3f}" | |
| z, x, y = _build_zoom_exprs(analysis, w, h) | |
| zp = f"zoompan=z='{z}':x='{x}':y='{y}':d=1:s={w}x{h}:fps=30" | |
| filter_parts.append( | |
| f"[0:v]trim=start={s}:end={e},setpts=PTS-STARTPTS,fps=30,{zp},setpts=PTS-STARTPTS[v{i}]" | |
| ) | |
| v_labels.append(f"[v{i}]") | |
| if has_audio: | |
| filter_parts.append(f"[0:a]atrim=start={s}:end={e},asetpts=PTS-STARTPTS[a{i}]") | |
| a_labels.append(f"[a{i}]") | |
| n = len(segments) | |
| filter_parts.append("".join(v_labels) + f"concat=n={n}:v=1:a=0[vout]") | |
| if has_audio: | |
| filter_parts.append("".join(a_labels) + f"concat=n={n}:v=0:a=1[aout]") | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", str(input_path), | |
| "-filter_complex", ";".join(filter_parts), | |
| "-map", "[vout]", | |
| ] | |
| if has_audio: | |
| cmd += ["-map", "[aout]", "-c:a", "aac"] | |
| cmd += ["-c:v", "libx264", "-movflags", "+faststart", str(output_path)] | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) | |
| if result.returncode == 0 and output_path.exists(): | |
| logger.info(f"Per-segment zoom: {n} segments, {w}x{h}") | |
| return output_path | |
| logger.warning(f"Per-segment zoom failed: {result.stderr[-800:]}") | |
| return input_path | |
| # ─── Per-segment ASS subtitles ──────────────────────────────────────────────── | |
| _ASS_COLORS = { | |
| "white": "&H00FFFFFF", | |
| "yellow": "&H0000FFFF", | |
| "cyan": "&H00FFFF00", | |
| "orange": "&H000066FF", | |
| "green": "&H0000FF00", | |
| "red": "&H000000FF", | |
| } | |
| _POSITIONS = {"top", "bottom", "left", "right", "center", "free"} | |
| _MODES = {"word", "phrase", "sentence"} | |
| _EMPHASIS = {"pop", "punch", "calm"} | |
| _ANCHORS = set(range(1, 10)) | |
| def _ts(t: float) -> str: | |
| total_cs = max(0, int(round(t * 100))) | |
| h = total_cs // 360000 | |
| total_cs %= 360000 | |
| m = total_cs // 6000 | |
| total_cs %= 6000 | |
| s = total_cs // 100 | |
| cs = total_cs % 100 | |
| return f"{h}:{m:02d}:{s:02d}.{cs:02d}" | |
| def _pick(value: object, allowed: set[str], fallback: str) -> str: | |
| v = str(value or "").strip().lower() | |
| return v if v in allowed else fallback | |
| def _clamp_float(value: object, fallback: float, low: float = 0.0, high: float = 1.0) -> float: | |
| try: | |
| return min(high, max(low, float(value))) | |
| except Exception: | |
| return fallback | |
| def _clamp_int(value: object, fallback: int, allowed: set[int]) -> int: | |
| try: | |
| v = int(value) | |
| except Exception: | |
| return fallback | |
| return v if v in allowed else fallback | |
| def _normalise_bbox(value: object) -> list[float] | None: | |
| if not isinstance(value, (list, tuple)) or len(value) != 4: | |
| return None | |
| try: | |
| coords = [float(v) for v in value] | |
| except Exception: | |
| return None | |
| if max(abs(v) for v in coords) > 1.5: | |
| return None | |
| x1, y1, x2, y2 = coords | |
| x1, x2 = sorted((min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2)))) | |
| y1, y2 = sorted((min(1.0, max(0.0, y1)), min(1.0, max(0.0, y2)))) | |
| if x2 - x1 < 0.02 or y2 - y1 < 0.02: | |
| return None | |
| return [x1, y1, x2, y2] | |
| def _caption_anchor_for(x: float, y: float) -> int: | |
| if y < 0.34: | |
| return 8 if 0.30 <= x <= 0.70 else 7 if x < 0.5 else 9 | |
| if y > 0.66: | |
| return 2 if 0.30 <= x <= 0.70 else 1 if x < 0.5 else 3 | |
| return 5 if 0.34 <= x <= 0.66 else 4 if x < 0.5 else 6 | |
def _safe_caption_point(subject_x: float, subject_y: float, seg_idx: int) -> tuple[float, float, int]:
    """Choose a caption point away from the subject, rotating by segment.

    Four candidate zones are derived from where the subject sits; the
    segment index selects one. Returns ``(x, y, anchor)``.
    """
    subject_on_left = subject_x < 0.50
    if subject_y < 0.42:
        opposite_y = 0.72
    elif subject_y > 0.62:
        opposite_y = 0.24
    else:
        opposite_y = 0.76

    zones = (
        (0.68 if subject_on_left else 0.32, opposite_y),
        (0.72 if subject_on_left else 0.28, 0.50),
        (0.50, 0.18 if subject_y > 0.45 else 0.82),
        (0.50, 0.72),
    )
    x, y = zones[seg_idx % len(zones)]
    return x, y, _caption_anchor_for(x, y)
def _word_caption_point(subject_x: float, subject_y: float, seg_idx: int) -> tuple[float, float, int]:
    """Choose a mid-frame point for single-word captions, rotating by segment.

    If the chosen zone lies within 0.18 of the subject on both axes, x is
    moved to the opposite side. Returns ``(x, y, anchor)``.
    """
    zones = (
        (0.50, 0.42),
        (0.50, 0.26),
        (0.28 if subject_x > 0.55 else 0.72, 0.46),
        (0.30 if subject_x > 0.50 else 0.70, 0.58),
    )
    x, y = zones[seg_idx % len(zones)]

    too_close = abs(x - subject_x) < 0.18 and abs(y - subject_y) < 0.18
    if too_close:
        x = 0.25 if subject_x > 0.5 else 0.75
    return x, y, _caption_anchor_for(x, y)
def _normalise_analysis(analysis: dict, seg_idx: int, n_total: int) -> dict:
    """Validate model output and fill HRE fields used by the renderer.

    Every field is checked against its allowed vocabulary or numeric range
    and replaced with a fallback when invalid. Caption placement is then
    adjusted by subtitle mode and moved if it would sit on the subject, and
    zoom/emphasis are overridden for the first segment and by moment type.

    Args:
        analysis: Raw per-segment analysis; None is treated as empty.
        seg_idx: Segment index, used to rotate fallback caption positions
            and to special-case the first segment.
        n_total: Accepted but not read in this function.

    Returns:
        A copy of ``analysis`` with the validated fields written over it.
    """
    an = dict(analysis or {})
    subject_bbox = _normalise_bbox(an.get("subject_bbox"))
    energy = _pick(an.get("energy_level"), {"high", "medium", "low"}, "medium")
    moment = _pick(
        an.get("moment_type"),
        {"hook", "punchline", "context", "reaction", "transition"},
        "context",
    )
    # Default subtitle mode when the model gives none: "word" for high-energy
    # or hook/punchline/reaction moments, "phrase" for medium energy outside
    # context/transition, "sentence" otherwise.
    fallback_mode = "word" if energy == "high" or moment in {"hook", "punchline", "reaction"} else "sentence"
    if energy == "medium" and moment not in {"context", "transition"}:
        fallback_mode = "phrase"
    # Subject centre: from the validated box if present, else the face centre.
    if subject_bbox:
        subject_x = (subject_bbox[0] + subject_bbox[2]) / 2.0
        subject_y = (subject_bbox[1] + subject_bbox[3]) / 2.0
    else:
        subject_x = _clamp_float(an.get("face_cx"), 0.5)
        subject_y = _clamp_float(an.get("face_cy"), 0.38)
    pos = _pick(an.get("subtitle_position"), _POSITIONS, "free")
    mode = _pick(an.get("subtitle_mode"), _MODES, fallback_mode)
    emphasis = _pick(an.get("subtitle_emphasis"), _EMPHASIS, "punch" if mode == "word" else "calm")
    color = _pick(an.get("subtitle_color"), set(_ASS_COLORS), "white")
    zoom_direction = _pick(an.get("zoom_direction"), {"in", "out", "hold"}, "in")
    zoom_speed = _pick(an.get("zoom_speed"), {"fast", "slow"}, "slow")
    face_cx = _clamp_float(an.get("face_cx"), subject_x)
    face_cy = _clamp_float(an.get("face_cy"), subject_y)
    zoom_anchor_x = _clamp_float(an.get("zoom_anchor_x"), face_cx)
    zoom_anchor_y = _clamp_float(an.get("zoom_anchor_y"), face_cy)
    fallback_x, fallback_y, fallback_anchor = _safe_caption_point(subject_x, subject_y, seg_idx)
    caption_x = _clamp_float(an.get("caption_x"), fallback_x, 0.10, 0.90)
    caption_y = _clamp_float(an.get("caption_y"), fallback_y, 0.12, 0.88)
    caption_anchor = _clamp_int(an.get("caption_anchor"), fallback_anchor, _ANCHORS)
    caption_max_width_pct = _clamp_float(
        an.get("caption_max_width_pct"),
        0.58 if mode != "sentence" else 0.72,
        0.35,
        0.82,
    )
    # Sentences are pinned to a centred lower band; single words avoid the
    # lower third and the subject's immediate neighbourhood.
    if mode == "sentence":
        caption_x = 0.50
        caption_y = _clamp_float(an.get("caption_y"), 0.70, 0.64, 0.74)
        caption_anchor = 2
        caption_max_width_pct = max(caption_max_width_pct, 0.68)
    elif mode == "word":
        word_x, word_y, word_anchor = _word_caption_point(subject_x, subject_y, seg_idx)
        if caption_y > 0.66 or (abs(caption_x - subject_x) < 0.14 and abs(caption_y - subject_y) < 0.14):
            caption_x, caption_y, caption_anchor = word_x, word_y, word_anchor
        # NOTE(review): nesting reconstructed from a flattened listing — this
        # width cap is assumed to apply to every word-mode caption; confirm.
        caption_max_width_pct = min(caption_max_width_pct, 0.56)
    # Move the caption if its point falls inside the subject box grown by 0.08.
    if subject_bbox:
        x1, y1, x2, y2 = subject_bbox
        overlaps_subject = (x1 - 0.08) <= caption_x <= (x2 + 0.08) and (y1 - 0.08) <= caption_y <= (y2 + 0.08)
        if overlaps_subject:
            if mode == "sentence":
                caption_x, caption_y, caption_anchor = 0.50, 0.70, 2
            elif mode == "word":
                caption_x, caption_y, caption_anchor = _word_caption_point(subject_x, subject_y, seg_idx + 1)
            else:
                caption_x, caption_y, caption_anchor = fallback_x, fallback_y, fallback_anchor
    # First segment: always a fast zoom-in, never sentence mode.
    # NOTE(review): a sentence promoted to "word" here keeps the sentence
    # caption position chosen above. The nesting of the two inner ifs was
    # reconstructed from a flattened listing — confirm against the real file.
    if seg_idx == 0:
        zoom_direction, zoom_speed = "in", "fast"
        if mode == "sentence":
            mode = "word"
        if emphasis == "calm":
            emphasis = "punch"
    # Moment-driven overrides: punchy moments zoom in, calm context holds.
    if mode == "word" or moment in {"hook", "punchline", "reaction"}:
        zoom_direction = "in"
        zoom_speed = "fast" if energy == "high" else "slow"
        emphasis = "punch" if emphasis == "calm" else emphasis
    elif mode == "sentence" and moment in {"context", "transition"}:
        zoom_direction = "hold"
        zoom_speed = "slow"
        emphasis = "calm"
    return {
        **an,
        "zoom_direction": zoom_direction,
        "zoom_speed": zoom_speed,
        "face_detected": bool(an.get("face_detected", False)),
        "face_cx": face_cx,
        "face_cy": face_cy,
        "subject_bbox": subject_bbox,
        "zoom_anchor_x": zoom_anchor_x,
        "zoom_anchor_y": zoom_anchor_y,
        "subtitle_position": pos,
        "caption_x": caption_x,
        "caption_y": caption_y,
        "caption_anchor": caption_anchor,
        "caption_max_width_pct": caption_max_width_pct,
        "subtitle_mode": mode,
        "subtitle_emphasis": emphasis,
        "subtitle_color": color,
        "energy_level": energy,
        "moment_type": moment,
    }
def _build_hre_plan(segments: list[dict], analyses: list[dict]) -> list[dict]:
    """Build the per-segment render plan from raw analyses.

    Each entry is the normalised analysis plus ``segment_index``, ``start``
    and ``end``. If every segment ends up with the same caption point and
    mode, a fixed rotation of points and modes is applied instead so the
    treatment varies across the clip.
    """
    total = len(segments)
    plan = [
        {
            **_normalise_analysis(analysis, idx, total),
            "segment_index": idx,
            "start": seg["start"],
            "end": seg["end"],
        }
        for idx, (seg, analysis) in enumerate(zip(segments, analyses))
    ]

    treatments = {
        (round(entry["caption_x"], 2), round(entry["caption_y"], 2), entry["subtitle_mode"])
        for entry in plan
    }
    if len(plan) > 1 and len(treatments) == 1:
        rotation = (
            ((0.50, 0.76), "word"),
            ((0.50, 0.18), "sentence"),
            ((0.28, 0.56), "phrase"),
            ((0.72, 0.52), "word"),
            ((0.50, 0.82), "sentence"),
            ((0.50, 0.22), "phrase"),
        )
        for idx, entry in enumerate(plan):
            point, mode = rotation[idx % len(rotation)]
            entry["subtitle_position"] = "free"
            entry["caption_x"], entry["caption_y"] = point
            entry["caption_anchor"] = _caption_anchor_for(entry["caption_x"], entry["caption_y"])
            entry["subtitle_mode"] = mode
            if mode == "word":
                entry["subtitle_emphasis"] = "punch"
    return plan
| def _ass_escape(text: str) -> str: | |
| return ( | |
| text.replace("{", "(") | |
| .replace("}", ")") | |
| .replace("\r", " ") | |
| .replace("\n", " ") | |
| .strip() | |
| ) | |
def _wrap_text(text: str, max_chars: int) -> str:
    """Escape ``text`` and wrap it onto at most two ``\\N``-separated lines.

    Text within ``max_chars`` is returned unwrapped. A single over-long
    token is chopped into fixed-width pieces. Otherwise words are packed
    greedily; anything beyond the first line is merged into the second.
    """
    clean = _ass_escape(text)
    if len(clean) <= max_chars:
        return clean

    tokens = clean.split()
    if len(tokens) <= 1:
        pieces = [clean[pos:pos + max_chars] for pos in range(0, len(clean), max_chars)]
        return r"\N".join(pieces)

    rows: list[str] = []
    current = ""
    for token in tokens:
        extended = f"{current} {token}".strip()
        if current and len(extended) > max_chars:
            rows.append(current)
            current = token
        else:
            current = extended
    if current:
        rows.append(current)

    if len(rows) > 2:
        rows = [rows[0], " ".join(rows[1:])]
    return r"\N".join(rows)
| def _collect_clip_words(transcript: dict, clip_start: float, duration: float) -> list[dict]: | |
| words: list[dict] = [] | |
| for seg in transcript.get("segments", []): | |
| seg_start = float(seg.get("start", clip_start)) - clip_start | |
| seg_end = float(seg.get("end", clip_start)) - clip_start | |
| for word in seg.get("words", []): | |
| text = str(word.get("word", word.get("text", ""))).strip() | |
| if not text: | |
| continue | |
| start = float(word.get("start", seg_start + clip_start)) - clip_start | |
| end = float(word.get("end", word.get("start", seg_end + clip_start))) - clip_start | |
| if end <= start: | |
| end = start + 0.24 | |
| if end <= 0 or start >= duration: | |
| continue | |
| words.append({ | |
| "start": max(0.0, start), | |
| "end": min(duration, end), | |
| "text": text, | |
| }) | |
| return sorted(words, key=lambda w: (w["start"], w["end"])) | |
| def _segment_text(transcript: dict, clip_start: float, seg: dict) -> str: | |
| parts: list[str] = [] | |
| for item in transcript.get("segments", []): | |
| start = float(item.get("start", clip_start)) - clip_start | |
| end = float(item.get("end", clip_start)) - clip_start | |
| if start < seg["end"] and end > seg["start"]: | |
| text = str(item.get("text", "")).strip() | |
| if text: | |
| parts.append(text) | |
| return " ".join(parts).strip() | |
| def _words_in_segment(words: list[dict], seg: dict) -> list[dict]: | |
| return [ | |
| w for w in words | |
| if w["start"] < seg["end"] and w["end"] > seg["start"] | |
| ] | |
| def _display_text(text: str, mode: str, emphasis: str) -> str: | |
| text = text.strip() | |
| if mode == "sentence" and emphasis == "calm": | |
| return text | |
| return text.upper() | |
| def _append_event(events: list[dict], start: float, end: float, text: str, plan: dict) -> None: | |
| start = max(float(plan["start"]), start) | |
| end = min(float(plan["end"]), end) | |
| if end - start < 0.08 or not text.strip(): | |
| return | |
| events.append({ | |
| "start": start, | |
| "end": end, | |
| "text": text.strip(), | |
| "plan": plan, | |
| }) | |
def _word_events(words: list[dict], seg: dict, plan: dict) -> list[dict]:
    """Emit one caption event per word, without overlaps, inside ``seg``.

    Each word is shown for at least 0.14s/0.18s and at most 0.72s/0.95s
    (high energy / otherwise), and ends just before the next word starts
    where possible.
    """
    high_energy = plan["energy_level"] == "high"
    shortest = 0.14 if high_energy else 0.18
    longest = 0.72 if high_energy else 0.95
    seg_start, seg_end = seg["start"], seg["end"]

    out: list[dict] = []
    playhead = seg_start
    for idx, word in enumerate(words):
        begin = max(seg_start, word["start"], playhead)
        following = words[idx + 1]["start"] if idx + 1 < len(words) else seg_end
        finish = min(seg_end, max(word["end"], begin + shortest), begin + longest)
        if following > begin:
            # Stop 15ms before the next word, but keep the minimum duration.
            finish = min(finish, max(begin + shortest, following - 0.015))
        if finish <= begin:
            finish = min(seg_end, begin + shortest)
        _append_event(out, begin, finish, word["text"], plan)
        playhead = finish + 0.015
        if playhead >= seg_end:
            break
    return out
def _line_events(
    words: list[dict],
    seg: dict,
    plan: dict,
    max_words: int,
    max_duration: float,
    max_chars: int,
) -> list[dict]:
    """Group consecutive words into multi-word caption events inside ``seg``.

    A group is closed once adding the next word would exceed ``max_words``,
    ``max_duration`` seconds, or ``max_chars`` characters. Every group holds
    at least one word and is shown for at least 0.55s where the segment
    allows.
    """
    out: list[dict] = []
    seg_start, seg_end = seg["start"], seg["end"]
    total = len(words)
    idx = 0
    playhead = seg_start
    while idx < total and playhead < seg_end - 0.08:
        begin = max(seg_start, words[idx]["start"], playhead)
        finish = begin
        bundle: list[dict] = []
        width = 0
        while idx < total:
            word = words[idx]
            word_end = min(seg_end, max(word["end"], word["start"] + 0.2))
            width_with_word = width + len(word["text"]) + (1 if bundle else 0)
            is_full = (
                len(bundle) >= max_words
                or word_end - begin > max_duration
                or width_with_word > max_chars
            )
            # The first word is always accepted, so a bundle is never empty.
            if bundle and is_full:
                break
            bundle.append(word)
            width = width_with_word
            finish = max(finish, word_end)
            idx += 1
        finish = min(seg_end, max(finish, begin + 0.55))
        _append_event(out, begin, finish, " ".join(w["text"] for w in bundle), plan)
        playhead = finish + 0.04
    return out
def _fallback_text_events(text: str, seg: dict, plan: dict) -> list[dict]:
    """Spread plain text evenly over ``seg`` when no word timings exist.

    The text is chunked by the plan's subtitle mode (1, 3 or 7 words per
    chunk). A single unbroken token longer than 20 characters is first cut
    into fixed-width pieces. Each chunk gets an equal share of the segment.
    """
    if not text:
        return []

    mode = plan["subtitle_mode"]
    per_chunk = {"word": 1, "phrase": 3}.get(mode, 7)

    tokens = text.split()
    if len(tokens) <= 1 and len(text) > 20:
        width = {"word": 10, "phrase": 24}.get(mode, 36)
        tokens = [text[pos:pos + width] for pos in range(0, len(text), width)]

    groups: list[str] = []
    for pos in range(0, len(tokens), per_chunk):
        joined = " ".join(tokens[pos:pos + per_chunk])
        if joined.strip():
            groups.append(joined)
    if not groups:
        return []

    out: list[dict] = []
    span = max(0.1, seg["end"] - seg["start"])
    slot = span / len(groups)
    for idx, chunk in enumerate(groups):
        _append_event(out, seg["start"] + idx * slot, seg["start"] + (idx + 1) * slot, chunk, plan)
    return out
def _build_subtitle_events(
    transcript: dict,
    clip_start: float,
    duration: float,
    segments: list[dict],
    plan: list[dict],
) -> list[dict]:
    """Build the full, non-overlapping caption event list for a clip.

    Each segment is rendered by its plan's subtitle mode (word, phrase, or
    sentence-sized lines). Segments that produce no timed events fall back
    to evenly spread segment text. Events are then sorted and trimmed so
    that only one is visible at a time.
    """
    clip_words = _collect_clip_words(transcript, clip_start, duration)

    raw: list[dict] = []
    for seg, seg_plan in zip(segments, plan):
        inside = _words_in_segment(clip_words, seg)
        mode = seg_plan["subtitle_mode"]
        if not inside:
            built: list[dict] = []
        elif mode == "word":
            built = _word_events(inside, seg, seg_plan)
        elif mode == "phrase":
            built = _line_events(inside, seg, seg_plan, max_words=3, max_duration=1.7, max_chars=28)
        else:
            built = _line_events(inside, seg, seg_plan, max_words=7, max_duration=2.8, max_chars=44)
        if not built:
            built = _fallback_text_events(_segment_text(transcript, clip_start, seg), seg, seg_plan)
        raw.extend(built)
    raw.sort(key=lambda ev: (ev["start"], ev["end"]))

    # ASS draws every active event at once, so push each start past the
    # previous end and drop whatever becomes shorter than 0.08s.
    visible: list[dict] = []
    playhead = 0.0
    for event in raw:
        begin = max(event["start"], playhead)
        finish = min(duration, event["end"])
        if finish - begin >= 0.08:
            visible.append({**event, "start": begin, "end": finish})
            playhead = finish + 0.01
    return visible
def _subtitle_tag(plan: dict) -> tuple[str, int]:
    """Build the ASS override tag block for one plan entry.

    Returns ``(tag, max_chars)``, where ``tag`` sets alignment, position,
    colour, font size and an optional entrance animation, and ``max_chars``
    is the per-line wrap width derived from the caption width and font size.
    Positions are computed against a 1080x1920 canvas.
    """
    mode = plan["subtitle_mode"]
    energy = plan["energy_level"]
    emphasis = plan["subtitle_emphasis"]
    colour = _ASS_COLORS.get(plan["subtitle_color"], "&H00FFFFFF")
    anchor = int(plan.get("caption_anchor", 5))
    pos_x = round(_clamp_float(plan.get("caption_x"), 0.5, 0.08, 0.92) * 1080)
    pos_y = round(_clamp_float(plan.get("caption_y"), 0.75, 0.10, 0.90) * 1920)
    width_pct = _clamp_float(plan.get("caption_max_width_pct"), 0.62, 0.35, 0.82)
    width_px = max(360, min(886, int(width_pct * 1080)))

    if mode == "sentence":
        size = 60 if energy == "high" else 54
    elif mode == "phrase":
        size = 62 if energy == "low" else 68
    else:
        size = 96 if energy == "high" else 84
    if anchor in {4, 5, 6}:
        # Anchors 4-6 use a font 4 units smaller, with a floor of 54.
        size = max(54, size - 4)
    per_line = max(8, min(34, int(width_px / (size * 0.58))))

    parts = [
        f"{{\\an{anchor}\\pos({pos_x},{pos_y})\\1c{colour}&\\fs{size}",
        "\\b1\\bord5\\shad1\\q2\\fad(30,70)}",
    ]
    if emphasis == "punch" or mode == "word":
        parts.append("{\\fscx132\\fscy132\\frz-2\\t(0,140,\\fscx100\\fscy100\\frz0)}")
    elif emphasis == "pop":
        parts.append("{\\fscx118\\fscy118\\t(0,120,\\fscx100\\fscy100)}")
    return "".join(parts), per_line
def _generate_per_segment_subtitles(
    transcript: dict,
    ass_path: Path,
    clip_start: float,
    segments: list[dict],
    analyses: list[dict],
) -> None:
    """Write the ASS subtitle file, plus its plan as JSON, for one clip.

    Style may change from segment to segment, but the events written never
    overlap in time. The plan is saved next to the ASS file with the suffix
    ``.hre_plan.json``.
    """
    duration = max((float(seg["end"]) for seg in segments), default=0.0)
    plan = _build_hre_plan(segments, analyses)
    events = _build_subtitle_events(transcript, clip_start, duration, segments, plan)

    style_format = (
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, "
        "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, "
        "ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, "
        "Alignment, MarginL, MarginR, MarginV, Encoding"
    )
    default_style = (
        "Style: Default,Noto Sans,82,&H00FFFFFF,&H0000FFFF,&H00000000,&H80000000,"
        "-1,0,0,0,100,100,0,0,1,5,1,2,40,40,200,1"
    )
    script = [
        "[Script Info]",
        "ScriptType: v4.00+",
        "PlayResX: 1080",
        "PlayResY: 1920",
        "ScaledBorderAndShadow: yes",
        "",
        "[V4+ Styles]",
        style_format,
        default_style,
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    ]

    for event in events:
        seg_plan = event["plan"]
        tag, max_chars = _subtitle_tag(seg_plan)
        shown = _display_text(event["text"], seg_plan["subtitle_mode"], seg_plan["subtitle_emphasis"])
        shown = _wrap_text(shown, max_chars)
        script.append(
            f"Dialogue: 0,{_ts(event['start'])},{_ts(event['end'])},"
            f"Default,,0,0,0,,{tag}{shown}"
        )

    ass_path.write_text("\n".join(script), encoding="utf-8")
    plan_json = json.dumps(plan, ensure_ascii=False, indent=2)
    ass_path.with_suffix(".hre_plan.json").write_text(plan_json, encoding="utf-8")
    logger.debug(f"ASS: {len(events)} events across {len(segments)} HRE segments")
| # ─── Emoji ───────────────────────────────────────────────────────────────────── | |
| def _get_emoji(clip_data: dict, analyses: list[dict] | None = None) -> str: | |
| if analyses: | |
| energy_rank = {"high": 3, "medium": 2, "low": 1} | |
| best = max(analyses, key=lambda a: energy_rank.get(a.get("energy_level", "low"), 1)) | |
| moment_emoji = { | |
| "hook": "🔥", "punchline": "😂", "reaction": "😲", | |
| "context": "💡", "transition": "✨", | |
| } | |
| if emoji := moment_emoji.get(best.get("moment_type", "")): | |
| return emoji | |
| a = clip_data.get("vision_analysis", {}) | |
| emotion = a.get("emotion", "excited") | |
| action = a.get("action_type", "entertainment") | |
| transcript_text = clip_data.get("transcript_text", "") | |
| if transcript_text: | |
| try: | |
| from src.analysis.vision import get_emoji_for_scene | |
| return get_emoji_for_scene(transcript_text, emotion, action) | |
| except Exception: | |
| pass | |
| fb = {"happy": "😄", "excited": "🔥", "funny": "😂", "surprised": "😲", | |
| "gaming": "🎮", "tutorial": "📚", "angry": "😤", "sad": "😢"} | |
| return fb.get(emotion, fb.get(action, "⚡")) | |
| # ─── Final render ───────────────────────────────────────────────────────────── | |
def _render_final(
    video_path: Path,
    ass_path: Path,
    emoji: str,
    output_path: Path,
) -> None:
    """Burn the ASS subtitles and the emoji badge into the final video.

    The first attempt applies the ``ass`` filter plus a ``drawtext`` overlay
    that shows ``emoji`` near the top-right corner for the first three
    seconds.  If ffmpeg exits non-zero, the reason is logged and the render
    is retried with subtitles only.  If the retry also fails, an error is
    logged and the function returns without raising.

    Both attempts re-encode video with libx264, copy the audio stream and
    write with ``-movflags +faststart``, so the output layout does not depend
    on which attempt succeeded.

    Args:
        video_path: Input video.
        ass_path: ASS subtitle file to burn in.
        emoji: Text drawn by the ``drawtext`` overlay.
        output_path: Destination file (overwritten).

    Raises:
        subprocess.TimeoutExpired: If an ffmpeg run exceeds 300 seconds.
    """
    # Backslashes become forward slashes and ":" is backslash-escaped before
    # the path is embedded in the filter string.
    ass_str = str(ass_path).replace("\\", "/").replace(":", "\\:")
    subtitle_filter = f"ass='{ass_str}'"
    emoji_filter = (
        f"drawtext=text='{emoji}':fontsize=80:x=w-100:y=50"
        f":enable='between(t\\,0\\,3)'"
    )

    def _encode(vf: str) -> subprocess.CompletedProcess:
        """Run one ffmpeg encode of ``video_path`` with the given filter chain."""
        cmd = [
            "ffmpeg", "-y", "-i", str(video_path),
            "-vf", vf, "-c:v", "libx264", "-c:a", "copy",
            "-movflags", "+faststart", str(output_path),
        ]
        return subprocess.run(cmd, capture_output=True, text=True, timeout=300)

    result = _encode(f"{subtitle_filter},{emoji_filter}")
    if result.returncode != 0:
        # Record why the overlay attempt failed before dropping the emoji.
        logger.warning(
            "HRE render with emoji overlay failed, retrying subtitles only: "
            f"{result.stderr[-300:]}"
        )
        result = _encode(subtitle_filter)
        if result.returncode != 0:
            logger.error(f"HRE render failed: {result.stderr[-300:]}")
            return
    logger.info(f"HRE render complete → {output_path.name}")
| # ─── Main pipeline ──────────────────────────────────────────────────────────── | |
def apply_hre(
    clip_path: Path,
    clip_data: dict,
    transcript: dict,
    output_path: Path,
) -> Path:
    """Run the full per-segment HRE pipeline on one clip.

    Steps: probe the clip, split it into segments, analyze every segment,
    merge the analyses into a plan, apply per-segment zoom into a temporary
    file, write the ASS subtitles next to ``output_path`` (same name with an
    ``.ass`` suffix), choose an emoji and render the final video.

    Args:
        clip_path: Source clip.
        clip_data: Clip metadata; ``start``, ``end`` and ``index`` are read.
        transcript: Transcript data passed to segmentation, analysis and
            subtitle generation.
        output_path: Destination of the rendered video; its parent directory
            is created if missing.

    Returns:
        ``output_path``, returned unconditionally after the render step.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    clip_start = clip_data.get("start", 0.0)

    with tempfile.TemporaryDirectory() as scratch:
        work_dir = Path(scratch)
        zoomed_target = work_dir / "zoomed.mp4"

        width, height = _probe_dimensions(clip_path)
        duration = _probe_duration(clip_path)
        if duration <= 0:
            # Probe gave nothing usable: derive the length from the metadata,
            # assuming 30 seconds when no end time is recorded.
            clip_end = clip_data.get("end", clip_start + 30)
            duration = float(clip_end) - clip_start
        audio_present = _has_audio_stream(clip_path)

        # 1. Segment at speech pauses
        segments = _segment_clip(duration, transcript, clip_start)
        total = len(segments)
        clip_label = clip_data.get("index", "?")
        logger.info(
            f"HRE clip {clip_label}: "
            f"{duration:.1f}s → {total} segments (AI analyzing each)"
        )

        # 2. Qwen2.5-VL analyzes each segment
        analyses = []
        for position, segment in enumerate(segments):
            analyses.append(
                _analyze_segment(
                    clip_path, segment, position, total,
                    transcript, clip_start, work_dir,
                )
            )
        plan = _build_hre_plan(segments, analyses)

        for segment, decision in zip(segments, plan):
            zoom = f"{decision.get('zoom_direction')}({decision.get('zoom_speed')})"
            caption = (
                f"{decision.get('subtitle_position')}/{decision.get('subtitle_mode')}/"
                f"{decision.get('subtitle_color')}"
            )
            logger.info(
                f" [{segment['start']:.1f}s-{segment['end']:.1f}s] "
                f"zoom={zoom} "
                f"sub={caption} "
                f"type={decision.get('moment_type')} energy={decision.get('energy_level')}"
            )

        # 3. Per-segment zoom via filter_complex
        zoomed = _apply_per_segment_zoom(
            clip_path, segments, plan, width, height, zoomed_target,
            has_audio=audio_present,
        )

        # 4. Per-segment ASS subtitles
        # NOTE(review): the already-built plan is passed in the `analyses`
        # slot, and the callee builds a plan from it again; presumably
        # _build_hre_plan is stable on its own output. Confirm.
        ass_path = output_path.with_suffix(".ass")
        _generate_per_segment_subtitles(
            transcript, ass_path, clip_start, segments, plan
        )

        # 5. Emoji from highest-energy segment
        emoji = _get_emoji(clip_data, plan)

        # 6. Render
        _render_final(zoomed, ass_path, emoji, output_path)

    return output_path