Car-Racing-Agent / env /environment.py
nirmalpratheep's picture
Upload 11 files
41a9651 verified
"""
OpenEnv server-side environment wrapping the car racing game.
Observation: 64Γ—64 egocentric headlight image + [speed, on_track, sin, cos].
The image is rendered offscreen (no display required).
"""
import math
from typing import Any, Optional
import numpy as np
import pygame
from openenv.core.env_server import Environment
from openenv.core.env_server.types import EnvironmentMetadata
from game.rl_splits import CarEnv
from game.tracks import TrackDef, SCREEN_W, SCREEN_H
from game.oval_racer import draw_headlights
from .models import DriveAction, RaceObservation, RaceState
# Egocentric view parameters
_VIEW_PX = 120 # world-pixel square captured around the car before scaling
_OUT_PX = 64 # output image size (64Γ—64)
_GRASS = (45, 110, 45)
class RaceEnvironment(Environment[DriveAction, RaceObservation, RaceState]):
"""
Wraps game.rl_splits.CarEnv as an OpenEnv Environment.
Parameters
----------
track : TrackDef (must already be built, or call track.build() first)
max_steps : episode step limit
laps_target : episode ends after this many laps
use_image : if False, image field of RaceObservation will be None
(useful for fast debugging / unit tests)
"""
def __init__(
self,
track: TrackDef,
max_steps: int = 3000,
laps_target: int = 3,
use_image: bool = True,
):
super().__init__() # OpenEnv base init (transform=None, rubric=None)
self._env = CarEnv(track, max_steps=max_steps, laps_target=laps_target)
self._use_image = use_image
self._episode_id: Optional[str] = None
# Offscreen surfaces reused every step β€” allocated once
if use_image:
self._surf = pygame.Surface((SCREEN_W, SCREEN_H))
self._canvas = pygame.Surface((_VIEW_PX, _VIEW_PX))
self._cropped = pygame.Surface((_VIEW_PX, _VIEW_PX))
self._scaled = pygame.Surface((_OUT_PX, _OUT_PX))
# ── OpenEnv interface ────────────────────────────────────────────────────
def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
**kwargs: Any,
) -> RaceObservation:
self._reset_rubric() # OpenEnv hook β€” reset rubric state if attached
self._episode_id = episode_id
obs = self._env.reset()
result = self._to_obs(obs, done=False, reward=0.0)
return self._apply_transform(result) # OpenEnv hook
def step(
self,
action: DriveAction,
timeout_s: Optional[float] = None,
**kwargs: Any,
) -> RaceObservation:
obs, reward, done, info = self._env.step([action.accel, action.steer])
result = self._to_obs(obs, done=done, reward=reward, metadata=info)
return self._apply_transform(result) # OpenEnv hook
@property
def state(self) -> RaceState:
t = self._env.track
return RaceState(
episode_id=self._episode_id,
step_count=self._env._step,
track_level=t.level,
track_name=t.name,
laps=self._env.laps,
)
def close(self) -> None:
"""Clean up pygame offscreen surfaces."""
if self._use_image:
self._surf = self._canvas = self._cropped = self._scaled = None
def get_metadata(self) -> EnvironmentMetadata:
return EnvironmentMetadata(
name="CurriculumCarRacer",
description=(
"Pygame car racing with curriculum learning, egocentric vision, "
"and 20 procedural tracks. Agent observes a 64Γ—64 headlight image "
"plus 9 scalar sensors (raycasts, speed, waypoint bearing)."
),
version="0.1.0",
)
# ── Image rendering ──────────────────────────────────────────────────────
def _render_headlight_image(self) -> np.ndarray:
"""
Render a 64Γ—64 RGB egocentric view centred on the car.
Pipeline:
1. Blit track surface onto offscreen canvas
2. Draw headlight cone
3. Crop _VIEW_PX Γ— _VIEW_PX square around the car (grass-padded at borders)
4. Rotate so the car always faces UP (forward = top of image)
5. Re-crop centre after rotation padding
6. Scale to _OUT_PX Γ— _OUT_PX
7. Return as (H, W, 3) uint8 numpy array
"""
x = self._env._x
y = self._env._y
angle = self._env._angle # degrees; 0=right, 90=down (pygame y-down)
# 1 & 2 β€” render to offscreen surface
self._surf.blit(self._env.track.surface, (0, 0))
draw_headlights(self._surf, x, y, angle)
# 3 β€” crop around car, padding with grass if near screen edge
half = _VIEW_PX // 2
canvas = self._canvas
canvas.fill(_GRASS)
src = pygame.Rect(int(x) - half, int(y) - half, _VIEW_PX, _VIEW_PX)
clipped = src.clip(pygame.Rect(0, 0, SCREEN_W, SCREEN_H))
if clipped.width > 0 and clipped.height > 0:
canvas.blit(self._surf, (clipped.x - src.x, clipped.y - src.y), clipped)
# 4 β€” rotate so forward (angle) maps to UP (270Β° in pygame convention)
rotated = pygame.transform.rotate(canvas, -(angle - 270))
# 5 β€” re-crop centre (rotation adds padding)
rw, rh = rotated.get_size()
cx2, cy2 = rw // 2, rh // 2
inner = pygame.Rect(cx2 - half, cy2 - half, _VIEW_PX, _VIEW_PX)
inner = inner.clip(rotated.get_rect())
cropped = self._cropped
cropped.fill(_GRASS)
cropped.blit(rotated, (inner.x - (cx2 - half), inner.y - (cy2 - half)), inner)
# 6 β€” scale to output size (in-place into pre-allocated surface)
pygame.transform.scale(cropped, (_OUT_PX, _OUT_PX), self._scaled)
scaled = self._scaled
# 7 β€” pygame surfarray is (W, H, 3); transpose to (H, W, 3)
return pygame.surfarray.array3d(scaled).transpose(1, 0, 2)
# ── Internal ─────────────────────────────────────────────────────────────
def _to_obs(
self,
obs: list,
done: bool,
reward: float,
metadata: dict = None,
) -> RaceObservation:
# obs layout: [angular_velocity, speed, ray_left, ray_fl, ray_front, ray_fr, ray_right]
image = self._render_headlight_image() if self._use_image else None
return RaceObservation(
image=image,
angular_velocity=obs[0],
speed=obs[1],
ray_left=obs[2],
ray_front_left=obs[3],
ray_front=obs[4],
ray_front_right=obs[5],
ray_right=obs[6],
wp_sin=obs[7],
wp_cos=obs[8],
done=done,
reward=reward,
metadata=metadata or {},
)