# Spaces: Sleeping
import html
from collections import deque
from typing import Optional

import gradio as gr
import gymnasium as gym
import numpy as np
from gymnasium import spaces
# ============================================================
# Gymnasium Environment: Warehouse GridWorld
# ============================================================
class WarehouseGridWorldEnv(gym.Env):
    """
    Warehouse GridWorld navigation environment.

    Grid symbols:
        S = Start
        G = Goal
        X = Obstacle
        . = Empty cell
        A = Agent shown visually in the UI

    Observation (float32, every component in [0, 1]):
        [agent_row, agent_col, goal_row, goal_col], each divided by
        ``grid_size - 1``.

    Actions:
        0 = UP
        1 = RIGHT
        2 = DOWN
        3 = LEFT

    Positions are ``(row, col)`` tuples with ``(0, 0)`` in the top-left corner.
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        grid_size: int = 9,
        obstacle_density: float = 0.20,
        max_steps: int = 100,
        seed: Optional[int] = None,
    ):
        """
        Build the spaces and start the first episode.

        Args:
            grid_size: Side length of the square grid; must be at least 2.
            obstacle_density: Fraction of non-start/goal cells turned into
                obstacles; must lie in [0, 1].
            max_steps: Number of steps after which the episode is truncated.
            seed: Optional seed forwarded to ``reset``.

        Raises:
            ValueError: If ``grid_size`` or ``obstacle_density`` is out of range.
        """
        super().__init__()
        self.grid_size = int(grid_size)
        self.obstacle_density = float(obstacle_density)
        self.max_steps = int(max_steps)

        # A 1x1 grid can never hold a goal distinct from the start, which
        # would make the goal-sampling loop spin forever.
        if self.grid_size < 2:
            raise ValueError("grid_size must be at least 2.")
        # Written as a negated range check so that NaN is rejected as well.
        if not 0.0 <= self.obstacle_density <= 1.0:
            raise ValueError("obstacle_density must be between 0 and 1.")

        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(4,),
            dtype=np.float32,
        )
        # Action id -> (row delta, col delta).
        self.action_map = {
            0: (-1, 0),  # UP
            1: (0, 1),  # RIGHT
            2: (1, 0),  # DOWN
            3: (0, -1),  # LEFT
        }
        self.action_names = {
            0: "UP",
            1: "RIGHT",
            2: "DOWN",
            3: "LEFT",
        }
        self.reset(seed=seed)

    def reset(self, seed: Optional[int] = None, options=None):
        """
        Generate a fresh grid and clear all per-episode counters.

        Returns:
            ``(observation, info)``.
        """
        super().reset(seed=seed)
        self.grid, self.start, self.goal = self._generate_solvable_grid()
        self.agent_pos = self.start
        self.steps = 0
        self.total_score = 0.0
        self.last_reward = 0.0
        self.done = False
        self.status = "Playing"
        self.last_action = "None"
        self.rule_fired = "New episode started. Agent begins on S."
        self.visited = {self.start}
        return self._get_obs(), self._get_info()

    def step(self, action):
        """
        Apply one move and score it.

        Reward rules: -5.0 for walking into a wall/obstacle (agent stays put),
        +1.0 / -0.5 / -0.1 for ending closer to / farther from / equally far
        from the goal, +0.3 for entering a not-yet-visited cell, +50.0 for
        reaching the goal, and -10.0 when ``max_steps`` is hit without
        reaching it.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.

        Raises:
            KeyError: If ``action`` is not one of 0, 1, 2, 3.
        """
        if self.done:
            self.last_reward = 0.0
            self.rule_fired = "Episode already finished. Press reset to play again."
            return self._get_obs(), 0.0, True, False, self._get_info()

        action = int(action)
        # Resolve the action before touching any state so that an invalid
        # action does not consume a step.
        dr, dc = self.action_map[action]
        action_name = self.action_names[action]

        self.steps += 1
        self.last_action = action_name

        old_pos = self.agent_pos
        old_distance = self._manhattan_distance(old_pos, self.goal)
        new_pos = (old_pos[0] + dr, old_pos[1] + dc)

        reward = 0.0
        terminated = False
        truncated = False

        if not self._is_valid_position(new_pos):
            reward = -5.0
            self.rule_fired = "-5.0 wall/obstacle penalty. Agent stays in place."
            new_pos = old_pos
        else:
            self.agent_pos = new_pos
            new_distance = self._manhattan_distance(new_pos, self.goal)
            if new_distance < old_distance:
                reward += 1.0
                self.rule_fired = "+1.0 moved closer to goal."
            elif new_distance > old_distance:
                reward -= 0.5
                self.rule_fired = "-0.5 moved farther from goal."
            else:
                reward -= 0.1
                self.rule_fired = "-0.1 same Manhattan distance."

        # After a blocked move new_pos is the (already visited) old cell, so
        # neither the bonus nor the goal check below can fire for it.
        if new_pos not in self.visited:
            reward += 0.3
            self.rule_fired += " +0.3 new-cell bonus."
            self.visited.add(new_pos)

        if new_pos == self.goal:
            reward += 50.0
            terminated = True
            self.done = True
            self.status = "Goal Reached"
            self.rule_fired += " +50.0 goal reached!"

        if not terminated and self.steps >= self.max_steps:
            reward -= 10.0
            truncated = True
            self.done = True
            self.status = "Timeout"
            self.rule_fired += " -10.0 timeout penalty."

        self.last_reward = reward
        self.total_score += reward
        return self._get_obs(), reward, terminated, truncated, self._get_info()

    def _get_obs(self):
        """Return the normalized [agent_row, agent_col, goal_row, goal_col] vector."""
        denominator = max(1, self.grid_size - 1)
        agent_row, agent_col = self.agent_pos
        goal_row, goal_col = self.goal
        return np.array(
            [
                agent_row / denominator,
                agent_col / denominator,
                goal_row / denominator,
                goal_col / denominator,
            ],
            dtype=np.float32,
        )

    def _get_info(self) -> dict:
        """Return a snapshot of the episode state used by the scoreboard."""
        return {
            "total_score": self.total_score,
            "last_reward": self.last_reward,
            "steps": self.steps,
            "agent_position": self.agent_pos,
            "goal_position": self.goal,
            "manhattan_distance": self._manhattan_distance(self.agent_pos, self.goal),
            "status": self.status,
            "last_action": self.last_action,
            "rule_fired": self.rule_fired,
            "goal_reached": self.agent_pos == self.goal,
        }

    def _generate_solvable_grid(self):
        """
        Randomize start, goal, and obstacles at the configured density.

        Tries up to 500 random layouts and returns the first one that has a
        path from S to G; if none qualifies, returns the carved fallback grid.

        Returns:
            ``(grid, start, goal)``.
        """
        for _ in range(500):
            grid = np.full((self.grid_size, self.grid_size), ".", dtype="<U1")
            start = self._random_cell()
            goal = self._random_cell()
            # Keep the goal at least half a grid side away from the start.
            while (
                goal == start
                or self._manhattan_distance(start, goal) < self.grid_size // 2
            ):
                goal = self._random_cell()

            available_cells = [
                (r, c)
                for r in range(self.grid_size)
                for c in range(self.grid_size)
                if (r, c) not in {start, goal}
            ]
            obstacle_count = int(len(available_cells) * self.obstacle_density)
            obstacle_indices = self.np_random.choice(
                len(available_cells),
                size=obstacle_count,
                replace=False,
            )
            for index in obstacle_indices:
                r, c = available_cells[index]
                grid[r, c] = "X"
            grid[start] = "S"
            grid[goal] = "G"

            if self._path_exists(grid, start, goal):
                return grid, start, goal

        return self._fallback_grid()

    def _fallback_grid(self):
        """
        Safety fallback in case random generation fails.

        Places S in the top-left and G in the bottom-right corner and keeps
        the first column and last row obstacle-free so a path always exists.

        Returns:
            ``(grid, start, goal)``.
        """
        grid = np.full((self.grid_size, self.grid_size), ".", dtype="<U1")
        start = (0, 0)
        goal = (self.grid_size - 1, self.grid_size - 1)

        safe_path = {(r, 0) for r in range(self.grid_size)}
        safe_path |= {(self.grid_size - 1, c) for c in range(self.grid_size)}

        available_cells = [
            (r, c)
            for r in range(self.grid_size)
            for c in range(self.grid_size)
            if (r, c) not in safe_path and (r, c) not in {start, goal}
        ]
        obstacle_count = int(
            (self.grid_size * self.grid_size - 2) * self.obstacle_density
        )
        # The carved path shrinks the pool, so cap the request at what is left.
        obstacle_count = min(obstacle_count, len(available_cells))
        obstacle_indices = self.np_random.choice(
            len(available_cells),
            size=obstacle_count,
            replace=False,
        )
        for index in obstacle_indices:
            r, c = available_cells[index]
            grid[r, c] = "X"
        grid[start] = "S"
        grid[goal] = "G"
        return grid, start, goal

    def _random_cell(self):
        """Draw a uniformly random ``(row, col)`` from the environment's RNG."""
        row = int(self.np_random.integers(0, self.grid_size))
        col = int(self.np_random.integers(0, self.grid_size))
        return row, col

    def _path_exists(self, grid, start, goal) -> bool:
        """Breadth-first search: is ``goal`` reachable from ``start`` avoiding X cells?"""
        queue = deque([start])
        seen = {start}
        while queue:
            current = queue.popleft()
            if current == goal:
                return True
            for dr, dc in self.action_map.values():
                nr = current[0] + dr
                nc = current[1] + dc
                next_pos = (nr, nc)
                if (
                    0 <= nr < self.grid_size
                    and 0 <= nc < self.grid_size
                    and next_pos not in seen
                    and grid[nr, nc] != "X"
                ):
                    seen.add(next_pos)
                    queue.append(next_pos)
        return False

    def _is_valid_position(self, pos) -> bool:
        """Return True when ``pos`` is inside the grid and not an obstacle."""
        row, col = pos
        if not (0 <= row < self.grid_size and 0 <= col < self.grid_size):
            return False
        return bool(self.grid[row, col] != "X")

    def _manhattan_distance(self, pos_a, pos_b) -> int:
        """Return the Manhattan (L1) distance between two ``(row, col)`` cells."""
        return abs(pos_a[0] - pos_b[0]) + abs(pos_a[1] - pos_b[1])
