eaglelandsonce's picture
Rename warehouse_gradio_game_rl_solver.py to app.py
be1ef45 verified
"""
Warehouse GridWorld Human-Play + Live RL Solver Demo
----------------------------------------------------
A Gymnasium + Gradio game version of the RL warehouse navigation environment.
New in this version:
- Human play with arrow keys
- Reset/randomize start, goal, and 20% obstacles
- Live scoreboard with Manhattan distance
- RL Solver button that trains a Q-learning agent on the current maze
- Animated learning episodes plus an animated final greedy policy rollout
Run:
pip install -r requirements_warehouse_game_rl_solver.txt
python app.py
Controls:
Arrow keys or on-screen buttons
"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple
import time
import gradio as gr
import gymnasium as gym
import numpy as np
from gymnasium import spaces
# A grid cell coordinate as (row, col).
Position = Tuple[int, int]
class GridWorldEnv(gym.Env):
    """
    Custom Gymnasium GridWorld environment for a warehouse navigation game.

    Symbols:
        S = Start
        G = Goal
        X = Obstacle
        . = Empty floor
        A = Agent, drawn by the UI as a red circle

    Actions are 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT. An observation is
    ``[agent_row, agent_col, goal_row, goal_col]``, each divided by
    ``grid_size - 1``.

    Args:
        grid: Optional pre-built grid of symbols; it is copied, not aliased.
        grid_size: Side length of the square grid.
        auto_generate: When true, a fresh random grid is generated on every
            ``reset()`` (and at construction time if ``grid`` is not given).
        obstacle_density: Fraction of the non-start/goal cells turned into
            obstacles by the random generator.
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        grid: Optional[np.ndarray] = None,
        grid_size: int = 10,
        auto_generate: bool = True,
        obstacle_density: float = 0.20,
    ):
        super().__init__()
        self.grid_size = int(grid_size)
        self.auto_generate = bool(auto_generate)
        self.obstacle_density = float(obstacle_density)

        if grid is not None:
            self.grid = grid.copy()
        elif self.auto_generate:
            self.grid = self._generate_random_grid()
        else:
            self.grid = np.full((self.grid_size, self.grid_size), ".", dtype=str)

        # 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT
        self.action_space = spaces.Discrete(4)
        # [agent_row_norm, agent_col_norm, goal_row_norm, goal_col_norm]
        self.observation_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(4,),
            dtype=np.float32,
        )

        self._begin_episode("New episode started.")

    def _find_symbol(self, symbol: str, default: Position) -> Position:
        """Return the first (row, col) holding ``symbol``, or ``default`` if absent.

        Coordinates are converted to plain Python ints so that positions print
        as ``(3, 4)`` in the UI instead of leaking NumPy scalar reprs.
        """
        matches = np.argwhere(self.grid == symbol)
        if len(matches) == 0:
            return default
        row, col = matches[0]
        return int(row), int(col)

    def _begin_episode(self, message: str) -> None:
        """Locate S/G on the current grid and reset all per-episode bookkeeping."""
        last = self.grid_size - 1
        self.start: Position = self._find_symbol("S", (0, 0))
        self.goal: Position = self._find_symbol("G", (last, last))
        self.state: Position = self.start
        self.visited: Set[Position] = {self.start}
        self.steps = 0
        self.initial_distance = self._manhattan_distance(self.start, self.goal)
        self.prev_distance = self.initial_distance
        self.last_message = message

    def _generate_random_grid(self) -> np.ndarray:
        """Generate a random, solvable grid with 20% obstacles by default.

        Randomness comes from ``self.np_random`` so that ``reset(seed=...)``
        reproduces the same layout. Falls back to an obstacle-free grid with S
        and G in opposite corners if no solvable layout is found.
        """
        size = self.grid_size
        n_cells = size * size
        max_attempts = 500
        min_separation = max(2, size // 2)
        num_obstacles = max(0, int((n_cells - 2) * self.obstacle_density))

        # A grid needs at least two cells to hold both S and G.
        attempts = max_attempts if n_cells >= 2 else 0
        for _ in range(attempts):
            order = self.np_random.permutation(n_cells)
            cells = [divmod(int(index), size) for index in order]
            start_pos, goal_pos = cells[0], cells[1]
            # Avoid trivial games where start and goal are side-by-side.
            if self._manhattan_distance(start_pos, goal_pos) < min_separation:
                continue
            grid = np.full((size, size), ".", dtype=str)
            grid[start_pos] = "S"
            grid[goal_pos] = "G"
            for r, c in cells[2:2 + num_obstacles]:
                grid[r, c] = "X"
            # Make sure the game can be solved.
            if self._has_path_bfs(grid, start_pos, goal_pos):
                return grid

        # Fallback grid with guaranteed path.
        grid = np.full((size, size), ".", dtype=str)
        grid[0, 0] = "S"
        grid[size - 1, size - 1] = "G"
        return grid

    def _has_path_bfs(self, grid: np.ndarray, start: Position, goal: Position) -> bool:
        """Breadth-first search: is ``goal`` reachable from ``start`` avoiding X cells?"""
        queue = deque([start])
        visited = {start}
        directions = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        while queue:
            r, c = queue.popleft()
            if (r, c) == goal:
                return True
            for dr, dc in directions:
                nr, nc = r + dr, c + dc
                if (
                    0 <= nr < self.grid_size
                    and 0 <= nc < self.grid_size
                    and (nr, nc) not in visited
                    and grid[nr, nc] != "X"
                ):
                    visited.add((nr, nc))
                    queue.append((nr, nc))
        return False

    def _manhattan_distance(self, pos1: Position, pos2: Position) -> int:
        """Return |row difference| + |col difference| between two cells."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode and return ``(observation, info)``.

        When ``auto_generate`` is true a new random grid is drawn (seeded by
        ``seed`` if given); otherwise the existing grid is reused. ``options``
        is accepted for API compatibility and not used.
        """
        super().reset(seed=seed)
        if self.auto_generate:
            self.grid = self._generate_random_grid()
        self._begin_episode("New episode started. Agent begins on S.")
        return self._get_observation(), {}

    def _get_observation(self) -> np.ndarray:
        """Return agent and goal coordinates normalised by ``grid_size - 1``."""
        divisor = max(1, self.grid_size - 1)
        return np.array(
            [
                self.state[0] / divisor,
                self.state[1] / divisor,
                self.goal[0] / divisor,
                self.goal[1] / divisor,
            ],
            dtype=np.float32,
        )

    def step(self, action: int):
        """
        Same scoring idea as the RL lab:
        -5.0 invalid wall/obstacle move
        +1.0 moved closer to goal
        -0.5 moved farther from goal
        -0.1 same Manhattan distance
        +0.3 first time visiting a new cell
        +50.0 reached goal
        -10.0 timeout at 100 steps

        Returns the Gymnasium 5-tuple
        ``(observation, reward, terminated, truncated, info)``. ``info`` always
        carries ``"message"``; on episode end it also carries ``"success"`` and
        ``"steps"``.
        """
        action = int(action)
        r, c = self.state
        moves = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        action_names = ["UP", "RIGHT", "DOWN", "LEFT"]
        dr, dc = moves[action]
        nr, nc = r + dr, c + dc
        self.steps += 1
        reward = 0.0

        hit_wall = nr < 0 or nr >= self.grid_size or nc < 0 or nc >= self.grid_size
        # Only index the grid when the target is in bounds.
        hit_obstacle = not hit_wall and self.grid[nr, nc] == "X"

        if hit_wall or hit_obstacle:
            reward = -5.0
            self.last_message = f"{action_names[action]} blocked: wall/obstacle. Agent stays put."
        else:
            self.state = (nr, nc)
            new_distance = self._manhattan_distance(self.state, self.goal)
            if new_distance < self.prev_distance:
                reward = 1.0
                direction_msg = "closer to the goal"
            elif new_distance > self.prev_distance:
                reward = -0.5
                direction_msg = "farther from the goal"
            else:
                reward = -0.1
                direction_msg = "same Manhattan distance"
            self.prev_distance = new_distance
            if self.state not in self.visited:
                self.visited.add(self.state)
                reward += 0.3
                direction_msg += " + new-cell bonus"
            self.last_message = f"{action_names[action]}: valid move, {direction_msg}."

        # Reaching the goal replaces (does not add to) the shaping reward.
        if self.state == self.goal:
            reward = 50.0
            self.last_message = "Goal reached! Mission complete."
            return self._get_observation(), reward, True, False, {
                "success": True,
                "steps": self.steps,
                "message": self.last_message,
            }

        # The timeout is reported through `terminated` (not `truncated`), and it
        # likewise replaces the reward computed for this step.
        if self.steps >= 100:
            reward = -10.0
            self.last_message = "Timeout: 100 steps reached."
            return self._get_observation(), reward, True, False, {
                "success": False,
                "steps": self.steps,
                "message": self.last_message,
            }

        return self._get_observation(), reward, False, False, {"message": self.last_message}

    def render(self) -> str:
        """Return the grid as text, one row per line, with the agent drawn as ``A``."""
        grid_vis = self.grid.copy()
        r, c = self.state
        grid_vis[r, c] = "A"
        return "\n".join(" ".join(row) for row in grid_vis)
@dataclass
class Score:
    """Per-episode tally that the score board renders.

    Attributes:
        total: Running sum of the rewards received so far.
        last_reward: Reward returned by the most recent step.
        last_action: Display label of the most recent action.
        done: Whether the episode has ended.
        success: Whether the episode ended by reaching the goal.
        mode: Label of who is driving the agent, shown on the score board.
    """

    total: float = 0.0
    last_reward: float = 0.0
    last_action: str = "None"
    done: bool = False
    success: bool = False
    mode: str = "Human Play"
# Display label for each discrete action index (0=UP, 1=RIGHT, 2=DOWN, 3=LEFT).
ACTION_LABELS = dict(enumerate(("↑ UP", "→ RIGHT", "↓ DOWN", "← LEFT")))

# (row delta, col delta) applied by each action index, in the same order.
MOVES = [
    (-1, 0),
    (0, 1),
    (1, 0),
    (0, -1),
]
# Stylesheet handed to gr.Blocks(css=...). It styles the classes emitted by the
# HTML helpers in this file: the warehouse grid and its cells, the agent dot,
# the panels, the score tiles, the reward table, the key caps and the code box.
CSS = """
#game-title { text-align: center; }
.warehouse-wrap { display: flex; justify-content: center; width: 100%; }
.warehouse-grid {
display: grid;
gap: 0px;
border: 3px solid #1f3347;
background: #1f3347;
width: min(78vw, 640px);
aspect-ratio: 1 / 1;
}
.cell {
position: relative;
border: 1.5px solid #243b55;
display: flex;
align-items: center;
justify-content: center;
font-weight: 800;
font-family: Arial, sans-serif;
user-select: none;
}
.empty { background: #edf2f4; color: #1f3347; }
.start { background: #3498db; color: white; }
.goal { background: #2ecc71; color: white; }
.obstacle { background: #2c3e50; color: white; }
.agent-dot {
width: 58%;
height: 58%;
border-radius: 50%;
background: #e74c3c;
box-shadow: 0 0 0 3px rgba(255,255,255,0.45);
position: absolute;
}
.panel {
border: 1px solid #d0d7de;
border-radius: 14px;
padding: 14px;
background: #ffffff;
box-shadow: 0 1px 4px rgba(0,0,0,0.06);
font-family: Arial, sans-serif;
margin-bottom: 10px;
}
.score-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 8px;
}
.score-item {
background: #f6f8fa;
border-radius: 10px;
padding: 8px 10px;
}
.score-label { font-size: 0.80rem; opacity: 0.75; }
.score-value { font-size: 1.15rem; font-weight: 800; }
.reward-table {
width: 100%;
border-collapse: collapse;
font-size: 0.92rem;
}
.reward-table td, .reward-table th {
border-bottom: 1px solid #e5e7eb;
padding: 6px;
text-align: left;
}
.kbd {
display: inline-block;
border: 1px solid #b6bec8;
border-bottom-width: 2px;
border-radius: 6px;
padding: 2px 7px;
margin: 0 2px;
background: #f6f8fa;
font-weight: 700;
}
.codebox {
background: #0d1117;
color: #e6edf3;
padding: 10px;
border-radius: 10px;
overflow-x: auto;
font-family: Consolas, Monaco, monospace;
font-size: 0.86rem;
}
.small-note { font-size: 0.9rem; opacity: 0.85; }
"""
# JavaScript handed to gr.Blocks(js=...). It installs a document-level keydown
# listener that maps the four arrow keys to clicks on the elements whose ids are
# move-up / move-right / move-down / move-left (the elem_id values given to the
# move buttons). Keys are ignored while an INPUT, TEXTAREA or SELECT has focus,
# and the default arrow-key action is suppressed for handled keys.
KEYBOARD_JS = """
() => {
function clickMoveButton(id) {
const wrapper = document.getElementById(id);
if (!wrapper) return;
const button = wrapper.querySelector('button') || wrapper;
button.click();
}
document.addEventListener('keydown', function(event) {
const active = document.activeElement;
const typing = active && ['INPUT', 'TEXTAREA', 'SELECT'].includes(active.tagName);
if (typing) return;
const keyMap = {
'ArrowUp': 'move-up',
'ArrowRight': 'move-right',
'ArrowDown': 'move-down',
'ArrowLeft': 'move-left'
};
if (keyMap[event.key]) {
event.preventDefault();
clickMoveButton(keyMap[event.key]);
}
});
}
"""
def grid_to_html(env: GridWorldEnv) -> str:
    """Render ``env.grid`` as an HTML grid, overlaying a dot on the agent's cell.

    Each cell becomes a ``div`` whose CSS class reflects its symbol (obstacle,
    start, goal, or empty) and whose tooltip gives its row and column.
    """
    size = env.grid_size
    # Grid symbol -> (CSS class, visible label); anything else is empty floor.
    appearance = {
        "X": ("obstacle", "X"),
        "S": ("start", "S"),
        "G": ("goal", "G"),
    }
    agent_marker = '<div class="agent-dot" title="Agent"></div>'

    def cell_markup(row, col):
        css_class, text = appearance.get(env.grid[row, col], ("empty", ""))
        overlay = agent_marker if env.state == (row, col) else ""
        return f'<div class="cell {css_class}" title="row {row}, col {col}"><span>{text}</span>{overlay}</div>'

    body = "".join(cell_markup(row, col) for row in range(size) for col in range(size))
    return (
        "\n"
        '<div class="warehouse-wrap">\n'
        f'<div class="warehouse-grid" style="grid-template-columns: repeat({size}, 1fr);">\n'
        f"{body}\n"
        "</div>\n"
        "</div>\n"
    )
def scoreboard_html(env: GridWorldEnv, score: Score) -> str:
    """Build the score board panel from the environment and the running score.

    Shows the score fields, step count, current Manhattan distance to the goal,
    the agent/start/goal positions, the episode status, the last action, the
    environment's last message, and the current observation vector.
    """
    remaining = env._manhattan_distance(env.state, env.goal)
    obs = env._get_observation()

    if score.success:
        status = "✅ Complete"
    elif score.done:
        status = "⏹️ Episode ended"
    else:
        status = "🎮 Playing"

    # (label, already-formatted value) for each tile, in display order.
    tiles = (
        ("Mode", f"{score.mode}"),
        ("Total Score", f"{score.total:.1f}"),
        ("Last Reward", f"{score.last_reward:+.1f}"),
        ("Steps", f"{env.steps} / 100"),
        ("Manhattan Distance", f"{remaining}"),
        ("Agent Position", f"{env.state}"),
        ("Start Position", f"{env.start}"),
        ("Goal Position", f"{env.goal}"),
    )
    tile_markup = "\n".join(
        f'<div class="score-item"><div class="score-label">{label}</div><div class="score-value">{value}</div></div>'
        for label, value in tiles
    )
    observation = ", ".join(f"{obs[i]:.2f}" for i in range(4))

    return (
        "\n"
        '<div class="panel">\n'
        "<h3>Score Board</h3>\n"
        '<div class="score-grid">\n'
        f"{tile_markup}\n"
        "</div>\n"
        f"<p><b>Status:</b> {status}</p>\n"
        f"<p><b>Last Action:</b> {score.last_action}</p>\n"
        f"<p><b>Rule Fired:</b> {env.last_message}</p>\n"
        f"<p><b>Observation:</b> [{observation}]</p>\n"
        "</div>\n"
    )
def reward_code_html() -> str:
    """Return the static panel listing each move outcome, its reward, and the controls."""
    # (move result, reward text) rows, in display order.
    rules = (
        ("Hit wall or obstacle", "-5.0"),
        ("Move closer to goal", "+1.0"),
        ("Move farther from goal", "-0.5"),
        ("Same Manhattan distance", "-0.1"),
        ("First time in a new cell", "+0.3 bonus"),
        ("Reach the goal", "+50.0"),
        ("Timeout after 100 steps", "-10.0"),
    )
    rows = "\n".join(f"<tr><td>{outcome}</td><td>{amount}</td></tr>" for outcome, amount in rules)
    key_caps = "".join(f'<span class="kbd">{arrow}</span>' for arrow in ("↑", "→", "↓", "←"))
    return (
        "\n"
        '<div class="panel">\n'
        "<h3>Reward Code Used</h3>\n"
        '<table class="reward-table">\n'
        "<tr><th>Move Result</th><th>Reward</th></tr>\n"
        f"{rows}\n"
        "</table>\n"
        f"<p>Controls: {key_caps} or the on-screen buttons.</p>\n"
        "</div>\n"
    )
def rl_log_html(
    phase: str = "Ready",
    episode: int = 0,
    total_episodes: int = 0,
    epsilon: float = 0.0,
    successes: int = 0,
    recent_success_rate: float = 0.0,
    best_steps: Optional[int] = None,
    extra: str = "Press Train RL Solver to teach an agent on the current maze.",
) -> str:
    """Build the RL training board panel.

    Displays the training phase, episode progress, exploration rate, success
    counters, the best step count (or "None yet"), a free-text explanation,
    and the Q-learning update rule.
    """
    best = str(best_steps) if best_steps is not None else "None yet"
    # (label, already-formatted value) for each tile, in display order.
    tiles = (
        ("Phase", f"{phase}"),
        ("Episode", f"{episode} / {total_episodes}"),
        ("Exploration ε", f"{epsilon:.2f}"),
        ("Training Successes", f"{successes}"),
        ("Recent Success Rate", f"{recent_success_rate:.0%}"),
        ("Best Steps", best),
    )
    tile_markup = "\n".join(
        f'<div class="score-item"><div class="score-label">{label}</div><div class="score-value">{value}</div></div>'
        for label, value in tiles
    )
    return (
        "\n"
        '<div class="panel">\n'
        "<h3>RL Training Board</h3>\n"
        '<div class="score-grid">\n'
        f"{tile_markup}\n"
        "</div>\n"
        f"<p><b>What is happening?</b> {extra}</p>\n"
        '<div class="codebox">Q[s,a] ← Q[s,a] + α × (reward + γ × max(Q[next_state,*]) − Q[s,a])</div>\n'
        '<p class="small-note">This fast live demo uses tabular Q-learning against the same Gymnasium environment and reward function, so the learner improves by trial, reward, and correction.</p>\n'
        "</div>\n"
    )
def new_game(grid_size: int):
    """Create a fresh random game and return the six values the UI outputs expect.

    Returns ``(env, score, grid html, score board html, reward table html,
    RL board html)`` for a newly generated grid with 20% obstacles.
    """
    size = int(grid_size)
    env = GridWorldEnv(grid_size=size, auto_generate=True, obstacle_density=0.20)
    env.reset()
    score = Score()
    panels = (
        grid_to_html(env),
        scoreboard_html(env, score),
        reward_code_html(),
        rl_log_html(),
    )
    return (env, score, *panels)
def move_agent(action: int, env: Optional[GridWorldEnv], score: Optional[Score], grid_size: int):
    """Apply one human move and return the refreshed UI outputs.

    Starts a new game if no environment or score exists yet. If the episode has
    already ended, the move is ignored and a reminder to reset is shown.
    """
    if env is None or score is None:
        return new_game(grid_size)

    if score.done:
        env.last_message = "Episode already ended. Press Reset / Randomize Grid."
        note = "Episode ended. Reset to play again."
    else:
        move = int(action)
        _, reward, terminated, truncated, info = env.step(move)
        gained = float(reward)
        score.last_reward = gained
        score.total += gained
        score.last_action = ACTION_LABELS[move]
        score.done = bool(terminated or truncated)
        score.success = bool(info.get("success", False))
        score.mode = "Human Play"
        note = "Human mode: use the arrow keys and watch the reward code fire."

    return env, score, grid_to_html(env), scoreboard_html(env, score), reward_code_html(), rl_log_html(extra=note)
def clone_fixed_env(env: GridWorldEnv) -> GridWorldEnv:
    """Return a reset environment on a copy of ``env``'s grid with regeneration off."""
    settings = {
        "grid": env.grid.copy(),
        "grid_size": env.grid_size,
        "auto_generate": False,
        "obstacle_density": env.obstacle_density,
    }
    replica = GridWorldEnv(**settings)
    replica.reset()
    return replica
def valid_actions_for_grid(grid: np.ndarray, state: Position) -> List[int]:
    """List the action indices that move from ``state`` to an in-bounds, non-obstacle cell."""
    size = grid.shape[0]
    row, col = state[0], state[1]

    def is_open(r, c):
        # Bounds are checked first so the grid is never indexed out of range.
        return 0 <= r < size and 0 <= c < size and grid[r, c] != "X"

    return [
        action
        for action, (d_row, d_col) in enumerate(MOVES)
        if is_open(row + d_row, col + d_col)
    ]
def manhattan(pos1: Position, pos2: Position) -> int:
    """Return the Manhattan (row gap plus column gap) distance between two cells."""
    row_gap = abs(pos1[0] - pos2[0])
    col_gap = abs(pos1[1] - pos2[1])
    return row_gap + col_gap
def choose_action(
    q_table: np.ndarray,
    grid: np.ndarray,
    state: Position,
    goal: Position,
    epsilon: float,
    rng: np.random.Generator,
    episode: int,
    total_episodes: int,
) -> int:
    """Pick an ε-greedy action among the legal moves from ``state``.

    With probability ``epsilon`` the agent explores: uniformly at random, or,
    after the first quarter of training and 45% of the time, restricted to the
    moves that end nearest the goal. Otherwise it exploits the highest Q-value,
    breaking ties by Manhattan distance to the goal and then at random.
    Returns 0 when no move is legal.
    """
    legal = valid_actions_for_grid(grid, state)
    if not legal:
        return 0

    row, col = state[0], state[1]

    def distance_after(action):
        d_row, d_col = MOVES[action]
        return manhattan((row + d_row, col + d_col), goal)

    def nearest_to_goal(candidates):
        shortest = min(distance_after(a) for a in candidates)
        return [a for a in candidates if distance_after(a) == shortest]

    # Exploration: early episodes wander more; later exploration is biased slightly toward the goal.
    if rng.random() < epsilon:
        # The second draw only happens once the episode threshold is passed.
        goal_biased = episode > total_episodes * 0.25 and rng.random() < 0.45
        pool = nearest_to_goal(legal) if goal_biased else legal
        return int(rng.choice(pool))

    # Exploitation: choose the known best legal action; tie-break by Manhattan distance.
    top_value = max(q_table[row, col, a] for a in legal)
    favourites = [a for a in legal if q_table[row, col, a] == top_value]
    if len(favourites) > 1:
        favourites = nearest_to_goal(favourites)
    return int(rng.choice(favourites))
def initialize_q_table(grid: np.ndarray) -> np.ndarray:
    """Create a zeroed ``(size, size, 4)`` float32 Q-table with illegal moves set to -10."""
    size = grid.shape[0]
    table = np.zeros((size, size, 4), dtype=np.float32)
    # Mark illegal moves as very poor so the learned policy obeys the game board.
    for row in range(size):
        for col in range(size):
            legal = valid_actions_for_grid(grid, (row, col))
            blocked = [action for action in range(4) if action not in legal]
            if blocked:
                table[row, col, blocked] = -10.0
    return table
def greedy_rollout(q_table: np.ndarray, grid: np.ndarray, max_steps: int) -> Tuple[bool, int]:
    """Run the greedy policy from the start cell and report whether it reaches the goal.

    At each step the legal action with the highest Q-value is taken, ties being
    broken by Manhattan distance to the goal and then by action order.

    Args:
        q_table: Q-values indexed as ``[row, col, action]``.
        grid: The maze to evaluate on; a copy is used for the rollout.
        max_steps: Upper bound on the number of steps attempted.

    Returns:
        ``(solved, steps)`` where ``steps`` is the number of steps taken before
        the rollout ended or was abandoned.
    """
    env = GridWorldEnv(grid=grid.copy(), grid_size=grid.shape[0], auto_generate=False, obstacle_density=0.20)
    env.reset()
    seen = set()
    for step in range(max_steps):
        state = env.state
        valid = valid_actions_for_grid(grid, state)
        if not valid:
            return False, step
        max_q = max(q_table[state[0], state[1], a] for a in valid)
        best = [a for a in valid if q_table[state[0], state[1], a] == max_q]
        best_distance = min(manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), env.goal) for a in best)
        best = [a for a in best if manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), env.goal) == best_distance]
        action = int(best[0])
        # Repeating a (state, action) pair under a fixed policy means a cycle;
        # give up once past the short grace period.
        loop_key = (state, action)
        if loop_key in seen and step > grid.shape[0] * 2:
            return False, step
        seen.add(loop_key)
        # Honour both end-of-episode flags, like the other callers of env.step().
        _, _, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            return bool(info.get("success", False)), step + 1
    return False, max_steps
