| """NOC Agent Environment — OpenEnv WebSocket client.""" |
|
|
| from __future__ import annotations |
|
|
| from typing import Any |
|
|
| from openenv.core import EnvClient |
| from openenv.core.client_types import StepResult |
| from openenv.core.env_server.types import State |
|
|
| from .models import ActionType, IncidentType, NOCAction, NOCObservation, SystemMetrics |
|
|
|
|
| class NocAgentEnvClient(EnvClient[NOCAction, NOCObservation, State]): |
| """ |
| WebSocket client for the NOC Agent Environment server. |
| |
| Example:: |
| |
| with NocAgentEnvClient(base_url="http://localhost:8000") as client: |
| result = client.reset() |
| print(result.observation.incident_type) |
| |
| result = client.step(NOCAction(action_type=ActionType.THROTTLE_CPU)) |
| print(result.observation.explanation) |
| """ |
|
|
| def _step_payload(self, action: NOCAction) -> dict[str, Any]: |
| return {"action_type": action.action_type.value} |
|
|
| def _parse_result(self, payload: dict[str, Any]) -> StepResult[NOCObservation]: |
| obs_data = payload.get("observation", {}) |
| metrics_data = obs_data.get("metrics", {}) |
|
|
| metrics = SystemMetrics( |
| cpu_usage=metrics_data.get("cpu_usage", 0.5), |
| memory_usage=metrics_data.get("memory_usage", 0.5), |
| latency=metrics_data.get("latency", 0.1), |
| packet_loss=metrics_data.get("packet_loss", 0.0), |
| service_healthy=metrics_data.get("service_healthy", 1.0), |
| error_rate=metrics_data.get("error_rate", 0.0), |
| ) |
| observation = NOCObservation( |
| metrics=metrics, |
| incident_type=IncidentType(obs_data.get("incident_type", IncidentType.CPU_OVERLOAD)), |
| step=obs_data.get("step", 0), |
| explanation=obs_data.get("explanation", ""), |
| done=payload.get("done", False), |
| reward=payload.get("reward"), |
| metadata=obs_data.get("metadata", {}), |
| ) |
| return StepResult( |
| observation=observation, |
| reward=payload.get("reward"), |
| done=payload.get("done", False), |
| ) |
|
|
| def _parse_state(self, payload: dict[str, Any]) -> State: |
| return State( |
| episode_id=payload.get("episode_id"), |
| step_count=payload.get("step_count", 0), |
| ) |
|
|