| | """ |
| | # Copyright 2020 Adobe |
| | # All Rights Reserved. |
| | |
| | # NOTICE: Adobe permits you to use, modify, and distribute this file in |
| | # accordance with the terms of the Adobe license agreement accompanying |
| | # it. |
| | |
| | """ |
| |
|
| | import numpy as np |
| | import os |
| | import ffmpeg |
| | import cv2 |
| | import face_alignment |
| | from src.dataset.utils import icp |
| |
|
| |
|
class Point:
    """Minimal 2D point container with `.x` / `.y` attribute access."""

    def __init__(self, x, y):
        self.x, self.y = x, y
|
class ShapeParts:
    """dlib-style wrapper over a (68, 2) landmark array, exposing `part(i)`."""

    def __init__(self, np_pts):
        # Underlying (num_points, 2) array of landmark coordinates.
        self.data = np_pts

    def part(self, idx):
        """Return landmark `idx` as a Point(x, y)."""
        x = self.data[idx, 0]
        y = self.data[idx, 1]
        return Point(x, y)
|
class Av2Flau_Convertor():
    """
    Any video to facial landmark and audio numpy data converter.
    """

    def __init__(self, video_dir, out_dir, idx=0):
        """
        Args:
            video_dir: path to the input video file.
            out_dir: root output directory for landmark/audio artifacts.
            idx: numeric index used as a prefix in output file names.
        """
        self.video_dir = video_dir
        # Handle both Windows ('\\') and POSIX ('/') style paths.
        sep = '\\' if '\\' in video_dir else '/'
        self.video_name = video_dir.split(sep)[-1]
        self.out_dir = out_dir
        self.idx = idx
        # Last four characters of the path, e.g. '.mp4'.
        self.input_format = self.video_dir[-4:]

        # FAN 2D landmark detector on CUDA, with flipped-input averaging.
        self.predictor = face_alignment.FaceAlignment(
            face_alignment.LandmarksType._2D, device='cuda', flip_input=True)

        # Indices of quasi-rigid landmarks (nose bridge + eye corners)
        # used for ICP registration against the anchor shape.
        self.t_shape_idx = (27, 28, 29, 30, 33, 36, 39, 42, 45)
|
    def convert(self, max_num_frames=250, save_audio=False, show=False, register=False):
        """
        Run the full conversion pipeline on `self.video_dir`.

        Args:
            max_num_frames: cap on the number of frames to process.
            save_audio: also extract the matching audio segment to raw_wav/.
            show: write a debug video with landmarks merged over the clip.
            register: additionally smooth + ICP-register the 3D landmarks.

        Writes raw 3D landmarks to `<out_dir>/raw_fl3d/` and removes the
        intermediate '_preclean.mp4' / '_fl_detect.mp4' files when done.
        """
        # Step 1: re-encode to fixed fps / sample rate; abort on failure.
        ret, wfn = self.__preclean_video__()
        if (not ret):
            return

        # Step 2: per-frame landmark detection on the precleaned video.
        wfn = self.video_dir.replace(self.input_format, '_preclean.mp4')
        ret, fl2d, fl3d = self.__video_facial_landmark_detection__(video_dir=wfn, display=False, max_num_frames=max_num_frames)
        if (not ret):
            return
        # Too few detected frames to be useful downstream.
        if (len(fl3d) < 9):
            print('The length of the landmark is too short, skip')
            return

        # Save raw 3D landmarks: one row per frame, [frame_id, 68*4 values].
        fl3d = np.array(fl3d)
        np.savetxt(os.path.join(self.out_dir, 'raw_fl3d/fan_{:05d}_{}_3d.txt'.format(self.idx, self.video_name[:-4])),
                   fl3d, fmt='%.2f')
        if (save_audio):
            self.__save_audio__(video_dir=self.video_dir.replace(self.input_format, '_preclean.mp4'), fl3d=fl3d)

        # Optional: cut the annotated video to the detected frame span and
        # save it under tmp_v/ for visual inspection.
        if (show):
            sf, ef = (fl3d[0][0], fl3d[-1][0]) if fl3d.shape[0] > 0 else (0, 0)
            print(sf, ef)
            print(self.video_dir.replace(self.input_format, '_fl_detect.mp4'),
                  os.path.join(self.out_dir, 'tmp_v', '{:05d}_{}_fl_av.mp4'.format(
                      self.idx, self.video_name[:-4]))
                  )
            self.__ffmpeg_merge_av__(
                video_dir=self.video_dir.replace(self.input_format, '_fl_detect.mp4'),
                audio_dir=self.video_dir.replace(self.input_format, '_preclean.mp4'),
                WriteFileName=os.path.join(self.out_dir, 'tmp_v', '{:05d}_{}_fl_av.mp4'.format(
                    self.idx, self.video_name[:-4])),
                start_end_frame=(int(sf), int(ef)))

        # Remove intermediate files ('_fl_detect.mp4' exists only when the
        # detection step ran with write=True).
        os.remove(self.video_dir.replace(self.input_format, '_preclean.mp4'))
        if(os.path.isfile(self.video_dir.replace(self.input_format, '_fl_detect.mp4'))):
            os.remove(self.video_dir.replace(self.input_format, '_fl_detect.mp4'))

        # Step 5: smooth + rigidly register landmarks to the anchor shape.
        if (register):
            self.__single_landmark_3d_register__(fl3d)
