"""Gradio UI for the OpenCode OpenEnv server.
One page. Top half: LLM config + task inputs (with preset dropdown).
Bottom half: live rollout progress + final result panels.
The Run button uses the non-blocking Phase-2b tool path so the UI updates
progressively as turns complete:
start_rollout → loop {get_state, read workdir} → finalize_rollout
This is a Gradio generator (``yield`` per tick) so the user sees a live
ticker instead of a frozen page.
"""
from __future__ import annotations
import json
import os
import time
from typing import Any
import gradio as gr
try:
from .catalog import CATALOG, by_key, default_model, resolve_endpoint
from .transcript import (
TRANSCRIPT_CSS,
collect_parts_from_messages,
render_transcript,
)
except ImportError: # pragma: no cover — support running as a script
from catalog import CATALOG, by_key, default_model, resolve_endpoint # type: ignore
from transcript import ( # type: ignore
TRANSCRIPT_CSS,
collect_parts_from_messages,
render_transcript,
)
# ── Preset tasks ──────────────────────────────────────────────────────────
# Shown in the dropdown. Each has instruction + matching bash verifier.
# Each preset verifier writes a 0..1 reward to
# /home/user/logs/verifier/reward.txt and always exits 0, so a failed check
# scores 0.0 instead of erroring the rollout.

# "hello": full credit iff `python hello.py` prints exactly "hello".
_HELLO_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
cd /home/user/workdir || { echo 0 > /home/user/logs/verifier/reward.txt; exit 0; }
[ -f hello.py ] || { echo 0 > /home/user/logs/verifier/reward.txt; exit 0; }
OUT=$(python hello.py 2>/dev/null | head -1)
if [ "$OUT" = "hello" ]; then echo 1.0 > /home/user/logs/verifier/reward.txt; \
else echo 0.0 > /home/user/logs/verifier/reward.txt; fi
"""

# "fizzbuzz": partial credit — fraction of the 15 expected lines found
# anywhere in the output (grep -qxF = exact full-line, fixed-string match).
_FIZZBUZZ_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
[ -f fizzbuzz.py ] || { echo 0 > "$REWARD"; exit 0; }
OUT=$(python fizzbuzz.py 2>&1 | head -20)
EXPECTED=(1 2 Fizz 4 Buzz Fizz 7 8 Fizz Buzz 11 Fizz 13 14 FizzBuzz)
HITS=0
for line in "${EXPECTED[@]}"; do
echo "$OUT" | grep -qxF "$line" && HITS=$((HITS + 1))
done
python -c "print(${HITS} / ${#EXPECTED[@]})" > "$REWARD"
"""

# "fibonacci": full credit on an exact whitespace-normalised match; otherwise
# partial credit = fraction of positions that agree.
# NOTE(review): the fallback interpolates $OUT into a python -c string —
# assumes agent output contains no single quotes; acceptable for a verifier.
_FIBONACCI_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
[ -f fibonacci.py ] || { echo 0 > "$REWARD"; exit 0; }
EXPECTED="0 1 1 2 3 5 8 13 21 34"
OUT=$(python fibonacci.py 2>/dev/null | tr '\\n' ' ' | xargs || true)
if [ "$OUT" = "$EXPECTED" ]; then
echo 1.0 > "$REWARD"
else
python -c "
expected='$EXPECTED'.split()
got='$OUT'.split()
hits=sum(1 for e,g in zip(expected,got) if e==g)
print(hits/len(expected))" > "$REWARD"
fi
"""

# "sort_list": all-or-nothing exact match on the first output line.
_SORT_LIST_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
[ -f sort_list.py ] || { echo 0 > "$REWARD"; exit 0; }
EXPECTED="1,5,7,8,11,13,23,31,42,99"
OUT=$(python sort_list.py 2>/dev/null | head -1 || true)
if [ "$OUT" = "$EXPECTED" ]; then
echo 1.0 > "$REWARD"
else
echo 0.0 > "$REWARD"
fi
"""

