jim-bo Claude Opus 4.6 (1M context) committed
Commit 2509530 · 1 parent: c0cf2a6

feat: thinking transparency layer with /verbose command


Switch pipeline from stream_text() to stream_responses() to capture
thinking/reasoning tokens from models like Claude. Render thinking in
collapsible TUI panels, collapsed by default, expandable via /verbose.

New events: AgentThinkingChunk, AgentThinkingComplete
New command: /verbose (toggles thinking visibility)
New tests: 5, covering the pipeline, TUI rendering, and the /verbose toggle

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
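
The two new events are plain dataclasses pushed onto the pipeline's event queue. A minimal standalone sketch of the intended consumption contract (the dataclass shapes mirror the chat_events.py additions below; the queue wiring here is simplified and illustrative, not the real pipeline):

import asyncio
from dataclasses import dataclass

@dataclass
class AgentThinkingChunk:
    text: str

@dataclass
class AgentThinkingComplete:
    full_text: str

async def consume(queue: asyncio.Queue) -> None:
    # Reasoning deltas stream first; a single completion event closes the panel.
    while True:
        event = await queue.get()
        if isinstance(event, AgentThinkingChunk):
            print(f"[thinking] {event.text}")
        elif isinstance(event, AgentThinkingComplete):
            print(f"[thinking done] {len(event.full_text)} chars")
            break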

src/cli_textual/agents/manager.py CHANGED
@@ -2,9 +2,12 @@ import asyncio
 from typing import AsyncGenerator, List, Any
 from pydantic_ai import Agent, RunContext
 
+from pydantic_ai.messages import ThinkingPart, TextPart
+
 from cli_textual.core.chat_events import (
     ChatEvent, AgentThinking, AgentToolStart, AgentToolEnd, AgentToolOutput,
-    AgentStreamChunk, AgentComplete, AgentRequiresUserInput, ChatDeps, AgentExecuteCommand
+    AgentStreamChunk, AgentComplete, AgentRequiresUserInput, ChatDeps, AgentExecuteCommand,
+    AgentThinkingChunk, AgentThinkingComplete,
 )
 from cli_textual.agents.model import model
 from cli_textual.tools.bash import bash_exec as pure_bash_exec
@@ -129,12 +132,40 @@ async def run_manager_pipeline(
     async def run_agent():
         try:
             async with manager_agent.run_stream(prompt, deps=deps, message_history=message_history) as result:
-                last_length = 0
-                async for text in result.stream_text():
-                    new_part = text[last_length:]
-                    if new_part:
-                        await event_queue.put(AgentStreamChunk(text=new_part))
-                    last_length = len(text)
+                last_thinking_len = 0
+                last_text_len = 0
+                thinking_complete = False
+
+                async for response, is_last in result.stream_responses():
+                    # Accumulate thinking and text from all parts
+                    thinking_text = ""
+                    text_text = ""
+                    for part in response.parts:
+                        if isinstance(part, ThinkingPart):
+                            thinking_text += part.content
+                        elif isinstance(part, TextPart):
+                            text_text += part.content
+
+                    # Emit thinking deltas
+                    if len(thinking_text) > last_thinking_len:
+                        new_thinking = thinking_text[last_thinking_len:]
+                        await event_queue.put(AgentThinkingChunk(text=new_thinking))
+                        last_thinking_len = len(thinking_text)
+
+                    # Signal thinking done when text starts
+                    if text_text and not thinking_complete and last_thinking_len > 0:
+                        await event_queue.put(AgentThinkingComplete(full_text=thinking_text))
+                        thinking_complete = True
+
+                    # Emit text deltas
+                    if len(text_text) > last_text_len:
+                        new_text = text_text[last_text_len:]
+                        await event_queue.put(AgentStreamChunk(text=new_text))
+                        last_text_len = len(text_text)
+
+                # If thinking was emitted but no text followed, still signal complete
+                if last_thinking_len > 0 and not thinking_complete:
+                    await event_queue.put(AgentThinkingComplete(full_text=thinking_text))
 
                 await event_queue.put(AgentComplete(new_history=result.new_messages()))
         except Exception as e:
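
The handler above assumes stream_responses() yields cumulative response snapshots rather than deltas, hence the last_thinking_len/last_text_len bookkeeping. A standalone sketch of that slicing logic (illustrative data, not the real stream):

# Each snapshot contains the full text so far; only the unseen suffix is emitted.
snapshots = ["Let me", "Let me think", "Let me think this through."]
last_len = 0
for snapshot in snapshots:
    delta = snapshot[last_len:]
    if delta:
        print(repr(delta))
    last_len = len(snapshot)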
src/cli_textual/app.py CHANGED
@@ -8,8 +8,8 @@ from textual import on, events
 from textual.app import App, ComposeResult
 from textual.containers import Container, VerticalScroll, Horizontal
 from textual.widgets import (
-    Header, Footer, Static, Markdown, Label, OptionList,
-    TabbedContent, DirectoryTree, DataTable
+    Header, Footer, Static, Markdown, Label, OptionList,
+    TabbedContent, DirectoryTree, DataTable, Collapsible
 )
 from textual.widgets.option_list import Option
 from textual.binding import Binding
@@ -20,7 +20,8 @@ from cli_textual.core.permissions import PermissionManager
 from cli_textual.core.command import CommandManager
 from cli_textual.core.chat_events import (
     ChatEvent, AgentThinking, AgentToolStart, AgentToolEnd, AgentToolOutput,
-    AgentStreamChunk, AgentComplete, AgentRequiresUserInput, AgentExecuteCommand
+    AgentStreamChunk, AgentComplete, AgentRequiresUserInput, AgentExecuteCommand,
+    AgentThinkingChunk, AgentThinkingComplete,
 )
 
 # Pydantic AI Orchestrators
@@ -53,12 +54,13 @@ class ChatApp(App):
         self.chat_mode = os.getenv("CHAT_MODE", "manager")
         self.message_history = []  # For LLM context memory
         self.interactive_input_queue = asyncio.Queue()
+        self.verbose_mode = False
 
 
         # Initialize Core Managers
         self.workspace_root = Path.cwd().resolve()
         self.fs_manager = FSManager(self.workspace_root)
-        self.permission_manager = PermissionManager(self.workspace_root / ".cbio" / "settings.json")
+        self.permission_manager = PermissionManager(self.workspace_root / ".agents" / "settings.json")
         self.command_manager = CommandManager()
 
         # Register Commands via Auto-Discovery
@@ -176,9 +178,30 @@ class ChatApp(App):
 
             markdown_widget = None
             full_text = ""
+            thinking_collapsible = None
+            thinking_widget = None
+            thinking_text = ""
 
             async for event in generator:
-                if isinstance(event, AgentThinking):
+                if isinstance(event, AgentThinkingChunk):
+                    if not thinking_collapsible:
+                        thinking_collapsible = Collapsible(
+                            Static("", classes="thinking-content"),
+                            title="Reasoning",
+                            collapsed=not self.verbose_mode,
+                            classes="thinking-block",
+                        )
+                        await history.mount(thinking_collapsible)
+                        thinking_widget = thinking_collapsible.query_one(".thinking-content")
+                    thinking_text += event.text
+                    thinking_widget.update(thinking_text)
+                    history.scroll_end(animate=False)
+
+                elif isinstance(event, AgentThinkingComplete):
+                    if thinking_widget:
+                        thinking_widget.update(event.full_text)
+
+                elif isinstance(event, AgentThinking):
                     task_label.update(event.message)
 
                 elif isinstance(event, AgentRequiresUserInput):
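
The thinking panel is Textual's stock Collapsible widget wrapping a Static child. A minimal self-contained demo of the same collapsed-by-default construction (ThinkingDemo is an illustrative name, not part of this codebase):

from textual.app import App, ComposeResult
from textual.widgets import Collapsible, Static

class ThinkingDemo(App):
    def compose(self) -> ComposeResult:
        # collapsed=True hides the body until the title row is toggled,
        # matching the non-verbose default above.
        yield Collapsible(
            Static("model reasoning goes here", classes="thinking-content"),
            title="Reasoning",
            collapsed=True,
        )

if __name__ == "__main__":
    ThinkingDemo().run()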
src/cli_textual/app.tcss CHANGED
@@ -240,3 +240,20 @@ DirectoryTree {
     padding: 0 1;
     margin: 0 0 1 0;
 }
+
+.thinking-block {
+    margin: 0 0 1 0;
+    border-left: solid #555555;
+    padding: 0;
+}
+
+.thinking-block CollapsibleTitle {
+    color: #888888;
+    text-style: italic;
+}
+
+.thinking-content {
+    color: #777777;
+    padding: 0 1;
+    background: #1A1A1A;
+}
src/cli_textual/core/chat_events.py CHANGED
@@ -50,6 +50,16 @@ class AgentToolOutput(ChatEvent):
     content: str
     is_error: bool = False
 
+@dataclass
+class AgentThinkingChunk(ChatEvent):
+    """A partial chunk of the model's reasoning/thinking tokens."""
+    text: str
+
+@dataclass
+class AgentThinkingComplete(ChatEvent):
+    """The model has finished emitting thinking tokens for this turn."""
+    full_text: str
+
 @dataclass
 class AgentStreamChunk(ChatEvent):
     """A partial chunk of the final text response."""
src/cli_textual/plugins/commands/verbose.py ADDED
@@ -0,0 +1,19 @@
+from typing import List
+from cli_textual.core.command import SlashCommand
+
+
+class VerboseCommand(SlashCommand):
+    """Toggle verbose mode to show agent thinking by default."""
+
+    @property
+    def name(self) -> str:
+        return "/verbose"
+
+    @property
+    def description(self) -> str:
+        return "Toggle verbose mode (show thinking expanded)"
+
+    async def execute(self, app, args: List[str]):
+        app.verbose_mode = not app.verbose_mode
+        state = "ON" if app.verbose_mode else "OFF"
+        app.add_to_history(f"Verbose mode: **{state}**")
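
Because execute() only touches verbose_mode and add_to_history(), the command can be exercised against a stub without spinning up the TUI. A sketch (FakeApp is hypothetical, not part of the codebase):

import asyncio
from cli_textual.plugins.commands.verbose import VerboseCommand

class FakeApp:
    # Stands in for ChatApp: just the two members execute() touches.
    verbose_mode = False
    def add_to_history(self, text: str) -> None:
        print(text)

async def demo() -> None:
    app = FakeApp()
    await VerboseCommand().execute(app, [])
    assert app.verbose_mode is True   # OFF -> ON
    await VerboseCommand().execute(app, [])
    assert app.verbose_mode is False  # ON -> OFF

asyncio.run(demo())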
tests/conftest.py CHANGED
@@ -11,13 +11,13 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')
 def setup_permissions():
     """Automatically approve all tools before every test."""
     workspace_root = Path.cwd().resolve()
-    settings_dir = workspace_root / ".cbio"
+    settings_dir = workspace_root / ".agents"
     settings_path = settings_dir / "settings.json"
 
     os.makedirs(settings_dir, exist_ok=True)
     with open(settings_path, "w") as f:
         json.dump({
-            "approved_tools": ["/ls", "/head", "/select", "/load", "/survey", "/clear"]
+            "approved_tools": ["/ls", "/head", "/select", "/load", "/survey", "/clear", "/verbose"]
         }, f)
     yield
     # Cleanup if needed
tests/unit/test_thinking.py ADDED
@@ -0,0 +1,113 @@
+"""Tests for thinking/reasoning transparency layer."""
+import asyncio
+import pytest
+from pydantic_ai.models.function import FunctionModel, AgentInfo, DeltaThinkingPart
+from pydantic_ai.messages import ModelMessage
+from textual.widgets import Collapsible
+
+from cli_textual.agents.manager import run_manager_pipeline, manager_agent
+from cli_textual.core.chat_events import (
+    AgentThinkingChunk, AgentThinkingComplete, AgentStreamChunk, AgentComplete,
+)
+from cli_textual.app import ChatApp
+
+
+# ---------------------------------------------------------------------------
+# Pipeline tests
+# ---------------------------------------------------------------------------
+
+@pytest.mark.asyncio
+async def test_pipeline_emits_thinking_chunks():
+    """Thinking tokens surface as AgentThinkingChunk events."""
+    async def thinking_then_text(messages: list[ModelMessage], info: AgentInfo):
+        yield {0: DeltaThinkingPart(content="Let me reason about this.")}
+        yield "Here is my answer."
+
+    input_queue = asyncio.Queue()
+    events = []
+    with manager_agent.override(model=FunctionModel(stream_function=thinking_then_text)):
+        async with asyncio.timeout(5):
+            async for event in run_manager_pipeline("test", input_queue):
+                events.append(event)
+
+    thinking_chunks = [e for e in events if isinstance(e, AgentThinkingChunk)]
+    assert thinking_chunks, "No AgentThinkingChunk events emitted"
+
+    thinking_complete = [e for e in events if isinstance(e, AgentThinkingComplete)]
+    assert thinking_complete, "No AgentThinkingComplete event emitted"
+    assert "reason" in thinking_complete[0].full_text.lower()
+
+    text_chunks = [e for e in events if isinstance(e, AgentStreamChunk)]
+    assert text_chunks, "No text chunks emitted"
+    assert isinstance(events[-1], AgentComplete)
+
+
+@pytest.mark.asyncio
+async def test_pipeline_no_thinking_still_works():
+    """Existing behavior preserved when model produces no thinking."""
+    async def text_only(messages: list[ModelMessage], info: AgentInfo):
+        yield "Just text, no thinking."
+
+    input_queue = asyncio.Queue()
+    events = []
+    with manager_agent.override(model=FunctionModel(stream_function=text_only)):
+        async with asyncio.timeout(5):
+            async for event in run_manager_pipeline("test", input_queue):
+                events.append(event)
+
+    thinking_chunks = [e for e in events if isinstance(e, AgentThinkingChunk)]
+    assert not thinking_chunks, "Unexpected thinking chunks for text-only model"
+    assert any(isinstance(e, AgentStreamChunk) for e in events)
+    assert isinstance(events[-1], AgentComplete)
+
+
+# ---------------------------------------------------------------------------
+# TUI tests
+# ---------------------------------------------------------------------------
+
+@pytest.mark.asyncio
+async def test_thinking_renders_collapsed_by_default():
+    """Thinking appears in a collapsed Collapsible widget."""
+    async def thinking_then_text(messages: list[ModelMessage], info: AgentInfo):
+        yield {0: DeltaThinkingPart(content="Deep thought here")}
+        yield "Final answer."
+
+    app = ChatApp()
+    with manager_agent.override(model=FunctionModel(stream_function=thinking_then_text)):
+        async with app.run_test(size=(120, 40)) as pilot:
+            await pilot.press(*"hello", "enter")
+            await pilot.pause(2.0)
+
+            collapsibles = list(app.query_one("#history-container").query(Collapsible))
+            assert collapsibles, "No Collapsible widget found for thinking"
+            assert collapsibles[0].collapsed is True
+
+
+@pytest.mark.asyncio
+async def test_verbose_mode_expands_thinking():
+    """With verbose_mode=True, thinking is expanded."""
+    async def thinking_then_text(messages: list[ModelMessage], info: AgentInfo):
+        yield {0: DeltaThinkingPart(content="Deep thought here")}
+        yield "Final answer."
+
+    app = ChatApp()
+    app.verbose_mode = True
+    with manager_agent.override(model=FunctionModel(stream_function=thinking_then_text)):
+        async with app.run_test(size=(120, 40)) as pilot:
+            await pilot.press(*"hello", "enter")
+            await pilot.pause(2.0)
+
+            collapsibles = list(app.query_one("#history-container").query(Collapsible))
+            assert collapsibles, "No Collapsible widget found"
+            assert collapsibles[0].collapsed is False
+
+
+@pytest.mark.asyncio
+async def test_verbose_command_toggles():
+    """/verbose toggles app.verbose_mode."""
+    app = ChatApp()
+    async with app.run_test(size=(120, 40)) as pilot:
+        assert app.verbose_mode is False
+        await pilot.press(*"/verbose", "enter")
+        await pilot.pause(0.5)
+        assert app.verbose_mode is True