| |
| |
| |
| |
| |
|
|
| import copy |
| import hashlib |
| import json |
| import os |
| import pickle |
| import pprint |
| import random |
| import tempfile |
| from os import path as osp |
| from typing import Dict, List |
| import sys |
| from pathlib import Path |
|
|
| import mmcv |
| import numpy as np |
| import scipy |
| import cv2 |
| import torch |
| from mmcv.parallel import DataContainer as DC |
| from mmdet3d.core.bbox import LiDARInstance3DBoxes |
| from mmdet3d_plugin.datasets import NuScenesE2EDataset |
| from mmdet3d_plugin.datasets.data_utils.rasterize import preprocess_map |
| from mmdet3d_plugin.datasets.nuplan.nuplan_pointcloud import PointCloud |
| from mmdet3d_plugin.eval.detection.config import config_factory_nuPlan |
| from mmdet3d_plugin.uniad.dense_heads.planning_head_plugin.compute_metrics import ( |
| PredictionConfig, compute_metrics) |
| from mmdet.datasets import DATASETS |
| from mmdet.datasets.pipelines import to_tensor |
| from nuscenes import NuScenes |
| from nuscenes.eval.common.utils import Quaternion, quaternion_yaw |
| from nuscenes.eval.tracking.evaluate import TrackingEval |
| from nuscenes.prediction import convert_local_coords_to_global |
| from PIL import Image |
| from prettytable import PrettyTable |
|
|
| from mmdet3d_plugin.datasets.data_utils.nuplan_vector_map import VectorizedLocalMap |
| from copy import deepcopy |
|
|
| import lzma |
| import pandas as pd |
| from glob import glob |
|
|
|
|
@DATASETS.register_module()
class NavSimOpenScenesE2E(NuScenesE2EDataset):
    r"""OpenScenes E2E Dataset.

    NavSim/OpenScenes variant of ``NuScenesE2EDataset``: loads nuPlan-based
    OpenScenes annotation pickles, filters frames against navtrain/navtest
    token yamls, and can attach precomputed PDM metric caches per sample.
    """

    # Object classes of the nuPlan / OpenScenes taxonomy used for detection
    # and tracking ground truth (order defines the label ids).
    CLASSES = (
        "vehicle",
        "bicycle",
        "pedestrian",
        "traffic_cone",
        "barrier",
        "czone_sign",
        "generic_object",
    )
    def __init__(self, *args, duplicate=True,finetune_yaml=None, cache_path=None, **kwargs):
        """Initialize the dataset.

        Args:
            duplicate (bool): If True and a partial navtrain split is used,
                oversample the index map back to full-epoch size.
            finetune_yaml (str | None): Optional token-filter yaml used for
                finetuning instead of the default navtrain splits.
            cache_path (str | None): Root of the precomputed PDM metric cache;
                when given, samples without a cache entry are skipped.
        """
        self.finetune_yaml = finetune_yaml
        self.duplicate=duplicate

        # Optional synthetic-data filters/roots; populated externally if used.
        self.fail_syn_map_filter = None
        self.synthetic_img_root = None
        self.full_fail_map_filter = None
        self.cache_path = cache_path
        self.metric_cache_tokens = None

        # Must run before super().__init__(): the parent constructor triggers
        # data preparation paths that consult metric_cache_tokens.
        if self.cache_path is not None:
            print(f'loading cache info from:{cache_path}')
            self.init_metric_cache_paths(cache_path)

        super(NavSimOpenScenesE2E, self).__init__(*args, **kwargs)
        # Re-assert after super().__init__ (parent may reset it — TODO confirm).
        self.cache_path = cache_path
