"""Chat-style Gradio UI for a locally-running ``opencode serve``. Prereq: ``opencode serve`` on http://127.0.0.1:4096. Run: uv run --with gradio --with httpx python local_ui.py """ from __future__ import annotations import html as _html import json import threading import time from typing import Any, Generator import gradio as gr import httpx BASE = "http://127.0.0.1:4096" # ── HTTP helpers ──────────────────────────────────────────────────────────── def _get(path: str, **kw) -> Any: r = httpx.get(f"{BASE}{path}", timeout=15, **kw) r.raise_for_status() return r.json() def _create_session() -> str: return httpx.post(f"{BASE}/session", json={"title": "gradio"}, timeout=15).json()["id"] def _fire_async(sid: str, prompt: str) -> None: httpx.post( f"{BASE}/session/{sid}/prompt_async", json={"parts": [{"type": "text", "text": prompt}]}, timeout=30, ).raise_for_status() def _abort(sid: str) -> None: try: httpx.post(f"{BASE}/session/{sid}/abort", timeout=10) except Exception: pass def _session_diff(sid: str) -> list[dict]: try: return _get(f"/session/{sid}/diff") or [] except Exception: return [] def _session_todo(sid: str) -> list[dict]: try: return _get(f"/session/{sid}/todo") or [] except Exception: return [] # ── Server identity ──────────────────────────────────────────────────────── def _banner() -> str: try: h = _get("/global/health") c = _get("/config") prov = (c.get("provider") or {}).get("vllm") or {} opts = prov.get("options") or {} model = c.get("model") or "?" base_url = opts.get("baseURL") or "?" limit = next(iter(prov.get("models", {}).values()), {}).get("limit") or {} try: tools = _get("/experimental/tool/ids") or [] except Exception: tools = [] tool_line = ( f"
tools: {', '.join(_esc(t) for t in tools)}
" if tools else "" ) return ( "{tool_line}" ) except Exception as exc: return f"" # ── SSE ──────────────────────────────────────────────────────────────────── def _stream(sid_filter: str, events: list, stop: threading.Event) -> None: """Tail GET /event, append every frame (caller filters).""" try: with httpx.stream("GET", f"{BASE}/event", timeout=None) as r: for line in r.iter_lines(): if stop.is_set(): return if not line or not line.startswith("data:"): continue try: events.append(json.loads(line[5:].strip())) except Exception: pass except Exception: return # ── Part + delta assembly ────────────────────────────────────────────────── def _assemble(events: list[dict]) -> tuple[list[dict], list[str]]: """Reduce events to ordered parts and collect any error reasons. - ``message.part.updated`` is authoritative per ``part.id``. - ``message.part.delta`` frames for a text part whose last snapshot is shorter than the accumulated delta are appended live so streaming looks smooth. """ order: list[str] = [] latest: dict[str, dict] = {} deltas: dict[str, str] = {} errors: list[str] = [] for ev in events: t = ev.get("type") props = ev.get("properties") or {} if t == "message.part.updated": p = props.get("part") or {} pid = p.get("id") if not pid: continue if pid not in latest: order.append(pid) latest[pid] = p if (p.get("state") or {}).get("status") == "error": err = (p.get("state") or {}).get("error") or "tool error" errors.append(f"{p.get('tool','?')}: {err}") elif t == "message.part.delta": p = props.get("part") or {} pid = p.get("partID") or p.get("id") if not pid: continue delta = p.get("delta") or p.get("text") or "" if isinstance(delta, str) and delta: deltas[pid] = deltas.get(pid, "") + delta elif t in ("error", "client.error"): errors.append(_esc(props.get("reason") or ev.get("reason") or "unknown")) # Splice in any deltas that exceed the latest snapshot (live streaming). parts: list[dict] = [] for pid in order: p = dict(latest[pid]) if p.get("type") == "text" and pid in deltas: if len(deltas[pid]) > len(p.get("text") or ""): p["text"] = deltas[pid] parts.append(p) return parts, errors # ── Rendering ────────────────────────────────────────────────────────────── def _esc(s: Any) -> str: return _html.escape("" if s is None else str(s)) def _cap(s: str, n: int = 6000) -> str: if len(s) <= n: return s return s[:n] + f"\n… ({len(s) - n} chars hidden)" def _fmt_tool(name: str, state: dict, raw: dict) -> str: status = (state or {}).get("status") or "?" inp = (state or {}).get("input") or raw.get("input") or {} out = (state or {}).get("output") or raw.get("output") or "" badge = {"completed": "ok", "error": "err", "running": "run"}.get(status, "") if name == "read": summary = f"📖 read {_esc(inp.get('filePath') or inp.get('path'))}" body = f"
{_esc(_cap(str(out)))}
" elif name == "write": path = inp.get("filePath") or inp.get("path") content = inp.get("content") or "" summary = f"✍️ write {_esc(path)} ({len(content)} chars)" body = f"
{_esc(_cap(content))}
" elif name == "edit": path = inp.get("filePath") or inp.get("path") old = inp.get("oldString") or "" new = inp.get("newString") or "" summary = f"✏️ edit {_esc(path)}" body = ( f"
- old
{_esc(_cap(old, 3000))}
" f"
+ new
{_esc(_cap(new, 3000))}
" ) if out: body += f"
output
{_esc(_cap(str(out), 2000))}
" elif name == "bash": cmd = inp.get("command") or inp.get("cmd") or "" summary = f"⚡ bash {_esc(cmd[:160])}" body = f"
{_esc(_cap(str(out)))}
" elif name in ("glob", "find"): pattern = inp.get("pattern") or inp.get("query") or "" summary = f"🔎 {name} {_esc(pattern)}" body = f"
{_esc(_cap(str(out), 4000))}
" elif name == "grep": pattern = inp.get("pattern") or "" path = inp.get("path") or "" summary = f"🔎 grep {_esc(pattern)}" + ( f" in {_esc(path)}" if path else "" ) body = f"
{_esc(_cap(str(out), 4000))}
" elif name == "todowrite": todos = inp.get("todos") or [] summary = f"📝 todowrite ({len(todos)} items)" body = "" elif name == "task": desc = inp.get("description") or inp.get("prompt") or "" summary = f"🧩 task — {_esc(desc[:160])}" body = f"
{_esc(_cap(str(out), 4000))}
" elif name == "webfetch": summary = f"🌐 webfetch {_esc(inp.get('url'))}" body = f"
{_esc(_cap(str(out), 4000))}
" else: summary = f"🔧 {_esc(name)}" body = ( f"
input
{_esc(_cap(json.dumps(inp, indent=2, default=str), 4000))}
" f"
output
{_esc(_cap(str(out), 4000))}
" ) return ( "
" f"{summary} {_esc(status)}" f"
{body}
" "
" ) def _todo_icon(status: str | None) -> str: return {"completed": "✅", "in_progress": "🔄"}.get(status or "", "⏳") def _render_transcript(parts: list[dict], errors: list[str]) -> str: out: list[str] = [] if errors: out.append( "
⚠️ errors
" ) if not parts: out.append("
waiting for first part…
") return "".join(out) out.append("
") for p in parts: t = p.get("type") if t == "step-start": out.append("
── new step ──
") elif t == "reasoning": txt = (p.get("text") or "").strip() if txt: out.append( "
🧠 reasoning" f"
{_esc(_cap(txt, 4000))}
" ) elif t == "text": txt = (p.get("text") or "").strip() if txt: out.append(f"
{_esc(txt)}
") elif t == "tool": out.append(_fmt_tool(p.get("tool") or "?", p.get("state") or {}, p)) elif t == "step-finish": tokens = p.get("tokens") or (p.get("state") or {}).get("tokens") or {} if tokens: out.append(f"
tokens: {_esc(json.dumps(tokens, default=str))}
") out.append("
") return "".join(out) def _render_todo(todos: list[dict]) -> str: if not todos: return "" items = "".join( f"
  • {_todo_icon(t.get('status'))} {_esc(t.get('content') or t.get('text',''))}
  • " for t in todos ) return f"
    plan
    " def _render_diff(diffs: list[dict]) -> str: if not diffs: return "" blocks = [] for d in diffs: path = d.get("path") or d.get("file") or "?" patch = d.get("patch") or d.get("diff") or "" blocks.append( f"
    {_esc(path)}" f"
    {_esc(_cap(patch, 6000))}
    " ) return ( "
    " f"📋 session diff ({len(diffs)} files)" f"{''.join(blocks)}
    " ) # ── State ────────────────────────────────────────────────────────────────── class _State: sid: str = "" # empty → next Run creates a new session stop: threading.Event | None = None events: list[dict] = [] # reset per session sse_thread: threading.Thread | None = None _STATE = _State() def _ensure_session() -> str: """Create a session if none exists; reuse across runs for multi-turn.""" if _STATE.sid: return _STATE.sid _STATE.sid = _create_session() _STATE.stop = threading.Event() _STATE.events = [] _STATE.sse_thread = threading.Thread( target=_stream, args=(_STATE.sid, _STATE.events, _STATE.stop), daemon=True ) _STATE.sse_thread.start() time.sleep(0.15) return _STATE.sid def _new_session_cb() -> tuple[str, str, str, str]: """Tear down any existing SSE and clear state. Next Run opens a fresh session.""" if _STATE.stop: _STATE.stop.set() if _STATE.sid: _abort(_STATE.sid) _STATE.sid = "" _STATE.stop = None _STATE.events = [] return ( "✨ new session — Run to start", "", # transcript "", # todo "", # diff ) # ── Main ─────────────────────────────────────────────────────────────────── def run(prompt: str) -> Generator[tuple[str, str, str, str], None, None]: try: sid = _ensure_session() except Exception as exc: yield f"❌ session create failed: {exc}", "", "", "" return # Snapshot the event index BEFORE firing — "idle for THIS turn" must be # scoped to events that arrive after the prompt is sent, otherwise the # idle frame from the previous turn fires the break immediately. turn_start = len(_STATE.events) try: _fire_async(sid, prompt) except Exception as exc: yield f"❌ prompt failed: {exc}", "", "", "" return t0 = time.time() last_todo_refresh = 0.0 todos: list[dict] = [] while time.time() - t0 < 600: new_events = _STATE.events[turn_start:] idle = any(e.get("type") in ("session.idle", "idle") for e in new_events) parts, errors = _assemble(_STATE.events) if time.time() - last_todo_refresh > 3.0: todos = _session_todo(sid) last_todo_refresh = time.time() status = ( f"{'✅ idle' if idle else '⚡ running'} · " f"session {sid[:18]}… · " f"{time.time()-t0:.1f}s · {len(parts)} parts · {len(_STATE.events)} events" ) diff_html = "" if idle: diff_html = _render_diff(_session_diff(sid)) yield status, _render_transcript(parts, errors), _render_todo(todos), diff_html if idle: break time.sleep(0.4) def abort_cb() -> str: if _STATE.sid: _abort(_STATE.sid) # leave SSE open so user sees the abort-related events; actual teardown on new session return "⏹ aborted (session kept — click New session to clear)" def refresh_banner() -> str: return _banner() # ── CSS ──────────────────────────────────────────────────────────────────── _CSS = """ .banner { margin:4px 0 2px; } .tools { font-size:11px; color:#888; margin:2px 0 8px; } .chip { display:inline-block; padding:2px 8px; margin:2px; border-radius:10px; background:#2b2d31; color:#ddd; font-size:12px; } .chip.ok { background:#1f6f43; } .chip.err { background:#7a1e1e; } .chip code { background:transparent; color:#9ad; } .errbox { background:#2a1414; border:1px solid #7a1e1e; border-radius:6px; padding:6px 10px; margin:6px 0; color:#f88; font-size:13px; } .errbox ul { margin:2px 0 0 18px; } .chat { font-size:14px; } .assistant pre { background:#0e1013; padding:10px; border-radius:8px; white-space:pre-wrap; color:#eee; margin:6px 0; } .reasoning { opacity:0.8; margin:4px 0; } .reasoning pre { background:#0a0b0d; color:#aab; padding:8px; white-space:pre-wrap; } .tool { border:1px solid #2a2f3a; border-radius:8px; padding:6px 10px; margin:6px 0; background:#12161c; } .tool summary { cursor:pointer; color:#ddd; } .tool code { background:#222; color:#9cf; padding:1px 4px; border-radius:3px; } .tbody { margin-top:6px; } .tbody pre { background:#0a0b0d; padding:8px; border-radius:4px; white-space:pre-wrap; max-height:400px; overflow:auto; font-size:12px; color:#ddd; margin:2px 0; } .tbody pre.add { border-left:3px solid #2e6; } .tbody pre.del { border-left:3px solid #e53; } .tbody .lbl { color:#888; font-size:11px; margin-top:6px; } .badge { padding:1px 6px; border-radius:8px; font-size:11px; background:#333; color:#ddd; } .badge.ok { background:#1f6f43; color:white; } .badge.err { background:#7a1e1e; color:white; } .badge.run { background:#7a5c1e; color:white; } .step { color:#555; text-align:center; margin:10px 0; font-size:11px; } .stepfin { color:#666; font-size:11px; margin:4px 0 12px; } .empty { color:#666; font-style:italic; padding:12px; } .todostrip { background:#14181e; border:1px solid #2a2f3a; border-radius:6px; padding:6px 10px; margin:6px 0; font-size:13px; } .todostrip ul { margin:4px 0 0 18px; } .diff-wrap { margin:8px 0; } .diff summary { cursor:pointer; color:#9ad; font-family:monospace; } .diff pre { background:#0a0b0d; padding:8px; border-radius:4px; white-space:pre; font-size:12px; color:#ddd; overflow:auto; } """ # ── Layout ───────────────────────────────────────────────────────────────── with gr.Blocks(title="opencode serve", css=_CSS) as demo: banner_html = gr.HTML(value="_(loading…)_") status_md = gr.Markdown() todo_html = gr.HTML() transcript_html = gr.HTML(value="
    run a prompt to start
    ") diff_html = gr.HTML() with gr.Row(): prompt = gr.Textbox( label="Prompt", value="Write fizzbuzz.py that prints FizzBuzz for 1..15 and run it.", lines=3, scale=5, ) run_btn = gr.Button("▶ Run", variant="primary", scale=1) with gr.Column(scale=1, min_width=120): abort_btn = gr.Button("⏹ Abort", variant="stop") new_btn = gr.Button("✨ New session") run_btn.click( run, inputs=[prompt], outputs=[status_md, transcript_html, todo_html, diff_html], ) abort_btn.click(abort_cb, outputs=[status_md]) new_btn.click( _new_session_cb, outputs=[status_md, transcript_html, todo_html, diff_html], ) demo.load(refresh_banner, outputs=[banner_html]) if __name__ == "__main__": import os demo.queue().launch( server_name="0.0.0.0", server_port=int(os.environ.get("GRADIO_PORT", "7861")), share=True, show_error=True, )