| from __future__ import annotations |
|
|
| from typing import Any, Dict, List, Optional, Tuple, BinaryIO, Union |
| from dataclasses import dataclass, asdict |
| from pathlib import Path |
| import io |
| import os |
|
|
| import numpy as np |
| import numpy.typing as npt |
| from PIL import Image |
| from pyquaternion import Quaternion |
|
|
|
|
| from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling |
| from nuplan.common.actor_state.state_representation import StateSE2 |
| from nuplan.common.maps.abstract_map import AbstractMap |
| from nuplan.common.maps.nuplan_map.map_factory import get_maps_api |
| from nuplan.database.maps_db.gpkg_mapsdb import MAP_LOCATIONS |
| from nuplan.database.utils.pointclouds.lidar import LidarPointCloud |
|
|
| from navsim.planning.simulation.planner.pdm_planner.utils.pdm_geometry_utils import ( |
| convert_absolute_to_relative_se2_array, |
| ) |
|
|
| NAVSIM_INTERVAL_LENGTH: float = 0.5 |
| OPENSCENE_DATA_ROOT = os.environ.get("OPENSCENE_DATA_ROOT") |
| NUPLAN_MAPS_ROOT = os.environ.get("NUPLAN_MAPS_ROOT") |
|
|
|
|
@dataclass
class Camera:
    """Camera dataclass for image and parameters."""

    # Decoded image as a numpy array; None when the camera is excluded
    # by the sensor configuration (see Cameras.from_camera_dict).
    image: Optional[npt.NDArray[np.float32]] = None

    # Extrinsics: rotation and translation mapping camera frame -> lidar frame
    # (taken verbatim from the log's camera dictionary).
    sensor2lidar_rotation: Optional[npt.NDArray[np.float32]] = None
    sensor2lidar_translation: Optional[npt.NDArray[np.float32]] = None
    # Camera intrinsic matrix — presumably 3x3 pinhole; TODO confirm shape.
    intrinsics: Optional[npt.NDArray[np.float32]] = None
    # Lens distortion coefficients — format not shown here; verify against log schema.
    distortion: Optional[npt.NDArray[np.float32]] = None
|
|
|
|
@dataclass
class Cameras:
    """Multi-camera dataclass bundling the eight NAVSIM camera slots."""

    cam_f0: Camera
    cam_l0: Camera
    cam_l1: Camera
    cam_l2: Camera
    cam_r0: Camera
    cam_r1: Camera
    cam_r2: Camera
    cam_b0: Camera

    @classmethod
    def from_camera_dict(
        cls,
        sensor_blobs_path: Path,
        camera_dict: Dict[str, Any],
        sensor_names: List[str],
    ) -> Cameras:
        """
        Load camera dataclass from dictionary.
        :param sensor_blobs_path: root directory of sensor data.
        :param camera_dict: dictionary containing camera specifications, keyed by camera name.
        :param sensor_names: list of camera identifiers (lowercase) to include.
        :return: Cameras dataclass.
        """
        # FIX: pre-fill every expected slot with an empty Camera so a camera
        # missing from camera_dict no longer raises a KeyError when the
        # dataclass is assembled below; requested-and-present cameras are
        # overwritten with loaded data.
        data_dict: Dict[str, Camera] = {
            identifier: Camera()
            for identifier in ("cam_f0", "cam_l0", "cam_l1", "cam_l2", "cam_r0", "cam_r1", "cam_r2", "cam_b0")
        }
        for camera_name, camera_spec in camera_dict.items():
            camera_identifier = camera_name.lower()
            if camera_identifier in sensor_names:
                # Image paths in the logs are relative to the sensor blob root.
                image_path = sensor_blobs_path / camera_spec["data_path"]
                data_dict[camera_identifier] = Camera(
                    image=np.array(Image.open(image_path)),
                    sensor2lidar_rotation=camera_spec["sensor2lidar_rotation"],
                    sensor2lidar_translation=camera_spec["sensor2lidar_translation"],
                    intrinsics=camera_spec["cam_intrinsic"],
                    distortion=camera_spec["distortion"],
                )

        return Cameras(
            cam_f0=data_dict["cam_f0"],
            cam_l0=data_dict["cam_l0"],
            cam_l1=data_dict["cam_l1"],
            cam_l2=data_dict["cam_l2"],
            cam_r0=data_dict["cam_r0"],
            cam_r1=data_dict["cam_r1"],
            cam_r2=data_dict["cam_r2"],
            cam_b0=data_dict["cam_b0"],
        )