|
| def init_metric_cache_paths(self, cache_path): |
| """ |
| Helper function to load all cache file paths from folder. |
| :param cache_path: directory of cache folder |
| :return: dictionary of token and file path |
| """ |
| metadata_dir = Path(cache_path) / "metadata" |
| metadata_file = [file for file in metadata_dir.iterdir() if ".csv" in str(file)][0] |
| with open(str(metadata_file), "r") as f: |
| cache_paths = f.read().splitlines()[1:] |
| self.metric_cache_dict = {cache_path.split("/")[-2]: cache_path for cache_path in cache_paths} |
| print(f'cache loaded, size {len(self.metric_cache_dict.keys())}') |
| |
| |
| self.metric_cache_tokens = set(self.metric_cache_dict.keys()) |
| |
| def load_annotations(self, ann_file): |
| print('loading dataset!') |
| if self.file_client_args["backend"] == "disk": |
| print(ann_file) |
| data_infos = mmcv.load(ann_file, file_format="pkl") |
| try: |
| data_infos = data_infos['infos'] |
| except: |
| pass |
|
|
| print(len(data_infos)) |
|
|
| elif self.file_client_args["backend"] == "petrel": |
| data = pickle.loads(self.file_client.get(ann_file)) |
| data_infos = list(sorted(data["infos"], key=lambda e: e["timestamp"])) |
| data_infos = data_infos[:: self.load_interval] |
| self.metadata = data["metadata"] |
| self.version = self.metadata["version"] |
| else: |
| assert False, "Invalid file_client_args!" |
|
|
| print('load pickle down!') |
|
|
| |
| import yaml |
| print(ann_file) |
| if 'val' in ann_file: |
| print('use navtest filter...') |
| self.val=True |
| nav_filter = os.path.join(self.data_root, 'navsim_filter/navtest.yaml') |
| else: |
| print('use navtrain filter...') |
| self.val=False |
| nav_filter = os.path.join(self.data_root, 'navsim_filter/navtrain.yaml') |
| |
| |
| if self.use_num_split >0 and ('val' not in ann_file) and (self.finetune_yaml==None): |
| scene_filter = [] |
| for i in range(5): |
| if i>=self.use_num_split: |
| break |
| path = f'/nas/shared/opendrivelab/liuhaochen/navtrain_split/split_{i}.yaml' |
| with open(path, 'r') as file: |
| nav_filter = yaml.safe_load(file) |
| scene_filter += nav_filter['tokens'] |
| else: |
| if self.finetune_yaml is not None: |
| print(f'loading finetuning yaml from:{self.finetune_yaml}') |
| nav_filter = self.finetune_yaml |
| |
| with open(nav_filter, 'r') as file: |
| nav_filter = yaml.safe_load(file) |
| try: |
| scene_filter = nav_filter['tokens'] |
| except: |
| scene_filter = nav_filter['scenario_tokens'] |
| |
|
|
| self.scene_filter = set(scene_filter) |
| |
| self.index_map = [] |
| for i, info in enumerate(data_infos): |
| if info['token'] in self.scene_filter: |
| valid_len=np.sum(info['gt_fut_bbox_sdc_mask']) |
| if valid_len != self.planning_steps: |
| continue |
| self.index_map.append(i) |
| |
| if self.use_num_split > 0 and self.val==False and self.duplicate==True: |
| self.index_map = self.duplicate_indices(self.index_map, n=5/self.use_num_split) |
|
|
| print(f'filtering {len(data_infos)} frames to {len(self.index_map)}...') |
|
|
| |
| for info in data_infos: |
| info['scene_token'] = info['log_token'] |
| |
| self.finetuning_mask = None |
|
|
| return data_infos |
| |
| def duplicate_indices(self, indices, n): |
| int_part = int(n) |
| frac_part = n - int_part |
| |
| duplicated = indices * int_part |
|
|
| if frac_part > 0: |
| num_extra = int(len(indices) * frac_part) |
| extra_indices = list(np.random.choice(indices, num_extra, replace=False)) |
| duplicated.extend(extra_indices) |
|
|
| return duplicated |
|
|
| def __len__(self): |
| return len(self.index_map) |
|
|
| def __getitem__(self, idx): |
| |
| new_idx = self.index_map[idx] |
|
|
| if self.test_mode: |
| |
| return self.prepare_test_data(new_idx) |
| while True: |
| |
| data = self.prepare_train_data(new_idx) |
| if data is None: |
| idx = self._rand_another(idx) |
| new_idx = self.index_map[idx] |
| continue |
| return data |
|
|
| def init_dataset(self): |
| self.sdc_acc_ego_fix = False |
|
|
| |
| self.eval_detection_configs = config_factory_nuPlan() |
| self.default_attr = { |
| "vehicle": "vehicle.parked", |
| "bicycle": "cycle.without_rider", |
| "pedestrian": "pedestrian.moving", |
| "traffic_cone": "", |
| "barrier": "", |
| "czone_sign": "", |
| "generic_object": "", |
| } |
|
|
| return |
| |
    def prepare_train_data(self, index):
        # Train-time samples are built exactly like test samples here.
        return self.prepare_test_data(index)
