Spaces:
Sleeping
Sleeping
| """Chat-style Gradio UI for a locally-running ``opencode serve``. | |
| Prereq: ``opencode serve`` on http://127.0.0.1:4096. | |
| Run: | |
| uv run --with gradio --with httpx python local_ui.py | |
| """ | |
| from __future__ import annotations | |
| import html as _html | |
| import json | |
| import threading | |
| import time | |
| from typing import Any, Generator | |
| import gradio as gr | |
| import httpx | |
# Root URL of the local ``opencode serve`` instance; request paths below are appended to it.
BASE = "http://127.0.0.1:4096"
# ── HTTP helpers ────────────────────────────────────────────────────────────
def _get(path: str, **kw) -> Any:
    """GET ``BASE + path`` and return the decoded JSON body.

    Extra keyword arguments are forwarded to ``httpx.get``. An error
    status raises through ``raise_for_status``.
    """
    url = f"{BASE}{path}"
    response = httpx.get(url, timeout=15, **kw)
    response.raise_for_status()
    payload = response.json()
    return payload
def _create_session() -> str:
    """Create a new server session titled ``gradio`` and return its id.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
        KeyError: if the JSON body carries no ``id``.
    """
    r = httpx.post(f"{BASE}/session", json={"title": "gradio"}, timeout=15)
    # Fail on the HTTP status here, like the other helpers, rather than
    # tripping over a missing "id" key in an error body.
    r.raise_for_status()
    return r.json()["id"]
def _fire_async(sid: str, prompt: str) -> None:
    """POST ``prompt`` as a single text part to the session's ``prompt_async`` endpoint.

    Raises through ``raise_for_status`` when the server answers with an
    error status.
    """
    body = {"parts": [{"type": "text", "text": prompt}]}
    resp = httpx.post(f"{BASE}/session/{sid}/prompt_async", json=body, timeout=30)
    resp.raise_for_status()
def _abort(sid: str) -> None:
    """POST to the session's ``abort`` endpoint, ignoring any failure."""
    url = f"{BASE}/session/{sid}/abort"
    try:
        httpx.post(url, timeout=10)
    except Exception:
        # Best-effort: a failed abort must not break the UI callback.
        return
def _session_diff(sid: str) -> list[dict]:
    """Fetch ``/session/{sid}/diff``; return ``[]`` on any failure or empty body."""
    try:
        diffs = _get(f"/session/{sid}/diff")
    except Exception:
        return []
    return diffs or []
def _session_todo(sid: str) -> list[dict]:
    """Fetch ``/session/{sid}/todo``; return ``[]`` on any failure or empty body."""
    try:
        todos = _get(f"/session/{sid}/todo")
    except Exception:
        return []
    return todos or []
# ── Server identity ─────────────────────────────────────────────────────────
def _banner() -> str:
    """Build the HTML banner describing the connected server.

    Reads ``/global/health`` and ``/config`` (and, best-effort,
    ``/experimental/tool/ids``) and renders version, model, base URL and
    the context/output limits of the first model listed under the ``vllm``
    provider. Any failure yields a single "server unreachable" chip instead
    of raising.
    """
    try:
        h = _get("/global/health")
        c = _get("/config")
        prov = (c.get("provider") or {}).get("vllm") or {}
        opts = prov.get("options") or {}
        model = c.get("model") or "?"
        base_url = opts.get("baseURL") or "?"
        # ``models`` (or its first entry) may be missing or null; treat that
        # as "no limits known" rather than reporting the server unreachable.
        models = prov.get("models") or {}
        first_model = next(iter(models.values()), None) or {}
        limit = first_model.get("limit") or {}
        try:
            tools = _get("/experimental/tool/ids") or []
        except Exception:
            # The tool list is optional decoration; the banner works without it.
            tools = []
        tool_line = (
            f"<div class='tools'>tools: {', '.join(_esc(t) for t in tools)}</div>"
            if tools else ""
        )
        # Every server-supplied value goes through _esc, including the limits.
        return (
            "<div class='banner'>"
            f"<span class='chip ok'>opencode v{_esc(h.get('version','?'))}</span> "
            f"<span class='chip'>model: <code>{_esc(model)}</code></span> "
            f"<span class='chip'>baseURL: <code>{_esc(base_url)}</code></span> "
            f"<span class='chip'>ctx: <code>{_esc(limit.get('context','?'))}</code></span> "
            f"<span class='chip'>out: <code>{_esc(limit.get('output','?'))}</code></span>"
            f"</div>{tool_line}"
        )
    except Exception as exc:
        return f"<div class='banner'><span class='chip err'>server unreachable: {_esc(exc)}</span></div>"
