Mike
feat: added greedy + k-swap
b6cd270
"""Greedy + local swap algorithms for the placement problem."""
from itertools import combinations
import time
from dataclasses import dataclass, field
import numpy as np
from .problem import Problem
@dataclass
class Solution:
    """Result record returned by the placement algorithms in this module.

    Attributes:
        selected: (m,) indices into ``problem.candidates``.
        objective_value: objective value of the returned selection.
        objective_history: F(base), F(after step 1), ..., F(after last step).
        final_field: (N,) field t(A ∪ existing) for the returned selection.
        iterations: number of accepted steps reported by the producing
            algorithm (greedy picks or swaps).
        converged: termination flag set by the producing algorithm.
        meta: free-form run metadata; every instance gets its own empty dict
            unless one is supplied.
    """

    selected: np.ndarray
    objective_value: float
    objective_history: list[float]
    final_field: np.ndarray
    iterations: int
    converged: bool
    meta: dict = field(default_factory=dict)
def _field_with(t_base: np.ndarray, T_C: np.ndarray, sel: np.ndarray) -> np.ndarray:
    """Return the element-wise minimum of ``t_base`` and the rows ``T_C[sel]``.

    When ``sel`` is empty there is nothing to combine, so an independent copy
    of ``t_base`` is returned instead.
    """
    if len(sel) > 0:
        best_of_selected = T_C[sel].min(axis=0)
        return np.minimum(t_base, best_of_selected)
    return t_base.copy()
def greedy(problem: Problem) -> Solution:
    """Build a selection greedily, adding one candidate per step.

    Each step asks the objective for the marginal gain of every candidate
    against the current field, masks out candidates already chosen, and takes
    the candidate with the largest gain. The loop stops early as soon as the
    best remaining gain is not strictly positive.

    Args:
        problem: supplies ``objective`` (``value`` and ``marginal_gain``),
            ``candidates`` (``travel_times`` and ``K``), ``base_field`` and
            the selection budget ``m``.

    Returns:
        A ``Solution`` whose ``objective_history`` starts with the objective
        of the base field and gains one entry per accepted candidate.
        ``converged`` is True only when exactly ``m`` candidates were chosen.
    """
    t0 = time.perf_counter()
    obj = problem.objective
    T_C = problem.candidates.travel_times
    K = problem.candidates.K
    m = problem.m

    t_curr = problem.base_field.copy()
    f_curr = obj.value(t_curr)
    history = [f_curr]
    selected: list[int] = []
    available = np.ones(K, dtype=bool)

    # No more than K distinct candidates can ever be picked, so cap the number
    # of steps at min(m, K). This also keeps np.argmax away from an empty gain
    # vector when there are no candidates at all (it raises ValueError there).
    for _ in range(min(m, K)):
        gains = obj.marginal_gain(t_curr, T_C)
        gains[~available] = -np.inf
        c_star = int(np.argmax(gains))
        if not available[c_star] or gains[c_star] <= 0:
            # No improving candidate (or all non-positive — happens for
            # indicator phi once nothing further can be saved). Stop early.
            break
        selected.append(c_star)
        available[c_star] = False
        t_curr = np.minimum(t_curr, T_C[c_star])
        f_curr = obj.value(t_curr)
        history.append(f_curr)

    return Solution(
        selected=np.asarray(selected, dtype=np.int64),
        objective_value=f_curr,
        objective_history=history,
        final_field=t_curr,
        iterations=len(selected),
        converged=(len(selected) == m),
        meta={"algorithm": "greedy", "elapsed_sec": time.perf_counter() - t0},
    )