|
|
|
|
@dataclass
class Lidar:
    """Lidar point cloud dataclass."""

    # Point array from the merged lidar; None when "lidar_pc" is not requested.
    lidar_pc: Optional[npt.NDArray[np.float32]] = None

    @staticmethod
    def _load_bytes(lidar_path: Path) -> BinaryIO:
        """Read the raw lidar file fully into an in-memory binary stream."""
        with open(lidar_path, "rb") as stream:
            return io.BytesIO(stream.read())

    @classmethod
    def from_paths(cls, sensor_blobs_path: Path, lidar_path: Path, sensor_names: List[str]) -> Lidar:
        """
        Loads lidar point cloud dataclass in log loading.
        :param sensor_blobs_path: root directory to sensor data.
        :param lidar_path: relative lidar path from logs.
        :param sensor_names: list of sensor identifiers to load.
        :return: lidar point cloud dataclass.
        """
        # Skip loading entirely when the lidar is not part of the sensor config.
        if "lidar_pc" not in sensor_names:
            return Lidar()

        absolute_lidar_path = sensor_blobs_path / lidar_path
        point_cloud = LidarPointCloud.from_buffer(cls._load_bytes(absolute_lidar_path), "pcd")
        return Lidar(point_cloud.points)
|
|
|
|
@dataclass
class EgoStatus:
    """Ego vehicle status dataclass."""

    # (x, y, yaw) pose; local rear-axle frame unless `in_global_frame` is True.
    ego_pose: npt.NDArray[np.float64]
    # First two entries of the log's ego dynamic state (presumably vx, vy — TODO confirm).
    ego_velocity: npt.NDArray[np.float32]
    # Remaining entries of the log's ego dynamic state (presumably ax, ay — TODO confirm).
    ego_acceleration: npt.NDArray[np.float32]
    # Discrete driving-command array taken from the logs.
    # FIX: the `np.int` alias was removed in NumPy 1.24; use the concrete np.int64.
    driving_command: npt.NDArray[np.int64]
    # True when ego_pose is expressed in global coordinates (set by Scene._build_ego_status).
    in_global_frame: bool = False
|
|
|
|
@dataclass
class AgentInput:
    """Dataclass for agent inputs with current and past ego statuses and sensors."""

    # Indexed oldest -> current; all three lists cover the same history frames.
    ego_statuses: List[EgoStatus]
    cameras: List[Cameras]
    lidars: List[Lidar]

    @classmethod
    def from_scene_dict_list(
        cls,
        scene_dict_list: List[Dict],
        sensor_blobs_path: Path,
        num_history_frames: int,
        sensor_config: SensorConfig,
    ) -> AgentInput:
        """
        Load agent input from scene dictionary.
        :param scene_dict_list: list of scene frames (in logs).
        :param sensor_blobs_path: root directory of sensor data
        :param num_history_frames: number of agent input frames
        :param sensor_config: sensor config dataclass
        :return: agent input dataclass
        """
        assert len(scene_dict_list) > 0, "Scene list is empty!"

        # Global SE2 poses (x, y, yaw) of the ego vehicle for each history frame.
        global_ego_poses = []
        for frame_idx in range(num_history_frames):
            ego_translation = scene_dict_list[frame_idx]["ego2global_translation"]
            ego_quaternion = Quaternion(*scene_dict_list[frame_idx]["ego2global_rotation"])
            global_ego_pose = np.array(
                [ego_translation[0], ego_translation[1], ego_quaternion.yaw_pitch_roll[0]],
                dtype=np.float64,
            )
            global_ego_poses.append(global_ego_pose)

        # Express all history poses relative to the current (last history) pose.
        local_ego_poses = convert_absolute_to_relative_se2_array(
            StateSE2(*global_ego_poses[-1]), np.array(global_ego_poses, dtype=np.float64)
        )

        ego_statuses: List[EgoStatus] = []
        # FIX: this accumulator was annotated List[EgoStatus]; it holds Cameras.
        cameras: List[Cameras] = []
        lidars: List[Lidar] = []

        for frame_idx in range(num_history_frames):

            ego_dynamic_state = scene_dict_list[frame_idx]["ego_dynamic_state"]
            ego_status = EgoStatus(
                # NOTE(review): pose is cast to float32 although EgoStatus.ego_pose
                # is annotated float64 — kept as-is; confirm intended precision.
                ego_pose=np.array(local_ego_poses[frame_idx], dtype=np.float32),
                ego_velocity=np.array(ego_dynamic_state[:2], dtype=np.float32),
                ego_acceleration=np.array(ego_dynamic_state[2:], dtype=np.float32),
                driving_command=scene_dict_list[frame_idx]["driving_command"],
            )
            ego_statuses.append(ego_status)

            # Only load the sensors requested for this history iteration.
            sensor_names = sensor_config.get_sensors_at_iteration(frame_idx)
            cameras.append(
                Cameras.from_camera_dict(
                    sensor_blobs_path=sensor_blobs_path,
                    camera_dict=scene_dict_list[frame_idx]["cams"],
                    sensor_names=sensor_names,
                )
            )

            lidars.append(
                Lidar.from_paths(
                    sensor_blobs_path=sensor_blobs_path,
                    lidar_path=Path(scene_dict_list[frame_idx]["lidar_path"]),
                    sensor_names=sensor_names,
                )
            )

        return AgentInput(ego_statuses, cameras, lidars)
