| """Tests for DeerFlowClient.""" |
|
|
| import json |
| import tempfile |
| import zipfile |
| from pathlib import Path |
| from unittest.mock import MagicMock, patch |
|
|
| import pytest |
| from langchain_core.messages import AIMessage, HumanMessage, ToolMessage |
|
|
| from src.client import DeerFlowClient |
| from src.gateway.routers.mcp import McpConfigResponse |
| from src.gateway.routers.memory import MemoryConfigResponse, MemoryStatusResponse |
| from src.gateway.routers.models import ModelResponse, ModelsListResponse |
| from src.gateway.routers.skills import SkillInstallResponse, SkillResponse, SkillsListResponse |
| from src.gateway.routers.uploads import UploadResponse |
|
|
| |
| |
| |
|
|
| @pytest.fixture |
| def mock_app_config(): |
| """Provide a minimal AppConfig mock.""" |
| model = MagicMock() |
| model.name = "test-model" |
| model.model_dump.return_value = {"name": "test-model", "use": "langchain_openai:ChatOpenAI"} |
|
|
| config = MagicMock() |
| config.models = [model] |
| return config |
|
|
|
|
| @pytest.fixture |
| def client(mock_app_config): |
| """Create a DeerFlowClient with mocked config loading.""" |
| with patch("src.client.get_app_config", return_value=mock_app_config): |
| return DeerFlowClient() |
|
|
|
|
| |
| |
| |
|
|
| class TestClientInit: |
| def test_default_params(self, client): |
| assert client._model_name is None |
| assert client._thinking_enabled is True |
| assert client._subagent_enabled is False |
| assert client._plan_mode is False |
| assert client._checkpointer is None |
| assert client._agent is None |
|
|
| def test_custom_params(self, mock_app_config): |
| with patch("src.client.get_app_config", return_value=mock_app_config): |
| c = DeerFlowClient( |
| model_name="gpt-4", |
| thinking_enabled=False, |
| subagent_enabled=True, |
| plan_mode=True, |
| ) |
| assert c._model_name == "gpt-4" |
| assert c._thinking_enabled is False |
| assert c._subagent_enabled is True |
| assert c._plan_mode is True |
|
|
| def test_custom_config_path(self, mock_app_config): |
| with ( |
| patch("src.client.reload_app_config") as mock_reload, |
| patch("src.client.get_app_config", return_value=mock_app_config), |
| ): |
| DeerFlowClient(config_path="/tmp/custom.yaml") |
| mock_reload.assert_called_once_with("/tmp/custom.yaml") |
|
|
| def test_checkpointer_stored(self, mock_app_config): |
| cp = MagicMock() |
| with patch("src.client.get_app_config", return_value=mock_app_config): |
| c = DeerFlowClient(checkpointer=cp) |
| assert c._checkpointer is cp |
|
|
|
|
| |
| |
| |
|
|
| class TestConfigQueries: |
| def test_list_models(self, client): |
| result = client.list_models() |
| assert "models" in result |
| assert len(result["models"]) == 1 |
| assert result["models"][0]["name"] == "test-model" |
| |
| assert "display_name" in result["models"][0] |
| assert "supports_thinking" in result["models"][0] |
|
|
| def test_list_skills(self, client): |
| skill = MagicMock() |
| skill.name = "web-search" |
| skill.description = "Search the web" |
| skill.license = "MIT" |
| skill.category = "public" |
| skill.enabled = True |
|
|
| with patch("src.skills.loader.load_skills", return_value=[skill]) as mock_load: |
| result = client.list_skills() |
| mock_load.assert_called_once_with(enabled_only=False) |
|
|
| assert "skills" in result |
| assert len(result["skills"]) == 1 |
| assert result["skills"][0] == { |
| "name": "web-search", |
| "description": "Search the web", |
| "license": "MIT", |
| "category": "public", |
| "enabled": True, |
| } |
|
|
| def test_list_skills_enabled_only(self, client): |
| with patch("src.skills.loader.load_skills", return_value=[]) as mock_load: |
| client.list_skills(enabled_only=True) |
| mock_load.assert_called_once_with(enabled_only=True) |
|
|
| def test_get_memory(self, client): |
| memory = {"version": "1.0", "facts": []} |
| with patch("src.agents.memory.updater.get_memory_data", return_value=memory) as mock_mem: |
| result = client.get_memory() |
| mock_mem.assert_called_once() |
| assert result == memory |
|
|
|
|
| |
| |
| |
|
|
| def _make_agent_mock(chunks: list[dict]): |
| """Create a mock agent whose .stream() yields the given chunks.""" |
| agent = MagicMock() |
| agent.stream.return_value = iter(chunks) |
| return agent |
|
|
|
|
| def _ai_events(events): |
| """Filter messages-tuple events with type=ai and non-empty content.""" |
| return [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and e.data.get("content")] |
|
|
|
|
| def _tool_call_events(events): |
| """Filter messages-tuple events with type=ai and tool_calls.""" |
| return [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and "tool_calls" in e.data] |
|
|
|
|
| def _tool_result_events(events): |
| """Filter messages-tuple events with type=tool.""" |
| return [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "tool"] |
|
|
|
|
| class TestStream: |
| def test_basic_message(self, client): |
| """stream() emits messages-tuple + values + end for a simple AI reply.""" |
| ai = AIMessage(content="Hello!", id="ai-1") |
| chunks = [ |
| {"messages": [HumanMessage(content="hi", id="h-1")]}, |
| {"messages": [HumanMessage(content="hi", id="h-1"), ai]}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("hi", thread_id="t1")) |
|
|
| types = [e.type for e in events] |
| assert "messages-tuple" in types |
| assert "values" in types |
| assert types[-1] == "end" |
| msg_events = _ai_events(events) |
| assert msg_events[0].data["content"] == "Hello!" |
|
|
| def test_tool_call_and_result(self, client): |
| """stream() emits messages-tuple events for tool calls and results.""" |
| ai = AIMessage(content="", id="ai-1", tool_calls=[{"name": "bash", "args": {"cmd": "ls"}, "id": "tc-1"}]) |
| tool = ToolMessage(content="file.txt", id="tm-1", tool_call_id="tc-1", name="bash") |
| ai2 = AIMessage(content="Here are the files.", id="ai-2") |
|
|
| chunks = [ |
| {"messages": [HumanMessage(content="list files", id="h-1"), ai]}, |
| {"messages": [HumanMessage(content="list files", id="h-1"), ai, tool]}, |
| {"messages": [HumanMessage(content="list files", id="h-1"), ai, tool, ai2]}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("list files", thread_id="t2")) |
|
|
| assert len(_tool_call_events(events)) >= 1 |
| assert len(_tool_result_events(events)) >= 1 |
| assert len(_ai_events(events)) >= 1 |
| assert events[-1].type == "end" |
|
|
| def test_values_event_with_title(self, client): |
| """stream() emits values event containing title when present in state.""" |
| ai = AIMessage(content="ok", id="ai-1") |
| chunks = [ |
| {"messages": [HumanMessage(content="hi", id="h-1"), ai], "title": "Greeting"}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("hi", thread_id="t3")) |
|
|
| values_events = [e for e in events if e.type == "values"] |
| assert len(values_events) >= 1 |
| assert values_events[-1].data["title"] == "Greeting" |
| assert "messages" in values_events[-1].data |
|
|
| def test_deduplication(self, client): |
| """Messages with the same id are not emitted twice.""" |
| ai = AIMessage(content="Hello!", id="ai-1") |
| chunks = [ |
| {"messages": [HumanMessage(content="hi", id="h-1"), ai]}, |
| {"messages": [HumanMessage(content="hi", id="h-1"), ai]}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("hi", thread_id="t4")) |
|
|
| msg_events = _ai_events(events) |
| assert len(msg_events) == 1 |
|
|
| def test_auto_thread_id(self, client): |
| """stream() auto-generates a thread_id if not provided.""" |
| agent = _make_agent_mock([{"messages": [AIMessage(content="ok", id="ai-1")]}]) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("hi")) |
|
|
| |
| assert events[-1].type == "end" |
|
|
| def test_list_content_blocks(self, client): |
| """stream() handles AIMessage with list-of-blocks content.""" |
| ai = AIMessage( |
| content=[ |
| {"type": "thinking", "thinking": "hmm"}, |
| {"type": "text", "text": "result"}, |
| ], |
| id="ai-1", |
| ) |
| chunks = [{"messages": [ai]}] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("hi", thread_id="t5")) |
|
|
| msg_events = _ai_events(events) |
| assert len(msg_events) == 1 |
| assert msg_events[0].data["content"] == "result" |
|
|
|
|
| class TestChat: |
| def test_returns_last_message(self, client): |
| """chat() returns the last AI message text.""" |
| ai1 = AIMessage(content="thinking...", id="ai-1") |
| ai2 = AIMessage(content="final answer", id="ai-2") |
| chunks = [ |
| {"messages": [HumanMessage(content="q", id="h-1"), ai1]}, |
| {"messages": [HumanMessage(content="q", id="h-1"), ai1, ai2]}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| result = client.chat("q", thread_id="t6") |
|
|
| assert result == "final answer" |
|
|
| def test_empty_response(self, client): |
| """chat() returns empty string if no AI message produced.""" |
| chunks = [{"messages": []}] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| result = client.chat("q", thread_id="t7") |
|
|
| assert result == "" |
|
|
|
|
| |
| |
| |
|
|
| class TestExtractText: |
| def test_string(self): |
| assert DeerFlowClient._extract_text("hello") == "hello" |
|
|
| def test_list_text_blocks(self): |
| content = [ |
| {"type": "text", "text": "first"}, |
| {"type": "thinking", "thinking": "skip"}, |
| {"type": "text", "text": "second"}, |
| ] |
| assert DeerFlowClient._extract_text(content) == "first\nsecond" |
|
|
| def test_list_plain_strings(self): |
| assert DeerFlowClient._extract_text(["a", "b"]) == "a\nb" |
|
|
| def test_empty_list(self): |
| assert DeerFlowClient._extract_text([]) == "" |
|
|
| def test_other_type(self): |
| assert DeerFlowClient._extract_text(42) == "42" |
|
|
|
|
| |
| |
| |
|
|
| class TestEnsureAgent: |
| def test_creates_agent(self, client): |
| """_ensure_agent creates an agent on first call.""" |
| mock_agent = MagicMock() |
| config = client._get_runnable_config("t1") |
|
|
| with ( |
| patch("src.client.create_chat_model"), |
| patch("src.client.create_agent", return_value=mock_agent), |
| patch("src.client._build_middlewares", return_value=[]), |
| patch("src.client.apply_prompt_template", return_value="prompt"), |
| patch.object(client, "_get_tools", return_value=[]), |
| ): |
| client._ensure_agent(config) |
|
|
| assert client._agent is mock_agent |
|
|
| def test_reuses_agent_same_config(self, client): |
| """_ensure_agent does not recreate if config key unchanged.""" |
| mock_agent = MagicMock() |
| client._agent = mock_agent |
| client._agent_config_key = (None, True, False, False) |
|
|
| config = client._get_runnable_config("t1") |
| client._ensure_agent(config) |
|
|
| |
| assert client._agent is mock_agent |
|
|
|
|
| |
| |
| |
|
|
| class TestGetModel: |
| def test_found(self, client): |
| model_cfg = MagicMock() |
| model_cfg.name = "test-model" |
| model_cfg.display_name = "Test Model" |
| model_cfg.description = "A test model" |
| model_cfg.supports_thinking = True |
| client._app_config.get_model_config.return_value = model_cfg |
|
|
| result = client.get_model("test-model") |
| assert result == { |
| "name": "test-model", |
| "display_name": "Test Model", |
| "description": "A test model", |
| "supports_thinking": True, |
| } |
|
|
| def test_not_found(self, client): |
| client._app_config.get_model_config.return_value = None |
| assert client.get_model("nonexistent") is None |
|
|
|
|
| |
| |
| |
|
|
| class TestMcpConfig: |
| def test_get_mcp_config(self, client): |
| server = MagicMock() |
| server.model_dump.return_value = {"enabled": True, "type": "stdio"} |
| ext_config = MagicMock() |
| ext_config.mcp_servers = {"github": server} |
|
|
| with patch("src.client.get_extensions_config", return_value=ext_config): |
| result = client.get_mcp_config() |
|
|
| assert "mcp_servers" in result |
| assert "github" in result["mcp_servers"] |
| assert result["mcp_servers"]["github"]["enabled"] is True |
|
|
| def test_update_mcp_config(self, client): |
| |
| current_config = MagicMock() |
| current_config.skills = {} |
|
|
| reloaded_server = MagicMock() |
| reloaded_server.model_dump.return_value = {"enabled": True, "type": "sse"} |
| reloaded_config = MagicMock() |
| reloaded_config.mcp_servers = {"new-server": reloaded_server} |
|
|
| with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: |
| json.dump({}, f) |
| tmp_path = Path(f.name) |
|
|
| try: |
| |
| client._agent = MagicMock() |
|
|
| with ( |
| patch("src.client.ExtensionsConfig.resolve_config_path", return_value=tmp_path), |
| patch("src.client.get_extensions_config", return_value=current_config), |
| patch("src.client.reload_extensions_config", return_value=reloaded_config), |
| ): |
| result = client.update_mcp_config({"new-server": {"enabled": True, "type": "sse"}}) |
|
|
| assert "mcp_servers" in result |
| assert "new-server" in result["mcp_servers"] |
| assert client._agent is None |
|
|
| |
| with open(tmp_path) as f: |
| saved = json.load(f) |
| assert "mcpServers" in saved |
| finally: |
| tmp_path.unlink() |
|
|
|
|
| |
| |
| |
|
|
| class TestSkillsManagement: |
| def _make_skill(self, name="test-skill", enabled=True): |
| s = MagicMock() |
| s.name = name |
| s.description = "A test skill" |
| s.license = "MIT" |
| s.category = "public" |
| s.enabled = enabled |
| return s |
|
|
| def test_get_skill_found(self, client): |
| skill = self._make_skill() |
| with patch("src.skills.loader.load_skills", return_value=[skill]): |
| result = client.get_skill("test-skill") |
| assert result is not None |
| assert result["name"] == "test-skill" |
|
|
| def test_get_skill_not_found(self, client): |
| with patch("src.skills.loader.load_skills", return_value=[]): |
| result = client.get_skill("nonexistent") |
| assert result is None |
|
|
| def test_update_skill(self, client): |
| skill = self._make_skill(enabled=True) |
| updated_skill = self._make_skill(enabled=False) |
|
|
| ext_config = MagicMock() |
| ext_config.mcp_servers = {} |
| ext_config.skills = {} |
|
|
| with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: |
| json.dump({}, f) |
| tmp_path = Path(f.name) |
|
|
| try: |
| |
| client._agent = MagicMock() |
|
|
| with ( |
| patch("src.skills.loader.load_skills", side_effect=[[skill], [updated_skill]]), |
| patch("src.client.ExtensionsConfig.resolve_config_path", return_value=tmp_path), |
| patch("src.client.get_extensions_config", return_value=ext_config), |
| patch("src.client.reload_extensions_config"), |
| ): |
| result = client.update_skill("test-skill", enabled=False) |
| assert result["enabled"] is False |
| assert client._agent is None |
| finally: |
| tmp_path.unlink() |
|
|
| def test_update_skill_not_found(self, client): |
| with patch("src.skills.loader.load_skills", return_value=[]): |
| with pytest.raises(ValueError, match="not found"): |
| client.update_skill("nonexistent", enabled=True) |
|
|
| def test_install_skill(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
|
|
| |
| skill_dir = tmp_path / "my-skill" |
| skill_dir.mkdir() |
| (skill_dir / "SKILL.md").write_text("---\nname: my-skill\ndescription: A skill\n---\nContent") |
|
|
| archive_path = tmp_path / "my-skill.skill" |
| with zipfile.ZipFile(archive_path, "w") as zf: |
| zf.write(skill_dir / "SKILL.md", "my-skill/SKILL.md") |
|
|
| skills_root = tmp_path / "skills" |
| (skills_root / "custom").mkdir(parents=True) |
|
|
| with ( |
| patch("src.skills.loader.get_skills_root_path", return_value=skills_root), |
| patch("src.gateway.routers.skills._validate_skill_frontmatter", return_value=(True, "OK", "my-skill")), |
| ): |
| result = client.install_skill(archive_path) |
|
|
| assert result["success"] is True |
| assert result["skill_name"] == "my-skill" |
| assert (skills_root / "custom" / "my-skill").exists() |
|
|
| def test_install_skill_not_found(self, client): |
| with pytest.raises(FileNotFoundError): |
| client.install_skill("/nonexistent/path.skill") |
|
|
| def test_install_skill_bad_extension(self, client): |
| with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as f: |
| tmp_path = Path(f.name) |
| try: |
| with pytest.raises(ValueError, match=".skill extension"): |
| client.install_skill(tmp_path) |
| finally: |
| tmp_path.unlink() |
|
|
|
|
| |
| |
| |
|
|
| class TestMemoryManagement: |
| def test_reload_memory(self, client): |
| data = {"version": "1.0", "facts": []} |
| with patch("src.agents.memory.updater.reload_memory_data", return_value=data): |
| result = client.reload_memory() |
| assert result == data |
|
|
| def test_get_memory_config(self, client): |
| config = MagicMock() |
| config.enabled = True |
| config.storage_path = ".deer-flow/memory.json" |
| config.debounce_seconds = 30 |
| config.max_facts = 100 |
| config.fact_confidence_threshold = 0.7 |
| config.injection_enabled = True |
| config.max_injection_tokens = 2000 |
|
|
| with patch("src.config.memory_config.get_memory_config", return_value=config): |
| result = client.get_memory_config() |
|
|
| assert result["enabled"] is True |
| assert result["max_facts"] == 100 |
|
|
| def test_get_memory_status(self, client): |
| config = MagicMock() |
| config.enabled = True |
| config.storage_path = ".deer-flow/memory.json" |
| config.debounce_seconds = 30 |
| config.max_facts = 100 |
| config.fact_confidence_threshold = 0.7 |
| config.injection_enabled = True |
| config.max_injection_tokens = 2000 |
|
|
| data = {"version": "1.0", "facts": []} |
|
|
| with ( |
| patch("src.config.memory_config.get_memory_config", return_value=config), |
| patch("src.agents.memory.updater.get_memory_data", return_value=data), |
| ): |
| result = client.get_memory_status() |
|
|
| assert "config" in result |
| assert "data" in result |
|
|
|
|
| |
| |
| |
|
|
| class TestUploads: |
| def test_upload_files(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
|
|
| |
| src_file = tmp_path / "test.txt" |
| src_file.write_text("hello") |
|
|
| uploads_dir = tmp_path / "uploads" |
| uploads_dir.mkdir() |
|
|
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir): |
| result = client.upload_files("thread-1", [src_file]) |
|
|
| assert result["success"] is True |
| assert len(result["files"]) == 1 |
| assert result["files"][0]["filename"] == "test.txt" |
| assert "artifact_url" in result["files"][0] |
| assert "message" in result |
| assert (uploads_dir / "test.txt").exists() |
|
|
| def test_upload_files_not_found(self, client): |
| with pytest.raises(FileNotFoundError): |
| client.upload_files("thread-1", ["/nonexistent/file.txt"]) |
|
|
| def test_list_uploads(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| uploads_dir = Path(tmp) |
| (uploads_dir / "a.txt").write_text("a") |
| (uploads_dir / "b.txt").write_text("bb") |
|
|
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir): |
| result = client.list_uploads("thread-1") |
|
|
| assert result["count"] == 2 |
| assert len(result["files"]) == 2 |
| names = {f["filename"] for f in result["files"]} |
| assert names == {"a.txt", "b.txt"} |
| |
| for f in result["files"]: |
| assert "artifact_url" in f |
|
|
| def test_delete_upload(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| uploads_dir = Path(tmp) |
| (uploads_dir / "delete-me.txt").write_text("gone") |
|
|
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir): |
| result = client.delete_upload("thread-1", "delete-me.txt") |
|
|
| assert result["success"] is True |
| assert "delete-me.txt" in result["message"] |
| assert not (uploads_dir / "delete-me.txt").exists() |
|
|
| def test_delete_upload_not_found(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=Path(tmp)): |
| with pytest.raises(FileNotFoundError): |
| client.delete_upload("thread-1", "nope.txt") |
|
|
| def test_delete_upload_path_traversal(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| uploads_dir = Path(tmp) |
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir): |
| with pytest.raises(PermissionError): |
| client.delete_upload("thread-1", "../../etc/passwd") |
|
|
|
|
| |
| |
| |
|
|
| class TestArtifacts: |
| def test_get_artifact(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| user_data_dir = Path(tmp) / "user-data" |
| outputs = user_data_dir / "outputs" |
| outputs.mkdir(parents=True) |
| (outputs / "result.txt").write_text("artifact content") |
|
|
| mock_paths = MagicMock() |
| mock_paths.sandbox_user_data_dir.return_value = user_data_dir |
|
|
| with patch("src.client.get_paths", return_value=mock_paths): |
| content, mime = client.get_artifact("t1", "mnt/user-data/outputs/result.txt") |
|
|
| assert content == b"artifact content" |
| assert "text" in mime |
|
|
| def test_get_artifact_not_found(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| user_data_dir = Path(tmp) / "user-data" |
| user_data_dir.mkdir() |
|
|
| mock_paths = MagicMock() |
| mock_paths.sandbox_user_data_dir.return_value = user_data_dir |
|
|
| with patch("src.client.get_paths", return_value=mock_paths): |
| with pytest.raises(FileNotFoundError): |
| client.get_artifact("t1", "mnt/user-data/outputs/nope.txt") |
|
|
| def test_get_artifact_bad_prefix(self, client): |
| with pytest.raises(ValueError, match="must start with"): |
| client.get_artifact("t1", "bad/path/file.txt") |
|
|
| def test_get_artifact_path_traversal(self, client): |
| with tempfile.TemporaryDirectory() as tmp: |
| user_data_dir = Path(tmp) / "user-data" |
| user_data_dir.mkdir() |
|
|
| mock_paths = MagicMock() |
| mock_paths.sandbox_user_data_dir.return_value = user_data_dir |
|
|
| with patch("src.client.get_paths", return_value=mock_paths): |
| with pytest.raises(PermissionError): |
| client.get_artifact("t1", "mnt/user-data/../../../etc/passwd") |
|
|
|
|
| |
| |
| |
| |
| |
|
|
|
|
| class TestScenarioMultiTurnConversation: |
| """Scenario: User has a multi-turn conversation within a single thread.""" |
|
|
| def test_two_turn_conversation(self, client): |
| """Two sequential chat() calls on the same thread_id produce |
| independent results (without checkpointer, each call is stateless).""" |
| ai1 = AIMessage(content="I'm a helpful assistant.", id="ai-1") |
| ai2 = AIMessage(content="Python is great!", id="ai-2") |
|
|
| agent = MagicMock() |
| agent.stream.side_effect = [ |
| iter([{"messages": [HumanMessage(content="who are you?", id="h-1"), ai1]}]), |
| iter([{"messages": [HumanMessage(content="what language?", id="h-2"), ai2]}]), |
| ] |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| r1 = client.chat("who are you?", thread_id="thread-multi") |
| r2 = client.chat("what language?", thread_id="thread-multi") |
|
|
| assert r1 == "I'm a helpful assistant." |
| assert r2 == "Python is great!" |
| assert agent.stream.call_count == 2 |
|
|
| def test_stream_collects_all_event_types_across_turns(self, client): |
| """A full turn emits messages-tuple (tool_call, tool_result, ai text) + values + end.""" |
| ai_tc = AIMessage(content="", id="ai-1", tool_calls=[ |
| {"name": "web_search", "args": {"query": "LangGraph"}, "id": "tc-1"}, |
| ]) |
| tool_r = ToolMessage(content="LangGraph is a framework...", id="tm-1", tool_call_id="tc-1", name="web_search") |
| ai_final = AIMessage(content="LangGraph is a framework for building agents.", id="ai-2") |
|
|
| chunks = [ |
| {"messages": [HumanMessage(content="search", id="h-1"), ai_tc]}, |
| {"messages": [HumanMessage(content="search", id="h-1"), ai_tc, tool_r]}, |
| {"messages": [HumanMessage(content="search", id="h-1"), ai_tc, tool_r, ai_final], "title": "LangGraph Search"}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("search", thread_id="t-full")) |
|
|
| |
| types = set(e.type for e in events) |
| assert types == {"messages-tuple", "values", "end"} |
| assert events[-1].type == "end" |
|
|
| |
| tc_events = _tool_call_events(events) |
| assert len(tc_events) == 1 |
| assert tc_events[0].data["tool_calls"][0]["name"] == "web_search" |
| assert tc_events[0].data["tool_calls"][0]["args"] == {"query": "LangGraph"} |
|
|
| |
| tr_events = _tool_result_events(events) |
| assert len(tr_events) == 1 |
| assert tr_events[0].data["tool_call_id"] == "tc-1" |
| assert "LangGraph" in tr_events[0].data["content"] |
|
|
| |
| msg_events = _ai_events(events) |
| assert any("framework" in e.data["content"] for e in msg_events) |
|
|
| |
| values_events = [e for e in events if e.type == "values"] |
| assert any(e.data.get("title") == "LangGraph Search" for e in values_events) |
|
|
|
|
| class TestScenarioToolChain: |
| """Scenario: Agent chains multiple tool calls in sequence.""" |
|
|
| def test_multi_tool_chain(self, client): |
| """Agent calls bash → reads output → calls write_file → responds.""" |
| ai_bash = AIMessage(content="", id="ai-1", tool_calls=[ |
| {"name": "bash", "args": {"cmd": "ls /mnt/user-data/workspace"}, "id": "tc-1"}, |
| ]) |
| bash_result = ToolMessage(content="README.md\nsrc/", id="tm-1", tool_call_id="tc-1", name="bash") |
| ai_write = AIMessage(content="", id="ai-2", tool_calls=[ |
| {"name": "write_file", "args": {"path": "/mnt/user-data/outputs/listing.txt", "content": "README.md\nsrc/"}, "id": "tc-2"}, |
| ]) |
| write_result = ToolMessage(content="File written successfully.", id="tm-2", tool_call_id="tc-2", name="write_file") |
| ai_final = AIMessage(content="I listed the workspace and saved the output.", id="ai-3") |
|
|
| chunks = [ |
| {"messages": [HumanMessage(content="list and save", id="h-1"), ai_bash]}, |
| {"messages": [HumanMessage(content="list and save", id="h-1"), ai_bash, bash_result]}, |
| {"messages": [HumanMessage(content="list and save", id="h-1"), ai_bash, bash_result, ai_write]}, |
| {"messages": [HumanMessage(content="list and save", id="h-1"), ai_bash, bash_result, ai_write, write_result]}, |
| {"messages": [HumanMessage(content="list and save", id="h-1"), ai_bash, bash_result, ai_write, write_result, ai_final]}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("list and save", thread_id="t-chain")) |
|
|
| tool_calls = _tool_call_events(events) |
| tool_results = _tool_result_events(events) |
| messages = _ai_events(events) |
|
|
| assert len(tool_calls) == 2 |
| assert tool_calls[0].data["tool_calls"][0]["name"] == "bash" |
| assert tool_calls[1].data["tool_calls"][0]["name"] == "write_file" |
| assert len(tool_results) == 2 |
| assert len(messages) == 1 |
| assert events[-1].type == "end" |
|
|
|
|
| class TestScenarioFileLifecycle: |
| """Scenario: Upload files → list them → use in chat → download artifact.""" |
|
|
| def test_upload_list_delete_lifecycle(self, client): |
| """Upload → list → verify → delete → list again.""" |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
| uploads_dir = tmp_path / "uploads" |
| uploads_dir.mkdir() |
|
|
| |
| (tmp_path / "report.txt").write_text("quarterly report data") |
| (tmp_path / "data.csv").write_text("a,b,c\n1,2,3") |
|
|
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir): |
| |
| result = client.upload_files("t-lifecycle", [ |
| tmp_path / "report.txt", |
| tmp_path / "data.csv", |
| ]) |
| assert result["success"] is True |
| assert len(result["files"]) == 2 |
| assert {f["filename"] for f in result["files"]} == {"report.txt", "data.csv"} |
|
|
| |
| listed = client.list_uploads("t-lifecycle") |
| assert listed["count"] == 2 |
| assert all("virtual_path" in f for f in listed["files"]) |
|
|
| |
| del_result = client.delete_upload("t-lifecycle", "report.txt") |
| assert del_result["success"] is True |
|
|
| |
| listed = client.list_uploads("t-lifecycle") |
| assert listed["count"] == 1 |
| assert listed["files"][0]["filename"] == "data.csv" |
|
|
| def test_upload_then_read_artifact(self, client): |
| """Upload a file, simulate agent producing artifact, read it back.""" |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
| uploads_dir = tmp_path / "uploads" |
| uploads_dir.mkdir() |
| user_data_dir = tmp_path / "user-data" |
| outputs_dir = user_data_dir / "outputs" |
| outputs_dir.mkdir(parents=True) |
|
|
| |
| src_file = tmp_path / "input.txt" |
| src_file.write_text("raw data to process") |
|
|
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir): |
| uploaded = client.upload_files("t-artifact", [src_file]) |
| assert len(uploaded["files"]) == 1 |
|
|
| |
| (outputs_dir / "analysis.json").write_text('{"result": "processed"}') |
|
|
| |
| mock_paths = MagicMock() |
| mock_paths.sandbox_user_data_dir.return_value = user_data_dir |
|
|
| with patch("src.client.get_paths", return_value=mock_paths): |
| content, mime = client.get_artifact("t-artifact", "mnt/user-data/outputs/analysis.json") |
|
|
| assert json.loads(content) == {"result": "processed"} |
| assert "json" in mime |
|
|
|
|
| class TestScenarioConfigManagement: |
| """Scenario: Query and update configuration through a management session.""" |
|
|
| def test_model_and_skill_discovery(self, client): |
| """List models → get specific model → list skills → get specific skill.""" |
| |
| result = client.list_models() |
| assert len(result["models"]) >= 1 |
| model_name = result["models"][0]["name"] |
|
|
| |
| model_cfg = MagicMock() |
| model_cfg.name = model_name |
| model_cfg.display_name = None |
| model_cfg.description = None |
| model_cfg.supports_thinking = False |
| client._app_config.get_model_config.return_value = model_cfg |
| detail = client.get_model(model_name) |
| assert detail["name"] == model_name |
|
|
| |
| skill = MagicMock() |
| skill.name = "web-search" |
| skill.description = "Search the web" |
| skill.license = "MIT" |
| skill.category = "public" |
| skill.enabled = True |
|
|
| with patch("src.skills.loader.load_skills", return_value=[skill]): |
| skills_result = client.list_skills() |
| assert len(skills_result["skills"]) == 1 |
|
|
| |
| with patch("src.skills.loader.load_skills", return_value=[skill]): |
| detail = client.get_skill("web-search") |
| assert detail is not None |
| assert detail["enabled"] is True |
|
|
| def test_mcp_update_then_skill_toggle(self, client): |
| """Update MCP config → toggle skill → verify both invalidate agent.""" |
| with tempfile.TemporaryDirectory() as tmp: |
| config_file = Path(tmp) / "extensions_config.json" |
| config_file.write_text("{}") |
|
|
| |
| current_config = MagicMock() |
| current_config.skills = {} |
|
|
| reloaded_server = MagicMock() |
| reloaded_server.model_dump.return_value = {"enabled": True, "type": "sse"} |
| reloaded_config = MagicMock() |
| reloaded_config.mcp_servers = {"my-mcp": reloaded_server} |
|
|
| client._agent = MagicMock() |
| with ( |
| patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file), |
| patch("src.client.get_extensions_config", return_value=current_config), |
| patch("src.client.reload_extensions_config", return_value=reloaded_config), |
| ): |
| mcp_result = client.update_mcp_config({"my-mcp": {"enabled": True}}) |
| assert "my-mcp" in mcp_result["mcp_servers"] |
| assert client._agent is None |
|
|
| |
| skill = MagicMock() |
| skill.name = "code-gen" |
| skill.description = "Generate code" |
| skill.license = "MIT" |
| skill.category = "custom" |
| skill.enabled = True |
|
|
| toggled = MagicMock() |
| toggled.name = "code-gen" |
| toggled.description = "Generate code" |
| toggled.license = "MIT" |
| toggled.category = "custom" |
| toggled.enabled = False |
|
|
| ext_config = MagicMock() |
| ext_config.mcp_servers = {} |
| ext_config.skills = {} |
|
|
| client._agent = MagicMock() |
| with ( |
| patch("src.skills.loader.load_skills", side_effect=[[skill], [toggled]]), |
| patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file), |
| patch("src.client.get_extensions_config", return_value=ext_config), |
| patch("src.client.reload_extensions_config"), |
| ): |
| skill_result = client.update_skill("code-gen", enabled=False) |
| assert skill_result["enabled"] is False |
| assert client._agent is None |
|
|
|
|
| class TestScenarioAgentRecreation: |
| """Scenario: Config changes trigger agent recreation at the right times.""" |
|
|
| def test_different_model_triggers_rebuild(self, client): |
| """Switching model_name between calls forces agent rebuild.""" |
| agents_created = [] |
|
|
| def fake_create_agent(**kwargs): |
| agent = MagicMock() |
| agents_created.append(agent) |
| return agent |
|
|
| config_a = client._get_runnable_config("t1", model_name="gpt-4") |
| config_b = client._get_runnable_config("t1", model_name="claude-3") |
|
|
| with ( |
| patch("src.client.create_chat_model"), |
| patch("src.client.create_agent", side_effect=fake_create_agent), |
| patch("src.client._build_middlewares", return_value=[]), |
| patch("src.client.apply_prompt_template", return_value="prompt"), |
| patch.object(client, "_get_tools", return_value=[]), |
| ): |
| client._ensure_agent(config_a) |
| first_agent = client._agent |
|
|
| client._ensure_agent(config_b) |
| second_agent = client._agent |
|
|
| assert len(agents_created) == 2 |
| assert first_agent is not second_agent |
|
|
| def test_same_config_reuses_agent(self, client): |
| """Repeated calls with identical config do not rebuild.""" |
| agents_created = [] |
|
|
| def fake_create_agent(**kwargs): |
| agent = MagicMock() |
| agents_created.append(agent) |
| return agent |
|
|
| config = client._get_runnable_config("t1", model_name="gpt-4") |
|
|
| with ( |
| patch("src.client.create_chat_model"), |
| patch("src.client.create_agent", side_effect=fake_create_agent), |
| patch("src.client._build_middlewares", return_value=[]), |
| patch("src.client.apply_prompt_template", return_value="prompt"), |
| patch.object(client, "_get_tools", return_value=[]), |
| ): |
| client._ensure_agent(config) |
| client._ensure_agent(config) |
| client._ensure_agent(config) |
|
|
| assert len(agents_created) == 1 |
|
|
| def test_reset_agent_forces_rebuild(self, client): |
| """reset_agent() clears cache, next call rebuilds.""" |
| agents_created = [] |
|
|
| def fake_create_agent(**kwargs): |
| agent = MagicMock() |
| agents_created.append(agent) |
| return agent |
|
|
| config = client._get_runnable_config("t1") |
|
|
| with ( |
| patch("src.client.create_chat_model"), |
| patch("src.client.create_agent", side_effect=fake_create_agent), |
| patch("src.client._build_middlewares", return_value=[]), |
| patch("src.client.apply_prompt_template", return_value="prompt"), |
| patch.object(client, "_get_tools", return_value=[]), |
| ): |
| client._ensure_agent(config) |
| client.reset_agent() |
| client._ensure_agent(config) |
|
|
| assert len(agents_created) == 2 |
|
|
| def test_per_call_override_triggers_rebuild(self, client): |
| """stream() with model_name override creates a different agent config.""" |
| ai = AIMessage(content="ok", id="ai-1") |
| agent = _make_agent_mock([{"messages": [ai]}]) |
|
|
| agents_created = [] |
|
|
| def fake_ensure(config): |
| key = tuple(config.get("configurable", {}).get(k) for k in ["model_name", "thinking_enabled", "is_plan_mode", "subagent_enabled"]) |
| agents_created.append(key) |
| client._agent = agent |
|
|
| with patch.object(client, "_ensure_agent", side_effect=fake_ensure): |
| list(client.stream("hi", thread_id="t1")) |
| list(client.stream("hi", thread_id="t1", model_name="other-model")) |
|
|
| |
| assert len(agents_created) == 2 |
| assert agents_created[0] != agents_created[1] |
|
|
|
|
| class TestScenarioThreadIsolation: |
| """Scenario: Operations on different threads don't interfere.""" |
|
|
| def test_uploads_isolated_per_thread(self, client): |
| """Files uploaded to thread-A are not visible in thread-B.""" |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
| uploads_a = tmp_path / "thread-a" / "uploads" |
| uploads_b = tmp_path / "thread-b" / "uploads" |
| uploads_a.mkdir(parents=True) |
| uploads_b.mkdir(parents=True) |
|
|
| src_file = tmp_path / "secret.txt" |
| src_file.write_text("thread-a only") |
|
|
| def get_dir(thread_id): |
| return uploads_a if thread_id == "thread-a" else uploads_b |
|
|
| with patch.object(DeerFlowClient, "_get_uploads_dir", side_effect=get_dir): |
| client.upload_files("thread-a", [src_file]) |
|
|
| files_a = client.list_uploads("thread-a") |
| files_b = client.list_uploads("thread-b") |
|
|
| assert files_a["count"] == 1 |
| assert files_b["count"] == 0 |
|
|
| def test_artifacts_isolated_per_thread(self, client): |
| """Artifacts in thread-A are not accessible from thread-B.""" |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
|
|
| data_a = tmp_path / "thread-a" |
| data_b = tmp_path / "thread-b" |
| (data_a / "outputs").mkdir(parents=True) |
| (data_b / "outputs").mkdir(parents=True) |
| (data_a / "outputs" / "result.txt").write_text("thread-a artifact") |
|
|
| mock_paths = MagicMock() |
| mock_paths.sandbox_user_data_dir.side_effect = lambda tid: data_a if tid == "thread-a" else data_b |
|
|
| with patch("src.client.get_paths", return_value=mock_paths): |
| content, _ = client.get_artifact("thread-a", "mnt/user-data/outputs/result.txt") |
| assert content == b"thread-a artifact" |
|
|
| with pytest.raises(FileNotFoundError): |
| client.get_artifact("thread-b", "mnt/user-data/outputs/result.txt") |
|
|
|
|
| class TestScenarioMemoryWorkflow: |
| """Scenario: Memory query → reload → status check.""" |
|
|
| def test_memory_full_lifecycle(self, client): |
| """get_memory → reload → get_status covers the full memory API.""" |
| initial_data = {"version": "1.0", "facts": [{"id": "f1", "content": "User likes Python"}]} |
| updated_data = {"version": "1.0", "facts": [ |
| {"id": "f1", "content": "User likes Python"}, |
| {"id": "f2", "content": "User prefers dark mode"}, |
| ]} |
|
|
| config = MagicMock() |
| config.enabled = True |
| config.storage_path = ".deer-flow/memory.json" |
| config.debounce_seconds = 30 |
| config.max_facts = 100 |
| config.fact_confidence_threshold = 0.7 |
| config.injection_enabled = True |
| config.max_injection_tokens = 2000 |
|
|
| with patch("src.agents.memory.updater.get_memory_data", return_value=initial_data): |
| mem = client.get_memory() |
| assert len(mem["facts"]) == 1 |
|
|
| with patch("src.agents.memory.updater.reload_memory_data", return_value=updated_data): |
| refreshed = client.reload_memory() |
| assert len(refreshed["facts"]) == 2 |
|
|
| with ( |
| patch("src.config.memory_config.get_memory_config", return_value=config), |
| patch("src.agents.memory.updater.get_memory_data", return_value=updated_data), |
| ): |
| status = client.get_memory_status() |
| assert status["config"]["enabled"] is True |
| assert len(status["data"]["facts"]) == 2 |
|
|
|
|
| class TestScenarioSkillInstallAndUse: |
| """Scenario: Install a skill → verify it appears → toggle it.""" |
|
|
| def test_install_then_toggle(self, client): |
| """Install .skill archive → list to verify → disable → verify disabled.""" |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
|
|
| |
| skill_src = tmp_path / "my-analyzer" |
| skill_src.mkdir() |
| (skill_src / "SKILL.md").write_text( |
| "---\nname: my-analyzer\ndescription: Analyze code\nlicense: MIT\n---\nAnalysis skill" |
| ) |
| archive = tmp_path / "my-analyzer.skill" |
| with zipfile.ZipFile(archive, "w") as zf: |
| zf.write(skill_src / "SKILL.md", "my-analyzer/SKILL.md") |
|
|
| skills_root = tmp_path / "skills" |
| (skills_root / "custom").mkdir(parents=True) |
|
|
| |
| with ( |
| patch("src.skills.loader.get_skills_root_path", return_value=skills_root), |
| patch("src.gateway.routers.skills._validate_skill_frontmatter", return_value=(True, "OK", "my-analyzer")), |
| ): |
| result = client.install_skill(archive) |
| assert result["success"] is True |
| assert (skills_root / "custom" / "my-analyzer" / "SKILL.md").exists() |
|
|
| |
| installed_skill = MagicMock() |
| installed_skill.name = "my-analyzer" |
| installed_skill.description = "Analyze code" |
| installed_skill.license = "MIT" |
| installed_skill.category = "custom" |
| installed_skill.enabled = True |
|
|
| with patch("src.skills.loader.load_skills", return_value=[installed_skill]): |
| skills_result = client.list_skills() |
| assert any(s["name"] == "my-analyzer" for s in skills_result["skills"]) |
|
|
| |
| disabled_skill = MagicMock() |
| disabled_skill.name = "my-analyzer" |
| disabled_skill.description = "Analyze code" |
| disabled_skill.license = "MIT" |
| disabled_skill.category = "custom" |
| disabled_skill.enabled = False |
|
|
| ext_config = MagicMock() |
| ext_config.mcp_servers = {} |
| ext_config.skills = {} |
|
|
| config_file = tmp_path / "extensions_config.json" |
| config_file.write_text("{}") |
|
|
| with ( |
| patch("src.skills.loader.load_skills", side_effect=[[installed_skill], [disabled_skill]]), |
| patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file), |
| patch("src.client.get_extensions_config", return_value=ext_config), |
| patch("src.client.reload_extensions_config"), |
| ): |
| toggled = client.update_skill("my-analyzer", enabled=False) |
| assert toggled["enabled"] is False |
|
|
|
|
| class TestScenarioEdgeCases: |
| """Scenario: Edge cases and error boundaries in realistic workflows.""" |
|
|
| def test_empty_stream_response(self, client): |
| """Agent produces no messages — only values + end events.""" |
| agent = _make_agent_mock([{"messages": []}]) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("hi", thread_id="t-empty")) |
|
|
| |
| assert len(events) == 2 |
| assert events[0].type == "values" |
| assert events[-1].type == "end" |
|
|
| def test_chat_on_empty_response(self, client): |
| """chat() returns empty string for no-message response.""" |
| agent = _make_agent_mock([{"messages": []}]) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| result = client.chat("hi", thread_id="t-empty-chat") |
|
|
| assert result == "" |
|
|
| def test_multiple_title_changes(self, client): |
| """Title changes are carried in values events.""" |
| ai = AIMessage(content="ok", id="ai-1") |
| chunks = [ |
| {"messages": [ai], "title": "First Title"}, |
| {"messages": [], "title": "First Title"}, |
| {"messages": [], "title": "Second Title"}, |
| ] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("hi", thread_id="t-titles")) |
|
|
| |
| values_events = [e for e in events if e.type == "values"] |
| assert len(values_events) == 3 |
| assert values_events[0].data["title"] == "First Title" |
| assert values_events[1].data["title"] == "First Title" |
| assert values_events[2].data["title"] == "Second Title" |
|
|
| def test_concurrent_tool_calls_in_single_message(self, client): |
| """Agent produces multiple tool_calls in one AIMessage — emitted as single messages-tuple.""" |
| ai = AIMessage(content="", id="ai-1", tool_calls=[ |
| {"name": "web_search", "args": {"q": "a"}, "id": "tc-1"}, |
| {"name": "web_search", "args": {"q": "b"}, "id": "tc-2"}, |
| {"name": "bash", "args": {"cmd": "echo hi"}, "id": "tc-3"}, |
| ]) |
| chunks = [{"messages": [ai]}] |
| agent = _make_agent_mock(chunks) |
|
|
| with ( |
| patch.object(client, "_ensure_agent"), |
| patch.object(client, "_agent", agent), |
| ): |
| events = list(client.stream("do things", thread_id="t-parallel")) |
|
|
| tc_events = _tool_call_events(events) |
| assert len(tc_events) == 1 |
| tool_calls = tc_events[0].data["tool_calls"] |
| assert len(tool_calls) == 3 |
| assert {tc["id"] for tc in tool_calls} == {"tc-1", "tc-2", "tc-3"} |
|
|
| def test_upload_convertible_file_conversion_failure(self, client): |
| """Upload a .pdf file where conversion fails — file still uploaded, no markdown.""" |
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_path = Path(tmp) |
| uploads_dir = tmp_path / "uploads" |
| uploads_dir.mkdir() |
|
|
| pdf_file = tmp_path / "doc.pdf" |
| pdf_file.write_bytes(b"%PDF-1.4 fake content") |
|
|
| with ( |
| patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir), |
| patch("src.gateway.routers.uploads.CONVERTIBLE_EXTENSIONS", {".pdf"}), |
| patch("src.gateway.routers.uploads.convert_file_to_markdown", side_effect=Exception("conversion failed")), |
| ): |
| result = client.upload_files("t-pdf-fail", [pdf_file]) |
|
|
| assert result["success"] is True |
| assert len(result["files"]) == 1 |
| assert result["files"][0]["filename"] == "doc.pdf" |
| assert "markdown_file" not in result["files"][0] |
| assert (uploads_dir / "doc.pdf").exists() |
|
|
|
|
| |
| |
| |
|
|
| class TestGatewayConformance: |
| """Validate that DeerFlowClient return dicts conform to Gateway Pydantic response models. |
| |
| Each test calls a client method, then parses the result through the |
| corresponding Gateway response model. If the client drifts (missing or |
| wrong-typed fields), Pydantic raises ``ValidationError`` and CI catches it. |
| """ |
|
|
| def test_list_models(self, mock_app_config): |
| model = MagicMock() |
| model.name = "test-model" |
| model.display_name = "Test Model" |
| model.description = "A test model" |
| model.supports_thinking = False |
| mock_app_config.models = [model] |
|
|
| with patch("src.client.get_app_config", return_value=mock_app_config): |
| client = DeerFlowClient() |
|
|
| result = client.list_models() |
| parsed = ModelsListResponse(**result) |
| assert len(parsed.models) == 1 |
| assert parsed.models[0].name == "test-model" |
|
|
| def test_get_model(self, mock_app_config): |
| model = MagicMock() |
| model.name = "test-model" |
| model.display_name = "Test Model" |
| model.description = "A test model" |
| model.supports_thinking = True |
| mock_app_config.models = [model] |
| mock_app_config.get_model_config.return_value = model |
|
|
| with patch("src.client.get_app_config", return_value=mock_app_config): |
| client = DeerFlowClient() |
|
|
| result = client.get_model("test-model") |
| assert result is not None |
| parsed = ModelResponse(**result) |
| assert parsed.name == "test-model" |
|
|
| def test_list_skills(self, client): |
| skill = MagicMock() |
| skill.name = "web-search" |
| skill.description = "Search the web" |
| skill.license = "MIT" |
| skill.category = "public" |
| skill.enabled = True |
|
|
| with patch("src.skills.loader.load_skills", return_value=[skill]): |
| result = client.list_skills() |
|
|
| parsed = SkillsListResponse(**result) |
| assert len(parsed.skills) == 1 |
| assert parsed.skills[0].name == "web-search" |
|
|
| def test_get_skill(self, client): |
| skill = MagicMock() |
| skill.name = "web-search" |
| skill.description = "Search the web" |
| skill.license = "MIT" |
| skill.category = "public" |
| skill.enabled = True |
|
|
| with patch("src.skills.loader.load_skills", return_value=[skill]): |
| result = client.get_skill("web-search") |
|
|
| assert result is not None |
| parsed = SkillResponse(**result) |
| assert parsed.name == "web-search" |
|
|
| def test_install_skill(self, client, tmp_path): |
| skill_dir = tmp_path / "my-skill" |
| skill_dir.mkdir() |
| (skill_dir / "SKILL.md").write_text( |
| "---\nname: my-skill\ndescription: A test skill\n---\nBody\n" |
| ) |
|
|
| archive = tmp_path / "my-skill.skill" |
| with zipfile.ZipFile(archive, "w") as zf: |
| zf.write(skill_dir / "SKILL.md", "my-skill/SKILL.md") |
|
|
| custom_dir = tmp_path / "custom" |
| custom_dir.mkdir() |
| with patch("src.skills.loader.get_skills_root_path", return_value=tmp_path): |
| result = client.install_skill(archive) |
|
|
| parsed = SkillInstallResponse(**result) |
| assert parsed.success is True |
| assert parsed.skill_name == "my-skill" |
|
|
| def test_get_mcp_config(self, client): |
| server = MagicMock() |
| server.model_dump.return_value = { |
| "enabled": True, |
| "type": "stdio", |
| "command": "npx", |
| "args": ["-y", "server"], |
| "env": {}, |
| "url": None, |
| "headers": {}, |
| "description": "test server", |
| } |
| ext_config = MagicMock() |
| ext_config.mcp_servers = {"test": server} |
|
|
| with patch("src.client.get_extensions_config", return_value=ext_config): |
| result = client.get_mcp_config() |
|
|
| parsed = McpConfigResponse(**result) |
| assert "test" in parsed.mcp_servers |
|
|
| def test_update_mcp_config(self, client, tmp_path): |
| server = MagicMock() |
| server.model_dump.return_value = { |
| "enabled": True, |
| "type": "stdio", |
| "command": "npx", |
| "args": [], |
| "env": {}, |
| "url": None, |
| "headers": {}, |
| "description": "", |
| } |
| ext_config = MagicMock() |
| ext_config.mcp_servers = {"srv": server} |
| ext_config.skills = {} |
|
|
| config_file = tmp_path / "extensions_config.json" |
| config_file.write_text("{}") |
|
|
| with ( |
| patch("src.client.get_extensions_config", return_value=ext_config), |
| patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file), |
| patch("src.client.reload_extensions_config", return_value=ext_config), |
| ): |
| result = client.update_mcp_config({"srv": server.model_dump.return_value}) |
|
|
| parsed = McpConfigResponse(**result) |
| assert "srv" in parsed.mcp_servers |
|
|
| def test_upload_files(self, client, tmp_path): |
| uploads_dir = tmp_path / "uploads" |
| uploads_dir.mkdir() |
|
|
| src_file = tmp_path / "hello.txt" |
| src_file.write_text("hello") |
|
|
| with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir): |
| result = client.upload_files("t-conform", [src_file]) |
|
|
| parsed = UploadResponse(**result) |
| assert parsed.success is True |
| assert len(parsed.files) == 1 |
|
|
| def test_get_memory_config(self, client): |
| mem_cfg = MagicMock() |
| mem_cfg.enabled = True |
| mem_cfg.storage_path = ".deer-flow/memory.json" |
| mem_cfg.debounce_seconds = 30 |
| mem_cfg.max_facts = 100 |
| mem_cfg.fact_confidence_threshold = 0.7 |
| mem_cfg.injection_enabled = True |
| mem_cfg.max_injection_tokens = 2000 |
|
|
| with patch("src.config.memory_config.get_memory_config", return_value=mem_cfg): |
| result = client.get_memory_config() |
|
|
| parsed = MemoryConfigResponse(**result) |
| assert parsed.enabled is True |
| assert parsed.max_facts == 100 |
|
|
| def test_get_memory_status(self, client): |
| mem_cfg = MagicMock() |
| mem_cfg.enabled = True |
| mem_cfg.storage_path = ".deer-flow/memory.json" |
| mem_cfg.debounce_seconds = 30 |
| mem_cfg.max_facts = 100 |
| mem_cfg.fact_confidence_threshold = 0.7 |
| mem_cfg.injection_enabled = True |
| mem_cfg.max_injection_tokens = 2000 |
|
|
| memory_data = { |
| "version": "1.0", |
| "lastUpdated": "", |
| "user": { |
| "workContext": {"summary": "", "updatedAt": ""}, |
| "personalContext": {"summary": "", "updatedAt": ""}, |
| "topOfMind": {"summary": "", "updatedAt": ""}, |
| }, |
| "history": { |
| "recentMonths": {"summary": "", "updatedAt": ""}, |
| "earlierContext": {"summary": "", "updatedAt": ""}, |
| "longTermBackground": {"summary": "", "updatedAt": ""}, |
| }, |
| "facts": [], |
| } |
|
|
| with ( |
| patch("src.config.memory_config.get_memory_config", return_value=mem_cfg), |
| patch("src.agents.memory.updater.get_memory_data", return_value=memory_data), |
| ): |
| result = client.get_memory_status() |
|
|
| parsed = MemoryStatusResponse(**result) |
| assert parsed.config.enabled is True |
| assert parsed.data.version == "1.0" |
|
|