| """Greedy + local swap algorithms for the placement problem.""" |
|
|
| from itertools import combinations |
| import time |
| from dataclasses import dataclass, field |
|
|
| import numpy as np |
|
|
| from .problem import Problem |
|
|
|
|
@dataclass
class Solution:
    """Outcome of one placement run (greedy seed or local-search polish).

    Every solver in this module returns one of these, and the local-search
    routines also accept one as their starting point.
    """

    # Indices of the chosen candidates (the solvers here build it as int64).
    selected: np.ndarray
    # Objective value attained by ``selected``.
    objective_value: float
    # Objective values recorded along the run, beginning with the start state.
    objective_history: list[float]
    # Field obtained once every selected candidate has been applied.
    final_field: np.ndarray
    # Number of accepted moves (greedy picks or swaps) the solver performed.
    iterations: int
    # Solver-specific termination flag; see each solver for its meaning.
    converged: bool
    # Free-form diagnostics such as the algorithm name and elapsed seconds.
    meta: dict = field(default_factory=dict)
|
|
|
|
| def _field_with(t_base: np.ndarray, T_C: np.ndarray, sel: np.ndarray) -> np.ndarray: |
| if len(sel) == 0: |
| return t_base.copy() |
| return np.minimum(t_base, T_C[sel].min(axis=0)) |
|
|
|
|
def greedy(problem: Problem) -> Solution:
    """Build a placement greedily, one candidate per step.

    At every step the candidate with the largest value reported by
    ``problem.objective.marginal_gain`` is added, and the current field is
    lowered to the element-wise minimum with that candidate's row of
    ``problem.candidates.travel_times``. The loop ends after ``problem.m``
    picks, when the candidates are exhausted, or as soon as the best
    remaining gain is not strictly positive.

    Returns:
        A ``Solution`` whose ``objective_history`` starts with the value of
        the base field and gains one entry per pick. ``converged`` is True
        only when exactly ``problem.m`` candidates were selected.
    """
    t0 = time.perf_counter()
    obj = problem.objective
    T_C = problem.candidates.travel_times
    K = problem.candidates.K
    m = problem.m

    t_curr = problem.base_field.copy()
    f_curr = obj.value(t_curr)
    history = [f_curr]

    selected: list[int] = []
    available = np.ones(K, dtype=bool)

    # At most K distinct candidates can ever be picked. Bounding the loop by
    # K also keeps np.argmax away from an empty array when K == 0.
    for _ in range(min(m, K)):
        # Mask into a fresh array instead of writing -inf into whatever
        # marginal_gain returned, which the objective may cache or share.
        gains = np.where(available, obj.marginal_gain(t_curr, T_C), -np.inf)
        c_star = int(np.argmax(gains))
        if not available[c_star] or gains[c_star] <= 0:
            # Nothing left that improves the objective: stop early.
            break
        selected.append(c_star)
        available[c_star] = False
        t_curr = np.minimum(t_curr, T_C[c_star])
        f_curr = obj.value(t_curr)
        history.append(f_curr)

    return Solution(
        selected=np.asarray(selected, dtype=np.int64),
        objective_value=f_curr,
        objective_history=history,
        final_field=t_curr,
        iterations=len(selected),
        converged=(len(selected) == m),
        meta={"algorithm": "greedy", "elapsed_sec": time.perf_counter() - t0},
    )
