James Lindsay
feat: customization hooks for ChatApp subclasses (#12)
c4ba2e7 unverified
import asyncio
import os
import time
import uuid
from pathlib import Path
from typing import AsyncGenerator, Callable, List, Optional
from textual import on, events
from textual.app import App, ComposeResult
from textual.containers import Container, VerticalScroll, Horizontal
from textual.widgets import (
Header, Footer, Static, Markdown, Label, OptionList,
TabbedContent, DirectoryTree, DataTable, Collapsible
)
from textual.widgets.option_list import Option
from textual.binding import Binding
# Core Framework Imports
from cli_textual.core.fs import FSManager
from cli_textual.core.permissions import PermissionManager
from cli_textual.core.command import CommandManager
from cli_textual.core.chat_events import (
ChatEvent, AgentThinking, AgentToolStart, AgentToolEnd, AgentToolOutput,
AgentStreamChunk, AgentComplete, AgentRequiresUserInput, AgentExecuteCommand,
AgentThinkingChunk, AgentThinkingComplete,
)
# Pydantic AI Orchestrators
from cli_textual.agents.manager import run_manager_pipeline
from cli_textual.agents.observability import init_observability, is_tracing_enabled
from cli_textual.core.conversation_log import ConversationLogger, default_log_path
# UI Component Imports
from cli_textual.ui.widgets.growing_text_area import GrowingTextArea
from cli_textual.ui.widgets.dna_spinner import DNASpinner
from cli_textual.ui.screens.permission_screen import PermissionScreen
from cli_textual.ui.widgets.landing_page import LandingPage
class ChatApp(App):
"""Refactored ChatApp using modular architecture."""
# Absolute path so subclasses defined in other packages inherit the
# correct stylesheet location. Textual resolves a relative ``CSS_PATH``
# against the subclass's own module file, which would break any
# third-party ``ChatApp`` subclass.
CSS_PATH = str(Path(__file__).parent / "app.tcss")
BINDINGS = [
Binding("ctrl+c", "quit", "Quit", show=False),
Binding("ctrl+d", "double_ctrl_d", "Exit", show=True, priority=True),
Binding("escape", "cancel_interaction", "Cancel", show=False),
Binding("ctrl+n", "next_tab", "Next Tab", show=False, priority=True),
Binding("ctrl+p", "prev_tab", "Prev Tab", show=False, priority=True),
]
# Landing-page widget class mounted by ``on_mount``. Subclasses can
# override this class attribute to swap in their own branded widget
# without having to override ``on_mount`` (which Textual dispatches
# to every class in the MRO, so an override alone would leave the
# default landing page mounted alongside the custom one).
LANDING_WIDGET_CLS: type = LandingPage
def __init__(
self,
tools: Optional[List[Callable]] = None,
command_packages: Optional[List[str]] = None,
command_filter: Optional[Callable[[str], bool]] = None,
model: Optional[str] = None,
safe_mode: Optional[bool] = None,
log: bool = False,
log_path: Optional[Path] = None,
system_prompt: Optional[str] = None,
system_prompt_append: Optional[str] = None,
**kwargs,
):
# Apply library overrides BEFORE the manager agent is first built.
if model is not None:
from cli_textual.agents.model import set_model
set_model(model)
import cli_textual.agents.manager as _mgr
if safe_mode is not None:
_mgr.SAFE_MODE = safe_mode
if system_prompt is not None:
_mgr.SYSTEM_PROMPT_OVERRIDE = system_prompt
if system_prompt_append is not None:
_mgr.SYSTEM_PROMPT_APPEND = system_prompt_append
if tools:
from cli_textual.tools.registry import register_tool
for t in tools:
register_tool(t)
if (
model is not None
or safe_mode is not None
or tools
or system_prompt is not None
or system_prompt_append is not None
):
from cli_textual.agents.manager import _reset_agent
_reset_agent()
super().__init__(**kwargs)
init_observability()
self.session_id = str(uuid.uuid4())
self.last_ctrl_d_time = 0
self.survey_answers = {}
# Allow setting default mode via environment variable
self.chat_mode = os.getenv("CHAT_MODE", "manager")
self.message_history = [] # For LLM context memory
self.interactive_input_queue = asyncio.Queue()
self.verbose_mode = False
self._agent_waiting_for_input = False
# Optional append-only JSONL conversation log for debugging.
self.conversation_log: Optional[ConversationLogger] = None
if log or log_path is not None:
target = Path(log_path) if log_path is not None else default_log_path(self.session_id)
self.conversation_log = ConversationLogger(target, self.session_id)
# Initialize Core Managers
self.workspace_root = Path.cwd().resolve()
self.fs_manager = FSManager(self.workspace_root)
self.permission_manager = PermissionManager(self.workspace_root / ".agents" / "settings.json")
self.command_manager = CommandManager()
# Register Commands via Auto-Discovery
self.command_manager.auto_discover("cli_textual.plugins.commands")
for pkg in command_packages or []:
self.command_manager.auto_discover(pkg)
# Optional post-discovery filter. Lets callers restrict the active
# command set (for example, to a read-only allowlist in a sandboxed
# deployment) without having to mutate ``command_manager.commands``
# after the fact.
if command_filter is not None:
self.command_manager.commands = {
name: cmd
for name, cmd in self.command_manager.commands.items()
if command_filter(name)
}
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield VerticalScroll(id="history-container")
yield Container(id="interaction-container")
with Container(id="bottom-dock"):
with Horizontal(id="input-container"):
yield Label("> ", id="prompt"); yield GrowingTextArea(id="main-input")
yield OptionList(id="autocomplete-list")
with Horizontal(id="status-bar"):
yield Label("workspace (/directory)", classes="status-info")
yield Label(f"mode: {self.chat_mode}", classes="status-info mode-info")
from cli_textual.agents.model import get_model
model_name = getattr(get_model(), "model_name", "test-mock")
yield Label(f"model: {model_name}", classes="status-info model-info")
trace_label = "[green]● langfuse[/]" if is_tracing_enabled() else "[dim]○ langfuse[/]"
yield Label(trace_label, classes="status-info")
yield Label(str(self.workspace_root), classes="path-info")
yield Footer()
def on_mount(self) -> None:
self.query_one("#main-input").focus()
history = self.query_one("#history-container")
history.mount(self.LANDING_WIDGET_CLS())
@on(OptionList.OptionSelected, "#mode-select-list")
def handle_mode_selection(self, event: OptionList.OptionSelected) -> None:
"""Handle user selecting a mode from the /mode command list."""
selection = str(event.option.prompt)
self.chat_mode = selection
# Clear UI
interaction = self.query_one("#interaction-container")
interaction.remove_class("visible")
interaction.query("*").remove()
# Feedback
self.add_to_history(f"Chat mode set to: **{self.chat_mode}**")
self.query_one(".mode-info").update(f"mode: {self.chat_mode}")
self.query_one("#main-input").focus()
@on(OptionList.OptionSelected, "#agent-select-tool")
def handle_agent_selection(self, event: OptionList.OptionSelected) -> None:
"""Handle user making a choice requested by an agent tool."""
selection = str(event.option.prompt)
self._agent_waiting_for_input = False
# Clear the interaction area
interaction = self.query_one("#interaction-container")
interaction.remove_class("visible")
interaction.query("*").remove()
# Log choice to history
self.add_to_history(f"Selected: **{selection}**", is_user=True)
# Resume the agent by pushing the selection into the queue
if hasattr(self, "interactive_input_queue"):
# Drain any stale entries (safety measure)
while not self.interactive_input_queue.empty():
self.interactive_input_queue.get_nowait()
self.interactive_input_queue.put_nowait(selection)
# Refocus main input
self.query_one("#main-input").focus()
@on(GrowingTextArea.Changed)
def handle_changes(self, event: GrowingTextArea.Changed) -> None:
event.text_area.styles.height = min(max(1, event.text_area.document.line_count), 10)
self.update_autocomplete(event.text_area.text)
def update_autocomplete(self, text: str) -> None:
autocomplete = self.query_one("#autocomplete-list", OptionList)
if text.startswith("/"):
filtered = [(c.name, c.description) for c in self.command_manager.commands.values() if c.name.startswith(text)]
if filtered:
autocomplete.clear_options()
for name, desc in filtered: autocomplete.add_option(Option(f"{name.ljust(15)} {desc}"))
autocomplete.highlighted = 0; autocomplete.set_class(True, "visible"); return
autocomplete.set_class(False, "visible")
def add_to_history(self, text: str, is_user: bool = False):
history = self.query_one("#history-container")
if is_user: history.mount(Static(f"> {text}", classes="user-msg"))
else: history.mount(Markdown(text, classes="ai-msg"))
history.scroll_end(animate=False)
@on(GrowingTextArea.Submitted)
async def handle_submission(self, event: GrowingTextArea.Submitted) -> None:
user_input = event.text
self.add_to_history(user_input, is_user=True)
if self.conversation_log is not None:
if user_input.startswith("/"):
parts = user_input.split()
self.conversation_log.log_user_command(parts[0], parts[1:])
else:
self.conversation_log.log_user_message(user_input)
if user_input.startswith("/"):
await self.process_command(user_input)
else:
generator = run_manager_pipeline(user_input, self.interactive_input_queue, message_history=self.message_history, session_id=self.session_id)
self.run_worker(self.stream_agent_response(generator))
self.query_one("#main-input").focus()
async def stream_agent_response(self, generator: AsyncGenerator[ChatEvent, None]):
"""Worker to consume events from the agent and update the UI."""
history = self.query_one("#history-container")
# Create a container for the agent's progress
task_label = Label("Thinking...", id="task-label")
progress = Horizontal(
DNASpinner(),
task_label,
classes="agent-spinner",
id="agent-progress"
)
progress.styles.height = 1
progress.styles.margin = (1, 0)
history.mount(progress)
history.scroll_end(animate=False)
markdown_widget = None
full_text = ""
thinking_collapsible = None
thinking_widget = None
thinking_text = ""
async for event in generator:
if self.conversation_log is not None:
self.conversation_log.log_event(event)
if isinstance(event, AgentThinkingChunk):
if not thinking_collapsible:
thinking_collapsible = Collapsible(
Static("", classes="thinking-content"),
title="Reasoning",
collapsed=not self.verbose_mode,
classes="thinking-block",
)
await history.mount(thinking_collapsible)
thinking_widget = thinking_collapsible.query_one(".thinking-content")
thinking_text += event.text
thinking_widget.update(thinking_text)
history.scroll_end(animate=False)
elif isinstance(event, AgentThinkingComplete):
if thinking_widget:
thinking_widget.update(event.full_text)
elif isinstance(event, AgentThinking):
task_label.update(event.message)
elif isinstance(event, AgentRequiresUserInput):
# Pause and show the interaction UI
self._agent_waiting_for_input = True
interaction = self.query_one("#interaction-container")
interaction.add_class("visible")
interaction.query("*").remove()
interaction.mount(Label(event.prompt))
if event.tool_name == "/select":
options = OptionList(*event.options, id="agent-select-tool")
interaction.mount(options)
self.call_after_refresh(options.focus)
history.scroll_end(animate=False)
elif isinstance(event, AgentExecuteCommand):
# Proactively execute a TUI command
full_cmd = event.command_name
if event.args:
full_cmd += " " + " ".join(event.args)
await self.process_command(full_cmd)
elif isinstance(event, AgentToolStart):
task_label.update(f"Running tool: [bold cyan]{event.tool_name}[/]")
elif isinstance(event, AgentToolOutput):
# Render tool output as Markdown inside a collapsed-by-default
# Collapsible so the chat stays scannable. Tool results are
# often long (tables, file contents, JSON) and the user can
# expand them on demand. Errors stay expanded so failures are
# visible without a click.
style_class = "tool-output-error" if event.is_error else "tool-output"
status = "error" if event.is_error else "ok"
title = f"{event.tool_name}{status}"
coll = Collapsible(
Markdown(event.content or "", classes=style_class),
title=title,
collapsed=not event.is_error,
classes="tool-output-block",
)
await history.mount(coll)
history.scroll_end(animate=False)
elif isinstance(event, AgentToolEnd):
task_label.update(f"Tool complete: [bold green]{event.tool_name}[/]")
elif isinstance(event, AgentStreamChunk):
# If we're starting to stream, remove the spinner and create the Markdown widget
if not markdown_widget:
await progress.remove()
markdown_widget = Markdown("", classes="ai-msg")
await history.mount(markdown_widget)
full_text += event.text
await markdown_widget.update(full_text)
history.scroll_end(animate=False)
elif isinstance(event, AgentComplete):
# Save new history for context memory
if event.new_history:
self.message_history.extend(event.new_history)
# If we never got a stream (e.g. only tool calls), remove progress
if "agent-progress" in [c.id for c in history.children]:
await progress.remove()
history.scroll_end(animate=False)
async def process_command(self, cmd_str: str):
parts = cmd_str.split()
name = parts[0].lower()
args = parts[1:]
cmd = self.command_manager.get_command(name)
if not cmd:
self.add_to_history(f"Unknown command: {name}")
return
if cmd.requires_permission and not self.permission_manager.is_tool_approved(name):
self.push_screen(PermissionScreen(name), lambda approved: self.handle_command_auth(approved, cmd, args))
else:
self.run_worker(cmd.execute(self, args))
async def handle_command_auth(self, approved: bool, cmd, args):
if approved:
self.permission_manager.approve_tool(cmd.name)
self.run_worker(cmd.execute(self, args))
else:
self.add_to_history(f"Permission denied for {cmd.name}.")
@on(DataTable.RowSelected)
def handle_row_selected(self, event: DataTable.RowSelected):
if event.row_key is not None:
path = Path(str(event.row_key))
if path.is_file():
cmd = self.command_manager.get_command("/head")
if cmd: self.run_worker(cmd.execute(self, [str(path), "20"]))
@on(DirectoryTree.FileSelected)
def handle_file_selected(self, event: DirectoryTree.FileSelected):
cmd = self.command_manager.get_command("/head")
if cmd: self.run_worker(cmd.execute(self, [str(event.path), "20"]))
@on(OptionList.OptionSelected)
def handle_selection(self, event: OptionList.OptionSelected):
if event.option_list.id == "autocomplete-list": return
list_id = event.option_list.id; choice = str(event.option.prompt)
if list_id and list_id.startswith("opt-q"):
self.survey_answers[list_id] = choice
tabs = self.query_one("#survey-tabs", TabbedContent)
if list_id == "opt-q1": tabs.active = "q2"; self.call_after_refresh(lambda: self.query_one("#opt-q2", OptionList).focus())
else: self.add_to_history("Survey complete."); self.cancel_interaction()
else:
self.add_to_history(f"Selected: **{choice}**")
self.cancel_interaction()
@on(events.DescendantBlur)
def handle_descendant_blur(self, event: events.DescendantBlur):
if "visible" in self.query_one("#interaction-container").classes: self.set_timer(0.1, self.check_focus_loss)
def check_focus_loss(self):
try:
if self._agent_waiting_for_input:
return
container = self.query_one("#interaction-container")
if "visible" in container.classes and not any(w.has_focus for w in container.query("*")):
self.cancel_interaction()
except: pass
def action_cancel_interaction(self):
if self._agent_waiting_for_input:
return
if "visible" in self.query_one("#interaction-container").classes:
self.cancel_interaction()
def cancel_interaction(self):
container = self.query_one("#interaction-container")
container.remove_class("visible"); container.query("*").remove()
self.query_one("#main-input").focus()
def on_key(self, event: events.Key) -> None:
if event.key == "tab":
try:
tabs = self.query_one("#survey-tabs", TabbedContent)
if tabs.visible: event.prevent_default(); self.action_next_tab()
except: pass
def action_next_tab(self):
try:
tabs = self.query_one("#survey-tabs", TabbedContent)
tabs.active = "q2" if tabs.active == "q1" else "q1"
self.query_one(f"#opt-{tabs.active}", OptionList).focus()
except: pass
def action_prev_tab(self): self.action_next_tab()
def action_double_ctrl_d(self):
if time.time() - self.last_ctrl_d_time < 1.0: self.exit()
else: self.last_ctrl_d_time = time.time(); self.notify("Press Ctrl+D again to exit", timeout=1)
def on_unmount(self) -> None:
if self.conversation_log is not None:
self.conversation_log.close()
def main():
import argparse
parser = argparse.ArgumentParser(prog="demo-cli", description="cli-textual chat TUI")
parser.add_argument(
"--log",
nargs="?",
const="",
default=None,
metavar="PATH",
help=(
"Record the full conversation (user input, LLM events, tool calls) "
"to a JSONL file. With no argument, writes to "
"~/.cli-textual/convos/<utc-timestamp>-<sid>.jsonl. Pass an explicit "
"path to override."
),
)
args = parser.parse_args()
log_enabled = args.log is not None
log_path = Path(args.log) if args.log else None
ChatApp(log=log_enabled, log_path=log_path).run()
if __name__ == "__main__":
main()