# train-schedule-optimization / greedyOptim / advanced_optimizers.py
# (commit 0f86bb9 — "proper scheduling-1, better iterations set", Arpit-Bansal)
"""
Advanced optimization algorithms for trainset scheduling.
Includes CMA-ES, Particle Swarm Optimization, and Simulated Annealing.
"""
import numpy as np
from typing import Optional, Dict, List
import math
from .models import OptimizationResult, OptimizationConfig
from .evaluator import TrainsetSchedulingEvaluator
class CMAESOptimizer:
    """CMA-ES (Covariance Matrix Adaptation Evolution Strategy) optimizer.

    Each trainset is encoded as one continuous value that is decoded to a
    discrete action: 0 = service, 1 = standby, 2 = maintenance.
    """

    def __init__(self, evaluator: "TrainsetSchedulingEvaluator",
                 config: "Optional[OptimizationConfig]" = None):
        self.evaluator = evaluator
        self.config = config or OptimizationConfig()
        self.n = evaluator.num_trainsets
        self.n_blocks = evaluator.num_blocks
        self.optimize_blocks = self.config.optimize_block_assignment
        self.lam = self.config.population_size        # Population size (lambda)
        self.mu = self.config.population_size // 2    # Number of parents
        # Initialize CMA-ES parameters
        self._initialize_cmaes()

    def _initialize_cmaes(self):
        """Initialize CMA-ES strategy parameters (standard default settings)."""
        self.mean = np.random.rand(self.n) * 3  # Initial mean in [0, 3)
        self.sigma = 1.0                        # Global step size
        self.C = np.eye(self.n)                 # Covariance matrix
        # Log-linear recombination weights, normalized to sum to 1
        self.weights = np.log(self.mu + 0.5) - np.log(np.arange(1, self.mu + 1))
        self.weights /= self.weights.sum()
        # Variance-effective selection mass
        self.mu_eff = 1.0 / (self.weights ** 2).sum()
        # Learning rates and damping (standard default formulas)
        self.cc = 4.0 / (self.n + 4.0)
        self.cs = (self.mu_eff + 2.0) / (self.n + self.mu_eff + 5.0)
        self.c1 = 2.0 / ((self.n + 1.3) ** 2 + self.mu_eff)
        self.cmu = min(1 - self.c1, 2 * (self.mu_eff - 2 + 1 / self.mu_eff) / ((self.n + 2) ** 2 + self.mu_eff))
        self.damps = 1.0 + 2.0 * max(0, np.sqrt((self.mu_eff - 1) / (self.n + 1)) - 1) + self.cs
        self.pc = np.zeros(self.n)  # Evolution path for C
        self.ps = np.zeros(self.n)  # Evolution path for sigma

    def _decode(self, x: np.ndarray) -> np.ndarray:
        """Decode continuous values to discrete actions in {0, 1, 2}."""
        return np.clip(np.round(x), 0, 2).astype(int)

    def _create_block_assignment(self, trainset_sol: np.ndarray) -> np.ndarray:
        """Create block assignments for a trainset solution.

        Blocks are distributed round-robin over the trainsets selected for
        service, then shuffled so different orderings are explored.  Returns
        an all -1 array when no trainset is in service.
        """
        service_indices = np.where(trainset_sol == 0)[0]
        if len(service_indices) == 0:
            return np.full(self.n_blocks, -1, dtype=int)
        block_sol = np.zeros(self.n_blocks, dtype=int)
        for i in range(self.n_blocks):
            block_sol[i] = service_indices[i % len(service_indices)]
        # Shuffling only permutes entries, so every value is still a valid
        # service index -- no repair pass is needed afterwards.
        np.random.shuffle(block_sol)
        return block_sol

    def optimize(self, generations: "Optional[int]" = None) -> "OptimizationResult":
        """Run CMA-ES optimization.

        Args:
            generations: Number of generations; defaults to a value scaled
                from ``config.iterations``.

        Returns:
            OptimizationResult for the best schedule found.

        Raises:
            RuntimeError: If no valid solution was ever evaluated.
        """
        if generations is None:
            generations = self.config.iterations * 15  # Scale iterations for CMA-ES
        best_fitness = float('inf')
        best_solution: Optional[np.ndarray] = None
        best_block_solution: Optional[np.ndarray] = None
        print(f"Starting CMA-ES optimization for {generations} generations")
        if self.optimize_blocks:
            print(f"Optimizing block assignments for {self.n_blocks} service blocks")
        for gen in range(generations):
            try:
                # Bug fix: sample with the *square root* of C, not C itself.
                # y = mean + sigma * B @ (sqrt(eigvals) * z) draws from the
                # intended N(mean, sigma^2 C); multiplying by C directly would
                # sample from N(mean, sigma^2 C^2).  Symmetrize C and floor the
                # eigenvalues to keep the decomposition numerically valid.
                self.C = (self.C + self.C.T) / 2.0
                eig_vals, eig_vecs = np.linalg.eigh(self.C)
                sqrt_eig = np.sqrt(np.maximum(eig_vals, 1e-20))
                population = []
                for _ in range(self.lam):
                    z = np.random.randn(self.n)
                    population.append(self.mean + self.sigma * (eig_vecs @ (sqrt_eig * z)))
                population = np.array(population)
                # Evaluate every candidate (optionally with block assignments)
                fitness = []
                decoded_pop = []
                block_pop = []
                for ind in population:
                    decoded = self._decode(ind)
                    decoded_pop.append(decoded)
                    if self.optimize_blocks:
                        block_sol = self._create_block_assignment(decoded)
                        block_pop.append(block_sol)
                        fit = self.evaluator.schedule_fitness_function(decoded, block_sol)
                    else:
                        fit = self.evaluator.fitness_function(decoded)
                    fitness.append(fit)
                fitness = np.array(fitness)
                decoded_pop = np.array(decoded_pop)
                # Track the best candidate seen so far
                gen_best_idx = np.argmin(fitness)
                if fitness[gen_best_idx] < best_fitness:
                    best_fitness = fitness[gen_best_idx]
                    best_solution = decoded_pop[gen_best_idx].copy()
                    if self.optimize_blocks and block_pop:
                        best_block_solution = block_pop[gen_best_idx].copy()
                if gen % 30 == 0:
                    print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
                # Selection and recombination: weighted mean of the mu best
                sorted_indices = np.argsort(fitness)[:self.mu]
                selected = population[sorted_indices]
                old_mean = self.mean.copy()
                self.mean = selected.T @ self.weights
                # Update evolution paths
                self.ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mu_eff) * (self.mean - old_mean) / self.sigma
                # (gen + 1) avoids division by zero in the first iteration
                hsig = (np.linalg.norm(self.ps) / np.sqrt(1 - (1 - self.cs) ** (2 * (gen + 1)))
                        < (1.4 + 2 / (self.n + 1)) * np.sqrt(self.n))
                self.pc = (1 - self.cc) * self.pc + hsig * np.sqrt(self.cc * (2 - self.cc) * self.mu_eff) * (self.mean - old_mean) / self.sigma
                # Rank-one + rank-mu covariance matrix update
                artmp = (selected - old_mean) / self.sigma
                self.C = ((1 - self.c1 - self.cmu) * self.C
                          + self.c1 * np.outer(self.pc, self.pc)
                          + self.cmu * (artmp.T @ np.diag(self.weights) @ artmp))
                # Step-size control (cumulative step-length adaptation)
                self.sigma *= np.exp((self.cs / self.damps) * (np.linalg.norm(self.ps) / np.sqrt(self.n) - 1))
            except Exception as e:
                print(f"Error in CMA-ES generation {gen}: {e}")
                break
        if best_solution is None:
            raise RuntimeError("No valid solution found during CMA-ES optimization")
        return self._build_result(best_solution, best_fitness, best_block_solution)

    def _build_result(self, solution: np.ndarray, fitness: float,
                      block_solution: "Optional[np.ndarray]" = None) -> "OptimizationResult":
        """Build an OptimizationResult from a decoded solution vector."""
        objectives = self.evaluator.calculate_objectives(solution)
        service = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 0]
        standby = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 1]
        maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 2]
        explanations = {}
        for ts_id in service:
            valid, reason = self.evaluator.check_hard_constraints(ts_id)
            explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
        # Map each in-service trainset to the block ids it was assigned
        block_assignments = {}
        if block_solution is not None and self.optimize_blocks:
            for ts_id in service:
                block_assignments[ts_id] = []
            for block_idx, train_idx in enumerate(block_solution):
                if 0 <= train_idx < len(self.evaluator.trainsets):
                    assigned_ts = self.evaluator.trainsets[train_idx]
                    if assigned_ts in block_assignments:
                        block_id = self.evaluator.all_blocks[block_idx]['block_id']
                        block_assignments[assigned_ts].append(block_id)
        return OptimizationResult(
            selected_trainsets=service,
            standby_trainsets=standby,
            maintenance_trainsets=maintenance,
            objectives=objectives,
            fitness_score=fitness,
            explanation=explanations,
            service_block_assignments=block_assignments
        )
