Arpit-Bansal commited on
Commit
0f86bb9
·
1 Parent(s): 1b2c6dc

proper scheduling-1, better iterations set

Browse files
greedyOptim/advanced_optimizers.py CHANGED
@@ -3,7 +3,7 @@ Advanced optimization algorithms for trainset scheduling.
3
  Includes CMA-ES, Particle Swarm Optimization, and Simulated Annealing.
4
  """
5
  import numpy as np
6
- from typing import Optional
7
  import math
8
 
9
  from .models import OptimizationResult, OptimizationConfig
@@ -17,6 +17,8 @@ class CMAESOptimizer:
17
  self.evaluator = evaluator
18
  self.config = config or OptimizationConfig()
19
  self.n = evaluator.num_trainsets
 
 
20
  self.lam = self.config.population_size # Population size
21
  self.mu = self.config.population_size // 2 # Number of parents
22
 
@@ -47,12 +49,41 @@ class CMAESOptimizer:
47
  """Decode continuous values to discrete actions."""
48
  return np.clip(np.round(x), 0, 2).astype(int)
49
 
50
- def optimize(self, generations: int = 150) -> OptimizationResult:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  """Run CMA-ES optimization."""
 
 
 
 
52
  best_fitness = float('inf')
53
  best_solution: Optional[np.ndarray] = None
 
54
 
55
  print(f"Starting CMA-ES optimization for {generations} generations")
 
 
56
 
57
  for gen in range(generations):
58
  try:
@@ -68,10 +99,20 @@ class CMAESOptimizer:
68
  # Evaluate
69
  fitness = []
70
  decoded_pop = []
 
 
71
  for ind in population:
72
  decoded = self._decode(ind)
73
  decoded_pop.append(decoded)
74
- fitness.append(self.evaluator.fitness_function(decoded))
 
 
 
 
 
 
 
 
75
 
76
  fitness = np.array(fitness)
77
  decoded_pop = np.array(decoded_pop)
@@ -81,6 +122,8 @@ class CMAESOptimizer:
81
  if fitness[gen_best_idx] < best_fitness:
82
  best_fitness = fitness[gen_best_idx]
83
  best_solution = decoded_pop[gen_best_idx].copy()
 
 
84
 
85
  if gen % 30 == 0:
86
  print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
@@ -117,9 +160,10 @@ class CMAESOptimizer:
117
  if best_solution is None:
118
  raise RuntimeError("No valid solution found during CMA-ES optimization")
119
 
120
- return self._build_result(best_solution, best_fitness)
121
 
122
- def _build_result(self, solution: np.ndarray, fitness: float) -> OptimizationResult:
 
123
  """Build optimization result from solution."""
124
  objectives = self.evaluator.calculate_objectives(solution)
125
 
@@ -132,13 +176,27 @@ class CMAESOptimizer:
132
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
133
  explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
134
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135
  return OptimizationResult(
136
  selected_trainsets=service,
137
  standby_trainsets=standby,
138
  maintenance_trainsets=maintenance,
139
  objectives=objectives,
140
  fitness_score=fitness,
141
- explanation=explanations
 
142
  )
143
 
144
 
@@ -150,6 +208,8 @@ class ParticleSwarmOptimizer:
150
  self.config = config or OptimizationConfig()
151
  self.n_particles = self.config.population_size
152
  self.n_dimensions = evaluator.num_trainsets
 
 
153
 
154
  # PSO parameters
155
  self.w = 0.7 # Inertia weight
@@ -161,8 +221,25 @@ class ParticleSwarmOptimizer:
161
  """Decode continuous values to discrete actions."""
162
  return np.clip(np.round(x), 0, 2).astype(int)
163
 
164
- def optimize(self, generations: int = 200) -> OptimizationResult:
 
 
 
 
 
 
 
 
 
 
 
 
 
165
  """Run PSO optimization."""
 
 
 
 
166
  # Initialize particles
167
  positions = np.random.uniform(0, 3, (self.n_particles, self.n_dimensions))
168
  velocities = np.random.uniform(-1, 1, (self.n_particles, self.n_dimensions))
@@ -170,29 +247,41 @@ class ParticleSwarmOptimizer:
170
  # Personal best positions and fitness
171
  p_best_positions = positions.copy()
172
  p_best_fitness = np.array([float('inf')] * self.n_particles)
 
173
 
174
  # Global best
175
  g_best_position = np.zeros(self.n_dimensions)
176
  g_best_fitness = float('inf')
 
177
 
178
  print(f"Starting PSO optimization with {self.n_particles} particles for {generations} generations")
 
 
179
 
180
  for gen in range(generations):
181
  try:
182
  for i in range(self.n_particles):
183
  # Evaluate particle
184
  decoded = self._decode(positions[i])
185
- fitness = self.evaluator.fitness_function(decoded)
 
 
 
 
 
 
186
 
187
  # Update personal best
188
  if fitness < p_best_fitness[i]:
189
  p_best_fitness[i] = fitness
190
  p_best_positions[i] = positions[i].copy()
 
191
 
192
  # Update global best
193
  if fitness < g_best_fitness:
194
  g_best_fitness = fitness
195
  g_best_position = positions[i].copy()
 
196
 
197
  # Update velocities and positions
198
  for i in range(self.n_particles):
@@ -217,9 +306,10 @@ class ParticleSwarmOptimizer:
217
  break
218
 
219
  best_solution = self._decode(g_best_position)
220
- return self._build_result(best_solution, g_best_fitness)
221
 
222
- def _build_result(self, solution: np.ndarray, fitness: float) -> OptimizationResult:
 
223
  """Build optimization result from solution."""
224
  objectives = self.evaluator.calculate_objectives(solution)
225
 
@@ -232,13 +322,27 @@ class ParticleSwarmOptimizer:
232
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
233
  explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
235
  return OptimizationResult(
236
  selected_trainsets=service,
237
  standby_trainsets=standby,
238
  maintenance_trainsets=maintenance,
239
  objectives=objectives,
240
  fitness_score=fitness,
241
- explanation=explanations
 
242
  )
243
 
244
 
@@ -249,6 +353,8 @@ class SimulatedAnnealingOptimizer:
249
  self.evaluator = evaluator
250
  self.config = config or OptimizationConfig()
251
  self.n_dimensions = evaluator.num_trainsets
 
 
252
 
253
  def _get_neighbor(self, solution: np.ndarray) -> np.ndarray:
254
  """Generate a neighbor solution by randomly changing one gene."""
@@ -257,26 +363,80 @@ class SimulatedAnnealingOptimizer:
257
  neighbor[idx] = np.random.randint(0, 3)
258
  return neighbor
259
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
260
  def _temperature(self, iteration: int, max_iterations: int) -> float:
261
  """Calculate temperature using exponential cooling."""
262
  return 100.0 * (0.95 ** iteration)
263
 
264
- def optimize(self, max_iterations: int = 10000) -> OptimizationResult:
265
  """Run Simulated Annealing optimization."""
 
 
 
 
266
  # Initialize with a random solution
267
  current_solution = np.random.randint(0, 3, self.n_dimensions)
268
- current_fitness = self.evaluator.fitness_function(current_solution)
 
 
 
 
 
269
 
270
  best_solution = current_solution.copy()
 
271
  best_fitness = current_fitness
272
 
273
  print(f"Starting Simulated Annealing optimization for {max_iterations} iterations")
 
 
274
 
275
  for iteration in range(max_iterations):
276
  try:
277
  # Generate neighbor
278
  neighbor = self._get_neighbor(current_solution)
279
- neighbor_fitness = self.evaluator.fitness_function(neighbor)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
280
 
281
  # Calculate acceptance probability
282
  temperature = self._temperature(iteration, max_iterations)
@@ -285,6 +445,7 @@ class SimulatedAnnealingOptimizer:
285
  # Accept better solution
286
  current_solution = neighbor
287
  current_fitness = neighbor_fitness
 
288
  elif temperature > 0:
289
  # Accept worse solution with probability
290
  delta = neighbor_fitness - current_fitness
@@ -292,11 +453,13 @@ class SimulatedAnnealingOptimizer:
292
  if np.random.random() < probability:
293
  current_solution = neighbor
294
  current_fitness = neighbor_fitness
 
295
 
296
  # Update best solution
297
  if current_fitness < best_fitness:
298
  best_solution = current_solution.copy()
299
  best_fitness = current_fitness
 
300
 
301
  if iteration % 1000 == 0:
302
  print(f"Iteration {iteration}: Best Fitness = {best_fitness:.2f}, Temperature = {temperature:.2f}")
@@ -305,9 +468,10 @@ class SimulatedAnnealingOptimizer:
305
  print(f"Error in SA iteration {iteration}: {e}")
306
  break
307
 
308
- return self._build_result(best_solution, best_fitness)
309
 
310
- def _build_result(self, solution: np.ndarray, fitness: float) -> OptimizationResult:
 
311
  """Build optimization result from solution."""
312
  objectives = self.evaluator.calculate_objectives(solution)
313
 
@@ -320,11 +484,25 @@ class SimulatedAnnealingOptimizer:
320
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
321
  explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
322
 
 
 
 
 
 
 
 
 
 
 
 
 
 
323
  return OptimizationResult(
324
  selected_trainsets=service,
325
  standby_trainsets=standby,
326
  maintenance_trainsets=maintenance,
327
  objectives=objectives,
328
  fitness_score=fitness,
329
- explanation=explanations
 
330
  )
 
3
  Includes CMA-ES, Particle Swarm Optimization, and Simulated Annealing.
4
  """
