Spaces:
Sleeping
Sleeping
| """ | |
| OpenEnv server-side environment wrapping the car racing game. | |
| Observation: 64×64 egocentric headlight image + 9 scalars [angular_velocity, speed, 5 raycasts, waypoint sin, waypoint cos]. | |
| The image is rendered offscreen (no display required). | |
| """ | |
| import math | |
| from typing import Any, Optional | |
| import numpy as np | |
| import pygame | |
| from openenv.core.env_server import Environment | |
| from openenv.core.env_server.types import EnvironmentMetadata | |
| from game.rl_splits import CarEnv | |
| from game.tracks import TrackDef, SCREEN_W, SCREEN_H | |
| from game.oval_racer import draw_headlights | |
| from .models import DriveAction, RaceObservation, RaceState | |
| # Egocentric view parameters; _GRASS is the fill colour used where the crop has no rendered pixels | |
| _VIEW_PX = 120 # side, in world pixels, of the square captured around the car before scaling | |
| _OUT_PX = 64 # side of the output image (64×64) | |
| _GRASS = (45, 110, 45) | |
class RaceEnvironment(Environment[DriveAction, RaceObservation, RaceState]):
    """
    Wraps game.rl_splits.CarEnv as an OpenEnv Environment.

    Every observation carries the nine scalar sensors taken from the CarEnv
    observation vector and, optionally, a 64×64 RGB egocentric image that is
    rendered to offscreen pygame surfaces.

    Parameters
    ----------
    track : TrackDef (must already be built, or call track.build() first)
    max_steps : episode step limit
    laps_target : episode ends after this many laps
    use_image : if False, image field of RaceObservation will be None
        (useful for fast debugging / unit tests)
    """

    def __init__(
        self,
        track: TrackDef,
        max_steps: int = 3000,
        laps_target: int = 3,
        use_image: bool = True,
    ):
        super().__init__()  # OpenEnv base init (transform=None, rubric=None)
        self._env = CarEnv(track, max_steps=max_steps, laps_target=laps_target)
        self._use_image = use_image
        self._episode_id: Optional[str] = None

        # Always define the surface attributes so close() and the renderer can
        # test them safely, whatever the value of use_image.
        self._surf: Optional[pygame.Surface] = None
        self._canvas: Optional[pygame.Surface] = None
        self._cropped: Optional[pygame.Surface] = None
        self._scaled: Optional[pygame.Surface] = None
        if use_image:
            # Offscreen surfaces reused every step — allocated once
            self._allocate_surfaces()

    def _allocate_surfaces(self) -> None:
        """Create the offscreen surfaces that every render call reuses."""
        self._surf = pygame.Surface((SCREEN_W, SCREEN_H))
        self._canvas = pygame.Surface((_VIEW_PX, _VIEW_PX))
        self._cropped = pygame.Surface((_VIEW_PX, _VIEW_PX))
        self._scaled = pygame.Surface((_OUT_PX, _OUT_PX))

    # ── OpenEnv interface ────────────────────────────────────────────────────

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> RaceObservation:
        """
        Start a new episode and return its first observation.

        The first observation is reported with done=False and reward=0.0.

        NOTE(review): ``seed`` and ``**kwargs`` are accepted for interface
        compatibility but are not forwarded to CarEnv.reset(), so they have no
        effect here — confirm whether CarEnv supports seeding.
        """
        self._reset_rubric()  # OpenEnv hook — reset rubric state if attached
        self._episode_id = episode_id
        obs = self._env.reset()
        result = self._to_obs(obs, done=False, reward=0.0)
        return self._apply_transform(result)  # OpenEnv hook

    def step(
        self,
        action: DriveAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> RaceObservation:
        """
        Advance the wrapped CarEnv by one step.

        The action is passed on as ``[action.accel, action.steer]``; the info
        dict returned by CarEnv becomes the observation's metadata.
        ``timeout_s`` and ``**kwargs`` are accepted but unused.
        """
        obs, reward, done, info = self._env.step([action.accel, action.steer])
        result = self._to_obs(obs, done=done, reward=reward, metadata=info)
        return self._apply_transform(result)  # OpenEnv hook

    # NOTE(review): the OpenEnv base interface appears to declare ``state`` as
    # a property; it is kept as a plain method here so existing ``env.state()``
    # callers keep working — confirm against the installed OpenEnv version.
    def state(self) -> RaceState:
        """Return a snapshot of episode id, step count, track info and laps."""
        t = self._env.track
        return RaceState(
            episode_id=self._episode_id,
            step_count=self._env._step,
            track_level=t.level,
            track_name=t.name,
            laps=self._env.laps,
        )

    def close(self) -> None:
        """Clean up pygame offscreen surfaces."""
        # The renderer re-allocates on demand, so the environment stays usable
        # if it is reset or stepped again after close().
        self._surf = self._canvas = self._cropped = self._scaled = None

    def get_metadata(self) -> EnvironmentMetadata:
        """Return the static name, description and version of this environment."""
        return EnvironmentMetadata(
            name="CurriculumCarRacer",
            description=(
                "Pygame car racing with curriculum learning, egocentric vision, "
                "and 20 procedural tracks. Agent observes a 64×64 headlight image "
                "plus 9 scalar sensors (raycasts, speed, waypoint bearing)."
            ),
            version="0.1.0",
        )

    # ── Image rendering ──────────────────────────────────────────────────────

    def _render_headlight_image(self) -> np.ndarray:
        """
        Render a 64×64 RGB egocentric view centred on the car.

        Pipeline:
        1. Blit track surface onto offscreen canvas
        2. Draw headlight cone
        3. Crop _VIEW_PX × _VIEW_PX square around the car (grass-padded at borders)
        4. Rotate so the car always faces UP (forward = top of image)
        5. Re-crop centre after rotation padding
        6. Scale to _OUT_PX × _OUT_PX
        7. Return as (H, W, 3) uint8 numpy array
        """
        if self._surf is None:
            # Surfaces were released by close() (or never created); rebuild them.
            self._allocate_surfaces()

        x = self._env._x
        y = self._env._y
        angle = self._env._angle  # degrees; 0=right, 90=down (pygame y-down)

        # 1 & 2 — render to offscreen surface
        self._surf.blit(self._env.track.surface, (0, 0))
        draw_headlights(self._surf, x, y, angle)

        # 3 — crop around car, padding with grass if near screen edge
        half = _VIEW_PX // 2
        canvas = self._canvas
        canvas.fill(_GRASS)
        src = pygame.Rect(int(x) - half, int(y) - half, _VIEW_PX, _VIEW_PX)
        clipped = src.clip(pygame.Rect(0, 0, SCREEN_W, SCREEN_H))
        if clipped.width > 0 and clipped.height > 0:
            # Destination offset keeps the car at the canvas centre even when
            # part of the source square lies outside the screen.
            canvas.blit(self._surf, (clipped.x - src.x, clipped.y - src.y), clipped)

        # 4 — rotate so forward (angle) maps to UP (270° in pygame convention).
        # pygame.transform.rotate turns the image counter-clockwise for
        # positive values, whereas `angle` grows clockwise on screen, so a
        # counter-clockwise turn of (angle - 270) moves the heading to 270°.
        # Negating that amount would only be correct at 90° and 270°.
        rotated = pygame.transform.rotate(canvas, angle - 270)

        # 5 — re-crop centre (rotation adds padding)
        rw, rh = rotated.get_size()
        cx2, cy2 = rw // 2, rh // 2
        inner = pygame.Rect(cx2 - half, cy2 - half, _VIEW_PX, _VIEW_PX)
        inner = inner.clip(rotated.get_rect())
        cropped = self._cropped
        cropped.fill(_GRASS)
        cropped.blit(rotated, (inner.x - (cx2 - half), inner.y - (cy2 - half)), inner)

        # 6 — scale to output size (in-place into pre-allocated surface)
        pygame.transform.scale(cropped, (_OUT_PX, _OUT_PX), self._scaled)

        # 7 — pygame surfarray is (W, H, 3); transpose to (H, W, 3)
        return pygame.surfarray.array3d(self._scaled).transpose(1, 0, 2)

    # ── Internal ─────────────────────────────────────────────────────────────

    def _to_obs(
        self,
        obs: list,
        done: bool,
        reward: float,
        metadata: Optional[dict] = None,
    ) -> RaceObservation:
        """
        Build a RaceObservation from a CarEnv observation vector.

        ``obs`` must hold at least nine entries, read by index as:
        [angular_velocity, speed, ray_left, ray_front_left, ray_front,
        ray_front_right, ray_right, wp_sin, wp_cos].
        The image is rendered only when use_image is enabled; a missing or
        empty ``metadata`` becomes an empty dict.
        """
        image = self._render_headlight_image() if self._use_image else None
        return RaceObservation(
            image=image,
            angular_velocity=obs[0],
            speed=obs[1],
            ray_left=obs[2],
            ray_front_left=obs[3],
            ray_front=obs[4],
            ray_front_right=obs[5],
            ray_right=obs[6],
            wp_sin=obs[7],
            wp_cos=obs[8],
            done=done,
            reward=reward,
            metadata=metadata or {},
        )