| """ |
| Utility helpers for loading BRep extractor-processed STEP data as PyG graphs. |
| """ |
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Dict, Iterable, Tuple |
|
|
| import numpy as np |
| import torch |
| from torch_geometric.data import HeteroData |
|
|
| |
# Class sub-directory name -> integer class id attached to each graph.
LABELS: Dict[str, int] = {
    "pipe": 0,
    "elbow": 1,
    "tjoint": 2,
    "random": 3,
}
# Glob patterns for STEP files; lower- and upper-case spellings are listed
# explicitly because globbing may be case-sensitive.
STEP_EXTS = (
    "*.step",
    "*.stp",
    "*.STEP",
    "*.STP",
)
|
|
|
|
def build_label_map(step_root: Path) -> Dict[str, int]:
    """
    Map STEP file stems to integer class ids by scanning per-class folders.

    Every key of ``LABELS`` is treated as a sub-folder of ``step_root``;
    sub-folders that do not exist are skipped. Files matching any pattern in
    ``STEP_EXTS`` are recorded under their stem. If the same stem is seen more
    than once, the label written last wins.

    Args:
        step_root: Directory containing the class sub-folders.

    Returns:
        Dict mapping file stem -> integer label.

    Raises:
        RuntimeError: If no STEP file is found in any class sub-folder.
    """
    stem_to_label: Dict[str, int] = {}
    for class_name, class_id in LABELS.items():
        class_folder = step_root / class_name
        if class_folder.exists():
            for pattern in STEP_EXTS:
                stem_to_label.update(
                    (step_file.stem, class_id)
                    for step_file in class_folder.glob(pattern)
                )
    if stem_to_label:
        return stem_to_label
    raise RuntimeError(f"No STEP files found under {step_root} for any of {tuple(LABELS)}")
