#!/usr/bin/env python3
"""
RTMPose-M axengine inference on AXERA NPU.
"""

import argparse
import os
from time import time
from typing import Tuple

import cv2
import numpy as np

try:
    import axengine as axe
except ImportError:
    # Fall back to onnxruntime; this script only relies on the
    # InferenceSession / get_inputs / run calls used in main().
    import onnxruntime as axe

SIMCC_SPLIT_RATIO = 2.0
NUM_KP = 17
# Pairs of keypoint indices that draw() connects with a line.
COCO_SKELETON = [
    (15, 13), (13, 11), (16, 14), (14, 12), (11, 12),
    (5, 11), (6, 12), (5, 6), (5, 7), (6, 8),
    (7, 9), (8, 10), (1, 2), (0, 1), (0, 2),
    (1, 3), (2, 4), (3, 5), (4, 6),
]


def bbox_xyxy2cs(bbox: np.ndarray, padding: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
    """Convert an (x1, y1, x2, y2) box to (center, scale).

    Args:
        bbox: Four values ``x1, y1, x2, y2``.
        padding: Multiplier applied to the box width and height.

    Returns:
        ``center`` as ``[cx, cy]`` and ``scale`` as ``[w * padding, h * padding]``,
        both float32 arrays.
    """
    x1, y1, x2, y2 = bbox
    center = np.array([(x1 + x2) * 0.5, (y1 + y2) * 0.5], dtype=np.float32)
    scale = np.array([(x2 - x1) * padding, (y2 - y1) * padding], dtype=np.float32)
    return center, scale


def _fix_aspect_ratio(bbox_scale: np.ndarray, aspect_ratio: float) -> np.ndarray:
    """Enlarge one side of ``(w, h)`` so that ``w / h == aspect_ratio``."""
    w, h = bbox_scale
    if w > h * aspect_ratio:
        # Too wide for the target ratio: keep the width, grow the height.
        return np.array([w, w / aspect_ratio], dtype=np.float32)
    # Otherwise keep the height and grow the width.
    return np.array([h * aspect_ratio, h], dtype=np.float32)


def _rotate_point(pt: np.ndarray, angle_rad: float) -> np.ndarray:
    """Rotate the 2-D point ``pt`` about the origin by ``angle_rad`` radians."""
    sn, cs = np.sin(angle_rad), np.cos(angle_rad)
    return np.array([cs * pt[0] - sn * pt[1], sn * pt[0] + cs * pt[1]])


def _get_3rd_point(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Return ``b`` offset by the vector ``a - b`` turned 90 degrees.

    Together with ``a`` and ``b`` this gives the third, non-collinear point
    needed to define an affine transform.
    """
    direction = a - b
    return b + np.r_[-direction[1], direction[0]]


def get_warp_matrix(center, scale, rot, output_size):
    """Build the 2x3 affine matrix mapping a source box onto the output image.

    Args:
        center: ``[cx, cy]`` of the source box.
        scale: ``[w, h]`` of the source box; only the width ``scale[0]`` is
            read here, so the box is expected to already match the output
            aspect ratio.
        rot: Rotation of the source box in degrees.
        output_size: ``(dst_w, dst_h)`` of the destination image.

    Returns:
        The matrix returned by ``cv2.getAffineTransform``.
    """
    src_w = scale[0]
    dst_w, dst_h = output_size
    rot_rad = np.deg2rad(rot)

    # Half-width vectors pointing "up" (negative y) from each center.
    src_dir = _rotate_point(np.array([0.0, src_w * -0.5]), rot_rad)
    dst_dir = np.array([0.0, dst_w * -0.5])

    src_points = np.zeros((3, 2), dtype=np.float32)
    src_points[0] = center
    src_points[1] = center + src_dir
    src_points[2] = _get_3rd_point(src_points[0], src_points[1])

    dst_center = np.array([dst_w * 0.5, dst_h * 0.5])
    dst_points = np.zeros((3, 2), dtype=np.float32)
    dst_points[0] = dst_center
    dst_points[1] = dst_center + dst_dir
    dst_points[2] = _get_3rd_point(dst_points[0], dst_points[1])

    return cv2.getAffineTransform(src_points, dst_points)


def preprocess(
    img_bgr: np.ndarray,
    input_size: Tuple[int, int] = (192, 256),
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Warp the whole image into the model input and return the crop geometry.

    The full image is used as the bounding box, padded by 1.25 and enlarged
    to the aspect ratio of ``input_size``.

    Args:
        img_bgr: Image array; its first two dimensions are read as (h, w).
        input_size: ``(width, height)`` of the model input.

    Returns:
        ``(inp, center, scale)`` where ``inp`` is the warped image with a
        leading batch axis, and ``center`` / ``scale`` describe the source
        box that was warped.
    """
    h, w = img_bgr.shape[:2]
    bbox = np.array([0, 0, w, h], dtype=np.float32)
    center, scale = bbox_xyxy2cs(bbox, padding=1.25)
    scale = _fix_aspect_ratio(scale, input_size[0] / input_size[1])
    warp_mat = get_warp_matrix(center, scale, 0, input_size)
    img_warped = cv2.warpAffine(img_bgr, warp_mat, input_size, flags=cv2.INTER_LINEAR)
    inp = img_warped[None]  # (1, H, W, 3) uint8 NHWC BGR, axmodel handles BGR->RGB
    return inp, center, scale


def get_simcc_maximum(simcc_x, simcc_y):
    """Pick the peak bin and score of each keypoint from SimCC outputs.

    Args:
        simcc_x: 3-D array; the argmax is taken over its last axis.
        simcc_y: 3-D array; the argmax is taken over its last axis.

    Returns:
        ``locs``: float32 array whose last axis holds ``(x_bin, y_bin)``.
        ``scores``: element-wise minimum of the x and y peak values.

    Raises:
        ValueError: If either input is not 3-dimensional.
    """
    if np.ndim(simcc_x) != 3 or np.ndim(simcc_y) != 3:
        raise ValueError(
            f"expected 3-D simcc arrays, got shapes "
            f"{np.shape(simcc_x)} and {np.shape(simcc_y)}"
        )
    x_locs = np.argmax(simcc_x, axis=2)
    y_locs = np.argmax(simcc_y, axis=2)
    x_vals = np.take_along_axis(simcc_x, x_locs[:, :, None], axis=2).squeeze(2)
    y_vals = np.take_along_axis(simcc_y, y_locs[:, :, None], axis=2).squeeze(2)
    locs = np.stack([x_locs, y_locs], axis=-1).astype(np.float32)
    scores = np.minimum(x_vals, y_vals)
    return locs, scores


def draw(img, keypoints, scores, thr=0.3):
    """Draw keypoints and skeleton edges onto ``img`` in place.

    Args:
        img: Image passed to ``cv2.circle`` / ``cv2.line``; it is modified.
        keypoints: Sequence of ``(x, y)`` positions indexed by keypoint id.
        scores: Per-keypoint scores, same order as ``keypoints``.
        thr: Keypoints scoring below this are skipped; an edge is drawn only
            when both of its endpoints reach it.
    """
    for (x, y), s in zip(keypoints, scores):
        if s < thr:
            continue
        cv2.circle(img, (int(x), int(y)), 4, (0, 255, 0), -1)

    for i, j in COCO_SKELETON:
        if scores[i] >= thr and scores[j] >= thr:
            pt1 = (int(keypoints[i][0]), int(keypoints[i][1]))
            pt2 = (int(keypoints[j][0]), int(keypoints[j][1]))
            cv2.line(img, pt1, pt2, (255, 128, 0), 2)


def main():
    """Run pose inference on one image, print timings/keypoints, save a plot."""
    ap = argparse.ArgumentParser()
    ap.add_argument("-m", "--model", default="output/rtmpose_m_npu3.axmodel")
    ap.add_argument("-i", "--image", required=True)
    ap.add_argument("-o", "--output", default="ax_result.jpg")
    ap.add_argument("--score_thres", type=float, default=0.3)
    ap.add_argument("--warmup", type=int, default=3)
    ap.add_argument("--repeat", type=int, default=10)
    args = ap.parse_args()

    # At least one timed run is needed: it produces `outputs` and is the
    # divisor of the average below.
    if args.repeat < 1:
        ap.error("--repeat must be >= 1")

    img0 = cv2.imread(args.image)
    # cv2.imread signals failure by returning None; use an explicit check
    # rather than `assert`, which is removed under `python -O`.
    if img0 is None:
        raise SystemExit(f"Cannot read {args.image}")

    # Single source of truth for the model input size, shared by the
    # preprocessing and the coordinate mapping further down.
    input_size = (192, 256)
    inp, center, scale = preprocess(img0, input_size)

    model = axe.InferenceSession(args.model)
    inp_info = model.get_inputs()[0]
    # The attribute carrying the element type differs between backends.
    dtype_str = getattr(inp_info, "dtype", getattr(inp_info, "type", "unknown"))
    print(f"Model input: name={inp_info.name}, shape={inp_info.shape}, dtype={dtype_str}")

    feed = {inp_info.name: inp}
    for _ in range(args.warmup):
        model.run(None, feed)

    t0 = time()
    for _ in range(args.repeat):
        outputs = model.run(None, feed)
    elapsed = (time() - t0) / args.repeat * 1000
    print(f"Forward: {elapsed:.2f} ms (avg of {args.repeat} runs)")

    simcc_x, simcc_y = outputs[0], outputs[1]
    print(f"simcc_x: shape={simcc_x.shape}, range=[{simcc_x.min():.2f}, {simcc_x.max():.2f}]")
    print(f"simcc_y: shape={simcc_y.shape}, range=[{simcc_y.min():.2f}, {simcc_y.max():.2f}]")

    locs, scores = get_simcc_maximum(simcc_x, simcc_y)
    # Bin index -> model-input coordinate.
    keypoints = locs / SIMCC_SPLIT_RATIO
    # Model-input coordinate -> original image: rescale to the source box
    # size and shift by the box's top-left corner (center - scale / 2).
    keypoints = keypoints / np.array(input_size) * scale + center - scale / 2
    keypoints = keypoints[0]
    scores = scores[0]

    above = (scores >= args.score_thres).sum()
    print(f"kpts above {args.score_thres}: {above}/{NUM_KP}")
    for i, ((x, y), sc) in enumerate(zip(keypoints, scores)):
        print(f" kp{i:02d}: ({x:6.1f}, {y:6.1f}) score={sc:.4f}")

    draw(img0, keypoints, scores, args.score_thres)
    # cv2.imwrite returns False when the image could not be written; do not
    # report success in that case.
    if not cv2.imwrite(args.output, img0):
        raise SystemExit(f"Cannot write {args.output}")
    print(f"Saved: {args.output}")


if __name__ == "__main__":
    main()