|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import copy |
|
|
import hashlib |
|
|
import json |
|
|
import os |
|
|
import pickle |
|
|
import pprint |
|
|
import random |
|
|
import tempfile |
|
|
from os import path as osp |
|
|
from typing import Dict, List |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from torchvision import transforms |
|
|
from easydict import EasyDict |
|
|
|
|
|
import mmcv |
|
|
import numpy as np |
|
|
import scipy |
|
|
import cv2 |
|
|
import torch |
|
|
from mmcv.parallel import DataContainer as DC |
|
|
from mmdet3d.core.bbox import LiDARInstance3DBoxes |
|
|
from mmdet3d_plugin.datasets import NuScenesE2EDataset |
|
|
from mmdet3d_plugin.datasets.data_utils.rasterize import preprocess_map |
|
|
from mmdet3d_plugin.datasets.nuplan.nuplan_pointcloud import PointCloud |
|
|
from mmdet3d_plugin.eval.detection.config import config_factory_nuPlan |
|
|
from mmdet3d_plugin.uniad.dense_heads.planning_head_plugin.compute_metrics import ( |
|
|
PredictionConfig, compute_metrics) |
|
|
from mmdet.datasets import DATASETS |
|
|
from mmdet.datasets.pipelines import to_tensor |
|
|
from nuscenes import NuScenes |
|
|
from nuscenes.eval.common.utils import Quaternion, quaternion_yaw |
|
|
from nuscenes.eval.tracking.evaluate import TrackingEval |
|
|
from nuscenes.prediction import convert_local_coords_to_global |
|
|
from PIL import Image |
|
|
from prettytable import PrettyTable |
|
|
|
|
|
from mmdet3d_plugin.datasets.data_utils.diffusiondrive_map import DiffusionDriveMap |
|
|
from copy import deepcopy |
|
|
|
|
|
import lzma |
|
|
import pandas as pd |
|
|
from glob import glob |
|
|
from tqdm import tqdm |
|
|
import yaml |
|
|
|
|
|
from navsim.common.enums import BoundingBoxIndex, LidarIndex |
|
|
from navsim.common.dataclasses import Lidar, Cameras |
|
|
|
|
|
import mmdet3d_plugin.datasets.utils.calibration as calib_utils |
|
|
from navsim.agents.diffusiondrive.transfuser_config import TransfuserConfig |
|
|
from enum import IntEnum |
|
|
|
|
|
@DATASETS.register_module()
class NavSimOpenScenesE2EDiffusionDrive(NuScenesE2EDataset):
    r"""OpenScenes E2E Dataset

    NavSim/OpenScenes end-to-end dataset wrapper that produces
    DiffusionDrive (Transfuser-style) model inputs — stitched front camera,
    LiDAR BEV histogram, ego status vector — plus trajectory / agent /
    BEV-semantic supervision targets.
    """

    # Detection class names; tuple order defines the integer label ids
    # (the ego/SDC box is labelled 0, i.e. "vehicle").
    CLASSES = (
        "vehicle",
        "bicycle",
        "pedestrian",
        "traffic_cone",
        "barrier",
        "czone_sign",
        "generic_object",
    )
