RFTSystems committed on
Commit
d90b4e9
·
verified ·
1 Parent(s): 16545b4

Create replayproof_sim.py

Browse files
Files changed (1) hide show
  1. replayproof_sim.py +342 -0
replayproof_sim.py ADDED
@@ -0,0 +1,342 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # replayproof_sim.py
2
+ from __future__ import annotations
3
+
4
+ import hashlib
5
+ from dataclasses import dataclass
6
+ from typing import Dict, Any, List, Tuple, Optional
7
+
8
+ import numpy as np
9
+ from PIL import Image, ImageDraw
10
+
11
+ # Tile encoding
12
+ T_UNKNOWN = -1
13
+ T_EMPTY = 0
14
+ T_WALL = 1
15
+ T_COIN = 2
16
+ T_HAZARD = 3
17
+ T_GOAL = 4
18
+ T_AGENT = 5
19
+
20
+ ACTIONS = ["UP", "DOWN", "LEFT", "RIGHT", "WAIT"]
21
+
22
+
23
@dataclass
class SimConfig:
    """Tunable parameters describing one grid-world simulation."""

    size: int = 12  # side length of the square grid
    walls_pct: float = 0.18  # threshold used when sampling random walls
    coins: int = 5  # number of coin tiles requested
    hazards: int = 4  # number of hazard tiles requested
    pov_radius: int = 4  # radius of the agent's observation window
    max_steps: int = 2000  # step budget for one episode

    def to_dict(self) -> Dict[str, Any]:
        """Return the settings as a dict of built-in ``int``/``float`` values.

        Keys appear in field-declaration order and every value is coerced to
        its declared numeric type.
        """
        casts = (
            ("size", int),
            ("walls_pct", float),
            ("coins", int),
            ("hazards", int),
            ("pov_radius", int),
            ("max_steps", int),
        )
        return {name: cast(getattr(self, name)) for name, cast in casts}
41
+
42
+
43
@dataclass
class SimState:
    """Snapshot of a simulation: world grid, agent, score and bookkeeping."""

    cfg: SimConfig
    seed: int
    rng_state_tag: int  # lightweight tag to pin reset RNG usage deterministically
    grid: np.ndarray  # int8 (N,N) tiles excluding agent overlay
    agent_xy: Tuple[int, int]
    goal_xy: Tuple[int, int]
    score: int
    step: int
    done: bool
    last_state_sha256: Optional[str] = None

    def clone(self) -> "SimState":
        """Return an independent copy of this state.

        The grid array is duplicated so the copy can be mutated freely; the
        ``cfg`` object is shared with the original. Scalar fields are
        normalised to built-in ``int``/``bool`` values.
        """
        fields = {
            "cfg": self.cfg,
            "seed": int(self.seed),
            "rng_state_tag": int(self.rng_state_tag),
            "grid": self.grid.copy(),
            "agent_xy": (int(self.agent_xy[0]), int(self.agent_xy[1])),
            "goal_xy": (int(self.goal_xy[0]), int(self.goal_xy[1])),
            "score": int(self.score),
            "step": int(self.step),
            "done": bool(self.done),
            "last_state_sha256": self.last_state_sha256,
        }
        return SimState(**fields)
