|
|
import argparse |
|
|
import glob |
|
|
import os |
|
|
import pickle as pkl |
|
|
from typing import Dict, List |
|
|
|
|
|
import diffstack_utils.box.sensor as box_sensor |
|
|
import diffstack_utils.box.transform as box_transform |
|
|
import diffstack_utils.box.vis as box_vis |
|
|
import diffstack_utils.carla.category as carla_class |
|
|
import diffstack_utils.carla.control as carla_control |
|
|
import diffstack_utils.object.rotation as object_rotation |
|
|
import diffstack_utils.sensor.calibration as calib_utils |
|
|
import numpy as np |
|
|
import pyquaternion |
|
|
import scipy |
|
|
import xinshuo_io as io_utils |
|
|
import xinshuo_miscellaneous as miscell_utils |
|
|
from diffstack_utils.box.data_structure import Box3D |
|
|
from nuscenes.eval.common.utils import Quaternion, quaternion_yaw |
|
|
|
|
|
# Simulator capture rate in frames per second.
FPS = 20

# Stride (in raw frames) between consecutive annotated keyframes;
# parse_sequence steps through frames in multiples of this value.
FRAME_INTERVAL = 10.0

# Keyframe rate in Hz: 20 / 10.0 = 2.0 keyframes per second.
FPS_KEYFRAME = FPS / FRAME_INTERVAL
|
|
|
|
|
|
|
|
def parse_token(anno: Dict, scene_root: str, frame: str) -> Dict:
    """Attach a nuScenes-like token and an ordered timestamp to ``anno``.

    The sequence directory name is expected to look like
    ``RouteScenario_<route>_..._Town<NN>_...`` so that the town and route
    indices can be recovered from it.

    Args:
        anno: annotation dict to extend in place.
        scene_root: path to the sequence directory.
        frame: zero-padded frame name (e.g. ``"000010"``).

    Returns:
        The same ``anno`` dict, now holding ``token`` (unique per frame),
        ``timestamp`` (integer ordered by town, route, then frame),
        ``scene_token``, ``frame_idx`` and ``town_string``.
    """
    seq: str = scene_root.split("/")[-1]
    town_string: str = seq.split("_")[4]

    town_field = scene_root.split("Town")[-1].split("_")[0]
    try:
        town_index = int(town_field)
    except ValueError:
        # Town names such as "Town10HD" carry a non-numeric suffix;
        # fall back to the leading two digits.
        town_index = int(town_field[:2])
    route_index = int(scene_root.split("RouteScenario_")[-1].split("_")[0])

    frame_int: int = int(float(frame))

    # Pack (town, route, frame) into one integer so sorting by timestamp
    # sorts by town, then route, then frame.
    routes_per_town = 1000
    frames_per_route = 100000
    timestamp: int = (
        town_index * routes_per_town + route_index
    ) * frames_per_route + frame_int

    anno.update(
        {
            "token": f"{seq}_frame_{frame}",
            "timestamp": timestamp,
            "scene_token": seq,
            "frame_idx": frame_int,
            "town_string": town_string,
        }
    )
    return anno
|
|
|
|
|
|
|
|
def parse_traffic_light_anno(anno: Dict, scene_root: str, frame: str) -> Dict:
    """Load the per-frame traffic-light metadata and attach it to ``anno``.

    Reads ``metadata/traffic_light/<frame>.pkl`` under ``scene_root`` and
    stores, under ``anno["traffic_light"]``, the hazard flags plus one
    7-DoF box (x, y, z, l, w, h, yaw in the lidar frame) per light,
    together with its state name and its box-to-global transform.

    Requires ``anno`` to already carry ``ego2global``, ``lidar2ego`` and
    ``lidar2global`` (set by parse_ego_sensor_calib).
    """
    tl_pkl = f"{scene_root}/metadata/traffic_light/{frame}.pkl"
    with open(tl_pkl, "rb") as f:
        tl_data: Dict = pkl.load(f)

    n_boxes: int = tl_data["location"].shape[1]
    anno["traffic_light"] = {
        "hazard": tl_data["hazard"],
        "in_range": tl_data["in_range"],
        "light_close_in_dist": tl_data["light_close_in_dist"],
        "red_light_in_front": tl_data["red_light_in_front"],
        "gt_boxes": np.zeros((n_boxes, 7), dtype="float32"),
        "gt_names": np.zeros((n_boxes,), dtype=object),
        "attr_labels": np.asarray([[] for _ in range(n_boxes)]),
        "valid_flag": np.ones((n_boxes,), dtype="bool"),
        "bbox_ego_matrix": np.zeros((n_boxes, 4, 4), dtype="float32"),
    }

    # Loop-invariant inverses, hoisted out of the per-box loop.
    ego2global_inv = np.linalg.inv(anno["ego2global"])
    lidar2ego_inv = np.linalg.inv(anno["lidar2ego"])
    lidar2global_inv = np.linalg.inv(anno["lidar2global"])
    origin = np.array([[0, 0, 0, 1]]).transpose()

    for obj_idx in range(n_boxes):
        loc = tl_data["location"][0, obj_idx]
        rot = tl_data["rotation"][0, obj_idx]
        size = tl_data["size"][0, obj_idx]
        state: str = tl_data["states"][obj_idx]

        box2global = calib_utils.create_transform(loc, rot)

        # Box center: global -> ego -> lidar frame.
        center_global: np.ndarray = box2global @ origin
        center_ego = ego2global_inv @ center_global
        center_lidar = lidar2ego_inv @ center_ego

        # Heading of the box expressed in the lidar frame.
        box2lidar = lidar2global_inv @ box2global
        heading = scipy.spatial.transform.Rotation.from_matrix(
            box2lidar[:3, :3]
        ).as_euler("xyz", degrees=False)[-1]

        length, width, height = size.tolist()
        cx, cy, cz = center_lidar[:3, 0]
        anno["traffic_light"]["gt_boxes"][obj_idx] = np.array(
            [float(cx), float(cy), float(cz), length, width, height, heading]
        )
        anno["traffic_light"]["bbox_ego_matrix"][obj_idx] = box2global
        anno["traffic_light"]["gt_names"][obj_idx] = state

    return anno
