import cv2
import numpy as np
import os

from PIL import Image
from typing import List, Tuple, Callable, Any
from tqdm import tqdm
import traceback

from insightface.utils import face_align
from scipy.spatial import distance

from .masks import face_mask_static
from .image_processing import normalize_and_torch, normalize_and_torch_batch, crop_face

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import kornia

def add_audio_from_another_video(video_with_sound: str,
                                 video_without_sound: str,
                                 audio_name: str,
                                 fast_cpu=True,
                                 gpu=False) -> None:
    """
    Extract the audio track of `video_with_sound` and mux it into
    `video_without_sound` (replacing it on disk) via ffmpeg.
    """
    if not os.path.exists('./examples/audio/'):
        os.makedirs('./examples/audio/')
    fast_cmd = "-c:v libx264 -preset ultrafast -crf 18" if fast_cpu else ""
    gpu_cmd = "-c:v h264_nvenc" if gpu else ""

    # extract the audio stream only (-vn drops video, so no video codec is needed)
    os.system(f"ffmpeg -v -8 -i {video_with_sound} -vn ./examples/audio/{audio_name}.m4a")
    # note the space before the output path: without it the gpu_cmd flag and
    # the filename fuse into one broken argument
    os.system(f"ffmpeg -v -8 -i {video_without_sound} -i ./examples/audio/{audio_name}.m4a {fast_cmd} {gpu_cmd} {video_without_sound[:-4]}_audio.mp4 -y")
    os.system(f"rm -rf ./examples/audio/{audio_name}.m4a")
    os.system(f"mv {video_without_sound[:-4]}_audio.mp4 {video_without_sound}")

def read_video(path_to_video: str) -> Tuple[List[np.ndarray], float]:
    """
    Read a video frame by frame; return the list of BGR frames and the FPS.
    """
    cap = cv2.VideoCapture(path_to_video)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    full_frames = []
    i = 0  # current frame index
    while cap.isOpened():
        if i == frames:
            break
        ret, frame = cap.read()
        i += 1
        if ret:
            full_frames.append(frame)
        else:
            break

    cap.release()
    return full_frames, fps

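# Design note: the explicit frame-count break above is presumably a guard
# against capture backends that keep returning ret=True past the reported end
# of the stream; ret=False still terminates the loop on well-behaved files.
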
def get_target(full_frames: List[np.ndarray],
               app: Callable,
               crop_size: int):
    """
    Scan the frames until a face is detected and return it as the target crop.
    """
    i = 0
    target = None
    while target is None:
        if i < len(full_frames):
            try:
                target = [crop_face(full_frames[i], app, crop_size)[0]]
            except TypeError:
                # no face in this frame, try the next one
                i += 1
        else:
            print("Video doesn't contain face!")
            break
    return target

def smooth_landmarks(kps_arr, n=2):
    """
    Temporally smooth per-frame keypoints with a moving average of up to
    2*n + 1 frames. Each track is split into segments wherever a frame has
    no detection or the keypoints jump by more than 5 pixels, so the
    averaging never bridges a cut or an identity switch.
    """
    kps_arr_smooth_final = []
    for ka in kps_arr:
        # split the track into contiguous segments
        kps_arr_s = [[ka[0]]]
        for i in range(1, len(ka)):
            if (len(ka[i]) == 0) or (len(ka[i - 1]) == 0):
                kps_arr_s.append([ka[i]])
            elif (distance.euclidean(ka[i][0], ka[i - 1][0]) > 5) or (distance.euclidean(ka[i][2], ka[i - 1][2]) > 5):
                kps_arr_s.append([ka[i]])
            else:
                kps_arr_s[-1].append(ka[i])

        # average each segment with a window that shrinks near its ends
        kps_arr_smooth = []
        for a in kps_arr_s:
            a_smooth = []
            for i in range(len(a)):
                q = min(i, len(a) - i - 1, n)
                a_smooth.append(np.mean(np.array(a[i - q:i + 1 + q]), axis=0))
            kps_arr_smooth += a_smooth
        kps_arr_smooth_final.append(kps_arr_smooth)
    return kps_arr_smooth_final

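# Worked example of the window above: for a 4-frame segment [p0, p1, p2, p3]
# and n=2, q = min(i, len(a)-i-1, n) yields windows of 1, 3, 3 and 1 frames,
# so the smoothed segment is [p0, mean(p0..p2), mean(p1..p3), p3].
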
def crop_frames_and_get_transforms(full_frames: List[np.ndarray],
                                   target_embeds: torch.Tensor,
                                   app: Callable,
                                   netArc: Callable,
                                   crop_size: int,
                                   set_target: bool,
                                   similarity_th: float) -> Tuple[List[Any], List[Any]]:
    """
    Crop faces from frames and get the respective transforms
    """
    crop_frames = [[] for _ in range(target_embeds.shape[0])]
    tfm_array = [[] for _ in range(target_embeds.shape[0])]
    kps_array = [[] for _ in range(target_embeds.shape[0])]

    target_embeds = F.normalize(target_embeds)

    for frame in tqdm(full_frames):
        try:
            kps = app.get(frame, crop_size)
            if len(kps) > 1 or set_target:
                # several faces (or explicit targets): match each detection to
                # the closest target identity by ArcFace cosine similarity
                faces = []
                for p in kps:
                    M, _ = face_align.estimate_norm(p, crop_size, mode='None')
                    align_img = cv2.warpAffine(frame, M, (crop_size, crop_size), borderValue=0.0)
                    faces.append(align_img)

                face_norm = normalize_and_torch_batch(np.array(faces))
                face_norm = F.interpolate(face_norm, scale_factor=0.5, mode='bilinear', align_corners=True)
                face_embeds = netArc(face_norm)
                face_embeds = F.normalize(face_embeds)

                similarity = face_embeds @ target_embeds.T
                best_idxs = similarity.argmax(0).detach().cpu().numpy()

                for idx, best_idx in enumerate(best_idxs):
                    if similarity[best_idx][idx] > similarity_th:
                        kps_array[idx].append(kps[best_idx])
                    else:
                        kps_array[idx].append([])
            else:
                kps_array[0].append(kps[0])
        except TypeError:
            # no detection at all: append an empty entry to *every* track
            # (appending only to track 0 would desynchronize the tracks)
            for q in range(len(target_embeds)):
                kps_array[q].append([])

    smooth_kps = smooth_landmarks(kps_array, n=2)

    for i, frame in tqdm(enumerate(full_frames)):
        for q in range(len(target_embeds)):
            try:
                M, _ = face_align.estimate_norm(smooth_kps[q][i], crop_size, mode='None')
                align_img = cv2.warpAffine(frame, M, (crop_size, crop_size), borderValue=0.0)
                crop_frames[q].append(align_img)
                tfm_array[q].append(M)
            except Exception:
                # frames without a usable detection get empty placeholders
                crop_frames[q].append([])
                tfm_array[q].append([])

    torch.cuda.empty_cache()
    return crop_frames, tfm_array

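# The returned lists are indexed [target][frame]: crop_frames[q][i] is either
# an aligned crop_size x crop_size BGR crop or an empty list, and
# tfm_array[q][i] the matching 2x3 affine matrix (or an empty list) used by
# get_final_video to paste the swapped face back into the full frame.
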
def resize_frames(crop_frames: List[np.ndarray], new_size=(256, 256)) -> Tuple[List[np.ndarray], np.ndarray]:
    """
    Resize frames to the new size; present[i] is 0 where frame i had no face crop.
    """
    resized_frs = []
    present = np.ones(len(crop_frames))

    for i, crop_fr in tqdm(enumerate(crop_frames)):
        try:
            resized_frs.append(cv2.resize(crop_fr, new_size))
        except Exception:
            # empty placeholder instead of an image: mark the frame as absent
            present[i] = 0

    return resized_frs, present

def get_final_video(final_frames: List[np.ndarray],
                    crop_frames: List[np.ndarray],
                    full_frames: List[np.ndarray],
                    tfm_array: List[np.ndarray],
                    OUT_VIDEO_NAME: str,
                    fps: float,
                    handler) -> None:
    """
    Create the final video: paste every swapped face back into its full frame.
    """
    out = cv2.VideoWriter(f"{OUT_VIDEO_NAME}", cv2.VideoWriter_fourcc(*'mp4v'), fps, (full_frames[0].shape[1], full_frames[0].shape[0]))
    size = (full_frames[0].shape[0], full_frames[0].shape[1])
    params = [None for i in range(len(crop_frames))]
    result_frames = full_frames.copy()

    for i in tqdm(range(len(full_frames))):
        for j in range(len(crop_frames)):
            try:
                swap = cv2.resize(final_frames[j][i], (224, 224))

                if len(crop_frames[j][i]) == 0:
                    params[j] = None
                    continue

                landmarks = handler.get_without_detection_without_transform(swap)
                if params[j] is None:
                    landmarks_tgt = handler.get_without_detection_without_transform(crop_frames[j][i])
                    mask, params[j] = face_mask_static(swap, landmarks, landmarks_tgt, params[j])
                else:
                    mask = face_mask_static(swap, landmarks, landmarks_tgt, params[j])

                swap = torch.from_numpy(swap).cuda().permute(2, 0, 1).unsqueeze(0).type(torch.float32)
                mask = torch.from_numpy(mask).cuda().unsqueeze(0).unsqueeze(0).type(torch.float32)
                full_frame = torch.from_numpy(result_frames[i]).cuda().permute(2, 0, 1).unsqueeze(0)
                mat = torch.from_numpy(tfm_array[j][i]).cuda().unsqueeze(0).type(torch.float32)

                # invert the crop transform, warp the swap and its mask back
                # into full-frame coordinates, then alpha-blend inside the mask
                mat_rev = kornia.invert_affine_transform(mat)
                swap_t = kornia.warp_affine(swap, mat_rev, size)
                mask_t = kornia.warp_affine(mask, mat_rev, size)
                final = (mask_t * swap_t + (1 - mask_t) * full_frame).type(torch.uint8).squeeze().permute(1, 2, 0).cpu().detach().numpy()
                result_frames[i] = final
                torch.cuda.empty_cache()
            except Exception:
                # leave the original frame untouched for this face on failure
                pass

        out.write(result_frames[i])

    out.release()

class Frames(Dataset):
    def __init__(self, frames_list):
        self.frames_list = frames_list
        self.transforms = transforms.Compose([
            transforms.ToTensor()
        ])

    def __getitem__(self, idx):
        # BGR -> RGB before converting to a PIL image
        frame = Image.fromarray(self.frames_list[idx][:, :, ::-1])
        return self.transforms(frame)

    def __len__(self):
        return len(self.frames_list)

def face_enhancement(final_frames: List[np.ndarray], model) -> List[np.ndarray]:
    enhanced_frames_all = []
    for i in range(len(final_frames)):
        enhanced_frames = final_frames[i].copy()
        # keep only real crops; empty-list placeholders mark frames without a face
        face_idx = [k for k, x in enumerate(final_frames[i]) if not isinstance(x, list)]
        face_frames = [x for x in final_frames[i] if not isinstance(x, list)]
        ff_i = 0

        dataset = Frames(face_frames)
        dataloader = DataLoader(dataset, batch_size=20, shuffle=False, num_workers=1, drop_last=False)

        for iteration, data in tqdm(enumerate(dataloader)):
            frames = data
            data = {'image': frames, 'label': frames}
            generated = model(data, mode='inference2')
            generated = torch.clamp(generated * 255, 0, 255)
            generated = generated.type(torch.uint8).permute(0, 2, 3, 1).cpu().detach().numpy()
            for generated_frame in generated:
                # RGB output back to BGR, into its original frame slot
                enhanced_frames[face_idx[ff_i]] = generated_frame[:, :, ::-1]
                ff_i += 1
        enhanced_frames_all.append(enhanced_frames)
    return enhanced_frames_all
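
# Rough pipeline sketch tying these helpers together. The detector `app`,
# ArcFace `netArc`, landmark `handler`, the target embeddings and the swap
# generator itself are all loaded outside this module; the names, paths and
# threshold below are illustrative assumptions, not part of this file:
#
#   full_frames, fps = read_video('./examples/videos/dst.mp4')
#   crop_frames, tfm_array = crop_frames_and_get_transforms(
#       full_frames, target_embeds, app, netArc, 224, False, 0.15)
#   # ... run the swap model over the crops to obtain final_frames ...
#   get_final_video(final_frames, crop_frames, full_frames, tfm_array,
#                   'result.mp4', fps, handler)
#   add_audio_from_another_video('./examples/videos/dst.mp4', 'result.mp4', 'tmp')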