# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Road Traffic Simulator Environment Client."""

from typing import Dict

from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core import EnvClient

from .models import RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation


class RoadTrafficSimulatorEnv(
    EnvClient[RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation]
):
    """
    Client for the Road Traffic Simulator Environment.

    The agent drives a single car from a random start node to a random end
    node in downtown San Francisco while background traffic moves around it.

    Action space
    ------------
    RoadTrafficSimulatorAction(next_edge_index=<int>)
        Selects which outgoing road to take at the current intersection.
        Valid range: [0, observation.available_actions - 1].

    Observation
    -----------
    RoadTrafficSimulatorObservation
        .lat / .lon              – agent GPS position
        .goal_lat / .goal_lon    – destination GPS position
        .distance_to_goal        – straight-line metres to goal
        .available_actions       – number of choices at current node
        .map_screenshot          – base64 PNG (400×400) traffic heatmap

    Example
    -------
    >>> with RoadTrafficSimulatorEnv(base_url="http://localhost:8000") as env:
    ...     result = env.reset()
    ...     obs = result.observation
    ...     print(f"Start: ({obs.lat:.4f}, {obs.lon:.4f})")
    ...     print(f"Goal:  ({obs.goal_lat:.4f}, {obs.goal_lon:.4f})")
    ...     print(f"Dist:  {obs.distance_to_goal:.0f} m")
    ...
    ...     # Naive policy: always take edge 0
    ...     while not result.done:
    ...         result = env.step(RoadTrafficSimulatorAction(next_edge_index=0))
    ...     print(f"Done after {result.observation.metadata['step']} steps")
    """

    def _step_payload(self, action: RoadTrafficSimulatorAction) -> Dict:
        """Build the request body for a step from *action*.

        Args:
            action: The action whose ``next_edge_index`` is to be sent.

        Returns:
            A dict with the single key ``"next_edge_index"``.
        """
        chosen_edge = action.next_edge_index
        return {"next_edge_index": chosen_edge}

    def _parse_result(
        self, payload: Dict
    ) -> StepResult[RoadTrafficSimulatorObservation]:
        """Convert a response *payload* into a typed ``StepResult``.

        ``reward`` and ``done`` are read from the top level of *payload* and
        are set both on the observation and on the returned ``StepResult``.
        All other observation fields come from ``payload["observation"]``;
        any field that is absent falls back to the default listed below.

        Args:
            payload: Decoded response dict.

        Returns:
            A ``StepResult`` wrapping the parsed observation.
        """
        reward = payload.get("reward")
        done = payload.get("done", False)
        raw_obs = payload.get("observation", {})

        # (field name, fallback used when the response omits the field)
        field_defaults = (
            ("lat", 0.0),
            ("lon", 0.0),
            ("goal_lat", 0.0),
            ("goal_lon", 0.0),
            ("distance_to_goal", 0.0),
            ("available_actions", 1),
            ("map_screenshot", ""),
        )
        obs_fields = {
            name: raw_obs.get(name, fallback) for name, fallback in field_defaults
        }

        observation = RoadTrafficSimulatorObservation(
            **obs_fields,
            done=done,
            reward=reward,
            metadata=raw_obs.get("metadata", {}),
        )
        return StepResult(observation=observation, reward=reward, done=done)

    def _parse_state(self, payload: Dict) -> State:
        """Convert a state *payload* into a ``State`` object.

        Args:
            payload: Decoded response dict.

        Returns:
            A ``State`` carrying ``episode_id`` (``None`` when absent) and
            ``step_count`` (``0`` when absent).
        """
        episode = payload.get("episode_id")
        steps_taken = payload.get("step_count", 0)
        return State(episode_id=episode, step_count=steps_taken)