|
|
def __init__(self, *args, |
|
|
img_root=None, |
|
|
duplicate=True, load_from_splitted_pkl=False, |
|
|
split_path=None, test_filter_file=None, |
|
|
video_pkl=None, |
|
|
ep_path=None, train_pdm_path=None, test_pdm_path=None, |
|
|
future_frame_num=0, history_frame_num=3, |
|
|
finetune_yaml=None, cache_path=None, **kwargs): |
|
|
|
|
|
|
|
|
self.img_root = img_root |
|
|
self.finetune_yaml = finetune_yaml |
|
|
self.duplicate=duplicate |
|
|
|
|
|
|
|
|
self.fail_syn_map_filter = None |
|
|
self.synthetic_img_root = None |
|
|
self.full_fail_map_filter = None |
|
|
self.cache_path = cache_path |
|
|
self.metric_cache_tokens = None |
|
|
|
|
|
self.load_from_splitted_pkl = load_from_splitted_pkl |
|
|
self.future_frame_num = future_frame_num |
|
|
self.history_frame_num = history_frame_num |
|
|
|
|
|
self._config = TransfuserConfig() |
|
|
self.map_root = '/xxx/nuplan/dataset/maps' |
|
|
self.map_api = DiffusionDriveMap(self._config, |
|
|
map_root=self.map_root) |
|
|
|
|
|
|
|
|
split_path = '/xxx/navtrain_split' |
|
|
train_pdm_path = '/xxx/pdm_8192_gt_cache' |
|
|
test_pdm_path = '/xxx/pdm_8192_gt_cache_navtest' |
|
|
|
|
|
self.split_path = split_path |
|
|
|
|
|
self.train_pdm_path = train_pdm_path |
|
|
self.test_pdm_path = test_pdm_path |
|
|
self.video_pkl = video_pkl |
|
|
|
|
|
|
|
|
if self.cache_path is not None: |
|
|
print(f'loading cache info from:{cache_path}') |
|
|
self.init_metric_cache_paths(cache_path) |
|
|
|
|
|
super(NavSimOpenScenesE2EDiffusionDrive, self).__init__(*args, **kwargs) |
|
|
self.cache_path = cache_path |
|
|
|
|
|
def init_metric_cache_paths(self, cache_path): |
|
|
""" |
|
|
Helper function to load all cache file paths from folder. |
|
|
:param cache_path: directory of cache folder |
|
|
:return: dictionary of token and file path |
|
|
""" |
|
|
metadata_dir = Path(cache_path) / "metadata" |
|
|
metadata_file = [file for file in metadata_dir.iterdir() if ".csv" in str(file)][0] |
|
|
with open(str(metadata_file), "r") as f: |
|
|
cache_paths = f.read().splitlines()[1:] |
|
|
self.metric_cache_dict = {cache_path.split("/")[-2]: cache_path for cache_path in cache_paths} |
|
|
print(f'cache loaded, size {len(self.metric_cache_dict.keys())}') |
|
|
|
|
|
self.metric_cache_tokens = set(self.metric_cache_dict.keys()) |
|
|
|
|
|
def merge_into_splits(self, ann_file, scene_filter): |
|
|
|
|
|
all_pkl_file = glob(ann_file + '/*.pkl') |
|
|
print(f'{len(all_pkl_file)} files found') |
|
|
|
|
|
data_infos = [] |
|
|
for pkl_file in tqdm(all_pkl_file): |
|
|
data_tmp = mmcv.load(pkl_file, file_format="pkl")['infos'] |
|
|
scene_filter_expanded = set() |
|
|
for idx, data_frame in enumerate(data_tmp): |
|
|
if data_frame['token'] in scene_filter: |
|
|
start_frame_idx = idx - self.history_frame_num |
|
|
end_frame_idx = idx + self.future_frame_num |
|
|
for i in range(start_frame_idx, end_frame_idx + 1): |
|
|
if i < 0 or i >= len(data_tmp): |
|
|
continue |
|
|
scene_filter_expanded.add(data_tmp[i]['token']) |
|
|
|
|
|
data_save = [] |
|
|
for data_frame in data_tmp: |
|
|
token = data_frame['token'] |
|
|
if token in scene_filter_expanded: |
|
|
data_save.append(data_frame) |
|
|
data_infos.extend(data_save) |
|
|
return data_infos |
|
|
|
|
|
|
|
|
    def load_annotations(self, ann_file):
        """Load frame infos and build ``self.index_map``.

        Side effects: sets ``self.val``, ``self.scene_filter``,
        ``self.index_map`` and ``self.finetuning_mask``; every info gets a
        ``scene_token`` alias of its ``log_token``.

        :param ann_file: pkl file (or folder of pkls when
            ``self.load_from_splitted_pkl``) with the frame infos.
        :return: the full, unfiltered list of frame info dicts; filtering is
            applied through ``self.index_map`` in ``__getitem__``.
        """
        print(ann_file)
        if self.test_mode:
            print('use navtest filter...')
            self.val=True
            nav_filter = os.path.join(self.data_root, 'navsim_filter/navtest.yaml')
        else:
            print('use navtrain filter...')
            self.val=False
            nav_filter = os.path.join(self.data_root, 'navsim_filter/navtrain.yaml')

        # Train on the first `use_num_split` of the 5 navtrain splits,
        # unless we are evaluating or finetuning from an explicit yaml.
        if self.use_num_split >0 and ('val' not in ann_file) and (self.finetune_yaml==None):
            scene_filter = []
            for i in range(5):
                if i>=self.use_num_split:
                    break
                path = self.split_path + f'/split_{i}.yaml'
                with open(path, 'r') as file:
                    nav_filter = yaml.safe_load(file)
                scene_filter += nav_filter['tokens']
        else:
            if self.finetune_yaml is not None:
                print(f'loading finetuning yaml from:{self.finetune_yaml}')
                nav_filter = self.finetune_yaml
            with open(nav_filter, 'r') as file:
                nav_filter = yaml.safe_load(file)
            try:
                scene_filter = nav_filter['tokens']
            except:  # NOTE(review): bare except; should be `except KeyError`
                scene_filter = nav_filter['scenario_tokens']

        print('loading dataset!')
        if self.load_from_splitted_pkl:
            data_infos = self.merge_into_splits(ann_file, scene_filter)
        else:
            print(ann_file)
            data_infos = mmcv.load(ann_file, file_format="pkl")

        print('dataset loaded, length:', len(data_infos))

        self.scene_filter = set(scene_filter)
        if self.video_pkl is not None:
            # NOTE(review): hard-coded metadata csv path — presumably a local
            # cache location; should come from config.
            vidmetadata_file = '/xxx/navsim_metric_cache_navtrain/metadata/navsim_metric_cache_navtrain2_metadata_node_0.csv'
            with open(str(vidmetadata_file), "r") as f:
                vid_cache_paths = f.read().splitlines()[1:]

            # Token is the parent folder name of each cached file path.
            metric_cache_filter = [cache_path.split("/")[-2] for cache_path in vid_cache_paths]

            print(f'video data loaded, length:', len(metric_cache_filter))
            self.scene_filter = self.scene_filter | set(metric_cache_filter)

        # Map dataset indices -> raw data_infos indices, keeping only
        # filtered tokens; train frames also need 8 valid future ego steps.
        self.index_map = []
        for i, info in enumerate(data_infos):
            if info['token'] in self.scene_filter:
                if self.test_mode==False:

                    valid_len=np.sum(info['gt_fut_bbox_sdc_mask'][0,:8])
                    if valid_len <8:
                        continue
                self.index_map.append(i)

        if self.use_num_split > 0 and self.val==False and self.duplicate==True:
            # Oversample so one epoch matches the full 5-split size.
            self.index_map = self.duplicate_indices(self.index_map, n=5/self.use_num_split)

        print(f'filtering {len(data_infos)} frames to {len(self.index_map)}...')

        for info in data_infos:
            info['scene_token'] = info['log_token']

        self.finetuning_mask = None

        return data_infos
|
|
|
def duplicate_indices(self, indices, n): |
|
|
int_part = int(n) |
|
|
frac_part = n - int_part |
|
|
|
|
|
duplicated = indices * int_part |
|
|
|
|
|
if frac_part > 0: |
|
|
num_extra = int(len(indices) * frac_part) |
|
|
extra_indices = list(np.random.choice(indices, num_extra, replace=False)) |
|
|
duplicated.extend(extra_indices) |
|
|
|
|
|
return duplicated |
|
|
|
|
|
    def __len__(self):
        # Length is the number of kept (filtered / duplicated) samples,
        # not the raw number of frame infos.
        return len(self.index_map)
