jim-bo Claude Opus 4.6 (1M context) committed on
Commit
597fb95
·
1 Parent(s): 4c528ec

feat: add native agent tools, /tools command, and fix rendering bugs

Browse files

- Add bash_exec, read_file, web_fetch tools to manager_agent with
streaming output via AgentToolOutput events
- Add /tools slash command with self-contained ToolsWidget that
introspects agent tools and shows detail on selection
- Fix blank LLM responses: await Markdown.update(), mount(), remove()
- Fix silent error swallowing in run_manager_pipeline
- Fix OpenRouter model routing for nvidia/model:free style IDs
- Switch default model to nvidia/nemotron-3-super-120b-a12b:free
- Use auto_discover for command registration
- Add AgentToolOutput, AgentExecuteCommand event types
- Add unit tests for tools, /tools command, and TUI rendering
- Add integration tests for tool e2e (requires OPENROUTER_API_KEY)
- Fix test_manager_pipeline_flow timeout (use FunctionModel)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

src/cli_textual/agents/orchestrators.py CHANGED
@@ -1,14 +1,16 @@
1
  import os
2
  import asyncio
 
3
  from typing import AsyncGenerator, List, Any
 
4
  from pydantic_ai import Agent, RunContext
5
  from pydantic_ai.models.openai import OpenAIChatModel
6
  from pydantic_ai.models.test import TestModel
7
  from pydantic_ai.messages import ModelMessage
8
 
9
  from cli_textual.core.chat_events import (
10
- ChatEvent, AgentThinking, AgentToolStart, AgentToolEnd, AgentStreamChunk, AgentComplete,
11
- AgentRequiresUserInput, ChatDeps
12
  )
13
  from cli_textual.agents.specialists import model, intent_resolver, data_validator, result_generator
14
  from cli_textual.core.agent_schemas import IntentResolution, ValidationResult, StructuredResult
@@ -70,12 +72,159 @@ manager_agent = Agent(
70
  )
71
 
72
  @manager_agent.tool
73
- async def ask_user_to_select_manager(ctx: RunContext[ChatDeps], prompt: str, options: List[str]) -> str:
74
- """Ask the user to select from a list of options. Use this when you need the user to make a choice."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  await ctx.deps.event_queue.put(AgentRequiresUserInput(tool_name="/select", prompt=prompt, options=options))
76
  response = await ctx.deps.input_queue.get()
77
  return response
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  @manager_agent.tool
80
  async def call_intent_resolver(ctx: RunContext[ChatDeps], query: str) -> str:
81
  """Resolve a user's natural language query to a specific target identifier."""
@@ -125,11 +274,11 @@ async def run_manager_pipeline(
125
  if new_part:
126
  await event_queue.put(AgentStreamChunk(text=new_part))
127
  last_length = len(text)
128
-
129
  await event_queue.put(AgentComplete(new_history=result.new_messages()))
130
  except Exception as e:
 
131
  await event_queue.put(AgentComplete())
132
- raise e
133
 
134
  # Run the agent in the background
135
  task = asyncio.create_task(run_agent())
 
1
  import os
2
  import asyncio
3
+ from pathlib import Path
4
  from typing import AsyncGenerator, List, Any
5
+ import httpx
6
  from pydantic_ai import Agent, RunContext
7
  from pydantic_ai.models.openai import OpenAIChatModel
8
  from pydantic_ai.models.test import TestModel
9
  from pydantic_ai.messages import ModelMessage
10
 
11
  from cli_textual.core.chat_events import (
12
+ ChatEvent, AgentThinking, AgentToolStart, AgentToolEnd, AgentToolOutput,
13
+ AgentStreamChunk, AgentComplete, AgentRequiresUserInput, ChatDeps, AgentExecuteCommand
14
  )
15
  from cli_textual.agents.specialists import model, intent_resolver, data_validator, result_generator
16
  from cli_textual.core.agent_schemas import IntentResolution, ValidationResult, StructuredResult
 
72
  )
73
 
74
@manager_agent.tool
async def ask_user_to_select(ctx: RunContext[ChatDeps], prompt: str, options: List[str]) -> str:
    """Show a selection menu in the TUI and WAIT for the user's choice before continuing.

    ALWAYS call this tool when the user's message contains any selection intent:
    - "let me select / choose / pick"
    - "I want to choose / select"
    - "help me pick"
    - "first pick / first choose / first select"
    - any phrasing where the user should decide between options

    This tool PAUSES the agent and BLOCKS until the user makes a choice in the terminal UI.
    You MUST call this BEFORE writing any response that depends on the user's selection.
    The return value is the user's chosen option — use it in your response.

    Args:
        prompt: The question shown above the menu (e.g., "Choose a primary color:")
        options: The list of choices to display (e.g., ["Red", "Blue", "Yellow"])
    """
    # NOTE: this docstring doubles as the tool description sent to the LLM,
    # which is why it is phrased as explicit instructions to the model.
    # Ask the UI layer to render a selection menu (handled like a "/select" interaction).
    await ctx.deps.event_queue.put(AgentRequiresUserInput(tool_name="/select", prompt=prompt, options=options))
    # Block here until the TUI pushes the user's choice into the input queue.
    response = await ctx.deps.input_queue.get()
    return response
96
 
97
@manager_agent.tool
async def execute_slash_command(ctx: RunContext[ChatDeps], command_name: str, args: List[str] | None = None) -> str:
    """Execute a TUI slash command (e.g. '/clear', '/ls').
    Use this to trigger UI actions or system tools.
    """
    # Mutable defaults are unsafe; None is normalized to a fresh empty list here.
    if args is None: args = []
    # Ensure command name starts with /
    if not command_name.startswith("/"):
        command_name = f"/{command_name}"
    # The UI layer consumes this event and dispatches the command through its
    # normal command-processing path.
    await ctx.deps.event_queue.put(AgentExecuteCommand(command_name=command_name, args=args))
    return f"Command {command_name} triggered in UI."
108
+
109
@manager_agent.tool
async def bash_exec(ctx: RunContext[ChatDeps], command: str, working_dir: str = ".") -> str:
    """Execute a shell command and stream its output to the UI in real time.

    Use this to run scripts, inspect the system, process files, or perform any
    shell operation. stdout and stderr are merged and streamed as they arrive.
    Output is capped at 8 KB; a truncation note is appended when exceeded.

    Args:
        command: The shell command to run (passed to /bin/sh)
        working_dir: Working directory for the command (default: current directory)
    """
    # NOTE: this docstring doubles as the tool description sent to the LLM.
    await ctx.deps.event_queue.put(AgentToolStart(tool_name="bash_exec", args={"command": command}))
    MAX_OUTPUT = 8192  # cap on the text returned to the model
    output_parts: list[str] = []
    buffered = 0       # chars buffered so far; stop buffering once past the cap
    exit_code = 1      # pessimistic default; overwritten after proc.wait()
    try:
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,  # merge stderr into the stdout stream
            cwd=working_dir,
        )
        assert proc.stdout is not None
        while True:
            chunk = await proc.stdout.read(1024)
            if not chunk:
                break
            text = chunk.decode("utf-8", errors="replace")
            # Keep streaming every chunk to the UI, but only buffer up to the
            # return cap so a chatty process (e.g. `yes`) cannot grow memory
            # without bound; the return value is truncated to MAX_OUTPUT anyway.
            if buffered < MAX_OUTPUT:
                output_parts.append(text)
                buffered += len(text)
            await ctx.deps.event_queue.put(AgentToolOutput(tool_name="bash_exec", content=text))
        await proc.wait()
        exit_code = proc.returncode or 0
    except Exception as exc:
        # Report the failure to the UI and return it to the model instead of
        # raising, so the agent can recover or explain the error.
        err = f"Error: {exc}"
        await ctx.deps.event_queue.put(AgentToolOutput(tool_name="bash_exec", content=err, is_error=True))
        await ctx.deps.event_queue.put(AgentToolEnd(tool_name="bash_exec", result="error"))
        return err

    full_output = "".join(output_parts)
    truncated = ""
    if len(full_output) > MAX_OUTPUT:
        full_output = full_output[:MAX_OUTPUT]
        truncated = "\n[output truncated]"
    result = f"Exit code: {exit_code}\n{full_output}{truncated}"
    await ctx.deps.event_queue.put(AgentToolEnd(tool_name="bash_exec", result=f"exit {exit_code}"))
    return result
156
+
157
+
158
@manager_agent.tool
async def read_file(ctx: RunContext[ChatDeps], path: str, start_line: int = 1, end_line: int | None = None) -> str:
    """Read the contents of a local file, optionally restricted to a line range.

    Args:
        path: File path (relative to CWD or absolute)
        start_line: First line to include, 1-indexed (default: 1)
        end_line: Last line to include (default: read all, capped at 200 lines)
    """
    # NOTE: this docstring doubles as the tool description sent to the LLM.
    await ctx.deps.event_queue.put(AgentToolStart(tool_name="read_file", args={"path": path}))
    MAX_CHARS = 8192   # cap on returned characters
    MAX_LINES = 200    # cap on returned lines
    try:
        file_path = Path(path)
        if not file_path.is_absolute():
            file_path = Path.cwd() / file_path
        lines = file_path.read_text(encoding="utf-8", errors="replace").splitlines()
        # Convert the 1-indexed, inclusive range to 0-indexed slice bounds,
        # clamping to the file length and the line cap.
        start = max(0, start_line - 1)
        end = min(len(lines), end_line if end_line is not None else len(lines))
        end = min(end, start + MAX_LINES)
        selected = lines[start:end]
        content = "\n".join(selected)
        truncated = ""
        if len(content) > MAX_CHARS:
            content = content[:MAX_CHARS]
            truncated = "\n[truncated]"
        result = content + truncated
    except Exception as exc:
        # Surface the failure to the UI and return it to the model instead of raising.
        result = f"Error reading file: {exc}"
        await ctx.deps.event_queue.put(AgentToolOutput(tool_name="read_file", content=result, is_error=True))
        await ctx.deps.event_queue.put(AgentToolEnd(tool_name="read_file", result="error"))
        return result

    await ctx.deps.event_queue.put(AgentToolOutput(tool_name="read_file", content=result))
    await ctx.deps.event_queue.put(AgentToolEnd(tool_name="read_file", result=f"{len(selected)} lines"))
    return result
