File size: 6,009 Bytes
663494c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import copy
import numpy as np
from pathlib import Path
from trajdata import MapAPI, VectorMap
from trajdata.caching.df_cache import DataFrameCache
from trajdata.dataset_specific.opendrive import parse_maps
from mmdet3d_plugin.datasets.carla.trajdata_rasterizer import rasterize
from PIL import Image
from trajdata.utils import map_utils
patch_size = (102.4, 102.4)
canvas_size = (1024, 1024) # 200 x 200, BEV space
def carla_init_mapping():
assert canvas_size[0] == canvas_size[1], "not square"
assert patch_size[0] == patch_size[1], "not square"
pixel_per_meter = canvas_size[0] / patch_size[0]
# load trajdata
# carla specific, map default folder in ./data/carla/maps
map_data: Dict[str, Dict] = {}
map_cache_dir = "./data"
town_name_list = [
# "Town01",
# "Town02",
# "Town03",
# "Town04",
"Town05",
# "Town06",
# "Town07",
# "Town10",
# "Town10HD",
]
# pre-load Carla HD maps, Parse map if not yet cached
map_api = MapAPI(map_cache_dir)
for town_name in town_name_list:
map_id = f"carla:{town_name}"
# Parse all carla maps, but only if no cache available
if not (Path(map_cache_dir) / f"carla/maps/{town_name}.pb").exists():
print(f"Attempt to parse CARLA map for {town_name}.")
# print ("NOTE: This might lead to timeout with CARLA simulator,
# in which case you need to run the script again.")
# try:
parse_maps.parse_maps(
maps_dir_or_file=str(
Path(os.environ["CARLA_ROOT"])
/ f"CarlaUE4/Content/Carla/Maps/OpenDrive/{town_name}.xodr"
),
trajdata_cache_dir=map_cache_dir,
)
# except OSError:
# print('map already been parsed, skip now')
# # Parse the opendrive map string after loaded
# map_id = f"carla:online"
# parse_maps.init_global_vars()
# vec_map = parse_maps.parse_opendrive_map(
# map_filename=None,
# map_rawstring=data["hd_map"]["opendrive_str"],
# map_id=map_id,
# )
# # Save the resulting map.
# DataFrameCache.finalize_and_cache_map(
# Path(self.cfg.data.trajdata_cache_dir), vec_map, {"px_per_m": 2}
# )
# re-load map with map_api
# try:
vec_map_tmp = map_api.get_map(
map_id,
incl_road_lanes=True,
incl_road_areas=True,
incl_ped_crosswalks=True,
incl_ped_walkways=True,
)
# except ValueError:
# print('\n\n\nmap_id', map_id)
# zxc
# map rasterization for the entire town in advance to save time for every timestamp
map_img, polylines, raster_from_world = rasterize(
vec_map=vec_map_tmp,
resolution=pixel_per_meter,
return_tf_mat=True, # return transformation matrix
incl_centerlines=False,
incl_lane_edges=False, # land divider
area_color=(255, 255, 255),
edge_color=(255, 0, 0),
incl_lane_area=True, # driveable area
incl_ego_location=False,
incl_ped_walkway=False, # sidewalk
incl_ped_crosswalk=False, # crosswalk
debug=False,
)
# gather outputs into dictionary
map_data[map_id] = {
"vec_map": vec_map_tmp, # VectorMap
"polylines": polylines, # List[Dict]: pts: N x 2, num_pts: int, type: int
"raster_from_world": raster_from_world, # np.ndarray
"map_img": map_img, # H x W x 3, RasterizedMap
}
return map_data
def get_carla_map_rasterize_semantic(maps, sample_token, input_dict):
# retrieve the full rasterized map
town_id = sample_token.split('Town')[-1]
town_id = town_id.split('_')[0]
map_id: str = f"carla:Town{town_id}"
map_mask = copy.copy(maps[map_id]["map_img"]) # H x W x 3
raster_from_world: np.ndarray = maps[map_id][
"raster_from_world"
] # 3 x 3
# convert RGB to binary, hacky as it is only driveable area
map_mask = np.max(map_mask, axis=2) # H x W, float, 0-1
map_mask = (map_mask * 255).astype("uint8")
map_mask = Image.fromarray(map_mask)
# crop the image to a local rasterized map
# get lidar sensor's location in the world coordinate to crop
# crop with a larger range for later rotate-crop again
lidar_world: np.ndarray = copy.deepcopy(
input_dict["l2g_t"][:, :2].numpy()
) # 1 x 2
lidar_world[0, -1] *= -1 # from left-handed to right-handed
lidar_raster: np.ndarray = map_utils.transform_points(
lidar_world, raster_from_world
)[
0
] # (2, )
# double crop two times, possibly faster? to avoid rotate a very large
# image for the map of an entire city
map_mask = map_mask.crop(
(
lidar_raster[0] - int(canvas_size[0] / 2 * 1.5),
lidar_raster[1] - int(canvas_size[1] / 2 * 1.5),
lidar_raster[0] + int(canvas_size[0] / 2 * 1.5),
lidar_raster[1] + int(canvas_size[1] / 2 * 1.5),
)
) # 300 x 300
# retrieve ego vehicle's orientation to rotate the image
yaw_angle_ego_global = copy.copy(input_dict["can_bus"][-1])
yaw_angle_ego_global = -1 * yaw_angle_ego_global # left to right-handed
map_mask = map_mask.rotate(yaw_angle_ego_global) # 300 x 300
# center-crop with the desired size
map_mask = map_mask.crop(
(
int(canvas_size[0] * 0.25),
int(canvas_size[1] * 0.25),
int(canvas_size[0] * 1.25),
int(canvas_size[1] * 1.25),
)
) # 200 x 200
map_mask = (np.array(map_mask) / 255).astype("uint8") # uint8, 0-1
map_mask = map_mask.reshape(
(1, map_mask.shape[0], map_mask.shape[1])
) # 1 x H x W
return map_mask
|