# centerpoint/inference_axmodel.py
# (Hugging Face page residue preserved as comments: uploaded by fangmingguo,
#  "Upload 104 files", commit 1fcb06d verified)
#!/usr/bin/env python3
"""
CenterPoint AXEngine Inference Demo
Usage:
python inference_axmodel.py ./centerpoint.axmodel ./extracted_data/config.json ./extracted_data \
--output-dir ./inference_results_ax --num-samples 10
"""
import argparse
import json
import os
import os.path as osp
import numpy as np
from tqdm import tqdm
import numba
try:
import axengine as axe
except ImportError:
print("Warning: axengine not available. Install it to use AXEngine inference.")
axe = None
def parse_args():
    """Parse the command-line arguments for the inference demo."""
    p = argparse.ArgumentParser(description='CenterPoint AXEngine Inference')
    # Required positional arguments.
    p.add_argument('axmodel', help='AXModel path')
    p.add_argument('config_json', help='JSON config file path')
    p.add_argument('data_dir', help='extracted data directory')
    # Optional flags.
    p.add_argument('--output-dir', default='./inference_results_ax', help='output directory')
    p.add_argument('--score-thr', default=0.1, type=float, help='score threshold')
    p.add_argument('--num-samples', default=None, type=int, help='number of samples to process')
    p.add_argument('--visualize', action='store_true', help='save visualization images and video')
    p.add_argument('--fps', default=10, type=int, help='video fps')
    return p.parse_args()
def load_axmodel(axmodel_path):
    """Create an AXEngine inference session for the given model file.

    Raises:
        RuntimeError: if the ``axengine`` package could not be imported.
    """
    if axe is None:
        raise RuntimeError("axengine is not installed")
    return axe.InferenceSession(axmodel_path, providers=['AxEngineExecutionProvider'])
def load_config(config_path):
    """Read a JSON configuration file and return its contents as a dict."""
    with open(config_path, 'r') as cfg_file:
        return json.load(cfg_file)
def load_sample_index(data_dir):
    """Read ``sample_index.json`` from *data_dir* and return the parsed index."""
    with open(osp.join(data_dir, 'sample_index.json'), 'r') as f:
        return json.load(f)
def load_points(data_dir, points_path):
    """Load an (N, 5) float32 point cloud (x, y, z, intensity, time_lag) from a raw binary file."""
    raw = np.fromfile(osp.join(data_dir, points_path), dtype=np.float32)
    return raw.reshape(-1, 5)
def load_gt(data_dir, gt_path):
    """Load ground-truth annotations stored as JSON under *data_dir*."""
    with open(osp.join(data_dir, gt_path), 'r') as f:
        return json.load(f)
@numba.jit(nopython=True)
def _points_to_voxel_kernel(
    points,
    voxel_size,
    coors_range,
    num_points_per_voxel,
    coor_to_voxelidx,
    voxels,
    coors,
    max_points=20,
    max_voxels=30000,
):
    """Scatter points into voxels (numba nopython kernel).

    All output buffers (`num_points_per_voxel`, `coor_to_voxelidx`, `voxels`,
    `coors`) are pre-allocated by the caller and filled in place.

    Args:
        points: [N, C] float32 points; columns 0-2 are x, y, z.
        voxel_size: [3] voxel edge lengths (x, y, z).
        coors_range: [6] (xmin, ymin, zmin, xmax, ymax, zmax).
        num_points_per_voxel: [max_voxels] int32, point count per voxel.
        coor_to_voxelidx: [Z, Y, X] int32 lookup, -1 means unassigned.
        voxels: [max_voxels, max_points, C] float32 output features.
        coors: [max_voxels, 3] int32 voxel coordinates in (z, y, x) order.
        max_points: cap on points stored per voxel (extras are dropped).
        max_voxels: cap on total voxels (extras are dropped).

    Returns:
        Number of voxels actually produced.
    """
    N = points.shape[0]
    ndim = 3
    ndim_minus_1 = ndim - 1
    grid_size = (coors_range[3:] - coors_range[:3]) / voxel_size
    # Round in place via the `out` argument, then cast to int grid extents.
    grid_size = np.round(grid_size, 0, grid_size).astype(np.int32)
    coor = np.zeros(shape=(3,), dtype=np.int32)
    voxel_num = 0
    failed = False
    for i in range(N):
        failed = False
        for j in range(ndim):
            c = np.floor((points[i, j] - coors_range[j]) / voxel_size[j])
            # Skip points that fall outside the configured range.
            if c < 0 or c >= grid_size[j]:
                failed = True
                break
            # Store reversed so `coor` holds (z, y, x) rather than (x, y, z).
            coor[ndim_minus_1 - j] = c
        if failed:
            continue
        voxelidx = coor_to_voxelidx[coor[0], coor[1], coor[2]]
        if voxelidx == -1:
            # First point in this cell: allocate a new voxel slot.
            voxelidx = voxel_num
            if voxel_num >= max_voxels:
                continue
            voxel_num += 1
            coor_to_voxelidx[coor[0], coor[1], coor[2]] = voxelidx
            coors[voxelidx] = coor
        num = num_points_per_voxel[voxelidx]
        # Points beyond `max_points` in a voxel are silently discarded.
        if num < max_points:
            voxels[voxelidx, num] = points[i]
            num_points_per_voxel[voxelidx] += 1
    return voxel_num
