import glob
import os

import cv2
import face_detection
import numpy as np
import torch
from natsort import natsorted
from tqdm import tqdm

device = "cuda" if torch.cuda.is_available() else "cpu"

def get_square_coords(coords, image, size=None, last_size=None):
    """Turn a (possibly rectangular) face box into a square crop.

    `coords` is [y1, y2, x1, x2]. When `last_size` is given, the side length
    is eased one fifth of the way from the previous frame's size towards the
    current one, which damps frame-to-frame jitter. `image` is currently
    unused and kept only for API compatibility.
    """
    y1, y2, x1, x2 = coords
    w, h = x2 - x1, y2 - y1
    center = (x1 + w // 2, y1 + h // 2)
    if size is None:
        size = (w + h) // 2
    if last_size is not None:
        size = (w + h) // 2
        size = (size - last_size) // 5 + last_size  # ease towards last_size
    x1, y1 = center[0] - size // 2, center[1] - size // 2
    x2, y2 = x1 + size, y1 + size
    return size, [y1, y2, x1, x2]

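# Worked example of the easing above (illustrative numbers only): with
# last_size=100 and a raw size of 150, the eased side length is
# (150 - 100) // 5 + 100 = 110, i.e. one fifth of the way to the new value.
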
def get_smoothened_boxes(boxes, T):
    """Smooth boxes with a T-frame boxcar (moving average) along axis 0.

    Near the end of the sequence the window is clamped to the last T frames.
    Smoothing happens in place, so later windows see already-smoothed rows.
    """
    for i in range(len(boxes)):
        if i + T > len(boxes):
            window = boxes[len(boxes) - T :]
        else:
            window = boxes[i : i + T]
        boxes[i] = np.mean(window, axis=0)
    return boxes

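# A minimal sketch of the boxcar above on toy one-column "boxes" (values are
# illustrative only; note that because smoothing is in place, the clamped
# final window averages already-smoothed rows):
#
#   >>> get_smoothened_boxes(np.array([[0.0], [10.0], [20.0], [30.0]]), T=2)
#   array([[ 5. ],
#          [15. ],
#          [25. ],
#          [27.5]])
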
def face_detect(images, pads):
    detector = face_detection.FaceAlignment(face_detection.LandmarksType._2D, flip_input=False, device=device)

    batch_size = 32 if device == "cuda" else 4
    print("face detect batch size:", batch_size)
    # Run detection in batches; on a CUDA OOM, halve the batch size and retry.
    while True:
        predictions = []
        try:
            for i in tqdm(range(0, len(images), batch_size)):
                predictions.extend(detector.get_detections_for_batch(np.array(images[i : i + batch_size])))
        except RuntimeError:
            if batch_size == 1:
                raise RuntimeError("Image too big to run face detection on GPU. Please use the --resize_factor argument")
            batch_size //= 2
            print("Recovering from OOM error; new batch size: {}".format(batch_size))
            continue
        break

    results = []
    pady1, pady2, padx1, padx2 = pads
    for rect, image in zip(predictions, images):
        if rect is None:
            os.makedirs(".temp", exist_ok=True)  # ensure the debug dir exists
            cv2.imwrite(".temp/faulty_frame.jpg", image)
            raise ValueError("Face not detected! Ensure the video contains a face in all the frames.")

        # Apply the user-supplied padding, clamped to the frame.
        y1 = max(0, rect[1] - pady1)
        y2 = min(image.shape[0], rect[3] + pady2)
        x1 = max(0, rect[0] - padx1)
        x2 = min(image.shape[1], rect[2] + padx2)

        # Widen the box by half its size on every side, then square it.
        y_gap, x_gap = (y2 - y1) // 2, (x2 - x1) // 2
        coords_ = [y1 - y_gap, y2 + y_gap, x1 - x_gap, x2 + x_gap]

        _, coords = get_square_coords(coords_, image)

        # Clamp the squared crop back inside the frame.
        y1, y2, x1, x2 = coords
        y1 = max(0, y1)
        y2 = min(image.shape[0], y2)
        x1 = max(0, x1)
        x2 = min(image.shape[1], x2)

        results.append([x1, y1, x2, y2])

    print("Number of frames cropped: {}".format(len(results)))
    print("First coords: {}".format(results[0]))
    boxes = np.array(results)
    boxes = get_smoothened_boxes(boxes, T=25)

    del detector
    return boxes

def add_black(imgs):
    """Stack a 100 px black band above and a 20 px band below each frame."""
    for i in range(len(imgs)):
        top = np.zeros((100, imgs[i].shape[1], 3), dtype=np.uint8)
        bottom = np.zeros((20, imgs[i].shape[1], 3), dtype=np.uint8)
        imgs[i] = cv2.vconcat([top, imgs[i], bottom])
    return imgs

def preprocess(video_dir="./assets/videos", save_dir="./assets/coords"):
    os.makedirs(save_dir, exist_ok=True)  # ensure the output dir exists
    all_videos = natsorted(glob.glob(os.path.join(video_dir, "*.mp4")))
    for video_path in all_videos:
        video_stream = cv2.VideoCapture(video_path)

        # Read every frame of the video into memory.
        full_frames = []
        while True:
            still_reading, frame = video_stream.read()
            if not still_reading:
                video_stream.release()
                break
            full_frames.append(frame)
        print("Number of frames available for inference: " + str(len(full_frames)))

        full_frames = add_black(full_frames)
        coords = face_detect(full_frames, pads=(0, 0, 0, 0))
        np.savez_compressed(os.path.join(save_dir, os.path.splitext(os.path.basename(video_path))[0]), coords=coords)

def load_from_npz(video_name, save_dir="./assets/coords"):
    """Load the cached per-frame crop boxes for `video_name` (no extension)."""
    npz = np.load(os.path.join(save_dir, video_name + ".npz"))
    return npz["coords"]

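# Hedged usage sketch: after preprocess() has written ./assets/coords/<name>.npz,
# the smoothed boxes can be read back per frame. "clip" is a hypothetical video
# name used purely for illustration:
#
#   boxes = load_from_npz("clip")           # shape (num_frames, 4): [x1, y1, x2, y2]
#   x1, y1, x2, y2 = boxes[0].astype(int)   # integer crop for the first frame
#   face = frame[y1:y2, x1:x2]              # assuming `frame` is the matching image
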
if __name__ == "__main__":
    preprocess()
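# Running this file directly caches crop coords for every .mp4 under
# ./assets/videos into ./assets/coords, one compressed .npz per video.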