File size: 13,426 Bytes
9aa5185 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 | """
Tests for subagent progress relay (issue #169).
Verifies that:
- KawaiiSpinner.print_above() works with and without active spinner
- _build_child_progress_callback handles CLI/gateway/no-display paths
- Thinking events are relayed correctly
- Parallel callbacks don't share state
"""
import io
import sys
import time
import threading
import pytest
from unittest.mock import MagicMock, patch
from agent.display import KawaiiSpinner
from tools.delegate_tool import _build_child_progress_callback
# =========================================================================
# KawaiiSpinner.print_above tests
# =========================================================================
class TestPrintAbove:
"""Tests for KawaiiSpinner.print_above method."""
def test_print_above_without_spinner_running(self):
"""print_above should write to stdout even when spinner is not running."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf # Redirect to buffer
spinner.print_above("hello world")
output = buf.getvalue()
assert "hello world" in output
def test_print_above_with_spinner_running(self):
"""print_above should clear spinner line and print text."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
spinner.running = True # Pretend spinner is running (don't start thread)
spinner.print_above("tool line")
output = buf.getvalue()
assert "tool line" in output
assert "\r" in output # Should start with carriage return to clear spinner line
def test_print_above_uses_captured_stdout(self):
"""print_above should use self._out, not sys.stdout.
This ensures it works inside redirect_stdout(devnull)."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
# Simulate redirect_stdout(devnull)
old_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
spinner.print_above("should go to buf")
finally:
sys.stdout = old_stdout
assert "should go to buf" in buf.getvalue()
# =========================================================================
# _build_child_progress_callback tests
# =========================================================================
class TestBuildChildProgressCallback:
"""Tests for child progress callback builder."""
def test_returns_none_when_no_display(self):
"""Should return None when parent has no spinner or callback."""
parent = MagicMock()
parent._delegate_spinner = None
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
assert cb is None
def test_cli_spinner_tool_event(self):
"""Should print tool line above spinner for CLI path."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
assert cb is not None
cb("web_search", "quantum computing")
output = buf.getvalue()
assert "web_search" in output
assert "quantum computing" in output
assert "ββ" in output
def test_cli_spinner_thinking_event(self):
"""Should print thinking line above spinner for CLI path."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
cb("_thinking", "I'll search for papers first")
output = buf.getvalue()
assert "π" in output
assert "search for papers" in output
def test_gateway_batched_progress(self):
"""Gateway path should batch tool calls and flush at BATCH_SIZE."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
# Send 4 tool calls β shouldn't flush yet (BATCH_SIZE = 5)
for i in range(4):
cb(f"tool_{i}", f"arg_{i}")
parent_cb.assert_not_called()
# 5th call should trigger flush
cb("tool_4", "arg_4")
parent_cb.assert_called_once()
call_args = parent_cb.call_args
assert "tool_0" in call_args[0][1]
assert "tool_4" in call_args[0][1]
def test_thinking_not_relayed_to_gateway(self):
"""Thinking events should NOT be sent to gateway (too noisy)."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
cb("_thinking", "some reasoning text")
parent_cb.assert_not_called()
def test_parallel_callbacks_independent(self):
"""Each child's callback should have independent batch state."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb0 = _build_child_progress_callback(0, parent)
cb1 = _build_child_progress_callback(1, parent)
# Send 3 calls to each β neither should flush (batch size = 5)
for i in range(3):
cb0(f"tool_{i}")
cb1(f"other_{i}")
parent_cb.assert_not_called()
def test_task_index_prefix_in_batch_mode(self):
"""Batch mode (task_count > 1) should show 1-indexed prefix for all tasks."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
# task_index=0 in a batch of 3 β prefix "[1]"
cb0 = _build_child_progress_callback(0, parent, task_count=3)
cb0("web_search", "test")
output = buf.getvalue()
assert "[1]" in output
# task_index=2 in a batch of 3 β prefix "[3]"
buf.truncate(0)
buf.seek(0)
cb2 = _build_child_progress_callback(2, parent, task_count=3)
cb2("web_search", "test")
output = buf.getvalue()
assert "[3]" in output
def test_single_task_no_prefix(self):
"""Single task (task_count=1) should not show index prefix."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent, task_count=1)
cb("web_search", "test")
output = buf.getvalue()
assert "[" not in output
# =========================================================================
# Integration: thinking callback in run_agent.py
# =========================================================================
class TestThinkingCallback:
"""Tests for the _thinking callback in AIAgent conversation loop."""
def _simulate_thinking_callback(self, content, callback, delegate_depth=1):
"""Simulate the exact code path from run_agent.py for the thinking callback.
delegate_depth: simulates self._delegate_depth.
0 = main agent (should NOT fire), >=1 = subagent (should fire).
"""
import re
if (content and callback and delegate_depth > 0):
_think_text = content.strip()
_think_text = re.sub(
r'</?(?:REASONING_SCRATCHPAD|think|reasoning)>', '', _think_text
).strip()
first_line = _think_text.split('\n')[0][:80] if _think_text else ""
if first_line:
try:
callback("_thinking", first_line)
except Exception:
pass
def test_thinking_callback_fires_on_content(self):
"""tool_progress_callback should receive _thinking event
when assistant message has content."""
calls = []
self._simulate_thinking_callback(
"I'll research quantum computing first, then summarize.",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert calls[0][0] == "_thinking"
assert "quantum computing" in calls[0][1]
def test_thinking_callback_skipped_when_no_content(self):
"""Should not fire when assistant has no content."""
calls = []
self._simulate_thinking_callback(
None,
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 0
def test_thinking_callback_truncates_long_content(self):
"""Should truncate long content to 80 chars."""
calls = []
self._simulate_thinking_callback(
"A" * 200 + "\nSecond line should be ignored",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert len(calls[0][1]) == 80
def test_thinking_callback_skipped_for_main_agent(self):
"""Main agent (delegate_depth=0) should NOT fire thinking events.
This prevents gateway spam on Telegram/Discord."""
calls = []
self._simulate_thinking_callback(
"I'll help you with that request.",
lambda name, preview=None: calls.append((name, preview)),
delegate_depth=0,
)
assert len(calls) == 0
def test_thinking_callback_strips_reasoning_scratchpad(self):
"""REASONING_SCRATCHPAD tags should be stripped before display."""
calls = []
self._simulate_thinking_callback(
"<REASONING_SCRATCHPAD>I need to analyze this carefully</REASONING_SCRATCHPAD>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert "<REASONING_SCRATCHPAD>" not in calls[0][1]
assert "analyze this carefully" in calls[0][1]
def test_thinking_callback_strips_think_tags(self):
"""<think> tags should be stripped before display."""
calls = []
self._simulate_thinking_callback(
"<think>Let me think about this problem</think>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert "<think>" not in calls[0][1]
assert "think about this problem" in calls[0][1]
def test_thinking_callback_empty_after_strip(self):
"""Should not fire when content is only XML tags."""
calls = []
self._simulate_thinking_callback(
"<REASONING_SCRATCHPAD></REASONING_SCRATCHPAD>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 0
# =========================================================================
# Gateway batch flush tests
# =========================================================================
class TestBatchFlush:
"""Tests for gateway batch flush on subagent completion."""
def test_flush_sends_remaining_batch(self):
"""_flush should send remaining tool names to gateway."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
# Send 3 tools (below batch size of 5)
cb("web_search", "query1")
cb("read_file", "file.txt")
cb("write_file", "out.txt")
parent_cb.assert_not_called()
# Flush should send the remaining 3
cb._flush()
parent_cb.assert_called_once()
summary = parent_cb.call_args[0][1]
assert "web_search" in summary
assert "write_file" in summary
def test_flush_noop_when_batch_empty(self):
"""_flush should not send anything when batch is empty."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
cb._flush()
parent_cb.assert_not_called()
def test_flush_noop_when_no_parent_callback(self):
"""_flush should not crash when there's no parent callback."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
cb("web_search", "test")
cb._flush() # Should not crash
if __name__ == "__main__":
pytest.main([__file__, "-v"])
|