def points_to_voxel(points, voxel_size, coors_range, max_points=20, max_voxels=30000):
    """Voxelize a point cloud.

    Args:
        points: [N, 5] float32 array (x, y, z, intensity, time_lag)
        voxel_size: [3] voxel size (x, y, z)
        coors_range: [6] point cloud range (xmin, ymin, zmin, xmax, ymax, zmax)
        max_points: maximum points kept per voxel
        max_voxels: maximum number of voxels

    Returns:
        voxels: [M, max_points, 5] per-voxel point features
        coors: [M, 3] voxel coordinates in (z, y, x) order
        num_points_per_voxel: [M] point count per voxel
    """
    if not isinstance(voxel_size, np.ndarray):
        voxel_size = np.array(voxel_size, dtype=np.float32)
    if not isinstance(coors_range, np.ndarray):
        coors_range = np.array(coors_range, dtype=np.float32)
    # Grid extents in (x, y, z); buffers are indexed (z, y, x), so reverse.
    extent = coors_range[3:] - coors_range[:3]
    grid_shape = np.round(extent / voxel_size).astype(np.int32)
    zyx_shape = tuple(int(s) for s in grid_shape[::-1])
    # Pre-allocate the dense output buffers the kernel fills in place.
    num_points_per_voxel = np.zeros(max_voxels, dtype=np.int32)
    coor_to_voxelidx = np.full(zyx_shape, -1, dtype=np.int32)
    voxels = np.zeros((max_voxels, max_points, points.shape[-1]), dtype=np.float32)
    coors = np.zeros((max_voxels, 3), dtype=np.int32)
    voxel_num = _points_to_voxel_kernel(
        points.astype(np.float32),
        voxel_size,
        coors_range,
        num_points_per_voxel,
        coor_to_voxelidx,
        voxels,
        coors,
        max_points,
        max_voxels,
    )
    # Trim to the number of voxels actually produced.
    return voxels[:voxel_num], coors[:voxel_num], num_points_per_voxel[:voxel_num]
def preprocess_pointpillars(points, config):
    """Voxelize a point cloud using the 'voxel_generator' section of *config*."""
    vg = config['voxel_generator']
    max_voxel_num = vg['max_voxel_num']
    # A list holds separate limits; the test-time limit is at index 1.
    if isinstance(max_voxel_num, list):
        max_voxel_num = max_voxel_num[1]
    return points_to_voxel(
        points,
        np.array(vg['voxel_size'], dtype=np.float32),
        np.array(vg['range'], dtype=np.float32),
        vg['max_points_in_voxel'],
        max_voxel_num,
    )
