Spaces:
Sleeping
Sleeping
| """Tests for agents/agent_flow.py — models, routing, readiness, and collection helpers.""" | |
| from __future__ import annotations | |
| from unittest.mock import patch, MagicMock | |
| import pytest | |
| from agents.agent_flow import ( | |
| ADVISOR_IDS, | |
| GENERATOR_IDS, | |
| AgentDispatchFlow, | |
| AgentFlowState, | |
| AgentResponse, | |
| ChatTurnResponse, | |
| PreviewData, | |
| check_readiness, | |
| collect_responses, | |
| extract_code, | |
| route_agents, | |
| ) | |
| from agents.design_state import DesignState | |
| from agents.gap_analyzer import GeneratedQuestionCard | |
| # ── Helpers ─────────────────────────────────────────────────────────────────── | |
def _make_response(
    agent_id: str = "design",
    message: str = "Looks good.",
    code: str | None = None,
) -> AgentResponse:
    """Build an ``AgentResponse`` through ``AgentResponse.from_agent``.

    All arguments are forwarded positionally; the defaults give a short
    "design" reply carrying no code.
    """
    response = AgentResponse.from_agent(agent_id, message, code)
    return response
| # ── TestAgentResponse ───────────────────────────────────────────────────────── | |
class TestAgentResponse:
    """Checks the fields ``AgentResponse.from_agent`` fills in for each agent id."""

    def test_from_agent_populates_fields(self):
        """A "design" response carries its id, display data, message, and no code."""
        reply = AgentResponse.from_agent("design", "Hello design world")
        assert (reply.agent_id, reply.agent_name) == ("design", "Design Agent")
        assert reply.message == "Hello design world"
        assert (reply.color, reply.avatar) == ("#7c3aed", "DA")
        assert reply.code is None

    def test_from_agent_with_code(self):
        """The ``code`` keyword is stored unchanged on the response."""
        snippet = "import cadquery as cq\nresult = cq.Workplane('XY').box(10, 10, 10)"
        reply = AgentResponse.from_agent("cad", "Here is the code", code=snippet)
        assert reply.agent_id == "cad"
        assert reply.code == snippet

    def test_engineering_agent_fields(self):
        """Display name, colour, and avatar for the "engineering" id."""
        reply = AgentResponse.from_agent("engineering", "Wall needs to be 3 mm thick")
        shown = (reply.agent_name, reply.color, reply.avatar)
        assert shown == ("Engineering Agent", "#00b4d8", "EA")

    def test_cnc_agent_fields(self):
        """Display name, colour, and avatar for the "cnc" id."""
        reply = AgentResponse.from_agent("cnc", "3-axis is fine here")
        shown = (reply.agent_name, reply.color, reply.avatar)
        assert shown == ("CNC Agent", "#00e676", "CA")

    def test_cam_agent_fields(self):
        """Display name, colour, and avatar for the "cam" id."""
        reply = AgentResponse.from_agent("cam", "Roughing then finishing")
        shown = (reply.agent_name, reply.color, reply.avatar)
        assert shown == ("CAM Agent", "#ff6b35", "CM")

    def test_cad_agent_fields(self):
        """Display name, colour, and avatar for the "cad" id."""
        reply = AgentResponse.from_agent("cad", "Generated model")
        shown = (reply.agent_name, reply.color, reply.avatar)
        assert shown == ("CAD Coder", "#ffab40", "CC")

    def test_model_dump_keys(self):
        """``model_dump`` exposes exactly the six expected keys."""
        expected_keys = {
            "agent_id",
            "agent_name",
            "message",
            "color",
            "avatar",
            "code",
        }
        dumped = AgentResponse.from_agent("design", "A message").model_dump()
        assert set(dumped) == expected_keys
