"""Coverage analysis: aggregate reachability into actionable metrics."""
from collections.abc import Sequence

import numpy as np
def coverage_curve(
    min_times: np.ndarray,
    max_time: float = 60.0,
    step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
    """Cumulative coverage: % of reachable cells reached within T minutes.

    Only finite entries of ``min_times`` count as reachable; ``inf`` and
    ``NaN`` entries are left out of both numerator and denominator.

    Parameters
    ----------
    min_times : array-like
        Best response time per cell; non-finite values mark unreachable cells.
    max_time : float
        Upper end of the threshold grid.
    step : float
        Spacing of the threshold grid.

    Returns
    -------
    thresholds : ndarray — time values in minutes
    coverage_pct : ndarray — % of reachable cells covered at each threshold.
        All zeros when no cell is reachable (instead of NaN from 0/0).
    """
    times = np.asarray(min_times, dtype=np.float64)
    # Sorting once lets every threshold be answered by a binary search
    # instead of rescanning the whole array per threshold.
    reachable = np.sort(times[np.isfinite(times)])
    thresholds = np.arange(0, max_time + step, step)
    if reachable.size == 0:
        # Nothing is reachable: report 0 % rather than dividing by zero.
        return thresholds, np.zeros(thresholds.shape, dtype=np.float64)
    # side="right" counts the entries <= threshold, matching `reachable <= t`.
    covered = np.searchsorted(reachable, thresholds, side="right")
    return thresholds, covered / reachable.size * 100
def coverage_at_thresholds(
    min_times: np.ndarray,
    thresholds: Sequence[float] = (5, 10, 15, 20, 25, 30),
) -> list[tuple[float, float]]:
    """Coverage % at specific time thresholds.

    Only finite entries of ``min_times`` count as reachable; the percentage
    is taken over those entries.

    Parameters
    ----------
    min_times : array-like
        Best response time per cell; non-finite values mark unreachable cells.
    thresholds : sequence of float
        Time limits to evaluate, reported in the given order.

    Returns
    -------
    list of (threshold_min, coverage_pct)
        ``coverage_pct`` is a plain ``float``; it is 0.0 for every threshold
        when no cell is reachable (instead of NaN from 0/0).
    """
    times = np.asarray(min_times, dtype=np.float64)
    reachable = times[np.isfinite(times)]
    n = reachable.size
    if n == 0:
        # Nothing is reachable: report 0 % rather than dividing by zero.
        return [(t, 0.0) for t in thresholds]
    return [
        (t, float(np.count_nonzero(reachable <= t) / n * 100))
        for t in thresholds
    ]
def weighted_coverage_curve(
    min_times: np.ndarray,
    weights: np.ndarray,
    max_time: float = 60.0,
    step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
    """Cumulative share of weight (incident probability) reached within T.

    Weights are normalized to sum to 1 by ``_normalized_weights``; a cell
    contributes its share once its time is finite and within the threshold.

    Returns
    -------
    thresholds : ndarray — grid from 0 built with ``np.arange`` and ``step``
    coverage_pct : ndarray — % of total weight covered at each threshold
    """
    arrival = np.asarray(min_times, dtype=np.float64)
    share = _normalized_weights(weights, len(arrival))
    grid = np.arange(0, max_time + step, step)

    # The finiteness mask does not depend on the threshold; compute it once.
    usable = np.isfinite(arrival)
    covered = np.zeros(len(grid), dtype=np.float64)
    for i, limit in enumerate(grid):
        covered[i] = share[usable & (arrival <= limit)].sum() * 100
    return grid, covered
def weighted_coverage_at_thresholds(
    min_times: np.ndarray,
    weights: np.ndarray,
    thresholds: Sequence[float] = (5, 10, 15, 20, 25, 30),
) -> list[tuple[float, float]]:
    """Incident probability covered within specific time thresholds.

    Weights are normalized to sum to 1 by ``_normalized_weights``; a cell
    contributes its share once its time is finite and within the threshold.

    Parameters
    ----------
    min_times : array-like
        Best response time per cell; non-finite values mark unreachable cells.
    weights : array-like
        Per-cell weight, one entry per element of ``min_times``.
    thresholds : sequence of float
        Time limits to evaluate, reported in the given order.

    Returns
    -------
    list of (threshold_min, coverage_pct)
        ``coverage_pct`` is the % of total weight covered, as a ``float``.

    Raises
    ------
    ValueError
        Propagated from ``_normalized_weights`` when the weights are invalid.
    """
    times = np.asarray(min_times, dtype=np.float64)
    w = _normalized_weights(weights, len(times))
    # The finiteness mask does not depend on the threshold; compute it once.
    finite = np.isfinite(times)
    return [
        (t, float(w[finite & (times <= t)].sum() * 100))
        for t in thresholds
    ]
def station_zones(
    travel_times: np.ndarray,
    min_times: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
    """Responsibility zones: which station is nearest to each cell.

    Parameters
    ----------
    travel_times : array-like of shape (n_stations, n_cells)
        Travel time from each station to each cell. Non-finite entries are
        treated as "cannot reach" and never win a cell from a station with
        a finite time.
    min_times : array-like of shape (n_cells,)
        Best time per cell; cells whose value is not finite are marked
        unreachable.

    Returns
    -------
    assignments : ndarray of shape (n_cells,)
        Station index for each cell (-1 if unreachable).
    zone_sizes : list of (station_idx, cell_count)
        Number of cells assigned to each station, sorted descending; ties
        keep ascending station order.
    """
    times = np.asarray(travel_times, dtype=np.float64)
    # np.argmin returns the index of the first NaN, so a NaN travel time
    # would capture the cell; map non-finite values to +inf before comparing.
    comparable = np.where(np.isfinite(times), times, np.inf)
    assignments = np.argmin(comparable, axis=0)
    assignments[~np.isfinite(min_times)] = -1

    n_stations = times.shape[0]
    # Count cells per station in one pass; -1 (unreachable) is filtered out
    # because bincount only accepts non-negative labels.
    counts = np.bincount(assignments[assignments >= 0], minlength=n_stations)
    zone_sizes = sorted(
        enumerate(counts.tolist()),
        key=lambda item: item[1],
        reverse=True,
    )
    return assignments, zone_sizes
def weighted_station_zones(
    travel_times: np.ndarray,
    min_times: np.ndarray,
    weights: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, float]]]:
    """Responsibility zones weighted by incident probability.

    Parameters
    ----------
    travel_times : array-like of shape (n_stations, n_cells)
        Travel time from each station to each cell. Non-finite entries are
        treated as "cannot reach" and never win a cell from a station with
        a finite time.
    min_times : array-like of shape (n_cells,)
        Best time per cell; cells whose value is not finite are marked
        unreachable.
    weights : array-like of shape (n_cells,)
        Per-cell weight; normalized to sum to 1 by ``_normalized_weights``.

    Returns
    -------
    assignments : ndarray of shape (n_cells,)
        Station index for each cell (-1 if unreachable).
    zone_weights : list of (station_idx, weight_share)
        Share of total weight assigned to each station, sorted descending.

    Raises
    ------
    ValueError
        Propagated from ``_normalized_weights`` when the weights are invalid.
    """
    times = np.asarray(travel_times, dtype=np.float64)
    # np.argmin returns the index of the first NaN, so a NaN travel time
    # would capture the cell; map non-finite values to +inf before comparing.
    comparable = np.where(np.isfinite(times), times, np.inf)
    assignments = np.argmin(comparable, axis=0)
    assignments[~np.isfinite(min_times)] = -1

    w = _normalized_weights(weights, times.shape[1])
    zone_weights = [
        (s, float(w[assignments == s].sum()))
        for s in range(times.shape[0])
    ]
    zone_weights.sort(key=lambda item: item[1], reverse=True)
    return assignments, zone_weights
def blind_spots(
    min_times: np.ndarray,
    threshold_min: float = 20.0,
) -> np.ndarray:
    """Return indices of cells that are not served within ``threshold_min``.

    A cell counts as served only when its time is finite and does not exceed
    the threshold; everything else (too slow, ``inf`` or ``NaN``) is reported.
    """
    served = np.isfinite(min_times) & (min_times <= threshold_min)
    return np.nonzero(~served)[0]
def _normalized_weights(weights: np.ndarray, n: int) -> np.ndarray:
w = np.asarray(weights, dtype=np.float64)
if w.shape != (n,):
raise ValueError("weights must have the same length as min_times")
if np.any(~np.isfinite(w)) or np.any(w < 0):
raise ValueError("weights must be finite and nonnegative")
total = w.sum()
if total <= 0:
raise ValueError("weights must have positive total mass")
return w / total
|