|
|
|
|
|
|
|
|
def parse_stop_sign_anno(anno: Dict, scene_root: str, frame: str) -> Dict:
    """Load the per-frame stop-sign metadata and attach it to ``anno``.

    Reads ``metadata/stop_sign/<frame>.pkl`` under ``scene_root`` and
    stores, under ``anno["stop_sign"]``, the hazard flags plus one 7-DoF
    box (x, y, z, l, w, h, yaw in the lidar frame) per sign, together with
    its box-to-global transform. Stop signs carry no state string, so
    ``gt_names`` entries are left as empty strings.

    Requires ``anno`` to already carry ``ego2global``, ``lidar2ego`` and
    ``lidar2global`` (set by parse_ego_sensor_calib).
    """
    ss_pkl = f"{scene_root}/metadata/stop_sign/{frame}.pkl"
    with open(ss_pkl, "rb") as f:
        ss_data: Dict = pkl.load(f)

    n_boxes: int = ss_data["location"].shape[1]
    anno["stop_sign"] = {
        "hazard": ss_data["hazard"],
        "in_range": ss_data["in_range"],
        "gt_boxes": np.zeros((n_boxes, 7), dtype="float32"),
        "gt_names": np.zeros((n_boxes,), dtype=object),
        "attr_labels": np.asarray([[] for _ in range(n_boxes)]),
        "valid_flag": np.ones((n_boxes,), dtype="bool"),
        "bbox_ego_matrix": np.zeros((n_boxes, 4, 4), dtype="float32"),
    }

    # Loop-invariant inverses, hoisted out of the per-box loop.
    ego2global_inv = np.linalg.inv(anno["ego2global"])
    lidar2ego_inv = np.linalg.inv(anno["lidar2ego"])
    lidar2global_inv = np.linalg.inv(anno["lidar2global"])
    origin = np.array([[0, 0, 0, 1]]).transpose()

    for obj_idx in range(n_boxes):
        loc = ss_data["location"][0, obj_idx]
        rot = ss_data["rotation"][0, obj_idx]
        size = ss_data["size"][0, obj_idx]

        box2global = calib_utils.create_transform(loc, rot)

        # Box center: global -> ego -> lidar frame.
        center_global: np.ndarray = box2global @ origin
        center_ego = ego2global_inv @ center_global
        center_lidar = lidar2ego_inv @ center_ego

        # Heading of the box expressed in the lidar frame.
        box2lidar = lidar2global_inv @ box2global
        heading = scipy.spatial.transform.Rotation.from_matrix(
            box2lidar[:3, :3]
        ).as_euler("xyz", degrees=False)[-1]

        length, width, height = size.tolist()
        cx, cy, cz = center_lidar[:3, 0]
        anno["stop_sign"]["gt_boxes"][obj_idx] = np.array(
            [float(cx), float(cy), float(cz), length, width, height, heading]
        )
        anno["stop_sign"]["bbox_ego_matrix"][obj_idx] = box2global
        # No per-sign state exists; keep the placeholder empty name.
        anno["stop_sign"]["gt_names"][obj_idx] = ""

    return anno
