Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| import time | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional, Tuple | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
def normalize_shape(shape) -> Tuple[Any, Dict[str, Any]]:
    """Center ``shape`` on the origin and reorient it by descending extent.

    The bounding-box centre is translated to the origin, the three
    bounding-box extents are ranked, and the rotation chosen by
    ``_compute_alignment_rotation`` is applied so that the ranking maps onto
    the X, Y, Z axes. After a rotation the shape is re-centred when its
    bounding-box centre drifted by more than 0.01 on any axis.

    Args:
        shape: Object exposing ``BoundingBox()``, ``translate()`` and
            ``wrapped`` (presumably a ``cadquery.Shape`` -- confirm against
            callers).

    Returns:
        ``(normalized_shape, transform_info)``. ``transform_info`` stores the
        initial bounding-box centre (rounded to 4 decimals) under
        ``"translation_to_origin"`` and the pre-rotation axis ranking,
        longest first, under ``"axis_alignment"``. The rotation itself and
        any post-rotation re-centring offset are not recorded.
    """
    started = time.time()
    import cadquery as cq

    # First pass: move the bounding-box centre onto the origin.
    box = shape.BoundingBox()
    cx = (box.xmin + box.xmax) / 2
    cy = (box.ymin + box.ymax) / 2
    cz = (box.zmin + box.zmax) / 2
    shape = shape.translate((-cx, -cy, -cz))

    # Rank the axes by extent, longest first (sorted() is stable, so ties
    # keep their X, Y, Z order).
    box = shape.BoundingBox()
    extents = {"X": box.xlen, "Y": box.ylen, "Z": box.zlen}
    ranked = sorted(extents.items(), key=lambda item: item[1], reverse=True)
    current_order = [ranked[0][0], ranked[1][0], ranked[2][0]]
    target_order = ["X", "Y", "Z"]

    rotation = _compute_alignment_rotation(current_order, target_order)
    if rotation is not None:
        rx, ry, rz = rotation
        if (rx, ry, rz) != (0, 0, 0):
            from OCP.gp import gp_Ax1, gp_Pnt, gp_Dir, gp_Trsf
            from OCP.BRepBuilderAPI import BRepBuilderAPI_Transform

            # Accumulate the per-axis rotations in Z, Y, X multiplication
            # order; zero angles are skipped entirely.
            combined = gp_Trsf()
            for angle, direction in (
                (rz, (0, 0, 1)),
                (ry, (0, 1, 0)),
                (rx, (1, 0, 0)),
            ):
                if angle == 0:
                    continue
                step = gp_Trsf()
                axis = gp_Ax1(gp_Pnt(0, 0, 0), gp_Dir(*direction))
                step.SetRotation(axis, np.radians(angle))
                combined.Multiply(step)

            builder = BRepBuilderAPI_Transform(shape.wrapped, combined, True)
            builder.Build()
            shape = cq.Shape(builder.Shape())

            # The rotated bounding box may no longer be centred; fix it up
            # only when the drift is noticeable.
            box = shape.BoundingBox()
            cx2 = (box.xmin + box.xmax) / 2
            cy2 = (box.ymin + box.ymax) / 2
            cz2 = (box.zmin + box.zmax) / 2
            if any(abs(offset) > 0.01 for offset in (cx2, cy2, cz2)):
                shape = shape.translate((-cx2, -cy2, -cz2))

    transform_info = {
        "units": "mm",
        "translation_to_origin": [round(cx, 4), round(cy, 4), round(cz, 4)],
        "axis_alignment": current_order,
        "longest_axis": "X",
    }
    elapsed = time.time() - started
    logger.info(f"normalize_shape took {elapsed:.3f}s")
    return shape, transform_info
