unknownuser6666's picture
Upload folder using huggingface_hub
663494c verified
from __future__ import annotations
from io import BytesIO
from typing import IO, Any, List, NamedTuple
import numpy as np
import numpy.typing as npt
class PointCloudHeader(NamedTuple):
    """
    Class for Point Cloud header.

    Immutable record with one attribute per entry of a .pcd header. The
    attribute order matters: PointCloud.save iterates over `_fields` and
    writes one upper-cased header line per attribute in this order, so
    `data` must stay last.
    """

    # Format version string; also embedded in the leading "# .PCD v..." comment on save.
    version: str
    # Names of the per-point fields; used as the numpy structured-dtype field names.
    fields: List[str]
    # Size of each field in bytes (multiplied by 8 to pick the numpy type width).
    size: List[int]
    # Type code of each field: 'I' (int), 'U' (uint) or 'F' (float).
    type: List[str]
    # Number of elements per field; the parser rejects anything other than 1.
    # NOTE: this name shadows tuple.count, hence the type-checker suppression.
    count: List[int] # type: ignore
    # Parsed as an integer; presumably the PCD WIDTH entry (not otherwise used here).
    width: int
    # Parsed as an integer; presumably the PCD HEIGHT entry (not otherwise used here).
    height: int
    # Parsed as a list of integers; presumably translation + quaternion per the PCD
    # format -- TODO confirm that non-integer viewpoints never occur in the data.
    viewpoint: List[int]
    # Number of points; determines how many bytes of point data are read.
    points: int
    # Data encoding of the point section; only 'binary' can be parsed.
    data: str
