| """ |
| Warehouse GridWorld Human-Play + Live RL Solver Demo |
| ---------------------------------------------------- |
| A Gymnasium + Gradio game version of the RL warehouse navigation environment. |
| |
| New in this version: |
| - Human play with arrow keys |
| - Reset/randomize start, goal, and 20% obstacles |
| - Live scoreboard with Manhattan distance |
| - RL Solver button that trains a Q-learning agent on the current maze |
| - Animated learning episodes plus an animated final greedy policy rollout |
| |
| Run: |
| pip install -r requirements_warehouse_game_rl_solver.txt |
| python warehouse_gradio_game_rl_solver.py |
| |
| Controls: |
| Arrow keys or on-screen buttons |
| """ |
|
|
| from __future__ import annotations |
|
|
| from collections import deque |
| from dataclasses import dataclass |
| from typing import List, Optional, Set, Tuple |
| import time |
|
|
| import gradio as gr |
| import gymnasium as gym |
| import numpy as np |
| from gymnasium import spaces |
|
|
| Position = Tuple[int, int] |
|
|
|
|
class GridWorldEnv(gym.Env):
    """
    Custom Gymnasium GridWorld environment for a warehouse navigation game.

    Symbols:
        S = Start
        G = Goal
        X = Obstacle
        . = Empty floor
        A = Agent, drawn by the UI as a red circle

    Actions are ``0=UP, 1=RIGHT, 2=DOWN, 3=LEFT``.  The observation is the
    agent ``(row, col)`` followed by the goal ``(row, col)``, each divided by
    ``grid_size - 1`` so every component lies in ``[0, 1]``.

    All positions stored on the instance (``start``, ``goal``, ``state``) are
    tuples of plain Python ``int`` so they compare, hash and print
    predictably regardless of the installed NumPy version.
    """

    metadata = {"render_modes": ["human"]}

    # Hard step limit per episode; reaching it ends the episode with a penalty.
    MAX_STEPS = 100

    def __init__(
        self,
        grid: Optional[np.ndarray] = None,
        grid_size: int = 10,
        auto_generate: bool = True,
        obstacle_density: float = 0.20,
    ):
        """
        Args:
            grid: Optional fixed layout (2-D array of the symbols above).  It is
                copied, and its first dimension overrides ``grid_size``.
            grid_size: Side length used when a grid has to be generated.
            auto_generate: When true, every ``reset()`` draws a new random maze.
            obstacle_density: Fraction of non start/goal cells turned into obstacles.
        """
        super().__init__()
        self.grid_size = int(grid_size)
        self.auto_generate = bool(auto_generate)
        self.obstacle_density = float(obstacle_density)

        if grid is not None:
            self.grid = grid.copy()
            # A supplied grid dictates the bounds used for wall checks.
            self.grid_size = int(self.grid.shape[0])
        elif self.auto_generate:
            self.grid = self._generate_random_grid()
        else:
            self.grid = np.full((self.grid_size, self.grid_size), ".", dtype=str)

        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(4,),
            dtype=np.float32,
        )

        self.start: Position = self._locate("S", (0, 0))
        self.goal: Position = self._locate("G", (self.grid_size - 1, self.grid_size - 1))
        self.state: Position = self.start
        self.visited: Set[Position] = {self.start}
        self.steps = 0
        self.initial_distance = self._manhattan_distance(self.start, self.goal)
        self.prev_distance = self.initial_distance
        self.last_message = "New episode started."

    def _locate(self, symbol: str, default: Position) -> Position:
        """Return the first cell holding ``symbol`` as Python ints, or ``default`` if absent."""
        hits = np.argwhere(self.grid == symbol)
        if len(hits) == 0:
            return default
        return int(hits[0][0]), int(hits[0][1])

    def _generate_random_grid(self) -> np.ndarray:
        """
        Generate a random, solvable grid with 20% obstacles by default.

        Randomness comes from ``self.np_random`` so that ``reset(seed=...)``
        reproduces the same maze.  Up to 500 layouts are tried; if none is
        solvable an obstacle-free grid with S and G in opposite corners is
        returned instead.
        """
        size = self.grid_size
        total_cells = size * size
        num_obstacles = int((total_cells - 2) * self.obstacle_density)
        min_separation = max(2, size // 2)
        max_attempts = 500 if total_cells >= 2 else 0

        for _ in range(max_attempts):
            order = self.np_random.permutation(total_cells)
            cells = [divmod(int(index), size) for index in order]
            start_pos, goal_pos = cells[0], cells[1]

            # Reject layouts where the goal is trivially close to the start.
            if self._manhattan_distance(start_pos, goal_pos) < min_separation:
                continue

            grid = np.full((size, size), ".", dtype=str)
            grid[start_pos] = "S"
            grid[goal_pos] = "G"
            for r, c in cells[2:2 + num_obstacles]:
                grid[r, c] = "X"

            if self._has_path_bfs(grid, start_pos, goal_pos):
                return grid

        # Fallback: an empty grid is always solvable.
        grid = np.full((size, size), ".", dtype=str)
        grid[0, 0] = "S"
        grid[size - 1, size - 1] = "G"
        return grid

    def _has_path_bfs(self, grid: np.ndarray, start: Position, goal: Position) -> bool:
        """Breadth-first search: is ``goal`` reachable from ``start`` without crossing an X?"""
        queue = deque([start])
        visited = {start}
        directions = [(-1, 0), (0, 1), (1, 0), (0, -1)]

        while queue:
            r, c = queue.popleft()
            if (r, c) == goal:
                return True

            for dr, dc in directions:
                nr, nc = r + dr, c + dc
                if (
                    0 <= nr < self.grid_size
                    and 0 <= nc < self.grid_size
                    and (nr, nc) not in visited
                    and grid[nr, nc] != "X"
                ):
                    visited.add((nr, nc))
                    queue.append((nr, nc))
        return False

    def _manhattan_distance(self, pos1: Position, pos2: Position) -> int:
        """Return |row difference| + |column difference| as a Python int."""
        return int(abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1]))

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """
        Start a new episode and return ``(observation, info)``.

        With ``auto_generate`` enabled a new maze is drawn (deterministically
        when ``seed`` is given); otherwise the existing grid is reused and only
        the agent bookkeeping is cleared.
        """
        super().reset(seed=seed)

        if self.auto_generate:
            self.grid = self._generate_random_grid()
            self.start = self._locate("S", (0, 0))
            self.goal = self._locate("G", (self.grid_size - 1, self.grid_size - 1))

        self.state = self.start
        self.visited = {self.start}
        self.steps = 0
        self.initial_distance = self._manhattan_distance(self.start, self.goal)
        self.prev_distance = self.initial_distance
        self.last_message = "New episode started. Agent begins on S."
        return self._get_observation(), {}

    def _get_observation(self) -> np.ndarray:
        """Return ``[agent_row, agent_col, goal_row, goal_col]`` scaled by ``grid_size - 1``."""
        divisor = max(1, self.grid_size - 1)
        return np.array(
            [
                self.state[0] / divisor,
                self.state[1] / divisor,
                self.goal[0] / divisor,
                self.goal[1] / divisor,
            ],
            dtype=np.float32,
        )

    def step(self, action: int):
        """
        Apply one move and return ``(obs, reward, terminated, truncated, info)``.

        Same scoring idea as the RL lab:
            -5.0  invalid wall/obstacle move
            +1.0  moved closer to goal
            -0.5  moved farther from goal
            -0.1  same Manhattan distance
            +0.3  first time visiting a new cell
            +50.0 reached goal
            -10.0 timeout at 100 steps

        Raises:
            ValueError: if ``action`` is not one of 0, 1, 2, 3.
        """
        action = int(action)
        moves = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        action_names = ["UP", "RIGHT", "DOWN", "LEFT"]
        # Without this check a negative action would silently wrap around the list.
        if not 0 <= action < len(moves):
            raise ValueError(f"action must be in 0..{len(moves) - 1}, got {action}")

        r, c = self.state
        dr, dc = moves[action]
        nr, nc = r + dr, c + dc

        self.steps += 1
        reward = 0.0

        hit_wall = nr < 0 or nr >= self.grid_size or nc < 0 or nc >= self.grid_size
        hit_obstacle = not hit_wall and self.grid[nr, nc] == "X"

        if hit_wall or hit_obstacle:
            reward = -5.0
            self.last_message = f"{action_names[action]} blocked: wall/obstacle. Agent stays put."
        else:
            self.state = (int(nr), int(nc))
            new_distance = self._manhattan_distance(self.state, self.goal)

            if new_distance < self.prev_distance:
                reward = 1.0
                direction_msg = "closer to the goal"
            elif new_distance > self.prev_distance:
                reward = -0.5
                direction_msg = "farther from the goal"
            else:
                reward = -0.1
                direction_msg = "same Manhattan distance"

            self.prev_distance = new_distance

            if self.state not in self.visited:
                self.visited.add(self.state)
                reward += 0.3
                direction_msg += " + new-cell bonus"

            self.last_message = f"{action_names[action]}: valid move, {direction_msg}."

        # Reaching the goal replaces (does not add to) the shaping reward.
        if self.state == self.goal:
            reward = 50.0
            self.last_message = "Goal reached! Mission complete."
            return self._get_observation(), reward, True, False, {
                "success": True,
                "steps": self.steps,
                "message": self.last_message,
            }

        # The timeout is still reported through ``terminated`` (not ``truncated``)
        # to stay backward compatible with callers that read only that flag.
        if self.steps >= self.MAX_STEPS:
            reward = -10.0
            self.last_message = f"Timeout: {self.MAX_STEPS} steps reached."
            return self._get_observation(), reward, True, False, {
                "success": False,
                "steps": self.steps,
                "message": self.last_message,
            }

        return self._get_observation(), reward, False, False, {"message": self.last_message}

    def render(self) -> str:
        """Return a plain-text picture of the grid with the agent drawn as ``A``."""
        grid_vis = self.grid.copy()
        r, c = self.state
        grid_vis[r, c] = "A"
        return "\n".join(" ".join(row) for row in grid_vis)
