unknownuser6666's picture
Upload folder using huggingface_hub
663494c verified
import copy
import numpy as np
from pathlib import Path
from trajdata import MapAPI, VectorMap
from trajdata.caching.df_cache import DataFrameCache
from trajdata.dataset_specific.opendrive import parse_maps
from mmdet3d_plugin.datasets.carla.trajdata_rasterizer import rasterize
from PIL import Image
from trajdata.utils import map_utils
patch_size = (102.4, 102.4)
canvas_size = (1024, 1024) # 200 x 200, BEV space
def carla_init_mapping():
assert canvas_size[0] == canvas_size[1], "not square"
assert patch_size[0] == patch_size[1], "not square"
pixel_per_meter = canvas_size[0] / patch_size[0]
# load trajdata
# carla specific, map default folder in ./data/carla/maps
map_data: Dict[str, Dict] = {}
map_cache_dir = "./data"
town_name_list = [
# "Town01",
# "Town02",
# "Town03",
# "Town04",
"Town05",
# "Town06",
# "Town07",
# "Town10",
# "Town10HD",
]
# pre-load Carla HD maps, Parse map if not yet cached
map_api = MapAPI(map_cache_dir)
for town_name in town_name_list:
map_id = f"carla:{town_name}"
# Parse all carla maps, but only if no cache available
if not (Path(map_cache_dir) / f"carla/maps/{town_name}.pb").exists():
print(f"Attempt to parse CARLA map for {town_name}.")
# print ("NOTE: This might lead to timeout with CARLA simulator,
# in which case you need to run the script again.")
# try:
parse_maps.parse_maps(
maps_dir_or_file=str(
Path(os.environ["CARLA_ROOT"])
/ f"CarlaUE4/Content/Carla/Maps/OpenDrive/{town_name}.xodr"
),
trajdata_cache_dir=map_cache_dir,
)
# except OSError:
# print('map already been parsed, skip now')
# # Parse the opendrive map string after loaded
# map_id = f"carla:online"
# parse_maps.init_global_vars()
# vec_map = parse_maps.parse_opendrive_map(
# map_filename=None,
# map_rawstring=data["hd_map"]["opendrive_str"],
# map_id=map_id,
# )
# # Save the resulting map.
# DataFrameCache.finalize_and_cache_map(
# Path(self.cfg.data.trajdata_cache_dir), vec_map, {"px_per_m": 2}
# )
# re-load map with map_api
# try:
vec_map_tmp = map_api.get_map(
map_id,
incl_road_lanes=True,
incl_road_areas=True,
incl_ped_crosswalks=True,
incl_ped_walkways=True,
)
# except ValueError:
# print('\n\n\nmap_id', map_id)
# zxc
# map rasterization for the entire town in advance to save time for every timestamp
map_img, polylines, raster_from_world = rasterize(
vec_map=vec_map_tmp,
resolution=pixel_per_meter,
return_tf_mat=True, # return transformation matrix
incl_centerlines=False,
incl_lane_edges=False, # land divider
area_color=(255, 255, 255),
edge_color=(255, 0, 0),
incl_lane_area=True, # driveable area
incl_ego_location=False,
incl_ped_walkway=False, # sidewalk
incl_ped_crosswalk=False, # crosswalk
debug=False,
)
# gather outputs into dictionary
map_data[map_id] = {
"vec_map": vec_map_tmp, # VectorMap
"polylines": polylines, # List[Dict]: pts: N x 2, num_pts: int, type: int
"raster_from_world": raster_from_world, # np.ndarray
"map_img": map_img, # H x W x 3, RasterizedMap
}
return map_data
def get_carla_map_rasterize_semantic(maps, sample_token, input_dict):
# retrieve the full rasterized map
town_id = sample_token.split('Town')[-1]
town_id = town_id.split('_')[0]
map_id: str = f"carla:Town{town_id}"
map_mask = copy.copy(maps[map_id]["map_img"]) # H x W x 3
raster_from_world: np.ndarray = maps[map_id][
"raster_from_world"
] # 3 x 3
# convert RGB to binary, hacky as it is only driveable area
map_mask = np.max(map_mask, axis=2) # H x W, float, 0-1
map_mask = (map_mask * 255).astype("uint8")
map_mask = Image.fromarray(map_mask)
# crop the image to a local rasterized map
# get lidar sensor's location in the world coordinate to crop
# crop with a larger range for later rotate-crop again
lidar_world: np.ndarray = copy.deepcopy(
input_dict["l2g_t"][:, :2].numpy()
) # 1 x 2
lidar_world[0, -1] *= -1 # from left-handed to right-handed
lidar_raster: np.ndarray = map_utils.transform_points(
lidar_world, raster_from_world
)[
0
] # (2, )
# double crop two times, possibly faster? to avoid rotate a very large
# image for the map of an entire city
map_mask = map_mask.crop(
(
lidar_raster[0] - int(canvas_size[0] / 2 * 1.5),
lidar_raster[1] - int(canvas_size[1] / 2 * 1.5),
lidar_raster[0] + int(canvas_size[0] / 2 * 1.5),
lidar_raster[1] + int(canvas_size[1] / 2 * 1.5),
)
) # 300 x 300
# retrieve ego vehicle's orientation to rotate the image
yaw_angle_ego_global = copy.copy(input_dict["can_bus"][-1])
yaw_angle_ego_global = -1 * yaw_angle_ego_global # left to right-handed
map_mask = map_mask.rotate(yaw_angle_ego_global) # 300 x 300
# center-crop with the desired size
map_mask = map_mask.crop(
(
int(canvas_size[0] * 0.25),
int(canvas_size[1] * 0.25),
int(canvas_size[0] * 1.25),
int(canvas_size[1] * 1.25),
)
) # 200 x 200
map_mask = (np.array(map_mask) / 255).astype("uint8") # uint8, 0-1
map_mask = map_mask.reshape(
(1, map_mask.shape[0], map_mask.shape[1])
) # 1 x H x W
return map_mask