def train_rl_solver(
    env: Optional[GridWorldEnv],
    score: Optional[Score],
    grid_size: int,
    training_episodes: int,
    animation_delay: float,
):
    """
    Train a Q-learning agent on the current fixed maze and animate the learning.
    This is intentionally lightweight for Gradio: it uses the same Gymnasium env.step()
    reward rules, but avoids long PPO training time so learners can watch the process live.

    This is a generator. Every yield is a 6-tuple
    ``(env, score, grid html, score board html, reward table html, RL board html)``.

    Phases, in order:
      1. Optional new game if ``env`` is None.
      2. ``training_episodes`` Q-learning episodes; a handful of them are
         animated step by step, others emit periodic checkpoints.
      3. Up to three hidden top-up rounds if the greedy policy does not solve
         the maze yet.
      4. An animated greedy rollout of the learned policy.

    Args:
        env: Current environment; its grid is copied and trained on.
        score: Only rebound when ``env`` is None; otherwise not read here.
        grid_size: Used only to create a new game when ``env`` is None.
        training_episodes: Number of main training episodes.
        animation_delay: Seconds slept after each animated frame (clamped to >= 0).
    """
    if env is None:
        env, score, grid_html, board_html, reward_html, log_html = new_game(grid_size)
        yield env, score, grid_html, board_html, reward_html, log_html
    assert env is not None
    grid = env.grid.copy()
    size = env.grid_size
    # Loop bound per episode; GridWorldEnv.step() itself ends an episode at 100 steps.
    max_steps = max(100, size * size)
    total_episodes = int(training_episodes)
    delay = max(0.0, float(animation_delay))
    q_table = initialize_q_table(grid)
    # Q-learning hyperparameters: learning rate, discount, and exploration range.
    alpha = 0.65
    gamma = 0.95
    epsilon_start = 1.00
    epsilon_end = 0.04
    rng = np.random.default_rng()
    # Outcomes of the most recent 25 episodes, for the "recent success rate" tile.
    success_window: deque[bool] = deque(maxlen=25)
    total_successes = 0
    best_steps: Optional[int] = None
    # Animate early, middle, and late learning attempts.
    preview_episodes = {
        1,
        2,
        5,
        max(10, total_episodes // 5),
        max(20, total_episodes // 2),
        total_episodes,
    }
    start_env = clone_fixed_env(env)
    start_score = Score(mode="RL Training")
    start_env.last_message = "RL training is starting on this exact maze."
    yield (
        start_env,
        start_score,
        grid_to_html(start_env),
        scoreboard_html(start_env, start_score),
        reward_code_html(),
        rl_log_html("Starting", 0, total_episodes, epsilon_start, 0, 0.0, None, "The agent begins with high exploration and no useful policy."),
    )
    time.sleep(delay)
    # Fallbacks shown by later yields if the training loop runs zero episodes.
    final_training_env = start_env
    final_training_score = start_score
    for episode in range(1, total_episodes + 1):
        training_env = GridWorldEnv(grid=grid.copy(), grid_size=size, auto_generate=False, obstacle_density=env.obstacle_density)
        training_env.reset()
        episode_score = 0.0
        episode_success = False
        should_animate = episode in preview_episodes
        # Linear decay keeps early exploration visible and late policy behavior cleaner.
        progress = episode / max(1, total_episodes)
        epsilon = max(epsilon_end, epsilon_start * (1.0 - progress))
        for _step in range(max_steps):
            old_state = training_env.state
            action = choose_action(q_table, grid, old_state, training_env.goal, epsilon, rng, episode, total_episodes)
            _, reward, terminated, truncated, info = training_env.step(action)
            new_state = training_env.state
            done = bool(terminated or truncated)
            episode_score += float(reward)
            # Bootstrap only from legal actions in the next state.
            next_valid = valid_actions_for_grid(grid, new_state)
            next_best = max(q_table[new_state[0], new_state[1], a] for a in next_valid) if next_valid else 0.0
            # No bootstrapping past the end of an episode.
            td_target = float(reward) if done else float(reward) + gamma * float(next_best)
            td_error = td_target - float(q_table[old_state[0], old_state[1], action])
            q_table[old_state[0], old_state[1], action] += alpha * td_error
            if should_animate:
                live_score = Score(
                    total=episode_score,
                    last_reward=float(reward),
                    last_action=ACTION_LABELS[action],
                    done=done,
                    success=bool(info.get("success", False)),
                    mode="RL Training",
                )
                extra = (
                    f"Animating training episode {episode}. The agent is using ε-greedy exploration, "
                    f"then updating its Q-table from the reward it receives."
                )
                yield (
                    training_env,
                    live_score,
                    grid_to_html(training_env),
                    scoreboard_html(training_env, live_score),
                    reward_code_html(),
                    rl_log_html("Learning", episode, total_episodes, epsilon, total_successes, (sum(success_window) / len(success_window)) if success_window else 0.0, best_steps, extra),
                )
                time.sleep(delay)
            if done:
                episode_success = bool(info.get("success", False))
                break
        success_window.append(episode_success)
        if episode_success:
            total_successes += 1
            # Track the fewest steps among successful training episodes.
            if best_steps is None or training_env.steps < best_steps:
                best_steps = training_env.steps
        final_training_env = training_env
        final_training_score = Score(
            total=episode_score,
            last_reward=0.0,
            last_action="Episode complete",
            done=True,
            success=episode_success,
            mode="RL Training",
        )
        # Lightweight progress updates without animating every step.
        if episode % max(10, total_episodes // 10) == 0 and not should_animate:
            recent_rate = (sum(success_window) / len(success_window)) if success_window else 0.0
            extra = f"Training checkpoint: recent success rate is {recent_rate:.0%}. The policy is becoming less random as ε decays."
            yield (
                final_training_env,
                final_training_score,
                grid_to_html(final_training_env),
                scoreboard_html(final_training_env, final_training_score),
                reward_code_html(),
                rl_log_html("Training", episode, total_episodes, epsilon, total_successes, recent_rate, best_steps, extra),
            )
    # If the greedy policy does not solve yet, do a small hidden reinforcement top-up.
    # This keeps the demo classroom-friendly while still using Q-learning updates.
    # NOTE: greedy_steps is assigned here and below but never read.
    solved, greedy_steps = greedy_rollout(q_table, grid, max_steps)
    extra_rounds = 0
    while not solved and extra_rounds < 3:
        extra_rounds += 1
        for _ in range(max(100, total_episodes // 2)):
            training_env = GridWorldEnv(grid=grid.copy(), grid_size=size, auto_generate=False, obstacle_density=env.obstacle_density)
            training_env.reset()
            for _step in range(max_steps):
                old_state = training_env.state
                # Fixed ε of 0.20; passing total_episodes as the episode enables the goal-biased exploration branch.
                action = choose_action(q_table, grid, old_state, training_env.goal, 0.20, rng, total_episodes, total_episodes)
                _, reward, terminated, truncated, _info = training_env.step(action)
                new_state = training_env.state
                done = bool(terminated or truncated)
                next_valid = valid_actions_for_grid(grid, new_state)
                next_best = max(q_table[new_state[0], new_state[1], a] for a in next_valid) if next_valid else 0.0
                td_target = float(reward) if done else float(reward) + gamma * float(next_best)
                q_table[old_state[0], old_state[1], action] += alpha * (td_target - q_table[old_state[0], old_state[1], action])
                if done:
                    break
        solved, greedy_steps = greedy_rollout(q_table, grid, max_steps)
        yield (
            final_training_env,
            final_training_score,
            grid_to_html(final_training_env),
            scoreboard_html(final_training_env, final_training_score),
            reward_code_html(),
            rl_log_html("Extra RL Training", total_episodes, total_episodes, 0.20, total_successes, (sum(success_window) / len(success_window)) if success_window else 0.0, best_steps, "The first greedy rollout was not reliable yet, so the learner is getting extra reinforcement experience."),
        )
    # Final greedy policy rollout: no random exploration.
    rollout_env = GridWorldEnv(grid=grid.copy(), grid_size=size, auto_generate=False, obstacle_density=env.obstacle_density)
    rollout_env.reset()
    rollout_score = Score(mode="Final Learned Policy")
    yield (
        rollout_env,
        rollout_score,
        grid_to_html(rollout_env),
        scoreboard_html(rollout_env, rollout_score),
        reward_code_html(),
        rl_log_html("Final Rollout", total_episodes, total_episodes, 0.0, total_successes, (sum(success_window) / len(success_window)) if success_window else 0.0, best_steps, "Now exploration is off. The agent follows the learned policy greedily."),
    )
    time.sleep(delay)
    # How many times the rollout has departed from each state, for loop detection.
    visited_state_counts = {}
    for _step in range(max_steps):
        state = rollout_env.state
        valid = valid_actions_for_grid(grid, state)
        if not valid:
            # NOTE: no frame is yielded after this message is set.
            rollout_env.last_message = "No valid actions available."
            break
        max_q = max(q_table[state[0], state[1], a] for a in valid)
        best = [a for a in valid if q_table[state[0], state[1], a] == max_q]
        if len(best) > 1:
            best_distance = min(manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), rollout_env.goal) for a in best)
            best = [a for a in best if manhattan((state[0] + MOVES[a][0], state[1] + MOVES[a][1]), rollout_env.goal) == best_distance]
        # Deterministic choice: first remaining candidate, no random tie-break.
        action = int(best[0])
        _, reward, terminated, truncated, info = rollout_env.step(action)
        rollout_score.total += float(reward)
        rollout_score.last_reward = float(reward)
        rollout_score.last_action = ACTION_LABELS[action]
        rollout_score.done = bool(terminated or truncated)
        rollout_score.success = bool(info.get("success", False))
        rollout_score.mode = "Final Learned Policy"
        recent_rate = (sum(success_window) / len(success_window)) if success_window else 0.0
        extra = "The final learned policy is moving without random exploration."
        yield (
            rollout_env,
            rollout_score,
            grid_to_html(rollout_env),
            scoreboard_html(rollout_env, rollout_score),
            reward_code_html(),
            rl_log_html("Final Rollout", total_episodes, total_episodes, 0.0, total_successes, recent_rate, best_steps, extra),
        )
        time.sleep(delay)
        if rollout_score.done:
            break
        # Leaving the same state more than four times is treated as a policy loop.
        visited_state_counts[state] = visited_state_counts.get(state, 0) + 1
        if visited_state_counts[state] > 4:
            rollout_env.last_message = "The learned policy is looping. Press Train again or increase episodes."
            rollout_score.done = True
            rollout_score.success = False
            yield (
                rollout_env,
                rollout_score,
                grid_to_html(rollout_env),
                scoreboard_html(rollout_env, rollout_score),
                reward_code_html(),
                rl_log_html("Needs More Training", total_episodes, total_episodes, 0.0, total_successes, recent_rate, best_steps, "The agent learned something, but this maze needs more episodes. Increase the episode slider and train again."),
            )
            break
def build_app() -> gr.Blocks:
    """Assemble the Gradio UI and wire its events; returns the un-launched Blocks app.

    The environment and score are kept in ``gr.State`` components. Every handler
    writes the same six outputs: both states plus the grid, score board, reward
    table, and RL board HTML panels.
    """
    with gr.Blocks(css=CSS, js=KEYBOARD_JS, title="Warehouse GridWorld Game + RL Solver") as demo:
        gr.Markdown(
            """
# 🏗️ Warehouse GridWorld Game + Live RL Solver
Use the arrow keys to move the red agent from **S** to **G**. Obstacles are randomized at **20%** on every reset.
Press **Train RL Solver + Animate** to watch a reinforcement learner practice on the current maze and then perform a final learned run.
""",
            elem_id="game-title",
        )
        env_state = gr.State(None)
        score_state = gr.State(None)
        with gr.Row():
            with gr.Column(scale=2):
                grid_display = gr.HTML(label="Warehouse Grid")
                # The elem_id values below are the ids KEYBOARD_JS clicks for arrow keys.
                with gr.Row():
                    up_btn = gr.Button("↑ Up", elem_id="move-up")
                with gr.Row():
                    left_btn = gr.Button("← Left", elem_id="move-left")
                    down_btn = gr.Button("↓ Down", elem_id="move-down")
                    right_btn = gr.Button("→ Right", elem_id="move-right")
            with gr.Column(scale=1):
                grid_size = gr.Slider(5, 15, value=10, step=1, label="Grid Size")
                reset_btn = gr.Button("🔄 Reset / Randomize Grid", variant="primary")
                train_episodes = gr.Slider(50, 2000, value=600, step=50, label="RL Training Episodes")
                animation_delay = gr.Slider(0.00, 0.25, value=0.04, step=0.01, label="Animation Delay Seconds")
                train_btn = gr.Button("🤖 Train RL Solver + Animate", variant="secondary")
                score_display = gr.HTML(label="Score Board")
                reward_display = gr.HTML(label="Reward Code")
                rl_display = gr.HTML(label="RL Training Board")
        # Order matches the 6-tuples returned/yielded by the handlers.
        outputs = [env_state, score_state, grid_display, score_display, reward_display, rl_display]
        human_inputs = [env_state, score_state, grid_size]
        train_inputs = [env_state, score_state, grid_size, train_episodes, animation_delay]
        # Start a game on page load, and again whenever Reset is pressed.
        demo.load(fn=new_game, inputs=[grid_size], outputs=outputs)
        reset_btn.click(fn=new_game, inputs=[grid_size], outputs=outputs)
        # Action indices: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT.
        up_btn.click(fn=lambda env, score, size: move_agent(0, env, score, size), inputs=human_inputs, outputs=outputs)
        right_btn.click(fn=lambda env, score, size: move_agent(1, env, score, size), inputs=human_inputs, outputs=outputs)
        down_btn.click(fn=lambda env, score, size: move_agent(2, env, score, size), inputs=human_inputs, outputs=outputs)
        left_btn.click(fn=lambda env, score, size: move_agent(3, env, score, size), inputs=human_inputs, outputs=outputs)
        # train_rl_solver is a generator, so each yield updates the outputs.
        train_btn.click(fn=train_rl_solver, inputs=train_inputs, outputs=outputs)
    return demo
# Script entry point: build the UI, enable Gradio's request queue, then serve.
# NOTE(review): the queue is presumably what lets the generator-based training
# handler stream its frames; confirm for the Gradio version in use.
if __name__ == "__main__":
    app = build_app()
    app.queue()
    app.launch()