ChatCraft / backend /game /pathfinding.py
gabraken's picture
feat: add game engine, voice commands, leaderboard, tutorial overlay, and stats tracking
29a88f8
"""
Pathfinding constrained to walkable area: one exterior polygon + holes (compiled_map.json),
or fallback to walkable.json polygons. Supports dynamic obstacles (building footprints).
Coordinates: sources use 0-100; game uses 0-MAP_WIDTH, 0-MAP_HEIGHT.
"""
from __future__ import annotations
import heapq
import json
import math
from pathlib import Path
from typing import Optional
from .map import MAP_HEIGHT, MAP_WIDTH
SCALE_X = MAP_WIDTH / 100.0
SCALE_Y = MAP_HEIGHT / 100.0
_GRID_STEP = 0.5
_GRID_W = int(MAP_WIDTH / _GRID_STEP) + 1
_GRID_H = int(MAP_HEIGHT / _GRID_STEP) + 1
# Compiled map: (exterior_scaled, holes_scaled) in game coords, or None
_compiled_cache: Optional[tuple[list[tuple[float, float]], list[list[tuple[float, float]]]]] = None
# Legacy: list of polygons in game coords (when no compiled map)
_polygons_cache: Optional[list[list[tuple[float, float]]]] = None
# Nav graph: (points_list, adjacency_dict). points_list[i] = (x, y) in game coords; adjacency[i] = [(j, cost), ...]
_nav_graph_cache: Optional[tuple[list[tuple[float, float]], dict[int, list[tuple[int, float]]]]] = None
# step=2.0 → cardinal dist=2.0, diagonal dist≈2.83; use 2.9 to include diagonals
_NAV_CONNECT_RADIUS = 2.9
# Clearance margin around buildings when filtering nav points (should match UNIT_RADIUS in engine)
_NAV_BUILDING_MARGIN = 0.6
# --- Performance caches ---
# Pre-computed static walkable grid (no dynamic obstacles): grid[ci][cj] = bool
_walkable_grid: Optional[list[list[bool]]] = None
# _nav_valid_set cache: buildings_key -> set[int]
# buildings_key is a frozenset of rounded blocked_rects tuples — changes only when buildings change
_nav_valid_cache: dict[int, set[int]] = {}
_NAV_VALID_CACHE_MAX = 64
# Path result cache: (start_i, end_i, buildings_key) -> path
_path_cache: dict[tuple[int, int, int], Optional[list[tuple[float, float]]]] = {}
_PATH_CACHE_MAX = 4096
# Spatial bucket index for fast nearest-nav-point lookup
_nav_buckets: Optional[dict[tuple[int, int], list[int]]] = None
_NAV_BUCKET_SIZE = 4.0 # tiles per bucket side
# Grid A* max node expansions (safety limit against very large searches)
_GRID_ASTAR_MAX_NODES = 8000
def _load_compiled() -> Optional[tuple[list[tuple[float, float]], list[list[tuple[float, float]]]]]:
global _compiled_cache
if _compiled_cache is not None:
return _compiled_cache
path = Path(__file__).resolve().parent.parent / "static" / "compiled_map.json"
if not path.exists():
return None
try:
with open(path, encoding="utf-8") as f:
data = json.load(f)
except (OSError, json.JSONDecodeError):
return None
exterior_raw = data.get("exterior", [])
holes_raw = data.get("holes", [])
if len(exterior_raw) < 3:
return None
exterior = [(float(p[0]) * SCALE_X, float(p[1]) * SCALE_Y) for p in exterior_raw]
holes = [
[(float(p[0]) * SCALE_X, float(p[1]) * SCALE_Y) for p in h]
for h in holes_raw if len(h) >= 3
]
_compiled_cache = (exterior, holes)
return _compiled_cache
def _load_nav_graph() -> Optional[tuple[list[tuple[float, float]], dict[int, list[tuple[int, float]]]]]:
"""Load nav_points from compiled_map.json and build adjacency graph. Returns (points, adj) or None."""
global _nav_graph_cache, _nav_buckets
if _nav_graph_cache is not None:
return _nav_graph_cache
path = Path(__file__).resolve().parent.parent / "static" / "compiled_map.json"
if not path.exists():
return None
try:
with open(path, encoding="utf-8") as f:
data = json.load(f)
except (OSError, json.JSONDecodeError):
return None
raw = data.get("nav_points", [])
if len(raw) < 2:
return None
points: list[tuple[float, float]] = [(float(p[0]), float(p[1])) for p in raw]
adj: dict[int, list[tuple[int, float]]] = {i: [] for i in range(len(points))}
for i in range(len(points)):
xi, yi = points[i]
for j in range(i + 1, len(points)):
xj, yj = points[j]
d = ((xj - xi) ** 2 + (yj - yi) ** 2) ** 0.5
if d <= _NAV_CONNECT_RADIUS and d > 0:
adj[i].append((j, d))
adj[j].append((i, d))
_nav_graph_cache = (points, adj)
# Build spatial bucket index
buckets: dict[tuple[int, int], list[int]] = {}
for i, (x, y) in enumerate(points):
bk = (int(x / _NAV_BUCKET_SIZE), int(y / _NAV_BUCKET_SIZE))
buckets.setdefault(bk, []).append(i)
_nav_buckets = buckets
return _nav_graph_cache
def _buildings_key(blocked_rects: Optional[list[tuple[float, float, float, float]]]) -> int:
"""Stable hash of building rects, used as cache key. Returns 0 when no buildings."""
if not blocked_rects:
return 0
return hash(tuple(
(round(r[0], 2), round(r[1], 2), round(r[2], 2), round(r[3], 2))
for r in sorted(blocked_rects)
))
def _nav_point_blocked(
x: float,
y: float,
blocked_rects: list[tuple[float, float, float, float]],
) -> bool:
"""True if nav point (x, y) is inside or too close to any building rect."""
for rx, ry, w, h in blocked_rects:
cx = max(rx, min(rx + w, x))
cy = max(ry, min(ry + h, y))
if math.hypot(x - cx, y - cy) < _NAV_BUILDING_MARGIN:
return True
return False
def _nav_valid_set(
points: list[tuple[float, float]],
blocked_rects: Optional[list[tuple[float, float, float, float]]],
bkey: int = 0,
) -> Optional[set[int]]:
"""Return set of nav point indices not blocked by buildings, with caching."""
if not blocked_rects:
return None # all valid
cached = _nav_valid_cache.get(bkey)
if cached is not None:
return cached
result = {
i for i, (px, py) in enumerate(points)
if not _nav_point_blocked(px, py, blocked_rects)
}
if len(_nav_valid_cache) >= _NAV_VALID_CACHE_MAX:
# Evict oldest half
for k in list(_nav_valid_cache.keys())[:_NAV_VALID_CACHE_MAX // 2]:
del _nav_valid_cache[k]
_nav_valid_cache[bkey] = result
return result
def _nearest_nav_index(
x: float,
y: float,
points: list[tuple[float, float]],
valid: Optional[set[int]] = None,
) -> int:
"""Return index of nav point closest to (x, y) using spatial bucket index."""
if _nav_buckets is not None:
bx0 = int(x / _NAV_BUCKET_SIZE)
by0 = int(y / _NAV_BUCKET_SIZE)
best_i = -1
best_d2 = float("inf")
for radius in range(6):
# Search the ring at manhattan distance `radius` in bucket space
for dbx in range(-radius, radius + 1):
for dby in range(-radius, radius + 1):
if abs(dbx) != radius and abs(dby) != radius:
continue # inner cells already covered
for i in _nav_buckets.get((bx0 + dbx, by0 + dby), ()):
if valid is not None and i not in valid:
continue
px, py = points[i]
d2 = (px - x) ** 2 + (py - y) ** 2
if d2 < best_d2:
best_d2 = d2
best_i = i
# Once we found a candidate, stop if the next ring is certainly farther
if best_i >= 0:
next_ring_min_dist2 = ((radius) * _NAV_BUCKET_SIZE) ** 2
if best_d2 <= next_ring_min_dist2:
break
if best_i >= 0:
return best_i
# Fallback: full linear scan
best_i = -1
best_d2 = float("inf")
for i, (px, py) in enumerate(points):
if valid is not None and i not in valid:
continue
d2 = (px - x) ** 2 + (py - y) ** 2
if d2 < best_d2:
best_d2 = d2
best_i = i
if best_i == -1:
# Ignore valid constraint as last resort
for i, (px, py) in enumerate(points):
d2 = (px - x) ** 2 + (py - y) ** 2
if d2 < best_d2:
best_d2 = d2
best_i = i
return max(best_i, 0)
def _load_polygons() -> list[list[tuple[float, float]]]:
global _polygons_cache
if _polygons_cache is not None:
return _polygons_cache
path = Path(__file__).resolve().parent.parent / "static" / "walkable.json"
if not path.exists():
_polygons_cache = []
return _polygons_cache
with open(path, encoding="utf-8") as f:
data = json.load(f)
raw = data.get("polygons", [])
if not raw and data.get("polygon"):
raw = [data["polygon"]]
result: list[list[tuple[float, float]]] = []
for poly in raw:
if len(poly) < 3:
continue
scaled = [
(float(pt[0]) * SCALE_X, float(pt[1]) * SCALE_Y)
for pt in poly
]
result.append(scaled)
_polygons_cache = result
return result
def _point_in_polygon(x: float, y: float, polygon: list[tuple[float, float]]) -> bool:
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i]
xj, yj = polygon[j]
if ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def _point_in_rect(x: float, y: float, rx: float, ry: float, w: float, h: float) -> bool:
return rx <= x < rx + w and ry <= y < ry + h
def _static_walkable(x: float, y: float) -> bool:
"""True if (x,y) is in any walkable polygon."""
if x < 0 or x > MAP_WIDTH or y < 0 or y > MAP_HEIGHT:
return False
polygons = _load_polygons()
if not polygons:
return True
for poly in polygons:
if _point_in_polygon(x, y, poly):
return True
return False
def _get_walkable_grid() -> list[list[bool]]:
"""Pre-computed static walkable grid. Computed once, shared across all games."""
global _walkable_grid
if _walkable_grid is not None:
return _walkable_grid
grid = [[False] * _GRID_H for _ in range(_GRID_W)]
for ci in range(_GRID_W):
x = ci * _GRID_STEP
for cj in range(_GRID_H):
grid[ci][cj] = _static_walkable(x, cj * _GRID_STEP)
_walkable_grid = grid
return grid
def is_walkable(
x: float,
y: float,
*,
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
) -> bool:
"""
True if (x, y) is walkable (inside walkable area, not in any blocked rect).
Uses pre-computed grid for the static part.
"""
ci = max(0, min(_GRID_W - 1, int(round(x / _GRID_STEP))))
cj = max(0, min(_GRID_H - 1, int(round(y / _GRID_STEP))))
if not _get_walkable_grid()[ci][cj]:
return False
if blocked_rects:
for rx, ry, w, h in blocked_rects:
if _point_in_rect(x, y, rx, ry, w, h):
return False
return True
def _cell_center(ci: int, cj: int) -> tuple[float, float]:
return (ci * _GRID_STEP, cj * _GRID_STEP)
def _to_cell(x: float, y: float) -> tuple[int, int]:
ci = int(round(x / _GRID_STEP))
cj = int(round(y / _GRID_STEP))
ci = max(0, min(_GRID_W - 1, ci))
cj = max(0, min(_GRID_H - 1, cj))
return (ci, cj)
def _cell_walkable(
ci: int,
cj: int,
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
) -> bool:
"""Fast walkable check: uses pre-computed grid + inline building rect test."""
if not _get_walkable_grid()[ci][cj]:
return False
if blocked_rects:
cx = ci * _GRID_STEP
cy = cj * _GRID_STEP
for rx, ry, w, h in blocked_rects:
if rx <= cx < rx + w and ry <= cy < ry + h:
return False
return True
def _neighbors(
ci: int,
cj: int,
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
) -> list[tuple[int, int, float]]:
out: list[tuple[int, int, float]] = []
for di in (-1, 0, 1):
for dj in (-1, 0, 1):
if di == 0 and dj == 0:
continue
ni, nj = ci + di, cj + dj
if 0 <= ni < _GRID_W and 0 <= nj < _GRID_H and _cell_walkable(ni, nj, blocked_rects):
cost = 1.414 if di != 0 and dj != 0 else 1.0
out.append((ni, nj, cost))
return out
def _find_path_navgraph(
sx: float, sy: float, tx: float, ty: float,
points: list[tuple[float, float]],
adj: dict[int, list[tuple[int, float]]],
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
bkey: int = 0,
) -> Optional[list[tuple[float, float]]]:
"""A* on nav graph with result caching. Returns waypoints or None."""
if not points or not adj:
return None
valid = _nav_valid_set(points, blocked_rects, bkey)
start_i = _nearest_nav_index(sx, sy, points, valid)
end_i = _nearest_nav_index(tx, ty, points, valid)
if start_i == end_i:
return [(tx, ty)]
# Check path cache
cache_key = (start_i, end_i, bkey)
if cache_key in _path_cache:
cached = _path_cache[cache_key]
if cached is None:
return None
# Return copy with actual target endpoint
result = list(cached)
if result:
result[-1] = (tx, ty)
return result
def heuristic(i: int) -> float:
px, py = points[i]
return ((tx - px) ** 2 + (ty - py) ** 2) ** 0.5
counter = 0
best_g: dict[int, float] = {start_i: 0.0}
came_from: dict[int, Optional[int]] = {start_i: None}
open_set: list[tuple[float, int, int]] = []
heapq.heappush(open_set, (heuristic(start_i), counter, start_i))
counter += 1
found = False
while open_set:
f, _, node_i = heapq.heappop(open_set)
g = best_g.get(node_i, float("inf"))
if f > g + heuristic(node_i) + 1e-9:
continue
if node_i == end_i:
found = True
break
for j, cost in adj.get(node_i, []):
if valid is not None and j not in valid:
continue
new_g = g + cost
if new_g < best_g.get(j, float("inf")):
best_g[j] = new_g
came_from[j] = node_i
heapq.heappush(open_set, (new_g + heuristic(j), counter, j))
counter += 1
if not found:
_store_path_cache(cache_key, None)
return None
path_indices: list[int] = []
cur: Optional[int] = end_i
while cur is not None:
path_indices.append(cur)
cur = came_from[cur]
path_indices.reverse()
result = [points[i] for i in path_indices]
if result:
result[-1] = (tx, ty)
else:
result = [(tx, ty)]
_store_path_cache(cache_key, result)
return result
def _store_path_cache(
key: tuple[int, int, int],
path: Optional[list[tuple[float, float]]],
) -> None:
if len(_path_cache) >= _PATH_CACHE_MAX:
# Evict oldest quarter
for k in list(_path_cache.keys())[:_PATH_CACHE_MAX // 4]:
del _path_cache[k]
_path_cache[key] = path
def find_path(
sx: float,
sy: float,
tx: float,
ty: float,
*,
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
) -> Optional[list[tuple[float, float]]]:
"""
A* path from (sx,sy) to (tx,ty) in game coordinates.
Uses cached nav-point graph when available; falls back to grid A*.
"""
nav = _load_nav_graph()
if nav is not None:
points, adj = nav
bkey = _buildings_key(blocked_rects)
path = _find_path_navgraph(sx, sy, tx, ty, points, adj,
blocked_rects=blocked_rects, bkey=bkey)
if path is not None:
return path
return _find_path_grid(sx, sy, tx, ty, blocked_rects=blocked_rects)
def _find_path_grid(
sx: float,
sy: float,
tx: float,
ty: float,
*,
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
) -> Optional[list[tuple[float, float]]]:
"""Grid A* path (fallback when nav graph unavailable). Capped at _GRID_ASTAR_MAX_NODES."""
si, sj = _to_cell(sx, sy)
ti, tj = _to_cell(tx, ty)
if not _cell_walkable(si, sj, blocked_rects) or not _cell_walkable(ti, tj, blocked_rects):
return None
if si == ti and sj == tj:
return [(tx, ty)]
def heuristic(i: int, j: int) -> float:
return ((ti - i) ** 2 + (tj - j) ** 2) ** 0.5
counter = 0
nodes_expanded = 0
open_set: list[tuple[float, int, int, int, float, Optional[tuple[int, int]]]] = []
heapq.heappush(open_set, (heuristic(si, sj), counter, si, sj, 0.0, None))
counter += 1
best_g: dict[tuple[int, int], float] = {(si, sj): 0.0}
came_from: dict[tuple[int, int], Optional[tuple[int, int]]] = {(si, sj): None}
while open_set and nodes_expanded < _GRID_ASTAR_MAX_NODES:
f, _, ci, cj, g, _ = heapq.heappop(open_set)
if g > best_g.get((ci, cj), float("inf")):
continue
nodes_expanded += 1
if ci == ti and cj == tj:
cells: list[tuple[int, int]] = []
cur: Optional[tuple[int, int]] = (ci, cj)
while cur is not None:
cells.append(cur)
cur = came_from[cur]
cells.reverse()
waypoints = [_cell_center(i, j) for i, j in cells[1:]]
if waypoints:
waypoints[-1] = (tx, ty)
else:
waypoints = [(tx, ty)]
return waypoints
for ni, nj, cost in _neighbors(ci, cj, blocked_rects):
new_g = g + cost
if new_g < best_g.get((ni, nj), float("inf")):
best_g[(ni, nj)] = new_g
came_from[(ni, nj)] = (ci, cj)
f_new = new_g + heuristic(ni, nj)
heapq.heappush(open_set, (f_new, counter, ni, nj, new_g, (ci, cj)))
counter += 1
return None
def nearest_walkable_navpoint(
x: float,
y: float,
*,
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
) -> tuple[float, float]:
"""Return the nearest nav point that is walkable (not inside any building)."""
nav = _load_nav_graph()
if nav is not None:
points, _ = nav
bkey = _buildings_key(blocked_rects)
valid = _nav_valid_set(points, blocked_rects, bkey)
best_i = _nearest_nav_index(x, y, points, valid)
if best_i >= 0:
return points[best_i]
return snap_to_walkable(x, y, blocked_rects=blocked_rects)
def snap_to_walkable(
x: float,
y: float,
*,
blocked_rects: Optional[list[tuple[float, float, float, float]]] = None,
) -> tuple[float, float]:
"""Return nearest walkable point to (x,y)."""
if is_walkable(x, y, blocked_rects=blocked_rects):
return (x, y)
best = (x, y)
best_d = 1e9
for radius in [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0]:
for dx in [-radius, -radius * 0.5, 0, radius * 0.5, radius]:
for dy in [-radius, -radius * 0.5, 0, radius * 0.5, radius]:
if dx == 0 and dy == 0:
continue
nx = max(0, min(MAP_WIDTH, x + dx))
ny = max(0, min(MAP_HEIGHT, y + dy))
if is_walkable(nx, ny, blocked_rects=blocked_rects):
d = (nx - x) ** 2 + (ny - y) ** 2
if d < best_d:
best_d = d
best = (nx, ny)
if best_d < 1e9:
break
return best
def invalidate_path_cache() -> None:
"""Call when buildings change (placed or destroyed) to flush path and nav-valid caches."""
_path_cache.clear()
_nav_valid_cache.clear()