"""
LLM backend implementations for CadQuery code generation.

Supports multiple backends:
- Anthropic Claude
- OpenAI GPT-4o
- Google Gemini (free tier available)
- Mock (dynamic generation, no API key required)
- NeuralCAD (local neural pipeline, not yet implemented)
"""

import base64
import mimetypes
import os
import re
from pathlib import Path

from core.types import LLMBackend


def _load_image(image_path: str | Path) -> tuple[str, bytes]:
    """Return ``(media_type, raw_bytes)`` for the image at *image_path*.

    The media type is guessed from the file name and falls back to
    ``image/png`` when it cannot be determined.
    """
    path = Path(image_path)
    media_type = mimetypes.guess_type(str(path))[0] or "image/png"
    return media_type, path.read_bytes()


# ── LLM Backends ──────────────────────────────────────────────────────────


class AnthropicBackend(LLMBackend):
    """Generate CadQuery code using Anthropic Claude."""

    def __init__(self, model: str | None = None, api_key: str | None = None):
        """Create the Anthropic client.

        Args:
            model: Model name; defaults to the configured "anthropic" model.
            api_key: API key; falls back to the project settings and then to
                the ``ANTHROPIC_API_KEY`` environment variable.
        """
        # Imported lazily so the SDK is only required when this backend is used.
        import anthropic

        from config.settings import settings

        self.model = model or settings.model_for.get(
            "anthropic", "claude-sonnet-4-20250514"
        )
        key = (
            api_key
            or settings.anthropic_api_key
            or os.environ.get("ANTHROPIC_API_KEY")
        )
        self.client = anthropic.Anthropic(api_key=key)

    def _complete(self, system_msg, user_messages: list[dict]) -> str:
        """Send one Messages API request and return the first block's text."""
        from config.settings import settings

        response = self.client.messages.create(
            model=self.model,
            max_tokens=settings.max_tokens,
            system=system_msg,
            messages=user_messages,
        )
        return response.content[0].text

    def generate(self, messages: list[dict]) -> str:
        """Return the model's reply to *messages* (dicts with role/content)."""
        system_msg, user_messages = self.split_system_message(messages)
        return self._complete(system_msg, user_messages)

    def generate_with_image(self, messages: list[dict], image_path: str | Path) -> str:
        """Return the model's reply with an image attached to the last message.

        The caller's message dicts are left untouched: the multimodal content
        is written to shallow copies.
        """
        media_type, raw = _load_image(image_path)
        image_data = base64.b64encode(raw).decode("utf-8")

        system_msg, user_messages = self.split_system_message(messages)
        # Copy before patching so the caller's conversation history is not
        # silently rewritten with a base64 payload.
        user_messages = [dict(m) for m in user_messages]

        # Replace last user message content with multimodal blocks
        last_user = user_messages[-1]
        last_user["content"] = [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": image_data,
                },
            },
            {"type": "text", "text": last_user["content"]},
        ]
        return self._complete(system_msg, user_messages)


class OpenAIBackend(LLMBackend):
    """Generate CadQuery code using OpenAI GPT-4o."""

    def __init__(self, model: str | None = None, api_key: str | None = None):
        """Create the OpenAI client.

        Args:
            model: Model name; defaults to the configured "openai" model.
            api_key: API key; falls back to the project settings and then to
                the ``OPENAI_API_KEY`` environment variable.
        """
        import openai

        from config.settings import settings

        self.model = model or settings.model_for.get("openai", "gpt-4o")
        key = api_key or settings.openai_api_key or os.environ.get("OPENAI_API_KEY")
        self.client = openai.OpenAI(api_key=key)

    def _complete(self, messages: list[dict]) -> str:
        """Send one chat-completions request and return the reply text."""
        from config.settings import settings

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=settings.max_tokens,
            temperature=settings.temperature,
        )
        # ``content`` is nullable in the API; honour the ``-> str`` contract.
        return response.choices[0].message.content or ""

    def generate(self, messages: list[dict]) -> str:
        """Return the model's reply to *messages* (dicts with role/content)."""
        return self._complete(messages)

    def generate_with_image(self, messages: list[dict], image_path: str | Path) -> str:
        """Return the model's reply with an image attached to the last message."""
        media_type, raw = _load_image(image_path)
        image_data = base64.b64encode(raw).decode("utf-8")
        data_url = f"data:{media_type};base64,{image_data}"

        # Copy messages, replace last user message with multimodal content
        patched = [dict(m) for m in messages]
        last_user = patched[-1]
        last_user["content"] = [
            {"type": "image_url", "image_url": {"url": data_url}},
            {"type": "text", "text": last_user["content"]},
        ]
        return self._complete(patched)


