ICCV2025-RealADSim-ClosedLoop-SubmissionDemo / navsim /planning /simulation /planner /pdm_planner /utils /route_utils.py
| from typing import Dict, List, Tuple | |
| import numpy as np | |
| from nuplan.common.actor_state.state_representation import StateSE2 | |
| from nuplan.common.maps.abstract_map import AbstractMap | |
| from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject | |
| from nuplan.common.maps.maps_datatypes import SemanticMapLayer | |
| from nuplan.planning.simulation.occupancy_map.strtree_occupancy_map import STRTreeOccupancyMapFactory | |
| from navsim.planning.simulation.planner.pdm_planner.utils.graph_search.bfs_roadblock import BreadthFirstSearchRoadBlock | |
| from navsim.planning.simulation.planner.pdm_planner.utils.pdm_geometry_utils import normalize_angle | |
| def get_current_roadblock_candidates( | |
| ego_pose: StateSE2, | |
| map_api: AbstractMap, | |
| route_roadblocks_dict: Dict[str, RoadBlockGraphEdgeMapObject], | |
| heading_error_thresh: float = np.pi / 4, | |
| displacement_error_thresh: float = 3, | |
| ) -> Tuple[RoadBlockGraphEdgeMapObject, List[RoadBlockGraphEdgeMapObject]]: | |
| """ | |
| Determines a set of roadblock candidate where ego is located | |
| :param ego_pose: class containing ego position | |
| :param map_api: map object | |
| :param route_roadblocks_dict: dictionary of on-route roadblocks | |
| :param heading_error_thresh: maximum heading error, defaults to np.pi/4 | |
| :param displacement_error_thresh: maximum displacement, defaults to 3 | |
| :return: tuple of most promising roadblock and other candidates | |
| """ | |
| roadblock_candidates = [] | |
| layers = [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.ROADBLOCK_CONNECTOR] | |
| roadblock_dict = map_api.get_proximal_map_objects(point=ego_pose.point, radius=1.0, layers=layers) | |
| roadblock_candidates = ( | |
| roadblock_dict[SemanticMapLayer.ROADBLOCK] + roadblock_dict[SemanticMapLayer.ROADBLOCK_CONNECTOR] | |
| ) | |
| if not roadblock_candidates: | |
| for layer in layers: | |
| roadblock_id_, distance = map_api.get_distance_to_nearest_map_object(point=ego_pose.point, layer=layer) | |
| roadblock = map_api.get_map_object(roadblock_id_, layer) | |
| if roadblock: | |
| roadblock_candidates.append(roadblock) | |
| on_route_candidates, on_route_candidate_displacement_errors = [], [] | |
| candidates, candidate_displacement_errors = [], [] | |
| roadblock_displacement_errors = [] | |
| roadblock_heading_errors = [] | |
| for idx, roadblock in enumerate(roadblock_candidates): | |
| lane_displacement_error, lane_heading_error = np.inf, np.inf | |
| for lane in roadblock.interior_edges: | |
| lane_discrete_path: List[StateSE2] = lane.baseline_path.discrete_path | |
| lane_discrete_points = np.array([state.point.array for state in lane_discrete_path], dtype=np.float64) | |
| lane_state_distances = ((lane_discrete_points - ego_pose.point.array[None, ...]) ** 2.0).sum(axis=-1) ** 0.5 | |
| argmin = np.argmin(lane_state_distances) | |
| heading_error = np.abs(normalize_angle(lane_discrete_path[argmin].heading - ego_pose.heading)) | |
| displacement_error = lane_state_distances[argmin] | |
| if displacement_error < lane_displacement_error: | |
| lane_heading_error, lane_displacement_error = ( | |
| heading_error, | |
| displacement_error, | |
| ) | |
| if heading_error < heading_error_thresh and displacement_error < displacement_error_thresh: | |
| if roadblock.id in route_roadblocks_dict.keys(): | |
| on_route_candidates.append(roadblock) | |
| on_route_candidate_displacement_errors.append(displacement_error) | |
| else: | |
| candidates.append(roadblock) | |
| candidate_displacement_errors.append(displacement_error) | |
| roadblock_displacement_errors.append(lane_displacement_error) | |
| roadblock_heading_errors.append(lane_heading_error) | |
| if on_route_candidates: # prefer on-route roadblocks | |
| return ( | |
| on_route_candidates[np.argmin(on_route_candidate_displacement_errors)], | |
| on_route_candidates, | |
| ) | |
| elif candidates: # fallback to most promising candidate | |
| return candidates[np.argmin(candidate_displacement_errors)], candidates | |
| # otherwise, just find any close roadblock | |
| return ( | |
| roadblock_candidates[np.argmin(roadblock_displacement_errors)], | |
| roadblock_candidates, | |
| ) | |
| def route_roadblock_correction( | |
| ego_pose: StateSE2, | |
| map_api: AbstractMap, | |
| route_roadblock_dict: Dict[str, RoadBlockGraphEdgeMapObject], | |
| search_depth_backward: int = 15, | |
| search_depth_forward: int = 30, | |
| ) -> List[str]: | |
| """ | |
| Applies several methods to correct route roadblocks. | |
| :param ego_pose: class containing ego position | |
| :param map_api: map object | |
| :param route_roadblocks_dict: dictionary of on-route roadblocks | |
| :param search_depth_backward: depth of forward BFS search, defaults to 15 | |
| :param search_depth_forward: depth of backward BFS search, defaults to 30 | |
| :return: list of roadblock id's of corrected route | |
| """ | |
| # TODO: Refactor code for readability | |
| starting_block, starting_block_candidates = get_current_roadblock_candidates( | |
| ego_pose, map_api, route_roadblock_dict | |
| ) | |
| starting_block_ids = [roadblock.id for roadblock in starting_block_candidates] | |
| route_roadblocks = list(route_roadblock_dict.values()) | |
| route_roadblock_ids = list(route_roadblock_dict.keys()) | |
| # Fix 1: when agent starts off-route | |
| if starting_block.id not in route_roadblock_ids: | |
| # Backward search if current roadblock not in route | |
| graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[0], map_api, forward_search=False) | |
| (path, path_id), path_found = graph_search.search(starting_block_ids, max_depth=search_depth_backward) | |
| if path_found: | |
| route_roadblocks[:0] = path[:-1] | |
| route_roadblock_ids[:0] = path_id[:-1] | |
| else: | |
| # Forward search to any route roadblock | |
| graph_search = BreadthFirstSearchRoadBlock(starting_block.id, map_api, forward_search=True) | |
| (path, path_id), path_found = graph_search.search(route_roadblock_ids[:3], max_depth=search_depth_forward) | |
| if path_found: | |
| end_roadblock_idx = np.argmax(np.array(route_roadblock_ids) == path_id[-1]) | |
| route_roadblocks = route_roadblocks[end_roadblock_idx + 1 :] | |
| route_roadblock_ids = route_roadblock_ids[end_roadblock_idx + 1 :] | |
| route_roadblocks[:0] = path | |
| route_roadblock_ids[:0] = path_id | |
| # Fix 2: check if roadblocks are linked, search for links if not | |
| roadblocks_to_append = {} | |
| for i in range(len(route_roadblocks) - 1): | |
| next_incoming_block_ids = [_roadblock.id for _roadblock in route_roadblocks[i + 1].incoming_edges] | |
| is_incoming = route_roadblock_ids[i] in next_incoming_block_ids | |
| if is_incoming: | |
| continue | |
| graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[i], map_api, forward_search=True) | |
| (path, path_id), path_found = graph_search.search(route_roadblock_ids[i + 1], max_depth=search_depth_forward) | |
| if path_found and path and len(path) >= 3: | |
| path, path_id = path[1:-1], path_id[1:-1] | |
| roadblocks_to_append[i] = (path, path_id) | |
| # append missing intermediate roadblocks | |
| offset = 1 | |
| for i, (path, path_id) in roadblocks_to_append.items(): | |
| route_roadblocks[i + offset : i + offset] = path | |
| route_roadblock_ids[i + offset : i + offset] = path_id | |
| offset += len(path) | |
| # Fix 3: cut route-loops | |
| route_roadblocks, route_roadblock_ids = remove_route_loops(route_roadblocks, route_roadblock_ids) | |
| return route_roadblock_ids | |
| def remove_route_loops( | |
| route_roadblocks: List[RoadBlockGraphEdgeMapObject], | |
| route_roadblock_ids: List[str], | |
| ) -> Tuple[List[str], List[RoadBlockGraphEdgeMapObject]]: | |
| """ | |
| Remove ending of route, if the roadblock are intersecting the route (forming a loop). | |
| :param route_roadblocks: input route roadblocks | |
| :param route_roadblock_ids: input route roadblocks ids | |
| :return: tuple of ids and roadblocks of route without loops | |
| """ | |
| roadblock_occupancy_map = None | |
| loop_idx = None | |
| for idx, roadblock in enumerate(route_roadblocks): | |
| # loops only occur at intersection, thus searching for roadblock-connectors. | |
| if str(roadblock.__class__.__name__) == "NuPlanRoadBlockConnector": | |
| if not roadblock_occupancy_map: | |
| roadblock_occupancy_map = STRTreeOccupancyMapFactory.get_from_geometry( | |
| [roadblock.polygon], [roadblock.id] | |
| ) | |
| continue | |
| strtree, index_by_id = roadblock_occupancy_map._build_strtree() | |
| indices = strtree.query(roadblock.polygon) | |
| if len(indices) > 0: | |
| for geom in strtree.geometries.take(indices): | |
| area = geom.intersection(roadblock.polygon).area | |
| if area > 1: | |
| loop_idx = idx | |
| break | |
| if loop_idx: | |
| break | |
| roadblock_occupancy_map.insert(roadblock.id, roadblock.polygon) | |
| if loop_idx: | |
| route_roadblocks = route_roadblocks[:loop_idx] | |
| route_roadblock_ids = route_roadblock_ids[:loop_idx] | |
| return route_roadblocks, route_roadblock_ids | |