# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

"""Customer Env Environment Client."""

from typing import Dict, Any

from openenv.core import EnvClient
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State

from models import CustomerAction, CustomerObservation


class CustomerEnv(EnvClient[CustomerAction, CustomerObservation, State]):
    """
    Typed client for the Customer Env Environment (Banking POMDP).

    The client is described as holding a persistent WebSocket connection to
    the environment server so that multi-step interactions incur lower
    latency. The methods defined on this subclass only translate between
    the typed action/observation/state objects and plain dictionaries.
    """

    def _step_payload(self, action: CustomerAction) -> Dict[str, Any]:
        """
        Build the dictionary sent to the server for a step message.

        Args:
            action: The action whose ``action_type``, ``content`` and
                ``tool_args`` attributes are copied into the payload.

        Returns:
            A dictionary with the keys ``action_type``, ``content`` and
            ``tool_args``.
        """
        message: Dict[str, Any] = {
            "action_type": action.action_type,
            "content": action.content,
            "tool_args": action.tool_args,
        }
        return message

    def _parse_result(self, payload: Dict[str, Any]) -> StepResult[CustomerObservation]:
        """
        Turn a server response into a ``StepResult[CustomerObservation]``.

        Args:
            payload: Response dictionary. ``done`` and ``reward`` are read
                from its top level; the remaining observation fields are
                read from the nested ``observation`` dictionary.

        Returns:
            A ``StepResult`` wrapping the reconstructed observation. Missing
            keys fall back to defaults: ``done`` to ``False``, ``reward`` to
            ``0.0``, ``conversation_history`` to ``""``, ``metadata`` to
            ``{}``, and ``customer_reply`` / ``tool_response`` to ``None``.
        """
        # Episode-level fields live on the outer payload and are mirrored
        # onto both the observation and the StepResult.
        is_done = payload.get("done", False)
        step_reward = payload.get("reward", 0.0)

        obs_fields = payload.get("observation", {})
        parsed_observation = CustomerObservation(
            customer_reply=obs_fields.get("customer_reply"),
            tool_response=obs_fields.get("tool_response"),
            conversation_history=obs_fields.get("conversation_history", ""),
            done=is_done,
            reward=step_reward,
            metadata=obs_fields.get("metadata", {}),
        )

        return StepResult(
            observation=parsed_observation,
            reward=step_reward,
            done=is_done,
        )

    def _parse_state(self, payload: Dict[str, Any]) -> State:
        """
        Turn a server response into a ``State`` object.

        Args:
            payload: Response dictionary that may carry ``episode_id`` and
                ``step_count``.

        Returns:
            A ``State`` whose ``episode_id`` is ``None`` and whose
            ``step_count`` is ``0`` when the respective key is absent.
        """
        episode_id = payload.get("episode_id")
        step_count = payload.get("step_count", 0)
        return State(episode_id=episode_id, step_count=step_count)