# File: R2SE_model/mmdet3d_plugin/datasets/navsim_openscenes_nuplan_2.py
# (Hugging Face upload artifact header, kept as a comment:
#  uploaded by unknownuser6666 via huggingface_hub, commit 663494c, verified)
# ---------------------------------------------------------------------------------#
# UniAD: Planning-oriented Autonomous Driving (https://arxiv.org/abs/2212.10156) #
# Source code: https://github.com/OpenDriveLab/UniAD #
# Copyright (c) OpenDriveLab. All rights reserved. #
# ---------------------------------------------------------------------------------#
import copy
import hashlib
import json
import os
import pickle
import pprint
import random
import tempfile
from os import path as osp
from typing import Dict, List
import sys
from pathlib import Path
import mmcv
import numpy as np
import scipy
import cv2
import torch
from mmcv.parallel import DataContainer as DC
from mmdet3d.core.bbox import LiDARInstance3DBoxes
from mmdet3d_plugin.datasets import NuScenesE2EDataset
from mmdet3d_plugin.datasets.data_utils.rasterize import preprocess_map
from mmdet3d_plugin.datasets.nuplan.nuplan_pointcloud import PointCloud
from mmdet3d_plugin.eval.detection.config import config_factory_nuPlan
from mmdet3d_plugin.uniad.dense_heads.planning_head_plugin.compute_metrics import (
PredictionConfig, compute_metrics)
from mmdet.datasets import DATASETS
from mmdet.datasets.pipelines import to_tensor
from nuscenes import NuScenes
from nuscenes.eval.common.utils import Quaternion, quaternion_yaw
from nuscenes.eval.tracking.evaluate import TrackingEval
from nuscenes.prediction import convert_local_coords_to_global
from PIL import Image
from prettytable import PrettyTable
from mmdet3d_plugin.datasets.data_utils.nuplan_vector_map import VectorizedLocalMap
from copy import deepcopy
import lzma
import pandas as pd
from glob import glob
@DATASETS.register_module()
class NavSimOpenScenesE2E(NuScenesE2EDataset):
    r"""OpenScenes end-to-end dataset for NAVSIM (nuPlan-based) scenes.

    Extends :class:`NuScenesE2EDataset` with OpenScenes annotations, navsim
    scene-token filtering and (optionally) cached PDM-score supervision.
    """

    # Detection/tracking class vocabulary (nuPlan taxonomy).
    CLASSES = (
        "vehicle",
        "bicycle",
        "pedestrian",
        "traffic_cone",
        "barrier",
        "czone_sign",
        "generic_object",
    )
def __init__(self, *args, duplicate=True, finetune_yaml=None, cache_path=None, **kwargs):
    """Initialise the dataset; optionally index the PDM metric cache first.

    :param duplicate: oversample frames when training on a subset of splits
    :param finetune_yaml: optional scene-filter yaml used for fine-tuning
    :param cache_path: optional root folder of the PDM metric cache
    """
    self.finetune_yaml = finetune_yaml
    self.duplicate = duplicate
    # specialized variables for WE finetuning
    # self.synthetic_pdm = None
    self.fail_syn_map_filter = None
    self.synthetic_img_root = None
    self.full_fail_map_filter = None
    self.cache_path = cache_path
    self.metric_cache_tokens = None
    if cache_path is not None:
        print(f'loading cache info from:{cache_path}')
        self.init_metric_cache_paths(cache_path)
    super().__init__(*args, **kwargs)
    # restore the caller's value in case the parent constructor touched it
    self.cache_path = cache_path
def init_metric_cache_paths(self, cache_path):
    """
    Helper function to load all cache file paths from folder.
    :param cache_path: directory of cache folder
    :return: None; populates ``self.metric_cache_dict`` / ``self.metric_cache_tokens``
    """
    metadata_dir = Path(cache_path) / "metadata"
    # the cache folder ships a csv manifest listing every cached file;
    # fail loudly (instead of an opaque IndexError) when it is missing
    csv_files = [file for file in metadata_dir.iterdir() if ".csv" in str(file)]
    if not csv_files:
        raise FileNotFoundError(f"no metadata csv found under {metadata_dir}")
    with open(str(csv_files[0]), "r") as f:
        cache_paths = f.read().splitlines()[1:]  # drop the header row
    # key each cache file path by its token (second-to-last path component)
    self.metric_cache_dict = {p.split("/")[-2]: p for p in cache_paths}
    print(f'cache loaded, size {len(self.metric_cache_dict.keys())}')
    self.metric_cache_tokens = set(self.metric_cache_dict.keys())
