Mike
feat: vectorised graph build
ab42110
"""Coverage analysis: aggregate reachability into actionable metrics."""
import numpy as np
def coverage_curve(
min_times: np.ndarray,
max_time: float = 60.0,
step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
"""Cumulative coverage: % of area reachable within T minutes.
Returns
-------
thresholds : ndarray — time values in minutes
coverage_pct : ndarray — % of reachable cells covered at each threshold
"""
reachable = min_times[np.isfinite(min_times)]
thresholds = np.arange(0, max_time + step, step)
coverage_pct = np.array(
[(reachable <= t).sum() / len(reachable) * 100 for t in thresholds]
)
return thresholds, coverage_pct
def coverage_at_thresholds(
min_times: np.ndarray,
thresholds: list[float] = [5, 10, 15, 20, 25, 30],
) -> list[tuple[float, float]]:
"""Coverage % at specific time thresholds.
Returns list of (threshold_min, coverage_pct).
"""
reachable = min_times[np.isfinite(min_times)]
n = len(reachable)
return [(t, (reachable <= t).sum() / n * 100) for t in thresholds]
def weighted_coverage_curve(
min_times: np.ndarray,
weights: np.ndarray,
max_time: float = 60.0,
step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
"""Cumulative coverage of incident probability within T minutes."""
times = np.asarray(min_times, dtype=np.float64)
w = _normalized_weights(weights, len(times))
thresholds = np.arange(0, max_time + step, step)
coverage_pct = np.array(
[
w[np.isfinite(times) & (times <= t)].sum() * 100
for t in thresholds
]
)
return thresholds, coverage_pct
def weighted_coverage_at_thresholds(
min_times: np.ndarray,
weights: np.ndarray,
thresholds: list[float] = [5, 10, 15, 20, 25, 30],
) -> list[tuple[float, float]]:
"""Incident probability covered within specific time thresholds."""
times = np.asarray(min_times, dtype=np.float64)
w = _normalized_weights(weights, len(times))
return [
(t, float(w[np.isfinite(times) & (times <= t)].sum() * 100))
for t in thresholds
]
def station_zones(
travel_times: np.ndarray,
min_times: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
"""Responsibility zones: which station is nearest to each cell.
Returns
-------
assignments : ndarray of shape (n_cells,)
Station index for each cell (-1 if unreachable).
zone_sizes : list of (station_idx, cell_count)
Number of cells assigned to each station, sorted descending.
"""
assignments = np.argmin(travel_times, axis=0)
assignments[~np.isfinite(min_times)] = -1
n_stations = travel_times.shape[0]
zone_sizes = []
for s in range(n_stations):
count = (assignments == s).sum()
zone_sizes.append((s, int(count)))
zone_sizes.sort(key=lambda x: x[1], reverse=True)
return assignments, zone_sizes
def weighted_station_zones(
travel_times: np.ndarray,
min_times: np.ndarray,
weights: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, float]]]:
"""Responsibility zones weighted by incident probability."""
assignments = np.argmin(travel_times, axis=0)
assignments[~np.isfinite(min_times)] = -1
w = _normalized_weights(weights, travel_times.shape[1])
zone_weights = []
for s in range(travel_times.shape[0]):
zone_weights.append((s, float(w[assignments == s].sum())))
zone_weights.sort(key=lambda x: x[1], reverse=True)
return assignments, zone_weights
def blind_spots(
min_times: np.ndarray,
threshold_min: float = 20.0,
) -> np.ndarray:
"""Indices of cells with response time above threshold (or unreachable)."""
return np.where((min_times > threshold_min) | ~np.isfinite(min_times))[0]
def _normalized_weights(weights: np.ndarray, n: int) -> np.ndarray:
w = np.asarray(weights, dtype=np.float64)
if w.shape != (n,):
raise ValueError("weights must have the same length as min_times")
if np.any(~np.isfinite(w)) or np.any(w < 0):
raise ValueError("weights must be finite and nonnegative")
total = w.sum()
if total <= 0:
raise ValueError("weights must have positive total mass")
return w / total