194
+
195
+
196
@manager_agent.tool
async def web_fetch(ctx: RunContext[ChatDeps], url: str) -> str:
    """Fetch the contents of a URL via HTTP GET and return the response body.

    Use this for REST APIs, documentation pages, or any web resource.
    Response body is capped at 8 KB; a truncation note is appended when exceeded.

    Args:
        url: The URL to fetch
    """
    # NOTE: this docstring doubles as the tool description sent to the LLM.
    await ctx.deps.event_queue.put(AgentToolStart(tool_name="web_fetch", args={"url": url}))
    MAX_CHARS = 8192
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
            response = await client.get(url)
            payload = response.text
        # Enforce the 8 KB cap before assembling the summary string.
        suffix = "\n[truncated]" if len(payload) > MAX_CHARS else ""
        result = f"HTTP {response.status_code}\n{payload[:MAX_CHARS]}{suffix}"
    except Exception as exc:
        result = f"Error fetching URL: {exc}"
        await ctx.deps.event_queue.put(AgentToolOutput(tool_name="web_fetch", content=result, is_error=True))
        await ctx.deps.event_queue.put(AgentToolEnd(tool_name="web_fetch", result="error"))
        return result

    await ctx.deps.event_queue.put(AgentToolOutput(tool_name="web_fetch", content=result))
    await ctx.deps.event_queue.put(AgentToolEnd(tool_name="web_fetch", result=f"HTTP {response.status_code}"))
    return result
226
+
227
+
228
  @manager_agent.tool
229
  async def call_intent_resolver(ctx: RunContext[ChatDeps], query: str) -> str:
230
  """Resolve a user's natural language query to a specific target identifier."""
 
274
  if new_part:
275
  await event_queue.put(AgentStreamChunk(text=new_part))
276
  last_length = len(text)
277
+
278
  await event_queue.put(AgentComplete(new_history=result.new_messages()))
279
  except Exception as e:
280
+ await event_queue.put(AgentStreamChunk(text=f"\n\n**Error:** {e}"))
281
  await event_queue.put(AgentComplete())
 
282
 
283
  # Run the agent in the background
284
  task = asyncio.create_task(run_agent())
src/cli_textual/agents/prompts.yaml CHANGED
@@ -4,16 +4,39 @@ orchestrators:
4
  manager:
5
  name: "Manager Agent"
6
  system_prompt: |
7
- You are a versatile agentic orchestrator designed to demonstrate multi-step task delegation.
8
- You coordinate specialized sub-agents to fulfill user requests.
9
- To answer a user question, you MUST follow this flow:
10
- 1. Resolve the primary intent or topic.
11
- 2. Validate the parameters or details.
12
- 3. Generate the final structured result.
13
- If you are able to identify a specialized tool use it.
14
- If a tool asks for clarification, relay that to the user.
15
- You also have a tool to ask the user to select from a list if you need to disambiguate anything.
16
- Be polite but very concise.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
 
18
  specialists:
19
  intent_resolver:
 
4
  manager:
5
  name: "Manager Agent"
6
  system_prompt: |
7
+ You are a TUI Orchestrator.
8
+
9
+ CRITICAL RULE — SELECTION ALWAYS USES THE TOOL:
10
+ If the user expresses ANY selection intent, you MUST call 'ask_user_to_select' IMMEDIATELY
11
+ before writing any response. Trigger phrases include (but are not limited to):
12
+ - "let me select / choose / pick"
13
+ - "I want to choose / select / pick"
14
+ - "help me pick / choose"
15
+ - "first pick / first select / first choose"
16
+ - "you choose for me" / "give me options"
17
+ - any request where the user should decide between a set of options
18
+
19
+ NEVER pick a value on behalf of the user.
20
+ NEVER ask in plain text ("Which color do you prefer?").
21
+ ALWAYS call 'ask_user_to_select' and use the returned value in your response.
22
+
23
+ EXAMPLE:
24
+ User: "Tell me a story about a primary color but first let me select one"
25
+ CORRECT: Call ask_user_to_select(prompt="Choose a primary color:", options=["Red","Blue","Yellow"]),
26
+ then write the story using the user's chosen color.
27
+ WRONG: Picking "red" yourself and writing the story without asking.
28
+
29
+ WORKFLOW TOOLS:
30
+ - 'ask_user_to_select': For ALL choices, menus, and user-driven selections. Call this FIRST.
31
+ - 'bash_exec': Run shell commands or scripts. Output streams to the UI in real time.
32
+ - 'read_file': Read the contents of a local file (supports line ranges).
33
+ - 'web_fetch': HTTP GET a URL and return the response body. Use for REST APIs and web resources.
34
+ - 'call_intent_resolver': To identify subjects.
35
+ - 'call_data_validator': To check details.
36
+ - 'call_result_generator': For final output.
37
+ - 'execute_slash_command': To trigger TUI actions like /clear.
38
+
39
+ Maintain context and be concise.
40
 
41
  specialists:
42
  intent_resolver:
src/cli_textual/agents/specialists.py CHANGED
@@ -12,40 +12,43 @@ from cli_textual.agents.prompt_loader import PROMPTS
12
 
13
  load_dotenv()
14
 
 
 
15
  def get_model():
16
  """Dynamically select model based on environment variables."""
17
- model_name = os.getenv("PYDANTIC_AI_MODEL", "test")
18
-
19
  if model_name.lower() == "test":
20
  return TestModel()
21
 
22
- # Detect Provider/Model Split
23
- provider = "openai"
 
24
  name = model_name
25
  if ":" in model_name:
26
- provider, name = model_name.split(":", 1)
 
 
27
 
28
- # 1. Handle OpenRouter
29
  openrouter_key = os.getenv("OPENROUTER_API_KEY")
30
- if openrouter_key and (provider == "openai" or "anthropic/" in name or "google/" in name):
31
  return OpenAIChatModel(
32
- name,
33
  provider=OpenAIProvider(
34
  base_url="https://openrouter.ai/api/v1",
35
  api_key=openrouter_key
36
  )
37
  )
38
 
39
- # 2. Handle Native Providers
40
- if provider == "openai":
41
- return OpenAIChatModel(name)
42
- elif provider == "anthropic":
43
  return AnthropicModel(name)
44
- elif provider == "gemini" or provider == "google":
45
  return GeminiModel(name)
46
-
47
- # Fallback
48
- return OpenAIChatModel(model_name)
49
 
50
  # Initialize the shared model instance
51
  model = get_model()
 
12
 
13
  load_dotenv()
14
 
15
KNOWN_PROVIDER_PREFIXES = {"anthropic", "openai", "gemini", "google"}

def get_model():
    """Dynamically select model based on environment variables."""
    model_name = os.getenv("PYDANTIC_AI_MODEL", "nvidia/nemotron-3-super-120b-a12b:free")

    if model_name.lower() == "test":
        return TestModel()

    # A ":" only denotes a "provider:model" pair when the prefix is one of the
    # known single-token providers; otherwise the colon belongs to an
    # OpenRouter-style ID such as "nvidia/model:free" and must not be split.
    provider = None
    name = model_name
    prefix, sep, remainder = model_name.partition(":")
    if sep and prefix.lower() in KNOWN_PROVIDER_PREFIXES:
        provider = prefix.lower()
        name = remainder

    # With an OpenRouter key and no explicit native provider, route the full
    # model ID through OpenRouter's OpenAI-compatible endpoint.
    openrouter_key = os.getenv("OPENROUTER_API_KEY")
    if openrouter_key and provider is None:
        openrouter_provider = OpenAIProvider(
            base_url="https://openrouter.ai/api/v1",
            api_key=openrouter_key
        )
        return OpenAIChatModel(model_name, provider=openrouter_provider)

    # Explicit native providers.
    if provider == "anthropic":
        return AnthropicModel(name)
    if provider in ("gemini", "google"):
        return GeminiModel(name)

    # "openai:" prefix or a bare model name (e.g. "gpt-4o").
    return OpenAIChatModel(name if provider else model_name)
52
 
53
  # Initialize the shared model instance
54
  model = get_model()
src/cli_textual/app.py CHANGED
@@ -20,8 +20,8 @@ from cli_textual.core.permissions import PermissionManager
20
  from cli_textual.core.command import CommandManager
21
  from cli_textual.core.dummy_agent import DummyAgent
22
  from cli_textual.core.chat_events import (
23
- ChatEvent, AgentThinking, AgentToolStart, AgentToolEnd, AgentStreamChunk, AgentComplete,
24
- AgentRequiresUserInput
25
  )
26
 
27
  # Pydantic AI Orchestrators
