Spaces:
Runtime error
Runtime error
| import cv2 | |
| import numpy as np | |
| import mediapipe as mp | |
| from PIL import Image | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| import torch | |
| import torch.nn.functional as F | |
| from facenet_pytorch import MTCNN | |
| from utils import * | |
class VideoCharacterReplacer:
    """Replace the most prominent face in a video with a face from a reference image.

    Faces are located with MediaPipe face detection (optionally supplemented
    by MTCNN), refined to landmarks with MediaPipe face mesh, optionally
    smoothed against the previous frame, and handed to ``FaceSwapper`` for
    blending.
    """

    def __init__(self):
        """Initialize the video character replacer with detection and processing models"""
        self.mp_face_detection = mp.solutions.face_detection
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_detection = self.mp_face_detection.FaceDetection(
            model_selection=0, min_detection_confidence=0.5
        )
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            static_image_mode=True,
            max_num_faces=1,
            refine_landmarks=True
        )
        # Initialize MTCNN for more robust face detection.
        # NOTE: the keyword is ``post_process``; ``post`` is not accepted by
        # facenet_pytorch's MTCNN and raised a TypeError at construction time.
        self.mtcnn = MTCNN(
            image_size=224,
            margin=20,
            min_face_size=100,
            thresholds=[0.6, 0.7, 0.7],
            factor=0.709,
            post_process=True
        )
        # Blending of the reference face into each frame is delegated to FaceSwapper.
        self.face_swapper = FaceSwapper()

    def replace_character(self, ref_image_path, input_video_path,
                          replacement_strength=0.8, detection_sensitivity=0.6,
                          tracking_stability=0.7, preserve_background=True):
        """
        Replace character in video with reference image
        Args:
            ref_image_path (str): Path to reference image
            input_video_path (str): Path to input video
            replacement_strength (float): Strength of replacement (0-1)
            detection_sensitivity (float): Detection sensitivity (0-1)
            tracking_stability (float): Tracking stability (0-1)
            preserve_background (bool): Whether to preserve background
        Returns:
            str: Path to output video, or None if processing failed
        """
        cap = None
        out = None
        output_path = None
        succeeded = False
        try:
            # Load reference image; cv2.imread returns None instead of raising.
            ref_image = cv2.imread(ref_image_path)
            if ref_image is None:
                raise ValueError(f"Could not read reference image: {ref_image_path}")
            ref_image_rgb = cv2.cvtColor(ref_image, cv2.COLOR_BGR2RGB)

            # Initialize video capture
            cap = cv2.VideoCapture(input_video_path)
            if not cap.isOpened():
                raise ValueError(f"Could not open input video: {input_video_path}")

            # Keep fractional frame rates (e.g. 29.97); fall back when the
            # container reports 0/NaN so the writer still gets a valid rate.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                fps = 30.0
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # mkstemp creates the file atomically (mktemp is race-prone); the
            # descriptor is closed right away because VideoWriter reopens by path.
            fd, output_path = tempfile.mkstemp(suffix='.mp4')
            os.close(fd)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                raise RuntimeError(f"Could not open video writer for: {output_path}")

            # Process each frame
            prev_face_landmarks = None
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                processed_frame, prev_face_landmarks = self._process_frame(
                    frame_rgb,
                    ref_image_rgb,
                    prev_face_landmarks,
                    replacement_strength,
                    detection_sensitivity,
                    tracking_stability,
                    preserve_background
                )
                # Convert back to BGR and write frame
                out.write(cv2.cvtColor(processed_frame, cv2.COLOR_RGB2BGR))

            succeeded = True
            return output_path
        except Exception as e:
            print(f"Error in video processing: {e}")
            return None
        finally:
            # Release resources on every path, including failures.
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()
            # Do not leave a partial/empty temp file behind on failure.
            if not succeeded and output_path is not None:
                try:
                    os.remove(output_path)
                except OSError:
                    pass

    def _process_frame(self, frame_rgb, ref_image_rgb, prev_landmarks,
                       replacement_strength, detection_sensitivity,
                       tracking_stability, preserve_background):
        """Swap the top-ranked face of one RGB frame; return (frame, landmarks or None)."""
        faces = self.detect_faces(frame_rgb, detection_sensitivity)
        if not faces:
            return frame_rgb, None

        # detect_faces returns detections ordered by confidence.
        landmarks = self.get_face_landmarks(frame_rgb, faces[0])
        # Landmarks are a NumPy array, so test them explicitly: plain
        # truthiness on an array raises ValueError.
        if landmarks is None or len(landmarks) < 3:
            return frame_rgb, None

        # Apply temporal consistency
        if prev_landmarks is not None and tracking_stability > 0.5:
            landmarks = self.apply_temporal_consistency(
                landmarks, prev_landmarks, tracking_stability
            )

        processed_frame = self.face_swapper.replace_face(
            frame_rgb,
            ref_image_rgb,
            landmarks,
            replacement_strength,
            preserve_background
        )
        return processed_frame, landmarks.copy()

    def detect_faces(self, image, sensitivity=0.6):
        """
        Detect faces in image using multiple methods
        Args:
            image (numpy.ndarray): Input image in RGB format
            sensitivity (float): Detection sensitivity (0-1); MTCNN is only
                consulted when this is above 0.7
        Returns:
            list: Detected faces as dicts with 'bbox' (x, y, width, height),
                'confidence' and 'method', ordered by descending confidence
        """
        faces = []
        ih, iw = image.shape[:2]

        # MediaPipe face detection (relative coordinates -> pixels)
        results = self.face_detection.process(image)
        if results.detections:
            for detection in results.detections:
                rel = detection.location_data.relative_bounding_box
                faces.append({
                    'bbox': (int(rel.xmin * iw), int(rel.ymin * ih),
                             int(rel.width * iw), int(rel.height * ih)),
                    'confidence': float(detection.score[0]),
                    'method': 'mediapipe'
                })

        # MTCNN for additional detection if sensitivity is high
        if sensitivity > 0.7:
            try:
                boxes, probs = self.mtcnn.detect(image)
                if boxes is not None:
                    for box, prob in zip(boxes, probs):
                        if prob is not None and prob > 0.9:
                            # MTCNN reports corners (x1, y1, x2, y2); convert to
                            # the (x, y, w, h) layout used everywhere else.
                            x1, y1, x2, y2 = (int(v) for v in box[:4])
                            faces.append({
                                'bbox': (x1, y1, x2 - x1, y2 - y1),
                                'confidence': float(prob),
                                'method': 'mtcnn'
                            })
            except Exception as e:
                print(f"MTCNN detection error: {e}")

        # Sort by confidence and remove overlaps
        faces.sort(key=lambda item: item['confidence'], reverse=True)
        return self.remove_overlapping_faces(faces)

    def get_face_landmarks(self, image, face):
        """
        Extract facial landmarks for the detected face
        Args:
            image (numpy.ndarray): Input image
            face (dict): Face detection result with an (x, y, w, h) 'bbox'
        Returns:
            numpy.ndarray: (N, 2) pixel coordinates; the four bbox corners
                when face mesh landmarks are unavailable
        """
        try:
            # Use MediaPipe face mesh for detailed landmarks. The mesh runs on
            # the whole image and is configured with max_num_faces=1.
            results = self.face_mesh.process(image)
            if results.multi_face_landmarks:
                mesh = results.multi_face_landmarks[0]
                height, width = image.shape[:2]
                return np.array(
                    [[point.x * width, point.y * height] for point in mesh.landmark],
                    dtype=np.float32
                )
        except Exception as e:
            print(f"Landmark extraction error: {e}")

        # Fallback to basic bounding box if landmarks unavailable
        x, y, w, h = face['bbox'][:4]
        return np.array([
            [x, y],          # Top-left
            [x + w, y],      # Top-right
            [x, y + h],      # Bottom-left
            [x + w, y + h]   # Bottom-right
        ], dtype=np.float32)

    def apply_temporal_consistency(self, current_landmarks, prev_landmarks, stability):
        """
        Apply temporal consistency to smooth landmark tracking
        Args:
            current_landmarks (numpy.ndarray): Current frame landmarks
            prev_landmarks (numpy.ndarray): Previous frame landmarks
            stability (float): Stability factor (0-1); weight of the previous frame
        Returns:
            numpy.ndarray: Stabilized landmarks; the current landmarks
                unchanged when the two sets cannot be paired point-for-point
        """
        if prev_landmarks is None:
            return current_landmarks
        # Mesh landmarks and the 4-corner fallback have different shapes;
        # blending them would raise a broadcasting error.
        if np.shape(prev_landmarks) != np.shape(current_landmarks):
            return current_landmarks
        alpha = stability
        return alpha * prev_landmarks + (1 - alpha) * current_landmarks

    @staticmethod
    def _bbox_iou(box_a, box_b):
        """Return the intersection-over-union of two (x, y, w, h) boxes."""
        ax, ay, aw, ah = box_a[:4]
        bx, by, bw, bh = box_b[:4]
        inter_w = min(ax + aw, bx + bw) - max(ax, bx)
        inter_h = min(ay + ah, by + bh) - max(ay, by)
        if inter_w <= 0 or inter_h <= 0:
            return 0.0
        intersection = inter_w * inter_h
        union = aw * ah + bw * bh - intersection
        return intersection / union if union > 0 else 0.0

    def remove_overlapping_faces(self, faces, overlap_threshold=0.5):
        """
        Remove overlapping face detections
        Args:
            faces (list): List of face detections
            overlap_threshold (float): IoU threshold for overlap removal
        Returns:
            list: Non-overlapping face detections, in their input order
        """
        if len(faces) <= 1:
            return faces

        # Greedy non-maximum suppression: visit detections from most to least
        # confident and keep one only if it does not overlap an already kept
        # detection. Ties are resolved in favour of the earlier entry, so two
        # equally confident duplicates no longer both survive.
        by_confidence = sorted(
            range(len(faces)),
            key=lambda idx: faces[idx]['confidence'],
            reverse=True
        )
        kept = []
        for idx in by_confidence:
            candidate = faces[idx]['bbox']
            if all(self._bbox_iou(candidate, faces[k]['bbox']) <= overlap_threshold
                   for k in kept):
                kept.append(idx)
        return [faces[idx] for idx in sorted(kept)]