# ============================================================
# HTML Rendering
# ============================================================
def render_grid(env):
    """
    Render the environment's grid as an HTML table.

    Reads ``env.grid_size``, ``env.grid[r, c]`` and ``env.agent_pos``. Each
    cell gets a CSS class based on its symbol; the cell holding the agent
    shows the agent marker instead of its own label.

    Returns:
        An HTML string with the table wrapped in a ``grid-panel`` div.
    """
    # Grid symbol -> (CSS class, visible label); anything else is an empty cell.
    cell_styles = {
        "S": ("cell-start", "S"),
        "G": ("cell-goal", "G"),
        "X": ("cell-obstacle", "X"),
    }
    rows = []
    for r in range(env.grid_size):
        cells = []
        for c in range(env.grid_size):
            cell_class, label = cell_styles.get(
                str(env.grid[r, c]), ("cell-empty", "")
            )
            if (r, c) == env.agent_pos:
                label = "<div class='agent-circle'>A</div>"
            cells.append(f"<td class='warehouse-cell {cell_class}'>{label}</td>")
        rows.append("<tr>" + "".join(cells) + "</tr>")
    table = "<table class='warehouse-grid'>" + "".join(rows) + "</table>"
    return f"""
    <div class="grid-panel">
    {table}
    </div>
    """
