ykirpichev's picture
Upload folder using huggingface_hub
15c7b0c verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""Road Traffic Simulator Environment Client."""
from typing import Dict
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core import EnvClient
from .models import RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation
class RoadTrafficSimulatorEnv(
    EnvClient[RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation]
):
    """
    Client for the Road Traffic Simulator Environment.

    The agent drives a single car from a random start node to a random end
    node in downtown San Francisco while background traffic moves around it.

    Action space
    ------------
    RoadTrafficSimulatorAction(next_edge_index=<int>)
        Selects which outgoing road to take at the current intersection.
        Valid range: [0, observation.available_actions - 1].

    Observation
    -----------
    RoadTrafficSimulatorObservation
        .lat / .lon            - agent GPS position
        .goal_lat / .goal_lon  - destination GPS position
        .distance_to_goal      - straight-line metres to goal
        .available_actions     - number of choices at current node
        .map_screenshot        - base64 PNG (400x400) traffic heatmap

    Example
    -------
    >>> with RoadTrafficSimulatorEnv(base_url="http://localhost:8000") as env:
    ...     result = env.reset()
    ...     obs = result.observation
    ...     print(f"Start: ({obs.lat:.4f}, {obs.lon:.4f})")
    ...     print(f"Goal:  ({obs.goal_lat:.4f}, {obs.goal_lon:.4f})")
    ...     print(f"Dist:  {obs.distance_to_goal:.0f} m")
    ...
    ...     # Naive policy: always take outgoing edge 0
    ...     while not result.done:
    ...         result = env.step(RoadTrafficSimulatorAction(next_edge_index=0))
    ...     print(f"Done after {result.observation.metadata['step']} steps")
    """

    def _step_payload(self, action: RoadTrafficSimulatorAction) -> Dict:
        """Convert ``action`` into a plain dict with a single ``next_edge_index`` key."""
        edge_index = action.next_edge_index
        return {"next_edge_index": edge_index}

    def _parse_result(self, payload: Dict) -> StepResult[RoadTrafficSimulatorObservation]:
        """Build a ``StepResult`` from a response ``payload`` dict.

        Observation fields are read from ``payload["observation"]``; ``reward``
        and ``done`` are read from the top level of ``payload`` and copied onto
        both the observation and the returned ``StepResult``. Any missing key
        falls back to a default (0.0 for coordinates and distance, 1 for
        ``available_actions``, an empty string / dict for the screenshot and
        metadata, ``False`` for ``done`` and ``None`` for ``reward``).
        """
        raw_obs = payload.get("observation", {})

        # Top-level episode signals, shared by the observation and the result.
        reward = payload.get("reward")
        done = payload.get("done", False)

        # (field name, fallback) pairs for the immutable-valued observation fields.
        scalar_defaults = (
            ("lat", 0.0),
            ("lon", 0.0),
            ("goal_lat", 0.0),
            ("goal_lon", 0.0),
            ("distance_to_goal", 0.0),
            ("available_actions", 1),
            ("map_screenshot", ""),
        )
        scalar_fields = {
            name: raw_obs.get(name, fallback) for name, fallback in scalar_defaults
        }

        observation = RoadTrafficSimulatorObservation(
            **scalar_fields,
            done=done,
            reward=reward,
            # A fresh dict per call, so observations never share a default.
            metadata=raw_obs.get("metadata", {}),
        )

        return StepResult(observation=observation, reward=reward, done=done)

    def _parse_state(self, payload: Dict) -> State:
        """Build a ``State`` from ``payload``.

        ``episode_id`` is ``None`` when absent; ``step_count`` defaults to 0.
        """
        episode_id = payload.get("episode_id")
        step_count = payload.get("step_count", 0)
        return State(episode_id=episode_id, step_count=step_count)