def local_swap(problem: Problem, init: Solution, max_iters: int = 50) -> Solution:
    """Polish ``init`` with single swaps, restarting after each accepted swap.

    One scan walks the positions of the current selection in order. For each
    position it removes that member, evaluates every candidate outside the
    selection as its replacement, and accepts the best replacement if it
    lowers the objective by more than 1e-12. The first accepted swap ends the
    scan. A scan with no accepted swap marks the result as converged.

    Args:
        problem: supplies ``objective`` (``phi``, ``weights``), ``candidates``
            (``travel_times``, ``K``) and ``base_field``.
        init: starting solution; returned unchanged when its selection is empty.
        max_iters: maximum number of scans, each accepting at most one swap.

    Returns:
        A new ``Solution`` whose history extends ``init.objective_history``
        with one value per accepted swap.
    """
    t0 = time.perf_counter()
    obj = problem.objective
    T_C = problem.candidates.travel_times
    K = problem.candidates.K

    current = [int(idx) for idx in init.selected]
    if len(current) == 0:
        return init

    t_curr = init.final_field.copy()
    f_curr = init.objective_value
    history = list(init.objective_history)
    swaps = 0
    converged = False

    for _ in range(max_iters):
        member_mask = np.zeros(K, dtype=bool)
        member_mask[current] = True
        for pos in range(len(current)):
            # Field without the member at `pos`: min over (existing ∪ A\{a}).
            remaining = current[:pos] + current[pos + 1:]
            field_without = _field_with(
                problem.base_field, T_C, np.asarray(remaining, dtype=np.int64)
            )
            # Score every candidate as the replacement in one vectorized pass.
            trial_fields = np.minimum(field_without, T_C)  # (K, N)
            trial_costs = (obj.phi(trial_fields) * obj.weights).sum(axis=1)  # (K,)
            trial_costs[member_mask] = np.inf  # current members cannot be swapped in
            incoming = int(np.argmin(trial_costs))
            if not (trial_costs[incoming] + 1e-12 < f_curr):
                continue
            # Accept the swap and restart the scan from the first position.
            current[pos] = incoming
            t_curr = np.minimum(field_without, T_C[incoming])
            f_curr = float(trial_costs[incoming])
            history.append(f_curr)
            swaps += 1
            break
        else:
            # The scan finished without any accepted swap.
            converged = True
            break

    run_meta = {
        "algorithm": "local_swap",
        "swaps": swaps,
        "max_iters": max_iters,
        "elapsed_sec": time.perf_counter() - t0,
    }
    return Solution(
        selected=np.asarray(current, dtype=np.int64),
        objective_value=f_curr,
        objective_history=history,
        final_field=t_curr,
        iterations=swaps,
        converged=converged,
        meta=run_meta,
    )
def _greedy_refill(
problem: Problem,
t_start: np.ndarray,
blocked: np.ndarray,
n_add: int,
) -> tuple[list[int], np.ndarray, float] | None:
obj = problem.objective
T_C = problem.candidates.travel_times
added: list[int] = []
t_curr = t_start.copy()
unavailable = blocked.copy()
f_curr = obj.value(t_curr)
for _ in range(n_add):
T_try = np.minimum(t_curr, T_C)
f_try = (obj.phi(T_try) * obj.weights).sum(axis=1)
f_try[unavailable] = np.inf
c_best = int(np.argmin(f_try))
if not np.isfinite(f_try[c_best]):
return None
added.append(c_best)
unavailable[c_best] = True
t_curr = T_try[c_best]
f_curr = float(f_try[c_best])
return added, t_curr, f_curr
def local_k_swap(problem: Problem, init: Solution, swap_size: int, max_iters: int = 20) -> Solution:
    """Polish ``init`` by repeatedly applying the best improving k-swap.

    Every round enumerates all ways of dropping ``swap_size`` members of the
    current selection and refills each gap with ``_greedy_refill``, which may
    not add any member of the current selection (dropped ones included). The
    round's best move is applied if it lowers the objective by more than
    1e-12; a round with no such move marks the result as converged.

    Args:
        problem: supplies ``candidates`` (``travel_times``, ``K``) and
            ``base_field``; also passed on to ``_greedy_refill``.
        init: starting solution; returned unchanged when it holds fewer than
            ``swap_size`` members.
        swap_size: number of members exchanged per move.
        max_iters: maximum number of rounds.

    Raises:
        ValueError: if ``swap_size`` is smaller than 1.
    """
    t0 = time.perf_counter()
    if swap_size < 1:
        raise ValueError("swap_size must be positive")

    T_C = problem.candidates.travel_times
    K = problem.candidates.K
    chosen = [int(idx) for idx in init.selected]
    if len(chosen) < swap_size:
        return init

    f_curr = init.objective_value
    t_curr = init.final_field.copy()
    history = list(init.objective_history)
    swaps = 0
    converged = False

    for _ in range(max_iters):
        occupied = np.zeros(K, dtype=bool)
        occupied[chosen] = True
        winner = None  # (selection, field, value) of the best improving move
        bar = f_curr   # value a move has to beat
        for dropped in combinations(range(len(chosen)), swap_size):
            survivors = [c for pos, c in enumerate(chosen) if pos not in dropped]
            field_without = _field_with(
                problem.base_field, T_C, np.asarray(survivors, dtype=np.int64)
            )
            outcome = _greedy_refill(problem, field_without, occupied, swap_size)
            if outcome is not None:
                newcomers, field_after, value_after = outcome
                if value_after + 1e-12 < bar:
                    winner = (survivors + newcomers, field_after, value_after)
                    bar = value_after
        if winner is None:
            converged = True
            break
        chosen, t_curr, f_curr = winner
        history.append(f_curr)
        swaps += 1

    run_meta = {
        "algorithm": f"local_{swap_size}_swap",
        "swap_size": swap_size,
        "swaps": swaps,
        "max_iters": max_iters,
        "elapsed_sec": time.perf_counter() - t0,
    }
    return Solution(
        selected=np.asarray(chosen, dtype=np.int64),
        objective_value=f_curr,
        objective_history=history,
        final_field=t_curr,
        iterations=swaps,
        converged=converged,
        meta=run_meta,
    )