| # ── TestAgentFlowState ──────────────────────────────────────────────────────── | |
class TestAgentFlowState:
    """Checks ``AgentFlowState`` defaults, explicit inputs, and default isolation."""

    def test_defaults(self):
        """A bare state has empty strings, empty lists, a False flag, and None outputs."""
        blank = AgentFlowState()

        for text_field in ("message", "context", "model_str"):
            assert getattr(blank, text_field) == ""

        assert blank.mentions == []
        assert blank.is_approved_phase is False

        for list_field in (
            "active_agent_ids",
            "knowledge_sources_data",
            "advisor_responses",
        ):
            assert getattr(blank, list_field) == []

        for unset_field in ("cad_response", "cam_response", "cad_code", "cam_plan"):
            assert getattr(blank, unset_field) is None

    def test_with_inputs(self):
        """Values passed to the constructor are readable back from the state."""
        advisor_reply = _make_response("engineering", "Solid choice.")
        inputs = {
            "message": "Make a bracket",
            "mentions": ["design", "engineering"],
            "is_approved_phase": True,
            "active_agent_ids": ["design", "engineering", "cad"],
            "advisor_responses": [advisor_reply],
        }
        populated = AgentFlowState(**inputs)

        assert populated.message == "Make a bracket"
        assert populated.mentions == ["design", "engineering"]
        assert populated.is_approved_phase is True
        assert "cad" in populated.active_agent_ids
        assert len(populated.advisor_responses) == 1

    def test_independent_default_lists(self):
        """Mutable defaults must not be shared between instances."""
        first = AgentFlowState()
        second = AgentFlowState()
        first.mentions.append("design")
        assert second.mentions == []
| # ── TestExtractCode ─────────────────────────────────────────────────────────── | |
class TestExtractCode:
    """Checks ``extract_code`` on fenced, unfenced, plain-text, and empty input."""

    def test_fenced_python_block(self):
        """A python-tagged fence yields the inner code without the fence markers."""
        raw = (
            "Here is the code:\n"
            "```python\n"
            "import cadquery as cq\n"
            "result = cq.Workplane('XY').box(1,1,1)\n"
            "```\n"
            "Done."
        )
        extracted = extract_code(raw)
        assert extracted is not None
        assert "import cadquery" in extracted
        assert "```" not in extracted

    def test_generic_fence_no_language(self):
        """A fence with no language tag is still recognised."""
        extracted = extract_code("```\ncq.Workplane('XY').box(5,5,5)\n```")
        assert extracted is not None
        assert "cq.Workplane" in extracted

    def test_unfenced_cq_code_import_marker(self):
        """Unfenced text starting with the cadquery import is returned stripped."""
        raw = "import cadquery as cq\nresult = cq.Workplane('XY').box(2,2,2)"
        assert extract_code(raw) == raw.strip()

    def test_unfenced_cq_dot_marker(self):
        """Unfenced text containing ``cq.`` is returned stripped."""
        raw = "cq.Workplane('XY').box(1,1,1)"
        assert extract_code(raw) == raw.strip()

    def test_unfenced_result_marker(self):
        """Unfenced text assigning ``result =`` is returned stripped."""
        raw = "result = cq.Workplane('XY').box(3,3,3)"
        assert extract_code(raw) == raw.strip()

    def test_plain_text_returns_none(self):
        """Ordinary prose yields ``None``."""
        prose = "I need more information about the dimensions before I can proceed."
        assert extract_code(prose) is None

    def test_empty_string_returns_none(self):
        """The empty string yields ``None``."""
        extracted = extract_code("")
        assert extracted is None

    def test_strips_whitespace_from_fenced_block(self):
        """Blank lines and padding inside the fence are stripped from the result."""
        raw = "```python\n\n result = 1\n\n```"
        assert extract_code(raw) == "result = 1"
