| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
| | from vhap.util.log import get_logger |
| |
|
| | from typing import Literal |
| | from tqdm import tqdm |
| |
|
| | import face_alignment |
| | import numpy as np |
| | import matplotlib.path as mpltPath |
| |
|
| | from fdlite import ( |
| | FaceDetection, |
| | FaceLandmark, |
| | face_detection_to_roi, |
| | IrisLandmark, |
| | iris_roi_from_face_landmarks, |
| | ) |
| |
|
| | logger = get_logger(__name__) |
| |
|
| |
|
class LandmarkDetectorFA:
    """Detect 2D face landmarks (and optionally iris landmarks) for a dataset.

    Face boxes and the 68 face landmarks come from the ``face_alignment``
    package; iris landmarks come from the ``fdlite`` models. Coordinates are
    divided by the image width/height before being stored, and missing
    detections are encoded with ``-1``.
    """

    IMAGE_FILE_NAME = "image_0000.png"
    LMK_FILE_NAME = "keypoints_static_0000.json"

    def __init__(
        self,
        face_detector: Literal["sfd", "blazeface"] = "sfd",
        device: str = "cuda",
    ):
        """Build the underlying ``face_alignment.FaceAlignment`` predictor.

        :param face_detector: name of the face detector backend forwarded to
            ``face_alignment``.
        :param device: device string forwarded to ``face_alignment``. Defaults
            to ``"cuda"``, the value that used to be hard-coded.
        """
        logger.info("Initialize FaceAlignment module...")

        self.fa = face_alignment.FaceAlignment(
            face_alignment.LandmarksType.TWO_HALF_D,
            face_detector=face_detector,
            flip_input=True,
            device=device,
        )

    def detect_single_image(self, img):
        """Detect one face in ``img`` and its 68 landmarks.

        :param img: image array; ``img.shape[:2]`` is read as (height, width)
            and the array is forwarded to ``face_alignment`` unchanged.
        :return: ``(bbox, lmks)``. ``bbox`` is the detector output reduced to at
            most one box whose entries 0-3 are divided by width/height; it is
            empty when no face was found. ``lmks`` has three columns: x / width,
            y / height and a 0/1 validity flag. When no face was found it is a
            ``(68, 3)`` array filled with ``-1``.
        """
        bbox = self.fa.face_detector.detect_from_image(img)

        if len(bbox) == 0:
            # No face: hand back the empty detection list together with the -1
            # sentinel. There is no box to normalize, so bbox[0] must not be
            # touched on this path.
            return bbox, np.zeros([68, 3]) - 1

        if len(bbox) > 1:
            # Several boxes: keep the one with the largest last entry
            # (presumably the detector confidence -- confirm against the
            # face_alignment detector in use).
            bbox = [bbox[np.argmax(np.array(bbox)[:, -1])]]

        lmks = self.fa.get_landmarks_from_image(img, detected_faces=bbox)[0]
        # Append one extra column that carries the validity flag.
        lmks = np.concatenate([lmks, np.ones_like(lmks[:, :1])], axis=1)

        # A single x or y equal to -1 marks the whole landmark set as invalid.
        has_invalid_coord = (lmks[:, :2] == -1).sum() > 0
        lmks[:, 2:] = 0.0 if has_invalid_coord else 1.0

        # Normalize pixel coordinates by the image size.
        h, w = img.shape[:2]
        lmks[:, 0] /= w
        lmks[:, 1] /= h
        bbox[0][[0, 2]] /= w
        bbox[0][[1, 3]] /= h
        return bbox, lmks

    def detect_dataset(self, dataloader):
        """Annotate every item of ``dataloader`` with 68 facial landmarks.

        Each item is expected to provide ``"timestep_id"``, ``"camera_id"`` and
        ``"rgb"`` entries; the first element of each is used.

        :param dataloader: iterable of such items.
        :return: ``(landmarks, bboxes)``, two dicts indexed as
            ``[camera_id][timestep_id]``. A missing face is stored as a
            length-5 array of ``-1`` in ``bboxes``.
        """
        landmarks = {}
        bboxes = {}

        logger.info("Begin annotating landmarks...")
        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]

            logger.info(
                f"Annotate facial landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )
            img = item["rgb"][0].numpy()

            bbox, lmks = self.detect_single_image(img)

            if len(bbox) == 0:
                logger.error(
                    f"No bbox found for frame: {timestep_id}, camera: {camera_id}. Setting landmarks to all -1."
                )
                bbox_entry = np.zeros(5) - 1
            else:
                bbox_entry = bbox[0]

            landmarks.setdefault(camera_id, {})[timestep_id] = lmks
            bboxes.setdefault(camera_id, {})[timestep_id] = bbox_entry
        return landmarks, bboxes

    @staticmethod
    def _detect_iris_single_image(
        img, detect_faces, detect_face_landmarks, detect_iris_landmarks
    ):
        """Run the fdlite face -> face landmarks -> iris pipeline on one image.

        :return: float32 array with one ``[x / width, y / height, 1.0]`` row per
            eye (two rows), or ``None`` if any stage fails.
        """
        height, width = img.shape[:2]
        img_size = (width, height)

        face_detections = detect_faces(img)
        if len(face_detections) != 1:
            logger.error("Empty iris landmarks (type 1)")
            return None
        (face_detection,) = face_detections

        try:
            face_roi = face_detection_to_roi(face_detection, img_size)
        except ValueError:
            logger.error("Empty iris landmarks (type 2)")
            return None

        face_landmarks = detect_face_landmarks(img, face_roi)
        if len(face_landmarks) == 0:
            logger.error("Empty iris landmarks (type 3)")
            return None

        iris_rois = iris_roi_from_face_landmarks(face_landmarks, img_size)
        if len(iris_rois) != 2:
            logger.error("Empty iris landmarks (type 4)")
            return None

        lmks = []
        # The two eye ROIs are visited in reversed order; annotate_landmarks
        # relies on this when it splits the rows into right/left iris.
        for iris_roi in iris_rois[::-1]:
            try:
                iris_landmarks = detect_iris_landmarks(img, iris_roi).iris[0:1]
            except np.linalg.LinAlgError:
                # Give up on the whole image: a partial result must not be
                # stored in place of the None marker.
                logger.error("Failed to get iris landmarks")
                return None

            for landmark in iris_landmarks:
                lmks.append([landmark.x * width, landmark.y * height, 1.0])

        if len(lmks) != 2:
            # Downstream code slices exactly one row per eye.
            logger.error("Failed to get iris landmarks")
            return None

        lmks = np.array(lmks, dtype=np.float32)
        # NOTE(review): this division undoes the multiplication above; both are
        # kept so the stored values stay numerically identical to before.
        lmks[:, 0] /= width
        lmks[:, 1] /= height
        return lmks

    def annotate_iris_landmarks(self, dataloader):
        """Annotate every item of ``dataloader`` with 2 iris landmarks.

        :param dataloader: iterable of items providing ``"timestep_id"``,
            ``"camera_id"`` and ``"rgb"``.
        :return: dict indexed as ``[timestep_id][camera_id]`` (note: the
            opposite order of :meth:`detect_dataset`). Each value is a
            ``(2, 3)`` float32 array or ``None`` when detection failed.
        """
        # Instantiate the fdlite models once for the whole pass.
        detect_faces = FaceDetection()
        detect_face_landmarks = FaceLandmark()
        detect_iris_landmarks = IrisLandmark()

        landmarks = {}

        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]
            logger.info(
                f"Annotate iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )

            img = item["rgb"][0].numpy()

            landmarks.setdefault(timestep_id, {})[camera_id] = (
                self._detect_iris_single_image(
                    img, detect_faces, detect_face_landmarks, detect_iris_landmarks
                )
            )

        return landmarks

    def iris_consistency(self, lm_iris, lm_eye):
        """Check whether an iris landmark lies inside the eye contour.

        :param lm_iris: iris landmarks; only the first two columns are used and
            only the result for the first row is returned.
        :param lm_eye: eye contour landmarks; the first two columns are used as
            the polygon vertices.
        :return: truth value of "first iris point is inside the eye polygon".
        """
        polygon_eye = mpltPath.Path(lm_eye[:, :2])
        valid = polygon_eye.contains_points(lm_iris[:, :2])
        return valid[0]

    def annotate_landmarks(self, dataloader, add_iris=False):
        """Annotate face (and optionally iris) landmarks and save them per camera.

        For every camera, the per-timestep results are stacked and written with
        ``np.savez`` to ``dataloader.dataset.get_property_path(
        "landmark2d/face-alignment", camera_id=...)`` under the keys
        ``"bounding_box"``, ``"face_landmark_2d"`` and, when ``add_iris`` is
        set, ``"iris_landmark_2d"``.

        :param dataloader: loader that is iterated once for faces and, when
            ``add_iris`` is set, a second time for irises.
        :param add_iris: also detect iris landmarks and discard those that are
            missing or fall outside the detected eye contours.
        :return: ``None``; results are written to disk.
        """
        lmks_face, bboxes_faces = self.detect_dataset(dataloader)

        lmks_iris = None
        if add_iris:
            lmks_iris = self.annotate_iris_landmarks(dataloader)

            # Check consistency of iris landmarks and facial keypoints.
            for camera_id, lmk_face_camera in lmks_face.items():
                for timestep_id, lmks_face_i in lmk_face_camera.items():
                    bboxes_face_i = bboxes_faces[camera_id][timestep_id]
                    # Iris results are indexed [timestep_id][camera_id].
                    lmks_iris_i = lmks_iris.get(timestep_id, {}).get(camera_id)

                    # detect_dataset marks a missing face with an all -1 box.
                    face_missing = bboxes_face_i is None or bool(
                        np.all(np.asarray(bboxes_face_i) == -1)
                    )

                    if face_missing:
                        logger.error(
                            f"Discarding iris landmarks because no face landmark is available for timestep: {timestep_id}, camera: {camera_id}"
                        )
                    elif lmks_iris_i is None:
                        logger.error(
                            f"No iris landmarks detected for timestep: {timestep_id}, camera: {camera_id}"
                        )
                    else:
                        # Eye contours are rows 36-41 and 42-47 of the 68 face
                        # landmarks; iris rows are [right, left].
                        left_face = lmks_face_i[36:42]
                        right_face = lmks_face_i[42:48]

                        right_iris = lmks_iris_i[:1]
                        left_iris = lmks_iris_i[1:]

                        if self.iris_consistency(
                            left_iris, left_face
                        ) and self.iris_consistency(right_iris, right_face):
                            continue  # iris landmarks are usable as they are
                        logger.error(
                            f"Inconsistent iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
                        )

                    # Every path that reaches this point discards the iris.
                    lmks_iris.setdefault(timestep_id, {})[camera_id] = (
                        np.zeros([2, 3]) - 1
                    )

        # Stack the per-timestep results of each camera and save them.
        for camera_id, lmk_face_camera in lmks_face.items():
            timestep_ids = list(lmk_face_camera)

            lmk_dict = {
                "bounding_box": [
                    bboxes_faces[camera_id][t][None] for t in timestep_ids
                ],
                "face_landmark_2d": [lmk_face_camera[t][None] for t in timestep_ids],
            }
            if add_iris:
                iris_landmark_2d = [
                    lmks_iris[t][camera_id][None] for t in timestep_ids
                ]
                if len(iris_landmark_2d) > 0:
                    lmk_dict["iris_landmark_2d"] = iris_landmark_2d

            # Empty lists are passed through untouched, as before.
            lmk_dict = {
                k: np.concatenate(v, axis=0) if len(v) > 0 else v
                for k, v in lmk_dict.items()
            }

            out_path = dataloader.dataset.get_property_path(
                "landmark2d/face-alignment", camera_id=camera_id
            )
            logger.info(f"Saving landmarks to: {out_path}")
            # exist_ok avoids the check-then-create race of a separate exists().
            out_path.parent.mkdir(parents=True, exist_ok=True)
            np.savez(out_path, **lmk_dict)
| |
|
| |
|
if __name__ == "__main__":
    import tyro
    from tqdm import tqdm
    from torch.utils.data import DataLoader
    from vhap.config.base import DataConfig, import_module

    # Parse the data configuration from the command line.
    cfg = tyro.cli(DataConfig)

    # import_module(cfg._target) yields the callable that builds the dataset.
    dataset_factory = import_module(cfg._target)
    dataset = dataset_factory(
        cfg=cfg,
        img_to_tensor=False,
        batchify_all_views=True,
    )
    # Only the first two items are kept for this script run.
    dataset.items = dataset.items[:2]

    dataloader = DataLoader(dataset, num_workers=4, shuffle=False, batch_size=1)

    # Face landmarks only: add_iris keeps its default of False.
    detector = LandmarkDetectorFA()
    detector.annotate_landmarks(dataloader)
| |
|