neuralcad / agents /agent_flow.py
Daniel Tu
feat: constrain agent questions to user-friendly abstraction level (#13)
afd1605 unverified
"""agent_flow.py — Models, routing helpers, and readiness checks for CrewAI Flow.
This module provides the foundational data models and pure helper functions
used by the NeuralCAD multi-agent CrewAI Flow pipeline:
- AgentResponse: serialisable record of a single agent's reply
- AgentFlowState: full mutable state carried through a flow run
- extract_code: pull a CadQuery code block out of a raw LLM response
- route_agents: decide which agents should handle a given message
- check_readiness: decide whether generation agents should proceed
- collect_responses: flatten advisor + generator responses into one list
"""
from __future__ import annotations
import logging
import re
from pathlib import Path
from pydantic import BaseModel, Field
from crewai.flow.flow import Flow, listen, or_, start, router
from agents.definitions import AGENTS
from config.settings import settings
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)
# Knowledge-base markdown lives in docs/wiki under the parent of this file's directory.
WIKI_DIR = Path(__file__).parent.parent.joinpath("docs", "wiki")
# ── CrewAI prompt templates ─────────────────────────────────────────────────
# Passed as ``system_template`` / ``prompt_template`` to advisory agents only
# (AgentDispatchFlow._build_crew_agent passes None for non-advisors, which keeps
# CrewAI's default prompts for them).
# {{ .System }} expands to the default system slices (role_playing + tools);
# {{ .Prompt }} expands to the task slice. This preserves all default CrewAI
# behaviour (tool-use format, etc.) while injecting question-level guidance.
# NOTE(review): placeholder semantics are defined by CrewAI — confirm against
# the installed version's prompt-customisation docs when upgrading.

# Advisor system prompt: CrewAI's default system text followed by rules that
# keep clarifying questions at a non-expert, intent level.
_ADVISOR_SYSTEM_TEMPLATE = """\
{{ .System }}
## Question Guidelines
When asking the user clarifying questions, follow these rules strictly:
- Ask questions a normal person can answer. The user describes INTENT; you derive the technical details.
- NEVER ask for exact coordinates, precise radii, specific tolerance values, or CAD-level parameters.
- Frame questions in everyday language and offer 2-4 plain-language suggestions.
- GOOD: "Where should the mounting holes go?" → "one in each corner", "evenly spaced along edges"
- BAD: "What are the precise X, Y, Z coordinates for the M4 mounting holes?"
- GOOD: "How thick should the walls be?" → "thin (1-2mm)", "standard (3-5mm)", "heavy-duty (6mm+)"
- BAD: "What is the minimum wall thickness in mm for the vertical ribs?"
- GOOD: "Should the edges be sharp or rounded?"
- BAD: "What fillet radius should be applied to the internal pocket edges?"
- You are the expert — infer technical parameters from the user's high-level answer."""
# Pass-through task prompt: the task slice is used unchanged.
_ADVISOR_PROMPT_TEMPLATE = "{{ .Prompt }}"
# ── Data models ───────────────────────────────────────────────────────────────
class AgentResponse(BaseModel):
    """One agent's reply inside a flow run, in a serialisable form.

    Display metadata (name, colour, avatar) is copied from the agent's
    definition in ``AGENTS`` when built through :meth:`from_agent`.
    """

    agent_id: str
    agent_name: str
    message: str
    color: str
    avatar: str
    # Code attached to the reply, if any (the CAD step stores extracted CadQuery here).
    code: str | None = None

    @classmethod
    def from_agent(
        cls,
        agent_id: str,
        message: str,
        code: str | None = None,
    ) -> AgentResponse:
        """Build a response for *agent_id*, filling display fields from ``AGENTS``.

        Raises:
            ValueError: if *agent_id* has no entry in ``AGENTS``.
        """
        definition = AGENTS.get(agent_id)
        if definition is not None:
            return cls(
                agent_id=agent_id,
                agent_name=definition.name,
                color=definition.color,
                avatar=definition.avatar,
                message=message,
                code=code,
            )
        known = sorted(AGENTS)
        raise ValueError(f"Unknown agent_id: {agent_id!r}. Valid: {known}")
