"""Shape preprocessing: canonical normalization, surface sampling, voxelization
and ground-truth export.

The ``shape`` arguments throughout are presumably CadQuery ``Shape`` objects
(NOTE(review): confirm against callers); the code only relies on
``BoundingBox()``, ``translate()`` and ``.wrapped``.
"""

import json
import logging
import time
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import numpy as np

logger = logging.getLogger(__name__)

# Rotations (rx, ry, rz) in degrees that bring a bounding box whose
# [longest, second, shortest] axes are the key onto ["X", "Y", "Z"].
# normalize_shape composes them as Rz * Ry * Rx, i.e. the X rotation is applied
# to the geometry first. A single 90-degree rotation only swaps two axes, so
# the two cyclic permutations need two rotations each.
_ALIGNMENT_ROTATIONS = {
    ("Y", "X", "Z"): (0, 0, 90),
    ("Z", "Y", "X"): (0, 90, 0),
    ("X", "Z", "Y"): (90, 0, 0),
    ("Z", "X", "Y"): (90, 0, 90),
    ("Y", "Z", "X"): (90, 90, 0),
}


def _bbox_center(bb) -> Tuple[float, float, float]:
    """Return the midpoint of bounding box *bb* along each axis."""
    return (
        (bb.xmin + bb.xmax) / 2,
        (bb.ymin + bb.ymax) / 2,
        (bb.zmin + bb.zmax) / 2,
    )


def _rotate_about_origin(shape, rotation):
    """Return *shape* rotated about the origin by ``(rx, ry, rz)`` degrees.

    The combined transform is built as ``Rz * Ry * Rx``; zero angles are
    skipped.
    """
    import cadquery as cq
    from OCP.BRepBuilderAPI import BRepBuilderAPI_Transform
    from OCP.gp import gp_Ax1, gp_Dir, gp_Pnt, gp_Trsf

    rx, ry, rz = rotation
    trsf = gp_Trsf()
    # Multiply order matters: Z, then Y, then X are post-multiplied.
    for angle_deg, direction in (
        (rz, (0, 0, 1)),
        (ry, (0, 1, 0)),
        (rx, (1, 0, 0)),
    ):
        if angle_deg == 0:
            continue
        step = gp_Trsf()
        step.SetRotation(
            gp_Ax1(gp_Pnt(0, 0, 0), gp_Dir(*direction)),
            np.radians(angle_deg),
        )
        trsf.Multiply(step)

    builder = BRepBuilderAPI_Transform(shape.wrapped, trsf, True)
    builder.Build()
    return cq.Shape(builder.Shape())


def normalize_shape(shape) -> Tuple[Any, Dict[str, Any]]:
    """Center *shape* on the origin and align its bounding box with X >= Y >= Z.

    The shape is translated so its bounding-box center sits at the origin,
    rotated in 90-degree steps so the longest extent lies along X and the
    shortest along Z, and re-centered if the rotation moved the bounding-box
    center by more than 0.01 on any axis.

    Returns:
        ``(normalized_shape, transform_info)``. ``transform_info`` records the
        original bounding-box center (rounded to 4 decimals) under
        ``"translation_to_origin"`` and the original
        ``[longest, second, shortest]`` axis order under ``"axis_alignment"``.
        ``"units"`` is hard-coded to ``"mm"``; it is not derived from the shape.
    """
    t0 = time.perf_counter()

    cx, cy, cz = _bbox_center(shape.BoundingBox())
    shape = shape.translate((-cx, -cy, -cz))

    bb = shape.BoundingBox()
    dims = {"X": bb.xlen, "Y": bb.ylen, "Z": bb.zlen}
    # sorted() is stable, so ties keep the X, Y, Z declaration order.
    ranked = sorted(dims.items(), key=lambda item: item[1], reverse=True)
    current_order = [axis for axis, _ in ranked]

    rotation = _compute_alignment_rotation(current_order, ["X", "Y", "Z"])
    if rotation is not None and any(angle != 0 for angle in rotation):
        shape = _rotate_about_origin(shape, rotation)
        cx2, cy2, cz2 = _bbox_center(shape.BoundingBox())
        if abs(cx2) > 0.01 or abs(cy2) > 0.01 or abs(cz2) > 0.01:
            shape = shape.translate((-cx2, -cy2, -cz2))

    transform_info = {
        "units": "mm",
        "translation_to_origin": [round(cx, 4), round(cy, 4), round(cz, 4)],
        "axis_alignment": current_order,
        "longest_axis": "X",
    }

    elapsed = time.perf_counter() - t0
    logger.info(f"normalize_shape took {elapsed:.3f}s")
    return shape, transform_info