def render_scoreboard(env):
    """
    Render the score panel as HTML.

    Pulls the current metrics from ``env._get_info()`` and the step limit
    from ``env.max_steps``. Free-text fields (status, last action, rule fired)
    are HTML-escaped before being embedded.

    Returns:
        An HTML string containing the metrics, details and reward-rule legend.
    """
    info = env._get_info()
    agent_row, agent_col = info["agent_position"]
    goal_row, goal_col = info["goal_position"]
    goal_text = "Yes" if info["goal_reached"] else "No"
    return f"""
    <div class="score-card">
      <h3>Score Board</h3>
      <div class="metric-grid">
        <div class="metric-box">
          <div class="metric-label">Total Score</div>
          <div class="metric-value">{info["total_score"]:.1f}</div>
        </div>
        <div class="metric-box">
          <div class="metric-label">Last Reward</div>
          <div class="metric-value">{info["last_reward"]:+.1f}</div>
        </div>
        <div class="metric-box">
          <div class="metric-label">Steps</div>
          <div class="metric-value">{info["steps"]} / {env.max_steps}</div>
        </div>
        <div class="metric-box">
          <div class="metric-label">Manhattan Distance</div>
          <div class="metric-value">{info["manhattan_distance"]}</div>
        </div>
      </div>
      <div class="detail-line"><b>Agent Position:</b> ({agent_row}, {agent_col})</div>
      <div class="detail-line"><b>Goal Position:</b> ({goal_row}, {goal_col})</div>
      <div class="detail-line"><b>Goal Reached:</b> {goal_text}</div>
      <div class="detail-line"><b>Status:</b> {html.escape(info["status"])}</div>
      <div class="detail-line"><b>Last Action:</b> {html.escape(info["last_action"])}</div>
      <div class="rule-box">
        <b>Rule Fired:</b><br>
        {html.escape(info["rule_fired"])}
      </div>
      <hr>
      <h4>Reward Rules</h4>
      <ul class="reward-list">
        <li><code>-5.0</code> wall/obstacle</li>
        <li><code>+1.0</code> closer to goal</li>
        <li><code>-0.5</code> farther from goal</li>
        <li><code>-0.1</code> same distance</li>
        <li><code>+0.3</code> new-cell bonus</li>
        <li><code>+50.0</code> goal reached</li>
        <li><code>-10.0</code> timeout</li>
      </ul>
    </div>
    """
