Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from dataclasses import asdict | |
| from typing import Any, Dict, Iterable, List, Optional | |
| from runway_zero.models import Aircraft, Crew, Flight, RewardBreakdown, StepResult | |
| from runway_zero.openenv_adapter import OpenEnvEnvironment | |
| from runway_zero.scenarios import ROUTE_MINUTES, build_scenario | |
| class RunwayZeroEnv(OpenEnvEnvironment): | |
| """Deterministic airport recovery environment. | |
| The API follows the OpenEnv/Gym mental model: | |
| - reset(stage, seed) -> observation | |
| - step(actions) -> observation, reward, done, info | |
| - state -> full serializable state | |
| """ | |
| def __init__(self, stage: int = 1, seed: int = 7, step_minutes: int = 15): | |
| super().__init__() | |
| self.stage = stage | |
| self.seed = seed | |
| self.step_minutes = step_minutes | |
| self.reset(stage=stage, seed=seed) | |
| def reset(self, stage: Optional[int] = None, seed: Optional[int] = None) -> Dict[str, Any]: | |
| if stage is not None: | |
| self.stage = stage | |
| if seed is not None: | |
| self.seed = seed | |
| scenario = build_scenario(self.stage, self.seed) | |
| self.time = scenario["start_time"] | |
| self.end_time = scenario["end_time"] | |
| self.airports = scenario["airports"] | |
| self.airlines = scenario["airlines"] | |
| self.aircraft = scenario["aircraft"] | |
| self.crew = scenario["crew"] | |
| self.flights = scenario["flights"] | |
| self.disruptions = scenario["disruptions"] | |
| self.history: List[Dict[str, Any]] = [] | |
| self.last_reward = RewardBreakdown() | |
| self.done = False | |
| return self.observation() | |
| def step(self, actions: Optional[Iterable[Dict[str, Any]]] = None) -> StepResult: | |
| if self.done: | |
| return StepResult(self.observation(), 0.0, True, {"message": "episode already done"}) | |
| if hasattr(actions, "actions"): | |
| actions = getattr(actions, "actions") | |
| if isinstance(actions, dict): | |
| actions = [actions] | |
| actions = list(actions or []) | |
| reward = RewardBreakdown() | |
| active_disruptions = self._activate_disruptions(reward) | |
| self._departure_usage: Dict[str, int] = {code: 0 for code in self.airports} | |
| for action in actions: | |
| reward.add(self._apply_action(action)) | |
| reward.add(self._process_arrivals()) | |
| reward.add(self._process_operational_pressure()) | |
| self.time += self.step_minutes | |
| self.done = self.time >= self.end_time or self._all_flights_terminal() | |
| if self.done: | |
| reward.add(self._terminal_reward()) | |
| self.last_reward = reward | |
| frame = { | |
| "time": self.time, | |
| "actions": actions, | |
| "agent_messages": self._agent_messages(actions, active_disruptions), | |
| "active_disruptions": [asdict(item) for item in active_disruptions], | |
| "reward": reward.to_dict(), | |
| "metrics": self.metrics(), | |
| "observation": self.observation(), | |
| } | |
| self.history.append(frame) | |
| return StepResult( | |
| observation=self.observation(), | |
| reward=reward.total, | |
| done=self.done, | |
| info={ | |
| "reward_breakdown": reward.to_dict(), | |
| "metrics": self.metrics(), | |
| "active_disruptions": [asdict(item) for item in active_disruptions], | |
| }, | |
| ) | |
| def state(self) -> Dict[str, Any]: | |
| return { | |
| "stage": self.stage, | |
| "seed": self.seed, | |
| "time": self.time, | |
| "end_time": self.end_time, | |
| "airports": {code: asdict(airport) for code, airport in self.airports.items()}, | |
| "airlines": {code: asdict(airline) for code, airline in self.airlines.items()}, | |
| "aircraft": {code: asdict(aircraft) for code, aircraft in self.aircraft.items()}, | |
| "crew": {code: asdict(crew) for code, crew in self.crew.items()}, | |
| "flights": {code: asdict(flight) for code, flight in self.flights.items()}, | |
| "disruptions": [asdict(disruption) for disruption in self.disruptions], | |
| "last_reward": self.last_reward.to_dict(), | |
| "metrics": self.metrics(), | |
| } | |
| def observation(self) -> Dict[str, Any]: | |
| pending = self.pending_decisions() | |
| return { | |
| "stage": self.stage, | |
| "time": self.time, | |
| "clock": minutes_to_clock(self.time), | |
| "challenge_brief": self._challenge_brief(), | |
| "stakeholders": self._stakeholders(), | |
| "airports": [asdict(airport) for airport in self.airports.values()], | |
| "active_disruptions": [ | |
| asdict(item) for item in self.disruptions if item.active and not item.resolved | |
| ], | |
| "pending_decisions": pending, | |
| "network_summary": self.metrics(), | |
| "action_schema": { | |
| "depart": {"flight_id": "string"}, | |
| "hold": {"flight_id": "string", "minutes": "int", "reason": "string"}, | |
| "cancel": {"flight_id": "string", "reason": "string"}, | |
| "swap_aircraft": {"flight_id": "string", "aircraft_id": "string"}, | |
| "protect_connection": {"flight_id": "string"}, | |
| "reroute": { | |
| "flight_id": "string", | |
| "destination": "airport_code", | |
| "reason": "string", | |
| }, | |
| "request_maintenance": {"flight_id": "string"}, | |
| "allocate_compensation": {"flight_id": "string", "amount": "int"}, | |
| "negotiate_slot": { | |
| "flight_id": "string", | |
| "bid": "int", | |
| "promise": "delay|passengers|profit", | |
| }, | |
| }, | |
| } | |
| def _challenge_brief(self) -> Dict[str, Any]: | |
| if self.stage == 1: | |
| return { | |
| "title": "Operations Recovery", | |
| "goal": "Clear a compact DEL-BOM-BLR-HYD network while fog, runway loss, and aircraft faults cascade.", | |
| "win_condition": "Maximize safe departures and arrivals with minimal delay and cancellation.", | |
| } | |
| if self.stage == 2: | |
| return { | |
| "title": "Passenger-Aware Recovery", | |
| "goal": "Protect connections, emergencies, and stranded passenger groups across the national network.", | |
| "win_condition": "Recover throughput without sacrificing satisfaction or emergency priority.", | |
| } | |
| if self.stage == 4: | |
| return { | |
| "title": "IndiGo Crisis Replay", | |
| "goal": "Recover a December 2025-style crew availability and mass-cancellation crisis across the Indian network.", | |
| "win_condition": "Reduce cancellations, rebuild rotations, preserve legal crew constraints, and keep passenger harm visible.", | |
| } | |
| return { | |
| "title": "Economic Multi-Agent Control", | |
| "goal": "Balance tower neutrality against airline agents competing for slots, cash, reputation, and passengers.", | |
| "win_condition": "Keep the network efficient while preventing any airline from gaming scarce runway capacity.", | |
| } | |
| def _stakeholders(self) -> List[Dict[str, str]]: | |
| items = [ | |
| { | |
| "name": "Tower Central", | |
| "objective": "Safety, throughput, fairness, and emergency priority.", | |
| }, | |
| { | |
| "name": "Airport Ops", | |
| "objective": "Keep gates, stands, fuel, crews, and runways from deadlocking.", | |
| }, | |
| ] | |
| for airline in self.airlines.values(): | |
| if self.stage == 1: | |
| objective = "Keep aircraft and crew rotations legal while minimizing operational delay." | |
| elif self.stage == 2: | |
| objective = "Protect passenger connections, satisfaction, and emergency priority." | |
| else: | |
| objective = "Protect cash, reputation, slot access, and passenger satisfaction." | |
| items.append( | |
| { | |
| "name": airline.name, | |
| "objective": objective, | |
| } | |
| ) | |
| return items | |
| def pending_decisions(self) -> List[Dict[str, Any]]: | |
| decisions = [] | |
| for flight in self.flights.values(): | |
| if flight.status != "scheduled": | |
| continue | |
| if flight.scheduled_dep <= self.time and self.time >= flight.hold_until: | |
| aircraft = self.aircraft[flight.aircraft_id] | |
| crew = self.crew[flight.crew_id] | |
| decisions.append( | |
| { | |
| "flight_id": flight.flight_id, | |
| "airline": flight.airline, | |
| "origin": flight.origin, | |
| "destination": flight.destination, | |
| "scheduled_dep": flight.scheduled_dep, | |
| "scheduled_arr": flight.scheduled_arr, | |
| "delay_minutes": flight.dep_delay(self.time), | |
| "passengers": flight.passengers, | |
| "priority": flight.priority, | |
| "aircraft_ready": aircraft.is_available(self.time, flight.origin), | |
| "crew_ready": crew.is_available(self.time, flight.origin), | |
| "weather_delay": self.airports[flight.origin].weather_delay, | |
| } | |
| ) | |
| return sorted(decisions, key=lambda item: (-int(item["priority"] == "emergency"), item["delay_minutes"])) | |
| def metrics(self) -> Dict[str, Any]: | |
| flights = list(self.flights.values()) | |
| arrived = [flight for flight in flights if flight.status == "arrived"] | |
| cancelled = [flight for flight in flights if flight.status == "cancelled"] | |
| active = [flight for flight in flights if flight.status in {"scheduled", "airborne"}] | |
| total_dep_delay = sum(flight.dep_delay(self.time) for flight in flights) | |
| total_arr_delay = sum(flight.arr_delay() for flight in arrived) | |
| stranded = sum( | |
| group.count | |
| for flight in flights | |
| for group in flight.passenger_groups | |
| if group.stranded or flight.status == "cancelled" | |
| ) | |
| satisfaction_values = [ | |
| group.satisfaction for flight in flights for group in flight.passenger_groups | |
| ] | |
| airline_cash = {code: round(airline.cash, 2) for code, airline in self.airlines.items()} | |
| return { | |
| "flights_total": len(flights), | |
| "flights_arrived": len(arrived), | |
| "flights_cancelled": len(cancelled), | |
| "flights_active": len(active), | |
| "total_dep_delay": total_dep_delay, | |
| "total_arr_delay": total_arr_delay, | |
| "stranded_passengers": stranded, | |
| "avg_satisfaction": round( | |
| sum(satisfaction_values) / max(1, len(satisfaction_values)), 2 | |
| ), | |
| "airline_cash": airline_cash, | |
| } | |
| def _activate_disruptions(self, reward: RewardBreakdown) -> List[Any]: | |
| active_now = [] | |
| for disruption in self.disruptions: | |
| if disruption.resolved: | |
| continue | |
| if disruption.time <= self.time < disruption.time + disruption.duration: | |
| if not disruption.active: | |
| disruption.active = True | |
| self._apply_disruption_start(disruption) | |
| reward.action_validity_score += 1.0 | |
| active_now.append(disruption) | |
| elif disruption.active and self.time >= disruption.time + disruption.duration: | |
| disruption.active = False | |
| disruption.resolved = True | |
| self._apply_disruption_end(disruption) | |
| return active_now | |
| def _apply_disruption_start(self, disruption: Any) -> None: | |
| if disruption.kind == "weather" and disruption.target in self.airports: | |
| self.airports[disruption.target].weather_delay += 15 * disruption.severity | |
| elif disruption.kind == "runway_closure" and disruption.target in self.airports: | |
| self.airports[disruption.target].runway_closed_until = disruption.time + disruption.duration | |
| elif disruption.kind == "gate_block" and disruption.target in self.airports: | |
| self.airports[disruption.target].gate_blocked_until = disruption.time + disruption.duration | |
| elif disruption.kind == "aircraft_fault" and disruption.target in self.aircraft: | |
| self.aircraft[disruption.target].broken_until = disruption.time + disruption.duration | |
| elif disruption.kind == "crew_timeout" and disruption.target in self.crew: | |
| self.crew[disruption.target].duty_end = min( | |
| self.crew[disruption.target].duty_end, disruption.time | |
| ) | |
| elif disruption.kind == "emergency" and disruption.target in self.airports: | |
| self.airports[disruption.target].emergency_priority = True | |
| for flight in self.flights.values(): | |
| if flight.destination == disruption.target and flight.status == "airborne": | |
| flight.priority = "emergency" | |
| def _apply_disruption_end(self, disruption: Any) -> None: | |
| if disruption.kind == "weather" and disruption.target in self.airports: | |
| self.airports[disruption.target].weather_delay = max( | |
| 0, self.airports[disruption.target].weather_delay - 15 * disruption.severity | |
| ) | |
| elif disruption.kind == "emergency" and disruption.target in self.airports: | |
| self.airports[disruption.target].emergency_priority = False | |
| def _apply_action(self, action: Dict[str, Any]) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| kind = action.get("action") | |
| flight_id = action.get("flight_id") | |
| flight = self.flights.get(flight_id) if flight_id else None | |
| allowed = { | |
| "depart", | |
| "hold", | |
| "cancel", | |
| "swap_aircraft", | |
| "protect_connection", | |
| "reroute", | |
| "request_maintenance", | |
| "allocate_compensation", | |
| "negotiate_slot", | |
| } | |
| if kind not in allowed: | |
| reward.action_validity_score -= 8 | |
| return reward | |
| if flight is None: | |
| reward.action_validity_score -= 6 | |
| return reward | |
| if kind == "depart": | |
| return self._depart(flight) | |
| if kind == "hold": | |
| minutes = int(action.get("minutes", self.step_minutes)) | |
| flight.hold_until = max(flight.hold_until, self.time + max(self.step_minutes, minutes)) | |
| flight.delay_reason = action.get("reason", "held by controller") | |
| reward.delay_score -= 0.04 * flight.passengers * (minutes / self.step_minutes) | |
| reward.action_validity_score += 0.2 | |
| return reward | |
| if kind == "cancel": | |
| return self._cancel(flight, action.get("reason", "cancelled by controller")) | |
| if kind == "swap_aircraft": | |
| new_aircraft_id = action.get("aircraft_id") | |
| return self._swap_aircraft(flight, new_aircraft_id) | |
| if kind == "protect_connection": | |
| return self._protect_connection(flight) | |
| if kind == "reroute": | |
| return self._reroute(flight, str(action.get("destination", "")), action.get("reason", "network recovery")) | |
| if kind == "request_maintenance": | |
| return self._request_maintenance(flight) | |
| if kind == "allocate_compensation": | |
| return self._allocate_compensation(flight, float(action.get("amount", 0))) | |
| if kind == "negotiate_slot": | |
| return self._negotiate_slot(flight, float(action.get("bid", 0)), str(action.get("promise", ""))) | |
| return reward | |
| def _depart(self, flight: Flight) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| if flight.status != "scheduled": | |
| reward.action_validity_score -= 5 | |
| return reward | |
| origin = self.airports[flight.origin] | |
| destination = self.airports[flight.destination] | |
| aircraft = self.aircraft[flight.aircraft_id] | |
| crew = self.crew[flight.crew_id] | |
| departure_capacity = max(1, origin.available_runways(self.time) * 2) | |
| if self._departure_usage.get(flight.origin, 0) >= departure_capacity: | |
| reward.safety_score -= 60 | |
| reward.action_validity_score -= 10 | |
| return reward | |
| if not aircraft.is_available(self.time, flight.origin): | |
| reward.safety_score -= 70 | |
| reward.action_validity_score -= 10 | |
| return reward | |
| if not crew.is_available(self.time, flight.origin): | |
| reward.safety_score -= 50 | |
| reward.action_validity_score -= 8 | |
| return reward | |
| delay = max(0, self.time - flight.scheduled_dep) | |
| weather_delay = origin.weather_delay + destination.weather_delay | |
| flight.status = "airborne" | |
| flight.actual_dep = self.time | |
| flight.eta = self.time + flight.duration + weather_delay | |
| self._departure_usage[flight.origin] = self._departure_usage.get(flight.origin, 0) + 1 | |
| aircraft.airport = "AIRBORNE" | |
| aircraft.available_at = flight.eta + 35 | |
| crew.airport = "AIRBORNE" | |
| crew.available_at = flight.eta + 45 | |
| reward.delay_score += max(2.0, 15.0 - delay * 0.05) | |
| reward.passenger_score -= delay * flight.passengers * 0.002 | |
| reward.money_score -= delay * self.airlines[flight.airline].operating_cost_per_minute * 0.02 | |
| reward.action_validity_score += 2 | |
| return reward | |
| def _cancel(self, flight: Flight, reason: str) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| if flight.status in {"arrived", "cancelled"}: | |
| reward.action_validity_score -= 8 | |
| return reward | |
| flight.status = "cancelled" | |
| flight.cancellations_reason = reason | |
| for group in flight.passenger_groups: | |
| group.stranded = True | |
| group.satisfaction = max(0, group.satisfaction - 70) | |
| airline = self.airlines[flight.airline] | |
| compensation = flight.passengers * airline.compensation_per_stranded_passenger | |
| airline.cash -= compensation | |
| airline.reputation = max(0, airline.reputation - 4) | |
| reward.delay_score -= 30 | |
| reward.passenger_score -= flight.passengers * 0.9 | |
| reward.money_score -= compensation / 10_000 | |
| return reward | |
| def _swap_aircraft(self, flight: Flight, aircraft_id: Optional[str]) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| aircraft = self.aircraft.get(aircraft_id or "") | |
| if aircraft is None or aircraft.airline != flight.airline: | |
| reward.action_validity_score -= 6 | |
| return reward | |
| if not aircraft.is_available(self.time, flight.origin): | |
| reward.action_validity_score -= 4 | |
| return reward | |
| old_aircraft = self.aircraft[flight.aircraft_id] | |
| flight.aircraft_id = aircraft.aircraft_id | |
| old_aircraft.available_at = max(old_aircraft.available_at, self.time + 30) | |
| reward.action_validity_score += 3 | |
| reward.money_score -= 3 | |
| return reward | |
| def _reroute(self, flight: Flight, destination: str, reason: str) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| if flight.status != "scheduled" or destination not in self.airports or destination == flight.origin: | |
| reward.action_validity_score -= 6 | |
| return reward | |
| route = (flight.origin, destination) | |
| if route not in ROUTE_MINUTES: | |
| reward.action_validity_score -= 5 | |
| return reward | |
| old_destination = flight.destination | |
| flight.destination = destination | |
| flight.scheduled_arr = flight.scheduled_dep + ROUTE_MINUTES[route] | |
| flight.delay_reason = f"rerouted from {old_destination}: {reason}" | |
| reward.delay_score += 4 if self.airports[old_destination].weather_delay > 0 else -2 | |
| reward.passenger_score -= flight.passengers * 0.03 | |
| reward.money_score -= 8 | |
| reward.action_validity_score += 2 | |
| return reward | |
| def _request_maintenance(self, flight: Flight) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| aircraft = self.aircraft[flight.aircraft_id] | |
| if self.time < aircraft.broken_until: | |
| repaired_at = max(self.time + 45, aircraft.broken_until - 45) | |
| aircraft.broken_until = repaired_at | |
| reward.safety_score += 8 | |
| reward.money_score -= 18 | |
| reward.action_validity_score += 2 | |
| else: | |
| reward.action_validity_score -= 2 | |
| reward.money_score -= 4 | |
| return reward | |
| def _allocate_compensation(self, flight: Flight, amount: float) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| if amount <= 0: | |
| reward.action_validity_score -= 4 | |
| return reward | |
| airline = self.airlines[flight.airline] | |
| capped_amount = min(amount, 9000) | |
| airline.cash -= capped_amount * flight.passengers | |
| for group in flight.passenger_groups: | |
| group.satisfaction = min(100, group.satisfaction + min(18, capped_amount / 600)) | |
| reward.passenger_score += flight.passengers * min(0.18, capped_amount / 50_000) | |
| reward.money_score -= capped_amount * flight.passengers / 18_000 | |
| reward.action_validity_score += 1 | |
| return reward | |
| def _negotiate_slot(self, flight: Flight, bid: float, promise: str) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| if self.stage < 3: | |
| reward.action_validity_score -= 2 | |
| return reward | |
| airline = self.airlines[flight.airline] | |
| bid = max(0, min(50_000, bid)) | |
| airline.cash -= bid | |
| if "passenger" in promise or "delay" in promise: | |
| reward.fairness_score += 1.5 | |
| reward.passenger_score += 1.0 | |
| else: | |
| reward.fairness_score -= 2.5 | |
| reward.money_score -= bid / 20_000 | |
| reward.action_validity_score += 1 | |
| return reward | |
| def _protect_connection(self, flight: Flight) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| protected = 0 | |
| for group in flight.passenger_groups: | |
| if group.priority in {"connection", "priority"}: | |
| group.satisfaction = min(100, group.satisfaction + 12) | |
| protected += group.count | |
| reward.passenger_score += protected * 0.15 | |
| reward.money_score -= protected * 0.03 | |
| return reward | |
| def _process_arrivals(self) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| gate_load = self._gate_load() | |
| for flight in self.flights.values(): | |
| if flight.status != "airborne" or flight.eta is None or flight.eta > self.time: | |
| continue | |
| destination = self.airports[flight.destination] | |
| gate_capacity = destination.available_gates(self.time) | |
| if gate_load.get(flight.destination, 0) >= gate_capacity: | |
| flight.eta += self.step_minutes | |
| reward.delay_score -= flight.passengers * 0.04 | |
| reward.passenger_score -= flight.passengers * 0.03 | |
| continue | |
| flight.status = "arrived" | |
| flight.actual_arr = self.time | |
| aircraft = self.aircraft[flight.aircraft_id] | |
| crew = self.crew[flight.crew_id] | |
| aircraft.airport = flight.destination | |
| crew.airport = flight.destination | |
| aircraft.available_at = self.time + 35 | |
| crew.available_at = self.time + 45 | |
| gate_load[flight.destination] = gate_load.get(flight.destination, 0) + 1 | |
| arrival_delay = flight.arr_delay() | |
| reward.delay_score += max(1.0, 12.0 - arrival_delay * 0.04) | |
| reward.passenger_score += max(0.0, 8.0 - arrival_delay * 0.02) | |
| return reward | |
| def _gate_load(self) -> Dict[str, int]: | |
| load = {code: 0 for code in self.airports} | |
| for flight in self.flights.values(): | |
| if flight.status == "arrived" and flight.actual_arr is not None: | |
| if self.time - flight.actual_arr <= 45: | |
| load[flight.destination] = load.get(flight.destination, 0) + 1 | |
| return load | |
| def _process_operational_pressure(self) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| airline_delay = {code: 0 for code in self.airlines} | |
| for flight in self.flights.values(): | |
| if flight.status != "scheduled": | |
| continue | |
| delay = max(0, self.time - flight.scheduled_dep) | |
| if delay <= 0: | |
| continue | |
| airline_delay[flight.airline] += delay | |
| reward.delay_score -= min(12, delay * 0.03) | |
| for group in flight.passenger_groups: | |
| hit = 0.5 if group.priority == "standard" else 0.9 | |
| group.satisfaction = max(0, group.satisfaction - hit) | |
| if delay > 120 and group.priority == "connection": | |
| group.stranded = True | |
| reward.passenger_score -= group.count * 0.2 | |
| reward.passenger_score -= flight.passengers * 0.005 | |
| reward.money_score -= self.airlines[flight.airline].operating_cost_per_minute * 0.01 | |
| for disruption in self.disruptions: | |
| if not disruption.active: | |
| continue | |
| if disruption.kind == "demand_shock" and disruption.target == flight.origin: | |
| reward.passenger_score -= flight.passengers * 0.008 * disruption.severity | |
| elif disruption.kind == "fuel_delay" and disruption.target == flight.origin: | |
| reward.money_score -= 2.2 * disruption.severity | |
| elif disruption.kind == "slot_conflict" and disruption.target == flight.origin: | |
| reward.fairness_score -= max(0, delay - 30) * 0.006 * disruption.severity | |
| if self.stage >= 3 and airline_delay: | |
| values = list(airline_delay.values()) | |
| spread = max(values) - min(values) | |
| reward.fairness_score -= spread * 0.01 | |
| return reward | |
| def _terminal_reward(self) -> RewardBreakdown: | |
| reward = RewardBreakdown() | |
| metrics = self.metrics() | |
| reward.delay_score -= metrics["total_dep_delay"] * 0.01 | |
| reward.passenger_score -= metrics["stranded_passengers"] * 0.4 | |
| reward.passenger_score += metrics["avg_satisfaction"] * 0.5 | |
| reward.money_score += sum(metrics["airline_cash"].values()) / 200_000 | |
| reward.safety_score += 50 | |
| return reward | |
| def _all_flights_terminal(self) -> bool: | |
| return all(flight.status in {"arrived", "cancelled"} for flight in self.flights.values()) | |
| def find_available_aircraft(self, flight: Flight) -> Optional[Aircraft]: | |
| for aircraft in self.aircraft.values(): | |
| if aircraft.airline == flight.airline and aircraft.is_available(self.time, flight.origin): | |
| return aircraft | |
| return None | |
| def find_available_crew(self, flight: Flight) -> Optional[Crew]: | |
| for crew in self.crew.values(): | |
| if crew.airline == flight.airline and crew.is_available(self.time, flight.origin): | |
| return crew | |
| return None | |
| def _agent_messages( | |
| self, actions: List[Dict[str, Any]], active_disruptions: List[Any] | |
| ) -> List[Dict[str, str]]: | |
| messages: List[Dict[str, str]] = [] | |
| for disruption in active_disruptions[:3]: | |
| target = disruption.target | |
| messages.append( | |
| { | |
| "from": "Tower Central", | |
| "to": "Network Ops", | |
| "type": "alert", | |
| "message": f"{disruption.kind.replace('_', ' ').title()} active at {target}: {disruption.message}", | |
| } | |
| ) | |
| if self.stage >= 3 and disruption.kind == "slot_conflict": | |
| for airline in self.airlines.values(): | |
| messages.append( | |
| { | |
| "from": airline.name, | |
| "to": "Tower Central", | |
| "type": "slot_bid", | |
| "message": ( | |
| f"{airline.name} requests priority slots at {target}; " | |
| "we can protect connections but need a fast departure bank." | |
| ), | |
| } | |
| ) | |
| if disruption.kind in {"fuel_delay", "demand_shock"} and disruption.target in self.airports: | |
| messages.append( | |
| { | |
| "from": "Airport Ops", | |
| "to": "Airline Agents", | |
| "type": "constraint", | |
| "message": ( | |
| f"{self.airports[disruption.target].city} constraint is live. " | |
| "Choose between holding, rerouting, compensation, and slot negotiation." | |
| ), | |
| } | |
| ) | |
| for action in actions[:8]: | |
| flight_id = str(action.get("flight_id", "network")) | |
| flight = self.flights.get(flight_id) | |
| airline = self.airlines.get(flight.airline).name if flight else "Airline Ops" | |
| kind = str(action.get("action", "action")).replace("_", " ") | |
| if action.get("action") == "depart" and flight: | |
| message = f"Cleared {flight_id} from {flight.origin} to {flight.destination}." | |
| elif action.get("action") == "hold": | |
| message = f"Holding {flight_id} for {action.get('minutes', 15)} minutes: {action.get('reason', 'operational recovery')}." | |
| elif action.get("action") == "swap_aircraft": | |
| message = f"Swapping aircraft for {flight_id} to {action.get('aircraft_id')}." | |
| elif action.get("action") == "protect_connection": | |
| message = f"Protect connection groups on {flight_id}; passenger impact is being prioritized." | |
| elif action.get("action") == "cancel": | |
| message = f"Cancel {flight_id}; compensation and stranded passenger penalties will apply." | |
| else: | |
| message = f"{kind.title()} requested for {flight_id}." | |
| messages.append( | |
| { | |
| "from": airline, | |
| "to": "Tower Central", | |
| "type": str(action.get("action", "decision")), | |
| "message": message, | |
| } | |
| ) | |
| return messages | |
| def minutes_to_clock(minutes: int) -> str: | |
| hours = (minutes // 60) % 24 | |
| mins = minutes % 60 | |
| return f"{hours:02d}:{mins:02d}" | |