def _compute_alignment_rotation(current, target):
    """Return the ``(rx, ry, rz)`` rotation in degrees aligning *current* to X, Y, Z.

    Args:
        current: Axis names ordered ``[longest, second, shortest]``.
        target: Desired axis order. The lookup table assumes this is
            ``["X", "Y", "Z"]``; it is only used for the "already aligned"
            check.

    Returns:
        ``None`` when ``current == target``; otherwise a tuple of angles meant
        to be composed as ``Rz * Ry * Rx``. Unrecognized orders yield
        ``(0, 0, 0)``.
    """
    if current == target:
        return None
    return _ALIGNMENT_ROTATIONS.get(tuple(current), (0, 0, 0))


def _triangulate_faces(shape):
    """Mesh *shape* and yield ``(nodes, triangles)`` arrays for each face.

    ``nodes`` is an ``(N, 3)`` float64 array of vertex coordinates with the
    face's location transform applied; ``triangles`` is an ``(M, 3)`` int64
    array of zero-based indices into ``nodes``. Faces without a triangulation
    are skipped.
    """
    from OCP.BRep import BRep_Tool
    from OCP.BRepMesh import BRepMesh_IncrementalMesh
    from OCP.TopAbs import TopAbs_FACE
    from OCP.TopExp import TopExp_Explorer
    from OCP.TopLoc import TopLoc_Location
    from OCP.TopoDS import TopoDS

    mesher = BRepMesh_IncrementalMesh(shape.wrapped, 0.1, False, 0.1, True)
    mesher.Perform()

    explorer = TopExp_Explorer(shape.wrapped, TopAbs_FACE)
    while explorer.More():
        face = TopoDS.Face_s(explorer.Current())
        loc = TopLoc_Location()
        tri = BRep_Tool.Triangulation_s(face, loc)
        if tri is not None:
            trsf = loc.Transformation()
            nodes = []
            # OCC collections are 1-indexed.
            for i in range(1, tri.NbNodes() + 1):
                p = tri.Node(i)
                p.Transform(trsf)
                nodes.append([p.X(), p.Y(), p.Z()])
            triangles = []
            for i in range(1, tri.NbTriangles() + 1):
                n1, n2, n3 = tri.Triangle(i).Get()
                triangles.append([n1 - 1, n2 - 1, n3 - 1])
            yield (
                np.array(nodes, dtype=np.float64).reshape(-1, 3),
                np.array(triangles, dtype=np.int64).reshape(-1, 3),
            )
        explorer.Next()


def sample_surface_points(shape, n_points: int = 2048) -> np.ndarray:
    """Sample *n_points* points from the meshed surface of *shape*.

    Triangles are chosen with probability proportional to their area, then a
    point is drawn inside each chosen triangle from two random numbers that
    are reflected when their sum exceeds 1. Triangles with area <= 1e-12 are
    ignored. Randomness comes from NumPy's global ``np.random`` state.

    Returns:
        A float32 array of shape ``(n_points, 3)``. If the mesh has no usable
        triangles a warning is logged and an all-zero array is returned.
    """
    t0 = time.perf_counter()

    per_face = [nodes[tris] for nodes, tris in _triangulate_faces(shape) if len(tris)]
    if per_face:
        corners = np.concatenate(per_face)  # (T, 3 corners, 3 coords)
        edge_a = corners[:, 1] - corners[:, 0]
        edge_b = corners[:, 2] - corners[:, 0]
        areas = 0.5 * np.linalg.norm(np.cross(edge_a, edge_b), axis=1)
        keep = areas > 1e-12
        corners, areas = corners[keep], areas[keep]
    else:
        corners = np.empty((0, 3, 3), dtype=np.float64)
        areas = np.empty(0, dtype=np.float64)

    if len(corners) == 0:
        logger.warning("No triangles found for surface sampling")
        # Same dtype as the normal return path.
        return np.zeros((n_points, 3), dtype=np.float32)

    probs = areas / areas.sum()
    chosen = np.random.choice(len(corners), size=n_points, p=probs)

    # One row of (r1, r2) per sample; reflect pairs that fall outside the
    # triangle back into it.
    r = np.random.random((n_points, 2))
    outside = r.sum(axis=1) > 1
    r[outside] = 1 - r[outside]
    r1, r2 = r[:, :1], r[:, 1:]

    picked = corners[chosen]
    pts = picked[:, 0] * (1 - r1 - r2) + picked[:, 1] * r1 + picked[:, 2] * r2
    result = pts.astype(np.float32).reshape(-1, 3)

    elapsed = time.perf_counter() - t0
    logger.info(f"sample_surface_points ({n_points} pts) took {elapsed:.3f}s")
    return result