|
|
|
def __getitem__(self, idx): |
|
|
|
|
|
new_idx = self.index_map[idx] |
|
|
|
|
|
if self.test_mode: |
|
|
|
|
|
return self.prepare_test_data(new_idx) |
|
|
while True: |
|
|
|
|
|
data = self.prepare_train_data(new_idx) |
|
|
if data is None: |
|
|
idx = self._rand_another(idx) |
|
|
new_idx = self.index_map[idx] |
|
|
continue |
|
|
return data |
|
|
|
|
|
def init_dataset(self): |
|
|
self.sdc_acc_ego_fix = False |
|
|
|
|
|
|
|
|
self.eval_detection_configs = config_factory_nuPlan() |
|
|
self.default_attr = { |
|
|
"vehicle": "vehicle.parked", |
|
|
"bicycle": "cycle.without_rider", |
|
|
"pedestrian": "pedestrian.moving", |
|
|
"traffic_cone": "", |
|
|
"barrier": "", |
|
|
"czone_sign": "", |
|
|
"generic_object": "", |
|
|
} |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    def prepare_test_data_old(self, index):
        """
        Training data preparation (legacy temporal-queue variant).

        Builds a queue of `queue_length` consecutive frames ending at
        `index`, runs each through the pipeline, and collapses them into a
        single sample via union2one / union2one_test.

        Args:
            index (int): Index for accessing the target data.
        Returns:
            dict or None: Training data dict of the corresponding index, or
            None when the queue cannot be formed (not enough history, scene
            change inside the queue, or missing metric cache).
                img: queue_length, 6, 3, H, W
                img_metas: img_metas of each frame (list)
                gt_globals_3d: gt_globals of each frame (list)
                gt_bboxes_3d: gt_bboxes of each frame (list)
                gt_inds: gt_inds of each frame (list)
        """
        data_queue = []
        # NOTE(review): attribute name is typo'd ("enbale") — kept as-is since
        # consumers may read the same misspelled name.
        self.enbale_temporal_aug = False

        final_index = index
        first_index = index - self.queue_length + 1
        if first_index < 0:
            # Not enough history for a full temporal queue.
            return None
        if (
            self.data_infos[first_index]["scene_token"]
            != self.data_infos[final_index]["scene_token"]
        ):
            # Queue would span two scenes.
            print(index, 2)
            return None

        if self.cache_path is not None:
            if self.data_infos[index]["token"] not in self.metric_cache_tokens:
                print(f'Token: {self.data_infos[index]["scene_token"]} not in Metric cache')
                return None

        input_dict = self.get_data_info(final_index)

        if self.finetuning_mask is not None:
            input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]

        # History indices, newest first.
        prev_indexs_list = list(reversed(range(first_index, final_index)))

        if input_dict is None:
            # NOTE(review): `index in self.fail_syn_map_filter` raises
            # TypeError when the filter is None (its __init__ default).
            print(index, index in self.fail_syn_map_filter ,3)
            return None
        frame_idx = input_dict["frame_idx"]
        scene_token = input_dict["scene_token"]
        self.pre_pipeline(input_dict)
        example = self.pipeline(input_dict)
        data_queue.insert(0, example)

        for i in prev_indexs_list:

            input_dict = self.get_data_info(i, prev_frame=True)
            if self.finetuning_mask is not None:
                input_dict['fail_mask'] = self.finetuning_mask[self.reverse_index_map[index]]

            if input_dict is None:
                print(index, index in self.fail_syn_map_filter, 4)
                return None
            if (
                input_dict["frame_idx"] < frame_idx
                and input_dict["scene_token"] == scene_token
            ):
                self.pre_pipeline(input_dict)
                example = self.pipeline(input_dict)

                frame_idx = input_dict["frame_idx"]

            # When the guard above fails, the previous `example` is re-used —
            # presumably intentional frame padding; confirm.
            data_queue.insert(0, copy.deepcopy(example))

        if self.val or self.test_mode:
            data_queue = self.union2one_test(data_queue)
        else:
            data_queue = self.union2one(data_queue, self.data_infos[index]["token"] )
        return data_queue
|
|
|
|
|
    def prepare_sdc_gt_planning(self, index):
        """Build the 8-step ego GT plan (x, y, yaw) in the current lidar frame.

        When ``video_pkl`` is set, the precomputed future trajectory stored
        in the info dict is returned instead of recomputing from ego poses.
        """
        sdc_plan = []
        info = self.data_infos[index]
        if self.video_pkl is not None:

            # Take the x, y, yaw columns (0, 1, 6) of the first (ego) entry,
            # truncated to the planning horizon.
            gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"][:, : self.planning_steps, [0, 1, 6]]
            gt_fut_bbox_sdc_lidar = gt_fut_bbox_sdc_lidar[0]
            return gt_fut_bbox_sdc_lidar

        global2lidar = np.linalg.inv(info["lidar2global"] )
        for i in range(8):
            fut_info = self.data_infos[index + 1 + i]

            # Future ego position: global -> current lidar frame
            # (homogeneous coordinates).
            sdc_loc_global, _ = calib_utils.transform_matrix_to_vector(
                fut_info["ego2global"]
            )
            sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1])))
            sdc_loc_lidar = global2lidar @ sdc_loc_global

            # Future ego heading expressed in the current lidar frame: take
            # the z-rotation of the composed transform.
            sdc_trans_lidar = (
                global2lidar @ fut_info["ego2global"]
            )
            yaw_lidar = scipy.spatial.transform.Rotation.from_matrix(
                sdc_trans_lidar[:3, :3]
            ).as_euler("xyz", degrees=False)[-1]
            sdc_plan.append([sdc_loc_lidar[0], sdc_loc_lidar[1], yaw_lidar])

        # (8, 3) array of (x, y, yaw) steps.
        sdc_plan = np.stack(sdc_plan, axis=0)
        return sdc_plan
