Spaces:
Paused
Paused
| """Shared Gradio UI helpers for app.py and smolbuilder.py (CLI parity).""" | |
| from __future__ import annotations | |
| import asyncio | |
| import os | |
| from dataclasses import dataclass, field | |
| import gradio as gr | |
| from .rust_session import ( | |
| RustSession, | |
| expand_command, | |
| expand_skill, | |
| export_transcript, | |
| list_background_jobs, | |
| list_commands, | |
| list_mcp, | |
| list_rules, | |
| list_skills, | |
| render_config, | |
| session_timeline, | |
| write_agents_md, | |
| ) | |
@dataclass
class UiSettings:
    """Settings collected from the UI controls for one agent run.

    Declared as a dataclass so it can be built with keyword arguments
    (as ``settings_from_ui`` does) and used as a ``default_factory``.
    """

    # Workspace directory the session operates in.
    workspace: str = "."
    # Model identifier; empty string means none was chosen in the UI.
    model: str = ""
    # Agent name, e.g. "build" or "plan".
    agent: str = "build"
    # Run mode: normal | auto | plan
    mode: str = "normal"
    # Think level label, e.g. "off".
    think: str = "off"
    # Auto-approve tool calls.
    yolo: bool = False
    # NOTE(review): not set anywhere in the visible code — confirm semantics
    # with the caller that consumes it.
    fan_out: bool = False
class ApprovalState:
    """Hand-off point between a coroutine awaiting approval and the UI.

    ``ask`` publishes a description and polls until ``approve`` or ``deny``
    records an answer.
    """

    # Description of the action awaiting a decision; None when idle.
    pending_desc: str | None = None
    # None = undecided, True = approved, False = denied.
    result: bool | None = None

    async def ask(self, desc: str) -> bool:
        """Publish *desc*, wait for a decision, and return it as a bool.

        Both fields are reset to ``None`` before returning.
        """
        self.pending_desc, self.result = desc, None
        while True:
            if self.result is not None:
                break
            # Poll rather than block so the event loop keeps running.
            await asyncio.sleep(0.15)
        verdict = bool(self.result)
        self.pending_desc = self.result = None
        return verdict

    def approve(self, yes: bool = True) -> None:
        """Record the decision; ``yes=False`` records a denial."""
        self.result = yes

    def deny(self) -> None:
        """Record a denial."""
        self.approve(False)
@dataclass
class AppSessionState:
    """Gradio gr.State payload for session + settings.

    Must be a dataclass: the ``field(default_factory=...)`` defaults below
    are only turned into per-instance objects by the ``@dataclass``
    machinery; without it they would stay raw ``Field`` placeholders.
    """

    # Active backend session; None until one exists (and after /new, /delete).
    rust: RustSession | None = None
    # Fresh settings object per state instance.
    settings: UiSettings = field(default_factory=UiSettings)
    # Fresh approval hand-off object per state instance.
    approval: ApprovalState = field(default_factory=ApprovalState)
    # Free-form status text.
    status_msg: str = ""
    # Last background-jobs listing (written by the /bg command).
    bg_jobs: str = ""
@dataclass
class SlashResult:
    """Outcome of a slash command, expressed as a reply plus UI requests.

    Declared as a dataclass because ``dispatch_slash`` builds instances
    with keyword arguments; every field defaults to "no action".
    """

    # Markdown text to show to the user.
    reply: str = ""
    # Task text to hand to the agent after the reply, if any.
    queued_task: str | None = None
    # Request that the chat transcript be cleared.
    clear_chat: bool = False
    # Path of a file to offer for download, if any.
    download_path: str | None = None
    # Request that the sidebar be toggled.
    toggle_sidebar: bool = False
    # NOTE(review): not set by any command in the visible code — confirm use.
    toggle_sidebar_view: bool = False
    # Name of a picker to open ("agents", "models", "themes"), if any.
    open_picker: str | None = None
    # Request to advance to the next mode.
    cycle_mode: bool = False
    # Request to advance to the next think level.
    cycle_think: bool = False
    # Explicit think level to apply, if any.
    set_think: str | None = None
    # NOTE(review): the two flags below are not set in the visible code.
    show_help: bool = False
    show_whichkey: bool = False