|
|
|
|
def local_swap(problem: Problem, init: Solution, max_iters: int = 50) -> Solution:
    """Polish ``init`` by exchanging one selected candidate at a time.

    Each pass walks the selected positions in order. For a position, the
    field is rebuilt without that member and every non-selected candidate is
    scored as its replacement. The best replacement is accepted as soon as it
    lowers the objective by more than 1e-12, and a new pass starts. A pass
    with no accepted swap marks the result as converged.

    Args:
        problem: Supplies the objective, candidate travel times and base field.
        init: Starting solution; returned unchanged when it selects nothing.
        max_iters: Upper bound on the number of passes (and thus swaps).

    Returns:
        A new ``Solution`` whose history extends ``init.objective_history``
        by one entry per accepted swap.
    """
    started = time.perf_counter()
    objective = problem.objective
    travel = problem.candidates.travel_times
    n_candidates = problem.candidates.K

    current = [int(i) for i in init.selected]
    if not current:
        return init

    field_now = init.final_field.copy()
    value_now = init.objective_value
    trace = list(init.objective_history)

    n_swaps = 0
    reached_optimum = False
    for _ in range(max_iters):
        # Membership mask for the current selection, rebuilt every pass.
        occupied = np.zeros(n_candidates, dtype=bool)
        occupied[current] = True

        for pos in range(len(current)):
            # Field produced by everyone except the member at ``pos``.
            others = current[:pos] + current[pos + 1:]
            field_without = _field_with(
                problem.base_field, travel, np.asarray(others, dtype=np.int64)
            )
            # Score every candidate as the replacement in one vectorised shot.
            trial_fields = np.minimum(field_without, travel)
            trial_values = (objective.phi(trial_fields) * objective.weights).sum(axis=1)
            trial_values[occupied] = np.inf
            challenger = int(np.argmin(trial_values))
            if not (trial_values[challenger] + 1e-12 < value_now):
                continue

            # Accept the first improving exchange and restart the pass.
            current[pos] = challenger
            field_now = np.minimum(field_without, travel[challenger])
            value_now = float(trial_values[challenger])
            trace.append(value_now)
            n_swaps += 1
            break
        else:
            # Full pass without a single improving exchange.
            reached_optimum = True
            break

    return Solution(
        selected=np.asarray(current, dtype=np.int64),
        objective_value=value_now,
        objective_history=trace,
        final_field=field_now,
        iterations=n_swaps,
        converged=reached_optimum,
        meta={
            "algorithm": "local_swap",
            "swaps": n_swaps,
            "max_iters": max_iters,
            "elapsed_sec": time.perf_counter() - started,
        },
    )
|
|
|
|
| def _greedy_refill( |
| problem: Problem, |
| t_start: np.ndarray, |
| blocked: np.ndarray, |
| n_add: int, |
| ) -> tuple[list[int], np.ndarray, float] | None: |
| obj = problem.objective |
| T_C = problem.candidates.travel_times |
|
|
| added: list[int] = [] |
| t_curr = t_start.copy() |
| unavailable = blocked.copy() |
| f_curr = obj.value(t_curr) |
|
|
| for _ in range(n_add): |
| T_try = np.minimum(t_curr, T_C) |
| f_try = (obj.phi(T_try) * obj.weights).sum(axis=1) |
| f_try[unavailable] = np.inf |
| c_best = int(np.argmin(f_try)) |
| if not np.isfinite(f_try[c_best]): |
| return None |
| added.append(c_best) |
| unavailable[c_best] = True |
| t_curr = T_try[c_best] |
| f_curr = float(f_try[c_best]) |
|
|
| return added, t_curr, f_curr |
|
|
|
|
def local_k_swap(problem: Problem, init: Solution, swap_size: int, max_iters: int = 20) -> Solution:
    """Polish ``init`` by exchanging ``swap_size`` members per move.

    Every round enumerates each way of dropping ``swap_size`` selected
    candidates, refills the gap greedily from candidates outside the current
    selection, and applies the single best exchange if it lowers the
    objective by more than 1e-12. A round with no improving exchange marks
    the result as converged.

    Args:
        problem: Supplies the candidates, base field and objective.
        init: Starting solution; returned unchanged when it holds fewer than
            ``swap_size`` candidates.
        swap_size: Number of candidates exchanged per move.
        max_iters: Upper bound on the number of rounds.

    Raises:
        ValueError: If ``swap_size`` is smaller than 1.
    """
    started = time.perf_counter()
    if swap_size < 1:
        raise ValueError("swap_size must be positive")

    travel = problem.candidates.travel_times
    n_candidates = problem.candidates.K
    current = [int(i) for i in init.selected]
    if len(current) < swap_size:
        return init

    value_now = init.objective_value
    field_now = init.final_field.copy()
    trace = list(init.objective_history)

    n_swaps = 0
    reached_optimum = False
    for _ in range(max_iters):
        # Best exchange seen this round as (members, field); None means none.
        incumbent = None
        bar = value_now

        # Current members may not be re-added by the refill.
        occupied = np.zeros(n_candidates, dtype=bool)
        occupied[current] = True
        for dropped in combinations(range(len(current)), swap_size):
            survivors = [c for pos, c in enumerate(current) if pos not in dropped]
            field_without = _field_with(
                problem.base_field, travel, np.asarray(survivors, dtype=np.int64)
            )

            outcome = _greedy_refill(problem, field_without, occupied, swap_size)
            if outcome is None:
                continue
            newcomers, trial_field, trial_value = outcome
            if trial_value + 1e-12 < bar:
                incumbent = (survivors + newcomers, trial_field)
                bar = trial_value

        if incumbent is None:
            reached_optimum = True
            break

        current, field_now = incumbent
        value_now = bar
        trace.append(value_now)
        n_swaps += 1

    return Solution(
        selected=np.asarray(current, dtype=np.int64),
        objective_value=value_now,
        objective_history=trace,
        final_field=field_now,
        iterations=n_swaps,
        converged=reached_optimum,
        meta={
            "algorithm": f"local_{swap_size}_swap",
            "swap_size": swap_size,
            "swaps": n_swaps,
            "max_iters": max_iters,
            "elapsed_sec": time.perf_counter() - started,
        },
    )
