import argparse
import glob
import os
from typing import Dict, List

import cv2
import matplotlib
import matplotlib.pyplot as plt
import mmcv
import numpy as np
import torch
from nuscenes import NuScenes
from nuscenes.prediction import PredictHelper, convert_local_coords_to_global
from nuscenes.utils import splits
from nuscenes.utils.geometry_utils import (BoxVisibility, box_in_image,
                                           transform_matrix, view_points)
from PIL import Image
from pyquaternion import Quaternion
from xinshuo_io import is_path_exists, mkdir_if_missing

from mmdet3d_plugin.datasets.eval_utils.map_api import NuScenesMap
from scripts.analysis_tools.visualize.render.bev_render import BEVRender
from scripts.analysis_tools.visualize.render.cam_render import CameraRender
from scripts.analysis_tools.visualize.render.carla_map import carla_init_mapping
from scripts.analysis_tools.visualize.utils import (AgentPredictionData,
                                                    color_mapping)


class Visualizer:
    """Render end-to-end driving model outputs in BEV and camera views.

    Loads a result pickle (detection/tracking boxes, motion forecasts,
    occupancy maps, map segmentation and planning trajectories), converts
    each frame into ``AgentPredictionData`` records, and draws them with
    :class:`BEVRender` / :class:`CameraRender`.

    The dataset flavour ("nusc", "carla" or "nuplan") is inferred from
    substrings of ``dataroot``.
    """

    def __init__(
        self,
        dataroot="/mnt/hdd2/datasets/nuScenes",
        version="v1.0-mini",
        predroot=None,
        with_occ_map=False,
        with_map=False,
        with_planning=False,
        with_pred_box=True,
        with_pred_traj=False,
        show_gt_boxes=False,
        show_lidar=False,
        show_command=False,
        show_hd_map=False,
        show_sdc_car=False,
        show_sdc_traj=False,
        show_legend=False,
    ):
        """Set up renderers, dataset helpers and parse the result pickle.

        Args:
            dataroot (str): Dataset root; its path name decides the dataset
                flavour (nuScenes / CARLA / openscene-nuPlan).
            version (str): Dataset version string (e.g. "v1.0-mini").
            predroot (str): Path to the results file (.pkl or .json).
            with_* / show_* (bool): Toggles for the individual overlays.

        Raises:
            ValueError: If ``dataroot`` matches no known dataset flavour.
        """
        self.version = version

        # Determine which dataset is being used from the data root path.
        if "nuScenes" in dataroot or "nuscenes" in dataroot:
            self.dataset = "nusc"
        elif "carla" in dataroot:
            self.dataset = "carla"
        elif "openscene" in dataroot:
            self.dataset = "nuplan"
        else:
            raise ValueError(
                f"cannot infer dataset flavour from dataroot: {dataroot}"
            )

        # Visualization options.
        self.show_legend = show_legend
        self.show_lidar = show_lidar
        self.with_pred_box = with_pred_box
        self.with_map = with_map
        self.show_hd_map = show_hd_map
        self.with_pred_traj = with_pred_traj
        self.with_occ_map = with_occ_map
        self.show_sdc_car = show_sdc_car
        self.show_sdc_traj = show_sdc_traj
        self.with_planning = with_planning
        self.show_command = show_command

        self.bev_render = BEVRender(show_gt_boxes=show_gt_boxes)
        self.cam_render = CameraRender(
            show_gt_boxes=show_gt_boxes, dataset=self.dataset
        )

        # Trajectory-related helpers. veh_id_list holds the label ids that
        # count as vehicles (used to pair boxes with occupancy channels).
        if self.dataset == "nusc":
            self.nusc = NuScenes(version=version, dataroot=dataroot, verbose=True)
            self.predict_helper = PredictHelper(self.nusc)
            self.veh_id_list = [0, 1, 2, 3, 4, 6, 7]
        elif self.dataset == "carla":
            self.veh_id_list = [0, 1, 2]
            self.nusc = None
        elif self.dataset == "nuplan":
            self.veh_id_list = [0, 1]
            self.nusc = None

        # Map-related data (only loaded when HD-map rendering is requested).
        if self.show_hd_map:
            if self.dataset == "nusc":
                self.map_data = {
                    "boston-seaport": NuScenesMap(
                        dataroot=dataroot, map_name="boston-seaport"
                    ),
                    "singapore-hollandvillage": NuScenesMap(
                        dataroot=dataroot, map_name="singapore-hollandvillage"
                    ),
                    "singapore-onenorth": NuScenesMap(
                        dataroot=dataroot, map_name="singapore-onenorth"
                    ),
                    "singapore-queenstown": NuScenesMap(
                        dataroot=dataroot, map_name="singapore-queenstown"
                    ),
                }
            elif self.dataset == "carla":
                self.map_data = carla_init_mapping()
            elif self.dataset == "nuplan":
                self.map_data = None

        # Parse result data. Guard against predroot=None (the default) so
        # the flag check does not raise TypeError before a clearer error.
        self.use_json = predroot is not None and ".json" in predroot
        self.token_set = set()
        self.predictions = self._parse_predictions_multitask_pkl(predroot)

    def get_collision_optim_seq(self):
        """Return scene tokens of a hand-picked debug subset.

        NOTE(review): presumably sequences where collision optimization is
        interesting; only used by the (commented-out) scene filter in
        ``_parse_predictions_multitask_pkl``.
        """
        return [
            "0e7ede02718341558414865d5c604745",
            "0dae482684ce4cd69a7258f55bc98d73",
            "1aa633b683174280b243e0a9a7ad9171",
            "112ca771e318478a88cfa692f61ffcac",
            "2f56eb47c64f43df8902d9f88aa8a019",
            "3a2d9bf6115f40898005d1c1df2b7282",
            "7e8ff24069ff4023ac699669b2c920de",
            "8180a1dbbba3479bb0c7f4ff6e9a3f0e",
            "acc29386502047339e1ec6b9c7e512d2",
            "d29527ec841045d18d04a933e7a0afd2",
        ]

    def _parse_predictions_multitask_pkl(self, predroot):
        """Convert a multi-task result pickle into per-sample render data.

        Args:
            predroot (str): Path to the results file loaded with mmcv.

        Returns:
            dict: sample_token -> dict with keys ``predicted_agent_list``
            (list of AgentPredictionData), ``predicted_map_seg`` (H x W x 3
            binary map or None) and ``predicted_planning``
            (AgentPredictionData for the ego vehicle or None).
        """
        col_opl_seq: List[str] = self.get_collision_optim_seq()
        outputs: Dict = mmcv.load(predroot)
        # Top-level keys: bbox_results, occ_results_computed,
        # planning_results_computed. Only bbox_results is consumed here.
        outputs: List[Dict] = outputs["bbox_results"]
        self.outputs = outputs  # one entry per frame in the dataset

        prediction_dict = dict()
        for k in range(len(outputs)):
            # Different result formats name the sample token differently.
            try:
                token = outputs[k]["token"]
            except KeyError:
                token = outputs[k]["sample_token"]
            predicted_agent_list = []
            self.token_set.add(token)

            # # debug: restrict to a specific set of sequences
            # if 'scene_token' in outputs[k] and self.version != 'v1.0-mini':
            #     if outputs[k]['scene_token'] not in col_opl_seq:
            #         continue

            # Occupancy flow: per-instance BEV occupancy, if predicted.
            if self.with_occ_map:
                if "topk_query_ins_segs" in outputs[k]["occ"]:
                    occ_map = (
                        outputs[k]["occ"]["topk_query_ins_segs"][0].cpu().numpy()
                    )
                else:
                    occ_map = np.zeros((1, 5, 200, 200))
            else:
                occ_map = None

            # Detection / tracking / motion prediction.
            if self.with_pred_box:
                if self.show_sdc_traj:
                    # Append the ego (SDC) box to the regular detections so
                    # it is drawn like any other agent (label 0).
                    outputs[k]["boxes_3d"].tensor = torch.cat(
                        [
                            outputs[k]["boxes_3d"].tensor,
                            outputs[k]["sdc_boxes_3d"].tensor,
                        ],
                        dim=0,
                    )
                    outputs[k]["scores_3d"] = torch.cat(
                        [outputs[k]["scores_3d"], outputs[k]["sdc_scores_3d"]],
                        dim=0,
                    )
                    outputs[k]["labels_3d"] = torch.cat(
                        [
                            outputs[k]["labels_3d"],
                            torch.zeros(
                                (1,), device=outputs[k]["labels_3d"].device
                            ),
                        ],
                        dim=0,
                    )
                # Detection tensors.
                bboxes = outputs[k]["boxes_3d"]  # N x 9
                scores = outputs[k]["scores_3d"]
                labels = outputs[k]["labels_3d"]

                track_scores = scores.cpu().detach().numpy()
                track_labels = labels.cpu().detach().numpy()
                track_boxes = bboxes.tensor.cpu().detach().numpy()
                track_centers = bboxes.gravity_center.cpu().detach().numpy()
                # dims come as lwh in yx coordinates ...
                track_dims = bboxes.dims.cpu().detach().numpy()
                track_yaw = bboxes.yaw.cpu().detach().numpy()
                # ... convert to wlh in xy coordinates for rendering.
                track_dims[:, [0, 1, 2]] = track_dims[:, [1, 0, 2]]
                track_yaw = -track_yaw - np.pi / 2

                # Track ids (only present when a tracker ran).
                if "track_ids" in outputs[k]:
                    track_ids = outputs[k]["track_ids"].cpu().detach().numpy()
                else:
                    track_ids = None

                # Speed: last two columns of the 9-dof box tensor.
                track_velocity = bboxes.tensor.cpu().detach().numpy()[:, -2:]

                # Forecast trajectories, if available in the results.
                try:
                    trajs = outputs[k]["traj"].numpy()
                    traj_scores = outputs[k]["traj_scores"].numpy()
                except KeyError:
                    trajs = [None for _ in range(track_scores.shape[0])]
                    traj_scores = [None for _ in range(track_scores.shape[0])]

                # Collect the agents that pass the score threshold.
                occ_idx = 0
                for i in range(track_scores.shape[0]):
                    if track_scores[i] < 0.25:
                        continue
                    # Occupancy channels are emitted per-vehicle in order,
                    # so advance occ_idx only for vehicle-class agents.
                    if occ_map is not None and track_labels[i] in self.veh_id_list:
                        occ_map_cur = occ_map[occ_idx, :, ::-1]
                        occ_idx += 1
                    else:
                        occ_map_cur = None
                    if track_ids is not None:
                        track_id = track_ids[i] if i < len(track_ids) else 0
                    else:
                        track_id = None
                    predicted_agent_list.append(
                        AgentPredictionData(
                            track_scores[i],
                            track_labels[i],
                            track_centers[i],
                            track_dims[i],
                            track_yaw[i],
                            track_velocity[i],
                            trajs[i],
                            traj_scores[i],
                            pred_track_id=track_id,
                            pred_occ_map=occ_map_cur,
                            past_pred_traj=None,
                        )
                    )

            # Online map segmentation (stored as soft scores to save memory).
            if self.with_map and "soft" in outputs[k]:
                score_list = outputs[k]["soft"]["drivable"]  # H x W
                predicted_map_seg = outputs[k]["soft"]["lanes"].transpose(
                    [1, 2, 0]
                )  # H x W x 3
                # Override the contour in the last channel with the drivable
                # area: channels are (divider, crossing, drivable).
                predicted_map_seg[..., -1] = score_list
                # Threshold the soft scores into a binary map.
                map_thres = 0.7
                predicted_map_seg = (predicted_map_seg > map_thres) * 1.0
                # Flip vertically to match the image orientation.
                predicted_map_seg = predicted_map_seg[::-1, :, :]
            else:
                predicted_map_seg = None

            # Planning: ego (SDC) box + planned trajectory.
            if self.with_planning:
                labels = 0
                track_labels = labels
                # SDC detection boxes are only present for full models;
                # baselines provide the planned trajectory only.
                try:
                    bboxes = outputs[k]["sdc_boxes_3d"]
                    scores = outputs[k]["sdc_scores_3d"]
                    track_scores = scores.cpu().detach().numpy()
                    track_boxes = bboxes.tensor.cpu().detach().numpy()
                    track_centers = bboxes.gravity_center.cpu().detach().numpy()
                    track_dims = (
                        bboxes.dims.cpu().detach().numpy()
                    )  # wlh, xy coordinate
                    track_yaw = bboxes.yaw.cpu().detach().numpy()
                    track_velocity = (
                        bboxes.tensor.cpu().detach().numpy()[:, -2:]
                    )
                    only_draw_sdc_traj = False
                except KeyError:
                    print(
                        "sdc box is not available for baselines, only drawing trajectory"
                    )
                    only_draw_sdc_traj = True

                # NOTE: the lwh->wlh / yaw conversion applied to detections
                # above is intentionally skipped here — the SDC boxes are
                # already in wlh format.

                if self.show_command:
                    command = outputs[k]["command"][0].cpu().detach().numpy()
                else:
                    command = None

                if only_draw_sdc_traj:
                    # Synthesize a placeholder ego box (2 x 1 x 4 at origin).
                    planning_agent = AgentPredictionData(
                        1,
                        track_labels,
                        [0, 0, 0],
                        [2, 1, 4],
                        0.0,
                        0.0,
                        outputs[k]["planning_traj"][0]
                        .cpu()
                        .detach()
                        .numpy(),  # 1 x 6 x 2 -> 6 x 2
                        1,
                        pred_track_id=-1,
                        pred_occ_map=None,
                        past_pred_traj=None,
                        is_sdc=True,
                        command=command,
                        traj_gt=outputs[k]["planning_traj_gt"][0, :, :2]
                        .cpu()
                        .detach()
                        .numpy(),  # 1 x 6 x 3 -> 6 x 2
                    )
                else:
                    planning_agent = AgentPredictionData(
                        track_scores[0],
                        track_labels,
                        track_centers[0],
                        track_dims[0],
                        track_yaw[0],
                        track_velocity[0],
                        outputs[k]["planning_traj"][0].cpu().detach().numpy(),
                        1,
                        pred_track_id=-1,
                        pred_occ_map=None,
                        past_pred_traj=None,
                        is_sdc=True,
                        command=command,
                        traj_gt=outputs[k]["planning_traj_gt"][0][0][0, :, :2]
                        .cpu()
                        .detach()
                        .numpy(),
                    )
                # Append the ego into the prediction list as well.
                predicted_agent_list.append(planning_agent)
            else:
                planning_agent = None

            prediction_dict[token] = dict(
                predicted_agent_list=predicted_agent_list,
                predicted_map_seg=predicted_map_seg,
                predicted_planning=planning_agent,
            )
        return prediction_dict

    def visualize_bev(self, sample_token, index, out_filename, t=None, log_name=None):
        """Draw all enabled BEV layers for one sample and save as <out>.jpg."""
        self.bev_render.reset_canvas(dx=1, dy=1)
        self.bev_render.set_plot_cfg()
        if self.show_lidar:
            self.bev_render.show_lidar_data(
                sample_token,
                self.nusc,
                self.dataset,
                self.version,
                log_name=log_name,
            )
        if self.bev_render.show_gt_boxes:
            self.bev_render.render_anno_data(
                sample_token, self.nusc, self.predict_helper
            )
        if self.with_pred_box:
            self.bev_render.render_pred_box_data(
                self.predictions[sample_token]["predicted_agent_list"]
            )
        if self.with_pred_traj:
            self.bev_render.render_pred_traj(
                self.predictions[sample_token]["predicted_agent_list"]
            )
        if self.with_map:
            self.bev_render.render_pred_map_data(
                self.predictions[sample_token]["predicted_map_seg"], self.dataset
            )
        if self.with_occ_map:
            self.bev_render.render_occ_map_data(
                self.predictions[sample_token]["predicted_agent_list"]
            )
        if self.with_planning:
            # Baselines have no SDC box (predicted_planning lacks box
            # attributes) — fall through to trajectory-only rendering.
            try:
                self.bev_render.render_pred_box_data(
                    [self.predictions[sample_token]["predicted_planning"]]
                )
            except AttributeError:
                print("baseline method does not visualize box but only trajectory")
            self.bev_render.render_planning_data(
                self.predictions[sample_token]["predicted_planning"],
                show_command=self.show_command,
                dataset=self.dataset,
            )
        if self.show_hd_map:
            self.bev_render.render_hd_map(
                self.nusc, self.map_data, sample_token, self.dataset,
                self.outputs[index]
            )
        if self.show_sdc_car:
            self.bev_render.render_sdc_car(self.dataset)
        if self.show_legend:
            self.bev_render.render_legend()
        self.bev_render.save_fig(out_filename + ".jpg")

    def visualize_cam(self, sample_token, out_filename):
        """Project predictions onto the camera images; save as <out>_cam.jpg."""
        self.cam_render.reset_canvas(dx=2, dy=3, tight_layout=True)
        if self.dataset == "nusc":
            self.cam_render.render_image_data(sample_token, self.nusc)
        elif self.dataset == "carla":
            self.cam_render.render_image_data_carla(sample_token, self.version)
        # Baselines such as VAD do not have box results here.
        try:
            self.cam_render.render_pred_track_bbox(
                self.predictions[sample_token]["predicted_agent_list"],
                sample_token,
                self.nusc,
            )
        except Exception:
            print("baseline does not visualize box")
        self.cam_render.render_pred_traj(
            self.predictions[sample_token]["predicted_agent_list"],
            sample_token,
            self.nusc,
            render_sdc=self.with_planning,
        )
        self.cam_render.save_fig(out_filename + "_cam.jpg")

    def combine(self, sample_token, out_filename):
        """Tile the four CARLA camera images and the BEV into one jpg.

        NOTE(review): paths are CARLA-specific (sample_token of the form
        "<scene>_frame_<idx>"); this method is not usable for nuScenes.
        """
        bev_image = cv2.imread(out_filename + ".jpg")
        scene_token, frame_idstr = sample_token.split("_frame_")
        image_front = f"data/carla/{self.version}/val/{scene_token}/image_front/{frame_idstr}.png"
        image_front = cv2.imread(image_front)
        image_tele = f"data/carla/{self.version}/val/{scene_token}/image_tele/{frame_idstr}.png"
        image_tele = cv2.imread(image_tele)
        image_left = f"data/carla/{self.version}/val/{scene_token}/image_left/{frame_idstr}.png"
        image_left = cv2.imread(image_left)
        image_right = f"data/carla/{self.version}/val/{scene_token}/image_right/{frame_idstr}.png"
        image_right = cv2.imread(image_right)
        merge_image1 = cv2.hconcat([image_front, image_tele])
        merge_image2 = cv2.hconcat([image_left, image_right])
        cam_image = cv2.vconcat([merge_image1, merge_image2])
        # Resize BEV to a square matching the camera mosaic height.
        bev_image = cv2.resize(
            bev_image,
            (cam_image.shape[0], cam_image.shape[0]),
            interpolation=cv2.INTER_LINEAR,
        )
        merge_image = cv2.hconcat([cam_image, bev_image])
        cv2.imwrite(out_filename + ".jpg", merge_image)
        # The standalone camera image is no longer needed.
        try:
            os.remove(out_filename + "_cam.jpg")
        except FileNotFoundError:
            print("do not delete, not found")

    def to_video(self, folder_path, out_path, fps=4, downsample=1):
        """Encode all jpgs in ``folder_path`` (sorted) into a DIVX video.

        Args:
            folder_path (str): Directory containing the frame jpgs.
            out_path (str): Output video path.
            fps (int): Frames per second of the output video.
            downsample (int): Integer factor by which frames are shrunk.
        """
        imgs_path = sorted(glob.glob(os.path.join(folder_path, "*.jpg")))
        img_array = []
        size = None
        for img_path in imgs_path:
            img = cv2.imread(img_path)
            height, width, _ = img.shape
            img = cv2.resize(
                img,
                (width // downsample, height // downsample),
                interpolation=cv2.INTER_AREA,
            )
            height, width, _ = img.shape
            size = (width, height)
            img_array.append(img)
        # Nothing to encode — avoid NameError on `size` below.
        if not img_array:
            return
        out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"DIVX"), fps, size)
        for frame in img_array:
            out.write(frame)
        out.release()