def _occ_to_trimesh(shape):
    """Convert *shape* into a ``trimesh.Trimesh`` built from its face meshes.

    Per-face vertex indices are offset so that all faces share one vertex
    array. The mesh is created with ``process=True`` and ``fix_normals()`` is
    called on it before returning.

    Raises:
        ValueError: If no mesh vertices could be extracted from the shape.
    """
    import trimesh

    verts, faces = [], []
    offset = 0
    for nodes, tris in _triangulate_faces(shape):
        verts.append(nodes)
        faces.append(tris + offset)
        offset += len(nodes)

    if offset == 0:
        raise ValueError("No mesh triangles extracted from shape")

    m = trimesh.Trimesh(
        vertices=np.concatenate(verts),
        faces=np.concatenate(faces),
        process=True,
    )
    m.fix_normals()
    return m


def _occupancy_grid(tri_mesh, lower, upper, resolution: int, padding: float = 0.01) -> np.ndarray:
    """Return a ``resolution^3`` boolean grid of points inside *tri_mesh*.

    Grid points are evenly spaced between ``lower - padding`` and
    ``upper + padding`` on each axis, indexed ``[x, y, z]``.
    """
    axes = [
        np.linspace(lower[i] - padding, upper[i] + padding, resolution)
        for i in range(3)
    ]
    grid_pts = np.stack(np.meshgrid(*axes, indexing="ij"), axis=-1).reshape(-1, 3)
    inside = tri_mesh.contains(grid_pts)
    return np.asarray(inside).reshape(resolution, resolution, resolution)


def voxelize(shape, resolution: int = 64) -> np.ndarray:
    """Voxelize *shape* over its own bounding box (padded by 0.01 per side).

    Returns:
        A boolean array of shape ``(resolution, resolution, resolution)``
        that is True where the grid point lies inside the meshed shape.
    """
    t0 = time.perf_counter()
    tri_mesh = _occ_to_trimesh(shape)
    bb = shape.BoundingBox()
    grid = _occupancy_grid(
        tri_mesh,
        (bb.xmin, bb.ymin, bb.zmin),
        (bb.xmax, bb.ymax, bb.zmax),
        resolution,
    )
    elapsed = time.perf_counter() - t0
    logger.info(f"voxelize ({resolution}^3, trimesh+embree) took {elapsed:.3f}s, fill={grid.sum()}")
    return grid


def voxelize_in_bbox(shape, bbox_min, bbox_max, resolution: int = 64) -> np.ndarray:
    """Voxelize *shape* over a caller-supplied box (padded by 0.01 per side).

    Args:
        shape: Shape to voxelize.
        bbox_min: Indexable ``(x, y, z)`` lower corner of the sampling box.
        bbox_max: Indexable ``(x, y, z)`` upper corner of the sampling box.
        resolution: Number of grid points per axis.

    Returns:
        A boolean array of shape ``(resolution, resolution, resolution)``.
    """
    t0 = time.perf_counter()
    tri_mesh = _occ_to_trimesh(shape)
    grid = _occupancy_grid(tri_mesh, bbox_min, bbox_max, resolution)
    elapsed = time.perf_counter() - t0
    logger.info(f"voxelize_in_bbox ({resolution}^3) took {elapsed:.3f}s, fill={grid.sum()}/{resolution**3}")
    return grid