| # ── TestRouteAgents ─────────────────────────────────────────────────────────── | |
class TestRouteAgents:
    """Checks ``route_agents`` for approved phase, mentions, keywords, and CAD triggers."""

    def test_approved_phase_returns_config_agents(self):
        """In the approved phase the routed set is exactly cad + cnc."""
        routed = route_agents("anything", [], is_approved_phase=True)
        # config.yaml: approved_agents: ["cad", "cnc"]
        assert set(routed) == {"cad", "cnc"}

    def test_mentions_override_keywords(self):
        """An explicit mention wins over keywords present in the message."""
        routed = route_agents(
            "design shape and engineering material",
            mentions=["cnc"],
            is_approved_phase=False,
        )
        assert routed == ["cnc"]

    def test_design_keyword(self):
        """Design wording routes to the design agent."""
        routed = route_agents("I want a nice shape and design", [], False)
        assert "design" in routed

    def test_engineering_keyword(self):
        """Material / thickness wording routes to the engineering agent."""
        question = "what material and wall thickness should I use?"
        routed = route_agents(question, [], False)
        assert "engineering" in routed

    def test_cnc_keyword(self):
        """Machining wording routes to the cnc agent."""
        question = "can we machine this on a 5-axis cnc mill?"
        routed = route_agents(question, [], False)
        assert "cnc" in routed

    def test_cam_keyword(self):
        """Toolpath / gcode wording routes to the cam agent."""
        routed = route_agents("I need a toolpath and gcode output", [], False)
        assert "cam" in routed

    def test_default_no_keyword_match(self):
        """With no keyword hit, both design and engineering are routed."""
        routed = route_agents("hello there", [], False)
        assert "design" in routed
        assert "engineering" in routed

    def test_max_agents_capped_at_3(self):
        """Keyword scoring yields at most three non-cad agents."""
        # Message with keywords for design, engineering, cnc, cam — max is 3
        msg = "design the shape, check the material thickness, machine it on a cnc, toolpath and gcode"
        routed = route_agents(msg, [], False)
        # CAD-trigger words absent → at most 3 from keyword scoring
        # (cad trigger could add a 4th only if triggered — this message has none)
        non_cad_count = sum(1 for agent_id in routed if agent_id != "cad")
        assert non_cad_count <= 3

    def test_cad_trigger_appended(self):
        """A generation request brings the cad agent in."""
        routed = route_agents("generate the model for me", [], False)
        assert "cad" in routed

    def test_no_cad_trigger_absent(self):
        """Without a trigger word the call still returns a non-empty list."""
        routed = route_agents("what is the wall thickness?", [], False)
        # "generate" etc. not present — cad might still appear via keyword score
        # but should NOT be added by the trigger path if already at max capacity
        # Just verify the function does not crash and returns a non-empty list
        assert len(routed) > 0

    def test_cad_not_duplicated_when_already_in_keywords(self):
        """cad appears at most once even when keyword and trigger both match."""
        # Even if "cad" is in keyword scores AND a trigger word is present,
        # it should appear only once.
        routed = route_agents("generate a model using cad", [], False)
        assert routed.count("cad") <= 1

    def test_approved_phase_ignores_mentions(self):
        """Mentions do not change the approved-phase routing."""
        # approved phase always wins
        routed = route_agents("", ["design"], is_approved_phase=True)
        assert set(routed) == {"cad", "cnc"}
| # ── TestCheckReadiness ──────────────────────────────────────────────────────── | |
class TestCheckReadiness:
    """Checks the READY / NOT_READY / SKIP_GENERATION verdicts of ``check_readiness``."""

    def test_ready_when_clean(self):
        """No NOT READY flag plus a generator in the active ids gives READY."""
        replies = [
            _make_response("design", "Looks good, let's proceed"),
            _make_response("engineering", "Dimensions are confirmed"),
        ]
        active = ["design", "engineering", "cad"]
        assert check_readiness(replies, active_agent_ids=active) == "READY"

    def test_not_ready_when_flagged(self):
        """A reply beginning with NOT READY gives NOT_READY."""
        replies = [
            _make_response("design", "Nice shape"),
            _make_response("cad", "NOT READY: missing wall thickness and material"),
        ]
        verdict = check_readiness(replies, active_agent_ids=["design", "cad"])
        assert verdict == "NOT_READY"

    def test_skip_generation_when_no_generators(self):
        """Only advisors in the active ids gives SKIP_GENERATION."""
        replies = [_make_response("design", "All good")]
        active = ["design", "engineering"]
        assert check_readiness(replies, active_agent_ids=active) == "SKIP_GENERATION"

    def test_not_ready_case_insensitive(self):
        """A lower-case "not ready" prefix is still detected."""
        # The check uses .upper() so mixed-case prefix must still trigger NOT_READY
        replies = [_make_response("cad", "not ready: dimensions missing")]
        verdict = check_readiness(replies, active_agent_ids=["cad"])
        assert verdict == "NOT_READY"

    def test_not_ready_leading_whitespace_stripped(self):
        """Leading whitespace before the flag does not hide it."""
        replies = [_make_response("cad", " NOT READY: need more info")]
        verdict = check_readiness(replies, active_agent_ids=["cad"])
        assert verdict == "NOT_READY"

    def test_cam_counts_as_generator(self):
        """Having "cam" among the active ids is enough for READY."""
        replies = [_make_response("cnc", "Setup looks fine")]
        verdict = check_readiness(replies, active_agent_ids=["cnc", "cam"])
        assert verdict == "READY"

    def test_empty_responses_with_generators_is_ready(self):
        """No replies but an active generator gives READY."""
        verdict = check_readiness([], active_agent_ids=["cad"])
        assert verdict == "READY"

    def test_empty_active_ids_skips(self):
        """No replies and no active ids gives SKIP_GENERATION."""
        verdict = check_readiness([], active_agent_ids=[])
        assert verdict == "SKIP_GENERATION"
