runway-zero / src /runway_zero /simulator.py
work-dwivediishivam's picture
Space Redeployed to preserve GPU; switched to FREE TIER; fixed GUI with Actual Replay
90bdd23
from __future__ import annotations
from dataclasses import asdict
from typing import Any, Dict, Iterable, List, Optional
from runway_zero.models import Aircraft, Crew, Flight, RewardBreakdown, StepResult
from runway_zero.openenv_adapter import OpenEnvEnvironment
from runway_zero.scenarios import ROUTE_MINUTES, build_scenario
class RunwayZeroEnv(OpenEnvEnvironment):
"""Deterministic airport recovery environment.
The API follows the OpenEnv/Gym mental model:
- reset(stage, seed) -> observation
- step(actions) -> observation, reward, done, info
- state -> full serializable state
"""
def __init__(self, stage: int = 1, seed: int = 7, step_minutes: int = 15):
super().__init__()
self.stage = stage
self.seed = seed
self.step_minutes = step_minutes
self.reset(stage=stage, seed=seed)
def reset(self, stage: Optional[int] = None, seed: Optional[int] = None) -> Dict[str, Any]:
if stage is not None:
self.stage = stage
if seed is not None:
self.seed = seed
scenario = build_scenario(self.stage, self.seed)
self.time = scenario["start_time"]
self.end_time = scenario["end_time"]
self.airports = scenario["airports"]
self.airlines = scenario["airlines"]
self.aircraft = scenario["aircraft"]
self.crew = scenario["crew"]
self.flights = scenario["flights"]
self.disruptions = scenario["disruptions"]
self.history: List[Dict[str, Any]] = []
self.last_reward = RewardBreakdown()
self.done = False
return self.observation()
def step(self, actions: Optional[Iterable[Dict[str, Any]]] = None) -> StepResult:
if self.done:
return StepResult(self.observation(), 0.0, True, {"message": "episode already done"})
if hasattr(actions, "actions"):
actions = getattr(actions, "actions")
if isinstance(actions, dict):
actions = [actions]
actions = list(actions or [])
reward = RewardBreakdown()
active_disruptions = self._activate_disruptions(reward)
self._departure_usage: Dict[str, int] = {code: 0 for code in self.airports}
for action in actions:
reward.add(self._apply_action(action))
reward.add(self._process_arrivals())
reward.add(self._process_operational_pressure())
self.time += self.step_minutes
self.done = self.time >= self.end_time or self._all_flights_terminal()
if self.done:
reward.add(self._terminal_reward())
self.last_reward = reward
frame = {
"time": self.time,
"actions": actions,
"agent_messages": self._agent_messages(actions, active_disruptions),
"active_disruptions": [asdict(item) for item in active_disruptions],
"reward": reward.to_dict(),
"metrics": self.metrics(),
"observation": self.observation(),
}
self.history.append(frame)
return StepResult(
observation=self.observation(),
reward=reward.total,
done=self.done,
info={
"reward_breakdown": reward.to_dict(),
"metrics": self.metrics(),
"active_disruptions": [asdict(item) for item in active_disruptions],
},
)
@property
def state(self) -> Dict[str, Any]:
return {
"stage": self.stage,
"seed": self.seed,
"time": self.time,
"end_time": self.end_time,
"airports": {code: asdict(airport) for code, airport in self.airports.items()},
"airlines": {code: asdict(airline) for code, airline in self.airlines.items()},
"aircraft": {code: asdict(aircraft) for code, aircraft in self.aircraft.items()},
"crew": {code: asdict(crew) for code, crew in self.crew.items()},
"flights": {code: asdict(flight) for code, flight in self.flights.items()},
"disruptions": [asdict(disruption) for disruption in self.disruptions],
"last_reward": self.last_reward.to_dict(),
"metrics": self.metrics(),
}
def observation(self) -> Dict[str, Any]:
pending = self.pending_decisions()
return {
"stage": self.stage,
"time": self.time,
"clock": minutes_to_clock(self.time),
"challenge_brief": self._challenge_brief(),
"stakeholders": self._stakeholders(),
"airports": [asdict(airport) for airport in self.airports.values()],
"active_disruptions": [
asdict(item) for item in self.disruptions if item.active and not item.resolved
],
"pending_decisions": pending,
"network_summary": self.metrics(),
"action_schema": {
"depart": {"flight_id": "string"},
"hold": {"flight_id": "string", "minutes": "int", "reason": "string"},
"cancel": {"flight_id": "string", "reason": "string"},
"swap_aircraft": {"flight_id": "string", "aircraft_id": "string"},
"protect_connection": {"flight_id": "string"},
"reroute": {
"flight_id": "string",
"destination": "airport_code",
"reason": "string",
},
"request_maintenance": {"flight_id": "string"},
"allocate_compensation": {"flight_id": "string", "amount": "int"},
"negotiate_slot": {
"flight_id": "string",
"bid": "int",
"promise": "delay|passengers|profit",
},
},
}
def _challenge_brief(self) -> Dict[str, Any]:
if self.stage == 1:
return {
"title": "Operations Recovery",
"goal": "Clear a compact DEL-BOM-BLR-HYD network while fog, runway loss, and aircraft faults cascade.",
"win_condition": "Maximize safe departures and arrivals with minimal delay and cancellation.",
}
if self.stage == 2:
return {
"title": "Passenger-Aware Recovery",
"goal": "Protect connections, emergencies, and stranded passenger groups across the national network.",
"win_condition": "Recover throughput without sacrificing satisfaction or emergency priority.",
}
if self.stage == 4:
return {
"title": "IndiGo Crisis Replay",
"goal": "Recover a December 2025-style crew availability and mass-cancellation crisis across the Indian network.",
"win_condition": "Reduce cancellations, rebuild rotations, preserve legal crew constraints, and keep passenger harm visible.",
}
return {
"title": "Economic Multi-Agent Control",
"goal": "Balance tower neutrality against airline agents competing for slots, cash, reputation, and passengers.",
"win_condition": "Keep the network efficient while preventing any airline from gaming scarce runway capacity.",
}
def _stakeholders(self) -> List[Dict[str, str]]:
items = [
{
"name": "Tower Central",
"objective": "Safety, throughput, fairness, and emergency priority.",
},
{
"name": "Airport Ops",
"objective": "Keep gates, stands, fuel, crews, and runways from deadlocking.",
},
]
for airline in self.airlines.values():
if self.stage == 1:
objective = "Keep aircraft and crew rotations legal while minimizing operational delay."
elif self.stage == 2:
objective = "Protect passenger connections, satisfaction, and emergency priority."
else:
objective = "Protect cash, reputation, slot access, and passenger satisfaction."
items.append(
{
"name": airline.name,
"objective": objective,
}
)
return items
def pending_decisions(self) -> List[Dict[str, Any]]:
decisions = []
for flight in self.flights.values():
if flight.status != "scheduled":
continue
if flight.scheduled_dep <= self.time and self.time >= flight.hold_until:
aircraft = self.aircraft[flight.aircraft_id]
crew = self.crew[flight.crew_id]
decisions.append(
{
"flight_id": flight.flight_id,
"airline": flight.airline,
"origin": flight.origin,
"destination": flight.destination,
"scheduled_dep": flight.scheduled_dep,
"scheduled_arr": flight.scheduled_arr,
"delay_minutes": flight.dep_delay(self.time),
"passengers": flight.passengers,
"priority": flight.priority,
"aircraft_ready": aircraft.is_available(self.time, flight.origin),
"crew_ready": crew.is_available(self.time, flight.origin),
"weather_delay": self.airports[flight.origin].weather_delay,
}
)
return sorted(decisions, key=lambda item: (-int(item["priority"] == "emergency"), item["delay_minutes"]))
def metrics(self) -> Dict[str, Any]:
flights = list(self.flights.values())
arrived = [flight for flight in flights if flight.status == "arrived"]
cancelled = [flight for flight in flights if flight.status == "cancelled"]
active = [flight for flight in flights if flight.status in {"scheduled", "airborne"}]
total_dep_delay = sum(flight.dep_delay(self.time) for flight in flights)
total_arr_delay = sum(flight.arr_delay() for flight in arrived)
stranded = sum(
group.count
for flight in flights
for group in flight.passenger_groups
if group.stranded or flight.status == "cancelled"
)
satisfaction_values = [
group.satisfaction for flight in flights for group in flight.passenger_groups
]
airline_cash = {code: round(airline.cash, 2) for code, airline in self.airlines.items()}
return {
"flights_total": len(flights),
"flights_arrived": len(arrived),
"flights_cancelled": len(cancelled),
"flights_active": len(active),
"total_dep_delay": total_dep_delay,
"total_arr_delay": total_arr_delay,
"stranded_passengers": stranded,
"avg_satisfaction": round(
sum(satisfaction_values) / max(1, len(satisfaction_values)), 2
),
"airline_cash": airline_cash,
}
def _activate_disruptions(self, reward: RewardBreakdown) -> List[Any]:
active_now = []
for disruption in self.disruptions:
if disruption.resolved:
continue
if disruption.time <= self.time < disruption.time + disruption.duration:
if not disruption.active:
disruption.active = True
self._apply_disruption_start(disruption)
reward.action_validity_score += 1.0
active_now.append(disruption)
elif disruption.active and self.time >= disruption.time + disruption.duration:
disruption.active = False
disruption.resolved = True
self._apply_disruption_end(disruption)
return active_now
def _apply_disruption_start(self, disruption: Any) -> None:
if disruption.kind == "weather" and disruption.target in self.airports:
self.airports[disruption.target].weather_delay += 15 * disruption.severity
elif disruption.kind == "runway_closure" and disruption.target in self.airports:
self.airports[disruption.target].runway_closed_until = disruption.time + disruption.duration
elif disruption.kind == "gate_block" and disruption.target in self.airports:
self.airports[disruption.target].gate_blocked_until = disruption.time + disruption.duration
elif disruption.kind == "aircraft_fault" and disruption.target in self.aircraft:
self.aircraft[disruption.target].broken_until = disruption.time + disruption.duration
elif disruption.kind == "crew_timeout" and disruption.target in self.crew:
self.crew[disruption.target].duty_end = min(
self.crew[disruption.target].duty_end, disruption.time
)
elif disruption.kind == "emergency" and disruption.target in self.airports:
self.airports[disruption.target].emergency_priority = True
for flight in self.flights.values():
if flight.destination == disruption.target and flight.status == "airborne":
flight.priority = "emergency"
def _apply_disruption_end(self, disruption: Any) -> None:
if disruption.kind == "weather" and disruption.target in self.airports:
self.airports[disruption.target].weather_delay = max(
0, self.airports[disruption.target].weather_delay - 15 * disruption.severity
)
elif disruption.kind == "emergency" and disruption.target in self.airports:
self.airports[disruption.target].emergency_priority = False
def _apply_action(self, action: Dict[str, Any]) -> RewardBreakdown:
reward = RewardBreakdown()
kind = action.get("action")
flight_id = action.get("flight_id")
flight = self.flights.get(flight_id) if flight_id else None
allowed = {
"depart",
"hold",
"cancel",
"swap_aircraft",
"protect_connection",
"reroute",
"request_maintenance",
"allocate_compensation",
"negotiate_slot",
}
if kind not in allowed:
reward.action_validity_score -= 8
return reward
if flight is None:
reward.action_validity_score -= 6
return reward
if kind == "depart":
return self._depart(flight)
if kind == "hold":
minutes = int(action.get("minutes", self.step_minutes))
flight.hold_until = max(flight.hold_until, self.time + max(self.step_minutes, minutes))
flight.delay_reason = action.get("reason", "held by controller")
reward.delay_score -= 0.04 * flight.passengers * (minutes / self.step_minutes)
reward.action_validity_score += 0.2
return reward
if kind == "cancel":
return self._cancel(flight, action.get("reason", "cancelled by controller"))
if kind == "swap_aircraft":
new_aircraft_id = action.get("aircraft_id")
return self._swap_aircraft(flight, new_aircraft_id)
if kind == "protect_connection":
return self._protect_connection(flight)
if kind == "reroute":
return self._reroute(flight, str(action.get("destination", "")), action.get("reason", "network recovery"))
if kind == "request_maintenance":
return self._request_maintenance(flight)
if kind == "allocate_compensation":
return self._allocate_compensation(flight, float(action.get("amount", 0)))
if kind == "negotiate_slot":
return self._negotiate_slot(flight, float(action.get("bid", 0)), str(action.get("promise", "")))
return reward
def _depart(self, flight: Flight) -> RewardBreakdown:
reward = RewardBreakdown()
if flight.status != "scheduled":
reward.action_validity_score -= 5
return reward
origin = self.airports[flight.origin]
destination = self.airports[flight.destination]
aircraft = self.aircraft[flight.aircraft_id]
crew = self.crew[flight.crew_id]
departure_capacity = max(1, origin.available_runways(self.time) * 2)
if self._departure_usage.get(flight.origin, 0) >= departure_capacity:
reward.safety_score -= 60
reward.action_validity_score -= 10
return reward
if not aircraft.is_available(self.time, flight.origin):
reward.safety_score -= 70
reward.action_validity_score -= 10
return reward
if not crew.is_available(self.time, flight.origin):
reward.safety_score -= 50
reward.action_validity_score -= 8
return reward
delay = max(0, self.time - flight.scheduled_dep)
weather_delay = origin.weather_delay + destination.weather_delay
flight.status = "airborne"
flight.actual_dep = self.time
flight.eta = self.time + flight.duration + weather_delay
self._departure_usage[flight.origin] = self._departure_usage.get(flight.origin, 0) + 1
aircraft.airport = "AIRBORNE"
aircraft.available_at = flight.eta + 35
crew.airport = "AIRBORNE"
crew.available_at = flight.eta + 45
reward.delay_score += max(2.0, 15.0 - delay * 0.05)
reward.passenger_score -= delay * flight.passengers * 0.002
reward.money_score -= delay * self.airlines[flight.airline].operating_cost_per_minute * 0.02
reward.action_validity_score += 2
return reward
def _cancel(self, flight: Flight, reason: str) -> RewardBreakdown:
reward = RewardBreakdown()
if flight.status in {"arrived", "cancelled"}:
reward.action_validity_score -= 8
return reward
flight.status = "cancelled"
flight.cancellations_reason = reason
for group in flight.passenger_groups:
group.stranded = True
group.satisfaction = max(0, group.satisfaction - 70)
airline = self.airlines[flight.airline]
compensation = flight.passengers * airline.compensation_per_stranded_passenger
airline.cash -= compensation
airline.reputation = max(0, airline.reputation - 4)
reward.delay_score -= 30
reward.passenger_score -= flight.passengers * 0.9
reward.money_score -= compensation / 10_000
return reward
def _swap_aircraft(self, flight: Flight, aircraft_id: Optional[str]) -> RewardBreakdown:
reward = RewardBreakdown()
aircraft = self.aircraft.get(aircraft_id or "")
if aircraft is None or aircraft.airline != flight.airline:
reward.action_validity_score -= 6
return reward
if not aircraft.is_available(self.time, flight.origin):
reward.action_validity_score -= 4
return reward
old_aircraft = self.aircraft[flight.aircraft_id]
flight.aircraft_id = aircraft.aircraft_id
old_aircraft.available_at = max(old_aircraft.available_at, self.time + 30)
reward.action_validity_score += 3
reward.money_score -= 3
return reward
def _reroute(self, flight: Flight, destination: str, reason: str) -> RewardBreakdown:
reward = RewardBreakdown()
if flight.status != "scheduled" or destination not in self.airports or destination == flight.origin:
reward.action_validity_score -= 6
return reward
route = (flight.origin, destination)
if route not in ROUTE_MINUTES:
reward.action_validity_score -= 5
return reward
old_destination = flight.destination
flight.destination = destination
flight.scheduled_arr = flight.scheduled_dep + ROUTE_MINUTES[route]
flight.delay_reason = f"rerouted from {old_destination}: {reason}"
reward.delay_score += 4 if self.airports[old_destination].weather_delay > 0 else -2
reward.passenger_score -= flight.passengers * 0.03
reward.money_score -= 8
reward.action_validity_score += 2
return reward
def _request_maintenance(self, flight: Flight) -> RewardBreakdown:
reward = RewardBreakdown()
aircraft = self.aircraft[flight.aircraft_id]
if self.time < aircraft.broken_until:
repaired_at = max(self.time + 45, aircraft.broken_until - 45)
aircraft.broken_until = repaired_at
reward.safety_score += 8
reward.money_score -= 18
reward.action_validity_score += 2
else:
reward.action_validity_score -= 2
reward.money_score -= 4
return reward
def _allocate_compensation(self, flight: Flight, amount: float) -> RewardBreakdown:
reward = RewardBreakdown()
if amount <= 0:
reward.action_validity_score -= 4
return reward
airline = self.airlines[flight.airline]
capped_amount = min(amount, 9000)
airline.cash -= capped_amount * flight.passengers
for group in flight.passenger_groups:
group.satisfaction = min(100, group.satisfaction + min(18, capped_amount / 600))
reward.passenger_score += flight.passengers * min(0.18, capped_amount / 50_000)
reward.money_score -= capped_amount * flight.passengers / 18_000
reward.action_validity_score += 1
return reward
def _negotiate_slot(self, flight: Flight, bid: float, promise: str) -> RewardBreakdown:
reward = RewardBreakdown()
if self.stage < 3:
reward.action_validity_score -= 2
return reward
airline = self.airlines[flight.airline]
bid = max(0, min(50_000, bid))
airline.cash -= bid
if "passenger" in promise or "delay" in promise:
reward.fairness_score += 1.5
reward.passenger_score += 1.0
else:
reward.fairness_score -= 2.5
reward.money_score -= bid / 20_000
reward.action_validity_score += 1
return reward
def _protect_connection(self, flight: Flight) -> RewardBreakdown:
reward = RewardBreakdown()
protected = 0
for group in flight.passenger_groups:
if group.priority in {"connection", "priority"}:
group.satisfaction = min(100, group.satisfaction + 12)
protected += group.count
reward.passenger_score += protected * 0.15
reward.money_score -= protected * 0.03
return reward
def _process_arrivals(self) -> RewardBreakdown:
reward = RewardBreakdown()
gate_load = self._gate_load()
for flight in self.flights.values():
if flight.status != "airborne" or flight.eta is None or flight.eta > self.time:
continue
destination = self.airports[flight.destination]
gate_capacity = destination.available_gates(self.time)
if gate_load.get(flight.destination, 0) >= gate_capacity:
flight.eta += self.step_minutes
reward.delay_score -= flight.passengers * 0.04
reward.passenger_score -= flight.passengers * 0.03
continue
flight.status = "arrived"
flight.actual_arr = self.time
aircraft = self.aircraft[flight.aircraft_id]
crew = self.crew[flight.crew_id]
aircraft.airport = flight.destination
crew.airport = flight.destination
aircraft.available_at = self.time + 35
crew.available_at = self.time + 45
gate_load[flight.destination] = gate_load.get(flight.destination, 0) + 1
arrival_delay = flight.arr_delay()
reward.delay_score += max(1.0, 12.0 - arrival_delay * 0.04)
reward.passenger_score += max(0.0, 8.0 - arrival_delay * 0.02)
return reward
def _gate_load(self) -> Dict[str, int]:
load = {code: 0 for code in self.airports}
for flight in self.flights.values():
if flight.status == "arrived" and flight.actual_arr is not None:
if self.time - flight.actual_arr <= 45:
load[flight.destination] = load.get(flight.destination, 0) + 1
return load
def _process_operational_pressure(self) -> RewardBreakdown:
reward = RewardBreakdown()
airline_delay = {code: 0 for code in self.airlines}
for flight in self.flights.values():
if flight.status != "scheduled":
continue
delay = max(0, self.time - flight.scheduled_dep)
if delay <= 0:
continue
airline_delay[flight.airline] += delay
reward.delay_score -= min(12, delay * 0.03)
for group in flight.passenger_groups:
hit = 0.5 if group.priority == "standard" else 0.9
group.satisfaction = max(0, group.satisfaction - hit)
if delay > 120 and group.priority == "connection":
group.stranded = True
reward.passenger_score -= group.count * 0.2
reward.passenger_score -= flight.passengers * 0.005
reward.money_score -= self.airlines[flight.airline].operating_cost_per_minute * 0.01
for disruption in self.disruptions:
if not disruption.active:
continue
if disruption.kind == "demand_shock" and disruption.target == flight.origin:
reward.passenger_score -= flight.passengers * 0.008 * disruption.severity
elif disruption.kind == "fuel_delay" and disruption.target == flight.origin:
reward.money_score -= 2.2 * disruption.severity
elif disruption.kind == "slot_conflict" and disruption.target == flight.origin:
reward.fairness_score -= max(0, delay - 30) * 0.006 * disruption.severity
if self.stage >= 3 and airline_delay:
values = list(airline_delay.values())
spread = max(values) - min(values)
reward.fairness_score -= spread * 0.01
return reward
def _terminal_reward(self) -> RewardBreakdown:
reward = RewardBreakdown()
metrics = self.metrics()
reward.delay_score -= metrics["total_dep_delay"] * 0.01
reward.passenger_score -= metrics["stranded_passengers"] * 0.4
reward.passenger_score += metrics["avg_satisfaction"] * 0.5
reward.money_score += sum(metrics["airline_cash"].values()) / 200_000
reward.safety_score += 50
return reward
def _all_flights_terminal(self) -> bool:
return all(flight.status in {"arrived", "cancelled"} for flight in self.flights.values())
def find_available_aircraft(self, flight: Flight) -> Optional[Aircraft]:
for aircraft in self.aircraft.values():
if aircraft.airline == flight.airline and aircraft.is_available(self.time, flight.origin):
return aircraft
return None
def find_available_crew(self, flight: Flight) -> Optional[Crew]:
for crew in self.crew.values():
if crew.airline == flight.airline and crew.is_available(self.time, flight.origin):
return crew
return None
def _agent_messages(
self, actions: List[Dict[str, Any]], active_disruptions: List[Any]
) -> List[Dict[str, str]]:
messages: List[Dict[str, str]] = []
for disruption in active_disruptions[:3]:
target = disruption.target
messages.append(
{
"from": "Tower Central",
"to": "Network Ops",
"type": "alert",
"message": f"{disruption.kind.replace('_', ' ').title()} active at {target}: {disruption.message}",
}
)
if self.stage >= 3 and disruption.kind == "slot_conflict":
for airline in self.airlines.values():
messages.append(
{
"from": airline.name,
"to": "Tower Central",
"type": "slot_bid",
"message": (
f"{airline.name} requests priority slots at {target}; "
"we can protect connections but need a fast departure bank."
),
}
)
if disruption.kind in {"fuel_delay", "demand_shock"} and disruption.target in self.airports:
messages.append(
{
"from": "Airport Ops",
"to": "Airline Agents",
"type": "constraint",
"message": (
f"{self.airports[disruption.target].city} constraint is live. "
"Choose between holding, rerouting, compensation, and slot negotiation."
),
}
)
for action in actions[:8]:
flight_id = str(action.get("flight_id", "network"))
flight = self.flights.get(flight_id)
airline = self.airlines.get(flight.airline).name if flight else "Airline Ops"
kind = str(action.get("action", "action")).replace("_", " ")
if action.get("action") == "depart" and flight:
message = f"Cleared {flight_id} from {flight.origin} to {flight.destination}."
elif action.get("action") == "hold":
message = f"Holding {flight_id} for {action.get('minutes', 15)} minutes: {action.get('reason', 'operational recovery')}."
elif action.get("action") == "swap_aircraft":
message = f"Swapping aircraft for {flight_id} to {action.get('aircraft_id')}."
elif action.get("action") == "protect_connection":
message = f"Protect connection groups on {flight_id}; passenger impact is being prioritized."
elif action.get("action") == "cancel":
message = f"Cancel {flight_id}; compensation and stranded passenger penalties will apply."
else:
message = f"{kind.title()} requested for {flight_id}."
messages.append(
{
"from": airline,
"to": "Tower Central",
"type": str(action.get("action", "decision")),
"message": message,
}
)
return messages
def minutes_to_clock(minutes: int) -> str:
hours = (minutes // 60) % 24
mins = minutes % 60
return f"{hours:02d}:{mins:02d}"