class AgentFlowState(BaseModel):
    """Mutable state object carried through a single CrewAI Flow run.

    ``message``, ``context``, ``model_str``, ``mentions`` and
    ``is_approved_phase`` are read but never written by the flow steps in this
    module (presumably supplied by the caller before kickoff). The remaining
    fields are populated by AgentDispatchFlow steps as the run progresses.
    """
    # Latest user message; used for keyword routing and memory recall.
    message: str = ""
    # Conversation context prepended to every agent's task description.
    context: str = ""
    # Model identifier passed to crewai.LLM(model=...).
    model_str: str = ""
    # Agent ids explicitly @mentioned by the user; these bypass keyword routing.
    mentions: list[str] = Field(default_factory=list)
    # When True, routing returns settings.planning.approved_agents.
    is_approved_phase: bool = False
    # Agent ids chosen by route_agents() for this run.
    active_agent_ids: list[str] = Field(default_factory=list)
    # Text of the wiki markdown files loaded by prepare_agents.
    knowledge_sources_data: list[str] = Field(default_factory=list)
    # Replies from the advisory agents, in the order they were run.
    advisor_responses: list[AgentResponse] = Field(default_factory=list)
    # Generator replies; None when the agent did not run or produced no output.
    cad_response: AgentResponse | None = None
    cam_response: AgentResponse | None = None
    # CadQuery code extracted from the CAD agent's output, if any.
    cad_code: str | None = None
    # Parsed machining plan from the CAM agent, set only when its output parsed.
    # Referenced by name: CAMPlan is imported below, *after* this class is
    # defined, and resolved by model_rebuild(). (The file uses
    # ``from __future__ import annotations``, so every annotation here is
    # already a lazy string.)
    cam_plan: "CAMPlan | None" = None
# Import CAMPlan only once AgentFlowState exists, then resolve the forward
# reference above. NOTE(review): the deferral is presumably there to break an
# import cycle involving core.cam — confirm which module closes the cycle.
from core.cam import CAMPlan  # noqa: E402
AgentFlowState.model_rebuild()
# ── Response models ──────────────────────────────────────────────────────────
from agents.gap_analyzer import GeneratedQuestionCard # noqa: E402
class PreviewData(BaseModel):
    """Payload describing a generated CAD preview, as sent to the frontend.

    Only ``success`` is required; every other field falls back to an empty
    value (``""``, ``{}`` or ``None``).
    """

    # Whether preview generation succeeded.
    success: bool
    # Name of the generated part.
    part_name: str = Field(default="")
    # Download URLs for exported artefacts (empty string when not produced).
    stl_url: str = Field(default="")
    step_url: str = Field(default="")
    threemf_url: str = Field(default="")
    # Free-form execution / validation reports; their schema is defined elsewhere.
    execution: dict = Field(default_factory=dict)
    validation: dict = Field(default_factory=dict)
    # Optional CAM summary and G-code download URL.
    cam: dict | None = Field(default=None)
    gcode_url: str | None = Field(default=None)
    # Error description; presumably set when success is False.
    error: str | None = Field(default=None)
class ChatTurnResponse(BaseModel):
    """Unified response envelope from all orchestrator chat_turn() methods.

    ``design_state`` is the only required field.
    """
    # Agent replies for this turn (presumably built with collect_responses()).
    responses: list[AgentResponse] = Field(default_factory=list)
    # CAD preview payload; None when no preview is attached.
    preview: PreviewData | None = None
    # Required. Forward reference: DesignState is imported below, after this
    # class, and resolved by model_rebuild().
    design_state: "DesignState"
    # Question cards to show the user (type from agents.gap_analyzer).
    question_cards: list[GeneratedQuestionCard] = Field(default_factory=list)
