File size: 3,859 Bytes
cc298f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31946d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc298f9
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""Per-cell separable objectives.

All objectives are normalized to MINIMIZATION. A separable objective has the
form F(t) = Σ_j w_j · φ(t_j), with w summing to 1 and φ defined on [0, ∞].

Key consequence: marginal gain of adding a candidate c to a current set A
(field t_A, candidate times τ_c) is

    Δ(c | A) = Σ_j w_j (φ(t_A_j) - φ(min(t_A_j, τ_cj))) ≥ 0,

because adding a station can only decrease t and we choose φ non-decreasing.
This makes greedy submodular and allows fully vectorized marginal gains.
"""

from dataclasses import dataclass
from typing import Callable

import numpy as np


@dataclass
class SeparableObjective:
    """Generic Σ w_j φ(t_j) objective with vectorized marginal gain."""

    weights: np.ndarray
    phi: Callable[[np.ndarray], np.ndarray]
    name: str = "objective"

    def __post_init__(self):
        w = np.asarray(self.weights, dtype=np.float64)
        if w.ndim != 1 or np.any(w < 0) or not np.isfinite(w.sum()) or w.sum() <= 0:
            raise ValueError("weights must be 1D, nonnegative, finite, with positive sum")
        self.weights = w / w.sum()

    def value(self, t: np.ndarray) -> float:
        return float(self.weights @ self.phi(np.asarray(t, dtype=np.float64)))

    def marginal_gain(self, t_A: np.ndarray, T_C: np.ndarray) -> np.ndarray:
        """Δ(c | A) for every row c of T_C. Returns (K,)."""
        t_A = np.asarray(t_A, dtype=np.float64)
        T_C = np.asarray(T_C, dtype=np.float64)
        phi_old = self.phi(t_A)
        # φ broadcasted over the (K, N) matrix of new fields min(T_C, t_A)
        phi_new = self.phi(np.minimum(T_C, t_A))
        return ((phi_old - phi_new) * self.weights).sum(axis=1)


def mean_response_time(weights: np.ndarray, t_cap_min: float = 120.0) -> SeparableObjective:
    """E[T] under Q. Caps unreachable cells at t_cap_min so the functional stays finite."""
    cap = float(t_cap_min)

    def phi(t):
        return np.minimum(t, cap)

    return SeparableObjective(weights=weights, phi=phi, name=f"mean_time(cap={cap:g})")


def weighted_coverage(weights: np.ndarray, threshold_min: float) -> SeparableObjective:
    """Negative weighted coverage: minimizing this maximizes Σ w_j 1{t_j ≤ T}."""
    T = float(threshold_min)

    def phi(t):
        return -(t <= T).astype(np.float64)

    return SeparableObjective(weights=weights, phi=phi, name=f"-coverage(T={T:g})")


def survival_exponential(median_min: float) -> Callable[[np.ndarray], np.ndarray]:
    """Exponential survival curve with constant hazard and S(median)=0.5."""
    median = max(float(median_min), 1e-9)
    lam = np.log(2.0) / median
    return lambda t: np.exp(-lam * np.asarray(t, dtype=np.float64))


def survival_increasing_intensity(
    median_min: float,
    max_time_min: float,
) -> Callable[[np.ndarray], np.ndarray]:
    """Survival curve with increasing hazard and S(max_time)=0."""
    median = max(float(median_min), 1e-9)
    max_time = max(float(max_time_min), median + 1e-9)
    scale = np.log(2.0) * (max_time - median) / (median * median)

    def survival(t):
        values = np.asarray(t, dtype=np.float64)
        out = np.zeros_like(values)
        active = values < max_time
        clipped = np.maximum(values[active], 0.0)
        hazard = scale * clipped * clipped / np.maximum(max_time - clipped, 1e-9)
        out[active] = np.exp(-hazard)
        return out

    return survival


def expected_failure(weights: np.ndarray, survival: Callable[[np.ndarray], np.ndarray]) -> SeparableObjective:
    """E[1 - S(T)]. Survival must satisfy S(0)=1, monotone decreasing, S(inf)=0."""

    def phi(t):
        out = np.ones_like(t, dtype=np.float64)
        finite = np.isfinite(t)
        out[finite] = 1.0 - survival(t[finite])
        return out

    return SeparableObjective(weights=weights, phi=phi, name="expected_failure")