#!/usr/bin/env python3
"""Generate per-room ``superpoint.npy`` label files for an S3DIS-style tree.

Two methods are available:

* ``segmentator`` -- JIT-builds a C++ extension and calls its
  ``segment_point`` entry point on a k-NN graph of the room.
* ``canonical`` -- a pure-NumPy fallback that groups points by voxel cell
  and (optionally) by a quantized normal direction.

By default the script only prints statistics; pass ``--write`` to save labels.
"""
from __future__ import annotations

import argparse
import functools
import json
import sys
from pathlib import Path

import numpy as np

DEFAULT_SEGMENTATOR_CPP = Path('/mnt/data/AODUOLI/PAMI2026/_codex_research/segmentator/csrc/segmentator.cpp')
DEFAULT_SEGMENTATOR_BUILD_DIR = Path('/mnt/data/AODUOLI/PAMI2026/_codex_research/segmentator/csrc/torch_build')


def canonical_generate_superpoints(coords, normals=None, voxel_size=0.12, normal_bins=8):
    """Group points into superpoints by voxel cell and quantized normal.

    Args:
        coords: Array-like of point coordinates, one row per point.
        normals: Optional array-like of per-point normals. It is used only
            when it has as many rows as ``coords``; otherwise it is ignored
            and grouping is by voxel alone.
        voxel_size: Voxel edge length; values below ``1e-4`` are raised to
            ``1e-4`` to avoid division by (near) zero.
        normal_bins: Number of quantization bins per normal component.

    Returns:
        1-D ``int32`` array with one consecutive superpoint id per point.
        An empty input yields an empty array.
    """
    coords = np.asarray(coords, dtype=np.float32)
    if len(coords) == 0:
        # ``min`` over an empty axis raises; there is nothing to label anyway.
        return np.zeros(0, dtype=np.int32)

    cell = max(float(voxel_size), 1e-4)
    origin = coords.min(axis=0, keepdims=True)
    voxel_coord = np.floor((coords - origin) / cell).astype(np.int64)

    tokens = voxel_coord
    if normals is not None and len(normals) == len(coords):
        unit = np.asarray(normals, dtype=np.float32)
        unit = unit / (np.linalg.norm(unit, axis=1, keepdims=True) + 1e-8)
        normal_q = np.floor((unit + 1.0) * 0.5 * normal_bins).astype(np.int64)
        # A component of exactly 1.0 maps to index ``normal_bins``; fold it
        # into the last valid bin so axis-aligned normals are not split off
        # from their near-aligned neighbours.
        top_bin = max(int(normal_bins) - 1, 0)
        normal_q = np.clip(normal_q, 0, top_bin)
        tokens = np.concatenate([voxel_coord, normal_q], axis=1)

    _, inverse = np.unique(tokens, axis=0, return_inverse=True)
    # Flatten: some NumPy releases return a non-1-D inverse when ``axis`` is set.
    return inverse.reshape(-1).astype(np.int32)


@functools.lru_cache(maxsize=None)
def _load_segmentator_cached(segmentator_cpp: str, build_dir: str, verbose: bool):
    """Build/load the extension once per (source, build dir, verbose) triple."""
    from torch.utils.cpp_extension import load

    return load(
        name='libsegmentator_dyn',
        sources=[segmentator_cpp],
        build_directory=build_dir,
        extra_cflags=['-O3'],
        verbose=verbose,
    )


def load_segmentator_module(segmentator_cpp: Path, build_dir: Path, verbose: bool = False):
    """JIT-compile and load the segmentator C++ extension.

    The loaded module is memoized for the lifetime of the process so that
    processing many rooms does not re-enter the extension loader each time.

    Args:
        segmentator_cpp: Path to the C++ source file.
        build_dir: Directory used for build artifacts (created if missing).
        verbose: Forwarded to ``torch.utils.cpp_extension.load``.

    Returns:
        The module object returned by ``torch.utils.cpp_extension.load``.

    Raises:
        FileNotFoundError: If ``segmentator_cpp`` is not an existing file.
    """
    segmentator_cpp = Path(segmentator_cpp)
    build_dir = Path(build_dir)
    if not segmentator_cpp.is_file():
        raise FileNotFoundError(f'Segmentator source not found: {segmentator_cpp}')
    build_dir.mkdir(parents=True, exist_ok=True)
    return _load_segmentator_cached(str(segmentator_cpp), str(build_dir), bool(verbose))