class GeminiBackend(LLMBackend):
    """Generate CadQuery code using Google Gemini (free tier available)."""

    # Chat role → Gemini role; messages with any other role are skipped.
    _ROLE_MAP = {"user": "user", "assistant": "model"}

    def __init__(self, model: str | None = None, api_key: str | None = None):
        """Create the Gemini client.

        Args:
            model: Model name; defaults to the configured "gemini" model.
            api_key: API key; falls back to the project settings, then to the
                ``GEMINI_API_KEY`` and ``GOOGLE_API_KEY`` environment variables.
        """
        from google import genai

        from config.settings import settings

        self.model = model or settings.model_for.get("gemini", "gemini-2.5-flash")
        key = (
            api_key
            or settings.google_api_key
            or os.environ.get("GEMINI_API_KEY")
            or os.environ.get("GOOGLE_API_KEY")
        )
        self.client = genai.Client(api_key=key)

    @classmethod
    def _to_contents(cls, messages: list[dict]) -> list[dict]:
        """Convert role/content chat messages into Gemini ``contents`` dicts."""
        return [
            {"role": cls._ROLE_MAP[m["role"]], "parts": [{"text": m["content"]}]}
            for m in messages
            if m["role"] in cls._ROLE_MAP
        ]

    def _complete(self, system_msg, contents: list[dict]) -> str:
        """Send one ``generate_content`` request and return the reply text."""
        from config.settings import settings
        from google.genai import types

        response = self.client.models.generate_content(
            model=self.model,
            contents=contents,
            config=types.GenerateContentConfig(
                system_instruction=system_msg,
                max_output_tokens=settings.max_tokens,
                temperature=settings.temperature,
            ),
        )
        # ``response.text`` may be None when no text part was produced.
        return response.text or ""

    def generate(self, messages: list[dict]) -> str:
        """Return the model's reply to *messages* (dicts with role/content)."""
        system_msg, other_messages = self.split_system_message(messages)
        return self._complete(system_msg, self._to_contents(other_messages))

    def generate_with_image(self, messages: list[dict], image_path: str | Path) -> str:
        """Return the model's reply with an image attached to the last user turn.

        If the conversation does not end with a user turn, the image is sent
        as a user turn of its own rather than being dropped.
        """
        media_type, image_data = _load_image(image_path)

        system_msg, other_messages = self.split_system_message(messages)
        contents = self._to_contents(other_messages)

        image_part = {"inline_data": {"mime_type": media_type, "data": image_data}}
        if contents and contents[-1]["role"] == "user":
            # Add image to the last user message, ahead of its text
            contents[-1]["parts"].insert(0, image_part)
        else:
            contents.append({"role": "user", "parts": [image_part]})

        return self._complete(system_msg, contents)


