Spaces:
Runtime error
Runtime error
| import cv2 | |
| import numpy as np | |
| import onnxruntime as rt | |
| import sys | |
| from insightface.app import FaceAnalysis | |
| sys.path.insert(1, './recognition') | |
| from scrfd import SCRFD | |
| from arcface_onnx import ArcFaceONNX | |
| import os.path as osp | |
| import os | |
| from pathlib import Path | |
| from tqdm import tqdm | |
| import ffmpeg | |
| import random | |
| import multiprocessing as mp | |
| from concurrent.futures import ThreadPoolExecutor | |
| from insightface.model_zoo.inswapper import INSwapper | |
| import psutil | |
| from enum import Enum | |
| from insightface.app.common import Face | |
| from insightface.utils.storage import ensure_available | |
| import re | |
| import subprocess | |
| import urllib.request | |
| # Face enhancement imports | |
| try: | |
| from gfpgan import GFPGANer | |
| GFPGAN_AVAILABLE = True | |
| except ImportError: | |
| GFPGAN_AVAILABLE = False | |
| print("GFPGAN not available - face enhancement disabled") | |
class RefacerMode(Enum):
    """Execution backend label that Refacer records for the current session."""

    CPU = 1
    CUDA = 2
    COREML = 3
    TENSORRT = 4