69
+
70
+
71
def _sha256_hex(b: bytes) -> str:
    """Return the SHA-256 digest of ``b`` as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(b)
    return hasher.hexdigest()
73
+
74
+
75
def _state_hash(state: SimState) -> str:
    """Return a SHA-256 hex digest fingerprinting ``state``.

    The digest covers a fixed header (grid size, agent position, goal
    position, score, step, done flag and RNG tag) followed by the grid
    contents as int8 bytes.

    The header is serialised as explicit little-endian 32-bit integers so the
    digest does not depend on the byte order of the host; on little-endian
    machines the bytes are the same as with a native ``int32`` array.
    """
    N = int(state.cfg.size)
    ax, ay = state.agent_xy
    gx, gy = state.goal_xy
    header = np.array(
        [N, ax, ay, gx, gy, int(state.score), int(state.step), int(state.done), int(state.rng_state_tag)],
        dtype=np.dtype("<i4"),  # pinned byte order keeps hashes portable
    ).tobytes()
    # int8 is a single byte per cell, so the grid has no byte-order concern.
    grid_bytes = state.grid.astype(np.int8).tobytes()
    return _sha256_hex(header + grid_bytes)
85
+
86
+
87
def _in_bounds(N: int, x: int, y: int) -> bool:
    """Return True when both ``x`` and ``y`` lie in the range ``[0, N)``."""
    # Both coordinates are non-negative iff the smaller one is, and both are
    # below N iff the larger one is.
    return 0 <= min(x, y) and max(x, y) < N
89
+
90
+
91
def reset_sim(cfg: SimConfig, seed: int) -> SimState:
    """Build the initial simulation state for ``cfg`` from ``seed``.

    All randomness comes from a ``RandomState`` seeded with ``seed`` and is
    consumed in a fixed order (wall mask, shuffle of free cells, RNG tag), so
    the same inputs always produce the same world.

    Layout: the outer ring is wall, interior cells become wall where the
    random draw falls below ``cfg.walls_pct``, the agent starts at ``(1, 1)``
    and the goal sits at ``(N - 2, N - 2)``. Coins and then hazards are placed
    on shuffled free interior cells, as many as fit.

    Returns:
        A fresh ``SimState`` with score 0, step 0, ``done`` False and
        ``last_state_sha256`` already populated.
    """
    prng = np.random.RandomState(int(seed))
    side = int(cfg.size)
    last = side - 1

    world = np.zeros((side, side), dtype=np.int8)

    # Border walls
    for edge in (0, last):
        world[edge, :] = T_WALL
        world[:, edge] = T_WALL

    # Random internal walls: one draw per cell, only the interior is applied.
    wall_mask = prng.rand(side, side) < float(cfg.walls_pct)
    world[1:-1, 1:-1][wall_mask[1:-1, 1:-1]] = T_WALL

    # Fixed start/goal (grid is indexed [row, column], i.e. [y, x]).
    start = (1, 1)
    goal = (side - 2, side - 2)
    world[start[1], start[0]] = T_EMPTY
    world[goal[1], goal[0]] = T_GOAL

    # Collect free interior cells in row-major order, as (x, y) pairs.
    reserved = (start, goal)
    free_cells = []
    for row in range(1, side - 1):
        for col in range(1, side - 1):
            if world[row, col] == T_EMPTY and (col, row) not in reserved:
                free_cells.append((col, row))
    prng.shuffle(free_cells)

    # Place coins on the first shuffled cells.
    n_coins = min(int(cfg.coins), len(free_cells))
    for idx in range(n_coins):
        col, row = free_cells[idx]
        world[row, col] = T_COIN

    # Place hazards on the cells that follow the coins.
    hazard_stop = min(n_coins + int(cfg.hazards), len(free_cells))
    for idx in range(n_coins, hazard_stop):
        col, row = free_cells[idx]
        world[row, col] = T_HAZARD

    # One extra draw recorded with the state as its RNG tag.
    tag = int(prng.randint(0, 2**31 - 1))

    initial = SimState(
        cfg=cfg,
        seed=int(seed),
        rng_state_tag=tag,
        grid=world,
        agent_xy=start,
        goal_xy=goal,
        score=0,
        step=0,
        done=False,
        last_state_sha256=None,
    )
    initial.last_state_sha256 = _state_hash(initial)
    return initial
151
+
152
+
153
def _agent_policy(cfg: SimConfig, state: SimState) -> str:
    """Choose the agent's next action with a deterministic greedy rule.

    Moves that shrink the Manhattan distance to the goal are tried first
    (horizontal before vertical), followed by a fixed fallback order of
    UP, DOWN, LEFT, RIGHT, WAIT. The first candidate whose target cell is
    inside the grid and not a wall is returned. No randomness is involved,
    so replay is stable.
    """
    ax, ay = state.agent_xy
    gx, gy = state.goal_xy

    # (dx, dy) displacement for every action name.
    offsets = {
        "UP": (0, -1),
        "DOWN": (0, 1),
        "LEFT": (-1, 0),
        "RIGHT": (1, 0),
        "WAIT": (0, 0),
    }

    # Goal-directed moves first.
    order: List[str] = []
    if gx > ax:
        order.append("RIGHT")
    if gx < ax:
        order.append("LEFT")
    if gy > ay:
        order.append("DOWN")
    if gy < ay:
        order.append("UP")

    # Fallback order (still deterministic)
    order.extend(("UP", "DOWN", "LEFT", "RIGHT", "WAIT"))

    side = int(cfg.size)
    for name in order:
        dx, dy = offsets[name]
        tx, ty = ax + dx, ay + dy
        if _in_bounds(side, tx, ty) and int(state.grid[ty, tx]) != T_WALL:
            return name
    return "WAIT"
186
+
187
+
188
def step_sim(cfg: SimConfig, state: SimState) -> Tuple[SimState, str]:
    """Advance the simulation by one step.

    The input state is never mutated. A finished state is returned unchanged
    together with ``"WAIT"``; otherwise a clone is updated and returned with
    the action the policy selected.

    Effects of the cell the agent ends on: a coin adds 1 to the score and is
    removed, a hazard subtracts 2 and stays in place, the goal adds 10 and
    ends the episode. The episode also ends once the step counter reaches
    ``cfg.max_steps``.
    """
    if state.done:
        return state, "WAIT"

    action = _agent_policy(cfg, state)
    old_x, old_y = state.agent_xy

    # Displacement per action; WAIT (or anything else) means no movement.
    dx, dy = {
        "UP": (0, -1),
        "DOWN": (0, 1),
        "LEFT": (-1, 0),
        "RIGHT": (1, 0),
    }.get(action, (0, 0))
    tx, ty = old_x + dx, old_y + dy

    nxt = state.clone()
    nxt.step += 1

    side = int(cfg.size)
    blocked = (not _in_bounds(side, tx, ty)) or int(nxt.grid[ty, tx]) == T_WALL
    if blocked:
        tx, ty = old_x, old_y  # stay put

    landed = int(nxt.grid[ty, tx])
    if landed == T_COIN:
        nxt.score += 1
        nxt.grid[ty, tx] = T_EMPTY
    elif landed == T_HAZARD:
        nxt.score -= 2  # hazard persists
    elif landed == T_GOAL:
        nxt.score += 10
        nxt.done = True

    nxt.agent_xy = (tx, ty)

    if nxt.step >= int(cfg.max_steps):
        nxt.done = True

    nxt.last_state_sha256 = _state_hash(nxt)
    return nxt, action
231
+
232
+
233
def observation_array(state: SimState) -> np.ndarray:
    """Return the agent's partial view of the grid as an int8 (N, N) array.

    Cells inside the square window of radius ``cfg.pov_radius`` around the
    agent (clipped to the grid) carry their real tile codes; every other cell
    is ``T_UNKNOWN``. The agent's own cell is marked ``T_AGENT``.
    """
    side = int(state.cfg.size)
    radius = int(state.cfg.pov_radius)
    ax, ay = state.agent_xy

    # Visible window, clamped to the grid edges.
    rows = slice(max(0, ay - radius), min(side, ay + radius + 1))
    cols = slice(max(0, ax - radius), min(side, ax + radius + 1))

    view = np.full((side, side), T_UNKNOWN, dtype=np.int8)
    view[rows, cols] = state.grid[rows, cols]
    view[ay, ax] = T_AGENT
    return view
247
+
248
+
249
def observation_sha256(state: SimState) -> str:
    """Return the SHA-256 hex digest of the agent's current observation."""
    raw = observation_array(state).astype(np.int8).tobytes()
    return _sha256_hex(raw)