def load_annotations(self, ann_file):
    """Load the info pickle, apply the navsim scene filter, build ``index_map``.

    :param ann_file: path of the OpenScenes info pickle
    :return: the full (unfiltered) list of per-frame info dicts; filtering is
        expressed through ``self.index_map`` instead
    """
    print('loading dataset!')
    if self.file_client_args["backend"] == "disk":
        print(ann_file)
        data_infos = mmcv.load(ann_file, file_format="pkl")
        try:
            data_infos = data_infos['infos']
        except (KeyError, TypeError):
            # already a plain list of infos (not wrapped in a dict)
            pass
        print(len(data_infos))
    elif self.file_client_args["backend"] == "petrel":
        data = pickle.loads(self.file_client.get(ann_file))
        data_infos = list(sorted(data["infos"], key=lambda e: e["timestamp"]))
        data_infos = data_infos[:: self.load_interval]
        self.metadata = data["metadata"]
        self.version = self.metadata["version"]
    else:
        # was `assert False`: raise explicitly so the check survives -O
        raise ValueError("Invalid file_client_args!")
    print('load pickle down!')
    # load scene filter
    import yaml
    print(ann_file)
    if 'val' in ann_file:
        print('use navtest filter...')
        self.val = True
        nav_filter = os.path.join(self.data_root, 'navsim_filter/navtest.yaml')
    else:
        print('use navtrain filter...')
        self.val = False
        nav_filter = os.path.join(self.data_root, 'navsim_filter/navtrain.yaml')
    if self.use_num_split > 0 and ('val' not in ann_file) and (self.finetune_yaml is None):
        # train on the first `use_num_split` of the 5 navtrain sub-splits
        scene_filter = []
        for i in range(min(self.use_num_split, 5)):
            path = f'/nas/shared/opendrivelab/liuhaochen/navtrain_split/split_{i}.yaml'
            with open(path, 'r') as file:
                nav_filter = yaml.safe_load(file)
            scene_filter += nav_filter['tokens']
    else:
        if self.finetune_yaml is not None:
            print(f'loading finetuning yaml from:{self.finetune_yaml}')
            nav_filter = self.finetune_yaml
        with open(nav_filter, 'r') as file:
            nav_filter = yaml.safe_load(file)
        # navtrain/navtest yamls use 'tokens'; finetune yamls may use 'scenario_tokens'
        try:
            scene_filter = nav_filter['tokens']
        except KeyError:
            scene_filter = nav_filter['scenario_tokens']
    self.scene_filter = set(scene_filter)
    self.index_map = []
    for i, info in enumerate(data_infos):
        if info['token'] in self.scene_filter:
            # keep only frames whose future ego trajectory is fully valid
            valid_len = np.sum(info['gt_fut_bbox_sdc_mask'])
            if valid_len != self.planning_steps:
                continue
            self.index_map.append(i)
    if self.use_num_split > 0 and not self.val and self.duplicate:
        # oversample so one epoch matches the full 5-split training length
        self.index_map = self.duplicate_indices(self.index_map, n=5 / self.use_num_split)
    print(f'filtering {len(data_infos)} frames to {len(self.index_map)}...')
    # use log_token in OpenScene as scene_token.
    for info in data_infos:
        info['scene_token'] = info['log_token']
    self.finetuning_mask = None
    return data_infos
def duplicate_indices(self, indices, n):
    """Repeat ``indices`` roughly ``n`` times, where ``n`` may be fractional.

    The integer part replicates the whole list; the fractional part appends
    a random sample (without replacement) of the corresponding size.
    """
    whole = int(n)
    frac = n - whole
    result = indices * whole
    if frac > 0:
        extra_count = int(len(indices) * frac)
        result = result + list(np.random.choice(indices, extra_count, replace=False))
    return result
def __len__(self):
    """Dataset length is the number of frames that survived the scene filter."""
    return len(self.index_map)
def __getitem__(self, idx):
    """Fetch one sample, resampling a random index while preparation fails."""
    # map nav-filter idx to the underlying data_infos idx
    mapped = self.index_map[idx]
    if self.test_mode:
        return self.prepare_test_data(mapped)
    data = self.prepare_train_data(mapped)
    while data is None:
        # preparation failed (scene boundary / missing cache): pick another
        idx = self._rand_another(idx)
        mapped = self.index_map[idx]
        data = self.prepare_train_data(mapped)
    return data
def init_dataset(self):
    """Set evaluation config and per-class default attributes (nuPlan classes)."""
    self.sdc_acc_ego_fix = False
    # update detection config to handle evaluation with different classes
    self.eval_detection_configs = config_factory_nuPlan()
    # classes without a meaningful nuScenes-style attribute map to ''
    self.default_attr = {
        "vehicle": "vehicle.parked",
        "bicycle": "cycle.without_rider",
        "pedestrian": "pedestrian.moving",
        "traffic_cone": "",
        "barrier": "",
        "czone_sign": "",
        "generic_object": "",
    }
def prepare_train_data(self, index):
    """Training samples use the exact same preparation path as test samples."""
    return self.prepare_test_data(index)