|
|
    def prepare_test_data(self, index):
        """
        Training data preparation.
        Args:
            index (int): Index for accessing the target data.
        Returns:
            dict: Training data dict of the corresponding index.
                img: queue_length, 6, 3, H, W
                img_metas: img_metas of each frame (list)
                gt_globals_3d: gt_globals of each frame (list)
                gt_bboxes_3d: gt_bboxes of each frame (list)
                gt_inds: gt_inds of each frame (list)
        """
        data_queue = []
        self.enbale_temporal_aug = False

        # Temporal queue covers frames [index - queue_length + 1, index];
        # reject samples that would cross the start of the data or a scene.
        final_index = index
        first_index = index - self.queue_length + 1
        if first_index < 0:
            return None
        if (
            self.data_infos[first_index]["scene_token"]
            != self.data_infos[final_index]["scene_token"]
        ):
            print(index, 2)
            return None

        # Skip samples without a PDM metric-cache entry when a cache is used.
        if self.cache_path is not None:
            if self.data_infos[index]["token"] not in self.metric_cache_tokens:
                print(f'Token: {self.data_infos[index]["scene_token"]} not in Metric cache')
                return None

        input_dict = self.get_data_info(final_index)

        if self.finetuning_mask is not None:
            input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]

        # History indices, newest first; insert(0, ...) below restores order.
        prev_indexs_list = list(reversed(range(first_index, final_index)))

        if input_dict is None:
            print(index, index in self.fail_syn_map_filter ,3)
            return None
        frame_idx = input_dict["frame_idx"]
        scene_token = input_dict["scene_token"]
        self.pre_pipeline(input_dict)
        example = self.pipeline(input_dict)
        data_queue.insert(0, example)

        for i in prev_indexs_list:
            # prev_frame=True: history frames get zero PDM placeholders
            # instead of loading the real score cache.
            input_dict = self.get_data_info(i, prev_frame=True)
            if self.finetuning_mask is not None:
                input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]

            if input_dict is None:
                print(index, index in self.fail_syn_map_filter, 4)
                return None
            # Keep only strictly earlier frames of the same scene.
            if (
                input_dict["frame_idx"] < frame_idx
                and input_dict["scene_token"] == scene_token
            ):
                self.pre_pipeline(input_dict)
                example = self.pipeline(input_dict)

                frame_idx = input_dict["frame_idx"]

                data_queue.insert(0, copy.deepcopy(example))

        # Collapse the per-frame queue into one sample dict.
        if self.val or self.test_mode:
            data_queue = self.union2one_test(data_queue)
        else:
            data_queue = self.union2one(data_queue, self.data_infos[index]["token"] )
        return data_queue
| |
|
|
| def get_metric_cache(self, token): |
| with lzma.open(self.metric_cache_dict[token], "rb") as f: |
| metric_cache = pickle.load(f) |
| return metric_cache |
| |
    def union2one(self, queue, token):
        """
        convert a sequence of the sample dict into one single sample.

        Stacks per-frame images, converts can_bus poses to frame-to-frame
        deltas, and (if a cache is configured) attaches the PDM metric cache
        for ``token``.
        """
        # Gather per-frame tensors to carry over onto the merged sample.
        imgs_list: List[torch.Tensor] = [
            each["img"].data for each in queue
        ]
        l2g_r_mat_list: List[torch.Tensor] = [
            to_tensor(each["l2g_r_mat"]) for each in queue
        ]
        l2g_t_list: List[torch.Tensor] = [
            to_tensor(each["l2g_t"]) for each in queue
        ]
        timestamp_list = [to_tensor(each["timestamp"]) for each in queue]

        # Convert absolute can_bus pose (xyz + angle in the last slot) into
        # per-frame deltas; the first frame is zeroed and flagged as having
        # no previous BEV. Mutation order matters: read prev, then overwrite.
        metas_map = {}
        prev_pos = None
        prev_angle = None
        for i, each in enumerate(queue):
            metas_map[i] = each["img_metas"].data
            if i == 0:
                metas_map[i]["prev_bev"] = False
                prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] = 0
                metas_map[i]["can_bus"][-1] = 0
            else:
                metas_map[i]["prev_bev"] = True
                tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] -= prev_pos
                metas_map[i]["can_bus"][-1] -= prev_angle
                prev_pos = copy.deepcopy(tmp_pos)
                prev_angle = copy.deepcopy(tmp_angle)

        # The last (current) frame becomes the merged sample; it carries the
        # stacked image tensor and the per-frame metas of the whole queue.
        queue[-1]["img"] = DC(
            torch.stack(imgs_list), cpu_only=False, stack=True
        )
        queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)

        queue = queue[-1]

        # Attach the PDM metric cache for training-time score supervision.
        if self.cache_path is not None:
            queue["metric_cache"] = DC(self.get_metric_cache(token), cpu_only=True)

        queue["l2g_r_mat"] = DC(l2g_r_mat_list)
        queue["l2g_t"] = DC(l2g_t_list)
        queue["timestamp"] = DC(timestamp_list)

        return queue