5
  import numpy as np
6
+ from typing import Optional, Dict, List
7
  import math
8
 
9
  from .models import OptimizationResult, OptimizationConfig
 
17
  self.evaluator = evaluator
18
  self.config = config or OptimizationConfig()
19
  self.n = evaluator.num_trainsets
20
+ self.n_blocks = evaluator.num_blocks
21
+ self.optimize_blocks = self.config.optimize_block_assignment
22
  self.lam = self.config.population_size # Population size
23
  self.mu = self.config.population_size // 2 # Number of parents
24
 
 
49
  """Decode continuous values to discrete actions."""
50
  return np.clip(np.round(x), 0, 2).astype(int)
51
 
52
+ def _create_block_assignment(self, trainset_sol: np.ndarray) -> np.ndarray:
53
+ """Create optimized block assignments for a trainset solution."""
54
+ service_indices = np.where(trainset_sol == 0)[0]
55
+
56
+ if len(service_indices) == 0:
57
+ return np.full(self.n_blocks, -1, dtype=int)
58
+
59
+ # Distribute blocks evenly with some randomization
60
+ block_sol = np.zeros(self.n_blocks, dtype=int)
61
+ for i in range(self.n_blocks):
62
+ block_sol[i] = service_indices[i % len(service_indices)]
63
+
64
+ # Random shuffle to explore different assignments
65
+ np.random.shuffle(block_sol)
66
+
67
+ # Repair to ensure valid assignments
68
+ for i in range(len(block_sol)):
69
+ if block_sol[i] not in service_indices:
70
+ block_sol[i] = np.random.choice(service_indices)
71
+
72
+ return block_sol
73
+
74
+ def optimize(self, generations: Optional[int] = None) -> OptimizationResult:
75
  """Run CMA-ES optimization."""
76
+ # Use config.iterations as default if not specified
77
+ if generations is None:
78
+ generations = self.config.iterations * 15 # Scale iterations for CMA-ES
79
+
80
  best_fitness = float('inf')
81
  best_solution: Optional[np.ndarray] = None
82
+ best_block_solution: Optional[np.ndarray] = None
83
 
84
  print(f"Starting CMA-ES optimization for {generations} generations")
85
+ if self.optimize_blocks:
86
+ print(f"Optimizing block assignments for {self.n_blocks} service blocks")
87
 
88
  for gen in range(generations):
89
  try:
 
99
  # Evaluate
100
  fitness = []
101
  decoded_pop = []
102
+ block_pop = []
103
+
104
  for ind in population:
105
  decoded = self._decode(ind)
106
  decoded_pop.append(decoded)
107
+
108
+ if self.optimize_blocks:
109
+ block_sol = self._create_block_assignment(decoded)
110
+ block_pop.append(block_sol)
111
+ fit = self.evaluator.schedule_fitness_function(decoded, block_sol)
112
+ else:
113
+ fit = self.evaluator.fitness_function(decoded)
114
+
115
+ fitness.append(fit)
116
 
117
  fitness = np.array(fitness)
118
  decoded_pop = np.array(decoded_pop)
 
122
  if fitness[gen_best_idx] < best_fitness:
123
  best_fitness = fitness[gen_best_idx]
124
  best_solution = decoded_pop[gen_best_idx].copy()
125
+ if self.optimize_blocks and block_pop:
126
+ best_block_solution = block_pop[gen_best_idx].copy()
127
 
128
  if gen % 30 == 0:
129
  print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
 
160
  if best_solution is None:
161
  raise RuntimeError("No valid solution found during CMA-ES optimization")
162
 
163
+ return self._build_result(best_solution, best_fitness, best_block_solution)
164
 
165
+ def _build_result(self, solution: np.ndarray, fitness: float,
166
+ block_solution: Optional[np.ndarray] = None) -> OptimizationResult:
167
  """Build optimization result from solution."""
168
  objectives = self.evaluator.calculate_objectives(solution)
169
 
 
176
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
177
  explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
178
 
179
+ # Build block assignments
180
+ block_assignments = {}
181
+ if block_solution is not None and self.optimize_blocks:
182
+ for ts_id in service:
183
+ block_assignments[ts_id] = []
184
+
185
+ for block_idx, train_idx in enumerate(block_solution):
186
+ if 0 <= train_idx < len(self.evaluator.trainsets):
187
+ ts_id = self.evaluator.trainsets[train_idx]
188
+ if ts_id in block_assignments:
189
+ block_id = self.evaluator.all_blocks[block_idx]['block_id']
190
+ block_assignments[ts_id].append(block_id)
191
+
192
  return OptimizationResult(
193
  selected_trainsets=service,
194
  standby_trainsets=standby,
195
  maintenance_trainsets=maintenance,
196
  objectives=objectives,
197
  fitness_score=fitness,
198
+ explanation=explanations,
199
+ service_block_assignments=block_assignments
200
  )
201
 
202
 
 
208
  self.config = config or OptimizationConfig()
209
  self.n_particles = self.config.population_size
210
  self.n_dimensions = evaluator.num_trainsets
211
+ self.n_blocks = evaluator.num_blocks
212
+ self.optimize_blocks = self.config.optimize_block_assignment
213
 
214
  # PSO parameters
215
  self.w = 0.7 # Inertia weight
 
221
  """Decode continuous values to discrete actions."""
222
  return np.clip(np.round(x), 0, 2).astype(int)
223
 