|
|
|
|
def greedy_then_swap(problem: Problem, max_iters: int = 50) -> Solution:
    """Run ``greedy`` and refine its result with ``local_swap``.

    The returned solution's ``meta`` nests the metadata of both stages under
    ``"greedy"`` and ``"swap"`` and reports the combined elapsed time.
    """
    started = time.perf_counter()
    seed = greedy(problem)
    result = local_swap(problem, seed, max_iters=max_iters)

    # Capture both stage records before ``result.meta`` is overwritten.
    greedy_meta, swap_meta = seed.meta, result.meta
    result.meta = {
        "algorithm": "greedy_then_swap",
        "greedy": greedy_meta,
        "swap": swap_meta,
        "elapsed_sec": time.perf_counter() - started,
    }

    # Seed history followed by the entries the swap stage appended after it.
    n_seed = len(seed.objective_history)
    result.objective_history = seed.objective_history + result.objective_history[n_seed:]
    return result
|
|
|
|
def greedy_then_k_swap(problem: Problem, swap_size: int, max_iters: int = 20) -> Solution:
    """Run ``greedy`` and refine its result with a ``swap_size``-exchange search.

    ``swap_size == 1`` delegates to ``greedy_then_swap``; larger sizes use
    ``local_k_swap``. The returned solution's ``meta`` nests the metadata of
    both stages under ``"greedy"`` and ``"swap"`` and reports the combined
    elapsed time.

    Args:
        problem: Placement problem to solve.
        swap_size: Number of candidates exchanged per local-search move.
        max_iters: Iteration cap forwarded to the local search.

    Raises:
        ValueError: If ``swap_size`` is smaller than 1.
    """
    t0 = time.perf_counter()
    # Reject a bad swap_size before paying for the greedy pass. The same
    # error would otherwise surface only afterwards, inside local_k_swap.
    if swap_size < 1:
        raise ValueError("swap_size must be positive")
    if swap_size == 1:
        return greedy_then_swap(problem, max_iters=max_iters)

    seed = greedy(problem)
    polished = local_k_swap(problem, seed, swap_size=swap_size, max_iters=max_iters)
    polished.meta = {
        "algorithm": f"greedy_then_{swap_size}_swap",
        "greedy": seed.meta,
        "swap": polished.meta,
        "elapsed_sec": time.perf_counter() - t0,
    }
    # Seed history followed by the entries the swap stage appended after it.
    n_seed = len(seed.objective_history)
    polished.objective_history = seed.objective_history + polished.objective_history[n_seed:]
    return polished
|
|