# ── SSE ─────────────────────────────────────────────────────────────────────
def _stream(sid_filter: str, events: list, stop: threading.Event) -> None:
    """Tail ``GET /event`` and append every decoded ``data:`` frame to ``events``.

    Args:
        sid_filter: Not used here; frames are appended unfiltered and the
            caller is expected to filter.
        events: List mutated in place; one dict is appended per frame.
        stop: Checked each time a line arrives; once set, the function returns.

    Any connection or streaming error ends the function silently.
    """
    try:
        with httpx.stream("GET", f"{BASE}/event", timeout=None) as r:
            for line in r.iter_lines():
                if stop.is_set():
                    return
                if not line or not line.startswith("data:"):
                    continue
                try:
                    frame = json.loads(line[5:].strip())
                except (ValueError, RecursionError):
                    # Malformed payload: skip the frame, keep the stream alive.
                    continue
                # Consumers call ``.get`` on each event, so only JSON objects
                # are usable; scalars and arrays are dropped here.
                if isinstance(frame, dict):
                    events.append(frame)
    except Exception:
        return
# ── Part + delta assembly ───────────────────────────────────────────────────
| def _assemble(events: list[dict]) -> tuple[list[dict], list[str]]: | |
| """Reduce events to ordered parts and collect any error reasons. | |
| - ``message.part.updated`` is authoritative per ``part.id``. | |
| - ``message.part.delta`` frames for a text part whose last snapshot is | |
| shorter than the accumulated delta are appended live so streaming | |
| looks smooth. | |
| """ | |
| order: list[str] = [] | |
| latest: dict[str, dict] = {} | |
| deltas: dict[str, str] = {} | |
| errors: list[str] = [] | |
| for ev in events: | |
| t = ev.get("type") | |
| props = ev.get("properties") or {} | |
| if t == "message.part.updated": | |
| p = props.get("part") or {} | |
| pid = p.get("id") | |
| if not pid: | |
| continue | |
| if pid not in latest: | |
| order.append(pid) | |
| latest[pid] = p | |
| if (p.get("state") or {}).get("status") == "error": | |
| err = (p.get("state") or {}).get("error") or "tool error" | |
| errors.append(f"{p.get('tool','?')}: {err}") | |
| elif t == "message.part.delta": | |
| p = props.get("part") or {} | |
| pid = p.get("partID") or p.get("id") | |
| if not pid: | |
| continue | |
| delta = p.get("delta") or p.get("text") or "" | |
| if isinstance(delta, str) and delta: | |
| deltas[pid] = deltas.get(pid, "") + delta | |
| elif t in ("error", "client.error"): | |
| errors.append(_esc(props.get("reason") or ev.get("reason") or "unknown")) | |
| # Splice in any deltas that exceed the latest snapshot (live streaming). | |
| parts: list[dict] = [] | |
| for pid in order: | |
| p = dict(latest[pid]) | |
| if p.get("type") == "text" and pid in deltas: | |
| if len(deltas[pid]) > len(p.get("text") or ""): | |
| p["text"] = deltas[pid] | |
| parts.append(p) | |
| return parts, errors | |
# ── Rendering ───────────────────────────────────────────────────────────────
| def _esc(s: Any) -> str: | |
| return _html.escape("" if s is None else str(s)) | |
| def _cap(s: str, n: int = 6000) -> str: | |
| if len(s) <= n: | |
| return s | |
| return s[:n] + f"\n⦠({len(s) - n} chars hidden)" | |
def _fmt_tool(name: str, state: dict, raw: dict) -> str:
    """Render one tool part as an open ``<details>`` element.

    Args:
        name: Tool name; selects a tool-specific summary/body layout, with a
            generic input/output dump for anything else.
        state: The part's ``state`` mapping (``status``, ``input``, ``output``).
        raw: The whole part, used as a fallback source for ``input``/``output``.

    Returns:
        HTML with a summary line, a status badge and the tool body. All
        dynamic values are escaped.
    """
    state = state or {}
    status = state.get("status") or "?"
    inp = state.get("input") or raw.get("input") or {}
    out = state.get("output") or raw.get("output") or ""
    badge = {"completed": "ok", "error": "err", "running": "run"}.get(status, "")
    # The tool-specific layouts index into the input mapping. If the input is
    # not a mapping, route to the generic dump instead of raising, because one
    # malformed part would otherwise take down the whole transcript render.
    kind = name if isinstance(inp, dict) else None
    if kind == "read":
        summary = f"π read <code>{_esc(inp.get('filePath') or inp.get('path'))}</code>"
        body = f"<pre>{_esc(_cap(str(out)))}</pre>"
    elif kind == "write":
        path = inp.get("filePath") or inp.get("path")
        # str() keeps len/_cap safe when the field is not a string.
        content = str(inp.get("content") or "")
        summary = f"βοΈ write <code>{_esc(path)}</code> ({len(content)} chars)"
        body = f"<pre>{_esc(_cap(content))}</pre>"
    elif kind == "edit":
        path = inp.get("filePath") or inp.get("path")
        old = str(inp.get("oldString") or "")
        new = str(inp.get("newString") or "")
        summary = f"βοΈ edit <code>{_esc(path)}</code>"
        body = (
            f"<div class='lbl'>- old</div><pre class='del'>{_esc(_cap(old, 3000))}</pre>"
            f"<div class='lbl'>+ new</div><pre class='add'>{_esc(_cap(new, 3000))}</pre>"
        )
        if out:
            body += f"<div class='lbl'>output</div><pre>{_esc(_cap(str(out), 2000))}</pre>"
    elif kind == "bash":
        cmd = str(inp.get("command") or inp.get("cmd") or "")
        summary = f"β‘ bash <code>{_esc(cmd[:160])}</code>"
        body = f"<pre>{_esc(_cap(str(out)))}</pre>"
    elif kind in ("glob", "find"):
        pattern = inp.get("pattern") or inp.get("query") or ""
        summary = f"π {name} <code>{_esc(pattern)}</code>"
        body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
    elif kind == "grep":
        pattern = inp.get("pattern") or ""
        path = inp.get("path") or ""
        summary = f"π grep <code>{_esc(pattern)}</code>" + (
            f" in <code>{_esc(path)}</code>" if path else ""
        )
        body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
    elif kind == "todowrite":
        todos = inp.get("todos") or []
        # Only mapping entries can be rendered; anything else is skipped.
        todos = [t for t in todos if isinstance(t, dict)] if isinstance(todos, list) else []
        summary = f"π todowrite ({len(todos)} items)"
        body = "<ul>" + "".join(
            f"<li>{_todo_icon(t.get('status'))} {_esc(t.get('content'))}</li>"
            for t in todos
        ) + "</ul>"
    elif kind == "task":
        desc = str(inp.get("description") or inp.get("prompt") or "")
        summary = f"π§© task β {_esc(desc[:160])}"
        body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
    elif kind == "webfetch":
        summary = f"π webfetch <code>{_esc(inp.get('url'))}</code>"
        body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
    else:
        # Generic layout: dump whatever input/output the part carries.
        summary = f"π§ {_esc(name)}"
        body = (
            f"<div class='lbl'>input</div><pre>{_esc(_cap(json.dumps(inp, indent=2, default=str), 4000))}</pre>"
            f"<div class='lbl'>output</div><pre>{_esc(_cap(str(out), 4000))}</pre>"
        )
    return (
        "<details class='tool' open>"
        f"<summary>{summary} <span class='badge {badge}'>{_esc(status)}</span></summary>"
        f"<div class='tbody'>{body}</div>"
        "</details>"
    )