224
+ def _create_block_assignment(self, trainset_sol: np.ndarray) -> np.ndarray:
225
+ """Create block assignments for a trainset solution."""
226
+ service_indices = np.where(trainset_sol == 0)[0]
227
+
228
+ if len(service_indices) == 0:
229
+ return np.full(self.n_blocks, -1, dtype=int)
230
+
231
+ block_sol = np.zeros(self.n_blocks, dtype=int)
232
+ for i in range(self.n_blocks):
233
+ block_sol[i] = service_indices[i % len(service_indices)]
234
+
235
+ return block_sol
236
+
237
+ def optimize(self, generations: Optional[int] = None) -> OptimizationResult:
238
  """Run PSO optimization."""
239
+ # Use config.iterations as default if not specified
240
+ if generations is None:
241
+ generations = self.config.iterations * 20 # Scale iterations for PSO
242
+
243
  # Initialize particles
244
  positions = np.random.uniform(0, 3, (self.n_particles, self.n_dimensions))
245
  velocities = np.random.uniform(-1, 1, (self.n_particles, self.n_dimensions))
 
247
  # Personal best positions and fitness
248
  p_best_positions = positions.copy()
249
  p_best_fitness = np.array([float('inf')] * self.n_particles)
250
+ p_best_blocks = [None] * self.n_particles
251
 
252
  # Global best
253
  g_best_position = np.zeros(self.n_dimensions)
254
  g_best_fitness = float('inf')
255
+ g_best_block = None
256
 
257
  print(f"Starting PSO optimization with {self.n_particles} particles for {generations} generations")
258
+ if self.optimize_blocks:
259
+ print(f"Optimizing block assignments for {self.n_blocks} service blocks")
260
 
261
  for gen in range(generations):
262
  try:
263
  for i in range(self.n_particles):
264
  # Evaluate particle
265
  decoded = self._decode(positions[i])
266
+
267
+ if self.optimize_blocks:
268
+ block_sol = self._create_block_assignment(decoded)
269
+ fitness = self.evaluator.schedule_fitness_function(decoded, block_sol)
270
+ else:
271
+ block_sol = None
272
+ fitness = self.evaluator.fitness_function(decoded)
273
 
274
  # Update personal best
275
  if fitness < p_best_fitness[i]:
276
  p_best_fitness[i] = fitness
277
  p_best_positions[i] = positions[i].copy()
278
+ p_best_blocks[i] = block_sol.copy() if block_sol is not None else None
279
 
280
  # Update global best
281
  if fitness < g_best_fitness:
282
  g_best_fitness = fitness
283
  g_best_position = positions[i].copy()
284
+ g_best_block = block_sol.copy() if block_sol is not None else None
285
 
286
  # Update velocities and positions
287
  for i in range(self.n_particles):
 
306
  break
307
 
308
  best_solution = self._decode(g_best_position)
309
+ return self._build_result(best_solution, g_best_fitness, g_best_block)
310
 
311
+ def _build_result(self, solution: np.ndarray, fitness: float,
312
+ block_solution: Optional[np.ndarray] = None) -> OptimizationResult:
313
  """Build optimization result from solution."""
314
  objectives = self.evaluator.calculate_objectives(solution)
315
 
 
322
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
323
  explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
324
 
325
+ # Build block assignments
326
+ block_assignments = {}
327
+ if block_solution is not None and self.optimize_blocks:
328
+ for ts_id in service:
329
+ block_assignments[ts_id] = []
330
+
331
+ for block_idx, train_idx in enumerate(block_solution):
332
+ if 0 <= train_idx < len(self.evaluator.trainsets):
333
+ ts_id = self.evaluator.trainsets[train_idx]
334
+ if ts_id in block_assignments:
335
+ block_id = self.evaluator.all_blocks[block_idx]['block_id']
336
+ block_assignments[ts_id].append(block_id)
337
+
338
  return OptimizationResult(
339
  selected_trainsets=service,
340
  standby_trainsets=standby,
341
  maintenance_trainsets=maintenance,
342
  objectives=objectives,
343
  fitness_score=fitness,
344
+ explanation=explanations,
345
+ service_block_assignments=block_assignments
346
  )
347
 
348
 
 
353
  self.evaluator = evaluator
354
  self.config = config or OptimizationConfig()
355
  self.n_dimensions = evaluator.num_trainsets
356
+ self.n_blocks = evaluator.num_blocks
357
+ self.optimize_blocks = self.config.optimize_block_assignment
358
 
359
  def _get_neighbor(self, solution: np.ndarray) -> np.ndarray:
360
  """Generate a neighbor solution by randomly changing one gene."""
 
363
  neighbor[idx] = np.random.randint(0, 3)
364
  return neighbor
365
 