|
|
|
|
@dataclass
class Score:
    """Scoreboard values for the episode currently shown in the UI."""

    # Sum of all rewards received so far in the episode.
    total: float = 0.0
    # Reward returned by the most recent step.
    last_reward: float = 0.0
    # Display label of the most recent action ("None" before the first move).
    last_action: str = "None"
    # True once the episode has ended (goal reached or timeout).
    done: bool = False
    # True only when the episode ended by reaching the goal.
    success: bool = False
    # Which controller is driving the agent; shown in the "Mode" tile.
    mode: str = "Human Play"
|
|
|
|
# Display label for each discrete action id (the ids also index MOVES).
ACTION_LABELS = dict(enumerate(("↑ UP", "→ RIGHT", "↓ DOWN", "← LEFT")))

# (row delta, column delta) per action id: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT.
MOVES = [
    (-1, 0),
    (0, 1),
    (1, 0),
    (0, -1),
]
|
|
# Stylesheet handed to gr.Blocks(css=...). The class names defined here are the
# ones emitted by the HTML-building functions in this file (grid cells, agent
# dot, score panels, reward table, keyboard hints, code box).
CSS = """
#game-title { text-align: center; }
.warehouse-wrap { display: flex; justify-content: center; width: 100%; }
.warehouse-grid {
display: grid;
gap: 0px;
border: 3px solid #1f3347;
background: #1f3347;
width: min(78vw, 640px);
aspect-ratio: 1 / 1;
}
.cell {
position: relative;
border: 1.5px solid #243b55;
display: flex;
align-items: center;
justify-content: center;
font-weight: 800;
font-family: Arial, sans-serif;
user-select: none;
}
.empty { background: #edf2f4; color: #1f3347; }
.start { background: #3498db; color: white; }
.goal { background: #2ecc71; color: white; }
.obstacle { background: #2c3e50; color: white; }
.agent-dot {
width: 58%;
height: 58%;
border-radius: 50%;
background: #e74c3c;
box-shadow: 0 0 0 3px rgba(255,255,255,0.45);
position: absolute;
}
.panel {
border: 1px solid #d0d7de;
border-radius: 14px;
padding: 14px;
background: #ffffff;
box-shadow: 0 1px 4px rgba(0,0,0,0.06);
font-family: Arial, sans-serif;
margin-bottom: 10px;
}
.score-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 8px;
}
.score-item {
background: #f6f8fa;
border-radius: 10px;
padding: 8px 10px;
}
.score-label { font-size: 0.80rem; opacity: 0.75; }
.score-value { font-size: 1.15rem; font-weight: 800; }
.reward-table {
width: 100%;
border-collapse: collapse;
font-size: 0.92rem;
}
.reward-table td, .reward-table th {
border-bottom: 1px solid #e5e7eb;
padding: 6px;
text-align: left;
}
.kbd {
display: inline-block;
border: 1px solid #b6bec8;
border-bottom-width: 2px;
border-radius: 6px;
padding: 2px 7px;
margin: 0 2px;
background: #f6f8fa;
font-weight: 700;
}
.codebox {
background: #0d1117;
color: #e6edf3;
padding: 10px;
border-radius: 10px;
overflow-x: auto;
font-family: Consolas, Monaco, monospace;
font-size: 0.86rem;
}
.small-note { font-size: 0.9rem; opacity: 0.85; }
"""
|
|
# Script handed to gr.Blocks(js=...). It forwards arrow-key presses to the
# on-screen move buttons, which are found through their elem_id values
# ("move-up", "move-right", "move-down", "move-left").
#
# Compared with a bare keydown handler it:
#   * registers itself only once, even if the script is evaluated again;
#   * ignores presses combined with Ctrl/Alt/Meta so browser shortcuts such as
#     Alt+Left (history back) keep working;
#   * treats contenteditable elements like form fields, so arrow keys still
#     move the caret while the user is typing.
KEYBOARD_JS = """
() => {
    if (window.__warehouseKeysBound) return;
    window.__warehouseKeysBound = true;

    const keyMap = {
        'ArrowUp': 'move-up',
        'ArrowRight': 'move-right',
        'ArrowDown': 'move-down',
        'ArrowLeft': 'move-left'
    };

    function clickMoveButton(id) {
        const wrapper = document.getElementById(id);
        if (!wrapper) return;
        const button = wrapper.querySelector('button') || wrapper;
        button.click();
    }

    document.addEventListener('keydown', function(event) {
        if (event.ctrlKey || event.altKey || event.metaKey) return;

        const active = document.activeElement;
        const typing = active && (
            ['INPUT', 'TEXTAREA', 'SELECT'].includes(active.tagName) ||
            active.isContentEditable
        );
        if (typing) return;

        const target = keyMap[event.key];
        if (target) {
            event.preventDefault();
            clickMoveButton(target);
        }
    });
}
"""
|
|
|
|
def grid_to_html(env: GridWorldEnv) -> str:
    """
    Render the environment grid as an HTML/CSS grid.

    Each cell becomes a ``<div class="cell ...">`` whose class and label depend
    on the grid symbol; the cell matching ``env.state`` additionally receives
    the ``agent-dot`` overlay.
    """
    size = env.grid_size
    # Grid symbol -> (css class, visible label); anything else is empty floor.
    appearance = {
        "X": ("obstacle", "X"),
        "S": ("start", "S"),
        "G": ("goal", "G"),
    }
    agent_marker = '<div class="agent-dot" title="Agent"></div>'

    cells = []
    for r in range(size):
        for c in range(size):
            cls, label = appearance.get(env.grid[r, c], ("empty", ""))
            agent = agent_marker if env.state == (r, c) else ""
            cells.append(f'<div class="cell {cls}" title="row {r}, col {c}"><span>{label}</span>{agent}</div>')

    return f"""
<div class="warehouse-wrap">
<div class="warehouse-grid" style="grid-template-columns: repeat({size}, 1fr);">
{''.join(cells)}
</div>
</div>
"""
|
|
|
|
def scoreboard_html(env: GridWorldEnv, score: Score) -> str:
    """
    Build the "Score Board" panel for the current environment and score.

    Positions are formatted from plain ints rather than by interpolating the
    tuple directly: the environment's positions may hold NumPy integer
    scalars, and NumPy 2 prints those inside a tuple as ``np.int64(3)``,
    which would leak into the UI.

    Args:
        env: Environment providing ``state``, ``start``, ``goal``, ``steps``,
            ``last_message``, ``_manhattan_distance`` and ``_get_observation``.
        score: Scoreboard values (mode, totals, last action, done/success flags).

    Returns:
        The panel as an HTML string.
    """

    def show(pos) -> str:
        # Same text as str((row, col)) for plain ints, independent of NumPy's scalar repr.
        return f"({int(pos[0])}, {int(pos[1])})"

    distance = int(env._manhattan_distance(env.state, env.goal))
    obs = env._get_observation()
    status = "✅ Complete" if score.success else ("⏹️ Episode ended" if score.done else "🎮 Playing")
    return f"""
<div class="panel">
<h3>Score Board</h3>
<div class="score-grid">
<div class="score-item"><div class="score-label">Mode</div><div class="score-value">{score.mode}</div></div>
<div class="score-item"><div class="score-label">Total Score</div><div class="score-value">{score.total:.1f}</div></div>
<div class="score-item"><div class="score-label">Last Reward</div><div class="score-value">{score.last_reward:+.1f}</div></div>
<div class="score-item"><div class="score-label">Steps</div><div class="score-value">{env.steps} / 100</div></div>
<div class="score-item"><div class="score-label">Manhattan Distance</div><div class="score-value">{distance}</div></div>
<div class="score-item"><div class="score-label">Agent Position</div><div class="score-value">{show(env.state)}</div></div>
<div class="score-item"><div class="score-label">Start Position</div><div class="score-value">{show(env.start)}</div></div>
<div class="score-item"><div class="score-label">Goal Position</div><div class="score-value">{show(env.goal)}</div></div>
</div>
<p><b>Status:</b> {status}</p>
<p><b>Last Action:</b> {score.last_action}</p>
<p><b>Rule Fired:</b> {env.last_message}</p>
<p><b>Observation:</b> [{obs[0]:.2f}, {obs[1]:.2f}, {obs[2]:.2f}, {obs[3]:.2f}]</p>
</div>
"""
|
|
|
|
def reward_code_html() -> str:
    """Return the static "Reward Code Used" panel: the reward table plus a controls hint."""
    # The values listed here mirror the rewards documented in GridWorldEnv.step.
    return """
<div class="panel">
<h3>Reward Code Used</h3>
<table class="reward-table">
<tr><th>Move Result</th><th>Reward</th></tr>
<tr><td>Hit wall or obstacle</td><td>-5.0</td></tr>
<tr><td>Move closer to goal</td><td>+1.0</td></tr>
<tr><td>Move farther from goal</td><td>-0.5</td></tr>
<tr><td>Same Manhattan distance</td><td>-0.1</td></tr>
<tr><td>First time in a new cell</td><td>+0.3 bonus</td></tr>
<tr><td>Reach the goal</td><td>+50.0</td></tr>
<tr><td>Timeout after 100 steps</td><td>-10.0</td></tr>
</table>
<p>Controls: <span class="kbd">↑</span><span class="kbd">→</span><span class="kbd">↓</span><span class="kbd">←</span> or the on-screen buttons.</p>
</div>
"""
|
|
|
|
def rl_log_html(
    phase: str = "Ready",
    episode: int = 0,
    total_episodes: int = 0,
    epsilon: float = 0.0,
    successes: int = 0,
    recent_success_rate: float = 0.0,
    best_steps: Optional[int] = None,
    extra: str = "Press Train RL Solver to teach an agent on the current maze.",
) -> str:
    """
    Build the "RL Training Board" panel.

    Args:
        phase: Short label for the current training stage.
        episode: Current episode number.
        total_episodes: Planned number of episodes (shown as ``episode / total``).
        epsilon: Exploration rate, shown with two decimals.
        successes: Count of successful training episodes so far.
        recent_success_rate: Fraction in ``[0, 1]``, shown as a whole percentage.
        best_steps: Fewest steps of any success, or ``None`` when there is none yet.
        extra: Free-text explanation shown under the tiles.

    Returns:
        The panel as an HTML string.
    """
    best = str(best_steps) if best_steps is not None else "None yet"
    return f"""
<div class="panel">
<h3>RL Training Board</h3>
<div class="score-grid">
<div class="score-item"><div class="score-label">Phase</div><div class="score-value">{phase}</div></div>
<div class="score-item"><div class="score-label">Episode</div><div class="score-value">{episode} / {total_episodes}</div></div>
<div class="score-item"><div class="score-label">Exploration ε</div><div class="score-value">{epsilon:.2f}</div></div>
<div class="score-item"><div class="score-label">Training Successes</div><div class="score-value">{successes}</div></div>
<div class="score-item"><div class="score-label">Recent Success Rate</div><div class="score-value">{recent_success_rate:.0%}</div></div>
<div class="score-item"><div class="score-label">Best Steps</div><div class="score-value">{best}</div></div>
</div>
<p><b>What is happening?</b> {extra}</p>
<div class="codebox">Q[s,a] ← Q[s,a] + α × (reward + γ × max(Q[next_state,*]) − Q[s,a])</div>
<p class="small-note">This fast live demo uses tabular Q-learning against the same Gymnasium environment and reward function, so the learner improves by trial, reward, and correction.</p>
</div>
"""
|
|
|
|
def new_game(grid_size: int):
    """
    Start a fresh human-play game on a newly randomized maze.

    Returns the six values the UI expects, in order: environment, score,
    grid HTML, scoreboard HTML, reward-table HTML and RL board HTML.
    """
    env = GridWorldEnv(
        grid_size=int(grid_size),
        auto_generate=True,
        obstacle_density=0.20,
    )
    env.reset()
    score = Score()

    grid_markup = grid_to_html(env)
    board_markup = scoreboard_html(env, score)
    reward_markup = reward_code_html()
    rl_markup = rl_log_html()
    return env, score, grid_markup, board_markup, reward_markup, rl_markup