@numba.jit(nopython=True)
def _create_pillars_input_kernel(voxels, coors, num_points, features, indices,
                                 voxel_size, pc_range, bev_w, num_voxels):
    """Fill the dense pillar feature/index tensors in place (numba kernel).

    For each non-empty pillar i, writes a 10-channel feature per point:
    channels 0-4 are the raw point (x, y, z, intensity, time_lag),
    channels 5-7 are offsets from the pillar's point centroid,
    channels 8-9 are x/y offsets from the pillar cell's geometric center.
    Also writes the flattened row-major BEV cell index into indices[i, 1].
    """
    for i in range(num_voxels):
        n_points = num_points[i]
        if n_points == 0:
            continue
        voxel = voxels[i]
        coor = coors[i]
        # Centroid of the points inside this pillar.
        x_sum = 0.0
        y_sum = 0.0
        z_sum = 0.0
        for j in range(n_points):
            x_sum += voxel[j, 0]
            y_sum += voxel[j, 1]
            z_sum += voxel[j, 2]
        x_center = x_sum / n_points
        y_center = y_sum / n_points
        z_center = z_sum / n_points
        # Geometric center of the pillar's grid cell; coor is (z, y, x).
        x_pillar = coor[2] * voxel_size[0] + pc_range[0] + voxel_size[0] / 2
        y_pillar = coor[1] * voxel_size[1] + pc_range[1] + voxel_size[1] / 2
        # Fill the 10 feature channels for every point in the pillar.
        for j in range(n_points):
            features[0, i, j] = voxel[j, 0]  # x
            features[1, i, j] = voxel[j, 1]  # y
            features[2, i, j] = voxel[j, 2]  # z
            features[3, i, j] = voxel[j, 3]  # intensity
            features[4, i, j] = voxel[j, 4]  # time_lag
            features[5, i, j] = voxel[j, 0] - x_center  # x_c (offset from centroid)
            features[6, i, j] = voxel[j, 1] - y_center  # y_c
            features[7, i, j] = voxel[j, 2] - z_center  # z_c
            features[8, i, j] = voxel[j, 0] - x_pillar  # x_p (offset from cell center)
            features[9, i, j] = voxel[j, 1] - y_pillar  # y_p
        # Flattened BEV index: row (y) * width + column (x).
        indices[i, 1] = coor[1] * bev_w + coor[2]
def create_pillars_input(voxels, coors, num_points, config, max_pillars=30000):
    """Build the dense input tensors expected by the PointPillars AXModel.

    Returns:
        features: [1, 10, max_pillars, max_points_per_pillar] float32
        indices:  [1, max_pillars, 2] int32; column 0 is the batch index,
                  column 1 the flattened BEV cell index (-1 marks an unused slot)
    """
    vg = config['voxel_generator']
    voxel_size = np.array(vg['voxel_size'], dtype=np.float32)
    pc_range = np.array(vg['range'], dtype=np.float32)
    max_points_per_pillar = vg['max_points_in_voxel']
    # Truncate if voxelization produced more pillars than the model accepts.
    n = min(voxels.shape[0], max_pillars)
    voxels = voxels[:n]
    coors = coors[:n]
    num_points = num_points[:n]
    # Zero-initialized tensors; unused slots keep index -1 and zero features.
    features = np.zeros((10, max_pillars, max_points_per_pillar), dtype=np.float32)
    indices = np.zeros((max_pillars, 2), dtype=np.int32)
    indices[:, 1] = -1
    # BEV grid width in cells along x.
    bev_w = int((pc_range[3] - pc_range[0]) / voxel_size[0])
    _create_pillars_input_kernel(
        voxels, coors, num_points, features, indices,
        voxel_size, pc_range, bev_w, n,
    )
    # Add the leading batch dimension.
    return features[np.newaxis, ...], indices[np.newaxis, ...]
