"""End-to-end test simulating CLI interrupt during subagent execution. Reproduces the exact scenario: 1. Parent agent calls delegate_task 2. Child agent is running (simulated with a slow tool) 3. User "types a message" (simulated by calling parent.interrupt from another thread) 4. Child should detect the interrupt and stop This tests the COMPLETE path including _run_single_child, _active_children registration, interrupt propagation, and child detection. """ import json import os import queue import threading import time import unittest from unittest.mock import MagicMock, patch, PropertyMock from tools.interrupt import set_interrupt, is_interrupted class TestCLISubagentInterrupt(unittest.TestCase): """Simulate exact CLI scenario.""" def setUp(self): set_interrupt(False) def tearDown(self): set_interrupt(False) def test_full_delegate_interrupt_flow(self): """Full integration: parent runs delegate_task, main thread interrupts.""" from run_agent import AIAgent interrupt_detected = threading.Event() child_started = threading.Event() child_api_call_count = 0 # Create a real-enough parent agent parent = AIAgent.__new__(AIAgent) parent._interrupt_requested = False parent._interrupt_message = None parent._active_children = [] parent._active_children_lock = threading.Lock() parent.quiet_mode = True parent.model = "test/model" parent.base_url = "http://localhost:1" parent.api_key = "test" parent.provider = "test" parent.api_mode = "chat_completions" parent.platform = "cli" parent.enabled_toolsets = ["terminal", "file"] parent.providers_allowed = None parent.providers_ignored = None parent.providers_order = None parent.provider_sort = None parent.max_tokens = None parent.reasoning_config = None parent.prefill_messages = None parent._session_db = None parent._delegate_depth = 0 parent._delegate_spinner = None parent.tool_progress_callback = None # We'll track what happens with _active_children original_children = parent._active_children # Mock the child's run_conversation to simulate a slow operation # that checks _interrupt_requested like the real one does def mock_child_run_conversation(user_message, **kwargs): child_started.set() # Find the child in parent._active_children child = parent._active_children[-1] if parent._active_children else None # Simulate the agent loop: poll _interrupt_requested like run_conversation does for i in range(100): # Up to 10 seconds (100 * 0.1s) if child and child._interrupt_requested: interrupt_detected.set() return { "final_response": "Interrupted!", "messages": [], "api_calls": 1, "completed": False, "interrupted": True, "interrupt_message": child._interrupt_message, } time.sleep(0.1) return { "final_response": "Finished without interrupt", "messages": [], "api_calls": 5, "completed": True, "interrupted": False, } # Patch AIAgent to use our mock from tools.delegate_tool import _run_single_child from run_agent import IterationBudget parent.iteration_budget = IterationBudget(max_total=100) # Run delegate in a thread (simulates agent_thread) delegate_result = [None] delegate_error = [None] def run_delegate(): try: with patch('run_agent.AIAgent') as MockAgent: mock_instance = MagicMock() mock_instance._interrupt_requested = False mock_instance._interrupt_message = None mock_instance._active_children = [] mock_instance._active_children_lock = threading.Lock() mock_instance.quiet_mode = True mock_instance.run_conversation = mock_child_run_conversation mock_instance.interrupt = lambda msg=None: setattr(mock_instance, '_interrupt_requested', True) or setattr(mock_instance, '_interrupt_message', msg) mock_instance.tools = [] MockAgent.return_value = mock_instance # Register child manually (normally done by _build_child_agent) parent._active_children.append(mock_instance) result = _run_single_child( task_index=0, goal="Do something slow", child=mock_instance, parent_agent=parent, ) delegate_result[0] = result except Exception as e: delegate_error[0] = e agent_thread = threading.Thread(target=run_delegate, daemon=True) agent_thread.start() # Wait for child to start assert child_started.wait(timeout=5), "Child never started!" # Now simulate user interrupt (from main/process thread) time.sleep(0.2) # Give child a moment to be in its loop print(f"Parent has {len(parent._active_children)} active children") assert len(parent._active_children) >= 1, f"Expected child in _active_children, got {len(parent._active_children)}" # This is what the CLI does: parent.interrupt("Hey stop that") print(f"Parent._interrupt_requested: {parent._interrupt_requested}") for i, child in enumerate(parent._active_children): print(f"Child {i}._interrupt_requested: {child._interrupt_requested}") # Wait for child to detect interrupt detected = interrupt_detected.wait(timeout=3.0) # Wait for delegate to finish agent_thread.join(timeout=5) if delegate_error[0]: raise delegate_error[0] assert detected, "Child never detected the interrupt!" result = delegate_result[0] assert result is not None, "Delegate returned no result" assert result["status"] == "interrupted", f"Expected 'interrupted', got '{result['status']}'" print(f"✓ Interrupt detected! Result: {result}") if __name__ == "__main__": unittest.main()