biptv3 / code /superpoint_ops /scripts /generate_s3dis_superpoints.py
YYYYYYUUU's picture
Add core reproduction code (binarization layers, PTv3, superpoint ops, min-repro pack)
7b95dc2 verified
Raw
History Blame Contribute Delete
7.57 kB
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
import numpy as np
# Default path of the segmentator C++ source. It is the default value of the
# ``--segmentator_cpp`` CLI option and of the ``segmentator_cpp`` parameter of
# ``segmentator_generate_superpoints``.
# NOTE(review): this is an absolute, machine-specific path; override it on
# any host where this location does not exist.
DEFAULT_SEGMENTATOR_CPP = Path('/mnt/data/AODUOLI/PAMI2026/_codex_research/segmentator/csrc/segmentator.cpp')
# Default directory passed to the extension loader as its build directory
# (default of ``--segmentator_build_dir``). It is created on demand by
# ``load_segmentator_module``.
DEFAULT_SEGMENTATOR_BUILD_DIR = Path('/mnt/data/AODUOLI/PAMI2026/_codex_research/segmentator/csrc/torch_build')
def canonical_generate_superpoints(coords, normals=None, voxel_size=0.12, normal_bins=8):
    """Group points into superpoints by voxel cell and, optionally, quantized normal.

    Every point gets a token consisting of its integer voxel index (measured
    from the per-axis minimum coordinate) and, when usable normals are given,
    the per-component bin index of its unit normal. Points that share a token
    share a superpoint id.

    Args:
        coords: Array-like of point coordinates, one row per point.
        normals: Optional array-like of per-point normals. It is only used
            when it has exactly as many rows as ``coords``; otherwise the
            grouping falls back to voxels alone.
        voxel_size: Voxel edge length. Values below ``1e-4`` are clamped to
            ``1e-4`` to avoid division by (near) zero.
        normal_bins: Number of bins per normal component; bin indices are
            ``0 .. normal_bins - 1``.

    Returns:
        A 1-D ``np.int32`` array with one contiguous superpoint id per point
        (an empty array when ``coords`` has no rows).
    """
    coords = np.asarray(coords, dtype=np.float32)
    if coords.ndim >= 1 and coords.shape[0] == 0:
        # ``min`` over a zero-length axis raises; no points means no labels.
        return np.zeros(0, dtype=np.int32)

    cell = max(float(voxel_size), 1e-4)
    coord_min = coords.min(axis=0, keepdims=True)
    voxel_coord = np.floor((coords - coord_min) / cell).astype(np.int64)

    if normals is not None and len(normals) == len(coords):
        normals = np.asarray(normals, dtype=np.float32)
        normals = normals / (np.linalg.norm(normals, axis=1, keepdims=True) + 1e-8)
        # Map each component from [-1, 1] onto [0, normal_bins].
        normal_q = np.floor((normals + 1.0) * 0.5 * normal_bins).astype(np.int64)
        # A component of exactly +1 maps to ``normal_bins``; fold it into the
        # top bin so that only ``normal_bins`` distinct bins exist. Clipping
        # at ``normal_bins`` (as before) left that extra bin in place.
        top_bin = max(int(normal_bins) - 1, 0)
        normal_q = np.clip(normal_q, 0, top_bin)
        tokens = np.concatenate([voxel_coord, normal_q], axis=1)
    else:
        tokens = voxel_coord

    _, inverse = np.unique(tokens, axis=0, return_inverse=True)
    # Some NumPy releases return a non-1-D inverse when ``axis`` is given;
    # flatten so callers always receive one label per point.
    return inverse.reshape(-1).astype(np.int32)
def load_segmentator_module(segmentator_cpp: Path, build_dir: Path, verbose: bool = False):
    """Compile (if needed) and load the segmentator C++ extension.

    Args:
        segmentator_cpp: Path to the C++ source file of the extension.
        build_dir: Directory used as the extension build directory; it is
            created, including parents, when missing.
        verbose: Forwarded to the extension loader's ``verbose`` flag.

    Returns:
        The module object returned by ``torch.utils.cpp_extension.load``.

    Raises:
        FileNotFoundError: If ``segmentator_cpp`` is not an existing file.
    """
    # Imported lazily so the rest of the script works without torch.
    from torch.utils import cpp_extension

    source_present = segmentator_cpp.is_file()
    if not source_present:
        raise FileNotFoundError(f'Segmentator source not found: {segmentator_cpp}')

    build_dir.mkdir(parents=True, exist_ok=True)

    load_options = {
        'name': 'libsegmentator_dyn',
        'sources': [str(segmentator_cpp)],
        'build_directory': str(build_dir),
        'extra_cflags': ['-O3'],
        'verbose': verbose,
    }
    return cpp_extension.load(**load_options)