|
|
|
|
def move_agent(action: int, env: Optional[GridWorldEnv], score: Optional[Score], grid_size: int):
    """
    Apply one human move and return the refreshed six UI outputs.

    A missing environment or score starts a new game instead.  Once the
    episode has ended no step is taken; only a reminder to reset is shown.
    """
    if env is None or score is None:
        return new_game(grid_size)

    def snapshot(note: str):
        # Re-render every panel from the current env/score state.
        return (
            env,
            score,
            grid_to_html(env),
            scoreboard_html(env, score),
            reward_code_html(),
            rl_log_html(extra=note),
        )

    if score.done:
        env.last_message = "Episode already ended. Press Reset / Randomize Grid."
        return snapshot("Episode ended. Reset to play again.")

    action_id = int(action)
    _, reward, terminated, truncated, info = env.step(action_id)
    gained = float(reward)

    score.total += gained
    score.last_reward = gained
    score.last_action = ACTION_LABELS[action_id]
    score.done = bool(terminated or truncated)
    score.success = bool(info.get("success", False))
    score.mode = "Human Play"

    return snapshot("Human mode: use the arrow keys and watch the reward code fire.")
|
|
|
|
def clone_fixed_env(env: GridWorldEnv) -> GridWorldEnv:
    """Return a freshly reset copy of ``env`` on the same maze, with regeneration disabled."""
    layout = env.grid.copy()
    replica = GridWorldEnv(
        grid=layout,
        grid_size=env.grid_size,
        auto_generate=False,  # keep this exact maze across resets
        obstacle_density=env.obstacle_density,
    )
    replica.reset()
    return replica