def prepare_test_data(self, index):
    """
    Build one temporal sample: the current frame plus its history queue.

    Args:
        index (int): Index (into ``self.data_infos``) of the current frame.
    Returns:
        dict | None: merged sample of the corresponding index, or ``None``
        when the queue cannot be formed (history crosses a scene boundary,
        index underflow, or the token is missing from the metric cache).
            img: queue_length, 6, 3, H, W
            img_metas: img_metas of each frame (list)
            gt_globals_3d: gt_globals of each frame (list)
            gt_bboxes_3d: gt_bboxes of each frame (list)
            gt_inds: gt_inds of each frame (list)
    """
    data_queue = []
    # NOTE(review): attribute name is misspelled ("enbale"); kept as-is in
    # case other project code reads it under this spelling.
    self.enbale_temporal_aug = False
    # ensure the first and final frame are in the same scene
    final_index = index
    first_index = index - self.queue_length + 1
    if first_index < 0:
        return None
    if (
        self.data_infos[first_index]["scene_token"]
        != self.data_infos[final_index]["scene_token"]
    ):
        print(index, 2)  # debug marker: rejected at scene-boundary check
        return None
    # reject frames whose PDM metric cache entry is missing
    if self.cache_path is not None:
        if self.data_infos[index]["token"] not in self.metric_cache_tokens:
            print(f'Token: {self.data_infos[index]["scene_token"]} not in Metric cache')
            return None
    input_dict = self.get_data_info(final_index)
    ### get finetuning:
    if self.finetuning_mask is not None:
        input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]
    prev_indexs_list = list(reversed(range(first_index, final_index)))
    # NOTE(review): this None-check comes after input_dict has already been
    # subscripted above, so it can only trigger when finetuning_mask is None.
    if input_dict is None:
        print(index, index in self.fail_syn_map_filter, 3)  # debug marker
        return None
    frame_idx = input_dict["frame_idx"]
    scene_token = input_dict["scene_token"]
    self.pre_pipeline(input_dict)
    example = self.pipeline(input_dict)
    data_queue.insert(0, example)
    ########## retrieve previous infos, frame by frame
    for i in prev_indexs_list:
        # prev_frame=True: history frames get zeroed PDM scores
        input_dict = self.get_data_info(i, prev_frame=True)
        if self.finetuning_mask is not None:
            input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]
        if input_dict is None:
            print(index, index in self.fail_syn_map_filter, 4)  # debug marker
            return None
        # only accept strictly older frames from the same scene
        if (
            input_dict["frame_idx"] < frame_idx
            and input_dict["scene_token"] == scene_token
        ):
            self.pre_pipeline(input_dict)
            example = self.pipeline(input_dict)
            frame_idx = input_dict["frame_idx"]
            data_queue.insert(0, copy.deepcopy(example))
    # merge a sequence of data into one dictionary, only for temporal data
    if self.val or self.test_mode:
        data_queue = self.union2one_test(data_queue)
    else:
        data_queue = self.union2one(data_queue, self.data_infos[index]["token"])
    return data_queue
def get_metric_cache(self, token):
    """Load and return the pickled PDM metric cache entry for ``token``."""
    cache_file = self.metric_cache_dict[token]
    with lzma.open(cache_file, "rb") as f:
        return pickle.load(f)
def union2one(self, queue, token):
    """
    Convert a sequence of per-frame sample dicts into one single sample.

    Images are stacked along a new time axis; each frame's absolute can_bus
    pose is converted in place into a delta w.r.t. the previous frame; all
    labels are taken from the last (current) frame only.

    :param queue: list of per-frame sample dicts, oldest first
    :param token: sample token of the current frame (metric-cache key)
    """
    # sensor data and transform
    imgs_list: List[torch.Tensor] = [
        each["img"].data for each in queue
    ]  # L, C x 3 x H x W
    l2g_r_mat_list: List[torch.Tensor] = [
        to_tensor(each["l2g_r_mat"]) for each in queue
    ]  # L, 3 x 3
    l2g_t_list: List[torch.Tensor] = [
        to_tensor(each["l2g_t"]) for each in queue
    ]  # L, 3
    timestamp_list = [to_tensor(each["timestamp"]) for each in queue]  # L, 1
    # converting the global absolute coordinate into relative position and orientation change
    metas_map = {}
    prev_pos = None
    prev_angle = None
    for i, each in enumerate(queue):
        metas_map[i] = each["img_metas"].data
        if i == 0:
            # first frame: no previous BEV; zero out its own pose delta
            metas_map[i]["prev_bev"] = False
            prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
            prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
            metas_map[i]["can_bus"][:3] = 0
            metas_map[i]["can_bus"][-1] = 0
        else:
            # later frames: store pose relative to the previous frame
            metas_map[i]["prev_bev"] = True
            tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
            tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
            metas_map[i]["can_bus"][:3] -= prev_pos
            metas_map[i]["can_bus"][-1] -= prev_angle
            prev_pos = copy.deepcopy(tmp_pos)
            prev_angle = copy.deepcopy(tmp_angle)
    queue[-1]["img"] = DC(
        torch.stack(imgs_list), cpu_only=False, stack=True
    )  # T x N x 3 x H x W
    queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)
    # inherit all the labels for the current frame only
    queue = queue[-1]
    # merge all other labels that require temporal info from the queue
    if self.cache_path is not None:
        queue["metric_cache"] = DC(self.get_metric_cache(token), cpu_only=True)
    queue["l2g_r_mat"] = DC(l2g_r_mat_list)
    queue["l2g_t"] = DC(l2g_t_list)
    queue["timestamp"] = DC(timestamp_list)
    return queue