| def _todo_icon(status: str | None) -> str: | |
| return {"completed": "β ", "in_progress": "π"}.get(status or "", "β³") | |
| def _render_transcript(parts: list[dict], errors: list[str]) -> str: | |
| out: list[str] = [] | |
| if errors: | |
| out.append( | |
| "<div class='errbox'><b>β οΈ errors</b><ul>" | |
| + "".join(f"<li>{_esc(e)}</li>" for e in errors[:8]) | |
| + "</ul></div>" | |
| ) | |
| if not parts: | |
| out.append("<div class='empty'>waiting for first partβ¦</div>") | |
| return "".join(out) | |
| out.append("<div class='chat'>") | |
| for p in parts: | |
| t = p.get("type") | |
| if t == "step-start": | |
| out.append("<div class='step'>ββ new step ββ</div>") | |
| elif t == "reasoning": | |
| txt = (p.get("text") or "").strip() | |
| if txt: | |
| out.append( | |
| "<details class='reasoning'><summary>π§ reasoning</summary>" | |
| f"<pre>{_esc(_cap(txt, 4000))}</pre></details>" | |
| ) | |
| elif t == "text": | |
| txt = (p.get("text") or "").strip() | |
| if txt: | |
| out.append(f"<div class='assistant'><pre>{_esc(txt)}</pre></div>") | |
| elif t == "tool": | |
| out.append(_fmt_tool(p.get("tool") or "?", p.get("state") or {}, p)) | |
| elif t == "step-finish": | |
| tokens = p.get("tokens") or (p.get("state") or {}).get("tokens") or {} | |
| if tokens: | |
| out.append(f"<div class='stepfin'>tokens: {_esc(json.dumps(tokens, default=str))}</div>") | |
| out.append("</div>") | |
| return "".join(out) | |
| def _render_todo(todos: list[dict]) -> str: | |
| if not todos: | |
| return "" | |
| items = "".join( | |
| f"<li>{_todo_icon(t.get('status'))} {_esc(t.get('content') or t.get('text',''))}</li>" | |
| for t in todos | |
| ) | |
| return f"<div class='todostrip'><b>plan</b><ul>{items}</ul></div>" | |
| def _render_diff(diffs: list[dict]) -> str: | |
| if not diffs: | |
| return "" | |
| blocks = [] | |
| for d in diffs: | |
| path = d.get("path") or d.get("file") or "?" | |
| patch = d.get("patch") or d.get("diff") or "" | |
| blocks.append( | |
| f"<details class='diff'><summary>{_esc(path)}</summary>" | |
| f"<pre>{_esc(_cap(patch, 6000))}</pre></details>" | |
| ) | |
| return ( | |
| "<details class='diff-wrap' open>" | |
| f"<summary>π session diff ({len(diffs)} files)</summary>" | |
| f"{''.join(blocks)}</details>" | |
| ) | |
# ── State ───────────────────────────────────────────────────────────────────
| class _State: | |
| sid: str = "" # empty β next Run creates a new session | |
| stop: threading.Event | None = None | |
| events: list[dict] = [] # reset per session | |
| sse_thread: threading.Thread | None = None | |
| _STATE = _State() | |
def _ensure_session() -> str:
    """Return the current session id, creating the session on first use.

    Creating a session also resets the event list and starts a daemon thread
    running ``_stream``; later calls reuse the same session for multi-turn.
    """
    if not _STATE.sid:
        _STATE.sid = _create_session()
        _STATE.stop = threading.Event()
        _STATE.events = []
        worker = threading.Thread(
            target=_stream,
            args=(_STATE.sid, _STATE.events, _STATE.stop),
            daemon=True,
        )
        _STATE.sse_thread = worker
        worker.start()
        # Brief pause after starting the reader — presumably to let it
        # connect before the first prompt is sent.
        time.sleep(0.15)
    return _STATE.sid
