ICCV2025-RealADSim-ClosedLoop-SubmissionDemo / navsim /planning /simulation /planner /pdm_planner /abstract_pdm_closed_planner.py
| from typing import List, Optional | |
| import numpy as np | |
| from nuplan.common.actor_state.ego_state import EgoState | |
| from nuplan.common.maps.abstract_map_objects import LaneGraphEdgeMapObject | |
| from nuplan.planning.simulation.planner.abstract_planner import PlannerInput | |
| from nuplan.planning.simulation.trajectory.interpolated_trajectory import InterpolatedTrajectory | |
| from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling | |
| from navsim.planning.simulation.planner.pdm_planner.abstract_pdm_planner import AbstractPDMPlanner | |
| from navsim.planning.simulation.planner.pdm_planner.observation.pdm_observation import PDMObservation | |
| from navsim.planning.simulation.planner.pdm_planner.proposal.batch_idm_policy import BatchIDMPolicy | |
| from navsim.planning.simulation.planner.pdm_planner.proposal.pdm_generator import PDMGenerator | |
| from navsim.planning.simulation.planner.pdm_planner.proposal.pdm_proposal import PDMProposalManager | |
| from navsim.planning.simulation.planner.pdm_planner.scoring.pdm_scorer import PDMScorer | |
| from navsim.planning.simulation.planner.pdm_planner.simulation.pdm_simulator import PDMSimulator | |
| from navsim.planning.simulation.planner.pdm_planner.utils.pdm_geometry_utils import parallel_discrete_path | |
| from navsim.planning.simulation.planner.pdm_planner.utils.pdm_path import PDMPath | |
| class AbstractPDMClosedPlanner(AbstractPDMPlanner): | |
| """ | |
| Interface for planners incorporating PDM-Closed. Used for PDM-Closed and PDM-Hybrid. | |
| """ | |
| def __init__( | |
| self, | |
| trajectory_sampling: TrajectorySampling, | |
| proposal_sampling: TrajectorySampling, | |
| idm_policies: BatchIDMPolicy, | |
| lateral_offsets: Optional[List[float]], | |
| map_radius: float, | |
| ): | |
| """ | |
| Constructor for AbstractPDMClosedPlanner | |
| :param trajectory_sampling: Sampling parameters for final trajectory | |
| :param proposal_sampling: Sampling parameters for proposals | |
| :param idm_policies: BatchIDMPolicy class | |
| :param lateral_offsets: centerline offsets for proposals (optional) | |
| :param map_radius: radius around ego to consider | |
| """ | |
| super(AbstractPDMClosedPlanner, self).__init__(map_radius) | |
| assert ( | |
| trajectory_sampling.interval_length == proposal_sampling.interval_length | |
| ), "AbstractPDMClosedPlanner: Proposals and Trajectory must have equal interval length!" | |
| # config parameters | |
| self._trajectory_sampling: int = trajectory_sampling | |
| self._proposal_sampling: int = proposal_sampling | |
| self._idm_policies: BatchIDMPolicy = idm_policies | |
| self._lateral_offsets: Optional[List[float]] = lateral_offsets | |
| # observation/forecasting class | |
| self._observation = PDMObservation(trajectory_sampling, proposal_sampling, map_radius) | |
| # proposal/trajectory related classes | |
| self._generator = PDMGenerator(trajectory_sampling, proposal_sampling) | |
| self._simulator = PDMSimulator(proposal_sampling) | |
| self._scorer = PDMScorer(proposal_sampling) | |
| # lazy loaded | |
| self._proposal_manager: Optional[PDMProposalManager] = None | |
| def _update_proposal_manager(self, ego_state: EgoState): | |
| """ | |
| Updates or initializes PDMProposalManager class | |
| :param ego_state: state of ego-vehicle | |
| """ | |
| current_lane = self._get_starting_lane(ego_state) | |
| # TODO: Find additional conditions to trigger re-planning | |
| create_new_proposals = self._iteration == 0 | |
| if create_new_proposals: | |
| proposal_paths: List[PDMPath] = self._get_proposal_paths(current_lane) | |
| self._proposal_manager = PDMProposalManager( | |
| lateral_proposals=proposal_paths, | |
| longitudinal_policies=self._idm_policies, | |
| ) | |
| # update proposals | |
| self._proposal_manager.update(current_lane.speed_limit_mps) | |
| def _get_proposal_paths(self, current_lane: LaneGraphEdgeMapObject) -> List[PDMPath]: | |
| """ | |
| Returns a list of path's to follow for the proposals. Inits a centerline. | |
| :param current_lane: current or starting lane of path-planning | |
| :return: lists of paths (0-index is centerline) | |
| """ | |
| centerline_discrete_path = self._get_discrete_centerline(current_lane) | |
| self._centerline = PDMPath(centerline_discrete_path) | |
| # 1. save centerline path (necessary for progress metric) | |
| output_paths: List[PDMPath] = [self._centerline] | |
| # 2. add additional paths with lateral offset of centerline | |
| if self._lateral_offsets is not None: | |
| for lateral_offset in self._lateral_offsets: | |
| offset_discrete_path = parallel_discrete_path( | |
| discrete_path=centerline_discrete_path, offset=lateral_offset | |
| ) | |
| output_paths.append(PDMPath(offset_discrete_path)) | |
| return output_paths | |
| def _get_closed_loop_trajectory( | |
| self, | |
| current_input: PlannerInput, | |
| ) -> InterpolatedTrajectory: | |
| """ | |
| Creates the closed-loop trajectory for PDM-Closed planner. | |
| :param current_input: planner input | |
| :return: trajectory | |
| """ | |
| ego_state, observation = current_input.history.current_state | |
| # 1. Environment forecast and observation update | |
| self._observation.update( | |
| ego_state, | |
| observation, | |
| current_input.traffic_light_data, | |
| self._route_lane_dict, | |
| ) | |
| # 2. Centerline extraction and proposal update | |
| self._update_proposal_manager(ego_state) | |
| # 3. Generate/Unroll proposals | |
| proposals_array = self._generator.generate_proposals(ego_state, self._observation, self._proposal_manager) | |
| # 4. Simulate proposals | |
| simulated_proposals_array = self._simulator.simulate_proposals(proposals_array, ego_state) | |
| # 5. Score proposals | |
| proposal_scores = self._scorer.score_proposals( | |
| simulated_proposals_array, | |
| self._observation, | |
| self._centerline, | |
| list(self._route_lane_dict.keys()), | |
| self._drivable_area_map, | |
| ) | |
| trajectory = self._generator.generate_trajectory(np.argmax(proposal_scores)) | |
| return trajectory | |