File size: 6,009 Bytes
663494c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import copy
import numpy as np
from pathlib import Path
from trajdata import MapAPI, VectorMap
from trajdata.caching.df_cache import DataFrameCache
from trajdata.dataset_specific.opendrive import parse_maps
from mmdet3d_plugin.datasets.carla.trajdata_rasterizer import rasterize
from PIL import Image
from trajdata.utils import map_utils

patch_size = (102.4, 102.4)
canvas_size = (1024, 1024)  # 200 x 200, BEV space

def carla_init_mapping():
    assert canvas_size[0] == canvas_size[1], "not square"
    assert patch_size[0] == patch_size[1], "not square"
    pixel_per_meter = canvas_size[0] / patch_size[0]
    # load trajdata

    # carla specific, map default folder in ./data/carla/maps
    map_data: Dict[str, Dict] = {}
    map_cache_dir = "./data"
    town_name_list = [
        # "Town01",
        # "Town02",
        # "Town03",
        # "Town04",
        "Town05",
        # "Town06",
        # "Town07",
        # "Town10",
        # "Town10HD",
    ]

    # pre-load Carla HD maps, Parse map if not yet cached
    map_api = MapAPI(map_cache_dir)
    for town_name in town_name_list:
        map_id = f"carla:{town_name}"

        # Parse all carla maps, but only if no cache available
        if not (Path(map_cache_dir) / f"carla/maps/{town_name}.pb").exists():
            print(f"Attempt to parse CARLA map for {town_name}.")
            # print ("NOTE: This might lead to timeout with CARLA simulator,
            # in which case you need to run the script again.")
            # try:
            parse_maps.parse_maps(
                maps_dir_or_file=str(
                    Path(os.environ["CARLA_ROOT"])
                    / f"CarlaUE4/Content/Carla/Maps/OpenDrive/{town_name}.xodr"
                ),
                trajdata_cache_dir=map_cache_dir,
            )
            # except OSError:
            #     print('map already been parsed, skip now')

        # # Parse the opendrive map string after loaded
        # map_id = f"carla:online"
        # parse_maps.init_global_vars()
        # vec_map = parse_maps.parse_opendrive_map(
        #     map_filename=None,
        #     map_rawstring=data["hd_map"]["opendrive_str"],
        #     map_id=map_id,
        # )
        # # Save the resulting map.
        # DataFrameCache.finalize_and_cache_map(
        #     Path(self.cfg.data.trajdata_cache_dir), vec_map, {"px_per_m": 2}
        # )

        # re-load map with map_api
        # try:
        vec_map_tmp = map_api.get_map(
            map_id,
            incl_road_lanes=True,
            incl_road_areas=True,
            incl_ped_crosswalks=True,
            incl_ped_walkways=True,
        )
        # except ValueError:
        #     print('\n\n\nmap_id', map_id)
        #     zxc

        # map rasterization for the entire town in advance to save time for every timestamp
        map_img, polylines, raster_from_world = rasterize(
            vec_map=vec_map_tmp,
            resolution=pixel_per_meter,
            return_tf_mat=True,  # return transformation matrix
            incl_centerlines=False,
            incl_lane_edges=False,  # land divider
            area_color=(255, 255, 255),
            edge_color=(255, 0, 0),
            incl_lane_area=True,  # driveable area
            incl_ego_location=False,
            incl_ped_walkway=False,  # sidewalk
            incl_ped_crosswalk=False,  # crosswalk
            debug=False,
        )

        # gather outputs into dictionary
        map_data[map_id] = {
            "vec_map": vec_map_tmp,  # VectorMap
            "polylines": polylines,  # List[Dict]: pts: N x 2, num_pts: int, type: int
            "raster_from_world": raster_from_world,  # np.ndarray
            "map_img": map_img,  # H x W x 3, RasterizedMap
        }

    return map_data

def get_carla_map_rasterize_semantic(maps, sample_token, input_dict):
    # retrieve the full rasterized map
    town_id = sample_token.split('Town')[-1]
    town_id = town_id.split('_')[0]
    map_id: str = f"carla:Town{town_id}"
    map_mask = copy.copy(maps[map_id]["map_img"])  # H x W x 3
    raster_from_world: np.ndarray = maps[map_id][
        "raster_from_world"
    ]  # 3 x 3

    # convert RGB to binary, hacky as it is only driveable area
    map_mask = np.max(map_mask, axis=2)  # H x W, float, 0-1
    map_mask = (map_mask * 255).astype("uint8")
    map_mask = Image.fromarray(map_mask)

    # crop the image to a local rasterized map
    # get lidar sensor's location in the world coordinate to crop
    # crop with a larger range for later rotate-crop again
    lidar_world: np.ndarray = copy.deepcopy(
        input_dict["l2g_t"][:, :2].numpy()
    )  # 1 x 2
    lidar_world[0, -1] *= -1  # from left-handed to right-handed
    lidar_raster: np.ndarray = map_utils.transform_points(
        lidar_world, raster_from_world
    )[
        0
    ]  # (2, )

    # double crop two times, possibly faster? to avoid rotate a very large
    # image for the map of an entire city
    map_mask = map_mask.crop(
        (
            lidar_raster[0] - int(canvas_size[0] / 2 * 1.5),
            lidar_raster[1] - int(canvas_size[1] / 2 * 1.5),
            lidar_raster[0] + int(canvas_size[0] / 2 * 1.5),
            lidar_raster[1] + int(canvas_size[1] / 2 * 1.5),
        )
    )  # 300 x 300

    # retrieve ego vehicle's orientation to rotate the image
    yaw_angle_ego_global = copy.copy(input_dict["can_bus"][-1])
    yaw_angle_ego_global = -1 * yaw_angle_ego_global  # left to right-handed
    map_mask = map_mask.rotate(yaw_angle_ego_global)  # 300 x 300

    # center-crop with the desired size
    map_mask = map_mask.crop(
        (
            int(canvas_size[0] * 0.25),
            int(canvas_size[1] * 0.25),
            int(canvas_size[0] * 1.25),
            int(canvas_size[1] * 1.25),
        )
    )  # 200 x 200
    map_mask = (np.array(map_mask) / 255).astype("uint8")  # uint8, 0-1
    map_mask = map_mask.reshape(
        (1, map_mask.shape[0], map_mask.shape[1])
    )  # 1 x H x W

    return map_mask