252
+
253
+
254
# -----------------------------
# Rendering (simple pixel art)
# -----------------------------
# RGB colours used by the renderers.
_BG = (10, 14, 22)  # image background
_GRID = (38, 52, 80)  # cell outline
_WALL = (160, 170, 190)  # wall tile
_EMPTY = (18, 24, 36)  # empty tile
_COIN = (240, 210, 60)  # coin tile
_HAZ = (255, 90, 90)  # hazard tile
_GOAL = (120, 255, 170)  # goal tile
_AGENT = (120, 180, 255)  # agent marker
_UNKNOWN = (0, 0, 0)  # cell hidden from the agent

# Layout, in pixels.
CELL = 24  # edge length of one drawn cell
PAD = 12  # margin around the drawing
269
+
270
+
271
def _tile_color(t: int):
    """Return the RGB colour for tile code ``t``.

    Any code without a dedicated colour is drawn with the empty-tile colour.
    """
    palette = {
        T_WALL: _WALL,
        T_COIN: _COIN,
        T_HAZARD: _HAZ,
        T_GOAL: _GOAL,
        T_AGENT: _AGENT,
        T_UNKNOWN: _UNKNOWN,
    }
    return palette.get(t, _EMPTY)
285
+
286
+
287
def render_world_image(state: SimState) -> Image.Image:
    """Render the full world grid of ``state`` as an RGB image.

    The image has a title line (seed, step, score) at the top, one coloured
    square per grid cell with the agent drawn over its cell, and the first
    16 characters of the state hash at the bottom.
    """
    N = int(state.cfg.size)
    board_px = N * CELL
    width = PAD * 2 + board_px
    height = PAD * 2 + board_px + 44  # extra room for the two text lines

    canvas = Image.new("RGB", (width, height), _BG)
    pen = ImageDraw.Draw(canvas)

    pen.text((PAD, 10), f"World | seed={state.seed} step={state.step} score={state.score}", fill=(235, 235, 235))

    left, top = PAD, PAD + 34
    for row in range(N):
        py = top + row * CELL
        for col in range(N):
            tile = int(state.grid[row, col])
            if (col, row) == state.agent_xy:
                tile = T_AGENT  # agent is an overlay, not stored in the grid
            px = left + col * CELL
            # Fill and outline in a single call; the outline is drawn on top.
            pen.rectangle(
                [px, py, px + CELL - 1, py + CELL - 1],
                fill=_tile_color(tile),
                outline=_GRID,
            )

    hs = (state.last_state_sha256 or "")[:16]
    pen.text((PAD, height - 18), f"state_hash={hs}", fill=(170, 170, 170))
    return canvas
