File size: 19,045 Bytes
9aa5185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
#!/usr/bin/env python3
"""
Tests for the read-loop detection mechanism in file_tools.

Verifies that:
1. Only *consecutive* identical reads trigger warnings/blocks
2. Any other tool call in between resets the consecutive counter
3. Warn on 3rd consecutive, block on 4th+
4. Different regions/files/tasks don't trigger false warnings
5. get_read_files_summary returns accurate history (unaffected by search keys)
6. clear_read_tracker resets state
7. notify_other_tool_call resets consecutive counters
8. Context compression injects file-read history

Run with:  python -m pytest tests/tools/test_read_loop_detection.py -v
"""

import json
import unittest
from unittest.mock import patch, MagicMock

from tools.file_tools import (
    read_file_tool,
    search_tool,
    get_read_files_summary,
    clear_read_tracker,
    notify_other_tool_call,
    _read_tracker,
)


class _FakeReadResult:
    """Minimal stand-in for FileOperations.read_file return value."""
    def __init__(self, content="line1\nline2\n", total_lines=2):
        self.content = content
        self._total_lines = total_lines

    def to_dict(self):
        return {"content": self.content, "total_lines": self._total_lines}


def _fake_read_file(path, offset=1, limit=500):
    """Stand-in for FileOperations.read_file.

    Echoes the requested path into the content so tests can verify the
    payload; offset/limit exist only to match the real signature and are
    deliberately ignored.
    """
    fake_content = f"content of {path}"
    return _FakeReadResult(content=fake_content, total_lines=10)


class _FakeSearchResult:
    """Minimal stand-in for FileOperations.search return value."""
    def __init__(self):
        self.matches = []

    def to_dict(self):
        return {"matches": [{"file": "test.py", "line": 1, "text": "match"}]}


def _make_fake_file_ops():
    """Build a MagicMock whose read_file/search delegate to the fakes above."""
    ops = MagicMock()
    ops.read_file = _fake_read_file

    def _search(**kwargs):
        return _FakeSearchResult()

    ops.search = _search
    return ops


