File size: 7,195 Bytes
41a9651
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
OpenEnv server-side environment wrapping the car racing game.

Observation: 64Γ—64 egocentric headlight image + [speed, on_track, sin, cos].
The image is rendered offscreen (no display required).
"""

import math
from typing import Any, Optional

import numpy as np
import pygame

from openenv.core.env_server import Environment
from openenv.core.env_server.types import EnvironmentMetadata

from game.rl_splits import CarEnv
from game.tracks import TrackDef, SCREEN_W, SCREEN_H
from game.oval_racer import draw_headlights
from .models import DriveAction, RaceObservation, RaceState

# Egocentric view parameters
_VIEW_PX = 120   # world-pixel square captured around the car before scaling
_OUT_PX  = 64    # output image size (64Γ—64)
_GRASS   = (45, 110, 45)


class RaceEnvironment(Environment[DriveAction, RaceObservation, RaceState]):
    """
    Wraps game.rl_splits.CarEnv as an OpenEnv Environment.

    Parameters
    ----------
    track       : TrackDef (must already be built, or call track.build() first)
    max_steps   : episode step limit
    laps_target : episode ends after this many laps
    use_image   : if False, image field of RaceObservation will be None
                  (useful for fast debugging / unit tests)
    """

    def __init__(
        self,
        track: TrackDef,
        max_steps: int = 3000,
        laps_target: int = 3,
        use_image: bool = True,
    ):
        super().__init__()  # OpenEnv base init (transform=None, rubric=None)
        self._env = CarEnv(track, max_steps=max_steps, laps_target=laps_target)
        self._use_image = use_image
        self._episode_id: Optional[str] = None

        # Offscreen surfaces reused every step β€” allocated once
        if use_image:
            self._surf    = pygame.Surface((SCREEN_W, SCREEN_H))
            self._canvas  = pygame.Surface((_VIEW_PX, _VIEW_PX))
            self._cropped = pygame.Surface((_VIEW_PX, _VIEW_PX))
            self._scaled  = pygame.Surface((_OUT_PX,  _OUT_PX))

    # ── OpenEnv interface ────────────────────────────────────────────────────

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> RaceObservation:
        self._reset_rubric()  # OpenEnv hook β€” reset rubric state if attached
        self._episode_id = episode_id
        obs = self._env.reset()
        result = self._to_obs(obs, done=False, reward=0.0)
        return self._apply_transform(result)  # OpenEnv hook

    def step(
        self,
        action: DriveAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> RaceObservation:
        obs, reward, done, info = self._env.step([action.accel, action.steer])
        result = self._to_obs(obs, done=done, reward=reward, metadata=info)
        return self._apply_transform(result)  # OpenEnv hook

    @property
    def state(self) -> RaceState:
        t = self._env.track
        return RaceState(
            episode_id=self._episode_id,
            step_count=self._env._step,
            track_level=t.level,
            track_name=t.name,
            laps=self._env.laps,
        )

    def close(self) -> None:
        """Clean up pygame offscreen surfaces."""
        if self._use_image:
            self._surf = self._canvas = self._cropped = self._scaled = None

    def get_metadata(self) -> EnvironmentMetadata:
        return EnvironmentMetadata(
            name="CurriculumCarRacer",
            description=(
                "Pygame car racing with curriculum learning, egocentric vision, "
                "and 20 procedural tracks. Agent observes a 64Γ—64 headlight image "
                "plus 9 scalar sensors (raycasts, speed, waypoint bearing)."
            ),
            version="0.1.0",
        )

    # ── Image rendering ──────────────────────────────────────────────────────

    def _render_headlight_image(self) -> np.ndarray:
        """
        Render a 64Γ—64 RGB egocentric view centred on the car.

        Pipeline:
          1. Blit track surface onto offscreen canvas
          2. Draw headlight cone
          3. Crop _VIEW_PX Γ— _VIEW_PX square around the car (grass-padded at borders)
          4. Rotate so the car always faces UP (forward = top of image)
          5. Re-crop centre after rotation padding
          6. Scale to _OUT_PX Γ— _OUT_PX
          7. Return as (H, W, 3) uint8 numpy array
        """
        x = self._env._x
        y = self._env._y
        angle = self._env._angle   # degrees; 0=right, 90=down (pygame y-down)

        # 1 & 2 β€” render to offscreen surface
        self._surf.blit(self._env.track.surface, (0, 0))
        draw_headlights(self._surf, x, y, angle)

        # 3 β€” crop around car, padding with grass if near screen edge
        half = _VIEW_PX // 2
        canvas = self._canvas
        canvas.fill(_GRASS)
        src = pygame.Rect(int(x) - half, int(y) - half, _VIEW_PX, _VIEW_PX)
        clipped = src.clip(pygame.Rect(0, 0, SCREEN_W, SCREEN_H))
        if clipped.width > 0 and clipped.height > 0:
            canvas.blit(self._surf, (clipped.x - src.x, clipped.y - src.y), clipped)

        # 4 β€” rotate so forward (angle) maps to UP (270Β° in pygame convention)
        rotated = pygame.transform.rotate(canvas, -(angle - 270))

        # 5 β€” re-crop centre (rotation adds padding)
        rw, rh = rotated.get_size()
        cx2, cy2 = rw // 2, rh // 2
        inner = pygame.Rect(cx2 - half, cy2 - half, _VIEW_PX, _VIEW_PX)
        inner = inner.clip(rotated.get_rect())
        cropped = self._cropped
        cropped.fill(_GRASS)
        cropped.blit(rotated, (inner.x - (cx2 - half), inner.y - (cy2 - half)), inner)

        # 6 β€” scale to output size (in-place into pre-allocated surface)
        pygame.transform.scale(cropped, (_OUT_PX, _OUT_PX), self._scaled)
        scaled = self._scaled

        # 7 β€” pygame surfarray is (W, H, 3); transpose to (H, W, 3)
        return pygame.surfarray.array3d(scaled).transpose(1, 0, 2)

    # ── Internal ─────────────────────────────────────────────────────────────

    def _to_obs(
        self,
        obs: list,
        done: bool,
        reward: float,
        metadata: dict = None,
    ) -> RaceObservation:
        # obs layout: [angular_velocity, speed, ray_left, ray_fl, ray_front, ray_fr, ray_right]
        image = self._render_headlight_image() if self._use_image else None
        return RaceObservation(
            image=image,
            angular_velocity=obs[0],
            speed=obs[1],
            ray_left=obs[2],
            ray_front_left=obs[3],
            ray_front=obs[4],
            ray_front_right=obs[5],
            ray_right=obs[6],
            wp_sin=obs[7],
            wp_cos=obs[8],
            done=done,
            reward=reward,
            metadata=metadata or {},
        )