def decode_bbox(reg, height, dim, rot, vel, score, cls, config, task_idx):
    """Decode one task's raw head outputs into 3D bounding boxes.

    Args:
        reg: [H, W, 2] sub-cell center offsets.
        height: [H, W, 1] predicted z centers.
        dim: [H, W, 3] box dimensions in (l, h, w) channel order.
        rot: [H, W, 2] rotation as (sin, cos) components.
        vel: [H, W, 2] velocity (vx, vy).
        score: [H, W] confidence map.
        cls: [H, W] per-cell class index within the task.
        config: dict with a 'test_cfg' section (voxel_size, pc_range,
            out_size_factor, score_threshold).
        task_idx: which detection task head these maps came from.

    Returns:
        boxes [M, 9] (x, y, z, w, l, h, theta, vx, vy), scores [M], labels [M].
    """
    test_cfg = config['test_cfg']
    voxel_size = test_cfg['voxel_size']
    pc_range = test_cfg['pc_range']
    out_size_factor = test_cfg['out_size_factor']
    score_threshold = test_cfg['score_threshold']
    H, W = score.shape
    grid_x, grid_y = np.meshgrid(np.arange(W, dtype=np.float32),
                                 np.arange(H, dtype=np.float32))
    keep = score > score_threshold
    if not keep.any():
        return (np.zeros((0, 9), dtype=np.float32),
                np.zeros((0,)),
                np.zeros((0,), dtype=np.int32))
    # Recover metric centers: heatmap cell + predicted sub-cell offset.
    xs = (grid_x + reg[..., 0]) * out_size_factor * voxel_size[0] + pc_range[0]
    ys = (grid_y + reg[..., 1]) * out_size_factor * voxel_size[1] + pc_range[1]
    zs = height[..., 0]
    yaw = np.arctan2(rot[..., 0], rot[..., 1])
    # Offset of this task's local class ids into the flat label space.
    offset = [0, 1, 3, 5, 6, 8][task_idx]
    dims = dim[keep]
    vels = vel[keep]
    # Box layout: [x, y, z, w, l, h, theta, vx, vy].
    boxes = np.stack([
        xs[keep], ys[keep], zs[keep],
        dims[:, 2],  # w
        dims[:, 0],  # l
        dims[:, 1],  # h
        yaw[keep],
        vels[:, 0],  # vx
        vels[:, 1],  # vy
    ], axis=-1)
    return (boxes.astype(np.float32),
            score[keep].astype(np.float32),
            (cls[keep] + offset).astype(np.int32))
@numba.jit(nopython=True)
def _nms_bev_kernel(boxes, scores, nms_threshold, max_output=500):
    """Greedy axis-aligned BEV NMS (numba kernel).

    Boxes are treated as axis-aligned rectangles built from (x, y, w, l);
    the yaw angle in the box is ignored when computing overlap.

    Args:
        boxes: [N, 9] boxes laid out (x, y, z, w, l, h, theta, vx, vy).
        scores: [N] confidences used for the greedy ordering.
        nms_threshold: IoU above which a lower-scored box is suppressed.
        max_output: hard cap on the number of boxes kept.

    Returns:
        int64 indices of the kept boxes, highest score first.
    """
    n = len(boxes)
    if n == 0:
        return np.zeros(0, dtype=np.int64)
    # Sort by score descending
    order = np.argsort(-scores)
    # Pre-compute box corners
    x1 = boxes[:, 0] - boxes[:, 4] / 2  # x - l/2
    y1 = boxes[:, 1] - boxes[:, 3] / 2  # y - w/2
    x2 = boxes[:, 0] + boxes[:, 4] / 2  # x + l/2
    y2 = boxes[:, 1] + boxes[:, 3] / 2  # y + w/2
    areas = boxes[:, 3] * boxes[:, 4]  # w * l
    suppressed = np.zeros(n, dtype=np.int32)
    keep = np.zeros(max_output, dtype=np.int64)
    num_keep = 0
    for _i in range(n):
        i = order[_i]
        if suppressed[i] == 1:
            continue
        keep[num_keep] = i
        num_keep += 1
        if num_keep >= max_output:
            break
        # Compute IoU with remaining boxes
        for _j in range(_i + 1, n):
            j = order[_j]
            if suppressed[j] == 1:
                continue
            # Compute intersection
            ix1 = max(x1[i], x1[j])
            iy1 = max(y1[i], y1[j])
            ix2 = min(x2[i], x2[j])
            iy2 = min(y2[i], y2[j])
            iw = max(0.0, ix2 - ix1)
            ih = max(0.0, iy2 - iy1)
            inter = iw * ih
            # Compute IoU; epsilon guards against a zero-area union.
            union = areas[i] + areas[j] - inter
            iou = inter / max(union, 1e-6)
            if iou > nms_threshold:
                suppressed[j] = 1
    return keep[:num_keep]
def nms_bev(boxes, scores, labels, nms_threshold=0.2):
    """Axis-aligned BEV NMS (class-agnostic; *labels* is accepted but unused)."""
    if len(boxes):
        return _nms_bev_kernel(boxes, scores, nms_threshold)
    return np.array([], dtype=np.int64)