366
+ def _get_block_neighbor(self, block_sol: np.ndarray, service_indices: np.ndarray) -> np.ndarray:
367
+ """Generate a neighbor block assignment."""
368
+ neighbor = block_sol.copy()
369
+
370
+ if len(service_indices) == 0:
371
+ return neighbor
372
+
373
+ # Randomly reassign a few blocks
374
+ num_changes = max(1, self.n_blocks // 20)
375
+ for _ in range(num_changes):
376
+ idx = np.random.randint(0, len(neighbor))
377
+ neighbor[idx] = np.random.choice(service_indices)
378
+
379
+ return neighbor
380
+
381
+ def _create_block_assignment(self, trainset_sol: np.ndarray) -> np.ndarray:
382
+ """Create block assignments for a trainset solution."""
383
+ service_indices = np.where(trainset_sol == 0)[0]
384
+
385
+ if len(service_indices) == 0:
386
+ return np.full(self.n_blocks, -1, dtype=int)
387
+
388
+ block_sol = np.zeros(self.n_blocks, dtype=int)
389
+ for i in range(self.n_blocks):
390
+ block_sol[i] = service_indices[i % len(service_indices)]
391
+
392
+ return block_sol
393
+
394
  def _temperature(self, iteration: int, max_iterations: int) -> float:
395
  """Calculate temperature using exponential cooling."""
396
  return 100.0 * (0.95 ** iteration)
397
 
398
+ def optimize(self, max_iterations: Optional[int] = None) -> OptimizationResult:
399
  """Run Simulated Annealing optimization."""
400
+ # Use config.iterations as default if not specified
401
+ if max_iterations is None:
402
+ max_iterations = self.config.iterations * 1000 # Scale iterations for SA
403
+
404
  # Initialize with a random solution
405
  current_solution = np.random.randint(0, 3, self.n_dimensions)
406
+ current_block_sol = self._create_block_assignment(current_solution) if self.optimize_blocks else None
407
+
408
+ if self.optimize_blocks and current_block_sol is not None:
409
+ current_fitness = self.evaluator.schedule_fitness_function(current_solution, current_block_sol)
410
+ else:
411
+ current_fitness = self.evaluator.fitness_function(current_solution)
412
 
413
  best_solution = current_solution.copy()
414
+ best_block_sol = current_block_sol.copy() if current_block_sol is not None else None
415
  best_fitness = current_fitness
416
 
417
  print(f"Starting Simulated Annealing optimization for {max_iterations} iterations")
418
+ if self.optimize_blocks:
419
+ print(f"Optimizing block assignments for {self.n_blocks} service blocks")
420
 
421
  for iteration in range(max_iterations):
422
  try:
423
  # Generate neighbor
424
  neighbor = self._get_neighbor(current_solution)
425
+
426
+ if self.optimize_blocks:
427
+ service_indices = np.where(neighbor == 0)[0]
428
+ # Sometimes create new block assignments, sometimes just modify
429
+ if np.random.random() < 0.3:
430
+ neighbor_block_sol = self._create_block_assignment(neighbor)
431
+ else:
432
+ neighbor_block_sol = self._get_block_neighbor(
433
+ current_block_sol if current_block_sol is not None else self._create_block_assignment(neighbor),
434
+ service_indices
435
+ )
436
+ neighbor_fitness = self.evaluator.schedule_fitness_function(neighbor, neighbor_block_sol)
437
+ else:
438
+ neighbor_block_sol = None
439
+ neighbor_fitness = self.evaluator.fitness_function(neighbor)
440
 
441
  # Calculate acceptance probability
442
  temperature = self._temperature(iteration, max_iterations)
 
445
  # Accept better solution
446
  current_solution = neighbor
447
  current_fitness = neighbor_fitness
448
+ current_block_sol = neighbor_block_sol
449
  elif temperature > 0:
450
  # Accept worse solution with probability
451
  delta = neighbor_fitness - current_fitness
 
453
  if np.random.random() < probability:
454
  current_solution = neighbor
455
  current_fitness = neighbor_fitness
456
+ current_block_sol = neighbor_block_sol
457
 
458
  # Update best solution
459
  if current_fitness < best_fitness:
460
  best_solution = current_solution.copy()
461
  best_fitness = current_fitness
462
+ best_block_sol = current_block_sol.copy() if current_block_sol is not None else None
463
 
464
  if iteration % 1000 == 0:
465
  print(f"Iteration {iteration}: Best Fitness = {best_fitness:.2f}, Temperature = {temperature:.2f}")
 
468
  print(f"Error in SA iteration {iteration}: {e}")
469
  break
470
 
471
+ return self._build_result(best_solution, best_fitness, best_block_sol)
472
 
473
+ def _build_result(self, solution: np.ndarray, fitness: float,
474
+ block_solution: Optional[np.ndarray] = None) -> OptimizationResult:
475
  """Build optimization result from solution."""
476
  objectives = self.evaluator.calculate_objectives(solution)
477
 
 
484
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
485
  explanations[ts_id] = "✓ Fit for service" if valid else f"⚠ {reason}"
486
 
487
+ # Build block assignments
488
+ block_assignments = {}
489
+ if block_solution is not None and self.optimize_blocks:
490
+ for ts_id in service:
491
+ block_assignments[ts_id] = []
492
+
493
+ for block_idx, train_idx in enumerate(block_solution):
494
+ if 0 <= train_idx < len(self.evaluator.trainsets):
495
+ ts_id = self.evaluator.trainsets[train_idx]
496
+ if ts_id in block_assignments:
497
+ block_id = self.evaluator.all_blocks[block_idx]['block_id']
498
+ block_assignments[ts_id].append(block_id)
499
+
500
  return OptimizationResult(
501
  selected_trainsets=service,
502
  standby_trainsets=standby,
503
  maintenance_trainsets=maintenance,
504
  objectives=objectives,
505
  fitness_score=fitness,
506
+ explanation=explanations,
507
+ service_block_assignments=block_assignments
508
  )
greedyOptim/evaluator.py CHANGED
@@ -7,6 +7,7 @@ from datetime import datetime
7
  from typing import Dict, List, Tuple, Optional
8
 
9
  from .models import OptimizationConfig, TrainsetConstraints
 
10
 
11
 
12
  # Status normalization mappings (backend format -> internal format)
@@ -50,6 +51,11 @@ class TrainsetSchedulingEvaluator:
50
  self.trainsets = [ts['trainset_id'] for ts in data['trainset_status']]
51
  self.num_trainsets = len(self.trainsets)
52
 
 
 
 
 
 
53
  # Build lookup dictionaries
54
  self._build_lookups()
55
 
@@ -276,4 +282,146 @@ class TrainsetSchedulingEvaluator:
276
  obj['constraint_penalty'] * 5.0 # Minimize (positive weight)
277
  )
278
 
279
- return fitness
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7
  from typing import Dict, List, Tuple, Optional
8
 
9
  from .models import OptimizationConfig, TrainsetConstraints
10
+ from .service_blocks import ServiceBlockGenerator
11
 
12
 
13
  # Status normalization mappings (backend format -> internal format)
 
51
  self.trainsets = [ts['trainset_id'] for ts in data['trainset_status']]
52
  self.num_trainsets = len(self.trainsets)
53
 
54
+ # Service block generator for schedule optimization
55
+ self.block_generator = ServiceBlockGenerator()
56
+ self.all_blocks = self.block_generator.get_all_service_blocks()
57
+ self.num_blocks = len(self.all_blocks)
58
+
59
  # Build lookup dictionaries
60
  self._build_lookups()
61
 
 
282
  obj['constraint_penalty'] * 5.0 # Minimize (positive weight)
283
  )
284
 
285
+ return fitness
286
+
287
+ def evaluate_schedule_quality(self, service_trains: List[str],
288
+ block_assignments: Dict[str, List[int]]) -> Dict[str, float]:
289
+ """Evaluate schedule quality objectives.
290
+
291
+ Args:
292
+ service_trains: List of trainset IDs in service
293
+ block_assignments: Maps trainset_id -> list of block indices
294
+
295
+ Returns:
296
+ Dictionary with schedule quality scores
297
+ """
298
+ scores = {
299
+ 'headway_consistency': 0.0,
300
+ 'service_coverage': 0.0,
301
+ 'block_distribution': 0.0,
302
+ 'peak_coverage': 0.0
303
+ }
304
+
305
+ if not block_assignments:
306
+ return scores
307
+
308
+ # Flatten all assigned block indices
309
+ all_assigned_blocks = set()
310
+ blocks_per_train = []
311
+
312
+ for ts_id, block_indices in block_assignments.items():
313
+ all_assigned_blocks.update(block_indices)
314
+ blocks_per_train.append(len(block_indices))
315
+
316
+ # 1. Service Coverage: What % of blocks are covered?
317
+ coverage = len(all_assigned_blocks) / self.num_blocks if self.num_blocks > 0 else 0
318
+ scores['service_coverage'] = coverage * 100.0
319
+
320
+ # 2. Peak Coverage: Are peak blocks covered?
321
+ peak_indices = self.block_generator.get_peak_block_indices()
322
+ covered_peak = len(all_assigned_blocks.intersection(peak_indices))
323
+ peak_coverage = covered_peak / len(peak_indices) if peak_indices else 0
324
+ scores['peak_coverage'] = peak_coverage * 100.0
325
+
326
+ # 3. Block Distribution: Are blocks evenly distributed across trains?
327
+ if blocks_per_train and len(blocks_per_train) > 1:
328
+ std_dev = float(np.std(blocks_per_train))
329
+ mean_blocks = float(np.mean(blocks_per_train))
330
+ cv = std_dev / mean_blocks if mean_blocks > 0 else 1.0
331
+ # Lower CV = better distribution (100 - penalty)
332
+ scores['block_distribution'] = max(0, 100.0 - cv * 50.0)
333
+ else:
334
+ scores['block_distribution'] = 100.0
335
+
336
+ # 4. Headway Consistency: Check departure gaps
337
+ scores['headway_consistency'] = self._evaluate_headway_consistency(all_assigned_blocks)
338
+
339
+ return scores
340
+
341
+ def _evaluate_headway_consistency(self, assigned_block_indices: set) -> float:
342
+ """Evaluate headway consistency for assigned blocks.
343
+
344
+ Args:
345
+ assigned_block_indices: Set of block indices that are covered
346
+
347
+ Returns:
348
+ Headway consistency score (0-100)
349
+ """
350
+ if not assigned_block_indices:
351
+ return 0.0
352
+
353
+ # Get departure times of assigned blocks
354
+ departure_minutes = []
355
+ for idx in assigned_block_indices:
356
+ if idx < len(self.all_blocks):
357
+ block = self.all_blocks[idx]
358
+ time_str = block['departure_time']
359
+ hour, minute = map(int, time_str.split(':'))
360
+ departure_minutes.append(hour * 60 + minute)
361
+
362
+ if len(departure_minutes) < 2:
363
+ return 50.0 # Not enough data
364
+
365
+ # Sort and calculate gaps
366
+ departure_minutes.sort()
367
+ gaps = []
368
+ for i in range(1, len(departure_minutes)):
369
+ gaps.append(departure_minutes[i] - departure_minutes[i-1])
370
+
371
+ if not gaps:
372
+ return 50.0
373
+
374
+ # Calculate coefficient of variation for gaps
375
+ mean_gap = float(np.mean(gaps))
376
+ std_gap = float(np.std(gaps))
377
+
378
+ # Lower CV = more consistent headways
379
+ cv = std_gap / mean_gap if mean_gap > 0 else 1.0
380
+
381
+ # Score: 100 for perfect consistency (CV=0), decreasing with higher CV
382
+ score = max(0, 100.0 - cv * 100.0)
383
+
384
+ return score
385
+
386
+ def schedule_fitness_function(self, trainset_solution: np.ndarray,
387
+ block_solution: np.ndarray) -> float:
388
+ """Combined fitness function for trainset and block assignment optimization.
389
+
390
+ Args:
391
+ trainset_solution: Array where trainset_solution[i] = 0/1/2 (service/standby/maint)
392
+ block_solution: Array where block_solution[j] = trainset_index or -1 (unassigned)
393
+
394
+ Returns:
395
+ Combined fitness score (lower is better)
396
+ """
397
+ # First evaluate trainset selection
398
+ base_fitness = self.fitness_function(trainset_solution)
399
+
400
+ # Decode service trains
401
+ service_train_indices = [i for i, v in enumerate(trainset_solution) if v == 0]
402
+ service_trains = [self.trainsets[i] for i in service_train_indices]
403
+
404
+ # Build block assignments
405
+ block_assignments = {}
406
+ for ts_idx in service_train_indices:
407
+ ts_id = self.trainsets[ts_idx]
408
+ block_assignments[ts_id] = []
409
+
410
+ for block_idx, assigned_train_idx in enumerate(block_solution):
411
+ if assigned_train_idx >= 0 and assigned_train_idx < len(self.trainsets):
412
+ ts_id = self.trainsets[int(assigned_train_idx)]
413
+ if ts_id in block_assignments:
414
+ block_assignments[ts_id].append(block_idx)
415
+
416
+ # Evaluate schedule quality
417
+ schedule_scores = self.evaluate_schedule_quality(service_trains, block_assignments)
418
+
419
+ # Add schedule objectives to fitness
420
+ schedule_penalty = (
421
+ -(schedule_scores['service_coverage'] * 1.5) + # Maximize coverage
422
+ -(schedule_scores['peak_coverage'] * 2.0) + # Maximize peak coverage
423
+ -(schedule_scores['block_distribution'] * 1.0) + # Maximize even distribution
424
+ -(schedule_scores['headway_consistency'] * 1.0) # Maximize consistency
425
+ )
426
+
427
+ return base_fitness + schedule_penalty
greedyOptim/genetic_algorithm.py CHANGED
@@ -2,7 +2,7 @@
2
  Genetic Algorithm optimizer for trainset scheduling.
