import numpy as np from nuscenes.map_expansion.map_api import NuScenesMap, NuScenesMapExplorer from nuscenes.eval.common.utils import quaternion_yaw, Quaternion from shapely import affinity, ops from shapely.geometry import LineString, box, MultiPolygon, MultiLineString CLASS2LABEL = { "road_divider": 0, "lane_divider": 0, "ped_crossing": 1, "contours": 2, "others": -1, } class VectorizedLocalMap(object): def __init__( self, dataroot, patch_size, canvas_size, line_classes=["road_divider", "lane_divider"], ped_crossing_classes=["ped_crossing"], contour_classes=["road_segment", "lane"], sample_dist=1, num_samples=250, padding=False, normalize=False, fixed_num=-1, ): """ Args: fixed_num = -1 : no fixed num """ super().__init__() self.data_root = dataroot self.MAPS = [ "boston-seaport", "singapore-hollandvillage", "singapore-onenorth", "singapore-queenstown", ] self.line_classes = line_classes self.ped_crossing_classes = ped_crossing_classes self.polygon_classes = contour_classes self.nusc_maps = {} self.map_explorer = {} for loc in self.MAPS: self.nusc_maps[loc] = NuScenesMap(dataroot=self.data_root, map_name=loc) self.map_explorer[loc] = NuScenesMapExplorer(self.nusc_maps[loc]) self.patch_size = patch_size self.canvas_size = canvas_size self.sample_dist = sample_dist self.num_samples = num_samples self.padding = padding self.normalize = normalize self.fixed_num = fixed_num def gen_vectorized_samples( self, location, ego2global_translation, ego2global_rotation ): map_pose = ego2global_translation[:2] rotation = Quaternion(ego2global_rotation) patch_box = (map_pose[0], map_pose[1], self.patch_size[0], self.patch_size[1]) patch_angle = quaternion_yaw(rotation) / np.pi * 180 line_geom = self.get_map_geom( patch_box, patch_angle, self.line_classes, location ) line_vector_dict = self.line_geoms_to_vectors(line_geom) # print(len(line_vector_dict)) 2 ped_geom = self.get_map_geom( patch_box, patch_angle, self.ped_crossing_classes, location ) # ped_vector_list = self.ped_geoms_to_vectors(ped_geom) ped_vector_list = self.line_geoms_to_vectors(ped_geom)["ped_crossing"] # print(len(ped_vector_list)) 11 polygon_geom = self.get_map_geom( patch_box, patch_angle, self.polygon_classes, location ) poly_bound_list = self.poly_geoms_to_vectors(polygon_geom) # print(len(poly_bound_list)) 4 # zxc vectors = [] for line_type, vects in line_vector_dict.items(): for line, length in vects: vectors.append( (line.astype(float), length, CLASS2LABEL.get(line_type, -1)) ) for ped_line, length in ped_vector_list: vectors.append( (ped_line.astype(float), length, CLASS2LABEL.get("ped_crossing", -1)) ) # print(ped_line) # print(length) # zxc for contour, length in poly_bound_list: vectors.append( (contour.astype(float), length, CLASS2LABEL.get("contours", -1)) ) # filter out -1 filtered_vectors = [] for pts, pts_num, type in vectors: if type != -1: filtered_vectors.append({"pts": pts, "pts_num": pts_num, "type": type}) return filtered_vectors def get_map_geom(self, patch_box, patch_angle, layer_names, location): map_geom = [] for layer_name in layer_names: if layer_name in self.line_classes: geoms = self.map_explorer[location]._get_layer_line( patch_box, patch_angle, layer_name ) map_geom.append((layer_name, geoms)) elif layer_name in self.polygon_classes: geoms = self.map_explorer[location]._get_layer_polygon( patch_box, patch_angle, layer_name ) map_geom.append((layer_name, geoms)) elif layer_name in self.ped_crossing_classes: geoms = self.get_ped_crossing_line(patch_box, patch_angle, location) # geoms = self.map_explorer[location]._get_layer_polygon(patch_box, patch_angle, layer_name) map_geom.append((layer_name, geoms)) return map_geom def _one_type_line_geom_to_vectors(self, line_geom): line_vectors = [] for line in line_geom: if not line.is_empty: if line.geom_type == "MultiLineString": for single_line in line.geoms: line_vectors.append(self.sample_pts_from_line(single_line)) elif line.geom_type == "LineString": line_vectors.append(self.sample_pts_from_line(line)) else: raise NotImplementedError return line_vectors def poly_geoms_to_vectors(self, polygon_geom): roads = polygon_geom[0][1] lanes = polygon_geom[1][1] union_roads = ops.unary_union(roads) union_lanes = ops.unary_union(lanes) union_segments = ops.unary_union([union_roads, union_lanes]) max_x = self.patch_size[1] / 2 max_y = self.patch_size[0] / 2 local_patch = box(-max_x + 0.2, -max_y + 0.2, max_x - 0.2, max_y - 0.2) exteriors = [] interiors = [] if union_segments.geom_type != "MultiPolygon": union_segments = MultiPolygon([union_segments]) for poly in union_segments.geoms: exteriors.append(poly.exterior) for inter in poly.interiors: interiors.append(inter) results = [] for ext in exteriors: if ext.is_ccw: ext.coords = list(ext.coords)[::-1] lines = ext.intersection(local_patch) if isinstance(lines, MultiLineString): lines = ops.linemerge(lines) results.append(lines) for inter in interiors: if not inter.is_ccw: inter.coords = list(inter.coords)[::-1] lines = inter.intersection(local_patch) if isinstance(lines, MultiLineString): lines = ops.linemerge(lines) results.append(lines) return self._one_type_line_geom_to_vectors(results) def line_geoms_to_vectors(self, line_geom): line_vectors_dict = dict() for line_type, a_type_of_lines in line_geom: one_type_vectors = self._one_type_line_geom_to_vectors(a_type_of_lines) line_vectors_dict[line_type] = one_type_vectors return line_vectors_dict def ped_geoms_to_vectors(self, ped_geom): ped_geom = ped_geom[0][1] union_ped = ops.unary_union(ped_geom) if union_ped.geom_type != "MultiPolygon": union_ped = MultiPolygon([union_ped]) max_x = self.patch_size[1] / 2 max_y = self.patch_size[0] / 2 local_patch = box(-max_x + 0.2, -max_y + 0.2, max_x - 0.2, max_y - 0.2) results = [] for ped_poly in union_ped: # rect = ped_poly.minimum_rotated_rectangle ext = ped_poly.exterior if not ext.is_ccw: ext.coords = list(ext.coords)[::-1] lines = ext.intersection(local_patch) results.append(lines) return self._one_type_line_geom_to_vectors(results) def get_ped_crossing_line(self, patch_box, patch_angle, location): def add_line(poly_xy, idx, patch, patch_angle, patch_x, patch_y, line_list): points = [ (p0, p1) for p0, p1 in zip(poly_xy[0, idx : idx + 2], poly_xy[1, idx : idx + 2]) ] line = LineString(points) line = line.intersection(patch) if not line.is_empty: line = affinity.rotate( line, -patch_angle, origin=(patch_x, patch_y), use_radians=False ) line = affinity.affine_transform( line, [1.0, 0.0, 0.0, 1.0, -patch_x, -patch_y] ) line_list.append(line) patch_x = patch_box[0] patch_y = patch_box[1] patch = NuScenesMapExplorer.get_patch_coord(patch_box, patch_angle) line_list = [] records = getattr(self.nusc_maps[location], "ped_crossing") for record in records: polygon = self.map_explorer[location].extract_polygon( record["polygon_token"] ) poly_xy = np.array(polygon.exterior.xy) dist = np.square(poly_xy[:, 1:] - poly_xy[:, :-1]).sum(0) x1, x2 = np.argsort(dist)[-2:] add_line(poly_xy, x1, patch, patch_angle, patch_x, patch_y, line_list) add_line(poly_xy, x2, patch, patch_angle, patch_x, patch_y, line_list) return line_list def sample_pts_from_line(self, line): if self.fixed_num < 0: distances = np.arange(0, line.length, self.sample_dist) sampled_points = np.array( [list(line.interpolate(distance).coords) for distance in distances] ).reshape(-1, 2) else: # fixed number of points, so distance is line.length / self.fixed_num distances = np.linspace(0, line.length, self.fixed_num) sampled_points = np.array( [list(line.interpolate(distance).coords) for distance in distances] ).reshape(-1, 2) if self.normalize: sampled_points = sampled_points / np.array( [self.patch_size[1], self.patch_size[0]] ) num_valid = len(sampled_points) if not self.padding or self.fixed_num > 0: # fixed num sample can return now! return sampled_points, num_valid # fixed distance sampling need padding! num_valid = len(sampled_points) if self.fixed_num < 0: if num_valid < self.num_samples: padding = np.zeros((self.num_samples - len(sampled_points), 2)) sampled_points = np.concatenate([sampled_points, padding], axis=0) else: sampled_points = sampled_points[: self.num_samples, :] num_valid = self.num_samples if self.normalize: sampled_points = sampled_points / np.array( [self.patch_size[1], self.patch_size[0]] ) num_valid = len(sampled_points) return sampled_points, num_valid