|
|
""" |
|
|
OR-Tools based optimizers for trainset scheduling. |
|
|
Provides exact and constraint programming solutions. |
|
|
""" |
|
|
import numpy as np |
|
|
from typing import Dict, List, Optional, Tuple |
|
|
import logging |
|
|
|
|
|
try: |
|
|
from ortools.sat.python import cp_model |
|
|
from ortools.linear_solver import pywraplp |
|
|
ORTOOLS_AVAILABLE = True |
|
|
except ImportError: |
|
|
ORTOOLS_AVAILABLE = False |
|
|
cp_model = None |
|
|
pywraplp = None |
|
|
|
|
|
from .models import OptimizationResult, OptimizationConfig |
|
|
from .evaluator import TrainsetSchedulingEvaluator |
|
|
|
|
|
|
|
|
class CPSATOptimizer: |
|
|
"""Constraint Programming optimizer using OR-Tools CP-SAT solver.""" |
|
|
|
|
|
def __init__(self, evaluator: TrainsetSchedulingEvaluator, config: Optional[OptimizationConfig] = None): |
|
|
if not ORTOOLS_AVAILABLE: |
|
|
raise ImportError("OR-Tools not available. Install with: pip install ortools") |
|
|
|
|
|
self.evaluator = evaluator |
|
|
self.config = config or OptimizationConfig() |
|
|
self.n_trainsets = evaluator.num_trainsets |
|
|
|
|
|
|
|
|
self.logger = logging.getLogger(__name__) |
|
|
|
|
|
def optimize(self, time_limit_seconds: int = 300) -> OptimizationResult: |
|
|
"""Solve using CP-SAT with constraint programming.""" |
|
|
if not ORTOOLS_AVAILABLE: |
|
|
raise ImportError("OR-Tools not available") |
|
|
|
|
|
model = cp_model.CpModel() |
|
|
|
|
|
|
|
|
|
|
|
assignment = [] |
|
|
for i in range(self.n_trainsets): |
|
|
assignment.append(model.NewIntVar(0, 2, f'trainset_{i}')) |
|
|
|
|
|
|
|
|
is_service = [] |
|
|
is_standby = [] |
|
|
is_maintenance = [] |
|
|
|
|
|
for i in range(self.n_trainsets): |
|
|
is_service.append(model.NewBoolVar(f'service_{i}')) |
|
|
is_standby.append(model.NewBoolVar(f'standby_{i}')) |
|
|
is_maintenance.append(model.NewBoolVar(f'maintenance_{i}')) |
|
|
|
|
|
|
|
|
model.Add(assignment[i] == 0).OnlyEnforceIf(is_service[i]) |
|
|
model.Add(assignment[i] != 0).OnlyEnforceIf(is_service[i].Not()) |
|
|
model.Add(assignment[i] == 1).OnlyEnforceIf(is_standby[i]) |
|
|
model.Add(assignment[i] != 1).OnlyEnforceIf(is_standby[i].Not()) |
|
|
model.Add(assignment[i] == 2).OnlyEnforceIf(is_maintenance[i]) |
|
|
model.Add(assignment[i] != 2).OnlyEnforceIf(is_maintenance[i].Not()) |
|
|
|
|
|
|
|
|
model.Add(sum(is_service) == self.config.required_service_trains) |
|
|
|
|
|
|
|
|
model.Add(sum(is_standby) >= self.config.min_standby) |
|
|
|
|
|
|
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
valid, reason = self.evaluator.check_hard_constraints(trainset_id) |
|
|
if not valid: |
|
|
|
|
|
model.Add(assignment[i] == 2) |
|
|
self.logger.info(f"Trainset {trainset_id} forced to maintenance: {reason}") |
|
|
|
|
|
|
|
|
self._add_branding_constraints(model, is_service) |
|
|
|
|
|
|
|
|
self._set_multi_objective(model, is_service, is_standby, assignment) |
|
|
|
|
|
|
|
|
solver = cp_model.CpSolver() |
|
|
solver.parameters.max_time_in_seconds = time_limit_seconds |
|
|
solver.parameters.log_search_progress = True |
|
|
|
|
|
print(f"Solving with CP-SAT (time limit: {time_limit_seconds}s)...") |
|
|
status = solver.Solve(model) |
|
|
|
|
|
if status == cp_model.OPTIMAL: |
|
|
print("β
Optimal solution found!") |
|
|
elif status == cp_model.FEASIBLE: |
|
|
print("β
Feasible solution found!") |
|
|
else: |
|
|
print("β No solution found!") |
|
|
return self._create_fallback_solution() |
|
|
|
|
|
|
|
|
solution = np.array([solver.Value(assignment[i]) for i in range(self.n_trainsets)]) |
|
|
return self._build_result(solution, solver.ObjectiveValue()) |
|
|
|
|
|
def _add_branding_constraints(self, model, is_service: List): |
|
|
"""Add branding contract constraints.""" |
|
|
|
|
|
brand_trainsets = {} |
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
if trainset_id in self.evaluator.brand_map: |
|
|
brand = self.evaluator.brand_map[trainset_id].get('brand_name', 'Unknown') |
|
|
if brand not in brand_trainsets: |
|
|
brand_trainsets[brand] = [] |
|
|
brand_trainsets[brand].append(i) |
|
|
|
|
|
|
|
|
for brand, trainset_indices in brand_trainsets.items(): |
|
|
if len(trainset_indices) > 1: |
|
|
|
|
|
min_branded = max(1, len(trainset_indices) // 2) |
|
|
model.Add(sum(is_service[i] for i in trainset_indices) >= min_branded) |
|
|
|
|
|
def _set_multi_objective(self, model, is_service: List, is_standby: List, assignment: List): |
|
|
"""Set up multi-objective optimization using weighted sum.""" |
|
|
|
|
|
|
|
|
objective_terms = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mileage_ranges = self._categorize_trainsets_by_mileage() |
|
|
for range_name, trainset_indices in mileage_ranges.items(): |
|
|
if len(trainset_indices) > 1: |
|
|
|
|
|
range_service_vars = [is_service[i] for i in trainset_indices] |
|
|
|
|
|
avg_target = (self.config.required_service_trains * len(trainset_indices)) // self.n_trainsets |
|
|
if avg_target > 0: |
|
|
deviation_var = model.NewIntVar(0, len(trainset_indices), f'dev_{range_name}') |
|
|
model.Add(sum(range_service_vars) - avg_target <= deviation_var) |
|
|
model.Add(avg_target - sum(range_service_vars) <= deviation_var) |
|
|
objective_terms.append((-10, deviation_var)) |
|
|
|
|
|
|
|
|
brand_compliance_score = 0 |
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
if trainset_id in self.evaluator.brand_map: |
|
|
|
|
|
objective_terms.append((50, is_service[i])) |
|
|
|
|
|
|
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
constraints = self.evaluator.get_trainset_constraints(trainset_id) |
|
|
if constraints.maintenance_due: |
|
|
|
|
|
objective_terms.append((20, assignment[i] == 2)) |
|
|
|
|
|
|
|
|
if objective_terms: |
|
|
model.Maximize(sum(weight * var for weight, var in objective_terms)) |
|
|
else: |
|
|
|
|
|
model.Maximize(sum(is_service)) |
|
|
|
|
|
def _categorize_trainsets_by_mileage(self) -> Dict[str, List[int]]: |
|
|
"""Categorize trainsets by mileage ranges for balancing.""" |
|
|
ranges = {'low': [], 'medium': [], 'high': []} |
|
|
|
|
|
mileages = [] |
|
|
for trainset_id in self.evaluator.trainsets: |
|
|
status = self.evaluator.status_map.get(trainset_id, {}) |
|
|
mileage = status.get('total_mileage_km', 0) |
|
|
mileages.append(mileage) |
|
|
|
|
|
if not mileages: |
|
|
return ranges |
|
|
|
|
|
|
|
|
mileages_sorted = sorted(mileages) |
|
|
q1 = mileages_sorted[len(mileages_sorted) // 4] |
|
|
q3 = mileages_sorted[3 * len(mileages_sorted) // 4] |
|
|
|
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
status = self.evaluator.status_map.get(trainset_id, {}) |
|
|
mileage = status.get('total_mileage_km', 0) |
|
|
|
|
|
if mileage <= q1: |
|
|
ranges['low'].append(i) |
|
|
elif mileage >= q3: |
|
|
ranges['high'].append(i) |
|
|
else: |
|
|
ranges['medium'].append(i) |
|
|
|
|
|
return ranges |
|
|
|
|
|
def _create_fallback_solution(self) -> OptimizationResult: |
|
|
"""Create a basic feasible solution when CP-SAT fails.""" |
|
|
print("Creating fallback solution...") |
|
|
|
|
|
|
|
|
solution = np.full(self.n_trainsets, 2) |
|
|
|
|
|
|
|
|
valid_trainsets = [] |
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
valid, _ = self.evaluator.check_hard_constraints(trainset_id) |
|
|
if valid: |
|
|
valid_trainsets.append(i) |
|
|
|
|
|
|
|
|
service_count = min(len(valid_trainsets), self.config.required_service_trains) |
|
|
for i in range(service_count): |
|
|
solution[valid_trainsets[i]] = 0 |
|
|
|
|
|
|
|
|
standby_start = service_count |
|
|
standby_count = min(len(valid_trainsets) - service_count, self.config.min_standby) |
|
|
for i in range(standby_count): |
|
|
if standby_start + i < len(valid_trainsets): |
|
|
solution[valid_trainsets[standby_start + i]] = 1 |
|
|
|
|
|
return self._build_result(solution, float('inf')) |
|
|
|
|
|
def _build_result(self, solution: np.ndarray, objective_value: float) -> OptimizationResult: |
|
|
"""Build optimization result from solution.""" |
|
|
objectives = self.evaluator.calculate_objectives(solution) |
|
|
|
|
|
service = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 0] |
|
|
standby = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 1] |
|
|
maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 2] |
|
|
|
|
|
explanations = {} |
|
|
for ts_id in service: |
|
|
valid, reason = self.evaluator.check_hard_constraints(ts_id) |
|
|
explanations[ts_id] = "β
CP-SAT optimal assignment" if valid else f"β {reason}" |
|
|
|
|
|
fitness = self.evaluator.fitness_function(solution) |
|
|
|
|
|
return OptimizationResult( |
|
|
selected_trainsets=service, |
|
|
standby_trainsets=standby, |
|
|
maintenance_trainsets=maintenance, |
|
|
objectives=objectives, |
|
|
fitness_score=fitness, |
|
|
explanation=explanations |
|
|
) |
|
|
|
|
|
|
|
|
class MIPOptimizer: |
|
|
"""Mixed Integer Programming optimizer using OR-Tools.""" |
|
|
|
|
|
def __init__(self, evaluator: TrainsetSchedulingEvaluator, config: Optional[OptimizationConfig] = None): |
|
|
if not ORTOOLS_AVAILABLE: |
|
|
raise ImportError("OR-Tools not available. Install with: pip install ortools") |
|
|
|
|
|
self.evaluator = evaluator |
|
|
self.config = config or OptimizationConfig() |
|
|
self.n_trainsets = evaluator.num_trainsets |
|
|
|
|
|
def optimize(self, time_limit_seconds: int = 300) -> OptimizationResult: |
|
|
"""Solve using Mixed Integer Programming.""" |
|
|
solver = pywraplp.Solver.CreateSolver('SCIP') |
|
|
if not solver: |
|
|
print("β SCIP solver not available, falling back to CP-SAT") |
|
|
cp_optimizer = CPSATOptimizer(self.evaluator, self.config) |
|
|
return cp_optimizer.optimize(time_limit_seconds) |
|
|
|
|
|
|
|
|
x = {} |
|
|
for i in range(self.n_trainsets): |
|
|
for j in range(3): |
|
|
x[i, j] = solver.BoolVar(f'x_{i}_{j}') |
|
|
|
|
|
|
|
|
for i in range(self.n_trainsets): |
|
|
solver.Add(sum(x[i, j] for j in range(3)) == 1) |
|
|
|
|
|
|
|
|
solver.Add(sum(x[i, 0] for i in range(self.n_trainsets)) == self.config.required_service_trains) |
|
|
|
|
|
|
|
|
solver.Add(sum(x[i, 1] for i in range(self.n_trainsets)) >= self.config.min_standby) |
|
|
|
|
|
|
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
valid, _ = self.evaluator.check_hard_constraints(trainset_id) |
|
|
if not valid: |
|
|
|
|
|
solver.Add(x[i, 2] == 1) |
|
|
|
|
|
|
|
|
objective = solver.Objective() |
|
|
|
|
|
|
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
if trainset_id in self.evaluator.brand_map: |
|
|
objective.SetCoefficient(x[i, 0], 100) |
|
|
|
|
|
|
|
|
for i, trainset_id in enumerate(self.evaluator.trainsets): |
|
|
status = self.evaluator.status_map.get(trainset_id, {}) |
|
|
mileage = status.get('total_mileage_km', 0) |
|
|
|
|
|
weight = max(1, 200000 - mileage) // 1000 |
|
|
objective.SetCoefficient(x[i, 0], weight) |
|
|
|
|
|
objective.SetMaximization() |
|
|
|
|
|
|
|
|
solver.SetTimeLimit(time_limit_seconds * 1000) |
|
|
print(f"Solving with MIP (time limit: {time_limit_seconds}s)...") |
|
|
status = solver.Solve() |
|
|
|
|
|
if status == pywraplp.Solver.OPTIMAL: |
|
|
print("β
Optimal MIP solution found!") |
|
|
elif status == pywraplp.Solver.FEASIBLE: |
|
|
print("β
Feasible MIP solution found!") |
|
|
else: |
|
|
print("β No MIP solution found!") |
|
|
return self._create_fallback_solution() |
|
|
|
|
|
|
|
|
solution = np.zeros(self.n_trainsets, dtype=int) |
|
|
for i in range(self.n_trainsets): |
|
|
for j in range(3): |
|
|
if x[i, j].solution_value() > 0.5: |
|
|
solution[i] = j |
|
|
break |
|
|
|
|
|
return self._build_result(solution, solver.Objective().Value()) |
|
|
|
|
|
def _create_fallback_solution(self) -> OptimizationResult: |
|
|
"""Create fallback solution for MIP.""" |
|
|
cp_optimizer = CPSATOptimizer(self.evaluator, self.config) |
|
|
return cp_optimizer._create_fallback_solution() |
|
|
|
|
|
def _build_result(self, solution: np.ndarray, objective_value: float) -> OptimizationResult: |
|
|
"""Build result from MIP solution.""" |
|
|
cp_optimizer = CPSATOptimizer(self.evaluator, self.config) |
|
|
return cp_optimizer._build_result(solution, objective_value) |
|
|
|
|
|
|
|
|
|
|
|
def optimize_with_ortools(data: Dict, method: str = 'cp-sat', **kwargs) -> OptimizationResult: |
|
|
"""Optimize using OR-Tools methods. |
|
|
|
|
|
Args: |
|
|
data: Metro synthetic data |
|
|
method: 'cp-sat' or 'mip' |
|
|
**kwargs: Additional parameters (time_limit_seconds, etc.) |
|
|
""" |
|
|
if not ORTOOLS_AVAILABLE: |
|
|
raise ImportError( |
|
|
"OR-Tools not available. Install with: pip install ortools\n" |
|
|
"OR-Tools provides exact optimization methods that complement the existing metaheuristics." |
|
|
) |
|
|
|
|
|
from .evaluator import TrainsetSchedulingEvaluator |
|
|
|
|
|
evaluator = TrainsetSchedulingEvaluator(data) |
|
|
config = kwargs.get('config', OptimizationConfig()) |
|
|
time_limit = kwargs.get('time_limit_seconds', 300) |
|
|
|
|
|
if method == 'cp-sat': |
|
|
optimizer = CPSATOptimizer(evaluator, config) |
|
|
return optimizer.optimize(time_limit) |
|
|
elif method == 'mip': |
|
|
optimizer = MIPOptimizer(evaluator, config) |
|
|
return optimizer.optimize(time_limit) |
|
|
else: |
|
|
raise ValueError(f"Unknown OR-Tools method: {method}. Use 'cp-sat' or 'mip'") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import json |
|
|
|
|
|
|
|
|
if not ORTOOLS_AVAILABLE: |
|
|
print("β OR-Tools not available. Install with: pip install ortools") |
|
|
exit(1) |
|
|
|
|
|
|
|
|
try: |
|
|
with open('metro_enhanced_data.json', 'r') as f: |
|
|
data = json.load(f) |
|
|
except FileNotFoundError: |
|
|
print("Please generate enhanced data first") |
|
|
exit(1) |
|
|
|
|
|
print("π§ Testing OR-Tools Optimization") |
|
|
print("=" * 50) |
|
|
|
|
|
|
|
|
print("\nTesting CP-SAT optimizer...") |
|
|
result_cpsat = optimize_with_ortools(data, 'cp-sat', time_limit_seconds=60) |
|
|
print(f"CP-SAT Result: {len(result_cpsat.selected_trainsets)} in service") |
|
|
|
|
|
|
|
|
print("\nTesting MIP optimizer...") |
|
|
result_mip = optimize_with_ortools(data, 'mip', time_limit_seconds=60) |
|
|
print(f"MIP Result: {len(result_mip.selected_trainsets)} in service") |
|
|
|
|
|
print("\nβ
OR-Tools integration successful!") |