import os import numpy as np from pyquaternion import Quaternion from shapely import affinity, ops from shapely.geometry import LineString, box, MultiPolygon, MultiLineString from nuplan.common.maps.nuplan_map.map_factory import get_maps_api from nuplan.common.maps.maps_datatypes import SemanticMapLayer class VectorizedLocalMap(object): CLASS2LABEL = { SemanticMapLayer.LANE: 0, SemanticMapLayer.LANE_CONNECTOR: 0, SemanticMapLayer.CROSSWALK: 1, SemanticMapLayer.ROADBLOCK: 2, SemanticMapLayer.INTERSECTION: 2, SemanticMapLayer.CARPARK_AREA: 2, SemanticMapLayer.ROADBLOCK_CONNECTOR: 2, SemanticMapLayer.WALKWAYS: 3 } def __init__( self, map_root, map_version='nuplan-maps-v1.0', patch_size=(100, 100), # h, w map_classes={ 'centerline': [SemanticMapLayer.LANE, SemanticMapLayer.LANE_CONNECTOR], 'ped_crossing': [SemanticMapLayer.CROSSWALK], 'road_boundary': [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.INTERSECTION, SemanticMapLayer.CARPARK_AREA], # 'sidewalk': [SemanticMapLayer.WALKWAYS] }, need_merged=['road_boundary'], ): super().__init__() self.map_classes = map_classes self.patch_size = patch_size self.need_merged = need_merged self.MAP_APIS_DICT = { "us-pa-pittsburgh-hazelwood" : get_maps_api(map_root, map_version, "us-pa-pittsburgh-hazelwood"), "sg-one-north" : get_maps_api(map_root, map_version, "sg-one-north"), "us-ma-boston" : get_maps_api(map_root, map_version, "us-ma-boston"), "us-nv-las-vegas-strip" : get_maps_api(map_root, map_version, "us-nv-las-vegas-strip") } def get_patch_coord(self, patch_box, patch_angle: float = 0.0): """ Convert patch_box to shapely Polygon coordinates. :param patch_box: Patch box defined as [x_center, y_center, height, width]. :param patch_angle: Patch orientation in degrees. :return: Box Polygon for patch_box. """ patch_x, patch_y, patch_h, patch_w = patch_box x_min = patch_x - patch_w / 2.0 y_min = patch_y - patch_h / 2.0 x_max = patch_x + patch_w / 2.0 y_max = patch_y + patch_h / 2.0 patch = box(x_min, y_min, x_max, y_max) patch = affinity.rotate(patch, patch_angle, origin=(patch_x, patch_y), use_radians=False) return patch def gen_vectorized_samples(self, e2g_T, e2g_R, map_location): ''' use lidar2global to get gt map layers ''' x, y = (e2g_T[0], e2g_T[1]) patch_angle = Quaternion(e2g_R).yaw_pitch_roll[0] patch_box = (x, y, self.patch_size[0], self.patch_size[1]) patch_angle = patch_angle / np.pi * 180 vectors = [] for idx, class_name in enumerate(self.map_classes): geom = self.get_map_geom(patch_box, patch_angle, self.map_classes[class_name], map_location) if class_name in self.need_merged: line_list = self.merge_polys_to_lines(geom) else: line_list = self.geoms_to_lines(geom) for line in line_list: vectors.append({ 'pts': line.astype(np.float32), 'pts_num': line.shape[0], 'type': idx }) return vectors def gen_drivable_area(self, e2g_T, e2g_R, map_location,no_transform=False): x, y = (e2g_T[0], e2g_T[1]) patch_angle = Quaternion(e2g_R).yaw_pitch_roll[0] patch_box = (x, y, self.patch_size[0], self.patch_size[1]) patch_angle = patch_angle / np.pi * 180 geom = self.get_map_geom(patch_box, patch_angle, self.map_classes['road_boundary'], map_location,no_transform) multi_polygon = self.merge_polys_to_one(geom) return multi_polygon def get_map_geom(self, patch_box, patch_angle, layer_names, map_location, no_transform=False): patch_x = patch_box[0] patch_y = patch_box[1] patch = self.get_patch_coord(patch_box, patch_angle) map_api = self.MAP_APIS_DICT[map_location] map_geom = [] for layer_name in layer_names: records = map_api._get_proximity_map_object(patch, layer_name) for record in records: if layer_name in [SemanticMapLayer.LANE, SemanticMapLayer.LANE_CONNECTOR]: line = record.baseline_path.linestring new_line = line.intersection(patch) if new_line.is_empty: continue if not no_transform: new_line = affinity.rotate(new_line, -patch_angle, origin=(patch_x, patch_y), use_radians=False) new_line = affinity.affine_transform(new_line, [1.0, 0.0, 0.0, 1.0, -patch_x, -patch_y]) map_geom.append((layer_name, new_line)) elif layer_name in [SemanticMapLayer.CROSSWALK, SemanticMapLayer.WALKWAYS, SemanticMapLayer.ROADBLOCK, SemanticMapLayer.INTERSECTION]: polygon = record.polygon if polygon.is_valid: new_polygon = polygon.intersection(patch) if new_polygon.is_empty: continue new_polygon = affinity.rotate(new_polygon, -patch_angle, origin=(patch_x, patch_y), use_radians=False) new_polygon = affinity.affine_transform(new_polygon, [1.0, 0.0, 0.0, 1.0, -patch_x, -patch_y]) if new_polygon.geom_type == 'Polygon': new_polygon = MultiPolygon([new_polygon]) map_geom.append((layer_name, new_polygon)) return map_geom def _one_type_line_geom_to_instances(self, line_geom): line_instances = [] for line in line_geom: if not line.is_empty: if line.geom_type == 'MultiLineString': for single_line in line.geoms: line_instances.append(single_line) elif line.geom_type == 'LineString': line_instances.append(line) else: print(line.geom_type) raise NotImplementedError line_instances = [np.asarray(line.coords) for line in line_instances if len(line.coords) > 1] return line_instances def geoms_to_lines(self, geoms): lines = [] for layer_name, geom in geoms: if geom.geom_type == 'MultiPolygon': lines.extend(self.poly_to_lines(geom)) else: lines.append(geom) return self._one_type_line_geom_to_instances(lines) def poly_to_lines(self, poly): max_x = self.patch_size[1] / 2 max_y = self.patch_size[0] / 2 local_patch = box(-max_x + 0.2, -max_y + 0.2, max_x - 0.2, max_y - 0.2) if poly.geom_type == 'Polygon': poly = MultiPolygon([poly]) exteriors = [] interiors = [] for p in poly.geoms: exteriors.append(p.exterior) for inter in p.interiors: interiors.append(inter) results = [] for ext in exteriors: if ext.is_ccw: ext.coords = list(ext.coords)[::-1] lines = ext.intersection(local_patch) if isinstance(lines, MultiLineString): lines = ops.linemerge(lines) results.append(lines) for inter in interiors: if not inter.is_ccw: inter.coords = list(inter.coords)[::-1] lines = inter.intersection(local_patch) if isinstance(lines, MultiLineString): lines = ops.linemerge(lines) results.append(lines) return results def merge_polys_to_lines(self, polygon_geom): roads = [poly[1] for poly in polygon_geom] exteriors = [] interiors = [] union_segments = ops.unary_union(roads) if union_segments.geom_type != 'MultiPolygon': union_segments = MultiPolygon([union_segments]) for poly in union_segments.geoms: exteriors.append(poly.exterior) for inter in poly.interiors: interiors.append(inter) max_x = self.patch_size[1] / 2 max_y = self.patch_size[0] / 2 # cut polygon to lines local_patch = box(-max_x + 0.2, -max_y + 0.2, max_x - 0.2, max_y - 0.2) results = [] for ext in exteriors: if ext.is_ccw: ext.coords = list(ext.coords)[::-1] lines = ext.intersection(local_patch) if isinstance(lines, MultiLineString): lines = ops.linemerge(lines) results.append(lines) for inter in interiors: if not inter.is_ccw: inter.coords = list(inter.coords)[::-1] lines = inter.intersection(local_patch) if isinstance(lines, MultiLineString): lines = ops.linemerge(lines) results.append(lines) return self._one_type_line_geom_to_instances(results) def merge_polys_to_one(self, polygon_geom): roads = [poly[1] for poly in polygon_geom] union_segments = ops.unary_union(roads) if union_segments.geom_type != 'MultiPolygon': union_segments = MultiPolygon([union_segments]) return union_segments