"""Gradio UI for the OpenCode OpenEnv server. One page. Top half: LLM config + task inputs (with preset dropdown). Bottom half: live rollout progress + final result panels. The Run button uses the non-blocking Phase-2b tool path so the UI updates progressively as turns complete: start_rollout → loop {get_state, read workdir} → finalize_rollout This is a Gradio generator (``yield`` per tick) so the user sees a live ticker instead of a frozen page. """ from __future__ import annotations import json import os import time from typing import Any import gradio as gr try: from .catalog import CATALOG, by_key, default_model, resolve_endpoint from .transcript import ( TRANSCRIPT_CSS, collect_parts_from_messages, render_transcript, ) except ImportError: # pragma: no cover — support running as a script from catalog import CATALOG, by_key, default_model, resolve_endpoint # type: ignore from transcript import ( # type: ignore TRANSCRIPT_CSS, collect_parts_from_messages, render_transcript, ) # ── Preset tasks ────────────────────────────────────────────────────────── # Shown in the dropdown. Each has instruction + matching bash verifier. 
# Preset verifier scripts. Each one is uploaded to the sandbox as test.sh
# and must write a float reward in [0, 1] to
# /home/user/logs/verifier/reward.txt. They are defensive: any missing
# file / failed cd scores 0 and exits 0 so the finalizer never hangs.
_HELLO_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
cd /home/user/workdir || { echo 0 > /home/user/logs/verifier/reward.txt; exit 0; }
[ -f hello.py ] || { echo 0 > /home/user/logs/verifier/reward.txt; exit 0; }
OUT=$(python hello.py 2>/dev/null | head -1)
if [ "$OUT" = "hello" ]; then echo 1.0 > /home/user/logs/verifier/reward.txt; \\
else echo 0.0 > /home/user/logs/verifier/reward.txt; fi
"""

# Partial credit: fraction of the 15 expected FizzBuzz lines found in output.
_FIZZBUZZ_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
[ -f fizzbuzz.py ] || { echo 0 > "$REWARD"; exit 0; }
OUT=$(python fizzbuzz.py 2>&1 | head -20)
EXPECTED=(1 2 Fizz 4 Buzz Fizz 7 8 Fizz Buzz 11 Fizz 13 14 FizzBuzz)
HITS=0
for line in "${EXPECTED[@]}"; do
    echo "$OUT" | grep -qxF "$line" && HITS=$((HITS + 1))
done
python -c "print(${HITS} / ${#EXPECTED[@]})" > "$REWARD"
"""

# Exact match scores 1.0; otherwise positional partial credit via python.
# Note: ``\\n`` in the Python literal becomes ``\n`` in the script, so tr
# folds the script's output onto one line before comparing.
_FIBONACCI_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
[ -f fibonacci.py ] || { echo 0 > "$REWARD"; exit 0; }
EXPECTED="0 1 1 2 3 5 8 13 21 34"
OUT=$(python fibonacci.py 2>/dev/null | tr '\\n' ' ' | xargs || true)
if [ "$OUT" = "$EXPECTED" ]; then
    echo 1.0 > "$REWARD"
else
    python -c "
expected='$EXPECTED'.split()
got='$OUT'.split()
hits=sum(1 for e,g in zip(expected,got) if e==g)
print(hits/len(expected))" > "$REWARD"
fi
"""

# All-or-nothing: first output line must equal the exact sorted CSV string.
_SORT_LIST_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
[ -f sort_list.py ] || { echo 0 > "$REWARD"; exit 0; }
EXPECTED="1,5,7,8,11,13,23,31,42,99"
OUT=$(python sort_list.py 2>/dev/null | head -1 || true)
if [ "$OUT" = "$EXPECTED" ]; then
    echo 1.0 > "$REWARD"
else
    echo 0.0 > "$REWARD"
fi
"""

# Two independent half-credit checks: the file contents and the echo script.
_SIMPLE_IO_TEST = """#!/usr/bin/env bash
set -u
mkdir -p /home/user/logs/verifier
REWARD=/home/user/logs/verifier/reward.txt
cd /home/user/workdir || { echo 0 > "$REWARD"; exit 0; }
SCORE=0.0
if [ -f greeting.txt ]; then
    if [ "$(cat greeting.txt)" = "hello, world" ]; then
        SCORE=$(python -c "print(${SCORE} + 0.5)")
    fi
fi
if [ -f read_and_echo.py ]; then
    OUT=$(python read_and_echo.py 2>/dev/null | head -1 || true)
    if [ "$OUT" = "hello, world" ]; then
        SCORE=$(python -c "print(${SCORE} + 0.5)")
    fi
fi
echo "$SCORE" > "$REWARD"
"""