# Deferred import + rebuild to resolve the "DesignState" forward reference.
# NOTE(review): presumably deferred to avoid an import cycle with
# agents.design_state — confirm.
from agents.design_state import DesignState  # noqa: E402
ChatTurnResponse.model_rebuild()
# ── Pure helper functions ─────────────────────────────────────────────────────
# Advisory agents answer / ask questions; generator agents produce the CAD
# code and the CAM plan. The two sets are disjoint.
ADVISOR_IDS: frozenset[str] = frozenset(["design", "engineering", "cnc"])
GENERATOR_IDS: frozenset[str] = frozenset(["cad", "cam"])
# One whole fenced block: ``` + optional info string (language tag) + newline,
# then the body up to the next ```. Matching whole blocks with finditer means a
# closing fence can never be mistaken for the opening fence of a new block.
_CODE_FENCE_RE = re.compile(r"```([^\n`]*)\n(.*?)```", re.DOTALL)
# Info strings (lower-cased, stripped) treated as Python / CadQuery code.
_PYTHON_FENCE_TAGS = frozenset({"", "python", "py", "python3"})
# Substrings that mark unfenced text as CadQuery code.
_CADQUERY_MARKERS = ("import cadquery", "cq.", "result =")


def extract_code(text: str) -> str | None:
    """Return the first Python/CadQuery code block found in *text*, or None.

    Fenced code blocks are searched first. The first block whose language tag
    is empty or a Python variant (``python``, ``py``, ``python3``; any case,
    surrounding spaces allowed) wins. Blocks tagged with other languages
    (e.g. ``json``) are skipped whole. If no such block exists, the whole
    text is returned (stripped) when it contains a characteristic CadQuery
    marker; otherwise None.
    """
    for match in _CODE_FENCE_RE.finditer(text):
        if match.group(1).strip().lower() in _PYTHON_FENCE_TAGS:
            return match.group(2).strip()
    if any(marker in text for marker in _CADQUERY_MARKERS):
        return text.strip()
    return None
def route_agents(
    message: str,
    mentions: list[str],
    is_approved_phase: bool,
) -> list[str]:
    """Return the ordered list of agent ids that should handle *message*.

    Priority order:
      1. Approved phase → fixed set from config
         (``settings.planning.approved_agents``).
      2. Explicit @mentions → returned as-is (no validation/de-duplication).
      3. Keyword scoring → up to ``settings.orchestration.max_active_agents``
         highest-scoring agents (ties keep the config's keyword order), with a
         fallback to ["design", "engineering"] when nothing matches.
      4. CAD trigger keywords → append "cad" if not already present. This
         happens after the max-agents cap, so the result may exceed it by one.

    Keyword and trigger matching is case-insensitive substring matching: both
    the message and the configured keywords are lower-cased.
    """
    if is_approved_phase:
        return list(settings.planning.approved_agents)
    if mentions:
        return list(mentions)

    lower = message.lower()
    keywords: dict[str, list[str]] = settings.routing.keywords
    max_agents: int = settings.orchestration.max_active_agents

    # Count matching keywords per agent. The keyword is lower-cased as well:
    # the message is, so a mixed-case entry in config (e.g. "G-code") would
    # otherwise never match.
    scores: dict[str, int] = {
        agent_id: sum(1 for kw in kws if kw.lower() in lower)
        for agent_id, kws in keywords.items()
    }
    # sorted() is stable, so equal scores keep the config's keyword order.
    active = [
        aid
        for aid, score in sorted(scores.items(), key=lambda x: -x[1])
        if score > 0
    ]
    if not active:
        active = ["design", "engineering"]
    active = active[:max_agents]

    cad_triggers: list[str] = settings.routing.cad_trigger_keywords
    if "cad" not in active and any(kw.lower() in lower for kw in cad_triggers):
        active.append("cad")
    return active
# Leading "NOT READY:" marker. Tolerates surrounding whitespace and the markdown
# emphasis / heading / quote characters LLMs often wrap it in, e.g.
# "**NOT READY:** ...", "**NOT READY**: ...", "> not ready: ...". The colon is
# still required so prose like "Not ready to comment, but..." is not flagged.
_NOT_READY_RE = re.compile(r"^[\s*_#>`]*NOT\s+READY[\s*_`]*:", re.IGNORECASE)


