File size: 37,846 Bytes
224aed4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 |
#!/usr/bin/env python3
import argparse
import json
import os
import os.path as osp
import cv2
import numpy as np
import axengine as axe
from collections import defaultdict
from tqdm import tqdm
def parse_args():
    """Parse the command-line arguments of the inference script.

    Returns:
        argparse.Namespace with the attributes ``model``, ``config_json``,
        ``data_dir``, ``output_dir``, ``score_thr``, ``fps``,
        ``start_scene`` and ``end_scene``.
    """
    cli = argparse.ArgumentParser(description='BEVFormer AXEngine Inference from Extracted Data')

    # Required positionals, in the order they must appear on the command line.
    positionals = (
        ('model', 'AXModel path'),
        ('config_json', 'JSON config file path'),
        ('data_dir', 'extracted data directory (extracted_data)'),
    )
    for dest, help_text in positionals:
        cli.add_argument(dest, help=help_text)

    # Optional flags as (flag, keyword arguments for add_argument).
    options = (
        ('--output-dir', dict(default='./inference_results_extracted', help='output directory')),
        ('--score-thr', dict(type=float, default=0.1, help='score threshold')),
        ('--fps', dict(type=int, default=3, help='video fps')),
        ('--start-scene', dict(type=int, default=0, help='start scene index')),
        ('--end-scene', dict(type=int, default=None, help='end scene index (None for all)')),
    )
    for flag, spec in options:
        cli.add_argument(flag, **spec)

    return cli.parse_args()
def load_axmodel(axmodel_path):
    """Create and return an AXEngine inference session for ``axmodel_path``."""
    # Try AxEngineExecutionProvider rather than AXCLRTExecutionProvider.
    return axe.InferenceSession(axmodel_path, providers=['AxEngineExecutionProvider'])
