|
|
import argparse |
|
|
import glob |
|
|
import os |
|
|
import pickle |
|
|
import pickle as pkl |
|
|
from typing import Dict, List |
|
|
|
|
|
import utils.sensor as box_sensor |
|
|
import utils.transform as box_transform |
|
|
import utils.vis as box_vis |
|
|
import utils.rotation as object_rotation |
|
|
import utils.calibration as calib_utils |
|
|
import numpy as np |
|
|
import pyquaternion |
|
|
import scipy |
|
|
from utils.miscell import find_unique_common_from_lists |
|
|
from utils.data_structure import Box3D |
|
|
from nuscenes.eval.common.utils import Quaternion, quaternion_yaw |
|
|
|
|
|
from utils.nuplan_pointcloud import PointCloud |
|
|
|
|
|
np.set_printoptions(precision=3, suppress=True) |
|
|
|
|
|
FPS = 2 |
|
|
FRAME_INTERVAL = 1 |
|
|
FPS_KEYFRAME = FPS / FRAME_INTERVAL |
|
|
|
|
|
global_track_id = 1 |
|
|
mapping_tracktoken2globalid = dict() |
|
|
|
|
|
|
|
|
def parse_ego_sensor_calib(anno: Dict, data_split: str, vis: bool = False) -> Dict: |
|
|
"""Parse calibration between world, ego, lidar and camera""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sdc_center_ego = np.array([[0, 0, 0, 1.0]]).transpose() |
|
|
sdc_center_lidar = np.linalg.inv(anno["lidar2ego"]) @ sdc_center_ego |
|
|
yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
np.linalg.inv(anno["lidar2ego"])[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
l, w, h = 4.52, 2.1, 1.56 |
|
|
sdc_vel_ego = anno["can_bus"][10:13] |
|
|
sdc_vel_global = anno["ego2global"][:3, :3] @ sdc_vel_ego |
|
|
sdc_vel_lidar = np.linalg.inv(anno["lidar2ego"][:3, :3]) @ sdc_vel_ego |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gt_sdc_bbox_lidar = np.array( |
|
|
[ |
|
|
sdc_center_lidar[0, 0], |
|
|
sdc_center_lidar[1, 0], |
|
|
sdc_center_lidar[2, 0], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_lidar, |
|
|
sdc_vel_lidar[0], |
|
|
sdc_vel_lidar[1], |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cams = anno["cams"] |
|
|
for cam_type, cam_dict in cams.items(): |
|
|
sensor2lidar = np.identity(4) |
|
|
sensor2lidar[:3, :3] = cam_dict["sensor2lidar_rotation"] |
|
|
sensor2lidar[:3, 3] = cam_dict["sensor2lidar_translation"] |
|
|
sensor2ego: np.ndarray = anno["lidar2ego"] @ sensor2lidar |
|
|
|
|
|
|
|
|
cams[cam_type].update( |
|
|
{ |
|
|
"type": cam_type, |
|
|
"sensor2ego_translation": sensor2ego[:3, 3], |
|
|
"sensor2ego_rotation": object_rotation.convert_mat2qua( |
|
|
sensor2ego[:3, :3] |
|
|
), |
|
|
} |
|
|
) |
|
|
|
|
|
pts_filename = anno["lidar_path"] |
|
|
src_split = data_split.split("_")[0] |
|
|
data_root = f"./data/openscene-v1.1/sensor_blobs/{src_split}" |
|
|
if data_root not in pts_filename: |
|
|
pts_filename = os.path.join(data_root, pts_filename) |
|
|
|
|
|
points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
|
|
points = points[:3].T |
|
|
|
|
|
|
|
|
if vis: |
|
|
log_root = pts_filename.split("/")[5] |
|
|
save_root = ( |
|
|
f"./data/openscene-v1.1/vis/{data_split}/{log_root}/{anno['scene_name']}" |
|
|
) |
|
|
|
|
|
if not os.path.exists(save_root): |
|
|
os.makedirs(save_root) |
|
|
save_path = os.path.join( |
|
|
save_root, f"{anno['frame_idx']:06d}_{anno['token']}.jpg" |
|
|
) |
|
|
box_vis.vis_box_on_lidar( |
|
|
lidar=points, |
|
|
save_path=save_path, |
|
|
left_hand=False, |
|
|
ego2lidar=np.linalg.inv(anno["lidar2ego"]), |
|
|
) |
|
|
|
|
|
command = np.argmax(anno["driving_command"]) |
|
|
|
|
|
anno.update( |
|
|
{ |
|
|
"lidar_pts": points, |
|
|
"sweeps": [], |
|
|
"cams": cams, |
|
|
"sdc_vel_global": sdc_vel_global, |
|
|
|
|
|
"gt_sdc_bbox_lidar": gt_sdc_bbox_lidar, |
|
|
"command": command, |
|
|
} |
|
|
) |
|
|
|
|
|
return anno |
|
|
|
|
|
|
|
|
def parse_bbox(anno: Dict, data_split: str, vis: bool = False) -> Dict: |
|
|
"""Parse each one of the bounding boxes and compute properties""" |
|
|
|
|
|
num_obj: int = anno["anns"]["gt_boxes"].shape[0] |
|
|
anno.update( |
|
|
{ |
|
|
"gt_velocity": anno["anns"]["gt_velocity_3d"][:, :2], |
|
|
"gt_boxes": anno["anns"]["gt_boxes"], |
|
|
"gt_bboxes_global": np.zeros((num_obj, 9), dtype="float32"), |
|
|
"gt_names": anno["anns"]["gt_names"], |
|
|
"gt_inds": np.zeros((num_obj), dtype="int64"), |
|
|
"num_lidar_pts": np.zeros((num_obj,), dtype="float32"), |
|
|
"valid_flag": np.zeros((num_obj,), dtype="bool"), |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
bbox_gt_in_ego_list: List[np.ndarray] = list() |
|
|
valid_id_list: List[np.ndarray] = list() |
|
|
for bbox_index in range(num_obj): |
|
|
|
|
|
box_7dof_lidar = anno["anns"]["gt_boxes"][bbox_index] |
|
|
instance_token = anno["anns"]["instance_tokens"][bbox_index] |
|
|
track_token = anno["anns"]["track_tokens"][bbox_index] |
|
|
|
|
|
|
|
|
global mapping_tracktoken2globalid |
|
|
if track_token not in mapping_tracktoken2globalid: |
|
|
global global_track_id |
|
|
mapping_tracktoken2globalid[track_token] = global_track_id |
|
|
anno["gt_inds"][bbox_index] = global_track_id |
|
|
|
|
|
|
|
|
global_track_id += 1 |
|
|
else: |
|
|
anno["gt_inds"][bbox_index] = mapping_tracktoken2globalid[track_token] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bbox2lidar = calib_utils.create_transform( |
|
|
box_7dof_lidar[:3], np.array([0, box_7dof_lidar[-1], 0]) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bbox_center_lidar = np.array([[0, 0, 0, 1.0]]).transpose() |
|
|
bbox_center_lidar[:3, 0] = box_7dof_lidar[:3] |
|
|
l, w, h = box_7dof_lidar[3:6] |
|
|
bbox_center_world = anno["lidar2global"] @ bbox_center_lidar |
|
|
bbox2global = anno["lidar2global"] @ bbox2lidar |
|
|
yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
|
|
bbox2global[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
velocity_lidar = np.array([[0, 0, 1.0]]).transpose() |
|
|
velocity_lidar[:2, 0] = anno["gt_velocity"][bbox_index] |
|
|
velocity_world = anno["lidar2global"][:3, :3] @ velocity_lidar |
|
|
|
|
|
|
|
|
box_7dof_global = np.array( |
|
|
[ |
|
|
float(bbox_center_world[0]), |
|
|
float(bbox_center_world[1]), |
|
|
float(bbox_center_world[2]), |
|
|
] |
|
|
+ [l, w, h] |
|
|
+ [float(yaw_global)] |
|
|
) |
|
|
anno["gt_bboxes_global"][bbox_index, :7] = box_7dof_global |
|
|
anno["gt_bboxes_global"][bbox_index, 7:] = velocity_world[:2, 0] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(box_7dof_lidar) |
|
|
box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
|
|
pc_sub: np.ndarray = box_sensor.extract_pc_in_box3d( |
|
|
anno["lidar_pts"], box_in_lidar |
|
|
)[ |
|
|
0 |
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
anno["num_lidar_pts"][bbox_index] = pc_sub.shape[0] |
|
|
|
|
|
|
|
|
if pc_sub.shape[0] > 0: |
|
|
anno["valid_flag"][bbox_index] = True |
|
|
|
|
|
|
|
|
if vis: |
|
|
box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
|
|
box_in_lidar, lidar2ego=anno["lidar2ego"], |
|
|
) |
|
|
bbox_gt_in_ego_list.append(box_in_ego) |
|
|
valid_id_list.append(anno["gt_inds"][bbox_index]) |
|
|
|
|
|
if vis: |
|
|
pts_filename = anno["lidar_path"] |
|
|
log_root = pts_filename.split("/")[0] |
|
|
save_root = f"./data/openscene-v1.1/vis_bbox/{data_split}/{log_root}/{anno['scene_name']}" |
|
|
|
|
|
if not os.path.exists(save_root): |
|
|
os.makedirs(save_root) |
|
|
save_path = os.path.join( |
|
|
save_root, f"{anno['frame_idx']:06d}_{anno['token']}.jpg" |
|
|
) |
|
|
|
|
|
|
|
|
box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(anno["gt_sdc_bbox_lidar"][:7]) |
|
|
sdc_box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
|
|
sdc_box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
|
|
sdc_box_in_lidar, lidar2ego=anno["lidar2ego"], |
|
|
) |
|
|
bbox_gt_in_ego_list.append(sdc_box_in_ego) |
|
|
valid_id_list.append(-1) |
|
|
|
|
|
|
|
|
box_vis.vis_box_on_lidar( |
|
|
anno["lidar_pts"], |
|
|
save_path, |
|
|
bbox_gt_in_ego_list=bbox_gt_in_ego_list, |
|
|
bev=True, |
|
|
id_list=valid_id_list, |
|
|
left_hand=False, |
|
|
ego2lidar=np.linalg.inv(anno["lidar2ego"]), |
|
|
) |
|
|
|
|
|
return anno |
|
|
|
|
|
|
|
|
def parse_data(anno: Dict, data_split: str, vis: bool = False) -> Dict: |
|
|
"""Convert carla data into the collection of path needed by mmdetection3D dataloader""" |
|
|
|
|
|
anno: Dict = parse_ego_sensor_calib(anno, data_split, vis=vis) |
|
|
anno: Dict = parse_bbox(anno, data_split, vis=vis) |
|
|
|
|
|
return anno |
|
|
|
|
|
|
|
|
def parse_sequence( |
|
|
seq: str, |
|
|
annos: List[Dict], |
|
|
seq_index: List[int], |
|
|
data_split: str, |
|
|
pre_sec: float = 1.5, |
|
|
fut_sec: float = 4, |
|
|
vis: bool = False, |
|
|
) -> List[Dict]: |
|
|
"""Add past/future trajectory into data at each frame to allow prediction""" |
|
|
|
|
|
anno_seq: List[Dict] = [annos[index] for index in seq_index] |
|
|
print(f"processing: {seq}, number of frames {len(anno_seq)}") |
|
|
|
|
|
|
|
|
|
|
|
def sort_data(data_dict: Dict): |
|
|
return data_dict["frame_idx"] |
|
|
anno_seq.sort(key=sort_data) |
|
|
|
|
|
|
|
|
min_frame_ID: int = anno_seq[0]["frame_idx"] |
|
|
max_frame_ID: int = anno_seq[-1]["frame_idx"] |
|
|
if len(anno_seq) < max_frame_ID - min_frame_ID + 1: |
|
|
for index in seq_index: |
|
|
annos[index] = None |
|
|
|
|
|
return annos |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for index in range(len(seq_index)): |
|
|
anno: Dict = anno_seq[index] |
|
|
frame_ID: int = anno["frame_idx"] |
|
|
|
|
|
|
|
|
|
|
|
scene_token: str = anno["scene_token"] |
|
|
assert scene_token == seq, "error, not the same sequence" |
|
|
|
|
|
|
|
|
|
|
|
gt_bboxes_lidar: np.ndarray = anno["anns"]["gt_boxes"] |
|
|
num_obj: int = gt_bboxes_lidar.shape[0] |
|
|
|
|
|
|
|
|
obj_ids: List[int] = anno["gt_inds"].tolist() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
l, w, h = anno["gt_sdc_bbox_lidar"][3:6] |
|
|
lidar2global: np.ndarray = anno["lidar2global"] |
|
|
|
|
|
|
|
|
pre_frames: int = int(pre_sec * FPS_KEYFRAME) |
|
|
fut_frames: int = int(fut_sec * FPS_KEYFRAME) |
|
|
|
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar: np.ndarray = np.zeros((num_obj, fut_frames, 9)) |
|
|
gt_pre_bbox_lidar: np.ndarray = np.zeros((num_obj, pre_frames, 9)) |
|
|
|
|
|
|
|
|
gt_fut_bbox_mask: np.ndarray = np.zeros((num_obj, fut_frames, 1)) |
|
|
gt_pre_bbox_mask: np.ndarray = np.zeros((num_obj, pre_frames, 1)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar_all: List[np.ndarray] = [ |
|
|
[np.empty([0])] for _ in range(fut_frames) |
|
|
] |
|
|
gt_pre_bbox_lidar_all: List[np.ndarray] = [ |
|
|
[np.empty([0])] for _ in range(pre_frames) |
|
|
] |
|
|
|
|
|
|
|
|
gt_pre_bbox_sdc_lidar: np.ndarray = np.zeros((1, pre_frames, 9)) |
|
|
gt_fut_bbox_sdc_lidar: np.ndarray = np.zeros((1, fut_frames, 9)) |
|
|
gt_pre_bbox_sdc_global: np.ndarray = np.zeros((1, pre_frames, 9)) |
|
|
gt_fut_bbox_sdc_global: np.ndarray = np.zeros((1, fut_frames, 9)) |
|
|
gt_pre_bbox_sdc_mask: np.ndarray = np.zeros((1, pre_frames, 1)) |
|
|
gt_fut_bbox_sdc_mask: np.ndarray = np.zeros((1, fut_frames, 1)) |
|
|
gt_pre_command_sdc: np.ndarray = np.zeros((1, pre_frames, 1)) |
|
|
gt_fut_command_sdc: np.ndarray = np.zeros((1, fut_frames, 1)) |
|
|
|
|
|
|
|
|
|
|
|
for pre_frame_index in range(1, pre_frames + 1): |
|
|
pre_frame_ID = int(frame_ID - pre_frame_index * FRAME_INTERVAL) |
|
|
pre_frame_index_in_seq: int = index - pre_frame_index |
|
|
|
|
|
|
|
|
if pre_frame_ID < min_frame_ID: |
|
|
continue |
|
|
|
|
|
|
|
|
anno_pre_tmp: Dict = anno_seq[pre_frame_index_in_seq] |
|
|
gt_pre_bbox_global_tmp: np.ndarray = anno_pre_tmp[ |
|
|
"gt_bboxes_global" |
|
|
] |
|
|
num_obj_pre: int = gt_pre_bbox_global_tmp.shape[0] |
|
|
gt_pre_bbox_lidar_tmp = np.zeros((num_obj_pre, 9)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
loc_global = np.concatenate( |
|
|
(gt_pre_bbox_global_tmp[:, :3], np.ones((num_obj_pre, 1))), axis=1 |
|
|
).T |
|
|
loc_lidar = np.linalg.inv(lidar2global) @ loc_global |
|
|
gt_pre_bbox_lidar_tmp[:, :3] = loc_lidar[:3, :].transpose() |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar_tmp[:, 3:6] = gt_pre_bbox_global_tmp[:, 3:6] |
|
|
|
|
|
|
|
|
gt_pre_rot_global_rad = np.concatenate( |
|
|
( |
|
|
np.zeros((num_obj_pre, 1)), |
|
|
gt_pre_bbox_global_tmp[:, [6]], |
|
|
np.zeros((num_obj_pre, 1)), |
|
|
), |
|
|
axis=1, |
|
|
) |
|
|
gt_pre_rot_global_deg = gt_pre_rot_global_rad / np.pi * 180 |
|
|
for obj_index in range(num_obj_pre): |
|
|
gt_pre_trans_global = calib_utils.create_transform( |
|
|
gt_pre_bbox_global_tmp[obj_index, :3], |
|
|
gt_pre_rot_global_deg[obj_index], |
|
|
) |
|
|
gt_pre_trans_lidar = ( |
|
|
np.linalg.inv(anno["lidar2global"]) @ gt_pre_trans_global |
|
|
) |
|
|
gt_pre_yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
gt_pre_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_pre_bbox_lidar_tmp[obj_index, 6] = gt_pre_yaw_lidar |
|
|
|
|
|
|
|
|
gt_pre_vel_global = np.concatenate( |
|
|
(gt_pre_bbox_global_tmp[:, 7:], np.zeros((num_obj_pre, 1)),), |
|
|
axis=1, |
|
|
) |
|
|
gt_pre_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ gt_pre_vel_global.T |
|
|
).T |
|
|
gt_pre_bbox_lidar_tmp[:, 7:] = gt_pre_vel_lidar[:, :2] |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar_all[pre_frame_index - 1] = gt_pre_bbox_lidar_tmp |
|
|
|
|
|
|
|
|
|
|
|
obj_ids_pre: List[int] = anno_pre_tmp["gt_inds"].tolist() |
|
|
( |
|
|
obj_ids_common, |
|
|
index_cur, |
|
|
index_past, |
|
|
) = find_unique_common_from_lists(obj_ids, obj_ids_pre) |
|
|
|
|
|
|
|
|
if len(index_cur) > 0: |
|
|
gt_pre_bbox_mask[np.array(index_cur), pre_frame_index - 1] = 1 |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar[ |
|
|
np.array(index_cur), pre_frame_index - 1, : |
|
|
] = gt_pre_bbox_lidar_tmp[np.array(index_past), :] |
|
|
|
|
|
|
|
|
sdc_loc_global, _ = calib_utils.transform_matrix_to_vector( |
|
|
anno_pre_tmp["ego2global"] |
|
|
) |
|
|
sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) |
|
|
sdc_loc_lidar = np.linalg.inv(lidar2global) @ sdc_loc_global |
|
|
sdc_vel_global: np.ndarray = anno_pre_tmp["sdc_vel_global"] |
|
|
sdc_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ sdc_vel_global |
|
|
) |
|
|
sdc_trans_lidar = ( |
|
|
np.linalg.inv(lidar2global) @ anno_pre_tmp["ego2global"] |
|
|
) |
|
|
yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
sdc_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
|
|
anno_pre_tmp["ego2global"][:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_pre_bbox_sdc_mask[0, pre_frame_index - 1] = 1 |
|
|
|
|
|
|
|
|
gt_sdc_bbox_lidar = np.array( |
|
|
[ |
|
|
sdc_loc_lidar[0], |
|
|
sdc_loc_lidar[1], |
|
|
sdc_loc_lidar[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_lidar, |
|
|
sdc_vel_lidar[0], |
|
|
sdc_vel_lidar[1], |
|
|
] |
|
|
) |
|
|
gt_pre_bbox_sdc_lidar[0, pre_frame_index - 1] = gt_sdc_bbox_lidar |
|
|
gt_sdc_bbox_global = np.array( |
|
|
[ |
|
|
sdc_loc_global[0], |
|
|
sdc_loc_global[1], |
|
|
sdc_loc_global[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_global, |
|
|
sdc_vel_global[0], |
|
|
sdc_vel_global[1], |
|
|
] |
|
|
) |
|
|
gt_pre_bbox_sdc_global[0, pre_frame_index - 1] = gt_sdc_bbox_global |
|
|
|
|
|
|
|
|
gt_pre_command_sdc[0, pre_frame_index - 1]: int = anno_pre_tmp["command"] |
|
|
|
|
|
|
|
|
|
|
|
for fut_frame_index in range(1, fut_frames + 1): |
|
|
fut_frame_ID = int(frame_ID + fut_frame_index * FRAME_INTERVAL) |
|
|
fut_frame_index_in_seq: int = index + fut_frame_index |
|
|
|
|
|
|
|
|
if fut_frame_ID > max_frame_ID: |
|
|
continue |
|
|
|
|
|
|
|
|
try: |
|
|
anno_fut_tmp: Dict = anno_seq[fut_frame_index_in_seq] |
|
|
except: |
|
|
print('fut_frame_index_in_seq', fut_frame_index_in_seq) |
|
|
anno_fut_tmp: Dict = anno_seq[fut_frame_index_in_seq] |
|
|
|
|
|
gt_fut_bbox_global_tmp: np.ndarray = anno_fut_tmp[ |
|
|
"gt_bboxes_global" |
|
|
] |
|
|
num_obj_fut: int = gt_fut_bbox_global_tmp.shape[0] |
|
|
gt_fut_bbox_lidar_tmp = np.zeros((num_obj_fut, 9)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
loc_global = np.concatenate( |
|
|
(gt_fut_bbox_global_tmp[:, :3], np.ones((num_obj_fut, 1))), axis=1, |
|
|
).T |
|
|
loc_lidar = np.linalg.inv(lidar2global) @ loc_global |
|
|
gt_fut_bbox_lidar_tmp[:, :3] = loc_lidar[:3, :].transpose() |
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar_tmp[:, 3:6] = gt_fut_bbox_global_tmp[:, 3:6] |
|
|
|
|
|
|
|
|
gt_fut_rot_global_rad = np.concatenate( |
|
|
( |
|
|
np.zeros((num_obj_fut, 1)), |
|
|
gt_fut_bbox_global_tmp[:, [6]], |
|
|
np.zeros((num_obj_fut, 1)), |
|
|
), |
|
|
axis=1, |
|
|
) |
|
|
gt_fut_rot_global_deg = gt_fut_rot_global_rad / np.pi * 180 |
|
|
for obj_index in range(num_obj_fut): |
|
|
gt_trans_global_future = calib_utils.create_transform( |
|
|
gt_fut_bbox_global_tmp[obj_index, :3], |
|
|
gt_fut_rot_global_deg[obj_index], |
|
|
) |
|
|
gt_fut_trans_lidar = ( |
|
|
np.linalg.inv(anno["lidar2global"]) @ gt_trans_global_future |
|
|
) |
|
|
gt_fut_yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
gt_fut_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_fut_bbox_lidar_tmp[obj_index, 6] = gt_fut_yaw_lidar |
|
|
|
|
|
|
|
|
gt_fut_vel_global = np.concatenate( |
|
|
(gt_fut_bbox_global_tmp[:, 7:], np.zeros((num_obj_fut, 1)),), |
|
|
axis=1, |
|
|
) |
|
|
gt_fut_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ gt_fut_vel_global.T |
|
|
).T |
|
|
gt_fut_bbox_lidar_tmp[:, 7:] = gt_fut_vel_lidar[:, :2] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar_all[fut_frame_index - 1] = gt_fut_bbox_lidar_tmp |
|
|
|
|
|
|
|
|
|
|
|
obj_ids_fut: List[int] = anno_fut_tmp["gt_inds"].tolist() |
|
|
( |
|
|
obj_ids_common, |
|
|
index_cur, |
|
|
index_fut, |
|
|
) = find_unique_common_from_lists(obj_ids, obj_ids_fut) |
|
|
|
|
|
|
|
|
if len(index_cur) > 0: |
|
|
gt_fut_bbox_mask[np.array(index_cur), fut_frame_index - 1] = 1 |
|
|
|
|
|
gt_fut_bbox_lidar[ |
|
|
np.array(index_cur), fut_frame_index - 1, : |
|
|
] = gt_fut_bbox_lidar_tmp[np.array(index_fut), :] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sdc_loc_global, _ = calib_utils.transform_matrix_to_vector( |
|
|
anno_fut_tmp["ego2global"] |
|
|
) |
|
|
sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) |
|
|
sdc_loc_lidar = np.linalg.inv(lidar2global) @ sdc_loc_global |
|
|
sdc_vel_global: np.ndarray = anno_fut_tmp["sdc_vel_global"] |
|
|
sdc_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ sdc_vel_global |
|
|
) |
|
|
sdc_trans_lidar = ( |
|
|
np.linalg.inv(lidar2global) @ anno_fut_tmp["ego2global"] |
|
|
) |
|
|
yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
sdc_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
|
|
anno_fut_tmp["ego2global"][:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_fut_bbox_sdc_mask[0, fut_frame_index - 1] = 1 |
|
|
|
|
|
|
|
|
gt_sdc_bbox_lidar = np.array( |
|
|
[ |
|
|
sdc_loc_lidar[0], |
|
|
sdc_loc_lidar[1], |
|
|
sdc_loc_lidar[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_lidar, |
|
|
sdc_vel_lidar[0], |
|
|
sdc_vel_lidar[1], |
|
|
] |
|
|
) |
|
|
gt_fut_bbox_sdc_lidar[0, fut_frame_index - 1] = gt_sdc_bbox_lidar |
|
|
gt_sdc_bbox_global = np.array( |
|
|
[ |
|
|
sdc_loc_global[0], |
|
|
sdc_loc_global[1], |
|
|
sdc_loc_global[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_global, |
|
|
sdc_vel_global[0], |
|
|
sdc_vel_global[1], |
|
|
] |
|
|
) |
|
|
gt_fut_bbox_sdc_global[0, fut_frame_index - 1] = gt_sdc_bbox_global |
|
|
|
|
|
|
|
|
gt_fut_command_sdc[0, fut_frame_index - 1]: int = anno_fut_tmp["command"] |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar = np.flip(gt_pre_bbox_lidar, axis=1) |
|
|
gt_pre_bbox_mask = np.flip(gt_pre_bbox_mask, axis=1) |
|
|
gt_pre_bbox_sdc_lidar = np.flip(gt_pre_bbox_sdc_lidar, axis=1) |
|
|
gt_pre_bbox_sdc_global = np.flip(gt_pre_bbox_sdc_global, axis=1) |
|
|
gt_pre_bbox_sdc_mask = np.flip(gt_pre_bbox_sdc_mask, axis=1) |
|
|
gt_pre_command_sdc = np.flip(gt_pre_command_sdc, axis=1) |
|
|
|
|
|
|
|
|
|
|
|
anno["gt_pre_bbox_lidar"] = gt_pre_bbox_lidar |
|
|
anno["gt_fut_bbox_lidar"] = gt_fut_bbox_lidar |
|
|
anno["gt_pre_bbox_mask"] = gt_pre_bbox_mask |
|
|
anno["gt_fut_bbox_mask"] = gt_fut_bbox_mask |
|
|
anno["gt_pre_bbox_sdc_lidar"] = gt_pre_bbox_sdc_lidar |
|
|
anno["gt_fut_bbox_sdc_lidar"] = gt_fut_bbox_sdc_lidar |
|
|
anno["gt_pre_bbox_sdc_global"] = gt_pre_bbox_sdc_global |
|
|
anno["gt_fut_bbox_sdc_global"] = gt_fut_bbox_sdc_global |
|
|
anno["gt_pre_bbox_sdc_mask"] = gt_pre_bbox_sdc_mask |
|
|
anno["gt_fut_bbox_sdc_mask"] = gt_fut_bbox_sdc_mask |
|
|
anno["gt_pre_command_sdc"] = gt_pre_command_sdc |
|
|
anno["gt_fut_command_sdc"] = gt_fut_command_sdc |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if vis: |
|
|
|
|
|
|
|
|
pts_filename = anno["lidar_path"] |
|
|
log_root = pts_filename.split("/")[0] |
|
|
save_root = f"./data/openscene-v1.1/vis_seq/{data_split}/{log_root}/{anno['scene_name']}" |
|
|
if not os.path.exists(save_root): |
|
|
os.makedirs(save_root) |
|
|
save_path = os.path.join( |
|
|
save_root, f"{anno['frame_idx']:06d}_{anno['token']}.jpg" |
|
|
) |
|
|
|
|
|
|
|
|
bbox_gt_in_ego_list: List[np.ndarray] = list() |
|
|
for bbox_index in range(num_obj): |
|
|
box_7dof_lidar: np.ndarray = anno["gt_boxes"][bbox_index, :] |
|
|
box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(box_7dof_lidar) |
|
|
box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
|
|
box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
|
|
box_in_lidar, lidar2ego=anno["lidar2ego"], |
|
|
) |
|
|
bbox_gt_in_ego_list.append(box_in_ego) |
|
|
|
|
|
|
|
|
box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(anno["gt_sdc_bbox_lidar"][:7]) |
|
|
sdc_box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
|
|
sdc_box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
|
|
sdc_box_in_lidar, lidar2ego=anno["lidar2ego"], |
|
|
) |
|
|
bbox_gt_in_ego_list.append(sdc_box_in_ego) |
|
|
|
|
|
|
|
|
bbox_fut_lidar_all = np.concatenate( |
|
|
(anno["gt_fut_bbox_lidar"], anno["gt_fut_bbox_sdc_lidar"]), axis=0 |
|
|
) |
|
|
bbox_fut_mask_all = np.concatenate( |
|
|
(anno["gt_fut_bbox_mask"], anno["gt_fut_bbox_sdc_mask"]), axis=0 |
|
|
) |
|
|
bbox_pre_lidar_all = np.concatenate( |
|
|
(anno["gt_pre_bbox_lidar"], anno["gt_pre_bbox_sdc_lidar"]), axis=0 |
|
|
) |
|
|
bbox_pre_mask_all = np.concatenate( |
|
|
(anno["gt_pre_bbox_mask"], anno["gt_pre_bbox_sdc_mask"]), axis=0 |
|
|
) |
|
|
|
|
|
|
|
|
box_vis.vis_box_on_lidar( |
|
|
anno["lidar_pts"], |
|
|
save_path, |
|
|
bbox_gt_in_ego_list=bbox_gt_in_ego_list, |
|
|
bev=True, |
|
|
id_list=obj_ids + [-1], |
|
|
fut_traj=bbox_fut_lidar_all, |
|
|
fut_traj_mask=bbox_fut_mask_all, |
|
|
pre_traj=bbox_pre_lidar_all, |
|
|
pre_traj_mask=bbox_pre_mask_all, |
|
|
left_hand=False, |
|
|
ego2lidar=np.linalg.inv(anno["lidar2ego"]), |
|
|
) |
|
|
|
|
|
|
|
|
del anno["lidar_pts"] |
|
|
|
|
|
return annos |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
parser = argparse.ArgumentParser() |
|
|
parser.add_argument( |
|
|
"--data_root", |
|
|
type=str, |
|
|
default="/mnt/hdd2/datasets/navsim/openscene-v1.1", |
|
|
help="root directory of raw carla data", |
|
|
) |
|
|
parser.add_argument( |
|
|
"--output_dir", |
|
|
type=str, |
|
|
default="/mnt/hdd2/datasets/navsim/openscene-v1.1/infos", |
|
|
help="path of output file", |
|
|
) |
|
|
parser.add_argument( |
|
|
"--split", |
|
|
type=str, |
|
|
default="mini_val", |
|
|
help="trainval_train/trainval_val/mini_train/mini_val", |
|
|
) |
|
|
parser.add_argument( |
|
|
"--vis", |
|
|
action='store_true', |
|
|
help="turn on the visualization", |
|
|
) |
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
data_src = f"{args.data_root}/infos/openscene_{args.split}.pkl" |
|
|
with open(data_src, "rb") as f: |
|
|
data_src = pickle.load(f) |
|
|
|
|
|
seq_index: List[int] = [] |
|
|
scene_token_pre = None |
|
|
|
|
|
for frame_count_global, frame_data in enumerate(data_src): |
|
|
|
|
|
log_name = frame_data["log_name"] |
|
|
log_token = frame_data["log_token"] |
|
|
scene_name = frame_data["scene_name"] |
|
|
scene_token = frame_data["scene_token"] |
|
|
token = frame_data["token"] |
|
|
frame_idx = frame_data["frame_idx"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if scene_token != scene_token_pre and scene_token_pre is not None: |
|
|
|
|
|
|
|
|
data_src: List[Dict] = parse_sequence( |
|
|
scene_token_pre, data_src, seq_index, args.split, vis=args.vis, |
|
|
) |
|
|
seq_index: List[int] = [] |
|
|
|
|
|
|
|
|
frame_data: Dict = parse_data(frame_data, args.split, vis=args.vis) |
|
|
data_src[frame_count_global] = frame_data |
|
|
|
|
|
|
|
|
seq_index.append(frame_count_global) |
|
|
scene_token_pre = scene_token |
|
|
|
|
|
if frame_count_global % 10 == 0: |
|
|
print("finished: ", frame_count_global) |
|
|
|
|
|
|
|
|
data_src: List[Dict] = parse_sequence( |
|
|
scene_token_pre, data_src, seq_index, args.split |
|
|
) |
|
|
print("total length: ", len(data_src)) |
|
|
|
|
|
|
|
|
save_data = [] |
|
|
for data_dict in data_src: |
|
|
if data_dict is not None: |
|
|
save_data.append(data_dict) |
|
|
print("total length after cleaning: ", len(save_data)) |
|
|
|
|
|
|
|
|
data_to_save = { |
|
|
"infos": save_data, |
|
|
"mapping_tracktoken2globalid": mapping_tracktoken2globalid, |
|
|
} |
|
|
output_path = os.path.join(args.output_dir, f"nuplan_{args.split}.pkl") |
|
|
with open(output_path, "wb") as f: |
|
|
pkl.dump(data_to_save, f) |
|
|
|