"""
Warehouse GridWorld Human-Play + Live RL Solver Demo
----------------------------------------------------
A Gymnasium + Gradio game version of the RL warehouse navigation environment.

New in this version:
- Human play with arrow keys
- Reset/randomize start, goal, and 20% obstacles
- Live scoreboard with Manhattan distance
- RL Solver button that trains a Q-learning agent on the current maze
- Animated learning episodes plus an animated final greedy policy rollout

Run:
    pip install -r requirements_warehouse_game_rl_solver.txt
    python warehouse_gradio_game_rl_solver.py

Controls:
    Arrow keys or on-screen buttons
"""

from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple
import time

import gradio as gr
import gymnasium as gym
import numpy as np
from gymnasium import spaces

Position = Tuple[int, int]


class GridWorldEnv(gym.Env):
    """
    Custom Gymnasium GridWorld environment for a warehouse navigation game.

    Symbols:
        S = Start
        G = Goal
        X = Obstacle
        . = Empty floor
        A = Agent, drawn by the UI as a red circle
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        grid: Optional[np.ndarray] = None,
        grid_size: int = 10,
        auto_generate: bool = True,
        obstacle_density: float = 0.20,
    ):
        """
        Build the environment.

        Args:
            grid: Optional fixed layout (2-D array of the symbols above). It is copied.
            grid_size: Side length of the square grid.
            auto_generate: When True, a random solvable grid is generated here (if no
                grid was given) and again on every ``reset()``.
            obstacle_density: Fraction of non-start/goal cells turned into obstacles.
        """
        super().__init__()
        self.grid_size = int(grid_size)
        self.auto_generate = bool(auto_generate)
        self.obstacle_density = float(obstacle_density)

        if grid is not None:
            self.grid = grid.copy()
        elif self.auto_generate:
            self.grid = self._generate_random_grid()
        else:
            self.grid = np.full((self.grid_size, self.grid_size), ".", dtype=str)

        # 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT
        self.action_space = spaces.Discrete(4)

        # [agent_row_norm, agent_col_norm, goal_row_norm, goal_col_norm]
        self.observation_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(4,),
            dtype=np.float32,
        )

        self._begin_episode("New episode started.")

    def _find_symbol(self, symbol: str, default: Position) -> Position:
        """Return the first cell holding ``symbol`` as plain ints, or ``default`` if absent."""
        hits = np.argwhere(self.grid == symbol)
        if hits.shape[0] == 0:
            return default
        row, col = hits[0]
        # Plain ints keep positions readable when formatted (no ``np.int64(...)`` repr).
        return (int(row), int(col))

    def _begin_episode(self, message: str) -> None:
        """Locate S and G on the current grid and put the agent back on the start cell."""
        far_corner = self.grid_size - 1
        self.start: Position = self._find_symbol("S", (0, 0))
        self.goal: Position = self._find_symbol("G", (far_corner, far_corner))
        self.state: Position = self.start
        self.visited: Set[Position] = {self.start}
        self.steps = 0
        self.initial_distance = self._manhattan_distance(self.start, self.goal)
        self.prev_distance = self.initial_distance
        self.last_message = message

    def _generate_random_grid(self) -> np.ndarray:
        """Generate a random, solvable grid with 20% obstacles by default."""
        size = self.grid_size
        all_cells = [(r, c) for r in range(size) for c in range(size)]
        num_obstacles = int((size * size - 2) * self.obstacle_density)
        min_separation = max(2, size // 2)
        max_attempts = 500 if len(all_cells) >= 2 else 0

        for _ in range(max_attempts):
            # Draw from the environment's own generator so reset(seed=...) is reproducible.
            order = self.np_random.permutation(len(all_cells))
            shuffled = [all_cells[i] for i in order]
            start_pos, goal_pos = shuffled[0], shuffled[1]

            # Avoid trivial games where start and goal are side-by-side.
            if self._manhattan_distance(start_pos, goal_pos) < min_separation:
                continue

            grid = np.full((size, size), ".", dtype=str)
            grid[start_pos] = "S"
            grid[goal_pos] = "G"
            for r, c in shuffled[2:2 + num_obstacles]:
                grid[r, c] = "X"

            # Make sure the game can be solved.
            if self._has_path_bfs(grid, start_pos, goal_pos):
                return grid

        # Fallback grid with guaranteed path.
        grid = np.full((size, size), ".", dtype=str)
        grid[0, 0] = "S"
        grid[size - 1, size - 1] = "G"
        return grid

    def _has_path_bfs(self, grid: np.ndarray, start: Position, goal: Position) -> bool:
        """Return True when ``goal`` is reachable from ``start`` without crossing an X cell."""
        queue = deque([start])
        visited = {start}
        directions = [(-1, 0), (0, 1), (1, 0), (0, -1)]

        while queue:
            r, c = queue.popleft()
            if (r, c) == goal:
                return True
            for dr, dc in directions:
                nr, nc = r + dr, c + dc
                if (
                    0 <= nr < self.grid_size
                    and 0 <= nc < self.grid_size
                    and (nr, nc) not in visited
                    and grid[nr, nc] != "X"
                ):
                    visited.add((nr, nc))
                    queue.append((nr, nc))
        return False

    def _manhattan_distance(self, pos1: Position, pos2: Position) -> int:
        """Return |dr| + |dc| between two cells."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode; regenerates the grid when ``auto_generate`` is on."""
        super().reset(seed=seed)
        if self.auto_generate:
            self.grid = self._generate_random_grid()
        self._begin_episode("New episode started. Agent begins on S.")
        return self._get_observation(), {}

    def _get_observation(self) -> np.ndarray:
        """Return agent and goal coordinates scaled by ``grid_size - 1``."""
        divisor = max(1, self.grid_size - 1)
        return np.array(
            [
                self.state[0] / divisor,
                self.state[1] / divisor,
                self.goal[0] / divisor,
                self.goal[1] / divisor,
            ],
            dtype=np.float32,
        )

    def step(self, action: int):
        """
        Same scoring idea as the RL lab:
            -5.0  invalid wall/obstacle move
            +1.0  moved closer to goal
            -0.5  moved farther from goal
            -0.1  same Manhattan distance
            +0.3  first time visiting a new cell
            +50.0 reached goal
            -10.0 timeout at 100 steps
        """
        action = int(action)
        r, c = self.state
        moves = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        action_names = ["UP", "RIGHT", "DOWN", "LEFT"]
        dr, dc = moves[action]
        nr, nc = r + dr, c + dc

        self.steps += 1
        reward = 0.0

        hit_wall = nr < 0 or nr >= self.grid_size or nc < 0 or nc >= self.grid_size
        hit_obstacle = not hit_wall and self.grid[nr, nc] == "X"

        if hit_wall or hit_obstacle:
            reward = -5.0
            self.last_message = f"{action_names[action]} blocked: wall/obstacle. Agent stays put."
        else:
            self.state = (nr, nc)
            new_distance = self._manhattan_distance(self.state, self.goal)

            if new_distance < self.prev_distance:
                reward = 1.0
                direction_msg = "closer to the goal"
            elif new_distance > self.prev_distance:
                reward = -0.5
                direction_msg = "farther from the goal"
            else:
                reward = -0.1
                direction_msg = "same Manhattan distance"
            self.prev_distance = new_distance

            if self.state not in self.visited:
                self.visited.add(self.state)
                reward += 0.3
                direction_msg += " + new-cell bonus"

            self.last_message = f"{action_names[action]}: valid move, {direction_msg}."

        if self.state == self.goal:
            reward = 50.0
            self.last_message = "Goal reached! Mission complete."
            return self._get_observation(), reward, True, False, {
                "success": True,
                "steps": self.steps,
                "message": self.last_message,
            }

        if self.steps >= 100:
            # NOTE: the timeout is reported through ``terminated`` (not ``truncated``);
            # callers in this file treat both flags as "episode over".
            reward = -10.0
            self.last_message = "Timeout: 100 steps reached."
            return self._get_observation(), reward, True, False, {
                "success": False,
                "steps": self.steps,
                "message": self.last_message,
            }

        return self._get_observation(), reward, False, False, {"message": self.last_message}

    def render(self) -> str:
        """Return the grid as text with the agent's cell marked ``A``."""
        grid_vis = self.grid.copy()
        r, c = self.state
        grid_vis[r, c] = "A"
        return "\n".join(" ".join(row) for row in grid_vis)


