# biptv3/code/superpoint_ops/batch_gen_superpoints.py
# Commit 7b95dc2: "Add core reproduction code (binarization layers, PTv3, superpoint ops, min-repro pack)"
"""Batch generate pycut superpoint.npy for all S3DIS rooms."""
import os
import sys
import time
import numpy as np
from scipy.spatial import cKDTree
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
LIBCP_DIR = os.path.join(SCRIPT_DIR, "_cut_pursuit", "build", "src")
sys.path.insert(0, LIBCP_DIR)
sys.path.insert(0, SCRIPT_DIR)
import libcp
from lib_geo import (
_normalize_xyz_np,
_normalize_normals_np,
_local_geom_features_chunked_np,
_build_adj_graph_np,
_edge_weights_chunked_np,
_relabel_contiguous_np,
)
S3DIS_ROOT = "/mnt/data/AODUOLI/_work_biptv3/pointcept_framework/data/s3dis_official"
def merge_small_components(xyz, labels, min_size=50):
    """Absorb undersized components into their nearest large neighbour.

    A component is "small" when fewer than ``min_size`` points carry its
    label. Every point of a small component takes the label of the
    spatially closest point (by ``xyz``) that belongs to a large component.

    Args:
        xyz: Per-point coordinates, indexed by the same point index as
            ``labels``.
        labels: 1-D array of non-negative integer component ids
            (required by ``np.bincount``).
        min_size: Components with fewer points than this are merged.

    Returns:
        ``labels`` itself (same object) when no component is small; an
        unmodified copy when every component is small (nothing to merge
        into); otherwise the merged labels passed through
        ``_relabel_contiguous_np``.
    """
    component_sizes = np.bincount(labels)
    is_small = component_sizes[labels] < min_size
    if not is_small.any():
        return labels

    merged = labels.copy()
    is_large = np.logical_not(is_small)
    if not is_large.any():
        # No large component exists to absorb the small ones.
        return merged

    # Indices (into the full cloud) of the points the KD-tree is built from,
    # so query results can be mapped back to original point indices.
    anchor_idx = np.flatnonzero(is_large)
    _, nearest = cKDTree(xyz[is_large]).query(xyz[is_small], k=1)
    merged[is_small] = merged[anchor_idx[nearest]]
    return _relabel_contiguous_np(merged)
def generate_superpoints_pycut(
    xyz, normals=None,
    k_feat=10, k_adj=10, chunk_size=8192,
    normal_scale=0.25, lam=0.03, sigma=0.5,
    min_comp_weight=20, weight_decay=0.7,
    merge_min_size=50,
):
    """Compute per-point superpoint labels with ``libcp.cutpursuit``.

    Pipeline: normalise coordinates, build local geometric features
    (optionally concatenated with scaled normals), build an adjacency graph
    with edge weights, run cut pursuit, relabel the result contiguously, and
    optionally merge small components.

    Args:
        xyz: Point coordinates; ``xyz.shape[0]`` is the number of points.
        normals: Optional per-point normals with one row per point. When
            given, they are normalised, multiplied by ``normal_scale`` and
            appended to the feature matrix.
        k_feat: Forwarded to ``_local_geom_features_chunked_np``.
        k_adj: Forwarded to ``_build_adj_graph_np``.
        chunk_size: Forwarded to ``_local_geom_features_chunked_np``.
        normal_scale: Multiplier applied to the normalised normals.
        lam: Passed to ``libcp.cutpursuit`` (presumably the regularisation
            strength - confirm against the libcp binding).
        sigma: Forwarded to ``_edge_weights_chunked_np``.
        min_comp_weight: Passed to ``libcp.cutpursuit`` as an int.
        weight_decay: Passed to ``libcp.cutpursuit`` as a float.
        merge_min_size: When > 0, components smaller than this are merged
            via ``merge_small_components``.

    Returns:
        The relabelled per-point component ids, after optional merging.

    Raises:
        ValueError: If ``normals`` does not have one row per point.
    """
    n = xyz.shape[0]
    # Fail fast: a length mismatch would otherwise only surface at np.hstack,
    # after the geometric feature pass has already run.
    if normals is not None and len(normals) != n:
        raise ValueError(
            f"normals has {len(normals)} rows but xyz has {n} points"
        )

    xyz_norm = _normalize_xyz_np(xyz)
    geom_feat = _local_geom_features_chunked_np(
        xyz_norm, k_feat=k_feat, chunk_size=chunk_size
    )

    feat_parts = [geom_feat]
    if normals is not None:
        unit_normals = _normalize_normals_np(normals)
        feat_parts.append(unit_normals * normal_scale)
    features = np.hstack(feat_parts).astype(np.float32)

    src, dst = _build_adj_graph_np(xyz_norm, k_adj=k_adj, mutual=False, undirected=True)
    # NOTE(review): edge weights use a fixed lam=1.0; the ``lam`` argument is
    # only applied inside cut pursuit below - confirm this is intentional.
    edge_w = _edge_weights_chunked_np(features.T, src, dst, lam=1.0, sigma=sigma)

    # Only the per-point assignment is needed; the component list is discarded.
    # NOTE(review): the positional ``0`` is passed through unchanged; its
    # meaning depends on the libcp binding - confirm.
    _, in_component = libcp.cutpursuit(
        features,
        src.astype(np.uint32),
        dst.astype(np.uint32),
        edge_w.astype(np.float32),
        float(lam),
        int(min_comp_weight),
        0,
        float(weight_decay),
    )

    labels = _relabel_contiguous_np(np.asarray(in_component, dtype=np.int32))
    if merge_min_size > 0:
        labels = merge_small_components(xyz, labels, min_size=merge_min_size)
    return labels
