Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """Minimal Gradio UI for opencode_env. | |
| Mounts under the standard OpenEnv ``/web`` path via the | |
| ``gradio_builder=`` callback documented at | |
| https://meta-pytorch.org/OpenEnv/customizing-web-ui.html. | |
| One page with: | |
| - endpoint selector (``vllm`` / ``openai`` / ``hf_router``) β the catalog | |
| resolves the actual base_url / api_key / model from env vars. | |
| - instruction + setup (bash, one cmd per line) + verify (bash, one cmd | |
| per line) textareas β the same Task shape the MCP tool accepts. | |
| - Tunables (mode, disable_thinking, max_tokens_cap, top_logprobs, | |
| agent_timeout_s, template). | |
| - Preset buttons for the ready-made example tasks. | |
| - Run button β result panel with reward, setup/verify per-command | |
| results, file outputs, logprob stats, agent + proxy log tails, | |
| and the raw RolloutResult JSON. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from typing import Any | |
| import gradio as gr | |
| try: | |
| from .catalog import ENDPOINT_KINDS, catalog_summary, resolve_endpoint | |
| from .opencode_environment import OpenCodeEnvironment | |
| except ImportError: # pragma: no cover | |
| from server.catalog import ENDPOINT_KINDS, catalog_summary, resolve_endpoint # type: ignore | |
| from server.opencode_environment import OpenCodeEnvironment # type: ignore | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Preset task examples β each fills (instruction, setup, verify). | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| PRESETS: dict[str, dict[str, str]] = { | |
| "binary_search": { | |
| "instruction": ( | |
| "Create a single Python file named `binary_search.py` in the " | |
| "current working directory. Use the relative path `binary_search.py`. " | |
| "Expose exactly one function:\n" | |
| " def binary_search(arr: list[int], target: int) -> int\n" | |
| "Return the index of `target` in the sorted list `arr`, or -1 if " | |
| "absent. Use the binary-search algorithm; do not call list.index." | |
| ), | |
| "setup": "", | |
| "verify": ( | |
| "test -f /home/user/workdir/binary_search.py\n" | |
| "python -c \"import sys; sys.path.insert(0, '/home/user/workdir'); " | |
| "import binary_search; " | |
| "assert binary_search.binary_search([1,2,3,4,5], 3) == 2; " | |
| "assert binary_search.binary_search([1,2,3], 99) == -1; " | |
| "assert binary_search.binary_search([], 1) == -1; " | |
| "print('OK')\"" | |
| ), | |
| }, | |
| "fizzbuzz": { | |
| "instruction": ( | |
| "Create `fizzbuzz.py` in the current directory exposing " | |
| "`def fizzbuzz(n: int) -> list[str]` that returns the FizzBuzz " | |
| "sequence for the integers 1..n. 'Fizz' for multiples of 3, 'Buzz' " | |
| "for 5, 'FizzBuzz' for both, otherwise the number as a string." | |
| ), | |
| "setup": "", | |
| "verify": ( | |
| "test -f /home/user/workdir/fizzbuzz.py\n" | |
| "python -c \"import sys; sys.path.insert(0, '/home/user/workdir'); " | |
| "import fizzbuzz; " | |
| "assert fizzbuzz.fizzbuzz(5) == ['1','2','Fizz','4','Buzz']; " | |
| "assert fizzbuzz.fizzbuzz(15)[-1] == 'FizzBuzz'; " | |
| "print('OK')\"" | |
| ), | |
| }, | |
| "pandas_csv": { | |
| "instruction": ( | |
| "Read `/home/user/data/numbers.csv` (a CSV with a single column " | |
| "`x` of integers) using pandas. Compute the mean of the `x` " | |
| "column and write it as a single float to `/home/user/workdir/mean.txt` " | |
| "(no extra characters, no newline)." | |
| ), | |
| "setup": ( | |
| "pip install --quiet pandas\n" | |
| "mkdir -p /home/user/data\n" | |
| "printf 'x\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n10\\n' > /home/user/data/numbers.csv" | |
| ), | |
| "verify": ( | |
| "test -f /home/user/workdir/mean.txt\n" | |
| "python -c \"v=float(open('/home/user/workdir/mean.txt').read().strip()); " | |
| "assert abs(v-5.5) < 1e-6, v; print('mean=', v)\"" | |
| ), | |
| }, | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Result rendering helpers | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _split_commands(text: str) -> list[str]: | |
| return [line for line in (text or "").splitlines() if line.strip()] | |
| def _badge_for_reward(reward: float | None) -> str: | |
| if reward is None: | |
| return "**reward**: _n/a_" | |
| if reward >= 0.999: | |
| emoji = "[PASS]" | |
| elif reward > 0.0: | |
| emoji = "[PARTIAL]" | |
| else: | |
| emoji = "[FAIL]" | |
| return f"### {emoji} reward = `{reward:.2f}`" | |
| def _summary_md(result: dict[str, Any]) -> str: | |
| parts = [_badge_for_reward(result.get("reward"))] | |
| parts.append( | |
| f"**sandbox**: `{result.get('sandbox_id') or 'n/a'}` Β· " | |
| f"**wall**: `{result.get('wall_s', 0):.1f}s` Β· " | |
| f"**agent_exit**: `{result.get('agent_exit_code')}` Β· " | |
| f"**mode**: `{result.get('mode', 'n/a')}`" | |
| ) | |
| if result.get("error"): | |
| parts.append(f"**error**: `{result['error']}`") | |
| return "\n\n".join(parts) | |
| def _command_rows(items: list[dict[str, Any]]) -> list[list[str]]: | |
| rows: list[list[str]] = [] | |
| for it in items or []: | |
| cmd = it.get("cmd", "") | |
| rows.append( | |
| [ | |
| cmd if len(cmd) <= 80 else cmd[:77] + "...", | |
| str(it.get("exit_code", "")), | |
| f"{it.get('duration_s', 0):.2f}s", | |
| (it.get("stderr") or "").splitlines()[-1][:80] if it.get("exit_code") else "", | |
| ] | |
| ) | |
| return rows | |
| def _logprobs_md(turns: list[dict[str, Any]]) -> str: | |
| if not turns: | |
| return "_No proxy turns captured._\n\nThis is normal in `black_box` mode. In `transparent_proxy` mode, an empty list usually means the agent never made an LLM call (check the agent log)." | |
| n = len(turns) | |
| productive = sum(1 for t in turns if t.get("completion_tokens")) | |
| total_toks = sum(len(t.get("completion_tokens") or []) for t in turns) | |
| all_lps = [ | |
| float(x) | |
| for t in turns | |
| for x in (t.get("per_token_logps") or []) | |
| if x is not None | |
| ] | |
| mean_lp = (sum(all_lps) / len(all_lps)) if all_lps else None | |
| lines = [ | |
| f"**turns**: `{n}` Β· **productive**: `{productive}` Β· " | |
| f"**total_completion_tokens**: `{total_toks}`", | |
| ] | |
| if mean_lp is not None: | |
| lines.append(f"**mean_logprob**: `{mean_lp:+.4f}`") | |
| finishes: dict[str, int] = {} | |
| for t in turns: | |
| f = t.get("finish_reason") or "unknown" | |
| finishes[f] = finishes.get(f, 0) + 1 | |
| if finishes: | |
| lines.append( | |
| "**finish_reasons**: " + " ".join(f"`{k}={v}`" for k, v in finishes.items()) | |
| ) | |
| productive_rows = [t for t in turns if t.get("completion_tokens")] | |
| if productive_rows: | |
| first = productive_rows[0] | |
| toks = first["completion_tokens"][:10] | |
| lps = first.get("per_token_logps") or [] | |
| lines.append( | |
| f"\n**first productive turn (first 10 tokens)**\n\n" | |
| f"```\n" | |
| + "\n".join( | |
| f" {tok!r:<14} {lp:+.3f}" if i < len(lps) else f" {tok!r:<14} -" | |
| for i, (tok, lp) in enumerate(zip(toks, lps + [None] * len(toks))) | |
| ) | |
| + "\n```" | |
| ) | |
| return "\n\n".join(lines) | |
| def _live_status_md( | |
| endpoint_kind: str, | |
| model: str, | |
| mode: str, | |
| elapsed_s: float, | |
| lines: list[tuple[float, str]], | |
| ) -> str: | |
| """Render a live phase log (latest at the bottom) with elapsed timestamps.""" | |
| head = ( | |
| f"### running⦠`elapsed={elapsed_s:.1f}s`\n\n" | |
| f"_endpoint=`{endpoint_kind}` model=`{model}` mode=`{mode}`_\n\n" | |
| ) | |
| if not lines: | |
| body = "_(waiting for first phase updateβ¦)_" | |
| else: | |
| # Show the most recent ~12 lines so the panel doesn't grow unbounded. | |
| rows = ["| t (s) | phase |", "|---|---|"] | |
| for ts, msg in lines[-12:]: | |
| rows.append(f"| `{ts:>6.1f}` | {msg.replace(chr(10), ' ')[:200]} |") | |
| body = "\n".join(rows) | |
| return head + body | |
| def _files_md(files: dict[str, str]) -> str: | |
| if not files: | |
| return "_No files in the workdir._" | |
| chunks = [] | |
| for path, content in files.items(): | |
| chunks.append(f"**`{path}`**\n```python\n{content[:4000]}\n```") | |
| return "\n\n".join(chunks) | |
| def _catalog_banner() -> str: | |
| rows = ["**Endpoint catalog (env vars + defaults)**", ""] | |
| rows.append("| kind | base_url | model | env vars | configured |") | |
| rows.append("|---|---|---|---|---|") | |
| for entry in catalog_summary(): | |
| envs = ( | |
| f"`{entry['base_url_env']}`<br/>`{entry['api_key_env']}`<br/>" | |
| f"`{entry['model_env']}`" | |
| ) | |
| ok = "yes" if entry["configured"] else "**no**" | |
| rows.append( | |
| f"| `{entry['kind']}` | `{entry['default_base_url'] or '-'}` | " | |
| f"`{entry['default_model'] or '-'}` | {envs} | {ok} |" | |
| ) | |
| return "\n".join(rows) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Builder | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def opencode_gradio_builder( | |
| web_manager, # noqa: ARG001 (unused: we instantiate the env directly) | |
| action_fields, # noqa: ARG001 | |
| metadata, # noqa: ARG001 | |
| is_chat_env, # noqa: ARG001 | |
| title, | |
| quick_start_md, # noqa: ARG001 | |
| ) -> gr.Blocks: | |
| """Build the opencode_env console. | |
| Compatible with ``create_app(..., gradio_builder=...)``. We ignore | |
| ``web_manager`` and instantiate :class:`OpenCodeEnvironment` ourselves | |
| inside the run handler β opencode_env's run_rollout doesn't need any | |
| per-session state beyond the env's own bookkeeping, and instantiating | |
| is cheap (no sandbox is created until the tool fires). | |
| """ | |
| def run( | |
| endpoint: str, | |
| model: str, | |
| base_url: str, | |
| api_key: str, | |
| instruction: str, | |
| setup_text: str, | |
| verify_text: str, | |
| mode: str, | |
| disable_thinking: str, | |
| template: str, | |
| max_tokens_cap: int, | |
| top_logprobs: int, | |
| agent_timeout_s: float, | |
| ): | |
| """Generator handler β yields incremental UI updates. | |
| Each ``yield`` is a tuple matching ``outputs=[...]``: | |
| (summary_md, setup_table, verify_table, files_md, logprobs_md, | |
| logs_md, raw_json). Early yields keep summary_md as a live phase | |
| log while the rollout runs; the final yield populates everything. | |
| """ | |
| import queue | |
| import threading | |
| import time | |
| # Resolve endpoint up front β if this fails, we can return one | |
| # immediate result with no streaming needed. | |
| try: | |
| resolved = resolve_endpoint( | |
| endpoint, base_url=base_url, api_key=api_key, model=model | |
| ) | |
| except ValueError as exc: | |
| err = f"endpoint resolution failed: {exc}" | |
| yield (f"### error\n\n```\n{err}\n```", [], [], "", "", "", {"error": err}) | |
| return | |
| # Translate "auto" / "on" / "off" into bool / None. | |
| if disable_thinking == "on": | |
| dt: bool | None = True | |
| elif disable_thinking == "off": | |
| dt = False | |
| else: | |
| dt = None | |
| env = OpenCodeEnvironment() | |
| # The worker fires _run_rollout_impl in a background thread and | |
| # streams progress messages into a queue; this generator polls the | |
| # queue every 0.5s and yields a refreshed status_md to the UI. | |
| status_q: queue.Queue = queue.Queue() | |
| result_holder: dict = {} | |
| def _cb(msg: str) -> None: | |
| status_q.put(("status", msg, time.time())) | |
| def _worker(): | |
| try: | |
| payload = env._run_rollout_impl( | |
| base_url=resolved.base_url, | |
| api_key=resolved.api_key, | |
| model=resolved.model, | |
| instruction=instruction, | |
| setup=_split_commands(setup_text), | |
| verify=_split_commands(verify_text), | |
| task_id="ui", | |
| mode=mode, | |
| disable_thinking=( | |
| dt if dt is not None else resolved.disable_thinking_default | |
| ), | |
| max_tokens_cap=int(max_tokens_cap), | |
| top_logprobs=int(top_logprobs), | |
| agent_timeout_s=float(agent_timeout_s), | |
| template=template, | |
| progress_cb=_cb, | |
| ) | |
| result_holder["payload"] = payload | |
| except Exception as exc: # noqa: BLE001 | |
| result_holder["error"] = f"{type(exc).__name__}: {exc}" | |
| status_q.put(("error", result_holder["error"], time.time())) | |
| finally: | |
| status_q.put(("done", None, time.time())) | |
| worker = threading.Thread(target=_worker, daemon=True) | |
| t_start = time.time() | |
| worker.start() | |
| # First yield: announce we've started. Empty result panels. | |
| yield ( | |
| f"### runningβ¦\n\n_endpoint=`{resolved.kind}` model=`{resolved.model}` mode=`{mode}`_", | |
| [], [], "", "", "", {}, | |
| ) | |
| status_lines: list[tuple[float, str]] = [] | |
| finished = False | |
| while not finished: | |
| try: | |
| kind, msg, ts = status_q.get(timeout=0.5) | |
| if kind == "status": | |
| status_lines.append((ts - t_start, msg)) | |
| elif kind == "error": | |
| status_lines.append((ts - t_start, f"ERROR: {msg}")) | |
| elif kind == "done": | |
| finished = True | |
| except queue.Empty: | |
| pass | |
| # Render the live status pane. | |
| elapsed = time.time() - t_start | |
| md = _live_status_md(resolved.kind, resolved.model, mode, elapsed, status_lines) | |
| yield (md, [], [], "", "", "", {}) | |
| # Drain any final messages still in the queue. | |
| while not status_q.empty(): | |
| try: | |
| kind, msg, ts = status_q.get_nowait() | |
| if kind == "status": | |
| status_lines.append((ts - t_start, msg)) | |
| except queue.Empty: | |
| break | |
| if "payload" not in result_holder: | |
| err = result_holder.get("error", "unknown error") | |
| yield ( | |
| f"### error\n\n```\n{err}\n```", | |
| [], [], "", "", | |
| _live_status_md(resolved.kind, resolved.model, mode, | |
| time.time() - t_start, status_lines), | |
| {"error": err}, | |
| ) | |
| return | |
| result = json.loads(result_holder["payload"]) | |
| yield ( | |
| _summary_md(result), | |
| _command_rows(result.get("setup_results") or []), | |
| _command_rows(result.get("verify_results") or []), | |
| _files_md(result.get("files") or {}), | |
| _logprobs_md(result.get("proxy_turns") or []), | |
| ( | |
| f"### live phase log\n\n" | |
| + _live_status_md(resolved.kind, resolved.model, mode, | |
| time.time() - t_start, status_lines) | |
| + f"\n\n### agent log (tail)\n```\n{result.get('agent_log_tail', '')[:4000]}\n```\n\n" | |
| f"### proxy log (tail)\n```\n{result.get('proxy_log_tail', '')[:4000]}\n```" | |
| ), | |
| result, | |
| ) | |
| def apply_preset(name: str) -> tuple[str, str, str]: | |
| p = PRESETS.get(name) or {"instruction": "", "setup": "", "verify": ""} | |
| return p["instruction"], p["setup"], p["verify"] | |
| with gr.Blocks(title=title or "opencode_env") as app: | |
| gr.Markdown(f"# {title or 'opencode_env'}") | |
| gr.Markdown( | |
| "Run one OpenCode rollout in an E2B sandbox against your chosen " | |
| "LLM endpoint. Pick an endpoint, write the task as `(instruction, " | |
| "setup, verify)`, and inspect the reward + per-token logprobs." | |
| ) | |
| gr.Markdown(_catalog_banner()) | |
| with gr.Row(): | |
| endpoint = gr.Dropdown( | |
| choices=list(ENDPOINT_KINDS), | |
| value="openai", | |
| label="Endpoint", | |
| scale=1, | |
| ) | |
| model = gr.Textbox( | |
| label="Model (blank β catalog default)", placeholder="gpt-4o-mini", | |
| scale=2, | |
| ) | |
| with gr.Row(): | |
| base_url = gr.Textbox( | |
| label="Base URL (blank β env / catalog default)", | |
| placeholder="https://api.openai.com/v1", scale=2, | |
| ) | |
| api_key = gr.Textbox( | |
| label="API key (blank β server env var)", | |
| placeholder="(server env)", type="password", scale=1, | |
| ) | |
| instruction = gr.Textbox( | |
| label="Instruction (the prompt opencode runs)", | |
| lines=4, | |
| value=PRESETS["binary_search"]["instruction"], | |
| ) | |
| with gr.Row(): | |
| setup_text = gr.Textbox( | |
| label="Setup (one bash command per line β runs BEFORE the agent)", | |
| lines=5, | |
| value=PRESETS["binary_search"]["setup"], | |
| ) | |
| verify_text = gr.Textbox( | |
| label="Verify (one bash command per line β runs AFTER the agent)", | |
| lines=5, | |
| value=PRESETS["binary_search"]["verify"], | |
| ) | |
| with gr.Row(): | |
| preset_bs = gr.Button("preset Β· binary_search", size="sm") | |
| preset_fb = gr.Button("preset Β· fizzbuzz", size="sm") | |
| preset_pd = gr.Button("preset Β· pandas_csv", size="sm") | |
| with gr.Accordion("Tunables", open=False): | |
| with gr.Row(): | |
| mode = gr.Dropdown( | |
| choices=["transparent_proxy", "black_box"], | |
| value="transparent_proxy", | |
| label="mode", | |
| ) | |
| disable_thinking = gr.Dropdown( | |
| choices=["auto", "on", "off"], | |
| value="auto", | |
| label="disable_thinking", | |
| ) | |
| template = gr.Textbox( | |
| label="E2B template (e.g. opencode-rl)", | |
| placeholder="(blank β cold install per rollout)", | |
| ) | |
| with gr.Row(): | |
| max_tokens_cap = gr.Number(value=4096, label="max_tokens_cap", step=1) | |
| top_logprobs = gr.Number(value=5, label="top_logprobs", step=1) | |
| agent_timeout_s = gr.Number(value=600, label="agent_timeout_s", step=1) | |
| run_btn = gr.Button("Run rollout", variant="primary") | |
| gr.Markdown("---") | |
| summary_md = gr.Markdown("_Submit a rollout above to see results._") | |
| with gr.Tabs(): | |
| with gr.Tab("Setup"): | |
| setup_table = gr.Dataframe( | |
| headers=["cmd", "exit", "duration", "stderr"], | |
| datatype=["str", "str", "str", "str"], | |
| interactive=False, | |
| wrap=True, | |
| ) | |
| with gr.Tab("Verify"): | |
| verify_table = gr.Dataframe( | |
| headers=["cmd", "exit", "duration", "stderr"], | |
| datatype=["str", "str", "str", "str"], | |
| interactive=False, | |
| wrap=True, | |
| ) | |
| with gr.Tab("Files"): | |
| files_md = gr.Markdown("") | |
| with gr.Tab("Logprobs"): | |
| logprobs_md = gr.Markdown("") | |
| with gr.Tab("Logs"): | |
| logs_md = gr.Markdown("") | |
| with gr.Tab("Raw JSON"): | |
| raw_json = gr.JSON(value={}) | |
| # Wire it up. | |
| for btn, name in [ | |
| (preset_bs, "binary_search"), | |
| (preset_fb, "fizzbuzz"), | |
| (preset_pd, "pandas_csv"), | |
| ]: | |
| btn.click( | |
| fn=lambda n=name: apply_preset(n), | |
| outputs=[instruction, setup_text, verify_text], | |
| ) | |
| run_btn.click( | |
| fn=run, | |
| inputs=[ | |
| endpoint, model, base_url, api_key, | |
| instruction, setup_text, verify_text, | |
| mode, disable_thinking, template, | |
| max_tokens_cap, top_logprobs, agent_timeout_s, | |
| ], | |
| outputs=[ | |
| summary_md, setup_table, verify_table, | |
| files_md, logprobs_md, logs_md, raw_json, | |
| ], | |
| ) | |
| return app | |