Spaces:
Running
Running
| """ | |
| Pathfinding constrained to walkable area: one exterior polygon + holes (compiled_map.json), | |
| or fallback to walkable.json polygons. Supports dynamic obstacles (building footprints). | |
| Coordinates: sources use 0-100; game uses 0-MAP_WIDTH, 0-MAP_HEIGHT. | |
| """ | |
| from __future__ import annotations | |
| import heapq | |
| import json | |
| import math | |
| from pathlib import Path | |
| from typing import Optional | |
| from .map import MAP_HEIGHT, MAP_WIDTH | |
| SCALE_X = MAP_WIDTH / 100.0 | |
| SCALE_Y = MAP_HEIGHT / 100.0 | |
| _GRID_STEP = 0.5 | |
| _GRID_W = int(MAP_WIDTH / _GRID_STEP) + 1 | |
| _GRID_H = int(MAP_HEIGHT / _GRID_STEP) + 1 | |
| # Compiled map: (exterior_scaled, holes_scaled) in game coords, or None | |
| _compiled_cache: Optional[tuple[list[tuple[float, float]], list[list[tuple[float, float]]]]] = None | |
| # Legacy: list of polygons in game coords (when no compiled map) | |
| _polygons_cache: Optional[list[list[tuple[float, float]]]] = None | |
| # Nav graph: (points_list, adjacency_dict). points_list[i] = (x, y) in game coords; adjacency[i] = [(j, cost), ...] | |
| _nav_graph_cache: Optional[tuple[list[tuple[float, float]], dict[int, list[tuple[int, float]]]]] = None | |
| # step=2.0 → cardinal dist=2.0, diagonal dist≈2.83; use 2.9 to include diagonals | |
| _NAV_CONNECT_RADIUS = 2.9 | |
| # Clearance margin around buildings when filtering nav points (should match UNIT_RADIUS in engine) | |
| _NAV_BUILDING_MARGIN = 0.6 | |
| # --- Performance caches --- | |
| # Pre-computed static walkable grid (no dynamic obstacles): grid[ci][cj] = bool | |
| _walkable_grid: Optional[list[list[bool]]] = None | |
| # _nav_valid_set cache: buildings_key -> set[int] | |
| # buildings_key is a frozenset of rounded blocked_rects tuples — changes only when buildings change | |
| _nav_valid_cache: dict[int, set[int]] = {} | |
| _NAV_VALID_CACHE_MAX = 64 | |
| # Path result cache: (start_i, end_i, buildings_key) -> path | |
| _path_cache: dict[tuple[int, int, int], Optional[list[tuple[float, float]]]] = {} | |
| _PATH_CACHE_MAX = 4096 | |
| # Spatial bucket index for fast nearest-nav-point lookup | |
| _nav_buckets: Optional[dict[tuple[int, int], list[int]]] = None | |
| _NAV_BUCKET_SIZE = 4.0 # tiles per bucket side | |
| # Grid A* max node expansions (safety limit against very large searches) | |
| _GRID_ASTAR_MAX_NODES = 8000 | |
| def _load_compiled() -> Optional[tuple[list[tuple[float, float]], list[list[tuple[float, float]]]]]: | |
| global _compiled_cache | |
| if _compiled_cache is not None: | |
| return _compiled_cache | |
| path = Path(__file__).resolve().parent.parent / "static" / "compiled_map.json" | |
| if not path.exists(): | |
| return None | |
| try: | |
| with open(path, encoding="utf-8") as f: | |
| data = json.load(f) | |
| except (OSError, json.JSONDecodeError): | |
| return None | |
| exterior_raw = data.get("exterior", []) | |
| holes_raw = data.get("holes", []) | |
| if len(exterior_raw) < 3: | |
| return None | |
| exterior = [(float(p[0]) * SCALE_X, float(p[1]) * SCALE_Y) for p in exterior_raw] | |
| holes = [ | |
| [(float(p[0]) * SCALE_X, float(p[1]) * SCALE_Y) for p in h] | |
| for h in holes_raw if len(h) >= 3 | |
| ] | |
| _compiled_cache = (exterior, holes) | |
| return _compiled_cache | |
| def _load_nav_graph() -> Optional[tuple[list[tuple[float, float]], dict[int, list[tuple[int, float]]]]]: | |
| """Load nav_points from compiled_map.json and build adjacency graph. Returns (points, adj) or None.""" | |
| global _nav_graph_cache, _nav_buckets | |
| if _nav_graph_cache is not None: | |
| return _nav_graph_cache | |
| path = Path(__file__).resolve().parent.parent / "static" / "compiled_map.json" | |
| if not path.exists(): | |
| return None | |
| try: | |
| with open(path, encoding="utf-8") as f: | |
| data = json.load(f) | |
| except (OSError, json.JSONDecodeError): | |
| return None | |
| raw = data.get("nav_points", []) | |
| if len(raw) < 2: | |
| return None | |
| points: list[tuple[float, float]] = [(float(p[0]), float(p[1])) for p in raw] | |
| adj: dict[int, list[tuple[int, float]]] = {i: [] for i in range(len(points))} | |
| for i in range(len(points)): | |
| xi, yi = points[i] | |
| for j in range(i + 1, len(points)): | |
| xj, yj = points[j] | |
| d = ((xj - xi) ** 2 + (yj - yi) ** 2) ** 0.5 | |
| if d <= _NAV_CONNECT_RADIUS and d > 0: | |
| adj[i].append((j, d)) | |
| adj[j].append((i, d)) | |
| _nav_graph_cache = (points, adj) | |
| # Build spatial bucket index | |
| buckets: dict[tuple[int, int], list[int]] = {} | |
| for i, (x, y) in enumerate(points): | |
| bk = (int(x / _NAV_BUCKET_SIZE), int(y / _NAV_BUCKET_SIZE)) | |
| buckets.setdefault(bk, []).append(i) | |
| _nav_buckets = buckets | |
| return _nav_graph_cache | |
| def _buildings_key(blocked_rects: Optional[list[tuple[float, float, float, float]]]) -> int: | |
| """Stable hash of building rects, used as cache key. Returns 0 when no buildings.""" | |
| if not blocked_rects: | |
| return 0 | |
| return hash(tuple( | |
| (round(r[0], 2), round(r[1], 2), round(r[2], 2), round(r[3], 2)) | |
| for r in sorted(blocked_rects) | |
| )) | |
| def _nav_point_blocked( | |
| x: float, | |
| y: float, | |
| blocked_rects: list[tuple[float, float, float, float]], | |
| ) -> bool: | |
| """True if nav point (x, y) is inside or too close to any building rect.""" | |
| for rx, ry, w, h in blocked_rects: | |
| cx = max(rx, min(rx + w, x)) | |
| cy = max(ry, min(ry + h, y)) | |
| if math.hypot(x - cx, y - cy) < _NAV_BUILDING_MARGIN: | |
| return True | |
| return False | |
| def _nav_valid_set( | |
| points: list[tuple[float, float]], | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]], | |
| bkey: int = 0, | |
| ) -> Optional[set[int]]: | |
| """Return set of nav point indices not blocked by buildings, with caching.""" | |
| if not blocked_rects: | |
| return None # all valid | |
| cached = _nav_valid_cache.get(bkey) | |
| if cached is not None: | |
| return cached | |
| result = { | |
| i for i, (px, py) in enumerate(points) | |
| if not _nav_point_blocked(px, py, blocked_rects) | |
| } | |
| if len(_nav_valid_cache) >= _NAV_VALID_CACHE_MAX: | |
| # Evict oldest half | |
| for k in list(_nav_valid_cache.keys())[:_NAV_VALID_CACHE_MAX // 2]: | |
| del _nav_valid_cache[k] | |
| _nav_valid_cache[bkey] = result | |
| return result | |
| def _nearest_nav_index( | |
| x: float, | |
| y: float, | |
| points: list[tuple[float, float]], | |
| valid: Optional[set[int]] = None, | |
| ) -> int: | |
| """Return index of nav point closest to (x, y) using spatial bucket index.""" | |
| if _nav_buckets is not None: | |
| bx0 = int(x / _NAV_BUCKET_SIZE) | |
| by0 = int(y / _NAV_BUCKET_SIZE) | |
| best_i = -1 | |
| best_d2 = float("inf") | |
| for radius in range(6): | |
| # Search the ring at manhattan distance `radius` in bucket space | |
| for dbx in range(-radius, radius + 1): | |
| for dby in range(-radius, radius + 1): | |
| if abs(dbx) != radius and abs(dby) != radius: | |
| continue # inner cells already covered | |
| for i in _nav_buckets.get((bx0 + dbx, by0 + dby), ()): | |
| if valid is not None and i not in valid: | |
| continue | |
| px, py = points[i] | |
| d2 = (px - x) ** 2 + (py - y) ** 2 | |
| if d2 < best_d2: | |
| best_d2 = d2 | |
| best_i = i | |
| # Once we found a candidate, stop if the next ring is certainly farther | |
| if best_i >= 0: | |
| next_ring_min_dist2 = ((radius) * _NAV_BUCKET_SIZE) ** 2 | |
| if best_d2 <= next_ring_min_dist2: | |
| break | |
| if best_i >= 0: | |
| return best_i | |
| # Fallback: full linear scan | |
| best_i = -1 | |
| best_d2 = float("inf") | |
| for i, (px, py) in enumerate(points): | |
| if valid is not None and i not in valid: | |
| continue | |
| d2 = (px - x) ** 2 + (py - y) ** 2 | |
| if d2 < best_d2: | |
| best_d2 = d2 | |
| best_i = i | |
| if best_i == -1: | |
| # Ignore valid constraint as last resort | |
| for i, (px, py) in enumerate(points): | |
| d2 = (px - x) ** 2 + (py - y) ** 2 | |
| if d2 < best_d2: | |
| best_d2 = d2 | |
| best_i = i | |
| return max(best_i, 0) | |
| def _load_polygons() -> list[list[tuple[float, float]]]: | |
| global _polygons_cache | |
| if _polygons_cache is not None: | |
| return _polygons_cache | |
| path = Path(__file__).resolve().parent.parent / "static" / "walkable.json" | |
| if not path.exists(): | |
| _polygons_cache = [] | |
| return _polygons_cache | |
| with open(path, encoding="utf-8") as f: | |
| data = json.load(f) | |
| raw = data.get("polygons", []) | |
| if not raw and data.get("polygon"): | |
| raw = [data["polygon"]] | |
| result: list[list[tuple[float, float]]] = [] | |
| for poly in raw: | |
| if len(poly) < 3: | |
| continue | |
| scaled = [ | |
| (float(pt[0]) * SCALE_X, float(pt[1]) * SCALE_Y) | |
| for pt in poly | |
| ] | |
| result.append(scaled) | |
| _polygons_cache = result | |
| return result | |
| def _point_in_polygon(x: float, y: float, polygon: list[tuple[float, float]]) -> bool: | |
| n = len(polygon) | |
| inside = False | |
| j = n - 1 | |
| for i in range(n): | |
| xi, yi = polygon[i] | |
| xj, yj = polygon[j] | |
| if ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi): | |
| inside = not inside | |
| j = i | |
| return inside | |
| def _point_in_rect(x: float, y: float, rx: float, ry: float, w: float, h: float) -> bool: | |
| return rx <= x < rx + w and ry <= y < ry + h | |
| def _static_walkable(x: float, y: float) -> bool: | |
| """True if (x,y) is in any walkable polygon.""" | |
| if x < 0 or x > MAP_WIDTH or y < 0 or y > MAP_HEIGHT: | |
| return False | |
| polygons = _load_polygons() | |
| if not polygons: | |
| return True | |
| for poly in polygons: | |
| if _point_in_polygon(x, y, poly): | |
| return True | |
| return False | |
| def _get_walkable_grid() -> list[list[bool]]: | |
| """Pre-computed static walkable grid. Computed once, shared across all games.""" | |
| global _walkable_grid | |
| if _walkable_grid is not None: | |
| return _walkable_grid | |
| grid = [[False] * _GRID_H for _ in range(_GRID_W)] | |
| for ci in range(_GRID_W): | |
| x = ci * _GRID_STEP | |
| for cj in range(_GRID_H): | |
| grid[ci][cj] = _static_walkable(x, cj * _GRID_STEP) | |
| _walkable_grid = grid | |
| return grid | |
| def is_walkable( | |
| x: float, | |
| y: float, | |
| *, | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| ) -> bool: | |
| """ | |
| True if (x, y) is walkable (inside walkable area, not in any blocked rect). | |
| Uses pre-computed grid for the static part. | |
| """ | |
| ci = max(0, min(_GRID_W - 1, int(round(x / _GRID_STEP)))) | |
| cj = max(0, min(_GRID_H - 1, int(round(y / _GRID_STEP)))) | |
| if not _get_walkable_grid()[ci][cj]: | |
| return False | |
| if blocked_rects: | |
| for rx, ry, w, h in blocked_rects: | |
| if _point_in_rect(x, y, rx, ry, w, h): | |
| return False | |
| return True | |
| def _cell_center(ci: int, cj: int) -> tuple[float, float]: | |
| return (ci * _GRID_STEP, cj * _GRID_STEP) | |
| def _to_cell(x: float, y: float) -> tuple[int, int]: | |
| ci = int(round(x / _GRID_STEP)) | |
| cj = int(round(y / _GRID_STEP)) | |
| ci = max(0, min(_GRID_W - 1, ci)) | |
| cj = max(0, min(_GRID_H - 1, cj)) | |
| return (ci, cj) | |
| def _cell_walkable( | |
| ci: int, | |
| cj: int, | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| ) -> bool: | |
| """Fast walkable check: uses pre-computed grid + inline building rect test.""" | |
| if not _get_walkable_grid()[ci][cj]: | |
| return False | |
| if blocked_rects: | |
| cx = ci * _GRID_STEP | |
| cy = cj * _GRID_STEP | |
| for rx, ry, w, h in blocked_rects: | |
| if rx <= cx < rx + w and ry <= cy < ry + h: | |
| return False | |
| return True | |
| def _neighbors( | |
| ci: int, | |
| cj: int, | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| ) -> list[tuple[int, int, float]]: | |
| out: list[tuple[int, int, float]] = [] | |
| for di in (-1, 0, 1): | |
| for dj in (-1, 0, 1): | |
| if di == 0 and dj == 0: | |
| continue | |
| ni, nj = ci + di, cj + dj | |
| if 0 <= ni < _GRID_W and 0 <= nj < _GRID_H and _cell_walkable(ni, nj, blocked_rects): | |
| cost = 1.414 if di != 0 and dj != 0 else 1.0 | |
| out.append((ni, nj, cost)) | |
| return out | |
| def _find_path_navgraph( | |
| sx: float, sy: float, tx: float, ty: float, | |
| points: list[tuple[float, float]], | |
| adj: dict[int, list[tuple[int, float]]], | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| bkey: int = 0, | |
| ) -> Optional[list[tuple[float, float]]]: | |
| """A* on nav graph with result caching. Returns waypoints or None.""" | |
| if not points or not adj: | |
| return None | |
| valid = _nav_valid_set(points, blocked_rects, bkey) | |
| start_i = _nearest_nav_index(sx, sy, points, valid) | |
| end_i = _nearest_nav_index(tx, ty, points, valid) | |
| if start_i == end_i: | |
| return [(tx, ty)] | |
| # Check path cache | |
| cache_key = (start_i, end_i, bkey) | |
| if cache_key in _path_cache: | |
| cached = _path_cache[cache_key] | |
| if cached is None: | |
| return None | |
| # Return copy with actual target endpoint | |
| result = list(cached) | |
| if result: | |
| result[-1] = (tx, ty) | |
| return result | |
| def heuristic(i: int) -> float: | |
| px, py = points[i] | |
| return ((tx - px) ** 2 + (ty - py) ** 2) ** 0.5 | |
| counter = 0 | |
| best_g: dict[int, float] = {start_i: 0.0} | |
| came_from: dict[int, Optional[int]] = {start_i: None} | |
| open_set: list[tuple[float, int, int]] = [] | |
| heapq.heappush(open_set, (heuristic(start_i), counter, start_i)) | |
| counter += 1 | |
| found = False | |
| while open_set: | |
| f, _, node_i = heapq.heappop(open_set) | |
| g = best_g.get(node_i, float("inf")) | |
| if f > g + heuristic(node_i) + 1e-9: | |
| continue | |
| if node_i == end_i: | |
| found = True | |
| break | |
| for j, cost in adj.get(node_i, []): | |
| if valid is not None and j not in valid: | |
| continue | |
| new_g = g + cost | |
| if new_g < best_g.get(j, float("inf")): | |
| best_g[j] = new_g | |
| came_from[j] = node_i | |
| heapq.heappush(open_set, (new_g + heuristic(j), counter, j)) | |
| counter += 1 | |
| if not found: | |
| _store_path_cache(cache_key, None) | |
| return None | |
| path_indices: list[int] = [] | |
| cur: Optional[int] = end_i | |
| while cur is not None: | |
| path_indices.append(cur) | |
| cur = came_from[cur] | |
| path_indices.reverse() | |
| result = [points[i] for i in path_indices] | |
| if result: | |
| result[-1] = (tx, ty) | |
| else: | |
| result = [(tx, ty)] | |
| _store_path_cache(cache_key, result) | |
| return result | |
| def _store_path_cache( | |
| key: tuple[int, int, int], | |
| path: Optional[list[tuple[float, float]]], | |
| ) -> None: | |
| if len(_path_cache) >= _PATH_CACHE_MAX: | |
| # Evict oldest quarter | |
| for k in list(_path_cache.keys())[:_PATH_CACHE_MAX // 4]: | |
| del _path_cache[k] | |
| _path_cache[key] = path | |
| def find_path( | |
| sx: float, | |
| sy: float, | |
| tx: float, | |
| ty: float, | |
| *, | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| ) -> Optional[list[tuple[float, float]]]: | |
| """ | |
| A* path from (sx,sy) to (tx,ty) in game coordinates. | |
| Uses cached nav-point graph when available; falls back to grid A*. | |
| """ | |
| nav = _load_nav_graph() | |
| if nav is not None: | |
| points, adj = nav | |
| bkey = _buildings_key(blocked_rects) | |
| path = _find_path_navgraph(sx, sy, tx, ty, points, adj, | |
| blocked_rects=blocked_rects, bkey=bkey) | |
| if path is not None: | |
| return path | |
| return _find_path_grid(sx, sy, tx, ty, blocked_rects=blocked_rects) | |
| def _find_path_grid( | |
| sx: float, | |
| sy: float, | |
| tx: float, | |
| ty: float, | |
| *, | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| ) -> Optional[list[tuple[float, float]]]: | |
| """Grid A* path (fallback when nav graph unavailable). Capped at _GRID_ASTAR_MAX_NODES.""" | |
| si, sj = _to_cell(sx, sy) | |
| ti, tj = _to_cell(tx, ty) | |
| if not _cell_walkable(si, sj, blocked_rects) or not _cell_walkable(ti, tj, blocked_rects): | |
| return None | |
| if si == ti and sj == tj: | |
| return [(tx, ty)] | |
| def heuristic(i: int, j: int) -> float: | |
| return ((ti - i) ** 2 + (tj - j) ** 2) ** 0.5 | |
| counter = 0 | |
| nodes_expanded = 0 | |
| open_set: list[tuple[float, int, int, int, float, Optional[tuple[int, int]]]] = [] | |
| heapq.heappush(open_set, (heuristic(si, sj), counter, si, sj, 0.0, None)) | |
| counter += 1 | |
| best_g: dict[tuple[int, int], float] = {(si, sj): 0.0} | |
| came_from: dict[tuple[int, int], Optional[tuple[int, int]]] = {(si, sj): None} | |
| while open_set and nodes_expanded < _GRID_ASTAR_MAX_NODES: | |
| f, _, ci, cj, g, _ = heapq.heappop(open_set) | |
| if g > best_g.get((ci, cj), float("inf")): | |
| continue | |
| nodes_expanded += 1 | |
| if ci == ti and cj == tj: | |
| cells: list[tuple[int, int]] = [] | |
| cur: Optional[tuple[int, int]] = (ci, cj) | |
| while cur is not None: | |
| cells.append(cur) | |
| cur = came_from[cur] | |
| cells.reverse() | |
| waypoints = [_cell_center(i, j) for i, j in cells[1:]] | |
| if waypoints: | |
| waypoints[-1] = (tx, ty) | |
| else: | |
| waypoints = [(tx, ty)] | |
| return waypoints | |
| for ni, nj, cost in _neighbors(ci, cj, blocked_rects): | |
| new_g = g + cost | |
| if new_g < best_g.get((ni, nj), float("inf")): | |
| best_g[(ni, nj)] = new_g | |
| came_from[(ni, nj)] = (ci, cj) | |
| f_new = new_g + heuristic(ni, nj) | |
| heapq.heappush(open_set, (f_new, counter, ni, nj, new_g, (ci, cj))) | |
| counter += 1 | |
| return None | |
| def nearest_walkable_navpoint( | |
| x: float, | |
| y: float, | |
| *, | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| ) -> tuple[float, float]: | |
| """Return the nearest nav point that is walkable (not inside any building).""" | |
| nav = _load_nav_graph() | |
| if nav is not None: | |
| points, _ = nav | |
| bkey = _buildings_key(blocked_rects) | |
| valid = _nav_valid_set(points, blocked_rects, bkey) | |
| best_i = _nearest_nav_index(x, y, points, valid) | |
| if best_i >= 0: | |
| return points[best_i] | |
| return snap_to_walkable(x, y, blocked_rects=blocked_rects) | |
| def snap_to_walkable( | |
| x: float, | |
| y: float, | |
| *, | |
| blocked_rects: Optional[list[tuple[float, float, float, float]]] = None, | |
| ) -> tuple[float, float]: | |
| """Return nearest walkable point to (x,y).""" | |
| if is_walkable(x, y, blocked_rects=blocked_rects): | |
| return (x, y) | |
| best = (x, y) | |
| best_d = 1e9 | |
| for radius in [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0]: | |
| for dx in [-radius, -radius * 0.5, 0, radius * 0.5, radius]: | |
| for dy in [-radius, -radius * 0.5, 0, radius * 0.5, radius]: | |
| if dx == 0 and dy == 0: | |
| continue | |
| nx = max(0, min(MAP_WIDTH, x + dx)) | |
| ny = max(0, min(MAP_HEIGHT, y + dy)) | |
| if is_walkable(nx, ny, blocked_rects=blocked_rects): | |
| d = (nx - x) ** 2 + (ny - y) ** 2 | |
| if d < best_d: | |
| best_d = d | |
| best = (nx, ny) | |
| if best_d < 1e9: | |
| break | |
| return best | |
| def invalidate_path_cache() -> None: | |
| """Call when buildings change (placed or destroyed) to flush path and nav-valid caches.""" | |
| _path_cache.clear() | |
| _nav_valid_cache.clear() | |