def main(args):
    """Load a result file, render every frame, and write jpgs per scene."""
    if args.baseline:
        render_cfg = dict(
            with_occ_map=False,
            with_map=True,
            with_planning=True,
            with_pred_box=False,
            with_pred_traj=False,
            show_gt_boxes=True,
            show_lidar=True,
            show_command=True,
            show_hd_map=True,
            show_sdc_car=True,
            show_legend=True,
            show_sdc_traj=True,
        )
    else:
        # vis E2E (alternative detection-only config kept below)
        # render_cfg = dict(
        #     with_occ_map=False, with_map=True, with_planning=False,
        #     with_pred_box=True, with_pred_traj=False, show_gt_boxes=False,
        #     show_lidar=True, show_command=False, show_hd_map=False,
        #     show_sdc_car=True, show_legend=True, show_sdc_traj=False,
        # )
        render_cfg = dict(
            with_occ_map=False,
            with_map=True,
            with_planning=True,
            with_pred_box=True,
            with_pred_traj=True,
            show_gt_boxes=False,
            show_lidar=True,
            show_command=True,
            show_hd_map=False,
            show_sdc_car=True,
            show_legend=True,
            show_sdc_traj=True,
        )

    # Source file; pre-load all predictions.
    predroot = os.path.join(args.res_root, "results.pkl")
    viser = Visualizer(
        version=args.version, predroot=predroot, dataroot=args.dataroot, **render_cfg
    )

    # Destination folder.
    out_folder = os.path.join(args.res_root, "vis")
    if not os.path.exists(out_folder):
        os.makedirs(out_folder)

    # Loop through each data sample for visualization.
    save_dir = None
    for index in range(len(viser.predictions)):
        sample_token = viser.outputs[index]["sample_token"]
        scene_token = viser.outputs[index]["scene_token"]
        frame_idx = viser.outputs[index]["frame_idx"]
        log_name = viser.outputs[index]["log_name"]
        log_token = viser.outputs[index]["log_token"]

        save_dir = os.path.join(out_folder, scene_token)
        mkdir_if_missing(save_dir)
        save_path = os.path.join(save_dir, "%06d" % frame_idx)
        # Skip frames already rendered, and the first frame of each scene.
        if is_path_exists(save_path + ".jpg") or frame_idx == 0:
            continue

        viser.visualize_bev(sample_token, index, save_path, log_name=log_name)
        if args.project_to_cam:
            viser.visualize_cam(sample_token, save_path)
            viser.combine(sample_token, save_path)

    # if save_dir is not None:
    #     demo_video = os.path.join(save_dir, "demo.avi")
    #     viser.to_video(save_dir, demo_video, fps=4, downsample=2)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # nuPlan / openscene defaults (CARLA and nuScenes alternatives below).
    parser.add_argument(
        "--dataroot", default="data/openscene-v1.1", help="Path to nuPlan"
    )
    parser.add_argument("--version", default="test", help="dataset version")
    parser.add_argument(
        "--res_root",
        default="/mnt/hdd/models/paradrive/sandbox_20240527195012_pretrainnew/test/Tue_May_28_15_24_03_2024",
        help="Path to result folder",
    )
    # parser.add_argument("--dataroot", default="data/carla", help="Path to nuScenes")
    # parser.add_argument("--version", default="carla_data_0414", help="dataset version")
    # parser.add_argument('--dataroot', default='data/nuscenes', help='Path to nuScenes')
    # parser.add_argument('--version', default='v1.0-trainval', help='dataset version')
    parser.add_argument(
        "--project_to_cam", action="store_true", help="Project to cam (default: True)"
    )
    parser.add_argument(
        "--baseline",
        action="store_true",
        help="results to be visualized are from baselines, without auxliary task outputs",
    )
    args = parser.parse_args()
    main(args)