from __future__ import annotations

from io import BytesIO
from typing import IO, Any, List, NamedTuple

import numpy as np
import numpy.typing as npt


class PointCloudHeader(NamedTuple):
    """Class for Point Cloud header."""

    # Value of the VERSION header entry, kept as text.
    version: str
    # Names of the per-point columns; used as numpy structured dtype field names.
    fields: List[str]
    # Size in bytes of each column (multiplied by 8 to pick the numpy bit width).
    size: List[int]
    # Type code of each column: 'I' (int), 'U' (uint) or 'F' (float).
    type: List[str]
    # Number of elements per column; the parser only accepts 1 for every column.
    count: List[int]  # type: ignore
    # WIDTH and HEIGHT header entries, parsed as integers.
    width: int
    height: int
    # VIEWPOINT header entry, parsed as integers.
    viewpoint: List[int]
    # Number of points (rows) stored in the data section.
    points: int
    # Storage format of the data section; only 'binary' can be parsed.
    data: str


class PointCloud:
    """
    Class for raw .pcd file.
    """

    # Header entries that are always lists, even when the file gives a single value.
    _LIST_HEADER_KEYS = frozenset({'fields', 'size', 'type', 'count', 'viewpoint'})

    def __init__(self, header: PointCloudHeader, points: npt.NDArray[np.float64]) -> None:
        """
        PointCloud.
        :param header: Pointcloud header.
        :param points: Structured array with one record per point and one named field per header column.
        """
        self._header = header
        self._points = points

    @property
    def header(self) -> PointCloudHeader:
        """
        Returns pointcloud header.
        :return: A PointCloudHeader instance.
        """
        return self._header

    @property
    def points(self) -> npt.NDArray[np.float64]:
        """
        Returns points.
        :return: Structured array with one record per point and one named field per header column.
        """
        return self._points

    def save(self, file_path: str) -> None:
        """
        Saves to .pcd file.
        The header is written as one "KEY value ..." line per PointCloudHeader field, followed by the raw
        bytes of the points array.
        :param file_path: The path to the .pcd file.
        """
        header_lines = [f'# .PCD v{self._header.version} - Point Cloud Data file format\n']
        # Iterating the NamedTuple yields the values in the same order as _fields.
        for name, value in zip(self._header._fields, self._header):
            text = ' '.join(map(str, value)) if isinstance(value, list) else str(value)
            header_lines.append(f'{name.upper()} {text}\n')

        with open(file_path, 'wb') as fp:
            fp.write(''.join(header_lines).encode('utf8'))
            fp.write(self._points.tobytes())

    @classmethod
    def _from_stream(cls, stream: IO[Any]) -> PointCloud:
        """
        Builds a PointCloud from a binary stream positioned at the start of the header.
        :param stream: Binary stream.
        :return: A PointCloud instance.
        """
        header = cls.parse_header(stream)
        points = cls.parse_points(stream, header)
        return cls(header, points)

    @classmethod
    def parse(cls, pcd_content: bytes) -> PointCloud:
        """
        Parses the pointcloud from byte stream.
        :param pcd_content: The byte stream that holds the pcd content.
        :return: A PointCloud object.
        """
        with BytesIO(pcd_content) as stream:
            return cls._from_stream(stream)

    @classmethod
    def parse_from_file(cls, pcd_file: str) -> PointCloud:
        """
        Parses the pointcloud from .pcd file on disk.
        :param pcd_file: The path to the .pcd file.
        :return: A PointCloud instance.
        """
        with open(pcd_file, 'rb') as stream:
            return cls._from_stream(stream)

    @staticmethod
    def parse_header(stream: IO[Any]) -> PointCloudHeader:
        """
        Parses the header of a pointcloud from byte IO stream.
        Lines are consumed up to and including the DATA entry, leaving the stream positioned at the first
        byte of the point data. Comment lines (starting with '#') and blank lines are skipped.
        :param stream: Binary stream.
        :return: A PointCloudHeader instance.
        :raises RuntimeError: If the stream ends before the DATA entry, an entry has no value, any "count"
            differs from 1, or fields/size/type/count have different lengths.
        """
        headers: dict[str, Any] = {}
        while True:
            raw = stream.readline()
            if not raw:
                # readline() returns an empty result only at end of stream.
                raise RuntimeError('Incomplete pointcloud header: "DATA" entry not found')

            line = raw.decode('utf8').strip()
            if not line or line.startswith('#'):
                continue

            key, *values = line.split()
            key = key.lower()
            if not values:
                raise RuntimeError(f'Pointcloud header entry "{key}" has no value')

            if key in PointCloud._LIST_HEADER_KEYS:
                # Keep a list even for a single value, so a one-column cloud is not
                # mistaken for a string of characters.
                headers[key] = values
            else:
                headers[key] = values if len(values) > 1 else values[0]

            if key == 'data':
                break

        headers['size'] = list(map(int, headers['size']))
        headers['count'] = list(map(int, headers['count']))
        headers['width'] = int(headers['width'])
        headers['height'] = int(headers['height'])
        headers['viewpoint'] = list(map(int, headers['viewpoint']))
        headers['points'] = int(headers['points'])
        header = PointCloudHeader(**headers)

        if any(c != 1 for c in header.count):
            raise RuntimeError('"count" has to be 1')

        if not len(header.fields) == len(header.size) == len(header.type) == len(header.count):
            raise RuntimeError('fields/size/type/count field number are inconsistent')

        return header

    @staticmethod
    def parse_points(stream: IO[Any], header: PointCloudHeader) -> npt.NDArray[np.float64]:
        """
        Parses points from byte IO stream.
        :param stream: Byte stream that holds the points.
        :param header: Header describing the columns and the number of points.
        :return: Points of Point Cloud, as a structured array with header.points records.
        :raises RuntimeError: If the data format is not "binary" or the stream holds fewer bytes than expected.
        """
        if header.data != 'binary':
            raise RuntimeError('Un-supported data foramt: {}. "binary" is expected.'.format(header.data))

        # There is garbage data at the end of the stream, usually all b'\x00',
        # so read exactly the number of bytes the header announces.
        row_type = PointCloud.np_type(header)
        length = row_type.itemsize * header.points
        buff = stream.read(length)
        if len(buff) != length:
            raise RuntimeError('Incomplete pointcloud stream: {} bytes expected, {} got'.format(length, len(buff)))

        return np.frombuffer(buff, row_type)

    @staticmethod
    def np_type(header: PointCloudHeader) -> np.dtype:  # type: ignore
        """
        Helper function that translate column types in pointcloud to np types.
        Each column maps to the numpy scalar type named "<kind><bits>", e.g. type 'F' with size 4 -> float32.
        :param header: A PointCloudHeader object.
        :return: Structured np.dtype with one field per header column.
        """
        type_mapping = {'I': 'int', 'U': 'uint', 'F': 'float'}
        np_types = [type_mapping[t] + str(int(s) * 8) for t, s in zip(header.type, header.size)]

        return np.dtype([(f, getattr(np, nt)) for f, nt in zip(header.fields, np_types)])

    def _stack_fields(self, field_names: List[str]) -> npt.NDArray[np.float32]:
        """
        Stacks the requested point columns into one float32 array.
        :param field_names: Names of the columns to extract, in output row order.
        :return: float32 array with one row per requested column and one column per point.
        """
        return np.array([np.array(self.points[f], dtype=np.float32) for f in field_names])

    def to_pcd_bin(self) -> npt.NDArray[np.float32]:
        """
        Converts pointcloud to .pcd.bin format.
        :return: float32 array of shape (5, N): rows are x, y, z, intensity, ring.
        """
        return self._stack_fields(['x', 'y', 'z', 'intensity', 'ring'])

    def to_pcd_bin2(self) -> npt.NDArray[np.float32]:
        """
        Converts pointcloud to .pcd.bin2 format.
        :return: float32 array of shape (6, N): rows are x, y, z, intensity, ring, lidar_info.
        """
        return self._stack_fields(['x', 'y', 'z', 'intensity', 'ring', 'lidar_info'])