#!/usr/bin/env python3
"""BEVFormer inference on pre-extracted data using the AXEngine runtime.

For each frame of each selected scene the script loads the camera images and
metadata, runs the AXModel, decodes the predicted 3D boxes, draws them on the
camera images and on a bird's-eye-view (BEV) map, writes the rendered frame
to disk and finally assembles the frames of every scene into a video.
"""
import argparse
import json
import os
import os.path as osp
import warnings
from collections import defaultdict

import axengine as axe
import cv2
import numpy as np
from tqdm import tqdm

# Normalisation constants used whenever a config does not provide its own.
_DEFAULT_IMG_MEAN = [123.675, 116.28, 103.53]
_DEFAULT_IMG_STD = [58.395, 57.12, 57.375]

# Colour per class id, passed directly to OpenCV drawing calls.
CLASS_COLORS = {
    0: (0, 255, 0),
    1: (255, 255, 0),
    2: (0, 0, 255),
    3: (0, 165, 255),
    4: (255, 0, 255),
    5: (0, 255, 255),
    6: (128, 0, 128),
    7: (255, 165, 0),
    8: (0, 0, 255),
    9: (128, 128, 128),
}

# Per-class minimum score applied in post_process_outputs_np.
_CLASS_SCORE_THRS = {
    0: 0.3,  # Car
    1: 0.3,  # Truck
    2: 0.3,  # Construction vehicle
    3: 0.3,  # Bus
    4: 0.3,  # Trailer
    5: 0.3,  # Barrier
    6: 0.3,  # Motorcycle
    7: 0.3,  # Bicycle
    8: 0.3,  # Pedestrian
    9: 0.3,  # Traffic cone
}

# Per-class suppression radius used by the circle NMS (same units as the
# box centre coordinates).
_CIRCLE_NMS_RADII = {
    0: 2.0,
    1: 3.0,
    2: 2.5,
    3: 4.0,
    4: 3.0,
    5: 1.0,
    6: 1.5,
    7: 1.0,
    8: 0.5,
    9: 0.3,
}

# Pairs of corner indices (into the 8 corners of a box) joined by an edge.
_BOX_EDGES = ((0, 1), (0, 3), (0, 4), (1, 2), (1, 5), (3, 2), (3, 7),
              (4, 5), (4, 7), (2, 6), (5, 6), (6, 7))

# Depth below which a projected point is treated as behind the camera.
_MIN_DEPTH = 1e-5


def parse_args():
    """Parse the command-line arguments of the script."""
    parser = argparse.ArgumentParser(description='BEVFormer AXEngine Inference from Extracted Data')
    parser.add_argument('model', help='AXModel path')
    parser.add_argument('config_json', help='JSON config file path')
    parser.add_argument('data_dir', help='extracted data directory (extracted_data)')
    parser.add_argument('--output-dir', default='./inference_results_extracted', help='output directory')
    parser.add_argument('--score-thr', type=float, default=0.1, help='score threshold')
    parser.add_argument('--fps', type=int, default=3, help='video fps')
    parser.add_argument('--start-scene', type=int, default=0, help='start scene index')
    parser.add_argument('--end-scene', type=int, default=None, help='end scene index (None for all)')
    return parser.parse_args()


def load_axmodel(axmodel_path):
    """Create an AXEngine inference session for the model at ``axmodel_path``."""
    # Use AxEngineExecutionProvider rather than AXCLRTExecutionProvider.
    providers = ['AxEngineExecutionProvider']
    return axe.InferenceSession(axmodel_path, providers=providers)


def load_config_from_json(config_path):
    """Return the parsed content of the JSON file at ``config_path``."""
    with open(config_path, 'r') as f:
        return json.load(f)


def preprocess_image(img_path, img_norm_cfg, target_size=(480, 800)):
    """Load one image, resize it and normalise it.

    Args:
        img_path: path to the image file.
        img_norm_cfg: dict with optional 'mean', 'std' and 'to_rgb' entries.
        target_size: (H, W) the image is resized to when its size differs.

    Returns:
        float32 array of shape (C, H, W).

    Raises:
        ValueError: if the image cannot be read.
    """
    img = cv2.imread(img_path)
    if img is None:
        raise ValueError(f"Cannot load image: {img_path}")

    # cv2.imread yields BGR; convert unless the config asks to keep it.
    if img_norm_cfg.get('to_rgb', True):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    target_h, target_w = target_size
    if img.shape[:2] != (target_h, target_w):
        img = cv2.resize(img, (target_w, target_h))  # cv2 expects (W, H)

    mean = np.array(img_norm_cfg.get('mean', _DEFAULT_IMG_MEAN), dtype=np.float32)
    std = np.array(img_norm_cfg.get('std', _DEFAULT_IMG_STD), dtype=np.float32)
    img = (img.astype(np.float32) - mean) / std
    return img.transpose(2, 0, 1)  # (H, W, C) -> (C, H, W)