def union2one_test(self, queue):
    """
    Convert a sequence of per-frame sample dicts into one single sample.

    Test-time variant of :meth:`union2one`: pipeline outputs are wrapped in
    lists here (hence the extra ``[0]`` indexing) and no metric cache is
    attached.
    """
    # sensor data and transform
    imgs_list: List[torch.Tensor] = [
        each["img"][0].data for each in queue
    ]  # L, C x 3 x H x W
    l2g_r_mat_list: List[torch.Tensor] = [
        to_tensor(each["l2g_r_mat"]) for each in queue
    ]  # L, 3 x 3
    l2g_t_list: List[torch.Tensor] = [
        to_tensor(each["l2g_t"]) for each in queue
    ]  # L, 3
    timestamp_list = [to_tensor(each["timestamp"]) for each in queue]  # L, 1
    # converting the global absolute coordinate into relative position and orientation change
    metas_map = {}
    prev_pos = None
    prev_angle = None
    for i, each in enumerate(queue):
        metas_map[i] = each["img_metas"][0].data
        if i == 0:
            # first frame: no previous BEV; zero out its own pose delta
            metas_map[i]["prev_bev"] = False
            prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
            prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
            metas_map[i]["can_bus"][:3] = 0
            metas_map[i]["can_bus"][-1] = 0
        else:
            # later frames: store pose relative to the previous frame
            metas_map[i]["prev_bev"] = True
            tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
            tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
            metas_map[i]["can_bus"][:3] -= prev_pos
            metas_map[i]["can_bus"][-1] -= prev_angle
            prev_pos = copy.deepcopy(tmp_pos)
            prev_angle = copy.deepcopy(tmp_angle)
    queue[-1]["img"] = DC(
        torch.stack(imgs_list), cpu_only=False, stack=True
    )  # T x N x 3 x H x W
    queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)
    # inherit all the labels for the current frame only
    queue = queue[-1]
    queue["l2g_r_mat"] = DC(l2g_r_mat_list)
    queue["l2g_t"] = DC(l2g_t_list)
    queue["timestamp"] = DC(timestamp_list)
    return queue
def init_trackpredplan(self, predict_steps, planning_steps, past_steps, fut_steps, use_nonlinear_optimizer):
    """Store horizon lengths used by the tracking/prediction/planning heads."""
    # trajectory APIs for tracking/prediction/planning
    self.predict_steps = predict_steps
    self.planning_steps = planning_steps
    self.past_steps = past_steps
    self.fut_steps = fut_steps
    self.use_nonlinear_optimizer = use_nonlinear_optimizer
def init_occupancy(self, occ_receptive_field, occ_n_future, occ_filter_invalid_sample, occ_filter_by_valid_flag):
    """Store temporal settings for the occupancy head."""
    self.occ_receptive_field = occ_receptive_field  # past + current
    self.occ_n_future = occ_n_future  # future only
    self.occ_filter_invalid_sample = occ_filter_invalid_sample
    self.occ_filter_by_valid_flag = occ_filter_by_valid_flag
    # NOTE: hardcode, not influenced by planning
    self.occ_only_total_frames = 7
    # filtering by valid flag is not supported by this dataset
    assert self.occ_filter_by_valid_flag is False
def init_mapping(self, canvas_size, patch_size, lane_ann_file):
    """Configure rasterized-map settings and the vectorized local map.

    :param canvas_size: (H, W) of the rasterized BEV canvas
    :param patch_size: real-world patch extent used by the vector map
    :param lane_ann_file: unused here; kept for interface compatibility
    :raises ValueError: for unsupported canvas sizes
    """
    # NOTE(review): hardcoded placeholder path — point this at the actual
    # nuPlan maps directory for your deployment.
    self.map_root = '/xxx/nuplan/dataset/maps'
    self.map_num_classes = 3
    # polyline thickness scales with canvas resolution
    if canvas_size[0] in (50, 60):
        self.thickness = 1
    elif canvas_size[0] == 200:
        self.thickness = 2
    else:
        # was `assert False`: raise explicitly so the check survives -O and
        # `self.thickness` can never be silently left unset
        raise ValueError(f"unsupported canvas size: {canvas_size}")
    self.patch_size = patch_size
    self.canvas_size = canvas_size
    self.vector_map = VectorizedLocalMap(
        map_root=self.map_root,
        patch_size=patch_size,
    )