@dataclass
class Score:
    """Running score and status shown on the scoreboard."""

    total: float = 0.0
    last_reward: float = 0.0
    last_action: str = "None"
    done: bool = False
    success: bool = False
    mode: str = "Human Play"


ACTION_LABELS = {
    0: "↑ UP",
    1: "→ RIGHT",
    2: "↓ DOWN",
    3: "← LEFT",
}

MOVES = [(-1, 0), (0, 1), (1, 0), (0, -1)]

CSS = """
#game-title { text-align: center; }
.warehouse-wrap { display: flex; justify-content: center; width: 100%; }
.warehouse-grid { display: grid; gap: 0px; border: 3px solid #1f3347; background: #1f3347; width: min(78vw, 640px); aspect-ratio: 1 / 1; }
.cell { position: relative; border: 1.5px solid #243b55; display: flex; align-items: center; justify-content: center; font-weight: 800; font-family: Arial, sans-serif; user-select: none; }
.empty { background: #edf2f4; color: #1f3347; }
.start { background: #3498db; color: white; }
.goal { background: #2ecc71; color: white; }
.obstacle { background: #2c3e50; color: white; }
.agent-dot { width: 58%; height: 58%; border-radius: 50%; background: #e74c3c; box-shadow: 0 0 0 3px rgba(255,255,255,0.45); position: absolute; }
.panel { border: 1px solid #d0d7de; border-radius: 14px; padding: 14px; background: #ffffff; box-shadow: 0 1px 4px rgba(0,0,0,0.06); font-family: Arial, sans-serif; margin-bottom: 10px; }
.score-grid { display: grid; grid-template-columns: 1fr 1fr; gap: 8px; }
.score-item { background: #f6f8fa; border-radius: 10px; padding: 8px 10px; }
.score-label { font-size: 0.80rem; opacity: 0.75; }
.score-value { font-size: 1.15rem; font-weight: 800; }
.reward-table { width: 100%; border-collapse: collapse; font-size: 0.92rem; }
.reward-table td, .reward-table th { border-bottom: 1px solid #e5e7eb; padding: 6px; text-align: left; }
.kbd { display: inline-block; border: 1px solid #b6bec8; border-bottom-width: 2px; border-radius: 6px; padding: 2px 7px; margin: 0 2px; background: #f6f8fa; font-weight: 700; }
.codebox { background: #0d1117; color: #e6edf3; padding: 10px; border-radius: 10px; overflow-x: auto; font-family: Consolas, Monaco, monospace; font-size: 0.86rem; }
.small-note { font-size: 0.9rem; opacity: 0.85; }
"""

