File size: 2,517 Bytes
1175c0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
EnvClient subclass for the SRE Incident Response environment.

Handles WebSocket/HTTP communication with the environment server.
Parses responses into typed models.
"""

from __future__ import annotations

from typing import Any, Dict, Optional

from .models import IncidentAction, IncidentObservation, IncidentState


class IncidentEnvClient:
    """
    Client for interacting with the Incident Response environment.

    Can be used standalone (direct HTTP calls) or subclassed from
    openenv.core.EnvClient for full framework integration.
    """

    def __init__(self, base_url: str = "http://localhost:8000") -> None:
        self.base_url = base_url.rstrip("/")
        self._session = None

    def _ensure_session(self):
        if self._session is None:
            import requests
            self._session = requests.Session()

    def reset(
        self,
        task_name: Optional[str] = None,
        seed: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Reset the environment. Returns initial observation."""
        self._ensure_session()
        payload: Dict[str, Any] = {}
        if task_name:
            payload["task_name"] = task_name
        if seed is not None:
            payload["seed"] = seed

        resp = self._session.post(f"{self.base_url}/reset", json=payload)
        resp.raise_for_status()
        return resp.json()

    def step(self, action: IncidentAction) -> Dict[str, Any]:
        """Execute an action. Returns observation, reward, done."""
        self._ensure_session()
        payload = {
            "action_type": action.action_type,
            "target_service": action.target_service,
            "parameters": action.parameters,
        }
        resp = self._session.post(f"{self.base_url}/step", json=payload)
        resp.raise_for_status()
        return resp.json()

    def state(self) -> Dict[str, Any]:
        """Get current episode state."""
        self._ensure_session()
        resp = self._session.get(f"{self.base_url}/state")
        resp.raise_for_status()
        return resp.json()

    def health(self) -> Dict[str, str]:
        """Health check."""
        self._ensure_session()
        resp = self._session.get(f"{self.base_url}/health")
        resp.raise_for_status()
        return resp.json()

    def close(self) -> None:
        if self._session:
            self._session.close()
            self._session = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()