def load_data(data_dir, scene_name, frame_idx):
    """Load the model inputs of one frame from the extracted-data directory.

    Reads ``meta_{frame_idx:06d}.json`` and the per-camera images
    ``cam_{cam:02d}_{frame_idx:06d}.png`` from ``data_dir/scene_name``.

    Args:
        data_dir: data directory path.
        scene_name: scene name (sub-directory of ``data_dir``).
        frame_idx: frame index used in the file names.

    Returns:
        img: (1, N, C, H, W) float32 array of preprocessed camera images.
        lidar2img: float32 array built from ``meta['lidar2img']`` with a
            leading batch axis, documented by the file as (1, N, 4, 4).
        can_bus: float32 array built from ``meta['can_bus']`` with a leading
            batch axis, documented by the file as (1, 18).
        meta: the metadata dict.
    """
    scene_dir = osp.join(data_dir, scene_name)

    meta_path = osp.join(scene_dir, f'meta_{frame_idx:06d}.json')
    with open(meta_path, 'r') as f:
        meta = json.load(f)

    img_norm_cfg = meta.get('img_norm_cfg', {
        'mean': _DEFAULT_IMG_MEAN,
        'std': _DEFAULT_IMG_STD,
        'to_rgb': True,
    })

    # All cameras are resized to the shape recorded for the first camera.
    img_shape = meta.get('img_shape', [[480, 800, 3]] * 6)
    target_size = (img_shape[0][0], img_shape[0][1])  # (H, W)

    num_cams = meta.get('num_cams', 6)
    imgs = [
        preprocess_image(
            osp.join(scene_dir, f'cam_{cam_idx:02d}_{frame_idx:06d}.png'),
            img_norm_cfg,
            target_size,
        )
        for cam_idx in range(num_cams)
    ]
    img = np.stack(imgs, axis=0)[np.newaxis, ...]  # (1, N, C, H, W)

    lidar2img = np.array(meta['lidar2img'], dtype=np.float32)[np.newaxis, ...]
    can_bus = np.array(meta['can_bus'], dtype=np.float32)[np.newaxis, ...]
    return img, lidar2img, can_bus, meta


def denormalize_bbox_np(normalized_bboxes, pc_range):
    """Convert network box encodings to ``[cx, cy, cz, w, l, h, yaw(, vx, vy)]``.

    The last axis of the input is read as
    ``[cx, cy, log(w), log(l), cz, log(h), sin(yaw), cos(yaw), vx, vy]``;
    the velocity pair is optional.

    Args:
        normalized_bboxes: array whose last axis has at least 8 entries.
        pc_range: unused; kept for interface compatibility.

    Returns:
        Array with 7 entries on the last axis, or 9 when velocity is present.
    """
    yaw = np.arctan2(normalized_bboxes[..., 6:7], normalized_bboxes[..., 7:8])

    cx = normalized_bboxes[..., 0:1]
    cy = normalized_bboxes[..., 1:2]
    cz = normalized_bboxes[..., 4:5]

    # Sizes are predicted in log space.
    w = np.exp(normalized_bboxes[..., 2:3])
    l = np.exp(normalized_bboxes[..., 3:4])
    h = np.exp(normalized_bboxes[..., 5:6])

    parts = [cx, cy, cz, w, l, h, yaw]
    if normalized_bboxes.shape[-1] > 8:
        # Slice with `...` like every other field so extra leading axes work.
        parts.append(normalized_bboxes[..., 8:9])
        parts.append(normalized_bboxes[..., 9:10])
    return np.concatenate(parts, axis=-1)


def decode_bboxes_custom_np(all_cls_scores, all_bbox_preds, pc_range, post_center_range,
                            max_num=100, score_threshold=None, num_classes=10):
    """Decode the outputs of the last decoder layer into per-sample boxes.

    For each sample: apply a sigmoid to the class logits, keep the
    ``max_num`` highest (query, class) scores, denormalise the matching
    boxes, then filter by score and by ``post_center_range``.

    Args:
        all_cls_scores: class logits indexed as [layer, batch, query, class].
        all_bbox_preds: box encodings indexed as [layer, batch, query, code].
        pc_range: forwarded to ``denormalize_bbox_np``.
        post_center_range: ``[x_min, y_min, z_min, x_max, y_max, z_max]``
            bounds on the box centre, or None to skip the range filter.
        max_num: maximum number of candidates kept per sample.
        score_threshold: minimum score, or None to skip the score filter.
        num_classes: number of classes on the last axis of the logits.

    Returns:
        List with one dict per sample holding 'bboxes', 'scores', 'labels'.
    """
    # Only the last decoder layer is used.
    all_cls_scores = all_cls_scores[-1]
    all_bbox_preds = all_bbox_preds[-1]

    predictions_list = []
    for cls_logits, bbox_preds in zip(all_cls_scores, all_bbox_preds):
        cls_scores = 1.0 / (1.0 + np.exp(-cls_logits))  # sigmoid

        # Top-k over the flattened (query, class) grid.
        flat_scores = cls_scores.reshape(-1)
        topk_indices = np.argsort(flat_scores)[::-1][:max_num]
        final_scores = flat_scores[topk_indices]
        final_preds = topk_indices % num_classes
        bbox_index = topk_indices // num_classes

        final_box_preds = denormalize_bbox_np(bbox_preds[bbox_index], pc_range)

        if score_threshold is not None:
            thresh_mask = final_scores > score_threshold
            tmp_score = score_threshold
            # If nothing passes, relax the threshold by 10% per step and
            # accept everything once it drops below 0.01.
            while thresh_mask.sum() == 0:
                tmp_score *= 0.9
                if tmp_score < 0.01:
                    thresh_mask = np.ones(len(final_scores), dtype=bool)
                    break
                thresh_mask = final_scores >= tmp_score
        else:
            thresh_mask = np.ones(len(final_scores), dtype=bool)

        if post_center_range is not None:
            center_range = np.array(post_center_range)
            mask = (final_box_preds[..., :3] >= center_range[:3]).all(1)
            mask &= (final_box_preds[..., :3] <= center_range[3:]).all(1)
            mask &= thresh_mask
        else:
            mask = thresh_mask

        predictions_list.append({
            'bboxes': final_box_preds[mask],
            'scores': final_scores[mask],
            'labels': final_preds[mask],
        })

    return predictions_list