|
|
|
|
@dataclass
class Annotations:
    """Dataclass of annotations (e.g. bounding boxes) per frame."""

    # One entry per annotated agent; all attributes must share the same length.
    boxes: npt.NDArray[np.float32]
    names: List[str]
    velocity_3d: npt.NDArray[np.float32]
    instance_tokens: List[str]
    track_tokens: List[str]

    def __post_init__(self):
        # Validate that every attribute annotates the same number of agents.
        annotation_lengths: Dict[str, int] = {
            name: len(value) for name, value in vars(self).items()
        }
        unique_lengths = set(annotation_lengths.values())
        assert (
            len(unique_lengths) == 1
        ), f"Annotations expects all attributes to have equal length, but got {annotation_lengths}"
|
|
|
|
@dataclass
class Trajectory:
    """Trajectory dataclass in NAVSIM."""

    # Array of (x, y, heading) rows, one per sampled future pose.
    poses: npt.NDArray[np.float32]
    trajectory_sampling: TrajectorySampling = TrajectorySampling(time_horizon=4, interval_length=0.5)

    def __post_init__(self):
        # Validate the pose array against the declared sampling specification.
        assert self.poses.ndim == 2, "Trajectory poses should have two dimensions for samples and poses."
        num_samples, state_dim = self.poses.shape
        assert (
            num_samples == self.trajectory_sampling.num_poses
        ), "Trajectory poses and sampling have unequal number of poses."
        assert state_dim == 3, "Trajectory requires (x, y, heading) at last dim."
|
|
|
|
@dataclass
class SceneMetadata:
    """Dataclass of scene metadata (e.g. location) per scene."""

    # Name of the originating log file.
    log_name: str
    # Token identifying the scene.
    scene_token: str
    # Map name; must be one of nuPlan's MAP_LOCATIONS (see Scene._build_map_api).
    map_name: str
    # Frame token of the current (last history) frame.
    initial_token: str

    # Number of past + current frames contained in the scene.
    num_history_frames: int
    # Number of future frames contained in the scene.
    num_future_frames: int
|
|
|
|
@dataclass
class Frame:
    """Frame dataclass with privileged information."""

    # Unique token of the frame.
    token: str
    # Frame timestamp (integer; units not shown here — presumably microseconds, TODO confirm).
    timestamp: int
    # Route roadblock identifiers for this frame.
    roadblock_ids: List[str]
    # (token, status) pairs — assumed to be lane-connector id and green state; verify against log schema.
    traffic_lights: List[Tuple[str, bool]]
    # Ground-truth annotations (bounding boxes, names, velocities, tokens).
    annotations: Annotations

    # Ego status; stored in global coordinates when built via Scene._build_ego_status.
    ego_status: EgoStatus
    # Lidar point cloud (possibly empty depending on sensor config).
    lidar: Lidar
    # Multi-camera data (possibly empty depending on sensor config).
    cameras: Cameras
