"""Gaze detection core model: MediaPipe face detector + L2CS gaze-estimation ONNX model."""
| import math | |
| from time import perf_counter | |
| from typing import List, Optional, Tuple, Union | |
| import cv2 | |
| import mediapipe as mp | |
| import numpy as np | |
| import onnxruntime | |
| import torch | |
| import torch.nn as nn | |
| import torchvision | |
| from mediapipe.tasks.python.components.containers.bounding_box import BoundingBox | |
| from mediapipe.tasks.python.components.containers.category import Category | |
| from mediapipe.tasks.python.components.containers.detections import Detection | |
| from torchvision import transforms | |
| from inference.core.entities.requests.gaze import GazeDetectionInferenceRequest | |
| from inference.core.entities.responses.gaze import ( | |
| GazeDetectionInferenceResponse, | |
| GazeDetectionPrediction, | |
| ) | |
| from inference.core.entities.responses.inference import FaceDetectionPrediction, Point | |
| from inference.core.env import ( | |
| GAZE_MAX_BATCH_SIZE, | |
| MODEL_CACHE_DIR, | |
| REQUIRED_ONNX_PROVIDERS, | |
| TENSORRT_CACHE_PATH, | |
| ) | |
| from inference.core.exceptions import OnnxProviderNotAvailable | |
| from inference.core.models.roboflow import OnnxRoboflowCoreModel | |
| from inference.core.utils.image_utils import load_image_rgb | |
| from inference.models.gaze.l2cs import L2CS | |
class Gaze(OnnxRoboflowCoreModel):
    """Roboflow ONNX Gaze model.

    This class is responsible for handling the ONNX Gaze model, including
    loading the model, preprocessing the input, and performing inference.

    Attributes:
        gaze_onnx_session (onnxruntime.InferenceSession): ONNX Runtime session for gaze detection inference.
        face_detector: MediaPipe TFLite face detector used to localize faces.
        task_type (str): Fixed task identifier, "gaze-detection".
    """

    def __init__(self, *args, **kwargs):
        """Initializes the Gaze with the given arguments and keyword arguments."""
        t1 = perf_counter()
        super().__init__(*args, **kwargs)
        # Create an ONNX Runtime Session with a list of execution providers in
        # priority order. ORT attempts to load providers until one is
        # successful. This keeps the code across devices identical.
        self.log("Creating inference sessions")
        # TODO: convert face detector (TensorflowLite) to ONNX model
        self.gaze_onnx_session = onnxruntime.InferenceSession(
            self.cache_file("L2CSNet_gaze360_resnet50_90bins.onnx"),
            providers=[
                (
                    "TensorrtExecutionProvider",
                    {
                        "trt_engine_cache_enable": True,
                        "trt_engine_cache_path": TENSORRT_CACHE_PATH,
                    },
                ),
                "CUDAExecutionProvider",
                "CPUExecutionProvider",
            ],
        )
        if REQUIRED_ONNX_PROVIDERS:
            available_providers = onnxruntime.get_available_providers()
            for provider in REQUIRED_ONNX_PROVIDERS:
                if provider not in available_providers:
                    # FIX: corrected "availble" typo in the error message
                    raise OnnxProviderNotAvailable(
                        f"Required ONNX Execution Provider {provider} is not available. Check that you are using the correct docker image on a supported device."
                    )
        # init face detector
        self.face_detector = mp.tasks.vision.FaceDetector.create_from_options(
            mp.tasks.vision.FaceDetectorOptions(
                base_options=mp.tasks.BaseOptions(
                    model_asset_path=self.cache_file("mediapipe_face_detector.tflite")
                ),
                running_mode=mp.tasks.vision.RunningMode.IMAGE,
            )
        )
        # additional settings for gaze detection: resize to the 448px input the
        # L2CS network expects, then ImageNet mean/std normalization
        self._gaze_transformations = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Resize(448),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )
        self.task_type = "gaze-detection"
        self.log(f"GAZE model loaded in {perf_counter() - t1:.2f} seconds")

    def _crop_face_img(self, np_img: np.ndarray, face: Detection) -> np.ndarray:
        """Extract facial area in an image.

        Args:
            np_img (np.ndarray): The numpy image (H, W, C).
            face (mediapipe.tasks.python.components.containers.detections.Detection): The detected face.

        Returns:
            np.ndarray: Cropped face image, resized to 224x224.
        """
        # extract face area; clamp to the image bounds since the detector may
        # return a bounding box that slightly exceeds the frame, which would
        # otherwise produce an empty/invalid slice
        bbox = face.bounding_box
        x_min = max(int(bbox.origin_x), 0)
        y_min = max(int(bbox.origin_y), 0)
        x_max = min(int(bbox.origin_x + bbox.width), np_img.shape[1])
        y_max = min(int(bbox.origin_y + bbox.height), np_img.shape[0])
        face_img = np_img[y_min:y_max, x_min:x_max, :]
        face_img = cv2.resize(face_img, (224, 224))
        return face_img

    def _detect_gaze(self, np_imgs: List[np.ndarray]) -> List[Tuple[float, float]]:
        """Detect gazes in cropped face images.

        Args:
            np_imgs (List[np.ndarray]): The numpy image list, each image is a cropped facial image.

        Returns:
            List[Tuple[float, float]]: Yaw (radian) and Pitch (radian), one pair per input image.
        """
        ret = []
        # process in chunks so a single ONNX run never exceeds the batch limit
        for i in range(0, len(np_imgs), GAZE_MAX_BATCH_SIZE):
            img_batch = []
            for j in range(i, min(len(np_imgs), i + GAZE_MAX_BATCH_SIZE)):
                img = self._gaze_transformations(np_imgs[j])
                img = np.expand_dims(img, axis=0).astype(np.float32)
                img_batch.append(img)
            img_batch = np.concatenate(img_batch, axis=0)
            onnx_input_image = {self.gaze_onnx_session.get_inputs()[0].name: img_batch}
            # model exports two outputs: per-sample yaw and pitch (radians)
            yaw, pitch = self.gaze_onnx_session.run(None, onnx_input_image)
            for j in range(len(img_batch)):
                ret.append((yaw[j], pitch[j]))
        return ret

    def _make_response(
        self,
        faces: List[Detection],
        gazes: List[Tuple[float, float]],
        imgW: int,
        imgH: int,
        time_total: float,
        time_face_det: Optional[float] = None,
        time_gaze_det: Optional[float] = None,
    ) -> GazeDetectionInferenceResponse:
        """Prepare response object from detected faces and corresponding gazes.

        Args:
            faces (List[Detection]): The detected faces.
            gazes (List[tuple(float, float)]): The detected gazes (yaw, pitch).
            imgW (int): The width (px) of original image.
            imgH (int): The height (px) of original image.
            time_total (float): The total processing time (seconds).
            time_face_det (Optional[float]): The face-detection time (seconds), if measured.
            time_gaze_det (Optional[float]): The gaze-detection time (seconds), if measured.

        Returns:
            GazeDetectionInferenceResponse: The response object including the detected faces and gazes info.
        """
        predictions = []
        for face, gaze in zip(faces, gazes):
            landmarks = []
            for keypoint in face.keypoints:
                # keypoints are normalized [0, 1]; convert to pixel coords and
                # clamp inside the image
                x = min(max(int(keypoint.x * imgW), 0), imgW - 1)
                y = min(max(int(keypoint.y * imgH), 0), imgH - 1)
                landmarks.append(Point(x=x, y=y))
            bbox = face.bounding_box
            x_center = bbox.origin_x + bbox.width / 2
            y_center = bbox.origin_y + bbox.height / 2
            score = face.categories[0].score
            prediction = GazeDetectionPrediction(
                face=FaceDetectionPrediction(
                    x=x_center,
                    y=y_center,
                    width=bbox.width,
                    height=bbox.height,
                    confidence=score,
                    class_name="face",
                    landmarks=landmarks,
                ),
                yaw=gaze[0],
                pitch=gaze[1],
            )
            predictions.append(prediction)
        response = GazeDetectionInferenceResponse(
            predictions=predictions,
            time=time_total,
            time_face_det=time_face_det,
            time_gaze_det=time_gaze_det,
        )
        return response

    def get_infer_bucket_file_list(self) -> List[str]:
        """Gets the list of files required for inference.

        Returns:
            List[str]: The list of file names.
        """
        return [
            "mediapipe_face_detector.tflite",
            "L2CSNet_gaze360_resnet50_90bins.onnx",
        ]

    def infer_from_request(
        self, request: GazeDetectionInferenceRequest
    ) -> List[GazeDetectionInferenceResponse]:
        """Detect faces and gazes in image(s).

        Args:
            request (GazeDetectionInferenceRequest): The request object containing the image(s).

        Returns:
            List[GazeDetectionInferenceResponse]: The list of response objects containing the faces and corresponding gazes, one per input image.
        """
        if isinstance(request.image, list):
            if len(request.image) > GAZE_MAX_BATCH_SIZE:
                raise ValueError(
                    f"The maximum number of images that can be inferred with gaze detection at one time is {GAZE_MAX_BATCH_SIZE}"
                )
            imgs = request.image
        else:
            imgs = [request.image]
        time_total = perf_counter()

        # load numpy images
        num_img = len(imgs)
        np_imgs = [load_image_rgb(img) for img in imgs]

        # face detection
        # TODO: face detection for batch
        time_face_det = perf_counter()
        faces = []
        for np_img in np_imgs:
            if request.do_run_face_detection:
                mp_img = mp.Image(
                    image_format=mp.ImageFormat.SRGB, data=np_img.astype(np.uint8)
                )
                faces_per_img = self.face_detector.detect(mp_img).detections
            else:
                # treat the whole image as one face covering the full frame
                faces_per_img = [
                    Detection(
                        bounding_box=BoundingBox(
                            origin_x=0,
                            origin_y=0,
                            width=np_img.shape[1],
                            height=np_img.shape[0],
                        ),
                        categories=[Category(score=1.0, category_name="face")],
                        keypoints=[],
                    )
                ]
            faces.append(faces_per_img)
        time_face_det = (perf_counter() - time_face_det) / num_img

        # gaze detection
        time_gaze_det = perf_counter()
        face_imgs = []
        for i, np_img in enumerate(np_imgs):
            if request.do_run_face_detection:
                face_imgs.extend(
                    [self._crop_face_img(np_img, face) for face in faces[i]]
                )
            else:
                face_imgs.append(cv2.resize(np_img, (224, 224)))
        gazes = self._detect_gaze(face_imgs)
        time_gaze_det = (perf_counter() - time_gaze_det) / num_img
        time_total = (perf_counter() - time_total) / num_img

        # prepare response; gazes is a flat list over all images' faces, so walk
        # it with a cursor that advances by the face count of each image
        response = []
        idx_gaze = 0
        for i in range(len(np_imgs)):
            imgH, imgW, _ = np_imgs[i].shape
            faces_per_img = faces[i]
            gazes_per_img = gazes[idx_gaze : idx_gaze + len(faces_per_img)]
            # FIX: the cursor was previously never incremented, so every image
            # after the first reused the first image's gaze slice
            idx_gaze += len(faces_per_img)
            response.append(
                self._make_response(
                    faces_per_img,
                    gazes_per_img,
                    imgW,
                    imgH,
                    time_total,
                    # FIX: these timings were computed but never returned
                    time_face_det=time_face_det,
                    time_gaze_det=time_gaze_det,
                )
            )
        return response