| |
| |
| |
    def union2one_test(self, queue):
        """
        convert a sequence of the sample dict into one single sample.

        Test-time variant of ``union2one``: pipeline outputs are wrapped in
        lists here (hence the ``[0]`` indexing — TODO confirm against the
        test pipeline), and no metric cache is attached.
        """
        imgs_list: List[torch.Tensor] = [
            each["img"][0].data for each in queue
        ]
        l2g_r_mat_list: List[torch.Tensor] = [
            to_tensor(each["l2g_r_mat"]) for each in queue
        ]
        l2g_t_list: List[torch.Tensor] = [
            to_tensor(each["l2g_t"]) for each in queue
        ]
        timestamp_list = [to_tensor(each["timestamp"]) for each in queue]

        # Convert absolute can_bus pose into per-frame deltas; frame 0 is
        # zeroed and flagged as having no previous BEV (same as union2one).
        metas_map = {}
        prev_pos = None
        prev_angle = None
        for i, each in enumerate(queue):
            metas_map[i] = each["img_metas"][0].data
            if i == 0:
                metas_map[i]["prev_bev"] = False
                prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] = 0
                metas_map[i]["can_bus"][-1] = 0
            else:
                metas_map[i]["prev_bev"] = True
                tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] -= prev_pos
                metas_map[i]["can_bus"][-1] -= prev_angle
                prev_pos = copy.deepcopy(tmp_pos)
                prev_angle = copy.deepcopy(tmp_angle)

        # The last (current) frame becomes the merged sample.
        queue[-1]["img"] = DC(
            torch.stack(imgs_list), cpu_only=False, stack=True
        )
        queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)

        queue = queue[-1]
        queue["l2g_r_mat"] = DC(l2g_r_mat_list)
        queue["l2g_t"] = DC(l2g_t_list)
        queue["timestamp"] = DC(timestamp_list)

        return queue