|
|
|
|
|
|
|
|
def parse_ego_sensor_calib(
    anno: Dict, scene_root: str, frame: str, vis: bool = False
) -> Dict:
    """Parse calibration between world, ego, lidar and camera.

    Loads the per-frame planning and ego-pose pickles, builds the
    ego/lidar/camera transforms, fills an 18-slot nuScenes-style
    ``can_bus`` vector, aggregates the current and previous lidar sweeps,
    and stores everything into ``anno``.

    Args:
        anno: annotation dict (must already hold ``timestamp``).
        scene_root: path to the sequence directory.
        frame: zero-padded frame name (e.g. ``"000010"``).
        vis: if True, dump a lidar visualization image.

    Returns:
        The same ``anno`` dict, extended with calibration matrices,
        camera info, lidar points/path, ``can_bus``, the ego box
        ``gt_sdc_bbox_lidar`` and the planner ``command``.
    """
    # Root under which relocated ("carla_1.0") data paths are re-anchored.
    save_root = "data/carla/carla_data_0414"

    # Planner metadata for this frame (route options, local plan).
    planning_path = f"{scene_root}/metadata/planning/{frame}.pkl"
    with open(planning_path, "rb") as handle:
        planning_data: Dict = pkl.load(handle)

    # Ego pose and static lidar extrinsics -> chained lidar->global.
    pose_path = f"{scene_root}/metadata/ego/{frame}.pkl"
    ego2global = np.asarray(calib_utils.load_ego2world_transform(pose_path))
    lidar2ego = np.asarray(calib_utils.lidar_to_ego_transform())
    lidar2global: np.ndarray = ego2global @ lidar2ego
    e2g_r = ego2global[:3, :3]

    # can_bus layout (as written below): [0:3] global position,
    # [3:7] ego->global quaternion, [7:10] acceleration (ego frame),
    # [10:13] angular velocity (ego frame), [13:16] velocity (ego frame),
    # [16] yaw (rad), [17] yaw (deg).
    can_bus = np.zeros((18), dtype="float32")
    with open(pose_path, "rb") as handle:
        ego_data: Dict = pkl.load(handle)

    # Ego (SDC) center: origin of the ego frame, expressed in lidar/global.
    sdc_center_ego = np.array([[0, 0, 0, 1.0]]).transpose()
    sdc_center_lidar = np.linalg.inv(lidar2ego) @ sdc_center_ego
    sdc_center_global = ego2global[:3, 3]
    can_bus[:3] = sdc_center_global

    # Velocity: global -> ego (row-vector form v @ inv(R).T == inv(R) @ v)
    # and further into the lidar frame.
    sdc_vel_global: np.ndarray = ego_data["velocity"]
    sdc_vel_ego = sdc_vel_global @ np.linalg.inv(e2g_r).T
    sdc_vel_lidar: np.ndarray = np.linalg.inv(lidar2ego[:3, :3]) @ sdc_vel_ego
    can_bus[13:16] = sdc_vel_ego

    # Acceleration: vertical component is overwritten with gravity.
    # NOTE: this mutates the array loaded from the pickle in place.
    sdc_acc_global: np.ndarray = ego_data["acceleration"]
    sdc_acc_global[-1] = 9.8
    sdc_acc_ego = sdc_acc_global @ np.linalg.inv(e2g_r).T
    sdc_acc_lidar: np.ndarray = np.linalg.inv(lidar2ego[:3, :3]) @ sdc_acc_ego
    can_bus[7:10] = sdc_acc_ego

    # Angular velocity, same global -> ego -> lidar chain.
    sdc_ang_global: np.ndarray = ego_data["angular_velocity"]
    sdc_ang_ego = sdc_ang_global @ np.linalg.inv(e2g_r).T
    sdc_ang_lidar: np.ndarray = np.linalg.inv(lidar2ego[:3, :3]) @ sdc_ang_ego
    can_bus[10:13] = sdc_ang_ego

    # Ego orientation as a quaternion (w, x, y, z elements).
    rot_global_quat = Quaternion(
        object_rotation.convert_mat2qua(ego2global[:3, :3])
    ).elements
    can_bus[3:7] = rot_global_quat
    sdc_yaw_global_deg = ego_data["rotation"]
    sdc_yaw_global_rad = ego_data["rotation"] / 180 * np.pi

    # Yaw of the ego->lidar rotation (z Euler angle of inv(lidar2ego)).
    yaw_lidar = scipy.spatial.transform.Rotation.from_matrix(
        np.linalg.inv(lidar2ego)[:3, :3]
    ).as_euler("xyz", degrees=False)[-1]

    # Index 1 of the rotation triple is used as yaw here
    # (presumably CARLA's (pitch, yaw, roll) order — TODO confirm).
    can_bus[-2] = sdc_yaw_global_rad[1]
    can_bus[-1] = sdc_yaw_global_deg[1]

    # Ego box dimensions.
    sdc_size: np.ndarray = ego_data["size"]
    l, w, h = sdc_size.tolist()

    # 9-DoF ego box in the lidar frame: x, y, z, l, w, h, yaw, vx, vy.
    gt_sdc_bbox_lidar = np.array(
        [
            sdc_center_lidar[0, 0],
            sdc_center_lidar[1, 0],
            sdc_center_lidar[2, 0],
            l,
            w,
            h,
            yaw_lidar,
            sdc_vel_lidar[0],
            sdc_vel_lidar[1],
        ]
    )

    # Per-camera intrinsics/extrinsics and image path, nuScenes-style.
    cam_param_all: Dict[str, Dict] = calib_utils.get_camera_param()
    cams: Dict[str, Dict] = {}
    for cam_type in cam_param_all.keys():
        cam_intrinsic = calib_utils.get_camera_intrinsics(
            cam_param_all[cam_type]
        )
        sensor2ego = calib_utils.camera_to_ego_transform(
            cam_param_all[cam_type]
        )
        sensor2lidar = np.linalg.inv(lidar2ego) @ sensor2ego

        cam_path = f"{scene_root}/image_{cam_type}/{frame}.png"
        # Re-anchor "carla_1.0" paths below save_root.
        if "carla_1.0" in cam_path:
            cam_path = cam_path.split("carla_data_0414/")[-1]
            cam_path = os.path.join(save_root, cam_path)

        cams[cam_type] = {
            "data_path": cam_path,
            "type": cam_type,
            # Placeholder: no real sample-data token exists for CARLA.
            "sample_data_token": "fake_cam_token",
            "cam_intrinsic": cam_intrinsic,
            "sensor2ego_translation": sensor2ego[:3, 3],
            "sensor2ego_rotation": object_rotation.convert_mat2qua(sensor2ego[:3, :3]),
            "sensor2lidar_translation": sensor2lidar[:3, 3],
            "sensor2lidar_rotation": sensor2lidar[:3, :3],
            "timestamp": anno["timestamp"],
            "cam_param": cam_param_all[cam_type],
        }

    # Densify the sweep by concatenating current and previous lidar frames.
    lidar_cur = f"{scene_root}/lidar/{frame}.npy"
    frame_int: int = int(float(frame))
    pre_frame: str = f"{frame_int - 1:06d}"
    lidar_pre = f"{scene_root}/lidar/{pre_frame}.npy"
    points_cur = np.fromfile(lidar_cur, dtype=np.float32)
    points_cur = points_cur.reshape(-1, 4)
    points_pre = np.fromfile(lidar_pre, dtype=np.float32)
    points_pre = points_pre.reshape(-1, 4)
    points: np.ndarray = np.concatenate((points_cur, points_pre), axis=0)

    if vis:
        # NOTE(review): str.replace never raises, so the except branch is
        # dead; non-train paths silently keep "/val/" replacement only when
        # an exception occurs, which it cannot.
        try:
            save_dir = scene_root.replace("/train/", "/vis_train_lidar/")
        except:
            save_dir = scene_root.replace("/val/", "/vis_val_lidar/")
        save_path: str = os.path.join(save_dir, frame + ".jpg")
        box_vis.vis_box_on_lidar(points_pre, save_path)

    # Cache the merged sweep to lidar_full (re-anchored like images).
    lidar_path = f"{scene_root}/lidar_full/{frame}.npy"
    if "carla_1.0" in lidar_path:
        lidar_path = lidar_path.split("carla_data_0414/")[-1]
        lidar_path = os.path.join(save_root, lidar_path)
    # NOTE(review): indentation reconstructed — tofile is taken to run only
    # when the cache file is missing; confirm against the original file.
    if not io_utils.is_path_exists(lidar_path):
        io_utils.mkdir_if_missing(lidar_path)
        points.tofile(lidar_path)

    # All frames of a scene share the single HD map dumped at frame 0.
    map_path: str = os.path.join(scene_root, "metadata/hd_map/000000.pkl")

    command = carla_control.roadoption2int(planning_data["local"][0][1])

    anno.update(
        {
            "lidar_pts": points,
            "lidar_path": lidar_path,
            "map_path": map_path,
            "sweeps": [],
            "cams": cams,
            "lidar2ego_translation": lidar2ego[:3, 3],
            "lidar2ego_rotation": object_rotation.convert_mat2qua(
                lidar2ego[:3, :3]
            ),
            "lidar2ego": lidar2ego,
            "ego2global_translation": ego2global[:3, 3],
            "ego2global_rotation": object_rotation.convert_mat2qua(
                ego2global[:3, :3]
            ),
            "ego2global": ego2global,
            "lidar2global": lidar2global,
            "sdc_vel_global": sdc_vel_global,
            "can_bus": can_bus,
            "gt_sdc_bbox_lidar": gt_sdc_bbox_lidar,
            "global_plan_short": planning_data["local"],
            # roadoption2int is shifted to a zero-based command index.
            "command": command - 1,
        }
    )

    return anno
