opencode-env / server /gradio_ui.py
AdithyaSK's picture
AdithyaSK HF Staff
Upload folder using huggingface_hub
6c15447 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""Minimal Gradio UI for opencode_env.
Mounts under the standard OpenEnv ``/web`` path via the
``gradio_builder=`` callback documented at
https://meta-pytorch.org/OpenEnv/customizing-web-ui.html.

One page with:

- endpoint selector (``vllm`` / ``openai`` / ``hf_router``) — the catalog
  resolves the actual base_url / api_key / model from env vars.
- instruction + setup (bash, one cmd per line) + verify (bash, one cmd
  per line) textareas — the same Task shape the MCP tool accepts.
- Tunables (mode, disable_thinking, max_tokens_cap, top_logprobs,
  agent_timeout_s, template).
- Preset buttons for the ready-made example tasks.
- Run button → result panel with reward, setup/verify per-command
  results, file outputs, logprob stats, agent + proxy log tails,
  and the raw RolloutResult JSON.
"""
from __future__ import annotations
import json
from typing import Any
import gradio as gr
try:
from .catalog import ENDPOINT_KINDS, catalog_summary, resolve_endpoint
from .opencode_environment import OpenCodeEnvironment
except ImportError: # pragma: no cover
from server.catalog import ENDPOINT_KINDS, catalog_summary, resolve_endpoint # type: ignore
from server.opencode_environment import OpenCodeEnvironment # type: ignore
# ────────────────────────────────────────────────────────────────────────────
# Preset task examples — each fills (instruction, setup, verify).
# ────────────────────────────────────────────────────────────────────────────
# Ready-made example tasks behind the preset buttons. Every entry carries the
# three strings the form needs: "instruction" (the prompt given to the agent),
# plus "setup" and "verify", which are newline-separated bash commands. The
# run handler splits those two with _split_commands, so one line == one
# command; adjacent string literals without "\n" are pieces of a single command.
PRESETS: dict[str, dict[str, str]] = {
    "binary_search": {
        "instruction": (
            "Create a single Python file named `binary_search.py` in the "
            "current working directory. Use the relative path `binary_search.py`. "
            "Expose exactly one function:\n"
            " def binary_search(arr: list[int], target: int) -> int\n"
            "Return the index of `target` in the sorted list `arr`, or -1 if "
            "absent. Use the binary-search algorithm; do not call list.index."
        ),
        # Empty setup: nothing is run before the agent for this task.
        "setup": "",
        # Two commands: a file-existence check, then one `python -c` script
        # that imports the module from the workdir and asserts on its output.
        "verify": (
            "test -f /home/user/workdir/binary_search.py\n"
            "python -c \"import sys; sys.path.insert(0, '/home/user/workdir'); "
            "import binary_search; "
            "assert binary_search.binary_search([1,2,3,4,5], 3) == 2; "
            "assert binary_search.binary_search([1,2,3], 99) == -1; "
            "assert binary_search.binary_search([], 1) == -1; "
            "print('OK')\""
        ),
    },
    "fizzbuzz": {
        "instruction": (
            "Create `fizzbuzz.py` in the current directory exposing "
            "`def fizzbuzz(n: int) -> list[str]` that returns the FizzBuzz "
            "sequence for the integers 1..n. 'Fizz' for multiples of 3, 'Buzz' "
            "for 5, 'FizzBuzz' for both, otherwise the number as a string."
        ),
        # Empty setup: nothing is run before the agent for this task.
        "setup": "",
        # Same two-command shape as binary_search: existence check + assertions.
        "verify": (
            "test -f /home/user/workdir/fizzbuzz.py\n"
            "python -c \"import sys; sys.path.insert(0, '/home/user/workdir'); "
            "import fizzbuzz; "
            "assert fizzbuzz.fizzbuzz(5) == ['1','2','Fizz','4','Buzz']; "
            "assert fizzbuzz.fizzbuzz(15)[-1] == 'FizzBuzz'; "
            "print('OK')\""
        ),
    },
    "pandas_csv": {
        "instruction": (
            "Read `/home/user/data/numbers.csv` (a CSV with a single column "
            "`x` of integers) using pandas. Compute the mean of the `x` "
            "column and write it as a single float to `/home/user/workdir/mean.txt` "
            "(no extra characters, no newline)."
        ),
        # Three commands: install pandas, create the data directory, and seed
        # numbers.csv with a header row `x` followed by the integers 1..10.
        "setup": (
            "pip install --quiet pandas\n"
            "mkdir -p /home/user/data\n"
            "printf 'x\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n10\\n' > /home/user/data/numbers.csv"
        ),
        # The check strips whitespace before parsing and accepts any value
        # within 1e-6 of 5.5 (the mean of 1..10).
        "verify": (
            "test -f /home/user/workdir/mean.txt\n"
            "python -c \"v=float(open('/home/user/workdir/mean.txt').read().strip()); "
            "assert abs(v-5.5) < 1e-6, v; print('mean=', v)\""
        ),
    },
}
# ────────────────────────────────────────────────────────────────────────────
# Result rendering helpers
# ────────────────────────────────────────────────────────────────────────────
def _split_commands(text: str) -> list[str]:
return [line for line in (text or "").splitlines() if line.strip()]
def _badge_for_reward(reward: float | None) -> str:
if reward is None:
return "**reward**: _n/a_"
if reward >= 0.999:
emoji = "[PASS]"
elif reward > 0.0:
emoji = "[PARTIAL]"
else:
emoji = "[FAIL]"
return f"### {emoji} reward = `{reward:.2f}`"
def _summary_md(result: dict[str, Any]) -> str:
    """Render the headline summary shown above the result tabs.

    Args:
        result: Decoded rollout result. Keys read here are ``reward``,
            ``sandbox_id``, ``wall_s``, ``agent_exit_code``, ``mode`` and
            ``error``; all are optional.

    Returns:
        Markdown made of the reward badge, a one-line run summary, and an
        error line when ``result["error"]`` is truthy, separated by blank
        lines.
    """
    # ``wall_s`` may be present but None (e.g. a rollout that failed before
    # timing finished); ``.get(key, 0)`` would not cover that and ``:.1f``
    # would raise on None.
    wall_s = result.get("wall_s") or 0
    parts = [
        _badge_for_reward(result.get("reward")),
        (
            f"**sandbox**: `{result.get('sandbox_id') or 'n/a'}` · "
            f"**wall**: `{wall_s:.1f}s` · "
            f"**agent_exit**: `{result.get('agent_exit_code')}` · "
            f"**mode**: `{result.get('mode', 'n/a')}`"
        ),
    ]
    if result.get("error"):
        parts.append(f"**error**: `{result['error']}`")
    return "\n\n".join(parts)
def _command_rows(items: list[dict[str, Any]]) -> list[list[str]]:
rows: list[list[str]] = []
for it in items or []:
cmd = it.get("cmd", "")
rows.append(
[
cmd if len(cmd) <= 80 else cmd[:77] + "...",
str(it.get("exit_code", "")),
f"{it.get('duration_s', 0):.2f}s",
(it.get("stderr") or "").splitlines()[-1][:80] if it.get("exit_code") else "",
]
)
return rows
def _logprobs_md(turns: list[dict[str, Any]]) -> str:
if not turns:
return "_No proxy turns captured._\n\nThis is normal in `black_box` mode. In `transparent_proxy` mode, an empty list usually means the agent never made an LLM call (check the agent log)."
n = len(turns)
productive = sum(1 for t in turns if t.get("completion_tokens"))
total_toks = sum(len(t.get("completion_tokens") or []) for t in turns)
all_lps = [
float(x)
for t in turns
for x in (t.get("per_token_logps") or [])
if x is not None
]
mean_lp = (sum(all_lps) / len(all_lps)) if all_lps else None
lines = [
f"**turns**: `{n}` Β· **productive**: `{productive}` Β· "
f"**total_completion_tokens**: `{total_toks}`",
]
if mean_lp is not None:
lines.append(f"**mean_logprob**: `{mean_lp:+.4f}`")
finishes: dict[str, int] = {}
for t in turns:
f = t.get("finish_reason") or "unknown"
finishes[f] = finishes.get(f, 0) + 1
if finishes:
lines.append(
"**finish_reasons**: " + " ".join(f"`{k}={v}`" for k, v in finishes.items())
)
productive_rows = [t for t in turns if t.get("completion_tokens")]
if productive_rows:
first = productive_rows[0]
toks = first["completion_tokens"][:10]
lps = first.get("per_token_logps") or []
lines.append(
f"\n**first productive turn (first 10 tokens)**\n\n"
f"```\n"
+ "\n".join(
f" {tok!r:<14} {lp:+.3f}" if i < len(lps) else f" {tok!r:<14} -"
for i, (tok, lp) in enumerate(zip(toks, lps + [None] * len(toks)))
)
+ "\n```"
)
return "\n\n".join(lines)
def _live_status_md(
endpoint_kind: str,
model: str,
mode: str,
elapsed_s: float,
lines: list[tuple[float, str]],
) -> str:
"""Render a live phase log (latest at the bottom) with elapsed timestamps."""
head = (
f"### running… `elapsed={elapsed_s:.1f}s`\n\n"
f"_endpoint=`{endpoint_kind}` model=`{model}` mode=`{mode}`_\n\n"
)
if not lines:
body = "_(waiting for first phase update…)_"
else:
# Show the most recent ~12 lines so the panel doesn't grow unbounded.
rows = ["| t (s) | phase |", "|---|---|"]
for ts, msg in lines[-12:]:
rows.append(f"| `{ts:>6.1f}` | {msg.replace(chr(10), ' ')[:200]} |")
body = "\n".join(rows)
return head + body
def _files_md(files: dict[str, str]) -> str:
if not files:
return "_No files in the workdir._"
chunks = []
for path, content in files.items():
chunks.append(f"**`{path}`**\n```python\n{content[:4000]}\n```")
return "\n\n".join(chunks)
def _catalog_banner() -> str:
    """Build the markdown table describing the endpoint catalog.

    One row is emitted per entry returned by ``catalog_summary()``, showing
    its kind, default base URL and model (``-`` when falsy), the names of
    the three env vars it reads, and whether it is configured.
    """
    table = [
        "**Endpoint catalog (env vars + defaults)**",
        "",
        "| kind | base_url | model | env vars | configured |",
        "|---|---|---|---|---|",
    ]
    for item in catalog_summary():
        env_cell = "<br/>".join(
            f"`{item[key]}`" for key in ("base_url_env", "api_key_env", "model_env")
        )
        configured = "yes" if item["configured"] else "**no**"
        kind_cell = f"`{item['kind']}`"
        url_cell = f"`{item['default_base_url'] or '-'}`"
        model_cell = f"`{item['default_model'] or '-'}`"
        table.append(
            f"| {kind_cell} | {url_cell} | {model_cell} | {env_cell} | {configured} |"
        )
    return "\n".join(table)
# ────────────────────────────────────────────────────────────────────────────
# Builder
# ────────────────────────────────────────────────────────────────────────────
def opencode_gradio_builder(
    web_manager,  # noqa: ARG001 (unused: we instantiate the env directly)
    action_fields,  # noqa: ARG001
    metadata,  # noqa: ARG001
    is_chat_env,  # noqa: ARG001
    title,
    quick_start_md,  # noqa: ARG001
) -> gr.Blocks:
    """Build the opencode_env console.

    Compatible with ``create_app(..., gradio_builder=...)``. We ignore
    ``web_manager`` and instantiate :class:`OpenCodeEnvironment` ourselves
    inside the run handler — opencode_env's run_rollout doesn't need any
    per-session state beyond the env's own bookkeeping, and instantiating
    is cheap (no sandbox is created until the tool fires).

    Args:
        web_manager: Unused.
        action_fields: Unused.
        metadata: Unused.
        is_chat_env: Unused.
        title: Page title and heading; falls back to ``"opencode_env"``
            when falsy.
        quick_start_md: Unused.

    Returns:
        The assembled ``gr.Blocks`` app with the preset and run handlers
        wired to their buttons.
    """

    def run(
        endpoint: str,
        model: str,
        base_url: str,
        api_key: str,
        instruction: str,
        setup_text: str,
        verify_text: str,
        mode: str,
        disable_thinking: str,
        template: str,
        max_tokens_cap: int,
        top_logprobs: int,
        agent_timeout_s: float,
    ):
        """Generator handler — yields incremental UI updates.

        Each ``yield`` is a tuple matching ``outputs=[...]``:
        (summary_md, setup_table, verify_table, files_md, logprobs_md,
        logs_md, raw_json). Early yields keep summary_md as a live phase
        log while the rollout runs; the final yield populates everything.
        """
        import queue
        import threading
        import time

        # Resolve endpoint up front — if this fails, we can return one
        # immediate result with no streaming needed.
        try:
            resolved = resolve_endpoint(
                endpoint, base_url=base_url, api_key=api_key, model=model
            )
        except ValueError as exc:
            err = f"endpoint resolution failed: {exc}"
            yield (f"### error\n\n```\n{err}\n```", [], [], "", "", "", {"error": err})
            return

        # Translate "auto" / "on" / "off" into bool / None.
        if disable_thinking == "on":
            dt: bool | None = True
        elif disable_thinking == "off":
            dt = False
        else:
            dt = None

        env = OpenCodeEnvironment()

        # The worker fires _run_rollout_impl in a background thread and
        # streams progress messages into a queue; this generator polls the
        # queue every 0.5s and yields a refreshed status_md to the UI.
        status_q: queue.Queue = queue.Queue()
        result_holder: dict = {}

        def _cb(msg: str) -> None:
            status_q.put(("status", msg, time.time()))

        def _worker():
            try:
                payload = env._run_rollout_impl(
                    base_url=resolved.base_url,
                    api_key=resolved.api_key,
                    model=resolved.model,
                    instruction=instruction,
                    setup=_split_commands(setup_text),
                    verify=_split_commands(verify_text),
                    task_id="ui",
                    mode=mode,
                    disable_thinking=(
                        dt if dt is not None else resolved.disable_thinking_default
                    ),
                    max_tokens_cap=int(max_tokens_cap),
                    top_logprobs=int(top_logprobs),
                    agent_timeout_s=float(agent_timeout_s),
                    template=template,
                    progress_cb=_cb,
                )
                result_holder["payload"] = payload
            except Exception as exc:  # noqa: BLE001
                # Top-level boundary of the worker thread: surface any
                # failure to the UI rather than dying silently.
                result_holder["error"] = f"{type(exc).__name__}: {exc}"
                status_q.put(("error", result_holder["error"], time.time()))
            finally:
                # Always enqueued last, so the polling loop below terminates
                # on success and on failure alike.
                status_q.put(("done", None, time.time()))

        worker = threading.Thread(target=_worker, daemon=True)
        t_start = time.time()
        worker.start()

        # First yield: announce we've started. Empty result panels.
        yield (
            f"### running…\n\n_endpoint=`{resolved.kind}` model=`{resolved.model}` mode=`{mode}`_",
            [], [], "", "", "", {},
        )

        status_lines: list[tuple[float, str]] = []
        finished = False
        while not finished:
            try:
                kind, msg, ts = status_q.get(timeout=0.5)
                if kind == "status":
                    status_lines.append((ts - t_start, msg))
                elif kind == "error":
                    status_lines.append((ts - t_start, f"ERROR: {msg}"))
                elif kind == "done":
                    finished = True
            except queue.Empty:
                # No update within the poll interval; fall through so the
                # elapsed timer in the status pane still refreshes.
                pass
            # Render the live status pane.
            elapsed = time.time() - t_start
            md = _live_status_md(resolved.kind, resolved.model, mode, elapsed, status_lines)
            yield (md, [], [], "", "", "", {})

        # Drain any final messages still in the queue.
        while not status_q.empty():
            try:
                kind, msg, ts = status_q.get_nowait()
                if kind == "status":
                    status_lines.append((ts - t_start, msg))
            except queue.Empty:
                break

        if "payload" not in result_holder:
            err = result_holder.get("error", "unknown error")
            yield (
                f"### error\n\n```\n{err}\n```",
                [], [], "", "",
                _live_status_md(resolved.kind, resolved.model, mode,
                                time.time() - t_start, status_lines),
                {"error": err},
            )
            return

        result = json.loads(result_holder["payload"])

        # These are log *tails*: when they exceed the display budget keep the
        # end (most recent output), and tolerate a key that is present but None.
        agent_tail = (result.get("agent_log_tail") or "")[-4000:]
        proxy_tail = (result.get("proxy_log_tail") or "")[-4000:]

        yield (
            _summary_md(result),
            _command_rows(result.get("setup_results") or []),
            _command_rows(result.get("verify_results") or []),
            _files_md(result.get("files") or {}),
            _logprobs_md(result.get("proxy_turns") or []),
            (
                "### live phase log\n\n"
                + _live_status_md(resolved.kind, resolved.model, mode,
                                  time.time() - t_start, status_lines)
                + f"\n\n### agent log (tail)\n```\n{agent_tail}\n```\n\n"
                f"### proxy log (tail)\n```\n{proxy_tail}\n```"
            ),
            result,
        )

    def apply_preset(name: str) -> tuple[str, str, str]:
        """Return ``(instruction, setup, verify)`` for a preset; blanks if unknown."""
        p = PRESETS.get(name) or {"instruction": "", "setup": "", "verify": ""}
        return p["instruction"], p["setup"], p["verify"]

    with gr.Blocks(title=title or "opencode_env") as app:
        gr.Markdown(f"# {title or 'opencode_env'}")
        gr.Markdown(
            "Run one OpenCode rollout in an E2B sandbox against your chosen "
            "LLM endpoint. Pick an endpoint, write the task as `(instruction, "
            "setup, verify)`, and inspect the reward + per-token logprobs."
        )
        gr.Markdown(_catalog_banner())

        with gr.Row():
            endpoint = gr.Dropdown(
                choices=list(ENDPOINT_KINDS),
                value="openai",
                label="Endpoint",
                scale=1,
            )
            model = gr.Textbox(
                label="Model (blank → catalog default)", placeholder="gpt-4o-mini",
                scale=2,
            )
        with gr.Row():
            base_url = gr.Textbox(
                label="Base URL (blank → env / catalog default)",
                placeholder="https://api.openai.com/v1", scale=2,
            )
            api_key = gr.Textbox(
                label="API key (blank → server env var)",
                placeholder="(server env)", type="password", scale=1,
            )

        instruction = gr.Textbox(
            label="Instruction (the prompt opencode runs)",
            lines=4,
            value=PRESETS["binary_search"]["instruction"],
        )
        with gr.Row():
            setup_text = gr.Textbox(
                label="Setup (one bash command per line — runs BEFORE the agent)",
                lines=5,
                value=PRESETS["binary_search"]["setup"],
            )
            verify_text = gr.Textbox(
                label="Verify (one bash command per line — runs AFTER the agent)",
                lines=5,
                value=PRESETS["binary_search"]["verify"],
            )

        with gr.Row():
            preset_bs = gr.Button("preset · binary_search", size="sm")
            preset_fb = gr.Button("preset · fizzbuzz", size="sm")
            preset_pd = gr.Button("preset · pandas_csv", size="sm")

        with gr.Accordion("Tunables", open=False):
            with gr.Row():
                mode = gr.Dropdown(
                    choices=["transparent_proxy", "black_box"],
                    value="transparent_proxy",
                    label="mode",
                )
                disable_thinking = gr.Dropdown(
                    choices=["auto", "on", "off"],
                    value="auto",
                    label="disable_thinking",
                )
                template = gr.Textbox(
                    label="E2B template (e.g. opencode-rl)",
                    placeholder="(blank → cold install per rollout)",
                )
            with gr.Row():
                max_tokens_cap = gr.Number(value=4096, label="max_tokens_cap", step=1)
                top_logprobs = gr.Number(value=5, label="top_logprobs", step=1)
                agent_timeout_s = gr.Number(value=600, label="agent_timeout_s", step=1)

        run_btn = gr.Button("Run rollout", variant="primary")

        gr.Markdown("---")
        summary_md = gr.Markdown("_Submit a rollout above to see results._")

        with gr.Tabs():
            with gr.Tab("Setup"):
                setup_table = gr.Dataframe(
                    headers=["cmd", "exit", "duration", "stderr"],
                    datatype=["str", "str", "str", "str"],
                    interactive=False,
                    wrap=True,
                )
            with gr.Tab("Verify"):
                verify_table = gr.Dataframe(
                    headers=["cmd", "exit", "duration", "stderr"],
                    datatype=["str", "str", "str", "str"],
                    interactive=False,
                    wrap=True,
                )
            with gr.Tab("Files"):
                files_md = gr.Markdown("")
            with gr.Tab("Logprobs"):
                logprobs_md = gr.Markdown("")
            with gr.Tab("Logs"):
                logs_md = gr.Markdown("")
            with gr.Tab("Raw JSON"):
                raw_json = gr.JSON(value={})

        # Wire it up.
        for btn, name in [
            (preset_bs, "binary_search"),
            (preset_fb, "fizzbuzz"),
            (preset_pd, "pandas_csv"),
        ]:
            btn.click(
                # Bind ``name`` as a default so each button keeps its own
                # preset (a plain closure would see only the last loop value).
                fn=lambda n=name: apply_preset(n),
                outputs=[instruction, setup_text, verify_text],
            )

        run_btn.click(
            fn=run,
            inputs=[
                endpoint, model, base_url, api_key,
                instruction, setup_text, verify_text,
                mode, disable_thinking, template,
                max_tokens_cap, top_logprobs, agent_timeout_s,
            ],
            outputs=[
                summary_md, setup_table, verify_table,
                files_md, logprobs_md, logs_md, raw_json,
            ],
        )

    return app