class FaceSwapper:
    """Blend a face taken from a source image into the face region of a target image."""

    def __init__(self):
        """Initialize face swapping functionality"""
        # Haar cascade used to locate the face inside the source (reference) image.
        self.face_analyzer = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def replace_face(self, target_image, source_image, landmarks,
                     replacement_strength=0.8, preserve_background=True):
        """
        Replace face in target image with face from source image
        Args:
            target_image (numpy.ndarray): Target image
            source_image (numpy.ndarray): Source image with replacement face
            landmarks (numpy.ndarray): Facial landmarks of the face in the
                target image, as (N, 2) pixel coordinates
            replacement_strength (float): Replacement strength (0-1)
            preserve_background (bool): Whether to colour-match the source
                face to the target face before blending
        Returns:
            numpy.ndarray: Image with replaced face; the target image
                unchanged if the replacement could not be performed
        """
        try:
            if target_image is None or source_image is None or source_image.size == 0:
                return target_image
            points = np.asarray(landmarks, dtype=np.float32)
            # A face polygon needs at least three (x, y) points.
            if points.ndim != 2 or points.shape[0] < 3 or points.shape[1] < 2:
                return target_image

            x_min, y_min, x_max, y_max = self._face_bounds(target_image, points)
            width, height = x_max - x_min, y_max - y_min
            if width <= 0 or height <= 0:
                return target_image

            target_face = target_image[y_min:y_max, x_min:x_max]

            # Take the face from the source image's own coordinates (the target
            # landmarks say nothing about where the face sits in the source)
            # and scale it to the size of the target face region.
            source_face = self._locate_source_face(source_image)
            source_face = cv2.resize(source_face, (width, height),
                                     interpolation=cv2.INTER_LINEAR)

            # Apply color matching if preserve_background is True
            if preserve_background:
                source_face = self.match_color_statistics(source_face, target_face)

            # Build the mask inside the face region only; landmarks are shifted
            # into the region's local coordinates.
            local_points = points[:, :2] - np.array([x_min, y_min], dtype=np.float32)
            mask = self.create_face_mask(target_face, local_points)

            # Per-pixel blend weight; source and target weights sum to one so
            # pixels outside the mask keep their original value.
            strength = float(np.clip(replacement_strength, 0.0, 1.0))
            alpha = (strength * mask)[:, :, np.newaxis]
            blended = (target_face.astype(np.float32) * (1.0 - alpha)
                       + source_face.astype(np.float32) * alpha)

            result = target_image.copy()
            result[y_min:y_max, x_min:x_max] = np.clip(blended, 0, 255).astype(np.uint8)
            return result
        except Exception as e:
            print(f"Face replacement error: {e}")
            return target_image

    def _locate_source_face(self, source_image):
        """Return the crop of the largest Haar-detected face, or the whole image if none."""
        analyzer = getattr(self, 'face_analyzer', None)
        if analyzer is None or analyzer.empty():
            return source_image
        # NOTE(review): assumes source_image is RGB, as the visible caller passes it.
        gray = cv2.cvtColor(source_image, cv2.COLOR_RGB2GRAY)
        detections = analyzer.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
        if len(detections) == 0:
            return source_image
        x, y, w, h = max(detections, key=lambda box: box[2] * box[3])
        return source_image[y:y + h, x:x + w]

    def create_face_mask(self, image, landmarks):
        """
        Create a mask for the face region
        Args:
            image (numpy.ndarray): Input image
            landmarks (numpy.ndarray): Facial landmarks
        Returns:
            numpy.ndarray: float32 mask with the image's height and width,
                1.0 inside the landmarks' convex hull with blurred edges
        """
        mask = np.zeros(image.shape[:2], dtype=np.float32)
        # Use convex hull of landmarks to create face mask
        hull = cv2.convexHull(np.asarray(landmarks).astype(np.int32))
        cv2.fillPoly(mask, [hull], 1.0)
        # Apply Gaussian blur for smooth edges
        return cv2.GaussianBlur(mask, (15, 15), 0)

    @staticmethod
    def _face_bounds(image, landmarks, padding=20):
        """Return (x_min, y_min, x_max, y_max) of the padded landmark box, clamped to the image."""
        x_min = max(0, int(np.min(landmarks[:, 0])) - padding)
        x_max = min(image.shape[1], int(np.max(landmarks[:, 0])) + padding)
        y_min = max(0, int(np.min(landmarks[:, 1])) - padding)
        y_max = min(image.shape[0], int(np.max(landmarks[:, 1])) + padding)
        return x_min, y_min, x_max, y_max

    def extract_face_region(self, image, landmarks, padding=20):
        """
        Extract face region based on landmarks
        Args:
            image (numpy.ndarray): Input image
            landmarks (numpy.ndarray): Facial landmarks
            padding (int): Pixels added on every side of the landmark
                bounding box before clamping to the image
        Returns:
            numpy.ndarray: Extracted face region (a view into image)
        """
        x_min, y_min, x_max, y_max = self._face_bounds(image, landmarks, padding)
        return image[y_min:y_max, x_min:x_max]

    def match_color_statistics(self, source, target):
        """
        Match color statistics between source and target faces
        Args:
            source (numpy.ndarray): Source face
            target (numpy.ndarray): Target face
        Returns:
            numpy.ndarray: Color-matched source face as uint8
        """
        src = np.asarray(source, dtype=np.float64)
        tgt = np.asarray(target, dtype=np.float64)
        # Statistics of an empty region are undefined (NaN); leave the source as is.
        if src.size == 0 or tgt.size == 0:
            return np.clip(src, 0, 255).astype(np.uint8)

        # Per-channel mean/std over the two spatial axes.
        src_mean = src.mean(axis=(0, 1))
        src_std = src.std(axis=(0, 1))
        tgt_mean = tgt.mean(axis=(0, 1))
        tgt_std = tgt.std(axis=(0, 1))

        # Avoid division by zero: flat source channels are left unchanged.
        usable = src_std > 0
        scale = np.where(usable, tgt_std / np.where(usable, src_std, 1.0), 1.0)
        shift = np.where(usable, tgt_mean - src_mean * scale, 0.0)

        return np.clip(src * scale + shift, 0, 255).astype(np.uint8)