def generate_ground_truth(
    shape,
    output_dir: str,
    source_step: Optional[str] = None,
) -> Dict[str, Any]:
    """Compute ground-truth data for *shape* and write it into *output_dir*.

    Writes ``surface_points.npy`` (2048 sampled points), ``voxels_64.npy``
    (a 64^3 occupancy grid) and ``ground_truth.json``; the directory is
    created if needed.

    Args:
        shape: Shape to describe.
        output_dir: Directory receiving the generated files.
        source_step: Value stored under ``"source_step"`` in the JSON.

    Returns:
        The dictionary that was written to ``ground_truth.json``.
    """
    t0 = time.perf_counter()
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    from .geometry import extract_properties

    props = extract_properties(shape)

    points = sample_surface_points(shape, n_points=2048)
    np.save(str(output_path / "surface_points.npy"), points)

    voxels = voxelize(shape, resolution=64)
    np.save(str(output_path / "voxels_64.npy"), voxels)

    bb = shape.BoundingBox()
    ground_truth = {
        "source_step": source_step,
        "volume_mm3": props["volume_mm3"],
        "surface_area_mm2": props["surface_area_mm2"],
        "bbox_mm": [round(bb.xlen, 4), round(bb.ylen, 4), round(bb.zlen, 4)],
        "face_count": props["face_count"],
        "face_types": list(props["face_type_counts"].keys()),
        "dominant_face_type": props["dominant_face_type"],
        "euler_characteristic": props["euler_characteristic"],
        "surface_points_file": "surface_points.npy",
        "voxels_file": "voxels_64.npy",
    }

    with open(output_path / "ground_truth.json", "w", encoding="utf-8") as f:
        json.dump(ground_truth, f, indent=2)

    elapsed = time.perf_counter() - t0
    logger.info(f"generate_ground_truth took {elapsed:.3f}s")
    return ground_truth


def preprocess_from_code(
    code: str,
    output_dir: str,
    task_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Execute CadQuery *code* and write ground-truth artifacts for its result.

    The code runs with ``cq``, ``cadquery`` and ``math`` predefined and must
    assign its output to a variable named ``result``. If ``result`` has a
    ``val`` attribute, ``result.val()`` is used as the shape. The original and
    normalized shapes are exported as STEP files, ground truth is generated
    from the normalized shape, and ``ground_truth.json`` is rewritten with the
    ``"canonical_transform"`` and ``"original_bbox_mm"`` entries added.

    Args:
        code: Python source to execute.
        output_dir: Directory receiving all generated files.
        task_id: When given, ``"source_step"`` is recorded as
            ``server/tasks/<task_id>/ground_truth.step``; otherwise the path
            of the exported original STEP file is recorded.

    Returns:
        The final ground-truth dictionary.

    Raises:
        ValueError: If the executed code does not define ``result``.
    """
    t0 = time.perf_counter()
    import math

    import cadquery as cq

    namespace = {"cq": cq, "cadquery": cq, "math": math}
    # SECURITY: exec() runs *code* with full interpreter privileges. Only call
    # this with trusted input or inside an isolated sandbox.
    exec(code, namespace)

    if "result" not in namespace:
        raise ValueError("Code must define a variable named 'result'")

    result = namespace["result"]
    shape = result.val() if hasattr(result, "val") else result

    orig_bb = shape.BoundingBox()
    orig_bbox_mm = [round(orig_bb.xlen, 4), round(orig_bb.ylen, 4), round(orig_bb.zlen, 4)]

    normalized_shape, transform_info = normalize_shape(shape)

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    original_step_path = str(output_path / "ground_truth.step")
    cq.exporters.export(cq.Workplane().add(shape), original_step_path, exportType="STEP")

    normalized_step_path = str(output_path / "ground_truth_normalized.step")
    cq.exporters.export(cq.Workplane().add(normalized_shape), normalized_step_path, exportType="STEP")

    gt = generate_ground_truth(
        normalized_shape,
        output_dir,
        source_step=f"server/tasks/{task_id}/ground_truth.step" if task_id else original_step_path,
    )

    gt["canonical_transform"] = transform_info
    gt["original_bbox_mm"] = orig_bbox_mm

    # generate_ground_truth already wrote this file; rewrite it with the
    # transform metadata included.
    with open(output_path / "ground_truth.json", "w", encoding="utf-8") as f:
        json.dump(gt, f, indent=2)

    elapsed = time.perf_counter() - t0
    logger.info(f"preprocess_from_code took {elapsed:.3f}s total")
    return gt