|
|
|
|
def valid_actions_for_grid(grid: np.ndarray, state: Position) -> List[int]:
    """
    List the action ids that move from ``state`` onto an in-bounds, non-obstacle cell.

    The grid is treated as square: ``grid.shape[0]`` bounds both rows and columns.
    """
    size = grid.shape[0]
    row, col = state[0], state[1]

    def is_open(dr: int, dc: int) -> bool:
        nr, nc = row + dr, col + dc
        # Bounds are checked first so the grid is never indexed out of range.
        return 0 <= nr < size and 0 <= nc < size and grid[nr, nc] != "X"

    return [action for action, (dr, dc) in enumerate(MOVES) if is_open(dr, dc)]
|
|
|
|
def manhattan(pos1: Position, pos2: Position) -> int:
    """Return the Manhattan (L1) distance between two ``(row, col)`` positions."""
    row_gap = abs(pos1[0] - pos2[0])
    col_gap = abs(pos1[1] - pos2[1])
    return row_gap + col_gap
|
|
|
|
def choose_action(
    q_table: np.ndarray,
    grid: np.ndarray,
    state: Position,
    goal: Position,
    epsilon: float,
    rng: np.random.Generator,
    episode: int,
    total_episodes: int,
) -> int:
    """
    Pick an action with an epsilon-greedy rule restricted to valid moves.

    * With probability ``epsilon`` the agent explores.  After the first
      quarter of training, 45% of exploratory picks are steered towards the
      moves that end closest (Manhattan) to the goal; the rest are uniform.
    * Otherwise it exploits: the highest Q-value among valid moves wins, ties
      are narrowed to the moves closest to the goal, and any remaining tie is
      broken at random.

    Returns 0 when the state has no valid move at all.
    """
    valid = valid_actions_for_grid(grid, state)
    if not valid:
        return 0

    row, col = state[0], state[1]

    def distance_after(action: int) -> int:
        dr, dc = MOVES[action]
        return manhattan((row + dr, col + dc), goal)

    def nearest_to_goal(candidates: List[int]) -> List[int]:
        shortest = min(distance_after(a) for a in candidates)
        return [a for a in candidates if distance_after(a) == shortest]

    # Exploration branch.
    if rng.random() < epsilon:
        guided = episode > total_episodes * 0.25 and rng.random() < 0.45
        pool = nearest_to_goal(valid) if guided else valid
        return int(rng.choice(pool))

    # Exploitation branch.
    q_values = q_table[row, col]
    top = max(q_values[a] for a in valid)
    greedy = [a for a in valid if q_values[a] == top]
    if len(greedy) > 1:
        greedy = nearest_to_goal(greedy)
    return int(rng.choice(greedy))