# ============================================================
# Gradio Event Functions
# ============================================================
def new_game(grid_size, obstacle_density=0.20, max_steps=100):
    """
    Start a new episode on a freshly randomized grid.

    Args:
        grid_size: Side length of the grid (coerced to ``int``).
        obstacle_density: Fraction of cells turned into obstacles.
        max_steps: Step limit before the episode times out.

    Returns:
        ``(env, grid_html, scoreboard_html)`` for the state and the two HTML
        outputs.
    """
    env = WarehouseGridWorldEnv(
        grid_size=int(grid_size),
        obstacle_density=obstacle_density,
        max_steps=max_steps,
    )
    return env, render_grid(env), render_scoreboard(env)
def move_agent(env, action):
    """
    Apply one action to the environment and re-render both panels.

    If no environment exists yet (``env is None``), a default 9x9 one is
    created first and the action is applied to it.

    Returns:
        ``(env, grid_html, scoreboard_html)``.
    """
    if env is None:
        env = WarehouseGridWorldEnv(grid_size=9, obstacle_density=0.20, max_steps=100)
    env.step(action)
    return env, render_grid(env), render_scoreboard(env)
def move_up(env):
    """Move the agent one cell up (action 0) and return the refreshed outputs."""
    return move_agent(env, 0)
def move_right(env):
    """Move the agent one cell right (action 1) and return the refreshed outputs."""
    return move_agent(env, 1)
def move_down(env):
    """Move the agent one cell down (action 2) and return the refreshed outputs."""
    return move_agent(env, 2)
def move_left(env):
    """Move the agent one cell left (action 3) and return the refreshed outputs."""
    return move_agent(env, 3)