class PointCloud:
    """
    Class for raw .pcd file.

    Pairs a parsed header with the point data. Points produced by the parsers
    in this class are a numpy structured array with one record per point and
    one named field per entry of `header.fields`.
    """

    def __init__(self, header: PointCloudHeader, points: npt.NDArray[np.float64]) -> None:
        """
        PointCloud.
        :param header: Pointcloud header.
        :param points: Point data. When produced by parse_points this is a structured array
            of N records whose fields are named after `header.fields`.
        """
        self._header = header
        self._points = points

    @property
    def header(self) -> PointCloudHeader:
        """
        Returns pointcloud header.
        :return: A PointCloudHeader instance.
        """
        return self._header

    @property
    def points(self) -> npt.NDArray[np.float64]:
        """
        Returns points.
        :return: The point array this instance was constructed with.
        """
        return self._points

    def save(self, file_path: str) -> None:
        """
        Saves to .pcd file.

        Writes a comment line carrying the version, then one "KEY value..." line per
        header attribute (in declaration order), then the raw bytes of the points.
        :param file_path: The path to the .pcd file.
        """
        lines = ['# .PCD v{} - Point Cloud Data file format\n'.format(self._header.version)]
        for field in self._header._fields:
            value = getattr(self._header, field)
            # List-valued entries are written space separated on a single line.
            text = ' '.join(map(str, value)) if isinstance(value, list) else str(value)
            lines.append('{} {}\n'.format(field.upper(), text))

        with open(file_path, 'wb') as fp:
            fp.write(''.join(lines).encode('utf8'))
            fp.write(self._points.tobytes())

    @classmethod
    def _parse_stream(cls, stream: IO[Any]) -> PointCloud:
        """
        Parses header and points, in that order, from an open binary stream.
        :param stream: Binary stream positioned at the start of the .pcd content.
        :return: A PointCloud instance.
        """
        header = cls.parse_header(stream)
        points = cls.parse_points(stream, header)
        return cls(header, points)

    @classmethod
    def parse(cls, pcd_content: bytes) -> PointCloud:
        """
        Parses the pointcloud from byte stream.
        :param pcd_content: The byte stream that holds the pcd content.
        :return: A PointCloud object.
        """
        with BytesIO(pcd_content) as stream:
            return cls._parse_stream(stream)

    @classmethod
    def parse_from_file(cls, pcd_file: str) -> PointCloud:
        """
        Parses the pointcloud from .pcd file on disk.
        :param pcd_file: The path to the .pcd file.
        :return: A PointCloud instance.
        """
        with open(pcd_file, 'rb') as stream:
            return cls._parse_stream(stream)

    @staticmethod
    def parse_header(stream: IO[Any]) -> PointCloudHeader:
        """
        Parses the header of a pointcloud from byte IO stream.

        Reads line by line up to and including the "DATA" entry, leaving the stream
        positioned at the first byte of the point data. Comment lines (starting with
        '#') and blank lines are skipped.
        :param stream: Binary stream.
        :return: A PointCloudHeader instance.
        :raises RuntimeError: If the stream ends before the "DATA" entry, an entry has no
            value, any "count" differs from 1, or fields/size/type/count differ in length.
        """
        # These entries are lists even when a file declares a single field; treating a
        # lone value as a scalar string would corrupt the integer conversions and the
        # length consistency check below.
        list_keys = ('fields', 'size', 'type', 'count', 'viewpoint')

        headers: dict[str, Any] = {}
        while True:
            raw_line = stream.readline()
            if not raw_line:
                # readline() returns an empty result only at end of stream.
                raise RuntimeError('Incomplete pointcloud header: "DATA" entry not found')

            line = raw_line.decode('utf8').strip()
            if not line or line.startswith('#'):
                continue

            columns = line.split()
            key = columns[0].lower()
            values = columns[1:]
            if not values:
                raise RuntimeError('Pointcloud header entry "{}" has no value'.format(key))

            if key in list_keys or len(values) > 1:
                headers[key] = values
            else:
                headers[key] = values[0]

            if key == 'data':
                break

        headers['size'] = list(map(int, headers['size']))
        headers['count'] = list(map(int, headers['count']))
        headers['width'] = int(headers['width'])
        headers['height'] = int(headers['height'])
        headers['viewpoint'] = list(map(int, headers['viewpoint']))
        headers['points'] = int(headers['points'])
        header = PointCloudHeader(**headers)

        if any(c != 1 for c in header.count):
            raise RuntimeError('"count" has to be 1')
        if not len(header.fields) == len(header.size) == len(header.type) == len(header.count):
            raise RuntimeError('fields/size/type/count field number are inconsistent')
        return header

    @staticmethod
    def parse_points(stream: IO[Any], header: PointCloudHeader) -> npt.NDArray[np.float64]:
        """
        Parses points from byte IO stream.
        :param stream: Byte stream that holds the points, positioned right after the header.
        :param header: The header describing the field layout and the number of points.
        :return: Points of Point Cloud, as a structured array with `header.points` records.
        :raises RuntimeError: If the data format is not "binary" or the stream holds fewer
            bytes than the header announces.
        """
        if header.data != 'binary':
            raise RuntimeError('Un-supported data foramt: {}. "binary" is expected.'.format(header.data))

        # There is garbage data at the end of the stream, usually all b'\x00',
        # so read exactly the announced number of bytes instead of everything.
        row_type = PointCloud.np_type(header)
        length = row_type.itemsize * header.points
        buff = stream.read(length)
        if len(buff) != length:
            raise RuntimeError('Incomplete pointcloud stream: {} bytes expected, {} got'.format(length, len(buff)))

        return np.frombuffer(buff, row_type)

    @staticmethod
    def np_type(header: PointCloudHeader) -> np.dtype:  # type: ignore
        """
        Helper function that translate column types in pointcloud to np types.

        Each field becomes "<kind><bits>", e.g. type 'F' with size 4 maps to np.float32.
        :param header: A PointCloudHeader object.
        :return: Structured np.dtype with one entry per header field.
        """
        type_mapping = {'I': 'int', 'U': 'uint', 'F': 'float'}
        np_types = [type_mapping[t] + str(int(s) * 8) for t, s in zip(header.type, header.size)]
        return np.dtype([(f, getattr(np, nt)) for f, nt in zip(header.fields, np_types)])

    def _fields_as_float32(self, names: List[str]) -> npt.NDArray[np.float32]:
        """
        Stacks the requested point fields, each cast to float32, as rows of one array.
        :param names: Names of the fields to extract, in output row order.
        :return: <np.float32, len(names), N>.
        """
        return np.array([np.array(self.points[name], dtype=np.float32) for name in names])

    def to_pcd_bin(self) -> npt.NDArray[np.float32]:
        """
        Converts pointcloud to .pcd.bin format.
        :return: <np.float32, 5, N>, the point cloud in .pcd.bin format.
        """
        return self._fields_as_float32(['x', 'y', 'z', 'intensity', 'ring'])

    def to_pcd_bin2(self) -> npt.NDArray[np.float32]:
        """
        Converts pointcloud to .pcd.bin2 format.
        :return: <np.float32, 6, N>, the point cloud in .pcd.bin2 format.
        """
        return self._fields_as_float32(['x', 'y', 'z', 'intensity', 'ring', 'lidar_info'])