import numpy as np from nuscenes.prediction import ( PredictHelper, convert_local_coords_to_global, convert_global_coords_to_local, ) from mmdet3d.core.bbox import Box3DMode, Coord3DMode, LiDARInstance3DBoxes from nuscenes.eval.common.utils import quaternion_yaw, Quaternion from mmcv.parallel import DataContainer as DC from mmdet.datasets.pipelines import to_tensor class NuScenesTraj(object): def __init__( self, nusc, predict_steps, planning_steps, past_steps, fut_steps, with_velocity, CLASSES, box_mode_3d, use_nonlinear_optimizer=False, ): super().__init__() self.nusc = nusc self.prepare_sdc_vel_info() self.predict_steps = predict_steps self.planning_steps = planning_steps self.past_steps = past_steps self.fut_steps = fut_steps self.with_velocity = with_velocity self.CLASSES = CLASSES self.box_mode_3d = box_mode_3d self.predict_helper = PredictHelper(self.nusc) self.use_nonlinear_optimizer = use_nonlinear_optimizer def get_traj_label(self, sample_token, ann_tokens): sd_rec = self.nusc.get("sample", sample_token) fut_traj_all = [] fut_traj_valid_mask_all = [] past_traj_all = [] past_traj_valid_mask_all = [] _, boxes, _ = self.nusc.get_sample_data( sd_rec["data"]["LIDAR_TOP"], selected_anntokens=ann_tokens ) for i, ann_token in enumerate(ann_tokens): box = boxes[i] instance_token = self.nusc.get("sample_annotation", ann_token)[ "instance_token" ] fut_traj_local = self.predict_helper.get_future_for_agent( instance_token, sample_token, seconds=6, in_agent_frame=True ) past_traj_local = self.predict_helper.get_past_for_agent( instance_token, sample_token, seconds=2, in_agent_frame=True ) fut_traj = np.zeros((self.predict_steps, 2)) fut_traj_valid_mask = np.zeros((self.predict_steps, 2)) past_traj = np.zeros((self.past_steps + self.fut_steps, 2)) past_traj_valid_mask = np.zeros((self.past_steps + self.fut_steps, 2)) if fut_traj_local.shape[0] > 0: if self.use_nonlinear_optimizer: trans = box.center else: trans = np.array([0, 0, 0]) rot = Quaternion(matrix=box.rotation_matrix) fut_traj_scence_centric = convert_local_coords_to_global( fut_traj_local, trans, rot ) fut_traj[ : fut_traj_scence_centric.shape[0], : ] = fut_traj_scence_centric fut_traj_valid_mask[: fut_traj_scence_centric.shape[0], :] = 1 if past_traj_local.shape[0] > 0: # TODO: check if this is bug????? or maybe past_traj is not used at all # trans = np.array([0, 0, 0]) if self.use_nonlinear_optimizer: trans = box.center else: trans = np.array([0, 0, 0]) rot = Quaternion(matrix=box.rotation_matrix) past_traj_scence_centric = convert_local_coords_to_global( past_traj_local, trans, rot ) past_traj[ : past_traj_scence_centric.shape[0], : ] = past_traj_scence_centric past_traj_valid_mask[: past_traj_scence_centric.shape[0], :] = 1 if fut_traj_local.shape[0] > 0: fut_steps = min(self.fut_steps, fut_traj_scence_centric.shape[0]) past_traj[ self.past_steps : self.past_steps + fut_steps, : ] = fut_traj_scence_centric[:fut_steps] past_traj_valid_mask[ self.past_steps : self.past_steps + fut_steps, : ] = 1 fut_traj_all.append(fut_traj) fut_traj_valid_mask_all.append(fut_traj_valid_mask) past_traj_all.append(past_traj) past_traj_valid_mask_all.append(past_traj_valid_mask) if len(ann_tokens) > 0: fut_traj_all = np.stack(fut_traj_all, axis=0) fut_traj_valid_mask_all = np.stack(fut_traj_valid_mask_all, axis=0) past_traj_all = np.stack(past_traj_all, axis=0) past_traj_valid_mask_all = np.stack(past_traj_valid_mask_all, axis=0) else: fut_traj_all = np.zeros((0, self.predict_steps, 2)) fut_traj_valid_mask_all = np.zeros((0, self.predict_steps, 2)) past_traj_all = np.zeros((0, self.predict_steps, 2)) past_traj_valid_mask_all = np.zeros((0, self.predict_steps, 2)) return ( fut_traj_all, fut_traj_valid_mask_all, past_traj_all, past_traj_valid_mask_all, ) def get_vel_transform_mats(self, sample): sd_rec = self.nusc.get("sample_data", sample["data"]["LIDAR_TOP"]) cs_record = self.nusc.get( "calibrated_sensor", sd_rec["calibrated_sensor_token"] ) pose_record = self.nusc.get("ego_pose", sd_rec["ego_pose_token"]) l2e_r = cs_record["rotation"] l2e_t = cs_record["translation"] e2g_r = pose_record["rotation"] e2g_t = pose_record["translation"] l2e_r_mat = Quaternion(l2e_r).rotation_matrix e2g_r_mat = Quaternion(e2g_r).rotation_matrix return l2e_r_mat, e2g_r_mat def get_vel_and_time(self, sample): lidar_token = sample["data"]["LIDAR_TOP"] lidar_top = self.nusc.get("sample_data", lidar_token) pose = self.nusc.get("ego_pose", lidar_top["ego_pose_token"]) xyz = pose["translation"] timestamp = sample["timestamp"] return xyz, timestamp def prepare_sdc_vel_info(self): # generate sdc velocity info for all samples # Note that these velocity values are converted from # global frame to lidar frame # as aligned with bbox gts self.sdc_vel_info = {} for scene in self.nusc.scene: scene_token = scene["token"] # we cannot infer vel for the last sample, therefore we skip it last_sample_token = scene["last_sample_token"] sample_token = scene["first_sample_token"] sample = self.nusc.get("sample", sample_token) xyz, time = self.get_vel_and_time(sample) while sample["token"] != last_sample_token: next_sample_token = sample["next"] next_sample = self.nusc.get("sample", next_sample_token) next_xyz, next_time = self.get_vel_and_time(next_sample) # compute velocity by difference of location dc = np.array(next_xyz) - np.array(xyz) dt = (next_time - time) / 1e6 vel = dc / dt # global frame to lidar frame l2e_r_mat, e2g_r_mat = self.get_vel_transform_mats(sample) vel = vel @ np.linalg.inv(e2g_r_mat).T @ np.linalg.inv(l2e_r_mat).T # print('vel_lidar', vel) # vel_ego = vel_lidar @ l2e_r_mat.T # print('vel_ego', vel_ego) # vel_global = vel_ego @ e2g_r_mat.T # print('vel_global', vel_global) # print('vel in global', vel) # vel = vel @ np.linalg.inv(e2g_r_mat).T # print('vel in ego', vel) # vel = vel @ np.linalg.inv(l2e_r_mat).T # print('vel in lidar', vel) vel = vel[:2] self.sdc_vel_info[sample["token"]] = vel xyz, time = next_xyz, next_time sample = next_sample # set first sample's vel equal to second sample's last_sample = self.nusc.get("sample", last_sample_token) second_last_sample_token = last_sample["prev"] self.sdc_vel_info[last_sample_token] = self.sdc_vel_info[ second_last_sample_token ] def generate_sdc_info(self, sdc_vel, as_lidar_instance3d_box=False, old_canbus=True): # sdc dim from https://forum.nuscenes.org/t/dimensions-of-the-ego-vehicle-used-to-gather-data/550 # original code, wlh with non-adjusted yaw if old_canbus: psudo_sdc_bbox = np.array([0.0, 0.0, 0.0, 1.73, 4.08, 1.56, -np.pi]) # updated code, lwh with adjusted yaw to align with other things # TODO: change this would change the training loss, possibly prior models would not work else: psudo_sdc_bbox = np.array([0.0, 0.0, 0.0, 4.08, 1.73, 1.56, np.pi/2]) # below is wrong, but weird # psudo_sdc_bbox = np.array([0.0, 0.9, 1.86, 4.08, 1.73, 1.56, np.pi/2]) # print('gt_bboxes_3d 1\n', psudo_sdc_bbox) if self.with_velocity: psudo_sdc_bbox = np.concatenate([psudo_sdc_bbox, sdc_vel], axis=-1) gt_bboxes_3d = np.array([psudo_sdc_bbox]).astype(np.float32) # print('gt_bboxes_3d 2\n', gt_bboxes_3d) gt_names_3d = ["car"] gt_labels_3d = [] for cat in gt_names_3d: if cat in self.CLASSES: gt_labels_3d.append(self.CLASSES.index(cat)) else: gt_labels_3d.append(-1) gt_labels_3d = np.array(gt_labels_3d) # the nuscenes box center is [0.5, 0.5, 0.5], we change it to be # the same as KITTI (0.5, 0.5, 0) gt_bboxes_3d = LiDARInstance3DBoxes( gt_bboxes_3d, box_dim=gt_bboxes_3d.shape[-1], origin=(0.5, 0.5, 0.5) ) # print('gt_bboxes_3d 3\n', gt_bboxes_3d) gt_bboxes_3d = gt_bboxes_3d.convert_to(self.box_mode_3d) # print('gt_bboxes_3d 4\n', gt_bboxes_3d) if as_lidar_instance3d_box: # if we do not want the batch the box in to DataContrainer return gt_bboxes_3d gt_labels_3d = DC(to_tensor(gt_labels_3d)) gt_bboxes_3d = DC(gt_bboxes_3d, cpu_only=True) return gt_bboxes_3d, gt_labels_3d def get_sdc_traj_label(self, sample_token): sd_rec = self.nusc.get("sample", sample_token) lidar_top_data_start = self.nusc.get("sample_data", sd_rec["data"]["LIDAR_TOP"]) ego_pose_start = self.nusc.get( "ego_pose", lidar_top_data_start["ego_pose_token"] ) sdc_fut_traj = [] for _ in range(self.predict_steps): next_annotation_token = sd_rec["next"] if next_annotation_token == "": break sd_rec = self.nusc.get("sample", next_annotation_token) lidar_top_data_next = self.nusc.get( "sample_data", sd_rec["data"]["LIDAR_TOP"] ) ego_pose_next = self.nusc.get( "ego_pose", lidar_top_data_next["ego_pose_token"] ) sdc_fut_traj.append( ego_pose_next["translation"][:2] ) # global xy pos of sdc at future step i sdc_fut_traj_all = np.zeros((1, self.predict_steps, 2)) sdc_fut_traj_valid_mask_all = np.zeros((1, self.predict_steps, 2)) n_valid_timestep = len(sdc_fut_traj) if n_valid_timestep > 0: sdc_fut_traj = np.stack(sdc_fut_traj, axis=0) # (t,2) sdc_fut_traj = convert_global_coords_to_local( coordinates=sdc_fut_traj, translation=ego_pose_start["translation"], rotation=ego_pose_start["rotation"], ) sdc_fut_traj_all[:, :n_valid_timestep, :] = sdc_fut_traj sdc_fut_traj_valid_mask_all[:, :n_valid_timestep, :] = 1 return sdc_fut_traj_all, sdc_fut_traj_valid_mask_all def get_l2g_transform(self, sample): sd_rec = self.nusc.get("sample_data", sample["data"]["LIDAR_TOP"]) cs_record = self.nusc.get( "calibrated_sensor", sd_rec["calibrated_sensor_token"] ) pose_record = self.nusc.get("ego_pose", sd_rec["ego_pose_token"]) l2e_r = cs_record["rotation"] l2e_t = np.array(cs_record["translation"]) e2g_r = pose_record["rotation"] e2g_t = np.array(pose_record["translation"]) l2e_r_mat = Quaternion(l2e_r).rotation_matrix e2g_r_mat = Quaternion(e2g_r).rotation_matrix return l2e_r_mat, l2e_t, e2g_r_mat, e2g_t def get_sdc_planning_label(self, sample_token): # get data in the current frame sd_rec = self.nusc.get("sample", sample_token) l2e_r_mat_init, l2e_t_init, e2g_r_mat_init, e2g_t_init = self.get_l2g_transform( sd_rec ) ################### retrieve future # get future ego vehicle data planning = [] planning_world = [] for _ in range(self.planning_steps): next_annotation_token = sd_rec["next"] if next_annotation_token == "": break sd_rec = self.nusc.get("sample", next_annotation_token) ( l2e_r_mat_curr, l2e_t_curr, e2g_r_mat_curr, e2g_t_curr, ) = self.get_l2g_transform( sd_rec ) # (lidar to global at current frame) # bbox of sdc under current lidar frame next_bbox3d = self.generate_sdc_info( self.sdc_vel_info[next_annotation_token], as_lidar_instance3d_box=True ) # print(next_bbox3d.tensor) # to bbox under curr ego frame next_bbox3d.rotate(l2e_r_mat_curr.T) next_bbox3d.translate(l2e_t_curr) # print(next_bbox3d.tensor) # to bbox under world frame next_bbox3d.rotate(e2g_r_mat_curr.T) next_bbox3d.translate(e2g_t_curr) planning_world.append(next_bbox3d.tensor.clone()) # print(next_bbox3d.tensor) # to bbox under initial ego frame, first inverse translate, then inverse rotate next_bbox3d.translate(-e2g_t_init) m1 = np.linalg.inv(e2g_r_mat_init) next_bbox3d.rotate(m1.T) # print(next_bbox3d.tensor) # to bbox under curr ego frame, first inverse translate, then inverse rotate next_bbox3d.translate(-l2e_t_init) m2 = np.linalg.inv(l2e_r_mat_init) next_bbox3d.rotate(m2.T) # print(next_bbox3d.tensor) # zxc planning.append(next_bbox3d) planning_all = np.zeros((1, self.planning_steps, 3)) planning_world_all = np.zeros((1, self.planning_steps, 3)) planning_mask_all = np.zeros((1, self.planning_steps, 2)) n_valid_timestep = len(planning) if n_valid_timestep > 0: planning = [p.tensor.squeeze(0) for p in planning] planning = np.stack(planning, axis=0) # (valid_t, 9) # print(planning) planning = planning[:, [0, 1, 6]] # (x, y, yaw) planning_all[:, :n_valid_timestep, :] = planning # 1 x P_s x 3 planning_mask_all[:, :n_valid_timestep, :] = 1 planning_world = [p.squeeze(0) for p in planning_world] planning_world = np.stack(planning_world, axis=0) # (valid_t, 9) planning_world = planning_world[:, [0, 1, 2]] # (x, y, z) planning_world_all[:, :n_valid_timestep, :] = planning_world # 1 x P_s x 3 mask = planning_mask_all[0].any(axis=1) # (P_s, ) # use x (left/right) to compute the command, i.e., planning_all[0, mask][-1][-1] # none of the timestamp is valid, i.e., end of seq, no future frames if mask.sum() == 0: command = 2 #'FORWARD' # check left/right based on the final future frames of x elif planning_all[0, mask][-1][0] >= 2: command = 0 #'RIGHT' elif planning_all[0, mask][-1][0] <= -2: command = 1 #'LEFT' else: command = 2 #'FORWARD' ################### retrieve past # reset data in the current frame sd_rec = self.nusc.get("sample", sample_token) l2e_r_mat_init, l2e_t_init, e2g_r_mat_init, e2g_t_init = self.get_l2g_transform( sd_rec ) # get past ego vehicle data, collect one more frame to get the diff planning_past = [] for _ in range(self.past_steps): prev_annotation_token = sd_rec["prev"] if prev_annotation_token == "": break sd_rec = self.nusc.get("sample", prev_annotation_token) ( l2e_r_mat_curr, l2e_t_curr, e2g_r_mat_curr, e2g_t_curr, ) = self.get_l2g_transform( sd_rec ) # (lidar to global at current frame) # bbox of sdc under current lidar frame prev_bbox3d = self.generate_sdc_info( self.sdc_vel_info[prev_annotation_token], as_lidar_instance3d_box=True ) # to bbox under curr ego frame prev_bbox3d.rotate(l2e_r_mat_curr.T) prev_bbox3d.translate(l2e_t_curr) # to bbox under world frame prev_bbox3d.rotate(e2g_r_mat_curr.T) prev_bbox3d.translate(e2g_t_curr) # to bbox under initial ego frame, first inverse translate, then inverse rotate prev_bbox3d.translate(-e2g_t_init) m1 = np.linalg.inv(e2g_r_mat_init) prev_bbox3d.rotate(m1.T) # to bbox under curr ego frame, first inverse translate, then inverse rotate prev_bbox3d.translate(-l2e_t_init) m2 = np.linalg.inv(l2e_r_mat_init) prev_bbox3d.rotate(m2.T) planning_past.append(prev_bbox3d) # currently the data is in backward time order, e.g., -1, -2, -3, -4 planning_all_past = np.zeros((1, self.past_steps, 3)) planning_mask_all_past = np.zeros((1, self.past_steps, 2)) n_valid_timestep = len(planning_past) if n_valid_timestep > 0: planning_past = [p.tensor.squeeze(0) for p in planning_past] planning_past = np.stack(planning_past, axis=0) # (valid_t, 9) planning_past = planning_past[:, [0, 1, 6]] # (x, y, yaw) planning_all_past[:, :n_valid_timestep, :] = planning_past # 1 x P_s x 3 planning_mask_all_past[:, :n_valid_timestep, :] = 1 # reverting the order to align with the future trajectory data, now -4, -3, -2, -1 planning_all_past = np.flip(planning_all_past, axis=1) # 1 x 4 x 3 planning_mask_all_past = np.flip(planning_mask_all_past, axis=1) # 1 x 4 x 2 return planning_all, planning_mask_all, command, planning_all_past, planning_mask_all_past, planning_world_all