seta-openreward / seta.py
AdithyaSK's picture
AdithyaSK HF Staff
Deploy upstream SETA on HF Spaces with OpenReward backend
7c28b65
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any, List, Dict
from openreward.environments import JSONObject, TextBlock, ToolOutput, tool
from openreward import SandboxSettings, SandboxBucketConfig, AsyncOpenReward
from pydantic import BaseModel
from cli_environment import CLIEnvironment
from constants import ENV_PATH
from utils import upload_text
def load_tasks() -> dict[int, dict]:
    """
    Load all SETA tasks from the pre-built task_index.json.

    Run build_task_index.py to regenerate the index if tasks change.

    Returns:
        Dict mapping task_id to task dict with structure:
        {
            "task_id": int,
            "instruction": str,
            "difficulty": str,
            "category": str,
            "tags": list[str],
            "weights": dict[str, float],  # test_name -> weight
        }

    Raises:
        OSError: If the index file cannot be opened.
        json.JSONDecodeError: If the index file is not valid JSON.
        ValueError: If a top-level key cannot be converted to ``int``.
    """
    index_path = ENV_PATH / "task_index.json"
    # Read as UTF-8 explicitly: without ``encoding`` the platform locale decides,
    # which can corrupt (or fail on) non-ASCII text in task instructions.
    with open(index_path, "r", encoding="utf-8") as f:
        raw = json.load(f)
    # JSON object keys are always strings; callers look tasks up by integer ID.
    return {int(k): v for k, v in raw.items()}
# Load tasks once, at module import time. Importing this module therefore reads
# task_index.json and raises if that file is missing or cannot be parsed.
TASKS = load_tasks()
# Dockerfile instructions that only describe image metadata or runtime
# configuration. They have no bash equivalent, so they are emitted as comments
# instead of being executed (which would fail with "command not found").
_NON_SHELL_INSTRUCTIONS = frozenset({
    'FROM', 'CMD', 'ENTRYPOINT', 'EXPOSE', 'LABEL', 'VOLUME', 'USER', 'ARG',
    'MAINTAINER', 'HEALTHCHECK', 'SHELL', 'STOPSIGNAL', 'ONBUILD',
})


def _copy_to_cp(copy_args_text: str, original: str) -> str:
    """Translate the arguments of a COPY instruction into a ``cp -r`` command."""
    tokens = copy_args_text.split()
    flags = [tok for tok in tokens if tok.startswith('--')]
    paths = [tok for tok in tokens if not tok.startswith('--')]

    # ``--from=<stage>`` copies out of another build stage, which does not
    # exist here; a COPY without both a source and a destination is malformed.
    if len(paths) < 2 or any(flag.startswith('--from=') for flag in flags):
        return f'# {original}'

    *sources, dest = paths
    cp_cmd = 'cp -r ' + ' '.join(f'/orwd_data/{src}' for src in sources) + f' {dest}'
    if len(sources) > 1:
        # With several sources ``cp`` needs the destination directory to exist.
        return f'mkdir -p {dest} && {cp_cmd}'
    return cp_cmd


def _env_to_export(env_def: str) -> str:
    """Translate the arguments of an ENV instruction into an ``export`` statement."""
    parts = env_def.split(None, 1)
    if len(parts) == 2 and '=' not in parts[0]:
        # Legacy ``ENV KEY value`` form: everything after the key is the value.
        name, value = parts
        value = value.strip()
        already_quoted = len(value) >= 2 and value[0] == value[-1] and value[0] in '"\''
        if not already_quoted:
            # Double quotes keep ``$VAR`` references expandable, like Docker does.
            value = '"' + value.replace('"', '\\"') + '"'
        return f'export {name}={value}'
    # ``ENV KEY=value [KEY2=value2 ...]`` is already valid ``export`` syntax.
    return f'export {env_def}'