class L2C2Wrapper(L2CS):
    """Roboflow L2CS Gaze detection model wrapper.

    This class is responsible for converting an L2CS model to an ONNX model.
    It is ONLY intended for internal usage.

    Workflow:
        After training a L2CS model, create an instance of this wrapper class.
        Load the trained weights file, and save it as ONNX model.
    """

    def __init__(self):
        self.device = torch.device("cpu")
        # number of classification bins per angle (yaw/pitch)
        self.num_bins = 90
        # ResNet-50 backbone: Bottleneck blocks with [3, 4, 6, 3] layout
        super().__init__(
            torchvision.models.resnet.Bottleneck, [3, 4, 6, 3], self.num_bins
        )
        self._gaze_softmax = nn.Softmax(dim=1)
        # FIX: derive the bin-index tensor from self.num_bins instead of a
        # hard-coded 90, and build it with torch.arange rather than a list
        # comprehension (same values: 0.0 .. num_bins-1)
        self._gaze_idx_tensor = torch.arange(
            self.num_bins, dtype=torch.float32
        ).to(self.device)

    def forward(self, x):
        """Run the backbone and decode bin logits into yaw/pitch radians.

        Args:
            x (torch.Tensor): Batch of face images; the base L2CS forward
                returns per-bin yaw and pitch logits.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: Per-sample yaw and pitch in radians.
        """
        # replicate the bin indices per batch element (kept as an explicit
        # stack so the ONNX export has a fixed, traceable op)
        idx_tensor = torch.stack(
            [self._gaze_idx_tensor for _ in range(x.shape[0])], dim=0
        )
        gaze_yaw, gaze_pitch = super().forward(x)
        # expected bin index via softmax, then map bins to degrees
        # (4 degrees per bin, offset by -180) and convert to radians
        yaw_predicted = self._gaze_softmax(gaze_yaw)
        yaw_radian = (
            (torch.sum(yaw_predicted * idx_tensor, dim=1) * 4 - 180) * np.pi / 180
        )
        pitch_predicted = self._gaze_softmax(gaze_pitch)
        pitch_radian = (
            (torch.sum(pitch_predicted * idx_tensor, dim=1) * 4 - 180) * np.pi / 180
        )
        return yaw_radian, pitch_radian

    def load_L2CS_model(
        self,
        file_path=f"{MODEL_CACHE_DIR}/gaze/L2CS/L2CSNet_gaze360_resnet50_90bins.pkl",
    ):
        """Load trained L2CS weights from a pickle checkpoint onto CPU."""
        super().load_state_dict(torch.load(file_path, map_location=self.device))
        super().to(self.device)

    def saveas_ONNX_model(
        self,
        file_path=f"{MODEL_CACHE_DIR}/gaze/L2CS/L2CSNet_gaze360_resnet50_90bins.onnx",
    ):
        """Export this model to ONNX with a dynamic batch dimension."""
        # 448x448 matches the input resolution used at inference time
        dummy_input = torch.randn(1, 3, 448, 448)
        dynamic_axes = {
            "input": {0: "batch_size"},
            "output_yaw": {0: "batch_size"},
            "output_pitch": {0: "batch_size"},
        }
        torch.onnx.export(
            self,
            dummy_input,
            file_path,
            input_names=["input"],
            output_names=["output_yaw", "output_pitch"],
            dynamic_axes=dynamic_axes,
            verbose=False,
        )