File size: 4,281 Bytes
a789c3a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab91b06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a789c3a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab91b06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a789c3a
 
 
 
 
 
ab91b06
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Coverage analysis: aggregate reachability into actionable metrics."""

import numpy as np


def coverage_curve(
    min_times: np.ndarray,
    max_time: float = 60.0,
    step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
    """Cumulative coverage: % of reachable cells covered within T minutes.

    Cells whose time is not finite (inf or NaN) are treated as unreachable
    and are excluded from both the numerator and the denominator.

    Parameters
    ----------
    min_times : ndarray — per-cell response time
    max_time : float — largest threshold to evaluate
    step : float — spacing between consecutive thresholds

    Returns
    -------
    thresholds : ndarray — time values in minutes
    coverage_pct : ndarray — % of reachable cells covered at each threshold
        (all zeros when no cell is reachable)
    """
    times = np.asarray(min_times, dtype=np.float64)
    reachable = np.sort(times[np.isfinite(times)])
    thresholds = np.arange(0, max_time + step, step)

    # Without this guard an all-unreachable grid divides 0 by 0 and the
    # whole curve becomes NaN.
    if reachable.size == 0:
        return thresholds, np.zeros(len(thresholds))

    # On sorted data, the right-side insertion point of t equals the number
    # of entries <= t, so one searchsorted call replaces a scan per threshold.
    covered = np.searchsorted(reachable, thresholds, side="right")
    coverage_pct = covered / reachable.size * 100
    return thresholds, coverage_pct


def coverage_at_thresholds(
    min_times: np.ndarray,
    thresholds: list[float] = [5, 10, 15, 20, 25, 30],
) -> list[tuple[float, float]]:
    """Coverage % at specific time thresholds.

    Cells whose time is not finite (inf or NaN) are treated as unreachable
    and are excluded from both the numerator and the denominator.

    Parameters
    ----------
    min_times : ndarray — per-cell response time
    thresholds : list of float — time limits to evaluate; the default list
        is only iterated, never mutated

    Returns list of (threshold_min, coverage_pct). Every coverage_pct is
    0.0 when no cell is reachable.
    """
    times = np.asarray(min_times, dtype=np.float64)
    reachable = times[np.isfinite(times)]
    n = reachable.size

    # Without this guard an all-unreachable grid divides 0 by 0 and every
    # percentage becomes NaN.
    if n == 0:
        return [(t, 0.0) for t in thresholds]

    return [
        (t, float(np.count_nonzero(reachable <= t) / n * 100))
        for t in thresholds
    ]


def weighted_coverage_curve(
    min_times: np.ndarray,
    weights: np.ndarray,
    max_time: float = 60.0,
    step: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
    """Cumulative share of incident probability reached within T minutes.

    Parameters
    ----------
    min_times : ndarray — per-cell response time; non-finite entries never
        count as covered
    weights : ndarray — per-cell weight, validated and rescaled to sum to 1
        by ``_normalized_weights``
    max_time : float — largest threshold to evaluate
    step : float — spacing between consecutive thresholds

    Returns
    -------
    thresholds : ndarray — time values
    coverage_pct : ndarray — % of total weight covered at each threshold
    """
    arrival = np.asarray(min_times, dtype=np.float64)
    mass = _normalized_weights(weights, len(arrival))
    grid = np.arange(0, max_time + step, step)

    # The reachability mask does not depend on the threshold: build it once.
    is_reachable = np.isfinite(arrival)

    percentages = []
    for limit in grid:
        within_limit = is_reachable & (arrival <= limit)
        percentages.append(mass[within_limit].sum() * 100)

    return grid, np.array(percentages)


def weighted_coverage_at_thresholds(
    min_times: np.ndarray,
    weights: np.ndarray,
    thresholds: list[float] = [5, 10, 15, 20, 25, 30],
) -> list[tuple[float, float]]:
    """Share of incident probability reached within given time limits.

    Parameters
    ----------
    min_times : ndarray — per-cell response time; non-finite entries never
        count as covered
    weights : ndarray — per-cell weight, validated and rescaled to sum to 1
        by ``_normalized_weights``
    thresholds : list of float — time limits to evaluate

    Returns list of (threshold, coverage_pct) in the order given.
    """
    arrival = np.asarray(min_times, dtype=np.float64)
    mass = _normalized_weights(weights, len(arrival))

    # The reachability mask does not depend on the threshold: build it once.
    is_reachable = np.isfinite(arrival)

    results = []
    for limit in thresholds:
        within_limit = is_reachable & (arrival <= limit)
        results.append((limit, float(mass[within_limit].sum() * 100)))
    return results


def station_zones(
    travel_times: np.ndarray,
    min_times: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, int]]]:
    """Responsibility zones: which station is nearest to each cell.

    Parameters
    ----------
    travel_times : ndarray of shape (n_stations, n_cells)
        Travel time from each station to each cell. NaN entries are treated
        as "no route" and never win the nearest-station comparison.
    min_times : ndarray of shape (n_cells,)
        Best response time per cell; cells with a non-finite value are
        marked unreachable.

    Returns
    -------
    assignments : ndarray of shape (n_cells,)
        Station index for each cell (-1 if unreachable).
    zone_sizes : list of (station_idx, cell_count)
        Number of cells assigned to each station, sorted descending
        (ties keep ascending station order).
    """
    times = np.asarray(travel_times, dtype=np.float64)

    # np.argmin returns the position of a NaN as if it were the minimum, so
    # one NaN would hand the cell to that station even when another station
    # has a finite time. Mapping NaN to +inf makes it lose every comparison.
    comparable = np.where(np.isnan(times), np.inf, times)
    assignments = np.argmin(comparable, axis=0)
    assignments[~np.isfinite(min_times)] = -1

    n_stations = times.shape[0]
    # Count all stations in one pass; -1 (unreachable) is filtered out first
    # because bincount only accepts non-negative values.
    counts = np.bincount(assignments[assignments >= 0], minlength=n_stations)
    zone_sizes = sorted(
        enumerate(counts.tolist()),
        key=lambda pair: pair[1],
        reverse=True,
    )

    return assignments, zone_sizes


def weighted_station_zones(
    travel_times: np.ndarray,
    min_times: np.ndarray,
    weights: np.ndarray,
) -> tuple[np.ndarray, list[tuple[int, float]]]:
    """Responsibility zones weighted by incident probability.

    Parameters
    ----------
    travel_times : ndarray of shape (n_stations, n_cells)
        Travel time from each station to each cell. NaN entries are treated
        as "no route" and never win the nearest-station comparison.
    min_times : ndarray of shape (n_cells,)
        Best response time per cell; cells with a non-finite value are
        marked unreachable.
    weights : ndarray of shape (n_cells,)
        Per-cell weight, validated and rescaled to sum to 1 by
        ``_normalized_weights``.

    Returns
    -------
    assignments : ndarray of shape (n_cells,)
        Station index for each cell (-1 if unreachable).
    zone_weights : list of (station_idx, weight_share)
        Normalized weight assigned to each station, sorted descending.
        Weight on unreachable cells belongs to no station.
    """
    times = np.asarray(travel_times, dtype=np.float64)

    # np.argmin returns the position of a NaN as if it were the minimum, so
    # one NaN would hand the cell to that station even when another station
    # has a finite time. Mapping NaN to +inf makes it lose every comparison.
    comparable = np.where(np.isnan(times), np.inf, times)
    assignments = np.argmin(comparable, axis=0)
    assignments[~np.isfinite(min_times)] = -1

    w = _normalized_weights(weights, times.shape[1])
    zone_weights = [
        (s, float(w[assignments == s].sum()))
        for s in range(times.shape[0])
    ]
    zone_weights.sort(key=lambda x: x[1], reverse=True)

    return assignments, zone_weights


def blind_spots(
    min_times: np.ndarray,
    threshold_min: float = 20.0,
) -> np.ndarray:
    """Find cells that are too slow to reach or cannot be reached at all.

    Returns the first-axis indices of entries in ``min_times`` that are
    either greater than ``threshold_min`` or not finite (inf / NaN).
    """
    too_slow = min_times > threshold_min
    unreachable = np.logical_not(np.isfinite(min_times))
    flagged = np.logical_or(too_slow, unreachable)
    return np.nonzero(flagged)[0]


def _normalized_weights(weights: np.ndarray, n: int) -> np.ndarray:
    """Validate per-cell weights and rescale them to sum to 1.

    Parameters
    ----------
    weights : array-like — converted to a float64 array
    n : int — required length; the array must have shape ``(n,)``

    Returns
    -------
    ndarray — ``weights`` divided by their total

    Raises
    ------
    ValueError
        If the shape is not ``(n,)``, if any entry is non-finite or
        negative, or if the weights do not sum to a positive value.
    """
    arr = np.asarray(weights, dtype=np.float64)

    expected_shape = (n,)
    if arr.shape != expected_shape:
        raise ValueError("weights must have the same length as min_times")

    has_non_finite = bool((~np.isfinite(arr)).any())
    has_negative = bool((arr < 0).any())
    if has_non_finite or has_negative:
        raise ValueError("weights must be finite and nonnegative")

    mass = arr.sum()
    if mass <= 0:
        raise ValueError("weights must have positive total mass")

    return arr / mass