|
|
|
|
|
def prepare_train_data(self, index): |
|
|
info = self.data_infos[index] |
|
|
|
|
|
|
|
|
if index+8>len(self.data_infos): |
|
|
print(f'token:{index} exceed in future scene') |
|
|
return None |
|
|
match_flag = all(self.data_infos[index+1+j]['log_token'] == info['log_token'] for j in range(8)) |
|
|
if match_flag==False and (self.video_pkl is None): |
|
|
print(f'token:{index} not match in future scene') |
|
|
return None |
|
|
|
|
|
|
|
|
features = self.prepare_test_data(index) |
|
|
|
|
|
|
|
|
label_features = self.prepare_label(index) |
|
|
features.update(label_features) |
|
|
|
|
|
features['img_metas'] = DC(deepcopy(info['token']), cpu_only=True) |
|
|
|
|
|
if self.cache_path is not None: |
|
|
features["metric_cache"] = DC(self.get_metric_cache(info['token']), cpu_only=True) |
|
|
|
|
|
return features |
|
|
|
|
|
|
|
|
def prepare_label(self, index): |
|
|
info = self.data_infos[index] |
|
|
sdc_plan = self.prepare_sdc_gt_planning(index) |
|
|
trajectory = torch.from_numpy(sdc_plan) |
|
|
agent_states, agent_labels = self._compute_agent_targets(index) |
|
|
bev_semantic_map = self.map_api.compute_bev_semantic_map(info) |
|
|
|
|
|
return { |
|
|
"trajectory": trajectory, |
|
|
"agent_states": agent_states, |
|
|
"agent_labels": agent_labels, |
|
|
"bev_semantic_map": bev_semantic_map, |
|
|
} |
|
|
|
|
|
    def _compute_agent_targets(self, index):
        """
        Extracts 2D agent bounding boxes in ego coordinates

        Vehicles inside the lidar BEV extent are collected as
        (x, y, heading, length, width), sorted by distance to the ego, and
        padded/truncated to a fixed number of slots.

        :param annotations: annotation dataclass
        :return: tuple of bounding box values and labels (binary)
        """
        info = self.data_infos[index]
        max_agents = self._config.num_bounding_boxes
        agent_states_list = []

        def _xy_in_lidar(x: float, y: float, config: TransfuserConfig) -> bool:
            # True iff the box centre falls inside the configured BEV extent.
            return (config.lidar_min_x <= x <= config.lidar_max_x) and (config.lidar_min_y <= y <= config.lidar_max_y)

        for box, name in zip(info['anns']["gt_boxes"], info['anns']["gt_names"]):
            box_x, box_y, box_heading, box_length, box_width = (
                box[BoundingBoxIndex.X],
                box[BoundingBoxIndex.Y],
                box[BoundingBoxIndex.HEADING],
                box[BoundingBoxIndex.LENGTH],
                box[BoundingBoxIndex.WIDTH],
            )

            # Only vehicles are supervised as agents.
            if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config):
                agent_states_list.append(np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32))

        agents_states_arr = np.array(agent_states_list)

        # Fixed-size, zero-padded outputs; labels flag the filled slots.
        # NOTE(review): `BoundingBox2DIndex` is not among this file's visible
        # imports — confirm it is provided elsewhere in the module.
        agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32)
        agent_labels = np.zeros(max_agents, dtype=bool)

        if len(agents_states_arr) > 0:
            # Keep the `max_agents` agents closest to the ego origin.
            distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
            argsort = np.argsort(distances)[:max_agents]

            agents_states_arr = agents_states_arr[argsort]
            agent_states[: len(agents_states_arr)] = agents_states_arr
            agent_labels[: len(agents_states_arr)] = True

        return torch.tensor(agent_states), torch.tensor(agent_labels)
|
|
|
|
|
|
def prepare_test_data(self, index): |
|
|
"""Inherited, see superclass.""" |
|
|
info = self.data_infos[index] |
|
|
features = {} |
|
|
|
|
|
features['token'] = DC(deepcopy(info['token']), cpu_only=True) |
|
|
features["camera_feature"] = self._get_camera_feature(info) |
|
|
features["lidar_feature"] = self._get_lidar_feature(info) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
features["status_feature"] = torch.cat( |
|
|
[ |
|
|
torch.tensor(info['driving_command'], dtype=torch.float32), |
|
|
torch.tensor(info["ego_dynamic_state"][:2], dtype=torch.float32), |
|
|
torch.tensor(info["ego_dynamic_state"][2:], dtype=torch.float32), |
|
|
], |
|
|
) |
|
|
|
|
|
return features |
|
|
|
|
|
def _get_camera_feature(self, info) -> torch.Tensor: |
|
|
""" |
|
|
Extract stitched camera from AgentInput |
|
|
:param agent_input: input dataclass |
|
|
:return: stitched front view image as torch tensor |
|
|
""" |
|
|
|
|
|
cameras = Cameras.from_camera_dict( |
|
|
sensor_blobs_path=Path(self.img_root), |
|
|
camera_dict=info["cams"], |
|
|
sensor_names=['cam_l0', 'cam_f0', 'cam_r0'], |
|
|
) |
|
|
|
|
|
|
|
|
l0 = cameras.cam_l0.image[28:-28, 416:-416] |
|
|
f0 = cameras.cam_f0.image[28:-28] |
|
|
r0 = cameras.cam_r0.image[28:-28, 416:-416] |
|
|
|
|
|
|
|
|
stitched_image = np.concatenate([l0, f0, r0], axis=1) |
|
|
resized_image = cv2.resize(stitched_image, (1024, 256)) |
|
|
|
|
|
|
|
|
tensor_image = transforms.ToTensor()(resized_image) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return tensor_image |
|
|
|
|
|
    def _get_lidar_feature(self, info) -> torch.Tensor:
        """
        Compute LiDAR feature as 2D histogram, according to Transfuser
        :param agent_input: input dataclass
        :return: LiDAR histogram as torch tensors
        """
        lidar_path = info['lidar_path']
        lidar = Lidar.from_paths(self.img_root , lidar_path, "lidar_pc")

        # Point positions transposed to points-first layout — presumably
        # (num_points, 3) xyz; confirm LidarIndex.POSITION semantics.
        lidar_pc = lidar.lidar_pc[LidarIndex.POSITION].T

        def splat_points(point_cloud):
            # Splat xy points onto a BEV grid at `pixels_per_meter`
            # resolution, clipping per-pixel counts and normalising to [0, 1].
            xbins = np.linspace(
                self._config.lidar_min_x,
                self._config.lidar_max_x,
                (self._config.lidar_max_x - self._config.lidar_min_x) * int(self._config.pixels_per_meter) + 1,
            )
            ybins = np.linspace(
                self._config.lidar_min_y,
                self._config.lidar_max_y,
                (self._config.lidar_max_y - self._config.lidar_min_y) * int(self._config.pixels_per_meter) + 1,
            )
            hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0]
            hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel
            overhead_splat = hist / self._config.hist_max_per_pixel
            return overhead_splat

        # Remove points above the configured ceiling, then split at the
        # ground-plane height and histogram each slab separately.
        lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar]
        below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height]
        above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height]
        above_features = splat_points(above)
        if self._config.use_ground_plane:
            below_features = splat_points(below)
            features = np.stack([below_features, above_features], axis=-1)
        else:
            features = np.stack([above_features], axis=-1)
        # Channels-first (C, H, W) float32 for the network.
        features = np.transpose(features, (2, 0, 1)).astype(np.float32)

        return torch.from_numpy(features)
