File size: 18,772 Bytes
33c9f7f
 
 
 
 
 
 
 
 
0f86bb9
33c9f7f
 
026e398
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33c9f7f
 
 
 
 
 
 
 
 
0f86bb9
 
 
 
 
33c9f7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c7ea7db
33c9f7f
c7ea7db
33c9f7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
026e398
 
 
33c9f7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
026e398
 
 
33c9f7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01795ee
 
 
 
 
 
 
 
 
 
 
 
 
33c9f7f
 
 
 
 
 
 
 
 
01795ee
33c9f7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01795ee
 
 
 
 
 
 
 
 
33c9f7f
 
 
01795ee
33c9f7f
01795ee
 
 
 
 
33c9f7f
 
0f86bb9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
"""
Trainset scheduling evaluation module.
Handles constraint checking and objective function calculation.
"""
import numpy as np
from datetime import datetime
from typing import Dict, List, Tuple, Optional

from .models import OptimizationConfig, TrainsetConstraints
from .service_blocks import ServiceBlockGenerator


# Status normalization mappings (backend format -> internal format)
CERTIFICATE_STATUS_MAP = {
    'PENDING': 'Expiring-Soon',
    'IN_PROGRESS': 'Expiring-Soon',
    'ISSUED': 'Valid',
    'EXPIRED': 'Expired',
    'SUSPENDED': 'Suspended',
    'REVOKED': 'Expired',
    'RENEWED': 'Valid',
    'CANCELLED': 'Expired',
}

COMPONENT_STATUS_MAP = {
    'EXCELLENT': 'Good',
    'GOOD': 'Good',
    'FAIR': 'Fair',
    'POOR': 'Warning',
    'CRITICAL': 'Critical',
    'FAILED': 'Critical',
}


def normalize_certificate_status(status: str) -> str:
    """Normalize certificate status to internal format."""
    return CERTIFICATE_STATUS_MAP.get(status, status)


def normalize_component_status(status: str) -> str:
    """Normalize component status to internal format."""
    return COMPONENT_STATUS_MAP.get(status, status)