|
|
| def init_trackpredplan( |
| self, |
| predict_steps, |
| planning_steps, |
| past_steps, |
| fut_steps, |
| use_nonlinear_optimizer, |
| ): |
| |
| self.predict_steps = predict_steps |
| self.planning_steps = planning_steps |
| self.past_steps = past_steps |
| self.fut_steps = fut_steps |
| self.use_nonlinear_optimizer = use_nonlinear_optimizer |
|
|
| |
| def init_occupancy( |
| self, |
| occ_receptive_field, |
| occ_n_future, |
| occ_filter_invalid_sample, |
| occ_filter_by_valid_flag, |
| ): |
| self.occ_receptive_field = occ_receptive_field |
| self.occ_n_future = occ_n_future |
| self.occ_filter_invalid_sample = occ_filter_invalid_sample |
| self.occ_filter_by_valid_flag = occ_filter_by_valid_flag |
| self.occ_only_total_frames = 7 |
| assert self.occ_filter_by_valid_flag is False |
|
|
| def init_mapping(self, canvas_size, patch_size, lane_ann_file): |
| self.map_root = '/xxx/nuplan/dataset/maps' |
|
|
| self.map_num_classes = 3 |
| if canvas_size[0] == 50 or canvas_size[0] == 60: |
| self.thickness = 1 |
| elif canvas_size[0] == 200: |
| self.thickness = 2 |
| else: |
| assert False |
| self.patch_size = patch_size |
| self.canvas_size = canvas_size |
|
|
| self.vector_map = VectorizedLocalMap( |
| map_root=self.map_root, |
| patch_size=patch_size, |
| ) |
| return |
| |
| |
| def filter_pdm_process(self, pdm_data, token=None): |
| ''' |
| filter invalid 'nan' ego progress due to 0 length pdm |
| ''' |
| |
| progress = deepcopy(pdm_data['ego_progress']) |
| if np.isnan(pdm_data['score'][0]): |
| progress[np.isnan(progress)] = 1.0 |
| else: |
| progress[np.isnan(progress)] = 0.0 |
|
|
| if self.val==False: |
| progress = self.ep_dict[token] |
| progress[np.isnan(progress)] = 0.0 |
|
|
| scores = pdm_data['no_at_fault_collisions'] * pdm_data['drivable_area_compliance'] * \ |
| (5 * pdm_data['time_to_collision_within_bound'] + 5 * progress + 2 * pdm_data['comfort']) / 12 |
| |
| pdm_data['ego_progress'] = progress |
| pdm_data['score'] = deepcopy(scores) |
| return pdm_data |
| |
    def get_pdm_score_info(self, input_dict, index=None):
        """Load precomputed PDM (8192-anchor) metric arrays for this sample
        and merge them into ``input_dict``.

        NOTE(review): cache roots are hard-coded cluster paths — parameterize
        before reuse outside the original environment.
        """
        # Choose the cache root: navtest GT cache for validation (with an
        # optional fallback for fully-failed map samples), train cache else.
        if self.val:
            base_path = '/nas/shared/opendrivelab/liuhaochen/pdm_8192_gt_cache_navtest2'
            if self.full_fail_map_filter:
                if index in self.full_fail_map_filter:
                    base_path = '/nas/shared/opendrivelab/liuhaochen/pdm_8192_gt_cache'
        else:
            base_path = '/nas/shared/opendrivelab/liuhaochen/pdm_8192_gt_cache'

        # Synthetic samples live under a different root and use a shortened
        # log name (first two dash-separated parts).
        if self.fail_syn_map_filter:
            if index in self.fail_syn_map_filter:
                base_path = self.synthetic_pdm
            log_name = input_dict['log_name'].split('-')[0] + '-' + input_dict['log_name'].split('-')[1]
        else:
            log_name = input_dict['log_name']

        # The scenario-type subdirectory is unknown a priori; probe each one.
        # NOTE(review): os.path.exists(buf_dir) tests a path relative to the
        # CWD, not under load_path — likely always False, so the loop falls
        # through to the last type dir. Confirm intended.
        load_path = base_path + '/' + log_name + '/'
        type_list = os.listdir(load_path)
        for t_name in type_list:
            buf_dir = t_name+ '/' +input_dict['sample_idx']
            if os.path.exists(buf_dir):
                break
        load_path = load_path + buf_dir + '/pdm_cache.pkl'

        # The cache is an lzma-compressed pickle; only `.trajectory` is used.
        with lzma.open(load_path, "rb") as f:
            data = pickle.load(f).trajectory

        if self.fail_syn_map_filter:
            if index in self.fail_syn_map_filter:
                # Synthetic caches may hold one extra leading anchor; trim it.
                for k in data.keys():
                    if data[k].shape[0] > 8192:
                        data[k] = data[k][1:]
            else:
                data = self.filter_pdm_process(data, input_dict['sample_idx'])
        else:
            data = self.filter_pdm_process(data, input_dict['sample_idx'])
        input_dict.update(data)

        return input_dict
