harsharajkumar273's picture
Upload CleanOps OpenEnv submission
ac326a6 verified
"""OpenEnv WebSocket client for CleanOps."""
from __future__ import annotations
from typing import Any
from openenv.core import EnvClient
from openenv.core.client_types import StepResult
from cleanops_env.models import DataCleaningAction, DataCleaningObservation, DataCleaningState
class CleanOpsEnvClient(EnvClient[DataCleaningAction, DataCleaningObservation, DataCleaningState]):
    """Typed client for interacting with a running CleanOps server.

    Provides the (de)serialization overrides ``_step_payload``,
    ``_parse_result`` and ``_parse_state`` (presumably invoked by the
    ``EnvClient`` base class; confirm against the installed openenv version)
    and a ``step_tuple`` convenience wrapper around ``step``.
    """

    def _step_payload(self, action: DataCleaningAction) -> dict[str, Any]:
        """Return ``action`` serialized to a plain dict via ``model_dump``."""
        return action.model_dump()

    def _parse_result(self, payload: dict[str, Any]) -> StepResult[DataCleaningObservation]:
        """Build a ``StepResult`` from a decoded step response.

        ``reward`` and ``done`` are taken from the top level of ``payload``
        when those keys are present, otherwise from the nested
        ``"observation"`` dict (``done`` finally defaults to ``False``). The
        same resolved values are written into the observation and into the
        returned ``StepResult`` so the two can never disagree.
        """
        # ``or {}`` also covers an explicit ``"observation": null``, which
        # would otherwise make the ``**`` unpacking below raise TypeError.
        obs_payload = payload.get("observation") or {}

        # Resolve once and reuse: previously the StepResult ignored the
        # observation-level fallback that the observation itself received.
        reward = payload.get("reward", obs_payload.get("reward"))
        done = payload.get("done", obs_payload.get("done", False))

        observation = DataCleaningObservation.model_validate(
            {
                **obs_payload,
                "reward": reward,
                "done": done,
            }
        )
        return StepResult(observation=observation, reward=reward, done=done)

    def _parse_state(self, payload: dict[str, Any]) -> DataCleaningState:
        """Validate ``payload`` into a ``DataCleaningState``."""
        return DataCleaningState.model_validate(payload)

    def step_tuple(
        self, action: DataCleaningAction, **kwargs: Any
    ) -> tuple[DataCleaningObservation, float, bool, dict[str, Any]]:
        """Convenience adapter that returns (observation, reward, done, info).

        ``kwargs`` are forwarded to ``step``. ``info`` holds the dumped
        current state plus the grader, reward breakdown and last-action
        status/error fields read from the observation. A missing (``None``)
        or otherwise falsy reward is reported as ``0.0``.
        """
        # NOTE(review): assumes ``self.step`` and ``self.state`` return their
        # results directly (not awaitables) on this client - confirm against
        # the installed openenv version.
        result = self.step(action, **kwargs)
        observation = result.observation

        # ``model_dump`` is called unconditionally, so ``grader`` and
        # ``reward_breakdown`` must be populated models on the observation.
        info = {
            "state": self.state().model_dump(),
            "grader": observation.grader.model_dump(),
            "reward_breakdown": observation.reward_breakdown.model_dump(),
            "last_action_status": observation.last_action_status,
            "last_action_error": observation.last_action_error,
        }
        return observation, float(result.reward or 0.0), bool(result.done), info