File size: 6,335 Bytes
663494c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
from __future__ import annotations
from io import BytesIO
from typing import IO, Any, List, NamedTuple
import numpy as np
import numpy.typing as npt
class PointCloudHeader(NamedTuple):
"""Class for Point Cloud header."""
version: str
fields: List[str]
size: List[int]
type: List[str]
count: List[int] # type: ignore
width: int
height: int
viewpoint: List[int]
points: int
data: str
class PointCloud:
"""
Class for raw .pcd file.
"""
def __init__(self, header: PointCloudHeader, points: npt.NDArray[np.float64]) -> None:
"""
PointCloud.
:param header: Pointcloud header.
:param points: <np.ndarray, X, N>. X columns, N points.
"""
self._header = header
self._points = points
@property
def header(self) -> PointCloudHeader:
"""
Returns pointcloud header.
:return: A PointCloudHeader instance.
"""
return self._header
@property
def points(self) -> npt.NDArray[np.float64]:
"""
Returns points.
:return: <np.ndarray, X, N>. X columns, N points.
"""
return self._points
def save(self, file_path: str) -> None:
"""
Saves to .pcd file.
:param file_path: The path to the .pcd file.
"""
with open(file_path, 'wb') as fp:
fp.write('# .PCD v{} - Point Cloud Data file format\n'.format(self._header.version).encode('utf8'))
for field in self._header._fields:
value = getattr(self._header, field)
if isinstance(value, list):
text = ' '.join(map(str, value))
else:
text = str(value)
fp.write('{} {}\n'.format(field.upper(), text).encode('utf8'))
fp.write(self._points.tobytes())
@classmethod
def parse(cls, pcd_content: bytes) -> PointCloud:
"""
Parses the pointcloud from byte stream.
:param pcd_content: The byte stream that holds the pcd content.
:return: A PointCloud object.
"""
with BytesIO(pcd_content) as stream:
header = cls.parse_header(stream)
points = cls.parse_points(stream, header)
return cls(header, points)
@classmethod
def parse_from_file(cls, pcd_file: str) -> PointCloud:
"""
Parses the pointcloud from .pcd file on disk.
:param pcd_file: The path to the .pcd file.
:return: A PointCloud instance.
"""
with open(pcd_file, 'rb') as stream:
header = cls.parse_header(stream)
points = cls.parse_points(stream, header)
return cls(header, points)
@staticmethod
def parse_header(stream: IO[Any]) -> PointCloudHeader:
"""
Parses the header of a pointcloud from byte IO stream.
:param stream: Binary stream.
:return: A PointCloudHeader instance.
"""
headers_list = []
while True:
line = stream.readline().decode('utf8').strip()
if line.startswith('#'):
continue
columns = line.split()
key = columns[0].lower()
val = columns[1:] if len(columns) > 2 else columns[1]
headers_list.append((key, val))
if key == 'data':
break
headers = dict(headers_list)
headers['size'] = list(map(int, headers['size']))
headers['count'] = list(map(int, headers['count']))
headers['width'] = int(headers['width'])
headers['height'] = int(headers['height'])
headers['viewpoint'] = list(map(int, headers['viewpoint']))
headers['points'] = int(headers['points'])
header = PointCloudHeader(**headers)
if any([c != 1 for c in header.count]):
raise RuntimeError('"count" has to be 1')
if not len(header.fields) == len(header.size) == len(header.type) == len(header.count):
raise RuntimeError('fields/size/type/count field number are inconsistent')
return header
@staticmethod
def parse_points(stream: IO[Any], header: PointCloudHeader) -> npt.NDArray[np.float64]:
"""
Parses points from byte IO stream.
:param stream: Byte stream that holds the points.
:param header: <np.ndarray, X, N>. A numpy array that has X columns(features), N points.
:return: Points of Point Cloud.
"""
if header.data != 'binary':
raise RuntimeError('Un-supported data foramt: {}. "binary" is expected.'.format(header.data))
# There is garbage data at the end of the stream, usually all b'\x00'.
row_type = PointCloud.np_type(header)
length = row_type.itemsize * header.points
buff = stream.read(length)
if len(buff) != length:
raise RuntimeError('Incomplete pointcloud stream: {} bytes expected, {} got'.format(length, len(buff)))
points = np.frombuffer(buff, row_type)
return points
@staticmethod
def np_type(header: PointCloudHeader) -> np.dtype: # type: ignore
"""
Helper function that translate column types in pointcloud to np types.
:param header: A PointCloudHeader object.
:return: np.dtype that holds the X features.
"""
type_mapping = {'I': 'int', 'U': 'uint', 'F': 'float'}
np_types = [type_mapping[t] + str(int(s) * 8) for t, s in zip(header.type, header.size)]
return np.dtype([(f, getattr(np, nt)) for f, nt in zip(header.fields, np_types)])
def to_pcd_bin(self) -> npt.NDArray[np.float32]:
"""
Converts pointcloud to .pcd.bin format.
:return: <np.float32, 5, N>, the point cloud in .pcd.bin format.
"""
lidar_fields = ['x', 'y', 'z', 'intensity', 'ring']
return np.array([np.array(self.points[f], dtype=np.float32) for f in lidar_fields])
def to_pcd_bin2(self) -> npt.NDArray[np.float32]:
"""
Converts pointcloud to .pcd.bin2 format.
:return: <np.float32, 6, N>, the point cloud in .pcd.bin2 format.
"""
lidar_fields = ['x', 'y', 'z', 'intensity', 'ring', 'lidar_info']
return np.array([np.array(self.points[f], dtype=np.float32) for f in lidar_fields])
|