|
|
|
|
| def _flatten(arr: np.ndarray) -> np.ndarray: |
| return np.asarray(arr, dtype=np.float32).reshape(arr.shape[0], -1) |
|
|
| def _face_grid_stats(face_grids: np.ndarray) -> np.ndarray: |
| """ |
| Summarize face point grids into compact stats per face. |
| Returns [F, 10]: xyz_mean (3), xyz_std (3), nrm_mean (3), mask_frac (1). |
| """ |
| face_grids = np.asarray(face_grids, dtype=np.float32) |
| f = face_grids.shape[0] |
| xyz = face_grids[:, 0:3, :, :].reshape(f, 3, -1) |
| nrm = face_grids[:, 3:6, :, :].reshape(f, 3, -1) |
| msk = face_grids[:, 6, :, :].reshape(f, -1) |
|
|
| mask = (msk > 0.5).astype(np.float32) |
| mask_frac = mask.mean(axis=1, keepdims=True) |
| w = mask / (mask.sum(axis=1, keepdims=True) + 1e-6) |
|
|
| xyz_mean = (xyz * w[:, None, :]).sum(axis=2) |
| xyz_var = (w[:, None, :] * (xyz - xyz_mean[:, :, None]) ** 2).sum(axis=2) |
| xyz_std = np.sqrt(np.maximum(xyz_var, 1e-12)) |
| nrm_mean = (nrm * w[:, None, :]).sum(axis=2) |
| return np.concatenate([xyz_mean, xyz_std, nrm_mean, mask_frac], axis=1) |
|
|
def compute_global_geom_features(data) -> np.ndarray:
    """
    Derive five whole-shape descriptors from the sampled face/coedge points.

    Valid face-grid samples (mask channel > 0.5) and all coedge-grid samples
    are pooled into one point cloud, non-finite points are dropped, and the
    cloud is centred and divided by its RMS radius before a PCA.

    Args:
        data: Mapping providing ``"face_point_grids"`` and
            ``"coedge_point_grids"``.

    Returns:
        float32 array of shape [5]: the three covariance eigenvalue ratios in
        descending order, then line_fit_rmse (sqrt of the two smallest
        eigenvalues summed) and plane_fit_rmse (sqrt of the smallest).
        All zeros when fewer than three finite points are available or any
        intermediate value is degenerate or non-finite.
    """
    def _no_features() -> np.ndarray:
        return np.zeros(5, dtype=np.float32)

    clouds = []

    grids_f = np.asarray(data["face_point_grids"], dtype=np.float32)
    if grids_f.size:
        face_pts = grids_f[:, 0:3, :, :].transpose(0, 2, 3, 1).reshape(-1, 3)
        keep = grids_f[:, 6, :, :].reshape(-1) > 0.5
        if keep.any():
            clouds.append(face_pts[keep])

    grids_c = np.asarray(data["coedge_point_grids"], dtype=np.float32)
    if grids_c.size:
        clouds.append(grids_c[:, 0:3, :].transpose(0, 2, 1).reshape(-1, 3))

    if not clouds:
        return _no_features()

    cloud = np.concatenate(clouds, axis=0)
    # Filtering can only shrink the cloud, so one size check after it suffices.
    cloud = cloud[np.isfinite(cloud).all(axis=1)]
    if cloud.shape[0] < 3:
        return _no_features()

    offsets = cloud - cloud.mean(axis=0, keepdims=True)
    rms_radius = np.sqrt(np.mean(np.sum(offsets ** 2, axis=1)))
    offsets = offsets / (rms_radius + 1e-6)
    cov = (offsets.T @ offsets) / max(1, offsets.shape[0])
    if not np.isfinite(cov).all():
        return _no_features()

    # Descending eigenvalues, with tiny negative round-off clipped to zero.
    eigvals = np.maximum(np.sort(np.linalg.eigvalsh(cov))[::-1], 0.0)
    total = eigvals.sum()
    if not (np.isfinite(total) and total > 0.0):
        return _no_features()

    fractions = eigvals / total
    line_rmse = np.sqrt(max(eigvals[1] + eigvals[2], 0.0))
    plane_rmse = np.sqrt(max(eigvals[2], 0.0))
    feats = np.array(
        [fractions[0], fractions[1], fractions[2], line_rmse, plane_rmse],
        dtype=np.float32,
    )
    return feats if np.isfinite(feats).all() else _no_features()
|
|
def load_coedge_arrays(npz_path: Path) -> Dict[str, np.ndarray]:
    """
    Read one BRep extractor ``.npz`` archive and assemble per-node arrays.

    Returns a dict with:
        coedge_x: flattened coedge features, scale factor, reverse flag,
            flattened point grids and flattened LCS, concatenated column-wise.
        face_x: face features concatenated with the per-face grid statistics.
        edge_x: edge features (float32).
        next / mate: int64 arrays read from the ``"next"`` / ``"mate"`` keys.
        coedge_face / coedge_edge: int64 arrays read from the ``"face"`` /
            ``"edge"`` keys.
        global_x: the global geometry descriptor vector.
    """
    def _f32(values) -> np.ndarray:
        return np.asarray(values, dtype=np.float32)

    def _i64(values) -> np.ndarray:
        return np.asarray(values, dtype=np.int64)

    with np.load(npz_path) as archive:
        # Everything is materialised inside the context, before the archive
        # is closed.
        coedge_parts = [
            _flatten(archive["coedge_features"]),
            _f32(archive["coedge_scale_factors"])[:, None],
            _f32(archive["coedge_reverse_flags"])[:, None],
            _flatten(archive["coedge_point_grids"]),
            _flatten(archive["coedge_lcs"]),
        ]
        coedge_to_face = _i64(archive["face"])
        coedge_to_edge = _i64(archive["edge"])
        face_parts = [_f32(archive["face_features"])]
        edge_features = _f32(archive["edge_features"])
        face_parts.append(_face_grid_stats(archive["face_point_grids"]))

        arrays = {
            "coedge_x": np.concatenate(coedge_parts, axis=1),
            "face_x": np.concatenate(face_parts, axis=1),
            "edge_x": edge_features,
            "next": _i64(archive["next"]),
            "mate": _i64(archive["mate"]),
            "coedge_face": coedge_to_face,
            "coedge_edge": coedge_to_edge,
            "global_x": compute_global_geom_features(archive),
        }

    return arrays