|
| | ''' ======================================================================== |
| | |
| | STEP 1: Preclean video |
| | |
| | ======================================================================== ''' |
| |
|
| | def __preclean_video__(self, WriteFileName='_preclean.mp4', fps=25, sample_rate=16000): |
| | ''' |
| | Pre-clean downloaded videos. Return false if more than 2 streams found. |
| | Then convert it to fps=25, sample_rate=16kHz |
| | ''' |
| | input_video_dir = self.video_dir if '_x_' not in self.video_dir else self.video_dir.replace('_x_', '/') |
| |
|
| | probe = ffmpeg.probe(input_video_dir) |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | codec = {'video': '', 'audio': ''} |
| | for i, stream in enumerate(probe['streams'][0:2]): |
| | codec[stream['codec_type']] = stream['codec_name'] |
| |
|
| | |
| | ( |
| | ffmpeg |
| | .input(input_video_dir) |
| | .output(self.video_dir.replace(self.input_format, WriteFileName), |
| | |
| | |
| | r=fps, ar=sample_rate) |
| | .overwrite_output().global_args('-loglevel', 'quiet') |
| | .run() |
| | ) |
| |
|
| | return True, self.video_dir.replace(self.input_format, WriteFileName) |
| |
|
| | ''' ======================================================================== |
| | |
| | STEP 2: Detect facial landmark |
| | |
| | ======================================================================== ''' |
| |
|
| | def __video_facial_landmark_detection__(self, video_dir=None, display=False, WriteFileName='_fl_detect.mp4', |
| | max_num_frames=250, write=False): |
| | ''' |
| | Get facial landmark from video. |
| | ''' |
| |
|
| | |
| | print('video_dir : ' + video_dir) |
| | video = cv2.VideoCapture(video_dir) |
| |
|
| | |
| | if (video.isOpened() == False): |
| | print('Unable to open video file') |
| | return False, None |
| |
|
| | |
| | length = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) |
| | fps = video.get(cv2.CAP_PROP_FPS) |
| | w = int(video.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| | h = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| | print('Process Video {}, len: {}, FPS: {:.2f}, W X H: {} x {}'.format(video_dir, length, fps, w, h)) |
| |
|
| | if(write): |
| | writer = cv2.VideoWriter(self.video_dir.replace(self.input_format, WriteFileName), |
| | cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'), fps, (w, h)) |
| |
|
| | video_facial_landmark = [] |
| | video_facial_landmark_3d = [] |
| | frame_id = 0 |
| | not_detected_frames = 0 |
| |
|
| | while (video.isOpened()): |
| | ret, frame = video.read() |
| | |
| | if (ret == False): |
| | break |
| |
|
| | |
| | if (not_detected_frames > 5): |
| | if (len(video_facial_landmark) < 10): |
| | |
| | video_facial_landmark = [] |
| | video_facial_landmark_3d = [] |
| | else: |
| | break |
| |
|
| | |
| | img_ret, shape, shape_3d = self.__image_facial_landmark_detection__(img=frame) |
| |
|
| | |
| | if (img_ret): |
| | |
| |
|
| | |
| | xys = [] |
| | for part_i in range(68): |
| | xys.append(shape.part(part_i).x) |
| | xys.append(shape.part(part_i).y) |
| |
|
| | |
| | if (not_detected_frames > 0 and len(video_facial_landmark) > 0): |
| | |
| | def interp(last, cur, num, dims=68 * 2 + 1): |
| | interp_xys_np = np.zeros((num, dims)) |
| | for dim in range(dims): |
| | interp_xys_np[:, dim] = np.interp(np.arange(0, num), [-1, num], [last[dim], cur[dim]]) |
| | interp_xys_np = np.round(interp_xys_np).astype('int') |
| | interp_xys = [list(xy) for xy in interp_xys_np] |
| | return interp_xys |
| |
|
| | interp_xys = interp(video_facial_landmark[-1], [frame_id] + xys, not_detected_frames) |
| | video_facial_landmark += interp_xys |
| |
|
| | not_detected_frames = 0 |
| |
|
| | |
| | video_facial_landmark.append([frame_id] + xys) |
| | if (shape_3d.any()): |
| | video_facial_landmark_3d.append([frame_id] + list(np.reshape(shape_3d, -1))) |
| |
|
| | if(write): |
| | frame = self.__vis_landmark_on_img__(frame, shape) |
| |
|
| | else: |
| | print('\t ==> frame {}/{} Not detected'.format(frame_id, length)) |
| | not_detected_frames += 1 |
| |
|
| | if (display): |
| | cv2.imshow('Frame', frame) |
| | if (cv2.waitKey(10) == ord('q')): |
| | break |
| |
|
| | if(write): |
| | writer.write(frame) |
| | frame_id += 1 |
| |
|
| | if(frame_id > max_num_frames): |
| | break |
| |
|
| | video.release() |
| | if(write): |
| | writer.release() |
| | cv2.destroyAllWindows() |
| |
|
| | print('\t ==> Final processed frames {}/{}'.format(frame_id, length)) |
| |
|
| | return True, video_facial_landmark, video_facial_landmark_3d |
| |
|
| | def __image_facial_landmark_detection__(self, img=None): |
| | ''' |
| | Get facial landmark from single image by FANet |
| | ''' |
| |
|
| | shapes = self.predictor.get_landmarks(img) |
| | if (not shapes): |
| | return False, None, None |
| |
|
| | max_size_idx = 0 |
| | shape = ShapeParts(shapes[max_size_idx][:, 0:2]) |
| | shape_3d = shapes[max_size_idx] |
| |
|
| | |
| | shape_3d = np.concatenate([shape_3d, np.ones(shape=(68, 1))], axis=1) |
| |
|
| | return True, shape, shape_3d |
| |
|
| | def __vis_landmark_on_img__(self, img, shape, linewidth=2): |
| | ''' |
| | Visualize landmark on images. |
| | ''' |
| | if (type(shape) == ShapeParts): |
| | def draw_curve(idx_list, color=(0, 255, 0), loop=False, lineWidth=linewidth): |
| | for i in idx_list: |
| | cv2.line(img, (shape.part(i).x, shape.part(i).y), (shape.part(i + 1).x, shape.part(i + 1).y), |
| | color, lineWidth) |
| | if (loop): |
| | cv2.line(img, (shape.part(idx_list[0]).x, shape.part(idx_list[0]).y), |
| | (shape.part(idx_list[-1] + 1).x, shape.part(idx_list[-1] + 1).y), color, lineWidth) |
| |
|
| | draw_curve(list(range(0, 16))) |
| | draw_curve(list(range(17, 21))) |
| | draw_curve(list(range(22, 26))) |
| | draw_curve(list(range(27, 35))) |
| | draw_curve(list(range(36, 41)), loop=True) |
| | draw_curve(list(range(42, 47)), loop=True) |
| | draw_curve(list(range(48, 59)), loop=True) |
| | draw_curve(list(range(60, 67)), loop=True) |
| |
|
| | else: |
| | def draw_curve(idx_list, color=(0, 255, 0), loop=False, lineWidth=linewidth): |
| | for i in idx_list: |
| | cv2.line(img, (shape[i, 0], shape[i, 1]), (shape[i + 1, 0], shape[i + 1, 1]), color, lineWidth) |
| | if (loop): |
| | cv2.line(img, (shape[idx_list[0], 0], shape[idx_list[0], 1]), |
| | (shape[idx_list[-1] + 1, 0], shape[idx_list[-1] + 1, 1]), color, lineWidth) |
| |
|
| | draw_curve(list(range(0, 16))) |
| | draw_curve(list(range(17, 21))) |
| | draw_curve(list(range(22, 26))) |
| | draw_curve(list(range(27, 35))) |
| | draw_curve(list(range(36, 41)), loop=True) |
| | draw_curve(list(range(42, 47)), loop=True) |
| | draw_curve(list(range(48, 59)), loop=True) |
| | draw_curve(list(range(60, 67)), loop=True) |
| |
|
| | return img |
| |
|
| | def __ffmpeg_merge_av__(self, video_dir, audio_dir, WriteFileName, start_end_frame): |
| | probe = ffmpeg.probe(video_dir) |
| | fps = probe['streams'][0]['avg_frame_rate'] |
| | spf = float(fps.split('/')[1]) / float(fps.split('/')[0]) |
| | sf, ef = start_end_frame |
| | st, tt = sf * spf, ef * spf - sf * spf |
| |
|
| | vin = ffmpeg.input(video_dir).video |
| | |
| | |
| | out = ffmpeg.output(vin, WriteFileName, codec='copy', ss=st, t=tt, shortest=None) |
| | out = out.overwrite_output().global_args('-loglevel', 'quiet') |
| | out.run() |
| |
|
| | |
| |
|
| | def __save_audio__(self, video_dir, fl3d): |
| | """ |
| | Extract audio from preclean video. Used for creating audio-aware dataset. |
| | |
| | """ |
| | sf, ef = fl3d[0][0], fl3d[-1][0] |
| |
|
| | probe = ffmpeg.probe(video_dir) |
| | fps = probe['streams'][0]['avg_frame_rate'] |
| | spf = float(fps.split('/')[1]) / float(fps.split('/')[0]) |
| | st, tt = sf * spf, ef * spf - sf * spf |
| |
|
| | audio_dir = os.path.join(self.out_dir, 'raw_wav', '{:05d}_{}_audio.wav'.format(self.idx, self.video_name[:-4])) |
| | ( |
| | ffmpeg |
| | .input(video_dir) |
| | .output(audio_dir, ss=st, t=tt) |
| | .overwrite_output().global_args('-loglevel', 'quiet') |
| | .run() |
| | ) |
| |
|
| | ''' ======================================================================== |
| | |
| | STEP 5: Landmark register |
| | |
| | ======================================================================== ''' |
| |
|
| | def __single_landmark_3d_register__(self, fl3d, display=False): |
| | """ |
| | Register a single 3d landmark file |
| | |
| | """ |
| | |
| | from scipy.signal import savgol_filter |
| | lines = savgol_filter(fl3d, 7, 3, axis=0) |
| |
|
| | all_landmarks = lines[:, 1:].reshape((-1, 68, 3)) |
| | w, h = int(np.max(all_landmarks[:, :, 0])) + 20, int(np.max(all_landmarks[:, :, 1])) + 20 |
| |
|
| | |
| | print('Using exisiting ' + 'dataset/utils/ANCHOR_T_SHAPE_{}.txt'.format(len(self.t_shape_idx))) |
| | anchor_t_shape = np.loadtxt('dataset/utils/ANCHOR_T_SHAPE_{}.txt'.format(len(self.t_shape_idx))) |
| |
|
| | registered_landmarks_to_save = [] |
| | registered_affine_mat_to_save = [] |
| | |
| | for line in lines: |
| | frame_id = line[0] |
| | landmarks = line[1:].reshape(68, 3) |
| |
|
| | |
| | frame_t_shape = landmarks[self.t_shape_idx, :] |
| |
|
| | T, distance, itr = icp(frame_t_shape, anchor_t_shape) |
| |
|
| | |
| | landmarks = np.hstack((landmarks, np.ones((68, 1)))) |
| | registered_landmarks = np.dot(T, landmarks.T).T |
| | err = np.mean(np.sqrt(np.sum((registered_landmarks[self.t_shape_idx, 0:3] - anchor_t_shape) ** 2, axis=1))) |
| | |
| |
|
| | |
| | registered_landmarks_to_save.append([frame_id] + list(registered_landmarks[:, 0:3].reshape(-1))) |
| | registered_affine_mat_to_save.append([frame_id] + list(T.reshape(-1))) |
| |
|
| | |
| | if (display): |
| | img = np.zeros((h, w * 2, 3), np.uint8) |
| | self.__vis_landmark_on_img__(img, landmarks.astype(np.int)) |
| | registered_landmarks[:, 0] += w |
| | self.__vis_landmark_on_img__(img, registered_landmarks.astype(np.int)) |
| | cv2.imshow('img', img) |
| | if (cv2.waitKey(30) == ord('q')): |
| | break |
| |
|
| | np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_fl_sm.txt' |
| | .format(self.idx, self.video_name[:-4])), |
| | lines, fmt='%.6f') |
| | np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_fl_reg.txt' |
| | .format(self.idx, self.video_name[:-4])), |
| | np.array(registered_landmarks_to_save), fmt='%.6f') |
| | np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_mat_reg.txt' |
| | .format(self.idx, self.video_name[:-4])), |
| | np.array(registered_affine_mat_to_save), fmt='%.6f') |
| |
|
| |
|
if __name__ == '__main__':
    # Example usage: convert a single local video. Adjust the hard-coded
    # Windows paths to your environment before running.
    video_dir = r'C:\Users\yangzhou\Videos\004_1.mp4'
    out_dir = r'C:\Users\yangzhou\Videos'
    c = Av2Flau_Convertor(video_dir, out_dir, idx=0)
    c.convert()
| |
|
| |
|