|
|
| from __future__ import annotations
|
|
|
| import argparse
|
| import json
|
| import sys
|
| from pathlib import Path
|
|
|
| import numpy as np
|
|
|
# Default C++ source of the segmentator extension. This is a machine-specific
# absolute path; it can be overridden with the --segmentator_cpp flag.
DEFAULT_SEGMENTATOR_CPP = Path(
    '/mnt/data/AODUOLI/PAMI2026/_codex_research/segmentator/csrc/segmentator.cpp'
)

# Default build directory handed to the torch extension loader. It can be
# overridden with the --segmentator_build_dir flag.
DEFAULT_SEGMENTATOR_BUILD_DIR = Path(
    '/mnt/data/AODUOLI/PAMI2026/_codex_research/segmentator/csrc/torch_build'
)
|
|
|
|
|
def canonical_generate_superpoints(coords, normals=None, voxel_size=0.12, normal_bins=8):
    """Label points by the voxel cell (and optionally quantized normal) they fall in.

    Points that share the same voxel cell -- and, when usable normals are
    supplied, the same quantized normal -- receive the same label.

    Args:
        coords: Array-like of point coordinates, one row per point. Converted
            to ``float32``.
        normals: Optional array-like of per-point normals. Used only when it
            has as many rows as ``coords``; otherwise it is ignored and the
            labels come from the voxel grid alone.
        voxel_size: Edge length of a voxel cell. Values below ``1e-4`` are
            raised to ``1e-4`` so the division stays well defined.
        normal_bins: Number of quantization steps applied to every normal
            component after it is mapped from ``[-1, 1]`` to ``[0, 1]``.

    Returns:
        A 1-D ``int32`` array holding one label per point. Labels are the
        indices of the unique tokens as ordered by ``np.unique``. Empty input
        yields an empty array.
    """
    coords = np.asarray(coords, dtype=np.float32)
    if coords.ndim > 0 and coords.shape[0] == 0:
        # ``min`` over zero rows raises; there is simply nothing to label.
        return np.zeros(0, dtype=np.int32)

    cell = max(float(voxel_size), 1e-4)
    origin = coords.min(axis=0, keepdims=True)
    voxel_coord = np.floor((coords - origin) / cell).astype(np.int64)

    if normals is not None and len(normals) == len(coords):
        normals = np.asarray(normals, dtype=np.float32)
        # The epsilon keeps zero-length normals from dividing by zero.
        normals = normals / (np.linalg.norm(normals, axis=1, keepdims=True) + 1e-8)
        normal_q = np.floor((normals + 1.0) * 0.5 * normal_bins).astype(np.int64)
        # NOTE(review): the upper bound is inclusive, so a component of exactly
        # +1 ends up in its own bin index ``normal_bins``. Kept as-is so that
        # labels stay identical to previously generated ones -- confirm whether
        # ``normal_bins - 1`` was intended.
        normal_q = np.clip(normal_q, 0, normal_bins)
        tokens = np.concatenate([voxel_coord, normal_q], axis=1)
    else:
        tokens = voxel_coord

    _, inverse = np.unique(tokens, axis=0, return_inverse=True)
    # Some NumPy 2.0 releases return the inverse with an extra dimension when
    # ``axis`` is given; flatten so callers always get one label per point.
    return inverse.reshape(-1).astype(np.int32)
|
|
|
|
|
def load_segmentator_module(segmentator_cpp: Path, build_dir: Path, verbose: bool = False):
    """Build and load the segmentator C++ extension through torch's loader.

    Args:
        segmentator_cpp: Path to the extension's C++ source file.
        build_dir: Directory passed to the loader as its build directory; it
            is created (including parents) when missing.
        verbose: Forwarded to ``torch.utils.cpp_extension.load``.

    Returns:
        The module object returned by ``torch.utils.cpp_extension.load``.

    Raises:
        FileNotFoundError: If ``segmentator_cpp`` is not an existing file.
    """
    # Accept plain strings as well as Path objects.
    segmentator_cpp = Path(segmentator_cpp)
    build_dir = Path(build_dir)

    # Validate the path before importing torch: a wrong path is reported
    # immediately and is not hidden behind a slow (or failing) torch import.
    if not segmentator_cpp.is_file():
        raise FileNotFoundError(f'Segmentator source not found: {segmentator_cpp}')

    from torch.utils.cpp_extension import load

    build_dir.mkdir(parents=True, exist_ok=True)
    return load(
        name='libsegmentator_dyn',
        sources=[str(segmentator_cpp)],
        build_directory=str(build_dir),
        extra_cflags=['-O3'],
        verbose=verbose,
    )
