from time import perf_counter
from typing import List, Optional, Tuple

import cv2
import mediapipe as mp
import numpy as np
import onnxruntime
import torch
import torch.nn as nn
import torchvision
from mediapipe.tasks.python.components.containers.bounding_box import BoundingBox
from mediapipe.tasks.python.components.containers.category import Category
from mediapipe.tasks.python.components.containers.detections import Detection
from torchvision import transforms

from inference.core.entities.requests.gaze import GazeDetectionInferenceRequest
from inference.core.entities.responses.gaze import (
    GazeDetectionInferenceResponse,
    GazeDetectionPrediction,
)
from inference.core.entities.responses.inference import FaceDetectionPrediction, Point
from inference.core.env import (
    GAZE_MAX_BATCH_SIZE,
    MODEL_CACHE_DIR,
    REQUIRED_ONNX_PROVIDERS,
    TENSORRT_CACHE_PATH,
)
from inference.core.exceptions import OnnxProviderNotAvailable
from inference.core.models.roboflow import OnnxRoboflowCoreModel
from inference.core.utils.image_utils import load_image_rgb
from inference.models.gaze.l2cs import L2CS


class Gaze(OnnxRoboflowCoreModel):
| """Roboflow ONNX Gaze model. |
| |
| This class is responsible for handling the ONNX Gaze model, including |
| loading the model, preprocessing the input, and performing inference. |
| |
| Attributes: |
| gaze_onnx_session (onnxruntime.InferenceSession): ONNX Runtime session for gaze detection inference. |
| """ |

    def __init__(self, *args, **kwargs):
        """Initializes the Gaze model with the given arguments and keyword arguments."""
        t1 = perf_counter()
        super().__init__(*args, **kwargs)
        self.log("Creating inference sessions")

        # ONNX Runtime session for the L2CS gaze model; prefers TensorRT,
        # then CUDA, then falls back to CPU execution.
        self.gaze_onnx_session = onnxruntime.InferenceSession(
            self.cache_file("L2CSNet_gaze360_resnet50_90bins.onnx"),
            providers=[
                (
                    "TensorrtExecutionProvider",
                    {
                        "trt_engine_cache_enable": True,
                        "trt_engine_cache_path": TENSORRT_CACHE_PATH,
                    },
                ),
                "CUDAExecutionProvider",
                "CPUExecutionProvider",
            ],
        )

        if REQUIRED_ONNX_PROVIDERS:
            available_providers = onnxruntime.get_available_providers()
            for provider in REQUIRED_ONNX_PROVIDERS:
                if provider not in available_providers:
                    raise OnnxProviderNotAvailable(
                        f"Required ONNX Execution Provider {provider} is not available. Check that you are using the correct docker image on a supported device."
                    )

        # MediaPipe face detector used to locate faces before gaze estimation.
        self.face_detector = mp.tasks.vision.FaceDetector.create_from_options(
            mp.tasks.vision.FaceDetectorOptions(
                base_options=mp.tasks.BaseOptions(
                    model_asset_path=self.cache_file("mediapipe_face_detector.tflite")
                ),
                running_mode=mp.tasks.vision.RunningMode.IMAGE,
            )
        )

        # Preprocessing for the gaze model: Resize(448) scales the 224x224
        # face crop up to the 448x448 input L2CS expects, then normalizes
        # with ImageNet statistics.
        self._gaze_transformations = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Resize(448),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )
        self.task_type = "gaze-detection"
        self.log(f"GAZE model loaded in {perf_counter() - t1:.2f} seconds")

    def _crop_face_img(self, np_img: np.ndarray, face: Detection) -> np.ndarray:
        """Extract the facial area from an image.

        Args:
            np_img (np.ndarray): The numpy image.
            face (mediapipe.tasks.python.components.containers.detections.Detection): The detected face.

        Returns:
            np.ndarray: The cropped face image, resized to 224x224.
        """
        # Clamp the box to the image bounds; MediaPipe can return boxes that
        # extend slightly past the frame.
        bbox = face.bounding_box
        x_min = max(bbox.origin_x, 0)
        y_min = max(bbox.origin_y, 0)
        x_max = min(bbox.origin_x + bbox.width, np_img.shape[1])
        y_max = min(bbox.origin_y + bbox.height, np_img.shape[0])
        face_img = np_img[y_min:y_max, x_min:x_max, :]
        face_img = cv2.resize(face_img, (224, 224))
        return face_img

    def _detect_gaze(self, np_imgs: List[np.ndarray]) -> List[Tuple[float, float]]:
        """Detect gaze directions in cropped face images.

        Args:
            np_imgs (List[np.ndarray]): The numpy image list; each image is a cropped facial image.

        Returns:
            List[Tuple[float, float]]: Yaw (radians) and pitch (radians) for each face image.
        """
        ret = []
        # Run inference in batches of at most GAZE_MAX_BATCH_SIZE images.
        for i in range(0, len(np_imgs), GAZE_MAX_BATCH_SIZE):
            img_batch = []
            for j in range(i, min(len(np_imgs), i + GAZE_MAX_BATCH_SIZE)):
                # The transform returns a CxHxW torch tensor; convert it to a
                # 1xCxHxW float32 numpy array for ONNX Runtime.
                img = self._gaze_transformations(np_imgs[j])
                img = np.expand_dims(img, axis=0).astype(np.float32)
                img_batch.append(img)

            img_batch = np.concatenate(img_batch, axis=0)
            onnx_input_image = {self.gaze_onnx_session.get_inputs()[0].name: img_batch}
            yaw, pitch = self.gaze_onnx_session.run(None, onnx_input_image)

            for j in range(len(img_batch)):
                ret.append((yaw[j], pitch[j]))

        return ret
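
    # A minimal sketch of what _detect_gaze consumes and produces (the crop
    # variables are hypothetical 224x224 RGB face crops):
    #
    #     gazes = self._detect_gaze([crop_a, crop_b])
    #     yaw_a, pitch_a = gazes[0]  # angles in radians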

    def _make_response(
        self,
        faces: List[Detection],
        gazes: List[Tuple[float, float]],
        imgW: int,
        imgH: int,
        time_total: float,
        time_face_det: Optional[float] = None,
        time_gaze_det: Optional[float] = None,
    ) -> GazeDetectionInferenceResponse:
        """Prepare a response object from detected faces and corresponding gazes.

        Args:
            faces (List[Detection]): The detected faces.
            gazes (List[Tuple[float, float]]): The detected gazes (yaw, pitch).
            imgW (int): The width (px) of the original image.
            imgH (int): The height (px) of the original image.
            time_total (float): The total processing time.
            time_face_det (Optional[float]): The face detection processing time.
            time_gaze_det (Optional[float]): The gaze detection processing time.

        Returns:
            GazeDetectionInferenceResponse: The response object including the detected faces and gazes info.
        """
        predictions = []
        for face, gaze in zip(faces, gazes):
            # Convert normalized keypoint coordinates to pixel coordinates,
            # clamped to the image bounds.
            landmarks = []
            for keypoint in face.keypoints:
                x = min(max(int(keypoint.x * imgW), 0), imgW - 1)
                y = min(max(int(keypoint.y * imgH), 0), imgH - 1)
                landmarks.append(Point(x=x, y=y))

            bbox = face.bounding_box
            x_center = bbox.origin_x + bbox.width / 2
            y_center = bbox.origin_y + bbox.height / 2
            score = face.categories[0].score

            prediction = GazeDetectionPrediction(
                face=FaceDetectionPrediction(
                    x=x_center,
                    y=y_center,
                    width=bbox.width,
                    height=bbox.height,
                    confidence=score,
                    class_name="face",
                    landmarks=landmarks,
                ),
                yaw=gaze[0],
                pitch=gaze[1],
            )
            predictions.append(prediction)

        response = GazeDetectionInferenceResponse(
            predictions=predictions,
            time=time_total,
            time_face_det=time_face_det,
            time_gaze_det=time_gaze_det,
        )
        return response

    def get_infer_bucket_file_list(self) -> List[str]:
        """Gets the list of files required for inference.

        Returns:
            List[str]: The list of file names.
        """
        return [
            "mediapipe_face_detector.tflite",
            "L2CSNet_gaze360_resnet50_90bins.onnx",
        ]

    def infer_from_request(
        self, request: GazeDetectionInferenceRequest
    ) -> List[GazeDetectionInferenceResponse]:
        """Detect faces and gazes in image(s).

        Args:
            request (GazeDetectionInferenceRequest): The request object containing the image(s).

        Returns:
            List[GazeDetectionInferenceResponse]: The list of response objects containing the faces and corresponding gazes.
        """
        if isinstance(request.image, list):
            if len(request.image) > GAZE_MAX_BATCH_SIZE:
                raise ValueError(
                    f"The maximum number of images that can be inferred with gaze detection at one time is {GAZE_MAX_BATCH_SIZE}"
                )
            imgs = request.image
        else:
            imgs = [request.image]

        time_total = perf_counter()

        num_img = len(imgs)
        np_imgs = [load_image_rgb(img) for img in imgs]

        # Face detection: run MediaPipe per image, or treat the whole image as
        # a single full-frame face when detection is disabled.
        time_face_det = perf_counter()
        faces = []
        for np_img in np_imgs:
            if request.do_run_face_detection:
                mp_img = mp.Image(
                    image_format=mp.ImageFormat.SRGB, data=np_img.astype(np.uint8)
                )
                faces_per_img = self.face_detector.detect(mp_img).detections
            else:
                faces_per_img = [
                    Detection(
                        bounding_box=BoundingBox(
                            origin_x=0,
                            origin_y=0,
                            width=np_img.shape[1],
                            height=np_img.shape[0],
                        ),
                        categories=[Category(score=1.0, category_name="face")],
                        keypoints=[],
                    )
                ]
            faces.append(faces_per_img)
        time_face_det = (perf_counter() - time_face_det) / num_img

        # Gaze detection on the cropped (or whole, resized) face images.
        time_gaze_det = perf_counter()
        face_imgs = []
        for i, np_img in enumerate(np_imgs):
            if request.do_run_face_detection:
                face_imgs.extend(
                    [self._crop_face_img(np_img, face) for face in faces[i]]
                )
            else:
                face_imgs.append(cv2.resize(np_img, (224, 224)))
        gazes = self._detect_gaze(face_imgs)
        time_gaze_det = (perf_counter() - time_gaze_det) / num_img

        time_total = (perf_counter() - time_total) / num_img

        # Assemble one response per input image, advancing through the flat
        # list of gazes by the number of faces found in each image.
        response = []
        idx_gaze = 0
        for i in range(len(np_imgs)):
            imgH, imgW, _ = np_imgs[i].shape
            faces_per_img = faces[i]
            gazes_per_img = gazes[idx_gaze : idx_gaze + len(faces_per_img)]
            idx_gaze += len(faces_per_img)
            response.append(
                self._make_response(
                    faces_per_img,
                    gazes_per_img,
                    imgW,
                    imgH,
                    time_total,
                    time_face_det=time_face_det,
                    time_gaze_det=time_gaze_det,
                )
            )

        return response
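
    # A hedged usage sketch. The request construction below is an assumption
    # (the image payload format and model_id value are illustrative);
    # `do_run_face_detection` is the flag this method reads.
    #
    #     model = Gaze(model_id="gaze/L2CS")
    #     request = GazeDetectionInferenceRequest(
    #         image={"type": "url", "value": "https://example.com/face.jpg"},
    #         do_run_face_detection=True,
    #     )
    #     responses = model.infer_from_request(request)
    #     yaw = responses[0].predictions[0].yaw  # radians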


class L2C2Wrapper(L2CS):
    """Roboflow L2CS gaze detection model wrapper.

    This class is responsible for converting an L2CS model to an ONNX model.
    It is ONLY intended for internal usage.

    Workflow:
        After training an L2CS model, create an instance of this wrapper class,
        load the trained weights file, and save it as an ONNX model; see the
        sketch after this docstring.
    """

    def __init__(self):
        self.device = torch.device("cpu")
        self.num_bins = 90
        # ResNet-50 configuration: Bottleneck blocks with [3, 4, 6, 3] layers,
        # plus num_bins-way yaw and pitch classification heads.
        super().__init__(
            torchvision.models.resnet.Bottleneck, [3, 4, 6, 3], self.num_bins
        )
        self._gaze_softmax = nn.Softmax(dim=1)
        self._gaze_idx_tensor = torch.arange(self.num_bins, dtype=torch.float32).to(
            self.device
        )

    def forward(self, x):
        # Repeat the bin-index vector once per item in the batch.
        idx_tensor = torch.stack(
            [self._gaze_idx_tensor for i in range(x.shape[0])], dim=0
        )
        gaze_yaw, gaze_pitch = super().forward(x)

        # Take the expected bin index under the softmax distribution, map bin
        # units to degrees (90 bins of 4 degrees covering [-180, 180)), then
        # convert to radians.
        yaw_predicted = self._gaze_softmax(gaze_yaw)
        yaw_radian = (
            (torch.sum(yaw_predicted * idx_tensor, dim=1) * 4 - 180) * np.pi / 180
        )

        pitch_predicted = self._gaze_softmax(gaze_pitch)
        pitch_radian = (
            (torch.sum(pitch_predicted * idx_tensor, dim=1) * 4 - 180) * np.pi / 180
        )

        return yaw_radian, pitch_radian
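
    # Sanity check on the mapping: a uniform distribution over the 90 bins has
    # expected index mean(0..89) = 44.5, which maps to 44.5 * 4 - 180 = -2
    # degrees, i.e. about -0.035 radians.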

    def load_L2CS_model(
        self,
        file_path=f"{MODEL_CACHE_DIR}/gaze/L2CS/L2CSNet_gaze360_resnet50_90bins.pkl",
    ):
        # Load trained weights onto the configured device.
        super().load_state_dict(torch.load(file_path, map_location=self.device))
        super().to(self.device)

    def saveas_ONNX_model(
        self,
        file_path=f"{MODEL_CACHE_DIR}/gaze/L2CS/L2CSNet_gaze360_resnet50_90bins.onnx",
    ):
        # Export with a dynamic batch dimension; the 1x3x448x448 dummy input
        # matches the preprocessing used by the Gaze model above.
        dummy_input = torch.randn(1, 3, 448, 448)
        dynamic_axes = {
            "input": {0: "batch_size"},
            "output_yaw": {0: "batch_size"},
            "output_pitch": {0: "batch_size"},
        }
        torch.onnx.export(
            self,
            dummy_input,
            file_path,
            input_names=["input"],
            output_names=["output_yaw", "output_pitch"],
            dynamic_axes=dynamic_axes,
            verbose=False,
        )
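
# A hedged sanity check for the exported model with ONNX Runtime (the path
# shown is the default used above):
#
#     sess = onnxruntime.InferenceSession(
#         f"{MODEL_CACHE_DIR}/gaze/L2CS/L2CSNet_gaze360_resnet50_90bins.onnx"
#     )
#     x = np.random.randn(2, 3, 448, 448).astype(np.float32)
#     yaw, pitch = sess.run(None, {"input": x})
#     assert yaw.shape == (2,) and pitch.shape == (2,)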