|
|
|
|
@dataclass
class Scene:
    """Scene dataclass defining a single sample in NAVSIM."""

    scene_metadata: SceneMetadata
    map_api: AbstractMap
    # Frames ordered in time: history (incl. current) followed by future frames.
    frames: List[Frame]

    def get_future_trajectory(self, num_trajectory_frames: Optional[int] = None) -> Trajectory:
        """
        Extracts future trajectory of the human operator in local coordinates (ie. ego rear-axle).
        :param num_trajectory_frames: optional number frames to extract poses, defaults to None
        :return: trajectory dataclass
        """
        if num_trajectory_frames is None:
            num_trajectory_frames = self.scene_metadata.num_future_frames

        # Index of the current frame; futures start immediately after it.
        start_frame_idx = self.scene_metadata.num_history_frames - 1

        global_ego_poses = []
        for frame_idx in range(start_frame_idx, start_frame_idx + num_trajectory_frames + 1):
            global_ego_poses.append(self.frames[frame_idx].ego_status.ego_pose)

        # Current pose is the origin; only the future poses (index 1..N) are converted.
        local_ego_poses = convert_absolute_to_relative_se2_array(
            StateSE2(*global_ego_poses[0]), np.array(global_ego_poses[1:], dtype=np.float64)
        )

        return Trajectory(
            local_ego_poses,
            TrajectorySampling(
                num_poses=len(local_ego_poses),
                interval_length=NAVSIM_INTERVAL_LENGTH,
            ),
        )

    def get_history_trajectory(self, num_trajectory_frames: Optional[int] = None) -> Trajectory:
        """
        Extracts past trajectory of ego vehicles in local coordinates (ie. ego rear-axle).
        :param num_trajectory_frames: optional number frames to extract poses, defaults to None
        :return: trajectory dataclass
        """
        if num_trajectory_frames is None:
            num_trajectory_frames = self.scene_metadata.num_history_frames

        global_ego_poses = []
        for frame_idx in range(num_trajectory_frames):
            global_ego_poses.append(self.frames[frame_idx].ego_status.ego_pose)

        # The most recent history pose serves as the local origin; unlike the
        # future trajectory, the origin pose itself is included in the output.
        origin = StateSE2(*global_ego_poses[-1])
        local_ego_poses = convert_absolute_to_relative_se2_array(origin, np.array(global_ego_poses, dtype=np.float64))

        return Trajectory(
            local_ego_poses,
            TrajectorySampling(
                num_poses=len(local_ego_poses),
                interval_length=NAVSIM_INTERVAL_LENGTH,
            ),
        )

    def get_agent_input(self) -> AgentInput:
        """
        Extracts agents input dataclass (without privileged information) from scene.
        :return: agent input dataclass
        """
        # History ego poses converted into the current frame's local coordinates.
        local_ego_poses = self.get_history_trajectory().poses
        ego_statuses: List[EgoStatus] = []
        cameras: List[Cameras] = []
        lidars: List[Lidar] = []

        for frame_idx in range(self.scene_metadata.num_history_frames):
            frame_ego_status = self.frames[frame_idx].ego_status

            # Re-emit the stored (global) ego status with a localized pose.
            ego_statuses.append(
                EgoStatus(
                    ego_pose=local_ego_poses[frame_idx],
                    ego_velocity=frame_ego_status.ego_velocity,
                    ego_acceleration=frame_ego_status.ego_acceleration,
                    driving_command=frame_ego_status.driving_command,
                )
            )
            cameras.append(self.frames[frame_idx].cameras)
            lidars.append(self.frames[frame_idx].lidar)

        return AgentInput(ego_statuses, cameras, lidars)

    @classmethod
    def _build_map_api(cls, map_name: str) -> AbstractMap:
        """Helper classmethod to load map api from name."""
        assert map_name in MAP_LOCATIONS, f"The map name {map_name} is invalid, must be in {MAP_LOCATIONS}"
        # NOTE(review): NUPLAN_MAPS_ROOT is read from the environment at import
        # time and may be None if the variable is unset — confirm upstream setup.
        return get_maps_api(NUPLAN_MAPS_ROOT, "nuplan-maps-v1.0", map_name)

    @classmethod
    def _build_annotations(cls, scene_frame: Dict) -> Annotations:
        """Helper classmethod to load annotation dataclass from logs."""
        return Annotations(
            boxes=scene_frame["anns"]["gt_boxes"],
            names=scene_frame["anns"]["gt_names"],
            velocity_3d=scene_frame["anns"]["gt_velocity_3d"],
            instance_tokens=scene_frame["anns"]["instance_tokens"],
            track_tokens=scene_frame["anns"]["track_tokens"],
        )

    @classmethod
    def _build_ego_status(cls, scene_frame: Dict) -> EgoStatus:
        """Helper classmethod to load ego status dataclass (global frame) from logs."""
        ego_translation = scene_frame["ego2global_translation"]
        ego_quaternion = Quaternion(*scene_frame["ego2global_rotation"])
        # Global SE2 pose (x, y, yaw) derived from translation + quaternion yaw.
        global_ego_pose = np.array(
            [ego_translation[0], ego_translation[1], ego_quaternion.yaw_pitch_roll[0]],
            dtype=np.float64,
        )
        ego_dynamic_state = scene_frame["ego_dynamic_state"]
        return EgoStatus(
            ego_pose=global_ego_pose,
            ego_velocity=np.array(ego_dynamic_state[:2], dtype=np.float32),
            ego_acceleration=np.array(ego_dynamic_state[2:], dtype=np.float32),
            driving_command=scene_frame["driving_command"],
            in_global_frame=True,
        )

    @classmethod
    def from_scene_dict_list(
        cls,
        scene_dict_list: List[Dict],
        sensor_blobs_path: Path,
        num_history_frames: int,
        num_future_frames: int,
        sensor_config: SensorConfig,
    ) -> Scene:
        """
        Load scene dataclass from scene dictionary list (for log loading).
        :param scene_dict_list: list of scene frames (in logs)
        :param sensor_blobs_path: root directory of sensor data
        :param num_history_frames: number of past and current frames to load
        :param num_future_frames: number of future frames to load
        :param sensor_config: sensor config dataclass
        :return: scene dataclass
        """
        # FIX: was `>= 0`, which is vacuously true and never caught an empty
        # list (compare the same guard in AgentInput.from_scene_dict_list).
        assert len(scene_dict_list) > 0, "Scene list is empty!"
        # Metadata is taken from the current frame (last history frame).
        scene_metadata = SceneMetadata(
            log_name=scene_dict_list[num_history_frames - 1]["log_name"],
            scene_token=scene_dict_list[num_history_frames - 1]["scene_token"],
            map_name=scene_dict_list[num_history_frames - 1]["map_location"],
            initial_token=scene_dict_list[num_history_frames - 1]["token"],
            num_history_frames=num_history_frames,
            num_future_frames=num_future_frames,
        )
        map_api = cls._build_map_api(scene_metadata.map_name)

        frames: List[Frame] = []
        for frame_idx in range(len(scene_dict_list)):
            global_ego_status = cls._build_ego_status(scene_dict_list[frame_idx])
            annotations = cls._build_annotations(scene_dict_list[frame_idx])

            # Only load the sensors requested for this iteration.
            sensor_names = sensor_config.get_sensors_at_iteration(frame_idx)

            cameras = Cameras.from_camera_dict(
                sensor_blobs_path=sensor_blobs_path,
                camera_dict=scene_dict_list[frame_idx]["cams"],
                sensor_names=sensor_names,
            )

            lidar = Lidar.from_paths(
                sensor_blobs_path=sensor_blobs_path,
                lidar_path=Path(scene_dict_list[frame_idx]["lidar_path"]),
                sensor_names=sensor_names,
            )

            frame = Frame(
                token=scene_dict_list[frame_idx]["token"],
                timestamp=scene_dict_list[frame_idx]["timestamp"],
                roadblock_ids=scene_dict_list[frame_idx]["roadblock_ids"],
                traffic_lights=scene_dict_list[frame_idx]["traffic_lights"],
                annotations=annotations,
                ego_status=global_ego_status,
                lidar=lidar,
                cameras=cameras,
            )
            frames.append(frame)

        return Scene(scene_metadata=scene_metadata, map_api=map_api, frames=frames)
