Spaces:
Runtime error
Runtime error
| """Face detection and region extraction for lipsync optimization (DEPRECATED - Pipeline handles this automatically)""" | |
| # NOTE: All functions in this module are DEPRECATED. | |
| # The lipsync pipeline (latentsync/pipelines/lipsync_pipeline.py) now handles: | |
| # - Face detection | |
| # - Affine transformation | |
| # - Crop | |
| # - Restore | |
| # These functions are kept (commented out) for reference only and are not used in the new workflow. | |
| # import os | |
| # import math | |
| # import logging | |
| # from typing import List, Dict, Tuple, Optional | |
| # | |
| # import cv2 | |
| # import numpy as np | |
| # import mediapipe as mp | |
| # from ffmpy import FFmpeg, FFRuntimeError | |
| # | |
| # from video_processing import get_video_info | |
| # | |
| # logger = logging.getLogger(__name__) | |
| # | |
| # | |
| # class FaceDetectionError(Exception): | |
| # """Custom exception for face detection errors""" | |
| # | |
| # pass | |
| # | |
| # | |
| # def sample_frames_from_video( | |
| # video_path: str, output_dir: str, sample_count: int = 5 | |
| # ) -> List[Tuple[int, str]]: | |
| # """Extract uniformly spaced sample frames from a video using OpenCV (cv2.VideoCapture) | |
| # | |
| # Args: | |
| # video_path: Path to video | |
| # output_dir: Directory to save extracted frames | |
| # sample_count: Number of frames to sample | |
| # | |
| # Returns: | |
| # List of (frame_index, frame_path) tuples | |
| # """ | |
| # video_info = get_video_info(video_path) | |
| # fps = video_info["fps"] | |
| # duration = video_info["duration"] | |
| # total_frames = int(duration * fps) | |
| # | |
| # frames_dir = os.path.join(output_dir, "sampled_frames") | |
| # os.makedirs(frames_dir, exist_ok=True) | |
| # | |
| # if total_frames <= sample_count: | |
| # frame_indices = list(range(total_frames)) | |
| # else: | |
| # frame_indices = [ | |
| # int(i * total_frames / sample_count) for i in range(sample_count) | |
| # ] | |
| # | |
| # extracted_frames = [] | |
| # cap = cv2.VideoCapture(video_path) | |
| # | |
| # try: | |
| # for idx, frame_idx in enumerate(frame_indices): | |
| # cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) | |
| # ret, frame = cap.read() | |
| # | |
| # if not ret or frame is None: | |
| # logger.warning(f"Failed to read frame {frame_idx}") | |
| # continue | |
| # | |
| # frame_path = os.path.join(frames_dir, f"frame_{idx:04d}.jpg") | |
| # cv2.imwrite(frame_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 90]) | |
| # extracted_frames.append((frame_idx, frame_path)) | |
| # finally: | |
| # cap.release() | |
| # | |
| # logger.info(f"Extracted {len(extracted_frames)} frames from {video_path}") | |
| # return extracted_frames | |
| # | |
| # | |
| # def detect_faces_in_frames( | |
| # extracted_frames: List[Tuple[int, str]], | |
| # min_confidence: float = 0.5, | |
| # min_face_pixels: int = 100, | |
| # ) -> List[Dict]: | |
| # """Detect faces in all sampled frames using MediaPipe Face Detection API | |
| # | |
| # Args: | |
| # extracted_frames: List of (frame_index, frame_path) tuples | |
| # min_confidence: Minimum detection confidence (0-1) | |
| # min_face_pixels: Minimum face size in pixels | |
| # | |
| # Returns: | |
| # List of detections: [{"frame_idx", "confidence", "bbox": (x, y, w, h)}] | |
| # """ | |
| # detections = [] | |
| # | |
| # with mp.solutions.face_detection.FaceDetection( | |
| # model_selection=0, min_detection_confidence=min_confidence | |
| # ) as face_detection: | |
| # for frame_idx, frame_path in extracted_frames: | |
| # frame = cv2.imread(frame_path) | |
| # if frame is None: | |
| # logger.warning(f"Failed to read frame: {frame_path}") | |
| # continue | |
| # | |
| # h, w = frame.shape[:2] | |
| # frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| # | |
| # results = face_detection.process(frame_rgb) | |
| # | |
| # if results.detections: | |
| # for detection in results.detections: | |
| # bbox = detection.location_data.relative_bounding_box | |
| # | |
| # x = int(bbox.xmin * w) | |
| # y = int(bbox.ymin * h) | |
| # face_w = int(bbox.width * w) | |
| # face_h = int(bbox.height * h) | |
| # | |
| # x = max(0, x) | |
| # y = max(0, y) | |
| # face_w = min(w - x, face_w) | |
| # face_h = min(h - y, face_h) | |
| # | |
| # confidence = detection.score[0] if detection.score else 0.0 | |
| # | |
| # if face_w >= min_face_pixels and face_h >= min_face_pixels: | |
| # detections.append( | |
| # { | |
| # "frame_idx": frame_idx, | |
| # "confidence": float(confidence), | |
| # "bbox": (x, y, face_w, face_h), | |
| # } | |
| # ) | |
| # | |
| # logger.info(f"Detected {len(detections)} faces in {len(extracted_frames)} frames") | |
| # return detections | |
| # | |
| # | |
| # def cluster_face_detections( | |
| # detections: List[Dict], max_distance: int = 100 | |
| # ) -> List[List[Dict]]: | |
| # """Group face detections belonging to the same person using clustering | |
| # | |
| # Args: | |
| # detections: List of face detections | |
| # max_distance: Maximum distance (pixels) between bbox centers to consider detections as same person | |
| # | |
| # Returns: | |
| # List of clusters (each cluster is a list of detections) | |
| # """ | |
| # if not detections: | |
| # return [] | |
| # | |
| # clusters = [] | |
| # visited = set() | |
| # | |
| # for i, det_i in enumerate(detections): | |
| # if i in visited: | |
| # continue | |
| # | |
| # x_i, y_i, w_i, h_i = det_i["bbox"] | |
| # center_i = (x_i + w_i / 2, y_i + h_i / 2) | |
| # | |
| # cluster = [det_i] | |
| # visited.add(i) | |
| # | |
| # for j, det_j in enumerate(detections): | |
| # if j in visited: | |
| # continue | |
| # | |
| # x_j, y_j, w_j, h_j = det_j["bbox"] | |
| # center_j = (x_j + w_j / 2, y_j + h_j / 2) | |
| # | |
| # distance = math.sqrt( | |
| # (center_i[0] - center_j[0]) ** 2 + (center_i[1] - center_j[1]) ** 2 | |
| # ) | |
| # | |
| # if distance < max_distance: | |
| # cluster.append(det_j) | |
| # visited.add(j) | |
| # | |
| # clusters.append(cluster) | |
| # | |
| # logger.info(f"Clustered {len(detections)} detections into {len(clusters)} clusters") | |
| # return clusters | |
| # | |
| # | |
| # def select_best_cluster(clusters: List[List[Dict]]) -> Optional[List[Dict]]: | |
| # """Select the best face cluster (highest frequency) | |
| # | |
| # Args: | |
| # clusters: List of clusters | |
| # | |
| # Returns: | |
| # Best cluster (most frequent) or None | |
| # """ | |
| # if not clusters: | |
| # return None | |
| # | |
| # scored_clusters = [(len(cluster), cluster) for cluster in clusters] | |
| # scored_clusters.sort(key=lambda x: x[0], reverse=True) | |
| # | |
| # best_cluster = scored_clusters[0][1] | |
| # logger.info(f"Selected best cluster with {len(best_cluster)} detections") | |
| # return best_cluster | |
| # | |
| # | |
| # def verify_face_stability( | |
| # cluster: List[Dict], max_movement_percent: float = 0.3 | |
| # ) -> bool: | |
| # """Verify face doesn't move too much between frames | |
| # | |
| # Args: | |
| # cluster: Face detections for the same person | |
| # max_movement_percent: Max movement between consecutive detections, as a fraction (0.3 = 30%) of the average face dimension (sqrt of mean bbox area) | |
| # | |
| # Returns: | |
| # True if face is stable, False otherwise | |
| # """ | |
| # if len(cluster) < 2: | |
| # return True | |
| # | |
| # centers = [] | |
| # sizes = [] | |
| # | |
| # for det in cluster: | |
| # x, y, w, h = det["bbox"] | |
| # centers.append((x + w / 2, y + h / 2)) | |
| # sizes.append(w * h) | |
| # | |
| # avg_size = sum(sizes) / len(sizes) | |
| # avg_face_dim = math.sqrt(avg_size) | |
| # max_allowed_movement = avg_face_dim * max_movement_percent | |
| # | |
| # for i in range(len(centers) - 1): | |
| # dx = abs(centers[i + 1][0] - centers[i][0]) | |
| # dy = abs(centers[i + 1][1] - centers[i][1]) | |
| # movement = math.sqrt(dx**2 + dy**2) | |
| # | |
| # if movement > max_allowed_movement: | |
| # logger.warning( | |
| # f"Face movement {movement:.1f}px > {max_allowed_movement:.1f}px" | |
| # ) | |
| # return False | |
| # | |
| # return True | |
| # | |
| # | |
| # def calculate_face_bbox_from_cluster(cluster: List[Dict]) -> Dict: | |
| # """Calculate confidence-weighted average face bounding box from cluster | |
| # | |
| # Args: | |
| # cluster: Face detections for the same person | |
| # | |
| # Returns: | |
| # Dict: {"x", "y", "width", "height"} | |
| # """ | |
| # weighted_x = 0 | |
| # weighted_y = 0 | |
| # weighted_w = 0 | |
| # weighted_h = 0 | |
| # total_weight = 0 | |
| # | |
| # for det in cluster: | |
| # x, y, w, h = det["bbox"] | |
| # weight = det["confidence"] | |
| # weighted_x += x * weight | |
| # weighted_y += y * weight | |
| # weighted_w += w * weight | |
| # weighted_h += h * weight | |
| # total_weight += weight | |
| # | |
| # avg_bbox = { | |
| # "x": int(weighted_x / total_weight), | |
| # "y": int(weighted_y / total_weight), | |
| # "width": int(weighted_w / total_weight), | |
| # "height": int(weighted_h / total_weight), | |
| # } | |
| # | |
| # return avg_bbox | |
| # | |
| # | |
| # def calculate_safe_crop_size( | |
| # face_bbox: Dict, video_width: int, video_height: int, crop_size: int = 512 | |
| # ) -> Dict: | |
| # """Calculate safe crop region ensuring face is inside | |
| # | |
| # Args: | |
| # face_bbox: Face bounding box {"x", "y", "width", "height"} | |
| # video_width: Video width | |
| # video_height: Video height | |
| # crop_size: Size of crop region (default: 512) | |
| # | |
| # Returns: | |
| # Dict: {"x", "y", "width", "height"} | |
| # """ | |
| # crop_half = crop_size // 2 | |
| # | |
| # face_center_x = face_bbox["x"] + face_bbox["width"] / 2 | |
| # face_center_y = face_bbox["y"] + face_bbox["height"] / 2 | |
| # | |
| # crop_x = int(face_center_x - crop_half) | |
| # crop_y = int(face_center_y - crop_half) | |
| # | |
| # crop_x = max(0, crop_x) | |
| # crop_y = max(0, crop_y) | |
| # crop_x = min(video_width - crop_size, crop_x) | |
| # crop_y = min(video_height - crop_size, crop_y) | |
| # | |
| # face_right = face_bbox["x"] + face_bbox["width"] | |
| # face_bottom = face_bbox["y"] + face_bbox["height"] | |
| # crop_right = crop_x + crop_size | |
| # crop_bottom = crop_y + crop_size | |
| # | |
| # if ( | |
| # face_bbox["x"] < crop_x | |
| # or face_bbox["y"] < crop_y | |
| # or face_right > crop_right | |
| # or face_bottom > crop_bottom | |
| # ): | |
| # if face_bbox["x"] < crop_x: | |
| # crop_x = face_bbox["x"] | |
| # elif face_right > crop_right: | |
| # crop_x = face_right - crop_size | |
| # | |
| # if face_bbox["y"] < crop_y: | |
| # crop_y = face_bbox["y"] | |
| # elif face_bottom > crop_bottom: | |
| # crop_y = face_bottom - crop_size | |
| # | |
| # crop_x = max(0, crop_x) | |
| # crop_y = max(0, crop_y) | |
| # crop_x = min(video_width - crop_size, crop_x) | |
| # crop_y = min(video_height - crop_size, crop_y) | |
| # | |
| # return {"x": crop_x, "y": crop_y, "width": crop_size, "height": crop_size} | |
| # | |
| # | |
| # def detect_face_region( | |
| # video_path: str, | |
| # output_dir: str, | |
| # crop_size: int = 512, | |
| # sample_count: int = 20, | |
| # min_confidence: float = 0.5, | |
| # min_face_pixels: int = 100, | |
| # max_face_movement_percent: float = 0.3, | |
| # ) -> Dict: | |
| # """Main function: Detect face and calculate safe crop (DEPRECATED - Pipeline handles this) | |
| # | |
| # Args: | |
| # video_path: Path to video | |
| # output_dir: Directory for temporary files | |
| # crop_size: Size of crop region (default: 512) | |
| # sample_count: Number of frames to sample (default: 20) | |
| # min_confidence: Minimum detection confidence | |
| # min_face_pixels: Minimum face size in pixels | |
| # max_face_movement_percent: Max allowed face movement | |
| # | |
| # Returns: | |
| # Dict: {"x", "y", "width", "height", "face_bbox"} | |
| # | |
| # Raises: | |
| # FaceDetectionError: If face detection fails | |
| # """ | |
| # try: | |
| # logger.info(f"Starting face detection for: {video_path}") | |
| # video_info = get_video_info(video_path) | |
| # video_w, video_h = video_info["width"], video_info["height"] | |
| # logger.info( | |
| # f"Video: {video_w}x{video_h}, {video_info['fps']:.1f}fps, {video_info['duration']:.1f}s" | |
| # ) | |
| # | |
| # if video_w < crop_size or video_h < crop_size: | |
| # raise FaceDetectionError( | |
| # f"Video resolution {video_w}x{video_h} < {crop_size}x{crop_size}. " | |
| # f"Please upload higher resolution video." | |
| # ) | |
| # | |
| # extracted_frames = sample_frames_from_video( | |
| # video_path, output_dir, sample_count | |
| # ) | |
| # logger.info(f"Sampled {len(extracted_frames)} frames for detection") | |
| # | |
| # detections = detect_faces_in_frames( | |
| # extracted_frames, min_confidence, min_face_pixels | |
| # ) | |
| # | |
| # if not detections: | |
| # raise FaceDetectionError( | |
| # f"No face detected in {sample_count} sampled frames. " | |
| # f"Please upload a video with a visible face." | |
| # ) | |
| # | |
| # logger.info(f"Found {len(detections)} face detections") | |
| # | |
| # frames_with_face = len(set(d["frame_idx"] for d in detections)) | |
| # face_coverage = frames_with_face / len(extracted_frames) | |
| # logger.info( | |
| # f"Face coverage: {frames_with_face}/{len(extracted_frames)} ({face_coverage * 100:.1f}%)" | |
| # ) | |
| # | |
| # if face_coverage < 0.5: | |
| # raise FaceDetectionError( | |
| # f"Face detected in only {frames_with_face}/{len(extracted_frames)} frames " | |
| # f"({face_coverage * 100:.1f}%). " | |
| # f"Please upload a video with a visible face." | |
| # ) | |
| # | |
| # clusters = cluster_face_detections(detections) | |
| # logger.info(f"Grouped into {len(clusters)} face clusters") | |
| # | |
| # best_cluster = select_best_cluster(clusters) | |
| # | |
| # if best_cluster is None: | |
| # raise FaceDetectionError( | |
| # f"Failed to identify main face in video. " | |
| # f"Please upload a video with a clear, visible face." | |
| # ) | |
| # | |
| # logger.info(f"Selected main face cluster with {len(best_cluster)} detections") | |
| # | |
| # if not verify_face_stability(best_cluster, max_face_movement_percent): | |
| # raise FaceDetectionError( | |
| # f"Face moves too much between frames. " | |
| # f"Please upload a video with a stable face position." | |
| # ) | |
| # | |
| # logger.info("Face stability check passed") | |
| # | |
| # face_bbox = calculate_face_bbox_from_cluster(best_cluster) | |
| # crop_bbox = calculate_safe_crop_size(face_bbox, video_w, video_h, crop_size) | |
| # | |
| # crop_bbox["face_bbox"] = face_bbox | |
| # | |
| # logger.info( | |
| # f"Face detected at ({face_bbox['x']}, {face_bbox['y']}) " | |
| # f"size {face_bbox['width']}x{face_bbox['height']}, " | |
| # f"crop at ({crop_bbox['x']}, {crop_bbox['y']})" | |
| # ) | |
| # logger.info("Face detection completed successfully") | |
| # | |
| # return crop_bbox | |
| # | |
| # except FaceDetectionError: | |
| # raise | |
| # except Exception as e: | |
| # logger.error(f"Face detection failed: {e}") | |
| # raise FaceDetectionError(f"Face detection failed: {str(e)}") | |
| # | |
| # | |
| # def crop_video_to_size( | |
| # video_path: str, crop_bbox: Dict, output_dir: str, crop_size: int = 512 | |
| # ) -> str: | |
| # """Crop video to specified size using calculated bbox (DEPRECATED - Pipeline handles this) | |
| # | |
| # Args: | |
| # video_path: Path to input video | |
| # crop_bbox: Crop region {"x", "y", "width", "height"} | |
| # output_dir: Directory to save output | |
| # crop_size: Size of crop region (default: 512) | |
| # | |
| # Returns: | |
| # Path to cropped video | |
| # """ | |
| # output_path = os.path.join(output_dir, f"face_cropped_{crop_size}x{crop_size}.mp4") | |
| # | |
| # logger.info( | |
| # f"Crop box: x={crop_bbox['x']}, y={crop_bbox['y']}, " | |
| # f"width={crop_bbox['width']}, height={crop_bbox['height']}" | |
| # ) | |
| # | |
| # ffmpeg = FFmpeg( | |
| # inputs={video_path: None}, | |
| # outputs={ | |
| # output_path: [ | |
| # "-vf", | |
| # f"crop={crop_bbox['width']}:{crop_bbox['height']}:{crop_bbox['x']}:{crop_bbox['y']}", | |
| # "-c:v", | |
| # "libx264", | |
| # "-preset", | |
| # "slow", | |
| # "-crf", | |
| # "18", | |
| # "-profile:v", | |
| # "high", | |
| # "-pix_fmt", | |
| # "yuv420p", | |
| # "-c:a", | |
| # "copy", | |
| # "-loglevel", | |
| # "error", | |
| # "-y", | |
| # ] | |
| # }, | |
| # ) | |
| # try: | |
| # ffmpeg.run() | |
| # except FFRuntimeError as e: | |
| # logger.error(f"FFmpeg failed: {e}") | |
| # raise | |
| # logger.info(f"Cropped video to {crop_size}x{crop_size}: {output_path}") | |
| # return output_path | |
| # | |
| # | |
| # def blend_face_into_original( | |
| # original_video: str, | |
| # face_video: str, | |
| # crop_bbox: Dict, | |
| # output_dir: str, | |
| # lipsynced_info: Dict | None = None, | |
| # feather: int = 15, | |
| # ) -> str: | |
| # """Overlay face video back onto original video at the crop position; no feathering is applied (DEPRECATED - Pipeline handles this) | |
| # | |
| # Args: | |
| # original_video: Path to original video | |
| # face_video: Path to lipsynced face video (cropped) | |
| # crop_bbox: Crop region {"x", "y", "width", "height"} | |
| # output_dir: Directory to save output | |
| # lipsynced_info: Info of lipsynced video {"width", "height"} (optional) | |
| # feather: Feather radius for edge blending (currently unused; the overlay applies no feathering) | |
| # | |
| # Returns: | |
| # Path to blended video | |
| # """ | |
| # output_path = os.path.join(output_dir, "face_blended.mp4") | |
| # | |
| # overlay_x = crop_bbox["x"] | |
| # overlay_y = crop_bbox["y"] | |
| # | |
| # if lipsynced_info: | |
| # face_width = lipsynced_info["width"] | |
| # face_height = lipsynced_info["height"] | |
| # logger.info( | |
| # f"Blending {face_width}x{face_height} at ({overlay_x}, {overlay_y}) " | |
| # f"(crop_bbox: {crop_bbox})" | |
| # ) | |
| # else: | |
| # face_width = crop_bbox["width"] | |
| # face_height = crop_bbox["height"] | |
| # logger.info(f"Blending at ({overlay_x}, {overlay_y})") | |
| # | |
| # mask_w = face_width | |
| # mask_h = face_height | |
| # | |
| # feather_radius = 50 | |
| # | |
| # ffmpeg = FFmpeg( | |
| # inputs={original_video: None, face_video: None}, | |
| # outputs={ | |
| # output_path: [ | |
| # "-filter_complex", | |
| # f"[0:v][1:v]overlay={overlay_x}:{overlay_y}", | |
| # "-c:v", | |
| # "libx264", | |
| # "-preset", | |
| # "slow", | |
| # "-crf", | |
| # "18", | |
| # "-profile:v", | |
| # "high", | |
| # "-pix_fmt", | |
| # "yuv420p", | |
| # "-threads", | |
| # "0", | |
| # "-movflags", | |
| # "+faststart", | |
| # "-c:a", | |
| # "copy", | |
| # "-loglevel", | |
| # "error", | |
| # "-y", | |
| # ] | |
| # }, | |
| # ) | |
| # try: | |
| # ffmpeg.run() | |
| # except FFRuntimeError as e: | |
| # logger.error(f"FFmpeg failed: {e}") | |
| # raise | |
| # logger.info(f"Blended face into original: {output_path}") | |
| # return output_path | |