|
|
|
|
def make_edge_index(source: np.ndarray, target: np.ndarray) -> torch.Tensor:
    """
    Return a symmetric, duplicate-free edge index as a 2 x E long tensor.

    Each (source, target) pair is emitted together with its reverse; repeated
    pairs are removed, which also leaves the result in sorted pair order.
    """
    forward = np.stack([source, target], axis=1)
    backward = np.stack([target, source], axis=1)
    unique_pairs = np.unique(np.concatenate([forward, backward], axis=0), axis=0)
    return torch.tensor(unique_pairs.T, dtype=torch.long)
|
|
def make_directed_edge_index(source: np.ndarray, target: np.ndarray) -> torch.Tensor:
    """
    Return a one-directional 2 x E long tensor: row 0 holds ``source`` and
    row 1 holds ``target``. Pairs are kept in input order, duplicates included.
    """
    stacked = np.stack((source, target))
    return torch.tensor(stacked, dtype=torch.long)
|
|
def make_bipartite_edge_index(source: np.ndarray, target: np.ndarray) -> torch.Tensor:
    """
    Return a one-directional 2 x E long tensor of unique (source, target)
    pairs. Repeated pairs collapse to one and the result is in sorted pair
    order; no reverse edges are added.
    """
    unique_pairs = np.unique(np.stack([source, target], axis=1), axis=0)
    return torch.tensor(unique_pairs.T, dtype=torch.long)
|
|
def make_heterodata(
    coedge_x: np.ndarray,
    face_x: np.ndarray,
    edge_x: np.ndarray,
    next_index: np.ndarray,
    mate_index: np.ndarray,
    coedge_face: np.ndarray,
    coedge_edge: np.ndarray,
    global_features: np.ndarray,
    label: int | None,
    norm_stats: Dict[str, Dict[str, np.ndarray | torch.Tensor]] | None = None,
) -> HeteroData:
    """
    Assemble a PyG ``HeteroData`` graph from coedge/face/edge arrays.

    Node types are ``coedge``, ``face``, ``edge`` and a single-row ``global``
    node. Relations: ``next``/``prev`` (directed), ``mate`` (symmetric,
    deduplicated), coedge<->face and coedge<->edge (directed, both ways), and
    face<->edge (deduplicated, both ways).

    Args:
        coedge_x, face_x, edge_x: Per-node feature matrices.
        next_index, mate_index: For each coedge, the index of its next / mate
            coedge.
        coedge_face, coedge_edge: For each coedge, the index of its face / edge.
        global_features: Graph-level feature vector, stored as one row.
        label: Class id stored in ``data.y``; ``y`` is omitted when ``None``.
        norm_stats: Optional ``{"coedge"|"face"|"edge": {"mean", "std"}}``.
            A node type is standardised as ``(x - mean) / std`` only when both
            entries are present; otherwise its features are left as-is.

    Returns:
        The populated ``HeteroData`` object.
    """
    def _to_feature_tensor(
        values: np.ndarray,
        stats: Dict[str, np.ndarray | torch.Tensor] | None,
    ) -> torch.Tensor:
        tensor = torch.tensor(values, dtype=torch.float32)
        mean = None if stats is None else stats.get("mean")
        std = None if stats is None else stats.get("std")
        if mean is None or std is None:
            return tensor
        centred = tensor - torch.as_tensor(mean, dtype=torch.float32)
        return centred / torch.as_tensor(std, dtype=torch.float32)

    def _stats_for(node_type: str) -> Dict[str, np.ndarray | torch.Tensor] | None:
        return norm_stats.get(node_type) if norm_stats else None

    node_features = (
        ("coedge", _to_feature_tensor(coedge_x, _stats_for("coedge"))),
        ("face", _to_feature_tensor(face_x, _stats_for("face"))),
        ("edge", _to_feature_tensor(edge_x, _stats_for("edge"))),
    )

    coedge_ids = np.arange(coedge_x.shape[0], dtype=np.int64)
    relations = (
        (("coedge", "next", "coedge"), make_directed_edge_index(coedge_ids, next_index)),
        (("coedge", "prev", "coedge"), make_directed_edge_index(next_index, coedge_ids)),
        (("coedge", "mate", "coedge"), make_edge_index(coedge_ids, mate_index)),
        (("coedge", "to_face", "face"), make_directed_edge_index(coedge_ids, coedge_face)),
        (("face", "to_coedge", "coedge"), make_directed_edge_index(coedge_face, coedge_ids)),
        (("coedge", "to_edge", "edge"), make_directed_edge_index(coedge_ids, coedge_edge)),
        (("edge", "to_coedge", "coedge"), make_directed_edge_index(coedge_edge, coedge_ids)),
        (("face", "to_edge", "edge"), make_bipartite_edge_index(coedge_face, coedge_edge)),
        (("edge", "to_face", "face"), make_bipartite_edge_index(coedge_edge, coedge_face)),
    )

    # Stores are filled in a fixed order so node/edge type ordering is stable.
    graph = HeteroData()
    for node_type, features in node_features:
        graph[node_type].x = features
    graph["global"].x = torch.tensor(global_features, dtype=torch.float32).view(1, -1)
    for edge_type, edge_index in relations:
        graph[edge_type].edge_index = edge_index

    if label is not None:
        graph.y = torch.tensor([int(label)], dtype=torch.long)
    return graph
