| import os
|
| import cv2
|
| import numpy as np
|
| import psutil
|
|
|
| from roop.ProcessOptions import ProcessOptions
|
|
|
| from roop.face_util import get_first_face, get_all_faces, rotate_anticlockwise, rotate_clockwise, clamp_cut_values
|
| from roop.utilities import compute_cosine_distance, get_device, str_to_class, shuffle_array
|
| import roop.vr_util as vr
|
|
|
| from typing import Any, List, Callable
|
| from roop.typing import Frame, Face
|
| from concurrent.futures import ThreadPoolExecutor, as_completed
|
| from threading import Thread, Lock
|
| from queue import Queue
|
| from tqdm import tqdm
|
| from roop.ffmpeg_writer import FFMPEG_VideoWriter
|
| from roop.StreamWriter import StreamWriter
|
| import roop.globals
|
|
|
|
|
|
|
|
|
| class eNoFaceAction():
|
| USE_ORIGINAL_FRAME = 0
|
| RETRY_ROTATED = 1
|
| SKIP_FRAME = 2
|
| SKIP_FRAME_IF_DISSIMILAR = 3,
|
| USE_LAST_SWAPPED = 4
|
|
|
|
|
|
|
| def create_queue(temp_frame_paths: List[str]) -> Queue[str]:
|
| queue: Queue[str] = Queue()
|
| for frame_path in temp_frame_paths:
|
| queue.put(frame_path)
|
| return queue
|
|
|
|
|
| def pick_queue(queue: Queue[str], queue_per_future: int) -> List[str]:
|
| queues = []
|
| for _ in range(queue_per_future):
|
| if not queue.empty():
|
| queues.append(queue.get())
|
| return queues
|
|
|
|
|
|
|
| class ProcessMgr():
|
| input_face_datas = []
|
| target_face_datas = []
|
|
|
| imagemask = None
|
|
|
| processors = []
|
| options : ProcessOptions = None
|
|
|
| num_threads = 1
|
| current_index = 0
|
| processing_threads = 1
|
| buffer_wait_time = 0.1
|
|
|
| lock = Lock()
|
|
|
| frames_queue = None
|
| processed_queue = None
|
|
|
| videowriter= None
|
| streamwriter = None
|
|
|
| progress_gradio = None
|
| total_frames = 0
|
|
|
| num_frames_no_face = 0
|
| last_swapped_frame = None
|
|
|
| output_to_file = None
|
| output_to_cam = None
|
|
|
|
|
| plugins = {
|
| 'faceswap' : 'FaceSwapInsightFace',
|
| 'mask_clip2seg' : 'Mask_Clip2Seg',
|
| 'mask_xseg' : 'Mask_XSeg',
|
| 'codeformer' : 'Enhance_CodeFormer',
|
| 'gfpgan' : 'Enhance_GFPGAN',
|
| 'dmdnet' : 'Enhance_DMDNet',
|
| 'gpen' : 'Enhance_GPEN',
|
| 'restoreformer++' : 'Enhance_RestoreFormerPPlus',
|
| 'colorizer' : 'Frame_Colorizer',
|
| 'filter_generic' : 'Frame_Filter',
|
| 'removebg' : 'Frame_Masking',
|
| 'upscale' : 'Frame_Upscale'
|
| }
|
|
|
| def __init__(self, progress):
|
| if progress is not None:
|
| self.progress_gradio = progress
|
|
|
| def reuseOldProcessor(self, name:str):
|
| for p in self.processors:
|
| if p.processorname == name:
|
| return p
|
|
|
| return None
|
|
|
|
|
| def initialize(self, input_faces, target_faces, options):
|
| self.input_face_datas = input_faces
|
| self.target_face_datas = target_faces
|
| self.num_frames_no_face = 0
|
| self.last_swapped_frame = None
|
| self.options = options
|
| devicename = get_device()
|
|
|
| roop.globals.g_desired_face_analysis=["landmark_3d_68", "landmark_2d_106","detection","recognition"]
|
| if options.swap_mode == "all_female" or options.swap_mode == "all_male":
|
| roop.globals.g_desired_face_analysis.append("genderage")
|
| elif options.swap_mode == "all_random":
|
|
|
| self.input_face_datas = input_faces.copy()
|
| shuffle_array(self.input_face_datas)
|
|
|
|
|
| for p in self.processors:
|
| newp = next((x for x in options.processors.keys() if x == p.processorname), None)
|
| if newp is None:
|
| p.Release()
|
| del p
|
|
|
| newprocessors = []
|
| for key, extoption in options.processors.items():
|
| p = self.reuseOldProcessor(key)
|
| if p is None:
|
| classname = self.plugins[key]
|
| module = 'roop.processors.' + classname
|
| p = str_to_class(module, classname)
|
| if p is not None:
|
| extoption.update({"devicename": devicename})
|
| if p.type == "swap":
|
| if self.options.swap_modelname == "InSwapper 128":
|
| extoption.update({"modelname": "inswapper_128.onnx"})
|
| elif self.options.swap_modelname == "ReSwapper 128":
|
| extoption.update({"modelname": "reswapper_128.onnx"})
|
| elif self.options.swap_modelname == "ReSwapper 256":
|
| extoption.update({"modelname": "reswapper_256.onnx"})
|
|
|
| p.Initialize(extoption)
|
| newprocessors.append(p)
|
| else:
|
| print(f"Not using {module}")
|
| self.processors = newprocessors
|
|
|
|
|
|
|
| if isinstance(self.options.imagemask, dict) and self.options.imagemask.get("layers") and len(self.options.imagemask["layers"]) > 0:
|
| self.options.imagemask = self.options.imagemask.get("layers")[0]
|
|
|
| self.options.imagemask = cv2.cvtColor(self.options.imagemask, cv2.COLOR_RGBA2GRAY)
|
| if np.any(self.options.imagemask):
|
| mo = self.input_face_datas[0].faces[0].mask_offsets
|
| self.options.imagemask = self.blur_area(self.options.imagemask, mo[4], mo[5])
|
| self.options.imagemask = self.options.imagemask.astype(np.float32) / 255
|
| self.options.imagemask = cv2.cvtColor(self.options.imagemask, cv2.COLOR_GRAY2RGB)
|
| else:
|
| self.options.imagemask = None
|
|
|
| self.options.frame_processing = False
|
| for p in self.processors:
|
| if p.type.startswith("frame_"):
|
| self.options.frame_processing = True
|
|
|
|
|
|
|
|
|
|
|
|
|
| def run_batch(self, source_files, target_files, threads:int = 1):
|
| progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
|
| self.total_frames = len(source_files)
|
| self.num_threads = threads
|
| with tqdm(total=self.total_frames, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
|
| with ThreadPoolExecutor(max_workers=threads) as executor:
|
| futures = []
|
| queue = create_queue(source_files)
|
| queue_per_future = max(len(source_files) // threads, 1)
|
| while not queue.empty():
|
| future = executor.submit(self.process_frames, source_files, target_files, pick_queue(queue, queue_per_future), lambda: self.update_progress(progress))
|
| futures.append(future)
|
| for future in as_completed(futures):
|
| future.result()
|
|
|
|
|
| def process_frames(self, source_files: List[str], target_files: List[str], current_files, update: Callable[[], None]) -> None:
|
| for f in current_files:
|
| if not roop.globals.processing:
|
| return
|
|
|
|
|
| temp_frame = cv2.imdecode(np.fromfile(f, dtype=np.uint8), cv2.IMREAD_COLOR)
|
| if temp_frame is not None:
|
| if self.options.frame_processing:
|
| for p in self.processors:
|
| frame = p.Run(temp_frame)
|
| resimg = frame
|
| else:
|
| resimg = self.process_frame(temp_frame)
|
| if resimg is not None:
|
| i = source_files.index(f)
|
|
|
| cv2.imencode(f'.{roop.globals.CFG.output_image_format}',resimg)[1].tofile(target_files[i])
|
| if update:
|
| update()
|
|
|
|
|
|
|
| def read_frames_thread(self, cap, frame_start, frame_end, num_threads):
|
| num_frame = 0
|
| total_num = frame_end - frame_start
|
| if frame_start > 0:
|
| cap.set(cv2.CAP_PROP_POS_FRAMES,frame_start)
|
|
|
| while True and roop.globals.processing:
|
| ret, frame = cap.read()
|
| if not ret:
|
| break
|
|
|
| self.frames_queue[num_frame % num_threads].put(frame, block=True)
|
| num_frame += 1
|
| if num_frame == total_num:
|
| break
|
|
|
| for i in range(num_threads):
|
| self.frames_queue[i].put(None)
|
|
|
|
|
|
|
| def process_videoframes(self, threadindex, progress) -> None:
|
| while True:
|
| frame = self.frames_queue[threadindex].get()
|
| if frame is None:
|
| self.processing_threads -= 1
|
| self.processed_queue[threadindex].put((False, None))
|
| return
|
| else:
|
| if self.options.frame_processing:
|
| for p in self.processors:
|
| frame = p.Run(frame)
|
| resimg = frame
|
| else:
|
| resimg = self.process_frame(frame)
|
| self.processed_queue[threadindex].put((True, resimg))
|
| del frame
|
| progress()
|
|
|
|
|
| def write_frames_thread(self):
|
| nextindex = 0
|
| num_producers = self.num_threads
|
|
|
| while True:
|
| process, frame = self.processed_queue[nextindex % self.num_threads].get()
|
| nextindex += 1
|
| if frame is not None:
|
| if self.output_to_file:
|
| self.videowriter.write_frame(frame)
|
| if self.output_to_cam:
|
| self.streamwriter.WriteToStream(frame)
|
| del frame
|
| elif process == False:
|
| num_producers -= 1
|
| if num_producers < 1:
|
| return
|
|
|
|
|
|
|
| def run_batch_inmem(self, output_method, source_video, target_video, frame_start, frame_end, fps, threads:int = 1):
|
| if len(self.processors) < 1:
|
| print("No processor defined!")
|
| return
|
|
|
| cap = cv2.VideoCapture(source_video)
|
|
|
| frame_count = (frame_end - frame_start) + 1
|
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
|
| processed_resolution = None
|
| for p in self.processors:
|
| if hasattr(p, 'getProcessedResolution'):
|
| processed_resolution = p.getProcessedResolution(width, height)
|
| print(f"Processed resolution: {processed_resolution}")
|
| if processed_resolution is not None:
|
| width = processed_resolution[0]
|
| height = processed_resolution[1]
|
|
|
|
|
| self.total_frames = frame_count
|
| self.num_threads = threads
|
|
|
| self.processing_threads = self.num_threads
|
| self.frames_queue = []
|
| self.processed_queue = []
|
| for _ in range(threads):
|
| self.frames_queue.append(Queue(1))
|
| self.processed_queue.append(Queue(1))
|
|
|
| self.output_to_file = output_method != "Virtual Camera"
|
| self.output_to_cam = output_method == "Virtual Camera" or output_method == "Both"
|
|
|
| if self.output_to_file:
|
| self.videowriter = FFMPEG_VideoWriter(target_video, (width, height), fps, codec=roop.globals.video_encoder, crf=roop.globals.video_quality, audiofile=None)
|
| if self.output_to_cam:
|
| self.streamwriter = StreamWriter((width, height), int(fps))
|
|
|
| readthread = Thread(target=self.read_frames_thread, args=(cap, frame_start, frame_end, threads))
|
| readthread.start()
|
|
|
| writethread = Thread(target=self.write_frames_thread)
|
| writethread.start()
|
|
|
| progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
|
| with tqdm(total=self.total_frames, desc='Processing', unit='frames', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
|
| with ThreadPoolExecutor(thread_name_prefix='swap_proc', max_workers=self.num_threads) as executor:
|
| futures = []
|
|
|
| for threadindex in range(threads):
|
| future = executor.submit(self.process_videoframes, threadindex, lambda: self.update_progress(progress))
|
| futures.append(future)
|
|
|
| for future in as_completed(futures):
|
| future.result()
|
|
|
| readthread.join()
|
| writethread.join()
|
| cap.release()
|
| if self.output_to_file:
|
| self.videowriter.close()
|
| if self.output_to_cam:
|
| self.streamwriter.Close()
|
|
|
| self.frames_queue.clear()
|
| self.processed_queue.clear()
|
|
|
|
|
|
|
|
|
| def update_progress(self, progress: Any = None) -> None:
|
| process = psutil.Process(os.getpid())
|
| memory_usage = process.memory_info().rss / 1024 / 1024 / 1024
|
| progress.set_postfix({
|
| 'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB',
|
| 'execution_threads': self.num_threads
|
| })
|
| progress.update(1)
|
| if self.progress_gradio is not None:
|
| self.progress_gradio((progress.n, self.total_frames), desc='Processing', total=self.total_frames, unit='frames')
|
|
|
|
|
|
|
| def process_frame(self, frame:Frame):
|
| if len(self.input_face_datas) < 1 and not self.options.show_face_masking:
|
| return frame
|
| temp_frame = frame.copy()
|
| num_swapped, temp_frame = self.swap_faces(frame, temp_frame)
|
| if num_swapped > 0:
|
| if roop.globals.no_face_action == eNoFaceAction.SKIP_FRAME_IF_DISSIMILAR:
|
| if len(self.input_face_datas) > num_swapped:
|
| return None
|
| self.num_frames_no_face = 0
|
| self.last_swapped_frame = temp_frame.copy()
|
| return temp_frame
|
| if roop.globals.no_face_action == eNoFaceAction.USE_LAST_SWAPPED:
|
| if self.last_swapped_frame is not None and self.num_frames_no_face < self.options.max_num_reuse_frame:
|
| self.num_frames_no_face += 1
|
| return self.last_swapped_frame.copy()
|
| return frame
|
|
|
| elif roop.globals.no_face_action == eNoFaceAction.USE_ORIGINAL_FRAME:
|
| return frame
|
| if roop.globals.no_face_action == eNoFaceAction.SKIP_FRAME:
|
|
|
|
|
|
|
|
|
| return None
|
| else:
|
| return self.retry_rotated(frame)
|
|
|
| def retry_rotated(self, frame):
|
| copyframe = frame.copy()
|
| copyframe = rotate_clockwise(copyframe)
|
| temp_frame = copyframe.copy()
|
| num_swapped, temp_frame = self.swap_faces(copyframe, temp_frame)
|
| if num_swapped > 0:
|
| return rotate_anticlockwise(temp_frame)
|
|
|
| copyframe = frame.copy()
|
| copyframe = rotate_anticlockwise(copyframe)
|
| temp_frame = copyframe.copy()
|
| num_swapped, temp_frame = self.swap_faces(copyframe, temp_frame)
|
| if num_swapped > 0:
|
| return rotate_clockwise(temp_frame)
|
| del copyframe
|
| return frame
|
|
|
|
|
|
|
| def swap_faces(self, frame, temp_frame):
|
| num_faces_found = 0
|
|
|
| if self.options.swap_mode == "first":
|
| face = get_first_face(frame)
|
|
|
| if face is None:
|
| return num_faces_found, frame
|
|
|
| num_faces_found += 1
|
| temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
|
| del face
|
|
|
| else:
|
| faces = get_all_faces(frame)
|
| if faces is None:
|
| return num_faces_found, frame
|
|
|
| if self.options.swap_mode == "all":
|
| for face in faces:
|
| num_faces_found += 1
|
| temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
|
|
|
| elif self.options.swap_mode == "all_input" or self.options.swap_mode == "all_random":
|
| for i,face in enumerate(faces):
|
| num_faces_found += 1
|
| if i < len(self.input_face_datas):
|
| temp_frame = self.process_face(i, face, temp_frame)
|
| else:
|
| break
|
|
|
| elif self.options.swap_mode == "selected":
|
| num_targetfaces = len(self.target_face_datas)
|
| use_index = num_targetfaces == 1
|
| for i,tf in enumerate(self.target_face_datas):
|
| for face in faces:
|
| if compute_cosine_distance(tf.embedding, face.embedding) <= self.options.face_distance_threshold:
|
| if i < len(self.input_face_datas):
|
| if use_index:
|
| temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
|
| else:
|
| temp_frame = self.process_face(i, face, temp_frame)
|
| num_faces_found += 1
|
| if not roop.globals.vr_mode and num_faces_found == num_targetfaces:
|
| break
|
| elif self.options.swap_mode == "all_female" or self.options.swap_mode == "all_male":
|
| gender = 'F' if self.options.swap_mode == "all_female" else 'M'
|
| for face in faces:
|
| if face.sex == gender:
|
| num_faces_found += 1
|
| temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
|
|
|
|
|
| for face in faces:
|
| del face
|
| faces.clear()
|
|
|
|
|
|
|
| if roop.globals.vr_mode and num_faces_found % 2 > 0:
|
|
|
| num_faces_found = 0
|
| return num_faces_found, frame
|
| if num_faces_found == 0:
|
| return num_faces_found, frame
|
|
|
|
|
|
|
| if self.options.imagemask is not None and self.options.imagemask.shape == frame.shape:
|
| temp_frame = self.simple_blend_with_mask(temp_frame, frame, self.options.imagemask)
|
| return num_faces_found, temp_frame
|
|
|
|
|
| def rotation_action(self, original_face:Face, frame:Frame):
|
| (height, width) = frame.shape[:2]
|
|
|
| bounding_box_width = original_face.bbox[2] - original_face.bbox[0]
|
| bounding_box_height = original_face.bbox[3] - original_face.bbox[1]
|
| horizontal_face = bounding_box_width > bounding_box_height
|
|
|
| center_x = width // 2.0
|
| start_x = original_face.bbox[0]
|
| end_x = original_face.bbox[2]
|
| bbox_center_x = start_x + (bounding_box_width // 2.0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| forehead_x = original_face.landmark_2d_106[72][0]
|
| chin_x = original_face.landmark_2d_106[0][0]
|
|
|
| if horizontal_face:
|
| if chin_x < forehead_x:
|
|
|
| return "rotate_anticlockwise"
|
| elif forehead_x < chin_x:
|
|
|
| return "rotate_clockwise"
|
| if bbox_center_x >= center_x:
|
|
|
| return "rotate_anticlockwise"
|
| if bbox_center_x < center_x:
|
|
|
| return "rotate_clockwise"
|
|
|
| return None
|
|
|
|
|
| def auto_rotate_frame(self, original_face, frame:Frame):
|
| target_face = original_face
|
| original_frame = frame
|
|
|
| rotation_action = self.rotation_action(original_face, frame)
|
|
|
| if rotation_action == "rotate_anticlockwise":
|
|
|
| frame = rotate_anticlockwise(frame)
|
| elif rotation_action == "rotate_clockwise":
|
|
|
| frame = rotate_clockwise(frame)
|
|
|
| return target_face, frame, rotation_action
|
|
|
|
|
| def auto_unrotate_frame(self, frame:Frame, rotation_action):
|
| if rotation_action == "rotate_anticlockwise":
|
| return rotate_clockwise(frame)
|
| elif rotation_action == "rotate_clockwise":
|
| return rotate_anticlockwise(frame)
|
|
|
| return frame
|
|
|
|
|
|
|
| def process_face(self,face_index, target_face:Face, frame:Frame):
|
| from roop.face_util import align_crop
|
|
|
| enhanced_frame = None
|
| if(len(self.input_face_datas) > 0):
|
| inputface = self.input_face_datas[face_index].faces[0]
|
| else:
|
| inputface = None
|
|
|
| rotation_action = None
|
| if roop.globals.autorotate_faces:
|
|
|
| rotation_action = self.rotation_action(target_face, frame)
|
| if rotation_action is not None:
|
| (startX, startY, endX, endY) = target_face["bbox"].astype("int")
|
| width = endX - startX
|
| height = endY - startY
|
| offs = int(max(width,height) * 0.25)
|
| rotcutframe,startX, startY, endX, endY = self.cutout(frame, startX - offs, startY - offs, endX + offs, endY + offs)
|
| if rotation_action == "rotate_anticlockwise":
|
| rotcutframe = rotate_anticlockwise(rotcutframe)
|
| elif rotation_action == "rotate_clockwise":
|
| rotcutframe = rotate_clockwise(rotcutframe)
|
|
|
| rotface = get_first_face(rotcutframe)
|
| if rotface is None:
|
| rotation_action = None
|
| else:
|
| saved_frame = frame.copy()
|
| frame = rotcutframe
|
| target_face = rotface
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| """ Code ported/adapted from Facefusion which borrowed the idea from Rope:
|
| Kind of subsampling the cutout and aligned face image and faceswapping slices of it up to
|
| the desired output resolution. This works around the current resolution limitations without using enhancers.
|
| """
|
| model_output_size = self.options.swap_output_size
|
| subsample_size = max(self.options.subsample_size, model_output_size)
|
| subsample_total = subsample_size // model_output_size
|
| aligned_img, M = align_crop(frame, target_face.kps, subsample_size)
|
|
|
| fake_frame = aligned_img
|
| target_face.matrix = M
|
|
|
| for p in self.processors:
|
| if p.type == 'swap':
|
| swap_result_frames = []
|
| subsample_frames = self.implode_pixel_boost(aligned_img, model_output_size, subsample_total)
|
| for sliced_frame in subsample_frames:
|
| for _ in range(0,self.options.num_swap_steps):
|
| sliced_frame = self.prepare_crop_frame(sliced_frame)
|
| sliced_frame = p.Run(inputface, target_face, sliced_frame)
|
| sliced_frame = self.normalize_swap_frame(sliced_frame)
|
| swap_result_frames.append(sliced_frame)
|
| fake_frame = self.explode_pixel_boost(swap_result_frames, model_output_size, subsample_total, subsample_size)
|
| fake_frame = fake_frame.astype(np.uint8)
|
| scale_factor = 0.0
|
| elif p.type == 'mask':
|
| fake_frame = self.process_mask(p, aligned_img, fake_frame)
|
| else:
|
| enhanced_frame, scale_factor = p.Run(self.input_face_datas[face_index], target_face, fake_frame)
|
|
|
| upscale = 512
|
| orig_width = fake_frame.shape[1]
|
| if orig_width != upscale:
|
| fake_frame = cv2.resize(fake_frame, (upscale, upscale), cv2.INTER_CUBIC)
|
| mask_offsets = (0,0,0,0,1,20) if inputface is None else inputface.mask_offsets
|
|
|
|
|
| if enhanced_frame is None:
|
| scale_factor = int(upscale / orig_width)
|
| result = self.paste_upscale(fake_frame, fake_frame, target_face.matrix, frame, scale_factor, mask_offsets)
|
| else:
|
| result = self.paste_upscale(fake_frame, enhanced_frame, target_face.matrix, frame, scale_factor, mask_offsets)
|
|
|
|
|
| if self.options.restore_original_mouth:
|
| mouth_cutout, mouth_bb = self.create_mouth_mask(target_face, frame)
|
| result = self.apply_mouth_area(result, mouth_cutout, mouth_bb)
|
|
|
| if rotation_action is not None:
|
| fake_frame = self.auto_unrotate_frame(result, rotation_action)
|
| result = self.paste_simple(fake_frame, saved_frame, startX, startY)
|
|
|
| return result
|
|
|
|
|
|
|
|
|
| def cutout(self, frame:Frame, start_x, start_y, end_x, end_y):
|
| if start_x < 0:
|
| start_x = 0
|
| if start_y < 0:
|
| start_y = 0
|
| if end_x > frame.shape[1]:
|
| end_x = frame.shape[1]
|
| if end_y > frame.shape[0]:
|
| end_y = frame.shape[0]
|
| return frame[start_y:end_y, start_x:end_x], start_x, start_y, end_x, end_y
|
|
|
| def paste_simple(self, src:Frame, dest:Frame, start_x, start_y):
|
| end_x = start_x + src.shape[1]
|
| end_y = start_y + src.shape[0]
|
|
|
| start_x, end_x, start_y, end_y = clamp_cut_values(start_x, end_x, start_y, end_y, dest)
|
| dest[start_y:end_y, start_x:end_x] = src
|
| return dest
|
|
|
| def simple_blend_with_mask(self, image1, image2, mask):
|
|
|
| blended_image = image1.astype(np.float32) * (1.0 - mask) + image2.astype(np.float32) * mask
|
| return blended_image.astype(np.uint8)
|
|
|
|
|
| def paste_upscale(self, fake_face, upsk_face, M, target_img, scale_factor, mask_offsets):
|
| M_scale = M * scale_factor
|
| IM = cv2.invertAffineTransform(M_scale)
|
|
|
| face_matte = np.full((target_img.shape[0],target_img.shape[1]), 255, dtype=np.uint8)
|
|
|
| img_matte = np.zeros((upsk_face.shape[0],upsk_face.shape[1]), dtype=np.uint8)
|
|
|
| w = img_matte.shape[1]
|
| h = img_matte.shape[0]
|
|
|
| top = int(mask_offsets[0] * h)
|
| bottom = int(h - (mask_offsets[1] * h))
|
| left = int(mask_offsets[2] * w)
|
| right = int(w - (mask_offsets[3] * w))
|
| img_matte[top:bottom,left:right] = 255
|
|
|
|
|
| img_matte = cv2.warpAffine(img_matte, IM, (target_img.shape[1], target_img.shape[0]), flags=cv2.INTER_NEAREST, borderValue=0.0)
|
|
|
| img_matte[:1,:] = img_matte[-1:,:] = img_matte[:,:1] = img_matte[:,-1:] = 0
|
|
|
| img_matte = self.blur_area(img_matte, mask_offsets[4], mask_offsets[5])
|
|
|
| img_matte = img_matte.astype(np.float32)/255
|
| face_matte = face_matte.astype(np.float32)/255
|
| img_matte = np.minimum(face_matte, img_matte)
|
| if self.options.show_face_area_overlay:
|
|
|
| green_overlay = np.zeros_like(target_img)
|
| green_color = [0, 255, 0]
|
| for i in range(3):
|
| green_overlay[:, :, i] = np.where(img_matte > 0, green_color[i], 0)
|
| img_matte = np.reshape(img_matte, [img_matte.shape[0],img_matte.shape[1],1])
|
| paste_face = cv2.warpAffine(upsk_face, IM, (target_img.shape[1], target_img.shape[0]), borderMode=cv2.BORDER_REPLICATE)
|
| if upsk_face is not fake_face:
|
| fake_face = cv2.warpAffine(fake_face, IM, (target_img.shape[1], target_img.shape[0]), borderMode=cv2.BORDER_REPLICATE)
|
| paste_face = cv2.addWeighted(paste_face, self.options.blend_ratio, fake_face, 1.0 - self.options.blend_ratio, 0)
|
|
|
|
|
| paste_face = img_matte * paste_face
|
| paste_face = paste_face + (1-img_matte) * target_img.astype(np.float32)
|
| if self.options.show_face_area_overlay:
|
|
|
| paste_face = cv2.addWeighted(paste_face.astype(np.uint8), 1 - 0.5, green_overlay, 0.5, 0)
|
| return paste_face.astype(np.uint8)
|
|
|
|
|
| def blur_area(self, img_matte, num_erosion_iterations, blur_amount):
|
|
|
| mask_h_inds, mask_w_inds = np.where(img_matte==255)
|
|
|
| mask_h = np.max(mask_h_inds) - np.min(mask_h_inds)
|
| mask_w = np.max(mask_w_inds) - np.min(mask_w_inds)
|
| mask_size = int(np.sqrt(mask_h*mask_w))
|
|
|
|
|
| k = max(mask_size//(blur_amount // 2) , blur_amount // 2)
|
| kernel = np.ones((k,k),np.uint8)
|
| img_matte = cv2.erode(img_matte,kernel,iterations = num_erosion_iterations)
|
|
|
|
|
| k = max(mask_size//blur_amount, blur_amount//5)
|
| kernel_size = (k, k)
|
| blur_size = tuple(2*i+1 for i in kernel_size)
|
| return cv2.GaussianBlur(img_matte, blur_size, 0)
|
|
|
|
|
| def prepare_crop_frame(self, swap_frame):
|
| model_type = 'inswapper'
|
| model_mean = [0.0, 0.0, 0.0]
|
| model_standard_deviation = [1.0, 1.0, 1.0]
|
|
|
| if model_type == 'ghost':
|
| swap_frame = swap_frame[:, :, ::-1] / 127.5 - 1
|
| else:
|
| swap_frame = swap_frame[:, :, ::-1] / 255.0
|
| swap_frame = (swap_frame - model_mean) / model_standard_deviation
|
| swap_frame = swap_frame.transpose(2, 0, 1)
|
| swap_frame = np.expand_dims(swap_frame, axis = 0).astype(np.float32)
|
| return swap_frame
|
|
|
|
|
| def normalize_swap_frame(self, swap_frame):
|
| model_type = 'inswapper'
|
| swap_frame = swap_frame.transpose(1, 2, 0)
|
|
|
| if model_type == 'ghost':
|
| swap_frame = (swap_frame * 127.5 + 127.5).round()
|
| else:
|
| swap_frame = (swap_frame * 255.0).round()
|
| swap_frame = swap_frame[:, :, ::-1]
|
| return swap_frame
|
|
|
| def implode_pixel_boost(self, aligned_face_frame, model_size, pixel_boost_total : int):
|
| subsample_frame = aligned_face_frame.reshape(model_size, pixel_boost_total, model_size, pixel_boost_total, 3)
|
| subsample_frame = subsample_frame.transpose(1, 3, 0, 2, 4).reshape(pixel_boost_total ** 2, model_size, model_size, 3)
|
| return subsample_frame
|
|
|
|
|
| def explode_pixel_boost(self, subsample_frame, model_size, pixel_boost_total, pixel_boost_size):
|
| final_frame = np.stack(subsample_frame, axis = 0).reshape(pixel_boost_total, pixel_boost_total, model_size, model_size, 3)
|
| final_frame = final_frame.transpose(2, 0, 3, 1, 4).reshape(pixel_boost_size, pixel_boost_size, 3)
|
| return final_frame
|
|
|
| def process_mask(self, processor, frame:Frame, target:Frame):
|
| img_mask = processor.Run(frame, self.options.masking_text)
|
| img_mask = cv2.resize(img_mask, (target.shape[1], target.shape[0]))
|
| img_mask = np.reshape(img_mask, [img_mask.shape[0],img_mask.shape[1],1])
|
|
|
| if self.options.show_face_masking:
|
| result = (1 - img_mask) * frame.astype(np.float32)
|
| return np.uint8(result)
|
|
|
|
|
| target = target.astype(np.float32)
|
| result = (1-img_mask) * target
|
| result += img_mask * frame.astype(np.float32)
|
| return np.uint8(result)
|
|
|
|
|
|
|
|
|
| def create_mouth_mask(self, face: Face, frame: Frame):
|
| mouth_cutout = None
|
|
|
| landmarks = face.landmark_2d_106
|
| if landmarks is not None:
|
|
|
| mouth_points = landmarks[52:71].astype(np.int32)
|
|
|
|
|
| min_x, min_y = np.min(mouth_points, axis=0)
|
| max_x, max_y = np.max(mouth_points, axis=0)
|
| min_x = max(0, min_x - (15*6))
|
| min_y = max(0, min_y - 22)
|
| max_x = min(frame.shape[1], max_x + (15*6))
|
| max_y = min(frame.shape[0], max_y + (90*6))
|
|
|
|
|
| mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
|
|
|
| return mouth_cutout, (min_x, min_y, max_x, max_y)
|
|
|
|
|
|
|
| def create_feathered_mask(self, shape, feather_amount=30):
|
| mask = np.zeros(shape[:2], dtype=np.float32)
|
| center = (shape[1] // 2, shape[0] // 2)
|
| cv2.ellipse(mask, center, (shape[1] // 2 - feather_amount, shape[0] // 2 - feather_amount),
|
| 0, 0, 360, 1, -1)
|
| mask = cv2.GaussianBlur(mask, (feather_amount*2+1, feather_amount*2+1), 0)
|
| return mask / np.max(mask)
|
|
|
| def apply_mouth_area(self, frame: np.ndarray, mouth_cutout: np.ndarray, mouth_box: tuple) -> np.ndarray:
|
| min_x, min_y, max_x, max_y = mouth_box
|
| box_width = max_x - min_x
|
| box_height = max_y - min_y
|
|
|
|
|
|
|
| if mouth_cutout is None or box_width is None or box_height is None:
|
| return frame
|
| try:
|
| resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height))
|
|
|
|
|
| roi = frame[min_y:max_y, min_x:max_x]
|
|
|
|
|
| if roi.shape != resized_mouth_cutout.shape:
|
| resized_mouth_cutout = cv2.resize(resized_mouth_cutout, (roi.shape[1], roi.shape[0]))
|
|
|
|
|
| color_corrected_mouth = self.apply_color_transfer(resized_mouth_cutout, roi)
|
|
|
|
|
| feather_amount = min(30, box_width // 15, box_height // 15)
|
| mask = self.create_feathered_mask(resized_mouth_cutout.shape, feather_amount)
|
|
|
|
|
| mask = mask[:,:,np.newaxis]
|
| blended = (color_corrected_mouth * mask + roi * (1 - mask)).astype(np.uint8)
|
|
|
|
|
| frame[min_y:max_y, min_x:max_x] = blended
|
| except Exception as e:
|
| print(f'Error {e}')
|
| pass
|
|
|
| return frame
|
|
|
| def apply_color_transfer(self, source, target):
|
| """
|
| Apply color transfer from target to source image
|
| """
|
| source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
|
| target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
|
|
|
| source_mean, source_std = cv2.meanStdDev(source)
|
| target_mean, target_std = cv2.meanStdDev(target)
|
|
|
|
|
| source_mean = source_mean.reshape(1, 1, 3)
|
| source_std = source_std.reshape(1, 1, 3)
|
| target_mean = target_mean.reshape(1, 1, 3)
|
| target_std = target_std.reshape(1, 1, 3)
|
|
|
|
|
| source = (source - source_mean) * (target_std / source_std) + target_mean
|
| return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
|
|
|
|
|
|
|
| def unload_models():
|
| pass
|
|
|
|
|
| def release_resources(self):
|
| for p in self.processors:
|
| p.Release()
|
| self.processors.clear()
|
| if self.videowriter is not None:
|
| self.videowriter.close()
|
| if self.streamwriter is not None:
|
| self.streamwriter.Close()
|
|
|
|
|