# Dropdown presets: name -> (instruction shown to the agent, bash verifier).
# The first entry is a blank "custom" slot the user edits in place.
PRESET_TASKS: dict[str, tuple[str, str]] = {
    "custom (edit below)": ("", ""),
    "hello": (
        "Write `hello.py` in the current directory that prints exactly the "
        "lowercase word `hello` (no quotes, no trailing punctuation).",
        _HELLO_TEST,
    ),
    "fizzbuzz": (
        "Write a Python script `fizzbuzz.py` in the current directory that "
        "prints FizzBuzz for numbers 1..15, one per line. Use 'Fizz' for "
        "multiples of 3, 'Buzz' for multiples of 5, 'FizzBuzz' for both.",
        _FIZZBUZZ_TEST,
    ),
    "fibonacci": (
        "Write `fibonacci.py` in the current directory that prints the first "
        "10 Fibonacci numbers (starting from 0), one per line. Expected "
        "output:\n0\n1\n1\n2\n3\n5\n8\n13\n21\n34",
        _FIBONACCI_TEST,
    ),
    "sort_list": (
        "Write `sort_list.py` in the current directory that sorts this list "
        "ascending and prints the result as one comma-separated line with no "
        "spaces: [42, 7, 13, 1, 99, 5, 23, 8, 31, 11]\n\n"
        "Expected output (exactly this line): 1,5,7,8,11,13,23,31,42,99",
        _SORT_LIST_TEST,
    ),
    "simple_io": (
        "In the current directory:\n"
        "1. Create a file `greeting.txt` containing exactly the line "
        "`hello, world`.\n"
        "2. Write `read_and_echo.py` that opens `greeting.txt` and prints its "
        "contents to stdout.\n"
        "3. Run the script and verify it prints `hello, world`.",
        _SIMPLE_IO_TEST,
    ),
}

# Hosted-model dropdown choices: (label, key) pairs from the curated catalog.
_HF_MODEL_CHOICES = [
    (m.label, m.dropdown_key) for m in CATALOG if m.backend == "hf_router"
]

# Sentinel value used for the "type your own HF-router id" dropdown option.
_CUSTOM_HF_KEY = "__custom_hf__"
_HF_MODEL_CHOICES.append(("Custom — enter HF Router model id below", _CUSTOM_HF_KEY))
_DEFAULT_HF_KEY = _HF_MODEL_CHOICES[0][1]
# Pre-fill the token box from the environment so Space operators don't paste.
_HF_TOKEN_ENV = os.environ.get("HF_TOKEN", "")

# Suggested / recent vllm model ids (user can type anything).
_VLLM_MODEL_SUGGESTIONS = [
    m.repo for m in CATALOG if m.backend == "vllm"
] + ["Qwen/Qwen3.5-4B", "Qwen/Qwen2.5-7B-Instruct"]