def segmentator_generate_superpoints(coords, normals, knn_k=50, k_thresh=0.01, seg_min_verts=20,
                                     segmentator_cpp=DEFAULT_SEGMENTATOR_CPP,
                                     build_dir=DEFAULT_SEGMENTATOR_BUILD_DIR,
                                     build_verbose=False):
    """Compute superpoints with the compiled segmentator extension.

    Args:
        coords: Array-like of point coordinates, one row per point.
        normals: Array-like of per-point normals; must have the same shape
            as ``coords``.
        knn_k: Number of neighbours for ``torch_cluster.knn_graph``.
        k_thresh: Passed as a float to the extension's ``segment_point``.
        seg_min_verts: Passed as an int to the extension's ``segment_point``.
        segmentator_cpp: Path to the extension source.
        build_dir: Build directory for the extension.
        build_verbose: Verbose flag for the extension build.

    Returns:
        1-D ``int32`` array of consecutive superpoint ids, one per point.

    Raises:
        ValueError: If ``coords`` and ``normals`` differ in shape.
        FileNotFoundError: If the extension source file does not exist.
    """
    import torch
    from torch_cluster import knn_graph

    coords = np.ascontiguousarray(np.asarray(coords, dtype=np.float32))
    normals = np.ascontiguousarray(np.asarray(normals, dtype=np.float32))
    if coords.shape != normals.shape:
        raise ValueError(f'coords/normals mismatch: {coords.shape} vs {normals.shape}')
    normals = normals / (np.linalg.norm(normals, axis=1, keepdims=True) + 1e-8)

    seg = load_segmentator_module(
        segmentator_cpp=Path(segmentator_cpp),
        build_dir=Path(build_dir),
        verbose=build_verbose,
    )

    pts = torch.from_numpy(coords)
    nrm = torch.from_numpy(normals)
    # knn_graph output is transposed before being handed to the extension;
    # presumably segment_point expects one edge per row -- verify against
    # the C++ source.
    edges = knn_graph(pts, k=int(knn_k)).T.contiguous().to(dtype=torch.int64, device='cpu')
    labels = seg.segment_point(pts.contiguous(), nrm.contiguous(), edges,
                               float(k_thresh), int(seg_min_verts))
    labels = labels.cpu().numpy().reshape(-1).astype(np.int64)

    # Remap whatever ids the extension produced to 0..K-1.
    _, inverse = np.unique(labels, return_inverse=True)
    return inverse.reshape(-1).astype(np.int32)


def _iter_rooms(data_root: Path):
    """Yield room directories ``Area_*/<room>`` that contain ``coord.npy``.

    Areas and rooms are visited in sorted order.
    """
    for area in sorted(data_root.glob('Area_*')):
        if not area.is_dir():
            continue
        for room in sorted(area.iterdir()):
            if room.is_dir() and (room / 'coord.npy').is_file():
                yield room


def generate_labels(room: Path, args) -> np.ndarray:
    """Compute superpoint labels for one room using the method in ``args``.

    Args:
        room: Room directory containing ``coord.npy`` and, optionally,
            ``normal.npy``.
        args: Namespace providing ``method`` plus the options of the chosen
            method (``voxel``/``normal_bins`` for ``canonical``; ``knn_k``,
            ``k_thresh``, ``seg_min_verts``, ``segmentator_cpp``,
            ``segmentator_build_dir``, ``build_verbose`` otherwise).

    Returns:
        1-D ``int32`` label array.

    Raises:
        FileNotFoundError: If the segmentator method is requested but the
            room has no ``normal.npy``.
    """
    coord = np.load(room / 'coord.npy')
    normal_file = room / 'normal.npy'
    normals = np.load(normal_file) if normal_file.is_file() else None

    if args.method == 'canonical':
        return canonical_generate_superpoints(
            coord,
            normals=normals,
            voxel_size=args.voxel,
            normal_bins=args.normal_bins,
        )

    if normals is None:
        raise FileNotFoundError(f'normal.npy is required for segmentator method: {room}')
    return segmentator_generate_superpoints(
        coord,
        normals,
        knn_k=args.knn_k,
        k_thresh=args.k_thresh,
        seg_min_verts=args.seg_min_verts,
        segmentator_cpp=args.segmentator_cpp,
        build_dir=args.segmentator_build_dir,
        build_verbose=args.build_verbose,
    )


