# ElevenClip-AI/backend/src/processing/clip_extractor.py
# Last commit (jakgritb, 1a29e06): "fix: handle pixel face boxes in crop fallback"
"""Extract video clips using ffmpeg-python."""
import asyncio
import math
import subprocess
from pathlib import Path

from loguru import logger
# Width, in pixels, that pixel-valued face boxes are assumed to be measured
# against. _face_center_expr divides pixel x-coordinates by this value to turn
# them into fractions of the frame width.
ANALYSIS_FRAME_WIDTH = 640.0
def _normalise_bbox(face_bbox: list | None) -> list[float] | None:
if not face_bbox or len(face_bbox) != 4:
return None
try:
coords = [float(v) for v in face_bbox]
except Exception:
return None
if max(abs(v) for v in coords) > 1.5:
# Legacy pixel fallback is handled by _face_center_expr.
return None
x1, y1, x2, y2 = coords
x1, x2 = sorted((min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2))))
y1, y2 = sorted((min(1.0, max(0.0, y1)), min(1.0, max(0.0, y2))))
if x2 - x1 < 0.02 or y2 - y1 < 0.02:
return None
return [x1, y1, x2, y2]
def _outer_subject_x(x1: float, x2: float) -> float:
"""Aim toward the face side when a person box covers torso/background too."""
center = (x1 + x2) / 2.0
width = x2 - x1
if width < 0.18:
return center
if center > 0.54 or x2 > 0.64:
return x1 * 0.32 + x2 * 0.68
if center < 0.46 or x1 < 0.36:
return x1 * 0.68 + x2 * 0.32
return center
def _detect_face_bbox(video_path: Path, start: float, end: float) -> list[float] | None:
    """Detect a real face in sampled source frames before the 9:16 crop.

    Qwen's scene-level bbox can focus on the product/screen instead of the
    presenter. A lightweight OpenCV pass gives the cropper a concrete face
    target when there is a person in frame.

    Six frames are sampled at fixed fractions of the ``[start, end]`` window
    (treated as at least 0.2 s long). Each frame is run through the frontal
    and profile Haar cascades, both as-is and mirrored; mirrored hits are
    mapped back to the original orientation. The best-scoring face across all
    frames wins.

    Args:
        video_path: Source video to sample.
        start: Clip start time in seconds.
        end: Clip end time in seconds.

    Returns:
        ``[x1, y1, x2, y2]`` as fractions of the frame, padded around the
        detected face and clamped to ``[0.0, 1.0]``, or ``None`` when OpenCV
        cannot be imported, no cascade loads, the video cannot be opened, or
        no face is found.
    """
    try:
        import cv2
    except Exception as exc:
        # Best-effort: face detection is optional, so any import failure just
        # disables it.
        logger.debug(f"OpenCV face crop skipped: {exc}")
        return None

    cascade_paths = [
        Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml",
        Path(cv2.data.haarcascades) / "haarcascade_profileface.xml",
    ]
    cascades = [cv2.CascadeClassifier(str(p)) for p in cascade_paths if p.exists()]
    cascades = [c for c in cascades if not c.empty()]
    if not cascades:
        return None

    duration = max(0.2, float(end) - float(start))
    sample_times = [
        float(start) + duration * r
        for r in (0.12, 0.25, 0.40, 0.55, 0.72, 0.88)
    ]

    best_bbox: list[float] | None = None
    best_score = 0.0
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return None
        for t in sample_times:
            cap.set(cv2.CAP_PROP_POS_MSEC, max(0.0, t) * 1000)
            ok, frame = cap.read()
            if not ok or frame is None:
                continue
            fh, fw = frame.shape[:2]
            if fw <= 0 or fh <= 0:
                continue

            gray = cv2.equalizeHist(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
            # The mirrored frame and detector settings are the same for every
            # cascade, so build them once per frame.
            flipped = cv2.flip(gray, 1)
            detect_kwargs = {
                "scaleFactor": 1.08,
                "minNeighbors": 4,
                "minSize": (max(36, fw // 40), max(36, fh // 40)),
            }

            candidates: list[tuple[int, int, int, int]] = []
            for cascade in cascades:
                for x, y, w, h in cascade.detectMultiScale(gray, **detect_kwargs):
                    candidates.append((int(x), int(y), int(w), int(h)))
                for x, y, w, h in cascade.detectMultiScale(flipped, **detect_kwargs):
                    # Map the mirrored hit back to original x coordinates.
                    candidates.append((fw - int(x) - int(w), int(y), int(w), int(h)))

            for x, y, w, h in candidates:
                area = w * h
                if area <= 0:
                    continue
                face_cy = (y + h / 2) / fh
                # Prefer large faces whose centre sits near 36% of the frame
                # height; the vertical penalty is capped at 0.6.
                centrality = 1.0 - min(0.6, abs(face_cy - 0.36))
                score = area * centrality
                if score > best_score:
                    # Pad the tight face box: 28% of its width per side, 40%
                    # of its height above and below.
                    pad_x = w * 0.28
                    pad_y = h * 0.40
                    best_bbox = [
                        max(0.0, (x - pad_x) / fw),
                        max(0.0, (y - pad_y) / fh),
                        min(1.0, (x + w + pad_x) / fw),
                        min(1.0, (y + h + pad_y) / fh),
                    ]
                    best_score = score
    finally:
        cap.release()

    if best_bbox:
        logger.info(
            "OpenCV face crop target: "
            f"x={((best_bbox[0] + best_bbox[2]) / 2):.2f} "
            f"y={((best_bbox[1] + best_bbox[3]) / 2):.2f}"
        )
    return best_bbox
def _face_center_expr(face_bbox: list | None, bias_outer: bool = False) -> str | None:
"""Return a crop expression x-center from Qwen's normalized face bbox."""
if not face_bbox or len(face_bbox) != 4:
return None
try:
x1, _, x2, _ = [float(v) for v in face_bbox]
except Exception:
return None
# Qwen is prompted for normalized values, but often returns pixel boxes from
# the 640px analysis frames. Treat those as 640-wide before falling back.
if max(abs(x1), abs(x2)) <= 1.5:
x1, x2 = sorted((min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2))))
face_cx = _outer_subject_x(x1, x2) if bias_outer else (x1 + x2) / 2.0
return f"{face_cx:.4f}*iw-540"
if 0 <= x1 <= ANALYSIS_FRAME_WIDTH * 1.25 and 0 <= x2 <= ANALYSIS_FRAME_WIDTH * 1.25:
x1, x2 = sorted((x1 / ANALYSIS_FRAME_WIDTH, x2 / ANALYSIS_FRAME_WIDTH))
x1, x2 = min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2))
face_cx = _outer_subject_x(x1, x2)
return f"{face_cx:.4f}*iw-540"
return None
def _safe_fit_filter() -> str:
"""Keep the full source frame visible on a blurred 9:16 background."""
return (
"[0:v]split=2[bg][fg];"
"[bg]scale=1080:1920:force_original_aspect_ratio=increase,"
"crop=1080:1920,boxblur=luma_radius=28:luma_power=1,"
"eq=brightness=-0.08:saturation=0.85[bg];"
"[fg]scale=1080:1920:force_original_aspect_ratio=decrease[fg];"
"[bg][fg]overlay=(W-w)/2:(H-h)/2,setsar=1[vout]"
)
def extract_clip(
    video_path: Path,
    start: float,
    end: float,
    output_path: Path,
    use_hw_encode: bool = True,
    vertical: bool = True,
    face_bbox: list | None = None,
    **kwargs,
) -> Path:
    """Cut a clip and convert to 9:16 vertical (1080x1920) for TikTok.

    face_bbox: [x1, y1, x2, y2] normalized from Qwen2.5-VL. Before cropping,
    the extractor samples real frames and prefers an OpenCV face box so a
    presenter stays visible even when Qwen focused on a product or screen.

    Tries the AMD AMF hardware encoder first when ``use_hw_encode`` is true,
    then falls back to libx264.

    Args:
        video_path: Source video.
        start: Clip start time in seconds.
        end: Clip end time in seconds.
        output_path: Destination file; parent directories are created.
        use_hw_encode: Try ``h264_amf`` before ``libx264``.
        vertical: Apply a 9:16 conversion. When false no video filter is
            added and the clip is only cut and re-encoded.
        face_bbox: Optional face box used to aim the crop when OpenCV finds
            no face.
        **kwargs: ``aspect_mode`` selects the conversion: ``"crop"``
            (default), ``"safe_fit"`` or ``"letterbox"``. Any other value
            behaves like ``"crop"``.

    Returns:
        ``output_path`` once an encoder succeeds.

    Raises:
        RuntimeError: If every encoder fails. The message ends with the tail
            of the last ffmpeg stderr output.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    encoders = ["h264_amf", "libx264"] if use_hw_encode else ["libx264"]

    # 9:16 vertical conversion filter
    vf_filters = []
    filter_complex = None
    if vertical:
        aspect_mode = kwargs.get("aspect_mode", "crop")
        if aspect_mode == "safe_fit":
            filter_complex = _safe_fit_filter()
        elif aspect_mode == "letterbox":
            # Fit entire 16:9 frame into 9:16, black bars top+bottom
            vf_filters.append(
                "scale=1080:1920:force_original_aspect_ratio=decrease,"
                "pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black"
            )
        else:
            # Crop: scale to 1920 height first, then crop to 1080 wide.
            # Center on a detected real face first, then Qwen's face bbox.
            detected_face_bbox = _detect_face_bbox(video_path, start, end)
            if detected_face_bbox:
                face_expr = _face_center_expr(detected_face_bbox)
            else:
                normalized_bbox = _normalise_bbox(face_bbox)
                face_expr = (
                    _face_center_expr(normalized_bbox)
                    or _face_center_expr(face_bbox, bias_outer=True)
                )
            if face_expr:
                # Clamp the offset so the window stays inside the frame. The
                # commas are escaped because ',' separates filters in -vf.
                crop = f"scale=-1:1920,crop=1080:1920:max(0\\,min(iw-1080\\,{face_expr})):0"
            else:
                crop = "scale=-1:1920,crop=1080:1920:(iw-1080)/2:0"
            vf_filters.append(crop)

    # Everything except the video codec is identical across encoder attempts.
    base_cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end), "-i", str(video_path)]
    if filter_complex:
        base_cmd += ["-filter_complex", filter_complex, "-map", "[vout]", "-map", "0:a?"]
    elif vf_filters:
        base_cmd += ["-vf", ",".join(vf_filters)]

    last_stderr = ""
    for encoder in encoders:
        cmd = [
            *base_cmd,
            "-c:v", encoder,
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            str(output_path),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            if encoder == "h264_amf":
                logger.info(f"Encoded 9:16 with AMD AMF: {output_path.name}")
            return output_path
        last_stderr = result.stderr or ""
        if encoder == "h264_amf":
            logger.debug("AMD AMF not available, falling back to libx264")

    # Include ffmpeg's own diagnostics, as burn_subtitles does, so the caller
    # can see why the encode failed.
    raise RuntimeError(
        f"All encoders failed for clip {output_path.name}\n{last_stderr[-500:]}"
    )
def burn_subtitles(
    clip_path: Path,
    ass_path: Path,
    output_path: Path,
    use_hw_encode: bool = True,
) -> Path:
    """Render an ASS subtitle file into the video frames with ffmpeg.

    Audio is copied unchanged. When ``use_hw_encode`` is true the AMD AMF
    encoder is tried before libx264.

    Args:
        clip_path: Input video.
        ass_path: ASS subtitle file to burn in.
        output_path: Destination file; parent directories are created.
        use_hw_encode: Try ``h264_amf`` before ``libx264``.

    Returns:
        ``output_path`` once an encoder succeeds.

    Raises:
        RuntimeError: If every encoder fails; the message carries the last
            500 characters of ffmpeg's stderr.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Forward slashes and escaped colons — presumably so Windows-style paths
    # survive ffmpeg's filter-argument parsing.
    filter_path = str(ass_path).replace("\\", "/").replace(":", "\\:")
    subtitle_filter = f"ass='{filter_path}'"

    codecs = ["libx264"]
    if use_hw_encode:
        codecs.insert(0, "h264_amf")

    for codec in codecs:
        proc = subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", str(clip_path),
                "-vf", subtitle_filter,
                "-c:v", codec,
                "-c:a", "copy",
                "-movflags", "+faststart",
                str(output_path),
            ],
            capture_output=True,
            text=True,
        )
        if proc.returncode == 0:
            return output_path
        if codec == "h264_amf":
            logger.debug("AMD AMF burn-sub failed, using libx264")

    raise RuntimeError(f"Subtitle burn-in failed for {clip_path.name}\n{proc.stderr[-500:]}")
