RTMPose / ax_infer.py
fangmingguo's picture
Upload 13 files
993d81c verified
#!/usr/bin/env python3
"""
RTMPose-M axengine inference on AXERA NPU.
"""
import argparse
import os
from time import time
from typing import Tuple
import cv2
import numpy as np
try:
import axengine as axe
except ImportError:
import onnxruntime as axe
# Divisor applied to the SimCC argmax bin indices before they are mapped back
# to image coordinates (see main()).
SIMCC_SPLIT_RATIO = 2.0
# Keypoint count shown as the denominator of the "kpts above" report in main().
NUM_KP = 17
# Pairs of keypoint indices that draw() joins with a line segment.
# NOTE(review): presumably the COCO 17-keypoint ordering — confirm against the model.
COCO_SKELETON = [
(15, 13), (13, 11), (16, 14), (14, 12), (11, 12),
(5, 11), (6, 12), (5, 6), (5, 7), (6, 8),
(7, 9), (8, 10), (1, 2), (0, 1), (0, 2),
(1, 3), (2, 4), (3, 5), (4, 6),
]
def bbox_xyxy2cs(bbox: np.ndarray, padding: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
    """Convert an ``(x1, y1, x2, y2)`` box into its center and padded size.

    Args:
        bbox: Four values ordered as x1, y1, x2, y2.
        padding: Multiplier applied to both the box width and height.

    Returns:
        ``(center, scale)``: two float32 arrays of length 2 holding the box
        midpoint and the padded ``(width, height)``.
    """
    left, top, right, bottom = bbox
    mid_x = (left + right) * 0.5
    mid_y = (top + bottom) * 0.5
    padded_w = (right - left) * padding
    padded_h = (bottom - top) * padding
    return (
        np.array([mid_x, mid_y], dtype=np.float32),
        np.array([padded_w, padded_h], dtype=np.float32),
    )
def _fix_aspect_ratio(bbox_scale: np.ndarray, aspect_ratio: float) -> np.ndarray:
w, h = bbox_scale
if w > h * aspect_ratio:
return np.array([w, w / aspect_ratio], dtype=np.float32)
else:
return np.array([h * aspect_ratio, h], dtype=np.float32)
def _rotate_point(pt: np.ndarray, angle_rad: float) -> np.ndarray:
sn, cs = np.sin(angle_rad), np.cos(angle_rad)
return np.array([cs * pt[0] - sn * pt[1], sn * pt[0] + cs * pt[1]])
def _get_3rd_point(a: np.ndarray, b: np.ndarray) -> np.ndarray:
direction = a - b
return b + np.r_[-direction[1], direction[0]]
def get_warp_matrix(center, scale, rot, output_size):
    """Build the 2x3 affine matrix that maps a source box onto the output canvas.

    Three point correspondences are constructed: the box center, a point half a
    box-width away from it (rotated by ``rot``), and a third point at a right
    angle to those two. ``cv2.getAffineTransform`` solves for the matrix.

    Args:
        center: Box center ``(x, y)`` in source-image coordinates.
        scale: Box size; only ``scale[0]`` (the width) is read here.
        rot: Rotation in degrees.
        output_size: ``(width, height)`` of the destination image.

    Returns:
        The affine matrix returned by ``cv2.getAffineTransform``.
    """
    box_w = scale[0]
    out_w, out_h = output_size
    theta = np.deg2rad(rot)

    # Offsets from each center to its second reference point.
    src_offset = _rotate_point(np.array([0.0, box_w * -0.5]), theta)
    dst_offset = np.array([0.0, out_w * -0.5])

    src_tri = np.zeros((3, 2), dtype=np.float32)
    src_tri[0] = center
    src_tri[1] = center + src_offset
    src_tri[2] = _get_3rd_point(src_tri[0], src_tri[1])

    dst_center = [out_w * 0.5, out_h * 0.5]
    dst_tri = np.zeros((3, 2), dtype=np.float32)
    dst_tri[0] = dst_center
    dst_tri[1] = dst_center + dst_offset
    dst_tri[2] = _get_3rd_point(dst_tri[0], dst_tri[1])

    return cv2.getAffineTransform(src_tri, dst_tri)
def preprocess(img_bgr, input_size=(192, 256)):
    """Warp a whole image into the model input canvas.

    The full frame is treated as the bounding box, padded by 1.25, adjusted to
    the aspect ratio of ``input_size`` and affinely warped to that size.

    Args:
        img_bgr: Image array whose first two dimensions are height and width.
        input_size: ``(width, height)`` of the model input.

    Returns:
        ``(inp, center, scale)``: the warped image with a leading batch axis,
        plus the box center and aspect-corrected box size used for the warp
        (needed later to map keypoints back to the source image).
    """
    img_h, img_w = img_bgr.shape[:2]
    full_frame = np.array([0, 0, img_w, img_h], dtype=np.float32)

    center, scale = bbox_xyxy2cs(full_frame, padding=1.25)
    target_ratio = input_size[0] / input_size[1]
    scale = _fix_aspect_ratio(scale, target_ratio)

    affine = get_warp_matrix(center, scale, 0, input_size)
    warped = cv2.warpAffine(img_bgr, affine, input_size, flags=cv2.INTER_LINEAR)

    # Original author's note: (1, H, W, 3) uint8 NHWC BGR, axmodel handles BGR->RGB
    batched = warped[None]
    return batched, center, scale
def get_simcc_maximum(simcc_x, simcc_y):
    """Pick the peak bin and its value from the x and y SimCC outputs.

    Args:
        simcc_x: 3-D array; the peak is taken along its last axis.
        simcc_y: Array reduced along axis 2 in the same way.

    Returns:
        ``(locs, scores)``: ``locs`` is a float32 array whose last axis holds
        the ``(x, y)`` peak indices, and ``scores`` is the element-wise minimum
        of the x and y peak values.
    """
    # Unpacking into exactly three names rejects inputs that are not 3-D.
    _batch, _num_kp, _num_bins = simcc_x.shape

    best_x = np.argmax(simcc_x, axis=2)
    best_y = np.argmax(simcc_y, axis=2)
    peak_x = np.max(simcc_x, axis=2)
    peak_y = np.max(simcc_y, axis=2)

    coords = np.stack((best_x, best_y), axis=-1).astype(np.float32)
    confidence = np.minimum(peak_x, peak_y)
    return coords, confidence
def draw(img, keypoints, scores, thr=0.3):
    """Draw keypoints and skeleton lines onto ``img`` in place.

    Args:
        img: Image passed to the OpenCV drawing calls; it is modified in place.
        keypoints: Sequence of ``(x, y)`` positions, indexable by keypoint id.
        scores: Per-keypoint scores aligned with ``keypoints``.
        thr: A keypoint whose score is below this value gets no dot, and a
            skeleton line is drawn only when both of its endpoints reach it.
    """

    def to_pixel(point):
        # OpenCV drawing calls need integer pixel coordinates.
        return (int(point[0]), int(point[1]))

    # Filled green dot for each keypoint that is not below the threshold.
    for (kx, ky), conf in zip(keypoints, scores):
        if conf < thr:
            continue
        cv2.circle(img, (int(kx), int(ky)), 4, (0, 255, 0), -1)

    # Line segment for each skeleton edge whose two endpoints both qualify.
    for start, end in COCO_SKELETON:
        if not (scores[start] >= thr and scores[end] >= thr):
            continue
        cv2.line(img, to_pixel(keypoints[start]), to_pixel(keypoints[end]), (255, 128, 0), 2)
def main():
    """Command-line entry point: run the pose model on one image and save an overlay.

    Steps: parse the CLI options, load and preprocess the image, run
    ``--warmup`` untimed and ``--repeat`` timed forward passes, decode the two
    SimCC outputs into source-image keypoints, print them, and write the
    annotated image to ``--output``.

    Raises:
        SystemExit: if ``--repeat`` is below 1, the input image cannot be read,
            or the output image cannot be written.
    """
    # Local import keeps this block self-contained; perf_counter is a
    # monotonic, high-resolution clock intended for measuring short durations.
    from time import perf_counter

    ap = argparse.ArgumentParser()
    ap.add_argument("-m", "--model", default="output/rtmpose_m_npu3.axmodel")
    ap.add_argument("-i", "--image", required=True)
    ap.add_argument("-o", "--output", default="ax_result.jpg")
    ap.add_argument("--score_thres", type=float, default=0.3)
    ap.add_argument("--warmup", type=int, default=3)
    ap.add_argument("--repeat", type=int, default=10)
    args = ap.parse_args()

    # With repeat < 1 the timed loop never runs: no outputs to decode and a
    # division by zero in the average. Reject it up front.
    if args.repeat < 1:
        ap.error("--repeat must be >= 1")

    img0 = cv2.imread(args.image)
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if img0 is None:
        raise SystemExit(f"Cannot read {args.image}")

    # Single source of truth for the model input size: used for both the
    # preprocessing warp and the keypoint decode below.
    input_size = (192, 256)
    inp, center, scale = preprocess(img0, input_size)

    model = axe.InferenceSession(args.model)
    inp_info = model.get_inputs()[0]
    # The attribute holding the element type differs between backends.
    dtype_str = getattr(inp_info, "dtype", getattr(inp_info, "type", "unknown"))
    print(f"Model input: name={inp_info.name}, shape={inp_info.shape}, dtype={dtype_str}")

    feed = {inp_info.name: inp}
    for _ in range(args.warmup):
        model.run(None, feed)

    t0 = perf_counter()
    for _ in range(args.repeat):
        outputs = model.run(None, feed)
    elapsed = (perf_counter() - t0) / args.repeat * 1000
    print(f"Forward: {elapsed:.2f} ms (avg of {args.repeat} runs)")

    simcc_x, simcc_y = outputs[0], outputs[1]
    print(f"simcc_x: shape={simcc_x.shape}, range=[{simcc_x.min():.2f}, {simcc_x.max():.2f}]")
    print(f"simcc_y: shape={simcc_y.shape}, range=[{simcc_y.min():.2f}, {simcc_y.max():.2f}]")

    locs, scores = get_simcc_maximum(simcc_x, simcc_y)
    # SimCC bin index -> model-input pixel -> source-image pixel.
    keypoints = locs / SIMCC_SPLIT_RATIO
    keypoints = keypoints / np.array(input_size) * scale + center - scale / 2
    # Only the first batch entry is reported and drawn.
    keypoints = keypoints[0]
    scores = scores[0]

    above = (scores >= args.score_thres).sum()
    print(f"kpts above {args.score_thres}: {above}/{NUM_KP}")
    for i, ((x, y), sc) in enumerate(zip(keypoints, scores)):
        print(f" kp{i:02d}: ({x:6.1f}, {y:6.1f}) score={sc:.4f}")

    draw(img0, keypoints, scores, args.score_thres)
    # cv2.imwrite reports failure through its return value, so check it
    # before claiming the file was saved.
    if not cv2.imwrite(args.output, img0):
        raise SystemExit(f"Failed to write {args.output}")
    print(f"Saved: {args.output}")


if __name__ == "__main__":
    main()