File size: 1,983 Bytes
d50c3f9
 
 
 
 
673b19e
 
 
 
d50c3f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
673b19e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d50c3f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
from typing import Optional, Any
from openenv.core.env_server import Environment
from models import Action, Observation, StepResult, State
from env import DevOpsIncidentEnv as DevOpsIncidentLogic
from generator.incident_factory import IncidentFactory
from tasks.task_generated import GeneratedTask

# Module-level incident factory, created once at import time and shared by
# every DevOpsEnvironment instance; reset() calls _factory.generate(seed)
# when the "generated" task is requested.
_factory = IncidentFactory()

class DevOpsEnvironment(Environment[Action, Observation, State]):
    """Adapter exposing the DevOps incident logic as an openenv-core Environment.

    All episode behaviour is delegated to a ``DevOpsIncidentLogic`` instance
    that is (re)created on every ``reset`` call. ``step`` and ``state`` raise
    ``RuntimeError`` until ``reset`` has been called at least once.

    NOTE(review): ``reset`` and ``step`` are declared ``async`` but never
    await anything; confirm that the openenv-core base class expects
    coroutine overrides here.
    """

    # Class-level flag; presumably read by the openenv server to decide whether
    # several sessions may be served at once -- TODO confirm against openenv-core.
    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self):
        """Create the wrapper with no active episode."""
        super().__init__()
        # None means reset() has not been called yet.
        self._logic: Optional[DevOpsIncidentLogic] = None

    async def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: str = "easy",
        **kwargs: Any
    ) -> Observation:
        """Start a new episode and return its first observation.

        Args:
            seed: Seed forwarded to the logic object. For the ``"generated"``
                task a missing seed is replaced by 42.
            episode_id: Accepted for interface compatibility; not used here.
            task_id: Task to run. ``"generated"`` builds an incident through
                the module-level factory; any other value is passed straight
                to ``DevOpsIncidentLogic``.
            **kwargs: Accepted and ignored.

        Returns:
            The initial observation of the new episode.
        """
        # Predefined tasks: the logic object handles everything itself.
        if task_id != "generated":
            self._logic = DevOpsIncidentLogic(task_id=task_id, seed=seed)
            return self._logic.reset()

        # Generated task: build the incident first, then graft it onto a
        # logic object.
        effective_seed = 42 if seed is None else seed
        generated_task = GeneratedTask(
            incident_dict=_factory.generate(effective_seed)
        )
        initial_state = generated_task.initialize()

        # The logic object is constructed as an 'easy' task (to bypass its
        # validation) and then overwritten with the generated task and state.
        self._logic = logic = DevOpsIncidentLogic(
            task_id="easy", seed=effective_seed
        )
        logic.task_id = "generated"
        logic._task = generated_task
        logic._internal_state = initial_state

        return initial_state._build_observation()

    async def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any
    ) -> StepResult:
        """Apply ``action`` to the current episode.

        Args:
            action: Action handed to the underlying logic object.
            timeout_s: Accepted for interface compatibility; not used here.
            **kwargs: Accepted and ignored.

        Returns:
            Whatever the logic object's ``step`` returns.

        Raises:
            RuntimeError: If ``reset`` has not been called yet.
        """
        logic = self._logic
        if logic is None:
            raise RuntimeError("Environment not reset")
        return logic.step(action)

    @property
    def state(self) -> State:
        """Current state as reported by the logic object's ``state()``.

        Raises:
            RuntimeError: If ``reset`` has not been called yet.
        """
        logic = self._logic
        if logic is None:
            raise RuntimeError("Environment not reset")
        return logic.state()