Spaces:
Sleeping
Sleeping
| """ | |
| Full downstream cascade chain analysis: A→DFW→B→C→D→... | |
| For each A→DFW→B sequence (tail-matched), follows the SAME aircraft forward | |
| through any subsequent legs on the same day: B→C→D→E→... | |
| Measures how far and how severely the delay cascade propagates downstream. | |
| Requires: | |
| - data/raw/bts_full_{year}.parquet (from download_bts_full.py) | |
| - data/raw/bts_all_dfw_{year}.parquet (existing, to identify A→DFW→B seed sequences) | |
| Output: data/processed/cascade_chain_features.parquet | |
| Features per (airport_A, airport_B, Month, Year): | |
| cc_chain_depth_mean — avg downstream legs observed after DFW→B | |
| cc_chain_depth_max — max downstream chain depth observed | |
| cc_total_delay_mean — avg total LateAircraftDelay across ALL downstream legs (min) | |
| cc_total_delay_p75 — p75 total downstream delay | |
| cc_cascade_rate — fraction of sequences where ≥1 downstream leg is late-aircraft | |
| cc_recovery_rate — fraction where delay fully recovers (downstream leg has 0 delay) | |
| cc_amplification_mean — total_downstream_delay / B_arr_delay (>1 = amplified) | |
| cc_affected_airports_mean — avg unique airports hit downstream per cascade | |
| cc_max_single_leg_delay — avg of worst single downstream leg delay | |
| Run: | |
| conda run -n aadata python src/cascade_chain_features.py | |
| conda run -n aadata python src/cascade_chain_features.py --max-depth 6 | |
| """ | |
| import os | |
| import glob | |
| import argparse | |
| import numpy as np | |
| import pandas as pd | |
| RAW_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "raw") | |
| PROC_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "processed") | |
| TURNAROUND_MIN = 30 # min gap inbound arr → outbound dep (minutes) | |
| TURNAROUND_MAX = 240 # max gap A→DFW (4 hrs) | |
| LEG_GAP_MIN = 15 # min gap between any two legs in cascade chain | |
| LEG_GAP_MAX = 360 # max gap between cascade legs (6 hrs) | |
| LATE_THRESHOLD = 15 # minutes to count as a late aircraft delay event | |
| def _parse_hhmm_to_min(series: pd.Series) -> pd.Series: | |
| s = series.fillna(-1).astype(int) | |
| valid = s >= 0 | |
| h = (s // 100).clip(0, 23) | |
| m = (s % 100).clip(0, 59) | |
| return (h * 60 + m).where(valid, other=np.nan) | |
| def _resolve_files() -> dict: | |
| """Map year → (dfw_file, full_file). Only years with both files are processed.""" | |
| dfw_files = {} | |
| full_files = {} | |
| for f in glob.glob(os.path.join(RAW_DIR, "bts_all_dfw_*.parquet")): | |
| yr = int(os.path.basename(f).split("_")[-1].replace(".parquet", "")) | |
| dfw_files[yr] = f | |
| for f in glob.glob(os.path.join(RAW_DIR, "bts_full_*.parquet")): | |
| yr = int(os.path.basename(f).split("_")[-1].replace(".parquet", "")) | |
| full_files[yr] = f | |
| common = sorted(set(dfw_files) & set(full_files)) | |
| if not common: | |
| print("No years with both bts_all_dfw_* and bts_full_* files found.") | |
| print("Run download_bts_full.py first.") | |
| return {} | |
| return {yr: (dfw_files[yr], full_files[yr]) for yr in common} | |
| def build_year_cascade(year: int, dfw_path: str, full_path: str, | |
| max_depth: int = 8) -> pd.DataFrame: | |
| print(f" {year}: loading data...") | |
| # --- Load DFW data: find A→DFW→B seed sequences (tail-matched) --- | |
| dfw = pd.read_parquet(dfw_path) | |
| dfw = dfw[dfw["Cancelled"] != 1].copy() | |
| dfw["dep_min"] = _parse_hhmm_to_min(dfw["DepTime"]) | |
| dfw["block"] = dfw["ActualElapsedTime"].fillna(dfw["CRSElapsedTime"]).fillna(0) | |
| dfw["arr_min"] = dfw["dep_min"] + dfw["block"] | |
| dfw = dfw.dropna(subset=["dep_min", "Tail_Number"]) | |
| dfw = dfw[dfw["Tail_Number"].str.strip() != ""] | |
| # Inbound A→DFW | |
| ib = dfw[dfw["Dest"] == "DFW"][ | |
| ["Tail_Number", "FlightDate", "Month", "Origin", "arr_min", "dep_min"] | |
| ].rename(columns={"Origin": "airport_A", "arr_min": "arr_min_A", "dep_min": "dep_min_A"}) | |
| # Outbound DFW→B | |
| ob = dfw[dfw["Origin"] == "DFW"][ | |
| ["Tail_Number", "FlightDate", "ArrDelay", "Dest", "dep_min", "arr_min"] | |
| ].rename(columns={"Dest": "airport_B", "dep_min": "dep_min_B", | |
| "arr_min": "arr_min_B", "ArrDelay": "arr_delay_B"}) | |
| ob["arr_delay_B"] = ob["arr_delay_B"].fillna(0) | |
| # Match A→DFW + DFW→B by tail × date with turnaround window | |
| seeds = ib.merge(ob, on=["Tail_Number", "FlightDate"]) | |
| ta = seeds["dep_min_B"] - seeds["arr_min_A"] | |
| seeds = seeds[(ta >= TURNAROUND_MIN) & (ta <= TURNAROUND_MAX)].copy() | |
| if seeds.empty: | |
| print(f" No seed sequences for {year}") | |
| return pd.DataFrame() | |
| print(f" {year}: {len(seeds):,} seed sequences | loading full BTS...") | |
| # --- Load full national BTS: all flights for cascade tracking --- | |
| full = pd.read_parquet(full_path) | |
| full = full[full["Cancelled"] != 1].copy() | |
| full["dep_min"] = _parse_hhmm_to_min(full["DepTime"]) | |
| full["block"] = full["ActualElapsedTime"].fillna(full["CRSElapsedTime"]).fillna(0) | |
| full["arr_min"] = full["dep_min"] + full["block"] | |
| full["late_aircraft_min"] = full["LateAircraftDelay"].fillna(0) | |
| full["late_flag"] = (full["late_aircraft_min"] >= LATE_THRESHOLD).astype(int) | |
| full["arr_delay"] = full["ArrDelay"].fillna(0) | |
| full = full.dropna(subset=["dep_min", "Tail_Number"]) | |
| full = full[full["Tail_Number"].str.strip() != ""] | |
| # Keep only tails that appear in our seed sequences (massive memory saving) | |
| seed_tails = set(seeds["Tail_Number"].unique()) | |
| full = full[full["Tail_Number"].isin(seed_tails)].copy() | |
| print(f" Full BTS filtered to {len(seed_tails):,} seed tails: {len(full):,} flights") | |
| # Sort for chain building | |
| full = full.sort_values(["Tail_Number", "FlightDate", "dep_min"]) | |
| # Build per-tail×date lookup: list of (dep_min, arr_min, dest, late_aircraft_min, arr_delay) | |
| # We'll use this to walk forward from the DFW→B arrival | |
| full_grp = full.groupby(["Tail_Number", "FlightDate"]) | |
| # --- Walk the cascade chain for each seed sequence --- | |
| records = [] | |
| for _, seed in seeds.iterrows(): | |
| tail = seed["Tail_Number"] | |
| date = seed["FlightDate"] | |
| arr_B = seed["arr_min_B"] | |
| arr_dly = seed["arr_delay_B"] | |
| try: | |
| tail_day = full_grp.get_group((tail, date)) | |
| except KeyError: | |
| # Tail not in full data for this date (rare) | |
| continue | |
| # Find all legs AFTER the DFW→B arrival | |
| downstream = tail_day[tail_day["dep_min"] >= arr_B + LEG_GAP_MIN].sort_values("dep_min") | |
| # Walk the chain leg by leg | |
| chain_airports = [] | |
| chain_delays = [] # LateAircraftDelay on each leg | |
| chain_arr_delays = [] # ArrDelay on each leg | |
| current_arr = arr_B | |
| for _, leg in downstream.iterrows(): | |
| gap = leg["dep_min"] - current_arr | |
| if gap < LEG_GAP_MIN or gap > LEG_GAP_MAX: | |
| break # too short (overlap) or too long (crew rest / new duty) | |
| if len(chain_delays) >= max_depth: | |
| break | |
| chain_airports.append(leg["Dest"]) | |
| chain_delays.append(leg["late_aircraft_min"]) | |
| chain_arr_delays.append(leg["arr_delay"]) | |
| current_arr = leg["arr_min"] | |
| depth = len(chain_delays) | |
| total_delay = sum(chain_delays) | |
| any_late = any(d >= LATE_THRESHOLD for d in chain_delays) | |
| recovered = any(d == 0 for d in chain_delays) if chain_delays else False | |
| n_airports = len(set(chain_airports)) | |
| max_leg_delay = max(chain_delays) if chain_delays else 0 | |
| amplif = (total_delay / max(arr_dly, 1)) if arr_dly > 5 else np.nan | |
| records.append({ | |
| "Tail_Number": tail, | |
| "FlightDate": date, | |
| "airport_A": seed["airport_A"], | |
| "airport_B": seed["airport_B"], | |
| "Month": seed["Month"], | |
| "Year": year, | |
| "depth": depth, | |
| "total_delay": total_delay, | |
| "any_late": int(any_late), | |
| "recovered": int(recovered), | |
| "n_airports": n_airports, | |
| "max_leg_delay": max_leg_delay, | |
| "amplification": amplif, | |
| }) | |
| if not records: | |
| print(f" No cascade chains built for {year}") | |
| return pd.DataFrame() | |
| chains = pd.DataFrame(records) | |
| n_cascaded = chains["any_late"].sum() | |
| print(f" {year}: {len(chains):,} chains | " | |
| f"{n_cascaded/len(chains):.1%} with downstream cascade | " | |
| f"avg depth {chains['depth'].mean():.1f}") | |
| # --- Aggregate to (airport_A, airport_B, Month, Year) --- | |
| agg = ( | |
| chains.groupby(["airport_A", "airport_B", "Month", "Year"]) | |
| .agg( | |
| cc_n_chains = ("depth", "count"), | |
| cc_chain_depth_mean = ("depth", "mean"), | |
| cc_chain_depth_max = ("depth", "max"), | |
| cc_total_delay_mean = ("total_delay", "mean"), | |
| cc_total_delay_p75 = ("total_delay", lambda x: x.quantile(0.75)), | |
| cc_cascade_rate = ("any_late", "mean"), | |
| cc_recovery_rate = ("recovered", "mean"), | |
| cc_amplification_mean = ("amplification", "mean"), | |
| cc_affected_airports_mean= ("n_airports", "mean"), | |
| cc_max_single_leg_delay = ("max_leg_delay", "mean"), | |
| ) | |
| .reset_index() | |
| ) | |
| print(f" → {len(agg):,} pair×month rows") | |
| return agg | |
| def main(max_depth: int = 8): | |
| year_files = _resolve_files() | |
| if not year_files: | |
| return | |
| print(f"Building cascade chain features for years: {sorted(year_files)}") | |
| print(f"Max cascade depth: {max_depth} legs\n") | |
| frames = [] | |
| for year, (dfw_path, full_path) in year_files.items(): | |
| try: | |
| agg = build_year_cascade(year, dfw_path, full_path, max_depth) | |
| if not agg.empty: | |
| frames.append(agg) | |
| except Exception as e: | |
| print(f" ERROR {year}: {e}") | |
| if not frames: | |
| print("No cascade features produced.") | |
| return | |
| result = pd.concat(frames, ignore_index=True) | |
| out = os.path.join(PROC_DIR, "cascade_chain_features.parquet") | |
| result.to_parquet(out, index=False) | |
| print(f"\nTotal: {len(result):,} pair×month×year rows → {out}") | |
| print("\nFeature summary:") | |
| cc_cols = [c for c in result.columns if c.startswith("cc_")] | |
| print(result[cc_cols].describe().T[["mean", "50%", "max"]].round(3).to_string()) | |
| print("\nNext: add CC_FEATURES to model_lgbm.py TAIL_CHAIN_FEATURES list and retrain.") | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--max-depth", type=int, default=8, | |
| help="Max number of downstream legs to track (default: 8)") | |
| args = parser.parse_args() | |
| main(args.max_depth) | |