# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """OpenCode session factory + session implementation. Implements the :class:`ResourceSessionFactory` / :class:`ResourceSession` contracts from ``openenv.core.harness`` (PR #471). The session wraps one sandbox running the ``opencode`` CLI agent. Two operating modes: - ``mode="black_box"`` — opencode talks directly to ``config.base_url``. No proxy, no logprob capture. Use for smoke tests / SFT / eval. - ``mode="transparent_proxy"`` (default) — an in-sandbox FastAPI proxy sits between opencode and the upstream LLM. It injects ``logprobs=true`` on every request and writes per-turn ``(messages, completion_tokens, per_token_logps)`` to ``proxy_trace.jsonl`` for GRPO consumption. Single driver path: opencode is started as a background subprocess via ``opencode run --format json --dangerously-skip-permissions ...`` and we poll its exit code. The previous ``opencode serve`` driver was removed — opencode CLI is the only path now. """ from __future__ import annotations from pathlib import Path from typing import Any, Callable, Literal from openenv.core.env_server.mcp_types import Tool from openenv.core.harness import ( Message, ResourceSession, ResourceSessionFactory, ToolResult, VerifyResult, ) from .config import OpenCodeConfig from .opencode_runtime import ( agent_log_path, build_env_vars, build_install_cmd, build_opencode_json, build_run_cmd, instruction_path, opencode_config_path, system_prompt_path, ) from .sandbox.base import BgJob, SandboxBackend, SandboxHandle from .task import OpenCodeTask # Inside-sandbox proxy paths (Mode B). _PROXY_PORT = 7000 _PROXY_TRACE_PATH = "/home/user/logs/agent/proxy_trace.jsonl" _PROXY_LOG_PATH = "/home/user/logs/agent/proxy.log" # Where the proxy source lives on disk (in this repo). Uploaded into the # sandbox at /home/user/proxy/interception.py before each rollout, unless # the sandbox was created from a template that already has it baked in. _PROXY_SOURCE_PATH = Path(__file__).parent / "sandbox" / "interception.py" Verifier = Callable[[SandboxHandle, OpenCodeTask], VerifyResult] class OpenCodeSession(ResourceSession): """One live OpenCode rollout inside a sandbox. The session is created already-running: :meth:`OpenCodeSessionFactory.create` calls :meth:`start_agent` before returning. Typical usage:: session = factory.create(task) session.wait_for_completion() result = session.verify([]) session.close() """ def __init__( self, *, sandbox: SandboxHandle, config: OpenCodeConfig, task: OpenCodeTask, verifier: Verifier | None = None, base_url_override: str | None = None, proxy_trace_path: str | None = None, proxy_bg_job: BgJob | None = None, ) -> None: self.sandbox = sandbox self.config = config self.task = task self._verifier = verifier self._base_url_override = base_url_override self._bg_job: BgJob | None = None self._proxy_trace_path = proxy_trace_path self._proxy_bg_job = proxy_bg_job # ------------------------------------------------------------------ # ResourceSession contract (PR #471) # ------------------------------------------------------------------ def initial_messages(self) -> list[Message]: return [{"role": "user", "content": self.task.instruction}] def list_tools(self) -> list[Tool]: # OpenCode owns its own tool loop — none are exposed to the harness. return [] def call_tool(self, name: str, arguments: dict[str, Any]) -> ToolResult: return ToolResult( error=( "OpenCodeSession does not expose external tool calls; the " "CLI agent owns its own tool loop." ) ) def verify( self, transcript: list[Message], final_state: Any | None = None, ) -> VerifyResult: if self._verifier is None: return VerifyResult(env_reward=None, done=True) return self._verifier(self.sandbox, self.task) def close(self) -> None: if self._bg_job is not None: try: self._bg_job.kill() except Exception: pass self._bg_job = None if self._proxy_bg_job is not None: try: self._proxy_bg_job.kill() except Exception: pass self._proxy_bg_job = None self.sandbox.kill() # ------------------------------------------------------------------ # OpenCode-specific session API # ------------------------------------------------------------------ def start_agent(self) -> None: """Launch ``opencode run`` as a background subprocess in the sandbox.""" if self._bg_job is not None: return cmd = build_run_cmd(self.config) envs = build_env_vars(self.config, base_url_override=self._base_url_override) self._bg_job = self.sandbox.start_bg(cmd, envs=envs) def wait_for_completion(self, timeout_s: float | None = None) -> int: """Block until the agent exits, returning its exit code.""" budget = timeout_s if timeout_s is not None else self.config.agent_timeout_s if self._bg_job is None: raise RuntimeError("Agent not started; call start_agent() first.") return self._bg_job.wait(timeout=budget) def fetch_trace(self) -> str: """Return the raw ``opencode run`` log (JSON-lines when ``run_format=json``).""" return self.sandbox.read_text(agent_log_path(self.config)) def fetch_proxy_trace(self) -> list[dict[str, Any]]: """Return per-turn proxy-captured records (Mode B only). Each entry has ``request``, ``response``, ``completion_tokens``, ``completion_token_ids``, ``per_token_logps``, ``finish_reason``, and ``latency_s``. Returns ``[]`` in Mode A. """ if self._proxy_trace_path is None: return [] try: content = self.sandbox.read_text(self._proxy_trace_path) except Exception: return [] records: list[dict[str, Any]] = [] for line in content.splitlines(): line = line.strip() if not line: continue import json as _json records.append(_json.loads(line)) return records class OpenCodeSessionFactory(ResourceSessionFactory): """Produce isolated per-rollout :class:`OpenCodeSession` instances. The factory owns sandbox provisioning, opencode install, config injection, and (Mode B) proxy startup. Each :meth:`create` call returns a fresh sandbox with a running agent. """ def __init__( self, *, config: OpenCodeConfig, sandbox_backend: SandboxBackend, mode: Literal["black_box", "transparent_proxy"] = "black_box", verifier: Verifier | None = None, install_timeout_s: int = 240, setup_timeout_s: int = 300, ) -> None: if mode not in {"black_box", "transparent_proxy"}: raise ValueError(f"Unknown mode: {mode!r}") self._config = config self._backend = sandbox_backend self._mode = mode self._verifier = verifier self._install_timeout_s = install_timeout_s self._setup_timeout_s = setup_timeout_s def create( self, task: Any, seed: int | None = None, episode_id: str | None = None, ) -> OpenCodeSession: import logging _log = logging.getLogger(__name__) oc_task = OpenCodeTask.coerce(task) sandbox_timeout = int(self._config.agent_timeout_s) + 300 _log.info( "factory.create: creating sandbox timeout=%ds mode=%s", sandbox_timeout, self._mode, ) sandbox = self._backend.create( timeout_s=sandbox_timeout, metadata={"episode_id": episode_id} if episode_id else None, ) sid = ( getattr(sandbox, "sandbox_id", None) or getattr(getattr(sandbox, "raw", None), "sandbox_id", "?") ) _log.info("factory.create: sandbox=%s — bootstrapping…", sid) try: self._bootstrap_sandbox(sandbox, oc_task) except Exception as exc: _log.error("factory.create: bootstrap failed: %r", exc) sandbox.kill() raise base_url_override: str | None = None proxy_trace_path: str | None = None proxy_bg_job: BgJob | None = None if self._mode == "transparent_proxy": _log.info( "factory.create: starting interception proxy on :%d → %s", _PROXY_PORT, self._config.base_url, ) proxy_bg_job, base_url_override, proxy_trace_path = self._start_proxy( sandbox ) _log.info("factory.create: proxy up at %s", base_url_override) # Rewrite opencode.json so opencode points at the proxy. Force # ``openai_compatible`` so opencode hits ``/v1/chat/completions`` # (which the proxy serves) rather than provider-specific paths. from .config import OpenCodeConfig as _OCC proxy_cfg = _OCC( **{ **self._config.model_dump(), "provider": "openai_compatible", "base_url": base_url_override, } ) sandbox.write_text( opencode_config_path(self._config), build_opencode_json(proxy_cfg), ) session = OpenCodeSession( sandbox=sandbox, config=self._config, task=oc_task, verifier=self._verifier, base_url_override=base_url_override, proxy_trace_path=proxy_trace_path, proxy_bg_job=proxy_bg_job, ) session.start_agent() return session # ------------------------------------------------------------------ def _wait_for_sandbox_ready( self, sandbox: SandboxHandle, *, attempts: int = 15, delay_s: float = 1.0, ) -> None: """Probe the sandbox until ``echo ok`` succeeds. E2B (and other backends) sometimes return the handle before the guest is fully ready. Issue ``echo ok`` with short timeouts until it succeeds. Returns silently on success; raises ``RuntimeError`` on prolonged failure. """ import time last_err = "" for _ in range(attempts): try: r = sandbox.exec("echo ok", timeout=5) if r.exit_code == 0 and "ok" in (r.stdout or ""): return last_err = (r.stderr or r.stdout or "").strip() or f"exit={r.exit_code}" except Exception as exc: # noqa: BLE001 last_err = f"{type(exc).__name__}: {exc}" time.sleep(delay_s) raise RuntimeError( f"sandbox did not become ready within {attempts * delay_s:.0f}s " f"(last error: {last_err})" ) def _exec_with_retry( self, sandbox: SandboxHandle, cmd: str, *, timeout: float, attempts: int = 3, backoff_s: float = 3.0, label: str = "cmd", ): """Run ``sandbox.exec`` with exponential backoff on transient failure. Transient = ``exit_code != 0`` AND empty stderr (SIGKILL / network blip signature) OR an exception during exec. Final failure is raised as ``RuntimeError`` carrying the last exit code + stderr. """ import time last_stdout = "" last_stderr = "" last_exit = 0 for i in range(attempts): try: r = sandbox.exec(cmd, timeout=timeout) if r.exit_code == 0: return r last_stdout = r.stdout or "" last_stderr = r.stderr or "" last_exit = r.exit_code if last_stderr.strip(): break except Exception as exc: # noqa: BLE001 last_stderr = f"{type(exc).__name__}: {exc}" last_exit = -1 if i + 1 < attempts: time.sleep(backoff_s * (2**i)) raise RuntimeError( f"{label} failed after {attempts} attempts " f"(exit={last_exit}, stderr={last_stderr!r}, stdout_tail={last_stdout[-400:]!r})" ) def _opencode_already_installed(self, sandbox: SandboxHandle) -> bool: """Cheap probe — returns True if opencode is on disk in the sandbox. Used to skip the slow ``curl install`` step when running against a prebaked template that already ships opencode. """ try: r = sandbox.exec( "/home/user/.opencode/bin/opencode --version", timeout=10, ) return r.exit_code == 0 except Exception: return False def _bootstrap_sandbox( self, sandbox: SandboxHandle, task: OpenCodeTask, ) -> None: """Install opencode, write config + task files, run optional setup.""" # Stage 1: wait for the sandbox to be responsive. self._wait_for_sandbox_ready(sandbox) # Stage 2: install opencode (skipped if a prebaked template already # has it). curl|bash is flaky — retry with backoff. if not self._opencode_already_installed(sandbox): self._exec_with_retry( sandbox, build_install_cmd(self._config), timeout=self._install_timeout_s, attempts=3, backoff_s=3.0, label="opencode install", ) sandbox.write_text( opencode_config_path(self._config), build_opencode_json(self._config), ) sandbox.write_text(instruction_path(self._config), task.instruction) if self._config.system_prompt: sandbox.write_text( system_prompt_path(self._config), self._config.system_prompt, ) for remote_path, content in task.upload_files.items(): sandbox.write_text(remote_path, content) if self._config.extra_setup_shell: self._exec_with_retry( sandbox, self._config.extra_setup_shell, timeout=self._setup_timeout_s, attempts=2, backoff_s=2.0, label="extra_setup_shell", ) if task.setup_shell: r = sandbox.exec(task.setup_shell, timeout=self._setup_timeout_s) if r.exit_code != 0: raise RuntimeError( f"task.setup_shell failed ({r.exit_code}): {r.stderr}" ) def _start_proxy( self, sandbox: SandboxHandle, ) -> tuple[BgJob, str, str]: """Install proxy deps + start the proxy as a bg job inside the sandbox. Returns ``(proxy_bg_job, base_url_override, proxy_trace_path)``. Skips the pip install + source-upload steps when the prebaked template already has them in place. """ proxy_already_present = sandbox.exists( "/home/user/proxy/interception.py" ) if not proxy_already_present: # Install proxy deps (idempotent on retries). self._exec_with_retry( sandbox, "pip install --quiet 'fastapi>=0.104' 'uvicorn[standard]>=0.24' " "'httpx>=0.27' 2>&1 | tail -20", timeout=180, attempts=3, backoff_s=2.0, label="proxy deps install", ) # Upload the proxy module into the sandbox. sandbox.write_text( "/home/user/proxy/interception.py", _PROXY_SOURCE_PATH.read_text(), ) sandbox.write_text("/home/user/proxy/__init__.py", "") cap_flag = "" if self._config.proxy_max_tokens_cap is not None: cap_flag = f"--max-tokens-cap {self._config.proxy_max_tokens_cap} " thinking_flag = "" if self._config.proxy_disable_thinking: thinking_flag = "--disable-thinking " # Force the upstream model id on every forwarded request — opencode's # internal title-gen call sometimes strips the provider prefix. model_override_flag = "" if self._config.model: model_override_flag = f"--model-override '{self._config.model}' " proxy_cmd = ( "cd /home/user/proxy && " "python interception.py " f"--upstream-url {self._config.base_url} " f"--upstream-api-key {self._config.api_key} " f"--trace {_PROXY_TRACE_PATH} " f"--port {_PROXY_PORT} " f"--top-logprobs {self._config.proxy_top_logprobs} " f"{cap_flag}" f"{thinking_flag}" f"{model_override_flag}" f"> {_PROXY_LOG_PATH} 2>&1" ) proxy_job = sandbox.start_bg(proxy_cmd) # Wait for the proxy to start listening. Cold uvicorn boot inside # E2B can take anywhere from <1s to ~30s depending on cache state. import time attempts = 120 interval_s = 0.5 for _ in range(attempts): r = sandbox.exec( f"curl -sf http://127.0.0.1:{_PROXY_PORT}/healthz", timeout=5, ) if r.exit_code == 0: break time.sleep(interval_s) else: log = "" try: log = sandbox.read_text(_PROXY_LOG_PATH) except Exception: pass proxy_job.kill() raise RuntimeError( f"proxy did not start within {attempts * interval_s:.0f}s. " f"log:\n{log[-2000:]}" ) base_url_override = f"http://127.0.0.1:{_PROXY_PORT}/v1" return proxy_job, base_url_override, _PROXY_TRACE_PATH __all__ = [ "OpenCodeSession", "OpenCodeSessionFactory", "OpenCodeTask", "Verifier", ]