def postprocess(outputs, config, score_thr=0.1):
    """Decode, merge, and filter raw model outputs into final detections.

    CenterPoint model output structure (42 outputs total, 7 per task, 6 tasks):
    Per task output order:
    - reg: [1, 2, 128, 128] - registration offset
    - height: [1, 1, 128, 128] - height
    - dim: [1, 3, 128, 128] - dimensions (l, h, w)
    - rot: [1, 2, 128, 128] - rotation (sin, cos)
    - vel: [1, 2, 128, 128] - velocity
    - score: [1, 128, 128] - confidence (after sigmoid)
    - cls: [1, 128, 128] - class index (after argmax)

    Two thresholds apply: config's `test_cfg.score_threshold` before NMS,
    then this function's `score_thr` after NMS.

    Returns:
        boxes [M, 9] (x, y, z, w, l, h, theta, vx, vy), scores [M], labels [M].
    """
    tasks = config['tasks']
    num_tasks = len(tasks)  # 6 tasks
    outputs_per_task = 7  # reg, height, dim, rot, vel, score, cls
    test_cfg = config['test_cfg']
    voxel_size = test_cfg['voxel_size']
    pc_range = test_cfg['pc_range']
    out_size_factor = test_cfg['out_size_factor']
    score_threshold = test_cfg['score_threshold']
    all_boxes = []
    all_scores = []
    all_labels = []
    # Class offset for each task, mapping task-local class ids to the flat
    # 10-class label space.
    class_offsets = [0, 1, 3, 5, 6, 8]
    for task_idx in range(num_tasks):
        base_idx = task_idx * outputs_per_task
        # Drop the batch dimension with [0] on each head output.
        reg = outputs[base_idx + 0][0]  # [2, H, W]
        height = outputs[base_idx + 1][0]  # [1, H, W]
        dim = outputs[base_idx + 2][0]  # [3, H, W]
        rot = outputs[base_idx + 3][0]  # [2, H, W]
        vel = outputs[base_idx + 4][0]  # [2, H, W]
        score = outputs[base_idx + 5][0]  # [H, W]
        cls = outputs[base_idx + 6][0]  # [H, W]
        H, W = score.shape
        xs = np.arange(W, dtype=np.float32)
        ys = np.arange(H, dtype=np.float32)
        xs, ys = np.meshgrid(xs, ys)
        # Metric box center: heatmap cell index plus predicted sub-cell offset.
        center_x = (xs + reg[0]) * out_size_factor * voxel_size[0] + pc_range[0]
        center_y = (ys + reg[1]) * out_size_factor * voxel_size[1] + pc_range[1]
        center_z = height[0]
        dim_l = dim[0]  # length
        dim_h = dim[1]  # height
        dim_w = dim[2]  # width
        # Yaw recovered from the (sin, cos) pair.
        theta = np.arctan2(rot[0], rot[1])
        vel_x = vel[0]
        vel_y = vel[1]
        mask = score > score_threshold
        if not np.any(mask):
            continue
        class_offset = class_offsets[task_idx]
        # Box layout: [x, y, z, w, l, h, theta, vx, vy].
        boxes = np.stack([
            center_x[mask], center_y[mask], center_z[mask],
            dim_w[mask], dim_l[mask], dim_h[mask],
            theta[mask], vel_x[mask], vel_y[mask],
        ], axis=-1).astype(np.float32)
        scores_task = score[mask].astype(np.float32)
        labels_task = (cls[mask] + class_offset).astype(np.int32)
        if len(boxes) > 0:
            all_boxes.append(boxes)
            all_scores.append(scores_task)
            all_labels.append(labels_task)
    if len(all_boxes) == 0:
        return np.zeros((0, 9), dtype=np.float32), np.zeros((0,)), np.zeros((0,), dtype=np.int32)
    boxes = np.concatenate(all_boxes, axis=0)
    scores = np.concatenate(all_scores, axis=0)
    labels = np.concatenate(all_labels, axis=0)
    # Class-agnostic NMS across all tasks' detections.
    nms_cfg = config['test_cfg']['nms']
    keep = nms_bev(boxes, scores, labels, nms_cfg['nms_iou_threshold'])
    boxes = boxes[keep]
    scores = scores[keep]
    labels = labels[keep]
    # Second (caller-supplied) score filter after NMS.
    mask = scores > score_thr
    boxes = boxes[mask]
    scores = scores[mask]
    labels = labels[mask]
    # Keep only the top-scoring `max_per_img` detections.
    max_per_img = config['test_cfg']['max_per_img']
    if len(boxes) > max_per_img:
        topk_indices = np.argsort(-scores)[:max_per_img]
        boxes = boxes[topk_indices]
        scores = scores[topk_indices]
        labels = labels[topk_indices]
    return boxes, scores, labels
