|
|
|
|
|
""" |
|
|
CenterPoint AXEngine Inference Demo |
|
|
|
|
|
Usage: |
|
|
python inference_axmodel.py ./centerpoint.axmodel ./extracted_data/config.json ./extracted_data \ |
|
|
--output-dir ./inference_results_ax --num-samples 10 |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import os |
|
|
import os.path as osp |
|
|
import numpy as np |
|
|
from tqdm import tqdm |
|
|
import numba |
|
|
|
|
|
try: |
|
|
import axengine as axe |
|
|
except ImportError: |
|
|
print("Warning: axengine not available. Install it to use AXEngine inference.") |
|
|
axe = None |
|
|
|
|
|
|
|
|
def parse_args():
    """Build and parse the command-line interface for this demo.

    Returns:
        argparse.Namespace with the model path, config path, data dir and
        the optional output/threshold/visualization settings.
    """
    ap = argparse.ArgumentParser(description='CenterPoint AXEngine Inference')
    # Required positionals: model, config, and extracted-data directory.
    ap.add_argument('axmodel', help='AXModel path')
    ap.add_argument('config_json', help='JSON config file path')
    ap.add_argument('data_dir', help='extracted data directory')
    # Optional knobs.
    ap.add_argument('--output-dir', default='./inference_results_ax', help='output directory')
    ap.add_argument('--score-thr', type=float, default=0.1, help='score threshold')
    ap.add_argument('--num-samples', type=int, default=None, help='number of samples to process')
    ap.add_argument('--visualize', action='store_true', help='save visualization images and video')
    ap.add_argument('--fps', type=int, default=10, help='video fps')
    return ap.parse_args()
|
|
|
|
|
|
|
|
def load_axmodel(axmodel_path):
    """Create an AXEngine inference session for the given model file.

    Raises:
        RuntimeError: if the optional `axengine` import failed at module load.
    """
    if axe is None:
        raise RuntimeError("axengine is not installed")
    # Single execution provider: the AXEngine NPU runtime.
    return axe.InferenceSession(axmodel_path, providers=['AxEngineExecutionProvider'])
|
|
|
|
|
|
|
|
def load_config(config_path):
    """Read the JSON configuration file and return it as a dict."""
    with open(config_path, 'r') as fp:
        return json.load(fp)
|
|
|
|
|
|
|
|
def load_sample_index(data_dir):
    """Return the parsed `sample_index.json` found directly under data_dir."""
    with open(osp.join(data_dir, 'sample_index.json'), 'r') as fp:
        return json.load(fp)
|
|
|
|
|
|
|
|
def load_points(data_dir, points_path, num_features=5):
    """Load a point cloud stored as a flat float32 binary file.

    Args:
        data_dir: root directory of the extracted data.
        points_path: path of the .bin file, relative to data_dir.
        num_features: float32 values per point. Defaults to 5
            (x, y, z, intensity, time_lag), matching the rest of this
            pipeline; parameterized so other point layouts can be loaded.

    Returns:
        [N, num_features] float32 array.

    Raises:
        ValueError: if the file size is not a multiple of num_features
            (previously this surfaced as an opaque reshape error).
    """
    full_path = osp.join(data_dir, points_path)
    raw = np.fromfile(full_path, dtype=np.float32)
    if raw.size % num_features != 0:
        raise ValueError(
            f'{full_path}: {raw.size} floats is not divisible by '
            f'num_features={num_features}')
    return raw.reshape(-1, num_features)
|
|
|
|
|
|
|
|
def load_gt(data_dir, gt_path):
    """Read a ground-truth annotation JSON file (relative to data_dir)."""
    with open(osp.join(data_dir, gt_path), 'r') as fp:
        return json.load(fp)
|
|
|
|
|
|
|
|
@numba.jit(nopython=True)
def _points_to_voxel_kernel(
    points,
    voxel_size,
    coors_range,
    num_points_per_voxel,
    coor_to_voxelidx,
    voxels,
    coors,
    max_points=20,
    max_voxels=30000,
):
    """Voxelization kernel using numba for acceleration.

    Scatters every in-range point into its voxel, filling the caller's
    preallocated arrays in place:
      - num_points_per_voxel[m]: points accumulated in voxel m
      - coor_to_voxelidx[z, y, x]: voxel index of a grid cell (-1 = unused)
      - voxels[m, k]: the k-th point stored in voxel m
      - coors[m]: (z, y, x) grid coordinate of voxel m

    Returns the number of voxels actually produced (<= max_voxels).
    """
    N = points.shape[0]
    ndim = 3
    ndim_minus_1 = ndim - 1
    # Grid resolution along (x, y, z), rounded in place.
    grid_size = (coors_range[3:] - coors_range[:3]) / voxel_size
    grid_size = np.round(grid_size, 0, grid_size).astype(np.int32)
    coor = np.zeros(shape=(3,), dtype=np.int32)
    voxel_num = 0
    failed = False

    for i in range(N):
        failed = False
        for j in range(ndim):
            c = np.floor((points[i, j] - coors_range[j]) / voxel_size[j])
            # Points outside the configured range are dropped entirely.
            if c < 0 or c >= grid_size[j]:
                failed = True
                break
            # Coordinates are stored reversed, i.e. as (z, y, x).
            coor[ndim_minus_1 - j] = c
        if failed:
            continue
        voxelidx = coor_to_voxelidx[coor[0], coor[1], coor[2]]
        if voxelidx == -1:
            voxelidx = voxel_num
            # Voxel budget exhausted: points in brand-new cells are skipped,
            # but points in already-registered cells are still accepted below.
            if voxel_num >= max_voxels:
                continue
            voxel_num += 1
            coor_to_voxelidx[coor[0], coor[1], coor[2]] = voxelidx
            coors[voxelidx] = coor
        num = num_points_per_voxel[voxelidx]
        # Extra points beyond max_points in a single voxel are discarded.
        if num < max_points:
            voxels[voxelidx, num] = points[i]
            num_points_per_voxel[voxelidx] += 1
    return voxel_num
|
|
|
|
|
|
|
|
def points_to_voxel(points, voxel_size, coors_range, max_points=20, max_voxels=30000):
    """Convert point cloud to voxels

    Args:
        points: [N, 5] float32 array (x, y, z, intensity, time_lag)
        voxel_size: [3] voxel size (x, y, z)
        coors_range: [6] point cloud range (xmin, ymin, zmin, xmax, ymax, zmax)
        max_points: max points per voxel
        max_voxels: max number of voxels

    Returns:
        voxels: [M, max_points, 5] voxel features
        coors: [M, 3] voxel coordinates (z, y, x)
        num_points_per_voxel: [M] number of points in each voxel
    """
    # Fix: np.asarray with an explicit dtype also normalizes ndarray inputs.
    # The previous isinstance() check let a float64 ndarray through unchanged,
    # giving the numba kernel inconsistently-typed arguments.
    voxel_size = np.asarray(voxel_size, dtype=np.float32)
    coors_range = np.asarray(coors_range, dtype=np.float32)

    # Grid shape computed in (x, y, z) order, then reversed to (z, y, x)
    # to match the kernel's coordinate layout.
    voxelmap_shape = (coors_range[3:] - coors_range[:3]) / voxel_size
    voxelmap_shape = tuple(np.round(voxelmap_shape).astype(np.int32).tolist())
    voxelmap_shape = voxelmap_shape[::-1]

    # Preallocated outputs, filled in place by the kernel.
    num_points_per_voxel = np.zeros(shape=(max_voxels,), dtype=np.int32)
    coor_to_voxelidx = -np.ones(shape=voxelmap_shape, dtype=np.int32)
    voxels = np.zeros(shape=(max_voxels, max_points, points.shape[-1]), dtype=np.float32)
    coors = np.zeros(shape=(max_voxels, 3), dtype=np.int32)

    voxel_num = _points_to_voxel_kernel(
        points.astype(np.float32),
        voxel_size,
        coors_range,
        num_points_per_voxel,
        coor_to_voxelidx,
        voxels,
        coors,
        max_points,
        max_voxels,
    )

    # Trim the preallocated buffers to the voxels actually produced.
    return voxels[:voxel_num], coors[:voxel_num], num_points_per_voxel[:voxel_num]
|
|
|
|
|
|
|
|
def preprocess_pointpillars(points, config):
    """Voxelize a raw point cloud according to the model's voxel config.

    Returns:
        (voxels, coors, num_points) as produced by points_to_voxel().
    """
    cfg = config['voxel_generator']
    size = np.array(cfg['voxel_size'], dtype=np.float32)
    cloud_range = np.array(cfg['range'], dtype=np.float32)
    max_pts = cfg['max_points_in_voxel']
    max_vox = cfg['max_voxel_num']
    if isinstance(max_vox, list):
        # A list holds two budgets; the second entry is used here.
        max_vox = max_vox[1]

    return points_to_voxel(points, size, cloud_range, max_pts, max_vox)
|
|
|
|
|
|
|
|
@numba.jit(nopython=True)
def _create_pillars_input_kernel(voxels, coors, num_points, features, indices,
                                 voxel_size, pc_range, bev_w, num_voxels):
    """Numba-accelerated kernel for pillar feature computation.

    Fills `features` [10, P, K] and `indices` [P, 2] in place.  The 10
    channels per point are: x, y, z, intensity, time_lag, offset from the
    pillar's point mean (dx, dy, dz), and offset from the pillar cell's
    geometric center (dx_p, dy_p).
    """
    for i in range(num_voxels):
        n_points = num_points[i]
        if n_points == 0:
            continue

        voxel = voxels[i]
        coor = coors[i]

        # Mean position of the points inside this pillar.
        x_sum = 0.0
        y_sum = 0.0
        z_sum = 0.0
        for j in range(n_points):
            x_sum += voxel[j, 0]
            y_sum += voxel[j, 1]
            z_sum += voxel[j, 2]
        x_center = x_sum / n_points
        y_center = y_sum / n_points
        z_center = z_sum / n_points

        # Geometric center of the pillar cell; coor is stored as (z, y, x).
        x_pillar = coor[2] * voxel_size[0] + pc_range[0] + voxel_size[0] / 2
        y_pillar = coor[1] * voxel_size[1] + pc_range[1] + voxel_size[1] / 2

        # Write the 10 per-point channels for every point in the pillar.
        for j in range(n_points):
            features[0, i, j] = voxel[j, 0]
            features[1, i, j] = voxel[j, 1]
            features[2, i, j] = voxel[j, 2]
            features[3, i, j] = voxel[j, 3]
            features[4, i, j] = voxel[j, 4]
            features[5, i, j] = voxel[j, 0] - x_center
            features[6, i, j] = voxel[j, 1] - y_center
            features[7, i, j] = voxel[j, 2] - z_center
            features[8, i, j] = voxel[j, 0] - x_pillar
            features[9, i, j] = voxel[j, 1] - y_pillar

        # Flattened row-major BEV scatter index: y * width + x.
        indices[i, 1] = coor[1] * bev_w + coor[2]
|
|
|
|
|
|
|
|
def create_pillars_input(voxels, coors, num_points, config, max_pillars=30000):
    """Assemble the AXModel input tensors from voxelized points.

    The model expects:
    - features: [1, 10, max_pillars, max_points_per_pillar]
    - indices: [1, max_pillars, 2]
    """
    voxel_cfg = config['voxel_generator']
    voxel_size = np.array(voxel_cfg['voxel_size'], dtype=np.float32)
    pc_range = np.array(voxel_cfg['range'], dtype=np.float32)
    points_per_pillar = voxel_cfg['max_points_in_voxel']

    # Clip to the pillar budget if voxelization produced too many.
    n_pillars = min(voxels.shape[0], max_pillars)
    voxels = voxels[:n_pillars]
    coors = coors[:n_pillars]
    num_points = num_points[:n_pillars]

    # Empty-slot defaults: zero features, scatter index -1.
    features = np.zeros((10, max_pillars, points_per_pillar), dtype=np.float32)
    indices = np.zeros((max_pillars, 2), dtype=np.int32)
    indices[:, 0] = 0
    indices[:, 1] = -1

    # BEV grid width, used to flatten (y, x) into a single scatter index.
    bev_w = int((pc_range[3] - pc_range[0]) / voxel_size[0])

    _create_pillars_input_kernel(
        voxels, coors, num_points, features, indices,
        voxel_size, pc_range, bev_w, n_pillars
    )

    # Add the batch dimension expected by the model.
    return features[np.newaxis, ...], indices[np.newaxis, ...]
|
|
|
|
|
|
|
|
def decode_bbox(reg, height, dim, rot, vel, score, cls, config, task_idx):
    """Decode one task head's dense outputs into 3D bounding boxes.

    Returns:
        boxes: [K, 9] float32 (x, y, z, w, l, h, theta, vx, vy)
        scores: [K] float32 confidences above the configured threshold
        labels: [K] int32 global class ids
    """
    test_cfg = config['test_cfg']
    voxel_size = test_cfg['voxel_size']
    pc_range = test_cfg['pc_range']
    out_size_factor = test_cfg['out_size_factor']
    score_threshold = test_cfg['score_threshold']

    grid_h, grid_w = score.shape

    # Per-cell column/row coordinates of the feature map.
    col, row = np.meshgrid(np.arange(grid_w, dtype=np.float32),
                           np.arange(grid_h, dtype=np.float32))

    # Cell index + predicted sub-cell offset, mapped back to metric space.
    xs = (col + reg[..., 0]) * out_size_factor * voxel_size[0] + pc_range[0]
    ys = (row + reg[..., 1]) * out_size_factor * voxel_size[1] + pc_range[1]
    zs = height[..., 0]

    # Yaw is predicted as (sin, cos).
    theta = np.arctan2(rot[..., 0], rot[..., 1])

    # Global class id of this task's first class.
    class_offset = [0, 1, 3, 5, 6, 8][task_idx]

    mask = score > score_threshold
    if not np.any(mask):
        return np.zeros((0, 9), dtype=np.float32), np.zeros((0,)), np.zeros((0,), dtype=np.int32)

    dims = dim[mask]
    vels = vel[mask]
    boxes = np.stack([
        xs[mask], ys[mask], zs[mask],
        dims[:, 2], dims[:, 0], dims[:, 1],
        theta[mask],
        vels[:, 0], vels[:, 1],
    ], axis=-1)

    scores_out = score[mask].astype(np.float32)
    labels_out = (cls[mask] + class_offset).astype(np.int32)
    return boxes.astype(np.float32), scores_out, labels_out
|
|
|
|
|
|
|
|
@numba.jit(nopython=True)
def _nms_bev_kernel(boxes, scores, nms_threshold, max_output=500):
    """Numba-accelerated NMS kernel.

    Greedy non-maximum suppression on axis-aligned BEV footprints (box
    rotation is ignored).  Returns indices of kept boxes, highest score
    first, at most `max_output` of them.
    """
    n = len(boxes)
    if n == 0:
        return np.zeros(0, dtype=np.int64)

    # Visit boxes in descending score order.
    order = np.argsort(-scores)

    # Axis-aligned extents: column 4 is used as the x size and column 3
    # as the y size of the footprint.
    x1 = boxes[:, 0] - boxes[:, 4] / 2
    y1 = boxes[:, 1] - boxes[:, 3] / 2
    x2 = boxes[:, 0] + boxes[:, 4] / 2
    y2 = boxes[:, 1] + boxes[:, 3] / 2
    areas = boxes[:, 3] * boxes[:, 4]

    suppressed = np.zeros(n, dtype=np.int32)
    keep = np.zeros(max_output, dtype=np.int64)
    num_keep = 0

    for _i in range(n):
        i = order[_i]
        if suppressed[i] == 1:
            continue

        keep[num_keep] = i
        num_keep += 1
        if num_keep >= max_output:
            break

        # Suppress every lower-scored box that overlaps this one too much.
        for _j in range(_i + 1, n):
            j = order[_j]
            if suppressed[j] == 1:
                continue

            # Intersection rectangle of the two footprints.
            ix1 = max(x1[i], x1[j])
            iy1 = max(y1[i], y1[j])
            ix2 = min(x2[i], x2[j])
            iy2 = min(y2[i], y2[j])

            iw = max(0.0, ix2 - ix1)
            ih = max(0.0, iy2 - iy1)
            inter = iw * ih

            # IoU with a small epsilon guarding against division by zero.
            union = areas[i] + areas[j] - inter
            iou = inter / max(union, 1e-6)

            if iou > nms_threshold:
                suppressed[j] = 1

    return keep[:num_keep]
|
|
|
|
|
|
|
|
def nms_bev(boxes, scores, labels, nms_threshold=0.2):
    """Aligned BEV NMS (numba-accelerated).

    Note: `labels` is accepted for interface symmetry; suppression itself
    is class-agnostic.
    """
    if len(boxes):
        return _nms_bev_kernel(boxes, scores, nms_threshold)
    return np.array([], dtype=np.int64)
|
|
|
|
|
|
|
|
def postprocess(outputs, config, score_thr=0.1):
    """Postprocess model outputs

    CenterPoint model output structure (42 outputs total, 7 per task, 6 tasks):
    Per task output order:
    - reg: [1, 2, 128, 128] - registration offset
    - height: [1, 1, 128, 128] - height
    - dim: [1, 3, 128, 128] - dimensions (l, h, w)
    - rot: [1, 2, 128, 128] - rotation (sin, cos)
    - vel: [1, 2, 128, 128] - velocity
    - score: [1, 128, 128] - confidence (after sigmoid)
    - cls: [1, 128, 128] - class index (after argmax)

    Returns:
        boxes: [K, 9] float32 (x, y, z, w, l, h, theta, vx, vy)
        scores: [K] float32
        labels: [K] int32 global class ids
    """
    tasks = config['tasks']
    num_tasks = len(tasks)
    outputs_per_task = 7

    test_cfg = config['test_cfg']
    voxel_size = test_cfg['voxel_size']
    pc_range = test_cfg['pc_range']
    out_size_factor = test_cfg['out_size_factor']
    score_threshold = test_cfg['score_threshold']

    all_boxes = []
    all_scores = []
    all_labels = []

    # Global class-id offset of each task's first class.
    class_offsets = [0, 1, 3, 5, 6, 8]

    for task_idx in range(num_tasks):
        base_idx = task_idx * outputs_per_task

        # Strip the leading batch dimension from every head output.
        reg = outputs[base_idx + 0][0]
        height = outputs[base_idx + 1][0]
        dim = outputs[base_idx + 2][0]
        rot = outputs[base_idx + 3][0]
        vel = outputs[base_idx + 4][0]
        score = outputs[base_idx + 5][0]
        cls = outputs[base_idx + 6][0]

        H, W = score.shape

        xs = np.arange(W, dtype=np.float32)
        ys = np.arange(H, dtype=np.float32)
        xs, ys = np.meshgrid(xs, ys)

        # Feature-map cell + predicted offset, mapped back to metric space.
        center_x = (xs + reg[0]) * out_size_factor * voxel_size[0] + pc_range[0]
        center_y = (ys + reg[1]) * out_size_factor * voxel_size[1] + pc_range[1]
        center_z = height[0]

        dim_l = dim[0]
        dim_h = dim[1]
        dim_w = dim[2]

        # Yaw is predicted as (sin, cos).
        theta = np.arctan2(rot[0], rot[1])
        vel_x = vel[0]
        vel_y = vel[1]

        # First, filter by the config-level score threshold.
        mask = score > score_threshold

        if not np.any(mask):
            continue

        class_offset = class_offsets[task_idx]

        boxes = np.stack([
            center_x[mask], center_y[mask], center_z[mask],
            dim_w[mask], dim_l[mask], dim_h[mask],
            theta[mask], vel_x[mask], vel_y[mask],
        ], axis=-1).astype(np.float32)

        scores_task = score[mask].astype(np.float32)
        labels_task = (cls[mask] + class_offset).astype(np.int32)

        if len(boxes) > 0:
            all_boxes.append(boxes)
            all_scores.append(scores_task)
            all_labels.append(labels_task)

    if len(all_boxes) == 0:
        return np.zeros((0, 9), dtype=np.float32), np.zeros((0,)), np.zeros((0,), dtype=np.int32)

    boxes = np.concatenate(all_boxes, axis=0)
    scores = np.concatenate(all_scores, axis=0)
    labels = np.concatenate(all_labels, axis=0)

    # Class-agnostic NMS over the detections of all tasks.
    nms_cfg = config['test_cfg']['nms']
    keep = nms_bev(boxes, scores, labels, nms_cfg['nms_iou_threshold'])

    boxes = boxes[keep]
    scores = scores[keep]
    labels = labels[keep]

    # Second, user-facing confidence filter (the score_thr argument).
    mask = scores > score_thr
    boxes = boxes[mask]
    scores = scores[mask]
    labels = labels[mask]

    # Keep at most max_per_img highest-scoring detections.
    max_per_img = config['test_cfg']['max_per_img']
    if len(boxes) > max_per_img:
        topk_indices = np.argsort(-scores)[:max_per_img]
        boxes = boxes[topk_indices]
        scores = scores[topk_indices]
        labels = labels[topk_indices]

    return boxes, scores, labels
|
|
|
|
|
|
|
|
# Detection class names; list index equals the global class id produced by
# postprocess() (per-task class index + task offset).
CLASS_NAMES = [
    'car', 'truck', 'construction_vehicle', 'bus', 'trailer',
    'barrier', 'motorcycle', 'bicycle', 'pedestrian', 'traffic_cone'
]


# Per-class drawing colors in OpenCV BGR order.
# NOTE(review): classes 2/6 (red) and 3/9 (yellow) currently share colors.
CLASS_COLORS_BGR = {
    0: (255, 0, 0),      # car: blue
    1: (0, 165, 255),    # truck: orange
    2: (0, 0, 255),      # construction_vehicle: red
    3: (0, 255, 255),    # bus: yellow
    4: (128, 0, 128),    # trailer: purple
    5: (255, 255, 0),    # barrier: cyan
    6: (0, 0, 255),      # motorcycle: red
    7: (0, 255, 0),      # bicycle: green
    8: (255, 0, 255),    # pedestrian: magenta
    9: (0, 255, 255),    # traffic_cone: yellow
}
|
|
|
|
|
|
|
|
def visualize_bev(points, boxes, scores, labels, config, save_path,
                  frame_idx=0, eval_range=35, conf_th=0.5):
    """Fast BEV visualization using OpenCV (50-100x faster than matplotlib)

    Renders the point cloud (distance-colored) plus per-class box glyphs
    into an 800x800 top-down image and writes it to save_path.  Returns
    True on success, None when opencv-python is unavailable.
    """
    try:
        import cv2
    except ImportError:
        print("opencv-python not available, skipping visualization")
        return None

    # Image covers eval_range meters in every direction around the origin.
    img_size = 800
    scale = img_size / (2 * eval_range)
    center = img_size // 2

    img = np.zeros((img_size, img_size, 3), dtype=np.uint8)

    # Keep only points inside the displayed square.
    mask = (np.abs(points[:, 0]) < eval_range) & (np.abs(points[:, 1]) < eval_range)
    pts = points[mask, :3]

    # Drop points within 3 m of the sensor (presumably ego-vehicle returns).
    close_mask = (np.abs(pts[:, 0]) < 3) & (np.abs(pts[:, 1]) < 3)
    pts = pts[~close_mask]

    # Color points by their normalized distance from the origin.
    dists = np.sqrt(pts[:, 0]**2 + pts[:, 1]**2)
    norm_dists = np.minimum(1.0, dists / eval_range)

    # World (x, y) -> pixel (px, py); image y axis points down.
    px = (center + pts[:, 0] * scale).astype(np.int32)
    py = (center - pts[:, 1] * scale).astype(np.int32)

    valid = (px >= 0) & (px < img_size) & (py >= 0) & (py < img_size)
    px, py, norm_dists = px[valid], py[valid], norm_dists[valid]

    # Two-segment linear colormap over t in [0, 1], one channel at a time.
    t = norm_dists
    r = np.where(t < 0.5, 68 + t * 2 * (49 - 68), 49 + (t - 0.5) * 2 * (253 - 49))
    g = np.where(t < 0.5, 1 + t * 2 * (104 - 1), 104 + (t - 0.5) * 2 * (231 - 104))
    b = np.where(t < 0.5, 84 + t * 2 * (142 - 84), 142 + (t - 0.5) * 2 * (37 - 142))

    img[py, px, 0] = b.astype(np.uint8)
    img[py, px, 1] = g.astype(np.uint8)
    img[py, px, 2] = r.astype(np.uint8)

    # Count only boxes above the display confidence threshold.
    num_detections = sum(1 for s in scores if s >= conf_th)

    for box, score, label in zip(boxes, scores, labels):
        if score < conf_th:
            continue

        x, y, z, w, l, h, theta, vx, vy = box
        label_int = int(label)

        # Unknown class ids fall back to white.
        color = CLASS_COLORS_BGR.get(label_int, (255, 255, 255))

        # Box center in pixel coordinates.
        cx = int(center + x * scale)
        cy = int(center - y * scale)

        # Yaw converted to the image drawing frame.
        vis_theta = -theta - np.pi / 2
        cos_t, sin_t = np.cos(vis_theta), np.sin(vis_theta)

        if label_int == 8:
            # pedestrian: circle with a heading tick
            radius = max(3, int(max(w, l) * scale / 2))
            cv2.circle(img, (cx, cy), radius, color, 2)

            head_x = int(cx + radius * cos_t)
            head_y = int(cy - radius * sin_t)
            cv2.line(img, (cx, cy), (head_x, head_y), color, 2)

        elif label_int == 9:
            # traffic_cone: filled triangle
            size = max(4, int(max(w, l) * scale))
            pts = np.array([
                [cx, cy - size],
                [cx - size//2, cy + size//2],
                [cx + size//2, cy + size//2],
            ], dtype=np.int32)
            cv2.fillPoly(img, [pts], color)

        elif label_int == 5:
            # barrier: filled rotated rectangle at half the box width
            corners = np.array([
                [l/2, w/4], [l/2, -w/4], [-l/2, -w/4], [-l/2, w/4]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.fillPoly(img, [corners_img], color)

        elif label_int in [6, 7]:
            # motorcycle / bicycle: outline plus an arrow showing heading
            corners = np.array([
                [l/2, w/2], [l/2, -w/2], [-l/2, -w/2], [-l/2, w/2]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.polylines(img, [corners_img], True, color, 2)

            front_mid = ((corners_img[0] + corners_img[1]) // 2).astype(np.int32)
            cv2.arrowedLine(img, (cx, cy), tuple(front_mid), color, 2, tipLength=0.4)

        else:
            # all remaining classes: outline plus a heading line
            corners = np.array([
                [l/2, w/2], [l/2, -w/2], [-l/2, -w/2], [-l/2, w/2]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.polylines(img, [corners_img], True, color, 2)

            front_mid = ((corners_img[0] + corners_img[1]) // 2).astype(np.int32)
            cv2.line(img, (cx, cy), tuple(front_mid), color, 2)

    # Frame header text.
    cv2.putText(img, f'Frame: {frame_idx}', (10, 25),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(img, f'Detections: {num_detections}', (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    # Color legend, one row per class.
    legend_y = 80
    for cls_id, cls_name in enumerate(CLASS_NAMES):
        color = CLASS_COLORS_BGR.get(cls_id, (255, 255, 255))
        cv2.rectangle(img, (10, legend_y), (25, legend_y + 12), color, -1)
        cv2.putText(img, cls_name, (30, legend_y + 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
        legend_y += 18

    cv2.imwrite(save_path, img)
    return True
|
|
|
|
|
|
|
|
def create_video_from_images(image_dir, output_video_path, fps=10):
    """Create video from images in a directory

    Frames are written in sorted filename order.  Falls back from the
    mp4v codec to XVID (.avi) if the mp4 writer cannot be opened.

    Args:
        image_dir: directory containing images
        output_video_path: output video file path
        fps: frames per second
    """
    try:
        import cv2
    except ImportError:
        print("opencv-python not available, cannot create video")
        return

    image_files = sorted([f for f in os.listdir(image_dir)
                          if f.endswith(('.png', '.jpg', '.jpeg'))])

    if len(image_files) == 0:
        print(f"No images found in {image_dir}")
        return

    # Derive the frame size from the first image.
    first_img = cv2.imread(osp.join(image_dir, image_files[0]))
    if first_img is None:
        print(f"Cannot read first image: {image_files[0]}")
        return

    height, width = first_img.shape[:2]

    # Cap the output resolution at 1080p, preserving aspect ratio.
    max_width, max_height = 1920, 1080
    if width > max_width or height > max_height:
        scale = min(max_width / width, max_height / height)
        width, height = int(width * scale), int(height * scale)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    if not video_writer.isOpened():
        # Fall back to the more widely available XVID/AVI combination.
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        output_video_path = output_video_path.replace('.mp4', '.avi')
        video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not video_writer.isOpened():
            # Fix: previously frames were written to a dead writer here,
            # silently producing no video file at all.
            print(f"Cannot open video writer for {output_video_path}")
            return

    try:
        for img_file in tqdm(image_files, desc="Creating video"):
            img_path = osp.join(image_dir, img_file)
            img = cv2.imread(img_path)
            if img is not None:
                # Normalize frames that differ from the target size.
                if img.shape[:2] != (height, width):
                    img = cv2.resize(img, (width, height))
                video_writer.write(img)
    finally:
        # Always finalize the container, even if a frame read/write fails.
        video_writer.release()
|
|
|
|
|
|
|
|
def run_inference(session, points, config):
    """Run the full pipeline on a single point cloud.

    Voxelizes the points, builds the pillar tensors, executes the
    AXEngine session and decodes its outputs.

    Returns:
        (boxes, scores, labels) from postprocess().
    """
    voxels, coors, num_points = preprocess_pointpillars(points, config)
    features, indices = create_pillars_input(voxels, coors, num_points, config)

    features = features.astype(np.float32)
    indices = indices.astype(np.int32)

    # Route each model input by name: anything indices-like receives the
    # scatter indices; every other input receives the pillar features.
    feed_dict = {}
    for inp in session.get_inputs():
        name = inp.name
        if name != 'input.1' and 'indices' in name.lower():
            feed_dict[name] = indices
        else:
            feed_dict[name] = features

    outputs = session.run(None, feed_dict)

    return postprocess(outputs, config)
|
|
|
|
|
|
|
|
def main():
    """Entry point: run AXEngine inference over the extracted samples,
    dump per-frame detections to JSON and optionally render a BEV video."""
    args = parse_args()

    if axe is None:
        print("Error: axengine is not installed. Please install it first.")
        return

    # Load the JSON config and the compiled model.
    config = load_config(args.config_json)
    session = load_axmodel(args.axmodel)

    # Sample list, optionally truncated by --num-samples.
    sample_index = load_sample_index(args.data_dir)
    samples = sample_index['samples']

    if args.num_samples is not None:
        samples = samples[:args.num_samples]

    print(f"Processing {len(samples)} samples...")

    os.makedirs(args.output_dir, exist_ok=True)

    # Per-frame images are only produced when --visualize is given.
    images_dir = osp.join(args.output_dir, 'images')
    if args.visualize:
        os.makedirs(images_dir, exist_ok=True)

    all_results = []

    for idx, sample in enumerate(tqdm(samples, desc="Inference")):
        token = sample['token']

        points = load_points(args.data_dir, sample['points_path'])

        boxes, scores, labels = run_inference(session, points, config)

        # JSON-serializable per-frame record.
        result = {
            'token': token,
            'boxes': boxes.tolist(),
            'scores': scores.tolist(),
            'labels': labels.tolist(),
            'num_detections': len(boxes),
        }
        all_results.append(result)

        if args.visualize:
            vis_path = osp.join(images_dir, f'frame_{idx:06d}.png')
            visualize_bev(points, boxes, scores, labels, config, vis_path, frame_idx=idx, conf_th=args.score_thr)

    # All detections go into a single results.json.
    results_path = osp.join(args.output_dir, 'results.json')
    with open(results_path, 'w') as f:
        json.dump(all_results, f, indent=2)

    # Assemble the rendered frames into a video.
    if args.visualize:
        video_path = osp.join(args.output_dir, 'centerpoint_detection_axmodel.mp4')
        create_video_from_images(images_dir, video_path, fps=args.fps)

    total_detections = sum(r['num_detections'] for r in all_results)
    print(f"Done! {len(samples)} frames, {total_detections} detections, saved to {args.output_dir}")
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
main() |
|
|
|