""" OpenEnv server-side environment wrapping the car racing game. Observation: 64×64 egocentric headlight image + [speed, on_track, sin, cos]. The image is rendered offscreen (no display required). """ import math from typing import Any, Optional import numpy as np import pygame from openenv.core.env_server import Environment from openenv.core.env_server.types import EnvironmentMetadata from game.rl_splits import CarEnv from game.tracks import TrackDef, SCREEN_W, SCREEN_H from game.oval_racer import draw_headlights from .models import DriveAction, RaceObservation, RaceState # Egocentric view parameters _VIEW_PX = 120 # world-pixel square captured around the car before scaling _OUT_PX = 64 # output image size (64×64) _GRASS = (45, 110, 45) class RaceEnvironment(Environment[DriveAction, RaceObservation, RaceState]): """ Wraps game.rl_splits.CarEnv as an OpenEnv Environment. Parameters ---------- track : TrackDef (must already be built, or call track.build() first) max_steps : episode step limit laps_target : episode ends after this many laps use_image : if False, image field of RaceObservation will be None (useful for fast debugging / unit tests) """ def __init__( self, track: TrackDef, max_steps: int = 3000, laps_target: int = 3, use_image: bool = True, ): super().__init__() # OpenEnv base init (transform=None, rubric=None) self._env = CarEnv(track, max_steps=max_steps, laps_target=laps_target) self._use_image = use_image self._episode_id: Optional[str] = None # Offscreen surfaces reused every step — allocated once if use_image: self._surf = pygame.Surface((SCREEN_W, SCREEN_H)) self._canvas = pygame.Surface((_VIEW_PX, _VIEW_PX)) self._cropped = pygame.Surface((_VIEW_PX, _VIEW_PX)) self._scaled = pygame.Surface((_OUT_PX, _OUT_PX)) # ── OpenEnv interface ──────────────────────────────────────────────────── def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs: Any, ) -> RaceObservation: self._reset_rubric() # OpenEnv hook — reset rubric state if attached self._episode_id = episode_id obs = self._env.reset() result = self._to_obs(obs, done=False, reward=0.0) return self._apply_transform(result) # OpenEnv hook def step( self, action: DriveAction, timeout_s: Optional[float] = None, **kwargs: Any, ) -> RaceObservation: obs, reward, done, info = self._env.step([action.accel, action.steer]) result = self._to_obs(obs, done=done, reward=reward, metadata=info) return self._apply_transform(result) # OpenEnv hook @property def state(self) -> RaceState: t = self._env.track return RaceState( episode_id=self._episode_id, step_count=self._env._step, track_level=t.level, track_name=t.name, laps=self._env.laps, ) def close(self) -> None: """Clean up pygame offscreen surfaces.""" if self._use_image: self._surf = self._canvas = self._cropped = self._scaled = None def get_metadata(self) -> EnvironmentMetadata: return EnvironmentMetadata( name="CurriculumCarRacer", description=( "Pygame car racing with curriculum learning, egocentric vision, " "and 20 procedural tracks. Agent observes a 64×64 headlight image " "plus 9 scalar sensors (raycasts, speed, waypoint bearing)." ), version="0.1.0", ) # ── Image rendering ────────────────────────────────────────────────────── def _render_headlight_image(self) -> np.ndarray: """ Render a 64×64 RGB egocentric view centred on the car. Pipeline: 1. Blit track surface onto offscreen canvas 2. Draw headlight cone 3. Crop _VIEW_PX × _VIEW_PX square around the car (grass-padded at borders) 4. Rotate so the car always faces UP (forward = top of image) 5. Re-crop centre after rotation padding 6. Scale to _OUT_PX × _OUT_PX 7. Return as (H, W, 3) uint8 numpy array """ x = self._env._x y = self._env._y angle = self._env._angle # degrees; 0=right, 90=down (pygame y-down) # 1 & 2 — render to offscreen surface self._surf.blit(self._env.track.surface, (0, 0)) draw_headlights(self._surf, x, y, angle) # 3 — crop around car, padding with grass if near screen edge half = _VIEW_PX // 2 canvas = self._canvas canvas.fill(_GRASS) src = pygame.Rect(int(x) - half, int(y) - half, _VIEW_PX, _VIEW_PX) clipped = src.clip(pygame.Rect(0, 0, SCREEN_W, SCREEN_H)) if clipped.width > 0 and clipped.height > 0: canvas.blit(self._surf, (clipped.x - src.x, clipped.y - src.y), clipped) # 4 — rotate so forward (angle) maps to UP (270° in pygame convention) rotated = pygame.transform.rotate(canvas, -(angle - 270)) # 5 — re-crop centre (rotation adds padding) rw, rh = rotated.get_size() cx2, cy2 = rw // 2, rh // 2 inner = pygame.Rect(cx2 - half, cy2 - half, _VIEW_PX, _VIEW_PX) inner = inner.clip(rotated.get_rect()) cropped = self._cropped cropped.fill(_GRASS) cropped.blit(rotated, (inner.x - (cx2 - half), inner.y - (cy2 - half)), inner) # 6 — scale to output size (in-place into pre-allocated surface) pygame.transform.scale(cropped, (_OUT_PX, _OUT_PX), self._scaled) scaled = self._scaled # 7 — pygame surfarray is (W, H, 3); transpose to (H, W, 3) return pygame.surfarray.array3d(scaled).transpose(1, 0, 2) # ── Internal ───────────────────────────────────────────────────────────── def _to_obs( self, obs: list, done: bool, reward: float, metadata: dict = None, ) -> RaceObservation: # obs layout: [angular_velocity, speed, ray_left, ray_fl, ray_front, ray_fr, ray_right] image = self._render_headlight_image() if self._use_image else None return RaceObservation( image=image, angular_velocity=obs[0], speed=obs[1], ray_left=obs[2], ray_front_left=obs[3], ray_front=obs[4], ray_front_right=obs[5], ray_right=obs[6], wp_sin=obs[7], wp_cos=obs[8], done=done, reward=reward, metadata=metadata or {}, )