| """Sample candidate placements along the mainland shore and the Kronshtadt outline. |
| |
| Two physical sources: |
| - Mainland coast — `load_shoreline()` (curated north + south mainland LineStrings). |
| - Kronshtadt outline — `load_kronshtadt_outline()` (extracted from zone outer rings |
| inside the Kronshtadt bbox; includes adjacent КЗС (St. Petersburg flood-protection dam complex) causeway segments). |
| |
| `sample_shore_candidates` returns the union of both. Other water-boundary points |
| (small islands, the dam outside Kronshtadt) are intentionally excluded — placing a |
| rescue station there is not physically meaningful. |
| """ |
|
|
| from typing import Sequence |
|
|
| import numpy as np |
| from scipy.sparse import csr_matrix |
| from shapely.geometry import LineString, MultiLineString |
| from shapely.ops import transform |
|
|
| from ..data import load_kronshtadt_outline, load_shoreline |
| from ..grid import METERS_PER_DEG_LAT, METERS_PER_DEG_LON |
| from .placement import PlacementSet, attach_travel_times |
|
|
|
|
def _to_meters(geom):
    """Scale a lon/lat geometry into meter units.

    Longitudes are multiplied by `METERS_PER_DEG_LON` and latitudes by
    `METERS_PER_DEG_LAT`. The mapping accepts an optional z value but
    ignores it: only an (x, y) pair is returned per coordinate.
    """

    def _scale(lon, lat, z=None):
        x_m = lon * METERS_PER_DEG_LON
        y_m = lat * METERS_PER_DEG_LAT
        return (x_m, y_m)

    return transform(_scale, geom)
|
|
|
|
def _from_meters(x: float, y: float) -> tuple[float, float]:
    """Convert scaled meter coordinates back to degrees.

    Takes `(x, y)` in meters and returns `(lat, lon)` — note the swapped
    order relative to the arguments.
    """
    lat = y / METERS_PER_DEG_LAT
    lon = x / METERS_PER_DEG_LON
    return lat, lon
|
|
|
|
def _iter_linestrings(geom) -> list[LineString]:
    """Flatten `geom` into a list of its LineStrings.

    A LineString yields itself, a MultiLineString yields its parts, and any
    other object exposing `.geoms` is flattened recursively.

    Raises:
        TypeError: If `geom` is neither a LineString nor a collection
            exposing `.geoms`.
    """
    if isinstance(geom, LineString):
        return [geom]
    if isinstance(geom, MultiLineString):
        return list(geom.geoms)
    if not hasattr(geom, "geoms"):
        raise TypeError(f"unsupported boundary geometry: {type(geom).__name__}")
    # Generic collection: recurse into each member and concatenate in order.
    return [line for part in geom.geoms for line in _iter_linestrings(part)]
|
|
|
|
def _sample_along(
    geom,
    step_m: float,
    min_segment_m: float = 200.0,
) -> tuple[np.ndarray, np.ndarray]:
    """Sample points at a fixed arclength spacing along every line in `geom`.

    `geom` (lon/lat) is scaled to meters, each linestring is walked from its
    start, and every sampled point is scaled back to degrees.

    Args:
        geom: Boundary geometry accepted by `_iter_linestrings`.
        step_m: Spacing between consecutive samples, in meters.
        min_segment_m: Linestrings shorter than this (in meters) are skipped.

    Returns:
        `(lat, lon)` float64 arrays with one entry per sampled point.

    Raises:
        ValueError: If `step_m` is zero or negative.
    """
    if step_m <= 0:
        raise ValueError("step_m must be positive")

    samples: list[tuple[float, float]] = []
    for segment in _iter_linestrings(_to_meters(geom)):
        length_m = segment.length
        if length_m < min_segment_m:
            continue
        # Every kept segment contributes at least one sample (its start).
        # NOTE(review): offsets run 0, step_m, ..., (count - 1) * step_m, so
        # the offset count * step_m (which still lies on the line) is never
        # emitted — confirm this is intended for open shorelines.
        count = max(1, int(np.floor(length_m / step_m)))
        for k in range(count):
            point = segment.interpolate(k * step_m)
            samples.append(_from_meters(point.x, point.y))

    lat_values = [lat for lat, _ in samples]
    lon_values = [lon for _, lon in samples]
    return (
        np.asarray(lat_values, dtype=np.float64),
        np.asarray(lon_values, dtype=np.float64),
    )