KEYBOARD_JS = """
() => {
    function clickMoveButton(id) {
        const wrapper = document.getElementById(id);
        if (!wrapper) return;
        const button = wrapper.querySelector('button') || wrapper;
        button.click();
    }

    document.addEventListener('keydown', function(event) {
        const active = document.activeElement;
        const typing = active && ['INPUT', 'TEXTAREA', 'SELECT'].includes(active.tagName);
        if (typing) return;

        const keyMap = {
            'ArrowUp': 'move-up',
            'ArrowRight': 'move-right',
            'ArrowDown': 'move-down',
            'ArrowLeft': 'move-left'
        };

        if (keyMap[event.key]) {
            event.preventDefault();
            clickMoveButton(keyMap[event.key]);
        }
    });
}
"""

# Grid symbol -> (CSS class, visible label); anything else renders as empty floor.
_CELL_STYLES = {
    "X": ("obstacle", "X"),
    "S": ("start", "S"),
    "G": ("goal", "G"),
}


def grid_to_html(env: GridWorldEnv) -> str:
    """Render the grid as a CSS-grid of cells, with a red dot on the agent's cell."""
    size = env.grid_size
    cells = []
    for r in range(size):
        for c in range(size):
            cls, label = _CELL_STYLES.get(env.grid[r, c], ("empty", ""))
            agent = '<div class="agent-dot"></div>' if env.state == (r, c) else ""
            cells.append(f'<div class="cell {cls}">{label}{agent}</div>')

    cells_html = "".join(cells)
    # The column count depends on the grid size, so it is set inline rather than in CSS.
    return f"""
<div class="warehouse-wrap">
  <div class="warehouse-grid" style="grid-template-columns: repeat({size}, 1fr);">
    {cells_html}
  </div>
</div>
"""


def _stat_tile(label: str, value: object) -> str:
    """Return one label/value tile for a ``score-grid`` panel."""
    return (
        '<div class="score-item">'
        f'<div class="score-label">{label}</div>'
        f'<div class="score-value">{value}</div>'
        "</div>"
    )


