Spaces:
Sleeping
Sleeping
"""
rl_splits.py — Curriculum tracks for RL training.

10 tracks across 3 difficulty groups (all used for training):

  Group A — Easy ovals         : tracks 1-4
  Group B — Rectangular shapes : tracks 5-8
  Group C — Hairpins & chicanes: tracks 9-10

  TRAIN (10) : [1,2,3,4, 5,6,7,8, 9,10] — curriculum progression easy→hard
  VAL   (0)  : []
  TEST  (0)  : []

Training stops when the agent passes greedy eval on all 10 tracks simultaneously.

Usage
-----
    from game.rl_splits import TRAIN, make_env, CurriculumSampler

    sampler = CurriculumSampler(TRAIN)
    while True:
        env    = make_env(sampler.sample())
        reward = run_episode(env, agent)
        sampler.record(reward)
        if sampler.should_advance():
            sampler.advance()
"""
| import os | |
| import math | |
| import random | |
| import statistics | |
| from collections import deque | |
| import numpy as np | |
# ── Lazy pygame initialisation (avoids import-time display requirement) ──────
# Module-level latch: flipped to True the first time _ensure_pygame() runs.
_pygame_ready = False


def _ensure_pygame():
    """Initialise pygame on first use; later calls return immediately.

    pygame is imported inside the function so that importing this module
    does not itself require pygame to be loaded.
    """
    global _pygame_ready
    if _pygame_ready:
        return

    import pygame

    already_initialised = pygame.get_init()
    if not already_initialised:
        pygame.init()
    _pygame_ready = True
# ── Track splits ─────────────────────────────────────────────────────────────
def _get_splits():
    """Return the ``(train, val, test)`` lists of track objects.

    Tracks are looked up by their 1-indexed ``level`` attribute.  All ten
    levels go to the training split (ordered easy→hard); the validation and
    test splits are empty.
    """
    from .tracks import TRACKS  # TRACKS is 0-indexed, levels are 1-indexed

    track_for_level = {}
    for track in TRACKS:
        track_for_level[track.level] = track

    def pick(levels):
        # Resolve each requested level to its track (KeyError if missing).
        return [track_for_level[level] for level in levels]

    train_levels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # all 10, easy→hard
    val_levels = []
    test_levels = []

    return pick(train_levels), pick(val_levels), pick(test_levels)
# Resolve the splits once, at import time.
TRAIN, VAL, TEST = _get_splits()

# Convenience: every track from all three splits, sorted by level
# (for inspection / logging).
ALL_ORDERED = sorted([*TRAIN, *VAL, *TEST], key=lambda track: track.level)
# ── Difficulty metadata ──────────────────────────────────────────────────────
# Tier label -> member track levels and a short human-readable description.
DIFFICULTY = {
    "A-easy": {
        "tracks": [1, 2, 3, 4],
        "description": "Full ovals",
    },
    "B-medium-easy": {
        "tracks": [5, 6, 7, 8],
        "description": "Rectangular shapes",
    },
    "C-medium-hard": {
        "tracks": [9, 10],
        "description": "Hairpins & chicanes",
    },
}


def difficulty_of(track):
    """Look up the tier label whose level list contains ``track.level``.

    Returns the first matching key of ``DIFFICULTY`` or ``"unknown"`` when no
    tier lists the track's level.
    """
    matching_tiers = (
        label
        for label, meta in DIFFICULTY.items()
        if track.level in meta["tracks"]
    )
    return next(matching_tiers, "unknown")