| def _compute_alignment_rotation(current, target): | |
| if current == target: | |
| return None | |
| c = current | |
| if c == ["Y", "X", "Z"]: | |
| return (0, 0, 90) | |
| elif c == ["Z", "Y", "X"]: | |
| return (0, 90, 0) | |
| elif c == ["Z", "X", "Y"]: | |
| return (90, 0, 0) | |
| elif c == ["X", "Z", "Y"]: | |
| return (90, 0, 0) | |
| elif c == ["Y", "Z", "X"]: | |
| return (0, 0, 90) | |
| else: | |
| return (0, 0, 0) | |
def sample_surface_points(shape, n_points: int = 2048) -> np.ndarray:
    """Sample points on the surface of ``shape``, weighted by triangle area.

    The shape is tessellated, every triangle with area above ``1e-12`` is
    collected, and ``n_points`` triangles are drawn with probability
    proportional to area. One point is placed inside each drawn triangle
    from two uniform random numbers, reflected into the triangle when their
    sum exceeds 1. Randomness comes from NumPy's global ``np.random`` state.

    Args:
        shape: Object exposing ``wrapped`` (the underlying OCP shape).
        n_points: Number of points to sample.

    Returns:
        ``float32`` array of shape ``(n_points, 3)``. When the tessellation
        yields no usable triangle a warning is logged and an all-zero array
        of the same shape and dtype is returned.
    """
    t0 = time.time()
    from OCP.BRepMesh import BRepMesh_IncrementalMesh
    from OCP.BRep import BRep_Tool
    from OCP.TopExp import TopExp_Explorer
    from OCP.TopAbs import TopAbs_FACE
    from OCP.TopLoc import TopLoc_Location
    from OCP.TopoDS import TopoDS

    mesh = BRepMesh_IncrementalMesh(shape.wrapped, 0.1, False, 0.1, True)
    # NOTE(review): the OCCT constructor is documented to run the meshing
    # already, so this explicit call looks redundant -- confirm before
    # removing it.
    mesh.Perform()

    triangles = []
    tri_areas = []
    explorer = TopExp_Explorer(shape.wrapped, TopAbs_FACE)
    while explorer.More():
        face = TopoDS.Face_s(explorer.Current())
        loc = TopLoc_Location()
        tri = BRep_Tool.Triangulation_s(face, loc)
        if tri is not None:
            # Node coordinates are stored relative to the face location.
            trsf = loc.Transformation()
            nodes = []
            for i in range(1, tri.NbNodes() + 1):
                p = tri.Node(i)
                p.Transform(trsf)
                nodes.append([p.X(), p.Y(), p.Z()])
            nodes = np.array(nodes)
            for i in range(1, tri.NbTriangles() + 1):
                n1, n2, n3 = tri.Triangle(i).Get()
                # Triangle node indices are 1-based.
                v0 = nodes[n1 - 1]
                v1 = nodes[n2 - 1]
                v2 = nodes[n3 - 1]
                area = 0.5 * np.linalg.norm(np.cross(v1 - v0, v2 - v0))
                if area > 1e-12:  # drop degenerate triangles
                    triangles.append((v0, v1, v2))
                    tri_areas.append(area)
        explorer.Next()

    if not triangles:
        logger.warning("No triangles found for surface sampling")
        # Same dtype as the normal return path so callers see one type.
        return np.zeros((n_points, 3), dtype=np.float32)

    areas = np.asarray(tri_areas)
    probs = areas / areas.sum()
    corners = np.asarray(triangles)  # (n_triangles, 3 corners, xyz)

    chosen = np.random.choice(len(corners), size=n_points, p=probs)

    # Two uniforms per point, drawn in one call.
    uv = np.random.random((n_points, 2))
    outside = uv[:, 0] + uv[:, 1] > 1
    uv[outside] = 1 - uv[outside]  # reflect back into the triangle
    r1 = uv[:, :1]
    r2 = uv[:, 1:]

    picked = corners[chosen]
    points = picked[:, 0] * (1 - r1 - r2) + picked[:, 1] * r1 + picked[:, 2] * r2
    result = points.astype(np.float32)

    elapsed = time.time() - t0
    logger.info(f"sample_surface_points ({n_points} pts) took {elapsed:.3f}s")
    return result
def _occ_to_trimesh(shape):
    """Tessellate ``shape`` and return the result as a ``trimesh.Trimesh``.

    Every face triangulation is appended to one shared vertex/face list;
    face indices are shifted by the number of vertices already collected.
    The mesh is built with ``process=True`` and ``fix_normals()`` is called
    before it is returned.

    Args:
        shape: Object exposing ``wrapped`` (the underlying OCP shape).

    Returns:
        The assembled ``trimesh.Trimesh``.

    Raises:
        ValueError: If no vertices could be extracted from any face.
    """
    import trimesh
    from OCP.BRepMesh import BRepMesh_IncrementalMesh
    from OCP.BRep import BRep_Tool
    from OCP.TopExp import TopExp_Explorer
    from OCP.TopAbs import TopAbs_FACE
    from OCP.TopLoc import TopLoc_Location
    from OCP.TopoDS import TopoDS

    mesher = BRepMesh_IncrementalMesh(shape.wrapped, 0.1, False, 0.1, True)
    mesher.Perform()

    vertices = []
    triangles = []
    face_iter = TopExp_Explorer(shape.wrapped, TopAbs_FACE)
    while face_iter.More():
        location = TopLoc_Location()
        triangulation = BRep_Tool.Triangulation_s(
            TopoDS.Face_s(face_iter.Current()), location
        )
        if triangulation is not None:
            placement = location.Transformation()
            # Indices of this face start after everything collected so far.
            base = len(vertices)

            face_vertices = []
            for node_idx in range(1, triangulation.NbNodes() + 1):
                point = triangulation.Node(node_idx)
                point.Transform(placement)
                face_vertices.append([point.X(), point.Y(), point.Z()])

            for tri_idx in range(1, triangulation.NbTriangles() + 1):
                a, b, c = triangulation.Triangle(tri_idx).Get()
                # OCC node indices are 1-based.
                triangles.append([a - 1 + base, b - 1 + base, c - 1 + base])

            vertices.extend(face_vertices)
        face_iter.Next()

    if not vertices:
        raise ValueError("No mesh triangles extracted from shape")

    result = trimesh.Trimesh(
        vertices=np.array(vertices, dtype=np.float64),
        faces=np.array(triangles, dtype=np.int64),
        process=True,
    )
    result.fix_normals()
    return result