def scoreboard_html(env: GridWorldEnv, score: Score) -> str:
    """Render the scoreboard panel for the given environment and score."""
    distance = env._manhattan_distance(env.state, env.goal)
    obs = env._get_observation()
    status = "✅ Complete" if score.success else ("⏹️ Episode ended" if score.done else "🎮 Playing")

    tiles = "".join(
        _stat_tile(label, value)
        for label, value in (
            ("Mode", score.mode),
            ("Total Score", f"{score.total:.1f}"),
            ("Last Reward", f"{score.last_reward:+.1f}"),
            ("Steps", f"{env.steps} / 100"),
            ("Manhattan Distance", distance),
            ("Agent Position", env.state),
            ("Start Position", env.start),
            ("Goal Position", env.goal),
        )
    )
    return f"""
<div class="panel">
  <h3>Score Board</h3>
  <div class="score-grid">{tiles}</div>
  <p><b>Status:</b> {status}</p>
  <p><b>Last Action:</b> {score.last_action}</p>
  <p><b>Rule Fired:</b> {env.last_message}</p>
  <p class="small-note"><b>Observation:</b> [{obs[0]:.2f}, {obs[1]:.2f}, {obs[2]:.2f}, {obs[3]:.2f}]</p>
</div>
"""


def reward_code_html() -> str:
    """Render the static table of reward rules plus the control hints."""
    return """
<div class="panel">
  <h3>Reward Code Used</h3>
  <table class="reward-table">
    <tr><th>Move Result</th><th>Reward</th></tr>
    <tr><td>Hit wall or obstacle</td><td>-5.0</td></tr>
    <tr><td>Move closer to goal</td><td>+1.0</td></tr>
    <tr><td>Move farther from goal</td><td>-0.5</td></tr>
    <tr><td>Same Manhattan distance</td><td>-0.1</td></tr>
    <tr><td>First time in a new cell</td><td>+0.3 bonus</td></tr>
    <tr><td>Reach the goal</td><td>+50.0</td></tr>
    <tr><td>Timeout after 100 steps</td><td>-10.0</td></tr>
  </table>
  <p class="small-note"><b>Controls:</b> <span class="kbd">↑</span><span class="kbd">→</span><span class="kbd">↓</span><span class="kbd">←</span> or the on-screen buttons.</p>
</div>
"""


def rl_log_html(
    phase: str = "Ready",
    episode: int = 0,
    total_episodes: int = 0,
    epsilon: float = 0.0,
    successes: int = 0,
    recent_success_rate: float = 0.0,
    best_steps: Optional[int] = None,
    extra: str = "Press Train RL Solver to teach an agent on the current maze.",
) -> str:
    """Render the RL training board: progress tiles, a status sentence, and the update rule."""
    best = "None yet" if best_steps is None else str(best_steps)
    tiles = "".join(
        _stat_tile(label, value)
        for label, value in (
            ("Phase", phase),
            ("Episode", f"{episode} / {total_episodes}"),
            ("Exploration ε", f"{epsilon:.2f}"),
            ("Training Successes", successes),
            ("Recent Success Rate", f"{recent_success_rate:.0%}"),
            ("Best Steps", best),
        )
    )
    return f"""
<div class="panel">
  <h3>RL Training Board</h3>
  <div class="score-grid">{tiles}</div>
  <p><b>What is happening?</b> {extra}</p>
  <div class="codebox">Q[s,a] ← Q[s,a] + α × (reward + γ × max(Q[next_state,*]) − Q[s,a])</div>
  <p class="small-note">This fast live demo uses tabular Q-learning against the same Gymnasium environment and reward function, so the learner improves by trial, reward, and correction.</p>
</div>
"""


def _frame(env: GridWorldEnv, score: Score, log_html: str):
    """Bundle the six UI outputs (env state, score state, and the four HTML panels)."""
    return env, score, grid_to_html(env), scoreboard_html(env, score), reward_code_html(), log_html


def new_game(grid_size: int):
    """Create a freshly randomized game and return the six UI outputs."""
    env = GridWorldEnv(grid_size=int(grid_size), auto_generate=True, obstacle_density=0.20)
    env.reset()
    score = Score()
    return _frame(env, score, rl_log_html())