|
|
|
|
|
def get_metric_cache(self, token): |
|
|
with lzma.open(self.metric_cache_dict[token], "rb") as f: |
|
|
metric_cache = pickle.load(f) |
|
|
return metric_cache |
|
|
|
|
|
    def union2one(self, queue, token):
        """
        convert a sequence of the sample dict into one single sample.

        Frames in `queue` are ordered oldest -> newest; the newest frame
        becomes the sample, carrying the stacked images and the per-frame
        metas of the whole queue.
        """
        imgs_list: List[torch.Tensor] = [
            each["img"].data for each in queue
        ]
        l2g_r_mat_list: List[torch.Tensor] = [
            to_tensor(each["l2g_r_mat"]) for each in queue
        ]
        l2g_t_list: List[torch.Tensor] = [
            to_tensor(each["l2g_t"]) for each in queue
        ]
        timestamp_list = [to_tensor(each["timestamp"]) for each in queue]

        # Re-encode each frame's can_bus pose in-place as a delta w.r.t. the
        # previous frame; the first frame is zeroed and flagged as having no
        # previous BEV. Statement order here is load-bearing.
        metas_map = {}
        prev_pos = None
        prev_angle = None
        for i, each in enumerate(queue):
            metas_map[i] = each["img_metas"].data
            if i == 0:
                metas_map[i]["prev_bev"] = False
                prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] = 0
                metas_map[i]["can_bus"][-1] = 0
            else:
                metas_map[i]["prev_bev"] = True
                tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] -= prev_pos
                metas_map[i]["can_bus"][-1] -= prev_angle
                prev_pos = copy.deepcopy(tmp_pos)
                prev_angle = copy.deepcopy(tmp_angle)

        queue[-1]["img"] = DC(
            torch.stack(imgs_list), cpu_only=False, stack=True
        )
        queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)

        # Collapse: the newest frame now represents the whole sequence.
        queue = queue[-1]

        if self.cache_path is not None:
            queue["metric_cache"] = DC(self.get_metric_cache(token), cpu_only=True)

        queue["l2g_r_mat"] = DC(l2g_r_mat_list)
        queue["l2g_t"] = DC(l2g_t_list)
        queue["timestamp"] = DC(timestamp_list)

        return queue
|
|
|
|
|
|
|
    def union2one_test(self, queue):
        """
        convert a sequence of the sample dict into one single sample.

        Test-time variant of `union2one`: pipeline outputs carry an extra
        list wrapper (hence the `[0]` indexing) and no metric cache is
        attached.
        """
        imgs_list: List[torch.Tensor] = [
            each["img"][0].data for each in queue
        ]
        l2g_r_mat_list: List[torch.Tensor] = [
            to_tensor(each["l2g_r_mat"]) for each in queue
        ]
        l2g_t_list: List[torch.Tensor] = [
            to_tensor(each["l2g_t"]) for each in queue
        ]
        timestamp_list = [to_tensor(each["timestamp"]) for each in queue]

        # Same in-place relative can_bus encoding as in `union2one`.
        metas_map = {}
        prev_pos = None
        prev_angle = None
        for i, each in enumerate(queue):
            metas_map[i] = each["img_metas"][0].data
            if i == 0:
                metas_map[i]["prev_bev"] = False
                prev_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                prev_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] = 0
                metas_map[i]["can_bus"][-1] = 0
            else:
                metas_map[i]["prev_bev"] = True
                tmp_pos = copy.deepcopy(metas_map[i]["can_bus"][:3])
                tmp_angle = copy.deepcopy(metas_map[i]["can_bus"][-1])
                metas_map[i]["can_bus"][:3] -= prev_pos
                metas_map[i]["can_bus"][-1] -= prev_angle
                prev_pos = copy.deepcopy(tmp_pos)
                prev_angle = copy.deepcopy(tmp_angle)

        queue[-1]["img"] = DC(
            torch.stack(imgs_list), cpu_only=False, stack=True
        )
        queue[-1]["img_metas"] = DC(metas_map, cpu_only=True)

        # Collapse: the newest frame now represents the whole sequence.
        queue = queue[-1]

        queue["l2g_r_mat"] = DC(l2g_r_mat_list)
        queue["l2g_t"] = DC(l2g_t_list)
        queue["timestamp"] = DC(timestamp_list)

        return queue