def get_bboxes_custom_np(preds_dicts, pc_range, post_center_range,
                         max_num=100, score_threshold=None, num_classes=10):
    """Decode predictions and apply the box adjustments used for drawing.

    Args:
        preds_dicts: dict with 'all_cls_scores' and 'all_bbox_preds'.
        pc_range, post_center_range, max_num, score_threshold, num_classes:
            forwarded to ``decode_bboxes_custom_np``.

    Returns:
        List of ``(bboxes, scores, labels)`` tuples, one per sample. Empty
        samples yield arrays of shapes (0, 9), (0,) and (0,).
    """
    preds_list = decode_bboxes_custom_np(
        preds_dicts['all_cls_scores'],
        preds_dicts['all_bbox_preds'],
        pc_range,
        post_center_range,
        max_num,
        score_threshold,
        num_classes,
    )

    ret_list = []
    for preds in preds_list:
        bboxes = preds['bboxes']
        if len(bboxes) == 0:
            ret_list.append((
                np.zeros((0, 9), dtype=np.float32),
                np.zeros((0,), dtype=np.float32),
                np.zeros((0,), dtype=np.int64),
            ))
            continue

        # Convert centre z to bottom-centre z (uses the unshrunk height).
        bboxes[:, 2] = bboxes[:, 2] - bboxes[:, 5] * 0.5
        # Shrink w, l, h by 10% to compensate for oversized boxes.
        bboxes[:, 3:6] = bboxes[:, 3:6] * 0.9

        ret_list.append((bboxes, preds['scores'], preds['labels']))

    return ret_list


def format_bbox_result_np(bboxes, scores, labels):
    """Pack boxes, scores and labels into the result dict used downstream."""
    return {
        'boxes_3d': bboxes,
        'scores_3d': scores,
        'labels_3d': labels,
    }


def rotation_3d_in_axis_np(points, angles, axis=2):
    """Rotate batches of points around the z-axis.

    Each batch of row-vector points is right-multiplied by
    ``[[cos, -sin, 0], [sin, cos, 0], [0, 0, 1]]`` built from its angle.

    Args:
        points: (N, M, 3) array.
        angles: (N,) array of angles in radians.
        axis: rotation axis; only 2 (or -1) is supported.

    Returns:
        (N, M, 3) array of rotated points.

    Raises:
        ValueError: if ``axis`` is not 2 or -1.
    """
    if axis not in (2, -1):
        raise ValueError('Only axis=2 (z-axis) is supported for LiDAR boxes')

    rot_sin = np.sin(angles)
    rot_cos = np.cos(angles)

    rot_mat = np.zeros((len(angles), 3, 3), dtype=points.dtype)
    rot_mat[:, 0, 0] = rot_cos
    rot_mat[:, 0, 1] = -rot_sin
    rot_mat[:, 1, 0] = rot_sin
    rot_mat[:, 1, 1] = rot_cos
    rot_mat[:, 2, 2] = 1

    # (N, M, 3) @ (N, 3, 3) -> (N, M, 3)
    return np.einsum('aij,ajk->aik', points, rot_mat)


def compute_bbox_corners_np(bboxes):
    """Compute the 8 corners of each 3D box.

    Args:
        bboxes: (N, >=7) array read as ``[x, y, z_bottom, w, l, h, yaw, ...]``.

    Returns:
        (N, 8, 3) array of corner coordinates; shape (0, 8, 3) for no boxes.
    """
    if len(bboxes) == 0:
        return np.zeros((0, 8, 3), dtype=np.float32)

    dtype = bboxes.dtype
    centers = bboxes[:, :3]  # bottom centre
    w = bboxes[:, 3:4]
    l = bboxes[:, 4:5]
    h = bboxes[:, 5:6]
    dims = np.concatenate([l, w, h], axis=1)  # [x_size, y_size, z_size]

    # Empirical correction carried over from the original code: the yaw is
    # offset by -80 degrees (pi/2 - pi/18) before the corners are rotated.
    yaws = bboxes[:, 6] - (np.pi / 2.0 - np.pi / 18.0)

    # Unit-cube corners, reordered so that _BOX_EDGES indexes real edges.
    corners_norm = np.stack(np.unravel_index(np.arange(8), [2] * 3), axis=1).astype(dtype)
    corners_norm = corners_norm[[0, 1, 3, 2, 4, 5, 7, 6]]
    # Origin at the bottom centre: (0.5, 0.5, 0).
    corners_norm = corners_norm - np.array([0.5, 0.5, 0], dtype=dtype)

    corners = dims[:, np.newaxis, :] * corners_norm[np.newaxis, :, :]  # (N, 8, 3)
    corners = rotation_3d_in_axis_np(corners, yaws, axis=2)
    corners += centers[:, np.newaxis, :]
    return corners