@@ -33,16 +33,6 @@ from cli_textual.ui.widgets.dna_spinner import DNASpinner
33
  from cli_textual.ui.screens.permission_screen import PermissionScreen
34
  from cli_textual.ui.widgets.landing_page import LandingPage
35
 
36
- # Plugin Imports (Simulated auto-discovery for now)
37
- from cli_textual.plugins.commands.ls import ListDirectoryCommand
38
- from cli_textual.plugins.commands.head import HeadCommand
39
- from cli_textual.plugins.commands.clear import ClearCommand
40
- from cli_textual.plugins.commands.load import LoadCommand
41
- from cli_textual.plugins.commands.select import SelectCommand
42
- from cli_textual.plugins.commands.survey import SurveyCommand
43
- from cli_textual.plugins.commands.help import HelpCommand
44
- from cli_textual.plugins.commands.mode import ModeCommand
45
-
46
  class ChatApp(App):
47
  """Refactored ChatApp using modular architecture."""
48
 
@@ -61,8 +51,10 @@ class ChatApp(App):
61
  self.last_ctrl_d_time = 0
62
  self.survey_answers = {}
63
  # Allow setting default mode via environment variable
64
- self.chat_mode = os.getenv("CHAT_MODE", "dummy")
65
  self.message_history = [] # For LLM context memory
 
 
66
 
67
  # Initialize Core Managers
68
  self.workspace_root = Path.cwd().resolve()
@@ -71,18 +63,8 @@ class ChatApp(App):
71
  self.command_manager = CommandManager()
72
  self.agent = DummyAgent()
73
 
74
- # Register Commands
75
- self._init_commands()
76
-
77
- def _init_commands(self):
78
- self.command_manager.register_command(ListDirectoryCommand())
79
- self.command_manager.register_command(HeadCommand())
80
- self.command_manager.register_command(ClearCommand())
81
- self.command_manager.register_command(LoadCommand())
82
- self.command_manager.register_command(SelectCommand())
83
- self.command_manager.register_command(SurveyCommand())
84
- self.command_manager.register_command(HelpCommand())
85
- self.command_manager.register_command(ModeCommand())
86
 
87
  def compose(self) -> ComposeResult:
88
  yield Header(show_clock=True)
@@ -95,6 +77,9 @@ class ChatApp(App):
95
  with Horizontal(id="status-bar"):
96
  yield Label("workspace (/directory)", classes="status-info")
97
  yield Label(f"mode: {self.chat_mode}", classes="status-info mode-info")
 
 
 
98
  yield Label(str(self.workspace_root), classes="path-info")
99
  yield Footer()
100
 
@@ -134,6 +119,9 @@ class ChatApp(App):
134
 
135
  # Resume the agent by pushing the selection into the queue
136
  if hasattr(self, "interactive_input_queue"):
 
 
 
137
  self.interactive_input_queue.put_nowait(selection)
138
 
139
  # Refocus main input
@@ -168,8 +156,6 @@ class ChatApp(App):
168
  await self.process_command(user_input)
169
  else:
170
  # Select orchestrator based on chat_mode
171
- self.interactive_input_queue = asyncio.Queue()
172
-
173
  if self.chat_mode == "procedural":
174
  generator = run_procedural_pipeline(user_input, message_history=self.message_history)
175
  elif self.chat_mode == "manager":
@@ -219,28 +205,43 @@ class ChatApp(App):
219
 
220
  history.scroll_end(animate=False)
221
 
 
 
 
 
 
 
 
222
  elif isinstance(event, AgentToolStart):
223
  task_label.update(f"Running tool: [bold cyan]{event.tool_name}[/]")
 
 
 
 
 
 
 
 
224
 
225
  elif isinstance(event, AgentStreamChunk):
226
  # If we're starting to stream, remove the spinner and create the Markdown widget
227
  if not markdown_widget:
228
- progress.remove()
229
  markdown_widget = Markdown("", classes="ai-msg")
230
- history.mount(markdown_widget)
231
-
232
  full_text += event.text
233
- markdown_widget.update(full_text)
234
  history.scroll_end(animate=False)
235
-
236
  elif isinstance(event, AgentComplete):
237
  # Save new history for context memory
238
  if event.new_history:
239
  self.message_history.extend(event.new_history)
240
-
241
  # If we never got a stream (e.g. only tool calls), remove progress
242
  if "agent-progress" in [c.id for c in history.children]:
243
- progress.remove()
244
  history.scroll_end(animate=False)
245
 
246
  async def process_command(self, cmd_str: str):
 
20
  from cli_textual.core.command import CommandManager
21
  from cli_textual.core.dummy_agent import DummyAgent
22
  from cli_textual.core.chat_events import (
23
+ ChatEvent, AgentThinking, AgentToolStart, AgentToolEnd, AgentToolOutput,
24
+ AgentStreamChunk, AgentComplete, AgentRequiresUserInput, AgentExecuteCommand
25
  )
26
 
27
  # Pydantic AI Orchestrators
 
33
  from cli_textual.ui.screens.permission_screen import PermissionScreen
34
  from cli_textual.ui.widgets.landing_page import LandingPage
35
 
 
 
 
 
 
 
 
 
 
 
36
  class ChatApp(App):
37
  """Refactored ChatApp using modular architecture."""
38
 
 
51
  self.last_ctrl_d_time = 0
52
  self.survey_answers = {}
53
  # Allow setting default mode via environment variable
54
+ self.chat_mode = os.getenv("CHAT_MODE", "manager")
55
  self.message_history = [] # For LLM context memory
56
+ self.interactive_input_queue = asyncio.Queue()
57
+
58
 
59
  # Initialize Core Managers
60
  self.workspace_root = Path.cwd().resolve()
 
63
  self.command_manager = CommandManager()
64
  self.agent = DummyAgent()
65
 
66
+ # Register Commands via Auto-Discovery
67
+ self.command_manager.auto_discover("cli_textual.plugins.commands")
 
 
 
 
 
 
 
 
 
 
68
 
69
  def compose(self) -> ComposeResult:
70
  yield Header(show_clock=True)
 
77
  with Horizontal(id="status-bar"):
78
  yield Label("workspace (/directory)", classes="status-info")
79
  yield Label(f"mode: {self.chat_mode}", classes="status-info mode-info")
80
+ from cli_textual.agents.specialists import model
81
+ model_name = getattr(model, "model_name", "test-mock")
82
+ yield Label(f"model: {model_name}", classes="status-info model-info")
83
  yield Label(str(self.workspace_root), classes="path-info")
84
  yield Footer()
85
 
 
119
 
120
  # Resume the agent by pushing the selection into the queue
121
  if hasattr(self, "interactive_input_queue"):
122
+ # Drain any stale entries (safety measure)
123
+ while not self.interactive_input_queue.empty():
124
+ self.interactive_input_queue.get_nowait()
125
  self.interactive_input_queue.put_nowait(selection)
126
 
127
  # Refocus main input
 
156
  await self.process_command(user_input)
157
  else:
158
  # Select orchestrator based on chat_mode
 
 
159
  if self.chat_mode == "procedural":
160
  generator = run_procedural_pipeline(user_input, message_history=self.message_history)
161
  elif self.chat_mode == "manager":
 
205
 
206
  history.scroll_end(animate=False)
207
 
208
+ elif isinstance(event, AgentExecuteCommand):
209
+ # Proactively execute a TUI command
210
+ full_cmd = event.command_name
211
+ if event.args:
212
+ full_cmd += " " + " ".join(event.args)
213
+ await self.process_command(full_cmd)
214
+
215
  elif isinstance(event, AgentToolStart):
216
  task_label.update(f"Running tool: [bold cyan]{event.tool_name}[/]")
217
+
218
+ elif isinstance(event, AgentToolOutput):
219
+ style_class = "tool-output-error" if event.is_error else "tool-output"
220
+ history.mount(Static(event.content, classes=style_class))
221
+ history.scroll_end(animate=False)
222
+
223
+ elif isinstance(event, AgentToolEnd):
224
+ task_label.update(f"Tool complete: [bold green]{event.tool_name}[/]")
225
 
226
  elif isinstance(event, AgentStreamChunk):
227
  # If we're starting to stream, remove the spinner and create the Markdown widget
228
  if not markdown_widget:
229
+ await progress.remove()
230
  markdown_widget = Markdown("", classes="ai-msg")
231
+ await history.mount(markdown_widget)
232
+
233
  full_text += event.text
234
+ await markdown_widget.update(full_text)
235
  history.scroll_end(animate=False)
236
+
237
  elif isinstance(event, AgentComplete):
238
  # Save new history for context memory
239
  if event.new_history:
240
  self.message_history.extend(event.new_history)
241
+
242
  # If we never got a stream (e.g. only tool calls), remove progress
243
  if "agent-progress" in [c.id for c in history.children]:
244
+ await progress.remove()
245
  history.scroll_end(animate=False)
246
 
247
  async def process_command(self, cmd_str: str):
src/cli_textual/app.tcss CHANGED
@@ -224,3 +224,19 @@ DirectoryTree {
224
  overflow-y: scroll;
225
  border: solid #333333;
226
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  overflow-y: scroll;
225
  border: solid #333333;
226
  }
