# replayproof_sim.py from __future__ import annotations import hashlib from dataclasses import dataclass from typing import Dict, Any, List, Tuple, Optional import numpy as np from PIL import Image, ImageDraw # Tile encoding T_UNKNOWN = -1 T_EMPTY = 0 T_WALL = 1 T_COIN = 2 T_HAZARD = 3 T_GOAL = 4 T_AGENT = 5 ACTIONS = ["UP", "DOWN", "LEFT", "RIGHT", "WAIT"] @dataclass class SimConfig: size: int = 12 walls_pct: float = 0.18 coins: int = 5 hazards: int = 4 pov_radius: int = 4 max_steps: int = 2000 def to_dict(self) -> Dict[str, Any]: return { "size": int(self.size), "walls_pct": float(self.walls_pct), "coins": int(self.coins), "hazards": int(self.hazards), "pov_radius": int(self.pov_radius), "max_steps": int(self.max_steps), } @dataclass class SimState: cfg: SimConfig seed: int rng_state_tag: int # lightweight tag to pin reset RNG usage deterministically grid: np.ndarray # int8 (N,N) tiles excluding agent overlay agent_xy: Tuple[int, int] goal_xy: Tuple[int, int] score: int step: int done: bool last_state_sha256: Optional[str] = None def clone(self) -> "SimState": return SimState( cfg=self.cfg, seed=int(self.seed), rng_state_tag=int(self.rng_state_tag), grid=self.grid.copy(), agent_xy=(int(self.agent_xy[0]), int(self.agent_xy[1])), goal_xy=(int(self.goal_xy[0]), int(self.goal_xy[1])), score=int(self.score), step=int(self.step), done=bool(self.done), last_state_sha256=self.last_state_sha256, ) def _sha256_hex(b: bytes) -> str: return hashlib.sha256(b).hexdigest() def _state_hash(state: SimState) -> str: N = int(state.cfg.size) ax, ay = state.agent_xy gx, gy = state.goal_xy header = np.array( [N, ax, ay, gx, gy, int(state.score), int(state.step), int(state.done), int(state.rng_state_tag)], dtype=np.int32, ).tobytes() grid_bytes = state.grid.astype(np.int8).tobytes() return _sha256_hex(header + grid_bytes) def _in_bounds(N: int, x: int, y: int) -> bool: return 0 <= x < N and 0 <= y < N def reset_sim(cfg: SimConfig, seed: int) -> SimState: rng = np.random.RandomState(int(seed)) N = int(cfg.size) grid = np.zeros((N, N), dtype=np.int8) # Border walls grid[0, :] = T_WALL grid[N - 1, :] = T_WALL grid[:, 0] = T_WALL grid[:, N - 1] = T_WALL # Random internal walls internal = (rng.rand(N, N) < float(cfg.walls_pct)).astype(np.int8) * T_WALL internal[0, :] = 0 internal[N - 1, :] = 0 internal[:, 0] = 0 internal[:, N - 1] = 0 grid = np.maximum(grid, internal).astype(np.int8) # Fixed start/goal agent_xy = (1, 1) goal_xy = (N - 2, N - 2) grid[agent_xy[1], agent_xy[0]] = T_EMPTY grid[goal_xy[1], goal_xy[0]] = T_GOAL # Collect empty cells empties = [ (x, y) for y in range(1, N - 1) for x in range(1, N - 1) if grid[y, x] == T_EMPTY and (x, y) not in (agent_xy, goal_xy) ] rng.shuffle(empties) # Place coins for i in range(min(int(cfg.coins), len(empties))): x, y = empties[i] grid[y, x] = T_COIN # Place hazards start_idx = min(int(cfg.coins), len(empties)) for i in range(start_idx, min(start_idx + int(cfg.hazards), len(empties))): x, y = empties[i] grid[y, x] = T_HAZARD st = SimState( cfg=cfg, seed=int(seed), rng_state_tag=int(rng.randint(0, 2**31 - 1)), grid=grid, agent_xy=agent_xy, goal_xy=goal_xy, score=0, step=0, done=False, last_state_sha256=None, ) st.last_state_sha256 = _state_hash(st) return st def _agent_policy(cfg: SimConfig, state: SimState) -> str: # Deterministic greedy: prefer moves that reduce Manhattan distance to goal, # avoid walls. No randomness, so replay is stable. ax, ay = state.agent_xy gx, gy = state.goal_xy candidates: List[Tuple[str, int, int]] = [] if gx > ax: candidates.append(("RIGHT", ax + 1, ay)) if gx < ax: candidates.append(("LEFT", ax - 1, ay)) if gy > ay: candidates.append(("DOWN", ax, ay + 1)) if gy < ay: candidates.append(("UP", ax, ay - 1)) # Fallback order (still deterministic) candidates += [ ("UP", ax, ay - 1), ("DOWN", ax, ay + 1), ("LEFT", ax - 1, ay), ("RIGHT", ax + 1, ay), ("WAIT", ax, ay), ] N = int(cfg.size) for a, nx, ny in candidates: if not _in_bounds(N, nx, ny): continue if int(state.grid[ny, nx]) == T_WALL: continue return a return "WAIT" def step_sim(cfg: SimConfig, state: SimState) -> Tuple[SimState, str]: if state.done: return state, "WAIT" action = _agent_policy(cfg, state) ax, ay = state.agent_xy nx, ny = ax, ay if action == "UP": ny -= 1 elif action == "DOWN": ny += 1 elif action == "LEFT": nx -= 1 elif action == "RIGHT": nx += 1 elif action == "WAIT": pass new = state.clone() new.step += 1 N = int(cfg.size) if (not _in_bounds(N, nx, ny)) or int(new.grid[ny, nx]) == T_WALL: nx, ny = ax, ay # blocked tile = int(new.grid[ny, nx]) if tile == T_COIN: new.score += 1 new.grid[ny, nx] = T_EMPTY elif tile == T_HAZARD: new.score -= 2 # hazard persists elif tile == T_GOAL: new.score += 10 new.done = True new.agent_xy = (nx, ny) if new.step >= int(cfg.max_steps): new.done = True new.last_state_sha256 = _state_hash(new) return new, action def observation_array(state: SimState) -> np.ndarray: # Partial observability: tiles outside radius are unknown N = int(state.cfg.size) r = int(state.cfg.pov_radius) ax, ay = state.agent_xy obs = np.full((N, N), T_UNKNOWN, dtype=np.int8) y0, y1 = max(0, ay - r), min(N, ay + r + 1) x0, x1 = max(0, ax - r), min(N, ax + r + 1) obs[y0:y1, x0:x1] = state.grid[y0:y1, x0:x1] obs[ay, ax] = T_AGENT return obs def observation_sha256(state: SimState) -> str: obs = observation_array(state) return _sha256_hex(obs.astype(np.int8).tobytes()) # ----------------------------- # Rendering (simple pixel art) # ----------------------------- _BG = (10, 14, 22) _GRID = (38, 52, 80) _WALL = (160, 170, 190) _EMPTY = (18, 24, 36) _COIN = (240, 210, 60) _HAZ = (255, 90, 90) _GOAL = (120, 255, 170) _AGENT = (120, 180, 255) _UNKNOWN = (0, 0, 0) CELL = 24 PAD = 12 def _tile_color(t: int): if t == T_WALL: return _WALL if t == T_COIN: return _COIN if t == T_HAZARD: return _HAZ if t == T_GOAL: return _GOAL if t == T_AGENT: return _AGENT if t == T_UNKNOWN: return _UNKNOWN return _EMPTY def render_world_image(state: SimState) -> Image.Image: N = int(state.cfg.size) w = PAD * 2 + N * CELL h = PAD * 2 + N * CELL + 44 img = Image.new("RGB", (w, h), _BG) d = ImageDraw.Draw(img) d.text((PAD, 10), f"World | seed={state.seed} step={state.step} score={state.score}", fill=(235, 235, 235)) ox, oy = PAD, PAD + 34 for y in range(N): for x in range(N): t = int(state.grid[y, x]) if (x, y) == state.agent_xy: t = T_AGENT c = _tile_color(t) x0 = ox + x * CELL y0 = oy + y * CELL d.rectangle([x0, y0, x0 + CELL - 1, y0 + CELL - 1], fill=c) d.rectangle([x0, y0, x0 + CELL - 1, y0 + CELL - 1], outline=_GRID) hs = (state.last_state_sha256 or "")[:16] d.text((PAD, h - 18), f"state_hash={hs}", fill=(170, 170, 170)) return img def render_pov_image(state: SimState) -> Image.Image: N = int(state.cfg.size) obs = observation_array(state) w = PAD * 2 + N * CELL h = PAD * 2 + N * CELL + 44 img = Image.new("RGB", (w, h), _BG) d = ImageDraw.Draw(img) d.text( (PAD, 10), f"Agent POV | radius={state.cfg.pov_radius} obs_hash={observation_sha256(state)[:12]}", fill=(235, 235, 235), ) ox, oy = PAD, PAD + 34 for y in range(N): for x in range(N): t = int(obs[y, x]) c = _tile_color(t) x0 = ox + x * CELL y0 = oy + y * CELL d.rectangle([x0, y0, x0 + CELL - 1, y0 + CELL - 1], fill=c) d.rectangle([x0, y0, x0 + CELL - 1, y0 + CELL - 1], outline=_GRID) hs = (state.last_state_sha256 or "")[:16] d.text((PAD, h - 18), f"state_hash={hs}", fill=(170, 170, 170)) return img