Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| """Customer Env Environment Client.""" | |
| from typing import Dict, Any | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from models import CustomerAction, CustomerObservation | |
class CustomerEnv(EnvClient[CustomerAction, CustomerObservation, State]):
    """Client for the Customer Env Environment (Banking POMDP).

    This subclass only supplies the three serialization hooks defined below;
    connection handling is inherited from ``EnvClient``. The original notes
    describe that connection as a persistent WebSocket kept open across
    steps for lower latency -- that behavior lives in the base class, not in
    this file.
    """

    def _step_payload(self, action: CustomerAction) -> Dict[str, Any]:
        """Convert a ``CustomerAction`` into the JSON payload of a step message.

        Args:
            action: The action to send. Its ``action_type``, ``content`` and
                ``tool_args`` attributes are copied into the payload as-is.

        Returns:
            A dict with the keys ``action_type``, ``content`` and
            ``tool_args``.
        """
        return {
            "action_type": action.action_type,
            "content": action.content,
            "tool_args": action.tool_args,
        }

    def _parse_result(self, payload: Dict[str, Any]) -> StepResult[CustomerObservation]:
        """Parse a server response into ``StepResult[CustomerObservation]``.

        Args:
            payload: Decoded response. ``done`` and ``reward`` are read from
                the top level; the remaining observation fields are read
                from the nested ``observation`` mapping.

        Returns:
            A ``StepResult`` wrapping the rebuilt observation. ``reward``
            and ``done`` are set to the same values on both the observation
            and the result.
        """
        # ``dict.get(key, {})`` only falls back when the key is absent. An
        # explicit ``"observation": null`` would come through as None and
        # break the ``.get`` calls below, so treat it like a missing key.
        obs_data = payload.get("observation") or {}

        # Read once so the observation and the StepResult cannot disagree.
        done = payload.get("done", False)
        reward = payload.get("reward", 0.0)

        observation = CustomerObservation(
            customer_reply=obs_data.get("customer_reply"),
            tool_response=obs_data.get("tool_response"),
            conversation_history=obs_data.get("conversation_history", ""),
            done=done,
            reward=reward,
            metadata=obs_data.get("metadata", {}),
        )
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: Dict[str, Any]) -> State:
        """Parse a server response into a ``State`` object.

        Args:
            payload: Decoded response carrying ``episode_id`` and
                ``step_count``.

        Returns:
            A ``State`` whose ``episode_id`` is None when the key is missing
            and whose ``step_count`` defaults to 0 when the key is missing.
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )