File size: 6,793 Bytes
b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 cc298f9 b443816 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | """Sample candidate placements along the mainland shore and the Kronshtadt outline.
Two physical sources:
- Mainland coast — `load_shoreline()` (curated north + south mainland LineStrings).
- Kronshtadt outline — `load_kronshtadt_outline()` (extracted from zone outer rings
inside the Kronshtadt bbox; includes adjacent КЗС causeway segments).
`sample_shore_candidates` returns the union of both. Other water-boundary points
(small islands, the dam outside Kronshtadt) are intentionally excluded — placing a
rescue station there is not physically meaningful.
"""
from typing import Sequence
import numpy as np
from scipy.sparse import csr_matrix
from shapely.geometry import LineString, MultiLineString
from shapely.ops import transform
from ..data import load_kronshtadt_outline, load_shoreline
from ..grid import METERS_PER_DEG_LAT, METERS_PER_DEG_LON
from .placement import PlacementSet, attach_travel_times
def _to_meters(geom):
return transform(
lambda lon, lat, z=None: (lon * METERS_PER_DEG_LON, lat * METERS_PER_DEG_LAT),
geom,
)
def _from_meters(x: float, y: float) -> tuple[float, float]:
return y / METERS_PER_DEG_LAT, x / METERS_PER_DEG_LON
def _iter_linestrings(geom) -> list[LineString]:
if isinstance(geom, LineString):
return [geom]
if isinstance(geom, MultiLineString):
return list(geom.geoms)
if hasattr(geom, "geoms"):
out = []
for sub in geom.geoms:
out.extend(_iter_linestrings(sub))
return out
raise TypeError(f"unsupported boundary geometry: {type(geom).__name__}")
def _sample_along(
geom,
step_m: float,
min_segment_m: float = 200.0,
) -> tuple[np.ndarray, np.ndarray]:
"""Walk every linestring in `geom` (lon/lat) and emit a point every step_m meters."""
if step_m <= 0:
raise ValueError("step_m must be positive")
geom_m = _to_meters(geom)
lats: list[float] = []
lons: list[float] = []
for line in _iter_linestrings(geom_m):
L = line.length
if L < min_segment_m:
continue
n_steps = max(1, int(np.floor(L / step_m)))
for k in range(n_steps):
pt = line.interpolate(k * step_m)
la, lo = _from_meters(pt.x, pt.y)
lats.append(la)
lons.append(lo)
return np.asarray(lats, dtype=np.float64), np.asarray(lons, dtype=np.float64)
def sample_mainland_points(step_m: float = 300.0) -> tuple[np.ndarray, np.ndarray]:
"""Sample the mainland (`shoreline.geojson`) at constant arclength."""
return _sample_along(load_shoreline(), step_m=step_m)
def sample_kronshtadt_points(step_m: float = 300.0) -> tuple[np.ndarray, np.ndarray]:
"""Sample the Kronshtadt outline (extracted from zone outer rings)."""
return _sample_along(load_kronshtadt_outline(), step_m=step_m)
def _build(
*,
lat: np.ndarray,
lon: np.ndarray,
speed_kmh: float,
label_prefix: str,
graph: csr_matrix,
grid_lats: np.ndarray,
grid_lons: np.ndarray,
exclude_grid_indices: Sequence[int],
) -> PlacementSet:
speed = np.full(len(lat), float(speed_kmh), dtype=np.float64)
labels = [f"{label_prefix}_{i:04d}" for i in range(len(lat))]
placements = attach_travel_times(
lat=lat, lon=lon, speed_kmh=speed, labels=labels,
graph=graph, grid_lats=grid_lats, grid_lons=grid_lons,
)
if len(exclude_grid_indices):
excl = {int(i) for i in exclude_grid_indices}
keep = np.array([int(i) not in excl for i in placements.grid_index], dtype=bool)
placements = _select(placements, keep)
# Dedupe candidates that snapped to the same grid cell — keep first
_, first_idx = np.unique(placements.grid_index, return_index=True)
first_idx = np.sort(first_idx)
if len(first_idx) != placements.K:
placements = _select_by_index(placements, first_idx)
return placements
def _select(p: PlacementSet, mask: np.ndarray) -> PlacementSet:
return PlacementSet(
lat=p.lat[mask], lon=p.lon[mask], speed_kmh=p.speed_kmh[mask],
grid_index=p.grid_index[mask], travel_times=p.travel_times[mask],
labels=[lbl for lbl, k in zip(p.labels, mask) if k],
)
def _select_by_index(p: PlacementSet, idx: np.ndarray) -> PlacementSet:
return PlacementSet(
lat=p.lat[idx], lon=p.lon[idx], speed_kmh=p.speed_kmh[idx],
grid_index=p.grid_index[idx], travel_times=p.travel_times[idx],
labels=[p.labels[int(i)] for i in idx],
)
def _concat(a: PlacementSet, b: PlacementSet) -> PlacementSet:
return PlacementSet(
lat=np.concatenate([a.lat, b.lat]),
lon=np.concatenate([a.lon, b.lon]),
speed_kmh=np.concatenate([a.speed_kmh, b.speed_kmh]),
grid_index=np.concatenate([a.grid_index, b.grid_index]),
travel_times=np.concatenate([a.travel_times, b.travel_times], axis=0),
labels=list(a.labels) + list(b.labels),
)
def sample_mainland_candidates(
*,
step_m: float = 300.0,
speed_kmh: float = 40.0,
graph: csr_matrix,
grid_lats: np.ndarray,
grid_lons: np.ndarray,
exclude_grid_indices: Sequence[int] = (),
) -> PlacementSet:
lat, lon = sample_mainland_points(step_m=step_m)
return _build(
lat=lat, lon=lon, speed_kmh=speed_kmh, label_prefix="main",
graph=graph, grid_lats=grid_lats, grid_lons=grid_lons,
exclude_grid_indices=exclude_grid_indices,
)
def sample_kronshtadt_candidates(
*,
step_m: float = 300.0,
speed_kmh: float = 40.0,
graph: csr_matrix,
grid_lats: np.ndarray,
grid_lons: np.ndarray,
exclude_grid_indices: Sequence[int] = (),
) -> PlacementSet:
lat, lon = sample_kronshtadt_points(step_m=step_m)
return _build(
lat=lat, lon=lon, speed_kmh=speed_kmh, label_prefix="kron",
graph=graph, grid_lats=grid_lats, grid_lons=grid_lons,
exclude_grid_indices=exclude_grid_indices,
)
def sample_shore_candidates(
*,
step_m: float = 300.0,
speed_kmh: float = 40.0,
graph: csr_matrix,
grid_lats: np.ndarray,
grid_lons: np.ndarray,
exclude_grid_indices: Sequence[int] = (),
) -> PlacementSet:
"""Mainland coast + Kronshtadt outline, both at the same `step_m`."""
mainland = sample_mainland_candidates(
step_m=step_m, speed_kmh=speed_kmh,
graph=graph, grid_lats=grid_lats, grid_lons=grid_lons,
exclude_grid_indices=exclude_grid_indices,
)
kron = sample_kronshtadt_candidates(
step_m=step_m, speed_kmh=speed_kmh,
graph=graph, grid_lats=grid_lats, grid_lons=grid_lons,
exclude_grid_indices=tuple(int(i) for i in exclude_grid_indices)
+ tuple(int(i) for i in mainland.grid_index),
)
return _concat(mainland, kron)
|