def filter_pdm_process(self, pdm_data, token=None):
    '''
    Filter invalid 'nan' ego progress due to 0-length PDM trajectories,
    then recompute the aggregated PDM score from the sub-metrics.

    :param pdm_data: dict of per-proposal PDM sub-metric arrays
    :param token: sample token; used to look up ``self.ep_dict`` in training
    :return: the same dict with 'ego_progress' and 'score' replaced
    '''
    progress = deepcopy(pdm_data['ego_progress'])
    # when the overall score itself is nan, treat nan progress as full (1.0);
    # otherwise nan progress counts as zero
    if np.isnan(pdm_data['score'][0]):
        progress[np.isnan(progress)] = 1.0
    else:
        progress[np.isnan(progress)] = 0.0
    if self.val==False:
        # NOTE(review): training mode overwrites the nan-filled progress above
        # with precomputed values; self.ep_dict is assumed to be populated
        # elsewhere before this runs — confirm against callers.
        progress = self.ep_dict[token]
        progress[np.isnan(progress)] = 0.0
    # PDM score: multiplicative hard penalties times a weighted soft average
    scores = pdm_data['no_at_fault_collisions'] * pdm_data['drivable_area_compliance'] * \
        (5 * pdm_data['time_to_collision_within_bound'] + 5 * progress + 2 * pdm_data['comfort']) / 12
    pdm_data['ego_progress'] = progress
    pdm_data['score'] = deepcopy(scores)
    return pdm_data
def get_pdm_score_info(self, input_dict, index=None):
    """Attach cached PDM sub-scores for the current frame to ``input_dict``.

    Chooses a cache root depending on train/val mode and the synthetic
    failure filters, locates the per-sample ``pdm_cache.pkl`` and merges
    its trajectory dict into ``input_dict``.
    """
    if self.val:
        base_path = '/nas/shared/opendrivelab/liuhaochen/pdm_8192_gt_cache_navtest2'
        if self.full_fail_map_filter:
            if index in self.full_fail_map_filter:
                base_path = '/nas/shared/opendrivelab/liuhaochen/pdm_8192_gt_cache'
    else:
        base_path = '/nas/shared/opendrivelab/liuhaochen/pdm_8192_gt_cache'
    # default log name; the synthetic branch below overrides it so log_name
    # can never be left unbound
    log_name = input_dict['log_name']
    use_synthetic = bool(self.fail_syn_map_filter) and index in self.fail_syn_map_filter
    if use_synthetic:
        # NOTE(review): self.synthetic_pdm is never initialised in __init__
        # (the assignment there is commented out); this branch relies on it
        # being set externally before use.
        base_path = self.synthetic_pdm
        # synthetic logs truncate the name to its first two dash-fields
        parts = input_dict['log_name'].split('-')
        log_name = parts[0] + '-' + parts[1]
    load_path = base_path + '/' + log_name + '/'
    type_list = os.listdir(load_path)
    # find the scenario-type subfolder that contains this sample; falls back
    # to the last folder when none matches (preserving original behavior)
    for t_name in type_list:
        buf_dir = t_name + '/' + input_dict['sample_idx']
        # bugfix: the existence test must be rooted at load_path; the
        # original checked a CWD-relative path and never matched
        if os.path.exists(load_path + buf_dir):
            break
    load_path = load_path + buf_dir + '/pdm_cache.pkl'
    with lzma.open(load_path, "rb") as f:
        data = pickle.load(f).trajectory
    if use_synthetic:
        # synthetic caches may carry one extra leading proposal; drop it
        for k in data.keys():
            if data[k].shape[0] > 8192:
                data[k] = data[k][1:]
    else:
        data = self.filter_pdm_process(data, input_dict['sample_idx'])
    input_dict.update(data)
    return input_dict
def get_zero_pdm(self, input_dict):
    """Fill every PDM metric entry with an 8192-proposal zero vector."""
    metric_keys = (
        "no_at_fault_collisions",
        "drivable_area_compliance",
        "ego_progress",
        "time_to_collision_within_bound",
        "comfort",
        "score",
    )
    input_dict.update({key: np.zeros((8192,)) for key in metric_keys})
    return input_dict