# Slash commands with a dedicated handler in dispatch_slash. Any other
# `/name` is tried as a custom command instead.
_BUILTIN_SLASH = {
    "/agents",
    "/bg",
    "/clear",
    "/commit",
    "/config",
    "/delete",
    "/export",
    "/files",
    "/fork",
    "/help",
    "/init",
    "/mcp",
    "/mode",
    "/models",
    "/new",
    "/quit",
    "/rename",
    "/rules",
    "/search",
    "/sessions",
    "/skill",
    "/skills",
    "/stats",
    "/themes",
    "/think",
    "/timeline",
}

# Passed as `max_bytes` when reading an `@file` attachment.
_ATTACH_MAX = 8192
def parse_input(
    text: str,
    *,
    workspace_files: list[str] | None = None,
    workspace: str | None = None,
    rust: RustSession | None = None,
) -> tuple[str, str | None, str | None]:
    """Classify one line of user input.

    Returns a ``(task, slash_command, shell_command)`` tuple in which at
    most one member is meaningful:

    - empty input  -> ``("", None, None)``
    - ``!cmd``     -> ``("", None, "cmd")``; the command text only, it is
      not executed here
    - ``/cmd args`` -> ``("", "/cmd args", None)``; the raw line to dispatch
    - anything else -> ``(task, None, None)``, with ``@file`` mentions
      replaced by the file's content when a workspace is known

    Args:
        text: Raw input; ``None`` is treated as empty.
        workspace_files: Known workspace paths. When given, a mention must
            match one exactly or be the suffix of exactly one of them.
        workspace: Workspace directory used to read attachments.
        rust: Optional session; supplies ``workspace_path`` when
            ``workspace`` is not given and is forwarded to the file reader.
    """
    stripped = (text or "").strip()
    if not stripped:
        return "", None, None
    if stripped.startswith("!"):
        return "", None, stripped[1:].strip()
    if stripped.startswith("/"):
        return "", stripped, None
    if "@" not in stripped or not (workspace_files or workspace):
        return stripped, None, None

    # Local imports, as in the original: only the attachment path needs them.
    import re

    from .rust_session import read_workspace_file

    paths = list(workspace_files or [])

    def _attach(match: re.Match) -> str:
        """Return the attachment block for one mention, or the mention itself."""
        path = match.group(1)
        if paths and path not in paths:
            # Allow a short name when it is the suffix of exactly one path.
            candidates = [p for p in paths if p.endswith(path)]
            if len(candidates) != 1:
                return match.group(0)
            path = candidates[0]
        ws = workspace or (rust.workspace_path if rust else ".")
        content = read_workspace_file(ws, path, max_bytes=_ATTACH_MAX, rust=rust)
        if content is None:
            return match.group(0)
        return f"[attached: {path}]\n```\n{content}\n```"

    # Substitute by match span. Replacing the first textual occurrence on an
    # already-modified string could hit the prefix of a longer mention or
    # text inside a previously attached file.
    task = re.sub(r"@(\S+)", _attach, stripped)
    return task, None, None
def _workspace(session: AppSessionState) -> str:
    """Return the session's workspace setting, or "." when it is empty."""
    configured = session.settings.workspace
    return configured if configured else "."
