File size: 13,422 Bytes
033ca06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
"""Live integration tests for DeerFlowClient with real API.

These tests require a working config.yaml with valid API credentials.
They are skipped when the ``CI`` environment variable is set or when no
config.yaml is found, and must be run explicitly:

    PYTHONPATH=. uv run pytest tests/test_client_live.py -v -s
"""

import json
import os
from pathlib import Path

import pytest

from src.client import DeerFlowClient, StreamEvent

# Decide once, at import time, whether this module may run: live tests are
# skipped when the CI environment variable is set, or when there is no
# config.yaml two levels above the directory that contains this file.
if os.environ.get("CI"):
    _skip_reason = "Live tests skipped in CI"
elif (Path(__file__).resolve().parents[2] / "config.yaml").exists():
    _skip_reason = None
else:
    _skip_reason = "No config.yaml found — live tests require valid API credentials"

if _skip_reason is not None:
    pytest.skip(_skip_reason, allow_module_level=True)

# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture(scope="module")
def client():
    """Build one real DeerFlowClient (no mocks) per test module.

    The client is constructed with ``thinking_enabled=False``.
    """
    live_client = DeerFlowClient(thinking_enabled=False)
    return live_client


@pytest.fixture
def thread_tmp(tmp_path):
    """Pair a fresh thread id with pytest's per-test temporary directory.

    Returns:
        A ``(thread_id, tmp_path)`` tuple; the id is ``live-test-`` followed
        by the first eight hex characters of a random UUID4.
    """
    from uuid import uuid4

    suffix = uuid4().hex[:8]
    return f"live-test-{suffix}", tmp_path


# ===========================================================================
# Scenario 1: Basic chat — model responds coherently
# ===========================================================================

class TestLiveBasicChat:
    """Plain chat() round-trips against the real model."""

    def test_chat_returns_nonempty_string(self, client):
        """chat() hands back a str containing at least one character."""
        reply = client.chat("Reply with exactly: HELLO")
        assert isinstance(reply, str)
        assert len(reply) >= 1
        print(f"  chat response: {reply}")

    def test_chat_follows_instruction(self, client):
        """The answer to a trivial arithmetic prompt contains the right number."""
        prompt = "What is 7 * 8? Reply with just the number."
        reply = client.chat(prompt)
        assert "56" in reply
        print(f"  math response: {reply}")


# ===========================================================================
# Scenario 2: Streaming — events arrive in correct order
# ===========================================================================

class TestLiveStreaming:
    """Checks on the event sequence produced by stream() against the real model."""

    def test_stream_yields_messages_tuple_and_end(self, client):
        """stream() emits messages-tuple and values events and finishes with end."""
        stream_events = list(client.stream("Say hi in one word."))
        types = [event.type for event in stream_events]

        # Both event kinds must appear somewhere; "end" must be the last one.
        for required in ("messages-tuple", "values"):
            assert required in types, f"Expected '{required}' event, got: {types}"
        assert types[-1] == "end"

        for event in stream_events:
            assert isinstance(event, StreamEvent)
            print(f"  [{event.type}] {event.data}")

    def test_stream_ai_content_nonempty(self, client):
        """At least one streamed AI messages-tuple event carries non-empty content."""
        ai_chunks = []
        for event in client.stream("What color is the sky? One word."):
            if event.type != "messages-tuple":
                continue
            payload = event.data
            if payload.get("type") == "ai" and payload.get("content"):
                ai_chunks.append(event)

        assert len(ai_chunks) >= 1
        for chunk in ai_chunks:
            assert len(chunk.data.get("content", "")) > 0


# ===========================================================================
# Scenario 3: Tool use — agent calls a tool and returns result
# ===========================================================================

class TestLiveToolUse:
    """The agent invokes a single tool when explicitly asked to."""

    @staticmethod
    def _is_ai_tool_call(event):
        """Return True for a messages-tuple AI event that has a ``tool_calls`` key."""
        return (
            event.type == "messages-tuple"
            and event.data.get("type") == "ai"
            and "tool_calls" in event.data
        )

    def test_agent_uses_bash_tool(self, client):
        """Asking for a shell command yields a bash tool call and its output."""
        prompt = (
            "Use the bash tool to run: echo 'LIVE_TEST_OK'. "
            "Then tell me the output."
        )
        stream_events = list(client.stream(prompt))

        types = [event.type for event in stream_events]
        print(f"  event types: {types}")
        for event in stream_events:
            print(f"  [{event.type}] {event.data}")

        # Sort the messages-tuple events into tool calls, tool results and
        # AI messages that carry text; an AI event may land in two buckets.
        tool_call_msgs, tool_result_msgs, ai_text_msgs = [], [], []
        for event in stream_events:
            if event.type != "messages-tuple":
                continue
            if self._is_ai_tool_call(event):
                tool_call_msgs.append(event)
            if event.data.get("type") == "tool":
                tool_result_msgs.append(event)
            if event.data.get("type") == "ai" and event.data.get("content"):
                ai_text_msgs.append(event)

        assert len(tool_call_msgs) >= 1, f"Expected tool_call event, got types: {types}"
        assert len(tool_result_msgs) >= 1, f"Expected tool result event, got types: {types}"
        assert len(ai_text_msgs) >= 1

        first_call = tool_call_msgs[0].data["tool_calls"][0]
        assert first_call["name"] == "bash"
        first_result = tool_result_msgs[0].data["content"]
        assert "LIVE_TEST_OK" in first_result

    def test_agent_uses_ls_tool(self, client):
        """Asking for a directory listing yields an ls tool call first."""
        prompt = (
            "Use the ls tool to list the contents of /mnt/user-data/workspace. "
            "Just report what you see."
        )
        stream_events = list(client.stream(prompt))

        types = [event.type for event in stream_events]
        print(f"  event types: {types}")

        tool_call_msgs = [event for event in stream_events if self._is_ai_tool_call(event)]
        assert len(tool_call_msgs) >= 1
        first_call = tool_call_msgs[0].data["tool_calls"][0]
        assert first_call["name"] == "ls"