class MockBackend(LLMBackend):
    """
    Mock backend that dynamically generates CadQuery code from any prompt.

    Parses dimensions, shape type, and features from the text, then
    assembles parametric code. No API key required.
    """

    # Word-to-number mapping for natural language counts
    _WORD_NUMS = {
        "one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
        "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10,
        "twelve": 12, "sixteen": 16, "twenty": 20,
    }

    # Nouns a leading count may refer to ("4 holes", "six fins"); shared by
    # the numeric and the spelled-out count patterns so they stay in sync.
    _COUNT_NOUNS = r"(?:hole|bolt|screw|bore|fin|rib|slot|boss)"

    # Shape detection patterns → base shape key.
    # Order matters: the first entry with a matching keyword wins. "l_bracket"
    # must come before "plate" because the plate keyword "bracket" is a
    # substring of almost every L-bracket keyword.
    _SHAPE_PATTERNS = {
        "l_bracket": [
            "l-bracket", "l bracket", "angle bracket", "corner bracket",
            "l-shaped",
        ],
        "cylinder": [
            "cylinder", "rod", "shaft", "axle", "spacer", "washer",
            "bushing", "sleeve", "tube", "pipe", "dowel", "pin",
        ],
        "plate": [
            "plate", "bracket", "mount", "flange", "baseplate", "panel",
            "shim", "cover", "lid",
        ],
        "box": [
            "box", "block", "enclosure", "housing", "case", "cube",
            "container", "shell",
        ],
    }

    # Feature detection keywords
    _FEATURE_KEYWORDS = {
        "holes": ["hole", "holes", "bolt", "bolts", "screw", "screws", "bore", "bores"],
        "pocket": ["pocket", "recess", "cavity", "cutout", "mortise"],
        "slot": ["slot", "slots", "groove", "channel", "keyway"],
        "fillet": ["fillet", "fillets", "round", "rounded"],
        "chamfer": ["chamfer", "chamfers", "bevel", "beveled"],
        "through_hole": ["through hole", "through-hole", "thru hole", "thru-hole"],
        "counterbore": ["counterbore", "counterbored", "cbore"],
        "fins": ["fin", "fins", "cooling", "heatsink", "heat sink", "radiator"],
        "ribs": ["rib", "ribs", "stiffener", "stiffeners", "web"],
        "boss": ["boss", "bosses", "standoff", "standoffs", "pillar"],
    }

    @property
    def _thread_clearance(self) -> dict[str, float]:
        """Fastener table from the project settings (``settings.fasteners``)."""
        from config.settings import settings

        return settings.fasteners

    def _parse_count(self, scan: str) -> tuple[int | None, int | None]:
        """Find a feature count in *scan* (lower-cased, thread sizes removed).

        Returns:
            ``(count, position)``. ``position`` is the index in *scan* of a
            numeric count so the caller can keep it out of the dimension
            list; it is None for spelled-out counts. Both are None when no
            count is found.
        """
        nouns = self._COUNT_NOUNS

        # "4 holes", "4x holes". The look-behind stops the match from starting
        # inside another token ("2.5 holes", "100x60 holes").
        numeric = re.search(rf"(?<![\w.])(\d+)\s*(?:x\s*)?{nouns}", scan)
        if numeric:
            return int(numeric.group(1)), numeric.start(1)

        # "four holes", "four evenly spaced holes": allow up to three words in
        # between, none of which may itself be a number word, so that
        # "two plates with four holes" yields 4 rather than 2.
        words = "|".join(self._WORD_NUMS)
        worded = re.search(
            rf"\b({words})\b(?:[\s-]+(?!(?:{words})\b)[\w.]+){{0,3}}?[\s-]+{nouns}",
            scan,
        )
        if worded:
            return self._WORD_NUMS[worded.group(1)], None
        return None, None

    def _parse_prompt(self, text: str) -> dict:
        """Extract dimensions, shape, and features from natural language.

        Returns:
            A dict with keys ``dimensions`` (list of floats in prompt order),
            ``shape``, ``features`` (set of feature keys), ``hole_dia``
            (defaults to 5.5), ``count`` (defaults to 4) and ``prompt``.
        """
        lower = text.lower()

        # Detect metric thread sizes (M3, M6, etc.)
        thread_match = re.search(r"\bm(\d+)\b", lower)
        hole_dia = None
        if thread_match:
            size = thread_match.group(1)
            hole_dia = self._thread_clearance.get(f"m{size}", float(size) * 1.1)

        # Blank out thread designators so their digits are not mistaken for a
        # hole count ("4 M6 holes") or a part dimension.
        scan = re.sub(r"\bm\d+\b", " ", lower)

        # Detect hole diameter from "Xmm hole"
        hole_dim_match = re.search(
            r"(\d+\.?\d*)\s*mm\s*(?:hole|bore|holes|bores)", scan
        )
        if hole_dim_match and not hole_dia:
            hole_dia = float(hole_dim_match.group(1))

        # Detect count (numeric or word)
        count, count_pos = self._parse_count(scan)

        # Numbers that describe holes are not dimensions of the part itself.
        consumed = {count_pos}
        if hole_dim_match:
            consumed.add(hole_dim_match.start(1))

        # Extract all remaining numbers with optional units
        dimensions = []
        for match in re.finditer(r"(\d+\.?\d*)\s*(?:mm|cm|m\b)?", scan):
            if match.start(1) in consumed:
                continue
            value = float(match.group(1))
            if 0.1 < value < 2000:
                dimensions.append(value)

        # Detect base shape (first matching entry wins, default "box")
        shape = next(
            (
                shape_key
                for shape_key, keywords in self._SHAPE_PATTERNS.items()
                if any(kw in lower for kw in keywords)
            ),
            "box",
        )

        # Detect features
        features = {
            feat
            for feat, keywords in self._FEATURE_KEYWORDS.items()
            if any(kw in lower for kw in keywords)
        }

        return {
            "dimensions": dimensions,
            "shape": shape,
            "features": features,
            "hole_dia": hole_dia or 5.5,
            "count": count or 4,
            "prompt": text,
        }

    def _generate_code(self, p: dict) -> str:
        """Build CadQuery code from the parameters returned by `_parse_prompt`."""
        shape = p["shape"]

        lines = ["import cadquery as cq"]
        if shape == "cylinder" and "fins" in p["features"]:
            lines.append("import math")
        lines.append("")
        # Collapse whitespace so a multi-line prompt cannot spill out of the
        # comment and into the generated program.
        lines.append(f"# Generated from: {' '.join(p['prompt'].split())}")

        builders = {
            "cylinder": self._cylinder_lines,
            "plate": self._plate_lines,
            "l_bracket": self._l_bracket_lines,
        }
        # Anything unrecognised is built as a box / enclosure / housing.
        lines.extend(builders.get(shape, self._box_lines)(p))
        return "\n".join(lines) + "\n"

    def _cylinder_lines(self, p: dict) -> list[str]:
        """Return code lines for a cylinder with optional bore and fins."""
        dims = p["dimensions"]
        features = p["features"]

        radius = dims[0] / 2 if dims else 15.0
        height = dims[1] if len(dims) > 1 else radius * 2

        lines = [
            f"# Cylinder: radius={radius}mm, height={height}mm",
            "result = (",
            "    cq.Workplane('XY')",
            f"    .cylinder({height}, {radius})",
        ]
        if "holes" in features or "through_hole" in features:
            lines.append("    .faces('>Z').workplane()")
            lines.append(f"    .hole({p['hole_dia']})")

        # NOTE(review): the edge-treatment tail of this chain was damaged in
        # the reviewed copy of this file (the text following each "<Z" was
        # lost). The branch structure follows what remained; the fillet
        # radius is a conservative reconstruction — confirm against history.
        if "chamfer" in features or "fillet" not in features:
            lines.append("    .edges('>Z or <Z').chamfer(0.5)")
        else:
            fillet_r = min(1.0, height / 4, radius / 4)
            lines.append(f"    .edges('>Z or <Z').fillet({fillet_r})")
        lines.append(")")

        if "fins" in features:
            n_fins = p["count"] if p["count"] > 4 else 8
            fin_h = max(height * 0.8, 5)
            fin_w = 1.5
            lines += [
                "",
                f"# Add {n_fins} cooling fins",
                f"for i in range({n_fins}):",
                f"    angle = i * 360 / {n_fins}",
                "    rad = math.radians(angle)",
                f"    fx = {radius + 3} * math.cos(rad)",
                f"    fy = {radius + 3} * math.sin(rad)",
                "    fin = (",
                "        cq.Workplane('XY')",
                "        .transformed(offset=(fx, fy, 0), rotate=(0, 0, angle))",
                f"        .rect({fin_w}, {radius * 0.6})",
                f"        .extrude({fin_h})",
                "    )",
                "    result = result.union(fin)",
            ]
        return lines

    def _plate_lines(self, p: dict) -> list[str]:
        """Return code lines for a flat plate with holes, pocket and slot."""
        dims = p["dimensions"]
        features = p["features"]
        prompt_lower = p["prompt"].lower()

        w = dims[0] if dims else 80.0
        h = dims[1] if len(dims) > 1 else w * 0.6
        t = dims[2] if len(dims) > 2 else 5.0

        lines = [
            f"# Plate: {w}x{h}x{t}mm",
            "result = (",
            "    cq.Workplane('XY')",
            f"    .box({w}, {h}, {t})",
        ]

        if "holes" in features or "through_hole" in features:
            n = p["count"]
            dia = p["hole_dia"]
            # Distribute holes in a grid or circle
            if "flange" in prompt_lower or n >= 6:
                # Bolt circle pattern
                r = min(w, h) * 0.35
                lines.append("    .faces('>Z').workplane()")
                lines.append(f"    .polarArray({r}, 0, 360, {n})")
                lines.append(f"    .hole({dia})")
                if "bore" in prompt_lower or "flange" in prompt_lower:
                    lines.append("    .faces('>Z').workplane()")
                    lines.append(f"    .hole({dia * 3})  # Center bore")
            else:
                # Rectangular pattern
                ox = w * 0.35
                oy = h * 0.35
                if n == 1:
                    pts = [(0, 0)]
                elif n == 2:
                    pts = [(-ox, 0), (ox, 0)]
                else:
                    # Four corners; also the fallback for 3 or 5 holes.
                    pts = [(-ox, -oy), (-ox, oy), (ox, -oy), (ox, oy)]
                lines.append("    .faces('>Z').workplane()")
                lines.append(f"    .pushPoints({pts})")
                lines.append(f"    .hole({dia})")

        if "pocket" in features:
            pw = w * 0.4
            ph = h * 0.35
            pd = t * 0.6
            lines.append("    .faces('>Z').workplane()")
            lines.append(f"    .rect({pw}, {ph})")
            lines.append(f"    .cutBlind(-{pd})  # Central pocket")

        if "slot" in features:
            sl = w * 0.35
            sw = max(t * 0.8, 4)
            lines.append("    .faces('>Z').workplane()")
            lines.append(f"    .slot2D({sl}, {sw}).cutBlind(-{t})")

        if "fillet" in features:
            lines.append(f"    .edges('|Z').fillet({max(t * 0.4, 1.5)})")
        else:
            lines.append("    .edges('>Z').chamfer(0.5)")
        lines.append(")")
        return lines

    def _l_bracket_lines(self, p: dict) -> list[str]:
        """Return code lines for an extruded L-profile bracket."""
        dims = p["dimensions"]

        arm = dims[0] if dims else 50.0
        width = dims[1] if len(dims) > 1 else 20.0
        t = dims[2] if len(dims) > 2 else 4.0

        lines = [
            f"# L-bracket: {arm}mm arms, {width}mm wide, {t}mm thick",
            "result = (",
            "    cq.Workplane('XZ')",
            "    .moveTo(0, 0)",
            f"    .lineTo({arm}, 0)",
            f"    .lineTo({arm}, {t})",
            f"    .lineTo({t}, {t})",
            f"    .lineTo({t}, {arm})",
            f"    .lineTo(0, {arm})",
            "    .close()",
            f"    .extrude({width})",
            f"    .edges('|Y').fillet({max(t * 0.5, 1.5)})",
        ]
        if "holes" in p["features"]:
            lines += [
                "    .faces('>Z').workplane(centerOption='CenterOfBoundBox')",
                f"    .center({arm * 0.5}, 0)",
                f"    .hole({p['hole_dia']})",
                "    .faces('>X').workplane(centerOption='CenterOfBoundBox')",
                f"    .center(0, {arm * 0.5})",
                f"    .hole({p['hole_dia']})",
            ]
        lines.append("    .edges().chamfer(0.5)")
        lines.append(")")
        return lines

    def _box_lines(self, p: dict) -> list[str]:
        """Return code lines for a box / enclosure / housing."""
        dims = p["dimensions"]
        features = p["features"]

        w = dims[0] if dims else 60.0
        h = dims[1] if len(dims) > 1 else w * 0.65
        d = dims[2] if len(dims) > 2 else 20.0

        lines = [
            f"# Box: {w}x{h}x{d}mm",
            "result = (",
            "    cq.Workplane('XY')",
            f"    .box({w}, {h}, {d})",
        ]

        if "holes" in features or "through_hole" in features:
            ox = w * 0.35
            oy = h * 0.35
            pts = [(-ox, -oy), (-ox, oy), (ox, -oy), (ox, oy)]
            lines.append("    .faces('>Z').workplane()")
            lines.append(f"    .pushPoints({pts})")
            lines.append(f"    .hole({p['hole_dia']})")

        if "pocket" in features:
            pw = w * 0.5
            ph = h * 0.4
            pd = d * 0.4
            lines.append("    .faces('>Z').workplane()")
            lines.append(f"    .rect({pw}, {ph})")
            lines.append(f"    .cutBlind(-{pd})")

        if "slot" in features:
            sl = w * 0.4
            sw = 6
            lines.append("    .faces('>Z').workplane()")
            lines.append(f"    .slot2D({sl}, {sw}).cutBlind(-{d})")

        if "boss" in features:
            n = min(p["count"], 4)
            bx = w * 0.3
            by = h * 0.3
            boss_pts = [(-bx, -by), (-bx, by), (bx, -by), (bx, by)][:n]
            lines.append("    .faces('>Z').workplane()")
            lines.append(f"    .pushPoints({boss_pts})")
            lines.append("    .circle(4).extrude(6)  # Mounting bosses")

        if "ribs" in features:
            n_ribs = p["count"] if p["count"] <= 8 else 4
            spacing = w / (n_ribs + 1)
            lines.append("    .faces('>Z').workplane()")
            for i in range(n_ribs):
                # .center() is relative: the first call moves to the first
                # rib, every later call steps one spacing further along X.
                rx = -w / 2 + spacing * (i + 1)
                lines.append(f"    .center({rx if i == 0 else spacing}, 0)")
                lines.append(f"    .rect(2, {h * 0.8}).extrude({d * 0.3})")

        if "fillet" in features:
            lines.append(f"    .edges('|Z').fillet({min(d * 0.2, 3)})")
        elif "chamfer" in features:
            lines.append("    .edges('>Z').chamfer(1.0)")
        else:
            lines.append("    .edges('>Z').chamfer(0.5)")
        lines.append(")")
        return lines

    # Curated hero responses for specific prompts
    # NOTE(review): the last statement of the "gear" program was damaged in
    # the reviewed copy of this file (everything after ">Z or" was lost); the
    # selector tail and chamfer size are a reconstruction — confirm against
    # version history.
    _CURATED = {
        "gear": """\
import cadquery as cq
import math

# Simple spur gear approximation: 20 teeth, module 2, 10mm thick
module = 2
teeth = 20
pitch_radius = module * teeth / 2
outer_radius = pitch_radius + module
tooth_angle = 360 / teeth

result = (
    cq.Workplane("XY")
    .cylinder(10, outer_radius)
    .faces(">Z").workplane()
    .hole(12)
)

for i in range(teeth):
    angle = i * tooth_angle
    rad = math.radians(angle)
    gap_x = pitch_radius * math.cos(rad)
    gap_y = pitch_radius * math.sin(rad)
    cutter = (
        cq.Workplane("XY")
        .transformed(offset=(gap_x, gap_y, 0), rotate=(0, 0, angle))
        .rect(module * 0.8, module * 2.5)
        .extrude(12)
    )
    result = result.cut(cutter)

result = result.edges(">Z or <Z").chamfer(0.3)
""",
    }

    def generate(self, messages: list[dict]) -> str:
        """Return CadQuery code for the last message in *messages*.

        A curated program is returned when its key occurs in the prompt;
        otherwise the code is assembled from the parsed prompt.
        """
        user_msg = messages[-1]["content"]
        lower = user_msg.lower()

        # Check curated responses first
        for key, code in self._CURATED.items():
            if key in lower:
                return code

        # Dynamic generation for everything else
        params = self._parse_prompt(user_msg)
        return self._generate_code(params)