def check_readiness(
    advisor_responses: list[AgentResponse],
    active_agent_ids: list[str],
) -> str:
    """Return a readiness verdict for the generation phase.

    Returns:
        "SKIP_GENERATION" — no generator agent is active; skip generation.
        "NOT_READY"       — at least one advisor reply starts with a
                            (case-insensitive, markdown-tolerant)
                            "NOT READY:" marker.
        "READY"           — all advisors are satisfied; proceed with generation.
    """
    if not GENERATOR_IDS & set(active_agent_ids):
        return "SKIP_GENERATION"
    if any(_NOT_READY_RE.match(resp.message) for resp in advisor_responses):
        return "NOT_READY"
    return "READY"
def collect_responses(
    advisor_responses: list[AgentResponse],
    cad_response: AgentResponse | None,
    cam_response: AgentResponse | None,
) -> list[AgentResponse]:
    """Flatten one flow run's replies into a single list.

    The result holds every advisor reply (in the given order), then the CAD
    reply, then the CAM reply. Generator replies that are ``None`` are left
    out. A fresh list is always returned; *advisor_responses* is not mutated.
    """
    generator_replies = (cad_response, cam_response)
    return [*advisor_responses, *(r for r in generator_replies if r is not None)]
# ── Flow class ───────────────────────────────────────────────────────────────
class AgentDispatchFlow(Flow[AgentFlowState]):
    """Flow-based agent dispatch replacing Crew.kickoff() + RoutingEngine.

    Step graph (router labels in quotes)::

        prepare_agents → route_message
            "HAS_ADVISORS"    → run_advisors → check_readiness_router
                "READY"           → run_cad → run_cam
                "NOT_READY"       → run_cad_not_ready → skip_cam_after_not_ready
                "SKIP_GENERATION" → skip_generation
            "GENERATORS_ONLY" → run_cad_gen_only → run_cam_gen_only
            "NO_AGENTS"       → no_agents
        any terminal step → collect_results

    Every step writes its output to ``self.state``; ``collect_results`` is
    only the join point.
    """

    @start()
    def prepare_agents(self):
        """Load wiki knowledge sources."""
        for filename in ("cutting-parameters.md", "gcode-reference.md"):
            path = WIKI_DIR / filename
            if path.exists():
                # Explicit UTF-8: read_text() otherwise uses the locale's
                # encoding, which can mis-decode or fail on non-UTF-8 systems.
                self.state.knowledge_sources_data.append(
                    path.read_text(encoding="utf-8")
                )

    @router(prepare_agents)
    def route_message(self):
        """Select agents and return the routing label for the next step."""
        self.state.active_agent_ids = route_agents(
            self.state.message, self.state.mentions, self.state.is_approved_phase,
        )
        if not self.state.active_agent_ids:
            return "NO_AGENTS"
        active = set(self.state.active_agent_ids)
        if ADVISOR_IDS & active:
            return "HAS_ADVISORS"
        if GENERATOR_IDS & active:
            return "GENERATORS_ONLY"
        return "NO_AGENTS"

    @listen("HAS_ADVISORS")
    def run_advisors(self):
        """Run advisory agents as one Crew."""
        advisor_ids = [aid for aid in self.state.active_agent_ids if aid in ADVISOR_IDS]
        if not advisor_ids:
            return
        self.state.advisor_responses = self._run_advisor_crew(advisor_ids)

    @router(run_advisors)
    def check_readiness_router(self):
        """Gate generation based on advisor responses."""
        return check_readiness(self.state.advisor_responses, self.state.active_agent_ids)

    @listen("READY")
    def run_cad(self):
        """Run the CAD step after advisors reported ready."""
        self._run_cad_step()

    @listen("NOT_READY")
    def run_cad_not_ready(self):
        """Still run the CAD step; the CAD agent decides itself whether specs suffice."""
        self._run_cad_step()

    @listen("SKIP_GENERATION")
    def skip_generation(self):
        """No generator is active; nothing to do."""

    @listen(run_cad)
    def run_cam(self):
        """Run the CAM step after a READY CAD step."""
        self._run_cam_step()

    @listen(run_cad_not_ready)
    def skip_cam_after_not_ready(self):
        """CAM is deliberately skipped when advisors reported NOT READY."""

    @listen("GENERATORS_ONLY")
    def run_cad_gen_only(self):
        """Run the CAD step when only generator agents were routed."""
        self._run_cad_step()

    @listen(run_cad_gen_only)
    def run_cam_gen_only(self):
        """Run the CAM step after a generators-only CAD step."""
        self._run_cam_step()

    @listen("NO_AGENTS")
    def no_agents(self):
        """No agent was selected; nothing to do."""

    @listen(or_(run_cam, skip_cam_after_not_ready, skip_generation,
                run_cam_gen_only, no_agents))
    def collect_results(self):
        """Join point for all branches; responses are already on state."""

    # ── Private helpers ──────────────────────────────────────────────
    # Optional memory backend, set by CrewOrchestrator before kickoff. It must
    # provide recall(query, scope=, limit=, depth=) and remember(content, scope=).
    _memory = None

    def _recall_for_agent(self, agent_id: str) -> str:
        """Recall relevant memories for this agent, formatted as context.

        Returns "" when no memory backend is set, nothing matched, or recall
        failed. Memory is best-effort, so failures are logged and not raised.
        """
        if self._memory is None:
            return ""
        try:
            matches = self._memory.recall(
                self.state.message,
                scope=f"/agent/{agent_id}",
                limit=settings.memory.recall_limit,
                depth=settings.memory.recall_depth,
            )
        except Exception:
            logger.warning("Memory recall failed for agent %r", agent_id, exc_info=True)
            return ""
        if not matches:
            return ""
        lines = [f"- {m.record.content}" for m in matches]
        return "## Relevant context from prior turns\n" + "\n".join(lines)

    def _remember_response(self, agent_id: str, content: str):
        """Store an agent's response in its scoped memory (best-effort, logged on failure)."""
        if self._memory is None:
            return
        try:
            self._memory.remember(content, scope=f"/agent/{agent_id}")
        except Exception:
            logger.warning("Memory remember failed for agent %r", agent_id, exc_info=True)

    def _build_llm(self):
        """Create a CrewAI LLM for the state's model at the configured temperature."""
        from crewai import LLM
        return LLM(model=self.state.model_str, temperature=settings.temperature)

    def _build_knowledge_sources(self):
        """Wrap the loaded wiki texts as CrewAI StringKnowledgeSource objects.

        If the knowledge-source support cannot be imported, the sources built
        so far are returned (possibly an empty list).
        """
        sources = []
        try:
            from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
            for content in self.state.knowledge_sources_data:
                sources.append(StringKnowledgeSource(content=content))
        except ImportError:
            logger.debug("Knowledge sources unavailable; continuing without them", exc_info=True)
        return sources

    def _build_crew_agent(self, agent_id, llm):
        """Create a CrewAI Agent + Task for the given agent_id.

        Tools, extra backstory, knowledge sources, prompt templates, the task
        description and the expected output all depend on *agent_id*.
        Returns ``(agent, task)``.
        """
        from crewai import Agent, Task
        from agents.tools import (
            QueryDesignStateTool, ExecuteCadTool,
            ValidateCadTool, GenerateGcodeTool,
        )

        agent_def = AGENTS[agent_id]
        tools = [QueryDesignStateTool()]
        extra_backstory = ""
        task_output_pydantic = None
        if agent_id == "cad":
            tools.extend([ExecuteCadTool(), ValidateCadTool()])
            from core.cadquery_prompts import CADQUERY_SYSTEM_PROMPT
            extra_backstory = (
                "\n\nBefore deciding if specs are sufficient, ALWAYS call the "
                "Query Design State tool first to check what the orchestrator "
                "already knows (dimensions, material, features). Only say "
                "NOT READY if the tool confirms information is truly missing.\n\n"
                "When generating code, use the Execute CadQuery Code tool "
                "to test your code. If it fails, fix the errors and try again. "
                "Use the Validate CNC Manufacturability tool to check the result. "
                "Output ONLY valid CadQuery Python that assigns result to a "
                "cq.Workplane. Import cadquery as cq.\n\n"
                f"CadQuery reference:\n{CADQUERY_SYSTEM_PROMPT}"
            )
        elif agent_id == "cnc":
            extra_backstory = (
                "\n\nBefore deciding if manufacturability info is sufficient, "
                "ALWAYS call the Query Design State tool first to check what "
                "the orchestrator already knows (material, dimensions, features, "
                "constraints, axis). Only say NOT READY if the tool confirms "
                "information is truly missing."
            )
        elif agent_id == "cam":
            tools.append(GenerateGcodeTool())
            # Structured output; _run_single_agent_crew serialises it to JSON
            # and _run_cam_step parses it back into a CAMPlan.
            task_output_pydantic = CAMPlan
        elif agent_id in ("design", "engineering"):
            extra_backstory = (
                "\n\nBefore asking clarifying questions, call the Query Design "
                "State tool to check what is already known. Do NOT ask about "
                "fields the tool shows as already provided."
            )
        knowledge_sources = self._build_knowledge_sources() if agent_id in ("cnc", "cam") else []
        # Advisory agents get system_template with question-abstraction
        # guidance via CrewAI's prompt customisation (see _ADVISOR_SYSTEM_TEMPLATE).
        is_advisor = agent_id in ADVISOR_IDS
        crew_agent = Agent(
            role=agent_def.role,
            goal=agent_def.goal,
            backstory=agent_def.backstory + extra_backstory,
            llm=llm,
            tools=tools,
            verbose=False,
            allow_delegation=settings.crew.collaboration and is_advisor,
            knowledge_sources=knowledge_sources if knowledge_sources else None,
            system_template=_ADVISOR_SYSTEM_TEMPLATE if is_advisor else None,
            prompt_template=_ADVISOR_PROMPT_TEMPLATE if is_advisor else None,
        )
        memories = self._recall_for_agent(agent_id)
        context_parts = [self.state.context]
        if memories:
            context_parts.append(memories)
        task_description = "\n\n".join(context_parts) + "\n\n"
        task_description += (
            f"As the {agent_def.role}, respond to the user's latest message. "
            f"Keep your response concise (2-4 sentences). "
            f"Do NOT repeat anything from the conversation history. "
            f"Add NEW information from your expertise.\n\n"
            f"Build on other agents' input — agree, disagree, refine, or add."
        )
        if agent_id == "cad":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check what the "
                "orchestrator already knows. Use the returned 'known' fields "
                "as your specs. Only say 'NOT READY:' listing truly missing "
                "items if the tool shows critical gaps (no shape, no dimensions, "
                "no features). If enough info exists, generate CadQuery code and "
                "use the Execute CadQuery Code tool to verify it works."
            )
        elif agent_id == "cam":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check available "
                "specs. If there is no CAD model generated yet or the tool shows "
                "critical gaps (no material, no dimensions), start with "
                "'NOT READY:' and list only the truly missing items. "
                "If enough info exists, analyze the part geometry and create an "
                "optimal machining strategy. Select operations in order (roughing "
                "before finishing). Use the Generate G-code Toolpath tool."
            )
        elif agent_id == "cnc":
            task_description += (
                "\n\nFIRST call the Query Design State tool to check what the "
                "orchestrator already knows about dimensions, material, and "
                "constraints. Only say 'NOT READY:' listing truly missing items "
                "if the tool confirms critical gaps. If enough info exists, "
                "provide your manufacturability assessment."
            )
        else:
            task_description += (
                "\n\nFIRST call the Query Design State tool to see what is "
                "already known. Only ask clarifying questions about fields "
                "the tool shows as missing in YOUR domain."
            )
        if agent_id == "cad":
            expected_output = "Valid CadQuery Python code or a 'NOT READY:' message."
        elif agent_id in ("cnc", "cam"):
            expected_output = "A concise expert assessment or a 'NOT READY:' message listing missing items."
        else:
            expected_output = "A concise response from your expert perspective (2-4 sentences)."
        task = Task(
            description=task_description,
            expected_output=expected_output,
            agent=crew_agent,
            output_pydantic=task_output_pydantic,
        )
        return crew_agent, task

    def _kickoff_crew(self, agents, tasks):
        """Run *agents*/*tasks* as one sequential Crew and return its kickoff() result."""
        from crewai import Crew, Process
        crew = Crew(
            agents=agents,
            tasks=tasks,
            process=Process.sequential,
            planning=settings.crew.planning,
            planning_llm=self._build_llm(),
            verbose=False,
        )
        return crew.kickoff()

    def _run_advisor_crew(self, advisor_ids):
        """Run all *advisor_ids* as one sequential Crew and return their non-empty replies.

        Each task output is attributed to the advisor at the same index. If the
        result exposes no per-task outputs, the crew's combined result is
        attributed to the first advisor only. Each stored reply is also
        written to memory.
        """
        llm = self._build_llm()
        pairs = [self._build_crew_agent(aid, llm) for aid in advisor_ids]
        crew_result = self._kickoff_crew([p[0] for p in pairs], [p[1] for p in pairs])
        # getattr(..., None) or []: tolerate results without tasks_output, or with None.
        task_outputs = getattr(crew_result, "tasks_output", None) or []
        responses = []
        for i, agent_id in enumerate(advisor_ids):
            if i < len(task_outputs):
                raw = str(task_outputs[i])
            elif i == 0:
                raw = str(crew_result)
            else:
                raw = ""
            text = raw.strip()
            if text:
                responses.append(AgentResponse.from_agent(agent_id, text))
                self._remember_response(agent_id, text)
        return responses

    def _run_single_agent_crew(self, agent_id):
        """Run a single agent as a one-agent Crew. Returns raw output string.

        If the task was created with output_pydantic and CrewAI returns a
        pydantic model, serialises it to JSON so callers get a parseable string.
        """
        llm = self._build_llm()
        crew_agent, task = self._build_crew_agent(agent_id, llm)
        crew_result = self._kickoff_crew([crew_agent], [task])
        task_outputs = getattr(crew_result, "tasks_output", None) or []
        if not task_outputs:
            return str(crew_result).strip()
        output = task_outputs[0]
        # CrewAI may return a pydantic model when output_pydantic is set
        if getattr(output, "pydantic", None) is not None:
            return output.pydantic.model_dump_json()
        return str(output).strip()

    def _run_cad_step(self):
        """Run the CAD agent if active and record its reply / extracted code on state.

        A "NOT READY:" reply or a reply without a code block is stored
        verbatim. Otherwise the extracted code goes to ``cad_code`` and the
        reply reads "Model generated.".
        """
        if "cad" not in self.state.active_agent_ids:
            return
        raw_output = self._run_single_agent_crew("cad")
        if not raw_output:
            return
        code = None
        if not raw_output.upper().startswith("NOT READY:"):
            code = extract_code(raw_output)
        if code:
            self.state.cad_response = AgentResponse.from_agent("cad", "Model generated.", code=code)
            self.state.cad_code = code
        else:
            self.state.cad_response = AgentResponse.from_agent("cad", raw_output)
        self._remember_response("cad", raw_output)

    def _run_cam_step(self):
        """Run the CAM agent and store its machining plan on state.

        Skipped unless "cam" is active and CAD code exists. The output is
        expected to be a JSON-serialised CAMPlan. If it is not (e.g. a
        "NOT READY:" message, or JSON of the wrong shape), the raw text becomes
        the CAM response and ``cam_plan`` is left unchanged.
        """
        import json

        if "cam" not in self.state.active_agent_ids:
            return
        if self.state.cad_code is None:
            return
        raw_output = self._run_single_agent_crew("cam")
        if not raw_output:
            return
        try:
            plan_data = json.loads(raw_output)
            # A JSON array/scalar cannot be splatted as kwargs → TypeError.
            cam_plan = CAMPlan(**plan_data)
            summary = (
                f"Machining plan: {', '.join(cam_plan.operations)} | "
                f"{cam_plan.tool_diameter}mm endmill | {cam_plan.post_processor}"
            )
        except (json.JSONDecodeError, TypeError, ValueError, KeyError):
            # JSONDecodeError and pydantic's ValidationError both derive from ValueError.
            self.state.cam_response = AgentResponse.from_agent("cam", raw_output)
        else:
            self.state.cam_plan = cam_plan
            self.state.cam_response = AgentResponse.from_agent("cam", summary)
        self._remember_response("cam", raw_output)