File size: 19,248 Bytes
33c9f7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
"""
OR-Tools based optimizers for trainset scheduling.
Provides exact and constraint programming solutions.
"""
import numpy as np
from typing import Dict, List, Optional, Any
import logging

from .models import OptimizationResult, OptimizationConfig
from .evaluator import TrainsetSchedulingEvaluator


def check_ortools_availability() -> bool:
    """Check if OR-Tools is available."""
    try:
        import ortools.sat.python.cp_model
        import ortools.linear_solver.pywraplp
        return True
    except ImportError:
        return False


class ORToolsOptimizer:
    """Base class for OR-Tools optimizers."""
    
    def __init__(self, evaluator: TrainsetSchedulingEvaluator, config: Optional[OptimizationConfig] = None):
        if not check_ortools_availability():
            raise ImportError(
                "OR-Tools not available. Install with: pip install ortools\n"
                "OR-Tools provides exact optimization methods that complement existing metaheuristics."
            )
            
        self.evaluator = evaluator
        self.config = config or OptimizationConfig()
        self.n_trainsets = evaluator.num_trainsets
        
        # Logging
        self.logger = logging.getLogger(__name__)


class CPSATOptimizer(ORToolsOptimizer):
    """Constraint Programming optimizer using OR-Tools CP-SAT solver."""
    
    def optimize(self, time_limit_seconds: int = 300) -> OptimizationResult:
        """Solve using CP-SAT with constraint programming."""
        # Import here to avoid issues with type checking
        from ortools.sat.python import cp_model
        
        model = cp_model.CpModel()
        
        # Decision variables: assignment[i] ∈ {0, 1, 2} for each trainset
        # 0 = Service, 1 = Standby, 2 = Maintenance
        assignment = []
        for i in range(self.n_trainsets):
            assignment.append(model.NewIntVar(0, 2, f'trainset_{i}'))
        
        # Helper binary variables for easier constraint formulation
        is_service = []
        is_standby = []
        is_maintenance = []
        
        for i in range(self.n_trainsets):
            is_service.append(model.NewBoolVar(f'service_{i}'))
            is_standby.append(model.NewBoolVar(f'standby_{i}'))
            is_maintenance.append(model.NewBoolVar(f'maintenance_{i}'))
            
            # Link assignment variables to binary indicators
            model.Add(assignment[i] == 0).OnlyEnforceIf(is_service[i])
            model.Add(assignment[i] != 0).OnlyEnforceIf(is_service[i].Not())
            model.Add(assignment[i] == 1).OnlyEnforceIf(is_standby[i])
            model.Add(assignment[i] != 1).OnlyEnforceIf(is_standby[i].Not())
            model.Add(assignment[i] == 2).OnlyEnforceIf(is_maintenance[i])
            model.Add(assignment[i] != 2).OnlyEnforceIf(is_maintenance[i].Not())
        
        # Constraint 1: Exact number of trains in service
        model.Add(sum(is_service) == self.config.required_service_trains)
        
        # Constraint 2: Minimum standby trains
        model.Add(sum(is_standby) >= self.config.min_standby)
        
        # Constraint 3: Hard constraints - trains with issues cannot be in service
        forced_maintenance = 0
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            valid, reason = self.evaluator.check_hard_constraints(trainset_id)
            if not valid:
                # Force trainset to maintenance if it fails constraints
                model.Add(assignment[i] == 2)
                forced_maintenance += 1
                self.logger.info(f"Trainset {trainset_id} forced to maintenance: {reason}")
        
        print(f"Forced {forced_maintenance} trainsets to maintenance due to constraints")
        
        # Constraint 4: Branding requirements
        self._add_branding_constraints(model, is_service)
        
        # Objective: Multi-objective optimization using weighted sum
        self._set_multi_objective(model, is_service, is_standby, assignment)
        
        # Solve
        solver = cp_model.CpSolver()
        solver.parameters.max_time_in_seconds = time_limit_seconds
        solver.parameters.log_search_progress = True
        
        print(f"Solving with CP-SAT (time limit: {time_limit_seconds}s)...")
        status = solver.Solve(model)
        
        if status == cp_model.OPTIMAL:
            print("βœ… Optimal solution found!")
        elif status == cp_model.FEASIBLE:
            print("βœ… Feasible solution found!")
        else:
            print("❌ No solution found!")
            return self._create_fallback_solution()
        
        # Extract solution
        solution = np.array([solver.Value(assignment[i]) for i in range(self.n_trainsets)])
        
        print(f"CP-SAT solution: {np.sum(solution == 0)} service, {np.sum(solution == 1)} standby, {np.sum(solution == 2)} maintenance")
        
        return self._build_result(solution, solver.ObjectiveValue())
    
    def _add_branding_constraints(self, model: Any, is_service: List[Any]):
        """Add branding contract constraints."""
        # Group trainsets by brand and ensure minimum coverage
        brand_trainsets = {}
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            if trainset_id in self.evaluator.brand_map:
                brand = self.evaluator.brand_map[trainset_id].get('brand_name', 'Unknown')
                if brand not in brand_trainsets:
                    brand_trainsets[brand] = []
                brand_trainsets[brand].append(i)
        
        # Ensure each brand has at least some representation in service
        for brand, trainset_indices in brand_trainsets.items():
            if len(trainset_indices) > 1:
                # At least 30% of branded trainsets should be in service if possible
                min_branded = max(1, len(trainset_indices) // 3)
                model.Add(sum(is_service[i] for i in trainset_indices) >= min_branded)
                print(f"Brand {brand}: {len(trainset_indices)} trainsets, requiring {min_branded} in service")
    
    def _set_multi_objective(self, model: Any, is_service: List[Any], is_standby: List[Any], assignment: List[Any]):
        """Set up multi-objective optimization using weighted sum."""
        # Import here to avoid type checking issues
        from ortools.sat.python import cp_model
        
        objective_terms = []
        
        # 1. Branding compliance - maximize service assignment for branded trainsets
        brand_score = 0
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            if trainset_id in self.evaluator.brand_map:
                # Reward putting branded trainsets in service
                objective_terms.append((50, is_service[i]))
                brand_score += 1
        
        print(f"Found {brand_score} branded trainsets for optimization")
        
        # 2. Mileage balance - prefer lower mileage trainsets in service
        mileages = []
        for trainset_id in self.evaluator.trainsets:
            status = self.evaluator.status_map.get(trainset_id, {})
            mileage = status.get('total_mileage_km', 0)
            mileages.append(mileage)
        
        if mileages:
            avg_mileage = sum(mileages) / len(mileages)
            for i, mileage in enumerate(mileages):
                # Prefer trainsets with below-average mileage
                if mileage < avg_mileage:
                    weight = int((avg_mileage - mileage) / 1000)  # Scale down
                    objective_terms.append((max(1, weight), is_service[i]))
        
        # 3. Maintenance preference - prefer trainsets needing maintenance to go to maintenance
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            constraints = self.evaluator.get_trainset_constraints(trainset_id)
            if constraints.maintenance_due:
                # Create auxiliary variable for maintenance assignment
                is_maint_var = model.NewBoolVar(f'maint_pref_{i}')
                model.Add(assignment[i] == 2).OnlyEnforceIf(is_maint_var)
                model.Add(assignment[i] != 2).OnlyEnforceIf(is_maint_var.Not())
                objective_terms.append((30, is_maint_var))
        
        # Set the objective
        if objective_terms:
            model.Maximize(sum(weight * var for weight, var in objective_terms))
            print(f"Set objective with {len(objective_terms)} terms")
        else:
            # Fallback objective: minimize assignments to maintenance (prefer service/standby)
            model.Minimize(sum(assignment[i] == 2 for i in range(self.n_trainsets)))
            print("Using fallback objective")
    
    def _create_fallback_solution(self) -> OptimizationResult:
        """Create a basic feasible solution when CP-SAT fails."""
        print("Creating fallback solution...")
        
        # Simple greedy assignment
        solution = np.full(self.n_trainsets, 2, dtype=int)  # Start with all in maintenance
        
        # Select best trainsets for service
        valid_trainsets = []
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            valid, _ = self.evaluator.check_hard_constraints(trainset_id)
            if valid:
                valid_trainsets.append(i)
        
        # Sort by mileage (prefer lower mileage)
        valid_trainsets.sort(key=lambda i: self.evaluator.status_map.get(
            self.evaluator.trainsets[i], {}
        ).get('total_mileage_km', 0))
        
        # Assign required number to service
        service_count = min(len(valid_trainsets), self.config.required_service_trains)
        for i in range(service_count):
            solution[valid_trainsets[i]] = 0
        
        # Assign minimum to standby
        standby_start = service_count
        standby_count = min(len(valid_trainsets) - service_count, self.config.min_standby)
        for i in range(standby_count):
            if standby_start + i < len(valid_trainsets):
                solution[valid_trainsets[standby_start + i]] = 1
        
        return self._build_result(solution, float('inf'))
    
    def _build_result(self, solution: np.ndarray, objective_value: float) -> OptimizationResult:
        """Build optimization result from solution."""
        objectives = self.evaluator.calculate_objectives(solution)
        
        service = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 0]
        standby = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 1]
        maintenance = [self.evaluator.trainsets[i] for i, v in enumerate(solution) if v == 2]
        
        explanations = {}
        for ts_id in service:
            valid, reason = self.evaluator.check_hard_constraints(ts_id)
            explanations[ts_id] = "βœ… CP-SAT optimal" if valid else f"⚠ {reason}"
        
        for ts_id in standby:
            explanations[ts_id] = "🟑 Standby (CP-SAT)"
        
        for ts_id in maintenance:
            explanations[ts_id] = "πŸ”§ Maintenance (CP-SAT)"
        
        fitness = self.evaluator.fitness_function(solution)
        
        return OptimizationResult(
            selected_trainsets=service,
            standby_trainsets=standby,
            maintenance_trainsets=maintenance,
            objectives=objectives,
            fitness_score=fitness,
            explanation=explanations
        )


