File size: 3,272 Bytes
15c7b0c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Road Traffic Simulator Environment Client."""

from typing import Dict

from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core import EnvClient

from .models import RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation


class RoadTrafficSimulatorEnv(
    EnvClient[RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation]
):
    """
    HTTP client for the Road Traffic Simulator environment.

    The agent drives a single car from a random start node to a random goal
    node in downtown San Francisco while background traffic flows around it.

    Action space
    ------------
    RoadTrafficSimulatorAction(next_edge_index=<int>)
        Index of the outgoing road to take at the current intersection.
        Valid range: [0, observation.available_actions - 1].

    Observation
    -----------
    RoadTrafficSimulatorObservation
        .lat / .lon              – agent GPS position
        .goal_lat / .goal_lon    – destination GPS position
        .distance_to_goal        – straight-line metres to goal
        .available_actions       – number of choices at current node
        .map_screenshot          – base64 PNG (400×400) traffic heatmap

    Example
    -------
    >>> with RoadTrafficSimulatorEnv(base_url="http://localhost:8000") as env:
    ...     result = env.reset()
    ...     obs = result.observation
    ...     print(f"Start: ({obs.lat:.4f}, {obs.lon:.4f})")
    ...     print(f"Goal:  ({obs.goal_lat:.4f}, {obs.goal_lon:.4f})")
    ...     print(f"Dist:  {obs.distance_to_goal:.0f} m")
    ...
    ...     # Greedy: always pick edge 0
    ...     while not result.done:
    ...         result = env.step(RoadTrafficSimulatorAction(next_edge_index=0))
    ...     print(f"Done after {result.observation.metadata['step']} steps")
    """

    def _step_payload(self, action: RoadTrafficSimulatorAction) -> Dict:
        """Serialize an action into the JSON body sent to the server."""
        body = {"next_edge_index": action.next_edge_index}
        return body

    def _parse_result(self, payload: Dict) -> StepResult[RoadTrafficSimulatorObservation]:
        """Deserialize a server response into a typed StepResult.

        Missing numeric fields default to 0.0, ``available_actions`` to 1,
        and the screenshot to an empty string, so a sparse payload still
        yields a well-formed observation.
        """
        raw = payload.get("observation", {})
        episode_done = payload.get("done", False)
        step_reward = payload.get("reward")

        # The observation mirrors the episode-level done/reward flags so the
        # caller can read them from either the result or the observation.
        observation = RoadTrafficSimulatorObservation(
            lat=raw.get("lat", 0.0),
            lon=raw.get("lon", 0.0),
            goal_lat=raw.get("goal_lat", 0.0),
            goal_lon=raw.get("goal_lon", 0.0),
            distance_to_goal=raw.get("distance_to_goal", 0.0),
            available_actions=raw.get("available_actions", 1),
            map_screenshot=raw.get("map_screenshot", ""),
            done=episode_done,
            reward=step_reward,
            metadata=raw.get("metadata", {}),
        )
        return StepResult(
            observation=observation,
            reward=step_reward,
            done=episode_done,
        )

    def _parse_state(self, payload: Dict) -> State:
        """Deserialize the server's state payload into a State object."""
        episode_id = payload.get("episode_id")
        step_count = payload.get("step_count", 0)
        return State(episode_id=episode_id, step_count=step_count)