# tbench2 / server / tbench2_env_environment.py
# sergiopaniego's picture
# Upload folder using huggingface_hub
# 5d897b1 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""TB2 environment server implementation (Spaces-compatible local mode)."""
from __future__ import annotations
import logging
import os
import sys
import urllib.request
import zipfile
from pathlib import Path
from typing import Any
from uuid import uuid4
if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib
from openenv.core.env_server.interfaces import Environment
# Support both in-repo and standalone imports
try:
# In-repo imports (when running from OpenEnv repository)
from tbench2_env.models import Tbench2Action, Tbench2Observation, Tbench2State
except ImportError:
# Standalone imports (when environment is standalone with openenv from pip)
from models import Tbench2Action, Tbench2Observation, Tbench2State
_CAMEL_IMPORT_ERROR: Exception | None = None
def _require_terminal_toolkit() -> Any:
global _CAMEL_IMPORT_ERROR
if _CAMEL_IMPORT_ERROR is not None:
raise RuntimeError(
"camel-ai (TerminalToolkit) is required for TB2. Install from PyPI or from the CAMEL repo."
) from _CAMEL_IMPORT_ERROR
try:
from camel.toolkits import TerminalToolkit
except Exception as exc: # pragma: no cover
_CAMEL_IMPORT_ERROR = exc
raise RuntimeError(
"camel-ai (TerminalToolkit) is required for TB2. Install from PyPI or from the CAMEL repo."
) from exc
return TerminalToolkit
def _download_tb2_repo(cache_dir: Path) -> Path:
    """Download (once) and extract the Terminal-Bench 2 repository archive.

    The archive URL is read from the ``TB2_REPO_URL`` environment variable and
    defaults to the ``main`` branch zip of ``laude-institute/terminal-bench-2``.
    The archive is cached as ``<cache_dir>/terminal-bench-2.zip`` and extracted
    next to it; both steps are skipped when their result already exists.

    Args:
        cache_dir: Directory holding the cached archive and the extracted tree.
            Created if missing.

    Returns:
        Path of the extracted top-level directory (named after the first
        entry of the archive).

    Raises:
        zipfile.BadZipFile: If the cached archive is corrupt or empty. The bad
            archive is deleted first so the next call downloads a fresh copy.
    """
    repo_url = os.getenv(
        "TB2_REPO_URL",
        "https://github.com/laude-institute/terminal-bench-2/archive/refs/heads/main.zip",
    )
    cache_dir.mkdir(parents=True, exist_ok=True)
    archive_path = cache_dir / "terminal-bench-2.zip"

    if not archive_path.exists():
        # Download to a unique temporary name and move it into place only when
        # complete, so an interrupted download never leaves a truncated file
        # that the exists() check above would trust on the next call.
        partial_path = cache_dir / f"{archive_path.name}.{uuid4().hex}.part"
        try:
            urllib.request.urlretrieve(repo_url, partial_path)
            os.replace(partial_path, archive_path)
        finally:
            partial_path.unlink(missing_ok=True)

    try:
        with zipfile.ZipFile(archive_path) as zf:
            names = zf.namelist()
            if not names:
                raise zipfile.BadZipFile(f"TB2 archive is empty: {archive_path}")
            # GitHub-style archives wrap everything in one top-level folder.
            root = names[0].split("/")[0]
            extract_dir = cache_dir / root
            if not extract_dir.exists():
                zf.extractall(cache_dir)
    except zipfile.BadZipFile:
        # Evict the unusable archive so the cache can recover on the next call.
        archive_path.unlink(missing_ok=True)
        raise
    return extract_dir
def _read_instruction(task_dir: Path) -> str:
    """Return the text of ``instruction.md`` in *task_dir*, or ``""`` if absent.

    Args:
        task_dir: Directory of a single task.

    Returns:
        The UTF-8 decoded file contents, or an empty string when the file
        does not exist.
    """
    md_file = task_dir / "instruction.md"
    if not md_file.exists():
        return ""
    return md_file.read_text(encoding="utf-8")