def load_config_from_json(config_path):
    """Load the configuration stored in a JSON file.

    Args:
        config_path: path of the JSON file.

    Returns:
        The decoded JSON document (whatever ``json.load`` produces for it).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def preprocess_image(img_path, img_norm_cfg, target_size=(480, 800)):
    """Read one image from disk and turn it into a normalised CHW array.

    Args:
        img_path: path to the image file.
        img_norm_cfg: mapping that may provide 'mean', 'std' and 'to_rgb';
            missing entries fall back to the defaults used below.
        target_size: (H, W) the image is resized to when its shape differs.

    Returns:
        (C, H, W) float32 numpy array holding ``(pixel - mean) / std``.

    Raises:
        ValueError: if OpenCV cannot read ``img_path``.
    """
    pixels = cv2.imread(img_path)
    if pixels is None:
        raise ValueError(f"Cannot load image: {img_path}")

    # OpenCV decodes to BGR; swap channels unless the config disables it.
    if img_norm_cfg.get('to_rgb', True):
        pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB)

    if pixels.shape[:2] != target_size:
        out_h, out_w = target_size[0], target_size[1]
        pixels = cv2.resize(pixels, (out_w, out_h))  # cv2 expects (W, H)

    channel_mean = np.array(img_norm_cfg.get('mean', [123.675, 116.28, 103.53]), dtype=np.float32)
    channel_std = np.array(img_norm_cfg.get('std', [58.395, 57.12, 57.375]), dtype=np.float32)
    normalised = (pixels.astype(np.float32) - channel_mean) / channel_std

    # (H, W, C) -> (C, H, W)
    return normalised.transpose(2, 0, 1)
def load_data(data_dir, scene_name, frame_idx):
    """Load the inputs of one frame from the extracted-data directory.

    Reads ``meta_<frame>.json`` and the ``cam_<cam>_<frame>.png`` files found
    in ``<data_dir>/<scene_name>``.

    Args:
        data_dir: data directory path.
        scene_name: scene name (scene token), used as the sub-directory name.
        frame_idx: frame index (sample index) used in the file names.

    Returns:
        img: (1, N, C, H, W) numpy array of preprocessed camera images.
        lidar2img: (1, N, 4, 4) float32 numpy array (shape as stored in the
            meta file).
        can_bus: (1, 18) float32 numpy array (length as stored in the meta
            file).
        meta: the decoded meta dictionary.
    """
    scene_dir = osp.join(data_dir, scene_name)

    with open(osp.join(scene_dir, f'meta_{frame_idx:06d}.json'), 'r') as meta_file:
        meta = json.load(meta_file)

    # Normalisation settings, with defaults when the meta file has none.
    fallback_norm = {
        'mean': [123.675, 116.28, 103.53],
        'std': [58.395, 57.12, 57.375],
        'to_rgb': True
    }
    img_norm_cfg = meta.get('img_norm_cfg', fallback_norm)

    # Every camera is resized to the (H, W) of the first 'img_shape' entry.
    first_shape = meta.get('img_shape', [[480, 800, 3]] * 6)[0]
    target_size = (first_shape[0], first_shape[1])

    cam_arrays = [
        preprocess_image(
            osp.join(scene_dir, f'cam_{cam_idx:02d}_{frame_idx:06d}.png'),
            img_norm_cfg,
            target_size,
        )
        for cam_idx in range(meta.get('num_cams', 6))
    ]

    # Add the leading batch axis to each tensor.
    img = np.expand_dims(np.stack(cam_arrays, axis=0), axis=0)  # (1, N, C, H, W)
    lidar2img = np.expand_dims(np.array(meta['lidar2img'], dtype=np.float32), axis=0)  # (1, N, 4, 4)
    can_bus = np.expand_dims(np.array(meta['can_bus'], dtype=np.float32), axis=0)  # (1, 18)
    return img, lidar2img, can_bus, meta
# Drawing colour per integer class id; the tuples are handed unchanged to the
# OpenCV drawing calls in this file. Class names in the comments follow the
# id comments in post_process_outputs_np.
CLASS_COLORS = dict(enumerate([
    (0, 255, 0),      # 0: car
    (255, 255, 0),    # 1: truck
    (0, 0, 255),      # 2: construction vehicle
    (0, 165, 255),    # 3: bus
    (255, 0, 255),    # 4: trailer
    (0, 255, 255),    # 5: barrier
    (128, 0, 128),    # 6: motorcycle
    (255, 165, 0),    # 7: bicycle
    (0, 0, 255),      # 8: pedestrian
    (128, 128, 128),  # 9: traffic cone
]))
def denormalize_bbox_np(normalized_bboxes, pc_range):
    """Convert network-normalised boxes to metric boxes using numpy only.

    The last axis of the input is read as
    ``(cx, cy, log w, log l, cz, log h, sin(yaw), cos(yaw)[, vx, vy])``.
    Any number of leading axes is accepted.

    Args:
        normalized_bboxes: numpy array of shape (..., 8) or (..., 10).
        pc_range: unused; kept so the call signature stays unchanged.

    Returns:
        numpy array of shape (..., 7) laid out as
        ``(cx, cy, cz, w, l, h, yaw)``, or (..., 9) with ``(vx, vy)``
        appended when the input carries velocity columns.
    """
    # Yaw is recovered from its sine / cosine encoding.
    rot = np.arctan2(normalized_bboxes[..., 6:7], normalized_bboxes[..., 7:8])

    # Box centre.
    cx = normalized_bboxes[..., 0:1]
    cy = normalized_bboxes[..., 1:2]
    cz = normalized_bboxes[..., 4:5]

    # Sizes are stored as logarithms.
    w = np.exp(normalized_bboxes[..., 2:3])
    l = np.exp(normalized_bboxes[..., 3:4])
    h = np.exp(normalized_bboxes[..., 5:6])

    parts = [cx, cy, cz, w, l, h, rot]
    if normalized_bboxes.shape[-1] > 8:
        # Velocity: index the LAST axis like every other column so inputs
        # with extra (or no) leading axes are handled consistently.
        parts.append(normalized_bboxes[..., 8:9])
        parts.append(normalized_bboxes[..., 9:10])
    return np.concatenate(parts, axis=-1)
def decode_bboxes_custom_np(all_cls_scores, all_bbox_preds, pc_range, post_center_range, max_num=100, score_threshold=None, num_classes=10):
    """Decode the raw head outputs of the last decoder layer into boxes.

    Args:
        all_cls_scores: per-layer class logits; only ``[-1]`` is used and is
            expected to be (bs, num_query, num_classes).
        all_bbox_preds: per-layer box regressions; only ``[-1]`` is used and
            is expected to be (bs, num_query, 10).
        pc_range: forwarded to ``denormalize_bbox_np``.
        post_center_range: six limits ``[x_min, y_min, z_min, x_max, y_max,
            z_max]`` on the box centre, or None to skip the range filter.
        max_num: number of top-scoring (query, class) pairs kept per sample.
        score_threshold: minimum score, or None for no score filtering.
        num_classes: number of classes used to split a flat index into
            (query, class).

    Returns:
        One dict per sample with 'bboxes', 'scores' and 'labels'.
    """
    last_cls = all_cls_scores[-1]   # (bs, num_query, num_classes)
    last_box = all_bbox_preds[-1]   # (bs, num_query, 10)

    decoded = []
    for sample in range(last_cls.shape[0]):
        # Sigmoid turns logits into scores.
        probs = 1.0 / (1.0 + np.exp(-last_cls[sample]))

        # Best max_num entries over the flattened (query, class) grid.
        flat_probs = probs.reshape(-1)
        top = np.argsort(flat_probs)[::-1][:max_num]
        top_scores = flat_probs[top]
        query_idx, top_labels = np.divmod(top, num_classes)

        boxes = denormalize_bbox_np(last_box[sample][query_idx], pc_range)  # (max_num, 9)

        if score_threshold is None:
            selector = np.ones(len(top_scores), dtype=bool)
        else:
            selector = top_scores > score_threshold
            relaxed = score_threshold
            # Nothing passed: relax the threshold by 10% per step and, once
            # it drops below 0.01, accept everything.
            while selector.sum() == 0:
                relaxed *= 0.9
                if relaxed < 0.01:
                    selector = np.ones(len(top_scores), dtype=bool)
                    break
                selector = top_scores >= relaxed

        if post_center_range is not None:
            limits = np.array(post_center_range)
            centres = boxes[..., :3]
            inside = (centres >= limits[:3]).all(1)
            inside &= (centres <= limits[3:]).all(1)
            selector = inside & selector

        decoded.append({
            'bboxes': boxes[selector],
            'scores': top_scores[selector],
            'labels': top_labels[selector]
        })
    return decoded
def get_bboxes_custom_np(preds_dicts, pc_range, post_center_range, max_num=100, score_threshold=None, num_classes=10):
    """Decode head outputs and apply the box adjustments used for display.

    Args:
        preds_dicts: mapping with 'all_cls_scores' and 'all_bbox_preds'.
        pc_range, post_center_range, max_num, score_threshold, num_classes:
            forwarded unchanged to ``decode_bboxes_custom_np``.

    Returns:
        A list with one ``(bboxes, scores, labels)`` tuple per sample. Samples
        without boxes yield empty arrays of shape (0, 9), (0,) and (0,).
    """
    decoded = decode_bboxes_custom_np(
        preds_dicts['all_cls_scores'],
        preds_dicts['all_bbox_preds'],
        pc_range,
        post_center_range,
        max_num,
        score_threshold,
        num_classes
    )

    outputs = []
    for sample in decoded:
        boxes = sample['bboxes']
        if len(boxes) == 0:
            outputs.append((
                np.zeros((0, 9), dtype=np.float32),
                np.zeros((0,), dtype=np.float32),
                np.zeros((0,), dtype=np.int64)
            ))
            continue

        # Move z from the box centre to the centre of the bottom face.
        boxes[:, 2] = boxes[:, 2] - 0.5 * boxes[:, 5]
        # Shrink w, l, h by 10% to compensate for oversized boxes.
        boxes[:, 3:6] = 0.9 * boxes[:, 3:6]

        outputs.append((boxes, sample['scores'], sample['labels']))
    return outputs
def format_bbox_result_np(bboxes, scores, labels):
    """Bundle boxes, scores and labels into the result-dictionary layout."""
    return dict(boxes_3d=bboxes, scores_3d=scores, labels_3d=labels)
def rotation_3d_in_axis_np(points, angles, axis=2):
    """Rotate each group of points about the z-axis by its own angle.

    Args:
        points: (N, M, 3) numpy array.
        angles: length-N sequence of angles in radians.
        axis: rotation axis; only 2 (or -1) is accepted.

    Returns:
        (N, M, 3) array equal to ``points[n] @ R[n]`` where
        ``R[n] = [[cos, -sin, 0], [sin, cos, 0], [0, 0, 1]]``.

    Raises:
        ValueError: for any other ``axis``.
    """
    sin_a = np.sin(angles)
    cos_a = np.cos(angles)

    if axis not in (2, -1):
        raise ValueError(f'Only axis=2 (z-axis) is supported for LiDAR boxes')

    # One 3x3 matrix per angle; entries not written below stay zero.
    matrices = np.zeros((len(angles), 3, 3), dtype=points.dtype)
    matrices[:, 0, 0] = cos_a
    matrices[:, 0, 1] = -sin_a
    matrices[:, 1, 0] = sin_a
    matrices[:, 1, 1] = cos_a
    matrices[:, 2, 2] = 1

    # Batched product: (N, M, 3) @ (N, 3, 3) -> (N, M, 3)
    return np.einsum('aij,ajk->aik', points, matrices)
def compute_bbox_corners_np(bboxes):
    """Compute the 8 corners of each 3D box.

    Args:
        bboxes: (N, >=7) numpy array whose columns are read as
            ``x, y, z, w, l, h, yaw``; (x, y, z) is treated as the centre of
            the bottom face.

    Returns:
        (N, 8, 3) array of corner coordinates; a float32 (0, 8, 3) array when
        there are no boxes.
    """
    if len(bboxes) == 0:
        return np.zeros((0, 8, 3), dtype=np.float32)

    dtype = bboxes.dtype
    bottom_centers = bboxes[:, :3]            # (N, 3)
    # Extents along x, y, z are (l, w, h), i.e. columns 4, 3, 5.
    extents = bboxes[:, [4, 3, 5]]            # (N, 3)
    # Fix: offset yaw by -80 degrees.
    headings = bboxes[:, 6] - (np.pi / 2.0 - np.pi / 18.0)

    # Unit-cube corners in the order produced by enumerating (i, j, k) in
    # {0, 1}^3 and reindexing with [0, 1, 3, 2, 4, 5, 7, 6].
    unit_corners = np.array([
        [0, 0, 0], [0, 0, 1], [0, 1, 1], [0, 1, 0],
        [1, 0, 0], [1, 0, 1], [1, 1, 1], [1, 1, 0],
    ], dtype=dtype)
    # Origin at [0.5, 0.5, 0]: centred in x/y, resting on the bottom face.
    unit_corners = unit_corners - np.array([0.5, 0.5, 0], dtype=dtype)

    corners = extents[:, np.newaxis, :] * unit_corners[np.newaxis, :, :]  # (N, 8, 3)
    corners = rotation_3d_in_axis_np(corners, headings, axis=2)
    corners += bottom_centers[:, np.newaxis, :]
    return corners
def draw_bbox3d_on_img_custom_np(bboxes, raw_img, lidar2img_rt, color=(0, 255, 0), thickness=2):
    """Draw the projected wireframe of 3D boxes on a copy of an image.

    Args:
        bboxes: boxes accepted by ``compute_bbox_corners_np`` (array or
            nested sequence).
        raw_img: image to draw on; it is copied, not modified.
        lidar2img_rt: 16 values reshaped to the 4x4 projection matrix.
        color: line colour passed to ``cv2.line``.
        thickness: line thickness passed to ``cv2.line``.

    Returns:
        The copy of ``raw_img`` with the boxes drawn (cast to uint8), or the
        plain copy when ``bboxes`` is empty.
    """
    canvas = raw_img.copy()
    if len(bboxes) == 0:
        return canvas

    boxes = bboxes if isinstance(bboxes, np.ndarray) else np.array(bboxes)
    projection = lidar2img_rt if isinstance(lidar2img_rt, np.ndarray) else np.array(lidar2img_rt)
    projection = projection.reshape(4, 4)

    corners_3d = compute_bbox_corners_np(boxes)  # (N, 8, 3)
    box_count = corners_3d.shape[0]

    # Homogeneous coordinates, then projection: (N*8, 4)
    flat_corners = corners_3d.reshape(-1, 3)
    homogeneous = np.concatenate(
        [flat_corners, np.ones((flat_corners.shape[0], 1), dtype=np.float32)], axis=-1
    )
    projected = homogeneous @ projection.T

    # Perspective division with the depth clamped to [1e-5, 1e5].
    projected[:, 2] = np.clip(projected[:, 2], a_min=1e-5, a_max=1e5)
    projected[:, :2] /= projected[:, 2:3]
    pixel_corners = projected[:, :2].reshape(box_count, 8, 2).astype(np.int32)

    # Corner index pairs forming the 12 edges of a box.
    edges = ((0, 1), (0, 3), (0, 4), (1, 2), (1, 5), (3, 2), (3, 7),
             (4, 5), (4, 7), (2, 6), (5, 6), (6, 7))
    img_h, img_w = canvas.shape[:2]

    def inside(point):
        return 0 <= point[0] < img_w and 0 <= point[1] < img_h

    for box_corners in pixel_corners:
        for first, second in edges:
            p_start = (int(box_corners[first, 0]), int(box_corners[first, 1]))
            p_end = (int(box_corners[second, 0]), int(box_corners[second, 1]))
            # Draw an edge when at least one of its end points is visible.
            if inside(p_start) or inside(p_end):
                cv2.line(canvas, p_start, p_end, color, thickness, cv2.LINE_AA)
    return canvas.astype(np.uint8)
def post_process_outputs_np(all_cls_scores, all_bbox_preds, config, score_thr=0.1):
    """Turn raw head outputs into filtered detections, one dict per sample.

    Steps: decode with the settings in ``config['model']['bbox_coder']``,
    drop boxes whose score is not above their class threshold, then apply
    per-class circle NMS.

    Args:
        all_cls_scores: per-layer class logits from the model.
        all_bbox_preds: per-layer box regressions from the model.
        config: configuration dictionary containing ``model.bbox_coder``.
        score_thr: threshold applied only to class ids that have no entry in
            the per-class table below (ids 0-9 all use 0.3).

    Returns:
        List of dicts with 'boxes_3d', 'scores_3d' and 'labels_3d'.
    """
    coder_cfg = config['model']['bbox_coder']
    decoded = get_bboxes_custom_np(
        {'all_cls_scores': all_cls_scores, 'all_bbox_preds': all_bbox_preds},
        coder_cfg['pc_range'],
        coder_cfg['post_center_range'],
        coder_cfg['max_num'],
        coder_cfg.get('score_threshold', None),
        coder_cfg['num_classes'],
    )

    # Score threshold per class id: car, truck, construction vehicle, bus,
    # trailer, barrier, motorcycle, bicycle, pedestrian, traffic cone.
    per_class_thr = dict.fromkeys(range(10), 0.3)
    # Circle-NMS radius per class id.
    nms_radius = {
        0: 2.0, 1: 3.0, 2: 2.5, 3: 4.0, 4: 3.0,
        5: 1.0, 6: 1.5, 7: 1.0, 8: 0.5, 9: 0.3,
    }

    def empty_result():
        return format_bbox_result_np(
            np.zeros((0, 9), dtype=np.float32),
            np.zeros((0,), dtype=np.float32),
            np.zeros((0,), dtype=np.int64)
        )

    results = []
    for bboxes, scores, labels in decoded:
        passing = [
            idx for idx, (score, label) in enumerate(zip(scores, labels))
            if score > per_class_thr.get(int(label), score_thr)
        ]
        if not passing:
            results.append(empty_result())
            continue

        passing = np.array(passing, dtype=np.int64)
        bboxes, scores, labels = bboxes[passing], scores[passing], labels[passing]

        # Circle NMS
        if len(scores) > 0:
            survivors = circle_nms_np(bboxes, scores, labels, nms_radius)
            if len(survivors) == 0:
                results.append(empty_result())
                continue
            bboxes, scores, labels = bboxes[survivors], scores[survivors], labels[survivors]

        results.append(format_bbox_result_np(bboxes, scores, labels))
    return results
def circle_nms_np(bboxes, scores, labels, dist_thrs):
    """Per-class non-maximum suppression on BEV centre distance.

    Boxes are visited from highest to lowest score; a visited box removes
    every lower-scored box of the same class whose (x, y) centre lies closer
    than that class's radius.

    Args:
        bboxes: (N, >=2) numpy array; columns 0 and 1 are the centre x, y.
        scores: (N,) numpy array of scores.
        labels: (N,) numpy array of class ids.
        dist_thrs: mapping class id -> radius; 1.0 is used for missing ids.

    Returns:
        int64 array of indices into the inputs, ordered by descending score.
    """
    total = len(bboxes)
    if total == 0:
        return np.array([], dtype=np.int64)

    ranking = np.argsort(scores)[::-1]
    centres = bboxes[ranking][:, :2]
    classes = labels[ranking]

    removed = np.zeros(total, dtype=bool)
    survivors = []
    for pos in range(total):
        if removed[pos]:
            continue
        survivors.append(ranking[pos])

        cls_id = int(classes[pos])
        radius = dist_thrs.get(cls_id, 1.0)
        rest = pos + 1
        if rest < total:
            gaps = np.linalg.norm(centres[rest:] - centres[pos], axis=1)
            # The slice is a view, so this marks entries of `removed` itself.
            removed[rest:] |= (gaps < radius) & (classes[rest:] == cls_id)
    return np.array(survivors, dtype=np.int64)
def denormalize_img_np(img_array, img_norm_cfg):
    """Undo the image normalisation and return an 8-bit (H, W, C) image.

    This mirrors ``preprocess_image``: the channel swap is undone only when
    the config says one was applied (``to_rgb``, default True), so the result
    is BGR and ready for OpenCV drawing in both cases.

    Args:
        img_array: normalised image; a 3-D array is treated as (C, H, W) and
            transposed, any other rank is used as given.
        img_norm_cfg: mapping that may provide 'mean', 'std' and 'to_rgb'.

    Returns:
        C-contiguous uint8 array of shape (H, W, C).
    """
    mean = np.array(img_norm_cfg.get('mean', [123.675, 116.28, 103.53]))
    std = np.array(img_norm_cfg.get('std', [58.395, 57.12, 57.375]))

    # (C, H, W) -> (H, W, C)
    img = img_array.transpose(1, 2, 0) if img_array.ndim == 3 else img_array
    img = img * std + mean

    # Round before the cast: the float round trip can land just below an
    # integer, and a plain cast would truncate it to the wrong pixel value.
    img = np.ascontiguousarray(np.clip(np.rint(img), 0, 255).astype(np.uint8))

    # Only swap channels back when preprocessing swapped them.
    if img_norm_cfg.get('to_rgb', True):
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    return img
def draw_bev_map(bboxes, labels, scores, pc_range, bev_size=(800, 800), score_thr=0.1):
    """Draw BEV (Bird's Eye View) map with detections.

    The map is drawn with LiDAR x mapped to image rows (flipped) and LiDAR y
    mapped to image columns, then rotated by 90 degrees, mirrored
    horizontally and titled. That final step is applied on every path, so the
    orientation of the returned map is the same whether or not any box was
    drawn.

    Args:
        bboxes: (N, 9) numpy array, format: [x, y, z, w, l, h, yaw, vx, vy].
        labels: (N,) numpy array, class labels.
        scores: (N,) numpy array, detection scores.
        pc_range: [x_min, y_min, z_min, x_max, y_max, z_max].
        bev_size: (width, height) of the BEV image. NOTE(review): the final
            rotation keeps this output size, so a non-square size would
            presumably crop the map -- confirm before using one.
        score_thr: boxes with a score not above this are skipped; values
            <= 0 disable the filter.

    Returns:
        bev_img: (H, W, 3) uint8 numpy array, BEV visualization.
    """
    bev_w, bev_h = bev_size
    bev_img = np.ones((bev_h, bev_w, 3), dtype=np.uint8) * 255  # white background

    x_min, y_min, z_min, x_max, y_max, z_max = pc_range
    x_range = x_max - x_min
    y_range = y_max - y_min

    def to_pixel(lidar_x, lidar_y):
        """Map LiDAR (x, y) to image (column, row): y -> column, x -> flipped row."""
        return (int((lidar_y - y_min) / y_range * bev_w),
                int((x_max - lidar_x) / x_range * bev_h))

    # Light-grey grid: ten equal divisions of the range along each axis.
    grid_color = (200, 200, 200)
    for step in range(11):
        col, row = to_pixel(x_min + step * x_range / 10, y_min + step * y_range / 10)
        if 0 <= col < bev_w:
            cv2.line(bev_img, (col, 0), (col, bev_h), grid_color, 1)
        if 0 <= row < bev_h:
            cv2.line(bev_img, (0, row), (bev_w, row), grid_color, 1)

    # Darker axes through the ego position (LiDAR origin).
    center_x, center_y = to_pixel(0, 0)
    cv2.line(bev_img, (center_x, 0), (center_x, bev_h), (150, 150, 150), 2)
    cv2.line(bev_img, (0, center_y), (bev_w, center_y), (150, 150, 150), 2)

    # Ego vehicle marker: a 30 x 12 pixel rectangle around the origin.
    ego_length_px = 30
    ego_width_px = 12
    ego_corners_local = np.array([
        [ego_length_px // 2, -ego_width_px // 2],
        [ego_length_px // 2, ego_width_px // 2],
        [-ego_length_px // 2, ego_width_px // 2],
        [-ego_length_px // 2, -ego_width_px // 2],
    ], dtype=np.float32)
    # The marker and its arrow are turned by 90 degrees twice (180 in total)
    # in the pre-rotation image frame.
    quarter_turn = np.pi / 2
    rot_mat_90 = np.array([
        [np.cos(quarter_turn), -np.sin(quarter_turn)],
        [np.sin(quarter_turn), np.cos(quarter_turn)],
    ])
    ego_corners_rotated = (ego_corners_local @ rot_mat_90.T) @ rot_mat_90.T
    ego_corners = np.array(
        [[int(center_x + dx), int(center_y + dy)] for dx, dy in ego_corners_rotated],
        dtype=np.int32,
    )
    cv2.fillPoly(bev_img, [ego_corners], (0, 0, 255))          # filled marker
    cv2.polylines(bev_img, [ego_corners], True, (0, 0, 0), 2)  # black outline

    ego_arrow_length = ego_length_px // 2
    ego_arrow_dir = (np.array([1.0, 0.0]) @ rot_mat_90.T) @ rot_mat_90.T
    ego_arrow_end = (int(center_x + ego_arrow_length * ego_arrow_dir[0]),
                     int(center_y + ego_arrow_length * ego_arrow_dir[1]))
    cv2.arrowedLine(bev_img, (center_x, center_y), ego_arrow_end,
                    (0, 0, 0), 3, tipLength=0.3)

    # Score filtering; with no boxes the drawing loop below simply does not run.
    if len(bboxes) > 0 and score_thr > 0:
        keep = scores > score_thr
        bboxes = bboxes[keep]
        labels = labels[keep]

    default_color = (255, 255, 255)
    for box, raw_label in zip(bboxes, labels):
        color = CLASS_COLORS.get(int(raw_label), default_color)
        x, y = box[0], box[1]          # centre position
        w, l = box[3], box[4]          # width, length
        yaw = box[6] - np.pi / 2.0     # subtract 90 degrees

        img_x, img_y = to_pixel(x, y)
        # Skip boxes whose centre falls outside the image.
        if not (0 <= img_x < bev_w and 0 <= img_y < bev_h):
            continue

        cos_yaw = np.cos(yaw)
        sin_yaw = np.sin(yaw)

        # Footprint corners relative to the centre, rotated by yaw, then
        # mapped to pixels.
        corners_local = np.array([
            [l / 2, w / 2],
            [l / 2, -w / 2],
            [-l / 2, -w / 2],
            [-l / 2, w / 2]
        ])
        rot_mat = np.array([[cos_yaw, -sin_yaw], [sin_yaw, cos_yaw]])
        corners_rotated = corners_local @ rot_mat.T
        corners_img = np.array(
            [to_pixel(x + dx, y + dy) for dx, dy in corners_rotated],
            dtype=np.int32,
        )

        # Semi-transparent fill: blend a filled copy 50/50 with the map.
        overlay = bev_img.copy()
        cv2.fillPoly(overlay, [corners_img], color)
        cv2.addWeighted(overlay, 0.5, bev_img, 0.5, 0, bev_img)
        cv2.polylines(bev_img, [corners_img], True, (0, 0, 0), 2)

        # Heading arrow: at least 10 px long, otherwise half the box length.
        arrow_length = max(int(l / y_range * bev_h) // 2, 10)
        arrow_end = (int(img_x + arrow_length * sin_yaw),
                     int(img_y - arrow_length * cos_yaw))
        cv2.arrowedLine(bev_img, (img_x, img_y), arrow_end,
                        (0, 0, 0), 2, tipLength=0.3)

        cv2.circle(bev_img, (img_x, img_y), 3, (0, 0, 0), -1)  # centre point

    # Rotate the map by 90 degrees about its centre (map only, not the text),
    # then flip horizontally to fix the mirror effect.
    rotation_matrix = cv2.getRotationMatrix2D((bev_w // 2, bev_h // 2), 90, 1.0)
    bev_img = cv2.warpAffine(bev_img, rotation_matrix, (bev_w, bev_h), borderValue=(255, 255, 255))
    bev_img = cv2.flip(bev_img, 1)

    # Title in the top-right corner.
    text = 'BEV Map'
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    thickness = 2
    (text_width, text_height), _baseline = cv2.getTextSize(text, font, font_scale, thickness)
    cv2.putText(bev_img, text, (bev_w - text_width - 10, text_height + 10),
                font, font_scale, (0, 0, 0), thickness)
    return bev_img
def _resize_to_height(image, target_height):
    """Return ``image`` scaled to ``target_height`` rows, keeping its aspect ratio."""
    if image.shape[0] == target_height:
        return image
    new_width = int(image.shape[1] * target_height / image.shape[0])
    return cv2.resize(image, (new_width, target_height))


def visualize_results_np(img, result, lidar2img, img_norm_cfg, class_names, score_thr=0.3, pc_range=None):
    """Compose one visualisation frame: camera views plus a BEV map.

    The camera views are always part of the frame. Boxes are drawn on them
    (and on the BEV map) only for detections whose score is above
    ``score_thr``.

    Args:
        img: normalised model input; when 5-D its second axis is the camera
            axis and ``img[0, cam]`` is one (C, H, W) image.
        result: dict with 'boxes_3d', 'scores_3d' and 'labels_3d'.
        lidar2img: array indexed as ``lidar2img[0, cam]`` to get the
            projection matrix of a camera.
        img_norm_cfg: normalisation config forwarded to
            ``denormalize_img_np``.
        class_names: unused; kept so the call signature stays unchanged.
        score_thr: score threshold; values <= 0 keep every box.
        pc_range: BEV extent ``[x_min, y_min, z_min, x_max, y_max, z_max]``;
            defaults to ``[-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]``.

    Returns:
        uint8 image with the camera views on the left and the BEV map on the
        right.
    """
    num_cams = img.shape[1] if img.ndim == 5 else 1
    raw_imgs = [denormalize_img_np(img[0, cam_idx], img_norm_cfg) for cam_idx in range(num_cams)]

    boxes_3d = result.get('boxes_3d')
    scores_3d = result.get('scores_3d')
    labels_3d = result.get('labels_3d')

    # Start from the plain camera images so they stay in the frame even when
    # nothing is detected or nothing passes the threshold.
    vis_imgs = raw_imgs
    boxes_3d_for_bev = labels_3d_for_bev = scores_3d_for_bev = None

    if boxes_3d is not None and len(boxes_3d) > 0:
        if scores_3d is None:
            # No scores supplied: treat every box as fully confident.
            scores_3d = np.ones(len(boxes_3d), dtype=np.float32)
        mask = (scores_3d > score_thr) if score_thr > 0 else np.ones(len(boxes_3d), dtype=bool)
        if np.any(mask):
            boxes_3d = boxes_3d[mask]
            scores_3d = scores_3d[mask]
            labels_3d = labels_3d[mask]
            boxes_3d_for_bev = boxes_3d.copy()
            labels_3d_for_bev = labels_3d.copy()
            scores_3d_for_bev = scores_3d.copy()

            vis_imgs = []
            for cam_idx, raw_img in enumerate(raw_imgs):
                vis_img = raw_img.copy()
                if lidar2img.shape[1] > cam_idx:
                    cam_lidar2img = lidar2img[0, cam_idx]
                    # One call per box because the colour depends on the class.
                    for box, label in zip(boxes_3d, labels_3d):
                        color = CLASS_COLORS.get(int(label), (255, 255, 255))
                        try:
                            vis_img = draw_bbox3d_on_img_custom_np(
                                box[None], vis_img, cam_lidar2img, color=color, thickness=2)
                        except Exception:
                            # Deliberately best-effort: a box that cannot be
                            # drawn in this view must not abort the frame.
                            continue
                vis_imgs.append(vis_img)

    if pc_range is None:
        pc_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]

    # Square BEV map whose side equals the height of the first camera image.
    side = vis_imgs[0].shape[0] if vis_imgs else 800
    bev_size = (side, side)
    if boxes_3d_for_bev is not None and len(boxes_3d_for_bev) > 0:
        bev_img = draw_bev_map(boxes_3d_for_bev, labels_3d_for_bev, scores_3d_for_bev,
                               pc_range, bev_size=bev_size, score_thr=score_thr)
    else:
        bev_img = np.full((bev_size[1], bev_size[0], 3), 255, np.uint8)
        cv2.putText(bev_img, 'BEV Map (No Detections)', (10, bev_size[1] // 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)

    if not vis_imgs:
        return bev_img

    if len(vis_imgs) == 6:
        # 2 x 3 grid: cameras 2, 0, 1 on top; cameras 4, 3, 5 mirrored below
        # (presumably the nuScenes camera order -- TODO confirm).
        target_height = max(view.shape[0] for view in vis_imgs)
        resized_imgs = [_resize_to_height(view, target_height) for view in vis_imgs]
        top_row = np.hstack([resized_imgs[2], resized_imgs[0], resized_imgs[1]])
        bottom_row = np.hstack([
            cv2.flip(resized_imgs[4], 1),
            cv2.flip(resized_imgs[3], 1),
            cv2.flip(resized_imgs[5], 1),
        ])
        left_side = np.vstack([top_row, bottom_row])
        return np.hstack([left_side, _resize_to_height(bev_img, left_side.shape[0])])

    if len(vis_imgs) > 1:
        # Any other multi-camera setup: a single row of views.
        target_height = max(view.shape[0] for view in vis_imgs)
        resized_imgs = [_resize_to_height(view, target_height) for view in vis_imgs]
        return np.hstack([np.hstack(resized_imgs), _resize_to_height(bev_img, target_height)])

    cam_img = vis_imgs[0]
    return np.hstack([cam_img, _resize_to_height(bev_img, cam_img.shape[0])])
def create_video_from_images(image_dir, output_video_path, fps=3):
    """Assemble the images of a directory into a video file.

    Images are taken in sorted file-name order. The frame size is the size of
    the first image, scaled down to fit within 1920x1080 if it is larger;
    other images are resized to that size. Unreadable images are skipped.

    Args:
        image_dir: directory containing ``.png`` / ``.jpg`` / ``.jpeg`` files.
        output_video_path: path of the video to write.
        fps: frames per second of the video.

    Returns:
        None. Nothing is written when the directory has no images, the first
        image cannot be read, or no video writer can be opened.
    """
    image_files = sorted(
        name for name in os.listdir(image_dir)
        if name.lower().endswith(('.png', '.jpg', '.jpeg'))
    )
    if not image_files:
        return

    first_img = cv2.imread(osp.join(image_dir, image_files[0]))
    if first_img is None:
        return

    height, width = first_img.shape[:2]
    max_width, max_height = 1920, 1080
    if width > max_width or height > max_height:
        scale = min(max_width / width, max_height / height)
        width, height = int(width * scale), int(height * scale)

    # Prefer mp4v and fall back to XVID when that codec is unavailable.
    video_writer = None
    for codec in ('mp4v', 'XVID'):
        video_writer = cv2.VideoWriter(
            output_video_path, cv2.VideoWriter_fourcc(*codec), fps, (width, height))
        if video_writer.isOpened():
            break
        video_writer.release()
    else:
        # Writing to an unopened writer would silently produce no video.
        print(f"Warning: cannot open a video writer for {output_video_path}")
        return

    try:
        for img_file in tqdm(image_files, desc=f"Creating video: {osp.basename(output_video_path)}"):
            frame = cv2.imread(osp.join(image_dir, img_file))
            if frame is None:
                continue
            if frame.shape[:2] != (height, width):
                frame = cv2.resize(frame, (width, height))
            video_writer.write(frame)
    finally:
        # Always finalise the file, even if reading or resizing fails midway.
        video_writer.release()
def _resolve_prev_bev_shape(session_inputs, default_shape):
    """Return the shape to feed as 'prev_bev', filling dynamic dims from ``default_shape``."""
    prev_bev_input = next((inp for inp in session_inputs if inp.name == 'prev_bev'), None)
    if prev_bev_input is None:
        return tuple(default_shape)
    shape = list(prev_bev_input.shape)
    for i, dim in enumerate(shape):
        # Dynamic dimensions may be reported as a name, None or a negative value.
        if dim is None or isinstance(dim, str) or dim < 0:
            shape[i] = default_shape[i] if i < 3 else 1
    return tuple(shape)


def _write_frame_image(vis_img, image_path):
    """Coerce ``vis_img`` to an 8-bit HWC array and write it to ``image_path``."""
    if vis_img is None:
        return
    if not isinstance(vis_img, np.ndarray):
        vis_img = np.array(vis_img)
    if vis_img.dtype != np.uint8:
        # Values in [0, 1] are scaled to [0, 255]; anything else is cast as is.
        vis_img = (vis_img * 255).astype(np.uint8) if vis_img.max() <= 1.0 else vis_img.astype(np.uint8)
    if len(vis_img.shape) == 3 and vis_img.shape[0] in (1, 3):
        vis_img = vis_img.transpose(1, 2, 0)  # CHW -> HWC
    if vis_img.shape[0] > 0 and vis_img.shape[1] > 0:
        cv2.imwrite(image_path, vis_img)


def main():
    """Run inference over the selected scenes and write images and videos.

    For every frame the camera images, projection matrices and CAN bus data
    are loaded, the model is run with the previous frame's BEV embedding, and
    the detections are rendered. Each rendered frame is written to
    ``<output_dir>/<scene>/images/frame_<idx>.png`` as soon as it is
    produced, and ``<output_dir>/<scene>/<scene>_result.mp4`` is created when
    the scene is finished, so memory use does not grow with the number of
    frames.
    """
    args = parse_args()

    config = load_config_from_json(args.config_json)
    os.makedirs(args.output_dir, exist_ok=True)
    ax_session = load_axmodel(args.model)

    # Model and visualisation parameters from the config.
    transformer_cfg = config['model']['transformer']
    bev_h = transformer_cfg['bev_h']
    bev_w = transformer_cfg['bev_w']
    embed_dims = transformer_cfg['embed_dims']
    img_norm_cfg = config['img_norm']
    class_names = config['dataset']['class_names']
    pc_range = config['model']['bbox_coder']['pc_range']

    # The session's inputs do not change between frames; query them once.
    session_inputs = ax_session.get_inputs()
    input_names = [inp.name for inp in session_inputs]
    expected_shape = _resolve_prev_bev_shape(session_inputs, (bev_h * bev_w, 1, embed_dims))

    # Scene index: scene name -> {'samples': [frame indices]}.
    scene_index_path = osp.join(args.data_dir, 'scene_index.json')
    with open(scene_index_path, 'r') as f:
        scene_index_data = json.load(f)
    scenes_dict = scene_index_data['scenes']
    scene_names = list(scenes_dict.keys())

    end_scene = args.end_scene if args.end_scene is not None else len(scene_names)
    end_scene = min(end_scene, len(scene_names))

    for scene_idx in range(args.start_scene, end_scene):
        scene_name = scene_names[scene_idx]
        sample_indices = scenes_dict[scene_name]['samples']
        num_frames = len(sample_indices)
        print(f"Processing scene {scene_idx+1}/{len(scene_names)}: {scene_name} ({num_frames} frames)")
        if num_frames == 0:
            continue

        scene_dir = osp.join(args.output_dir, scene_name)
        images_dir = osp.join(scene_dir, 'images')
        os.makedirs(images_dir, exist_ok=True)

        # Temporal state starts fresh for every scene.
        prev_bev = None
        prev_pos = np.zeros(3, dtype=np.float32)
        prev_angle = 0.0

        for local_idx, frame_idx in enumerate(tqdm(sample_indices, desc=f"Scene {scene_name}")):
            img, lidar2img, can_bus, _meta = load_data(args.data_dir, scene_name, frame_idx)

            # can_bus carries absolute values; the model is fed the change
            # since the previous frame in the first three entries and the
            # last entry (zero for the first frame of a scene).
            curr_can_bus = can_bus[0]
            delta_can_bus = curr_can_bus.copy()
            if prev_bev is not None:
                delta_can_bus[:3] -= prev_pos
                delta_can_bus[-1] -= prev_angle
            else:
                delta_can_bus[:3] = 0.0
                delta_can_bus[-1] = 0.0
            prev_pos = curr_can_bus[:3].copy()
            prev_angle = curr_can_bus[-1]

            # Previous BEV embedding, or zeros for the first frame.
            if prev_bev is None:
                prev_bev_in = np.zeros(expected_shape, dtype=np.float32)
            else:
                prev_bev_in = prev_bev
                if prev_bev_in.shape != expected_shape and len(prev_bev_in.shape) == 3:
                    prev_bev_in = prev_bev_in.reshape(expected_shape)

            available = {
                'img': img.astype(np.float32),
                'can_bus': delta_can_bus.reshape(1, -1).astype(np.float32),
                'lidar2img': lidar2img.astype(np.float32),
                'prev_bev': prev_bev_in,
            }
            # Feed only the inputs the model actually declares.
            ax_inputs = {name: available[name] for name in input_names if name in available}

            # Outputs are taken in the order: BEV embedding, class scores,
            # box predictions.
            bev_embed, all_cls_scores, all_bbox_preds = ax_session.run(None, ax_inputs)
            prev_bev = bev_embed

            results = post_process_outputs_np(
                all_cls_scores, all_bbox_preds, config, args.score_thr
            )
            vis_img = visualize_results_np(
                img, results[0], lidar2img, img_norm_cfg, class_names, args.score_thr, pc_range=pc_range
            )

            # Write the frame now instead of keeping every frame in memory.
            _write_frame_image(vis_img, osp.join(images_dir, f'frame_{local_idx:06d}.png'))

        create_video_from_images(images_dir, osp.join(scene_dir, f'{scene_name}_result.mp4'), args.fps)
        print(f"✓ Scene {scene_name}: {num_frames} frames, video: {osp.join(scene_dir, f'{scene_name}_result.mp4')}")
# Run the inference pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()
|