ElevenClip-AI / backend /src /processing /high_retention.py
jakgritb's picture
fix: keep faces centered in HRE crops
eb1790e verified
Raw
History Blame Contribute Delete
38.6 kB
"""High-Retention Editing pipeline — per-segment AI decisions.
Each short segment (roughly 1.5-4.5s) gets its own zoom direction, subtitle
position, subtitle mode, and caption color, driven by Qwen2.5-VL analyzing a
few sampled frames plus the local transcript for that segment.

Pipeline per clip:
1. Segment clip at speech pauses (chunks capped at 4.5s by default)
2. Extract three frames (25%/50%/75% through) from each segment
3. Qwen2.5-VL analyzes the frames → zoom + subtitle decisions; a face found
   by OpenCV Haar cascades, when present, overrides the zoom anchor
4. ffmpeg filter_complex: per-segment zoompan + concat
5. ASS subtitles with per-segment alignment/color/mode override tags
"""
import json
import subprocess
import tempfile
from pathlib import Path
from loguru import logger
# ─── Video metadata ────────────────────────────────────────────────────────────
def _probe_dimensions(video_path: Path) -> tuple[int, int]:
    """Return ``(width, height)`` of the first video stream using ffprobe.

    Args:
        video_path: Media file to inspect.

    Returns:
        The probed dimensions, or the portrait default ``(1080, 1920)`` when
        ffprobe cannot be launched, times out, or prints nothing parseable.
    """
    fallback = (1080, 1920)
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "v:0",
             "-show_entries", "stream=width,height", "-of", "csv=p=0",
             str(video_path)],
            capture_output=True, text=True, timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        # ffprobe missing or hung: use the same default as unparseable output.
        return fallback

    rows = probe.stdout.strip().splitlines()
    if not rows:
        return fallback
    # Only the first row matters; drop empty fields left by a trailing comma.
    fields = [field.strip() for field in rows[0].split(",") if field.strip()]
    try:
        width, height = int(fields[0]), int(fields[1])
    except (IndexError, ValueError):
        return fallback
    if width <= 0 or height <= 0:
        return fallback
    return width, height
def _probe_duration(video_path: Path) -> float:
    """Return the container duration in seconds using ffprobe.

    Args:
        video_path: Media file to inspect.

    Returns:
        A positive, finite duration, or ``0.0`` when ffprobe cannot be
        launched, times out, or reports no usable value (for example ``N/A``).
    """
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "csv=p=0", str(video_path)],
            capture_output=True, text=True, timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        return 0.0

    rows = probe.stdout.strip().splitlines()
    try:
        duration = float(rows[0].strip().rstrip(","))
    except (IndexError, ValueError):
        return 0.0
    # The chained comparison is False for NaN, so NaN/inf/non-positive values
    # all collapse to the 0.0 "unknown" marker.
    if 0.0 < duration < float("inf"):
        return duration
    return 0.0
def _has_audio_stream(video_path: Path) -> bool:
    """Report whether ffprobe lists at least one audio stream in the file."""
    command = [
        "ffprobe", "-v", "error", "-select_streams", "a",
        "-show_entries", "stream=codec_type", "-of", "csv=p=0",
        str(video_path),
    ]
    completed = subprocess.run(command, capture_output=True, text=True)
    listed_streams = completed.stdout.strip()
    # Any non-blank output means ffprobe printed a row for an audio stream.
    return len(listed_streams) > 0
# ─── Segmentation ─────────────────────────────────────────────────────────────
def _segment_clip(
    duration: float,
    transcript: dict,
    clip_start: float,
    max_seg: float = 4.5,
) -> list[dict]:
    """Divide a clip into contiguous segments cut at speech pauses.

    Args:
        duration: Clip length in seconds.
        transcript: Dict with a ``"segments"`` list whose items may carry a
            ``"words"`` list of ``{"start", "end", ...}`` dicts.
        clip_start: Offset subtracted from word times when greater than zero,
            making them clip-relative.
        max_seg: Target maximum segment length in seconds.

    Returns:
        ``{"start", "end"}`` dicts covering ``[0, duration]`` with no gaps.
        A segment may exceed ``max_seg`` by up to 0.5s when a short tail is
        folded into it.
    """
    words: list[dict] = []
    for seg in transcript.get("segments", []):
        words.extend(seg.get("words", []))
    if clip_start > 0:
        words = [
            {**w, "start": max(0.0, w["start"] - clip_start),
             "end": max(0.0, w["end"] - clip_start)}
            for w in words
        ]
    words = [w for w in words if w["end"] > 0 and w["start"] < duration]

    # Collect pause midpoints as candidate cut times.
    cuts = [0.0]
    for prev_word, next_word in zip(words, words[1:]):
        gap = next_word["start"] - prev_word["end"]
        if gap > 0.2:
            cuts.append((prev_word["end"] + next_word["start"]) / 2.0)
    cuts.append(duration)
    cuts = sorted(set(cuts))

    # Merge short intervals, split long ones.
    segs: list[dict] = []
    start = 0.0
    for cut in cuts[1:]:
        seg_len = cut - start
        if seg_len < 1.5 and cut < duration:
            continue  # too short — extend to next cut
        if seg_len > max_seg:
            t = start
            while t + max_seg < cut:
                segs.append({"start": t, "end": t + max_seg})
                t += max_seg
            if cut - t > 0.5 or not segs:
                segs.append({"start": t, "end": cut})
            else:
                # Fold a tiny tail into the previous chunk. Dropping it would
                # leave a hole in the timeline, and downstream trimming and
                # concatenation would silently lose that footage.
                segs[-1]["end"] = cut
        else:
            segs.append({"start": start, "end": cut})
        start = cut

    # Fallback: split evenly if not enough segments.
    if len(segs) < 2:
        n = max(2, round(duration / 4.0))
        d = duration / n
        segs = [{"start": i * d, "end": min((i + 1) * d, duration)} for i in range(n)]
    return segs
# ─── Frame extraction ─────────────────────────────────────────────────────────
def _extract_frame(video_path: Path, t: float, out_path: Path) -> bool:
    """Grab a single frame at ``t`` seconds with ffmpeg and save it to ``out_path``.

    Returns True only if ffmpeg exited with status 0 and the output file exists.
    """
    seek_to = f"{t:.3f}"
    completed = subprocess.run(
        [
            "ffmpeg", "-y", "-ss", seek_to, "-i", str(video_path),
            "-vframes", "1", "-q:v", "3", str(out_path),
        ],
        capture_output=True,
        timeout=30,
    )
    if completed.returncode != 0:
        return False
    return out_path.exists()