class MIPOptimizer(ORToolsOptimizer):
    """Mixed Integer Programming optimizer using OR-Tools."""
    
    def optimize(self, time_limit_seconds: int = 300) -> OptimizationResult:
        """Solve using Mixed Integer Programming."""
        # Import here to avoid type checking issues
        from ortools.linear_solver import pywraplp
        
        solver = pywraplp.Solver.CreateSolver('SCIP')
        if not solver:
            print("❌ SCIP solver not available, falling back to CP-SAT")
            cp_optimizer = CPSATOptimizer(self.evaluator, self.config)
            return cp_optimizer.optimize(time_limit_seconds)
        
        # Decision variables
        x = {}  # x[i,j] = 1 if trainset i is assigned to status j (0=service, 1=standby, 2=maintenance)
        for i in range(self.n_trainsets):
            for j in range(3):
                x[i, j] = solver.BoolVar(f'x_{i}_{j}')
        
        # Constraint: Each trainset has exactly one assignment
        for i in range(self.n_trainsets):
            solver.Add(sum(x[i, j] for j in range(3)) == 1)
        
        # Constraint: Exact number in service
        solver.Add(sum(x[i, 0] for i in range(self.n_trainsets)) == self.config.required_service_trains)
        
        # Constraint: Minimum standby
        solver.Add(sum(x[i, 1] for i in range(self.n_trainsets)) >= self.config.min_standby)
        
        # Hard constraints
        forced_maintenance = 0
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            valid, _ = self.evaluator.check_hard_constraints(trainset_id)
            if not valid:
                # Force to maintenance
                solver.Add(x[i, 2] == 1)
                forced_maintenance += 1
        
        print(f"MIP: Forced {forced_maintenance} trainsets to maintenance")
        
        # Objective: Maximize weighted sum of objectives
        objective = solver.Objective()
        
        # Branding compliance
        brand_count = 0
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            if trainset_id in self.evaluator.brand_map:
                objective.SetCoefficient(x[i, 0], 100)  # Reward service assignment for branded trains
                brand_count += 1
        
        # Mileage balance (simplified - prefer lower mileage in service)
        for i, trainset_id in enumerate(self.evaluator.trainsets):
            status = self.evaluator.status_map.get(trainset_id, {})
            mileage = status.get('total_mileage_km', 0)
            # Higher mileage gets lower weight (prefer lower mileage in service)
            weight = max(1, 200000 - mileage) // 1000
            objective.SetCoefficient(x[i, 0], weight)
        
        objective.SetMaximization()
        print(f"MIP: Set up optimization with {brand_count} branded trainsets")
        
        # Solve
        solver.SetTimeLimit(time_limit_seconds * 1000)  # milliseconds
        print(f"Solving with MIP (time limit: {time_limit_seconds}s)...")
        status = solver.Solve()
        
        if status == pywraplp.Solver.OPTIMAL:
            print("βœ… Optimal MIP solution found!")
        elif status == pywraplp.Solver.FEASIBLE:
            print("βœ… Feasible MIP solution found!")
        else:
            print("❌ No MIP solution found, falling back to CP-SAT")
            cp_optimizer = CPSATOptimizer(self.evaluator, self.config)
            return cp_optimizer.optimize(time_limit_seconds)
        
        # Extract solution
        solution = np.zeros(self.n_trainsets, dtype=int)
        for i in range(self.n_trainsets):
            for j in range(3):
                if x[i, j].solution_value() > 0.5:
                    solution[i] = j
                    break
        
        print(f"MIP solution: {np.sum(solution == 0)} service, {np.sum(solution == 1)} standby, {np.sum(solution == 2)} maintenance")
        
        # Use CP-SAT's result builder
        cp_optimizer = CPSATOptimizer(self.evaluator, self.config)
        result = cp_optimizer._build_result(solution, solver.Objective().Value())
        
        # Update explanations for MIP
        for ts_id in result.explanation:
            if "CP-SAT" in result.explanation[ts_id]:
                result.explanation[ts_id] = result.explanation[ts_id].replace("CP-SAT", "MIP")
        
        return result