def _new_session_cb() -> tuple[str, str, str, str]:
    """Tear down any existing SSE and clear state. Next Run opens a fresh session.

    Returns:
        ``(status, transcript, todo, diff)`` values that blank the UI panes.
    """
    if _STATE.stop:
        # The reader only checks this flag when a line arrives, so it may
        # linger until the next frame; it is a daemon thread either way.
        _STATE.stop.set()
    if _STATE.sid:
        _abort(_STATE.sid)
    _STATE.sid = ""
    _STATE.stop = None
    _STATE.events = []
    # Drop the handle to the old reader so no stale thread reference remains.
    _STATE.sse_thread = None
    return (
        "β¨ new session β Run to start",
        "",  # transcript
        "",  # todo
        "",  # diff
    )
# ── Main ────────────────────────────────────────────────────────────────────
def run(prompt: str) -> Generator[tuple[str, str, str, str], None, None]:
    """Send ``prompt`` to the current session and stream UI updates until idle.

    Yields:
        ``(status, transcript_html, todo_html, diff_html)`` roughly every
        0.4 s. The diff pane is filled only on the final, idle update. If no
        idle event arrives within the timeout, a final "timed out" status is
        yielded with the last rendered transcript.
    """
    try:
        sid = _ensure_session()
    except Exception as exc:
        yield f"β session create failed: {exc}", "", "", ""
        return
    # Snapshot the event index BEFORE firing: "idle for THIS turn" must be
    # scoped to events that arrive after the prompt is sent, otherwise the
    # idle frame from the previous turn fires the break immediately.
    turn_start = len(_STATE.events)
    try:
        _fire_async(sid, prompt)
    except Exception as exc:
        yield f"β prompt failed: {exc}", "", "", ""
        return
    timeout_s = 600
    # Monotonic clock: elapsed time must not jump with wall-clock adjustments.
    t0 = time.monotonic()
    last_todo_refresh = float("-inf")  # forces a refresh on the first pass
    todos: list[dict] = []
    transcript = ""
    todo_strip = ""
    while True:
        if time.monotonic() - t0 >= timeout_s:
            # Tell the user why updates stopped instead of leaving a stale
            # "running" status on screen.
            yield (
                f"timed out after {timeout_s}s waiting for the session to go idle",
                transcript,
                todo_strip,
                "",
            )
            return
        new_events = _STATE.events[turn_start:]
        idle = any(e.get("type") in ("session.idle", "idle") for e in new_events)
        parts, errors = _assemble(_STATE.events)
        if time.monotonic() - last_todo_refresh > 3.0:
            todos = _session_todo(sid)
            last_todo_refresh = time.monotonic()
        status = (
            f"{'β idle' if idle else 'β‘ running'} Β· "
            f"session <code>{sid[:18]}β¦</code> Β· "
            f"{time.monotonic()-t0:.1f}s Β· {len(parts)} parts Β· {len(_STATE.events)} events"
        )
        diff_html = ""
        if idle:
            diff_html = _render_diff(_session_diff(sid))
        transcript = _render_transcript(parts, errors)
        todo_strip = _render_todo(todos)
        yield status, transcript, todo_strip, diff_html
        if idle:
            break
        time.sleep(0.4)