3
  """
4
  import numpy as np
5
- from typing import Tuple, Optional
6
 
7
  from .models import OptimizationResult, OptimizationConfig
8
  from .evaluator import TrainsetSchedulingEvaluator
@@ -15,6 +15,8 @@ class GeneticAlgorithmOptimizer:
15
  self.evaluator = evaluator
16
  self.config = config or OptimizationConfig()
17
  self.n_genes = evaluator.num_trainsets
 
 
18
 
19
  def initialize_population(self) -> np.ndarray:
20
  """Initialize random population with smart seeding."""
@@ -26,7 +28,7 @@ class GeneticAlgorithmOptimizer:
26
 
27
  # Randomly assign required number to service, min to standby, rest to maintenance
28
  indices = np.random.permutation(self.n_genes)
29
- service_count = self.config.required_service_trains
30
  standby_count = self.config.min_standby
31
 
32
  solution[indices[:service_count]] = 0 # Service
@@ -42,6 +44,32 @@ class GeneticAlgorithmOptimizer:
42
 
43
  return np.array(population)
44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  def tournament_selection(self, population: np.ndarray,
46
  fitness: np.ndarray,
47
  tournament_size: int = 5) -> np.ndarray:
@@ -83,8 +111,9 @@ class GeneticAlgorithmOptimizer:
83
  standby_count = np.sum(repaired == 1)
84
 
85
  # If too few in service, convert some from maintenance/standby
86
- if service_count < self.config.required_service_trains:
87
- needed = self.config.required_service_trains - service_count
 
88
  candidates = np.where((repaired == 1) | (repaired == 2))[0]
89
  if len(candidates) >= needed:
90
  selected = np.random.choice(candidates, needed, replace=False)
@@ -101,35 +130,83 @@ class GeneticAlgorithmOptimizer:
101
 
102
  return repaired
103
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  def optimize(self) -> OptimizationResult:
105
  """Run genetic algorithm optimization."""
106
  population = self.initialize_population()
 
 
 
 
 
 
107
  best_fitness = float('inf')
108
  best_solution: np.ndarray = population[0].copy()
 
109
 
110
  print(f"Starting GA optimization with {self.config.population_size} individuals for {self.config.generations} generations")
 
 
111
 
112
  for gen in range(self.config.generations):
113
  try:
114
  # Evaluate fitness
115
- fitness = np.array([self.evaluator.fitness_function(ind) for ind in population])
 
 
 
 
 
 
116
 
117
  # Track best solution
118
  gen_best_idx = np.argmin(fitness)
119
  if fitness[gen_best_idx] < best_fitness:
120
  best_fitness = fitness[gen_best_idx]
121
  best_solution = population[gen_best_idx].copy()
 
 
122
 
123
  if gen % 50 == 0:
124
  print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
125
 
126
  # Create new population
127
  new_population = []
 
128
 
129
  # Elitism - keep best solutions
130
  elite_indices = np.argsort(fitness)[:self.config.elite_size]
131
  for idx in elite_indices:
132
  new_population.append(population[idx].copy())
 
 
133
 
134
  # Generate offspring through selection, crossover, and mutation
135
  while len(new_population) < self.config.population_size:
@@ -147,17 +224,49 @@ class GeneticAlgorithmOptimizer:
147
  new_population.append(child1)
148
  if len(new_population) < self.config.population_size:
149
  new_population.append(child2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
 
151
  population = np.array(new_population)
 
 
152
 
153
  except Exception as e:
154
  print(f"Error in generation {gen}: {e}")
155
  break
156
 
157
  # Build result
158
- return self._build_result(best_solution, best_fitness)
159
 
160
- def _build_result(self, solution: np.ndarray, fitness: float) -> OptimizationResult:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
161
  """Build optimization result from solution."""
162
  objectives = self.evaluator.calculate_objectives(solution)
163
 
@@ -172,11 +281,25 @@ class GeneticAlgorithmOptimizer:
172
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
173
  explanations[ts_id] = "�� Fit for service" if valid else f"⚠ {reason}"
174
 
 
 
 
 
 
 
 
 
 
 
 
 
 
175
  return OptimizationResult(
176
  selected_trainsets=service,
177
  standby_trainsets=standby,
178
  maintenance_trainsets=maintenance,
179
  objectives=objectives,
180
  fitness_score=fitness,
181
- explanation=explanations
 
182
  )
 
2
  Genetic Algorithm optimizer for trainset scheduling.
3
  """
4
  import numpy as np
