# ---------------------------------------------------------------------------------# # UniAD: Planning-oriented Autonomous Driving (https://arxiv.org/abs/2212.10156) # # Source code: https://github.com/OpenDriveLab/UniAD # # Copyright (c) OpenDriveLab. All rights reserved. # # ---------------------------------------------------------------------------------# import copy import hashlib import json import os import pickle import pprint import random import tempfile from os import path as osp from typing import Dict, List import sys from pathlib import Path from torchvision import transforms from easydict import EasyDict import mmcv import numpy as np import scipy import cv2 import torch from mmcv.parallel import DataContainer as DC from mmdet3d.core.bbox import LiDARInstance3DBoxes from mmdet3d_plugin.datasets import NuScenesE2EDataset from mmdet3d_plugin.datasets.data_utils.rasterize import preprocess_map from mmdet3d_plugin.datasets.nuplan.nuplan_pointcloud import PointCloud from mmdet3d_plugin.eval.detection.config import config_factory_nuPlan from mmdet3d_plugin.uniad.dense_heads.planning_head_plugin.compute_metrics import ( PredictionConfig, compute_metrics) from mmdet.datasets import DATASETS from mmdet.datasets.pipelines import to_tensor from nuscenes import NuScenes from nuscenes.eval.common.utils import Quaternion, quaternion_yaw from nuscenes.eval.tracking.evaluate import TrackingEval from nuscenes.prediction import convert_local_coords_to_global from PIL import Image from prettytable import PrettyTable from mmdet3d_plugin.datasets.data_utils.diffusiondrive_map import DiffusionDriveMap from copy import deepcopy import lzma import pandas as pd from glob import glob from tqdm import tqdm import yaml from navsim.common.enums import BoundingBoxIndex, LidarIndex from navsim.common.dataclasses import Lidar, Cameras import mmdet3d_plugin.datasets.utils.calibration as calib_utils from navsim.agents.diffusiondrive.transfuser_config import TransfuserConfig from enum import IntEnum @DATASETS.register_module() class NavSimOpenScenesE2EDiffusionDrive(NuScenesE2EDataset): r"""OpenScenes E2E Dataset""" CLASSES = ( "vehicle", "bicycle", "pedestrian", "traffic_cone", "barrier", "czone_sign", "generic_object", ) def __init__(self, *args, img_root=None, duplicate=True, load_from_splitted_pkl=False, split_path=None, test_filter_file=None, video_pkl=None, ep_path=None, train_pdm_path=None, test_pdm_path=None, future_frame_num=0, history_frame_num=3, finetune_yaml=None, cache_path=None, **kwargs): # Initialize the parent class self.img_root = img_root self.finetune_yaml = finetune_yaml self.duplicate=duplicate # specialized variable for WE finetuning # self.synthetic_pdm = None self.fail_syn_map_filter = None self.synthetic_img_root = None self.full_fail_map_filter = None self.cache_path = cache_path self.metric_cache_tokens = None self.load_from_splitted_pkl = load_from_splitted_pkl self.future_frame_num = future_frame_num self.history_frame_num = history_frame_num self._config = TransfuserConfig() self.map_root = '/xxx/nuplan/dataset/maps' self.map_api = DiffusionDriveMap(self._config, map_root=self.map_root) # newly added paths: split_path = '/xxx/navtrain_split' train_pdm_path = '/xxx/pdm_8192_gt_cache' test_pdm_path = '/xxx/pdm_8192_gt_cache_navtest' self.split_path = split_path # path for split yaml self.train_pdm_path = train_pdm_path # path saving pdm score self.test_pdm_path = test_pdm_path self.video_pkl = video_pkl if self.cache_path is not None: print(f'loading cache info from:{cache_path}') self.init_metric_cache_paths(cache_path) super(NavSimOpenScenesE2EDiffusionDrive, self).__init__(*args, **kwargs) self.cache_path = cache_path def init_metric_cache_paths(self, cache_path): """ Helper function to load all cache file paths from folder. :param cache_path: directory of cache folder :return: dictionary of token and file path """ metadata_dir = Path(cache_path) / "metadata" metadata_file = [file for file in metadata_dir.iterdir() if ".csv" in str(file)][0] with open(str(metadata_file), "r") as f: cache_paths = f.read().splitlines()[1:] self.metric_cache_dict = {cache_path.split("/")[-2]: cache_path for cache_path in cache_paths} print(f'cache loaded, size {len(self.metric_cache_dict.keys())}') self.metric_cache_tokens = set(self.metric_cache_dict.keys()) def merge_into_splits(self, ann_file, scene_filter): all_pkl_file = glob(ann_file + '/*.pkl') print(f'{len(all_pkl_file)} files found') data_infos = [] for pkl_file in tqdm(all_pkl_file): data_tmp = mmcv.load(pkl_file, file_format="pkl")['infos'] scene_filter_expanded = set() for idx, data_frame in enumerate(data_tmp): if data_frame['token'] in scene_filter: start_frame_idx = idx - self.history_frame_num end_frame_idx = idx + self.future_frame_num for i in range(start_frame_idx, end_frame_idx + 1): if i < 0 or i >= len(data_tmp): continue scene_filter_expanded.add(data_tmp[i]['token']) data_save = [] for data_frame in data_tmp: token = data_frame['token'] if token in scene_filter_expanded: data_save.append(data_frame) data_infos.extend(data_save) return data_infos def load_annotations(self, ann_file): # load scene filter print(ann_file) if self.test_mode: print('use navtest filter...') self.val=True nav_filter = os.path.join(self.data_root, 'navsim_filter/navtest.yaml') else: print('use navtrain filter...') self.val=False nav_filter = os.path.join(self.data_root, 'navsim_filter/navtrain.yaml') if self.use_num_split >0 and ('val' not in ann_file) and (self.finetune_yaml==None): scene_filter = [] for i in range(5): if i>=self.use_num_split: break path = self.split_path + f'/split_{i}.yaml' with open(path, 'r') as file: nav_filter = yaml.safe_load(file) scene_filter += nav_filter['tokens'] else: if self.finetune_yaml is not None: print(f'loading finetuning yaml from:{self.finetune_yaml}') nav_filter = self.finetune_yaml with open(nav_filter, 'r') as file: nav_filter = yaml.safe_load(file) try: scene_filter = nav_filter['tokens'] except: scene_filter = nav_filter['scenario_tokens'] # load annotations: print('loading dataset!') if self.load_from_splitted_pkl: data_infos = self.merge_into_splits(ann_file, scene_filter) else: print(ann_file) data_infos = mmcv.load(ann_file, file_format="pkl") # data_infos = data_infos['infos'] print('dataset loaded, length:', len(data_infos)) self.scene_filter = set(scene_filter) if self.video_pkl is not None: vidmetadata_file = '/xxx/navsim_metric_cache_navtrain/metadata/navsim_metric_cache_navtrain2_metadata_node_0.csv' with open(str(vidmetadata_file), "r") as f: vid_cache_paths = f.read().splitlines()[1:] # only load successful one metric_cache_filter = [cache_path.split("/")[-2] for cache_path in vid_cache_paths] print(f'video data loaded, length:', len(metric_cache_filter)) self.scene_filter = self.scene_filter | set(metric_cache_filter) self.index_map = [] for i, info in enumerate(data_infos): if info['token'] in self.scene_filter: if self.test_mode==False: # in navtest, no effects valid_len=np.sum(info['gt_fut_bbox_sdc_mask'][0,:8]) if valid_len <8: continue self.index_map.append(i) if self.use_num_split > 0 and self.val==False and self.duplicate==True: self.index_map = self.duplicate_indices(self.index_map, n=5/self.use_num_split) print(f'filtering {len(data_infos)} frames to {len(self.index_map)}...') # use log_token in OpenScene as scene_token. for info in data_infos: info['scene_token'] = info['log_token'] self.finetuning_mask = None return data_infos def duplicate_indices(self, indices, n): int_part = int(n) # Get the integer part frac_part = n - int_part # Get the fractional part duplicated = indices * int_part # Duplicate the integer part if frac_part > 0: num_extra = int(len(indices) * frac_part) # Determine how many extra indices to add extra_indices = list(np.random.choice(indices, num_extra, replace=False)) # Sample without replacement duplicated.extend(extra_indices) return duplicated def __len__(self): return len(self.index_map) def __getitem__(self, idx): # map nav filter idx to the data_infos idx. new_idx = self.index_map[idx] if self.test_mode: # print('test_data') return self.prepare_test_data(new_idx) while True: # print('train_data') data = self.prepare_train_data(new_idx) if data is None: idx = self._rand_another(idx) new_idx = self.index_map[idx] continue return data def init_dataset(self): self.sdc_acc_ego_fix = False # update detection config to handle evaluation with different classes self.eval_detection_configs = config_factory_nuPlan() self.default_attr = { "vehicle": "vehicle.parked", "bicycle": "cycle.without_rider", "pedestrian": "pedestrian.moving", "traffic_cone": "", "barrier": "", "czone_sign": "", "generic_object": "", } return # def prepare_train_data(self, index): # return self.prepare_test_data(index) def prepare_test_data_old(self, index): """ Training data preparation. Args: index (int): Index for accessing the target data. Returns: dict: Training data dict of the corresponding index. img: queue_length, 6, 3, H, W img_metas: img_metas of each frame (list) gt_globals_3d: gt_globals of each frame (list) gt_bboxes_3d: gt_bboxes of each frame (list) gt_inds: gt_inds of each frame (list) """ data_queue = [] self.enbale_temporal_aug = False # ensure the first and final frame in same scene final_index = index first_index = index - self.queue_length + 1 if first_index < 0: return None if ( self.data_infos[first_index]["scene_token"] != self.data_infos[final_index]["scene_token"] ): print(index, 2) return None # current timestamp if self.cache_path is not None: if self.data_infos[index]["token"] not in self.metric_cache_tokens: print(f'Token: {self.data_infos[index]["scene_token"]} not in Metric cache') return None input_dict = self.get_data_info(final_index) ### get finetuning: if self.finetuning_mask is not None: input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]] prev_indexs_list = list(reversed(range(first_index, final_index))) if input_dict is None: print(index, index in self.fail_syn_map_filter ,3) return None frame_idx = input_dict["frame_idx"] scene_token = input_dict["scene_token"] self.pre_pipeline(input_dict) example = self.pipeline(input_dict) data_queue.insert(0, example) ########## retrieve previous infos, frame by frame for i in prev_indexs_list: input_dict = self.get_data_info(i, prev_frame=True) if self.finetuning_mask is not None: input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]] if input_dict is None: print(index, index in self.fail_syn_map_filter, 4) return None if ( input_dict["frame_idx"] < frame_idx and input_dict["scene_token"] == scene_token ): self.pre_pipeline(input_dict) example = self.pipeline(input_dict) frame_idx = input_dict["frame_idx"] data_queue.insert(0, copy.deepcopy(example)) # merge a sequence of data into one dictionary, only for temporal data if self.val or self.test_mode: data_queue = self.union2one_test(data_queue) else: data_queue = self.union2one(data_queue, self.data_infos[index]["token"] ) return data_queue def prepare_sdc_gt_planning(self, index): sdc_plan = [] info = self.data_infos[index] if self.video_pkl is not None: # directly use lidar: gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"][:, : self.planning_steps, [0, 1, 6]] gt_fut_bbox_sdc_lidar = gt_fut_bbox_sdc_lidar[0] return gt_fut_bbox_sdc_lidar global2lidar = np.linalg.inv(info["lidar2global"] ) # 4 x 4 for i in range(8): fut_info = self.data_infos[index + 1 + i] ############### get future of the sdc box in lidar coordinate sdc_loc_global, _ = calib_utils.transform_matrix_to_vector( fut_info["ego2global"] ) sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) # 4 sdc_loc_lidar = global2lidar @ sdc_loc_global # 4 sdc_trans_lidar = ( global2lidar @ fut_info["ego2global"] ) # 4 x 4 yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( sdc_trans_lidar[:3, :3] ).as_euler("xyz", degrees=False)[-1] sdc_plan.append([sdc_loc_lidar[0], sdc_loc_lidar[1], yaw_lidar]) sdc_plan = np.stack(sdc_plan, axis=0) return sdc_plan def prepare_train_data(self, index): info = self.data_infos[index] # future log-token check 8 frames: if index+8>len(self.data_infos): print(f'token:{index} exceed in future scene') return None match_flag = all(self.data_infos[index+1+j]['log_token'] == info['log_token'] for j in range(8)) if match_flag==False and (self.video_pkl is None): print(f'token:{index} not match in future scene') return None #prepare inputs; features = self.prepare_test_data(index) #prepare labels: label_features = self.prepare_label(index) features.update(label_features) features['img_metas'] = DC(deepcopy(info['token']), cpu_only=True) if self.cache_path is not None: features["metric_cache"] = DC(self.get_metric_cache(info['token']), cpu_only=True) return features def prepare_label(self, index): info = self.data_infos[index] sdc_plan = self.prepare_sdc_gt_planning(index) trajectory = torch.from_numpy(sdc_plan) agent_states, agent_labels = self._compute_agent_targets(index) bev_semantic_map = self.map_api.compute_bev_semantic_map(info) return { "trajectory": trajectory, "agent_states": agent_states, "agent_labels": agent_labels, "bev_semantic_map": bev_semantic_map, } def _compute_agent_targets(self, index): """ Extracts 2D agent bounding boxes in ego coordinates :param annotations: annotation dataclass :return: tuple of bounding box values and labels (binary) """ info = self.data_infos[index] max_agents = self._config.num_bounding_boxes agent_states_list = [] def _xy_in_lidar(x: float, y: float, config: TransfuserConfig) -> bool: return (config.lidar_min_x <= x <= config.lidar_max_x) and (config.lidar_min_y <= y <= config.lidar_max_y) for box, name in zip(info['anns']["gt_boxes"], info['anns']["gt_names"]): box_x, box_y, box_heading, box_length, box_width = ( box[BoundingBoxIndex.X], box[BoundingBoxIndex.Y], box[BoundingBoxIndex.HEADING], box[BoundingBoxIndex.LENGTH], box[BoundingBoxIndex.WIDTH], ) if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config): agent_states_list.append(np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32)) agents_states_arr = np.array(agent_states_list) # filter num_instances nearest agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32) agent_labels = np.zeros(max_agents, dtype=bool) if len(agents_states_arr) > 0: distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1) argsort = np.argsort(distances)[:max_agents] # filter detections agents_states_arr = agents_states_arr[argsort] agent_states[: len(agents_states_arr)] = agents_states_arr agent_labels[: len(agents_states_arr)] = True return torch.tensor(agent_states), torch.tensor(agent_labels) def prepare_test_data(self, index): """Inherited, see superclass.""" info = self.data_infos[index] features = {} features['token'] = DC(deepcopy(info['token']), cpu_only=True) features["camera_feature"] = self._get_camera_feature(info) features["lidar_feature"] = self._get_lidar_feature(info) # cmd = np.zeros(4) # cmd[info['command']]=1 # sdc_acc_global = info['can_bus'][[7, 8, 9]] # 3 # sdc_acc_lidar: np.ndarray = ( # np.linalg.inv(info['lidar2global'][:3, :3]) @ sdc_acc_global # ) # 3 features["status_feature"] = torch.cat( [ torch.tensor(info['driving_command'], dtype=torch.float32), torch.tensor(info["ego_dynamic_state"][:2], dtype=torch.float32), #vxvy torch.tensor(info["ego_dynamic_state"][2:], dtype=torch.float32), ], ) return features def _get_camera_feature(self, info) -> torch.Tensor: """ Extract stitched camera from AgentInput :param agent_input: input dataclass :return: stitched front view image as torch tensor """ cameras = Cameras.from_camera_dict( sensor_blobs_path=Path(self.img_root), camera_dict=info["cams"], sensor_names=['cam_l0', 'cam_f0', 'cam_r0'], ) # Crop to ensure 4:1 aspect ratio l0 = cameras.cam_l0.image[28:-28, 416:-416] f0 = cameras.cam_f0.image[28:-28] r0 = cameras.cam_r0.image[28:-28, 416:-416] # stitch l0, f0, r0 images stitched_image = np.concatenate([l0, f0, r0], axis=1) resized_image = cv2.resize(stitched_image, (1024, 256)) # resized_image = cv2.resize(stitched_image, (448, 448)) # resized_image = cv2.resize(stitched_image, (2048, 512)) tensor_image = transforms.ToTensor()(resized_image) # tensor_image = transforms.Normalize( # mean=(0.5, 0.5, 0.5), # Normalize (often 0.5 or ImageNet stats) # std=(0.5, 0.5, 0.5) # )(tensor_image) return tensor_image def _get_lidar_feature(self, info) -> torch.Tensor: """ Compute LiDAR feature as 2D histogram, according to Transfuser :param agent_input: input dataclass :return: LiDAR histogram as torch tensors """ lidar_path = info['lidar_path'] lidar = Lidar.from_paths(self.img_root , lidar_path, "lidar_pc") # only consider (x,y,z) & swap axes for (N,3) numpy array lidar_pc = lidar.lidar_pc[LidarIndex.POSITION].T # NOTE: Code from # https://github.com/autonomousvision/carla_garage/blob/main/team_code/data.py#L873 # self._config = EasyDict(dict( # lidar_min_x= -32, # lidar_max_x= 32, # lidar_min_y= -32, # lidar_max_y= 32, # hist_max_per_pixel = 5, # pixels_per_meter=4.0, # max_height_lidar=100.0, # lidar_split_height=0.2, # use_ground_plane=False, # ) ) def splat_points(point_cloud): # 256 x 256 grid xbins = np.linspace( self._config.lidar_min_x, self._config.lidar_max_x, (self._config.lidar_max_x - self._config.lidar_min_x) * int(self._config.pixels_per_meter) + 1, ) ybins = np.linspace( self._config.lidar_min_y, self._config.lidar_max_y, (self._config.lidar_max_y - self._config.lidar_min_y) * int(self._config.pixels_per_meter) + 1, ) hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0] hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel overhead_splat = hist / self._config.hist_max_per_pixel return overhead_splat # Remove points above the vehicle lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar] below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height] above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height] above_features = splat_points(above) if self._config.use_ground_plane: below_features = splat_points(below) features = np.stack([below_features, above_features], axis=-1) else: features = np.stack([above_features], axis=-1) features = np.transpose(features, (2, 0, 1)).astype(np.float32) return torch.from_numpy(features) def get_metric_cache(self, token): with lzma.open(self.metric_cache_dict[token], "rb") as f: metric_cache = pickle.load(f) return metric_cache def union2one(self, queue, token): """ convert a sequence of the sample dict into one single sample. """ # sensor data and transform imgs_list: List[torch.Tensor] = [ each["img"].data for each in queue ] # L, C x 3 x H x W l2g_r_mat_list: List[torch.Tensor] = [ to_tensor(each["l2g_r_mat"]) for each in queue ] # L, 3 x 3 l2g_t_list: List[torch.Tensor] = [ to_tensor(each["l2g_t"]) for each in queue ] # L, 3 timestamp_list = [to_tensor(each["timestamp"]) for each in queue] # L, 1 # converting the global absolute coordinate into relative position and orientation change metas_map = {} prev_pos = None prev_angle = None for i, each in enumerate(queue): metas_map[i] = each["img_metas"].data if i == 0: metas_map[i]["prev_bev"] = False prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3]) prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1]) metas_map[i]["can_bus"][:3] = 0 metas_map[i]["can_bus"][-1] = 0 else: metas_map[i]["prev_bev"] = True tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3]) tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1]) metas_map[i]["can_bus"][:3] -= prev_pos metas_map[i]["can_bus"][-1] -= prev_angle prev_pos = copy.deepcopy(tmp_pos) prev_angle = copy.deepcopy(tmp_angle) # print(metas_map[i]["can_bus"][:3]) queue[-1]["img"] = DC( torch.stack(imgs_list), cpu_only=False, stack=True ) # T x N x 3 x H x W queue[-1]["img_metas"] = DC(metas_map, cpu_only=True) # inherent all the labels for the current frame only queue = queue[-1] # merge all other labels that requires temporal and merge from queue if self.cache_path is not None: queue["metric_cache"] = DC(self.get_metric_cache(token), cpu_only=True) queue["l2g_r_mat"] = DC(l2g_r_mat_list) queue["l2g_t"] = DC(l2g_t_list) queue["timestamp"] = DC(timestamp_list) return queue def union2one_test(self, queue): """ convert a sequence of the sample dict into one single sample. """ # sensor data and transform imgs_list: List[torch.Tensor] = [ each["img"][0].data for each in queue ] # L, C x 3 x H x W l2g_r_mat_list: List[torch.Tensor] = [ to_tensor(each["l2g_r_mat"]) for each in queue ] # L, 3 x 3 l2g_t_list: List[torch.Tensor] = [ to_tensor(each["l2g_t"]) for each in queue ] # L, 3 timestamp_list = [to_tensor(each["timestamp"]) for each in queue] # L, 1 # class category labels for classification, e.g., 0 for Car # gt_labels_3d_list: List[torch.Tensor] = [ # each["gt_labels_3d"].data for each in queue # ] # L, N # gt_sdc_label_list: List[torch.Tensor] = [ # each["gt_sdc_label"].data for each in queue # ] # L, 1 # # global ID for tracking # gt_inds_list = [to_tensor(each["gt_inds"]) for each in queue] # L, N # # detection # gt_bboxes_3d_list: List[LiDARInstance3DBoxes] = [ # each["gt_bboxes_3d"].data for each in queue # ] # L, N x 9 # trajectory prediction # gt_past_traj_list = [ # to_tensor(each["gt_past_traj"]) for each in queue # ] # L, N x 8 x 2 # gt_past_traj_mask_list = [ # to_tensor(each["gt_past_traj_mask"]) for each in queue # ] # L, N x 8 x 2 # gt_fut_traj = to_tensor(queue[-1]["gt_fut_traj"]) # L, N x 12 x 2 # gt_fut_traj_mask = to_tensor(queue[-1]["gt_fut_traj_mask"]) # L, N x 12 x 2 # planning # gt_sdc_bbox_list: List[LiDARInstance3DBoxes] = [ # each["gt_sdc_bbox"].data for each in queue # ] # L, 1 x 9 # gt_sdc_fut_traj = to_tensor(queue[-1]["gt_sdc_fut_traj"]) # L, 1 x 12 x 2 # gt_sdc_fut_traj_mask = to_tensor( # queue[-1]["gt_sdc_fut_traj_mask"] # ) # L, 1 x 12 x 2 # gt_future_boxes_list = queue[-1]["gt_future_boxes"] # 7, N x 9 # gt_future_labels_list = [ # to_tensor(each) for each in queue[-1]["gt_future_labels"] # ] # 7, N* (could be larger than N), only those valid are >=0, others are -1 # converting the global absolute coordinate into relative position and orientation change metas_map = {} prev_pos = None prev_angle = None for i, each in enumerate(queue): metas_map[i] = each["img_metas"][0].data if i == 0: metas_map[i]["prev_bev"] = False prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3]) prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1]) metas_map[i]["can_bus"][:3] = 0 metas_map[i]["can_bus"][-1] = 0 else: metas_map[i]["prev_bev"] = True tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3]) tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1]) metas_map[i]["can_bus"][:3] -= prev_pos metas_map[i]["can_bus"][-1] -= prev_angle prev_pos = copy.deepcopy(tmp_pos) prev_angle = copy.deepcopy(tmp_angle) # print(metas_map[i]["can_bus"][:3]) queue[-1]["img"] = DC( torch.stack(imgs_list), cpu_only=False, stack=True ) # T x N x 3 x H x W queue[-1]["img_metas"] = DC(metas_map, cpu_only=True) # inherent all the labels for the current frame only queue = queue[-1] # merge all other labels that requires temporal and merge from queue # queue["gt_labels_3d"] = DC(gt_labels_3d_list) # queue["gt_sdc_label"] = DC(gt_sdc_label_list) # queue["gt_inds"] = DC(gt_inds_list) # queue["gt_bboxes_3d"] = DC(gt_bboxes_3d_list, cpu_only=True) # queue["gt_sdc_bbox"] = DC(gt_sdc_bbox_list, cpu_only=True) queue["l2g_r_mat"] = DC(l2g_r_mat_list) queue["l2g_t"] = DC(l2g_t_list) queue["timestamp"] = DC(timestamp_list) # queue["gt_fut_traj"] = DC(gt_fut_traj) # queue["gt_fut_traj_mask"] = DC(gt_fut_traj_mask) # queue["gt_past_traj"] = DC(gt_past_traj_list) # queue["gt_past_traj_mask"] = DC(gt_past_traj_mask_list) # queue["gt_future_boxes"] = DC(gt_future_boxes_list, cpu_only=True) # queue["gt_future_labels"] = DC(gt_future_labels_list) return queue def init_trackpredplan( self, predict_steps, planning_steps, past_steps, fut_steps, use_nonlinear_optimizer, ): # trajectory APIs for tracking/prediction/planning self.predict_steps = predict_steps self.planning_steps = planning_steps self.past_steps = past_steps self.fut_steps = fut_steps self.use_nonlinear_optimizer = use_nonlinear_optimizer # eval for planning # self.helper = self.traj_api.predict_helper # # self.config = load_prediction_config(self.helper, "./planning.json") # # Load config file and deserialize it. # if self.test_mode: # # planning_map_config_file = "planning_test.json" # planning_map_config_file = "planning_all.json" # else: # planning_map_config_file = "planning_trainval.json" # with open(planning_map_config_file, "r") as f: # planning_map_config = json.load(f) # self.planning_map_config = PredictionConfig.deserialize( # planning_map_config, self.helper # ) def init_occupancy( self, occ_receptive_field, occ_n_future, occ_filter_invalid_sample, occ_filter_by_valid_flag, ): self.occ_receptive_field = occ_receptive_field # past + current self.occ_n_future = occ_n_future # future only self.occ_filter_invalid_sample = occ_filter_invalid_sample self.occ_filter_by_valid_flag = occ_filter_by_valid_flag self.occ_only_total_frames = 7 # NOTE: hardcode, not influenced by planning assert self.occ_filter_by_valid_flag is False def get_pdm_score_info(self, input_dict, index=None): if self.val: base_path = self.test_pdm_path if self.full_fail_map_filter: if index in self.full_fail_map_filter: base_path = self.train_pdm_path else: base_path = self.train_pdm_path if self.fail_syn_map_filter: if index in self.fail_syn_map_filter: base_path = self.synthetic_pdm log_name = input_dict['log_name'].split('-')[0] + '-' + input_dict['log_name'].split('-')[1] else: log_name = input_dict['log_name'] load_path = base_path + '/' + log_name + '/' type_list = os.listdir(load_path) for t_name in type_list: buf_dir = t_name+ '/' +input_dict['sample_idx'] if os.path.exists(buf_dir): break load_path = load_path + buf_dir + '/pdm_cache.pkl' # try: with lzma.open(load_path, "rb") as f: data = pickle.load(f).trajectory input_dict.update(data) return input_dict def get_zero_pdm(self, input_dict): pdm_list = [ "no_at_fault_collisions", "drivable_area_compliance", "ego_progress", "time_to_collision_within_bound", "comfort", "score", ] for k in pdm_list: input_dict[k] = np.zeros((8192,)) return input_dict def get_data_info(self, index, info=None, debug=False, prev_frame=False): """Get data info according to the given index. Args: index (int): Index of the sample data to get. Returns: dict: Data information that will be passed to the data \ preprocessing pipelines. It includes the following keys: - sample_idx (str): Sample index. - pts_filename (str): Filename of point clouds. - sweeps (list[dict]): Infos of sweeps. - timestamp (float): Sample timestamp. - img_filename (str, optional): Image filename. - lidar2img (list[np.ndarray], optional): Transformations \ from lidar to different cameras. - ann_info (dict): Annotation info. """ if info is None: info = self.data_infos[index] # standard protocal modified from SECOND.Pytorch input_dict = dict( sample_idx=info["token"], # str: OpenScenes unique sample token frame_idx=info["frame_idx"], # int: 0-indexed frame IDs timestamp=info["timestamp"] / 1e6, # int: OpenScenes unique time index log_name=info["log_name"], # str: OpenScenes unique sample token log_token=info["log_token"], # str: OpenScenes unique sample token scene_name=info["scene_name"], # str: OpenScenes sequence name scene_token=info["scene_token"], # str: OpenScenes sequence name pts_filename=info["lidar_path"], # str: relative path for the lidar data # sweeps=info["sweeps"], # List[Dict]: list of infos for sweep data prev=info["sample_prev"], # str: OpenScenes unique sample token next=info["sample_next"], # str: OpenScenes unique sample token ) input_dict = self.update_transform(input_dict=input_dict, index=index) input_dict = self.update_sensor(input_dict=input_dict, index=index) input_dict = self.update_canbus(input_dict=input_dict, index=index) if prev_frame==False: # only count for curr frame PDMScore input_dict = self.get_pdm_score_info(input_dict, index) else: input_dict = self.get_zero_pdm(input_dict) # generate annotation for detection/tracking, putting them in the annotation so that # we could do range filtering altogether in the transform_3d.py input_dict["ann_info"] = self.get_ann_info(index) # # all data ultimately needs to be in the input_dict for downstream heads # input_dict = self.update_mapping(input_dict=input_dict, index=index) # input_dict = self.update_occupancy(input_dict=input_dict, index=index) input_dict = self.update_ego_prediction(input_dict=input_dict, index=index) input_dict = self.update_ego_planning(input_dict=input_dict, index=index) # visualization for debugging debug = False if debug: self.vis_data_per_frame( data=input_dict, index=index, vis_root="data/viz_nuplan", ) return input_dict def update_anno_token(self, annotation, index): # add OpenScenes-specific token information return annotation def update_tracking_prediction(self, annotation, index): # reusing from the Carla dataloader info = self.data_infos[index] # get trajectories for surrounding agents, lidar coordinate, 2Hz gt_fut_bbox_lidar = info["gt_fut_bbox_lidar"][annotation["mask"]] # N x 12 x 9 gt_fut_bbox_mask = info["gt_fut_bbox_mask"][annotation["mask"]] # N x 12 x 1 gt_pre_bbox_lidar = info["gt_pre_bbox_lidar"][annotation["mask"]] # N x 4 x 9 gt_pre_bbox_mask = info["gt_pre_bbox_mask"][annotation["mask"]] # N x 4 x 1 gt_pre_bbox_lidar = np.flip(gt_pre_bbox_lidar, axis=1) # N x 4 x 9 gt_pre_bbox_mask = np.flip(gt_pre_bbox_mask, axis=1) # N x 4 x 1 # print('\nreverse') # print(gt_pre_bbox_lidar[3]) # repeat redundant channels for masks gt_fut_bbox_mask = np.repeat(gt_fut_bbox_mask, 2, axis=2) # N x 12 x 2 gt_pre_bbox_mask = np.repeat(gt_pre_bbox_mask, 2, axis=2) # N x 4 x 2 # append 4 future frames of trajectories # so now, it contains frames in the order of -1, -2, -3, -4, 1, 2, 3, 4 gt_pre_bbox_lidar = np.concatenate( (gt_pre_bbox_lidar, gt_fut_bbox_lidar[:, :4]), axis=1 ) # N x 8 x 9 gt_pre_bbox_mask = np.concatenate( (gt_pre_bbox_mask, gt_fut_bbox_mask[:, :4]), axis=1 ) # N x 8 x 2 # print('\nput together') # print(gt_pre_bbox_lidar[3]) # update output dictionary annotation.update( dict( gt_fut_traj=gt_fut_bbox_lidar[:, :, :2], # N x 12 x 2, lidar coordinate gt_fut_traj_mask=gt_fut_bbox_mask, # N x 12 x 2 gt_past_traj=gt_pre_bbox_lidar[:, :, :2], # N x 8 x 2 gt_past_traj_mask=gt_pre_bbox_mask, # N x 8 x 2 ) ) return annotation def update_mapping(self, input_dict, index): # get polyline in the ego coordinate polyline_dict, drivable_area = self.update_map_vectorized(input_dict, index) input_dict = self.update_map_rasterized(input_dict, index, polyline_dict, drivable_area) return input_dict def update_map_vectorized(self, input_dict, index): info = self.data_infos[index] map_location = info['map_location'] e2g_translation = info['ego2global_translation'] e2g_rotation = info['ego2global_rotation'] if len(e2g_rotation) != 4: e2g_rotation = Quaternion._from_matrix(e2g_rotation).elements polyline_dict = self.vector_map.gen_vectorized_samples(e2g_translation, e2g_rotation, map_location) drivable_area = self.vector_map.gen_drivable_area(e2g_translation, e2g_rotation, map_location) return polyline_dict, drivable_area def update_map_rasterized(self, input_dict, index, polyline_dict, drivable_area, debug=False): gt_labels = [] gt_bboxes = [] gt_masks = [] origin = np.array([self.canvas_size[1] // 2, self.canvas_size[0] // 2]) scale = np.array([self.canvas_size[1] / self.patch_size[0], self.canvas_size[0] / self.patch_size[1]]) for polyline in polyline_dict: if polyline["type"] == 0: continue # skip centerline map_mask = np.zeros(self.canvas_size, np.uint8) draw_coor = (polyline["pts"] * scale + origin).round().astype(np.int32) gt_mask = cv2.polylines(map_mask, [draw_coor], False, color=1, thickness=self.thickness) / 255 ys, xs = np.where(gt_mask) try: gt_bbox = [min(xs), min(ys), max(xs), max(ys)] except ValueError: gt_bbox = [0, 0, 1, 1] continue cls = polyline["type"] gt_labels.append(cls) gt_bboxes.append(gt_bbox) gt_masks.append(gt_mask) # for stuff class, drivable area map_mask = np.zeros(self.canvas_size, np.uint8) exteriors = [] interiors = [] for p in drivable_area.geoms: exteriors.append( (np.array(p.exterior.coords) * scale + origin).round().astype(np.int32) ) for inter in p.interiors: interiors.append( (np.array(inter.coords) * scale + origin).round().astype(np.int32) ) cv2.fillPoly(map_mask, exteriors, 255) cv2.fillPoly(map_mask, interiors, 0) map_mask = map_mask / 255 try: ys, xs = np.where(map_mask) gt_bbox = [min(xs), min(ys), max(xs), max(ys)] gt_labels.append(self.map_num_classes) gt_bboxes.append(gt_bbox) gt_masks.append(map_mask) except ValueError: pass gt_labels = torch.tensor(gt_labels) gt_bboxes = torch.from_numpy(np.stack(gt_bboxes)) gt_masks = torch.from_numpy(np.stack(gt_masks)) # N x H x W # update output dictionary input_dict.update( dict( gt_lane_labels=gt_labels, gt_lane_bboxes=gt_bboxes, gt_lane_masks=gt_masks, ) ) return input_dict def update_ego_prediction(self, input_dict, index): # in replacement of both update_ego_prediction and update_ego_planning # reusing from the Carla dataloader info = self.data_infos[index] # print('\n\nprint out update_ego_prediction now\n') # retrieve ego bounding box in the current frame in lidar coordinate gt_sdc_bbox = info["gt_sdc_bbox_lidar"][:9].reshape((1, -1)) # 1 x 9 gt_sdc_bbox = LiDARInstance3DBoxes( gt_sdc_bbox, box_dim=gt_sdc_bbox.shape[-1], origin=(0.5, 0.5, 0) ).convert_to(self.box_mode_3d) gt_sdc_bbox = DC(gt_sdc_bbox, cpu_only=True) # retrieve ego class label, default it car gt_sdc_label = np.array([0]) gt_sdc_label = DC(to_tensor(gt_sdc_label)) sdc_status=info["gt_sdc_bbox_lidar"][:9] # ego trajectory in lidar coordinate gt_pre_bbox_sdc_lidar = info["gt_pre_bbox_sdc_lidar"] # 1 x 4 x 9 gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"] # 1 x 12 x 9 gt_pre_bbox_sdc_global = info["gt_pre_bbox_sdc_global"] # 1 x 4 x 9 gt_fut_bbox_sdc_global = info["gt_fut_bbox_sdc_global"] # 1 x 12 x 9 gt_fut_bbox_sdc_mask = info["gt_fut_bbox_sdc_mask"] # 1 x 12 x 1 gt_fut_bbox_sdc_mask = np.repeat(gt_fut_bbox_sdc_mask, 2, axis=2) # 1 x 12 x 2 gt_pre_bbox_sdc_mask = info["gt_pre_bbox_sdc_mask"] # 1 x 4 x 1 gt_pre_bbox_sdc_mask = np.repeat(gt_pre_bbox_sdc_mask, 4, axis=2) # 1 x 4 x 2 gt_pre_command_sdc = info["gt_pre_command_sdc"] # update output dictionary for ego's prediction input_dict.update( dict( # sdc_vel_lidar_calculated=sdc_vel_lidar_calculated, # (2, ) gt_sdc_bbox=gt_sdc_bbox, # DC (LiDARInstance3DBoxes), xyz, lwh, yaw, vel_x, vel_y, lidar coordinate gt_sdc_label=gt_sdc_label, # DC (tensor[0]) gt_sdc_fut_traj=gt_fut_bbox_sdc_lidar[ :, :, :2 ], # 1 x 12 x 2, lidar coordinate gt_sdc_fut_traj_mask=gt_fut_bbox_sdc_mask, # 1 x 12 x 2 # planning labels command=info["command"], # int, change from 1-indexed to 0-indexed sdc_planning_world=gt_fut_bbox_sdc_global[ :, : self.planning_steps, [0, 1, 2] ], # 1 x 6 x 3, global coordinate, xyz sdc_planning=gt_fut_bbox_sdc_lidar[ :, : self.planning_steps, [0, 1, 6] ], # 1 x 6 x 3, lidar coordinate, xy-yaw sdc_planning_mask=gt_fut_bbox_sdc_mask[ :, : self.planning_steps ], # 1 x 6 x 2 sdc_planning_past=info["gt_pre_bbox_sdc_lidar"][:, :, [0, 1, 6]], sdc_planning_mask_past=gt_pre_bbox_sdc_mask, # 1 x 4 x 2 gt_pre_command_sdc=gt_pre_command_sdc, sdc_status=sdc_status[[0, 1, 6]] ) ) return input_dict def update_ego_planning(self, input_dict, index): info = self.data_infos[index] # sdc_planning already added in update_ego_prediction return input_dict def vis_dataset_specific(self, data: Dict): try: split = "trainval" parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() # 6 x N except FileNotFoundError: try: split = "test" parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() # 6 x N except FileNotFoundError: split = "mini" parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() # 6 x N points = points[:3].T # N x 3 image_file_list = [] for cam_name in range(8): # FRONT, FRONT_RIGHT, FRONT_LEFT, BACK, BACK_LEFT, BACK_RIGHT image_file: str = os.path.join(parent_dir, data["img_filename"][cam_name]) image_file_list.append(image_file) return { "lidar": points, "command_dict": { 0: "TURN LEFT", 1: "KEEP FORWARD", 2: "TURN RIGHT", 3: "UNKNOWN", }, "image_file_list": image_file_list, "left_hand": False, } class BoundingBox2DIndex(IntEnum): """Intenum for bounding boxes in TransFuser.""" _X = 0 _Y = 1 _HEADING = 2 _LENGTH = 3 _WIDTH = 4 @classmethod def size(cls): valid_attributes = [ attribute for attribute in dir(cls) if attribute.startswith("_") and not attribute.startswith("__") and not callable(getattr(cls, attribute)) ] return len(valid_attributes) @classmethod @property def X(cls): return cls._X @classmethod @property def Y(cls): return cls._Y @classmethod @property def HEADING(cls): return cls._HEADING @classmethod @property def LENGTH(cls): return cls._LENGTH @classmethod @property def WIDTH(cls): return cls._WIDTH @classmethod @property def POINT(cls): # assumes X, Y have subsequent indices return slice(cls._X, cls._Y + 1) @classmethod @property def STATE_SE2(cls): # assumes X, Y, HEADING have subsequent indices return slice(cls._X, cls._HEADING + 1)