class TestReadLoopDetection(unittest.TestCase):
    """read_file_tool must warn on the 3rd consecutive identical read and
    refuse (block) from the 4th onward; any break in the streak resets it."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    def _read(self, path, **kwargs):
        """Invoke the read tool and decode its JSON payload."""
        return json.loads(read_file_tool(path, **kwargs))

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_first_read_has_no_warning(self, _mock_ops):
        """A fresh read returns content and carries no warning."""
        payload = self._read("/tmp/test.py", task_id="t1")
        self.assertIn("content", payload)
        self.assertNotIn("_warning", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_second_consecutive_read_no_warning(self, _mock_ops):
        """Two identical reads in a row stay under the warn threshold of 3."""
        self._read("/tmp/test.py", offset=1, limit=500, task_id="t1")
        payload = self._read("/tmp/test.py", offset=1, limit=500, task_id="t1")
        self.assertNotIn("_warning", payload)
        self.assertIn("content", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_third_consecutive_read_has_warning(self, _mock_ops):
        """The 3rd identical read warns but still yields the content."""
        self._read("/tmp/test.py", task_id="t1")
        self._read("/tmp/test.py", task_id="t1")
        payload = self._read("/tmp/test.py", task_id="t1")
        self.assertIn("_warning", payload)
        self.assertIn("3 times", payload["_warning"])
        self.assertIn("content", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_fourth_consecutive_read_is_blocked(self, _mock_ops):
        """The 4th identical read is refused outright — no content at all."""
        for _ in range(3):
            self._read("/tmp/test.py", task_id="t1")
        payload = self._read("/tmp/test.py", task_id="t1")
        self.assertIn("error", payload)
        self.assertIn("BLOCKED", payload["error"])
        self.assertIn("4 times", payload["error"])
        self.assertNotIn("content", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_fifth_consecutive_read_still_blocked(self, _mock_ops):
        """Blocking persists and the reported count keeps climbing."""
        for _ in range(4):
            self._read("/tmp/test.py", task_id="t1")
        payload = self._read("/tmp/test.py", task_id="t1")
        self.assertIn("BLOCKED", payload["error"])
        self.assertIn("5 times", payload["error"])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_region_resets_consecutive(self, _mock_ops):
        """Switching to another region of the same file breaks the streak."""
        for _ in range(2):
            self._read("/tmp/test.py", offset=1, limit=500, task_id="t1")
        payload = self._read("/tmp/test.py", offset=501, limit=500, task_id="t1")
        self.assertNotIn("_warning", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_file_resets_consecutive(self, _mock_ops):
        """Switching files breaks the streak."""
        for _ in range(2):
            self._read("/tmp/a.py", task_id="t1")
        payload = self._read("/tmp/b.py", task_id="t1")
        self.assertNotIn("_warning", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_tasks_isolated(self, _mock_ops):
        """Each task_id keeps its own consecutive counter."""
        self._read("/tmp/test.py", task_id="task_a")
        payload = self._read("/tmp/test.py", task_id="task_b")
        self.assertNotIn("_warning", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_warning_still_returns_content(self, _mock_ops):
        """A warning (3rd read) never withholds the file content."""
        self._read("/tmp/test.py", task_id="t1")
        self._read("/tmp/test.py", task_id="t1")
        payload = self._read("/tmp/test.py", task_id="t1")
        self.assertIn("_warning", payload)
        self.assertIn("content", payload)
        self.assertIn("content of /tmp/test.py", payload["content"])


class TestNotifyOtherToolCall(unittest.TestCase):
    """Any other tool call resets the consecutive-read counter without
    discarding the recorded read history."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_other_tool_resets_consecutive(self, _mock_ops):
        """Re-reading after another tool ran is not counted as consecutive."""
        read_file_tool("/tmp/test.py", task_id="t1")
        read_file_tool("/tmp/test.py", task_id="t1")
        # A different tool runs in between — streak is broken.
        notify_other_tool_call("t1")
        payload = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
        self.assertIn("content", payload)
        self.assertNotIn("_warning", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_other_tool_prevents_block(self, _mock_ops):
        """Interleaving other tools lets the agent re-read indefinitely."""
        for _ in range(10):
            read_file_tool("/tmp/test.py", task_id="t1")
            notify_other_tool_call("t1")
        payload = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)
        self.assertIn("content", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_notify_on_unknown_task_is_safe(self, _mock_ops):
        """Notifying a task with no recorded reads must not raise."""
        notify_other_tool_call("nonexistent_task")

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_history_survives_notify(self, _mock_ops):
        """Reset touches only the streak; read_history is preserved."""
        read_file_tool("/tmp/test.py", offset=1, limit=100, task_id="t1")
        notify_other_tool_call("t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 1)
        self.assertEqual(summary[0]["path"], "/tmp/test.py")


class TestReadFilesSummary(unittest.TestCase):
    """get_read_files_summary reports per-task read history accurately."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_empty_when_no_reads(self, _mock_ops):
        """No reads yet means an empty summary."""
        self.assertEqual(get_read_files_summary("t1"), [])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_single_file_single_region(self, _mock_ops):
        """One read produces one entry with its line-range region."""
        read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 1)
        entry = summary[0]
        self.assertEqual(entry["path"], "/tmp/test.py")
        self.assertIn("lines 1-500", entry["regions"])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_single_file_multiple_regions(self, _mock_ops):
        """Distinct regions of one file accumulate under a single entry."""
        read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
        read_file_tool("/tmp/test.py", offset=501, limit=500, task_id="t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 1)
        self.assertEqual(len(summary[0]["regions"]), 2)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_multiple_files(self, _mock_ops):
        """Each distinct file gets its own summary entry."""
        read_file_tool("/tmp/a.py", task_id="t1")
        read_file_tool("/tmp/b.py", task_id="t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 2)
        seen = {entry["path"] for entry in summary}
        self.assertIn("/tmp/a.py", seen)
        self.assertIn("/tmp/b.py", seen)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_task_has_separate_summary(self, _mock_ops):
        """Summaries are scoped per task_id."""
        read_file_tool("/tmp/a.py", task_id="task_a")
        read_file_tool("/tmp/b.py", task_id="task_b")
        summary_a = get_read_files_summary("task_a")
        summary_b = get_read_files_summary("task_b")
        self.assertEqual(len(summary_a), 1)
        self.assertEqual(summary_a[0]["path"], "/tmp/a.py")
        self.assertEqual(len(summary_b), 1)
        self.assertEqual(summary_b[0]["path"], "/tmp/b.py")

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_summary_unaffected_by_searches(self, _mock_ops):
        """Search calls never leak into the file-read summary."""
        read_file_tool("/tmp/test.py", task_id="t1")
        search_tool("def main", task_id="t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 1)
        self.assertEqual(summary[0]["path"], "/tmp/test.py")


class TestClearReadTracker(unittest.TestCase):
    """clear_read_tracker wipes state for one task or for all tasks."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_clear_specific_task(self, _mock_ops):
        """Clearing one task leaves other tasks' history intact."""
        read_file_tool("/tmp/test.py", task_id="t1")
        read_file_tool("/tmp/test.py", task_id="t2")
        clear_read_tracker("t1")
        self.assertEqual(get_read_files_summary("t1"), [])
        self.assertEqual(len(get_read_files_summary("t2")), 1)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_clear_all(self, _mock_ops):
        """Clearing with no argument wipes every task."""
        read_file_tool("/tmp/test.py", task_id="t1")
        read_file_tool("/tmp/test.py", task_id="t2")
        clear_read_tracker()
        for task in ("t1", "t2"):
            self.assertEqual(get_read_files_summary(task), [])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_clear_then_reread_no_warning(self, _mock_ops):
        """A cleared task starts from a clean slate — no warning, no block."""
        for _ in range(3):
            read_file_tool("/tmp/test.py", task_id="t1")
        clear_read_tracker("t1")
        payload = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)


class TestSearchLoopDetection(unittest.TestCase):
    """search_tool mirrors the read-loop rules: warn on the 3rd consecutive
    identical search, block from the 4th; anything else resets the streak."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    def _search(self, pattern, **kwargs):
        """Run the search tool and decode its JSON payload."""
        return json.loads(search_tool(pattern, **kwargs))

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_first_search_no_warning(self, _mock_ops):
        """A fresh search carries neither warning nor error."""
        payload = self._search("def main", task_id="t1")
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_second_consecutive_search_no_warning(self, _mock_ops):
        """Two identical searches stay under the warn threshold of 3."""
        self._search("def main", task_id="t1")
        payload = self._search("def main", task_id="t1")
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_third_consecutive_search_has_warning(self, _mock_ops):
        """The 3rd identical search warns but still returns its matches."""
        self._search("def main", task_id="t1")
        self._search("def main", task_id="t1")
        payload = self._search("def main", task_id="t1")
        self.assertIn("_warning", payload)
        self.assertIn("3 times", payload["_warning"])
        self.assertIn("matches", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_fourth_consecutive_search_is_blocked(self, _mock_ops):
        """The 4th identical search is refused — no matches returned."""
        for _ in range(3):
            self._search("def main", task_id="t1")
        payload = self._search("def main", task_id="t1")
        self.assertIn("error", payload)
        self.assertIn("BLOCKED", payload["error"])
        self.assertNotIn("matches", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_pattern_resets_consecutive(self, _mock_ops):
        """Searching for something else breaks the streak."""
        for _ in range(2):
            self._search("def main", task_id="t1")
        payload = self._search("class Foo", task_id="t1")
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_task_isolated(self, _mock_ops):
        """Each task_id keeps its own search streak."""
        self._search("def main", task_id="t1")
        payload = self._search("def main", task_id="t2")
        self.assertNotIn("_warning", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_other_tool_resets_search_consecutive(self, _mock_ops):
        """notify_other_tool_call also resets the search streak."""
        self._search("def main", task_id="t1")
        self._search("def main", task_id="t1")
        notify_other_tool_call("t1")
        payload = self._search("def main", task_id="t1")
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_pagination_offset_does_not_count_as_repeat(self, _mock_ops):
        """Paging through truncated results is not a repeated search."""
        for page_start in (0, 50, 100, 150):
            payload = self._search(
                "def main", task_id="t1", offset=page_start, limit=50
            )
            self.assertNotIn("_warning", payload)
            self.assertNotIn("error", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_read_between_searches_resets_consecutive(self, _mock_ops):
        """A read_file between searches breaks the search streak."""
        self._search("def main", task_id="t1")
        self._search("def main", task_id="t1")
        # The read updates the last-key, so the next search is fresh.
        read_file_tool("/tmp/test.py", task_id="t1")
        payload = self._search("def main", task_id="t1")
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)


class TestTodoInjectionFiltering(unittest.TestCase):
    """format_for_injection must include only active (pending/in_progress)
    todos, and return None when nothing active remains."""

    @staticmethod
    def _store_with(todos):
        """Build a TodoStore pre-populated with *todos* (may be empty)."""
        from tools.todo_tool import TodoStore
        store = TodoStore()
        if todos:
            store.write(todos)
        return store

    def test_filters_completed_and_cancelled(self):
        """Completed and cancelled items are dropped; active ones remain."""
        store = self._store_with([
            {"id": "1", "content": "Read codebase", "status": "completed"},
            {"id": "2", "content": "Write fix", "status": "in_progress"},
            {"id": "3", "content": "Run tests", "status": "pending"},
            {"id": "4", "content": "Abandoned", "status": "cancelled"},
        ])
        injection = store.format_for_injection()
        for dropped in ("Read codebase", "Abandoned"):
            self.assertNotIn(dropped, injection)
        for kept in ("Write fix", "Run tests"):
            self.assertIn(kept, injection)

    def test_all_completed_returns_none(self):
        """With no active todos left, the injection is None."""
        store = self._store_with([
            {"id": "1", "content": "Done", "status": "completed"},
            {"id": "2", "content": "Also done", "status": "cancelled"},
        ])
        self.assertIsNone(store.format_for_injection())

    def test_empty_store_returns_none(self):
        """A store that never had todos produces no injection."""
        self.assertIsNone(self._store_with([]).format_for_injection())

    def test_all_active_included(self):
        """Every active todo shows up in the injection text."""
        store = self._store_with([
            {"id": "1", "content": "Task A", "status": "pending"},
            {"id": "2", "content": "Task B", "status": "in_progress"},
        ])
        injection = store.format_for_injection()
        self.assertIn("Task A", injection)
        self.assertIn("Task B", injection)


if __name__ == "__main__":
    # Allow running this file directly, outside pytest discovery.
    unittest.main()