| |
| """ |
| RTMPose-M axengine inference on AXERA NPU. |
| """ |
|
|
| import argparse |
| import os |
| from time import time |
| from typing import Tuple |
|
|
| import cv2 |
| import numpy as np |
|
|
| try: |
| import axengine as axe |
| except ImportError: |
| import onnxruntime as axe |
|
|
# Divisor applied to decoded SimCC bin indices to turn them into
# network-input pixel coordinates.
SIMCC_SPLIT_RATIO = 2.0

# Keypoint count shown in the "kpts above threshold" summary line.
NUM_KP = 17

# Pairs of keypoint indices that are joined by a line when drawing.
# NOTE(review): indices presumably follow the standard COCO 17-keypoint
# order -- confirm against the model's keypoint layout.
COCO_SKELETON = [
    (15, 13),
    (13, 11),
    (16, 14),
    (14, 12),
    (11, 12),
    (5, 11),
    (6, 12),
    (5, 6),
    (5, 7),
    (6, 8),
    (7, 9),
    (8, 10),
    (1, 2),
    (0, 1),
    (0, 2),
    (1, 3),
    (2, 4),
    (3, 5),
    (4, 6),
]
|
|
|
|
def bbox_xyxy2cs(bbox: np.ndarray, padding: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
    """Convert an ``(x1, y1, x2, y2)`` box into a center point and a size.

    Args:
        bbox: Four values unpacked as ``x1, y1, x2, y2``.
        padding: Factor the box width and height are multiplied by.

    Returns:
        ``(center, scale)``: two float32 arrays of length 2 holding the box
        midpoint and its padded ``(width, height)``.
    """
    left, top, right, bottom = bbox
    mid_x = (left + right) * 0.5
    mid_y = (top + bottom) * 0.5
    padded_w = (right - left) * padding
    padded_h = (bottom - top) * padding
    return (
        np.array([mid_x, mid_y], dtype=np.float32),
        np.array([padded_w, padded_h], dtype=np.float32),
    )
|
|
|
|
def _fix_aspect_ratio(bbox_scale: np.ndarray, aspect_ratio: float) -> np.ndarray:
    """Enlarge one side of a box size so width / height equals ``aspect_ratio``.

    Args:
        bbox_scale: Two values unpacked as ``(width, height)``.
        aspect_ratio: Target width-to-height ratio.

    Returns:
        A float32 array ``(width, height)``. The side that is already large
        enough is kept and the other one is derived from it.
    """
    width, height = bbox_scale
    too_wide = width > height * aspect_ratio
    if too_wide:
        # Width dominates: keep it and derive the height.
        fixed = (width, width / aspect_ratio)
    else:
        # Height dominates (or the ratio already matches): derive the width.
        fixed = (height * aspect_ratio, height)
    return np.array(fixed, dtype=np.float32)
|
|
|
|
def _rotate_point(pt: np.ndarray, angle_rad: float) -> np.ndarray:
    """Rotate a 2D point about the origin by ``angle_rad`` radians.

    Applies the rotation matrix ``[[cos, -sin], [sin, cos]]`` to
    ``(pt[0], pt[1])``.

    Args:
        pt: Point whose first two entries are the x and y coordinates.
        angle_rad: Rotation angle in radians.

    Returns:
        A new length-2 array with the rotated coordinates.
    """
    sin_a = np.sin(angle_rad)
    cos_a = np.cos(angle_rad)
    px, py = pt[0], pt[1]
    rotated_x = cos_a * px - sin_a * py
    rotated_y = sin_a * px + cos_a * py
    return np.array([rotated_x, rotated_y])
|
|
|
|
def _get_3rd_point(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Return a third point that forms a right angle at ``b`` with ``a``.

    The vector ``a - b`` is turned by 90 degrees (``(dx, dy)`` becomes
    ``(-dy, dx)``) and added to ``b``.

    Args:
        a: First 2D point.
        b: Second 2D point; the result is offset from this one.

    Returns:
        The 2D point ``b + (-dy, dx)`` where ``(dx, dy) = a - b``.
    """
    delta = a - b
    perpendicular = np.array([-delta[1], delta[0]])
    return b + perpendicular
|
|
|
|
def get_warp_matrix(center, scale, rot, output_size):
    """Build the affine transform that maps a box onto the output image.

    Three reference points are placed in the source image (the box center,
    a point offset from it by half the box width, and a third point at a
    right angle to those two) and matched with the corresponding three
    points in the output image.

    Args:
        center: Box center ``(x, y)`` in the source image.
        scale: Box size; only ``scale[0]`` (the width) is read here.
        rot: Rotation in degrees.
        output_size: Output image size unpacked as ``(width, height)``.

    Returns:
        The matrix produced by ``cv2.getAffineTransform`` for the two
        point triples.
    """
    box_w = scale[0]
    out_w, out_h = output_size
    angle = np.deg2rad(rot)

    # Source triangle: center, rotated half-width offset, perpendicular point.
    src_tri = np.zeros((3, 2), dtype=np.float32)
    src_tri[0] = center
    src_tri[1] = center + _rotate_point(np.array([0.0, box_w * -0.5]), angle)
    src_tri[2] = _get_3rd_point(src_tri[0], src_tri[1])

    # Destination triangle: the same construction around the output center,
    # without rotation.
    out_center = [out_w * 0.5, out_h * 0.5]
    dst_tri = np.zeros((3, 2), dtype=np.float32)
    dst_tri[0] = out_center
    dst_tri[1] = out_center + np.array([0.0, out_w * -0.5])
    dst_tri[2] = _get_3rd_point(dst_tri[0], dst_tri[1])

    return cv2.getAffineTransform(src_tri, dst_tri)
|
|
|
|
def preprocess(img_bgr, input_size=(192, 256)):
    """Warp the whole image into the network input frame.

    The full image is treated as the person box, enlarged by a factor of
    1.25, adjusted to the input aspect ratio and then warped to
    ``input_size``. No color conversion or normalization is done here.

    Args:
        img_bgr: Image array whose first two dimensions are height and width.
        input_size: Network input size as ``(width, height)``.

    Returns:
        ``(inp, center, scale)``: the warped image with a leading batch axis
        added, plus the box center and size that were used for the warp.
    """
    img_h, img_w = img_bgr.shape[:2]
    full_frame = np.array([0, 0, img_w, img_h], dtype=np.float32)
    center, scale = bbox_xyxy2cs(full_frame, padding=1.25)

    net_w, net_h = input_size[0], input_size[1]
    scale = _fix_aspect_ratio(scale, net_w / net_h)

    affine = get_warp_matrix(center, scale, 0, input_size)
    warped = cv2.warpAffine(img_bgr, affine, input_size, flags=cv2.INTER_LINEAR)

    # Prepend the batch axis expected by the session input.
    return warped[np.newaxis], center, scale
|
|
|
|
def get_simcc_maximum(simcc_x, simcc_y):
    """Decode peak bin positions and confidences from SimCC outputs.

    The reduction runs over the last axis, so any number of leading
    dimensions is accepted (for example ``(N, K, W)`` or ``(K, W)``).

    Args:
        simcc_x: Array of x-axis bin scores; the last axis indexes the bins.
        simcc_y: Array of y-axis bin scores with the same leading dimensions
            as ``simcc_x``; the last axis indexes the bins.

    Returns:
        ``(locs, scores)``: ``locs`` is a float32 array with a trailing axis
        of size 2 holding the ``(x, y)`` argmax bin indices, and ``scores``
        is the element-wise minimum of the two peak values.

    Raises:
        ValueError: If the inputs have no bin axis or their leading
            dimensions differ.
    """
    simcc_x = np.asarray(simcc_x)
    simcc_y = np.asarray(simcc_y)
    if simcc_x.ndim < 1 or simcc_y.ndim < 1:
        raise ValueError("simcc_x and simcc_y must have at least one axis")
    if simcc_x.shape[:-1] != simcc_y.shape[:-1]:
        raise ValueError(
            f"leading dimensions differ: {simcc_x.shape} vs {simcc_y.shape}"
        )

    x_locs = np.argmax(simcc_x, axis=-1)
    y_locs = np.argmax(simcc_y, axis=-1)
    locs = np.stack([x_locs, y_locs], axis=-1).astype(np.float32)

    # A keypoint is only as confident as its weaker axis.
    scores = np.minimum(np.max(simcc_x, axis=-1), np.max(simcc_y, axis=-1))
    return locs, scores
|
|
|
|
def draw(img, keypoints, scores, thr=0.3):
    """Draw keypoints and skeleton lines onto ``img`` in place.

    Args:
        img: Image passed to the OpenCV drawing calls; it is modified.
        keypoints: Sequence of ``(x, y)`` positions, one per keypoint.
        scores: Per-keypoint scores, indexed like ``keypoints``.
        thr: Keypoints scoring below this value are not drawn, and a line
            is drawn only when both of its endpoints reach it.
    """
    def as_pixel(point):
        # OpenCV drawing calls need integer coordinates.
        return (int(point[0]), int(point[1]))

    # Dots first, so the lines drawn afterwards are painted over them.
    for (x, y), score in zip(keypoints, scores):
        if score < thr:
            continue
        cv2.circle(img, (int(x), int(y)), 4, (0, 255, 0), -1)

    for start, end in COCO_SKELETON:
        both_confident = scores[start] >= thr and scores[end] >= thr
        if not both_confident:
            continue
        cv2.line(img, as_pixel(keypoints[start]), as_pixel(keypoints[end]), (255, 128, 0), 2)
|
|
|
|
def main():
    """Run pose estimation on one image, report timing, and save an overlay.

    Command-line options:
        -m/--model: Model file passed to ``InferenceSession``.
        -i/--image: Input image path (required).
        -o/--output: Path the annotated image is written to.
        --score_thres: Minimum keypoint score for counting and drawing.
        --warmup: Number of untimed forward passes run before timing.
        --repeat: Number of timed forward passes; must be at least 1.

    Raises:
        SystemExit: If ``--repeat`` is below 1, the input image cannot be
            read, or the output image cannot be written.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("-m", "--model", default="output/rtmpose_m_npu3.axmodel")
    ap.add_argument("-i", "--image", required=True)
    ap.add_argument("-o", "--output", default="ax_result.jpg")
    ap.add_argument("--score_thres", type=float, default=0.3)
    ap.add_argument("--warmup", type=int, default=3)
    ap.add_argument("--repeat", type=int, default=10)
    args = ap.parse_args()

    # With zero timed runs there would be no outputs to decode and the
    # average would divide by zero.
    if args.repeat < 1:
        ap.error("--repeat must be at least 1")

    img0 = cv2.imread(args.image)
    # An explicit check instead of `assert`, which is removed under -O.
    if img0 is None:
        raise SystemExit(f"Cannot read {args.image}")

    # Single source of truth for the network input size (width, height),
    # used both for preprocessing and for decoding keypoints.
    input_size = (192, 256)
    inp, center, scale = preprocess(img0, input_size)

    model = axe.InferenceSession(args.model)
    inp_info = model.get_inputs()[0]
    # The attribute holding the input type differs between the two backends
    # this script can import, so try both names.
    dtype_str = getattr(inp_info, "dtype", getattr(inp_info, "type", "unknown"))
    print(f"Model input: name={inp_info.name}, shape={inp_info.shape}, dtype={dtype_str}")

    feed = {inp_info.name: inp}
    for _ in range(args.warmup):
        model.run(None, feed)

    t0 = time()
    for _ in range(args.repeat):
        outputs = model.run(None, feed)
    elapsed = (time() - t0) / args.repeat * 1000
    print(f"Forward: {elapsed:.2f} ms (avg of {args.repeat} runs)")

    simcc_x, simcc_y = outputs[0], outputs[1]
    print(f"simcc_x: shape={simcc_x.shape}, range=[{simcc_x.min():.2f}, {simcc_x.max():.2f}]")
    print(f"simcc_y: shape={simcc_y.shape}, range=[{simcc_y.min():.2f}, {simcc_y.max():.2f}]")

    locs, scores = get_simcc_maximum(simcc_x, simcc_y)
    # Bin indices -> network-input pixels.
    keypoints = locs / SIMCC_SPLIT_RATIO
    # Network-input pixels -> original image: normalize by the input size,
    # stretch to the crop size, then offset by the crop's top-left corner.
    keypoints = keypoints / np.array(input_size) * scale + center - scale / 2
    keypoints = keypoints[0]
    scores = scores[0]

    above = (scores >= args.score_thres).sum()
    print(f"kpts above {args.score_thres}: {above}/{NUM_KP}")
    for i, ((x, y), sc) in enumerate(zip(keypoints, scores)):
        print(f"  kp{i:02d}: ({x:6.1f}, {y:6.1f})  score={sc:.4f}")

    draw(img0, keypoints, scores, args.score_thres)
    # imwrite signals failure through its return value, so do not report
    # success unless it actually wrote the file.
    if not cv2.imwrite(args.output, img0):
        raise SystemExit(f"Failed to write {args.output}")
    print(f"Saved: {args.output}")
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
|
|