unknownuser6666's picture
Upload folder using huggingface_hub
663494c verified
import argparse
import glob
import os
from typing import Dict, List
import cv2
import matplotlib
import matplotlib.pyplot as plt
import mmcv
import numpy as np
import torch
from nuscenes import NuScenes
from nuscenes.prediction import PredictHelper, convert_local_coords_to_global
from nuscenes.utils import splits
from nuscenes.utils.geometry_utils import (BoxVisibility, box_in_image,
transform_matrix, view_points)
from PIL import Image
from pyquaternion import Quaternion
from xinshuo_io import is_path_exists, mkdir_if_missing
from mmdet3d_plugin.datasets.eval_utils.map_api import NuScenesMap
from scripts.analysis_tools.visualize.render.bev_render import BEVRender
from scripts.analysis_tools.visualize.render.cam_render import CameraRender
from scripts.analysis_tools.visualize.render.carla_map import carla_init_mapping
from scripts.analysis_tools.visualize.utils import (AgentPredictionData,
color_mapping)
class Visualizer:
    """Render saved multi-task predictions as BEV / camera images.

    The constructor loads the result file once and converts every frame into
    ``AgentPredictionData`` objects (``self.predictions``, keyed by sample
    token).  The ``visualize_*`` methods then draw one frame at a time through
    ``BEVRender`` / ``CameraRender``.
    """

    # detections scoring below this value are not visualized
    SCORE_THRESHOLD = 0.25
    # per-pixel score above which a predicted map class is drawn
    MAP_THRESHOLD = 0.7
    # camera sub-folders tiled by ``combine`` (row-major, 2 x 2)
    CARLA_CAMERAS = ("image_front", "image_tele", "image_left", "image_right")

    def __init__(
        self,
        dataroot="/mnt/hdd2/datasets/nuScenes",
        version="v1.0-mini",
        predroot=None,
        with_occ_map=False,
        with_map=False,
        with_planning=False,
        with_pred_box=True,
        with_pred_traj=False,
        show_gt_boxes=False,
        show_lidar=False,
        show_command=False,
        show_hd_map=False,
        show_sdc_car=False,
        show_sdc_traj=False,
        show_legend=False,
    ):
        """Set up the renderers and pre-load all predictions.

        Args:
            dataroot: dataset root; the dataset type is inferred from
                substrings of this path ("nuScenes"/"nuscenes", "carla",
                "openscene").
            version: dataset version / split name.
            predroot: path of the result file read with ``mmcv.load``.
            with_* / show_*: switches selecting which layers are parsed
                and drawn.

        Raises:
            ValueError: if the dataset cannot be inferred from ``dataroot``
                or ``predroot`` is not given.
        """
        self.version = version

        # determine which dataset is being used
        if "nuScenes" in dataroot or "nuscenes" in dataroot:
            self.dataset = "nusc"
        elif "carla" in dataroot:
            self.dataset = "carla"
        elif "openscene" in dataroot:
            self.dataset = "nuplan"
        else:
            # fail here instead of with an AttributeError on self.dataset later
            raise ValueError(
                f"cannot infer the dataset type from dataroot: {dataroot!r}"
            )
        if predroot is None:
            raise ValueError("predroot (path to the result file) is required")

        # visualization option
        self.show_legend = show_legend
        self.show_lidar = show_lidar
        self.with_pred_box = with_pred_box
        self.with_map = with_map
        self.show_hd_map = show_hd_map
        self.with_pred_traj = with_pred_traj
        self.with_occ_map = with_occ_map
        self.show_sdc_car = show_sdc_car
        self.show_sdc_traj = show_sdc_traj
        self.with_planning = with_planning
        self.show_command = show_command
        self.bev_render = BEVRender(show_gt_boxes=show_gt_boxes)
        self.cam_render = CameraRender(
            show_gt_boxes=show_gt_boxes, dataset=self.dataset
        )

        # trajectory-related; veh_id_list holds the label ids that get an
        # occupancy map assigned while parsing
        self.nusc = None
        self.predict_helper = None
        if self.dataset == "nusc":
            self.nusc = NuScenes(version=version, dataroot=dataroot, verbose=True)
            self.predict_helper = PredictHelper(self.nusc)
            self.veh_id_list = [0, 1, 2, 3, 4, 6, 7]
        elif self.dataset == "carla":
            self.veh_id_list = [0, 1, 2]
        else:  # nuplan
            self.veh_id_list = [0, 1]

        # map-related
        self.map_data = None
        if self.show_hd_map:
            if self.dataset == "nusc":
                map_names = (
                    "boston-seaport",
                    "singapore-hollandvillage",
                    "singapore-onenorth",
                    "singapore-queenstown",
                )
                self.map_data = {
                    name: NuScenesMap(dataroot=dataroot, map_name=name)
                    for name in map_names
                }
            elif self.dataset == "carla":
                self.map_data = carla_init_mapping()

        # parse result data
        self.use_json = ".json" in predroot
        self.token_set = set()
        self.predictions = self._parse_predictions_multitask_pkl(predroot)

    def get_collision_optim_seq(self):
        """Return a hard-coded list of scene tokens (debugging aid)."""
        return [
            "0e7ede02718341558414865d5c604745",
            "0dae482684ce4cd69a7258f55bc98d73",
            "1aa633b683174280b243e0a9a7ad9171",
            "112ca771e318478a88cfa692f61ffcac",
            "2f56eb47c64f43df8902d9f88aa8a019",
            "3a2d9bf6115f40898005d1c1df2b7282",
            "7e8ff24069ff4023ac699669b2c920de",
            "8180a1dbbba3479bb0c7f4ff6e9a3f0e",
            "acc29386502047339e1ec6b9c7e512d2",
            "d29527ec841045d18d04a933e7a0afd2",
        ]

    def _parse_predictions_multitask_pkl(self, predroot):
        """Load the result file and convert every frame for rendering.

        Side effects: stores the raw per-frame list in ``self.outputs`` and
        records every token in ``self.token_set``.  Some raw entries are
        modified in place while parsing (see ``_parse_agents`` and
        ``_parse_map_seg``).

        Returns:
            dict mapping sample token -> dict with keys
            ``predicted_agent_list``, ``predicted_map_seg`` and
            ``predicted_planning``.
        """
        results: Dict = mmcv.load(predroot)
        # only the per-frame "bbox_results" list is used here
        outputs: List[Dict] = results["bbox_results"]
        self.outputs = outputs

        prediction_dict = {}
        for output in outputs:
            try:
                token = output["token"]
            except KeyError:
                token = output["sample_token"]
            self.token_set.add(token)

            # occflow
            if self.with_occ_map:
                if "topk_query_ins_segs" in output["occ"]:
                    occ_map = output["occ"]["topk_query_ins_segs"][0].cpu().numpy()
                else:
                    occ_map = np.zeros((1, 5, 200, 200))
            else:
                occ_map = None

            # detection/tracking/prediction
            if self.with_pred_box:
                predicted_agent_list = self._parse_agents(output, occ_map)
            else:
                predicted_agent_list = []

            predicted_map_seg = self._parse_map_seg(output)

            if self.with_planning:
                planning_agent = self._parse_planning_agent(output)
                # appending ego into prediction list
                predicted_agent_list.append(planning_agent)
            else:
                planning_agent = None

            prediction_dict[token] = dict(
                predicted_agent_list=predicted_agent_list,
                predicted_map_seg=predicted_map_seg,
                predicted_planning=planning_agent,
            )
        return prediction_dict

    def _parse_agents(self, output: Dict, occ_map) -> List:
        """Convert one frame's detections into ``AgentPredictionData`` objects."""
        if self.show_sdc_traj:
            # treat the ego (sdc) box as one more detection; this rewrites the
            # entries of ``output`` in place
            output["boxes_3d"].tensor = torch.cat(
                [output["boxes_3d"].tensor, output["sdc_boxes_3d"].tensor],
                dim=0,
            )
            output["scores_3d"] = torch.cat(
                [output["scores_3d"], output["sdc_scores_3d"]], dim=0
            )
            labels = output["labels_3d"]
            # new_zeros keeps dtype and device, so the labels stay integer
            output["labels_3d"] = torch.cat([labels, labels.new_zeros((1,))], dim=0)

        # detection
        bboxes = output["boxes_3d"]  # N x 9
        track_scores = output["scores_3d"].cpu().detach().numpy()
        track_labels = output["labels_3d"].cpu().detach().numpy()
        box_array = bboxes.tensor.cpu().detach().numpy()
        track_centers = bboxes.gravity_center.cpu().detach().numpy()
        track_dims = bboxes.dims.cpu().detach().numpy()  # lwh, yx coordinate
        track_yaw = bboxes.yaw.cpu().detach().numpy()

        # change lwh to wlh, xy coordinate
        track_dims[:, [0, 1, 2]] = track_dims[:, [1, 0, 2]]
        track_yaw = -track_yaw - np.pi / 2

        # track
        if "track_ids" in output:
            track_ids = output["track_ids"].cpu().detach().numpy()
        else:
            track_ids = None

        # speed: last two columns of the box tensor
        track_velocity = box_array[:, -2:]

        # trajectories if available; best effort, fall back to "no trajectory"
        num_boxes = track_scores.shape[0]
        try:
            trajs = output["traj"].numpy()
            traj_scores = output["traj_scores"].numpy()
        except (KeyError, AttributeError, TypeError, RuntimeError):
            trajs = [None] * num_boxes
            traj_scores = [None] * num_boxes

        # put objects into the list for visualization
        agents = []
        occ_idx = 0
        for i in range(num_boxes):
            if track_scores[i] < self.SCORE_THRESHOLD:
                continue

            # occupancy maps are consumed in order, one per kept vehicle
            if occ_map is not None and track_labels[i] in self.veh_id_list:
                occ_map_cur = occ_map[occ_idx, :, ::-1]
                occ_idx += 1
            else:
                occ_map_cur = None

            if track_ids is None:
                track_id = None
            elif i < len(track_ids):
                track_id = track_ids[i]
            else:
                # e.g. the appended ego box has no track id
                track_id = 0

            agents.append(
                AgentPredictionData(
                    track_scores[i],
                    track_labels[i],
                    track_centers[i],
                    track_dims[i],
                    track_yaw[i],
                    track_velocity[i],
                    trajs[i],
                    traj_scores[i],
                    pred_track_id=track_id,
                    pred_occ_map=occ_map_cur,
                    past_pred_traj=None,
                )
            )
        return agents

    def _parse_map_seg(self, output: Dict):
        """Return the thresholded map segmentation of one frame, or None."""
        if not (self.with_map and "soft" in output):
            return None

        # whole pts_bbox not stored because it takes too much of GPU memory,
        # the "soft" scores are used instead
        drivable = output["soft"]["drivable"]  # H x W
        map_seg = output["soft"]["lanes"].transpose([1, 2, 0])  # H x W x 3
        # override the contour in the last channel with the driveable area
        # divider, crossing, drivable
        map_seg[..., -1] = drivable
        # do the thresholding
        map_seg = (map_seg > self.MAP_THRESHOLD) * 1.0
        # FLIP to match image
        return map_seg[::-1, :, :]

    def _parse_planning_agent(self, output: Dict):
        """Build the ego (sdc) ``AgentPredictionData`` of one frame."""
        sdc_label = 0

        # if detection is available
        try:
            bboxes = output["sdc_boxes_3d"]
            scores = output["sdc_scores_3d"]
        except KeyError:
            print("sdc box is not available for baselines, only drawing trajectory")
            only_draw_sdc_traj = True
        else:
            only_draw_sdc_traj = False
            track_scores = scores.cpu().detach().numpy()
            box_array = bboxes.tensor.cpu().detach().numpy()
            track_centers = bboxes.gravity_center.cpu().detach().numpy()
            # wlh, xy coordinate: unlike the detections no lwh -> wlh swap is
            # applied to the ego box
            track_dims = bboxes.dims.cpu().detach().numpy()
            track_yaw = bboxes.yaw.cpu().detach().numpy()
            track_velocity = box_array[:, -2:]

        if self.show_command:
            command = output["command"][0].cpu().detach().numpy()
        else:
            command = None

        planning_traj = output["planning_traj"][0].cpu().detach().numpy()

        if only_draw_sdc_traj:
            # no ego box stored: use a fixed placeholder box at the origin;
            # the ground-truth trajectory is indexed differently in this case
            traj_gt = output["planning_traj_gt"][0, :, :2].cpu().detach().numpy()
            return AgentPredictionData(
                1,
                sdc_label,
                [0, 0, 0],
                [2, 1, 4],
                0.0,
                0.0,
                planning_traj,
                1,
                pred_track_id=-1,
                pred_occ_map=None,
                past_pred_traj=None,
                is_sdc=True,
                command=command,
                traj_gt=traj_gt,
            )

        traj_gt = output["planning_traj_gt"][0][0][0, :, :2].cpu().detach().numpy()
        return AgentPredictionData(
            track_scores[0],
            sdc_label,
            track_centers[0],
            track_dims[0],
            track_yaw[0],
            track_velocity[0],
            planning_traj,
            1,
            pred_track_id=-1,
            pred_occ_map=None,
            past_pred_traj=None,
            is_sdc=True,
            command=command,
            traj_gt=traj_gt,
        )

    def visualize_bev(self, sample_token, index, out_filename, t=None, log_name=None):
        """Draw the enabled BEV layers of one frame and save ``<out_filename>.jpg``.

        Args:
            sample_token: key into ``self.predictions``.
            index: position of the frame in ``self.outputs`` (used for the
                HD map layer).
            out_filename: output path without extension.
            t: unused, kept for interface compatibility.
            log_name: forwarded to the lidar renderer.
        """
        self.bev_render.reset_canvas(dx=1, dy=1)
        self.bev_render.set_plot_cfg()

        if self.show_lidar:
            self.bev_render.show_lidar_data(
                sample_token,
                self.nusc,
                self.dataset,
                self.version,
                log_name=log_name,
            )
        if self.bev_render.show_gt_boxes:
            self.bev_render.render_anno_data(
                sample_token, self.nusc, self.predict_helper
            )

        frame = self.predictions[sample_token]
        if self.with_pred_box:
            self.bev_render.render_pred_box_data(frame["predicted_agent_list"])
        if self.with_pred_traj:
            self.bev_render.render_pred_traj(frame["predicted_agent_list"])
        if self.with_map:
            self.bev_render.render_pred_map_data(
                frame["predicted_map_seg"], self.dataset
            )
        if self.with_occ_map:
            self.bev_render.render_occ_map_data(frame["predicted_agent_list"])
        if self.with_planning:
            try:
                self.bev_render.render_pred_box_data([frame["predicted_planning"]])
            except AttributeError:
                print("baseline method does not visualize box but only trajectory")
            self.bev_render.render_planning_data(
                frame["predicted_planning"],
                show_command=self.show_command,
                dataset=self.dataset,
            )
        if self.show_hd_map:
            self.bev_render.render_hd_map(
                self.nusc,
                self.map_data,
                sample_token,
                self.dataset,
                self.outputs[index],
            )
        if self.show_sdc_car:
            self.bev_render.render_sdc_car(self.dataset)
        if self.show_legend:
            self.bev_render.render_legend()

        self.bev_render.save_fig(out_filename + ".jpg")

    def visualize_cam(self, sample_token, out_filename):
        """Draw predictions on the camera images, save ``<out_filename>_cam.jpg``."""
        self.cam_render.reset_canvas(dx=2, dy=3, tight_layout=True)
        if self.dataset == "nusc":
            self.cam_render.render_image_data(sample_token, self.nusc)
        elif self.dataset == "carla":
            self.cam_render.render_image_data_carla(sample_token, self.version)

        agents = self.predictions[sample_token]["predicted_agent_list"]

        # baseline such as VAD does not have box results here, so box drawing
        # is best effort (but must not swallow KeyboardInterrupt/SystemExit)
        try:
            self.cam_render.render_pred_track_bbox(agents, sample_token, self.nusc)
        except Exception:
            print("baseline does not visualize box")

        self.cam_render.render_pred_traj(
            agents,
            sample_token,
            self.nusc,
            render_sdc=self.with_planning,
        )
        self.cam_render.save_fig(out_filename + "_cam.jpg")

    def combine(self, sample_token, out_filename):
        """Merge a 2 x 2 camera mosaic with the BEV image of one frame.

        The camera images are read from the hard-coded CARLA layout
        ``data/carla/<version>/val/<scene>/<camera>/<frame>.png``, so
        ``sample_token`` must have the form ``<scene>_frame_<frame>``.  The
        result overwrites ``<out_filename>.jpg`` and ``<out_filename>_cam.jpg``
        is removed if present.

        Raises:
            FileNotFoundError: if the BEV image or a camera image cannot be
                read.
        """
        bev_path = out_filename + ".jpg"
        bev_image = cv2.imread(bev_path)
        if bev_image is None:
            raise FileNotFoundError(f"cannot read BEV image: {bev_path}")

        scene_token, frame_idstr = sample_token.split("_frame_")
        cam_images = []
        for cam_name in self.CARLA_CAMERAS:
            cam_path = (
                f"data/carla/{self.version}/val/{scene_token}/"
                f"{cam_name}/{frame_idstr}.png"
            )
            image = cv2.imread(cam_path)
            if image is None:
                # cv2.imread signals failure by returning None
                raise FileNotFoundError(f"cannot read camera image: {cam_path}")
            cam_images.append(image)

        top_row = cv2.hconcat(cam_images[:2])  # front | tele
        bottom_row = cv2.hconcat(cam_images[2:])  # left | right
        cam_image = cv2.vconcat([top_row, bottom_row])

        # make the BEV image a square as tall as the mosaic so both can be
        # concatenated side by side
        side = cam_image.shape[0]
        bev_image = cv2.resize(bev_image, (side, side), interpolation=cv2.INTER_LINEAR)
        merge_image = cv2.hconcat([cam_image, bev_image])
        cv2.imwrite(out_filename + ".jpg", merge_image)

        try:
            os.remove(out_filename + "_cam.jpg")
        except FileNotFoundError:
            print('do not delete, not found')

    def to_video(self, folder_path, out_path, fps=4, downsample=1):
        """Write all ``*.jpg`` files of a folder, in sorted order, to a video.

        Args:
            folder_path: folder containing the frames.
            out_path: path of the video file to write.
            fps: frame rate of the video.
            downsample: integer factor by which width and height are divided.

        Nothing is written when the folder holds no readable frame.
        """
        imgs_path = sorted(glob.glob(os.path.join(folder_path, "*.jpg")))

        frames = []
        for img_path in imgs_path:
            img = cv2.imread(img_path)
            if img is None:
                print(f"skip unreadable frame: {img_path}")
                continue
            height, width = img.shape[:2]
            frames.append(
                cv2.resize(
                    img,
                    (width // downsample, height // downsample),
                    interpolation=cv2.INTER_AREA,
                )
            )

        if not frames:
            # without a frame there is no size to open the writer with
            print(f"no frames found in {folder_path}, video not written")
            return

        # the video size is taken from the last frame
        height, width = frames[-1].shape[:2]
        out = cv2.VideoWriter(
            out_path, cv2.VideoWriter_fourcc(*"DIVX"), fps, (width, height)
        )
        try:
            for frame in frames:
                out.write(frame)
        finally:
            out.release()
def main(args):
    """Render every frame of ``<res_root>/results.pkl`` into ``<res_root>/vis``.

    Images are written to ``vis/<scene_token>/<frame_idx:06d>.jpg``.  Frames
    whose image already exists, and frames with ``frame_idx == 0``, are
    skipped.

    Args:
        args: parsed command line arguments providing ``baseline``,
            ``res_root``, ``version``, ``dataroot`` and ``project_to_cam``.
    """
    if args.baseline:
        # baselines come without detection outputs: draw GT boxes and the
        # HD map instead of predicted boxes/trajectories
        render_cfg = dict(
            with_occ_map=False,
            with_map=True,
            with_planning=True,
            with_pred_box=False,
            with_pred_traj=False,
            show_gt_boxes=True,
            show_lidar=True,
            show_command=True,
            show_hd_map=True,
            show_sdc_car=True,
            show_legend=True,
            show_sdc_traj=True,
        )
    else:
        # vis E2E; for a detection-only view switch off with_planning,
        # with_pred_traj, show_command and show_sdc_traj
        render_cfg = dict(
            with_occ_map=False,
            with_map=True,
            with_planning=True,
            with_pred_box=True,
            with_pred_traj=True,
            show_gt_boxes=False,
            show_lidar=True,
            show_command=True,
            show_hd_map=False,
            show_sdc_car=True,
            show_legend=True,
            show_sdc_traj=True,
        )

    # src file and pre-load the data
    predroot = os.path.join(args.res_root, "results.pkl")
    viser = Visualizer(
        version=args.version, predroot=predroot, dataroot=args.dataroot, **render_cfg
    )

    # dst folder
    out_folder = os.path.join(args.res_root, "vis")
    os.makedirs(out_folder, exist_ok=True)

    # loop through each data sample for visualization; iterate the raw
    # per-frame list itself because ``index`` is used to address it
    for index, output in enumerate(viser.outputs):
        sample_token = output["sample_token"]
        scene_token = output["scene_token"]
        frame_idx = output["frame_idx"]
        log_name = output["log_name"]

        # one folder per scene
        save_dir = os.path.join(out_folder, scene_token)
        mkdir_if_missing(save_dir)
        save_path = os.path.join(save_dir, "%06d" % frame_idx)

        # skip if done before
        if is_path_exists(save_path + ".jpg") or frame_idx == 0:
            continue

        viser.visualize_bev(sample_token, index, save_path, log_name=log_name)
        if args.project_to_cam:
            viser.visualize_cam(sample_token, save_path)
            viser.combine(sample_token, save_path)

    # NOTE: viser.to_video(save_dir, <video path>, fps=4, downsample=2) can
    # turn a scene folder into a demo video; it is not run by default.
def build_arg_parser():
    """Create the command line parser of the visualization script.

    The defaults target the nuPlan (openscene) setup.  Other setups used with
    this script: ``--dataroot data/carla --version carla_data_0414`` and
    ``--dataroot data/nuscenes --version v1.0-trainval``, each together with
    a matching ``--res_root``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataroot", default="data/openscene-v1.1", help="Path to nuPlan"
    )
    parser.add_argument("--version", default="test", help="dataset version")
    parser.add_argument(
        "--res_root",
        default="/mnt/hdd/models/paradrive/sandbox_20240527195012_pretrainnew/test/Tue_May_28_15_24_03_2024",
        help="Path to result folder",
    )
    # store_true flags are off unless given on the command line
    parser.add_argument(
        "--project_to_cam",
        action="store_true",
        help="Also project the predictions to the camera images (default: off)",
    )
    parser.add_argument(
        "--baseline",
        action="store_true",
        help="results to be visualized are from baselines, without auxiliary task outputs",
    )
    return parser


if __name__ == "__main__":
    main(build_arg_parser().parse_args())