def opencode_ui_builder(
    *,
    env_factory: Any = None,
    web_manager: Any = None,  # kept for backward compat; unused
    title: str = "OpenCode Env",
    **_: Any,
) -> gr.Blocks:
    """Build the Gradio Blocks UI.

    ``env_factory`` is a zero-arg callable that returns a fresh
    :class:`OpenCodeEnvironment` on first click (lazy, so the Space's
    cold-start path doesn't pay the E2B cost until someone hits Run).
    """
    # Single-slot cache: one environment instance shared across clicks
    # until _reset drops it.
    _env_cache: dict[str, Any] = {"instance": None}

    def _get_env():
        # Lazily construct (and memoize) the environment on first use.
        inst = _env_cache.get("instance")
        if inst is None:
            if env_factory is None:
                raise RuntimeError("opencode_ui_builder needs env_factory=...")
            inst = env_factory()
            _env_cache["instance"] = inst
        return inst

    with gr.Blocks(title=title, analytics_enabled=False, css=TRANSCRIPT_CSS) as demo:
        gr.Markdown(
            f"# {title}\n"
            "Run one OpenCode rollout against any OpenAI-compatible endpoint. "
            "Pick a preset task or paste your own instruction + bash verifier. "
            "The run is streamed live — per-turn progress updates while the "
            "agent works."
        )

        # ── Config ─────────────────────────────────────────────────────
        # Two backends:
        #   1. Self-hosted vLLM — user supplies model id + base URL.
        #   2. Hosted (HF Router) — user picks from the curated Qwen
        #      catalog, or selects "Custom" and types their own HF-router
        #      model id (e.g. ``Qwen/Qwen3-8B:together``).
        with gr.Row():
            with gr.Column(scale=3):
                backend_mode = gr.Radio(
                    label="Backend",
                    choices=["Self-hosted vLLM", "Hosted (HF Router)"],
                    value="Hosted (HF Router)",
                )
                # --- Self-hosted vLLM fields (shown only when selected) ---
                with gr.Row(visible=False) as vllm_row:
                    vllm_model = gr.Textbox(
                        label="Model id (as served by your vLLM)",
                        value=_VLLM_MODEL_SUGGESTIONS[0],
                        placeholder="Qwen/Qwen3.5-4B",
                        scale=1,
                    )
                    vllm_url = gr.Textbox(
                        label="vLLM base URL",
                        value="",
                        placeholder="https://.../v1",
                        scale=2,
                    )
                # --- Hosted HF Router fields (default visible) ---
                with gr.Row(visible=True) as hf_row:
                    hosted_model = gr.Dropdown(
                        label="Hosted model",
                        choices=_HF_MODEL_CHOICES,
                        value=_DEFAULT_HF_KEY,
                        scale=2,
                    )
                    hf_token = gr.Textbox(
                        label="HF token",
                        value=_HF_TOKEN_ENV,
                        type="password",
                        placeholder="hf_...",
                        scale=2,
                    )
                # Free-text id, only revealed when the "Custom" sentinel is
                # picked in the hosted dropdown.
                hosted_custom_id = gr.Textbox(
                    label="Custom HF-router model id",
                    value="",
                    placeholder="Qwen/Qwen3-8B:together (org/repo[:provider])",
                    visible=False,
                )
                thinking = gr.Checkbox(
                    label="Thinking mode (Qwen3.5 only)",
                    value=False,
                )
            with gr.Column(scale=1):
                mode = gr.Dropdown(
                    label="Mode",
                    choices=["transparent_proxy", "black_box"],
                    value="transparent_proxy",
                )
                max_tokens_cap = gr.Slider(
                    label="max_tokens cap",
                    minimum=512,
                    maximum=32768,
                    value=16384,
                    step=512,
                )
                agent_timeout_s = gr.Slider(
                    label="Agent timeout (s)",
                    minimum=60,
                    maximum=1200,
                    value=300,
                    step=30,
                )

        def _on_backend_change(mode_v: str):
            # Toggle backend field rows. NOTE(review): this always hides
            # hosted_custom_id, even when the hosted dropdown still has
            # "Custom" selected — user must re-pick it after switching back.
            is_vllm = mode_v == "Self-hosted vLLM"
            return (
                gr.update(visible=is_vllm),       # vllm_row
                gr.update(visible=not is_vllm),   # hf_row
                gr.update(visible=False),         # hosted_custom_id reset
            )

        def _on_hosted_change(choice: str):
            # Reveal the free-text id box only for the "Custom" sentinel.
            return gr.update(visible=(choice == _CUSTOM_HF_KEY))

        backend_mode.change(
            _on_backend_change,
            inputs=[backend_mode],
            outputs=[vllm_row, hf_row, hosted_custom_id],
        )
        hosted_model.change(
            _on_hosted_change,
            inputs=[hosted_model],
            outputs=[hosted_custom_id],
        )

        # ── Task fields ────────────────────────────────────────────────
        # Verifier (test.sh) is intentionally not surfaced here — it's only
        # needed for scored training. For interactive use, leave it empty
        # and just have the agent finish with something observable (e.g.
        # "print DONE at the end"). MCP tools already accept
        # ``test_script=""`` and skip scoring when empty.
        instruction = gr.Textbox(
            label="Instruction",
            value=(
                "Write `hello.py` in the current directory that prints "
                "`hello` (no quotes). Then run it and print `DONE` when "
                "you are finished."
            ),
            lines=4,
        )
        with gr.Row():
            task_id = gr.Textbox(
                label="Task id (optional label)",
                value="interactive",
                scale=1,
            )
            setup_shell = gr.Textbox(
                label="Setup shell (optional, runs before opencode)",
                value="",
                placeholder="e.g. pip install polars",
                scale=3,
            )
        with gr.Row():
            run_btn = gr.Button("▶ Run", variant="primary", scale=2)
            abort_btn = gr.Button("⏹ Abort", variant="stop", scale=1)
            reset_btn = gr.Button("🔄 Reset", variant="secondary", scale=1)
            check_btn = gr.Button("🔎 Check endpoint", scale=1)

        # ── Output: chat-style single-column ───────────────────────────
        # Transcript is the hero. The status line above it carries a
        # sandbox-boot phase indicator so users know whether we're
        # spawning E2B, installing opencode, or waiting for the agent.
        # Everything else (reward, files, logprob trace, verifier, raw
        # JSON) lives in collapsed accordions below. Matches the chat
        # shape of local_ui.py.
        status = gr.Markdown()
        # Shared state: the active rollout_id so Abort and Reset can find it.
        rollout_state = gr.State("")
        transcript_html = gr.HTML(
            # NOTE(review): the original HTML markup around this placeholder
            # was lost in extraction — plain text reconstruction; confirm
            # against the original wrapper tags / CSS classes.
            value="run a rollout to see the transcript",
        )

        # Hidden outputs retained only so the streaming handler's tuple
        # shape doesn't have to change. They never render in the UI.
        reward_out = gr.Number(visible=False)
        wall_out = gr.Number(visible=False)
        exit_out = gr.Number(visible=False)
        turns_out = gr.Number(visible=False)
        with gr.Accordion("Workdir files", open=False):
            workdir_md = gr.Markdown()
        with gr.Accordion("Proxy trace (per turn — logprobs)", open=False):
            proxy_trace_json = gr.JSON(label=None)
        with gr.Accordion("Diagnostics (proxy · install · agent logs)", open=False):
            verifier_out = gr.Textbox(label="proxy/install/agent log tails", lines=12)
            verifier_err = gr.Textbox(label="primitive error (if any)", lines=3)
        with gr.Accordion("Raw result JSON", open=False):
            raw_json = gr.JSON(label=None)

        # ── Streaming Run handler ──────────────────────────────────────
        def _run_streaming(
            backend_mode_v: str,
            vllm_model_v: str,
            vllm_url_v: str,
            hosted_model_v: str,
            hosted_custom_id_v: str,
            hf_token_v: str,
            thinking_v: bool,
            mode_v: str,
            max_tokens_cap_v: int,
            agent_timeout_s_v: float,
            task_id_v: str,
            instruction_v: str,
            setup_shell_v: str,
        ):
            # Each yield is a 12-tuple matching _output_widgets order:
            # (status, reward, wall, exit, turns, workdir_md, proxy_trace,
            #  verifier_out, verifier_err, raw_json, transcript, rollout_id)

            # Verifier is optional. For interactive use we pass an empty
            # test_script so the finalizer skips scoring.
            test_script_v = ""

            # Assemble the uniform model_key from the UI's two-backend picker.
            if backend_mode_v == "Self-hosted vLLM":
                if not vllm_model_v.strip():
                    yield _error_tuple("Self-hosted vLLM requires a model id.")
                    return
                model_key_v = f"vllm://{vllm_model_v.strip()}"
            else:
                if hosted_model_v == _CUSTOM_HF_KEY:
                    cid = hosted_custom_id_v.strip()
                    if not cid:
                        yield _error_tuple(
                            "Hosted 'Custom' picked but no model id entered."
                        )
                        return
                    if not cid.startswith("hf-router://"):
                        # Accept either plain "Org/Repo[:provider]" or a
                        # fully-prefixed key.
                        cid = f"hf-router://{cid}"
                    model_key_v = cid
                else:
                    model_key_v = hosted_model_v
            # NOTE(review): misplaced docstring — this string statement sits
            # mid-function (a no-op at runtime); it belongs at the top of
            # _run_streaming. Kept in place to preserve the code verbatim.
            """Gradio generator: yields UI updates as the rollout progresses.

            Uses the non-blocking fine-grained tools:
                start_rollout → loop(get_state) → finalize_rollout
            """
            import httpx

            from openenv.core.env_server.mcp_types import CallToolAction

            # 0) Resolve the catalog pick into (base_url, api_key, model).
            #    This validates the secret matches the selected backend.
            try:
                base_url, _api_key, _model, entry = resolve_endpoint(
                    model_key_v,
                    vllm_url=vllm_url_v,
                    hf_token=hf_token_v,
                )
            except Exception as exc:
                yield _error_tuple(f"config: {exc}")
                return

            # 1) Pre-flight: verify the endpoint is reachable before burning
            #    an E2B sandbox on a URL typo / bad token.
            yield (
                "🔎 **validating endpoint…**",
                None, None, None, 0,
                "", [], "", "",
                {"stage": "validate", "backend": entry.backend},
                # NOTE(review): placeholder markup lost in extraction.
                "validating endpoint…",
                "",
            )
            probe_headers: dict[str, str] = {}
            if entry.backend == "hf_router":
                probe_headers["Authorization"] = f"Bearer {hf_token_v}"
            try:
                r = httpx.get(
                    f"{base_url}/models",
                    headers=probe_headers,
                    timeout=15,
                )
                if r.status_code != 200:
                    yield _error_tuple(
                        f"{entry.backend} probe {base_url}/models → HTTP {r.status_code}: "
                        f"{r.text[:200]}"
                    )
                    return
            except Exception as exc:
                yield _error_tuple(
                    f"endpoint unreachable: {type(exc).__name__}: {exc}"
                )
                return

            # 2) Create (or reuse) the environment and reset it.
            yield (
                "🟡 **initialising env (creating MCP registry)…**",
                None, None, None, 0,
                "", [], "", "",
                {"stage": "env_init"},
                "initialising env…",
                "",
            )
            try:
                env = _get_env()
                env.reset()
            except Exception as exc:
                yield _error_tuple(f"env init failed: {type(exc).__name__}: {exc}")
                return

            # 3) start_rollout — uniform args: model_key + vllm_url + hf_token
            #    + thinking. The env resolves via the catalog server-side.
            try:
                start_obs = env.step(
                    CallToolAction(
                        tool_name="start_rollout",
                        arguments={
                            "model_key": model_key_v,
                            "vllm_url": vllm_url_v,
                            "hf_token": hf_token_v,
                            "thinking": bool(thinking_v),
                            "instruction": instruction_v,
                            "test_script": test_script_v,
                            "task_id": task_id_v,
                            "setup_shell": setup_shell_v,
                            "upload_files": {},
                            "mode": mode_v,
                            "max_tokens_cap": int(max_tokens_cap_v),
                            "agent_timeout_s": float(agent_timeout_s_v),
                        },
                    ),
                    timeout_s=60,
                )
            except Exception as exc:
                yield _error_tuple(f"start_rollout failed: {type(exc).__name__}: {exc}")
                return
            start_payload = _parse_result(start_obs)
            rollout_id = start_payload.get("rollout_id")
            if not rollout_id:
                yield _error_tuple(f"start_rollout returned no rollout_id: {start_payload}")
                return

            # Initial UI update — yield the rollout_id into shared state so
            # Abort / Reset can target the right rollout.
            yield (
                f"🟡 **rollout `{rollout_id}` started — booting sandbox…**",
                None, None, None, 0,
                "_(no files yet)_", [], "", "",
                start_payload,
                "booting sandbox — this takes ~20–40s cold…",
                rollout_id,
            )

            # 4) Poll get_state + get_messages at 1s cadence. Show a sandbox
            #    boot-phase label so users can tell "booting" from "stuck".
            #    Extra 120s grace beyond the agent timeout for boot/teardown.
            deadline = time.time() + float(agent_timeout_s_v) + 120
            t_started = float(start_payload.get("started_at") or time.time())
            status_str = "running"
            while time.time() < deadline:
                try:
                    state_obs = env.step(
                        CallToolAction(
                            tool_name="get_state",
                            arguments={"rollout_id": rollout_id},
                        ),
                        timeout_s=20,
                    )
                    state_payload = _parse_result(state_obs)
                except Exception as exc:
                    state_payload = {"error": f"{type(exc).__name__}: {exc}"}

                # Live transcript — only meaningful once opencode serve has
                # created its session (state_payload carries serve_session_id
                # in that case). Before that, get_messages returns an empty
                # list with a ``note`` field.
                parts_list: list = []
                transcript = "waiting for first part…"
                try:
                    msg_obs = env.step(
                        CallToolAction(
                            tool_name="get_messages",
                            arguments={"rollout_id": rollout_id},
                        ),
                        timeout_s=20,
                    )
                    msg_payload = _parse_result(msg_obs)
                    parts_list = collect_parts_from_messages(
                        msg_payload.get("messages") or []
                    )
                    if parts_list:
                        transcript = render_transcript(parts_list)
                except Exception:
                    pass

                status_str = state_payload.get("status", "?")
                elapsed = time.time() - t_started
                # NOTE(review): this first msg_count computation is dead —
                # it is unconditionally overwritten by the try block below.
                msg_count = len(
                    (state_payload.get("messages") if isinstance(state_payload, dict) else None)
                    or []
                )
                # Prefer message count from the transcript payload.
                try:
                    msg_count = len(msg_payload.get("messages") or [])
                except Exception:
                    msg_count = 0
                phase = _boot_phase(state_payload, msg_count, len(parts_list))
                yield (
                    f"{phase} · elapsed `{elapsed:.1f}s` · rollout `{rollout_id}`",
                    None, None, None,
                    state_payload.get("proxy_turns_so_far", 0),
                    "_(workdir populated on finalize)_",
                    [], "", "",
                    state_payload,
                    transcript,
                    rollout_id,
                )
                if status_str == "done":
                    break
                time.sleep(1.0)

            # 5) finalize_rollout — run verifier + collect full result
            try:
                final_obs = env.step(
                    CallToolAction(
                        tool_name="finalize_rollout",
                        arguments={"rollout_id": rollout_id, "wait_s": 60},
                    ),
                    timeout_s=300,
                )
            except Exception as exc:
                yield _error_tuple(f"finalize_rollout failed: {type(exc).__name__}: {exc}")
                return
            result = _parse_result(final_obs)
            status_md = _summarize_status(result)
            wd_md = _render_workdir(result.get("workdir_files") or {})
            turns = result.get("proxy_turns") or []

            # One last transcript fetch — captures any final parts that
            # arrived between the last poll and session.idle.
            final_transcript = "(transcript unavailable)"
            try:
                msg_obs = env.step(
                    CallToolAction(
                        tool_name="get_messages",
                        arguments={"rollout_id": rollout_id},
                    ),
                    timeout_s=30,
                )
                msg_payload = _parse_result(msg_obs)
                parts = collect_parts_from_messages(msg_payload.get("messages") or [])
                final_transcript = render_transcript(parts)
            except Exception:
                pass

            # Diagnostics pane: concat the three log tails so failures
            # are visible without expanding the raw JSON.
            diag_tail = "\n".join([
                "--- PROXY LOG TAIL ---",
                (result.get("proxy_log_tail") or "(empty)")[-2000:],
                "",
                "--- INSTALL LOG TAIL ---",
                (result.get("install_log_tail") or "(empty)")[-1000:],
                "",
                "--- AGENT LOG TAIL ---",
                (result.get("agent_log_tail") or "(empty)")[-2000:],
            ])
            err_line = result.get("error") or ""

            yield (
                status_md,
                result.get("reward"),
                result.get("wall_s"),
                result.get("exit_code"),
                len(turns),
                wd_md,
                turns,
                diag_tail,
                err_line,
                result,
                final_transcript,
                rollout_id,
            )

        # Canonical output order — every handler's tuple matches this list.
        _output_widgets = [
            status,
            reward_out,
            wall_out,
            exit_out,
            turns_out,
            workdir_md,
            proxy_trace_json,
            verifier_out,
            verifier_err,
            raw_json,
            transcript_html,
            rollout_state,
        ]
        run_btn.click(
            _run_streaming,
            inputs=[
                backend_mode,
                vllm_model,
                vllm_url,
                hosted_model,
                hosted_custom_id,
                hf_token,
                thinking,
                mode,
                max_tokens_cap,
                agent_timeout_s,
                task_id,
                instruction,
                setup_shell,
            ],
            outputs=_output_widgets,
        )

        # Check-endpoint handler — cheap GET /v1/models probe against the
        # currently-configured backend.
        def _check_endpoint(
            backend_mode_v: str,
            vllm_model_v: str,
            vllm_url_v: str,
            hosted_model_v: str,
            hosted_custom_id_v: str,
            hf_token_v: str,
        ) -> str:
            import httpx

            # Same model_key assembly as _run_streaming, but tolerant of
            # None/empty field values.
            if backend_mode_v == "Self-hosted vLLM":
                model_key_v = f"vllm://{(vllm_model_v or '').strip()}"
            else:
                if hosted_model_v == _CUSTOM_HF_KEY:
                    cid = (hosted_custom_id_v or "").strip()
                    if not cid:
                        return "❌ custom HF model id is empty"
                    model_key_v = cid if cid.startswith("hf-router://") else f"hf-router://{cid}"
                else:
                    model_key_v = hosted_model_v
            try:
                base_url, _key, _model, entry = resolve_endpoint(
                    model_key_v,
                    vllm_url=vllm_url_v,
                    hf_token=hf_token_v,
                )
            except Exception as exc:
                return f"❌ {exc}"
            headers = {"Authorization": f"Bearer {hf_token_v}"} if entry.backend == "hf_router" else {}
            models_url = f"{base_url}/models"
            try:
                r = httpx.get(models_url, headers=headers, timeout=15)
            except Exception as exc:
                return f"❌ `{models_url}` unreachable: `{type(exc).__name__}: {exc}`"
            if r.status_code != 200:
                return f"❌ `{models_url}` → HTTP {r.status_code}\n```\n{r.text[:400]}\n```"
            try:
                ids = [m.get("id") for m in r.json().get("data", []) if m.get("id")]
            except Exception:
                ids = []
            hint = f" · backend=`{entry.backend}` · resolved=`{_model}`"
            if ids:
                shown = ", ".join(ids[:5]) + (f", … (+{len(ids)-5} more)" if len(ids) > 5 else "")
                return f"✅ reachable{hint} · models: `{shown}`"
            return f"⚠️ reachable (HTTP 200) but no `data[*].id` in response{hint}"

        check_btn.click(
            _check_endpoint,
            inputs=[backend_mode, vllm_model, vllm_url, hosted_model, hosted_custom_id, hf_token],
            outputs=[status],
        )

        # ── Abort handler ──────────────────────────────────────────────
        # Fire-and-forget abort on the active rollout. Keeps the env + UI
        # state so the user can see what the transcript looked like at the
        # moment of abort.
        def _abort(current_rollout_id: str) -> tuple:
            from openenv.core.env_server.mcp_types import CallToolAction

            if not current_rollout_id:
                return (
                    "⚠️ nothing to abort (no active rollout).",
                    None, None, None, None,
                    "", [], "", "",
                    {"abort": "no-op"},
                    gr.update(),  # leave transcript untouched
                    current_rollout_id,
                )
            try:
                env = _get_env()
                env.step(
                    CallToolAction(
                        tool_name="abort_rollout",
                        arguments={"rollout_id": current_rollout_id},
                    ),
                    timeout_s=30,
                )
            except Exception as exc:  # noqa: BLE001
                return (
                    f"⚠️ abort failed: `{type(exc).__name__}: {exc}`",
                    None, None, None, None,
                    "", [], "", "",
                    {"abort": str(exc)},
                    gr.update(),
                    current_rollout_id,
                )
            return (
                f"⏹ **aborted** rollout `{current_rollout_id}`",
                None, None, None, None,
                "", [], "", "",
                {"abort": current_rollout_id},
                gr.update(),
                current_rollout_id,
            )

        abort_btn.click(
            _abort,
            inputs=[rollout_state],
            outputs=_output_widgets,
        )

        # ── Reset handler ──────────────────────────────────────────────
        # Aborts any in-flight rollout, drops the cached env so the next Run
        # creates a fresh :class:`OpenCodeEnvironment` (new MCP registry),
        # and clears all UI panels including the transcript.
        def _reset(current_rollout_id: str) -> tuple:
            from openenv.core.env_server.mcp_types import CallToolAction

            if current_rollout_id:
                try:
                    env = _get_env()
                    env.step(
                        CallToolAction(
                            tool_name="abort_rollout",
                            arguments={"rollout_id": current_rollout_id},
                        ),
                        timeout_s=30,
                    )
                except Exception:
                    # Best-effort — if abort fails, still drop the env below
                    # so the next Run starts clean.
                    pass
            _env_cache["instance"] = None
            return (
                "🔄 **reset.** next Run will create a fresh environment.",
                None, None, None, None,
                "_(workdir cleared)_",
                [], "", "",
                {"reset": True},
                "run a rollout to see the transcript",
                "",
            )

        reset_btn.click(
            _reset,
            inputs=[rollout_state],
            outputs=_output_widgets,
        )

    return demo


