Spaces:
Sleeping
Sleeping
Daniel Tu
feat: constrain agent questions to user-friendly abstraction level (#13)
"""agent_flow.py — Models, routing helpers, and readiness checks for CrewAI Flow.

This module provides the foundational data models, pure helper functions and
the Flow class used by the NeuralCAD multi-agent CrewAI Flow pipeline:

- AgentResponse: serialisable record of a single agent's reply
- AgentFlowState: full mutable state carried through a flow run
- PreviewData: preview data for a generated CAD model, sent to the frontend
- ChatTurnResponse: response envelope returned by the orchestrator's
  chat_turn() methods
- extract_code: pull a CadQuery code block out of a raw LLM response
- route_agents: decide which agents should handle a given message
- check_readiness: decide whether generation agents should proceed
- collect_responses: flatten advisor + generator responses into one list
- AgentDispatchFlow: CrewAI Flow that runs the routed advisor agents and the
  CAD/CAM generator agents, storing their replies on the flow state
"""
from __future__ import annotations

import logging
import re
from pathlib import Path

from pydantic import BaseModel, Field
from crewai.flow.flow import Flow, listen, or_, start, router

from agents.definitions import AGENTS
from config.settings import settings

logger = logging.getLogger(__name__)

# Directory of markdown wiki pages (docs/wiki, one level above this module's
# package directory). AgentDispatchFlow.prepare_agents loads selected pages
# from here as agent knowledge.
WIKI_DIR = Path(__file__).parent.parent / "docs" / "wiki"
# ── CrewAI prompt templates ─────────────────────────────────────────────────
# Applied via CrewAI's system_template / prompt_template on advisory agents
# (the ids in ADVISOR_IDS; see AgentDispatchFlow._build_crew_agent).
# {{ .System }} expands to the default system slices (role_playing + tools);
# {{ .Prompt }} expands to the task slice. This preserves all default CrewAI
# behaviour (tool-use format, etc.) while injecting question-level guidance.
#
# The appended "Question Guidelines" steer advisors toward questions a
# non-expert can answer (intent, rough sizes, plain-language options) and
# away from CAD-level parameters such as coordinates, tolerances or radii.
_ADVISOR_SYSTEM_TEMPLATE = """\
{{ .System }}
## Question Guidelines
When asking the user clarifying questions, follow these rules strictly:
- Ask questions a normal person can answer. The user describes INTENT; you derive the technical details.
- NEVER ask for exact coordinates, precise radii, specific tolerance values, or CAD-level parameters.
- Frame questions in everyday language and offer 2-4 plain-language suggestions.
- GOOD: "Where should the mounting holes go?" → "one in each corner", "evenly spaced along edges"
- BAD: "What are the precise X, Y, Z coordinates for the M4 mounting holes?"
- GOOD: "How thick should the walls be?" → "thin (1-2mm)", "standard (3-5mm)", "heavy-duty (6mm+)"
- BAD: "What is the minimum wall thickness in mm for the vertical ribs?"
- GOOD: "Should the edges be sharp or rounded?"
- BAD: "What fillet radius should be applied to the internal pocket edges?"
- You are the expert — infer technical parameters from the user's high-level answer."""

# Pass-through task template: the task prompt itself is used unchanged; only
# the system slice above is customised.
_ADVISOR_PROMPT_TEMPLATE = "{{ .Prompt }}"
# ── Data models ───────────────────────────────────────────────────────────────
class AgentResponse(BaseModel):
    """Serialisable record of a single agent's reply within a flow run.

    ``agent_name``, ``color`` and ``avatar`` are copied from the agent's
    entry in ``AGENTS`` when built via :meth:`from_agent`.
    """

    agent_id: str
    agent_name: str
    message: str
    color: str
    avatar: str
    # Optional code payload attached to the reply (the CAD step stores the
    # extracted CadQuery code here).
    code: str | None = None

    # Must be a classmethod: callers use ``AgentResponse.from_agent(id, msg)``
    # and the body constructs the instance through ``cls``. Without the
    # decorator the agent id would be bound to ``cls`` and the call would fail.
    @classmethod
    def from_agent(
        cls,
        agent_id: str,
        message: str,
        code: str | None = None,
    ) -> AgentResponse:
        """Construct an AgentResponse from an agent id and raw message text.

        Args:
            agent_id: Key into ``AGENTS``; supplies name, colour and avatar.
            message: The reply text to record.
            code: Optional code payload to attach.

        Returns:
            A new AgentResponse populated from the agent definition.

        Raises:
            ValueError: If *agent_id* is not a key of ``AGENTS``.
        """
        agent_def = AGENTS.get(agent_id)
        if agent_def is None:
            raise ValueError(f"Unknown agent_id: {agent_id!r}. Valid: {sorted(AGENTS)}")
        return cls(
            agent_id=agent_id,
            agent_name=agent_def.name,
            message=message,
            color=agent_def.color,
            avatar=agent_def.avatar,
            code=code,
        )
class AgentFlowState(BaseModel):
    """Mutable state object carried through a single CrewAI Flow run.

    The first group of fields are inputs read by AgentDispatchFlow; the rest
    are filled in by the flow's steps as agents run.
    """

    # Latest user message; used for routing and as the memory-recall query.
    message: str = ""
    # Text placed at the start of every agent's task description.
    context: str = ""
    # Model identifier passed to crewai.LLM(model=...).
    model_str: str = ""
    # Agent ids the user @mentioned; when non-empty they replace keyword
    # routing (unless is_approved_phase is set).
    mentions: list[str] = Field(default_factory=list)
    # When True, routing returns settings.planning.approved_agents.
    is_approved_phase: bool = False
    # Agent ids selected by route_agents() for this turn.
    active_agent_ids: list[str] = Field(default_factory=list)
    # Raw text of wiki pages, wrapped as knowledge sources for cnc/cam agents.
    knowledge_sources_data: list[str] = Field(default_factory=list)
    advisor_responses: list[AgentResponse] = Field(default_factory=list)
    cad_response: AgentResponse | None = None
    cam_response: AgentResponse | None = None
    # CadQuery code extracted from the CAD reply; the CAM step runs only
    # when this is set.
    cad_code: str | None = None
    # String forward reference: CAMPlan is not imported yet when this class
    # body runs; it is imported just below and resolved by model_rebuild().
    cam_plan: "CAMPlan | None" = None


# Resolve the CAMPlan forward reference now that the model class exists.
# NOTE(review): core.cam is still imported at module load, only after this
# class is defined. The original rationale cited a circular import, but
# core.cam importing config.settings alone would not create one — confirm
# whether core.cam (directly or indirectly) imports this module.
from core.cam import CAMPlan  # noqa: E402

AgentFlowState.model_rebuild()
# ── Response models ──────────────────────────────────────────────────────────
# Imported mid-module, hence the E402 suppression.
# NOTE(review): presumably placed here to avoid an import cycle — confirm.
from agents.gap_analyzer import GeneratedQuestionCard  # noqa: E402


class PreviewData(BaseModel):
    """Preview data for a generated CAD model, sent to the frontend.

    Only ``success`` is required; string fields default to "" and the
    optional fields to None.
    """

    success: bool
    part_name: str = ""
    # Presumably URLs of the exported STL / STEP / 3MF files ("" when absent).
    stl_url: str = ""
    step_url: str = ""
    threemf_url: str = ""
    # Free-form result dicts.
    # NOTE(review): their key schema is defined by the producer, not here.
    execution: dict = Field(default_factory=dict)
    validation: dict = Field(default_factory=dict)
    cam: dict | None = None
    gcode_url: str | None = None
    # Error description when the preview could not be produced.
    error: str | None = None
class ChatTurnResponse(BaseModel):
    """Unified response envelope from all orchestrator chat_turn() methods.

    ``design_state`` is required; the other fields default to empty or None.
    """

    # Agent replies produced during this turn.
    responses: list[AgentResponse] = Field(default_factory=list)
    preview: PreviewData | None = None
    # Forward reference: DesignState is imported below and resolved by
    # model_rebuild().
    design_state: "DesignState"
    question_cards: list[GeneratedQuestionCard] = Field(default_factory=list)


from agents.design_state import DesignState  # noqa: E402

# Resolve the "DesignState" forward reference now that the name is importable.
ChatTurnResponse.model_rebuild()
# ── Pure helper functions ─────────────────────────────────────────────────────
# Agent ids grouped by role. Advisors run together in one sequential crew;
# generators produce CadQuery code (cad) and a machining plan (cam).
ADVISOR_IDS: frozenset[str] = frozenset(("design", "engineering", "cnc"))
GENERATOR_IDS: frozenset[str] = frozenset(("cad", "cam"))
| def extract_code(text: str) -> str | None: | |
| """Return the first Python/CadQuery code block found in *text*, or None. | |
| Searches for fenced code blocks (``` ... ```) first. Falls back to | |
| returning the whole text when it contains characteristic CadQuery markers. | |
| """ | |
| match = re.search(r"```(?:python)?\s*\n(.*?)```", text, re.DOTALL) | |
| if match: | |
| return match.group(1).strip() | |
| if any(marker in text for marker in ["import cadquery", "cq.", "result ="]): | |
| return text.strip() | |
| return None | |
def route_agents(
    message: str,
    mentions: list[str],
    is_approved_phase: bool,
) -> list[str]:
    """Return the ordered list of agent ids that should handle *message*.

    Priority order:
      1. Approved phase → fixed set from ``settings.planning.approved_agents``.
      2. Explicit @mentions → used in the order given (repeats dropped).
      3. Keyword scoring → up to ``settings.orchestration.max_active_agents``
         highest-scoring agents, with a fallback to ["design", "engineering"]
         when nothing matches.
      4. CAD trigger keywords → append "cad" if not already present. This
         runs after the cap in (3), so the result may exceed it by one.

    Keyword and trigger matching is a case-insensitive substring test.
    """
    if is_approved_phase:
        return list(settings.planning.approved_agents)
    if mentions:
        # dict.fromkeys keeps first-seen order while dropping repeats, so an
        # agent mentioned twice is not dispatched twice.
        return list(dict.fromkeys(mentions))

    lower = message.lower()
    keywords: dict[str, list[str]] = settings.routing.keywords
    max_agents: int = settings.orchestration.max_active_agents

    # Lower-case the configured keywords as well: the message is lower-cased,
    # so a keyword written with capitals (e.g. "CNC") could otherwise never
    # match.
    scores: dict[str, int] = {
        agent_id: sum(1 for kw in kws if kw.lower() in lower)
        for agent_id, kws in keywords.items()
    }
    # sorted() is stable, so agents with equal scores keep configured order.
    active = [
        aid
        for aid, score in sorted(scores.items(), key=lambda item: -item[1])
        if score > 0
    ]
    if not active:
        active = ["design", "engineering"]
    active = active[:max_agents]

    cad_triggers: list[str] = settings.routing.cad_trigger_keywords
    if "cad" not in active and any(kw.lower() in lower for kw in cad_triggers):
        active.append("cad")
    return active
def check_readiness(
    advisor_responses: list[AgentResponse],
    active_agent_ids: list[str],
) -> str:
    """Decide whether the generation phase should run.

    Returns:
        "SKIP_GENERATION" when none of *active_agent_ids* is in GENERATOR_IDS;
        otherwise "NOT_READY" when any advisor message, once stripped and
        upper-cased, starts with "NOT READY:"; otherwise "READY".
    """
    if GENERATOR_IDS.isdisjoint(active_agent_ids):
        return "SKIP_GENERATION"
    blocked = any(
        reply.message.strip().upper().startswith("NOT READY:")
        for reply in advisor_responses
    )
    return "NOT_READY" if blocked else "READY"
def collect_responses(
    advisor_responses: list[AgentResponse],
    cad_response: AgentResponse | None,
    cam_response: AgentResponse | None,
) -> list[AgentResponse]:
    """Flatten the turn's replies into one ordered list.

    Advisor replies come first in their original order, then the CAD reply,
    then the CAM reply; a generator reply that is None is omitted. A new list
    is returned and *advisor_responses* is left unmodified.
    """
    generator_replies = (cad_response, cam_response)
    return [
        *advisor_responses,
        *(reply for reply in generator_replies if reply is not None),
    ]
# ── Flow class ───────────────────────────────────────────────────────────────
class AgentDispatchFlow(Flow[AgentFlowState]):
    """Flow-based agent dispatch replacing Crew.kickoff() + RoutingEngine.

    Step graph (the labels returned by the two routers pick the branch)::

        prepare_agents
          route_message
            "HAS_ADVISORS"    -> run_advisors -> check_readiness_router
                "READY"           -> run_cad -> run_cam
                "NOT_READY"       -> run_cad_not_ready -> skip_cam_after_not_ready
                "SKIP_GENERATION" -> skip_generation
            "GENERATORS_ONLY" -> run_cad_gen_only -> run_cam_gen_only
            "NO_AGENTS"       -> no_agents
        every branch ends in collect_results.

    All agent replies are written to ``self.state`` (AgentFlowState).
    """

    # NOTE(review): the @start/@listen/@router wiring below was reconstructed
    # from this module's imports and the labels the router methods return
    # (without decorators the flow has no start step and never runs) —
    # confirm against the intended graph.

    @start()
    def prepare_agents(self):
        """Load the wiki pages used as knowledge sources; missing files are skipped."""
        for filename in ("cutting-parameters.md", "gcode-reference.md"):
            path = WIKI_DIR / filename
            if path.exists():
                # Explicit encoding: the platform default is not guaranteed
                # to be UTF-8.
                self.state.knowledge_sources_data.append(
                    path.read_text(encoding="utf-8")
                )

    @router(prepare_agents)
    def route_message(self):
        """Select the active agents and return the branch label.

        Returns "HAS_ADVISORS" when any advisor is active, "GENERATORS_ONLY"
        when only generators are active, and "NO_AGENTS" otherwise.
        """
        self.state.active_agent_ids = route_agents(
            self.state.message, self.state.mentions, self.state.is_approved_phase,
        )
        active = set(self.state.active_agent_ids)
        if ADVISOR_IDS & active:
            return "HAS_ADVISORS"
        if GENERATOR_IDS & active:
            return "GENERATORS_ONLY"
        # Nothing routed, or only ids outside both groups.
        return "NO_AGENTS"

    @listen("HAS_ADVISORS")
    def run_advisors(self):
        """Run the active advisory agents together as one sequential Crew."""
        advisor_ids = [aid for aid in self.state.active_agent_ids if aid in ADVISOR_IDS]
        if not advisor_ids:
            return
        self.state.advisor_responses = self._run_advisor_crew(advisor_ids)

    @router(run_advisors)
    def check_readiness_router(self):
        """Gate generation on the advisor replies; see check_readiness()."""
        return check_readiness(self.state.advisor_responses, self.state.active_agent_ids)

    @listen("READY")
    def run_cad(self):
        """Run the CAD step once advisors report ready."""
        self._run_cad_step()

    @listen("NOT_READY")
    def run_cad_not_ready(self):
        """Run the CAD step even though an advisor answered NOT READY.

        The CAD agent is instructed to query the design state itself and
        answer 'NOT READY:' only if information is truly missing.
        """
        self._run_cad_step()

    @listen("SKIP_GENERATION")
    def skip_generation(self):
        """No generator agent is active; nothing to do (join point)."""

    @listen(run_cad)
    def run_cam(self):
        """Run the CAM step after CAD on the READY branch."""
        self._run_cam_step()

    @listen(run_cad_not_ready)
    def skip_cam_after_not_ready(self):
        """CAM is not run on the NOT_READY branch (join point)."""

    @listen("GENERATORS_ONLY")
    def run_cad_gen_only(self):
        """Run the CAD step when no advisors were routed."""
        self._run_cad_step()

    @listen(run_cad_gen_only)
    def run_cam_gen_only(self):
        """Run the CAM step after CAD on the generators-only branch."""
        self._run_cam_step()

    @listen("NO_AGENTS")
    def no_agents(self):
        """No agent was routed; nothing to do (join point)."""

    @listen(or_(skip_generation, run_cam, skip_cam_after_not_ready, run_cam_gen_only, no_agents))
    def collect_results(self):
        """Terminal step; responses are already stored on self.state."""

    # ── Private helpers ──────────────────────────────────────────────

    # Memory store exposing recall()/remember(); set by CrewOrchestrator before
    # kickoff. While None, recall returns "" and nothing is stored.
    _memory = None

    def _recall_for_agent(self, agent_id: str) -> str:
        """Return memories for *agent_id* relevant to the current message.

        Formatted as a markdown section of bullet points, or "" when no
        memory store is set, nothing matches, or recall raises. Recall is
        best-effort: failures are logged, not propagated.
        """
        if self._memory is None:
            return ""
        try:
            matches = self._memory.recall(
                self.state.message,
                scope=f"/agent/{agent_id}",
                limit=settings.memory.recall_limit,
                depth=settings.memory.recall_depth,
            )
        except Exception:
            logger.warning("Memory recall failed for agent %r", agent_id, exc_info=True)
            return ""
        if not matches:
            return ""
        lines = [f"- {m.record.content}" for m in matches]
        return "## Relevant context from prior turns\n" + "\n".join(lines)

    def _remember_response(self, agent_id: str, content: str):
        """Store *content* in *agent_id*'s memory scope (best-effort, logged on failure)."""
        if self._memory is None:
            return
        try:
            self._memory.remember(content, scope=f"/agent/{agent_id}")
        except Exception:
            logger.warning("Memory store failed for agent %r", agent_id, exc_info=True)

    def _build_llm(self):
        """Create a crewai LLM from state.model_str and settings.temperature."""
        from crewai import LLM
        return LLM(model=self.state.model_str, temperature=settings.temperature)

    def _build_knowledge_sources(self):
        """Wrap each loaded wiki page in a StringKnowledgeSource.

        On ImportError, returns whatever was built so far (normally []).
        """
        sources = []
        try:
            from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
            for content in self.state.knowledge_sources_data:
                sources.append(StringKnowledgeSource(content=content))
        except ImportError:
            logger.debug("StringKnowledgeSource unavailable; continuing without it", exc_info=True)
        return sources

    def _build_crew_agent(self, agent_id, llm):
        """Create a CrewAI Agent + Task for the given agent_id.

        Every agent gets the Query Design State tool. Per-agent extras:
          - cad: Execute/Validate CadQuery tools and the CadQuery reference
            appended to its backstory;
          - cam: Generate G-code tool, and the task sets output_pydantic to
            CAMPlan;
          - cnc and cam: wiki knowledge sources;
          - advisors (ADVISOR_IDS): the question-guideline system template,
            and delegation when settings.crew.collaboration is on.
        The task description is state.context, then any recalled memories,
        then role-specific instructions.

        Returns:
            A ``(crew_agent, task)`` tuple.

        Raises:
            KeyError: If *agent_id* is not in AGENTS.
        """
        from crewai import Agent, Task
        from agents.tools import (
            QueryDesignStateTool, ExecuteCadTool,
            ValidateCadTool, GenerateGcodeTool,
        )
        from core.cam import CAMPlan as CAMPlanModel

        agent_def = AGENTS[agent_id]
        tools = [QueryDesignStateTool()]
        extra_backstory = ""
        task_output_pydantic = None
        if agent_id == "cad":
            tools.extend([ExecuteCadTool(), ValidateCadTool()])
            from core.cadquery_prompts import CADQUERY_SYSTEM_PROMPT
            extra_backstory = (
                "\n\nBefore deciding if specs are sufficient, ALWAYS call the "
                "Query Design State tool first to check what the orchestrator "
                "already knows (dimensions, material, features). Only say "
                "NOT READY if the tool confirms information is truly missing.\n\n"
                "When generating code, use the Execute CadQuery Code tool "
                "to test your code. If it fails, fix the errors and try again. "
                "Use the Validate CNC Manufacturability tool to check the result. "
                "Output ONLY valid CadQuery Python that assigns result to a "
                "cq.Workplane. Import cadquery as cq.\n\n"
                f"CadQuery reference:\n{CADQUERY_SYSTEM_PROMPT}"
            )
        elif agent_id == "cnc":
            extra_backstory = (
                "\n\nBefore deciding if manufacturability info is sufficient, "
                "ALWAYS call the Query Design State tool first to check what "
                "the orchestrator already knows (material, dimensions, features, "
                "constraints, axis). Only say NOT READY if the tool confirms "
                "information is truly missing."
            )
        elif agent_id == "cam":
            tools.append(GenerateGcodeTool())
            # NOTE(review): the CAM task below may ask for a 'NOT READY:' reply,
            # which cannot fit a CAMPlan; confirm how CrewAI handles output
            # that fails output_pydantic conversion.
            task_output_pydantic = CAMPlanModel
        elif agent_id in ("design", "engineering"):
            extra_backstory = (
                "\n\nBefore asking clarifying questions, call the Query Design "
                "State tool to check what is already known. Do NOT ask about "
                "fields the tool shows as already provided."
            )

        knowledge_sources = self._build_knowledge_sources() if agent_id in ("cnc", "cam") else []

        # Advisory agents get system_template with question-abstraction
        # guidance via CrewAI's prompt customisation (see _ADVISOR_SYSTEM_TEMPLATE).
        is_advisor = agent_id in ADVISOR_IDS
        crew_agent = Agent(
            role=agent_def.role,
            goal=agent_def.goal,
            backstory=agent_def.backstory + extra_backstory,
            llm=llm,
            tools=tools,
            verbose=False,
            allow_delegation=settings.crew.collaboration and is_advisor,
            knowledge_sources=knowledge_sources if knowledge_sources else None,
            system_template=_ADVISOR_SYSTEM_TEMPLATE if is_advisor else None,
            prompt_template=_ADVISOR_PROMPT_TEMPLATE if is_advisor else None,
        )

        memories = self._recall_for_agent(agent_id)
        context_parts = [self.state.context]
        if memories:
            context_parts.append(memories)
        task_description = "\n\n".join(context_parts) + "\n\n"
        task_description += (
            f"As the {agent_def.role}, respond to the user's latest message. "
            f"Keep your response concise (2-4 sentences). "
            f"Do NOT repeat anything from the conversation history. "
            f"Add NEW information from your expertise.\n\n"
            f"Build on other agents' input — agree, disagree, refine, or add."
        )
        if agent_id == "cad":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check what the "
                "orchestrator already knows. Use the returned 'known' fields "
                "as your specs. Only say 'NOT READY:' listing truly missing "
                "items if the tool shows critical gaps (no shape, no dimensions, "
                "no features). If enough info exists, generate CadQuery code and "
                "use the Execute CadQuery Code tool to verify it works."
            )
        elif agent_id == "cam":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check available "
                "specs. If there is no CAD model generated yet or the tool shows "
                "critical gaps (no material, no dimensions), start with "
                "'NOT READY:' and list only the truly missing items. "
                "If enough info exists, analyze the part geometry and create an "
                "optimal machining strategy. Select operations in order (roughing "
                "before finishing). Use the Generate G-code Toolpath tool."
            )
        elif agent_id == "cnc":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check what the "
                "orchestrator already knows about dimensions, material, and "
                "constraints. Only say 'NOT READY:' listing truly missing items "
                "if the tool confirms critical gaps. If enough info exists, "
                "provide your manufacturability assessment."
            )
        else:
            task_description += (
                "\n\nFIRST call the Query Design State tool to see what is "
                "already known. Only ask clarifying questions about fields "
                "the tool shows as missing in YOUR domain."
            )

        if agent_id == "cad":
            expected_output = "Valid CadQuery Python code or a 'NOT READY:' message."
        elif agent_id in ("cnc", "cam"):
            expected_output = "A concise expert assessment or a 'NOT READY:' message listing missing items."
        else:
            expected_output = "A concise response from your expert perspective (2-4 sentences)."

        task = Task(
            description=task_description,
            expected_output=expected_output,
            agent=crew_agent,
            output_pydantic=task_output_pydantic,
        )
        return crew_agent, task

    def _kickoff_crew(self, agents, tasks):
        """Run *tasks* with *agents* as a sequential Crew and return the kickoff result.

        The planner gets its own LLM instance from _build_llm().
        """
        from crewai import Crew, Process
        crew = Crew(
            agents=agents,
            tasks=tasks,
            process=Process.sequential,
            planning=settings.crew.planning,
            planning_llm=self._build_llm(),
            verbose=False,
        )
        return crew.kickoff()

    def _run_advisor_crew(self, advisor_ids):
        """Run *advisor_ids* as one sequential Crew and collect their replies.

        Returns one AgentResponse per advisor whose output is non-blank, in
        *advisor_ids* order. Each recorded reply is also stored in memory.
        """
        llm = self._build_llm()
        pairs = [self._build_crew_agent(aid, llm) for aid in advisor_ids]
        crew_result = self._kickoff_crew(
            [agent for agent, _ in pairs],
            [task for _, task in pairs],
        )
        # `or []` also covers a tasks_output attribute that is None.
        task_outputs = getattr(crew_result, "tasks_output", None) or []

        responses = []
        for i, agent_id in enumerate(advisor_ids):
            if i < len(task_outputs):
                raw = str(task_outputs[i])
            elif i == 0:
                # No per-task outputs: attribute the crew's overall result to
                # the first advisor.
                raw = str(crew_result)
            else:
                raw = ""
            raw = raw.strip()
            if raw:
                responses.append(AgentResponse.from_agent(agent_id, raw))
                self._remember_response(agent_id, raw)
        return responses

    def _run_single_agent_crew(self, agent_id):
        """Run a single agent as a one-agent Crew. Returns raw output string.

        If the task was created with output_pydantic and CrewAI returns a
        pydantic model, serialises it to JSON so callers get a parseable string.
        """
        llm = self._build_llm()
        crew_agent, task = self._build_crew_agent(agent_id, llm)
        crew_result = self._kickoff_crew([crew_agent], [task])
        task_outputs = getattr(crew_result, "tasks_output", None) or []
        if not task_outputs:
            return str(crew_result).strip()
        output = task_outputs[0]
        # CrewAI may return a pydantic model when output_pydantic is set.
        if getattr(output, "pydantic", None) is not None:
            return output.pydantic.model_dump_json()
        return str(output).strip()

    def _run_cad_step(self):
        """Run the CAD agent (if active) and record its reply on the state.

        A reply starting with "NOT READY:" (case-insensitive) is recorded
        verbatim. Otherwise, when code can be extracted, it is stored in
        state.cad_code and the reply is summarised as "Model generated.";
        when it cannot, the raw reply is recorded. Blank output records nothing.
        """
        if "cad" not in self.state.active_agent_ids:
            return
        raw_output = self._run_single_agent_crew("cad")
        if not raw_output:
            return
        code = None
        if not raw_output.lstrip().upper().startswith("NOT READY:"):
            code = extract_code(raw_output)
        if code:
            self.state.cad_response = AgentResponse.from_agent("cad", "Model generated.", code=code)
            self.state.cad_code = code
        else:
            self.state.cad_response = AgentResponse.from_agent("cad", raw_output)
        # Memory keeps the full reply rather than the "Model generated." summary.
        self._remember_response("cad", raw_output)

    def _run_cam_step(self):
        """Run the CAM agent (if active and CAD code exists) and record its reply.

        When the reply parses as a CAMPlan JSON object, the plan is stored in
        state.cam_plan and summarised in the response; otherwise the raw reply
        (e.g. a "NOT READY:" message) is recorded as-is.
        """
        if "cam" not in self.state.active_agent_ids:
            return
        if self.state.cad_code is None:
            return
        raw_output = self._run_single_agent_crew("cam")
        if not raw_output:
            return

        import json
        from core.cam import CAMPlan as CAMPlanModel

        try:
            plan_data = json.loads(raw_output)
            cam_plan = CAMPlanModel(**plan_data)
        except (ValueError, KeyError, TypeError):
            # ValueError covers json.JSONDecodeError and pydantic validation
            # errors; TypeError covers valid JSON that is not an object
            # (a list, string or number cannot be unpacked with **).
            self.state.cam_response = AgentResponse.from_agent("cam", raw_output)
        else:
            self.state.cam_plan = cam_plan
            self.state.cam_response = AgentResponse.from_agent(
                "cam",
                f"Machining plan: {', '.join(cam_plan.operations)} | "
                f"{cam_plan.tool_diameter}mm endmill | {cam_plan.post_processor}",
            )
        self._remember_response("cam", raw_output)