rescue-stations-placement / src /risk_distribution.py
Mike
feat: added fairway parameterization
cf5318d
"""Incident risk distributions over the water grid."""
from dataclasses import dataclass
from math import cos, radians, sin
from typing import Any, Callable
import numpy as np
from shapely.geometry import LineString, Point
from shapely.ops import transform
from .grid import METERS_PER_DEG_LAT, METERS_PER_DEG_LON
@dataclass
class GaussianKernel:
    """Parameters of one isotropic Gaussian risk bump centred at (lat, lon)."""

    # Centre of the kernel in geographic coordinates.
    lat: float
    lon: float
    # Multiplier applied to the kernel's peak value.
    weight: float = 1.0
    # Standard deviation of the bump, compared against offsets in meters.
    sigma_m: float = 1_000.0
@dataclass
class AnisotropicGaussianKernel:
    """Parameters of one rotated, elliptical Gaussian risk bump."""

    # Centre of the kernel in geographic coordinates.
    lat: float
    lon: float
    # Multiplier applied to the kernel's peak value.
    weight: float = 1.0
    # Standard deviations along the kernel's own (rotated) x and y axes.
    sigma_x_m: float = 1_000.0
    sigma_y_m: float = 500.0
    # Rotation of the kernel axes, given in degrees.
    angle_deg: float = 0.0