# ── Environment factory ──────────────────────────────────────────────────────
class CarEnv:
    """
    Minimal gym-style wrapper around TrackDef + Car physics.

    Observation (9 floats, see ``obs_size``):
        [angular_velocity, speed/max_speed, ray×5, wp_sin, wp_cos]
        The first seven values come from on-board style sensors (heading
        change, speed, 5 proximity rays).  The last two are a "GPS" bearing:
        sin/cos of the angle between the car's heading and the centreline
        waypoint ``_GPS_LOOKAHEAD`` indices ahead of the nearest one.

    Action (2 floats, each clamped to [-1, 1]):
        [accel, steer]
        accel > 0 → accelerate,  < 0 → brake
        steer > 0 → right,       < 0 → left

    Reward (as implemented in ``step``):
        Per step
            - 0.005                    base step penalty
            + 0.10 * speed/max_speed   forward-speed term (negative in reverse)
            + 0.25 per waypoint advanced / - 0.25 per waypoint lost
            + 10                       lap completed
        Terminal (replaces the per-step reward)
            - 15   off track (episode ends immediately)
            - 15   car leaves the screen bounds

    Done conditions:
        * car leaves the track or the screen
        * max_steps exceeded
        * laps_target laps completed
    """

    # Physics (same as curriculum_game.py)
    ACCEL = 0.13
    BRAKE_DECEL = 0.22
    FRICTION = 0.038
    STEER_DEG = 2.7

    # Only feeds self._progress_per_wp, which step() does not read.
    PROGRESS_SCALE = 15.0

    # Screen bounds in px; a car outside [0, W) x [0, H) ends the episode.
    SCREEN_W = 900
    SCREEN_H = 600

    # Ray angles relative to heading (degrees). Covers lateral + diagonal + forward.
    _RAY_ANGLES = [-90, -45, 0, 45, 90]
    _RAY_MAX = 120   # max ray length in px (normalise distances to 0..1)
    _RAY_STEP = 2    # step size in px

    # How many waypoint indices ahead of the nearest one the GPS bearing targets.
    _GPS_LOOKAHEAD = 10

    def __init__(self, track, max_steps=3000, laps_target=3):
        """Bind the environment to ``track`` and pre-compute waypoint arrays.

        Parameters
        ----------
        track       : track object; ``build()`` is called on it here
        max_steps   : step budget after which the episode is done
        laps_target : number of laps after which the episode is done
        """
        _ensure_pygame()
        self.track = track
        self.max_steps = max_steps
        self.laps_target = laps_target
        track.build()

        # Waypoint coordinates as numpy arrays for fast nearest-waypoint lookup.
        # They drive the progress reward and the GPS bearing in _obs().
        wps = track.waypoints
        self._n_wps = len(wps)
        self._wp_x = np.array([w[0] for w in wps], dtype=np.float32)
        self._wp_y = np.array([w[1] for w in wps], dtype=np.float32)
        self._progress_per_wp = self.PROGRESS_SCALE / self._n_wps

        self._x = self._y = self._angle = self._speed = 0.0
        self._prev_side = 0.0
        self._gate_armed = False   # True once gate_side has exceeded 50
        self._laps = 0
        self._step = 0
        self._angle_delta = 0.0
        self._wp_idx = 0           # nearest centreline waypoint index
        self._lap_dist = 0.0
        self._lap_prev_x = 0.0
        self._lap_prev_y = 0.0
        self._crash_count = 0

    # ── Public API ──────────────────────────────────────────────────────────

    def obs_size(self):
        """Length of the list returned by ``reset``/``step`` observations."""
        # angular_velocity, speed, one value per ray, wp_sin, wp_cos
        return 2 + len(self._RAY_ANGLES) + 2

    def action_size(self):
        """Number of action components: [accel, steer]."""
        return 2

    @property
    def laps(self):
        """Laps completed in the current episode."""
        return self._laps

    def reset(self):
        """Put the car back on the start position and return the first observation."""
        self._x = float(self.track.start_pos[0])
        self._y = float(self.track.start_pos[1])
        self._angle = float(self.track.start_angle)
        self._speed = self.track.max_speed * 0.2
        self._angle_delta = 0.0
        self._prev_side = self.track.gate_side(self._x, self._y)
        self._gate_armed = False
        self._laps = 0
        self._step = 0
        self._wp_idx = self._nearest_wp(self._x, self._y)
        self._lap_dist = 0.0
        self._lap_prev_x = self._x
        self._lap_prev_y = self._y
        self._crash_count = 0
        return self._obs()

    def step(self, action):
        """Advance the simulation by one step.

        Parameters
        ----------
        action : sequence ``[accel, steer]``; each component is clamped to [-1, 1]

        Returns
        -------
        (obs, reward, done, info) where ``info`` holds the keys
        "lap", "on_track", "step", "crashes", "lap_dist", "out_of_bounds".
        """
        accel = float(max(-1.0, min(1.0, action[0])))
        steer = float(max(-1.0, min(1.0, action[1])))

        prev_angle = self._angle
        self._update_physics(accel, steer)
        self._angle_delta = self._angle - prev_angle
        self._step += 1

        on = self.track.on_track(self._x, self._y)
        curr_side = self.track.gate_side(self._x, self._y)

        # Lap distance accumulation
        self._lap_dist += math.hypot(self._x - self._lap_prev_x,
                                     self._y - self._lap_prev_y)
        self._lap_prev_x = self._x
        self._lap_prev_y = self._y

        # Off-track: terminal penalty, nothing else is evaluated.
        if not on:
            self._crash_count += 1
            return (self._obs(), -15.0, True,
                    self._info(on_track=False, out_of_bounds=False))

        # ── Reward ──────────────────────────────────────────────────────────
        # Principle: reward what we actually want — going forward along the track.
        #   step penalty                 -0.005
        #   forward / reverse speed      speed_norm * 0.10
        #   waypoint advance / regress   ±0.25 per waypoint
        #   lap completed                +10
        reward = -0.005

        # Forward speed reward: positive when moving forward, negative when
        # reversing, ≈ 0 when the car is barely moving.
        reward += self._speed_norm() * 0.10

        # Waypoint progress: signed count of waypoints crossed since last step,
        # taking the shorter way around the closed centreline.
        new_wp = self._nearest_wp(self._x, self._y)
        reward += 0.25 * self._wp_delta(new_wp, self._wp_idx, self._n_wps)
        self._wp_idx = new_wp

        # Lap completion — two-phase arm/trigger.
        # Phase 1 (arm): gate_side must have exceeded 50.
        # Phase 2 (trigger): gate_side goes from < 0 to >= 0 while moving
        # forward faster than 0.3, and at least 80% of the track's
        # optimal_dist has been driven since the last lap (anti-shortcut).
        if not self._gate_armed and curr_side > 50.0:
            self._gate_armed = True
        lap_done = (self._gate_armed
                    and self._prev_side < 0.0 and curr_side >= 0.0
                    and self._speed > 0.3
                    and self._lap_dist >= self.track.optimal_dist * 0.8)
        if lap_done:
            self._laps += 1
            self._gate_armed = False   # must be armed again before the next lap
            reward += 10.0             # lap bonus
            self._lap_dist = 0.0
            self._lap_prev_x = self._x
            self._lap_prev_y = self._y
        self._prev_side = curr_side

        out_of_bounds = not (0 <= self._x < self.SCREEN_W
                             and 0 <= self._y < self.SCREEN_H)
        if out_of_bounds:
            reward = -15.0

        done = (out_of_bounds
                or self._laps >= self.laps_target
                or self._step >= self.max_steps)
        return (self._obs(), reward, done,
                self._info(on_track=True, out_of_bounds=out_of_bounds))

    # ── Internal ────────────────────────────────────────────────────────────

    def _info(self, on_track, out_of_bounds):
        """Build the per-step info dict returned by ``step``."""
        return {
            "lap": self._laps,
            "on_track": on_track,
            "step": self._step,
            "crashes": self._crash_count,
            "lap_dist": self._lap_dist,
            "out_of_bounds": out_of_bounds,
        }

    def _speed_norm(self):
        """Speed divided by the track's max_speed (0.0 when max_speed is not positive)."""
        ms = self.track.max_speed
        return self._speed / ms if ms > 0 else 0.0

    @staticmethod
    def _wp_delta(new_idx, old_idx, n):
        """Signed waypoint-index change on a ring of ``n`` waypoints.

        The raw difference is wrapped to the shorter direction.  The negative
        bound is written ``-(n // 2)``: ``-n // 2`` floors toward minus
        infinity and would be one too low for odd ``n``.
        """
        diff = new_idx - old_idx
        half = n // 2
        if diff > half:
            diff -= n
        elif diff < -half:
            diff += n
        return diff

    def _nearest_wp(self, x, y):
        """Return index of the nearest centreline waypoint to (x, y)."""
        dx = self._wp_x - x
        dy = self._wp_y - y
        return int(np.argmin(dx * dx + dy * dy))

    def _update_physics(self, accel, steer):
        """Apply steering, throttle/brake, friction and off-track drag, then move."""
        ms = self.track.max_speed
        ratio = min(abs(self._speed) / ms, 1.0) if ms > 0 else 1.0
        # Steering authority scales with speed but never drops below 30%.
        self._angle += steer * self.STEER_DEG * max(0.3, ratio)

        if accel > 0:
            self._speed = min(self._speed + self.ACCEL * accel, ms)
        elif accel < 0:
            # Braking can push the car into reverse, capped at 40% of max speed.
            self._speed = max(self._speed + self.BRAKE_DECEL * accel,
                              -ms * 0.4)

        # Friction pulls the speed toward zero without crossing it.
        if self._speed > 0:
            self._speed = max(0.0, self._speed - self.FRICTION)
        elif self._speed < 0:
            self._speed = min(0.0, self._speed + self.FRICTION)

        # Extra drag when the car's current position is off the track.
        if not self.track.on_track(self._x, self._y):
            self._speed *= 0.80

        rad = math.radians(self._angle)
        self._x += self._speed * math.cos(rad)
        self._y += self._speed * math.sin(rad)

    def _raycast(self):
        """
        Cast one ray per entry of ``_RAY_ANGLES`` from the car position.

        Each ray advances in ``_RAY_STEP`` px increments until it leaves the
        track or reaches ``_RAY_MAX`` px.  Returns a list of floats in (0, 1]:
        the distance travelled divided by ``_RAY_MAX`` (1.0 = no boundary
        within range, small values = boundary right next to the car).
        """
        results = []
        for rel_deg in self._RAY_ANGLES:
            abs_rad = math.radians(self._angle + rel_deg)
            dx = math.cos(abs_rad) * self._RAY_STEP
            dy = math.sin(abs_rad) * self._RAY_STEP
            px, py = self._x, self._y
            dist = 0.0
            while dist < self._RAY_MAX:
                px += dx
                py += dy
                dist += self._RAY_STEP
                if not self.track.on_track(px, py):
                    break
            results.append(dist / self._RAY_MAX)
        return results

    def _obs(self):
        """Assemble the observation list (length ``obs_size()``)."""
        rays = self._raycast()
        # |angle_delta| never exceeds STEER_DEG, so this lies in [-1, 1].
        ang_vel = self._angle_delta / self.STEER_DEG

        # GPS: direction to a waypoint ahead, relative to the car's heading.
        #   sin < 0 → waypoint is to the left  (steer left)
        #   sin > 0 → waypoint is to the right (steer right)
        #   cos ≈ 1 → waypoint is straight ahead
        next_idx = (self._wp_idx + self._GPS_LOOKAHEAD) % self._n_wps
        dx = self._wp_x[next_idx] - self._x
        dy = self._wp_y[next_idx] - self._y
        rel_angle_rad = math.atan2(dy, dx) - math.radians(self._angle)

        return [
            ang_vel,
            self._speed_norm(),
            *rays,
            math.sin(rel_angle_rad),   # GPS direction sin component
            math.cos(rel_angle_rad),   # GPS direction cos component
        ]
