File size: 6,699 Bytes
9aa5185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""End-to-end test simulating CLI interrupt during subagent execution.

Reproduces the exact scenario:
1. Parent agent calls delegate_task
2. Child agent is running (simulated with a slow tool)
3. User "types a message" (simulated by calling parent.interrupt from another thread)
4. Child should detect the interrupt and stop

This tests the COMPLETE path including _run_single_child, _active_children
registration, interrupt propagation, and child detection.
"""

import json
import os
import queue
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock

from tools.interrupt import set_interrupt, is_interrupted


class TestCLISubagentInterrupt(unittest.TestCase):
    """Simulate a CLI user interrupting the parent while a delegated child runs."""

    def setUp(self):
        # Reset the interrupt flag so each test starts from a known state.
        set_interrupt(False)

    def tearDown(self):
        # Reset again so a flag set during this test cannot leak into others.
        set_interrupt(False)

    def test_full_delegate_interrupt_flow(self):
        """Full integration: parent runs delegate_task, main thread interrupts.

        A worker thread calls ``_run_single_child`` with a mocked child whose
        ``run_conversation`` polls ``_interrupt_requested``. The test thread
        then calls ``parent.interrupt(...)`` and verifies that the child saw
        the interrupt and that the delegate result reports status
        ``"interrupted"``.

        ``self.assert*`` methods are used instead of bare ``assert`` so the
        checks (and the ``Event.wait`` calls inside them) still run under
        ``python -O``.
        """
        from run_agent import AIAgent, IterationBudget
        from tools.delegate_tool import _run_single_child

        poll_interval = 0.1   # seconds between child interrupt checks
        poll_attempts = 100   # 100 * 0.1s = up to 10 seconds of polling

        interrupt_detected = threading.Event()
        child_started = threading.Event()

        # Create a real-enough parent agent: bypass __init__ and set only the
        # attributes assigned below by hand.
        parent = AIAgent.__new__(AIAgent)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent._active_children_lock = threading.Lock()
        parent.quiet_mode = True
        parent.model = "test/model"
        parent.base_url = "http://localhost:1"
        parent.api_key = "test"
        parent.provider = "test"
        parent.api_mode = "chat_completions"
        parent.platform = "cli"
        parent.enabled_toolsets = ["terminal", "file"]
        parent.providers_allowed = None
        parent.providers_ignored = None
        parent.providers_order = None
        parent.provider_sort = None
        parent.max_tokens = None
        parent.reasoning_config = None
        parent.prefill_messages = None
        parent._session_db = None
        parent._delegate_depth = 0
        parent._delegate_spinner = None
        parent.tool_progress_callback = None
        parent.iteration_budget = IterationBudget(max_total=100)

        # Mock the child's run_conversation to simulate a slow operation
        # that checks _interrupt_requested like the real one does.
        def mock_child_run_conversation(user_message, **kwargs):
            child_started.set()
            # Find the child through the parent's registry (not a direct
            # reference) so the _active_children registration is exercised.
            child = parent._active_children[-1] if parent._active_children else None

            # Simulate the agent loop: poll _interrupt_requested.
            for _ in range(poll_attempts):
                if child and child._interrupt_requested:
                    interrupt_detected.set()
                    return {
                        "final_response": "Interrupted!",
                        "messages": [],
                        "api_calls": 1,
                        "completed": False,
                        "interrupted": True,
                        "interrupt_message": child._interrupt_message,
                    }
                time.sleep(poll_interval)

            return {
                "final_response": "Finished without interrupt",
                "messages": [],
                "api_calls": 5,
                "completed": True,
                "interrupted": False,
            }

        # Filled in by the worker thread; read by the test thread after join.
        outcome = {"result": None, "error": None}

        # Run delegate in a thread (simulates agent_thread).
        def run_delegate():
            try:
                with patch('run_agent.AIAgent') as MockAgent:
                    mock_instance = MagicMock()
                    mock_instance._interrupt_requested = False
                    mock_instance._interrupt_message = None
                    mock_instance._active_children = []
                    mock_instance._active_children_lock = threading.Lock()
                    mock_instance.quiet_mode = True
                    mock_instance.run_conversation = mock_child_run_conversation

                    def mock_child_interrupt(msg=None):
                        # Record the interrupt on the mock so the polling
                        # loop in mock_child_run_conversation can observe it.
                        mock_instance._interrupt_requested = True
                        mock_instance._interrupt_message = msg

                    mock_instance.interrupt = mock_child_interrupt
                    mock_instance.tools = []
                    MockAgent.return_value = mock_instance

                    # Register child manually (normally done by _build_child_agent)
                    parent._active_children.append(mock_instance)

                    outcome["result"] = _run_single_child(
                        task_index=0,
                        goal="Do something slow",
                        child=mock_instance,
                        parent_agent=parent,
                    )
            except Exception as e:
                # Thread boundary: capture the error so the test thread can
                # re-raise it instead of it being lost in the worker.
                outcome["error"] = e

        agent_thread = threading.Thread(target=run_delegate, daemon=True)
        agent_thread.start()

        # Wait for child to start. If the worker already failed, surface its
        # exception rather than the less informative "never started" message.
        started = child_started.wait(timeout=5)
        if not started and outcome["error"] is not None:
            raise outcome["error"]
        self.assertTrue(started, "Child never started!")

        # Now simulate user interrupt (from main/process thread)
        time.sleep(0.2)  # Give child a moment to be in its loop

        print(f"Parent has {len(parent._active_children)} active children")
        self.assertGreaterEqual(
            len(parent._active_children),
            1,
            f"Expected child in _active_children, got {len(parent._active_children)}",
        )

        # This is what the CLI does:
        parent.interrupt("Hey stop that")

        print(f"Parent._interrupt_requested: {parent._interrupt_requested}")
        # Iterate over a snapshot: the worker thread may still touch the list.
        for i, child in enumerate(list(parent._active_children)):
            print(f"Child {i}._interrupt_requested: {child._interrupt_requested}")

        # Wait for child to detect interrupt
        detected = interrupt_detected.wait(timeout=3.0)

        # Wait for delegate to finish
        agent_thread.join(timeout=5)

        if outcome["error"] is not None:
            raise outcome["error"]

        self.assertTrue(detected, "Child never detected the interrupt!")
        # join() returns silently on timeout, so check liveness explicitly.
        self.assertFalse(
            agent_thread.is_alive(),
            "Delegate thread did not finish after the interrupt",
        )
        result = outcome["result"]
        self.assertIsNotNone(result, "Delegate returned no result")
        self.assertEqual(
            result["status"],
            "interrupted",
            f"Expected 'interrupted', got '{result['status']}'",
        )
        print(f"✓ Interrupt detected! Result: {result}")


# Run the tests in this module via unittest when the file is executed directly.
if __name__ == "__main__":
    unittest.main()