|
|
|
|
|
|
|
|
def parse_bbox(anno: Dict, scene_root: str, frame: str, vis: bool = False) -> Dict:
    """Parse each one of the bounding boxes and compute properties.

    Loads the per-frame detection and tracking pickles, converts every
    object box into the lidar frame (7-DoF: x, y, z, l, w, h, yaw) and the
    global frame (9-DoF with velocity), counts lidar points inside each
    box for the validity flag, and finally parses/rasterizes the HD map.

    Requires ``anno`` to already carry calibration keys set by
    parse_ego_sensor_calib (``ego2global``, ``lidar2ego``,
    ``lidar2global``, ``lidar_pts``, ``map_path``, ``town_string``, ...).
    """
    bbox_path = f"{scene_root}/metadata/detection/{frame}.pkl"
    with open(bbox_path, "rb") as f:
        bboxes: Dict = pkl.load(f)
    tracking_path = f"{scene_root}/metadata/tracking/{frame}.pkl"
    with open(tracking_path, "rb") as f:
        tracklets: Dict = pkl.load(f)

    # Pre-allocate per-object output arrays, nuScenes-info style.
    num_obj: int = bboxes["location"].shape[1]
    anno.update(
        {
            "gt_velocity": np.zeros((num_obj, 2), dtype="float32"),
            "gt_boxes": np.zeros((num_obj, 7), dtype="float32"),
            "gt_bboxes_global": np.zeros((num_obj, 9), dtype="float32"),
            "gt_names": np.zeros((num_obj,), dtype=object),
            "gt_inds": np.zeros((num_obj), dtype="int64"),
            "num_lidar_pts": np.zeros((num_obj,), dtype="float32"),
            "num_radar_pts": np.zeros((num_obj,), dtype="float32"),
            "attr_labels": np.asarray([[] for _ in range(num_obj)]),
            "valid_flag": np.zeros((num_obj,), dtype="bool"),
            "bbox_ego_matrix": np.zeros((num_obj, 4, 4), dtype="float32"),
        }
    )

    # Rotation-only matrices rebuilt from the stored quaternions.
    ego2global_rotation_mat: np.ndarray = pyquaternion.Quaternion(
        anno["ego2global_rotation"]
    ).rotation_matrix
    lidar2ego_rotation_mat: np.ndarray = pyquaternion.Quaternion(
        anno["lidar2ego_rotation"]
    ).rotation_matrix

    bbox_gt_in_ego_list: List[np.ndarray] = list()
    valid_id_list: List[np.ndarray] = list()
    for bbox_index in range(num_obj):
        bbox_location: np.ndarray = bboxes["location"][0, bbox_index]
        bbox_rotation: np.ndarray = bboxes["rotation"][0, bbox_index]
        bbox_size: np.ndarray = bboxes["size"][0, bbox_index]
        bbox_cls_vec = bboxes["cls"][0, bbox_index]

        bbox2global = calib_utils.create_transform(
            bbox_location, bbox_rotation
        )

        # Box center: global -> ego -> lidar frame.
        bbox_center = np.array([[0, 0, 0, 1.0]]).transpose()
        bbox_center_world: np.ndarray = bbox2global @ bbox_center
        bbox_center_ego = np.linalg.inv(anno["ego2global"]) @ bbox_center_world
        bbox_center_lidar = np.linalg.inv(anno["lidar2ego"]) @ bbox_center_ego

        # Match this detection to its tracklet by instance id.
        global_instance_id: int = int(bboxes["id_debug"][0, bbox_index, 0])
        tracklet_index: int = (
            tracklets["id"][0, :, 0].tolist().index(global_instance_id)
        )

        # Latest tracked velocity, rotated global -> ego -> lidar.
        velocity: np.ndarray = tracklets["velocity"][0, tracklet_index, -1, :]
        velocity_world = np.expand_dims(velocity, axis=1)
        velocity_ego: np.ndarray = (
            np.linalg.inv(ego2global_rotation_mat) @ velocity_world
        )
        velocity_lidar: np.ndarray = (
            np.linalg.inv(lidar2ego_rotation_mat) @ velocity_ego
        )

        # Heading of the box in the lidar frame.
        bbox2lidar = np.linalg.inv(anno["lidar2global"]) @ bbox2global
        yaw = scipy.spatial.transform.Rotation.from_matrix(bbox2lidar[:3, :3]).as_euler(
            "xyz", degrees=False
        )[-1]

        l, w, h = bbox_size.tolist()
        x, y, z = bbox_center_lidar[:3, 0]
        box_7dof_lidar = np.array(
            [float(x), float(y), float(z)] + [l, w, h] + [yaw]
        )

        # Global box: yaw taken from rotation component 1, degrees -> rad.
        box_7dof_global = np.array(
            [
                float(bbox_center_world[0]),
                float(bbox_center_world[1]),
                float(bbox_center_world[2]),
            ]
            + [l, w, h]
            + [float(bbox_rotation[1]) / 180 * np.pi]
        )

        # Count lidar points inside the box to decide validity.
        box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(box_7dof_lidar)
        box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d)
        pc_sub: np.ndarray = box_sensor.extract_pc_in_box3d(
            anno["lidar_pts"], box_in_lidar
        )[
            0
        ]

        anno["num_lidar_pts"][bbox_index] = pc_sub.shape[0]
        anno["gt_boxes"][bbox_index, :] = box_7dof_lidar
        anno["gt_velocity"][bbox_index, :] = velocity_lidar[
            :2, 0
        ]
        anno["gt_bboxes_global"][bbox_index, :7] = box_7dof_global
        anno["gt_bboxes_global"][bbox_index, 7:] = velocity_world[:2, 0]
        # NOTE(review): key says "ego" but the stored matrix is box->global.
        anno["bbox_ego_matrix"][bbox_index] = bbox2global
        anno["gt_names"][bbox_index] = carla_class.convert_vec2str(bbox_cls_vec)
        anno["gt_inds"][bbox_index] = global_instance_id

        # A box is valid only if it contains at least one lidar point.
        if pc_sub.shape[0] > 0:
            anno["valid_flag"][bbox_index] = True

        if vis:
            box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego(
                box_in_lidar
            )
            bbox_gt_in_ego_list.append(box_in_ego)
            valid_id_list.append(global_instance_id)

    if vis:
        # NOTE(review): str.replace never raises; the except branch is dead.
        try:
            save_dir = scene_root.replace("/train/", "/vis_train_gt/")
        except:
            save_dir = scene_root.replace("/val/", "/vis_val_gt/")
        save_path: str = os.path.join(save_dir, frame + ".jpg")

        # Append the ego (SDC) box with sentinel id -1 for display.
        box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(anno["gt_sdc_bbox_lidar"][:7])
        sdc_box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d)
        sdc_box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego(
            sdc_box_in_lidar
        )
        bbox_gt_in_ego_list.append(sdc_box_in_ego)
        valid_id_list.append(-1)

        box_vis.vis_box_on_lidar(
            anno["lidar_pts"],
            save_path,
            bbox_gt_in_ego_list=bbox_gt_in_ego_list,
            id_list=valid_id_list,
        )

    # NOTE(review): function-scope imports for the map pipeline; plt is
    # imported but never used below.
    from pathlib import Path

    import matplotlib.pyplot as plt
    from mmdet3d_plugin.datasets.carla.trajdata_rasterizer import rasterize
    from trajdata import MapAPI, SceneBatch, VectorMap
    from trajdata.caching.df_cache import DataFrameCache
    from trajdata.data_structures import StateTensor
    from trajdata.dataset_specific.opendrive import parse_maps
    from trajdata.maps.vec_map_elements import (MapElement, MapElementType,
                                                PedCrosswalk, PedWalkway,
                                                Polyline, RoadArea,
                                                RoadLane)

    # Raw OpenDRIVE string dumped once per scene.
    with open(anno["map_path"], "rb") as handle:
        opendrive_str = pkl.load(handle)

    # NOTE(review): hardcoded, user-specific cache directory.
    trajdata_cache_dir = "/home/xweng/trajdata"
    map_api = MapAPI(trajdata_cache_dir)
    map_id = f"carla:{anno['town_string']}"
    parse_maps.init_global_vars()
    vec_map = parse_maps.parse_opendrive_map(
        map_filename=None, map_rawstring=opendrive_str, map_id=map_id
    )

    # Write the parsed vector map into the trajdata cache at 2 px/m.
    DataFrameCache.finalize_and_cache_map(
        Path(trajdata_cache_dir), vec_map, {"px_per_m": 2}
    )

    # Reload through the cache with all element types enabled.
    vec_map = map_api.get_map(
        map_id,
        incl_road_lanes=True,
        incl_road_areas=True,
        incl_ped_crosswalks=True,
        incl_ped_walkways=True,
    )

    # NOTE(review): the rasterization outputs are never stored in `anno`
    # or used afterwards — this looks like caching/side-effect-only or
    # work-in-progress code; confirm intent.
    map_img, polylines, raster_from_world = rasterize(
        vec_map=vec_map,
        resolution=5,
        return_tf_mat=True,
        incl_centerlines=False,
        incl_lane_edges=False,
        area_color=(255, 255, 255),
        edge_color=(255, 0, 0),
        anno=anno,
    )

    return anno
