Veer15's picture
Upload folder using huggingface_hub
0da1902 verified
from typing import Any
from openenv.core import EnvClient
from openenv.core.client_types import StepResult as ClientStepResult
from openenv.core.env_server.types import State
from .models import Action, Observation, SystemMetrics
class DistributedSystemsDebugEnv(EnvClient[Action, Observation]):
"""Client wrapper around the environment HTTP API."""
def _step_payload(self, action: Action) -> dict[str, Any]:
return action.model_dump()
def _parse_result(self, payload: dict[str, Any]) -> ClientStepResult[Observation]:
observation_payload = payload.get("observation") or {}
metrics_payload = observation_payload.get("metrics") or {}
observation = Observation(
command_output=str(observation_payload.get("command_output") or ""),
metrics=SystemMetrics(
gateway_success_rate=float(
metrics_payload.get("gateway_success_rate", 0.0)
),
gateway_p99_latency_ms=float(
metrics_payload.get("gateway_p99_latency_ms", 0.0)
),
queue_depth=int(metrics_payload.get("queue_depth", 0)),
worker_restart_count=int(
metrics_payload.get("worker_restart_count", 0)
),
consumer_stall_count=int(
metrics_payload.get("consumer_stall_count", 0)
),
),
process_status={
str(key): str(value)
for key, value in dict(
observation_payload.get("process_status") or {}
).items()
},
)
reward = payload.get("reward")
return ClientStepResult(
observation=observation,
reward=float(reward) if reward is not None else None,
done=bool(payload.get("done", False)),
)
def _parse_state(self, payload: dict[str, Any]) -> State:
return State(
episode_id=payload.get("task"),
step_count=int(payload.get("step_count", 0)),
)