def segmentator_generate_superpoints(
    coords,
    normals,
    knn_k=50,
    k_thresh=0.01,
    seg_min_verts=20,
    segmentator_cpp=DEFAULT_SEGMENTATOR_CPP,
    build_dir=DEFAULT_SEGMENTATOR_BUILD_DIR,
    build_verbose=False,
):
    """Compute superpoint ids with the compiled segmentator extension.

    The points are connected with a k-nearest-neighbour graph
    (``torch_cluster.knn_graph``) and handed, together with unit-length
    normals, to the extension's ``segment_point`` function. The labels it
    returns are remapped to contiguous ids.

    Args:
        coords: Array-like of point coordinates.
        normals: Array-like of per-point normals; must have the same shape
            as ``coords``.
        knn_k: Number of neighbours per point in the kNN graph.
        k_thresh: Passed to ``segment_point`` as a float.
        seg_min_verts: Passed to ``segment_point`` as an int.
        segmentator_cpp: Path of the extension's C++ source.
        build_dir: Build directory for the extension.
        build_verbose: Whether the extension build is verbose.

    Returns:
        A 1-D ``np.int32`` array of contiguous superpoint ids.

    Raises:
        ValueError: If ``coords`` and ``normals`` differ in shape.
    """
    import torch
    from torch_cluster import knn_graph

    xyz = np.ascontiguousarray(np.asarray(coords, dtype=np.float32))
    raw_normals = np.ascontiguousarray(np.asarray(normals, dtype=np.float32))
    if xyz.shape != raw_normals.shape:
        raise ValueError(f'coords/normals mismatch: {xyz.shape} vs {raw_normals.shape}')

    # The epsilon keeps zero-length normals from dividing by zero.
    lengths = np.linalg.norm(raw_normals, axis=1, keepdims=True)
    unit_normals = raw_normals / (lengths + 1e-8)

    segmentator = load_segmentator_module(
        segmentator_cpp=Path(segmentator_cpp),
        build_dir=Path(build_dir),
        verbose=build_verbose,
    )

    point_tensor = torch.from_numpy(xyz)
    normal_tensor = torch.from_numpy(unit_normals)

    # knn_graph's result is transposed before being given to the extension.
    neighbour_graph = knn_graph(point_tensor, k=int(knn_k))
    edge_list = neighbour_graph.T.contiguous().to(dtype=torch.int64, device='cpu')

    raw_labels = segmentator.segment_point(
        point_tensor.contiguous(),
        normal_tensor.contiguous(),
        edge_list,
        float(k_thresh),
        int(seg_min_verts),
    )
    flat_labels = raw_labels.cpu().numpy().reshape(-1).astype(np.int64)

    # Index 1 of the result is the inverse mapping, i.e. ids in 0..K-1.
    contiguous_ids = np.unique(flat_labels, return_inverse=True)[1]
    return contiguous_ids.astype(np.int32)
def _iter_rooms(data_root: Path):
    """Yield room directories under ``data_root`` in sorted order.

    A room is a sub-directory of an ``Area_*`` directory that contains a
    ``coord.npy`` file. Areas are visited in sorted order, and rooms are
    sorted within each area.
    """
    area_dirs = (entry for entry in sorted(data_root.glob('Area_*')) if entry.is_dir())
    for area_dir in area_dirs:
        yield from (
            candidate
            for candidate in sorted(area_dir.iterdir())
            if candidate.is_dir() and (candidate / 'coord.npy').is_file()
        )
def generate_labels(room: Path, args) -> np.ndarray:
    """Compute superpoint labels for one room directory.

    Loads ``coord.npy`` (and ``normal.npy`` when present) from ``room`` and
    dispatches on ``args.method``: ``'canonical'`` uses the voxel/normal
    grouping, anything else uses the segmentator extension.

    Args:
        room: Directory containing ``coord.npy`` and optionally ``normal.npy``.
        args: Parsed CLI namespace supplying ``method`` and the per-method
            options read below.

    Returns:
        The label array produced by the selected generator.

    Raises:
        FileNotFoundError: If the segmentator method is selected and the room
            has no ``normal.npy``.
    """
    points = np.load(room / 'coord.npy')
    normal_path = room / 'normal.npy'
    room_normals = np.load(normal_path) if normal_path.is_file() else None

    if args.method == 'canonical':
        # Normals are optional here; the canonical grouping copes without them.
        return canonical_generate_superpoints(
            points,
            normals=room_normals,
            voxel_size=args.voxel,
            normal_bins=args.normal_bins,
        )

    if room_normals is None:
        raise FileNotFoundError(f'normal.npy is required for segmentator method: {room}')

    segmentator_options = dict(
        knn_k=args.knn_k,
        k_thresh=args.k_thresh,
        seg_min_verts=args.seg_min_verts,
        segmentator_cpp=args.segmentator_cpp,
        build_dir=args.segmentator_build_dir,
        build_verbose=args.build_verbose,
    )
    return segmentator_generate_superpoints(points, room_normals, **segmentator_options)
