File size: 13,993 Bytes
33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f 0f86bb9 33c9f7f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
"""
Genetic Algorithm optimizer for trainset scheduling.
"""
import numpy as np
from typing import Tuple, Optional, Dict, List
from .models import OptimizationResult, OptimizationConfig
from .evaluator import TrainsetSchedulingEvaluator
class GeneticAlgorithmOptimizer:
    """Genetic Algorithm implementation for trainset scheduling optimization.

    A trainset chromosome is an integer array with one gene per trainset,
    where 0 = service, 1 = standby and 2 = maintenance. When block
    optimization is enabled, every trainset chromosome is paired with a
    block chromosome in which entry ``i`` is the index of the trainset
    assigned to block ``i``, or -1 if the block is unassigned.

    Fitness is minimized: the individual with the lowest fitness value wins.
    """

    def __init__(self, evaluator: "TrainsetSchedulingEvaluator",
                 config: Optional["OptimizationConfig"] = None):
        """Store the evaluator and configuration and cache problem sizes.

        Args:
            evaluator: Supplies ``num_trainsets``, ``num_blocks`` and the
                fitness/objective functions used during optimization.
            config: Algorithm settings; a default ``OptimizationConfig()``
                is created when omitted.
        """
        self.evaluator = evaluator
        self.config = config or OptimizationConfig()
        self.n_genes = evaluator.num_trainsets
        self.n_blocks = evaluator.num_blocks
        self.optimize_blocks = self.config.optimize_block_assignment

    def initialize_population(self) -> np.ndarray:
        """Build the initial trainset population.

        Half of the individuals (rounded down) are seeded: a random
        permutation of the trainsets is split into the required number of
        service trains, ``min_standby`` standby trains and the remainder in
        maintenance. The remaining individuals are uniformly random, so the
        result always has exactly ``population_size`` rows, including when
        ``population_size`` is odd.

        Returns:
            Array of shape ``(population_size, n_genes)``.
        """
        size = self.config.population_size
        seeded_count = size // 2
        # Use the remainder (not a second ``size // 2``) so odd sizes are honored.
        random_count = size - seeded_count

        service_count = min(self.config.required_service_trains, self.n_genes)
        standby_end = service_count + self.config.min_standby

        population = []
        for _ in range(seeded_count):
            solution = np.zeros(self.n_genes, dtype=int)
            order = np.random.permutation(self.n_genes)
            solution[order[:service_count]] = 0  # Service
            solution[order[service_count:standby_end]] = 1  # Standby
            solution[order[standby_end:]] = 2  # Maintenance
            population.append(solution)

        for _ in range(random_count):
            population.append(np.random.randint(0, 3, self.n_genes))

        return np.array(population)

    def initialize_block_population(self, trainset_population: np.ndarray) -> np.ndarray:
        """Initialize block assignment population.

        For each trainset solution, initialize a block assignment solution.
        block_solution[i] = index of trainset assigned to block i, or -1 if unassigned.

        Args:
            trainset_population: Iterable of trainset chromosomes.

        Returns:
            Array with one block chromosome per trainset chromosome.
        """
        return np.array([
            self._create_block_for_trainset(trainset_sol)
            for trainset_sol in trainset_population
        ])

    def tournament_selection(self, population: np.ndarray,
                             fitness: np.ndarray,
                             tournament_size: int = 5) -> np.ndarray:
        """Pick one parent by tournament selection.

        Args:
            population: Candidate chromosomes.
            fitness: Fitness value per chromosome (lower is better).
            tournament_size: Number of distinct contenders to sample; it is
                capped at the population size so small populations work.

        Returns:
            A copy of the contender with the lowest fitness.
        """
        # Sampling without replacement needs size <= population length.
        size = min(tournament_size, len(population))
        contenders = np.random.choice(len(population), size, replace=False)
        winner_idx = contenders[np.argmin(fitness[contenders])]
        return population[winner_idx].copy()

    def crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Two-point crossover.

        With probability ``1 - crossover_rate`` the parents are returned as
        unchanged copies. Otherwise two distinct cut positions are drawn and
        the genes between them are exchanged. No repair happens here; the
        caller is expected to repair the children.

        Args:
            parent1: First parent chromosome.
            parent2: Second parent chromosome.

        Returns:
            The two child chromosomes.
        """
        if self.n_genes < 1 or np.random.random() > self.config.crossover_rate:
            return parent1.copy(), parent2.copy()

        # Cut positions range over 0..n_genes inclusive so the final gene can
        # also fall inside the exchanged segment.
        point1, point2 = sorted(np.random.choice(self.n_genes + 1, 2, replace=False))

        child1 = parent1.copy()
        child2 = parent2.copy()
        child1[point1:point2] = parent2[point1:point2]
        child2[point1:point2] = parent1[point1:point2]
        return child1, child2

    def mutate(self, solution: np.ndarray) -> np.ndarray:
        """Randomly reassign genes.

        Each gene is independently replaced, with probability
        ``mutation_rate``, by a uniformly random value in {0, 1, 2}. The
        mutation itself ignores constraints.

        Args:
            solution: Trainset chromosome (not modified).

        Returns:
            The mutated copy.
        """
        mutated = solution.copy()
        mask = np.random.random(len(mutated)) < self.config.mutation_rate
        mutated[mask] = np.random.randint(0, 3, size=int(mask.sum()))
        return mutated

    def repair_solution(self, solution: np.ndarray) -> np.ndarray:
        """Repair solution to meet basic constraints.

        First raises the number of service trains to the target by
        converting randomly chosen standby/maintenance trains, then raises
        the number of standby trains to ``min_standby`` by converting
        randomly chosen maintenance trains. When there are fewer candidates
        than needed, as many as are available are converted.

        Args:
            solution: Trainset chromosome (not modified).

        Returns:
            The repaired copy.
        """
        repaired = solution.copy()

        # If too few in service, convert some from maintenance/standby
        target_service = min(self.config.required_service_trains,
                             self.n_genes - self.config.min_standby)
        needed = target_service - int(np.sum(repaired == 0))
        if needed > 0:
            candidates = np.where((repaired == 1) | (repaired == 2))[0]
            take = min(needed, len(candidates))
            if take > 0:
                repaired[np.random.choice(candidates, take, replace=False)] = 0

        # If too few in standby, convert some from maintenance
        # (counted after the service repair, which may have consumed standby trains).
        needed = self.config.min_standby - int(np.sum(repaired == 1))
        if needed > 0:
            candidates = np.where(repaired == 2)[0]
            take = min(needed, len(candidates))
            if take > 0:
                repaired[np.random.choice(candidates, take, replace=False)] = 1

        return repaired

    def repair_block_solution(self, block_sol: np.ndarray, trainset_sol: np.ndarray) -> np.ndarray:
        """Repair block assignments to only assign to service trains.

        Args:
            block_sol: Block chromosome (not modified).
            trainset_sol: Trainset chromosome the blocks belong to.

        Returns:
            A copy in which every block pointing at a non-service trainset
            is reassigned to a random service trainset, or an all -1 array
            when there are no service trainsets.
        """
        service_indices = np.where(trainset_sol == 0)[0]
        if len(service_indices) == 0:
            return np.full(self.n_blocks, -1, dtype=int)

        repaired = block_sol.copy()
        invalid = ~np.isin(repaired, service_indices)
        n_invalid = int(invalid.sum())
        if n_invalid:
            repaired[invalid] = np.random.choice(service_indices, size=n_invalid)
        return repaired

    def mutate_block_solution(self, block_sol: np.ndarray, service_indices: np.ndarray) -> np.ndarray:
        """Mutate block assignments.

        Each block is independently reassigned, with probability
        ``mutation_rate``, to a random entry of ``service_indices``.

        Args:
            block_sol: Block chromosome (not modified).
            service_indices: Indices of trainsets currently in service.

        Returns:
            The mutated copy; an unchanged copy when ``service_indices`` is
            empty.
        """
        mutated = block_sol.copy()
        if len(service_indices) == 0:
            return mutated

        mask = np.random.random(len(mutated)) < self.config.mutation_rate
        n_mutations = int(mask.sum())
        if n_mutations:
            mutated[mask] = np.random.choice(service_indices, size=n_mutations)
        return mutated

    def optimize(self) -> "OptimizationResult":
        """Run genetic algorithm optimization.

        Each generation evaluates the population, records the best
        individual seen so far, carries over the ``elite_size`` best
        individuals and fills the rest of the next population with repaired
        offspring produced by tournament selection, crossover and mutation.
        If a generation raises, the error is printed and the best solution
        found up to that point is returned.

        Returns:
            The result built from the best solution found.
        """
        population = self.initialize_population()

        # Initialize block population if optimizing blocks
        block_population = None
        if self.optimize_blocks:
            block_population = self.initialize_block_population(population)

        best_fitness = float('inf')
        best_solution: np.ndarray = population[0].copy()
        best_block_solution: Optional[np.ndarray] = None

        print(f"Starting GA optimization with {self.config.population_size} individuals for {self.config.generations} generations")
        if self.optimize_blocks:
            print(f"Optimizing block assignments for {self.n_blocks} service blocks")

        for gen in range(self.config.generations):
            try:
                use_blocks = self.optimize_blocks and block_population is not None

                # Evaluate fitness
                if use_blocks:
                    fitness = np.array([
                        self.evaluator.schedule_fitness_function(individual, blocks)
                        for individual, blocks in zip(population, block_population)
                    ])
                else:
                    fitness = np.array([self.evaluator.fitness_function(ind) for ind in population])

                # Track best solution
                gen_best_idx = np.argmin(fitness)
                if fitness[gen_best_idx] < best_fitness:
                    best_fitness = fitness[gen_best_idx]
                    best_solution = population[gen_best_idx].copy()
                    if use_blocks:
                        best_block_solution = block_population[gen_best_idx].copy()

                if gen % 50 == 0:
                    print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")

                # Create new population
                new_population = []
                new_block_population = [] if self.optimize_blocks else None

                # Elitism - keep best solutions
                for idx in np.argsort(fitness)[:self.config.elite_size]:
                    new_population.append(population[idx].copy())
                    if use_blocks:
                        new_block_population.append(block_population[idx].copy())

                # Generate offspring through selection, crossover, and mutation
                while len(new_population) < self.config.population_size:
                    parent1 = self.tournament_selection(population, fitness)
                    parent2 = self.tournament_selection(population, fitness)

                    child1, child2 = self.crossover(parent1, parent2)
                    # Repair solutions to meet basic constraints
                    child1 = self.repair_solution(self.mutate(child1))
                    child2 = self.repair_solution(self.mutate(child2))

                    new_population.append(child1)
                    keep_second = len(new_population) < self.config.population_size
                    if keep_second:
                        new_population.append(child2)

                    # Handle block solutions: children get a fresh round-robin
                    # assignment which is then mutated. Both lists grow in
                    # lockstep so row i of one matches row i of the other.
                    if new_block_population is not None:
                        block_child1 = self._create_block_for_trainset(child1)
                        block_child1 = self.mutate_block_solution(
                            block_child1, np.where(child1 == 0)[0])
                        new_block_population.append(block_child1)

                        if len(new_block_population) < self.config.population_size:
                            block_child2 = self._create_block_for_trainset(child2)
                            block_child2 = self.mutate_block_solution(
                                block_child2, np.where(child2 == 0)[0])
                            new_block_population.append(block_child2)

                population = np.array(new_population)
                if self.optimize_blocks:
                    block_population = np.array(new_block_population)

            except Exception as e:
                # Best-effort: stop evolving and return the best solution so far.
                print(f"Error in generation {gen}: {e}")
                break

        # Build result
        return self._build_result(best_solution, best_fitness, best_block_solution)

    def _create_block_for_trainset(self, trainset_sol: np.ndarray) -> np.ndarray:
        """Create block assignments for a trainset solution.

        Blocks are dealt out round-robin over the service trainsets in
        ascending index order.

        Args:
            trainset_sol: Trainset chromosome.

        Returns:
            Block chromosome of length ``n_blocks``; all -1 when the
            solution has no service trainsets.
        """
        service_indices = np.where(trainset_sol == 0)[0]
        if len(service_indices) == 0:
            return np.full(self.n_blocks, -1, dtype=int)
        return service_indices[np.arange(self.n_blocks) % len(service_indices)]

    def _build_result(self, solution: np.ndarray, fitness: float,
                      block_solution: Optional[np.ndarray] = None) -> "OptimizationResult":
        """Build optimization result from solution.

        Args:
            solution: Best trainset chromosome.
            fitness: Fitness value of ``solution``.
            block_solution: Matching block chromosome, if any.

        Returns:
            An ``OptimizationResult`` with the decoded service/standby/
            maintenance lists, objectives, per-service-train explanations
            and block assignments.
        """
        objectives = self.evaluator.calculate_objectives(solution)
        trainsets = self.evaluator.trainsets

        # Decode solution
        service = [trainsets[i] for i, v in enumerate(solution) if v == 0]
        standby = [trainsets[i] for i, v in enumerate(solution) if v == 1]
        maintenance = [trainsets[i] for i, v in enumerate(solution) if v == 2]

        # Generate explanations
        explanations = {}
        for ts_id in service:
            valid, reason = self.evaluator.check_hard_constraints(ts_id)
            explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"

        # Build block assignments
        block_assignments = {}
        if block_solution is not None and self.optimize_blocks:
            block_assignments = {ts_id: [] for ts_id in service}
            for block_idx, train_idx in enumerate(block_solution):
                # Skip unassigned (-1) or out-of-range entries.
                if 0 <= train_idx < len(trainsets):
                    assigned_id = trainsets[train_idx]
                    # Only blocks assigned to service trainsets are reported.
                    if assigned_id in block_assignments:
                        block_id = self.evaluator.all_blocks[block_idx]['block_id']
                        block_assignments[assigned_id].append(block_id)

        return OptimizationResult(
            selected_trainsets=service,
            standby_trainsets=standby,
            maintenance_trainsets=maintenance,
            objectives=objectives,
            fitness_score=fitness,
            explanation=explanations,
            service_block_assignments=block_assignments
        )