|
|
|
|
def initialize_q_table(grid: np.ndarray) -> np.ndarray:
    """
    Create a ``(size, size, 4)`` float32 Q-table for ``grid``.

    Entries start at 0, except actions that would leave the grid or hit an
    obstacle from a given cell, which start at -10.
    """
    size = grid.shape[0]
    q_table = np.zeros((size, size, 4), dtype=np.float32)

    for r, c in np.ndindex(size, size):
        allowed = valid_actions_for_grid(grid, (r, c))
        for action in range(4):
            if action in allowed:
                continue
            q_table[r, c, action] = -10.0
    return q_table
|
|
|
|
def greedy_rollout(q_table: np.ndarray, grid: np.ndarray, max_steps: int) -> Tuple[bool, int]:
    """
    Follow the Q-table greedily (no exploration) on a fresh copy of the maze.

    At each step the valid action with the highest Q-value is taken; ties are
    narrowed to the moves ending closest to the goal and then to the lowest
    action id, so the rollout is deterministic.

    Args:
        q_table: Array indexed as ``[row, col, action]``.
        grid: Maze layout; it is copied, not modified.
        max_steps: Upper bound on the number of steps attempted.

    Returns:
        ``(success, steps_used)``.  ``success`` is false when the agent is
        boxed in, repeats a (state, action) pair after more than
        ``2 * grid.shape[0]`` steps, the episode ends without reaching the
        goal, or ``max_steps`` is exhausted.
    """
    env = GridWorldEnv(grid=grid.copy(), grid_size=grid.shape[0], auto_generate=False, obstacle_density=0.20)
    env.reset()
    seen = set()

    for step in range(max_steps):
        state = env.state
        valid = valid_actions_for_grid(grid, state)
        if not valid:
            return False, step

        max_q = max(q_table[state[0], state[1], a] for a in valid)
        best = [a for a in valid if q_table[state[0], state[1], a] == max_q]
        best_distance = min(manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), env.goal) for a in best)
        best = [a for a in best if manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), env.goal) == best_distance]
        action = int(best[0])

        # The policy is deterministic, so a repeated (state, action) pair means a cycle.
        loop_key = (state, action)
        if loop_key in seen and step > grid.shape[0] * 2:
            return False, step
        seen.add(loop_key)

        # Honour both end-of-episode flags, like every other caller of env.step
        # in this file, so a timeout reported via ``truncated`` also stops the rollout.
        _, _, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            return bool(info.get("success", False)), step + 1

    return False, max_steps
