Spaces:
Sleeping
Sleeping
File size: 7,195 Bytes
41a9651 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | """
OpenEnv server-side environment wrapping the car racing game.
Observation: 64×64 egocentric headlight image + 9 scalars [angular_velocity, speed, 5 raycasts, wp_sin, wp_cos].
The image is rendered offscreen (no display required).
"""
import math
from typing import Any, Optional
import numpy as np
import pygame
from openenv.core.env_server import Environment
from openenv.core.env_server.types import EnvironmentMetadata
from game.rl_splits import CarEnv
from game.tracks import TrackDef, SCREEN_W, SCREEN_H
from game.oval_racer import draw_headlights
from .models import DriveAction, RaceObservation, RaceState
# Egocentric view parameters
# Side length of the square captured around the car before scaling.
_VIEW_PX = 120 # world-pixel square captured around the car before scaling
# Side length of the image handed back in the observation.
_OUT_PX = 64 # output image size (64×64)
# Colour tuple passed to Surface.fill() to pad the view wherever the crop
# window or the rotated canvas does not cover the output square.
_GRASS = (45, 110, 45)
class RaceEnvironment(Environment[DriveAction, RaceObservation, RaceState]):
    """
    Wraps game.rl_splits.CarEnv as an OpenEnv Environment.

    Each observation carries nine scalars taken from the wrapped CarEnv
    (angular velocity, speed, five ray readings, waypoint sin/cos) and,
    when ``use_image`` is true, a 64×64 egocentric headlight image rendered
    onto offscreen pygame surfaces.

    Parameters
    ----------
    track       : TrackDef (must already be built, or call track.build() first)
    max_steps   : episode step limit
    laps_target : episode ends after this many laps
    use_image   : if False, image field of RaceObservation will be None
                  (useful for fast debugging / unit tests)
    """

    def __init__(
        self,
        track: TrackDef,
        max_steps: int = 3000,
        laps_target: int = 3,
        use_image: bool = True,
    ):
        super().__init__()  # OpenEnv base init (transform=None, rubric=None)
        self._env = CarEnv(track, max_steps=max_steps, laps_target=laps_target)
        self._use_image = use_image
        self._episode_id: Optional[str] = None

        # Always define the surface attributes so close() and any attribute
        # access are safe even when image rendering is disabled.
        self._surf: Optional[pygame.Surface] = None
        self._canvas: Optional[pygame.Surface] = None
        self._cropped: Optional[pygame.Surface] = None
        self._scaled: Optional[pygame.Surface] = None

        # Offscreen surfaces reused every step — allocated once
        if use_image:
            self._surf = pygame.Surface((SCREEN_W, SCREEN_H))
            self._canvas = pygame.Surface((_VIEW_PX, _VIEW_PX))
            self._cropped = pygame.Surface((_VIEW_PX, _VIEW_PX))
            self._scaled = pygame.Surface((_OUT_PX, _OUT_PX))

    # ── OpenEnv interface ────────────────────────────────────────────────────

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> RaceObservation:
        """
        Start a new episode and return its first observation.

        ``episode_id`` is stored and later reported through ``state``.
        ``seed`` and ``**kwargs`` are accepted for interface compatibility
        but are not forwarded to the wrapped CarEnv.

        The returned observation has ``done=False`` and ``reward=0.0``.
        """
        self._reset_rubric()  # OpenEnv hook — reset rubric state if attached
        self._episode_id = episode_id
        obs = self._env.reset()
        result = self._to_obs(obs, done=False, reward=0.0)
        return self._apply_transform(result)  # OpenEnv hook

    def step(
        self,
        action: DriveAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> RaceObservation:
        """
        Advance the wrapped CarEnv by one step.

        The action is passed on as ``[action.accel, action.steer]``; the
        info dict returned by CarEnv becomes the observation's metadata.
        ``timeout_s`` and ``**kwargs`` are accepted but unused here.
        """
        obs, reward, done, info = self._env.step([action.accel, action.steer])
        result = self._to_obs(obs, done=done, reward=reward, metadata=info)
        return self._apply_transform(result)  # OpenEnv hook

    @property
    def state(self) -> RaceState:
        """Snapshot of episode id, step counter, track level/name and laps."""
        t = self._env.track
        return RaceState(
            episode_id=self._episode_id,
            step_count=self._env._step,
            track_level=t.level,
            track_name=t.name,
            laps=self._env.laps,
        )

    def close(self) -> None:
        """Clean up pygame offscreen surfaces (safe to call more than once)."""
        # Dropping the references is all that is needed; when use_image is
        # False the attributes are already None, so this is a no-op.
        self._surf = self._canvas = self._cropped = self._scaled = None

    def get_metadata(self) -> EnvironmentMetadata:
        """Return the static name, description and version of this environment."""
        return EnvironmentMetadata(
            name="CurriculumCarRacer",
            description=(
                "Pygame car racing with curriculum learning, egocentric vision, "
                "and 20 procedural tracks. Agent observes a 64×64 headlight image "
                "plus 9 scalar sensors (raycasts, speed, waypoint bearing)."
            ),
            version="0.1.0",
        )

    # ── Image rendering ──────────────────────────────────────────────────────

    def _render_headlight_image(self) -> np.ndarray:
        """
        Render a 64×64 RGB egocentric view centred on the car.

        Only valid when the environment was created with ``use_image=True``
        and has not been closed, because it draws on the offscreen surfaces
        allocated in ``__init__``.

        Pipeline:
        1. Blit track surface onto offscreen canvas
        2. Draw headlight cone
        3. Crop _VIEW_PX × _VIEW_PX square around the car (grass-padded at borders)
        4. Rotate so the car always faces UP (forward = top of image)
        5. Re-crop centre after rotation padding
        6. Scale to _OUT_PX × _OUT_PX
        7. Return as (H, W, 3) uint8 numpy array
        """
        x = self._env._x
        y = self._env._y
        angle = self._env._angle  # degrees; 0=right, 90=down (pygame y-down)

        # 1 & 2 — render to offscreen surface
        self._surf.blit(self._env.track.surface, (0, 0))
        draw_headlights(self._surf, x, y, angle)

        # 3 — crop around car, padding with grass if near screen edge
        half = _VIEW_PX // 2
        canvas = self._canvas
        canvas.fill(_GRASS)
        src = pygame.Rect(int(x) - half, int(y) - half, _VIEW_PX, _VIEW_PX)
        clipped = src.clip(pygame.Rect(0, 0, SCREEN_W, SCREEN_H))
        if clipped.width > 0 and clipped.height > 0:
            # Destination offset keeps the car at the canvas centre even when
            # part of the window fell outside the screen.
            canvas.blit(self._surf, (clipped.x - src.x, clipped.y - src.y), clipped)

        # 4 — rotate so forward (angle) maps to UP (270° in pygame convention).
        # pygame.transform.rotate turns the image counter-clockwise for
        # positive values, whereas the heading above is measured clockwise on
        # screen (y-down). Rotating by (angle - 270) therefore carries the
        # heading onto 270° for every angle; the negated form only did so for
        # headings of 90° and 270°.
        rotated = pygame.transform.rotate(canvas, angle - 270)

        # 5 — re-crop centre (rotation adds padding)
        rw, rh = rotated.get_size()
        cx2, cy2 = rw // 2, rh // 2
        inner = pygame.Rect(cx2 - half, cy2 - half, _VIEW_PX, _VIEW_PX)
        inner = inner.clip(rotated.get_rect())
        cropped = self._cropped
        cropped.fill(_GRASS)
        cropped.blit(rotated, (inner.x - (cx2 - half), inner.y - (cy2 - half)), inner)

        # 6 — scale to output size (in-place into pre-allocated surface)
        pygame.transform.scale(cropped, (_OUT_PX, _OUT_PX), self._scaled)
        scaled = self._scaled

        # 7 — pygame surfarray is (W, H, 3); transpose to (H, W, 3)
        return pygame.surfarray.array3d(scaled).transpose(1, 0, 2)

    # ── Internal ─────────────────────────────────────────────────────────────

    def _to_obs(
        self,
        obs: list,
        done: bool,
        reward: float,
        metadata: Optional[dict] = None,
    ) -> RaceObservation:
        """
        Pack a raw CarEnv observation into a RaceObservation.

        ``obs`` must hold at least nine entries, read by index as laid out
        below. The image is rendered only when ``use_image`` is true,
        otherwise it is None. A missing or empty ``metadata`` becomes ``{}``.
        """
        # obs layout: [angular_velocity, speed, ray_left, ray_fl, ray_front,
        #              ray_fr, ray_right, wp_sin, wp_cos]
        image = self._render_headlight_image() if self._use_image else None
        return RaceObservation(
            image=image,
            angular_velocity=obs[0],
            speed=obs[1],
            ray_left=obs[2],
            ray_front_left=obs[3],
            ray_front=obs[4],
            ray_front_right=obs[5],
            ray_right=obs[6],
            wp_sin=obs[7],
            wp_cos=obs[8],
            done=done,
            reward=reward,
            metadata=metadata or {},
        )
|