|
|
|
def init_trackpredplan( |
|
|
self, |
|
|
predict_steps, |
|
|
planning_steps, |
|
|
past_steps, |
|
|
fut_steps, |
|
|
use_nonlinear_optimizer, |
|
|
): |
|
|
|
|
|
self.predict_steps = predict_steps |
|
|
self.planning_steps = planning_steps |
|
|
self.past_steps = past_steps |
|
|
self.fut_steps = fut_steps |
|
|
self.use_nonlinear_optimizer = use_nonlinear_optimizer |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def init_occupancy( |
|
|
self, |
|
|
occ_receptive_field, |
|
|
occ_n_future, |
|
|
occ_filter_invalid_sample, |
|
|
occ_filter_by_valid_flag, |
|
|
): |
|
|
self.occ_receptive_field = occ_receptive_field |
|
|
self.occ_n_future = occ_n_future |
|
|
self.occ_filter_invalid_sample = occ_filter_invalid_sample |
|
|
self.occ_filter_by_valid_flag = occ_filter_by_valid_flag |
|
|
self.occ_only_total_frames = 7 |
|
|
assert self.occ_filter_by_valid_flag is False |
|
|
|
|
|
|
|
|
|
|
|
def get_pdm_score_info(self, input_dict, index=None): |
|
|
|
|
|
if self.val: |
|
|
base_path = self.test_pdm_path |
|
|
if self.full_fail_map_filter: |
|
|
if index in self.full_fail_map_filter: |
|
|
base_path = self.train_pdm_path |
|
|
else: |
|
|
base_path = self.train_pdm_path |
|
|
|
|
|
if self.fail_syn_map_filter: |
|
|
if index in self.fail_syn_map_filter: |
|
|
base_path = self.synthetic_pdm |
|
|
log_name = input_dict['log_name'].split('-')[0] + '-' + input_dict['log_name'].split('-')[1] |
|
|
else: |
|
|
log_name = input_dict['log_name'] |
|
|
|
|
|
load_path = base_path + '/' + log_name + '/' |
|
|
type_list = os.listdir(load_path) |
|
|
for t_name in type_list: |
|
|
buf_dir = t_name+ '/' +input_dict['sample_idx'] |
|
|
if os.path.exists(buf_dir): |
|
|
break |
|
|
load_path = load_path + buf_dir + '/pdm_cache.pkl' |
|
|
|
|
|
with lzma.open(load_path, "rb") as f: |
|
|
data = pickle.load(f).trajectory |
|
|
|
|
|
input_dict.update(data) |
|
|
|
|
|
return input_dict |
|
|
|
|
|
def get_zero_pdm(self, input_dict): |
|
|
pdm_list = [ |
|
|
"no_at_fault_collisions", |
|
|
"drivable_area_compliance", |
|
|
"ego_progress", |
|
|
"time_to_collision_within_bound", |
|
|
"comfort", |
|
|
"score", |
|
|
] |
|
|
for k in pdm_list: |
|
|
input_dict[k] = np.zeros((8192,)) |
|
|
return input_dict |
|
|
|
|
|
|
|
|
    def get_data_info(self, index, info=None, debug=False, prev_frame=False):
        """Get data info according to the given index.

        Args:
            index (int): Index of the sample data to get.
            info (dict, optional): Pre-fetched info dict; looked up from
                ``self.data_infos`` when None.
            debug (bool): Effectively unused — see note below.
            prev_frame (bool): True for history frames, which get zero PDM
                scores instead of loading the PDM cache.
        Returns:
            dict: Data information that will be passed to the data \
                preprocessing pipelines. It includes the following keys:

                - sample_idx (str): Sample index.
                - pts_filename (str): Filename of point clouds.
                - sweeps (list[dict]): Infos of sweeps.
                - timestamp (float): Sample timestamp.
                - img_filename (str, optional): Image filename.
                - lidar2img (list[np.ndarray], optional): Transformations \
                    from lidar to different cameras.
                - ann_info (dict): Annotation info.
        """

        if info is None:
            info = self.data_infos[index]

        input_dict = dict(
            sample_idx=info["token"],
            frame_idx=info["frame_idx"],
            timestamp=info["timestamp"] / 1e6,  # microseconds -> seconds
            log_name=info["log_name"],
            log_token=info["log_token"],
            scene_name=info["scene_name"],
            scene_token=info["scene_token"],
            pts_filename=info["lidar_path"],

            prev=info["sample_prev"],
            next=info["sample_next"],
        )

        input_dict = self.update_transform(input_dict=input_dict, index=index)
        input_dict = self.update_sensor(input_dict=input_dict, index=index)
        input_dict = self.update_canbus(input_dict=input_dict, index=index)
        if prev_frame==False:
            # Only the current frame needs real PDM scores.
            input_dict = self.get_pdm_score_info(input_dict, index)
        else:
            input_dict = self.get_zero_pdm(input_dict)

        input_dict["ann_info"] = self.get_ann_info(index)

        input_dict = self.update_ego_prediction(input_dict=input_dict, index=index)
        input_dict = self.update_ego_planning(input_dict=input_dict, index=index)

        # NOTE(review): the `debug` parameter is shadowed here, making the
        # visualisation branch unreachable.
        debug = False
        if debug:
            self.vis_data_per_frame(
                data=input_dict,
                index=index,
                vis_root="data/viz_nuplan",
            )

        return input_dict
|
|
|
|
|
|
|
|
def update_anno_token(self, annotation, index): |
|
|
|
|
|
|
|
|
return annotation |
|
|
|
|
|
    def update_tracking_prediction(self, annotation, index):
        """Attach per-agent future/past trajectory GT (lidar frame).

        ``annotation['mask']`` selects the kept boxes. Past trajectories are
        time-flipped, masks are repeated along the channel axis, and the
        first 4 future steps are appended to the past trajectory.
        """
        info = self.data_infos[index]

        gt_fut_bbox_lidar = info["gt_fut_bbox_lidar"][annotation["mask"]]
        gt_fut_bbox_mask = info["gt_fut_bbox_mask"][annotation["mask"]]
        gt_pre_bbox_lidar = info["gt_pre_bbox_lidar"][annotation["mask"]]
        gt_pre_bbox_mask = info["gt_pre_bbox_mask"][annotation["mask"]]

        # Flip past along the time axis — presumably stored newest-first so
        # this yields chronological order; confirm against the info builder.
        gt_pre_bbox_lidar = np.flip(gt_pre_bbox_lidar, axis=1)
        gt_pre_bbox_mask = np.flip(gt_pre_bbox_mask, axis=1)

        # Duplicate the mask channel, presumably to match the two (x, y)
        # trajectory channels taken below.
        gt_fut_bbox_mask = np.repeat(gt_fut_bbox_mask, 2, axis=2)
        gt_pre_bbox_mask = np.repeat(gt_pre_bbox_mask, 2, axis=2)

        # Extend the past trajectory with the first 4 future steps.
        gt_pre_bbox_lidar = np.concatenate(
            (gt_pre_bbox_lidar, gt_fut_bbox_lidar[:, :4]), axis=1
        )
        gt_pre_bbox_mask = np.concatenate(
            (gt_pre_bbox_mask, gt_fut_bbox_mask[:, :4]), axis=1
        )

        annotation.update(
            dict(
                gt_fut_traj=gt_fut_bbox_lidar[:, :, :2],
                gt_fut_traj_mask=gt_fut_bbox_mask,
                gt_past_traj=gt_pre_bbox_lidar[:, :, :2],
                gt_past_traj_mask=gt_pre_bbox_mask,
            )
        )
        return annotation
