# app.py # Gridworld RL (Q-learning) with: # ✅ Original visualization + layout for the demo section (unchanged) # ✅ Non-flickering learning curve (always visible) # ✅ Obstacle density slider (auto-generate more/less blocks) # ✅ Train uses epsilon decay (converges); Play shows deterministic route (epsilon=0) # ✅ Same obstacle layout is reused for Play (stored in state) # ✅ Styling (Option 2): dark background + calmer amber/orange accents # ✅ Header: text LEFT, photo RIGHT # ✅ Removed the extra RL-description block (as you requested earlier) import time import numpy as np import gradio as gr import matplotlib.pyplot as plt from matplotlib.patches import Rectangle, FancyBboxPatch from io import BytesIO from PIL import Image from collections import deque # ========================================================= # 🎨 CUSTOM CSS (Option 2: calm industrial robotics) # ========================================================= CUSTOM_CSS = """ body { background: radial-gradient(circle at top left, #1b2b4a 0%, #050814 45%, #02030a 100%); color: #e7f3ff; } .gradio-container { max-width: 1100px !important; margin: 0 auto !important; padding-top: 28px; font-family: system-ui, -apple-system, BlinkMacSystemFont, "SF Pro Text", "Segoe UI", sans-serif; } /* Headings */ h1, h2, h3 { color: #ffd27d; letter-spacing: 0.04em; } /* Text */ p, li, .md p { color: #d6e6ff; } /* Labels */ label { color: #ffddaa !important; text-transform: uppercase; font-size: 0.75rem !important; letter-spacing: 0.12em; } /* Inputs */ textarea, input[type="text"] { background: rgba(2, 8, 26, 0.92) !important; border-radius: 16px !important; border: 1px solid rgba(255, 190, 100, 0.35) !important; color: #e7f3ff !important; } /* Sliders */ input[type="range"] { accent-color: #ffb347; } /* Buttons */ button { border-radius: 999px !important; font-weight: 600 !important; } button.primary { background: linear-gradient(90deg, #ffb347, #ffcc80) !important; color: #1a0f02 !important; border: none !important; box-shadow: 0 0 16px rgba(255, 180, 80, 0.40); } button.secondary { background: rgba(12, 20, 40, 0.92) !important; color: #ffd9a0 !important; border: 1px solid rgba(255, 200, 120, 0.35) !important; } /* Accordions / panels - keep subtle */ .gr-accordion, .gr-box, .gr-panel, .gr-group { background: radial-gradient(circle at top left, rgba(255, 200, 120, 0.06), rgba(4, 9, 29, 0.98)) !important; border: 1px solid rgba(255, 200, 120, 0.18) !important; border-radius: 18px !important; box-shadow: 0 0 18px rgba(255, 180, 80, 0.10); } /* Image containers - do not affect the pixels */ img { border-radius: 16px; } """ # ----------------------------- # Gridworld Environment # ----------------------------- ACTIONS = ["↑", "→", "↓", "←"] ACTION_DELTAS = { 0: (-1, 0), # up 1: (0, 1), # right 2: (1, 0), # down 3: (0, -1), # left } def _neighbors(r, c, n): if r > 0: yield (r - 1, c) if r < n - 1: yield (r + 1, c) if c > 0: yield (r, c - 1) if c < n - 1: yield (r, c + 1) def _has_path(size, start, goal, blocked): """BFS to ensure there's at least one safe path from start to goal.""" q = deque([start]) seen = {start} while q: cur = q.popleft() if cur == goal: return True r, c = cur for nr, nc in _neighbors(r, c, size): nxt = (nr, nc) if nxt in seen or nxt in blocked: continue seen.add(nxt) q.append(nxt) return False def generate_obstacles(size, start, goal, density, wall_ratio=0.7, max_tries=60, rng=None): """ Generate walls + lava with a given density, retrying until there is a safe path. Lava is treated as blocked (terminal negative), so we keep at least one safe route. """ rng = rng or np.random.default_rng() density = float(np.clip(density, 0.0, 0.60)) cur_density = density for _ in range(max_tries): walls = set() lava = set() for r in range(size): for c in range(size): cell = (r, c) if cell == start or cell == goal: continue if rng.random() < cur_density: if rng.random() < wall_ratio: walls.add(cell) else: lava.add(cell) blocked = walls | lava if _has_path(size, start, goal, blocked): return walls, lava cur_density = max(0.0, cur_density - 0.02) return set(), set() class GridWorld: def __init__(self, size=5, start=(0, 0), goal=None, lava=None, walls=None): self.size = int(size) self.start = start self.goal = goal if goal is not None else (self.size - 1, self.size - 1) self.lava = set(lava or []) self.walls = set(walls or []) self.reset() def reset(self): self.pos = self.start return self.pos def step(self, action): dr, dc = ACTION_DELTAS[action] r, c = self.pos nr, nc = r + dr, c + dc # bounds check if nr < 0 or nr >= self.size or nc < 0 or nc >= self.size: nr, nc = r, c # wall check if (nr, nc) in self.walls: nr, nc = r, c self.pos = (nr, nc) # rewards if self.pos == self.goal: return self.pos, +10.0, True if self.pos in self.lava: return self.pos, -10.0, True return self.pos, -0.1, False # step penalty -> shortest safe path is optimal # ----------------------------- # Q-Learning Agent # ----------------------------- class QAgent: def __init__(self, size=5, n_actions=4, alpha=0.3, gamma=0.95): self.size = int(size) self.n_actions = n_actions self.alpha = float(alpha) self.gamma = float(gamma) self.Q = np.zeros((self.size, self.size, n_actions), dtype=np.float32) def act(self, state, epsilon): r, c = state if np.random.rand() < float(epsilon): return np.random.randint(self.n_actions) return int(np.argmax(self.Q[r, c])) def act_greedy(self, state): r, c = state return int(np.argmax(self.Q[r, c])) def update(self, s, a, r, s2, done): r1, c1 = s r2, c2 = s2 best_next = 0.0 if done else float(np.max(self.Q[r2, c2])) td_target = r + self.gamma * best_next td_error = td_target - float(self.Q[r1, c1, a]) self.Q[r1, c1, a] += self.alpha * td_error # ----------------------------- # Rendering helpers (ORIGINAL look) # ----------------------------- def fig_to_pil(fig): buf = BytesIO() fig.savefig(buf, format="png", dpi=160, bbox_inches="tight") plt.close(fig) buf.seek(0) return Image.open(buf) def draw_grid(env: GridWorld, agent: QAgent = None, show_q=False, episode=None, step_i=None, total_reward=None): n = env.size fig, ax = plt.subplots(figsize=(5.4, 5.4)) ax.set_xlim(0, n) ax.set_ylim(0, n) ax.set_aspect("equal") ax.axis("off") # Background (keep original) ax.add_patch(Rectangle((0, 0), n, n, facecolor="#0b1020")) # Draw cells for r in range(n): for c in range(n): x, y = c, n - 1 - r # invert y so (0,0) is top-left visually tile_color = "#121a33" if (r, c) == env.goal: tile_color = "#0f2f1f" if (r, c) in env.lava: tile_color = "#3a1414" if (r, c) in env.walls: tile_color = "#1b1b1b" ax.add_patch( FancyBboxPatch( (x + 0.05, y + 0.05), 0.9, 0.9, boxstyle="round,pad=0.02,rounding_size=0.08", linewidth=1.0, edgecolor="#2a355f", facecolor=tile_color, alpha=0.95 ) ) # overlay Q hint (optional) if show_q and agent is not None and (r, c) not in env.walls: best_a = int(np.argmax(agent.Q[r, c])) qv = float(np.max(agent.Q[r, c])) ax.text(x + 0.5, y + 0.55, ACTIONS[best_a], ha="center", va="center", fontsize=14, color="#d7e3ff", alpha=0.65) ax.text(x + 0.5, y + 0.30, f"{qv:+.2f}", ha="center", va="center", fontsize=9, color="#a9b7e6", alpha=0.55) # Icons def put_icon(rc, icon, color="#ffffff"): r, c = rc x, y = c + 0.5, (n - 1 - r) + 0.5 ax.text(x, y, icon, ha="center", va="center", fontsize=22, color=color) put_icon(env.goal, "🏁") for rc in env.lava: put_icon(rc, "🔥") for rc in env.walls: put_icon(rc, "🧱") # Agent put_icon(env.pos, "🤖") # Header overlay title = "Gridworld RL • Q-learning" sub = [] if episode is not None: sub.append(f"Episode: {episode}") if step_i is not None: sub.append(f"Step: {step_i}") if total_reward is not None: sub.append(f"Return: {total_reward:+.2f}") subtitle = " • ".join(sub) ax.text(0, n + 0.35, title, fontsize=14, color="#eaf0ff", weight="bold") ax.text(0, n + 0.08, subtitle, fontsize=10, color="#b8c6ff", alpha=0.9) return fig_to_pil(fig) # ----------------------------- # Learning curve chart (no flicker) # ----------------------------- def moving_average(x, window=25): if len(x) < 2: return np.array(x, dtype=float) w = max(2, min(int(window), len(x))) kernel = np.ones(w) / w return np.convolve(np.array(x, dtype=float), kernel, mode="valid") def draw_learning_curve(returns, successes, window=25): fig, ax = plt.subplots(figsize=(5.4, 4.6)) ax.set_facecolor("#0b1020") for spine in ax.spines.values(): spine.set_color("#2a355f") ax.tick_params(colors="#c9d6ff") ax.yaxis.label.set_color("#c9d6ff") ax.xaxis.label.set_color("#c9d6ff") ax.title.set_color("#eaf0ff") ax.set_title("Learning curve") ax.set_xlabel("Episode") ax.set_ylabel("Return") if len(returns) > 0: xs = np.arange(1, len(returns) + 1) ax.plot(xs, returns, linewidth=1.5, alpha=0.9, label="Return") ma = moving_average(returns, window=window) if len(ma) > 0: xs_ma = np.arange(len(returns) - len(ma) + 1, len(returns) + 1) ax.plot(xs_ma, ma, linewidth=2.5, alpha=0.95, label=f"Moving avg ({min(int(window), len(returns))})") ax2 = ax.twinx() ax2.tick_params(colors="#c9d6ff") ax2.spines["right"].set_color("#2a355f") ax2.set_ylabel("Success rate", color="#c9d6ff") if len(successes) > 0: xs = np.arange(1, len(successes) + 1) sr = np.cumsum(np.array(successes, dtype=float)) / xs ax2.plot(xs, sr, linewidth=2.0, alpha=0.8, label="Success rate") lines, labels = ax.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels() ax.legend(lines + lines2, labels + labels2, loc="lower right", framealpha=0.2) ax.grid(True, alpha=0.15) return fig_to_pil(fig) # ----------------------------- # Training + Playback (store env layout so Play matches Train) # ----------------------------- def make_env_and_agent(grid_size, obstacle_density, alpha, gamma): size = int(grid_size) start = (0, 0) goal = (size - 1, size - 1) rng = np.random.default_rng() walls, lava = generate_obstacles(size, start, goal, density=float(obstacle_density), wall_ratio=0.7, rng=rng) env = GridWorld(size=size, start=start, goal=goal, walls=walls, lava=lava) agent = QAgent(size=size, alpha=alpha, gamma=gamma) env_state = { "size": size, "start": start, "goal": goal, "walls": sorted(list(walls)), "lava": sorted(list(lava)), } return env, agent, env_state def train_stream( grid_size, obstacle_density, alpha, gamma, eps_start, eps_end, eps_decay, episodes, max_steps, speed, show_q_overlay, curve_window, ): env, agent, env_state = make_env_and_agent(grid_size, obstacle_density, alpha, gamma) eps = float(eps_start) returns = [] successes = [] # initial frame = draw_grid(env, agent, show_q=show_q_overlay, episode=0, step_i=0, total_reward=0.0) last_curve = draw_learning_curve(returns, successes, window=int(curve_window)) status = f"Kies density en klik Train. (Obstacle density={float(obstacle_density):.2f})" yield frame, last_curve, agent.Q, env_state, status CURVE_UPDATE_EVERY_STEPS = 8 for ep in range(1, int(episodes) + 1): s = env.reset() total_r = 0.0 reached_goal_this_ep = 0 for t in range(1, int(max_steps) + 1): a = agent.act(s, epsilon=eps) s2, r, done = env.step(a) agent.update(s, a, r, s2, done) s = s2 total_r += r if done and env.pos == env.goal: reached_goal_this_ep = 1 if (t % CURVE_UPDATE_EVERY_STEPS == 0) or done: preview_returns = returns + [total_r] preview_successes = successes + [reached_goal_this_ep] last_curve = draw_learning_curve(preview_returns, preview_successes, window=int(curve_window)) frame = draw_grid(env, agent, show_q=show_q_overlay, episode=ep, step_i=t, total_reward=total_r) status = f"Train • ep {ep}/{episodes} • step {t}/{max_steps} • return {total_r:+.2f} • eps {eps:.3f}" yield frame, last_curve, agent.Q, env_state, status if speed > 0: time.sleep(float(speed)) if done: break returns.append(total_r) successes.append(reached_goal_this_ep) last_curve = draw_learning_curve(returns, successes, window=int(curve_window)) yield frame, last_curve, agent.Q, env_state, f"Episode {ep} klaar • return {total_r:+.2f} • success={reached_goal_this_ep} • eps {eps:.3f}" eps = max(float(eps_end), eps * float(eps_decay)) frame = draw_grid(env, agent, show_q=show_q_overlay, episode=episodes, step_i=None, total_reward=None) last_curve = draw_learning_curve(returns, successes, window=int(curve_window)) status = "Training klaar ✅ Klik nu op ‘Play learned policy’." yield frame, last_curve, agent.Q, env_state, status def play_stream(q_table, env_state, speed, show_q_overlay, max_steps): if q_table is None or env_state is None: env = GridWorld(size=5, start=(0, 0), goal=(4, 4), walls=[], lava=[]) agent = QAgent(size=5) frame = draw_grid(env, agent, show_q=show_q_overlay, episode=None, step_i=None, total_reward=None) curve = draw_learning_curve([], [], window=25) yield frame, curve, "Nog geen training gedaan. Klik eerst op Train." return size = int(env_state["size"]) start = tuple(env_state["start"]) goal = tuple(env_state["goal"]) walls = [tuple(x) for x in env_state["walls"]] lava = [tuple(x) for x in env_state["lava"]] env = GridWorld(size=size, start=start, goal=goal, walls=walls, lava=lava) agent = QAgent(size=size) agent.Q = np.array(q_table, dtype=np.float32) s = env.reset() total_r = 0.0 curve = draw_learning_curve([], [], window=25) # keep curve visible (static) during play frame = draw_grid(env, agent, show_q=show_q_overlay, episode="PLAY", step_i=0, total_reward=total_r) yield frame, curve, "Play • epsilon=0.0 (deterministisch)" for t in range(1, int(max_steps) + 1): a = agent.act_greedy(s) s2, r, done = env.step(a) s = s2 total_r += r frame = draw_grid(env, agent, show_q=show_q_overlay, episode="PLAY", step_i=t, total_reward=total_r) yield frame, curve, f"Play • step {t}/{max_steps} • return {total_r:+.2f}" if speed > 0: time.sleep(float(speed)) if done: break if env.pos == env.goal: end = f"🏁 Goal bereikt! return {total_r:+.2f}" elif env.pos in env.lava: end = "🔥 In lava beland. Tip: train langer of zet density lager." else: end = "Play klaar. Tip: train langer of max_steps omhoog." frame = draw_grid(env, agent, show_q=show_q_overlay, episode="PLAY", step_i=None, total_reward=total_r) yield frame, curve, end # ----------------------------- # Gradio UI (layout stays the same) # ----------------------------- with gr.Blocks(css=CUSTOM_CSS, theme=gr.themes.Soft(), title="Warehouse Robot RL Demo") as demo: # Header: text LEFT, image RIGHT (as you requested) with gr.Row(): with gr.Column(scale=3): gr.Markdown( """ ### 🤖 Een robot in het magazijn Stel je voor: je werkt in een groot magazijn. Tussen de stellingen rijdt een robot rond die bestellingen moet ophalen en naar het inpakstation brengen. Die robot krijgt geen kaart, geen regels en geen instructies over wat de snelste route is. In deze demo zie je hoe zo’n robot zelf leert wat slim gedrag is. In het begin rijdt hij willekeurig rond en maakt hij fouten. Maar naarmate hij meer ervaring opdoet, ontdekt hij vanzelf: hoe hij veilig, efficiënt en zo snel mogelijk door het magazijn kan bewegen. Boven zie je de robot rijden tussen stellingen en gevaarlijke zones. Onder zie je hoe zijn prestaties verbeteren naarmate hij leert. 👉 Probeer het zelf: maak het magazijn makkelijker of moeilijker, train de robot, en laat daarna zien wat hij geleerd heeft. """ ) with gr.Column(scale=2): gr.Image( value="humanoid-robot-apptronic-1024x684.jpg.webp", show_label=False, height=340, ) # ---- Demo section (unchanged) ---- q_state = gr.State(None) env_state = gr.State(None) with gr.Row(): with gr.Column(scale=1): grid_size = gr.Slider(4, 10, value=5, step=1, label="Grid size") obstacle_density = gr.Slider( 0.0, 0.45, value=0.15, step=0.05, label="Obstacle density (meer blokken/gevaar)" ) with gr.Accordion("RL parameters (defaults = goede convergentie)", open=True): alpha = gr.Slider(0.01, 1.0, value=0.45, step=0.01, label="Alpha (learning rate)") gamma = gr.Slider(0.0, 0.999, value=0.97, step=0.001, label="Gamma (discount)") with gr.Accordion("Exploration (epsilon decay)", open=True): eps_start = gr.Slider(0.0, 1.0, value=0.90, step=0.01, label="Epsilon start (veel explore)") eps_end = gr.Slider(0.0, 0.2, value=0.02, step=0.005, label="Epsilon end (bijna greedy)") eps_decay = gr.Slider(0.90, 0.999, value=0.985, step=0.001, label="Epsilon decay per episode") episodes = gr.Slider(1, 400, value=200, step=1, label="Episodes") max_steps_train = gr.Slider(5, 200, value=60, step=1, label="Max steps per episode") with gr.Accordion("Visuals & snelheid", open=True): speed = gr.Slider(0.0, 0.3, value=0.02, step=0.01, label="Animatie vertraging (sec/frame)") show_q_overlay = gr.Checkbox(value=True, label="Toon beste actie & Q-waarde per vakje (overlay)") curve_window = gr.Slider(5, 80, value=25, step=1, label="Moving average window (episodes)") with gr.Row(): train_btn = gr.Button("🚀 Train (epsilon decay)", variant="primary") play_btn = gr.Button("▶️ Play learned policy (epsilon=0)") status = gr.Textbox(label="Status", value="Kies density en klik Train.", interactive=False) with gr.Column(scale=1): frame_out = gr.Image(label="Live animatie", type="pil", height=520) curve_out = gr.Image(label="Learning curve (live)", type="pil", height=420) train_btn.click( fn=train_stream, inputs=[ grid_size, obstacle_density, alpha, gamma, eps_start, eps_end, eps_decay, episodes, max_steps_train, speed, show_q_overlay, curve_window ], outputs=[frame_out, curve_out, q_state, env_state, status], ) play_btn.click( fn=play_stream, inputs=[q_state, env_state, speed, show_q_overlay, max_steps_train], outputs=[frame_out, curve_out, status], ) if __name__ == "__main__": demo.launch()