# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""E2B-backed single-tool coding environment inspired by Terminus."""

from __future__ import annotations

import math
import os
from typing import Any, Iterable, Optional
from uuid import uuid4

from fastmcp import FastMCP
from openenv.core.env_server.mcp_environment import MCPEnvironment
from openenv.core.env_server.types import Action, Observation

try:
    from .e2b_sandbox import E2BSandbox
    from ..models import CommandResult, TerminusState
except ImportError:  # pragma: no cover
    from models import CommandResult, TerminusState
    from server.e2b_sandbox import E2BSandbox

REWARD_FILE = "/home/user/logs/verifier/reward.txt"

# Shell command that creates the directory REWARD_FILE lives in. It is issued
# both at reset time and immediately before verification.
_MKDIR_VERIFIER_CMD = "mkdir -p /home/user/logs/verifier"


class TerminusEnvironment(MCPEnvironment):
    """Single-tool terminal environment with one E2B sandbox per episode."""

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self):
        """Register the ``terminal`` MCP tool; the sandbox is created by ``reset()``."""
        self._sandbox: Optional[E2BSandbox] = None
        self._state = TerminusState(episode_id=str(uuid4()), step_count=0)

        mcp = FastMCP("terminus_env")

        # NOTE: the docstring below is kept verbatim because it is attached to
        # the registered tool function.
        @mcp.tool
        def terminal(command: str = "", final_answer: str = "") -> str:
            """Run a shell command or submit a final answer inside the sandbox.

            Args:
                command: Shell command to execute in the episode's E2B sandbox.
                final_answer: Optional answer string. When provided, stored as the
                    final answer and any reset-time verify commands run.

            Returns:
                Command output, or final-answer verification summary.
            """
            if not self._sandbox:
                return "Error: environment not reset. Call reset() first."

            # A final answer takes precedence over ``command``: when both are
            # supplied the command is not executed.
            if final_answer:
                self._state.submitted_answer = final_answer
                if not self._state.verify_commands:
                    return f"Answer submitted: {final_answer}"
                summary = self._run_verify_commands()
                return (
                    f"Answer submitted: {final_answer}\n"
                    f"Verification: {summary['passed']}/{summary['total']} passed; "
                    f"reward={summary['reward']}"
                )

            if not command.strip():
                return "Error: command or final_answer is required."

            result = self._run_shell_command(command)
            self._state.commands.append(result)
            return result.output

        super().__init__(mcp)

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """Create a fresh E2B sandbox and run optional setup commands.

        Args:
            seed: Accepted for interface compatibility; not used here.
            episode_id: Identifier for the new episode; a random UUID is
                generated when omitted or empty.
            **kwargs: ``setup`` (or ``setup_scripts``) lists commands to run
                now; ``verify`` (or ``verify_scripts``) lists commands that run
                when a final answer is submitted. Each may be a single string
                or an iterable of strings.

        Returns:
            An observation with ``done=False`` and ``status="ready"`` on
            success, or ``done=True`` and ``status="error"`` when the API key
            is missing, sandbox creation fails, or any setup command fails.
        """
        self._release_sandbox()

        api_key = os.environ.get("E2B_API_KEY")
        self._state = TerminusState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
        )

        if not api_key:
            return Observation(
                done=True,
                reward=None,
                metadata={
                    "status": "error",
                    "error": (
                        "E2B_API_KEY is not set. Configure it before resetting "
                        "terminus_env."
                    ),
                },
            )

        try:
            self._sandbox = E2BSandbox(api_key=api_key)
        except Exception as exc:  # noqa: BLE001
            # Sandbox creation failures are reported to the caller as an error
            # observation rather than raised.
            return Observation(
                done=True,
                reward=None,
                metadata={
                    "status": "error",
                    "error": f"failed to create E2B sandbox: {type(exc).__name__}: {exc}",
                },
            )

        self._state.sandbox_id = self._sandbox.sandbox_id

        setup_commands = _coerce_commands(
            kwargs.get("setup", kwargs.get("setup_scripts", []))
        )
        verify_commands = _coerce_commands(
            kwargs.get("verify", kwargs.get("verify_scripts", []))
        )
        self._state.verify_commands = verify_commands

        self._sandbox.run_shell(_MKDIR_VERIFIER_CMD)

        if setup_commands:
            # Every setup command is executed, even after an earlier one has
            # failed; failure is assessed once they have all run.
            setup_results = self._run_shell_commands(setup_commands)
            self._state.setup_results = setup_results
            if any(not result.success for result in setup_results):
                return Observation(
                    done=True,
                    reward=None,
                    metadata={
                        "status": "error",
                        "sandbox_id": self._state.sandbox_id,
                        "message": "Setup command failed.",
                        "setup_results": [
                            result.model_dump() for result in setup_results
                        ],
                    },
                )

        msg = "Terminus environment ready. Use terminal(command=...) to work."
        if setup_commands:
            msg += f" Setup commands run: {len(setup_commands)}."
        if verify_commands:
            msg += f" Verify commands registered: {len(verify_commands)}."

        return Observation(
            done=False,
            reward=None,
            metadata={
                "status": "ready",
                "sandbox_id": self._state.sandbox_id,
                "message": msg,
                "setup_results": [
                    result.model_dump() for result in self._state.setup_results
                ],
                "verify_commands": verify_commands,
            },
        )

    def _step_impl(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **_: Any,
    ) -> Observation:
        """Return an error observation naming the unsupported action type."""
        return Observation(
            done=False,
            reward=None,
            metadata={
                "error": (
                    f"Unknown action type: {type(action).__name__}. "
                    "Use ListToolsAction or CallToolAction for MCP interactions."
                )
            },
        )

    def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """Count the step, delegate to the base class, then stamp the final reward."""
        self._state.step_count += 1
        obs = super().step(action, timeout_s=timeout_s, **kwargs)
        return self._apply_final_reward(obs)

    async def step_async(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """Async counterpart of ``step`` with identical bookkeeping."""
        self._state.step_count += 1
        obs = await super().step_async(action, timeout_s=timeout_s, **kwargs)
        return self._apply_final_reward(obs)

    @property
    def state(self) -> TerminusState:
        """Current episode state."""
        return self._state

    def close(self) -> None:
        """Kill the episode's sandbox, if one exists."""
        self._release_sandbox()

    def _release_sandbox(self) -> None:
        """Kill the current sandbox (if any) and drop the reference to it."""
        if self._sandbox:
            self._sandbox.kill()
            self._sandbox = None

    def _apply_final_reward(self, obs: Observation) -> Observation:
        """Mark ``obs`` done and attach the reward once an answer has been scored.

        NOTE(review): within this class ``last_reward`` is only assigned by
        ``_run_verify_commands``, which runs only when verify commands are
        registered. An answer submitted without verify commands therefore does
        not appear to end the episode here -- confirm this is intended.
        """
        state = self._state
        if state.submitted_answer is not None and state.last_reward is not None:
            obs.done = True
            obs.reward = state.last_reward
        return obs

    def _run_shell_commands(self, commands: Iterable[str]) -> list[CommandResult]:
        """Run each command in order and collect the results."""
        return [self._run_shell_command(command) for command in commands]

    def _run_shell_command(self, command: str) -> CommandResult:
        """Run one command in the sandbox and wrap the outcome as a ``CommandResult``.

        Callers must ensure a sandbox exists; this helper does not check.
        """
        result = self._sandbox.run_shell(command)
        return CommandResult(
            command=command,
            output=_format_for_llm(result),
            error=result.error,
            success=result.success,
        )

    def _run_verify_commands(self) -> dict[str, Any]:
        """Run the registered verify commands and compute the episode reward.

        The reward is the number parsed from ``REWARD_FILE`` when that file
        holds one; otherwise it is the fraction of verify commands that
        succeeded. The result is stored in ``state.last_reward``.

        Returns:
            A dict with keys ``passed``, ``total`` and ``reward``.
        """
        if not self._sandbox:
            return {"passed": 0, "total": 0, "reward": None}

        self._sandbox.run_shell(_MKDIR_VERIFIER_CMD)
        verify_results = self._run_shell_commands(self._state.verify_commands)
        self._state.verify_results = verify_results

        passed = sum(1 for result in verify_results if result.success)
        total = len(verify_results)

        reward = _read_reward_override(self._sandbox)
        if reward is None and total:
            reward = passed / total

        self._state.last_reward = reward
        return {"passed": passed, "total": total, "reward": reward}


def _coerce_commands(value: Any) -> list[str]:
    """Normalize ``value`` into a list of non-blank command strings.

    ``None`` yields an empty list, a single string yields a one-item list
    (or an empty list when blank), and any other iterable has each item
    converted with ``str`` and blank items dropped.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value] if value.strip() else []
    return [str(item) for item in value if str(item).strip()]


def _format_for_llm(result) -> str:
    """Combine stdout, stderr and error text of ``result`` into one string.

    Streams are stripped before being tested, so whitespace-only output no
    longer contributes an empty segment; ``"(no output)"`` is returned when
    nothing remains.
    """
    parts = []
    for stream in (result.stdout, result.stderr):
        text = stream.strip() if stream else ""
        if text:
            parts.append(text)
    if result.error:
        parts.append(f"ERROR:\n{result.error}")
    return "\n".join(parts) if parts else "(no output)"


def _read_reward_override(sandbox: E2BSandbox) -> Optional[float]:
    """Read an explicit reward from ``REWARD_FILE`` inside the sandbox.

    Returns:
        The parsed value, or ``None`` when the file is missing, empty, not a
        number, or not finite.
    """
    # ``|| true`` keeps the command successful when the file does not exist.
    result = sandbox.run_shell(f"cat {REWARD_FILE} 2>/dev/null || true")
    raw = (result.stdout or "").strip()
    if not raw:
        return None
    try:
        reward = float(raw)
    except ValueError:
        return None
    # float() also parses "nan", "inf" and overflowing literals such as
    # "1e999"; none of these is a usable reward.
    return reward if math.isfinite(reward) else None