def _read_timeout(task_dir: Path, fallback: float) -> float:
task_toml = task_dir / "task.toml"
if not task_toml.exists():
return fallback
try:
data = tomllib.loads(task_toml.read_text(encoding="utf-8"))
except Exception:
return fallback
verifier = data.get("verifier", {})
return float(verifier.get("timeout_sec", fallback))
class Tbench2Environment(Environment[Tbench2Action, Tbench2Observation, Tbench2State]):
    """OpenEnv wrapper around Terminal-Bench 2 tasks (local execution).

    Commands are executed through a CAMEL ``TerminalToolkit`` created with
    ``use_docker_backend=False`` and the task directory as its working
    directory. ``reset()`` selects a task and builds the toolkit; ``step()``
    dispatches one ``Tbench2Action``; the ``evaluate`` action runs the task's
    pytest suite and converts its exit code into a 0/1 reward.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(
        self,
        tasks_dir: str | None = None,
        output_dir: str | None = None,
        command_timeout_s: float = 20.0,
        safe_mode: bool = False,
    ) -> None:
        """Configure the environment without touching any task yet.

        Args:
            tasks_dir: Directory containing one sub-directory per task. Falls
                back to ``$TB2_TASKS_DIR``; when empty the TB2 repository is
                downloaded on demand.
            output_dir: Root directory for per-trial logs. Falls back to
                ``$TB2_OUTPUT_DIR`` and then ``/tmp/tbench2_env_runs``.
            command_timeout_s: Timeout handed to the terminal toolkit.
            safe_mode: Forwarded to the terminal toolkit.
        """
        super().__init__()
        self.tasks_dir = tasks_dir or os.getenv("TB2_TASKS_DIR", "")
        self.output_dir = Path(output_dir or os.getenv("TB2_OUTPUT_DIR", "/tmp/tbench2_env_runs"))
        self.command_timeout_s = command_timeout_s
        self.safe_mode = safe_mode
        self._state = Tbench2State()
        self._task_dir: Path | None = None
        self._terminal_toolkit: Any = None
        self._instruction = ""

    def reset(
        self,
        seed: int | None = None,
        episode_id: str | None = None,
        **kwargs: Any,
    ) -> Tbench2Observation:
        """Start a new episode on the requested task.

        Args:
            seed: Ignored.
            episode_id: Identifier for the episode; a UUID is generated when
                omitted.
            **kwargs: ``task_id`` / ``task_name`` and/or ``task_path`` /
                ``path`` selecting the task.

        Returns:
            The initial observation carrying the task instruction.

        Raises:
            RuntimeError: If the CAMEL terminal toolkit is not installed.
            ValueError: If neither a task id nor a task path is given.
            FileNotFoundError: If the task directory does not exist.
        """
        del seed
        TerminalToolkit = _require_terminal_toolkit()
        task_id = kwargs.get("task_id") or kwargs.get("task_name")
        task_path = kwargs.get("task_path") or kwargs.get("path")
        task_dir = self._resolve_task_path(task_id, task_path)
        resolved_task_id = task_id or task_dir.name

        # Generate the episode id exactly once so the log directory name and
        # the id stored in the state always refer to the same episode.
        resolved_episode_id = episode_id or str(uuid4())

        self._instruction = _read_instruction(task_dir)
        self._task_dir = task_dir

        trial_name = f"{resolved_task_id}.{resolved_episode_id}"
        session_logs_dir = self.output_dir / trial_name / "terminal_toolkit_session_logs"
        session_logs_dir.mkdir(parents=True, exist_ok=True)

        self._terminal_toolkit = TerminalToolkit(
            timeout=self.command_timeout_s,
            working_directory=str(task_dir),
            use_docker_backend=False,
            session_logs_dir=session_logs_dir,
            safe_mode=self.safe_mode,
        )
        self._state = Tbench2State(
            episode_id=resolved_episode_id,
            step_count=0,
            task_id=resolved_task_id,
            task_path=str(task_dir),
            terminal_ready=True,
        )
        return Tbench2Observation(
            instruction=self._instruction,
            output="",
            success=True,
            error="",
            task_id=resolved_task_id,
            task_path=str(task_dir),
            session_id=None,
            action_type="reset",
            info={},
            reward=0.0,
            done=False,
        )

    def step(
        self,
        action: Tbench2Action,
        timeout_s: float | None = None,
        **kwargs: Any,
    ) -> Tbench2Observation:
        """Execute one action and report its outcome.

        Supported ``action_type`` values: ``exec``, ``write``, ``view``,
        ``wait``, ``kill``, ``write_file``, ``evaluate`` and ``close``.
        Failures while executing the action are not raised; they are reported
        through ``success=False`` and ``error`` on the observation.

        Args:
            action: The action to perform.
            timeout_s: Ignored.
            **kwargs: Ignored.

        Returns:
            Observation with the action output; ``done`` is set after
            ``evaluate`` and ``close``.

        Raises:
            TypeError: If *action* is not a ``Tbench2Action``.
            RuntimeError: If ``reset()`` has not been called.
        """
        del timeout_s, kwargs
        if not isinstance(action, Tbench2Action):
            raise TypeError(f"Expected Tbench2Action, got {type(action)}")
        if self._terminal_toolkit is None or self._task_dir is None:
            raise RuntimeError("TB2 environment not initialized. Call reset() first.")

        self._state.step_count += 1
        self._state.last_action_type = action.action_type
        self._state.last_command = action.command

        output = ""
        error = ""
        success = True
        reward = None
        done = False
        info: dict[str, Any] = {}
        # Only "exec" may fall back to the default session; every other
        # session-bound action must name its session explicitly.
        session_id = action.session_id or "tb2-session"
        try:
            if action.action_type == "exec":
                output = self._terminal_toolkit.shell_exec(
                    command=action.command,
                    block=action.block,
                    id=session_id,
                )
            elif action.action_type == "write":
                self._ensure_session_id(action.session_id, action.action_type)
                output = self._terminal_toolkit.shell_write_to_process(
                    id=action.session_id,
                    command=action.command,
                )
            elif action.action_type == "view":
                self._ensure_session_id(action.session_id, action.action_type)
                output = self._terminal_toolkit.shell_view(id=action.session_id)
            elif action.action_type == "wait":
                self._ensure_session_id(action.session_id, action.action_type)
                wait_seconds = action.wait_seconds or 0.0
                output = self._terminal_toolkit.shell_wait(
                    id=action.session_id,
                    wait_seconds=wait_seconds,
                )
            elif action.action_type == "kill":
                self._ensure_session_id(action.session_id, action.action_type)
                self._terminal_toolkit.shell_kill_process(id=action.session_id)
                output = f"Killed session {action.session_id}"
            elif action.action_type == "write_file":
                self._terminal_toolkit.shell_write_content_to_file(
                    content=action.content,
                    file_path=action.file_path,
                )
                output = f"Wrote content to {action.file_path}"
            elif action.action_type == "evaluate":
                output, reward, info = self._evaluate_task()
                done = True
            elif action.action_type == "close":
                self.close()
                output = "Closed TB2 environment."
                done = True
            else:
                raise ValueError(f"Unsupported action_type: {action.action_type}")
        except Exception as exc:  # pragma: no cover
            # Deliberate boundary: the client receives the failure as data.
            success = False
            error = str(exc)

        self._state.last_output = output
        self._state.session_id = session_id or ""
        return Tbench2Observation(
            instruction=self._instruction,
            output=output,
            success=success,
            error=error,
            task_id=self._state.task_id,
            task_path=self._state.task_path,
            session_id=session_id or "",
            action_type=action.action_type,
            info=info,
            reward=reward,
            done=done,
        )

    @property
    def state(self) -> Tbench2State:
        """Current episode state."""
        return self._state

    def close(self) -> None:
        """Drop the toolkit and task references; ``reset()`` is needed afterwards."""
        self._terminal_toolkit = None
        self._task_dir = None
        self._instruction = ""

    def _resolve_task_path(self, task_id: str | None, task_path: str | None) -> Path:
        """Resolve the task directory from an explicit path or a task id.

        An explicit *task_path* wins. Otherwise *task_id* is looked up under
        ``self.tasks_dir`` or, when that is empty, under a downloaded copy of
        the TB2 repository cached in ``$TB2_CACHE_DIR`` (default
        ``<output_dir>/repo_cache``).

        Raises:
            ValueError: If neither argument is provided.
            FileNotFoundError: If the resolved directory does not exist.
        """
        if task_path:
            resolved = Path(task_path).expanduser().resolve()
            if not resolved.exists():
                raise FileNotFoundError(f"Task path not found: {resolved}")
            return resolved
        if not task_id:
            raise ValueError("Provide task_id or task_path to reset TB2 environment.")
        if not self.tasks_dir:
            cache_dir = Path(os.getenv("TB2_CACHE_DIR", str(self.output_dir / "repo_cache")))
            repo_dir = _download_tb2_repo(cache_dir)
            resolved = repo_dir / task_id
        else:
            resolved = Path(self.tasks_dir).expanduser().resolve() / task_id
        if not resolved.exists():
            raise FileNotFoundError(f"Task path not found: {resolved}")
        return resolved

    def _ensure_session_id(self, session_id: str | None, action_type: str) -> None:
        """Raise ``ValueError`` when a session-bound action has no session id."""
        if not session_id:
            raise ValueError(f"session_id is required for action_type='{action_type}'")

    @staticmethod
    def _parse_exit_code(output: str, marker: str = "__TB2_EXIT_CODE__") -> int:
        """Extract the exit code printed as ``<marker>:<code>`` from *output*.

        The last line containing the marker decides. Only the text after the
        marker is parsed, so a colon earlier on the same line (for example
        when the marker is glued to preceding output) cannot corrupt the
        result. Returns 1 when the marker is missing or unparsable.
        """
        for line in reversed(output.splitlines()):
            if marker not in line:
                continue
            _, _, tail = line.rpartition(f"{marker}:")
            try:
                return int(tail.strip())
            except ValueError:
                return 1
        return 1

    def _evaluate_task(self) -> tuple[str, float, dict[str, Any]]:
        """Run the task's pytest suite and convert the result into a reward.

        Returns:
            ``(output, reward, info)`` where *reward* is 1.0 if pytest exited
            with code 0 and 0.0 otherwise, and *info* holds ``tests_passed``
            and ``exit_code``.

        Raises:
            RuntimeError: If the environment has not been reset.
        """
        import shlex

        if self._task_dir is None:
            raise RuntimeError("TB2 environment not initialized. Call reset() first.")
        if self._terminal_toolkit is None:
            raise RuntimeError("Terminal toolkit not initialized.")

        # Validate timeout config (the returned value is not used here).
        _read_timeout(self._task_dir, fallback=900.0)

        tests_dir = self._task_dir / "tests"
        # Quote both paths so directories containing spaces or shell
        # metacharacters do not break (or alter) the command line.
        cmd = (
            f"cd {shlex.quote(str(self._task_dir))} && "
            f"python -m pytest -q {shlex.quote(str(tests_dir))} -rA; "
            "echo __TB2_EXIT_CODE__:$?"
        )
        output = self._terminal_toolkit.shell_exec(
            id="tb2-tests",
            command=cmd,
            block=True,
        )
        exit_code = self._parse_exit_code(output)
        reward = 1.0 if exit_code == 0 else 0.0
        info = {"tests_passed": exit_code == 0, "exit_code": exit_code}
        return output, reward, info
class Tbench2DockerEnvironment(Environment[Tbench2Action, Tbench2Observation, Tbench2State]):
    """OpenEnv wrapper around Terminal-Bench 2 tasks with Docker isolation.

    This environment runs each task in its own Docker container, reading
    the image specification from task.toml's [environment] section. When a
    task declares no image, commands run locally in the task directory.

    Requires:
    - Docker socket mounted (/var/run/docker.sock)
    - Sufficient disk space for container images
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(
        self,
        tasks_dir: str | None = None,
        output_dir: str | None = None,
        command_timeout_s: float = 300.0,
        safe_mode: bool = True,
    ) -> None:
        """Configure the environment without touching Docker or any task yet.

        Args:
            tasks_dir: Directory containing one sub-directory per task. Falls
                back to ``$TB2_TASKS_DIR``; when empty the TB2 repository is
                downloaded on demand.
            output_dir: Root directory for per-trial output. Falls back to
                ``$TB2_OUTPUT_DIR`` and then ``/tmp/tbench2_env_runs``.
            command_timeout_s: Timeout for commands run in local fallback mode.
            safe_mode: Stored on the instance; not consulted by this class.
        """
        super().__init__()
        self.tasks_dir = tasks_dir or os.getenv("TB2_TASKS_DIR", "")
        self.output_dir = Path(output_dir or os.getenv("TB2_OUTPUT_DIR", "/tmp/tbench2_env_runs"))
        self.command_timeout_s = command_timeout_s
        self.safe_mode = safe_mode
        self._state = Tbench2State()
        self._task_dir: Path | None = None
        self._docker_client: Any = None
        self._container: Any = None
        self._instruction = ""
        self._task_image = ""
        self._task_config: dict[str, Any] = {}

    def _get_docker_client(self) -> Any:
        """Lazy initialization of Docker client.

        Raises:
            RuntimeError: If the ``docker`` package or daemon is unavailable.
        """
        if self._docker_client is None:
            try:
                import docker

                self._docker_client = docker.from_env()
            except Exception as exc:
                raise RuntimeError(
                    f"Docker client not available. Ensure Docker socket is mounted. Error: {exc}"
                ) from exc
        return self._docker_client

    def reset(
        self,
        seed: int | None = None,
        episode_id: str | None = None,
        **kwargs: Any,
    ) -> Tbench2Observation:
        """Start a new episode, launching a container if the task declares one.

        Args:
            seed: Ignored.
            episode_id: Identifier for the episode; a UUID is generated when
                omitted.
            **kwargs: ``task_id`` / ``task_name`` and/or ``task_path`` /
                ``path`` selecting the task.

        Returns:
            The initial observation; ``info`` carries ``docker_image`` when a
            container was started.

        Raises:
            ValueError: If neither a task id nor a task path is given.
            FileNotFoundError: If the task directory does not exist.
            RuntimeError: If the container cannot be started.
        """
        del seed
        task_id = kwargs.get("task_id") or kwargs.get("task_name")
        task_path = kwargs.get("task_path") or kwargs.get("path")
        # Resolve first: a bad request leaves the current episode untouched.
        task_dir = self._resolve_task_path(task_id, task_path)
        resolved_task_id = task_id or task_dir.name
        # One id for both the trial directory and the state.
        resolved_episode_id = episode_id or str(uuid4())

        # Tear down the previous episode so its container is not leaked and
        # cannot receive commands meant for the new task.
        self.close()

        # Read task configuration including Docker image
        task_toml_path = task_dir / "task.toml"
        if task_toml_path.exists():
            self._task_config = tomllib.loads(task_toml_path.read_text(encoding="utf-8"))
            self._task_image = self._task_config.get("environment", {}).get("docker_image", "")
        else:
            self._task_image = ""
            self._task_config = {}

        # Create trial directory for logs
        trial_name = f"{resolved_task_id}.{resolved_episode_id}"
        trial_dir = self.output_dir / trial_name
        trial_dir.mkdir(parents=True, exist_ok=True)

        # Start Docker container if image is specified; otherwise the
        # environment falls back to local execution.
        if self._task_image:
            self._start_container(task_dir, trial_dir)

        # Publish the task only after setup succeeded. If the container failed
        # to start, _task_dir stays None and step() refuses to run instead of
        # silently executing a containerised task on the host.
        self._instruction = _read_instruction(task_dir)
        self._task_dir = task_dir
        self._state = Tbench2State(
            episode_id=resolved_episode_id,
            step_count=0,
            task_id=resolved_task_id,
            task_path=str(task_dir),
            terminal_ready=True,
        )
        return Tbench2Observation(
            instruction=self._instruction,
            output="",
            success=True,
            error="",
            task_id=resolved_task_id,
            task_path=str(task_dir),
            session_id=None,
            action_type="reset",
            info={"docker_image": self._task_image} if self._task_image else {},
            reward=0.0,
            done=False,
        )

    def _start_container(self, task_dir: Path, trial_dir: Path) -> None:
        """Start a Docker container for the task.

        Uses file copying instead of bind mounts to support Docker-in-Docker
        scenarios where the server runs inside a container. Bind mounts reference
        host paths, which don't exist when the server is containerized.

        Args:
            task_dir: Local task directory copied to ``/task`` in the container.
            trial_dir: Per-trial output directory; currently unused here.

        Raises:
            RuntimeError: If pulling, starting or populating the container
                fails. A partially started container is removed first.
        """
        client = self._get_docker_client()
        try:
            # Pull image if needed
            try:
                client.images.get(self._task_image)
            except Exception:
                logging.info("Pulling image %s...", self._task_image)
                client.images.pull(self._task_image)

            # Start container WITHOUT bind mounts (for DinD compatibility)
            self._container = client.containers.run(
                image=self._task_image,
                command="sleep infinity",
                detach=True,
                network_mode="host",
                working_dir="/task",
                remove=False,
            )
            # Copy task files into container using tar archive
            # This works in Docker-in-Docker because we read files from our
            # filesystem and stream them to the container via the Docker API
            self._copy_dir_to_container(task_dir, "/task")
        except Exception as exc:
            # Do not leave a half-initialised container running.
            self._stop_container()
            raise RuntimeError(f"Failed to start container: {exc}") from exc

    def _stop_container(self) -> None:
        """Stop and remove the current container, if any (best effort).

        Removal is attempted even when the graceful stop fails, and failures
        are logged rather than raised so cleanup never masks the caller's
        own error.
        """
        container, self._container = self._container, None
        if container is None:
            return
        try:
            container.stop(timeout=10)
        except Exception:
            logging.warning("Failed to stop TB2 container; forcing removal", exc_info=True)
        try:
            container.remove(force=True)
        except Exception:
            logging.warning("Failed to remove TB2 container", exc_info=True)

    def _copy_dir_to_container(self, src_dir: Path, dest_path: str) -> None:
        """Copy a directory into the container using tar archive.

        This method streams files via the Docker API, avoiding bind mount
        issues in Docker-in-Docker scenarios.

        Raises:
            RuntimeError: If no container is running or the upload is rejected.
        """
        import io
        import tarfile

        if self._container is None:
            raise RuntimeError("Container not started")

        # Create tar archive in memory
        tar_stream = io.BytesIO()
        with tarfile.open(fileobj=tar_stream, mode="w") as tar:
            # rglob already visits every nested entry, so each one is added
            # non-recursively; the recursive default would archive the
            # contents of every sub-directory again for each ancestor.
            # Sorting puts each directory ahead of its contents.
            for item in sorted(src_dir.rglob("*")):
                arcname = item.relative_to(src_dir).as_posix()
                tar.add(str(item), arcname=arcname, recursive=False)

        # Copy to container
        if not self._container.put_archive(dest_path, tar_stream.getvalue()):
            raise RuntimeError(f"Failed to copy {src_dir} into container at {dest_path}")

    def _write_file_to_container(self, file_path: str, content: str) -> None:
        """Write *content* verbatim to *file_path* inside the container.

        The file is uploaded as a one-entry tar archive rather than through a
        shell heredoc, so quotes, ``$`` expansions and lines equal to the
        heredoc delimiter in *content* cannot change what is written.
        Relative paths are resolved against ``/task``; the path is taken
        literally (no ``~`` or variable expansion).

        Raises:
            RuntimeError: If no container is running or the upload is rejected.
            ValueError: If *file_path* does not name a file.
        """
        import io
        import posixpath
        import tarfile
        import time

        if self._container is None:
            raise RuntimeError("Container not started. Call reset() first.")

        target = posixpath.normpath(posixpath.join("/task", file_path))
        parent, name = posixpath.split(target)
        if not name:
            raise ValueError(f"file_path does not name a file: {file_path!r}")

        payload = content.encode("utf-8")
        entry = tarfile.TarInfo(name=name)
        entry.size = len(payload)
        entry.mtime = int(time.time())

        tar_stream = io.BytesIO()
        with tarfile.open(fileobj=tar_stream, mode="w") as tar:
            tar.addfile(entry, io.BytesIO(payload))

        if not self._container.put_archive(parent, tar_stream.getvalue()):
            raise RuntimeError(f"Failed to write {target} in container")

    def _exec_in_container(self, command: str, workdir: str = "/task") -> tuple[int, str]:
        """Execute a command inside the container.

        Args:
            command: Shell command line interpreted by ``bash -c``.
            workdir: Directory to ``cd`` into before running *command*.

        Returns:
            ``(exit_code, output)`` with stdout and stderr combined and
            decoded as UTF-8 (undecodable bytes replaced).

        Raises:
            RuntimeError: If no container is running.
        """
        import shlex

        if self._container is None:
            raise RuntimeError("Container not started. Call reset() first.")

        # Pass argv as a list: embedding the command in a single-quoted
        # "bash -c '...'" string breaks as soon as it contains a single quote.
        script = f"cd {shlex.quote(workdir)} && {command}"
        exit_code, output = self._container.exec_run(
            cmd=["bash", "-c", script],
            workdir="/task",
            stdout=True,
            stderr=True,
        )
        return exit_code, (output or b"").decode("utf-8", errors="replace")

    def step(
        self,
        action: Tbench2Action,
        timeout_s: float | None = None,
        **kwargs: Any,
    ) -> Tbench2Observation:
        """Execute one action in the container (or locally when there is none).

        Supported ``action_type`` values: ``exec``, ``write_file``,
        ``evaluate`` and ``close``. Failures while executing the action are
        not raised; they are reported through ``success=False`` and ``error``.

        Args:
            action: The action to perform.
            timeout_s: Ignored.
            **kwargs: Ignored.

        Returns:
            Observation with the action output; ``done`` is set after
            ``evaluate`` and ``close``.

        Raises:
            TypeError: If *action* is not a ``Tbench2Action``.
            RuntimeError: If ``reset()`` has not been called.
        """
        del timeout_s, kwargs
        if not isinstance(action, Tbench2Action):
            raise TypeError(f"Expected Tbench2Action, got {type(action)}")
        if self._task_dir is None:
            raise RuntimeError("TB2 environment not initialized. Call reset() first.")

        self._state.step_count += 1
        self._state.last_action_type = action.action_type
        self._state.last_command = action.command

        output = ""
        error = ""
        success = True
        reward = None
        done = False
        info: dict[str, Any] = {}
        session_id = action.session_id or "tb2-session"
        try:
            if action.action_type == "exec":
                if self._container:
                    exit_code, output = self._exec_in_container(action.command)
                    success = exit_code == 0
                else:
                    # Fallback to local execution, rooted at the task
                    # directory like /task in container mode.
                    import subprocess

                    result = subprocess.run(
                        action.command,
                        shell=True,
                        capture_output=True,
                        text=True,
                        timeout=self.command_timeout_s,
                        cwd=self._task_dir,
                    )
                    output = result.stdout + result.stderr
                    success = result.returncode == 0
            elif action.action_type == "write_file":
                if self._container:
                    # Write to container
                    self._write_file_to_container(action.file_path, action.content)
                else:
                    # Local write; relative paths resolve against the task dir.
                    target = Path(action.file_path)
                    if not target.is_absolute():
                        target = self._task_dir / target
                    target.write_text(action.content, encoding="utf-8")
                output = f"Wrote to {action.file_path}"
            elif action.action_type == "evaluate":
                if self._container:
                    output, reward, info = self._evaluate_docker()
                else:
                    output, reward, info = self._evaluate_local()
                done = True
            elif action.action_type == "close":
                self.close()
                output = "Closed TB2 environment."
                done = True
            else:
                raise ValueError(f"Unsupported action_type in Docker mode: {action.action_type}")
        except Exception as exc:
            # Deliberate boundary: the client receives the failure as data.
            success = False
            error = str(exc)

        self._state.last_output = output
        self._state.session_id = session_id or ""
        return Tbench2Observation(
            instruction=self._instruction,
            output=output,
            success=success,
            error=error,
            task_id=self._state.task_id,
            task_path=self._state.task_path,
            session_id=session_id or "",
            action_type=action.action_type,
            info=info,
            reward=reward,
            done=done,
        )

    @staticmethod
    def _parse_exit_code(output: str, marker: str = "__TB2_EXIT_CODE__") -> int:
        """Extract the exit code printed as ``<marker>:<code>`` from *output*.

        The last line containing the marker decides, and only the text after
        the marker is parsed. Returns 1 when the marker is missing or
        unparsable.
        """
        for line in reversed(output.splitlines()):
            if marker not in line:
                continue
            _, _, tail = line.rpartition(f"{marker}:")
            try:
                return int(tail.strip())
            except ValueError:
                return 1
        return 1

    def _evaluate_docker(self) -> tuple[str, float, dict[str, Any]]:
        """Evaluate task inside Docker container.

        Returns:
            ``(output, reward, info)``; *reward* is 1.0 when pytest exits
            with code 0, else 0.0.

        Raises:
            RuntimeError: If no container is running or no task is set.
        """
        if self._container is None:
            raise RuntimeError("Container not started.")
        if self._task_dir is None:
            raise RuntimeError("Task directory not set")

        # Run pytest in the container's /task directory
        # Use exit code marker for consistency with local mode
        _, output_str = self._exec_in_container(
            "python -m pytest -q tests/ -rA; echo __TB2_EXIT_CODE__:$?"
        )
        ec = self._parse_exit_code(output_str)
        reward = 1.0 if ec == 0 else 0.0
        info = {"tests_passed": ec == 0, "exit_code": ec}
        return output_str, reward, info

    def _evaluate_local(self) -> tuple[str, float, dict[str, Any]]:
        """Evaluate task locally (fallback).

        Returns:
            ``(output, reward, info)``; *reward* is 1.0 when pytest exits
            with code 0, else 0.0.

        Raises:
            RuntimeError: If no task is set.
            subprocess.TimeoutExpired: If the suite runs longer than 900 s.
        """
        import shlex
        import subprocess

        if self._task_dir is None:
            raise RuntimeError("Task not initialized.")

        tests_dir = self._task_dir / "tests"
        cmd = (
            f"cd {shlex.quote(str(self._task_dir))} && "
            f"python -m pytest -q {shlex.quote(str(tests_dir))} -rA; "
            "echo __TB2_EXIT_CODE__:$?"
        )
        result = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=900.0,
        )
        output = result.stdout + result.stderr
        # The shell's own return code is that of the trailing `echo` (always
        # 0), so pytest's status must be read from the printed marker.
        exit_code = self._parse_exit_code(output)
        reward = 1.0 if exit_code == 0 else 0.0
        info = {"tests_passed": exit_code == 0, "exit_code": exit_code}
        return output, reward, info

    @property
    def state(self) -> Tbench2State:
        """Current episode state."""
        return self._state

    def close(self) -> None:
        """Remove the task container (if any) and forget the current task."""
        self._stop_container()
        self._task_dir = None
        self._instruction = ""

    def _resolve_task_path(self, task_id: str | None, task_path: str | None) -> Path:
        """Resolve the task directory from an explicit path or a task id.

        An explicit *task_path* wins. Otherwise *task_id* is looked up under
        ``self.tasks_dir`` or, when that is empty, under a downloaded copy of
        the TB2 repository cached in ``$TB2_CACHE_DIR`` (default
        ``<output_dir>/repo_cache``).

        Raises:
            ValueError: If neither argument is provided.
            FileNotFoundError: If the resolved directory does not exist.
        """
        if task_path:
            resolved = Path(task_path).expanduser().resolve()
            if not resolved.exists():
                raise FileNotFoundError(f"Task path not found: {resolved}")
            return resolved
        if not task_id:
            raise ValueError("Provide task_id or task_path to reset TB2 environment.")
        if not self.tasks_dir:
            cache_dir = Path(os.getenv("TB2_CACHE_DIR", str(self.output_dir / "repo_cache")))
            repo_dir = _download_tb2_repo(cache_dir)
            resolved = repo_dir / task_id
        else:
            resolved = Path(self.tasks_dir).expanduser().resolve() / task_id
        if not resolved.exists():
            raise FileNotFoundError(f"Task path not found: {resolved}")
        return resolved