# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Compressionenv Environment Implementation.

Environment where the agent proposes compression/decompression algorithms for a
Paul Graham essay. The environment validates round-trip correctness and scores
compressed size relative to the agent's prior attempts and baseline compressors.
"""

import base64
import json
import os
import random
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4

import bz2
import lzma
import zlib

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

from models import CompressionenvAction, CompressionenvObservation


# Script executed in the child interpreter via `python -c`. It reads a JSON
# payload from stdin, runs the agent's code, checks the round trip, and writes
# a JSON envelope {"compressed_b64": "..."} to stdout.
#
# The real stdout is reserved for that envelope: anything the agent code
# prints is diverted to stderr so it cannot corrupt the result channel.
_RUNNER_SOURCE = r"""
import base64
import json
import sys

_result_out = sys.stdout
sys.stdout = sys.stderr

payload = json.loads(sys.stdin.read())
essay_text = payload["essay_text"]
compression_code = payload["compression_code"]
decompression_code = payload["decompression_code"]

ns = {}
exec(compression_code, ns, ns)
exec(decompression_code, ns, ns)

compress = ns.get("compress")
decompress = ns.get("decompress")
if compress is None or decompress is None:
    raise RuntimeError("Expected functions compress(text: str)->bytes and decompress(data: bytes)->str")

compressed = compress(essay_text)
if not isinstance(compressed, (bytes, bytearray)):
    raise RuntimeError(f"compress() must return bytes, got {type(compressed)}")
compressed = bytes(compressed)

round_trip = decompress(compressed)
if not isinstance(round_trip, str):
    raise RuntimeError(f"decompress() must return str, got {type(round_trip)}")
if round_trip != essay_text:
    raise RuntimeError("Round-trip failed: decompress(compress(essay)) != essay")

