"""Batch generate pycut superpoint.npy for all S3DIS rooms.""" import os import sys import time import numpy as np from scipy.spatial import cKDTree SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) LIBCP_DIR = os.path.join(SCRIPT_DIR, "_cut_pursuit", "build", "src") sys.path.insert(0, LIBCP_DIR) sys.path.insert(0, SCRIPT_DIR) import libcp from lib_geo import ( _normalize_xyz_np, _normalize_normals_np, _local_geom_features_chunked_np, _build_adj_graph_np, _edge_weights_chunked_np, _relabel_contiguous_np, ) S3DIS_ROOT = "/mnt/data/AODUOLI/_work_biptv3/pointcept_framework/data/s3dis_official" def merge_small_components(xyz, labels, min_size=50): counts = np.bincount(labels) small_mask = counts[labels] < min_size if not small_mask.any(): return labels labels = labels.copy() large_mask = ~small_mask if not large_mask.any(): return labels tree = cKDTree(xyz[large_mask]) _, nn_idx = tree.query(xyz[small_mask], k=1) large_indices = np.where(large_mask)[0] labels[small_mask] = labels[large_indices[nn_idx]] return _relabel_contiguous_np(labels) def generate_superpoints_pycut( xyz, normals=None, k_feat=10, k_adj=10, chunk_size=8192, normal_scale=0.25, lam=0.03, sigma=0.5, min_comp_weight=20, weight_decay=0.7, merge_min_size=50, ): n = xyz.shape[0] xyz_norm = _normalize_xyz_np(xyz) geom_feat = _local_geom_features_chunked_np(xyz_norm, k_feat=k_feat, chunk_size=chunk_size) feat_parts = [geom_feat] if normals is not None: nn = _normalize_normals_np(normals) feat_parts.append(nn * normal_scale) Y = np.hstack(feat_parts).astype(np.float32) src, dst = _build_adj_graph_np(xyz_norm, k_adj=k_adj, mutual=False, undirected=True) ew = _edge_weights_chunked_np(Y.T, src, dst, lam=1.0, sigma=sigma) components, in_component = libcp.cutpursuit( Y, src.astype(np.uint32), dst.astype(np.uint32), ew.astype(np.float32), float(lam), int(min_comp_weight), 0, float(weight_decay), ) labels = _relabel_contiguous_np(np.asarray(in_component, dtype=np.int32)) if merge_min_size > 0: labels = merge_small_components(xyz, labels, min_size=merge_min_size) return labels if __name__ == "__main__": out_root = os.path.join(SCRIPT_DIR, "outputs", "superpoint_pycut_all") os.makedirs(out_root, exist_ok=True) areas = sorted([d for d in os.listdir(S3DIS_ROOT) if d.startswith("Area_")]) total_rooms = 0 for area in areas: area_dir = os.path.join(S3DIS_ROOT, area) rooms = sorted([r for r in os.listdir(area_dir) if os.path.isdir(os.path.join(area_dir, r))]) total_rooms += len(rooms) print(f"Total: {total_rooms} rooms across {len(areas)} areas") done = 0 t_global = time.time() for area in areas: area_dir = os.path.join(S3DIS_ROOT, area) rooms = sorted([r for r in os.listdir(area_dir) if os.path.isdir(os.path.join(area_dir, r))]) for room in rooms: done += 1 room_dir = os.path.join(area_dir, room) coord_path = os.path.join(room_dir, "coord.npy") normal_path = os.path.join(room_dir, "normal.npy") if not os.path.exists(coord_path): print(f"[{done}/{total_rooms}] SKIP {area}/{room}: no coord.npy") continue out_dir = os.path.join(out_root, area, room) os.makedirs(out_dir, exist_ok=True) out_path = os.path.join(out_dir, "superpoint.npy") if os.path.exists(out_path): print(f"[{done}/{total_rooms}] EXISTS {area}/{room}") continue coord = np.load(coord_path).astype(np.float32) normals = None if os.path.exists(normal_path): normals = np.load(normal_path).astype(np.float32) t0 = time.time() labels = generate_superpoints_pycut( coord, normals=normals, lam=0.03, sigma=0.5, k_feat=10, k_adj=10, merge_min_size=50, ) dt = time.time() - t0 n_sp = int(labels.max()) + 1 np.save(out_path, labels) print(f"[{done}/{total_rooms}] {area}/{room}: {coord.shape[0]} pts -> {n_sp} sp ({dt:.1f}s)") print(f"\nDone! Total time: {time.time() - t_global:.0f}s")