Arpit-Bansal's picture
added ton of shit
33c9f7f
"""
OR-Tools based optimizers for trainset scheduling.
Provides exact and constraint programming solutions.
"""
import numpy as np
from typing import Dict, List, Optional, Tuple
import logging
try:
from ortools.sat.python import cp_model
from ortools.linear_solver import pywraplp
ORTOOLS_AVAILABLE = True
except ImportError:
ORTOOLS_AVAILABLE = False
cp_model = None # type: ignore
pywraplp = None # type: ignore
from .models import OptimizationResult, OptimizationConfig
from .evaluator import TrainsetSchedulingEvaluator
class CPSATOptimizer:
"""Constraint Programming optimizer using OR-Tools CP-SAT solver."""
def __init__(self, evaluator: TrainsetSchedulingEvaluator, config: Optional[OptimizationConfig] = None):
if not ORTOOLS_AVAILABLE:
raise ImportError("OR-Tools not available. Install with: pip install ortools")
self.evaluator = evaluator
self.config = config or OptimizationConfig()
self.n_trainsets = evaluator.num_trainsets
# Logging
self.logger = logging.getLogger(__name__)
def optimize(self, time_limit_seconds: int = 300) -> OptimizationResult:
"""Solve using CP-SAT with constraint programming."""
if not ORTOOLS_AVAILABLE:
raise ImportError("OR-Tools not available")
model = cp_model.CpModel()
# Decision variables: assignment[i] ∈ {0, 1, 2} for each trainset
# 0 = Service, 1 = Standby, 2 = Maintenance
assignment = []
for i in range(self.n_trainsets):
assignment.append(model.NewIntVar(0, 2, f'trainset_{i}'))
# Helper binary variables for easier constraint formulation
is_service = []
is_standby = []
is_maintenance = []
for i in range(self.n_trainsets):
is_service.append(model.NewBoolVar(f'service_{i}'))
is_standby.append(model.NewBoolVar(f'standby_{i}'))
is_maintenance.append(model.NewBoolVar(f'maintenance_{i}'))
# Link assignment variables to binary indicators
model.Add(assignment[i] == 0).OnlyEnforceIf(is_service[i])
model.Add(assignment[i] != 0).OnlyEnforceIf(is_service[i].Not())
model.Add(assignment[i] == 1).OnlyEnforceIf(is_standby[i])
model.Add(assignment[i] != 1).OnlyEnforceIf(is_standby[i].Not())
model.Add(assignment[i] == 2).OnlyEnforceIf(is_maintenance[i])
model.Add(assignment[i] != 2).OnlyEnforceIf(is_maintenance[i].Not())
# Constraint 1: Exact number of trains in service
model.Add(sum(is_service) == self.config.required_service_trains)
# Constraint 2: Minimum standby trains
model.Add(sum(is_standby) >= self.config.min_standby)
# Constraint 3: Hard constraints - trains with issues cannot be in service
for i, trainset_id in enumerate(self.evaluator.trainsets):
valid, reason = self.evaluator.check_hard_constraints(trainset_id)
if not valid:
# Force trainset to maintenance if it fails constraints
model.Add(assignment[i] == 2)
self.logger.info(f"Trainset {trainset_id} forced to maintenance: {reason}")
# Constraint 4: Branding requirements
self._add_branding_constraints(model, is_service)
# Objective: Multi-objective optimization using weighted sum
self._set_multi_objective(model, is_service, is_standby, assignment)
# Solve
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = time_limit_seconds
solver.parameters.log_search_progress = True
print(f"Solving with CP-SAT (time limit: {time_limit_seconds}s)...")
status = solver.Solve(model)
if status == cp_model.OPTIMAL:
print("βœ… Optimal solution found!")
elif status == cp_model.FEASIBLE:
print("βœ… Feasible solution found!")
else:
print("❌ No solution found!")
return self._create_fallback_solution()
# Extract solution
solution = np.array([solver.Value(assignment[i]) for i in range(self.n_trainsets)])
return self._build_result(solution, solver.ObjectiveValue())
def _add_branding_constraints(self, model, is_service: List):
"""Add branding contract constraints."""
# Group trainsets by brand and ensure minimum coverage
brand_trainsets = {}
for i, trainset_id in enumerate(self.evaluator.trainsets):
if trainset_id in self.evaluator.brand_map:
brand = self.evaluator.brand_map[trainset_id].get('brand_name', 'Unknown')
if brand not in brand_trainsets:
brand_trainsets[brand] = []
brand_trainsets[brand].append(i)
# Ensure each brand has at least some representation in service
for brand, trainset_indices in brand_trainsets.items():
if len(trainset_indices) > 1:
# At least 50% of branded trainsets should be in service if possible
min_branded = max(1, len(trainset_indices) // 2)
model.Add(sum(is_service[i] for i in trainset_indices) >= min_branded)
def _set_multi_objective(self, model, is_service: List, is_standby: List, assignment: List):
"""Set up multi-objective optimization using weighted sum."""
# Objective components
objective_terms = []
# 1. Maximize service availability (already satisfied by exact constraint)
# This is handled by the exact constraint, so no need to optimize
# 2. Minimize mileage imbalance - use auxiliary variables for quadratic terms
# Approximate mileage balance by preferring even distribution across mileage ranges
mileage_ranges = self._categorize_trainsets_by_mileage()
for range_name, trainset_indices in mileage_ranges.items():
if len(trainset_indices) > 1:
# Try to balance service assignment across mileage ranges
range_service_vars = [is_service[i] for i in trainset_indices]
# Add soft constraint to balance - minimize deviation from average
avg_target = (self.config.required_service_trains * len(trainset_indices)) // self.n_trainsets
if avg_target > 0:
deviation_var = model.NewIntVar(0, len(trainset_indices), f'dev_{range_name}')
model.Add(sum(range_service_vars) - avg_target <= deviation_var)
model.Add(avg_target - sum(range_service_vars) <= deviation_var)
objective_terms.append((-10, deviation_var)) # Minimize deviation
# 3. Maximize branding compliance
brand_compliance_score = 0
for i, trainset_id in enumerate(self.evaluator.trainsets):
if trainset_id in self.evaluator.brand_map:
# Reward putting branded trainsets in service
objective_terms.append((50, is_service[i])) # Positive weight to maximize
# 4. Minimize maintenance overhead
for i, trainset_id in enumerate(self.evaluator.trainsets):
constraints = self.evaluator.get_trainset_constraints(trainset_id)
if constraints.maintenance_due:
# Prefer putting maintenance-due trainsets in maintenance
objective_terms.append((20, assignment[i] == 2))
# Set the objective
if objective_terms:
model.Maximize(sum(weight * var for weight, var in objective_terms))
else:
# Fallback objective: maximize service assignments (though already constrained)
model.Maximize(sum(is_service))
def _categorize_trainsets_by_mileage(self) -> Dict[str, List[int]]:
"""Categorize trainsets by mileage ranges for balancing."""
ranges = {'low': [], 'medium': [], 'high': []}
mileages = []
for trainset_id in self.evaluator.trainsets:
status = self.evaluator.status_map.get(trainset_id, {})
mileage = status.get('total_mileage_km', 0)
mileages.append(mileage)
if not mileages:
return ranges
# Define ranges based on quartiles
mileages_sorted = sorted(mileages)
q1 = mileages_sorted[len(mileages_sorted) // 4]
q3 = mileages_sorted[3 * len(mileages_sorted) // 4]
for i, trainset_id in enumerate(self.evaluator.trainsets):
status = self.evaluator.status_map.get(trainset_id, {})
mileage = status.get('total_mileage_km', 0)
if mileage <= q1:
ranges['low'].append(i)
elif mileage >= q3:
ranges['high'].append(i)
else:
ranges['medium'].append(i)
return ranges
def _create_fallback_solution(self) -> OptimizationResult:
"""Create a basic feasible solution when CP-SAT fails."""
print("Creating fallback solution...")
# Simple greedy assignment
solution = np.full(self.n_trainsets, 2) # Start with all in maintenance
# Select best trainsets for service
valid_trainsets = []
for i, trainset_id in enumerate(self.evaluator.trainsets):
valid, _ = self.evaluator.check_hard_constraints(trainset_id)
if valid:
valid_trainsets.append(i)
# Assign required number to service
service_count = min(len(valid_trainsets), self.config.required_service_trains)
for i in range(service_count):
solution[valid_trainsets[i]] = 0
# Assign minimum to standby
standby_start = service_count
standby_count = min(len(valid_trainsets) - service_count, self.config.min_standby)
for i in range(standby_count):
if standby_start + i < len(valid_trainsets):
solution[valid_trainsets[standby_start + i]] = 1
return self._build_result(solution, float('inf'))
def _build_result(self, solution: np.ndarray, objective_value: float) -> OptimizationResult:
"""Build optimization result from solution."""
objectives = self.evaluator.calculate_objectives(solution)
service = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 0]
standby = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 1]
maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 2]
explanations = {}
for ts_id in service:
valid, reason = self.evaluator.check_hard_constraints(ts_id)
explanations[ts_id] = "βœ… CP-SAT optimal assignment" if valid else f"⚠ {reason}"
fitness = self.evaluator.fitness_function(solution)
return OptimizationResult(
selected_trainsets=service,
standby_trainsets=standby,
maintenance_trainsets=maintenance,
objectives=objectives,
fitness_score=fitness,
explanation=explanations
)
class MIPOptimizer:
"""Mixed Integer Programming optimizer using OR-Tools."""
def __init__(self, evaluator: TrainsetSchedulingEvaluator, config: Optional[OptimizationConfig] = None):
if not ORTOOLS_AVAILABLE:
raise ImportError("OR-Tools not available. Install with: pip install ortools")
self.evaluator = evaluator
self.config = config or OptimizationConfig()
self.n_trainsets = evaluator.num_trainsets
def optimize(self, time_limit_seconds: int = 300) -> OptimizationResult:
"""Solve using Mixed Integer Programming."""
solver = pywraplp.Solver.CreateSolver('SCIP')
if not solver:
print("❌ SCIP solver not available, falling back to CP-SAT")
cp_optimizer = CPSATOptimizer(self.evaluator, self.config)
return cp_optimizer.optimize(time_limit_seconds)
# Decision variables
x = {} # x[i,j] = 1 if trainset i is assigned to status j (0=service, 1=standby, 2=maintenance)
for i in range(self.n_trainsets):
for j in range(3):
x[i, j] = solver.BoolVar(f'x_{i}_{j}')
# Constraint: Each trainset has exactly one assignment
for i in range(self.n_trainsets):
solver.Add(sum(x[i, j] for j in range(3)) == 1)
# Constraint: Exact number in service
solver.Add(sum(x[i, 0] for i in range(self.n_trainsets)) == self.config.required_service_trains)
# Constraint: Minimum standby
solver.Add(sum(x[i, 1] for i in range(self.n_trainsets)) >= self.config.min_standby)
# Hard constraints
for i, trainset_id in enumerate(self.evaluator.trainsets):
valid, _ = self.evaluator.check_hard_constraints(trainset_id)
if not valid:
# Force to maintenance
solver.Add(x[i, 2] == 1)
# Objective: Maximize weighted sum of objectives
objective = solver.Objective()
# Branding compliance
for i, trainset_id in enumerate(self.evaluator.trainsets):
if trainset_id in self.evaluator.brand_map:
objective.SetCoefficient(x[i, 0], 100) # Reward service assignment for branded trains
# Mileage balance (simplified - prefer lower mileage in service)
for i, trainset_id in enumerate(self.evaluator.trainsets):
status = self.evaluator.status_map.get(trainset_id, {})
mileage = status.get('total_mileage_km', 0)
# Higher mileage gets lower weight (prefer lower mileage in service)
weight = max(1, 200000 - mileage) // 1000
objective.SetCoefficient(x[i, 0], weight)
objective.SetMaximization()
# Solve
solver.SetTimeLimit(time_limit_seconds * 1000) # milliseconds
print(f"Solving with MIP (time limit: {time_limit_seconds}s)...")
status = solver.Solve()
if status == pywraplp.Solver.OPTIMAL:
print("βœ… Optimal MIP solution found!")
elif status == pywraplp.Solver.FEASIBLE:
print("βœ… Feasible MIP solution found!")
else:
print("❌ No MIP solution found!")
return self._create_fallback_solution()
# Extract solution
solution = np.zeros(self.n_trainsets, dtype=int)
for i in range(self.n_trainsets):
for j in range(3):
if x[i, j].solution_value() > 0.5:
solution[i] = j
break
return self._build_result(solution, solver.Objective().Value())
def _create_fallback_solution(self) -> OptimizationResult:
"""Create fallback solution for MIP."""
cp_optimizer = CPSATOptimizer(self.evaluator, self.config)
return cp_optimizer._create_fallback_solution()
def _build_result(self, solution: np.ndarray, objective_value: float) -> OptimizationResult:
"""Build result from MIP solution."""
cp_optimizer = CPSATOptimizer(self.evaluator, self.config)
return cp_optimizer._build_result(solution, objective_value)
# Integration function
def optimize_with_ortools(data: Dict, method: str = 'cp-sat', **kwargs) -> OptimizationResult:
"""Optimize using OR-Tools methods.
Args:
data: Metro synthetic data
method: 'cp-sat' or 'mip'
**kwargs: Additional parameters (time_limit_seconds, etc.)
"""
if not ORTOOLS_AVAILABLE:
raise ImportError(
"OR-Tools not available. Install with: pip install ortools\n"
"OR-Tools provides exact optimization methods that complement the existing metaheuristics."
)
from .evaluator import TrainsetSchedulingEvaluator
evaluator = TrainsetSchedulingEvaluator(data)
config = kwargs.get('config', OptimizationConfig())
time_limit = kwargs.get('time_limit_seconds', 300)
if method == 'cp-sat':
optimizer = CPSATOptimizer(evaluator, config)
return optimizer.optimize(time_limit)
elif method == 'mip':
optimizer = MIPOptimizer(evaluator, config)
return optimizer.optimize(time_limit)
else:
raise ValueError(f"Unknown OR-Tools method: {method}. Use 'cp-sat' or 'mip'")
if __name__ == "__main__":
import json
# Test OR-Tools integration
if not ORTOOLS_AVAILABLE:
print("❌ OR-Tools not available. Install with: pip install ortools")
exit(1)
# Load test data
try:
with open('metro_enhanced_data.json', 'r') as f:
data = json.load(f)
except FileNotFoundError:
print("Please generate enhanced data first")
exit(1)
print("πŸ”§ Testing OR-Tools Optimization")
print("=" * 50)
# Test CP-SAT
print("\nTesting CP-SAT optimizer...")
result_cpsat = optimize_with_ortools(data, 'cp-sat', time_limit_seconds=60)
print(f"CP-SAT Result: {len(result_cpsat.selected_trainsets)} in service")
# Test MIP
print("\nTesting MIP optimizer...")
result_mip = optimize_with_ortools(data, 'mip', time_limit_seconds=60)
print(f"MIP Result: {len(result_mip.selected_trainsets)} in service")
print("\nβœ… OR-Tools integration successful!")