Spaces:
Sleeping
Sleeping
File size: 7,145 Bytes
9304064 1db1a69 9304064 b70c3c9 1db1a69 9304064 83f3ff9 9304064 1db1a69 9304064 1db1a69 9304064 1db1a69 b70c3c9 83f3ff9 7ac569f b70c3c9 7ac569f b70c3c9 83f3ff9 b70c3c9 83f3ff9 b70c3c9 7ac569f 9304064 1db1a69 cdccbe3 1db1a69 cdccbe3 1db1a69 83f3ff9 1db1a69 83f3ff9 1db1a69 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | """CrewAI tools for CadQuery code execution and CNC validation.
Uses BaseTool subclasses with Pydantic args_schema for structured input.
"""
from __future__ import annotations
import json
import logging
from contextvars import ContextVar
from typing import Type
from pydantic import BaseModel, Field
from agents.design_state import DesignState
logger = logging.getLogger(__name__)
try:
from crewai.tools import BaseTool
except ImportError:
class BaseTool: # type: ignore[no-redef]
name: str = ""
description: str = ""
args_schema: type | None = None
def _run(self, **kwargs) -> str:
return ""
# ── Per-request state (ContextVar — async-safe) ─────────────────────────
_last_shape_var: ContextVar[object | None] = ContextVar("last_shape", default=None)
_design_state_var: ContextVar[DesignState | None] = ContextVar("design_state", default=None)
def set_last_shape(shape):
_last_shape_var.set(shape)
def get_last_shape():
return _last_shape_var.get()
def set_design_state(state: DesignState):
_design_state_var.set(state)
def get_design_state() -> DesignState | None:
return _design_state_var.get()
# ── Tool input schemas ──────────────────────────────────────────────────
class ExecuteCadInput(BaseModel):
code: str = Field(..., description="CadQuery Python code. Must assign result to `result` as cq.Workplane. Import cadquery as cq.")
class ValidateCadInput(BaseModel):
check_type: str = Field(default="full", description="Validation type: 'full' for complete CNC manufacturability check.")
class GenerateGcodeInput(BaseModel):
operations: list[str] = Field(..., description="Ordered list of operations: adaptive, pocket, profile, face, drill, surface, waterline")
tool_diameter: float = Field(default=6.0, description="Endmill diameter in mm")
post_processor: str = Field(default="grbl", description="G-code format: grbl, linuxcnc, fanuc")
VALID_CHECKS = {"all", "material", "dimensions", "features", "constraints", "axis"}
class QueryDesignStateInput(BaseModel):
check: str = Field(default="all", description="What to check: 'all' for full state, or a specific field (material, dimensions, features, constraints, axis).")
# ── Tool implementations ────────────────────────────────────────────────
class ExecuteCadTool(BaseTool):
name: str = "Execute CadQuery Code"
description: str = "Execute CadQuery Python code and return geometry info: volume, bounding box, face count, edge count."
args_schema: Type[BaseModel] = ExecuteCadInput
def _run(self, code: str) -> str:
from core.executor import execute_cadquery
result = execute_cadquery(code)
if result.success and result.result is not None:
set_last_shape(result.result)
return json.dumps(result.model_dump(by_alias=True), indent=2)
class ValidateCadTool(BaseTool):
name: str = "Validate CNC Manufacturability"
description: str = "Run CNC manufacturability checks on the last executed shape. Returns machinable status, axis recommendation, and issues list."
args_schema: Type[BaseModel] = ValidateCadInput
def _run(self, check_type: str = "full") -> str:
from core.validator import validate_for_cnc
shape = get_last_shape()
if shape is None:
return json.dumps({"success": False, "error": "No shape available. Run Execute CadQuery Code first."})
validation = validate_for_cnc(shape)
return json.dumps({"success": True, "validation": validation.model_dump()}, indent=2)
class GenerateGcodeTool(BaseTool):
name: str = "Generate G-code Toolpath"
description: str = "Generate CNC G-code toolpath from the last executed CadQuery shape."
args_schema: Type[BaseModel] = GenerateGcodeInput
def _run(self, operations: list[str], tool_diameter: float = 6.0, post_processor: str = "grbl") -> str:
from core.cam import generate_gcode, ToolConfig
shape = get_last_shape()
if shape is None:
return json.dumps({"success": False, "error": "No shape available. Run Execute CadQuery Code first."})
tool_config = ToolConfig(diameter=tool_diameter, h_feed=800, v_feed=200, speed=18000)
result = generate_gcode(
shape=shape, operations=operations,
tool_config=tool_config, post_processor=post_processor,
)
return json.dumps(result.model_dump(), indent=2)
class QueryDesignStateTool(BaseTool):
name: str = "Query Design State"
description: str = "Query the orchestrator for current design state and readiness. Call BEFORE saying NOT READY to check what information is already available."
args_schema: Type[BaseModel] = QueryDesignStateInput
def _run(self, check: str = "all") -> str:
from agents.design_state import compute_score
from config.settings import settings
if check not in VALID_CHECKS:
return json.dumps({"error": f"Invalid check: {check!r}. Valid: {sorted(VALID_CHECKS)}"})
state = get_design_state()
if state is None:
return json.dumps({"error": "No design state available."})
score = compute_score(state)
threshold = settings.planning.threshold
known = {}
missing = []
if state.part_name:
known["part_name"] = state.part_name
else:
missing.append("part_name")
if state.material:
known["material"] = state.material
else:
missing.append("material")
if state.dimensions:
known["dimensions"] = state.dimensions
else:
missing.append("dimensions")
if state.features:
known["features"] = state.features
else:
missing.append("features")
if state.constraints:
known["constraints"] = state.constraints
else:
missing.append("constraints")
if state.axis_recommendation:
known["axis_recommendation"] = state.axis_recommendation
else:
missing.append("axis_recommendation")
if state.description:
known["description"] = state.description
if state.decisions:
known["recent_decisions"] = state.decisions[-5:]
result = {
"known": known,
"missing": missing,
"readiness_score": score,
"threshold": threshold,
"ready": score >= threshold,
"phase": state.phase,
}
if check != "all" and check in known:
return json.dumps({"field": check, "value": known[check], "ready": score >= threshold})
if check != "all" and check in missing:
return json.dumps({"field": check, "value": None, "missing": True, "ready": score >= threshold})
return json.dumps(result, indent=2)
|