227
+
228
+ .tool-output {
229
+ background: #0D1117;
230
+ color: #C9D1D9;
231
+ border-left: solid #00AAFF;
232
+ padding: 0 1;
233
+ margin: 0 0 1 0;
234
+ }
235
+
236
+ .tool-output-error {
237
+ background: #1A0000;
238
+ color: #FF6B6B;
239
+ border-left: solid #FF4444;
240
+ padding: 0 1;
241
+ margin: 0 0 1 0;
242
+ }
src/cli_textual/core/chat_events.py CHANGED
@@ -20,6 +20,12 @@ class AgentRequiresUserInput(ChatEvent):
20
  prompt: str
21
  options: List[str]
22
 
 
 
 
 
 
 
23
  @dataclass
24
  class AgentThinking(ChatEvent):
25
  """The agent is processing or waiting for a response."""
@@ -37,6 +43,13 @@ class AgentToolEnd(ChatEvent):
37
  tool_name: str
38
  result: str
39
 
 
 
 
 
 
 
 
40
  @dataclass
41
  class AgentStreamChunk(ChatEvent):
42
  """A partial chunk of the final text response."""
 
20
  prompt: str
21
  options: List[str]
22
 
23
@dataclass
class AgentExecuteCommand(ChatEvent):
    """The agent wants to execute a TUI slash command."""
    # The slash command to run (e.g. "/clear").
    command_name: str
    # Positional arguments forwarded to the command.
    args: List[str]
28
+
29
  @dataclass
30
  class AgentThinking(ChatEvent):
31
  """The agent is processing or waiting for a response."""
 
43
  tool_name: str
44
  result: str
45
 
46
@dataclass
class AgentToolOutput(ChatEvent):
    """Streaming output from a running tool (e.g., bash stdout, file contents)."""
    # Name of the tool that produced this chunk (e.g. "bash_exec").
    tool_name: str
    # The output text itself.
    content: str
    # True when the chunk reports a failure; the UI styles it as an error.
    is_error: bool = False
52
+
53
  @dataclass
54
  class AgentStreamChunk(ChatEvent):
55
  """A partial chunk of the final text response."""
src/cli_textual/core/command.py CHANGED
@@ -1,42 +1,67 @@
 
 
 
1
  from abc import ABC, abstractmethod
2
- from typing import List
3
 
4
  class SlashCommand(ABC):
5
  """Base class for all slash commands."""
6
-
7
  @property
8
  @abstractmethod
9
  def name(self) -> str:
10
- """The command string (e.g., /ls)."""
11
  pass
12
 
13
  @property
14
  @abstractmethod
15
  def description(self) -> str:
16
- """Help text for the command."""
17
  pass
18
 
19
  @property
20
  def requires_permission(self) -> bool:
21
- """Whether this command needs explicit user authorization."""
22
  return False
23
 
24
  @abstractmethod
25
  async def execute(self, app, args: List[str]):
26
- """The implementation of the command."""
27
  pass
28
 
29
  class CommandManager:
30
- """Registry and orchestrator for slash commands."""
31
-
32
  def __init__(self):
33
- self.commands = {}
34
 
35
  def register_command(self, cmd: SlashCommand):
36
- self.commands[cmd.name] = cmd
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
- def get_command(self, name: str) -> SlashCommand | None:
39
- return self.commands.get(name)
 
 
 
 
 
 
 
 
 
 
40
 
41
  def get_all_help(self) -> str:
42
  help_text = "### Commands\n"
@@ -44,3 +69,4 @@ class CommandManager:
44
  cmd = self.commands[name]
45
  help_text += f"- {name.ljust(15)} {cmd.description}\n"
46
  return help_text
 
 
1
+ import importlib
2
+ import pkgutil
3
+ import inspect
4
  from abc import ABC, abstractmethod
5
+ from typing import List, Dict, Type
6
 
7
  class SlashCommand(ABC):
8
  """Base class for all slash commands."""
9
+
10
  @property
11
  @abstractmethod
12
  def name(self) -> str:
13
+ """The command string, e.g., '/help'."""
14
  pass
15
 
16
  @property
17
  @abstractmethod
18
  def description(self) -> str:
19
+ """Brief summary of what the command does."""
20
  pass
21
 
22
  @property
23
  def requires_permission(self) -> bool:
24
+ """True if this command needs explicit user approval before running."""
25
  return False
26
 
27
  @abstractmethod
28
  async def execute(self, app, args: List[str]):
29
+ """The logic to run when the command is invoked."""
30
  pass
31
 
32
  class CommandManager:
33
+ """Registry and executor for slash commands."""
34
+
35
  def __init__(self):
36
+ self.commands: Dict[str, SlashCommand] = {}
37
 
38
    def register_command(self, cmd: SlashCommand):
        """Manually register a command instance."""
        # Keys are stored lowercased so lookups are case-insensitive.
        self.commands[cmd.name.lower()] = cmd
41
+
42
+ def auto_discover(self, package_path: str):
43
+ """
44
+ Dynamically discover and register SlashCommand classes in a package.
45
+ e.g., auto_discover('cli_textual.plugins.commands')
46
+ """
47
+ try:
48
+ package = importlib.import_module(package_path)
49
+ for _, name, is_pkg in pkgutil.iter_modules(package.__path__):
50
+ full_module_name = f"{package_path}.{name}"
51
+ module = importlib.import_module(full_module_name)
52
 
53
+ for _, obj in inspect.getmembers(module):
54
+ if (inspect.isclass(obj) and
55
+ issubclass(obj, SlashCommand) and
56
+ obj is not SlashCommand):
57
+ # Instantiate and register
58
+ instance = obj()
59
+ self.register_command(instance)
60
+ except Exception as e:
61
+ print(f"Error during command discovery: {e}")
62
+
63
+ def get_command(self, name: str) -> SlashCommand:
64
+ return self.commands.get(name.lower())
65
 
66
  def get_all_help(self) -> str:
67
  help_text = "### Commands\n"
 
69
  cmd = self.commands[name]
70
  help_text += f"- {name.ljust(15)} {cmd.description}\n"
71
  return help_text
72
+
src/cli_textual/plugins/commands/tools.py ADDED
@@ -0,0 +1,66 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List
2
+ from textual import on
3
+ from textual.app import ComposeResult
4
+ from textual.widgets import Label, OptionList, Static
5
+ from textual.widget import Widget
6
+ from cli_textual.core.command import SlashCommand
7
+ from cli_textual.agents.orchestrators import manager_agent
8
+
9
+
10
+ def _first_line(text: str) -> str:
11
+ """Return the first non-empty line of a docstring."""
12
+ for line in (text or "").splitlines():
13
+ stripped = line.strip()
14
+ if stripped:
15
+ return stripped
16
+ return ""
17
+
18
+
19
class ToolsWidget(Widget):
    """Self-contained widget: shows tool list, then full description on selection."""

    DEFAULT_CSS = """
    ToolsWidget {
        height: auto;
        padding: 0 1;
    }
    ToolsWidget .tool-detail {
        padding: 1;
        color: $text-muted;
    }
    """

    def compose(self) -> ComposeResult:
        yield Label("Agent tools (Enter to inspect, Esc to close)")
        # NOTE(review): reaches into pydantic_ai's private `_function_toolset`
        # to enumerate registered tools — may break on library upgrades;
        # confirm no public introspection API exists before bumping pydantic_ai.
        tools = manager_agent._function_toolset.tools
        # One option per tool: left-aligned name plus the first line of its docstring.
        items = [
            f"{name:<22} {_first_line(tool.description)}"
            for name, tool in tools.items()
        ]
        yield OptionList(*items, id="tools-option-list")

    @on(OptionList.OptionSelected, "#tools-option-list")
    def show_detail(self, event: OptionList.OptionSelected) -> None:
        # Recover the tool name as the first whitespace-delimited token of the
        # option text (tool names contain no spaces).
        tool_name = str(event.option.prompt).split()[0]
        tools = manager_agent._function_toolset.tools
        tool = tools.get(tool_name)
        description = tool.description if tool else "(no description)"

        # Swap the list view for a detail view in place.
        self.query("*").remove()
        self.mount(Label(f"[bold]{tool_name}[/bold] (Esc to close)"))
        self.mount(Static(description, classes="tool-detail"))
52
+
53
+
54
class ToolsCommand(SlashCommand):
    """Slash command that lists the manager agent's registered tools."""
    name = "/tools"
    description = "List available agent tools"

    async def execute(self, app, args: List[str]):
        """Mount a ToolsWidget in the app's shared interaction container."""
        container = app.query_one("#interaction-container")
        container.add_class("visible")
        # Clear whatever a previous interactive command left mounted.
        container.query("*").remove()
        widget = ToolsWidget()
        container.mount(widget)
        # Focus the option list only after the mount has been reflected,
        # otherwise the query would not find the child yet.
        app.call_after_refresh(
            lambda: widget.query_one("#tools-option-list").focus()
        )
tests/integration/test_interactive_agents.py CHANGED
@@ -11,8 +11,8 @@ async def test_manager_interactive_mock_backend():
11
  """Test the manager pipeline using a mock TestModel that forces a tool call."""
12
  input_queue = asyncio.Queue()
13
 
14
- # We force the TestModel to call the ask_user_to_select_manager tool before finishing
15
- mock_model = TestModel(call_tools=['ask_user_to_select_manager'])
16
 
17
  events = []
18
  with manager_agent.override(model=mock_model):
@@ -46,13 +46,14 @@ async def test_manager_integration_backend():
46
 
47
  events = []
48
  with manager_agent.override(model=real_model):
49
- # We prompt the real LLM to explicitly use the tool
50
- prompt = "Ask me what my favorite color is using your select tool, then write a funny sentence about it."
51
  pipeline = run_manager_pipeline(prompt, input_queue)
