Spaces:
Sleeping
Sleeping
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""E2B-backed single-tool coding environment inspired by Terminus."""
from __future__ import annotations

import math
import os
from typing import Any, Iterable, Optional
from uuid import uuid4

from fastmcp import FastMCP
from openenv.core.env_server.mcp_environment import MCPEnvironment
from openenv.core.env_server.types import Action, Observation

try:
    from .e2b_sandbox import E2BSandbox
    from ..models import CommandResult, TerminusState
except ImportError:  # pragma: no cover
    from models import CommandResult, TerminusState
    from server.e2b_sandbox import E2BSandbox
# Path inside the sandbox that is read after the verify commands have run; a
# parseable number found there is used as the reward instead of the computed
# passed/total ratio.
REWARD_FILE = "/home/user/logs/verifier/reward.txt"
class TerminusEnvironment(MCPEnvironment):
    """Single-tool terminal environment with one E2B sandbox per episode.

    ``reset()`` creates the sandbox, runs optional setup commands and records
    optional verify commands. The ``terminal`` MCP tool then either runs a
    shell command in that sandbox or stores a final answer and runs the
    recorded verify commands to compute a reward.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self):
        """Initialise empty episode state and register the ``terminal`` tool."""
        self._sandbox: Optional[E2BSandbox] = None
        self._state = TerminusState(episode_id=str(uuid4()), step_count=0)
        mcp = FastMCP("terminus_env")

        # The decorator registers the closure with the MCP server; without it
        # the function would be defined but never exposed as a tool.
        @mcp.tool()
        def terminal(command: str = "", final_answer: str = "") -> str:
            """Run a shell command or submit a final answer inside the sandbox.

            Args:
                command: Shell command to execute in the episode's E2B sandbox.
                final_answer: Optional answer string. When provided, stored
                    as the final answer and any reset-time verify commands run.

            Returns:
                Command output, or final-answer verification summary.
            """
            if not self._sandbox:
                return "Error: environment not reset. Call reset() first."

            # A final answer takes precedence: a command passed in the same
            # call is not executed.
            if final_answer:
                self._state.submitted_answer = final_answer
                if not self._state.verify_commands:
                    return f"Answer submitted: {final_answer}"
                summary = self._run_verify_commands()
                return (
                    f"Answer submitted: {final_answer}\n"
                    f"Verification: {summary['passed']}/{summary['total']} passed; "
                    f"reward={summary['reward']}"
                )

            if not command.strip():
                return "Error: command or final_answer is required."

            result = self._run_shell_command(command)
            self._state.commands.append(result)
            return result.output

        super().__init__(mcp)

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """Create a fresh E2B sandbox and run optional setup commands.

        Args:
            seed: Accepted for interface compatibility; not used here.
            episode_id: Identifier for the new episode; a random UUID is
                generated when omitted.
            **kwargs: ``setup`` (or ``setup_scripts``) gives commands to run
                immediately; ``verify`` (or ``verify_scripts``) gives commands
                to run later, when a final answer is submitted. Each may be a
                single string or an iterable of commands.

        Returns:
            An observation whose metadata ``status`` is ``"ready"`` on
            success, or ``"error"`` with ``done=True`` when the API key is
            missing, the sandbox cannot be created, or a setup command fails.
        """
        self._kill_sandbox()

        api_key = os.environ.get("E2B_API_KEY")
        # State is replaced before any early return so an aborted reset never
        # exposes data from the previous episode.
        self._state = TerminusState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
        )

        if not api_key:
            return Observation(
                done=True,
                reward=None,
                metadata={
                    "status": "error",
                    "error": (
                        "E2B_API_KEY is not set. Configure it before resetting "
                        "terminus_env."
                    ),
                },
            )

        try:
            self._sandbox = E2BSandbox(api_key=api_key)
        except Exception as exc:  # noqa: BLE001
            return Observation(
                done=True,
                reward=None,
                metadata={
                    "status": "error",
                    "error": f"failed to create E2B sandbox: {type(exc).__name__}: {exc}",
                },
            )

        self._state.sandbox_id = self._sandbox.sandbox_id
        setup_commands = _coerce_commands(
            kwargs.get("setup", kwargs.get("setup_scripts", []))
        )
        verify_commands = _coerce_commands(
            kwargs.get("verify", kwargs.get("verify_scripts", []))
        )
        self._state.verify_commands = verify_commands
        self._ensure_verifier_dir()

        if setup_commands:
            setup_results = self._run_shell_commands(setup_commands)
            self._state.setup_results = setup_results
            if any(not result.success for result in setup_results):
                return Observation(
                    done=True,
                    reward=None,
                    metadata={
                        "status": "error",
                        "sandbox_id": self._state.sandbox_id,
                        "message": "Setup command failed.",
                        "setup_results": [
                            result.model_dump() for result in setup_results
                        ],
                    },
                )

        msg = "Terminus environment ready. Use terminal(command=...) to work."
        if setup_commands:
            msg += f" Setup commands run: {len(setup_commands)}."
        if verify_commands:
            msg += f" Verify commands registered: {len(verify_commands)}."

        return Observation(
            done=False,
            reward=None,
            metadata={
                "status": "ready",
                "sandbox_id": self._state.sandbox_id,
                "message": msg,
                "setup_results": [
                    result.model_dump() for result in self._state.setup_results
                ],
                "verify_commands": verify_commands,
            },
        )

    def _step_impl(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **_: Any,
    ) -> Observation:
        """Return an error observation for an action that is not an MCP action."""
        return Observation(
            done=False,
            reward=None,
            metadata={
                "error": (
                    f"Unknown action type: {type(action).__name__}. "
                    "Use ListToolsAction or CallToolAction for MCP interactions."
                )
            },
        )

    def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """Count the step, delegate to the base class, then apply the outcome."""
        self._state.step_count += 1
        obs = super().step(action, timeout_s=timeout_s, **kwargs)
        return self._apply_final_outcome(obs)

    async def step_async(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """Async counterpart of :meth:`step` with the same bookkeeping."""
        self._state.step_count += 1
        obs = await super().step_async(action, timeout_s=timeout_s, **kwargs)
        return self._apply_final_outcome(obs)

    # NOTE(review): exposed as a property because the OpenEnv environment base
    # class declares ``state`` as a property - confirm against the installed
    # openenv version.
    @property
    def state(self) -> TerminusState:
        """Return the current episode state object."""
        return self._state

    def close(self) -> None:
        """Kill the sandbox, if one exists, and drop the reference to it."""
        self._kill_sandbox()

    def _apply_final_outcome(self, obs: Observation) -> Observation:
        """Mark ``obs`` as terminal once an answer is submitted and rewarded.

        The observation is only changed when both a submitted answer and a
        recorded reward are present; otherwise it is returned untouched.
        """
        state = self._state
        if state.submitted_answer is not None and state.last_reward is not None:
            obs.done = True
            obs.reward = state.last_reward
        return obs

    def _kill_sandbox(self) -> None:
        """Kill the current sandbox and always clear the stored handle."""
        if not self._sandbox:
            return
        try:
            self._sandbox.kill()
        finally:
            # Clear the handle even when kill() raises, so a later reset() or
            # close() does not retry against the same failed handle.
            self._sandbox = None

    def _ensure_verifier_dir(self) -> None:
        """Create the verifier log directory inside the sandbox."""
        self._sandbox.run_shell("mkdir -p /home/user/logs/verifier")

    def _run_shell_commands(self, commands: Iterable[str]) -> list[CommandResult]:
        """Run each command in order and collect one result per command."""
        return [self._run_shell_command(command) for command in commands]

    def _run_shell_command(self, command: str) -> CommandResult:
        """Run one command in the sandbox and wrap the outcome.

        Callers are expected to have checked that a sandbox exists.
        """
        result = self._sandbox.run_shell(command)
        output = _format_for_llm(result)
        return CommandResult(
            command=command,
            output=output,
            error=result.error,
            success=result.success,
        )

    def _run_verify_commands(self) -> dict[str, Any]:
        """Run the recorded verify commands and compute the episode reward.

        Returns:
            A dict with ``passed`` (number of successful commands), ``total``
            (number of commands run) and ``reward``. The reward is the value
            read from the reward file when it holds a parseable number,
            otherwise ``passed / total``; it is ``None`` when there is no
            sandbox.
        """
        if not self._sandbox:
            return {"passed": 0, "total": 0, "reward": None}

        self._ensure_verifier_dir()
        verify_results = self._run_shell_commands(self._state.verify_commands)
        self._state.verify_results = verify_results

        passed = sum(1 for result in verify_results if result.success)
        total = len(verify_results)

        # A reward written to the reward file takes priority over the ratio.
        reward = _read_reward_override(self._sandbox)
        if reward is None and total:
            reward = passed / total

        self._state.last_reward = reward
        return {"passed": passed, "total": total, "reward": reward}
def _coerce_commands(value: Any) -> list[str]:
    """Normalise a command argument into a list of non-blank command strings.

    Args:
        value: ``None``, a single command string, or an iterable of commands.
            Non-string items are converted with ``str()``.

    Returns:
        The commands in their original order. Blank or whitespace-only
        entries and ``None`` entries are dropped; kept commands are not
        stripped.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value] if value.strip() else []

    commands = []
    for item in value:
        # A None entry would otherwise become the literal command "None".
        if item is None:
            continue
        text = str(item)
        if text.strip():
            commands.append(text)
    return commands
def _format_for_llm(result) -> str:
    """Render a command result as one text block.

    Args:
        result: Object exposing ``stdout``, ``stderr`` and ``error``
            attributes; each may be empty or ``None``.

    Returns:
        Stripped stdout, stripped stderr and an ``ERROR:`` section, in that
        order and joined by newlines, or ``"(no output)"`` when none of them
        has content.
    """
    sections = []
    for stream in (result.stdout, result.stderr):
        # Strip before testing so whitespace-only output does not add an
        # empty section and hide the "(no output)" fallback.
        text = stream.strip() if stream else ""
        if text:
            sections.append(text)
    if result.error:
        sections.append(f"ERROR:\n{result.error}")
    return "\n".join(sections) if sections else "(no output)"
def _read_reward_override(sandbox: "E2BSandbox") -> Optional[float]:
    """Read an explicit reward from the reward file inside the sandbox.

    Args:
        sandbox: Sandbox whose ``run_shell`` is used to read ``REWARD_FILE``.

    Returns:
        The file content parsed as a finite float, or ``None`` when the
        output is empty, is not a number, or is ``nan`` / ``inf``.
    """
    # "|| true" keeps the shell command successful when the file is absent;
    # a missing file simply yields empty output.
    result = sandbox.run_shell(f"cat {REWARD_FILE} 2>/dev/null || true")
    raw = (result.stdout or "").strip()
    if not raw:
        return None
    try:
        reward = float(raw)
    except ValueError:
        return None
    # float() accepts "nan" and "inf"; neither is a usable reward.
    return reward if math.isfinite(reward) else None