class NeuralCADBackend(LLMBackend):
    """
    Neural CAD pipeline backend. Runs trained models locally:

        Text/Image → CLIP encoder → contrastive latent
                   → Diffusion prior → latent
                   → Transformer decoder → CAD command sequence
                   → OpenCascade kernel → B-rep solid

    Unlike LLM backends, this does not generate CadQuery code strings.
    Instead it produces CAD command sequences decoded directly into geometry.

    Every pipeline stage currently raises NotImplementedError.
    """

    def __init__(
        self,
        model_dir: str | Path = "./models",
        device: str = "cuda",
        clip_model: str = "clip_encoder.pt",
        prior_model: str = "diffusion_prior.pt",
        decoder_model: str = "transformer_decoder.pt",
    ):
        """Record where the model files live; nothing is loaded here."""
        self.model_dir = Path(model_dir)
        self.device = device
        self.clip_encoder = None
        self.diffusion_prior = None
        self.transformer_decoder = None
        self._model_config = {
            "clip": clip_model,
            "prior": prior_model,
            "decoder": decoder_model,
        }

    def load_models(self):
        """Load all model weights from disk. Call once before inference."""
        raise NotImplementedError(
            f"Model loading not yet implemented. "
            f"Expected model files in: {self.model_dir}"
        )

    def encode_text(self, text: str):
        """Encode text prompt to CLIP latent vector."""
        raise NotImplementedError("CLIP text encoder not yet implemented")

    def encode_image(self, image_path: str | Path):
        """Encode image (photo/sketch) to CLIP latent vector."""
        raise NotImplementedError("CLIP image encoder not yet implemented")

    def run_diffusion_prior(self, clip_embedding):
        """Map CLIP embedding to CAD latent via diffusion prior."""
        raise NotImplementedError("Diffusion prior not yet implemented")

    def decode_to_cad_sequence(self, latent):
        """Decode latent to CAD command sequence."""
        raise NotImplementedError("Transformer decoder not yet implemented")

    def cad_sequence_to_solid(self, cad_commands: list[dict]):
        """Execute CAD command sequence through OpenCascade kernel → B-rep solid."""
        raise NotImplementedError("CAD kernel execution not yet implemented")

    def generate(self, messages: list[dict]) -> str:
        """
        LLMBackend-compatible interface.

        Extracts the text prompt from the last message, runs the full neural
        pipeline, and returns CadQuery-equivalent code as a string for
        compatibility with the existing execution/validation/export pipeline.
        """
        user_msg = messages[-1]["content"]
        clip_emb = self.encode_text(user_msg)
        latent = self.run_diffusion_prior(clip_emb)
        cad_commands = self.decode_to_cad_sequence(latent)
        return self._cad_commands_to_code(cad_commands)

    def generate_from_image(self, image_path: str | Path, text_hint: str = "") -> str:
        """
        Image-conditioned generation through the neural pipeline.

        Unlike the API backends' ``generate_with_image``, this takes the
        image path directly rather than a message list.

        Args:
            image_path: Path to photo or sketch of the desired part.
            text_hint: Optional text to guide generation alongside the image.

        Returns:
            CadQuery code string for pipeline compatibility.
        """
        img_emb = self.encode_image(image_path)
        if text_hint:
            txt_emb = self.encode_text(text_hint)
            # Fuse text + image embeddings (strategy TBD — average, concat, cross-attn)
            clip_emb = (img_emb + txt_emb) / 2  # placeholder fusion
        else:
            clip_emb = img_emb

        latent = self.run_diffusion_prior(clip_emb)
        cad_commands = self.decode_to_cad_sequence(latent)
        return self._cad_commands_to_code(cad_commands)

    def _cad_commands_to_code(self, cad_commands: list[dict]) -> str:
        """Convert internal CAD command sequence to CadQuery Python code string."""
        raise NotImplementedError(
            "CAD command → CadQuery code serializer not yet implemented"
        )