def draw_bbox3d_on_img_custom_np(bboxes, raw_img, lidar2img_rt, color=(0, 255, 0), thickness=2):
    """Draw the wireframes of 3D boxes on a copy of an image.

    Args:
        bboxes: (N, >=7) boxes accepted by ``compute_bbox_corners_np``.
        raw_img: (H, W, 3) image; it is not modified.
        lidar2img_rt: 16 values reshaped to the 4x4 projection matrix.
        color: line colour passed to ``cv2.line``.
        thickness: line thickness passed to ``cv2.line``.

    Returns:
        A copy of ``raw_img`` with the box edges drawn.
    """
    img = raw_img.copy()
    if len(bboxes) == 0:
        return img

    bboxes = np.asarray(bboxes)
    lidar2img_rt = np.asarray(lidar2img_rt).reshape(4, 4)

    corners_3d = compute_bbox_corners_np(bboxes)  # (N, 8, 3)
    num_bbox = corners_3d.shape[0]

    # Homogeneous coordinates -> image plane.
    corners_flat = corners_3d.reshape(-1, 3)
    ones = np.ones((corners_flat.shape[0], 1), dtype=np.float32)
    pts_2d = np.concatenate([corners_flat, ones], axis=-1) @ lidar2img_rt.T

    # Corners at or behind the camera plane have no meaningful projection:
    # after the depth clamp below they land at huge coordinates. Remember
    # them so that edges touching them are skipped.
    in_front = (pts_2d[:, 2] > _MIN_DEPTH).reshape(num_bbox, 8)

    pts_2d[:, 2] = np.clip(pts_2d[:, 2], a_min=_MIN_DEPTH, a_max=1e5)
    pts_2d[:, 0] /= pts_2d[:, 2]
    pts_2d[:, 1] /= pts_2d[:, 2]

    # Clip before the integer cast so extreme values cannot overflow int32.
    pixels = np.clip(pts_2d[:, :2], -1e9, 1e9).reshape(num_bbox, 8, 2).astype(np.int32)

    height, width = img.shape[:2]
    for box_pixels, box_in_front in zip(pixels, in_front):
        for start, end in _BOX_EDGES:
            if not (box_in_front[start] and box_in_front[end]):
                continue
            pt1 = (int(box_pixels[start, 0]), int(box_pixels[start, 1]))
            pt2 = (int(box_pixels[end, 0]), int(box_pixels[end, 1]))
            # Draw only edges with at least one endpoint inside the image.
            if (0 <= pt1[0] < width and 0 <= pt1[1] < height) or \
                    (0 <= pt2[0] < width and 0 <= pt2[1] < height):
                cv2.line(img, pt1, pt2, color, thickness, cv2.LINE_AA)

    return img.astype(np.uint8)


def _empty_result():
    """Return a result dict that contains no detections."""
    return format_bbox_result_np(
        np.zeros((0, 9), dtype=np.float32),
        np.zeros((0,), dtype=np.float32),
        np.zeros((0,), dtype=np.int64),
    )


def post_process_outputs_np(all_cls_scores, all_bbox_preds, config, score_thr=0.1):
    """Turn raw model outputs into filtered detections.

    Decodes the boxes using ``config['model']['bbox_coder']``, applies the
    per-class score thresholds and then a per-class circle NMS.

    Args:
        all_cls_scores: class logits as produced by the model.
        all_bbox_preds: box encodings as produced by the model.
        config: parsed JSON configuration.
        score_thr: threshold used for class ids missing from the per-class
            table.

    Returns:
        List with one result dict ('boxes_3d', 'scores_3d', 'labels_3d')
        per sample.
    """
    bbox_coder = config['model']['bbox_coder']
    preds_dicts = {
        'all_cls_scores': all_cls_scores,
        'all_bbox_preds': all_bbox_preds,
    }
    bbox_list = get_bboxes_custom_np(
        preds_dicts,
        bbox_coder['pc_range'],
        bbox_coder['post_center_range'],
        bbox_coder['max_num'],
        bbox_coder.get('score_threshold', None),
        bbox_coder['num_classes'],
    )

    results = []
    for bboxes, scores, labels in bbox_list:
        # Per-class score filter.
        thresholds = np.array(
            [_CLASS_SCORE_THRS.get(int(cls_id), score_thr) for cls_id in labels],
            dtype=np.float64,
        )
        keep = np.asarray(scores) > thresholds
        if not keep.any():
            results.append(_empty_result())
            continue
        bboxes, scores, labels = bboxes[keep], scores[keep], labels[keep]

        keep_nms = circle_nms_np(bboxes, scores, labels, _CIRCLE_NMS_RADII)
        if len(keep_nms) == 0:
            results.append(_empty_result())
            continue

        results.append(format_bbox_result_np(bboxes[keep_nms], scores[keep_nms], labels[keep_nms]))

    return results