# Flat 10-class label space; index order matches the labels produced by
# postprocess/decode_bbox (task-local class + per-task offset).
CLASS_NAMES = [
    'car', 'truck', 'construction_vehicle', 'bus', 'trailer',
    'barrier', 'motorcycle', 'bicycle', 'pedestrian', 'traffic_cone'
]
# BGR colors (OpenCV channel order) keyed by class index above.
CLASS_COLORS_BGR = {
    0: (255, 0, 0),    # car - blue
    1: (0, 165, 255),  # truck - orange
    2: (0, 0, 255),    # construction_vehicle - red
    3: (0, 255, 255),  # bus - yellow
    4: (128, 0, 128),  # trailer - purple
    5: (255, 255, 0),  # barrier - cyan
    6: (0, 0, 255),    # motorcycle - red
    7: (0, 255, 0),    # bicycle - green
    8: (255, 0, 255),  # pedestrian - magenta
    9: (0, 255, 255),  # traffic_cone - yellow
}
def visualize_bev(points, boxes, scores, labels, config, save_path,
                  frame_idx=0, eval_range=35, conf_th=0.5):
    """Render a bird's-eye-view image of the point cloud and detections.

    Fast BEV visualization using OpenCV (50-100x faster than matplotlib).

    Args:
        points: [N, >=3] point cloud; columns 0-2 are x, y, z.
        boxes: [M, 9] detections (x, y, z, w, l, h, theta, vx, vy).
        scores: [M] detection confidences.
        labels: [M] class indices into CLASS_NAMES.
        config: model config (currently unused here).
        save_path: output image path.
        frame_idx: frame number shown in the overlay text.
        eval_range: half-extent (meters) of the rendered area.
        conf_th: detections below this score are not drawn.

    Returns:
        True on success, None if OpenCV is unavailable.
    """
    try:
        import cv2
    except ImportError:
        print("opencv-python not available, skipping visualization")
        return None
    # Image size and scale (pixels per meter); ego vehicle at image center.
    img_size = 800
    scale = img_size / (2 * eval_range)
    center = img_size // 2
    # Create black background
    img = np.zeros((img_size, img_size, 3), dtype=np.uint8)
    # Filter points within range
    mask = (np.abs(points[:, 0]) < eval_range) & (np.abs(points[:, 1]) < eval_range)
    pts = points[mask, :3]
    # Remove close points (within 3 m of the ego vehicle in x and y).
    close_mask = (np.abs(pts[:, 0]) < 3) & (np.abs(pts[:, 1]) < 3)
    pts = pts[~close_mask]
    # Calculate distances for coloring (viridis-like: purple->cyan->yellow)
    dists = np.sqrt(pts[:, 0]**2 + pts[:, 1]**2)
    norm_dists = np.minimum(1.0, dists / eval_range)
    # Convert to image coordinates; image rows grow downward while world y
    # grows up, hence the minus sign on y.
    px = (center + pts[:, 0] * scale).astype(np.int32)
    py = (center - pts[:, 1] * scale).astype(np.int32)
    # Filter valid points (within image bounds)
    valid = (px >= 0) & (px < img_size) & (py >= 0) & (py < img_size)
    px, py, norm_dists = px[valid], py[valid], norm_dists[valid]
    # Viridis-like colormap using vectorized operations (two linear segments).
    t = norm_dists
    r = np.where(t < 0.5, 68 + t * 2 * (49 - 68), 49 + (t - 0.5) * 2 * (253 - 49))
    g = np.where(t < 0.5, 1 + t * 2 * (104 - 1), 104 + (t - 0.5) * 2 * (231 - 104))
    b = np.where(t < 0.5, 84 + t * 2 * (142 - 84), 142 + (t - 0.5) * 2 * (37 - 142))
    # Draw all points at once
    img[py, px, 0] = b.astype(np.uint8)
    img[py, px, 1] = g.astype(np.uint8)
    img[py, px, 2] = r.astype(np.uint8)
    # Count detections
    num_detections = sum(1 for s in scores if s >= conf_th)
    # Draw detection boxes with class-specific shapes
    for box, score, label in zip(boxes, scores, labels):
        if score < conf_th:
            continue
        x, y, z, w, l, h, theta, vx, vy = box
        label_int = int(label)
        # Get color for this class
        color = CLASS_COLORS_BGR.get(label_int, (255, 255, 255))
        # Convert center to image coordinates
        cx = int(center + x * scale)
        cy = int(center - y * scale)
        # Apply angle transformation (same as demo_utils)
        vis_theta = -theta - np.pi / 2
        cos_t, sin_t = np.cos(vis_theta), np.sin(vis_theta)
        # Different shapes based on class
        if label_int == 8:  # pedestrian - circle
            radius = max(3, int(max(w, l) * scale / 2))
            cv2.circle(img, (cx, cy), radius, color, 2)
            # Draw heading line
            head_x = int(cx + radius * cos_t)
            head_y = int(cy - radius * sin_t)
            cv2.line(img, (cx, cy), (head_x, head_y), color, 2)
        elif label_int == 9:  # traffic_cone - small triangle
            size = max(4, int(max(w, l) * scale))
            pts = np.array([
                [cx, cy - size],  # top
                [cx - size//2, cy + size//2],  # bottom left
                [cx + size//2, cy + size//2],  # bottom right
            ], dtype=np.int32)
            cv2.fillPoly(img, [pts], color)
        elif label_int == 5:  # barrier - thin rectangle
            # Box corners (thin barrier: half the true width)
            corners = np.array([
                [l/2, w/4], [l/2, -w/4], [-l/2, -w/4], [-l/2, w/4]
            ])
            # Rotate corners by vis_theta and translate to the box center.
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.fillPoly(img, [corners_img], color)
        elif label_int in [6, 7]:  # motorcycle, bicycle - small box with direction
            # Smaller box for bikes
            corners = np.array([
                [l/2, w/2], [l/2, -w/2], [-l/2, -w/2], [-l/2, w/2]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.polylines(img, [corners_img], True, color, 2)
            # Draw prominent heading arrow
            front_mid = ((corners_img[0] + corners_img[1]) // 2).astype(np.int32)
            cv2.arrowedLine(img, (cx, cy), tuple(front_mid), color, 2, tipLength=0.4)
        else:  # car, truck, bus, trailer, construction_vehicle - standard box
            # Box corners
            corners = np.array([
                [l/2, w/2], [l/2, -w/2], [-l/2, -w/2], [-l/2, w/2]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.polylines(img, [corners_img], True, color, 2)
            # Draw front indicator line
            front_mid = ((corners_img[0] + corners_img[1]) // 2).astype(np.int32)
            cv2.line(img, (cx, cy), tuple(front_mid), color, 2)
    # Draw frame info (white text)
    cv2.putText(img, f'Frame: {frame_idx}', (10, 25),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(img, f'Detections: {num_detections}', (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    # Draw legend
    legend_y = 80
    for cls_id, cls_name in enumerate(CLASS_NAMES):
        color = CLASS_COLORS_BGR.get(cls_id, (255, 255, 255))
        cv2.rectangle(img, (10, legend_y), (25, legend_y + 12), color, -1)
        cv2.putText(img, cls_name, (30, legend_y + 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
        legend_y += 18
    # Save image
    cv2.imwrite(save_path, img)
    return True
def create_video_from_images(image_dir, output_video_path, fps=10):
    """Assemble the images in *image_dir* (sorted by filename) into a video.

    Falls back to the XVID codec and an ``.avi`` container if the mp4v
    writer cannot be opened.

    Args:
        image_dir: directory containing .png/.jpg/.jpeg frames
        output_video_path: output video file path
        fps: frames per second
    """
    try:
        import cv2
    except ImportError:
        print("opencv-python not available, cannot create video")
        return
    # Collect frames in filename order.
    frames = sorted(f for f in os.listdir(image_dir)
                    if f.endswith(('.png', '.jpg', '.jpeg')))
    if not frames:
        print(f"No images found in {image_dir}")
        return
    # The first frame determines the video dimensions.
    first = cv2.imread(osp.join(image_dir, frames[0]))
    if first is None:
        print(f"Cannot read first image: {frames[0]}")
        return
    height, width = first.shape[:2]
    # Cap the frame size at 1080p for better player compatibility.
    max_width, max_height = 1920, 1080
    if width > max_width or height > max_height:
        shrink = min(max_width / width, max_height / height)
        width, height = int(width * shrink), int(height * shrink)
    writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'),
                             fps, (width, height))
    if not writer.isOpened():
        # mp4v failed -- retry with the XVID codec and an .avi container.
        output_video_path = output_video_path.replace('.mp4', '.avi')
        writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'XVID'),
                                 fps, (width, height))
    for name in tqdm(frames, desc="Creating video"):
        frame = cv2.imread(osp.join(image_dir, name))
        if frame is None:
            continue
        if frame.shape[:2] != (height, width):
            frame = cv2.resize(frame, (width, height))
        writer.write(frame)
    writer.release()
def run_inference(session, points, config):
    """Run the full pipeline (voxelize -> pillars -> AXEngine -> decode) on one point cloud.

    Returns:
        (boxes, scores, labels) as produced by postprocess().
    """
    voxels, coors, num_points = preprocess_pointpillars(points, config)
    features, indices = create_pillars_input(voxels, coors, num_points, config)
    # Route each model input by name: exact names first, then a best-effort
    # guess (anything containing 'indices' gets the index tensor).
    feed = {}
    for inp in session.get_inputs():
        name = inp.name
        if name == 'indices_input' or (name != 'input.1' and 'indices' in name.lower()):
            feed[name] = indices.astype(np.int32)
        else:
            feed[name] = features.astype(np.float32)
    outputs = session.run(None, feed)
    return postprocess(outputs, config)
def main():
    """Entry point: run AXEngine inference over the extracted samples."""
    args = parse_args()
    if axe is None:
        print("Error: axengine is not installed. Please install it first.")
        return
    # Load config, model session, and the sample index.
    config = load_config(args.config_json)
    session = load_axmodel(args.axmodel)
    samples = load_sample_index(args.data_dir)['samples']
    if args.num_samples is not None:
        samples = samples[:args.num_samples]
    print(f"Processing {len(samples)} samples...")
    # Prepare output directories.
    os.makedirs(args.output_dir, exist_ok=True)
    images_dir = osp.join(args.output_dir, 'images')
    if args.visualize:
        os.makedirs(images_dir, exist_ok=True)
    all_results = []
    for idx, sample in enumerate(tqdm(samples, desc="Inference")):
        points = load_points(args.data_dir, sample['points_path'])
        boxes, scores, labels = run_inference(session, points, config)
        # Collect a JSON-serializable record per frame.
        all_results.append({
            'token': sample['token'],
            'boxes': boxes.tolist(),
            'scores': scores.tolist(),
            'labels': labels.tolist(),
            'num_detections': len(boxes),
        })
        if args.visualize:
            vis_path = osp.join(images_dir, f'frame_{idx:06d}.png')
            visualize_bev(points, boxes, scores, labels, config, vis_path,
                          frame_idx=idx, conf_th=args.score_thr)
    # Persist all detections.
    with open(osp.join(args.output_dir, 'results.json'), 'w') as f:
        json.dump(all_results, f, indent=2)
    # Optionally stitch the per-frame images into a video.
    if args.visualize:
        video_path = osp.join(args.output_dir, 'centerpoint_detection_axmodel.mp4')
        create_video_from_images(images_dir, video_path, fps=args.fps)
    total_detections = sum(r['num_detections'] for r in all_results)
    print(f"Done! {len(samples)} frames, {total_detections} detections, saved to {args.output_dir}")
# Script entry point.
if __name__ == '__main__':
    main()