from typing import Dict, List, Tuple

import numpy as np
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.maps.abstract_map import AbstractMap
from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
from nuplan.planning.simulation.occupancy_map.strtree_occupancy_map import STRTreeOccupancyMapFactory

from navsim.planning.simulation.planner.pdm_planner.utils.graph_search.bfs_roadblock import BreadthFirstSearchRoadBlock
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_geometry_utils import normalize_angle


def get_current_roadblock_candidates(
    ego_pose: StateSE2,
    map_api: AbstractMap,
    route_roadblocks_dict: Dict[str, RoadBlockGraphEdgeMapObject],
    heading_error_thresh: float = np.pi / 4,
    displacement_error_thresh: float = 3,
) -> Tuple[RoadBlockGraphEdgeMapObject, List[RoadBlockGraphEdgeMapObject]]:
    """
    Determines a set of roadblock candidates where ego is located.

    Candidates are collected from roadblocks and roadblock connectors within a radius of 1.0
    around the ego position; if none are found, the nearest object of each layer is used instead.
    Selection priority is: on-route candidates within both thresholds, then off-route candidates
    within both thresholds, then any collected roadblock (closest baseline wins in every case).

    :param ego_pose: class containing ego position
    :param map_api: map object
    :param route_roadblocks_dict: dictionary of on-route roadblocks
    :param heading_error_thresh: maximum heading error, defaults to np.pi/4
    :param displacement_error_thresh: maximum displacement, defaults to 3
    :return: tuple of most promising roadblock and other candidates
    :raises ValueError: if no roadblock or roadblock connector can be found near ego at all
    """
    layers = [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.ROADBLOCK_CONNECTOR]
    roadblock_dict = map_api.get_proximal_map_objects(point=ego_pose.point, radius=1.0, layers=layers)
    roadblock_candidates = (
        roadblock_dict[SemanticMapLayer.ROADBLOCK] + roadblock_dict[SemanticMapLayer.ROADBLOCK_CONNECTOR]
    )

    if not roadblock_candidates:
        # nothing in the direct vicinity: fall back to the nearest object of each layer
        for layer in layers:
            roadblock_id_, _ = map_api.get_distance_to_nearest_map_object(point=ego_pose.point, layer=layer)
            roadblock = map_api.get_map_object(roadblock_id_, layer)
            if roadblock:
                roadblock_candidates.append(roadblock)

    on_route_candidates, on_route_candidate_displacement_errors = [], []
    candidates, candidate_displacement_errors = [], []
    roadblock_displacement_errors = []

    ego_position = ego_pose.point.array[None, ...]

    for roadblock in roadblock_candidates:
        # smallest distance between ego and any lane baseline of this roadblock
        lane_displacement_error = np.inf

        for lane in roadblock.interior_edges:
            lane_discrete_path: List[StateSE2] = lane.baseline_path.discrete_path
            lane_discrete_points = np.array([state.point.array for state in lane_discrete_path], dtype=np.float64)
            lane_state_distances = ((lane_discrete_points - ego_position) ** 2.0).sum(axis=-1) ** 0.5
            argmin = np.argmin(lane_state_distances)

            heading_error = np.abs(normalize_angle(lane_discrete_path[argmin].heading - ego_pose.heading))
            displacement_error = lane_state_distances[argmin]

            if displacement_error < lane_displacement_error:
                lane_displacement_error = displacement_error

            # NOTE: evaluated per lane, so a roadblock is added once for every lane that
            # satisfies both thresholds (it may therefore appear more than once).
            if heading_error < heading_error_thresh and displacement_error < displacement_error_thresh:
                if roadblock.id in route_roadblocks_dict:
                    on_route_candidates.append(roadblock)
                    on_route_candidate_displacement_errors.append(displacement_error)
                else:
                    candidates.append(roadblock)
                    candidate_displacement_errors.append(displacement_error)

        roadblock_displacement_errors.append(lane_displacement_error)

    if on_route_candidates:  # prefer on-route roadblocks
        return (
            on_route_candidates[np.argmin(on_route_candidate_displacement_errors)],
            on_route_candidates,
        )
    if candidates:  # fallback to most promising candidate
        return candidates[np.argmin(candidate_displacement_errors)], candidates

    if not roadblock_candidates:
        # np.argmin would raise an opaque ValueError on the empty list below; be explicit instead
        raise ValueError("No roadblock or roadblock connector found near the ego pose.")

    # otherwise, just find any close roadblock
    return (
        roadblock_candidates[np.argmin(roadblock_displacement_errors)],
        roadblock_candidates,
    )


