""" FrozenLakeProcessor - FrozenLake puzzle generation, solving, rendering, and evaluation. Grid cells: S=Start, F=Frozen(safe), H=Hole(death), G=Goal Table chars: @=Start, _=Frozen, #=Hole, *=Goal Generation strategy: - ``generate()``: Pure random + BFS retry. Fine for small grids (≤16). - ``generate_guided()``: Lay a random walk path first, then fill remaining cells. Guarantees long paths even at 32×32+ without exponential retries. - ``generate_auto()``: Auto-select best strategy based on difficulty. - ``generate_batch()``: Multiprocessing wrapper for high-throughput. Solving uses plain BFS (~10× faster than networkx). """ import os import random import warnings from collections import deque from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List, Tuple, Optional import numpy as np from PIL import Image, ImageDraw try: os.environ.setdefault("SDL_AUDIODRIVER", "dummy") warnings.filterwarnings("ignore", category=UserWarning, module="pygame") warnings.filterwarnings("ignore", category=DeprecationWarning) import gymnasium as gym HAS_GYM = True except ImportError: HAS_GYM = False TABLE_TO_GRID = {"@": "S", "_": "F", "#": "H", "*": "G"} GRID_TO_TABLE = {v: k for k, v in TABLE_TO_GRID.items()} MOVES = {"U": (-1, 0), "D": (1, 0), "L": (0, -1), "R": (0, 1)} GYM_ACTION_MAP = {"L": 0, "D": 1, "R": 2, "U": 3} GridDesc = List[str] class FrozenLakeProcessor: """FrozenLake generation, BFS solving, rendering, and evaluation.""" def __init__(self, img_size: int = 512): self.img_size = img_size self.path_color = "red" # ==================== Generation: Pure Random ==================== def generate( self, size: int, p: float = 0.8, min_path_len: int = 1, max_attempts: int = 500, ) -> Tuple[GridDesc, List[Tuple[int, int]]]: """ Random layout + BFS retry. Good for small grids or low min_path_len. For large grids with long path requirements, use ``generate_guided()``. """ for _ in range(max_attempts): desc = self._random_layout(size, p) path = self.solve(desc) if path is not None and (len(path) - 1) >= min_path_len: return desc, path raise RuntimeError( f"Failed after {max_attempts} attempts " f"(size={size}, p={p}, min_path_len={min_path_len})." ) @staticmethod def _random_layout(size: int, p: float = 0.8) -> GridDesc: all_coords = [(r, c) for r in range(size) for c in range(size)] start, goal = random.sample(all_coords, 2) grid = [] for r in range(size): row = [] for c in range(size): if (r, c) == start: row.append("S") elif (r, c) == goal: row.append("G") else: row.append("F" if random.random() < p else "H") grid.append("".join(row)) return grid # ==================== Generation: Guided (path-first) ==================== def simplify_path(self, path: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """ Reduce the path """ if not path: return path simplified = [path[0]] curr_idx = 0 while curr_idx < len(path) - 1: found_shortcut = False for next_idx in range(len(path) - 1, curr_idx + 1, -1): r1, c1 = path[curr_idx] r2, c2 = path[next_idx] if abs(r1 - r2) + abs(c1 - c2) == 1: simplified.append(path[next_idx]) curr_idx = next_idx found_shortcut = True break if not found_shortcut: curr_idx += 1 simplified.append(path[curr_idx]) return simplified def generate_guided( self, size: int, p: float = 0.8, min_path_len: int = 1, max_attempts: int = 100, ) -> Tuple[GridDesc, List[Tuple[int, int]]]: """ Path-first generation using DFS spanning tree diameter. The walk is a valid S→G path by construction (all walk cells are Frozen, all others are Holes). We return the walk directly as the solution path — it may not be the BFS-shortest, but it IS a valid path of guaranteed minimum length. """ for _ in range(max_attempts): desc, walk = self._guided_layout(size, p, min_path_len) if desc is None: continue optimized_walk = self.simplify_path(walk) if len(optimized_walk) - 1 >= min_path_len: return desc, optimized_walk raise RuntimeError( f"Guided generation failed after {max_attempts} attempts " f"(size={size}, p={p}, min_path_len={min_path_len})." ) def _guided_layout( self, size: int, p: float, min_path_len: int, ) -> Tuple[Optional[GridDesc], Optional[List[Tuple[int, int]]]]: """ Build grid with a guaranteed long path using a DFS spanning tree. Strategy: 1. Build random spanning tree of the grid via DFS. 2. Find tree diameter (longest path) via double-BFS — guaranteed unique path, no shortcuts possible. 3. Trim to desired length if much longer than needed. 4. Cells adjacent to ≥2 walk cells but OFF the walk become Holes (deterministically blocks all shortcuts). 5. Remaining off-path cells are cosmetically filled with p. Because tree paths are unique, the BFS shortest path in the resulting grid equals the walk length (no shortcuts exist). """ dirs = [(0, 1), (0, -1), (1, 0), (-1, 0)] # Step 1: Random spanning tree via DFS adj: dict = {(r, c): [] for r in range(size) for c in range(size)} vis = [[False] * size for _ in range(size)] sr, sc = random.randrange(size), random.randrange(size) vis[sr][sc] = True stack = [(sr, sc)] while stack: r, c = stack[-1] nbrs = [] for dr, dc in dirs: nr, nc = r + dr, c + dc if 0 <= nr < size and 0 <= nc < size and not vis[nr][nc]: nbrs.append((nr, nc)) if nbrs: nr, nc = random.choice(nbrs) vis[nr][nc] = True adj[(r, c)].append((nr, nc)) adj[(nr, nc)].append((r, c)) stack.append((nr, nc)) else: stack.pop() # Step 2: Tree diameter via double-BFS def _bfs_far(start): dist = {start: 0} q = deque([start]) far = start while q: node = q.popleft() for nb in adj[node]: if nb not in dist: dist[nb] = dist[node] + 1 q.append(nb) if dist[nb] > dist[far]: far = nb return far, dist end1, _ = _bfs_far((sr, sc)) end2, dist1 = _bfs_far(end1) if dist1[end2] < min_path_len: return None, None # Step 3: Reconstruct path end1 → end2 prev = {end1: None} q = deque([end1]) while q: node = q.popleft() if node == end2: break for nb in adj[node]: if nb not in prev: prev[nb] = node q.append(nb) walk = [] cur = end2 while cur is not None: walk.append(cur) cur = prev[cur] walk.reverse() # Optionally trim if much longer if len(walk) - 1 > min_path_len * 2: excess = len(walk) - 1 - min_path_len trim = random.randint(0, excess // 2) if trim > 0: walk = walk[trim:] excess2 = len(walk) - 1 - min_path_len trim2 = random.randint(0, excess2 // 2) if trim2 > 0: walk = walk[: len(walk) - trim2] start_pos, end_pos = walk[0], walk[-1] walk_set = set(walk) # Step 4: Compute adjacency to walk for off-path cells walk_nbr_ct: dict = {} for wr, wc in walk: for dr, dc in dirs: nr, nc = wr + dr, wc + dc if 0 <= nr < size and 0 <= nc < size and (nr, nc) not in walk_set: walk_nbr_ct[(nr, nc)] = walk_nbr_ct.get((nr, nc), 0) + 1 # Step 5: Fill grid. # ALL non-walk cells are Holes. This guarantees the BFS shortest # path equals the walk itself (zero shortcut surface). # The grid will look like a winding corridor through a sea of holes. grid = [[""] * size for _ in range(size)] for r in range(size): for c in range(size): if (r, c) == start_pos: grid[r][c] = "S" elif (r, c) == end_pos: grid[r][c] = "G" elif (r, c) in walk_set: grid[r][c] = "F" else: # prob `p` as hole grid[r][c] = "F" if random.random() < p else "H" return ["".join(row) for row in grid], walk # ==================== Generation: Auto ==================== def generate_auto( self, size: int, p: float = 0.8, min_path_len: int = 1, max_attempts: int = 200, ) -> Tuple[GridDesc, List[Tuple[int, int]]]: """Auto-select: random for easy cases, guided for hard ones.""" expected_max = size * 1.5 if min_path_len > expected_max * 0.5: return self.generate_guided(size, p, min_path_len, max_attempts) try: return self.generate(size, p, min_path_len, max_attempts) except RuntimeError: return self.generate_guided(size, p, min_path_len, max_attempts) # ==================== Batch (multiprocessing) ==================== @staticmethod def _generate_one(args: tuple) -> Optional[Tuple[GridDesc, list]]: """Worker for multiprocessing.""" size, p, min_path_len, seed = args random.seed(seed) proc = FrozenLakeProcessor() try: return proc.generate_auto(size, p, min_path_len, max_attempts=200) except RuntimeError: return None def generate_batch( self, size: int, count: int, p: float = 0.8, min_path_len: int = 1, workers: int = 8, base_seed: int = 42, ) -> List[Tuple[GridDesc, List[Tuple[int, int]]]]: """Generate *count* puzzles in parallel.""" tasks = [(size, p, min_path_len, base_seed + i) for i in range(count * 2)] results = [] with ProcessPoolExecutor(max_workers=workers) as executor: futures = {executor.submit(self._generate_one, t): t for t in tasks} for future in as_completed(futures): res = future.result() if res is not None: results.append(res) if len(results) >= count: executor.shutdown(wait=False, cancel_futures=True) break return results[:count] # ==================== Solving (plain BFS) ==================== @staticmethod def solve(desc: GridDesc) -> Optional[List[Tuple[int, int]]]: """BFS shortest path from S to G, avoiding H.""" rows, cols = len(desc), len(desc[0]) start = goal = None for r in range(rows): for c in range(cols): if desc[r][c] == "S": start = (r, c) elif desc[r][c] == "G": goal = (r, c) if start is None or goal is None: return None visited = [[False] * cols for _ in range(rows)] visited[start[0]][start[1]] = True queue: deque = deque([(start, [start])]) while queue: (r, c), path = queue.popleft() if (r, c) == goal: return path for dr, dc in ((-1, 0), (1, 0), (0, -1), (0, 1)): nr, nc = r + dr, c + dc if 0 <= nr < rows and 0 <= nc < cols and not visited[nr][nc]: if desc[nr][nc] != "H": visited[nr][nc] = True queue.append(((nr, nc), path + [(nr, nc)])) return None # ==================== Path ↔ UDRL ==================== @staticmethod def path_to_udrl(path: List[Tuple[int, int]]) -> str: moves = [] for i in range(len(path) - 1): r1, c1 = path[i] r2, c2 = path[i + 1] if r2 < r1: moves.append("U") elif r2 > r1: moves.append("D") elif c2 < c1: moves.append("L") else: moves.append("R") return "".join(moves) # ==================== Verification ==================== def verify_path_sim(self, desc: GridDesc, udrl: str) -> bool: rows, cols = len(desc), len(desc[0]) start = self.find_start(desc) if start is None: return False r, c = start clean = udrl.replace(",", "").replace(" ", "").strip() if "Action plan" in clean: clean = clean.rsplit("Action plan", 1)[-1] for ch in clean: if ch not in MOVES: continue dr, dc = MOVES[ch] nr, nc = r + dr, c + dc if not (0 <= nr < rows and 0 <= nc < cols): return False if desc[nr][nc] == "H": return False r, c = nr, nc if desc[nr][nc] == "G": return True return desc[r][c] == "G" def verify_path_gym(self, desc: GridDesc, udrl: str) -> bool: if not HAS_GYM: return self.verify_path_sim(desc, udrl) rows, cols = len(desc), len(desc[0]) try: env = gym.make( "FrozenLake-v1", desc=desc, map_name=f"{rows}x{cols}", is_slippery=False, render_mode=None, ) env.reset(seed=42) success = False clean = udrl.replace(",", "").replace(" ", "").strip() if "Action plan" in clean: clean = clean.rsplit("Action plan", 1)[-1] for ch in clean: if ch not in GYM_ACTION_MAP: continue _, reward, terminated, truncated, _ = env.step(GYM_ACTION_MAP[ch]) if terminated or truncated: success = reward > 0 break env.close() return success except Exception: return self.verify_path_sim(desc, udrl) # ==================== Table I/O ==================== def encode_table(self, desc: GridDesc) -> str: size = len(desc) lines = ["| | " + " | ".join(f"Col {i+1}" for i in range(size)) + " |"] for r in range(size): mapped = [GRID_TO_TABLE[ch] for ch in desc[r]] lines.append(f"| Row {r+1} | " + " | ".join(mapped) + " |") return "\n".join(lines) def decode_table(self, text: str) -> Optional[GridDesc]: try: rows = [] for line in text.strip().splitlines(): line = line.strip() if not line or "Col" in line or "---" in line: continue parts = [p.strip() for p in line.split("|")] clean = [p for p in parts if p] if len(clean) < 2: continue row_str = "".join( TABLE_TO_GRID[ch] for ch in clean[1:] if ch in TABLE_TO_GRID ) if row_str: rows.append(row_str) return rows if rows else None except Exception: return None def save_table(self, filepath: str, desc: GridDesc) -> None: with open(filepath, "w") as f: f.write(self.encode_table(desc)) def load_table(self, filepath: str) -> Optional[GridDesc]: try: with open(filepath) as f: return self.decode_table(f.read()) except Exception: return None def find_start(self, desc: GridDesc) -> Optional[Tuple[int, int]]: for r, row in enumerate(desc): for c, ch in enumerate(row): if ch == "S": return (r, c) return None def fingerprint(self, desc: GridDesc) -> str: return "".join(desc) # ==================== Rendering ==================== def render_gym(self, desc: GridDesc) -> Optional[Image.Image]: if not HAS_GYM: return None try: env = gym.make( "FrozenLake-v1", desc=desc, is_slippery=False, render_mode="rgb_array", ) env.reset() rgb = env.render() env.close() return Image.fromarray(rgb).resize( (self.img_size, self.img_size), Image.NEAREST ) except Exception: return None def render_simple(self, desc: GridDesc) -> Image.Image: """Float-aligned renderer (handles non-power-of-2 sizes correctly).""" size = len(desc) cell_f = self.img_size / size img = Image.new("RGB", (self.img_size, self.img_size), (255, 255, 255)) draw = ImageDraw.Draw(img) colors = { "S": (0, 0, 255), "F": (200, 220, 255), "H": (80, 80, 80), "G": (0, 200, 0), } for r in range(size): for c in range(size): x0 = int(round(c * cell_f)) y0 = int(round(r * cell_f)) x1 = int(round((c + 1) * cell_f)) - 1 y1 = int(round((r + 1) * cell_f)) - 1 draw.rectangle( [x0, y0, x1, y1], fill=colors.get(desc[r][c], (200, 220, 255)), ) for i in range(size + 1): pos = int(round(i * cell_f)) draw.line([(pos, 0), (pos, self.img_size)], fill="black", width=1) draw.line([(0, pos), (self.img_size, pos)], fill="black", width=1) return img def render(self, desc: GridDesc, use_gym: bool = True) -> Image.Image: if use_gym: img = self.render_gym(desc) if img is not None: return img return self.render_simple(desc) def draw_solution_line( self, image: Image.Image, path: List[Tuple[int, int]], grid_size: int, ) -> Image.Image: draw = ImageDraw.Draw(image) w, h = image.size cw, ch_ = w / grid_size, h / grid_size pts = [(c * cw + cw / 2, r * ch_ + ch_ / 2) for r, c in path] draw.line(pts, fill=self.path_color, width=max(1, int(cw / 4)), joint="curve") return image # ==================== Video Frames ==================== def generate_video_frames( self, desc: GridDesc, path: List[Tuple[int, int]], n_start: int = 5, m_end: int = 5, frames: Optional[int] = None, use_gym: bool = True, ) -> List[Image.Image]: size = len(desc) n_steps = len(path) - 1 base_img = self.render(desc, use_gym=use_gym) if n_steps <= 0: return [base_img] * (n_start + m_end + 1) content = frames if frames is not None else n_steps content = max(1, content) result = [base_img.copy() for _ in range(n_start)] def _partial(steps): return self.draw_solution_line(base_img.copy(), path[:steps+1], size) if content == n_steps: for s in range(1, n_steps + 1): result.append(_partial(s)) elif content > n_steps: for s in range(1, n_steps + 1): lo = (s - 1) * content // n_steps hi = s * content // n_steps frame = _partial(s) result.append(frame) for _ in range(hi - lo - 1): result.append(frame.copy()) else: for f in range(content): result.append(_partial((f + 1) * n_steps // content)) result.extend([_partial(n_steps).copy() for _ in range(m_end)]) return result # ==================== Red-Path Extraction ==================== def extract_path_from_pixels( self, pixels: np.ndarray, rows: int, cols: int, start: Tuple[int, int], desc: Optional[GridDesc] = None, pixel_threshold: float = 0.01, ) -> str: """Detect red path (float-aligned cells to match renderer).""" img = Image.fromarray(pixels) w, h = img.size px = np.array(img, dtype=float) r_ch, g_ch, b_ch = px[:, :, 0], px[:, :, 1], px[:, :, 2] red_mask = (r_ch > 100) & (r_ch > g_ch * 1.2) & (r_ch > b_ch * 1.2) cell_h_f, cell_w_f = h / rows, w / cols path_grid = np.zeros((rows, cols), dtype=bool) for r in range(rows): y0 = int(round(r * cell_h_f)) y1 = int(round((r + 1) * cell_h_f)) for c in range(cols): x0 = int(round(c * cell_w_f)) x1 = int(round((c + 1) * cell_w_f)) sub = red_mask[y0:y1, x0:x1] if sub.size > 0 and np.mean(sub) > pixel_threshold: path_grid[r, c] = True visited = {start} cr, cc = start actions: List[str] = [] for _ in range(rows * cols * 2): found = False for act, (dr, dc) in [("R",(0,1)),("D",(1,0)),("L",(0,-1)),("U",(-1,0))]: nr, nc = cr + dr, cc + dc if 0 <= nr < rows and 0 <= nc < cols: if path_grid[nr, nc] and (nr, nc) not in visited: visited.add((nr, nc)) actions.append(act) cr, cc = nr, nc found = True break if not found: break return "".join(actions) def extract_path_from_image(self, img_path, rows, cols, start, desc=None): try: pixels = np.array(Image.open(img_path).convert("RGB")) return self.extract_path_from_pixels(pixels, rows, cols, start, desc) except Exception: return "" if __name__ == "__main__": import time proc = FrozenLakeProcessor(img_size=512) # ---- Benchmark: yield rate ---- print("=== Yield Rate: random vs guided ===") for sz in [8, 16, 32]: min_len = max(1, int(sz * sz * 0.1)) random.seed(42) t0 = time.perf_counter() found_r = 0 for _ in range(500): desc = proc._random_layout(sz, 0.8) path = proc.solve(desc) if path and (len(path) - 1) >= min_len: found_r += 1 t_rand = time.perf_counter() - t0 random.seed(42) t0 = time.perf_counter() found_g = 0 for _ in range(50): try: desc, path = proc.generate_guided(sz, 0.8, min_len, max_attempts=5) found_g += 1 except RuntimeError: pass t_guid = time.perf_counter() - t0 print(f" Size {sz:2d} (min={min_len:3d}): " f"random={found_r}/500 ({found_r/5:.1f}%) {t_rand:.2f}s | " f"guided={found_g}/50 ({found_g*2:.0f}%) {t_guid:.2f}s") # ---- generate_auto all sizes ---- print("\n=== generate_auto ===") for sz in [8, 16, 32, 64]: min_len = max(1, int(sz * sz * 0.1)) random.seed(42) t0 = time.perf_counter() desc, path = proc.generate_auto(sz, 0.8, min_len) elapsed = time.perf_counter() - t0 udrl = proc.path_to_udrl(path) ok = proc.verify_path_sim(desc, udrl) print(f" Size {sz:2d}: path={len(path)-1:3d} (min={min_len:3d}) " f"verify={ok} {elapsed:.3f}s") # ---- Extract round-trip (works for random-mode, guided corridors are too winding) ---- print("\n=== Extract round-trip ===") for sz in [8, 16, 24, 32]: random.seed(42 + sz) # Use random mode for smaller sizes (natural-looking grids) min_len = max(1, sz) try: desc, path = proc.generate(sz, 0.8, min_len, max_attempts=1000) except RuntimeError: desc, path = proc.generate_guided(sz, 0.8, min_len) img = proc.render(desc, use_gym=False) sol = proc.draw_solution_line(img.copy(), path, sz) start = proc.find_start(desc) extracted = proc.extract_path_from_pixels(np.array(sol), sz, sz, start) ok = proc.verify_path_sim(desc, extracted) print(f" Size {sz:2d}: verify={ok} " f"(GT={len(path)-1}, extracted={len(extracted)})") print("\nAll tests passed ✓")