@dataclass
class IncidentDistribution:
"""Discrete approximation of lambda, q, and Q on the app grid."""
name: str
lats: np.ndarray
lons: np.ndarray
lambda_values: np.ndarray
q_values: np.ndarray
weights: np.ndarray
@classmethod
def from_intensity(
cls,
name: str,
lats: np.ndarray,
lons: np.ndarray,
lambda_values: np.ndarray,
cell_areas: np.ndarray | None = None,
) -> "IncidentDistribution":
q_values = intensity_to_density(lambda_values, cell_areas=cell_areas)
weights = density_to_weights(q_values, cell_areas=cell_areas)
return cls(
name=name,
lats=lats,
lons=lons,
lambda_values=np.asarray(lambda_values, dtype=np.float64),
q_values=q_values,
weights=weights,
)
@classmethod
def from_scenario(
cls,
name: str,
lats: np.ndarray,
lons: np.ndarray,
scenarios: dict[str, Any],
water_polygon=None,
shoreline=None,
cell_areas: np.ndarray | None = None,
) -> "IncidentDistribution":
lambda_values = scenario_intensity(
lats,
lons,
name,
scenarios,
water_polygon=water_polygon,
shoreline=shoreline,
cell_areas=cell_areas,
)
return cls.from_intensity(name, lats, lons, lambda_values, cell_areas=cell_areas)
def sample(self, n: int, rng: int | np.random.Generator | None = None) -> np.ndarray:
"""Sample grid cell indices from Q."""
generator = _as_generator(rng)
return generator.choice(len(self.weights), size=n, replace=True, p=self.weights)
def sample_points(
self,
n: int,
rng: int | np.random.Generator | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Sample incident coordinates as (lats, lons)."""
indices = self.sample(n, rng=rng)
return self.lats[indices], self.lons[indices]
def probability(self, mask: np.ndarray) -> float:
"""Return Q(mask)."""
return float(self.weights[np.asarray(mask, dtype=bool)].sum())
def expected_time(self, min_times: np.ndarray, finite_only: bool = False) -> float:
"""Expected response time under Q.
If ``finite_only`` is false, positive probability of unreachable cells
makes the expectation infinite. If it is true, Q is renormalized over
finite cells before averaging.
"""
times = np.asarray(min_times, dtype=np.float64)
if finite_only:
mask = np.isfinite(times)
mass = self.weights[mask].sum()
if mass <= 0:
return float("inf")
return float(np.sum(self.weights[mask] * times[mask]) / mass)
if np.any((~np.isfinite(times)) & (self.weights > 0)):
return float("inf")
return float(np.sum(self.weights * times))
def expected_survival(
self,
min_times: np.ndarray,
survival_fn: Callable[[np.ndarray], np.ndarray],
) -> float:
"""Expected rescue success probability under Q."""
times = np.asarray(min_times, dtype=np.float64)
values = np.zeros_like(times, dtype=np.float64)
mask = np.isfinite(times)
values[mask] = survival_fn(times[mask])
return float(np.sum(self.weights * values))
def gaussian_kernel(
    lats: np.ndarray,
    lons: np.ndarray,
    kernel: GaussianKernel,
) -> np.ndarray:
    """Evaluate a single isotropic Gaussian kernel at every grid point.

    Returns ``weight * exp(-d^2 / (2 * sigma_m^2))`` where ``d`` is the
    approximate metric distance from each point to the kernel centre.

    Raises:
        ValueError: if ``kernel.sigma_m`` is not positive.
    """
    if kernel.sigma_m <= 0:
        raise ValueError("sigma_m must be positive")
    east_m, north_m = coordinate_offsets_m(lats, lons, kernel.lat, kernel.lon)
    squared_distance = east_m * east_m + north_m * north_m
    two_variance = 2.0 * kernel.sigma_m**2
    return kernel.weight * np.exp(-squared_distance / two_variance)
def anisotropic_gaussian_kernel(
    lats: np.ndarray,
    lons: np.ndarray,
    kernel: AnisotropicGaussianKernel,
) -> np.ndarray:
    """Evaluate a single rotated elliptical Gaussian kernel at every grid point.

    Offsets from the kernel centre are rotated by ``kernel.angle_deg`` into the
    kernel's own axes, scaled by ``sigma_x_m`` / ``sigma_y_m``, and fed into
    ``weight * exp(-0.5 * (u^2 + v^2))``.

    Raises:
        ValueError: if either sigma is not positive.
    """
    if kernel.sigma_x_m <= 0 or kernel.sigma_y_m <= 0:
        raise ValueError("sigma_x_m and sigma_y_m must be positive")
    dx, dy = coordinate_offsets_m(lats, lons, kernel.lat, kernel.lon)
    theta = radians(kernel.angle_deg)
    cos_t = cos(theta)
    sin_t = sin(theta)
    # Coordinates of each offset expressed in the kernel's rotated frame.
    along = cos_t * dx + sin_t * dy
    across = -sin_t * dx + cos_t * dy
    scaled_along = along / kernel.sigma_x_m
    scaled_across = across / kernel.sigma_y_m
    return kernel.weight * np.exp(-0.5 * (scaled_along**2 + scaled_across**2))
def gaussian_mixture(
    lats: np.ndarray,
    lons: np.ndarray,
    kernels: list[GaussianKernel],
) -> np.ndarray:
    """Sum the isotropic Gaussian kernels in ``kernels`` over the grid.

    Returns an all-zero array of length ``len(lats)`` when ``kernels`` is empty.
    """
    total = np.zeros(len(lats), dtype=np.float64)
    for component in kernels:
        # Accumulate in place into the preallocated buffer.
        np.add(total, gaussian_kernel(lats, lons, component), out=total)
    return total
def anisotropic_gaussian_mixture(
    lats: np.ndarray,
    lons: np.ndarray,
    kernels: list[AnisotropicGaussianKernel],
) -> np.ndarray:
    """Sum the anisotropic Gaussian kernels in ``kernels`` over the grid.

    Returns an all-zero array of length ``len(lats)`` when ``kernels`` is empty.
    """
    total = np.zeros(len(lats), dtype=np.float64)
    for component in kernels:
        # Accumulate in place into the preallocated buffer.
        np.add(total, anisotropic_gaussian_kernel(lats, lons, component), out=total)
    return total
def shore_distance_intensity(
    lats: np.ndarray,
    lons: np.ndarray,
    shoreline,
    sigma_m: float,
) -> np.ndarray:
    """Gaussian falloff of risk with distance from ``shoreline``.

    Returns ``exp(-d^2 / (2 * sigma_m^2))`` per grid point, where ``d`` is the
    approximate distance in meters to the shoreline geometry.

    Raises:
        ValueError: if ``shoreline`` is None or ``sigma_m`` is not positive.
    """
    if shoreline is None:
        raise ValueError("shoreline is required for shore_distance components")
    if sigma_m <= 0:
        raise ValueError("sigma_m must be positive")
    dist_m = distance_to_geometry_m(lats, lons, shoreline)
    squared = dist_m * dist_m
    two_variance = 2.0 * sigma_m**2
    return np.exp(-squared / two_variance)
def line_distance_mixture(lats: np.ndarray, lons: np.ndarray, lines: list[dict]) -> np.ndarray:
    """Sum Gaussian distance-to-line falloffs, one per configured line.

    Each entry of ``lines`` supplies ``"points"`` (dicts with ``"lon"`` and
    ``"lat"``), a required ``"sigma_m"``, and an optional ``"weight"``
    (default 1.0).

    Raises:
        ValueError: if any line's ``sigma_m`` is not positive.
    """
    total = np.zeros(len(lats), dtype=np.float64)
    for spec in lines:
        sigma_m = float(spec["sigma_m"])
        if sigma_m <= 0:
            raise ValueError("line sigma_m must be positive")
        vertices = [(vertex["lon"], vertex["lat"]) for vertex in spec["points"]]
        path = LineString(vertices)
        dist_m = distance_to_geometry_m(lats, lons, path)
        falloff = np.exp(-(dist_m * dist_m) / (2.0 * sigma_m**2))
        total += float(spec.get("weight", 1.0)) * falloff
    return total
def coordinate_offsets_m(
    lats: np.ndarray,
    lons: np.ndarray,
    center_lat: float,
    center_lon: float,
) -> tuple[np.ndarray, np.ndarray]:
    """Return approximate (dx, dy) offsets in meters from a centre point.

    Degree differences are scaled by the module's ``METERS_PER_DEG_LON`` and
    ``METERS_PER_DEG_LAT`` factors; ``dx`` follows longitude and ``dy``
    follows latitude.
    """
    lon_delta = np.asarray(lons, dtype=np.float64) - center_lon
    east_m = lon_delta * METERS_PER_DEG_LON
    lat_delta = np.asarray(lats, dtype=np.float64) - center_lat
    north_m = lat_delta * METERS_PER_DEG_LAT
    return east_m, north_m
def distance_to_geometry_m(lats: np.ndarray, lons: np.ndarray, geometry) -> np.ndarray:
    """Approximate distance in meters from each (lat, lon) to ``geometry``.

    The geometry is first mapped from lon/lat degrees into the same scaled
    metric plane as the points, then each point's distance is measured there.
    """
    projected = transform(_lonlat_to_xy_m, geometry)
    distances = np.empty(len(lats), dtype=np.float64)
    for idx, (lat, lon) in enumerate(zip(lats, lons)):
        x_m = lon * METERS_PER_DEG_LON
        y_m = lat * METERS_PER_DEG_LAT
        distances[idx] = projected.distance(Point(x_m, y_m))
    return distances
def intensity_to_density(
    lambda_values: np.ndarray,
    cell_areas: np.ndarray | None = None,
) -> np.ndarray:
    """Scale lambda values into a grid density q.

    The divisor is the plain sum of the values, or the area-weighted sum when
    ``cell_areas`` is supplied.

    Raises:
        ValueError: if the inputs fail validation or the total mass is not
            positive.
    """
    values = _validated_nonnegative(lambda_values, name="lambda_values")
    if cell_areas is not None:
        areas = _validated_cell_areas(cell_areas, len(values))
        total = np.sum(values * areas)
    else:
        total = values.sum()
    if total <= 0:
        raise ValueError("intensity must have positive total mass")
    return values / total
def density_to_weights(
    q_values: np.ndarray,
    cell_areas: np.ndarray | None = None,
) -> np.ndarray:
    """Turn a grid density q into discrete probability weights Q({x_j}).

    Each cell's mass is ``q`` (or ``q * area`` when ``cell_areas`` is given);
    the masses are then rescaled to sum to one.

    Raises:
        ValueError: if the inputs fail validation or the total mass is not
            positive.
    """
    q = _validated_nonnegative(q_values, name="q_values")
    if cell_areas is not None:
        mass = q * _validated_cell_areas(cell_areas, len(q))
    else:
        mass = q.copy()
    total = mass.sum()
    if total <= 0:
        raise ValueError("density must have positive total mass")
    return mass / total
def intensity_to_weights(
    lambda_values: np.ndarray,
    cell_areas: np.ndarray | None = None,
) -> np.ndarray:
    """Convert lambda straight to Q by chaining the two normalization steps."""
    density = intensity_to_density(lambda_values, cell_areas=cell_areas)
    return density_to_weights(density, cell_areas=cell_areas)
def scenario_intensity(
lats: np.ndarray,
lons: np.ndarray,
name: str,
scenarios: dict[str, Any],
water_polygon=None,
shoreline=None,
cell_areas: np.ndarray | None = None,
_stack: tuple[str, ...] = (),
) -> np.ndarray:
"""Build lambda values for a named configured scenario."""
if name in _stack:
cycle = " -> ".join([*_stack, name])
raise ValueError(f"cyclic scenario reference: {cycle}")
if name not in scenarios:
raise KeyError(f"unknown risk scenario: {name}")
cfg = scenarios[name]
kind = cfg.get("kind", "component_mixture")
if kind == "scenario_mixture":
return _scenario_mixture_intensity(
lats,
lons,
cfg,
scenarios,
water_polygon=water_polygon,
shoreline=shoreline,
cell_areas=cell_areas,
stack=(*_stack, name),
)
if kind != "component_mixture":
raise ValueError(f"unsupported scenario kind: {kind}")
return _component_mixture_intensity(
lats,
lons,
cfg,
water_polygon=water_polygon,
shoreline=shoreline,
cell_areas=cell_areas,
)
def component_intensity(
    lats: np.ndarray,
    lons: np.ndarray,
    component: dict[str, Any],
    water_polygon=None,
    shoreline=None,
) -> np.ndarray:
    """Build raw lambda values for one configured component.

    ``component["kind"]`` (default ``"gaussian_mixture"``) selects the shape:

    * ``"gaussian_mixture"`` - isotropic kernels from ``component["kernels"]``;
      a kernel without its own ``sigma_m`` uses the component-level
      ``sigma_m`` (default 1000.0).
    * ``"anisotropic_gaussian_mixture"`` - rotated kernels; each needs
      ``sigma_x_m`` and ``sigma_y_m``.
    * ``"shore_distance"`` - falloff from ``shoreline``, or from
      ``water_polygon.boundary`` when no shoreline is given.
    * ``"line_distance"`` - falloff from the lines in ``component["lines"]``.
    * ``"uniform"`` - constant 1.0 everywhere.

    Raises:
        ValueError: for an unsupported kind, or for ``"shore_distance"`` when
            neither ``shoreline`` nor ``water_polygon`` is available.
        KeyError: if a required configuration key is missing.
    """
    kind = component.get("kind", "gaussian_mixture")
    if kind == "gaussian_mixture":
        default_sigma = component.get("sigma_m", 1000.0)
        kernels = [
            GaussianKernel(
                lat=float(raw["lat"]),
                lon=float(raw["lon"]),
                weight=float(raw.get("weight", 1.0)),
                sigma_m=float(raw.get("sigma_m", default_sigma)),
            )
            for raw in component.get("kernels", [])
        ]
        return gaussian_mixture(lats, lons, kernels)
    if kind == "anisotropic_gaussian_mixture":
        kernels = [
            AnisotropicGaussianKernel(
                lat=float(raw["lat"]),
                lon=float(raw["lon"]),
                weight=float(raw.get("weight", 1.0)),
                sigma_x_m=float(raw["sigma_x_m"]),
                sigma_y_m=float(raw["sigma_y_m"]),
                angle_deg=float(raw.get("angle_deg", 0.0)),
            )
            for raw in component.get("kernels", [])
        ]
        return anisotropic_gaussian_mixture(lats, lons, kernels)
    if kind == "shore_distance":
        if shoreline is None:
            # Without this guard a missing polygon surfaces as an opaque
            # AttributeError on ``None.boundary`` instead of a config error.
            if water_polygon is None:
                raise ValueError("shoreline is required for shore_distance components")
            shoreline = water_polygon.boundary
        return shore_distance_intensity(
            lats,
            lons,
            shoreline,
            sigma_m=float(component["sigma_m"]),
        )
    if kind == "line_distance":
        return line_distance_mixture(lats, lons, component.get("lines", []))
    if kind == "uniform":
        return np.ones(len(lats), dtype=np.float64)
    raise ValueError(f"unsupported risk component kind: {kind}")
def _component_mixture_intensity(
lats: np.ndarray,
lons: np.ndarray,
cfg: dict[str, Any],
water_polygon=None,
shoreline=None,
cell_areas: np.ndarray | None = None,
) -> np.ndarray:
normalize_components = bool(cfg.get("normalize_components", True))
out = np.zeros(len(lats), dtype=np.float64)
for component in cfg.get("components", []):
weight = float(component.get("weight", 1.0))
values = component_intensity(
lats,
lons,
component,
water_polygon=water_polygon,
shoreline=shoreline,
)
if normalize_components:
values = intensity_to_density(values, cell_areas=cell_areas)
out += weight * values
if np.all(out == 0):
raise ValueError("risk scenario produced zero intensity")
return out
def _scenario_mixture_intensity(
lats: np.ndarray,
lons: np.ndarray,
cfg: dict[str, Any],
scenarios: dict[str, Any],
water_polygon=None,
shoreline=None,
cell_areas: np.ndarray | None = None,
stack: tuple[str, ...] = (),
) -> np.ndarray:
out = np.zeros(len(lats), dtype=np.float64)
for item in cfg.get("scenarios", []):
weight = float(item.get("weight", 1.0))
child_name = item["name"]
child_values = scenario_intensity(
lats,
lons,
child_name,
scenarios,
water_polygon=water_polygon,
shoreline=shoreline,
cell_areas=cell_areas,
_stack=stack,
)
child_density = intensity_to_density(child_values, cell_areas=cell_areas)
out += weight * child_density
if np.all(out == 0):
raise ValueError("risk scenario mixture produced zero intensity")
return out
def _validated_nonnegative(values: np.ndarray, name: str) -> np.ndarray:
arr = np.asarray(values, dtype=np.float64)
if arr.ndim != 1:
raise ValueError(f"{name} must be a 1D array")
if np.any(~np.isfinite(arr)):
raise ValueError(f"{name} must be finite")
if np.any(arr < 0):
raise ValueError(f"{name} must be nonnegative")
return arr
def _validated_cell_areas(cell_areas: np.ndarray, n: int) -> np.ndarray:
areas = np.asarray(cell_areas, dtype=np.float64)
if areas.shape != (n,):
raise ValueError("cell_areas must have the same shape as lambda_values")
if np.any(~np.isfinite(areas)) or np.any(areas <= 0):
raise ValueError("cell_areas must be positive and finite")
return areas
def _as_generator(rng: int | np.random.Generator | None) -> np.random.Generator:
if isinstance(rng, np.random.Generator):
return rng
return np.random.default_rng(rng)
def _lonlat_to_xy_m(lon: float, lat: float, z: float | None = None):
    """Scale lon/lat degrees into the module's approximate metric plane.

    Returns ``(x, y)``, or ``(x, y, z)`` with ``z`` passed through untouched
    when a third coordinate is supplied.
    """
    planar = (lon * METERS_PER_DEG_LON, lat * METERS_PER_DEG_LAT)
    return planar if z is None else (*planar, z)