def _build_arg_parser(default_root: Path) -> argparse.ArgumentParser:
    """Create the command-line parser; ``default_root`` is used if it exists."""
    ap = argparse.ArgumentParser(
        description='Generate S3DIS superpoint.npy using either segmentator or canonical fallback')
    ap.add_argument('--data_root', type=Path,
                    default=default_root if default_root.is_dir() else Path('.'))
    ap.add_argument('--method', choices=['segmentator', 'canonical'], default='segmentator')
    ap.add_argument('--voxel', type=float, default=0.12)
    ap.add_argument('--normal-bins', type=int, default=8, dest='normal_bins')
    ap.add_argument('--knn_k', type=int, default=50)
    ap.add_argument('--k_thresh', type=float, default=0.01)
    ap.add_argument('--seg_min_verts', type=int, default=20)
    ap.add_argument('--segmentator_cpp', type=Path, default=DEFAULT_SEGMENTATOR_CPP)
    ap.add_argument('--segmentator_build_dir', type=Path, default=DEFAULT_SEGMENTATOR_BUILD_DIR)
    ap.add_argument('--build_verbose', action='store_true')
    ap.add_argument('--write', action='store_true')
    ap.add_argument('--output_root', type=Path, default=None)
    ap.add_argument('--room', type=str, default=None)
    return ap


def _label_stats(labels: np.ndarray) -> dict:
    """Return point/superpoint counts for ``labels`` (safe on empty input)."""
    uniq, cnt = np.unique(labels, return_counts=True)
    has_points = cnt.size > 0
    return {
        'points': int(labels.shape[0]),
        'num_superpoints': int(uniq.size),
        'mean_points_per_superpoint': float(cnt.mean()) if has_points else 0.0,
        'max_points_per_superpoint': int(cnt.max()) if has_points else 0,
    }


def main() -> None:
    """CLI entry point: label every selected room and optionally save results.

    Prints one JSON line with the run configuration, then one JSON line of
    statistics per room. When an existing ``superpoint.npy`` of matching
    length is found next to the room, its statistics are included for
    comparison. Files are written only with ``--write``; the target is the
    room directory, or the mirrored path under ``--output_root`` if given.
    Exits with status 1 when the data root or the requested room is missing,
    or when no rooms are found.
    """
    here = Path(__file__).resolve().parent
    default_root = (here.parent.parent / '_work_biptv3' / 'pointcept_framework'
                    / 'data' / 's3dis_official').resolve()
    args = _build_arg_parser(default_root).parse_args()

    root: Path = args.data_root
    if not root.is_dir():
        print('data_root does not exist:', root, file=sys.stderr)
        sys.exit(1)

    if args.room:
        room_path = root / args.room
        if not (room_path / 'coord.npy').is_file():
            print('Missing coord.npy:', room_path / 'coord.npy', file=sys.stderr)
            sys.exit(1)
        rooms = [room_path]
    else:
        rooms = list(_iter_rooms(root))
    if not rooms:
        print('No rooms found', file=sys.stderr)
        sys.exit(1)

    print(json.dumps({
        'method': args.method,
        'rooms': len(rooms),
        'write': bool(args.write),
        'knn_k': args.knn_k,
        'k_thresh': args.k_thresh,
        'seg_min_verts': args.seg_min_verts,
        'voxel': args.voxel,
        'normal_bins': args.normal_bins,
    }, ensure_ascii=False))

    for room in rooms:
        labels = generate_labels(room, args)
        rel_room = room.relative_to(root)
        disk_sp_path = room / 'superpoint.npy'
        if args.output_root is None:
            output_path = disk_sp_path
        else:
            output_path = args.output_root / rel_room / 'superpoint.npy'

        info = {'room': str(rel_room)}
        info.update(_label_stats(labels))

        # Compare against labels already on disk, but only when they
        # describe the same number of points.
        if disk_sp_path.is_file():
            old = np.load(disk_sp_path).reshape(-1)
            if len(old) == len(labels):
                old_uniq, old_cnt = np.unique(old, return_counts=True)
                info['old_num_superpoints'] = int(old_uniq.size)
                info['old_mean_points_per_superpoint'] = (
                    float(old_cnt.mean()) if old_cnt.size > 0 else 0.0)
        print(json.dumps(info, ensure_ascii=False))

        if args.write:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            np.save(str(output_path), labels.astype(np.int32))
            print('wrote', output_path)

    if not args.write:
        print('[DRY-RUN] No files written. Add --write to materialize sidecar or dataset labels.')


if __name__ == '__main__':
    main()