# ── Helpers ─────────────────────────────────────────────────────────────────


def _error_tuple(msg: str, rollout_id: str = "") -> tuple:
    """12-tuple (matching _output_widgets) that surfaces *msg* everywhere."""
    return (
        f"❌ **Error:** `{msg}`",
        None, None, None, None,
        "", [], "", "",
        {"error": msg},
        f"❌ {msg}",
        rollout_id,
    )


def _boot_phase(state: dict, msg_count: int, parts_count: int) -> str:
    """Human-readable sandbox + session boot phase label."""
    if state.get("error"):
        return f"⚠️ state error: `{state.get('error')}`"
    status = state.get("status", "?")
    if status == "unknown":
        return "⏳ **starting rollout…**"
    # serve_session_id only appears once opencode serve is up.
    serve_sid = state.get("serve_session_id")
    if not serve_sid:
        return (
            "🟡 **booting sandbox** — spawning E2B, installing opencode, "
            "starting proxy + opencode serve (this takes ~20–40s cold)"
        )
    if msg_count == 0:
        return "🟡 **creating session** — serve is up, prompt about to fire"
    if parts_count == 0:
        return "💭 **agent thinking** — first LLM call in flight"
    turns = state.get("proxy_turns_so_far", 0)
    return (
        f"⚡ **running** · serve session `{serve_sid[:14]}…` · "
        f"parts `{parts_count}` · turns `{turns}`"
    )


