Update app.py
app.py
CHANGED
@@ -1,11 +1,10 @@
 import json
 import math
-import time
 from dataclasses import dataclass, asdict
 from typing import Dict, List, Tuple, Optional
 
 import numpy as np
-from PIL import Image, ImageDraw
+from PIL import Image, ImageDraw
 
 import gradio as gr
 
@@ -18,7 +17,7 @@ import gradio as gr
 # - Branching timelines (fork from any previous step)
 # - Click-to-edit map tiles
 #
-#
+# Compatible with older Gradio versions by avoiding fn_kwargs in .click()
 # ============================================================
 
 # -----------------------------
@@ -53,7 +52,7 @@ TILE_NAMES = {
     TELE: "Teleporter",
 }
 
-# Palette (
+# Palette (simple + inspectable)
 SKY = np.array([14, 16, 26], dtype=np.uint8)
 FLOOR_NEAR = np.array([24, 26, 40], dtype=np.uint8)
 FLOOR_FAR = np.array([10, 11, 18], dtype=np.uint8)
@@ -70,8 +69,6 @@ AGENT_COLORS = {
 # Deterministic RNG helper
 # -----------------------------
 def rng_for(seed: int, step: int, stream: int = 0) -> np.random.Generator:
-    # Stable stream keyed by (seed, step, stream)
-    # Using PCG64 bitgen for reproducibility.
     mix = (seed * 1_000_003) ^ (step * 9_999_937) ^ (stream * 97_531)
     return np.random.default_rng(mix & 0xFFFFFFFFFFFFFFFF)
 
@@ -84,22 +81,22 @@ class Agent:
     x: int
     y: int
     ori: int  # 0..3
-    energy: int = 100
+    energy: int = 100
 
 @dataclass
 class WorldState:
     seed: int
     step: int
-    grid: List[List[int]]
+    grid: List[List[int]]
     agents: Dict[str, Agent]
-    controlled: str
-    pov: str
+    controlled: str
+    pov: str
     autorun: bool
     speed_hz: float
    overlay: bool
     event_log: List[str]
     caught: bool
-    branches: Dict[str, int]
+    branches: Dict[str, int]
 
 @dataclass
 class Snapshot:
@@ -119,12 +116,12 @@ def default_grid() -> List[List[int]]:
         g[y][0] = WALL
         g[y][GRID_W - 1] = WALL
 
-    #
+    # Interior structure
     for x in range(4, 17):
         g[7][x] = WALL
-    g[7][10] = DOOR
+    g[7][10] = DOOR
 
-    #
+    # Items
     g[3][4] = FOOD
     g[11][15] = FOOD
     g[4][14] = NOISE
@@ -155,23 +152,21 @@ def init_state(seed: int) -> WorldState:
     )
 
 # -----------------------------
-#
+# Belief memory
 # -----------------------------
 def init_belief() -> Dict[str, np.ndarray]:
-    # -1 unknown, else tile id
     b = {}
     for name in ["Predator", "Prey", "Scout"]:
         b[name] = -1 * np.ones((GRID_H, GRID_W), dtype=np.int16)
     return b
 
 # -----------------------------
-#
+# Movement + collision
 # -----------------------------
 def in_bounds(x: int, y: int) -> bool:
     return 0 <= x < GRID_W and 0 <= y < GRID_H
 
 def is_blocking(tile: int) -> bool:
-    # door is passable (for drama); wall blocks; tele is passable
     return tile == WALL
 
 def move_forward(state: WorldState, a: Agent) -> None:
@@ -181,17 +176,14 @@ def move_forward(state: WorldState, a: Agent) -> None:
         return
     if is_blocking(state.grid[ny][nx]):
         return
-    # Door toggle mechanic: if you step onto a door, it becomes empty (door opens)
     if state.grid[ny][nx] == DOOR:
         state.grid[ny][nx] = EMPTY
         state.event_log.append(f"t={state.step}: {a.name} opened a door.")
     a.x, a.y = nx, ny
 
-    # Teleporter: stepping onto TELE sends you to the other TELE (deterministically)
     if state.grid[ny][nx] == TELE:
         teles = [(x, y) for y in range(GRID_H) for x in range(GRID_W) if state.grid[y][x] == TELE]
         if len(teles) >= 2:
-            # choose destination as "the other tele" based on sorted list
             teles_sorted = sorted(teles)
             idx = teles_sorted.index((nx, ny))
             dest = teles_sorted[(idx + 1) % len(teles_sorted)]
@@ -205,10 +197,9 @@ def turn_right(a: Agent) -> None:
     a.ori = (a.ori + 1) % 4
 
 # -----------------------------
-#
+# LOS + FOV visibility
 # -----------------------------
 def los_clear(grid: List[List[int]], x0: int, y0: int, x1: int, y1: int) -> bool:
-    # Bresenham line-of-sight; walls block
     dx = abs(x1 - x0)
     dy = abs(y1 - y0)
     sx = 1 if x0 < x1 else -1
@@ -230,15 +221,12 @@ def los_clear(grid: List[List[int]], x0: int, y0: int, x1: int, y1: int) -> bool
         y += sy
 
 def within_fov(observer: Agent, tx: int, ty: int, fov_deg: float = 78.0) -> bool:
-    # vector from observer to target in observer's local frame
     dx = tx - observer.x
     dy = ty - observer.y
     if dx == 0 and dy == 0:
         return True
-    # absolute angle of target
     angle = math.degrees(math.atan2(dy, dx)) % 360
     facing = ORI_DEG[observer.ori]
-    # smallest signed difference
     diff = (angle - facing + 540) % 360 - 180
     return abs(diff) <= (fov_deg / 2)
 
@@ -248,29 +236,26 @@ def visible(observer: Agent, target: Agent, grid: List[List[int]]) -> bool:
 # -----------------------------
 # Raycast pseudo-3D render
 # -----------------------------
-def raycast_view(state: WorldState, observer: Agent
-    # Returns RGB uint8 image
+def raycast_view(state: WorldState, observer: Agent) -> np.ndarray:
     img = np.zeros((VIEW_H, VIEW_W, 3), dtype=np.uint8)
     img[:, :] = SKY
 
-    # floor gradient
     for y in range(VIEW_H // 2, VIEW_H):
         t = (y - VIEW_H // 2) / (VIEW_H // 2 + 1e-6)
         col = (1 - t) * FLOOR_NEAR + t * FLOOR_FAR
         img[y, :] = col.astype(np.uint8)
 
-    # ray setup
     fov = math.radians(FOV_DEG)
     half_fov = fov / 2
+
     for rx in range(RAY_W):
-        # camera plane: [-1, 1]
         cam_x = (2 * rx / (RAY_W - 1)) - 1
         ray_ang = math.radians(ORI_DEG[observer.ori]) + cam_x * half_fov
 
-        # DDA-like stepping
         ox, oy = observer.x + 0.5, observer.y + 0.5
         sin_a = math.sin(ray_ang)
         cos_a = math.cos(ray_ang)
+
         depth = 0.0
         hit_side = 0
 
@@ -283,19 +268,15 @@ def raycast_view(state: WorldState, observer: Agent, belief: Optional[np.ndarray
 
             tile = state.grid[ty][tx]
             if tile == WALL:
-                # side shading based on ray direction
-                # crude: if abs(cos)>abs(sin) consider "vertical" else "horizontal"
                 hit_side = 1 if abs(cos_a) > abs(sin_a) else 0
                 break
             if tile == DOOR:
-                # door is semi-visible in world; render thinner by treating as a hit but less dark
                 hit_side = 2
                 break
 
-        # project wall slice
         if depth >= MAX_DEPTH:
             continue
-
+
         depth *= math.cos(ray_ang - math.radians(ORI_DEG[observer.ori]))
         depth = max(depth, 0.001)
 
@@ -308,30 +289,24 @@ def raycast_view(state: WorldState, observer: Agent, belief: Optional[np.ndarray
         elif hit_side == 1:
             col = WALL_SIDE.copy()
         else:
-            # door slice
             col = np.array([180, 210, 255], dtype=np.uint8)
 
-        # slight depth dim
         dim = max(0.25, 1.0 - (depth / MAX_DEPTH))
         col = (col * dim).astype(np.uint8)
 
-        # draw slice (scaled to full VIEW_W)
         x0 = int(rx * (VIEW_W / RAY_W))
         x1 = int((rx + 1) * (VIEW_W / RAY_W))
         img[y0:y1, x0:x1] = col
 
-    # overlay: draw "billboards" for visible agents
     for other_name, other in state.agents.items():
         if other_name == observer.name:
             continue
         if visible(observer, other, state.grid):
-            # place billboard at its relative angle
            dx = other.x - observer.x
             dy = other.y - observer.y
             ang = (math.degrees(math.atan2(dy, dx)) % 360)
             facing = ORI_DEG[observer.ori]
             diff = (ang - facing + 540) % 360 - 180
-            # map diff to screen x
             sx = int((diff / (FOV_DEG / 2)) * (VIEW_W / 2) + (VIEW_W / 2))
             dist = math.sqrt(dx * dx + dy * dy)
             h = int((VIEW_H * 0.65) / max(dist, 0.75))
@@ -345,7 +320,6 @@ def raycast_view(state: WorldState, observer: Agent, belief: Optional[np.ndarray
             img[y0:y1, x0:x1] = np.array(col, dtype=np.uint8)
 
     if state.overlay:
-        # reticle
         cx, cy = VIEW_W // 2, VIEW_H // 2
         img[cy - 1:cy + 2, cx - 10:cx + 10] = np.array([120, 190, 255], dtype=np.uint8)
         img[cy - 10:cy + 10, cx - 1:cx + 2] = np.array([120, 190, 255], dtype=np.uint8)
@@ -353,7 +327,7 @@ def raycast_view(state: WorldState, observer: Agent, belief: Optional[np.ndarray
     return img
 
 # -----------------------------
-# Top-down
+# Top-down render
 # -----------------------------
 def render_topdown(grid: np.ndarray, agents: Dict[str, Agent], title: str, show_agents: bool = True) -> Image.Image:
     w = grid.shape[1] * TILE
@@ -361,12 +335,11 @@ def render_topdown(grid: np.ndarray, agents: Dict[str, Agent], title: str, show_
     im = Image.new("RGB", (w, h + 28), (10, 12, 18))
     draw = ImageDraw.Draw(im)
 
-    # tiles
     for y in range(grid.shape[0]):
         for x in range(grid.shape[1]):
             t = int(grid[y, x])
             if t == -1:
-                col = (18, 20, 32)
+                col = (18, 20, 32)
             elif t == EMPTY:
                 col = (26, 30, 44)
             elif t == WALL:
@@ -385,7 +358,6 @@ def render_topdown(grid: np.ndarray, agents: Dict[str, Agent], title: str, show_
             x0, y0 = x * TILE, y * TILE + 28
             draw.rectangle([x0, y0, x0 + TILE - 1, y0 + TILE - 1], fill=col)
 
-    # grid lines
     for x in range(grid.shape[1] + 1):
         xx = x * TILE
         draw.line([xx, 28, xx, h + 28], fill=(12, 14, 22))
@@ -393,7 +365,6 @@ def render_topdown(grid: np.ndarray, agents: Dict[str, Agent], title: str, show_
         yy = y * TILE + 28
         draw.line([0, yy, w, yy], fill=(12, 14, 22))
 
-    # agents
     if show_agents:
         for name, a in agents.items():
             cx = a.x * TILE + TILE // 2
@@ -401,23 +372,19 @@ def render_topdown(grid: np.ndarray, agents: Dict[str, Agent], title: str, show_
             col = AGENT_COLORS.get(name, (220, 220, 220))
             r = TILE // 3
             draw.ellipse([cx - r, cy - r, cx + r, cy + r], fill=col)
-            # heading tick
             dx, dy = DIRS[a.ori]
             draw.line([cx, cy, cx + dx * r, cy + dy * r], fill=(10, 10, 10), width=3)
 
-    # title bar
     draw.rectangle([0, 0, w, 28], fill=(14, 16, 26))
     draw.text((8, 6), title, fill=(230, 230, 240))
-
     return im
 
 # -----------------------------
-#
+# Policies (explicit + deterministic)
 # -----------------------------
 def predator_policy(state: WorldState, step: int) -> str:
     pred = state.agents["Predator"]
     prey = state.agents["Prey"]
-    # If prey visible, chase: turn toward prey then forward
     if visible(pred, prey, state.grid):
         dx = prey.x - pred.x
         dy = prey.y - pred.y
@@ -429,28 +396,24 @@ def predator_policy(state: WorldState, step: int) -> str:
         if diff > 10:
             return "R"
         return "F"
-    # else wander deterministically
     r = rng_for(state.seed, step, stream=1)
     return r.choice(["F", "L", "R", "F", "F"])
 
 def prey_policy(state: WorldState, step: int) -> str:
     prey = state.agents["Prey"]
     pred = state.agents["Predator"]
-    # If predator visible, flee: turn away then forward
     if visible(prey, pred, state.grid):
         dx = pred.x - prey.x
         dy = pred.y - prey.y
         ang = (math.degrees(math.atan2(dy, dx)) % 360)
         facing = ORI_DEG[prey.ori]
         diff = (ang - facing + 540) % 360 - 180
-        # want to face opposite direction: add 180
         diff_away = ((diff + 180) + 540) % 360 - 180
         if diff_away < -10:
             return "L"
         if diff_away > 10:
             return "R"
         return "F"
-    # else seek food if adjacent, else wander
     for turn in [0, -1, 1, 2]:
         ori = (prey.ori + turn) % 4
         dx, dy = DIRS[ori]
@@ -462,36 +425,32 @@ def prey_policy(state: WorldState, step: int) -> str:
                 return "L"
             if turn == 1:
                 return "R"
-            return "R"
+            return "R"
     r = rng_for(state.seed, step, stream=2)
     return r.choice(["F", "L", "R", "F"])
 
 def scout_policy(state: WorldState, step: int) -> str:
-    # Scout tries to keep line-of-sight on predator without colliding
     scout = state.agents["Scout"]
     pred = state.agents["Predator"]
     if los_clear(state.grid, scout.x, scout.y, pred.x, pred.y):
-        # orbit-ish: if too close, turn away; else meander
         dist = abs(scout.x - pred.x) + abs(scout.y - pred.y)
         if dist <= 3:
             return "R"
         r = rng_for(state.seed, step, stream=3)
         return r.choice(["F", "L", "R", "F"])
-
-
-
-
-
-
-
-
-
-
-    return "R"
-    return "F"
+    dx = pred.x - scout.x
+    dy = pred.y - scout.y
+    ang = (math.degrees(math.atan2(dy, dx)) % 360)
+    facing = ORI_DEG[scout.ori]
+    diff = (ang - facing + 540) % 360 - 180
+    if diff < -10:
+        return "L"
+    if diff > 10:
+        return "R"
+    return "F"
 
 # -----------------------------
-#
+# Simulation step
 # -----------------------------
 def apply_action(state: WorldState, agent_name: str, action: str) -> None:
     a = state.agents[agent_name]
@@ -521,13 +480,10 @@ def tick(state: WorldState, manual_action: Optional[str] = None) -> None:
     if state.caught:
         return
 
-    # Manual action applies to controlled agent first (if provided)
     if manual_action:
         apply_action(state, state.controlled, manual_action)
 
-    # Autonomy for the others (and for controlled if autorun)
     step = state.step
-    # Controlled agent: if autorun and no manual action this tick, autopilot it
     if state.autorun and not manual_action:
         if state.controlled == "Predator":
             act = predator_policy(state, step)
@@ -537,7 +493,6 @@ def tick(state: WorldState, manual_action: Optional[str] = None) -> None:
             act = scout_policy(state, step)
         apply_action(state, state.controlled, act)
 
-    # Non-controlled always run their policy each tick
     for name in ["Predator", "Prey", "Scout"]:
         if name == state.controlled:
             continue
@@ -554,9 +509,9 @@ def tick(state: WorldState, manual_action: Optional[str] = None) -> None:
     state.step += 1
 
 # -----------------------------
-# History
+# History
 # -----------------------------
-MAX_HISTORY = 3000
+MAX_HISTORY = 3000
 
 def snapshot_of(state: WorldState) -> Snapshot:
     return Snapshot(
@@ -573,17 +528,13 @@ def restore_into(state: WorldState, snap: Snapshot) -> None:
     for k, d in snap.agents.items():
         state.agents[k] = Agent(**d)
     state.caught = snap.caught
-    # preserve full log, but annotate jump
     state.event_log.append(f"Jumped to t={snap.step} (rewind).")
 
 # -----------------------------
 # Belief updates
 # -----------------------------
 def update_belief_for_agent(state: WorldState, belief: np.ndarray, agent: Agent) -> None:
-    # Reveal tiles in a cone up to MAX_DEPTH using simple ray sampling
-    # plus always reveal own tile
     belief[agent.y, agent.x] = state.grid[agent.y][agent.x]
-
     base = math.radians(ORI_DEG[agent.ori])
     half = math.radians(FOV_DEG / 2)
     rays = 33 if agent.name != "Scout" else 45
@@ -605,29 +556,24 @@ def update_belief_for_agent(state: WorldState, belief: np.ndarray, agent: Agent)
                 break
 
 # -----------------------------
-# UI
+# Views + UI helpers
 # -----------------------------
 def build_views(state: WorldState, beliefs: Dict[str, np.ndarray]) -> Tuple[np.ndarray, Image.Image, Image.Image, Image.Image, str, str]:
     pov_agent = state.agents[state.pov]
 
-    # Update beliefs each frame (deterministic, based on current truth)
     for name, a in state.agents.items():
         update_belief_for_agent(state, beliefs[name], a)
 
-    # POV raycast
     pov_img = raycast_view(state, pov_agent)
 
-    # Truth map
     truth_np = np.array(state.grid, dtype=np.int16)
     truth_img = render_topdown(truth_np, state.agents, f"Truth Map — t={state.step} seed={state.seed}", show_agents=True)
 
-    # Belief maps (two most interesting: controlled + other)
     ctrl = state.controlled
     other = "Prey" if ctrl == "Predator" else "Predator"
     ctrl_img = render_topdown(beliefs[ctrl], state.agents, f"{ctrl} Belief (Fog-of-War)", show_agents=True)
     other_img = render_topdown(beliefs[other], state.agents, f"{other} Belief (Fog-of-War)", show_agents=True)
 
-    # Status + log
     pred = state.agents["Predator"]
     prey = state.agents["Prey"]
     scout = state.agents["Scout"]
@@ -644,7 +590,6 @@ def build_views(state: WorldState, beliefs: Dict[str, np.ndarray]) -> Tuple[np.n
     return pov_img, truth_img, ctrl_img, other_img, status, log
 
 def grid_click_to_tile(evt: gr.SelectData, selected_tile: int, state: WorldState) -> WorldState:
-    # evt.index is pixel coords (x,y) on truth image; our truth image has 28px title bar
     x_px, y_px = evt.index
     y_px = y_px - 28
     if y_px < 0:
@@ -653,11 +598,8 @@ def grid_click_to_tile(evt: gr.SelectData, selected_tile: int, state: WorldState
     gy = int(y_px // TILE)
     if not in_bounds(gx, gy):
         return state
-
-    # Protect borders from accidental deletion (optional)
     if gx == 0 or gy == 0 or gx == GRID_W - 1 or gy == GRID_H - 1:
         return state
-
     state.grid[gy][gx] = selected_tile
     state.event_log.append(f"t={state.step}: Edited tile ({gx},{gy}) -> {TILE_NAMES.get(selected_tile, selected_tile)}.")
     return state
@@ -686,18 +628,14 @@ def import_run(txt: str) -> Tuple[WorldState, List[Snapshot], Dict[str, np.ndarr
     st.overlay = bool(data.get("overlay", False))
     st.branches = dict(data.get("branches", {"main": 0}))
 
-
-
-        history.append(Snapshot(**s))
-
-    beliefs = init_belief()
-    rewind_idx = min(len(history) - 1, len(history) - 1 if history else 0)
-
-    if history:
-        restore_into(st, history[-1])
+    hist = [Snapshot(**s) for s in data.get("history", [])]
+    bel = init_belief()
 
+    r_idx = min(len(hist) - 1, len(hist) - 1 if hist else 0)
+    if hist:
+        restore_into(st, hist[-1])
     st.event_log.append("Imported run.")
-    return st,
+    return st, hist, bel, r_idx
 
 # -----------------------------
 # Gradio app
@@ -705,13 +643,12 @@ def import_run(txt: str) -> Tuple[WorldState, List[Snapshot], Dict[str, np.ndarr
 with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
     gr.Markdown(
         "## ChronoSandbox — Agent Timeline Lab\n"
-        "Deterministic multi-agent POV sandbox with **time dilation, rewind, and branching
-        "
+        "Deterministic multi-agent POV sandbox with **time dilation, rewind, and branching**.\n"
+        "Explicit rules, replayable runs."
     )
 
-    # Persistent state
     st = gr.State(init_state(seed=1337))
-    history = gr.State([snapshot_of(init_state(seed=1337))])
+    history = gr.State([snapshot_of(init_state(seed=1337))])
     beliefs = gr.State(init_belief())
     rewind_index = gr.State(0)
 
@@ -758,12 +695,11 @@ with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
         import_box = gr.Textbox(label="Import JSON", lines=10)
         btn_import = gr.Button("Import Run")
 
-    timer = gr.Timer(0.12)
+    timer = gr.Timer(0.12)
 
     def refresh(state: WorldState, hist: List[Snapshot], bel: Dict[str, np.ndarray], r_idx: int):
-        # clamp rewind slider max
         r_max = max(0, len(hist) - 1)
-        r_idx = max(0, min(r_idx, r_max))
+        r_idx = max(0, min(int(r_idx), r_max))
         pov_np, truth_im, a_im, b_im, stxt, ltxt = build_views(state, bel)
         return (
             pov_np,
@@ -815,8 +751,7 @@ with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
     def jump_fn(state: WorldState, hist: List[Snapshot], bel: Dict[str, np.ndarray], r_idx: int, idx: int):
         if not hist:
             return refresh(state, hist, bel, r_idx) + (state, hist, bel, r_idx)
-        idx = int(idx)
-        idx = max(0, min(idx, len(hist) - 1))
+        idx = max(0, min(int(idx), len(hist) - 1))
         restore_into(state, hist[idx])
         r_idx = idx
         return refresh(state, hist, bel, r_idx) + (state, hist, bel, r_idx)
@@ -827,8 +762,7 @@ with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
         state.event_log.append(f"t={state.step}: Branched timeline '{nm}' at history idx={r_idx}.")
         return refresh(state, hist, bel, r_idx) + (state, hist, bel, r_idx)
 
-    def truth_click(
-        # apply edit, snapshot after edit
+    def truth_click(tile: int, state: WorldState, hist: List[Snapshot], bel: Dict[str, np.ndarray], r_idx: int, evt: gr.SelectData):
         state = grid_click_to_tile(evt, int(tile), state)
         hist.append(snapshot_of(state))
         if len(hist) > MAX_HISTORY:
@@ -841,7 +775,6 @@ with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
 
     def import_fn(txt: str):
         state, hist, bel, r_idx = import_run(txt)
-        # refresh outputs + return states
         pov_np, truth_im, a_im, b_im, stxt, ltxt = build_views(state, bel)
         r_max = max(0, len(hist) - 1)
         return (
@@ -850,27 +783,95 @@ with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
             state, hist, bel, r_idx
         )
 
-    #
-    btn_L.click(
-
-
+    # --- CLICK HANDLERS (NO fn_kwargs; use lambdas for compatibility) ---
+    btn_L.click(
+        lambda s, h, b, r: do_action(s, h, b, r, "L"),
+        inputs=[st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        api_name=False,
+        queue=True,
+    )
+    btn_F.click(
+        lambda s, h, b, r: do_action(s, h, b, r, "F"),
+        inputs=[st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        api_name=False,
+        queue=True,
+    )
+    btn_R.click(
+        lambda s, h, b, r: do_action(s, h, b, r, "R"),
+        inputs=[st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        api_name=False,
+        queue=True,
+    )
 
-    btn_step.click(
+    btn_step.click(
+        do_tick,
+        inputs=[st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
 
-    toggle_control.click(
-
+    toggle_control.click(
+        toggle_control_fn,
+        inputs=[st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
+    toggle_pov.click(
+        toggle_pov_fn,
+        inputs=[st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
 
-    autorun.change(
-
-
+    autorun.change(
+        set_toggles,
+        inputs=[st, history, beliefs, rewind_index, autorun, speed, overlay],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
+    speed.change(
+        set_toggles,
+        inputs=[st, history, beliefs, rewind_index, autorun, speed, overlay],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
+    overlay.change(
+        set_toggles,
+        inputs=[st, history, beliefs, rewind_index, autorun, speed, overlay],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
 
-    btn_jump.click(
-
+    btn_jump.click(
+        jump_fn,
+        inputs=[st, history, beliefs, rewind_index, rewind],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
+    btn_branch.click(
+        branch_fn,
+        inputs=[st, history, beliefs, rewind_index, branch_name],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
 
-    truth.select(
+    truth.select(
+        truth_click,
+        inputs=[tile_pick, st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index, st, history, beliefs, rewind_index],
+        queue=True
+    )
 
     btn_export.click(export_fn, inputs=[st, history], outputs=[export_box], queue=True)
-    btn_import.click(
+    btn_import.click(
+        import_fn,
+        inputs=[import_box],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, st, history, beliefs, rewind_index],
+        queue=True
+    )
 
     # Timer-driven autorun
     def timer_fn(state: WorldState, hist: List[Snapshot], bel: Dict[str, np.ndarray], r_idx: int, ar: bool, sp: float):
@@ -880,8 +881,6 @@ with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
         if not state.autorun or state.caught:
             return refresh(state, hist, bel, r_idx) + (state, hist, bel, r_idx)
 
-        # How many sim ticks per UI frame?
-        # timer runs ~8.33 Hz (0.12s). We convert desired Hz to ticks per frame.
        ticks_per_frame = max(1, int(round(state.speed_hz * 0.12)))
         for _ in range(ticks_per_frame):
             tick(state, manual_action=None)
@@ -899,7 +898,11 @@ with gr.Blocks(title="ChronoSandbox — Agent Timeline Lab") as demo:
         queue=True
     )
 
-
-
+    demo.load(
+        refresh,
+        inputs=[st, history, beliefs, rewind_index],
+        outputs=[pov_img, truth, belief_a, belief_b, status, log, rewind, rewind_index],
+        queue=True
+    )
 
 demo.queue().launch()
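
Note on the wiring pattern: the handlers above bind their extra argument through a lambda closure instead of passing it via fn_kwargs, which, per the new header comment, is what keeps the app running on older Gradio releases. A minimal, self-contained sketch of that pattern follows; the component and handler names here (count, last, btn_left, do_action) are illustrative only and are not taken from app.py.

import gradio as gr

def do_action(count: int, action: str):
    # Shared handler; the fixed `action` argument arrives via the lambda closure.
    count += 1
    return count, f"applied {action} (tick #{count})"

with gr.Blocks() as demo:
    count = gr.State(0)
    last = gr.Textbox(label="Last action")
    btn_left = gr.Button("Turn Left")

    # Bind the extra argument with a lambda rather than through the event wiring,
    # so the same registration code works on Gradio versions without fn_kwargs.
    btn_left.click(
        lambda c: do_action(c, "L"),
        inputs=[count],
        outputs=[count, last],
    )

demo.launch()

The same idea carries over to the .change() and .select() bindings used in the diff: keep the registered callable's signature limited to the declared inputs and close over anything fixed.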