def dockerfile_to_bash(dockerfile_content: str, task_id: int) -> str:
    """
    Convert Dockerfile to bash script by stripping FROM and transforming instructions.

    Everything up to and including the first ``FROM`` line is discarded. After that:

    * ``RUN <cmd>``      -> ``<cmd>``
    * ``WORKDIR <path>`` -> ``mkdir -p <path> && cd <path>``
    * ``COPY <src>... <dst>`` -> ``cp -r /orwd_data/<src>... <dst>`` (``--flags``
      are ignored; ``--from=`` copies and malformed COPY lines are commented out)
    * ``ENV``            -> ``export`` (both ``KEY=value`` and legacy ``KEY value``)
    * metadata-only instructions (``CMD``, ``EXPOSE``, later ``FROM`` lines, ...)
      are kept as comments
    * comments and blank lines are kept, except inside a backslash-continued
      instruction, where Docker removes them and bash would end the command
    * any other line is passed through unchanged

    Instruction keywords are matched case-sensitively (upper case).

    Args:
        dockerfile_content: Raw Dockerfile text
        task_id: Task ID for COPY path adjustments (currently unused; kept for
            interface compatibility)

    Returns:
        Bash script ready to execute (empty string if no ``FROM`` line is found)
    """
    bash_lines: list[str] = []
    from_found = False
    in_continuation = False   # previous instruction line ended with a backslash
    commenting_out = False    # the instruction being continued is commented out

    for line in dockerfile_content.split('\n'):
        stripped = line.strip()

        # Skip until after FROM line
        if not from_found:
            if stripped.startswith('FROM '):
                from_found = True
            continue

        is_blank_or_comment = not stripped or stripped.startswith('#')

        if in_continuation:
            if is_blank_or_comment:
                # Inside a multi-line instruction these would split the bash command.
                continue
            bash_lines.append(f'# {stripped}' if commenting_out else line)
            in_continuation = stripped.endswith('\\')
            continue

        if is_blank_or_comment:
            bash_lines.append(line)
            continue

        keyword, sep, rest = stripped.partition(' ')
        commenting_out = keyword in _NON_SHELL_INSTRUCTIONS

        if commenting_out:
            bash_lines.append(f'# {stripped}')
        elif not sep:
            # Single-token line: nothing to transform.
            bash_lines.append(line)
        elif keyword == 'RUN':
            # The rest of a RUN line is already bash.
            bash_lines.append(rest)
        elif keyword == 'WORKDIR':
            path = rest.strip()
            bash_lines.append(f'mkdir -p {path} && cd {path}')
        elif keyword == 'COPY':
            bash_lines.append(_copy_to_cp(rest, stripped))
        elif keyword == 'ENV':
            bash_lines.append(_env_to_export(rest.strip()))
        else:
            bash_lines.append(line)

        in_continuation = stripped.endswith('\\')

    return '\n'.join(bash_lines)
class EmptyInput(BaseModel):
    """Parameter model with no fields, for tools that take no arguments (e.g. submit_solution)."""
