"""
Metro Train Schedule Optimizer

Generates optimal daily schedules from 5:00 AM to 11:00 PM.
Considers train health, maintenance, branding, and mileage balancing.
"""
|
|
import sys
import os

# Add the parent of this file's directory to the import path before the
# project imports below.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import random
from datetime import datetime, time, timedelta
from time import perf_counter
from typing import List, Dict, Tuple, Optional

from DataService.metro_models import (
    DaySchedule, Trainset, TrainStatus, ServiceBlock, FleetSummary,
    OptimizationMetrics, Alert, Severity, DecisionRationale,
    TrainHealthStatus, Route, OperationalHours, FitnessCertificates,
    JobCards, Branding, CertificateStatus, MaintenanceType
)
from DataService.metro_data_generator import MetroDataGenerator
|
|
|
|
|
|
|
|
class MetroScheduleOptimizer:
    """Optimize daily metro train schedules.

    On construction the optimizer asks ``MetroDataGenerator`` for per-train
    data (fitness certificates, job cards, branding, readiness score,
    mileage) and picks a stabling bay at random. ``optimize_schedule`` then
    ranks the trains and assigns each one a status for the day.
    """

    # Cumulative mileage (km) at which the mileage ranking bonus reaches zero.
    _MILEAGE_REFERENCE_KM = 200000

    def __init__(
        self,
        date: str,
        num_trains: int,
        route: Route,
        train_health: List[TrainHealthStatus],
        depot_name: str = "Muttom_Depot",
        include_job_cards: bool = False
    ):
        """Store the inputs, derive trip timings and generate train data.

        Args:
            date: Schedule date; interpolated verbatim into the schedule id
                and the ``valid_from`` / ``valid_until`` timestamps.
            num_trains: Fleet size, passed to ``MetroDataGenerator``.
            route: Route whose distance, average speed, turnaround time and
                stations drive trip timing and service blocks.
            train_health: Health records; indexed here by ``trainset_id``.
            depot_name: Depot name recorded on the generated schedule.
            include_job_cards: When False every train gets an empty
                ``JobCards`` instead of generated ones.
        """
        self.date = date
        self.num_trains = num_trains
        self.route = route
        self.train_health = {t.trainset_id: t for t in train_health}
        self.depot_name = depot_name
        self.generator = MetroDataGenerator(num_trains)
        self.include_job_cards = include_job_cards

        self.op_hours = OperationalHours()
        # One-way running time, truncated to whole minutes.
        self.one_way_time_minutes = int(
            (route.total_distance_km / route.avg_speed_kmh) * 60
        )
        # Two one-way runs plus two turnarounds.
        self.round_trip_time_minutes = (
            self.one_way_time_minutes * 2 + route.turnaround_time_minutes * 2
        )

        self.train_data = self._initialize_train_data()

    def _initialize_train_data(self) -> Dict[str, Dict]:
        """Build the per-train record used by ranking and assignment.

        Returns:
            Mapping of trainset id to a dict with the keys ``health``,
            ``fitness_certs``, ``job_cards``, ``branding``,
            ``readiness_score``, ``cumulative_km`` and ``stabling_bay``.

        Raises:
            KeyError: If no health record was supplied for one of the
                generator's trainset ids.
        """
        data = {}
        mileages = self.generator.get_realistic_mileage_distribution(self.num_trains)

        for i, train_id in enumerate(self.generator.trainset_ids):
            try:
                health = self.train_health[train_id]
            except KeyError:
                # Same exception type as before, but say which train is missing.
                raise KeyError(
                    f"No health status supplied for trainset {train_id!r}"
                ) from None

            fitness_certs = self.generator.generate_fitness_certificates(train_id)
            if self.include_job_cards:
                job_cards = self.generator.generate_job_cards(train_id)
            else:
                job_cards = JobCards(open=0, blocking=[])
            branding = self.generator.generate_branding()

            readiness = self.generator.calculate_readiness_score(
                fitness_certs, job_cards, health.component_health
            )

            data[train_id] = {
                "health": health,
                "fitness_certs": fitness_certs,
                "job_cards": job_cards,
                "branding": branding,
                "readiness_score": readiness,
                "cumulative_km": mileages[i],
                "stabling_bay": random.choice(self.generator.DEPOT_BAYS)
            }

        return data

    def _calculate_service_hours(self) -> int:
        """Return the whole hours between service start and service end.

        Both times are anchored to one calendar date, so the result cannot
        be skewed by the clock passing midnight between two separate
        ``datetime.today()`` calls.
        """
        anchor = datetime.today().date()
        start = datetime.combine(anchor, self.op_hours.start_time)
        end = datetime.combine(anchor, self.op_hours.end_time)
        return int((end - start).total_seconds() / 3600)

    def _is_train_available(
        self,
        train_id: str,
        start_hour: int,
        end_hour: int
    ) -> bool:
        """Check if a train is available for the whole given time window.

        Args:
            train_id: Key into ``self.train_data``.
            start_hour: First hour of the requested window.
            end_hour: Last hour of the requested window.

        Returns:
            True for a fully healthy train, or when a single entry of
            ``health.available_hours`` covers the entire window; False
            otherwise (including when there are no available hours).
        """
        health = self.train_data[train_id]["health"]

        if health.is_fully_healthy:
            return True

        if not health.available_hours:
            return False

        # The requested window does not change per availability entry.
        req_start = time(start_hour, 0)
        req_end = time(end_hour, 0)

        return any(
            req_start >= avail_start and req_end <= avail_end
            for avail_start, avail_end in health.available_hours
        )

    def _rank_trains_for_service(self) -> List[Tuple[str, float]]:
        """Rank trains by suitability for revenue service.

        The score adds up: 40% of the readiness score, 0.15 for a valid
        rolling-stock certificate, 0.05 for a valid signalling certificate,
        0.15 when there are no blocking job cards, 0.15 / 0.10 / 0.05 for
        CRITICAL / HIGH / MEDIUM branding exposure priority, and a mileage
        term of ``0.10 * (1 - cumulative_km / 200000)``.

        Returns:
            ``(train_id, score)`` pairs sorted by descending score.
        """
        rankings = []

        for train_id, data in self.train_data.items():
            score = 0.0

            score += data["readiness_score"] * 0.4

            certs = data["fitness_certs"]
            if certs.rolling_stock.status == CertificateStatus.VALID:
                score += 0.15
            if certs.signalling.status == CertificateStatus.VALID:
                score += 0.05

            if not data["job_cards"].blocking:
                score += 0.15

            # Kept as an equality chain: exposure_priority is compared with
            # ``==`` only, so nothing is assumed about its hashing.
            branding = data["branding"]
            if branding.exposure_priority == "CRITICAL":
                score += 0.15
            elif branding.exposure_priority == "HIGH":
                score += 0.10
            elif branding.exposure_priority == "MEDIUM":
                score += 0.05

            # Lower cumulative mileage scores higher. The term is not
            # clamped, so it turns negative above the reference mileage.
            mileage_factor = 1.0 - (
                data["cumulative_km"] / self._MILEAGE_REFERENCE_KM
            )
            score += mileage_factor * 0.10

            rankings.append((train_id, score))

        return sorted(rankings, key=lambda x: x[1], reverse=True)

    def _generate_service_blocks(
        self,
        train_id: str,
        duty_name: str,
        num_blocks: int = 2
    ) -> Tuple[List[ServiceBlock], int]:
        """Generate service blocks for a train.

        The service day is split into ``num_blocks`` equal whole-hour
        blocks. Even-numbered blocks run from the first station to the last,
        odd-numbered blocks run the other way.

        Args:
            train_id: Currently unused by this method.
            duty_name: Currently unused by this method.
            num_blocks: Number of blocks to split the service day into.

        Returns:
            ``(blocks, total_km)``. ``([], 0)`` when ``num_blocks`` is not
            positive.
        """
        if num_blocks <= 0:
            # Negative counts already produced no blocks; zero used to
            # raise ZeroDivisionError on the division below.
            return [], 0

        blocks = []
        total_km = 0

        service_hours = self._calculate_service_hours()
        block_duration_hours = service_hours // num_blocks

        first_hour = self.op_hours.start_time.hour
        last_hour = self.op_hours.end_time.hour

        # Every block has the same duration, so trips and distance are the
        # same for each block and are computed once.
        block_minutes = block_duration_hours * 60
        if self.round_trip_time_minutes > 0:
            trips = max(1, block_minutes // self.round_trip_time_minutes)
        else:
            # Avoid dividing by a zero round-trip time; at least one trip.
            trips = 1
        block_km = int(trips * self.route.total_distance_km * 2)

        for i in range(num_blocks):
            block_start_hour = first_hour + (i * block_duration_hours)
            if block_start_hour >= last_hour:
                break

            if i % 2 == 0:
                origin = self.route.stations[0].name
                destination = self.route.stations[-1].name
            else:
                origin = self.route.stations[-1].name
                destination = self.route.stations[0].name

            total_km += block_km

            # NOTE(review): block ids are random, so they are not
            # guaranteed to be unique.
            block = ServiceBlock(
                block_id=f"BLK-{random.randint(1, 999):03d}",
                departure_time=f"{block_start_hour:02d}:{random.randint(0, 45):02d}",
                origin=origin,
                destination=destination,
                trip_count=trips,
                estimated_km=block_km
            )
            blocks.append(block)

        return blocks, total_km

    def _assign_train_status(
        self,
        train_id: str,
        rank: int,
        required_service: int,
        min_standby: int
    ) -> Tuple[TrainStatus, Optional[str], List[ServiceBlock], int]:
        """Assign status and duty to a train.

        Checks run in this order:

        1. Not fully healthy and no available hours -> MAINTENANCE.
        2. Any blocking job card -> MAINTENANCE.
        3. Expired rolling-stock certificate -> MAINTENANCE.
        4. ``rank`` within ``required_service`` and available for the whole
           operating window -> REVENUE_SERVICE with generated blocks.
        5. ``rank`` within ``required_service + min_standby`` -> STANDBY.
        6. Otherwise a random pick: 5% CLEANING, 10% STANDBY,
           85% MAINTENANCE.

        NOTE(review): slots are granted by ranking position, so a
        high-ranked train sent to maintenance still uses up its position
        and fewer than ``required_service`` trains may enter service.

        Returns:
            ``(status, duty_name_or_None, service_blocks, daily_km)``.
        """
        data = self.train_data[train_id]
        health = data["health"]

        # The original branched on open/blocking job cards here, but both
        # branches returned the same result.
        if not health.is_fully_healthy and not health.available_hours:
            return TrainStatus.MAINTENANCE, None, [], 0

        if len(data["job_cards"].blocking) > 0:
            return TrainStatus.MAINTENANCE, None, [], 0

        certs = data["fitness_certs"]
        if certs.rolling_stock.status == CertificateStatus.EXPIRED:
            return TrainStatus.MAINTENANCE, None, [], 0

        if rank <= required_service and self._is_train_available(
            train_id,
            self.op_hours.start_time.hour,
            self.op_hours.end_time.hour
        ):
            # Ranks 1-10 -> DUTY-A1..A10, ranks 11-20 -> DUTY-B1..B10, ...
            duty = f"DUTY-{chr(65 + (rank-1) // 10)}{(rank-1) % 10 + 1}"
            blocks, km = self._generate_service_blocks(train_id, duty)
            return TrainStatus.REVENUE_SERVICE, duty, blocks, km

        if rank <= required_service + min_standby:
            return TrainStatus.STANDBY, None, [], 0

        roll = random.random()
        if roll < 0.05:
            return TrainStatus.CLEANING, None, [], 0
        if roll < 0.15:
            return TrainStatus.STANDBY, None, [], 0
        return TrainStatus.MAINTENANCE, None, [], 0

    def _apply_status_details(self, trainset: Trainset, status: TrainStatus) -> None:
        """Fill in the status-specific fields of ``trainset`` in place."""
        if status == TrainStatus.MAINTENANCE:
            trainset.maintenance_type = MaintenanceType.SCHEDULED_INSPECTION
            trainset.ibl_bay = random.choice(self.generator.IBL_BAYS)
            completion_time = datetime.now() + timedelta(hours=random.randint(4, 12))
            trainset.estimated_completion = completion_time.isoformat()
        elif status == TrainStatus.CLEANING:
            trainset.cleaning_bay = random.choice(self.generator.WASH_BAYS)
            trainset.cleaning_type = random.choice(["DEEP_INTERIOR", "EXTERIOR", "FULL"])
            completion_time = datetime.now() + timedelta(hours=random.randint(2, 4))
            trainset.estimated_completion = completion_time.isoformat()
            trainset.scheduled_service_start = f"{random.randint(12, 18):02d}:30"
        elif status == TrainStatus.STANDBY:
            trainset.standby_reason = random.choice([
                "MILEAGE_BALANCING", "EMERGENCY_BACKUP", "PEAK_HOUR_RESERVE"
            ])

    def _trainset_alert_codes(self, data: Dict) -> List[str]:
        """Return the short alert codes for one train's data record."""
        alerts = []
        if data["fitness_certs"].telecom.status == CertificateStatus.EXPIRING_SOON:
            alerts.append("TELECOM_CERT_EXPIRES_SOON")
        if len(data["job_cards"].blocking) > 0:
            alerts.append(f"{len(data['job_cards'].blocking)}_BLOCKING_JOB_CARDS")
        return alerts

    def _build_conflicts(self, trainsets: List[Trainset]) -> List[Alert]:
        """Return schedule-level ``Alert`` objects for the given trainsets."""
        conflicts = []
        for trainset in trainsets:
            data = self.train_data[trainset.trainset_id]

            if data["fitness_certs"].telecom.status == CertificateStatus.EXPIRING_SOON:
                conflicts.append(Alert(
                    trainset_id=trainset.trainset_id,
                    severity=Severity.MEDIUM,
                    type="CERTIFICATE_EXPIRING",
                    message="Telecom certificate expires soon"
                ))

            if len(data["job_cards"].blocking) > 0:
                conflicts.append(Alert(
                    trainset_id=trainset.trainset_id,
                    severity=Severity.HIGH,
                    type="BLOCKING_MAINTENANCE",
                    message=f"{len(data['job_cards'].blocking)} open job cards preventing service"
                ))
        return conflicts

    def optimize_schedule(
        self,
        min_service_trains: int = 20,
        min_standby: int = 2,
        max_daily_km: int = 300
    ) -> DaySchedule:
        """Generate the optimized daily schedule.

        Trains are ranked, each is given a status via
        ``_assign_train_status``, and the fleet summary, optimization
        metrics, alerts and decision rationale are assembled around them.

        Args:
            min_service_trains: Number of top ranking positions eligible
                for revenue service.
            min_standby: Number of further ranking positions reserved for
                standby.
            max_daily_km: Accepted for interface compatibility; the current
                implementation does not use it.

        Returns:
            The assembled ``DaySchedule``. An empty fleet yields zeroed
            percentages and averages instead of a division error.
        """
        # Monotonic clock: unaffected by wall-clock adjustments.
        started = perf_counter()

        rankings = self._rank_trains_for_service()

        trainsets = []
        status_counts = {
            TrainStatus.REVENUE_SERVICE: 0,
            TrainStatus.STANDBY: 0,
            TrainStatus.MAINTENANCE: 0,
            TrainStatus.CLEANING: 0
        }
        total_km = 0
        readiness_scores = []

        for rank, (train_id, _score) in enumerate(rankings, 1):
            data = self.train_data[train_id]

            status, duty, blocks, daily_km = self._assign_train_status(
                train_id, rank, min_service_trains, min_standby
            )

            status_counts[status] += 1
            total_km += daily_km
            readiness_scores.append(data["readiness_score"])

            trainset = Trainset(
                trainset_id=train_id,
                status=status,
                priority_rank=rank if status == TrainStatus.REVENUE_SERVICE else None,
                assigned_duty=duty,
                service_blocks=blocks,
                daily_km_allocation=daily_km,
                cumulative_km=data["cumulative_km"],
                stabling_bay=data["stabling_bay"] if status != TrainStatus.MAINTENANCE else None,
                fitness_certificates=data["fitness_certs"],
                job_cards=data["job_cards"],
                branding=data["branding"],
                readiness_score=data["readiness_score"],
                constraints_met=data["readiness_score"] >= 0.7
            )

            self._apply_status_details(trainset, status)
            trainset.alerts = self._trainset_alert_codes(data)

            trainsets.append(trainset)

        available = (
            status_counts[TrainStatus.REVENUE_SERVICE]
            + status_counts[TrainStatus.STANDBY]
        )
        if self.num_trains:
            availability_percent = round(available / self.num_trains * 100, 1)
        else:
            availability_percent = 0.0

        fleet_summary = FleetSummary(
            total_trainsets=self.num_trains,
            revenue_service=status_counts[TrainStatus.REVENUE_SERVICE],
            standby=status_counts[TrainStatus.STANDBY],
            maintenance=status_counts[TrainStatus.MAINTENANCE],
            cleaning=status_counts[TrainStatus.CLEANING],
            availability_percent=availability_percent
        )

        # Reported as a "variance coefficient" but computed as
        # (max - min) / mean of the cumulative mileages.
        mileages = [data["cumulative_km"] for data in self.train_data.values()]
        mean_km = sum(mileages) / len(mileages) if mileages else 0
        if mean_km:
            variance = (max(mileages) - min(mileages)) / mean_km
        else:
            variance = 0.0

        if readiness_scores:
            avg_readiness = round(sum(readiness_scores) / len(readiness_scores), 2)
        else:
            avg_readiness = 0.0

        optimization_metrics = OptimizationMetrics(
            mileage_variance_coefficient=round(variance, 3),
            avg_readiness_score=avg_readiness,
            branding_sla_compliance=1.0,
            shunting_movements_required=random.randint(5, 15),
            total_planned_km=total_km,
            fitness_expiry_violations=0
        )

        conflicts = self._build_conflicts(trainsets)

        runtime_ms = int((perf_counter() - started) * 1000)

        # NOTE(review): these reported weights are hard-coded and differ
        # from the weights applied in _rank_trains_for_service.
        rationale = DecisionRationale(
            algorithm_version="v2.5.0",
            objective_weights={
                "service_readiness": 0.35,
                "mileage_balancing": 0.25,
                "branding_priority": 0.20,
                "operational_cost": 0.20
            },
            constraint_violations=0,
            optimization_runtime_ms=runtime_ms
        )

        schedule_id = f"KMRL-{self.date}-{random.choice(['DAWN', 'ALPHA', 'PRIME'])}"
        now = datetime.now()

        schedule = DaySchedule(
            schedule_id=schedule_id,
            generated_at=now.isoformat(),
            valid_from=f"{self.date}T{self.op_hours.start_time.isoformat()}+05:30",
            valid_until=f"{self.date}T{self.op_hours.end_time.isoformat()}+05:30",
            depot=self.depot_name,
            trainsets=trainsets,
            fleet_summary=fleet_summary,
            optimization_metrics=optimization_metrics,
            conflicts_and_alerts=conflicts,
            decision_rationale=rationale
        )

        return schedule
|
|
|