Arpit-Bansal's picture
added ton of shit
33c9f7f
"""
Refactored base.py - now uses modular structure for better maintainability.
This module has been restructured into separate files:
- models.py: Data classes and configurations
- evaluator.py: Constraint checking and objective evaluation
- genetic_algorithm.py: Genetic Algorithm implementation
- advanced_optimizers.py: CMA-ES, PSO, and Simulated Annealing
- scheduler.py: Main interface and comparison tools
For backward compatibility, the main functions are still available here.
"""
import json
from typing import Dict, Tuple, Optional
import numpy as np
from datetime import datetime
# Import from new modular structure
from .scheduler import optimize_trainset_schedule, compare_optimization_methods
from .models import OptimizationResult, OptimizationConfig
# For backward compatibility, expose the main function with original signature
def optimize_trainset_schedule_main(data: Dict, method: str = 'ga') -> OptimizationResult:
"""Multi-objective optimizer for trainset scheduling using genetic algorithm.
This is a backward compatibility wrapper around the new modular system.
"""
# Use the new modular system
config = OptimizationConfig()
return optimize_trainset_schedule(data, method, config)
# Usage example
if __name__ == "__main__":
# Load synthetic data
try:
with open('metro_synthetic_data.json', 'r') as f:
data = json.load(f)
except FileNotFoundError:
print("Please generate synthetic data first")
exit(1)
# Run optimization with Genetic Algorithm (backward compatibility)
result_ga = optimize_trainset_schedule_main(data, method='ga')
print(f"\nOptimization completed!")
print(f"Service trainsets: {len(result_ga.selected_trainsets)}")
print(f"Standby trainsets: {len(result_ga.standby_trainsets)}")
print(f"Maintenance trainsets: {len(result_ga.maintenance_trainsets)}")
print(f"Final fitness score: {result_ga.fitness_score:.2f}")
# You can also use the new modular interface
print(f"\nFor more advanced features, use:")
print(f"from greedyOptim import optimize_trainset_schedule, compare_optimization_methods")
# Fitness certificates by trainset and department
self.fitness_map = {}
for cert in self.data['fitness_certificates']:
ts_id = cert['trainset_id']
if ts_id not in self.fitness_map:
self.fitness_map[ts_id] = {}
self.fitness_map[ts_id][cert['department']] = cert
# Job cards by trainset
self.job_map = {}
for job in self.data['job_cards']:
ts_id = job['trainset_id']
if ts_id not in self.job_map:
self.job_map[ts_id] = []
self.job_map[ts_id].append(job)
# Component health by trainset
self.health_map = {}
for health in self.data['component_health']:
ts_id = health['trainset_id']
if ts_id not in self.health_map:
self.health_map[ts_id] = []
self.health_map[ts_id].append(health)
# Branding contracts
self.brand_map = {b['trainset_id']: b for b in self.data['branding_contracts']}
# Maintenance schedule
self.maint_map = {m['trainset_id']: m for m in self.data['maintenance_schedule']}
def check_hard_constraints(self, trainset_id: str) -> Tuple[bool, str]:
"""Check if trainset passes hard constraints"""
# Check fitness certificates
if trainset_id in self.fitness_map:
for dept, cert in self.fitness_map[trainset_id].items():
if cert['status'] in ['Expired']:
return False, f"Expired {dept} certificate"
expiry = datetime.fromisoformat(cert['expiry_date'])
if expiry < datetime.now():
return False, f"{dept} certificate expired"
else:
return False, "Missing fitness certificates"
# Check open critical job cards
if trainset_id in self.job_map:
for job in self.job_map[trainset_id]:
if job['status'] == 'Open' and job['priority'] == 'Critical':
return False, f"Critical job card {job['job_card_id']} open"
# Check component health
if trainset_id in self.health_map:
for health in self.health_map[trainset_id]:
if health['status'] == 'Warning' and health['wear_level'] > 90:
return False, f"{health['component']} critical wear"
return True, "Pass"
def calculate_objectives(self, solution: np.ndarray) -> Dict[str, float]:
"""Calculate multiple objectives for a solution
Solution encoding: 0=Service, 1=Standby, 2=Maintenance
"""
objectives = {
'service_availability': 0.0,
'maintenance_cost': 0.0,
'branding_compliance': 0.0,
'mileage_balance': 0.0,
'constraint_penalty': 0.0
}
service_trains = []
standby_trains = []
maint_trains = []
for idx, action in enumerate(solution):
ts_id = self.trainsets[idx]
if action == 0:
service_trains.append(ts_id)
elif action == 1:
standby_trains.append(ts_id)
else:
maint_trains.append(ts_id)
# Objective 1: Service Availability (maximize)
# Penalty if not meeting required count
availability = len(service_trains) / self.required_service_trains
if len(service_trains) < self.required_service_trains:
objectives['constraint_penalty'] += (self.required_service_trains - len(service_trains)) * 100
objectives['service_availability'] = min(availability, 1.0) * 100
# Objective 2: Maintenance Cost (minimize via mileage balancing)
mileages = [self.status_map[ts]['total_mileage_km'] for ts in service_trains]
if mileages:
std_dev = float(np.std(mileages))
objectives['mileage_balance'] = 100.0 - min(std_dev / 1000.0, 100.0) # Lower std = better balance
# Objective 3: Branding Compliance (maximize)
brand_scores = []
for ts_id in service_trains:
if ts_id in self.brand_map:
contract = self.brand_map[ts_id]
target = contract['daily_target_hours']
actual = contract['actual_exposure_hours'] / 30 # Daily average
compliance = min(actual / target, 1.0) if target > 0 else 1.0
brand_scores.append(compliance)
objectives['branding_compliance'] = float(np.mean(brand_scores)) * 100.0 if brand_scores else 100.0
# Objective 4: Maintenance Cost (minimize)
# Trains needing maintenance but not scheduled increase cost
maint_cost = 0
for ts_id in service_trains:
if ts_id in self.maint_map:
if self.maint_map[ts_id]['status'] == 'Overdue':
maint_cost += 50 # Penalty for overdue maintenance
objectives['maintenance_cost'] = 100 - min(maint_cost, 100)
# Hard constraint violations
for ts_id in service_trains:
valid, reason = self.check_hard_constraints(ts_id)
if not valid:
objectives['constraint_penalty'] += 200
# Standby constraint
if len(standby_trains) < self.min_standby:
objectives['constraint_penalty'] += (self.min_standby - len(standby_trains)) * 50
return objectives
def fitness_function(self, solution: np.ndarray) -> float:
"""Aggregate fitness function (minimize)"""
obj = self.calculate_objectives(solution)
# Weighted sum (convert to minimization)
fitness = (
-obj['service_availability'] * 2.0 + # Maximize (negative weight)
-obj['branding_compliance'] * 1.5 + # Maximize
-obj['mileage_balance'] * 1.0 + # Maximize
-obj['maintenance_cost'] * 1.0 + # Maximize
obj['constraint_penalty'] * 5.0 # Minimize (positive weight)
)
return fitness
class GeneticAlgorithmOptimizer:
"""Custom Genetic Algorithm implementation"""
def __init__(self, evaluator: TrainsetSchedulingOptimizer,
population_size: int = 100,
generations: int = 200):
self.evaluator = evaluator
self.pop_size = population_size
self.generations = generations
self.n_genes = evaluator.num_trainsets
# GA parameters
self.mutation_rate = 0.1
self.crossover_rate = 0.8
self.elite_size = 5
def initialize_population(self) -> np.ndarray:
"""Initialize random population"""
population = []
# Seeded solutions
for _ in range(self.pop_size // 2):
solution = np.zeros(self.n_genes, dtype=int)
# Randomly assign 20 to service, 2+ to standby, rest to maintenance
indices = np.random.permutation(self.n_genes)
solution[indices[:20]] = 0 # Service
solution[indices[20:23]] = 1 # Standby
solution[indices[23:]] = 2 # Maintenance
population.append(solution)
# Random solutions
for _ in range(self.pop_size // 2):
solution = np.random.randint(0, 3, self.n_genes)
population.append(solution)
return np.array(population)
def tournament_selection(self, population: np.ndarray,
fitness: np.ndarray,
tournament_size: int = 5) -> np.ndarray:
"""Tournament selection"""
indices = np.random.choice(len(population), tournament_size, replace=False)
tournament_fitness = fitness[indices]
winner_idx = indices[np.argmin(tournament_fitness)]
return population[winner_idx].copy()
def crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Two-point crossover"""
if np.random.random() > self.crossover_rate:
return parent1.copy(), parent2.copy()
point1, point2 = sorted(np.random.choice(self.n_genes, 2, replace=False))
child1 = parent1.copy()
child2 = parent2.copy()
child1[point1:point2] = parent2[point1:point2]
child2[point1:point2] = parent1[point1:point2]
return child1, child2
def mutate(self, solution: np.ndarray) -> np.ndarray:
"""Random mutation"""
mutated = solution.copy()
for i in range(self.n_genes):
if np.random.random() < self.mutation_rate:
mutated[i] = np.random.randint(0, 3)
return mutated
def optimize(self) -> OptimizationResult:
"""Run genetic algorithm"""
population = self.initialize_population()
best_fitness = float('inf')
best_solution: Optional[np.ndarray] = None
for gen in range(self.generations):
# Evaluate fitness
fitness = np.array([self.evaluator.fitness_function(ind) for ind in population])
# Track best
gen_best_idx = np.argmin(fitness)
if fitness[gen_best_idx] < best_fitness:
best_fitness = fitness[gen_best_idx]
best_solution = population[gen_best_idx].copy()
if gen % 50 == 0:
print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
# Selection and reproduction
new_population = []
# Elitism
elite_indices = np.argsort(fitness)[:self.elite_size]
for idx in elite_indices:
new_population.append(population[idx].copy())
# Generate offspring
while len(new_population) < self.pop_size:
parent1 = self.tournament_selection(population, fitness)
parent2 = self.tournament_selection(population, fitness)
child1, child2 = self.crossover(parent1, parent2)
child1 = self.mutate(child1)
child2 = self.mutate(child2)
new_population.append(child1)
if len(new_population) < self.pop_size:
new_population.append(child2)
population = np.array(new_population)
# Final evaluation
if best_solution is None:
raise RuntimeError("No valid solution found during optimization")
objectives = self.evaluator.calculate_objectives(best_solution)
# Decode solution
service = [self.evaluator.trainsets[i] for i, v in enumerate(best_solution) if v == 0]
standby = [self.evaluator.trainsets[i] for i, v in enumerate(best_solution) if v == 1]
maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(best_solution) if v == 2]
# Generate explanations
explanations = {}
for ts_id in service:
valid, reason = self.evaluator.check_hard_constraints(ts_id)
status = "✓ Fit for service" if valid else f"⚠ {reason}"
explanations[ts_id] = status
return OptimizationResult(
selected_trainsets=service,
standby_trainsets=standby,
maintenance_trainsets=maintenance,
objectives=objectives,
fitness_score=best_fitness,
explanation=explanations
)
class CMAESOptimizer:
"""CMA-ES (Covariance Matrix Adaptation Evolution Strategy)"""
def __init__(self, evaluator: TrainsetSchedulingOptimizer, population_size: int = 50):
self.evaluator = evaluator
self.n = evaluator.num_trainsets
self.lam = population_size # Population size
self.mu = population_size // 2 # Number of parents
# Initialize
self.mean = np.random.rand(self.n) * 3 # Initial mean in [0, 3)
self.sigma = 1.0 # Step size
self.C = np.eye(self.n) # Covariance matrix
# Strategy parameters
self.weights = np.log(self.mu + 0.5) - np.log(np.arange(1, self.mu + 1))
self.weights /= self.weights.sum()
self.mu_eff = 1.0 / (self.weights ** 2).sum()
self.cc = 4.0 / (self.n + 4.0)
self.cs = (self.mu_eff + 2.0) / (self.n + self.mu_eff + 5.0)
self.c1 = 2.0 / ((self.n + 1.3) ** 2 + self.mu_eff)
self.cmu = min(1 - self.c1, 2 * (self.mu_eff - 2 + 1 / self.mu_eff) / ((self.n + 2) ** 2 + self.mu_eff))
self.damps = 1.0 + 2.0 * max(0, np.sqrt((self.mu_eff - 1) / (self.n + 1)) - 1) + self.cs
self.pc = np.zeros(self.n)
self.ps = np.zeros(self.n)
def _decode(self, x: np.ndarray) -> np.ndarray:
"""Decode continuous values to discrete actions"""
return np.clip(np.round(x), 0, 2).astype(int)
def optimize(self, generations: int = 150) -> OptimizationResult:
"""Run CMA-ES optimization"""
best_fitness = float('inf')
best_solution: Optional[np.ndarray] = None
for gen in range(generations):
# Sample population
population = []
for _ in range(self.lam):
z = np.random.randn(self.n)
y = self.mean + self.sigma * (self.C @ z)
population.append(y)
population = np.array(population)
# Evaluate
fitness = []
decoded_pop = []
for ind in population:
decoded = self._decode(ind)
decoded_pop.append(decoded)
fitness.append(self.evaluator.fitness_function(decoded))
fitness = np.array(fitness)
decoded_pop = np.array(decoded_pop)
# Track best
gen_best_idx = np.argmin(fitness)
if fitness[gen_best_idx] < best_fitness:
best_fitness = fitness[gen_best_idx]
best_solution = decoded_pop[gen_best_idx].copy()
if gen % 30 == 0:
print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
# Selection and recombination
sorted_indices = np.argsort(fitness)[:self.mu]
selected = population[sorted_indices]
old_mean = self.mean.copy()
self.mean = selected.T @ self.weights
# Update evolution paths
self.ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mu_eff) * (self.mean - old_mean) / self.sigma
hsig = (np.linalg.norm(self.ps) / np.sqrt(1 - (1 - self.cs) ** (2 * gen))
< (1.4 + 2 / (self.n + 1)) * np.sqrt(self.n))
self.pc = (1 - self.cc) * self.pc + hsig * np.sqrt(self.cc * (2 - self.cc) * self.mu_eff) * (self.mean - old_mean) / self.sigma
# Update covariance matrix
artmp = (selected - old_mean) / self.sigma
self.C = ((1 - self.c1 - self.cmu) * self.C
+ self.c1 * np.outer(self.pc, self.pc)
+ self.cmu * (artmp.T @ np.diag(self.weights) @ artmp))
# Update step size
self.sigma *= np.exp((self.cs / self.damps) * (np.linalg.norm(self.ps) / np.sqrt(self.n) - 1))
if best_solution is None:
raise RuntimeError("No valid solution found during CMA-ES optimization")
objectives = self.evaluator.calculate_objectives(best_solution)
service = [self.evaluator.trainsets[i] for i, v in enumerate(best_solution) if v == 0]
standby = [self.evaluator.trainsets[i] for i, v in enumerate(best_solution) if v == 1]
maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(best_solution) if v == 2]
explanations = {}
for ts_id in service:
valid, reason = self.evaluator.check_hard_constraints(ts_id)
explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
return OptimizationResult(
selected_trainsets=service,
standby_trainsets=standby,
maintenance_trainsets=maintenance,
objectives=objectives,
fitness_score=best_fitness,
explanation=explanations
)
def optimize_trainset_schedule(data: Dict, method: str = 'ga') -> OptimizationResult:
"""Main optimization function
Args:
data: Synthetic metro data dictionary
method: 'ga' for Genetic Algorithm, 'cmaes' for CMA-ES
"""
evaluator = TrainsetSchedulingOptimizer(data)
print(f"\nOptimizing with {method.upper()}...")
print(f"Trainsets: {evaluator.num_trainsets}")
print(f"Required in service: {evaluator.required_service_trains}")
print(f"Minimum standby: {evaluator.min_standby}\n")
if method == 'ga':
optimizer = GeneticAlgorithmOptimizer(evaluator, population_size=100, generations=200)
elif method == 'cmaes':
optimizer = CMAESOptimizer(evaluator, population_size=50)
return optimizer.optimize(generations=150)
else:
raise ValueError(f"Unknown method: {method}")
result = optimizer.optimize()
print("\n" + "="*60)
print("OPTIMIZATION RESULTS")
print("="*60)
print(f"\nService Trainsets ({len(result.selected_trainsets)}):")
for ts in result.selected_trainsets[:5]:
print(f" {ts}: {result.explanation.get(ts, 'N/A')}")
if len(result.selected_trainsets) > 5:
print(f" ... and {len(result.selected_trainsets) - 5} more")
print(f"\nStandby Trainsets ({len(result.standby_trainsets)}):")
for ts in result.standby_trainsets:
print(f" {ts}")
print(f"\nMaintenance Trainsets ({len(result.maintenance_trainsets)}):")
for ts in result.maintenance_trainsets:
print(f" {ts}")
print(f"\nObjectives:")
for key, value in result.objectives.items():
print(f" {key}: {value:.2f}")
print(f"\nFitness Score: {result.fitness_score:.2f}")
return result
# Usage example
if __name__ == "__main__":
# Load synthetic data
with open('metro_synthetic_data.json', 'r') as f:
data = json.load(f)
# Run optimization with Genetic Algorithm
result_ga = optimize_trainset_schedule(data, method='ga')
# Run optimization with CMA-ES
result_cmaes = optimize_trainset_schedule(data, method='cmaes')