# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Crisisworldcortex Environment Client."""

from typing import Any, Dict

from openenv.core import EnvClient
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State

from .models import CrisisworldcortexAction, CrisisworldcortexObservation


class CrisisworldcortexEnv(EnvClient[CrisisworldcortexAction, CrisisworldcortexObservation, State]):
    """
    Client for the Crisisworldcortex Environment.

    Holds a persistent WebSocket connection to the environment server; every
    client instance is given its own dedicated environment session on the
    server side.

    Example:
        >>> from CrisisWorldCortex import CrisisworldcortexAction, CrisisworldcortexEnv
        >>> from CrisisWorldCortex.models import NoOp
        >>> with CrisisworldcortexEnv(base_url="http://localhost:8000") as client:
        ...     result = client.reset()
        ...     print(result.observation.tick)
        ...
        ...     result = client.step(CrisisworldcortexAction(action=NoOp()))
        ...     print(result.observation.tick)

    Example with Docker:
        >>> client = CrisisworldcortexEnv.from_docker_image("CrisisWorldCortex-env:latest")
        >>> try:
        ...     result = client.reset()
        ...     result = client.step(CrisisworldcortexAction(action=NoOp()))
        ... finally:
        ...     client.close()
    """

    def _step_payload(self, action: CrisisworldcortexAction) -> Dict[str, Any]:
        """Turn the action wrapper into a plain dict for WebSocket transport.

        The conversion is delegated entirely to Pydantic v2 ``model_dump()``,
        which takes care of the discriminated union: the inner
        ``OuterActionPayload`` is emitted together with its ``kind`` field, and
        the server re-validates the dict via
        ``CrisisworldcortexAction.model_validate(payload)``.

        Args:
            action: The action wrapper to send to the server.

        Returns:
            The dict produced by ``action.model_dump()``.
        """
        wire_payload: Dict[str, Any] = action.model_dump()
        return wire_payload

    def _parse_result(self, payload: Dict[str, Any]) -> StepResult[CrisisworldcortexObservation]:
        """Build a ``StepResult[CrisisworldcortexObservation]`` from a server response.

        The ``"observation"`` entry (an empty dict when absent) is passed
        through ``model_validate``, which rebuilds the discriminated-union
        payload inside the observation's ``recent_action_log``.

        Args:
            payload: Decoded server response.

        Returns:
            A ``StepResult`` carrying the validated observation, the
            ``"reward"`` entry (``None`` when absent) and the ``"done"`` entry
            (``False`` when absent).
        """
        raw_observation = payload.get("observation", {})
        # Validation runs before the remaining keys are read, as in the
        # original statement order.
        return StepResult(
            observation=CrisisworldcortexObservation.model_validate(raw_observation),
            reward=payload.get("reward"),
            done=payload.get("done", False),
        )

    def _parse_state(self, payload: Dict[str, Any]) -> State:
        """Build the OpenEnv-compatible ``State`` object from a server response.

        Args:
            payload: Decoded server response.

        Returns:
            A ``State`` whose ``episode_id`` is the ``"episode_id"`` entry
            (``None`` when absent) and whose ``step_count`` is the
            ``"step_count"`` entry (``0`` when absent).
        """
        episode_id = payload.get("episode_id")
        step_count = payload.get("step_count", 0)
        return State(episode_id=episode_id, step_count=step_count)