5
+ from typing import Tuple, Optional, Dict, List
6
 
7
  from .models import OptimizationResult, OptimizationConfig
8
  from .evaluator import TrainsetSchedulingEvaluator
 
15
  self.evaluator = evaluator
16
  self.config = config or OptimizationConfig()
17
  self.n_genes = evaluator.num_trainsets
18
+ self.n_blocks = evaluator.num_blocks
19
+ self.optimize_blocks = self.config.optimize_block_assignment
20
 
21
  def initialize_population(self) -> np.ndarray:
22
  """Initialize random population with smart seeding."""
 
28
 
29
  # Randomly assign required number to service, min to standby, rest to maintenance
30
  indices = np.random.permutation(self.n_genes)
31
+ service_count = min(self.config.required_service_trains, self.n_genes)
32
  standby_count = self.config.min_standby
33
 
34
  solution[indices[:service_count]] = 0 # Service
 
44
 
45
  return np.array(population)
46
 
47
+ def initialize_block_population(self, trainset_population: np.ndarray) -> np.ndarray:
48
+ """Initialize block assignment population.
49
+
50
+ For each trainset solution, initialize a block assignment solution.
51
+ block_solution[i] = index of trainset assigned to block i, or -1 if unassigned.
52
+ """
53
+ block_population = []
54
+
55
+ for trainset_sol in trainset_population:
56
+ # Get service train indices
57
+ service_indices = np.where(trainset_sol == 0)[0]
58
+
59
+ if len(service_indices) == 0:
60
+ # No service trains, all blocks unassigned
61
+ block_sol = np.full(self.n_blocks, -1, dtype=int)
62
+ else:
63
+ # Distribute blocks evenly among service trains
64
+ block_sol = np.zeros(self.n_blocks, dtype=int)
65
+ for i in range(self.n_blocks):
66
+ # Assign to a random service train
67
+ block_sol[i] = service_indices[i % len(service_indices)]
68
+
69
+ block_population.append(block_sol)
70
+
71
+ return np.array(block_population)
72
+
73
  def tournament_selection(self, population: np.ndarray,
74
  fitness: np.ndarray,
75
  tournament_size: int = 5) -> np.ndarray:
 
111
  standby_count = np.sum(repaired == 1)
112
 
113
  # If too few in service, convert some from maintenance/standby
114
+ target_service = min(self.config.required_service_trains, self.n_genes - self.config.min_standby)
115
+ if service_count < target_service:
116
+ needed = target_service - service_count
117
  candidates = np.where((repaired == 1) | (repaired == 2))[0]
118
  if len(candidates) >= needed:
119
  selected = np.random.choice(candidates, needed, replace=False)
 
130
 
131
  return repaired
132
 
133
+ def repair_block_solution(self, block_sol: np.ndarray, trainset_sol: np.ndarray) -> np.ndarray:
134
+ """Repair block assignments to only assign to service trains."""
135
+ repaired = block_sol.copy()
136
+ service_indices = np.where(trainset_sol == 0)[0]
137
+
138
+ if len(service_indices) == 0:
139
+ return np.full(self.n_blocks, -1, dtype=int)
140
+
141
+ for i in range(len(repaired)):
142
+ if repaired[i] not in service_indices:
143
+ # Reassign to a random service train
144
+ repaired[i] = np.random.choice(service_indices)
145
+
146
+ return repaired
147
+
148
+ def mutate_block_solution(self, block_sol: np.ndarray, service_indices: np.ndarray) -> np.ndarray:
149
+ """Mutate block assignments."""
150
+ mutated = block_sol.copy()
151
+
152
+ if len(service_indices) == 0:
153
+ return mutated
154
+
155
+ for i in range(len(mutated)):
156
+ if np.random.random() < self.config.mutation_rate:
157
+ mutated[i] = np.random.choice(service_indices)
158
+
159
+ return mutated
160
+
161
  def optimize(self) -> OptimizationResult:
162
  """Run genetic algorithm optimization."""
163
  population = self.initialize_population()
164
+
165
+ # Initialize block population if optimizing blocks
166
+ block_population = None
167
+ if self.optimize_blocks:
168
+ block_population = self.initialize_block_population(population)
169
+
170
  best_fitness = float('inf')
171
  best_solution: np.ndarray = population[0].copy()
172
+ best_block_solution: Optional[np.ndarray] = None
173
 
174
  print(f"Starting GA optimization with {self.config.population_size} individuals for {self.config.generations} generations")
175
+ if self.optimize_blocks:
176
+ print(f"Optimizing block assignments for {self.n_blocks} service blocks")
177
 
178
  for gen in range(self.config.generations):
179
  try:
180
  # Evaluate fitness
181
+ if self.optimize_blocks and block_population is not None:
182
+ fitness = np.array([
183
+ self.evaluator.schedule_fitness_function(population[i], block_population[i])
184
+ for i in range(len(population))
185
+ ])
186
+ else:
187
+ fitness = np.array([self.evaluator.fitness_function(ind) for ind in population])
188
 
189
  # Track best solution
190
  gen_best_idx = np.argmin(fitness)
191
  if fitness[gen_best_idx] < best_fitness:
192
  best_fitness = fitness[gen_best_idx]
193
  best_solution = population[gen_best_idx].copy()
194
+ if self.optimize_blocks and block_population is not None:
195
+ best_block_solution = block_population[gen_best_idx].copy()
196
 
197
  if gen % 50 == 0:
198
  print(f"Generation {gen}: Best Fitness = {best_fitness:.2f}")
199
 
200
  # Create new population
201
  new_population = []
202
+ new_block_population = [] if self.optimize_blocks else None
203
 
204
  # Elitism - keep best solutions
205
  elite_indices = np.argsort(fitness)[:self.config.elite_size]
206
  for idx in elite_indices:
207
  new_population.append(population[idx].copy())
208
+ if self.optimize_blocks and block_population is not None:
209
+ new_block_population.append(block_population[idx].copy())
210
 
211
  # Generate offspring through selection, crossover, and mutation
212
  while len(new_population) < self.config.population_size:
 
224
  new_population.append(child1)
225
  if len(new_population) < self.config.population_size:
226
  new_population.append(child2)
227
+
228
+ # Handle block solutions
229
+ if self.optimize_blocks and new_block_population is not None:
230
+ service_indices_1 = np.where(child1 == 0)[0]
231
+ service_indices_2 = np.where(child2 == 0)[0]
232
+
233
+ # Create new block assignments for children
234
+ block_child1 = self._create_block_for_trainset(child1)
235
+ block_child1 = self.mutate_block_solution(block_child1, service_indices_1)
236
+
237
+ new_block_population.append(block_child1)
238
+
239
+ if len(new_block_population) < self.config.population_size:
240
+ block_child2 = self._create_block_for_trainset(child2)
241
+ block_child2 = self.mutate_block_solution(block_child2, service_indices_2)
242
+ new_block_population.append(block_child2)
243
 
244
  population = np.array(new_population)
245
+ if self.optimize_blocks:
246
+ block_population = np.array(new_block_population)
247
 
248
  except Exception as e:
249
  print(f"Error in generation {gen}: {e}")
250
  break
251
 
252
  # Build result
253
+ return self._build_result(best_solution, best_fitness, best_block_solution)
254
 
255
+ def _create_block_for_trainset(self, trainset_sol: np.ndarray) -> np.ndarray:
256
+ """Create block assignments for a trainset solution."""
257
+ service_indices = np.where(trainset_sol == 0)[0]
258
+
259
+ if len(service_indices) == 0:
260
+ return np.full(self.n_blocks, -1, dtype=int)
261
+
262
+ block_sol = np.zeros(self.n_blocks, dtype=int)
263
+ for i in range(self.n_blocks):
264
+ block_sol[i] = service_indices[i % len(service_indices)]
265
+
266
+ return block_sol
267
+
268
+ def _build_result(self, solution: np.ndarray, fitness: float,
269
+ block_solution: Optional[np.ndarray] = None) -> OptimizationResult:
270
  """Build optimization result from solution."""