|
|
|
|
def sample_mainland_points(step_m: float = 300.0) -> tuple[np.ndarray, np.ndarray]:
    """Return `(lat, lon)` samples along the mainland shore (`shoreline.geojson`).

    Points are spaced `step_m` meters apart along the geometry returned by
    `load_shoreline()`.
    """
    shoreline = load_shoreline()
    return _sample_along(shoreline, step_m=step_m)
|
|
|
|
def sample_kronshtadt_points(step_m: float = 300.0) -> tuple[np.ndarray, np.ndarray]:
    """Return `(lat, lon)` samples along the Kronshtadt outline.

    Points are spaced `step_m` meters apart along the geometry returned by
    `load_kronshtadt_outline()` (extracted from zone outer rings).
    """
    outline = load_kronshtadt_outline()
    return _sample_along(outline, step_m=step_m)
|
|
|
|
def _build(
    *,
    lat: np.ndarray,
    lon: np.ndarray,
    speed_kmh: float,
    label_prefix: str,
    graph: csr_matrix,
    grid_lats: np.ndarray,
    grid_lons: np.ndarray,
    exclude_grid_indices: Sequence[int],
) -> PlacementSet:
    """Turn sampled points into a filtered, de-duplicated PlacementSet.

    Every point gets the same speed and a label `<label_prefix>_<NNNN>`
    numbered by its position in `lat`. `attach_travel_times` then builds the
    placement set, after which two filters are applied in order:

    1. placements whose `grid_index` appears in `exclude_grid_indices` are
       dropped;
    2. where several placements share a `grid_index`, only the first one
       (in sampling order) is kept.

    Args:
        lat: Latitudes of the sampled points.
        lon: Longitudes of the sampled points.
        speed_kmh: Speed assigned to every placement.
        label_prefix: Prefix for the generated labels.
        graph: Passed through to `attach_travel_times`.
        grid_lats: Passed through to `attach_travel_times`.
        grid_lons: Passed through to `attach_travel_times`.
        exclude_grid_indices: Grid indices that must not appear in the result.

    Returns:
        The filtered PlacementSet; labels keep their original numbering, so
        gaps appear wherever placements were removed.
    """
    speed = np.full(len(lat), float(speed_kmh), dtype=np.float64)
    labels = [f"{label_prefix}_{i:04d}" for i in range(len(lat))]
    placements = attach_travel_times(
        lat=lat, lon=lon, speed_kmh=speed, labels=labels,
        graph=graph, grid_lats=grid_lats, grid_lons=grid_lons,
    )

    # `len()` rather than truthiness so that NumPy arrays are accepted too.
    if len(exclude_grid_indices):
        excluded = np.fromiter(
            (int(i) for i in exclude_grid_indices), dtype=np.int64,
        )
        # Vectorized membership test instead of a per-element Python loop.
        # Assumes grid_index holds integer values — TODO confirm.
        keep = np.isin(placements.grid_index, excluded, invert=True)
        placements = _select(placements, keep)

    # np.unique reports the first occurrence of each grid index; sorting
    # those positions restores the original sampling order.
    _, first_idx = np.unique(placements.grid_index, return_index=True)
    first_idx = np.sort(first_idx)
    if len(first_idx) != placements.K:
        placements = _select_by_index(placements, first_idx)
    return placements
|
|
|
|
def _select(p: PlacementSet, mask: np.ndarray) -> PlacementSet:
    """Return a new PlacementSet with only the entries where `mask` is true."""
    kept_labels = [label for label, keep in zip(p.labels, mask) if keep]
    return PlacementSet(
        lat=p.lat[mask],
        lon=p.lon[mask],
        speed_kmh=p.speed_kmh[mask],
        grid_index=p.grid_index[mask],
        travel_times=p.travel_times[mask],
        labels=kept_labels,
    )
