Spaces:
Runtime error
Runtime error
File size: 11,994 Bytes
add4140 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 | # Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Compressionenv Environment Implementation.
Environment where the agent proposes compression/decompression algorithms for a
Paul Graham essay. The environment validates round-trip correctness and scores
compressed size relative to the agent's prior attempts and baseline compressors.
"""
import base64
import json
import os
import random
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4
import bz2
import lzma
import zlib
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
from models import CompressionenvAction, CompressionenvObservation
@dataclass(frozen=True)
class _Essay:
essay_id: str
text: str
class CompressionenvEnvironment(Environment):
"""
Compression algorithm search environment.
- On `reset()`, selects a PG essay (from `../essays/*.txt`) and returns it.
- On `step()`, executes agent-provided Python code defining:
compress(text: str) -> bytes
decompress(data: bytes) -> str
Validates that decompress(compress(essay)) == essay.
Rewards (per spec):
- If algorithms fail or don't round-trip: -1 reward.
- If compressed size is lower than average of previous successful sizes for
this essay in the episode: +1 reward.
- Compare against baselines (zlib, bz2, lzma):
- If agent achieves smaller size than at least one baseline: +10 reward.
- If agent achieves smaller size than the best baseline: +20 reward.
"""
# Enable concurrent WebSocket sessions.
# Set to True if your environment isolates state between instances.
# When True, multiple WebSocket clients can connect simultaneously, each
# getting their own environment instance (when using factory mode in app.py).
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self):
"""Initialize the compressionenv environment."""
self._state = State(episode_id=str(uuid4()), step_count=0)
self._essay: _Essay | None = None
self._successful_sizes: list[int] = []
self._baselines: dict[str, int] = {}
def reset(self) -> CompressionenvObservation:
"""
Reset the environment.
Returns:
CompressionenvObservation containing a selected essay
"""
self._state = State(episode_id=str(uuid4()), step_count=0)
self._essay = self._pick_essay()
self._successful_sizes = []
self._baselines = self._compute_baselines(self._essay.text)
return CompressionenvObservation(
essay_id=self._essay.essay_id,
essay_text=self._essay.text,
valid=True,
error=None,
compressed_size_bytes=None,
avg_prev_compressed_size_bytes=None,
improved_over_avg=None,
baselines_size_bytes=self._baselines,
best_baseline_size_bytes=min(self._baselines.values()) if self._baselines else None,
beat_any_baseline=None,
beat_best_baseline=None,
done=False,
reward=0.0,
metadata={
"episode_id": self._state.episode_id,
"step_count": self._state.step_count,
"num_baselines": len(self._baselines),
},
)
def step(self, action: CompressionenvAction) -> CompressionenvObservation: # type: ignore[override]
"""
Execute a step: run agent algorithms, validate, score compression size.
"""
if self._essay is None:
# Defensive: ensure reset called.
self._essay = self._pick_essay()
self._baselines = self._compute_baselines(self._essay.text)
self._successful_sizes = []
self._state.step_count += 1
essay_text = self._essay.text
baselines = self._baselines
best_baseline = min(baselines.values()) if baselines else None
reward = 0.0
error: str | None = None
valid = False
compressed_size: int | None = None
improved_over_avg: bool | None = None
beat_any_baseline: bool | None = None
beat_best_baseline: bool | None = None
avg_prev: float | None = None
try:
compressed_bytes = self._run_agent_codec(
essay_text=essay_text,
compression_code=action.compression_code,
decompression_code=action.decompression_code,
)
compressed_size = len(compressed_bytes)
valid = True
except Exception as e:
error = str(e)
reward = -1.0
if valid and compressed_size is not None:
if self._successful_sizes:
avg_prev = sum(self._successful_sizes) / len(self._successful_sizes)
improved_over_avg = compressed_size < avg_prev
if improved_over_avg:
reward += 1.0
else:
avg_prev = None
improved_over_avg = None
self._successful_sizes.append(compressed_size)
if baselines:
beat_any_baseline = any(compressed_size < s for s in baselines.values())
beat_best_baseline = best_baseline is not None and compressed_size < best_baseline
if beat_best_baseline:
reward += 20.0
elif beat_any_baseline:
reward += 10.0
return CompressionenvObservation(
essay_id=self._essay.essay_id,
essay_text=essay_text,
valid=valid,
error=error,
compressed_size_bytes=compressed_size,
avg_prev_compressed_size_bytes=avg_prev,
improved_over_avg=improved_over_avg,
baselines_size_bytes=baselines,
best_baseline_size_bytes=best_baseline,
beat_any_baseline=beat_any_baseline,
beat_best_baseline=beat_best_baseline,
done=False,
reward=reward,
metadata={
"episode_id": self._state.episode_id,
"step_count": self._state.step_count,
"algo_name": action.algo_name,
"num_successful_attempts": len(self._successful_sizes),
},
)
@property
def state(self) -> State:
"""
Get the current environment **state**.
In RL terms, the State is a (Markov) description of the underlying
environment that is at least as informative as any single Observation.
Here we include all information needed to reconstruct what any call to
`reset()` or `step()` would expose in an observation for this episode.
Returns:
Current State with core fields plus extra environment details.
"""
# State allows extra fields, so we enrich it to be a superset of any
# single observation: from this State, an agent could derive the latest
# observation for the current episode.
if self._essay is not None:
self._state.essay_id = self._essay.essay_id # type: ignore[attr-defined]
self._state.essay_text = self._essay.text # type: ignore[attr-defined]
self._state.baselines_size_bytes = self._baselines # type: ignore[attr-defined]
self._state.num_successful_attempts = len(self._successful_sizes) # type: ignore[attr-defined]
if self._successful_sizes:
self._state.best_compressed_size_bytes = min(self._successful_sizes) # type: ignore[attr-defined]
self._state.last_compressed_size_bytes = self._successful_sizes[-1] # type: ignore[attr-defined]
if self._baselines:
self._state.best_baseline_size_bytes = min(self._baselines.values()) # type: ignore[attr-defined]
return self._state
def _pick_essay(self) -> _Essay:
# Expected layout:
# compression-openenv/
# essays/
# compressionenv/
# server/
# compressionenv_environment.py (this file)
essays_dir = Path(__file__).resolve().parents[2] / "essays"
if not essays_dir.exists():
# Try repo-level essays directory (if running from different cwd/layout).
essays_dir = Path(os.getcwd()).resolve() / "essays"
paths = sorted(essays_dir.glob("*.txt"))
if not paths:
raise FileNotFoundError(
f"No essays found in {essays_dir}. Expected PG essay .txt files."
)
path = random.choice(paths)
essay_id = path.stem
text = path.read_text(encoding="utf-8")
return _Essay(essay_id=essay_id, text=text)
def _compute_baselines(self, text: str) -> dict[str, int]:
data = text.encode("utf-8")
# Deterministic settings.
baselines: dict[str, bytes] = {
"zlib": zlib.compress(data, level=9),
"bz2": bz2.compress(data, compresslevel=9),
"lzma": lzma.compress(data, preset=9),
}
return {k: len(v) for k, v in baselines.items()}
def _run_agent_codec(
self,
essay_text: str,
compression_code: str,
decompression_code: str,
) -> bytes:
"""
Execute agent code in a subprocess and return compressed bytes.
Security note: this is not a hardened sandbox. It's a best-effort isolation
to avoid contaminating the server process, with a timeout.
"""
runner = r"""
import base64
import json
import sys
payload = json.loads(sys.stdin.read())
essay_text = payload["essay_text"]
compression_code = payload["compression_code"]
decompression_code = payload["decompression_code"]
ns = {}
exec(compression_code, ns, ns)
exec(decompression_code, ns, ns)
compress = ns.get("compress")
decompress = ns.get("decompress")
if compress is None or decompress is None:
raise RuntimeError("Expected functions compress(text: str)->bytes and decompress(data: bytes)->str")
compressed = compress(essay_text)
if not isinstance(compressed, (bytes, bytearray)):
raise RuntimeError(f"compress() must return bytes, got {type(compressed)}")
compressed = bytes(compressed)
round_trip = decompress(compressed)
if not isinstance(round_trip, str):
raise RuntimeError(f"decompress() must return str, got {type(round_trip)}")
if round_trip != essay_text:
raise RuntimeError("Round-trip failed: decompress(compress(essay)) != essay")
sys.stdout.write(base64.b64encode(compressed).decode("ascii"))
"""
payload = {
"essay_text": essay_text,
"compression_code": compression_code,
"decompression_code": decompression_code,
}
with tempfile.TemporaryDirectory() as td:
proc = subprocess.run(
[sys.executable, "-c", runner],
input=json.dumps(payload).encode("utf-8"),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=td,
timeout=3.0,
env={
"PYTHONIOENCODING": "utf-8",
"PYTHONUTF8": "1",
"PYTHONDONTWRITEBYTECODE": "1",
},
)
if proc.returncode != 0:
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
raise RuntimeError(stderr or f"Agent codec subprocess failed with code {proc.returncode}")
out = proc.stdout.decode("utf-8", errors="replace").strip()
try:
return base64.b64decode(out.encode("ascii"), validate=True)
except Exception as e:
raise RuntimeError(f"Failed to decode compressed output: {e}") from e
|