271
  objectives = self.evaluator.calculate_objectives(solution)
272
 
 
281
  valid, reason = self.evaluator.check_hard_constraints(ts_id)
282
  explanations[ts_id] = "�� Fit for service" if valid else f"⚠ {reason}"
283
 
284
+ # Build block assignments
285
+ block_assignments = {}
286
+ if block_solution is not None and self.optimize_blocks:
287
+ for ts_id in service:
288
+ block_assignments[ts_id] = []
289
+
290
+ for block_idx, train_idx in enumerate(block_solution):
291
+ if 0 <= train_idx < len(self.evaluator.trainsets):
292
+ ts_id = self.evaluator.trainsets[train_idx]
293
+ if ts_id in block_assignments:
294
+ block_id = self.evaluator.all_blocks[block_idx]['block_id']
295
+ block_assignments[ts_id].append(block_id)
296
+
297
  return OptimizationResult(
298
  selected_trainsets=service,
299
  standby_trainsets=standby,
300
  maintenance_trainsets=maintenance,
301
  objectives=objectives,
302
  fitness_score=fitness,
303
+ explanation=explanations,
304
+ service_block_assignments=block_assignments
305
  )
greedyOptim/models.py CHANGED
@@ -197,6 +197,8 @@ class OptimizationResult:
197
  objectives: Dict[str, float]
198
  fitness_score: float
199
  explanation: Dict[str, str]
 
 
200
 
201
 
202
  @dataclass
@@ -206,9 +208,11 @@ class OptimizationConfig:
206
  min_standby: int = 2
207
  population_size: int = 100
208
  generations: int = 200
 
209
  mutation_rate: float = 0.1
210
  crossover_rate: float = 0.8
211
  elite_size: int = 5
 
212
 
213
 
214
  @dataclass
 
197
  objectives: Dict[str, float]
198
  fitness_score: float
199
  explanation: Dict[str, str]
200
+ # Block assignments: maps trainset_id -> list of block_ids
201
+ service_block_assignments: Dict[str, List[str]] = field(default_factory=dict)
202
 
203
 
204
  @dataclass
 
208
  min_standby: int = 2
209
  population_size: int = 100
210
  generations: int = 200
211
+ iterations: int = 15 # For SA, CMA-ES, PSO (configurable)
212
  mutation_rate: float = 0.1
213
  crossover_rate: float = 0.8
214
  elite_size: int = 5
215
+ optimize_block_assignment: bool = True # Enable block assignment optimization
216
 
217
 
218
  @dataclass
greedyOptim/schedule_generator.py CHANGED
@@ -94,11 +94,30 @@ class ScheduleGenerator:
94
  total_km = 0
95
  mileages = []
96
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  # Service trains
98
  num_service = len(optimization_result.selected_trainsets)
99
  for idx, ts_id in enumerate(optimization_result.selected_trainsets):
 
 
 
 
 
100
  trainset, ts_alerts, km = self._generate_service_trainset(
101
- ts_id, idx, num_service
 
 
102
  )
103
  trainsets.append(trainset)
104
  alerts.extend(ts_alerts)
@@ -163,18 +182,38 @@ class ScheduleGenerator:
163
  self,
164
  trainset_id: str,
165
  index: int,
166
- num_service: int
 
 
167
  ) -> tuple:
168
  """Generate schedule for a service trainset.
169
 
 
 
 
 
 
 
 
170
  Returns:
171
  Tuple of (ScheduleTrainset, alerts, daily_km)
172
  """
173
  ts_data = self.status_map.get(trainset_id, {})
174
  cumulative_km = ts_data.get('total_mileage_km', 0)
175
 
176
- # Generate service blocks
177
- blocks_data = self.service_block_generator.generate_service_blocks(index, num_service)
 
 
 
 
 
 
 
 
 
 
 
