""" Metro Train Schedule Optimizer Generates optimal daily schedules from 5:00 AM to 11:00 PM Considers train health, maintenance, branding, and mileage balancing """ import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import random from datetime import datetime, time, timedelta from typing import List, Dict, Tuple, Optional from DataService.metro_models import ( DaySchedule, Trainset, TrainStatus, ServiceBlock, FleetSummary, OptimizationMetrics, Alert, Severity, DecisionRationale, TrainHealthStatus, Route, OperationalHours, FitnessCertificates, JobCards, Branding, CertificateStatus, MaintenanceType ) from DataService.metro_data_generator import MetroDataGenerator class MetroScheduleOptimizer: """Optimize daily metro train schedules""" def __init__( self, date: str, num_trains: int, route: Route, train_health: List[TrainHealthStatus], depot_name: str = "Muttom_Depot", include_job_cards: bool = False ): self.date = date self.num_trains = num_trains self.route = route self.train_health = {t.trainset_id: t for t in train_health} self.depot_name = depot_name self.generator = MetroDataGenerator(num_trains) self.include_job_cards = include_job_cards # Operating parameters self.op_hours = OperationalHours() self.one_way_time_minutes = int( (route.total_distance_km / route.avg_speed_kmh) * 60 ) self.round_trip_time_minutes = ( self.one_way_time_minutes * 2 + route.turnaround_time_minutes * 2 ) # Pre-generate train data self.train_data = self._initialize_train_data() def _initialize_train_data(self) -> Dict[str, Dict]: """Initialize all train-specific data""" data = {} mileages = self.generator.get_realistic_mileage_distribution(self.num_trains) for i, train_id in enumerate(self.generator.trainset_ids): health = self.train_health[train_id] fitness_certs = self.generator.generate_fitness_certificates(train_id) job_cards = self.generator.generate_job_cards(train_id) if self.include_job_cards else JobCards(open=0, blocking=[]) branding = self.generator.generate_branding() readiness = self.generator.calculate_readiness_score( fitness_certs, job_cards, health.component_health ) data[train_id] = { "health": health, "fitness_certs": fitness_certs, "job_cards": job_cards, "branding": branding, "readiness_score": readiness, "cumulative_km": mileages[i], "stabling_bay": random.choice(self.generator.DEPOT_BAYS) } return data def _calculate_service_hours(self) -> int: """Calculate total service hours in a day""" start = datetime.combine(datetime.today(), self.op_hours.start_time) end = datetime.combine(datetime.today(), self.op_hours.end_time) return int((end - start).total_seconds() / 3600) def _is_train_available( self, train_id: str, start_hour: int, end_hour: int ) -> bool: """Check if train is available for given time window""" health = self.train_data[train_id]["health"] if health.is_fully_healthy: return True if not health.available_hours: return False # Check if requested window overlaps with available hours for avail_start, avail_end in health.available_hours: req_start = time(start_hour, 0) req_end = time(end_hour, 0) if req_start >= avail_start and req_end <= avail_end: return True return False def _rank_trains_for_service(self) -> List[Tuple[str, float]]: """Rank trains by suitability for revenue service""" rankings = [] for train_id, data in self.train_data.items(): score = 0.0 # Base readiness score (40% weight) score += data["readiness_score"] * 0.4 # Certificate validity (20% weight) certs = data["fitness_certs"] if certs.rolling_stock.status == CertificateStatus.VALID: score += 0.15 if certs.signalling.status == CertificateStatus.VALID: score += 0.05 # No blocking job cards (15% weight) if len(data["job_cards"].blocking) == 0: score += 0.15 # Branding priority (15% weight) branding = data["branding"] if branding.exposure_priority == "CRITICAL": score += 0.15 elif branding.exposure_priority == "HIGH": score += 0.10 elif branding.exposure_priority == "MEDIUM": score += 0.05 # Mileage balancing (10% weight) - prefer lower mileage max_mileage = 200000 mileage_factor = 1.0 - (data["cumulative_km"] / max_mileage) score += mileage_factor * 0.10 rankings.append((train_id, score)) return sorted(rankings, key=lambda x: x[1], reverse=True) def _generate_service_blocks( self, train_id: str, duty_name: str, num_blocks: int = 2 ) -> Tuple[List[ServiceBlock], int]: """Generate service blocks for a train""" blocks = [] total_km = 0 # Distribute service across the day service_hours = self._calculate_service_hours() block_duration_hours = service_hours // num_blocks current_hour = self.op_hours.start_time.hour for i in range(num_blocks): block_start_hour = current_hour + (i * block_duration_hours) if block_start_hour >= self.op_hours.end_time.hour: break # Calculate trips for this block block_minutes = block_duration_hours * 60 trips = max(1, block_minutes // self.round_trip_time_minutes) # Alternate origin/destination if i % 2 == 0: origin = self.route.stations[0].name destination = self.route.stations[-1].name else: origin = self.route.stations[-1].name destination = self.route.stations[0].name block_km = int(trips * self.route.total_distance_km * 2) # Round trips total_km += block_km block = ServiceBlock( block_id=f"BLK-{random.randint(1, 999):03d}", departure_time=f"{block_start_hour:02d}:{random.randint(0, 45):02d}", origin=origin, destination=destination, trip_count=trips, estimated_km=block_km ) blocks.append(block) return blocks, total_km def _assign_train_status( self, train_id: str, rank: int, required_service: int, min_standby: int ) -> Tuple[TrainStatus, Optional[str], List[ServiceBlock], int]: """Assign status and duty to a train""" data = self.train_data[train_id] health = data["health"] # Check if train is unavailable if not health.is_fully_healthy and not health.available_hours: # Determine maintenance or out of service if data["job_cards"].open > 0 or len(data["job_cards"].blocking) > 0: return TrainStatus.MAINTENANCE, None, [], 0 else: return TrainStatus.MAINTENANCE, None, [], 0 # Check for blocking maintenance if len(data["job_cards"].blocking) > 0: return TrainStatus.MAINTENANCE, None, [], 0 # Check for expired certificates certs = data["fitness_certs"] if certs.rolling_stock.status == CertificateStatus.EXPIRED: return TrainStatus.MAINTENANCE, None, [], 0 # Assign to revenue service if rank <= required_service: # Check availability for full day if self._is_train_available( train_id, self.op_hours.start_time.hour, self.op_hours.end_time.hour ): duty = f"DUTY-{chr(65 + (rank-1) // 10)}{(rank-1) % 10 + 1}" blocks, km = self._generate_service_blocks(train_id, duty) return TrainStatus.REVENUE_SERVICE, duty, blocks, km # Assign to standby if rank <= required_service + min_standby: return TrainStatus.STANDBY, None, [], 0 # Random assignment of remaining trains roll = random.random() if roll < 0.05: return TrainStatus.CLEANING, None, [], 0 elif roll < 0.15: return TrainStatus.STANDBY, None, [], 0 else: return TrainStatus.MAINTENANCE, None, [], 0 def optimize_schedule( self, min_service_trains: int = 20, min_standby: int = 2, max_daily_km: int = 300 ) -> DaySchedule: """Generate optimized daily schedule""" start_time = datetime.now() # Rank trains rankings = self._rank_trains_for_service() # Build trainset list trainsets = [] status_counts = { TrainStatus.REVENUE_SERVICE: 0, TrainStatus.STANDBY: 0, TrainStatus.MAINTENANCE: 0, TrainStatus.CLEANING: 0 } total_km = 0 readiness_scores = [] for rank, (train_id, score) in enumerate(rankings, 1): data = self.train_data[train_id] # Assign status and blocks status, duty, blocks, daily_km = self._assign_train_status( train_id, rank, min_service_trains, min_standby ) status_counts[status] += 1 total_km += daily_km readiness_scores.append(data["readiness_score"]) # Build trainset object trainset = Trainset( trainset_id=train_id, status=status, priority_rank=rank if status == TrainStatus.REVENUE_SERVICE else None, assigned_duty=duty, service_blocks=blocks, daily_km_allocation=daily_km, cumulative_km=data["cumulative_km"], stabling_bay=data["stabling_bay"] if status != TrainStatus.MAINTENANCE else None, fitness_certificates=data["fitness_certs"], job_cards=data["job_cards"], branding=data["branding"], readiness_score=data["readiness_score"], constraints_met=data["readiness_score"] >= 0.7 ) # Add status-specific fields if status == TrainStatus.MAINTENANCE: trainset.maintenance_type = MaintenanceType.SCHEDULED_INSPECTION trainset.ibl_bay = random.choice(self.generator.IBL_BAYS) completion_time = datetime.now() + timedelta(hours=random.randint(4, 12)) trainset.estimated_completion = completion_time.isoformat() elif status == TrainStatus.CLEANING: trainset.cleaning_bay = random.choice(self.generator.WASH_BAYS) trainset.cleaning_type = random.choice(["DEEP_INTERIOR", "EXTERIOR", "FULL"]) completion_time = datetime.now() + timedelta(hours=random.randint(2, 4)) trainset.estimated_completion = completion_time.isoformat() trainset.scheduled_service_start = f"{random.randint(12, 18):02d}:30" elif status == TrainStatus.STANDBY: trainset.standby_reason = random.choice([ "MILEAGE_BALANCING", "EMERGENCY_BACKUP", "PEAK_HOUR_RESERVE" ]) # Generate alerts alerts = [] if data["fitness_certs"].telecom.status == CertificateStatus.EXPIRING_SOON: alerts.append("TELECOM_CERT_EXPIRES_SOON") if len(data["job_cards"].blocking) > 0: alerts.append(f"{len(data['job_cards'].blocking)}_BLOCKING_JOB_CARDS") trainset.alerts = alerts trainsets.append(trainset) # Build fleet summary fleet_summary = FleetSummary( total_trainsets=self.num_trains, revenue_service=status_counts[TrainStatus.REVENUE_SERVICE], standby=status_counts[TrainStatus.STANDBY], maintenance=status_counts[TrainStatus.MAINTENANCE], cleaning=status_counts[TrainStatus.CLEANING], availability_percent=round( (status_counts[TrainStatus.REVENUE_SERVICE] + status_counts[TrainStatus.STANDBY]) / self.num_trains * 100, 1 ) ) # Calculate optimization metrics mileages = [data["cumulative_km"] for data in self.train_data.values()] variance = (max(mileages) - min(mileages)) / (sum(mileages) / len(mileages)) optimization_metrics = OptimizationMetrics( mileage_variance_coefficient=round(variance, 3), avg_readiness_score=round(sum(readiness_scores) / len(readiness_scores), 2), branding_sla_compliance=1.0, # Placeholder shunting_movements_required=random.randint(5, 15), total_planned_km=total_km, fitness_expiry_violations=0 ) # Generate alerts conflicts = [] for trainset in trainsets: data = self.train_data[trainset.trainset_id] if data["fitness_certs"].telecom.status == CertificateStatus.EXPIRING_SOON: conflicts.append(Alert( trainset_id=trainset.trainset_id, severity=Severity.MEDIUM, type="CERTIFICATE_EXPIRING", message="Telecom certificate expires soon" )) if len(data["job_cards"].blocking) > 0: conflicts.append(Alert( trainset_id=trainset.trainset_id, severity=Severity.HIGH, type="BLOCKING_MAINTENANCE", message=f"{len(data['job_cards'].blocking)} open job cards preventing service" )) # Decision rationale end_time = datetime.now() runtime_ms = int((end_time - start_time).total_seconds() * 1000) rationale = DecisionRationale( algorithm_version="v2.5.0", objective_weights={ "service_readiness": 0.35, "mileage_balancing": 0.25, "branding_priority": 0.20, "operational_cost": 0.20 }, constraint_violations=0, optimization_runtime_ms=runtime_ms ) # Build complete schedule schedule_id = f"KMRL-{self.date}-{random.choice(['DAWN', 'ALPHA', 'PRIME'])}" now = datetime.now() schedule = DaySchedule( schedule_id=schedule_id, generated_at=now.isoformat(), valid_from=f"{self.date}T{self.op_hours.start_time.isoformat()}+05:30", valid_until=f"{self.date}T{self.op_hours.end_time.isoformat()}+05:30", depot=self.depot_name, trainsets=trainsets, fleet_summary=fleet_summary, optimization_metrics=optimization_metrics, conflicts_and_alerts=conflicts, decision_rationale=rationale ) return schedule