| """Coverage analysis: aggregate reachability into actionable metrics.""" |
|
|
| import numpy as np |
|
|
|
|
def coverage_curve(
    min_times: np.ndarray,
    max_time: float = 60.0,
    step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
    """Cumulative coverage: % of reachable cells covered within T minutes.

    Parameters
    ----------
    min_times : array-like
        Best response time per cell. Non-finite entries (inf / NaN) are
        treated as unreachable and excluded from the denominator.
    max_time : float
        Upper end of the threshold grid.
    step : float
        Spacing between consecutive thresholds.

    Returns
    -------
    thresholds : ndarray
        ``np.arange(0, max_time + step, step)``. Because the stop value is
        ``max_time + step``, the last threshold can lie past ``max_time``
        when ``max_time`` is not an exact multiple of ``step``.
    coverage_pct : ndarray
        % of reachable cells whose time is <= each threshold. All zeros
        when no cell is reachable at all.
    """
    times = np.asarray(min_times)
    reachable = np.sort(times[np.isfinite(times)])
    thresholds = np.arange(0, max_time + step, step)

    if reachable.size == 0:
        # Nothing is reachable: report 0 % rather than dividing by zero,
        # which would yield NaN plus a RuntimeWarning.
        return thresholds, np.zeros(thresholds.shape, dtype=np.float64)

    # On sorted data the right-hand insertion point of t equals the number
    # of values <= t, so one vectorised call replaces the per-threshold scan.
    covered = np.searchsorted(reachable, thresholds, side="right")
    coverage_pct = covered / reachable.size * 100
    return thresholds, coverage_pct
|
|
|
|
def coverage_at_thresholds(
    min_times: np.ndarray,
    thresholds: list[float] = [5, 10, 15, 20, 25, 30],
) -> list[tuple[float, float]]:
    """Coverage % at specific time thresholds.

    Parameters
    ----------
    min_times : array-like
        Best response time per cell. Non-finite entries (inf / NaN) are
        treated as unreachable and excluded from the denominator.
    thresholds : list of float
        Time limits to evaluate. The default list is only read, never
        mutated, so sharing it between calls is safe.

    Returns
    -------
    list of (threshold_min, coverage_pct)
        One pair per threshold, in the order given. ``coverage_pct`` is a
        plain ``float``: the % of reachable cells with time <= threshold,
        or ``0.0`` for every threshold when no cell is reachable.
    """
    times = np.asarray(min_times)
    reachable = times[np.isfinite(times)]
    n = reachable.size
    if n == 0:
        # Nothing is reachable: report 0 % rather than dividing by zero,
        # which would yield NaN plus a RuntimeWarning.
        return [(t, 0.0) for t in thresholds]
    return [
        (t, float(np.count_nonzero(reachable <= t) / n * 100))
        for t in thresholds
    ]
|
|
|
|
def weighted_coverage_curve(
    min_times: np.ndarray,
    weights: np.ndarray,
    max_time: float = 60.0,
    step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
    """Cumulative share of incident probability covered within T minutes.

    ``weights`` is validated and normalised to sum to 1 by
    ``_normalized_weights`` (which raises ``ValueError`` on bad input).
    For every threshold on the grid ``np.arange(0, max_time + step, step)``
    the result is the normalised weight, in percent, of the cells whose
    time is finite and <= that threshold.

    Returns
    -------
    thresholds : ndarray — the threshold grid
    coverage_pct : ndarray — % of total weight covered at each threshold
    """
    response = np.asarray(min_times, dtype=np.float64)
    mass = _normalized_weights(weights, len(response))
    grid = np.arange(0, max_time + step, step)

    # Reachability does not depend on the threshold, so compute it once.
    finite = np.isfinite(response)
    covered = []
    for limit in grid:
        within = finite & (response <= limit)
        covered.append(mass[within].sum() * 100)
    return grid, np.array(covered)
|
|
|
|
def weighted_coverage_at_thresholds(
    min_times: np.ndarray,
    weights: np.ndarray,
    thresholds: list[float] = [5, 10, 15, 20, 25, 30],
) -> list[tuple[float, float]]:
    """Share of incident probability covered at specific time thresholds.

    ``weights`` is validated and normalised to sum to 1 by
    ``_normalized_weights`` (which raises ``ValueError`` on bad input).

    Returns a list of ``(threshold, coverage_pct)`` pairs in the order the
    thresholds were given, where ``coverage_pct`` is the normalised weight,
    in percent, of the cells whose time is finite and <= the threshold.
    """
    response = np.asarray(min_times, dtype=np.float64)
    mass = _normalized_weights(weights, len(response))

    # Reachability does not depend on the threshold, so compute it once.
    finite = np.isfinite(response)
    results = []
    for limit in thresholds:
        within = finite & (response <= limit)
        results.append((limit, float(mass[within].sum() * 100)))
    return results
|
|
|
|
def station_zones(
    travel_times: np.ndarray,
    min_times: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
    """Responsibility zones: the station with the smallest travel time per cell.

    Parameters
    ----------
    travel_times : ndarray
        Travel times with stations along axis 0.
    min_times : ndarray
        Best time per cell; cells whose entry is not finite are marked
        as unreachable.

    Returns
    -------
    assignments : ndarray of shape (n_cells,)
        Station index for each cell (-1 if unreachable).
    zone_sizes : list of (station_idx, cell_count)
        Number of cells assigned to each station, largest first. Stations
        with equal counts keep ascending index order.
    """
    nearest = np.argmin(travel_times, axis=0)
    unreachable = ~np.isfinite(min_times)
    nearest[unreachable] = -1

    # Tally cells per station; the -1 sentinel is dropped first because
    # bincount only accepts non-negative labels.
    station_count = travel_times.shape[0]
    tallies = np.bincount(nearest[nearest >= 0], minlength=station_count)

    zone_sizes = sorted(
        ((idx, int(cells)) for idx, cells in enumerate(tallies)),
        key=lambda zone: zone[1],
        reverse=True,
    )
    return nearest, zone_sizes
|
|
|
|
def weighted_station_zones(
    travel_times: np.ndarray,
    min_times: np.ndarray,
    weights: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, float]]]:
    """Responsibility zones weighted by incident probability.

    Each cell is assigned to the station (axis 0 of ``travel_times``) with
    the smallest travel time, or to -1 when its ``min_times`` entry is not
    finite. ``weights`` is validated and normalised to sum to 1 by
    ``_normalized_weights`` (which raises ``ValueError`` on bad input).

    Returns
    -------
    assignments : ndarray
        Station index for each cell (-1 if unreachable).
    zone_weights : list of (station_idx, weight_share)
        Normalised weight assigned to each station, largest first.
    """
    nearest = np.argmin(travel_times, axis=0)
    unreachable = ~np.isfinite(min_times)
    nearest[unreachable] = -1

    share = _normalized_weights(weights, travel_times.shape[1])
    zone_weights = sorted(
        (
            (idx, float(share[nearest == idx].sum()))
            for idx in range(travel_times.shape[0])
        ),
        key=lambda zone: zone[1],
        reverse=True,
    )
    return nearest, zone_weights
|
|
|
|
def blind_spots(
    min_times: np.ndarray,
    threshold_min: float = 20.0,
) -> np.ndarray:
    """Return indices of cells that are too slow to reach or unreachable.

    A cell counts as covered only when its time is finite and no greater
    than ``threshold_min``; every other cell (slower, infinite, or NaN) is
    reported.
    """
    covered = (min_times <= threshold_min) & np.isfinite(min_times)
    return np.nonzero(~covered)[0]
|
|
|
|
def _normalized_weights(weights: np.ndarray, n: int) -> np.ndarray:
    """Validate ``weights`` and rescale them so they sum to 1.

    The input is converted to a float64 array and must be one-dimensional
    with exactly ``n`` entries, all finite and non-negative, with a
    strictly positive sum.

    Raises
    ------
    ValueError
        If the shape is not ``(n,)``, if any entry is non-finite or
        negative, or if the weights do not add up to a positive total.
    """
    arr = np.asarray(weights, dtype=np.float64)
    if arr.shape != (n,):
        raise ValueError("weights must have the same length as min_times")

    # An entry is acceptable only if it is finite and not below zero.
    acceptable = np.isfinite(arr) & (arr >= 0)
    if not acceptable.all():
        raise ValueError("weights must be finite and nonnegative")

    total = arr.sum()
    if total <= 0:
        raise ValueError("weights must have positive total mass")
    return arr / total
|
|