"""Shared Gradio UI helpers for app.py and smolbuilder.py (CLI parity)."""
from __future__ import annotations
import asyncio
import os
from dataclasses import dataclass, field
import gradio as gr
from .rust_session import (
RustSession,
expand_command,
expand_skill,
export_transcript,
list_background_jobs,
list_commands,
list_mcp,
list_rules,
list_skills,
render_config,
session_timeline,
write_agents_md,
)
@dataclass
class UiSettings:
workspace: str = "."
model: str = ""
agent: str = "build"
mode: str = "normal" # normal | auto | plan
think: str = "off"
yolo: bool = False
fan_out: bool = False
@dataclass
class ApprovalState:
pending_desc: str | None = None
result: bool | None = None
async def ask(self, desc: str) -> bool:
self.pending_desc = desc
self.result = None
while self.result is None:
await asyncio.sleep(0.15)
approved = bool(self.result)
self.pending_desc = None
self.result = None
return approved
def approve(self, yes: bool = True) -> None:
self.result = yes
def deny(self) -> None:
self.approve(False)
@dataclass
class AppSessionState:
"""Gradio gr.State payload for session + settings."""
rust: RustSession | None = None
settings: UiSettings = field(default_factory=UiSettings)
approval: ApprovalState = field(default_factory=ApprovalState)
status_msg: str = ""
bg_jobs: str = ""
@dataclass
class SlashResult:
reply: str = ""
queued_task: str | None = None
clear_chat: bool = False
download_path: str | None = None
toggle_sidebar: bool = False
toggle_sidebar_view: bool = False
open_picker: str | None = None
cycle_mode: bool = False
cycle_think: bool = False
set_think: str | None = None
show_help: bool = False
show_whichkey: bool = False
_BUILTIN_SLASH = {
"/help", "/new", "/sessions", "/fork", "/rename", "/export", "/stats",
"/mcp", "/rules", "/skills", "/skill", "/commit", "/init", "/bg", "/clear",
"/delete", "/timeline", "/mode", "/think", "/config", "/search",
"/agents", "/models", "/themes", "/files", "/quit",
}
_ATTACH_MAX = 8192
def parse_input(
text: str,
*,
workspace_files: list[str] | None = None,
workspace: str | None = None,
rust: RustSession | None = None,
) -> tuple[str, str | None, str | None]:
"""Parse user input. Returns (task, slash_command_result, shell_output).
- `!cmd` runs shell directly
- `/cmd args` returns command to dispatch
- `@file` inlines file content into task
"""
stripped = (text or "").strip()
if not stripped:
return "", None, None
if stripped.startswith("!"):
return "", None, stripped[1:].strip()
if stripped.startswith("/"):
return "", stripped, None
task = stripped
if "@" in task and (workspace_files or workspace):
from .rust_session import read_workspace_file
paths = list(workspace_files or [])
import re
for match in re.finditer(r"@(\S+)", task):
path = match.group(1)
if paths and path not in paths:
candidates = [p for p in paths if p.endswith(path) or p == path]
if len(candidates) == 1:
path = candidates[0]
elif path not in paths:
continue
ws = workspace or (rust.workspace_path if rust else ".")
content = read_workspace_file(ws, path, max_bytes=_ATTACH_MAX, rust=rust)
if content is not None:
block = f"[attached: {path}]\n```\n{content}\n```"
task = task.replace(f"@{match.group(1)}", block, 1)
return task, None, None
def _workspace(session: AppSessionState) -> str:
return session.settings.workspace or "."
def dispatch_slash(cmd_line: str, session: AppSessionState) -> SlashResult:
"""Handle a slash command; mirrors CLI TUI handle_slash."""
parts = cmd_line.strip().split(maxsplit=1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
ws = _workspace(session)
if cmd == "/help":
custom = list_commands(ws)
extra = ""
if custom:
extra = "\n\n**Custom commands:** " + ", ".join(f"`/{n}`" for n in custom)
return SlashResult(
reply=(
"**Slash commands:** `/new`, `/sessions`, `/fork`, `/rename
`, "
"`/stats`, `/export [file]`, `/timeline`, `/delete`, `/mcp`, `/rules`, "
"`/skills`, `/skill `, `/commit [msg]`, `/init`, `/bg`, `/clear`, "
"`/mode`, `/think`, `/config`, `/search`, `/files`"
f"{extra}\n\n"
"**Input:** `!cmd` runs shell without LLM; `@file` attaches workspace files."
)
)
if cmd == "/new":
session.rust = None
return SlashResult(reply="Started a new session.", clear_chat=True)
if cmd == "/sessions":
rows = RustSession.list_sessions()
if not rows:
return SlashResult(reply="_No saved sessions._")
lines = [f"- **{r['title']}** (`{r['id']}`)" for r in rows[:20]]
return SlashResult(reply="**Sessions:**\n" + "\n".join(lines))
if cmd == "/fork":
if session.rust and (nid := session.rust.fork()):
return SlashResult(reply=f"Forked session → `{nid}`")
return SlashResult(reply="Nothing to fork yet.")
if cmd == "/rename":
if session.rust and args and session.rust.rename(args):
return SlashResult(reply=f"Renamed session to **{args}**")
return SlashResult(reply="Usage: `/rename `")
if cmd == "/stats":
nfiles = len(session.rust.files()) if session.rust else 0
sid = session.rust.session_id if session.rust else "(none)"
return SlashResult(
reply=(
f"session `{sid}` · workspace: `{ws}` · files: {nfiles} · "
f"agent: {session.settings.agent}"
)
)
if cmd == "/export":
sid = session.rust.session_id if session.rust else ""
if not sid:
return SlashResult(reply="No session to export yet.")
try:
path = export_transcript(sid, args or None)
return SlashResult(
reply=f"Exported transcript to `{path}`",
download_path=path,
)
except Exception as e:
return SlashResult(reply=f"/export failed: {e}")
if cmd == "/mcp":
if session.rust is None:
return SlashResult(
reply="_Start a task first so MCP servers are connected._"
)
servers = list_mcp(session.rust)
if not servers:
return SlashResult(
reply=(
"no MCP servers connected — add `[[mcp]]` entries to "
"`~/.config/smolcode/config.toml` or `.smolcode/config.toml`"
)
)
lines = [f"**MCP servers ({len(servers)}):**"]
for row in servers:
tools = row.get("tools", [])
tlist = ", ".join(tools[:8]) if tools else "(no tools)"
if len(tools) > 8:
tlist += "…"
lines.append(f"- **{row.get('server', '?')}** ({len(tools)}): {tlist}")
return SlashResult(reply="\n".join(lines))
if cmd == "/rules":
rules = list_rules(ws)
if not rules:
return SlashResult(
reply="no rules — add `*.md` to `.smolcode/rules/` or `~/.config/smolcode/rules/`"
)
lines = [f"**active rules ({len(rules)}):**"]
for r in rules:
desc = r.get("description", "")
tail = f" — {desc}" if desc else ""
lines.append(f"- `{r.get('name', '?')}` [{r.get('scope', '?')}]{tail}")
return SlashResult(reply="\n".join(lines))
if cmd == "/skills":
skills = list_skills(ws)
if not skills:
return SlashResult(
reply="no skills — add `/SKILL.md` to `.smolcode/skills/`"
)
lines = [f"**skills ({len(skills)})** — run with `/skill `:"]
for s in skills:
desc = s.get("description", "")
tail = f" — {desc}" if desc else ""
lines.append(f"- `{s.get('name', '?')}`{tail}")
return SlashResult(reply="\n".join(lines))
if cmd == "/skill":
if not args:
return SlashResult(reply="Usage: `/skill [args]` (see `/skills`)")
sname, _, sargs = args.partition(" ")
sname = sname.strip()
sargs = sargs.strip()
expanded = expand_skill(ws, sname, sargs)
if expanded is None:
return SlashResult(reply=f"no skill named `{sname}` (see `/skills`)")
return SlashResult(reply=f"Running skill **{sname}**…", queued_task=expanded)
if cmd == "/commit":
if args:
task = f"Commit all current changes with git_commit using this message: {args}"
else:
task = (
"Review the staged/unstaged changes with git_diff, then commit them "
"with git_commit using a concise, descriptive message."
)
return SlashResult(reply="Queued git commit task…", queued_task=task)
if cmd == "/init":
try:
path = write_agents_md(ws)
return SlashResult(reply=f"wrote `{path}` (project guide for agents)")
except Exception as e:
return SlashResult(reply=f"/init: {e}")
if cmd == "/bg":
session.bg_jobs = list_background_jobs()
return SlashResult(reply=session.bg_jobs or "_No background jobs._")
if cmd == "/timeline":
sid = session.rust.session_id if session.rust else ""
if not sid:
return SlashResult(reply="no saved session yet")
lines = session_timeline(sid)
return SlashResult(reply="**Timeline:**\n" + "\n".join(f"- {ln}" for ln in lines))
if cmd == "/delete":
removed = session.rust.delete() if session.rust else False
session.rust = None
msg = "deleted session; started a new one" if removed else "started a new session"
return SlashResult(reply=msg, clear_chat=True)
if cmd == "/clear":
return SlashResult(reply="_Transcript cleared._", clear_chat=True)
if cmd == "/mode":
return SlashResult(reply="Cycling mode…", cycle_mode=True)
if cmd == "/think":
if args:
return SlashResult(reply=f"think → {args}", set_think=args.split()[0].lower())
return SlashResult(reply="Cycling think level…", cycle_think=True)
if cmd == "/config":
if session.rust is None:
return SlashResult(reply="_Start a task first to view config._")
return SlashResult(reply=f"```\n{render_config(session.rust)}\n```")
if cmd == "/search":
if not args:
return SlashResult(reply="Usage: `/search `")
return SlashResult(reply=f"_Search for `{args}` runs in transcript handler._")
if cmd == "/agents":
return SlashResult(reply="Opening agent picker…", open_picker="agents")
if cmd == "/models":
return SlashResult(reply="Opening model picker…", open_picker="models")
if cmd == "/themes":
return SlashResult(reply="Opening theme picker…", open_picker="themes")
if cmd == "/files":
return SlashResult(reply="Toggling sidebar…", toggle_sidebar=True)
if cmd == "/quit":
return SlashResult(reply="_Use browser close to exit the web UI._")
if cmd not in _BUILTIN_SLASH:
name = cmd.lstrip("/")
expanded = expand_command(ws, name, args)
if expanded is not None:
return SlashResult(
reply=f"Running custom command `/{name}`…",
queued_task=expanded,
)
return SlashResult(reply=f"Unknown command `{cmd}`. Try `/help`.")
def settings_from_ui(
workspace: str,
model: str,
agent: str,
mode: str,
think: str,
yolo: bool,
) -> UiSettings:
y = yolo or mode == "auto"
ag = "plan" if mode == "plan" else agent
return UiSettings(
workspace=workspace or ".",
model=model or "",
agent=ag,
mode=mode,
think=think,
yolo=y,
)
def build_settings_panel(preset_models: list[str]) -> dict:
"""Return Gradio components for the settings sidebar."""
with gr.Accordion("⚙️ settings", open=False):
workspace = gr.Textbox(
value=os.environ.get("SMALLCODE_WORKSPACE", "."),
label="workspace directory",
)
model = gr.Dropdown(
choices=preset_models,
value=preset_models[0] if preset_models else "",
label="model",
allow_custom_value=True,
)
agent = gr.Dropdown(
choices=["build", "plan"],
value="build",
label="agent",
)
mode = gr.Radio(
choices=["normal", "auto", "plan"],
value="normal",
label="mode",
)
think = gr.Dropdown(
choices=["off", "low", "high", "xtra"],
value="off",
label="think level",
)
yolo = gr.Checkbox(value=False, label="yolo (auto-approve tools)")
return {
"workspace": workspace,
"model": model,
"agent": agent,
"mode": mode,
"think": think,
"yolo": yolo,
}
def file_tree_md(files: dict[str, str], selected: str | None = None) -> str:
"""Legacy flat file list (prefer engine.file_tree.build_workspace_panel)."""
if not files:
return "_workspace is empty_"
lines = []
for path in sorted(files):
mark = " →" if path == selected else ""
lines.append(f"- `{path}`{mark}")
body = files.get(selected or "", "") if selected and selected in files else ""
if body:
lang = "python" if selected.endswith(".py") else ""
return "\n".join(lines) + f"\n\n**`{selected}`**\n```{lang}\n{body}\n```"
return "\n".join(lines)