def _parse_result(raw: Any) -> dict[str, Any]:
    """Unwrap the server's JSON tool result into a plain dict.

    Accepts any of: an object with ``.result.content[0].text``, a dict with
    ``content[0]["text"]``, a bare JSON string, or anything else (wrapped as
    ``{"raw": ...}``). Non-JSON text is likewise wrapped, never raised on.
    """
    # Object with attribute chain: obs.result.content[0].text
    inner = getattr(raw, "result", None)
    if inner is not None:
        content = getattr(inner, "content", None)
        if content:
            first = content[0]
            text = getattr(first, "text", None)
            if isinstance(text, str):
                try:
                    return json.loads(text)
                except Exception:
                    return {"raw": text}
    if isinstance(raw, dict):
        content = raw.get("content")
        if isinstance(content, list) and content:
            first = content[0]
            text = first.get("text") if isinstance(first, dict) else None
            if isinstance(text, str):
                try:
                    return json.loads(text)
                except Exception:
                    return {"raw": text}
        return raw
    if isinstance(raw, str):
        try:
            return json.loads(raw)
        except Exception:
            return {"raw": raw}
    return {"raw": str(raw)}


def _summarize_status(result: dict[str, Any]) -> str:
    """One-line Markdown summary of a finalized rollout result."""
    if result.get("error"):
        return f"❌ **Error:** `{result['error']}`"
    reward = result.get("reward")
    turns = result.get("proxy_turns") or []
    wall = result.get("wall_s", 0.0)
    sb = result.get("sandbox_id", "")
    exit_code = result.get("exit_code")
    parts = [
        f"**reward** = `{reward}`",
        f"**wall** = `{wall}s`",
        f"**turns** = `{len(turns)}`",
        f"**exit** = `{exit_code}`",
    ]
    if sb:
        parts.append(f"**sandbox** = `{sb}`")
    return "✅ " + " · ".join(parts)


def _render_workdir(files: dict[str, str]) -> str:
    """Render {path: contents} as Markdown, truncating each file to 2000 chars."""
    if not files:
        return "_(no files produced)_"
    lines = []
    for path, contents in files.items():
        lines.append(f"### `{path}`")
        lines.append("")
        lines.append("```")
        lines.append((contents or "").rstrip()[:2000])
        lines.append("```")
    return "\n".join(lines)