""" Pathfinding constrained to walkable area: one exterior polygon + holes (compiled_map.json), or fallback to walkable.json polygons. Supports dynamic obstacles (building footprints). Coordinates: sources use 0-100; game uses 0-MAP_WIDTH, 0-MAP_HEIGHT. """ from __future__ import annotations import heapq import json import math from pathlib import Path from typing import Optional from .map import MAP_HEIGHT, MAP_WIDTH SCALE_X = MAP_WIDTH / 100.0 SCALE_Y = MAP_HEIGHT / 100.0 _GRID_STEP = 0.5 _GRID_W = int(MAP_WIDTH / _GRID_STEP) + 1 _GRID_H = int(MAP_HEIGHT / _GRID_STEP) + 1 # Compiled map: (exterior_scaled, holes_scaled) in game coords, or None _compiled_cache: Optional[tuple[list[tuple[float, float]], list[list[tuple[float, float]]]]] = None # Legacy: list of polygons in game coords (when no compiled map) _polygons_cache: Optional[list[list[tuple[float, float]]]] = None # Nav graph: (points_list, adjacency_dict). points_list[i] = (x, y) in game coords; adjacency[i] = [(j, cost), ...] _nav_graph_cache: Optional[tuple[list[tuple[float, float]], dict[int, list[tuple[int, float]]]]] = None # step=2.0 → cardinal dist=2.0, diagonal dist≈2.83; use 2.9 to include diagonals _NAV_CONNECT_RADIUS = 2.9 # Clearance margin around buildings when filtering nav points (should match UNIT_RADIUS in engine) _NAV_BUILDING_MARGIN = 0.6 # --- Performance caches --- # Pre-computed static walkable grid (no dynamic obstacles): grid[ci][cj] = bool _walkable_grid: Optional[list[list[bool]]] = None # _nav_valid_set cache: buildings_key -> set[int] # buildings_key is a frozenset of rounded blocked_rects tuples — changes only when buildings change _nav_valid_cache: dict[int, set[int]] = {} _NAV_VALID_CACHE_MAX = 64 # Path result cache: (start_i, end_i, buildings_key) -> path _path_cache: dict[tuple[int, int, int], Optional[list[tuple[float, float]]]] = {} _PATH_CACHE_MAX = 4096 # Spatial bucket index for fast nearest-nav-point lookup _nav_buckets: Optional[dict[tuple[int, int], list[int]]] = None _NAV_BUCKET_SIZE = 4.0 # tiles per bucket side # Grid A* max node expansions (safety limit against very large searches) _GRID_ASTAR_MAX_NODES = 8000 def _load_compiled() -> Optional[tuple[list[tuple[float, float]], list[list[tuple[float, float]]]]]: global _compiled_cache if _compiled_cache is not None: return _compiled_cache path = Path(__file__).resolve().parent.parent / "static" / "compiled_map.json" if not path.exists(): return None try: with open(path, encoding="utf-8") as f: data = json.load(f) except (OSError, json.JSONDecodeError): return None exterior_raw = data.get("exterior", []) holes_raw = data.get("holes", []) if len(exterior_raw) < 3: return None exterior = [(float(p[0]) * SCALE_X, float(p[1]) * SCALE_Y) for p in exterior_raw] holes = [ [(float(p[0]) * SCALE_X, float(p[1]) * SCALE_Y) for p in h] for h in holes_raw if len(h) >= 3 ] _compiled_cache = (exterior, holes) return _compiled_cache def _load_nav_graph() -> Optional[tuple[list[tuple[float, float]], dict[int, list[tuple[int, float]]]]]: """Load nav_points from compiled_map.json and build adjacency graph. Returns (points, adj) or None.""" global _nav_graph_cache, _nav_buckets if _nav_graph_cache is not None: return _nav_graph_cache path = Path(__file__).resolve().parent.parent / "static" / "compiled_map.json" if not path.exists(): return None try: with open(path, encoding="utf-8") as f: data = json.load(f) except (OSError, json.JSONDecodeError): return None raw = data.get("nav_points", []) if len(raw) < 2: return None points: list[tuple[float, float]] = [(float(p[0]), float(p[1])) for p in raw] adj: dict[int, list[tuple[int, float]]] = {i: [] for i in range(len(points))} for i in range(len(points)): xi, yi = points[i] for j in range(i + 1, len(points)): xj, yj = points[j] d = ((xj - xi) ** 2 + (yj - yi) ** 2) ** 0.5 if d <= _NAV_CONNECT_RADIUS and d > 0: adj[i].append((j, d)) adj[j].append((i, d)) _nav_graph_cache = (points, adj) # Build spatial bucket index buckets: dict[tuple[int, int], list[int]] = {} for i, (x, y) in enumerate(points): bk = (int(x / _NAV_BUCKET_SIZE), int(y / _NAV_BUCKET_SIZE)) buckets.setdefault(bk, []).append(i) _nav_buckets = buckets return _nav_graph_cache def _buildings_key(blocked_rects: Optional[list[tuple[float, float, float, float]]]) -> int: """Stable hash of building rects, used as cache key. Returns 0 when no buildings.""" if not blocked_rects: return 0 return hash(tuple( (round(r[0], 2), round(r[1], 2), round(r[2], 2), round(r[3], 2)) for r in sorted(blocked_rects) )) def _nav_point_blocked( x: float, y: float, blocked_rects: list[tuple[float, float, float, float]], ) -> bool: """True if nav point (x, y) is inside or too close to any building rect.""" for rx, ry, w, h in blocked_rects: cx = max(rx, min(rx + w, x)) cy = max(ry, min(ry + h, y)) if math.hypot(x - cx, y - cy) < _NAV_BUILDING_MARGIN: return True return False def _nav_valid_set( points: list[tuple[float, float]], blocked_rects: Optional[list[tuple[float, float, float, float]]], bkey: int = 0, ) -> Optional[set[int]]: """Return set of nav point indices not blocked by buildings, with caching.""" if not blocked_rects: return None # all valid cached = _nav_valid_cache.get(bkey) if cached is not None: return cached result = { i for i, (px, py) in enumerate(points) if not _nav_point_blocked(px, py, blocked_rects) } if len(_nav_valid_cache) >= _NAV_VALID_CACHE_MAX: # Evict oldest half for k in list(_nav_valid_cache.keys())[:_NAV_VALID_CACHE_MAX // 2]: del _nav_valid_cache[k] _nav_valid_cache[bkey] = result return result def _nearest_nav_index( x: float, y: float, points: list[tuple[float, float]], valid: Optional[set[int]] = None, ) -> int: """Return index of nav point closest to (x, y) using spatial bucket index.""" if _nav_buckets is not None: bx0 = int(x / _NAV_BUCKET_SIZE) by0 = int(y / _NAV_BUCKET_SIZE) best_i = -1 best_d2 = float("inf") for radius in range(6): # Search the ring at manhattan distance `radius` in bucket space for dbx in range(-radius, radius + 1): for dby in range(-radius, radius + 1): if abs(dbx) != radius and abs(dby) != radius: continue # inner cells already covered for i in _nav_buckets.get((bx0 + dbx, by0 + dby), ()): if valid is not None and i not in valid: continue px, py = points[i] d2 = (px - x) ** 2 + (py - y) ** 2 if d2 < best_d2: best_d2 = d2 best_i = i # Once we found a candidate, stop if the next ring is certainly farther if best_i >= 0: next_ring_min_dist2 = ((radius) * _NAV_BUCKET_SIZE) ** 2 if best_d2 <= next_ring_min_dist2: break if best_i >= 0: return best_i # Fallback: full linear scan best_i = -1 best_d2 = float("inf") for i, (px, py) in enumerate(points): if valid is not None and i not in valid: continue d2 = (px - x) ** 2 + (py - y) ** 2 if d2 < best_d2: best_d2 = d2 best_i = i if best_i == -1: # Ignore valid constraint as last resort for i, (px, py) in enumerate(points): d2 = (px - x) ** 2 + (py - y) ** 2 if d2 < best_d2: best_d2 = d2 best_i = i return max(best_i, 0) def _load_polygons() -> list[list[tuple[float, float]]]: global _polygons_cache if _polygons_cache is not None: return _polygons_cache path = Path(__file__).resolve().parent.parent / "static" / "walkable.json" if not path.exists(): _polygons_cache = [] return _polygons_cache with open(path, encoding="utf-8") as f: data = json.load(f) raw = data.get("polygons", []) if not raw and data.get("polygon"): raw = [data["polygon"]] result: list[list[tuple[float, float]]] = [] for poly in raw: if len(poly) < 3: continue scaled = [ (float(pt[0]) * SCALE_X, float(pt[1]) * SCALE_Y) for pt in poly ] result.append(scaled) _polygons_cache = result return result def _point_in_polygon(x: float, y: float, polygon: list[tuple[float, float]]) -> bool: n = len(polygon) inside = False j = n - 1 for i in range(n): xi, yi = polygon[i] xj, yj = polygon[j] if ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi): inside = not inside j = i return inside def _point_in_rect(x: float, y: float, rx: float, ry: float, w: float, h: float) -> bool: return rx <= x < rx + w and ry <= y < ry + h def _static_walkable(x: float, y: float) -> bool: """True if (x,y) is in any walkable polygon.""" if x < 0 or x > MAP_WIDTH or y < 0 or y > MAP_HEIGHT: return False polygons = _load_polygons() if not polygons: return True for poly in polygons: if _point_in_polygon(x, y, poly): return True return False def _get_walkable_grid() -> list[list[bool]]: """Pre-computed static walkable grid. Computed once, shared across all games.""" global _walkable_grid if _walkable_grid is not None: return _walkable_grid grid = [[False] * _GRID_H for _ in range(_GRID_W)] for ci in range(_GRID_W): x = ci * _GRID_STEP for cj in range(_GRID_H): grid[ci][cj] = _static_walkable(x, cj * _GRID_STEP) _walkable_grid = grid return grid def is_walkable( x: float, y: float, *, blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, ) -> bool: """ True if (x, y) is walkable (inside walkable area, not in any blocked rect). Uses pre-computed grid for the static part. """ ci = max(0, min(_GRID_W - 1, int(round(x / _GRID_STEP)))) cj = max(0, min(_GRID_H - 1, int(round(y / _GRID_STEP)))) if not _get_walkable_grid()[ci][cj]: return False if blocked_rects: for rx, ry, w, h in blocked_rects: if _point_in_rect(x, y, rx, ry, w, h): return False return True def _cell_center(ci: int, cj: int) -> tuple[float, float]: return (ci * _GRID_STEP, cj * _GRID_STEP) def _to_cell(x: float, y: float) -> tuple[int, int]: ci = int(round(x / _GRID_STEP)) cj = int(round(y / _GRID_STEP)) ci = max(0, min(_GRID_W - 1, ci)) cj = max(0, min(_GRID_H - 1, cj)) return (ci, cj) def _cell_walkable( ci: int, cj: int, blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, ) -> bool: """Fast walkable check: uses pre-computed grid + inline building rect test.""" if not _get_walkable_grid()[ci][cj]: return False if blocked_rects: cx = ci * _GRID_STEP cy = cj * _GRID_STEP for rx, ry, w, h in blocked_rects: if rx <= cx < rx + w and ry <= cy < ry + h: return False return True def _neighbors( ci: int, cj: int, blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, ) -> list[tuple[int, int, float]]: out: list[tuple[int, int, float]] = [] for di in (-1, 0, 1): for dj in (-1, 0, 1): if di == 0 and dj == 0: continue ni, nj = ci + di, cj + dj if 0 <= ni < _GRID_W and 0 <= nj < _GRID_H and _cell_walkable(ni, nj, blocked_rects): cost = 1.414 if di != 0 and dj != 0 else 1.0 out.append((ni, nj, cost)) return out def _find_path_navgraph( sx: float, sy: float, tx: float, ty: float, points: list[tuple[float, float]], adj: dict[int, list[tuple[int, float]]], blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, bkey: int = 0, ) -> Optional[list[tuple[float, float]]]: """A* on nav graph with result caching. Returns waypoints or None.""" if not points or not adj: return None valid = _nav_valid_set(points, blocked_rects, bkey) start_i = _nearest_nav_index(sx, sy, points, valid) end_i = _nearest_nav_index(tx, ty, points, valid) if start_i == end_i: return [(tx, ty)] # Check path cache cache_key = (start_i, end_i, bkey) if cache_key in _path_cache: cached = _path_cache[cache_key] if cached is None: return None # Return copy with actual target endpoint result = list(cached) if result: result[-1] = (tx, ty) return result def heuristic(i: int) -> float: px, py = points[i] return ((tx - px) ** 2 + (ty - py) ** 2) ** 0.5 counter = 0 best_g: dict[int, float] = {start_i: 0.0} came_from: dict[int, Optional[int]] = {start_i: None} open_set: list[tuple[float, int, int]] = [] heapq.heappush(open_set, (heuristic(start_i), counter, start_i)) counter += 1 found = False while open_set: f, _, node_i = heapq.heappop(open_set) g = best_g.get(node_i, float("inf")) if f > g + heuristic(node_i) + 1e-9: continue if node_i == end_i: found = True break for j, cost in adj.get(node_i, []): if valid is not None and j not in valid: continue new_g = g + cost if new_g < best_g.get(j, float("inf")): best_g[j] = new_g came_from[j] = node_i heapq.heappush(open_set, (new_g + heuristic(j), counter, j)) counter += 1 if not found: _store_path_cache(cache_key, None) return None path_indices: list[int] = [] cur: Optional[int] = end_i while cur is not None: path_indices.append(cur) cur = came_from[cur] path_indices.reverse() result = [points[i] for i in path_indices] if result: result[-1] = (tx, ty) else: result = [(tx, ty)] _store_path_cache(cache_key, result) return result def _store_path_cache( key: tuple[int, int, int], path: Optional[list[tuple[float, float]]], ) -> None: if len(_path_cache) >= _PATH_CACHE_MAX: # Evict oldest quarter for k in list(_path_cache.keys())[:_PATH_CACHE_MAX // 4]: del _path_cache[k] _path_cache[key] = path def find_path( sx: float, sy: float, tx: float, ty: float, *, blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, ) -> Optional[list[tuple[float, float]]]: """ A* path from (sx,sy) to (tx,ty) in game coordinates. Uses cached nav-point graph when available; falls back to grid A*. """ nav = _load_nav_graph() if nav is not None: points, adj = nav bkey = _buildings_key(blocked_rects) path = _find_path_navgraph(sx, sy, tx, ty, points, adj, blocked_rects=blocked_rects, bkey=bkey) if path is not None: return path return _find_path_grid(sx, sy, tx, ty, blocked_rects=blocked_rects) def _find_path_grid( sx: float, sy: float, tx: float, ty: float, *, blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, ) -> Optional[list[tuple[float, float]]]: """Grid A* path (fallback when nav graph unavailable). Capped at _GRID_ASTAR_MAX_NODES.""" si, sj = _to_cell(sx, sy) ti, tj = _to_cell(tx, ty) if not _cell_walkable(si, sj, blocked_rects) or not _cell_walkable(ti, tj, blocked_rects): return None if si == ti and sj == tj: return [(tx, ty)] def heuristic(i: int, j: int) -> float: return ((ti - i) ** 2 + (tj - j) ** 2) ** 0.5 counter = 0 nodes_expanded = 0 open_set: list[tuple[float, int, int, int, float, Optional[tuple[int, int]]]] = [] heapq.heappush(open_set, (heuristic(si, sj), counter, si, sj, 0.0, None)) counter += 1 best_g: dict[tuple[int, int], float] = {(si, sj): 0.0} came_from: dict[tuple[int, int], Optional[tuple[int, int]]] = {(si, sj): None} while open_set and nodes_expanded < _GRID_ASTAR_MAX_NODES: f, _, ci, cj, g, _ = heapq.heappop(open_set) if g > best_g.get((ci, cj), float("inf")): continue nodes_expanded += 1 if ci == ti and cj == tj: cells: list[tuple[int, int]] = [] cur: Optional[tuple[int, int]] = (ci, cj) while cur is not None: cells.append(cur) cur = came_from[cur] cells.reverse() waypoints = [_cell_center(i, j) for i, j in cells[1:]] if waypoints: waypoints[-1] = (tx, ty) else: waypoints = [(tx, ty)] return waypoints for ni, nj, cost in _neighbors(ci, cj, blocked_rects): new_g = g + cost if new_g < best_g.get((ni, nj), float("inf")): best_g[(ni, nj)] = new_g came_from[(ni, nj)] = (ci, cj) f_new = new_g + heuristic(ni, nj) heapq.heappush(open_set, (f_new, counter, ni, nj, new_g, (ci, cj))) counter += 1 return None def nearest_walkable_navpoint( x: float, y: float, *, blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, ) -> tuple[float, float]: """Return the nearest nav point that is walkable (not inside any building).""" nav = _load_nav_graph() if nav is not None: points, _ = nav bkey = _buildings_key(blocked_rects) valid = _nav_valid_set(points, blocked_rects, bkey) best_i = _nearest_nav_index(x, y, points, valid) if best_i >= 0: return points[best_i] return snap_to_walkable(x, y, blocked_rects=blocked_rects) def snap_to_walkable( x: float, y: float, *, blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, ) -> tuple[float, float]: """Return nearest walkable point to (x,y).""" if is_walkable(x, y, blocked_rects=blocked_rects): return (x, y) best = (x, y) best_d = 1e9 for radius in [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0]: for dx in [-radius, -radius * 0.5, 0, radius * 0.5, radius]: for dy in [-radius, -radius * 0.5, 0, radius * 0.5, radius]: if dx == 0 and dy == 0: continue nx = max(0, min(MAP_WIDTH, x + dx)) ny = max(0, min(MAP_HEIGHT, y + dy)) if is_walkable(nx, ny, blocked_rects=blocked_rects): d = (nx - x) ** 2 + (ny - y) ** 2 if d < best_d: best_d = d best = (nx, ny) if best_d < 1e9: break return best def invalidate_path_cache() -> None: """Call when buildings change (placed or destroyed) to flush path and nav-valid caches.""" _path_cache.clear() _nav_valid_cache.clear()