def _scan_dataset(data_root):
    """Return ``(areas, rooms)`` for the dataset tree under ``data_root``.

    ``areas`` is the sorted list of sub-directories whose name starts with
    ``"Area_"``; ``rooms`` is a list of ``(area, room)`` pairs, one for each
    sub-directory of each area, in sorted order.
    """
    areas = sorted(
        d for d in os.listdir(data_root)
        # Skip stray files such as archives that share the "Area_" prefix.
        if d.startswith("Area_") and os.path.isdir(os.path.join(data_root, d))
    )
    rooms = [
        (area, room)
        for area in areas
        for room in sorted(os.listdir(os.path.join(data_root, area)))
        if os.path.isdir(os.path.join(data_root, area, room))
    ]
    return areas, rooms


def _save_npy_atomic(path, array):
    """Write ``array`` to ``path`` via a temporary file and ``os.replace``.

    The resume logic treats an existing output file as finished work, so a
    partially written file must never appear under the final name.
    """
    tmp_path = path + ".tmp"
    try:
        # Passing a file object keeps np.save from appending ".npy".
        with open(tmp_path, "wb") as fh:
            np.save(fh, array)
        os.replace(tmp_path, path)
    finally:
        # Only still present if the write or the rename failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def main(data_root=None, out_root=None):
    """Generate ``superpoint.npy`` for every room under ``data_root``.

    Args:
        data_root: Dataset root containing ``Area_*`` directories. Defaults
            to ``S3DIS_ROOT``.
        out_root: Output root; results are written to
            ``<out_root>/<area>/<room>/superpoint.npy``. Defaults to
            ``<SCRIPT_DIR>/outputs/superpoint_pycut_all``.

    Rooms without ``coord.npy`` are skipped, and rooms whose output file
    already exists are not recomputed, so an interrupted run can be resumed.
    """
    if data_root is None:
        data_root = S3DIS_ROOT
    if out_root is None:
        out_root = os.path.join(SCRIPT_DIR, "outputs", "superpoint_pycut_all")
    os.makedirs(out_root, exist_ok=True)

    areas, rooms = _scan_dataset(data_root)
    total_rooms = len(rooms)
    print(f"Total: {total_rooms} rooms across {len(areas)} areas")

    t_global = time.time()
    for done, (area, room) in enumerate(rooms, start=1):
        room_dir = os.path.join(data_root, area, room)
        coord_path = os.path.join(room_dir, "coord.npy")
        normal_path = os.path.join(room_dir, "normal.npy")
        if not os.path.exists(coord_path):
            print(f"[{done}/{total_rooms}] SKIP {area}/{room}: no coord.npy")
            continue

        out_dir = os.path.join(out_root, area, room)
        os.makedirs(out_dir, exist_ok=True)
        out_path = os.path.join(out_dir, "superpoint.npy")
        if os.path.exists(out_path):
            print(f"[{done}/{total_rooms}] EXISTS {area}/{room}")
            continue

        coord = np.load(coord_path).astype(np.float32)
        normals = None
        if os.path.exists(normal_path):
            normals = np.load(normal_path).astype(np.float32)

        t0 = time.time()
        labels = generate_superpoints_pycut(
            coord, normals=normals,
            lam=0.03, sigma=0.5,
            k_feat=10, k_adj=10,
            merge_min_size=50,
        )
        dt = time.time() - t0

        # Guard against an empty label array, where max() would raise.
        n_sp = int(labels.max()) + 1 if labels.size else 0
        _save_npy_atomic(out_path, labels)
        print(f"[{done}/{total_rooms}] {area}/{room}: {coord.shape[0]} pts -> {n_sp} sp ({dt:.1f}s)")

    print(f"\nDone! Total time: {time.time() - t_global:.0f}s")


if __name__ == "__main__":
    main()