class Refacer:
    """Swap faces in a video frame by frame.

    The constructor probes ffmpeg for a usable H.264 encoder, selects ONNX
    Runtime providers and loads the detector, recognizer and swapper sessions.
    ``reface`` is the main entry point; ``prepare_faces`` must have been
    called (``reface`` does so) before ``process_first_face`` or
    ``process_faces`` are used directly.
    """

    # Candidate H.264 encoders in order of preference, mapped to the
    # ``video_bitrate`` value passed to ffmpeg when that encoder is used.
    VIDEO_CODECS = {
        'h264_videotoolbox': '0',  # osx HW acceleration
        'h264_nvenc': '0',  # NVIDIA HW acceleration
        # 'h264_qsv',  # Intel HW acceleration
        # 'h264_vaapi',  # Intel HW acceleration
        # 'h264_omx',  # HW acceleration
        'libx264': '0',  # No HW acceleration
    }

    def __init__(self, force_cpu=False, colab_performance=False):
        """Create the pipeline.

        Args:
            force_cpu: Restrict ONNX Runtime to ``CPUExecutionProvider``.
            colab_performance: When a non-CPU-only provider list is in use,
                label the session TENSORRT and use the CPU-style thread
                settings.
        """
        self.first_face = False
        self.replacement_faces = []
        self.force_cpu = force_cpu
        self.colab_performance = colab_performance
        self.__check_encoders()
        self.__check_providers()
        self.total_mem = psutil.virtual_memory().total
        self.__init_apps()

        # Temporal tracking state used to reduce flickering.
        self.prev_faces = []  # Faces detected on the previous frame
        self.face_tracking_threshold = 0.15  # Minimum IoU to treat two boxes as the same face
        self.face_memory = {}  # Per-face state carried across frames
        self.occlusion_tolerance = 10  # Frames a face may be missing before state is reset
        self.last_swapped_frame = None  # Last successfully swapped frame
        self.stable_swap_count = 0  # Number of swaps accumulated so far

        # Quality enhancement settings.
        self.enable_color_correction = True  # Match skin tone and lighting
        self.enable_seamless_clone = False  # Disabled - INSwapper already handles blending
        self.enable_temporal_blend = True  # Smooth frame transitions
        self.temporal_blend_alpha = 0.15  # Weight of the previous frame in the blend
        self.prev_blended_frame = None  # Previous output of the temporal blend
        self.enable_face_enhancement = GFPGAN_AVAILABLE  # Face restoration with GFPGAN
        self.face_enhancer = None

        if self.enable_face_enhancement:
            try:
                print("Initializing GFPGAN face enhancer...")
                self.face_enhancer = GFPGANer(
                    model_path='https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.3.pth',
                    upscale=1,  # Don't upscale, just enhance
                    arch='clean',
                    channel_multiplier=2,
                    bg_upsampler=None,  # Don't enhance background
                )
                print("GFPGAN initialized successfully!")
            except Exception as e:
                # Enhancement is optional; carry on without it.
                print(f"GFPGAN initialization failed: {e}")
                self.enable_face_enhancement = False

    def __check_providers(self):
        """Pick ONNX Runtime providers and session options.

        Sets ``providers``, ``sess_options``, ``mode`` and ``use_num_cpus``.
        A provider list that matches none of the known cases falls back to
        the CPU settings so those attributes are always defined.
        """
        if self.force_cpu:
            self.providers = ['CPUExecutionProvider']
        else:
            self.providers = rt.get_available_providers()
        rt.set_default_logger_severity(4)
        self.sess_options = rt.SessionOptions()
        self.sess_options.execution_mode = rt.ExecutionMode.ORT_SEQUENTIAL
        self.sess_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL

        # Leave one core free, but never report fewer than one worker.
        cpu_workers = max(1, mp.cpu_count() - 1)
        cpu_threads = int(cpu_workers / 3)

        if len(self.providers) == 1 and 'CPUExecutionProvider' in self.providers:
            self.mode = RefacerMode.CPU
            self.use_num_cpus = cpu_workers
            self.sess_options.intra_op_num_threads = cpu_threads
            print(f"CPU mode with providers {self.providers}")
        elif self.colab_performance:
            self.mode = RefacerMode.TENSORRT
            self.use_num_cpus = cpu_workers
            self.sess_options.intra_op_num_threads = cpu_threads
            print(f"TENSORRT mode with providers {self.providers}")
        elif 'CoreMLExecutionProvider' in self.providers:
            self.mode = RefacerMode.COREML
            self.use_num_cpus = cpu_workers
            self.sess_options.intra_op_num_threads = cpu_threads
            print(f"CoreML mode with providers {self.providers}")
        elif 'CUDAExecutionProvider' in self.providers:
            self.mode = RefacerMode.CUDA
            self.use_num_cpus = 2
            self.sess_options.intra_op_num_threads = 1
            if 'TensorrtExecutionProvider' in self.providers:
                self.providers.remove('TensorrtExecutionProvider')
            print(f"CUDA mode with providers {self.providers}")
        else:
            # Unknown accelerator set: keep the providers, use CPU settings.
            self.mode = RefacerMode.CPU
            self.use_num_cpus = cpu_workers
            self.sess_options.intra_op_num_threads = cpu_threads
            print(f"No dedicated mode for providers {self.providers}; using CPU settings")

    def __download_model(self, model_path):
        """Download the inswapper model to ``model_path`` if it is missing.

        Each source is streamed into ``model_path + '.part'`` and only moved
        into place once its size passes the sanity check, so an interrupted
        download never leaves a truncated file under the final name. HTTPS
        certificates are verified with the interpreter's default context.

        Raises:
            RuntimeError: When every source fails.
        """
        if os.path.exists(model_path):
            return
        print(f"Model file {model_path} not found. Downloading...")

        sources = [
            {
                'name': 'Hugging Face - ezioruan',
                'url': 'https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
            },
            {
                'name': 'Hugging Face - ashleykleynhans',
                'url': 'https://huggingface.co/ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx',
            },
            {
                'name': 'Hugging Face - public-data',
                'url': 'https://huggingface.co/public-data/insightface/resolve/main/models/inswapper_128.onnx',
            },
        ]
        min_valid_size = 500000000  # Anything smaller is treated as incomplete
        chunk_size = 1024 * 1024
        partial_path = model_path + '.part'

        for source in sources:
            try:
                print(f"Trying to download from {source['name']}...")
                # Browser-like User-Agent so the request is not rejected.
                req = urllib.request.Request(
                    source['url'],
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                    },
                )
                with urllib.request.urlopen(req, timeout=300) as response:
                    total_size = int(response.headers.get('content-length', 0))
                    print(f"Downloading {total_size / (1024*1024):.1f} MB...")
                    downloaded = 0
                    next_report = 10  # Next percentage at which progress is printed
                    with open(partial_path, 'wb') as f:
                        while True:
                            chunk = response.read(chunk_size)
                            if not chunk:
                                break
                            f.write(chunk)
                            downloaded += len(chunk)
                            if total_size > 0:
                                percent = (downloaded / total_size) * 100
                                if percent >= next_report:
                                    print(f"Progress: {percent:.0f}%")
                                    next_report = (int(percent) // 10 + 1) * 10

                size = os.path.getsize(partial_path)
                if size > min_valid_size:
                    os.replace(partial_path, model_path)
                    print(f"✅ Successfully downloaded model from {source['name']} ({size / (1024*1024):.1f} MB)")
                    return
                print(f"❌ Downloaded file seems incomplete (size: {size} bytes)")
            except Exception as e:
                print(f"❌ Failed to download from {source['name']}: {str(e)}")
            finally:
                # Also runs on KeyboardInterrupt, so no partial file survives.
                if os.path.exists(partial_path):
                    os.remove(partial_path)

        raise RuntimeError(
            "❌ Failed to download inswapper_128.onnx from all sources.\n\n"
            "Please upload the model file manually:\n"
            "1. Download from: https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx\n"
            "2. Upload to your Space via the Files tab\n"
            "3. Name it: inswapper_128.onnx"
        )

    def __init_apps(self):
        """Create the detector, recognizer and swapper ONNX sessions."""
        assets_dir = ensure_available('models', 'buffalo_l', root='~/.insightface')

        model_path = os.path.join(assets_dir, 'det_10g.onnx')
        sess_face = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.face_detector = SCRFD(model_path, sess_face)
        self.face_detector.prepare(0, input_size=(640, 640))

        model_path = os.path.join(assets_dir, 'w600k_r50.onnx')
        sess_rec = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.rec_app = ArcFaceONNX(model_path, sess_rec)
        self.rec_app.prepare(0)

        model_path = 'inswapper_128.onnx'
        # Download model if it doesn't exist
        self.__download_model(model_path)
        sess_swap = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
        self.face_swapper = INSwapper(model_path, sess_swap)

    def prepare_faces(self, faces):
        """Build ``replacement_faces`` from the user-supplied face mappings.

        Args:
            faces: Iterable of dicts. Each needs a ``'destination'`` image; a
                dict that also has ``'origin'`` (with ``'threshold'``) selects
                which face to replace, otherwise the first detected face is
                replaced.

        Raises:
            ValueError: When no face is found in an origin or destination image.
        """
        self.replacement_faces = []
        # Reset so a previous simple-mode call cannot leak into this one.
        self.first_face = False
        for face in faces:
            if "origin" in face:
                face_threshold = face['threshold']
                bboxes1, kpss1 = self.face_detector.autodetect(face['origin'], max_num=1)
                if len(kpss1) < 1:
                    raise ValueError('No face detected on "Face to replace" image')
                feat_original = self.rec_app.get(face['origin'], kpss1[0])
                print("✅ Prepared SPECIFIC mode face swap (will match target face)")
            else:
                face_threshold = 0
                self.first_face = True
                feat_original = None
                print('✅ Prepared SIMPLE mode face swap (will replace first face)')
            _faces = self.__get_faces(face['destination'], max_num=1)
            if len(_faces) < 1:
                raise ValueError('No face detected on "Destination face" image')
            self.replacement_faces.append((feat_original, _faces[0], face_threshold))

    def __convert_video(self, video_path, output_video_path):
        """Mux the source audio into the refaced video when there is any.

        Returns:
            Path of the final video: a new file when audio was merged,
            otherwise ``output_video_path`` unchanged.
        """
        if self.video_has_audio:
            print("Merging audio with the refaced video...")
            new_path = output_video_path + str(random.randint(0, 999)) + "_c.mp4"
            in1 = ffmpeg.input(output_video_path)
            in2 = ffmpeg.input(video_path)
            out = ffmpeg.output(
                in1.video,
                in2.audio,
                new_path,
                video_bitrate=self.ffmpeg_video_bitrate,
                vcodec=self.ffmpeg_video_encoder,
            )
            out.run(overwrite_output=True, quiet=True)
        else:
            new_path = output_video_path
            print("The video doesn't have audio, so post-processing is not necessary")
        print(f"The process has finished.\nThe refaced video can be found at {os.path.abspath(new_path)}")
        return new_path

    def __get_faces(self, frame, max_num=0):
        """Detect faces in ``frame`` and attach a recognition embedding to each.

        Returns:
            A list of ``Face`` objects; empty when nothing is detected.
        """
        bboxes, kpss = self.face_detector.detect(frame, max_num=max_num, metric='default')
        if bboxes.shape[0] == 0:
            return []
        ret = []
        for i in range(bboxes.shape[0]):
            kps = kpss[i] if kpss is not None else None
            face = Face(bbox=bboxes[i, 0:4], kps=kps, det_score=bboxes[i, 4])
            face.embedding = self.rec_app.get(frame, kps)
            ret.append(face)
        return ret

    def __compute_iou(self, box1, box2):
        """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes."""
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        intersection = max(0, x2 - x1) * max(0, y2 - y1)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union = area1 + area2 - intersection
        return intersection / union if union > 0 else 0

    @staticmethod
    def __clip_bbox(bbox, image):
        """Clamp ``bbox`` to ``image`` and return int ``(x1, y1, x2, y2)``, or None if empty."""
        x1, y1, x2, y2 = map(int, bbox)
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2)
        if x2 <= x1 or y2 <= y1:
            return None
        return x1, y1, x2, y2

    def __enhance_face_gfpgan(self, swapped_face, bbox):
        """Run GFPGAN on the ``bbox`` region of ``swapped_face``.

        Returns the input unchanged when enhancement is disabled, the box is
        empty, GFPGAN returns nothing, or any step fails.
        """
        if not self.enable_face_enhancement or self.face_enhancer is None:
            return swapped_face
        try:
            clipped = self.__clip_bbox(bbox, swapped_face)
            if clipped is None:
                return swapped_face
            x1, y1, x2, y2 = clipped
            face_region = swapped_face[y1:y2, x1:x2].copy()
            _, _, enhanced_face = self.face_enhancer.enhance(
                face_region,
                has_aligned=False,
                only_center_face=True,
                paste_back=True,
            )
            if enhanced_face is None:
                return swapped_face
            if enhanced_face.shape[:2] != face_region.shape[:2]:
                # Bring a differently sized result back to the region size.
                enhanced_face = cv2.resize(enhanced_face, (x2 - x1, y2 - y1))
            result = swapped_face.copy()
            result[y1:y2, x1:x2] = enhanced_face
            return result
        except Exception as e:
            print(f"GFPGAN enhancement failed: {e}")
            return swapped_face

    def __color_correct_face(self, swapped_face, target_face, bbox):
        """Nudge the colour statistics of the swapped region towards the target.

        For each of the first three channels the region is mean/std matched
        to the same region of ``target_face`` (std gain capped at 1.5) and
        then averaged 50/50 with the unmodified region, so identical
        statistics leave the pixels unchanged. Channels whose standard
        deviation is 1 or less are left alone.

        Returns:
            A corrected copy, or ``swapped_face`` itself when the box is
            empty or the correction fails.
        """
        try:
            clipped = self.__clip_bbox(bbox, swapped_face)
            if clipped is None:
                return swapped_face
            x1, y1, x2, y2 = clipped
            swapped_region = swapped_face[y1:y2, x1:x2].astype(np.float64)
            target_region = target_face[y1:y2, x1:x2]
            if swapped_region.size == 0 or target_region.size == 0:
                return swapped_face

            result = swapped_face.copy()
            for channel in range(3):
                src = swapped_region[:, :, channel]
                ref = target_region[:, :, channel]
                src_mean, src_std = src.mean(), src.std()
                if src_std <= 1:  # Too flat to rescale safely
                    continue
                factor = min(ref.std() / src_std, 1.5)  # Limit adjustment
                matched = (src - src_mean) * factor + ref.mean()
                blended = 0.5 * src + 0.5 * matched
                result[y1:y2, x1:x2, channel] = np.clip(np.rint(blended), 0, 255).astype(np.uint8)
            return result
        except Exception as e:
            print(f"Color correction failed: {e}")
            return swapped_face

    def __seamless_blend(self, swapped_face, target_face, bbox):
        """Blend ``swapped_face`` into ``target_face`` around ``bbox``.

        Tries ``cv2.seamlessClone`` with a blurred elliptical mask and falls
        back to alpha blending with the same mask if that fails. Returns
        ``swapped_face`` unchanged when the box is empty or blending fails.
        """
        try:
            clipped = self.__clip_bbox(bbox, swapped_face)
            if clipped is None:
                return swapped_face
            x1, y1, x2, y2 = clipped
            center = ((x1 + x2) // 2, (y1 + y2) // 2)
            mask = np.zeros(target_face.shape[:2], dtype=np.uint8)
            cv2.ellipse(mask, center, ((x2 - x1) // 2, (y2 - y1) // 2), 0, 0, 360, 255, -1)
            mask = cv2.GaussianBlur(mask, (15, 15), 0)  # Softer edges
            try:
                return cv2.seamlessClone(swapped_face, target_face, mask, center, cv2.NORMAL_CLONE)
            except Exception:
                # Fallback to alpha blending if seamless clone fails
                mask_3channel = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) / 255.0
                return (swapped_face * mask_3channel + target_face * (1 - mask_3channel)).astype(np.uint8)
        except Exception as e:
            print(f"Seamless blending failed: {e}")
            return swapped_face

    def __temporal_smooth(self, current_frame):
        """Blend ``current_frame`` with the previous blended frame.

        The first frame after a reset seeds the history and is returned
        untouched. When blending is disabled the history is cleared so a
        later re-enable cannot blend with a stale frame.
        """
        if not self.enable_temporal_blend:
            self.prev_blended_frame = None
            return current_frame
        if self.prev_blended_frame is None:
            self.prev_blended_frame = current_frame.copy()
            return current_frame
        try:
            alpha = self.temporal_blend_alpha
            smoothed = cv2.addWeighted(current_frame, 1 - alpha, self.prev_blended_frame, alpha, 0)
            self.prev_blended_frame = smoothed.copy()
            return smoothed
        except Exception:
            # Re-seed the history from this frame if the blend fails.
            self.prev_blended_frame = current_frame.copy()
            return current_frame

    def __enhance_quality(self, swapped_frame, original_frame, bbox):
        """Apply the enabled post-processing steps to a swapped frame.

        Order: GFPGAN enhancement, colour correction, a light whole-frame
        sharpen, then temporal smoothing. A failing step is logged and skipped.
        """
        result = swapped_frame.copy()

        if self.enable_face_enhancement:
            try:
                result = self.__enhance_face_gfpgan(result, bbox)
            except Exception as e:
                print(f"Skipping GFPGAN enhancement: {e}")

        if self.enable_color_correction:
            try:
                result = self.__color_correct_face(result, original_frame, bbox)
            except Exception as e:
                print(f"Skipping color correction: {e}")

        # Seamless blending is skipped - INSwapper already handles this.

        try:
            kernel = np.array([[0, -0.25, 0],
                               [-0.25, 2, -0.25],
                               [0, -0.25, 0]])
            sharpened = cv2.filter2D(result, -1, kernel)
            # Blend 30% sharpened with 70% original
            result = cv2.addWeighted(result, 0.7, sharpened, 0.3, 0)
        except Exception as e:
            print(f"Skipping sharpening: {e}")

        try:
            result = self.__temporal_smooth(result)
        except Exception as e:
            print(f"Skipping temporal smoothing: {e}")

        return result

    def process_first_face(self, frame):
        """Swap the first detected face with ``replacement_faces[0]``.

        While the face is missing for at most ``occlusion_tolerance`` frames,
        the last swapped frame is returned once more than three swaps have
        accumulated; otherwise the input frame is returned as is.
        """
        faces = self.__get_faces(frame, max_num=1)

        if len(faces) == 0:
            memory = self.face_memory.get('first_face')
            if memory is None:
                return frame
            memory['frames_missing'] += 1
            if memory['frames_missing'] > self.occlusion_tolerance:
                # Face gone too long, reset
                del self.face_memory['first_face']
                self.last_swapped_frame = None
                self.stable_swap_count = 0
                return frame
            if self.last_swapped_frame is not None and self.stable_swap_count > 3:
                return self.last_swapped_frame.copy()
            return frame

        face = faces[0]
        tracked = False
        if 'first_face' in self.face_memory and len(self.prev_faces) > 0:
            iou = self.__compute_iou(face.bbox, self.prev_faces[0].bbox)
            tracked = iou > self.face_tracking_threshold

        # Swap on good confidence, or lower confidence backed by tracking/history.
        should_swap = (
            face.det_score > 0.5
            or (tracked and face.det_score > 0.3)
            or (self.stable_swap_count > 5 and face.det_score > 0.25)
        )

        if not should_swap:
            if self.last_swapped_frame is not None and self.stable_swap_count > 3:
                return self.last_swapped_frame.copy()
            # Keep tracking the face but leave the frame untouched.
            self.prev_faces = [face]
            return frame

        swapped_frame = self.face_swapper.get(frame.copy(), face, self.replacement_faces[0][1], paste_back=True)
        swapped_frame = self.__enhance_quality(swapped_frame, frame, face.bbox)
        self.last_swapped_frame = swapped_frame.copy()
        self.stable_swap_count += 1
        self.face_memory['first_face'] = {
            'bbox': face.bbox,
            'confidence': face.det_score,
            'frames_missing': 0,
        }
        self.prev_faces = [face]
        return swapped_frame

    def process_faces(self, frame):
        """Swap every face in ``frame`` that matches a prepared replacement.

        Each detection is paired once with its best IoU match from the
        previous frame. A detection is removed together with its tracking
        info when it is swapped, so later replacement faces never see
        mismatched data. The detections of this frame become ``prev_faces``.
        """
        faces = self.__get_faces(frame, max_num=0)

        if len(faces) == 0:
            if self.last_swapped_frame is not None and self.stable_swap_count > 5:
                return self.last_swapped_frame.copy()
            return frame

        # (face, best previous-frame match or None, IoU with that match)
        candidates = []
        for face in faces:
            best_match, best_iou = None, 0
            for prev_face in self.prev_faces:
                iou = self.__compute_iou(face.bbox, prev_face.bbox)
                if iou > best_iou and iou > self.face_tracking_threshold:
                    best_iou, best_match = iou, prev_face
            candidates.append((face, best_match, best_iou))

        swapped = False
        for rep_face in self.replacement_faces:
            for i in range(len(candidates) - 1, -1, -1):
                current_face, prev_match, iou_score = candidates[i]

                # SIMPLE MODE: no origin face, so only the first detection is swapped.
                if rep_face[0] is None:
                    if i != 0:
                        continue
                    print(f"🔄 SIMPLE MODE: Swapping first detected face (confidence: {current_face.det_score:.2f})")
                    temp_frame = self.face_swapper.get(frame.copy(), current_face, rep_face[1], paste_back=True)
                    frame = self.__enhance_quality(temp_frame, frame, current_face.bbox)
                    swapped = True
                    del candidates[i]
                    break

                # SPECIFIC MODE: relax the similarity threshold for tracked faces.
                effective_threshold = rep_face[2]
                if prev_match is not None:
                    if iou_score > 0.3:
                        effective_threshold = max(0, rep_face[2] - 0.25)
                    elif iou_score > 0.1:
                        effective_threshold = max(0, rep_face[2] - 0.15)

                min_confidence = 0.5
                if prev_match is not None and iou_score > 0.2:
                    min_confidence = 0.25
                elif self.stable_swap_count > 5:
                    min_confidence = 0.35
                if current_face.det_score < min_confidence:
                    continue

                sim = self.rec_app.compute_sim(rep_face[0], current_face.embedding)
                if sim >= effective_threshold:
                    temp_frame = self.face_swapper.get(frame.copy(), current_face, rep_face[1], paste_back=True)
                    frame = self.__enhance_quality(temp_frame, frame, current_face.bbox)
                    swapped = True
                    del candidates[i]
                    break

        if swapped:
            self.last_swapped_frame = frame.copy()
            self.stable_swap_count += 1
        else:
            self.stable_swap_count = max(0, self.stable_swap_count - 1)

        # Reuse this frame's detections instead of detecting again on the output.
        self.prev_faces = faces
        return frame

    def __check_video_has_audio(self, video_path):
        """Set ``video_has_audio`` from the streams ffprobe reports for ``video_path``."""
        probe = ffmpeg.probe(video_path)
        self.video_has_audio = any(
            stream.get('codec_type') == 'audio' for stream in probe['streams']
        )

    def reface_group(self, faces, frames, output):
        """Process ``frames`` in order and write each result to ``output``.

        Frames are handled sequentially because the tracking state depends on
        frame order. ``faces`` is accepted for interface compatibility; the
        prepared ``replacement_faces`` are what is actually used.
        """
        process_func = self.process_first_face if self.first_face else self.process_faces
        for frame in tqdm(frames, desc="Processing frames"):
            output.write(process_func(frame))

    def reface(self, video_path, faces):
        """Reface ``video_path`` and return the path of the resulting video.

        The result is written under ``out/`` (created if needed), in batches
        of about 1000 frames to bound memory use. The capture and the writer
        are released even if processing fails.
        """
        self.__check_video_has_audio(video_path)
        os.makedirs('out', exist_ok=True)
        output_video_path = os.path.join('out', Path(video_path).name)
        self.prepare_faces(faces)

        # Reset all temporal tracking for new video
        self.prev_faces = []
        self.face_memory = {}
        self.last_swapped_frame = None
        self.stable_swap_count = 0
        self.prev_blended_frame = None

        cap = cv2.VideoCapture(video_path)
        output = None
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print(f"Total frames: {total_frames}")
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            output = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

            frames = []
            self.k = 1
            with tqdm(total=total_frames, desc="Extracting frames") as pbar:
                while cap.isOpened():
                    flag, frame = cap.read()
                    if not (flag and len(frame) > 0):
                        break
                    frames.append(frame.copy())
                    pbar.update()
                    if len(frames) > 1000:
                        self.reface_group(faces, frames, output)
                        frames = []
            self.reface_group(faces, frames, output)
        finally:
            cap.release()
            if output is not None:
                output.release()

        return self.__convert_video(video_path, output_video_path)

    def __try_ffmpeg_encoder(self, vcodec):
        """Return True when ffmpeg can encode a one-second test clip with ``vcodec``.

        The clip goes to ffmpeg's null muxer, so nothing is left on disk.
        A missing ffmpeg binary counts as "doesn't work".
        """
        print(f"Trying FFMPEG {vcodec} encoder")
        command = [
            'ffmpeg', '-y',
            '-f', 'lavfi', '-i', 'testsrc=duration=1:size=1280x720:rate=30',
            '-vcodec', vcodec,
            '-f', 'null', '-',
        ]
        try:
            subprocess.run(command, check=True, capture_output=True)
        except (subprocess.CalledProcessError, OSError):
            print(f"FFMPEG {vcodec} encoder doesn't work -> Disabled.")
            return False
        print(f"FFMPEG {vcodec} encoder works")
        return True

    def __check_encoders(self):
        """Pick the ffmpeg H.264 encoder used when muxing audio.

        Defaults to ``libx264``. The first entry of ``VIDEO_CODECS`` that
        ffmpeg lists and that passes ``__try_ffmpeg_encoder`` is selected.
        If ffmpeg cannot be queried the defaults are kept.
        """
        self.ffmpeg_video_encoder = 'libx264'
        self.ffmpeg_video_bitrate = '0'

        pattern = r"encoders: ([a-zA-Z0-9_]+(?: [a-zA-Z0-9_]+)*)"
        command = ['ffmpeg', '-codecs', '--list-encoders']
        try:
            commandout = subprocess.run(command, check=True, capture_output=True).stdout
        except (OSError, subprocess.CalledProcessError) as e:
            print(f"Could not query FFMPEG encoders ({e}); keeping {self.ffmpeg_video_encoder}")
            return

        for line in commandout.decode('utf-8', errors='replace').split('\n'):
            if "264" not in line:
                continue
            match = re.search(pattern, line)
            if match is None:
                # A "264" line without an encoder list has nothing to offer.
                continue
            encoders = match.group(1).split(' ')
            for codec, bitrate in Refacer.VIDEO_CODECS.items():
                if codec in encoders and self.__try_ffmpeg_encoder(codec):
                    self.ffmpeg_video_encoder = codec
                    self.ffmpeg_video_bitrate = bitrate
                    print(f"Video codec for FFMPEG: {self.ffmpeg_video_encoder}")
                    return