| |
| def get_zero_pdm(self, input_dict): |
| pdm_list = [ |
| "no_at_fault_collisions", |
| "drivable_area_compliance", |
| "ego_progress", |
| "time_to_collision_within_bound", |
| "comfort", |
| "score", |
| ] |
| for k in pdm_list: |
| input_dict[k] = np.zeros((8192,)) |
| return input_dict |
|
|
    def get_data_info(self, index, info=None, debug=False, prev_frame=False):
        """Get data info according to the given index.

        Args:
            index (int): Index of the sample data to get.
            info (dict, optional): Pre-fetched info dict; looked up from
                ``self.data_infos[index]`` when omitted.
            debug (bool): Unused — forced to False before the vis hook below.
            prev_frame (bool): True for history frames, which get zero PDM
                placeholders instead of loading the real score cache.
        Returns:
            dict: Data information that will be passed to the data \
                preprocessing pipelines. It includes the following keys:

                - sample_idx (str): Sample index.
                - pts_filename (str): Filename of point clouds.
                - sweeps (list[dict]): Infos of sweeps.
                - timestamp (float): Sample timestamp.
                - img_filename (str, optional): Image filename.
                - lidar2img (list[np.ndarray], optional): Transformations \
                    from lidar to different cameras.
                - ann_info (dict): Annotation info.
        """
        if info is None:
            info = self.data_infos[index]

        input_dict = dict(
            sample_idx=info["token"],
            frame_idx=info["frame_idx"],
            timestamp=info["timestamp"] / 1e6,  # microseconds -> seconds
            log_name=info["log_name"],
            log_token=info["log_token"],
            scene_name=info["scene_name"],
            scene_token=info["scene_token"],
            pts_filename=info["lidar_path"],
            prev=info["sample_prev"],
            next=info["sample_next"],
        )

        # Attach transforms, sensor info and CAN-bus signals.
        input_dict = self.update_transform(input_dict=input_dict, index=index)
        input_dict = self.update_sensor(input_dict=input_dict, index=index)
        input_dict = self.update_canbus(input_dict=input_dict, index=index)
        if prev_frame==False:
            # Current frame: load real PDM scores from the on-disk cache.
            input_dict = self.get_pdm_score_info(input_dict, index)
        else:
            # History frame: PDM scores not needed — zero placeholders.
            input_dict = self.get_zero_pdm(input_dict)

        input_dict["ann_info"] = self.get_ann_info(index)

        # Ego (SDC) trajectory GT for prediction and planning heads.
        input_dict = self.update_ego_prediction(input_dict=input_dict, index=index)
        input_dict = self.update_ego_planning(input_dict=input_dict, index=index)

        # Visualization hook, hard-disabled here.
        debug = False
        if debug:
            self.vis_data_per_frame(
                data=input_dict,
                index=index,
                vis_root="data/viz_nuplan",
            )

        return input_dict
|
|
|
|
|
|
    def update_anno_token(self, annotation, index):
        # Currently a pass-through: no token remapping is applied here.
        return annotation
|
|
    def update_tracking_prediction(self, annotation, index):
        """Attach per-agent past/future trajectory GT (and validity masks)
        for the boxes kept by ``annotation['mask']``."""
        info = self.data_infos[index]

        # Select trajectories of the agents that survived annotation filtering.
        gt_fut_bbox_lidar = info["gt_fut_bbox_lidar"][annotation["mask"]]
        gt_fut_bbox_mask = info["gt_fut_bbox_mask"][annotation["mask"]]
        gt_pre_bbox_lidar = info["gt_pre_bbox_lidar"][annotation["mask"]]
        gt_pre_bbox_mask = info["gt_pre_bbox_mask"][annotation["mask"]]

        # Flip the time axis of the past trajectories — presumably they are
        # stored newest-first; verify against the info generator.
        gt_pre_bbox_lidar = np.flip(gt_pre_bbox_lidar, axis=1)
        gt_pre_bbox_mask = np.flip(gt_pre_bbox_mask, axis=1)

        # Expand the validity mask to cover both (x, y) coordinates.
        gt_fut_bbox_mask = np.repeat(gt_fut_bbox_mask, 2, axis=2)
        gt_pre_bbox_mask = np.repeat(gt_pre_bbox_mask, 2, axis=2)

        # Extend the past trajectory with the first 4 future steps
        # (mask extended to match).
        gt_pre_bbox_lidar = np.concatenate(
            (gt_pre_bbox_lidar, gt_fut_bbox_lidar[:, :4]), axis=1
        )
        gt_pre_bbox_mask = np.concatenate(
            (gt_pre_bbox_mask, gt_fut_bbox_mask[:, :4]), axis=1
        )

        # Only the (x, y) columns of the trajectories are kept.
        annotation.update(
            dict(
                gt_fut_traj=gt_fut_bbox_lidar[:, :, :2],
                gt_fut_traj_mask=gt_fut_bbox_mask,
                gt_past_traj=gt_pre_bbox_lidar[:, :, :2],
                gt_past_traj_mask=gt_pre_bbox_mask,
            )
        )
        return annotation