|
|
|
def update_mapping(self, input_dict, index): |
|
|
|
|
|
polyline_dict, drivable_area = self.update_map_vectorized(input_dict, index) |
|
|
|
|
|
input_dict = self.update_map_rasterized(input_dict, index, polyline_dict, drivable_area) |
|
|
|
|
|
return input_dict |
|
|
|
|
|
    def update_map_vectorized(self, input_dict, index):
        """Generate vectorized map polylines and the drivable-area polygon
        for the ego pose at ``index``."""
        info = self.data_infos[index]

        map_location = info['map_location']
        e2g_translation = info['ego2global_translation']
        e2g_rotation = info['ego2global_rotation']
        if len(e2g_rotation) != 4:
            # Rotation was stored as a 3x3 matrix, not a quaternion.
            # NOTE(review): `Quaternion._from_matrix` is a private pyquaternion
            # API — prefer the public `Quaternion(matrix=...)` constructor.
            e2g_rotation = Quaternion._from_matrix(e2g_rotation).elements
        polyline_dict = self.vector_map.gen_vectorized_samples(e2g_translation, e2g_rotation, map_location)

        drivable_area = self.vector_map.gen_drivable_area(e2g_translation, e2g_rotation, map_location)

        return polyline_dict, drivable_area
|
|
|
    def update_map_rasterized(self, input_dict, index, polyline_dict, drivable_area, debug=False):
        """Rasterize map polylines and the drivable area into per-instance
        masks, bounding boxes and class labels on the BEV canvas."""
        gt_labels = []
        gt_bboxes = []
        gt_masks = []

        # Canvas-space transform: BEV metres -> pixel coordinates centred on
        # the canvas midpoint.
        origin = np.array([self.canvas_size[1] // 2, self.canvas_size[0] // 2])
        scale = np.array([self.canvas_size[1] / self.patch_size[0], self.canvas_size[0] / self.patch_size[1]])

        for polyline in polyline_dict:
            if polyline["type"] == 0:
                # Type 0 polylines are excluded from supervision.
                continue

            map_mask = np.zeros(self.canvas_size, np.uint8)
            draw_coor = (polyline["pts"] * scale + origin).round().astype(np.int32)
            # NOTE(review): drawn with color=1 then divided by 255, so this
            # "mask" holds 1/255 instead of 1 — the drivable-area branch below
            # uses 255/255 = 1. Confirm downstream expects this asymmetry.
            gt_mask = cv2.polylines(map_mask, [draw_coor], False, color=1, thickness=self.thickness) / 255

            ys, xs = np.where(gt_mask)
            try:
                gt_bbox = [min(xs), min(ys), max(xs), max(ys)]
            except ValueError:
                # Polyline rasterised to nothing on the canvas; the assigned
                # gt_bbox is dead because of the `continue` that follows.
                gt_bbox = [0, 0, 1, 1]
                continue

            cls = polyline["type"]

            gt_labels.append(cls)
            gt_bboxes.append(gt_bbox)
            gt_masks.append(gt_mask)

        # Drivable area: filled polygon exteriors minus hole interiors.
        map_mask = np.zeros(self.canvas_size, np.uint8)
        exteriors = []
        interiors = []
        for p in drivable_area.geoms:
            exteriors.append(
                (np.array(p.exterior.coords) * scale + origin).round().astype(np.int32)
            )
            for inter in p.interiors:
                interiors.append(
                    (np.array(inter.coords) * scale + origin).round().astype(np.int32)
                )
        cv2.fillPoly(map_mask, exteriors, 255)
        cv2.fillPoly(map_mask, interiors, 0)
        map_mask = map_mask / 255
        try:
            ys, xs = np.where(map_mask)
            gt_bbox = [min(xs), min(ys), max(xs), max(ys)]
            # Drivable area uses the extra class id `map_num_classes`.
            gt_labels.append(self.map_num_classes)
            gt_bboxes.append(gt_bbox)
            gt_masks.append(map_mask)
        except ValueError:
            # Empty drivable area on this canvas; skip it.
            pass

        gt_labels = torch.tensor(gt_labels)
        gt_bboxes = torch.from_numpy(np.stack(gt_bboxes))
        gt_masks = torch.from_numpy(np.stack(gt_masks))

        input_dict.update(
            dict(
                gt_lane_labels=gt_labels,
                gt_lane_bboxes=gt_bboxes,
                gt_lane_masks=gt_masks,
            )
        )
        return input_dict
|
|
|
def update_ego_prediction(self, input_dict, index): |
|
|
|
|
|
|
|
|
|
|
|
info = self.data_infos[index] |
|
|
|
|
|
|
|
|
|
|
|
gt_sdc_bbox = info["gt_sdc_bbox_lidar"][:9].reshape((1, -1)) |
|
|
gt_sdc_bbox = LiDARInstance3DBoxes( |
|
|
gt_sdc_bbox, box_dim=gt_sdc_bbox.shape[-1], origin=(0.5, 0.5, 0) |
|
|
).convert_to(self.box_mode_3d) |
|
|
gt_sdc_bbox = DC(gt_sdc_bbox, cpu_only=True) |
|
|
|
|
|
|
|
|
gt_sdc_label = np.array([0]) |
|
|
gt_sdc_label = DC(to_tensor(gt_sdc_label)) |
|
|
|
|
|
sdc_status=info["gt_sdc_bbox_lidar"][:9] |
|
|
|
|
|
|
|
|
gt_pre_bbox_sdc_lidar = info["gt_pre_bbox_sdc_lidar"] |
|
|
gt_fut_bbox_sdc_lidar = info["gt_fut_bbox_sdc_lidar"] |
|
|
|
|
|
gt_pre_bbox_sdc_global = info["gt_pre_bbox_sdc_global"] |
|
|
gt_fut_bbox_sdc_global = info["gt_fut_bbox_sdc_global"] |
|
|
|
|
|
gt_fut_bbox_sdc_mask = info["gt_fut_bbox_sdc_mask"] |
|
|
gt_fut_bbox_sdc_mask = np.repeat(gt_fut_bbox_sdc_mask, 2, axis=2) |
|
|
gt_pre_bbox_sdc_mask = info["gt_pre_bbox_sdc_mask"] |
|
|
gt_pre_bbox_sdc_mask = np.repeat(gt_pre_bbox_sdc_mask, 4, axis=2) |
|
|
|
|
|
gt_pre_command_sdc = info["gt_pre_command_sdc"] |
|
|
|
|
|
|
|
|
input_dict.update( |
|
|
dict( |
|
|
|
|
|
gt_sdc_bbox=gt_sdc_bbox, |
|
|
gt_sdc_label=gt_sdc_label, |
|
|
gt_sdc_fut_traj=gt_fut_bbox_sdc_lidar[ |
|
|
:, :, :2 |
|
|
], |
|
|
gt_sdc_fut_traj_mask=gt_fut_bbox_sdc_mask, |
|
|
|
|
|
command=info["command"], |
|
|
sdc_planning_world=gt_fut_bbox_sdc_global[ |
|
|
:, : self.planning_steps, [0, 1, 2] |
|
|
], |
|
|
sdc_planning=gt_fut_bbox_sdc_lidar[ |
|
|
:, : self.planning_steps, [0, 1, 6] |
|
|
], |
|
|
sdc_planning_mask=gt_fut_bbox_sdc_mask[ |
|
|
:, : self.planning_steps |
|
|
], |
|
|
sdc_planning_past=info["gt_pre_bbox_sdc_lidar"][:, :, [0, 1, 6]], |
|
|
sdc_planning_mask_past=gt_pre_bbox_sdc_mask, |
|
|
gt_pre_command_sdc=gt_pre_command_sdc, |
|
|
sdc_status=sdc_status[[0, 1, 6]] |
|
|
) |
|
|
) |
|
|
|
|
|
return input_dict |
|
|
|
|
|
def update_ego_planning(self, input_dict, index): |
|
|
info = self.data_infos[index] |
|
|
|
|
|
|
|
|
return input_dict |
|
|
|
|
|
def vis_dataset_specific(self, data: Dict): |
|
|
|
|
|
try: |
|
|
split = "trainval" |
|
|
parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" |
|
|
pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) |
|
|
points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
|
|
except FileNotFoundError: |
|
|
try: |
|
|
split = "test" |
|
|
parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" |
|
|
pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) |
|
|
points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
|
|
except FileNotFoundError: |
|
|
split = "mini" |
|
|
parent_dir = f"./data/openscene-v1.1/sensor_blobs/{split}" |
|
|
pts_filename: str = os.path.join(parent_dir, data["pts_filename"]) |
|
|
points = PointCloud.parse_from_file(pts_filename).to_pcd_bin2() |
|
|
|
|
|
points = points[:3].T |
|
|
|
|
|
image_file_list = [] |
|
|
for cam_name in range(8): |
|
|
|
|
|
image_file: str = os.path.join(parent_dir, data["img_filename"][cam_name]) |
|
|
image_file_list.append(image_file) |
|
|
|
|
|
return { |
|
|
"lidar": points, |
|
|
"command_dict": { |
|
|
0: "TURN LEFT", |
|
|
1: "KEEP FORWARD", |
|
|
2: "TURN RIGHT", |
|
|
3: "UNKNOWN", |
|
|
}, |
|
|
"image_file_list": image_file_list, |
|
|
"left_hand": False, |
|
|
} |
|
|
|
|
|
|
|
|
class BoundingBox2DIndex(IntEnum):
    """IntEnum for indexing 2D bounding-box state vectors in TransFuser.

    Layout per box (from the members below): [x, y, heading, length, width].
    Members use a single leading underscore; public access goes through the
    classmethod-property accessors so that slices (POINT, STATE_SE2) can be
    exposed alongside the scalar indices.
    """

    _X = 0
    _Y = 1
    _HEADING = 2
    _LENGTH = 3
    _WIDTH = 4

    @classmethod
    def size(cls):
        # Count the state-vector fields: attributes with exactly one leading
        # underscore that are not callable — presumably just the five enum
        # members above; verify `dir(cls)` contents on the target Python
        # version, since enum's __dir__ output has changed across releases.
        valid_attributes = [
            attribute
            for attribute in dir(cls)
            if attribute.startswith("_") and not attribute.startswith("__") and not callable(getattr(cls, attribute))
        ]
        return len(valid_attributes)

    # NOTE(review): stacking @classmethod on @property is deprecated since
    # Python 3.11 and removed in 3.13; on those versions `cls.X` yields the
    # property object rather than the value — confirm the targeted Python
    # version before relying on these accessors.
    @classmethod
    @property
    def X(cls):
        return cls._X

    @classmethod
    @property
    def Y(cls):
        return cls._Y

    @classmethod
    @property
    def HEADING(cls):
        return cls._HEADING

    @classmethod
    @property
    def LENGTH(cls):
        return cls._LENGTH

    @classmethod
    @property
    def WIDTH(cls):
        return cls._WIDTH

    @classmethod
    @property
    def POINT(cls):
        # Contiguous x/y slice: [X, Y].
        return slice(cls._X, cls._Y + 1)

    @classmethod
    @property
    def STATE_SE2(cls):
        # Contiguous SE(2) slice: [X, Y, HEADING].
        return slice(cls._X, cls._HEADING + 1)