52
 
53
  async for event in pipeline:
54
  events.append(event)
55
  if isinstance(event, AgentRequiresUserInput):
 
56
  await input_queue.put("Neon Pink")
57
 
58
  # Verify the LLM called the tool
@@ -77,17 +78,17 @@ async def test_manager_multi_turn_memory():
77
 
78
  history = []
79
 
80
- # Turn 1: Tell the agent something
81
  with manager_agent.override(model=real_model):
82
- async for event in run_manager_pipeline("My secret password is 'BANANA'. Remember it.", input_queue, history):
83
  if isinstance(event, AgentComplete):
84
  history.extend(event.new_history)
85
 
86
  # Turn 2: Ask the agent to recall it
87
  events = []
88
  with manager_agent.override(model=real_model):
89
- async for event in run_manager_pipeline("What was my secret password?", input_queue, history):
90
  events.append(event)
91
 
92
  full_text = "".join([e.text for e in events if isinstance(e, AgentStreamChunk)])
93
- assert "BANANA" in full_text.upper(), f"The LLM forgot the secret. Output was: {full_text}"
 
11
  """Test the manager pipeline using a mock TestModel that forces a tool call."""
12
  input_queue = asyncio.Queue()
13
 
14
+ # We force the TestModel to call the ask_user_to_select tool before finishing
15
+ mock_model = TestModel(call_tools=['ask_user_to_select'])
16
 
17
  events = []
18
  with manager_agent.override(model=mock_model):
 
46
 
47
  events = []
48
  with manager_agent.override(model=real_model):
49
+ # Natural prompt — the system prompt and tool description should be compelling enough
50
+ prompt = "Tell me a story about a primary color but first let me select a color"
51
  pipeline = run_manager_pipeline(prompt, input_queue)
52
 
53
  async for event in pipeline:
54
  events.append(event)
55
  if isinstance(event, AgentRequiresUserInput):
56
+ # The LLM successfully paused and invoked the TUI tool!
57
  await input_queue.put("Neon Pink")
58
 
59
  # Verify the LLM called the tool
 
78
 
79
  history = []
80
 
81
+ # Turn 1: Tell the agent something non-sensitive
82
  with manager_agent.override(model=real_model):
83
+ async for event in run_manager_pipeline("My favorite fruit is 'MANGO'. Remember it.", input_queue, history):
84
  if isinstance(event, AgentComplete):
85
  history.extend(event.new_history)
86
 
87
  # Turn 2: Ask the agent to recall it
88
  events = []
89
  with manager_agent.override(model=real_model):
90
+ async for event in run_manager_pipeline("What was my favorite fruit?", input_queue, history):
91
  events.append(event)
92
 
93
  full_text = "".join([e.text for e in events if isinstance(e, AgentStreamChunk)])
94
+ assert "MANGO" in full_text.upper(), f"The LLM forgot the fruit. Output was: {full_text}"
tests/integration/test_tool_agents.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Integration tests for native agent tools using a real LLM via OpenRouter.
2
+
3
+ All tests are skipped unless OPENROUTER_API_KEY is set. They exercise the full
4
+ run_manager_pipeline path — real model, real tool execution, real event stream —
5
+ and assert that the LLM correctly invokes tools and incorporates their output.
6
+ """
7
+ import os
8
+ import asyncio
9
+ import pytest
10
+
11
+ from cli_textual.agents.orchestrators import manager_agent, run_manager_pipeline
12
+ from cli_textual.core.chat_events import (
13
+ AgentToolStart, AgentToolOutput, AgentStreamChunk, AgentComplete,
14
+ AgentRequiresUserInput,
15
+ )
16
+
17
+ # ---------------------------------------------------------------------------
18
+ # Helpers
19
+ # ---------------------------------------------------------------------------
20
+
21
# Marker applied to every test in this module: skip unless a real OpenRouter
# key is present, since these tests exercise a live LLM over the network.
SKIP_NO_KEY = pytest.mark.skipif(
    not os.getenv("OPENROUTER_API_KEY"),
    reason="OPENROUTER_API_KEY required for integration tests",
)
25
+
26
+
27
async def collect_pipeline(pipeline, input_queue, auto_respond=None) -> list:
    """Exhaust *pipeline* and return every event it produced, in order.

    When *auto_respond* is given, it is pushed onto *input_queue* each time
    the agent pauses with an AgentRequiresUserInput event.
    """
    collected = []
    async for item in pipeline:
        collected.append(item)
        if auto_respond is not None and isinstance(item, AgentRequiresUserInput):
            await input_queue.put(auto_respond)
    return collected
35
+
36
+
37
def text_from(events) -> str:
    """Concatenate the text of every AgentStreamChunk in *events*."""
    chunks = [event.text for event in events if isinstance(event, AgentStreamChunk)]
    return "".join(chunks)
39
+
40
+
41
def tool_started(events, name: str) -> bool:
    """True if *events* contains an AgentToolStart for the tool *name*."""
    for event in events:
        if isinstance(event, AgentToolStart) and event.tool_name == name:
            return True
    return False
43
+
44
+
45
def tool_output_contains(events, name: str, substring: str) -> bool:
    """True if any AgentToolOutput from tool *name* includes *substring*."""
    matching_outputs = (
        event for event in events
        if isinstance(event, AgentToolOutput) and event.tool_name == name
    )
    return any(substring in out.content for out in matching_outputs)
50
+
51
+
52
+ # ---------------------------------------------------------------------------
53
+ # Tests
54
+ # ---------------------------------------------------------------------------
55
+
56
@SKIP_NO_KEY
@pytest.mark.asyncio
async def test_bash_exec_e2e():
    """LLM should call bash_exec and incorporate the output in its response."""
    input_queue = asyncio.Queue()
    pipeline = run_manager_pipeline(
        "Run the shell command 'echo hello world' and tell me what it outputs.",
        input_queue,
    )
    events = await collect_pipeline(pipeline, input_queue)

    # Three layers of verification: the tool fired, the tool streamed the
    # right output, and the model actually wove that output into its answer.
    assert tool_started(events, "bash_exec"), "Expected bash_exec tool call"
    assert tool_output_contains(events, "bash_exec", "hello world"), \
        "Expected 'hello world' in bash_exec output"
    full_text = text_from(events)
    assert "hello world" in full_text.lower(), \
        f"LLM response did not mention 'hello world'. Got: {full_text[:300]}"
    # The pipeline must terminate cleanly, with AgentComplete as the final event.
    assert isinstance(events[-1], AgentComplete)
74
+
75
+
76
@SKIP_NO_KEY
@pytest.mark.asyncio
async def test_read_file_e2e():
    """LLM should call read_file when asked to inspect a file."""
    # NOTE(review): the relative path assumes pytest runs from the repo root —
    # confirm this holds in CI before relying on the assertion below.
    input_queue = asyncio.Queue()
    pipeline = run_manager_pipeline(
        "Read the file src/cli_textual/core/chat_events.py and tell me what events are defined.",
        input_queue,
    )
    events = await collect_pipeline(pipeline, input_queue)

    assert tool_started(events, "read_file"), "Expected read_file tool call"
    full_text = text_from(events)
    # The file contains ChatEvent / AgentComplete — the LLM should mention at least one
    assert any(keyword in full_text for keyword in ["ChatEvent", "AgentComplete", "event"]), \
        f"LLM response didn't mention events. Got: {full_text[:300]}"
    assert isinstance(events[-1], AgentComplete)
93
+
94
+
95
@SKIP_NO_KEY
@pytest.mark.asyncio
async def test_web_fetch_e2e():
    """LLM should call web_fetch when asked to retrieve a URL."""
    # NOTE(review): depends on the external httpbin.org service being up —
    # expect occasional flakiness; the "200" fallback below softens this.
    input_queue = asyncio.Queue()
    pipeline = run_manager_pipeline(
        "Fetch the URL https://httpbin.org/json and tell me what the JSON contains.",
        input_queue,
    )
    events = await collect_pipeline(pipeline, input_queue)

    assert tool_started(events, "web_fetch"), "Expected web_fetch tool call"
    # httpbin.org/json returns {"slideshow": ...}
    assert tool_output_contains(events, "web_fetch", "slideshow") or \
        tool_output_contains(events, "web_fetch", "200"), \
        "Expected HTTP response in web_fetch output"
    assert isinstance(events[-1], AgentComplete)
112
+
113
+
114
@SKIP_NO_KEY
@pytest.mark.asyncio
async def test_select_then_bash_e2e():
    """LLM should use ask_user_to_select first, then run bash_exec with the chosen command."""
    input_queue = asyncio.Queue()
    pipeline = run_manager_pipeline(
        "Let me pick a shell command from a list, then run it and show me the output.",
        input_queue,
    )
    # auto_respond answers the selection prompt so the pipeline never blocks
    # on input_queue.get().
    events = await collect_pipeline(pipeline, input_queue, auto_respond="echo chosen_value")

    assert any(isinstance(e, AgentRequiresUserInput) for e in events), \
        "Expected a selection prompt"
    assert tool_started(events, "bash_exec"), \
        "Expected bash_exec to be called after selection"
    full_text = text_from(events)
    assert full_text, "Expected some text response"
    assert isinstance(events[-1], AgentComplete)
tests/integration/test_ui_full_agent_flow.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+ import asyncio
3
+ from textual.widgets import OptionList, Markdown, Label
4
+ from pydantic_ai.models.test import TestModel
5
+
6
+ from cli_textual.app import ChatApp
7
+ from cli_textual.agents.orchestrators import manager_agent
8
+
9
@pytest.mark.asyncio
async def test_full_ui_interaction_round_trip():
    """
    Verify the full loop:
    1. User sends message
    2. Agent triggers selection UI
    3. User makes selection
    4. Agent completes with final output
    """
    app = ChatApp()
    app.chat_mode = "manager"

    # We force the TestModel to call the selection tool
    mock_model = TestModel(call_tools=['ask_user_to_select'])

    async with app.run_test() as pilot:
        with manager_agent.override(model=mock_model):
            # 1. Type message and submit
            await pilot.press(*"tell me a story about a color", "enter")

            # 2. Wait for the interaction container to become visible
            # We use a loop to poll since agent responses are async
            # (up to 20 * 0.1s = ~2s before giving up).
            for _ in range(20):
                interaction = app.query_one("#interaction-container")
                if interaction.has_class("visible") and app.query("OptionList#agent-select-tool"):
                    break
                await pilot.pause(0.1)
            else:
                pytest.fail("Interaction UI never appeared")

            # 3. Verify the selection list has options
            option_list = app.query_one("#agent-select-tool", OptionList)
            assert option_list.option_count > 0

            # 4. Select the first option (Red) and press enter
            # (the first option is highlighted by default; Enter confirms it)
            await pilot.press("enter")

            # 5. Wait for the agent to finish and the interaction UI to close
            for _ in range(20):
                if not interaction.has_class("visible"):
                    break
                await pilot.pause(0.1)
            else:
                pytest.fail("Interaction UI never closed after selection")

            # 6. Verify the final AI message was mounted in history
            history = app.query_one("#history-container")
            # The TestModel response usually contains the tool call result or mock text
            assert len(history.query(Markdown)) >= 1

            # Final check that focus returned to main input
            assert app.focused.id == "main-input"
tests/unit/test_agent_tools.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Unit tests for the native manager_agent tools (bash_exec, read_file, web_fetch).
2
+
3
+ Tools are called directly — the @agent.tool decorator registers them but returns
4
+ the original function unchanged, so they can be invoked as plain async functions.
5
+ A minimal mock RunContext carrying real asyncio.Queues stands in for the live
6
+ pydantic-ai context.
7
+ """
8
+ import asyncio
9
+ import tempfile
10
+ import os
11
+ import pytest
12
+ from unittest.mock import MagicMock, patch, AsyncMock
13
+
14
+ from cli_textual.core.chat_events import (
15
+ ChatDeps, AgentToolStart, AgentToolEnd, AgentToolOutput,
16
+ )
17
+ from cli_textual.agents.orchestrators import bash_exec, read_file, web_fetch
18
+
19
+
20
+ # ---------------------------------------------------------------------------
21
+ # Helpers
22
+ # ---------------------------------------------------------------------------
23
+
24
def make_ctx() -> tuple:
    """Build a mock RunContext plus the event queue it writes to.

    Returns (ctx, event_queue); ctx.deps is a real ChatDeps carrying live
    asyncio.Queues so tools under test can emit events normally.
    """
    events: asyncio.Queue = asyncio.Queue()
    inputs: asyncio.Queue = asyncio.Queue()
    fake_ctx = MagicMock()
    fake_ctx.deps = ChatDeps(event_queue=events, input_queue=inputs)
    return fake_ctx, events