def move_agent(action: int, env: Optional[GridWorldEnv], score: Optional[Score], grid_size: int):
    """Apply one human move (0=UP, 1=RIGHT, 2=DOWN, 3=LEFT) and return the six UI outputs."""
    if env is None or score is None:
        return new_game(grid_size)

    if score.done:
        env.last_message = "Episode already ended. Press Reset / Randomize Grid."
        return _frame(env, score, rl_log_html(extra="Episode ended. Reset to play again."))

    _, reward, terminated, truncated, info = env.step(int(action))
    score.last_reward = float(reward)
    score.total += float(reward)
    score.last_action = ACTION_LABELS[int(action)]
    score.done = bool(terminated or truncated)
    score.success = bool(info.get("success", False))
    score.mode = "Human Play"

    return _frame(
        env,
        score,
        rl_log_html(extra="Human mode: use the arrow keys and watch the reward code fire."),
    )


def clone_fixed_env(env: GridWorldEnv) -> GridWorldEnv:
    """Return a reset copy of ``env`` that keeps the same maze (no regeneration)."""
    fixed = GridWorldEnv(
        grid=env.grid.copy(),
        grid_size=env.grid_size,
        auto_generate=False,
        obstacle_density=env.obstacle_density,
    )
    fixed.reset()
    return fixed


def valid_actions_for_grid(grid: np.ndarray, state: Position) -> List[int]:
    """Return the actions from ``state`` that stay on the board and avoid X cells."""
    size = grid.shape[0]
    actions: List[int] = []
    for action, (dr, dc) in enumerate(MOVES):
        nr, nc = state[0] + dr, state[1] + dc
        if 0 <= nr < size and 0 <= nc < size and grid[nr, nc] != "X":
            actions.append(action)
    return actions


def manhattan(pos1: Position, pos2: Position) -> int:
    """Return |dr| + |dc| between two cells."""
    return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])


def _closest_to_goal(actions: List[int], state: Position, goal: Position) -> List[int]:
    """Keep only the actions whose destination cell is nearest to ``goal`` (Manhattan)."""
    distances = {
        a: manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), goal)
        for a in actions
    }
    nearest = min(distances.values())
    return [a for a in actions if distances[a] == nearest]


def _greedy_candidates(
    q_table: np.ndarray,
    valid: List[int],
    state: Position,
    goal: Position,
) -> List[int]:
    """Return the legal actions with the highest Q-value; ties are narrowed by distance."""
    row, col = state
    top_q = max(q_table[row, col, a] for a in valid)
    best = [a for a in valid if q_table[row, col, a] == top_q]
    if len(best) > 1:
        best = _closest_to_goal(best, state, goal)
    return best


def choose_action(
    q_table: np.ndarray,
    grid: np.ndarray,
    state: Position,
    goal: Position,
    epsilon: float,
    rng: np.random.Generator,
    episode: int,
    total_episodes: int,
) -> int:
    """Pick an ε-greedy action among the legal moves; returns 0 when no move is legal."""
    valid = valid_actions_for_grid(grid, state)
    if not valid:
        return 0

    # Exploration: early episodes wander more; later exploration is biased slightly toward the goal.
    if rng.random() < epsilon:
        if episode > total_episodes * 0.25 and rng.random() < 0.45:
            return int(rng.choice(_closest_to_goal(valid, state, goal)))
        return int(rng.choice(valid))

    # Exploitation: choose the known best legal action; tie-break by Manhattan distance.
    return int(rng.choice(_greedy_candidates(q_table, valid, state, goal)))


def initialize_q_table(grid: np.ndarray) -> np.ndarray:
    """Return a zeroed (size, size, 4) Q-table with illegal moves preset to -10."""
    size = grid.shape[0]
    q_table = np.zeros((size, size, 4), dtype=np.float32)

    # Mark illegal moves as very poor so the learned policy obeys the game board.
    for r in range(size):
        for c in range(size):
            valid = set(valid_actions_for_grid(grid, (r, c)))
            for action in range(4):
                if action not in valid:
                    q_table[r, c, action] = -10.0
    return q_table