class ParticleSwarmOptimizer:
    """Particle Swarm Optimization for trainset scheduling.

    Each particle is a continuous vector over trainsets, decoded to the
    discrete actions 0 = service, 1 = standby, 2 = maintenance.
    """

    def __init__(self, evaluator: "TrainsetSchedulingEvaluator",
                 config: "Optional[OptimizationConfig]" = None):
        self.evaluator = evaluator
        self.config = config or OptimizationConfig()
        self.n_particles = self.config.population_size
        self.n_dimensions = evaluator.num_trainsets
        self.n_blocks = evaluator.num_blocks
        self.optimize_blocks = self.config.optimize_block_assignment
        # PSO parameters
        self.w = 0.7      # Inertia weight
        self.c1 = 1.5     # Cognitive (personal-best) coefficient
        self.c2 = 1.5     # Social (global-best) coefficient
        self.v_max = 2.0  # Maximum velocity magnitude (for clamping)

    def _decode(self, x: np.ndarray) -> np.ndarray:
        """Decode continuous values to discrete actions in {0, 1, 2}."""
        return np.clip(np.round(x), 0, 2).astype(int)

    def _create_block_assignment(self, trainset_sol: np.ndarray) -> np.ndarray:
        """Round-robin service blocks over in-service trainsets (-1s if none)."""
        service_indices = np.where(trainset_sol == 0)[0]
        if len(service_indices) == 0:
            return np.full(self.n_blocks, -1, dtype=int)
        block_sol = np.zeros(self.n_blocks, dtype=int)
        for i in range(self.n_blocks):
            block_sol[i] = service_indices[i % len(service_indices)]
        return block_sol

    def optimize(self, generations: "Optional[int]" = None) -> "OptimizationResult":
        """Run PSO optimization and return the best schedule found.

        Args:
            generations: Number of generations; defaults to a value scaled
                from ``config.iterations``.
        """
        if generations is None:
            generations = self.config.iterations * 20  # Scale iterations for PSO
        # Initialize particle positions and velocities
        positions = np.random.uniform(0, 3, (self.n_particles, self.n_dimensions))
        velocities = np.random.uniform(-1, 1, (self.n_particles, self.n_dimensions))
        # Personal best positions and fitness
        p_best_positions = positions.copy()
        p_best_fitness = np.full(self.n_particles, float('inf'))
        p_best_blocks = [None] * self.n_particles
        # Global best
        g_best_position = np.zeros(self.n_dimensions)
        g_best_fitness = float('inf')
        g_best_block = None
        print(f"Starting PSO optimization with {self.n_particles} particles for {generations} generations")
        if self.optimize_blocks:
            print(f"Optimizing block assignments for {self.n_blocks} service blocks")
        for gen in range(generations):
            try:
                for i in range(self.n_particles):
                    # Evaluate particle
                    decoded = self._decode(positions[i])
                    if self.optimize_blocks:
                        block_sol = self._create_block_assignment(decoded)
                        fitness = self.evaluator.schedule_fitness_function(decoded, block_sol)
                    else:
                        block_sol = None
                        fitness = self.evaluator.fitness_function(decoded)
                    # Update personal best
                    if fitness < p_best_fitness[i]:
                        p_best_fitness[i] = fitness
                        p_best_positions[i] = positions[i].copy()
                        p_best_blocks[i] = block_sol.copy() if block_sol is not None else None
                    # Update global best
                    if fitness < g_best_fitness:
                        g_best_fitness = fitness
                        g_best_position = positions[i].copy()
                        g_best_block = block_sol.copy() if block_sol is not None else None
                # Update velocities and positions
                for i in range(self.n_particles):
                    # Bug fix: draw independent random coefficients per
                    # dimension (standard PSO).  A single scalar pair per
                    # particle confines the stochastic part of the update to
                    # the plane spanned by the two attraction vectors,
                    # reducing search diversity.
                    r1 = np.random.random(self.n_dimensions)
                    r2 = np.random.random(self.n_dimensions)
                    velocities[i] = (self.w * velocities[i] +
                                     self.c1 * r1 * (p_best_positions[i] - positions[i]) +
                                     self.c2 * r2 * (g_best_position - positions[i]))
                    # Clamp velocity to prevent explosion
                    velocities[i] = np.clip(velocities[i], -self.v_max, self.v_max)
                    positions[i] += velocities[i]
                    # Keep positions inside the decodable range
                    positions[i] = np.clip(positions[i], 0, 3)
                if gen % 50 == 0:
                    print(f"Generation {gen}: Best Fitness = {g_best_fitness:.2f}")
            except Exception as e:
                print(f"Error in PSO generation {gen}: {e}")
                break
        best_solution = self._decode(g_best_position)
        return self._build_result(best_solution, g_best_fitness, g_best_block)

    def _build_result(self, solution: np.ndarray, fitness: float,
                      block_solution: "Optional[np.ndarray]" = None) -> "OptimizationResult":
        """Build an OptimizationResult from a decoded solution vector."""
        objectives = self.evaluator.calculate_objectives(solution)
        service = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 0]
        standby = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 1]
        maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 2]
        explanations = {}
        for ts_id in service:
            valid, reason = self.evaluator.check_hard_constraints(ts_id)
            explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
        # Map each in-service trainset to the block ids it was assigned
        block_assignments = {}
        if block_solution is not None and self.optimize_blocks:
            for ts_id in service:
                block_assignments[ts_id] = []
            for block_idx, train_idx in enumerate(block_solution):
                if 0 <= train_idx < len(self.evaluator.trainsets):
                    assigned_ts = self.evaluator.trainsets[train_idx]
                    if assigned_ts in block_assignments:
                        block_id = self.evaluator.all_blocks[block_idx]['block_id']
                        block_assignments[assigned_ts].append(block_id)
        return OptimizationResult(
            selected_trainsets=service,
            standby_trainsets=standby,
            maintenance_trainsets=maintenance,
            objectives=objectives,
            fitness_score=fitness,
            explanation=explanations,
            service_block_assignments=block_assignments
        )
