import cv2
import numpy as np
import mediapipe as mp
from PIL import Image
import os
import tempfile
from pathlib import Path
import torch
import torch.nn.functional as F
from facenet_pytorch import MTCNN
from utils import *


class VideoCharacterReplacer:
    """Replace the dominant face in each frame of a video with a reference face.

    Per-frame pipeline: detect faces (MediaPipe, plus MTCNN at high
    sensitivity) -> extract mesh landmarks -> temporally smooth them ->
    delegate the pixel compositing to :class:`FaceSwapper`.
    """

    def __init__(self):
        """Initialize the face detection and landmark models."""
        self.mp_face_detection = mp.solutions.face_detection
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_detection = self.mp_face_detection.FaceDetection(
            model_selection=0, min_detection_confidence=0.5
        )
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            static_image_mode=True, max_num_faces=1, refine_landmarks=True
        )
        # MTCNN provides a second, more robust detection pass.
        # BUG FIX: facenet-pytorch's keyword is `post_process`, not `post` —
        # the original raised TypeError here on construction.
        self.mtcnn = MTCNN(
            image_size=224,
            margin=20,
            min_face_size=100,
            thresholds=[0.6, 0.7, 0.7],
            factor=0.709,
            post_process=True,
        )
        self.face_swapper = FaceSwapper()

    def replace_character(self, ref_image_path, input_video_path,
                          replacement_strength=0.8, detection_sensitivity=0.6,
                          tracking_stability=0.7, preserve_background=True):
        """Replace the character in a video with the reference image.

        Args:
            ref_image_path (str): Path to the reference image.
            input_video_path (str): Path to the input video.
            replacement_strength (float): Blend strength of the swap (0-1).
            detection_sensitivity (float): Detection sensitivity (0-1);
                values > 0.7 enable the extra MTCNN pass.
            tracking_stability (float): Landmark smoothing factor (0-1);
                smoothing is only applied when > 0.5.
            preserve_background (bool): Color-match the source face to the
                target region for less visible seams.

        Returns:
            str | None: Path to the output video, or None on failure.
        """
        cap = None
        out = None
        try:
            ref_image = cv2.imread(ref_image_path)
            if ref_image is None:
                # BUG FIX: cv2.imread returns None on a bad path; the original
                # crashed later inside cvtColor with an opaque error.
                raise ValueError(f"Could not read reference image: {ref_image_path}")
            ref_image_rgb = cv2.cvtColor(ref_image, cv2.COLOR_BGR2RGB)

            cap = cv2.VideoCapture(input_video_path)
            if not cap.isOpened():
                raise ValueError(f"Could not open video: {input_video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps or fps <= 0:
                fps = 30.0  # some containers report 0 fps; use a sane default
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # BUG FIX: tempfile.mktemp is deprecated and race-prone; create
            # the output file atomically with mkstemp instead.
            fd, output_path = tempfile.mkstemp(suffix='.mp4')
            os.close(fd)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            prev_face_landmarks = None
            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                faces = self.detect_faces(frame_rgb, detection_sensitivity)
                if faces:
                    # Detections are sorted by confidence; take the top one.
                    face = faces[0]
                    landmarks = self.get_face_landmarks(frame_rgb, face)
                    # BUG FIX: `if landmarks:` on a multi-element ndarray
                    # raises "truth value is ambiguous" — test explicitly.
                    if landmarks is not None and len(landmarks) > 0:
                        # Smooth only when shapes agree: the mesh yields 468
                        # points, the bbox fallback yields 4, and mixing the
                        # two would crash the weighted average.
                        if (prev_face_landmarks is not None
                                and tracking_stability > 0.5
                                and prev_face_landmarks.shape == landmarks.shape):
                            landmarks = self.apply_temporal_consistency(
                                landmarks, prev_face_landmarks, tracking_stability
                            )
                        processed_frame = self.face_swapper.replace_face(
                            frame_rgb, ref_image_rgb, landmarks,
                            replacement_strength, preserve_background
                        )
                        prev_face_landmarks = landmarks.copy()
                    else:
                        processed_frame = frame_rgb
                else:
                    processed_frame = frame_rgb

                out.write(cv2.cvtColor(processed_frame, cv2.COLOR_RGB2BGR))

            return output_path
        except Exception as e:
            print(f"Error in video processing: {e}")
            return None
        finally:
            # BUG FIX: the original leaked the capture/writer whenever an
            # exception fired mid-loop; release unconditionally.
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()

    def detect_faces(self, image, sensitivity=0.6):
        """Detect faces with MediaPipe, plus MTCNN when sensitivity > 0.7.

        Args:
            image (numpy.ndarray): Input image in RGB format.
            sensitivity (float): Detection sensitivity (0-1).

        Returns:
            list: Detections sorted by descending confidence; each is a dict
            with 'bbox' as (x, y, w, h), 'confidence' and 'method' keys.
        """
        faces = []

        # Primary pass: MediaPipe face detection.
        results = self.face_detection.process(image)
        if results.detections:
            ih, iw = image.shape[:2]
            for detection in results.detections:
                rel = detection.location_data.relative_bounding_box
                bbox = (int(rel.xmin * iw), int(rel.ymin * ih),
                        int(rel.width * iw), int(rel.height * ih))
                faces.append({
                    'bbox': bbox,
                    'confidence': detection.score[0],
                    'method': 'mediapipe',
                })

        # Secondary pass: MTCNN, only at high sensitivity.
        if sensitivity > 0.7:
            try:
                boxes, probs = self.mtcnn.detect(image)
                if boxes is not None:
                    for box, prob in zip(boxes, probs):
                        if prob > 0.9:
                            # BUG FIX: MTCNN returns corners (x1, y1, x2, y2);
                            # convert to (x, y, w, h) so the IoU computation in
                            # remove_overlapping_faces sees a single format.
                            x1, y1, x2, y2 = (int(v) for v in box)
                            faces.append({
                                'bbox': (x1, y1, x2 - x1, y2 - y1),
                                'confidence': float(prob),
                                'method': 'mtcnn',
                            })
            except Exception as e:
                print(f"MTCNN detection error: {e}")

        faces.sort(key=lambda f: f['confidence'], reverse=True)
        return self.remove_overlapping_faces(faces)

    def get_face_landmarks(self, image, face):
        """Extract facial landmarks, falling back to the bbox corners.

        Args:
            image (numpy.ndarray): Input image in RGB format.
            face (dict): One detection from :meth:`detect_faces`.

        Returns:
            numpy.ndarray: (N, 2) pixel-space landmark points — the 468-point
            MediaPipe mesh when available, else the 4 bbox corners.
        """
        try:
            results = self.face_mesh.process(image)
            if results.multi_face_landmarks:
                mesh = results.multi_face_landmarks[0]
                h, w = image.shape[:2]
                # BUG FIX: the original iterated `landmark.landmark` — an
                # undefined name — and raised NameError on every frame,
                # silently forcing the bbox fallback.
                return np.array([[lm.x * w, lm.y * h] for lm in mesh.landmark])
        except Exception as e:
            print(f"Landmark extraction error: {e}")

        # Fallback: the four corners of the detection bounding box.
        x, y, bw, bh = face['bbox']
        return np.array([
            [x, y],            # top-left
            [x + bw, y],       # top-right
            [x, y + bh],       # bottom-left
            [x + bw, y + bh],  # bottom-right
        ], dtype=np.float64)

    def apply_temporal_consistency(self, current_landmarks, prev_landmarks, stability):
        """Smooth landmarks against the previous frame (exponential blend).

        Args:
            current_landmarks (numpy.ndarray): Current frame landmarks.
            prev_landmarks (numpy.ndarray): Previous frame landmarks
                (same shape as current).
            stability (float): Weight of the previous frame (0-1); higher
                values track more slowly but jitter less.

        Returns:
            numpy.ndarray: Stabilized landmarks.
        """
        alpha = stability
        return alpha * prev_landmarks + (1 - alpha) * current_landmarks

    def remove_overlapping_faces(self, faces, overlap_threshold=0.5):
        """Drop detections that overlap a more confident one (greedy NMS).

        Args:
            faces (list): Detections with 'bbox' as (x, y, w, h) and
                'confidence' keys.
            overlap_threshold (float): IoU above which two boxes are
                considered the same face.

        Returns:
            list: Non-overlapping detections.
        """
        if len(faces) <= 1:
            return faces

        non_overlapping = []
        for i, face1 in enumerate(faces):
            bbox1 = face1['bbox']
            keep = True
            for j, face2 in enumerate(faces):
                if i != j:
                    bbox2 = face2['bbox']
                    # Intersection-over-union of the two (x, y, w, h) boxes.
                    x1 = max(bbox1[0], bbox2[0])
                    y1 = max(bbox1[1], bbox2[1])
                    x2 = min(bbox1[0] + bbox1[2], bbox2[0] + bbox2[2])
                    y2 = min(bbox1[1] + bbox1[3], bbox2[1] + bbox2[3])
                    if x2 > x1 and y2 > y1:
                        intersection = (x2 - x1) * (y2 - y1)
                        union = (bbox1[2] * bbox1[3]) + (bbox2[2] * bbox2[3]) - intersection
                        iou = intersection / union if union > 0 else 0
                        if iou > overlap_threshold:
                            # Keep only the higher-confidence duplicate.
                            if face2['confidence'] > face1['confidence']:
                                keep = False
                                break
            if keep:
                non_overlapping.append(face1)
        return non_overlapping


class FaceSwapper:
    """Composite a source face onto a target frame using a landmark mask."""

    def __init__(self):
        """Initialize face-swapping helpers."""
        self.face_analyzer = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )

    def replace_face(self, target_image, source_image, landmarks,
                     replacement_strength=0.8, preserve_background=True):
        """Replace the face region of *target_image* with *source_image*.

        Args:
            target_image (numpy.ndarray): Target frame (RGB).
            source_image (numpy.ndarray): Replacement face image (RGB);
                assumed to be a face portrait — TODO confirm with callers.
            landmarks (numpy.ndarray): (N, 2) landmarks in target coordinates.
            replacement_strength (float): Blend strength (0-1).
            preserve_background (bool): Color-match the source face to the
                target region before blending.

        Returns:
            numpy.ndarray: Frame with the face replaced (uint8); the original
            frame is returned unchanged on any error.
        """
        try:
            mask = self.create_face_mask(target_image, landmarks)
            x_min, y_min, x_max, y_max = self._padded_bbox(target_image, landmarks)
            if x_max <= x_min or y_max <= y_min:
                return target_image  # degenerate box (landmarks off-frame)

            # BUG FIX: the original blended a *cropped* source face against
            # the full-size frame, which broadcast-failed on shape mismatch
            # (and its weights didn't sum to 1). Resize the source to the
            # target face box and paste it into a full-size canvas first.
            target_face = target_image[y_min:y_max, x_min:x_max]
            source_face = cv2.resize(source_image, (x_max - x_min, y_max - y_min))
            if preserve_background:
                source_face = self.match_color_statistics(source_face, target_face)
            source_canvas = target_image.copy()
            source_canvas[y_min:y_max, x_min:x_max] = source_face

            # Per-pixel alpha: `strength` inside the face mask, 0 outside,
            # so the blend weights always sum to exactly 1.
            alpha = (replacement_strength * mask)[..., np.newaxis]
            blended = ((1.0 - alpha) * target_image.astype(np.float32)
                       + alpha * source_canvas.astype(np.float32))
            return np.clip(blended, 0, 255).astype(np.uint8)
        except Exception as e:
            print(f"Face replacement error: {e}")
            return target_image

    def create_face_mask(self, image, landmarks):
        """Build a soft-edged float mask covering the convex hull of the landmarks.

        Args:
            image (numpy.ndarray): Input image (only its shape is used).
            landmarks (numpy.ndarray): (N, 2) landmark points.

        Returns:
            numpy.ndarray: float32 mask in [0, 1] with Gaussian-feathered edges.
        """
        mask = np.zeros(image.shape[:2], dtype=np.float32)
        hull = cv2.convexHull(landmarks.astype(np.int32))
        cv2.fillPoly(mask, [hull], 1.0)
        # Feather the edges so the blend seam is invisible.
        mask = cv2.GaussianBlur(mask, (15, 15), 0)
        return mask

    @staticmethod
    def _padded_bbox(image, landmarks, padding=20):
        """Return the landmarks' bounding box, padded and clipped to the image.

        Returns:
            tuple: (x_min, y_min, x_max, y_max) in pixel coordinates.
        """
        x_min = max(0, int(np.min(landmarks[:, 0])) - padding)
        x_max = min(image.shape[1], int(np.max(landmarks[:, 0])) + padding)
        y_min = max(0, int(np.min(landmarks[:, 1])) - padding)
        y_max = min(image.shape[0], int(np.max(landmarks[:, 1])) + padding)
        return x_min, y_min, x_max, y_max

    def extract_face_region(self, image, landmarks):
        """Extract the padded face region covered by *landmarks*.

        Args:
            image (numpy.ndarray): Input image.
            landmarks (numpy.ndarray): (N, 2) landmark points.

        Returns:
            numpy.ndarray: Cropped face region (a view into *image*).
        """
        x_min, y_min, x_max, y_max = self._padded_bbox(image, landmarks)
        return image[y_min:y_max, x_min:x_max]

    def match_color_statistics(self, source, target):
        """Match per-channel mean/std of *source* to *target* (Reinhard-style).

        Args:
            source (numpy.ndarray): Source face crop.
            target (numpy.ndarray): Target face crop (shapes may differ;
                only channel statistics are used).

        Returns:
            numpy.ndarray: Color-matched source face (uint8).
        """
        result = source.copy().astype(np.float32)
        for i in range(3):  # per color channel
            source_mean = np.mean(source[:, :, i])
            source_std = np.std(source[:, :, i])
            target_mean = np.mean(target[:, :, i])
            target_std = np.std(target[:, :, i])
            # Skip flat channels to avoid division by zero.
            if source_std > 0:
                result[:, :, i] = ((source[:, :, i] - source_mean)
                                   * (target_std / source_std) + target_mean)
        return np.clip(result, 0, 255).astype(np.uint8)