# Integration functions
def optimize_with_ortools(data: Dict, method: str = 'cp-sat', **kwargs) -> OptimizationResult:
    """Optimize using OR-Tools methods.
    
    Args:
        data: Metro synthetic data
        method: 'cp-sat' or 'mip'
        **kwargs: Additional parameters (time_limit_seconds, config, etc.)
    """
    if not check_ortools_availability():
        raise ImportError(
            "OR-Tools not available. Install with: pip install ortools\n"
            "OR-Tools provides exact optimization methods that complement the existing metaheuristics."
        )
    
    from .evaluator import TrainsetSchedulingEvaluator
    
    evaluator = TrainsetSchedulingEvaluator(data)
    config = kwargs.get('config', OptimizationConfig())
    time_limit = kwargs.get('time_limit_seconds', 300)
    
    print(f"\nπŸ”§ OR-Tools {method.upper()} Optimization")
    print("=" * 50)
    print(f"Trainsets: {evaluator.num_trainsets}")
    print(f"Required in service: {config.required_service_trains}")
    print(f"Minimum standby: {config.min_standby}")
    
    if method == 'cp-sat':
        optimizer = CPSATOptimizer(evaluator, config)
        return optimizer.optimize(time_limit)
    elif method == 'mip':
        optimizer = MIPOptimizer(evaluator, config)
        return optimizer.optimize(time_limit)
    else:
        raise ValueError(f"Unknown OR-Tools method: {method}. Use 'cp-sat' or 'mip'")


