"""Face detection and region extraction for lipsync optimization (DEPRECATED - Pipeline handles this automatically)"""

# NOTE: All functions in this module are DEPRECATED.
# The lipsync pipeline (latentsync/pipelines/lipsync_pipeline.py) now handles:
# - Face detection
# - Affine transformation
# - Crop
# - Restore
# These functions are kept for reference but not used in the new workflow.
#
# Everything below is intentionally commented out: importing this module must
# not pull in cv2 / mediapipe / ffmpy or define any public names.
#
# NOTE(review): issues visible in the reference code, should it ever be revived:
# - sample_frames_from_video's docstring mentions "OpenCV CUDA", but the body
#   reads frames with a plain cv2.VideoCapture.
# - calculate_face_bbox_from_cluster divides by the summed confidences, which
#   raises ZeroDivisionError for an empty cluster or all-zero confidences.
# - blend_face_into_original never uses `feather`, `mask_w`, `mask_h` or
#   `feather_radius`; its filter graph is a plain overlay without feathering.

# import os
# import math
# import logging
# from typing import List, Dict, Tuple, Optional
#
# import cv2
# import numpy as np
# import mediapipe as mp
# from ffmpy import FFmpeg, FFRuntimeError
#
# from video_processing import get_video_info
#
# logger = logging.getLogger(__name__)
#
#
# class FaceDetectionError(Exception):
#     """Custom exception for face detection errors"""
#
#     pass
#
#
# def sample_frames_from_video(
#     video_path: str, output_dir: str, sample_count: int = 5
# ) -> List[Tuple[int, str]]:
#     """Extract uniform sample frames from video using OpenCV CUDA (HuggingFace)
#
#     Args:
#         video_path: Path to video
#         output_dir: Directory to save extracted frames
#         sample_count: Number of frames to sample
#
#     Returns:
#         List of (frame_index, frame_path) tuples
#     """
#     video_info = get_video_info(video_path)
#     fps = video_info["fps"]
#     duration = video_info["duration"]
#     total_frames = int(duration * fps)
#
#     frames_dir = os.path.join(output_dir, "sampled_frames")
#     os.makedirs(frames_dir, exist_ok=True)
#
#     if total_frames <= sample_count:
#         frame_indices = list(range(total_frames))
#     else:
#         frame_indices = [
#             int(i * total_frames / sample_count) for i in range(sample_count)
#         ]
#
#     extracted_frames = []
#     cap = cv2.VideoCapture(video_path)
#
#     try:
#         for idx, frame_idx in enumerate(frame_indices):
#             cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
#             ret, frame = cap.read()
#
#             if not ret or frame is None:
#                 logger.warning(f"Failed to read frame {frame_idx}")
#                 continue
#
#             frame_path = os.path.join(frames_dir, f"frame_{idx:04d}.jpg")
#             cv2.imwrite(frame_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
#             extracted_frames.append((frame_idx, frame_path))
#     finally:
#         cap.release()
#
#     logger.info(f"Extracted {len(extracted_frames)} frames from {video_path}")
#     return extracted_frames
#
#
# def detect_faces_in_frames(
#     extracted_frames: List[Tuple[int, str]],
#     min_confidence: float = 0.5,
#     min_face_pixels: int = 100,
# ) -> List[Dict]:
#     """Detect faces in all sampled frames using MediaPipe Face Detection API
#
#     Args:
#         extracted_frames: List of (frame_index, frame_path) tuples
#         min_confidence: Minimum detection confidence (0-1)
#         min_face_pixels: Minimum face size in pixels
#
#     Returns:
#         List of detections: [{"frame_idx", "confidence", "bbox": (x, y, w, h)}]
#     """
#     detections = []
#
#     with mp.solutions.face_detection.FaceDetection(
#         model_selection=0, min_detection_confidence=min_confidence
#     ) as face_detection:
#         for frame_idx, frame_path in extracted_frames:
#             frame = cv2.imread(frame_path)
#             if frame is None:
#                 logger.warning(f"Failed to read frame: {frame_path}")
#                 continue
#
#             h, w = frame.shape[:2]
#             frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#
#             results = face_detection.process(frame_rgb)
#
#             if results.detections:
#                 for detection in results.detections:
#                     bbox = detection.location_data.relative_bounding_box
#
#                     x = int(bbox.xmin * w)
#                     y = int(bbox.ymin * h)
#                     face_w = int(bbox.width * w)
#                     face_h = int(bbox.height * h)
#
#                     x = max(0, x)
#                     y = max(0, y)
#                     face_w = min(w - x, face_w)
#                     face_h = min(h - y, face_h)
#
#                     confidence = detection.score[0] if detection.score else 0.0
#
#                     if face_w >= min_face_pixels and face_h >= min_face_pixels:
#                         detections.append(
#                             {
#                                 "frame_idx": frame_idx,
#                                 "confidence": float(confidence),
#                                 "bbox": (x, y, face_w, face_h),
#                             }
#                         )
#
#     logger.info(f"Detected {len(detections)} faces in {len(extracted_frames)} frames")
#     return detections
#
#
# def cluster_face_detections(
#     detections: List[Dict], max_distance: int = 100
# ) -> List[List[Dict]]:
#     """Group face detections belonging to the same person using clustering
#
#     Args:
#         detections: List of face detections
#         max_distance: Maximum distance (pixels) to consider detections as same person
#
#     Returns:
#         List of clusters (each cluster is a list of detections)
#     """
#     if not detections:
#         return []
#
#     clusters = []
#     visited = set()
#
#     for i, det_i in enumerate(detections):
#         if i in visited:
#             continue
#
#         x_i, y_i, w_i, h_i = det_i["bbox"]
#         center_i = (x_i + w_i / 2, y_i + h_i / 2)
#
#         cluster = [det_i]
#         visited.add(i)
#
#         for j, det_j in enumerate(detections):
#             if j in visited:
#                 continue
#
#             x_j, y_j, w_j, h_j = det_j["bbox"]
#             center_j = (x_j + w_j / 2, y_j + h_j / 2)
#
#             distance = math.sqrt(
#                 (center_i[0] - center_j[0]) ** 2 + (center_i[1] - center_j[1]) ** 2
#             )
#
#             if distance < max_distance:
#                 cluster.append(det_j)
#                 visited.add(j)
#
#         clusters.append(cluster)
#
#     logger.info(f"Clustered {len(detections)} detections into {len(clusters)} clusters")
#     return clusters
#
#
# def select_best_cluster(clusters: List[List[Dict]]) -> Optional[List[Dict]]:
#     """Select the best face cluster (highest frequency)
#
#     Args:
#         clusters: List of clusters
#
#     Returns:
#         Best cluster (most frequent) or None
#     """
#     if not clusters:
#         return None
#
#     scored_clusters = [(len(cluster), cluster) for cluster in clusters]
#     scored_clusters.sort(key=lambda x: x[0], reverse=True)
#
#     best_cluster = scored_clusters[0][1]
#     logger.info(f"Selected best cluster with {len(best_cluster)} detections")
#     return best_cluster
#
#
# def verify_face_stability(
#     cluster: List[Dict], max_movement_percent: float = 0.3
# ) -> bool:
#     """Verify face doesn't move too much between frames
#
#     Args:
#         cluster: Face detections for the same person
#         max_movement_percent: Max movement as percentage of average face size
#
#     Returns:
#         True if face is stable, False otherwise
#     """
#     if len(cluster) < 2:
#         return True
#
#     centers = []
#     sizes = []
#
#     for det in cluster:
#         x, y, w, h = det["bbox"]
#         centers.append((x + w / 2, y + h / 2))
#         sizes.append(w * h)
#
#     avg_size = sum(sizes) / len(sizes)
#     avg_face_dim = math.sqrt(avg_size)
#     max_allowed_movement = avg_face_dim * max_movement_percent
#
#     for i in range(len(centers) - 1):
#         dx = abs(centers[i + 1][0] - centers[i][0])
#         dy = abs(centers[i + 1][1] - centers[i][1])
#         movement = math.sqrt(dx**2 + dy**2)
#
#         if movement > max_allowed_movement:
#             logger.warning(
#                 f"Face movement {movement:.1f}px > {max_allowed_movement:.1f}px"
#             )
#             return False
#
#     return True
#
#
# def calculate_face_bbox_from_cluster(cluster: List[Dict]) -> Dict:
#     """Calculate average face bounding box from cluster
#
#     Args:
#         cluster: Face detections for the same person
#
#     Returns:
#         Dict: {"x", "y", "width", "height"}
#     """
#     weighted_x = 0
#     weighted_y = 0
#     weighted_w = 0
#     weighted_h = 0
#     total_weight = 0
#
#     for det in cluster:
#         x, y, w, h = det["bbox"]
#         weight = det["confidence"]
#         weighted_x += x * weight
#         weighted_y += y * weight
#         weighted_w += w * weight
#         weighted_h += h * weight
#         total_weight += weight
#
#     avg_bbox = {
#         "x": int(weighted_x / total_weight),
#         "y": int(weighted_y / total_weight),
#         "width": int(weighted_w / total_weight),
#         "height": int(weighted_h / total_weight),
#     }
#
#     return avg_bbox
#
#
# def calculate_safe_crop_size(
#     face_bbox: Dict, video_width: int, video_height: int, crop_size: int = 512
# ) -> Dict:
#     """Calculate safe crop region ensuring face is inside
#
#     Args:
#         face_bbox: Face bounding box {"x", "y", "width", "height"}
#         video_width: Video width
#         video_height: Video height
#         crop_size: Size of crop region (default: 512)
#
#     Returns:
#         Dict: {"x", "y", "width", "height"}
#     """
#     crop_half = crop_size // 2
#
#     face_center_x = face_bbox["x"] + face_bbox["width"] / 2
#     face_center_y = face_bbox["y"] + face_bbox["height"] / 2
#
#     crop_x = int(face_center_x - crop_half)
#     crop_y = int(face_center_y - crop_half)
#
#     crop_x = max(0, crop_x)
#     crop_y = max(0, crop_y)
#     crop_x = min(video_width - crop_size, crop_x)
#     crop_y = min(video_height - crop_size, crop_y)
#
#     face_right = face_bbox["x"] + face_bbox["width"]
#     face_bottom = face_bbox["y"] + face_bbox["height"]
#     crop_right = crop_x + crop_size
#     crop_bottom = crop_y + crop_size
#
#     if (
#         face_bbox["x"] < crop_x
#         or face_bbox["y"] < crop_y
#         or face_right > crop_right
#         or face_bottom > crop_bottom
#     ):
#         if face_bbox["x"] < crop_x:
#             crop_x = face_bbox["x"]
#         elif face_right > crop_right:
#             crop_x = face_right - crop_size
#
#         if face_bbox["y"] < crop_y:
#             crop_y = face_bbox["y"]
#         elif face_bottom > crop_bottom:
#             crop_y = face_bottom - crop_size
#
#         crop_x = max(0, crop_x)
#         crop_y = max(0, crop_y)
#         crop_x = min(video_width - crop_size, crop_x)
#         crop_y = min(video_height - crop_size, crop_y)
#
#     return {"x": crop_x, "y": crop_y, "width": crop_size, "height": crop_size}
#
#
# def detect_face_region(
#     video_path: str,
#     output_dir: str,
#     crop_size: int = 512,
#     sample_count: int = 20,
#     min_confidence: float = 0.5,
#     min_face_pixels: int = 100,
#     max_face_movement_percent: float = 0.3,
# ) -> Dict:
#     """Main function: Detect face and calculate safe crop (DEPRECATED - Pipeline handles this)
#
#     Args:
#         video_path: Path to video
#         output_dir: Directory for temporary files
#         crop_size: Size of crop region (default: 512)
#         sample_count: Number of frames to sample (default: 20)
#         min_confidence: Minimum detection confidence
#         min_face_pixels: Minimum face size in pixels
#         max_face_movement_percent: Max allowed face movement
#
#     Returns:
#         Dict: {"x", "y", "width", "height", "face_bbox"}
#
#     Raises:
#         FaceDetectionError: If face detection fails
#     """
#     try:
#         logger.info(f"Starting face detection for: {video_path}")
#         video_info = get_video_info(video_path)
#         video_w, video_h = video_info["width"], video_info["height"]
#         logger.info(
#             f"Video: {video_w}x{video_h}, {video_info['fps']:.1f}fps, {video_info['duration']:.1f}s"
#         )
#
#         if video_w < crop_size or video_h < crop_size:
#             raise FaceDetectionError(
#                 f"Video resolution {video_w}x{video_h} < {crop_size}x{crop_size}. "
#                 f"Please upload higher resolution video."
#             )
#
#         extracted_frames = sample_frames_from_video(
#             video_path, output_dir, sample_count
#         )
#         logger.info(f"Sampled {len(extracted_frames)} frames for detection")
#
#         detections = detect_faces_in_frames(
#             extracted_frames, min_confidence, min_face_pixels
#         )
#
#         if not detections:
#             raise FaceDetectionError(
#                 f"No face detected in {sample_count} sampled frames. "
#                 f"Please upload a video with a visible face."
#             )
#
#         logger.info(f"Found {len(detections)} face detections")
#
#         frames_with_face = len(set(d["frame_idx"] for d in detections))
#         face_coverage = frames_with_face / len(extracted_frames)
#         logger.info(
#             f"Face coverage: {frames_with_face}/{len(extracted_frames)} ({face_coverage * 100:.1f}%)"
#         )
#
#         if face_coverage < 0.5:
#             raise FaceDetectionError(
#                 f"Face detected in only {frames_with_face}/{len(extracted_frames)} frames "
#                 f"({face_coverage * 100:.1f}%). "
#                 f"Please upload a video with a visible face."
#             )
#
#         clusters = cluster_face_detections(detections)
#         logger.info(f"Grouped into {len(clusters)} face clusters")
#
#         best_cluster = select_best_cluster(clusters)
#
#         if best_cluster is None:
#             raise FaceDetectionError(
#                 f"Failed to identify main face in video. "
#                 f"Please upload a video with a clear, visible face."
#             )
#
#         logger.info(f"Selected main face cluster with {len(best_cluster)} detections")
#
#         if not verify_face_stability(best_cluster, max_face_movement_percent):
#             raise FaceDetectionError(
#                 f"Face moves too much between frames. "
#                 f"Please upload a video with a stable face position."
#             )
#
#         logger.info("Face stability check passed")
#
#         face_bbox = calculate_face_bbox_from_cluster(best_cluster)
#         crop_bbox = calculate_safe_crop_size(face_bbox, video_w, video_h, crop_size)
#
#         crop_bbox["face_bbox"] = face_bbox
#
#         logger.info(
#             f"Face detected at ({face_bbox['x']}, {face_bbox['y']}) "
#             f"size {face_bbox['width']}x{face_bbox['height']}, "
#             f"crop at ({crop_bbox['x']}, {crop_bbox['y']})"
#         )
#         logger.info("Face detection completed successfully")
#
#         return crop_bbox
#
#     except FaceDetectionError:
#         raise
#     except Exception as e:
#         logger.error(f"Face detection failed: {e}")
#         raise FaceDetectionError(f"Face detection failed: {str(e)}")
#
#
# def crop_video_to_size(
#     video_path: str, crop_bbox: Dict, output_dir: str, crop_size: int = 512
# ) -> str:
#     """Crop video to specified size using calculated bbox (DEPRECATED - Pipeline handles this)
#
#     Args:
#         video_path: Path to input video
#         crop_bbox: Crop region {"x", "y", "width", "height"}
#         output_dir: Directory to save output
#         crop_size: Size of crop region (default: 512)
#
#     Returns:
#         Path to cropped video
#     """
#     output_path = os.path.join(output_dir, f"face_cropped_{crop_size}x{crop_size}.mp4")
#
#     logger.info(
#         f"Crop box: x={crop_bbox['x']}, y={crop_bbox['y']}, "
#         f"width={crop_bbox['width']}, height={crop_bbox['height']}"
#     )
#
#     ffmpeg = FFmpeg(
#         inputs={video_path: None},
#         outputs={
#             output_path: [
#                 "-vf",
#                 f"crop={crop_bbox['width']}:{crop_bbox['height']}:{crop_bbox['x']}:{crop_bbox['y']}",
#                 "-c:v",
#                 "libx264",
#                 "-preset",
#                 "slow",
#                 "-crf",
#                 "18",
#                 "-profile:v",
#                 "high",
#                 "-pix_fmt",
#                 "yuv420p",
#                 "-c:a",
#                 "copy",
#                 "-loglevel",
#                 "error",
#                 "-y",
#             ]
#         },
#     )
#     try:
#         ffmpeg.run()
#     except FFRuntimeError as e:
#         logger.error(f"FFmpeg failed: {e}")
#         raise
#     logger.info(f"Cropped video to {crop_size}x{crop_size}: {output_path}")
#     return output_path
#
#
# def blend_face_into_original(
#     original_video: str,
#     face_video: str,
#     crop_bbox: Dict,
#     output_dir: str,
#     lipsynced_info: Dict | None = None,
#     feather: int = 15,
# ) -> str:
#     """Blend face video back into original video with edge feather only (DEPRECATED - Pipeline handles this)
#
#     Args:
#         original_video: Path to original video
#         face_video: Path to lipsynced face video (cropped)
#         crop_bbox: Crop region {"x", "y", "width", "height"}
#         output_dir: Directory to save output
#         lipsynced_info: Info of lipsynced video {"width", "height"} (optional)
#         feather: Feather radius for smooth blending at edges
#
#     Returns:
#         Path to blended video
#     """
#     output_path = os.path.join(output_dir, "face_blended.mp4")
#
#     overlay_x = crop_bbox["x"]
#     overlay_y = crop_bbox["y"]
#
#     if lipsynced_info:
#         face_width = lipsynced_info["width"]
#         face_height = lipsynced_info["height"]
#         logger.info(
#             f"Blending {face_width}x{face_height} at ({overlay_x}, {overlay_y}) "
#             f"(crop_bbox: {crop_bbox})"
#         )
#     else:
#         face_width = crop_bbox["width"]
#         face_height = crop_bbox["height"]
#         logger.info(f"Blending at ({overlay_x}, {overlay_y})")
#
#     mask_w = face_width
#     mask_h = face_height
#
#     feather_radius = 50
#
#     ffmpeg = FFmpeg(
#         inputs={original_video: None, face_video: None},
#         outputs={
#             output_path: [
#                 "-filter_complex",
#                 f"[0:v][1:v]overlay={overlay_x}:{overlay_y}",
#                 "-c:v",
#                 "libx264",
#                 "-preset",
#                 "slow",
#                 "-crf",
#                 "18",
#                 "-profile:v",
#                 "high",
#                 "-pix_fmt",
#                 "yuv420p",
#                 "-threads",
#                 "0",
#                 "-movflags",
#                 "+faststart",
#                 "-c:a",
#                 "copy",
#                 "-loglevel",
#                 "error",
#                 "-y",
#             ]
#         },
#     )
#     try:
#         ffmpeg.run()
#     except FFRuntimeError as e:
#         logger.error(f"FFmpeg failed: {e}")
#         raise
#     logger.info(f"Blended face into original: {output_path}")
#     return output_path