| | |
| | |
| | |
| | |
| | |
| |
|
| | """Road Traffic Simulator Environment Client.""" |
| |
|
| | from typing import Dict |
| |
|
| | from openenv.core.client_types import StepResult |
| | from openenv.core.env_server.types import State |
| | from openenv.core import EnvClient |
| |
|
| | from .models import RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation |
| |
|
| |
|
| | class RoadTrafficSimulatorEnv( |
| | EnvClient[RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation] |
| | ): |
| | """ |
| | Client for the Road Traffic Simulator Environment. |
| | |
| | The agent controls a single car navigating from a random start to a random |
| | end node in downtown San Francisco, surrounded by background traffic. |
| | |
| | Action space |
| | ------------ |
| | RoadTrafficSimulatorAction(next_edge_index=<int>) |
| | Choose which outgoing road to take at the current intersection. |
| | Valid range: [0, observation.available_actions - 1]. |
| | |
| | Observation |
| | ----------- |
| | RoadTrafficSimulatorObservation |
| | .lat / .lon β agent GPS position |
| | .goal_lat / .goal_lon β destination GPS position |
| | .distance_to_goal β straight-line metres to goal |
| | .available_actions β number of choices at current node |
| | .map_screenshot β base64 PNG (400Γ400) traffic heatmap |
| | |
| | Example |
| | ------- |
| | >>> with RoadTrafficSimulatorEnv(base_url="http://localhost:8000") as env: |
| | ... result = env.reset() |
| | ... obs = result.observation |
| | ... print(f"Start: ({obs.lat:.4f}, {obs.lon:.4f})") |
| | ... print(f"Goal: ({obs.goal_lat:.4f}, {obs.goal_lon:.4f})") |
| | ... print(f"Dist: {obs.distance_to_goal:.0f} m") |
| | ... |
| | ... # Greedy: always pick edge 0 |
| | ... while not result.done: |
| | ... result = env.step(RoadTrafficSimulatorAction(next_edge_index=0)) |
| | ... print(f"Done after {result.observation.metadata['step']} steps") |
| | """ |
| |
|
| | def _step_payload(self, action: RoadTrafficSimulatorAction) -> Dict: |
| | return {"next_edge_index": action.next_edge_index} |
| |
|
| | def _parse_result(self, payload: Dict) -> StepResult[RoadTrafficSimulatorObservation]: |
| | obs_data = payload.get("observation", {}) |
| | observation = RoadTrafficSimulatorObservation( |
| | lat=obs_data.get("lat", 0.0), |
| | lon=obs_data.get("lon", 0.0), |
| | goal_lat=obs_data.get("goal_lat", 0.0), |
| | goal_lon=obs_data.get("goal_lon", 0.0), |
| | distance_to_goal=obs_data.get("distance_to_goal", 0.0), |
| | available_actions=obs_data.get("available_actions", 1), |
| | map_screenshot=obs_data.get("map_screenshot", ""), |
| | done=payload.get("done", False), |
| | reward=payload.get("reward"), |
| | metadata=obs_data.get("metadata", {}), |
| | ) |
| | return StepResult( |
| | observation=observation, |
| | reward=payload.get("reward"), |
| | done=payload.get("done", False), |
| | ) |
| |
|
| | def _parse_state(self, payload: Dict) -> State: |
| | return State( |
| | episode_id=payload.get("episode_id"), |
| | step_count=payload.get("step_count", 0), |
| | ) |
| |
|