|
|
|
|
|
def segmentator_generate_superpoints(
    coords,
    normals,
    knn_k=50,
    k_thresh=0.01,
    seg_min_verts=20,
    segmentator_cpp=DEFAULT_SEGMENTATOR_CPP,
    build_dir=DEFAULT_SEGMENTATOR_BUILD_DIR,
    build_verbose=False,
):
    """Compute superpoint labels with the compiled segmentator extension.

    Args:
        coords: Array-like of point coordinates (converted to ``float32``).
        normals: Array-like of per-point normals; must have the same shape as
            ``coords``. They are rescaled to (approximately) unit length.
        knn_k: Neighbour count passed to ``knn_graph``.
        k_thresh: Threshold forwarded to the extension's ``segment_point``.
        seg_min_verts: Minimum segment size forwarded to ``segment_point``.
        segmentator_cpp: Path of the extension's C++ source.
        build_dir: Build directory for the extension.
        build_verbose: Whether the extension build is verbose.

    Returns:
        A 1-D ``int32`` array with the extension's labels renumbered to
        consecutive integers starting at 0.

    Raises:
        ValueError: If ``coords`` and ``normals`` differ in shape.
    """
    import torch
    from torch_cluster import knn_graph

    xyz = np.ascontiguousarray(np.asarray(coords, dtype=np.float32))
    raw_normals = np.ascontiguousarray(np.asarray(normals, dtype=np.float32))
    if xyz.shape != raw_normals.shape:
        raise ValueError(f'coords/normals mismatch: {xyz.shape} vs {raw_normals.shape}')

    # The epsilon keeps zero-length normals from dividing by zero.
    lengths = np.linalg.norm(raw_normals, axis=1, keepdims=True)
    unit_normals = raw_normals / (lengths + 1e-8)

    extension = load_segmentator_module(
        segmentator_cpp=Path(segmentator_cpp),
        build_dir=Path(build_dir),
        verbose=build_verbose,
    )

    points_t = torch.from_numpy(xyz)
    normals_t = torch.from_numpy(unit_normals)

    # The neighbour graph is transposed before use and forced to int64 on CPU.
    neighbour_graph = knn_graph(points_t, k=int(knn_k))
    edge_list = neighbour_graph.T.contiguous().to(dtype=torch.int64, device='cpu')

    raw_labels = extension.segment_point(
        points_t.contiguous(),
        normals_t.contiguous(),
        edge_list,
        float(k_thresh),
        int(seg_min_verts),
    )
    flat_labels = raw_labels.cpu().numpy().reshape(-1).astype(np.int64)

    # Renumber whatever ids the extension produced to 0..K-1.
    compact = np.unique(flat_labels, return_inverse=True)[1]
    return compact.astype(np.int32)
|
|
|
|
|
def _iter_rooms(data_root: Path):
    """Yield every room directory under ``data_root`` that has a ``coord.npy``.

    Only directories matching ``Area_*`` directly below ``data_root`` are
    searched. Areas and the rooms inside each area are visited in sorted
    order.

    Args:
        data_root: Dataset root containing the ``Area_*`` directories.

    Yields:
        Paths of room directories that contain a ``coord.npy`` file.
    """
    area_dirs = (entry for entry in sorted(data_root.glob('Area_*')) if entry.is_dir())
    for area_dir in area_dirs:
        yield from (
            candidate
            for candidate in sorted(area_dir.iterdir())
            if candidate.is_dir() and (candidate / 'coord.npy').is_file()
        )
|
|
|
|
|
def generate_labels(room: Path, args) -> np.ndarray:
    """Load one room's arrays and compute its superpoint labels.

    Args:
        room: Room directory containing ``coord.npy`` and, optionally,
            ``normal.npy``.
        args: Parsed command-line namespace. ``args.method`` selects the
            generator; the remaining attributes are forwarded to it.

    Returns:
        The label array produced by the selected generator.

    Raises:
        FileNotFoundError: If the segmentator method is selected but the room
            has no ``normal.npy``.
    """
    coord = np.load(room / 'coord.npy')
    normal_path = room / 'normal.npy'
    normals = np.load(normal_path) if normal_path.is_file() else None

    if args.method != 'canonical':
        # The segmentator cannot run without normals.
        if normals is None:
            raise FileNotFoundError(f'normal.npy is required for segmentator method: {room}')
        return segmentator_generate_superpoints(
            coord,
            normals,
            knn_k=args.knn_k,
            k_thresh=args.k_thresh,
            seg_min_verts=args.seg_min_verts,
            segmentator_cpp=args.segmentator_cpp,
            build_dir=args.segmentator_build_dir,
            build_verbose=args.build_verbose,
        )

    return canonical_generate_superpoints(
        coord,
        normals=normals,
        voxel_size=args.voxel,
        normal_bins=args.normal_bins,
    )