| # ── TestCollectResponses ────────────────────────────────────────────────────── | |
class TestCollectResponses:
    """Checks how ``collect_responses`` merges advisor, cad, and cam replies."""

    def test_merges_all(self):
        """Advisors come first, then cad, then cam."""
        advisor_replies = [
            _make_response("design", "Shape confirmed"),
            _make_response("engineering", "Material confirmed"),
        ]
        cad_reply = _make_response("cad", "Generated model", code="result = ...")
        cam_reply = _make_response("cam", "Toolpath ready")

        merged = collect_responses(advisor_replies, cad_reply, cam_reply)

        merged_ids = [item.agent_id for item in merged]
        assert merged_ids == ["design", "engineering", "cad", "cam"]

    def test_handles_none_cad(self):
        """A missing cad reply is skipped; cam follows the advisor."""
        advisor_replies = [_make_response("design", "OK")]
        cam_reply = _make_response("cam", "Plan ready")
        merged = collect_responses(
            advisor_replies, cad_response=None, cam_response=cam_reply
        )
        assert len(merged) == 2
        assert merged[1].agent_id == "cam"

    def test_handles_none_cam(self):
        """A missing cam reply is skipped; cad follows the advisor."""
        advisor_replies = [_make_response("engineering", "OK")]
        cad_reply = _make_response("cad", "Model done")
        merged = collect_responses(
            advisor_replies, cad_response=cad_reply, cam_response=None
        )
        assert len(merged) == 2
        assert merged[1].agent_id == "cad"

    def test_handles_both_none(self):
        """With neither generator reply, only the advisor reply remains."""
        advisor_replies = [_make_response("cnc", "Ready")]
        merged = collect_responses(
            advisor_replies, cad_response=None, cam_response=None
        )
        assert len(merged) == 1
        assert merged[0].agent_id == "cnc"

    def test_handles_empty_advisors(self):
        """With no advisor replies, the two generator replies are returned."""
        cad_reply = _make_response("cad", "Model done")
        cam_reply = _make_response("cam", "Plan done")
        merged = collect_responses([], cad_reply, cam_reply)
        assert len(merged) == 2

    def test_does_not_mutate_input_list(self):
        """The caller's advisor list keeps its length after the call."""
        advisor_replies = [_make_response("design", "OK")]
        length_before = len(advisor_replies)
        collect_responses(advisor_replies, _make_response("cad", "x"), None)
        assert len(advisor_replies) == length_before

    def test_all_none_returns_empty(self):
        """No inputs at all gives an empty list."""
        assert collect_responses([], None, None) == []
