Spaces:
Sleeping
Sleeping
File size: 1,983 Bytes
d50c3f9 673b19e d50c3f9 673b19e d50c3f9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | import asyncio
from typing import Optional, Any
from openenv.core.env_server import Environment
from models import Action, Observation, StepResult, State
from env import DevOpsIncidentEnv as DevOpsIncidentLogic
from generator.incident_factory import IncidentFactory
from tasks.task_generated import GeneratedTask
_factory = IncidentFactory()
class DevOpsEnvironment(Environment[Action, Observation, State]):
"""Server-side environment wrapper using openenv-core."""
SUPPORTS_CONCURRENT_SESSIONS = True
def __init__(self):
super().__init__()
self._logic: Optional[DevOpsIncidentLogic] = None
async def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
task_id: str = "easy",
**kwargs: Any
) -> Observation:
if task_id == "generated":
actual_seed = seed if seed is not None else 42
incident = _factory.generate(actual_seed)
task = GeneratedTask(incident_dict=incident)
state = task.initialize()
# Create the logic instance with 'easy' to bypass validation
self._logic = DevOpsIncidentLogic(task_id="easy", seed=actual_seed)
self._logic.task_id = "generated"
self._logic._task = task
self._logic._internal_state = state
return state._build_observation()
else:
self._logic = DevOpsIncidentLogic(task_id=task_id, seed=seed)
return self._logic.reset()
async def step(
self,
action: Action,
timeout_s: Optional[float] = None,
**kwargs: Any
) -> StepResult:
if self._logic is None:
raise RuntimeError("Environment not reset")
return self._logic.step(action)
@property
def state(self) -> State:
if self._logic is None:
raise RuntimeError("Environment not reset")
return self._logic.state()
|