|
|
|
|
|
|
|
|
def parse_data(scene_root: str, frame: str) -> Dict:
    """Convert carla data into the collection of path needed by mmdetection3D dataloader.

    Runs the per-frame parsing stages in order — token, ego/sensor
    calibration, traffic lights, stop signs, object boxes — each stage
    extending the same annotation dict.

    Args:
        scene_root: root of the sequence dir
        frame: zero-padded frame name (e.g. ``"000010"``)

    Returns:
        The fully populated annotation dict for this frame.
    """
    stages = (
        parse_token,
        parse_ego_sensor_calib,
        parse_traffic_light_anno,
        parse_stop_sign_anno,
        parse_bbox,
    )
    anno: Dict = {}
    # Order matters: calibration must precede the box/sign parsers.
    for stage in stages:
        anno = stage(anno, scene_root, frame)
    return anno
|
|
|
|
|
|
|
|
def parse_sequence( |
|
|
scene_root: str, |
|
|
annos: List[Dict], |
|
|
seq_index: List[int], |
|
|
pre_sec: int = 2, |
|
|
fut_sec: int = 6, |
|
|
vis: bool = False, |
|
|
) -> List[Dict]: |
|
|
"""Add past/future trajectory into data at each frame to allow prediction""" |
|
|
|
|
|
|
|
|
seq: str = scene_root.split("/")[-1] |
|
|
anno_seq: List[Dict] = [annos[index] for index in seq_index] |
|
|
print(f"processing: {seq}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sort_data(data_dict: Dict): |
|
|
return data_dict["frame_idx"] |
|
|
|
|
|
anno_seq.sort(key=sort_data) |
|
|
min_frame_ID: int = anno_seq[0]["frame_idx"] |
|
|
max_frame_ID: int = anno_seq[-1]["frame_idx"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for index in range(len(seq_index)): |
|
|
anno: Dict = anno_seq[index] |
|
|
frame_ID: int = anno["frame_idx"] |
|
|
|
|
|
|
|
|
|
|
|
scene_name: str = anno["scene_token"] |
|
|
assert scene_name == seq, "error, not the same sequence" |
|
|
|
|
|
|
|
|
|
|
|
gt_bboxes_lidar: np.ndarray = anno["gt_boxes"] |
|
|
num_obj: int = gt_bboxes_lidar.shape[0] |
|
|
obj_ids: List[int] = anno["gt_inds"].tolist() |
|
|
|
|
|
|
|
|
l, w, h = anno["gt_sdc_bbox_lidar"][3:6] |
|
|
lidar2global: np.ndarray = anno["lidar2global"] |
|
|
|
|
|
|
|
|
pre_frames: int = int(pre_sec * FPS_KEYFRAME) |
|
|
fut_frames: int = int(fut_sec * FPS_KEYFRAME) |
|
|
|
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar: np.ndarray = np.zeros((num_obj, fut_frames, 9)) |
|
|
gt_pre_bbox_lidar: np.ndarray = np.zeros((num_obj, pre_frames, 9)) |
|
|
|
|
|
|
|
|
gt_fut_bbox_mask: np.ndarray = np.zeros((num_obj, fut_frames, 1)) |
|
|
gt_pre_bbox_mask: np.ndarray = np.zeros((num_obj, pre_frames, 1)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar_all: List[np.ndarray] = [ |
|
|
[np.empty([0])] for _ in range(fut_frames) |
|
|
] |
|
|
gt_pre_bbox_lidar_all: List[np.ndarray] = [ |
|
|
[np.empty([0])] for _ in range(pre_frames) |
|
|
] |
|
|
|
|
|
|
|
|
gt_pre_bbox_sdc_lidar: np.ndarray = np.zeros((1, pre_frames, 9)) |
|
|
gt_fut_bbox_sdc_lidar: np.ndarray = np.zeros((1, fut_frames, 9)) |
|
|
gt_pre_bbox_sdc_global: np.ndarray = np.zeros((1, pre_frames, 9)) |
|
|
gt_fut_bbox_sdc_global: np.ndarray = np.zeros((1, fut_frames, 9)) |
|
|
gt_pre_bbox_sdc_mask: np.ndarray = np.zeros((1, pre_frames, 1)) |
|
|
gt_fut_bbox_sdc_mask: np.ndarray = np.zeros((1, fut_frames, 1)) |
|
|
gt_pre_command_sdc: np.ndarray = np.zeros((1, pre_frames, 1)) |
|
|
gt_fut_command_sdc: np.ndarray = np.zeros((1, fut_frames, 1)) |
|
|
|
|
|
|
|
|
|
|
|
for pre_frame_index in range(1, pre_frames + 1): |
|
|
pre_frame_ID = int(frame_ID - pre_frame_index * FRAME_INTERVAL) |
|
|
pre_frame_index_in_seq: int = index - pre_frame_index |
|
|
|
|
|
|
|
|
if pre_frame_ID < min_frame_ID: |
|
|
continue |
|
|
|
|
|
|
|
|
anno_pre_tmp: Dict = anno_seq[pre_frame_index_in_seq] |
|
|
gt_pre_bbox_global_tmp: np.ndarray = anno_pre_tmp[ |
|
|
"gt_bboxes_global" |
|
|
] |
|
|
num_obj_pre: int = gt_pre_bbox_global_tmp.shape[0] |
|
|
gt_pre_bbox_lidar_tmp = np.zeros((num_obj_pre, 9)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
loc_global = np.concatenate( |
|
|
(gt_pre_bbox_global_tmp[:, :3], np.ones((num_obj_pre, 1))), axis=1 |
|
|
).T |
|
|
loc_lidar = np.linalg.inv(lidar2global) @ loc_global |
|
|
gt_pre_bbox_lidar_tmp[:, :3] = loc_lidar[:3, :].transpose() |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar_tmp[:, 3:6] = gt_pre_bbox_global_tmp[:, 3:6] |
|
|
|
|
|
|
|
|
gt_pre_rot_global_rad = np.concatenate( |
|
|
( |
|
|
np.zeros((num_obj_pre, 1)), |
|
|
gt_pre_bbox_global_tmp[:, [6]], |
|
|
np.zeros((num_obj_pre, 1)), |
|
|
), |
|
|
axis=1, |
|
|
) |
|
|
gt_pre_rot_global_deg = gt_pre_rot_global_rad / np.pi * 180 |
|
|
for obj_index in range(num_obj_pre): |
|
|
gt_pre_trans_global = calib_utils.create_transform( |
|
|
gt_pre_bbox_global_tmp[obj_index, :3], |
|
|
gt_pre_rot_global_deg[obj_index], |
|
|
) |
|
|
gt_pre_trans_lidar = ( |
|
|
np.linalg.inv(anno["lidar2global"]) @ gt_pre_trans_global |
|
|
) |
|
|
gt_pre_yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
gt_pre_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_pre_bbox_lidar_tmp[obj_index, 6] = gt_pre_yaw_lidar |
|
|
|
|
|
|
|
|
gt_pre_vel_global = np.concatenate( |
|
|
( |
|
|
gt_pre_bbox_global_tmp[:, 7:], |
|
|
np.zeros((num_obj_pre, 1)), |
|
|
), |
|
|
axis=1, |
|
|
) |
|
|
gt_pre_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ gt_pre_vel_global.T |
|
|
).T |
|
|
gt_pre_bbox_lidar_tmp[:, 7:] = gt_pre_vel_lidar[:, :2] |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar_all[pre_frame_index - 1] = gt_pre_bbox_lidar_tmp |
|
|
|
|
|
|
|
|
|
|
|
obj_ids_pre: List[int] = anno_pre_tmp["gt_inds"].tolist() |
|
|
( |
|
|
obj_ids_common, |
|
|
index_cur, |
|
|
index_past, |
|
|
) = miscell_utils.find_unique_common_from_lists(obj_ids, obj_ids_pre) |
|
|
|
|
|
|
|
|
if len(index_cur) > 0: |
|
|
gt_pre_bbox_mask[np.array(index_cur), pre_frame_index - 1] = 1 |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar[ |
|
|
np.array(index_cur), pre_frame_index - 1, : |
|
|
] = gt_pre_bbox_lidar_tmp[np.array(index_past), :] |
|
|
|
|
|
|
|
|
sdc_loc_global, _ = calib_utils.transform_matrix_to_vector( |
|
|
anno_pre_tmp["ego2global"] |
|
|
) |
|
|
sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) |
|
|
sdc_loc_lidar = np.linalg.inv(lidar2global) @ sdc_loc_global |
|
|
sdc_vel_global: np.ndarray = anno_pre_tmp["sdc_vel_global"] |
|
|
sdc_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ sdc_vel_global |
|
|
) |
|
|
sdc_trans_lidar = ( |
|
|
np.linalg.inv(lidar2global) @ anno_pre_tmp["ego2global"] |
|
|
) |
|
|
yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
sdc_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
|
|
anno_pre_tmp["ego2global"][:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_pre_bbox_sdc_mask[0, pre_frame_index - 1] = 1 |
|
|
|
|
|
|
|
|
gt_sdc_bbox_lidar = np.array( |
|
|
[ |
|
|
sdc_loc_lidar[0], |
|
|
sdc_loc_lidar[1], |
|
|
sdc_loc_lidar[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_lidar, |
|
|
sdc_vel_lidar[0], |
|
|
sdc_vel_lidar[1], |
|
|
] |
|
|
) |
|
|
gt_pre_bbox_sdc_lidar[0, pre_frame_index - 1] = gt_sdc_bbox_lidar |
|
|
gt_sdc_bbox_global = np.array( |
|
|
[ |
|
|
sdc_loc_global[0], |
|
|
sdc_loc_global[1], |
|
|
sdc_loc_global[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_global, |
|
|
sdc_vel_global[0], |
|
|
sdc_vel_global[1], |
|
|
] |
|
|
) |
|
|
gt_pre_bbox_sdc_global[0, pre_frame_index - 1] = gt_sdc_bbox_global |
|
|
|
|
|
|
|
|
gt_pre_command_sdc[0, pre_frame_index - 1]: int = anno_pre_tmp["command"] |
|
|
|
|
|
|
|
|
|
|
|
for fut_frame_index in range(1, fut_frames + 1): |
|
|
fut_frame_ID = int(frame_ID + fut_frame_index * FRAME_INTERVAL) |
|
|
fut_frame_index_in_seq: int = index + fut_frame_index |
|
|
|
|
|
|
|
|
if fut_frame_ID > max_frame_ID: |
|
|
continue |
|
|
|
|
|
|
|
|
anno_fut_tmp: Dict = anno_seq[fut_frame_index_in_seq] |
|
|
gt_fut_bbox_global_tmp: np.ndarray = anno_fut_tmp[ |
|
|
"gt_bboxes_global" |
|
|
] |
|
|
num_obj_fut: int = gt_fut_bbox_global_tmp.shape[0] |
|
|
gt_fut_bbox_lidar_tmp = np.zeros((num_obj_fut, 9)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
loc_global = np.concatenate( |
|
|
(gt_fut_bbox_global_tmp[:, :3], np.ones((num_obj_fut, 1))), |
|
|
axis=1, |
|
|
).T |
|
|
loc_lidar = np.linalg.inv(lidar2global) @ loc_global |
|
|
gt_fut_bbox_lidar_tmp[:, :3] = loc_lidar[:3, :].transpose() |
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar_tmp[:, 3:6] = gt_fut_bbox_global_tmp[:, 3:6] |
|
|
|
|
|
|
|
|
gt_fut_rot_global_rad = np.concatenate( |
|
|
( |
|
|
np.zeros((num_obj_fut, 1)), |
|
|
gt_fut_bbox_global_tmp[:, [6]], |
|
|
np.zeros((num_obj_fut, 1)), |
|
|
), |
|
|
axis=1, |
|
|
) |
|
|
gt_fut_rot_global_deg = gt_fut_rot_global_rad / np.pi * 180 |
|
|
for obj_index in range(num_obj_fut): |
|
|
gt_trans_global_future = calib_utils.create_transform( |
|
|
gt_fut_bbox_global_tmp[obj_index, :3], |
|
|
gt_fut_rot_global_deg[obj_index], |
|
|
) |
|
|
gt_fut_trans_lidar = ( |
|
|
np.linalg.inv(anno["lidar2global"]) @ gt_trans_global_future |
|
|
) |
|
|
gt_fut_yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
gt_fut_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_fut_bbox_lidar_tmp[obj_index, 6] = gt_fut_yaw_lidar |
|
|
|
|
|
|
|
|
gt_fut_vel_global = np.concatenate( |
|
|
( |
|
|
gt_fut_bbox_global_tmp[:, 7:], |
|
|
np.zeros((num_obj_fut, 1)), |
|
|
), |
|
|
axis=1, |
|
|
) |
|
|
gt_fut_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ gt_fut_vel_global.T |
|
|
).T |
|
|
gt_fut_bbox_lidar_tmp[:, 7:] = gt_fut_vel_lidar[:, :2] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gt_fut_bbox_lidar_all[fut_frame_index - 1] = gt_fut_bbox_lidar_tmp |
|
|
|
|
|
|
|
|
|
|
|
obj_ids_fut: List[int] = anno_fut_tmp["gt_inds"].tolist() |
|
|
( |
|
|
obj_ids_common, |
|
|
index_cur, |
|
|
index_fut, |
|
|
) = miscell_utils.find_unique_common_from_lists(obj_ids, obj_ids_fut) |
|
|
|
|
|
|
|
|
if len(index_cur) > 0: |
|
|
gt_fut_bbox_mask[np.array(index_cur), fut_frame_index - 1] = 1 |
|
|
|
|
|
gt_fut_bbox_lidar[ |
|
|
np.array(index_cur), fut_frame_index - 1, : |
|
|
] = gt_fut_bbox_lidar_tmp[np.array(index_fut), :] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sdc_loc_global, _ = calib_utils.transform_matrix_to_vector( |
|
|
anno_fut_tmp["ego2global"] |
|
|
) |
|
|
sdc_loc_global = np.concatenate((sdc_loc_global, np.array([1]))) |
|
|
sdc_loc_lidar = np.linalg.inv(lidar2global) @ sdc_loc_global |
|
|
sdc_vel_global: np.ndarray = anno_fut_tmp["sdc_vel_global"] |
|
|
sdc_vel_lidar: np.ndarray = ( |
|
|
np.linalg.inv(lidar2global[:3, :3]) @ sdc_vel_global |
|
|
) |
|
|
sdc_trans_lidar = ( |
|
|
np.linalg.inv(lidar2global) @ anno_fut_tmp["ego2global"] |
|
|
) |
|
|
yaw_lidar = scipy.spatial.transform.Rotation.from_matrix( |
|
|
sdc_trans_lidar[:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
yaw_global = scipy.spatial.transform.Rotation.from_matrix( |
|
|
anno_fut_tmp["ego2global"][:3, :3] |
|
|
).as_euler("xyz", degrees=False)[-1] |
|
|
gt_fut_bbox_sdc_mask[0, fut_frame_index - 1] = 1 |
|
|
|
|
|
|
|
|
gt_sdc_bbox_lidar = np.array( |
|
|
[ |
|
|
sdc_loc_lidar[0], |
|
|
sdc_loc_lidar[1], |
|
|
sdc_loc_lidar[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_lidar, |
|
|
sdc_vel_lidar[0], |
|
|
sdc_vel_lidar[1], |
|
|
] |
|
|
) |
|
|
gt_fut_bbox_sdc_lidar[0, fut_frame_index - 1] = gt_sdc_bbox_lidar |
|
|
gt_sdc_bbox_global = np.array( |
|
|
[ |
|
|
sdc_loc_global[0], |
|
|
sdc_loc_global[1], |
|
|
sdc_loc_global[2], |
|
|
l, |
|
|
w, |
|
|
h, |
|
|
yaw_global, |
|
|
sdc_vel_global[0], |
|
|
sdc_vel_global[1], |
|
|
] |
|
|
) |
|
|
gt_fut_bbox_sdc_global[0, fut_frame_index - 1] = gt_sdc_bbox_global |
|
|
|
|
|
|
|
|
gt_fut_command_sdc[0, fut_frame_index - 1]: int = anno_fut_tmp["command"] |
|
|
|
|
|
|
|
|
gt_pre_bbox_lidar = np.flip(gt_pre_bbox_lidar, axis=1) |
|
|
gt_pre_bbox_mask = np.flip(gt_pre_bbox_mask, axis=1) |
|
|
gt_pre_bbox_sdc_lidar = np.flip(gt_pre_bbox_sdc_lidar, axis=1) |
|
|
gt_pre_bbox_sdc_global = np.flip(gt_pre_bbox_sdc_global, axis=1) |
|
|
gt_pre_bbox_sdc_mask = np.flip(gt_pre_bbox_sdc_mask, axis=1) |
|
|
gt_pre_command_sdc = np.flip(gt_pre_command_sdc, axis=1) |
|
|
|
|
|
|
|
|
|
|
|
anno["gt_pre_bbox_lidar"] = gt_pre_bbox_lidar |
|
|
anno["gt_fut_bbox_lidar"] = gt_fut_bbox_lidar |
|
|
anno["gt_pre_bbox_mask"] = gt_pre_bbox_mask |
|
|
anno["gt_fut_bbox_mask"] = gt_fut_bbox_mask |
|
|
anno["gt_pre_bbox_sdc_lidar"] = gt_pre_bbox_sdc_lidar |
|
|
anno["gt_fut_bbox_sdc_lidar"] = gt_fut_bbox_sdc_lidar |
|
|
anno["gt_pre_bbox_sdc_global"] = gt_pre_bbox_sdc_global |
|
|
anno["gt_fut_bbox_sdc_global"] = gt_fut_bbox_sdc_global |
|
|
anno["gt_pre_bbox_sdc_mask"] = gt_pre_bbox_sdc_mask |
|
|
anno["gt_fut_bbox_sdc_mask"] = gt_fut_bbox_sdc_mask |
|
|
anno["gt_pre_command_sdc"] = gt_pre_command_sdc |
|
|
anno["gt_fut_command_sdc"] = gt_fut_command_sdc |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gt_sdc_bbox_lidar_all = np.concatenate( |
|
|
( |
|
|
gt_pre_bbox_sdc_lidar[0], |
|
|
anno["gt_sdc_bbox_lidar"].reshape((1, -1)), |
|
|
gt_fut_bbox_sdc_lidar[0], |
|
|
), |
|
|
axis=0, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if vis: |
|
|
|
|
|
try: |
|
|
save_dir = scene_root.replace("/train/", "/vis_train_gt/") |
|
|
except: |
|
|
save_dir = scene_root.replace("/val/", "/vis_val_gt/") |
|
|
save_path: str = os.path.join(save_dir, "%06d.jpg" % frame_ID) |
|
|
|
|
|
|
|
|
bbox_gt_in_ego_list: List[np.ndarray] = list() |
|
|
for bbox_index in range(num_obj): |
|
|
box_7dof_lidar: np.ndarray = anno["gt_boxes"][bbox_index, :] |
|
|
box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(box_7dof_lidar) |
|
|
box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
|
|
box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
|
|
box_in_lidar |
|
|
) |
|
|
bbox_gt_in_ego_list.append(box_in_ego) |
|
|
|
|
|
|
|
|
box3d: Box3D = Box3D.array2bbox_xyzlwhyaw(anno["gt_sdc_bbox_lidar"][:7]) |
|
|
sdc_box_in_lidar: np.ndarray = Box3D.box2corners3d_lidar(box3d) |
|
|
sdc_box_in_ego: np.ndarray = box_transform.box_in_lidar_to_ego( |
|
|
sdc_box_in_lidar |
|
|
) |
|
|
bbox_gt_in_ego_list.append(sdc_box_in_ego) |
|
|
|
|
|
|
|
|
bbox_fut_lidar_all = np.concatenate( |
|
|
(anno["gt_fut_bbox_lidar"], anno["gt_fut_bbox_sdc_lidar"]), axis=0 |
|
|
) |
|
|
bbox_fut_mask_all = np.concatenate( |
|
|
(anno["gt_fut_bbox_mask"], anno["gt_fut_bbox_sdc_mask"]), axis=0 |
|
|
) |
|
|
bbox_pre_lidar_all = np.concatenate( |
|
|
(anno["gt_pre_bbox_lidar"], anno["gt_pre_bbox_sdc_lidar"]), axis=0 |
|
|
) |
|
|
bbox_pre_mask_all = np.concatenate( |
|
|
(anno["gt_pre_bbox_mask"], anno["gt_pre_bbox_sdc_mask"]), axis=0 |
|
|
) |
|
|
|
|
|
|
|
|
box_vis.vis_box_on_lidar( |
|
|
anno["lidar_pts"], |
|
|
save_path, |
|
|
bbox_gt_in_ego_list=bbox_gt_in_ego_list, |
|
|
bev=False, |
|
|
id_list=obj_ids + [-1], |
|
|
fut_traj=bbox_fut_lidar_all, |
|
|
fut_traj_mask=bbox_fut_mask_all, |
|
|
pre_traj=bbox_pre_lidar_all, |
|
|
pre_traj_mask=bbox_pre_mask_all, |
|
|
) |
|
|
|
|
|
|
|
|
del anno["lidar_pts"] |
|
|
|
|
|
return annos |
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI for converting raw CARLA sequence dumps into a nuScenes-format pkl.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--data_root",
        type=str,
        default="/mnt/hdd2/datasets/carla_1.0/carla_data_0414",
        help="root directory of raw carla data",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default="/mnt/hdd2/datasets/carla_1.0/carla_data_0414",
        help="path of output file",
    )
    parser.add_argument(
        "--mini",
        action="store_true",
        help="generate mini pkl file",
    )
    parser.add_argument(
        "--tiny",
        action="store_true",
        help="for debugging purpose",
    )
    parser.add_argument("--split", type=str, default="val", help="train/val")
    args = parser.parse_args()

    annos: List[Dict] = []
    num_frames = 0
    scene_name: str
    # sorted(): glob.glob returns paths in arbitrary filesystem order, but
    # seq_index records the order frames are appended, which parse_sequence
    # relies on for temporal (past/future) annotations — so iteration must
    # be deterministic and chronological.
    for scene_name in sorted(glob.glob(f"{args.data_root}/{args.split}/*")):
        seq_index: List[int] = []
        for idx in sorted(glob.glob(f"{scene_name}/image_front/*.png")):
            # frame id is the basename without extension, e.g. ".../000042.png" -> "000042"
            frame: str = idx.split("/")[-1].split(".")[0]
            frame_int: int = int(float(frame))

            # skip the very first frame of each sequence
            if frame_int == 0:
                continue

            seq_index.append(num_frames)
            anno: Dict = parse_data(scene_name, frame)
            annos.append(anno)
            num_frames += 1
            if len(annos) % 10 == 0:
                print("finished: ", len(annos))

        # add per-sequence (history/future trajectory) fields to this scene's annos
        annos = parse_sequence(scene_name, annos, seq_index)

        # mini/tiny builds only need the first scene
        if args.mini or args.tiny:
            break

    if args.tiny:
        annos = annos[:10]

    print(f"number of total frames after sampling is {len(annos)}")
    carla = {"infos": annos, "metadata": {"version": "v1.0"}}

    # choose output filename variant: mini takes precedence over tiny,
    # matching the original nested-if behavior
    if args.mini:
        variant = "_mini"
    elif args.tiny:
        variant = "_tiny"
    else:
        variant = ""
    output_path = os.path.join(
        args.output_dir, f"data_{args.split}_nusc_format{variant}.pkl"
    )

    # make sure the destination directory exists before writing
    os.makedirs(args.output_dir, exist_ok=True)
    with open(output_path, "wb") as f:
        pkl.dump(carla, f)
|
|
|