def make_env(track, **kwargs):
    """Build a new CarEnv for ``track``, forwarding any keyword options to it."""
    env = CarEnv(track, **kwargs)
    return env
# ── Curriculum sampler ───────────────────────────────────────────────────────
class CurriculumSampler:
    """
    Manages which train track to sample next.

    Strategy: performance-gated with anti-forgetting replay.
      * with probability ``1 - replay_frac`` → current frontier track
      * with probability ``replay_frac``     → an already-mastered track,
        chosen round-robin (only once at least one track is mastered)

    The sampler advances to the next track when each of the last ``window``
    frontier episodes completed at least one lap with zero crashes
    (see ``should_advance``).

    Parameters
    ----------
    tracks      : ordered list of TrackDef (easy → hard)
    threshold   : reward threshold; only used for the figure shown by ``status``
    window      : number of frontier episodes examined by ``should_advance``
                  (also the size of every rolling buffer)
    replay_frac : fraction of episodes sampled from mastered tracks
    """

    def __init__(self, tracks, threshold=30.0, window=50, replay_frac=0.3):
        self.tracks = tracks
        self.threshold = threshold
        self.window = window
        self.replay_frac = replay_frac

        self._idx = 0              # current frontier index
        self._replay_counter = 0   # round-robin index into mastered tracks

        # Rolling buffers over all recorded episodes (frontier and replay).
        self._rewards = deque(maxlen=window)
        self._crashes = deque(maxlen=window)
        self._laps = deque(maxlen=window)
        self._is_frontier = deque(maxlen=window)

        # Dedicated frontier-only buffers so replay episodes never take up slots.
        self._frontier_crashes = deque(maxlen=window)
        self._frontier_laps = deque(maxlen=window)

    def current_level(self):
        """Return the 0-based index of the frontier track in ``self.tracks``."""
        return self._idx

    def current_track(self):
        """Return the frontier track (same object as ``frontier_track``)."""
        return self.tracks[self._idx]

    # ``mastered`` and ``frontier_track`` are read as attributes by sample()
    # and status(), so they have to be properties rather than plain methods.
    @property
    def mastered(self):
        """Tracks that come before the frontier in curriculum order."""
        return self.tracks[:self._idx]

    @property
    def frontier_track(self):
        """The track currently being learned."""
        return self.tracks[self._idx]

    def sample(self):
        """Return the TrackDef to use for the next episode.

        Replay uses round-robin so every mastered track gets equal coverage,
        preventing early tracks from being starved as the curriculum grows.
        """
        if self._idx > 0 and random.random() < self.replay_frac:
            track = self.mastered[self._replay_counter % self._idx]
            self._replay_counter += 1
            return track
        return self.frontier_track

    def record(self, episode_reward, episode_crashes=0, episode_laps=0, is_frontier=True):
        """Store one finished episode.

        ``is_frontier`` says whether the episode ran on the frontier track;
        only those episodes feed the buffers used by ``should_advance``.
        """
        self._rewards.append(episode_reward)
        self._crashes.append(episode_crashes)
        self._laps.append(episode_laps)
        self._is_frontier.append(is_frontier)
        if is_frontier:
            self._frontier_crashes.append(episode_crashes)
            self._frontier_laps.append(episode_laps)

    def should_advance(self):
        """
        True when every episode in the frontier window (last `window` frontier
        episodes) completed a lap with zero crashes.  Always False on the last
        track or while fewer than `window` frontier episodes are recorded.
        """
        if self._idx >= len(self.tracks) - 1:
            return False
        if len(self._frontier_crashes) < self.window:
            return False
        return all(laps >= 1 and crashes == 0
                   for laps, crashes in zip(self._frontier_laps,
                                            self._frontier_crashes))

    def advance(self):
        """Move to the next track and clear all rolling buffers.

        Returns True if the frontier moved, False if already on the last track.
        """
        if self._idx >= len(self.tracks) - 1:
            return False
        self._idx += 1
        for buf in (self._rewards, self._crashes, self._laps,
                    self._is_frontier, self._frontier_crashes,
                    self._frontier_laps):
            buf.clear()
        return True

    @staticmethod
    def _mean_or_nan(values):
        """Mean of ``values``, or NaN when the buffer is empty."""
        return statistics.mean(values) if values else float("nan")

    def rolling_crashes(self):
        """Mean crashes per episode over the current window."""
        return self._mean_or_nan(self._crashes)

    def rolling_laps(self):
        """Mean laps per episode over the current window."""
        return self._mean_or_nan(self._laps)

    def status(self):
        """Return a one-line, human-readable summary of the frontier state."""
        mean = self._mean_or_nan(self._rewards)
        crashes = self._mean_or_nan(self._crashes)
        t = self.frontier_track
        effective = self.threshold * t.complexity
        crash_free = all(c == 0 for c in self._crashes) if self._crashes else False
        return (f"Frontier: track {t.level} '{t.name}' "
                f"[{self._idx+1}/{len(self.tracks)}] "
                f"rolling_mean={mean:.2f} threshold={effective:.2f} "
                f"crashes/ep={crashes:.2f} crash_free={crash_free}")