def greedy_rollout(q_table: np.ndarray, grid: np.ndarray, max_steps: int) -> Tuple[bool, int]:
    """
    Follow the greedy policy on ``grid`` without animation.

    Returns:
        ``(success, steps_taken)``. The rollout stops early when the episode ends,
        when no move is legal, or when a (state, action) pair repeats after the
        first ``2 * size`` steps.
    """
    env = GridWorldEnv(grid=grid.copy(), grid_size=grid.shape[0], auto_generate=False, obstacle_density=0.20)
    env.reset()
    seen = set()

    for step in range(max_steps):
        state = env.state
        valid = valid_actions_for_grid(grid, state)
        if not valid:
            return False, step

        action = int(_greedy_candidates(q_table, valid, state, env.goal)[0])

        loop_key = (state, action)
        if loop_key in seen and step > grid.shape[0] * 2:
            return False, step
        seen.add(loop_key)

        _, _, done, _, info = env.step(action)
        if done:
            return bool(info.get("success", False)), step + 1

    return False, max_steps


def _q_learning_update(
    q_table: np.ndarray,
    grid: np.ndarray,
    old_state: Position,
    action: int,
    reward: float,
    new_state: Position,
    done: bool,
    alpha: float,
    gamma: float,
) -> None:
    """Apply one tabular Q-learning update in place, bootstrapping only over legal moves."""
    next_valid = valid_actions_for_grid(grid, new_state)
    next_best = max(q_table[new_state[0], new_state[1], a] for a in next_valid) if next_valid else 0.0
    td_target = float(reward) if done else float(reward) + gamma * float(next_best)
    td_error = td_target - float(q_table[old_state[0], old_state[1], action])
    q_table[old_state[0], old_state[1], action] += alpha * td_error


def _success_rate(window) -> float:
    """Return the fraction of True entries in ``window`` (0.0 when it is empty)."""
    return (sum(window) / len(window)) if window else 0.0