def voxelize(shape, resolution: int = 64) -> np.ndarray:
    """Build a boolean occupancy grid over the shape's own bounding box.

    ``resolution`` evenly spaced sample positions are placed along each axis,
    spanning the bounding box enlarged by 0.01 on every side. Each grid
    point is tested with the ``contains`` method of the mesh returned by
    ``_occ_to_trimesh``.

    Args:
        shape: Object exposing ``BoundingBox()`` and accepted by
            ``_occ_to_trimesh``.
        resolution: Number of samples per axis.

    Returns:
        Array of shape ``(resolution, resolution, resolution)`` indexed
        ``[x, y, z]`` holding the containment result for each grid point.
    """
    started = time.time()
    surface = _occ_to_trimesh(shape)

    box = shape.BoundingBox()
    margin = 0.01
    limits = (
        (box.xmin, box.xmax),
        (box.ymin, box.ymax),
        (box.zmin, box.zmax),
    )
    axes = [np.linspace(lo - margin, hi + margin, resolution) for lo, hi in limits]

    # 'ij' indexing keeps the first grid axis aligned with X.
    gx, gy, gz = np.meshgrid(axes[0], axes[1], axes[2], indexing='ij')
    samples = np.stack((gx, gy, gz), axis=-1).reshape(-1, 3)

    occupancy = surface.contains(samples).reshape(resolution, resolution, resolution)

    elapsed = time.time() - started
    logger.info(f"voxelize ({resolution}^3, trimesh+embree) took {elapsed:.3f}s, fill={occupancy.sum()}")
    return occupancy
def voxelize_in_bbox(shape, bbox_min, bbox_max, resolution: int = 64) -> np.ndarray:
    """Build a boolean occupancy grid over a caller-supplied bounding box.

    Works like ``voxelize`` but samples the box given by ``bbox_min`` and
    ``bbox_max`` (enlarged by 0.01 on every side) instead of the shape's own
    bounding box.

    Args:
        shape: Object accepted by ``_occ_to_trimesh``.
        bbox_min: Indexable with the lower x, y, z bounds at 0, 1, 2.
        bbox_max: Indexable with the upper x, y, z bounds at 0, 1, 2.
        resolution: Number of samples per axis.

    Returns:
        Array of shape ``(resolution, resolution, resolution)`` indexed
        ``[x, y, z]`` holding the containment result for each grid point.
    """
    started = time.time()
    surface = _occ_to_trimesh(shape)

    margin = 0.01
    axes = [
        np.linspace(bbox_min[axis] - margin, bbox_max[axis] + margin, resolution)
        for axis in (0, 1, 2)
    ]

    # 'ij' indexing keeps the first grid axis aligned with X.
    gx, gy, gz = np.meshgrid(axes[0], axes[1], axes[2], indexing='ij')
    samples = np.stack((gx, gy, gz), axis=-1).reshape(-1, 3)

    occupancy = surface.contains(samples).reshape(resolution, resolution, resolution)

    elapsed = time.time() - started
    logger.info(f"voxelize_in_bbox ({resolution}^3) took {elapsed:.3f}s, fill={occupancy.sum()}/{resolution**3}")
    return occupancy