def main() -> None:
    """Command-line entry point.

    Parses the options, selects the rooms to process (a single ``--room`` or
    every room found under ``--data_root``), prints one JSON line of run
    settings and one JSON line of statistics per room, and saves
    ``superpoint.npy`` files only when ``--write`` is given. Exits with
    status 1 when the data root, the requested room, or all rooms are missing.
    """
    script_dir = Path(__file__).resolve().parent
    bundled_root = (
        script_dir.parent.parent / '_work_biptv3' / 'pointcept_framework' / 'data' / 's3dis_official'
    ).resolve()
    # Use the bundled dataset location only if it exists on this machine.
    fallback_root = bundled_root if bundled_root.is_dir() else Path('.')

    parser = argparse.ArgumentParser(
        description='Generate S3DIS superpoint.npy using either segmentator or canonical fallback'
    )
    # (flags, keyword options) in the order they should be registered.
    option_table = [
        (('--data_root',), {'type': Path, 'default': fallback_root}),
        (('--method',), {'choices': ['segmentator', 'canonical'], 'default': 'segmentator'}),
        (('--voxel',), {'type': float, 'default': 0.12}),
        (('--normal-bins',), {'type': int, 'default': 8, 'dest': 'normal_bins'}),
        (('--knn_k',), {'type': int, 'default': 50}),
        (('--k_thresh',), {'type': float, 'default': 0.01}),
        (('--seg_min_verts',), {'type': int, 'default': 20}),
        (('--segmentator_cpp',), {'type': Path, 'default': DEFAULT_SEGMENTATOR_CPP}),
        (('--segmentator_build_dir',), {'type': Path, 'default': DEFAULT_SEGMENTATOR_BUILD_DIR}),
        (('--build_verbose',), {'action': 'store_true'}),
        (('--write',), {'action': 'store_true'}),
        (('--output_root',), {'type': Path, 'default': None}),
        (('--room',), {'type': str, 'default': None}),
    ]
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    root: Path = args.data_root
    if not root.is_dir():
        print('data_root does not exist:', root, file=sys.stderr)
        sys.exit(1)

    if not args.room:
        rooms = list(_iter_rooms(root))
    else:
        single_room = root / args.room
        coord_file = single_room / 'coord.npy'
        if not coord_file.is_file():
            print('Missing coord.npy:', coord_file, file=sys.stderr)
            sys.exit(1)
        rooms = [single_room]

    if not rooms:
        print('No rooms found', file=sys.stderr)
        sys.exit(1)

    run_settings = dict(
        method=args.method,
        rooms=len(rooms),
        write=bool(args.write),
        knn_k=args.knn_k,
        k_thresh=args.k_thresh,
        seg_min_verts=args.seg_min_verts,
        voxel=args.voxel,
        normal_bins=args.normal_bins,
    )
    print(json.dumps(run_settings, ensure_ascii=False))

    for room in rooms:
        labels = generate_labels(room, args)
        sp_ids, sp_sizes = np.unique(labels, return_counts=True)

        existing_path = room / 'superpoint.npy'
        if args.output_root is None:
            # No sidecar root given: the target is the file inside the dataset.
            target_path = existing_path
        else:
            target_path = args.output_root / room.relative_to(root) / 'superpoint.npy'

        info = {
            'room': str(room.relative_to(root)),
            'points': int(labels.shape[0]),
            'num_superpoints': int(sp_ids.size),
            'mean_points_per_superpoint': float(sp_sizes.mean()),
            'max_points_per_superpoint': int(sp_sizes.max()),
        }

        # Report statistics of labels already in the dataset for comparison,
        # but only if they cover the same number of points.
        if existing_path.is_file():
            previous = np.load(existing_path).reshape(-1)
            if len(previous) == len(labels):
                prev_ids, prev_sizes = np.unique(previous, return_counts=True)
                info['old_num_superpoints'] = int(prev_ids.size)
                info['old_mean_points_per_superpoint'] = float(prev_sizes.mean())

        print(json.dumps(info, ensure_ascii=False))

        if args.write:
            target_path.parent.mkdir(parents=True, exist_ok=True)
            np.save(str(target_path), labels.astype(np.int32))
            print('wrote', target_path)

    if not args.write:
        print('[DRY-RUN] No files written. Add --write to materialize sidecar or dataset labels.')
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()