def get_data_info(self, index, info=None, debug=False, prev_frame=False):
    """Get data info according to the given index.
    Args:
        index (int): Index of the sample data to get.
        info (dict, optional): Pre-fetched info dict; looked up from
            ``self.data_infos`` when omitted.
        debug (bool): When True, dump per-frame visualizations.
        prev_frame (bool): History frames get zeroed PDM scores instead of
            loading the (expensive) cached PDM score for the current frame.
    Returns:
        dict: Data information that will be passed to the data \
            preprocessing pipelines. It includes the following keys:

            - sample_idx (str): Sample index.
            - pts_filename (str): Filename of point clouds.
            - sweeps (list[dict]): Infos of sweeps.
            - timestamp (float): Sample timestamp.
            - img_filename (str, optional): Image filename.
            - lidar2img (list[np.ndarray], optional): Transformations \
                from lidar to different cameras.
            - ann_info (dict): Annotation info.
    """
    if info is None:
        info = self.data_infos[index]
    # standard protocol modified from SECOND.Pytorch
    input_dict = dict(
        sample_idx=info["token"],  # str: OpenScenes unique sample token
        frame_idx=info["frame_idx"],  # int: 0-indexed frame IDs
        timestamp=info["timestamp"] / 1e6,  # microseconds -> seconds
        log_name=info["log_name"],  # str: OpenScenes log name
        log_token=info["log_token"],  # str: OpenScenes log token
        scene_name=info["scene_name"],  # str: OpenScenes sequence name
        scene_token=info["scene_token"],  # str: OpenScenes sequence token
        pts_filename=info["lidar_path"],  # str: relative path for the lidar data
        # sweeps=info["sweeps"],  # List[Dict]: list of infos for sweep data
        prev=info["sample_prev"],  # str: previous sample token
        next=info["sample_next"],  # str: next sample token
    )
    input_dict = self.update_transform(input_dict=input_dict, index=index)
    input_dict = self.update_sensor(input_dict=input_dict, index=index)
    input_dict = self.update_canbus(input_dict=input_dict, index=index)
    if not prev_frame:
        # only count PDMScore for the current frame
        input_dict = self.get_pdm_score_info(input_dict, index)
    else:
        input_dict = self.get_zero_pdm(input_dict)
    # generate annotation for detection/tracking, putting them in the annotation so that
    # we could do range filtering altogether in the transform_3d.py
    input_dict["ann_info"] = self.get_ann_info(index)
    # # all data ultimately needs to be in the input_dict for downstream heads
    # input_dict = self.update_mapping(input_dict=input_dict, index=index)
    # input_dict = self.update_occupancy(input_dict=input_dict, index=index)
    input_dict = self.update_ego_prediction(input_dict=input_dict, index=index)
    input_dict = self.update_ego_planning(input_dict=input_dict, index=index)
    # bugfix: previously `debug` was unconditionally reset to False here,
    # which made the parameter dead; honor the caller's flag instead
    if debug:
        self.vis_data_per_frame(
            data=input_dict,
            index=index,
            vis_root="data/viz_nuplan",
        )
    return input_dict
def update_anno_token(self, annotation, index):
    """Hook for adding OpenScenes-specific token info; currently a no-op."""
    # add OpenScenes-specific token information
    return annotation
def update_tracking_prediction(self, annotation, index):
    """Attach GT future/past agent trajectories (lidar frame, 2 Hz).

    Reused from the Carla dataloader. Past trajectories are reordered to
    UniAD's reverse-chronological convention and extended with the first 4
    future frames.
    """
    info = self.data_infos[index]
    keep = annotation["mask"]
    # trajectories for the agents that survive the range/validity mask
    fut_boxes = info["gt_fut_bbox_lidar"][keep]   # N x 12 x 9
    fut_mask = info["gt_fut_bbox_mask"][keep]     # N x 12 x 1
    past_boxes = info["gt_pre_bbox_lidar"][keep]  # N x 4 x 9
    past_mask = info["gt_pre_bbox_mask"][keep]    # N x 4 x 1
    # UniAD wants the past in backward order: -4,-3,-2,-1 -> -1,-2,-3,-4
    past_boxes = np.flip(past_boxes, axis=1)      # N x 4 x 9
    past_mask = np.flip(past_mask, axis=1)        # N x 4 x 1
    # duplicate the mask channel so it matches the (x, y) pair
    fut_mask = np.repeat(fut_mask, 2, axis=2)     # N x 12 x 2
    past_mask = np.repeat(past_mask, 2, axis=2)   # N x 4 x 2
    # append 4 future frames: order becomes -1, -2, -3, -4, 1, 2, 3, 4
    past_boxes = np.concatenate((past_boxes, fut_boxes[:, :4]), axis=1)  # N x 8 x 9
    past_mask = np.concatenate((past_mask, fut_mask[:, :4]), axis=1)     # N x 8 x 2
    # update output dictionary (xy only for trajectories)
    annotation.update(
        dict(
            gt_fut_traj=fut_boxes[:, :, :2],   # N x 12 x 2, lidar coordinate
            gt_fut_traj_mask=fut_mask,         # N x 12 x 2
            gt_past_traj=past_boxes[:, :, :2], # N x 8 x 2
            gt_past_traj_mask=past_mask,       # N x 8 x 2
        )
    )
    return annotation
def update_mapping(self, input_dict, index):
    """Vectorize then rasterize the local map into GT lane labels/boxes/masks."""
    # polylines and drivable area are generated in the ego coordinate frame
    polylines, drivable = self.update_map_vectorized(input_dict, index)
    return self.update_map_rasterized(input_dict, index, polylines, drivable)
