| import cv2 |
| import onnxruntime as rt |
| import sys |
| from insightface.app import FaceAnalysis |
| sys.path.insert(1, './recognition') |
| from scrfd import SCRFD |
| from arcface_onnx import ArcFaceONNX |
| import os.path as osp |
| import os |
| from pathlib import Path |
| from tqdm import tqdm |
| import ffmpeg |
| import random |
| import multiprocessing as mp |
| from concurrent.futures import ThreadPoolExecutor |
| from insightface.model_zoo.inswapper import INSwapper |
| import psutil |
| from enum import Enum |
| from insightface.app.common import Face |
| from insightface.utils.storage import ensure_available |
| import re |
| import subprocess |
|
|
| class RefacerMode(Enum): |
| CPU, CUDA, COREML, TENSORRT = range(1, 5) |
|
|
| class Refacer: |
| def __init__(self,force_cpu=False,colab_performance=False): |
| self.first_face = False |
| self.force_cpu = force_cpu |
| self.colab_performance = colab_performance |
| self.__check_encoders() |
| self.__check_providers() |
| self.total_mem = psutil.virtual_memory().total |
| self.__init_apps() |
|
|
| def __check_providers(self): |
| if self.force_cpu : |
| self.providers = ['CPUExecutionProvider'] |
| else: |
| self.providers = rt.get_available_providers() |
| rt.set_default_logger_severity(4) |
| self.sess_options = rt.SessionOptions() |
| self.sess_options.execution_mode = rt.ExecutionMode.ORT_SEQUENTIAL |
| self.sess_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL |
|
|
| if len(self.providers) == 1 and 'CPUExecutionProvider' in self.providers: |
| self.mode = RefacerMode.CPU |
| self.use_num_cpus = mp.cpu_count()-1 |
| self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3) |
| print(f"CPU mode with providers {self.providers}") |
| elif self.colab_performance: |
| self.mode = RefacerMode.TENSORRT |
| self.use_num_cpus = mp.cpu_count()-1 |
| self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3) |
| print(f"TENSORRT mode with providers {self.providers}") |
| elif 'CoreMLExecutionProvider' in self.providers: |
| self.mode = RefacerMode.COREML |
| self.use_num_cpus = mp.cpu_count()-1 |
| self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3) |
| print(f"CoreML mode with providers {self.providers}") |
| elif 'CUDAExecutionProvider' in self.providers: |
| self.mode = RefacerMode.CUDA |
| self.use_num_cpus = 2 |
| self.sess_options.intra_op_num_threads = 1 |
| if 'TensorrtExecutionProvider' in self.providers: |
| self.providers.remove('TensorrtExecutionProvider') |
| print(f"CUDA mode with providers {self.providers}") |
|
|
| def __init_apps(self): |
| assets_dir = ensure_available('models', 'buffalo_l', root='~/.insightface') |
|
|
| model_path = os.path.join(assets_dir, 'det_10g.onnx') |
| sess_face = rt.InferenceSession(model_path, self.sess_options, providers=self.providers) |
| self.face_detector = SCRFD(model_path,sess_face) |
| self.face_detector.prepare(0,input_size=(640, 640)) |
|
|
| model_path = os.path.join(assets_dir , 'w600k_r50.onnx') |
| sess_rec = rt.InferenceSession(model_path, self.sess_options, providers=self.providers) |
| self.rec_app = ArcFaceONNX(model_path,sess_rec) |
| self.rec_app.prepare(0) |
|
|
| model_path = 'inswapper_128.onnx' |
| sess_swap = rt.InferenceSession(model_path, self.sess_options, providers=self.providers) |
| self.face_swapper = INSwapper(model_path,sess_swap) |
|
|
| def prepare_faces(self, faces): |
| self.replacement_faces=[] |
| for face in faces: |
| if "origin" in face: |
| face_threshold = face['threshold'] |
| bboxes1, kpss1 = self.face_detector.autodetect(face['origin'], max_num=1) |
| if len(kpss1)<1: |
| raise Exception('No face detected on "Face to replace" image') |
| feat_original = self.rec_app.get(face['origin'], kpss1[0]) |
| else: |
| face_threshold = 0 |
| self.first_face = True |
| feat_original = None |
| print('No origin image: First face change') |
| _faces = self.__get_faces(face['destination'],max_num=1) |
| if len(_faces)<1: |
| raise Exception('No face detected on "Destination face" image') |
| self.replacement_faces.append((feat_original,_faces[0],face_threshold)) |
|
|
| def __convert_video(self,video_path,output_video_path): |
| if self.video_has_audio: |
| print("Merging audio with the refaced video...") |
| new_path = output_video_path + str(random.randint(0,999)) + "_c.mp4" |
| in1 = ffmpeg.input(output_video_path) |
| in2 = ffmpeg.input(video_path) |
| out = ffmpeg.output(in1.video, in2.audio, new_path,video_bitrate=self.ffmpeg_video_bitrate,vcodec=self.ffmpeg_video_encoder) |
| out.run(overwrite_output=True,quiet=True) |
| else: |
| new_path = output_video_path |
| print("The video doesn't have audio, so post-processing is not necessary") |
| |
| print(f"The process has finished.\nThe refaced video can be found at {os.path.abspath(new_path)}") |
| return new_path |
|
|
| def __get_faces(self,frame,max_num=0): |
| bboxes, kpss = self.face_detector.detect(frame,max_num=max_num,metric='default') |
|
|
| if bboxes.shape[0] == 0: |
| return [] |
| ret = [] |
| for i in range(bboxes.shape[0]): |
| bbox = bboxes[i, 0:4] |
| det_score = bboxes[i, 4] |
| kps = None |
| if kpss is not None: |
| kps = kpss[i] |
| face = Face(bbox=bbox, kps=kps, det_score=det_score) |
| face.embedding = self.rec_app.get(frame, kps) |
| ret.append(face) |
| return ret |
|
|
| def process_first_face(self,frame): |
| faces = self.__get_faces(frame,max_num=1) |
| if len(faces) != 0: |
| frame = self.face_swapper.get(frame, faces[0], self.replacement_faces[0][1], paste_back=True) |
| return frame |
|
|
| def process_faces(self,frame): |
| faces = self.__get_faces(frame,max_num=0) |
| for rep_face in self.replacement_faces: |
| for i in range(len(faces) - 1, -1, -1): |
| sim = self.rec_app.compute_sim(rep_face[0], faces[i].embedding) |
| if sim>=rep_face[2]: |
| frame = self.face_swapper.get(frame, faces[i], rep_face[1], paste_back=True) |
| del faces[i] |
| break |
| return frame |
|
|
| def __check_video_has_audio(self,video_path): |
| self.video_has_audio = False |
| probe = ffmpeg.probe(video_path) |
| audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None) |
| if audio_stream is not None: |
| self.video_has_audio = True |
| |
| def reface_group(self, faces, frames, output): |
| with ThreadPoolExecutor(max_workers = self.use_num_cpus) as executor: |
| if self.first_face: |
| results = list(tqdm(executor.map(self.process_first_face, frames), total=len(frames),desc="Processing frames")) |
| else: |
| results = list(tqdm(executor.map(self.process_faces, frames), total=len(frames),desc="Processing frames")) |
| for result in results: |
| output.write(result) |
|
|
| def reface(self, video_path, faces): |
| self.__check_video_has_audio(video_path) |
| output_video_path = os.path.join('out',Path(video_path).name) |
| self.prepare_faces(faces) |
|
|
| cap = cv2.VideoCapture(video_path) |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| print(f"Total frames: {total_frames}") |
| |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
| output = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height)) |
| |
| frames=[] |
| self.k = 1 |
| with tqdm(total=total_frames,desc="Extracting frames") as pbar: |
| while cap.isOpened(): |
| flag, frame = cap.read() |
| if flag and len(frame)>0: |
| frames.append(frame.copy()) |
| pbar.update() |
| else: |
| break |
| if (len(frames) > 1000): |
| self.reface_group(faces,frames,output) |
| frames=[] |
|
|
| cap.release() |
| pbar.close() |
|
|
| self.reface_group(faces,frames,output) |
| frames=[] |
| output.release() |
| |
| return self.__convert_video(video_path,output_video_path) |
| |
| def __try_ffmpeg_encoder(self, vcodec): |
| print(f"Trying FFMPEG {vcodec} encoder") |
| command = ['ffmpeg', '-y', '-f','lavfi','-i','testsrc=duration=1:size=1280x720:rate=30','-vcodec',vcodec,'testsrc.mp4'] |
| try: |
| subprocess.run(command, check=True, capture_output=True).stderr |
| except subprocess.CalledProcessError as e: |
| print(f"FFMPEG {vcodec} encoder doesn't work -> Disabled.") |
| return False |
| print(f"FFMPEG {vcodec} encoder works") |
| return True |
| |
| def __check_encoders(self): |
| self.ffmpeg_video_encoder='libx264' |
| self.ffmpeg_video_bitrate='0' |
|
|
| if self.__try_ffmpeg_encoder('libx265'): |
| self.ffmpeg_video_encoder = 'libx265' |
| elif self.__try_ffmpeg_encoder('libvpx-vp9'): |
| self.ffmpeg_video_encoder = 'libvpx-vp9' |
|
|