opencode-env-rollout / local_ui.py
AdithyaSK's picture
AdithyaSK HF Staff
Upload folder using huggingface_hub
d4d3fde verified
"""Chat-style Gradio UI for a locally-running ``opencode serve``.
Prereq: ``opencode serve`` on http://127.0.0.1:4096.
Run:
uv run --with gradio --with httpx python local_ui.py
"""
from __future__ import annotations
import html as _html
import json
import threading
import time
from typing import Any, Generator
import gradio as gr
import httpx
BASE = "http://127.0.0.1:4096"
# ── HTTP helpers ────────────────────────────────────────────────────────────
def _get(path: str, **kw) -> Any:
r = httpx.get(f"{BASE}{path}", timeout=15, **kw)
r.raise_for_status()
return r.json()
def _create_session() -> str:
return httpx.post(f"{BASE}/session", json={"title": "gradio"}, timeout=15).json()["id"]
def _fire_async(sid: str, prompt: str) -> None:
httpx.post(
f"{BASE}/session/{sid}/prompt_async",
json={"parts": [{"type": "text", "text": prompt}]},
timeout=30,
).raise_for_status()
def _abort(sid: str) -> None:
try:
httpx.post(f"{BASE}/session/{sid}/abort", timeout=10)
except Exception:
pass
def _session_diff(sid: str) -> list[dict]:
try:
return _get(f"/session/{sid}/diff") or []
except Exception:
return []
def _session_todo(sid: str) -> list[dict]:
try:
return _get(f"/session/{sid}/todo") or []
except Exception:
return []
# ── Server identity ────────────────────────────────────────────────────────
def _banner() -> str:
try:
h = _get("/global/health")
c = _get("/config")
prov = (c.get("provider") or {}).get("vllm") or {}
opts = prov.get("options") or {}
model = c.get("model") or "?"
base_url = opts.get("baseURL") or "?"
limit = next(iter(prov.get("models", {}).values()), {}).get("limit") or {}
try:
tools = _get("/experimental/tool/ids") or []
except Exception:
tools = []
tool_line = (
f"<div class='tools'>tools: {', '.join(_esc(t) for t in tools)}</div>"
if tools else ""
)
return (
"<div class='banner'>"
f"<span class='chip ok'>opencode v{_esc(h.get('version','?'))}</span> "
f"<span class='chip'>model: <code>{_esc(model)}</code></span> "
f"<span class='chip'>baseURL: <code>{_esc(base_url)}</code></span> "
f"<span class='chip'>ctx: <code>{limit.get('context','?')}</code></span> "
f"<span class='chip'>out: <code>{limit.get('output','?')}</code></span>"
f"</div>{tool_line}"
)
except Exception as exc:
return f"<div class='banner'><span class='chip err'>server unreachable: {_esc(exc)}</span></div>"
# ── SSE ────────────────────────────────────────────────────────────────────
def _stream(sid_filter: str, events: list, stop: threading.Event) -> None:
"""Tail GET /event, append every frame (caller filters)."""
try:
with httpx.stream("GET", f"{BASE}/event", timeout=None) as r:
for line in r.iter_lines():
if stop.is_set():
return
if not line or not line.startswith("data:"):
continue
try:
events.append(json.loads(line[5:].strip()))
except Exception:
pass
except Exception:
return
# ── Part + delta assembly ──────────────────────────────────────────────────
def _assemble(events: list[dict]) -> tuple[list[dict], list[str]]:
"""Reduce events to ordered parts and collect any error reasons.
- ``message.part.updated`` is authoritative per ``part.id``.
- ``message.part.delta`` frames for a text part whose last snapshot is
shorter than the accumulated delta are appended live so streaming
looks smooth.
"""
order: list[str] = []
latest: dict[str, dict] = {}
deltas: dict[str, str] = {}
errors: list[str] = []
for ev in events:
t = ev.get("type")
props = ev.get("properties") or {}
if t == "message.part.updated":
p = props.get("part") or {}
pid = p.get("id")
if not pid:
continue
if pid not in latest:
order.append(pid)
latest[pid] = p
if (p.get("state") or {}).get("status") == "error":
err = (p.get("state") or {}).get("error") or "tool error"
errors.append(f"{p.get('tool','?')}: {err}")
elif t == "message.part.delta":
p = props.get("part") or {}
pid = p.get("partID") or p.get("id")
if not pid:
continue
delta = p.get("delta") or p.get("text") or ""
if isinstance(delta, str) and delta:
deltas[pid] = deltas.get(pid, "") + delta
elif t in ("error", "client.error"):
errors.append(_esc(props.get("reason") or ev.get("reason") or "unknown"))
# Splice in any deltas that exceed the latest snapshot (live streaming).
parts: list[dict] = []
for pid in order:
p = dict(latest[pid])
if p.get("type") == "text" and pid in deltas:
if len(deltas[pid]) > len(p.get("text") or ""):
p["text"] = deltas[pid]
parts.append(p)
return parts, errors
# ── Rendering ──────────────────────────────────────────────────────────────
def _esc(s: Any) -> str:
return _html.escape("" if s is None else str(s))
def _cap(s: str, n: int = 6000) -> str:
if len(s) <= n:
return s
return s[:n] + f"\n… ({len(s) - n} chars hidden)"
def _fmt_tool(name: str, state: dict, raw: dict) -> str:
status = (state or {}).get("status") or "?"
inp = (state or {}).get("input") or raw.get("input") or {}
out = (state or {}).get("output") or raw.get("output") or ""
badge = {"completed": "ok", "error": "err", "running": "run"}.get(status, "")
if name == "read":
summary = f"πŸ“– read <code>{_esc(inp.get('filePath') or inp.get('path'))}</code>"
body = f"<pre>{_esc(_cap(str(out)))}</pre>"
elif name == "write":
path = inp.get("filePath") or inp.get("path")
content = inp.get("content") or ""
summary = f"✍️ write <code>{_esc(path)}</code> ({len(content)} chars)"
body = f"<pre>{_esc(_cap(content))}</pre>"
elif name == "edit":
path = inp.get("filePath") or inp.get("path")
old = inp.get("oldString") or ""
new = inp.get("newString") or ""
summary = f"✏️ edit <code>{_esc(path)}</code>"
body = (
f"<div class='lbl'>- old</div><pre class='del'>{_esc(_cap(old, 3000))}</pre>"
f"<div class='lbl'>+ new</div><pre class='add'>{_esc(_cap(new, 3000))}</pre>"
)
if out:
body += f"<div class='lbl'>output</div><pre>{_esc(_cap(str(out), 2000))}</pre>"
elif name == "bash":
cmd = inp.get("command") or inp.get("cmd") or ""
summary = f"⚑ bash <code>{_esc(cmd[:160])}</code>"
body = f"<pre>{_esc(_cap(str(out)))}</pre>"
elif name in ("glob", "find"):
pattern = inp.get("pattern") or inp.get("query") or ""
summary = f"πŸ”Ž {name} <code>{_esc(pattern)}</code>"
body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
elif name == "grep":
pattern = inp.get("pattern") or ""
path = inp.get("path") or ""
summary = f"πŸ”Ž grep <code>{_esc(pattern)}</code>" + (
f" in <code>{_esc(path)}</code>" if path else ""
)
body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
elif name == "todowrite":
todos = inp.get("todos") or []
summary = f"πŸ“ todowrite ({len(todos)} items)"
body = "<ul>" + "".join(
f"<li>{_todo_icon(t.get('status'))} {_esc(t.get('content'))}</li>"
for t in todos
) + "</ul>"
elif name == "task":
desc = inp.get("description") or inp.get("prompt") or ""
summary = f"🧩 task β€” {_esc(desc[:160])}"
body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
elif name == "webfetch":
summary = f"🌐 webfetch <code>{_esc(inp.get('url'))}</code>"
body = f"<pre>{_esc(_cap(str(out), 4000))}</pre>"
else:
summary = f"πŸ”§ {_esc(name)}"
body = (
f"<div class='lbl'>input</div><pre>{_esc(_cap(json.dumps(inp, indent=2, default=str), 4000))}</pre>"
f"<div class='lbl'>output</div><pre>{_esc(_cap(str(out), 4000))}</pre>"
)
return (
"<details class='tool' open>"
f"<summary>{summary} <span class='badge {badge}'>{_esc(status)}</span></summary>"
f"<div class='tbody'>{body}</div>"
"</details>"
)
def _todo_icon(status: str | None) -> str:
return {"completed": "βœ…", "in_progress": "πŸ”„"}.get(status or "", "⏳")
def _render_transcript(parts: list[dict], errors: list[str]) -> str:
out: list[str] = []
if errors:
out.append(
"<div class='errbox'><b>⚠️ errors</b><ul>"
+ "".join(f"<li>{_esc(e)}</li>" for e in errors[:8])
+ "</ul></div>"
)
if not parts:
out.append("<div class='empty'>waiting for first part…</div>")
return "".join(out)
out.append("<div class='chat'>")
for p in parts:
t = p.get("type")
if t == "step-start":
out.append("<div class='step'>── new step ──</div>")
elif t == "reasoning":
txt = (p.get("text") or "").strip()
if txt:
out.append(
"<details class='reasoning'><summary>🧠 reasoning</summary>"
f"<pre>{_esc(_cap(txt, 4000))}</pre></details>"
)
elif t == "text":
txt = (p.get("text") or "").strip()
if txt:
out.append(f"<div class='assistant'><pre>{_esc(txt)}</pre></div>")
elif t == "tool":
out.append(_fmt_tool(p.get("tool") or "?", p.get("state") or {}, p))
elif t == "step-finish":
tokens = p.get("tokens") or (p.get("state") or {}).get("tokens") or {}
if tokens:
out.append(f"<div class='stepfin'>tokens: {_esc(json.dumps(tokens, default=str))}</div>")
out.append("</div>")
return "".join(out)
def _render_todo(todos: list[dict]) -> str:
if not todos:
return ""
items = "".join(
f"<li>{_todo_icon(t.get('status'))} {_esc(t.get('content') or t.get('text',''))}</li>"
for t in todos
)
return f"<div class='todostrip'><b>plan</b><ul>{items}</ul></div>"
def _render_diff(diffs: list[dict]) -> str:
if not diffs:
return ""
blocks = []
for d in diffs:
path = d.get("path") or d.get("file") or "?"
patch = d.get("patch") or d.get("diff") or ""
blocks.append(
f"<details class='diff'><summary>{_esc(path)}</summary>"
f"<pre>{_esc(_cap(patch, 6000))}</pre></details>"
)
return (
"<details class='diff-wrap' open>"
f"<summary>πŸ“‹ session diff ({len(diffs)} files)</summary>"
f"{''.join(blocks)}</details>"
)
# ── State ──────────────────────────────────────────────────────────────────
class _State:
sid: str = "" # empty β†’ next Run creates a new session
stop: threading.Event | None = None
events: list[dict] = [] # reset per session
sse_thread: threading.Thread | None = None
_STATE = _State()
def _ensure_session() -> str:
"""Create a session if none exists; reuse across runs for multi-turn."""
if _STATE.sid:
return _STATE.sid
_STATE.sid = _create_session()
_STATE.stop = threading.Event()
_STATE.events = []
_STATE.sse_thread = threading.Thread(
target=_stream, args=(_STATE.sid, _STATE.events, _STATE.stop), daemon=True
)
_STATE.sse_thread.start()
time.sleep(0.15)
return _STATE.sid
def _new_session_cb() -> tuple[str, str, str, str]:
"""Tear down any existing SSE and clear state. Next Run opens a fresh session."""
if _STATE.stop:
_STATE.stop.set()
if _STATE.sid:
_abort(_STATE.sid)
_STATE.sid = ""
_STATE.stop = None
_STATE.events = []
return (
"✨ new session β€” Run to start",
"", # transcript
"", # todo
"", # diff
)
# ── Main ───────────────────────────────────────────────────────────────────
def run(prompt: str) -> Generator[tuple[str, str, str, str], None, None]:
try:
sid = _ensure_session()
except Exception as exc:
yield f"❌ session create failed: {exc}", "", "", ""
return
# Snapshot the event index BEFORE firing β€” "idle for THIS turn" must be
# scoped to events that arrive after the prompt is sent, otherwise the
# idle frame from the previous turn fires the break immediately.
turn_start = len(_STATE.events)
try:
_fire_async(sid, prompt)
except Exception as exc:
yield f"❌ prompt failed: {exc}", "", "", ""
return
t0 = time.time()
last_todo_refresh = 0.0
todos: list[dict] = []
while time.time() - t0 < 600:
new_events = _STATE.events[turn_start:]
idle = any(e.get("type") in ("session.idle", "idle") for e in new_events)
parts, errors = _assemble(_STATE.events)
if time.time() - last_todo_refresh > 3.0:
todos = _session_todo(sid)
last_todo_refresh = time.time()
status = (
f"{'βœ… idle' if idle else '⚑ running'} Β· "
f"session <code>{sid[:18]}…</code> Β· "
f"{time.time()-t0:.1f}s Β· {len(parts)} parts Β· {len(_STATE.events)} events"
)
diff_html = ""
if idle:
diff_html = _render_diff(_session_diff(sid))
yield status, _render_transcript(parts, errors), _render_todo(todos), diff_html
if idle:
break
time.sleep(0.4)
def abort_cb() -> str:
if _STATE.sid:
_abort(_STATE.sid)
# leave SSE open so user sees the abort-related events; actual teardown on new session
return "⏹ aborted (session kept β€” click New session to clear)"
def refresh_banner() -> str:
return _banner()
# ── CSS ────────────────────────────────────────────────────────────────────
_CSS = """
.banner { margin:4px 0 2px; }
.tools { font-size:11px; color:#888; margin:2px 0 8px; }
.chip { display:inline-block; padding:2px 8px; margin:2px; border-radius:10px;
background:#2b2d31; color:#ddd; font-size:12px; }
.chip.ok { background:#1f6f43; }
.chip.err { background:#7a1e1e; }
.chip code { background:transparent; color:#9ad; }
.errbox { background:#2a1414; border:1px solid #7a1e1e; border-radius:6px;
padding:6px 10px; margin:6px 0; color:#f88; font-size:13px; }
.errbox ul { margin:2px 0 0 18px; }
.chat { font-size:14px; }
.assistant pre { background:#0e1013; padding:10px; border-radius:8px;
white-space:pre-wrap; color:#eee; margin:6px 0; }
.reasoning { opacity:0.8; margin:4px 0; }
.reasoning pre { background:#0a0b0d; color:#aab; padding:8px; white-space:pre-wrap; }
.tool { border:1px solid #2a2f3a; border-radius:8px; padding:6px 10px;
margin:6px 0; background:#12161c; }
.tool summary { cursor:pointer; color:#ddd; }
.tool code { background:#222; color:#9cf; padding:1px 4px; border-radius:3px; }
.tbody { margin-top:6px; }
.tbody pre { background:#0a0b0d; padding:8px; border-radius:4px;
white-space:pre-wrap; max-height:400px; overflow:auto;
font-size:12px; color:#ddd; margin:2px 0; }
.tbody pre.add { border-left:3px solid #2e6; }
.tbody pre.del { border-left:3px solid #e53; }
.tbody .lbl { color:#888; font-size:11px; margin-top:6px; }
.badge { padding:1px 6px; border-radius:8px; font-size:11px;
background:#333; color:#ddd; }
.badge.ok { background:#1f6f43; color:white; }
.badge.err { background:#7a1e1e; color:white; }
.badge.run { background:#7a5c1e; color:white; }
.step { color:#555; text-align:center; margin:10px 0; font-size:11px; }
.stepfin { color:#666; font-size:11px; margin:4px 0 12px; }
.empty { color:#666; font-style:italic; padding:12px; }
.todostrip { background:#14181e; border:1px solid #2a2f3a; border-radius:6px;
padding:6px 10px; margin:6px 0; font-size:13px; }
.todostrip ul { margin:4px 0 0 18px; }
.diff-wrap { margin:8px 0; }
.diff summary { cursor:pointer; color:#9ad; font-family:monospace; }
.diff pre { background:#0a0b0d; padding:8px; border-radius:4px;
white-space:pre; font-size:12px; color:#ddd; overflow:auto; }
"""
# ── Layout ─────────────────────────────────────────────────────────────────
with gr.Blocks(title="opencode serve", css=_CSS) as demo:
banner_html = gr.HTML(value="_(loading…)_")
status_md = gr.Markdown()
todo_html = gr.HTML()
transcript_html = gr.HTML(value="<div class='empty'>run a prompt to start</div>")
diff_html = gr.HTML()
with gr.Row():
prompt = gr.Textbox(
label="Prompt",
value="Write fizzbuzz.py that prints FizzBuzz for 1..15 and run it.",
lines=3,
scale=5,
)
run_btn = gr.Button("β–Ά Run", variant="primary", scale=1)
with gr.Column(scale=1, min_width=120):
abort_btn = gr.Button("⏹ Abort", variant="stop")
new_btn = gr.Button("✨ New session")
run_btn.click(
run,
inputs=[prompt],
outputs=[status_md, transcript_html, todo_html, diff_html],
)
abort_btn.click(abort_cb, outputs=[status_md])
new_btn.click(
_new_session_cb,
outputs=[status_md, transcript_html, todo_html, diff_html],
)
demo.load(refresh_banner, outputs=[banner_html])
if __name__ == "__main__":
import os
demo.queue().launch(
server_name="0.0.0.0",
server_port=int(os.environ.get("GRADIO_PORT", "7861")),
share=True,
show_error=True,
)