32
+
33
+
34
+ async def drain(q: asyncio.Queue) -> list:
35
+ """Return all items currently in the queue without blocking."""
36
+ items = []
37
+ while not q.empty():
38
+ items.append(q.get_nowait())
39
+ return items
40
+
41
+
42
+ # ---------------------------------------------------------------------------
43
+ # bash_exec
44
+ # ---------------------------------------------------------------------------
45
+
46
@pytest.mark.asyncio
async def test_bash_exec_captures_output():
    """Stdout and the zero exit code must both appear in the return value."""
    ctx, _events = make_ctx()
    output = await bash_exec(ctx, command="echo hello")
    assert "hello" in output
    assert "Exit code: 0" in output
52
+
53
+
54
@pytest.mark.asyncio
async def test_bash_exec_emits_lifecycle_events():
    """bash_exec must emit Start, then Output, then End on the event queue."""
    ctx, event_queue = make_ctx()
    await bash_exec(ctx, command="echo lifecycle")
    events = await drain(event_queue)

    kinds = [type(e) for e in events]
    assert AgentToolStart in kinds
    assert AgentToolOutput in kinds
    assert AgentToolEnd in kinds

    # First occurrence of each event type must respect Start -> Output -> End.
    first_seen = {
        cls: next(i for i, e in enumerate(events) if isinstance(e, cls))
        for cls in (AgentToolStart, AgentToolOutput, AgentToolEnd)
    }
    assert first_seen[AgentToolStart] < first_seen[AgentToolOutput] < first_seen[AgentToolEnd]
70
+
71
+
72
@pytest.mark.asyncio
async def test_bash_exec_output_event_contains_text():
    """The streamed AgentToolOutput events must carry the command's stdout."""
    ctx, event_queue = make_ctx()
    await bash_exec(ctx, command="echo unique_marker_xyz")
    streamed = "".join(
        e.content for e in await drain(event_queue) if isinstance(e, AgentToolOutput)
    )
    assert "unique_marker_xyz" in streamed
80
+
81
+
82
@pytest.mark.asyncio
async def test_bash_exec_nonzero_exit_code():
    """A failing command's exit code must surface in the result string."""
    ctx, _ = make_ctx()
    outcome = await bash_exec(ctx, command="sh -c 'exit 42'")
    assert "42" in outcome
87
+
88
+
89
@pytest.mark.asyncio
async def test_bash_exec_invalid_command_does_not_raise():
    """An unknown command must yield an error string, never an exception."""
    ctx, _ = make_ctx()
    # Nonexistent binary: the tool should report the failure rather than raise.
    outcome = await bash_exec(ctx, command="__nonexistent_command_xyz__")
    assert outcome
95
+
96
+
97
+ # ---------------------------------------------------------------------------
98
+ # read_file
99
+ # ---------------------------------------------------------------------------
100
+
101
@pytest.mark.asyncio
async def test_read_file_returns_contents():
    """Every line of the file must appear in the tool's return value."""
    ctx, _ = make_ctx()
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
        f.write("line one\nline two\nline three\n")
        tmp_path = f.name
    try:
        contents = await read_file(ctx, path=tmp_path)
        for expected in ("line one", "line two", "line three"):
            assert expected in contents
    finally:
        os.unlink(tmp_path)
114
+
115
+
116
@pytest.mark.asyncio
async def test_read_file_line_range():
    """start_line/end_line must select an inclusive, 1-based line window."""
    ctx, _ = make_ctx()
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
        f.write("alpha\nbeta\ngamma\ndelta\n")
        tmp_path = f.name
    try:
        window = await read_file(ctx, path=tmp_path, start_line=2, end_line=3)
        for included in ("beta", "gamma"):
            assert included in window
        for excluded in ("alpha", "delta"):
            assert excluded not in window
    finally:
        os.unlink(tmp_path)
130
+
131
+
132
@pytest.mark.asyncio
async def test_read_file_emits_lifecycle_events():
    """read_file must emit Start, Output, and End events for a successful read."""
    ctx, event_queue = make_ctx()
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
        f.write("content")
        tmp_path = f.name
    try:
        await read_file(ctx, path=tmp_path)
        emitted = {type(e) for e in await drain(event_queue)}
        assert {AgentToolStart, AgentToolOutput, AgentToolEnd} <= emitted
    finally:
        os.unlink(tmp_path)
147
+
148
+
149
@pytest.mark.asyncio
async def test_read_file_missing_returns_error_string():
    """A missing path must produce an error string plus an is_error output event.

    The tool contract is to report failures as strings (so the LLM can react),
    never to raise.
    """
    ctx, event_queue = make_ctx()
    result = await read_file(ctx, path="/nonexistent/path/file_xyz.txt")
    # Case-insensitive check covers "error"/"Error"/"ERROR"; the previous
    # extra clause `or "Error" in result` was redundant with .lower().
    assert "error" in result.lower()
    # Must also emit an error output event
    events = await drain(event_queue)
    error_events = [e for e in events if isinstance(e, AgentToolOutput) and e.is_error]
    assert error_events
158
+
159
+
160
+ # ---------------------------------------------------------------------------
161
+ # web_fetch
162
+ # ---------------------------------------------------------------------------
163
+
164
@pytest.mark.asyncio
async def test_web_fetch_returns_body():
    """Status code and response body must both appear in the result string."""
    ctx, _ = make_ctx()

    # Fake httpx async-context-manager client returning a canned response.
    response = MagicMock(status_code=200, text='{"key": "value"}')
    client = AsyncMock()
    client.get = AsyncMock(return_value=response)
    client.__aenter__ = AsyncMock(return_value=client)
    client.__aexit__ = AsyncMock(return_value=None)

    with patch("cli_textual.agents.orchestrators.httpx.AsyncClient", return_value=client):
        result = await web_fetch(ctx, url="https://example.com/api")

    assert "200" in result
    assert "value" in result