| # ── TestAgentDispatchFlow ──────────────────────────────────────────────────── | |
class TestAgentDispatchFlow:
    """Runs ``AgentDispatchFlow.kickoff`` with the crew-running methods patched out."""

    def test_no_agents_path(self):
        """Flow with no matching agents routes through HAS_ADVISORS (default fallback)."""
        initial = AgentFlowState(
            message="xyzzy",
            context="",
            model_str="gemini/gemini-2.5-flash",
        )
        flow = AgentDispatchFlow(initial_state=initial)
        advisor_patch = patch.object(
            AgentDispatchFlow,
            "_run_advisor_crew",
            return_value=[AgentResponse.from_agent("design", "I can help.")],
        )
        with advisor_patch:
            flow.kickoff()
        assert flow.state.active_agent_ids == ["design", "engineering"]
        assert len(flow.state.advisor_responses) == 1

    def test_approved_phase_routes_generators_only(self):
        """Approved phase activates cad + cnc and records the cad reply."""
        initial = AgentFlowState(
            message="build it",
            context="## APPROVED PLAN",
            model_str="gemini/gemini-2.5-flash",
            is_approved_phase=True,
        )
        flow = AgentDispatchFlow(initial_state=initial)
        advisor_patch = patch.object(
            AgentDispatchFlow,
            "_run_advisor_crew",
            return_value=[AgentResponse.from_agent("cnc", "Looks machinable.")],
        )
        generator_patch = patch.object(
            AgentDispatchFlow,
            "_run_single_agent_crew",
            return_value="NOT READY: need dims",
        )
        with advisor_patch, generator_patch:
            flow.kickoff()
        assert flow.state.active_agent_ids == ["cad", "cnc"]
        cad_reply = flow.state.cad_response
        assert cad_reply is not None
        assert cad_reply.message.startswith("NOT READY")

    def test_mentions_override(self):
        """A single mention becomes the only active agent."""
        initial = AgentFlowState(
            message="check",
            context="",
            model_str="gemini/gemini-2.5-flash",
            mentions=["design"],
        )
        flow = AgentDispatchFlow(initial_state=initial)
        advisor_patch = patch.object(
            AgentDispatchFlow,
            "_run_advisor_crew",
            return_value=[AgentResponse.from_agent("design", "Looks good.")],
        )
        with advisor_patch:
            flow.kickoff()
        assert flow.state.active_agent_ids == ["design"]

    def test_generators_only_path(self):
        """Mentions with only generators routes through GENERATORS_ONLY."""
        initial = AgentFlowState(
            message="generate it",
            context="",
            model_str="gemini/gemini-2.5-flash",
            mentions=["cad"],
        )
        flow = AgentDispatchFlow(initial_state=initial)
        generator_patch = patch.object(
            AgentDispatchFlow,
            "_run_single_agent_crew",
            return_value="NOT READY: need specs",
        )
        with generator_patch:
            flow.kickoff()
        final_state = flow.state
        assert final_state.active_agent_ids == ["cad"]
        assert final_state.advisor_responses == []
        assert final_state.cad_response is not None

    def test_collect_results_from_advisor_and_cad(self):
        """Advisor and cad replies are both collectable and cad code is extracted."""
        initial = AgentFlowState(
            message="generate a bracket preview",
            context="",
            model_str="gemini/gemini-2.5-flash",
        )
        flow = AgentDispatchFlow(initial_state=initial)
        fenced_reply = "```python\nimport cadquery as cq\nresult = cq.Workplane('XY').box(10,10,10)\n```"
        advisor_patch = patch.object(
            AgentDispatchFlow,
            "_run_advisor_crew",
            return_value=[AgentResponse.from_agent("design", "L-bracket idea.")],
        )
        generator_patch = patch.object(
            AgentDispatchFlow,
            "_run_single_agent_crew",
            return_value=fenced_reply,
        )
        with advisor_patch, generator_patch:
            flow.kickoff()
        final_state = flow.state
        merged = collect_responses(
            final_state.advisor_responses,
            final_state.cad_response,
            final_state.cam_response,
        )
        assert len(merged) >= 2  # advisor + cad
        assert flow.state.cad_code is not None
class TestMemoryHelpers:
    """Checks ``_recall_for_agent`` and ``_remember_response`` with and without memory."""

    @staticmethod
    def _flow_with_memory(message, memory):
        """Create a flow for ``message`` and set its ``_memory`` attribute to ``memory``."""
        flow = AgentDispatchFlow(initial_state=AgentFlowState(
            message=message,
            model_str="gemini/gemini-2.5-flash",
        ))
        flow._memory = memory
        return flow

    def test_recall_returns_empty_when_no_memory(self):
        """With ``_memory`` set to None, recall yields an empty string."""
        flow = self._flow_with_memory("bracket design", None)
        assert flow._recall_for_agent("design") == ""

    def test_recall_formats_matches(self):
        """A recalled match is rendered under the prior-turns heading."""
        match = MagicMock()
        match.record.content = "L-bracket with fillets"
        memory = MagicMock()
        memory.recall.return_value = [match]

        flow = self._flow_with_memory("bracket", memory)
        recalled = flow._recall_for_agent("design")

        assert "## Relevant context from prior turns" in recalled
        assert "L-bracket with fillets" in recalled
        memory.recall.assert_called_once()

    def test_recall_returns_empty_when_no_matches(self):
        """An empty recall result yields an empty string."""
        memory = MagicMock()
        memory.recall.return_value = []
        flow = self._flow_with_memory("bracket", memory)
        assert flow._recall_for_agent("design") == ""

    def test_remember_stores_with_scope(self):
        """The reply is stored once under the ``/agent/<id>`` scope."""
        memory = MagicMock()
        flow = self._flow_with_memory("test", memory)
        flow._remember_response("engineering", "Use 3mm walls in aluminum.")
        memory.remember.assert_called_once_with(
            "Use 3mm walls in aluminum.",
            scope="/agent/engineering",
        )

    def test_remember_noop_when_no_memory(self):
        """With ``_memory`` set to None, remembering must not raise."""
        flow = self._flow_with_memory("test", None)
        flow._remember_response("design", "test")  # Should not raise
