# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """Hft Environment Client.""" from typing import Dict, List from openenv.core import EnvClient from openenv.core.client_types import StepResult from openenv.core.env_server.types import State try: from hft.models import HftAction, HftObservation, HftState except ModuleNotFoundError: from models import HftAction, HftObservation, HftState class HftEnv(EnvClient[HftAction, HftObservation, HftState]): """ Client for the Hft Environment. This client maintains a persistent WebSocket connection to the environment server, enabling efficient multi-step interactions with lower latency. Each client instance has its own dedicated environment session on the server. Example: >>> # Connect to a running server >>> with HftEnv(base_url="http://localhost:8000") as client: ... result = client.reset() ... print(result.observation.mid) ... ... action = HftAction(market_buy_size=10) ... result = client.step(action) ... print(result.observation.reward) Example with Docker: >>> # Automatically start container and connect >>> client = HftEnv.from_docker_image("hft-env:latest") >>> try: ... result = client.reset() ... action = HftAction(limit_ask_price=101.5, limit_ask_size=5) ... result = client.step(action) ... finally: ... client.close() """ def _step_payload(self, action: HftAction) -> Dict: """ Convert HftAction to JSON payload for step message. Args: action: HftAction instance Returns: Dictionary representation suitable for JSON encoding """ return { "limit_buy_price": action.limit_buy_price, "limit_buy_size": action.limit_buy_size, "limit_ask_price": action.limit_ask_price, "limit_ask_size": action.limit_ask_size, "market_buy_size": action.market_buy_size, "market_ask_size": action.market_ask_size, "cancel_order_id": action.cancel_order_id, } def _parse_result(self, payload: Dict) -> StepResult[HftObservation]: """ Parse server response into StepResult[HftObservation]. Args: payload: JSON response data from server Returns: StepResult with HftObservation """ obs_data = payload.get("observation", {}) observation = HftObservation( time=obs_data.get("time", 0.0), reward=payload.get("reward", 0.0), done=obs_data.get("done", False), history=obs_data.get("history", []), active_orders=obs_data.get("active_orders", []), ) return StepResult( observation=observation, reward=payload.get("reward"), done=payload.get("done", False), ) def _parse_state(self, payload: Dict) -> HftState: """ Parse server response into State object. Args: payload: JSON response from state request Returns: State object with episode and portfolio fields """ return HftState( task_name=payload.get("task_name"), episode_id=payload.get("episode_id"), active_orders=payload.get("active_orders", []), step_count=payload.get("step_count", 0), max_inventory=payload.get("max_inventory", 0), inventory=payload.get("inventory", 0), cash=payload.get("cash", 0.0), )