hyzhou404's picture
init
7accb91
from typing import Dict, List, Tuple
import numpy as np
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.maps.abstract_map import AbstractMap
from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
from nuplan.planning.simulation.occupancy_map.strtree_occupancy_map import STRTreeOccupancyMapFactory
from navsim.planning.simulation.planner.pdm_planner.utils.graph_search.bfs_roadblock import BreadthFirstSearchRoadBlock
from navsim.planning.simulation.planner.pdm_planner.utils.pdm_geometry_utils import normalize_angle
def get_current_roadblock_candidates(
ego_pose: StateSE2,
map_api: AbstractMap,
route_roadblocks_dict: Dict[str, RoadBlockGraphEdgeMapObject],
heading_error_thresh: float = np.pi / 4,
displacement_error_thresh: float = 3,
) -> Tuple[RoadBlockGraphEdgeMapObject, List[RoadBlockGraphEdgeMapObject]]:
"""
Determines a set of roadblock candidate where ego is located
:param ego_pose: class containing ego position
:param map_api: map object
:param route_roadblocks_dict: dictionary of on-route roadblocks
:param heading_error_thresh: maximum heading error, defaults to np.pi/4
:param displacement_error_thresh: maximum displacement, defaults to 3
:return: tuple of most promising roadblock and other candidates
"""
roadblock_candidates = []
layers = [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.ROADBLOCK_CONNECTOR]
roadblock_dict = map_api.get_proximal_map_objects(point=ego_pose.point, radius=1.0, layers=layers)
roadblock_candidates = (
roadblock_dict[SemanticMapLayer.ROADBLOCK] + roadblock_dict[SemanticMapLayer.ROADBLOCK_CONNECTOR]
)
if not roadblock_candidates:
for layer in layers:
roadblock_id_, distance = map_api.get_distance_to_nearest_map_object(point=ego_pose.point, layer=layer)
roadblock = map_api.get_map_object(roadblock_id_, layer)
if roadblock:
roadblock_candidates.append(roadblock)
on_route_candidates, on_route_candidate_displacement_errors = [], []
candidates, candidate_displacement_errors = [], []
roadblock_displacement_errors = []
roadblock_heading_errors = []
for idx, roadblock in enumerate(roadblock_candidates):
lane_displacement_error, lane_heading_error = np.inf, np.inf
for lane in roadblock.interior_edges:
lane_discrete_path: List[StateSE2] = lane.baseline_path.discrete_path
lane_discrete_points = np.array([state.point.array for state in lane_discrete_path], dtype=np.float64)
lane_state_distances = ((lane_discrete_points - ego_pose.point.array[None, ...]) ** 2.0).sum(axis=-1) ** 0.5
argmin = np.argmin(lane_state_distances)
heading_error = np.abs(normalize_angle(lane_discrete_path[argmin].heading - ego_pose.heading))
displacement_error = lane_state_distances[argmin]
if displacement_error < lane_displacement_error:
lane_heading_error, lane_displacement_error = (
heading_error,
displacement_error,
)
if heading_error < heading_error_thresh and displacement_error < displacement_error_thresh:
if roadblock.id in route_roadblocks_dict.keys():
on_route_candidates.append(roadblock)
on_route_candidate_displacement_errors.append(displacement_error)
else:
candidates.append(roadblock)
candidate_displacement_errors.append(displacement_error)
roadblock_displacement_errors.append(lane_displacement_error)
roadblock_heading_errors.append(lane_heading_error)
if on_route_candidates: # prefer on-route roadblocks
return (
on_route_candidates[np.argmin(on_route_candidate_displacement_errors)],
on_route_candidates,
)
elif candidates: # fallback to most promising candidate
return candidates[np.argmin(candidate_displacement_errors)], candidates
# otherwise, just find any close roadblock
return (
roadblock_candidates[np.argmin(roadblock_displacement_errors)],
roadblock_candidates,
)
def route_roadblock_correction(
ego_pose: StateSE2,
map_api: AbstractMap,
route_roadblock_dict: Dict[str, RoadBlockGraphEdgeMapObject],
search_depth_backward: int = 15,
search_depth_forward: int = 30,
) -> List[str]:
"""
Applies several methods to correct route roadblocks.
:param ego_pose: class containing ego position
:param map_api: map object
:param route_roadblocks_dict: dictionary of on-route roadblocks
:param search_depth_backward: depth of forward BFS search, defaults to 15
:param search_depth_forward: depth of backward BFS search, defaults to 30
:return: list of roadblock id's of corrected route
"""
# TODO: Refactor code for readability
starting_block, starting_block_candidates = get_current_roadblock_candidates(
ego_pose, map_api, route_roadblock_dict
)
starting_block_ids = [roadblock.id for roadblock in starting_block_candidates]
route_roadblocks = list(route_roadblock_dict.values())
route_roadblock_ids = list(route_roadblock_dict.keys())
# Fix 1: when agent starts off-route
if starting_block.id not in route_roadblock_ids:
# Backward search if current roadblock not in route
graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[0], map_api, forward_search=False)
(path, path_id), path_found = graph_search.search(starting_block_ids, max_depth=search_depth_backward)
if path_found:
route_roadblocks[:0] = path[:-1]
route_roadblock_ids[:0] = path_id[:-1]
else:
# Forward search to any route roadblock
graph_search = BreadthFirstSearchRoadBlock(starting_block.id, map_api, forward_search=True)
(path, path_id), path_found = graph_search.search(route_roadblock_ids[:3], max_depth=search_depth_forward)
if path_found:
end_roadblock_idx = np.argmax(np.array(route_roadblock_ids) == path_id[-1])
route_roadblocks = route_roadblocks[end_roadblock_idx + 1 :]
route_roadblock_ids = route_roadblock_ids[end_roadblock_idx + 1 :]
route_roadblocks[:0] = path
route_roadblock_ids[:0] = path_id
# Fix 2: check if roadblocks are linked, search for links if not
roadblocks_to_append = {}
for i in range(len(route_roadblocks) - 1):
next_incoming_block_ids = [_roadblock.id for _roadblock in route_roadblocks[i + 1].incoming_edges]
is_incoming = route_roadblock_ids[i] in next_incoming_block_ids
if is_incoming:
continue
graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[i], map_api, forward_search=True)
(path, path_id), path_found = graph_search.search(route_roadblock_ids[i + 1], max_depth=search_depth_forward)
if path_found and path and len(path) >= 3:
path, path_id = path[1:-1], path_id[1:-1]
roadblocks_to_append[i] = (path, path_id)
# append missing intermediate roadblocks
offset = 1
for i, (path, path_id) in roadblocks_to_append.items():
route_roadblocks[i + offset : i + offset] = path
route_roadblock_ids[i + offset : i + offset] = path_id
offset += len(path)
# Fix 3: cut route-loops
route_roadblocks, route_roadblock_ids = remove_route_loops(route_roadblocks, route_roadblock_ids)
return route_roadblock_ids
def remove_route_loops(
route_roadblocks: List[RoadBlockGraphEdgeMapObject],
route_roadblock_ids: List[str],
) -> Tuple[List[str], List[RoadBlockGraphEdgeMapObject]]:
"""
Remove ending of route, if the roadblock are intersecting the route (forming a loop).
:param route_roadblocks: input route roadblocks
:param route_roadblock_ids: input route roadblocks ids
:return: tuple of ids and roadblocks of route without loops
"""
roadblock_occupancy_map = None
loop_idx = None
for idx, roadblock in enumerate(route_roadblocks):
# loops only occur at intersection, thus searching for roadblock-connectors.
if str(roadblock.__class__.__name__) == "NuPlanRoadBlockConnector":
if not roadblock_occupancy_map:
roadblock_occupancy_map = STRTreeOccupancyMapFactory.get_from_geometry(
[roadblock.polygon], [roadblock.id]
)
continue
strtree, index_by_id = roadblock_occupancy_map._build_strtree()
indices = strtree.query(roadblock.polygon)
if len(indices) > 0:
for geom in strtree.geometries.take(indices):
area = geom.intersection(roadblock.polygon).area
if area > 1:
loop_idx = idx
break
if loop_idx:
break
roadblock_occupancy_map.insert(roadblock.id, roadblock.polygon)
if loop_idx:
route_roadblocks = route_roadblocks[:loop_idx]
route_roadblock_ids = route_roadblock_ids[:loop_idx]
return route_roadblocks, route_roadblock_ids