# ── Evaluator ────────────────────────────────────────────────────────────────
class Evaluator:
    """
    Runs a fixed number of greedy episodes on a list of tracks
    and returns per-track and aggregate metrics.

    agent_fn : callable(obs) → action (e.g. your policy's greedy forward pass)
    """

    def __init__(self, n_episodes=20, max_steps=3000, laps_target=3):
        """Store the evaluation settings.

        Raises
        ------
        ValueError : if ``n_episodes`` is smaller than 1 (the per-track
                     statistics would be undefined).
        """
        if n_episodes < 1:
            raise ValueError("n_episodes must be >= 1")
        self.n_episodes = n_episodes
        self.max_steps = max_steps
        self.laps_target = laps_target

    def run(self, agent_fn, tracks):
        """
        Evaluate ``agent_fn`` for ``n_episodes`` episodes on every track.

        Returns dict:
            {
              "per_track": [ { "level", "name", "tier", "mean_reward",
                               "std_reward", "mean_laps",
                               "completion_rate" }, ... ],
              "mean_reward": float,
              "mean_laps": float,
              "completion_rate": float,   # fraction of episodes with ≥1 lap
            }

        With an empty ``tracks`` list "per_track" is empty and the three
        aggregate values are NaN.
        """
        per_track = []
        all_rewards, all_laps = [], []

        for track in tracks:
            ep_rewards, ep_laps = [], []
            for _ in range(self.n_episodes):
                total_r, laps = self._run_episode(agent_fn, track)
                ep_rewards.append(total_r)
                ep_laps.append(laps)

            per_track.append({
                "level": track.level,
                "name": track.name,
                "tier": difficulty_of(track),
                "mean_reward": statistics.mean(ep_rewards),
                # stdev needs at least two samples
                "std_reward": statistics.stdev(ep_rewards) if len(ep_rewards) > 1 else 0.0,
                "mean_laps": statistics.mean(ep_laps),
                "completion_rate": self._completion_rate(ep_laps),
            })
            all_rewards.extend(ep_rewards)
            all_laps.extend(ep_laps)

        if not all_rewards:
            # Nothing was evaluated (e.g. an empty split): report NaN
            # aggregates instead of letting statistics.mean raise.
            nan = float("nan")
            return {
                "per_track": per_track,
                "mean_reward": nan,
                "mean_laps": nan,
                "completion_rate": nan,
            }

        return {
            "per_track": per_track,
            "mean_reward": statistics.mean(all_rewards),
            "mean_laps": statistics.mean(all_laps),
            "completion_rate": self._completion_rate(all_laps),
        }

    def _run_episode(self, agent_fn, track):
        """Play one episode on a fresh env; return (total_reward, laps_completed)."""
        env = make_env(track, max_steps=self.max_steps,
                       laps_target=self.laps_target)
        obs = env.reset()
        done = False
        total_r = 0.0
        while not done:
            obs, r, done, _ = env.step(agent_fn(obs))
            total_r += r

        # Accept `laps` exposed either as an attribute/property or as a
        # zero-argument method, so a number (never a bound method) is returned.
        laps = env.laps
        if callable(laps):
            laps = laps()
        return total_r, laps

    @staticmethod
    def _completion_rate(lap_counts):
        """Fraction of episodes in ``lap_counts`` that finished at least one lap."""
        return sum(1 for n in lap_counts if n >= 1) / len(lap_counts)
