ICCV2025-RealADSim-ClosedLoop-SubmissionDemo / navsim /planning /simulation /planner /pdm_planner /pdm_closed_planner.py
| from typing import List, Optional, Type | |
| import warnings | |
| import logging | |
| import gc | |
| from nuplan.planning.simulation.observation.observation_type import DetectionsTracks, Observation | |
| from nuplan.planning.simulation.planner.abstract_planner import PlannerInitialization, PlannerInput | |
| from nuplan.planning.simulation.trajectory.abstract_trajectory import AbstractTrajectory | |
| from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling | |
| from navsim.planning.simulation.planner.pdm_planner.abstract_pdm_closed_planner import AbstractPDMClosedPlanner | |
| from navsim.planning.simulation.planner.pdm_planner.proposal.batch_idm_policy import BatchIDMPolicy | |
| from navsim.planning.simulation.planner.pdm_planner.observation.pdm_occupancy_map import PDMDrivableMap | |
| warnings.filterwarnings("ignore", category=RuntimeWarning) | |
| logger = logging.getLogger(__name__) | |
| class PDMClosedPlanner(AbstractPDMClosedPlanner): | |
| """PDM-Closed planner class.""" | |
| # Inherited property, see superclass. | |
| requires_scenario: bool = False | |
| def __init__( | |
| self, | |
| trajectory_sampling: TrajectorySampling, | |
| proposal_sampling: TrajectorySampling, | |
| idm_policies: BatchIDMPolicy, | |
| lateral_offsets: Optional[List[float]], | |
| map_radius: float, | |
| ): | |
| """ | |
| Constructor for PDMClosedPlanner | |
| :param trajectory_sampling: Sampling parameters for final trajectory | |
| :param proposal_sampling: Sampling parameters for proposals | |
| :param idm_policies: BatchIDMPolicy class | |
| :param lateral_offsets: centerline offsets for proposals (optional) | |
| :param map_radius: radius around ego to consider | |
| """ | |
| super(PDMClosedPlanner, self).__init__( | |
| trajectory_sampling, | |
| proposal_sampling, | |
| idm_policies, | |
| lateral_offsets, | |
| map_radius, | |
| ) | |
| def initialize(self, initialization: PlannerInitialization) -> None: | |
| """Inherited, see superclass.""" | |
| self._iteration = 0 | |
| self._map_api = initialization.map_api | |
| self._load_route_dicts(initialization.route_roadblock_ids) | |
| gc.collect() | |
| def name(self) -> str: | |
| """Inherited, see superclass.""" | |
| return self.__class__.__name__ | |
| def observation_type(self) -> Type[Observation]: | |
| """Inherited, see superclass.""" | |
| return DetectionsTracks # type: ignore | |
| def compute_planner_trajectory(self, current_input: PlannerInput) -> AbstractTrajectory: | |
| """Inherited, see superclass.""" | |
| gc.disable() | |
| ego_state, _ = current_input.history.current_state | |
| # Apply route correction on first iteration (ego_state required) | |
| if self._iteration == 0: | |
| self._route_roadblock_correction(ego_state) | |
| # Update/Create drivable area polygon map | |
| self._drivable_area_map = PDMDrivableMap.from_simulation(self._map_api, ego_state, self._map_radius) | |
| trajectory = self._get_closed_loop_trajectory(current_input) | |
| self._iteration += 1 | |
| return trajectory | |