Spaces:
Sleeping
Sleeping
| """ | |
| Tail-chain feature engineering. | |
| Same aircraft tail on consecutive legs → likely same crew. For each A→DFW→B | |
| sequence where both legs share a tail number (same FlightDate), computes: | |
| legs_before — legs flown by this tail before the A→DFW leg today | |
| block_min_before — block time (min) accumulated before A→DFW leg | |
| duty_start_hour — first dep hour of tail today (proxy for report time) | |
| total_duty_min — full window: tail's first dep → DFW→B arrival (actual elapsed time; scheduled if missing) | |
| fdp_utilization — total_duty_min / FAA Part 117 FDP limit (0–1.5 clipped) | |
| legs_after — legs on same tail after DFW→B (scheduling pressure) | |
| crosses_wocl — 1 if duty window overlaps FAA WOCL (0200–0559) | |
| legs_in_day — total legs on this tail today | |
| Aggregated to (airport_A, airport_B, Month, Year) to match sequence_features.parquet. | |
| Note: same-FlightDate matching only. A red-eye inbound (e.g. HNL→DFW departing | |
| in the evening, arriving the next morning) and its onward DFW→B leg carry | |
| different FlightDates, so they are never chained — an estimated ~10% of | |
| sequences; acceptable for monthly aggregate features. | |
| Run: | |
| conda run -n aadata python src/tail_chain_features.py | |
| """ | |
| import os | |
| import glob | |
| import numpy as np | |
| import pandas as pd | |
# Data directories, resolved relative to the directory containing this file.
RAW_DIR = os.path.join(os.path.dirname(__file__), os.pardir, "data", "raw")
PROC_DIR = os.path.join(os.path.dirname(__file__), os.pardir, "data", "processed")

# Allowed gap (minutes) between the A→DFW arrival and the DFW→B departure.
TURNAROUND_MIN, TURNAROUND_MAX = 30, 4 * 60

# FAA window of circadian low boundaries (02:00 and 06:00), minutes since midnight.
WOCL_START, WOCL_END = 2 * 60, 6 * 60
# ---------------------------------------------------------------------------
# FAA Part 117 FDP table
# ---------------------------------------------------------------------------
# 14 CFR Part 117, Table B (flight duty period, unaugmented operations, as
# applied by § 117.13): maximum FDP in hours for an acclimated lineholder,
# keyed by scheduled start hour (acclimated time) and number of flight
# segments. Limits never increase as the segment count grows.
_FDP_TABLE = {
    # (first hour, last hour) of start window -> [1, 2, 3, 4, 5, 6, 7+ segments]
    (0, 3): [9.0, 9.0, 9.0, 9.0, 9.0, 9.0, 9.0],  # 0000-0359
    (4, 4): [10.0, 10.0, 10.0, 10.0, 9.0, 9.0, 9.0],  # 0400-0459
    (5, 5): [12.0, 12.0, 12.0, 12.0, 11.5, 11.0, 10.5],  # 0500-0559
    (6, 6): [13.0, 13.0, 12.0, 12.0, 11.5, 11.0, 10.5],  # 0600-0659
    (7, 11): [14.0, 14.0, 13.0, 13.0, 12.5, 12.0, 11.5],  # 0700-1159
    (12, 12): [13.0, 13.0, 13.0, 13.0, 12.5, 12.0, 11.5],  # 1200-1259
    (13, 16): [12.0, 12.0, 12.0, 12.0, 11.5, 11.0, 10.5],  # 1300-1659
    (17, 21): [12.0, 12.0, 11.0, 11.0, 10.0, 9.0, 9.0],  # 1700-2159
    (22, 22): [11.0, 11.0, 10.0, 10.0, 9.0, 9.0, 9.0],  # 2200-2259
    (23, 23): [10.0, 10.0, 10.0, 9.0, 9.0, 9.0, 9.0],  # 2300-2359
}

# Dense lookup grid: row = start hour 0-23, column = segments - 1 (last = 7+).
_FDP_BY_HOUR = np.array([
    limits
    for hour in range(24)
    for (first, last), limits in _FDP_TABLE.items()
    if first <= hour <= last
])


