# https://github.com/IDEA-Research/DWPose from pathlib import Path import cv2 import numpy as np import onnxruntime as ort import os import sys from typing import List, Tuple import cv2 import numpy as np import onnxruntime as ort ModelDataPathPrefix = Path("./pretrained_weights") class Wholebody: # https://github.com/IDEA-Research/DWPose def nms(self, boxes, scores, nms_thr): """Single class NMS implemented in Numpy.""" x1 = boxes[:, 0] y1 = boxes[:, 1] x2 = boxes[:, 2] y2 = boxes[:, 3] areas = (x2 - x1 + 1) * (y2 - y1 + 1) order = scores.argsort()[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x1[i], x1[order[1:]]) yy1 = np.maximum(y1[i], y1[order[1:]]) xx2 = np.minimum(x2[i], x2[order[1:]]) yy2 = np.minimum(y2[i], y2[order[1:]]) w = np.maximum(0.0, xx2 - xx1 + 1) h = np.maximum(0.0, yy2 - yy1 + 1) inter = w * h ovr = inter / (areas[i] + areas[order[1:]] - inter) inds = np.where(ovr <= nms_thr)[0] order = order[inds + 1] return keep def multiclass_nms(self, boxes, scores, nms_thr, score_thr): """Multiclass NMS implemented in Numpy. Class-aware version.""" final_dets = [] num_classes = scores.shape[1] for cls_ind in range(num_classes): cls_scores = scores[:, cls_ind] valid_score_mask = cls_scores > score_thr if valid_score_mask.sum() == 0: continue else: valid_scores = cls_scores[valid_score_mask] valid_boxes = boxes[valid_score_mask] keep = self.nms(valid_boxes, valid_scores, nms_thr) if len(keep) > 0: cls_inds = np.ones((len(keep), 1)) * cls_ind dets = np.concatenate( [valid_boxes[keep], valid_scores[keep, None], cls_inds], 1 ) final_dets.append(dets) if len(final_dets) == 0: return None return np.concatenate(final_dets, 0) def demo_postprocess(self, outputs, img_size, p6=False): grids = [] expanded_strides = [] strides = [8, 16, 32] if not p6 else [8, 16, 32, 64] hsizes = [img_size[0] // stride for stride in strides] wsizes = [img_size[1] // stride for stride in strides] for hsize, wsize, stride in zip(hsizes, wsizes, strides): xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize)) grid = np.stack((xv, yv), 2).reshape(1, -1, 2) grids.append(grid) shape = grid.shape[:2] expanded_strides.append(np.full((*shape, 1), stride)) grids = np.concatenate(grids, 1) expanded_strides = np.concatenate(expanded_strides, 1) outputs[..., :2] = (outputs[..., :2] + grids) * expanded_strides outputs[..., 2:4] = np.exp(outputs[..., 2:4]) * expanded_strides return outputs def det_preprocess(self, img, input_size, swap=(2, 0, 1)): if len(img.shape) == 3: padded_img = np.ones((input_size[0], input_size[1], 3), dtype=np.uint8) * 114 else: padded_img = np.ones(input_size, dtype=np.uint8) * 114 r = min(input_size[0] / img.shape[0], input_size[1] / img.shape[1]) resized_img = cv2.resize( img, (int(img.shape[1] * r), int(img.shape[0] * r)), interpolation=cv2.INTER_LINEAR, ).astype(np.uint8) padded_img[: int(img.shape[0] * r), : int(img.shape[1] * r)] = resized_img padded_img = padded_img.transpose(swap) padded_img = np.ascontiguousarray(padded_img, dtype=np.float32) return padded_img, r def inference_detector(self, session, oriImg): input_shape = (640, 640) img, ratio = self.det_preprocess(oriImg, input_shape) ort_inputs = {session.get_inputs()[0].name: img[None, :, :, :]} output = session.run(None, ort_inputs) predictions = self.demo_postprocess(output[0], input_shape)[0] boxes = predictions[:, :4] scores = predictions[:, 4:5] * predictions[:, 5:] boxes_xyxy = np.ones_like(boxes) boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.0 boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.0 boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.0 boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.0 boxes_xyxy /= ratio dets = self.multiclass_nms(boxes_xyxy, scores, nms_thr=0.45, score_thr=0.1) if dets is not None: final_boxes, final_scores, final_cls_inds = dets[:, :4], dets[:, 4], dets[:, 5] isscore = final_scores > 0.3 iscat = final_cls_inds == 0 isbbox = [i and j for (i, j) in zip(isscore, iscat)] final_boxes = final_boxes[isbbox] else: return [] return final_boxes def pose_preprocess(self, img: np.ndarray, out_bbox, input_size: Tuple[int, int] = (192, 256)) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Do preprocessing for RTMPose model inference. Args: img (np.ndarray): Input image in shape. input_size (tuple): Input image size in shape (w, h). Returns: tuple: - resized_img (np.ndarray): Preprocessed image. - center (np.ndarray): Center of image. - scale (np.ndarray): Scale of image. """ # get shape of image img_shape = img.shape[:2] out_img, out_center, out_scale = [], [], [] if len(out_bbox) == 0: out_bbox = [[0, 0, img_shape[1], img_shape[0]]] for i in range(len(out_bbox)): x0 = out_bbox[i][0] y0 = out_bbox[i][1] x1 = out_bbox[i][2] y1 = out_bbox[i][3] bbox = np.array([x0, y0, x1, y1]) # get center and scale center, scale = self.bbox_xyxy2cs(bbox, padding=1.25) # do affine transformation resized_img, scale = self.top_down_affine(input_size, scale, center, img) # normalize image mean = np.array([123.675, 116.28, 103.53]) std = np.array([58.395, 57.12, 57.375]) resized_img = (resized_img - mean) / std out_img.append(resized_img) out_center.append(center) out_scale.append(scale) return out_img, out_center, out_scale def inference(self, sess: ort.InferenceSession, img: np.ndarray) -> np.ndarray: """Inference RTMPose model. Args: sess (ort.InferenceSession): ONNXRuntime session. img (np.ndarray): Input image in shape. Returns: outputs (np.ndarray): Output of RTMPose model. """ all_out = [] # build input for i in range(len(img)): input = [img[i].transpose(2, 0, 1)] # build output sess_input = {sess.get_inputs()[0].name: input} sess_output = [] for out in sess.get_outputs(): sess_output.append(out.name) # run model outputs = sess.run(sess_output, sess_input) all_out.append(outputs) return all_out def postprocess( self, outputs: List[np.ndarray], model_input_size: Tuple[int, int], center: Tuple[int, int], scale: Tuple[int, int], simcc_split_ratio: float = 2.0, ) -> Tuple[np.ndarray, np.ndarray]: """Postprocess for RTMPose model output. Args: outputs (np.ndarray): Output of RTMPose model. model_input_size (tuple): RTMPose model Input image size. center (tuple): Center of bbox in shape (x, y). scale (tuple): Scale of bbox in shape (w, h). simcc_split_ratio (float): Split ratio of simcc. Returns: tuple: - keypoints (np.ndarray): Rescaled keypoints. - scores (np.ndarray): Model predict scores. """ all_key = [] all_score = [] for i in range(len(outputs)): # use simcc to decode simcc_x, simcc_y = outputs[i] keypoints, scores = self.decode(simcc_x, simcc_y, simcc_split_ratio) # rescale keypoints keypoints = keypoints / model_input_size * scale[i] + center[i] - scale[i] / 2 all_key.append(keypoints[0]) all_score.append(scores[0]) return np.array(all_key), np.array(all_score) def bbox_xyxy2cs( self, bbox: np.ndarray, padding: float = 1.0 ) -> Tuple[np.ndarray, np.ndarray]: """Transform the bbox format from (x,y,w,h) into (center, scale) Args: bbox (ndarray): Bounding box(es) in shape (4,) or (n, 4), formatted as (left, top, right, bottom) padding (float): BBox padding factor that will be multilied to scale. Default: 1.0 Returns: tuple: A tuple containing center and scale. - np.ndarray[float32]: Center (x, y) of the bbox in shape (2,) or (n, 2) - np.ndarray[float32]: Scale (w, h) of the bbox in shape (2,) or (n, 2) """ # convert single bbox from (4, ) to (1, 4) dim = bbox.ndim if dim == 1: bbox = bbox[None, :] # get bbox center and scale x1, y1, x2, y2 = np.hsplit(bbox, [1, 2, 3]) center = np.hstack([x1 + x2, y1 + y2]) * 0.5 scale = np.hstack([x2 - x1, y2 - y1]) * padding if dim == 1: center = center[0] scale = scale[0] return center, scale def _fix_aspect_ratio(self, bbox_scale: np.ndarray, aspect_ratio: float) -> np.ndarray: """Extend the scale to match the given aspect ratio. Args: scale (np.ndarray): The image scale (w, h) in shape (2, ) aspect_ratio (float): The ratio of ``w/h`` Returns: np.ndarray: The reshaped image scale in (2, ) """ w, h = np.hsplit(bbox_scale, [1]) bbox_scale = np.where( w > h * aspect_ratio, np.hstack([w, w / aspect_ratio]), np.hstack([h * aspect_ratio, h]), ) return bbox_scale def _rotate_point(self, pt: np.ndarray, angle_rad: float) -> np.ndarray: """Rotate a point by an angle. Args: pt (np.ndarray): 2D point coordinates (x, y) in shape (2, ) angle_rad (float): rotation angle in radian Returns: np.ndarray: Rotated point in shape (2, ) """ sn, cs = np.sin(angle_rad), np.cos(angle_rad) rot_mat = np.array([[cs, -sn], [sn, cs]]) return rot_mat @ pt def _get_3rd_point(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: """To calculate the affine matrix, three pairs of points are required. This function is used to get the 3rd point, given 2D points a & b. The 3rd point is defined by rotating vector `a - b` by 90 degrees anticlockwise, using b as the rotation center. Args: a (np.ndarray): The 1st point (x,y) in shape (2, ) b (np.ndarray): The 2nd point (x,y) in shape (2, ) Returns: np.ndarray: The 3rd point. """ direction = a - b c = b + np.r_[-direction[1], direction[0]] return c def get_warp_matrix( self, center: np.ndarray, scale: np.ndarray, rot: float, output_size: Tuple[int, int], shift: Tuple[float, float] = (0.0, 0.0), inv: bool = False, ) -> np.ndarray: """Calculate the affine transformation matrix that can warp the bbox area in the input image to the output size. Args: center (np.ndarray[2, ]): Center of the bounding box (x, y). scale (np.ndarray[2, ]): Scale of the bounding box wrt [width, height]. rot (float): Rotation angle (degree). output_size (np.ndarray[2, ] | list(2,)): Size of the destination heatmaps. shift (0-100%): Shift translation ratio wrt the width/height. Default (0., 0.). inv (bool): Option to inverse the affine transform direction. (inv=False: src->dst or inv=True: dst->src) Returns: np.ndarray: A 2x3 transformation matrix """ shift = np.array(shift) src_w = scale[0] dst_w = output_size[0] dst_h = output_size[1] # compute transformation matrix rot_rad = np.deg2rad(rot) src_dir = self._rotate_point(np.array([0.0, src_w * -0.5]), rot_rad) dst_dir = np.array([0.0, dst_w * -0.5]) # get four corners of the src rectangle in the original image src = np.zeros((3, 2), dtype=np.float32) src[0, :] = center + scale * shift src[1, :] = center + src_dir + scale * shift src[2, :] = self._get_3rd_point(src[0, :], src[1, :]) # get four corners of the dst rectangle in the input image dst = np.zeros((3, 2), dtype=np.float32) dst[0, :] = [dst_w * 0.5, dst_h * 0.5] dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir dst[2, :] = self._get_3rd_point(dst[0, :], dst[1, :]) if inv: warp_mat = cv2.getAffineTransform(np.float32(dst), np.float32(src)) else: warp_mat = cv2.getAffineTransform(np.float32(src), np.float32(dst)) return warp_mat def top_down_affine( self, input_size: dict, bbox_scale: dict, bbox_center: dict, img: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """Get the bbox image as the model input by affine transform. Args: input_size (dict): The input size of the model. bbox_scale (dict): The bbox scale of the img. bbox_center (dict): The bbox center of the img. img (np.ndarray): The original image. Returns: tuple: A tuple containing center and scale. - np.ndarray[float32]: img after affine transform. - np.ndarray[float32]: bbox scale after affine transform. """ w, h = input_size warp_size = (int(w), int(h)) # reshape bbox to fixed aspect ratio bbox_scale = self._fix_aspect_ratio(bbox_scale, aspect_ratio=w / h) # get the affine matrix center = bbox_center scale = bbox_scale rot = 0 warp_mat = self.get_warp_matrix(center, scale, rot, output_size=(w, h)) # do affine transform img = cv2.warpAffine(img, warp_mat, warp_size, flags=cv2.INTER_LINEAR) return img, bbox_scale def get_simcc_maximum( self, simcc_x: np.ndarray, simcc_y: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """Get maximum response location and value from simcc representations. Note: instance number: N num_keypoints: K heatmap height: H heatmap width: W Args: simcc_x (np.ndarray): x-axis SimCC in shape (K, Wx) or (N, K, Wx) simcc_y (np.ndarray): y-axis SimCC in shape (K, Wy) or (N, K, Wy) Returns: tuple: - locs (np.ndarray): locations of maximum heatmap responses in shape (K, 2) or (N, K, 2) - vals (np.ndarray): values of maximum heatmap responses in shape (K,) or (N, K) """ N, K, Wx = simcc_x.shape simcc_x = simcc_x.reshape(N * K, -1) simcc_y = simcc_y.reshape(N * K, -1) # get maximum value locations x_locs = np.argmax(simcc_x, axis=1) y_locs = np.argmax(simcc_y, axis=1) locs = np.stack((x_locs, y_locs), axis=-1).astype(np.float32) max_val_x = np.amax(simcc_x, axis=1) max_val_y = np.amax(simcc_y, axis=1) # get maximum value across x and y axis mask = max_val_x > max_val_y max_val_x[mask] = max_val_y[mask] vals = max_val_x locs[vals <= 0.0] = -1 # reshape locs = locs.reshape(N, K, 2) vals = vals.reshape(N, K) return locs, vals def decode( self, simcc_x: np.ndarray, simcc_y: np.ndarray, simcc_split_ratio ) -> Tuple[np.ndarray, np.ndarray]: """Modulate simcc distribution with Gaussian. Args: simcc_x (np.ndarray[K, Wx]): model predicted simcc in x. simcc_y (np.ndarray[K, Wy]): model predicted simcc in y. simcc_split_ratio (int): The split ratio of simcc. Returns: tuple: A tuple containing center and scale. - np.ndarray[float32]: keypoints in shape (K, 2) or (n, K, 2) - np.ndarray[float32]: scores in shape (K,) or (n, K) """ keypoints, scores = self.get_simcc_maximum(simcc_x, simcc_y) keypoints /= simcc_split_ratio return keypoints, scores def inference_pose(self, session, out_bbox, oriImg): h, w = session.get_inputs()[0].shape[2:] model_input_size = (w, h) resized_img, center, scale = self.pose_preprocess(oriImg, out_bbox, model_input_size) outputs = self.inference(session, resized_img) keypoints, scores = self.postprocess(outputs, model_input_size, center, scale) return keypoints, scores def __init__(self, device="cuda:0"): providers = ( ["CPUExecutionProvider"] if device == "cpu" else ["CUDAExecutionProvider"] ) base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Construct absolute paths for the ONNX files onnx_det = os.path.abspath(os.path.join(base_dir, "pretrained_weights", "DWPose", "yolox_l.onnx")) onnx_pose = os.path.abspath(os.path.join(base_dir, "pretrained_weights", "DWPose", "dw-ll_ucoco_384.onnx")) self.session_det = ort.InferenceSession( path_or_bytes=onnx_det, providers=providers ) self.session_pose = ort.InferenceSession( path_or_bytes=onnx_pose, providers=providers ) def __call__(self, oriImg): det_result = self.inference_detector(self.session_det, oriImg) keypoints, scores = self.inference_pose(self.session_pose, det_result, oriImg) keypoints_info = np.concatenate((keypoints, scores[..., None]), axis=-1) # compute neck joint neck = np.mean(keypoints_info[:, [5, 6]], axis=1) # neck score when visualizing pred neck[:, 2:4] = np.logical_and( keypoints_info[:, 5, 2:4] > 0.3, keypoints_info[:, 6, 2:4] > 0.3 ).astype(int) new_keypoints_info = np.insert(keypoints_info, 17, neck, axis=1) mmpose_idx = [17, 6, 8, 10, 7, 9, 12, 14, 16, 13, 15, 2, 1, 4, 3] openpose_idx = [1, 2, 3, 4, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17] new_keypoints_info[:, openpose_idx] = new_keypoints_info[:, mmpose_idx] keypoints_info = new_keypoints_info keypoints, scores = keypoints_info[..., :2], keypoints_info[..., 2] return keypoints, scores