def _extract_segment_frames(video_path: Path, seg: dict, seg_idx: int, tmp_dir: Path) -> list[Path]:
    """Sample up to three stills (25%, 50%, 75% through) from one segment.

    Frames that fail to extract are left out, so the result may hold fewer
    than three paths, or none.
    """
    seg_start = float(seg["start"])
    seg_end = float(seg["end"])
    span = max(0.1, seg_end - seg_start)
    # Latest allowed seek time: slightly before the segment end, never before its start.
    latest = max(seg_start, seg_end - 0.05)

    extracted: list[Path] = []
    for sample_no, fraction in enumerate((0.25, 0.50, 0.75)):
        moment = seg_start + span * fraction
        target = tmp_dir / f"seg_{seg_idx:03d}_{sample_no}.jpg"
        seek_time = min(max(seg_start, moment), latest)
        if _extract_frame(video_path, seek_time, target):
            extracted.append(target)
    return extracted
def _detect_face_bbox_in_image(image_path: Path) -> list[float] | None:
"""Detect a human face in one frame and return a normalized padded bbox."""
try:
import cv2
except Exception:
return None
image = cv2.imread(str(image_path))
if image is None:
return None
fh, fw = image.shape[:2]
if fw <= 0 or fh <= 0:
return None
cascade_paths = [
Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml",
Path(cv2.data.haarcascades) / "haarcascade_profileface.xml",
]
cascades = [cv2.CascadeClassifier(str(p)) for p in cascade_paths if p.exists()]
cascades = [c for c in cascades if not c.empty()]
if not cascades:
return None
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
gray = cv2.equalizeHist(gray)
candidates: list[tuple[int, int, int, int]] = []
min_size = (max(34, fw // 46), max(34, fh // 46))
for cascade in cascades:
faces = cascade.detectMultiScale(
gray,
scaleFactor=1.08,
minNeighbors=4,
minSize=min_size,
)
candidates.extend(tuple(map(int, face)) for face in faces)
flipped = cv2.flip(gray, 1)
flipped_faces = cascade.detectMultiScale(
flipped,
scaleFactor=1.08,
minNeighbors=4,
minSize=min_size,
)
for x, y, w, h in flipped_faces:
candidates.append((fw - int(x) - int(w), int(y), int(w), int(h)))
best: tuple[int, int, int, int] | None = None
best_score = 0.0
for x, y, w, h in candidates:
area = w * h
if area <= 0:
continue
face_cy = (y + h / 2) / fh
centrality = 1.0 - min(0.55, abs(face_cy - 0.38))
score = area * centrality
if score > best_score:
best = (x, y, w, h)
best_score = score
if not best:
return None
x, y, w, h = best
pad_x = w * 0.34
pad_y_top = h * 0.46
pad_y_bottom = h * 0.70
return [
max(0.0, (x - pad_x) / fw),
max(0.0, (y - pad_y_top) / fh),
min(1.0, (x + w + pad_x) / fw),
min(1.0, (y + h + pad_y_bottom) / fh),
]
def _detect_segment_face_bbox(frame_paths: list[Path]) -> list[float] | None:
    """Return the largest-area face box found across a segment's sampled frames.

    Returns None when no frame yields a face.
    """
    winner: list[float] | None = None
    winner_area = 0.0
    for path in frame_paths:
        box = _detect_face_bbox_in_image(path)
        if box:
            box_w = max(0.0, box[2] - box[0])
            box_h = max(0.0, box[3] - box[1])
            if box_w * box_h > winner_area:
                winner, winner_area = box, box_w * box_h

    if winner:
        centre_x = (winner[0] + winner[2]) / 2
        centre_y = (winner[1] + winner[3]) / 2
        logger.info(f"HRE face zoom target: x={centre_x:.2f} y={centre_y:.2f}")
    return winner
def _apply_detected_face_override(analysis: dict, face_bbox: list[float] | None) -> dict:
if not face_bbox:
return analysis
x1, y1, x2, y2 = face_bbox
face_cx = (x1 + x2) / 2.0
face_cy = (y1 + y2) / 2.0
return {
**analysis,
"face_detected": True,
"subject_bbox": face_bbox,
"face_cx": face_cx,
"face_cy": face_cy,
"zoom_anchor_x": face_cx,
"zoom_anchor_y": face_cy,
}
# ─── Per-segment AI analysis ──────────────────────────────────────────────────
def _analyze_segment(
    video_path: Path,
    seg: dict,
    seg_idx: int,
    n_total: int,
    transcript: dict,
    clip_start: float,
    tmp_dir: Path,
) -> dict:
    """Run the vision analysis for one segment and apply any detected face.

    When no frame can be extracted, the vision module's default analysis is
    returned. Otherwise the sampled frames and the words overlapping the
    segment are passed to the analyser, and a detected face box, if any,
    overrides the zoom anchor.
    """
    from src.analysis.vision import analyze_frames_for_hre, _default_hre_analysis

    frame_paths = _extract_segment_frames(video_path, seg, seg_idx, tmp_dir)
    if not frame_paths:
        return _default_hre_analysis(seg_idx, n_total)

    # Word timestamps are compared in the transcript's own timeline, so the
    # clip-relative segment bounds are shifted by clip_start.
    window_start = seg["start"] + clip_start
    window_end = seg["end"] + clip_start
    spoken: list[str] = []
    for chunk in transcript.get("segments", []):
        for word in chunk.get("words", []):
            if word.get("start", 0) < window_end and word.get("end", 0) > window_start:
                spoken.append(word.get("word", word.get("text", "")))
    context = " ".join(spoken).strip()

    analysis = analyze_frames_for_hre(frame_paths, context, seg_idx, n_total)
    face_bbox = _detect_segment_face_bbox(frame_paths)
    return _apply_detected_face_override(analysis, face_bbox)
# ─── Zoom expression builders ─────────────────────────────────────────────────
def _build_zoom_exprs(
    analysis: dict,
    w: int,
    h: int,
) -> tuple[str, str, str]:
    """Return (z_expr, x_expr, y_expr) for ffmpeg zoompan from HRE analysis.

    Zoom-in segments pan towards the zoom anchor (falling back to the face
    centre, then to (0.5, 0.38)), clamped to stay inside the frame. Zoom-out
    and hold segments stay centred.

    Args:
        analysis: Reads ``zoom_direction``, ``zoom_speed``, ``zoom_anchor_x/y``
            and ``face_cx/cy``.
        w: Not read by this function; kept for interface compatibility.
        h: Not read by this function; kept for interface compatibility.

    Note: \\, escapes comma inside ffmpeg filter expressions.
    """
    direction = analysis.get("zoom_direction", "in")
    speed = analysis.get("zoom_speed", "slow")

    if direction == "in":
        anchor_x = _clamp_float(analysis.get("zoom_anchor_x"), _clamp_float(analysis.get("face_cx"), 0.5))
        anchor_y = _clamp_float(analysis.get("zoom_anchor_y"), _clamp_float(analysis.get("face_cy"), 0.38))
        if speed == "fast":
            z_expr = "min(1.0+on*0.0100\\,1.45)"
        else:
            z_expr = "min(1.0+on*0.0035\\,1.28)"
        # Both zoom-in ramps end above 1.05x, so the anchored pan always
        # applied; the former centred "in" fallback was unreachable.
        x_expr = f"max(0\\,min(iw-iw/zoom\\,iw*{anchor_x:.3f}-iw/zoom/2))"
        y_expr = f"max(0\\,min(ih-ih/zoom\\,ih*{anchor_y:.3f}-ih/zoom/2))"
        return z_expr, x_expr, y_expr

    if direction == "out":
        if speed == "fast":
            z_expr = "max(1.45-on*0.0100\\,1.0)"
        else:
            z_expr = "max(1.28-on*0.0040\\,1.0)"
    else:  # hold
        z_expr = "1.08"
    return z_expr, "iw/2-(iw/zoom/2)", "ih/2-(ih/zoom/2)"
# ─── Per-segment zoom via filter_complex ──────────────────────────────────────
def _apply_per_segment_zoom(
    input_path: Path,
    segments: list[dict],
    analyses: list[dict],
    w: int,
    h: int,
    output_path: Path,
    has_audio: bool = True,
) -> Path:
    """Render each segment with its own zoompan and concatenate the results.

    Returns ``output_path`` on success. If ffmpeg fails, a warning is logged
    and ``input_path`` is returned so the caller can continue unzoomed.
    """
    chains: list[str] = []
    video_pads: list[str] = []
    audio_pads: list[str] = []

    for idx, (seg, analysis) in enumerate(zip(segments, analyses)):
        t_in = f"{seg['start']:.3f}"
        t_out = f"{seg['end']:.3f}"
        z, x, y = _build_zoom_exprs(analysis, w, h)
        zoompan = f"zoompan=z='{z}':x='{x}':y='{y}':d=1:s={w}x{h}:fps=30"
        chains.append(
            f"[0:v]trim=start={t_in}:end={t_out},setpts=PTS-STARTPTS,fps=30,{zoompan},setpts=PTS-STARTPTS[v{idx}]"
        )
        video_pads.append(f"[v{idx}]")
        if has_audio:
            chains.append(f"[0:a]atrim=start={t_in}:end={t_out},asetpts=PTS-STARTPTS[a{idx}]")
            audio_pads.append(f"[a{idx}]")

    n = len(segments)
    chains.append("".join(video_pads) + f"concat=n={n}:v=1:a=0[vout]")
    if has_audio:
        chains.append("".join(audio_pads) + f"concat=n={n}:v=0:a=1[aout]")

    cmd = ["ffmpeg", "-y", "-i", str(input_path), "-filter_complex", ";".join(chains), "-map", "[vout]"]
    if has_audio:
        cmd.extend(["-map", "[aout]", "-c:a", "aac"])
    cmd.extend(["-c:v", "libx264", "-movflags", "+faststart", str(output_path)])

    completed = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    if completed.returncode != 0 or not output_path.exists():
        logger.warning(f"Per-segment zoom failed: {completed.stderr[-800:]}")
        return input_path
    logger.info(f"Per-segment zoom: {n} segments, {w}x{h}")
    return output_path
# ─── Per-segment ASS subtitles ────────────────────────────────────────────────
# Caption colour names mapped to the ASS colour literals used in override tags.
_ASS_COLORS = dict(
    white="&H00FFFFFF",
    yellow="&H0000FFFF",
    cyan="&H00FFFF00",
    orange="&H000066FF",
    green="&H0000FF00",
    red="&H000000FF",
)
# Accepted values for the model-supplied subtitle fields.
_POSITIONS = {"free", "center", "right", "left", "bottom", "top"}
_MODES = {"sentence", "phrase", "word"}
_EMPHASIS = {"calm", "punch", "pop"}
# Valid ASS alignment codes (1-9).
_ANCHORS = {1, 2, 3, 4, 5, 6, 7, 8, 9}
def _ts(t: float) -> str:
    """Format seconds as an ASS timestamp ``H:MM:SS.cc``; negatives become zero."""
    centis = max(0, int(round(t * 100)))
    hours, centis = divmod(centis, 360000)
    minutes, centis = divmod(centis, 6000)
    seconds, centis = divmod(centis, 100)
    return f"{hours}:{minutes:02d}:{seconds:02d}.{centis:02d}"
def _pick(value: object, allowed: set[str], fallback: str) -> str:
    """Return ``value`` trimmed and lower-cased if it is in ``allowed``, else ``fallback``."""
    candidate = str(value or "").strip().lower()
    if candidate in allowed:
        return candidate
    return fallback
def _clamp_float(value: object, fallback: float, low: float = 0.0, high: float = 1.0) -> float:
    """Convert ``value`` to float and clamp it into ``[low, high]``.

    Args:
        value: Anything ``float()`` accepts; typically untrusted model output.
        fallback: Returned when ``value`` cannot be converted or is NaN.
        low: Lower clamp bound.
        high: Upper clamp bound.

    Returns:
        The clamped number, or ``fallback`` for unconvertible or NaN input.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return fallback
    # NaN compares false with everything, so min/max would turn it into
    # ``low``; treat it as missing data instead.
    if number != number:
        return fallback
    return min(high, max(low, number))
def _clamp_int(value: object, fallback: int, allowed: set[int]) -> int:
    """Return ``int(value)`` when it converts and is in ``allowed``, else ``fallback``."""
    try:
        parsed = int(value)
    except Exception:
        parsed = None
    if parsed is None or parsed not in allowed:
        return fallback
    return parsed
def _normalise_bbox(value: object) -> list[float] | None:
if not isinstance(value, (list, tuple)) or len(value) != 4:
return None
try:
coords = [float(v) for v in value]
except Exception:
return None
if max(abs(v) for v in coords) > 1.5:
return None
x1, y1, x2, y2 = coords
x1, x2 = sorted((min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2))))
y1, y2 = sorted((min(1.0, max(0.0, y1)), min(1.0, max(0.0, y2))))
if x2 - x1 < 0.02 or y2 - y1 < 0.02:
return None
return [x1, y1, x2, y2]
def _caption_anchor_for(x: float, y: float) -> int:
    """Map a normalized caption point to an ASS alignment code (1-9).

    The frame is split into top, middle and bottom rows. Within a row the
    centre column wins when ``x`` falls inside that row's centre band;
    otherwise the left or right column is chosen around ``x = 0.5``.
    """
    if y < 0.34:
        left, centre, right = 7, 8, 9
        in_centre_band = 0.30 <= x <= 0.70
    elif y > 0.66:
        left, centre, right = 1, 2, 3
        in_centre_band = 0.30 <= x <= 0.70
    else:
        left, centre, right = 4, 5, 6
        # The middle row uses a narrower centre band than the outer rows.
        in_centre_band = 0.34 <= x <= 0.66

    if in_centre_band:
        return centre
    return left if x < 0.5 else right
def _safe_caption_point(subject_x: float, subject_y: float, seg_idx: int) -> tuple[float, float, int]:
    """Choose a caption spot away from the subject, rotating by segment index.

    Returns ``(x, y, anchor)`` with x/y as frame fractions and anchor as an
    ASS alignment code.
    """
    subject_on_left = subject_x < 0.50
    if subject_y < 0.42:
        opposite_y = 0.72
    elif subject_y > 0.62:
        opposite_y = 0.24
    else:
        opposite_y = 0.76

    zones = (
        (0.68 if subject_on_left else 0.32, opposite_y),
        (0.72 if subject_on_left else 0.28, 0.50),
        (0.50, 0.18 if subject_y > 0.45 else 0.82),
        (0.50, 0.72),
    )
    x, y = zones[seg_idx % len(zones)]
    return x, y, _caption_anchor_for(x, y)
def _word_caption_point(subject_x: float, subject_y: float, seg_idx: int) -> tuple[float, float, int]:
    """Choose a mid-frame spot for single-word captions, rotating by segment index.

    If the chosen spot lands within 0.18 of the subject on both axes, x is
    pushed to the side of the frame opposite the subject.
    """
    side_a = 0.28 if subject_x > 0.55 else 0.72
    side_b = 0.30 if subject_x > 0.50 else 0.70
    zones = ((0.50, 0.42), (0.50, 0.26), (side_a, 0.46), (side_b, 0.58))

    x, y = zones[seg_idx % len(zones)]
    near_subject = abs(x - subject_x) < 0.18 and abs(y - subject_y) < 0.18
    if near_subject:
        if subject_x > 0.5:
            x = 0.25
        else:
            x = 0.75
    return x, y, _caption_anchor_for(x, y)
def _normalise_analysis(analysis: dict, seg_idx: int, n_total: int) -> dict:
    """Validate model output and fill HRE fields used by the renderer.

    Every field is coerced to an allowed value or replaced by a fallback.
    Caption placement is then adjusted per subtitle mode and moved off the
    subject box, and zoom/emphasis are finally overridden by segment position
    and moment type.

    Args:
        analysis: Raw per-segment analysis; ``None`` or empty is tolerated.
        seg_idx: Zero-based segment index; rotates fallback caption zones and
            triggers the first-segment treatment.
        n_total: Not read by this function.

    Returns:
        A new dict holding every original key plus the normalised HRE fields.
    """
    an = dict(analysis or {})
    subject_bbox = _normalise_bbox(an.get("subject_bbox"))
    energy = _pick(an.get("energy_level"), {"high", "medium", "low"}, "medium")
    moment = _pick(
        an.get("moment_type"),
        {"hook", "punchline", "context", "reaction", "transition"},
        "context",
    )
    # Default subtitle mode when the model gives none: punchy moments get
    # single words, medium-energy non-context moments get phrases, the rest
    # get full sentences.
    fallback_mode = "word" if energy == "high" or moment in {"hook", "punchline", "reaction"} else "sentence"
    if energy == "medium" and moment not in {"context", "transition"}:
        fallback_mode = "phrase"
    # Subject centre: prefer the validated box, else the reported face centre.
    if subject_bbox:
        subject_x = (subject_bbox[0] + subject_bbox[2]) / 2.0
        subject_y = (subject_bbox[1] + subject_bbox[3]) / 2.0
    else:
        subject_x = _clamp_float(an.get("face_cx"), 0.5)
        subject_y = _clamp_float(an.get("face_cy"), 0.38)
    pos = _pick(an.get("subtitle_position"), _POSITIONS, "free")
    mode = _pick(an.get("subtitle_mode"), _MODES, fallback_mode)
    emphasis = _pick(an.get("subtitle_emphasis"), _EMPHASIS, "punch" if mode == "word" else "calm")
    color = _pick(an.get("subtitle_color"), set(_ASS_COLORS), "white")
    zoom_direction = _pick(an.get("zoom_direction"), {"in", "out", "hold"}, "in")
    zoom_speed = _pick(an.get("zoom_speed"), {"fast", "slow"}, "slow")
    face_cx = _clamp_float(an.get("face_cx"), subject_x)
    face_cy = _clamp_float(an.get("face_cy"), subject_y)
    zoom_anchor_x = _clamp_float(an.get("zoom_anchor_x"), face_cx)
    zoom_anchor_y = _clamp_float(an.get("zoom_anchor_y"), face_cy)
    # Caption placement: model values if usable, otherwise a zone chosen away
    # from the subject.
    fallback_x, fallback_y, fallback_anchor = _safe_caption_point(subject_x, subject_y, seg_idx)
    caption_x = _clamp_float(an.get("caption_x"), fallback_x, 0.10, 0.90)
    caption_y = _clamp_float(an.get("caption_y"), fallback_y, 0.12, 0.88)
    caption_anchor = _clamp_int(an.get("caption_anchor"), fallback_anchor, _ANCHORS)
    caption_max_width_pct = _clamp_float(
        an.get("caption_max_width_pct"),
        0.58 if mode != "sentence" else 0.72,
        0.35,
        0.82,
    )
    if mode == "sentence":
        # Sentences are pinned to a centred lower band (y between 0.64 and 0.74).
        caption_x = 0.50
        caption_y = _clamp_float(an.get("caption_y"), 0.70, 0.64, 0.74)
        caption_anchor = 2
        caption_max_width_pct = max(caption_max_width_pct, 0.68)
    elif mode == "word":
        word_x, word_y, word_anchor = _word_caption_point(subject_x, subject_y, seg_idx)
        # Single words move out of the lower third and away from the subject centre.
        if caption_y > 0.66 or (abs(caption_x - subject_x) < 0.14 and abs(caption_y - subject_y) < 0.14):
            caption_x, caption_y, caption_anchor = word_x, word_y, word_anchor
        caption_max_width_pct = min(caption_max_width_pct, 0.56)
    if subject_bbox:
        x1, y1, x2, y2 = subject_bbox
        # Overlap test uses the subject box grown by 0.08 on every side.
        overlaps_subject = (x1 - 0.08) <= caption_x <= (x2 + 0.08) and (y1 - 0.08) <= caption_y <= (y2 + 0.08)
        if overlaps_subject:
            if mode == "sentence":
                caption_x, caption_y, caption_anchor = 0.50, 0.70, 2
            elif mode == "word":
                caption_x, caption_y, caption_anchor = _word_caption_point(subject_x, subject_y, seg_idx + 1)
            else:
                caption_x, caption_y, caption_anchor = fallback_x, fallback_y, fallback_anchor
    if seg_idx == 0:
        # First segment: force a fast zoom-in and never a calm sentence.
        zoom_direction, zoom_speed = "in", "fast"
        # NOTE(review): the caption point above was computed for sentence mode
        # and is not recomputed after this switch to "word" — confirm intended.
        if mode == "sentence":
            mode = "word"
        if emphasis == "calm":
            emphasis = "punch"
    # NOTE(review): this branch also runs for segment 0 and resets its zoom
    # speed to "slow" unless energy is "high", which appears to undo the
    # forced "fast" above — confirm intended.
    if mode == "word" or moment in {"hook", "punchline", "reaction"}:
        zoom_direction = "in"
        zoom_speed = "fast" if energy == "high" else "slow"
        emphasis = "punch" if emphasis == "calm" else emphasis
    elif mode == "sentence" and moment in {"context", "transition"}:
        zoom_direction = "hold"
        zoom_speed = "slow"
        emphasis = "calm"
    # Original keys are kept; the normalised fields below take precedence.
    return {
        **an,
        "zoom_direction": zoom_direction,
        "zoom_speed": zoom_speed,
        "face_detected": bool(an.get("face_detected", False)),
        "face_cx": face_cx,
        "face_cy": face_cy,
        "subject_bbox": subject_bbox,
        "zoom_anchor_x": zoom_anchor_x,
        "zoom_anchor_y": zoom_anchor_y,
        "subtitle_position": pos,
        "caption_x": caption_x,
        "caption_y": caption_y,
        "caption_anchor": caption_anchor,
        "caption_max_width_pct": caption_max_width_pct,
        "subtitle_mode": mode,
        "subtitle_emphasis": emphasis,
        "subtitle_color": color,
        "energy_level": energy,
        "moment_type": moment,
    }
def _build_hre_plan(segments: list[dict], analyses: list[dict]) -> list[dict]:
    """Normalise every segment's analysis into a render plan entry.

    Each entry carries the normalised fields plus ``segment_index``, ``start``
    and ``end``. If every entry ends up with the same caption point and
    subtitle mode, a fixed rotation of points and modes is applied so the
    treatment visibly varies across the clip.
    """
    total = len(segments)
    plan = [
        {
            **_normalise_analysis(analysis, idx, total),
            "segment_index": idx,
            "start": seg["start"],
            "end": seg["end"],
        }
        for idx, (seg, analysis) in enumerate(zip(segments, analyses))
    ]

    def _treatment(entry: dict) -> tuple:
        return (round(entry["caption_x"], 2), round(entry["caption_y"], 2), entry["subtitle_mode"])

    if len(plan) > 1 and len({_treatment(entry) for entry in plan}) == 1:
        rotation = (
            ("word", (0.50, 0.76)),
            ("sentence", (0.50, 0.18)),
            ("phrase", (0.28, 0.56)),
            ("word", (0.72, 0.52)),
            ("sentence", (0.50, 0.82)),
            ("phrase", (0.50, 0.22)),
        )
        for idx, entry in enumerate(plan):
            rotated_mode, (point_x, point_y) = rotation[idx % len(rotation)]
            entry["subtitle_position"] = "free"
            entry["caption_x"], entry["caption_y"] = point_x, point_y
            entry["caption_anchor"] = _caption_anchor_for(point_x, point_y)
            entry["subtitle_mode"] = rotated_mode
            if rotated_mode == "word":
                entry["subtitle_emphasis"] = "punch"
    return plan
def _ass_escape(text: str) -> str:
    """Make text safe for an ASS dialogue line.

    Braces become parentheses, CR/LF become spaces, and the result is stripped.
    """
    substitutions = str.maketrans({"{": "(", "}": ")", "\r": " ", "\n": " "})
    return text.translate(substitutions).strip()
def _wrap_text(text: str, max_chars: int) -> str:
    r"""Escape ``text`` and wrap it to at most two lines joined by ASS ``\N``.

    Words are packed greedily up to ``max_chars`` per line. When that yields
    more than two lines, everything after the first is merged into the second.
    A single over-long word is hard-split every ``max_chars`` characters.
    """
    text = _ass_escape(text)
    if len(text) <= max_chars:
        return text

    tokens = text.split()
    if len(tokens) <= 1:
        pieces = [text[pos:pos + max_chars] for pos in range(0, len(text), max_chars)]
        return r"\N".join(pieces)

    wrapped: list[str] = []
    current = ""
    for token in tokens:
        extended = f"{current} {token}".strip()
        if current and len(extended) > max_chars:
            wrapped.append(current)
            current = token
        else:
            current = extended
    if current:
        wrapped.append(current)

    if len(wrapped) > 2:
        wrapped = [wrapped[0], " ".join(wrapped[1:])]
    return r"\N".join(wrapped)
def _collect_clip_words(transcript: dict, clip_start: float, duration: float) -> list[dict]:
    """Flatten transcript words into clip-relative ``{"start", "end", "text"}`` dicts.

    Blank words and words entirely outside ``[0, duration)`` are skipped,
    times are clipped to the clip bounds, and the result is sorted by
    ``(start, end)``.
    """
    collected: list[dict] = []
    for chunk in transcript.get("segments", []):
        chunk_start = float(chunk.get("start", clip_start)) - clip_start
        chunk_end = float(chunk.get("end", clip_start)) - clip_start
        for entry in chunk.get("words", []):
            label = str(entry.get("word", entry.get("text", ""))).strip()
            if not label:
                continue
            # Missing word times fall back to the enclosing segment's bounds.
            begin = float(entry.get("start", chunk_start + clip_start)) - clip_start
            finish = float(entry.get("end", entry.get("start", chunk_end + clip_start))) - clip_start
            if finish <= begin:
                finish = begin + 0.24
            if finish <= 0 or begin >= duration:
                continue
            collected.append({
                "start": max(0.0, begin),
                "end": min(duration, finish),
                "text": label,
            })
    collected.sort(key=lambda item: (item["start"], item["end"]))
    return collected
def _segment_text(transcript: dict, clip_start: float, seg: dict) -> str:
    """Join the text of every transcript segment that overlaps ``seg`` in clip time."""
    overlapping: list[str] = []
    for item in transcript.get("segments", []):
        item_start = float(item.get("start", clip_start)) - clip_start
        item_end = float(item.get("end", clip_start)) - clip_start
        if not (item_start < seg["end"] and item_end > seg["start"]):
            continue
        snippet = str(item.get("text", "")).strip()
        if snippet:
            overlapping.append(snippet)
    return " ".join(overlapping).strip()
def _words_in_segment(words: list[dict], seg: dict) -> list[dict]:
    """Return the words whose time span overlaps the segment (touching edges excluded)."""
    seg_start, seg_end = seg["start"], seg["end"]
    selected: list[dict] = []
    for word in words:
        if word["start"] < seg_end and word["end"] > seg_start:
            selected.append(word)
    return selected
def _display_text(text: str, mode: str, emphasis: str) -> str:
    """Strip the caption and upper-case it unless it is a calm sentence."""
    stripped = text.strip()
    keep_case = mode == "sentence" and emphasis == "calm"
    return stripped if keep_case else stripped.upper()
def _append_event(events: list[dict], start: float, end: float, text: str, plan: dict) -> None:
    """Append a caption event clipped to the plan's time window.

    Nothing is appended when the clipped event is shorter than 0.08s or the
    text is blank.
    """
    clipped_start = max(float(plan["start"]), start)
    clipped_end = min(float(plan["end"]), end)
    label = text.strip()
    if clipped_end - clipped_start < 0.08 or not label:
        return
    events.append({"start": clipped_start, "end": clipped_end, "text": label, "plan": plan})
def _word_events(words: list[dict], seg: dict, plan: dict) -> list[dict]:
    """Build one caption event per word, kept sequential within the segment.

    Each word is shown for at least ``min_d`` and at most ``max_d`` seconds
    (both shorter for high-energy segments) and is cut just before the next
    word starts.
    """
    high_energy = plan["energy_level"] == "high"
    min_d = 0.14 if high_energy else 0.18
    max_d = 0.72 if high_energy else 0.95
    seg_start, seg_end = seg["start"], seg["end"]

    # Start time of the following word; the segment end stands in after the last one.
    following_starts = [later["start"] for later in words[1:]] + [seg_end]

    events: list[dict] = []
    cursor = seg_start
    for word, next_start in zip(words, following_starts):
        begin = max(seg_start, word["start"], cursor)
        finish = min(seg_end, max(word["end"], begin + min_d), begin + max_d)
        if next_start > begin:
            finish = min(finish, max(begin + min_d, next_start - 0.015))
        if finish <= begin:
            finish = min(seg_end, begin + min_d)
        _append_event(events, begin, finish, word["text"], plan)
        cursor = finish + 0.015
        if cursor >= seg_end:
            break
    return events
def _line_events(
    words: list[dict],
    seg: dict,
    plan: dict,
    max_words: int,
    max_duration: float,
    max_chars: int,
) -> list[dict]:
    """Group consecutive words into multi-word caption events.

    A group closes before the word that would exceed ``max_words``,
    ``max_duration`` or ``max_chars``; the first word of a group is always
    accepted. Each group is shown for at least 0.55s, capped at the segment end.
    """
    seg_start, seg_end = seg["start"], seg["end"]
    events: list[dict] = []
    pos = 0
    cursor = seg_start
    while pos < len(words) and cursor < seg_end - 0.08:
        line_words: list[dict] = []
        line_start = max(seg_start, words[pos]["start"], cursor)
        line_end = line_start
        width = 0
        while pos < len(words):
            candidate = words[pos]
            candidate_end = min(seg_end, max(candidate["end"], candidate["start"] + 0.2))
            separator = 1 if line_words else 0
            candidate_width = width + len(candidate["text"]) + separator
            if line_words:
                over_limit = (
                    len(line_words) >= max_words
                    or candidate_end - line_start > max_duration
                    or candidate_width > max_chars
                )
                if over_limit:
                    break
            line_words.append(candidate)
            width = candidate_width
            line_end = max(line_end, candidate_end)
            pos += 1
        if not line_words:
            # Defensive: skip a word rather than loop forever.
            pos += 1
            continue
        line_end = min(seg_end, max(line_end, line_start + 0.55))
        caption = " ".join(item["text"] for item in line_words)
        _append_event(events, line_start, line_end, caption, plan)
        cursor = line_end + 0.04
    return events
def _fallback_text_events(text: str, seg: dict, plan: dict) -> list[dict]:
    """Spread plain segment text evenly over the segment when word timings are missing.

    The text is chunked by word count according to the subtitle mode (1 for
    word, 3 for phrase, 7 otherwise). A single unbroken string longer than 20
    characters is first sliced into fixed-width units.
    """
    if not text:
        return []

    mode = plan["subtitle_mode"]
    per_chunk = {"word": 1, "phrase": 3}.get(mode, 7)

    units = text.split()
    if len(units) <= 1 and len(text) > 20:
        slice_width = {"word": 10, "phrase": 24}.get(mode, 36)
        units = [text[pos:pos + slice_width] for pos in range(0, len(text), slice_width)]

    chunks = []
    for pos in range(0, len(units), per_chunk):
        joined = " ".join(units[pos:pos + per_chunk])
        if joined.strip():
            chunks.append(joined)
    if not chunks:
        return []

    events: list[dict] = []
    seg_span = max(0.1, seg["end"] - seg["start"])
    slot = seg_span / len(chunks)
    for index, chunk in enumerate(chunks):
        slot_start = seg["start"] + index * slot
        slot_end = seg["start"] + (index + 1) * slot
        _append_event(events, slot_start, slot_end, chunk, plan)
    return events
def _build_subtitle_events(
    transcript: dict,
    clip_start: float,
    duration: float,
    segments: list[dict],
    plan: list[dict],
) -> list[dict]:
    """Produce the ordered, non-overlapping caption events for the whole clip.

    Each segment uses word-timed events for its subtitle mode, falling back to
    evenly spread segment text when that yields nothing. A final pass removes
    overlap so only one caption is visible at a time.
    """
    clip_words = _collect_clip_words(transcript, clip_start, duration)

    gathered: list[dict] = []
    for seg, seg_plan in zip(segments, plan):
        seg_words = _words_in_segment(clip_words, seg)
        mode = seg_plan["subtitle_mode"]
        if not seg_words:
            seg_events = []
        elif mode == "word":
            seg_events = _word_events(seg_words, seg, seg_plan)
        elif mode == "phrase":
            seg_events = _line_events(seg_words, seg, seg_plan, max_words=3, max_duration=1.7, max_chars=28)
        else:
            seg_events = _line_events(seg_words, seg, seg_plan, max_words=7, max_duration=2.8, max_chars=44)
        if not seg_events:
            seg_events = _fallback_text_events(_segment_text(transcript, clip_start, seg), seg, seg_plan)
        gathered.extend(seg_events)

    gathered.sort(key=lambda event: (event["start"], event["end"]))

    # ASS draws all active events at once; keep one visible caption event at a
    # time so word/phrase/sentence modes never stack on top of each other.
    sequential: list[dict] = []
    free_from = 0.0
    for event in gathered:
        begin = max(event["start"], free_from)
        finish = min(duration, event["end"])
        if finish - begin >= 0.08:
            sequential.append({**event, "start": begin, "end": finish})
            free_from = finish + 0.01
    return sequential
def _subtitle_tag(plan: dict) -> tuple[str, int]:
    """Build the ASS override-tag prefix for a plan entry.

    Returns ``(tag, max_chars)``: the tag sets alignment, position, colour,
    size and entrance animation; ``max_chars`` is the per-line wrap width.
    Coordinates are in the 1080x1920 script resolution.
    """
    mode = plan["subtitle_mode"]
    energy = plan["energy_level"]
    emphasis = plan["subtitle_emphasis"]
    color = _ASS_COLORS.get(plan["subtitle_color"], "&H00FFFFFF")
    alignment = int(plan.get("caption_anchor", 5))

    x = round(_clamp_float(plan.get("caption_x"), 0.5, 0.08, 0.92) * 1080)
    y = round(_clamp_float(plan.get("caption_y"), 0.75, 0.10, 0.90) * 1920)
    width_pct = _clamp_float(plan.get("caption_max_width_pct"), 0.62, 0.35, 0.82)
    max_width_px = max(360, min(886, int(width_pct * 1080)))

    if mode == "sentence":
        font_size = 60 if energy == "high" else 54
    elif mode == "phrase":
        font_size = 62 if energy == "low" else 68
    else:
        font_size = 96 if energy == "high" else 84
    if alignment in {4, 5, 6}:
        # Middle-row captions are drawn slightly smaller.
        font_size = max(54, font_size - 4)

    # Approximate glyph width as 0.58x the font size to get a wrap width.
    max_chars = max(8, min(34, int(max_width_px / (font_size * 0.58))))

    tag = (
        f"{{\\an{alignment}\\pos({x},{y})\\1c{color}&\\fs{font_size}"
        "\\b1\\bord5\\shad1\\q2\\fad(30,70)}"
    )
    if emphasis == "punch" or mode == "word":
        tag += "{\\fscx132\\fscy132\\frz-2\\t(0,140,\\fscx100\\fscy100\\frz0)}"
    elif emphasis == "pop":
        tag += "{\\fscx118\\fscy118\\t(0,120,\\fscx100\\fscy100)}"
    return tag, max_chars
def _generate_per_segment_subtitles(
    transcript: dict,
    ass_path: Path,
    clip_start: float,
    segments: list[dict],
    analyses: list[dict],
) -> None:
    """Write the ASS subtitle file and a JSON dump of the HRE plan.

    The plan is built here from ``analyses``. Style may change every segment,
    but caption events never overlap in time. The plan JSON is written next to
    the ASS file with the suffix ``.hre_plan.json``.
    """
    clip_end = max((float(seg["end"]) for seg in segments), default=0.0)
    plan = _build_hre_plan(segments, analyses)
    events = _build_subtitle_events(transcript, clip_start, clip_end, segments, plan)

    script = [
        "[Script Info]",
        "ScriptType: v4.00+",
        "PlayResX: 1080",
        "PlayResY: 1920",
        "ScaledBorderAndShadow: yes",
        "",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, "
        "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, "
        "ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, "
        "Alignment, MarginL, MarginR, MarginV, Encoding",
        "Style: Default,Noto Sans,82,&H00FFFFFF,&H0000FFFF,&H00000000,&H80000000,"
        "-1,0,0,0,100,100,0,0,1,5,1,2,40,40,200,1",
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    ]
    for event in events:
        entry = event["plan"]
        tag, max_chars = _subtitle_tag(entry)
        shown = _display_text(event["text"], entry["subtitle_mode"], entry["subtitle_emphasis"])
        wrapped = _wrap_text(shown, max_chars)
        script.append(
            f"Dialogue: 0,{_ts(event['start'])},{_ts(event['end'])},"
            f"Default,,0,0,0,,{tag}{wrapped}"
        )

    ass_path.write_text("\n".join(script), encoding="utf-8")
    plan_dump = json.dumps(plan, ensure_ascii=False, indent=2)
    ass_path.with_suffix(".hre_plan.json").write_text(plan_dump, encoding="utf-8")
    logger.debug(f"ASS: {len(events)} events across {len(segments)} HRE segments")
# ─── Emoji ─────────────────────────────────────────────────────────────────────
def _get_emoji(clip_data: dict, analyses: list[dict] | None = None) -> str:
if analyses:
energy_rank = {"high": 3, "medium": 2, "low": 1}
best = max(analyses, key=lambda a: energy_rank.get(a.get("energy_level", "low"), 1))
moment_emoji = {
"hook": "🔥", "punchline": "😂", "reaction": "😲",
"context": "💡", "transition": "✨",
}
if emoji := moment_emoji.get(best.get("moment_type", "")):
return emoji
a = clip_data.get("vision_analysis", {})
emotion = a.get("emotion", "excited")
action = a.get("action_type", "entertainment")
transcript_text = clip_data.get("transcript_text", "")
if transcript_text:
try:
from src.analysis.vision import get_emoji_for_scene
return get_emoji_for_scene(transcript_text, emotion, action)
except Exception:
pass
fb = {"happy": "😄", "excited": "🔥", "funny": "😂", "surprised": "😲",
"gaming": "🎮", "tutorial": "📚", "angry": "😤", "sad": "😢"}
return fb.get(emotion, fb.get(action, "⚡"))
# ─── Final render ─────────────────────────────────────────────────────────────
def _render_final(
    video_path: Path,
    ass_path: Path,
    emoji: str,
    output_path: Path,
) -> None:
    """Burn subtitles and a 3-second emoji overlay into the final video.

    If the combined render fails, it is retried with subtitles only. If that
    also fails, the error is logged and the function returns without raising.
    """
    # Forward slashes and escaped colons keep the path valid as a filter argument.
    ass_arg = str(ass_path).replace("\\", "/").replace(":", "\\:")
    subtitles = f"ass='{ass_arg}'"
    overlay = (
        f"drawtext=text='{emoji}':fontsize=80:x=w-100:y=50"
        f":enable='between(t\\,0\\,3)'"
    )

    primary = [
        "ffmpeg", "-y", "-i", str(video_path),
        "-vf", f"{subtitles},{overlay}", "-c:v", "libx264", "-c:a", "copy",
        "-movflags", "+faststart", str(output_path),
    ]
    first_try = subprocess.run(primary, capture_output=True, text=True, timeout=300)
    if first_try.returncode != 0:
        subtitles_only = [
            "ffmpeg", "-y", "-i", str(video_path),
            "-vf", subtitles,
            "-c:v", "libx264", "-c:a", "copy", str(output_path),
        ]
        second_try = subprocess.run(subtitles_only, capture_output=True, text=True, timeout=300)
        if second_try.returncode != 0:
            logger.error(f"HRE render failed: {second_try.stderr[-300:]}")
            return
    logger.info(f"HRE render complete → {output_path.name}")
# ─── Main pipeline ────────────────────────────────────────────────────────────
def apply_hre(
    clip_path: Path,
    clip_data: dict,
    transcript: dict,
    output_path: Path,
) -> Path:
    """Apply per-segment AI-driven HRE with varied zoom and caption plans.

    Args:
        clip_path: Source clip to edit.
        clip_data: Clip metadata; reads ``start``, ``end`` and ``index``, and
            is passed on for emoji selection.
        transcript: Transcript dict with a ``"segments"`` list.
        output_path: Destination video. The ASS file and plan JSON are written
            alongside it.

    Returns:
        ``output_path``. It is returned even when rendering logged a failure.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    clip_start = clip_data.get("start", 0.0)

    # Intermediate frames and the zoomed video live in a temp dir, so the
    # final render must happen before the ``with`` block exits.
    with tempfile.TemporaryDirectory() as _tmp:
        tmp_dir = Path(_tmp)
        tmp_zoomed = tmp_dir / "zoomed.mp4"

        w, h = _probe_dimensions(clip_path)
        duration = _probe_duration(clip_path)
        if duration <= 0:
            # Probe failed: derive the length from metadata (30s if "end" is absent).
            duration = float(clip_data.get("end", clip_start + 30)) - clip_start
        has_audio = _has_audio_stream(clip_path)

        # 1. Segment at speech pauses
        segments = _segment_clip(duration, transcript, clip_start)
        n = len(segments)
        logger.info(
            f"HRE clip {clip_data.get('index', '?')}: "
            f"{duration:.1f}s → {n} segments (AI analyzing each)"
        )

        # 2. Qwen2.5-VL analyzes each segment
        analyses = [
            _analyze_segment(clip_path, seg, i, n, transcript, clip_start, tmp_dir)
            for i, seg in enumerate(segments)
        ]
        plan = _build_hre_plan(segments, analyses)
        for seg, an in zip(segments, plan):
            logger.info(
                f"  [{seg['start']:.1f}s-{seg['end']:.1f}s] "
                f"zoom={an.get('zoom_direction')}({an.get('zoom_speed')}) "
                f"sub={an.get('subtitle_position')}/{an.get('subtitle_mode')}/"
                f"{an.get('subtitle_color')} "
                f"type={an.get('moment_type')} energy={an.get('energy_level')}"
            )

        # 3. Per-segment zoom via filter_complex
        zoomed = _apply_per_segment_zoom(
            clip_path, segments, plan, w, h, tmp_zoomed, has_audio=has_audio
        )

        # 4. Per-segment ASS subtitles. The subtitle writer builds its own
        # plan, so it gets the raw analyses: normalising an already-normalised
        # plan is not idempotent and would make captions and the saved plan
        # JSON differ from the plan logged and used for zoom above.
        ass_path = output_path.with_suffix(".ass")
        _generate_per_segment_subtitles(transcript, ass_path, clip_start, segments, analyses)

        # 5. Emoji from highest-energy segment
        emoji = _get_emoji(clip_data, plan)

        # 6. Render
        _render_final(zoomed, ass_path, emoji, output_path)
    return output_path