# ============================================================
# Styling and Keyboard Script
# ============================================================
# Stylesheet for the page: grid table, agent marker, scoreboard card and notes.
APP_CSS = """
body {
    background: #f7f9fc;
}
.main-title {
    text-align: center;
    margin-bottom: 0px;
}
.subtitle {
    text-align: center;
    color: #555;
    margin-top: 0px;
}
.grid-panel {
    display: flex;
    justify-content: center;
    align-items: center;
    padding: 12px;
}
.warehouse-grid {
    border-collapse: collapse;
    border: 3px solid #2f4858;
    background: white;
}
.warehouse-cell {
    width: 42px;
    height: 42px;
    border: 2px solid #607d8b;
    text-align: center;
    vertical-align: middle;
    font-weight: 800;
    font-size: 16px;
    font-family: Arial, sans-serif;
}
.cell-start {
    background: #b9d7ff;
    color: #0b3d91;
}
.cell-goal {
    background: #2ecc71;
    color: #063b1d;
}
.cell-obstacle {
    background: #2f3e46;
    color: #dce3e8;
}
.cell-empty {
    background: #edf6fb;
    color: #607d8b;
}
.agent-circle {
    width: 30px;
    height: 30px;
    background: #e53935;
    color: white;
    border-radius: 999px;
    display: flex;
    align-items: center;
    justify-content: center;
    margin: auto;
    font-size: 15px;
    font-weight: 900;
    box-shadow: 0 2px 5px rgba(0,0,0,0.35);
}
.score-card {
    background: white;
    border: 1px solid #d9e2ec;
    border-radius: 14px;
    padding: 16px;
    box-shadow: 0 2px 8px rgba(0,0,0,0.06);
}
.score-card h3 {
    margin-top: 0px;
}
.metric-grid {
    display: grid;
    grid-template-columns: 1fr 1fr;
    gap: 10px;
}
.metric-box {
    background: #f1f5f9;
    border-radius: 10px;
    padding: 10px;
}
.metric-label {
    color: #64748b;
    font-size: 12px;
}
.metric-value {
    color: #111827;
    font-size: 20px;
    font-weight: 800;
}
.detail-line {
    margin-top: 8px;
    font-size: 14px;
}
.rule-box {
    margin-top: 12px;
    padding: 10px;
    background: #fff7ed;
    border-left: 5px solid #fb923c;
    border-radius: 8px;
    font-size: 14px;
}
.reward-list {
    margin-top: 6px;
}
.reward-list li {
    margin-bottom: 5px;
}
.reward-list code {
    background: #e5e7eb;
    padding: 3px 6px;
    border-radius: 6px;
    font-weight: 800;
}
.control-note {
    text-align: center;
    color: #555;
    font-size: 14px;
}
button {
    font-weight: 700 !important;
}
"""
# Injected into the page <head>: maps the arrow keys to clicks on the
# on-screen movement buttons (looked up by their elem_id).
KEYBOARD_SCRIPT = """
<script>
(function () {
  // Arrow key -> id of the button that performs the same move.
  const KEY_TO_BUTTON = {
    ArrowUp: "up_btn",
    ArrowRight: "right_btn",
    ArrowDown: "down_btn",
    ArrowLeft: "left_btn",
  };

  function clickButton(id) {
    const container = document.getElementById(id);
    if (!container) return;
    // The id may sit on a wrapper element or on the <button> itself.
    const button = container.querySelector("button") || container;
    button.click();
  }

  function bindKeys() {
    // Guard so that repeated calls register the listener only once.
    if (window.__warehouse_gridworld_keys_bound) return;
    window.__warehouse_gridworld_keys_bound = true;
    document.addEventListener("keydown", function (event) {
      const target = event.target;
      const tag = target && target.tagName;
      // Leave arrow keys alone while the user is editing a form control.
      if (tag === "INPUT" || tag === "TEXTAREA" || tag === "SELECT") {
        return;
      }
      if (target && target.isContentEditable) {
        return;
      }
      if (!Object.prototype.hasOwnProperty.call(KEY_TO_BUTTON, event.key)) {
        return;
      }
      // Stop the page from scrolling on arrow keys.
      event.preventDefault();
      clickButton(KEY_TO_BUTTON[event.key]);
    });
  }

  if (document.readyState === "loading") {
    document.addEventListener("DOMContentLoaded", bindKeys);
  } else {
    bindKeys();
  }
  setTimeout(bindKeys, 1000);
})();
</script>
"""
# ============================================================
# Gradio App
# ============================================================
# Page layout and event wiring. The left column holds the grid and the
# movement buttons; the right column holds the size slider, the reset button
# and the scoreboard.
#
# NOTE(review): the non-ASCII glyphs in this section were mis-encoded in the
# source. The arrows are restored as plain arrow characters; the title and
# reset emoji, and the order of the arrows in the control note, are best
# guesses -- confirm against the original file.
with gr.Blocks(
    title="Warehouse GridWorld Game",
    css=APP_CSS,
    head=KEYBOARD_SCRIPT,
) as demo:
    # Per-session environment object; stays None until the first new_game call.
    env_state = gr.State()

    gr.Markdown(
        """
        # 🏗️ Warehouse GridWorld Game
        <p class="subtitle">
        Use the keyboard arrow keys or the on-screen buttons to move the red agent from S to G.
        Obstacles are randomized at approximately 20% density on every reset.
        </p>
        """
    )

    with gr.Row():
        with gr.Column(scale=2):
            grid_output = gr.HTML()
            gr.Markdown(
                """
                <p class="control-note">
                Controls: ↑ ↓ ← → arrow keys, or use the buttons below.
                </p>
                """
            )
            # elem_id values must match the ids used by KEYBOARD_SCRIPT.
            with gr.Row():
                up_btn = gr.Button("↑ Up", elem_id="up_btn")
            with gr.Row():
                left_btn = gr.Button("← Left", elem_id="left_btn")
                down_btn = gr.Button("↓ Down", elem_id="down_btn")
                right_btn = gr.Button("→ Right", elem_id="right_btn")
        with gr.Column(scale=1):
            grid_size = gr.Slider(
                minimum=5,
                maximum=15,
                value=9,
                step=1,
                label="Grid Size",
            )
            reset_btn = gr.Button("🔄 Reset / Randomize Grid", variant="primary")
            scoreboard_output = gr.HTML()

    # Every handler returns (env, grid_html, scoreboard_html).
    board_outputs = [env_state, grid_output, scoreboard_output]

    reset_btn.click(
        fn=new_game,
        inputs=[grid_size],
        outputs=board_outputs,
    )
    for move_button, move_handler in (
        (up_btn, move_up),
        (right_btn, move_right),
        (down_btn, move_down),
        (left_btn, move_left),
    ):
        move_button.click(
            fn=move_handler,
            inputs=[env_state],
            outputs=board_outputs,
        )
    # Start a game automatically when the page loads.
    demo.load(
        fn=new_game,
        inputs=[grid_size],
        outputs=board_outputs,
    )

if __name__ == "__main__":
    demo.launch()