"""Extract video clips using ffmpeg-python.""" import asyncio import subprocess from pathlib import Path from loguru import logger ANALYSIS_FRAME_WIDTH = 640.0 def _normalise_bbox(face_bbox: list | None) -> list[float] | None: if not face_bbox or len(face_bbox) != 4: return None try: coords = [float(v) for v in face_bbox] except Exception: return None if max(abs(v) for v in coords) > 1.5: # Legacy pixel fallback is handled by _face_center_expr. return None x1, y1, x2, y2 = coords x1, x2 = sorted((min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2)))) y1, y2 = sorted((min(1.0, max(0.0, y1)), min(1.0, max(0.0, y2)))) if x2 - x1 < 0.02 or y2 - y1 < 0.02: return None return [x1, y1, x2, y2] def _outer_subject_x(x1: float, x2: float) -> float: """Aim toward the face side when a person box covers torso/background too.""" center = (x1 + x2) / 2.0 width = x2 - x1 if width < 0.18: return center if center > 0.54 or x2 > 0.64: return x1 * 0.32 + x2 * 0.68 if center < 0.46 or x1 < 0.36: return x1 * 0.68 + x2 * 0.32 return center def _detect_face_bbox(video_path: Path, start: float, end: float) -> list[float] | None: """Detect a real face in sampled source frames before the 9:16 crop. Qwen's scene-level bbox can focus on the product/screen instead of the presenter. A lightweight OpenCV pass gives the cropper a concrete face target when there is a person in frame. """ try: import cv2 except Exception as exc: logger.debug(f"OpenCV face crop skipped: {exc}") return None cascade_paths = [ Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml", Path(cv2.data.haarcascades) / "haarcascade_profileface.xml", ] cascades = [cv2.CascadeClassifier(str(p)) for p in cascade_paths if p.exists()] cascades = [c for c in cascades if not c.empty()] if not cascades: return None cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): return None duration = max(0.2, float(end) - float(start)) sample_times = [ float(start) + duration * r for r in (0.12, 0.25, 0.40, 0.55, 0.72, 0.88) ] best_bbox: list[float] | None = None best_score = 0.0 try: for t in sample_times: cap.set(cv2.CAP_PROP_POS_MSEC, max(0.0, t) * 1000) ok, frame = cap.read() if not ok or frame is None: continue fh, fw = frame.shape[:2] if fw <= 0 or fh <= 0: continue gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray = cv2.equalizeHist(gray) candidates: list[tuple[int, int, int, int]] = [] for cascade in cascades: faces = cascade.detectMultiScale( gray, scaleFactor=1.08, minNeighbors=4, minSize=(max(36, fw // 40), max(36, fh // 40)), ) candidates.extend(tuple(map(int, face)) for face in faces) flipped = cv2.flip(gray, 1) flipped_faces = cascade.detectMultiScale( flipped, scaleFactor=1.08, minNeighbors=4, minSize=(max(36, fw // 40), max(36, fh // 40)), ) for x, y, w, h in flipped_faces: candidates.append((fw - int(x) - int(w), int(y), int(w), int(h))) for x, y, w, h in candidates: area = w * h if area <= 0: continue face_cx = (x + w / 2) / fw face_cy = (y + h / 2) / fh # Prefer speaker-size faces, avoid tiny false positives near corners. centrality = 1.0 - min(0.6, abs(face_cy - 0.36)) score = area * centrality if score > best_score: pad_x = w * 0.28 pad_y = h * 0.40 best_bbox = [ max(0.0, (x - pad_x) / fw), max(0.0, (y - pad_y) / fh), min(1.0, (x + w + pad_x) / fw), min(1.0, (y + h + pad_y) / fh), ] best_score = score finally: cap.release() if best_bbox: logger.info( "OpenCV face crop target: " f"x={((best_bbox[0] + best_bbox[2]) / 2):.2f} " f"y={((best_bbox[1] + best_bbox[3]) / 2):.2f}" ) return best_bbox def _face_center_expr(face_bbox: list | None, bias_outer: bool = False) -> str | None: """Return a crop expression x-center from Qwen's normalized face bbox.""" if not face_bbox or len(face_bbox) != 4: return None try: x1, _, x2, _ = [float(v) for v in face_bbox] except Exception: return None # Qwen is prompted for normalized values, but often returns pixel boxes from # the 640px analysis frames. Treat those as 640-wide before falling back. if max(abs(x1), abs(x2)) <= 1.5: x1, x2 = sorted((min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2)))) face_cx = _outer_subject_x(x1, x2) if bias_outer else (x1 + x2) / 2.0 return f"{face_cx:.4f}*iw-540" if 0 <= x1 <= ANALYSIS_FRAME_WIDTH * 1.25 and 0 <= x2 <= ANALYSIS_FRAME_WIDTH * 1.25: x1, x2 = sorted((x1 / ANALYSIS_FRAME_WIDTH, x2 / ANALYSIS_FRAME_WIDTH)) x1, x2 = min(1.0, max(0.0, x1)), min(1.0, max(0.0, x2)) face_cx = _outer_subject_x(x1, x2) return f"{face_cx:.4f}*iw-540" return None def _safe_fit_filter() -> str: """Keep the full source frame visible on a blurred 9:16 background.""" return ( "[0:v]split=2[bg][fg];" "[bg]scale=1080:1920:force_original_aspect_ratio=increase," "crop=1080:1920,boxblur=luma_radius=28:luma_power=1," "eq=brightness=-0.08:saturation=0.85[bg];" "[fg]scale=1080:1920:force_original_aspect_ratio=decrease[fg];" "[bg][fg]overlay=(W-w)/2:(H-h)/2,setsar=1[vout]" ) def extract_clip( video_path: Path, start: float, end: float, output_path: Path, use_hw_encode: bool = True, vertical: bool = True, face_bbox: list = None, **kwargs, ) -> Path: """Cut a clip and convert to 9:16 vertical (1080x1920) for TikTok. face_bbox: [x1, y1, x2, y2] normalized from Qwen2.5-VL. Before cropping, the extractor samples real frames and prefers an OpenCV face box so a presenter stays visible even when Qwen focused on a product or screen. Uses AMD AMF hardware encoder when available. """ output_path.parent.mkdir(parents=True, exist_ok=True) encoders = ["h264_amf", "libx264"] if use_hw_encode else ["libx264"] # 9:16 vertical conversion filter vf_filters = [] filter_complex = None if vertical: aspect_mode = kwargs.get("aspect_mode", "crop") if aspect_mode == "safe_fit": filter_complex = _safe_fit_filter() elif aspect_mode == "letterbox": # Fit entire 16:9 frame into 9:16, black bars top+bottom vf_filters.append( "scale=1080:1920:force_original_aspect_ratio=decrease," "pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black" ) else: # Crop: scale to 1920 height first, then center-crop to 1080 wide # Center on a detected real face first, then Qwen's face bbox. detected_face_bbox = _detect_face_bbox(video_path, start, end) if detected_face_bbox: face_expr = _face_center_expr(detected_face_bbox) else: normalized_bbox = _normalise_bbox(face_bbox) face_expr = ( _face_center_expr(normalized_bbox) or _face_center_expr(face_bbox, bias_outer=True) ) if face_expr: crop = f"scale=-1:1920,crop=1080:1920:max(0\\,min(iw-1080\\,{face_expr})):0" else: crop = "scale=-1:1920,crop=1080:1920:(iw-1080)/2:0" vf_filters.append(crop) for encoder in encoders: cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end), "-i", str(video_path)] if filter_complex: cmd += ["-filter_complex", filter_complex, "-map", "[vout]", "-map", "0:a?"] elif vf_filters: cmd += ["-vf", ",".join(vf_filters)] cmd += ["-c:v", encoder, "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", str(output_path)] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: if encoder == "h264_amf": logger.info(f"Encoded 9:16 with AMD AMF: {output_path.name}") return output_path elif encoder == "h264_amf": logger.debug("AMD AMF not available, falling back to libx264") raise RuntimeError(f"All encoders failed for clip {output_path.name}") def burn_subtitles( clip_path: Path, ass_path: Path, output_path: Path, use_hw_encode: bool = True, ) -> Path: """Burn ASS subtitles into video using ffmpeg. Returns path to output video with burned-in subtitles. """ output_path.parent.mkdir(parents=True, exist_ok=True) ass_str = str(ass_path).replace("\\", "/").replace(":", "\\:") encoders = ["h264_amf", "libx264"] if use_hw_encode else ["libx264"] for encoder in encoders: cmd = [ "ffmpeg", "-y", "-i", str(clip_path), "-vf", f"ass='{ass_str}'", "-c:v", encoder, "-c:a", "copy", "-movflags", "+faststart", str(output_path), ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: return output_path elif encoder == "h264_amf": logger.debug("AMD AMF burn-sub failed, using libx264") raise RuntimeError(f"Subtitle burn-in failed for {clip_path.name}\n{result.stderr[-500:]}") def extract_all_clips( video_path: Path, selected_clips: list[dict], output_dir: Path, session_id: str, aspect_mode: str = "crop", ) -> list[dict]: """Extract all selected clips from video. Returns list with added 'clip_path'.""" results = [] for i, clip in enumerate(selected_clips): out_path = output_dir / f"{session_id}_clip_{i+1:02d}_raw.mp4" face_bbox = clip.get("vision_analysis", {}).get("face_bbox") try: extract_clip(video_path, clip["start"], clip["end"], out_path, face_bbox=face_bbox, aspect_mode=aspect_mode) results.append({**clip, "clip_index": i + 1, "clip_path": str(out_path)}) logger.info(f"Extracted clip {i+1}: {clip['start']:.1f}s–{clip['end']:.1f}s → {out_path.name}") except Exception as e: logger.error(f"Failed to extract clip {i+1}: {e}") results.append({**clip, "clip_index": i + 1, "clip_path": None, "error": str(e)}) return results async def extract_all_clips_async( video_path: Path, selected_clips: list[dict], output_dir: Path, session_id: str, aspect_mode: str = "crop", ) -> list[dict]: loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: extract_all_clips(video_path, selected_clips, output_dir, session_id, aspect_mode) )