# lipsync-docker / face_processing.py
# naicoi's picture
# full-lipsync-youtube (#9)
# 64a2ea3 verified
"""Face detection and region extraction for lipsync optimization (DEPRECATED - Pipeline handles this automatically)"""
# NOTE: All functions in this module are DEPRECATED.
# The lipsync pipeline (latentsync/pipelines/lipsync_pipeline.py) now handles:
# - Face detection
# - Affine transformation
# - Crop
# - Restore
# The functions below are commented out and kept for reference only; they are not used in the new workflow.
# import os
# import math
# import logging
# from typing import List, Dict, Tuple, Optional
#
# import cv2
# import numpy as np
# import mediapipe as mp
# from ffmpy import FFmpeg, FFRuntimeError
#
# from video_processing import get_video_info
#
# logger = logging.getLogger(__name__)
#
#
# class FaceDetectionError(Exception):
# """Custom exception for face detection errors"""
#
# pass
#
#
# def sample_frames_from_video(
# video_path: str, output_dir: str, sample_count: int = 5
# ) -> List[Tuple[int, str]]:
# """Extract uniformly spaced sample frames from video using OpenCV cv2.VideoCapture (HuggingFace); no CUDA-specific API is called here
#
# Args:
# video_path: Path to video
# output_dir: Directory to save extracted frames
# sample_count: Number of frames to sample
#
# Returns:
# List of (frame_index, frame_path) tuples
# """
# video_info = get_video_info(video_path)
# fps = video_info["fps"]
# duration = video_info["duration"]
# total_frames = int(duration * fps)
#
# frames_dir = os.path.join(output_dir, "sampled_frames")
# os.makedirs(frames_dir, exist_ok=True)
#
# if total_frames <= sample_count:
# frame_indices = list(range(total_frames))
# else:
# frame_indices = [
# int(i * total_frames / sample_count) for i in range(sample_count)
# ]
#
# extracted_frames = []
# cap = cv2.VideoCapture(video_path)
#
# try:
# for idx, frame_idx in enumerate(frame_indices):
# cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
# ret, frame = cap.read()
#
# if not ret or frame is None:
# logger.warning(f"Failed to read frame {frame_idx}")
# continue
#
# frame_path = os.path.join(frames_dir, f"frame_{idx:04d}.jpg")
# cv2.imwrite(frame_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
# extracted_frames.append((frame_idx, frame_path))
# finally:
# cap.release()
#
# logger.info(f"Extracted {len(extracted_frames)} frames from {video_path}")
# return extracted_frames
#
#
# def detect_faces_in_frames(
# extracted_frames: List[Tuple[int, str]],
# min_confidence: float = 0.5,
# min_face_pixels: int = 100,
# ) -> List[Dict]:
# """Detect faces in all sampled frames using MediaPipe Face Detection API
#
# Args:
# extracted_frames: List of (frame_index, frame_path) tuples
# min_confidence: Minimum detection confidence (0-1)
# min_face_pixels: Minimum face width and height in pixels (both must be at least this value)
#
# Returns:
# List of detections: [{"frame_idx", "confidence", "bbox": (x, y, w, h)}]
# """
# detections = []
#
# with mp.solutions.face_detection.FaceDetection(
# model_selection=0, min_detection_confidence=min_confidence
# ) as face_detection:
# for frame_idx, frame_path in extracted_frames:
# frame = cv2.imread(frame_path)
# if frame is None:
# logger.warning(f"Failed to read frame: {frame_path}")
# continue
#
# h, w = frame.shape[:2]
# frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#
# results = face_detection.process(frame_rgb)
#
# if results.detections:
# for detection in results.detections:
# bbox = detection.location_data.relative_bounding_box
#
# x = int(bbox.xmin * w)
# y = int(bbox.ymin * h)
# face_w = int(bbox.width * w)
# face_h = int(bbox.height * h)
#
# x = max(0, x)
# y = max(0, y)
# face_w = min(w - x, face_w)
# face_h = min(h - y, face_h)
#
# confidence = detection.score[0] if detection.score else 0.0
#
# if face_w >= min_face_pixels and face_h >= min_face_pixels:
# detections.append(
# {
# "frame_idx": frame_idx,
# "confidence": float(confidence),
# "bbox": (x, y, face_w, face_h),
# }
# )
#
# logger.info(f"Detected {len(detections)} faces in {len(extracted_frames)} frames")
# return detections
#
#
# def cluster_face_detections(
# detections: List[Dict], max_distance: int = 100
# ) -> List[List[Dict]]:
# """Group face detections likely belonging to the same person by greedy clustering on bounding-box centers
#
# Args:
# detections: List of face detections
# max_distance: Detections whose centers are closer than this distance (pixels) to the cluster's first detection are grouped as the same person
#
# Returns:
# List of clusters (each cluster is a list of detections)
# """
# if not detections:
# return []
#
# clusters = []
# visited = set()
#
# for i, det_i in enumerate(detections):
# if i in visited:
# continue
#
# x_i, y_i, w_i, h_i = det_i["bbox"]
# center_i = (x_i + w_i / 2, y_i + h_i / 2)
#
# cluster = [det_i]
# visited.add(i)
#
# for j, det_j in enumerate(detections):
# if j in visited:
# continue
#
# x_j, y_j, w_j, h_j = det_j["bbox"]
# center_j = (x_j + w_j / 2, y_j + h_j / 2)
#
# distance = math.sqrt(
# (center_i[0] - center_j[0]) ** 2 + (center_i[1] - center_j[1]) ** 2
# )
#
# if distance < max_distance:
# cluster.append(det_j)
# visited.add(j)
#
# clusters.append(cluster)
#
# logger.info(f"Clustered {len(detections)} detections into {len(clusters)} clusters")
# return clusters
#
#
# def select_best_cluster(clusters: List[List[Dict]]) -> Optional[List[Dict]]:
# """Select the best face cluster (highest frequency)
#
# Args:
# clusters: List of clusters
#
# Returns:
# Best cluster (most frequent) or None
# """
# if not clusters:
# return None
#
# scored_clusters = [(len(cluster), cluster) for cluster in clusters]
# scored_clusters.sort(key=lambda x: x[0], reverse=True)
#
# best_cluster = scored_clusters[0][1]
# logger.info(f"Selected best cluster with {len(best_cluster)} detections")
# return best_cluster
#
#
# def verify_face_stability(
# cluster: List[Dict], max_movement_percent: float = 0.3
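# ) -> bool:
# """Verify face doesn't move too much between consecutive detections in the cluster
#
# Args:
# cluster: Face detections for the same person
# max_movement_percent: Max center movement as a fraction (e.g. 0.3 = 30%) of the average face dimension (square root of the mean bbox area)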
#
# Returns:
# True if face is stable, False otherwise
# """
# if len(cluster) < 2:
# return True
#
# centers = []
# sizes = []
#
# for det in cluster:
# x, y, w, h = det["bbox"]
# centers.append((x + w / 2, y + h / 2))
# sizes.append(w * h)
#
# avg_size = sum(sizes) / len(sizes)
# avg_face_dim = math.sqrt(avg_size)
# max_allowed_movement = avg_face_dim * max_movement_percent
#
# for i in range(len(centers) - 1):
# dx = abs(centers[i + 1][0] - centers[i][0])
# dy = abs(centers[i + 1][1] - centers[i][1])
# movement = math.sqrt(dx**2 + dy**2)
#
# if movement > max_allowed_movement:
# logger.warning(
# f"Face movement {movement:.1f}px > {max_allowed_movement:.1f}px"
# )
# return False
#
# return True
#
#
# def calculate_face_bbox_from_cluster(cluster: List[Dict]) -> Dict:
# """Calculate confidence-weighted average face bounding box from cluster
#
# Args:
# cluster: Face detections for the same person
#
# Returns:
# Dict: {"x", "y", "width", "height"}
# """
# weighted_x = 0
# weighted_y = 0
# weighted_w = 0
# weighted_h = 0
# total_weight = 0
#
# for det in cluster:
# x, y, w, h = det["bbox"]
# weight = det["confidence"]
# weighted_x += x * weight
# weighted_y += y * weight
# weighted_w += w * weight
# weighted_h += h * weight
# total_weight += weight
#
# avg_bbox = {
# "x": int(weighted_x / total_weight),
# "y": int(weighted_y / total_weight),
# "width": int(weighted_w / total_weight),
# "height": int(weighted_h / total_weight),
# }
#
# return avg_bbox
#
#
# def calculate_safe_crop_size(
# face_bbox: Dict, video_width: int, video_height: int, crop_size: int = 512
# ) -> Dict:
# """Calculate safe crop region ensuring face is inside
#
# Args:
# face_bbox: Face bounding box {"x", "y", "width", "height"}
# video_width: Video width
# video_height: Video height
# crop_size: Size of crop region (default: 512)
#
# Returns:
# Dict: {"x", "y", "width", "height"}
# """
# crop_half = crop_size // 2
#
# face_center_x = face_bbox["x"] + face_bbox["width"] / 2
# face_center_y = face_bbox["y"] + face_bbox["height"] / 2
#
# crop_x = int(face_center_x - crop_half)
# crop_y = int(face_center_y - crop_half)
#
# crop_x = max(0, crop_x)
# crop_y = max(0, crop_y)
# crop_x = min(video_width - crop_size, crop_x)
# crop_y = min(video_height - crop_size, crop_y)
#
# face_right = face_bbox["x"] + face_bbox["width"]
# face_bottom = face_bbox["y"] + face_bbox["height"]
# crop_right = crop_x + crop_size
# crop_bottom = crop_y + crop_size
#
# if (
# face_bbox["x"] < crop_x
# or face_bbox["y"] < crop_y
# or face_right > crop_right
# or face_bottom > crop_bottom
# ):
# if face_bbox["x"] < crop_x:
# crop_x = face_bbox["x"]
# elif face_right > crop_right:
# crop_x = face_right - crop_size
#
# if face_bbox["y"] < crop_y:
# crop_y = face_bbox["y"]
# elif face_bottom > crop_bottom:
# crop_y = face_bottom - crop_size
#
# crop_x = max(0, crop_x)
# crop_y = max(0, crop_y)
# crop_x = min(video_width - crop_size, crop_x)
# crop_y = min(video_height - crop_size, crop_y)
#
# return {"x": crop_x, "y": crop_y, "width": crop_size, "height": crop_size}
#
#
# def detect_face_region(
# video_path: str,
# output_dir: str,
# crop_size: int = 512,
# sample_count: int = 20,
# min_confidence: float = 0.5,
# min_face_pixels: int = 100,
# max_face_movement_percent: float = 0.3,
# ) -> Dict:
# """Main function: Detect face and calculate safe crop (DEPRECATED - Pipeline handles this)
#
# Args:
# video_path: Path to video
# output_dir: Directory for temporary files
# crop_size: Size of crop region (default: 512)
# sample_count: Number of frames to sample (default: 20)
# min_confidence: Minimum detection confidence
# min_face_pixels: Minimum face size in pixels
# max_face_movement_percent: Max allowed face movement
#
# Returns:
# Dict: {"x", "y", "width", "height", "face_bbox"}
#
# Raises:
# FaceDetectionError: If face detection fails
# """
# try:
# logger.info(f"Starting face detection for: {video_path}")
# video_info = get_video_info(video_path)
# video_w, video_h = video_info["width"], video_info["height"]
# logger.info(
# f"Video: {video_w}x{video_h}, {video_info['fps']:.1f}fps, {video_info['duration']:.1f}s"
# )
#
# if video_w < crop_size or video_h < crop_size:
# raise FaceDetectionError(
# f"Video resolution {video_w}x{video_h} < {crop_size}x{crop_size}. "
# f"Please upload higher resolution video."
# )
#
# extracted_frames = sample_frames_from_video(
# video_path, output_dir, sample_count
# )
# logger.info(f"Sampled {len(extracted_frames)} frames for detection")
#
# detections = detect_faces_in_frames(
# extracted_frames, min_confidence, min_face_pixels
# )
#
# if not detections:
# raise FaceDetectionError(
# f"No face detected in {sample_count} sampled frames. "
# f"Please upload a video with a visible face."
# )
#
# logger.info(f"Found {len(detections)} face detections")
#
# frames_with_face = len(set(d["frame_idx"] for d in detections))
# face_coverage = frames_with_face / len(extracted_frames)
# logger.info(
# f"Face coverage: {frames_with_face}/{len(extracted_frames)} ({face_coverage * 100:.1f}%)"
# )
#
# if face_coverage < 0.5:
# raise FaceDetectionError(
# f"Face detected in only {frames_with_face}/{len(extracted_frames)} frames "
# f"({face_coverage * 100:.1f}%). "
# f"Please upload a video with a visible face."
# )
#
# clusters = cluster_face_detections(detections)
# logger.info(f"Grouped into {len(clusters)} face clusters")
#
# best_cluster = select_best_cluster(clusters)
#
# if best_cluster is None:
# raise FaceDetectionError(
# f"Failed to identify main face in video. "
# f"Please upload a video with a clear, visible face."
# )
#
# logger.info(f"Selected main face cluster with {len(best_cluster)} detections")
#
# if not verify_face_stability(best_cluster, max_face_movement_percent):
# raise FaceDetectionError(
# f"Face moves too much between frames. "
# f"Please upload a video with a stable face position."
# )
#
# logger.info("Face stability check passed")
#
# face_bbox = calculate_face_bbox_from_cluster(best_cluster)
# crop_bbox = calculate_safe_crop_size(face_bbox, video_w, video_h, crop_size)
#
# crop_bbox["face_bbox"] = face_bbox
#
# logger.info(
# f"Face detected at ({face_bbox['x']}, {face_bbox['y']}) "
# f"size {face_bbox['width']}x{face_bbox['height']}, "
# f"crop at ({crop_bbox['x']}, {crop_bbox['y']})"
# )
# logger.info("Face detection completed successfully")
#
# return crop_bbox
#
# except FaceDetectionError:
# raise
# except Exception as e:
# logger.error(f"Face detection failed: {e}")
# raise FaceDetectionError(f"Face detection failed: {str(e)}")
#
#
# def crop_video_to_size(
# video_path: str, crop_bbox: Dict, output_dir: str, crop_size: int = 512
# ) -> str:
# """Crop video to specified size using calculated bbox (DEPRECATED - Pipeline handles this)
#
# Args:
# video_path: Path to input video
# crop_bbox: Crop region {"x", "y", "width", "height"}
# output_dir: Directory to save output
# crop_size: Size of crop region (default: 512)
#
# Returns:
# Path to cropped video
# """
# output_path = os.path.join(output_dir, f"face_cropped_{crop_size}x{crop_size}.mp4")
#
# logger.info(
# f"Crop box: x={crop_bbox['x']}, y={crop_bbox['y']}, "
# f"width={crop_bbox['width']}, height={crop_bbox['height']}"
# )
#
# ffmpeg = FFmpeg(
# inputs={video_path: None},
# outputs={
# output_path: [
# "-vf",
# f"crop={crop_bbox['width']}:{crop_bbox['height']}:{crop_bbox['x']}:{crop_bbox['y']}",
# "-c:v",
# "libx264",
# "-preset",
# "slow",
# "-crf",
# "18",
# "-profile:v",
# "high",
# "-pix_fmt",
# "yuv420p",
# "-c:a",
# "copy",
# "-loglevel",
# "error",
# "-y",
# ]
# },
# )
# try:
# ffmpeg.run()
# except FFRuntimeError as e:
# logger.error(f"FFmpeg failed: {e}")
# raise
# logger.info(f"Cropped video to {crop_size}x{crop_size}: {output_path}")
# return output_path
#
#
# def blend_face_into_original(
# original_video: str,
# face_video: str,
# crop_bbox: Dict,
# output_dir: str,
# lipsynced_info: Dict | None = None,
# feather: int = 15,
# ) -> str:
# """Overlay face video back onto original video at the crop position; no edge feathering is applied (DEPRECATED - Pipeline handles this)
#
# Args:
# original_video: Path to original video
# face_video: Path to lipsynced face video (cropped)
# crop_bbox: Crop region {"x", "y", "width", "height"}
# output_dir: Directory to save output
# lipsynced_info: Info of lipsynced video {"width", "height"} (optional); only used for the log message
# feather: Feather radius for smooth blending at edges (currently unused by the overlay filter)
#
# Returns:
# Path to blended video
# """
# output_path = os.path.join(output_dir, "face_blended.mp4")
#
# overlay_x = crop_bbox["x"]
# overlay_y = crop_bbox["y"]
#
# if lipsynced_info:
# face_width = lipsynced_info["width"]
# face_height = lipsynced_info["height"]
# logger.info(
# f"Blending {face_width}x{face_height} at ({overlay_x}, {overlay_y}) "
# f"(crop_bbox: {crop_bbox})"
# )
# else:
# face_width = crop_bbox["width"]
# face_height = crop_bbox["height"]
# logger.info(f"Blending at ({overlay_x}, {overlay_y})")
#
# mask_w = face_width
# mask_h = face_height
#
# feather_radius = 50
#
# ffmpeg = FFmpeg(
# inputs={original_video: None, face_video: None},
# outputs={
# output_path: [
# "-filter_complex",
# f"[0:v][1:v]overlay={overlay_x}:{overlay_y}",
# "-c:v",
# "libx264",
# "-preset",
# "slow",
# "-crf",
# "18",
# "-profile:v",
# "high",
# "-pix_fmt",
# "yuv420p",
# "-threads",
# "0",
# "-movflags",
# "+faststart",
# "-c:a",
# "copy",
# "-loglevel",
# "error",
# "-y",
# ]
# },
# )
# try:
# ffmpeg.run()
# except FFRuntimeError as e:
# logger.error(f"FFmpeg failed: {e}")
# raise
# logger.info(f"Blended face into original: {output_path}")
# return output_path