def extract_all_clips(
    video_path: Path,
    selected_clips: list[dict],
    output_dir: Path,
    session_id: str,
    aspect_mode: str = "crop",
) -> list[dict]:
    """Extract all selected clips from video. Returns list with added 'clip_path'.

    Each clip is written to ``<output_dir>/<session_id>_clip_NN_raw.mp4``. A
    failure on one clip is logged and recorded; it does not stop the batch.

    Args:
        video_path: Source video.
        selected_clips: Clip dicts with ``"start"`` and ``"end"``; an optional
            ``"vision_analysis"`` dict may carry a ``"face_bbox"``.
        output_dir: Directory for the extracted files.
        session_id: Prefix for output file names.
        aspect_mode: Forwarded to ``extract_clip``.

    Returns:
        One dict per input clip, in order: the original keys plus
        ``"clip_index"`` (1-based) and ``"clip_path"``. Failed clips have
        ``"clip_path": None`` and an ``"error"`` message.
    """
    results = []
    for index, clip in enumerate(selected_clips, start=1):
        out_path = output_dir / f"{session_id}_clip_{index:02d}_raw.mp4"
        # "vision_analysis" may be present but null (or not a dict); treat
        # that as "no face box" instead of letting it abort the whole batch.
        analysis = clip.get("vision_analysis")
        face_bbox = analysis.get("face_bbox") if isinstance(analysis, dict) else None
        try:
            extract_clip(
                video_path,
                clip["start"],
                clip["end"],
                out_path,
                face_bbox=face_bbox,
                aspect_mode=aspect_mode,
            )
            logger.info(f"Extracted clip {index}: {clip['start']:.1f}s–{clip['end']:.1f}s → {out_path.name}")
            # Append last so an exception anywhere above yields exactly one
            # (failed) entry for this clip, never a success plus a failure.
            results.append({**clip, "clip_index": index, "clip_path": str(out_path)})
        except Exception as e:
            logger.error(f"Failed to extract clip {index}: {e}")
            results.append({**clip, "clip_index": index, "clip_path": None, "error": str(e)})
    return results
async def extract_all_clips_async(
    video_path: Path,
    selected_clips: list[dict],
    output_dir: Path,
    session_id: str,
    aspect_mode: str = "crop",
) -> list[dict]:
    """Run ``extract_all_clips`` in the loop's default executor and await it.

    Takes the same arguments and returns the same list as
    ``extract_all_clips``; the blocking work runs off the event loop.
    """
    # get_running_loop() is the documented call inside a coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        lambda: extract_all_clips(video_path, selected_clips, output_dir, session_id, aspect_mode),
    )