def train_rl_solver(
    env: Optional[GridWorldEnv],
    score: Optional[Score],
    grid_size: int,
    training_episodes: int,
    animation_delay: float,
):
    """
    Train a Q-learning agent on the current fixed maze and animate the learning.

    This is intentionally lightweight for Gradio: it uses the same Gymnasium env.step()
    reward rules, but avoids long PPO training time so learners can watch the process live.

    This is a generator: every ``yield`` is one UI frame (the same six outputs that
    ``new_game`` returns). A handful of episodes are animated step by step; the rest
    only produce periodic checkpoint frames. Training is followed by a greedy rollout.
    """
    if env is None:
        env, score, grid_html, board_html, reward_html, log_html = new_game(grid_size)
        yield env, score, grid_html, board_html, reward_html, log_html

    assert env is not None
    grid = env.grid.copy()
    size = env.grid_size
    obstacle_density = env.obstacle_density
    max_steps = max(100, size * size)
    total_episodes = int(training_episodes)
    delay = max(0.0, float(animation_delay))

    q_table = initialize_q_table(grid)
    alpha = 0.65
    gamma = 0.95
    epsilon_start = 1.00
    epsilon_end = 0.04
    rng = np.random.default_rng()

    success_window: deque[bool] = deque(maxlen=25)
    total_successes = 0
    best_steps: Optional[int] = None

    def fresh_env() -> GridWorldEnv:
        """Return a reset environment on the maze captured when training started."""
        fixed = GridWorldEnv(grid=grid.copy(), grid_size=size, auto_generate=False, obstacle_density=obstacle_density)
        fixed.reset()
        return fixed

    # Animate early, middle, and late learning attempts.
    preview_episodes = {
        1,
        2,
        5,
        max(10, total_episodes // 5),
        max(20, total_episodes // 2),
        total_episodes,
    }

    start_env = fresh_env()
    start_score = Score(mode="RL Training")
    start_env.last_message = "RL training is starting on this exact maze."
    yield _frame(
        start_env,
        start_score,
        rl_log_html(
            "Starting", 0, total_episodes, epsilon_start, 0, 0.0, None,
            "The agent begins with high exploration and no useful policy.",
        ),
    )
    time.sleep(delay)

    final_training_env = start_env
    final_training_score = start_score

    for episode in range(1, total_episodes + 1):
        training_env = fresh_env()
        episode_score = 0.0
        episode_success = False
        should_animate = episode in preview_episodes

        # Linear decay keeps early exploration visible and late policy behavior cleaner.
        progress = episode / max(1, total_episodes)
        epsilon = max(epsilon_end, epsilon_start * (1.0 - progress))

        for _step in range(max_steps):
            old_state = training_env.state
            action = choose_action(
                q_table, grid, old_state, training_env.goal, epsilon, rng, episode, total_episodes
            )
            _, reward, terminated, truncated, info = training_env.step(action)
            done = bool(terminated or truncated)
            episode_score += float(reward)

            _q_learning_update(
                q_table, grid, old_state, action, float(reward), training_env.state, done, alpha, gamma
            )

            if should_animate:
                live_score = Score(
                    total=episode_score,
                    last_reward=float(reward),
                    last_action=ACTION_LABELS[action],
                    done=done,
                    success=bool(info.get("success", False)),
                    mode="RL Training",
                )
                extra = (
                    f"Animating training episode {episode}. The agent is using ε-greedy exploration, "
                    f"then updating its Q-table from the reward it receives."
                )
                yield _frame(
                    training_env,
                    live_score,
                    rl_log_html(
                        "Learning", episode, total_episodes, epsilon, total_successes,
                        _success_rate(success_window), best_steps, extra,
                    ),
                )
                time.sleep(delay)

            if done:
                episode_success = bool(info.get("success", False))
                break

        success_window.append(episode_success)
        if episode_success:
            total_successes += 1
            if best_steps is None or training_env.steps < best_steps:
                best_steps = training_env.steps

        final_training_env = training_env
        final_training_score = Score(
            total=episode_score,
            last_reward=0.0,
            last_action="Episode complete",
            done=True,
            success=episode_success,
            mode="RL Training",
        )

        # Lightweight progress updates without animating every step.
        if episode % max(10, total_episodes // 10) == 0 and not should_animate:
            recent_rate = _success_rate(success_window)
            extra = (
                f"Training checkpoint: recent success rate is {recent_rate:.0%}. "
                f"The policy is becoming less random as ε decays."
            )
            yield _frame(
                final_training_env,
                final_training_score,
                rl_log_html(
                    "Training", episode, total_episodes, epsilon, total_successes,
                    recent_rate, best_steps, extra,
                ),
            )

    # If the greedy policy does not solve yet, do a small hidden reinforcement top-up.
    # This keeps the demo classroom-friendly while still using Q-learning updates.
    solved, _ = greedy_rollout(q_table, grid, max_steps)
    extra_rounds = 0
    while not solved and extra_rounds < 3:
        extra_rounds += 1
        for _ in range(max(100, total_episodes // 2)):
            training_env = fresh_env()
            for _step in range(max_steps):
                old_state = training_env.state
                action = choose_action(
                    q_table, grid, old_state, training_env.goal, 0.20, rng, total_episodes, total_episodes
                )
                _, reward, terminated, truncated, _info = training_env.step(action)
                done = bool(terminated or truncated)
                _q_learning_update(
                    q_table, grid, old_state, action, float(reward), training_env.state, done, alpha, gamma
                )
                if done:
                    break

        solved, _ = greedy_rollout(q_table, grid, max_steps)
        yield _frame(
            final_training_env,
            final_training_score,
            rl_log_html(
                "Extra RL Training", total_episodes, total_episodes, 0.20, total_successes,
                _success_rate(success_window), best_steps,
                "The first greedy rollout was not reliable yet, so the learner is getting extra reinforcement experience.",
            ),
        )

    # Final greedy policy rollout: no random exploration.
    rollout_env = fresh_env()
    rollout_score = Score(mode="Final Learned Policy")
    recent_rate = _success_rate(success_window)
    yield _frame(
        rollout_env,
        rollout_score,
        rl_log_html(
            "Final Rollout", total_episodes, total_episodes, 0.0, total_successes,
            recent_rate, best_steps,
            "Now exploration is off. The agent follows the learned policy greedily.",
        ),
    )
    time.sleep(delay)

    visited_state_counts = {}
    for _step in range(max_steps):
        state = rollout_env.state
        valid = valid_actions_for_grid(grid, state)
        if not valid:
            rollout_env.last_message = "No valid actions available."
            rollout_score.done = True
            rollout_score.success = False
            # Show the message instead of stopping silently on the previous frame.
            yield _frame(
                rollout_env,
                rollout_score,
                rl_log_html(
                    "Final Rollout", total_episodes, total_episodes, 0.0, total_successes,
                    recent_rate, best_steps,
                    "The final learned policy is moving without random exploration.",
                ),
            )
            break

        action = int(_greedy_candidates(q_table, valid, state, rollout_env.goal)[0])

        _, reward, terminated, truncated, info = rollout_env.step(action)
        rollout_score.total += float(reward)
        rollout_score.last_reward = float(reward)
        rollout_score.last_action = ACTION_LABELS[action]
        rollout_score.done = bool(terminated or truncated)
        rollout_score.success = bool(info.get("success", False))
        rollout_score.mode = "Final Learned Policy"

        extra = "The final learned policy is moving without random exploration."
        yield _frame(
            rollout_env,
            rollout_score,
            rl_log_html(
                "Final Rollout", total_episodes, total_episodes, 0.0, total_successes,
                recent_rate, best_steps, extra,
            ),
        )
        time.sleep(delay)

        if rollout_score.done:
            break

        # Count departures from each cell; leaving the same cell repeatedly means a loop.
        visited_state_counts[state] = visited_state_counts.get(state, 0) + 1
        if visited_state_counts[state] > 4:
            rollout_env.last_message = "The learned policy is looping. Press Train again or increase episodes."
            rollout_score.done = True
            rollout_score.success = False
            yield _frame(
                rollout_env,
                rollout_score,
                rl_log_html(
                    "Needs More Training", total_episodes, total_episodes, 0.0, total_successes,
                    recent_rate, best_steps,
                    "The agent learned something, but this maze needs more episodes. Increase the episode slider and train again.",
                ),
            )
            break


def build_app() -> gr.Blocks:
    """Assemble the Gradio UI: grid, move buttons, sliders, and the three info panels."""
    with gr.Blocks(css=CSS, js=KEYBOARD_JS, title="Warehouse GridWorld Game + RL Solver") as demo:
        gr.Markdown(
            """
            # 🏗️ Warehouse GridWorld Game + Live RL Solver
            Use the arrow keys to move the red agent from **S** to **G**. Obstacles are randomized at **20%** on every reset.
            Press **Train RL Solver + Animate** to watch a reinforcement learner practice on the current maze and then perform a final learned run.
            """,
            elem_id="game-title",
        )

        env_state = gr.State(None)
        score_state = gr.State(None)

        with gr.Row():
            with gr.Column(scale=2):
                grid_display = gr.HTML(label="Warehouse Grid")
                with gr.Row():
                    up_btn = gr.Button("↑ Up", elem_id="move-up")
                with gr.Row():
                    left_btn = gr.Button("← Left", elem_id="move-left")
                    down_btn = gr.Button("↓ Down", elem_id="move-down")
                    right_btn = gr.Button("→ Right", elem_id="move-right")

            with gr.Column(scale=1):
                grid_size = gr.Slider(5, 15, value=10, step=1, label="Grid Size")
                reset_btn = gr.Button("🔄 Reset / Randomize Grid", variant="primary")
                train_episodes = gr.Slider(50, 2000, value=600, step=50, label="RL Training Episodes")
                animation_delay = gr.Slider(0.00, 0.25, value=0.04, step=0.01, label="Animation Delay Seconds")
                train_btn = gr.Button("🤖 Train RL Solver + Animate", variant="secondary")
                score_display = gr.HTML(label="Score Board")
                reward_display = gr.HTML(label="Reward Code")
                rl_display = gr.HTML(label="RL Training Board")

        outputs = [env_state, score_state, grid_display, score_display, reward_display, rl_display]
        human_inputs = [env_state, score_state, grid_size]
        train_inputs = [env_state, score_state, grid_size, train_episodes, animation_delay]

        demo.load(fn=new_game, inputs=[grid_size], outputs=outputs)
        reset_btn.click(fn=new_game, inputs=[grid_size], outputs=outputs)

        up_btn.click(fn=lambda env, score, size: move_agent(0, env, score, size), inputs=human_inputs, outputs=outputs)
        right_btn.click(fn=lambda env, score, size: move_agent(1, env, score, size), inputs=human_inputs, outputs=outputs)
        down_btn.click(fn=lambda env, score, size: move_agent(2, env, score, size), inputs=human_inputs, outputs=outputs)
        left_btn.click(fn=lambda env, score, size: move_agent(3, env, score, size), inputs=human_inputs, outputs=outputs)

        train_btn.click(fn=train_rl_solver, inputs=train_inputs, outputs=outputs)

    return demo


if __name__ == "__main__":
    app = build_app()
    app.queue()
    app.launch()