import copy import numpy as np from pathlib import Path from trajdata import MapAPI, VectorMap from trajdata.caching.df_cache import DataFrameCache from trajdata.dataset_specific.opendrive import parse_maps from mmdet3d_plugin.datasets.carla.trajdata_rasterizer import rasterize from PIL import Image from trajdata.utils import map_utils patch_size = (102.4, 102.4) canvas_size = (1024, 1024) # 200 x 200, BEV space def carla_init_mapping(): assert canvas_size[0] == canvas_size[1], "not square" assert patch_size[0] == patch_size[1], "not square" pixel_per_meter = canvas_size[0] / patch_size[0] # load trajdata # carla specific, map default folder in ./data/carla/maps map_data: Dict[str, Dict] = {} map_cache_dir = "./data" town_name_list = [ # "Town01", # "Town02", # "Town03", # "Town04", "Town05", # "Town06", # "Town07", # "Town10", # "Town10HD", ] # pre-load Carla HD maps, Parse map if not yet cached map_api = MapAPI(map_cache_dir) for town_name in town_name_list: map_id = f"carla:{town_name}" # Parse all carla maps, but only if no cache available if not (Path(map_cache_dir) / f"carla/maps/{town_name}.pb").exists(): print(f"Attempt to parse CARLA map for {town_name}.") # print ("NOTE: This might lead to timeout with CARLA simulator, # in which case you need to run the script again.") # try: parse_maps.parse_maps( maps_dir_or_file=str( Path(os.environ["CARLA_ROOT"]) / f"CarlaUE4/Content/Carla/Maps/OpenDrive/{town_name}.xodr" ), trajdata_cache_dir=map_cache_dir, ) # except OSError: # print('map already been parsed, skip now') # # Parse the opendrive map string after loaded # map_id = f"carla:online" # parse_maps.init_global_vars() # vec_map = parse_maps.parse_opendrive_map( # map_filename=None, # map_rawstring=data["hd_map"]["opendrive_str"], # map_id=map_id, # ) # # Save the resulting map. # DataFrameCache.finalize_and_cache_map( # Path(self.cfg.data.trajdata_cache_dir), vec_map, {"px_per_m": 2} # ) # re-load map with map_api # try: vec_map_tmp = map_api.get_map( map_id, incl_road_lanes=True, incl_road_areas=True, incl_ped_crosswalks=True, incl_ped_walkways=True, ) # except ValueError: # print('\n\n\nmap_id', map_id) # zxc # map rasterization for the entire town in advance to save time for every timestamp map_img, polylines, raster_from_world = rasterize( vec_map=vec_map_tmp, resolution=pixel_per_meter, return_tf_mat=True, # return transformation matrix incl_centerlines=False, incl_lane_edges=False, # land divider area_color=(255, 255, 255), edge_color=(255, 0, 0), incl_lane_area=True, # driveable area incl_ego_location=False, incl_ped_walkway=False, # sidewalk incl_ped_crosswalk=False, # crosswalk debug=False, ) # gather outputs into dictionary map_data[map_id] = { "vec_map": vec_map_tmp, # VectorMap "polylines": polylines, # List[Dict]: pts: N x 2, num_pts: int, type: int "raster_from_world": raster_from_world, # np.ndarray "map_img": map_img, # H x W x 3, RasterizedMap } return map_data def get_carla_map_rasterize_semantic(maps, sample_token, input_dict): # retrieve the full rasterized map town_id = sample_token.split('Town')[-1] town_id = town_id.split('_')[0] map_id: str = f"carla:Town{town_id}" map_mask = copy.copy(maps[map_id]["map_img"]) # H x W x 3 raster_from_world: np.ndarray = maps[map_id][ "raster_from_world" ] # 3 x 3 # convert RGB to binary, hacky as it is only driveable area map_mask = np.max(map_mask, axis=2) # H x W, float, 0-1 map_mask = (map_mask * 255).astype("uint8") map_mask = Image.fromarray(map_mask) # crop the image to a local rasterized map # get lidar sensor's location in the world coordinate to crop # crop with a larger range for later rotate-crop again lidar_world: np.ndarray = copy.deepcopy( input_dict["l2g_t"][:, :2].numpy() ) # 1 x 2 lidar_world[0, -1] *= -1 # from left-handed to right-handed lidar_raster: np.ndarray = map_utils.transform_points( lidar_world, raster_from_world )[ 0 ] # (2, ) # double crop two times, possibly faster? to avoid rotate a very large # image for the map of an entire city map_mask = map_mask.crop( ( lidar_raster[0] - int(canvas_size[0] / 2 * 1.5), lidar_raster[1] - int(canvas_size[1] / 2 * 1.5), lidar_raster[0] + int(canvas_size[0] / 2 * 1.5), lidar_raster[1] + int(canvas_size[1] / 2 * 1.5), ) ) # 300 x 300 # retrieve ego vehicle's orientation to rotate the image yaw_angle_ego_global = copy.copy(input_dict["can_bus"][-1]) yaw_angle_ego_global = -1 * yaw_angle_ego_global # left to right-handed map_mask = map_mask.rotate(yaw_angle_ego_global) # 300 x 300 # center-crop with the desired size map_mask = map_mask.crop( ( int(canvas_size[0] * 0.25), int(canvas_size[1] * 0.25), int(canvas_size[0] * 1.25), int(canvas_size[1] * 1.25), ) ) # 200 x 200 map_mask = (np.array(map_mask) / 255).astype("uint8") # uint8, 0-1 map_mask = map_mask.reshape( (1, map_mask.shape[0], map_mask.shape[1]) ) # 1 x H x W return map_mask