def dispatch_slash(cmd_line: str, session: AppSessionState) -> SlashResult:
    """Handle a slash command; mirrors CLI TUI handle_slash.

    The first whitespace-separated token, lower-cased, selects the command;
    the remainder of the line is passed along as ``args``. Names that are
    not built in are tried as custom commands via ``expand_command``.

    Side effects on ``session``: ``/new`` and ``/delete`` set
    ``session.rust`` to ``None``; ``/bg`` stores the job listing in
    ``session.bg_jobs``.

    Args:
        cmd_line: The raw command line, e.g. ``"/rename My title"``.
        session: Current UI session state.

    Returns:
        A ``SlashResult`` carrying the reply text and any follow-up request
        (queued task, chat clear, download, picker, mode/think change).
    """
    parts = (cmd_line or "").strip().split(maxsplit=1)
    if not parts:
        # An empty or whitespace-only line has no command token; answer
        # politely instead of raising IndexError on parts[0].
        return SlashResult(reply="Empty command. Try `/help`.")
    cmd = parts[0].lower()
    args = parts[1] if len(parts) > 1 else ""
    ws = _workspace(session)

    if cmd == "/help":
        custom = list_commands(ws)
        extra = ""
        if custom:
            extra = "\n\n**Custom commands:** " + ", ".join(f"`/{n}`" for n in custom)
        return SlashResult(
            reply=(
                "**Slash commands:** `/new`, `/sessions`, `/fork`, `/rename <title>`, "
                "`/stats`, `/export [file]`, `/timeline`, `/delete`, `/mcp`, `/rules`, "
                "`/skills`, `/skill <name>`, `/commit [msg]`, `/init`, `/bg`, `/clear`, "
                "`/mode`, `/think`, `/config`, `/search`, `/files`"
                f"{extra}\n\n"
                "**Input:** `!cmd` runs shell without LLM; `@file` attaches workspace files."
            )
        )

    if cmd == "/new":
        session.rust = None
        return SlashResult(reply="Started a new session.", clear_chat=True)

    if cmd == "/sessions":
        rows = RustSession.list_sessions()
        if not rows:
            return SlashResult(reply="_No saved sessions._")
        # Only the first 20 sessions are listed.
        lines = [f"- **{r['title']}** (`{r['id']}`)" for r in rows[:20]]
        return SlashResult(reply="**Sessions:**\n" + "\n".join(lines))

    if cmd == "/fork":
        if session.rust and (nid := session.rust.fork()):
            return SlashResult(reply=f"Forked session → `{nid}`")
        return SlashResult(reply="Nothing to fork yet.")

    if cmd == "/rename":
        # The usage reply also covers "no session" and a falsy rename result.
        if session.rust and args and session.rust.rename(args):
            return SlashResult(reply=f"Renamed session to **{args}**")
        return SlashResult(reply="Usage: `/rename <title>`")

    if cmd == "/stats":
        nfiles = len(session.rust.files()) if session.rust else 0
        sid = session.rust.session_id if session.rust else "(none)"
        return SlashResult(
            reply=(
                f"session `{sid}` · workspace: `{ws}` · files: {nfiles} · "
                f"agent: {session.settings.agent}"
            )
        )

    if cmd == "/export":
        sid = session.rust.session_id if session.rust else ""
        if not sid:
            return SlashResult(reply="No session to export yet.")
        try:
            path = export_transcript(sid, args or None)
            return SlashResult(
                reply=f"Exported transcript to `{path}`",
                download_path=path,
            )
        except Exception as e:
            # UI boundary: report the failure in chat rather than raising.
            return SlashResult(reply=f"/export failed: {e}")

    if cmd == "/mcp":
        if session.rust is None:
            return SlashResult(
                reply="_Start a task first so MCP servers are connected._"
            )
        servers = list_mcp(session.rust)
        if not servers:
            return SlashResult(
                reply=(
                    "no MCP servers connected — add `[[mcp]]` entries to "
                    "`~/.config/smolcode/config.toml` or `.smolcode/config.toml`"
                )
            )
        lines = [f"**MCP servers ({len(servers)}):**"]
        for row in servers:
            tools = row.get("tools", [])
            # Show at most eight tool names, with an ellipsis when truncated.
            tlist = ", ".join(tools[:8]) if tools else "(no tools)"
            if len(tools) > 8:
                tlist += "…"
            lines.append(f"- **{row.get('server', '?')}** ({len(tools)}): {tlist}")
        return SlashResult(reply="\n".join(lines))

    if cmd == "/rules":
        rules = list_rules(ws)
        if not rules:
            return SlashResult(
                reply="no rules — add `*.md` to `.smolcode/rules/` or `~/.config/smolcode/rules/`"
            )
        lines = [f"**active rules ({len(rules)}):**"]
        for r in rules:
            desc = r.get("description", "")
            tail = f" — {desc}" if desc else ""
            lines.append(f"- `{r.get('name', '?')}` [{r.get('scope', '?')}]{tail}")
        return SlashResult(reply="\n".join(lines))

    if cmd == "/skills":
        skills = list_skills(ws)
        if not skills:
            return SlashResult(
                reply="no skills — add `<name>/SKILL.md` to `.smolcode/skills/`"
            )
        lines = [f"**skills ({len(skills)})** — run with `/skill <name>`:"]
        for s in skills:
            desc = s.get("description", "")
            tail = f" — {desc}" if desc else ""
            lines.append(f"- `{s.get('name', '?')}`{tail}")
        return SlashResult(reply="\n".join(lines))

    if cmd == "/skill":
        if not args:
            return SlashResult(reply="Usage: `/skill <name> [args]` (see `/skills`)")
        sname, _, sargs = args.partition(" ")
        sname = sname.strip()
        sargs = sargs.strip()
        expanded = expand_skill(ws, sname, sargs)
        if expanded is None:
            return SlashResult(reply=f"no skill named `{sname}` (see `/skills`)")
        return SlashResult(reply=f"Running skill **{sname}**…", queued_task=expanded)

    if cmd == "/commit":
        # The commit is not performed here; it is queued as a task.
        if args:
            task = f"Commit all current changes with git_commit using this message: {args}"
        else:
            task = (
                "Review the staged/unstaged changes with git_diff, then commit them "
                "with git_commit using a concise, descriptive message."
            )
        return SlashResult(reply="Queued git commit task…", queued_task=task)

    if cmd == "/init":
        try:
            path = write_agents_md(ws)
            return SlashResult(reply=f"wrote `{path}` (project guide for agents)")
        except Exception as e:
            # UI boundary: report the failure in chat rather than raising.
            return SlashResult(reply=f"/init: {e}")

    if cmd == "/bg":
        session.bg_jobs = list_background_jobs()
        return SlashResult(reply=session.bg_jobs or "_No background jobs._")

    if cmd == "/timeline":
        sid = session.rust.session_id if session.rust else ""
        if not sid:
            return SlashResult(reply="no saved session yet")
        lines = session_timeline(sid)
        return SlashResult(reply="**Timeline:**\n" + "\n".join(f"- {ln}" for ln in lines))

    if cmd == "/delete":
        removed = session.rust.delete() if session.rust else False
        # The session reference is dropped whether or not anything was deleted.
        session.rust = None
        msg = "deleted session; started a new one" if removed else "started a new session"
        return SlashResult(reply=msg, clear_chat=True)

    if cmd == "/clear":
        return SlashResult(reply="_Transcript cleared._", clear_chat=True)

    if cmd == "/mode":
        return SlashResult(reply="Cycling mode…", cycle_mode=True)

    if cmd == "/think":
        if args:
            # Only the first word of the argument, lower-cased, is the level.
            return SlashResult(reply=f"think → {args}", set_think=args.split()[0].lower())
        return SlashResult(reply="Cycling think level…", cycle_think=True)

    if cmd == "/config":
        if session.rust is None:
            return SlashResult(reply="_Start a task first to view config._")
        return SlashResult(reply=f"```\n{render_config(session.rust)}\n```")

    if cmd == "/search":
        if not args:
            return SlashResult(reply="Usage: `/search <text>`")
        # No search happens here; the reply points at the transcript handler.
        return SlashResult(reply=f"_Search for `{args}` runs in transcript handler._")

    if cmd == "/agents":
        return SlashResult(reply="Opening agent picker…", open_picker="agents")

    if cmd == "/models":
        return SlashResult(reply="Opening model picker…", open_picker="models")

    if cmd == "/themes":
        return SlashResult(reply="Opening theme picker…", open_picker="themes")

    if cmd == "/files":
        return SlashResult(reply="Toggling sidebar…", toggle_sidebar=True)

    if cmd == "/quit":
        return SlashResult(reply="_Use browser close to exit the web UI._")

    # Not handled above: try a custom command unless the name is reserved
    # for a built-in.
    if cmd not in _BUILTIN_SLASH:
        name = cmd.lstrip("/")
        expanded = expand_command(ws, name, args)
        if expanded is not None:
            return SlashResult(
                reply=f"Running custom command `/{name}`…",
                queued_task=expanded,
            )

    return SlashResult(reply=f"Unknown command `{cmd}`. Try `/help`.")