|
|
|
|
@dataclass
class SceneFilter:
    """Scene filtering configuration for scene loading."""

    # History includes the current frame, hence must be >= 1.
    num_history_frames: int = 4
    num_future_frames: int = 10
    # Step between extracted scenes; defaults to num_frames (non-overlapping).
    frame_interval: Optional[int] = None
    has_route: bool = True

    # Optional caps/filters applied during loading.
    max_scenes: Optional[int] = None
    log_names: Optional[List[str]] = None
    tokens: Optional[List[str]] = None

    def __post_init__(self):
        self.frame_interval = self.num_frames if self.frame_interval is None else self.frame_interval

        assert self.num_history_frames >= 1, "SceneFilter: num_history_frames must greater equal one."
        assert self.num_future_frames >= 0, "SceneFilter: num_future_frames must greater equal zero."
        assert self.frame_interval >= 1, "SceneFilter: frame_interval must greater equal one."

    @property
    def num_frames(self) -> int:
        """
        :return: total number for frames for scenes to extract.
        """
        return self.num_history_frames + self.num_future_frames
|
|
|
|
@dataclass
class SensorConfig:
    """Configuration dataclass of agent sensors for memory management."""

    # Each field is either a bool (load at every / no iteration) or a list of
    # history iteration indices at which the sensor should be loaded.
    cam_f0: Union[bool, List[int]]
    cam_l0: Union[bool, List[int]]
    cam_l1: Union[bool, List[int]]
    cam_l2: Union[bool, List[int]]
    cam_r0: Union[bool, List[int]]
    cam_r1: Union[bool, List[int]]
    cam_r2: Union[bool, List[int]]
    cam_b0: Union[bool, List[int]]
    lidar_pc: Union[bool, List[int]]

    def get_sensors_at_iteration(self, iteration: int) -> List[str]:
        """
        Creates a list of sensor identifiers given iteration.
        :param iteration: integer indicating the history iteration.
        :return: list of sensor identifiers to load.
        """

        def _is_included(spec: Union[bool, List[int]]) -> bool:
            # A bool applies uniformly; a list enumerates the included iterations.
            if isinstance(spec, bool):
                return spec
            return isinstance(spec, list) and iteration in spec

        return [sensor_name for sensor_name, spec in asdict(self).items() if _is_included(spec)]

    @classmethod
    def build_all_sensors(cls, include: Union[bool, List[int]] = True) -> SensorConfig:
        """
        Classmethod to load all sensors with the same specification.
        :param include: boolean or integers for sensors to include, defaults to True
        :return: sensor configuration dataclass
        """
        sensor_names = (
            "cam_f0",
            "cam_l0",
            "cam_l1",
            "cam_l2",
            "cam_r0",
            "cam_r1",
            "cam_r2",
            "cam_b0",
            "lidar_pc",
        )
        return cls(**{sensor_name: include for sensor_name in sensor_names})

    @classmethod
    def build_no_sensors(cls) -> SensorConfig:
        """
        Classmethod to load no sensors.
        :return: sensor configuration dataclass
        """
        return cls.build_all_sensors(include=False)
|
|
|
|
@dataclass
class PDMResults:
    """Helper dataclass to record PDM results."""

    # Sub-scores of the PDM metric — presumably in [0, 1]; semantics defined
    # by the PDM scoring code, not visible here (TODO confirm ranges).
    no_at_fault_collisions: float
    drivable_area_compliance: float

    ego_progress: float
    time_to_collision_within_bound: float
    comfort: float
    driving_direction_compliance: float

    # Aggregated final score — presumably combines the fields above; verify
    # against the PDM score computation.
    score: float
|
|