312
+
313
+
314
def render_pov_image(state: SimState) -> Image.Image:
    """Render the agent's partial observation of ``state`` as an RGB image.

    The image has a title line (view radius and the first 12 characters of
    the observation hash) at the top, one coloured square per observed cell,
    and the first 16 characters of the state hash at the bottom.
    """
    N = int(state.cfg.size)
    seen = observation_array(state)

    board_px = N * CELL
    width = PAD * 2 + board_px
    height = PAD * 2 + board_px + 44  # extra room for the two text lines

    canvas = Image.new("RGB", (width, height), _BG)
    pen = ImageDraw.Draw(canvas)

    pen.text(
        (PAD, 10),
        f"Agent POV | radius={state.cfg.pov_radius} obs_hash={observation_sha256(state)[:12]}",
        fill=(235, 235, 235),
    )

    left, top = PAD, PAD + 34
    for row in range(N):
        py = top + row * CELL
        for col in range(N):
            shade = _tile_color(int(seen[row, col]))
            px = left + col * CELL
            # Fill and outline in a single call; the outline is drawn on top.
            pen.rectangle(
                [px, py, px + CELL - 1, py + CELL - 1],
                fill=shade,
                outline=_GRID,
            )

    hs = (state.last_state_sha256 or "")[:16]
    pen.text((PAD, height - 18), f"state_hash={hs}", fill=(170, 170, 170))
    return canvas