182
+
183
+
184
@pytest.mark.asyncio
async def test_web_fetch_emits_lifecycle_events():
    """web_fetch must emit Start, Output, and End events on success."""
    ctx, event_queue = make_ctx()

    response = MagicMock(status_code=200, text="body content")
    client = AsyncMock()
    client.get = AsyncMock(return_value=response)
    client.__aenter__ = AsyncMock(return_value=client)
    client.__aexit__ = AsyncMock(return_value=None)

    with patch("cli_textual.agents.orchestrators.httpx.AsyncClient", return_value=client):
        await web_fetch(ctx, url="https://example.com")

    emitted = {type(e) for e in await drain(event_queue)}
    assert {AgentToolStart, AgentToolOutput, AgentToolEnd} <= emitted
205
+
206
+
207
@pytest.mark.asyncio
async def test_web_fetch_network_error_returns_error_string():
    """A network failure must produce an error string plus an is_error event.

    web_fetch must swallow transport exceptions and report them as text so
    the agent loop keeps running.
    """
    ctx, event_queue = make_ctx()

    client = AsyncMock()
    client.get = AsyncMock(side_effect=Exception("connection refused"))
    client.__aenter__ = AsyncMock(return_value=client)
    client.__aexit__ = AsyncMock(return_value=None)

    with patch("cli_textual.agents.orchestrators.httpx.AsyncClient", return_value=client):
        result = await web_fetch(ctx, url="https://unreachable.example")

    # Case-insensitive check; the previous extra clause `or "Error" in result`
    # was redundant with .lower().
    assert "error" in result.lower()
    events = await drain(event_queue)
    error_events = [e for e in events if isinstance(e, AgentToolOutput) and e.is_error]
    assert error_events
tests/unit/test_chat_ux.py CHANGED
@@ -1,13 +1,17 @@
1
  import pytest
 
 
2
  from textual.widgets import Markdown, Static, Label
3
  from cli_textual.app import ChatApp
4
  from cli_textual.core.dummy_agent import DummyAgent
5
  from cli_textual.core.chat_events import AgentThinking, AgentComplete
 
6
 
7
  @pytest.mark.asyncio
8
  async def test_chat_agent_loop():
9
  """Verify the full agent interaction loop: Thinking -> Tool -> Stream."""
10
  app = ChatApp()
 
11
  # Inject dummy agent for predictable testing
12
  app.agent = DummyAgent()
13
 
@@ -40,3 +44,49 @@ async def test_chat_agent_loop():
40
  await pilot.pause(2.0)
41
  assert len(app.query(".agent-spinner")) == 0
42
  assert "How can I help" in getattr(ai_msg, "_markdown", "")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import pytest
2
+ from pydantic_ai.models.function import FunctionModel, AgentInfo
3
+ from pydantic_ai.messages import ModelMessage
4
  from textual.widgets import Markdown, Static, Label
5
  from cli_textual.app import ChatApp
6
  from cli_textual.core.dummy_agent import DummyAgent
7
  from cli_textual.core.chat_events import AgentThinking, AgentComplete
8
+ from cli_textual.agents.orchestrators import manager_agent
9
 
10
  @pytest.mark.asyncio
11
  async def test_chat_agent_loop():
12
  """Verify the full agent interaction loop: Thinking -> Tool -> Stream."""
13
  app = ChatApp()
14
+ app.chat_mode = "dummy"
15
  # Inject dummy agent for predictable testing
16
  app.agent = DummyAgent()
17
 
 
44
  await pilot.pause(2.0)
45
  assert len(app.query(".agent-spinner")) == 0
46
  assert "How can I help" in getattr(ai_msg, "_markdown", "")
47
+
48
+
49
+ @pytest.mark.asyncio
50
+ async def test_manager_response_renders_in_tui():
51
+ """Verify that the manager pipeline response actually appears rendered in the UI.
52
+
53
+ This test specifically guards against the Markdown.update() await bug:
54
+ - Markdown._markdown (set synchronously) would be non-empty even without await
55
+ - But Markdown's child MarkdownBlock widgets are only created in the async part
56
+ - Without `await markdown_widget.update(...)`, children are never mounted
57
+ and the widget renders as a blank Blank object despite _markdown being set.
58
+ """
59
+ RESPONSE_TEXT = "Sentinel response from the deterministic test model."
60
+
61
+ async def fixed_response(messages: list[ModelMessage], agent_info: AgentInfo):
62
+ yield RESPONSE_TEXT
63
+
64
+ app = ChatApp()
65
+ app.chat_mode = "manager"
66
+
67
+ with manager_agent.override(model=FunctionModel(stream_function=fixed_response)):
68
+ async with app.run_test(size=(120, 40)) as pilot:
69
+ await pilot.press(*"hello", "enter")
70
+ await pilot.pause(2.0)
71
+
72
+ history = app.query_one("#history-container")
73
+ ai_widgets = list(history.query(".ai-msg"))
74
+ assert ai_widgets, "No .ai-msg widget found — response was never rendered"
75
+
76
+ md_widget = ai_widgets[-1]
77
+ assert isinstance(md_widget, Markdown), \
78
+ f"Expected Markdown widget, got {type(md_widget).__name__}"
79
+
80
+ # _markdown is set synchronously — this alone does NOT prove rendering worked
81
+ content = getattr(md_widget, "_markdown", "")
82
+ assert RESPONSE_TEXT in content, \
83
+ f"Response text missing from _markdown. Got: {repr(content)}"
84
+
85
+ # MarkdownBlock children are only created by the async part of update().
86
+ # If update() was not awaited, this list will be empty and the widget
87
+ # displays as blank despite _markdown being set.
88
+ child_blocks = list(md_widget.query("*"))
89
+ assert child_blocks, (
90
+ "Markdown widget has no rendered child blocks. "
91
+ "This means update() was called without await — the widget appears blank to the user."
92
+ )
tests/unit/test_manager_interaction.py ADDED
@@ -0,0 +1,141 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import asyncio
3
+ import pytest
4
+ from pydantic_ai.models.test import TestModel
5
+ from pydantic_ai.models.function import FunctionModel, AgentInfo, DeltaToolCall
6
+ from pydantic_ai.messages import ModelMessage, ModelRequest, ToolReturnPart
7
+
8
+ from cli_textual.agents.orchestrators import manager_agent, run_manager_pipeline
9
+ from cli_textual.core.chat_events import AgentRequiresUserInput, AgentStreamChunk, AgentComplete, AgentThinking
10
+
11
+
12
+ # ---------------------------------------------------------------------------
13
+ # Helpers
14
+ # ---------------------------------------------------------------------------
15
+
16
async def _collect_pipeline(pipeline, input_queue, auto_respond=None):
    """Run *pipeline* to completion, answering selection prompts when asked.

    Each AgentRequiresUserInput is answered with *auto_respond* (when given);
    every event is returned in emission order.
    """
    seen = []
    async for evt in pipeline:
        seen.append(evt)
        if auto_respond is not None and isinstance(evt, AgentRequiresUserInput):
            await input_queue.put(auto_respond)
    return seen
24
+
25
+
26
def _has_tool_return(messages: list[ModelMessage]) -> bool:
    """True once the history contains a ToolReturnPart, i.e. a tool already ran."""
    for msg in messages:
        if not isinstance(msg, ModelRequest):
            continue
        if any(isinstance(part, ToolReturnPart) for part in msg.parts):
            return True
    return False
31
+
32
+
33
+ # ---------------------------------------------------------------------------
34
+ # Stream functions for FunctionModel
35
+ # ---------------------------------------------------------------------------
36
+
37
async def text_only_stream(messages: list[ModelMessage], agent_info: AgentInfo):
    """Simulates an LLM that ignores the tool and just writes text."""
    # Single-chunk stream: the pipeline should still complete cleanly even
    # though ask_user_to_select is never invoked.
    yield "Once upon a time, there was a red sunset that painted the sky crimson."
40
+
41
+
42
async def select_then_text_stream(messages: list[ModelMessage], agent_info: AgentInfo):
    """Simulates an LLM that correctly calls ask_user_to_select first, then responds."""
    if _has_tool_return(messages):
        # Second call after the tool returned: write the story
        yield "Here is your story about the chosen color!"
    else:
        # First call: issue the tool call
        # FunctionModel's streaming protocol: a dict maps tool-call index to a
        # DeltaToolCall; index 0 is the first (and only) call in this response.
        yield {
            0: DeltaToolCall(
                name="ask_user_to_select",
                json_args=json.dumps({
                    "prompt": "Choose a primary color:",
                    "options": ["Red", "Blue", "Yellow"],
                }),
            )
        }
58
+
59
+
60
+ # ---------------------------------------------------------------------------
61
+ # Tests
62
+ # ---------------------------------------------------------------------------
63
+
64
@pytest.mark.asyncio
async def test_pipeline_plumbing_with_forced_tool_call():
    """Verify the pipeline infrastructure works when the tool IS called.

    Uses TestModel(call_tools=[...]) to force tool invocation — this tests
    the event-queue / input-queue bridge, not LLM prompt quality.
    """
    input_queue = asyncio.Queue()
    mock_model = TestModel(call_tools=["ask_user_to_select"])

    with manager_agent.override(model=mock_model):
        pipeline = run_manager_pipeline(
            "Tell me a story about a primary color but first let me select a color",
            input_queue,
        )
        # auto_respond keeps the pipeline from blocking on input_queue.get().
        events = await _collect_pipeline(pipeline, input_queue, auto_respond="Blue")

    assert any(isinstance(e, AgentRequiresUserInput) for e in events)
    req = next(e for e in events if isinstance(e, AgentRequiresUserInput))
    # The event carries the TUI command name ("/select"), not the agent-side
    # tool name (ask_user_to_select).
    assert req.tool_name == "/select"
    assert any(isinstance(e, AgentStreamChunk) for e in events)
    assert isinstance(events[-1], AgentComplete)
