Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional, Tuple | |
| logger = logging.getLogger(__name__) | |
| RUNNER_SCRIPT = ''' | |
| import sys | |
| import json | |
| import traceback | |
| try: | |
| import cadquery as cq | |
| code = sys.stdin.read() | |
| local_ns = {"cq": cq, "cadquery": cq} | |
| try: | |
| import math | |
| local_ns["math"] = math | |
| except Exception: | |
| pass | |
| exec(code, local_ns) | |
| if "result" not in local_ns: | |
| print(json.dumps({"success": False, "error": "No variable named 'result' was defined"})) | |
| sys.exit(0) | |
| result = local_ns["result"] | |
| if hasattr(result, "val"): | |
| shape = result.val() | |
| else: | |
| shape = result | |
| bb = shape.BoundingBox() | |
| volume = shape.Volume() | |
| is_valid = shape.isValid() | |
| faces = shape.Faces() | |
| face_types = [f.geomType() for f in faces] | |
| from collections import Counter | |
| type_counts = dict(Counter(face_types)) | |
| total_faces = len(face_types) | |
| dominant = max(type_counts, key=type_counts.get) if type_counts else "UNKNOWN" | |
| edges = shape.Edges() | |
| vertices = shape.Vertices() | |
| dims = [bb.xlen, bb.ylen, bb.zlen] | |
| sorted_axes = sorted(zip(["X", "Y", "Z"], dims), key=lambda x: x[1], reverse=True) | |
| longest_axis = sorted_axes[0][0] | |
| max_dim = max(dims) if max(dims) > 0 else 1.0 | |
| shells = shape.Shells() | |
| is_watertight = True | |
| if not shells: | |
| is_watertight = False | |
| else: | |
| for s in shells: | |
| if not s.Closed(): | |
| is_watertight = False | |
| break | |
| V = len(vertices) | |
| E = len(edges) | |
| F = total_faces | |
| output = { | |
| "success": True, | |
| "properties": { | |
| "is_valid": is_valid, | |
| "is_watertight": is_watertight, | |
| "volume_mm3": round(volume, 4), | |
| "surface_area_mm2": round(shape.Area(), 4), | |
| "bbox_x_mm": round(bb.xlen, 4), | |
| "bbox_y_mm": round(bb.ylen, 4), | |
| "bbox_z_mm": round(bb.zlen, 4), | |
| "bbox_longest_axis": longest_axis, | |
| "bbox_ratio_yx": round(sorted_axes[1][1] / max_dim, 4), | |
| "bbox_ratio_zx": round(sorted_axes[2][1] / max_dim, 4), | |
| "face_count": total_faces, | |
| "face_type_counts": type_counts, | |
| "dominant_face_type": dominant, | |
| "face_type_distribution": {k: round(v / total_faces, 4) for k, v in type_counts.items()} if total_faces > 0 else {}, | |
| "edge_count": E, | |
| "vertex_count": V, | |
| "euler_characteristic": V - E + F, | |
| } | |
| } | |
| print(json.dumps(output)) | |
| except Exception as e: | |
| tb = traceback.format_exc() | |
| print(json.dumps({"success": False, "error": str(e), "traceback": tb})) | |
| ''' | |
| def execute_cadquery_code( | |
| code: str, | |
| timeout: float = 10.0, | |
| python_path: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| t0 = time.time() | |
| if python_path is None: | |
| python_path = sys.executable | |
| logger.info(f"Executing CadQuery code ({len(code)} chars, timeout={timeout}s)") | |
| try: | |
| proc = subprocess.run( | |
| [python_path, "-c", RUNNER_SCRIPT], | |
| input=code, | |
| capture_output=True, | |
| text=True, | |
| timeout=timeout, | |
| ) | |
| elapsed = time.time() - t0 | |
| logger.info(f"Subprocess completed in {elapsed:.3f}s, returncode={proc.returncode}") | |
| stdout = proc.stdout.strip() | |
| stderr = proc.stderr.strip() | |
| if proc.returncode != 0 and not stdout: | |
| return { | |
| "success": False, | |
| "error": stderr or f"Process exited with code {proc.returncode}", | |
| "properties": None, | |
| } | |
| if not stdout: | |
| return { | |
| "success": False, | |
| "error": "No output from subprocess", | |
| "properties": None, | |
| } | |
| result = json.loads(stdout) | |
| if not result.get("success", False): | |
| return { | |
| "success": False, | |
| "error": result.get("error", "Unknown error"), | |
| "properties": None, | |
| } | |
| return { | |
| "success": True, | |
| "error": None, | |
| "properties": result["properties"], | |
| } | |
| except subprocess.TimeoutExpired: | |
| elapsed = time.time() - t0 | |
| logger.warning(f"CadQuery execution timed out after {elapsed:.3f}s") | |
| return { | |
| "success": False, | |
| "error": f"Code execution timed out after {timeout} seconds", | |
| "properties": None, | |
| } | |
| except json.JSONDecodeError as e: | |
| elapsed = time.time() - t0 | |
| logger.error(f"Failed to parse subprocess output after {elapsed:.3f}s: {e}") | |
| return { | |
| "success": False, | |
| "error": f"Failed to parse execution output: {e}", | |
| "properties": None, | |
| } | |
| except Exception as e: | |
| elapsed = time.time() - t0 | |
| logger.error(f"execute_cadquery_code failed after {elapsed:.3f}s: {e}") | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "properties": None, | |
| } | |