def fdp_limit_hours(duty_start_hour: int, total_legs: int) -> float:
    """Return the Table B FDP limit (hours) for a duty start hour and leg count.

    In this module the start hour is the tail's first departure, used as a
    proxy for the report time Table B is keyed on. Hours outside 0-23 are
    clamped to that range; total_legs is clamped to 1..7, where 7 stands for
    "7 or more segments".
    """
    hour = min(max(int(duty_start_hour), 0), 23)
    segments = min(max(int(total_legs), 1), _FDP_BY_HOUR.shape[1])
    return float(_FDP_BY_HOUR[hour, segments - 1])


# Vectorised version for whole DataFrame columns
def _fdp_vec(start_hrs: pd.Series, legs: pd.Series) -> pd.Series:
    """Element-wise fdp_limit_hours over two equally-ordered Series.

    Both Series are read positionally, so they must come from the same frame
    (same length and row order); the result carries start_hrs' index.
    """
    hours = np.clip(np.asarray(start_hrs, dtype=int), 0, 23)
    segments = np.clip(np.asarray(legs, dtype=int), 1, _FDP_BY_HOUR.shape[1])
    return pd.Series(_FDP_BY_HOUR[hours, segments - 1], index=start_hrs.index)
| # --------------------------------------------------------------------------- | |
| # Core: build tail chains for one year's raw BTS file | |
| # --------------------------------------------------------------------------- | |
def _parse_hhmm_to_min(series: pd.Series) -> pd.Series:
    """HHMM clock value → minutes since midnight; NaN when missing or invalid.

    The BTS-style 2400 (midnight at the end of the day) maps to 1440, so it
    still sorts after every other time on the same FlightDate instead of being
    clamped to 23:00. Negative values, values above 2400, and values whose
    minute part exceeds 59 become NaN rather than a clamped, made-up time.
    """
    hhmm = pd.to_numeric(series, errors="coerce")
    hours, minutes = hhmm // 100, hhmm % 100
    valid = (hhmm >= 0) & (hhmm <= 2400) & (minutes <= 59)
    return (hours * 60 + minutes).where(valid)