def abort_cb() -> str:
    """Abort the current session's run, if any, and return the status text.

    The SSE reader is left open so abort-related events still show up; the
    actual teardown happens on New session.
    """
    sid = _STATE.sid
    if sid:
        _abort(sid)
    return "βΉ aborted (session kept β click New session to clear)"
def refresh_banner() -> str:
    """Page-load callback: return freshly built banner HTML."""
    banner = _banner()
    return banner
# ── CSS ─────────────────────────────────────────────────────────────────────
# Stylesheet handed to ``gr.Blocks(css=...)``; the class names match the ones
# emitted by the banner and the ``_render_*`` / ``_fmt_tool`` helpers.
_CSS = """
.banner { margin:4px 0 2px; }
.tools { font-size:11px; color:#888; margin:2px 0 8px; }
.chip { display:inline-block; padding:2px 8px; margin:2px; border-radius:10px;
background:#2b2d31; color:#ddd; font-size:12px; }
.chip.ok { background:#1f6f43; }
.chip.err { background:#7a1e1e; }
.chip code { background:transparent; color:#9ad; }
.errbox { background:#2a1414; border:1px solid #7a1e1e; border-radius:6px;
padding:6px 10px; margin:6px 0; color:#f88; font-size:13px; }
.errbox ul { margin:2px 0 0 18px; }
.chat { font-size:14px; }
.assistant pre { background:#0e1013; padding:10px; border-radius:8px;
white-space:pre-wrap; color:#eee; margin:6px 0; }
.reasoning { opacity:0.8; margin:4px 0; }
.reasoning pre { background:#0a0b0d; color:#aab; padding:8px; white-space:pre-wrap; }
.tool { border:1px solid #2a2f3a; border-radius:8px; padding:6px 10px;
margin:6px 0; background:#12161c; }
.tool summary { cursor:pointer; color:#ddd; }
.tool code { background:#222; color:#9cf; padding:1px 4px; border-radius:3px; }
.tbody { margin-top:6px; }
.tbody pre { background:#0a0b0d; padding:8px; border-radius:4px;
white-space:pre-wrap; max-height:400px; overflow:auto;
font-size:12px; color:#ddd; margin:2px 0; }
.tbody pre.add { border-left:3px solid #2e6; }
.tbody pre.del { border-left:3px solid #e53; }
.tbody .lbl { color:#888; font-size:11px; margin-top:6px; }
.badge { padding:1px 6px; border-radius:8px; font-size:11px;
background:#333; color:#ddd; }
.badge.ok { background:#1f6f43; color:white; }
.badge.err { background:#7a1e1e; color:white; }
.badge.run { background:#7a5c1e; color:white; }
.step { color:#555; text-align:center; margin:10px 0; font-size:11px; }
.stepfin { color:#666; font-size:11px; margin:4px 0 12px; }
.empty { color:#666; font-style:italic; padding:12px; }
.todostrip { background:#14181e; border:1px solid #2a2f3a; border-radius:6px;
padding:6px 10px; margin:6px 0; font-size:13px; }
.todostrip ul { margin:4px 0 0 18px; }
.diff-wrap { margin:8px 0; }
.diff summary { cursor:pointer; color:#9ad; font-family:monospace; }
.diff pre { background:#0a0b0d; padding:8px; border-radius:4px;
white-space:pre; font-size:12px; color:#ddd; overflow:auto; }
"""
# ── Layout ──────────────────────────────────────────────────────────────────
# Page layout: banner, status line, plan strip, transcript and diff panes,
# then a row holding the prompt box and the Run / Abort / New session buttons.
# NOTE(review): nesting below is reconstructed from the call structure; the
# original indentation was not available — confirm the Row/Column grouping.
with gr.Blocks(title="opencode serve", css=_CSS) as demo:
    banner_html = gr.HTML(value="_(loadingβ¦)_")
    status_md = gr.Markdown()
    todo_html = gr.HTML()
    transcript_html = gr.HTML(value="<div class='empty'>run a prompt to start</div>")
    diff_html = gr.HTML()
    with gr.Row():
        prompt = gr.Textbox(
            label="Prompt",
            value="Write fizzbuzz.py that prints FizzBuzz for 1..15 and run it.",
            lines=3,
            scale=5,
        )
        run_btn = gr.Button("βΆ Run", variant="primary", scale=1)
        with gr.Column(scale=1, min_width=120):
            abort_btn = gr.Button("βΉ Abort", variant="stop")
            new_btn = gr.Button("β¨ New session")
    # Run streams the generator's 4-tuples into the four output panes.
    run_btn.click(
        run,
        inputs=[prompt],
        outputs=[status_md, transcript_html, todo_html, diff_html],
    )
    # Abort only updates the status line; the session is kept.
    abort_btn.click(abort_cb, outputs=[status_md])
    # New session blanks every pane.
    new_btn.click(
        _new_session_cb,
        outputs=[status_md, transcript_html, todo_html, diff_html],
    )
    # Fill the banner once the page loads.
    demo.load(refresh_banner, outputs=[banner_html])
if __name__ == "__main__":
    import os

    # NOTE(review): binding to all interfaces together with a public share
    # link makes this UI reachable beyond the local machine. The defaults are
    # unchanged from the hard-coded values, but can now be overridden:
    #   GRADIO_HOST=127.0.0.1 GRADIO_SHARE=0 python local_ui.py
    share = os.environ.get("GRADIO_SHARE", "1").strip().lower() not in (
        "0", "false", "no", "off",
    )
    demo.queue().launch(
        server_name=os.environ.get("GRADIO_HOST", "0.0.0.0"),
        server_port=int(os.environ.get("GRADIO_PORT", "7861")),
        share=share,
        show_error=True,
    )