import math import random import json import os import numpy as np import gradio as gr # ============================================================ # RFT Predator Space β€” First-Person Observer View (Pseudo-3D) # FIXES: # 1) NO flashing progression panel (do NOT update it on timer ticks) # 2) AutoRun works in first-person POV (autopilot moves current POV agent) # 3) queue() enabled for reliable timer updates on Spaces # ============================================================ # ----------------------------- # View config (render) # ----------------------------- VIEW_W, VIEW_H = 560, 360 RAY_W = 280 FOV_DEG = 78 MAX_DEPTH = 18 MOVE_STEP = 1 AUTO_TICK_HZ = 8 SKY = np.array([14, 16, 26], dtype=np.uint8) FLOOR_NEAR = np.array([20, 22, 34], dtype=np.uint8) FLOOR_FAR = np.array([10, 11, 18], dtype=np.uint8) WALL_BASE = np.array([210, 210, 225], dtype=np.uint8) WALL_SIDE = np.array([150, 150, 170], dtype=np.uint8) AGENT_OTHER_COLOR = np.array([255, 140, 90], dtype=np.uint8) # billboard for the "other" observer RETICLE = np.array([120, 190, 255], dtype=np.uint8) # 0=E,1=S,2=W,3=N DIRS = [(1,0),(0,1),(-1,0),(0,-1)] ORI_DEG = [0, 90, 180, 270] DIR_TO_ORI = {(1,0):0, (0,1):1, (-1,0):2, (0,-1):3} # ----------------------------- # Progression / unlocks # ----------------------------- MAP_UNLOCKS = [ ("Training Bay", 0), ("Arena+", 1), ("Corridor Maze", 3), ("Rooms", 6), ("Labyrinth", 10), ("Dense Field", 15), ] # ----------------------------- # Saves # ----------------------------- SAVE_DIR = "saves" os.makedirs(SAVE_DIR, exist_ok=True) def _slot_path(slot: str) -> str: slot = (slot or "slot1").strip().replace(" ", "_") if not slot: slot = "slot1" if not slot.lower().endswith(".json"): slot += ".json" return os.path.join(SAVE_DIR, slot) def list_save_slots(): try: files = [fn for fn in os.listdir(SAVE_DIR) if fn.lower().endswith(".json")] files.sort() return files except Exception: return [] # ----------------------------- # Utility # ----------------------------- def clamp(x, lo, hi): return lo if x < lo else hi if x > hi else x def angle_diff_rad(a, b): return (a - b + math.pi) % (2*math.pi) - math.pi def seeded_rng(seed: int): return random.Random(int(seed) & 0xFFFFFFFF) def neighbors4(x, y): return [(x+1,y),(x-1,y),(x,y+1),(x,y-1)] def bfs_reachable(grid, start): H, W = grid.shape sx, sy = start if grid[sy, sx] == 1: return set() q = [(sx, sy)] seen = set([(sx, sy)]) while q: x, y = q.pop(0) for nx, ny in neighbors4(x, y): if 0 <= nx < W and 0 <= ny < H and (nx, ny) not in seen and grid[ny, nx] == 0: seen.add((nx, ny)) q.append((nx, ny)) return seen def pick_spawn_pair(grid, rng, min_dist=8): H, W = grid.shape empties = [(x, y) for y in range(1, H-1) for x in range(1, W-1) if grid[y, x] == 0] rng.shuffle(empties) for pred in empties[:800]: reach = bfs_reachable(grid, pred) if len(reach) < 30: continue candidates = [p for p in reach if (p[0]-pred[0])**2 + (p[1]-pred[1])**2 >= min_dist*min_dist] if candidates: prey = rng.choice(candidates) return pred, prey pred = empties[0] if empties else (1, 1) prey = empties[-1] if len(empties) > 1 else (2, 2) return pred, prey def add_border_walls(grid): H, W = grid.shape grid[0, :] = 1 grid[H-1, :] = 1 grid[:, 0] = 1 grid[:, W-1] = 1 return grid def compute_unlocks(catches: int): unlocked = set() for name, need in MAP_UNLOCKS: if catches >= need: unlocked.add(name) return unlocked # ----------------------------- # Map generators (deterministic by seed) # ----------------------------- def map_training(seed, w=23, h=23): rng = seeded_rng(seed) grid = np.zeros((h, w), dtype=np.int8) add_border_walls(grid) for y in range(2, h-2): for x in range(2, w-2): if rng.random() < 0.08: grid[y, x] = 1 return grid def map_arena_plus(seed, w=23, h=23): rng = seeded_rng(seed) grid = np.zeros((h, w), dtype=np.int8) add_border_walls(grid) cx, cy = w//2, h//2 for y in range(1, h-1): for x in range(1, w-1): r2 = (x-cx)**2 + (y-cy)**2 if 36 <= r2 <= 44 and rng.random() < 0.85: grid[y, x] = 1 for _ in range(8): x = rng.randint(3, w-4) y = rng.randint(3, h-4) grid[y, x] = 1 return grid def map_corridor_maze(seed, w=23, h=23): rng = seeded_rng(seed) grid = np.ones((h, w), dtype=np.int8) add_border_walls(grid) for y in range(1, h-1): for x in range(1, w-1): if x % 2 == 1 and y % 2 == 1: grid[y, x] = 0 start = (1, 1) stack = [start] visited = set([start]) def carve_between(a, b): ax, ay = a; bx, by = b mx, my = (ax+bx)//2, (ay+by)//2 grid[my, mx] = 0 while stack: x, y = stack[-1] dirs = [(2,0),(-2,0),(0,2),(0,-2)] rng.shuffle(dirs) moved = False for dx, dy in dirs: nx, ny = x+dx, y+dy if 1 <= nx < w-1 and 1 <= ny < h-1 and (nx, ny) not in visited: visited.add((nx, ny)) carve_between((x, y), (nx, ny)) stack.append((nx, ny)) moved = True break if not moved: stack.pop() grid[1,1] = 0 grid[1,2] = 0 grid[2,1] = 0 return grid def map_rooms(seed, w=25, h=25): rng = seeded_rng(seed) grid = np.ones((h, w), dtype=np.int8) add_border_walls(grid) rooms = [] for _ in range(10): rw = rng.randint(4, 7) rh = rng.randint(4, 7) rx = rng.randint(1, w-rw-2) ry = rng.randint(1, h-rh-2) grid[ry:ry+rh, rx:rx+rw] = 0 rooms.append((rx, ry, rw, rh)) for i in range(len(rooms)-1): x1 = rooms[i][0] + rooms[i][2]//2 y1 = rooms[i][1] + rooms[i][3]//2 x2 = rooms[i+1][0] + rooms[i+1][2]//2 y2 = rooms[i+1][1] + rooms[i+1][3]//2 if rng.random() < 0.5: grid[y1, min(x1,x2):max(x1,x2)+1] = 0 grid[min(y1,y2):max(y1,y2)+1, x2] = 0 else: grid[min(y1,y2):max(y1,y2)+1, x1] = 0 grid[y2, min(x1,x2):max(x1,x2)+1] = 0 return grid def map_labyrinth(seed, w=31, h=23): rng = seeded_rng(seed) grid = np.zeros((h, w), dtype=np.int8) add_border_walls(grid) for y in range(1, h-1): for x in range(1, w-1): if (x % 2 == 0 and rng.random() < 0.85) or (y % 3 == 0 and rng.random() < 0.55): grid[y, x] = 1 for x in range(1, w-1): grid[h//2, x] = 0 for x in range(3, w-3, 6): for y in range(2, h-2): if rng.random() < 0.75: grid[y, x] = 0 return grid def map_dense_field(seed, w=23, h=23): rng = seeded_rng(seed) grid = np.zeros((h, w), dtype=np.int8) add_border_walls(grid) for y in range(1, h-1): for x in range(1, w-1): if rng.random() < 0.22: grid[y, x] = 1 for _ in range(6): cx = rng.randint(3, w-4) cy = rng.randint(3, h-4) for yy in range(cy-2, cy+3): for xx in range(cx-2, cx+3): if 1 <= xx < w-1 and 1 <= yy < h-1: grid[yy, xx] = 0 return grid MAP_BUILDERS = { "Training Bay": map_training, "Arena+": map_arena_plus, "Corridor Maze": map_corridor_maze, "Rooms": map_rooms, "Labyrinth": map_labyrinth, "Dense Field": map_dense_field, } # ----------------------------- # State construction # ----------------------------- def build_state(seed, map_name, progress=None, override=None): rng = seeded_rng(seed) grid = MAP_BUILDERS[map_name](seed) pred, prey = pick_spawn_pair(grid, rng, min_dist=8) pred_ori = rng.randint(0, 3) prey_ori = (pred_ori + 2) % 4 if progress is None: progress = {"catches": 0, "unlocked": compute_unlocks(0)} st = { "seed": int(seed), "grid": grid, "pred": pred, "prey": prey, "ori": pred_ori, "prey_ori": prey_ori, "control": "pred", # "pred" or "prey" (view + manual inputs) "overlay": False, # coherence overlay "disturbance": 0.0, "last_impulse": 0.0, "step": 0, "caught": False, "auto_chase": False, "auto_run": False, "log": [f"Reset into map: {map_name}"], "map_name": map_name, "progress": progress, } if override: for k, v in override.items(): if k == "grid": continue st[k] = v return st # ----------------------------- # Save / Load helpers # ----------------------------- def serialize_state(st): catches = int(st["progress"]["catches"]) payload = { "version": 2, "seed": int(st["seed"]), "map_name": str(st["map_name"]), "step": int(st["step"]), "pred": [int(st["pred"][0]), int(st["pred"][1])], "prey": [int(st["prey"][0]), int(st["prey"][1])], "ori": int(st["ori"]), "prey_ori": int(st.get("prey_ori", 0)), "control": str(st.get("control", "pred")), "overlay": bool(st.get("overlay", False)), "disturbance": float(st.get("disturbance", 0.0)), "caught": bool(st["caught"]), "auto_chase": bool(st["auto_chase"]), "auto_run": bool(st["auto_run"]), "catches": catches, "log_tail": st["log"][-20:], } return payload def deserialize_state(payload): seed = int(payload.get("seed", 1)) map_name = str(payload.get("map_name", "Training Bay")) if map_name not in MAP_BUILDERS: map_name = "Training Bay" catches = int(payload.get("catches", 0)) progress = {"catches": catches, "unlocked": compute_unlocks(catches)} override = { "step": int(payload.get("step", 0)), "pred": tuple(payload.get("pred", [1, 1])), "prey": tuple(payload.get("prey", [2, 2])), "ori": int(payload.get("ori", 0)) % 4, "prey_ori": int(payload.get("prey_ori", 0)) % 4, "control": str(payload.get("control", "pred")) if str(payload.get("control", "pred")) in ("pred", "prey") else "pred", "overlay": bool(payload.get("overlay", False)), "disturbance": float(payload.get("disturbance", 0.0)), "last_impulse": 0.0, "caught": bool(payload.get("caught", False)), "auto_chase": bool(payload.get("auto_chase", False)), "auto_run": bool(payload.get("auto_run", False)), "log": (payload.get("log_tail", []) or [])[:], } st = build_state(seed, map_name, progress=progress, override=override) grid = st["grid"] H, W = grid.shape px, py = st["pred"] qx, qy = st["prey"] ok = ( 0 <= px < W and 0 <= py < H and 0 <= qx < W and 0 <= qy < H and grid[py, px] == 0 and grid[qy, qx] == 0 ) if not ok: rng = seeded_rng(seed + 777) st["pred"], st["prey"] = pick_spawn_pair(grid, rng, min_dist=8) st["log"].append("Loaded save had invalid positions; respawned safely.") st["log"].append("Loaded save.") return st def save_to_path(st, path): payload = serialize_state(st) with open(path, "w", encoding="utf-8") as f: json.dump(payload, f, indent=2) st["log"].append(f"Saved to: {path}") def load_from_path(path): with open(path, "r", encoding="utf-8") as f: payload = json.load(f) return deserialize_state(payload) # ----------------------------- # Perception + rendering # ----------------------------- def los_clear(grid, a, b): ax, ay = a[0] + 0.5, a[1] + 0.5 bx, by = b[0] + 0.5, b[1] + 0.5 dx, dy = bx - ax, by - ay dist = math.hypot(dx, dy) if dist < 1e-6: return True dx /= dist dy /= dist x, y = ax, ay steps = int(dist * 20) H, W = grid.shape for _ in range(steps): x += dx * (dist / steps) y += dy * (dist / steps) cx, cy = int(x), int(y) cx = clamp(cx, 0, W-1) cy = clamp(cy, 0, H-1) if grid[cy, cx] == 1: return False return True def dda_raycast(grid, px, py, ray_dx, ray_dy, max_depth=MAX_DEPTH): H, W = grid.shape map_x = int(px) map_y = int(py) delta_dist_x = abs(1.0 / ray_dx) if abs(ray_dx) > 1e-9 else 1e9 delta_dist_y = abs(1.0 / ray_dy) if abs(ray_dy) > 1e-9 else 1e9 if ray_dx < 0: step_x = -1 side_dist_x = (px - map_x) * delta_dist_x else: step_x = 1 side_dist_x = (map_x + 1.0 - px) * delta_dist_x if ray_dy < 0: step_y = -1 side_dist_y = (py - map_y) * delta_dist_y else: step_y = 1 side_dist_y = (map_y + 1.0 - py) * delta_dist_y hit = False side = 0 for _ in range(max_depth * 10): if side_dist_x < side_dist_y: side_dist_x += delta_dist_x map_x += step_x side = 0 else: side_dist_y += delta_dist_y map_y += step_y side = 1 if map_x < 0 or map_x >= W or map_y < 0 or map_y >= H: break if grid[map_y, map_x] == 1: hit = True break if not hit: return max_depth, 0, map_x, map_y if side == 0: denom = ray_dx if abs(ray_dx) > 1e-9 else 1e-9 perp = (map_x - px + (1 - step_x) / 2) / denom else: denom = ray_dy if abs(ray_dy) > 1e-9 else 1e-9 perp = (map_y - py + (1 - step_y) / 2) / denom perp = abs(perp) perp = clamp(perp, 0.0005, max_depth) return perp, side, map_x, map_y def _apply_coherence_overlay(img, disturbance: float): d = float(disturbance) if d <= 0.001: return img alpha = clamp(d * 0.06, 0.0, 0.22) # subtle h, w, _ = img.shape cx, cy = w // 2, h // 2 edge = int(min(w, h) * 0.08) if edge >= 2: tint = np.array([22, 8, 18], dtype=np.float32) img[:edge, :, :] = np.clip(img[:edge, :, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8) img[h-edge:, :, :] = np.clip(img[h-edge:, :, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8) img[:, :edge, :] = np.clip(img[:, :edge, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8) img[:, w-edge:, :] = np.clip(img[:, w-edge:, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8) line_col = np.array([180, 70, 160], dtype=np.float32) for i in range(-40, 41): x = cx + i y = cy + int(i * 0.35) if 0 <= x < w and 0 <= y < h: img[y:y+1, x:x+1, :] = np.clip(img[y:y+1, x:x+1, :].astype(np.float32) * (1-alpha) + line_col * alpha, 0, 255).astype(np.uint8) y2 = cy - int(i * 0.35) if 0 <= x < w and 0 <= y2 < h: img[y2:y2+1, x:x+1, :] = np.clip(img[y2:y2+1, x:x+1, :].astype(np.float32) * (1-alpha) + line_col * alpha, 0, 255).astype(np.uint8) return img def render_first_person(st): grid = st["grid"] if st["control"] == "prey": view_cell = st["prey"] view_ori = st["prey_ori"] other_cell = st["pred"] else: view_cell = st["pred"] view_ori = st["ori"] other_cell = st["prey"] (cx, cy) = view_cell px = cx + 0.5 py = cy + 0.5 fov = math.radians(FOV_DEG) base = math.radians(ORI_DEG[view_ori]) img = np.zeros((VIEW_H, VIEW_W, 3), dtype=np.uint8) img[:VIEW_H//2, :, :] = SKY for y in range(VIEW_H//2, VIEW_H): t = (y - VIEW_H//2) / max(1, (VIEW_H//2 - 1)) col = (FLOOR_NEAR * (1 - t) + FLOOR_FAR * t).astype(np.uint8) img[y, :, :] = col wall_dists = np.full(RAY_W, MAX_DEPTH, dtype=np.float32) for x in range(RAY_W): u = (x / (RAY_W - 1)) if RAY_W > 1 else 0.5 ang = base + (u - 0.5) * fov ray_dx = math.cos(ang) ray_dy = math.sin(ang) dist, side, hitx, hity = dda_raycast(grid, px, py, ray_dx, ray_dy, MAX_DEPTH) dist *= math.cos(ang - base) dist = clamp(dist, 0.001, MAX_DEPTH) wall_dists[x] = dist slice_h = int((VIEW_H * 0.92) / dist) slice_h = clamp(slice_h, 1, VIEW_H) top = (VIEW_H - slice_h) // 2 bot = top + slice_h shade = 1.0 / (1.0 + dist * 0.12) shade = clamp(shade, 0.12, 1.0) base_col = WALL_SIDE if side == 1 else WALL_BASE checker = ((hitx + hity) & 1) tex = 0.90 if checker == 0 else 1.05 col = np.clip(base_col.astype(np.float32) * shade * tex, 0, 255).astype(np.uint8) x0 = int(x * VIEW_W / RAY_W) x1 = int((x + 1) * VIEW_W / RAY_W) if x1 <= x0: x1 = x0 + 1 img[top:bot, x0:x1, :] = col other_vis = False if not st["caught"] and los_clear(grid, view_cell, other_cell): vx = (other_cell[0] + 0.5) - px vy = (other_cell[1] + 0.5) - py other_dist = math.hypot(vx, vy) other_ang = math.atan2(vy, vx) rel = angle_diff_rad(other_ang, base) if abs(rel) <= fov * 0.5 and other_dist < MAX_DEPTH: other_vis = True u = (rel / fov) + 0.5 sx_ray = int(u * (RAY_W - 1)) sx_ray = clamp(sx_ray, 0, RAY_W - 1) sprite_h = int((VIEW_H * 0.75) / max(0.2, other_dist)) sprite_w = int(sprite_h * 0.45) sprite_h = clamp(sprite_h, 8, VIEW_H) sprite_w = clamp(sprite_w, 6, VIEW_W) sx = int(sx_ray * VIEW_W / RAY_W) sy = VIEW_H // 2 x0 = clamp(sx - sprite_w // 2, 0, VIEW_W - 1) x1 = clamp(sx + sprite_w // 2, 0, VIEW_W - 1) y0 = clamp(sy - sprite_h // 2, 0, VIEW_H - 1) y1 = clamp(sy + sprite_h // 2, 0, VIEW_H - 1) for vxcol in range(x0, x1): rx = int(vxcol * RAY_W / VIEW_W) rx = clamp(rx, 0, RAY_W - 1) if other_dist < wall_dists[rx]: img[y0:y1, vxcol:vxcol+1, :] = AGENT_OTHER_COLOR cxh, cyh = VIEW_W // 2, VIEW_H // 2 img[cyh-1:cyh+2, cxh-12:cxh+13, :] = RETICLE img[cyh-12:cyh+13, cxh-1:cxh+2, :] = RETICLE hud_h = 26 img[:hud_h, :, :] = np.clip(img[:hud_h, :, :].astype(np.int16) + 20, 0, 255).astype(np.uint8) def dot(x, y, c): img[y:y+6, x:x+6, :] = c dot(8, 10, np.array([90, 255, 140], np.uint8) if st["auto_chase"] else np.array([60, 60, 70], np.uint8)) dot(20, 10, np.array([120, 190, 255], np.uint8) if st["auto_run"] else np.array([60, 60, 70], np.uint8)) dot(32, 10, np.array([255, 140, 90], np.uint8) if other_vis else np.array([60, 60, 70], np.uint8)) if st.get("overlay", False): img = _apply_coherence_overlay(img, st.get("disturbance", 0.0)) return img def render_minimap(st, scale=14): grid = st["grid"] H, W = grid.shape img = np.zeros((H*scale, W*scale, 3), dtype=np.uint8) img[:, :, :] = np.array([18, 20, 32], dtype=np.uint8) wall = np.array([220, 220, 235], dtype=np.uint8) for y in range(H): for x in range(W): if grid[y, x] == 1: img[y*scale:(y+1)*scale, x*scale:(x+1)*scale, :] = wall px, py = st["pred"] qx, qy = st["prey"] pred_col = np.array([120, 190, 255], np.uint8) prey_col = np.array([255, 140, 90], np.uint8) img[py*scale:(py+1)*scale, px*scale:(px+1)*scale, :] = pred_col img[qy*scale:(qy+1)*scale, qx*scale:(qx+1)*scale, :] = prey_col dx, dy = DIRS[st["ori"]] hx, hy = px + dx, py + dy if 0 <= hx < W and 0 <= hy < H: img[hy*scale:(hy+1)*scale, hx*scale:(hx+1)*scale, :] = np.array([80, 255, 160], np.uint8) dx2, dy2 = DIRS[st["prey_ori"]] hx2, hy2 = qx + dx2, qy + dy2 if 0 <= hx2 < W and 0 <= hy2 < H: img[hy2*scale:(hy2+1)*scale, hx2*scale:(hx2+1)*scale, :] = np.array([255, 220, 120], np.uint8) if st["control"] == "pred": x0, y0 = px*scale, py*scale else: x0, y0 = qx*scale, qy*scale ring = np.array([240, 240, 140], np.uint8) img[y0:y0+scale, x0:x0+2, :] = ring img[y0:y0+scale, x0+scale-2:x0+scale, :] = ring img[y0:y0+2, x0:x0+scale, :] = ring img[y0+scale-2:y0+scale, x0:x0+scale, :] = ring return img def unlock_summary(st): catches = st["progress"]["catches"] unlocked = st["progress"]["unlocked"] lines = [] for name, need in MAP_UNLOCKS: if name in unlocked: lines.append(f"βœ… {name} (unlocked)") else: lines.append(f"πŸ”’ {name} (needs {need} catches)") return "### Map progression\n" + "\n".join(lines) + f"\n\n**Total catches:** {catches}" def status(st): pred_ori_txt = ["E", "S", "W", "N"][st["ori"]] prey_ori_txt = ["E", "S", "W", "N"][st["prey_ori"]] tail = st["log"][-10:] catches = st["progress"]["catches"] current = st["map_name"] mode = "Manual" if st["auto_run"] and st["auto_chase"]: mode = "AutoRun+AutoChase" elif st["auto_run"] and not st["auto_chase"]: mode = "Hybrid AutoRun (wander)" ctrl = "Predator" if st["control"] == "pred" else "Prey" coh = st.get("disturbance", 0.0) return ( f"Map: {current} | Catches: {catches} | Step: {st['step']} | Mode: {mode} | Control: {ctrl} | Overlay: {st.get('overlay', False)}\n" f"Predator: {st['pred']} {pred_ori_txt} | Prey: {st['prey']} {prey_ori_txt} | " f"AutoChase: {st['auto_chase']} | AutoRun: {st['auto_run']} | Caught: {st['caught']} | Coherence: {coh:.2f}\n\n" + "\n".join(tail) ) # ----------------------------- # Actions (manual + autonomous) # ----------------------------- def _add_impulse(st, x): st["last_impulse"] = float(st.get("last_impulse", 0.0)) + float(x) def _step_disturbance(st): d = float(st.get("disturbance", 0.0)) imp = float(st.get("last_impulse", 0.0)) st["disturbance"] = 0.92 * d + imp st["last_impulse"] = 0.0 def _agent_pos_ori(st, who): if who == "prey": return st["prey"], st["prey_ori"] return st["pred"], st["ori"] def _set_agent_pos_ori(st, who, pos=None, ori=None): if who == "prey": if pos is not None: st["prey"] = pos if ori is not None: st["prey_ori"] = int(ori) % 4 else: if pos is not None: st["pred"] = pos if ori is not None: st["ori"] = int(ori) % 4 def _turn(st, who, direction): if st["caught"]: return _, ori = _agent_pos_ori(st, who) ori = (ori + direction) % 4 _set_agent_pos_ori(st, who, ori=ori) _add_impulse(st, 0.9) def _forward(st, who): if st["caught"]: return (x, y), ori = _agent_pos_ori(st, who) dx, dy = DIRS[ori] nx, ny = x + dx * MOVE_STEP, y + dy * MOVE_STEP if st["grid"][ny, nx] == 0: _set_agent_pos_ori(st, who, pos=(nx, ny)) _add_impulse(st, 0.25) else: st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} bumped wall.") _add_impulse(st, 0.7) def _check_catch_and_unlock(st): if st["pred"] == st["prey"] and not st["caught"]: st["caught"] = True st["log"].append("CAUGHT the prey.") st["progress"]["catches"] += 1 st["progress"]["unlocked"] = compute_unlocks(st["progress"]["catches"]) st["log"].append(f"Catches now {st['progress']['catches']}. Unlocks updated.") _add_impulse(st, 1.2) return True return False def prey_flee_step(st): if st["caught"]: return rng = seeded_rng(st["seed"] + 1337 + st["step"] * 19) px, py = st["prey"] ax, ay = st["pred"] options = [(0,0),(1,0),(-1,0),(0,1),(0,-1)] scored = [] for dx, dy in options: nx, ny = px + dx, py + dy if st["grid"][ny, nx] == 1: continue dist = (nx-ax)**2 + (ny-ay)**2 scored.append((dist + rng.random()*0.1, (nx, ny), (dx, dy))) if scored: scored.sort(reverse=True) pick = scored[0] if rng.random() < 0.78 else rng.choice(scored) _, (nx, ny), (dx, dy) = pick st["prey"] = (nx, ny) if (dx, dy) in DIR_TO_ORI and (dx, dy) != (0,0): st["prey_ori"] = DIR_TO_ORI[(dx, dy)] def predator_wander_step(st): if st["caught"]: return rng = seeded_rng(st["seed"] + 4242 + st["step"] * 23) (x, y) = st["pred"] ori = st["ori"] dx, dy = DIRS[ori] front_blocked = (st["grid"][y+dy, x+dx] == 1) r = rng.random() if front_blocked: if r < 0.5: _turn(st, "pred", -1); st["log"].append("AutoWander: avoid left.") else: _turn(st, "pred", +1); st["log"].append("AutoWander: avoid right.") else: if r < 0.72: _forward(st, "pred"); st["log"].append("AutoWander: forward.") elif r < 0.86: _turn(st, "pred", -1); st["log"].append("AutoWander: turn left.") else: _turn(st, "pred", +1); st["log"].append("AutoWander: turn right.") def predator_chase_step(st): if st["caught"]: return grid = st["grid"] px = st["pred"][0] + 0.5 py = st["pred"][1] + 0.5 base = math.radians(ORI_DEG[st["ori"]]) fov = math.radians(FOV_DEG) prey = st["prey"] if los_clear(grid, st["pred"], prey): vx = (prey[0] + 0.5) - px vy = (prey[1] + 0.5) - py ang = math.atan2(vy, vx) rel = angle_diff_rad(ang, base) if abs(rel) <= fov * 0.5: if rel < -0.10: _turn(st, "pred", -1); st["log"].append("AutoChase: turn left.") elif rel > 0.10: _turn(st, "pred", +1); st["log"].append("AutoChase: turn right.") else: _forward(st, "pred"); st["log"].append("AutoChase: forward.") return predator_wander_step(st) def prey_autopilot_step(st): # if user is viewing prey and AutoRun is on, prey should still behave like prey (flee) prey_flee_step(st) st["log"].append("AutoPrey: flee.") def tick(st): if st["caught"]: return False # unlock did not change st["step"] += 1 unlock_changed = False # AutoRun = autopilot for the currently viewed observer if st["auto_run"]: if st["control"] == "pred": if st["auto_chase"]: predator_chase_step(st) else: predator_wander_step(st) else: prey_autopilot_step(st) # The non-controlled agent still runs its own policy each step if st["control"] != "pred": if st["auto_chase"]: predator_chase_step(st) else: predator_wander_step(st) if st["control"] != "prey": prey_flee_step(st) # capture + unlock unlock_changed = _check_catch_and_unlock(st) _step_disturbance(st) if st["step"] >= 600: st["caught"] = True st["log"].append("Max steps reached (freeze).") return unlock_changed # ----------------------------- # Gradio handlers # ----------------------------- def ui_refresh_slots(current_value=None): choices = list_save_slots() if current_value and current_value in choices: value = current_value else: value = choices[0] if choices else "slot1.json" return gr.Dropdown(choices=choices if choices else ["slot1.json"], value=value) def ui_reset(seed, map_choice, st=None): seed = int(seed) progress = st["progress"] if st else {"catches": 0, "unlocked": compute_unlocks(0)} if map_choice not in progress["unlocked"]: map_choice = "Training Bay" new_st = build_state(seed, map_choice, progress=progress) if st: new_st["control"] = st.get("control", "pred") new_st["overlay"] = st.get("overlay", False) return new_st, render_first_person(new_st), render_minimap(new_st), status(new_st), unlock_summary(new_st) def ui_toggle_control(st): st["control"] = "prey" if st["control"] == "pred" else "pred" st["log"].append(f"Control switched to: {'Prey' if st['control']=='prey' else 'Predator'}.") _add_impulse(st, 0.15) return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st) def ui_turn_left(st): who = st["control"] _turn(st, who, -1) st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} turn left.") unlock_changed = tick(st) if unlock_changed: return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st) return st, render_first_person(st), render_minimap(st), status(st), gr.update() # avoid re-render if unchanged def ui_turn_right(st): who = st["control"] _turn(st, who, +1) st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} turn right.") unlock_changed = tick(st) if unlock_changed: return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st) return st, render_first_person(st), render_minimap(st), status(st), gr.update() def ui_forward(st): who = st["control"] _forward(st, who) st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} forward.") unlock_changed = tick(st) if unlock_changed: return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st) return st, render_first_person(st), render_minimap(st), status(st), gr.update() def ui_toggle_chase(st): st["auto_chase"] = not st["auto_chase"] st["log"].append(f"AutoChase set to {st['auto_chase']}.") _add_impulse(st, 0.10) return st, render_first_person(st), render_minimap(st), status(st), gr.update() def ui_toggle_run(st): st["auto_run"] = not st["auto_run"] st["log"].append(f"AutoRun set to {st['auto_run']}.") _add_impulse(st, 0.10) return st, render_first_person(st), render_minimap(st), status(st), gr.update() def ui_toggle_overlay(st): st["overlay"] = not st.get("overlay", False) st["log"].append(f"Overlay set to {st['overlay']}.") return st, render_first_person(st), render_minimap(st), status(st), gr.update() def ui_tick(st): unlock_changed = tick(st) if unlock_changed: return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st) return st, render_first_person(st), render_minimap(st), status(st), gr.update() def ui_timer(st): # IMPORTANT: do NOT update unlock markdown here (prevents flashing) if st["auto_run"] and not st["caught"]: _ = tick(st) return st, render_first_person(st), render_minimap(st), status(st) def ui_swap_roles(st): if st["caught"]: return st, render_first_person(st), render_minimap(st), status(st), gr.update() st["pred"], st["prey"] = st["prey"], st["pred"] st["ori"], st["prey_ori"] = st["prey_ori"], st["ori"] st["log"].append("Swapped roles (Predator ⇄ Prey).") _add_impulse(st, 0.35) changed = _check_catch_and_unlock(st) if changed: return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st) return st, render_first_person(st), render_minimap(st), status(st), gr.update() # ---- Save/load UI handlers ---- def ui_save_slot(st, slot_name): try: path = _slot_path(slot_name) save_to_path(st, path) export_path = path except Exception as e: st["log"].append(f"Save failed: {e}") export_path = None dd = ui_refresh_slots(os.path.basename(export_path) if export_path else None) return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st), export_path, dd def ui_load_slot(st, selected_slot): path = os.path.join(SAVE_DIR, selected_slot) if selected_slot else _slot_path("slot1") try: if not os.path.exists(path): st["log"].append(f"No save found at {path}") dd = ui_refresh_slots(selected_slot) return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd loaded = load_from_path(path) dd = ui_refresh_slots(os.path.basename(path)) return loaded, render_first_person(loaded), render_minimap(loaded), status(loaded), unlock_summary(loaded), None, dd except Exception as e: st["log"].append(f"Load failed: {e}") dd = ui_refresh_slots(selected_slot) return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd def ui_import_save(st, uploaded_file): try: if uploaded_file is None: st["log"].append("Import: no file provided.") dd = ui_refresh_slots() return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd loaded = load_from_path(uploaded_file) dd = ui_refresh_slots() return loaded, render_first_person(loaded), render_minimap(loaded), status(loaded), unlock_summary(loaded), None, dd except Exception as e: st["log"].append(f"Import failed: {e}") dd = ui_refresh_slots() return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd # ----------------------------- # App # ----------------------------- all_map_names = [name for name, _ in MAP_UNLOCKS] initial_progress = {"catches": 0, "unlocked": compute_unlocks(0)} initial_state = build_state(seed=1, map_name="Training Bay", progress=initial_progress) initial_slots = list_save_slots() initial_slot_value = initial_slots[0] if initial_slots else "slot1.json" with gr.Blocks(title="RFT Predator Space β€” Symmetric Observers") as demo: gr.Markdown( "## Experience reality through an RFT observer agent’s perspective\n" "**Accessibility note:** the progression panel is now event-driven (no flashing).\n\n" "**AutoRun:** moves the current POV observer (first-person autopilot)." ) st = gr.State(initial_state) with gr.Row(): seed = gr.Number(label="Seed", value=1, precision=0) map_choice = gr.Dropdown(label="Map (locked unless unlocked)", choices=all_map_names, value="Training Bay") btn_reset = gr.Button("Reset") btn_control = gr.Button("Toggle Control (Pred ↔ Prey)") btn_tick = gr.Button("Tick") with gr.Row(): btn_left = gr.Button("Turn Left") btn_fwd = gr.Button("Forward") btn_right = gr.Button("Turn Right") with gr.Row(): btn_chase = gr.Button("Toggle AutoChase") btn_run = gr.Button("Toggle AutoRun") btn_overlay = gr.Button("Toggle Overlay (optional)") btn_swap = gr.Button("Swap Roles (Pred ⇄ Prey)") with gr.Row(): view = gr.Image(label="First-person observer view", type="numpy") mini = gr.Image(label="Minimap (debug)", type="numpy") with gr.Row(): info = gr.Textbox(label="Run log", lines=12) unlocks = gr.Markdown(value=unlock_summary(initial_state)) gr.Markdown("### Save / Load") with gr.Row(): slot_pick = gr.Dropdown(label="Existing saves", choices=initial_slots if initial_slots else ["slot1.json"], value=initial_slot_value) slot_name = gr.Textbox(label="New save name (optional)", value="slot1") with gr.Row(): btn_refresh = gr.Button("Refresh Saves List") btn_save = gr.Button("Save (to name)") btn_load = gr.Button("Load (selected)") with gr.Row(): export_file = gr.File(label="Exported Save File (download this)", interactive=False) import_file = gr.File(label="Import Save File (upload)", interactive=True) btn_import = gr.Button("Import (Load Uploaded File)") demo.load( lambda: (st.value, render_first_person(st.value), render_minimap(st.value), status(st.value), unlock_summary(st.value), None, ui_refresh_slots(initial_slot_value)), outputs=[st, view, mini, info, unlocks, export_file, slot_pick] ) btn_reset.click(ui_reset, inputs=[seed, map_choice, st], outputs=[st, view, mini, info, unlocks]) btn_control.click(ui_toggle_control, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_left.click(ui_turn_left, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_right.click(ui_turn_right, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_fwd.click(ui_forward, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_chase.click(ui_toggle_chase, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_run.click(ui_toggle_run, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_overlay.click(ui_toggle_overlay, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_swap.click(ui_swap_roles, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_tick.click(ui_tick, inputs=[st], outputs=[st, view, mini, info, unlocks]) btn_refresh.click(lambda cur: ui_refresh_slots(cur), inputs=[slot_pick], outputs=[slot_pick]) btn_save.click( lambda st_, name_, pick_: ui_save_slot(st_, name_ if (name_ and name_.strip()) else pick_), inputs=[st, slot_name, slot_pick], outputs=[st, view, mini, info, unlocks, export_file, slot_pick] ) btn_load.click( ui_load_slot, inputs=[st, slot_pick], outputs=[st, view, mini, info, unlocks, export_file, slot_pick] ) btn_import.click( ui_import_save, inputs=[st, import_file], outputs=[st, view, mini, info, unlocks, export_file, slot_pick] ) # Timer outputs DO NOT include unlocks (prevents flashing) if hasattr(gr, "Timer"): gr.Timer(1.0 / AUTO_TICK_HZ).tick( ui_timer, inputs=[st], outputs=[st, view, mini, info] ) # queue() helps Timer behave reliably in Spaces demo.queue().launch()