86
+
87
+
88
@pytest.mark.asyncio
async def test_pipeline_text_only_emits_no_user_input_event():
    """Document what happens when the LLM returns text without calling the tool.

    This test captures the BROKEN behavior: if the LLM ignores ask_user_to_select,
    no AgentRequiresUserInput event is emitted and the user never gets a choice.
    A passing test here means the pipeline handles this gracefully (no crash),
    but the user experience is wrong — the LLM should always call the tool.
    """
    input_queue = asyncio.Queue()
    text_only_model = FunctionModel(stream_function=text_only_stream)

    with manager_agent.override(model=text_only_model):
        pipeline = run_manager_pipeline(
            "Tell me a story about a primary color but first let me select a color",
            input_queue,
        )
        # No auto_respond: the model never asks, so nothing should block.
        events = await _collect_pipeline(pipeline, input_queue)

    # No selection event — the LLM skipped the tool
    assert not any(isinstance(e, AgentRequiresUserInput) for e in events)
    # But we still get text and a clean completion
    assert any(isinstance(e, AgentStreamChunk) for e in events)
    assert isinstance(events[-1], AgentComplete)
112
+
113
+
114
@pytest.mark.asyncio
async def test_pipeline_with_function_model_select_then_respond():
    """Verify the full selection flow using a FunctionModel that mimics correct LLM behavior.

    This tests the same pipeline path a real LLM takes when it respects the
    system prompt and calls ask_user_to_select before writing a response.
    """
    input_queue = asyncio.Queue()
    select_model = FunctionModel(stream_function=select_then_text_stream)

    with manager_agent.override(model=select_model):
        pipeline = run_manager_pipeline(
            "Tell me a story about a primary color but first let me select a color",
            input_queue,
        )
        # "Red" answers the selection prompt so the second model turn can run.
        events = await _collect_pipeline(pipeline, input_queue, auto_respond="Red")

    # Must get a selection event
    assert any(isinstance(e, AgentRequiresUserInput) for e in events), \
        "Expected AgentRequiresUserInput but LLM skipped the tool"

    req = next(e for e in events if isinstance(e, AgentRequiresUserInput))
    assert req.tool_name == "/select"
    assert len(req.options) > 0

    # Must get a text response after the selection
    assert any(isinstance(e, AgentStreamChunk) for e in events)
    assert isinstance(events[-1], AgentComplete)
tests/unit/test_pydantic_agents.py CHANGED
@@ -1,6 +1,8 @@
1
  import asyncio
2
  import pytest
3
- from cli_textual.agents.orchestrators import run_procedural_pipeline, run_manager_pipeline
 
 
4
  from cli_textual.core.chat_events import (
5
  AgentThinking, AgentToolStart, AgentToolEnd, AgentStreamChunk, AgentComplete,
6
  AgentRequiresUserInput
@@ -23,21 +25,17 @@ async def test_procedural_pipeline_flow():
23
  @pytest.mark.asyncio
24
  async def test_manager_pipeline_flow():
25
  """Verify that the manager pipeline initializes and completes."""
 
 
 
26
  events = []
27
  input_queue = asyncio.Queue()
28
-
29
- # We wrap this in a timeout to prevent hanging
30
- try:
31
  async with asyncio.timeout(5):
32
  pipeline = run_manager_pipeline("test prompt", input_queue)
33
  async for event in pipeline:
34
  events.append(event)
35
- # If the TestModel randomly decides to call a tool (like selection), unblock it
36
- if isinstance(event, AgentRequiresUserInput):
37
- await input_queue.put("mock selection")
38
- except asyncio.TimeoutError:
39
- pytest.fail("test_manager_pipeline_flow timed out - likely deadlocked on queue.get()")
40
-
41
- # Manager pipeline using TestModel should at least think and complete.
42
  assert any(isinstance(e, AgentThinking) for e in events)
43
  assert isinstance(events[-1], AgentComplete)
 
1
  import asyncio
2
  import pytest
3
+ from pydantic_ai.models.function import FunctionModel, AgentInfo
4
+ from pydantic_ai.messages import ModelMessage
5
+ from cli_textual.agents.orchestrators import run_procedural_pipeline, run_manager_pipeline, manager_agent
6
  from cli_textual.core.chat_events import (
7
  AgentThinking, AgentToolStart, AgentToolEnd, AgentStreamChunk, AgentComplete,
8
  AgentRequiresUserInput
 
25
  @pytest.mark.asyncio
26
  async def test_manager_pipeline_flow():
27
  """Verify that the manager pipeline initializes and completes."""
28
+ async def fixed_response(messages: list[ModelMessage], agent_info: AgentInfo):
29
+ yield "done"
30
+
31
  events = []
32
  input_queue = asyncio.Queue()
33
+
34
+ with manager_agent.override(model=FunctionModel(stream_function=fixed_response)):
 
35
  async with asyncio.timeout(5):
36
  pipeline = run_manager_pipeline("test prompt", input_queue)
37
  async for event in pipeline:
38
  events.append(event)
39
+
 
 
 
 
 
 
40
  assert any(isinstance(e, AgentThinking) for e in events)
41
  assert isinstance(events[-1], AgentComplete)
tests/unit/test_tools_command.py ADDED
@@ -0,0 +1,104 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+ from unittest.mock import patch, MagicMock
3
+ from pydantic_ai.models.test import TestModel
4
+ from textual.widgets import Label, OptionList, Static
5
+ from cli_textual.app import ChatApp
6
+ from cli_textual.plugins.commands.tools import ToolsWidget, ToolsCommand, _first_line
7
+ from cli_textual.agents.orchestrators import manager_agent
8
+
9
+
10
+ # ---------------------------------------------------------------------------
11
+ # _first_line helper
12
+ # ---------------------------------------------------------------------------
13
+
14
def test_first_line_returns_first_non_empty():
    """Leading blank lines are skipped; the first real line comes back stripped."""
    result = _first_line("\n\n Hello world\n more text")
    assert result == "Hello world"
16
+
17
def test_first_line_empty_string():
    """An empty input produces an empty result."""
    outcome = _first_line("")
    assert outcome == ""
19
+
20
def test_first_line_only_whitespace():
    """Input containing nothing but whitespace collapses to an empty string."""
    outcome = _first_line(" \n \n")
    assert outcome == ""
22
+
23
+
24
+ # ---------------------------------------------------------------------------
25
+ # ToolsWidget
26
+ # ---------------------------------------------------------------------------
27
+
28
@pytest.mark.asyncio
async def test_tools_widget_composes_option_list():
    """The widget's OptionList should list exactly one entry per agent tool."""
    chat_app = ChatApp()
    async with chat_app.run_test(size=(120, 40)) as pilot:
        panel = ToolsWidget()
        await chat_app.query_one("#history-container").mount(panel)
        await pilot.pause(0.1)

        options = panel.query_one("#tools-option-list", OptionList)
        assert options is not None

        # NOTE(review): reaches into pydantic-ai private state (_function_toolset);
        # confirm no public accessor exists before relying on this long-term.
        expected = len(manager_agent._function_toolset.tools)
        assert options.option_count == expected
43
+
44
+
45
@pytest.mark.asyncio
async def test_tools_widget_shows_detail_on_selection():
    """Choosing a tool swaps the option list for the detail view."""
    chat_app = ChatApp()
    async with chat_app.run_test(size=(120, 40)) as pilot:
        panel = ToolsWidget()
        await chat_app.query_one("#history-container").mount(panel)
        await pilot.pause(0.1)

        panel.query_one("#tools-option-list", OptionList).focus()
        await pilot.pause(0.05)

        # Confirm the highlighted (first) entry.
        await pilot.press("enter")
        await pilot.pause(0.2)

        # The option list should be gone, replaced by the detail pane.
        assert not panel.query("#tools-option-list")
        assert panel.query(".tool-detail")
66
+
67
+
68
@pytest.mark.asyncio
async def test_tools_widget_detail_contains_tool_name():
    """The detail view's Label should mention the selected tool's name."""
    # First registered tool is also the first (highlighted) OptionList entry.
    expected_name = next(iter(manager_agent._function_toolset.tools))

    chat_app = ChatApp()
    async with chat_app.run_test(size=(120, 40)) as pilot:
        panel = ToolsWidget()
        await chat_app.query_one("#history-container").mount(panel)
        await pilot.pause(0.1)

        panel.query_one("#tools-option-list", OptionList).focus()
        await pilot.pause(0.05)
        await pilot.press("enter")
        await pilot.pause(0.2)

        rendered = [str(lbl.render()) for lbl in panel.query(Label)]
        assert any(expected_name in text for text in rendered)
88
+
89
+
90
+ # ---------------------------------------------------------------------------
91
+ # ToolsCommand integration with ChatApp
92
+ # ---------------------------------------------------------------------------
93
+
94
@pytest.mark.asyncio
async def test_tools_command_mounts_widget():
    """Typing /tools should reveal #interaction-container with a ToolsWidget."""
    chat_app = ChatApp()
    async with chat_app.run_test(size=(120, 40)) as pilot:
        await pilot.press(*"/tools", "enter")
        await pilot.pause(0.3)

        target = chat_app.query_one("#interaction-container")
        assert "visible" in target.classes
        assert target.query(ToolsWidget)