|
|
|
|
def train_rl_solver(
    env: Optional[GridWorldEnv],
    score: Optional[Score],
    grid_size: int,
    training_episodes: int,
    animation_delay: float,
):
    """
    Train a Q-learning agent on the current fixed maze and animate the learning.

    This is intentionally lightweight for Gradio: it uses the same Gymnasium env.step()
    reward rules, but avoids long PPO training time so learners can watch the process live.

    This function is a generator.  Every ``yield`` produces the same 6-tuple
    that the other UI handlers return: environment, score, grid HTML,
    scoreboard HTML, reward-table HTML and RL board HTML.

    Phases, in order:
        1. "Starting" frame on a reset copy of the maze.
        2. Main training loop; a few selected episodes are animated step by
           step, the others only emit periodic checkpoint frames.
        3. Up to three extra training rounds while a greedy rollout still
           fails to reach the goal.
        4. Animated final rollout that follows the learned Q-table greedily.

    Args:
        env: Current environment; a new game is created when it is ``None``.
        score: Current score; only replaced when a new game has to be created.
        grid_size: Grid size used if a new game has to be created.
        training_episodes: Number of episodes in the main training loop.
        animation_delay: Seconds to sleep after animated frames (negative values become 0).
    """
    if env is None:
        env, score, grid_html, board_html, reward_html, log_html = new_game(grid_size)
        yield env, score, grid_html, board_html, reward_html, log_html

    assert env is not None
    # Snapshot of the maze; every training/rollout environment below is built from it.
    grid = env.grid.copy()
    size = env.grid_size
    max_steps = max(100, size * size)
    total_episodes = int(training_episodes)
    delay = max(0.0, float(animation_delay))

    # Q-learning hyperparameters: learning rate, discount, and the epsilon range.
    q_table = initialize_q_table(grid)
    alpha = 0.65
    gamma = 0.95
    epsilon_start = 1.00
    epsilon_end = 0.04
    rng = np.random.default_rng()

    # Outcomes of the most recent 25 episodes, used for the "recent success rate".
    success_window: deque[bool] = deque(maxlen=25)
    total_successes = 0
    best_steps: Optional[int] = None

    # Episodes that are animated step by step; all others run without per-step frames.
    preview_episodes = {
        1,
        2,
        5,
        max(10, total_episodes // 5),
        max(20, total_episodes // 2),
        total_episodes,
    }

    # Phase 1: show the untouched maze before any learning happens.
    start_env = clone_fixed_env(env)
    start_score = Score(mode="RL Training")
    start_env.last_message = "RL training is starting on this exact maze."
    yield (
        start_env,
        start_score,
        grid_to_html(start_env),
        scoreboard_html(start_env, start_score),
        reward_code_html(),
        rl_log_html("Starting", 0, total_episodes, epsilon_start, 0, 0.0, None, "The agent begins with high exploration and no useful policy."),
    )
    time.sleep(delay)

    # Hold the most recently finished episode so checkpoint frames have something to draw.
    final_training_env = start_env
    final_training_score = start_score

    # Phase 2: main training loop.
    for episode in range(1, total_episodes + 1):
        training_env = GridWorldEnv(grid=grid.copy(), grid_size=size, auto_generate=False, obstacle_density=env.obstacle_density)
        training_env.reset()
        episode_score = 0.0
        episode_success = False
        should_animate = episode in preview_episodes

        # Linear epsilon decay from epsilon_start, floored at epsilon_end.
        progress = episode / max(1, total_episodes)
        epsilon = max(epsilon_end, epsilon_start * (1.0 - progress))

        for _step in range(max_steps):
            old_state = training_env.state
            action = choose_action(q_table, grid, old_state, training_env.goal, epsilon, rng, episode, total_episodes)
            _, reward, terminated, truncated, info = training_env.step(action)
            new_state = training_env.state
            done = bool(terminated or truncated)
            episode_score += float(reward)

            # Q-learning update. The bootstrap term only considers actions that are
            # valid from the new state, and is dropped entirely once the episode is done.
            next_valid = valid_actions_for_grid(grid, new_state)
            next_best = max(q_table[new_state[0], new_state[1], a] for a in next_valid) if next_valid else 0.0
            td_target = float(reward) if done else float(reward) + gamma * float(next_best)
            td_error = td_target - float(q_table[old_state[0], old_state[1], action])
            q_table[old_state[0], old_state[1], action] += alpha * td_error

            if should_animate:
                live_score = Score(
                    total=episode_score,
                    last_reward=float(reward),
                    last_action=ACTION_LABELS[action],
                    done=done,
                    success=bool(info.get("success", False)),
                    mode="RL Training",
                )
                extra = (
                    f"Animating training episode {episode}. The agent is using ε-greedy exploration, "
                    f"then updating its Q-table from the reward it receives."
                )
                yield (
                    training_env,
                    live_score,
                    grid_to_html(training_env),
                    scoreboard_html(training_env, live_score),
                    reward_code_html(),
                    rl_log_html("Learning", episode, total_episodes, epsilon, total_successes, (sum(success_window) / len(success_window)) if success_window else 0.0, best_steps, extra),
                )
                time.sleep(delay)

            if done:
                episode_success = bool(info.get("success", False))
                break

        # Per-episode bookkeeping.
        success_window.append(episode_success)
        if episode_success:
            total_successes += 1
            if best_steps is None or training_env.steps < best_steps:
                best_steps = training_env.steps

        final_training_env = training_env
        final_training_score = Score(
            total=episode_score,
            last_reward=0.0,
            last_action="Episode complete",
            done=True,
            success=episode_success,
            mode="RL Training",
        )

        # Checkpoint frame roughly every tenth of training (at least every 10 episodes),
        # skipped for episodes that were already animated.
        if episode % max(10, total_episodes // 10) == 0 and not should_animate:
            recent_rate = (sum(success_window) / len(success_window)) if success_window else 0.0
            extra = f"Training checkpoint: recent success rate is {recent_rate:.0%}. The policy is becoming less random as ε decays."
            yield (
                final_training_env,
                final_training_score,
                grid_to_html(final_training_env),
                scoreboard_html(final_training_env, final_training_score),
                reward_code_html(),
                rl_log_html("Training", episode, total_episodes, epsilon, total_successes, recent_rate, best_steps, extra),
            )

    # Phase 3: if the greedy policy cannot reach the goal yet, run up to three
    # extra rounds of un-animated training with a fixed epsilon of 0.20.
    solved, greedy_steps = greedy_rollout(q_table, grid, max_steps)
    extra_rounds = 0
    while not solved and extra_rounds < 3:
        extra_rounds += 1
        for _ in range(max(100, total_episodes // 2)):
            training_env = GridWorldEnv(grid=grid.copy(), grid_size=size, auto_generate=False, obstacle_density=env.obstacle_density)
            training_env.reset()
            for _step in range(max_steps):
                old_state = training_env.state
                action = choose_action(q_table, grid, old_state, training_env.goal, 0.20, rng, total_episodes, total_episodes)
                _, reward, terminated, truncated, _info = training_env.step(action)
                new_state = training_env.state
                done = bool(terminated or truncated)
                next_valid = valid_actions_for_grid(grid, new_state)
                next_best = max(q_table[new_state[0], new_state[1], a] for a in next_valid) if next_valid else 0.0
                td_target = float(reward) if done else float(reward) + gamma * float(next_best)
                q_table[old_state[0], old_state[1], action] += alpha * (td_target - q_table[old_state[0], old_state[1], action])
                if done:
                    break
        # Re-check the greedy policy after each extra round.
        solved, greedy_steps = greedy_rollout(q_table, grid, max_steps)
        yield (
            final_training_env,
            final_training_score,
            grid_to_html(final_training_env),
            scoreboard_html(final_training_env, final_training_score),
            reward_code_html(),
            rl_log_html("Extra RL Training", total_episodes, total_episodes, 0.20, total_successes, (sum(success_window) / len(success_window)) if success_window else 0.0, best_steps, "The first greedy rollout was not reliable yet, so the learner is getting extra reinforcement experience."),
        )

    # Phase 4: animated final rollout on a fresh copy of the maze, exploration off.
    rollout_env = GridWorldEnv(grid=grid.copy(), grid_size=size, auto_generate=False, obstacle_density=env.obstacle_density)
    rollout_env.reset()
    rollout_score = Score(mode="Final Learned Policy")
    yield (
        rollout_env,
        rollout_score,
        grid_to_html(rollout_env),
        scoreboard_html(rollout_env, rollout_score),
        reward_code_html(),
        rl_log_html("Final Rollout", total_episodes, total_episodes, 0.0, total_successes, (sum(success_window) / len(success_window)) if success_window else 0.0, best_steps, "Now exploration is off. The agent follows the learned policy greedily."),
    )
    time.sleep(delay)

    # How many times the rollout has departed from each state; used to detect loops.
    visited_state_counts = {}
    for _step in range(max_steps):
        state = rollout_env.state
        valid = valid_actions_for_grid(grid, state)
        if not valid:
            rollout_env.last_message = "No valid actions available."
            break

        # Greedy choice: highest Q-value, ties narrowed to the moves closest to
        # the goal, then the lowest action id.
        max_q = max(q_table[state[0], state[1], a] for a in valid)
        best = [a for a in valid if q_table[state[0], state[1], a] == max_q]
        if len(best) > 1:
            best_distance = min(manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), rollout_env.goal) for a in best)
            best = [a for a in best if manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), rollout_env.goal) == best_distance]
        action = int(best[0])

        _, reward, terminated, truncated, info = rollout_env.step(action)
        rollout_score.total += float(reward)
        rollout_score.last_reward = float(reward)
        rollout_score.last_action = ACTION_LABELS[action]
        rollout_score.done = bool(terminated or truncated)
        rollout_score.success = bool(info.get("success", False))
        rollout_score.mode = "Final Learned Policy"

        recent_rate = (sum(success_window) / len(success_window)) if success_window else 0.0
        extra = "The final learned policy is moving without random exploration."
        yield (
            rollout_env,
            rollout_score,
            grid_to_html(rollout_env),
            scoreboard_html(rollout_env, rollout_score),
            reward_code_html(),
            rl_log_html("Final Rollout", total_episodes, total_episodes, 0.0, total_successes, recent_rate, best_steps, extra),
        )
        time.sleep(delay)

        if rollout_score.done:
            break

        # Leaving the same state more than four times is treated as a loop:
        # the rollout is marked as ended and unsuccessful, with a hint to train longer.
        visited_state_counts[state] = visited_state_counts.get(state, 0) + 1
        if visited_state_counts[state] > 4:
            rollout_env.last_message = "The learned policy is looping. Press Train again or increase episodes."
            rollout_score.done = True
            rollout_score.success = False
            yield (
                rollout_env,
                rollout_score,
                grid_to_html(rollout_env),
                scoreboard_html(rollout_env, rollout_score),
                reward_code_html(),
                rl_log_html("Needs More Training", total_episodes, total_episodes, 0.0, total_successes, recent_rate, best_steps, "The agent learned something, but this maze needs more episodes. Increase the episode slider and train again."),
            )
            break
|
|
|
|
def build_app() -> gr.Blocks:
    """
    Assemble the Gradio Blocks UI and connect every control to its handler.

    Layout: the grid and the four move buttons on the left; sliders, the
    reset/train buttons and the three HTML panels on the right.  Every handler
    writes the same six outputs (two state holders and four HTML views).
    """

    def make_move_handler(action: int):
        # Fix the action id per button; Gradio supplies the three inputs.
        def handler(env, score, size):
            return move_agent(action, env, score, size)

        return handler

    with gr.Blocks(css=CSS, js=KEYBOARD_JS, title="Warehouse GridWorld Game + RL Solver") as demo:
        gr.Markdown(
            """
# 🏗️ Warehouse GridWorld Game + Live RL Solver
Use the arrow keys to move the red agent from **S** to **G**. Obstacles are randomized at **20%** on every reset.
Press **Train RL Solver + Animate** to watch a reinforcement learner practice on the current maze and then perform a final learned run.
""",
            elem_id="game-title",
        )

        # Per-session holders for the environment and the scoreboard.
        env_state = gr.State(None)
        score_state = gr.State(None)

        with gr.Row():
            with gr.Column(scale=2):
                grid_display = gr.HTML(label="Warehouse Grid")
                with gr.Row():
                    up_btn = gr.Button("↑ Up", elem_id="move-up")
                with gr.Row():
                    left_btn = gr.Button("← Left", elem_id="move-left")
                    down_btn = gr.Button("↓ Down", elem_id="move-down")
                    right_btn = gr.Button("→ Right", elem_id="move-right")
            with gr.Column(scale=1):
                grid_size = gr.Slider(5, 15, value=10, step=1, label="Grid Size")
                reset_btn = gr.Button("🔄 Reset / Randomize Grid", variant="primary")
                train_episodes = gr.Slider(50, 2000, value=600, step=50, label="RL Training Episodes")
                animation_delay = gr.Slider(0.00, 0.25, value=0.04, step=0.01, label="Animation Delay Seconds")
                train_btn = gr.Button("🤖 Train RL Solver + Animate", variant="secondary")
                score_display = gr.HTML(label="Score Board")
                reward_display = gr.HTML(label="Reward Code")
                rl_display = gr.HTML(label="RL Training Board")

        outputs = [env_state, score_state, grid_display, score_display, reward_display, rl_display]
        human_inputs = [env_state, score_state, grid_size]
        train_inputs = [env_state, score_state, grid_size, train_episodes, animation_delay]

        demo.load(fn=new_game, inputs=[grid_size], outputs=outputs)
        reset_btn.click(fn=new_game, inputs=[grid_size], outputs=outputs)

        # Action ids follow MOVES / ACTION_LABELS: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT.
        move_buttons = ((up_btn, 0), (right_btn, 1), (down_btn, 2), (left_btn, 3))
        for button, action in move_buttons:
            button.click(fn=make_move_handler(action), inputs=human_inputs, outputs=outputs)

        train_btn.click(fn=train_rl_solver, inputs=train_inputs, outputs=outputs)

    return demo
|
|
|
|
# Script entry point: build the UI, enable Gradio's request queue, then start the server.
if __name__ == "__main__":
    app = build_app()
    app.queue()
    app.launch()
|
|