def generate_ground_truth(
    shape,
    output_dir: str,
    source_step: Optional[str] = None,
) -> Dict[str, Any]:
    """Write the ground-truth artefacts for ``shape`` into ``output_dir``.

    Creates ``output_dir`` if needed and writes three files there:
    ``surface_points.npy`` (2048 sampled surface points),
    ``voxels_64.npy`` (a 64^3 occupancy grid) and ``ground_truth.json``
    (the dictionary that is also returned).

    Args:
        shape: Shape passed on to ``extract_properties``,
            ``sample_surface_points`` and ``voxelize``; must also expose
            ``BoundingBox()``.
        output_dir: Destination directory.
        source_step: Value stored verbatim under ``"source_step"``.

    Returns:
        The ground-truth dictionary written to ``ground_truth.json``.
    """
    started = time.time()

    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    from .geometry import extract_properties

    properties = extract_properties(shape)

    surface_samples = sample_surface_points(shape, n_points=2048)
    np.save(str(target_dir / "surface_points.npy"), surface_samples)

    occupancy = voxelize(shape, resolution=64)
    np.save(str(target_dir / "voxels_64.npy"), occupancy)

    box = shape.BoundingBox()
    bbox_extents = [round(extent, 4) for extent in (box.xlen, box.ylen, box.zlen)]

    ground_truth = {
        "source_step": source_step,
        "volume_mm3": properties["volume_mm3"],
        "surface_area_mm2": properties["surface_area_mm2"],
        "bbox_mm": bbox_extents,
        "face_count": properties["face_count"],
        "face_types": list(properties["face_type_counts"].keys()),
        "dominant_face_type": properties["dominant_face_type"],
        "euler_characteristic": properties["euler_characteristic"],
        "surface_points_file": "surface_points.npy",
        "voxels_file": "voxels_64.npy",
    }

    with (target_dir / "ground_truth.json").open("w") as handle:
        json.dump(ground_truth, handle, indent=2)

    elapsed = time.time() - started
    logger.info(f"generate_ground_truth took {elapsed:.3f}s")
    return ground_truth
def preprocess_from_code(
    code: str,
    output_dir: str,
    task_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Execute CadQuery source and generate ground-truth files from it.

    ``code`` is executed with ``cq``, ``cadquery`` and ``math`` predefined
    and must assign the final model to a variable named ``result``. The
    shape is exported as ``ground_truth.step``, normalized with
    ``normalize_shape``, exported again as ``ground_truth_normalized.step``
    and passed to ``generate_ground_truth``. ``ground_truth.json`` is then
    rewritten with the normalization metadata and the original bounding
    box added.

    Args:
        code: Python source that defines ``result``.
        output_dir: Directory receiving all generated files; created if
            missing.
        task_id: When given, ``"source_step"`` is recorded as
            ``server/tasks/<task_id>/ground_truth.step``; otherwise the path
            of the exported original STEP file is recorded.

    Returns:
        The ground-truth dictionary, including ``"canonical_transform"`` and
        ``"original_bbox_mm"``.

    Raises:
        ValueError: If ``code`` does not define ``result``.
    """
    t0 = time.time()
    import math

    import cadquery as cq

    # SECURITY: exec() runs ``code`` with the full privileges of this
    # process. Only call this with trusted input, or run it in a sandbox.
    namespace = {"cq": cq, "cadquery": cq, "math": math}
    exec(code, namespace)

    if "result" not in namespace:
        raise ValueError("Code must define a variable named 'result'")

    result = namespace["result"]
    # Objects with a ``val`` method (e.g. a Workplane) are unwrapped to the
    # underlying shape; anything else is used as-is.
    shape = result.val() if hasattr(result, "val") else result

    orig_bb = shape.BoundingBox()
    orig_bbox_mm = [round(orig_bb.xlen, 4), round(orig_bb.ylen, 4), round(orig_bb.zlen, 4)]

    normalized_shape, transform_info = normalize_shape(shape)

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    original_step_path = str(output_path / "ground_truth.step")
    cq.exporters.export(cq.Workplane().add(shape), original_step_path, exportType="STEP")

    normalized_step_path = str(output_path / "ground_truth_normalized.step")
    cq.exporters.export(cq.Workplane().add(normalized_shape), normalized_step_path, exportType="STEP")

    gt = generate_ground_truth(
        normalized_shape,
        output_dir,
        source_step=f"server/tasks/{task_id}/ground_truth.step" if task_id else original_step_path,
    )
    gt["canonical_transform"] = transform_info
    gt["original_bbox_mm"] = orig_bbox_mm

    # generate_ground_truth already wrote this file; rewrite it so the two
    # extra keys are persisted as well.
    with open(output_path / "ground_truth.json", "w", encoding="utf-8") as f:
        json.dump(gt, f, indent=2)

    elapsed = time.time() - t0
    logger.info(f"preprocess_from_code took {elapsed:.3f}s total")
    return gt