def circle_nms_np(bboxes, scores, labels, dist_thrs):
    """Class-wise NMS based on the distance between box centres.

    Boxes are visited in order of decreasing score; a kept box suppresses
    every lower-scored box of the same class whose (x, y) centre lies
    closer than the radius configured for that class.

    Args:
        bboxes: (N, >=2) array; columns 0 and 1 are the centre x and y.
        scores: (N,) array of scores.
        labels: (N,) array of class ids.
        dist_thrs: dict mapping class id to radius; missing ids use 1.0.

    Returns:
        int64 array of indices (into the inputs) of the kept boxes.
    """
    if len(bboxes) == 0:
        return np.array([], dtype=np.int64)

    order = np.argsort(scores)[::-1]
    centers = bboxes[order][:, :2]
    sorted_labels = labels[order]

    suppressed = np.zeros(len(order), dtype=bool)
    keep = []
    for i, original_index in enumerate(order):
        if suppressed[i]:
            continue
        keep.append(original_index)

        cls_id = int(sorted_labels[i])
        radius = dist_thrs.get(cls_id, 1.0)
        if i + 1 < len(order):
            dists = np.linalg.norm(centers[i + 1:] - centers[i], axis=1)
            # `suppressed[i + 1:]` is a view, so this writes through.
            suppressed[i + 1:] |= (dists < radius) & (sorted_labels[i + 1:] == cls_id)

    return np.array(keep, dtype=np.int64)


def denormalize_img_np(img_array, img_norm_cfg):
    """Undo the normalisation of ``preprocess_image`` and return a BGR image.

    Args:
        img_array: normalised image; a 3-D array is treated as (C, H, W)
            and transposed to (H, W, C).
        img_norm_cfg: dict with optional 'mean', 'std' and 'to_rgb' entries.

    Returns:
        uint8 image in BGR channel order, ready for OpenCV drawing.
    """
    mean = np.array(img_norm_cfg.get('mean', _DEFAULT_IMG_MEAN))
    std = np.array(img_norm_cfg.get('std', _DEFAULT_IMG_STD))

    img = img_array.transpose(1, 2, 0) if img_array.ndim == 3 else img_array
    img = np.clip(img * std + mean, 0, 255).astype(np.uint8)

    # Preprocessing only swaps BGR -> RGB when 'to_rgb' is set, so only
    # swap back in that case; otherwise the data is still BGR.
    if img_norm_cfg.get('to_rgb', True):
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    return img