def update_map_vectorized(self, input_dict, index):
    """Generate vectorized map polylines and the drivable area around the ego.

    :return: tuple ``(polyline_dict, drivable_area)`` in the ego frame
    """
    info = self.data_infos[index]
    map_location = info['map_location']
    e2g_translation = info['ego2global_translation']
    e2g_rotation = info['ego2global_rotation']
    # the rotation may be stored either as quaternion elements (4,) or as a
    # 3x3 matrix; normalise via the public constructor instead of the
    # private Quaternion._from_matrix helper
    if len(e2g_rotation) != 4:
        e2g_rotation = Quaternion(matrix=np.asarray(e2g_rotation)).elements
    polyline_dict = self.vector_map.gen_vectorized_samples(e2g_translation, e2g_rotation, map_location)
    drivable_area = self.vector_map.gen_drivable_area(e2g_translation, e2g_rotation, map_location)
    return polyline_dict, drivable_area
def update_map_rasterized(self, input_dict, index, polyline_dict, drivable_area, debug=False):
    """Rasterize map polylines and the drivable area into GT labels/boxes/masks.

    :param polyline_dict: iterable of {"type": int, "pts": ndarray} polylines
    :param drivable_area: shapely MultiPolygon of the drivable surface
    :return: ``input_dict`` with gt_lane_labels / gt_lane_bboxes / gt_lane_masks
    """
    gt_labels = []
    gt_bboxes = []
    gt_masks = []
    # ego-centered pixel origin and metres->pixels scaling
    origin = np.array([self.canvas_size[1] // 2, self.canvas_size[0] // 2])
    scale = np.array([self.canvas_size[1] / self.patch_size[0], self.canvas_size[0] / self.patch_size[1]])
    for polyline in polyline_dict:
        if polyline["type"] == 0:
            # skip centerline
            continue
        map_mask = np.zeros(self.canvas_size, np.uint8)
        draw_coor = (polyline["pts"] * scale + origin).round().astype(np.int32)
        # bugfix: the original drew with color=1 and then divided by 255,
        # producing mask values of ~0.004 instead of 1.0 (the drivable-area
        # branch below draws 255 then /255); draw 255 for a proper 0/1 mask
        gt_mask = cv2.polylines(map_mask, [draw_coor], False, color=255, thickness=self.thickness) / 255
        ys, xs = np.where(gt_mask)
        if xs.size == 0:
            # polyline fell entirely outside the canvas; nothing to keep
            continue
        gt_labels.append(polyline["type"])
        gt_bboxes.append([min(xs), min(ys), max(xs), max(ys)])
        gt_masks.append(gt_mask)
    # stuff class: drivable area polygons (with holes)
    map_mask = np.zeros(self.canvas_size, np.uint8)
    exteriors = []
    interiors = []
    for p in drivable_area.geoms:
        exteriors.append(
            (np.array(p.exterior.coords) * scale + origin).round().astype(np.int32)
        )
        for inter in p.interiors:
            interiors.append(
                (np.array(inter.coords) * scale + origin).round().astype(np.int32)
            )
    cv2.fillPoly(map_mask, exteriors, 255)
    cv2.fillPoly(map_mask, interiors, 0)
    map_mask = map_mask / 255
    ys, xs = np.where(map_mask)
    if xs.size > 0:
        gt_labels.append(self.map_num_classes)
        gt_bboxes.append([min(xs), min(ys), max(xs), max(ys)])
        gt_masks.append(map_mask)
    gt_labels = torch.tensor(gt_labels)
    gt_bboxes = torch.from_numpy(np.stack(gt_bboxes))
    gt_masks = torch.from_numpy(np.stack(gt_masks))  # N x H x W
    # update output dictionary
    input_dict.update(
        dict(
            gt_lane_labels=gt_labels,
            gt_lane_bboxes=gt_bboxes,
            gt_lane_masks=gt_masks,
        )
    )
    return input_dict
def update_ego_prediction(self, input_dict, index):
    """Attach ego (SDC) box, trajectory and planning labels to ``input_dict``.

    In replacement of both update_ego_prediction and update_ego_planning;
    reused from the Carla dataloader.
    """
    info = self.data_infos[index]
    # retrieve ego bounding box in the current frame in lidar coordinate
    gt_sdc_bbox = info["gt_sdc_bbox_lidar"][:9].reshape((1, -1))  # 1 x 9
    gt_sdc_bbox = LiDARInstance3DBoxes(
        gt_sdc_bbox, box_dim=gt_sdc_bbox.shape[-1], origin=(0.5, 0.5, 0)
    ).convert_to(self.box_mode_3d)
    gt_sdc_bbox = DC(gt_sdc_bbox, cpu_only=True)
    # retrieve ego class label, default it car
    gt_sdc_label = np.array([0])
    gt_sdc_label = DC(to_tensor(gt_sdc_label))
    sdc_status = info["gt_sdc_bbox_lidar"][:9]
    # ego trajectory in lidar coordinate
    gt_pre_bbox_sdc_lidar = info["gt_pre_bbox_sdc_lidar"]  # 1 x 4 x 9
    gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"]  # 1 x 12 x 9
    # ego trajectory in global coordinate
    gt_pre_bbox_sdc_global = info["gt_pre_bbox_sdc_global"]  # 1 x 4 x 9
    gt_fut_bbox_sdc_global = info["gt_fut_bbox_sdc_global"]  # 1 x 12 x 9
    # ego trajectory masks, duplicated along the channel axis
    gt_fut_bbox_sdc_mask = info["gt_fut_bbox_sdc_mask"]  # 1 x 12 x 1
    gt_fut_bbox_sdc_mask = np.repeat(gt_fut_bbox_sdc_mask, 2, axis=2)  # 1 x 12 x 2
    gt_pre_bbox_sdc_mask = info["gt_pre_bbox_sdc_mask"]  # 1 x 4 x 1
    # NOTE(review): the future mask is repeated 2x but the past mask 4x,
    # yielding 1 x 4 x 4 rather than the 1 x 4 x 2 the comment claims —
    # confirm downstream consumers expect this asymmetry.
    gt_pre_bbox_sdc_mask = np.repeat(gt_pre_bbox_sdc_mask, 4, axis=2)  # 1 x 4 x 2
    gt_pre_command_sdc = info["gt_pre_command_sdc"]
    # update output dictionary for ego's prediction
    input_dict.update(
        dict(
            gt_sdc_bbox=gt_sdc_bbox,  # DC (LiDARInstance3DBoxes), xyz, lwh, yaw, vel_x, vel_y, lidar coordinate
            gt_sdc_label=gt_sdc_label,  # DC (tensor[0])
            gt_sdc_fut_traj=gt_fut_bbox_sdc_lidar[
                :, :, :2
            ],  # 1 x 12 x 2, lidar coordinate
            gt_sdc_fut_traj_mask=gt_fut_bbox_sdc_mask,  # 1 x 12 x 2
            # planning labels
            command=info["command"],  # int, change from 1-indexed to 0-indexed
            sdc_planning_world=gt_fut_bbox_sdc_global[
                :, : self.planning_steps, [0, 1, 2]
            ],  # 1 x 6 x 3, global coordinate, xyz
            sdc_planning=gt_fut_bbox_sdc_lidar[
                :, : self.planning_steps, [0, 1, 6]
            ],  # 1 x 6 x 3, lidar coordinate, xy-yaw
            sdc_planning_mask=gt_fut_bbox_sdc_mask[
                :, : self.planning_steps
            ],  # 1 x 6 x 2
            sdc_planning_past=info["gt_pre_bbox_sdc_lidar"][:, :, [0, 1, 6]],
            sdc_planning_mask_past=gt_pre_bbox_sdc_mask,  # 1 x 4 x 2
            gt_pre_command_sdc=gt_pre_command_sdc,
            sdc_status=sdc_status[[0, 1, 6]]  # current ego xy-yaw
        )
    )
    return input_dict
def update_ego_planning(self, input_dict, index):
    """No-op: sdc_planning labels are already attached in update_ego_prediction."""
    # removed the unused `info = self.data_infos[index]` lookup
    return input_dict
def vis_dataset_specific(self, data: Dict):
    """Load lidar points and camera image paths for visualization.

    Tries each sensor-blob split in turn (trainval -> test -> mini); the
    original implementation did this with two nested try/except blocks.

    :param data: sample dict providing 'pts_filename' and 'img_filename'
    :raises FileNotFoundError: when the lidar file exists in no split
    """
    points = None
    parent_dir = ""
    for split in ("trainval", "test", "mini"):
        parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}"
        pts_filename: str = os.path.join(parent_dir, data["pts_filename"])
        try:
            points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2()  # 6 x N
            break
        except FileNotFoundError:
            # re-raise only after the last split fails, matching the
            # original behavior of the innermost (uncaught) attempt
            if split == "mini":
                raise
    points = points[:3].T  # N x 3
    # NOTE(review): the loop reads 8 camera entries although the comment
    # below names only 6 views — confirm the camera layout with the infos.
    image_file_list = []
    for cam_name in range(8):
        # FRONT, FRONT_RIGHT, FRONT_LEFT, BACK, BACK_LEFT, BACK_RIGHT
        image_file: str = os.path.join(parent_dir, data["img_filename"][cam_name])
        image_file_list.append(image_file)
    return {
        "lidar": points,
        "command_dict": {
            0: "TURN LEFT",
            1: "KEEP FORWARD",
            2: "TURN RIGHT",
            3: "UNKNOWN",
        },
        "image_file_list": image_file_list,
        "left_hand": False,
    }