def settings_from_ui(
    workspace: str,
    model: str,
    agent: str,
    mode: str,
    think: str,
    yolo: bool,
) -> UiSettings:
    """Build a ``UiSettings`` from raw control values.

    The mode overrides two other inputs: "plan" forces the agent to "plan",
    and "auto" turns on yolo regardless of the checkbox. An empty workspace
    becomes "." and an empty model becomes "".
    """
    effective_agent = agent
    if mode == "plan":
        effective_agent = "plan"
    auto_approve = yolo or mode == "auto"
    return UiSettings(
        workspace=workspace or ".",
        model=model or "",
        agent=effective_agent,
        mode=mode,
        think=think,
        yolo=auto_approve,
    )
def build_settings_panel(preset_models: list[str]) -> dict:
    """Create the settings controls inside a collapsed accordion.

    Returns a dict mapping "workspace", "model", "agent", "mode", "think"
    and "yolo" to the corresponding Gradio component.
    """
    # NOTE(review): the env var is spelled SMALLCODE_WORKSPACE while config
    # paths elsewhere use "smolcode" — confirm the name is intentional.
    default_workspace = os.environ.get("SMALLCODE_WORKSPACE", ".")
    # The first preset is preselected; an empty list leaves the box blank.
    default_model = preset_models[0] if preset_models else ""

    controls: dict = {}
    with gr.Accordion("⚙️ settings", open=False):
        controls["workspace"] = gr.Textbox(
            value=default_workspace,
            label="workspace directory",
        )
        controls["model"] = gr.Dropdown(
            choices=preset_models,
            value=default_model,
            label="model",
            allow_custom_value=True,
        )
        controls["agent"] = gr.Dropdown(
            choices=["build", "plan"],
            value="build",
            label="agent",
        )
        controls["mode"] = gr.Radio(
            choices=["normal", "auto", "plan"],
            value="normal",
            label="mode",
        )
        controls["think"] = gr.Dropdown(
            choices=["off", "low", "high", "xtra"],
            value="off",
            label="think level",
        )
        controls["yolo"] = gr.Checkbox(value=False, label="yolo (auto-approve tools)")
    return controls
def file_tree_md(files: dict[str, str], selected: str | None = None) -> str:
    """Legacy flat file list (prefer engine.file_tree.build_workspace_panel).

    Renders the sorted paths as a Markdown bullet list and marks the
    selected one with an arrow. When the selected file exists and has
    non-empty content, that content is appended in a fenced block, tagged
    as python for ``.py`` files.
    """
    if not files:
        return "_workspace is empty_"

    listing = "\n".join(
        f"- `{name}`{' →' if name == selected else ''}" for name in sorted(files)
    )

    # Preview only a known, non-empty selection.
    if not selected or selected not in files:
        return listing
    content = files[selected]
    if not content:
        return listing

    fence_lang = "python" if selected.endswith(".py") else ""
    return f"{listing}\n\n**`{selected}`**\n```{fence_lang}\n{content}\n```"