def draw_bev_map(bboxes, labels, scores, pc_range, bev_size=(800, 800), score_thr=0.1):
    """Draw a bird's-eye-view map with the ego vehicle and the detections.

    Args:
        bboxes: (N, >=7) array read as ``[x, y, z, w, l, h, yaw, ...]``.
        labels: (N,) array of class ids.
        scores: (N,) array of detection scores.
        pc_range: ``[x_min, y_min, z_min, x_max, y_max, z_max]``.
        bev_size: (width, height) of the BEV image in pixels.
        score_thr: boxes with a score not above this value are skipped
            (only applied when it is positive).

    Returns:
        (H, W, 3) uint8 image.
    """
    bev_w, bev_h = bev_size
    bev_img = np.ones((bev_h, bev_w, 3), dtype=np.uint8) * 255  # white background

    x_min, y_min, _, x_max, y_max, _ = pc_range
    x_range = x_max - x_min
    y_range = y_max - y_min

    def to_pixel(x, y):
        """Map a point in the pc_range frame to (img_x, img_y): y -> img_x, flipped x -> img_y."""
        return int((y - y_min) / y_range * bev_w), int((x_max - x) / x_range * bev_h)

    # Grid: 11 evenly spaced lines per direction.
    grid_color = (200, 200, 200)
    for step in range(11):
        grid_x, grid_y = to_pixel(x_min + step * x_range / 10, y_min + step * y_range / 10)
        if 0 <= grid_x < bev_w:
            cv2.line(bev_img, (grid_x, 0), (grid_x, bev_h), grid_color, 1)
        if 0 <= grid_y < bev_h:
            cv2.line(bev_img, (0, grid_y), (bev_w, grid_y), grid_color, 1)

    # Axes through the origin of the frame (ego position).
    center_x, center_y = to_pixel(0, 0)
    cv2.line(bev_img, (center_x, 0), (center_x, bev_h), (150, 150, 150), 2)
    cv2.line(bev_img, (0, center_y), (bev_w, center_y), (150, 150, 150), 2)

    # Ego vehicle: a 30 x 12 px rectangle centred on the origin. The original
    # code rotated the rectangle by 90 degrees twice; for a rectangle centred
    # on the origin that is the same rectangle, and the heading (1, 0)
    # becomes (-1, 0).
    ego_length_px = 30
    ego_width_px = 12
    half_l, half_w = ego_length_px // 2, ego_width_px // 2
    ego_corners = np.array([
        [center_x - half_l, center_y + half_w],
        [center_x - half_l, center_y - half_w],
        [center_x + half_l, center_y - half_w],
        [center_x + half_l, center_y + half_w],
    ], dtype=np.int32)
    cv2.fillPoly(bev_img, [ego_corners], (0, 0, 255))
    cv2.polylines(bev_img, [ego_corners], True, (0, 0, 0), 2)
    cv2.arrowedLine(bev_img, (center_x, center_y), (center_x - half_l, center_y),
                    (0, 0, 0), 3, tipLength=0.3)

    if len(bboxes) == 0:
        return bev_img

    if score_thr > 0:
        mask = scores > score_thr
        bboxes = bboxes[mask]
        labels = labels[mask]
        scores = scores[mask]

    if len(bboxes) == 0:
        return bev_img

    default_color = (255, 255, 255)
    for box, label in zip(bboxes, labels):
        color = CLASS_COLORS.get(int(label), default_color)

        x, y = box[0], box[1]
        w, l = box[3], box[4]
        # Empirical correction carried over from the original code.
        yaw = box[6] - np.pi / 2.0

        img_x, img_y = to_pixel(x, y)
        # Skip boxes whose centre is outside the map.
        if not (0 <= img_x < bev_w and 0 <= img_y < bev_h):
            continue

        cos_yaw = np.cos(yaw)
        sin_yaw = np.sin(yaw)

        # Footprint corners relative to the centre, rotated by yaw.
        corners_local = np.array([
            [l / 2, w / 2],
            [l / 2, -w / 2],
            [-l / 2, -w / 2],
            [-l / 2, w / 2],
        ])
        rot_mat = np.array([[cos_yaw, -sin_yaw], [sin_yaw, cos_yaw]])
        corners_rotated = corners_local @ rot_mat.T

        corners_img = np.array(
            [to_pixel(x + corner[0], y + corner[1]) for corner in corners_rotated],
            dtype=np.int32,
        )

        # Semi-transparent fill plus a black outline.
        overlay = bev_img.copy()
        cv2.fillPoly(overlay, [corners_img], color)
        cv2.addWeighted(overlay, 0.5, bev_img, 0.5, 0, bev_img)
        cv2.polylines(bev_img, [corners_img], True, (0, 0, 0), 2)

        # Heading arrow: half the box length in pixels, at least 10 px.
        box_l_px = int(l / y_range * bev_h)
        arrow_length = max(box_l_px // 2, 10)
        arrow_end = (int(img_x + arrow_length * sin_yaw), int(img_y - arrow_length * cos_yaw))
        cv2.arrowedLine(bev_img, (img_x, img_y), arrow_end, (0, 0, 0), 2, tipLength=0.3)

        cv2.circle(bev_img, (img_x, img_y), 3, (0, 0, 0), -1)

    # Rotate the finished map by 90 degrees and mirror it horizontally; the
    # title is drawn afterwards so it stays upright.
    rotation_matrix = cv2.getRotationMatrix2D((bev_w // 2, bev_h // 2), 90, 1.0)
    bev_img = cv2.warpAffine(bev_img, rotation_matrix, (bev_w, bev_h), borderValue=(255, 255, 255))
    bev_img = cv2.flip(bev_img, 1)

    text = 'BEV Map'
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    thickness = 2
    (text_width, text_height), _ = cv2.getTextSize(text, font, font_scale, thickness)
    cv2.putText(bev_img, text, (bev_w - text_width - 10, text_height + 10),
                font, font_scale, (0, 0, 0), thickness)

    return bev_img


def _resize_to_height(image, target_height):
    """Return ``image`` scaled (keeping its aspect ratio) to ``target_height``."""
    if image.shape[0] == target_height:
        return image
    new_width = int(image.shape[1] * target_height / image.shape[0])
    return cv2.resize(image, (new_width, target_height))


def visualize_results_np(img, result, lidar2img, img_norm_cfg, class_names,
                         score_thr=0.3, pc_range=None):
    """Render the detections of one frame on the camera images and a BEV map.

    Args:
        img: (1, N, C, H, W) normalised camera images.
        result: dict with 'boxes_3d', 'scores_3d' and 'labels_3d'.
        lidar2img: projection matrices indexed as ``[0, cam_idx]``.
        img_norm_cfg: normalisation config used to recover the raw images.
        class_names: unused; kept for interface compatibility.
        score_thr: boxes with a score not above this value are not drawn
            (only applied when it is positive).
        pc_range: range of the BEV map; a default is used when None.

    Returns:
        uint8 image: the camera panels with the BEV map appended on the
        right. With exactly six cameras the panels form a 2 x 3 grid.
    """
    num_cams = img.shape[1] if img.ndim == 5 else 1
    raw_imgs = [denormalize_img_np(img[0, cam_idx], img_norm_cfg) for cam_idx in range(num_cams)]

    boxes_3d = result.get('boxes_3d')
    scores_3d = result.get('scores_3d')
    labels_3d = result.get('labels_3d')

    has_boxes = boxes_3d is not None and len(boxes_3d) > 0
    if has_boxes and score_thr > 0 and scores_3d is not None:
        mask = scores_3d > score_thr
        boxes_3d = boxes_3d[mask]
        scores_3d = scores_3d[mask]
        labels_3d = labels_3d[mask]
        has_boxes = len(boxes_3d) > 0

    # Always produce one panel per camera, even when nothing is drawn, so
    # the frame layout does not change between frames.
    vis_imgs = []
    for cam_idx, raw_img in enumerate(raw_imgs):
        cam_img = raw_img.copy()
        if has_boxes and lidar2img.shape[1] > cam_idx:
            cam_lidar2img = lidar2img[0, cam_idx]
            for box, label in zip(boxes_3d, labels_3d):
                color = CLASS_COLORS.get(int(label), (255, 255, 255))
                try:
                    cam_img = draw_bbox3d_on_img_custom_np(
                        box[None], cam_img, cam_lidar2img, color=color, thickness=2)
                except Exception as exc:  # best effort: keep rendering the frame
                    warnings.warn(f'Failed to draw a box on camera {cam_idx}: {exc}')
        vis_imgs.append(cam_img)

    if pc_range is None:
        pc_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]

    # Square BEV map with the height of a camera image.
    bev_side = vis_imgs[0].shape[0] if vis_imgs else 800
    bev_size = (bev_side, bev_side)
    if has_boxes:
        bev_img = draw_bev_map(boxes_3d.copy(), labels_3d.copy(), scores_3d.copy(),
                               pc_range, bev_size=bev_size, score_thr=score_thr)
    else:
        bev_img = np.full((bev_size[1], bev_size[0], 3), 255, np.uint8)
        cv2.putText(bev_img, 'BEV Map (No Detections)', (10, bev_size[1] // 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)

    if not vis_imgs:
        return bev_img

    target_height = max(panel.shape[0] for panel in vis_imgs)
    panels = [_resize_to_height(panel, target_height) for panel in vis_imgs]

    if len(panels) == 6:
        # Top row: cameras 2, 0, 1; bottom row: cameras 4, 3, 5 mirrored.
        top_row = np.hstack([panels[2], panels[0], panels[1]])
        bottom_row = np.hstack([cv2.flip(panels[4], 1), cv2.flip(panels[3], 1), cv2.flip(panels[5], 1)])
        cameras = np.vstack([top_row, bottom_row])
    else:
        cameras = np.hstack(panels)

    bev_img = _resize_to_height(bev_img, cameras.shape[0])
    return np.hstack([cameras, bev_img])


def create_video_from_images(image_dir, output_video_path, fps=3):
    """Assemble the images of a directory (sorted by name) into a video.

    Frames larger than 1920x1080 are scaled down. The 'mp4v' codec is tried
    first, then 'XVID'. Nothing is written when the directory holds no
    readable image or when no writer can be opened.

    Args:
        image_dir: directory containing .png/.jpg/.jpeg frames.
        output_video_path: path of the video file to create.
        fps: frame rate of the video.
    """
    image_files = sorted(f for f in os.listdir(image_dir) if f.endswith(('.png', '.jpg', '.jpeg')))
    if not image_files:
        return

    first_img = cv2.imread(osp.join(image_dir, image_files[0]))
    if first_img is None:
        return

    height, width = first_img.shape[:2]
    max_width, max_height = 1920, 1080
    if width > max_width or height > max_height:
        scale = min(max_width / width, max_height / height)
        width, height = int(width * scale), int(height * scale)

    video_writer = None
    for codec in ('mp4v', 'XVID'):
        video_writer = cv2.VideoWriter(
            output_video_path, cv2.VideoWriter_fourcc(*codec), fps, (width, height))
        if video_writer.isOpened():
            break
        video_writer.release()
    else:
        warnings.warn(f'Cannot open a video writer for {output_video_path}')
        return

    try:
        for img_file in tqdm(image_files, desc=f"Creating video: {osp.basename(output_video_path)}"):
            frame = cv2.imread(osp.join(image_dir, img_file))
            if frame is None:
                continue
            if frame.shape[:2] != (height, width):
                frame = cv2.resize(frame, (width, height))
            video_writer.write(frame)
    finally:
        video_writer.release()


def _resolve_prev_bev_shape(session_inputs, bev_h, bev_w, embed_dims):
    """Return the shape of the 'prev_bev' input, filling in dynamic dimensions.

    Dimensions reported as a string, None or a negative number are replaced
    by the matching entry of ``(bev_h * bev_w, 1, embed_dims)`` (or 1 beyond
    the third axis). That default is returned when the model has no
    'prev_bev' input.
    """
    default_shape = (bev_h * bev_w, 1, embed_dims)
    prev_bev_input = next((inp for inp in session_inputs if inp.name == 'prev_bev'), None)
    if prev_bev_input is None:
        return default_shape

    shape = []
    for axis, dim in enumerate(prev_bev_input.shape):
        if dim is None or isinstance(dim, str) or dim < 0:
            dim = default_shape[axis] if axis < 3 else 1
        shape.append(dim)
    return tuple(shape)


def _save_frame(vis_img, path):
    """Write one rendered frame to ``path`` as an 8-bit image."""
    if vis_img is None:
        return
    if not isinstance(vis_img, np.ndarray):
        vis_img = np.array(vis_img)
    if vis_img.dtype != np.uint8:
        # Float images in [0, 1] are scaled to [0, 255] first.
        vis_img = (vis_img * 255).astype(np.uint8) if vis_img.max() <= 1.0 else vis_img.astype(np.uint8)
    if len(vis_img.shape) == 3 and vis_img.shape[0] in (1, 3):
        vis_img = vis_img.transpose(1, 2, 0)  # channel-first -> channel-last
    if vis_img.shape[0] > 0 and vis_img.shape[1] > 0:
        if not cv2.imwrite(path, vis_img):
            warnings.warn(f'Failed to write {path}')


def main():
    """Run inference and visualisation over the selected scenes."""
    args = parse_args()
    config = load_config_from_json(args.config_json)
    os.makedirs(args.output_dir, exist_ok=True)

    ax_session = load_axmodel(args.model)

    transformer_cfg = config['model']['transformer']
    bev_h = transformer_cfg['bev_h']
    bev_w = transformer_cfg['bev_w']
    embed_dims = transformer_cfg['embed_dims']

    # The model signature does not change between frames: query it once.
    session_inputs = ax_session.get_inputs()
    input_names = [inp.name for inp in session_inputs]
    prev_bev_shape = _resolve_prev_bev_shape(session_inputs, bev_h, bev_w, embed_dims)

    img_norm_cfg = config['img_norm']
    class_names = config['dataset']['class_names']
    pc_range = config['model']['bbox_coder']['pc_range']

    scene_index_path = osp.join(args.data_dir, 'scene_index.json')
    with open(scene_index_path, 'r') as f:
        scene_index_data = json.load(f)

    scenes_dict = scene_index_data['scenes']
    scene_names = list(scenes_dict.keys())

    start_scene = max(args.start_scene, 0)
    end_scene = len(scene_names) if args.end_scene is None else min(args.end_scene, len(scene_names))

    for scene_idx in range(start_scene, end_scene):
        scene_name = scene_names[scene_idx]
        sample_indices = scenes_dict[scene_name]['samples']
        num_frames = len(sample_indices)

        print(f"Processing scene {scene_idx+1}/{len(scene_names)}: {scene_name} ({num_frames} frames)")
        if num_frames == 0:
            continue

        scene_dir = osp.join(args.output_dir, scene_name)
        images_dir = osp.join(scene_dir, 'images')
        os.makedirs(images_dir, exist_ok=True)

        # Temporal state is reset at the start of every scene.
        prev_bev = None
        prev_pos = np.zeros(3, dtype=np.float32)
        prev_angle = 0.0

        for local_idx, frame_idx in enumerate(tqdm(sample_indices, desc=f"Scene {scene_name}")):
            img, lidar2img, can_bus, meta = load_data(args.data_dir, scene_name, frame_idx)

            # The model receives the ego motion relative to the previous
            # frame: position in the first three entries, angle in the last.
            curr_can_bus = can_bus[0]
            curr_pos = curr_can_bus[:3].copy()
            curr_angle = curr_can_bus[-1]
            delta_can_bus = curr_can_bus.copy()
            if prev_bev is not None:
                delta_can_bus[:3] -= prev_pos
                delta_can_bus[-1] -= prev_angle
            else:
                delta_can_bus[:3] = 0.0
                delta_can_bus[-1] = 0.0
            prev_pos = curr_pos
            prev_angle = curr_angle

            if prev_bev is None:
                prev_bev_in = np.zeros(prev_bev_shape, dtype=np.float32)
            else:
                prev_bev_in = prev_bev
                if prev_bev_in.shape != prev_bev_shape and len(prev_bev_in.shape) == 3:
                    prev_bev_in = prev_bev_in.reshape(prev_bev_shape)

            available_inputs = {
                'img': img.astype(np.float32),
                'can_bus': delta_can_bus.reshape(1, -1).astype(np.float32),
                'lidar2img': lidar2img.astype(np.float32),
                'prev_bev': prev_bev_in,
            }
            ax_inputs = {name: available_inputs[name] for name in input_names if name in available_inputs}

            # The outputs are expected in this order.
            bev_embed, all_cls_scores, all_bbox_preds = ax_session.run(None, ax_inputs)
            prev_bev = bev_embed

            results = post_process_outputs_np(all_cls_scores, all_bbox_preds, config, args.score_thr)

            vis_img = visualize_results_np(
                img, results[0], lidar2img, img_norm_cfg, class_names,
                args.score_thr, pc_range=pc_range,
            )

            # Write the frame immediately instead of keeping every rendered
            # frame of every scene in memory until the end of the run.
            _save_frame(vis_img, osp.join(images_dir, f'frame_{local_idx:06d}.png'))

        video_path = osp.join(scene_dir, f'{scene_name}_result.mp4')
        create_video_from_images(images_dir, video_path, args.fps)
        print(f"✓ Scene {scene_name}: {num_frames} frames, video: {video_path}")


if __name__ == '__main__':
    main()