178
  service_blocks = [
179
  ServiceBlock(
180
  block_id=b['block_id'],
 
94
  total_km = 0
95
  mileages = []
96
 
97
+ # Check if we have optimized block assignments
98
+ has_optimized_blocks = (
99
+ optimization_result.service_block_assignments and
100
+ len(optimization_result.service_block_assignments) > 0
101
+ )
102
+
103
+ # Build block lookup for optimized assignments
104
+ block_lookup = {}
105
+ if has_optimized_blocks:
106
+ all_blocks = self.service_block_generator.get_all_service_blocks()
107
+ block_lookup = {b['block_id']: b for b in all_blocks}
108
+
109
  # Service trains
110
  num_service = len(optimization_result.selected_trainsets)
111
  for idx, ts_id in enumerate(optimization_result.selected_trainsets):
112
+ # Get optimized blocks for this trainset if available
113
+ assigned_block_ids = None
114
+ if has_optimized_blocks:
115
+ assigned_block_ids = optimization_result.service_block_assignments.get(ts_id, [])
116
+
117
  trainset, ts_alerts, km = self._generate_service_trainset(
118
+ ts_id, idx, num_service,
119
+ assigned_block_ids=assigned_block_ids,
120
+ block_lookup=block_lookup
121
  )
122
  trainsets.append(trainset)
123
  alerts.extend(ts_alerts)
 
182
  self,
183
  trainset_id: str,
184
  index: int,
185
+ num_service: int,
186
+ assigned_block_ids: list = None,
187
+ block_lookup: dict = None
188
  ) -> tuple:
189
  """Generate schedule for a service trainset.
190
 
191
+ Args:
192
+ trainset_id: ID of the trainset
193
+ index: Index in service trainsets list
194
+ num_service: Total number of service trainsets
195
+ assigned_block_ids: List of block IDs from optimizer (if using optimized blocks)
196
+ block_lookup: Dictionary mapping block_id to block data
197
+
198
  Returns:
199
  Tuple of (ScheduleTrainset, alerts, daily_km)
200
  """
201
  ts_data = self.status_map.get(trainset_id, {})
202
  cumulative_km = ts_data.get('total_mileage_km', 0)
203
 
204
+ # Use optimized blocks if available, otherwise fall back to index-based generation
205
+ if assigned_block_ids and block_lookup:
206
+ # Use optimizer-assigned blocks
207
+ blocks_data = []
208
+ for block_id in assigned_block_ids:
209
+ if block_id in block_lookup:
210
+ blocks_data.append(block_lookup[block_id])
211
+ # Sort by departure time
212
+ blocks_data.sort(key=lambda b: b['departure_time'])
213
+ else:
214
+ # Fall back to legacy index-based block generation
215
+ blocks_data = self.service_block_generator.generate_service_blocks(index, num_service)
216
+
217
  service_blocks = [
218
  ServiceBlock(
219
  block_id=b['block_id'],
greedyOptim/scheduler.py CHANGED
@@ -69,15 +69,16 @@ class TrainsetSchedulingOptimizer:
69
  result = optimizer.optimize()
70
  elif method == 'cmaes':
71
  optimizer = CMAESOptimizer(self.evaluator, self.config)
72
- generations = kwargs.get('generations', 150)
 
73
  result = optimizer.optimize(generations)
74
  elif method == 'pso':
75
  optimizer = ParticleSwarmOptimizer(self.evaluator, self.config)
76
- generations = kwargs.get('generations', 200)
77
  result = optimizer.optimize(generations)
78
  elif method == 'sa':
79
  optimizer = SimulatedAnnealingOptimizer(self.evaluator, self.config)
80
- max_iterations = kwargs.get('max_iterations', 10000)
81
  result = optimizer.optimize(max_iterations)
82
  elif method == 'nsga2':
83
  optimizer = MultiObjectiveOptimizer(self.evaluator, self.config)
 
69
  result = optimizer.optimize()
70
  elif method == 'cmaes':
71
  optimizer = CMAESOptimizer(self.evaluator, self.config)
72
+ # Use config.iterations if generations not specified in kwargs
73
+ generations = kwargs.get('generations') # None means use optimizer's default
74
  result = optimizer.optimize(generations)
75
  elif method == 'pso':
76
  optimizer = ParticleSwarmOptimizer(self.evaluator, self.config)
77
+ generations = kwargs.get('generations') # None means use optimizer's default
78
  result = optimizer.optimize(generations)
79
  elif method == 'sa':
80
  optimizer = SimulatedAnnealingOptimizer(self.evaluator, self.config)
81
+ max_iterations = kwargs.get('max_iterations') # None means use optimizer's default
82
  result = optimizer.optimize(max_iterations)
83
  elif method == 'nsga2':
84
  optimizer = MultiObjectiveOptimizer(self.evaluator, self.config)
greedyOptim/service_blocks.py CHANGED
@@ -2,8 +2,8 @@
2
  Service Block Generator
3
  Generates realistic service blocks with departure times for train schedules.
4
  """
5
- from typing import List, Dict
6
- from datetime import time
7
 
8
 
9
  class ServiceBlockGenerator:
@@ -27,6 +27,112 @@ class ServiceBlockGenerator:
27
  """Initialize service block generator."""
28
  self.round_trip_time_hours = (self.ROUTE_LENGTH_KM * 2) / self.AVG_SPEED_KMH
29
  self.round_trip_time_minutes = self.round_trip_time_hours * 60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
  def generate_service_blocks(self, train_index: int, num_service_trains: int) -> List[Dict]:
32
  """Generate service blocks for a train with staggered departures.
 
2
  Service Block Generator
3
  Generates realistic service blocks with departure times for train schedules.
4
  """
5
+ from typing import List, Dict, Tuple
6
+ from datetime import time, datetime, timedelta
7
 
8
 
9
  class ServiceBlockGenerator:
 
27
  """Initialize service block generator."""
28
  self.round_trip_time_hours = (self.ROUTE_LENGTH_KM * 2) / self.AVG_SPEED_KMH
29
  self.round_trip_time_minutes = self.round_trip_time_hours * 60
30
+ self._all_blocks_cache = None
31
+
32
+ def get_all_service_blocks(self) -> List[Dict]:
33
+ """Get all available service blocks for the day.
34
+
35
+ Pre-generates all possible service blocks that need to be assigned to trainsets.
36
+ These represent the "slots" that the optimizer will fill.
37
+
38
+ Returns:
39
+ List of all service block dictionaries with block_id, departure_time, etc.
40
+ """
41
+ if self._all_blocks_cache is not None:
42
+ return self._all_blocks_cache
43
+
44
+ all_blocks = []
45
+ block_counter = 0
46
+
47
+ # Morning peak blocks (7:00 - 10:00)
48
+ # Need departures every 6 minutes = 10 per hour = 30 blocks
49
+ for hour in [7, 8, 9]:
50
+ for minute in range(0, 60, 6):
51
+ block_counter += 1
52
+ origin = self.TERMINALS[block_counter % 2]
53
+ destination = self.TERMINALS[(block_counter + 1) % 2]
54
+ all_blocks.append({
55
+ 'block_id': f'BLK-{block_counter:03d}',
56
+ 'departure_time': f'{hour:02d}:{minute:02d}',
57
+ 'origin': origin,
58
+ 'destination': destination,
59
+ 'trip_count': 3, # ~3 hours of peak service
60
+ 'estimated_km': int(3 * self.ROUTE_LENGTH_KM * 2),
61
+ 'period': 'morning_peak',
62
+ 'is_peak': True
63
+ })
64
+
65
+ # Midday off-peak blocks (10:00 - 17:00)
66
+ # Need departures every 15 minutes = 4 per hour = 28 blocks
67
+ for hour in range(10, 17):
68
+ for minute in range(0, 60, 15):
69
+ block_counter += 1
70
+ origin = self.TERMINALS[block_counter % 2]
71
+ destination = self.TERMINALS[(block_counter + 1) % 2]
72
+ all_blocks.append({
73
+ 'block_id': f'BLK-{block_counter:03d}',
74
+ 'departure_time': f'{hour:02d}:{minute:02d}',
75
+ 'origin': origin,
76
+ 'destination': destination,
77
+ 'trip_count': 2,
78
+ 'estimated_km': int(2 * self.ROUTE_LENGTH_KM * 2),
79
+ 'period': 'midday',
80
+ 'is_peak': False
81
+ })
82
+
83
+ # Evening peak blocks (17:00 - 21:00)
84
+ # Need departures every 6 minutes = 10 per hour = 40 blocks
85
+ for hour in range(17, 21):
86
+ for minute in range(0, 60, 6):
87
+ block_counter += 1
88
+ origin = self.TERMINALS[block_counter % 2]
89
+ destination = self.TERMINALS[(block_counter + 1) % 2]
90
+ all_blocks.append({
91
+ 'block_id': f'BLK-{block_counter:03d}',
92
+ 'departure_time': f'{hour:02d}:{minute:02d}',
93
+ 'origin': origin,
94
+ 'destination': destination,
95
+ 'trip_count': 3,
96
+ 'estimated_km': int(3 * self.ROUTE_LENGTH_KM * 2),
97
+ 'period': 'evening_peak',
98
+ 'is_peak': True
99
+ })
100
+
101
+ # Late evening blocks (21:00 - 23:00)
102
+ # Need departures every 15 minutes = 4 per hour = 8 blocks
103
+ for hour in range(21, 23):
104
+ for minute in range(0, 60, 15):
105
+ block_counter += 1
106
+ origin = self.TERMINALS[block_counter % 2]
107
+ destination = self.TERMINALS[(block_counter + 1) % 2]
108
+ all_blocks.append({
109
+ 'block_id': f'BLK-{block_counter:03d}',
110
+ 'departure_time': f'{hour:02d}:{minute:02d}',
111
+ 'origin': origin,
112
+ 'destination': destination,
113
+ 'trip_count': 1,
114
+ 'estimated_km': int(1 * self.ROUTE_LENGTH_KM * 2),
115
+ 'period': 'late_evening',
116
+ 'is_peak': False
117
+ })
118
+
119
+ self._all_blocks_cache = all_blocks
120
+ return all_blocks
121
+
122
+ def get_block_count(self) -> int:
123
+ """Get total number of service blocks."""
124
+ return len(self.get_all_service_blocks())
125
+
126
+ def get_peak_block_indices(self) -> List[int]:
127
+ """Get indices of peak hour blocks."""
128
+ blocks = self.get_all_service_blocks()
129
+ return [i for i, b in enumerate(blocks) if b['is_peak']]
130
+
131
+ def get_blocks_by_ids(self, block_ids: List[str]) -> List[Dict]:
132
+ """Get blocks by their IDs."""
133
+ all_blocks = self.get_all_service_blocks()
134
+ block_map = {b['block_id']: b for b in all_blocks}
135
+ return [block_map[bid] for bid in block_ids if bid in block_map]
136
 
137
  def generate_service_blocks(self, train_index: int, num_service_trains: int) -> List[Dict]:
138
  """Generate service blocks for a train with staggered departures.