def route_roadblock_correction(
    ego_pose: StateSE2,
    map_api: AbstractMap,
    route_roadblock_dict: Dict[str, RoadBlockGraphEdgeMapObject],
    search_depth_backward: int = 15,
    search_depth_forward: int = 30,
) -> List[str]:
    """
    Applies several methods to correct route roadblocks:
      1. connects the route to ego's current roadblock if ego starts off-route,
      2. inserts missing intermediate roadblocks between unconnected route roadblocks,
      3. cuts the route where it starts to loop over itself.

    :param ego_pose: class containing ego position
    :param map_api: map object
    :param route_roadblock_dict: dictionary of on-route roadblocks
    :param search_depth_backward: depth of backward BFS search, defaults to 15
    :param search_depth_forward: depth of forward BFS search, defaults to 30
    :return: list of roadblock id's of corrected route
    """
    starting_block, starting_block_candidates = get_current_roadblock_candidates(
        ego_pose, map_api, route_roadblock_dict
    )
    starting_block_ids = [roadblock.id for roadblock in starting_block_candidates]

    route_roadblocks = list(route_roadblock_dict.values())
    route_roadblock_ids = list(route_roadblock_dict.keys())

    # Fix 1: when agent starts off-route
    if starting_block.id not in route_roadblock_ids:
        # Backward search from the first route roadblock towards any starting candidate
        graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[0], map_api, forward_search=False)
        (path, path_id), path_found = graph_search.search(starting_block_ids, max_depth=search_depth_backward)

        if path_found:
            # last path element is dropped before prepending the path to the route
            route_roadblocks[:0] = path[:-1]
            route_roadblock_ids[:0] = path_id[:-1]
        else:
            # Forward search from the starting block to any of the first three route roadblocks
            graph_search = BreadthFirstSearchRoadBlock(starting_block.id, map_api, forward_search=True)
            (path, path_id), path_found = graph_search.search(route_roadblock_ids[:3], max_depth=search_depth_forward)

            if path_found:
                # drop the route up to and including the roadblock that was reached,
                # then prepend the found path (which ends at that roadblock)
                end_roadblock_idx = np.argmax(np.array(route_roadblock_ids) == path_id[-1])

                route_roadblocks = route_roadblocks[end_roadblock_idx + 1 :]
                route_roadblock_ids = route_roadblock_ids[end_roadblock_idx + 1 :]

                route_roadblocks[:0] = path
                route_roadblock_ids[:0] = path_id

    # Fix 2: check if roadblocks are linked, search for links if not
    roadblocks_to_append = {}
    consecutive_pairs = zip(route_roadblock_ids, route_roadblocks[1:], route_roadblock_ids[1:])
    for i, (current_id, next_roadblock, next_id) in enumerate(consecutive_pairs):
        next_incoming_block_ids = [_roadblock.id for _roadblock in next_roadblock.incoming_edges]
        if current_id in next_incoming_block_ids:
            continue

        graph_search = BreadthFirstSearchRoadBlock(current_id, map_api, forward_search=True)
        (path, path_id), path_found = graph_search.search(next_id, max_depth=search_depth_forward)

        if path_found and path and len(path) >= 3:
            # keep only the intermediate roadblocks; both endpoints are already on the route
            roadblocks_to_append[i] = (path[1:-1], path_id[1:-1])

    # append missing intermediate roadblocks; offset tracks how far earlier insertions
    # have shifted the original indices
    offset = 1
    for i, (path, path_id) in roadblocks_to_append.items():
        route_roadblocks[i + offset : i + offset] = path
        route_roadblock_ids[i + offset : i + offset] = path_id
        offset += len(path)

    # Fix 3: cut route-loops
    route_roadblocks, route_roadblock_ids = remove_route_loops(route_roadblocks, route_roadblock_ids)

    return route_roadblock_ids


def remove_route_loops(
    route_roadblocks: List[RoadBlockGraphEdgeMapObject],
    route_roadblock_ids: List[str],
) -> Tuple[List[RoadBlockGraphEdgeMapObject], List[str]]:
    """
    Remove ending of route, if the roadblocks are intersecting the route (forming a loop).

    The route is cut right before the first roadblock connector whose polygon overlaps a
    previously visited roadblock connector with an intersection area greater than 1.

    :param route_roadblocks: input route roadblocks
    :param route_roadblock_ids: input route roadblocks ids
    :return: tuple of roadblocks and roadblock ids of route without loops
    """
    roadblock_occupancy_map = None
    loop_idx = None

    for idx, roadblock in enumerate(route_roadblocks):
        # loops only occur at intersection, thus searching for roadblock-connectors.
        if roadblock.__class__.__name__ != "NuPlanRoadBlockConnector":
            continue

        if roadblock_occupancy_map is None:
            # first connector on the route: nothing to compare against yet
            roadblock_occupancy_map = STRTreeOccupancyMapFactory.get_from_geometry(
                [roadblock.polygon], [roadblock.id]
            )
            continue

        strtree, _ = roadblock_occupancy_map._build_strtree()
        indices = strtree.query(roadblock.polygon)
        overlapping_geometries = strtree.geometries.take(indices) if len(indices) > 0 else ()

        # the query only yields candidates; require a real overlap area to call it a loop
        if any(geom.intersection(roadblock.polygon).area > 1 for geom in overlapping_geometries):
            loop_idx = idx
            break

        roadblock_occupancy_map.insert(roadblock.id, roadblock.polygon)

    if loop_idx is not None:
        route_roadblocks = route_roadblocks[:loop_idx]
        route_roadblock_ids = route_roadblock_ids[:loop_idx]

    return route_roadblocks, route_roadblock_ids