def greedy_then_swap(problem: Problem, max_iters: int = 50) -> Solution:
    """Run ``greedy`` and refine its result with ``local_swap``.

    The returned solution's ``meta`` nests the metadata of both stages under
    ``"greedy"`` and ``"swap"`` and reports the elapsed time of the whole
    pipeline. ``max_iters`` is forwarded to ``local_swap``.
    """
    started = time.perf_counter()
    seed = greedy(problem)
    polished = local_swap(problem, seed, max_iters=max_iters)

    greedy_meta = seed.meta
    swap_meta = polished.meta
    polished.meta = {
        "algorithm": "greedy_then_swap",
        "greedy": greedy_meta,
        "swap": swap_meta,
        "elapsed_sec": time.perf_counter() - started,
    }

    # Seed history followed by whatever the swap stage appended after it.
    seed_history = seed.objective_history
    swap_tail = polished.objective_history[len(seed_history):]
    polished.objective_history = seed_history + swap_tail
    return polished
def greedy_then_k_swap(problem: Problem, swap_size: int, max_iters: int = 20) -> Solution:
    """Run ``greedy`` and refine its result with a local k-swap search.

    ``swap_size == 1`` is delegated to ``greedy_then_swap``; any larger value
    uses ``local_k_swap``. The returned solution's ``meta`` nests the metadata
    of both stages under ``"greedy"`` and ``"swap"`` and reports the elapsed
    time of the whole pipeline.

    Args:
        problem: problem instance passed to both stages.
        swap_size: number of members exchanged per move; must be at least 1.
        max_iters: iteration cap forwarded to the swap stage.

    Raises:
        ValueError: if ``swap_size`` is smaller than 1.
    """
    t0 = time.perf_counter()
    if swap_size < 1:
        # Fail fast with the same error the k-swap stage raises, instead of
        # computing the greedy seed first and only then rejecting the argument.
        raise ValueError("swap_size must be positive")
    if swap_size == 1:
        return greedy_then_swap(problem, max_iters=max_iters)

    seed = greedy(problem)
    polished = local_k_swap(problem, seed, swap_size=swap_size, max_iters=max_iters)
    polished.meta = {
        "algorithm": f"greedy_then_{swap_size}_swap",
        "greedy": seed.meta,
        "swap": polished.meta,
        "elapsed_sec": time.perf_counter() - t0,
    }
    # Seed history followed by whatever the swap stage appended after it.
    polished.objective_history = (
        seed.objective_history
        + polished.objective_history[len(seed.objective_history):]
    )
    return polished