|
|
|
|
|
def main() -> None:
    """Command-line entry point: generate superpoint labels for S3DIS rooms.

    Parses the command line, collects the rooms to process (a single
    ``--room`` or every room found under ``--data_root``), prints one JSON
    line with the run settings and one JSON line of statistics per room, and
    saves ``superpoint.npy`` files only when ``--write`` is given. Exits with
    status 1 when the data root is missing, the requested room has no
    ``coord.npy``, or no rooms are found.
    """
    script_dir = Path(__file__).resolve().parent
    default_root = script_dir.parent.parent.joinpath(
        '_work_biptv3', 'pointcept_framework', 'data', 's3dis_official'
    ).resolve()
    # Fall back to the current directory when the conventional root is absent.
    fallback_root = default_root if default_root.is_dir() else Path('.')

    parser = argparse.ArgumentParser(
        description='Generate S3DIS superpoint.npy using either segmentator or canonical fallback'
    )
    parser.add_argument('--data_root', type=Path, default=fallback_root)
    parser.add_argument('--method', choices=['segmentator', 'canonical'], default='segmentator')
    parser.add_argument('--voxel', type=float, default=0.12)
    parser.add_argument('--normal-bins', type=int, default=8, dest='normal_bins')
    parser.add_argument('--knn_k', type=int, default=50)
    parser.add_argument('--k_thresh', type=float, default=0.01)
    parser.add_argument('--seg_min_verts', type=int, default=20)
    parser.add_argument('--segmentator_cpp', type=Path, default=DEFAULT_SEGMENTATOR_CPP)
    parser.add_argument('--segmentator_build_dir', type=Path, default=DEFAULT_SEGMENTATOR_BUILD_DIR)
    parser.add_argument('--build_verbose', action='store_true')
    parser.add_argument('--write', action='store_true')
    parser.add_argument('--output_root', type=Path, default=None)
    parser.add_argument('--room', type=str, default=None)
    args = parser.parse_args()

    root: Path = args.data_root
    if not root.is_dir():
        print('data_root does not exist:', root, file=sys.stderr)
        sys.exit(1)

    if not args.room:
        rooms = list(_iter_rooms(root))
    else:
        single_room = root / args.room
        coord_file = single_room / 'coord.npy'
        if not coord_file.is_file():
            print('Missing coord.npy:', coord_file, file=sys.stderr)
            sys.exit(1)
        rooms = [single_room]

    if not rooms:
        print('No rooms found', file=sys.stderr)
        sys.exit(1)

    # One header line describing the run configuration.
    run_settings = {
        'method': args.method,
        'rooms': len(rooms),
        'write': bool(args.write),
        'knn_k': args.knn_k,
        'k_thresh': args.k_thresh,
        'seg_min_verts': args.seg_min_verts,
        'voxel': args.voxel,
        'normal_bins': args.normal_bins,
    }
    print(json.dumps(run_settings, ensure_ascii=False))

    for room in rooms:
        labels = generate_labels(room, args)
        _, counts = np.unique(labels, return_counts=True)

        relative_room = room.relative_to(root)
        existing_path = room / 'superpoint.npy'
        if args.output_root is None:
            # Without an output root the labels are written next to the data.
            output_path = existing_path
        else:
            output_path = args.output_root / relative_room / 'superpoint.npy'

        summary = {
            'room': str(relative_room),
            'points': int(labels.shape[0]),
            'num_superpoints': int(counts.size),
            'mean_points_per_superpoint': float(counts.mean()),
            'max_points_per_superpoint': int(counts.max()),
        }

        # Report the labels already on disk for comparison, but only when
        # they cover the same number of points.
        if existing_path.is_file():
            previous = np.load(existing_path).reshape(-1)
            if len(previous) == len(labels):
                _, previous_counts = np.unique(previous, return_counts=True)
                summary['old_num_superpoints'] = int(previous_counts.size)
                summary['old_mean_points_per_superpoint'] = float(previous_counts.mean())
        print(json.dumps(summary, ensure_ascii=False))

        if args.write:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            np.save(str(output_path), labels.astype(np.int32))
            print('wrote', output_path)

    if not args.write:
        print('[DRY-RUN] No files written. Add --write to materialize sidecar or dataset labels.')


if __name__ == '__main__':
    main()
|
|
|