# ===========================================================================
# Scenario 4: Multi-tool chain — agent chains tools in sequence
# ===========================================================================

class TestLiveMultiToolChain:
    """The agent chains several tools within a single request."""

    def test_write_then_read(self, client):
        """Agent writes a file, then reads it back.

        Every tool call of every AI message is inspected (not just the first
        call of each message), so the test does not fail spuriously when the
        model batches ``write_file`` and ``read_file`` into one message.
        """
        events = list(client.stream(
            "Step 1: Use write_file to write 'integration_test_content' to "
            "/mnt/user-data/outputs/live_test.txt. "
            "Step 2: Use read_file to read that file back. "
            "Step 3: Tell me the content you read."
        ))

        types = [e.type for e in events]
        print(f"  event types: {types}")
        for e in events:
            print(f"  [{e.type}] {e.data}")

        mt_events = [e for e in events if e.type == "messages-tuple"]

        # Only keep AI messages with a non-empty tool_calls list, then flatten
        # so parallel calls inside one message are all taken into account.
        tc_events = [
            e for e in mt_events
            if e.data.get("type") == "ai" and e.data.get("tool_calls")
        ]
        tool_names = [
            call["name"]
            for e in tc_events
            for call in e.data["tool_calls"]
        ]

        assert "write_file" in tool_names, f"Expected write_file, got: {tool_names}"
        assert "read_file" in tool_names, f"Expected read_file, got: {tool_names}"

        # Final AI message or any tool result should mention the content.
        expected = "integration_test_content"
        ai_events = [e for e in mt_events if e.data.get("type") == "ai" and e.data.get("content")]
        tr_events = [e for e in mt_events if e.data.get("type") == "tool"]
        final_text = ai_events[-1].data["content"] if ai_events else ""
        found_in_tool_result = any(
            # ``or ""`` guards against a tool result whose content is None.
            expected in (e.data.get("content") or "")
            for e in tr_events
        )
        assert expected in final_text.lower() or found_in_tool_result, (
            f"Expected {expected!r} in the final AI message or a tool result"
        )


# ===========================================================================
# Scenario 5: File upload lifecycle with real filesystem
# ===========================================================================

class TestLiveFileUpload:
    """Upload, list and delete files through the client."""

    def test_upload_list_delete(self, client, thread_tmp):
        """Two files are uploaded, listed, then deleted one after the other."""
        thread_id, work_dir = thread_tmp

        # Materialise the two source files in the temporary directory.
        sources = []
        for name, body in (
            ("test_upload_a.txt", "content A"),
            ("test_upload_b.txt", "content B"),
        ):
            path = work_dir / name
            path.write_text(body)
            sources.append(path)

        # Upload both and check the per-file metadata that comes back.
        upload = client.upload_files(thread_id, sources)
        assert upload["success"] is True
        assert len(upload["files"]) == 2
        filenames = {entry["filename"] for entry in upload["files"]}
        assert filenames == {"test_upload_a.txt", "test_upload_b.txt"}
        for entry in upload["files"]:
            assert int(entry["size"]) > 0
            assert entry["virtual_path"].startswith("/mnt/user-data/uploads/")
            assert "artifact_url" in entry
        print(f"  uploaded: {filenames}")

        # Both files show up in the listing.
        listing = client.list_uploads(thread_id)
        assert listing["count"] == 2
        print(f"  listed: {[f['filename'] for f in listing['files']]}")

        # Removing the first file leaves only the second one.
        first_delete = client.delete_upload(thread_id, "test_upload_a.txt")
        assert first_delete["success"] is True
        after_first = client.list_uploads(thread_id)
        assert after_first["count"] == 1
        assert after_first["files"][0]["filename"] == "test_upload_b.txt"
        print(f"  after delete: {[f['filename'] for f in after_first['files']]}")

        # Removing the second file leaves an empty listing.
        client.delete_upload(thread_id, "test_upload_b.txt")
        after_second = client.list_uploads(thread_id)
        assert after_second["count"] == 0
        assert after_second["files"] == []

    def test_upload_nonexistent_file_raises(self, client):
        """upload_files() with a path that does not exist raises FileNotFoundError."""
        missing = ["/nonexistent/path/file.txt"]
        with pytest.raises(FileNotFoundError):
            client.upload_files("t-fail", missing)


