Mike
optimization rework
31946d6
"""Per-cell separable objectives.
All objectives are normalized to MINIMIZATION. A separable objective has the
form F(t) = Σ_j w_j · φ(t_j), with w summing to 1 and φ defined on [0, ∞].
Key consequence: marginal gain of adding a candidate c to a current set A
(field t_A, candidate times τ_c) is
Δ(c | A) = Σ_j w_j (φ(t_A_j) - φ(min(t_A_j, τ_cj))) ≥ 0,
because adding a station can only decrease t and we choose φ non-decreasing.
This makes greedy submodular and allows fully vectorized marginal gains.
"""
from dataclasses import dataclass
from typing import Callable
import numpy as np
@dataclass
class SeparableObjective:
"""Generic Σ w_j φ(t_j) objective with vectorized marginal gain."""
weights: np.ndarray
phi: Callable[[np.ndarray], np.ndarray]
name: str = "objective"
def __post_init__(self):
w = np.asarray(self.weights, dtype=np.float64)
if w.ndim != 1 or np.any(w < 0) or not np.isfinite(w.sum()) or w.sum() <= 0:
raise ValueError("weights must be 1D, nonnegative, finite, with positive sum")
self.weights = w / w.sum()
def value(self, t: np.ndarray) -> float:
return float(self.weights @ self.phi(np.asarray(t, dtype=np.float64)))
def marginal_gain(self, t_A: np.ndarray, T_C: np.ndarray) -> np.ndarray:
"""Δ(c | A) for every row c of T_C. Returns (K,)."""
t_A = np.asarray(t_A, dtype=np.float64)
T_C = np.asarray(T_C, dtype=np.float64)
phi_old = self.phi(t_A)
# φ broadcasted over the (K, N) matrix of new fields min(T_C, t_A)
phi_new = self.phi(np.minimum(T_C, t_A))
return ((phi_old - phi_new) * self.weights).sum(axis=1)
def mean_response_time(weights: np.ndarray, t_cap_min: float = 120.0) -> SeparableObjective:
"""E[T] under Q. Caps unreachable cells at t_cap_min so the functional stays finite."""
cap = float(t_cap_min)
def phi(t):
return np.minimum(t, cap)
return SeparableObjective(weights=weights, phi=phi, name=f"mean_time(cap={cap:g})")
def weighted_coverage(weights: np.ndarray, threshold_min: float) -> SeparableObjective:
"""Negative weighted coverage: minimizing this maximizes Σ w_j 1{t_j ≤ T}."""
T = float(threshold_min)
def phi(t):
return -(t <= T).astype(np.float64)
return SeparableObjective(weights=weights, phi=phi, name=f"-coverage(T={T:g})")
def survival_exponential(median_min: float) -> Callable[[np.ndarray], np.ndarray]:
"""Exponential survival curve with constant hazard and S(median)=0.5."""
median = max(float(median_min), 1e-9)
lam = np.log(2.0) / median
return lambda t: np.exp(-lam * np.asarray(t, dtype=np.float64))
def survival_increasing_intensity(
median_min: float,
max_time_min: float,
) -> Callable[[np.ndarray], np.ndarray]:
"""Survival curve with increasing hazard and S(max_time)=0."""
median = max(float(median_min), 1e-9)
max_time = max(float(max_time_min), median + 1e-9)
scale = np.log(2.0) * (max_time - median) / (median * median)
def survival(t):
values = np.asarray(t, dtype=np.float64)
out = np.zeros_like(values)
active = values < max_time
clipped = np.maximum(values[active], 0.0)
hazard = scale * clipped * clipped / np.maximum(max_time - clipped, 1e-9)
out[active] = np.exp(-hazard)
return out
return survival
def expected_failure(weights: np.ndarray, survival: Callable[[np.ndarray], np.ndarray]) -> SeparableObjective:
"""E[1 - S(T)]. Survival must satisfy S(0)=1, monotone decreasing, S(inf)=0."""
def phi(t):
out = np.ones_like(t, dtype=np.float64)
finite = np.isfinite(t)
out[finite] = 1.0 - survival(t[finite])
return out
return SeparableObjective(weights=weights, phi=phi, name="expected_failure")