"""agent_flow.py — Models, routing helpers, and readiness checks for CrewAI Flow. This module provides the foundational data models and pure helper functions used by the NeuralCAD multi-agent CrewAI Flow pipeline: - AgentResponse: serialisable record of a single agent's reply - AgentFlowState: full mutable state carried through a flow run - extract_code: pull a CadQuery code block out of a raw LLM response - route_agents: decide which agents should handle a given message - check_readiness: decide whether generation agents should proceed - collect_responses: flatten advisor + generator responses into one list """ from __future__ import annotations import logging import re from pathlib import Path from pydantic import BaseModel, Field from crewai.flow.flow import Flow, listen, or_, start, router from agents.definitions import AGENTS from config.settings import settings logger = logging.getLogger(__name__) WIKI_DIR = Path(__file__).parent.parent / "docs" / "wiki" # ── CrewAI prompt templates ───────────────────────────────────────────────── # Applied via CrewAI's system_template / prompt_template on advisory agents. # {{ .System }} expands to the default system slices (role_playing + tools); # {{ .Prompt }} expands to the task slice. This preserves all default CrewAI # behaviour (tool-use format, etc.) while injecting question-level guidance. _ADVISOR_SYSTEM_TEMPLATE = """\ {{ .System }} ## Question Guidelines When asking the user clarifying questions, follow these rules strictly: - Ask questions a normal person can answer. The user describes INTENT; you derive the technical details. - NEVER ask for exact coordinates, precise radii, specific tolerance values, or CAD-level parameters. - Frame questions in everyday language and offer 2-4 plain-language suggestions. - GOOD: "Where should the mounting holes go?" → "one in each corner", "evenly spaced along edges" - BAD: "What are the precise X, Y, Z coordinates for the M4 mounting holes?" 
- GOOD: "How thick should the walls be?" → "thin (1-2mm)", "standard (3-5mm)", "heavy-duty (6mm+)" - BAD: "What is the minimum wall thickness in mm for the vertical ribs?" - GOOD: "Should the edges be sharp or rounded?" - BAD: "What fillet radius should be applied to the internal pocket edges?" - You are the expert — infer technical parameters from the user's high-level answer.""" _ADVISOR_PROMPT_TEMPLATE = "{{ .Prompt }}" # ── Data models ─────────────────────────────────────────────────────────────── class AgentResponse(BaseModel): """Serialisable record of a single agent's reply within a flow run.""" agent_id: str agent_name: str message: str color: str avatar: str code: str | None = None @classmethod def from_agent( cls, agent_id: str, message: str, code: str | None = None, ) -> AgentResponse: """Construct an AgentResponse from an agent id and raw message text.""" agent_def = AGENTS.get(agent_id) if agent_def is None: raise ValueError(f"Unknown agent_id: {agent_id!r}. Valid: {sorted(AGENTS)}") return cls( agent_id=agent_id, agent_name=agent_def.name, message=message, color=agent_def.color, avatar=agent_def.avatar, code=code, ) class AgentFlowState(BaseModel): """Mutable state object carried through a single CrewAI Flow run.""" message: str = "" context: str = "" model_str: str = "" mentions: list[str] = Field(default_factory=list) is_approved_phase: bool = False active_agent_ids: list[str] = Field(default_factory=list) knowledge_sources_data: list[str] = Field(default_factory=list) advisor_responses: list[AgentResponse] = Field(default_factory=list) cad_response: AgentResponse | None = None cam_response: AgentResponse | None = None cad_code: str | None = None # String forward reference avoids importing core.cam at module level, # preventing circular-import issues when cam.py itself imports settings. cam_plan: "CAMPlan | None" = None # Resolve the CAMPlan forward reference now that the model class exists. 
# This import is deferred to here to avoid a circular import at module load
# (core.cam imports config.settings which is already imported above).
from core.cam import CAMPlan  # noqa: E402

AgentFlowState.model_rebuild()


# ── Response models ──────────────────────────────────────────────────────────

from agents.gap_analyzer import GeneratedQuestionCard  # noqa: E402


class PreviewData(BaseModel):
    """Preview data for a generated CAD model, sent to the frontend."""

    success: bool
    part_name: str = ""
    stl_url: str = ""
    step_url: str = ""
    threemf_url: str = ""
    execution: dict = Field(default_factory=dict)
    validation: dict = Field(default_factory=dict)
    cam: dict | None = None
    gcode_url: str | None = None
    error: str | None = None


class ChatTurnResponse(BaseModel):
    """Unified response envelope from all orchestrator chat_turn() methods."""

    responses: list[AgentResponse] = Field(default_factory=list)
    preview: PreviewData | None = None
    # Forward reference; resolved by the model_rebuild() call below, after
    # DesignState has been imported.
    design_state: "DesignState"
    question_cards: list[GeneratedQuestionCard] = Field(default_factory=list)


from agents.design_state import DesignState  # noqa: E402

ChatTurnResponse.model_rebuild()


# ── Pure helper functions ─────────────────────────────────────────────────────

ADVISOR_IDS: frozenset[str] = frozenset({"design", "engineering", "cnc"})
GENERATOR_IDS: frozenset[str] = frozenset({"cad", "cam"})


def extract_code(text: str) -> str | None:
    """Return the first Python/CadQuery code block found in *text*, or None.

    Searches for fenced code blocks (``` ... ```) first. Falls back to
    returning the whole text when it contains characteristic CadQuery markers.
    """
    match = re.search(r"```(?:python)?\s*\n(.*?)```", text, re.DOTALL)
    if match:
        return match.group(1).strip()
    # No fence: treat the whole reply as code if it looks like CadQuery.
    if any(marker in text for marker in ["import cadquery", "cq.", "result ="]):
        return text.strip()
    return None


def route_agents(
    message: str,
    mentions: list[str],
    is_approved_phase: bool,
) -> list[str]:
    """Return the ordered list of agent ids that should handle *message*.

    Priority order:
    1. Approved phase → fixed set from config
    2. Explicit @mentions → use as-is
    3. Keyword scoring → up to *max_active_agents* highest-scoring agents,
       with a fallback to ["design", "engineering"] when nothing matches
    4. CAD trigger keywords → append "cad" if not already present
    """
    if is_approved_phase:
        return list(settings.planning.approved_agents)
    if mentions:
        return list(mentions)

    lower = message.lower()
    keywords: dict[str, list[str]] = settings.routing.keywords
    max_agents: int = settings.orchestration.max_active_agents

    # One point per configured keyword that occurs (as a substring) in the
    # lower-cased message.
    scores: dict[str, int] = {
        agent_id: sum(1 for kw in kws if kw in lower)
        for agent_id, kws in keywords.items()
    }

    # sorted() is stable, so agents with equal scores keep config order.
    active = [
        aid
        for aid, score in sorted(scores.items(), key=lambda x: -x[1])
        if score > 0
    ]
    if not active:
        active = ["design", "engineering"]
    active = active[:max_agents]

    # "cad" is appended after truncation, so it may exceed max_agents by one.
    cad_triggers: list[str] = settings.routing.cad_trigger_keywords
    if "cad" not in active and any(kw in lower for kw in cad_triggers):
        active.append("cad")
    return active


def check_readiness(
    advisor_responses: list[AgentResponse],
    active_agent_ids: list[str],
) -> str:
    """Return a readiness verdict for the generation phase.

    Returns:
        "SKIP_GENERATION" — no generator agent is active; skip generation.
        "NOT_READY"       — at least one advisor flagged missing information.
        "READY"           — all advisors are satisfied; proceed with generation.
    """
    has_generators = bool(GENERATOR_IDS & set(active_agent_ids))
    if not has_generators:
        return "SKIP_GENERATION"
    for resp in advisor_responses:
        if resp.message.strip().upper().startswith("NOT READY:"):
            return "NOT_READY"
    return "READY"


def collect_responses(
    advisor_responses: list[AgentResponse],
    cad_response: AgentResponse | None,
    cam_response: AgentResponse | None,
) -> list[AgentResponse]:
    """Merge advisor and generator responses into a single ordered list.

    Advisor responses come first, followed by the CAD response (if any),
    then the CAM response (if any).
    """
    result = list(advisor_responses)
    if cad_response is not None:
        result.append(cad_response)
    if cam_response is not None:
        result.append(cam_response)
    return result


# ── Flow class ───────────────────────────────────────────────────────────────


class AgentDispatchFlow(Flow[AgentFlowState]):
    """Flow-based agent dispatch replacing Crew.kickoff() + RoutingEngine.

    Step graph as wired by the decorators below:

        prepare_agents → route_message
            "HAS_ADVISORS"    → run_advisors → check_readiness_router
                "READY"           → run_cad → run_cam
                "NOT_READY"       → run_cad_not_ready → skip_cam_after_not_ready
                "SKIP_GENERATION" → skip_generation
            "GENERATORS_ONLY" → run_cad_gen_only → run_cam_gen_only
            "NO_AGENTS"       → no_agents
        (any terminal step) → collect_results
    """

    @start()
    def prepare_agents(self):
        """Load wiki knowledge sources."""
        for filename in ("cutting-parameters.md", "gcode-reference.md"):
            path = WIKI_DIR / filename
            if path.exists():
                # Explicit UTF-8 so decoding does not depend on the platform
                # locale.
                self.state.knowledge_sources_data.append(
                    path.read_text(encoding="utf-8")
                )

    @router(prepare_agents)
    def route_message(self):
        """Select agents and return path."""
        self.state.active_agent_ids = route_agents(
            self.state.message,
            self.state.mentions,
            self.state.is_approved_phase,
        )
        if not self.state.active_agent_ids:
            return "NO_AGENTS"
        has_advisors = bool(ADVISOR_IDS & set(self.state.active_agent_ids))
        has_generators = bool(GENERATOR_IDS & set(self.state.active_agent_ids))
        if has_advisors:
            return "HAS_ADVISORS"
        if has_generators:
            return "GENERATORS_ONLY"
        return "NO_AGENTS"

    @listen("HAS_ADVISORS")
    def run_advisors(self):
        """Run advisory agents as one Crew."""
        advisor_ids = [aid for aid in self.state.active_agent_ids if aid in ADVISOR_IDS]
        if not advisor_ids:
            return
        self.state.advisor_responses = self._run_advisor_crew(advisor_ids)

    @router(run_advisors)
    def check_readiness_router(self):
        """Gate generation based on advisor responses."""
        return check_readiness(self.state.advisor_responses, self.state.active_agent_ids)

    @listen("READY")
    def run_cad(self):
        self._run_cad_step()

    # The CAD step still runs when advisors report NOT_READY; only the CAM
    # step is skipped on this path (see skip_cam_after_not_ready).
    @listen("NOT_READY")
    def run_cad_not_ready(self):
        self._run_cad_step()

    @listen("SKIP_GENERATION")
    def skip_generation(self):
        pass

    @listen(run_cad)
    def run_cam(self):
        self._run_cam_step()

    @listen(run_cad_not_ready)
    def skip_cam_after_not_ready(self):
        pass

    @listen("GENERATORS_ONLY")
    def run_cad_gen_only(self):
        self._run_cad_step()

    @listen(run_cad_gen_only)
    def run_cam_gen_only(self):
        self._run_cam_step()

    @listen("NO_AGENTS")
    def no_agents(self):
        pass

    @listen(or_(run_cam, skip_cam_after_not_ready, skip_generation, run_cam_gen_only, no_agents))
    def collect_results(self):
        pass  # Responses already on state

    # ── Private helpers ──────────────────────────────────────────────

    _memory = None  # Set by CrewOrchestrator before kickoff

    def _recall_for_agent(self, agent_id: str) -> str:
        """Recall relevant memories for this agent, formatted as context.

        Best-effort: returns "" when no memory is attached, when recall
        raises, or when nothing matches. Failures are logged, not raised.
        """
        if self._memory is None:
            return ""
        try:
            # NOTE(review): re-imports the module-level `settings`; kept in
            # case callers patch config.settings at runtime — confirm.
            from config.settings import settings

            matches = self._memory.recall(
                self.state.message,
                scope=f"/agent/{agent_id}",
                limit=settings.memory.recall_limit,
                depth=settings.memory.recall_depth,
            )
        except Exception:
            # Memory is optional context; never let it break the flow.
            logger.warning("Memory recall failed for agent %r", agent_id, exc_info=True)
            return ""
        if not matches:
            return ""
        lines = [f"- {m.record.content}" for m in matches]
        return "## Relevant context from prior turns\n" + "\n".join(lines)

    def _remember_response(self, agent_id: str, content: str) -> None:
        """Store an agent's response in its scoped memory (best-effort)."""
        if self._memory is None:
            return
        try:
            self._memory.remember(content, scope=f"/agent/{agent_id}")
        except Exception:
            # Losing one memory write must not fail the turn, but it should
            # be visible in the logs.
            logger.warning("Memory write failed for agent %r", agent_id, exc_info=True)

    def _build_llm(self):
        """Create a CrewAI LLM from the state's model string and settings."""
        from crewai import LLM

        return LLM(model=self.state.model_str, temperature=settings.temperature)

    def _build_knowledge_sources(self):
        """Wrap each loaded wiki text in a StringKnowledgeSource.

        Returns an empty list when the knowledge-source class cannot be
        imported.
        """
        try:
            from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
        except ImportError:
            return []
        return [
            StringKnowledgeSource(content=content)
            for content in self.state.knowledge_sources_data
        ]

    def _build_crew_agent(self, agent_id, llm):
        """Create a CrewAI Agent + Task for the given agent_id.

        Returns:
            A ``(crew_agent, task)`` tuple.

        Raises:
            KeyError: if *agent_id* has no entry in ``AGENTS``.
        """
        from crewai import Agent, Task
        from agents.tools import (
            QueryDesignStateTool, ExecuteCadTool, ValidateCadTool, GenerateGcodeTool,
        )
        from core.cam import CAMPlan as CAMPlanModel

        agent_def = AGENTS[agent_id]
        # Every agent can query the design state; generators get extra tools.
        tools = [QueryDesignStateTool()]
        extra_backstory = ""
        task_output_pydantic = None

        if agent_id == "cad":
            tools.extend([ExecuteCadTool(), ValidateCadTool()])
            from core.cadquery_prompts import CADQUERY_SYSTEM_PROMPT

            extra_backstory = (
                "\n\nBefore deciding if specs are sufficient, ALWAYS call the "
                "Query Design State tool first to check what the orchestrator "
                "already knows (dimensions, material, features). Only say "
                "NOT READY if the tool confirms information is truly missing.\n\n"
                "When generating code, use the Execute CadQuery Code tool "
                "to test your code. If it fails, fix the errors and try again. "
                "Use the Validate CNC Manufacturability tool to check the result. "
                "Output ONLY valid CadQuery Python that assigns result to a "
                "cq.Workplane. Import cadquery as cq.\n\n"
                f"CadQuery reference:\n{CADQUERY_SYSTEM_PROMPT}"
            )
        elif agent_id == "cnc":
            extra_backstory = (
                "\n\nBefore deciding if manufacturability info is sufficient, "
                "ALWAYS call the Query Design State tool first to check what "
                "the orchestrator already knows (material, dimensions, features, "
                "constraints, axis). Only say NOT READY if the tool confirms "
                "information is truly missing."
            )
        elif agent_id == "cam":
            tools.append(GenerateGcodeTool())
            # Ask CrewAI for structured output so the CAM step can parse it.
            task_output_pydantic = CAMPlanModel
        elif agent_id in ("design", "engineering"):
            extra_backstory = (
                "\n\nBefore asking clarifying questions, call the Query Design "
                "State tool to check what is already known. Do NOT ask about "
                "fields the tool shows as already provided."
            )

        # Only the machining-related agents receive the wiki knowledge.
        knowledge_sources = self._build_knowledge_sources() if agent_id in ("cnc", "cam") else []

        # Advisory agents get system_template with question-abstraction
        # guidance via CrewAI's prompt customisation (see _ADVISOR_SYSTEM_TEMPLATE).
        is_advisor = agent_id in ADVISOR_IDS
        crew_agent = Agent(
            role=agent_def.role,
            goal=agent_def.goal,
            backstory=agent_def.backstory + extra_backstory,
            llm=llm,
            tools=tools,
            verbose=False,
            allow_delegation=settings.crew.collaboration and is_advisor,
            knowledge_sources=knowledge_sources if knowledge_sources else None,
            system_template=_ADVISOR_SYSTEM_TEMPLATE if is_advisor else None,
            prompt_template=_ADVISOR_PROMPT_TEMPLATE if is_advisor else None,
        )

        # Task description = conversation context (+ recalled memories)
        # followed by the shared instructions and a per-agent addendum.
        memories = self._recall_for_agent(agent_id)
        context_parts = [self.state.context]
        if memories:
            context_parts.append(memories)
        task_description = "\n\n".join(context_parts) + "\n\n"
        task_description += (
            f"As the {agent_def.role}, respond to the user's latest message. "
            "Keep your response concise (2-4 sentences). "
            "Do NOT repeat anything from the conversation history. "
            "Add NEW information from your expertise.\n\n"
            "Build on other agents' input — agree, disagree, refine, or add."
        )

        if agent_id == "cad":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check what the "
                "orchestrator already knows. Use the returned 'known' fields "
                "as your specs. Only say 'NOT READY:' listing truly missing "
                "items if the tool shows critical gaps (no shape, no dimensions, "
                "no features). If enough info exists, generate CadQuery code and "
                "use the Execute CadQuery Code tool to verify it works."
            )
        elif agent_id == "cam":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check available "
                "specs. If there is no CAD model generated yet or the tool shows "
                "critical gaps (no material, no dimensions), start with "
                "'NOT READY:' and list only the truly missing items. "
                "If enough info exists, analyze the part geometry and create an "
                "optimal machining strategy. Select operations in order (roughing "
                "before finishing). Use the Generate G-code Toolpath tool."
            )
        elif agent_id == "cnc":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check what the "
                "orchestrator already knows about dimensions, material, and "
                "constraints. Only say 'NOT READY:' listing truly missing items "
                "if the tool confirms critical gaps. If enough info exists, "
                "provide your manufacturability assessment."
            )
        else:
            task_description += (
                "\n\nFIRST call the Query Design State tool to see what is "
                "already known. Only ask clarifying questions about fields "
                "the tool shows as missing in YOUR domain."
            )

        if agent_id == "cad":
            expected_output = "Valid CadQuery Python code or a 'NOT READY:' message."
        elif agent_id in ("cnc", "cam"):
            expected_output = "A concise expert assessment or a 'NOT READY:' message listing missing items."
        else:
            expected_output = "A concise response from your expert perspective (2-4 sentences)."

        task = Task(
            description=task_description,
            expected_output=expected_output,
            agent=crew_agent,
            output_pydantic=task_output_pydantic,
        )
        return crew_agent, task

    def _run_advisor_crew(self, advisor_ids: list[str]) -> list[AgentResponse]:
        """Run all *advisor_ids* as one sequential Crew and collect replies.

        Task outputs are matched to advisors by position. Empty outputs are
        dropped; each kept reply is also written to the agent's memory.
        """
        from crewai import Crew, Process

        llm = self._build_llm()
        pairs = [self._build_crew_agent(aid, llm) for aid in advisor_ids]
        crew = Crew(
            agents=[p[0] for p in pairs],
            tasks=[p[1] for p in pairs],
            process=Process.sequential,
            planning=settings.crew.planning,
            planning_llm=self._build_llm(),
            verbose=False,
        )
        crew_result = crew.kickoff()

        # `or []` also covers a result whose tasks_output is present but None.
        task_outputs = getattr(crew_result, "tasks_output", None) or []
        responses = []
        for i, agent_id in enumerate(advisor_ids):
            if i < len(task_outputs):
                raw = str(task_outputs[i])
            else:
                # No per-task output: attribute the overall result to the
                # first advisor only.
                raw = str(crew_result) if i == 0 else ""
            text = raw.strip()
            if text:
                responses.append(AgentResponse.from_agent(agent_id, text))
                self._remember_response(agent_id, text)
        return responses

    def _run_single_agent_crew(self, agent_id: str) -> str:
        """Run a single agent as a one-agent Crew. Returns raw output string.

        If the task was created with output_pydantic and CrewAI returns a
        pydantic model, serialises it to JSON so callers get a parseable string.
        """
        from crewai import Crew, Process

        llm = self._build_llm()
        crew_agent, task = self._build_crew_agent(agent_id, llm)
        crew = Crew(
            agents=[crew_agent],
            tasks=[task],
            process=Process.sequential,
            planning=settings.crew.planning,
            planning_llm=self._build_llm(),
            verbose=False,
        )
        crew_result = crew.kickoff()

        # `or []` also covers a result whose tasks_output is present but None.
        task_outputs = getattr(crew_result, "tasks_output", None) or []
        if not task_outputs:
            return str(crew_result).strip()
        output = task_outputs[0]
        # CrewAI may return a pydantic model when output_pydantic is set
        if getattr(output, "pydantic", None) is not None:
            return output.pydantic.model_dump_json()
        return str(output).strip()

    def _run_cad_step(self) -> None:
        """Run the CAD agent (if active) and record its reply on state.

        A 'NOT READY:' reply or a reply without extractable code is stored
        verbatim; otherwise the extracted code is stored on both the response
        and ``state.cad_code``.
        """
        if "cad" not in self.state.active_agent_ids:
            return
        raw_output = self._run_single_agent_crew("cad")
        if not raw_output:
            return
        if raw_output.upper().startswith("NOT READY:"):
            self.state.cad_response = AgentResponse.from_agent("cad", raw_output)
        else:
            code = extract_code(raw_output)
            if code:
                self.state.cad_response = AgentResponse.from_agent("cad", "Model generated.", code=code)
                self.state.cad_code = code
            else:
                self.state.cad_response = AgentResponse.from_agent("cad", raw_output)
        if self.state.cad_response is not None:
            self._remember_response("cad", raw_output)

    def _run_cam_step(self) -> None:
        """Run the CAM agent (if active and CAD code exists) and record its reply.

        The raw output is parsed as a JSON CAMPlan. When it is not valid JSON,
        not a JSON object, or fails validation, the raw text is stored as the
        response and ``state.cam_plan`` is left untouched.
        """
        import json

        from core.cam import CAMPlan as CAMPlanModel

        if "cam" not in self.state.active_agent_ids:
            return
        if self.state.cad_code is None:
            return
        raw_output = self._run_single_agent_crew("cam")
        if not raw_output:
            return

        try:
            plan_data = json.loads(raw_output)
            cam_plan = CAMPlanModel(**plan_data)
            summary = (
                f"Machining plan: {', '.join(cam_plan.operations)} | "
                f"{cam_plan.tool_diameter}mm endmill | {cam_plan.post_processor}"
            )
        except (json.JSONDecodeError, TypeError, ValueError, KeyError):
            # TypeError: the JSON parsed but is not an object (e.g. a list or
            # number), so it cannot be unpacked with **. ValueError also
            # covers pydantic validation failures.
            self.state.cam_response = AgentResponse.from_agent("cam", raw_output)
        else:
            # Commit plan and response together, only after both were built.
            self.state.cam_plan = cam_plan
            self.state.cam_response = AgentResponse.from_agent("cam", summary)

        if self.state.cam_response is not None:
            self._remember_response("cam", raw_output)