def print_report(metrics, title="Evaluation"):
    """Print an evaluation-metrics dict as a fixed-width text table.

    ``metrics`` must provide "per_track" (an iterable of dicts with "level",
    "name", "tier", "mean_reward", "mean_laps", "completion_rate") plus the
    aggregate keys "mean_reward", "mean_laps" and "completion_rate".
    """
    banner = f"{'='*60}"
    divider = f" {'-'*66}"
    column_titles = (f" {'Lvl':<4} {'Name':<24} {'Tier':<16} "
                     f"{'Reward':>8} {'Laps':>6} {'Done%':>6}")

    # Title block and column headings (the leading "" yields the blank line).
    print("", banner, f" {title}", banner, column_titles, divider, sep="\n")

    # One line per evaluated track.
    for row in metrics["per_track"]:
        done_pct = row['completion_rate']*100
        print(f" {row['level']:<4} {row['name']:<24} {row['tier']:<16} "
              f"{row['mean_reward']:>8.1f} {row['mean_laps']:>6.2f} "
              f"{done_pct:>5.0f}%")

    # Aggregate footer.
    print(divider)
    overall_pct = metrics['completion_rate']*100
    print(f" {'AGGREGATE':<44} "
          f"{metrics['mean_reward']:>8.1f} {metrics['mean_laps']:>6.2f} "
          f"{overall_pct:>5.0f}%")
    print(f"{banner}\n")
# ── Split summary (run as script) ────────────────────────────────────────────
if __name__ == "__main__":
    # Print a summary table of every split when the module is run directly.
    # The track count in the title is derived from the splits so it cannot
    # drift from the actual configuration.
    total_tracks = len(TRAIN) + len(VAL) + len(TEST)
    print(f"\n{total_tracks}-Track Curriculum Splits")
    print("=" * 60)
    for split_name, split_tracks in [("TRAIN", TRAIN), ("VAL", VAL), ("TEST", TEST)]:
        print(f"\n{split_name} ({len(split_tracks)} tracks)")
        print(f" {'Lvl':<4} {'Name':<24} {'Tier':<16} {'Width':>6} {'MaxSpd':>7}")
        print(f" {'-'*58}")
        for t in split_tracks:
            print(f" {t.level:<4} {t.name:<24} {difficulty_of(t):<16} "
                  f"{t.width:>6} {t.max_speed:>7.1f}")
    # Rationale text matches the splits built by _get_splits(): every track
    # is used for training, and the validation/test splits are empty.
    print("\nSplit rationale:")
    print(" TRAIN - all tracks, ordered easy->hard for curriculum")
    print(" VAL - empty (no held-out validation tracks)")
    print(" TEST - empty (no held-out test tracks)")