Spaces:
Sleeping
Sleeping
| """CrewAI tools for CadQuery code execution and CNC validation. | |
| Uses BaseTool subclasses with Pydantic args_schema for structured input. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from contextvars import ContextVar | |
| from typing import Type | |
| from pydantic import BaseModel, Field | |
| from agents.design_state import DesignState | |
| logger = logging.getLogger(__name__) | |
| try: | |
| from crewai.tools import BaseTool | |
| except ImportError: | |
| class BaseTool: # type: ignore[no-redef] | |
| name: str = "" | |
| description: str = "" | |
| args_schema: type | None = None | |
| def _run(self, **kwargs) -> str: | |
| return "" | |
| # ── Per-request state (ContextVar — async-safe) ───────────────────────── | |
| _last_shape_var: ContextVar[object | None] = ContextVar("last_shape", default=None) | |
| _design_state_var: ContextVar[DesignState | None] = ContextVar("design_state", default=None) | |
| def set_last_shape(shape): | |
| _last_shape_var.set(shape) | |
| def get_last_shape(): | |
| return _last_shape_var.get() | |
| def set_design_state(state: DesignState): | |
| _design_state_var.set(state) | |
| def get_design_state() -> DesignState | None: | |
| return _design_state_var.get() | |
| # ── Tool input schemas ────────────────────────────────────────────────── | |
| class ExecuteCadInput(BaseModel): | |
| code: str = Field(..., description="CadQuery Python code. Must assign result to `result` as cq.Workplane. Import cadquery as cq.") | |
| class ValidateCadInput(BaseModel): | |
| check_type: str = Field(default="full", description="Validation type: 'full' for complete CNC manufacturability check.") | |
| class GenerateGcodeInput(BaseModel): | |
| operations: list[str] = Field(..., description="Ordered list of operations: adaptive, pocket, profile, face, drill, surface, waterline") | |
| tool_diameter: float = Field(default=6.0, description="Endmill diameter in mm") | |
| post_processor: str = Field(default="grbl", description="G-code format: grbl, linuxcnc, fanuc") | |
| VALID_CHECKS = {"all", "material", "dimensions", "features", "constraints", "axis"} | |
| class QueryDesignStateInput(BaseModel): | |
| check: str = Field(default="all", description="What to check: 'all' for full state, or a specific field (material, dimensions, features, constraints, axis).") | |
| # ── Tool implementations ──────────────────────────────────────────────── | |
| class ExecuteCadTool(BaseTool): | |
| name: str = "Execute CadQuery Code" | |
| description: str = "Execute CadQuery Python code and return geometry info: volume, bounding box, face count, edge count." | |
| args_schema: Type[BaseModel] = ExecuteCadInput | |
| def _run(self, code: str) -> str: | |
| from core.executor import execute_cadquery | |
| result = execute_cadquery(code) | |
| if result.success and result.result is not None: | |
| set_last_shape(result.result) | |
| return json.dumps(result.model_dump(by_alias=True), indent=2) | |
| class ValidateCadTool(BaseTool): | |
| name: str = "Validate CNC Manufacturability" | |
| description: str = "Run CNC manufacturability checks on the last executed shape. Returns machinable status, axis recommendation, and issues list." | |
| args_schema: Type[BaseModel] = ValidateCadInput | |
| def _run(self, check_type: str = "full") -> str: | |
| from core.validator import validate_for_cnc | |
| shape = get_last_shape() | |
| if shape is None: | |
| return json.dumps({"success": False, "error": "No shape available. Run Execute CadQuery Code first."}) | |
| validation = validate_for_cnc(shape) | |
| return json.dumps({"success": True, "validation": validation.model_dump()}, indent=2) | |
| class GenerateGcodeTool(BaseTool): | |
| name: str = "Generate G-code Toolpath" | |
| description: str = "Generate CNC G-code toolpath from the last executed CadQuery shape." | |
| args_schema: Type[BaseModel] = GenerateGcodeInput | |
| def _run(self, operations: list[str], tool_diameter: float = 6.0, post_processor: str = "grbl") -> str: | |
| from core.cam import generate_gcode, ToolConfig | |
| shape = get_last_shape() | |
| if shape is None: | |
| return json.dumps({"success": False, "error": "No shape available. Run Execute CadQuery Code first."}) | |
| tool_config = ToolConfig(diameter=tool_diameter, h_feed=800, v_feed=200, speed=18000) | |
| result = generate_gcode( | |
| shape=shape, operations=operations, | |
| tool_config=tool_config, post_processor=post_processor, | |
| ) | |
| return json.dumps(result.model_dump(), indent=2) | |
| class QueryDesignStateTool(BaseTool): | |
| name: str = "Query Design State" | |
| description: str = "Query the orchestrator for current design state and readiness. Call BEFORE saying NOT READY to check what information is already available." | |
| args_schema: Type[BaseModel] = QueryDesignStateInput | |
| def _run(self, check: str = "all") -> str: | |
| from agents.design_state import compute_score | |
| from config.settings import settings | |
| if check not in VALID_CHECKS: | |
| return json.dumps({"error": f"Invalid check: {check!r}. Valid: {sorted(VALID_CHECKS)}"}) | |
| state = get_design_state() | |
| if state is None: | |
| return json.dumps({"error": "No design state available."}) | |
| score = compute_score(state) | |
| threshold = settings.planning.threshold | |
| known = {} | |
| missing = [] | |
| if state.part_name: | |
| known["part_name"] = state.part_name | |
| else: | |
| missing.append("part_name") | |
| if state.material: | |
| known["material"] = state.material | |
| else: | |
| missing.append("material") | |
| if state.dimensions: | |
| known["dimensions"] = state.dimensions | |
| else: | |
| missing.append("dimensions") | |
| if state.features: | |
| known["features"] = state.features | |
| else: | |
| missing.append("features") | |
| if state.constraints: | |
| known["constraints"] = state.constraints | |
| else: | |
| missing.append("constraints") | |
| if state.axis_recommendation: | |
| known["axis_recommendation"] = state.axis_recommendation | |
| else: | |
| missing.append("axis_recommendation") | |
| if state.description: | |
| known["description"] = state.description | |
| if state.decisions: | |
| known["recent_decisions"] = state.decisions[-5:] | |
| result = { | |
| "known": known, | |
| "missing": missing, | |
| "readiness_score": score, | |
| "threshold": threshold, | |
| "ready": score >= threshold, | |
| "phase": state.phase, | |
| } | |
| if check != "all" and check in known: | |
| return json.dumps({"field": check, "value": known[check], "ready": score >= threshold}) | |
| if check != "all" and check in missing: | |
| return json.dumps({"field": check, "value": None, "missing": True, "ready": score >= threshold}) | |
| return json.dumps(result, indent=2) | |