File size: 7,145 Bytes
9304064
 
1db1a69
9304064
 
 
 
 
 
b70c3c9
1db1a69
 
 
9304064
83f3ff9
 
9304064
 
 
1db1a69
9304064
1db1a69
 
 
 
 
 
9304064
1db1a69
b70c3c9
 
83f3ff9
7ac569f
 
 
b70c3c9
7ac569f
 
b70c3c9
 
83f3ff9
 
b70c3c9
83f3ff9
b70c3c9
7ac569f
9304064
1db1a69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cdccbe3
1db1a69
 
 
cdccbe3
1db1a69
 
 
 
 
 
 
 
 
 
 
 
 
83f3ff9
1db1a69
 
 
 
 
83f3ff9
 
1db1a69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""CrewAI tools for CadQuery code execution and CNC validation.

Uses BaseTool subclasses with Pydantic args_schema for structured input.
"""

from __future__ import annotations

import json
import logging
from contextvars import ContextVar
from typing import Type

from pydantic import BaseModel, Field

from agents.design_state import DesignState

logger = logging.getLogger(__name__)

try:
    from crewai.tools import BaseTool
except ImportError:
    class BaseTool:  # type: ignore[no-redef]
        name: str = ""
        description: str = ""
        args_schema: type | None = None
        def _run(self, **kwargs) -> str:
            return ""

# ── Per-request state (ContextVar — async-safe) ─────────────────────────

_last_shape_var: ContextVar[object | None] = ContextVar("last_shape", default=None)
_design_state_var: ContextVar[DesignState | None] = ContextVar("design_state", default=None)


def set_last_shape(shape):
    _last_shape_var.set(shape)

def get_last_shape():
    return _last_shape_var.get()

def set_design_state(state: DesignState):
    _design_state_var.set(state)

def get_design_state() -> DesignState | None:
    return _design_state_var.get()


# ── Tool input schemas ──────────────────────────────────────────────────

class ExecuteCadInput(BaseModel):
    code: str = Field(..., description="CadQuery Python code. Must assign result to `result` as cq.Workplane. Import cadquery as cq.")

class ValidateCadInput(BaseModel):
    check_type: str = Field(default="full", description="Validation type: 'full' for complete CNC manufacturability check.")

class GenerateGcodeInput(BaseModel):
    operations: list[str] = Field(..., description="Ordered list of operations: adaptive, pocket, profile, face, drill, surface, waterline")
    tool_diameter: float = Field(default=6.0, description="Endmill diameter in mm")
    post_processor: str = Field(default="grbl", description="G-code format: grbl, linuxcnc, fanuc")

VALID_CHECKS = {"all", "material", "dimensions", "features", "constraints", "axis"}

class QueryDesignStateInput(BaseModel):
    check: str = Field(default="all", description="What to check: 'all' for full state, or a specific field (material, dimensions, features, constraints, axis).")


# ── Tool implementations ────────────────────────────────────────────────

class ExecuteCadTool(BaseTool):
    name: str = "Execute CadQuery Code"
    description: str = "Execute CadQuery Python code and return geometry info: volume, bounding box, face count, edge count."
    args_schema: Type[BaseModel] = ExecuteCadInput

    def _run(self, code: str) -> str:
        from core.executor import execute_cadquery
        result = execute_cadquery(code)
        if result.success and result.result is not None:
            set_last_shape(result.result)
        return json.dumps(result.model_dump(by_alias=True), indent=2)


class ValidateCadTool(BaseTool):
    name: str = "Validate CNC Manufacturability"
    description: str = "Run CNC manufacturability checks on the last executed shape. Returns machinable status, axis recommendation, and issues list."
    args_schema: Type[BaseModel] = ValidateCadInput

    def _run(self, check_type: str = "full") -> str:
        from core.validator import validate_for_cnc
        shape = get_last_shape()
        if shape is None:
            return json.dumps({"success": False, "error": "No shape available. Run Execute CadQuery Code first."})
        validation = validate_for_cnc(shape)
        return json.dumps({"success": True, "validation": validation.model_dump()}, indent=2)


class GenerateGcodeTool(BaseTool):
    name: str = "Generate G-code Toolpath"
    description: str = "Generate CNC G-code toolpath from the last executed CadQuery shape."
    args_schema: Type[BaseModel] = GenerateGcodeInput

    def _run(self, operations: list[str], tool_diameter: float = 6.0, post_processor: str = "grbl") -> str:
        from core.cam import generate_gcode, ToolConfig
        shape = get_last_shape()
        if shape is None:
            return json.dumps({"success": False, "error": "No shape available. Run Execute CadQuery Code first."})
        tool_config = ToolConfig(diameter=tool_diameter, h_feed=800, v_feed=200, speed=18000)
        result = generate_gcode(
            shape=shape, operations=operations,
            tool_config=tool_config, post_processor=post_processor,
        )
        return json.dumps(result.model_dump(), indent=2)


class QueryDesignStateTool(BaseTool):
    name: str = "Query Design State"
    description: str = "Query the orchestrator for current design state and readiness. Call BEFORE saying NOT READY to check what information is already available."
    args_schema: Type[BaseModel] = QueryDesignStateInput

    def _run(self, check: str = "all") -> str:
        from agents.design_state import compute_score
        from config.settings import settings

        if check not in VALID_CHECKS:
            return json.dumps({"error": f"Invalid check: {check!r}. Valid: {sorted(VALID_CHECKS)}"})

        state = get_design_state()
        if state is None:
            return json.dumps({"error": "No design state available."})
        score = compute_score(state)
        threshold = settings.planning.threshold

        known = {}
        missing = []

        if state.part_name:
            known["part_name"] = state.part_name
        else:
            missing.append("part_name")
        if state.material:
            known["material"] = state.material
        else:
            missing.append("material")
        if state.dimensions:
            known["dimensions"] = state.dimensions
        else:
            missing.append("dimensions")
        if state.features:
            known["features"] = state.features
        else:
            missing.append("features")
        if state.constraints:
            known["constraints"] = state.constraints
        else:
            missing.append("constraints")
        if state.axis_recommendation:
            known["axis_recommendation"] = state.axis_recommendation
        else:
            missing.append("axis_recommendation")
        if state.description:
            known["description"] = state.description
        if state.decisions:
            known["recent_decisions"] = state.decisions[-5:]

        result = {
            "known": known,
            "missing": missing,
            "readiness_score": score,
            "threshold": threshold,
            "ready": score >= threshold,
            "phase": state.phase,
        }

        if check != "all" and check in known:
            return json.dumps({"field": check, "value": known[check], "ready": score >= threshold})
        if check != "all" and check in missing:
            return json.dumps({"field": check, "value": None, "missing": True, "ready": score >= threshold})

        return json.dumps(result, indent=2)