class SimulatedAnnealingOptimizer:
    """Simulated Annealing optimizer for trainset scheduling."""

    # Cooling-schedule endpoints: start hot enough to accept most uphill
    # moves, finish cold enough to behave near-greedily.
    T_INITIAL = 100.0
    T_FINAL = 0.1

    def __init__(self, evaluator: "TrainsetSchedulingEvaluator",
                 config: "Optional[OptimizationConfig]" = None):
        self.evaluator = evaluator
        self.config = config or OptimizationConfig()
        self.n_dimensions = evaluator.num_trainsets
        self.n_blocks = evaluator.num_blocks
        self.optimize_blocks = self.config.optimize_block_assignment

    def _get_neighbor(self, solution: np.ndarray) -> np.ndarray:
        """Generate a neighbor by re-drawing one gene uniformly from {0, 1, 2}."""
        neighbor = solution.copy()
        idx = np.random.randint(0, len(solution))
        neighbor[idx] = np.random.randint(0, 3)
        return neighbor

    def _get_block_neighbor(self, block_sol: np.ndarray, service_indices: np.ndarray) -> np.ndarray:
        """Generate a neighbor block assignment.

        Randomly reassigns a few blocks, then repairs any block still pointing
        at a trainset that is not in ``service_indices`` (stale assignments
        can appear after the trainset move that produced the neighbor).
        """
        neighbor = block_sol.copy()
        if len(service_indices) == 0:
            return neighbor
        # Randomly reassign a few blocks
        num_changes = max(1, self.n_blocks // 20)
        for _ in range(num_changes):
            idx = np.random.randint(0, len(neighbor))
            neighbor[idx] = np.random.choice(service_indices)
        # Bug fix: repair stale entries so every block maps to an in-service
        # trainset; previously only the randomly-touched blocks were updated.
        stale = ~np.isin(neighbor, service_indices)
        if stale.any():
            neighbor[stale] = np.random.choice(service_indices, size=int(stale.sum()))
        return neighbor

    def _create_block_assignment(self, trainset_sol: np.ndarray) -> np.ndarray:
        """Round-robin service blocks over in-service trainsets (-1s if none)."""
        service_indices = np.where(trainset_sol == 0)[0]
        if len(service_indices) == 0:
            return np.full(self.n_blocks, -1, dtype=int)
        block_sol = np.zeros(self.n_blocks, dtype=int)
        for i in range(self.n_blocks):
            block_sol[i] = service_indices[i % len(service_indices)]
        return block_sol

    def _temperature(self, iteration: int, max_iterations: int) -> float:
        """Geometric cooling from T_INITIAL to T_FINAL over the whole run.

        Bug fix: the previous schedule ignored ``max_iterations`` and decayed
        as 100 * 0.95**iteration, which collapses to ~0 after a few hundred
        iterations -- leaving the vast majority of a multi-thousand-iteration
        run as pure greedy hill climbing.
        """
        fraction = iteration / max(1, max_iterations)
        return self.T_INITIAL * (self.T_FINAL / self.T_INITIAL) ** fraction

    def optimize(self, max_iterations: "Optional[int]" = None) -> "OptimizationResult":
        """Run Simulated Annealing optimization.

        Args:
            max_iterations: Number of iterations; defaults to a value scaled
                from ``config.iterations``.
        """
        if max_iterations is None:
            max_iterations = self.config.iterations * 1000  # Scale iterations for SA
        # Initialize with a random solution
        current_solution = np.random.randint(0, 3, self.n_dimensions)
        current_block_sol = self._create_block_assignment(current_solution) if self.optimize_blocks else None
        if self.optimize_blocks and current_block_sol is not None:
            current_fitness = self.evaluator.schedule_fitness_function(current_solution, current_block_sol)
        else:
            current_fitness = self.evaluator.fitness_function(current_solution)
        best_solution = current_solution.copy()
        best_block_sol = current_block_sol.copy() if current_block_sol is not None else None
        best_fitness = current_fitness
        print(f"Starting Simulated Annealing optimization for {max_iterations} iterations")
        if self.optimize_blocks:
            print(f"Optimizing block assignments for {self.n_blocks} service blocks")
        for iteration in range(max_iterations):
            try:
                # Generate neighbor
                neighbor = self._get_neighbor(current_solution)
                if self.optimize_blocks:
                    service_indices = np.where(neighbor == 0)[0]
                    # Sometimes create fresh block assignments, sometimes
                    # perturb the current ones
                    if np.random.random() < 0.3:
                        neighbor_block_sol = self._create_block_assignment(neighbor)
                    else:
                        neighbor_block_sol = self._get_block_neighbor(
                            current_block_sol if current_block_sol is not None else self._create_block_assignment(neighbor),
                            service_indices
                        )
                    neighbor_fitness = self.evaluator.schedule_fitness_function(neighbor, neighbor_block_sol)
                else:
                    neighbor_block_sol = None
                    neighbor_fitness = self.evaluator.fitness_function(neighbor)
                temperature = self._temperature(iteration, max_iterations)
                if neighbor_fitness < current_fitness:
                    # Always accept a better solution
                    current_solution = neighbor
                    current_fitness = neighbor_fitness
                    current_block_sol = neighbor_block_sol
                elif temperature > 0:
                    # Metropolis criterion: accept a worse solution with
                    # probability exp(-delta / T)
                    delta = neighbor_fitness - current_fitness
                    probability = math.exp(-delta / temperature)
                    if np.random.random() < probability:
                        current_solution = neighbor
                        current_fitness = neighbor_fitness
                        current_block_sol = neighbor_block_sol
                # Update best solution seen so far
                if current_fitness < best_fitness:
                    best_solution = current_solution.copy()
                    best_fitness = current_fitness
                    best_block_sol = current_block_sol.copy() if current_block_sol is not None else None
                if iteration % 1000 == 0:
                    print(f"Iteration {iteration}: Best Fitness = {best_fitness:.2f}, Temperature = {temperature:.2f}")
            except Exception as e:
                print(f"Error in SA iteration {iteration}: {e}")
                break
        return self._build_result(best_solution, best_fitness, best_block_sol)

    def _build_result(self, solution: np.ndarray, fitness: float,
                      block_solution: "Optional[np.ndarray]" = None) -> "OptimizationResult":
        """Build an OptimizationResult from a decoded solution vector."""
        objectives = self.evaluator.calculate_objectives(solution)
        service = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 0]
        standby = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 1]
        maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 2]
        explanations = {}
        for ts_id in service:
            valid, reason = self.evaluator.check_hard_constraints(ts_id)
            explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
        # Map each in-service trainset to the block ids it was assigned
        block_assignments = {}
        if block_solution is not None and self.optimize_blocks:
            for ts_id in service:
                block_assignments[ts_id] = []
            for block_idx, train_idx in enumerate(block_solution):
                if 0 <= train_idx < len(self.evaluator.trainsets):
                    assigned_ts = self.evaluator.trainsets[train_idx]
                    if assigned_ts in block_assignments:
                        block_id = self.evaluator.all_blocks[block_idx]['block_id']
                        block_assignments[assigned_ts].append(block_id)
        return OptimizationResult(
            selected_trainsets=service,
            standby_trainsets=standby,
            maintenance_trainsets=maintenance,
            objectives=objectives,
            fitness_score=fitness,
            explanation=explanations,
            service_block_assignments=block_assignments
        )