# ===========================================================================
# Scenario 6: Configuration query — real config loading
# ===========================================================================

class TestLiveConfigQueries:
    """Model and skill queries answered from the loaded configuration."""

    def test_list_models_returns_ark(self, client):
        """list_models() includes "ark-model" and every entry has the expected fields."""
        listing = client.list_models()
        assert "models" in listing
        models = listing["models"]
        assert len(models) >= 1

        names = [entry["name"] for entry in models]
        assert "ark-model" in names

        # Verify Gateway-aligned fields
        for entry in models:
            for field in ("display_name", "supports_thinking"):
                assert field in entry
        print(f"  models: {names}")

    def test_get_model_found(self, client):
        """get_model("ark-model") returns a dict describing that model."""
        detail = client.get_model("ark-model")
        assert detail is not None
        assert detail["name"] == "ark-model"
        for field in ("display_name", "supports_thinking"):
            assert field in detail
        print(f"  model detail: {detail}")

    def test_get_model_not_found(self, client):
        """get_model() returns None for a name that is not configured."""
        missing = client.get_model("nonexistent-model-xyz")
        assert missing is None

    def test_list_skills(self, client):
        """list_skills() returns a dict whose "skills" entry is a list."""
        listing = client.list_skills()
        assert "skills" in listing
        skills = listing["skills"]
        assert isinstance(skills, list)
        print(f"  skills count: {len(skills)}")
        # Show at most the first three skills.
        for skill in skills[:3]:
            print(f"    - {skill['name']}: {skill['enabled']}")


# ===========================================================================
# Scenario 7: Artifact read after agent writes
# ===========================================================================

class TestLiveArtifact:
    """Reading artifacts back through get_artifact()."""

    def test_get_artifact_after_write(self, client):
        """A JSON file written by the agent can be fetched and parsed by the client."""
        from uuid import uuid4

        thread_id = f"live-artifact-{uuid4().hex[:8]}"

        # Have the agent create the file inside this test's own thread.
        prompt = (
            "Use write_file to create /mnt/user-data/outputs/artifact_test.json "
            'with content: {"status": "ok", "source": "live_test"}'
        )
        stream_events = list(client.stream(prompt, thread_id=thread_id))

        # At least one tool call of an AI message must be write_file.
        tool_call_msgs = [
            event
            for event in stream_events
            if event.type == "messages-tuple"
            and event.data.get("type") == "ai"
            and "tool_calls" in event.data
        ]
        assert any(
            call["name"] == "write_file"
            for event in tool_call_msgs
            for call in event.data["tool_calls"]
        )

        # Fetch the artifact and check both payload and MIME type.
        artifact_path = "mnt/user-data/outputs/artifact_test.json"
        content, mime = client.get_artifact(thread_id, artifact_path)
        data = json.loads(content)
        assert data["status"] == "ok"
        assert data["source"] == "live_test"
        assert "json" in mime
        print(f"  artifact: {data}, mime: {mime}")

    def test_get_artifact_not_found(self, client):
        """get_artifact() for a file that was never written raises FileNotFoundError."""
        missing_path = "mnt/user-data/outputs/nope.txt"
        with pytest.raises(FileNotFoundError):
            client.get_artifact("nonexistent-thread", missing_path)


# ===========================================================================
# Scenario 8: Per-call overrides
# ===========================================================================

class TestLiveOverrides:
    """Keyword overrides supplied on an individual chat() call."""

    def test_thinking_disabled_still_works(self, client):
        """chat() still answers when thinking_enabled=False is passed for the call."""
        prompt = "Say OK."
        reply = client.chat(prompt, thinking_enabled=False)
        assert len(reply) >= 1
        print(f"  response: {reply}")


# ===========================================================================
# Scenario 9: Error resilience
# ===========================================================================

class TestLiveErrorResilience:
    """Invalid requests must surface as specific exception types."""

    def test_delete_nonexistent_upload(self, client):
        """Deleting a file that was never uploaded raises FileNotFoundError."""
        thread_id, filename = "nonexistent-thread", "ghost.txt"
        with pytest.raises(FileNotFoundError):
            client.delete_upload(thread_id, filename)

    def test_bad_artifact_path(self, client):
        """get_artifact() rejects the path "invalid/path" with ValueError."""
        bad_path = "invalid/path"
        with pytest.raises(ValueError):
            client.get_artifact("t", bad_path)

    def test_path_traversal_blocked(self, client):
        """A filename that climbs out with ``..`` raises PermissionError."""
        traversal = "../../etc/passwd"
        with pytest.raises(PermissionError):
            client.delete_upload("t", traversal)