# ---------------------------------------------------------------------------------#
# UniAD: Planning-oriented Autonomous Driving (https://arxiv.org/abs/2212.10156) #
# Source code: https://github.com/OpenDriveLab/UniAD #
# Copyright (c) OpenDriveLab. All rights reserved. #
# ---------------------------------------------------------------------------------#
import copy
import hashlib
import json
import os
import pickle
import pprint
import random
import tempfile
from os import path as osp
from typing import Dict, List
import sys
from pathlib import Path
from torchvision import transforms
from easydict import EasyDict
import mmcv
import numpy as np
import scipy
import cv2
import torch
from mmcv.parallel import DataContainer as DC
from mmdet3d.core.bbox import LiDARInstance3DBoxes
from mmdet3d_plugin.datasets import NuScenesE2EDataset
from mmdet3d_plugin.datasets.data_utils.rasterize import preprocess_map
from mmdet3d_plugin.datasets.nuplan.nuplan_pointcloud import PointCloud
from mmdet3d_plugin.eval.detection.config import config_factory_nuPlan
from mmdet3d_plugin.uniad.dense_heads.planning_head_plugin.compute_metrics import (
PredictionConfig, compute_metrics)
from mmdet.datasets import DATASETS
from mmdet.datasets.pipelines import to_tensor
from nuscenes import NuScenes
from nuscenes.eval.common.utils import Quaternion, quaternion_yaw
from nuscenes.eval.tracking.evaluate import TrackingEval
from nuscenes.prediction import convert_local_coords_to_global
from PIL import Image
from prettytable import PrettyTable
from mmdet3d_plugin.datasets.data_utils.diffusiondrive_map import DiffusionDriveMap
from copy import deepcopy
import lzma
import pandas as pd
from glob import glob
from tqdm import tqdm
import yaml
from navsim.common.enums import BoundingBoxIndex, LidarIndex
from navsim.common.dataclasses import Lidar, Cameras
import mmdet3d_plugin.datasets.utils.calibration as calib_utils
from navsim.agents.diffusiondrive.transfuser_config import TransfuserConfig
from enum import IntEnum
@DATASETS.register_module()
class NavSimOpenScenesE2EDiffusionDrive(NuScenesE2EDataset):
r"""OpenScenes E2E Dataset"""
CLASSES = (
"vehicle",
"bicycle",
"pedestrian",
"traffic_cone",
"barrier",
"czone_sign",
"generic_object",
)
def __init__(self, *args,
img_root=None,
duplicate=True, load_from_splitted_pkl=False,
split_path=None, test_filter_file=None,
video_pkl=None,
ep_path=None, train_pdm_path=None, test_pdm_path=None,
future_frame_num=0, history_frame_num=3,
finetune_yaml=None, cache_path=None, **kwargs):
        # Stash configuration before calling the parent constructor below,
        # since load_annotations() runs inside it and reads these attributes.
        self.img_root = img_root
        self.finetune_yaml = finetune_yaml
        self.duplicate = duplicate
        # specialized variables for WE finetuning
        self.synthetic_pdm = None  # must be defined: get_pdm_score_info reads it
        self.fail_syn_map_filter = None
        self.synthetic_img_root = None
        self.full_fail_map_filter = None
        self.cache_path = cache_path
self.metric_cache_tokens = None
self.load_from_splitted_pkl = load_from_splitted_pkl
self.future_frame_num = future_frame_num
self.history_frame_num = history_frame_num
self._config = TransfuserConfig()
self.map_root = '/xxx/nuplan/dataset/maps'
self.map_api = DiffusionDriveMap(self._config,
map_root=self.map_root)
        # fallback paths (placeholders); only used when the constructor
        # arguments are not provided, so the parameters stay functional:
        if split_path is None:
            split_path = '/xxx/navtrain_split'
        if train_pdm_path is None:
            train_pdm_path = '/xxx/pdm_8192_gt_cache'
        if test_pdm_path is None:
            test_pdm_path = '/xxx/pdm_8192_gt_cache_navtest'
        self.split_path = split_path  # path to the split yaml files
        self.train_pdm_path = train_pdm_path  # path storing PDM score caches
        self.test_pdm_path = test_pdm_path
self.video_pkl = video_pkl
if self.cache_path is not None:
print(f'loading cache info from:{cache_path}')
self.init_metric_cache_paths(cache_path)
super(NavSimOpenScenesE2EDiffusionDrive, self).__init__(*args, **kwargs)
self.cache_path = cache_path
def init_metric_cache_paths(self, cache_path):
"""
Helper function to load all cache file paths from folder.
:param cache_path: directory of cache folder
:return: dictionary of token and file path
"""
metadata_dir = Path(cache_path) / "metadata"
metadata_file = [file for file in metadata_dir.iterdir() if ".csv" in str(file)][0]
with open(str(metadata_file), "r") as f:
cache_paths = f.read().splitlines()[1:]
self.metric_cache_dict = {cache_path.split("/")[-2]: cache_path for cache_path in cache_paths}
print(f'cache loaded, size {len(self.metric_cache_dict.keys())}')
self.metric_cache_tokens = set(self.metric_cache_dict.keys())
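    # A usage sketch (hypothetical paths; assumes the metadata CSV layout the
    # loop above parses: one header row, then one cache-file path per line
    # whose second-to-last path component is the sample token, e.g.
    # ".../<token>/metric_cache.pkl"):
    #
    #   ds.init_metric_cache_paths('/xxx/navsim_metric_cache')
    #   token = next(iter(ds.metric_cache_tokens))
    #   metric_cache = ds.get_metric_cache(token)  # lzma-compressed pickle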
def merge_into_splits(self, ann_file, scene_filter):
all_pkl_file = glob(ann_file + '/*.pkl')
print(f'{len(all_pkl_file)} files found')
data_infos = []
for pkl_file in tqdm(all_pkl_file):
data_tmp = mmcv.load(pkl_file, file_format="pkl")['infos']
scene_filter_expanded = set()
for idx, data_frame in enumerate(data_tmp):
if data_frame['token'] in scene_filter:
start_frame_idx = idx - self.history_frame_num
end_frame_idx = idx + self.future_frame_num
for i in range(start_frame_idx, end_frame_idx + 1):
if i < 0 or i >= len(data_tmp):
continue
scene_filter_expanded.add(data_tmp[i]['token'])
data_save = []
for data_frame in data_tmp:
token = data_frame['token']
if token in scene_filter_expanded:
data_save.append(data_frame)
data_infos.extend(data_save)
return data_infos
def load_annotations(self, ann_file):
        # load scene filter
        print(f'loading annotations from: {ann_file}')
        if self.test_mode:
            print('use navtest filter...')
            self.val = True
            nav_filter = os.path.join(self.data_root, 'navsim_filter/navtest.yaml')
        else:
            print('use navtrain filter...')
            self.val = False
            nav_filter = os.path.join(self.data_root, 'navsim_filter/navtrain.yaml')
        if self.use_num_split > 0 and ('val' not in ann_file) and (self.finetune_yaml is None):
            scene_filter = []
            for i in range(5):
                if i >= self.use_num_split:
                    break
                path = self.split_path + f'/split_{i}.yaml'
                with open(path, 'r') as file:
                    nav_filter = yaml.safe_load(file)
                scene_filter += nav_filter['tokens']
        else:
            if self.finetune_yaml is not None:
                print(f'loading finetuning yaml from: {self.finetune_yaml}')
                nav_filter = self.finetune_yaml
            with open(nav_filter, 'r') as file:
                nav_filter = yaml.safe_load(file)
            try:
                scene_filter = nav_filter['tokens']
            except KeyError:
                scene_filter = nav_filter['scenario_tokens']
# load annotations:
print('loading dataset!')
if self.load_from_splitted_pkl:
data_infos = self.merge_into_splits(ann_file, scene_filter)
        else:
            data_infos = mmcv.load(ann_file, file_format="pkl")
            # data_infos = data_infos['infos']
print('dataset loaded, length:', len(data_infos))
self.scene_filter = set(scene_filter)
if self.video_pkl is not None:
vidmetadata_file = '/xxx/navsim_metric_cache_navtrain/metadata/navsim_metric_cache_navtrain2_metadata_node_0.csv'
with open(str(vidmetadata_file), "r") as f:
vid_cache_paths = f.read().splitlines()[1:]
            # only keep tokens whose metric cache was successfully built
            metric_cache_filter = [cache_path.split("/")[-2] for cache_path in vid_cache_paths]
            print(f'video data loaded, length: {len(metric_cache_filter)}')
self.scene_filter = self.scene_filter | set(metric_cache_filter)
self.index_map = []
for i, info in enumerate(data_infos):
if info['token'] in self.scene_filter:
                if not self.test_mode:
                    # in navtest, this check has no effect
                    valid_len = np.sum(info['gt_fut_bbox_sdc_mask'][0, :8])
                    if valid_len < 8:
                        # require a full 8-step ego future for training
                        continue
                self.index_map.append(i)
        if self.use_num_split > 0 and not self.val and self.duplicate:
            self.index_map = self.duplicate_indices(self.index_map, n=5 / self.use_num_split)
print(f'filtering {len(data_infos)} frames to {len(self.index_map)}...')
# use log_token in OpenScene as scene_token.
for info in data_infos:
info['scene_token'] = info['log_token']
self.finetuning_mask = None
return data_infos
def duplicate_indices(self, indices, n):
int_part = int(n) # Get the integer part
frac_part = n - int_part # Get the fractional part
duplicated = indices * int_part # Duplicate the integer part
if frac_part > 0:
num_extra = int(len(indices) * frac_part) # Determine how many extra indices to add
extra_indices = list(np.random.choice(indices, num_extra, replace=False)) # Sample without replacement
duplicated.extend(extra_indices)
return duplicated
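    # Expected behavior sketch (illustrative numbers only): with
    # indices = [0, 1, 2, 3] and n = 2.5, the list is replicated twice
    # (8 items) and int(4 * 0.5) = 2 extra indices are sampled without
    # replacement, giving 10 items, i.e. roughly len(indices) * n in total.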
def __len__(self):
return len(self.index_map)
def __getitem__(self, idx):
# map nav filter idx to the data_infos idx.
new_idx = self.index_map[idx]
if self.test_mode:
# print('test_data')
return self.prepare_test_data(new_idx)
while True:
# print('train_data')
data = self.prepare_train_data(new_idx)
if data is None:
idx = self._rand_another(idx)
new_idx = self.index_map[idx]
continue
return data
def init_dataset(self):
self.sdc_acc_ego_fix = False
# update detection config to handle evaluation with different classes
self.eval_detection_configs = config_factory_nuPlan()
self.default_attr = {
"vehicle": "vehicle.parked",
"bicycle": "cycle.without_rider",
"pedestrian": "pedestrian.moving",
"traffic_cone": "",
"barrier": "",
"czone_sign": "",
"generic_object": "",
}
return
# def prepare_train_data(self, index):
# return self.prepare_test_data(index)
    def prepare_test_data_old(self, index):
        """
        Prepare a temporal queue of frames ending at `index` (legacy path).
        Args:
            index (int): Index for accessing the target data.
        Returns:
            dict: Data dict of the corresponding index, merged over the queue:
                img: queue_length, 6, 3, H, W
                img_metas: img_metas of each frame (list)
                gt_globals_3d: gt_globals of each frame (list)
                gt_bboxes_3d: gt_bboxes of each frame (list)
                gt_inds: gt_inds of each frame (list)
        """
data_queue = []
self.enbale_temporal_aug = False
        # ensure the first and final frames are in the same scene
final_index = index
first_index = index - self.queue_length + 1
if first_index < 0:
return None
if (
self.data_infos[first_index]["scene_token"]
!= self.data_infos[final_index]["scene_token"]
):
print(index, 2)
return None
# current timestamp
if self.cache_path is not None:
if self.data_infos[index]["token"] not in self.metric_cache_tokens:
                print(f'Token: {self.data_infos[index]["token"]} not in metric cache')
return None
input_dict = self.get_data_info(final_index)
        ### attach the finetuning fail mask, if any:
if self.finetuning_mask is not None:
input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]
        prev_index_list = list(reversed(range(first_index, final_index)))
        if input_dict is None:
            print(index, index in (self.fail_syn_map_filter or set()), 3)
            return None
frame_idx = input_dict["frame_idx"]
scene_token = input_dict["scene_token"]
self.pre_pipeline(input_dict)
example = self.pipeline(input_dict)
data_queue.insert(0, example)
########## retrieve previous infos, frame by frame
        for i in prev_index_list:
input_dict = self.get_data_info(i, prev_frame=True)
if self.finetuning_mask is not None:
input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]
if input_dict is None:
                print(index, index in (self.fail_syn_map_filter or set()), 4)
return None
if (
input_dict["frame_idx"] < frame_idx
and input_dict["scene_token"] == scene_token
):
self.pre_pipeline(input_dict)
example = self.pipeline(input_dict)
frame_idx = input_dict["frame_idx"]
data_queue.insert(0, copy.deepcopy(example))
# merge a sequence of data into one dictionary, only for temporal data
if self.val or self.test_mode:
data_queue = self.union2one_test(data_queue)
else:
data_queue = self.union2one(data_queue, self.data_infos[index]["token"] )
return data_queue
def prepare_sdc_gt_planning(self, index):
sdc_plan = []
info = self.data_infos[index]
if self.video_pkl is not None:
# directly use lidar:
gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"][:, : self.planning_steps, [0, 1, 6]]
gt_fut_bbox_sdc_lidar = gt_fut_bbox_sdc_lidar[0]
return gt_fut_bbox_sdc_lidar
        global2lidar = np.linalg.inv(info["lidar2global"])  # 4 x 4
for i in range(8):
fut_info = self.data_infos[index + 1 + i]
############### get future of the sdc box in lidar coordinate
sdc_loc_global, _ = calib_utils.transform_matrix_to_vector(
fut_info["ego2global"]
)
sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) # 4
sdc_loc_lidar = global2lidar @ sdc_loc_global # 4
sdc_trans_lidar = (
global2lidar @ fut_info["ego2global"]
) # 4 x 4
yaw_lidar = scipy.spatial.transform.Rotation.from_matrix(
sdc_trans_lidar[:3, :3]
).as_euler("xyz", degrees=False)[-1]
sdc_plan.append([sdc_loc_lidar[0], sdc_loc_lidar[1], yaw_lidar])
sdc_plan = np.stack(sdc_plan, axis=0)
return sdc_plan
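    # The loop above chains homogeneous transforms to express each future ego
    # pose in the current lidar frame; schematically:
    #   T = inv(info["lidar2global"]) @ fut_info["ego2global"]    # 4 x 4
    #   x, y = (T @ [0, 0, 0, 1])[:2];  yaw = from T's rotation block
    # yielding an (8, 3) array of [x, y, yaw] rows, one per future frame.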
def prepare_train_data(self, index):
info = self.data_infos[index]
        # check that the next 8 frames exist and stay within the same log:
        if index + 8 >= len(self.data_infos):
            print(f'index {index}: future horizon exceeds dataset length')
            return None
        match_flag = all(self.data_infos[index + 1 + j]['log_token'] == info['log_token'] for j in range(8))
        if not match_flag and (self.video_pkl is None):
            print(f'index {index}: future frames cross a log boundary')
            return None
        # prepare inputs
        features = self.prepare_test_data(index)
        # prepare labels
        label_features = self.prepare_label(index)
features.update(label_features)
features['img_metas'] = DC(deepcopy(info['token']), cpu_only=True)
if self.cache_path is not None:
features["metric_cache"] = DC(self.get_metric_cache(info['token']), cpu_only=True)
return features
def prepare_label(self, index):
info = self.data_infos[index]
sdc_plan = self.prepare_sdc_gt_planning(index)
trajectory = torch.from_numpy(sdc_plan)
agent_states, agent_labels = self._compute_agent_targets(index)
bev_semantic_map = self.map_api.compute_bev_semantic_map(info)
return {
"trajectory": trajectory,
"agent_states": agent_states,
"agent_labels": agent_labels,
"bev_semantic_map": bev_semantic_map,
}
def _compute_agent_targets(self, index):
"""
Extracts 2D agent bounding boxes in ego coordinates
:param annotations: annotation dataclass
:return: tuple of bounding box values and labels (binary)
"""
info = self.data_infos[index]
max_agents = self._config.num_bounding_boxes
agent_states_list = []
def _xy_in_lidar(x: float, y: float, config: TransfuserConfig) -> bool:
return (config.lidar_min_x <= x <= config.lidar_max_x) and (config.lidar_min_y <= y <= config.lidar_max_y)
for box, name in zip(info['anns']["gt_boxes"], info['anns']["gt_names"]):
box_x, box_y, box_heading, box_length, box_width = (
box[BoundingBoxIndex.X],
box[BoundingBoxIndex.Y],
box[BoundingBoxIndex.HEADING],
box[BoundingBoxIndex.LENGTH],
box[BoundingBoxIndex.WIDTH],
)
if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config):
agent_states_list.append(np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32))
agents_states_arr = np.array(agent_states_list)
        # keep only the max_agents nearest agents
agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32)
agent_labels = np.zeros(max_agents, dtype=bool)
if len(agents_states_arr) > 0:
distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
argsort = np.argsort(distances)[:max_agents]
# filter detections
agents_states_arr = agents_states_arr[argsort]
agent_states[: len(agents_states_arr)] = agents_states_arr
agent_labels[: len(agents_states_arr)] = True
return torch.tensor(agent_states), torch.tensor(agent_labels)
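    # Output shapes, for reference (num_bounding_boxes comes from
    # TransfuserConfig; 30 in the upstream default, if unchanged here):
    #   agent_states: (num_bounding_boxes, 5) float32 rows of
    #                 [x, y, heading, length, width], zero-padded at the end
    #   agent_labels: (num_bounding_boxes,) bool, True for real agents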
def prepare_test_data(self, index):
"""Inherited, see superclass."""
info = self.data_infos[index]
features = {}
features['token'] = DC(deepcopy(info['token']), cpu_only=True)
features["camera_feature"] = self._get_camera_feature(info)
features["lidar_feature"] = self._get_lidar_feature(info)
# cmd = np.zeros(4)
# cmd[info['command']]=1
# sdc_acc_global = info['can_bus'][[7, 8, 9]] # 3
# sdc_acc_lidar: np.ndarray = (
# np.linalg.inv(info['lidar2global'][:3, :3]) @ sdc_acc_global
# ) # 3
features["status_feature"] = torch.cat(
[
                torch.tensor(info['driving_command'], dtype=torch.float32),
                torch.tensor(info["ego_dynamic_state"][:2], dtype=torch.float32),  # vx, vy
                torch.tensor(info["ego_dynamic_state"][2:], dtype=torch.float32),  # ax, ay
            ],
)
return features
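    # Note on status_feature above: a 4-way one-hot driving command plus two
    # velocity and two acceleration components gives an 8-dim vector, the
    # TransFuser-style status input (inferred from this code, not a documented
    # contract of the dataset).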
def _get_camera_feature(self, info) -> torch.Tensor:
"""
Extract stitched camera from AgentInput
:param agent_input: input dataclass
:return: stitched front view image as torch tensor
"""
cameras = Cameras.from_camera_dict(
sensor_blobs_path=Path(self.img_root),
camera_dict=info["cams"],
sensor_names=['cam_l0', 'cam_f0', 'cam_r0'],
)
# Crop to ensure 4:1 aspect ratio
l0 = cameras.cam_l0.image[28:-28, 416:-416]
f0 = cameras.cam_f0.image[28:-28]
r0 = cameras.cam_r0.image[28:-28, 416:-416]
# stitch l0, f0, r0 images
stitched_image = np.concatenate([l0, f0, r0], axis=1)
resized_image = cv2.resize(stitched_image, (1024, 256))
# resized_image = cv2.resize(stitched_image, (448, 448))
# resized_image = cv2.resize(stitched_image, (2048, 512))
tensor_image = transforms.ToTensor()(resized_image)
# tensor_image = transforms.Normalize(
# mean=(0.5, 0.5, 0.5), # Normalize (often 0.5 or ImageNet stats)
# std=(0.5, 0.5, 0.5)
# )(tensor_image)
return tensor_image
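    # Worked crop arithmetic (assuming the 1920x1080 OpenScene images): each
    # crop trims 28 px from top and bottom -> height 1024; l0/r0 lose 416 px
    # per side -> width 1088 each, while f0 keeps 1920. The stitched width is
    # 1088 + 1920 + 1088 = 4096 = 4 * 1024, so the 4:1 ratio survives the
    # resize to 1024 x 256.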
def _get_lidar_feature(self, info) -> torch.Tensor:
"""
Compute LiDAR feature as 2D histogram, according to Transfuser
:param agent_input: input dataclass
:return: LiDAR histogram as torch tensors
"""
lidar_path = info['lidar_path']
        lidar = Lidar.from_paths(self.img_root, lidar_path, "lidar_pc")
# only consider (x,y,z) & swap axes for (N,3) numpy array
lidar_pc = lidar.lidar_pc[LidarIndex.POSITION].T
# NOTE: Code from
# https://github.com/autonomousvision/carla_garage/blob/main/team_code/data.py#L873
# self._config = EasyDict(dict(
# lidar_min_x= -32,
# lidar_max_x= 32,
# lidar_min_y= -32,
# lidar_max_y= 32,
# hist_max_per_pixel = 5,
# pixels_per_meter=4.0,
# max_height_lidar=100.0,
# lidar_split_height=0.2,
# use_ground_plane=False,
# ) )
def splat_points(point_cloud):
# 256 x 256 grid
xbins = np.linspace(
self._config.lidar_min_x,
self._config.lidar_max_x,
(self._config.lidar_max_x - self._config.lidar_min_x) * int(self._config.pixels_per_meter) + 1,
)
ybins = np.linspace(
self._config.lidar_min_y,
self._config.lidar_max_y,
(self._config.lidar_max_y - self._config.lidar_min_y) * int(self._config.pixels_per_meter) + 1,
)
hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0]
hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel
overhead_splat = hist / self._config.hist_max_per_pixel
return overhead_splat
# Remove points above the vehicle
lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar]
below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height]
above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height]
above_features = splat_points(above)
if self._config.use_ground_plane:
below_features = splat_points(below)
features = np.stack([below_features, above_features], axis=-1)
else:
features = np.stack([above_features], axis=-1)
features = np.transpose(features, (2, 0, 1)).astype(np.float32)
return torch.from_numpy(features)
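    # Grid sanity check with the TransfuserConfig values used above
    # (lidar_min/max = +-32 m, pixels_per_meter = 4, if left at the commented
    # defaults): 64 m * 4 px/m = 256 bins per axis (257 edges), so the output
    # is (1, 256, 256), or (2, 256, 256) when use_ground_plane is enabled.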
def get_metric_cache(self, token):
with lzma.open(self.metric_cache_dict[token], "rb") as f:
metric_cache = pickle.load(f)
return metric_cache
def union2one(self, queue, token):
"""
        Convert a sequence of sample dicts into one single sample.
"""
# sensor data and transform
imgs_list: List[torch.Tensor] = [
each["img"].data for each in queue
] # L, C x 3 x H x W
l2g_r_mat_list: List[torch.Tensor] = [
to_tensor(each["l2g_r_mat"]) for each in queue
] # L, 3 x 3
l2g_t_list: List[torch.Tensor] = [
to_tensor(each["l2g_t"]) for each in queue
] # L, 3
timestamp_list = [to_tensor(each["timestamp"]) for each in queue] # L, 1
# converting the global absolute coordinate into relative position and orientation change
metas_map = {}
prev_pos = None
prev_angle = None
for i, each in enumerate(queue):
metas_map[i] = each["img_metas"].data
if i == 0:
metas_map[i]["prev_bev"] = False
prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
metas_map[i]["can_bus"][:3] = 0
metas_map[i]["can_bus"][-1] = 0
else:
metas_map[i]["prev_bev"] = True
tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
metas_map[i]["can_bus"][:3] -= prev_pos
metas_map[i]["can_bus"][-1] -= prev_angle
prev_pos = copy.deepcopy(tmp_pos)
prev_angle = copy.deepcopy(tmp_angle)
# print(metas_map[i]["can_bus"][:3])
queue[-1]["img"] = DC(
torch.stack(imgs_list), cpu_only=False, stack=True
) # T x N x 3 x H x W
queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)
        # inherit all the labels for the current frame only
        queue = queue[-1]
        # merge all other labels that require temporal info from the queue
if self.cache_path is not None:
queue["metric_cache"] = DC(self.get_metric_cache(token), cpu_only=True)
queue["l2g_r_mat"] = DC(l2g_r_mat_list)
queue["l2g_t"] = DC(l2g_t_list)
queue["timestamp"] = DC(timestamp_list)
return queue
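    # Illustration of the can_bus rewrite above (made-up numbers): for absolute
    # positions [p0, p1, p2] across the queue, the stored values become
    # [0, p1 - p0, p2 - p1]: the first frame is zeroed and each later frame
    # keeps only its delta to the previous one (likewise for the yaw slot).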
def union2one_test(self, queue):
"""
        Convert a sequence of sample dicts into one single sample.
"""
# sensor data and transform
imgs_list: List[torch.Tensor] = [
each["img"][0].data for each in queue
] # L, C x 3 x H x W
l2g_r_mat_list: List[torch.Tensor] = [
to_tensor(each["l2g_r_mat"]) for each in queue
] # L, 3 x 3
l2g_t_list: List[torch.Tensor] = [
to_tensor(each["l2g_t"]) for each in queue
] # L, 3
timestamp_list = [to_tensor(each["timestamp"]) for each in queue] # L, 1
        # (Label gathering -- gt_labels_3d / gt_inds / gt_bboxes_3d, agent
        # past/future trajectories, and SDC boxes -- is skipped at test time.)
# converting the global absolute coordinate into relative position and orientation change
metas_map = {}
prev_pos = None
prev_angle = None
for i, each in enumerate(queue):
metas_map[i] = each["img_metas"][0].data
if i == 0:
metas_map[i]["prev_bev"] = False
prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
metas_map[i]["can_bus"][:3] = 0
metas_map[i]["can_bus"][-1] = 0
else:
metas_map[i]["prev_bev"] = True
tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
metas_map[i]["can_bus"][:3] -= prev_pos
metas_map[i]["can_bus"][-1] -= prev_angle
prev_pos = copy.deepcopy(tmp_pos)
prev_angle = copy.deepcopy(tmp_angle)
# print(metas_map[i]["can_bus"][:3])
queue[-1]["img"] = DC(
torch.stack(imgs_list), cpu_only=False, stack=True
) # T x N x 3 x H x W
queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)
        # inherit all the labels for the current frame only
        queue = queue[-1]
        # merge the temporal transforms from the queue (labels stay skipped at test time)
        queue["l2g_r_mat"] = DC(l2g_r_mat_list)
        queue["l2g_t"] = DC(l2g_t_list)
        queue["timestamp"] = DC(timestamp_list)
return queue
def init_trackpredplan(
self,
predict_steps,
planning_steps,
past_steps,
fut_steps,
use_nonlinear_optimizer,
):
# trajectory APIs for tracking/prediction/planning
self.predict_steps = predict_steps
self.planning_steps = planning_steps
self.past_steps = past_steps
self.fut_steps = fut_steps
self.use_nonlinear_optimizer = use_nonlinear_optimizer
# eval for planning
# self.helper = self.traj_api.predict_helper
# # self.config = load_prediction_config(self.helper, "./planning.json")
# # Load config file and deserialize it.
# if self.test_mode:
# # planning_map_config_file = "planning_test.json"
# planning_map_config_file = "planning_all.json"
# else:
# planning_map_config_file = "planning_trainval.json"
# with open(planning_map_config_file, "r") as f:
# planning_map_config = json.load(f)
# self.planning_map_config = PredictionConfig.deserialize(
# planning_map_config, self.helper
# )
def init_occupancy(
self,
occ_receptive_field,
occ_n_future,
occ_filter_invalid_sample,
occ_filter_by_valid_flag,
):
self.occ_receptive_field = occ_receptive_field # past + current
self.occ_n_future = occ_n_future # future only
self.occ_filter_invalid_sample = occ_filter_invalid_sample
self.occ_filter_by_valid_flag = occ_filter_by_valid_flag
self.occ_only_total_frames = 7 # NOTE: hardcode, not influenced by planning
assert self.occ_filter_by_valid_flag is False
    def get_pdm_score_info(self, input_dict, index=None):
        log_name = input_dict['log_name']
        if self.val:
            base_path = self.test_pdm_path
            if self.full_fail_map_filter and index in self.full_fail_map_filter:
                base_path = self.train_pdm_path
        else:
            base_path = self.train_pdm_path
        if self.fail_syn_map_filter and index in self.fail_syn_map_filter:
            base_path = self.synthetic_pdm
            # synthetic logs: keep only the first two '-'-separated fields
            log_name = '-'.join(log_name.split('-')[:2])
        load_path = base_path + '/' + log_name + '/'
        type_list = os.listdir(load_path)
        buf_dir = input_dict['sample_idx']
        for t_name in type_list:
            buf_dir = t_name + '/' + input_dict['sample_idx']
            # root the existence check at load_path, not the working directory
            if os.path.exists(os.path.join(load_path, buf_dir)):
                break
        load_path = load_path + buf_dir + '/pdm_cache.pkl'
        with lzma.open(load_path, "rb") as f:
            data = pickle.load(f).trajectory
        input_dict.update(data)
return input_dict
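    # On-disk layout implied by the lookup above (reconstructed from the code,
    # not from documentation):
    #   <pdm_path>/<log_name>/<scene_type>/<sample_token>/pdm_cache.pkl
    # where pdm_cache.pkl is an lzma-compressed pickle whose .trajectory dict
    # carries the per-proposal PDM score arrays (shape (8192,) each, matching
    # get_zero_pdm below).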
def get_zero_pdm(self, input_dict):
pdm_list = [
"no_at_fault_collisions",
"drivable_area_compliance",
"ego_progress",
"time_to_collision_within_bound",
"comfort",
"score",
]
for k in pdm_list:
input_dict[k] = np.zeros((8192,))
return input_dict
def get_data_info(self, index, info=None, debug=False, prev_frame=False):
"""Get data info according to the given index.
Args:
index (int): Index of the sample data to get.
Returns:
dict: Data information that will be passed to the data \
preprocessing pipelines. It includes the following keys:
- sample_idx (str): Sample index.
- pts_filename (str): Filename of point clouds.
- sweeps (list[dict]): Infos of sweeps.
- timestamp (float): Sample timestamp.
- img_filename (str, optional): Image filename.
- lidar2img (list[np.ndarray], optional): Transformations \
from lidar to different cameras.
- ann_info (dict): Annotation info.
"""
if info is None:
info = self.data_infos[index]
        # standard protocol modified from SECOND.Pytorch
        input_dict = dict(
            sample_idx=info["token"],  # str: OpenScenes unique sample token
            frame_idx=info["frame_idx"],  # int: 0-indexed frame IDs
            timestamp=info["timestamp"] / 1e6,  # float: timestamp in seconds
            log_name=info["log_name"],  # str: OpenScenes log name
            log_token=info["log_token"],  # str: OpenScenes log token
            scene_name=info["scene_name"],  # str: OpenScenes sequence name
            scene_token=info["scene_token"],  # str: OpenScenes sequence token
            pts_filename=info["lidar_path"],  # str: relative path of the lidar data
            # sweeps=info["sweeps"],  # List[Dict]: list of infos for sweep data
            prev=info["sample_prev"],  # str: token of the previous sample
            next=info["sample_next"],  # str: token of the next sample
        )
input_dict = self.update_transform(input_dict=input_dict, index=index)
input_dict = self.update_sensor(input_dict=input_dict, index=index)
input_dict = self.update_canbus(input_dict=input_dict, index=index)
        if not prev_frame:
            # only compute the PDM score for the current frame
input_dict = self.get_pdm_score_info(input_dict, index)
else:
input_dict = self.get_zero_pdm(input_dict)
        # generate annotations for detection/tracking and put them in ann_info,
        # so range filtering can be done in one place in transform_3d.py
input_dict["ann_info"] = self.get_ann_info(index)
# # all data ultimately needs to be in the input_dict for downstream heads
# input_dict = self.update_mapping(input_dict=input_dict, index=index)
# input_dict = self.update_occupancy(input_dict=input_dict, index=index)
input_dict = self.update_ego_prediction(input_dict=input_dict, index=index)
input_dict = self.update_ego_planning(input_dict=input_dict, index=index)
        # visualization for debugging (pass debug=True to enable)
        if debug:
self.vis_data_per_frame(
data=input_dict,
index=index,
vis_root="data/viz_nuplan",
)
return input_dict
def update_anno_token(self, annotation, index):
# add OpenScenes-specific token information
return annotation
def update_tracking_prediction(self, annotation, index):
# reusing from the Carla dataloader
info = self.data_infos[index]
# get trajectories for surrounding agents, lidar coordinate, 2Hz
gt_fut_bbox_lidar = info["gt_fut_bbox_lidar"][annotation["mask"]] # N x 12 x 9
gt_fut_bbox_mask = info["gt_fut_bbox_mask"][annotation["mask"]] # N x 12 x 1
gt_pre_bbox_lidar = info["gt_pre_bbox_lidar"][annotation["mask"]] # N x 4 x 9
gt_pre_bbox_mask = info["gt_pre_bbox_mask"][annotation["mask"]] # N x 4 x 1
gt_pre_bbox_lidar = np.flip(gt_pre_bbox_lidar, axis=1) # N x 4 x 9
gt_pre_bbox_mask = np.flip(gt_pre_bbox_mask, axis=1) # N x 4 x 1
# print('\nreverse')
# print(gt_pre_bbox_lidar[3])
# repeat redundant channels for masks
gt_fut_bbox_mask = np.repeat(gt_fut_bbox_mask, 2, axis=2) # N x 12 x 2
gt_pre_bbox_mask = np.repeat(gt_pre_bbox_mask, 2, axis=2) # N x 4 x 2
# append 4 future frames of trajectories
# so now, it contains frames in the order of -1, -2, -3, -4, 1, 2, 3, 4
gt_pre_bbox_lidar = np.concatenate(
(gt_pre_bbox_lidar, gt_fut_bbox_lidar[:, :4]), axis=1
) # N x 8 x 9
gt_pre_bbox_mask = np.concatenate(
(gt_pre_bbox_mask, gt_fut_bbox_mask[:, :4]), axis=1
) # N x 8 x 2
# print('\nput together')
# print(gt_pre_bbox_lidar[3])
# update output dictionary
annotation.update(
dict(
gt_fut_traj=gt_fut_bbox_lidar[:, :, :2], # N x 12 x 2, lidar coordinate
gt_fut_traj_mask=gt_fut_bbox_mask, # N x 12 x 2
gt_past_traj=gt_pre_bbox_lidar[:, :, :2], # N x 8 x 2
gt_past_traj_mask=gt_pre_bbox_mask, # N x 8 x 2
)
)
return annotation
def update_mapping(self, input_dict, index):
# get polyline in the ego coordinate
polyline_dict, drivable_area = self.update_map_vectorized(input_dict, index)
input_dict = self.update_map_rasterized(input_dict, index, polyline_dict, drivable_area)
return input_dict
def update_map_vectorized(self, input_dict, index):
info = self.data_infos[index]
map_location = info['map_location']
e2g_translation = info['ego2global_translation']
e2g_rotation = info['ego2global_rotation']
        if len(e2g_rotation) != 4:
            e2g_rotation = Quaternion(matrix=e2g_rotation).elements
polyline_dict = self.vector_map.gen_vectorized_samples(e2g_translation, e2g_rotation, map_location)
drivable_area = self.vector_map.gen_drivable_area(e2g_translation, e2g_rotation, map_location)
return polyline_dict, drivable_area
def update_map_rasterized(self, input_dict, index, polyline_dict, drivable_area, debug=False):
gt_labels = []
gt_bboxes = []
gt_masks = []
origin = np.array([self.canvas_size[1] // 2, self.canvas_size[0] // 2])
scale = np.array([self.canvas_size[1] / self.patch_size[0], self.canvas_size[0] / self.patch_size[1]])
        for polyline in polyline_dict:
            if polyline["type"] == 0:
                # skip centerline
                continue
            map_mask = np.zeros(self.canvas_size, np.uint8)
            draw_coor = (polyline["pts"] * scale + origin).round().astype(np.int32)
            # color=1 keeps the rasterized mask binary
            gt_mask = cv2.polylines(map_mask, [draw_coor], False, color=1, thickness=self.thickness)
            ys, xs = np.where(gt_mask)
            try:
                gt_bbox = [min(xs), min(ys), max(xs), max(ys)]
            except ValueError:
                # polyline rasterized to an empty mask; skip it
                continue
            cls = polyline["type"]
            gt_labels.append(cls)
            gt_bboxes.append(gt_bbox)
            gt_masks.append(gt_mask)
# for stuff class, drivable area
map_mask = np.zeros(self.canvas_size, np.uint8)
exteriors = []
interiors = []
for p in drivable_area.geoms:
exteriors.append(
(np.array(p.exterior.coords) * scale + origin).round().astype(np.int32)
)
for inter in p.interiors:
interiors.append(
(np.array(inter.coords) * scale + origin).round().astype(np.int32)
)
cv2.fillPoly(map_mask, exteriors, 255)
cv2.fillPoly(map_mask, interiors, 0)
map_mask = map_mask / 255
try:
ys, xs = np.where(map_mask)
gt_bbox = [min(xs), min(ys), max(xs), max(ys)]
gt_labels.append(self.map_num_classes)
gt_bboxes.append(gt_bbox)
gt_masks.append(map_mask)
except ValueError:
pass
gt_labels = torch.tensor(gt_labels)
gt_bboxes = torch.from_numpy(np.stack(gt_bboxes))
gt_masks = torch.from_numpy(np.stack(gt_masks)) # N x H x W
# update output dictionary
input_dict.update(
dict(
gt_lane_labels=gt_labels,
gt_lane_bboxes=gt_bboxes,
gt_lane_masks=gt_masks,
)
)
return input_dict
def update_ego_prediction(self, input_dict, index):
# in replacement of both update_ego_prediction and update_ego_planning
# reusing from the Carla dataloader
info = self.data_infos[index]
# print('\n\nprint out update_ego_prediction now\n')
# retrieve ego bounding box in the current frame in lidar coordinate
gt_sdc_bbox = info["gt_sdc_bbox_lidar"][:9].reshape((1, -1)) # 1 x 9
gt_sdc_bbox = LiDARInstance3DBoxes(
gt_sdc_bbox, box_dim=gt_sdc_bbox.shape[-1], origin=(0.5, 0.5, 0)
).convert_to(self.box_mode_3d)
gt_sdc_bbox = DC(gt_sdc_bbox, cpu_only=True)
        # retrieve ego class label; the default is car
        gt_sdc_label = np.array([0])
        gt_sdc_label = DC(to_tensor(gt_sdc_label))
        sdc_status = info["gt_sdc_bbox_lidar"][:9]
# ego trajectory in lidar coordinate
gt_pre_bbox_sdc_lidar = info["gt_pre_bbox_sdc_lidar"] # 1 x 4 x 9
gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"] # 1 x 12 x 9
gt_pre_bbox_sdc_global = info["gt_pre_bbox_sdc_global"] # 1 x 4 x 9
gt_fut_bbox_sdc_global = info["gt_fut_bbox_sdc_global"] # 1 x 12 x 9
gt_fut_bbox_sdc_mask = info["gt_fut_bbox_sdc_mask"] # 1 x 12 x 1
gt_fut_bbox_sdc_mask = np.repeat(gt_fut_bbox_sdc_mask, 2, axis=2) # 1 x 12 x 2
        gt_pre_bbox_sdc_mask = info["gt_pre_bbox_sdc_mask"]  # 1 x 4 x 1
        gt_pre_bbox_sdc_mask = np.repeat(gt_pre_bbox_sdc_mask, 2, axis=2)  # 1 x 4 x 2
gt_pre_command_sdc = info["gt_pre_command_sdc"]
# update output dictionary for ego's prediction
input_dict.update(
dict(
# sdc_vel_lidar_calculated=sdc_vel_lidar_calculated, # (2, )
gt_sdc_bbox=gt_sdc_bbox, # DC (LiDARInstance3DBoxes), xyz, lwh, yaw, vel_x, vel_y, lidar coordinate
gt_sdc_label=gt_sdc_label, # DC (tensor[0])
gt_sdc_fut_traj=gt_fut_bbox_sdc_lidar[
:, :, :2
], # 1 x 12 x 2, lidar coordinate
gt_sdc_fut_traj_mask=gt_fut_bbox_sdc_mask, # 1 x 12 x 2
# planning labels
command=info["command"], # int, change from 1-indexed to 0-indexed
sdc_planning_world=gt_fut_bbox_sdc_global[
:, : self.planning_steps, [0, 1, 2]
], # 1 x 6 x 3, global coordinate, xyz
sdc_planning=gt_fut_bbox_sdc_lidar[
:, : self.planning_steps, [0, 1, 6]
], # 1 x 6 x 3, lidar coordinate, xy-yaw
sdc_planning_mask=gt_fut_bbox_sdc_mask[
:, : self.planning_steps
], # 1 x 6 x 2
sdc_planning_past=info["gt_pre_bbox_sdc_lidar"][:, :, [0, 1, 6]],
sdc_planning_mask_past=gt_pre_bbox_sdc_mask, # 1 x 4 x 2
gt_pre_command_sdc=gt_pre_command_sdc,
sdc_status=sdc_status[[0, 1, 6]]
)
)
return input_dict
def update_ego_planning(self, input_dict, index):
info = self.data_infos[index]
# sdc_planning already added in update_ego_prediction
return input_dict
def vis_dataset_specific(self, data: Dict):
        # try each split's sensor-blob folder in turn; use the first that exists
        points = None
        for split in ("trainval", "test", "mini"):
            parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}"
            pts_filename: str = os.path.join(parent_dir, data["pts_filename"])
            try:
                points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2()  # 6 x N
                break
            except FileNotFoundError:
                continue
        if points is None:
            raise FileNotFoundError(data["pts_filename"])
points = points[:3].T # N x 3
image_file_list = []
for cam_name in range(8):
            # iterate over all 8 surround-view cameras (front/back plus left/right pairs)
image_file: str = os.path.join(parent_dir, data["img_filename"][cam_name])
image_file_list.append(image_file)
return {
"lidar": points,
"command_dict": {
0: "TURN LEFT",
1: "KEEP FORWARD",
2: "TURN RIGHT",
3: "UNKNOWN",
},
"image_file_list": image_file_list,
"left_hand": False,
}
class BoundingBox2DIndex(IntEnum):
"""Intenum for bounding boxes in TransFuser."""
_X = 0
_Y = 1
_HEADING = 2
_LENGTH = 3
_WIDTH = 4
@classmethod
def size(cls):
valid_attributes = [
attribute
for attribute in dir(cls)
if attribute.startswith("_") and not attribute.startswith("__") and not callable(getattr(cls, attribute))
]
return len(valid_attributes)
@classmethod
@property
def X(cls):
return cls._X
@classmethod
@property
def Y(cls):
return cls._Y
@classmethod
@property
def HEADING(cls):
return cls._HEADING
@classmethod
@property
def LENGTH(cls):
return cls._LENGTH
@classmethod
@property
def WIDTH(cls):
return cls._WIDTH
@classmethod
@property
def POINT(cls):
# assumes X, Y have subsequent indices
return slice(cls._X, cls._Y + 1)
@classmethod
@property
def STATE_SE2(cls):
# assumes X, Y, HEADING have subsequent indices
return slice(cls._X, cls._HEADING + 1)
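# Usage sketch for BoundingBox2DIndex (illustrative only):
#   states = np.zeros((30, BoundingBox2DIndex.size()))  # size() == 5
#   xy = states[..., BoundingBox2DIndex.POINT]          # (x, y) columns
#   se2 = states[..., BoundingBox2DIndex.STATE_SE2]     # (x, y, heading)
# Note: the stacked @classmethod/@property pattern above works on Python 3.9
# and 3.10 but was deprecated in 3.11 and removed in 3.13.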