_result_out.write(json.dumps({"compressed_b64": base64.b64encode(compressed).decode("ascii")}))
_result_out.flush()
"""


@dataclass(frozen=True)
class _Essay:
    essay_id: str
    text: str


class CompressionenvEnvironment(Environment):
    """
    Compression algorithm search environment.

    - On `reset()`, selects a PG essay (from `../essays/*.txt`) and returns it.
    - On `step()`, executes agent-provided Python code defining:
        compress(text: str) -> bytes
        decompress(data: bytes) -> str
      Validates that decompress(compress(essay)) == essay.

    Rewards (per spec):
    - If algorithms fail or don't round-trip: -1 reward.
    - If compressed size is lower than average of previous successful sizes for
      this essay in the episode: +1 reward (the first successful attempt has no
      previous average, so it never earns this bonus).
    - Compare against baselines (zlib, bz2, lzma):
      - If agent achieves smaller size than at least one baseline: +10 reward.
      - If agent achieves smaller size than the best baseline: +20 reward
        (awarded instead of, not in addition to, the +10).
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Wall-clock budget, in seconds, for one agent codec subprocess run.
    AGENT_TIMEOUT_SECONDS: float = 3.0

    def __init__(self):
        """Initialize the compressionenv environment."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._essay: _Essay | None = None
        self._successful_sizes: list[int] = []
        self._baselines: dict[str, int] = {}

    def reset(self) -> CompressionenvObservation:
        """
        Reset the environment.

        Starts a new episode: fresh State, a newly picked essay, an empty
        history of successful sizes, and recomputed baseline sizes.

        Returns:
            CompressionenvObservation containing a selected essay
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._essay = self._pick_essay()
        self._successful_sizes = []
        self._baselines = self._compute_baselines(self._essay.text)

        return CompressionenvObservation(
            essay_id=self._essay.essay_id,
            essay_text=self._essay.text,
            valid=True,
            error=None,
            compressed_size_bytes=None,
            avg_prev_compressed_size_bytes=None,
            improved_over_avg=None,
            baselines_size_bytes=self._baselines,
            best_baseline_size_bytes=min(self._baselines.values()) if self._baselines else None,
            beat_any_baseline=None,
            beat_best_baseline=None,
            done=False,
            reward=0.0,
            metadata={
                "episode_id": self._state.episode_id,
                "step_count": self._state.step_count,
                "num_baselines": len(self._baselines),
            },
        )

    def step(self, action: CompressionenvAction) -> CompressionenvObservation:  # type: ignore[override]
        """
        Execute a step: run agent algorithms, validate, score compression size.

        Any exception raised while running the agent codec (bad code, failed
        round trip, timeout, malformed output) is reported through the
        observation's `error` field with a reward of -1 rather than raised.
        """
        if self._essay is None:
            # Defensive: ensure reset called.
            self._essay = self._pick_essay()
            self._baselines = self._compute_baselines(self._essay.text)
            self._successful_sizes = []

        self._state.step_count += 1

        essay_text = self._essay.text
        baselines = self._baselines
        best_baseline = min(baselines.values()) if baselines else None

        reward = 0.0
        error: str | None = None
        valid = False
        compressed_size: int | None = None
        improved_over_avg: bool | None = None
        beat_any_baseline: bool | None = None
        beat_best_baseline: bool | None = None
        avg_prev: float | None = None

        try:
            compressed_bytes = self._run_agent_codec(
                essay_text=essay_text,
                compression_code=action.compression_code,
                decompression_code=action.decompression_code,
            )
            compressed_size = len(compressed_bytes)
            valid = True
        except Exception as e:
            # Boundary between untrusted agent code and the environment: every
            # failure mode becomes a scored, reported error.
            error = str(e)
            reward = -1.0

        if valid and compressed_size is not None:
            # Compare against the average of earlier successes *before*
            # recording this attempt, so it is not compared with itself.
            if self._successful_sizes:
                avg_prev = sum(self._successful_sizes) / len(self._successful_sizes)
                improved_over_avg = compressed_size < avg_prev
                if improved_over_avg:
                    reward += 1.0

            self._successful_sizes.append(compressed_size)

            if baselines:
                beat_any_baseline = any(compressed_size < s for s in baselines.values())
                beat_best_baseline = best_baseline is not None and compressed_size < best_baseline
                # The two baseline bonuses are mutually exclusive.
                if beat_best_baseline:
                    reward += 20.0
                elif beat_any_baseline:
                    reward += 10.0

        return CompressionenvObservation(
            essay_id=self._essay.essay_id,
            essay_text=essay_text,
            valid=valid,
            error=error,
            compressed_size_bytes=compressed_size,
            avg_prev_compressed_size_bytes=avg_prev,
            improved_over_avg=improved_over_avg,
            baselines_size_bytes=baselines,
            best_baseline_size_bytes=best_baseline,
            beat_any_baseline=beat_any_baseline,
            beat_best_baseline=beat_best_baseline,
            done=False,
            reward=reward,
            metadata={
                "episode_id": self._state.episode_id,
                "step_count": self._state.step_count,
                "algo_name": action.algo_name,
                "num_successful_attempts": len(self._successful_sizes),
            },
        )

    @property
    def state(self) -> State:
        """
        Get the current environment **state**.

        In RL terms, the State is a (Markov) description of the underlying
        environment that is at least as informative as any single Observation.
        Here we include all information needed to reconstruct what any call to
        `reset()` or `step()` would expose in an observation for this episode.

        Returns:
            Current State with core fields plus extra environment details.
        """
        # State allows extra fields, so we enrich it to be a superset of any
        # single observation: from this State, an agent could derive the latest
        # observation for the current episode.
        if self._essay is not None:
            self._state.essay_id = self._essay.essay_id  # type: ignore[attr-defined]
            self._state.essay_text = self._essay.text  # type: ignore[attr-defined]
        self._state.baselines_size_bytes = self._baselines  # type: ignore[attr-defined]
        self._state.num_successful_attempts = len(self._successful_sizes)  # type: ignore[attr-defined]
        if self._successful_sizes:
            self._state.best_compressed_size_bytes = min(self._successful_sizes)  # type: ignore[attr-defined]
            self._state.last_compressed_size_bytes = self._successful_sizes[-1]  # type: ignore[attr-defined]
        if self._baselines:
            self._state.best_baseline_size_bytes = min(self._baselines.values())  # type: ignore[attr-defined]
        return self._state

    def _pick_essay(self) -> _Essay:
        """
        Choose one essay at random from the essays directory.

        Raises:
            FileNotFoundError: if no `*.txt` file is found.
        """
        # Expected layout:
        # compression-openenv/
        #   essays/
        #   compressionenv/
        #     server/
        #       compressionenv_environment.py (this file)
        essays_dir = Path(__file__).resolve().parents[2] / "essays"
        if not essays_dir.exists():
            # Try repo-level essays directory (if running from different cwd/layout).
            essays_dir = Path(os.getcwd()).resolve() / "essays"

        paths = sorted(essays_dir.glob("*.txt"))
        if not paths:
            raise FileNotFoundError(
                f"No essays found in {essays_dir}. Expected PG essay .txt files."
            )

        path = random.choice(paths)
        essay_id = path.stem
        text = path.read_text(encoding="utf-8")
        return _Essay(essay_id=essay_id, text=text)

    def _compute_baselines(self, text: str) -> dict[str, int]:
        """Return the compressed size in bytes of `text` (UTF-8) per baseline compressor."""
        data = text.encode("utf-8")
        # Deterministic settings.
        baselines: dict[str, bytes] = {
            "zlib": zlib.compress(data, level=9),
            "bz2": bz2.compress(data, compresslevel=9),
            "lzma": lzma.compress(data, preset=9),
        }
        return {k: len(v) for k, v in baselines.items()}

    def _run_agent_codec(
        self,
        essay_text: str,
        compression_code: str,
        decompression_code: str,
    ) -> bytes:
        """
        Execute agent code in a subprocess and return compressed bytes.

        The child interpreter runs `_RUNNER_SOURCE`, which `exec`s both code
        strings, calls `compress`/`decompress`, verifies the round trip, and
        reports the compressed bytes as a base64 string inside a JSON envelope
        on stdout.

        Security note: this is not a hardened sandbox. It's a best-effort
        isolation to avoid contaminating the server process, with a timeout.
        The agent code is `exec`-ed with the child's full privileges and could
        still forge a result by writing to the original stdout itself.

        Raises:
            RuntimeError: if the subprocess times out, exits non-zero (message
                is the child's stderr), or produces missing/malformed output.
        """
        payload = {
            "essay_text": essay_text,
            "compression_code": compression_code,
            "decompression_code": decompression_code,
        }

        # Minimal environment so the server's variables do not leak into
        # agent code.
        child_env = {
            "PYTHONIOENCODING": "utf-8",
            "PYTHONUTF8": "1",
            "PYTHONDONTWRITEBYTECODE": "1",
        }
        # Windows needs SystemRoot in a replaced environment for the child
        # to start; the variable is absent elsewhere.
        system_root = os.environ.get("SYSTEMROOT")
        if system_root:
            child_env["SYSTEMROOT"] = system_root

        # Run in a throwaway cwd so relative-path writes by agent code are
        # cleaned up afterwards.
        with tempfile.TemporaryDirectory() as td:
            try:
                proc = subprocess.run(
                    [sys.executable, "-c", _RUNNER_SOURCE],
                    input=json.dumps(payload).encode("utf-8"),
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    cwd=td,
                    timeout=self.AGENT_TIMEOUT_SECONDS,
                    env=child_env,
                )
            except subprocess.TimeoutExpired as e:
                # str(TimeoutExpired) embeds the full argv, i.e. the whole
                # runner script; report a short message instead.
                raise RuntimeError(
                    f"Agent codec timed out after {e.timeout} seconds"
                ) from e

        if proc.returncode != 0:
            stderr = proc.stderr.decode("utf-8", errors="replace").strip()
            raise RuntimeError(stderr or f"Agent codec subprocess failed with code {proc.returncode}")

        out = proc.stdout.decode("utf-8", errors="replace").strip()
        if not out:
            # A clean exit without the envelope (e.g. agent code called
            # sys.exit(0)) must not be scored as a zero-byte compression.
            raise RuntimeError("Agent codec subprocess exited without producing a result")

        try:
            result = json.loads(out)
            return base64.b64decode(result["compressed_b64"].encode("ascii"), validate=True)
        except (ValueError, KeyError, TypeError, AttributeError) as e:
            raise RuntimeError(f"Failed to decode compressed output: {e}") from e