def _load_tail_legs(fpath: str) -> pd.DataFrame:
    """Read one raw BTS file and return flown legs with per-tail×day chain columns.

    Keeps non-cancelled legs that have a tail number and a parseable DepTime,
    sorted by (Tail_Number, FlightDate, dep_min), and adds:
      dep_min               departure, minutes since midnight of FlightDate
      block                 ActualElapsedTime, else CRSElapsedTime, else 0
      arr_min_origin_clock  dep_min + block, i.e. on the origin airport's clock
      arr_min               arrival on the destination airport's clock
      cum_block_before, leg_rank, legs_in_day, duty_start_min, duty_start_hour
    """
    df = pd.read_parquet(fpath)
    df["FlightDate"] = pd.to_datetime(df["FlightDate"])

    # Drop cancelled flights (no actual times) and legs without a tail number;
    # strip stray whitespace so "N123AA " and "N123AA" chain together.
    df = df[(df["Cancelled"] != 1) & df["Tail_Number"].notna()].copy()
    df["Tail_Number"] = df["Tail_Number"].str.strip()
    df = df[df["Tail_Number"].notna() & (df["Tail_Number"] != "")].copy()

    df["dep_min"] = _parse_hhmm_to_min(df["DepTime"])
    df["block"] = df["ActualElapsedTime"].fillna(df["CRSElapsedTime"]).fillna(0)
    # Departure + elapsed time stays on the origin's clock and runs past 1440
    # when the flight crosses midnight.
    df["arr_min_origin_clock"] = df["dep_min"] + df["block"]
    df["arr_min"] = df["arr_min_origin_clock"]
    if "ArrTime" in df.columns:
        # BTS clock times are local, so an arrival must be read on the
        # destination's clock before it is compared with the next departure
        # from that airport (an ET→CT offset alone turns a real 45-min turn
        # into -15 min). Use the wall-clock ArrTime, unwrapped onto the
        # FlightDate timeline by choosing the day (shift within ±12 h) nearest
        # the origin-clock estimate, which keeps midnight rollovers (> 1440).
        arr_clock = _parse_hhmm_to_min(df["ArrTime"])
        shift = (arr_clock - df["arr_min_origin_clock"] + 720) % 1440 - 720
        df["arr_min"] = df["arr_min_origin_clock"] + shift.fillna(0)

    df = df.dropna(subset=["dep_min"])
    df = df.sort_values(["Tail_Number", "FlightDate", "dep_min"])

    grp = df.groupby(["Tail_Number", "FlightDate"])
    # Block time flown by this tail earlier the same day.
    df["cum_block_before"] = grp["block"].cumsum() - df["block"]
    # 0 = first leg of the day; counts only legs present in the input file.
    df["leg_rank"] = grp.cumcount()
    df["legs_in_day"] = grp["block"].transform("count")
    df["duty_start_min"] = grp["dep_min"].transform("min")
    df["duty_start_hour"] = (df["duty_start_min"] // 60).astype(int)
    return df


def _pair_inbound_outbound(df: pd.DataFrame) -> pd.DataFrame:
    """Pair each A→DFW leg with the tail's very next leg when that leg is DFW→B.

    Only consecutive legs (leg_rank_B == legs_before + 1) whose DFW turnaround
    (B departure − A arrival, both on DFW's clock) lies within
    [TURNAROUND_MIN, TURNAROUND_MAX] minutes are kept; without the consecutive
    requirement, an intervening DFW→X→DFW round trip produced spurious pairs.
    """
    merge_keys = ["Tail_Number", "FlightDate", "Month",
                  "duty_start_min", "duty_start_hour", "legs_in_day"]

    inbound = df.loc[df["Dest"] == "DFW",
                     merge_keys + ["Origin", "arr_min", "cum_block_before", "leg_rank"]]
    inbound = inbound.rename(columns={
        "Origin": "airport_A",
        "arr_min": "arr_min_A",
        "cum_block_before": "block_min_before",
        "leg_rank": "legs_before",  # 0-based rank == number of earlier legs
    })

    outbound = df.loc[df["Origin"] == "DFW",
                      merge_keys + ["Dest", "dep_min", "arr_min",
                                    "arr_min_origin_clock", "leg_rank", "ArrDelay"]]
    outbound = outbound.rename(columns={
        "Dest": "airport_B",
        "dep_min": "dep_min_B",
        "arr_min": "arr_min_B",  # on B's clock, for the C-leg gap
        "arr_min_origin_clock": "arr_min_B_dfw_clock",  # for the duty window
        "leg_rank": "leg_rank_B",
        "ArrDelay": "arr_delay_B",
    })
    outbound["arr_delay_B"] = outbound["arr_delay_B"].fillna(0)
    outbound["legs_after"] = outbound["legs_in_day"] - outbound["leg_rank_B"] - 1

    pairs = inbound.merge(outbound, on=merge_keys, how="inner")
    turnaround = pairs["dep_min_B"] - pairs["arr_min_A"]
    keep = (
        (pairs["leg_rank_B"] == pairs["legs_before"] + 1)
        & turnaround.between(TURNAROUND_MIN, TURNAROUND_MAX)
    )
    return pairs[keep].reset_index(drop=True)


def _attach_downstream_leg(pairs: pd.DataFrame, df: pd.DataFrame) -> pd.DataFrame:
    """Add C-leg (B→DFW, the tail's next leg) cascade columns to each pair.

    A C leg must depart from airport B as the leg right after DFW→B, 30–360 min
    after B's arrival (both on B's clock). Pairs without a match get zeros in
    has_c_leg, c_late_flag, c_late_aircraft_min and c_arr_delay.
    """
    c_keys = ["Tail_Number", "FlightDate"]
    pair_keys = c_keys + ["airport_B", "arr_min_B"]

    c_pool = df.loc[df["Dest"] == "DFW",
                    c_keys + ["Origin", "dep_min", "leg_rank",
                              "LateAircraftDelay", "ArrDelay"]]
    c_pool = c_pool.rename(columns={
        "Origin": "airport_C_origin",
        "dep_min": "dep_min_C",
        "leg_rank": "leg_rank_C",
        "LateAircraftDelay": "c_late_aircraft_min",
        "ArrDelay": "c_arr_delay",
    })
    c_pool["c_late_aircraft_min"] = c_pool["c_late_aircraft_min"].fillna(0)
    c_pool["c_late_flag"] = (c_pool["c_late_aircraft_min"] >= 15).astype(int)
    c_pool["c_arr_delay"] = c_pool["c_arr_delay"].fillna(0)

    cascade = pairs[pair_keys + ["leg_rank_B"]].merge(c_pool, on=c_keys, how="inner")
    c_gap = cascade["dep_min_C"] - cascade["arr_min_B"]
    cascade = cascade[
        (cascade["airport_C_origin"] == cascade["airport_B"])
        & (cascade["leg_rank_C"] == cascade["leg_rank_B"] + 1)
        & c_gap.between(30, 360)
    ].assign(has_c_leg=1)

    # At most one C leg per pair now; "max" keeps the worst case regardless.
    c_agg = (
        cascade.groupby(pair_keys)
        .agg(
            has_c_leg=("has_c_leg", "max"),
            c_late_flag=("c_late_flag", "max"),
            c_late_aircraft_min=("c_late_aircraft_min", "max"),
            c_arr_delay=("c_arr_delay", "max"),
        )
        .reset_index()
    )
    out = pairs.merge(c_agg, on=pair_keys, how="left")
    c_cols = ["has_c_leg", "c_late_flag", "c_late_aircraft_min", "c_arr_delay"]
    out[c_cols] = out[c_cols].fillna(0)
    return out


def _add_duty_features(pairs: pd.DataFrame) -> pd.DataFrame:
    """Add cascade-amplification and Part 117 duty-window columns to pairs."""
    # How much of B's arrival delay reappears as late-aircraft delay on C.
    pairs["cascade_amplification"] = np.where(
        pairs["arr_delay_B"] > 5,
        (pairs["c_late_aircraft_min"] / pairs["arr_delay_B"].clip(lower=1)).clip(0, 3),
        np.nan,
    )

    # Duty window: tail's first departure (that airport's clock) → DFW→B
    # arrival expressed on DFW's clock (dep_B + block_B). Zone differences
    # between the first origin and DFW are not corrected.
    duty_end = pairs["arr_min_B_dfw_clock"]
    duty_start = pairs["duty_start_min"]
    pairs["total_duty_min"] = (duty_end - duty_start).clip(lower=0)
    fdp_hours = _fdp_vec(pairs["duty_start_hour"], pairs["legs_in_day"]).astype(float)
    pairs["fdp_limit_min"] = fdp_hours * 60
    pairs["fdp_utilization"] = (pairs["total_duty_min"] / pairs["fdp_limit_min"]).clip(0, 1.5)

    # Overlap with the WOCL of the FlightDate or of the following day: a duty
    # ending after midnight has duty_end > 1440.
    overlaps = pd.Series(False, index=pairs.index)
    for day_offset in (0, 1440):
        overlaps |= (
            (duty_start < WOCL_END + day_offset)
            & (duty_end > WOCL_START + day_offset)
        )
    pairs["crosses_wocl"] = overlaps.astype(int)
    pairs["fdp_overrun"] = (pairs["fdp_utilization"] > 1.0).astype(int)
    return pairs


def build_year_chain_features(fpath: str) -> pd.DataFrame:
    """Build tail-chain features for one raw BTS file, aggregated per route-month.

    The year is parsed from the file-name suffix (``..._<year>.parquet``).
    Returns one row per (airport_A, airport_B, Month, Year) with ``tc_*``
    columns, or an empty DataFrame when no A→DFW→B pairs are found.
    """
    year = int(os.path.basename(fpath).split("_")[-1].replace(".parquet", ""))
    print(f" Processing {year}...")

    legs = _load_tail_legs(fpath)
    pairs = _pair_inbound_outbound(legs)
    if pairs.empty:
        print(f" No matched pairs for {year}")
        return pd.DataFrame()
    pairs = _attach_downstream_leg(pairs, legs)
    pairs = _add_duty_features(pairs)
    pairs["Year"] = year

    agg = (
        pairs.groupby(["airport_A", "airport_B", "Month", "Year"])
        .agg(
            tc_n_pairs=("total_duty_min", "count"),
            tc_legs_before_mean=("legs_before", "mean"),
            tc_block_before_mean=("block_min_before", "mean"),
            tc_duty_start_hour=("duty_start_hour", "median"),
            tc_total_duty_mean=("total_duty_min", "mean"),
            tc_total_duty_p75=("total_duty_min", lambda x: x.quantile(0.75)),
            tc_fdp_util_mean=("fdp_utilization", "mean"),
            tc_fdp_util_p75=("fdp_utilization", lambda x: x.quantile(0.75)),
            tc_fdp_overrun_rate=("fdp_overrun", "mean"),
            tc_wocl_rate=("crosses_wocl", "mean"),
            tc_legs_after_mean=("legs_after", "mean"),
            tc_legs_in_day_mean=("legs_in_day", "mean"),
            # Downstream cascade: B → DFW third leg
            tc_downstream_rate=("has_c_leg", "mean"),  # fraction with matched C leg
            tc_cascade_late_rate=("c_late_flag", "mean"),  # fraction C late (late aircraft)
            tc_cascade_late_min=("c_late_aircraft_min", "mean"),  # avg downstream late-aircraft min
            tc_cascade_amplif_mean=("cascade_amplification", "mean"),  # delay amplification ratio
        )
        .reset_index()
    )

    n_with_cascade = (pairs["has_c_leg"] > 0).sum()
    print(f" {year}: {len(pairs):,} pairs, "
          f"{n_with_cascade:,} ({n_with_cascade/len(pairs):.0%}) with B→DFW cascade leg "
          f"→ {len(agg):,} pair×month rows")
    return agg
| # --------------------------------------------------------------------------- | |
| # Airport-level cascade propagation index (no tail matching needed) | |
| # --------------------------------------------------------------------------- | |
def _airport_month_cascade(df: pd.DataFrame) -> pd.DataFrame:
    """Per (airport, Month) late-aircraft statistics for one year's X→DFW flights.

    Uses non-cancelled flights with Dest == "DFW"; the origin becomes "airport".
    Missing LateAircraftDelay / ArrDelay are treated as 0.
    """
    depart = df[(df["Cancelled"] != 1) & (df["Dest"] == "DFW")]
    late_min = depart["LateAircraftDelay"].fillna(0)
    late_flag = (late_min >= 15).astype(int)
    arr_late = (depart["ArrDelay"].fillna(0) >= 15).astype(int)
    flights = pd.DataFrame({
        "airport": depart["Origin"],
        "Month": depart["Month"],
        "late_aircraft_min": late_min,
        "late_flag": late_flag,
        "arr_late": arr_late,
        "late_both": late_flag & arr_late,
    })
    ap = (
        flights.groupby(["airport", "Month"])
        .agg(
            ap_cascade_rate=("late_flag", "mean"),
            ap_cascade_min_mean=("late_aircraft_min", "mean"),
            ap_cascade_min_p75=("late_aircraft_min", lambda x: x.quantile(0.75)),
            n_both=("late_both", "sum"),
            n_arr_late=("arr_late", "sum"),
        )
        .reset_index()
    )
    # Conditional rate; NaN where no flight from that airport arrived late.
    ap["ap_cascade_given_late"] = ap["n_both"] / ap["n_arr_late"].where(ap["n_arr_late"] > 0)
    return ap.drop(columns=["n_both", "n_arr_late"])


def build_airport_cascade_features(files: list) -> pd.DataFrame:
    """
    For each origin airport X × month: how often do X→DFW departures carry
    late-aircraft (reactionary) delay?

    Features (from LateAircraftDelay on X→DFW flights; missing treated as 0):
      ap_cascade_rate — fraction of X→DFW flights with LateAircraftDelay >= 15 min
      ap_cascade_min_mean — mean LateAircraftDelay on X→DFW flights
      ap_cascade_min_p75 — 75th percentile LateAircraftDelay on X→DFW flights
      ap_cascade_given_late — among X→DFW flights that themselves arrived at DFW
          >= 15 min late (their own ArrDelay), the fraction with
          LateAircraftDelay >= 15. This conditions on the flight's own arrival
          delay, not on the delay of the flight that brought the aircraft to X.

    Statistics are computed per input file (year) and averaged, unweighted,
    across files. The result is written to
    PROC_DIR/airport_cascade_features.parquet and returned. With no input
    files nothing is written and an empty frame is returned.
    """
    print("\nBuilding airport-level cascade propagation features...")
    feature_cols = ["ap_cascade_rate", "ap_cascade_min_mean",
                    "ap_cascade_min_p75", "ap_cascade_given_late"]
    ap_frames = [_airport_month_cascade(pd.read_parquet(fpath)) for fpath in files]
    if not ap_frames:
        print("No input files; airport cascade features not built.")
        return pd.DataFrame(columns=["airport", "Month"] + feature_cols)

    ap_all = pd.concat(ap_frames, ignore_index=True)
    # Average across years → stable airport×month profile (NaNs skipped).
    ap_avg = (
        ap_all.groupby(["airport", "Month"])[feature_cols]
        .mean()
        .reset_index()
    )

    os.makedirs(PROC_DIR, exist_ok=True)
    out = os.path.join(PROC_DIR, "airport_cascade_features.parquet")
    ap_avg.to_parquet(out, index=False)
    print(f"Airport cascade features: {ap_avg.shape} saved → {out}")
    print(ap_avg[["ap_cascade_rate","ap_cascade_min_mean","ap_cascade_given_late"]]
          .describe().T[["mean","50%","max"]].round(3).to_string())
    return ap_avg
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def main():
    """Build tail-chain and airport-cascade feature tables from raw BTS files.

    Looks in RAW_DIR for bts_aa_dfw_<year>.parquet and
    bts_all_dfw_<year>.parquet. When both exist for a year, the all-carrier
    file is used. Years are processed in ascending order. Writes
    PROC_DIR/tail_chain_features.parquet when any year yields pairs, then
    builds the airport cascade table.
    """
    year_file = {}
    # AA-only files first, so an all-carrier file for the same year replaces it.
    for pattern in ("bts_aa_dfw_*.parquet", "bts_all_dfw_*.parquet"):
        for path in glob.glob(os.path.join(RAW_DIR, pattern)):
            year_file[path.split("_")[-1].replace(".parquet", "")] = path
    files = [year_file[yr] for yr in sorted(year_file)]
    print(f"Processing {len(files)} raw BTS files for tail-chain + cascade features...")
    if not files:
        print(f"No raw BTS files found in {RAW_DIR}; nothing to do.")
        return
    os.makedirs(PROC_DIR, exist_ok=True)

    # --- Tail-chain features ---
    frames = []
    for fpath in files:
        try:
            frame = build_year_chain_features(fpath)
        except Exception as e:  # best-effort per year: report and continue
            print(f" ERROR on {fpath}: {e}")
            continue
        if not frame.empty:
            frames.append(frame)

    if frames:
        result = pd.concat(frames, ignore_index=True)
        out = os.path.join(PROC_DIR, "tail_chain_features.parquet")
        result.to_parquet(out, index=False)
        print(f"\nTail-chain: {len(result):,} pair×month×year rows → {out}")
        tc_cols = [c for c in result.columns if c.startswith("tc_")]
        print(result[tc_cols].describe().T[["mean", "50%", "max"]].round(3).to_string())
    else:
        # pd.concat([]) would raise; report instead of crashing.
        print("\nTail-chain: no pairs in any year; tail_chain_features.parquet not written")

    # --- Airport-level cascade features ---
    build_airport_cascade_features(files)


if __name__ == "__main__":
    main()