| |
| def update_mapping(self, input_dict, index): |
| |
| polyline_dict, drivable_area = self.update_map_vectorized(input_dict, index) |
|
|
| input_dict = self.update_map_rasterized(input_dict, index, polyline_dict, drivable_area) |
|
|
| return input_dict |
|
|
| def update_map_vectorized(self, input_dict, index): |
| info = self.data_infos[index] |
|
|
| map_location = info['map_location'] |
| e2g_translation = info['ego2global_translation'] |
| e2g_rotation = info['ego2global_rotation'] |
| if len(e2g_rotation) != 4: |
| e2g_rotation = Quaternion._from_matrix(e2g_rotation).elements |
| polyline_dict = self.vector_map.gen_vectorized_samples(e2g_translation, e2g_rotation, map_location) |
|
|
| drivable_area = self.vector_map.gen_drivable_area(e2g_translation, e2g_rotation, map_location) |
|
|
| return polyline_dict, drivable_area |
|
|
    def update_map_rasterized(self, input_dict, index, polyline_dict, drivable_area, debug=False):
        """Rasterize vectorized polylines and the drivable area into
        per-element BEV masks with bounding boxes and class labels."""
        gt_labels = []
        gt_bboxes = []
        gt_masks = []

        # Ego-centric metric coordinates -> pixel coordinates on the canvas.
        origin = np.array([self.canvas_size[1] // 2, self.canvas_size[0] // 2])
        scale = np.array([self.canvas_size[1] / self.patch_size[0], self.canvas_size[0] / self.patch_size[1]])

        for polyline in polyline_dict:
            # Type 0 polylines are skipped entirely.
            if polyline["type"] == 0:
                continue

            map_mask = np.zeros(self.canvas_size, np.uint8)
            draw_coor = (polyline["pts"] * scale + origin).round().astype(np.int32)
            # NOTE(review): drawn with color=1 then divided by 255, so these
            # masks hold 1/255 rather than 1 — inconsistent with the drivable
            # area below (255/255 == 1). Confirm this is intended.
            gt_mask = cv2.polylines(map_mask, [draw_coor], False, color=1, thickness=self.thickness) / 255

            ys, xs = np.where(gt_mask)
            try:
                gt_bbox = [min(xs), min(ys), max(xs), max(ys)]
            except ValueError:
                # Empty mask (polyline outside the canvas): skip it.
                # (the fallback assignment is dead — `continue` runs next)
                gt_bbox = [0, 0, 1, 1]
                continue

            cls = polyline["type"]

            gt_labels.append(cls)
            gt_bboxes.append(gt_bbox)
            gt_masks.append(gt_mask)

        # Rasterize the drivable area as one extra element labelled with
        # class id == self.map_num_classes.
        map_mask = np.zeros(self.canvas_size, np.uint8)
        exteriors = []
        interiors = []
        for p in drivable_area.geoms:
            exteriors.append(
                (np.array(p.exterior.coords) * scale + origin).round().astype(np.int32)
            )
            for inter in p.interiors:
                interiors.append(
                    (np.array(inter.coords) * scale + origin).round().astype(np.int32)
                )
        cv2.fillPoly(map_mask, exteriors, 255)
        cv2.fillPoly(map_mask, interiors, 0)  # punch interior holes back out
        map_mask = map_mask / 255
        try:
            ys, xs = np.where(map_mask)
            gt_bbox = [min(xs), min(ys), max(xs), max(ys)]
            gt_labels.append(self.map_num_classes)
            gt_bboxes.append(gt_bbox)
            gt_masks.append(map_mask)
        except ValueError:
            # No drivable area on the canvas — simply omit the element.
            pass

        gt_labels = torch.tensor(gt_labels)
        gt_bboxes = torch.from_numpy(np.stack(gt_bboxes))
        gt_masks = torch.from_numpy(np.stack(gt_masks))

        input_dict.update(
            dict(
                gt_lane_labels=gt_labels,
                gt_lane_bboxes=gt_bboxes,
                gt_lane_masks=gt_masks,
            )
        )
        return input_dict
|
|
    def update_ego_prediction(self, input_dict, index):
        """Attach ego-vehicle (SDC) box, label and past/future trajectory GT."""
        info = self.data_infos[index]

        # Ego box in lidar frame (first 9 values of gt_sdc_bbox_lidar),
        # converted to the dataset's configured box mode.
        gt_sdc_bbox = info["gt_sdc_bbox_lidar"][:9].reshape((1, -1))
        gt_sdc_bbox = LiDARInstance3DBoxes(
            gt_sdc_bbox, box_dim=gt_sdc_bbox.shape[-1], origin=(0.5, 0.5, 0)
        ).convert_to(self.box_mode_3d)
        gt_sdc_bbox = DC(gt_sdc_bbox, cpu_only=True)

        # Ego is always labelled with class id 0.
        gt_sdc_label = np.array([0])
        gt_sdc_label = DC(to_tensor(gt_sdc_label))

        sdc_status=info["gt_sdc_bbox_lidar"][:9]

        gt_pre_bbox_sdc_lidar = info["gt_pre_bbox_sdc_lidar"]
        gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"]

        gt_pre_bbox_sdc_global = info["gt_pre_bbox_sdc_global"]
        gt_fut_bbox_sdc_global = info["gt_fut_bbox_sdc_global"]

        # Expand validity masks along the last axis: x2 for the future mask
        # and x4 for the past mask — assumes past steps carry 4 values each;
        # TODO confirm against the info generator.
        gt_fut_bbox_sdc_mask = info["gt_fut_bbox_sdc_mask"]
        gt_fut_bbox_sdc_mask = np.repeat(gt_fut_bbox_sdc_mask, 2, axis=2)
        gt_pre_bbox_sdc_mask = info["gt_pre_bbox_sdc_mask"]
        gt_pre_bbox_sdc_mask = np.repeat(gt_pre_bbox_sdc_mask, 4, axis=2)

        gt_pre_command_sdc = info["gt_pre_command_sdc"]

        input_dict.update(
            dict(
                gt_sdc_bbox=gt_sdc_bbox,
                gt_sdc_label=gt_sdc_label,
                # Future trajectory: (x, y) columns only.
                gt_sdc_fut_traj=gt_fut_bbox_sdc_lidar[
                    :, :, :2
                ],
                gt_sdc_fut_traj_mask=gt_fut_bbox_sdc_mask,
                command=info["command"],
                # Planning targets truncated to the planning horizon:
                # world frame uses columns [0, 1, 2], lidar frame [0, 1, 6].
                sdc_planning_world=gt_fut_bbox_sdc_global[
                    :, : self.planning_steps, [0, 1, 2]
                ],
                sdc_planning=gt_fut_bbox_sdc_lidar[
                    :, : self.planning_steps, [0, 1, 6]
                ],
                sdc_planning_mask=gt_fut_bbox_sdc_mask[
                    :, : self.planning_steps
                ],
                sdc_planning_past=info["gt_pre_bbox_sdc_lidar"][:, :, [0, 1, 6]],
                sdc_planning_mask_past=gt_pre_bbox_sdc_mask,
                gt_pre_command_sdc=gt_pre_command_sdc,
                # Current ego state: columns [0, 1, 6] of the 9-value status.
                sdc_status=sdc_status[[0, 1, 6]]
            )
        )

        return input_dict
|
|
    def update_ego_planning(self, input_dict, index):
        # Pass-through hook: planning GT is already attached in
        # update_ego_prediction; `info` is fetched but currently unused.
        info = self.data_infos[index]

        return input_dict
|
|
| def vis_dataset_specific(self, data: Dict): |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| try: |
| split = "trainval" |
| parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" |
| pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) |
| points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
| except FileNotFoundError: |
| try: |
| split = "test" |
| parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" |
| pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) |
| points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
| except FileNotFoundError: |
| split = "mini" |
| parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" |
| pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) |
| points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
|
|
| points = points[:3].T |
|
|
| image_file_list = [] |
| for cam_name in range(8): |
| |
| image_file: str = os.path.join(parent_dir, data["img_filename"][cam_name]) |
| image_file_list.append(image_file) |
|
|
| return { |
| "lidar": points, |
| "command_dict": { |
| 0: "TURN LEFT", |
| 1: "KEEP FORWARD", |
| 2: "TURN RIGHT", |
| 3: "UNKNOWN", |
| }, |
| "image_file_list": image_file_list, |
| "left_hand": False, |
| } |
|
|