|
|
|
|
def compute_feature_stats(npz_paths: Iterable[Path]) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Compute mean and std (per feature dimension) across all node features in the dataset.

    Statistics are accumulated separately for the ``coedge``, ``face`` and
    ``edge`` node types over every file in ``npz_paths``.

    Args:
        npz_paths: Paths of the ``.npz`` files to scan (iterated once).

    Returns:
        ``{"coedge" | "face" | "edge": {"mean": float32[D], "std": float32[D]}}``.
        Variance is floored at 1e-12, so ``std`` is never below 1e-6.

    Raises:
        RuntimeError: If any node type has no rows across all files
            (including when ``npz_paths`` is empty).
    """
    node_keys = ("coedge", "face", "edge")
    totals = {key: 0 for key in node_keys}
    sum_vec: Dict[str, np.ndarray] = {}
    sum_sq: Dict[str, np.ndarray] = {}

    for path in npz_paths:
        graph = load_coedge_arrays(path)
        for key in node_keys:
            # Promote to float64 *before* summing and squaring. Reducing in
            # float32 first would lose precision (or overflow on x * x) and
            # defeat the float64 accumulators.
            x = np.asarray(graph[f"{key}_x"], dtype=np.float64)
            if key not in sum_vec:
                sum_vec[key] = np.zeros(x.shape[1], dtype=np.float64)
                sum_sq[key] = np.zeros(x.shape[1], dtype=np.float64)
            sum_vec[key] += x.sum(axis=0)
            sum_sq[key] += (x * x).sum(axis=0)
            totals[key] += x.shape[0]

    out: Dict[str, Dict[str, np.ndarray]] = {}
    for key in node_keys:
        if key not in sum_vec or totals[key] == 0:
            raise RuntimeError(f"Cannot compute feature stats: no {key} features observed.")
        mean = sum_vec[key] / totals[key]
        # E[x^2] - E[x]^2 can come out slightly negative from round-off; the
        # floor also keeps constant features from producing a zero std.
        var = np.maximum(sum_sq[key] / totals[key] - mean * mean, 1e-12)
        std = np.sqrt(var)
        out[key] = {"mean": mean.astype(np.float32), "std": std.astype(np.float32)}
    return out
|
|