class TestCollaborationFlag:
    """Checks ``allow_delegation`` on agents built by ``_build_crew_agent``."""

    @staticmethod
    def _built_agent(agent_id):
        """Return the agent that ``_build_crew_agent`` builds for ``agent_id``."""
        flow = AgentDispatchFlow(initial_state=AgentFlowState(
            message="test",
            context="",
            model_str="gemini/gemini-2.5-flash",
        ))
        flow._memory = None
        from crewai import LLM
        llm = LLM(model="gemini/gemini-2.5-flash", temperature=0.2)
        # _build_crew_agent returns an (agent, task) pair; only the agent is inspected.
        agent, _task = flow._build_crew_agent(agent_id, llm)
        return agent

    def test_advisors_get_delegation(self):
        """The "design" agent is built with delegation enabled."""
        assert self._built_agent("design").allow_delegation is True

    def test_generators_no_delegation(self):
        """The "cad" agent is built with delegation disabled."""
        assert self._built_agent("cad").allow_delegation is False
| # ── TestPreviewData ───────────────────────────────────────────────────────── | |
class TestPreviewData:
    """Checks construction and ``model_dump`` of ``PreviewData``."""

    def test_success_preview(self):
        """A fully populated successful preview keeps its flag and part name."""
        fields = {
            "success": True,
            "part_name": "bracket",
            "stl_url": "/api/models/bracket.stl",
            "step_url": "/api/models/bracket.step",
            "execution": {"success": True, "volume_mm3": 1000.0},
            "validation": {"machinable": True, "axis_recommendation": "3-axis"},
        }
        preview = PreviewData(**fields)
        assert preview.success is True
        assert preview.part_name == "bracket"

    def test_failure_preview(self):
        """A failed preview carries the error text."""
        preview = PreviewData(success=False, error="Execution failed")
        assert preview.success is False
        assert preview.error == "Execution failed"

    def test_model_dump(self):
        """Unset ``cam`` and ``gcode_url`` are dumped as None."""
        dumped = PreviewData(success=True, part_name="gear").model_dump()
        assert dumped["success"] is True
        assert dumped["cam"] is None
        assert dumped["gcode_url"] is None
| # ── TestChatTurnResponse ──────────────────────────────────────────────────── | |
class TestChatTurnResponse:
    """Checks construction and ``model_dump`` of ``ChatTurnResponse``."""

    def test_minimal(self):
        """Only ``design_state`` given: lists are empty and preview is None."""
        turn = ChatTurnResponse(design_state=DesignState())
        assert turn.responses == []
        assert turn.preview is None
        assert turn.question_cards == []

    def test_full(self):
        """Every field supplied is readable back from the response."""
        agent_reply = AgentResponse(
            agent_id="design",
            agent_name="D",
            message="hi",
            color="#fff",
            avatar="D",
        )
        question_card = GeneratedQuestionCard(
            category="material",
            question="What material?",
            responsible_agent="engineering",
            agent_name="Eng",
            agent_color="#00e676",
        )
        turn = ChatTurnResponse(
            responses=[agent_reply],
            preview=PreviewData(success=True, part_name="test"),
            design_state=DesignState(material="aluminum"),
            question_cards=[question_card],
        )
        assert len(turn.responses) == 1
        assert turn.preview.part_name == "test"
        assert turn.design_state.material == "aluminum"
        assert len(turn.question_cards) == 1

    def test_model_dump_roundtrip(self):
        """The dump nests the design state and keeps the empty defaults."""
        design = DesignState(part_name="bracket", material="steel")
        dumped = ChatTurnResponse(design_state=design).model_dump()
        assert dumped["design_state"]["part_name"] == "bracket"
        assert dumped["responses"] == []
        assert dumped["preview"] is None