# "simple_io": 0.5 for the correct greeting.txt + 0.5 for a script that
# echoes it back; scores are summed via python (bash lacks float math).
_SIMPLE_IO_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
SCORE=0.0
if [ -f greeting.txt ]; then
if [ "$(cat greeting.txt)" = "hello, world" ]; then
SCORE=$(python -c "print(${SCORE} + 0.5)")
fi
fi
if [ -f read_and_echo.py ]; then
OUT=$(python read_and_echo.py 2>/dev/null | head -1 || true)
if [ "$OUT" = "hello, world" ]; then
SCORE=$(python -c "print(${SCORE} + 0.5)")
fi
fi
echo "$SCORE" > "$REWARD"
"""
# Preset name → (instruction shown to the agent, bash verifier script).
# The "custom" entry leaves both blank so the user can type their own.
PRESET_TASKS: dict[str, tuple[str, str]] = {
    "custom (edit below)": ("", ""),
    "hello": (
        "Write `hello.py` in the current directory that prints exactly the "
        "lowercase word `hello` (no quotes, no trailing punctuation).",
        _HELLO_TEST,
    ),
    "fizzbuzz": (
        "Write a Python script `fizzbuzz.py` in the current directory that "
        "prints FizzBuzz for numbers 1..15, one per line. Use 'Fizz' for "
        "multiples of 3, 'Buzz' for multiples of 5, 'FizzBuzz' for both.",
        _FIZZBUZZ_TEST,
    ),
    "fibonacci": (
        "Write `fibonacci.py` in the current directory that prints the first "
        "10 Fibonacci numbers (starting from 0), one per line. Expected "
        "output:\n0\n1\n1\n2\n3\n5\n8\n13\n21\n34",
        _FIBONACCI_TEST,
    ),
    "sort_list": (
        "Write `sort_list.py` in the current directory that sorts this list "
        "ascending and prints the result as one comma-separated line with no "
        "spaces: [42, 7, 13, 1, 99, 5, 23, 8, 31, 11]\n\n"
        "Expected output (exactly this line): 1,5,7,8,11,13,23,31,42,99",
        _SORT_LIST_TEST,
    ),
    "simple_io": (
        "In the current directory:\n"
        "1. Create a file `greeting.txt` containing exactly the line "
        "`hello, world`.\n"
        "2. Write `read_and_echo.py` that opens `greeting.txt` and prints its "
        "contents to stdout.\n"
        "3. Run the script and verify it prints `hello, world`.",
        _SIMPLE_IO_TEST,
    ),
}
# Dropdown choices for the hosted backend: (label, dropdown_key) pairs drawn
# from the curated catalog's hf_router entries.
_HF_MODEL_CHOICES = [
    (m.label, m.dropdown_key) for m in CATALOG if m.backend == "hf_router"
]
# Sentinel value used for the "type your own HF-router id" dropdown option.
_CUSTOM_HF_KEY = "__custom_hf__"
_HF_MODEL_CHOICES.append(("Custom — enter HF Router model id below", _CUSTOM_HF_KEY))
# Default dropdown selection: the first curated hf_router entry.
_DEFAULT_HF_KEY = _HF_MODEL_CHOICES[0][1]
# HF token pre-filled from the process environment when available.
_HF_TOKEN_ENV = os.environ.get("HF_TOKEN", "")
# Suggested / recent vllm model ids (user can type anything).
_VLLM_MODEL_SUGGESTIONS = [
    m.repo for m in CATALOG if m.backend == "vllm"
] + ["Qwen/Qwen3.5-4B", "Qwen/Qwen2.5-7B-Instruct"]
def opencode_ui_builder(
    *,
    env_factory: Any = None,
    web_manager: Any = None,  # kept for backward compat; unused
    title: str = "OpenCode Env",
    **_: Any,
) -> gr.Blocks:
    """Build the Gradio Blocks UI.

    ``env_factory`` is a zero-arg callable that returns a fresh
    :class:`OpenCodeEnvironment` on first click (lazy, so the Space's
    cold-start path doesn't pay the E2B cost until someone hits Run).
    """
    # Lazily-created environment, shared by the Run / Abort / Reset handlers.
    _env_cache: dict[str, Any] = {"instance": None}

    def _get_env():
        # Create the environment on first use; reuse it afterwards.
        inst = _env_cache.get("instance")
        if inst is None:
            if env_factory is None:
                raise RuntimeError("opencode_ui_builder needs env_factory=...")
            inst = env_factory()
            _env_cache["instance"] = inst
        return inst

    with gr.Blocks(title=title, analytics_enabled=False, css=TRANSCRIPT_CSS) as demo:
        gr.Markdown(
            f"# {title}\n"
            "Run one OpenCode rollout against any OpenAI-compatible endpoint. "
            "Pick a preset task or paste your own instruction + bash verifier. "
            "The run is streamed live — per-turn progress updates while the "
            "agent works."
        )
        # ── Config ─────────────────────────────────────────────────────────
        # Two backends:
        #   1. Self-hosted vLLM — user supplies model id + base URL.
        #   2. Hosted (HF Router) — user picks from the curated Qwen
        #      catalog, or selects "Custom" and types their own HF-router
        #      model id (e.g. ``Qwen/Qwen3-8B:together``).
        with gr.Row():
            with gr.Column(scale=3):
                backend_mode = gr.Radio(
                    label="Backend",
                    choices=["Self-hosted vLLM", "Hosted (HF Router)"],
                    value="Hosted (HF Router)",
                )
                # --- Self-hosted vLLM fields (shown only when selected) ---
                with gr.Row(visible=False) as vllm_row:
                    vllm_model = gr.Textbox(
                        label="Model id (as served by your vLLM)",
                        value=_VLLM_MODEL_SUGGESTIONS[0],
                        placeholder="Qwen/Qwen3.5-4B",
                        scale=1,
                    )
                    vllm_url = gr.Textbox(
                        label="vLLM base URL",
                        value="",
                        placeholder="https://.../v1",
                        scale=2,
                    )
                # --- Hosted HF Router fields (default visible) ---
                with gr.Row(visible=True) as hf_row:
                    hosted_model = gr.Dropdown(
                        label="Hosted model",
                        choices=_HF_MODEL_CHOICES,
                        value=_DEFAULT_HF_KEY,
                        scale=2,
                    )
                    hf_token = gr.Textbox(
                        label="HF token",
                        value=_HF_TOKEN_ENV,
                        type="password",
                        placeholder="hf_...",
                        scale=2,
                    )
                hosted_custom_id = gr.Textbox(
                    label="Custom HF-router model id",
                    value="",
                    placeholder="Qwen/Qwen3-8B:together (org/repo[:provider])",
                    visible=False,
                )
                thinking = gr.Checkbox(
                    label="Thinking mode (Qwen3.5 only)",
                    value=False,
                )
            with gr.Column(scale=1):
                mode = gr.Dropdown(
                    label="Mode",
                    choices=["transparent_proxy", "black_box"],
                    value="transparent_proxy",
                )
                max_tokens_cap = gr.Slider(
                    label="max_tokens cap",
                    minimum=512, maximum=32768, value=16384, step=512,
                )
                agent_timeout_s = gr.Slider(
                    label="Agent timeout (s)",
                    minimum=60, maximum=1200, value=300, step=30,
                )

        def _on_backend_change(mode_v: str):
            # Toggle the backend-specific rows. NOTE(review): this always
            # re-hides the custom-id box even if "Custom" is still selected in
            # the hosted dropdown; the user must re-pick it to reveal the box.
            is_vllm = mode_v == "Self-hosted vLLM"
            return (
                gr.update(visible=is_vllm),      # vllm_row
                gr.update(visible=not is_vllm),  # hf_row
                gr.update(visible=False),        # hosted_custom_id reset
            )

        def _on_hosted_change(choice: str):
            # Show the free-text model-id box only for the "Custom" entry.
            return gr.update(visible=(choice == _CUSTOM_HF_KEY))

        backend_mode.change(
            _on_backend_change,
            inputs=[backend_mode],
            outputs=[vllm_row, hf_row, hosted_custom_id],
        )
        hosted_model.change(
            _on_hosted_change,
            inputs=[hosted_model],
            outputs=[hosted_custom_id],
        )

        # ── Task fields ────────────────────────────────────────────────────
        # Verifier (test.sh) is intentionally not surfaced here — it's only
        # needed for scored training. For interactive use, leave it empty
        # and just have the agent finish with something observable (e.g.
        # "print DONE at the end"). MCP tools already accept
        # ``test_script=""`` and skip scoring when empty.
        instruction = gr.Textbox(
            label="Instruction",
            value=(
                "Write `hello.py` in the current directory that prints "
                "`hello` (no quotes). Then run it and print `DONE` when "
                "you are finished."
            ),
            lines=4,
        )
        with gr.Row():
            task_id = gr.Textbox(
                label="Task id (optional label)",
                value="interactive",
                scale=1,
            )
            setup_shell = gr.Textbox(
                label="Setup shell (optional, runs before opencode)",
                value="",
                placeholder="e.g. pip install polars",
                scale=3,
            )
        with gr.Row():
            run_btn = gr.Button("▶ Run", variant="primary", scale=2)
            abort_btn = gr.Button("⏹ Abort", variant="stop", scale=1)
            reset_btn = gr.Button("🔄 Reset", variant="secondary", scale=1)
            check_btn = gr.Button("🔎 Check endpoint", scale=1)

        # ── Output: chat-style single-column ──────────────────────────────
        # Transcript is the hero. The status line above it carries a
        # sandbox-boot phase indicator so users know whether we're
        # spawning E2B, installing opencode, or waiting for the agent.
        # Everything else (reward, files, logprob trace, verifier, raw
        # JSON) lives in collapsed accordions below. Matches the chat
        # shape of local_ui.py.
        status = gr.Markdown()
        # Shared state: the active rollout_id so Abort and Reset can find it.
        rollout_state = gr.State("")
        transcript_html = gr.HTML(
            # NOTE(review): the placeholder's original HTML wrapper markup was
            # lost in extraction; gr.HTML renders this plain text as-is.
            value="run a rollout to see the transcript",
        )
        # Hidden outputs retained only so the streaming handler's tuple
        # shape doesn't have to change. They never render in the UI.
        reward_out = gr.Number(visible=False)
        wall_out = gr.Number(visible=False)
        exit_out = gr.Number(visible=False)
        turns_out = gr.Number(visible=False)
        with gr.Accordion("Workdir files", open=False):
            workdir_md = gr.Markdown()
        with gr.Accordion("Proxy trace (per turn — logprobs)", open=False):
            proxy_trace_json = gr.JSON(label=None)
        with gr.Accordion("Diagnostics (proxy · install · agent logs)", open=False):
            verifier_out = gr.Textbox(label="proxy/install/agent log tails", lines=12)
            verifier_err = gr.Textbox(label="primitive error (if any)", lines=3)
        with gr.Accordion("Raw result JSON", open=False):
            raw_json = gr.JSON(label=None)

        # ── Streaming Run handler ─────────────────────────────────────────
        def _run_streaming(
            backend_mode_v: str,
            vllm_model_v: str,
            vllm_url_v: str,
            hosted_model_v: str,
            hosted_custom_id_v: str,
            hf_token_v: str,
            thinking_v: bool,
            mode_v: str,
            max_tokens_cap_v: int,
            agent_timeout_s_v: float,
            task_id_v: str,
            instruction_v: str,
            setup_shell_v: str,
        ):
            """Gradio generator: yields UI updates as the rollout progresses.

            Uses the non-blocking fine-grained tools:
                start_rollout → loop(get_state) → finalize_rollout

            Every ``yield`` is a 12-tuple matching ``_output_widgets``.
            """
            # (Docstring was previously misplaced mid-function; moved here.)
            import httpx
            from openenv.core.env_server.mcp_types import CallToolAction

            # Verifier is optional. For interactive use we pass an empty
            # test_script so the finalizer skips scoring.
            test_script_v = ""
            # Assemble the uniform model_key from the UI's two-backend picker.
            if backend_mode_v == "Self-hosted vLLM":
                if not vllm_model_v.strip():
                    yield _error_tuple("Self-hosted vLLM requires a model id.")
                    return
                model_key_v = f"vllm://{vllm_model_v.strip()}"
            else:
                if hosted_model_v == _CUSTOM_HF_KEY:
                    cid = hosted_custom_id_v.strip()
                    if not cid:
                        yield _error_tuple(
                            "Hosted 'Custom' picked but no model id entered."
                        )
                        return
                    if not cid.startswith("hf-router://"):
                        # Accept either plain "Org/Repo[:provider]" or a
                        # fully-prefixed key.
                        cid = f"hf-router://{cid}"
                    model_key_v = cid
                else:
                    model_key_v = hosted_model_v

            # 0) Resolve the catalog pick into (base_url, api_key, model).
            #    This validates the secret matches the selected backend.
            try:
                base_url, _api_key, _model, entry = resolve_endpoint(
                    model_key_v,
                    vllm_url=vllm_url_v,
                    hf_token=hf_token_v,
                )
            except Exception as exc:
                yield _error_tuple(f"config: {exc}")
                return

            # 1) Pre-flight: verify the endpoint is reachable before burning
            #    an E2B sandbox on a URL typo / bad token.
            yield (
                "🔎 **validating endpoint…**",
                None, None, None, 0,
                "", [], "", "", {"stage": "validate", "backend": entry.backend},
                "validating endpoint…",
                "",
            )
            probe_headers: dict[str, str] = {}
            if entry.backend == "hf_router":
                probe_headers["Authorization"] = f"Bearer {hf_token_v}"
            try:
                r = httpx.get(
                    f"{base_url}/models", headers=probe_headers, timeout=15,
                )
                if r.status_code != 200:
                    yield _error_tuple(
                        f"{entry.backend} probe {base_url}/models → HTTP {r.status_code}: "
                        f"{r.text[:200]}"
                    )
                    return
            except Exception as exc:
                yield _error_tuple(
                    f"endpoint unreachable: {type(exc).__name__}: {exc}"
                )
                return

            yield (
                "🟡 **initialising env (creating MCP registry)…**",
                None, None, None, 0, "", [], "", "", {"stage": "env_init"},
                "initialising env…",
                "",
            )
            try:
                env = _get_env()
                env.reset()
            except Exception as exc:
                yield _error_tuple(f"env init failed: {type(exc).__name__}: {exc}")
                return

            # 2) start_rollout — uniform args: model_key + vllm_url + hf_token
            #    + thinking. The env resolves via the catalog server-side.
            try:
                start_obs = env.step(
                    CallToolAction(
                        tool_name="start_rollout",
                        arguments={
                            "model_key": model_key_v,
                            "vllm_url": vllm_url_v,
                            "hf_token": hf_token_v,
                            "thinking": bool(thinking_v),
                            "instruction": instruction_v,
                            "test_script": test_script_v,
                            "task_id": task_id_v,
                            "setup_shell": setup_shell_v,
                            "upload_files": {},
                            "mode": mode_v,
                            "max_tokens_cap": int(max_tokens_cap_v),
                            "agent_timeout_s": float(agent_timeout_s_v),
                        },
                    ),
                    timeout_s=60,
                )
            except Exception as exc:
                yield _error_tuple(f"start_rollout failed: {type(exc).__name__}: {exc}")
                return
            start_payload = _parse_result(start_obs)
            rollout_id = start_payload.get("rollout_id")
            if not rollout_id:
                yield _error_tuple(f"start_rollout returned no rollout_id: {start_payload}")
                return
            # Initial UI update — yield the rollout_id into shared state so
            # Abort / Reset can target the right rollout.
            yield (
                f"🟡 **rollout `{rollout_id}` started — booting sandbox…**",
                None, None, None, 0,
                "_(no files yet)_", [], "", "", start_payload,
                "booting sandbox — this takes ~20–40s cold…",
                rollout_id,
            )

            # 3) Poll get_state + get_messages at 1s cadence. Show a sandbox
            #    boot-phase label so users can tell "booting" from "stuck".
            deadline = time.time() + float(agent_timeout_s_v) + 120
            t_started = float(start_payload.get("started_at") or time.time())
            status_str = "running"
            while time.time() < deadline:
                try:
                    state_obs = env.step(
                        CallToolAction(
                            tool_name="get_state",
                            arguments={"rollout_id": rollout_id},
                        ),
                        timeout_s=20,
                    )
                    state_payload = _parse_result(state_obs)
                except Exception as exc:
                    state_payload = {"error": f"{type(exc).__name__}: {exc}"}
                # Live transcript — only meaningful once opencode serve has
                # created its session (state_payload carries serve_session_id
                # in that case). Before that, get_messages returns an empty
                # list with a ``note`` field.
                parts_list: list = []
                transcript = "waiting for first part…"
                try:
                    msg_obs = env.step(
                        CallToolAction(
                            tool_name="get_messages",
                            arguments={"rollout_id": rollout_id},
                        ),
                        timeout_s=20,
                    )
                    msg_payload = _parse_result(msg_obs)
                    parts_list = collect_parts_from_messages(
                        msg_payload.get("messages") or []
                    )
                    if parts_list:
                        transcript = render_transcript(parts_list)
                except Exception:
                    pass
                status_str = state_payload.get("status", "?")
                elapsed = time.time() - t_started
                # Message count comes from the transcript payload; falls back
                # to 0 when the get_messages call above failed (msg_payload
                # then unbound or malformed). A previous computation from
                # state_payload was dead code and has been removed.
                try:
                    msg_count = len(msg_payload.get("messages") or [])
                except Exception:
                    msg_count = 0
                phase = _boot_phase(state_payload, msg_count, len(parts_list))
                yield (
                    f"{phase} · elapsed `{elapsed:.1f}s` · rollout `{rollout_id}`",
                    None, None, None, state_payload.get("proxy_turns_so_far", 0),
                    "_(workdir populated on finalize)_",
                    [], "", "", state_payload,
                    transcript,
                    rollout_id,
                )
                if status_str == "done":
                    break
                time.sleep(1.0)

            # 4) finalize_rollout — run verifier + collect full result
            try:
                final_obs = env.step(
                    CallToolAction(
                        tool_name="finalize_rollout",
                        arguments={"rollout_id": rollout_id, "wait_s": 60},
                    ),
                    timeout_s=300,
                )
            except Exception as exc:
                yield _error_tuple(f"finalize_rollout failed: {type(exc).__name__}: {exc}")
                return
            result = _parse_result(final_obs)
            status_md = _summarize_status(result)
            wd_md = _render_workdir(result.get("workdir_files") or {})
            turns = result.get("proxy_turns") or []
            # One last transcript fetch — captures any final parts that
            # arrived between the last poll and session.idle.
            final_transcript = "(transcript unavailable)"
            try:
                msg_obs = env.step(
                    CallToolAction(
                        tool_name="get_messages",
                        arguments={"rollout_id": rollout_id},
                    ),
                    timeout_s=30,
                )
                msg_payload = _parse_result(msg_obs)
                parts = collect_parts_from_messages(msg_payload.get("messages") or [])
                final_transcript = render_transcript(parts)
            except Exception:
                pass
            # Diagnostics pane: concat the three log tails so failures
            # are visible without expanding the raw JSON.
            diag_tail = "\n".join([
                "--- PROXY LOG TAIL ---",
                (result.get("proxy_log_tail") or "(empty)")[-2000:],
                "",
                "--- INSTALL LOG TAIL ---",
                (result.get("install_log_tail") or "(empty)")[-1000:],
                "",
                "--- AGENT LOG TAIL ---",
                (result.get("agent_log_tail") or "(empty)")[-2000:],
            ])
            err_line = result.get("error") or ""
            yield (
                status_md,
                result.get("reward"),
                result.get("wall_s"),
                result.get("exit_code"),
                len(turns),
                wd_md,
                turns,
                diag_tail,
                err_line,
                result,
                final_transcript,
                rollout_id,
            )

        # The 12 output components every handler's tuple maps onto, in order.
        _output_widgets = [
            status, reward_out, wall_out, exit_out, turns_out,
            workdir_md, proxy_trace_json,
            verifier_out, verifier_err, raw_json,
            transcript_html, rollout_state,
        ]
        run_btn.click(
            _run_streaming,
            inputs=[
                backend_mode,
                vllm_model, vllm_url,
                hosted_model, hosted_custom_id, hf_token,
                thinking, mode,
                max_tokens_cap, agent_timeout_s,
                task_id, instruction, setup_shell,
            ],
            outputs=_output_widgets,
        )

        # Check-endpoint handler — cheap GET /v1/models probe against the
        # currently-configured backend.
        def _check_endpoint(
            backend_mode_v: str,
            vllm_model_v: str, vllm_url_v: str,
            hosted_model_v: str, hosted_custom_id_v: str, hf_token_v: str,
        ) -> str:
            import httpx
            if backend_mode_v == "Self-hosted vLLM":
                model_key_v = f"vllm://{(vllm_model_v or '').strip()}"
            else:
                if hosted_model_v == _CUSTOM_HF_KEY:
                    cid = (hosted_custom_id_v or "").strip()
                    if not cid:
                        return "❌ custom HF model id is empty"
                    model_key_v = cid if cid.startswith("hf-router://") else f"hf-router://{cid}"
                else:
                    model_key_v = hosted_model_v
            try:
                base_url, _key, _model, entry = resolve_endpoint(
                    model_key_v, vllm_url=vllm_url_v, hf_token=hf_token_v,
                )
            except Exception as exc:
                return f"❌ {exc}"
            headers = {"Authorization": f"Bearer {hf_token_v}"} if entry.backend == "hf_router" else {}
            models_url = f"{base_url}/models"
            try:
                r = httpx.get(models_url, headers=headers, timeout=15)
            except Exception as exc:
                return f"❌ `{models_url}` unreachable: `{type(exc).__name__}: {exc}`"
            if r.status_code != 200:
                return f"❌ `{models_url}` → HTTP {r.status_code}\n```\n{r.text[:400]}\n```"
            try:
                ids = [m.get("id") for m in r.json().get("data", []) if m.get("id")]
            except Exception:
                ids = []
            hint = f" · backend=`{entry.backend}` · resolved=`{_model}`"
            if ids:
                shown = ", ".join(ids[:5]) + (f", … (+{len(ids)-5} more)" if len(ids) > 5 else "")
                return f"✅ reachable{hint} · models: `{shown}`"
            return f"⚠️ reachable (HTTP 200) but no `data[*].id` in response{hint}"

        check_btn.click(
            _check_endpoint,
            inputs=[backend_mode, vllm_model, vllm_url, hosted_model, hosted_custom_id, hf_token],
            outputs=[status],
        )

        # ── Abort handler ────────────────────────────────────────────────
        # Fire-and-forget abort on the active rollout. Keeps the env + UI
        # state so the user can see what the transcript looked like at the
        # moment of abort.
        def _abort(current_rollout_id: str) -> tuple:
            from openenv.core.env_server.mcp_types import CallToolAction
            if not current_rollout_id:
                return (
                    "⚠️ nothing to abort (no active rollout).",
                    None, None, None, None,
                    "", [], "", "", {"abort": "no-op"},
                    gr.update(), current_rollout_id,
                )
            try:
                env = _get_env()
                env.step(
                    CallToolAction(
                        tool_name="abort_rollout",
                        arguments={"rollout_id": current_rollout_id},
                    ),
                    timeout_s=30,
                )
            except Exception as exc:  # noqa: BLE001
                return (
                    f"⚠️ abort failed: `{type(exc).__name__}: {exc}`",
                    None, None, None, None,
                    "", [], "", "", {"abort": str(exc)},
                    gr.update(), current_rollout_id,
                )
            return (
                f"⏹ **aborted** rollout `{current_rollout_id}`",
                None, None, None, None,
                "", [], "", "", {"abort": current_rollout_id},
                gr.update(), current_rollout_id,
            )

        abort_btn.click(
            _abort,
            inputs=[rollout_state],
            outputs=_output_widgets,
        )

        # ── Reset handler ────────────────────────────────────────────────
        # Aborts any in-flight rollout, drops the cached env so the next Run
        # creates a fresh :class:`OpenCodeEnvironment` (new MCP registry),
        # and clears all UI panels including the transcript.
        def _reset(current_rollout_id: str) -> tuple:
            from openenv.core.env_server.mcp_types import CallToolAction
            if current_rollout_id:
                try:
                    env = _get_env()
                    env.step(
                        CallToolAction(
                            tool_name="abort_rollout",
                            arguments={"rollout_id": current_rollout_id},
                        ),
                        timeout_s=30,
                    )
                except Exception:
                    # Best-effort — if abort fails, still drop the env below
                    # so the next Run starts clean.
                    pass
            _env_cache["instance"] = None
            return (
                "🔄 **reset.** next Run will create a fresh environment.",
                None, None, None, None,
                "_(workdir cleared)_",
                [], "", "", {"reset": True},
                "run a rollout to see the transcript",
                "",
            )

        reset_btn.click(
            _reset,
            inputs=[rollout_state],
            outputs=_output_widgets,
        )
    return demo
# ── Helpers ─────────────────────────────────────────────────────────────────
def _error_tuple(msg: str, rollout_id: str = "") -> tuple:
return (
f"❌ **Error:** `{msg}`",
None, None, None, None,
"", [], "", "", {"error": msg},
f"❌ {msg}
",
rollout_id,
)
def _boot_phase(state: dict, msg_count: int, parts_count: int) -> str:
"""Human-readable sandbox + session boot phase label."""
if state.get("error"):
return f"⚠️ state error: `{state.get('error')}`"
status = state.get("status", "?")
if status == "unknown":
return "⏳ **starting rollout…**"
serve_sid = state.get("serve_session_id")
if not serve_sid:
return (
"🟡 **booting sandbox** — spawning E2B, installing opencode, "
"starting proxy + opencode serve (this takes ~20–40s cold)"
)
if msg_count == 0:
return "🟡 **creating session** — serve is up, prompt about to fire"
if parts_count == 0:
return "💭 **agent thinking** — first LLM call in flight"
turns = state.get("proxy_turns_so_far", 0)
return (
f"⚡ **running** · serve session `{serve_sid[:14]}…` · "
f"parts `{parts_count}` · turns `{turns}`"
)
def _parse_result(raw: Any) -> dict[str, Any]:
"""Unwrap the server's JSON tool result into a plain dict."""
# Object with attribute chain: obs.result.content[0].text
inner = getattr(raw, "result", None)
if inner is not None:
content = getattr(inner, "content", None)
if content:
first = content[0]
text = getattr(first, "text", None)
if isinstance(text, str):
try:
return json.loads(text)
except Exception:
return {"raw": text}
if isinstance(raw, dict):
content = raw.get("content")
if isinstance(content, list) and content:
first = content[0]
text = first.get("text") if isinstance(first, dict) else None
if isinstance(text, str):
try:
return json.loads(text)
except Exception:
return {"raw": text}
return raw
if isinstance(raw, str):
try:
return json.loads(raw)
except Exception:
return {"raw": raw}
return {"raw": str(raw)}
def _summarize_status(result: dict[str, Any]) -> str:
if result.get("error"):
return f"❌ **Error:** `{result['error']}`"
reward = result.get("reward")
turns = result.get("proxy_turns") or []
wall = result.get("wall_s", 0.0)
sb = result.get("sandbox_id", "")
exit_code = result.get("exit_code")
parts = [
f"**reward** = `{reward}`",
f"**wall** = `{wall}s`",
f"**turns** = `{len(turns)}`",
f"**exit** = `{exit_code}`",
]
if sb:
parts.append(f"**sandbox** = `{sb}`")
return "✅ " + " · ".join(parts)
def _render_workdir(files: dict[str, str]) -> str:
if not files:
return "_(no files produced)_"
lines = []
for path, contents in files.items():
lines.append(f"### `{path}`")
lines.append("")
lines.append("```")
lines.append((contents or "").rstrip()[:2000])
lines.append("```")
return "\n".join(lines)