Spaces:
Sleeping
Sleeping
| """DiskPanicEnvironment — server-side OpenEnv Environment implementation.""" | |
| from __future__ import annotations | |
| import os | |
| from typing import Any, Optional | |
| from openenv.core import Environment, State | |
| from disk_panic.models import DiskPanicAction, DiskPanicObservation | |
| from disk_panic.server.graders import GRADERS | |
| from disk_panic.server.scenarios import SCENARIOS, TASK_ORDER | |
| from disk_panic.server.vfs import VFS, execute | |
# Per-episode step budget: step() reports done=True once the step counter reaches this value.
| MAX_STEPS = 15 | |
class DiskPanicEnvironment(Environment[DiskPanicAction, DiskPanicObservation, State]):
    """Simulated SRE incident: the agent sends bash-lite commands to repair a broken server.

    Every episode runs one scenario from ``SCENARIOS`` against a fresh ``VFS``.
    After each command the matching grader from ``GRADERS`` scores the
    filesystem, and that score is reported as the step reward.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self) -> None:
        """Create an idle environment pointing at the first task in ``TASK_ORDER``."""
        super().__init__()
        # Round-robin cursor; only advanced when no explicit task is requested.
        self._task_index = 0
        self._vfs: VFS = VFS()
        self._targets: dict = {}
        self._task_id: str = TASK_ORDER[0]
        self._step_count: int = 0
        self._last_reward: float = 0.0
        self._state = State(episode_id=None, step_count=0)

    # ---------------------------------------------------------------- lifecycle

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs: Any,
    ) -> DiskPanicObservation:
        """Start a new episode and return its opening observation.

        Args:
            seed: Accepted for interface compatibility; not used here.
            episode_id: Identifier stored in the episode state. When falsy,
                ``"ep-<task>"`` is used instead.
            task_id: Scenario to run. Ignored unless it is a key of
                ``SCENARIOS``.
            **kwargs: Accepted and ignored.

        Returns:
            An observation with ``done=False``, ``reward=None`` and ``step=0``
            whose ``stdout`` carries the task banner, the task description and
            the current disk usage.
        """
        chosen = self._select_task(task_id)

        # Build the scenario first; the bookkeeping below is replaced only
        # after the builder has returned.
        fresh_vfs, fresh_targets = SCENARIOS[chosen]()
        self._vfs, self._targets = fresh_vfs, fresh_targets
        self._task_id = chosen
        self._step_count = 0
        self._last_reward = 0.0
        self._state = State(episode_id=episode_id or f"ep-{chosen}", step_count=0)

        banner = (
            f"=== DiskPanic task: {chosen} ===\n"
            f"{self._describe_task()}\n"
            f"Current disk usage: {self._vfs.usage_pct():.1f}%"
        )
        return DiskPanicObservation(
            done=False,
            reward=None,
            stdout=banner,
            df_output=self._vfs.df_output(),
            service_status=self._vfs.services.get("app", "unknown"),
            task_id=chosen,
            step=0,
            last_error=None,
        )

    def step(
        self,
        action: DiskPanicAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> DiskPanicObservation:
        """Run one agent command, advance the world, and grade the result.

        Args:
            action: Carries the command string executed against the VFS.
            timeout_s: Accepted for interface compatibility; not used here.
            **kwargs: Accepted and ignored.

        Returns:
            An observation whose ``reward`` is the current grade rounded to
            four decimals and whose ``done`` flag is set once the grade
            reaches 0.98 or ``MAX_STEPS`` steps have been taken.
        """
        self._step_count += 1
        self._state.step_count = self._step_count

        # Apply the command, then let the simulated world tick (runaway
        # writer, etc.) before grading.
        outcome = execute(self._vfs, action.command)
        self._vfs.tick()

        # The reward is the ABSOLUTE current grade, not a delta, so the final
        # step always reflects the final score. Graders are presumably bounded
        # to [0.0, 1.0]; nothing here enforces that.
        score = GRADERS[self._task_id](self._vfs, self._targets)
        self._last_reward = score

        finished = score >= 0.98 or self._step_count >= MAX_STEPS

        return DiskPanicObservation(
            done=finished,
            reward=round(float(score), 4),
            stdout=outcome.stdout or "",
            df_output=self._vfs.df_output(),
            service_status=self._vfs.services.get("app", "unknown"),
            task_id=self._task_id,
            step=self._step_count,
            last_error=outcome.error,
        )

    def state(self) -> State:
        """Return the current episode state object."""
        # NOTE(review): this is a plain method; confirm whether the openenv
        # base class expects `state` to be exposed as a property instead.
        return self._state

    def close(self) -> None:
        """Drop the scenario data by swapping in an empty VFS and target map."""
        self._vfs, self._targets = VFS(), {}

    # ------------------------------------------------------------------ helpers

    def _select_task(self, task_id: Optional[str]) -> str:
        """Pick the scenario name: explicit argument > env var > round-robin."""
        if task_id and task_id in SCENARIOS:
            return task_id

        from_env = os.getenv("DISK_PANIC_TASK")
        if from_env and from_env in SCENARIOS:
            return from_env

        # Only the round-robin path moves the cursor forward.
        rotated = TASK_ORDER[self._task_index % len(TASK_ORDER)]
        self._task_index += 1
        return rotated

    def _describe_task(self) -> str:
        """Return the human-readable brief for the active task."""
        briefs = {
            "easy": (
                "Root filesystem is at >95% usage — find and remove the bloated log "
                "under /var/log. DO NOT touch /var/log/audit/."
            ),
            "medium": (
                "Disk is full AND app.service has failed. Free space, restart "
                "app.service, and preserve /var/log/audit/."
            ),
            "hard": (
                "A runaway process keeps growing /var/log/app/runaway.log by ~100M "
                "every tick. Free space, restart app.service, preserve audit logs, "
                "AND drop a logrotate config at /etc/logrotate.d/app (must contain "
                "the words 'rotate' and 'size') to cap the growth."
            ),
        }
        return briefs.get(self._task_id, "unknown task")