if __name__ == "__main__":
    import json
    
    # Test OR-Tools integration
    if not check_ortools_availability():
        print("❌ OR-Tools not available. Install with: pip install ortools")
        print("OR-Tools provides exact optimization that complements the existing metaheuristics.")
        exit(1)
    
    # Load test data
    try:
        with open('../DataService/metro_enhanced_data.json', 'r') as f:
            data = json.load(f)
        print("βœ… Loaded enhanced synthetic data")
    except FileNotFoundError:
        try:
            with open('../DataService/metro_synthetic_data.json', 'r') as f:
                data = json.load(f)
            print("βœ… Loaded basic synthetic data")
        except FileNotFoundError:
            print("❌ No test data found. Please generate synthetic data first.")
            exit(1)
    
    print("\nπŸ”§ Testing OR-Tools Optimization Methods")
    print("=" * 60)
    
    # Test CP-SAT
    print("\n1. Testing CP-SAT optimizer...")
    try:
        result_cpsat = optimize_with_ortools(data, 'cp-sat', time_limit_seconds=60)
        print(f"βœ… CP-SAT completed: {len(result_cpsat.selected_trainsets)} in service, "
              f"fitness = {result_cpsat.fitness_score:.2f}")
    except Exception as e:
        print(f"❌ CP-SAT failed: {e}")
    
    # Test MIP
    print("\n2. Testing MIP optimizer...")
    try:
        result_mip = optimize_with_ortools(data, 'mip', time_limit_seconds=60)
        print(f"βœ… MIP completed: {len(result_mip.selected_trainsets)} in service, "
              f"fitness = {result_mip.fitness_score:.2f}")
    except Exception as e:
        print(f"❌ MIP failed: {e}")
    
    print("\nπŸŽ‰ OR-Tools integration test completed!")
    print("\nOR-Tools provides:")
    print("β€’ Exact optimization with mathematical guarantees")
    print("β€’ Constraint satisfaction with hard constraints")
    print("β€’ Complement to existing metaheuristic approaches")
    print("β€’ Optimal solutions for smaller problem instances")