class TrainsetSchedulingEvaluator:
    """Multi-objective evaluator for trainset scheduling optimization."""
    
    def __init__(self, data: Dict, config: Optional[OptimizationConfig] = None):
        self.data = data
        self.config = config or OptimizationConfig()
        self.trainsets = [ts['trainset_id'] for ts in data['trainset_status']]
        self.num_trainsets = len(self.trainsets)
        
        # Service block generator for schedule optimization
        self.block_generator = ServiceBlockGenerator()
        self.all_blocks = self.block_generator.get_all_service_blocks()
        self.num_blocks = len(self.all_blocks)
        
        # Build lookup dictionaries
        self._build_lookups()
        
    def _build_lookups(self):
        """Build fast lookup dictionaries for optimization."""
        self.status_map = {ts['trainset_id']: ts for ts in self.data['trainset_status']}
        
        # Fitness certificates by trainset and department
        self.fitness_map = {}
        for cert in self.data['fitness_certificates']:
            ts_id = cert['trainset_id']
            if ts_id not in self.fitness_map:
                self.fitness_map[ts_id] = {}
            self.fitness_map[ts_id][cert['department']] = cert
        
        # Job cards by trainset (optional - may be empty)
        self.job_map = {}
        for job in self.data.get('job_cards', []):
            ts_id = job['trainset_id']
            if ts_id not in self.job_map:
                self.job_map[ts_id] = []
            self.job_map[ts_id].append(job)
        
        # Component health by trainset
        self.health_map = {}
        for health in self.data['component_health']:
            ts_id = health['trainset_id']
            if ts_id not in self.health_map:
                self.health_map[ts_id] = []
            self.health_map[ts_id].append(health)
        
        # Branding contracts
        self.brand_map = {}
        for brand in self.data.get('branding_contracts', []):
            ts_id = brand['trainset_id']
            self.brand_map[ts_id] = brand
        
        # Maintenance schedule
        self.maint_map = {}
        for maint in self.data.get('maintenance_schedule', []):
            ts_id = maint['trainset_id']
            self.maint_map[ts_id] = maint
    
    def get_trainset_constraints(self, trainset_id: str) -> TrainsetConstraints:
        """Get all constraints for a specific trainset."""
        try:
            # Check fitness certificates
            has_valid_certs = True
            if trainset_id in self.fitness_map:
                for dept, cert in self.fitness_map[trainset_id].items():
                    # Normalize status to handle both legacy and backend formats
                    status = normalize_certificate_status(cert['status'])
                    if status in ['Expired']:
                        has_valid_certs = False
                        break
                    try:
                        expiry = datetime.fromisoformat(cert['expiry_date'])
                        if expiry < datetime.now():
                            has_valid_certs = False
                            break
                    except ValueError:
                        has_valid_certs = False
                        break
            else:
                has_valid_certs = False
            
            # Check critical jobs
            has_critical_jobs = False
            if trainset_id in self.job_map:
                for job in self.job_map[trainset_id]:
                    if job['status'] == 'Open' and job['priority'] == 'Critical':
                        has_critical_jobs = True
                        break
            
            # Check component warnings
            component_warnings = []
            if trainset_id in self.health_map:
                for health in self.health_map[trainset_id]:
                    # Normalize status to handle both legacy and backend formats
                    status = normalize_component_status(health['status'])
                    if status in ['Warning', 'Critical'] and health.get('wear_level', 0) > 90:
                        component_warnings.append(health['component'])
            
            # Check maintenance status
            maintenance_due = False
            if trainset_id in self.maint_map:
                maintenance_due = self.maint_map[trainset_id]['status'] == 'Overdue'
            
            # Get mileage and service info
            status = self.status_map.get(trainset_id, {})
            mileage = status.get('total_mileage_km', 0)
            
            # Calculate days since last service
            last_service_days = 0
            if 'last_service_date' in status:
                try:
                    last_service = datetime.fromisoformat(status['last_service_date'])
                    last_service_days = (datetime.now() - last_service).days
                except ValueError:
                    last_service_days = 999  # Unknown, assume old
            
            return TrainsetConstraints(
                has_valid_certificates=has_valid_certs,
                has_critical_jobs=has_critical_jobs,
                component_warnings=component_warnings,
                maintenance_due=maintenance_due,
                mileage=mileage,
                last_service_days=last_service_days
            )
        except Exception:
            # Return safe defaults if data is malformed
            return TrainsetConstraints(
                has_valid_certificates=False,
                has_critical_jobs=True,
                component_warnings=['Unknown'],
                maintenance_due=True,
                mileage=0,
                last_service_days=999
            )
    
    def check_hard_constraints(self, trainset_id: str) -> Tuple[bool, str]:
        """Check if trainset passes hard constraints for service."""
        constraints = self.get_trainset_constraints(trainset_id)
        
        if not constraints.has_valid_certificates:
            return False, "Invalid/expired certificates"
        
        if constraints.has_critical_jobs:
            return False, "Critical maintenance jobs pending"
        
        if constraints.component_warnings:
            return False, f"Critical component wear: {', '.join(constraints.component_warnings)}"
        
        return True, "Passes all constraints"
    
    def calculate_objectives(self, solution: np.ndarray) -> Dict[str, float]:
        """Calculate multiple objectives for a solution.
        
        Solution encoding: 0=Service, 1=Standby, 2=Maintenance
        """
        objectives = {
            'service_availability': 0.0,
            'maintenance_cost': 0.0,
            'branding_compliance': 0.0,
            'mileage_balance': 0.0,
            'constraint_penalty': 0.0
        }
        
        try:
            service_trains = []
            standby_trains = []
            maint_trains = []
            
            for idx, action in enumerate(solution):
                ts_id = self.trainsets[idx]
                if action == 0:
                    service_trains.append(ts_id)
                elif action == 1:
                    standby_trains.append(ts_id)
                else:
                    maint_trains.append(ts_id)
            
            # Objective 1: Service Availability (maximize)
            # Reward having MORE than minimum required (smooth operations)
            num_service = len(service_trains)
            if num_service < self.config.required_service_trains:
                # Heavy penalty for not meeting minimum
                objectives['constraint_penalty'] += (self.config.required_service_trains - num_service) * 200.0
                objectives['service_availability'] = (num_service / self.config.required_service_trains) * 100.0
            else:
                # Reward additional trains beyond minimum (up to 50% more for full fleet coverage)
                # This encourages smooth operations with more trains available
                bonus_trains = num_service - self.config.required_service_trains
                max_bonus = int(self.config.required_service_trains * 0.5)  # Up to 50% more
                bonus_score = min(bonus_trains / max_bonus, 1.0) * 20.0 if max_bonus > 0 else 0
                objectives['service_availability'] = 100.0 + bonus_score
            
            # Objective 2: Mileage Balance (maximize via minimizing std dev)
            mileages = [self.status_map[ts].get('total_mileage_km', 0) for ts in service_trains]
            if mileages and len(mileages) > 1:
                std_dev = float(np.std(mileages))
                objectives['mileage_balance'] = 100.0 - min(std_dev / 1000.0, 100.0)
            else:
                objectives['mileage_balance'] = 100.0
            
            # Objective 3: Branding Compliance (low priority - nice to have)
            brand_scores = []
            for ts_id in service_trains:
                if ts_id in self.brand_map:
                    contract = self.brand_map[ts_id]
                    target = contract.get('daily_target_hours', 8)
                    actual = contract.get('actual_exposure_hours', 0) / 30.0  # Daily average
                    compliance = min(actual / target, 1.0) if target > 0 else 1.0
                    brand_scores.append(compliance)
            
            objectives['branding_compliance'] = float(np.mean(brand_scores)) * 100.0 if brand_scores else 100.0
            
            # Objective 4: Maintenance Cost (minimize)
            maint_cost = 0.0
            for ts_id in service_trains:
                if ts_id in self.maint_map:
                    if self.maint_map[ts_id].get('status') == 'Overdue':
                        maint_cost += 50.0
            objectives['maintenance_cost'] = 100.0 - min(maint_cost, 100.0)
            
            # Hard constraint violations
            for ts_id in service_trains:
                valid, _ = self.check_hard_constraints(ts_id)
                if not valid:
                    objectives['constraint_penalty'] += 200.0
            
            # Standby constraint
            if len(standby_trains) < self.config.min_standby:
                objectives['constraint_penalty'] += (self.config.min_standby - len(standby_trains)) * 50.0
            
        except Exception as e:
            # Penalize heavily for any errors during evaluation
            objectives['constraint_penalty'] += 1000.0
            print(f"Error in objective calculation: {e}")
        
        return objectives
    
    def fitness_function(self, solution: np.ndarray) -> float:
        """Aggregate fitness function for minimization.
        
        Priority order (highest to lowest):
        1. Meeting minimum service trains (hard constraint)
        2. Having MORE trains for smooth operations
        3. Mileage balance across fleet
        4. Maintenance cost optimization
        5. Branding compliance (low priority, nice-to-have)
        """
        obj = self.calculate_objectives(solution)
        
        # Weighted sum (convert maximization objectives to minimization)
        # Higher weight = more important
        fitness = (
            -obj['service_availability'] * 5.0 +      # HIGHEST: Maximize trains in service
            -obj['mileage_balance'] * 1.5 +            # Medium: Fleet wear balance
            -obj['maintenance_cost'] * 1.0 +           # Medium: Avoid overdue maintenance
            -obj['branding_compliance'] * 0.2 +        # LOW: Branding is nice-to-have
            obj['constraint_penalty'] * 10.0           # CRITICAL: Hard constraints must be met
        )
        
        return fitness
    
    def evaluate_schedule_quality(self, service_trains: List[str], 
                                   block_assignments: Dict[str, List[int]]) -> Dict[str, float]:
        """Evaluate schedule quality objectives.
        
        Args:
            service_trains: List of trainset IDs in service
            block_assignments: Maps trainset_id -> list of block indices
            
        Returns:
            Dictionary with schedule quality scores
        """
        scores = {
            'headway_consistency': 0.0,
            'service_coverage': 0.0,
            'block_distribution': 0.0,
            'peak_coverage': 0.0
        }
        
        if not block_assignments:
            return scores
        
        # Flatten all assigned block indices
        all_assigned_blocks = set()
        blocks_per_train = []
        
        for ts_id, block_indices in block_assignments.items():
            all_assigned_blocks.update(block_indices)
            blocks_per_train.append(len(block_indices))
        
        # 1. Service Coverage: What % of blocks are covered?
        coverage = len(all_assigned_blocks) / self.num_blocks if self.num_blocks > 0 else 0
        scores['service_coverage'] = coverage * 100.0
        
        # 2. Peak Coverage: Are peak blocks covered?
        peak_indices = self.block_generator.get_peak_block_indices()
        covered_peak = len(all_assigned_blocks.intersection(peak_indices))
        peak_coverage = covered_peak / len(peak_indices) if peak_indices else 0
        scores['peak_coverage'] = peak_coverage * 100.0
        
        # 3. Block Distribution: Are blocks evenly distributed across trains?
        if blocks_per_train and len(blocks_per_train) > 1:
            std_dev = float(np.std(blocks_per_train))
            mean_blocks = float(np.mean(blocks_per_train))
            cv = std_dev / mean_blocks if mean_blocks > 0 else 1.0
            # Lower CV = better distribution (100 - penalty)
            scores['block_distribution'] = max(0, 100.0 - cv * 50.0)
        else:
            scores['block_distribution'] = 100.0
        
        # 4. Headway Consistency: Check departure gaps
        scores['headway_consistency'] = self._evaluate_headway_consistency(all_assigned_blocks)
        
        return scores
    
    def _evaluate_headway_consistency(self, assigned_block_indices: set) -> float:
        """Evaluate headway consistency for assigned blocks.
        
        Args:
            assigned_block_indices: Set of block indices that are covered
            
        Returns:
            Headway consistency score (0-100)
        """
        if not assigned_block_indices:
            return 0.0
        
        # Get departure times of assigned blocks
        departure_minutes = []
        for idx in assigned_block_indices:
            if idx < len(self.all_blocks):
                block = self.all_blocks[idx]
                time_str = block['departure_time']
                hour, minute = map(int, time_str.split(':'))
                departure_minutes.append(hour * 60 + minute)
        
        if len(departure_minutes) < 2:
            return 50.0  # Not enough data
        
        # Sort and calculate gaps
        departure_minutes.sort()
        gaps = []
        for i in range(1, len(departure_minutes)):
            gaps.append(departure_minutes[i] - departure_minutes[i-1])
        
        if not gaps:
            return 50.0
        
        # Calculate coefficient of variation for gaps
        mean_gap = float(np.mean(gaps))
        std_gap = float(np.std(gaps))
        
        # Lower CV = more consistent headways
        cv = std_gap / mean_gap if mean_gap > 0 else 1.0
        
        # Score: 100 for perfect consistency (CV=0), decreasing with higher CV
        score = max(0, 100.0 - cv * 100.0)
        
        return score
    
    def schedule_fitness_function(self, trainset_solution: np.ndarray, 
                                   block_solution: np.ndarray) -> float:
        """Combined fitness function for trainset and block assignment optimization.
        
        Args:
            trainset_solution: Array where trainset_solution[i] = 0/1/2 (service/standby/maint)
            block_solution: Array where block_solution[j] = trainset_index or -1 (unassigned)
            
        Returns:
            Combined fitness score (lower is better)
        """
        # First evaluate trainset selection
        base_fitness = self.fitness_function(trainset_solution)
        
        # Decode service trains
        service_train_indices = [i for i, v in enumerate(trainset_solution) if v == 0]
        service_trains = [self.trainsets[i] for i in service_train_indices]
        
        # Build block assignments
        block_assignments = {}
        for ts_idx in service_train_indices:
            ts_id = self.trainsets[ts_idx]
            block_assignments[ts_id] = []
        
        for block_idx, assigned_train_idx in enumerate(block_solution):
            if assigned_train_idx >= 0 and assigned_train_idx < len(self.trainsets):
                ts_id = self.trainsets[int(assigned_train_idx)]
                if ts_id in block_assignments:
                    block_assignments[ts_id].append(block_idx)
        
        # Evaluate schedule quality
        schedule_scores = self.evaluate_schedule_quality(service_trains, block_assignments)
        
        # Add schedule objectives to fitness
        schedule_penalty = (
            -(schedule_scores['service_coverage'] * 1.5) +     # Maximize coverage
            -(schedule_scores['peak_coverage'] * 2.0) +        # Maximize peak coverage
            -(schedule_scores['block_distribution'] * 1.0) +   # Maximize even distribution
            -(schedule_scores['headway_consistency'] * 1.0)    # Maximize consistency
        )
        
        return base_fitness + schedule_penalty