class SETAEnv(CLIEnvironment):
    """
    SETA (Scaling Environments for Terminal Agents) environment.

    Terminal-based coding and system administration tasks with automated
    pytest validation. Agents use CLI tools (bash, read, write, etc.) to
    complete tasks, then submit for scoring.

    Lifecycle as implemented here: ``__init__`` resolves the task and
    configures the sandbox, ``setup`` starts the sandbox and replays the
    task's Dockerfile as a bash script, ``get_prompt`` returns the task
    instruction, and ``submit_solution`` runs the tests and ends the episode.
    """

    @classmethod
    def list_splits(cls) -> list[str]:
        """Return available splits. All tasks in 'train' split."""
        return ["train"]

    @classmethod
    def list_tasks(cls, split: str) -> list[JSONObject]:
        """
        Return task specifications for requested split.

        Args:
            split: Only "train" is supported

        Returns:
            List of task specs with metadata (task_id, difficulty, category,
            tags); an empty list for any other split.
        """
        if split != "train":
            return []

        return [
            {
                "task_id": task["task_id"],
                "difficulty": task["difficulty"],
                "category": task["category"],
                "tags": task["tags"],
            }
            for task in TASKS.values()
        ]

    def __init__(self, task_spec: JSONObject, secrets: dict[str, str] | None = None) -> None:
        """
        Initialize SETA environment for a specific task.

        Args:
            task_spec: Task specification with task_id
            secrets: May contain 'api_key' (per-session); otherwise the server
                falls back to ``OPENREWARD_API_KEY`` from the process env.
                The current openreward server build forbids body secrets,
                so on HF Spaces / self-hosted deployments the env var is
                the only path that works. ``None`` (the default) is treated
                as an empty dict.

        Raises:
            ValueError: If the task ID is unknown or no API key can be resolved.
        """
        # A fresh dict per instance: a ``{}`` default would be one object shared
        # by every environment created without explicit secrets.
        if secrets is None:
            secrets = {}

        super().__init__(task_spec, secrets=secrets)

        self.task_id = int(task_spec["task_id"])
        if self.task_id not in TASKS:
            raise ValueError(f"Task ID {self.task_id} not found in loaded tasks")
        self.task_data = TASKS[self.task_id]

        # Resolve OpenReward API key: per-session secrets first, then env var.
        api_key = secrets.get("api_key") or os.environ.get("OPENREWARD_API_KEY")
        if not api_key:
            raise ValueError(
                "OpenReward API key required (set OPENREWARD_API_KEY env var "
                "or pass via per-session secrets['api_key'])"
            )

        # Setup sandbox with base image
        # User will build task-specific images later, for now use base image
        self.sandbox_settings = SandboxSettings(
            environment="Eigent/SETA",
            image="generalreasoning/seta-base@sha256:369515ae30815448a3b2e0189c5ef3df40786edc2a611f6bb1d3bc6b5636c363",
            machine_size="0.5:1",
            block_network=False,
            bucket_config=SandboxBucketConfig(
                # Only this task's directory is exposed, mounted at /orwd_data.
                mount_path="/orwd_data",
                read_only=True,
                only_dir=f"Dataset/{self.task_id}",
            ),
        )

        or_client = AsyncOpenReward(api_key=api_key)
        self.sandbox = or_client.sandbox(self.sandbox_settings)

        # Not read anywhere in this class; presumably consumed by tooling
        # inherited from CLIEnvironment -- verify before removing.
        self.todos: List[Dict[str, Any]] = []

    async def setup(self) -> None:
        """
        Start sandbox and execute task-specific Dockerfile setup.

        Converts the Dockerfile to a bash script and executes it once, then
        attempts to delete task metadata files from the mounted task directory.
        Setup is best-effort: any exception raised after the sandbox has
        started is printed and swallowed so the task can continue.
        """
        await self.sandbox.start()

        try:
            # Download Dockerfile (task directory mounted at /orwd_data via only_dir)
            dockerfile_bytes = await self.sandbox.download("/orwd_data/Dockerfile")
            dockerfile_text = dockerfile_bytes.decode('utf-8')

            # Convert to bash script, upload it to the sandbox and run it
            bash_script = dockerfile_to_bash(dockerfile_text, self.task_id)
            await upload_text(self.sandbox, "/tmp/setup.sh", bash_script)
            _, exit_code = await self.sandbox.run("bash /tmp/setup.sh")

            if exit_code != 0:
                print(f"[SETUP WARNING] Script exited with code {exit_code}")
            else:
                print(f"[SETUP SUCCESS] Task {self.task_id} setup completed")

            # Cleanup: Delete metadata files that shouldn't be visible to agent
            # NOTE(review): the bucket is mounted with read_only=True in __init__,
            # so these deletions may not succeed; only successes are printed.
            cleanup_files = [
                "docker-compose.yaml",
                "Dockerfile",
                "draft_spec.md",
                "solution.sh",
                "task.yaml",
            ]
            for filename in cleanup_files:
                file_path = f"/orwd_data/{filename}"
                _, cleanup_code = await self.sandbox.run(f"rm -f {file_path}")
                if cleanup_code == 0:
                    print(f"[SETUP] Deleted {filename}")

        except Exception as e:
            print(f"[SETUP ERROR] Failed to setup task {self.task_id}: {e}")
            # Don't raise - allow task to continue

    async def get_prompt(self) -> List[TextBlock]:
        """
        Generate task prompt for agent.

        Returns:
            A single text block: the task instruction followed by a reminder
            to call ``submit_solution`` when finished.
        """
        instruction = self.task_data["instruction"]
        return [TextBlock(text=instruction + "\n\n" + "When finished, call `submit_solution` to run the test suite and get your score.")]

    @tool
    async def submit_solution(self, params: EmptyInput) -> ToolOutput:
        """
        Submit solution and run test suite.

        Executes pytest tests in sandbox, calculates weighted score,
        and returns detailed results. Only tests listed in the task's
        ``weights`` contribute to the score and to the "Passed" count.

        Returns:
            ToolOutput with:
            - blocks: Formatted test results and score
            - metadata: Structured test data
            - reward: Final score (0.0 to 1.0)
            - finished: True (ends episode)
        """
        try:
            # Ensure test directory structure exists and copy files.
            # The task directory is mounted at /orwd_data/ via only_dir parameter
            await self.sandbox.run("mkdir -p /app/tests")

            # Copy test file
            await self.sandbox.run(
                "cp /orwd_data/tests/test_outputs.py /app/tests/"
            )

            # Copy any data files that the task needs (excluding tests/ directory and Dockerfile)
            await self.sandbox.run(
                "find /orwd_data/ -maxdepth 1 -type f ! -name 'Dockerfile' -exec cp {} /app/ \\;"
            )

            # Run pytest directly with json-report. The command's own output and
            # exit code are not inspected: the JSON report is the source of truth.
            # NOTE(review): no timeout is applied to this run; the original read
            # task_data["max_test_timeout_sec"] (default 60) but never used it.
            await self.sandbox.run(
                "cd /app && pytest tests/test_outputs.py -rA --json-report --json-report-file=/app/report.json"
            )

            # Download and parse JSON report
            try:
                report_content = await self.sandbox.download("/app/report.json")
                report = json.loads(report_content)
            except Exception as e:
                return ToolOutput(
                    blocks=[TextBlock(text=f"""
Test Report Not Found
=====================
Task ID: {self.task_id}
Error: Could not read test report - {str(e)}
This may indicate that pytest failed to run. Check test execution output above.
""")],
                    metadata={
                        "task_id": self.task_id,
                        "error": "report_not_found",
                        "details": str(e),
                    },
                    reward=0.0,
                    finished=True
                )

            # Parse test results
            passed_tests = set()
            failed_tests = set()
            for test in report.get("tests", []):
                # Extract test function name from nodeid
                # Example nodeid: "tests/test_outputs.py::test_user_accounts_created"
                test_name = test["nodeid"].split("::")[-1]
                if test["outcome"] == "passed":
                    passed_tests.add(test_name)
                else:
                    failed_tests.add(test_name)

            # Calculate weighted score from the weighted tests that passed
            weights = self.task_data["weights"]
            passed_weighted = [name for name in weights if name in passed_tests]
            total_score = sum((weights[name] for name in passed_weighted), 0.0)

            # Normalize score to 0.0-1.0 range
            total_weight = sum(weights.values())
            if total_weight > 0:
                total_score = total_score / total_weight

            # Format results for display
            test_details = []
            for test_name, weight in weights.items():
                status = "✓ PASSED" if test_name in passed_tests else "✗ FAILED"
                test_details.append(f" {status} | {test_name} (weight: {weight:.2f})")

            # Count only weighted tests so the numerator can never exceed the
            # denominator when the report contains tests that carry no weight.
            passed_count = len(passed_weighted)

            summary_text = f"""
Test Execution Complete
========================
Task ID: {self.task_id}
Category: {self.task_data.get('category', 'unknown')}
Difficulty: {self.task_data.get('difficulty', 'unknown')}
Test Results:
{chr(10).join(test_details)}
Passed: {passed_count}/{len(weights)}
Final Score: {total_score:.2%}
"""

            return ToolOutput(
                blocks=[TextBlock(text=summary_text)],
                metadata={
                    "task_id": self.task_id,
                    "score": total_score,
                    "passed_tests": list(passed_tests),
                    "failed_tests": list(failed_tests),
                    "test_count": len(weights),
                    "weights": weights,
                },
                reward=total_score,
                finished=True
            )

        except Exception as e:
            # Handle errors gracefully
            # NOTE(review): the message invites a retry, yet finished=True ends
            # the episode -- confirm which of the two is intended.
            error_text = f"""
Test Execution Failed
=====================
Task ID: {self.task_id}
Error: {str(e)}
The test suite encountered an error. Please check:
1. Your solution is complete
2. All required files are in place
3. The sandbox environment is properly configured
You may try running the tests again with submit_solution.
"""

            return ToolOutput(
                blocks=[TextBlock(text=error_text)],
                metadata={
                    "task_id": self.task_id,
                    "error": str(e),
                    "score": 0.0,
                },
                reward=0.0,
                finished=True
            )