|
|
|
|
def _select_by_index(p: PlacementSet, idx: np.ndarray) -> PlacementSet:
    """Return a new PlacementSet with the entries at positions `idx`, in that order."""
    array_fields = ("lat", "lon", "speed_kmh", "grid_index", "travel_times")
    picked = {name: getattr(p, name)[idx] for name in array_fields}
    # Labels live in a plain list, so they are picked one position at a time.
    picked_labels = [p.labels[int(pos)] for pos in idx]
    return PlacementSet(**picked, labels=picked_labels)
|
|
|
|
def _concat(a: PlacementSet, b: PlacementSet) -> PlacementSet:
    """Join two PlacementSets into one, with `a`'s entries before `b`'s."""

    def _join(field: str) -> np.ndarray:
        # Concatenation is along the first axis for every field.
        return np.concatenate([getattr(a, field), getattr(b, field)], axis=0)

    return PlacementSet(
        lat=_join("lat"),
        lon=_join("lon"),
        speed_kmh=_join("speed_kmh"),
        grid_index=_join("grid_index"),
        travel_times=_join("travel_times"),
        labels=[*a.labels, *b.labels],
    )
|
|
|
|
def sample_mainland_candidates(
    *,
    step_m: float = 300.0,
    speed_kmh: float = 40.0,
    graph: csr_matrix,
    grid_lats: np.ndarray,
    grid_lons: np.ndarray,
    exclude_grid_indices: Sequence[int] = (),
) -> PlacementSet:
    """Build placement candidates along the mainland shore.

    Samples the mainland every `step_m` meters and hands the points to
    `_build` with the label prefix "main"; all other arguments are passed
    through unchanged.
    """
    sampled_lat, sampled_lon = sample_mainland_points(step_m=step_m)
    return _build(
        lat=sampled_lat,
        lon=sampled_lon,
        speed_kmh=speed_kmh,
        label_prefix="main",
        graph=graph,
        grid_lats=grid_lats,
        grid_lons=grid_lons,
        exclude_grid_indices=exclude_grid_indices,
    )
|
|
|
|
def sample_kronshtadt_candidates(
    *,
    step_m: float = 300.0,
    speed_kmh: float = 40.0,
    graph: csr_matrix,
    grid_lats: np.ndarray,
    grid_lons: np.ndarray,
    exclude_grid_indices: Sequence[int] = (),
) -> PlacementSet:
    """Build placement candidates along the Kronshtadt outline.

    Samples the outline every `step_m` meters and hands the points to
    `_build` with the label prefix "kron"; all other arguments are passed
    through unchanged.
    """
    sampled_lat, sampled_lon = sample_kronshtadt_points(step_m=step_m)
    return _build(
        lat=sampled_lat,
        lon=sampled_lon,
        speed_kmh=speed_kmh,
        label_prefix="kron",
        graph=graph,
        grid_lats=grid_lats,
        grid_lons=grid_lons,
        exclude_grid_indices=exclude_grid_indices,
    )
|
|
|
|
def sample_shore_candidates(
    *,
    step_m: float = 300.0,
    speed_kmh: float = 40.0,
    graph: csr_matrix,
    grid_lats: np.ndarray,
    grid_lons: np.ndarray,
    exclude_grid_indices: Sequence[int] = (),
) -> PlacementSet:
    """Return mainland and Kronshtadt candidates combined, both at `step_m`.

    Mainland candidates are built first. The Kronshtadt candidates are then
    built with the mainland's grid indices added to the exclusion list, so a
    grid index already taken by the mainland is not reused. The result lists
    mainland placements before Kronshtadt ones.
    """
    shared = dict(
        step_m=step_m,
        speed_kmh=speed_kmh,
        graph=graph,
        grid_lats=grid_lats,
        grid_lons=grid_lons,
    )
    mainland = sample_mainland_candidates(
        **shared, exclude_grid_indices=exclude_grid_indices,
    )

    # Caller-supplied exclusions first, then every grid index the mainland used.
    taken = [int(i) for i in exclude_grid_indices]
    taken.extend(int(i) for i in mainland.grid_index)
    kron = sample_kronshtadt_candidates(
        **shared, exclude_grid_indices=tuple(taken),
    )
    return _concat(mainland, kron)
|
|