|
|
""" |
|
|
Genetic Algorithm optimizer for trainset scheduling. |
|
|
""" |
|
|
import numpy as np |
|
|
from typing import Tuple, Optional, Dict, List |
|
|
|
|
|
from .models import OptimizationResult, OptimizationConfig |
|
|
from .evaluator import TrainsetSchedulingEvaluator |
|
|
|
|
|
|
|
|
class GeneticAlgorithmOptimizer:
    """Genetic Algorithm implementation for trainset scheduling optimization.

    A trainset solution is an integer vector with one gene per trainset:
    ``0`` = in service, ``1`` = standby, ``2`` = maintenance.  When block
    optimization is enabled, every trainset solution is paired with a block
    solution where ``block_solution[i]`` is the index of the trainset
    assigned to block ``i`` (``-1`` when no trainset is in service).

    Fitness values returned by the evaluator are minimized.
    """

    # Gene values used in trainset solutions.
    _SERVICE = 0
    _STANDBY = 1
    _MAINTENANCE = 2

    def __init__(
        self,
        evaluator: "TrainsetSchedulingEvaluator",
        config: "Optional[OptimizationConfig]" = None,
    ):
        """Store the evaluator and configuration and cache the problem sizes.

        Args:
            evaluator: Provides the fitness functions, the trainset list and
                the service blocks.
            config: GA parameters; a default ``OptimizationConfig`` is
                created when omitted.
        """
        self.evaluator = evaluator
        self.config = config or OptimizationConfig()
        self.n_genes = evaluator.num_trainsets
        self.n_blocks = evaluator.num_blocks
        self.optimize_blocks = self.config.optimize_block_assignment

    def initialize_population(self) -> np.ndarray:
        """Create the initial population: half seeded, the rest random.

        Seeded individuals put ``required_service_trains`` randomly chosen
        trainsets in service, the next ``min_standby`` on standby and the
        remainder in maintenance.  The other individuals are uniformly
        random.

        Returns:
            Array with ``population_size`` rows of ``n_genes`` genes.
        """
        size = self.config.population_size
        seeded_count = size // 2
        # The random part takes the remainder so that an odd population
        # size is not silently shrunk by one individual.
        random_count = size - seeded_count

        service_count = min(self.config.required_service_trains, self.n_genes)
        standby_end = service_count + self.config.min_standby

        population = []
        for _ in range(seeded_count):
            order = np.random.permutation(self.n_genes)
            # Zero-initialized genes are already "service"; only the
            # standby and maintenance slices need to be written.
            solution = np.zeros(self.n_genes, dtype=int)
            solution[order[service_count:standby_end]] = self._STANDBY
            solution[order[standby_end:]] = self._MAINTENANCE
            population.append(solution)

        for _ in range(random_count):
            population.append(np.random.randint(0, 3, self.n_genes))

        return np.array(population)

    def initialize_block_population(self, trainset_population: np.ndarray) -> np.ndarray:
        """Initialize block assignment population.

        For each trainset solution, initialize a block assignment solution.
        block_solution[i] = index of trainset assigned to block i, or -1 if unassigned.
        """
        return np.array([
            self._create_block_for_trainset(trainset_sol)
            for trainset_sol in trainset_population
        ])

    def tournament_selection(self, population: np.ndarray,
                             fitness: np.ndarray,
                             tournament_size: int = 5) -> np.ndarray:
        """Pick one parent by tournament selection.

        ``tournament_size`` distinct individuals are drawn at random and a
        copy of the one with the lowest fitness is returned.  The tournament
        is capped at the population size, because sampling without
        replacement cannot draw more individuals than exist.
        """
        size = min(tournament_size, len(population))
        contenders = np.random.choice(len(population), size, replace=False)
        winner_idx = contenders[np.argmin(fitness[contenders])]
        return population[winner_idx].copy()

    def crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Two-point crossover.

        With probability ``crossover_rate`` the segment between two random
        cut points is exchanged between the parents; otherwise plain copies
        are returned.  Chromosomes shorter than two genes cannot provide two
        distinct cut points and are also returned unchanged.  No repair is
        done here; callers apply ``repair_solution`` afterwards.
        """
        if self.n_genes < 2 or np.random.random() > self.config.crossover_rate:
            return parent1.copy(), parent2.copy()

        point1, point2 = sorted(np.random.choice(self.n_genes, 2, replace=False))

        child1 = parent1.copy()
        child2 = parent2.copy()
        child1[point1:point2] = parent2[point1:point2]
        child2[point1:point2] = parent1[point1:point2]
        return child1, child2

    def mutate(self, solution: np.ndarray) -> np.ndarray:
        """Return a copy in which each gene is re-drawn with ``mutation_rate``.

        A mutated gene gets a uniformly random value in ``{0, 1, 2}`` (which
        may equal its old value).  Constraints are not checked here.
        """
        mutated = solution.copy()
        hits = np.flatnonzero(np.random.random(self.n_genes) < self.config.mutation_rate)
        if hits.size:
            mutated[hits] = np.random.randint(0, 3, size=hits.size)
        return mutated

    def repair_solution(self, solution: np.ndarray) -> np.ndarray:
        """Repair solution to meet basic constraints.

        First raises the number of service trains to
        ``min(required_service_trains, n_genes - min_standby)`` by promoting
        random standby/maintenance trains, then raises the number of standby
        trains towards ``min_standby`` by promoting random maintenance
        trains.  When there are fewer candidates than needed, all available
        candidates are promoted.  The input array is not modified.
        """
        repaired = solution.copy()

        target_service = min(self.config.required_service_trains,
                             self.n_genes - self.config.min_standby)
        missing_service = target_service - np.sum(repaired == self._SERVICE)
        if missing_service > 0:
            candidates = np.where(repaired != self._SERVICE)[0]
            take = int(min(missing_service, len(candidates)))
            if take > 0:
                repaired[np.random.choice(candidates, take, replace=False)] = self._SERVICE

        # Counted after the service repair, which may have consumed standby trains.
        missing_standby = self.config.min_standby - np.sum(repaired == self._STANDBY)
        if missing_standby > 0:
            candidates = np.where(repaired == self._MAINTENANCE)[0]
            take = int(min(missing_standby, len(candidates)))
            if take > 0:
                repaired[np.random.choice(candidates, take, replace=False)] = self._STANDBY

        return repaired

    def repair_block_solution(self, block_sol: np.ndarray, trainset_sol: np.ndarray) -> np.ndarray:
        """Repair block assignments to only assign to service trains.

        Every block whose assigned index is not a service train in
        ``trainset_sol`` is reassigned to a random service train.  When no
        train is in service, all ``n_blocks`` entries are ``-1``.
        """
        service_indices = np.where(trainset_sol == self._SERVICE)[0]
        if len(service_indices) == 0:
            return np.full(self.n_blocks, -1, dtype=int)

        repaired = block_sol.copy()
        invalid = ~np.isin(repaired, service_indices)
        invalid_count = int(invalid.sum())
        if invalid_count:
            repaired[invalid] = np.random.choice(service_indices, size=invalid_count)
        return repaired

    def mutate_block_solution(self, block_sol: np.ndarray, service_indices: np.ndarray) -> np.ndarray:
        """Mutate block assignments.

        Each block is reassigned with probability ``mutation_rate`` to a
        random entry of ``service_indices``.  With no service trains the
        copy is returned unchanged.
        """
        mutated = block_sol.copy()
        if len(service_indices) == 0:
            return mutated

        hits = np.flatnonzero(np.random.random(len(mutated)) < self.config.mutation_rate)
        if hits.size:
            mutated[hits] = np.random.choice(service_indices, size=hits.size)
        return mutated

    def optimize(self) -> "OptimizationResult":
        """Run genetic algorithm optimization.

        Evaluates the population once per generation, tracks the best
        (lowest fitness) individual seen and breeds the next generation
        from elites plus tournament-selected offspring.  An exception inside
        a generation is reported on stdout and ends the run early; the best
        solution found so far is still returned.
        """
        population = self.initialize_population()

        block_population = None
        if self.optimize_blocks:
            block_population = self.initialize_block_population(population)

        best_fitness = float('inf')
        best_solution: np.ndarray = population[0].copy()
        best_block_solution: Optional[np.ndarray] = None

        print(f"Starting GA optimization with {self.config.population_size} individuals for {self.config.generations} generations")
        if self.optimize_blocks:
            print(f"Optimizing block assignments for {self.n_blocks} service blocks")

        generations = self.config.generations
        for gen in range(generations):
            try:
                fitness = self._evaluate_population(population, block_population)

                gen_best_idx = np.argmin(fitness)
                if fitness[gen_best_idx] < best_fitness:
                    best_fitness = fitness[gen_best_idx]
                    best_solution = population[gen_best_idx].copy()
                    if self.optimize_blocks and block_population is not None:
                        best_block_solution = block_population[gen_best_idx].copy()

                if gen % 50 == 0:
                    print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")

                # Offspring of the final generation would never be
                # evaluated, so breeding them is skipped.
                if gen == generations - 1:
                    break

                population, block_population = self._breed_next_generation(
                    population, block_population, fitness
                )
            except Exception as e:
                # Deliberate best-effort: report and return the best so far.
                print(f"Error in generation {gen}: {e}")
                break

        return self._build_result(best_solution, best_fitness, best_block_solution)

    def _evaluate_population(self, population: np.ndarray,
                             block_population: Optional[np.ndarray]) -> np.ndarray:
        """Return the fitness of every individual (paired with its blocks if enabled)."""
        if self.optimize_blocks and block_population is not None:
            return np.array([
                self.evaluator.schedule_fitness_function(individual, blocks)
                for individual, blocks in zip(population, block_population)
            ])
        return np.array([self.evaluator.fitness_function(ind) for ind in population])

    def _breed_next_generation(self, population: np.ndarray,
                               block_population: Optional[np.ndarray],
                               fitness: np.ndarray):
        """Build the next (population, block_population) from elites and offspring."""
        with_blocks = self.optimize_blocks and block_population is not None
        size = self.config.population_size

        new_population = []
        new_block_population = []

        # Elitism: the best individuals survive unchanged.
        for idx in np.argsort(fitness)[:self.config.elite_size]:
            new_population.append(population[idx].copy())
            if with_blocks:
                new_block_population.append(block_population[idx].copy())

        while len(new_population) < size:
            parent1 = self.tournament_selection(population, fitness)
            parent2 = self.tournament_selection(population, fitness)

            child1, child2 = self.crossover(parent1, parent2)
            child1 = self.mutate(child1)
            child2 = self.mutate(child2)
            child1 = self.repair_solution(child1)
            child2 = self.repair_solution(child2)

            # Only as many children as still fit are kept, so the second
            # child is dropped when a single slot is left.
            room = size - len(new_population)
            for child in (child1, child2)[:room]:
                new_population.append(child)
                if with_blocks:
                    # Block assignments are rebuilt from the child's service
                    # trains and then mutated; they are not inherited.
                    service_indices = np.where(child == self._SERVICE)[0]
                    block_child = self._create_block_for_trainset(child)
                    new_block_population.append(
                        self.mutate_block_solution(block_child, service_indices)
                    )

        next_blocks = np.array(new_block_population) if with_blocks else block_population
        return np.array(new_population), next_blocks

    def _create_block_for_trainset(self, trainset_sol: np.ndarray) -> np.ndarray:
        """Create block assignments for a trainset solution.

        Blocks are dealt round-robin over the service trains in index
        order; with no service trains every block is ``-1``.
        """
        service_indices = np.where(trainset_sol == self._SERVICE)[0]
        if len(service_indices) == 0:
            return np.full(self.n_blocks, -1, dtype=int)

        slots = np.arange(self.n_blocks) % len(service_indices)
        return service_indices[slots].astype(int)

    def _build_result(self, solution: np.ndarray, fitness: float,
                      block_solution: Optional[np.ndarray] = None) -> "OptimizationResult":
        """Build optimization result from solution.

        Splits the trainsets by gene value, records the hard-constraint
        check for every service trainset and, when block optimization is
        enabled, maps each service trainset to the ids of its blocks.
        """
        objectives = self.evaluator.calculate_objectives(solution)
        trainsets = self.evaluator.trainsets

        service = [trainsets[i] for i, v in enumerate(solution) if v == self._SERVICE]
        standby = [trainsets[i] for i, v in enumerate(solution) if v == self._STANDBY]
        maintenance = [trainsets[i] for i, v in enumerate(solution) if v == self._MAINTENANCE]

        explanations = {}
        for ts_id in service:
            valid, reason = self.evaluator.check_hard_constraints(ts_id)
            explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"

        block_assignments = {}
        if block_solution is not None and self.optimize_blocks:
            block_assignments = {ts_id: [] for ts_id in service}
            for block_idx, train_idx in enumerate(block_solution):
                # Skips -1 ("unassigned") and any out-of-range index.
                if not 0 <= train_idx < len(trainsets):
                    continue
                ts_id = trainsets[train_idx]
                # Blocks pointing at non-service trainsets are ignored.
                if ts_id in block_assignments:
                    block_id = self.evaluator.all_blocks[block_idx]['block_id']
                    block_assignments[ts_id].append(block_id)

        return OptimizationResult(
            selected_trainsets=service,
            standby_trainsets=standby,
            maintenance_trainsets=maintenance,
            objectives=objectives,
            fitness_score=fitness,
            explanation=explanations,
            service_block_assignments=block_assignments
        )