"""DataSage Cleaning Environment Client.""" from typing import Dict from openenv.core import EnvClient from openenv.core.client_types import StepResult from openenv.core.env_server.types import State from .models import CleaningAction, CleaningObservation class CleaningEnv(EnvClient[CleaningAction, CleaningObservation, State]): """ Client for the DataSage Cleaning Environment. Example: >>> with CleaningEnv(base_url="http://localhost:8000") as client: ... result = client.reset() ... print(result.observation.dq_score) ... result = client.step(CleaningAction( ... operation="fill_null", column="Age", value="median" ... )) ... print(result.observation.dq_score) """ def _step_payload(self, action: CleaningAction) -> Dict: """Convert CleaningAction to JSON payload.""" return { "operation": action.operation, "column": action.column, "value": action.value, "params": action.params, } def _parse_result(self, payload: Dict) -> StepResult[CleaningObservation]: """Parse server response into StepResult[CleaningObservation].""" obs_data = payload.get("observation", {}) observation = CleaningObservation( domain=obs_data.get("domain", ""), data_preview=obs_data.get("data_preview", ""), dq_report=obs_data.get("dq_report", ""), dq_score=obs_data.get("dq_score", 0.0), columns_info=obs_data.get("columns_info", ""), step_number=obs_data.get("step_number", 0), max_steps=obs_data.get("max_steps", 15), done=payload.get("done", False), reward=payload.get("reward"), metadata=obs_data.get("metadata", {}), ) return StepResult( observation=observation, reward=payload.get("reward"), done=payload.get("done", False), ) def _parse_state(self, payload: Dict) -> State: """Parse server response into State object.""" return State( episode_id=payload.get("episode_id"), step_count=payload.get("step_count", 0), )